Skip to content

API Docs

Async Reverse Geocode

Reverse geocoding to the nearest city over 1000 population.

Source code in pg_nearest_city/_async/nearest_city.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class AsyncNearestCity:
    """Reverse geocoding to the nearest city over 1000 population.

    Use as an async context manager: entering resolves the database
    connection and initializes the geocoding table, exiting closes the
    connection if this object opened it.
    """

    def __init__(
        self,
        db: psycopg.AsyncConnection | DbConfig | None = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Initialize the reverse geocoder.

        No database access happens here; the connection is resolved when the
        ``async with`` block is entered.

        Args:
            db: An existing psycopg AsyncConnection to reuse, a DbConfig to
                connect with, or None to connect using a default DbConfig().
            logger: Optional custom logger. If not provided, uses package logger.
        """
        # Allow users to provide their own logger while having a sensible default
        self._logger = logger or logging.getLogger("pg_nearest_city")
        self._db = db
        # Stays None until __aenter__ resolves the connection.
        self.connection: Optional[psycopg.AsyncConnection] = None
        self._is_external_connection = False
        self._is_initialized = False

        # NOTE(review): the paths are kept after each `with` block exits;
        # presumably the package data is installed as regular files - confirm.
        with importlib.resources.path(
            "pg_nearest_city.data", "cities_1000_simple.txt.gz"
        ) as cities_path:
            self.cities_file = cities_path
        with importlib.resources.path(
            "pg_nearest_city.data", "voronois.wkb.gz"
        ) as voronoi_path:
            self.voronoi_file = voronoi_path

    async def __aenter__(self):
        """Open the context manager.

        Resolves the connection, then creates and validates the geocoding
        table.

        Returns:
            This instance.

        Raises:
            RuntimeError: If database initialization fails.
        """
        self.connection = await self.get_connection(self._db)
        try:
            # Create the relevant tables and validate
            await self.initialize()
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so a connection
            # opened here has to be closed now or it would leak.
            if not self._is_external_connection:
                await self.connection.close()
                self.connection = None
            raise
        self._is_initialized = True
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the context manager.

        Closes the connection only when it was opened by this object; a
        caller-supplied connection is left open.
        """
        if self.connection and not self._is_external_connection:
            await self.connection.close()
        # Reset the flag read by _inform_user_if_not_context_manager so the
        # instance is rejected if it is queried after leaving the context.
        self._is_initialized = False

    async def get_connection(
        self,
        db: Optional[psycopg.AsyncConnection | DbConfig] = None,
    ) -> psycopg.AsyncConnection:
        """Determine the database connection to use.

        Also records in ``_is_external_connection`` whether ``db`` is a
        caller-supplied connection; __aexit__ reads that flag to decide
        whether to close the connection.

        Args:
            db: An existing AsyncConnection (returned unchanged), a DbConfig
                to connect with, or anything else to connect with DbConfig().

        Returns:
            The connection to use.
        """
        self._is_external_connection = isinstance(db, psycopg.AsyncConnection)
        is_db_config = isinstance(db, DbConfig)

        if self._is_external_connection:
            return db
        elif is_db_config:
            return await psycopg.AsyncConnection.connect(db.get_connection_string())
        else:
            # Fallback to env var extraction, or defaults for testing
            return await psycopg.AsyncConnection.connect(
                DbConfig().get_connection_string(),
            )

    async def initialize(self) -> None:
        """Initialize the geocoding database with validation checks.

        Does nothing when the status check reports a fully initialized
        database. Otherwise drops the table if it exists, recreates it,
        imports cities and Voronoi polygons, builds the spatial index,
        commits, and re-runs the status check.

        Raises:
            RuntimeError: If any step fails or the final check is incomplete.
        """
        if not self.connection:
            self._inform_user_if_not_context_manager()

        try:
            async with self.connection.cursor() as cur:
                self._logger.info("Starting database initialization check")
                status = await self._check_initialization_status(cur)

                if status.is_fully_initialized:
                    self._logger.info("Database already properly initialized")
                    return

                # Not fully initialized at this point, so an existing table
                # means a partial install that is rebuilt from scratch.
                if status.has_table:
                    missing = status.get_missing_components()
                    self._logger.warning(
                        "Database needs repair. Missing components: %s",
                        ", ".join(missing),
                    )
                    self._logger.info("Reinitializing from scratch")
                    await cur.execute("DROP TABLE IF EXISTS pg_nearest_city_geocoding;")

                self._logger.info("Creating geocoding table")
                await self._create_geocoding_table(cur)

                self._logger.info("Importing city data")
                await self._import_cities(cur)

                self._logger.info("Processing Voronoi polygons")
                await self._import_voronoi_polygons(cur)

                self._logger.info("Creating spatial index")
                await self._create_spatial_index(cur)

                await self.connection.commit()

                self._logger.debug("Verifying final initialization state")
                final_status = await self._check_initialization_status(cur)
                if not final_status.is_fully_initialized:
                    missing = final_status.get_missing_components()
                    self._logger.error(
                        "Initialization failed final validation. Missing: %s",
                        ", ".join(missing),
                    )
                    raise RuntimeError(
                        "Initialization failed final validation. "
                        f"Missing components: {', '.join(missing)}"
                    )

                self._logger.info("Initialization complete and verified")

        except Exception as e:
            self._logger.error("Database initialization failed: %s", str(e))
            raise RuntimeError(f"Database initialization failed: {str(e)}") from e

    def _inform_user_if_not_context_manager(self):
        """Raise an error if the context manager was not used.

        Raises:
            RuntimeError: If ``_is_initialized`` is False.
        """
        if not self._is_initialized:
            # fill() re-wraps the text, so the line breaks written below are
            # not preserved in the final message.
            raise RuntimeError(
                fill(
                    dedent("""
                AsyncNearestCity must be used within 'async with' context.\n
                    For example:\n
                    async with AsyncNearestCity() as geocoder:\n
                        details = await geocoder.query(lat, lon)
            """)
                )
            )

    async def query(self, lat: float, lon: float) -> Optional[Location]:
        """Find the nearest city to the given coordinates using Voronoi regions.

        Args:
            lat: Latitude in degrees (-90 to 90)
            lon: Longitude in degrees (-180 to 180)

        Returns:
            Location object if a matching city is found, None otherwise

        Raises:
            ValueError: If coordinates are out of valid ranges
            RuntimeError: If database query fails
        """
        # Throw an error if not used in 'with' block
        self._inform_user_if_not_context_manager()

        # Validate coordinate ranges
        BaseNearestCity.validate_coordinates(lon, lat)

        try:
            async with self.connection.cursor() as cur:
                await cur.execute(
                    BaseNearestCity._get_reverse_geocoding_query(lon, lat),
                )
                result = await cur.fetchone()

                if not result:
                    return None

                return Location(
                    city=result[0],
                    country=result[1],
                    lat=float(result[2]),
                    lon=float(result[3]),
                )
        except Exception as e:
            # Lazy %-formatting, consistent with the logging in initialize().
            self._logger.error("Reverse geocoding failed: %s", e)
            raise RuntimeError(f"Reverse geocoding failed: {str(e)}") from e

    async def _check_initialization_status(
        self,
        cur: psycopg.AsyncCursor,
    ) -> InitializationStatus:
        """Check the status and integrity of the geocoding database.

        Checks run in order (table exists, expected columns present, data
        and Voronoi completeness, spatial index) and stop at the first of
        the two structural checks that fails.

        Args:
            cur: Cursor used to run the check queries.

        Returns:
            An InitializationStatus with the flags that could be evaluated.
        """
        status = InitializationStatus()

        # Check table existence
        await cur.execute(BaseNearestCity._get_tableexistence_query())
        table_exists = await cur.fetchone()
        status.has_table = bool(table_exists and table_exists[0])

        # If table doesn't exist, we can't check other properties
        if not status.has_table:
            return status

        # Check table structure
        await cur.execute(BaseNearestCity._get_table_structure_query())
        columns = {col: dtype for col, dtype in await cur.fetchall()}
        # Only the column names are compared below; the type strings are
        # informational and are not validated.
        expected_columns = {
            "city": "character varying",
            "country": "character varying",
            "lat": "numeric",
            "lon": "numeric",
            "geom": "geometry",
            "voronoi": "geometry",
        }
        status.has_valid_structure = all(col in columns for col in expected_columns)
        # If table doesn't have valid structure, we can't check other properties
        if not status.has_valid_structure:
            return status

        # Check data completeness
        await cur.execute(BaseNearestCity._get_data_completeness_query())
        counts = await cur.fetchone()
        total_cities, cities_with_voronoi = counts

        status.has_data = total_cities > 0
        status.has_complete_voronoi = cities_with_voronoi == total_cities

        # Check spatial index
        await cur.execute(BaseNearestCity._get_spatial_index_check_query())
        has_index = await cur.fetchone()
        status.has_spatial_index = bool(has_index and has_index[0])

        return status

    async def _import_cities(self, cur: AsyncCursor):
        """Import city data using COPY protocol.

        The gzip file is decompressed and streamed in 8192-byte chunks.

        Raises:
            FileNotFoundError: If the bundled cities file does not exist.
        """
        if not self.cities_file.exists():
            raise FileNotFoundError(f"Cities file not found: {self.cities_file}")

        async with cur.copy(
            "COPY pg_nearest_city_geocoding(city, country, lat, lon) FROM STDIN"
        ) as copy:
            with gzip.open(self.cities_file, "r") as f:
                copied_bytes = 0
                while data := f.read(8192):
                    await copy.write(data)
                    copied_bytes += len(data)
                self._logger.info(f"Imported {copied_bytes:,} bytes of city data")

    async def _create_geocoding_table(self, cur: AsyncCursor):
        """Create the main table.

        ``geom`` is a stored generated column built from ``lon``/``lat``;
        ``voronoi`` starts empty and is filled by _import_voronoi_polygons.
        """
        await cur.execute("""
            CREATE TABLE pg_nearest_city_geocoding (
                city varchar,
                country varchar,
                lat decimal,
                lon decimal,
                geom geometry(Point,4326)
                  GENERATED ALWAYS AS (ST_SetSRID(ST_MakePoint(lon, lat), 4326))
                  STORED,
                voronoi geometry(Polygon,4326)
            );
        """)

    async def _import_voronoi_polygons(self, cur: AsyncCursor):
        """Import and integrate Voronoi polygons into the main table.

        Loads the decompressed file into a temporary table, copies each
        polygon onto the row with the same city and country, then drops the
        temporary table.

        Raises:
            FileNotFoundError: If the bundled Voronoi file does not exist.
        """
        if not self.voronoi_file.exists():
            raise FileNotFoundError(f"Voronoi file not found: {self.voronoi_file}")

        # First create temporary table for the import
        await cur.execute("""
            CREATE TEMP TABLE voronoi_import (
                city text,
                country text,
                wkb bytea
            );
        """)

        # Import the binary WKB data
        async with cur.copy(
            "COPY voronoi_import (city, country, wkb) FROM STDIN",
        ) as copy:
            with gzip.open(self.voronoi_file, "rb") as f:
                while data := f.read(8192):
                    await copy.write(data)

        # Update main table with Voronoi geometries
        await cur.execute("""
            UPDATE pg_nearest_city_geocoding g
            SET voronoi = ST_GeomFromWKB(v.wkb, 4326)
            FROM voronoi_import v
            WHERE g.city = v.city
            AND g.country = v.country;
        """)

        # Clean up temporary table
        await cur.execute("DROP TABLE voronoi_import;")

    async def _create_spatial_index(self, cur: AsyncCursor):
        """Create a spatial index on the Voronoi polygons for efficient queries."""
        await cur.execute("""
            CREATE INDEX geocoding_voronoi_idx
            ON pg_nearest_city_geocoding
            USING GIST (voronoi);
        """)

__aenter__() async

Open the context manager.

Source code in pg_nearest_city/_async/nearest_city.py
53
54
55
56
57
58
59
async def __aenter__(self):
    """Open the context manager.

    Resolves the connection, then creates and validates the geocoding table.

    Returns:
        This instance.
    """
    self.connection = await self.get_connection(self._db)
    try:
        # Create the relevant tables and validate
        await self.initialize()
    except BaseException:
        # __aexit__ is not invoked when __aenter__ raises, so a connection
        # opened here has to be closed now or it would leak.
        if not self._is_external_connection:
            await self.connection.close()
            self.connection = None
        raise
    self._is_initialized = True
    return self

__aexit__(exc_type, exc_value, traceback) async

Close the context manager.

Source code in pg_nearest_city/_async/nearest_city.py
61
62
63
64
65
async def __aexit__(self, exc_type, exc_value, traceback):
    """Close the context manager.

    Closes the connection only when it was opened by this object; a
    caller-supplied connection is left open.
    """
    if self.connection and not self._is_external_connection:
        await self.connection.close()
    # Reset the same flag that __aenter__ sets (`_is_initialized`); the
    # previous `_initialized` name created an unrelated attribute.
    self._is_initialized = False

__init__(db=None, logger=None)

Initialize reverse geocoder with an existing AsyncConnection.

Parameters:

Name Type Description Default
db AsyncConnection | DbConfig | None

An existing psycopg AsyncConnection to reuse, a DbConfig to connect with, or None to connect using a default DbConfig()

None
logger Optional[Logger]

Optional custom logger. If not provided, uses package logger.

None
Source code in pg_nearest_city/_async/nearest_city.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
    self,
    db: psycopg.AsyncConnection | DbConfig | None = None,
    logger: Optional[logging.Logger] = None,
):
    """Initialize the reverse geocoder.

    Only stores configuration and locates the bundled data files; no
    connection is opened here.

    Args:
        db: An existing psycopg AsyncConnection, a DbConfig, or None.
        logger: Optional custom logger. If not provided, uses package logger.
    """
    # Allow users to provide their own logger while having a sensible default
    self._logger = logger or logging.getLogger("pg_nearest_city")
    self._db = db
    # No connection yet, hence Optional.
    self.connection: Optional[psycopg.AsyncConnection] = None
    self._is_external_connection = False
    self._is_initialized = False

    # NOTE(review): the paths are kept after each `with` block exits;
    # presumably the package data is installed as regular files - confirm.
    with importlib.resources.path(
        "pg_nearest_city.data", "cities_1000_simple.txt.gz"
    ) as cities_path:
        self.cities_file = cities_path
    with importlib.resources.path(
        "pg_nearest_city.data", "voronois.wkb.gz"
    ) as voronoi_path:
        self.voronoi_file = voronoi_path

