fix(merge): serialize merge_all per user-dir to prevent concurrent rmtree race
When the dev file-watcher and an upload endpoint both trigger merge_all on the same user directory at the same time, the shutil.rmtree/_merged/activities/ sequence collides: one thread deletes the directory while the other is mid-walk, causing ENOTEMPTY or ENOENT crashes. Added a per-user-dir threading.Lock (_merge_lock) at the top of merge.py so all callers (upload endpoints, watcher, admin rebuild) are serialized without changing any call site. Also made the rmtree ignore_errors=True and the symlink_to conditional to survive any stale leftover from a prior crash.
This commit is contained in:
+23
-4
@@ -14,10 +14,24 @@ from __future__ import annotations
|
|||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
|
import threading
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
|
# Per-user-directory lock so concurrent upload requests and the dev file-watcher
|
||||||
|
# cannot run merge_all simultaneously on the same directory.
|
||||||
|
_merge_locks: dict[str, threading.Lock] = {}
|
||||||
|
_merge_locks_mu = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def _merge_lock(data_dir: Path) -> threading.Lock:
|
||||||
|
key = str(data_dir.resolve())
|
||||||
|
with _merge_locks_mu:
|
||||||
|
if key not in _merge_locks:
|
||||||
|
_merge_locks[key] = threading.Lock()
|
||||||
|
return _merge_locks[key]
|
||||||
|
|
||||||
|
|
||||||
def parse_sidecar(path: Path) -> tuple[dict, str]:
|
def parse_sidecar(path: Path) -> tuple[dict, str]:
|
||||||
"""Return (frontmatter_dict, markdown_body) from a sidecar .md file."""
|
"""Return (frontmatter_dict, markdown_body) from a sidecar .md file."""
|
||||||
@@ -163,6 +177,11 @@ def merge_all(data_dir: Path) -> int:
|
|||||||
|
|
||||||
Returns the number of sidecars found and applied.
|
Returns the number of sidecars found and applied.
|
||||||
"""
|
"""
|
||||||
|
with _merge_lock(data_dir):
|
||||||
|
return _merge_all_locked(data_dir)
|
||||||
|
|
||||||
|
|
||||||
|
def _merge_all_locked(data_dir: Path) -> int:
|
||||||
edits_dir = data_dir / "edits"
|
edits_dir = data_dir / "edits"
|
||||||
acts_dir = data_dir / "activities"
|
acts_dir = data_dir / "activities"
|
||||||
merged_dir = data_dir / "_merged"
|
merged_dir = data_dir / "_merged"
|
||||||
@@ -191,9 +210,8 @@ def merge_all(data_dir: Path) -> int:
|
|||||||
to_merge = set(sidecars) | set(image_lists)
|
to_merge = set(sidecars) | set(image_lists)
|
||||||
|
|
||||||
# Wipe and recreate _merged/activities/
|
# Wipe and recreate _merged/activities/
|
||||||
if merged_acts.exists():
|
shutil.rmtree(merged_acts, ignore_errors=True)
|
||||||
shutil.rmtree(merged_acts)
|
merged_acts.mkdir(parents=True, exist_ok=True)
|
||||||
merged_acts.mkdir(parents=True)
|
|
||||||
|
|
||||||
# Mirror activities/ — symlink unmodified, write merged copies for overridden
|
# Mirror activities/ — symlink unmodified, write merged copies for overridden
|
||||||
if acts_dir.exists():
|
if acts_dir.exists():
|
||||||
@@ -212,7 +230,8 @@ def merge_all(data_dir: Path) -> int:
|
|||||||
detail["custom"]["images"] = image_lists[activity_id]
|
detail["custom"]["images"] = image_lists[activity_id]
|
||||||
dest.write_text(json.dumps(detail, indent=2, ensure_ascii=False))
|
dest.write_text(json.dumps(detail, indent=2, ensure_ascii=False))
|
||||||
else:
|
else:
|
||||||
dest.symlink_to(src.resolve())
|
if not dest.exists() and not dest.is_symlink():
|
||||||
|
dest.symlink_to(src.resolve())
|
||||||
|
|
||||||
# Mirror edits/images/ → _merged/activities/images/ so the site can serve them
|
# Mirror edits/images/ → _merged/activities/images/ so the site can serve them
|
||||||
if images_root and images_root.exists():
|
if images_root and images_root.exists():
|
||||||
|
|||||||
Reference in New Issue
Block a user