Files
2026-06-02 15:47:55 +02:00

355 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# /// script
# dependencies = ["matplotlib>=3.9", "pandas>=2.2"]
# ///
"""
Bincio usage statistics — parses nginx access logs and produces a
multi-panel matplotlib figure saved as a PNG.
Run locally: uv run scripts/usage_stats.py
On VPS cron: 0 3 * * 1 cd /opt/bincio && uv run scripts/usage_stats.py
Output: /var/bincio/stats/latest.png (served at /api/admin/stats)
"""
from __future__ import annotations
import argparse
import gzip
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import pandas as pd
# ── Config ────────────────────────────────────────────────────────────────────
# LOG_DIR, OUTPUT and HISTORY are the defaults of the --log-dir, --output and
# --history command-line flags defined in main().
LOG_DIR = Path("/var/log/nginx")  # scanned for files matching access.log*
OUTPUT_DIR = Path("/var/bincio/stats")  # base directory for both generated files
OUTPUT = OUTPUT_DIR / "latest.png"  # rendered figure
HISTORY = OUTPUT_DIR / "weekly_history.csv"  # weekly feature counts kept between runs
# Maximum number of weekly rows retained in the history / shown in the chart.
MAX_WEEKS = 26 # 6 months
# ── Log parsing ───────────────────────────────────────────────────────────────
# Matches one access-log line shaped like
#   ip X X [time] "METHOD path proto" status X "referer" "user-agent"
# (X = a field that is matched but not captured). This looks like nginx's
# "combined" format — confirm against the server's log_format if it is customised.
# Named groups: ip, time, method, path, status, referer, ua.
_LOG_RE = re.compile(
r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
r'"(?P<method>\S+) (?P<path>\S+) [^"]+" '
r'(?P<status>\d+) \S+ '
r'"(?P<referer>[^"]*)" "(?P<ua>[^"]*)"'
)
# strptime format for the "time" group, e.g. 02/Jun/2026:15:47:55 +0200
_TS_FMT = "%d/%b/%Y:%H:%M:%S %z"
# ── Bot filtering ─────────────────────────────────────────────────────────────
# Case-insensitive substrings that mark a User-Agent as automated traffic.
_BOT_UA_RE = re.compile(
    r"bot|crawl|spider|scan|nmap|masscan|zgrab|python-requests|"
    r"curl|wget|nikto|nuclei|go-http|censys|shodan|paloalto|expanse|"
    r"dataforseo|semrush|ahrefs|mj12|dotbot|petalbot|fuzz|dirbuster",
    re.I,
)
# Request-path prefixes (anchored at the start of the path) treated as probes.
_BOT_PATH_RE = re.compile(
    r"^/(wp-|phpmyadmin|xmlrpc|\.env|\.git|setup\.php|"
    r"SDK/|actuator/|cgi-bin/|PROPFIND|\.well-known/acme)",
    re.I,
)


def _is_bot(ua: str, path: str) -> bool:
    """Return True when a request should be discarded as non-human traffic.

    A request counts as a bot when its User-Agent is empty or ``"-"``, when
    the User-Agent matches ``_BOT_UA_RE``, or when the requested path matches
    ``_BOT_PATH_RE``.
    """
    agent_missing = not ua or ua == "-"
    # `or` short-circuits, so the regexes are never run against a missing UA.
    return bool(
        agent_missing
        or _BOT_UA_RE.search(ua)
        or _BOT_PATH_RE.search(path)
    )
# ── Feature mapping (from Referer header) ─────────────────────────────────────
# Evaluated in order: first match wins. None label = exclude.
# Tuple: (host, path_startswith_or_None, path_contains_or_None, label_or_None)
_FEATURE_MAP: list[tuple[str, str | None, str | None, str | None]] = [
    ("planner.bincio.org", None, None, "planner"),
    ("wiki.bincio.org", None, None, "wiki"),
    ("activity.bincio.org", "/admin/", None, None),  # exclude
    ("activity.bincio.org", "/activity/", None, "activity"),
    ("activity.bincio.org", "/segments/", None, "segments"),
    ("activity.bincio.org", "/stats/", None, "stats"),
    ("activity.bincio.org", "/community/", None, "community"),
    ("activity.bincio.org", "/ideas/", None, "ideas"),
    # explore lives at /u/{handle}/athlete/explore/ — check before generic /u/
    ("activity.bincio.org", "/u/", "athlete/explore", "explore"),
    ("activity.bincio.org", "/u/", None, "profile"),
    ("activity.bincio.org", None, None, "feed"),
    ("bincio.org", None, None, "hub"),
]