get_connection(db=None) async

Determine the database connection to use.

Source code in pg_nearest_city/_async/nearest_city.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
async def get_connection(
    self,
    db: Optional[psycopg.AsyncConnection | DbConfig] = None,
) -> psycopg.AsyncConnection:
    """Resolve the database connection this geocoder should use.

    Sets ``_is_external_connection`` to whether ``db`` is already an
    AsyncConnection.

    Args:
        db: An AsyncConnection (returned unchanged), a DbConfig to connect
            with, or anything else to connect with a default DbConfig().

    Returns:
        The connection to use.
    """
    self._is_external_connection = isinstance(db, psycopg.AsyncConnection)
    if self._is_external_connection:
        return db

    # Fallback to env var extraction, or defaults for testing
    config = db if isinstance(db, DbConfig) else DbConfig()
    return await psycopg.AsyncConnection.connect(config.get_connection_string())

initialize() async

Initialize the geocoding database with validation checks.

Source code in pg_nearest_city/_async/nearest_city.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
async def initialize(self) -> None:
    """Bring the geocoding table into a fully initialized state.

    Returns immediately when the status check already reports a complete
    setup. Otherwise an existing table is dropped, then the table, city
    rows, Voronoi polygons and spatial index are created in that order,
    the work is committed and the status check is repeated.

    Raises:
        RuntimeError: If any step fails or the repeated check is incomplete.
    """
    if not self.connection:
        self._inform_user_if_not_context_manager()

    try:
        async with self.connection.cursor() as cursor:
            self._logger.info("Starting database initialization check")
            current = await self._check_initialization_status(cursor)

            if current.is_fully_initialized:
                self._logger.info("Database already properly initialized")
                return

            # Initialization is known to be incomplete here, so an existing
            # table means a partial install that gets rebuilt.
            if current.has_table:
                self._logger.warning(
                    "Database needs repair. Missing components: %s",
                    ", ".join(current.get_missing_components()),
                )
                self._logger.info("Reinitializing from scratch")
                await cursor.execute("DROP TABLE IF EXISTS pg_nearest_city_geocoding;")

            build_steps = (
                ("Creating geocoding table", self._create_geocoding_table),
                ("Importing city data", self._import_cities),
                ("Processing Voronoi polygons", self._import_voronoi_polygons),
                ("Creating spatial index", self._create_spatial_index),
            )
            for announcement, run_step in build_steps:
                self._logger.info(announcement)
                await run_step(cursor)

            await self.connection.commit()

            self._logger.debug("Verifying final initialization state")
            verified = await self._check_initialization_status(cursor)
            if verified.is_fully_initialized:
                self._logger.info("Initialization complete and verified")
                return

            absent = ", ".join(verified.get_missing_components())
            self._logger.error(
                "Initialization failed final validation. Missing: %s",
                absent,
            )
            raise RuntimeError(
                "Initialization failed final validation. "
                f"Missing components: {absent}"
            )

    except Exception as err:
        self._logger.error("Database initialization failed: %s", str(err))
        raise RuntimeError(f"Database initialization failed: {str(err)}") from err

