Module src.app

This page contains the main core logic of the app.

Variables

AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
BUCKET_NAME
DEFAULT_README_TEXT
ENABLE_CUSTOM_EXPORTS
ENABLE_HDX_EXPORTS
ENABLE_POLYGON_STATISTICS_ENDPOINTS
ENABLE_SOZIP
ENABLE_TILES
EXPORT_MAX_AREA_SQKM
EXPORT_TYPE_MAPPING
HDX_FILTER_CRITERIA
HDX_MARKDOWN
LOCAL_CON_POOL
MAX_WORKERS
PARALLEL_PROCESSING_CATEGORIES
POLYGON_STATISTICS_API_URL
PROCESS_SINGLE_CATEGORY_IN_POSTGRES
USE_DUCK_DB_FOR_CUSTOM_EXPORTS
USE_S3_TO_UPLOAD
database_instance
export_path
index_threshold
level
use_connection_pooling

Functions

check_for_json

def check_for_json(
    result_str
)
Checks whether the payload is a JSON document.

Returns: bool: True if the payload is a JSON document, False otherwise.
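
A minimal sketch of how such a check might be written, assuming the payload arrives as a string and that anything accepted by `json.loads` counts as a JSON document. The body is illustrative and is not taken from the module's source.

```python
import json


def check_for_json(result_str):
    """Return True if result_str parses as JSON, False otherwise (illustrative sketch)."""
    try:
        json.loads(result_str)
    except (TypeError, ValueError):
        # json.JSONDecodeError subclasses ValueError; TypeError covers non-string input.
        return False
    return True


print(check_for_json('{"status": "ok"}'))
print(check_for_json("plain text"))
```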

convert_dict_to_conn_str

def convert_dict_to_conn_str(
    db_dict
)

dict_none_clean

def dict_none_clean(
    to_clean
)
Clean DictWriter

generate_ogr2ogr_cmd_from_psql

def generate_ogr2ogr_cmd_from_psql(
    export_file_path,
    export_file_format_driver,
    postgres_query,
    layer_creation_options,
    query_dump_path
)
Generates ogr2ogr command for postgresql queries
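
The sketch below shows one way a command of this kind could be assembled. It assumes the query is first written to `query_dump_path` and passed to ogr2ogr with `-sql @file`, and that `layer_creation_options` is a list of `NAME=VALUE` strings. The helper name `build_ogr2ogr_cmd` and the connection string `PG_CONN` are hypothetical; the module's actual command may differ.

```python
import shlex
import tempfile
from pathlib import Path

# Hypothetical connection string; the real module builds its own from configuration.
PG_CONN = "PG:host=localhost dbname=example user=example"


def build_ogr2ogr_cmd(export_file_path, export_file_format_driver,
                      postgres_query, layer_creation_options, query_dump_path):
    """Write the query to a file and return an ogr2ogr argument list (sketch)."""
    Path(query_dump_path).write_text(postgres_query, encoding="utf-8")
    cmd = [
        "ogr2ogr", "-overwrite",
        "-f", export_file_format_driver,
        str(export_file_path), PG_CONN,
        "-sql", f"@{query_dump_path}",  # @file tells ogr2ogr to read the SQL from that file
    ]
    for option in layer_creation_options or []:
        cmd += ["-lco", option]  # one -lco flag per layer creation option
    return cmd


with tempfile.TemporaryDirectory() as tmp:
    sql_file = str(Path(tmp) / "query.sql")
    cmd = build_ogr2ogr_cmd(str(Path(tmp) / "out.gpkg"), "GPKG",
                            "SELECT * FROM nodes", ["SPATIAL_INDEX=YES"], sql_file)
    print(shlex.join(cmd))
```

Returning a list of arguments rather than one shell string avoids quoting problems when the command is later handed to `subprocess`.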

print_psycopg2_exception

def print_psycopg2_exception(
    err
)
Function that handles and parses Psycopg2 exceptions

run_ogr2ogr_cmd

def run_ogr2ogr_cmd(
    cmd
)
Runs the command and monitors the file size while the process is running.

Args: cmd (type): Command to run as a subprocess. binding_file_dir (type): description

Raises: Exception: If the process fails.
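
As a rough illustration of the run-and-raise behaviour described above, the following sketch runs a command as a subprocess and raises when it exits with a non-zero status. It leaves out the file-size monitoring, and the helper name `run_cmd` is hypothetical.

```python
import subprocess
import sys


def run_cmd(cmd):
    """Run cmd (a list of arguments) and raise if it exits with a non-zero status."""
    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    stdout, stderr = process.communicate()  # blocks until the process finishes
    if process.returncode != 0:
        raise Exception(
            f"Command failed with code {process.returncode}: {stderr.strip()}"
        )
    return stdout


# The current Python interpreter stands in for ogr2ogr in this example.
print(run_cmd([sys.executable, "-c", "print('ok')"]).strip())
```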

Classes

Cron

class Cron(

)

Methods

create_cron

def create_cron(
    self,
    cron_data
)
Create a new Cron entry in the database.

Args: cron_data (dict): Data for creating the Cron entry.

Returns: dict: Result of the cron creation process.

delete_cron

def delete_cron(
    self,
    cron_id: int
)
Delete an existing Cron entry from the database.

Args: cron_id (int): ID of the Cron entry to delete.

Returns: dict: Result of the Cron deletion process.

Raises: HTTPException: If the Cron entry is not found.

get_cron_by_id

def get_cron_by_id(
    self,
    cron_id: int
)
Retrieve a specific Cron entry by its ID.

Args: cron_id (int): ID of the Cron entry to retrieve.

Returns: dict: Details of the requested Cron entry.

Raises: HTTPException: If the Cron entry is not found.

get_cron_list_with_filters

def get_cron_list_with_filters(
    self,
    skip: int = 0,
    limit: int = 10,
    filters: dict = {}
)
Retrieve a list of Cron entries based on provided filters.

Args: skip (int): Number of entries to skip. limit (int): Maximum number of entries to retrieve. filters (dict): Filtering criteria.

Returns: List[dict]: List of Cron entries.
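
The interaction of `skip`, `limit` and `filters` can be sketched against an in-memory list. The rows, the field names other than `dataset_title`, and the helper `list_with_filters` are assumptions made for illustration; the real method queries the database.

```python
from typing import Dict, List, Optional

# Hypothetical in-memory rows standing in for the Cron table.
CRON_ROWS = [
    {"id": 1, "dataset_title": "Roads", "iso3": "NPL"},
    {"id": 2, "dataset_title": "Buildings", "iso3": "NPL"},
    {"id": 3, "dataset_title": "Roads", "iso3": "IDN"},
]


def list_with_filters(rows, skip: int = 0, limit: int = 10,
                      filters: Optional[Dict] = None) -> List[dict]:
    """Keep rows matching every filter, then apply skip/limit pagination."""
    filters = filters or {}
    matched = [
        row for row in rows
        if all(row.get(key) == value for key, value in filters.items())
    ]
    return matched[skip: skip + limit]


print(list_with_filters(CRON_ROWS, filters={"dataset_title": "Roads"}))
print(list_with_filters(CRON_ROWS, skip=1, limit=1))
```

The sketch uses `None` as the default for `filters` instead of `{}`, since a mutable default argument in Python is shared between calls.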

patch_cron

def patch_cron(
    self,
    cron_id: int,
    cron_data: dict
)
Partially update an existing Cron entry in the database.

Args: cron_id (int): ID of the Cron entry to update. cron_data (dict): Data for partially updating the Cron entry.

Returns: dict: Result of the Cron update process.

Raises: HTTPException: If the Cron entry is not found.

search_cron_by_dataset_title

def search_cron_by_dataset_title(
    self,
    dataset_title: str,
    skip: int = 0,
    limit: int = 10
)
Search for Cron entries by dataset title.

Args: dataset_title (str): The title of the dataset to search for. skip (int): Number of entries to skip. limit (int): Maximum number of entries to retrieve.

Returns: List[dict]: List of Cron entries matching the dataset title.

update_cron

def update_cron(
    self,
    cron_id: int,
    cron_data
)
Update an existing Cron entry in the database.

Args: cron_id (int): ID of the Cron entry to update. cron_data (dict): Data for updating the Cron entry.

Returns: dict: Result of the Cron update process.

Raises: HTTPException: If the Cron entry is not found.

CustomExport

class CustomExport(
    params,
    uid=None
)

Constructor for the custom export class.

Parameters: - params (DynamicCategoriesModel): An instance of DynamicCategoriesModel containing configuration settings.

Methods

clean_resources

def clean_resources(
    self
)
Cleans up temporary resources.

file_to_zip

def file_to_zip(
    self,
    working_dir,
    zip_path
)
Creates a ZIP file from files in a directory.

Parameters: - working_dir (str): Path to the directory containing files to be zipped. - zip_path (str): Path to the resulting ZIP file.

Returns: - Path to the created ZIP file.
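
A minimal sketch of zipping a directory with the standard library, assuming only the regular files directly inside `working_dir` are archived and that `zip_path` lies outside that directory. The real method may behave differently, for example when `ENABLE_SOZIP` is set.

```python
import tempfile
import zipfile
from pathlib import Path


def file_to_zip(working_dir, zip_path):
    """Zip every regular file directly inside working_dir and return zip_path (sketch)."""
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for file_path in sorted(Path(working_dir).iterdir()):
            if file_path.is_file():
                # arcname keeps only the file name, not the full local path
                zf.write(file_path, arcname=file_path.name)
    return zip_path


with tempfile.TemporaryDirectory() as tmp:
    work = Path(tmp) / "work"
    work.mkdir()
    (work / "a.geojson").write_text("{}")
    archive = file_to_zip(work, Path(tmp) / "export.zip")
    with zipfile.ZipFile(archive) as zf:
        names = zf.namelist()
    print(names)
```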

format_where_clause_duckdb

def format_where_clause_duckdb(
    self,
    where_clause
)
Formats the where_clause by replacing the first occurrence of the pattern.

Parameters: - where_clause (str): SQL-like condition to filter features.

Returns: - Formatted where_clause.

process_category

def process_category(
    self,
    category
)
Processes a category by executing queries and handling exports.

Parameters: - category (Dict[str, CategoryModel]): Dictionary representing a category.

Returns: - List of resource dictionaries containing export information.

process_category_result

def process_category_result(
    self,
    category_result
)
Processes the result of a category and prepares the response.

Parameters: - category_result (CategoryResult): Instance of CategoryResult.

Returns: - Dictionary containing processed category result.

process_custom_categories

def process_custom_categories(
    self
)
Processes Custom tags and executes category processing in parallel.

Returns: - Dictionary containing the processed dataset information.
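
Parallel category processing of this kind can be sketched with a thread pool. Whether the module uses threads or processes is not stated here, so `ThreadPoolExecutor`, the value of `MAX_WORKERS`, and the stand-in `process_category` function below are all assumptions.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 4  # assumed value; the module reads its own MAX_WORKERS setting


def process_category(category):
    """Hypothetical stand-in for the real per-category work."""
    name, config = next(iter(category.items()))
    return {name: {"types": config["types"], "status": "done"}}


def process_categories(categories):
    """Run process_category for each category in parallel and merge the results."""
    results = {}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # executor.map yields results in the same order as the input categories
        for result in executor.map(process_category, categories):
            results.update(result)
    return results


categories = [
    {"Roads": {"types": ["lines"]}},
    {"Buildings": {"types": ["polygons"]}},
]
print(process_categories(categories))
```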

query_to_file

def query_to_file(
    self,
    query,
    category_name,
    feature_type,
    export_formats
)
Executes a query and exports the result to file(s).

Parameters: - query (str): SQL query to execute. - category_name (str): Name of the category. - feature_type (str): Feature type. - export_formats (List[ExportTypeInfo]): List of export formats.

Returns: - List of resource dictionaries containing export information.

resource_to_hdx

def resource_to_hdx(
    self,
    uploaded_resources,
    dataset_config,
    category
)
Converts uploaded resources to an HDX dataset and uploads to HDX.

Parameters: - uploaded_resources (List[Dict[str, Any]]): List of resource dictionaries. - dataset_config (DatasetConfig): Instance of DatasetConfig. - category (Dict[str, CategoryModel]): Dictionary representing a category.

Returns: - Dictionary containing the HDX upload information.

resource_to_response

def resource_to_response(
    self,
    uploaded_resources,
    category
)
Converts uploaded resources to a response format.

Parameters: - uploaded_resources (List[Dict[str, Any]]): List of resource dictionaries. - category (Dict[str, CategoryModel]): Dictionary representing a category.

Returns: - Dictionary containing the response information.

types_to_tables

def types_to_tables(
    self,
    type_list: list
)
Maps feature types to corresponding database tables.

Parameters: - type_list (List[str]): List of feature types.

Returns: - List of database tables associated with the given feature types.

upload_resources

def upload_resources(
    self,
    resource_path
)
Uploads a resource file to Amazon S3.

Parameters: - resource_path (str): Path to the resource file on the local filesystem.

Returns: - Download URL for the uploaded resource.

zip_to_s3

def zip_to_s3(
    self,
    resources
)
Zips and uploads a list of resources to Amazon S3.

Parameters: - resources (List[Dict[str, Any]]): List of resource dictionaries.

Returns: - List of resource dictionaries with added download URLs.

Database

class Database(
    db_params
)

The Database class is used to connect to your database, run queries, and get results from it. All tests and validation are handled inside the class.

Methods

close_conn

def close_conn(
    self
)
Function for closing the connection to avoid memory leaks.

connect

def connect(
    self
)
Database class instance method used to connect to the database with the supplied parameters; connection errors are printed.

executequery

def executequery(
    self,
    query
)
Function to execute query after connection

DownloadMetrics

class DownloadMetrics(

)

Methods

get_summary_stats

def get_summary_stats(
    self,
    start_date,
    end_date,
    group_by
)
Gets summary metrics for raw-data-api downloads.

DuckDB

class DuckDB(
    db_path,
    temp_dir=None
)

Constructor for the DuckDB class.

Parameters: - db_path (str): The path to the DuckDB database file.

Methods

run_query

def run_query(
    self,
    query,
    attach_pgsql=False,
    load_spatial=False
)
Executes a query on the DuckDB database.

Parameters: - query (str): The SQL query to execute. - attach_pgsql (bool): Flag to indicate whether to attach a PostgreSQL database. - load_spatial (bool): Flag to indicate whether to load the spatial extension.

HDXUploader

class HDXUploader(
    category,
    hdx,
    uuid,
    default_category_path,
    completeness_metadata=None
)

Constructor for the HDXUploader class.

Parameters: - category (Dict[str, CategoryModel]): Dictionary representing a category. - hdx (HDX): Instance of the HDX class. - uuid (str): Universally unique identifier. - default_category_path (str): Default path for the category. - completeness_metadata (Optional[Dict[str, Any]]): Metadata for completeness.

Methods

add_notes

def add_notes(
    self
)
Adds notes based on category data.

Returns: - Notes string.

add_resource

def add_resource(
    self,
    resource_meta
)
Adds a resource to the list of resources.

Parameters: - resource_meta (Dict[str, Any]): Metadata for the resource.

init_dataset

def init_dataset(
    self
)
Initializes the HDX dataset.

slugify

def slugify(
    self,
    name
)
Converts a string to a valid slug format.

Parameters: - name (str): Input string.

Returns: - Slugified string.
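
One common way to build a slug is sketched below: lowercase the input and collapse every run of non-alphanumeric characters into a single hyphen. The separator and the exact character rules are assumptions; the method's actual rules are not shown on this page.

```python
import re


def slugify(name):
    """Lowercase name and collapse runs of non-alphanumeric characters into hyphens."""
    slug = re.sub(r"[^a-z0-9]+", "-", name.lower())
    return slug.strip("-")  # drop hyphens left at either end


print(slugify("Health Facilities (Nepal)"))
```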

upload_dataset

def upload_dataset(
    self,
    dump_config_to_s3=False
)
Uploads the dataset to HDX.

Parameters: - dump_config_to_s3 (bool): Flag to indicate whether to dump configuration to S3.

Returns: - Tuple containing category name and dataset information.

PolygonStats

class PolygonStats(
    geojson=None,
    iso3=None
)

Generates stats for polygon

Static methods

get_building_pattern_statement

def get_building_pattern_statement(
    osm_building_count,
    ai_building_count,
    avg_timestamp,
    last_edit_timestamp,
    osm_building_count_6_months
)
Translates building stats to a human-readable statement.

Args: osm_building_count (int): Count of buildings from OpenStreetMap. ai_building_count (int): Count of buildings from AI estimates. avg_timestamp (timestamp): Average timestamp of data. last_edit_timestamp (timestamp): Last edit timestamp of the area. osm_building_count_6_months (int): Count of buildings updated in the last 6 months.

Returns: str: Human-readable building statement.
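
To illustrate the idea of turning counts into a sentence, here is a small sketch. The wording, the percentage calculations, and the helper name `building_statement` are invented for illustration, and the timestamp arguments are left out; the statement produced by the real method will differ.

```python
def building_statement(osm_building_count, ai_building_count,
                       osm_building_count_6_months):
    """Summarise building counts in one sentence (illustrative wording only)."""
    if osm_building_count == 0:
        return "No OpenStreetMap buildings were found in this area."
    recent_pct = round(100 * osm_building_count_6_months / osm_building_count)
    statement = f"OpenStreetMap has {osm_building_count} buildings in this area"
    if ai_building_count > 0:
        # Compare the mapped count with the AI estimate when one is available
        coverage_pct = round(100 * osm_building_count / ai_building_count)
        statement += f", about {coverage_pct}% of the AI-estimated {ai_building_count}"
    return f"{statement}; {recent_pct}% were edited in the last 6 months."


sentence = building_statement(800, 1000, 200)
print(sentence)
```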

get_road_pattern_statement

def get_road_pattern_statement(
    osm_highway_length,
    ai_highway_length,
    avg_timestamp,
    last_edit_timestamp,
    osm_highway_length_6_months
)
Translates road stats to a human-readable statement.

Args: osm_highway_length (float): Length of roads from OpenStreetMap. ai_highway_length (float): Length of roads from AI estimates. avg_timestamp (str): Average timestamp of data. last_edit_timestamp (timestamp): Last edit timestamp of the area. osm_highway_length_6_months (float): Length of roads updated in the last 6 months.

Returns: str: Human-readable road statement.

Methods

get_osm_analytics_meta_stats

def get_osm_analytics_meta_stats(
    self
)
Gets the raw stats translated into a JSON body using the OSM Analytics API.

Returns: dict: Raw statistics translated into JSON.

get_summary_stats

def get_summary_stats(
    self
)
Generates summary statistics for buildings and roads.

Returns: dict: Summary statistics including building and road statements.

RawData

class RawData(
    parameters=None,
    request_uid='raw-data-api',
    dbdict=None
)

Class responsible for raw data extraction from available sources. Currently works for the Underpass source (current snapshot).

Returns: GeoJSON ZIP file

Supports: - Any key-value pair of OSM tags - A polygon - OSM element type (optional)

Static methods

close_con

def close_con(
    con
)
Closes connection if exists

get_grid_id

def get_grid_id(
    geom,
    cur
)
Gets the intersecting related grid id for the geometry that is passed

Args: geom (type): description cur (type): description

Returns: type: grid id , geometry dump and the area of geometry

ogr_export

def ogr_export(
    query,
    outputtype,
    working_dir,
    dump_temp_path,
    params
)
Generates ogr2ogr command based on outputtype and parameters

Args: query (type): Postgresql query to extract outputtype (type): description working_dir (type): description dump_temp_path (type): temp file path for metadata gen params (type): description

ogr_export_shp

def ogr_export_shp(
    point_query,
    line_query,
    poly_query,
    working_dir,
    file_name
)
Function written to support OGR-based extractions as well, which makes every file format supported by OGR available. It is currently slower than our own conversion method on larger datasets, but it is richer in features and data types.

query2geojson

def query2geojson(
    con,
    extraction_query,
    dump_temp_file_path
)
Function written from scratch without depending on any library; provides better performance for GeoJSON binding.
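
The general technique of writing a GeoJSON file without a GIS library can be sketched as follows. It assumes each row carries one already-serialized GeoJSON feature in its first column; the helper `rows_to_geojson` and the sample rows are hypothetical.

```python
import io
import json


def rows_to_geojson(rows, out):
    """Stream (feature_json,) rows into out as one GeoJSON FeatureCollection (sketch)."""
    out.write('{"type": "FeatureCollection", "features": [')
    for index, row in enumerate(rows):
        if index:
            out.write(",")  # separator before every feature except the first
        out.write(row[0])  # each row is assumed to hold one serialized feature
    out.write("]}")


rows = [
    ('{"type": "Feature", "geometry": {"type": "Point", "coordinates": [85.3, 27.7]},'
     ' "properties": {"osm_id": 1}}',),
    ('{"type": "Feature", "geometry": null, "properties": {"osm_id": 2}}',),
]
buffer = io.StringIO()
rows_to_geojson(rows, buffer)
collection = json.loads(buffer.getvalue())
print(len(collection["features"]))
```

Because features are written one at a time, the whole result set never has to be held in memory as a single Python object.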

Methods

check_status

def check_status(
    self
)
Gives the status of the DB update by subtracting the last DB update time from the current time.
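
The subtraction described above amounts to a `timedelta` between two timestamps. A minimal sketch, assuming both values are timezone-aware `datetime` objects; the helper name `time_since_last_update` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def time_since_last_update(last_update, now=None):
    """Return how long ago the database was last updated as a timedelta."""
    now = now or datetime.now(timezone.utc)
    return now - last_update


last_update = datetime(2024, 1, 1, 11, 30, tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
lag = time_since_last_update(last_update, now)
print(lag)
```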

cleanup

def cleanup(
    self
)
Cleans up temporary resources.

extract_current_data

def extract_current_data(
    self,
    exportname
)
Responsible for extracting the current snapshot of raw data. It first creates a GeoJSON file, generates the query, runs it with a chunk size of 1000, writes the results directly to the GeoJSON file, and closes the file after the dump.

Args: exportname: Filename used to create the GeoJSON file, passed from the routers.

Returns: geom_area: area of polygon supplied working_dir: dir where results are saved
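
Reading a result set in chunks of 1000 rows can be sketched with the DB-API `fetchmany` call. The example uses the standard library's `sqlite3` as a stand-in for the PostgreSQL connection, and the table and the helper `iter_chunks` are hypothetical.

```python
import sqlite3

CHUNK_SIZE = 1000  # chunk size named in the description above


def iter_chunks(cursor, size=CHUNK_SIZE):
    """Yield lists of at most `size` rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(size)
        if not rows:
            break
        yield rows


# sqlite3 stands in for the real database connection in this sketch.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE features (id INTEGER)")
con.executemany("INSERT INTO features VALUES (?)", [(i,) for i in range(2500)])
cur = con.execute("SELECT id FROM features ORDER BY id")
chunk_sizes = [len(chunk) for chunk in iter_chunks(cur)]
con.close()
print(chunk_sizes)
```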

get_countries_list

def get_countries_list(
    self,
    q
)
Gets Countries list from the database

Args: q (type): list filter query string

Returns: featurecollection: geojson of country

get_country

def get_country(
    self,
    q
)
Gets specific country from the database

Args: cid (type): country cid

Returns: featurecollection: geojson of country

get_osm_feature

def get_osm_feature(
    self,
    osm_id
)
Returns geometry of osm_id in geojson

Args: osm_id (type): osm_id of feature

Returns: featurecollection: Geojson

S3FileTransfer

class S3FileTransfer(

)

Responsible for file transfer to S3 from the API machine.

Methods

get_bucket_location

def get_bucket_location(
    self,
    bucket_name
)
Provides the bucket location on AWS. Takes bucket_name as a string: the name of the repo on S3.

list_buckets

def list_buckets(
    self
)
Lists all the buckets available on S3.

upload

def upload(
    self,
    file_path,
    file_name,
    file_suffix=None
)
Transfers a file to S3 after reading its path from the user, and waits for the upload to complete.

Parameters: file_path: your local file path to upload. file_prefix: prefix for the stored filename.

Sample function call: S3FileTransfer.transfer(file_path="exports", file_prefix="upload_test")

Users

class Users(

)

Users class provides CRUD operations for interacting with the 'users' table in the database.

Methods:
- create_user(osm_id: int, role: int) -> Dict[str, Any]: Inserts a new user into the database.
- read_user(osm_id: int) -> Dict[str, Any]: Retrieves user information based on the given osm_id.
- update_user(osm_id: int, update_data: UserUpdate) -> Dict[str, Any]: Updates user information based on the given osm_id.
- delete_user(osm_id: int) -> Dict[str, Any]: Deletes a user based on the given osm_id.
- read_users(skip: int = 0, limit: int = 10) -> List[Dict[str, Any]]: Retrieves a list of users with optional pagination.

Usage: users = Users()

Methods

create_user

def create_user(
    self,
    osm_id,
    role
)
Inserts a new user into the 'users' table and returns the created user's osm_id.

Args: - osm_id (int): The OSM ID of the new user. - role (int): The role of the new user.

Returns: - Dict[str, Any]: A dictionary containing the osm_id of the newly created user.

Raises: - HTTPException: If the user creation fails.

delete_user

def delete_user(
    self,
    osm_id
)
Deletes a user based on the given osm_id.

Args: - osm_id (int): The OSM ID of the user to delete.

Returns: - Dict[str, Any]: A dictionary containing the deleted user information.

Raises: - HTTPException: If the user with the given osm_id is not found.

read_user

def read_user(
    self,
    osm_id
)
Retrieves user information based on the given osm_id.

Args: - osm_id (int): The OSM ID of the user to retrieve.

Returns: - Dict[str, Any]: A dictionary containing user information if the user is found. If the user is not found, returns a default user with 'role' set to 3.
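
The fallback behaviour described above can be sketched with an in-memory lookup. The `USERS` dictionary stands in for the 'users' table, and the exact shape of the default user (here including the requested `osm_id`) is an assumption.

```python
DEFAULT_ROLE = 3  # documented fallback role for users that are not found

# Hypothetical in-memory stand-in for the 'users' table.
USERS = {101: {"osm_id": 101, "role": 1}}


def read_user(osm_id):
    """Return the stored user, or a default user with role 3 when none exists."""
    user = USERS.get(osm_id)
    if user is None:
        return {"osm_id": osm_id, "role": DEFAULT_ROLE}
    return user


print(read_user(101))
print(read_user(999))
```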

Raises: - HTTPException: If there's an issue with the database query.

read_users

def read_users(
    self,
    skip=0,
    limit=10
)
Retrieves a list of users with optional pagination.

Args: - skip (int): The number of users to skip (for pagination). - limit (int): The maximum number of users to retrieve (for pagination).

Returns: - List[Dict[str, Any]]: A list of dictionaries containing user information.

update_user

def update_user(
    self,
    osm_id,
    update_data
)
Updates user information based on the given osm_id.

Args: - osm_id (int): The OSM ID of the user to update. - update_data (UserUpdate): The data to update for the user.

Returns: - Dict[str, Any]: A dictionary containing the updated user information.

Raises: - HTTPException: If the user with the given osm_id is not found.