# Colour per feature label. The key order also fixes the stacking and legend
# order of the feature panel, because make_figure() iterates this dict.
FEATURE_COLORS = {
    "feed": "#60a5fa",
    "activity": "#4ade80",
    "segments": "#facc15",
    "planner": "#f97316",
    "wiki": "#a855f7",
    "ideas": "#f43f5e",
    "explore": "#34d399",
    "community": "#22d3ee",
    "profile": "#94a3b8",
    "hub": "#64748b",
    "stats": "#e879a0",
}


def _feature(referer: str) -> str | None:
    """Map a Referer header value to a feature label.

    Args:
        referer: Raw Referer value from the access log; may be empty or "-".

    Returns:
        The label of the first ``_FEATURE_MAP`` rule whose host equals the
        referer's host (case-insensitive, one leading ``www.`` ignored) and
        whose path conditions hold. ``None`` when the referer is missing or
        unparsable, when no rule matches, or when the matching rule is an
        exclusion (label ``None``).
    """
    if not referer or referer == "-":
        return None
    try:
        parsed = urlparse(referer)
    except ValueError:
        # urlparse rejects some malformed authorities (e.g. bad IPv6 brackets).
        return None

    # removeprefix, not lstrip: lstrip("www.") strips any leading run of the
    # characters 'w' and '.', which turned "wiki.bincio.org" into
    # "iki.bincio.org" so the wiki rule could never match.
    host = parsed.netloc.lower().removeprefix("www.")
    path = parsed.path

    for rule_host, prefix, contains, label in _FEATURE_MAP:
        if host != rule_host:
            continue
        prefix_ok = prefix is None or path.startswith(prefix)
        contains_ok = contains is None or contains in path
        if prefix_ok and contains_ok:
            return label
    return None
# ── History management ────────────────────────────────────────────────────────
def load_history(history_file: Path) -> pd.DataFrame:
    """Load the weekly feature-count history written by a previous run.

    The first CSV column is taken as the week index whatever its header is.
    Files written through ``DataFrame.reset_index()`` carry the index under
    the name it had at that moment (e.g. ``ts`` or ``index``), so insisting on
    a ``week_start`` header made every load fail and dropped the history.

    Args:
        history_file: Path of the history CSV.

    Returns:
        A frame indexed by a UTC ``DatetimeIndex`` named ``week_start`` with
        one column per feature, or an empty ``DataFrame`` when the file is
        missing or unreadable (a warning goes to stderr in the latter case).
    """
    if not history_file.exists():
        return pd.DataFrame()
    try:
        df = pd.read_csv(history_file, index_col=0)
        # utc=True gives a tz-aware index even for offset-less timestamps, so
        # it can be compared with the tz-aware weekly index built from the logs.
        df.index = pd.to_datetime(df.index, utc=True)
    except Exception as e:
        # Best effort: a broken history file must not abort the whole report.
        print(f"Warning: could not load history: {e}", file=sys.stderr)
        return pd.DataFrame()
    return df.rename_axis("week_start")
def save_history(history: pd.DataFrame, history_file: Path) -> None:
    """Write the weekly history to CSV with the week index as ``week_start``.

    Args:
        history: Weekly feature counts indexed by week.
        history_file: Destination CSV; parent directories are created.
    """
    history_file.parent.mkdir(parents=True, exist_ok=True)
    # Name the index explicitly: reset_index() would otherwise emit it under
    # whatever name it carries ("ts", or "index" when unnamed), and
    # load_history() looks for a "week_start" column.
    history.rename_axis("week_start").reset_index().to_csv(history_file, index=False)