query(lat, lon) async

Find the nearest city to the given coordinates using Voronoi regions.

Parameters:

Name Type Description Default
lat float

Latitude in degrees (-90 to 90)

required
lon float

Longitude in degrees (-180 to 180)

required

Returns:

Type Description
Optional[Location]

Location object if a matching city is found, None otherwise

Raises:

Type Description
ValueError

If coordinates are out of valid ranges

RuntimeError

If database query fails

Source code in pg_nearest_city/_async/nearest_city.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
async def query(self, lat: float, lon: float) -> Optional[Location]:
    """Look up the nearest city for a coordinate pair using Voronoi regions.

    Args:
        lat: Latitude in degrees (-90 to 90)
        lon: Longitude in degrees (-180 to 180)

    Returns:
        A Location built from the first result row, or None when the query
        returns no row.

    Raises:
        ValueError: If coordinates are out of valid ranges
        RuntimeError: If used outside the context manager or if the
            database query fails
    """
    # Reject use outside the 'async with' block before touching the database
    self._inform_user_if_not_context_manager()

    # Range validation runs outside the try block, so its error is not wrapped
    BaseNearestCity.validate_coordinates(lon, lat)

    try:
        async with self.connection.cursor() as cursor:
            statement = BaseNearestCity._get_reverse_geocoding_query(lon, lat)
            await cursor.execute(statement)
            row = await cursor.fetchone()

            if row:
                return Location(
                    city=row[0],
                    country=row[1],
                    lat=float(row[2]),
                    lon=float(row[3]),
                )
            return None
    except Exception as err:
        self._logger.error(f"Reverse geocoding failed: {str(err)}")
        raise RuntimeError(f"Reverse geocoding failed: {str(err)}") from err

