253 lines
7.5 KiB
Python
253 lines
7.5 KiB
Python
|
|
import os
|
|||
|
|
import re
|
|||
|
|
import traceback
|
|||
|
|
from datetime import date, datetime
|
|||
|
|
from decimal import Decimal
|
|||
|
|
from typing import Any
|
|||
|
|
|
|||
|
|
import pymysql
|
|||
|
|
from fastapi import FastAPI, HTTPException, Query
|
|||
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|||
|
|
from fastapi.responses import FileResponse
|
|||
|
|
from pymysql.cursors import DictCursor
|
|||
|
|
|
|||
|
|
# Keyword arguments passed verbatim to ``pymysql.connect``.
# Host, port, user, password and database name are read from the environment
# at import time, falling back to the defaults given here.
MYSQL = dict(
    host=os.getenv("MYSQL_HOST", "db"),
    port=int(os.getenv("MYSQL_PORT", "3306")),
    user=os.getenv("MYSQL_USER", "root"),
    password=os.getenv("MYSQL_PASSWORD", ""),
    database=os.getenv("MYSQL_DATABASE", "j7508239_tracker"),
    charset="utf8mb4",
    # Rows are accessed by column name throughout this module.
    cursorclass=DictCursor,
)
|
|||
|
|
|
|||
|
|
# The ASGI application object; routes below are registered on it.
app = FastAPI(version="1.0.0", title="Mera user reader")

# CORS is fully open: any origin, any method and any request header.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_conn():
    """Open and return a new PyMySQL connection built from ``MYSQL``."""
    connection = pymysql.connect(**MYSQL)
    return connection
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _json_cell(v: Any) -> Any:
|
|||
|
|
if v is None:
|
|||
|
|
return None
|
|||
|
|
if isinstance(v, datetime):
|
|||
|
|
return v.isoformat(sep=" ", timespec="seconds")
|
|||
|
|
if isinstance(v, date):
|
|||
|
|
return v.isoformat()
|
|||
|
|
if isinstance(v, Decimal):
|
|||
|
|
return float(v)
|
|||
|
|
if isinstance(v, bytes):
|
|||
|
|
return v.decode("utf-8", errors="replace")
|
|||
|
|
return v
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _is_secret_col(name: str) -> bool:
|
|||
|
|
n = name.lower()
|
|||
|
|
return n.endswith("_pass") or "password" in n or "secret" in n
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _strip_sensitive(row: dict) -> dict:
    """Return a copy of *row* without the columns flagged by ``_is_secret_col``."""
    kept = {}
    for column, value in row.items():
        if _is_secret_col(column):
            continue
        kept[column] = value
    return kept
|
|||
|
|
|
|||
|
|
|
|||
|
|
def json_rows(rows: list[dict]) -> list[dict]:
    """Prepare fetched rows for a JSON response.

    Each row first loses its secret-looking columns, then every remaining
    value is converted with ``_json_cell``.
    """
    prepared = []
    for row in rows:
        visible = _strip_sensitive(row)
        prepared.append(
            {column: _json_cell(value) for column, value in visible.items()}
        )
    return prepared
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _db_name(cur) -> str:
    """Return the value of ``SELECT DATABASE()`` for the cursor's connection."""
    cur.execute("SELECT DATABASE() AS d")
    current = cur.fetchone()
    return current["d"]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _list_tables(cur, db: str) -> list[str]:
    """Return the names of all tables in schema *db*, sorted by name."""
    query = """
        SELECT TABLE_NAME AS n FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s ORDER BY TABLE_NAME
        """
    cur.execute(query, (db,))
    records = cur.fetchall()
    return [record["n"] for record in records]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _quote_ident(name: str) -> str:
