Parallelize extraction; fix TCX files
This commit is contained in:
+149
-122
@@ -1,6 +1,7 @@
|
||||
"""bincio extract — CLI command."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||||
from pathlib import Path
|
||||
@@ -12,15 +13,88 @@ from rich.progress import BarColumn, MofNCompleteColumn, Progress, TextColumn, T
|
||||
|
||||
from bincio.extract.config import ExtractConfig, default_config, load_config
|
||||
from bincio.extract.dedup import ActivityRecord, DedupIndex
|
||||
from bincio.extract.metrics import compute
|
||||
from bincio.extract.models import ParsedActivity
|
||||
from bincio.extract.parsers.factory import is_supported, parse_file
|
||||
from bincio.extract.strava_csv import StravaMetadata
|
||||
from bincio.extract.writer import build_summary, make_activity_id, write_activity, write_index
|
||||
from bincio.extract.parsers.factory import is_supported
|
||||
|
||||
console = Console()
|
||||
|
||||
|
||||
# ── per-worker state (set once via initializer, never re-pickled) ─────────────
# The values below are placeholders; every worker process overwrites them in
# _worker_init() before it runs its first task.

_known_hashes: frozenset = frozenset()  # compared against activity.source_hash
_strava_lookup: dict = {}               # looked up by activity.source_file
_output_dir: Path = Path(".")           # forwarded to write_activity()
_privacy: str = "public"                # forwarded to write_activity() / build_summary()
_rdp_epsilon: float = 0.0001            # forwarded to write_activity(rdp_epsilon=...)


def _worker_init(
    known_hashes: frozenset,
    strava_lookup: dict,
    output_dir: Path,
    privacy: str,
    rdp_epsilon: float,
) -> None:
    """Store the shared, read-only inputs in this process's module globals.

    Meant to be used as the ``initializer`` of a ``ProcessPoolExecutor`` so the
    data is handed to each worker once instead of with every submitted task.

    Args:
        known_hashes: Source hashes that count as exact duplicates.
        strava_lookup: Mapping from source file name to a Strava CSV row.
        output_dir: Directory passed to ``write_activity``.
        privacy: Privacy value passed to ``write_activity`` and ``build_summary``.
        rdp_epsilon: Value passed to ``write_activity`` as ``rdp_epsilon``.
    """
    global _known_hashes, _strava_lookup, _output_dir, _privacy, _rdp_epsilon
    _known_hashes = known_hashes
    _strava_lookup = strava_lookup
    _output_dir = output_dir
    _privacy = privacy
    _rdp_epsilon = rdp_epsilon
|
||||
|
||||
|
||||
def _process_file(path: Path) -> dict:
    """Parse, enrich and write one activity file; runs inside a worker process.

    Only ``path`` travels with the task. The shared inputs (``_known_hashes``,
    ``_strava_lookup``, ``_output_dir``, ``_privacy``, ``_rdp_epsilon``) are read
    from module globals that ``_worker_init`` populated in this process.

    Every failure is reported through the return value rather than raised, so a
    single bad file cannot abort the caller's loop over completed futures.

    Args:
        path: Activity file to process.

    Returns:
        A dict whose ``"status"`` is one of:

        * ``"error"`` — also carries ``"path"`` (as ``str``) and ``"error"``
          (the exception message);
        * ``"duplicate"`` — the parsed ``source_hash`` is in ``_known_hashes``;
        * ``"ok"`` — also carries ``"summary"``, ``"id"``, ``"hash"``,
          ``"started_at"`` (ISO-8601 string), ``"distance_m"`` and ``"source"``.
    """
    from bincio.extract.metrics import compute
    from bincio.extract.parsers.factory import parse_file
    from bincio.extract.writer import build_summary, make_activity_id, write_activity

    try:
        activity = parse_file(path)
    except Exception as exc:
        return {"status": "error", "path": str(path), "error": str(exc)}

    # Exact-duplicate check (free — just a set lookup).
    # NOTE(review): this check is unconditional; no "incremental" switch reaches
    # the worker here — confirm that is intended.
    if activity.source_hash in _known_hashes:
        return {"status": "duplicate"}

    try:
        # Enrich from the Strava CSV: fill only fields the parser left empty.
        row = _strava_lookup.get(activity.source_file)
        if row:
            for attr, column in (
                ("title", "Activity Name"),
                ("description", "Activity Description"),
                ("strava_id", "Activity ID"),
            ):
                if not getattr(activity, attr):
                    # `or ""` tolerates a column that is present but None.
                    setattr(activity, attr, (row.get(column) or "").strip() or None)

        metrics = compute(activity)
        activity_id = make_activity_id(activity)
        write_activity(
            activity, metrics, _output_dir,
            privacy=_privacy,
            rdp_epsilon=_rdp_epsilon,
        )
        summary = build_summary(activity, metrics, activity_id, _privacy)

        # Built inside the try so that e.g. a missing started_at becomes an
        # "error" result instead of an exception raised out of the worker.
        return {
            "status": "ok",
            "summary": summary,
            "id": activity_id,
            "hash": activity.source_hash,
            "started_at": activity.started_at.isoformat(),
            "distance_m": metrics.distance_m,
            "source": summary.get("source"),
        }
    except Exception as exc:
        return {"status": "error", "path": str(path), "error": str(exc)}
|
||||
|
||||
|
||||
# ── CLI ────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@click.command()
|
||||
@click.option("--config", "config_path", type=click.Path(exists=True), default=None,
|
||||
help="Path to extract_config.yaml (default: ./extract_config.yaml).")
|
||||
@@ -32,49 +106,48 @@ console = Console()
|
||||
help="Process a single file and print JSON to stdout.")
|
||||
@click.option("--since", default=None, metavar="YYYY-MM-DD",
|
||||
help="Only process files modified after this date.")
|
||||
@click.option("--workers", default=4, show_default=True,
|
||||
help="Number of parallel worker processes.")
|
||||
@click.option("--workers", default=None, type=int,
|
||||
help="Parallel worker processes (default: CPU count).")
|
||||
def extract(
|
||||
config_path: Optional[str],
|
||||
input_dir: Optional[str],
|
||||
output_dir: Optional[str],
|
||||
single_file: Optional[str],
|
||||
since: Optional[str],
|
||||
workers: int,
|
||||
workers: Optional[int],
|
||||
) -> None:
|
||||
"""Parse GPX/FIT/TCX files and write BAS JSON data store."""
|
||||
|
||||
# ── single file mode ─────────────────────────────────────────────────────
|
||||
if single_file:
|
||||
_process_single(Path(single_file))
|
||||
return
|
||||
|
||||
# ── load config ──────────────────────────────────────────────────────────
|
||||
cfg = _resolve_config(config_path, input_dir, output_dir)
|
||||
cfg.output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# ── gather files ─────────────────────────────────────────────────────────
|
||||
files = _collect_files(cfg, since)
|
||||
if not files:
|
||||
console.print("[yellow]No supported files found.[/yellow]")
|
||||
return
|
||||
console.print(f"Found [bold]{len(files)}[/bold] activity files.")
|
||||
|
||||
# ── Strava metadata ──────────────────────────────────────────────────────
|
||||
strava_meta: Optional[StravaMetadata] = None
|
||||
# Build strava lookup once (serialised dict, sent to workers via initializer)
|
||||
strava_lookup: dict = {}
|
||||
if cfg.metadata_csv and cfg.metadata_csv.exists():
|
||||
strava_meta = StravaMetadata(cfg.metadata_csv)
|
||||
from bincio.extract.strava_csv import StravaMetadata
|
||||
strava_lookup = StravaMetadata(cfg.metadata_csv)._by_filename
|
||||
console.print(f"Loaded Strava metadata from [cyan]{cfg.metadata_csv.name}[/cyan].")
|
||||
|
||||
# ── dedup index ──────────────────────────────────────────────────────────
|
||||
dedup = DedupIndex(output_dir=cfg.output_dir)
|
||||
known_hashes: frozenset = frozenset(dedup._by_hash.keys())
|
||||
|
||||
# ── process ──────────────────────────────────────────────────────────────
|
||||
summaries: list[dict] = []
|
||||
errors: list[tuple[Path, str]] = []
|
||||
skipped = 0
|
||||
n_workers = workers or os.cpu_count() or 4
|
||||
console.print(f"Using [bold]{n_workers}[/bold] worker processes.")
|
||||
|
||||
owner = {"handle": cfg.owner_handle, "display_name": cfg.owner_display_name}
|
||||
summaries: list[dict] = []
|
||||
errors: list[tuple[str, str]] = []
|
||||
skipped = 0
|
||||
|
||||
with Progress(
|
||||
TextColumn("[progress.description]{task.description}"),
|
||||
@@ -85,80 +158,52 @@ def extract(
|
||||
) as progress:
|
||||
task = progress.add_task("Processing...", total=len(files))
|
||||
|
||||
with ProcessPoolExecutor(max_workers=workers) as pool:
|
||||
futures = {pool.submit(_parse_worker, f): f for f in files}
|
||||
with ProcessPoolExecutor(
|
||||
max_workers=n_workers,
|
||||
initializer=_worker_init,
|
||||
initargs=(known_hashes, strava_lookup, cfg.output_dir, cfg.default_privacy, cfg.track.rdp_epsilon),
|
||||
) as pool:
|
||||
futures = {pool.submit(_process_file, f): f for f in files}
|
||||
for future in as_completed(futures):
|
||||
path = futures[future]
|
||||
progress.advance(task)
|
||||
try:
|
||||
activity = future.result()
|
||||
except Exception as exc:
|
||||
errors.append((path, str(exc)))
|
||||
continue
|
||||
result = future.result()
|
||||
|
||||
# ── incremental skip ──────────────────────────────────────
|
||||
if cfg.incremental:
|
||||
existing_id = dedup.is_exact_duplicate(activity.source_hash)
|
||||
if existing_id:
|
||||
skipped += 1
|
||||
continue
|
||||
if result["status"] == "duplicate":
|
||||
skipped += 1
|
||||
elif result["status"] == "error":
|
||||
errors.append((result["path"], result["error"]))
|
||||
else:
|
||||
# Near-duplicate check — must be sequential (stateful)
|
||||
from datetime import datetime
|
||||
started_at = datetime.fromisoformat(result["started_at"])
|
||||
near_id = dedup.find_near_duplicate(started_at, result["distance_m"])
|
||||
|
||||
# ── enrich from Strava CSV ────────────────────────────────
|
||||
if strava_meta:
|
||||
strava_meta.enrich(activity.source_file, activity)
|
||||
if near_id:
|
||||
canonical = dedup.pick_canonical(near_id, result.get("source"))
|
||||
if canonical != "__new__":
|
||||
_patch_duplicate_of(cfg.output_dir, result["id"], near_id)
|
||||
skipped += 1
|
||||
continue
|
||||
_patch_duplicate_of(cfg.output_dir, near_id, result["id"])
|
||||
dedup._records[near_id].duplicate_of = result["id"]
|
||||
|
||||
# ── compute metrics ───────────────────────────────────────
|
||||
metrics = compute(activity)
|
||||
dedup.register(ActivityRecord(
|
||||
id=result["id"],
|
||||
source_hash=result["hash"],
|
||||
started_at=started_at,
|
||||
distance_m=result["distance_m"],
|
||||
source=result.get("source"),
|
||||
))
|
||||
summaries.append(result["summary"])
|
||||
|
||||
# ── deduplication ─────────────────────────────────────────
|
||||
activity_id = make_activity_id(activity)
|
||||
duplicate_of: Optional[str] = None
|
||||
|
||||
near_dup_id = dedup.find_near_duplicate(
|
||||
activity.started_at, metrics.distance_m
|
||||
)
|
||||
if near_dup_id:
|
||||
source = _infer_source(activity)
|
||||
canonical = dedup.pick_canonical(near_dup_id, source)
|
||||
if canonical == "__new__":
|
||||
# New one is better — mark existing as duplicate
|
||||
existing = dedup._records[near_dup_id]
|
||||
existing.duplicate_of = activity_id
|
||||
else:
|
||||
duplicate_of = near_dup_id
|
||||
|
||||
# ── write files ───────────────────────────────────────────
|
||||
written_id = write_activity(
|
||||
activity, metrics, cfg.output_dir,
|
||||
privacy=cfg.default_privacy,
|
||||
duplicate_of=duplicate_of,
|
||||
rdp_epsilon=cfg.track.rdp_epsilon,
|
||||
)
|
||||
|
||||
# Register in dedup index
|
||||
dedup.register(ActivityRecord(
|
||||
id=written_id,
|
||||
source_hash=activity.source_hash,
|
||||
started_at=activity.started_at,
|
||||
distance_m=metrics.distance_m,
|
||||
source=_infer_source(activity),
|
||||
))
|
||||
|
||||
if duplicate_of is None:
|
||||
summaries.append(
|
||||
build_summary(activity, metrics, written_id, cfg.default_privacy)
|
||||
)
|
||||
|
||||
# ── write index.json ──────────────────────────────────────────────────────
|
||||
# Merge with any existing summaries from previous incremental runs
|
||||
existing_index = _load_existing_summaries(cfg.output_dir)
|
||||
all_summaries = {s["id"]: s for s in existing_index}
|
||||
from bincio.extract.writer import write_index
|
||||
existing = _load_existing_summaries(cfg.output_dir)
|
||||
merged = {s["id"]: s for s in existing}
|
||||
for s in summaries:
|
||||
all_summaries[s["id"]] = s
|
||||
write_index(list(all_summaries.values()), cfg.output_dir, owner)
|
||||
merged[s["id"]] = s
|
||||
write_index(list(merged.values()), cfg.output_dir, owner)
|
||||
dedup.save()
|
||||
|
||||
# ── summary ───────────────────────────────────────────────────────────────
|
||||
console.print(
|
||||
f"\n[green]Done.[/green] "
|
||||
f"Processed [bold]{len(summaries)}[/bold] activities, "
|
||||
@@ -168,28 +213,22 @@ def extract(
|
||||
if errors:
|
||||
console.print("\n[red]Errors:[/red]")
|
||||
for path, msg in errors[:20]:
|
||||
console.print(f" {path.name}: {msg}")
|
||||
console.print(f" {Path(path).name}: {msg}")
|
||||
if len(errors) > 20:
|
||||
console.print(f" ... and {len(errors) - 20} more.")
|
||||
|
||||
|
||||
# ── helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _parse_worker(path: Path) -> ParsedActivity:
    """Parse one activity file; intended to run in a worker process.

    The parser factory is imported at call time, inside the worker.

    Args:
        path: Activity file to parse.

    Returns:
        Whatever ``parse_file`` returns for ``path``. Exceptions from
        ``parse_file`` propagate to the caller unchanged.
    """
    from bincio.extract.parsers.factory import parse_file

    return parse_file(path)
|
||||
|
||||
|
||||
def _process_single(path: Path) -> None:
    """Process one file and print its summary as JSON on stdout.

    Nothing is written to the data store: the file is parsed, its metrics are
    computed, and the summary built from them is echoed exactly once.

    Args:
        path: Activity file to process.

    Raises:
        SystemExit: With status 1 after printing the error, if any step fails.
    """
    from bincio.extract.metrics import compute
    from bincio.extract.parsers.factory import parse_file
    from bincio.extract.writer import build_summary, make_activity_id

    try:
        activity = parse_file(path)
        metrics = compute(activity)
        activity_id = make_activity_id(activity)
        click.echo(json.dumps(build_summary(activity, metrics, activity_id), indent=2))
    except Exception as exc:
        console.print(f"[red]Error:[/red] {exc}")
        sys.exit(1)
|
||||
@@ -221,51 +260,39 @@ def _resolve_config(
|
||||
|
||||
|
||||
def _collect_files(cfg: ExtractConfig, since: Optional[str]) -> list[Path]:
    """Recursively gather the supported activity files under the input dirs.

    Args:
        cfg: Configuration; only ``cfg.input_dirs`` is read here.
        since: Optional ``YYYY-MM-DD`` date. When given, files whose modification
            time is earlier than that date (parsed as a naive local datetime at
            midnight) are left out.

    Returns:
        Each matching path once per directory walk, in the order ``rglob``
        yields them. Missing input directories are reported on the console and
        skipped.

    Raises:
        ValueError: If ``since`` does not match ``%Y-%m-%d``.
    """
    from datetime import datetime

    from bincio.extract.parsers.factory import is_supported

    since_ts: Optional[float] = None
    if since:
        since_ts = datetime.strptime(since, "%Y-%m-%d").timestamp()

    files: list[Path] = []
    for d in cfg.input_dirs:
        if not d.exists():
            console.print(f"[yellow]Warning:[/yellow] input dir not found: {d}")
            continue
        for path in d.rglob("*"):
            if not (path.is_file() and is_supported(path)):
                continue
            # Compare against None: a timestamp of 0.0 is a valid cutoff.
            if since_ts is not None and path.stat().st_mtime < since_ts:
                continue
            files.append(path)
    return files
|
||||
|
||||
|
||||
def _load_existing_summaries(output_dir: Path) -> list[dict]:
    """Return the ``"activities"`` list stored in ``<output_dir>/index.json``.

    This is a best-effort read: a missing, unreadable or malformed index is
    treated the same as an empty one.

    Args:
        output_dir: Directory that may contain ``index.json``.

    Returns:
        The list under the ``"activities"`` key, or ``[]`` when the file is
        absent, cannot be read or decoded, is not a JSON object, or holds no
        list under that key (including an explicit ``null``).
    """
    index_path = output_dir / "index.json"
    try:
        payload = json.loads(index_path.read_text())
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: invalid JSON or bytes
        # that cannot be decoded as text.
        return []

    if not isinstance(payload, dict):
        return []
    activities = payload.get("activities")
    # Callers iterate the result, so never hand back None or a scalar.
    return activities if isinstance(activities, list) else []
|
||||
|
||||
|
||||
def _infer_source(activity: "ParsedActivity") -> Optional[str]:
    """Guess where an activity came from, using its Strava id and file name.

    Rules are applied in order and the first match wins:

    1. a truthy ``strava_id`` -> ``"strava_export"``;
    2. a lower-cased file name containing ``"activity"`` and at least two dots
       -> ``"karoo"``;
    3. otherwise by extension (optionally ``.gz``-compressed): ``"fit_file"``,
       ``"gpx_file"`` or ``"tcx_file"``.

    Args:
        activity: Object exposing ``strava_id`` and ``source_file``.

    Returns:
        One of the labels above, or ``None`` when nothing matches.
    """
    if activity.strava_id:
        return "strava_export"

    name = activity.source_file.lower()
    # Two or more dots means the name splits into at least three parts.
    if "activity" in name and name.count(".") >= 2:
        return "karoo"

    by_suffix = (
        ((".fit", ".fit.gz"), "fit_file"),
        ((".gpx", ".gpx.gz"), "gpx_file"),
        ((".tcx", ".tcx.gz"), "tcx_file"),
    )
    for suffixes, label in by_suffix:
        if name.endswith(suffixes):
            return label
    return None
|
||||
def _patch_duplicate_of(output_dir: Path, activity_id: str, canonical_id: str) -> None:
    """Set ``"duplicate_of"`` in an activity's JSON file that is already on disk.

    The file is ``<output_dir>/activities/<activity_id>.json``. This is a
    best-effort update: if the file is missing, unreadable, not valid JSON, not
    a JSON object, or cannot be written back, it is left alone and no error is
    raised.

    Args:
        output_dir: Root of the data store.
        activity_id: Id of the activity whose file is patched.
        canonical_id: Id stored under the ``"duplicate_of"`` key.
    """
    activity_path = output_dir / "activities" / f"{activity_id}.json"
    try:
        data = json.loads(activity_path.read_text())
    except (OSError, ValueError):
        # Missing/unreadable file, undecodable bytes, or malformed JSON.
        return

    if not isinstance(data, dict):
        # Only a JSON object can take the extra key.
        return

    data["duplicate_of"] = canonical_id
    try:
        # NOTE(review): read and write both use the platform default text
        # encoding — confirm it matches how the activity files are written.
        activity_path.write_text(json.dumps(data, indent=2, ensure_ascii=False))
    except (OSError, ValueError):
        # Deliberately silent: a failed patch must not abort the extraction.
        return
|
||||
|
||||
Reference in New Issue
Block a user