options: show_source: false heading_level: 3

Sync Reverse Geocode

Reverse geocoding to the nearest city over 1000 population.

Source code in pg_nearest_city/_sync/nearest_city.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class NearestCity:
    """Reverse geocoding to the nearest city over 1000 population.

    Use as a context manager: entering resolves the database connection and
    initializes the geocoding table, exiting closes the connection if this
    object opened it.
    """

    def __init__(
        self,
        db: psycopg.Connection | DbConfig | None = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Initialize the reverse geocoder.

        No database access happens here; the connection is resolved when the
        ``with`` block is entered.

        Args:
            db: An existing psycopg Connection to reuse, a DbConfig to
                connect with, or None to connect using a default DbConfig().
            logger: Optional custom logger. If not provided, uses package logger.
        """
        # Allow users to provide their own logger while having a sensible default
        self._logger = logger or logging.getLogger("pg_nearest_city")
        self._db = db
        # Stays None until __enter__ resolves the connection.
        self.connection: Optional[psycopg.Connection] = None
        self._is_external_connection = False
        self._is_initialized = False

        # NOTE(review): the paths are kept after each `with` block exits;
        # presumably the package data is installed as regular files - confirm.
        with importlib.resources.path(
            "pg_nearest_city.data", "cities_1000_simple.txt.gz"
        ) as cities_path:
            self.cities_file = cities_path
        with importlib.resources.path(
            "pg_nearest_city.data", "voronois.wkb.gz"
        ) as voronoi_path:
            self.voronoi_file = voronoi_path

    def __enter__(self):
        """Open the context manager.

        Resolves the connection, then creates and validates the geocoding
        table.

        Returns:
            This instance.

        Raises:
            RuntimeError: If database initialization fails.
        """
        self.connection = self.get_connection(self._db)
        try:
            # Create the relevant tables and validate
            self.initialize()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so a connection
            # opened here has to be closed now or it would leak.
            if not self._is_external_connection:
                self.connection.close()
                self.connection = None
            raise
        self._is_initialized = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the context manager.

        Closes the connection only when it was opened by this object; a
        caller-supplied connection is left open.
        """
        if self.connection and not self._is_external_connection:
            self.connection.close()
        # Reset the flag read by _inform_user_if_not_context_manager so the
        # instance is rejected if it is queried after leaving the context.
        self._is_initialized = False

    def get_connection(
        self,
        db: Optional[psycopg.Connection | DbConfig] = None,
    ) -> psycopg.Connection:
        """Determine the database connection to use.

        Also records in ``_is_external_connection`` whether ``db`` is a
        caller-supplied connection; __exit__ reads that flag to decide
        whether to close the connection.

        Args:
            db: An existing Connection (returned unchanged), a DbConfig to
                connect with, or anything else to connect with DbConfig().

        Returns:
            The connection to use.
        """
        self._is_external_connection = isinstance(db, psycopg.Connection)
        is_db_config = isinstance(db, DbConfig)

        if self._is_external_connection:
            return db
        elif is_db_config:
            return psycopg.Connection.connect(db.get_connection_string())
        else:
            # Fallback to env var extraction, or defaults for testing
            return psycopg.Connection.connect(
                DbConfig().get_connection_string(),
            )

    def initialize(self) -> None:
        """Initialize the geocoding database with validation checks.

        Does nothing when the status check reports a fully initialized
        database. Otherwise drops the table if it exists, recreates it,
        imports cities and Voronoi polygons, builds the spatial index,
        commits, and re-runs the status check.

        Raises:
            RuntimeError: If any step fails or the final check is incomplete.
        """
        if not self.connection:
            self._inform_user_if_not_context_manager()

        try:
            with self.connection.cursor() as cur:
                self._logger.info("Starting database initialization check")
                status = self._check_initialization_status(cur)

                if status.is_fully_initialized:
                    self._logger.info("Database already properly initialized")
                    return

                # Not fully initialized at this point, so an existing table
                # means a partial install that is rebuilt from scratch.
                if status.has_table:
                    missing = status.get_missing_components()
                    self._logger.warning(
                        "Database needs repair. Missing components: %s",
                        ", ".join(missing),
                    )
                    self._logger.info("Reinitializing from scratch")
                    cur.execute("DROP TABLE IF EXISTS pg_nearest_city_geocoding;")

                self._logger.info("Creating geocoding table")
                self._create_geocoding_table(cur)

                self._logger.info("Importing city data")
                self._import_cities(cur)

                self._logger.info("Processing Voronoi polygons")
                self._import_voronoi_polygons(cur)

                self._logger.info("Creating spatial index")
                self._create_spatial_index(cur)

                self.connection.commit()

                self._logger.debug("Verifying final initialization state")
                final_status = self._check_initialization_status(cur)
                if not final_status.is_fully_initialized:
                    missing = final_status.get_missing_components()
                    self._logger.error(
                        "Initialization failed final validation. Missing: %s",
                        ", ".join(missing),
                    )
                    raise RuntimeError(
                        "Initialization failed final validation. "
                        f"Missing components: {', '.join(missing)}"
                    )

                self._logger.info("Initialization complete and verified")

        except Exception as e:
            self._logger.error("Database initialization failed: %s", str(e))
            raise RuntimeError(f"Database initialization failed: {str(e)}") from e

    def _inform_user_if_not_context_manager(self):
        """Raise an error if the context manager was not used.

        Raises:
            RuntimeError: If ``_is_initialized`` is False.
        """
        if not self._is_initialized:
            # fill() re-wraps the text, so the line breaks written below are
            # not preserved in the final message.
            raise RuntimeError(
                fill(
                    dedent("""
                NearestCity must be used within 'with' context.\n
                    For example:\n
                    with NearestCity() as geocoder:\n
                        details = geocoder.query(lat, lon)
            """)
                )
            )

    def query(self, lat: float, lon: float) -> Optional[Location]:
        """Find the nearest city to the given coordinates using Voronoi regions.

        Args:
            lat: Latitude in degrees (-90 to 90)
            lon: Longitude in degrees (-180 to 180)

        Returns:
            Location object if a matching city is found, None otherwise

        Raises:
            ValueError: If coordinates are out of valid ranges
            RuntimeError: If database query fails
        """
        # Throw an error if not used in 'with' block
        self._inform_user_if_not_context_manager()

        # Validate coordinate ranges
        BaseNearestCity.validate_coordinates(lon, lat)

        try:
            with self.connection.cursor() as cur:
                cur.execute(
                    BaseNearestCity._get_reverse_geocoding_query(lon, lat),
                )
                result = cur.fetchone()

                if not result:
                    return None

                return Location(
                    city=result[0],
                    country=result[1],
                    lat=float(result[2]),
                    lon=float(result[3]),
                )
        except Exception as e:
            # Lazy %-formatting, consistent with the logging in initialize().
            self._logger.error("Reverse geocoding failed: %s", e)
            raise RuntimeError(f"Reverse geocoding failed: {str(e)}") from e

    def _check_initialization_status(
        self,
        cur: psycopg.Cursor,
    ) -> InitializationStatus:
        """Check the status and integrity of the geocoding database.

        Checks run in order (table exists, expected columns present, data
        and Voronoi completeness, spatial index) and stop at the first of
        the two structural checks that fails.

        Args:
            cur: Cursor used to run the check queries.

        Returns:
            An InitializationStatus with the flags that could be evaluated.
        """
        status = InitializationStatus()

        # Check table existence
        cur.execute(BaseNearestCity._get_tableexistence_query())
        table_exists = cur.fetchone()
        status.has_table = bool(table_exists and table_exists[0])

        # If table doesn't exist, we can't check other properties
        if not status.has_table:
            return status

        # Check table structure
        cur.execute(BaseNearestCity._get_table_structure_query())
        columns = {col: dtype for col, dtype in cur.fetchall()}
        # Only the column names are compared below; the type strings are
        # informational and are not validated.
        expected_columns = {
            "city": "character varying",
            "country": "character varying",
            "lat": "numeric",
            "lon": "numeric",
            "geom": "geometry",
            "voronoi": "geometry",
        }
        status.has_valid_structure = all(col in columns for col in expected_columns)
        # If table doesn't have valid structure, we can't check other properties
        if not status.has_valid_structure:
            return status

        # Check data completeness
        cur.execute(BaseNearestCity._get_data_completeness_query())
        counts = cur.fetchone()
        total_cities, cities_with_voronoi = counts

        status.has_data = total_cities > 0
        status.has_complete_voronoi = cities_with_voronoi == total_cities

        # Check spatial index
        cur.execute(BaseNearestCity._get_spatial_index_check_query())
        has_index = cur.fetchone()
        status.has_spatial_index = bool(has_index and has_index[0])

        return status

    def _import_cities(self, cur: Cursor):
        """Import city data using COPY protocol.

        The gzip file is decompressed and streamed in 8192-byte chunks.

        Raises:
            FileNotFoundError: If the bundled cities file does not exist.
        """
        if not self.cities_file.exists():
            raise FileNotFoundError(f"Cities file not found: {self.cities_file}")

        with cur.copy(
            "COPY pg_nearest_city_geocoding(city, country, lat, lon) FROM STDIN"
        ) as copy:
            with gzip.open(self.cities_file, "r") as f:
                copied_bytes = 0
                while data := f.read(8192):
                    copy.write(data)
                    copied_bytes += len(data)
                self._logger.info(f"Imported {copied_bytes:,} bytes of city data")

    def _create_geocoding_table(self, cur: Cursor):
        """Create the main table.

        ``geom`` is a stored generated column built from ``lon``/``lat``;
        ``voronoi`` starts empty and is filled by _import_voronoi_polygons.
        """
        cur.execute("""
            CREATE TABLE pg_nearest_city_geocoding (
                city varchar,
                country varchar,
                lat decimal,
                lon decimal,
                geom geometry(Point,4326)
                  GENERATED ALWAYS AS (ST_SetSRID(ST_MakePoint(lon, lat), 4326))
                  STORED,
                voronoi geometry(Polygon,4326)
            );
        """)

    def _import_voronoi_polygons(self, cur: Cursor):
        """Import and integrate Voronoi polygons into the main table.

        Loads the decompressed file into a temporary table, copies each
        polygon onto the row with the same city and country, then drops the
        temporary table.

        Raises:
            FileNotFoundError: If the bundled Voronoi file does not exist.
        """
        if not self.voronoi_file.exists():
            raise FileNotFoundError(f"Voronoi file not found: {self.voronoi_file}")

        # First create temporary table for the import
        cur.execute("""
            CREATE TEMP TABLE voronoi_import (
                city text,
                country text,
                wkb bytea
            );
        """)

        # Import the binary WKB data
        with cur.copy(
            "COPY voronoi_import (city, country, wkb) FROM STDIN",
        ) as copy:
            with gzip.open(self.voronoi_file, "rb") as f:
                while data := f.read(8192):
                    copy.write(data)

        # Update main table with Voronoi geometries
        cur.execute("""
            UPDATE pg_nearest_city_geocoding g
            SET voronoi = ST_GeomFromWKB(v.wkb, 4326)
            FROM voronoi_import v
            WHERE g.city = v.city
            AND g.country = v.country;
        """)

        # Clean up temporary table
        cur.execute("DROP TABLE voronoi_import;")

    def _create_spatial_index(self, cur: Cursor):
        """Create a spatial index on the Voronoi polygons for efficient queries."""
        cur.execute("""
            CREATE INDEX geocoding_voronoi_idx
            ON pg_nearest_city_geocoding
            USING GIST (voronoi);
        """)

__enter__()

Open the context manager.

Source code in pg_nearest_city/_sync/nearest_city.py
53
54
55
56
57
58
59
def __enter__(self):
    """Open the context manager.

    Resolves the connection from the ``db`` value given to the constructor,
    then creates and validates the geocoding tables.

    Returns:
        This instance, ready for queries.

    Raises:
        Exception: Whatever ``initialize()`` raises is propagated unchanged.
    """
    self.connection = self.get_connection(self._db)
    try:
        # Create the relevant tables and validate
        self.initialize()
    except BaseException:
        # __exit__ is not invoked when __enter__ raises, so a connection
        # opened here would otherwise leak. Caller-supplied connections are
        # left untouched.
        if self.connection and not self._is_external_connection:
            self.connection.close()
            self.connection = None
        raise
    self._is_initialized = True
    return self

__exit__(exc_type, exc_value, traceback)

Close the context manager.

Source code in pg_nearest_city/_sync/nearest_city.py
61
62
63
64
65
def __exit__(self, exc_type, exc_value, traceback):
    """Close the context manager.

    Closes the connection only when it was opened by this object; a
    connection supplied by the caller is left open for the caller to manage.

    Args:
        exc_type: Exception class raised inside the ``with`` block, if any.
        exc_value: Exception instance raised inside the ``with`` block, if any.
        traceback: Traceback of that exception, if any.
    """
    if self.connection and not self._is_external_connection:
        self.connection.close()
    # Reset the same flag that __init__ and __enter__ maintain; writing to a
    # differently named attribute would leave it stuck at True.
    self._is_initialized = False

__init__(db=None, logger=None)

Initialize reverse geocoder with an existing Connection.

Parameters:

Name Type Description Default
db Connection | DbConfig | None

An existing psycopg Connection

None
connection

psycopg.Connection

required
logger Optional[Logger]

Optional custom logger. If not provided, uses package logger.

None
Source code in pg_nearest_city/_sync/nearest_city.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
    self,
    db: psycopg.Connection | DbConfig | None = None,
    logger: Optional[logging.Logger] = None,
):
    """Set up the reverse geocoder without touching the database yet.

    Args:
        db: An existing psycopg Connection, a DbConfig, or None. The value
            is only stored here; it is resolved into a connection later.
        logger: Optional custom logger. If not provided, uses package logger.
    """
    self._db = db
    self.connection: psycopg.Connection = None
    self._is_external_connection = False
    self._is_initialized = False
    # Fall back to the package logger when the caller supplies none
    self._logger = logger if logger else logging.getLogger("pg_nearest_city")

    # Resolve the data files bundled with the package
    bundled_files = (
        ("cities_file", "cities_1000_simple.txt.gz"),
        ("voronoi_file", "voronois.wkb.gz"),
    )
    for attribute, filename in bundled_files:
        with importlib.resources.path(
            "pg_nearest_city.data", filename
        ) as resolved_path:
            setattr(self, attribute, resolved_path)

