meraproject/services/user-reader/app/project_members_write.py

248 lines
7.7 KiB
Python
Raw Permalink Normal View History

"""Write API составов проектов."""
from __future__ import annotations
import os
import traceback
from typing import Annotated, Any
import pymysql
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from app.labor import _column_lookup, _prefixed_col, _resolve_table
from app.labor_identity import ensure_emp_exists, parse_acting_emp_id
from app.labor_permissions import can_write_project_member
from app.main import _db_name, _quote_ident, _table_columns, get_conn, require_api_key
from app.member_roles import is_valid_member_role
from app.merakomis_schema import TEAM_MEMBER_TABLE
from app.project_members_helpers import (
emp_is_active_for_team,
fetch_member_item,
fetch_project_row,
fetch_team_member_row,
project_section_ids,
)
router = APIRouter()
class ProjectMemberWrite(BaseModel):
    # Request body accepted by the PUT /api/project-members endpoint.
    # All integer identifiers are constrained to be >= 1 by the Field(ge=1) rule.

    # Project whose member list is being modified.
    project_id: int = Field(..., ge=1)
    # Employee being added to (or updated in) the project team.
    emp_id: int = Field(..., ge=1)
    # Section for the member; validate_member_write requires it to be one of
    # the ids returned by project_section_ids for this project.
    section_id: int = Field(..., ge=1)
    # Role in the project; validate_member_write checks it with is_valid_member_role.
    role: int = Field(..., ge=1)
    # Activity flag; upsert_team_member stores it as 1 or 0.
    active: bool = True
    # Free-form text stored alongside the membership; empty by default.
    text: str = ""
def _http_500(e: Exception) -> HTTPException:
    """Build (not raise) an HTTP 500 error describing *e*.

    The detail is ``str(e)``. When the ``DEBUG`` environment variable equals
    ``"1"`` the currently formatted traceback is appended on a new line.
    """
    debug_enabled = os.environ.get("DEBUG", "") == "1"
    detail = str(e)
    if debug_enabled:
        # format_exc() reports the exception currently being handled, so this
        # helper is meant to be called from inside an ``except`` block.
        detail = f"{detail}\n{traceback.format_exc()}"
    return HTTPException(status_code=500, detail=detail)
def _member_columns(cur, db: str) -> dict[str, str]:
    """Resolve the team-member table and the columns this module uses.

    Returns a dict with the resolved table name under ``"table"`` plus one
    entry per logical field (``id``, ``emp``, ``team``, ``section``, ``role``,
    ``active``, ``text``, ``portal``) for which ``_prefixed_col`` found a
    column. Fields without a match are simply absent from the result.
    """
    table_name = _resolve_table(cur, db, TEAM_MEMBER_TABLE)
    lookup = _column_lookup(_table_columns(cur, db, table_name))

    resolved: dict[str, str] = {"table": table_name}
    logical_fields = ("id", "emp", "team", "section", "role", "active", "text", "portal")
    for logical in logical_fields:
        physical = _prefixed_col(lookup, TEAM_MEMBER_TABLE, logical)
        if not physical:
            # Column is not present in this schema; callers test membership.
            continue
        resolved[logical] = physical
    return resolved
def validate_member_write(
    cur,
    db: str,
    body: ProjectMemberWrite,
) -> tuple[dict[str, Any], list[int]]:
    """Validate a PUT body against the database.

    Checks run in a fixed order and the first failure is raised as an
    ``HTTPException`` whose detail is a ``{"code", "message"}`` dict:

    1. role is valid (400 ``invalid_role``)
    2. project exists (404 ``project_not_found``)
    3. target employee exists (404 ``emp_not_found``)
    4. target employee is active for teams (400 ``invalid_employee``)
    5. project has sections (400 ``no_project_sections``)
    6. requested section belongs to the project (400 ``invalid_section``)
    7. project has a team (400 ``no_team``)

    Returns the project row and the project's allowed section ids.
    """

    def failure(status: int, code: str, message: str) -> HTTPException:
        # Single place that shapes the structured error payload.
        return HTTPException(
            status_code=status,
            detail={"code": code, "message": message},
        )

    if not is_valid_member_role(body.role):
        raise failure(400, "invalid_role", "Укажите роль в проекте")

    project_row = fetch_project_row(cur, db, body.project_id)
    if not project_row:
        raise failure(404, "project_not_found", "Проект не найден")

    try:
        ensure_emp_exists(cur, db, body.emp_id, kind="target")
    except HTTPException as exc:
        if exc.status_code != 404:
            # Anything other than "not found" is passed through untouched.
            raise
        raise failure(
            404,
            "emp_not_found",
            f"Сотрудник {body.emp_id} не найден",
        ) from exc

    if not emp_is_active_for_team(cur, db, body.emp_id):
        raise failure(
            400,
            "invalid_employee",
            "Нельзя добавить архивного или удалённого сотрудника",
        )

    section_ids = project_section_ids(cur, db, body.project_id)
    if not section_ids:
        raise failure(400, "no_project_sections", "У проекта не настроены разделы")
    if body.section_id not in section_ids:
        raise failure(400, "invalid_section", "Раздел не входит в состав проекта")

    # A missing, NULL or zero team value all count as "no team".
    if not int(project_row.get("team") or 0):
        raise failure(400, "no_team", "У проекта не задана команда")

    return project_row, section_ids
