Files
bincio-activity/scripts/usage_stats.py
T
Davide Scaini cd80b8e32e usage_stats: fix datetime.utcnow() deprecation warning
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-18 21:27:29 +02:00

295 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# /// script
# dependencies = ["matplotlib>=3.9", "pandas>=2.2"]
# ///
"""
Bincio usage statistics — parses nginx access logs and produces a
multi-panel matplotlib figure saved as a PNG.
Run locally: uv run scripts/usage_stats.py
On VPS cron: 0 3 * * 1 cd /opt/bincio && uv run scripts/usage_stats.py
Output: /var/bincio/stats/latest.png (served at /api/admin/stats)
"""
from __future__ import annotations
import argparse
import gzip
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import pandas as pd
# ── Config ────────────────────────────────────────────────────────────────────
# Default directory scanned for "access.log*" files (overridable with --log-dir).
LOG_DIR = Path("/var/log/nginx")
# Directory that holds the generated figure.
OUTPUT_DIR = Path("/var/bincio/stats")
# Default path of the PNG that is written (overridable with --output).
OUTPUT = OUTPUT_DIR / "latest.png"
# ── Log parsing ───────────────────────────────────────────────────────────────
# Matches one access-log line and captures: client IP, bracketed timestamp,
# request method and path, status code, Referer and User-Agent.
# NOTE(review): this looks like nginx's default "combined" log format —
# confirm against the server's log_format directive.
_LOG_RE = re.compile(
    r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) [^"]+" '
    r'(?P<status>\d+) \S+ '
    r'"(?P<referer>[^"]*)" "(?P<ua>[^"]*)"'
)
# strptime format of the bracketed timestamp, e.g. "18/May/2026:21:27:29 +0200".
_TS_FMT = "%d/%b/%Y:%H:%M:%S %z"
# ── Bot filtering ─────────────────────────────────────────────────────────────
_BOT_UA_RE = re.compile(
r"bot|crawl|spider|scan|nmap|masscan|zgrab|python-requests|"
r"curl|wget|nikto|nuclei|go-http|censys|shodan|paloalto|expanse|"
r"dataforseo|semrush|ahrefs|mj12|dotbot|petalbot|fuzz|dirbuster",
re.I,
)
_BOT_PATH_RE = re.compile(
r"^/(wp-|phpmyadmin|xmlrpc|\.env|\.git|setup\.php|"
r"SDK/|actuator/|cgi-bin/|PROPFIND|\.well-known/acme)",
re.I,
)
def _is_bot(ua: str, path: str) -> bool:
if not ua or ua == "-":
return True
if _BOT_UA_RE.search(ua):
return True
if _BOT_PATH_RE.search(path):
return True
return False
# ── Feature mapping (from Referer header) ─────────────────────────────────────
# Evaluated in order: first match wins. None label = exclude.
_FEATURE_MAP: list[tuple[str, str | None, str | None]] = [
("planner.bincio.org", None, "planner"),
("wiki.bincio.org", None, "wiki"),
("activity.bincio.org", "/admin/", None), # exclude admin polling
("activity.bincio.org", "/activity/", "activity"),
("activity.bincio.org", "/segments/", "segments"),
("activity.bincio.org", "/stats/", "stats"),
("activity.bincio.org", "/explore/", "explore"),
("activity.bincio.org", "/ideas/", "ideas"),
("activity.bincio.org", "/u/", "profile"),
("activity.bincio.org", None, "feed"),
("bincio.org", None, "hub"),
]
FEATURE_COLORS = {
"feed": "#60a5fa",
"activity": "#4ade80",
"segments": "#facc15",
"planner": "#f97316",
"wiki": "#a855f7",
"ideas": "#f43f5e",
"explore": "#34d399",
"profile": "#94a3b8",
"hub": "#64748b",
"stats": "#e879a0",
}
def _feature(referer: str) -> str | None:
if not referer or referer == "-":
return None
try:
p = urlparse(referer)
host = p.netloc.lower().lstrip("www.")
path = p.path
except Exception:
return None
for h, prefix, label in _FEATURE_MAP:
if host == h:
if prefix is None or path.startswith(prefix):
return label
return None
# ── Loading ───────────────────────────────────────────────────────────────────
def load_logs(log_dir: Path) -> pd.DataFrame:
    """Parse every ``access.log*`` file in *log_dir* into a DataFrame.

    Lines that do not match ``_LOG_RE``, that ``_is_bot`` flags, or whose
    timestamp cannot be parsed are dropped. A file that raises while being
    read produces a warning on stderr; rows collected from it before the
    error are kept.

    Args:
        log_dir: Directory containing plain or gzip-compressed access logs.

    Returns:
        A frame with columns ``ts`` (UTC), ``ip``, ``method``, ``path``,
        ``status``, ``referer``, ``hour``, ``dow`` (0 = Monday) and
        ``feature``.

    Exits the process with status 1 when no log file exists or when no
    usable entry remains after filtering.
    """
    log_files = sorted(log_dir.glob("access.log*"), reverse=True)
    if not log_files:
        print(f"No log files found in {log_dir}", file=sys.stderr)
        sys.exit(1)

    records = []
    for log_file in log_files:
        # Files ending in ".gz" are read through gzip, everything else as text.
        open_log = gzip.open if log_file.suffix == ".gz" else open
        try:
            with open_log(log_file, "rt", errors="replace") as stream:
                for raw_line in stream:
                    match = _LOG_RE.match(raw_line)
                    if match is None:
                        continue
                    fields = match.groupdict()
                    if _is_bot(fields["ua"], fields["path"]):
                        continue
                    try:
                        stamp = datetime.strptime(fields["time"], _TS_FMT)
                    except ValueError:
                        continue
                    records.append({
                        "ts": stamp,
                        "ip": fields["ip"],
                        "method": fields["method"],
                        "path": fields["path"],
                        "status": int(fields["status"]),
                        "referer": fields["referer"],
                    })
        except Exception as exc:
            print(f"Warning: skipping {log_file.name}: {exc}", file=sys.stderr)

    if not records:
        print("No usable log entries found after bot filtering.", file=sys.stderr)
        sys.exit(1)

    frame = pd.DataFrame(records)
    # Convert all timestamps to UTC, whatever offset each log line carried.
    frame["ts"] = pd.to_datetime(frame["ts"], utc=True)
    stamps = frame["ts"].dt
    frame["hour"] = stamps.hour
    frame["dow"] = stamps.dayofweek  # 0 = Monday
    frame["feature"] = frame["referer"].map(_feature)
    return frame
