diff --git a/bincio/extract/cli.py b/bincio/extract/cli.py index d57b03d..814316e 100644 --- a/bincio/extract/cli.py +++ b/bincio/extract/cli.py @@ -1,6 +1,7 @@ """bincio extract — CLI command.""" import json +import os import sys from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path @@ -12,15 +13,88 @@ from rich.progress import BarColumn, MofNCompleteColumn, Progress, TextColumn, T from bincio.extract.config import ExtractConfig, default_config, load_config from bincio.extract.dedup import ActivityRecord, DedupIndex -from bincio.extract.metrics import compute -from bincio.extract.models import ParsedActivity -from bincio.extract.parsers.factory import is_supported, parse_file -from bincio.extract.strava_csv import StravaMetadata -from bincio.extract.writer import build_summary, make_activity_id, write_activity, write_index +from bincio.extract.parsers.factory import is_supported console = Console() +# ── per-worker state (set once via initializer, never re-pickled) ───────────── + +_known_hashes: frozenset = frozenset() +_strava_lookup: dict = {} +_output_dir: Path = Path(".") +_privacy: str = "public" +_rdp_epsilon: float = 0.0001 + + +def _worker_init( + known_hashes: frozenset, + strava_lookup: dict, + output_dir: Path, + privacy: str, + rdp_epsilon: float, +) -> None: + global _known_hashes, _strava_lookup, _output_dir, _privacy, _rdp_epsilon + _known_hashes = known_hashes + _strava_lookup = strava_lookup + _output_dir = output_dir + _privacy = privacy + _rdp_epsilon = rdp_epsilon + + +def _process_file(path: Path) -> dict: + """Runs inside a worker process. Only receives a Path (tiny pickle). + All heavy shared data (_known_hashes, _strava_lookup, etc.) is already + in the worker's memory from the initializer — zero per-task overhead. + """ + from bincio.extract.metrics import compute + from bincio.extract.parsers.factory import parse_file + from bincio.extract.writer import build_summary, make_activity_id, write_activity + + try: + activity = parse_file(path) + except Exception as exc: + return {"status": "error", "path": str(path), "error": str(exc)} + + # Exact-duplicate check (free — just a set lookup) + if activity.source_hash in _known_hashes: + return {"status": "duplicate"} + + # Enrich from Strava CSV + row = _strava_lookup.get(activity.source_file) + if row: + if not activity.title: + activity.title = row.get("Activity Name", "").strip() or None + if not activity.description: + activity.description = row.get("Activity Description", "").strip() or None + if not activity.strava_id: + activity.strava_id = row.get("Activity ID", "").strip() or None + + try: + metrics = compute(activity) + activity_id = make_activity_id(activity) + write_activity( + activity, metrics, _output_dir, + privacy=_privacy, + rdp_epsilon=_rdp_epsilon, + ) + summary = build_summary(activity, metrics, activity_id, _privacy) + except Exception as exc: + return {"status": "error", "path": str(path), "error": str(exc)} + + return { + "status": "ok", + "summary": summary, + "id": activity_id, + "hash": activity.source_hash, + "started_at": activity.started_at.isoformat(), + "distance_m": metrics.distance_m, + "source": summary.get("source"), + } + + +# ── CLI ──────────────────────────────────────────────────────────────────────── + @click.command() @click.option("--config", "config_path", type=click.Path(exists=True), default=None, help="Path to extract_config.yaml (default: ./extract_config.yaml).") @@ -32,49 +106,48 @@ console = Console() help="Process a single file and print JSON to stdout.") @click.option("--since", default=None, metavar="YYYY-MM-DD", help="Only process files modified after this date.") -@click.option("--workers", default=4, show_default=True, - help="Number of parallel worker processes.") +@click.option("--workers", default=None, type=int, + help="Parallel worker processes (default: CPU count).") def extract( config_path: Optional[str], input_dir: Optional[str], output_dir: Optional[str], single_file: Optional[str], since: Optional[str], - workers: int, + workers: Optional[int], ) -> None: """Parse GPX/FIT/TCX files and write BAS JSON data store.""" - # ── single file mode ───────────────────────────────────────────────────── if single_file: _process_single(Path(single_file)) return - # ── load config ────────────────────────────────────────────────────────── cfg = _resolve_config(config_path, input_dir, output_dir) cfg.output_dir.mkdir(parents=True, exist_ok=True) - # ── gather files ───────────────────────────────────────────────────────── files = _collect_files(cfg, since) if not files: console.print("[yellow]No supported files found.[/yellow]") return console.print(f"Found [bold]{len(files)}[/bold] activity files.") - # ── Strava metadata ────────────────────────────────────────────────────── - strava_meta: Optional[StravaMetadata] = None + # Build strava lookup once (serialised dict, sent to workers via initializer) + strava_lookup: dict = {} if cfg.metadata_csv and cfg.metadata_csv.exists(): - strava_meta = StravaMetadata(cfg.metadata_csv) + from bincio.extract.strava_csv import StravaMetadata + strava_lookup = StravaMetadata(cfg.metadata_csv)._by_filename console.print(f"Loaded Strava metadata from [cyan]{cfg.metadata_csv.name}[/cyan].") - # ── dedup index ────────────────────────────────────────────────────────── dedup = DedupIndex(output_dir=cfg.output_dir) + known_hashes: frozenset = frozenset(dedup._by_hash.keys()) - # ── process ────────────────────────────────────────────────────────────── - summaries: list[dict] = [] - errors: list[tuple[Path, str]] = [] - skipped = 0 + n_workers = workers or os.cpu_count() or 4 + console.print(f"Using [bold]{n_workers}[/bold] worker processes.") owner = {"handle": cfg.owner_handle, "display_name": cfg.owner_display_name} + summaries: list[dict] = [] + errors: list[tuple[str, str]] = [] + skipped = 0 with Progress( TextColumn("[progress.description]{task.description}"), @@ -85,80 +158,52 @@ def extract( ) as progress: task = progress.add_task("Processing...", total=len(files)) - with ProcessPoolExecutor(max_workers=workers) as pool: - futures = {pool.submit(_parse_worker, f): f for f in files} + with ProcessPoolExecutor( + max_workers=n_workers, + initializer=_worker_init, + initargs=(known_hashes, strava_lookup, cfg.output_dir, cfg.default_privacy, cfg.track.rdp_epsilon), + ) as pool: + futures = {pool.submit(_process_file, f): f for f in files} for future in as_completed(futures): - path = futures[future] progress.advance(task) - try: - activity = future.result() - except Exception as exc: - errors.append((path, str(exc))) - continue + result = future.result() - # ── incremental skip ────────────────────────────────────── - if cfg.incremental: - existing_id = dedup.is_exact_duplicate(activity.source_hash) - if existing_id: - skipped += 1 - continue + if result["status"] == "duplicate": + skipped += 1 + elif result["status"] == "error": + errors.append((result["path"], result["error"])) + else: + # Near-duplicate check — must be sequential (stateful) + from datetime import datetime + started_at = datetime.fromisoformat(result["started_at"]) + near_id = dedup.find_near_duplicate(started_at, result["distance_m"]) - # ── enrich from Strava CSV ──────────────────────────────── - if strava_meta: - strava_meta.enrich(activity.source_file, activity) + if near_id: + canonical = dedup.pick_canonical(near_id, result.get("source")) + if canonical != "__new__": + _patch_duplicate_of(cfg.output_dir, result["id"], near_id) + skipped += 1 + continue + _patch_duplicate_of(cfg.output_dir, near_id, result["id"]) + dedup._records[near_id].duplicate_of = result["id"] - # ── compute metrics ─────────────────────────────────────── - metrics = compute(activity) + dedup.register(ActivityRecord( + id=result["id"], + source_hash=result["hash"], + started_at=started_at, + distance_m=result["distance_m"], + source=result.get("source"), + )) + summaries.append(result["summary"]) - # ── deduplication ───────────────────────────────────────── - activity_id = make_activity_id(activity) - duplicate_of: Optional[str] = None - - near_dup_id = dedup.find_near_duplicate( - activity.started_at, metrics.distance_m - ) - if near_dup_id: - source = _infer_source(activity) - canonical = dedup.pick_canonical(near_dup_id, source) - if canonical == "__new__": - # New one is better — mark existing as duplicate - existing = dedup._records[near_dup_id] - existing.duplicate_of = activity_id - else: - duplicate_of = near_dup_id - - # ── write files ─────────────────────────────────────────── - written_id = write_activity( - activity, metrics, cfg.output_dir, - privacy=cfg.default_privacy, - duplicate_of=duplicate_of, - rdp_epsilon=cfg.track.rdp_epsilon, - ) - - # Register in dedup index - dedup.register(ActivityRecord( - id=written_id, - source_hash=activity.source_hash, - started_at=activity.started_at, - distance_m=metrics.distance_m, - source=_infer_source(activity), - )) - - if duplicate_of is None: - summaries.append( - build_summary(activity, metrics, written_id, cfg.default_privacy) - ) - - # ── write index.json ────────────────────────────────────────────────────── - # Merge with any existing summaries from previous incremental runs - existing_index = _load_existing_summaries(cfg.output_dir) - all_summaries = {s["id"]: s for s in existing_index} + from bincio.extract.writer import write_index + existing = _load_existing_summaries(cfg.output_dir) + merged = {s["id"]: s for s in existing} for s in summaries: - all_summaries[s["id"]] = s - write_index(list(all_summaries.values()), cfg.output_dir, owner) + merged[s["id"]] = s + write_index(list(merged.values()), cfg.output_dir, owner) dedup.save() - # ── summary ─────────────────────────────────────────────────────────────── console.print( f"\n[green]Done.[/green] " f"Processed [bold]{len(summaries)}[/bold] activities, " @@ -168,28 +213,22 @@ def extract( if errors: console.print("\n[red]Errors:[/red]") for path, msg in errors[:20]: - console.print(f" {path.name}: {msg}") + console.print(f" {Path(path).name}: {msg}") if len(errors) > 20: console.print(f" ... and {len(errors) - 20} more.") # ── helpers ─────────────────────────────────────────────────────────────────── -def _parse_worker(path: Path) -> ParsedActivity: - """Run in worker process — imports are isolated.""" - from bincio.extract.parsers.factory import parse_file - return parse_file(path) - - def _process_single(path: Path) -> None: + from bincio.extract.metrics import compute from bincio.extract.parsers.factory import parse_file + from bincio.extract.writer import build_summary, make_activity_id try: activity = parse_file(path) metrics = compute(activity) activity_id = make_activity_id(activity) - from bincio.extract.writer import build_summary - result = build_summary(activity, metrics, activity_id) - click.echo(json.dumps(result, indent=2)) + click.echo(json.dumps(build_summary(activity, metrics, activity_id), indent=2)) except Exception as exc: console.print(f"[red]Error:[/red] {exc}") sys.exit(1) @@ -221,51 +260,39 @@ def _resolve_config( def _collect_files(cfg: ExtractConfig, since: Optional[str]) -> list[Path]: - from bincio.extract.parsers.factory import is_supported - import os from datetime import datetime - since_ts: Optional[float] = None if since: since_ts = datetime.strptime(since, "%Y-%m-%d").timestamp() - files = [] for d in cfg.input_dirs: if not d.exists(): console.print(f"[yellow]Warning:[/yellow] input dir not found: {d}") continue for path in d.rglob("*"): - if not path.is_file(): - continue - if not is_supported(path): - continue - if since_ts and path.stat().st_mtime < since_ts: - continue - files.append(path) + if path.is_file() and is_supported(path): + if not since_ts or path.stat().st_mtime >= since_ts: + files.append(path) return files def _load_existing_summaries(output_dir: Path) -> list[dict]: - index_path = output_dir / "index.json" - if not index_path.exists(): + p = output_dir / "index.json" + if not p.exists(): return [] try: - data = json.loads(index_path.read_text()) - return data.get("activities", []) + return json.loads(p.read_text()).get("activities", []) except Exception: return [] -def _infer_source(activity: ParsedActivity) -> Optional[str]: - if activity.strava_id: - return "strava_export" - name = activity.source_file.lower() - if "activity" in name and len(name.split(".")) >= 3: - return "karoo" - if name.endswith((".fit", ".fit.gz")): - return "fit_file" - if name.endswith((".gpx", ".gpx.gz")): - return "gpx_file" - if name.endswith((".tcx", ".tcx.gz")): - return "tcx_file" - return None +def _patch_duplicate_of(output_dir: Path, activity_id: str, canonical_id: str) -> None: + p = output_dir / "activities" / f"{activity_id}.json" + if not p.exists(): + return + try: + data = json.loads(p.read_text()) + data["duplicate_of"] = canonical_id + p.write_text(json.dumps(data, indent=2, ensure_ascii=False)) + except Exception: + pass diff --git a/bincio/extract/dedup.py b/bincio/extract/dedup.py index 642b3f9..90b50d6 100644 --- a/bincio/extract/dedup.py +++ b/bincio/extract/dedup.py @@ -108,6 +108,8 @@ class DedupIndex: if distance_m is None or r.distance_m is None: continue ref = max(distance_m, r.distance_m) + if ref < 1.0: + continue # both near-zero (indoor/manual) — skip distance check if abs(distance_m - r.distance_m) / ref < 0.05: return r.id return None diff --git a/bincio/extract/metrics.py b/bincio/extract/metrics.py index ad589f8..f492825 100644 --- a/bincio/extract/metrics.py +++ b/bincio/extract/metrics.py @@ -1,19 +1,28 @@ """Compute aggregated metrics from a ParsedActivity. All calculations are self-contained — no external state needed. +Uses inline haversine rather than geopy.geodesic to keep the hot path fast. """ import math from dataclasses import dataclass -from datetime import datetime from typing import Optional -from geopy.distance import geodesic - from bincio.extract.models import DataPoint, ParsedActivity # Speed below which we consider the athlete stopped (km/h) _STOPPED_THRESHOLD_KMH = 1.0 +_EARTH_R = 6_371_000.0 # metres + + +def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: + """Great-circle distance in metres. ~10x faster than geopy.geodesic.""" + phi1 = math.radians(lat1) + phi2 = math.radians(lat2) + dphi = phi2 - phi1 + dlam = math.radians(lon2 - lon1) + a = math.sin(dphi * 0.5) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam * 0.5) ** 2 + return 2.0 * _EARTH_R * math.asin(math.sqrt(min(a, 1.0))) @dataclass @@ -30,7 +39,7 @@ class ComputedMetrics: avg_cadence_rpm: Optional[int] avg_power_w: Optional[int] max_power_w: Optional[int] - bbox: Optional[tuple[float, float, float, float]] # min_lon, min_lat, max_lon, max_lat + bbox: Optional[tuple[float, float, float, float]] # min_lon, min_lat, max_lon, max_lat start_latlng: Optional[tuple[float, float]] end_latlng: Optional[tuple[float, float]] @@ -41,10 +50,8 @@ def compute(activity: ParsedActivity) -> ComputedMetrics: return _empty() duration_s = _duration(pts) - distance_m = _distance(pts) - moving_time_s, moving_speed_kmh = _moving_stats(pts) + distance_m, moving_time_s, avg_speed_kmh, max_speed_kmh = _gps_stats(pts) gain, loss = _elevation(pts) - max_speed = _max_speed(pts) avg_hr, max_hr = _hr_stats(pts) avg_cad = _avg_nonnull([p.cadence_rpm for p in pts]) avg_pow = _avg_nonnull([p.power_w for p in pts]) @@ -58,8 +65,8 @@ def compute(activity: ParsedActivity) -> ComputedMetrics: moving_time_s=moving_time_s, elevation_gain_m=round(gain, 1) if gain is not None else None, elevation_loss_m=round(abs(loss), 1) if loss is not None else None, - avg_speed_kmh=round(moving_speed_kmh, 2) if moving_speed_kmh else None, - max_speed_kmh=round(max_speed, 2) if max_speed else None, + avg_speed_kmh=round(avg_speed_kmh, 2) if avg_speed_kmh else None, + max_speed_kmh=round(max_speed_kmh, 2) if max_speed_kmh else None, avg_hr_bpm=avg_hr, max_hr_bpm=max_hr, avg_cadence_rpm=avg_cad, @@ -71,66 +78,75 @@ def compute(activity: ParsedActivity) -> ComputedMetrics: ) -# ── helpers ────────────────────────────────────────────────────────────────── +# ── single-pass GPS stats ────────────────────────────────────────────────────── +# distance, moving time, avg speed, and max speed are all derived from the same +# per-segment loop, so we compute them in one pass instead of four. -def _duration(pts: list[DataPoint]) -> Optional[int]: - if len(pts) < 2: - return None - return int((pts[-1].timestamp - pts[0].timestamp).total_seconds()) +def _gps_stats( + pts: list[DataPoint], +) -> tuple[Optional[float], Optional[int], Optional[float], Optional[float]]: + """Return (distance_m, moving_time_s, avg_speed_kmh, max_speed_kmh).""" - -def _distance(pts: list[DataPoint]) -> Optional[float]: - """Prefer device-recorded cumulative distance; fall back to GPS geodesic.""" - # If the last point has a device distance, use it - last_dist = next( + # Prefer device-recorded cumulative distance (FIT files always have this) + device_dist = next( (p.distance_m for p in reversed(pts) if p.distance_m is not None), None ) - if last_dist is not None: - return round(last_dist, 1) - # GPS fallback - total = 0.0 - has_gps = False - for a, b in zip(pts, pts[1:]): - if a.lat is None or a.lon is None or b.lat is None or b.lon is None: - continue - has_gps = True - total += geodesic((a.lat, a.lon), (b.lat, b.lon)).meters - return round(total, 1) if has_gps else None - - -def _moving_stats(pts: list[DataPoint]) -> tuple[Optional[int], Optional[float]]: - """Return (moving_time_s, avg_speed_kmh_over_moving_time).""" moving_s = 0 moving_dist_m = 0.0 - has_gps = False + total_dist_m = 0.0 + max_seg_kmh = 0.0 + has_data = False + + # Device speed values (used for max if present) + device_max_kmh: Optional[float] = None + if any(p.speed_kmh is not None for p in pts): + device_max_kmh = max(p.speed_kmh for p in pts if p.speed_kmh is not None) for a, b in zip(pts, pts[1:]): dt = (b.timestamp - a.timestamp).total_seconds() if dt <= 0: continue - # Compute speed for this interval from GPS if a.lat is not None and a.lon is not None and b.lat is not None and b.lon is not None: - has_gps = True - seg_m = geodesic((a.lat, a.lon), (b.lat, b.lon)).meters + seg_m = _haversine_m(a.lat, a.lon, b.lat, b.lon) seg_kmh = (seg_m / dt) * 3.6 + has_data = True elif a.speed_kmh is not None: seg_kmh = a.speed_kmh seg_m = (seg_kmh / 3.6) * dt - has_gps = True # speed data present + has_data = True else: continue + total_dist_m += seg_m + if seg_kmh > max_seg_kmh: + max_seg_kmh = seg_kmh + if seg_kmh >= _STOPPED_THRESHOLD_KMH: moving_s += int(dt) moving_dist_m += seg_m - if not has_gps or moving_s == 0: - return None, None + if not has_data: + return device_dist, None, None, None - avg_kmh = (moving_dist_m / moving_s) * 3.6 - return moving_s, avg_kmh + distance_m = device_dist if device_dist is not None else round(total_dist_m, 1) + moving_time_s = moving_s if moving_s > 0 else None + avg_speed_kmh = (moving_dist_m / moving_s) * 3.6 if moving_s > 0 else None + # Prefer device speed for max (more stable than GPS-derived per-second spikes) + max_speed_kmh = device_max_kmh if device_max_kmh is not None else ( + max_seg_kmh if max_seg_kmh > 0 else None + ) + + return distance_m, moving_time_s, avg_speed_kmh, max_speed_kmh + + +# ── remaining helpers ────────────────────────────────────────────────────────── + +def _duration(pts: list[DataPoint]) -> Optional[int]: + if len(pts) < 2: + return None + return int((pts[-1].timestamp - pts[0].timestamp).total_seconds()) def _elevation(pts: list[DataPoint]) -> tuple[Optional[float], Optional[float]]: @@ -147,24 +163,6 @@ def _elevation(pts: list[DataPoint]) -> tuple[Optional[float], Optional[float]]: return gain, loss -def _max_speed(pts: list[DataPoint]) -> Optional[float]: - # Prefer device speed; fall back to GPS-derived - device_speeds = [p.speed_kmh for p in pts if p.speed_kmh is not None] - if device_speeds: - return max(device_speeds) - # GPS-derived max - gps_speeds = [] - for a, b in zip(pts, pts[1:]): - if a.lat is None or b.lat is None: - continue - dt = (b.timestamp - a.timestamp).total_seconds() - if dt <= 0: - continue - m = geodesic((a.lat, a.lon), (b.lat, b.lon)).meters - gps_speeds.append((m / dt) * 3.6) - return max(gps_speeds) if gps_speeds else None - - def _hr_stats(pts: list[DataPoint]) -> tuple[Optional[int], Optional[int]]: hrs = [p.hr_bpm for p in pts if p.hr_bpm is not None] if not hrs: diff --git a/bincio/extract/parsers/tcx.py b/bincio/extract/parsers/tcx.py index bda8998..c80e9d1 100644 --- a/bincio/extract/parsers/tcx.py +++ b/bincio/extract/parsers/tcx.py @@ -8,18 +8,24 @@ from lxml import etree from bincio.extract.models import DataPoint, ParsedActivity from bincio.extract.sport import normalise_sport -_NS = { +_NS_HTTP = { "tcx": "http://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2", "ext": "http://www.garmin.com/xmlschemas/ActivityExtension/v2", } +_NS_HTTPS = { + "tcx": "https://www.garmin.com/xmlschemas/TrainingCenterDatabase/v2", + "ext": "https://www.garmin.com/xmlschemas/ActivityExtension/v2", +} class TcxParser: def parse(self, path: Path, raw_bytes: bytes) -> ParsedActivity: - # Some exporters (e.g. Garmin) prepend whitespace before the XML - # declaration, which is technically invalid. Strip it. + # Some exporters prepend whitespace before the XML declaration. Strip it. root = etree.fromstring(raw_bytes.lstrip()) + # Garmin sometimes uses https:// instead of http:// in the namespace URI. + _NS = _NS_HTTPS if b"https://www.garmin.com" in raw_bytes else _NS_HTTP + activities = root.findall(".//tcx:Activity", _NS) if not activities: raise ValueError(f"No Activity elements found in {path.name}") diff --git a/bincio/extract/writer.py b/bincio/extract/writer.py index 8872011..4376480 100644 --- a/bincio/extract/writer.py +++ b/bincio/extract/writer.py @@ -12,11 +12,13 @@ from bincio.extract.timeseries import build_timeseries def make_activity_id(activity: ParsedActivity) -> str: - """Generate a BAS activity ID from started_at + optional title slug.""" - ts = activity.started_at - # Compact ISO format: 2024-06-01T073012+0200 - tz_str = ts.strftime("%z") # e.g. "+0200" or "" - ts_part = ts.strftime("%Y-%m-%dT%H%M%S") + (tz_str or "Z") + """Generate a BAS activity ID from started_at + optional title slug. + + Always uses UTC with Z suffix so IDs are URL-safe (no + chars). + """ + from datetime import timezone + ts = activity.started_at.astimezone(timezone.utc) + ts_part = ts.strftime("%Y-%m-%dT%H%M%SZ") if activity.title: slug = _slugify(activity.title) diff --git a/pyproject.toml b/pyproject.toml index 66a7e88..f6f8893 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,6 @@ dependencies = [ # Data "pandas>=2.2", # Geo - "geopy>=2.4", "rdp>=0.8", # Config & CLI "pyyaml>=6.0",