def upsert_team_member(
    cur,
    db: str,
    *,
    team_id: int,
    emp_id: int,
    section_id: int,
    role: int,
    active: bool,
    text: str,
) -> int:
    """Insert or update the team-member row for (team_id, emp_id).

    If ``fetch_team_member_row`` finds an existing row, its section, role,
    active flag and text are updated (only for columns that were resolved)
    and the existing id is returned. Otherwise a new row is inserted with the
    portal column set to 0 and the cursor's ``lastrowid`` is returned.

    The statement is executed on *cur*; committing is left to the caller.
    """
    columns = _member_columns(cur, db)
    quoted_table = _quote_ident(columns["table"])
    current = fetch_team_member_row(cur, db, team_id, emp_id)
    active_flag = int(bool(active))

    if current:
        member_id = int(current["id"])
        # Only columns that exist in the resolved schema take part.
        changes = [
            (_quote_ident(columns[name]), value)
            for name, value in (
                ("section", section_id),
                ("role", role),
                ("active", active_flag),
                ("text", text),
            )
            if name in columns
        ]
        if changes:
            assignments = ", ".join(f"{column} = %s" for column, _ in changes)
            update_params: list[Any] = [value for _, value in changes]
            update_params.append(member_id)
            id_column = _quote_ident(columns["id"])
            cur.execute(
                f"UPDATE {quoted_table} SET {assignments} WHERE {id_column} = %s",
                update_params,
            )
        return member_id

    new_row = [
        (_quote_ident(columns[name]), value)
        for name, value in (
            ("emp", emp_id),
            ("team", team_id),
            ("section", section_id),
            ("role", role),
            ("active", active_flag),
            ("text", text),
            ("portal", 0),
        )
        if name in columns
    ]
    column_list = ", ".join(column for column, _ in new_row)
    insert_params: list[Any] = [value for _, value in new_row]
    placeholders = ", ".join(["%s"] * len(insert_params))
    cur.execute(
        f"INSERT INTO {quoted_table} ({column_list}) VALUES ({placeholders})",
        insert_params,
    )
    return int(cur.lastrowid)
@router.put("/api/project-members")
def put_project_member(
    body: ProjectMemberWrite,
    _auth: Annotated[None, Depends(require_api_key)],
    acting_emp_id: Annotated[int, Depends(parse_acting_emp_id)],
) -> dict[str, Any]:
    # Add or update one project member.
    #
    # Flow: confirm the acting employee exists, validate the body, check write
    # permission, upsert the team-member row, commit, then re-read the member
    # item and return it merged with {"ok": True}.
    #
    # HTTPException is passed through unchanged; pymysql errors become a 500
    # carrying str(e); any other exception goes through _http_500.
    try:
        with get_conn() as conn, conn.cursor() as cur:
            db = _db_name(cur)
            ensure_emp_exists(cur, db, acting_emp_id, kind="acting")

            project, _allowed = validate_member_write(cur, db, body)
            team_id = int(project["team"])

            # An existing membership row (if any) is passed to the permission check.
            current = fetch_team_member_row(cur, db, team_id, body.emp_id)
            member_id = None if not current else int(current["id"])

            permitted = can_write_project_member(
                cur,
                db,
                acting_emp_id,
                body.project_id,
                member_id=member_id,
                target_emp_id=body.emp_id,
            )
            if not permitted:
                raise HTTPException(
                    status_code=403,
                    detail={
                        "code": "forbidden",
                        "message": "Нет прав на изменение состава проекта",
                    },
                )

            upsert_team_member(
                cur,
                db,
                team_id=team_id,
                emp_id=body.emp_id,
                section_id=body.section_id,
                role=body.role,
                active=body.active,
                text=body.text or "",
            )
            conn.commit()

            saved = fetch_member_item(cur, db, body.project_id, body.emp_id)
            if saved:
                return {"ok": True, **saved}
            # The write was committed but the row cannot be read back.
            raise HTTPException(
                status_code=500,
                detail={"code": "db_error", "message": "Запись не найдена после сохранения"},
            )
    except HTTPException:
        raise
    except pymysql.Error as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    except Exception as e:
        raise _http_500(e) from e