179 lines
5.2 KiB
Python
179 lines
5.2 KiB
Python
"""Должности сотрудника (staffing → tMerakomisDStaffing): Архитектор, ГИП, ГАП…"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any
|
|
|
|
from app.emp_schema import EMP_TABLE_CANONICAL
|
|
from app.merakomis_schema import STAFFING_TABLE
|
|
|
|
|
|
def _quote_ident(name: str) -> str:
    """Return *name* wrapped in backticks for use as an SQL identifier.

    Any backtick inside *name* is doubled so it cannot terminate the
    quoted identifier early.
    """
    escaped = name.replace("`", "``")
    return f"`{escaped}`"
|
|
|
|
|
|
def _column_lookup(cols: list[str]) -> dict[str, str]:
    """Build a case-insensitive lookup: lower-cased name -> actual column name.

    When two names differ only by case, the one that appears later in
    *cols* wins.
    """
    lookup: dict[str, str] = {}
    for column in cols:
        lookup[column.lower()] = column
    return lookup
|
|
|
|
|
|
def _table_columns(cur, db: str, table: str) -> list[str]:
    """Return the column names of *table* in schema *db*.

    The names are read from ``information_schema.COLUMNS`` and come back in
    ``ORDINAL_POSITION`` order.  Rows are accessed by key (``row["c"]``), so
    *cur* must yield mapping-style rows.
    """
    params = (db, table)
    cur.execute(
        """
        SELECT COLUMN_NAME AS c
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
        """,
        params,
    )
    names: list[str] = []
    for record in cur.fetchall():
        names.append(record["c"])
    return names
|
|
|
|
|
|
def _resolve_table(cur, db: str, canonical: str) -> str:
    """Find the actual table name in *db* matching *canonical* ignoring case.

    Looks the name up in ``information_schema.TABLES`` and returns it exactly
    as stored there.  *cur* must yield mapping-style rows.

    Raises:
        RuntimeError: if no table in *db* matches *canonical*.
    """
    lowered = canonical.lower()
    cur.execute(
        """
        SELECT TABLE_NAME AS n
        FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s AND LOWER(TABLE_NAME) = %s
        LIMIT 1
        """,
        (db, lowered),
    )
    match = cur.fetchone()
    if not match:
        raise RuntimeError(f"Таблица {canonical!r} не найдена в БД {db!r}.")
    return match["n"]
|
|
|
|
|
|
def _prefixed_col(lut: dict[str, str], table_canonical: str, field: str) -> str | None:
    """Resolve the actual column name for *field* using the lookup *lut*.

    The table-prefixed form ``<table>_<field>`` (lower-cased) is preferred;
    if it is absent, the bare lower-cased *field* is tried.  Returns ``None``
    when neither key is present in *lut*.
    """
    bare = field.lower()
    qualified = f"{table_canonical.lower()}_{bare}"
    return lut[qualified] if qualified in lut else lut.get(bare)
|
|
|
|
|
|
def load_staffing_dict(cur, db: str) -> dict[int, dict[str, str]]:
    """Load the staffing (position) dictionary as ``id -> {"name", "text"}``.

    The table named by ``STAFFING_TABLE`` is located case-insensitively and
    its id/name/text columns are resolved with or without the table prefix.
    An empty dict is returned when the table, or its id or name column,
    cannot be found.  A missing text column is tolerated: ``"text"`` is then
    an empty string for every entry.  NULL name/text values become ``""``.
    """
    try:
        table = _resolve_table(cur, db, STAFFING_TABLE)
    except RuntimeError:
        # Best effort: no staffing table means no dictionary.
        return {}

    columns = _table_columns(cur, db, table)
    lut = _column_lookup(columns)
    id_col = _prefixed_col(lut, STAFFING_TABLE, "id")
    name_col = _prefixed_col(lut, STAFFING_TABLE, "name")
    text_col = _prefixed_col(lut, STAFFING_TABLE, "text")
    if not (id_col and name_col):
        return {}

    # Select a constant empty string when the optional text column is absent
    # so every row has the same shape.
    if text_col:
        text_sql = f", {_quote_ident(text_col)} AS text"
    else:
        text_sql = ", '' AS text"

    cur.execute(
        f"""
        SELECT {_quote_ident(id_col)} AS id, {_quote_ident(name_col)} AS name
        {text_sql}
        FROM {_quote_ident(table)}
        """
    )

    result: dict[int, dict[str, str]] = {}
    for record in cur.fetchall():
        key = int(record["id"])
        result[key] = {
            "name": str(record.get("name") or ""),
            "text": str(record.get("text") or ""),
        }
    return result
|
|
|
|
|
|
def parse_staffing_ids(raw: Any) -> list[int]:
    """Parse the employee ``staffing`` value into a list of position ids.

    The value (``tMerakomisEmp_staffing``) is a JSON array of position ids.
    Accepted inputs:

    * ``None`` -> ``[]``
    * a ``list`` -> used as-is
    * ``bytes`` / ``bytearray`` -> decoded as UTF-8, then parsed as JSON
    * anything else -> ``str(raw)`` parsed as JSON

    Parsing is deliberately tolerant: undecodable bytes, invalid JSON, or a
    JSON value that is not an array all yield ``[]``.  Elements that cannot
    be converted with ``int()`` (including infinities) are skipped, as are
    values that are not strictly positive.  Order and duplicates are kept.
    """
    if raw is None:
        return []

    if isinstance(raw, list):
        candidates = raw
    else:
        if isinstance(raw, (bytes, bytearray)):
            # str(b"[1]") would give "b'[1]'", which is never valid JSON,
            # so byte input has to be decoded explicitly.
            try:
                raw = bytes(raw).decode("utf-8")
            except UnicodeDecodeError:
                return []
        text = str(raw).strip()
        if not text or text in ("[]", "null"):
            return []
        try:
            parsed = json.loads(text)
        except ValueError:
            # JSONDecodeError is a ValueError subclass; catching the base
            # also covers other parse failures such as the int digit limit.
            return []
        if not isinstance(parsed, list):
            return []
        candidates = parsed

    result: list[int] = []
    for item in candidates:
        try:
            value = int(item)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")), e.g. from JSON "1e999".
            continue
        if value > 0:
            result.append(value)
    return result
|
|
|
|
|
|
def staffing_labels(
    staffing_ids: list[int], staffing_dict: dict[int, dict[str, str]]
) -> tuple[list[dict[str, Any]], list[str], str]:
    """Build entry objects, a list of names and a display string for positions.

    Returns a 3-tuple:

    * one ``{"id", "name", "text"}`` dict per id in *staffing_ids*, in order;
      ids missing from *staffing_dict* get empty ``name`` and ``text``;
    * the non-empty names only, in the same order;
    * those names joined with ``", "``.
    """
    entries: list[dict[str, Any]] = []
    for sid in staffing_ids:
        info = staffing_dict.get(sid, {})
        entries.append(
            {
                "id": sid,
                "name": info.get("name") or "",
                "text": info.get("text") or "",
            }
        )
    names: list[str] = [entry["name"] for entry in entries if entry["name"]]
    title = ", ".join(names)
    return entries, names, title
|
|
|
|
|
|
def enrich_employee_staffing(
    items: list[dict[str, Any]], staffing_dict: dict[int, dict[str, str]]
) -> list[dict[str, Any]]:
    """Add resolved position data to every employee dict in *items*.

    Each item's ``"staffing"`` value is parsed into ids and looked up in
    *staffing_dict*.  The item is then extended in place with
    ``staffing_ids``, ``staffings``, ``staffing_names`` and
    ``staffing_title`` (``None`` when there are no names).  The same list
    object is returned for convenience.
    """
    for employee in items:
        parsed_ids = parse_staffing_ids(employee.get("staffing"))
        entries, names, title = staffing_labels(parsed_ids, staffing_dict)
        employee.update(
            {
                "staffing_ids": parsed_ids,
                "staffings": entries,
                "staffing_names": names,
                "staffing_title": title if title else None,
            }
        )
    return items
|
|
|
|
|
|
def load_emp_staffing_titles(cur, db: str) -> dict[int, str]:
    """Map employee id to its positions rendered as one string (for summaries).

    Loads the staffing dictionary, then reads the id and staffing columns of
    the table named by ``EMP_TABLE_CANONICAL`` and joins each employee's
    position names with ``", "``.  Employees with no resolvable position
    names are omitted.

    An empty dict is returned when the staffing dictionary is empty, the
    employee table is not found, or its id or staffing column is missing.
    """
    staffing_dict = load_staffing_dict(cur, db)
    if not staffing_dict:
        return {}

    try:
        emp_t = _resolve_table(cur, db, EMP_TABLE_CANONICAL)
    except RuntimeError:
        # Best effort: no employee table means nothing to report.
        return {}

    lut = _column_lookup(_table_columns(cur, db, emp_t))
    # Use the shared resolver (prefixed name first, bare name as fallback)
    # so column resolution matches load_staffing_dict.
    id_col = _prefixed_col(lut, EMP_TABLE_CANONICAL, "id")
    staffing_col = _prefixed_col(lut, EMP_TABLE_CANONICAL, "staffing")
    if not id_col or not staffing_col:
        return {}

    cur.execute(
        f"""
        SELECT {_quote_ident(id_col)} AS id, {_quote_ident(staffing_col)} AS staffing
        FROM {_quote_ident(emp_t)}
        """
    )

    out: dict[int, str] = {}
    for row in cur.fetchall():
        _, _, title = staffing_labels(
            parse_staffing_ids(row.get("staffing")), staffing_dict
        )
        if title:
            out[int(row["id"])] = title
    return out
|