Skip to content

API Docs

Async Reverse Geocode

Reverse geocode coordinates to the nearest city with a population over 1,000.

Source code in pg_nearest_city/_async/nearest_city.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class AsyncNearestCity:
    """Reverse geocoding to the nearest city over 1000 population.

    Designed to be used as an async context manager: entering obtains a
    database connection and runs :meth:`initialize`; exiting closes the
    connection unless the caller supplied it.
    """

    # Set by __aenter__; absent until the context manager has been entered.
    connection: psycopg.AsyncConnection

    def __init__(
        self,
        db: psycopg.AsyncConnection | DbConfig | None = None,
        logger: Optional[logging.Logger] = None,
        data_path: str | None = None,
    ):
        """Initialize reverse geocoder.

        No database work happens here; the connection is resolved when the
        context manager is entered.

        Args:
            db: An existing psycopg AsyncConnection or DbConfig
            logger: Optional custom logger.
            data_path: Optional path to directory containing exported CSV data files.
        """
        self._logger = logger or logging.getLogger("pg_nearest_city")
        self._db = db
        self._data_path = data_path
        self._is_external_connection = False
        self._is_initialized = False

    async def __aenter__(self):
        """Open the context manager.

        Obtains the connection, runs :meth:`initialize` and marks the
        instance as ready.

        Raises:
            RuntimeError: If the database cannot be initialized.
        """
        self.connection = await self.get_connection(self._db)
        try:
            await self.initialize()
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so a connection
            # opened by this instance must be closed here or it would leak.
            if not self._is_external_connection:
                await self.connection.close()
            raise
        self._is_initialized = True
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the context manager.

        Closes the connection only when this instance opened it; a
        caller-supplied connection is left open.
        """
        if self.connection and not self._is_external_connection:
            await self.connection.close()
        self._is_initialized = False

    async def get_connection(
        self,
        db: Optional[psycopg.AsyncConnection | DbConfig] = None,
    ) -> psycopg.AsyncConnection:
        """Determine the database connection to use.

        Args:
            db: An open AsyncConnection (returned as-is and flagged as
                external), a DbConfig to connect with, or None to connect
                using a default-constructed DbConfig.

        Returns:
            The connection to use for all subsequent queries.
        """
        if isinstance(db, psycopg.AsyncConnection):
            self._is_external_connection = True
            return db
        self._is_external_connection = False
        if isinstance(db, DbConfig):
            return await psycopg.AsyncConnection.connect(db.get_connection_string())
        return await psycopg.AsyncConnection.connect(
            DbConfig().get_connection_string(),
        )

    async def initialize(self) -> None:
        """Initialize the geocoding database with validation checks.

        Checks for country and geocoding tables. If not present,
        attempts auto-import from exported CSV data files.

        Uses a PostgreSQL advisory lock to prevent concurrent bootstrap races
        when multiple processes start up simultaneously. The lock is released
        only if it was actually acquired, and a failure while releasing it is
        logged rather than raised so it cannot hide the original error.

        Raises:
            RuntimeError: If the database is not initialized and cannot be
                bootstrapped, or if any step of the check/import fails.
        """
        if not getattr(self, "connection", None):
            self._inform_user_if_not_context_manager()

        # "pnc\0" as a 32-bit integer advisory lock ID
        _advisory_lock_id = 0x706E6300
        # Tracks whether the lock statement succeeded, so that ``finally``
        # never tries to release a lock this call does not hold.
        lock_acquired = False

        try:
            async with self.connection.cursor() as cur:
                await cur.execute("SELECT pg_advisory_lock(%s)", (_advisory_lock_id,))
            lock_acquired = True

            self._logger.info("Starting database initialization check")
            async with self.connection.cursor() as cur:
                status = await self._check_initialization_status(cur)

            if status.is_fully_initialized:
                self._logger.info("Database already properly initialized")
                return

            missing = status.get_missing_components()

            # Attempt auto-import from data directory
            data_path = BaseNearestCity._find_data_path(self._data_path)
            if data_path:
                self._logger.warning(
                    "Database not ready (missing: %s); importing from: %s",
                    ", ".join(missing),
                    data_path,
                )
                await self._import_from_data(data_path)

                # Re-check after import
                async with self.connection.cursor() as cur:
                    status = await self._check_initialization_status(cur)
                if status.is_fully_initialized:
                    self._logger.warning("Database initialized from data files")
                    return

                # Data files were found and imported, so report what is
                # still missing instead of claiming no data files exist.
                raise RuntimeError(
                    f"Database import from {data_path} did not fully initialize "
                    "the database (still missing: "
                    f"{', '.join(status.get_missing_components())})."
                )

            raise RuntimeError(
                "Database is not initialized and no data files found. "
                "Run the bootstrap pipeline first (pgnearest-load), "
                f"or provide a data directory via {BaseNearestCity.DATA_ENV_VAR} "
                "env var or data_path constructor arg."
            )

        except RuntimeError:
            raise
        except Exception as e:
            self._logger.error("Database initialization failed: %s", str(e))
            # A failed statement leaves the PostgreSQL transaction aborted
            # until it is rolled back; roll back (best effort) so the unlock
            # statement below can still run.
            try:
                await self.connection.rollback()
            except Exception as rollback_error:
                self._logger.warning(
                    "Rollback after failed initialization failed: %s",
                    rollback_error,
                )
            raise RuntimeError(f"Database initialization failed: {str(e)}") from e
        finally:
            if lock_acquired:
                try:
                    async with self.connection.cursor() as cur:
                        await cur.execute(
                            "SELECT pg_advisory_unlock(%s)", (_advisory_lock_id,)
                        )
                except Exception as unlock_error:
                    # Best effort: never let a failed unlock replace the
                    # exception (if any) that is already propagating.
                    self._logger.warning(
                        "Could not release initialization advisory lock: %s",
                        unlock_error,
                    )

    async def _import_from_data(self, data_path: str) -> None:
        """Bootstrap the database from exported CSV data files.

        Creates the schema, streams the country and geocoding files into the
        database via COPY, then creates the indices, committing after each
        stage.

        Args:
            data_path: Directory expected to contain both data files.

        Raises:
            RuntimeError: If either data file cannot be located.
        """
        data_dir = Path(data_path)
        country_file = BaseNearestCity._find_data_file(
            data_dir, BaseNearestCity.COUNTRY_DATA_STEM
        )
        geocoding_file = BaseNearestCity._find_data_file(
            data_dir, BaseNearestCity.GEOCODING_DATA_STEM
        )
        if not country_file or not geocoding_file:
            raise RuntimeError(f"Data files not found in {data_path}")

        async with self.connection.cursor() as cur:
            # Create schema
            for stmt in BaseNearestCity._get_bootstrap_sql():
                await cur.execute(stmt)
            await self.connection.commit()

            # Import country data (alpha2, alpha3, name, geom as hex WKB)
            self._logger.info("Importing country data from %s", country_file.name)
            with open_compressed(country_file) as fh:
                async with cur.copy(BaseNearestCity.COPY_COUNTRY_FROM) as copy:
                    # Stream in 8 KiB chunks rather than loading the whole file.
                    while data := fh.read(8192):
                        await copy.write(data)
            await self.connection.commit()

            # Import geocoding data (city, country, lat, lon)
            self._logger.info("Importing geocoding data from %s", geocoding_file.name)
            with open_compressed(geocoding_file) as fh:
                async with cur.copy(BaseNearestCity.COPY_GEOCODING_FROM) as copy:
                    while data := fh.read(8192):
                        await copy.write(data)
            await self.connection.commit()

            # Create indices
            self._logger.info("Creating indices")
            for stmt in BaseNearestCity._get_bootstrap_index_sql():
                await cur.execute(stmt)
            await self.connection.commit()

    def _inform_user_if_not_context_manager(self):
        """Raise an error if the context manager was not used.

        Raises:
            RuntimeError: If the instance has not been marked initialized.
        """
        if not self._is_initialized:
            raise RuntimeError(
                fill(
                    dedent("""
                AsyncNearestCity must be used within 'async with' context.\n
                    For example:\n
                    async with AsyncNearestCity() as geocoder:\n
                        details = await geocoder.query(lon, lat)
            """)
                )
            )

    async def query(self, lon: float, lat: float) -> Optional[Location]:
        """Find the nearest city to the given coordinates.

        Uses ST_Covers + lateral join on country and geocoding tables.
        Coordinates use (lon, lat) order, matching PostGIS convention
        where longitude is the X axis and latitude is Y.

        Args:
            lon: Longitude in degrees (-180 to 180)
            lat: Latitude in degrees (-90 to 90)

        Returns:
            Location object if a matching city is found, None otherwise

        Raises:
            ValueError: If coordinates are out of valid ranges
            RuntimeError: If database query fails
        """
        self._inform_user_if_not_context_manager()

        # Validation happens outside the try block so that its error is not
        # re-wrapped as a RuntimeError.
        BaseNearestCity.validate_coordinates(lon, lat)

        try:
            async with self.connection.cursor() as cur:
                await cur.execute(
                    BaseNearestCity._get_reverse_geocoding_query(lon, lat),
                )
                result = await cur.fetchone()

                if not result:
                    return None

                return Location(
                    city=result[0],
                    country=result[1],
                    lat=float(result[2]),
                    lon=float(result[3]),
                    country_alpha3=result[4],
                    country_name=result[5],
                )
        except Exception as e:
            self._logger.error(f"Reverse geocoding failed: {str(e)}")
            raise RuntimeError(f"Reverse geocoding failed: {str(e)}") from e

    async def _check_initialization_status(
        self,
        cur: psycopg.AsyncCursor,
    ) -> InitializationStatus:
        """Check the status and integrity of the geocoding database.

        Args:
            cur: Open cursor used to run the check queries.

        Returns:
            An InitializationStatus with the table, data and spatial-index
            flags filled in. Data and index flags are left at their defaults
            when either table is missing.
        """
        status = InitializationStatus()

        # Check table existence
        await cur.execute(BaseNearestCity._get_tables_existence_query())
        result = await cur.fetchone()
        status.has_country_table = bool(result and result[0])
        status.has_geocoding_table = bool(result and result[1])

        # The remaining checks query the tables, so stop if either is missing.
        if not (status.has_country_table and status.has_geocoding_table):
            return status

        # Check data completeness
        await cur.execute(BaseNearestCity._get_data_completeness_query())
        counts = await cur.fetchone()
        if counts is not None:
            status.has_country_data = counts[0] > 0
            status.has_geocoding_data = counts[1] > 0

        # Check spatial indices
        await cur.execute(BaseNearestCity._get_spatial_index_check_query())
        idx_result = await cur.fetchone()
        status.has_spatial_index = bool(idx_result and idx_result[0] and idx_result[1])

        return status

__aenter__() async

Open the context manager.

Source code in pg_nearest_city/_async/nearest_city.py
45
46
47
48
49
50
async def __aenter__(self):
    """Open the context manager.

    Obtains the connection, runs ``initialize`` and marks the instance as
    ready. If initialization fails, a connection opened by this instance is
    closed before the error propagates; a caller-supplied connection is left
    untouched.
    """
    self.connection = await self.get_connection(self._db)
    try:
        await self.initialize()
    except BaseException:
        # __aexit__ is not invoked when __aenter__ raises, so a connection
        # opened by this instance must be closed here or it would leak.
        if not self._is_external_connection:
            await self.connection.close()
        raise
    self._is_initialized = True
    return self

__aexit__(exc_type, exc_value, traceback) async

Close the context manager.

Source code in pg_nearest_city/_async/nearest_city.py
52
53
54
55
56
async def __aexit__(self, exc_type, exc_value, traceback):
    """Leave the context: release the connection and clear the ready flag.

    Only a connection opened by this instance is closed; one supplied by the
    caller stays open. The exception arguments are ignored.
    """
    conn = self.connection
    if conn and not self._is_external_connection:
        await conn.close()
    self._is_initialized = False

__init__(db=None, logger=None, data_path=None)

Initialize reverse geocoder.

Parameters:

Name Type Description Default
db AsyncConnection | DbConfig | None

An existing psycopg AsyncConnection or DbConfig

None
logger Optional[Logger]

Optional custom logger.

None
data_path str | None

Optional path to directory containing exported CSV data files.

None
Source code in pg_nearest_city/_async/nearest_city.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def __init__(
    self,
    db: psycopg.AsyncConnection | DbConfig | None = None,
    logger: Optional[logging.Logger] = None,
    data_path: str | None = None,
):
    """Store the geocoder configuration without touching the database.

    Args:
        db: An existing psycopg AsyncConnection or DbConfig
        logger: Optional custom logger; falls back to the
            "pg_nearest_city" logger.
        data_path: Optional path to directory containing exported CSV data files.
    """
    # Lifecycle flags start cleared.
    self._is_initialized = False
    self._is_external_connection = False

    self._data_path = data_path
    self._db = db
    if logger:
        self._logger = logger
    else:
        self._logger = logging.getLogger("pg_nearest_city")

get_connection(db=None) async

Determine the database connection to use.

Source code in pg_nearest_city/_async/nearest_city.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
async def get_connection(
    self,
    db: Optional[psycopg.AsyncConnection | DbConfig] = None,
) -> psycopg.AsyncConnection:
    """Resolve the connection to use and record who owns it.

    An AsyncConnection passed in is returned unchanged and flagged as
    external. Otherwise a new connection is opened from the given DbConfig,
    or from a default-constructed DbConfig when none is given.
    """
    if isinstance(db, psycopg.AsyncConnection):
        self._is_external_connection = True
        return db

    self._is_external_connection = False
    config = db if isinstance(db, DbConfig) else DbConfig()
    return await psycopg.AsyncConnection.connect(config.get_connection_string())

initialize() async

Initialize the geocoding database with validation checks.

Checks for country and geocoding tables. If not present, attempts auto-import from exported CSV data files.

Uses a PostgreSQL advisory lock to prevent concurrent bootstrap races when multiple processes start up simultaneously.

Source code in pg_nearest_city/_async/nearest_city.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
async def initialize(self) -> None:
    """Initialize the geocoding database with validation checks.

    Checks for country and geocoding tables. If not present,
    attempts auto-import from exported CSV data files.

    Uses a PostgreSQL advisory lock to prevent concurrent bootstrap races
    when multiple processes start up simultaneously. The lock is released
    only if it was actually acquired, and a failure while releasing it is
    logged rather than raised so it cannot hide the original error.

    Raises:
        RuntimeError: If the database is not initialized and cannot be
            bootstrapped, or if any step of the check/import fails.
    """
    if not getattr(self, "connection", None):
        self._inform_user_if_not_context_manager()

    # "pnc\0" as a 32-bit integer advisory lock ID
    _advisory_lock_id = 0x706E6300
    # Tracks whether the lock statement succeeded, so that ``finally`` never
    # tries to release a lock this call does not hold.
    lock_acquired = False

    try:
        async with self.connection.cursor() as cur:
            await cur.execute("SELECT pg_advisory_lock(%s)", (_advisory_lock_id,))
        lock_acquired = True

        self._logger.info("Starting database initialization check")
        async with self.connection.cursor() as cur:
            status = await self._check_initialization_status(cur)

        if status.is_fully_initialized:
            self._logger.info("Database already properly initialized")
            return

        missing = status.get_missing_components()

        # Attempt auto-import from data directory
        data_path = BaseNearestCity._find_data_path(self._data_path)
        if data_path:
            self._logger.warning(
                "Database not ready (missing: %s); importing from: %s",
                ", ".join(missing),
                data_path,
            )
            await self._import_from_data(data_path)

            # Re-check after import
            async with self.connection.cursor() as cur:
                status = await self._check_initialization_status(cur)
            if status.is_fully_initialized:
                self._logger.warning("Database initialized from data files")
                return

            # Data files were found and imported, so report what is still
            # missing instead of claiming no data files exist.
            raise RuntimeError(
                f"Database import from {data_path} did not fully initialize "
                "the database (still missing: "
                f"{', '.join(status.get_missing_components())})."
            )

        raise RuntimeError(
            "Database is not initialized and no data files found. "
            "Run the bootstrap pipeline first (pgnearest-load), "
            f"or provide a data directory via {BaseNearestCity.DATA_ENV_VAR} "
            "env var or data_path constructor arg."
        )

    except RuntimeError:
        raise
    except Exception as e:
        self._logger.error("Database initialization failed: %s", str(e))
        # A failed statement leaves the PostgreSQL transaction aborted until
        # it is rolled back; roll back (best effort) so the unlock statement
        # below can still run.
        try:
            await self.connection.rollback()
        except Exception as rollback_error:
            self._logger.warning(
                "Rollback after failed initialization failed: %s",
                rollback_error,
            )
        raise RuntimeError(f"Database initialization failed: {str(e)}") from e
    finally:
        if lock_acquired:
            try:
                async with self.connection.cursor() as cur:
                    await cur.execute(
                        "SELECT pg_advisory_unlock(%s)", (_advisory_lock_id,)
                    )
            except Exception as unlock_error:
                # Best effort: never let a failed unlock replace the
                # exception (if any) that is already propagating.
                self._logger.warning(
                    "Could not release initialization advisory lock: %s",
                    unlock_error,
                )

query(lon, lat) async

Find the nearest city to the given coordinates.

Uses ST_Covers + lateral join on country and geocoding tables. Coordinates use (lon, lat) order, matching PostGIS convention where longitude is the X axis and latitude is Y.

Parameters:

Name Type Description Default
lon float

Longitude in degrees (-180 to 180)

required
lat float

Latitude in degrees (-90 to 90)

required

Returns:

Type Description
Optional[Location]

Location object if a matching city is found, None otherwise

Raises:

Type Description
ValueError

If coordinates are out of valid ranges

RuntimeError

If database query fails

Source code in pg_nearest_city/_async/nearest_city.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
async def query(self, lon: float, lat: float) -> Optional[Location]:
    """Look up the city closest to a coordinate pair.

    Uses ST_Covers + lateral join on country and geocoding tables.
    Arguments are given as (lon, lat), following the PostGIS convention of
    longitude on the X axis and latitude on the Y axis.

    Args:
        lon: Longitude in degrees (-180 to 180)
        lat: Latitude in degrees (-90 to 90)

    Returns:
        A Location for the matching city, or None when the query returns
        no row.

    Raises:
        ValueError: If coordinates are out of valid ranges
        RuntimeError: If database query fails
    """
    self._inform_user_if_not_context_manager()

    # Validate before entering the try block so the validation error is not
    # re-wrapped as a RuntimeError.
    BaseNearestCity.validate_coordinates(lon, lat)

    try:
        async with self.connection.cursor() as cur:
            sql = BaseNearestCity._get_reverse_geocoding_query(lon, lat)
            await cur.execute(sql)
            row = await cur.fetchone()

            # Row columns, in order: city, country, lat, lon, alpha3, name.
            return (
                Location(
                    city=row[0],
                    country=row[1],
                    lat=float(row[2]),
                    lon=float(row[3]),
                    country_alpha3=row[4],
                    country_name=row[5],
                )
                if row
                else None
            )
    except Exception as e:
        self._logger.error(f"Reverse geocoding failed: {str(e)}")
        raise RuntimeError(f"Reverse geocoding failed: {str(e)}") from e

options: show_source: false heading_level: 3

Sync Reverse Geocode

Reverse geocode coordinates to the nearest city with a population over 1,000.

Source code in pg_nearest_city/_sync/nearest_city.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class NearestCity:
    """Reverse geocoding to the nearest city over 1000 population.

    Designed to be used as a context manager: entering obtains a database
    connection and runs :meth:`initialize`; exiting closes the connection
    unless the caller supplied it.
    """

    # Set by __enter__; absent until the context manager has been entered.
    connection: psycopg.Connection

    def __init__(
        self,
        db: psycopg.Connection | DbConfig | None = None,
        logger: Optional[logging.Logger] = None,
        data_path: str | None = None,
    ):
        """Initialize reverse geocoder.

        No database work happens here; the connection is resolved when the
        context manager is entered.

        Args:
            db: An existing psycopg Connection or DbConfig
            logger: Optional custom logger.
            data_path: Optional path to directory containing exported CSV data files.
        """
        self._logger = logger or logging.getLogger("pg_nearest_city")
        self._db = db
        self._data_path = data_path
        self._is_external_connection = False
        self._is_initialized = False

    def __enter__(self):
        """Open the context manager.

        Obtains the connection, runs :meth:`initialize` and marks the
        instance as ready.

        Raises:
            RuntimeError: If the database cannot be initialized.
        """
        self.connection = self.get_connection(self._db)
        try:
            self.initialize()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so a connection
            # opened by this instance must be closed here or it would leak.
            if not self._is_external_connection:
                self.connection.close()
            raise
        self._is_initialized = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the context manager.

        Closes the connection only when this instance opened it; a
        caller-supplied connection is left open.
        """
        if self.connection and not self._is_external_connection:
            self.connection.close()
        self._is_initialized = False

    def get_connection(
        self,
        db: Optional[psycopg.Connection | DbConfig] = None,
    ) -> psycopg.Connection:
        """Determine the database connection to use.

        Args:
            db: An open Connection (returned as-is and flagged as external),
                a DbConfig to connect with, or None to connect using a
                default-constructed DbConfig.

        Returns:
            The connection to use for all subsequent queries.
        """
        if isinstance(db, psycopg.Connection):
            self._is_external_connection = True
            return db
        self._is_external_connection = False
        if isinstance(db, DbConfig):
            return psycopg.Connection.connect(db.get_connection_string())
        return psycopg.Connection.connect(
            DbConfig().get_connection_string(),
        )

    def initialize(self) -> None:
        """Initialize the geocoding database with validation checks.

        Checks for country and geocoding tables. If not present,
        attempts auto-import from exported CSV data files.

        Uses a PostgreSQL advisory lock to prevent concurrent bootstrap races
        when multiple processes start up simultaneously. The lock is released
        only if it was actually acquired, and a failure while releasing it is
        logged rather than raised so it cannot hide the original error.

        Raises:
            RuntimeError: If the database is not initialized and cannot be
                bootstrapped, or if any step of the check/import fails.
        """
        if not getattr(self, "connection", None):
            self._inform_user_if_not_context_manager()

        # "pnc\0" as a 32-bit integer advisory lock ID
        _advisory_lock_id = 0x706E6300
        # Tracks whether the lock statement succeeded, so that ``finally``
        # never tries to release a lock this call does not hold.
        lock_acquired = False

        try:
            with self.connection.cursor() as cur:
                cur.execute("SELECT pg_advisory_lock(%s)", (_advisory_lock_id,))
            lock_acquired = True

            self._logger.info("Starting database initialization check")
            with self.connection.cursor() as cur:
                status = self._check_initialization_status(cur)

            if status.is_fully_initialized:
                self._logger.info("Database already properly initialized")
                return

            missing = status.get_missing_components()

            # Attempt auto-import from data directory
            data_path = BaseNearestCity._find_data_path(self._data_path)
            if data_path:
                self._logger.warning(
                    "Database not ready (missing: %s); importing from: %s",
                    ", ".join(missing),
                    data_path,
                )
                self._import_from_data(data_path)

                # Re-check after import
                with self.connection.cursor() as cur:
                    status = self._check_initialization_status(cur)
                if status.is_fully_initialized:
                    self._logger.warning("Database initialized from data files")
                    return

                # Data files were found and imported, so report what is
                # still missing instead of claiming no data files exist.
                raise RuntimeError(
                    f"Database import from {data_path} did not fully initialize "
                    "the database (still missing: "
                    f"{', '.join(status.get_missing_components())})."
                )

            raise RuntimeError(
                "Database is not initialized and no data files found. "
                "Run the bootstrap pipeline first (pgnearest-load), "
                f"or provide a data directory via {BaseNearestCity.DATA_ENV_VAR} "
                "env var or data_path constructor arg."
            )

        except RuntimeError:
            raise
        except Exception as e:
            self._logger.error("Database initialization failed: %s", str(e))
            # A failed statement leaves the PostgreSQL transaction aborted
            # until it is rolled back; roll back (best effort) so the unlock
            # statement below can still run.
            try:
                self.connection.rollback()
            except Exception as rollback_error:
                self._logger.warning(
                    "Rollback after failed initialization failed: %s",
                    rollback_error,
                )
            raise RuntimeError(f"Database initialization failed: {str(e)}") from e
        finally:
            if lock_acquired:
                try:
                    with self.connection.cursor() as cur:
                        cur.execute(
                            "SELECT pg_advisory_unlock(%s)", (_advisory_lock_id,)
                        )
                except Exception as unlock_error:
                    # Best effort: never let a failed unlock replace the
                    # exception (if any) that is already propagating.
                    self._logger.warning(
                        "Could not release initialization advisory lock: %s",
                        unlock_error,
                    )

    def _import_from_data(self, data_path: str) -> None:
        """Bootstrap the database from exported CSV data files.

        Creates the schema, streams the country and geocoding files into the
        database via COPY, then creates the indices, committing after each
        stage.

        Args:
            data_path: Directory expected to contain both data files.

        Raises:
            RuntimeError: If either data file cannot be located.
        """
        data_dir = Path(data_path)
        country_file = BaseNearestCity._find_data_file(
            data_dir, BaseNearestCity.COUNTRY_DATA_STEM
        )
        geocoding_file = BaseNearestCity._find_data_file(
            data_dir, BaseNearestCity.GEOCODING_DATA_STEM
        )
        if not country_file or not geocoding_file:
            raise RuntimeError(f"Data files not found in {data_path}")

        with self.connection.cursor() as cur:
            # Create schema
            for stmt in BaseNearestCity._get_bootstrap_sql():
                cur.execute(stmt)
            self.connection.commit()

            # Import country data (alpha2, alpha3, name, geom as hex WKB)
            self._logger.info("Importing country data from %s", country_file.name)
            with open_compressed(country_file) as fh:
                with cur.copy(BaseNearestCity.COPY_COUNTRY_FROM) as copy:
                    # Stream in 8 KiB chunks rather than loading the whole file.
                    while data := fh.read(8192):
                        copy.write(data)
            self.connection.commit()

            # Import geocoding data (city, country, lat, lon)
            self._logger.info("Importing geocoding data from %s", geocoding_file.name)
            with open_compressed(geocoding_file) as fh:
                with cur.copy(BaseNearestCity.COPY_GEOCODING_FROM) as copy:
                    while data := fh.read(8192):
                        copy.write(data)
            self.connection.commit()

            # Create indices
            self._logger.info("Creating indices")
            for stmt in BaseNearestCity._get_bootstrap_index_sql():
                cur.execute(stmt)
            self.connection.commit()

    def _inform_user_if_not_context_manager(self):
        """Raise an error if the context manager was not used.

        Raises:
            RuntimeError: If the instance has not been marked initialized.
        """
        if not self._is_initialized:
            raise RuntimeError(
                fill(
                    dedent("""
                NearestCity must be used within 'with' context.\n
                    For example:\n
                    with NearestCity() as geocoder:\n
                        details = geocoder.query(lon, lat)
            """)
                )
            )

    def query(self, lon: float, lat: float) -> Optional[Location]:
        """Find the nearest city to the given coordinates.

        Uses ST_Covers + lateral join on country and geocoding tables.
        Coordinates use (lon, lat) order, matching PostGIS convention
        where longitude is the X axis and latitude is Y.

        Args:
            lon: Longitude in degrees (-180 to 180)
            lat: Latitude in degrees (-90 to 90)

        Returns:
            Location object if a matching city is found, None otherwise

        Raises:
            ValueError: If coordinates are out of valid ranges
            RuntimeError: If database query fails
        """
        self._inform_user_if_not_context_manager()

        # Validation happens outside the try block so that its error is not
        # re-wrapped as a RuntimeError.
        BaseNearestCity.validate_coordinates(lon, lat)

        try:
            with self.connection.cursor() as cur:
                cur.execute(
                    BaseNearestCity._get_reverse_geocoding_query(lon, lat),
                )
                result = cur.fetchone()

                if not result:
                    return None

                return Location(
                    city=result[0],
                    country=result[1],
                    lat=float(result[2]),
                    lon=float(result[3]),
                    country_alpha3=result[4],
                    country_name=result[5],
                )
        except Exception as e:
            self._logger.error(f"Reverse geocoding failed: {str(e)}")
            raise RuntimeError(f"Reverse geocoding failed: {str(e)}") from e

    def _check_initialization_status(
        self,
        cur: psycopg.Cursor,
    ) -> InitializationStatus:
        """Check the status and integrity of the geocoding database.

        Args:
            cur: Open cursor used to run the check queries.

        Returns:
            An InitializationStatus with the table, data and spatial-index
            flags filled in. Data and index flags are left at their defaults
            when either table is missing.
        """
        status = InitializationStatus()

        # Check table existence
        cur.execute(BaseNearestCity._get_tables_existence_query())
        result = cur.fetchone()
        status.has_country_table = bool(result and result[0])
        status.has_geocoding_table = bool(result and result[1])

        # The remaining checks query the tables, so stop if either is missing.
        if not (status.has_country_table and status.has_geocoding_table):
            return status

        # Check data completeness
        cur.execute(BaseNearestCity._get_data_completeness_query())
        counts = cur.fetchone()
        if counts is not None:
            status.has_country_data = counts[0] > 0
            status.has_geocoding_data = counts[1] > 0

        # Check spatial indices
        cur.execute(BaseNearestCity._get_spatial_index_check_query())
        idx_result = cur.fetchone()
        status.has_spatial_index = bool(idx_result and idx_result[0] and idx_result[1])

        return status

