"""GPS track simplification using the Ramer-Douglas-Peucker algorithm.""" from typing import Optional from rdp import rdp from bincio.extract.models import DataPoint def simplify_track( points: list[DataPoint], epsilon: float = 0.0001, ) -> list[DataPoint]: """Return a simplified subset of points using RDP. epsilon is in degrees (~11m at equator for 0.0001). Points without GPS coordinates are dropped. """ gps_pts = [(p, p.lat, p.lon) for p in points if p.lat is not None and p.lon is not None] if len(gps_pts) < 2: return [p for p, _, _ in gps_pts] coords = [[lon, lat] for _, lat, lon in gps_pts] mask = rdp(coords, epsilon=epsilon, return_mask=True) return [p for (p, _, _), keep in zip(gps_pts, mask) if keep] def preview_coords( points: list[DataPoint], max_points: int = 20, ) -> list[list[float]] | None: """Return a small list of [lat, lon] pairs for card thumbnail rendering. Uses a coarser RDP pass, then subsamples to at most max_points. Returns None if there is no GPS data. """ gps = [(p.lat, p.lon) for p in points if p.lat is not None and p.lon is not None] if len(gps) < 2: return None # Coarse RDP (larger epsilon = fewer points) coords = [[lon, lat] for lat, lon in gps] mask = rdp(coords, epsilon=0.001, return_mask=True) reduced = [gps[i] for i, keep in enumerate(mask) if keep] # Subsample if still too many if len(reduced) > max_points: step = len(reduced) / max_points reduced = [reduced[int(i * step)] for i in range(max_points)] reduced.append(gps[-1]) # always include the last point return [[round(lat, 5), round(lon, 5)] for lat, lon in reduced] def build_geojson( points: list[DataPoint], activity_id: str, epsilon: float = 0.0001, original_count: Optional[int] = None, ) -> dict: """Build a GeoJSON Feature for the simplified track.""" simplified = simplify_track(points, epsilon=epsilon) coordinates = [ [p.lon, p.lat, p.elevation_m] if p.elevation_m is not None else [p.lon, p.lat] for p in simplified if p.lon is not None and p.lat is not None ] # Parallel speed array for gradient coloring speeds = [round(p.speed_kmh, 2) if p.speed_kmh is not None else None for p in simplified] return { "type": "Feature", "geometry": { "type": "LineString", "coordinates": coordinates, }, "properties": { "id": activity_id, "speeds": speeds, "simplification": "rdp", "rdp_epsilon": epsilon, "point_count_original": original_count or len(points), "point_count_simplified": len(coordinates), }, }