253 lines
7.5 KiB
Python
253 lines
7.5 KiB
Python
import os
|
||
import re
|
||
import traceback
|
||
from datetime import date, datetime
|
||
from decimal import Decimal
|
||
from typing import Any
|
||
|
||
import pymysql
|
||
from fastapi import FastAPI, HTTPException, Query
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.responses import FileResponse
|
||
from pymysql.cursors import DictCursor
|
||
|
||
MYSQL = {
|
||
"host": os.environ.get("MYSQL_HOST", "db"),
|
||
"port": int(os.environ.get("MYSQL_PORT", "3306")),
|
||
"user": os.environ.get("MYSQL_USER", "root"),
|
||
"password": os.environ.get("MYSQL_PASSWORD", ""),
|
||
"database": os.environ.get("MYSQL_DATABASE", "j7508239_tracker"),
|
||
"charset": "utf8mb4",
|
||
"cursorclass": DictCursor,
|
||
}
|
||
|
||
app = FastAPI(title="Mera user reader", version="1.0.0")
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"],
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
|
||
def get_conn():
|
||
return pymysql.connect(**MYSQL)
|
||
|
||
|
||
def _json_cell(v: Any) -> Any:
|
||
if v is None:
|
||
return None
|
||
if isinstance(v, datetime):
|
||
return v.isoformat(sep=" ", timespec="seconds")
|
||
if isinstance(v, date):
|
||
return v.isoformat()
|
||
if isinstance(v, Decimal):
|
||
return float(v)
|
||
if isinstance(v, bytes):
|
||
return v.decode("utf-8", errors="replace")
|
||
return v
|
||
|
||
|
||
def _is_secret_col(name: str) -> bool:
|
||
n = name.lower()
|
||
return n.endswith("_pass") or "password" in n or "secret" in n
|
||
|
||
|
||
def _strip_sensitive(row: dict) -> dict:
|
||
return {k: v for k, v in row.items() if not _is_secret_col(k)}
|
||
|
||
|
||
def json_rows(rows: list[dict]) -> list[dict]:
|
||
return [
|
||
{k: _json_cell(v) for k, v in _strip_sensitive(row).items()} for row in rows
|
||
]
|
||
|
||
|
||
def _db_name(cur) -> str:
|
||
cur.execute("SELECT DATABASE() AS d")
|
||
return cur.fetchone()["d"]
|
||
|
||
|
||
def _list_tables(cur, db: str) -> list[str]:
|
||
cur.execute(
|
||
"""
|
||
SELECT TABLE_NAME AS n FROM information_schema.TABLES
|
||
WHERE TABLE_SCHEMA = %s ORDER BY TABLE_NAME
|
||
""",
|
||
(db,),
|
||
)
|
||
return [r["n"] for r in cur.fetchall()]
|
||
|
||
|
||
def _quote_ident(name: str) -> str:
|
||
return "`" + name.replace("`", "``") + "`"
|
||
|
||
|
||
def resolve_emp_table(cur, db: str) -> str:
|
||
"""Имя таблицы сотрудников Merakomis (или из EMP_TABLE)."""
|
||
env = os.environ.get("EMP_TABLE", "").strip().strip("`")
|
||
if env:
|
||
cur.execute(
|
||
"""
|
||
SELECT COUNT(*) AS c FROM information_schema.TABLES
|
||
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
|
||
""",
|
||
(db, env),
|
||
)
|
||
if cur.fetchone()["c"]:
|
||
return env
|
||
raise RuntimeError(
|
||
f"EMP_TABLE={env!r} не найдена в БД {db!r}. "
|
||
f"Есть таблицы: {', '.join(_list_tables(cur, db)[:50])}"
|
||
)
|
||
|
||
cur.execute(
|
||
"""
|
||
SELECT TABLE_NAME AS n
|
||
FROM information_schema.TABLES
|
||
WHERE TABLE_SCHEMA = %s
|
||
AND (
|
||
LOWER(TABLE_NAME) = 'tmerakomisemp'
|
||
OR (
|
||
LOWER(TABLE_NAME) REGEXP 'merakomis.*emp'
|
||
AND LOWER(TABLE_NAME) NOT LIKE '%%children%%'
|
||
AND LOWER(TABLE_NAME) NOT LIKE '%%group%%'
|
||
)
|
||
)
|
||
ORDER BY
|
||
CASE WHEN LOWER(TABLE_NAME) = 'tmerakomisemp' THEN 0 ELSE 1 END,
|
||
CHAR_LENGTH(TABLE_NAME)
|
||
LIMIT 1
|
||
""",
|
||
(db,),
|
||
)
|
||
row = cur.fetchone()
|
||
if row:
|
||
return row["n"]
|
||
|
||
all_t = _list_tables(cur, db)
|
||
hint = ", ".join([t for t in all_t if re.search(r"emp|user|staff|profile", t, re.I)][:30])
|
||
raise RuntimeError(
|
||
"Таблица сотрудников Merakomis не найдена (ожидали что-то вроде tmerakomisemp). "
|
||
"Импортируйте дамп с этой таблицей или задайте переменную окружения EMP_TABLE. "
|
||
f"База: {db!r}, таблиц: {len(all_t)}. Похожие по имени: {hint or '—'}"
|
||
)
|
||
|
||
|
||
def _table_columns(cur, db: str, table: str) -> list[str]:
|
||
cur.execute(
|
||
"""
|
||
SELECT COLUMN_NAME AS c
|
||
FROM information_schema.COLUMNS
|
||
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
|
||
ORDER BY ORDINAL_POSITION
|
||
""",
|
||
(db, table),
|
||
)
|
||
return [r["c"] for r in cur.fetchall()]
|
||
|
||
|
||
def _pick_order_column(cols: list[str]) -> str:
|
||
for c in cols:
|
||
if "name" in c.lower() and "emp" in c.lower():
|
||
return c
|
||
for c in cols:
|
||
if c.lower().endswith("_name"):
|
||
return c
|
||
return cols[0]
|
||
|
||
|
||
def _pick_removed_column(cols: list[str]) -> str | None:
|
||
for c in cols:
|
||
if c.lower().endswith("_removed"):
|
||
return c
|
||
return None
|
||
|
||
|
||
@app.get("/")
|
||
def index():
|
||
return FileResponse(
|
||
os.path.join(os.path.dirname(__file__), "..", "static", "index.html")
|
||
)
|
||
|
||
|
||
@app.get("/api/health")
|
||
def health() -> dict[str, Any]:
|
||
try:
|
||
with get_conn() as c:
|
||
with c.cursor() as cur:
|
||
cur.execute("SELECT 1 AS ok")
|
||
row = cur.fetchone()
|
||
db = _db_name(cur)
|
||
return {"ok": True, "db": bool(row and row.get("ok") == 1), "database": db}
|
||
except pymysql.Error as e:
|
||
return {"ok": False, "db": False, "error": str(e)}
|
||
|
||
|
||
@app.get("/api/meta")
|
||
def meta() -> dict[str, Any]:
|
||
"""Какую таблицу нашли и какие колонки (без запуска полного списка людей)."""
|
||
try:
|
||
with get_conn() as c:
|
||
with c.cursor() as cur:
|
||
db = _db_name(cur)
|
||
table = resolve_emp_table(cur, db)
|
||
cols = _table_columns(cur, db, table)
|
||
return {
|
||
"database": db,
|
||
"table": table,
|
||
"columns": cols,
|
||
"removed_column": _pick_removed_column(cols),
|
||
"order_column": _pick_order_column(cols),
|
||
}
|
||
except pymysql.Error as e:
|
||
raise HTTPException(status_code=500, detail=str(e)) from e_
|
||
|
||
|
||
@app.get("/api/employees")
|
||
def employees(
|
||
limit: int = Query(100, ge=1, le=500),
|
||
offset: int = Query(0, ge=0),
|
||
) -> dict[str, Any]:
|
||
try:
|
||
with get_conn() as c:
|
||
with c.cursor() as cur:
|
||
db = _db_name(cur)
|
||
table = resolve_emp_table(cur, db)
|
||
cols = _table_columns(cur, db, table)
|
||
if not cols:
|
||
raise RuntimeError(f"Таблица {_quote_ident(table)} без колонок")
|
||
|
||
removed = _pick_removed_column(cols)
|
||
order_col = _pick_order_column(cols)
|
||
tq = _quote_ident(table)
|
||
oq = _quote_ident(order_col)
|
||
|
||
where_sql = ""
|
||
if removed:
|
||
where_sql = f" WHERE {_quote_ident(removed)} = 0 "
|
||
|
||
sql = f"SELECT * FROM {tq} {where_sql} ORDER BY {oq} LIMIT %s OFFSET %s"
|
||
cur.execute(sql, (limit, offset))
|
||
rows = cur.fetchall()
|
||
|
||
cnt_sql = f"SELECT COUNT(*) AS n FROM {tq} {where_sql}"
|
||
cur.execute(cnt_sql)
|
||
total = cur.fetchone()["n"]
|
||
|
||
return {
|
||
"table": table,
|
||
"total": int(total),
|
||
"limit": limit,
|
||
"offset": offset,
|
||
"items": json_rows(rows),
|
||
}
|
||
except pymysql.Error as e:
|
||
raise HTTPException(status_code=500, detail=str(e)) from e
|
||
except Exception as e:
|
||
dbg = os.environ.get("DEBUG", "")
|
||
msg = str(e)
|
||
if dbg == "1":
|
||
msg = f"{msg}\n{traceback.format_exc()}"
|
||
raise HTTPException(status_code=500, detail=msg) from e_
|