__enter__()

Open the context manager.

Source code in pg_nearest_city/_sync/nearest_city.py
45
46
47
48
49
50
def __enter__(self):
    """Open the context manager.

    Obtains the connection, runs ``initialize`` and marks the instance as
    ready. If initialization fails, a connection opened by this instance is
    closed before the error propagates; a caller-supplied connection is left
    untouched.
    """
    self.connection = self.get_connection(self._db)
    try:
        self.initialize()
    except BaseException:
        # __exit__ is not invoked when __enter__ raises, so a connection
        # opened by this instance must be closed here or it would leak.
        if not self._is_external_connection:
            self.connection.close()
        raise
    self._is_initialized = True
    return self

__exit__(exc_type, exc_value, traceback)

Close the context manager.

Source code in pg_nearest_city/_sync/nearest_city.py
52
53
54
55
56
def __exit__(self, exc_type, exc_value, traceback):
    """Leave the context: release the connection and clear the ready flag.

    Only a connection opened by this instance is closed; one supplied by the
    caller stays open. The exception arguments are ignored.
    """
    conn = self.connection
    if conn and not self._is_external_connection:
        conn.close()
    self._is_initialized = False

__init__(db=None, logger=None, data_path=None)

Initialize reverse geocoder.

Parameters:

Name Type Description Default
db Connection | DbConfig | None

