"""Права на табель — порт Rules + getTimeTable can_edit.""" from __future__ import annotations from app.emp_schema import EMP_TABLE_CANONICAL from app.labor import _column_lookup, _prefixed_col, _resolve_table from app.main import _quote_ident, _table_columns, resolve_emp_table from app.project_members_helpers import fetch_member_by_id, fetch_team_member_row from app.project_members_permissions_logic import evaluate_is_rw_member from app.merakomis_schema import ( DEPARTMENT_TABLE, EMP_TYPE_ADMIN, POST_TABLE, PROJECT_TABLE, ROLE_MAIN_ENGINEER, TEAM_MEMBER_TABLE, ) def _emp_type_col(lut: dict[str, str]) -> str | None: return lut.get(f"{EMP_TABLE_CANONICAL.lower()}_type") or lut.get("type") def is_admin(cur, db: str, emp_id: int) -> bool: table = resolve_emp_table(cur, db) cols = _table_columns(cur, db, table) lut = _column_lookup(cols) id_col = lut.get(f"{EMP_TABLE_CANONICAL.lower()}_id") or lut.get("id") type_col = _emp_type_col(lut) if not id_col or not type_col: return False cur.execute( f""" SELECT {_quote_ident(type_col)} AS t FROM {_quote_ident(table)} WHERE {_quote_ident(id_col)} = %s LIMIT 1 """, (emp_id,), ) row = cur.fetchone() return bool(row and int(row["t"]) == EMP_TYPE_ADMIN) def _department_children_map(cur, db: str) -> dict[int, list[int]]: table = _resolve_table(cur, db, DEPARTMENT_TABLE) cols = _table_columns(cur, db, table) lut = _column_lookup(cols) id_col = _prefixed_col(lut, DEPARTMENT_TABLE, "id") parent_col = _prefixed_col(lut, DEPARTMENT_TABLE, "parent") if not id_col: return {} cur.execute(f"SELECT {_quote_ident(id_col)} AS id FROM {_quote_ident(table)}") all_ids = [int(r["id"]) for r in cur.fetchall()] children: dict[int, list[int]] = {i: [] for i in all_ids} if parent_col: cur.execute( f""" SELECT {_quote_ident(id_col)} AS id, {_quote_ident(parent_col)} AS parent FROM {_quote_ident(table)} """ ) for row in cur.fetchall(): pid = int(row["parent"] or 0) cid = int(row["id"]) if pid and pid in children: children[pid].append(cid) return children def _collect_sub_department_ids(root_id: int, children: dict[int, list[int]]) -> set[int]: out: set[int] = {root_id} stack = list(children.get(root_id, [])) while stack: dep = stack.pop() if dep in out: continue out.add(dep) stack.extend(children.get(dep, [])) return out def get_sub_emp_ids(cur, db: str, acting_emp_id: int) -> set[int]: """Порт Emp::getSubEmps — директора отделов и сотрудники поддерева.""" post_t = _resolve_table(cur, db, POST_TABLE) pcols = _table_columns(cur, db, post_t) plut = _column_lookup(pcols) p_emp = _prefixed_col(plut, POST_TABLE, "emp") p_dep = _prefixed_col(plut, POST_TABLE, "department") p_dir = _prefixed_col(plut, POST_TABLE, "director") if not all([p_emp, p_dep, p_dir]): return set() cur.execute( f""" SELECT {_quote_ident(p_dep)} AS dep FROM {_quote_ident(post_t)} WHERE {_quote_ident(p_emp)} = %s AND {_quote_ident(p_dir)} = 1 """, (acting_emp_id,), ) director_deps = [int(r["dep"]) for r in cur.fetchall() if r.get("dep")] if not director_deps: return set() children = _department_children_map(cur, db) dept_ids: set[int] = set() for dep in director_deps: dept_ids |= _collect_sub_department_ids(dep, children) if not dept_ids: return set() placeholders = ", ".join(["%s"] * len(dept_ids)) cur.execute( f""" SELECT DISTINCT {_quote_ident(p_emp)} AS emp FROM {_quote_ident(post_t)} WHERE {_quote_ident(p_dep)} IN ({placeholders}) """, tuple(dept_ids), ) return {int(r["emp"]) for r in cur.fetchall() if r.get("emp")} def write_other_table_write_ids(cur, db: str, acting_emp_id: int) -> set[int]: return get_sub_emp_ids(cur, db, acting_emp_id) def is_write_other_table_write(cur, db: str, acting_emp_id: int, target_emp_id: int) -> bool: if is_admin(cur, db, acting_emp_id): return True return target_emp_id in write_other_table_write_ids(cur, db, acting_emp_id) def _team_member_row(cur, db: str, emp_id: int, project_id: int) -> dict | None: proj_t = _resolve_table(cur, db, PROJECT_TABLE) pcols = _table_columns(cur, db, proj_t) plut = _column_lookup(pcols) p_id = _prefixed_col(plut, PROJECT_TABLE, "id") p_team = _prefixed_col(plut, PROJECT_TABLE, "team") p_archive = _prefixed_col(plut, PROJECT_TABLE, "archive") if not p_id or not p_team: return None mem_t = _resolve_table(cur, db, TEAM_MEMBER_TABLE) mcols = _table_columns(cur, db, mem_t) mlut = _column_lookup(mcols) m_emp = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "emp") m_team = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "team") m_active = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "active") if not m_emp or not m_team: return None archive_sql = ( f", p.{_quote_ident(p_archive)} AS archive" if p_archive else ", 0 AS archive" ) active_sql = ( f", m.{_quote_ident(m_active)} AS active" if m_active else ", 1 AS active" ) pq = _quote_ident(proj_t) mq = _quote_ident(mem_t) cur.execute( f""" SELECT m.{_quote_ident(m_emp)} AS emp {active_sql} {archive_sql} FROM {pq} p LEFT JOIN {mq} m ON m.{_quote_ident(m_team)} = p.{_quote_ident(p_team)} AND m.{_quote_ident(m_emp)} = %s WHERE p.{_quote_ident(p_id)} = %s LIMIT 1 """, (emp_id, project_id), ) return cur.fetchone() def can_read_time_calendar( cur, db: str, acting_emp_id: int, target_emp_id: int, project_id: int | None = None ) -> bool: if acting_emp_id == target_emp_id: return True if is_admin(cur, db, acting_emp_id): return True if is_write_other_table_write(cur, db, acting_emp_id, target_emp_id): return True return False def can_write_time_entry( cur, db: str, acting_emp_id: int, target_emp_id: int, project_id: int ) -> bool: if not project_id: return False if is_write_other_table_write(cur, db, acting_emp_id, target_emp_id): return True if acting_emp_id != target_emp_id: return False row = _team_member_row(cur, db, target_emp_id, project_id) if not row or not row.get("emp"): return False if int(row.get("archive") or 0): return False return bool(int(row.get("active") or 0)) def can_write_absence(cur, db: str, acting_emp_id: int, target_emp_id: int) -> bool: return can_read_time_calendar(cur, db, acting_emp_id, target_emp_id) def is_me_any_department_director(cur, db: str, acting_emp_id: int) -> bool: """Порт Emp::isMeAnyDepartmentDirector.""" post_t = _resolve_table(cur, db, POST_TABLE) pcols = _table_columns(cur, db, post_t) plut = _column_lookup(pcols) p_emp = _prefixed_col(plut, POST_TABLE, "emp") p_dir = _prefixed_col(plut, POST_TABLE, "director") if not p_emp or not p_dir: return False cur.execute( f""" SELECT 1 AS ok FROM {_quote_ident(post_t)} WHERE {_quote_ident(p_emp)} = %s AND {_quote_ident(p_dir)} = 1 LIMIT 1 """, (acting_emp_id,), ) return bool(cur.fetchone()) def get_role_in_project(cur, db: str, acting_emp_id: int, project_id: int) -> int: """Порт Rules::getRoleInProject — роль acting в команде проекта.""" proj_t = _resolve_table(cur, db, PROJECT_TABLE) pcols = _table_columns(cur, db, proj_t) plut = _column_lookup(pcols) p_id = _prefixed_col(plut, PROJECT_TABLE, "id") p_team = _prefixed_col(plut, PROJECT_TABLE, "team") if not p_id or not p_team: return 0 mem_t = _resolve_table(cur, db, TEAM_MEMBER_TABLE) mcols = _table_columns(cur, db, mem_t) mlut = _column_lookup(mcols) m_emp = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "emp") m_team = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "team") m_role = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "role") if not m_emp or not m_team or not m_role: return 0 cur.execute( f""" SELECT m.{_quote_ident(m_role)} AS role FROM {_quote_ident(proj_t)} p INNER JOIN {_quote_ident(mem_t)} m ON m.{_quote_ident(m_team)} = p.{_quote_ident(p_team)} AND m.{_quote_ident(m_emp)} = %s WHERE p.{_quote_ident(p_id)} = %s LIMIT 1 """, (acting_emp_id, project_id), ) row = cur.fetchone() return int(row["role"]) if row and row.get("role") is not None else 0 def is_me_director(cur, db: str, acting_emp_id: int, project_id: int) -> bool: """Порт Rules::isMeDirector.""" proj_t = _resolve_table(cur, db, PROJECT_TABLE) pcols = _table_columns(cur, db, proj_t) plut = _column_lookup(pcols) p_id = _prefixed_col(plut, PROJECT_TABLE, "id") p_director = _prefixed_col(plut, PROJECT_TABLE, "director") if not p_id or not p_director: return False cur.execute( f""" SELECT {_quote_ident(p_director)} AS director FROM {_quote_ident(proj_t)} WHERE {_quote_ident(p_id)} = %s LIMIT 1 """, (project_id,), ) row = cur.fetchone() return bool(row and int(row.get("director") or 0) == acting_emp_id) def can_write_project_member( cur, db: str, acting_emp_id: int, project_id: int, *, member_id: int | None = None, target_emp_id: int | None = None, ) -> bool: """Порт Rules::isRwMember для project_id.""" if is_admin(cur, db, acting_emp_id): return True if is_me_director(cur, db, acting_emp_id, project_id): return True if get_role_in_project(cur, db, acting_emp_id, project_id) == ROLE_MAIN_ENGINEER: return True sub_ids = get_sub_emp_ids(cur, db, acting_emp_id) is_edit = member_id is not None if not is_edit: project = fetch_project_row_for_permissions(cur, db, project_id) if project and target_emp_id: team_id = int(project.get("team") or 0) if team_id and fetch_team_member_row(cur, db, team_id, target_emp_id): is_edit = True if is_edit: edit_emp_id = target_emp_id if member_id and not edit_emp_id: member = fetch_member_by_id(cur, db, member_id) if member: edit_emp_id = int(member["emp"]) if edit_emp_id and edit_emp_id in sub_ids: return True return False return is_me_any_department_director(cur, db, acting_emp_id) def fetch_project_row_for_permissions(cur, db: str, project_id: int) -> dict | None: proj_t = _resolve_table(cur, db, PROJECT_TABLE) pcols = _table_columns(cur, db, proj_t) plut = _column_lookup(pcols) p_id = _prefixed_col(plut, PROJECT_TABLE, "id") p_team = _prefixed_col(plut, PROJECT_TABLE, "team") if not p_id: return None fields = [f"{_quote_ident(p_id)} AS id"] if p_team: fields.append(f"{_quote_ident(p_team)} AS team") cur.execute( f"SELECT {', '.join(fields)} FROM {_quote_ident(proj_t)}" f" WHERE {_quote_ident(p_id)} = %s LIMIT 1", (project_id,), ) return cur.fetchone()