Skip to content

API Docs for fmtm-splitter

fmtm_splitter.py

Bases: object

A class to split polygons.

Parameters:

Name Type Description Default
aoi_obj (str, FeatureCollection)

Input AOI, either a file path, or GeoJSON string.

None

Returns:

Name Type Description
instance FMTMSplitter

An instance of this class

Source code in fmtm_splitter/splitter.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(
    self,
    aoi_obj: Optional[Union[str, FeatureCollection, dict]] = None,
):
    """This class splits a polygon into tasks using a variety of algorithms.

    Args:
        aoi_obj (str, FeatureCollection): Input AOI, either a file path,
            or GeoJSON string.

    Returns:
        instance (FMTMSplitter): An instance of this class
    """
    # Parse AOI, merge if multiple geometries
    if aoi_obj:
        geojson = self.input_to_geojson(aoi_obj)
        self.aoi = self.geojson_to_shapely_polygon(geojson)

    # Init split features
    self.split_features = None

input_to_geojson staticmethod

input_to_geojson(input_data, merge=False)

Parse input data consistently to a GeoJSON obj.

Source code in fmtm_splitter/splitter.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@staticmethod
def input_to_geojson(
    input_data: Union[str, FeatureCollection, dict], merge: bool = False
) -> GeoJSON:
    """Parse input data consistently to a GeoJSON obj."""
    log.info(f"Parsing GeoJSON from type {type(input_data)}")
    if (
        isinstance(input_data, str)
        and len(input_data) < 250
        and Path(input_data).is_file()
    ):
        # Impose restriction for path lengths <250 chars
        with open(input_data, "r") as jsonfile:
            try:
                parsed_geojson = geojson.load(jsonfile)
            except json.decoder.JSONDecodeError as e:
                raise IOError(
                    f"File exists, but content is invalid JSON: {input_data}"
                ) from e

    elif isinstance(input_data, FeatureCollection):
        parsed_geojson = input_data
    elif isinstance(input_data, dict):
        parsed_geojson = geojson.loads(geojson.dumps(input_data))
    elif isinstance(input_data, str):
        geojson_truncated = (
            input_data if len(input_data) < 250 else f"{input_data[:250]}..."
        )
        log.debug(f"GeoJSON string passed: {geojson_truncated}")
        parsed_geojson = geojson.loads(input_data)
    else:
        err = (
            f"The specified AOI is not valid (must be geojson or str): {input_data}"
        )
        log.error(err)
        raise ValueError(err)

    return parsed_geojson

geojson_to_featcol staticmethod

geojson_to_featcol(geojson)

Standardise any geojson type to FeatureCollection.

Source code in fmtm_splitter/splitter.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@staticmethod
def geojson_to_featcol(
    geojson: Union[FeatureCollection, Feature, dict],
) -> FeatureCollection:
    """Standardise any geojson type to FeatureCollection."""
    # Parse and unparse geojson to extract type
    if isinstance(geojson, FeatureCollection):
        # Handle FeatureCollection nesting
        features = geojson.get("features", [])
    elif isinstance(geojson, Feature):
        # Must be a list
        features = [geojson]
    else:
        # A standard geometry type. Has coordinates, no properties
        features = [Feature(geometry=geojson)]
    return FeatureCollection(features)

geojson_to_shapely_polygon staticmethod

geojson_to_shapely_polygon(geojson)

Parse GeoJSON and return shapely Polygon.

The GeoJSON may be of type FeatureCollection, Feature, or Polygon, but should only contain one Polygon geometry in total.