An existing psycopg Connection or DbConfig

None
logger Optional[Logger]

Optional custom logger.

None
data_path str | None

Optional path to directory containing exported CSV data files.

None
Source code in pg_nearest_city/_sync/nearest_city.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def __init__(
    self,
    db: psycopg.Connection | DbConfig | None = None,
    logger: Optional[logging.Logger] = None,
    data_path: str | None = None,
):
    """Store the geocoder configuration without touching the database.

    Args:
        db: An existing psycopg Connection or DbConfig
        logger: Optional custom logger; falls back to the
            "pg_nearest_city" logger.
        data_path: Optional path to directory containing exported CSV data files.
    """
    # Lifecycle flags start cleared.
    self._is_initialized = False
    self._is_external_connection = False

    self._data_path = data_path
    self._db = db
    if logger:
        self._logger = logger
    else:
        self._logger = logging.getLogger("pg_nearest_city")

get_connection(db=None)

Determine the database connection to use.

Source code in pg_nearest_city/_sync/nearest_city.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def get_connection(
    self,
    db: Optional[psycopg.Connection | DbConfig] = None,
) -> psycopg.Connection:
    """Resolve the connection to use and record who owns it.

    A Connection passed in is returned unchanged and flagged as external.
    Otherwise a new connection is opened from the given DbConfig, or from a
    default-constructed DbConfig when none is given.
    """
    if isinstance(db, psycopg.Connection):
        self._is_external_connection = True
        return db

    self._is_external_connection = False
    config = db if isinstance(db, DbConfig) else DbConfig()
    return psycopg.Connection.connect(config.get_connection_string())

