fix(upload): prevent false 422s and EMFILE crash during bulk uploads
Four related issues made uploading 271+ activities unreliable: 1. merge_all/write_combined_feed were inside the extraction try/except — any merge race returned 422 even though the file was on disk, causing the mobile app to permanently mark the upload as failed. Fixed by moving them to a separate best-effort try/except after the extraction block. Switch to merge_one (single-activity symlink) instead of merge_all (full rebuild) so each upload is O(1) FS ops, not O(N). 2. The dev watcher fired merge_all for every activity .json write AND the upload endpoint also ran merge_all — O(N²) symlink operations during bulk uploads. Watcher now skips activities/*.json changes (upload endpoint handles those directly). 3. Vite/Chokidar followed the public/data symlink and opened a handle per activity file; constant merge rebuilds exhausted file descriptors and crashed the Astro dev server. Fixed with watch.ignored on public/data. 4. _write_year_shards and write_combined_feed used f.unlink() without missing_ok=True — concurrent callers racing the same file threw FileNotFoundError which propagated as a false extraction failure.
This commit is contained in:
@@ -141,6 +141,11 @@ def _watch_data(data: Path) -> None:
|
|||||||
continue
|
continue
|
||||||
for prefix, user_dir in prefix_to_user.items():
|
for prefix, user_dir in prefix_to_user.items():
|
||||||
if path.startswith(prefix):
|
if path.startswith(prefix):
|
||||||
|
# Skip new-activity .json writes in activities/ — the upload
|
||||||
|
# endpoint calls merge_one inline, so firing merge_all here
|
||||||
|
# too would cause O(N²) full rebuilds during bulk uploads.
|
||||||
|
if path.startswith(str(user_dir / "activities")) and path.endswith(".json"):
|
||||||
|
break
|
||||||
affected.add(user_dir)
|
affected.add(user_dir)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|||||||
@@ -98,6 +98,11 @@ def merge_one(data_dir: Path, activity_id: str) -> None:
|
|||||||
|
|
||||||
Use merge_all() for bulk operations (first run, Strava sync, etc.).
|
Use merge_all() for bulk operations (first run, Strava sync, etc.).
|
||||||
"""
|
"""
|
||||||
|
with _merge_lock(data_dir):
|
||||||
|
_merge_one_locked(data_dir, activity_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _merge_one_locked(data_dir: Path, activity_id: str) -> None:
|
||||||
edits_dir = data_dir / "edits"
|
edits_dir = data_dir / "edits"
|
||||||
acts_dir = data_dir / "activities"
|
acts_dir = data_dir / "activities"
|
||||||
merged_dir = data_dir / "_merged"
|
merged_dir = data_dir / "_merged"
|
||||||
@@ -311,7 +316,7 @@ def _write_year_shards(merged_dir: Path, activities: list[dict], index_meta: dic
|
|||||||
|
|
||||||
# Remove stale year shard files from previous runs
|
# Remove stale year shard files from previous runs
|
||||||
for f in merged_dir.glob("index-*.json"):
|
for f in merged_dir.glob("index-*.json"):
|
||||||
f.unlink()
|
f.unlink(missing_ok=True)
|
||||||
|
|
||||||
by_year: dict[str, list[dict]] = defaultdict(list)
|
by_year: dict[str, list[dict]] = defaultdict(list)
|
||||||
for a in activities:
|
for a in activities:
|
||||||
@@ -398,7 +403,7 @@ def write_combined_feed(data_dir: Path) -> int:
|
|||||||
|
|
||||||
# Remove stale feed pages
|
# Remove stale feed pages
|
||||||
for f in data_dir.glob("feed*.json"):
|
for f in data_dir.glob("feed*.json"):
|
||||||
f.unlink()
|
f.unlink(missing_ok=True)
|
||||||
|
|
||||||
if not all_activities:
|
if not all_activities:
|
||||||
return 0
|
return 0
|
||||||
|
|||||||
+15
-6
@@ -596,9 +596,12 @@ async def upload_bas_activity(
|
|||||||
|
|
||||||
_upsert_index_summary(user_dir, activity_id, activity, geojson_body)
|
_upsert_index_summary(user_dir, activity_id, activity, geojson_body)
|
||||||
|
|
||||||
from bincio.render.merge import merge_all, write_combined_feed
|
try:
|
||||||
merge_all(user_dir)
|
from bincio.render.merge import merge_one, write_combined_feed
|
||||||
|
merge_one(user_dir, activity_id)
|
||||||
write_combined_feed(_get_data_dir())
|
write_combined_feed(_get_data_dir())
|
||||||
|
except Exception as exc:
|
||||||
|
log.warning("upload/bas[%s]: merge/feed failed (non-fatal): %s", user.handle, exc)
|
||||||
|
|
||||||
log.info("upload/bas[%s]: imported %s", user.handle, activity_id)
|
log.info("upload/bas[%s]: imported %s", user.handle, activity_id)
|
||||||
return JSONResponse({"ok": True, "id": activity_id, "status": "imported"})
|
return JSONResponse({"ok": True, "id": activity_id, "status": "imported"})
|
||||||
@@ -688,10 +691,6 @@ async def upload_raw_activity(
|
|||||||
|
|
||||||
_upsert_index_summary(user_dir, act_id, detail, geojson)
|
_upsert_index_summary(user_dir, act_id, detail, geojson)
|
||||||
|
|
||||||
from bincio.render.merge import merge_all, write_combined_feed
|
|
||||||
merge_all(user_dir)
|
|
||||||
write_combined_feed(_get_data_dir())
|
|
||||||
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
log.warning("upload/raw[%s]: extraction failed: %s", user.handle, exc)
|
log.warning("upload/raw[%s]: extraction failed: %s", user.handle, exc)
|
||||||
raise HTTPException(422, f"Could not extract activity: {exc}") from exc
|
raise HTTPException(422, f"Could not extract activity: {exc}") from exc
|
||||||
@@ -699,6 +698,16 @@ async def upload_raw_activity(
|
|||||||
tmp_in.unlink(missing_ok=True)
|
tmp_in.unlink(missing_ok=True)
|
||||||
shutil.rmtree(tmp_out, ignore_errors=True)
|
shutil.rmtree(tmp_out, ignore_errors=True)
|
||||||
|
|
||||||
|
# Merge and update feed — best effort; a race or transient FS error here must
|
||||||
|
# not turn a successful extraction into a 422 (the file is on disk; the mobile
|
||||||
|
# would retry indefinitely and the activity would never be marked synced).
|
||||||
|
try:
|
||||||
|
from bincio.render.merge import merge_one, write_combined_feed
|
||||||
|
merge_one(user_dir, act_id)
|
||||||
|
write_combined_feed(_get_data_dir())
|
||||||
|
except Exception as exc:
|
||||||
|
log.warning("upload/raw[%s]: merge/feed failed (non-fatal): %s", user.handle, exc)
|
||||||
|
|
||||||
log.info("upload/raw[%s]: imported %s", user.handle, act_id)
|
log.info("upload/raw[%s]: imported %s", user.handle, act_id)
|
||||||
return JSONResponse({
|
return JSONResponse({
|
||||||
"ok": True,
|
"ok": True,
|
||||||
|
|||||||
@@ -39,6 +39,12 @@ export default defineConfig({
|
|||||||
// Proxy /api/* to bincio serve/edit so cookies work same-origin in dev.
|
// Proxy /api/* to bincio serve/edit so cookies work same-origin in dev.
|
||||||
// In production nginx handles this — same pattern, no code change needed.
|
// In production nginx handles this — same pattern, no code change needed.
|
||||||
server: {
|
server: {
|
||||||
|
watch: {
|
||||||
|
// public/data is a symlink to the live data dir; Chokidar follows it and
|
||||||
|
// opens a handle per file, which causes EMFILE during bulk uploads as
|
||||||
|
// activity count grows. Data files don't need HMR — exclude them.
|
||||||
|
ignored: ['**/public/data/**'],
|
||||||
|
},
|
||||||
proxy: {
|
proxy: {
|
||||||
// Both /api/upload and /api/upload/strava-zip return SSE streams in response
|
// Both /api/upload and /api/upload/strava-zip return SSE streams in response
|
||||||
// to POST requests. Vite's default proxy buffers the full body before forwarding,
|
// to POST requests. Vite's default proxy buffers the full body before forwarding,
|
||||||
|
|||||||
Reference in New Issue
Block a user