Source code in fmtm_splitter/splitter.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@staticmethod
def geojson_to_shapely_polygon(
    geojson: Union[FeatureCollection, Feature, dict],
) -> Polygon:
    """Parse GeoJSON and return shapely Polygon.

    The GeoJSON may be of type FeatureCollection, Feature, or Polygon,
    but should only contain one Polygon geometry in total.
    """
    features = FMTMSplitter.geojson_to_featcol(geojson).get("features", [])
    log.debug("Converting AOI to Shapely geometry")

    if len(features) == 0:
        msg = "The input AOI contains no geometries."
        log.error(msg)
        raise ValueError(msg)
    elif len(features) > 1:
        msg = "The input AOI cannot contain multiple geometries."
        log.error(msg)
        raise ValueError(msg)

    return shape(features[0].get("geometry"))

meters_to_degrees

meters_to_degrees(meters, reference_lat)

Converts meters to degrees at a given latitude.

Using WGS84 ellipsoidal calculations.

Parameters:

Name Type Description Default
meters float

The distance in meters to convert.

required
reference_lat float

The latitude at which to ,

required

Returns:

Type Description
Tuple[float, float]

Tuple[float, float]: Degree values for latitude and longitude.

Source code in fmtm_splitter/splitter.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def meters_to_degrees(
    self, meters: float, reference_lat: float
) -> Tuple[float, float]:
    """Converts meters to degrees at a given latitude.

    Using WGS84 ellipsoidal calculations.

    Args:
        meters (float): The distance in meters to convert.
        reference_lat (float): The latitude at which to ,
        perform the conversion (in degrees).

    Returns:
        Tuple[float, float]: Degree values for latitude and longitude.
    """
    # INFO:
    # The geodesic distance is the shortest distance on the surface
    # of an ellipsoidal model of the earth

    lat_rad = math.radians(reference_lat)

    # Using WGS84 parameters
    a = 6378137.0  # Semi-major axis in meters
    f = 1 / 298.257223563  # Flattening factor

    # Applying formula
    e2 = (2 * f) - (f**2)  # Eccentricity squared
    n = a / math.sqrt(
        1 - e2 * math.sin(lat_rad) ** 2
    )  # Radius of curvature in the prime vertical
    m = (
        a * (1 - e2) / (1 - e2 * math.sin(lat_rad) ** 2) ** (3 / 2)
    )  # Radius of curvature in the meridian

    lat_deg_change = meters / m  # Latitude change in degrees
    lon_deg_change = meters / (n * math.cos(lat_rad))  # Longitude change in degrees

    # Convert changes to degrees by dividing by radians to degrees
    lat_deg_change = math.degrees(lat_deg_change)
    lon_deg_change = math.degrees(lon_deg_change)

    return lat_deg_change, lon_deg_change

splitBySquare

splitBySquare(meters, db, extract_geojson=None)

Split the polygon into squares.

Parameters:

Name Type Description Default
meters int

The size of each task square in meters.

required
db (str, connection)

The db url, format: postgresql://myusername:mypassword@myhost:5432/mydatabase OR an psycopg2 connection object object that is reused. Passing an connection object prevents requiring additional database connections to be spawned.

required
extract_geojson (dict, FeatureCollection)

an OSM extract geojson, containing building polygons, or linestrings.

None

Returns:

Name Type Description
data FeatureCollection

A multipolygon of all the task boundaries.