initialize()

Initialize the geocoding database with validation checks.

Checks for country and geocoding tables. If not present, attempts auto-import from exported CSV data files.

Uses a PostgreSQL advisory lock to prevent concurrent bootstrap races when multiple processes start up simultaneously.

Source code in pg_nearest_city/_sync/nearest_city.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def initialize(self) -> None:
    """Initialize the geocoding database with validation checks.

    Checks for country and geocoding tables. If not present,
    attempts auto-import from exported CSV data files.

    Uses a PostgreSQL advisory lock to prevent concurrent bootstrap races
    when multiple processes start up simultaneously. The lock is released
    only if it was actually acquired, and a failure to release it is logged
    instead of raised so that it cannot replace the error that ended
    initialization.

    Raises:
        RuntimeError: If the database is not initialized and no data files
            are found, if it is still incomplete after an import, or if any
            step fails unexpectedly (the original error is chained).
    """
    if not getattr(self, "connection", None):
        self._inform_user_if_not_context_manager()

    # "pnc\0" as a 32-bit integer advisory lock ID
    _advisory_lock_id = 0x706E6300
    # Tracks whether the unlock in ``finally`` is needed at all.
    lock_acquired = False

    try:
        with self.connection.cursor() as cur:
            cur.execute("SELECT pg_advisory_lock(%s)", (_advisory_lock_id,))
            lock_acquired = True

        self._logger.info("Starting database initialization check")
        with self.connection.cursor() as cur:
            status = self._check_initialization_status(cur)

        if status.is_fully_initialized:
            self._logger.info("Database already properly initialized")
            return

        missing = status.get_missing_components()

        # Attempt auto-import from data directory
        data_path = BaseNearestCity._find_data_path(self._data_path)
        if data_path:
            self._logger.warning(
                "Database not ready (missing: %s); importing from: %s",
                ", ".join(missing),
                data_path,
            )
            self._import_from_data(data_path)

            # Re-check after import
            with self.connection.cursor() as cur:
                status = self._check_initialization_status(cur)
            if status.is_fully_initialized:
                self._logger.warning("Database initialized from data files")
                return

            # Data files were found, so report the incomplete import rather
            # than claiming that no data files exist.
            still_missing = ", ".join(status.get_missing_components())
            raise RuntimeError(
                f"Database is still not initialized after importing from "
                f"{data_path} (missing: {still_missing})."
            )

        raise RuntimeError(
            "Database is not initialized and no data files found. "
            "Run the bootstrap pipeline first (pgnearest-load), "
            f"or provide a data directory via {BaseNearestCity.DATA_ENV_VAR} "
            "env var or data_path constructor arg."
        )

    except RuntimeError:
        raise
    except Exception as e:
        self._logger.error("Database initialization failed: %s", str(e))
        raise RuntimeError(f"Database initialization failed: {str(e)}") from e
    finally:
        if lock_acquired:
            try:
                with self.connection.cursor() as cur:
                    cur.execute(
                        "SELECT pg_advisory_unlock(%s)", (_advisory_lock_id,)
                    )
            except Exception as unlock_error:
                # Best effort: raising here would mask the in-flight error.
                self._logger.warning(
                    "Failed to release initialization advisory lock: %s",
                    unlock_error,
                )