get_connection(db=None)

Determine the database connection to use.

Source code in pg_nearest_city/_sync/nearest_city.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def get_connection(
    self,
    db: Optional[psycopg.Connection | DbConfig] = None,
) -> psycopg.Connection:
    """Pick the database connection to use and record who owns it.

    Args:
        db: An open psycopg Connection, a DbConfig, or None.

    Returns:
        ``db`` itself when it is already a Connection; otherwise a new
        connection built from the given DbConfig, or from a default
        ``DbConfig()`` when ``db`` is anything else.
    """
    self._is_external_connection = isinstance(db, psycopg.Connection)
    if self._is_external_connection:
        return db

    # Fallback to env var extraction, or defaults for testing
    config = db if isinstance(db, DbConfig) else DbConfig()
    return psycopg.Connection.connect(config.get_connection_string())

initialize()

Initialize the geocoding database with validation checks.

Source code in pg_nearest_city/_sync/nearest_city.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def initialize(self) -> None:
    """Bring the geocoding table into a fully initialized, verified state.

    Does nothing when the status check reports full initialization. When the
    table exists but is incomplete it is dropped first. The table is then
    created, cities and Voronoi polygons are imported, the spatial index is
    built, the work is committed, and the status is checked once more.

    Raises:
        RuntimeError: If any step fails, including the final verification.
            The original exception is chained as the cause.
    """
    if not self.connection:
        self._inform_user_if_not_context_manager()

    log = self._logger
    try:
        with self.connection.cursor() as cur:
            log.info("Starting database initialization check")
            status = self._check_initialization_status(cur)

            if status.is_fully_initialized:
                log.info("Database already properly initialized")
                return

            # A table that exists at this point is necessarily incomplete
            if status.has_table:
                log.warning(
                    "Database needs repair. Missing components: %s",
                    ", ".join(status.get_missing_components()),
                )
                log.info("Reinitializing from scratch")
                cur.execute("DROP TABLE IF EXISTS pg_nearest_city_geocoding;")

            build_steps = (
                ("Creating geocoding table", self._create_geocoding_table),
                ("Importing city data", self._import_cities),
                ("Processing Voronoi polygons", self._import_voronoi_polygons),
                ("Creating spatial index", self._create_spatial_index),
            )
            for announcement, step in build_steps:
                log.info(announcement)
                step(cur)

            self.connection.commit()

            log.debug("Verifying final initialization state")
            final_status = self._check_initialization_status(cur)
            if not final_status.is_fully_initialized:
                still_missing = ", ".join(final_status.get_missing_components())
                log.error(
                    "Initialization failed final validation. Missing: %s",
                    still_missing,
                )
                raise RuntimeError(
                    "Initialization failed final validation. "
                    f"Missing components: {still_missing}"
                )

            log.info("Initialization complete and verified")

    except Exception as exc:
        log.error("Database initialization failed: %s", str(exc))
        raise RuntimeError(f"Database initialization failed: {str(exc)}") from exc

