feat(auth): wiki/activity access flags, SESSION_DOMAIN, wiki nav link

This commit is contained in:
Davide Scaini
2026-05-01 21:56:02 +02:00
parent f6e9fe8198
commit 82288a35ea
5 changed files with 161 additions and 59 deletions
+70 -30
View File
@@ -23,11 +23,13 @@ import bcrypt
_SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
handle TEXT PRIMARY KEY,
display_name TEXT NOT NULL DEFAULT '',
password_hash TEXT NOT NULL,
is_admin INTEGER NOT NULL DEFAULT 0,
created_at INTEGER NOT NULL
handle TEXT PRIMARY KEY,
display_name TEXT NOT NULL DEFAULT '',
password_hash TEXT NOT NULL,
is_admin INTEGER NOT NULL DEFAULT 0,
wiki_access INTEGER NOT NULL DEFAULT 1,
activity_access INTEGER NOT NULL DEFAULT 0,
created_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS sessions (
@@ -38,11 +40,12 @@ CREATE TABLE IF NOT EXISTS sessions (
);
CREATE TABLE IF NOT EXISTS invites (
code TEXT PRIMARY KEY,
created_by TEXT NOT NULL REFERENCES users(handle) ON DELETE CASCADE,
used_by TEXT REFERENCES users(handle) ON DELETE SET NULL,
created_at INTEGER NOT NULL,
used_at INTEGER
code TEXT PRIMARY KEY,
created_by TEXT NOT NULL REFERENCES users(handle) ON DELETE CASCADE,
used_by TEXT REFERENCES users(handle) ON DELETE SET NULL,
created_at INTEGER NOT NULL,
used_at INTEGER,
grants_activity INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS reset_codes (
@@ -81,19 +84,22 @@ _RESET_CODE_TTL_S = 24 * 3600 # 24 hours
@dataclass
class User:
handle: str
display_name: str
is_admin: bool
created_at: int
handle: str
display_name: str
is_admin: bool
wiki_access: bool
activity_access: bool
created_at: int
@dataclass
class Invite:
code: str
created_by: str
used_by: Optional[str]
created_at: int
used_at: Optional[int]
code: str
created_by: str
used_by: Optional[str]
created_at: int
used_at: Optional[int]
grants_activity: bool = False
@property
def used(self) -> bool:
@@ -121,16 +127,20 @@ def create_user(
display_name: str,
password: str,
is_admin: bool = False,
wiki_access: bool = True,
activity_access: bool = False,
) -> User:
password_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
now = int(time.time())
db.execute(
"INSERT INTO users (handle, display_name, password_hash, is_admin, created_at) "
"VALUES (?, ?, ?, ?, ?)",
(handle, display_name, password_hash, int(is_admin), now),
"INSERT INTO users (handle, display_name, password_hash, is_admin, "
"wiki_access, activity_access, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
(handle, display_name, password_hash, int(is_admin),
int(wiki_access), int(activity_access), now),
)
db.commit()
return User(handle=handle, display_name=display_name, is_admin=is_admin, created_at=now)
return User(handle=handle, display_name=display_name, is_admin=is_admin,
wiki_access=wiki_access, activity_access=activity_access, created_at=now)
def get_user(db: sqlite3.Connection, handle: str) -> Optional[User]:
@@ -141,6 +151,8 @@ def get_user(db: sqlite3.Connection, handle: str) -> Optional[User]:
handle=row["handle"],
display_name=row["display_name"],
is_admin=bool(row["is_admin"]),
wiki_access=bool(row["wiki_access"]),
activity_access=bool(row["activity_access"]),
created_at=row["created_at"],
)
@@ -158,6 +170,8 @@ def authenticate(db: sqlite3.Connection, handle: str, password: str) -> Optional
handle=row["handle"],
display_name=row["display_name"],
is_admin=bool(row["is_admin"]),
wiki_access=bool(row["wiki_access"]),
activity_access=bool(row["activity_access"]),
created_at=row["created_at"],
)
@@ -172,7 +186,9 @@ def change_password(db: sqlite3.Connection, handle: str, new_password: str) -> N
def list_users(db: sqlite3.Connection) -> list[User]:
rows = db.execute("SELECT * FROM users ORDER BY created_at").fetchall()
return [User(handle=r["handle"], display_name=r["display_name"],
is_admin=bool(r["is_admin"]), created_at=r["created_at"]) for r in rows]
is_admin=bool(r["is_admin"]), wiki_access=bool(r["wiki_access"]),
activity_access=bool(r["activity_access"]),
created_at=r["created_at"]) for r in rows]
def delete_user(db: sqlite3.Connection, handle: str) -> None:
@@ -213,6 +229,16 @@ def count_users(db: sqlite3.Connection) -> int:
return row[0] if row else 0
def count_wiki_users(db: sqlite3.Connection) -> int:
row = db.execute("SELECT COUNT(*) FROM users WHERE wiki_access = 1").fetchone()
return row[0] if row else 0
def count_activity_users(db: sqlite3.Connection) -> int:
row = db.execute("SELECT COUNT(*) FROM users WHERE activity_access = 1").fetchone()
return row[0] if row else 0
# ── Settings ──────────────────────────────────────────────────────────────────
def get_setting(db: sqlite3.Connection, key: str) -> Optional[str]:
@@ -247,7 +273,8 @@ def create_session(db: sqlite3.Connection, handle: str) -> str:
def get_session(db: sqlite3.Connection, token: str) -> Optional[User]:
"""Return the User owning this session, or None if expired/invalid."""
row = db.execute(
"SELECT s.handle, s.expires_at, u.display_name, u.is_admin, u.created_at "
"SELECT s.handle, s.expires_at, u.display_name, u.is_admin, "
"u.wiki_access, u.activity_access, u.created_at "
"FROM sessions s JOIN users u ON s.handle = u.handle "
"WHERE s.token = ?",
(token,),
@@ -261,6 +288,8 @@ def get_session(db: sqlite3.Connection, token: str) -> Optional[User]:
handle=row["handle"],
display_name=row["display_name"],
is_admin=bool(row["is_admin"]),
wiki_access=bool(row["wiki_access"]),
activity_access=bool(row["activity_access"]),
created_at=row["created_at"],
)
@@ -281,10 +310,18 @@ def purge_expired_sessions(db: sqlite3.Connection) -> int:
_MAX_USER_INVITES = 3 # regular users; admins are unlimited
def create_invite(db: sqlite3.Connection, created_by: str) -> str:
"""Generate an invite code. Raises ValueError if the user has hit their limit."""
def create_invite(
db: sqlite3.Connection,
created_by: str,
grants_activity: bool = False,
) -> str:
"""Generate an invite code. Raises ValueError if limits are exceeded."""
user = get_user(db, created_by)
if user and not user.is_admin:
if not user:
raise ValueError("Unknown user")
if grants_activity and not user.activity_access and not user.is_admin:
raise ValueError("Cannot grant activity access you don't have")
if not user.is_admin:
count = db.execute(
"SELECT COUNT(*) FROM invites WHERE created_by = ?", (created_by,)
).fetchone()[0]
@@ -293,8 +330,9 @@ def create_invite(db: sqlite3.Connection, created_by: str) -> str:
code = secrets.token_urlsafe(_INVITE_LENGTH)[:_INVITE_LENGTH].upper()
db.execute(
"INSERT INTO invites (code, created_by, created_at) VALUES (?, ?, ?)",
(code, created_by, int(time.time())),
"INSERT INTO invites (code, created_by, created_at, grants_activity) "
"VALUES (?, ?, ?, ?)",
(code, created_by, int(time.time()), int(grants_activity)),
)
db.commit()
return code
@@ -327,6 +365,7 @@ def list_invites(db: sqlite3.Connection, handle: str) -> list[Invite]:
used_by=r["used_by"],
created_at=r["created_at"],
used_at=r["used_at"],
grants_activity=bool(r["grants_activity"]),
)
for r in rows
]
@@ -342,6 +381,7 @@ def get_invite(db: sqlite3.Connection, code: str) -> Optional[Invite]:
used_by=row["used_by"],
created_at=row["created_at"],
used_at=row["used_at"],
grants_activity=bool(row["grants_activity"]),
)