| |
|
|
| import io |
| import pickle |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
|
|
| import numpy as np |
| from PIL import Image |
| import rtree |
|
|
| from utils.geo import BoundaryBox, Projection |
| from .data import MapData |
| from .download import get_osm |
| from .parser import Groups |
| from .raster import Canvas, render_raster_map, render_raster_masks |
| from .reader import OSMData, OSMNode, OSMWay |
|
|
|
|
| class MapIndex: |
| def __init__( |
| self, |
| data: MapData, |
| ): |
| self.index_nodes = rtree.index.Index() |
| for i, node in data.nodes.items(): |
| self.index_nodes.insert(i, tuple(node.xy) * 2) |
|
|
| self.index_lines = rtree.index.Index() |
| for i, line in data.lines.items(): |
| bbox = tuple(np.r_[line.xy.min(0), line.xy.max(0)]) |
| self.index_lines.insert(i, bbox) |
|
|
| self.index_areas = rtree.index.Index() |
| for i, area in data.areas.items(): |
| xy = np.concatenate(area.outers + area.inners) |
| bbox = tuple(np.r_[xy.min(0), xy.max(0)]) |
| self.index_areas.insert(i, bbox) |
|
|
| self.data = data |
|
|
| def query(self, bbox: BoundaryBox) -> Tuple[List[OSMNode], List[OSMWay]]: |
| query = tuple(np.r_[bbox.min_, bbox.max_]) |
| ret = [] |
| for x in ["nodes", "lines", "areas"]: |
| ids = getattr(self, "index_" + x).intersection(query) |
| ret.append([getattr(self.data, x)[i] for i in ids]) |
| return tuple(ret) |
|
|
|
|
| def bbox_to_slice(bbox: BoundaryBox, canvas: Canvas): |
| uv_min = np.ceil(canvas.to_uv(bbox.min_)).astype(int) |
| uv_max = np.ceil(canvas.to_uv(bbox.max_)).astype(int) |
| slice_ = (slice(uv_max[1], uv_min[1]), slice(uv_min[0], uv_max[0])) |
| return slice_ |
|
|
|
|
| def round_bbox(bbox: BoundaryBox, origin: np.ndarray, ppm: int): |
| bbox = bbox.translate(-origin) |
| bbox = BoundaryBox(np.round(bbox.min_ * ppm) / ppm, np.round(bbox.max_ * ppm) / ppm) |
| return bbox.translate(origin) |
|
|
| class MapTileManager: |
| def __init__( |
| self, |
| osmpath:Path, |
| ): |
|
|
| self.osm = OSMData.from_file(osmpath) |
|
|
|
|
| |
| def from_bbox( |
| self, |
| projection: Projection, |
| bbox: BoundaryBox, |
| ppm: int, |
| tile_size: int = 128, |
| ): |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| self.osm.add_xy_to_nodes(projection) |
| map_data = MapData.from_osm(self.osm) |
| map_index = MapIndex(map_data) |
|
|
| bounds_x, bounds_y = [ |
| np.r_[np.arange(min_, max_, tile_size), max_] |
| for min_, max_ in zip(bbox.min_, bbox.max_) |
| ] |
| bbox_tiles = {} |
| for i, xmin in enumerate(bounds_x[:-1]): |
| for j, ymin in enumerate(bounds_y[:-1]): |
| bbox_tiles[i, j] = BoundaryBox( |
| [xmin, ymin], [bounds_x[i + 1], bounds_y[j + 1]] |
| ) |
|
|
| tiles = {} |
| for ij, bbox_tile in bbox_tiles.items(): |
| canvas = Canvas(bbox_tile, ppm) |
| nodes, lines, areas = map_index.query(bbox_tile) |
| masks = render_raster_masks(nodes, lines, areas, canvas) |
| canvas.raster = render_raster_map(masks) |
| tiles[ij] = canvas |
|
|
| groups = {k: v for k, v in vars(Groups).items() if not k.startswith("__")} |
|
|
| self.origin = bbox.min_ |
| self.bbox = bbox |
| self.tiles = tiles |
| self.tile_size = tile_size |
| self.ppm = ppm |
| self.projection = projection |
| self.groups = groups |
| self.map_data = map_data |
|
|
| return self.query(bbox) |
| |
|
|
| def query(self, bbox: BoundaryBox) -> Canvas: |
| bbox = round_bbox(bbox, self.bbox.min_, self.ppm) |
| canvas = Canvas(bbox, self.ppm) |
| raster = np.zeros((3, canvas.h, canvas.w), np.uint8) |
|
|
| bbox_all = bbox & self.bbox |
| ij_min = np.floor((bbox_all.min_ - self.origin) / self.tile_size).astype(int) |
| ij_max = np.ceil((bbox_all.max_ - self.origin) / self.tile_size).astype(int) - 1 |
| for i in range(ij_min[0], ij_max[0] + 1): |
| for j in range(ij_min[1], ij_max[1] + 1): |
| tile = self.tiles[i, j] |
| bbox_select = tile.bbox & bbox |
| slice_query = bbox_to_slice(bbox_select, canvas) |
| slice_tile = bbox_to_slice(bbox_select, tile) |
| raster[(slice(None),) + slice_query] = tile.raster[ |
| (slice(None),) + slice_tile |
| ] |
| canvas.raster = raster |
| return canvas |
|
|
| def save(self, path: Path): |
| dump = { |
| "bbox": self.bbox.format(), |
| "tile_size": self.tile_size, |
| "ppm": self.ppm, |
| "groups": self.groups, |
| "tiles_bbox": {}, |
| "tiles_raster": {}, |
| } |
| if self.projection is not None: |
| dump["ref_latlonalt"] = self.projection.latlonalt |
| for ij, canvas in self.tiles.items(): |
| dump["tiles_bbox"][ij] = canvas.bbox.format() |
| raster_bytes = io.BytesIO() |
| raster = Image.fromarray(canvas.raster.transpose(1, 2, 0).astype(np.uint8)) |
| raster.save(raster_bytes, format="PNG") |
| dump["tiles_raster"][ij] = raster_bytes |
| with open(path, "wb") as fp: |
| pickle.dump(dump, fp) |
|
|
| @classmethod |
| def load(cls, path: Path): |
| with path.open("rb") as fp: |
| dump = pickle.load(fp) |
| tiles = {} |
| for ij, bbox in dump["tiles_bbox"].items(): |
| tiles[ij] = Canvas(BoundaryBox.from_string(bbox), dump["ppm"]) |
| raster = np.asarray(Image.open(dump["tiles_raster"][ij])) |
| tiles[ij].raster = raster.transpose(2, 0, 1).copy() |
| projection = Projection(*dump["ref_latlonalt"]) |
| return cls( |
| tiles, |
| BoundaryBox.from_string(dump["bbox"]), |
| dump["tile_size"], |
| dump["ppm"], |
| projection, |
| dump["groups"], |
| ) |
|
|
| class TileManager: |
| def __init__( |
| self, |
| tiles: Dict, |
| bbox: BoundaryBox, |
| tile_size: int, |
| ppm: int, |
| projection: Projection, |
| groups: Dict[str, List[str]], |
| map_data: Optional[MapData] = None, |
| ): |
| self.origin = bbox.min_ |
| self.bbox = bbox |
| self.tiles = tiles |
| self.tile_size = tile_size |
| self.ppm = ppm |
| self.projection = projection |
| self.groups = groups |
| self.map_data = map_data |
| assert np.all(tiles[0, 0].bbox.min_ == self.origin) |
| for tile in tiles.values(): |
| assert bbox.contains(tile.bbox) |
|
|
| @classmethod |
| def from_bbox( |
| cls, |
| projection: Projection, |
| bbox: BoundaryBox, |
| ppm: int, |
| path: Optional[Path] = None, |
| tile_size: int = 128, |
| ): |
| bbox_osm = projection.unproject(bbox) |
| if path is not None and path.is_file(): |
| print(OSMData.from_file) |
| osm = OSMData.from_file(path) |
| if osm.box is not None: |
| assert osm.box.contains(bbox_osm) |
| else: |
| osm = OSMData.from_dict(get_osm(bbox_osm, path)) |
|
|
| osm.add_xy_to_nodes(projection) |
| map_data = MapData.from_osm(osm) |
| map_index = MapIndex(map_data) |
|
|
| bounds_x, bounds_y = [ |
| np.r_[np.arange(min_, max_, tile_size), max_] |
| for min_, max_ in zip(bbox.min_, bbox.max_) |
| ] |
| bbox_tiles = {} |
| for i, xmin in enumerate(bounds_x[:-1]): |
| for j, ymin in enumerate(bounds_y[:-1]): |
| bbox_tiles[i, j] = BoundaryBox( |
| [xmin, ymin], [bounds_x[i + 1], bounds_y[j + 1]] |
| ) |
|
|
| tiles = {} |
| for ij, bbox_tile in bbox_tiles.items(): |
| canvas = Canvas(bbox_tile, ppm) |
| nodes, lines, areas = map_index.query(bbox_tile) |
| masks = render_raster_masks(nodes, lines, areas, canvas) |
| canvas.raster = render_raster_map(masks) |
| tiles[ij] = canvas |
|
|
| groups = {k: v for k, v in vars(Groups).items() if not k.startswith("__")} |
|
|
| return cls(tiles, bbox, tile_size, ppm, projection, groups, map_data) |
|
|
| def query(self, bbox: BoundaryBox) -> Canvas: |
| bbox = round_bbox(bbox, self.bbox.min_, self.ppm) |
| canvas = Canvas(bbox, self.ppm) |
| raster = np.zeros((3, canvas.h, canvas.w), np.uint8) |
|
|
| bbox_all = bbox & self.bbox |
| ij_min = np.floor((bbox_all.min_ - self.origin) / self.tile_size).astype(int) |
| ij_max = np.ceil((bbox_all.max_ - self.origin) / self.tile_size).astype(int) - 1 |
| for i in range(ij_min[0], ij_max[0] + 1): |
| for j in range(ij_min[1], ij_max[1] + 1): |
| tile = self.tiles[i, j] |
| bbox_select = tile.bbox & bbox |
| slice_query = bbox_to_slice(bbox_select, canvas) |
| slice_tile = bbox_to_slice(bbox_select, tile) |
| raster[(slice(None),) + slice_query] = tile.raster[ |
| (slice(None),) + slice_tile |
| ] |
| canvas.raster = raster |
| return canvas |
|
|
| def save(self, path: Path): |
| dump = { |
| "bbox": self.bbox.format(), |
| "tile_size": self.tile_size, |
| "ppm": self.ppm, |
| "groups": self.groups, |
| "tiles_bbox": {}, |
| "tiles_raster": {}, |
| } |
| if self.projection is not None: |
| dump["ref_latlonalt"] = self.projection.latlonalt |
| for ij, canvas in self.tiles.items(): |
| dump["tiles_bbox"][ij] = canvas.bbox.format() |
| raster_bytes = io.BytesIO() |
| raster = Image.fromarray(canvas.raster.transpose(1, 2, 0).astype(np.uint8)) |
| raster.save(raster_bytes, format="PNG") |
| dump["tiles_raster"][ij] = raster_bytes |
| with open(path, "wb") as fp: |
| pickle.dump(dump, fp) |
|
|
| @classmethod |
| def load(cls, path: Path): |
| with path.open("rb") as fp: |
| dump = pickle.load(fp) |
| tiles = {} |
| for ij, bbox in dump["tiles_bbox"].items(): |
| tiles[ij] = Canvas(BoundaryBox.from_string(bbox), dump["ppm"]) |
| raster = np.asarray(Image.open(dump["tiles_raster"][ij])) |
| tiles[ij].raster = raster.transpose(2, 0, 1).copy() |
| projection = Projection(*dump["ref_latlonalt"]) |
| return cls( |
| tiles, |
| BoundaryBox.from_string(dump["bbox"]), |
| dump["tile_size"], |
| dump["ppm"], |
| projection, |
| dump["groups"], |
| ) |
|
|