Source code in fmtm_splitter/splitter.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
def splitBySquare(  # noqa: N802
    self,
    meters: int,
    db: Union[str, connection],
    extract_geojson: Optional[Union[dict, FeatureCollection]] = None,
) -> FeatureCollection:
    """Split the polygon into squares.

    Args:
        meters (int):  The size of each task square in meters.
        db (str, psycopg2.extensions.connection): The db url, format:
            postgresql://myusername:mypassword@myhost:5432/mydatabase
            OR an psycopg2 connection object object that is reused.
            Passing an connection object prevents requiring additional
            database connections to be spawned.
        extract_geojson (dict, FeatureCollection): an OSM extract geojson,
            containing building polygons, or linestrings.

    Returns:
        data (FeatureCollection): A multipolygon of all the task boundaries.
    """
    log.debug("Splitting the AOI by squares")

    xmin, ymin, xmax, ymax = self.aoi.bounds

    reference_lat = (ymin + ymax) / 2
    length_deg, width_deg = self.meters_to_degrees(meters, reference_lat)

    # Create grid columns and rows based on the AOI bounds
    cols = np.arange(xmin, xmax + width_deg, width_deg)
    rows = np.arange(ymin, ymax + length_deg, length_deg)

    with create_connection(db) as conn:
        with conn.cursor() as cur:
            # Drop the table if it exists
            cur.execute("DROP TABLE IF EXISTS temp_polygons;")
            # Create temporary table
            cur.execute("""
                CREATE TEMP TABLE temp_polygons (
                    id SERIAL PRIMARY KEY,
                    geom GEOMETRY(GEOMETRY, 4326),
                    area DOUBLE PRECISION
                );
            """)

            extract_geoms = []
            if extract_geojson:
                features = (
                    extract_geojson.get("features", extract_geojson)
                    if isinstance(extract_geojson, dict)
                    else extract_geojson.features
                )
                extract_geoms = [shape(feature["geometry"]) for feature in features]

            # Generate grid polygons and clip them by AOI
            polygons = []
            for x in cols[:-1]:
                for y in rows[:-1]:
                    grid_polygon = box(x, y, x + width_deg, y + length_deg)
                    clipped_polygon = grid_polygon.intersection(self.aoi)

                    if clipped_polygon.is_empty:
                        continue

                    # Check intersection with extract geometries if available
                    if extract_geoms:
                        if any(
                            geom.centroid.within(clipped_polygon)
                            for geom in extract_geoms
                        ):
                            polygons.append(
                                (clipped_polygon.wkt, clipped_polygon.wkt)
                            )

                    else:
                        polygons.append((clipped_polygon.wkt, clipped_polygon.wkt))

            insert_query = """
                    INSERT INTO temp_polygons (geom, area)
                    SELECT ST_GeomFromText(%s, 4326),
                    ST_Area(ST_GeomFromText(%s, 4326)::geography)
                """

            if polygons:
                cur.executemany(insert_query, polygons)

            area_threshold = 0.35 * (meters**2)

            cur.execute(
                """
                DO $$
                DECLARE
                    small_polygon RECORD;
                    nearest_neighbor RECORD;
                BEGIN
                DROP TABLE IF EXISTS small_polygons;
                CREATE TEMP TABLE small_polygons As
                    SELECT id, geom, area
                    FROM temp_polygons
                    WHERE area < %s;
                FOR small_polygon IN SELECT * FROM small_polygons
                LOOP
                    FOR nearest_neighbor IN
                    SELECT id,
                        lp.geom AS large_geom,
                        ST_LENGTH2D(
                        ST_INTERSECTION(small_polygon.geom, geom)
                        ) AS shared_bound
                    FROM temp_polygons lp
                    WHERE id NOT IN (SELECT id FROM small_polygons)
                    AND ST_Touches(small_polygon.geom, lp.geom)
                    AND ST_GEOMETRYTYPE(
                    ST_INTERSECTION(small_polygon.geom, geom)
                    ) != 'ST_Point'
                    ORDER BY shared_bound DESC
                    LIMIT 1
                    LOOP
                        UPDATE temp_polygons
                        SET geom = ST_UNION(small_polygon.geom, geom)
                        WHERE id = nearest_neighbor.id;

                        DELETE FROM temp_polygons WHERE id = small_polygon.id;
                        EXIT;
                    END LOOP;
                END LOOP;
                END $$;
            """,
                (area_threshold,),
            )

            cur.execute(
                """
                SELECT
                JSONB_BUILD_OBJECT(
                'type', 'FeatureCollection',
                'features', JSONB_AGG(feature)
                )
                FROM(
                SELECT JSONB_BUILD_OBJECT(
                'type', 'Feature',
                'properties', JSONB_BUILD_OBJECT('area', (t.area)),
                'geometry', ST_ASGEOJSON(t.geom)::json
                ) AS feature
                FROM temp_polygons as t
                ) AS features;
                """
            )
            self.split_features = cur.fetchone()[0]
    return self.split_features