query(lat, lon)

Find the nearest city to the given coordinates using Voronoi regions.

Parameters:

Name Type Description Default
lat float

Latitude in degrees (-90 to 90)

required
lon float

Longitude in degrees (-180 to 180)

required

Returns:

Type Description
Optional[Location]

Location object if a matching city is found, None otherwise

Raises:

Type Description
ValueError

If coordinates are out of valid ranges

RuntimeError

If database query fails

Source code in pg_nearest_city/_sync/nearest_city.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def query(self, lat: float, lon: float) -> Optional[Location]:
    """Look up the city whose Voronoi region contains the given point.

    Args:
        lat: Latitude in degrees (-90 to 90)
        lon: Longitude in degrees (-180 to 180)

    Returns:
        Location object if a matching city is found, None otherwise

    Raises:
        ValueError: If coordinates are out of valid ranges
        RuntimeError: If database query fails
    """
    # Throw an error if not used in 'with' block
    self._inform_user_if_not_context_manager()

    # Reject out-of-range coordinates before touching the database
    BaseNearestCity.validate_coordinates(lon, lat)

    try:
        with self.connection.cursor() as cur:
            statement = BaseNearestCity._get_reverse_geocoding_query(lon, lat)
            cur.execute(statement)
            row = cur.fetchone()

            if row:
                return Location(
                    city=row[0],
                    country=row[1],
                    lat=float(row[2]),
                    lon=float(row[3]),
                )
            return None
    except Exception as exc:
        failure = f"Reverse geocoding failed: {str(exc)}"
        self._logger.error(failure)
        raise RuntimeError(failure) from exc

