diff --git a/bincio/render/ogimage.py b/bincio/render/ogimage.py
new file mode 100644
index 0000000..96fafdb
--- /dev/null
+++ b/bincio/render/ogimage.py
@@ -0,0 +1,121 @@
+"""OG image generation — 400×400 track-on-dark PNG for social link previews."""
+
+from __future__ import annotations
+
+import io
+import math
+from pathlib import Path
+from typing import Optional
+
+# Colour stops matching ActivityMap.svelte _linearColor stops
+_STOPS: list[tuple[float, tuple[int, int, int]]] = [
+ (0.00, (59, 130, 246)), # blue-500 (low)
+ (0.33, (74, 222, 128)), # green-400
+ (0.66, (250, 204, 21)), # yellow-400
+ (1.00, (239, 68, 68)), # red-500 (high)
+]
+
+_BG = (9, 9, 11) # zinc-950
+_SIZE = 400
+_PAD = 28
+_WIDTH = 5 # logical line width; rendered at 2× then downscaled
+
+
+def _lerp_color(t: float) -> tuple[int, int, int]:
+ t = max(0.0, min(1.0, t))
+ for i in range(len(_STOPS) - 1):
+ t0, c0 = _STOPS[i]
+ t1, c1 = _STOPS[i + 1]
+ if t <= t1:
+ f = (t - t0) / (t1 - t0) if t1 > t0 else 0.0
+ return (
+ round(c0[0] + f * (c1[0] - c0[0])),
+ round(c0[1] + f * (c1[1] - c0[1])),
+ round(c0[2] + f * (c1[2] - c0[2])),
+ )
+ return _STOPS[-1][1]
+
+
+def generate(
+ lat_arr: list[Optional[float]],
+ lon_arr: list[Optional[float]],
+ ele_arr: list[Optional[float]],
+) -> bytes:
+ """Return PNG bytes for a 400×400 elevation-coloured track image.
+
+ Any of the three arrays may have None gaps (no-GPS seconds).
+ Returns a plain dark square if there are fewer than 2 valid GPS points.
+ """
+ try:
+ from PIL import Image, ImageDraw # type: ignore[import]
+ except ImportError as e:
+ raise RuntimeError("Pillow is required for OG image generation") from e
+
+ # Collect valid GPS points paired with elevation (None → 0 for colouring)
+ pts: list[tuple[float, float, float]] = []
+ for lat, lon, ele in zip(lat_arr, lon_arr, ele_arr):
+ if lat is not None and lon is not None:
+ pts.append((float(lat), float(lon), float(ele) if ele is not None else 0.0))
+
+ if len(pts) < 2:
+ img = Image.new("RGB", (_SIZE, _SIZE), _BG)
+ buf = io.BytesIO()
+ img.save(buf, "PNG", optimize=True)
+ return buf.getvalue()
+
+ lats = [p[0] for p in pts]
+ lons = [p[1] for p in pts]
+ eles = [p[2] for p in pts]
+
+ min_lat, max_lat = min(lats), max(lats)
+ min_lon, max_lon = min(lons), max(lons)
+ min_ele, max_ele = min(eles), max(eles)
+ ele_range = max_ele - min_ele or 1.0
+
+ # Mercator correction: compress longitude range by cos(mid_lat) so the
+ # track doesn't look stretched horizontally at higher latitudes.
+ cos_lat = math.cos(math.radians((min_lat + max_lat) / 2))
+
+ usable = _SIZE - 2 * _PAD
+ lat_span = max_lat - min_lat or 1e-6
+ lon_span = (max_lon - min_lon) * cos_lat or 1e-6
+ scale = min(usable / lat_span, usable / lon_span)
+
+ # Centre the track within the canvas
+ x_off = _PAD + (usable - (max_lon - min_lon) * cos_lat * scale) / 2
+ y_off = _PAD + (usable - lat_span * scale) / 2
+
+ def project(lat: float, lon: float) -> tuple[float, float]:
+ x = x_off + (lon - min_lon) * cos_lat * scale
+ y = _SIZE - (y_off + (lat - min_lat) * scale)
+ return x, y
+
+ # Render at 2× resolution then downscale for cheap anti-aliasing
+ S = _SIZE * 2
+ lw = _WIDTH * 2
+ img = Image.new("RGB", (S, S), _BG)
+ draw = ImageDraw.Draw(img)
+
+ for i in range(len(pts) - 1):
+ x0, y0 = project(pts[i][0], pts[i][1])
+ x1, y1 = project(pts[i+1][0], pts[i+1][1])
+ t = (eles[i] - min_ele) / ele_range
+ color = _lerp_color(t)
+ draw.line([(x0 * 2, y0 * 2), (x1 * 2, y1 * 2)], fill=color, width=lw)
+
+ img = img.resize((_SIZE, _SIZE), Image.LANCZOS)
+
+ buf = io.BytesIO()
+ img.save(buf, "PNG", optimize=True)
+ return buf.getvalue()
+
+
+def generate_for_activity(ts_path: Path) -> bytes:
+ """Convenience wrapper: read a .timeseries.json file and call generate()."""
+ import json
+ ts = json.loads(ts_path.read_text(encoding="utf-8"))
+ return generate(
+ ts.get("lat") or [],
+ ts.get("lon") or [],
+ ts.get("elevation_m") or [],
+ )
diff --git a/bincio/serve/routers/ogimage.py b/bincio/serve/routers/ogimage.py
new file mode 100644
index 0000000..182e5cb
--- /dev/null
+++ b/bincio/serve/routers/ogimage.py
@@ -0,0 +1,161 @@
+"""OG preview endpoints.
+
+GET /activity/{activity_id}
+ Returns a minimal HTML page with Open Graph meta tags for social link
+ previews (Telegram, WhatsApp, Slack, …). nginx proxies only bot
+ User-Agents here; regular browsers still get the static SPA shell.
+
+GET /api/og-image/{user_handle}/{activity_id}.png
+ Returns the pre-generated 400×400 track PNG. Falls back to generating
+ on the fly if the static file doesn't exist yet (e.g. a brand-new import
+ before the next deploy-time generation run).
+"""
+
+from __future__ import annotations
+
+import html
+import json
+from datetime import datetime, timezone
+from pathlib import Path
+
+from fastapi import APIRouter, HTTPException, Request
+from fastapi.responses import HTMLResponse, Response
+
+from bincio.serve import deps
+
+router = APIRouter()
+
+# Sport → emoji map (extend as needed)
+_SPORT_EMOJI: dict[str, str] = {
+ "cycling": "🚴",
+ "running": "🏃",
+ "swimming": "🏊",
+ "hiking": "🥾",
+ "walking": "🚶",
+ "skiing": "⛷️",
+ "rowing": "🚣",
+ "triathlon": "🏊",
+ "e_cycling": "🚴",
+ "gravel": "🚴",
+}
+
+
+def _find_user(data_dir: Path, activity_id: str) -> str | None:
+ """Return the user handle that owns *activity_id*, or None."""
+ for user_dir in sorted(data_dir.iterdir()):
+ if not user_dir.is_dir() or user_dir.name.startswith("_") or user_dir.name == "segments":
+ continue
+ if (user_dir / "activities" / f"{activity_id}.json").exists():
+ return user_dir.name
+ return None
+
+
+def _fmt_description(detail: dict, handle: str) -> str:
+ parts: list[str] = []
+ sport = (detail.get("sport") or "").lower()
+ emoji = _SPORT_EMOJI.get(sport, "🏅")
+ parts.append(emoji)
+
+ dist_m = detail.get("distance_m")
+ if dist_m:
+ parts.append(f"{dist_m / 1000:.1f} km")
+
+ gain = detail.get("elevation_gain_m")
+ if gain:
+ parts.append(f"{gain:.0f} m ↑")
+
+ dur = detail.get("moving_time_s") or detail.get("duration_s")
+ if dur:
+ h, rem = divmod(int(dur), 3600)
+ m = rem // 60
+ parts.append(f"{h}h {m:02d}m" if h else f"{m}m")
+
+ started = detail.get("started_at")
+ if started:
+ try:
+ dt = datetime.fromisoformat(started).astimezone(timezone.utc)
+ parts.append(dt.strftime("%-d %b %Y"))
+ except ValueError:
+ pass
+
+ parts.append(f"@{handle}")
+ return " · ".join(parts)
+
+
+@router.get("/activity/{activity_id}", response_class=HTMLResponse, include_in_schema=False)
+@router.get("/activity/{activity_id}/", response_class=HTMLResponse, include_in_schema=False)
+async def og_preview(activity_id: str, request: Request) -> HTMLResponse:
+ data_dir = deps._get_data_dir()
+ handle = _find_user(data_dir, activity_id)
+ if handle is None:
+ raise HTTPException(404)
+
+ json_path = data_dir / handle / "activities" / f"{activity_id}.json"
+ detail = json.loads(json_path.read_text(encoding="utf-8"))
+
+ title = detail.get("title") or activity_id
+ desc = _fmt_description(detail, handle)
+ base = str(request.base_url).rstrip("/")
+ img_url = f"{base}/og-image/{handle}/{activity_id}.png"
+ act_url = f"{base}/activity/{activity_id}/"
+
+ h_title = html.escape(title)
+ h_desc = html.escape(desc)
+ h_img = html.escape(img_url)
+ h_url = html.escape(act_url)
+
+ content = f"""
+
+
+
+ {h_title} – BincioActivity
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+ return HTMLResponse(content=content)
+
+
+@router.get("/og-image/{user_handle}/{activity_id}.png", include_in_schema=False)
+async def og_image(user_handle: str, activity_id: str) -> Response:
+ data_dir = deps._get_data_dir()
+ www_root = Path("/var/www/activity")
+ img_path = www_root / "og-image" / user_handle / f"{activity_id}.png"
+
+ if img_path.exists():
+ return Response(
+ content=img_path.read_bytes(),
+ media_type="image/png",
+ headers={"Cache-Control": "public, max-age=86400"},
+ )
+
+ # Fallback: generate on the fly (e.g. new activity before next deploy run)
+ ts_path = data_dir / user_handle / "activities" / f"{activity_id}.timeseries.json"
+ if not ts_path.exists():
+ raise HTTPException(404)
+
+ try:
+ from bincio.render.ogimage import generate_for_activity
+ png = generate_for_activity(ts_path)
+ img_path.parent.mkdir(parents=True, exist_ok=True)
+ img_path.write_bytes(png)
+ except Exception:
+ raise HTTPException(500, "Image generation failed")
+
+ return Response(
+ content=png,
+ media_type="image/png",
+ headers={"Cache-Control": "public, max-age=86400"},
+ )
diff --git a/bincio/serve/server.py b/bincio/serve/server.py
index b52c39e..a296a9a 100644
--- a/bincio/serve/server.py
+++ b/bincio/serve/server.py
@@ -25,6 +25,7 @@ from bincio.serve.routers import (
garmin,
ideas,
me,
+ ogimage,
segments,
strava,
uploads,
@@ -67,5 +68,6 @@ for _router in [
strava.router,
garmin.router,
ideas.router,
+ ogimage.router,
]:
app.include_router(_router)
diff --git a/pyproject.toml b/pyproject.toml
index f9ee2d1..bd21af6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -40,6 +40,7 @@ serve = [
"uvicorn[standard]>=0.29",
"python-multipart>=0.0.9",
"bcrypt>=4.1",
+ "Pillow>=10.0",
]
strava = [
"requests>=2.32",
diff --git a/scripts/generate_og_images.py b/scripts/generate_og_images.py
new file mode 100644
index 0000000..fb80aad
--- /dev/null
+++ b/scripts/generate_og_images.py
@@ -0,0 +1,91 @@
+"""Pre-generate OG track images for all activities.
+
+Writes 400×400 PNGs to {www_root}/og-image/{user}/{activity_id}.png.
+Skips activities that already have an up-to-date image (mtime check).
+Safe to run repeatedly — only processes new/changed activities.
+
+Usage:
+ uv run scripts/generate_og_images.py [--data-dir /var/bincio/data] [--www-root /var/www/activity]
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+from pathlib import Path
+
+
+def generate_all(data_dir: Path, www_root: Path) -> None:
+ out_root = www_root / "og-image"
+ out_root.mkdir(parents=True, exist_ok=True)
+
+ from bincio.render.ogimage import generate
+
+ total = generated = skipped = errors = 0
+
+ users = sorted(
+ d.name for d in data_dir.iterdir()
+ if d.is_dir() and not d.name.startswith("_") and d.name != "segments"
+ )
+
+ for handle in users:
+ user_dir = data_dir / handle
+ acts_dir = user_dir / "activities"
+ img_dir = out_root / handle
+ if not acts_dir.exists():
+ continue
+ img_dir.mkdir(exist_ok=True)
+ u_gen = u_skip = u_err = 0
+
+ for ts_path in sorted(acts_dir.glob("*.timeseries.json")):
+ activity_id = ts_path.name.replace(".timeseries.json", "")
+ out_path = img_dir / f"{activity_id}.png"
+ total += 1
+
+ # Skip if image is newer than timeseries
+ if out_path.exists() and out_path.stat().st_mtime >= ts_path.stat().st_mtime:
+ skipped += 1
+ u_skip += 1
+ continue
+
+ try:
+ ts = json.loads(ts_path.read_text(encoding="utf-8"))
+ lat_arr = ts.get("lat") or []
+ lon_arr = ts.get("lon") or []
+ ele_arr = ts.get("elevation_m") or []
+ png = generate(lat_arr, lon_arr, ele_arr)
+ out_path.write_bytes(png)
+ generated += 1
+ u_gen += 1
+ except Exception as exc:
+ errors += 1
+ u_err += 1
+ print(f" ERROR {handle}/{activity_id}: {exc}", file=sys.stderr)
+
+ if u_gen or u_err:
+ print(f"{handle:<25} generated={u_gen:4d} skipped={u_skip:4d} errors={u_err}")
+ else:
+ print(f"{handle:<25} skipped={u_skip:4d} (all up to date)")
+
+ print(f"\nDone — {generated} generated, {skipped} skipped, {errors} errors (total {total})")
+
+
+def main() -> None:
+ ap = argparse.ArgumentParser(description="Pre-generate OG track images")
+ ap.add_argument("--data-dir", default="/var/bincio/data", type=Path)
+ ap.add_argument("--www-root", default="/var/www/activity", type=Path)
+ args = ap.parse_args()
+
+ if not args.data_dir.exists():
+ print(f"ERROR: data dir not found: {args.data_dir}", file=sys.stderr)
+ sys.exit(1)
+
+ print(f"data-dir : {args.data_dir}")
+ print(f"www-root : {args.www_root}")
+ print(f"output : {args.www_root}/og-image/\n")
+ generate_all(args.data_dir, args.www_root)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/strava_elevation_audit.py b/scripts/strava_elevation_audit.py
new file mode 100644
index 0000000..6a68a98
--- /dev/null
+++ b/scripts/strava_elevation_audit.py
@@ -0,0 +1,198 @@
+"""Audit elevation accuracy vs Strava.
+
+Friends add a note with the Strava elevation to their activity descriptions.
+Supported formats (case-insensitive):
+ - "strava 1323md+" most common
+ - "strava 1323 m d+"
+ - "Strava 1625 m d+"
+ - "Strava Elevation 1173m"
+ - "1038 m d+ Strava" number before the word strava
+ - "Strava 207 metri di dislivello"
+
+Descriptions live in _merged/activities/ (sidecar merge).
+Computed elevation_gain_m is read from activities/ (main file).
+
+Usage:
+ uv run scripts/strava_elevation_audit.py [--data-dir /var/bincio/data] [--out elevation_audit.csv]
+"""
+
+from __future__ import annotations
+
+import argparse
+import csv
+import json
+import re
+import sys
+from pathlib import Path
+
+from bincio.extract.metrics import elevation_params
+
+# Patterns tried in order; first match wins.
+# Each pattern must have exactly one capturing group for the numeric value.
+_PATTERNS: list[re.Pattern] = [
+ # "strava NNN m ..." or "strava NNNmd+"
+ re.compile(r'\bstrava\b\s*([0-9][0-9.,]*)\s*m', re.IGNORECASE),
+ # "Strava Elevation NNNm" or "Strava ... NNNm" (one word between)
+ re.compile(r'\bstrava\b\s+\w+\s+([0-9][0-9.,]*)\s*m', re.IGNORECASE),
+ # "NNN m ... strava" (number comes first, up to 20 chars before strava)
+ re.compile(r'([0-9][0-9.,]*)\s*m\b.{0,20}?\bstrava\b', re.IGNORECASE),
+ # "Strava NNN metri di dislivello" (Italian)
+ re.compile(r'\bstrava\b.*?([0-9][0-9.,]*)\s+metr', re.IGNORECASE),
+]
+
+
+def _find_strava_elevation(description: str) -> float | None:
+ for pat in _PATTERNS:
+ m = pat.search(description)
+ if m:
+ raw = m.group(1).replace(',', '.')
+ try:
+ return float(raw)
+ except ValueError:
+ continue
+ return None
+
+
+def audit(data_dir: Path, out_path: Path) -> list[dict]:
+ rows: list[dict] = []
+ unmatched: list[tuple[str, str]] = [] # (path, desc) couldn't parse elevation
+
+ for merged_path in sorted(data_dir.glob("*/_merged/activities/*.json")):
+ if merged_path.suffix != ".json":
+ continue
+ if ".timeseries." in merged_path.name or ".geojson" in merged_path.name:
+ continue
+
+ try:
+ merged = json.loads(merged_path.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError):
+ continue
+
+ description = merged.get("description") or ""
+ if not description or "strava" not in description.lower():
+ continue
+
+ # Skip strava:// athlete-mention links (not elevation notes)
+ if re.search(r'strava://', description, re.IGNORECASE):
+ continue
+
+ strava_elev = _find_strava_elevation(description)
+ if strava_elev is None:
+ unmatched.append((str(merged_path), description))
+ continue
+
+ # Read computed elevation from main activity file
+ main_path = (
+ merged_path.parents[3] # data_dir
+ / merged_path.parents[2].name # user
+ / "activities"
+ / merged_path.name
+ )
+ try:
+ main = json.loads(main_path.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError):
+ main = merged # fall back to merged values
+
+ our_elev = main.get("elevation_gain_m")
+ title = main.get("title") or merged.get("title") or merged_path.stem
+ user = merged_path.parents[2].name
+ altitude_source = main.get("altitude_source") or "unknown"
+ source = main.get("source") or ""
+ device = main.get("device") or "unknown"
+ ma_window, threshold = elevation_params(altitude_source, source)
+
+ delta = round(our_elev - strava_elev, 1) if our_elev is not None else None
+ pct = (
+ round((our_elev - strava_elev) / strava_elev * 100, 1)
+ if our_elev is not None and strava_elev != 0
+ else None
+ )
+
+ rows.append({
+ "file": merged_path.name,
+ "user": user,
+ "title": title,
+ "device": device,
+ "altitude_source": altitude_source,
+ "source": source,
+ "ma_window_s": ma_window,
+ "threshold_m": threshold,
+ "our_elevation_m": our_elev,
+ "strava_elevation_m": strava_elev,
+ "delta_m": delta,
+ "delta_pct": pct,
+ "description": description[:120].replace("\n", " ").replace("\r", ""),
+ })
+
+ rows.sort(key=lambda r: abs(r["delta_m"] or 0), reverse=True)
+
+ if rows:
+ with out_path.open("w", newline="", encoding="utf-8") as f:
+ writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
+ writer.writeheader()
+ writer.writerows(rows)
+
+ if unmatched:
+ print(f"\nCould not parse elevation from {len(unmatched)} description(s):")
+ for path, desc in unmatched:
+ print(f" {Path(path).name} {desc[:80]!r}")
+
+ return rows
+
+
+def main() -> None:
+ ap = argparse.ArgumentParser(description="Audit elevation accuracy vs Strava notes")
+ ap.add_argument("--data-dir", default="/var/bincio/data", type=Path)
+ ap.add_argument("--out", default="elevation_audit.csv", type=Path)
+ args = ap.parse_args()
+
+ if not args.data_dir.exists():
+ print(f"ERROR: data dir not found: {args.data_dir}", file=sys.stderr)
+ sys.exit(1)
+
+ print(f"Scanning {args.data_dir} …")
+ rows = audit(args.data_dir, args.out)
+
+ if not rows:
+ print("No activities found with a parseable Strava elevation note.")
+ return
+
+ print(f"\nFound {len(rows)} activit{'y' if len(rows)==1 else 'ies'}:\n")
+ header = (
+ f"{'File':<50} {'User':<15} {'Source':<16} {'AltSrc':<12}"
+ f" {'MA':>4} {'Thr':>5} {'Ours':>8} {'Strava':>8} {'Delta':>8} {'Delta%':>7}"
+ )
+ print(header)
+ print("-" * len(header))
+ for r in rows:
+ delta_str = f"{r['delta_m']:+.0f}" if r['delta_m'] is not None else "n/a"
+ pct_str = f"{r['delta_pct']:+.1f}%" if r['delta_pct'] is not None else "n/a"
+ our_str = f"{r['our_elevation_m']:.0f}" if r['our_elevation_m'] is not None else "n/a"
+ print(
+ f"{r['file']:<50} {r['user']:<15} {r['source']:<16} {r['altitude_source']:<12}"
+ f" {r['ma_window_s']:>4} {r['threshold_m']:>5.1f}"
+ f" {our_str:>8} {r['strava_elevation_m']:>8.0f}"
+ f" {delta_str:>8} {pct_str:>7}"
+ )
+
+ n = len(rows)
+ pcts = [r["delta_pct"] for r in rows if r["delta_pct"] is not None]
+ deltas = [r["delta_m"] for r in rows if r["delta_m"] is not None]
+ if pcts:
+ avg_pct = sum(pcts) / len(pcts)
+ sorted_pcts = sorted(pcts)
+ median_pct = sorted_pcts[len(sorted_pcts) // 2]
+ within_10 = sum(1 for p in pcts if abs(p) <= 10)
+ within_15 = sum(1 for p in pcts if abs(p) <= 15)
+ avg_d = sum(deltas) / len(deltas) if deltas else 0
+ print(
+ f"\n n={n} avg={avg_pct:+.1f}% median={median_pct:+.1f}%"
+ f" avg delta={avg_d:+.0f} m"
+ f" within ±10%: {within_10}/{n} within ±15%: {within_15}/{n}"
+ )
+
+ print(f"\nCSV saved to: {args.out}")
+
+
+if __name__ == "__main__":
+ main()