API Reference

osm_ai_helper.download_osm

download_osm(area, output_dir, selector, discard=None)

Download OSM elements for the given area and selector.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| output_dir | str | Output directory. | required |
| selector | str | OSM tag to select elements. Uses the Overpass API. Example: "leisure=swimming_pool" | required |
| area | str | Name of area to download. Can be city, state, country, etc. Uses the Nominatim API. | required |
| discard | Optional[dict[str, str]] | Discard elements matching any of the given tags. Example: {"location": "indoor", "building": "yes"} | None |
Source code in src/osm_ai_helper/download_osm.py
@logger.catch(reraise=True)
def download_osm(
    area: str,
    output_dir: str,
    selector: str,
    discard: Optional[Dict[str, str]] = None,
):
    """Download OSM elements for the given areas and selector.

    Args:
        output_dir (str): Output directory.
        selector (str): OSM tag to select elements.
            Uses the [Overpass API](https://wiki.openstreetmap.org/wiki/Overpass_API/Language_Guide).

            Example: ["leisure=swimming_pool"](https://wiki.openstreetmap.org/wiki/Tag:leisure%3Dswimming_pool)

        area (str): Name of area to download.
            Can be city, state, country, etc.
            Uses the [Nominatim API](https://nominatim.org/release-docs/develop/api/Search/).

        discard (Optional[dict[str, str]], optional): Discard elements matching
            any of the given tags.
            Defaults to None.
            Example: {"location": "indoor", "building": "yes"}
    """
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True, parents=True)

    discard = discard or {}

    logger.info(f"Downloading osm data for {area}")
    elements = [
        element
        for element in get_elements(selector, area=area)
        if all(element.get("tags", {}).get(k) != v for k, v in discard.items())
    ]

    output_file = output_path / f"{area}.json"
    logger.info(f"Writing {len(elements)} elements to {output_file}")
    output_file.write_text(json.dumps(elements))
    logger.success("Done!")
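
A minimal usage sketch (the area name and output directory are illustrative):

from osm_ai_helper.download_osm import download_osm

download_osm(
    area="Galicia",
    output_dir="data/osm",
    selector="leisure=swimming_pool",
    discard={"location": "indoor", "building": "yes"},
)
# Writes data/osm/Galicia.json with the matching elements.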

osm_ai_helper.group_elements_and_download_tiles

group_elements_and_download_tiles(elements_file, output_dir, mapbox_token, zoom=18)

Groups the elements by tile and downloads the satellite image corresponding to the tile.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| elements_file | str | Path to the JSON file containing OSM elements. See download_osm. | required |
| output_dir | str | Output directory. The images and annotations will be saved in this directory. The images will be saved as JPEG files and the annotations as JSON files. The names of the files will be in the format {zoom}_{tile_col}_{tile_row}. | required |
| mapbox_token | str | Mapbox token. | required |
| zoom | int | Zoom level of the tiles to download. See https://docs.mapbox.com/help/glossary/zoom-level/. | 18 |
Source code in src/osm_ai_helper/group_elements_and_download_tiles.py
@logger.catch(reraise=True)
def group_elements_and_download_tiles(
    elements_file: str, output_dir: str, mapbox_token: str, zoom: int = 18
):
    """
    Groups the elements by tile and downloads the satellite image corresponding to the tile.

    Args:
        elements_file (str): Path to the JSON file containing OSM elements.
            See [download_osm][osm_ai_helper.download_osm.download_osm].
        output_dir (str): Output directory.
            The images and annotations will be saved in this directory.
            The images will be saved as JPEG files and the annotations as JSON files.
            The names of the files will be in the format `{zoom}_{tile_col}_{tile_row}`.
        mapbox_token (str): [Mapbox](https://console.mapbox.com/) token.
        zoom (int, optional): Zoom level of the tiles to download.
            See https://docs.mapbox.com/help/glossary/zoom-level/.
            Defaults to 18.
    """
    annotation_path = Path(elements_file)
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True, parents=True)

    elements = json.loads(annotation_path.read_text())

    logger.info("Grouping elements by tile")
    grouped = group_elements_by_tile(elements, zoom)

    total = len(grouped)
    n = 0
    logger.info("Downloading tiles and writing annotation")
    for (tile_col, tile_row), group in grouped.items():
        if n % 50 == 0:
            logger.info(f"Processed {n}/{total} tiles")
        n += 1
        output_name = f"{zoom}_{tile_col}_{tile_row}"
        image_name = f"{output_path / output_name}.jpg"
        annotation_name = f"{output_path / output_name}.json"
        if not Path(image_name).exists():
            image = download_tile(zoom, tile_col, tile_row, mapbox_token)
            image.save(image_name)
        if not Path(annotation_name).exists():
            Path(annotation_name).write_text(
                json.dumps(
                    {
                        "elements": group,
                    }
                )
            )
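
A minimal usage sketch, assuming download_osm has already produced data/osm/Galicia.json (paths and token are illustrative):

from osm_ai_helper.group_elements_and_download_tiles import group_elements_and_download_tiles

group_elements_and_download_tiles(
    elements_file="data/osm/Galicia.json",
    output_dir="data/tiles",
    mapbox_token="YOUR_MAPBOX_TOKEN",
    zoom=18,
)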

osm_ai_helper.convert_to_yolo_dataset

convert_to_yolo_dataset(input_dir)

Convert the output of group_elements_and_download_tiles.py to the YOLO format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_dir | str | Input directory containing the images and annotations. The images are expected to be in the format zoom_tile_col_tile_row.jpg and the annotations in the format zoom_tile_col_tile_row.json. | required |
Source code in src/osm_ai_helper/convert_to_yolo_dataset.py
@logger.catch(reraise=True)
def convert_to_yolo_dataset(
    input_dir: str,
):
    """Convert the output of `group_elements_and_download_tiles.py` to the [YOLO format](https://docs.ultralytics.com/datasets/detect/).

    Args:
        input_dir (str): Input directory containing the images and annotations.
            The images are expected to be in the format `zoom_tile_col_tile_row.jpg`.
            The annotations are expected to be in the format `zoom_tile_col_tile_row.json`.
    """
    input_path = Path(input_dir)

    for image_path in input_path.glob("**/*.jpg"):
        annotation_path = image_path.with_suffix(".json")
        annotation = json.loads(annotation_path.read_text())
        zoom, tile_col, tile_row = map(int, image_path.stem.split("_"))

        yolo_annotation = grouped_elements_to_annotation(
            annotation["elements"], zoom, tile_col, tile_row
        )
        image_path.with_suffix(".txt").write_text(yolo_annotation)
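
A minimal usage sketch (the directory is illustrative and should contain the output of group_elements_and_download_tiles):

from osm_ai_helper.convert_to_yolo_dataset import convert_to_yolo_dataset

convert_to_yolo_dataset(input_dir="data/tiles")
# Each {zoom}_{tile_col}_{tile_row}.jpg now has a matching .txt label file in YOLO format.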

grouped_elements_to_annotation(group, zoom, tile_col, tile_row)

Output format: https://docs.ultralytics.com/datasets/detect/

Source code in src/osm_ai_helper/convert_to_yolo_dataset.py
def grouped_elements_to_annotation(
    group: List[Dict], zoom: int, tile_col: int, tile_row: int
) -> str:
    """
    Output format: https://docs.ultralytics.com/datasets/detect/
    """
    annotation = ""
    left_pixel = tile_col * TILE_SIZE
    top_pixel = tile_row * TILE_SIZE
    bbox = box(left_pixel, top_pixel, left_pixel + TILE_SIZE, top_pixel + TILE_SIZE)
    for element in group:
        pixel_polygon = [
            meters_col_row_to_pixel_col_row(
                *lat_lon_to_meters_col_row(point["lat"], point["lon"]), zoom
            )
            for point in element["geometry"]
        ]
        try:
            bounded_polygon = Polygon(pixel_polygon).intersection(bbox)
        except AttributeError:
            continue
        min_col, min_row, max_col, max_row = bounded_polygon.bounds
        col_center = (min_col + max_col) / 2
        col_center = (col_center - left_pixel) / TILE_SIZE
        col_center = round(col_center, 2)
        row_center = (min_row + max_row) / 2
        row_center = (row_center - top_pixel) / TILE_SIZE
        row_center = round(row_center, 2)
        width = max_col - min_col
        width /= TILE_SIZE
        width = round(width, 2)
        height = max_row - min_row
        height /= TILE_SIZE
        height = round(height, 2)
        annotation += f"0 {col_center} {row_center} {width} {height}\n"

    return annotation
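
Each line of the returned string follows the YOLO detection layout "class x_center y_center width height", with a single class id 0 and coordinates normalized to the 512-pixel tile and rounded to two decimals. For example, a hypothetical element whose bounded polygon spans pixels 192 to 320 in both axes of tile (0, 0) would yield:

0 0.5 0.5 0.25 0.25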

osm_ai_helper.run_inference

run_inference(yolo_model_file, output_dir, lat_lon, margin=1, sam_model='facebook/sam2.1-hiera-small', selector='leisure=swimming_pool', zoom=18)

Run inference on a given location.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| yolo_model_file | str | Path to the YOLO model file. | required |
| output_dir | str | Output directory. The images and annotations will be saved in this directory. The images will be saved as PNG files and the annotations as JSON files. The names of the files will be in the format {zoom}_{tile_col}_{tile_row}. | required |
| lat_lon | Tuple[float, float] | Latitude and longitude of the location. | required |
| margin | int | Number of tiles around the location. | 1 |
| sam_model | str | SAM2 model to use. | 'facebook/sam2.1-hiera-small' |
| selector | str | OpenStreetMap selector. | 'leisure=swimming_pool' |
| zoom | int | Zoom level. See https://docs.mapbox.com/help/glossary/zoom-level/. | 18 |
Source code in src/osm_ai_helper/run_inference.py
@logger.catch(reraise=True)
def run_inference(
    yolo_model_file: str,
    output_dir: str,
    lat_lon: Tuple[float, float],
    margin: int = 1,
    sam_model: str = "facebook/sam2.1-hiera-small",
    selector: str = "leisure=swimming_pool",
    zoom: int = 18,
):
    """
    Run inference on a given location.

    Args:
        yolo_model_file (str): Path to the [YOLO](https://docs.ultralytics.com/tasks/detect/) model file.
        output_dir (str): Output directory.
            The images and annotations will be saved in this directory.
            The images will be saved as PNG files and the annotations as JSON files.
            The names of the files will be in the format `{zoom}_{tile_col}_{tile_row}`.
        lat_lon (Tuple[float, float]): Latitude and longitude of the location.
        margin (int, optional): Number of tiles around the location.
            Defaults to 1.
        sam_model (str, optional): [SAM2](https://github.com/facebookresearch/sam2) model to use.
            Defaults to "facebook/sam2.1-hiera-small".
        selector (str, optional): OpenStreetMap selector.
            Defaults to "leisure=swimming_pool".
        zoom (int, optional): Zoom level.
            Defaults to 18.
            See https://docs.mapbox.com/help/glossary/zoom-level/.
    """
    bbox_predictor = YOLO(yolo_model_file)
    sam_predictor = SAM2ImagePredictor.from_pretrained(
        sam_model, device="cuda" if torch.cuda.is_available() else "cpu"
    )

    bbox = lat_lon_to_bbox(*lat_lon, zoom, margin)

    output_path = Path(output_dir) / f"{zoom}_{'_'.join(map(str, bbox))}"
    output_path.mkdir(exist_ok=True, parents=True)

    logger.info(f"Downloading elements for {selector} in {bbox}")
    elements = get_elements(selector, bbox=bbox)
    grouped_elements = group_elements_by_tile(elements, zoom)
    logger.info(f"Found {len(elements)} elements")

    logger.info(f"Downloading all tiles within {bbox}")
    stacked_image, stacked_mask = download_stacked_image_and_mask(
        bbox, grouped_elements, zoom, os.environ["MAPBOX_TOKEN"]
    )
    Image.fromarray(stacked_image).save(output_path / "full_image.png")
    Image.fromarray(stacked_mask).save(output_path / "full_mask.png")

    logger.info("Predicting on stacked image")
    # Change to BGR for inference
    stacked_output = tile_prediction(
        bbox_predictor, sam_predictor, stacked_image[:, :, ::-1]
    )

    logger.info("Finding existing, new and missed polygons")
    existing, new, missed = polygon_evaluation(stacked_mask, stacked_output)
    logger.info(f"{len(existing)} exiting, {len(new)} new and {len(missed)} missied.")
    logger.info("Painting evaluation")
    stacked_image_pil = Image.fromarray(stacked_image)
    painted_img = paint_polygon_evaluation(stacked_image_pil, existing, new, missed)
    painted_img.save(output_path / "full_image_painted.png")

    _, west, north, _ = bbox
    left_col, top_row = lat_lon_to_tile_col_row(north, west, zoom)
    top_pixel = top_row * TILE_SIZE
    left_pixel = left_col * TILE_SIZE

    logger.info("Saving new polygons")
    for n, polygon in enumerate(new):
        lon_lat_polygon = pixel_polygon_to_lat_lon_polygon(
            polygon, top_pixel, left_pixel, zoom
        )

        with open(f"{output_path}/{n}.json", "w") as f:
            json.dump(lon_lat_polygon, f)

        crop_polygon(polygon, painted_img, margin=100).save(
            f"{output_path}/{n}_painted.png"
        )

        crop_polygon(polygon, stacked_image_pil, margin=100).save(
            f"{output_path}/{n}.png"
        )

    return output_path, existing, new, missed
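
A minimal usage sketch (the model path and coordinates are illustrative; a Mapbox token is read from the environment):

import os

from osm_ai_helper.run_inference import run_inference

os.environ["MAPBOX_TOKEN"] = "YOUR_MAPBOX_TOKEN"

output_path, existing, new, missed = run_inference(
    yolo_model_file="models/best.pt",
    output_dir="results",
    lat_lon=(42.24, -8.72),
    margin=1,
)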

osm_ai_helper.upload_osm

upload_osm(results_dir, client_id, client_secret, comment='Add Swimming Pools')

Upload the results to OpenStreetMap.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| results_dir | str | Directory containing the results as *.json files. See run_inference. | required |
| client_id | str | OpenStreetMap OAuth client ID. | required |
| client_secret | str | OpenStreetMap OAuth client secret. | required |
| comment | str | Comment to add to the changeset. | 'Add Swimming Pools' |
Source code in src/osm_ai_helper/upload_osm.py
def upload_osm(
    results_dir: str,
    client_id: str,
    client_secret: str,
    comment: str = "Add Swimming Pools",
):
    """
    Upload the results to OpenStreetMap.

    Args:
        results_dir (str): Directory containing the results.
            The results should be in the format of `*.json` files.
            See [`run_inference`][osm_ai_helper.run_inference.run_inference].
        client_id (str): OpenStreetMap Oauth client ID.
        client_secret (str): OpenStreetMap Oauth client secret.
        comment (str, optional): Comment to add to the changeset.
            Defaults to "Add Swimming Pools".
    """
    osm_session = ensure_authorized_session(client_id, client_secret)

    lon_lat_polygons = [
        json.loads(result.read_text()) for result in Path(results_dir).glob("*.json")
    ]

    with open_changeset(osm_session, comment=comment) as changeset:
        for lon_lat_polygon in lon_lat_polygons:
            upload_polygon(osm_session, lon_lat_polygon, changeset)

        return changeset
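
A minimal usage sketch (the credentials are placeholders; results_dir should point at the directory of *.json polygons returned by run_inference):

from osm_ai_helper.upload_osm import upload_osm

changeset = upload_osm(
    results_dir="results",
    client_id="YOUR_OSM_CLIENT_ID",
    client_secret="YOUR_OSM_CLIENT_SECRET",
    comment="Add Swimming Pools",
)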

osm_ai_helper.utils.inference

download_stacked_image_and_mask(bbox, grouped_elements, zoom, mapbox_token)

Download all tiles within a bounding box and stack them into a single image.

All the grouped_elements are painted on the mask.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| bbox | tuple | Bounding box in the form of (south, west, north, east). | required |
| grouped_elements | dict | OpenStreetMap elements grouped with group_elements_by_tile. | required |
| zoom | int | Zoom level. See https://docs.mapbox.com/help/glossary/zoom-level/. | required |
| mapbox_token | str | Mapbox token. See https://docs.mapbox.com/help/getting-started/access-tokens/. | required |

Returns:

| Type | Description |
| --- | --- |
| tuple[ndarray, ndarray] | Stacked image and mask. |

Source code in src/osm_ai_helper/utils/inference.py
def download_stacked_image_and_mask(
    bbox: tuple[float, float, float, float],
    grouped_elements: dict,
    zoom: int,
    mapbox_token: str,
) -> tuple[np.ndarray, np.ndarray]:
    """Download all tiles within a bounding box and stack them into a single image.

    All the grouped_elements are painted on the mask.

    Args:
        bbox (tuple): Bounding box in the form of (south, west, north, east).
        grouped_elements (dict): OpenStreetMap elements grouped with
            [group_elements_by_tile][osm_ai_helper.utils.tiles.group_elements_by_tile].
        zoom (int): Zoom level.
            See https://docs.mapbox.com/help/glossary/zoom-level/.
        mapbox_token (str): Mapbox token.
            See https://docs.mapbox.com/help/getting-started/access-tokens/.

    Returns:
        tuple: Stacked image and mask.
    """
    south, west, north, east = bbox
    left, top = lat_lon_to_tile_col_row(north, west, zoom)
    right, bottom = lat_lon_to_tile_col_row(south, east, zoom)

    stacked_image = np.zeros(
        ((right - left) * TILE_SIZE, (bottom - top) * TILE_SIZE, 3), dtype=np.uint8
    )
    stacked_mask = np.zeros(
        ((right - left) * TILE_SIZE, (bottom - top) * TILE_SIZE), dtype=np.uint8
    )

    for n_col, tile_col in enumerate(range(left, right)):
        for n_row, tile_row in enumerate(range(top, bottom)):
            group = grouped_elements[(tile_col, tile_row)]

            img = download_tile(zoom, tile_col, tile_row, mapbox_token)

            mask = grouped_elements_to_mask(group, zoom, tile_col, tile_row)

            stacked_image[
                n_row * TILE_SIZE : (n_row + 1) * TILE_SIZE,
                n_col * TILE_SIZE : (n_col + 1) * TILE_SIZE,
            ] = np.array(img)

            stacked_mask[
                n_row * TILE_SIZE : (n_row + 1) * TILE_SIZE,
                n_col * TILE_SIZE : (n_col + 1) * TILE_SIZE,
            ] = mask

    return stacked_image, stacked_mask
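
A minimal usage sketch (the bounding box and token are illustrative):

from osm_ai_helper.utils.inference import download_stacked_image_and_mask
from osm_ai_helper.utils.osm import get_elements
from osm_ai_helper.utils.tiles import group_elements_by_tile

bbox = (42.23, -8.74, 42.25, -8.71)  # (south, west, north, east)
elements = get_elements("leisure=swimming_pool", bbox=bbox)
grouped = group_elements_by_tile(elements, zoom=18)
image, mask = download_stacked_image_and_mask(bbox, grouped, 18, "YOUR_MAPBOX_TOKEN")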

tile_prediction(bbox_predictor, sam_predictor, image, overlap=0.125, bbox_conf=0.4, bbox_pad=0)

Predict on a large image by splitting it into tiles.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| bbox_predictor | YOLO | YOLO bounding box detector. See https://docs.ultralytics.com/tasks/detect/. | required |
| sam_predictor | SAM2ImagePredictor | Segment Anything Image Predictor. See https://github.com/facebookresearch/sam2?tab=readme-ov-file#image-prediction. | required |
| image | ndarray | Image to predict on. | required |
| overlap | float | Overlap between tiles. | 0.125 |
| bbox_conf | float | Minimum confidence threshold for detections. | 0.4 |
| bbox_pad | int | Padding added to each predicted bounding box. | 0 |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | Stacked output. |

Source code in src/osm_ai_helper/utils/inference.py
def tile_prediction(
    bbox_predictor: YOLO,
    sam_predictor: SAM2ImagePredictor,
    image: np.ndarray,
    overlap: float = 0.125,
    bbox_conf: float = 0.4,
    bbox_pad: int = 0,
) -> np.ndarray:
    """
    Predict on a large image by splitting it into tiles.

    Args:
        bbox_predictor (YOLO): YOLO bounding box.
            See https://docs.ultralytics.com/tasks/detect/.
        sam_predictor (SAM2ImagePredictor): Segment Anything Image Predictor.
            See https://github.com/facebookresearch/sam2?tab=readme-ov-file#image-prediction.
        image (np.ndarray): Image to predict on.
        overlap (float): Overlap between tiles.
            Defaults to 0.125.
        bbox_conf (float): Sets the minimum confidence threshold for detections.
            Defaults to 0.4.
        bbox_pad (int): Padding to be added to the predicted bbox.
            Defaults to 0.

    Returns:
        np.ndarray: Stacked output.
    """
    stacked_output = np.zeros((image.shape[0], image.shape[1]), dtype=np.uint8)
    for top, left, bottom, right in yield_tile_corners(image, TILE_SIZE, overlap):
        logger.debug(f"Predicting {(top, left, bottom, right)}")
        tile_image = image[left:right, top:bottom].copy()
        sam_predictor.set_image(tile_image)

        bbox_result = bbox_predictor.predict(tile_image, conf=bbox_conf, verbose=False)

        for bbox in bbox_result:
            if len(bbox.boxes.xyxy) == 0:
                continue

            bbox_int = list(int(x) for x in bbox.boxes.xyxy[0])

            if bbox_pad > 0:
                bbox_int[0] = max(0, bbox_int[0] - bbox_pad)
                bbox_int[1] = max(0, bbox_int[1] - bbox_pad)
                bbox_int[2] = min(512, bbox_int[2] + bbox_pad)
                bbox_int[3] = min(512, bbox_int[3] + bbox_pad)

            masks, *_ = sam_predictor.predict(
                box=[bbox_int],
                multimask_output=False,
            )

            stacked_output[left:right, top:bottom] += masks[0].astype(np.uint8)

    stacked_output[stacked_output != 0] = 255

    return stacked_output
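
A minimal usage sketch, assuming the standard ultralytics and sam2 imports (the weights path and input image are illustrative):

import numpy as np
from PIL import Image
from sam2.sam2_image_predictor import SAM2ImagePredictor
from ultralytics import YOLO

from osm_ai_helper.utils.inference import tile_prediction

bbox_predictor = YOLO("models/best.pt")
sam_predictor = SAM2ImagePredictor.from_pretrained("facebook/sam2.1-hiera-small")

image = np.array(Image.open("full_image.png"))  # e.g. the stacked image saved by run_inference
mask = tile_prediction(bbox_predictor, sam_predictor, image[:, :, ::-1])  # RGB -> BGR, as in run_inference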

osm_ai_helper.utils.osm

get_area_id(area_name)

Get the Nominatim ID of an area.

Uses the Nominatim API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| area_name | str | The name of the area. | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[int] | The Nominatim ID of the area. |

Source code in src/osm_ai_helper/utils/osm.py
def get_area_id(area_name: str) -> Optional[int]:
    """
    Get the Nominatim ID of an area.

    Uses the [Nominatim API](https://nominatim.org/release-docs/develop/api/Search/).

    Args:
        area_name (str): The name of the area.

    Returns:
        Optional[int]: The Nominatim ID of the area.
    """
    response = requests.get(
        f"https://nominatim.openstreetmap.org/search?q={area_name}&format=json",
        headers={"User-Agent": "Mozilla/5.0"},
    )
    response.raise_for_status()
    response_json = json.loads(response.content.decode())
    for area in response_json:
        osm_type = area.get("osm_type")
        osm_id = area.get("osm_id")
        if osm_type == "way":
            return osm_id + 2400000000
        if osm_type == "relation":
            return osm_id + 3600000000
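
A minimal usage sketch (the area name is illustrative):

from osm_ai_helper.utils.osm import get_area_id

area_id = get_area_id("Galicia")  # None if no matching way or relation is found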

get_elements(selector, area=None, bbox=None)

Get elements from OpenStreetMap using the Overpass API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| selector | str | The selector to use. Example: "leisure=swimming_pool" | required |
| area | Optional[str] | The area to search in. Can be city, state, country, etc. | None |
| bbox | Optional[Tuple[float, float, float, float]] | The bounding box to search in. Format: https://wiki.openstreetmap.org/wiki/Overpass_API/Language_Guide#The_bounding_box | None |

Returns:

| Type | Description |
| --- | --- |
| list[dict] | The elements found. |

Source code in src/osm_ai_helper/utils/osm.py
def get_elements(
    selector: str,
    area: Optional[str] = None,
    bbox: Optional[Tuple[float, float, float, float]] = None,
) -> list[dict]:
    """
    Get elements from OpenStreetMap using the Overpass API.

    Uses the [Overpass API](https://wiki.openstreetmap.org/wiki/Overpass_API/Language_Guide).

    Args:
        selector (str): The selector to use.
            Example: "leisure=swimming_pool"
        area (Optional[str], optional): The area to search in.
            Can be city, state, country, etc.
            Defaults to None.

        bbox (Optional[Tuple[float, float, float, float]], optional): The bounding box to search in.
            Defaults to None.
            Format: https://wiki.openstreetmap.org/wiki/Overpass_API/Language_Guide#The_bounding_box

    Returns:
        The elements found.
    """
    query = "[out:json];"

    if area:
        area_id = get_area_id(area)
        query += f"area({area_id})->.searchArea;(way[{selector}](area.searchArea););"
    elif bbox:
        bbox_str = ",".join(map(str, bbox))
        query += f"(way[{selector}]({bbox_str}););"
    else:
        raise ValueError("area or bbox must be provided")

    query += " out body geom;"

    response = requests.get(
        "https://overpass-api.de/api/interpreter",
        params={"data": query},
        headers={"User-Agent": "Mozilla/5.0"},
    )
    response.raise_for_status()
    response_json = json.loads(response.content.decode())
    return response_json["elements"]
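
A minimal usage sketch (the area name and bounding box are illustrative):

from osm_ai_helper.utils.osm import get_elements

# Search by area name (resolved through Nominatim)
pools = get_elements("leisure=swimming_pool", area="Galicia")

# Or search by bounding box (south, west, north, east)
pools = get_elements("leisure=swimming_pool", bbox=(42.23, -8.74, 42.25, -8.71))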

osm_ai_helper.utils.plots

show_vlm_entry(entry)

Extracts image and points from entry and draws the points.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry | dict | Dataset entry generated by convert_to_vlm_dataset. Expected format shown below. | required |

entry = {
    "messages": [
        {"role": "user",
         "content": [
             {"type": "text", "text": instruction},
             {"type": "image", "image": image}]
        },
        {"role": "assistant",
         "content": [
             {"type": "text", "text": str(points)}]
        },
    ]
}

Returns:

| Type | Description |
| --- | --- |
| Image | Image with points drawn. |

Source code in src/osm_ai_helper/utils/plots.py
def show_vlm_entry(entry) -> Image:
    """
    Extracts image and points from entry and draws the points.

    Args:
        entry (dict): Dataset entry generated by `convert_to_vlm_dataset`.
            Expected format:

            ```py
            entry = {
                "messages": [
                    { "role": "user",
                    "content" : [
                        {"type" : "text",  "text"  : instruction},
                        {"type" : "image", "image" : image} ]
                    },
                    { "role" : "assistant",
                    "content" : [
                        {"type" : "text",  "text"  : str(points)} ]
                    },
                ]
            }
            ```

    Returns:
        Image: Image with points drawn.
    """
    messages = entry["messages"]
    image = messages[0]["content"][1]["image"]
    width, height = image.size
    points = eval(messages[1]["content"][0]["text"])
    draw = ImageDraw.Draw(image)

    for point in points:
        draw.circle((point[0] * width, point[1] * height), 5, fill="red")

    return image
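
A minimal usage sketch (the instruction text, tile image and points are illustrative):

from PIL import Image
from osm_ai_helper.utils.plots import show_vlm_entry

entry = {
    "messages": [
        {"role": "user", "content": [
            {"type": "text", "text": "Point to the swimming pools."},
            {"type": "image", "image": Image.open("18_123456_98765.jpg")},
        ]},
        {"role": "assistant", "content": [
            {"type": "text", "text": "[(0.25, 0.75)]"},  # points normalized to the image size
        ]},
    ]
}
show_vlm_entry(entry)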

osm_ai_helper.utils.tiles

group_elements_by_tile(elements, zoom)

Group elements by the tiles they belong to, based on the zoom level.

Each MAPBOX tile is a 512x512 pixel image.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| elements | List[Dict] | List of elements from download_osm. | required |
| zoom | int | Zoom level. See https://docs.mapbox.com/help/glossary/zoom-level/. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[tuple, list[dict]] | Grouped elements. |

Source code in src/osm_ai_helper/utils/tiles.py
def group_elements_by_tile(elements: List[Dict], zoom: int) -> dict[tuple, list[dict]]:
    """Broup elements by the tiles they belong to, based on the zoom level.

    Each MAPBOX tile is a 512x512 pixel image.

    Args:
        elements (List[Dict]): List of elements from
            [download_osm][osm_ai_helper.download_osm.download_osm].
        zoom (int): Zoom level. See https://docs.mapbox.com/help/glossary/zoom-level/.

    Returns:
        dict[tuple, list[dict]]: Grouped elements.
    """
    grouped: dict[tuple, list[dict]] = defaultdict(list)

    for element in elements:
        pixel_polygon = []
        for point in element["geometry"]:
            pixel_point = lat_lon_to_pixel_col_row(point["lat"], point["lon"], zoom)
            pixel_polygon.append(pixel_point)

        pixel_polygon = np.array(pixel_polygon, dtype=np.int32)

        tiles = map(tuple, np.unique(pixel_polygon // TILE_SIZE, axis=0))
        for group in tiles:
            grouped[group].append(element)

    return grouped
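
A minimal usage sketch (the elements file is illustrative and corresponds to the output of download_osm):

import json
from pathlib import Path

from osm_ai_helper.utils.tiles import group_elements_by_tile

elements = json.loads(Path("data/osm/Galicia.json").read_text())
grouped = group_elements_by_tile(elements, zoom=18)
for (tile_col, tile_row), group in grouped.items():
    print(tile_col, tile_row, len(group))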