query(lon, lat)

Find the nearest city to the given coordinates.

Uses ST_Covers + lateral join on country and geocoding tables. Coordinates use (lon, lat) order, matching PostGIS convention where longitude is the X axis and latitude is Y.

Parameters:

Name Type Description Default
lon float

Longitude in degrees (-180 to 180)

required
lat float

Latitude in degrees (-90 to 90)

required

Returns:

Type Description
Optional[Location]

Location object if a matching city is found, None otherwise

Raises:

Type Description
ValueError

If coordinates are out of valid ranges

RuntimeError

If database query fails

Source code in pg_nearest_city/_sync/nearest_city.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def query(self, lon: float, lat: float) -> Optional[Location]:
    """Look up the nearest city for a coordinate pair.

    Uses ST_Covers + lateral join on country and geocoding tables.
    Coordinates are given in (lon, lat) order, matching the PostGIS
    convention where longitude is the X axis and latitude is Y.

    Args:
        lon: Longitude in degrees (-180 to 180)
        lat: Latitude in degrees (-90 to 90)

    Returns:
        Location object if a matching city is found, None otherwise

    Raises:
        ValueError: If coordinates are out of valid ranges
        RuntimeError: If database query fails
    """
    self._inform_user_if_not_context_manager()

    # Validation happens outside the try block, so its error is not wrapped.
    BaseNearestCity.validate_coordinates(lon, lat)

    try:
        with self.connection.cursor() as cursor:
            cursor.execute(
                BaseNearestCity._get_reverse_geocoding_query(lon, lat),
            )
            row = cursor.fetchone()

        if not row:
            return None

        # Column order of the result row: city, country, lat, lon,
        # country_alpha3, country_name.
        fields = {
            "city": row[0],
            "country": row[1],
            "lat": float(row[2]),
            "lon": float(row[3]),
            "country_alpha3": row[4],
            "country_name": row[5],
        }
        return Location(**fields)
    except Exception as e:
        failure = f"Reverse geocoding failed: {str(e)}"
        self._logger.error(failure)
        raise RuntimeError(failure) from e