# ── Figure ────────────────────────────────────────────────────────────────────
BG = "#09090b"  # figure and axes background
FG = "#e4e4e7"  # titles, labels and tick text
GRID = "#27272a"  # spines, grid lines and legend edges
BLUE = "#60a5fa"  # login bars and rolling-average line
def _style_ax(ax: plt.Axes) -> None:
    """Apply the dark theme to *ax*: spine colour, background and tick style."""
    for border in ax.spines.values():
        border.set_edgecolor(GRID)
    ax.set_facecolor(BG)
    ax.tick_params(colors=FG, labelsize=9)
def make_figure(df: pd.DataFrame, output: Path) -> None:
    """Render the three-panel usage figure for *df* and save it as a PNG.

    Panels:
      1. Daily successful logins (POST /api/auth/login with status 200)
         as bars, plus a centred 7-day rolling mean.
      2. Heatmap of non-admin ``/api/`` requests by weekday and hour.
      3. Stacked area of requests per feature and week.

    Args:
        df: Request frame; the columns ``ts``, ``method``, ``path``,
            ``status``, ``hour``, ``dow`` and ``feature`` are read
            (the shape ``load_logs`` returns).
        output: Destination file; missing parent directories are created.
    """
    # ── daily logins ──────────────────────────────────────────────────────────
    login_mask = (
        (df["method"] == "POST")
        & (df["path"] == "/api/auth/login")
        & (df["status"] == 200)
    )
    # resample("D") labels its bins at midnight, so the full range must be
    # built from midnight-normalised endpoints. Using the raw min/max
    # timestamps would carry over the first request's time-of-day and the
    # reindex below would match nothing, zeroing every count.
    first_day = df["ts"].min().normalize()
    last_day = df["ts"].max().normalize()
    full_range = pd.date_range(first_day, last_day, freq="D")
    daily_logins = (
        df[login_mask]
        .set_index("ts")
        .resample("D")
        .size()
        .reindex(full_range, fill_value=0)
    )
    rolling7 = daily_logins.rolling(7, center=True, min_periods=1).mean()

    # ── feature usage (weekly) ────────────────────────────────────────────────
    feat_df = df[df["feature"].notna()].copy()
    feat_weekly = (
        feat_df.set_index("ts")
        .groupby([pd.Grouper(freq="W-MON"), "feature"])
        .size()
        .unstack(fill_value=0)
    )
    # Keep only known features, in FEATURE_COLORS order (this is also the
    # stacking order of the area chart).
    feat_order = [f for f in FEATURE_COLORS if f in feat_weekly.columns]
    feat_weekly = feat_weekly[feat_order]

    # ── heatmap: hour × weekday (API requests, no admin) ─────────────────────
    api_df = df[
        df["path"].str.startswith("/api/")
        & ~df["path"].str.startswith("/api/admin")
    ]
    heat = (
        api_df.groupby(["dow", "hour"]).size()
        .unstack(fill_value=0)
        .reindex(index=range(7), columns=range(24), fill_value=0)
    )

    # ── layout ────────────────────────────────────────────────────────────────
    plt.style.use("dark_background")
    fig = plt.figure(figsize=(15, 10), facecolor=BG)
    gs = fig.add_gridspec(2, 2, hspace=0.42, wspace=0.30,
                          left=0.07, right=0.97, top=0.92, bottom=0.08)
    ax_log = fig.add_subplot(gs[0, 0])
    ax_heat = fig.add_subplot(gs[0, 1])
    ax_feat = fig.add_subplot(gs[1, :])
    for ax in (ax_log, ax_heat, ax_feat):
        _style_ax(ax)

    # Panel 1 — daily logins + rolling mean
    ax_log.bar(daily_logins.index, daily_logins.values,
               color=BLUE, alpha=0.30, width=pd.Timedelta(hours=20))
    ax_log.plot(daily_logins.index, rolling7.values,
                color=BLUE, linewidth=2, label="7-day avg")
    ax_log.set_title("Daily logins", color=FG, fontsize=11, pad=8)
    ax_log.set_ylabel("count", color=FG, fontsize=9)
    ax_log.yaxis.set_major_locator(ticker.MaxNLocator(integer=True, nbins=5))
    ax_log.tick_params(axis="x", rotation=25)
    ax_log.legend(fontsize=8, framealpha=0.15, facecolor=BG, edgecolor=GRID)
    ax_log.grid(axis="y", color=GRID, linewidth=0.5)

    # Panel 2 — heatmap
    days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
    im = ax_heat.imshow(heat.values, aspect="auto", cmap="Blues",
                        interpolation="nearest", origin="upper")
    ax_heat.set_xticks(range(0, 24, 3))
    ax_heat.set_xticklabels([f"{h:02d}h" for h in range(0, 24, 3)], color=FG, fontsize=8)
    ax_heat.set_yticks(range(7))
    ax_heat.set_yticklabels(days, color=FG, fontsize=9)
    ax_heat.set_title("API requests: hour × weekday (UTC)", color=FG, fontsize=11, pad=8)
    cb = fig.colorbar(im, ax=ax_heat, fraction=0.046, pad=0.04)
    cb.ax.tick_params(labelcolor=FG, labelsize=8)

    # Panel 3 — feature usage stacked area
    if not feat_weekly.empty:
        n = len(feat_weekly)
        x = np.arange(n)
        bottom = np.zeros(n)
        for feat in feat_order:
            vals = feat_weekly[feat].values.astype(float)
            ax_feat.fill_between(x, bottom, bottom + vals,
                                 color=FEATURE_COLORS[feat], alpha=0.80, label=feat)
            bottom += vals
        week_labels = [str(w.date()) for w in feat_weekly.index]
        # Show at most roughly a dozen tick labels.
        step = max(1, n // 12)
        ax_feat.set_xticks(x[::step])
        ax_feat.set_xticklabels(week_labels[::step], rotation=30, ha="right",
                                fontsize=8, color=FG)
        # With a single week n - 1 == 0; keep the limits distinct so
        # matplotlib does not warn about identical x-limits.
        ax_feat.set_xlim(0, max(n - 1, 1))
        ax_feat.set_title("Feature usage per week (from Referer)", color=FG, fontsize=11, pad=8)
        ax_feat.set_ylabel("API requests", color=FG, fontsize=9)
        ax_feat.legend(loc="upper left", fontsize=8, framealpha=0.15,
                       facecolor=BG, edgecolor=GRID, ncol=len(feat_order))
        ax_feat.grid(axis="y", color=GRID, linewidth=0.5)
    else:
        ax_feat.text(0.5, 0.5, "No feature data (no Referer headers yet)",
                     ha="center", va="center", color=FG, transform=ax_feat.transAxes)

    total_logins = int(login_mask.sum())
    span_days = (df["ts"].max() - df["ts"].min()).days + 1
    fig.suptitle(
        f"bincio — {total_logins} logins over {span_days} days "
        f"· generated {datetime.now(tz=timezone.utc).strftime('%Y-%m-%d')}",
        color=FG, fontsize=12, y=0.97,
    )

    output.parent.mkdir(parents=True, exist_ok=True)
    try:
        fig.savefig(output, dpi=150, facecolor=BG, bbox_inches="tight")
    finally:
        # Release the figure even when saving fails.
        plt.close(fig)
    print(f"Saved → {output}")
# ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
    """Command-line entry point: parse options, load the logs, write the figure."""
    parser = argparse.ArgumentParser(description="Generate bincio usage stats figure.")
    parser.add_argument("--log-dir", type=Path, default=LOG_DIR, metavar="DIR")
    parser.add_argument("--output", type=Path, default=OUTPUT, metavar="FILE")
    opts = parser.parse_args()

    print("Loading logs…", file=sys.stderr)
    requests = load_logs(opts.log_dir)
    # Inclusive number of days between the first and last request.
    earliest, latest = requests["ts"].min(), requests["ts"].max()
    covered_days = (latest - earliest).days + 1
    print(f" {len(requests):,} non-bot requests over {covered_days} days", file=sys.stderr)
    make_figure(requests, opts.output)


if __name__ == "__main__":
    main()