Module src.app
Page contains Main core logic of app
Variables
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
BUCKET_NAME
DEFAULT_README_TEXT
ENABLE_CUSTOM_EXPORTS
ENABLE_HDX_EXPORTS
ENABLE_POLYGON_STATISTICS_ENDPOINTS
ENABLE_SOZIP
ENABLE_TILES
EXPORT_MAX_AREA_SQKM
EXPORT_TYPE_MAPPING
HDX_FILTER_CRITERIA
HDX_MARKDOWN
LOCAL_CON_POOL
MAX_WORKERS
PARALLEL_PROCESSING_CATEGORIES
POLYGON_STATISTICS_API_URL
PROCESS_SINGLE_CATEGORY_IN_POSTGRES
USE_DUCK_DB_FOR_CUSTOM_EXPORTS
USE_S3_TO_UPLOAD
database_instance
export_path
index_threshold
level
use_connection_pooling
Functions
check_for_json
def check_for_json(
result_str
)
Return: bool: True in case of success, False otherwise
convert_dict_to_conn_str
def convert_dict_to_conn_str(
db_dict
)
dict_none_clean
def dict_none_clean(
to_clean
)
generate_ogr2ogr_cmd_from_psql
def generate_ogr2ogr_cmd_from_psql(
export_file_path,
export_file_format_driver,
postgres_query,
layer_creation_options,
query_dump_path
)
print_psycopg2_exception
def print_psycopg2_exception(
err
)
run_ogr2ogr_cmd
def run_ogr2ogr_cmd(
cmd
)
Args: cmd (type): Command to run for subprocess binding_file_dir (type): description
Raises: Exception: If process gets failed
Classes
Cron
class Cron(
)
Methods
create_cron
def create_cron(
self,
cron_data
)
Args: cron_data (dict): Data for creating the Cron entry.
Returns: dict: Result of the cron creation process.
delete_cron
def delete_cron(
self,
cron_id: int
)
Args: cron_id (int): ID of the Cron entry to delete.
Returns: dict: Result of the Cron deletion process.
Raises: HTTPException: If the Cron entry is not found.
get_cron_by_id
def get_cron_by_id(
self,
cron_id: int
)
Args: cron_id (int): ID of the Cron entry to retrieve.
Returns: dict: Details of the requested Cron entry.
Raises: HTTPException: If the Cron entry is not found.
get_cron_list_with_filters
def get_cron_list_with_filters(
self,
skip: int = 0,
limit: int = 10,
filters: dict = {}
)
Args: skip (int): Number of entries to skip. limit (int): Maximum number of entries to retrieve. filters (dict): Filtering criteria.
Returns: List[dict]: List of Cron entries.
patch_cron
def patch_cron(
self,
cron_id: int,
cron_data: dict
)
Args: cron_id (int): ID of the Cron entry to update. cron_data (dict): Data for partially updating the Cron entry.
Returns: dict: Result of the Cron update process.
Raises: HTTPException: If the Cron entry is not found.
search_cron_by_dataset_title
def search_cron_by_dataset_title(
self,
dataset_title: str,
skip: int = 0,
limit: int = 10
)
Args: dataset_title (str): The title of the dataset to search for. skip (int): Number of entries to skip. limit (int): Maximum number of entries to retrieve.
Returns: List[dict]: List of Cron entries matching the dataset title.
update_cron
def update_cron(
self,
cron_id: int,
cron_data
)
Args: cron_id (int): ID of the Cron entry to update. cron_data (dict): Data for updating the Cron entry.
Returns: dict: Result of the Cron update process.
Raises: HTTPException: If the Cron entry is not found.
CustomExport
class CustomExport(
params,
uid=None
)
Constructor for the custom export class.
Parameters: - params (DynamicCategoriesModel): An instance of DynamicCategoriesModel containing configuration settings.
Methods
clean_resources
def clean_resources(
self
)
file_to_zip
def file_to_zip(
self,
working_dir,
zip_path
)
Parameters: - working_dir (str): Path to the directory containing files to be zipped. - zip_path (str): Path to the resulting ZIP file.
Returns: - Path to the created ZIP file.
format_where_clause_duckdb
def format_where_clause_duckdb(
self,
where_clause
)
Parameters: - where_clause (str): SQL-like condition to filter features.
Returns: - Formatted where_clause.
process_category
def process_category(
self,
category
)
Parameters: - category (Dict[str, CategoryModel]): Dictionary representing a category.
Returns: - List of resource dictionaries containing export information.
process_category_result
def process_category_result(
self,
category_result
)
Parameters: - category_result (CategoryResult): Instance of CategoryResult.
Returns: - Dictionary containing processed category result.
process_custom_categories
def process_custom_categories(
self
)
Returns: - Dictionary containing the processed dataset information.
query_to_file
def query_to_file(
self,
query,
category_name,
feature_type,
export_formats
)
Parameters: - query (str): SQL query to execute. - category_name (str): Name of the category. - feature_type (str): Feature type. - export_formats (List[ExportTypeInfo]): List of export formats.
Returns: - List of resource dictionaries containing export information.
resource_to_hdx
def resource_to_hdx(
self,
uploaded_resources,
dataset_config,
category
)
Parameters: - uploaded_resources (List[Dict[str, Any]]): List of resource dictionaries. - dataset_config (DatasetConfig): Instance of DatasetConfig. - category (Dict[str, CategoryModel]): Dictionary representing a category.
Returns: - Dictionary containing the HDX upload information.
resource_to_response
def resource_to_response(
self,
uploaded_resources,
category
)
Parameters: - uploaded_resources (List[Dict[str, Any]]): List of resource dictionaries. - category (Dict[str, CategoryModel]): Dictionary representing a category.
Returns: - Dictionary containing the response information.
types_to_tables
def types_to_tables(
self,
type_list: list
)
Parameters: - type_list (List[str]): List of feature types.
Returns: - List of database tables associated with the given feature types.
upload_resources
def upload_resources(
self,
resource_path
)
Parameters: - resource_path (str): Path to the resource file on the local filesystem.
Returns: - Download URL for the uploaded resource.
zip_to_s3
def zip_to_s3(
self,
resources
)
Parameters: - resources (List[Dict[str, Any]]): List of resource dictionaries.
Returns: - List of resource dictionaries with added download URLs.
Database
class Database(
db_params
)
Database class is used to connect with your database , run query and get result from it . It has all tests and validation inside class
Methods
close_conn
def close_conn(
self
)
connect
def connect(
self
)
executequery
def executequery(
self,
query
)
DownloadMetrics
class DownloadMetrics(
)
Methods
get_summary_stats
def get_summary_stats(
self,
start_date,
end_date,
group_by
)
DuckDB
class DuckDB(
db_path,
temp_dir=None
)
Constructor for the DuckDB class.
Parameters: - db_path (str): The path to the DuckDB database file.
Methods
run_query
def run_query(
self,
query,
attach_pgsql=False,
load_spatial=False
)
Parameters: - query (str): The SQL query to execute. - attach_pgsql (bool): Flag to indicate whether to attach a PostgreSQL database. - load_spatial (bool): Flag to indicate whether to load the spatial extension.
HDXUploader
class HDXUploader(
category,
hdx,
uuid,
default_category_path,
completeness_metadata=None
)
Constructor for the HDXUploader class.
Parameters: - category (Dict[str, CategoryModel]): Dictionary representing a category. - hdx (HDX): Instance of the HDX class. - uuid (str): Universally unique identifier. - default_category_path (str): Default path for the category. - completeness_metadata (Optional[Dict[str, Any]]): Metadata for completeness.
Methods
add_notes
def add_notes(
self
)
Returns: - Notes string.
add_resource
def add_resource(
self,
resource_meta
)
Parameters: - resource_meta (Dict[str, Any]): Metadata for the resource.
init_dataset
def init_dataset(
self
)
slugify
def slugify(
self,
name
)
Parameters: - name (str): Input string.
Returns: - Slugified string.
upload_dataset
def upload_dataset(
self,
dump_config_to_s3=False
)
Parameters: - dump_config_to_s3 (bool): Flag to indicate whether to dump configuration to S3.
Returns: - Tuple containing category name and dataset information.
PolygonStats
class PolygonStats(
geojson=None,
iso3=None
)
Generates stats for polygon
Static methods
get_building_pattern_statement
def get_building_pattern_statement(
osm_building_count,
ai_building_count,
avg_timestamp,
last_edit_timestamp,
osm_building_count_6_months
)
Args: osm_building_count (int): Count of buildings from OpenStreetMap. ai_building_count (int): Count of buildings from AI estimates. avg_timestamp (timestamp): Average timestamp of data. last_edit_timestamp(timestamp): Last edit timestamp of an area osm_building_count_6_months (int): Count of buildings updated in the last 6 months.
Returns: str: Human-readable building statement.
get_road_pattern_statement
def get_road_pattern_statement(
osm_highway_length,
ai_highway_length,
avg_timestamp,
last_edit_timestamp,
osm_highway_length_6_months
)
Args: osm_highway_length (float): Length of roads from OpenStreetMap. ai_highway_length (float): Length of roads from AI estimates. avg_timestamp (str): Average timestamp of data. osm_highway_length_6_months (float): Length of roads updated in the last 6 months.
Returns: str: Human-readable road statement.
Methods
get_osm_analytics_meta_stats
def get_osm_analytics_meta_stats(
self
)
Returns: dict: Raw statistics translated into JSON.
get_summary_stats
def get_summary_stats(
self
)
Returns: dict: Summary statistics including building and road statements.
RawData
class RawData(
parameters=None,
request_uid='raw-data-api',
dbdict=None
)
Class responsible for the Rawdata Extraction from available sources , Currently Works for Underpass source Current Snapshot Returns: Geojson Zip file Supports: -Any Key value pair of osm tags -A Polygon -Osm element type (Optional)
Static methods
close_con
def close_con(
con
)
get_grid_id
def get_grid_id(
geom,
cur
)
Args: geom (type): description cur (type): description
Returns: type: grid id , geometry dump and the area of geometry
ogr_export
def ogr_export(
query,
outputtype,
working_dir,
dump_temp_path,
params
)
Args: query (type): Postgresql query to extract outputtype (type): description working_dir (type): description dump_temp_path (type): temp file path for metadata gen params (type): description
ogr_export_shp
def ogr_export_shp(
point_query,
line_query,
poly_query,
working_dir,
file_name
)
query2geojson
def query2geojson(
con,
extraction_query,
dump_temp_file_path
)
Methods
check_status
def check_status(
self
)
cleanup
def cleanup(
self
)
extract_current_data
def extract_current_data(
self,
exportname
)
Returns: geom_area: area of polygon supplied working_dir: dir where results are saved
get_countries_list
def get_countries_list(
self,
q
)
Args: q (type): list filter query string
Returns: featurecollection: geojson of country
get_country
def get_country(
self,
q
)
Args: cid (type): country cid
Returns: featurecollection: geojson of country
get_osm_feature
def get_osm_feature(
self,
osm_id
)
Args: osm_id (type): osm_id of feature
Returns: featurecollection: Geojson
S3FileTransfer
class S3FileTransfer(
)
Responsible for the file transfer to s3 from API maachine
Methods
get_bucket_location
def get_bucket_location(
self,
bucket_name
)
list_buckets
def list_buckets(
self
)
upload
def upload(
self,
file_path,
file_name,
file_suffix=None
)
Users
class Users(
)
Users class provides CRUD operations for interacting with the 'users' table in the database.
Methods: - create_user(osm_id: int, role: int) -> Dict[str, Any]: Inserts a new user into the database. - read_user(osm_id: int) -> Dict[str, Any]: Retrieves user information based on the given osm_id. - update_user(osm_id: int, update_data: UserUpdate) -> Dict[str, Any]: Updates user information based on the given osm_id. - delete_user(osm_id: int) -> Dict[str, Any]: Deletes a user based on the given osm_id. - read_users(skip: int = 0, limit: int = 10) -> List[Dict[str, Any]]: Retrieves a list of users with optional pagination.
Usage: users = Users()
Methods
create_user
def create_user(
self,
osm_id,
role
)
Args: - osm_id (int): The OSM ID of the new user. - role (int): The role of the new user.
Returns: - Dict[str, Any]: A dictionary containing the osm_id of the newly created user.
Raises: - HTTPException: If the user creation fails.
delete_user
def delete_user(
self,
osm_id
)
Args: - osm_id (int): The OSM ID of the user to delete.
Returns: - Dict[str, Any]: A dictionary containing the deleted user information.
Raises: - HTTPException: If the user with the given osm_id is not found.
read_user
def read_user(
self,
osm_id
)
Args: - osm_id (int): The OSM ID of the user to retrieve.
Returns: - Dict[str, Any]: A dictionary containing user information if the user is found. If the user is not found, returns a default user with 'role' set to 3.
Raises: - HTTPException: If there's an issue with the database query.
read_users
def read_users(
self,
skip=0,
limit=10
)
Args: - skip (int): The number of users to skip (for pagination). - limit (int): The maximum number of users to retrieve (for pagination).
Returns: - List[Dict[str, Any]]: A list of dictionaries containing user information.
update_user
def update_user(
self,
osm_id,
update_data
)
Args: - osm_id (int): The OSM ID of the user to update. - update_data (UserUpdate): The data to update for the user.
Returns: - Dict[str, Any]: A dictionary containing the updated user information.
Raises: - HTTPException: If the user with the given osm_id is not found.