|
|||
|
|
return "`" + name.replace("`", "``") + "`"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def resolve_emp_table(cur, db: str) -> str:
    """Return the name of the Merakomis employee table (or the one in EMP_TABLE).

    If the ``EMP_TABLE`` environment variable is non-empty after stripping
    whitespace and surrounding backticks, that name is checked against
    ``information_schema.TABLES`` for schema *db* and returned when present.

    Otherwise the schema is searched for a table named ``tmerakomisemp``
    (case-insensitive) or one matching ``merakomis.*emp`` whose name contains
    neither ``children`` nor ``group``. The exact name wins; after that the
    shortest name is preferred.

    Raises:
        RuntimeError: when ``EMP_TABLE`` names a missing table (message lists
            up to 50 existing tables), or when no candidate table is found
            (message lists up to 30 tables whose names look related).
    """
    override = os.environ.get("EMP_TABLE", "").strip().strip("`")
    if override:
        cur.execute(
            """
            SELECT COUNT(*) AS c FROM information_schema.TABLES
            WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
            """,
            (db, override),
        )
        if not cur.fetchone()["c"]:
            raise RuntimeError(
                f"EMP_TABLE={override!r} не найдена в БД {db!r}. "
                f"Есть таблицы: {', '.join(_list_tables(cur, db)[:50])}"
            )
        return override

    # ``%%`` is a literal ``%`` here: the driver applies %-formatting to the
    # query because parameters are supplied.
    cur.execute(
        """
        SELECT TABLE_NAME AS n
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s
        AND (
        LOWER(TABLE_NAME) = 'tmerakomisemp'
        OR (
        LOWER(TABLE_NAME) REGEXP 'merakomis.*emp'
        AND LOWER(TABLE_NAME) NOT LIKE '%%children%%'
        AND LOWER(TABLE_NAME) NOT LIKE '%%group%%'
        )
        )
        ORDER BY
        CASE WHEN LOWER(TABLE_NAME) = 'tmerakomisemp' THEN 0 ELSE 1 END,
        CHAR_LENGTH(TABLE_NAME)
        LIMIT 1
        """,
        (db,),
    )
    found = cur.fetchone()
    if not found:
        # Nothing matched: build a hint from tables with related-looking names.
        known_tables = _list_tables(cur, db)
        lookalikes = [
            t for t in known_tables if re.search(r"emp|user|staff|profile", t, re.I)
        ]
        hint = ", ".join(lookalikes[:30])
        raise RuntimeError(
            "Таблица сотрудников Merakomis не найдена (ожидали что-то вроде tmerakomisemp). "
            "Импортируйте дамп с этой таблицей или задайте переменную окружения EMP_TABLE. "
            f"База: {db!r}, таблиц: {len(known_tables)}. Похожие по имени: {hint or '—'}"
        )
    return found["n"]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _table_columns(cur, db: str, table: str) -> list[str]:
    """Return the column names of *table* in schema *db*, in ordinal order."""
    query = """
        SELECT COLUMN_NAME AS c
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
        """
    cur.execute(query, (db, table))
    records = cur.fetchall()
    return [record["c"] for record in records]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _pick_order_column(cols: list[str]) -> str:
|
|||
|
|
for c in cols:
|
|||
|
|
if "name" in c.lower() and "emp" in c.lower():
|
|||
|
|
return c
|
|||
|
|
for c in cols:
|
|||
|
|
if c.lower().endswith("_name"):
|
|||
|
|
return c
|
|||
|
|
return cols[0]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _pick_removed_column(cols: list[str]) -> str | None:
|
|||
|
|
for c in cols:
|
|||
|
|
if c.lower().endswith("_removed"):
|
|||
|
|
return c
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
@app.get("/")
def index():
    """Serve ``static/index.html`` from the directory above this module's directory."""
    module_dir = os.path.dirname(__file__)
    page_path = os.path.join(module_dir, "..", "static", "index.html")
    return FileResponse(page_path)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@app.get("/api/health")
