meraproject/services/user-reader/app/emp_departments.py
keboss-m 5c21d25d45 Initial commit: Merakomis portal, Docker stack and user-reader API.
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-24 11:04:05 +03:00

163 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Принадлежность сотрудника к отделу (Post + Department), коды АР, ГП, КР…"""
from __future__ import annotations
from datetime import date, datetime
from decimal import Decimal
from typing import Any
POST_TABLE = "tMerakomisPost"
DEPARTMENT_TABLE = "tMerakomisDDepartment"
DEPARTMENT_CODE_ALIASES: dict[str, str] = {
"ОВиК": "ОВ",
}
def _quote_ident(name: str) -> str:
return "`" + name.replace("`", "``") + "`"
def _column_lookup(cols: list[str]) -> dict[str, str]:
return {c.lower(): c for c in cols}
def _table_columns(cur, db: str, table: str) -> list[str]:
cur.execute(
"""
SELECT COLUMN_NAME AS c
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
ORDER BY ORDINAL_POSITION
""",
(db, table),
)
return [r["c"] for r in cur.fetchall()]
def _resolve_table(cur, db: str, canonical: str) -> str:
want = canonical.lower()
cur.execute(
"""
SELECT TABLE_NAME AS n
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = %s AND LOWER(TABLE_NAME) = %s
LIMIT 1
""",
(db, want),
)
row = cur.fetchone()
if row:
return row["n"]
raise RuntimeError(f"Таблица {canonical!r} не найдена в БД {db!r}.")
def _prefixed_col(lut: dict[str, str], table_canonical: str, field: str) -> str | None:
key = f"{table_canonical.lower()}_{field.lower()}"
if key in lut:
return lut[key]
return lut.get(field.lower())
def _json_cell(v: Any) -> Any:
if v is None:
return None
if isinstance(v, datetime):
return v.isoformat(sep=" ", timespec="seconds")
if isinstance(v, date):
return v.isoformat()
if isinstance(v, Decimal):
return float(v)
if isinstance(v, bytes):
return v.decode("utf-8", errors="replace")
return v
def normalize_department_code(short: str | None) -> str:
"""«о.АР» → «АР», «о.ОВиК» → «ОВ»."""
if not short or not str(short).strip():
return ""
s = str(short).strip()
if "." in s:
s = s.split(".", 1)[1].strip()
return DEPARTMENT_CODE_ALIASES.get(s, s)
def load_emp_departments(cur, db: str) -> dict[int, list[dict[str, Any]]]:
"""emp_id → отделы из tMerakomisPost (как Emp::getDepartmentList в портале)."""
try:
post_t = _resolve_table(cur, db, POST_TABLE)
dept_t = _resolve_table(cur, db, DEPARTMENT_TABLE)
except RuntimeError:
return {}
pl = _column_lookup(_table_columns(cur, db, post_t))
dl = _column_lookup(_table_columns(cur, db, dept_t))
p_emp = _prefixed_col(pl, POST_TABLE, "emp")
p_dep = _prefixed_col(pl, POST_TABLE, "department")
d_id = _prefixed_col(dl, DEPARTMENT_TABLE, "id")
d_name = _prefixed_col(dl, DEPARTMENT_TABLE, "name")
d_short = _prefixed_col(dl, DEPARTMENT_TABLE, "short")
if not all([p_emp, p_dep, d_id, d_name]):
return {}
short_sql = (
f", {_quote_ident(d_short)} AS short" if d_short else ", NULL AS short"
)
cur.execute(
f"""
SELECT
{_quote_ident(p_emp)} AS emp_id,
{_quote_ident(d_id)} AS department_id,
{_quote_ident(d_name)} AS name
{short_sql}
FROM {_quote_ident(post_t)} p
INNER JOIN {_quote_ident(dept_t)} d
ON {_quote_ident(d_id)} = {_quote_ident(p_dep)}
ORDER BY {_quote_ident(p_emp)}, {_quote_ident(d_name)}
"""
)
out: dict[int, list[dict[str, Any]]] = {}
for row in cur.fetchall():
emp_id = int(row["emp_id"])
short = row.get("short")
code = normalize_department_code(short)
out.setdefault(emp_id, []).append(
{
"department_id": int(row["department_id"]),
"name": _json_cell(row.get("name")),
"short": _json_cell(short),
"code": code or None,
}
)
return out
def enrich_employee_items(
items: list[dict[str, Any]], dept_map: dict[int, list[dict[str, Any]]]
) -> list[dict[str, Any]]:
for item in items:
emp_id = item.get("id")
try:
eid = int(emp_id) if emp_id is not None else None
except (TypeError, ValueError):
eid = None
deps = dept_map.get(eid, []) if eid is not None else []
codes: list[str] = []
seen: set[str] = set()
for d in deps:
c = d.get("code")
if c and c not in seen:
seen.add(c)
codes.append(c)
item["departments"] = deps
item["department_codes"] = codes
item["department"] = ", ".join(codes) if codes else None
return items