second pass. low
This commit is contained in:
+65
-21
@@ -46,6 +46,9 @@ def _process_file(path: Path) -> dict:
|
||||
"""Runs inside a worker process. Only receives a Path (tiny pickle).
|
||||
All heavy shared data (_known_hashes, _strava_lookup, etc.) is already
|
||||
in the worker's memory from the initializer — zero per-task overhead.
|
||||
|
||||
Writes to pending files (not final paths) so the main process can
|
||||
arbitrate collisions and pick the best version.
|
||||
"""
|
||||
from bincio.extract.metrics import compute
|
||||
from bincio.extract.parsers.factory import parse_file
|
||||
@@ -80,11 +83,17 @@ def _process_file(path: Path) -> dict:
|
||||
activity, metrics, _output_dir,
|
||||
privacy=_privacy,
|
||||
rdp_epsilon=_rdp_epsilon,
|
||||
pending=True,
|
||||
)
|
||||
summary = build_summary(activity, metrics, activity_id, _privacy)
|
||||
except Exception as exc:
|
||||
return {"status": "error", "path": str(path), "error": str(exc)}
|
||||
|
||||
# Quality signals for the main process to compare competing results
|
||||
sensor_channels = sum(1 for v in [
|
||||
metrics.avg_hr_bpm, metrics.avg_power_w, metrics.avg_cadence_rpm,
|
||||
] if v is not None)
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"summary": summary,
|
||||
@@ -94,6 +103,8 @@ def _process_file(path: Path) -> dict:
|
||||
"distance_m": metrics.distance_m,
|
||||
"source": summary.get("source"),
|
||||
"mmp": metrics.mmp,
|
||||
"point_count": len(activity.points),
|
||||
"sensor_channels": sensor_channels,
|
||||
}
|
||||
|
||||
|
||||
@@ -177,6 +188,8 @@ def extract(
|
||||
summaries: list[dict] = []
|
||||
errors: list[tuple[str, str]] = []
|
||||
skipped = 0
|
||||
# Collect all pending results, grouped by activity_id for collision arbitration
|
||||
pending_by_id: dict[str, list[dict]] = {}
|
||||
|
||||
with Progress(
|
||||
TextColumn("[progress.description]{task.description}"),
|
||||
@@ -202,30 +215,61 @@ def extract(
|
||||
elif result["status"] == "error":
|
||||
errors.append((result["path"], result["error"]))
|
||||
else:
|
||||
# Near-duplicate check — must be sequential (stateful)
|
||||
from datetime import datetime
|
||||
started_at = datetime.fromisoformat(result["started_at"])
|
||||
near_id = dedup.find_near_duplicate(started_at, result["distance_m"])
|
||||
pending_by_id.setdefault(result["id"], []).append(result)
|
||||
|
||||
if near_id:
|
||||
canonical = dedup.pick_canonical(near_id, result.get("source"))
|
||||
if canonical != "__new__":
|
||||
_patch_duplicate_of(cfg.output_dir, result["id"], near_id)
|
||||
skipped += 1
|
||||
continue
|
||||
_patch_duplicate_of(cfg.output_dir, near_id, result["id"])
|
||||
dedup._records[near_id].duplicate_of = result["id"]
|
||||
# ── Arbitrate collisions and finalize pending files ───────────────────────
|
||||
from bincio.extract.writer import (
|
||||
activity_quality, cleanup_pending, finalize_pending, write_athlete_json, write_index,
|
||||
)
|
||||
|
||||
dedup.register(ActivityRecord(
|
||||
id=result["id"],
|
||||
source_hash=result["hash"],
|
||||
started_at=started_at,
|
||||
distance_m=result["distance_m"],
|
||||
source=result.get("source"),
|
||||
))
|
||||
summaries.append(result["summary"])
|
||||
for activity_id, candidates in pending_by_id.items():
|
||||
# Pick the best candidate by quality score
|
||||
candidates.sort(key=activity_quality, reverse=True)
|
||||
winner = candidates[0]
|
||||
|
||||
# Clean up losing candidates' pending files
|
||||
for loser in candidates[1:]:
|
||||
cleanup_pending(cfg.output_dir, activity_id, loser["hash"])
|
||||
skipped += 1
|
||||
|
||||
# Near-duplicate check against already-known activities
|
||||
from datetime import datetime
|
||||
started_at = datetime.fromisoformat(winner["started_at"])
|
||||
near_id = dedup.find_near_duplicate(started_at, winner["distance_m"])
|
||||
|
||||
if near_id:
|
||||
canonical = dedup.pick_canonical(near_id, winner.get("source"))
|
||||
if canonical != "__new__":
|
||||
# Existing is better — finalize winner as duplicate, then patch it
|
||||
final_id = finalize_pending(cfg.output_dir, activity_id, winner["hash"])
|
||||
_patch_duplicate_of(cfg.output_dir, final_id, near_id)
|
||||
skipped += 1
|
||||
continue
|
||||
# New is better — patch the existing one as duplicate
|
||||
final_id = finalize_pending(cfg.output_dir, activity_id, winner["hash"])
|
||||
_patch_duplicate_of(cfg.output_dir, near_id, final_id)
|
||||
dedup._records[near_id].duplicate_of = final_id
|
||||
else:
|
||||
final_id = finalize_pending(cfg.output_dir, activity_id, winner["hash"])
|
||||
|
||||
# Update summary with the finalized ID (may include hash suffix)
|
||||
summary = winner["summary"]
|
||||
if final_id != activity_id:
|
||||
summary = dict(summary)
|
||||
summary["id"] = final_id
|
||||
summary["detail_url"] = f"activities/{final_id}.json"
|
||||
if summary.get("track_url"):
|
||||
summary["track_url"] = f"activities/{final_id}.geojson"
|
||||
|
||||
dedup.register(ActivityRecord(
|
||||
id=final_id,
|
||||
source_hash=winner["hash"],
|
||||
started_at=started_at,
|
||||
distance_m=winner["distance_m"],
|
||||
source=winner.get("source"),
|
||||
))
|
||||
summaries.append(summary)
|
||||
|
||||
from bincio.extract.writer import write_athlete_json, write_index
|
||||
existing = _load_existing_summaries(cfg.output_dir)
|
||||
merged = {s["id"]: s for s in existing}
|
||||
for s in summaries:
|
||||
|
||||
@@ -76,6 +76,8 @@ def _apply_extensions(pt: gpxpy.gpx.GPXTrackPoint, dp: DataPoint) -> None:
|
||||
dp.temperature_c = float(val)
|
||||
elif tag == "speed":
|
||||
dp.speed_kmh = float(val) * 3.6 # m/s → km/h
|
||||
elif tag in ("pwr", "power", "watts"):
|
||||
dp.power_w = int(float(val))
|
||||
|
||||
|
||||
def _strip_ns(tag: str) -> str:
|
||||
|
||||
@@ -97,8 +97,8 @@ def _parse_ts(s: str) -> datetime:
|
||||
return datetime.strptime(s, fmt).replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
continue
|
||||
# Numeric offset like +02:00 or -05:30 — parse with %z then convert to UTC
|
||||
m = _re.match(r"^(.+)([+-]\d{2}:\d{2})$", s)
|
||||
# Numeric offset like +02:00, -05:30, or +0200 — parse with %z then convert to UTC
|
||||
m = _re.match(r"^(.+)([+-]\d{2}:?\d{2})$", s)
|
||||
if m:
|
||||
body, off = m.group(1), m.group(2).replace(":", "")
|
||||
for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"):
|
||||
|
||||
@@ -67,8 +67,12 @@ def build_geojson(
|
||||
if p.lon is not None and p.lat is not None
|
||||
]
|
||||
|
||||
# Parallel speed array for gradient coloring
|
||||
speeds = [round(p.speed_kmh, 2) if p.speed_kmh is not None else None for p in simplified]
|
||||
# Parallel speed array for gradient coloring — same filter as coordinates
|
||||
speeds = [
|
||||
round(p.speed_kmh, 2) if p.speed_kmh is not None else None
|
||||
for p in simplified
|
||||
if p.lon is not None and p.lat is not None
|
||||
]
|
||||
|
||||
return {
|
||||
"type": "Feature",
|
||||
|
||||
+101
-11
@@ -33,8 +33,16 @@ def write_activity(
|
||||
privacy: str = "public",
|
||||
duplicate_of: str | None = None,
|
||||
rdp_epsilon: float = 0.0001,
|
||||
pending: bool = False,
|
||||
) -> str:
|
||||
"""Write {id}.json and (if GPS available) {id}.geojson. Returns the ID."""
|
||||
"""Write {id}.json and (if GPS available) {id}.geojson. Returns the ID.
|
||||
|
||||
When pending=True, writes to a uniquely-named pending file
|
||||
({id}.{hash[:8]}.pending.json) instead of the final path. This avoids
|
||||
race conditions when multiple workers process activities with the same ID.
|
||||
The main process is responsible for promoting pending files to final paths
|
||||
via finalize_pending().
|
||||
"""
|
||||
activity_id = make_activity_id(activity)
|
||||
acts_dir = output_dir / "activities"
|
||||
acts_dir.mkdir(parents=True, exist_ok=True)
|
||||
@@ -82,26 +90,108 @@ def write_activity(
|
||||
"custom": {},
|
||||
}
|
||||
|
||||
json_path = acts_dir / f"{activity_id}.json"
|
||||
# Collision guard: if a *different* activity already has this ID, append a
|
||||
# short hash suffix to disambiguate (same hash = idempotent re-extract).
|
||||
if json_path.exists():
|
||||
existing = json.loads(json_path.read_text(encoding="utf-8"))
|
||||
if existing.get("source_hash") != activity.source_hash:
|
||||
activity_id = f"{activity_id}-{activity.source_hash[-6:]}"
|
||||
json_path = acts_dir / f"{activity_id}.json"
|
||||
detail["id"] = activity_id
|
||||
if pending:
|
||||
# Write to a unique pending file — no collision possible
|
||||
tag = activity.source_hash[-8:] if activity.source_hash else "unknown"
|
||||
json_path = acts_dir / f"{activity_id}.{tag}.pending.json"
|
||||
else:
|
||||
json_path = acts_dir / f"{activity_id}.json"
|
||||
# Legacy non-pending path: collision guard for callers that don't use
|
||||
# the pending workflow (e.g. edit server upload_activity)
|
||||
if json_path.exists():
|
||||
existing = json.loads(json_path.read_text(encoding="utf-8"))
|
||||
if existing.get("source_hash") != activity.source_hash:
|
||||
activity_id = f"{activity_id}-{activity.source_hash[-6:]}"
|
||||
json_path = acts_dir / f"{activity_id}.json"
|
||||
detail["id"] = activity_id
|
||||
|
||||
json_path.write_text(json.dumps(detail, indent=2, ensure_ascii=False))
|
||||
|
||||
# ── GeoJSON track ────────────────────────────────────────────────────────
|
||||
if has_gps:
|
||||
geojson = build_geojson(activity.points, activity_id, epsilon=rdp_epsilon)
|
||||
geojson_path = acts_dir / f"{activity_id}.geojson"
|
||||
if pending:
|
||||
geojson_path = acts_dir / f"{activity_id}.{tag}.pending.geojson"
|
||||
else:
|
||||
geojson_path = acts_dir / f"{activity_id}.geojson"
|
||||
geojson_path.write_text(json.dumps(geojson, indent=2, ensure_ascii=False))
|
||||
|
||||
return activity_id
|
||||
|
||||
|
||||
def activity_quality(result: dict) -> int:
    """Score a worker result so competing versions of one activity can be ranked.

    The score is the sum of three weighted signals read from ``result``:

    * the rank of ``result["source"]`` in ``dedup._SOURCE_QUALITY``
      (missing or unknown sources rank 0), weighted x100;
    * ``result["sensor_channels"]`` (default 0), weighted x10;
    * ``result["point_count"]`` (default 0), capped at 50000 and
      integer-divided by 100.

    A higher score means a better candidate.
    """
    from bincio.extract.dedup import _SOURCE_QUALITY

    # A missing or None source is looked up under "" and falls back to rank 0.
    source_rank = _SOURCE_QUALITY.get(result.get("source") or "", 0)
    channel_count = result.get("sensor_channels", 0)
    # Cap the point count so a very long recording contributes at most 500.
    capped_points = min(result.get("point_count", 0), 50000)

    return source_rank * 100 + channel_count * 10 + capped_points // 100
|
||||
|
||||
|
||||
def finalize_pending(output_dir: Path, activity_id: str, source_hash: str) -> str:
    """Promote a pending activity's files to their final paths.

    Looks for ``{activity_id}.{tag}.pending.json`` and
    ``{activity_id}.{tag}.pending.geojson`` under ``output_dir/activities``,
    where ``tag`` is the last 8 characters of ``source_hash`` (or
    ``"unknown"`` when the hash is empty/None), and moves whichever of them
    exist to ``{final_id}.json`` / ``{final_id}.geojson``.

    If ``{activity_id}.json`` already exists and records a *different*
    ``source_hash``, the new activity is disambiguated: ``final_id`` becomes
    ``{activity_id}-{last 6 chars of source_hash}`` and the ``id`` stored
    inside the JSON detail and the GeoJSON ``properties`` is rewritten to
    match.  If the existing file has the same hash, it is overwritten.

    Args:
        output_dir: Root output directory containing ``activities/``.
        activity_id: ID the pending files were written under.
        source_hash: Source hash of the activity being finalized.

    Returns:
        The final activity ID (may carry a hash suffix).

    Raises:
        json.JSONDecodeError: If an existing or pending file is not valid JSON.
        OSError: If a file cannot be read, written or moved.
    """
    acts_dir = output_dir / "activities"
    tag = source_hash[-8:] if source_hash else "unknown"

    pending_json = acts_dir / f"{activity_id}.{tag}.pending.json"
    pending_geojson = acts_dir / f"{activity_id}.{tag}.pending.geojson"

    final_id = activity_id
    final_json = acts_dir / f"{final_id}.json"

    # ID collision: the final path is already taken by a different activity.
    if final_json.exists():
        existing = json.loads(final_json.read_text(encoding="utf-8"))
        if existing.get("source_hash") != source_hash:
            # Same fallback as `tag`, so a missing hash cannot crash the
            # slice or produce a dangling "id-" suffix.
            suffix = source_hash[-6:] if source_hash else "unknown"
            final_id = f"{activity_id}-{suffix}"
            final_json = acts_dir / f"{final_id}.json"

    id_changed = final_id != activity_id

    if pending_json.exists():
        if id_changed:
            detail = json.loads(pending_json.read_text(encoding="utf-8"))
            detail["id"] = final_id
            # Write back as UTF-8 explicitly: the payload is serialized with
            # ensure_ascii=False and is read as UTF-8 everywhere else.
            pending_json.write_text(
                json.dumps(detail, indent=2, ensure_ascii=False),
                encoding="utf-8",
            )
        # replace() overwrites an existing target on every platform, whereas
        # rename() raises on Windows when re-extracting the same activity.
        pending_json.replace(final_json)

    if pending_geojson.exists():
        if id_changed:
            geo = json.loads(pending_geojson.read_text(encoding="utf-8"))
            # Tolerate a missing or null "properties" member.
            properties = geo.get("properties") or {}
            properties["id"] = final_id
            geo["properties"] = properties
            pending_geojson.write_text(
                json.dumps(geo, indent=2, ensure_ascii=False),
                encoding="utf-8",
            )
        pending_geojson.replace(acts_dir / f"{final_id}.geojson")

    return final_id
|
||||
|
||||
|
||||
def cleanup_pending(output_dir: Path, activity_id: str, source_hash: str) -> None:
    """Delete the pending JSON and GeoJSON files of a losing candidate.

    The files are looked up under ``output_dir/activities`` as
    ``{activity_id}.{tag}.pending.json`` and
    ``{activity_id}.{tag}.pending.geojson``, where ``tag`` is the last 8
    characters of ``source_hash`` (``"unknown"`` when the hash is empty/None).
    Files that do not exist are silently ignored.
    """
    tag = "unknown" if not source_hash else source_hash[-8:]
    stem = f"{activity_id}.{tag}"
    activities = output_dir / "activities"

    for extension in ("json", "geojson"):
        # missing_ok: a candidate without GPS never wrote a .geojson.
        (activities / f"{stem}.pending.{extension}").unlink(missing_ok=True)
|
||||
|
||||
|
||||
def build_summary(
|
||||
activity: ParsedActivity,
|
||||
metrics: ComputedMetrics,
|
||||
|
||||
Reference in New Issue
Block a user