| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- import sqlite3
- from datetime import timedelta
- from typing import Optional
- from flask import abort, session
- from .core import isoformat, parse_datetime, utcnow
- from .db import execute, fetch_one
def current_user() -> Optional[sqlite3.Row]:
    """Look up the user referenced by the current session.

    Reads ``"user_id"`` from the Flask session. When that value is missing
    or falsy, no query is issued and ``None`` is returned. Otherwise the
    result of selecting the matching ``users`` row by id is returned as-is.
    """
    uid = session.get("user_id")
    if uid:
        return fetch_one("SELECT * FROM users WHERE id = ?", (uid,))
    return None
def current_admin() -> Optional[sqlite3.Row]:
    """Look up the admin referenced by the current session.

    Reads ``"admin_user_id"`` from the Flask session. When that value is
    missing or falsy, no query is issued and ``None`` is returned. Otherwise
    the result of selecting the matching ``admin_users`` row by id is
    returned as-is.
    """
    aid = session.get("admin_user_id")
    if aid:
        return fetch_one("SELECT * FROM admin_users WHERE id = ?", (aid,))
    return None
def require_user() -> sqlite3.Row:
    """Return the session's user row, aborting the request if unusable.

    Calls ``abort(401)`` when ``current_user()`` yields ``None`` and
    ``abort(403)`` when the row's ``status`` column is anything other than
    ``"ACTIVE"``.
    """
    account = current_user()
    if account is None:
        abort(401)

    is_active = account["status"] == "ACTIVE"
    if not is_active:
        abort(403)
    return account
def require_admin() -> sqlite3.Row:
    """Return the session's admin row, aborting the request if unusable.

    Calls ``abort(401)`` when ``current_admin()`` yields ``None`` and
    ``abort(403)`` when the row's ``status`` column is anything other than
    ``"ACTIVE"``.
    """
    operator = current_admin()
    if operator is None:
        abort(401)

    is_active = operator["status"] == "ACTIVE"
    if not is_active:
        abort(403)
    return operator
def is_vip_active(user: sqlite3.Row) -> bool:
    """Tell whether *user*'s VIP expiry lies strictly after the current time.

    The ``vip_expire_at`` column is passed through ``parse_datetime``; a
    ``None`` result means the user is not VIP. Otherwise the parsed value is
    compared against ``utcnow()``.
    """
    expires_at = parse_datetime(user["vip_expire_at"])
    if expires_at is None:
        return False
    return expires_at > utcnow()
def extend_vip(user_id: int, duration_days: int) -> None:
    """Push a user's VIP expiry forward by *duration_days* days.

    The extension starts from the stored expiry when that expiry is still
    in the future, and from the current time otherwise (no expiry stored,
    or already lapsed). Calls ``abort(404)`` when no ``users`` row matches
    *user_id*.
    """
    # NOTE(review): the SELECT and UPDATE are separate statements; whether
    # concurrent extensions are serialized elsewhere is not visible here.
    row = fetch_one("SELECT vip_expire_at FROM users WHERE id = ?", (user_id,))
    if row is None:
        abort(404)

    now = utcnow()
    stored_expiry = parse_datetime(row["vip_expire_at"])

    # Stack onto remaining VIP time only if some is actually left.
    if stored_expiry is not None and stored_expiry > now:
        base = stored_expiry
    else:
        base = now

    extended = base + timedelta(days=duration_days)
    execute(
        "UPDATE users SET vip_expire_at = ? WHERE id = ?",
        (isoformat(extended), user_id),
    )
|