Skip to content

API Docs

Raw Data API Python Client

Client

Get OSM data for a specified area.

This is a convenience wrapper around RawDataClient.get_osm_data.

Parameters:

Name Type Description Default
geometry Union[Dict[str, Any], str]

GeoJSON geometry object or string

required
**kwargs

Additional parameters for customizing the request: fileName – name for the export file (default: "osm_export"); outputType – format of the output (default: "geojson"); bindZip – whether to retrieve results as a zip file (default: False); filters – dictionary of filters to apply; geometryType – list of geometry types to include.

{}

Returns:

Type Description
RawDataResult

Result object describing the download; its path attribute points to the downloaded data file or directory

Raises:

Type Description
ValidationError

If inputs are invalid

APIRequestError

If the API request fails

TaskPollingError

If polling the task status fails

DownloadError

If downloading data fails

Source code in osm_data_client/client.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
async def get_osm_data(geometry: Union[Dict[str, Any], str], **kwargs) -> RawDataResult:
    """
    Fetch OSM data for an area using a client with default configuration.

    Module-level shortcut: it builds a ``RawDataClient`` with no arguments
    and delegates to ``RawDataClient.get_osm_data``. Every keyword argument
    is forwarded unchanged, so ``output_options`` may be supplied here too.

    Args:
        geometry: GeoJSON geometry object or string
        **kwargs: Additional parameters for customizing the request
            - fileName: Name for the export file (default: "osm_export")
            - outputType: Format of the output (default: "geojson")
            - bindZip: Whether to retrieve results as a zip file (default: False)
            - filters: Dictionary of filters to apply
            - geometryType: List of geometry types to include

    Returns:
        RawDataResult describing the downloaded data file or directory

    Raises:
        ValidationError: If inputs are invalid
        APIRequestError: If the API request fails
        TaskPollingError: If polling the task status fails
        DownloadError: If downloading data fails
    """
    return await RawDataClient().get_osm_data(geometry, **kwargs)

options: show_source: false heading_level: 3

Client for fetching OSM data via the HOTOSM Raw Data API.

This client provides a high-level interface for requesting and downloading OpenStreetMap data for a specified area with customizable filters.

Source code in osm_data_client/client.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
class RawDataClient:
    """
    Client for fetching OSM data via the HOTOSM Raw Data API.

    This client provides a high-level interface for requesting and downloading
    OpenStreetMap data for a specified area with customizable filters.
    """

    def __init__(self, config: RawDataClientConfig = RawDataClientConfig.default()):
        """
        Initialize the client.

        Args:
            config: Configuration for the client. The default value is built
                once, when the class body is executed, so clients created
                without an explicit config all share that single object.
        """
        self.config = config
        self.api = RawDataAPI(config)

    async def get_osm_data(
        self,
        geometry: Union[Dict[str, Any], str],
        output_options: RawDataOutputOptions = RawDataOutputOptions.default(),
        **kwargs,
    ) -> RawDataResult:
        """
        Get OSM data for a specified area.

        Runs the full workflow: validate the inputs, request a snapshot,
        poll the returned task until it finishes, then download the export.

        Args:
            geometry: GeoJSON geometry object or string
            output_options: Options for controlling output behavior. The
                default is evaluated once at definition time and shared
                between calls that do not pass their own value.
            **kwargs: Additional parameters for customizing the request
                - fileName: Name for the export file (default: "osm_export")
                - outputType: Format of the output (default: "geojson")
                - bindZip: Whether to retrieve results as a zip file (default: False)
                - filters: Dictionary of filters to apply
                - geometryType: List of geometry types to include

        Returns:
            RawDataResult describing the downloaded data file or directory

        Raises:
            ValidationError: If inputs are invalid
            APIRequestError: If the API request fails
            TaskPollingError: If polling the task status fails, or the
                snapshot response carries no "track_link"
            DownloadError: If the task does not end in "SUCCESS" or
                downloading data fails

        Examples:
            >>> client = RawDataClient()
            >>> result = await client.get_osm_data(
            ...     {"type": "Polygon", "coordinates": [...]},
            ...     fileName="my_buildings",
            ...     outputType="geojson",
            ...     filters={"tags": {"all_geometry": {"building": []}}}
            ... )
        """
        # Validate inputs before any network traffic happens
        geometry_input = GeometryInput.from_input(geometry)
        params = RequestParams.from_kwargs(**kwargs)

        # Request snapshot
        log.info("Requesting OSM data snapshot for %s", params.file_name)
        task_response = await self.api.request_snapshot(geometry_input, params)

        # Get task link for polling
        task_link = task_response.get("track_link")
        if not task_link:
            raise TaskPollingError("No task link found in API response")

        # Poll for task completion
        result = await self.api.poll_task_status(task_link)

        if result["status"] != "SUCCESS":
            error_msg = f"Task failed with status: {result['status']}"
            # "result" may be present but null (or not a mapping); calling
            # .get on it would raise AttributeError and hide the real failure.
            details = result.get("result")
            detail_msg = details.get("error_msg") if isinstance(details, dict) else None
            if detail_msg:
                error_msg += f" - {detail_msg}"
            raise DownloadError(error_msg)

        # Create metadata from the result
        metadata = RawDataApiMetadata.from_api_result(result, params)
        log.debug("Data metadata: %s", metadata)

        # Download the data
        return await self.api.download_to_disk(metadata, output_options)

__init__(config=RawDataClientConfig.default())

Initialize the client.

Parameters:

Name Type Description Default
config RawDataClientConfig

Configuration for the client

default()
Source code in osm_data_client/client.py
238
239
240
241
242
243
244
245
246
def __init__(self, config: RawDataClientConfig = RawDataClientConfig.default()):
    """
    Store the configuration and build the low-level API wrapper.

    Args:
        config: Configuration for the client; the same object is handed to
            the ``RawDataAPI`` instance kept on ``self.api``
    """
    self.config = config
    self.api = RawDataAPI(self.config)

get_osm_data(geometry, output_options=RawDataOutputOptions.default(), **kwargs) async

Get OSM data for a specified area.

Parameters:

Name Type Description Default
geometry Union[Dict[str, Any], str]

GeoJSON geometry object or string

required
output_options RawDataOutputOptions

Options for controlling output behavior

default()
**kwargs

Additional parameters for customizing the request: fileName – name for the export file (default: "osm_export"); outputType – format of the output (default: "geojson"); bindZip – whether to retrieve results as a zip file (default: False); filters – dictionary of filters to apply; geometryType – list of geometry types to include.

{}

Returns:

Type Description
RawDataResult

Result object describing the download; its path attribute points to the downloaded data file or directory

Raises:

Type Description
ValidationError

If inputs are invalid

APIRequestError

If the API request fails

TaskPollingError

If polling the task status fails

DownloadError

If downloading data fails

Examples:

>>> data_path = await get_osm_data(
...     {"type": "Polygon", "coordinates": [...]},
...     fileName="my_buildings",
...     outputType="geojson",
...     filters={"tags": {"all_geometry": {"building": []}}}
... )
Source code in osm_data_client/client.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
async def get_osm_data(
    self,
    geometry: Union[Dict[str, Any], str],
    output_options: RawDataOutputOptions = RawDataOutputOptions.default(),
    **kwargs,
) -> RawDataResult:
    """
    Get OSM data for a specified area.

    Runs the full workflow: validate the inputs, request a snapshot, poll
    the returned task until it finishes, then download the export.

    Args:
        geometry: GeoJSON geometry object or string
        output_options: Options for controlling output behavior. The
            default is evaluated once at definition time and shared between
            calls that do not pass their own value.
        **kwargs: Additional parameters for customizing the request
            - fileName: Name for the export file (default: "osm_export")
            - outputType: Format of the output (default: "geojson")
            - bindZip: Whether to retrieve results as a zip file (default: False)
            - filters: Dictionary of filters to apply
            - geometryType: List of geometry types to include

    Returns:
        RawDataResult describing the downloaded data file or directory

    Raises:
        ValidationError: If inputs are invalid
        APIRequestError: If the API request fails
        TaskPollingError: If polling the task status fails, or the snapshot
            response carries no "track_link"
        DownloadError: If the task does not end in "SUCCESS" or downloading
            data fails

    Examples:
        >>> client = RawDataClient()
        >>> result = await client.get_osm_data(
        ...     {"type": "Polygon", "coordinates": [...]},
        ...     fileName="my_buildings",
        ...     outputType="geojson",
        ...     filters={"tags": {"all_geometry": {"building": []}}}
        ... )
    """
    # Validate inputs before any network traffic happens
    geometry_input = GeometryInput.from_input(geometry)
    params = RequestParams.from_kwargs(**kwargs)

    # Request snapshot
    log.info("Requesting OSM data snapshot for %s", params.file_name)
    task_response = await self.api.request_snapshot(geometry_input, params)

    # Get task link for polling
    task_link = task_response.get("track_link")
    if not task_link:
        raise TaskPollingError("No task link found in API response")

    # Poll for task completion
    result = await self.api.poll_task_status(task_link)

    if result["status"] != "SUCCESS":
        error_msg = f"Task failed with status: {result['status']}"
        # "result" may be present but null (or not a mapping); calling .get
        # on it would raise AttributeError and hide the real failure.
        details = result.get("result")
        detail_msg = details.get("error_msg") if isinstance(details, dict) else None
        if detail_msg:
            error_msg += f" - {detail_msg}"
        raise DownloadError(error_msg)

    # Create metadata from the result
    metadata = RawDataApiMetadata.from_api_result(result, params)
    log.debug("Data metadata: %s", metadata)

    # Download the data
    return await self.api.download_to_disk(metadata, output_options)

options: show_source: false heading_level: 3

Client for the HOTOSM Raw Data API.

Source code in osm_data_client/client.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
class RawDataAPI:
    """
    Client for the HOTOSM Raw Data API.

    Wraps the three HTTP interactions defined here: submitting a snapshot
    request, polling the resulting task, and streaming the finished export
    to disk. Each call opens its own ``ClientSession``.
    """

    def __init__(self, config: RawDataClientConfig = RawDataClientConfig.default()):
        """
        Initialize the API client.

        Args:
            config: Configuration for the client. When it carries an
                ``access_token``, a bearer ``Authorization`` header is added
                to the headers sent with every request.
        """
        self.config = config
        self.headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
            "Referer": "raw-data-client-py",
        }

        if config.access_token:
            self.headers["Authorization"] = f"Bearer {config.access_token}"
            log.debug("Using access token for authentication")

    async def request_snapshot(
        self, geometry: GeometryInput, params: RequestParams
    ) -> Dict[str, Any]:
        """
        Request a snapshot of OSM data.

        Args:
            geometry: Validated GeoJSON geometry object
            params: Validated request parameters

        Returns:
            API response with task tracking information

        Raises:
            APIRequestError: If the API answers with HTTP status >= 400 (the
                error then carries that status and the decoded body), or if
                the request fails for any other reason
        """
        payload = {
            **params.to_api_params(),
            "geometry": geometry.to_dict(),
        }
        # Serialize once; the same JSON text is both logged and sent.
        body = json.dumps(payload)

        log.debug("Requesting snapshot with params: %s", body)

        async with ClientSession() as session:
            try:
                async with session.post(
                    f"{self.config.base_api_url}/snapshot/",
                    data=body,
                    headers=self.headers,
                ) as response:
                    response_data = await response.json()
                    if response.status >= 400:
                        log.error(
                            "API request failed with status %d: %s",
                            response.status,
                            response_data,
                        )
                        raise APIRequestError(response.status, response_data)

                    # Log queue information if available. The position is
                    # informational only, so a non-numeric value must not
                    # turn a successful request into an error.
                    if "queue" in response_data:
                        queue_position = response_data.get("queue", 0)
                        if (
                            isinstance(queue_position, (int, float))
                            and queue_position > 0
                        ):
                            log.info("Request queued at position %d", queue_position)

                    log.debug("Snapshot request successful: %s", response_data)
                    return response_data
            except APIRequestError:
                # Raised above with the real HTTP status and body; letting the
                # generic handler re-wrap it would replace both with 0 / {}.
                raise
            except ClientResponseError as ex:
                log.error("API client error: %s", str(ex))
                raise APIRequestError(ex.status, {}, str(ex)) from ex
            except Exception as ex:
                log.error("Unexpected error in API request: %s", str(ex))
                raise APIRequestError(0, {}, str(ex)) from ex

    async def poll_task_status(
        self, task_link: str, polling_interval: int = 2
    ) -> Dict[str, Any]:
        """
        Poll the API to check task status until completion.

        Args:
            task_link: Task tracking URL, appended to ``config.base_api_url``
            polling_interval: Seconds between polling attempts

        Returns:
            The decoded status payload of the first response whose status is
            "SUCCESS" or "FAILED"

        Raises:
            TaskPollingError: If a poll returns HTTP status >= 400 or any
                other error occurs while polling
        """
        log.info("Starting task polling: %s", task_link)

        # Track previous status to log changes
        previous_status = None

        # NOTE(review): only "SUCCESS" and "FAILED" end the loop; any other
        # status keeps polling with no upper bound - confirm these are the
        # only terminal states the API reports.
        async with ClientSession() as session:
            while True:
                try:
                    async with session.get(
                        url=f"{self.config.base_api_url}{task_link}",
                        headers=self.headers,
                    ) as response:
                        if response.status >= 400:
                            response_data = await response.json()
                            log.error(
                                "Polling failed with status %d: %s",
                                response.status,
                                response_data,
                            )
                            raise TaskPollingError(
                                f"Polling failed with status {response.status}: {response_data}"
                            )

                        result = await response.json()
                        current_status = result.get("status")

                        # Log status changes
                        if current_status != previous_status:
                            log.info("Task status: %s", current_status)
                            previous_status = current_status

                        if current_status in ("SUCCESS", "FAILED"):
                            if current_status == "FAILED":
                                # A null or non-dict "result" would make .get
                                # raise AttributeError, which the handler
                                # below would report as a polling error
                                # instead of returning the failed task.
                                details = result.get("result")
                                if isinstance(details, dict):
                                    error_msg = details.get(
                                        "error_msg", "Unknown error"
                                    )
                                else:
                                    error_msg = "Unknown error"
                                log.error("Task failed: %s", error_msg)
                            else:
                                log.info("Task completed successfully")
                            return result

                        log.debug(
                            "Task still processing, waiting %d seconds",
                            polling_interval,
                        )
                        await asyncio.sleep(polling_interval)
                except TaskPollingError:
                    raise
                except Exception as ex:
                    log.error("Error polling task status: %s", str(ex))
                    raise TaskPollingError(
                        f"Error polling task status: {str(ex)}"
                    ) from ex

    async def download_to_disk(
        self,
        data: RawDataApiMetadata,
        options: RawDataOutputOptions = RawDataOutputOptions.default(),
    ) -> RawDataResult:
        """
        Stream data from API to disk

        Args:
            data: Metadata containing download information
            options: Options for controlling extraction behavior

        Returns:
            RawDataResult with information about the downloaded file

        Raises:
            DownloadError: If downloading or processing fails
        """
        processor = OutputProcessor(self.config, options)
        file_path = processor.get_output_path(data)

        file_path.parent.mkdir(parents=True, exist_ok=True)
        log.info("Downloading data to %s (%s bytes)", file_path, data.size_bytes)

        # Progress is reported each time another 10MB has been received.
        progress_step = 10 * 1024 * 1024

        try:
            async with ClientSession() as session:
                async with session.get(
                    data.download_url, headers=self.headers
                ) as response:
                    if response.status >= 400:
                        log.error("Download failed with status %d", response.status)
                        raise DownloadError(
                            f"Download failed with status {response.status}"
                        )

                    with open(file_path, "wb") as f:
                        log.debug("Streaming file contents using 1MB chunks")
                        downloaded_bytes = 0
                        next_report = progress_step
                        async for chunk in response.content.iter_chunked(
                            1024 * 1024
                        ):  # chunks of at most 1MB
                            f.write(chunk)
                            downloaded_bytes += len(chunk)
                            # Chunks may be shorter than 1MB, so the running
                            # total rarely lands exactly on a 10MB multiple;
                            # compare against a moving threshold instead.
                            if (
                                data.size_bytes > progress_step
                                and downloaded_bytes >= next_report
                            ):
                                progress = (downloaded_bytes / data.size_bytes) * 100
                                log.info(
                                    "Download progress: %.1f%% (%d/%d bytes)",
                                    progress,
                                    downloaded_bytes,
                                    data.size_bytes,
                                )
                                next_report = (
                                    downloaded_bytes // progress_step + 1
                                ) * progress_step

                    log.info("Download complete: %s", file_path)

                    return await processor.process_download(file_path, data)

        except DownloadError:
            # Already logged and typed correctly; avoid wrapping it a second
            # time in another DownloadError.
            raise
        except Exception as ex:
            log.error("Error downloading data: %s", str(ex))
            raise DownloadError(f"Error downloading data: {str(ex)}") from ex

__init__(config=RawDataClientConfig.default())

Initialize the API client.

Parameters:

Name Type Description Default
config RawDataClientConfig

Configuration for the client

default()
Source code in osm_data_client/client.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, config: RawDataClientConfig = RawDataClientConfig.default()):
    """
    Store the configuration and prepare the request headers.

    Args:
        config: Configuration for the client; when its ``access_token`` is
            truthy, a bearer ``Authorization`` header is added
    """
    self.config = config

    # Headers sent with every request made through this object
    base_headers = {
        "accept": "application/json",
        "Content-Type": "application/json",
        "Referer": "raw-data-client-py",
    }
    self.headers = base_headers

    # Anonymous access: nothing more to set up
    if not config.access_token:
        return

    self.headers["Authorization"] = f"Bearer {config.access_token}"
    log.debug("Using access token for authentication")

download_to_disk(data, options=RawDataOutputOptions.default()) async

Stream data from API to disk

Parameters:

Name Type Description Default
data RawDataApiMetadata

Metadata containing download information

required
options RawDataOutputOptions

Options for controlling extraction behavior

default()

Returns:

Type Description
RawDataResult

RawDataResult with information about the downloaded file

Raises:

Type Description
DownloadError

If downloading or processing fails

Source code in osm_data_client/client.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
async def download_to_disk(
    self,
    data: RawDataApiMetadata,
    options: RawDataOutputOptions = RawDataOutputOptions.default(),
) -> RawDataResult:
    """
    Stream data from API to disk

    Args:
        data: Metadata containing download information
        options: Options for controlling extraction behavior

    Returns:
        RawDataResult with information about the downloaded file

    Raises:
        DownloadError: If downloading or processing fails
    """
    processor = OutputProcessor(self.config, options)
    file_path = processor.get_output_path(data)

    file_path.parent.mkdir(parents=True, exist_ok=True)
    log.info("Downloading data to %s (%s bytes)", file_path, data.size_bytes)

    # Progress is reported each time another 10MB has been received.
    progress_step = 10 * 1024 * 1024

    try:
        async with ClientSession() as session:
            async with session.get(
                data.download_url, headers=self.headers
            ) as response:
                if response.status >= 400:
                    log.error("Download failed with status %d", response.status)
                    raise DownloadError(
                        f"Download failed with status {response.status}"
                    )

                with open(file_path, "wb") as f:
                    log.debug("Streaming file contents using 1MB chunks")
                    downloaded_bytes = 0
                    next_report = progress_step
                    async for chunk in response.content.iter_chunked(
                        1024 * 1024
                    ):  # chunks of at most 1MB
                        f.write(chunk)
                        downloaded_bytes += len(chunk)
                        # Chunks may be shorter than 1MB, so the running
                        # total rarely lands exactly on a 10MB multiple;
                        # compare against a moving threshold instead.
                        if (
                            data.size_bytes > progress_step
                            and downloaded_bytes >= next_report
                        ):
                            progress = (downloaded_bytes / data.size_bytes) * 100
                            log.info(
                                "Download progress: %.1f%% (%d/%d bytes)",
                                progress,
                                downloaded_bytes,
                                data.size_bytes,
                            )
                            next_report = (
                                downloaded_bytes // progress_step + 1
                            ) * progress_step

                log.info("Download complete: %s", file_path)

                return await processor.process_download(file_path, data)

    except DownloadError:
        # Already logged and typed correctly; avoid wrapping it a second
        # time in another DownloadError.
        raise
    except Exception as ex:
        log.error("Error downloading data: %s", str(ex))
        raise DownloadError(f"Error downloading data: {str(ex)}") from ex

poll_task_status(task_link, polling_interval=2) async

Poll the API to check task status until completion.

Parameters:

Name Type Description Default
task_link str

Task tracking URL

required
polling_interval int

Seconds between polling attempts

2

Returns:

Type Description
Dict[str, Any]

Task status details

Raises:

Type Description
TaskPollingError

If polling fails

Source code in osm_data_client/client.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def poll_task_status(
    self, task_link: str, polling_interval: int = 2
) -> Dict[str, Any]:
    """
    Poll the API to check task status until completion.

    Args:
        task_link: Task tracking URL, appended to ``config.base_api_url``
        polling_interval: Seconds between polling attempts

    Returns:
        The decoded status payload of the first response whose status is
        "SUCCESS" or "FAILED"

    Raises:
        TaskPollingError: If a poll returns HTTP status >= 400 or any other
            error occurs while polling
    """
    log.info("Starting task polling: %s", task_link)

    # Track previous status to log changes
    previous_status = None

    # NOTE(review): only "SUCCESS" and "FAILED" end the loop; any other
    # status keeps polling with no upper bound - confirm these are the only
    # terminal states the API reports.
    async with ClientSession() as session:
        while True:
            try:
                async with session.get(
                    url=f"{self.config.base_api_url}{task_link}",
                    headers=self.headers,
                ) as response:
                    if response.status >= 400:
                        response_data = await response.json()
                        log.error(
                            "Polling failed with status %d: %s",
                            response.status,
                            response_data,
                        )
                        raise TaskPollingError(
                            f"Polling failed with status {response.status}: {response_data}"
                        )

                    result = await response.json()
                    current_status = result.get("status")

                    # Log status changes
                    if current_status != previous_status:
                        log.info("Task status: %s", current_status)
                        previous_status = current_status

                    if current_status in ("SUCCESS", "FAILED"):
                        if current_status == "FAILED":
                            # A null or non-dict "result" would make .get
                            # raise AttributeError, which the handler below
                            # would report as a polling error instead of
                            # returning the failed task.
                            details = result.get("result")
                            if isinstance(details, dict):
                                error_msg = details.get(
                                    "error_msg", "Unknown error"
                                )
                            else:
                                error_msg = "Unknown error"
                            log.error("Task failed: %s", error_msg)
                        else:
                            log.info("Task completed successfully")
                        return result

                    log.debug(
                        "Task still processing, waiting %d seconds",
                        polling_interval,
                    )
                    await asyncio.sleep(polling_interval)
            except TaskPollingError:
                raise
            except Exception as ex:
                log.error("Error polling task status: %s", str(ex))
                raise TaskPollingError(
                    f"Error polling task status: {str(ex)}"
                ) from ex

request_snapshot(geometry, params) async

Request a snapshot of OSM data.

Parameters:

Name Type Description Default
geometry GeometryInput

Validated GeoJSON geometry object

required
params RequestParams

Validated request parameters

required

Returns:

Type Description
Dict[str, Any]

API response with task tracking information

Raises:

Type Description
APIRequestError

If the API request fails

Source code in osm_data_client/client.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def request_snapshot(
    self, geometry: GeometryInput, params: RequestParams
) -> Dict[str, Any]:
    """
    Request a snapshot of OSM data.

    Args:
        geometry: Validated GeoJSON geometry object
        params: Validated request parameters

    Returns:
        API response with task tracking information

    Raises:
        APIRequestError: If the API answers with HTTP status >= 400 (the
            error then carries that status and the decoded body), or if the
            request fails for any other reason
    """
    payload = {
        **params.to_api_params(),
        "geometry": geometry.to_dict(),
    }
    # Serialize once; the same JSON text is both logged and sent.
    body = json.dumps(payload)

    log.debug("Requesting snapshot with params: %s", body)

    async with ClientSession() as session:
        try:
            async with session.post(
                f"{self.config.base_api_url}/snapshot/",
                data=body,
                headers=self.headers,
            ) as response:
                response_data = await response.json()
                if response.status >= 400:
                    log.error(
                        "API request failed with status %d: %s",
                        response.status,
                        response_data,
                    )
                    raise APIRequestError(response.status, response_data)

                # Log queue information if available. The position is
                # informational only, so a non-numeric value must not turn
                # a successful request into an error.
                if "queue" in response_data:
                    queue_position = response_data.get("queue", 0)
                    if (
                        isinstance(queue_position, (int, float))
                        and queue_position > 0
                    ):
                        log.info("Request queued at position %d", queue_position)

                log.debug("Snapshot request successful: %s", response_data)
                return response_data
        except APIRequestError:
            # Raised above with the real HTTP status and body; letting the
            # generic handler re-wrap it would replace both with 0 / {}.
            raise
        except ClientResponseError as ex:
            log.error("API client error: %s", str(ex))
            raise APIRequestError(ex.status, {}, str(ex)) from ex
        except Exception as ex:
            log.error("Unexpected error in API request: %s", str(ex))
            raise APIRequestError(0, {}, str(ex)) from ex

options: show_source: false heading_level: 3

Results

Result object containing processed file path and associated metadata.

Attributes:

Name Type Description
path Path

Path to the final processed file or directory

metadata RawDataApiMetadata

Original metadata from the API response

extracted bool

Whether the file was extracted from an archive

original_path Optional[Path]

Path to the original downloaded file (if different from path)

extracted_files Optional[List[Path]]

List of files that were extracted (if applicable)

Source code in osm_data_client/processing.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@dataclass
class RawDataResult:
    """
    Outcome of a download: where the data ended up plus its API metadata.

    Attributes:
        path: Path to the final processed file or directory
        metadata: Original metadata from the API response
        extracted: Whether the file was extracted from an archive
        original_path: Path to the original downloaded file (if different from path)
        extracted_files: List of files that were extracted (if applicable)
    """

    path: Path
    metadata: RawDataApiMetadata
    extracted: bool = False
    original_path: Optional[Path] = None
    extracted_files: Optional[List[Path]] = None

    def __str__(self) -> str:
        """Render the result as the string form of ``path``."""
        rendered = str(self.path)
        return rendered

    def exists(self) -> bool:
        """Report whether ``path`` currently exists."""
        location = self.path
        return location.exists()

__str__()

Return string representation of the result.

Source code in osm_data_client/processing.py
37
38
39
def __str__(self) -> str:
    """Return string representation of the result."""
    return str(self.path)

exists()

Check if the result file or directory exists.

Source code in osm_data_client/processing.py
33
34
35
def exists(self) -> bool:
    """Report whether the location stored in ``self.path`` currently exists."""
    location = self.path
    return location.exists()

options: show_source: false heading_level: 3

Models

Validated geometry input for OSM API requests.

Source code in osm_data_client/models.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@dataclass
class GeometryInput:
    """
    Validated geometry input for OSM API requests.

    Attributes:
        type: GeoJSON geometry type; ``from_input`` only accepts "Polygon"
            and "MultiPolygon"
        coordinates: Nested coordinate arrays exactly as supplied by the caller
    """

    type: str
    coordinates: List[Any]

    @classmethod
    def from_input(cls, geometry: Union[Dict[str, Any], str]) -> "GeometryInput":
        """
        Create a GeometryInput from either a dictionary or a JSON string.

        A FeatureCollection is reduced to the geometry of its first feature.
        Only the first coordinate of the geometry is range-checked.

        Args:
            geometry: GeoJSON geometry object or string

        Returns:
            Validated GeometryInput object

        Raises:
            ValidationError: If geometry is invalid
        """
        from .exceptions import ValidationError

        if isinstance(geometry, str):
            try:
                geometry_dict = json.loads(geometry)
                log.debug("Parsed geometry from JSON string")
            except json.JSONDecodeError as exc:
                log.error("Failed to parse geometry JSON string")
                raise ValidationError("Invalid GeoJSON string") from exc
        else:
            geometry_dict = geometry

        # Valid JSON is not necessarily an object ("[1, 2]", "null", ...);
        # reject it here instead of failing with AttributeError on .get().
        if not isinstance(geometry_dict, dict):
            log.error("Geometry is not a GeoJSON object")
            raise ValidationError("Geometry must be a GeoJSON object")

        if (
            geometry_dict.get("type") == "FeatureCollection"
            and "features" in geometry_dict
        ):
            log.debug("Converting FeatureCollection to Geometry")
            features = geometry_dict["features"]
            if isinstance(features, list) and features:
                feature = features[0]
                if isinstance(feature, dict) and "geometry" in feature:
                    geometry_dict = feature["geometry"]

            # A feature may carry a null (or otherwise non-object) geometry.
            if not isinstance(geometry_dict, dict):
                log.error("First feature has no usable geometry")
                raise ValidationError("Geometry must be a GeoJSON object")

        if "type" not in geometry_dict:
            log.error("Geometry missing 'type' field")
            raise ValidationError("Geometry must have a 'type' field")

        if "coordinates" not in geometry_dict:
            log.error("Geometry missing 'coordinates' field")
            raise ValidationError("Geometry must have a 'coordinates' field")

        valid_types = ["Polygon", "MultiPolygon"]
        if geometry_dict["type"] not in valid_types:
            log.error("Invalid geometry type: %s", geometry_dict["type"])
            raise ValidationError(f"Geometry type must be one of {valid_types}")

        # Check CRS if present (basic validation). A null or malformed "crs"
        # member is treated the same as an absent one.
        crs_member = geometry_dict.get("crs")
        crs_props = crs_member.get("properties") if isinstance(crs_member, dict) else None
        crs = crs_props.get("name") if isinstance(crs_props, dict) else None
        valid_crs = [
            "urn:ogc:def:crs:OGC:1.3:CRS84",
            "urn:ogc:def:crs:EPSG::4326",
            "WGS 84",
        ]
        if crs and crs not in valid_crs:
            log.warning("Unsupported CRS: %s. Raw Data API requires EPSG:4326", crs)
            raise ValidationError(
                "Unsupported coordinate system. Raw Data API requires "
                "GeoJSON in WGS84 (EPSG:4326) standard."
            )

        # Basic coordinate validation for first coordinate. Malformed nesting
        # (empty arrays, non-numeric values) is only logged, not rejected.
        try:
            first_coord = cls._get_first_coordinate(geometry_dict["coordinates"])
            if first_coord and not cls._is_valid_coordinate(first_coord):
                log.error("Invalid coordinates: %s", first_coord)
                raise ValidationError(
                    "Coordinates appear to be invalid. Should be longitude/latitude "
                    "in the range of -180 to 180 and -90 to 90 respectively."
                )
        except (IndexError, TypeError):
            log.warning("Could not validate coordinates format")

        log.debug("Validated geometry of type %s", geometry_dict["type"])
        return cls(type=geometry_dict["type"], coordinates=geometry_dict["coordinates"])

    @staticmethod
    def _get_first_coordinate(coordinates):
        """
        Extract the first coordinate from nested arrays.

        Descends through the first element of each nesting level until the
        element is no longer a sequence of sequences. Both lists and tuples
        are accepted as sequences. An empty nesting level raises IndexError,
        which ``from_input`` treats as "could not validate".
        """
        # For Polygon or MultiPolygon in GeoJSON, we need to navigate the nested structure
        # A Polygon coordinate is [[[x1,y1], [x2,y2], ...]]
        # A MultiPolygon coordinate is [[[[x1,y1], [x2,y2], ...]], ...]
        sequence_types = (list, tuple)

        result = coordinates
        while isinstance(result, sequence_types) and isinstance(
            result[0], sequence_types
        ):
            result = result[0]

        return result

    @staticmethod
    def _is_valid_coordinate(coord):
        """
        Check if a coordinate is valid (within expected range).

        ``coord`` must be a list or tuple with at least two items, read as
        longitude then latitude.
        """
        if not isinstance(coord, (list, tuple)) or len(coord) < 2:
            return False
        return -180 <= coord[0] <= 180 and -90 <= coord[1] <= 90

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary representation."""
        return {"type": self.type, "coordinates": self.coordinates}

from_input(geometry) classmethod

Create a GeometryInput from either a dictionary or a JSON string.

Parameters:

Name Type Description Default
geometry Union[Dict[str, Any], str]

GeoJSON geometry object or string

required

Returns:

Type Description
GeometryInput

Validated GeometryInput object

Raises:

Type Description
ValidationError

If geometry is invalid

Source code in osm_data_client/models.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@classmethod
def from_input(cls, geometry: Union[Dict[str, Any], str]) -> "GeometryInput":
    """
    Create a GeometryInput from either a dictionary or a JSON string.

    A FeatureCollection is reduced to the geometry of its first feature.
    Only the first coordinate of the geometry is range-checked.

    Args:
        geometry: GeoJSON geometry object or string

    Returns:
        Validated GeometryInput object

    Raises:
        ValidationError: If geometry is invalid
    """
    from .exceptions import ValidationError

    if isinstance(geometry, str):
        try:
            geometry_dict = json.loads(geometry)
            log.debug("Parsed geometry from JSON string")
        except json.JSONDecodeError as exc:
            log.error("Failed to parse geometry JSON string")
            raise ValidationError("Invalid GeoJSON string") from exc
    else:
        geometry_dict = geometry

    # Valid JSON is not necessarily an object ("[1, 2]", "null", ...);
    # reject it here instead of failing with AttributeError on .get().
    if not isinstance(geometry_dict, dict):
        log.error("Geometry is not a GeoJSON object")
        raise ValidationError("Geometry must be a GeoJSON object")

    if (
        geometry_dict.get("type") == "FeatureCollection"
        and "features" in geometry_dict
    ):
        log.debug("Converting FeatureCollection to Geometry")
        features = geometry_dict["features"]
        if isinstance(features, list) and features:
            feature = features[0]
            if isinstance(feature, dict) and "geometry" in feature:
                geometry_dict = feature["geometry"]

        # A feature may carry a null (or otherwise non-object) geometry.
        if not isinstance(geometry_dict, dict):
            log.error("First feature has no usable geometry")
            raise ValidationError("Geometry must be a GeoJSON object")

    if "type" not in geometry_dict:
        log.error("Geometry missing 'type' field")
        raise ValidationError("Geometry must have a 'type' field")

    if "coordinates" not in geometry_dict:
        log.error("Geometry missing 'coordinates' field")
        raise ValidationError("Geometry must have a 'coordinates' field")

    valid_types = ["Polygon", "MultiPolygon"]
    if geometry_dict["type"] not in valid_types:
        log.error("Invalid geometry type: %s", geometry_dict["type"])
        raise ValidationError(f"Geometry type must be one of {valid_types}")

    # Check CRS if present (basic validation). A null or malformed "crs"
    # member is treated the same as an absent one.
    crs_member = geometry_dict.get("crs")
    crs_props = crs_member.get("properties") if isinstance(crs_member, dict) else None
    crs = crs_props.get("name") if isinstance(crs_props, dict) else None
    valid_crs = [
        "urn:ogc:def:crs:OGC:1.3:CRS84",
        "urn:ogc:def:crs:EPSG::4326",
        "WGS 84",
    ]
    if crs and crs not in valid_crs:
        log.warning("Unsupported CRS: %s. Raw Data API requires EPSG:4326", crs)
        raise ValidationError(
            "Unsupported coordinate system. Raw Data API requires "
            "GeoJSON in WGS84 (EPSG:4326) standard."
        )

    # Basic coordinate validation for first coordinate. Malformed nesting
    # (empty arrays, non-numeric values) is only logged, not rejected.
    try:
        first_coord = cls._get_first_coordinate(geometry_dict["coordinates"])
        if first_coord and not cls._is_valid_coordinate(first_coord):
            log.error("Invalid coordinates: %s", first_coord)
            raise ValidationError(
                "Coordinates appear to be invalid. Should be longitude/latitude "
                "in the range of -180 to 180 and -90 to 90 respectively."
            )
    except (IndexError, TypeError):
        log.warning("Could not validate coordinates format")

    log.debug("Validated geometry of type %s", geometry_dict["type"])
    return cls(type=geometry_dict["type"], coordinates=geometry_dict["coordinates"])

to_dict()

Convert to dictionary representation.

Source code in osm_data_client/models.py
124
125
126
def to_dict(self) -> Dict[str, Any]:
    """Build a plain dict holding this geometry's type and coordinates."""
    payload: Dict[str, Any] = {}
    payload["type"] = self.type
    payload["coordinates"] = self.coordinates
    return payload

options: show_source: false heading_level: 3

Validated parameters for OSM API requests.

Source code in osm_data_client/models.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@dataclass
class RequestParams:
    """
    Validated parameters for OSM API requests.

    Attributes:
        file_name: Name for the export file
        output_type: Export format; must be one of ``VALID_OUTPUT_TYPES``
        bind_zip: Whether the export is requested as a ZIP archive
        filters: Optional filters to apply
        geometry_type: Optional list of geometry types to include
    """

    file_name: str = "osm_export"
    output_type: str = "geojson"
    bind_zip: bool = True
    filters: Optional[FilterDict] = None
    geometry_type: Optional[List[str]] = None

    # Not annotated, so the dataclass machinery treats it as a plain class
    # attribute rather than a field.
    VALID_OUTPUT_TYPES = [
        "geojson",
        "shp",
        "kml",
        "mbtiles",
        "flatgeobuf",
        "csv",
        "geopackage",
        "pgdump",
    ]

    @classmethod
    def from_kwargs(cls, **kwargs) -> "RequestParams":
        """
        Create a RequestParams from keyword arguments.

        The API's camelCase names (fileName, outputType, geometryType,
        bindZip) are translated to the snake_case field names; every other
        keyword argument is forwarded to the dataclass constructor unchanged.
        When both spellings of a parameter are given, the snake_case one wins.

        Args:
            **kwargs: Keyword arguments for request parameters

        Returns:
            Validated RequestParams object

        Raises:
            ValidationError: If parameters are invalid
        """
        from .exceptions import ValidationError

        # Convert to snake_case internally
        renames = (
            ("fileName", "file_name"),
            ("outputType", "output_type"),
            ("geometryType", "geometry_type"),
            ("bindZip", "bind_zip"),
        )
        params = {}
        for api_name, field_name in renames:
            if api_name in kwargs:
                params[field_name] = kwargs.pop(api_name)

        params.update(kwargs)

        instance = cls(**params)

        if instance.output_type not in cls.VALID_OUTPUT_TYPES:
            log.error("Invalid output type: %s", instance.output_type)
            raise ValidationError(f"outputType must be one of {cls.VALID_OUTPUT_TYPES}")

        # Run the bindZip check only after the output type is known to be
        # valid: the check calls .lower() on it, which would raise
        # AttributeError (not ValidationError) for a non-string value.
        instance.bind_zip = cls.validate_bind_zip_compatibility(
            instance.output_type, instance.bind_zip
        )

        return instance

    def to_api_params(self) -> Dict[str, Any]:
        """
        Convert to API parameter dictionary.

        ``filters`` and ``geometryType`` are included only when they are set
        to a non-empty value.
        """
        # Convert to camelCase for API
        params = {
            "fileName": self.file_name,
            "outputType": self.output_type,
            "bindZip": self.bind_zip,
        }

        if self.filters:
            params["filters"] = self.filters

        if self.geometry_type:
            params["geometryType"] = self.geometry_type

        return params

    @staticmethod
    def validate_bind_zip_compatibility(output_type, bind_zip):
        """
        Validate if the output format is compatible with bindZip=False.

        Returns:
            The bindZip value to use: True (with a logged warning) when
            ``bind_zip`` is falsy but the format is not in the list of
            streaming-compatible formats, otherwise ``bind_zip`` unchanged.
        """
        streaming_compatible_formats = ["geojson", "cog"]  # Cloud Optimized GeoTIFF

        if not bind_zip and output_type.lower() not in streaming_compatible_formats:
            # Lazy %-style arguments, matching the other log calls in this module.
            log.warning(
                "Format '%s' requires ZIP packaging. "
                "Automatically setting bindZip=True",
                output_type,
            )
            return True
        return bind_zip

from_kwargs(**kwargs) classmethod

Create a RequestParams from keyword arguments.

Parameters:

Name Type Description Default
**kwargs

Keyword arguments for request parameters

{}

Returns:

Type Description
RequestParams

Validated RequestParams object

Raises:

Type Description
ValidationError

If parameters are invalid

Source code in osm_data_client/models.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@classmethod
def from_kwargs(cls, **kwargs) -> "RequestParams":
    """
    Create a RequestParams from keyword arguments.

    The API's camelCase names (fileName, outputType, geometryType,
    bindZip) are translated to the snake_case field names; every other
    keyword argument is forwarded to the dataclass constructor unchanged.
    When both spellings of a parameter are given, the snake_case one wins.

    Args:
        **kwargs: Keyword arguments for request parameters

    Returns:
        Validated RequestParams object

    Raises:
        ValidationError: If parameters are invalid
    """
    from .exceptions import ValidationError

    # Convert to snake_case internally
    renames = (
        ("fileName", "file_name"),
        ("outputType", "output_type"),
        ("geometryType", "geometry_type"),
        ("bindZip", "bind_zip"),
    )
    params = {}
    for api_name, field_name in renames:
        if api_name in kwargs:
            params[field_name] = kwargs.pop(api_name)

    params.update(kwargs)

    instance = cls(**params)

    if instance.output_type not in cls.VALID_OUTPUT_TYPES:
        log.error("Invalid output type: %s", instance.output_type)
        raise ValidationError(f"outputType must be one of {cls.VALID_OUTPUT_TYPES}")

    # Run the bindZip check only after the output type is known to be
    # valid: the check calls .lower() on it, which would raise
    # AttributeError (not ValidationError) for a non-string value.
    instance.bind_zip = cls.validate_bind_zip_compatibility(
        instance.output_type, instance.bind_zip
    )

    return instance

to_api_params()

Convert to API parameter dictionary.

Source code in osm_data_client/models.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def to_api_params(self) -> Dict[str, Any]:
    """
    Produce the camelCase parameter dictionary for the API.

    fileName, outputType and bindZip are always present; filters and
    geometryType are added only when they hold a truthy value.
    """
    api_params: Dict[str, Any] = dict(
        fileName=self.file_name,
        outputType=self.output_type,
        bindZip=self.bind_zip,
    )

    optional_entries = (
        ("filters", self.filters),
        ("geometryType", self.geometry_type),
    )
    for key, value in optional_entries:
        if value:
            api_params[key] = value

    return api_params

validate_bind_zip_compatibility(output_type, bind_zip) staticmethod

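Validate whether the output format is compatible with bindZip=False and return the bindZip value to use (True is forced, with a warning, for formats that require ZIP packaging).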

Source code in osm_data_client/models.py
209
210
211
212
213
214
215
216
217
218
219
220
@staticmethod
def validate_bind_zip_compatibility(output_type, bind_zip):
    """
    Validate if the output format is compatible with bindZip=False.

    Args:
        output_type: Requested output format; compared case-insensitively
        bind_zip: Requested bindZip value

    Returns:
        The bindZip value to use: True (with a logged warning) when
        ``bind_zip`` is falsy but the format is not in the list of
        streaming-compatible formats, otherwise ``bind_zip`` unchanged.
    """
    streaming_compatible_formats = ["geojson", "cog"]  # Cloud Optimized GeoTIFF

    if not bind_zip and output_type.lower() not in streaming_compatible_formats:
        # Lazy %-style arguments, matching the other log calls in this module.
        log.warning(
            "Format '%s' requires ZIP packaging. "
            "Automatically setting bindZip=True",
            output_type,
        )
        return True
    return bind_zip

options: show_source: false heading_level: 3

Immutable metadata about a dataset

Source code in osm_data_client/models.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
@dataclass(frozen=True)
class RawDataApiMetadata:
    """
    Immutable metadata about a dataset.

    Attributes:
        task_id: Value of the "id" key of the API result
        format_ext: Output type taken from the request parameters
        timestamp: Value of the result's "response_time" key
        size_bytes: Value of the result's "zip_file_size_bytes" key
        file_name: File name taken from the request parameters
        download_url: Value of the result's "download_url" key
        is_zipped: bindZip setting taken from the request parameters
        bbox: Four floats parsed from a "bbox[...]" queryArea, if available
    """

    task_id: str
    format_ext: str
    timestamp: str
    size_bytes: int
    file_name: str
    download_url: str
    is_zipped: bool
    bbox: Optional[Tuple[float, float, float, float]] = None

    @classmethod
    def from_api_result(
        cls, result: Dict[str, Any], params: RequestParams
    ) -> "RawDataApiMetadata":
        """
        Create a RawDataApiMetadata from API result and request parameters.

        Missing keys fall back to empty strings (or 0 for the size). A
        "result" member that is absent, null or not an object is treated as
        an empty result.

        Args:
            result: API result dictionary from task status
            params: Request parameters used for the API request

        Returns:
            RawDataApiMetadata instance
        """
        # dict.get only applies its default when the key is absent, so an
        # explicit null (or any non-object) has to be normalised separately.
        task_result = result.get("result")
        if not isinstance(task_result, dict):
            task_result = {}

        task_id = result.get("id", "")
        timestamp = task_result.get("response_time", "")
        size_bytes = task_result.get("zip_file_size_bytes", 0)
        download_url = task_result.get("download_url", "")

        bbox = None
        query_area = task_result.get("queryArea", "")
        if (
            isinstance(query_area, str)
            and query_area.startswith("bbox[")
            and query_area.endswith("]")
        ):
            try:
                coords_str = query_area[5:-1]  # Remove "bbox[" and "]"
                coords = [float(x) for x in coords_str.split(",")]
                if len(coords) == 4:
                    bbox = tuple(coords)
                    log.debug("Extracted bbox: %s", bbox)
            except (ValueError, IndexError):
                log.warning("Could not parse bbox from queryArea: %s", query_area)

        return cls(
            is_zipped=params.bind_zip,
            file_name=params.file_name,
            task_id=task_id,
            format_ext=params.output_type,
            timestamp=timestamp,
            size_bytes=size_bytes,
            download_url=download_url,
            bbox=bbox,
        )

    def __str__(self) -> str:
        """Returns a string representation of RawDataApiMetadata for debugging purposes."""
        bbox_str = f"{self.bbox}" if self.bbox else "None"

        # download_url is not part of this representation.
        return (
            f"RawDataApiMetadata(\n"
            f"  task_id: {self.task_id}\n"
            f"  format_ext: {self.format_ext}\n"
            f"  timestamp: {self.timestamp}\n"
            f"  size_bytes: {self.size_bytes:,} bytes ({self._format_size()})\n"
            f"  file_name: {self.file_name}\n"
            f"  is_zipped: {self.is_zipped}\n"
            f"  bbox: {bbox_str}\n"
            f")"
        )

    def _format_size(self) -> str:
        """Helper method to format size in human-readable form."""
        size = self.size_bytes
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            # TB is the largest unit offered, so it is used for anything
            # that did not fit a smaller one.
            if size < 1024 or unit == "TB":
                return f"{size:.2f} {unit}"
            size /= 1024

__str__()

Returns a string representation of RawDataApiMetadata for debugging purposes.

Source code in osm_data_client/models.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def __str__(self) -> str:
    """Build the multi-line debugging representation of RawDataApiMetadata."""
    bbox_text = "None" if not self.bbox else f"{self.bbox}"
    size_text = f"{self.size_bytes:,} bytes ({self._format_size()})"

    # One entry per output line; joined with newlines below.
    rows = [
        "RawDataApiMetadata(",
        f"  task_id: {self.task_id}",
        f"  format_ext: {self.format_ext}",
        f"  timestamp: {self.timestamp}",
        f"  size_bytes: {size_text}",
        f"  file_name: {self.file_name}",
        f"  is_zipped: {self.is_zipped}",
        f"  bbox: {bbox_text}",
        ")",
    ]
    return "\n".join(rows)

from_api_result(result, params) classmethod

Create a RawDataApiMetadata from API result and request parameters.

Parameters:

Name Type Description Default
result Dict[str, Any]

API result dictionary from task status

required
params RequestParams

Request parameters used for the API request

required

Returns:

Type Description
RawDataApiMetadata

RawDataApiMetadata instance

Source code in osm_data_client/models.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
@classmethod
def from_api_result(
    cls, result: Dict[str, Any], params: RequestParams
) -> "RawDataApiMetadata":
    """
    Create a RawDataApiMetadata from API result and request parameters.

    Missing keys fall back to empty strings (or 0 for the size). A
    "result" member that is absent, null or not an object is treated as
    an empty result.

    Args:
        result: API result dictionary from task status
        params: Request parameters used for the API request

    Returns:
        RawDataApiMetadata instance
    """
    # dict.get only applies its default when the key is absent, so an
    # explicit null (or any non-object) has to be normalised separately.
    task_result = result.get("result")
    if not isinstance(task_result, dict):
        task_result = {}

    task_id = result.get("id", "")
    timestamp = task_result.get("response_time", "")
    size_bytes = task_result.get("zip_file_size_bytes", 0)
    download_url = task_result.get("download_url", "")

    bbox = None
    query_area = task_result.get("queryArea", "")
    if (
        isinstance(query_area, str)
        and query_area.startswith("bbox[")
        and query_area.endswith("]")
    ):
        try:
            coords_str = query_area[5:-1]  # Remove "bbox[" and "]"
            coords = [float(x) for x in coords_str.split(",")]
            if len(coords) == 4:
                bbox = tuple(coords)
                log.debug("Extracted bbox: %s", bbox)
        except (ValueError, IndexError):
            log.warning("Could not parse bbox from queryArea: %s", query_area)

    return cls(
        is_zipped=params.bind_zip,
        file_name=params.file_name,
        task_id=task_id,
        format_ext=params.output_type,
        timestamp=timestamp,
        size_bytes=size_bytes,
        download_url=download_url,
        bbox=bbox,
    )

options: show_source: false heading_level: 3

Bases: Enum

Options for controlling extraction behavior of ZIP archives.

Source code in osm_data_client/models.py
304
305
306
307
308
309
class AutoExtractOption(Enum):
    """Choices for how a downloaded ZIP archive should be treated."""

    # Decide based on format and size
    automatic = 1
    # Always keep as zip
    force_zip = 2
    # Always extract regardless of size/format
    force_extract = 3

options: show_source: false heading_level: 3

Configuration for Raw Data API client.

Source code in osm_data_client/models.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
from dataclasses import field  # needed for the default_factory below


@dataclass
class RawDataClientConfig:
    """
    Configuration for Raw Data API client.

    Attributes:
        access_token: Optional access token
        memory_threshold_mb: Memory threshold in megabytes
        base_api_url: Base URL of the Raw Data API
        output_directory: Output directory; defaults to the working directory
            at the time the configuration object is created
    """

    access_token: Optional[str] = None
    memory_threshold_mb: int = 50
    base_api_url: str = "https://api-prod.raw-data.hotosm.org/v1"
    # A plain ``Path.cwd()`` default would be evaluated once, when the class
    # is defined, and ignore any later change of working directory.
    output_directory: Path = field(default_factory=Path.cwd)

    @property
    def memory_threshold_bytes(self) -> int:
        """Convert memory threshold to bytes."""
        return self.memory_threshold_mb * 1024 * 1024

    @classmethod
    def default(cls) -> "RawDataClientConfig":
        """Create a default configuration."""
        return cls()

memory_threshold_bytes: int property

Convert memory threshold to bytes.

default() classmethod

Create a default configuration.

Source code in osm_data_client/models.py
326
327
328
329
@classmethod
def default(cls) -> "RawDataClientConfig":
    """Build a configuration that relies on every field's default value."""
    config = cls()
    return config

options: show_source: false heading_level: 3

Options for controlling how output data is handled.

Source code in osm_data_client/models.py
332
333
334
335
336
337
338
339
340
341
@dataclass
class RawDataOutputOptions:
    """
    Settings that govern what happens to downloaded output.

    Attributes:
        auto_extract: Extraction behavior for ZIP archives; defaults to
            ``AutoExtractOption.automatic``
    """

    auto_extract: AutoExtractOption = AutoExtractOption.automatic

    @classmethod
    def default(cls) -> "RawDataOutputOptions":
        """Build output options that rely on every field's default value."""
        options = cls()
        return options

default() classmethod

Create default output options.

Source code in osm_data_client/models.py
338
339
340
341
@classmethod
def default(cls) -> "RawDataOutputOptions":
    """Build output options that rely on every field's default value."""
    options = cls()
    return options

options: show_source: false heading_level: 3