"""FastAPI service that lists rows of the Merakomis employee table from MySQL."""

import os
import re
import traceback
from datetime import date, datetime
from decimal import Decimal
from typing import Any

import pymysql
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pymysql.cursors import DictCursor

# Connection settings; every value can be overridden via MYSQL_* env variables.
MYSQL = {
    "host": os.environ.get("MYSQL_HOST", "db"),
    "port": int(os.environ.get("MYSQL_PORT", "3306")),
    "user": os.environ.get("MYSQL_USER", "root"),
    "password": os.environ.get("MYSQL_PASSWORD", ""),
    "database": os.environ.get("MYSQL_DATABASE", "j7508239_tracker"),
    "charset": "utf8mb4",
    "cursorclass": DictCursor,
}

app = FastAPI(title="Mera user reader", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


def get_conn():
    """Open a new PyMySQL connection using the module-level MYSQL settings."""
    return pymysql.connect(**MYSQL)


def _json_cell(v: Any) -> Any:
    """Convert a single DB cell value into something JSON-serializable.

    datetime -> "YYYY-MM-DD HH:MM:SS", date -> ISO date, Decimal -> float,
    bytes -> UTF-8 text (undecodable bytes replaced); anything else is
    returned unchanged.
    """
    if v is None:
        return None
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(v, datetime):
        return v.isoformat(sep=" ", timespec="seconds")
    if isinstance(v, date):
        return v.isoformat()
    if isinstance(v, Decimal):
        return float(v)
    if isinstance(v, bytes):
        return v.decode("utf-8", errors="replace")
    return v


def _is_secret_col(name: str) -> bool:
    """Return True if the column name looks like it holds a credential."""
    n = name.lower()
    return n.endswith("_pass") or "password" in n or "secret" in n


def _strip_sensitive(row: dict) -> dict:
    """Return a copy of *row* without columns flagged by _is_secret_col."""
    return {k: v for k, v in row.items() if not _is_secret_col(k)}


def json_rows(rows: list[dict]) -> list[dict]:
    """Drop secret-looking columns and make every remaining cell JSON-safe."""
    return [
        {k: _json_cell(v) for k, v in _strip_sensitive(row).items()}
        for row in rows
    ]


def _db_name(cur) -> str:
    """Return the name of the database the cursor's connection is using."""
    cur.execute("SELECT DATABASE() AS d")
    return cur.fetchone()["d"]


def _list_tables(cur, db: str) -> list[str]:
    """Return all table names in schema *db*, sorted by name."""
    cur.execute(
        """
        SELECT TABLE_NAME AS n FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s
        ORDER BY TABLE_NAME
        """,
        (db,),
    )
    return [r["n"] for r in cur.fetchall()]


def _quote_ident(name: str) -> str:
    """Quote *name* as a MySQL identifier, doubling embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def resolve_emp_table(cur, db: str) -> str:
    """Return the name of the Merakomis employee table in schema *db*.

    If the EMP_TABLE environment variable is set, that table is used and must
    exist. Otherwise the table is looked up by name: an exact
    ``tmerakomisemp`` match is preferred, then the shortest name matching
    ``merakomis.*emp`` that does not contain ``children`` or ``group``.

    Raises:
        RuntimeError: if EMP_TABLE names a missing table, or no candidate
            table is found.
    """
    env = os.environ.get("EMP_TABLE", "").strip().strip("`")
    if env:
        cur.execute(
            """
            SELECT COUNT(*) AS c FROM information_schema.TABLES
            WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
            """,
            (db, env),
        )
        if cur.fetchone()["c"]:
            return env
        raise RuntimeError(
            f"EMP_TABLE={env!r} не найдена в БД {db!r}. "
            f"Есть таблицы: {', '.join(_list_tables(cur, db)[:50])}"
        )

    # '%%' is a literal '%' here because the query is executed with parameters.
    cur.execute(
        """
        SELECT TABLE_NAME AS n FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s
          AND (
            LOWER(TABLE_NAME) = 'tmerakomisemp'
            OR (
              LOWER(TABLE_NAME) REGEXP 'merakomis.*emp'
              AND LOWER(TABLE_NAME) NOT LIKE '%%children%%'
              AND LOWER(TABLE_NAME) NOT LIKE '%%group%%'
            )
          )
        ORDER BY
          CASE WHEN LOWER(TABLE_NAME) = 'tmerakomisemp' THEN 0 ELSE 1 END,
          CHAR_LENGTH(TABLE_NAME)
        LIMIT 1
        """,
        (db,),
    )
    row = cur.fetchone()
    if row:
        return row["n"]

    # Nothing matched: build a hint from similarly named tables for the error.
    all_t = _list_tables(cur, db)
    similar = [t for t in all_t if re.search(r"emp|user|staff|profile", t, re.I)]
    hint = ", ".join(similar[:30])
    raise RuntimeError(
        "Таблица сотрудников Merakomis не найдена (ожидали что-то вроде tmerakomisemp). "
        "Импортируйте дамп с этой таблицей или задайте переменную окружения EMP_TABLE. "
        f"База: {db!r}, таблиц: {len(all_t)}. Похожие по имени: {hint or '—'}"
    )


def _table_columns(cur, db: str, table: str) -> list[str]:
    """Return the column names of *table* in schema *db*, in ordinal order."""
    cur.execute(
        """
        SELECT COLUMN_NAME AS c FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
        """,
        (db, table),
    )
    return [r["c"] for r in cur.fetchall()]


def _pick_order_column(cols: list[str]) -> str:
    """Choose the column to sort by.

    Preference: a column whose name contains both "name" and "emp", then any
    column ending in "_name", then the first column. *cols* must be non-empty.
    """
    for c in cols:
        lowered = c.lower()
        if "name" in lowered and "emp" in lowered:
            return c
    for c in cols:
        if c.lower().endswith("_name"):
            return c
    return cols[0]


def _pick_removed_column(cols: list[str]) -> str | None:
    """Return the first column ending in "_removed", or None if there is none."""
    for c in cols:
        if c.lower().endswith("_removed"):
            return c
    return None


@app.get("/")
def index():
    """Serve the static index.html located in ../static relative to this file."""
    return FileResponse(
        os.path.join(os.path.dirname(__file__), "..", "static", "index.html")
    )


@app.get("/api/health")
def health() -> dict[str, Any]:
    """Report whether the database answers a trivial query.

    Database errors are reported in the response body instead of raising.
    """
    try:
        with get_conn() as c:
            with c.cursor() as cur:
                cur.execute("SELECT 1 AS ok")
                row = cur.fetchone()
                db = _db_name(cur)
        return {"ok": True, "db": bool(row and row.get("ok") == 1), "database": db}
    except pymysql.Error as e:
        return {"ok": False, "db": False, "error": str(e)}


@app.get("/api/meta")
def meta() -> dict[str, Any]:
    """Report which table was resolved and its columns, without listing rows.

    Raises:
        HTTPException: status 500 on a database error or when the employee
            table cannot be resolved.
    """
    try:
        with get_conn() as c:
            with c.cursor() as cur:
                db = _db_name(cur)
                table = resolve_emp_table(cur, db)
                cols = _table_columns(cur, db, table)
        return {
            "database": db,
            "table": table,
            "columns": cols,
            "removed_column": _pick_removed_column(cols),
            # _pick_order_column indexes cols[0]; guard the empty case.
            "order_column": _pick_order_column(cols) if cols else None,
        }
    except pymysql.Error as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    except RuntimeError as e:
        # resolve_emp_table signals "table not found" with RuntimeError;
        # surface its message the same way /api/employees does.
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/api/employees")
def employees(
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
) -> dict[str, Any]:
    """Return one page of employee rows plus the total row count.

    Rows are filtered to ``<removed column> = 0`` when a "*_removed" column
    exists, and ordered by the column chosen by _pick_order_column.
    Secret-looking columns are stripped from the output.

    Raises:
        HTTPException: status 500 on any failure; when the DEBUG environment
            variable is "1", non-database errors include a traceback.
    """
    try:
        with get_conn() as c:
            with c.cursor() as cur:
                db = _db_name(cur)
                table = resolve_emp_table(cur, db)
                cols = _table_columns(cur, db, table)
                if not cols:
                    raise RuntimeError(f"Таблица {_quote_ident(table)} без колонок")
                removed = _pick_removed_column(cols)
                order_col = _pick_order_column(cols)
                # Identifiers come from information_schema and are
                # backtick-quoted; only limit/offset are bound parameters.
                tq = _quote_ident(table)
                oq = _quote_ident(order_col)
                where_sql = ""
                if removed:
                    where_sql = f" WHERE {_quote_ident(removed)} = 0 "
                sql = f"SELECT * FROM {tq} {where_sql} ORDER BY {oq} LIMIT %s OFFSET %s"
                cur.execute(sql, (limit, offset))
                rows = cur.fetchall()
                cnt_sql = f"SELECT COUNT(*) AS n FROM {tq} {where_sql}"
                cur.execute(cnt_sql)
                total = cur.fetchone()["n"]
        return {
            "table": table,
            "total": int(total),
            "limit": limit,
            "offset": offset,
            "items": json_rows(rows),
        }
    except pymysql.Error as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    except Exception as e:
        dbg = os.environ.get("DEBUG", "")
        msg = str(e)
        if dbg == "1":
            msg = f"{msg}\n{traceback.format_exc()}"
        raise HTTPException(status_code=500, detail=msg) from e