# ── Loading ───────────────────────────────────────────────────────────────────
def load_logs(log_dir: Path) -> pd.DataFrame:
    """Parse every ``access.log*`` file in *log_dir* into one DataFrame.

    Files ending in ``.gz`` are read through gzip. Lines that do not match
    ``_LOG_RE``, that ``_is_bot`` flags, or whose timestamp cannot be parsed
    are dropped. A file that raises while being read is reported on stderr and
    abandoned; rows collected from it before the error are kept.

    Returns:
        One row per kept request with the columns ``ts`` (UTC), ``ip``,
        ``method``, ``path``, ``status``, ``referer``, ``hour``, ``dow``
        (0 = Monday) and ``feature``.

    Exits the process with status 1 when no log file exists or when no line
    survives filtering.
    """
    log_files = sorted(log_dir.glob("access.log*"), reverse=True)
    if not log_files:
        print(f"No log files found in {log_dir}", file=sys.stderr)
        sys.exit(1)

    records = []
    for log_file in log_files:
        open_fn = gzip.open if log_file.suffix == ".gz" else open
        try:
            with open_fn(log_file, "rt", errors="replace") as stream:
                for raw_line in stream:
                    hit = _LOG_RE.match(raw_line)
                    if hit is None:
                        continue
                    fields = hit.groupdict()
                    if _is_bot(fields["ua"], fields["path"]):
                        continue
                    try:
                        stamp = datetime.strptime(fields["time"], _TS_FMT)
                    except ValueError:
                        continue
                    records.append({
                        "ts": stamp,
                        "ip": fields["ip"],
                        "method": fields["method"],
                        "path": fields["path"],
                        "status": int(fields["status"]),
                        "referer": fields["referer"],
                    })
        except Exception as exc:
            print(f"Warning: skipping {log_file.name}: {exc}", file=sys.stderr)

    if not records:
        print("No usable log entries found after bot filtering.", file=sys.stderr)
        sys.exit(1)

    frame = pd.DataFrame(records)
    # Normalise mixed UTC offsets to UTC before deriving calendar fields.
    frame["ts"] = pd.to_datetime(frame["ts"], utc=True)
    timestamps = frame["ts"].dt
    frame["hour"] = timestamps.hour
    frame["dow"] = timestamps.dayofweek  # 0 = Monday
    frame["feature"] = frame["referer"].map(_feature)
    return frame
# ── Figure ────────────────────────────────────────────────────────────────────
# Dark palette shared by every panel.
BG = "#09090b"    # figure and axes background
FG = "#e4e4e7"    # text and tick colour
GRID = "#27272a"  # spine and grid-line colour
BLUE = "#60a5fa"  # bar colour of the login panel


def _style_ax(ax: plt.Axes) -> None:
    """Apply the dark theme to one axes: background, tick colours, spine edges."""
    ax.set_facecolor(BG)
    ax.tick_params(labelsize=9, colors=FG)
    for border in ax.spines.values():
        border.set_edgecolor(GRID)
def _draw_weekly_logins(ax: plt.Axes, weekly_logins: pd.Series) -> None:
    """Panel 1: one bar per week of successful logins."""
    week_x = np.arange(len(weekly_logins))
    ax.bar(week_x, weekly_logins.values, color=BLUE, alpha=0.70, width=0.6)
    ax.set_xticks(week_x)
    ax.set_xticklabels(
        [str(w.date()) for w in weekly_logins.index],
        rotation=30, ha="right", fontsize=8, color=FG,
    )
    ax.set_title("Weekly logins", color=FG, fontsize=11, pad=8)
    ax.set_ylabel("count", color=FG, fontsize=9)
    ax.yaxis.set_major_locator(ticker.MaxNLocator(integer=True, nbins=5))
    ax.grid(axis="y", color=GRID, linewidth=0.5)