options: show_source: false heading_level: 3

Query Output

A location object for JSON serialisation.

Parameters:

Name Type Description Default
city(str)

The city.

required
country(str)

The country.

required
lat(float)

Latitude.

required
lon(float)

Longitude.

required
Source code in pg_nearest_city/base_nearest_city.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@dataclass
class Location:
    """A location object for JSON serialisation.

    Args:
        city(str): The city.
        country(str): The country.
        lat(float): Latitude.
        lon(float): Longitude.
    """

    city: str
    country: str
    lat: float
    lon: float

options: show_source: false heading_level: 3

Database Configuration

Database config for Postgres.

Allows overriding values via constructor parameters, fallback to env vars.

Parameters:

Name Type Description Default
dbname(str)

Database name.

required
user(str)

Database user.

required
password(str)

Database password.

required
host(str)

Database host.

required
port(int)

Database port.

required
Source code in pg_nearest_city/base_nearest_city.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@dataclass
class DbConfig:
    """Database config for Postgres.

    Allows overriding values via constructor parameters, fallback to env vars.


    Args:
        dbname(str): Database name. Falls back to ``PGNEAREST_DB_NAME``.
        user(str): Database user. Falls back to ``PGNEAREST_DB_USER``.
        password(str): Database password. Falls back to
            ``PGNEAREST_DB_PASSWORD``.
        host(str): Database host. Falls back to ``PGNEAREST_DB_HOST``,
            then ``"db"``.
        port(int): Database port. Falls back to ``PGNEAREST_DB_PORT``,
            then ``5432``.

    Raises:
        ValueError: If dbname, user or password is still unset after the
            environment fallback.
    """

    dbname: Optional[str] = None
    user: Optional[str] = None
    password: Optional[str] = None
    host: Optional[str] = None
    port: Optional[int] = None

    def __post_init__(self):
        """Ensures env variables are read at runtime, not at class definition."""
        self.dbname = self.dbname or os.getenv("PGNEAREST_DB_NAME")
        self.user = self.user or os.getenv("PGNEAREST_DB_USER")
        self.password = self.password or os.getenv("PGNEAREST_DB_PASSWORD")
        self.host = self.host or os.getenv("PGNEAREST_DB_HOST", "db")
        self.port = self.port or int(os.getenv("PGNEAREST_DB_PORT", "5432"))

        # Raise error if any required field is missing
        missing_fields = [
            field
            for field in ["dbname", "user", "password"]
            if not getattr(self, field)
        ]
        if missing_fields:
            raise ValueError(
                f"Missing required database config fields: {', '.join(missing_fields)}"
            )

    @staticmethod
    def _quote_conninfo_value(value: object) -> str:
        """Render one value for a libpq keyword/value connection string.

        Values that are empty or contain whitespace, a single quote or a
        backslash are wrapped in single quotes with ``\\`` and ``'``
        backslash-escaped; any other value is returned unchanged.
        """
        text = str(value)
        if text and not any(ch.isspace() or ch in "'\\" for ch in text):
            return text
        # Escape backslashes first so the ones added for quotes stay intact
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def get_connection_string(self) -> str:
        """Connection string that psycopg accepts.

        Returns:
            Space-separated ``keyword=value`` pairs for dbname, user,
            password, host and port, with values quoted where required.
        """
        quote = self._quote_conninfo_value
        return (
            f"dbname={quote(self.dbname)} user={quote(self.user)} "
            f"password={quote(self.password)} "
            f"host={quote(self.host)} port={quote(self.port)}"
        )