splitBySQL

splitBySQL(sql, db, buildings=None, osm_extract=None)

Split the polygon by features in the database using an SQL query.

FIXME this requires some work to function with custom SQL.

Parameters:

Name Type Description Default
sql str

The SQL query to execute

required
db (str, connection)

The db url, format: postgresql://myusername:mypassword@myhost:5432/mydatabase OR an psycopg2 connection object object that is reused. Passing an connection object prevents requiring additional database connections to be spawned.

required
buildings int

The number of buildings in each task

None
osm_extract (dict, FeatureCollection)

an OSM extract geojson, containing building polygons, or linestrings.

None

Returns:

Name Type Description
data FeatureCollection

A multipolygon of all the task boundaries.

Source code in fmtm_splitter/splitter.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
def splitBySQL(  # noqa: N802
    self,
    sql: str,
    db: Union[str, connection],
    buildings: Optional[int] = None,
    osm_extract: Optional[Union[dict, FeatureCollection]] = None,
) -> FeatureCollection:
    """Split the polygon by features in the database using an SQL query.

    FIXME this requires some work to function with custom SQL.

    Args:
        sql (str): The SQL query to execute
        db (str, psycopg2.extensions.connection): The db url, format:
            postgresql://myusername:mypassword@myhost:5432/mydatabase
            OR an psycopg2 connection object object that is reused.
            Passing an connection object prevents requiring additional
            database connections to be spawned.
        buildings (int): The number of buildings in each task
        osm_extract (dict, FeatureCollection): an OSM extract geojson,
            containing building polygons, or linestrings.

    Returns:
        data (FeatureCollection): A multipolygon of all the task boundaries.
    """
    # Validation
    if buildings and not osm_extract:
        msg = (
            "To use the FMTM splitting algo, an OSM data extract must be passed "
            "via param `osm_extract` as a geojson dict or FeatureCollection."
        )
        log.error(msg)
        raise ValueError(msg)

    # Run custom SQL
    if not buildings or not osm_extract:
        log.info(
            "No `buildings` or `osm_extract` params passed, executing custom SQL"
        )
        # FIXME untested
        conn = create_connection(db)
        splitter_cursor = conn.cursor()
        log.debug("Running custom splitting algorithm")
        splitter_cursor.execute(sql)
        features = splitter_cursor.fetchall()[0][0]["features"]
        if features:
            log.info(f"Query returned {len(features)} features")
        else:
            log.info("Query returned no features")
        self.split_features = FeatureCollection(features)
        return self.split_features

    # Get existing db engine, or create new one
    conn = create_connection(db)

    # Generate db tables if not exist
    log.debug("Generating required temp tables")
    create_tables(conn)

    # Add aoi to project_aoi table
    aoi_to_postgis(conn, self.aoi)

    def json_str_to_dict(json_item: Union[str, dict]) -> dict:
        """Convert a JSON string to dict."""
        if isinstance(json_item, dict):
            return json_item
        if isinstance(json_item, str):
            try:
                return json.loads(json_item)
            except json.JSONDecodeError:
                msg = f"Error decoding key in GeoJSON: {json_item}"
                log.error(msg)
                # Set tags to empty, skip feature
                return {}

    # Insert data extract into db, using same cursor
    log.debug("Inserting data extract into db")
    cur = conn.cursor()
    for feature in osm_extract["features"]:
        # NOTE must handle format generated from FMTMSplitter __init__
        wkb_element = shape(feature["geometry"]).wkb_hex
        properties = feature.get("properties", {})
        if "tags" in properties.keys():
            # Sometimes tags are placed under tags key
            tags = properties.get("tags", {})
        else:
            # Sometimes tags are directly in properties
            tags = properties

        # Handle nested 'tags' key if present
        tags = json_str_to_dict(tags).get("tags", json_str_to_dict(tags))
        osm_id = properties.get("osm_id")

        # Common attributes for db tables
        common_args = dict(osm_id=osm_id, geom=wkb_element, tags=tags)

        # Insert building polygons
        if tags.get("building") == "yes":
            insert_geom(cur, "ways_poly", **common_args)

        # Insert highway/waterway/railway polylines
        elif any(key in tags for key in ["highway", "waterway", "railway"]):
            insert_geom(cur, "ways_line", **common_args)

    # Use raw sql for view generation & remainder of script
    # TODO get geom from project_aoi table instead of wkb string
    log.debug("Creating db view with intersecting polylines")
    view = (
        "DROP VIEW IF EXISTS lines_view;"
        "CREATE VIEW lines_view AS SELECT "
        "tags,geom FROM ways_line WHERE "
        "ST_Intersects(ST_SetSRID(CAST(%s AS GEOMETRY), 4326), geom)"
    )
    cur.execute(view, (self.aoi.wkb_hex,))
    # Close current cursor
    cur.close()

    splitter_cursor = conn.cursor()
    log.debug("Running task splitting algorithm")
    splitter_cursor.execute(sql, {"num_buildings": buildings})

    features = splitter_cursor.fetchall()[0][0]["features"]
    if features:
        log.info(f"Query returned {len(features)} features")
    else:
        log.info("Query returned no features")

    self.split_features = FeatureCollection(features)

    # Drop tables & close (+commit) db connection
    drop_tables(conn)
    close_connection(conn)

    return self.split_features

