diff --git a/bincio/edit/server.py b/bincio/edit/server.py index 0cd8077..ac39035 100644 --- a/bincio/edit/server.py +++ b/bincio/edit/server.py @@ -554,10 +554,15 @@ async def upload_activity(file: UploadFile = File(...)) -> JSONResponse: if suffix not in _SUPPORTED_SUFFIXES: raise HTTPException(400, f"Unsupported file type '{Path(name).suffix}'. Expected FIT, GPX, or TCX.") + _MAX_UPLOAD_BYTES = 50 * 1024 * 1024 # 50 MB + contents = await file.read() + if len(contents) > _MAX_UPLOAD_BYTES: + raise HTTPException(413, f"File too large ({len(contents)} bytes). Maximum is 50 MB.") + staging = dd / "_uploads" staging.mkdir(exist_ok=True) staged = staging / name - staged.write_bytes(await file.read()) + staged.write_bytes(contents) try: from bincio.extract.metrics import compute @@ -592,7 +597,7 @@ async def upload_activity(file: UploadFile = File(...)) -> JSONResponse: except HTTPException: raise except Exception as exc: - raise HTTPException(422, str(exc)) + raise HTTPException(422, f"Failed to process activity file: {type(exc).__name__}") finally: staged.unlink(missing_ok=True) diff --git a/bincio/extract/cli.py b/bincio/extract/cli.py index 5a5d1ba..9150e8d 100644 --- a/bincio/extract/cli.py +++ b/bincio/extract/cli.py @@ -46,6 +46,9 @@ def _process_file(path: Path) -> dict: """Runs inside a worker process. Only receives a Path (tiny pickle). All heavy shared data (_known_hashes, _strava_lookup, etc.) is already in the worker's memory from the initializer — zero per-task overhead. + + Writes to pending files (not final paths) so the main process can + arbitrate collisions and pick the best version. """ from bincio.extract.metrics import compute from bincio.extract.parsers.factory import parse_file @@ -80,11 +83,17 @@ def _process_file(path: Path) -> dict: activity, metrics, _output_dir, privacy=_privacy, rdp_epsilon=_rdp_epsilon, + pending=True, ) summary = build_summary(activity, metrics, activity_id, _privacy) except Exception as exc: return {"status": "error", "path": str(path), "error": str(exc)} + # Quality signals for the main process to compare competing results + sensor_channels = sum(1 for v in [ + metrics.avg_hr_bpm, metrics.avg_power_w, metrics.avg_cadence_rpm, + ] if v is not None) + return { "status": "ok", "summary": summary, @@ -94,6 +103,8 @@ def _process_file(path: Path) -> dict: "distance_m": metrics.distance_m, "source": summary.get("source"), "mmp": metrics.mmp, + "point_count": len(activity.points), + "sensor_channels": sensor_channels, } @@ -177,6 +188,8 @@ def extract( summaries: list[dict] = [] errors: list[tuple[str, str]] = [] skipped = 0 + # Collect all pending results, grouped by activity_id for collision arbitration + pending_by_id: dict[str, list[dict]] = {} with Progress( TextColumn("[progress.description]{task.description}"), @@ -202,30 +215,61 @@ def extract( elif result["status"] == "error": errors.append((result["path"], result["error"])) else: - # Near-duplicate check — must be sequential (stateful) - from datetime import datetime - started_at = datetime.fromisoformat(result["started_at"]) - near_id = dedup.find_near_duplicate(started_at, result["distance_m"]) + pending_by_id.setdefault(result["id"], []).append(result) - if near_id: - canonical = dedup.pick_canonical(near_id, result.get("source")) - if canonical != "__new__": - _patch_duplicate_of(cfg.output_dir, result["id"], near_id) - skipped += 1 - continue - _patch_duplicate_of(cfg.output_dir, near_id, result["id"]) - dedup._records[near_id].duplicate_of = result["id"] + # ── Arbitrate collisions and finalize pending files ─────────────────────── + from bincio.extract.writer import ( + activity_quality, cleanup_pending, finalize_pending, write_athlete_json, write_index, + ) - dedup.register(ActivityRecord( - id=result["id"], - source_hash=result["hash"], - started_at=started_at, - distance_m=result["distance_m"], - source=result.get("source"), - )) - summaries.append(result["summary"]) + for activity_id, candidates in pending_by_id.items(): + # Pick the best candidate by quality score + candidates.sort(key=activity_quality, reverse=True) + winner = candidates[0] + + # Clean up losing candidates' pending files + for loser in candidates[1:]: + cleanup_pending(cfg.output_dir, activity_id, loser["hash"]) + skipped += 1 + + # Near-duplicate check against already-known activities + from datetime import datetime + started_at = datetime.fromisoformat(winner["started_at"]) + near_id = dedup.find_near_duplicate(started_at, winner["distance_m"]) + + if near_id: + canonical = dedup.pick_canonical(near_id, winner.get("source")) + if canonical != "__new__": + # Existing is better — finalize winner as duplicate, then patch it + final_id = finalize_pending(cfg.output_dir, activity_id, winner["hash"]) + _patch_duplicate_of(cfg.output_dir, final_id, near_id) + skipped += 1 + continue + # New is better — patch the existing one as duplicate + final_id = finalize_pending(cfg.output_dir, activity_id, winner["hash"]) + _patch_duplicate_of(cfg.output_dir, near_id, final_id) + dedup._records[near_id].duplicate_of = final_id + else: + final_id = finalize_pending(cfg.output_dir, activity_id, winner["hash"]) + + # Update summary with the finalized ID (may include hash suffix) + summary = winner["summary"] + if final_id != activity_id: + summary = dict(summary) + summary["id"] = final_id + summary["detail_url"] = f"activities/{final_id}.json" + if summary.get("track_url"): + summary["track_url"] = f"activities/{final_id}.geojson" + + dedup.register(ActivityRecord( + id=final_id, + source_hash=winner["hash"], + started_at=started_at, + distance_m=winner["distance_m"], + source=winner.get("source"), + )) + summaries.append(summary) - from bincio.extract.writer import write_athlete_json, write_index existing = _load_existing_summaries(cfg.output_dir) merged = {s["id"]: s for s in existing} for s in summaries: diff --git a/bincio/extract/parsers/gpx.py b/bincio/extract/parsers/gpx.py index bdf85df..9208e87 100644 --- a/bincio/extract/parsers/gpx.py +++ b/bincio/extract/parsers/gpx.py @@ -76,6 +76,8 @@ def _apply_extensions(pt: gpxpy.gpx.GPXTrackPoint, dp: DataPoint) -> None: dp.temperature_c = float(val) elif tag == "speed": dp.speed_kmh = float(val) * 3.6 # m/s → km/h + elif tag in ("pwr", "power", "watts"): + dp.power_w = int(float(val)) def _strip_ns(tag: str) -> str: diff --git a/bincio/extract/parsers/tcx.py b/bincio/extract/parsers/tcx.py index ddd60e4..5d52c2e 100644 --- a/bincio/extract/parsers/tcx.py +++ b/bincio/extract/parsers/tcx.py @@ -97,8 +97,8 @@ def _parse_ts(s: str) -> datetime: return datetime.strptime(s, fmt).replace(tzinfo=timezone.utc) except ValueError: continue - # Numeric offset like +02:00 or -05:30 — parse with %z then convert to UTC - m = _re.match(r"^(.+)([+-]\d{2}:\d{2})$", s) + # Numeric offset like +02:00, -05:30, or +0200 — parse with %z then convert to UTC + m = _re.match(r"^(.+)([+-]\d{2}:?\d{2})$", s) if m: body, off = m.group(1), m.group(2).replace(":", "") for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"): diff --git a/bincio/extract/simplify.py b/bincio/extract/simplify.py index 7fd900f..de9488d 100644 --- a/bincio/extract/simplify.py +++ b/bincio/extract/simplify.py @@ -67,8 +67,12 @@ def build_geojson( if p.lon is not None and p.lat is not None ] - # Parallel speed array for gradient coloring - speeds = [round(p.speed_kmh, 2) if p.speed_kmh is not None else None for p in simplified] + # Parallel speed array for gradient coloring — same filter as coordinates + speeds = [ + round(p.speed_kmh, 2) if p.speed_kmh is not None else None + for p in simplified + if p.lon is not None and p.lat is not None + ] return { "type": "Feature", diff --git a/bincio/extract/writer.py b/bincio/extract/writer.py index ed1c7e0..55e97c7 100644 --- a/bincio/extract/writer.py +++ b/bincio/extract/writer.py @@ -33,8 +33,16 @@ def write_activity( privacy: str = "public", duplicate_of: str | None = None, rdp_epsilon: float = 0.0001, + pending: bool = False, ) -> str: - """Write {id}.json and (if GPS available) {id}.geojson. Returns the ID.""" + """Write {id}.json and (if GPS available) {id}.geojson. Returns the ID. + + When pending=True, writes to a uniquely-named pending file + ({id}.{hash[:8]}.pending.json) instead of the final path. This avoids + race conditions when multiple workers process activities with the same ID. + The main process is responsible for promoting pending files to final paths + via finalize_pending(). + """ activity_id = make_activity_id(activity) acts_dir = output_dir / "activities" acts_dir.mkdir(parents=True, exist_ok=True) @@ -82,26 +90,108 @@ def write_activity( "custom": {}, } - json_path = acts_dir / f"{activity_id}.json" - # Collision guard: if a *different* activity already has this ID, append a - # short hash suffix to disambiguate (same hash = idempotent re-extract). - if json_path.exists(): - existing = json.loads(json_path.read_text(encoding="utf-8")) - if existing.get("source_hash") != activity.source_hash: - activity_id = f"{activity_id}-{activity.source_hash[-6:]}" - json_path = acts_dir / f"{activity_id}.json" - detail["id"] = activity_id + if pending: + # Write to a unique pending file — no collision possible + tag = activity.source_hash[-8:] if activity.source_hash else "unknown" + json_path = acts_dir / f"{activity_id}.{tag}.pending.json" + else: + json_path = acts_dir / f"{activity_id}.json" + # Legacy non-pending path: collision guard for callers that don't use + # the pending workflow (e.g. edit server upload_activity) + if json_path.exists(): + existing = json.loads(json_path.read_text(encoding="utf-8")) + if existing.get("source_hash") != activity.source_hash: + activity_id = f"{activity_id}-{activity.source_hash[-6:]}" + json_path = acts_dir / f"{activity_id}.json" + detail["id"] = activity_id + json_path.write_text(json.dumps(detail, indent=2, ensure_ascii=False)) # ── GeoJSON track ──────────────────────────────────────────────────────── if has_gps: geojson = build_geojson(activity.points, activity_id, epsilon=rdp_epsilon) - geojson_path = acts_dir / f"{activity_id}.geojson" + if pending: + geojson_path = acts_dir / f"{activity_id}.{tag}.pending.geojson" + else: + geojson_path = acts_dir / f"{activity_id}.geojson" geojson_path.write_text(json.dumps(geojson, indent=2, ensure_ascii=False)) return activity_id +def activity_quality(result: dict) -> int: + """Compute a quality score for an activity result from a worker. + + Higher is better. Used by the main process to pick the best version + when multiple workers produce results for the same activity ID. + """ + from bincio.extract.dedup import _SOURCE_QUALITY + + score = 0 + # Source type quality (FIT > GPX > TCX) + score += _SOURCE_QUALITY.get(result.get("source") or "", 0) * 100 + # Sensor channel count + score += result.get("sensor_channels", 0) * 10 + # Point count (more data = better) + score += min(result.get("point_count", 0), 50000) // 100 + return score + + +def finalize_pending(output_dir: Path, activity_id: str, source_hash: str) -> str: + """Promote a pending file to its final path via atomic rename. + + If another activity already occupies the ID (different source_hash), + the pending file is disambiguated with a hash suffix. + + Returns the final activity_id (may include suffix). + """ + acts_dir = output_dir / "activities" + tag = source_hash[-8:] if source_hash else "unknown" + + pending_json = acts_dir / f"{activity_id}.{tag}.pending.json" + pending_geojson = acts_dir / f"{activity_id}.{tag}.pending.geojson" + + final_id = activity_id + final_json = acts_dir / f"{final_id}.json" + + # Check for ID collision with a different activity + if final_json.exists(): + existing = json.loads(final_json.read_text(encoding="utf-8")) + if existing.get("source_hash") != source_hash: + final_id = f"{activity_id}-{source_hash[-6:]}" + final_json = acts_dir / f"{final_id}.json" + + # Update the ID inside the JSON if it changed + if final_id != activity_id and pending_json.exists(): + detail = json.loads(pending_json.read_text(encoding="utf-8")) + detail["id"] = final_id + pending_json.write_text(json.dumps(detail, indent=2, ensure_ascii=False)) + + # Atomic rename: pending → final + if pending_json.exists(): + pending_json.rename(final_json) + + final_geojson = acts_dir / f"{final_id}.geojson" + if pending_geojson.exists(): + # Update the ID in GeoJSON properties too + if final_id != activity_id: + geo = json.loads(pending_geojson.read_text(encoding="utf-8")) + geo["properties"]["id"] = final_id + pending_geojson.write_text(json.dumps(geo, indent=2, ensure_ascii=False)) + pending_geojson.rename(final_geojson) + + return final_id + + +def cleanup_pending(output_dir: Path, activity_id: str, source_hash: str) -> None: + """Remove pending files for a losing activity (the one not chosen as canonical).""" + acts_dir = output_dir / "activities" + tag = source_hash[-8:] if source_hash else "unknown" + for suffix in (".pending.json", ".pending.geojson"): + p = acts_dir / f"{activity_id}.{tag}{suffix}" + p.unlink(missing_ok=True) + + def build_summary( activity: ParsedActivity, metrics: ComputedMetrics, diff --git a/site/src/components/ActivityCharts.svelte b/site/src/components/ActivityCharts.svelte index 4a3f807..6f07c5d 100644 --- a/site/src/components/ActivityCharts.svelte +++ b/site/src/components/ActivityCharts.svelte @@ -1,6 +1,6 @@