340 lines
11 KiB
Python
340 lines
11 KiB
Python
"""Права на табель — порт Rules + getTimeTable can_edit."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from app.emp_schema import EMP_TABLE_CANONICAL
|
|
from app.labor import _column_lookup, _prefixed_col, _resolve_table
|
|
from app.main import _quote_ident, _table_columns, resolve_emp_table
|
|
from app.project_members_helpers import fetch_member_by_id, fetch_team_member_row
|
|
from app.project_members_permissions_logic import evaluate_is_rw_member
|
|
from app.merakomis_schema import (
|
|
DEPARTMENT_TABLE,
|
|
EMP_TYPE_ADMIN,
|
|
POST_TABLE,
|
|
PROJECT_TABLE,
|
|
ROLE_MAIN_ENGINEER,
|
|
TEAM_MEMBER_TABLE,
|
|
)
|
|
|
|
|
|
def _emp_type_col(lut: dict[str, str]) -> str | None:
|
|
return lut.get(f"{EMP_TABLE_CANONICAL.lower()}_type") or lut.get("type")
|
|
|
|
|
|
def is_admin(cur, db: str, emp_id: int) -> bool:
|
|
table = resolve_emp_table(cur, db)
|
|
cols = _table_columns(cur, db, table)
|
|
lut = _column_lookup(cols)
|
|
id_col = lut.get(f"{EMP_TABLE_CANONICAL.lower()}_id") or lut.get("id")
|
|
type_col = _emp_type_col(lut)
|
|
if not id_col or not type_col:
|
|
return False
|
|
cur.execute(
|
|
f"""
|
|
SELECT {_quote_ident(type_col)} AS t FROM {_quote_ident(table)}
|
|
WHERE {_quote_ident(id_col)} = %s LIMIT 1
|
|
""",
|
|
(emp_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return bool(row and int(row["t"]) == EMP_TYPE_ADMIN)
|
|
|
|
|
|
def _department_children_map(cur, db: str) -> dict[int, list[int]]:
|
|
table = _resolve_table(cur, db, DEPARTMENT_TABLE)
|
|
cols = _table_columns(cur, db, table)
|
|
lut = _column_lookup(cols)
|
|
id_col = _prefixed_col(lut, DEPARTMENT_TABLE, "id")
|
|
parent_col = _prefixed_col(lut, DEPARTMENT_TABLE, "parent")
|
|
if not id_col:
|
|
return {}
|
|
cur.execute(f"SELECT {_quote_ident(id_col)} AS id FROM {_quote_ident(table)}")
|
|
all_ids = [int(r["id"]) for r in cur.fetchall()]
|
|
children: dict[int, list[int]] = {i: [] for i in all_ids}
|
|
if parent_col:
|
|
cur.execute(
|
|
f"""
|
|
SELECT {_quote_ident(id_col)} AS id, {_quote_ident(parent_col)} AS parent
|
|
FROM {_quote_ident(table)}
|
|
"""
|
|
)
|
|
for row in cur.fetchall():
|
|
pid = int(row["parent"] or 0)
|
|
cid = int(row["id"])
|
|
if pid and pid in children:
|
|
children[pid].append(cid)
|
|
return children
|
|
|
|
|
|
def _collect_sub_department_ids(root_id: int, children: dict[int, list[int]]) -> set[int]:
|
|
out: set[int] = {root_id}
|
|
stack = list(children.get(root_id, []))
|
|
while stack:
|
|
dep = stack.pop()
|
|
if dep in out:
|
|
continue
|
|
out.add(dep)
|
|
stack.extend(children.get(dep, []))
|
|
return out
|
|
|
|
|
|
def get_sub_emp_ids(cur, db: str, acting_emp_id: int) -> set[int]:
|
|
"""Порт Emp::getSubEmps — директора отделов и сотрудники поддерева."""
|
|
post_t = _resolve_table(cur, db, POST_TABLE)
|
|
pcols = _table_columns(cur, db, post_t)
|
|
plut = _column_lookup(pcols)
|
|
p_emp = _prefixed_col(plut, POST_TABLE, "emp")
|
|
p_dep = _prefixed_col(plut, POST_TABLE, "department")
|
|
p_dir = _prefixed_col(plut, POST_TABLE, "director")
|
|
if not all([p_emp, p_dep, p_dir]):
|
|
return set()
|
|
|
|
cur.execute(
|
|
f"""
|
|
SELECT {_quote_ident(p_dep)} AS dep
|
|
FROM {_quote_ident(post_t)}
|
|
WHERE {_quote_ident(p_emp)} = %s AND {_quote_ident(p_dir)} = 1
|
|
""",
|
|
(acting_emp_id,),
|
|
)
|
|
director_deps = [int(r["dep"]) for r in cur.fetchall() if r.get("dep")]
|
|
if not director_deps:
|
|
return set()
|
|
|
|
children = _department_children_map(cur, db)
|
|
dept_ids: set[int] = set()
|
|
for dep in director_deps:
|
|
dept_ids |= _collect_sub_department_ids(dep, children)
|
|
if not dept_ids:
|
|
return set()
|
|
|
|
placeholders = ", ".join(["%s"] * len(dept_ids))
|
|
cur.execute(
|
|
f"""
|
|
SELECT DISTINCT {_quote_ident(p_emp)} AS emp
|
|
FROM {_quote_ident(post_t)}
|
|
WHERE {_quote_ident(p_dep)} IN ({placeholders})
|
|
""",
|
|
tuple(dept_ids),
|
|
)
|
|
return {int(r["emp"]) for r in cur.fetchall() if r.get("emp")}
|
|
|
|
|
|
def write_other_table_write_ids(cur, db: str, acting_emp_id: int) -> set[int]:
|
|
return get_sub_emp_ids(cur, db, acting_emp_id)
|
|
|
|
|
|
def is_write_other_table_write(cur, db: str, acting_emp_id: int, target_emp_id: int) -> bool:
|
|
if is_admin(cur, db, acting_emp_id):
|
|
return True
|
|
return target_emp_id in write_other_table_write_ids(cur, db, acting_emp_id)
|
|
|
|
|
|
def _team_member_row(cur, db: str, emp_id: int, project_id: int) -> dict | None:
|
|
proj_t = _resolve_table(cur, db, PROJECT_TABLE)
|
|
pcols = _table_columns(cur, db, proj_t)
|
|
plut = _column_lookup(pcols)
|
|
p_id = _prefixed_col(plut, PROJECT_TABLE, "id")
|
|
p_team = _prefixed_col(plut, PROJECT_TABLE, "team")
|
|
p_archive = _prefixed_col(plut, PROJECT_TABLE, "archive")
|
|
if not p_id or not p_team:
|
|
return None
|
|
|
|
mem_t = _resolve_table(cur, db, TEAM_MEMBER_TABLE)
|
|
mcols = _table_columns(cur, db, mem_t)
|
|
mlut = _column_lookup(mcols)
|
|
m_emp = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "emp")
|
|
m_team = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "team")
|
|
m_active = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "active")
|
|
if not m_emp or not m_team:
|
|
return None
|
|
|
|
archive_sql = (
|
|
f", p.{_quote_ident(p_archive)} AS archive" if p_archive else ", 0 AS archive"
|
|
)
|
|
active_sql = (
|
|
f", m.{_quote_ident(m_active)} AS active" if m_active else ", 1 AS active"
|
|
)
|
|
pq = _quote_ident(proj_t)
|
|
mq = _quote_ident(mem_t)
|
|
cur.execute(
|
|
f"""
|
|
SELECT m.{_quote_ident(m_emp)} AS emp {active_sql} {archive_sql}
|
|
FROM {pq} p
|
|
LEFT JOIN {mq} m
|
|
ON m.{_quote_ident(m_team)} = p.{_quote_ident(p_team)}
|
|
AND m.{_quote_ident(m_emp)} = %s
|
|
WHERE p.{_quote_ident(p_id)} = %s
|
|
LIMIT 1
|
|
""",
|
|
(emp_id, project_id),
|
|
)
|
|
return cur.fetchone()
|
|
|
|
|
|
def can_read_time_calendar(
|
|
cur, db: str, acting_emp_id: int, target_emp_id: int, project_id: int | None = None
|
|
) -> bool:
|
|
if acting_emp_id == target_emp_id:
|
|
return True
|
|
if is_admin(cur, db, acting_emp_id):
|
|
return True
|
|
if is_write_other_table_write(cur, db, acting_emp_id, target_emp_id):
|
|
return True
|
|
return False
|
|
|
|
|
|
def can_write_time_entry(
|
|
cur, db: str, acting_emp_id: int, target_emp_id: int, project_id: int
|
|
) -> bool:
|
|
if not project_id:
|
|
return False
|
|
if is_write_other_table_write(cur, db, acting_emp_id, target_emp_id):
|
|
return True
|
|
if acting_emp_id != target_emp_id:
|
|
return False
|
|
row = _team_member_row(cur, db, target_emp_id, project_id)
|
|
if not row or not row.get("emp"):
|
|
return False
|
|
if int(row.get("archive") or 0):
|
|
return False
|
|
return bool(int(row.get("active") or 0))
|
|
|
|
|
|
def can_write_absence(cur, db: str, acting_emp_id: int, target_emp_id: int) -> bool:
|
|
return can_read_time_calendar(cur, db, acting_emp_id, target_emp_id)
|
|
|
|
|
|
def is_me_any_department_director(cur, db: str, acting_emp_id: int) -> bool:
|
|
"""Порт Emp::isMeAnyDepartmentDirector."""
|
|
post_t = _resolve_table(cur, db, POST_TABLE)
|
|
pcols = _table_columns(cur, db, post_t)
|
|
plut = _column_lookup(pcols)
|
|
p_emp = _prefixed_col(plut, POST_TABLE, "emp")
|
|
p_dir = _prefixed_col(plut, POST_TABLE, "director")
|
|
if not p_emp or not p_dir:
|
|
return False
|
|
cur.execute(
|
|
f"""
|
|
SELECT 1 AS ok FROM {_quote_ident(post_t)}
|
|
WHERE {_quote_ident(p_emp)} = %s AND {_quote_ident(p_dir)} = 1
|
|
LIMIT 1
|
|
""",
|
|
(acting_emp_id,),
|
|
)
|
|
return bool(cur.fetchone())
|
|
|
|
|
|
def get_role_in_project(cur, db: str, acting_emp_id: int, project_id: int) -> int:
|
|
"""Порт Rules::getRoleInProject — роль acting в команде проекта."""
|
|
proj_t = _resolve_table(cur, db, PROJECT_TABLE)
|
|
pcols = _table_columns(cur, db, proj_t)
|
|
plut = _column_lookup(pcols)
|
|
p_id = _prefixed_col(plut, PROJECT_TABLE, "id")
|
|
p_team = _prefixed_col(plut, PROJECT_TABLE, "team")
|
|
if not p_id or not p_team:
|
|
return 0
|
|
|
|
mem_t = _resolve_table(cur, db, TEAM_MEMBER_TABLE)
|
|
mcols = _table_columns(cur, db, mem_t)
|
|
mlut = _column_lookup(mcols)
|
|
m_emp = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "emp")
|
|
m_team = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "team")
|
|
m_role = _prefixed_col(mlut, TEAM_MEMBER_TABLE, "role")
|
|
if not m_emp or not m_team or not m_role:
|
|
return 0
|
|
|
|
cur.execute(
|
|
f"""
|
|
SELECT m.{_quote_ident(m_role)} AS role
|
|
FROM {_quote_ident(proj_t)} p
|
|
INNER JOIN {_quote_ident(mem_t)} m
|
|
ON m.{_quote_ident(m_team)} = p.{_quote_ident(p_team)}
|
|
AND m.{_quote_ident(m_emp)} = %s
|
|
WHERE p.{_quote_ident(p_id)} = %s
|
|
LIMIT 1
|
|
""",
|
|
(acting_emp_id, project_id),
|
|
)
|
|
row = cur.fetchone()
|
|
return int(row["role"]) if row and row.get("role") is not None else 0
|
|
|
|
|
|
def is_me_director(cur, db: str, acting_emp_id: int, project_id: int) -> bool:
|
|
"""Порт Rules::isMeDirector."""
|
|
proj_t = _resolve_table(cur, db, PROJECT_TABLE)
|
|
pcols = _table_columns(cur, db, proj_t)
|
|
plut = _column_lookup(pcols)
|
|
p_id = _prefixed_col(plut, PROJECT_TABLE, "id")
|
|
p_director = _prefixed_col(plut, PROJECT_TABLE, "director")
|
|
if not p_id or not p_director:
|
|
return False
|
|
cur.execute(
|
|
f"""
|
|
SELECT {_quote_ident(p_director)} AS director
|
|
FROM {_quote_ident(proj_t)}
|
|
WHERE {_quote_ident(p_id)} = %s LIMIT 1
|
|
""",
|
|
(project_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return bool(row and int(row.get("director") or 0) == acting_emp_id)
|
|
|
|
|
|
def can_write_project_member(
|
|
cur,
|
|
db: str,
|
|
acting_emp_id: int,
|
|
project_id: int,
|
|
*,
|
|
member_id: int | None = None,
|
|
target_emp_id: int | None = None,
|
|
) -> bool:
|
|
"""Порт Rules::isRwMember для project_id."""
|
|
if is_admin(cur, db, acting_emp_id):
|
|
return True
|
|
if is_me_director(cur, db, acting_emp_id, project_id):
|
|
return True
|
|
if get_role_in_project(cur, db, acting_emp_id, project_id) == ROLE_MAIN_ENGINEER:
|
|
return True
|
|
|
|
sub_ids = get_sub_emp_ids(cur, db, acting_emp_id)
|
|
is_edit = member_id is not None
|
|
if not is_edit:
|
|
project = fetch_project_row_for_permissions(cur, db, project_id)
|
|
if project and target_emp_id:
|
|
team_id = int(project.get("team") or 0)
|
|
if team_id and fetch_team_member_row(cur, db, team_id, target_emp_id):
|
|
is_edit = True
|
|
|
|
if is_edit:
|
|
edit_emp_id = target_emp_id
|
|
if member_id and not edit_emp_id:
|
|
member = fetch_member_by_id(cur, db, member_id)
|
|
if member:
|
|
edit_emp_id = int(member["emp"])
|
|
if edit_emp_id and edit_emp_id in sub_ids:
|
|
return True
|
|
return False
|
|
|
|
return is_me_any_department_director(cur, db, acting_emp_id)
|
|
|
|
|
|
def fetch_project_row_for_permissions(cur, db: str, project_id: int) -> dict | None:
|
|
proj_t = _resolve_table(cur, db, PROJECT_TABLE)
|
|
pcols = _table_columns(cur, db, proj_t)
|
|
plut = _column_lookup(pcols)
|
|
p_id = _prefixed_col(plut, PROJECT_TABLE, "id")
|
|
p_team = _prefixed_col(plut, PROJECT_TABLE, "team")
|
|
if not p_id:
|
|
return None
|
|
fields = [f"{_quote_ident(p_id)} AS id"]
|
|
if p_team:
|
|
fields.append(f"{_quote_ident(p_team)} AS team")
|
|
cur.execute(
|
|
f"SELECT {', '.join(fields)} FROM {_quote_ident(proj_t)}"
|
|
f" WHERE {_quote_ident(p_id)} = %s LIMIT 1",
|
|
(project_id,),
|
|
)
|
|
return cur.fetchone()
|