splitByFeature

splitByFeature(features)

Split the polygon by features in the database.

Parameters:

Name Type Description Default
features(FeatureCollection)

FeatureCollection of features to polygonise and return.

required

Returns:

Name Type Description
data FeatureCollection

A multipolygon of all the task boundaries.

Source code in fmtm_splitter/splitter.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
def splitByFeature(  # noqa: N802
    self,
    features: FeatureCollection,
) -> FeatureCollection:
    """Split the polygon by features in the database.

    Args:
        features(FeatureCollection): FeatureCollection of features
            to polygonise and return.

    Returns:
        data (FeatureCollection): A multipolygon of all the task boundaries.
    """
    log.debug("Polygonising the FeatureCollection features")
    # Extract all geometries from the input features
    geometries = []
    for feature in features["features"]:
        geom = feature["geometry"]
        if geom["type"] == "Polygon":
            geometries.append(shape(geom))
        elif geom["type"] == "LineString":
            geometries.append(shape(geom))
        else:
            log.warning(f"Ignoring unsupported geometry type: {geom['type']}")

    # Create a single MultiPolygon from all the polygons and linestrings
    multi_polygon = unary_union(geometries)

    # Clip the multi_polygon by the AOI boundary
    clipped_multi_polygon = multi_polygon.intersection(self.aoi)

    polygon_features = [
        Feature(geometry=polygon) for polygon in list(clipped_multi_polygon.geoms)
    ]

    # Convert the Polygon Features into a FeatureCollection
    self.split_features = FeatureCollection(features=polygon_features)

    return self.split_features

outputGeojson

outputGeojson(filename='output.geojson')

Output a geojson file from split features.

Source code in fmtm_splitter/splitter.py
522
523
524
525
526
527
528
529
530
531
532
533
534
def outputGeojson(  # noqa: N802
    self,
    filename: str = "output.geojson",
) -> None:
    """Output a geojson file from split features."""
    if not self.split_features:
        msg = "Feature splitting has not been executed. Do this first."
        log.error(msg)
        raise RuntimeError(msg)

    with open(filename, "w") as jsonfile:
        geojson.dump(self.split_features, jsonfile)
        log.debug(f"Wrote split features to {filename}")

options: show_source: true