options: show_source: false heading_level: 3

Query Output

A location object for JSON serialisation.

Parameters:

Name Type Description Default
city str

The city.

required
country str

The country.

required
lat float

Latitude.

required
lon float

Longitude.

required
Source code in pg_nearest_city/base_nearest_city.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@dataclass
class Location:
    """A location object for JSON serialisation.

    Args:
        city(str): The city.
        country(str): The country.
        lat(float): Latitude.
        lon(float): Longitude.
        country_alpha3(Optional[str]): Three-letter country code (presumably
            ISO 3166-1 alpha-3 -- confirm against the query). Defaults to None.
        country_name(Optional[str]): Country name. Defaults to None.
    """

    city: str
    country: str
    lat: float
    lon: float
    country_alpha3: Optional[str] = None
    country_name: Optional[str] = None

options: show_source: false heading_level: 3

Database Configuration

Database config for Postgres.

Allows overriding values via constructor parameters, fallback to env vars.

Parameters:

Name Type Description Default
dbname str

Database name.

None
user str

Database user.

None
password str

Database password.

None
host str

Database host.

None
port int

Database port.

None
Source code in pg_nearest_city/base_nearest_city.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class DbConfig:
    """Database config for Postgres.

    Allows overriding values via constructor parameters, fallback to env vars.


    Args:
        dbname(str): Database name. Falls back to PGNEAREST_DB_NAME, then
            "postgres".
        user(str): Database user. Falls back to PGNEAREST_DB_USER; required.
        password(str): Database password. Falls back to
            PGNEAREST_DB_PASSWORD; optional.
        host(str): Database host. Falls back to PGNEAREST_DB_HOST, then
            "localhost".
        port(int): Database port. Falls back to PGNEAREST_DB_PORT, then 5432.

    Raises:
        ValueError: If dbname or user is still empty after the env fallback.
    """

    dbname: Optional[str] = None
    user: Optional[str] = None
    password: Optional[str] = None
    host: Optional[str] = None
    port: Optional[int] = None

    def __post_init__(self):
        """Ensures env variables are read at runtime, not at class definition."""
        self.dbname = self.dbname or os.getenv("PGNEAREST_DB_NAME", "postgres")
        self.user = self.user or os.getenv("PGNEAREST_DB_USER")
        self.password = self.password or os.getenv("PGNEAREST_DB_PASSWORD")
        self.host = self.host or os.getenv("PGNEAREST_DB_HOST", "localhost")
        self.port = self.port or int(os.getenv("PGNEAREST_DB_PORT", "5432"))

        # Raise error if any required field is missing
        missing_fields = [
            field for field in ["dbname", "user"] if not getattr(self, field)
        ]
        if missing_fields:
            raise ValueError(
                f"Missing required database config fields: {', '.join(missing_fields)}"
            )
        if not self.password:
            logging.getLogger("pg_nearest_city").debug(
                "No database password configured; connecting without a password"
            )

    @staticmethod
    def _conninfo_value(value: object) -> str:
        """Render one value for a libpq keyword/value connection string.

        Values that are empty or contain whitespace, a single quote or a
        backslash are wrapped in single quotes, with quotes and backslashes
        escaped by a backslash. All other values are returned unchanged.
        """
        text = str(value)
        if text and not any(ch.isspace() or ch in "'\\" for ch in text):
            return text
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def get_connection_string(self) -> str:
        """Connection string that psycopg accepts.

        Returns:
            A space-separated keyword/value string with dbname, user,
            password (only when set), host and port, in that order. Values
            are quoted only when they need it, so simple values produce the
            same string as plain interpolation.
        """
        quote = self._conninfo_value
        parts = [f"dbname={quote(self.dbname)}", f"user={quote(self.user)}"]
        if self.password:
            parts.append(f"password={quote(self.password)}")
        parts.extend([f"host={quote(self.host)}", f"port={quote(self.port)}"])
        return " ".join(parts)