def _draw_heatmap(fig: plt.Figure, ax: plt.Axes, heat: pd.DataFrame) -> None:
    """Panel 2: request counts on a weekday (rows) × hour (columns) grid."""
    days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
    im = ax.imshow(heat.values, aspect="auto", cmap="Blues",
                   interpolation="nearest", origin="upper")
    ax.set_xticks(range(0, 24, 3))
    ax.set_xticklabels([f"{h:02d}h" for h in range(0, 24, 3)], color=FG, fontsize=8)
    ax.set_yticks(range(7))
    ax.set_yticklabels(days, color=FG, fontsize=9)
    ax.set_title("API requests: hour × weekday (UTC)", color=FG, fontsize=11, pad=8)
    cb = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
    cb.ax.tick_params(labelcolor=FG, labelsize=8)


def _draw_feature_usage(ax: plt.Axes, feat_weekly: pd.DataFrame,
                        feat_order: list[str]) -> None:
    """Panel 3: stacked area of weekly requests per feature, or a placeholder."""
    if feat_weekly.empty:
        ax.text(0.5, 0.5, "No feature data (no Referer headers yet)",
                ha="center", va="center", color=FG, transform=ax.transAxes)
        return

    n = len(feat_weekly)
    x = np.arange(n)
    bottom = np.zeros(n)
    for feat in feat_order:
        vals = feat_weekly[feat].values.astype(float)
        ax.fill_between(x, bottom, bottom + vals,
                        color=FEATURE_COLORS[feat], alpha=0.80, label=feat)
        bottom += vals
    week_labels = [str(w.date()) for w in feat_weekly.index]
    step = max(1, n // 12)  # at most about a dozen tick labels
    ax.set_xticks(x[::step])
    ax.set_xticklabels(week_labels[::step], rotation=30, ha="right",
                       fontsize=8, color=FG)
    if n > 1:
        # With a single week the limits would be (0, 0), which matplotlib
        # rejects as singular; leave the automatic limits in that case.
        ax.set_xlim(0, n - 1)
    ax.set_title("Feature usage per week (from Referer)", color=FG, fontsize=11, pad=8)
    ax.set_ylabel("API requests", color=FG, fontsize=9)
    ax.legend(loc="upper left", fontsize=8, framealpha=0.15,
              facecolor=BG, edgecolor=GRID, ncol=len(feat_order))
    ax.grid(axis="y", color=GRID, linewidth=0.5)


def make_figure(df: pd.DataFrame, feat_weekly: pd.DataFrame, output: Path) -> None:
    """Render the three-panel usage figure and save it as a PNG.

    Panels: weekly successful logins (bars), API requests by hour and weekday
    with ``/api/admin`` excluded (heatmap), and weekly feature usage (stacked
    area).

    Args:
        df: Request rows; the columns ``ts``, ``method``, ``path``,
            ``status``, ``dow`` and ``hour`` are read.
        feat_weekly: One row per week and one column per feature label.
            Columns that are not keys of ``FEATURE_COLORS`` are ignored and
            missing counts are treated as 0.
        output: Destination file; parent directories are created if needed.
    """
    # ── weekly logins ─────────────────────────────────────────────────────────
    login_mask = (df["method"] == "POST") & (df["path"] == "/api/auth/login") & (df["status"] == 200)
    weekly_logins = df[login_mask].set_index("ts").resample("W-MON").size()

    # ── feature usage (weekly) ────────────────────────────────────────────────
    feat_order = [f for f in FEATURE_COLORS if f in feat_weekly.columns]
    # A NaN count would propagate through the running `bottom` of the stacked
    # area and blank every layer drawn above it, so treat missing as zero.
    feat_weekly = feat_weekly[feat_order].fillna(0)

    # ── heatmap: hour × weekday (API requests, no admin) ─────────────────────
    api_df = df[
        df["path"].str.startswith("/api/") &
        ~df["path"].str.startswith("/api/admin")
    ]
    heat = (
        api_df.groupby(["dow", "hour"]).size()
        .unstack(fill_value=0)
        .reindex(index=range(7), columns=range(24), fill_value=0)
    )

    # ── layout ────────────────────────────────────────────────────────────────
    plt.style.use("dark_background")
    fig = plt.figure(figsize=(15, 10), facecolor=BG)
    try:
        gs = fig.add_gridspec(2, 2, hspace=0.42, wspace=0.30,
                              left=0.07, right=0.97, top=0.92, bottom=0.08)
        ax_log = fig.add_subplot(gs[0, 0])
        ax_heat = fig.add_subplot(gs[0, 1])
        ax_feat = fig.add_subplot(gs[1, :])
        for ax in (ax_log, ax_heat, ax_feat):
            _style_ax(ax)

        _draw_weekly_logins(ax_log, weekly_logins)
        _draw_heatmap(fig, ax_heat, heat)
        _draw_feature_usage(ax_feat, feat_weekly, feat_order)

        total_logins = int(weekly_logins.sum())
        span_days = (df["ts"].max() - df["ts"].min()).days + 1
        fig.suptitle(
            f"bincio — {total_logins} logins over {span_days} days "
            f"· generated {datetime.now(tz=timezone.utc).strftime('%Y-%m-%d')}",
            color=FG, fontsize=12, y=0.97,
        )
        output.parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(output, dpi=150, facecolor=BG, bbox_inches="tight")
        print(f"Saved → {output}")
    finally:
        # Release the figure even when drawing or saving fails.
        plt.close(fig)
# ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
    """Parse the logs, merge them into the weekly history, render the figure.

    Steps: load and filter the access logs, print a short summary to stderr,
    count requests per week and feature, merge those counts with the stored
    history, keep the last ``MAX_WEEKS`` weeks, save the history and draw the
    figure.
    """
    ap = argparse.ArgumentParser(description="Generate bincio usage stats figure.")
    ap.add_argument("--log-dir", type=Path, default=LOG_DIR, metavar="DIR")
    ap.add_argument("--output", type=Path, default=OUTPUT, metavar="FILE")
    ap.add_argument("--history", type=Path, default=HISTORY, metavar="FILE")
    args = ap.parse_args()

    print("Loading logs…", file=sys.stderr)
    df = load_logs(args.log_dir)
    span = (df["ts"].max() - df["ts"].min()).days + 1
    print(f" {len(df):,} non-bot requests over {span} days", file=sys.stderr)
    feat_counts = df["feature"].value_counts(dropna=False)
    print(" Feature breakdown:", file=sys.stderr)
    for feat, count in feat_counts.items():
        print(f" {str(feat):12s} {count:,}", file=sys.stderr)

    # Requests per (week, feature) for the logs currently on disk.
    feat_df = df[df["feature"].notna()]
    current_weekly = (
        feat_df.set_index("ts")
        .groupby([pd.Grouper(freq="W-MON"), "feature"])
        .size()
        .unstack(fill_value=0)
    )

    # Merge with the stored history. Where a week appears in both, keep the
    # larger count per feature: the oldest week still in the logs is usually
    # only partly present after rotation, and letting it replace the stored
    # row would shrink a count that had already been recorded in full.
    history = load_history(args.history)
    if not history.empty:
        # Make sure both indexes are tz-aware UTC before they are combined.
        history.index = pd.to_datetime(history.index, utc=True)
    frames = [frame for frame in (history, current_weekly) if not frame.empty]
    if frames:
        all_weekly = (
            pd.concat(frames)
            .fillna(0)          # a feature absent from one side counts as 0
            .groupby(level=0)   # one row per week, sorted by week
            .max()
            .astype(int)        # the history holds plain request counts
        )
    else:
        all_weekly = current_weekly

    # Keep only last MAX_WEEKS weeks. The index is named so that it is written
    # to the CSV under the "week_start" header that load_history() reads back.
    all_weekly = all_weekly.iloc[-MAX_WEEKS:].rename_axis("week_start")

    # Save updated history (all data we kept)
    save_history(all_weekly, args.history)

    # Ensure all feature columns exist in all_weekly. The previous loop only
    # visited columns that were already present, so it never added anything.
    extra = [col for col in all_weekly.columns if col not in FEATURE_COLORS]
    all_weekly = all_weekly.reindex(columns=[*FEATURE_COLORS, *extra], fill_value=0)

    make_figure(df, all_weekly, args.output)


if __name__ == "__main__":
    main()