def health() -> dict[str, Any]:
    """Probe the database and report the result.

    Returns ``{"ok": True, "db": <probe succeeded>, "database": <name>}`` on
    success. A PyMySQL error is not raised to the client; it is reported as
    ``{"ok": False, "db": False, "error": <message>}``.
    """
    try:
        with get_conn() as conn, conn.cursor() as cursor:
            cursor.execute("SELECT 1 AS ok")
            probe = cursor.fetchone()
            current_db = _db_name(cursor)
    except pymysql.Error as e:
        return {"ok": False, "db": False, "error": str(e)}

    probe_ok = bool(probe and probe.get("ok") == 1)
    return {"ok": True, "db": probe_ok, "database": current_db}
|
|||
|
|
|
|||
|
|
|
|||
|
|
@app.get("/api/meta")
def meta() -> dict[str, Any]:
    """Report which employee table was found and its columns.

    The full employee list is not queried. The response holds the current
    database name, the resolved table, its column names, and the columns
    that would be used for the "removed" filter and for ordering.

    Raises:
        HTTPException: status 500 with the error text as ``detail`` when the
            database call fails or the employee table cannot be resolved.
    """
    try:
        with get_conn() as c:
            with c.cursor() as cur:
                db = _db_name(cur)
                table = resolve_emp_table(cur, db)
                cols = _table_columns(cur, db, table)
                return {
                    "database": db,
                    "table": table,
                    "columns": cols,
                    "removed_column": _pick_removed_column(cols),
                    "order_column": _pick_order_column(cols),
                }
    except pymysql.Error as e:
        # Chain the driver error as the cause so it stays in the traceback.
        raise HTTPException(status_code=500, detail=str(e)) from e
    except RuntimeError as e:
        # resolve_emp_table signals "table not found" with RuntimeError; pass
        # its explanatory message to the client, as /api/employees does.
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|||
|
|
|
|||
|
|
|
|||
|
|
@app.get("/api/employees")
def employees(
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
) -> dict[str, Any]:
    """Return one page of rows from the employee table.

    Args:
        limit: page size, 1..500 (default 100).
        offset: number of rows to skip, >= 0 (default 0).

    Returns:
        A dict with the resolved ``table``, the ``total`` row count under the
        same filter, the echoed ``limit`` / ``offset``, and ``items`` — the
        page rows after ``json_rows`` processing.

    Raises:
        HTTPException: status 500 with the error text as ``detail``. When the
            ``DEBUG`` environment variable equals ``"1"``, non-database
            errors also carry the formatted traceback.
    """
    try:
        with get_conn() as c:
            with c.cursor() as cur:
                db = _db_name(cur)
                table = resolve_emp_table(cur, db)
                cols = _table_columns(cur, db, table)
                if not cols:
                    raise RuntimeError(f"Таблица {_quote_ident(table)} без колонок")

                removed = _pick_removed_column(cols)
                order_col = _pick_order_column(cols)
                tq = _quote_ident(table)
                oq = _quote_ident(order_col)

                # When a "*_removed" column exists, keep only rows where it is 0.
                where_sql = ""
                if removed:
                    where_sql = f" WHERE {_quote_ident(removed)} = 0 "

                # Identifiers are backtick-quoted; page bounds are bound parameters.
                sql = f"SELECT * FROM {tq} {where_sql} ORDER BY {oq} LIMIT %s OFFSET %s"
                cur.execute(sql, (limit, offset))
                rows = cur.fetchall()

                # Total is counted under the same filter as the page query.
                cnt_sql = f"SELECT COUNT(*) AS n FROM {tq} {where_sql}"
                cur.execute(cnt_sql)
                total = cur.fetchone()["n"]

                return {
                    "table": table,
                    "total": int(total),
                    "limit": limit,
                    "offset": offset,
                    "items": json_rows(rows),
                }
    except pymysql.Error as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    except Exception as e:
        dbg = os.environ.get("DEBUG", "")
        msg = str(e)
        if dbg == "1":
            msg = f"{msg}\n{traceback.format_exc()}"
        # Chain to the caught exception so the prepared message reaches the client.
        raise HTTPException(status_code=500, detail=msg) from e
|