feat: OG link previews — track image + meta tags for Telegram/WhatsApp

- bincio/render/ogimage.py: generate 400x400 elevation-coloured PNG with Pillow
- bincio/serve/routers/ogimage.py: /activity/{id}/ OG HTML stub for bot UAs;
  /og-image/{user}/{id}.png serves pre-generated images with on-demand fallback
- scripts/generate_og_images.py: batch pre-generation, incremental (mtime skip)
- scripts/strava_elevation_audit.py: add source/threshold/MA columns and pct stats
- pyproject.toml: add Pillow>=10 to serve extras
This commit is contained in:
Davide Scaini
2026-05-23 21:44:19 +02:00
parent 56932f7f25
commit 693f720cbd
6 changed files with 574 additions and 0 deletions
+121
View File
@@ -0,0 +1,121 @@
"""OG image generation — 400×400 track-on-dark PNG for social link previews."""
from __future__ import annotations
import io
import math
from pathlib import Path
from typing import Optional
# Colour stops matching ActivityMap.svelte _linearColor stops
_STOPS: list[tuple[float, tuple[int, int, int]]] = [
(0.00, (59, 130, 246)), # blue-500 (low)
(0.33, (74, 222, 128)), # green-400
(0.66, (250, 204, 21)), # yellow-400
(1.00, (239, 68, 68)), # red-500 (high)
]
_BG = (9, 9, 11) # zinc-950
_SIZE = 400
_PAD = 28
_WIDTH = 5 # logical line width; rendered at 2× then downscaled
def _lerp_color(t: float) -> tuple[int, int, int]:
t = max(0.0, min(1.0, t))
for i in range(len(_STOPS) - 1):
t0, c0 = _STOPS[i]
t1, c1 = _STOPS[i + 1]
if t <= t1:
f = (t - t0) / (t1 - t0) if t1 > t0 else 0.0
return (
round(c0[0] + f * (c1[0] - c0[0])),
round(c0[1] + f * (c1[1] - c0[1])),
round(c0[2] + f * (c1[2] - c0[2])),
)
return _STOPS[-1][1]
def generate(
lat_arr: list[Optional[float]],
lon_arr: list[Optional[float]],
ele_arr: list[Optional[float]],
) -> bytes:
"""Return PNG bytes for a 400×400 elevation-coloured track image.
Any of the three arrays may have None gaps (no-GPS seconds).
Returns a plain dark square if there are fewer than 2 valid GPS points.
"""
try:
from PIL import Image, ImageDraw # type: ignore[import]
except ImportError as e:
raise RuntimeError("Pillow is required for OG image generation") from e
# Collect valid GPS points paired with elevation (None → 0 for colouring)
pts: list[tuple[float, float, float]] = []
for lat, lon, ele in zip(lat_arr, lon_arr, ele_arr):
if lat is not None and lon is not None:
pts.append((float(lat), float(lon), float(ele) if ele is not None else 0.0))
if len(pts) < 2:
img = Image.new("RGB", (_SIZE, _SIZE), _BG)
buf = io.BytesIO()
img.save(buf, "PNG", optimize=True)
return buf.getvalue()
lats = [p[0] for p in pts]
lons = [p[1] for p in pts]
eles = [p[2] for p in pts]
min_lat, max_lat = min(lats), max(lats)
min_lon, max_lon = min(lons), max(lons)
min_ele, max_ele = min(eles), max(eles)
ele_range = max_ele - min_ele or 1.0
# Mercator correction: compress longitude range by cos(mid_lat) so the
# track doesn't look stretched horizontally at higher latitudes.
cos_lat = math.cos(math.radians((min_lat + max_lat) / 2))
usable = _SIZE - 2 * _PAD
lat_span = max_lat - min_lat or 1e-6
lon_span = (max_lon - min_lon) * cos_lat or 1e-6
scale = min(usable / lat_span, usable / lon_span)
# Centre the track within the canvas
x_off = _PAD + (usable - (max_lon - min_lon) * cos_lat * scale) / 2
y_off = _PAD + (usable - lat_span * scale) / 2
def project(lat: float, lon: float) -> tuple[float, float]:
x = x_off + (lon - min_lon) * cos_lat * scale
y = _SIZE - (y_off + (lat - min_lat) * scale)
return x, y
# Render at 2× resolution then downscale for cheap anti-aliasing
S = _SIZE * 2
lw = _WIDTH * 2
img = Image.new("RGB", (S, S), _BG)
draw = ImageDraw.Draw(img)
for i in range(len(pts) - 1):
x0, y0 = project(pts[i][0], pts[i][1])
x1, y1 = project(pts[i+1][0], pts[i+1][1])
t = (eles[i] - min_ele) / ele_range
color = _lerp_color(t)
draw.line([(x0 * 2, y0 * 2), (x1 * 2, y1 * 2)], fill=color, width=lw)
img = img.resize((_SIZE, _SIZE), Image.LANCZOS)
buf = io.BytesIO()
img.save(buf, "PNG", optimize=True)
return buf.getvalue()
def generate_for_activity(ts_path: Path) -> bytes:
"""Convenience wrapper: read a .timeseries.json file and call generate()."""
import json
ts = json.loads(ts_path.read_text(encoding="utf-8"))
return generate(
ts.get("lat") or [],
ts.get("lon") or [],
ts.get("elevation_m") or [],
)
+161
View File
@@ -0,0 +1,161 @@
"""OG preview endpoints.
GET /activity/{activity_id}
Returns a minimal HTML page with Open Graph meta tags for social link
previews (Telegram, WhatsApp, Slack, …). nginx proxies only bot
User-Agents here; regular browsers still get the static SPA shell.
GET /api/og-image/{user_handle}/{activity_id}.png
Returns the pre-generated 400×400 track PNG. Falls back to generating
on the fly if the static file doesn't exist yet (e.g. a brand-new import
before the next deploy-time generation run).
"""
from __future__ import annotations
import html
import json
from datetime import datetime, timezone
from pathlib import Path
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import HTMLResponse, Response
from bincio.serve import deps
router = APIRouter()
# Sport → emoji map (extend as needed)
_SPORT_EMOJI: dict[str, str] = {
"cycling": "🚴",
"running": "🏃",
"swimming": "🏊",
"hiking": "🥾",
"walking": "🚶",
"skiing": "⛷️",
"rowing": "🚣",
"triathlon": "🏊",
"e_cycling": "🚴",
"gravel": "🚴",
}
def _find_user(data_dir: Path, activity_id: str) -> str | None:
"""Return the user handle that owns *activity_id*, or None."""
for user_dir in sorted(data_dir.iterdir()):
if not user_dir.is_dir() or user_dir.name.startswith("_") or user_dir.name == "segments":
continue
if (user_dir / "activities" / f"{activity_id}.json").exists():
return user_dir.name
return None
def _fmt_description(detail: dict, handle: str) -> str:
parts: list[str] = []
sport = (detail.get("sport") or "").lower()
emoji = _SPORT_EMOJI.get(sport, "🏅")
parts.append(emoji)
dist_m = detail.get("distance_m")
if dist_m:
parts.append(f"{dist_m / 1000:.1f} km")
gain = detail.get("elevation_gain_m")
if gain:
parts.append(f"{gain:.0f} m ↑")
dur = detail.get("moving_time_s") or detail.get("duration_s")
if dur:
h, rem = divmod(int(dur), 3600)
m = rem // 60
parts.append(f"{h}h {m:02d}m" if h else f"{m}m")
started = detail.get("started_at")
if started:
try:
dt = datetime.fromisoformat(started).astimezone(timezone.utc)
parts.append(dt.strftime("%-d %b %Y"))
except ValueError:
pass
parts.append(f"@{handle}")
return " · ".join(parts)
@router.get("/activity/{activity_id}", response_class=HTMLResponse, include_in_schema=False)
@router.get("/activity/{activity_id}/", response_class=HTMLResponse, include_in_schema=False)
async def og_preview(activity_id: str, request: Request) -> HTMLResponse:
data_dir = deps._get_data_dir()
handle = _find_user(data_dir, activity_id)
if handle is None:
raise HTTPException(404)
json_path = data_dir / handle / "activities" / f"{activity_id}.json"
detail = json.loads(json_path.read_text(encoding="utf-8"))
title = detail.get("title") or activity_id
desc = _fmt_description(detail, handle)
base = str(request.base_url).rstrip("/")
img_url = f"{base}/og-image/{handle}/{activity_id}.png"
act_url = f"{base}/activity/{activity_id}/"
h_title = html.escape(title)
h_desc = html.escape(desc)
h_img = html.escape(img_url)
h_url = html.escape(act_url)
content = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>{h_title} BincioActivity</title>
<meta property="og:title" content="{h_title}" />
<meta property="og:description" content="{h_desc}" />
<meta property="og:image" content="{h_img}" />
<meta property="og:image:width" content="400" />
<meta property="og:image:height" content="400" />
<meta property="og:url" content="{h_url}" />
<meta property="og:type" content="website" />
<meta property="og:site_name" content="BincioActivity" />
<meta name="twitter:card" content="summary" />
<meta name="twitter:title" content="{h_title}" />
<meta name="twitter:description" content="{h_desc}" />
<meta name="twitter:image" content="{h_img}" />
<script>window.location.replace("{act_url}");</script>
</head>
<body></body>
</html>"""
return HTMLResponse(content=content)
@router.get("/og-image/{user_handle}/{activity_id}.png", include_in_schema=False)
async def og_image(user_handle: str, activity_id: str) -> Response:
data_dir = deps._get_data_dir()
www_root = Path("/var/www/activity")
img_path = www_root / "og-image" / user_handle / f"{activity_id}.png"
if img_path.exists():
return Response(
content=img_path.read_bytes(),
media_type="image/png",
headers={"Cache-Control": "public, max-age=86400"},
)
# Fallback: generate on the fly (e.g. new activity before next deploy run)
ts_path = data_dir / user_handle / "activities" / f"{activity_id}.timeseries.json"
if not ts_path.exists():
raise HTTPException(404)
try:
from bincio.render.ogimage import generate_for_activity
png = generate_for_activity(ts_path)
img_path.parent.mkdir(parents=True, exist_ok=True)
img_path.write_bytes(png)
except Exception:
raise HTTPException(500, "Image generation failed")
return Response(
content=png,
media_type="image/png",
headers={"Cache-Control": "public, max-age=86400"},
)
+2
View File
@@ -25,6 +25,7 @@ from bincio.serve.routers import (
garmin, garmin,
ideas, ideas,
me, me,
ogimage,
segments, segments,
strava, strava,
uploads, uploads,
@@ -67,5 +68,6 @@ for _router in [
strava.router, strava.router,
garmin.router, garmin.router,
ideas.router, ideas.router,
ogimage.router,
]: ]:
app.include_router(_router) app.include_router(_router)
+1
View File
@@ -40,6 +40,7 @@ serve = [
"uvicorn[standard]>=0.29", "uvicorn[standard]>=0.29",
"python-multipart>=0.0.9", "python-multipart>=0.0.9",
"bcrypt>=4.1", "bcrypt>=4.1",
"Pillow>=10.0",
] ]
strava = [ strava = [
"requests>=2.32", "requests>=2.32",
+91
View File
@@ -0,0 +1,91 @@
"""Pre-generate OG track images for all activities.
Writes 400×400 PNGs to {www_root}/og-image/{user}/{activity_id}.png.
Skips activities that already have an up-to-date image (mtime check).
Safe to run repeatedly — only processes new/changed activities.
Usage:
uv run scripts/generate_og_images.py [--data-dir /var/bincio/data] [--www-root /var/www/activity]
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
def generate_all(data_dir: Path, www_root: Path) -> None:
out_root = www_root / "og-image"
out_root.mkdir(parents=True, exist_ok=True)
from bincio.render.ogimage import generate
total = generated = skipped = errors = 0
users = sorted(
d.name for d in data_dir.iterdir()
if d.is_dir() and not d.name.startswith("_") and d.name != "segments"
)
for handle in users:
user_dir = data_dir / handle
acts_dir = user_dir / "activities"
img_dir = out_root / handle
if not acts_dir.exists():
continue
img_dir.mkdir(exist_ok=True)
u_gen = u_skip = u_err = 0
for ts_path in sorted(acts_dir.glob("*.timeseries.json")):
activity_id = ts_path.name.replace(".timeseries.json", "")
out_path = img_dir / f"{activity_id}.png"
total += 1
# Skip if image is newer than timeseries
if out_path.exists() and out_path.stat().st_mtime >= ts_path.stat().st_mtime:
skipped += 1
u_skip += 1
continue
try:
ts = json.loads(ts_path.read_text(encoding="utf-8"))
lat_arr = ts.get("lat") or []
lon_arr = ts.get("lon") or []
ele_arr = ts.get("elevation_m") or []
png = generate(lat_arr, lon_arr, ele_arr)
out_path.write_bytes(png)
generated += 1
u_gen += 1
except Exception as exc:
errors += 1
u_err += 1
print(f" ERROR {handle}/{activity_id}: {exc}", file=sys.stderr)
if u_gen or u_err:
print(f"{handle:<25} generated={u_gen:4d} skipped={u_skip:4d} errors={u_err}")
else:
print(f"{handle:<25} skipped={u_skip:4d} (all up to date)")
print(f"\nDone — {generated} generated, {skipped} skipped, {errors} errors (total {total})")
def main() -> None:
ap = argparse.ArgumentParser(description="Pre-generate OG track images")
ap.add_argument("--data-dir", default="/var/bincio/data", type=Path)
ap.add_argument("--www-root", default="/var/www/activity", type=Path)
args = ap.parse_args()
if not args.data_dir.exists():
print(f"ERROR: data dir not found: {args.data_dir}", file=sys.stderr)
sys.exit(1)
print(f"data-dir : {args.data_dir}")
print(f"www-root : {args.www_root}")
print(f"output : {args.www_root}/og-image/\n")
generate_all(args.data_dir, args.www_root)
if __name__ == "__main__":
main()
+198
View File
@@ -0,0 +1,198 @@
"""Audit elevation accuracy vs Strava.
Friends add a note with the Strava elevation to their activity descriptions.
Supported formats (case-insensitive):
- "strava 1323md+" most common
- "strava 1323 m d+"
- "Strava 1625 m d+"
- "Strava Elevation 1173m"
- "1038 m d+ Strava" number before the word strava
- "Strava 207 metri di dislivello"
Descriptions live in _merged/activities/ (sidecar merge).
Computed elevation_gain_m is read from activities/ (main file).
Usage:
uv run scripts/strava_elevation_audit.py [--data-dir /var/bincio/data] [--out elevation_audit.csv]
"""
from __future__ import annotations
import argparse
import csv
import json
import re
import sys
from pathlib import Path
from bincio.extract.metrics import elevation_params
# Patterns tried in order; first match wins.
# Each pattern must have exactly one capturing group for the numeric value.
_PATTERNS: list[re.Pattern] = [
# "strava NNN m ..." or "strava NNNmd+"
re.compile(r'\bstrava\b\s*([0-9][0-9.,]*)\s*m', re.IGNORECASE),
# "Strava Elevation NNNm" or "Strava ... NNNm" (one word between)
re.compile(r'\bstrava\b\s+\w+\s+([0-9][0-9.,]*)\s*m', re.IGNORECASE),
# "NNN m ... strava" (number comes first, up to 20 chars before strava)
re.compile(r'([0-9][0-9.,]*)\s*m\b.{0,20}?\bstrava\b', re.IGNORECASE),
# "Strava NNN metri di dislivello" (Italian)
re.compile(r'\bstrava\b.*?([0-9][0-9.,]*)\s+metr', re.IGNORECASE),
]
def _find_strava_elevation(description: str) -> float | None:
for pat in _PATTERNS:
m = pat.search(description)
if m:
raw = m.group(1).replace(',', '.')
try:
return float(raw)
except ValueError:
continue
return None
def audit(data_dir: Path, out_path: Path) -> list[dict]:
rows: list[dict] = []
unmatched: list[tuple[str, str]] = [] # (path, desc) couldn't parse elevation
for merged_path in sorted(data_dir.glob("*/_merged/activities/*.json")):
if merged_path.suffix != ".json":
continue
if ".timeseries." in merged_path.name or ".geojson" in merged_path.name:
continue
try:
merged = json.loads(merged_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
continue
description = merged.get("description") or ""
if not description or "strava" not in description.lower():
continue
# Skip strava:// athlete-mention links (not elevation notes)
if re.search(r'strava://', description, re.IGNORECASE):
continue
strava_elev = _find_strava_elevation(description)
if strava_elev is None:
unmatched.append((str(merged_path), description))
continue
# Read computed elevation from main activity file
main_path = (
merged_path.parents[3] # data_dir
/ merged_path.parents[2].name # user
/ "activities"
/ merged_path.name
)
try:
main = json.loads(main_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
main = merged # fall back to merged values
our_elev = main.get("elevation_gain_m")
title = main.get("title") or merged.get("title") or merged_path.stem
user = merged_path.parents[2].name
altitude_source = main.get("altitude_source") or "unknown"
source = main.get("source") or ""
device = main.get("device") or "unknown"
ma_window, threshold = elevation_params(altitude_source, source)
delta = round(our_elev - strava_elev, 1) if our_elev is not None else None
pct = (
round((our_elev - strava_elev) / strava_elev * 100, 1)
if our_elev is not None and strava_elev != 0
else None
)
rows.append({
"file": merged_path.name,
"user": user,
"title": title,
"device": device,
"altitude_source": altitude_source,
"source": source,
"ma_window_s": ma_window,
"threshold_m": threshold,
"our_elevation_m": our_elev,
"strava_elevation_m": strava_elev,
"delta_m": delta,
"delta_pct": pct,
"description": description[:120].replace("\n", " ").replace("\r", ""),
})
rows.sort(key=lambda r: abs(r["delta_m"] or 0), reverse=True)
if rows:
with out_path.open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
writer.writeheader()
writer.writerows(rows)
if unmatched:
print(f"\nCould not parse elevation from {len(unmatched)} description(s):")
for path, desc in unmatched:
print(f" {Path(path).name} {desc[:80]!r}")
return rows
def main() -> None:
ap = argparse.ArgumentParser(description="Audit elevation accuracy vs Strava notes")
ap.add_argument("--data-dir", default="/var/bincio/data", type=Path)
ap.add_argument("--out", default="elevation_audit.csv", type=Path)
args = ap.parse_args()
if not args.data_dir.exists():
print(f"ERROR: data dir not found: {args.data_dir}", file=sys.stderr)
sys.exit(1)
print(f"Scanning {args.data_dir}")
rows = audit(args.data_dir, args.out)
if not rows:
print("No activities found with a parseable Strava elevation note.")
return
print(f"\nFound {len(rows)} activit{'y' if len(rows)==1 else 'ies'}:\n")
header = (
f"{'File':<50} {'User':<15} {'Source':<16} {'AltSrc':<12}"
f" {'MA':>4} {'Thr':>5} {'Ours':>8} {'Strava':>8} {'Delta':>8} {'Delta%':>7}"
)
print(header)
print("-" * len(header))
for r in rows:
delta_str = f"{r['delta_m']:+.0f}" if r['delta_m'] is not None else "n/a"
pct_str = f"{r['delta_pct']:+.1f}%" if r['delta_pct'] is not None else "n/a"
our_str = f"{r['our_elevation_m']:.0f}" if r['our_elevation_m'] is not None else "n/a"
print(
f"{r['file']:<50} {r['user']:<15} {r['source']:<16} {r['altitude_source']:<12}"
f" {r['ma_window_s']:>4} {r['threshold_m']:>5.1f}"
f" {our_str:>8} {r['strava_elevation_m']:>8.0f}"
f" {delta_str:>8} {pct_str:>7}"
)
n = len(rows)
pcts = [r["delta_pct"] for r in rows if r["delta_pct"] is not None]
deltas = [r["delta_m"] for r in rows if r["delta_m"] is not None]
if pcts:
avg_pct = sum(pcts) / len(pcts)
sorted_pcts = sorted(pcts)
median_pct = sorted_pcts[len(sorted_pcts) // 2]
within_10 = sum(1 for p in pcts if abs(p) <= 10)
within_15 = sum(1 for p in pcts if abs(p) <= 15)
avg_d = sum(deltas) / len(deltas) if deltas else 0
print(
f"\n n={n} avg={avg_pct:+.1f}% median={median_pct:+.1f}%"
f" avg delta={avg_d:+.0f} m"
f" within ±10%: {within_10}/{n} within ±15%: {within_15}/{n}"
)
print(f"\nCSV saved to: {args.out}")
if __name__ == "__main__":
main()