API Docs for conflator

conflator.py

Bases: object

Parameters:

    uri (str): URI for the primary database. Default: None
    boundary (str): Boundary to limit SQL queries. Default: None

Returns:

    (Conflator): An instance of this object

Source code in osm_merge/conflator.py
def __init__(self,
             uri: str = None,
             boundary: str = None
             ):
    """
    Initialize Input data sources.

    Args:
        uri (str): URI for the primary database
        boundary (str, optional): Boundary to limit SQL queries

    Returns:
        (Conflator): An instance of this object
    """
    self.postgres = list()
    self.tags = dict()
    self.boundary = boundary
    self.dburi = uri
    self.primary = None
    if boundary:
        infile = open(boundary, 'r')
        self.boundary = geojson.load(infile)
        infile.close()
    # Distance in meters for conflating with postgis
    self.tolerance = 7
    self.data = dict()
    self.analyze = ("building", "name", "amenity", "landuse", "cuisine", "tourism", "leisure")
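
A minimal usage sketch (the URI and boundary path are placeholders, assuming the package layout shown above):

from osm_merge.conflator import Conflator

# Hypothetical arguments: any postgres URI and GeoJson boundary file work here.
conflator = Conflator(uri="localhost/osmdata", boundary="aoi.geojson")
print(conflator.tolerance)  # 7, the default conflation distance in meters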

getDistance

getDistance(newdata, olddata)

Compute the distance between two features in meters

Parameters:

    newdata (Feature): A feature from the external dataset. Required.
    olddata (Feature): A feature from the existing OSM dataset. Required.

Returns:

    (float): The distance between the two features

Source code in osm_merge/conflator.py
def getDistance(self,
        newdata: Feature,
        olddata: Feature,
        ) -> float:
    """
    Compute the distance between two features in meters

    Args:
        newdata (Feature): A feature from the external dataset
        olddata (Feature): A feature from the existing OSM dataset

    Returns:
        (float): The distance between the two features
    """
    # timer = Timer(text="getDistance() took {seconds:.0f}s")
    # timer.start()
    # dist = shapely.hausdorff_distance(center, wkt)
    dist = float()

    # Transform so the results are in meters instead of degrees of the
    # earth's radius.
    project = pyproj.Transformer.from_proj(
        pyproj.Proj(init='epsg:4326'),
        pyproj.Proj(init='epsg:32633')
        )
    newobj = transform(project.transform, shape(newdata["geometry"]))
    oldobj = transform(project.transform, shape(olddata["geometry"]))

    if newobj.geom_type == "MultiLineString":
        lines = newobj.geoms
    elif newobj.geom_type == "GeometryCollection":
        lines = newobj.geoms
    else:
        lines = MultiLineString([newobj]).geoms

    # dists = list()
    best = None
    for segment in lines:
        if oldobj.geom_type == "LineString" and segment.geom_type == "LineString":
            # Compare two highways
            if oldobj.within(segment):
                log.debug(f"CONTAINS")
            dist = segment.distance(oldobj)
        elif oldobj.geom_type == "Point" and segment.geom_type == "LineString":
            # We only want to compare LineStrings, so force the distance check
            # to be False
            dist = 12345678.9
        elif oldobj.geom_type == "Point" and segment.geom_type == "Point":
            dist = segment.distance(oldobj)
        elif oldobj.geom_type == "Polygon" and segment.geom_type == "Polygon":
            # compare two buildings
            pass
        elif oldobj.geom_type == "Polygon" and segment.geom_type == "Point":
            # Compare a point with a building, used for ODK Collect data
            center = shapely.centroid(oldobj)
            dist = segment.distance(center)
        elif oldobj.geom_type == "Point" and segment.geom_type == "LineString":
            dist = segment.distance(oldobj)
        elif oldobj.geom_type == "LineString" and segment.geom_type == "Point":
            dist = segment.distance(oldobj)

        # Find the closest segment
        if best is None:
            best = dist
        elif dist < best:
            # log.debug(f"BEST: {best} < {dist}")
            best = dist

    # timer.stop()
    return best
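
Projecting first matters: shapely distances on raw EPSG:4326 coordinates come back in degrees, not meters. A self-contained sketch of the same idea using the non-deprecated pyproj API (the UTM zone here is an arbitrary assumption):

import pyproj
from shapely.geometry import LineString, Point
from shapely.ops import transform

# always_xy=True keeps (lon, lat) axis order, matching GeoJson coordinates.
project = pyproj.Transformer.from_crs(
    "EPSG:4326", "EPSG:32613", always_xy=True).transform

road = transform(project, LineString([(-105.0, 40.0), (-105.0, 40.01)]))
poi = transform(project, Point((-105.001, 40.005)))
print(f"{road.distance(poi):.1f} meters")  # roughly 85 m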

checkTags

checkTags(extfeat, osm)

Check tags between 2 features.

Parameters:

    extfeat (Feature): The feature from the external dataset. Required.
    osm (Feature): The result of the SQL query. Required.

Returns:

    (int): The number of tag matches
    (dict): The updated tags

Source code in osm_merge/conflator.py
def checkTags(self,
              extfeat: Feature,
              osm: Feature,
               ):
    """
    Check tags between 2 features.

    Args:
        extfeat (Feature): The feature from the external dataset
        osm (Feature): The result of the SQL query

    Returns:
        (int): The number of tag matches
        (dict): The updated tags
    """
    match_threshold = 85
    match = ["name", "ref", "ref:usfs"]
    hits = 0
    props = dict()
    id = 0
    version = 0
    props = extfeat['properties'] | osm['properties']
    # ODK Collect adds these two tags we don't need.
    if "title" in props:
        del props["title"]
    if "label" in props:
        del props["label"]

    if "id" in props:
        # External data not from an OSM source always has
        # negative IDs to distinguish it from current OSM data.
        id = int(props["id"])
    else:
        id -= 1
        props["id"] = id

    if "version" in props:
        # Always use the OSM version if it exists, since it gets
        # incremented so JOSM see it's been modified.
        props["version"] = int(version)
        # Name may also be name:en, name:np, etc... There may also be
        # multiple name:* values in the tags.
    else:
        props["version"] = 1

    for key in match:
        if "highway" in osm["properties"]:
            # Always use the value in the secondary, which is
            # likely OSM.
            props["highway"] = osm["properties"]["highway"]
        if key not in props:
            continue

        # Usually it's the name field that has the most variety when
        # trying to match strings. This is often due to differences in
        # capitalization, singular vs plural, and typos from using
        # your phone to enter the name. Of course names change too,
        # so if it isn't a match, use the new name from the
        # external dataset.
        if key in osm["properties"] and key in extfeat["properties"]:
            # Sometimes there will be a word match, which returns a
            # ratio in the low 80s. In that case they should be
            # a similar length.
            length = len(extfeat["properties"][key]) - len(osm["properties"][key])
            ratio = fuzz.ratio(extfeat["properties"][key].lower(), osm["properties"][key].lower())
            if ratio > match_threshold and length <= 3:
                hits += 1
                props["ratio"] = ratio
                props[key] = extfeat["properties"][key]
                if ratio != 100:
                    # Often the only difference is using FR or FS as the
                    # prefix. In that case, see if the ref matches.
                    if key[:3] == "ref":
                        # This assumes all the data has been converted
                        # by one of the utility programs, which enforce
                        # using the ref:usfs tag.
                        tmp = extfeat["properties"]["ref:usfs"].split(' ')
                        extref = tmp[1].upper()
                        tmp = osm["properties"]["ref:usfs"].split(' ')
                        newref = tmp[1].upper()
                        # log.debug(f"REFS: {extref} vs {newref}: {extref == newref}")
                        if extref == newref:
                            hits += 1
                            # Many minor changes of FS to FR don't require
                            # caching the existing value as it's only the
                            # prefix that changed. It always stays in this
                            # range.
                            if osm["properties"]["ref:usfs"][:3] == "FS " and ratio > 80 and ratio < 90:
                                log.debug(f"Ignoring old ref {osm['properties']['ref:usfs']}")
                                continue
                                continue
                    # For a fuzzy match, cache the value from the
                    # secondary dataset and use the value in the
                    # primary dataset.
                    props[f"old_{key}"] = osm["properties"][key]

    # print(props)
    return hits, props
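
The length guard exists because fuzz.ratio can score two different refs in the low 80s when they share most characters; a prefix swap like FS vs FR scores high while staying the same length. A small sketch (assuming the thefuzz package, which provides fuzz.ratio):

from thefuzz import fuzz

# Same ref, different agency prefix: high ratio, nearly equal length.
print(fuzz.ratio("fs 125.1", "fr 125.1"))       # high 80s
# Different formatting entirely: much lower ratio.
print(fuzz.ratio("forest road 125", "fr 125"))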

loadFile

loadFile(osmfile)

Read an OSM XML file and convert it to GeoJson for consistency.

Parameters:

    osmfile (str): The OSM XML file to load. Required.

Returns:

    (list): The entries in the OSM XML file

Source code in osm_merge/conflator.py
def loadFile(
    self,
    osmfile: str,
) -> list:
    """
    Read an OSM XML file and convert it to GeoJson for consistency.

    Args:
        osmfile (str): The OSM XML file to load

    Returns:
        (list): The entries in the OSM XML file
    """
    alldata = list()
    size = os.path.getsize(osmfile)
    with open(osmfile, "r") as file:
        xml = file.read(size)
        doc = xmltodict.parse(xml)
        if "osm" not in doc:
            logging.warning("No data in this instance")
            return False
        data = doc["osm"]
        if "node" not in data:
            logging.warning("No nodes in this instance")
            return False

    nodes = dict()
    for node in data["node"]:
        properties = {
            "id": int(node["@id"]),
        }
        if "@version" not in node:
            properties["version"] = 1
        else:
            properties["version"] = node["@version"]

        if "@timestamp" in node:
            properties["timestamp"] = node["@timestamp"]

        if "tag" in node:
            for tag in node["tag"]:
                if type(tag) == dict:
                    # Drop all the TIGER tags based on
                    # https://wiki.openstreetmap.org/wiki/TIGER_fixup
                    if tag["@k"] in properties:
                        if properties[tag["@k"]][:7] == "tiger:":
                            continue
                    properties[tag["@k"]] = tag["@v"].strip()
                    # continue
                else:
                    properties[node["tag"]["@k"]] = node["tag"]["@v"].strip()
                # continue
        geom = Point((float(node["@lon"]), float(node["@lat"])))
        # cache the nodes so we can dereference the refs into
        # coordinates, but we don't need them in GeoJson format.
        nodes[properties["id"]] = geom
        if len(properties) > 2:
            alldata.append(Feature(geometry=geom, properties=properties))

    for way in data["way"]:
        attrs = dict()
        properties = {
            "id": int(way["@id"]),
        }
        refs = list()
        if "nd" in way:
            if len(way["nd"]) > 0:
                for ref in way["nd"]:
                    refs.append(int(ref["@ref"]))
            properties["refs"] = refs

        if "@version" not in node:
            properties["version"] = 1
        else:
            properties["version"] = node["@version"]

        if "@timestamp" in node:
            attrs["timestamp"] = node["@timestamp"]

        if "tag" in way:
            for tag in way["tag"]:
                if type(tag) == dict:
                    properties[tag["@k"]] = tag["@v"].strip()
                    # continue
                else:
                    properties[way["tag"]["@k"]] = way["tag"]["@v"].strip()
                # continue
        # geom =
        tmp = list()
        for ref in refs:
            tmp.append(nodes[ref]['coordinates'])
        geom = LineString(tmp)
        if geom is None:
            breakpoint()
        alldata.append(Feature(geometry=geom, properties=properties))

    return alldata
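
The dict-vs-list check above handles a quirk of xmltodict: an element with a single child <tag> parses to a dict, while repeated <tag> elements parse to a list. A tiny standalone demonstration:

import xmltodict

xml = """<osm>
  <node id="1" lat="40.0" lon="-105.0">
    <tag k="amenity" v="cafe"/>
  </node>
  <node id="2" lat="40.1" lon="-105.1">
    <tag k="amenity" v="pub"/>
    <tag k="name" v="The Example"/>
  </node>
</osm>"""

nodes = xmltodict.parse(xml)["osm"]["node"]
print(isinstance(nodes[0]["tag"], dict))  # True: one tag parses to a dict
print(isinstance(nodes[1]["tag"], list))  # True: several tags parse to a list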

initInputDB async

initInputDB(config=None, dburi=None)

When async, we can't initialize the async database connection, so it has to be done as an extra step.

Parameters:

    dburi (str): The database URI. Default: None
    config (str): The config file from the osm-rawdata project. Default: None

Returns:

    (bool): Whether it initialized

Source code in osm_merge/conflator.py
async def initInputDB(self,
                    config: str = None,
                    dburi: str = None,
                    ) -> bool:
    """
    When async, we can't initialize the async database connection,
    so it has to be done as an extrat step.

    Args:
        dburi (str, optional): The database URI
        config (str, optional): The config file from the osm-rawdata project
    Returns:
        (bool): Whether it initialiized
    """
    db = GeoSupport(dburi, config)
    await db.initialize()
    self.postgres.append(db)

    return True
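
Since __init__ can't await, the expected pattern is to construct the object first and then await the initializer; a sketch with placeholder arguments:

import asyncio

async def main():
    conflator = Conflator()
    # Both values are placeholders for a real osm-rawdata config
    # file and database URI.
    await conflator.initInputDB(config="highways.yaml",
                                dburi="localhost/underpass")

asyncio.run(main())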

initOutputDB async

initOutputDB(dburi=None)

When async, we can't initialize the async database connection, so it has to be done as an extra step.

Parameters:

    dburi (str): The database URI. Default: None
    config (str): The config file from the osm-rawdata project. Default: None
Source code in osm_merge/conflator.py
async def initOutputDB(self,
                    dburi: str = None,
                    config: str = None,
                    ):
    """
    When async, we can't initialize the async database connection,
    so it has to be done as an extra step.

    Args:
        dburi (str, optional): The database URI
        config (str, optional): The config file from the osm-rawdata project
    """
    if dburi:
        self.dburi = dburi
        await self.createDBThreads(dburi, config)
    elif self.dburi:
        await self.createDBThreads(self.dburi, config)

createDBThreads async

createDBThreads(uri=None, config=None, execs=cores)

Create threads for writing to the primary database to avoid problems with corrupting data.

Parameters:

    uri (str): URI for the primary database. Default: None
    config (str): The config file from the osm-rawdata project. Default: None
    execs (int): The number of threads to create. Default: cores

Returns:

    (bool): Whether the threads were created successfully

Source code in osm_merge/conflator.py
async def createDBThreads(self,
                    uri: str = None,
                    config: str = None,
                    execs: int = cores,
                    ) -> bool:
    """
    Create threads for writting to the primary datatbase to avoid
    problems with corrupting data.

    Args:
        uri (str): URI for the primary database
        config (str, optional): The config file from the osm-rawdata project
        threads (int, optional): The number of threads to create

    Returns:
        (bool): Whether the threads were created sucessfully
    """
    # Each thread needs it's own connection to postgres to avoid problems
    # when inserting or updating the primary database.
    if uri:
        for thread in range(0, execs + 1):
            db = GeoSupport(uri)
            await db.initialize(uri, config)
            if not db:
                return False
            self.postgres.append(db)
        if self.boundary:
            if 'features' in self.boundary:
                poly = self.boundary["features"][0]["geometry"]
            else:
                poly = shape(self.boundary['geometry'])

            # FIXME: we only need to clip once to create the view, this is not
            # confirmed yet.
            await db.clipDB(poly, self.postgres[0])

        return True

    return False

conflateData async

conflateData(odkspec, osmspec, threshold=3.0)

Open the two source files and conflate them.

Parameters:

    odkspec (str): The external data uri. Required.
    osmspec (str): The existing OSM data uri. Required.
    threshold (float): Threshold for distance calculations in meters. Default: 3.0

Returns:

    (list): The conflated output

Source code in osm_merge/conflator.py
async def conflateData(self,
                odkspec: str,
                osmspec: str,
                threshold: float = 3.0,
                ) -> list:
    """
    Open the two source files and conflate them.

    Args:
        odkspec (str): The external data uri
        osmspec (str): The existing OSM data uri
        threshold (float): Threshold for distance calculations in meters

    Returns:
        (list):  The conflated output
    """
    timer = Timer(text="conflateData() took {seconds:.0f}s")
    timer.start()
    odkdata = list()
    osmdata = list()

    result = list()
    if odkspec[:3].lower() == "pg:":
        db = GeoSupport(odkspec[3:])
        result = await db.queryDB()
    else:
        odkdata = self.parseFile(odkspec)

    if osmspec[:3].lower() == "pg:":
        db = GeoSupport(osmspec[3:])
        result = await db.queryDB()
    else:
        osmdata = self.parseFile(osmspec)

    entries = len(odkdata)
    chunk = round(entries / cores) or 1

    alldata = list()
    tasks = list()

    log.info(f"The primary dataset has {len(odkdata)} entries")
    log.info(f"The secondary dataset has {len(osmdata)} entries")

    # Make threading optional for easier debugging
    single = False

    if single:
        alldata = conflateThread(odkdata, osmdata)
    else:
        futures = list()
        with concurrent.futures.ProcessPoolExecutor(max_workers=cores) as executor:
            for block in range(0, entries, chunk):
                future = executor.submit(conflateThread,
                        odkdata[block:block + chunk],
                        osmdata
                        )
                futures.append(future)
            #for thread in concurrent.futures.wait(futures, return_when='ALL_COMPLETED'):
            for future in concurrent.futures.as_completed(futures):
                log.debug(f"Waiting for thread to complete..")
                alldata += future.result()

        executor.shutdown()

    timer.stop()

    return alldata
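
The fan-out pattern above is worth seeing in isolation: slice the primary list into per-core chunks, submit each slice to a process pool, and merge results as they complete. A generic, runnable sketch (the work function is a stand-in for conflateThread):

import concurrent.futures
import os

def work(chunk):
    # Stand-in for conflateThread(): process one slice independently.
    return [x * x for x in chunk]

if __name__ == "__main__":
    data = list(range(100))
    cores = os.cpu_count()
    chunk = round(len(data) / cores) or 1

    alldata = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=cores) as executor:
        futures = [executor.submit(work, data[block:block + chunk])
                   for block in range(0, len(data), chunk)]
        for future in concurrent.futures.as_completed(futures):
            alldata += future.result()
    print(len(alldata))  # 100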

dump

dump()

Dump internal data for debugging.

Source code in osm_merge/conflator.py
def dump(self):
    """
    Dump internal data for debugging.
    """
    print(f"Data source is: {self.dburi}")
    print(f"There are {len(self.data)} existing features")

parseFile

parseFile(filespec)

Parse the input file based on its format.

Parameters:

    filespec (str): The file to parse. Required.

Returns:

    (list): The parsed data from the file

Source code in osm_merge/conflator.py
def parseFile(self,
            filespec: str,
            ) -> list:
    """
    Parse the input file based on its format.

    Args:
        filespec (str): The file to parse

    Returns:
        (list): The parsed data from the file
    """
    odkpath = Path(filespec)
    odkdata = list()
    if odkpath.suffix == '.geojson':
        # FIXME: This should also work for any GeoJson file, not
        # only ODK ones, but this has yet to be tested.
        log.debug(f"Parsing GeoJson files {odkpath}")
        odkfile = open(odkpath, 'r')
        features = geojson.load(odkfile)
        odkdata = features['features']
    elif odkpath.suffix == '.osm':
        log.debug(f"Parsing OSM XML files {odkpath}")
        osmfile = OsmFile()
        odkdata = self.loadFile(odkpath)
    elif odkpath.suffix == ".csv":
        log.debug(f"Parsing csv files {odkpath}")
        odk = ODKParsers()
        for entry in odk.CSVparser(odkpath):
            odkdata.append(odk.createEntry(entry))
    elif odkpath.suffix == ".json":
        log.debug(f"Parsing json files {odkpath}")
        odk = ODKParsers()
        for entry in odk.JSONparser(odkpath):
            odkdata.append(odk.createEntry(entry))
    return odkdata

conflateDB

conflateDB(source)

Conflate all the data. This is the primary interface for conflation.

Parameters:

    source (str): The source file to conflate. Required.

Returns:

    (dict): The conflated features

Source code in osm_merge/conflator.py
def conflateDB(self,
                 source: str,
                 ) -> dict:
    """
    Conflate all the data. This the primary interfacte for conflation.

    Args:
        source (str): The source file to conflate

    Returns:
        (dict):  The conflated features
    """
    timer = Timer(text="conflateData() took {seconds:.0f}s")
    timer.start()

    log.info("Opening data file: %s" % source)
    toplevel = Path(source)
    if toplevel.suffix == ".geosjon":
        src = open(source, "r")
        self.data = geojson.load(src)
    elif toplevel.suffix == ".osm":
        src = open(source, "r")
        osmin = OsmFile()
        self.data = osmin.loadFile(source) # input file
        if self.boundary:
            gs = GeoSupport(source)
            # self.data = gs.clipFile(self.data)

    # Use fuzzy string matching to handle minor issues in the name column,
    # which is often used to match an amenity.
    if len(self.data) == 0:
        self.postgres[0].query("CREATE EXTENSION IF NOT EXISTS fuzzystrmatch")
    # log.debug(f"OdkMerge::conflateData() called! {len(odkdata)} features")

    # A chunk is a group of threads
    chunk = round(len(self.data) / cores)

    # cycle = range(0, len(odkdata), chunk)

    # Chop the data into a subset for each thread
    newdata = list()
    future = None
    result = None
    index = 0
    if True:                # DEBUGGING HACK ALERT!
        result = conflateThread(self.data, self, index)
        return dict()

    with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
        i = 0
        subset = dict()
        futures = list()
        for key, value in self.data.items():
            subset[key] = value
            if i == chunk:
                i = 0
                result = executor.submit(conflateThread, subset, self, index)
                index += 1
                # result.add_done_callback(callback)
                futures.append(result)
                subset = dict()
            i += 1
        for future in concurrent.futures.as_completed(futures):
        # # for future in concurrent.futures.wait(futures, return_when='ALL_COMPLETED'):
            log.debug(f"Waiting for thread to complete..")
            # print(f"YYEESS!! {future.result(timeout=10)}")
            newdata.append(future.result(timeout=5))
    timer.stop()
    return newdata

writeOSM

writeOSM(data, filespec)

Write the data to an OSM XML file.

Parameters:

    data (list): The list of GeoJson features. Required.
    filespec (str): The output file name. Required.
Source code in osm_merge/conflator.py
def writeOSM(self,
             data: list,
             filespec: str,
             ):
    """
    Write the data to an OSM XML file.

    Args:
        data (list): The list of GeoJson features
        filespec (str): The output file name
    """
    osm = OsmFile(filespec)
    negid = -100
    id = -1
    out = str()
    for entry in data:
        version = 1
        tags = entry["properties"]
        if "osm_id" in tags:
            id = tags["osm_id"]
        elif "id" in tags:
            id = tags["id"]
        elif "id" not in tags:
            # There is no id or version for non OSM features
            id -= 1
        if "version" in entry["properties"]:
            version = int(entry["properties"]["version"])
            version += 1
        attrs = {"id": id, "version": version}
        # These are OSM attributes, not tags
        if "id" in tags:
            del tags["id"]
        if "version" in tags:
            del tags["version"]
        item = {"attrs": attrs, "tags": tags}
        out = str()
        if 'refs' in tags and "lat" not in attrs:
            # OSM ways don't have a geometry, just references to node IDs.
            # The OSM XML file won't have any nodes, so at first it won't
            # display in JOSM until you do a File->"Update modified".
            if len(tags['refs']) > 0:
                if type(tags["refs"]) != list:
                    item["refs"] = eval(tags["refs"])
                else:
                    item["refs"] = tags["refs"]
                del tags["refs"]
                out = osm.createWay(item, True)

#        elif "geometry" in entry and entry["geometry"] is not None:
#            out = osm.createNode(item, True)
        if len(out) > 0:
            osm.write(out)

writeGeoJson

writeGeoJson(data, filespec)

Write the data to a GeoJson file.

Parameters:

    data (dict): The list of GeoJson features. Required.
    filespec (str): The output file name. Required.
Source code in osm_merge/conflator.py
def writeGeoJson(self,
             data: dict,
             filespec: str,
             ):
    """
    Write the data to a GeoJson file.

    Args:
        data (dict): The list of GeoJson features
        filespec (str): The output file name
    """
    file = open(filespec, "w")
    fc = FeatureCollection(data)
    geojson.dump(fc, file, indent=4)

osmToFeature

osmToFeature(osm)

Convert an entry from an OSM XML file with attrs and tags into a GeoJson Feature.

Parameters:

    osm (dict): The OSM entry. Required.

Returns:

    (Feature): A GeoJson feature

Source code in osm_merge/conflator.py
def osmToFeature(self,
                 osm: dict,
                 ) -> Feature:
    """
    Convert an entry from an OSM XML file with attrs and tags into
    a GeoJson Feature.

    Args:
        osm (dict): The OSM entry

    Returns:
        (Feature): A GeoJson feature
    """
    if "attrs" not in osm:
        return Feature(geometry=shape(osm["geometry"]), properties=osm["properties"])

    if "osm_id" in osm["attrs"]:
        id = osm["attrs"]["osm_id"]
    elif "id" in osm["attrs"]:
        id = osm["attrs"]["id"]
    props = {"id": id}
    if "version" in osm["attrs"]:
        props["version"] = osm["attrs"]["version"]

    props.update(osm["tags"])
    # It's a way, so no coordinate
    if "refs" in osm:
        return Feature(properties=props)
    else:
        geom = Point((float(osm["attrs"]["lon"]), float(osm["attrs"]["lat"])))

        return Feature(geometry=geom, properties=props)
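
A quick sketch of the node case: an OSM-style dict of attrs and tags becomes a GeoJson point Feature (using the same geojson package as above):

from geojson import Feature, Point

osm = {
    "attrs": {"id": 1234, "version": 2, "lat": "40.0", "lon": "-105.0"},
    "tags": {"amenity": "cafe"},
}
props = {"id": osm["attrs"]["id"], "version": osm["attrs"]["version"]}
props.update(osm["tags"])
geom = Point((float(osm["attrs"]["lon"]), float(osm["attrs"]["lat"])))
print(Feature(geometry=geom, properties=props))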


conflateBuildings.py

Bases: object

Parameters:

    dburi (str): The DB URI. Default: None
    boundary (Polygon): The AOI of the project. Default: None

Returns:

    (ConflateDB): An instance of this object

Source code in osm_merge/conflateBuildings.py
def __init__(
    self,
    dburi: str = None,
    boundary: Polygon = None,
):
    """This class conflates data that has been imported into a postgres
    database using the Underpass raw data schema.

    Args:
        dburi (str): The DB URI
        boundary (Polygon): The AOI of the project

    Returns:
        (ConflateDB): An instance of this object
    """
    self.postgres = list()
    self.uri = None
    if dburi:
        self.uri = uriParser(dburi)
        self.db = GeoSupport(dburi)
    self.boundary = boundary
    self.view = "ways_poly"
    self.filter = list()

addSourceFilter

addSourceFilter(source)

Add to a list of suspect bad source datasets

Source code in osm_merge/conflateBuildings.py
def addSourceFilter(
    self,
    source: str,
):
    """Add to a list of suspect bad source datasets"""
    self.filter.append(source)

overlapDB

overlapDB(dburi)

Conflate buildings where all the data is in the same postgres database using the Underpass raw data schema.

Parameters:

    dburi (str): The URI for the existing OSM data. Required.

This is not fast for large areas!

Source code in osm_merge/conflateBuildings.py
def overlapDB(
    self,
    dburi: str,
):
    """Conflate buildings where all the data is in the same postgres database
    using the Underpass raw data schema.

    Args:
        dburi (str): The URI for the existing OSM data

    This is not fast for large areas!
    """
    timer = Timer(text="conflateData() took {seconds:.0f}s")
    timer.start()
    # Find duplicate buildings in the same database
    # sql = f"DROP VIEW IF EXISTS overlap_view;CREATE VIEW overlap_view AS SELECT ST_Area(ST_INTERSECTION(g1.geom::geography, g2.geom::geography)) AS area,g1.osm_id AS id1,g1.geom as geom1,g2.osm_id AS id2,g2.geom as geom2 FROM {self.view} AS g1, {self.view} AS g2 WHERE ST_OVERLAPS(g1.geom, g2.geom) AND (g1.tags->>'building' IS NOT NULL AND g2.tags->>'building' IS NOT NULL)"
    # sql = "SELECT * FROM (SELECT ways_view.id, tags, ROW_NUMBER() OVER(PARTITION BY geom ORDER BY ways_view.geom asc) AS Row, geom FROM ONLY ways_view) dups WHERE dups.Row > 1"
    # Make a new postgres VIEW of all overlapping or touching buildings
    # log.info(f"Looking for overlapping buildings in \"{self.uri['dbname']}\", this make take awhile...")
    # print(sql)
    # Views must be dropped in the right order
    sql = (
        "DROP TABLE IF EXISTS dups_view CASCADE; DROP TABLE IF EXISTS osm_view CASCADE;DROP TABLE IF EXISTS ways_view CASCADE;"
    )
    result = self.db.queryDB(sql)

    if self.boundary:
        self.db.clipDB(self.boundary)

    log.debug("Clipping OSM database")
    ewkt = shape(self.boundary)
    uri = uriParser(dburi)
    log.debug(f"Extracting OSM subset from \"{uri['dbname']}\"")
    sql = f"CREATE TABLE osm_view AS SELECT osm_id,tags,geom FROM dblink('dbname={uri['dbname']}', 'SELECT osm_id,tags,geom FROM ways_poly') AS t1(osm_id int, tags jsonb, geom geometry) WHERE ST_CONTAINS(ST_GeomFromEWKT('SRID=4326;{ewkt}'), geom) AND tags->>'building' IS NOT NULL"
    # print(sql)
    result = self.db.queryDB(sql)

    sql = "CREATE TABLE dups_view AS SELECT ST_Area(ST_INTERSECTION(g1.geom::geography, g2.geom::geography)) AS area,g1.osm_id AS id1,g1.geom as geom1,g1.tags AS tags1,g2.osm_id AS id2,g2.geom as geom2, g2.tags AS tags2 FROM ways_view AS g1, osm_view AS g2 WHERE ST_INTERSECTS(g1.geom, g2.geom) AND g2.tags->>'building' IS NOT NULL"
    print(sql)
    result = self.db.queryDB(sql)

cleanDuplicates

cleanDuplicates()

Delete the entries from the duplicate building view.

Returns:

    (bool): True if the duplicates were removed

Source code in osm_merge/conflateBuildings.py
def cleanDuplicates(self):
    """Delete the entries from the duplicate building view.

    Returns:
        (FeatureCollection): The entries from the datbase table
    """
    log.debug("Removing duplicate buildings from ways_view")
    sql = "DELETE FROM ways_view WHERE osm_id IN (SELECT id1 FROM dups_view)"

    result = self.db.queryDB(sql)
    return True

getNew

getNew()

Get only the new buildings

Returns:

    (FeatureCollection): The entries from the database table

Source code in osm_merge/conflateBuildings.py
def getNew(self):
    """Get only the new buildings

    Returns:
        (FeatureCollection): The entries from the datbase table
    """
    sql = "SELECT osm_id,geom,tags FROM ways_view"
    result = self.db.queryDB(sql)
    features = list()
    for item in result:
        # log.debug(item)
        entry = {"osm_id": item[0]}
        entry.update(item[2])
        geom = wkb.loads(item[1])
        features.append(Feature(geometry=geom, properties=entry))

    log.debug(f"{len(features)} new features found")
    return FeatureCollection(features)

findHighway

findHighway(feature)

Find the nearest highway to a feature

Parameters:

    feature (Feature): The feature to check against. Required.
Source code in osm_merge/conflateBuildings.py
def findHighway(
    self,
    feature: Feature,
):
    """Find the nearest highway to a feature

    Args:
        feature (Feature): The feature to check against
    """
    pass

getDuplicates

getDuplicates()

Get the entries from the duplicate building view.

Returns:

    (FeatureCollection): The entries from the database table

Source code in osm_merge/conflateBuildings.py
def getDuplicates(self):
    """Get the entries from the duplicate building view.

    Returns:
        (FeatureCollection): The entries from the datbase table
    """
    sql = "SELECT area,id1,geom1,tags1,id2,geom2,tags2 FROM dups_view"
    result = self.db.queryDB(sql)
    features = list()
    for item in result:
        # log.debug(item)
        # First building identified
        entry = {"area": float(item[0]), "id": int(item[1])}
        geom = wkb.loads(item[2])
        entry.update(item[3])
        features.append(Feature(geometry=geom, properties=entry))

        # Second building identified
        entry = {"area": float(item[0]), "id": int(item[4])}
        geom = wkb.loads(item[5])
        entry.update(item[6])
        # FIXME: Merge the tags from the buildings into the OSM feature
        # entry.update(item[3])
        features.append(Feature(geometry=geom, properties=entry))

    log.debug(f"{len(features)} duplicate features found")
    return FeatureCollection(features)


conflatePOI.py

Bases: object

Parameters:

    dburi (str): The DB URI. Default: None
    boundary (Polygon): The AOI of the project. Default: None
    threshold (int): The distance in meters for distance calculations. Default: 7

Returns:

    (ConflatePOI): An instance of this object

Source code in osm_merge/conflatePOI.py
def __init__(self,
             dburi: str = None,
             boundary: Polygon = None,
             threshold: int = 7,
             ):
    """
    This class conflates data that has been imported into a postgres
    database using the Underpass raw data schema.

    Args:
        dburi (str): The DB URI
        boundary (Polygon): The AOI of the project
        threshold (int): The distance in meters for distance calculations

    Returns:
        (ConflatePOI): An instance of this object
    """
    self.data = dict()
    self.db = None
    self.tolerance = threshold # Distance in meters for conflating with postgis
    self.boundary = boundary
    # Use a common select so it's consistent when parsing results
    self.select = "SELECT osm_id,tags,version,ST_AsText(geom),ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;%s\'))"
    if dburi:
        # for thread in range(0, cores + 1):
        self.db = GeoSupport(dburi)
        # self.db.append(db)
        # We only need to clip the database into a new table once
        if boundary:
            self.db.clipDB(boundary, self.db.db)
            self.db.clipDB(boundary, self.db.db, "nodes_view", "nodes")

overlaps

overlaps(feature)

Conflate a POI against all the features in a GeoJson file

Parameters:

    feature (dict): The feature to conflate. Required.

Returns:

    (dict): The modified feature

Source code in osm_merge/conflatePOI.py
def overlaps(self,
            feature: dict,
            ):
    """
    Conflate a POI against all the features in a GeoJson file

    Args:
        feature (dict): The feature to conflate

    Returns:
        (dict):  The modified feature
    """
    # Most smartphone GPS are 5-10m off most of the time, plus sometimes
    # we're standing in front of an amenity and recording that location
    # instead of in the building.
    gps_accuracy = 10
    # this is the threshold for fuzzy string matching
    match_threshold = 80
    # log.debug(f"conflateFile({feature})")
    hits = False
    data = dict()
    geom = Point((float(feature["attrs"]["lon"]), float(feature["attrs"]["lat"])))
    wkt = shape(geom)
    for existing in self.data['features']:
        id = int(existing['properties']['id'])
        entry = shapely.from_geojson(str(existing))
        if entry.geom_type != 'Point':
            center = shapely.centroid(entry)
        else:
            center = entry
            # dist = shapely.hausdorff_distance(center, wkt)
            # if 'name' in existing['properties']:
            #     print(f"DIST1: {dist}, {existing['properties']['name']}")
        # x = shapely.distance(wkt, entry)
        # haversine reverses the order of lat & lon from what shapely uses. We
        # use it because meters are easier to deal with than cartesian coordinates.
        x1 = (center.coords[0][1], center.coords[0][0])
        x2 = (wkt.coords[0][1], wkt.coords[0][0])
        dist = haversine(x1, x2, unit=Unit.METERS)
        if dist < gps_accuracy:
            # if 'name' in existing['properties']:
            # log.debug(f"DIST2: {dist}")
            # log.debug(f"Got a Hit! {feature['tags']['name']}")
            for key,value in feature['tags'].items():
                if key in self.analyze:
                    if key in existing['properties']:
                        result = fuzz.ratio(value, existing['properties'][key])
                        if result > match_threshold:
                            # log.debug(f"Matched: {result}: {feature['tags']['name']}")
                            existing['properties']['fixme'] = "Probably a duplicate!"
                            log.debug(f"Got a dup in file!!! {existing['properties']['name'] }")
                            hits = True
                            break
        if hits:
            version = int(existing['properties']['version'])
            # coords = feature['geometry']['coordinates']
            # lat = coords[1]
            # lon = coords[0]
            attrs = {'id': id, 'version': version, 'lat': feature['attrs']['lat'], 'lon': feature['attrs']['lon']}
            tags = existing['properties']
            tags['fixme'] = "Probably a duplicate!"
            # Data extracts for ODK Collect
            del tags['title']
            del tags['label']
            if 'building' in tags:
                return {'attrs': attrs, 'tags': tags, 'refs': list()}
            return {'attrs': attrs, 'tags': tags}
    return dict()
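
The lat/lon ordering note in the code is easy to trip over: shapely coordinates are (lon, lat) while the haversine package expects (lat, lon). A minimal check (assuming the haversine package):

from haversine import haversine, Unit
from shapely.geometry import Point

a = Point((-105.0, 40.0))     # shapely stores (lon, lat)
b = Point((-105.0, 40.0001))

# Swap to (lat, lon) before calling haversine.
dist = haversine((a.y, a.x), (b.y, b.x), unit=Unit.METERS)
print(f"{dist:.1f} meters")   # about 11 m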

queryToFeature

queryToFeature(results)

Convert the results of an SQL query to GeoJson Features

Parameters:

    results (list): The results of the query. Required.

Returns:

    (list): A list of the features from the results

Source code in osm_merge/conflatePOI.py
def queryToFeature(self,
                   results: list,
                   ):
    """
    Convert the results of an SQL to a GeoJson Feature

    Args:
        results (list): The results of the query

    Returns:
        (list): a list of the features fromn the results
    """

    features = list()
    for entry in results:
        osm_id = int(entry[0])
        tags = entry[1]
        version = int(entry[2])
        coords = shapely.from_wkt(entry[3])
        dist = entry[4]
        # ways have an additional column
        if len(entry) == 6:
            refs = entry[5]
        else:
            refs = list()
        if coords.geom_type == 'Polygon':
            center = shapely.centroid(coords)
            lat = center.y
            lon = center.x
            tags['geom_type'] = 'way'
        elif coords.geom_type == "Point":
            lat = coords.y
            lon = coords.x
            tags['geom_type'] = 'node'
        else:
            log.error(f"Unsupported geometry type: {coords.geom_type}")
            continue
        # match = entry[5] # FIXME: for debugging
        # the timestamp attribute gets added when it's uploaded to OSM.
        attrs = {'id': osm_id,
                'version': version,
                'lat': lat,
                'lon': lon,
                }
        tags['dist'] = dist
        # tags['match'] = match # FIXME: for debugging
        # tags['fixme'] = "Probably a duplicate node!"
        features.append({'attrs': attrs, 'tags': tags, 'refs': refs})

    return features
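
Collapsing a way's footprint to a node location via its centroid, as done for Polygon results above, looks like this with shapely 2.x:

import shapely

building = shapely.from_wkt(
    "POLYGON((-105.001 40.000, -105.000 40.000, -105.000 40.001,"
    " -105.001 40.001, -105.001 40.000))")
center = shapely.centroid(building)
print(center.x, center.y)  # lon, lat used for the node's attrs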

checkTags

checkTags(feature, osm)

Check tags between 2 features.

Parameters:

    feature (Feature): The feature from the external dataset. Required.
    osm (dict): The result of the SQL query. Required.

Returns:

    (int): The number of tag matches
    (dict): The updated tags

Source code in osm_merge/conflatePOI.py
def checkTags(self,
              feature: Feature,
              osm: dict,
              ):
    """
    Check tags between 2 features.

    Args:
        feature (Feature): The feature from the external dataset
        osm (dict): The result of the SQL query

    Returns:
        (int): The number of tag matches
        (dict): The updated tags
    """
    tags = osm['tags']
    hits = 0
    match_threshold = 80
    if osm['tags']['dist'] > float(self.tolerance):
        return 0, osm['tags']
    for key, value in feature['tags'].items():
        if key in tags:
            ratio = fuzz.ratio(value, tags[key])
            if ratio > match_threshold:
                hits += 1
            else:
                if key != 'note':
                    tags[f'old_{key}'] = value
        tags[key] = value

    return hits, tags

conflateData

conflateData(data, threshold=7)

Conflate all the data. This is the primary interface for conflation.

Parameters:

    data (list): A list of all the entries in the OSM XML input file. Required.
    threshold (int): The threshold for distance calculations. Default: 7

Returns:

    (dict): The modified features

Source code in osm_merge/conflatePOI.py
def conflateData(self,
                 data: list,
                 threshold: int = 7,
                 ):
    """
    Conflate all the data. This is the primary interface for conflation.

    Args:
        data (list): A list of all the entries in the OSM XML input file
        threshold (int): The threshold for distance calculations

    Returns:
        (dict):  The modified features
    """
    timer = Timer(text="conflateData() took {seconds:.0f}s")
    timer.start()
    # Use fuzzy string matching to handle minor issues in the name column,
    # which is often used to match an amenity.
    if len(self.data) == 0:
        self.db.queryDB("CREATE EXTENSION IF NOT EXISTS fuzzystrmatch")
    log.debug(f"conflateData() called! {len(data)} features")

    # A chunk is a group of threads
    entries = len(data)
    chunk = round(len(data) / cores)

    if True: # FIXME: entries <= chunk:
        result = conflateThread(data, self)
        timer.stop()
        return result

    # Chop the data into a subset for each thread
    newdata = list()
    future = None
    result = None
    index = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
        i = 0
        subset = dict()
        futures = list()
        for key, value in data.items():
            subset[key] = value
            if i == chunk:
                i = 0
                result = executor.submit(conflateThread, subset, self)
                index += 1
                # result.add_done_callback(callback)
                futures.append(result)
                subset = dict()
            i += 1
        for future in concurrent.futures.as_completed(futures):
            log.debug(f"Waiting for thread to complete..")
            # print(f"YYEESS!! {future.result(timeout=10)}")
            newdata.append(future.result(timeout=5))
    timer.stop()
    return newdata

queryWays

queryWays(feature, db=None)

Conflate a POI against all the ways in a postgres view

Parameters:

    feature (Feature): The feature to conflate. Required.
    db (GeoSupport): The database connection to use. Default: None

Returns:

    (list): The data with tags added from the conflation

Source code in osm_merge/conflatePOI.py
def queryWays(self,
                feature: Feature,
                db: GeoSupport = None,
                ):
    """
    Conflate a POI against all the ways in a postgres view

    Args:
        feature (Feature): The feature to conflate
        db (GeoSupport): The database connection to use

    Returns:
        (list): The data with tags added from the conflation
    """
    # log.debug(f"conflateWay({feature})")
    hits = 0
    result = list()
    geom = Point((float(feature["attrs"]["lon"]), float(feature["attrs"]["lat"])))
    wkt = shape(geom)

    # cleanval = escape(value)
    # Get all ways close to this feature.
#        query = f"SELECT osm_id,tags,version,ST_AsText(ST_Centroid(geom)),ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\')) FROM ways_view WHERE ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\')) < {self.tolerance} ORDER BY ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\'))"
    query = f"{self.select}" % wkt.wkt
    query += f", refs FROM ways_view WHERE ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\')) < {self.tolerance} ORDER BY ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\'))"
    #log.debug(query)
    result = list()
    if db:
        result = db.queryDB(query)
    else:
        result = self.db.queryDB(query)
    if len(result) > 0:
        hits += 1
    else:
        log.warning(f"No results at all for {query}")

    return result

queryNodes

queryNodes(feature, db=None)

Find all the nodes in the view within a certain distance that are buildings or amenities.

Parameters:

    feature (Feature): The feature to use as the location. Required.
    db (GeoSupport): The database connection to use. Default: None

Returns:

    (list): The results of the conflation

Source code in osm_merge/conflatePOI.py
def queryNodes(self,
                 feature: Feature,
                 db: GeoSupport = None,
                 ):
    """
    Find all the nodes in the view within a certain distance that
    are buildings or amenities.

    Args:
        feature (Feature): The feature to use as the location
        db (GeoSupport): The database connection to use

    Returns:
        (list): The results of the conflation
    """
    # log.debug(f"queryNodes({feature})")
    hits = 0
    geom = Point((float(feature["attrs"]["lon"]), float(feature["attrs"]["lat"])))
    wkt = shape(geom)
    result = list()
    ratio = 1

    # for key,value in feature['tags'].items():
    # print(f"NODE: {key} = {value}")
    # if key not in self.analyze:
    #     continue

    # Use a Geography data type to get the answer in meters, which
    # is easier to deal with than degrees of the earth.
    # cleanval = escape(value)
    # query = f"SELECT osm_id,tags,version,ST_AsEWKT(geom),ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\')),levenshtein(tags->>'{key}', '{cleanval}') FROM nodes_view WHERE ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\')) < {self.tolerance} AND levenshtein(tags->>'{key}', '{cleanval}') <= {ratio}"
    # AND (tags->>'amenity' IS NOT NULL OR tags->>'shop' IS NOT NULL)"
    query = f"{self.select}" % wkt.wkt
    query += f" FROM nodes_view WHERE ST_Distance(geom::geography, ST_GeogFromText(\'SRID=4326;{wkt.wkt}\')) < {self.tolerance} AND (tags->>'amenity' IS NOT NULL OR tags->>'building' IS NOT NULL)"
    #log.debug(query)
    # FIXME: this currently only works with a local database,
    # not underpass yet
    if db:
        result = db.queryDB(query)
    else:
        result = self.db.queryDB(query)
    # log.debug(f"Got {len(result)} results")
    if len(result) > 0:
        hits += 1
        # break
    # else:
    #     log.warning(f"No results at all for {query}")

    return result


geosupport.py

Bases: object

Parameters:

    dburi (str): The database URI. Default: None
    config (str): The config file from the osm-rawdata project. Default: None

Returns:

    (GeoSupport): An instance of this object

Source code in osm_merge/geosupport.py
def __init__(self,
             dburi: str = None,
             config: str = None,
             ):
    """
    This class conflates data that has been imported into a postgres
    database using the Underpass raw data schema.

    Args:
        dburi (str, optional): The database URI
        config (str, optional): The config file from the osm-rawdata project

    Returns:
        (GeoSupport): An instance of this object
    """
    self.db = None
    self.dburi = dburi
    self.config = config

importDataset async

importDataset(filespec)

Import a GeoJson file into a postgres database for conflation.

Parameters:

    filespec (str): The GeoJson file to import. Required.

Returns:

    (bool): If the import was successful

Source code in osm_merge/geosupport.py
async def importDataset(self,
                 filespec: str,
                 ) -> bool:
    """
    Import a GeoJson file into a postgres database for conflation.

    Args:
        filespec (str): The GeoJson file to import

    Returns:
        (bool): If the import was successful
    """
    file = open(filespec, "r")
    data = geojson.load(file)

    # Create the tables
    sql = "CREATE EXTENSION postgis;"
    result = await self.db.execute(sql)
    sql = f"DROP TABLE IF EXISTS public.nodes CASCADE; CREATE TABLE public.nodes (osm_id bigint, geom geometry, tags jsonb);"
    result = await self.db.execute(sql)
    sql = f"DROP TABLE IF EXISTS public.ways_line CASCADE; CREATE TABLE public.ways_line (osm_id bigint, geom geometry, tags jsonb);"
    result = await self.db.execute(sql)
    sql = f"DROP TABLE IF EXISTS public.poly CASCADE; CREATE TABLE public.ways_poly (osm_id bigint, geom geometry, tags jsonb);"
    result = await self.db.execute(sql)

    # if self.db.is_closed():
    #     return False

    table = self.dburi.split('/')[1]
    for entry in data["features"]:
        keys = "geom, "
        geometry = shape(entry["geometry"])
        ewkt = geometry.wkt
        if geometry.geom_type == "LineString":
            table = "ways_line"
        if geometry.geom_type == "Polygon":
            table = "ways_poly"
        if geometry.geom_type == "Point":
            table = "nodes"
        tags = f"\'{{"
        for key, value in entry["properties"].items():
            tags += f"\"{key}\": \"{value}\", "
        tags = tags[:-2]
        tags += "}\'::jsonb)"
        sql = f"INSERT INTO {table} (geom, tags) VALUES(ST_GeomFromEWKT(\'SRID=4326;{ewkt}\'), {tags}"
        result = await self.db.pg.execute(sql)

    return True

initialize async

initialize(dburi=None, config=None)

When async, we can't initialize the async database connection, so it has to be done as an extra step.

Parameters:

    dburi (str): The database URI. Default: None
    config (str): The config file from the osm-rawdata project. Default: None
Source code in osm_merge/geosupport.py
async def initialize(self,
                    dburi: str = None,
                    config: str = None,
                    ):
    """
    When async, we can't initialize the async database connection,
    so it has to be done as an extrat step.

    Args:
        dburi (str, optional): The database URI
        config (str, optional): The config file from the osm-rawdata project
    """
    if dburi:
        self.db = PostgresClient()
        await self.db.connect(dburi)
    elif self.dburi:
        self.db = PostgresClient()
        await self.db.connect(self.dburi)

    if config:
        await self.db.loadConfig(config)
    elif self.config:
        await self.db.loadConfig(config)

clipDB async

clipDB(boundary, db=None, view='ways_view')

Clip a database table by a boundary

Parameters:

    boundary (Polygon): The AOI of the project. Required.
    db (PostgresClient): A reference to the existing database connection. Default: None
    view (str): The name of the new view. Default: 'ways_view'

Returns:

    (bool): If the region was clipped successfully

Source code in osm_merge/geosupport.py
async def clipDB(self,
         boundary: Polygon,
         db: PostgresClient = None,
         view: str = "ways_view",
         ):
    """
    Clip a database table by a boundary

    Args:
        boundary (Polygon): The AOI of the project
        db (PostgresClient): A reference to the existing database connection
        view (str): The name of the new view

    Returns:
        (bool): If the region was clipped successfully
    """
    remove = list()
    if not boundary:
        return False

    ewkt = shape(boundary)

    # Create a new postgres view
    # FIXME: this should be a temp view in the future, this is to make
    # debugging easier.
    sql = f"DROP VIEW IF EXISTS {view} CASCADE ;CREATE VIEW {view} AS SELECT * FROM ways_poly WHERE ST_CONTAINS(ST_GeomFromEWKT('SRID=4326;{ewkt}'), geom)"
    # log.debug(sql)
    if db:
        result = await db.queryDB(sql)
    elif self.db:
        result = await self.db.queryDB(sql)
    else:
        return False

    return True
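
Building the view-creation SQL is plain WKT interpolation; a sketch of the statement the method issues, with a toy boundary:

from shapely.geometry import shape

boundary = {
    "type": "Polygon",
    "coordinates": [[[-105.1, 39.9], [-104.9, 39.9], [-104.9, 40.1],
                     [-105.1, 40.1], [-105.1, 39.9]]],
}
view = "ways_view"
ewkt = shape(boundary)  # embedding the geometry in an f-string yields its WKT
sql = (f"DROP VIEW IF EXISTS {view} CASCADE;"
       f"CREATE VIEW {view} AS SELECT * FROM ways_poly WHERE "
       f"ST_CONTAINS(ST_GeomFromEWKT('SRID=4326;{ewkt}'), geom)")
print(sql)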

queryDB async

queryDB(sql=None, db=None)

Query a database table

Parameters:

    sql (str): The SQL query to execute. Default: None
    db (PostgresClient): A reference to the existing database connection. Default: None

Returns:

    (list): The results of the query

Source code in osm_merge/geosupport.py
async def queryDB(self,
            sql: str = None,
            db: PostgresClient = None,
            ) -> list:
    """
    Query a database table

    Args:
        sql (str): The SQL query to execute
        db (PostgresClient, optional): A reference to the existing database connection

    Returns:
        (list): The results of the query
    """
    result = list()
    if not sql:
        log.error(f"You need to pass a valid SQL string!")
        return result

    if db:
        result = db.queryLocal(sql)
    elif self.db:
        result = self.db.queryLocal(sql)

    return result

clipFile async

clipFile(boundary, data)

Clip a FeatureCollection by a boundary

Parameters:

    boundary (Polygon): The project AOI. Required.
    data (FeatureCollection): The data to clip. Required.

Returns:

    (FeatureCollection): The data within the boundary

Source code in osm_merge/geosupport.py
async def clipFile(self,
            boundary: Polygon,
            data: FeatureCollection,
            ):
    """
    Clip a FeatureCollection by a boundary

    Args:
        boundary (Polygon): The project AOI
        data (FeatureCollection): The data to clip

    Returns:
        (FeatureCollection): The data within the boundary
    """
    new = list()
    poly = shape(boundary)
    if len(data) > 0:
        # Keep only the features that fall inside the boundary.
        for feature in data["features"]:
            entry = shapely.from_geojson(str(feature))
            if shapely.contains(poly, entry):
                new.append(feature)

    return new

copyTable async

copyTable(table, remote)

Use DBLINK to copy a table from the external database to a local table so conflating is much faster.

Parameters:

    table (str): The table to copy. Required.
    remote (PostgresClient): The connection to the remote database. Required.
Source code in osm_merge/geosupport.py
async def copyTable(self,
                    table: str,
                    remote: PostgresClient,
                    ):
    """
    Use DBLINK to copy a table from the external
    database to a local table so conflating is much faster.

    Args:
        table (str): The table to copy
        remote (PostgresClient): The connection to the remote database
    """
    timer = Timer(initial_text=f"Copying {table}...",
                  text=f"copying {table} took {{seconds:.0f}}s",
                  logger=log.debug,
                )
    # Get the columns from the remote database table
    self.columns = await remote.getColumns(table)

    print(f"SELF: {self.pg.dburi}")
    print(f"REMOTE: {remote.dburi}")

    # Do we already have a local copy ?
    sql = f"SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename  = '{table}'"
    result = await self.pg.execute(sql)
    print(result)

    # cleanup old temporary tables in the current database
    # drop = ["DROP TABLE IF EXISTS users_bak",
    #         "DROP TABLE IF EXISTS user_interests",
    #         "DROP TABLE IF EXISTS foo"]
    # result = await pg.pg.executemany(drop)
    sql = f"DROP TABLE IF EXISTS new_{table} CASCADE"
    result = await self.pg.execute(sql)
    sql = f"DROP TABLE IF EXISTS {table}_bak CASCADE"
    result = await self.pg.execute(sql)
    timer.start()
    dbuser = self.pg.dburi["dbuser"]
    dbpass = self.pg.dburi["dbpass"]
    sql = f"CREATE SERVER IF NOT EXISTS pg_rep_db FOREIGN DATA WRAPPER dblink_fdw  OPTIONS (dbname 'tm4');"
    data = await self.pg.execute(sql)

    sql = f"CREATE USER MAPPING IF NOT EXISTS FOR {dbuser} SERVER pg_rep_db OPTIONS ( user '{dbuser}', password '{dbpass}');"
    result = await self.pg.execute(sql)

    # Copy table from remote database so JOIN is faster when it's in the
    # same database
    #columns = await sel.getColumns(table)
    log.warning(f"Copying a remote table is slow, but faster than remote access......")
    sql = f"SELECT * INTO {table} FROM dblink('pg_rep_db','SELECT * FROM {table}') AS {table}({self.columns})"
    print(sql)
    result = await self.pg.execute(sql)

    return True