__post_init__()

Ensures env variables are read at runtime, not at class definition.

Source code in pg_nearest_city/base_nearest_city.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __post_init__(self):
    """Fill unset fields from the environment, then validate the result.

    Reading the variables here means they are looked up each time an
    instance is built rather than once when the class is defined.

    Raises:
        ValueError: If dbname, user or password is still empty afterwards.
    """
    if not self.dbname:
        self.dbname = os.getenv("PGNEAREST_DB_NAME")
    if not self.user:
        self.user = os.getenv("PGNEAREST_DB_USER")
    if not self.password:
        self.password = os.getenv("PGNEAREST_DB_PASSWORD")
    if not self.host:
        self.host = os.getenv("PGNEAREST_DB_HOST", "db")
    if not self.port:
        self.port = int(os.getenv("PGNEAREST_DB_PORT", "5432"))

    # Host and port always have defaults; only these three can be absent
    unset = []
    for name in ("dbname", "user", "password"):
        if not getattr(self, name):
            unset.append(name)

    if unset:
        raise ValueError(
            f"Missing required database config fields: {', '.join(unset)}"
        )

get_connection_string()

Connection string that psycopg accepts.

Source code in pg_nearest_city/base_nearest_city.py
50
51
52
53
54
55
def get_connection_string(self) -> str:
    """Connection string that psycopg accepts.

    Values that are empty or contain whitespace, a single quote or a
    backslash are wrapped in single quotes with ``\\`` and ``'``
    backslash-escaped, so such a password or database name cannot corrupt
    the string. Any other value is emitted unchanged.

    Returns:
        Space-separated ``keyword=value`` pairs for dbname, user, password,
        host and port.
    """

    def quote(value: object) -> str:
        text = str(value)
        if text and not any(ch.isspace() or ch in "'\\" for ch in text):
            return text
        # Escape backslashes first so the ones added for quotes stay intact
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    return (
        f"dbname={quote(self.dbname)} user={quote(self.user)} "
        f"password={quote(self.password)} "
        f"host={quote(self.host)} port={quote(self.port)}"
    )

options: show_source: false heading_level: 3