27f6d141f7
Replaced 28 bare `except Exception` catches across 8 files with specific exception types reflecting the actual failure modes: - JSON file reads → (OSError, json.JSONDecodeError) - datetime parsing → ValueError - base64 decoding → ValueError - YAML parsing → (OSError, yaml.YAMLError); import moved above try - GeoJSON coord extraction → (TypeError, IndexError, AttributeError) - Startup temp-file cleanup → OSError - Single JSON line parsing (SSE batch) → json.JSONDecodeError Kept broad catches only where intentional: - Background thread top-level guards (tasks.py, admin.py) with log.exception - SSE stream generator tops (strava.py, garmin.py, uploads.py) - Per-item batch loops that must not abort the whole operation - Explicitly non-fatal post-upload merge steps with log.warning
638 lines
24 KiB
Python
638 lines
24 KiB
Python
"""Admin endpoints (/api/admin/*)."""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import shutil
|
||
import subprocess
|
||
import threading
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
from fastapi import APIRouter, Cookie, HTTPException, Request
|
||
from fastapi.responses import JSONResponse, StreamingResponse
|
||
|
||
from bincio.serve import deps, tasks
|
||
from bincio.serve.models import ResetPasswordCodeResponse
|
||
from bincio.serve.db import (
|
||
User,
|
||
get_user,
|
||
list_users,
|
||
)
|
||
|
||
log = logging.getLogger("bincio.serve")
|
||
|
||
router = APIRouter()
|
||
|
||
|
||
def _wipe_user_activities(user_dir: Path) -> int:
|
||
"""Delete all extracted activity files and caches for a user.
|
||
|
||
Removes activities/ (JSON + GeoJSON + timeseries), edits/, originals/,
|
||
_merged/, index.json, athlete.json, and the dedup cache.
|
||
Leaves the user directory itself intact (account remains in the DB).
|
||
Returns the number of files deleted.
|
||
"""
|
||
import shutil
|
||
deleted = 0
|
||
|
||
for subdir in ("activities", "edits", "originals"):
|
||
d = user_dir / subdir
|
||
if d.exists():
|
||
for f in d.rglob("*"):
|
||
if f.is_file():
|
||
deleted += 1
|
||
shutil.rmtree(d)
|
||
|
||
for name in ("_merged", ):
|
||
d = user_dir / name
|
||
if d.exists():
|
||
shutil.rmtree(d)
|
||
|
||
for name in ("index.json", "athlete.json", ".bincio_cache.json"):
|
||
f = user_dir / name
|
||
if f.exists():
|
||
f.unlink()
|
||
deleted += 1
|
||
|
||
return deleted
|
||
|
||
|
||
@router.get("/api/admin/users")
|
||
async def admin_users(bincio_session: str | None = Cookie(default=None)) -> JSONResponse:
|
||
deps._require_admin(bincio_session)
|
||
users = list_users(deps._get_db())
|
||
return JSONResponse([{
|
||
"handle": u.handle,
|
||
"display_name": u.display_name,
|
||
"is_admin": u.is_admin,
|
||
"suspended": u.suspended,
|
||
"created_at": u.created_at,
|
||
} for u in users])
|
||
|
||
|
||
@router.get("/api/admin/jobs")
|
||
async def admin_jobs(bincio_session: str | None = Cookie(default=None)) -> JSONResponse:
|
||
"""Return currently active upload/processing jobs. Admin only."""
|
||
deps._require_admin(bincio_session)
|
||
with tasks._jobs_lock:
|
||
jobs = list(tasks._active_jobs.values())
|
||
return JSONResponse(jobs)
|
||
|
||
|
||
@router.get("/api/admin/disk")
|
||
async def admin_disk(bincio_session: str | None = Cookie(default=None)) -> JSONResponse:
|
||
"""Per-user disk usage breakdown. Admin only."""
|
||
deps._require_admin(bincio_session)
|
||
import shutil
|
||
|
||
data_dir = deps._get_data_dir()
|
||
|
||
def _mb(path: Path) -> float:
|
||
if not path.exists():
|
||
return 0.0
|
||
# Use lstat to count symlink entries (few bytes each) rather than following
|
||
# the link to the target — prevents _merged/ from double-counting activities/.
|
||
total = sum(f.lstat().st_size for f in path.rglob("*") if f.is_file() or f.is_symlink())
|
||
return round(total / 1_048_576, 1)
|
||
|
||
def _count(path: Path, pattern: str = "*") -> int:
|
||
if not path.exists():
|
||
return 0
|
||
return sum(1 for f in path.glob(pattern) if f.is_file())
|
||
|
||
db = deps._get_db()
|
||
from bincio.serve.db import get_user as _get_user
|
||
users = []
|
||
for user_dir in sorted(data_dir.iterdir()):
|
||
if not user_dir.is_dir() or user_dir.name.startswith("_"):
|
||
continue
|
||
# leaked tmp zips
|
||
leaked = [f for f in user_dir.glob("tmp*.zip") if f.is_file()]
|
||
db_user = _get_user(db, user_dir.name)
|
||
users.append({
|
||
"handle": user_dir.name,
|
||
"in_db": db_user is not None,
|
||
"suspended": db_user.suspended if db_user else False,
|
||
"total_mb": _mb(user_dir),
|
||
"activities_mb": _mb(user_dir / "activities"),
|
||
"activities_count": _count(user_dir / "activities", "*.json"),
|
||
"merged_mb": _mb(user_dir / "_merged"),
|
||
"originals_mb": _mb(user_dir / "originals"),
|
||
"originals_strava_mb": _mb(user_dir / "originals" / "strava"),
|
||
"images_mb": _mb(user_dir / "edits" / "images"),
|
||
"leaked_zips_mb": round(sum(f.stat().st_size for f in leaked) / 1_048_576, 1),
|
||
"leaked_zips_count": len(leaked),
|
||
})
|
||
|
||
disk = shutil.disk_usage("/")
|
||
return JSONResponse({
|
||
"disk": {
|
||
"total_gb": round(disk.total / 1_073_741_824, 1),
|
||
"used_gb": round(disk.used / 1_073_741_824, 1),
|
||
"free_gb": round(disk.free / 1_073_741_824, 1),
|
||
"percent": round(disk.used / disk.total * 100, 1),
|
||
},
|
||
"users": users,
|
||
})
|
||
|
||
|
||
@router.post("/api/admin/users/{handle}/reset-password-code", response_model=ResetPasswordCodeResponse)
|
||
async def admin_reset_password_code(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Generate a one-time password reset code for a user. Admin only."""
|
||
from bincio.serve.db import create_reset_code
|
||
admin = deps._require_admin(bincio_session)
|
||
db = deps._get_db()
|
||
if not get_user(db, handle):
|
||
raise HTTPException(404, f"User '{handle}' not found")
|
||
code = create_reset_code(db, handle, admin.handle)
|
||
return JSONResponse({"ok": True, "code": code, "expires_in_hours": 24})
|
||
|
||
|
||
@router.post("/api/admin/users/{handle}/suspend")
|
||
async def admin_suspend(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Suspend a user account. Blocks login and invalidates existing sessions. Admin only."""
|
||
from bincio.serve.db import set_suspended, purge_expired_sessions
|
||
admin = deps._require_admin(bincio_session)
|
||
if handle == admin.handle:
|
||
raise HTTPException(400, "Cannot suspend yourself")
|
||
db = deps._get_db()
|
||
if not get_user(db, handle):
|
||
raise HTTPException(404, "User not found")
|
||
set_suspended(db, handle, True)
|
||
db.execute("DELETE FROM sessions WHERE handle = ?", (handle,))
|
||
db.commit()
|
||
return JSONResponse({"status": "suspended", "handle": handle})
|
||
|
||
|
||
@router.post("/api/admin/users/{handle}/unsuspend")
|
||
async def admin_unsuspend(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Re-enable a suspended user account. Admin only."""
|
||
from bincio.serve.db import set_suspended
|
||
deps._require_admin(bincio_session)
|
||
db = deps._get_db()
|
||
if not get_user(db, handle):
|
||
raise HTTPException(404, "User not found")
|
||
set_suspended(db, handle, False)
|
||
return JSONResponse({"status": "unsuspended", "handle": handle})
|
||
|
||
|
||
@router.delete("/api/admin/users/{handle}/account")
|
||
async def admin_delete_account(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Delete a user account from the database. Data directory is NOT removed. Admin only."""
|
||
from bincio.serve.db import delete_user as _delete_user
|
||
admin = deps._require_admin(bincio_session)
|
||
if handle == admin.handle:
|
||
raise HTTPException(400, "Cannot delete your own account")
|
||
db = deps._get_db()
|
||
if not get_user(db, handle):
|
||
raise HTTPException(404, "User not found")
|
||
_delete_user(db, handle)
|
||
return JSONResponse({"status": "deleted", "handle": handle})
|
||
|
||
|
||
@router.post("/api/admin/users/{handle}/rebuild")
|
||
async def admin_rebuild(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Trigger a merge_all + site rebuild for a user. Admin only."""
|
||
deps._require_admin(bincio_session)
|
||
user_dir = deps._get_data_dir() / handle
|
||
if not user_dir.is_dir():
|
||
raise HTTPException(404, f"No data directory for user '{handle}'")
|
||
tasks._trigger_rebuild(handle)
|
||
return JSONResponse({"ok": True})
|
||
|
||
|
||
@router.post("/api/admin/users/{handle}/rebuild-sync")
|
||
async def admin_rebuild_sync(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Run merge+rebuild synchronously and return full output. Admin only.
|
||
|
||
Unlike /rebuild (fire-and-forget), this blocks until done and returns stdout/stderr.
|
||
Use for debugging when you need to see what went wrong.
|
||
"""
|
||
deps._require_admin(bincio_session)
|
||
user_dir = deps._get_data_dir() / handle
|
||
if not user_dir.is_dir():
|
||
raise HTTPException(404, f"No data directory for user '{handle}'")
|
||
if deps.site_dir is None:
|
||
raise HTTPException(503, "Server has no --site-dir configured; rebuild not available")
|
||
|
||
uv = shutil.which("uv") or str(Path.home() / ".local" / "bin" / "uv")
|
||
cmd = [uv, "run", "bincio", "render",
|
||
"--data-dir", str(deps.data_dir),
|
||
"--site-dir", str(deps.site_dir),
|
||
"--handle", handle,
|
||
"--no-build"]
|
||
if deps.webroot:
|
||
cmd = [uv, "run", "bincio", "render",
|
||
"--data-dir", str(deps.data_dir),
|
||
"--site-dir", str(deps.site_dir),
|
||
"--handle", handle]
|
||
|
||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
|
||
resp: dict[str, Any] = {
|
||
"ok": result.returncode == 0,
|
||
"returncode": result.returncode,
|
||
"stdout": result.stdout,
|
||
"stderr": result.stderr,
|
||
}
|
||
|
||
if result.returncode == 0 and deps.webroot:
|
||
dist_data = deps.site_dir / "dist" / "data"
|
||
if dist_data.exists():
|
||
shutil.rmtree(dist_data)
|
||
rsync = subprocess.run(
|
||
["rsync", "-a", "--delete", "--exclude=data/",
|
||
f"{deps.site_dir}/dist/", str(deps.webroot) + "/"],
|
||
capture_output=True, text=True, timeout=120,
|
||
)
|
||
resp["rsync_returncode"] = rsync.returncode
|
||
resp["rsync_stdout"] = rsync.stdout
|
||
resp["rsync_stderr"] = rsync.stderr
|
||
resp["ok"] = rsync.returncode == 0
|
||
|
||
return JSONResponse(resp)
|
||
|
||
|
||
@router.post("/api/admin/users/{handle}/reextract-originals")
|
||
async def admin_reextract_originals(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> StreamingResponse:
|
||
"""Re-extract activities from stored Strava originals without hitting the API.
|
||
|
||
Spawns `bincio reextract-originals` as a subprocess so heavy memory use
|
||
is isolated from the server process. Streams its JSON-lines output as SSE.
|
||
Triggers a full rebuild on completion.
|
||
"""
|
||
import asyncio
|
||
deps._require_admin(bincio_session)
|
||
user_dir = deps._get_data_dir() / handle
|
||
originals_dir = user_dir / "originals" / "strava"
|
||
if not originals_dir.exists():
|
||
raise HTTPException(404, f"No Strava originals directory for '{handle}'")
|
||
|
||
# Use the bincio script from the same venv bin dir as the running Python.
|
||
# This is reliable in systemd environments where PATH may not include uv.
|
||
import sys as _sys
|
||
bincio_exe = str(Path(_sys.executable).parent / "bincio")
|
||
data_dir = str(deps._get_data_dir())
|
||
|
||
# Count originals so we can split into memory-safe batches.
|
||
total_originals = len(list(originals_dir.glob("*.json")))
|
||
# Each activity can briefly peak at ~10–30 MB; 100 per batch keeps RSS
|
||
# well under 3 GB even on a cheap VPS.
|
||
_BATCH = 100
|
||
log.info("reextract[%s]: %d originals, batch size %d, via %s",
|
||
handle, total_originals, _BATCH, bincio_exe)
|
||
|
||
async def event_stream():
|
||
total_imported = total_skipped = total_errors = 0
|
||
offset = 0
|
||
|
||
while offset < total_originals:
|
||
limit = min(_BATCH, total_originals - offset)
|
||
proc = await asyncio.create_subprocess_exec(
|
||
bincio_exe, "reextract-originals",
|
||
"--data-dir", data_dir,
|
||
"--handle", handle,
|
||
"--offset", str(offset),
|
||
"--limit", str(limit),
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
assert proc.stdout is not None
|
||
|
||
async for raw_line in proc.stdout:
|
||
line = raw_line.decode(errors="replace").strip()
|
||
if not line:
|
||
continue
|
||
yield f"data: {line}\n\n"
|
||
try:
|
||
evt = json.loads(line)
|
||
if evt.get("type") == "done":
|
||
total_imported += evt.get("imported", 0)
|
||
total_skipped += evt.get("skipped", 0)
|
||
total_errors += evt.get("errors", 0)
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
await proc.wait()
|
||
if proc.returncode != 0:
|
||
stderr_out = await proc.stderr.read() if proc.stderr else b""
|
||
log.error("reextract[%s]: batch offset=%d exited %d — stderr: %s",
|
||
handle, offset, proc.returncode,
|
||
stderr_out.decode(errors="replace")[:500])
|
||
yield f"data: {json.dumps({'type': 'error', 'message': f'Batch {offset}–{offset+limit} exited with code {proc.returncode}'})}\n\n"
|
||
return # stop on batch failure
|
||
|
||
offset += limit
|
||
|
||
# All batches complete
|
||
log.info("reextract[%s]: all batches done — imported=%d skipped=%d errors=%d; triggering rebuild",
|
||
handle, total_imported, total_skipped, total_errors)
|
||
tasks._trigger_rebuild(handle)
|
||
yield f"data: {json.dumps({'type': 'done', 'imported': total_imported, 'skipped': total_skipped, 'errors': total_errors})}\n\n"
|
||
|
||
return StreamingResponse(
|
||
event_stream(),
|
||
media_type="text/event-stream",
|
||
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
|
||
)
|
||
|
||
|
||
@router.get("/api/admin/users/{handle}/diag")
|
||
async def admin_diag(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Return a diagnostic snapshot of a user's data directory. Admin only."""
|
||
deps._require_admin(bincio_session)
|
||
user_dir = deps._get_data_dir() / handle
|
||
if not user_dir.is_dir():
|
||
raise HTTPException(404, f"No data directory for user '{handle}'")
|
||
|
||
def _count(path: Path, glob: str = "*") -> int:
|
||
return sum(1 for f in path.glob(glob) if f.is_file()) if path.exists() else 0
|
||
|
||
def _size_mb(path: Path) -> float:
|
||
if not path.exists():
|
||
return 0.0
|
||
return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) / 1_048_576
|
||
|
||
activities_dir = user_dir / "activities"
|
||
merged_dir = user_dir / "_merged"
|
||
originals_dir = user_dir / "originals"
|
||
uploads_dir = user_dir / "_uploads"
|
||
|
||
merged_index = merged_dir / "index.json"
|
||
root_index = user_dir / "index.json"
|
||
|
||
merged_activity_count: int | None = None
|
||
if merged_index.exists():
|
||
try:
|
||
idx = json.loads(merged_index.read_text())
|
||
merged_activity_count = len(idx.get("activities", []))
|
||
except (OSError, json.JSONDecodeError):
|
||
merged_activity_count = -1
|
||
|
||
root_activity_count: int | None = None
|
||
if root_index.exists():
|
||
try:
|
||
idx = json.loads(root_index.read_text())
|
||
root_activity_count = len(idx.get("activities", []))
|
||
except (OSError, json.JSONDecodeError):
|
||
root_activity_count = -1
|
||
|
||
# Peek at a few filenames in activities/ to understand the actual state
|
||
acts_sample: list[str] = []
|
||
acts_symlinks = 0
|
||
if activities_dir.exists():
|
||
for f in sorted(activities_dir.iterdir())[:10]:
|
||
acts_sample.append(f.name + (" → symlink" if f.is_symlink() else ""))
|
||
if f.is_symlink():
|
||
acts_symlinks += 1
|
||
|
||
# Check _merged/activities/ separately
|
||
merged_acts_dir = merged_dir / "activities"
|
||
merged_acts_json = _count(merged_acts_dir, "*.json")
|
||
merged_acts_geojson = _count(merged_acts_dir, "*.geojson")
|
||
|
||
# List pending files
|
||
pending_files: list[str] = []
|
||
if uploads_dir.exists():
|
||
pending_files = [f.name for f in uploads_dir.iterdir() if f.is_file()]
|
||
|
||
return JSONResponse({
|
||
"handle": handle,
|
||
"user_dir": str(user_dir),
|
||
"activities": {
|
||
"json_files": _count(activities_dir, "*.json"),
|
||
"geojson_files": _count(activities_dir, "*.geojson"),
|
||
"size_mb": round(_size_mb(activities_dir), 2),
|
||
"sample": acts_sample,
|
||
"symlink_count": acts_symlinks,
|
||
},
|
||
"originals": {
|
||
"exists": originals_dir.exists(),
|
||
"size_mb": round(_size_mb(originals_dir), 2),
|
||
"strava_originals": _count(originals_dir / "strava", "*.json") if (originals_dir / "strava").exists() else 0,
|
||
},
|
||
"merged": {
|
||
"exists": merged_dir.exists(),
|
||
"activity_count_in_index": merged_activity_count,
|
||
"size_mb": round(_size_mb(merged_dir), 2),
|
||
"activities_json": merged_acts_json,
|
||
"activities_geojson": merged_acts_geojson,
|
||
},
|
||
"root_index": {
|
||
"exists": root_index.exists(),
|
||
"activity_count": root_activity_count,
|
||
},
|
||
"pending_uploads": len(pending_files),
|
||
"pending_files": pending_files,
|
||
"dedup_cache_exists": (user_dir / ".bincio_cache.json").exists(),
|
||
"athlete_json_exists": (user_dir / "athlete.json").exists(),
|
||
})
|
||
|
||
|
||
@router.delete("/api/admin/users/{handle}/activities")
|
||
async def admin_delete_activities(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Delete all activity data for a user and wipe the merged cache."""
|
||
deps._require_admin(bincio_session)
|
||
user_dir = deps._get_data_dir() / handle
|
||
if not user_dir.is_dir():
|
||
raise HTTPException(404, f"No data directory for user '{handle}'")
|
||
|
||
deleted = _wipe_user_activities(user_dir)
|
||
tasks._trigger_rebuild(handle)
|
||
return JSONResponse({"ok": True, "deleted": deleted})
|
||
|
||
|
||
@router.delete("/api/admin/users/{handle}/directory")
|
||
async def admin_delete_user_directory(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Delete the entire user directory from disk (for ghost users not in the DB).
|
||
|
||
Refuses if the handle exists as an account in the database — use
|
||
DELETE /api/admin/users/{handle}/activities for registered users.
|
||
"""
|
||
import shutil
|
||
deps._require_admin(bincio_session)
|
||
db = deps._get_db()
|
||
from bincio.serve.db import get_user as _get_user
|
||
if _get_user(db, handle) is not None:
|
||
raise HTTPException(
|
||
400,
|
||
f"User '{handle}' is still in the database. Remove the account first, "
|
||
"or use 'Reset data' to wipe only activity files.",
|
||
)
|
||
user_dir = deps._get_data_dir() / handle
|
||
if not user_dir.is_dir():
|
||
raise HTTPException(404, f"No directory for '{handle}'")
|
||
shutil.rmtree(user_dir)
|
||
# Rebuild root manifest so the ghost shard disappears from the site
|
||
from bincio.render.cli import _write_root_manifest
|
||
try:
|
||
_write_root_manifest(deps._get_data_dir())
|
||
except (OSError, json.JSONDecodeError):
|
||
pass
|
||
return JSONResponse({"ok": True})
|
||
|
||
|
||
@router.get("/api/admin/strava-sync")
|
||
async def admin_strava_sync_status(
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Return per-user Strava sync status for the admin panel."""
|
||
deps._require_admin(bincio_session)
|
||
root = deps._get_data_dir()
|
||
users = []
|
||
for tf in sorted(root.glob("*/strava_token.json")):
|
||
user_dir = tf.parent
|
||
handle = user_dir.name
|
||
has_creds = (user_dir / "strava_credentials.json").exists()
|
||
|
||
last_sync: str | None = None
|
||
total_imported = 0
|
||
sync_path = user_dir / "_strava_sync.json"
|
||
if sync_path.exists():
|
||
try:
|
||
sc = json.loads(sync_path.read_text(encoding="utf-8"))
|
||
last_sync = sc.get("last_sync")
|
||
total_imported = len(sc.get("imported_ids", []))
|
||
except (OSError, json.JSONDecodeError):
|
||
pass
|
||
|
||
run_status: str | None = None
|
||
run_imported = 0
|
||
run_errors = 0
|
||
run_error_message: str | None = None
|
||
last_run: str | None = None
|
||
status_path = user_dir / "_strava_sync_status.json"
|
||
if status_path.exists():
|
||
try:
|
||
ss = json.loads(status_path.read_text(encoding="utf-8"))
|
||
run_status = ss.get("status")
|
||
run_imported = ss.get("imported", 0)
|
||
run_errors = ss.get("errors", 0)
|
||
run_error_message = ss.get("error_message")
|
||
last_run = ss.get("last_run")
|
||
except (OSError, json.JSONDecodeError):
|
||
pass
|
||
|
||
users.append({
|
||
"handle": handle,
|
||
"has_credentials": has_creds,
|
||
"last_sync": last_sync,
|
||
"total_imported": total_imported,
|
||
"run_status": run_status,
|
||
"run_imported": run_imported,
|
||
"run_errors": run_errors,
|
||
"run_error_message": run_error_message,
|
||
"last_run": last_run,
|
||
})
|
||
|
||
return JSONResponse({"running": deps._strava_sync_running, "users": users})
|
||
|
||
|
||
@router.post("/api/admin/strava-sync/run")
|
||
async def admin_strava_sync_run(
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Trigger an immediate Strava sync for all users (admin only)."""
|
||
deps._require_admin(bincio_session)
|
||
with deps._strava_sync_lock:
|
||
if deps._strava_sync_running:
|
||
raise HTTPException(409, "Sync already running")
|
||
deps._strava_sync_running = True
|
||
|
||
def _run() -> None:
|
||
try:
|
||
from bincio.sync_strava import sync_all
|
||
results = sync_all(deps._get_data_dir())
|
||
total_new = sum(n for n, _ in results.values())
|
||
if total_new > 0:
|
||
tasks._site_rebuild_event.set()
|
||
except Exception:
|
||
log.exception("admin_strava_sync_run: unexpected error")
|
||
finally:
|
||
deps._strava_sync_running = False
|
||
|
||
threading.Thread(target=_run, daemon=True, name="admin-strava-sync").start()
|
||
return JSONResponse({"ok": True}, status_code=202)
|
||
|
||
|
||
@router.post("/api/admin/users/{handle}/recompute-elevation")
|
||
async def admin_recompute_elevation(
|
||
handle: str,
|
||
bincio_session: str | None = Cookie(default=None),
|
||
) -> JSONResponse:
|
||
"""Recompute elevation gain/loss for all activities of a user from stored timeseries.
|
||
|
||
Skips activities with altitude_source == 'dem' (already DEM-corrected).
|
||
Applies the leading-zero no-fix fix and source-aware hysteresis.
|
||
Returns patched/skipped/error counts.
|
||
"""
|
||
deps._require_admin(bincio_session)
|
||
user_dir = deps._get_data_dir() / handle
|
||
if not user_dir.is_dir():
|
||
raise HTTPException(404, f"No data directory for '{handle}'")
|
||
|
||
from bincio.extract.dem import recalculate_elevation_hysteresis
|
||
from bincio.render.merge import merge_one
|
||
|
||
patched = skipped = errors = 0
|
||
acts_dir = user_dir / "activities"
|
||
for json_path in sorted(acts_dir.glob("*.json")):
|
||
if json_path.name.endswith(".timeseries.json"):
|
||
continue
|
||
activity_id = json_path.stem
|
||
try:
|
||
detail = json.loads(json_path.read_text(encoding="utf-8"))
|
||
if detail.get("altitude_source") == "dem":
|
||
skipped += 1
|
||
continue
|
||
ts_path = acts_dir / f"{activity_id}.timeseries.json"
|
||
if not ts_path.exists():
|
||
skipped += 1
|
||
continue
|
||
ts = json.loads(ts_path.read_text(encoding="utf-8"))
|
||
ele_arr = ts.get("elevation_m") or []
|
||
if not any(e for e in ele_arr if e is not None):
|
||
skipped += 1
|
||
continue
|
||
recalculate_elevation_hysteresis(user_dir, activity_id)
|
||
merge_one(user_dir, activity_id)
|
||
patched += 1
|
||
except Exception as exc:
|
||
log.warning("recompute-elevation[%s/%s]: %s", handle, activity_id, exc)
|
||
errors += 1
|
||
|
||
if patched > 0:
|
||
tasks._trigger_rebuild(handle)
|
||
|
||
return JSONResponse({"ok": True, "patched": patched, "skipped": skipped, "errors": errors})
|