Skip to content

Python Client

The OSMClient class is the main interface for querying your local OSM stack. It returns results as GeoPandas GeoDataFrame objects in EPSG:4326.

Setup

from osmforge import OSMClient

# Default — connects to http://localhost:8000
client = OSMClient()

# Custom host / port
client = OSMClient(base_url="http://192.168.1.10:8000")

Bounding-box queries

Propagation features — propagation_bbox

Returns features classified into propagation layers within a lat/lon bounding box.

gdf = client.propagation_bbox(
    min_lon=-1.6, min_lat=50.55,
    max_lon=-1.0, max_lat=50.85,
)
print(gdf.columns.tolist())
# ['geometry', 'layer', 'osm_id', ...]
print(gdf["layer"].value_counts())

Filter to specific layers:

gdf = client.propagation_bbox(
    min_lon=-1.6, min_lat=50.55,
    max_lon=-1.0, max_lat=50.85,
    layers=["buildings", "vegetation"],
)

Cap the number of features returned:

gdf = client.propagation_bbox(
    min_lon=-0.13, min_lat=51.49,
    max_lon=-0.11, max_lat=51.51,
    limit=500,
)

Raw OSM features — features_bbox

Returns all OSM tags without propagation classification. Useful for exploration.

gdf = client.features_bbox(
    min_lon=-0.13, min_lat=51.49,
    max_lon=-0.11, max_lat=51.51,
)
# All raw OSM tag columns are present
print(gdf.columns.tolist())

Geometry queries

Propagation features — propagation_geometry

Accepts a Shapely geometry or a GeoJSON dict (Polygon or MultiPolygon).

With a Shapely polygon:

from shapely.geometry import box

aoi = box(-1.6, 50.55, -1.0, 50.85)  # (min_lon, min_lat, max_lon, max_lat)

gdf = client.propagation_geometry(aoi)
print(len(gdf), "features")

With a GeoJSON dict:

geojson_polygon = {
    "type": "Polygon",
    "coordinates": [[
        [-1.6, 50.55],
        [-1.0, 50.55],
        [-1.0, 50.85],
        [-1.6, 50.85],
        [-1.6, 50.55],
    ]],
}

gdf = client.propagation_geometry(geojson_polygon)

Filter layers and cap results:

gdf = client.propagation_geometry(
    aoi,
    layers=["buildings", "roads", "water"],
    limit=1000,
)

Working with results

Results are standard GeoDataFrame objects — use any GeoPandas or Shapely operation:

import geopandas as gpd

gdf = client.propagation_bbox(
    min_lon=-1.6, min_lat=50.55,
    max_lon=-1.0, max_lat=50.85,
)

# Filter to buildings only
buildings = gdf[gdf["layer"] == "buildings"]

# Reproject for metric calculations (e.g. area in m²)
buildings_bng = buildings.to_crs("EPSG:27700")
buildings_bng["area_m2"] = buildings_bng.geometry.area

# Save to file
gdf.to_file("isle_of_wight.gpkg", driver="GPKG")

# Plot
gdf.plot(column="layer", legend=True, figsize=(12, 8))

API Reference

osmforge.client.OSMClient

Source code in osmforge/client.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class OSMClient:
    """
    HTTP client that queries an OSM feature service and returns GeoDataFrames.

    Every public method issues a single request against ``base_url`` and
    converts the ``features`` list of the JSON response into a
    ``geopandas.GeoDataFrame`` tagged as EPSG:4326.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        """
        Parameters
        ----------
        base_url : str
            Root URL of the service. Trailing slashes are stripped so that
            endpoint paths (which start with ``/``) can be appended directly.
        """
        self.base_url = base_url.rstrip("/")

    def _get(self, path: str, params: dict) -> dict:
        """Send a GET request to ``path`` and return the decoded JSON body.

        Raises ``requests.HTTPError`` (via ``raise_for_status``) on a 4xx/5xx
        response.
        """
        r = requests.get(f"{self.base_url}{path}", params=params, timeout=120)
        r.raise_for_status()
        return r.json()

    def _post(self, path: str, body: dict) -> dict:
        """Send ``body`` as JSON in a POST request to ``path`` and return the decoded JSON reply.

        Raises ``requests.HTTPError`` (via ``raise_for_status``) on a 4xx/5xx
        response.
        """
        r = requests.post(f"{self.base_url}{path}", json=body, timeout=120)
        r.raise_for_status()
        return r.json()

    @staticmethod
    def _to_gdf(fc: dict) -> gpd.GeoDataFrame:
        """
        Convert a response dict with a ``features`` list into a GeoDataFrame.

        A missing or empty ``features`` list yields an empty GeoDataFrame that
        still has a geometry column and the EPSG:4326 CRS, so callers can use
        ``.geometry``, ``.to_crs(...)`` and similar without special-casing
        empty results.
        """
        features = fc.get("features", [])
        if not features:
            # A bare GeoDataFrame() has no geometry column and no CRS, which
            # would make empty results behave differently from non-empty ones.
            return gpd.GeoDataFrame(geometry=[], crs="EPSG:4326")
        return gpd.GeoDataFrame.from_features(features, crs="EPSG:4326")

    # ------------------------------------------------------------------
    # Public methods
    # ------------------------------------------------------------------

    def propagation_bbox(
        self,
        min_lon: float,
        min_lat: float,
        max_lon: float,
        max_lat: float,
        layers: Optional[List[str]] = None,
        limit: Optional[int] = None,
    ) -> gpd.GeoDataFrame:
        """
        Return propagation-classified features within a bounding box.

        Parameters
        ----------
        min_lon, min_lat, max_lon, max_lat : float
            Bounding box in WGS-84 degrees.
        layers : list of str, optional
            Subset of ALL_LAYERS to include. Defaults to all; an empty list
            is treated the same as ``None``.
        limit : int, optional
            Maximum number of features to return.
        """
        params: dict = {
            "min_lon": min_lon, "min_lat": min_lat,
            "max_lon": max_lon, "max_lat": max_lat,
        }
        # Copy so the request parameters never alias the caller's list (or
        # the module-level ALL_LAYERS); requests encodes a list value as a
        # repeated query parameter.
        selected = list(layers or ALL_LAYERS)
        if selected:
            params["layers"] = selected
        if limit is not None:
            params["limit"] = limit
        return self._to_gdf(self._get("/propagation/bbox", params))

    def propagation_geometry(
        self,
        geometry: _Geometry,
        layers: Optional[List[str]] = None,
        limit: Optional[int] = None,
    ) -> gpd.GeoDataFrame:
        """
        Return propagation-classified features intersecting a polygon or multipolygon.

        Parameters
        ----------
        geometry : GeoJSON dict or Shapely geometry
            The area of interest (Polygon or MultiPolygon).
        layers : list of str, optional
            Subset of ALL_LAYERS to include. Defaults to all; an empty list
            is treated the same as ``None``.
        limit : int, optional
            Maximum number of features to return.
        """
        body: dict = {"geometry": _to_geojson(geometry), "layers": layers or ALL_LAYERS}
        if limit is not None:
            body["limit"] = limit
        return self._to_gdf(self._post("/propagation/geometry", body))

    def features_bbox(
        self,
        min_lon: float,
        min_lat: float,
        max_lon: float,
        max_lat: float,
        limit: Optional[int] = None,
    ) -> gpd.GeoDataFrame:
        """
        Return raw OSM features (all tags) within a bounding box.

        Useful for exploration; use propagation_bbox for modelling.

        Parameters
        ----------
        min_lon, min_lat, max_lon, max_lat : float
            Bounding box in WGS-84 degrees.
        limit : int, optional
            Maximum number of features to return.
        """
        params: dict = {
            "min_lon": min_lon, "min_lat": min_lat,
            "max_lon": max_lon, "max_lat": max_lat,
        }
        if limit is not None:
            params["limit"] = limit
        return self._to_gdf(self._get("/features/bbox", params))

features_bbox(min_lon, min_lat, max_lon, max_lat, limit=None)

Return raw OSM features (all tags) within a bounding box.

Useful for exploration; use propagation_bbox for modelling.

Source code in osmforge/client.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def features_bbox(
    self,
    min_lon: float,
    min_lat: float,
    max_lon: float,
    max_lat: float,
    limit: Optional[int] = None,
) -> gpd.GeoDataFrame:
    """
    Fetch unclassified OSM features, with all of their tags, inside a bounding box.

    Intended for exploration; propagation_bbox is the method to use for modelling.
    """
    query: dict = dict(
        min_lon=min_lon,
        min_lat=min_lat,
        max_lon=max_lon,
        max_lat=max_lat,
    )
    # Only send a limit when the caller asked for one.
    if limit is not None:
        query["limit"] = limit
    collection = self._get("/features/bbox", query)
    return self._to_gdf(collection)

propagation_bbox(min_lon, min_lat, max_lon, max_lat, layers=None, limit=None)

Return propagation-classified features within a bounding box.

Parameters:

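| Name | Type | Description | Default |
| --- | --- | --- | --- |
| min_lon | float | Minimum longitude of the bounding box, in WGS-84 degrees. | required |
| min_lat | float | Minimum latitude of the bounding box, in WGS-84 degrees. | required |
| max_lon | float | Maximum longitude of the bounding box, in WGS-84 degrees. | required |
| max_lat | float | Maximum latitude of the bounding box, in WGS-84 degrees. | required |
| layers | list of str | Subset of ALL_LAYERS to include. Defaults to all. | None |
| limit | int | Maximum number of features to return. | None |
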
Source code in osmforge/client.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def propagation_bbox(
    self,
    min_lon: float,
    min_lat: float,
    max_lon: float,
    max_lat: float,
    layers: Optional[List[str]] = None,
    limit: Optional[int] = None,
) -> gpd.GeoDataFrame:
    """
    Fetch features classified into propagation layers inside a bounding box.

    Parameters
    ----------
    min_lon, min_lat, max_lon, max_lat : float
        Corners of the bounding box, in WGS-84 degrees.
    layers : list of str, optional
        Which of ALL_LAYERS to request. All layers are requested when omitted.
    limit : int, optional
        Upper bound on the number of features returned.
    """
    query: dict = dict(
        min_lon=min_lon,
        min_lat=min_lat,
        max_lon=max_lon,
        max_lat=max_lat,
    )
    # A fresh list, so the query never shares storage with the caller's
    # argument; the key is left out entirely when nothing is selected.
    wanted = list(layers or ALL_LAYERS)
    if wanted:
        query["layers"] = wanted
    if limit is not None:
        query["limit"] = limit
    collection = self._get("/propagation/bbox", query)
    return self._to_gdf(collection)

propagation_geometry(geometry, layers=None, limit=None)

Return propagation-classified features intersecting a polygon or multipolygon.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| geometry | GeoJSON dict or Shapely geometry | The area of interest (Polygon or MultiPolygon). | required |
| layers | list of str | Subset of ALL_LAYERS to include. Defaults to all. | None |
| limit | int | Maximum number of features to return. | None |

Source code in osmforge/client.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def propagation_geometry(
    self,
    geometry: _Geometry,
    layers: Optional[List[str]] = None,
    limit: Optional[int] = None,
) -> gpd.GeoDataFrame:
    """
    Fetch propagation-classified features that intersect an area of interest.

    Parameters
    ----------
    geometry : GeoJSON dict or Shapely geometry
        Polygon or MultiPolygon describing the area of interest.
    layers : list of str, optional
        Which of ALL_LAYERS to request. All layers are requested when omitted.
    limit : int, optional
        Upper bound on the number of features returned.
    """
    payload: dict = {
        "geometry": _to_geojson(geometry),
        "layers": layers or ALL_LAYERS,
    }
    # Only send a limit when the caller asked for one.
    if limit is not None:
        payload["limit"] = limit
    collection = self._post("/propagation/geometry", payload)
    return self._to_gdf(collection)