__post_init__()

Ensures env variables are read at runtime, not at class definition.

Source code in pg_nearest_city/base_nearest_city.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def __post_init__(self):
    """Fill unset fields from the environment at instantiation time.

    Each falsy field is replaced by its PGNEAREST_DB_* environment variable
    (with a built-in default for name, host and port).

    Raises:
        ValueError: If dbname or user is still empty afterwards.
    """
    env_fallbacks = (
        ("dbname", "PGNEAREST_DB_NAME", "postgres"),
        ("user", "PGNEAREST_DB_USER", None),
        ("password", "PGNEAREST_DB_PASSWORD", None),
        ("host", "PGNEAREST_DB_HOST", "localhost"),
    )
    for attr, env_var, default in env_fallbacks:
        if not getattr(self, attr):
            setattr(self, attr, os.getenv(env_var, default))

    # The port is the only field that needs converting from text.
    if not self.port:
        self.port = int(os.getenv("PGNEAREST_DB_PORT", "5432"))

    # Only the database name and user are mandatory.
    missing_fields = [name for name in ("dbname", "user") if not getattr(self, name)]
    if missing_fields:
        raise ValueError(
            f"Missing required database config fields: {', '.join(missing_fields)}"
        )

    if self.password:
        return
    logging.getLogger("pg_nearest_city").debug(
        "No database password configured; connecting without a password"
    )

get_connection_string()

Connection string that psycopg accepts.

Source code in pg_nearest_city/base_nearest_city.py
58
59
60
61
62
63
64
def get_connection_string(self) -> str:
    """Connection string that psycopg accepts.

    Builds a space-separated libpq keyword/value string with dbname, user,
    password (only when set), host and port, in that order. Values that are
    empty or contain whitespace, a single quote or a backslash are wrapped
    in single quotes with quotes and backslashes backslash-escaped, so such
    values cannot break or alter the string. Simple values are emitted
    unchanged.
    """

    def conninfo_value(value: object) -> str:
        # Quote only when needed so ordinary values keep their plain form.
        text = str(value)
        needs_quoting = not text or any(
            ch.isspace() or ch in "'\\" for ch in text
        )
        if not needs_quoting:
            return text
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    parts = [
        f"dbname={conninfo_value(self.dbname)}",
        f"user={conninfo_value(self.user)}",
    ]
    if self.password:
        parts.append(f"password={conninfo_value(self.password)}")
    parts.extend(
        [f"host={conninfo_value(self.host)}", f"port={conninfo_value(self.port)}"]
    )
    return " ".join(parts)

options: show_source: false heading_level: 3