import sys,os import uuid current_path = os.getcwd() sys.path.append(current_path) from config.site import SiteConfig from fastapi import APIRouter, Depends, Query from db.database import get_db from sqlalchemy.orm import Session from agent.models.web.response import StandardResponse,FAILED,SUCCESS from agent.models.web.request import BasicRequest from agent.libs.user import UserBusiness,SessionBusiness, UserRoleBusiness, RoleBusiness, PermissionBusiness from agent.libs.sys import SysUserRoleOrganBusiness import logging from pydantic import BaseModel from typing import Optional router = APIRouter(prefix="/user", tags=["agent job interface"]) logger = logging.getLogger(__name__) config = SiteConfig() @router.post("/session", response_model=StandardResponse) def register(request: BasicRequest, db: Session = Depends(get_db)): if request.action == 'register': biz = UserBusiness(db) request_username = request.get_param("username", "") request_password = request.get_param("password", "") user = biz.get_user_by_username(request_username) if user is not None: return StandardResponse(code=FAILED, message="user already exists") user = biz.create_user(request_username, request_password) if user is None: return StandardResponse(code=FAILED, message="create user failed") return StandardResponse(code=SUCCESS, message="create user success") elif request.action == 'modifyPWD': biz = UserBusiness(db) request_password = request.get_param("password", "") request_new_password = request.get_param("new_password", "") session_id = request.get_param("session_id", "") session = SessionBusiness(db) user_id = session.get_session(session_id).user_id user = biz.get_user(user_id) if user is None: return StandardResponse(code=FAILED, message="modify passward failed") if not biz.verify_password(request_password, user.hashed_password): return StandardResponse(code=FAILED, message="password error") biz.update_user(user_id, password=request_new_password) session.delete_session(session_id) return StandardResponse(code=SUCCESS, message="modify passward success") elif request.action =='login': request_username = request.get_param("username", "") request_password = request.get_param("password", "") logger.info(f"login: {request_username} {request_password}") biz = UserBusiness(db) user = biz.get_user_by_username(request_username) if user is None: return StandardResponse(code=FAILED, message="user not exists") if not biz.verify_password(request_password, user.hashed_password): return StandardResponse(code=FAILED, message="password error") session = SessionBusiness(db) old_session = session.get_session_by_user_id(user.id) if old_session is not None: logger.info("delete old session") session.delete_session(old_session.session_id) sysUserRoleOrganBusiness = SysUserRoleOrganBusiness(db); sysUserRoleOrgan = sysUserRoleOrganBusiness.get_last_use_SURO(user.id) if sysUserRoleOrgan is None: return StandardResponse(code=FAILED, message="请先设置权限后再登录") logger.info("create new session") new_session = session.create_session(user,sysUserRoleOrgan) # Get user roles and permissions user_role_biz = UserRoleBusiness(db) user_roles = user_role_biz.get_user_roles(sysUserRoleOrgan.role_id) user_menu_permissions = user_role_biz.get_role_menu_permissions(sysUserRoleOrgan.role_id) # Prepare roles and permissions for response roles_data = [{ "id": role.id, "name": role.name, "description": role.description } for role in user_roles] # permissions_data = [{ # "id": perm.id, # "name": perm.name, # "description": perm.description, # "menu_name": perm.menu_name, # "menu_route": perm.menu_route, # "menu_icon": perm.menu_icon, # "parent_id": perm.parent_id # } for perm in user_menu_permissions] # 构建权限字典,方便通过ID查找 permission_map = {p.id: { "id": p.id, "name": p.name, "description": p.description, "menu_name": p.menu_name, "menu_route": p.menu_route, "menu_icon": p.menu_icon, "parent_id": p.parent_id, "children": [] } for p in user_menu_permissions} # 构建树形结构 tree = [] for p_id, p_data in permission_map.items(): parent_id = p_data["parent_id"] if parent_id and parent_id in permission_map: permission_map[parent_id]["children"].append(p_data) else: tree.append(p_data) session_data = { "session_id": new_session.session_id, "user_id": new_session.user_id, "username": new_session.username, "full_name": new_session.full_name } return StandardResponse(code=SUCCESS, message="login success", records=[{"session": session_data, "roles": roles_data, "menu_permissions": tree}]) elif request.action == "login_session": session_id = request.get_param("session_id", "") session = SessionBusiness(db) old_session = session.get_session(session_id) if old_session is None: return StandardResponse(code=FAILED, message="session not exists") old_session_data = { "session_id": old_session.session_id, "user_id": old_session.user_id, "username": old_session.username, "full_name": old_session.full_name } return StandardResponse(code=SUCCESS, message="login success", records=[old_session_data]) elif request.action == "logout": session_id = request.get_param("session_id", "") session = SessionBusiness(db) session.delete_session(session_id) return StandardResponse(code=SUCCESS, message="logout success") @router.get("/logout/{session_id}", response_model=StandardResponse) def logout(session_id: str, db: Session = Depends(get_db)): session = SessionBusiness(db) session.delete_session(session_id) return StandardResponse(code=SUCCESS, message="logout success") @router.post("/signin", response_model=StandardResponse) def signin(request: BasicRequest, db: Session = Depends(get_db)): if request.action == 'signin': biz = UserBusiness(db) request_username = request.get_param("username", "") request_password = request.get_param("password", "") request_fullname = request.get_param("full_name", request_username) # 如果 full_name 未提供,则使用 username request_email = request.get_param("email", "") # 确保提供了用户名和密码 if not request_username or not request_password: return StandardResponse(code=FAILED, message="Username and password are required") user = biz.get_user_by_username(request_username) if user is not None: return StandardResponse(code=FAILED, message="用户名已存在") user = biz.create_user(username=request_username, password=request_password, fullname=request_fullname, email=request_email) if user is None: return StandardResponse(code=FAILED, message="创建用户失败") # 分配角色 user_role_biz = UserRoleBusiness(db) assigned = user_role_biz.assign_role_to_user(user.id, 10) # 分配角色ID为10 if not assigned: logger.warning(f"Failed to assign role 10 to user {user.id} during signin") # 即使角色分配失败,也认为注册成功 return StandardResponse(code=SUCCESS, data=user.to_dict(), message="成功创建用户,请继续登录") return StandardResponse(code=FAILED, message="invalid action") # Pydantic models for request bodies class RoleCreateWithPermissionsRequest(BaseModel): role_id: Optional[int] = None name: str description: Optional[str] = None permission_ids: list[int] = [] class PermissionCreateRequest(BaseModel): name: str description: Optional[str] = None menu_name: Optional[str] = None menu_route: Optional[str] = None menu_icon: Optional[str] = None parent_id: Optional[int] = None # Role Management Endpoints @router.post("/roles", response_model=StandardResponse) def create_role_with_permissions_endpoint(request: RoleCreateWithPermissionsRequest, db: Session = Depends(get_db)): role_id = request.role_id role_name = request.name role_description = request.description permission_ids = request.permission_ids role_biz = RoleBusiness(db) if role_id: # 修改现有角色 role = role_biz.get_role(role_id) if not role: return StandardResponse(code=FAILED, message="角色不存在") # 更新角色名称和描述(如果提供) if role_name and role_name != role.name: existing_role_by_name = role_biz.get_role_by_name(role_name) if existing_role_by_name and existing_role_by_name.id != role_id: return StandardResponse(code=FAILED, message="新角色名称已存在") role_biz.update_role(role_id, name=role_name) if role_description is not None and role_description != role.description: role_biz.update_role(role_id, description=role_description) # 撤销所有现有权限 role_biz.revoke_all_permissions_from_role(role_id) # 重新分配权限 success_permissions = [] failed_permissions = [] for permission_id in permission_ids: if role_biz.assign_permission_to_role(role.id, permission_id): success_permissions.append(permission_id) else: failed_permissions.append(permission_id) response_message = f"角色 '{role.name}' 更新成功" if success_permissions: response_message += f", 成功分配 {len(success_permissions)} 个权限" if failed_permissions: response_message += f", {len(failed_permissions)} 个权限分配失败" return StandardResponse( code=SUCCESS, message=response_message, records=[{ "id": role.id, "name": role.name, "success_permissions": success_permissions, "failed_permissions": failed_permissions }] ) else: # 新增角色 if not role_name: return StandardResponse(code=FAILED, message="角色名称不能为空") existing_role = role_biz.get_role_by_name(role_name) if existing_role: return StandardResponse(code=FAILED, message="角色已存在") # 创建角色 role = role_biz.create_role(role_name, role_description) if not role: return StandardResponse(code=FAILED, message="创建角色失败") # 分配权限 success_permissions = [] failed_permissions = [] for permission_id in permission_ids: if role_biz.assign_permission_to_role(role.id, permission_id): success_permissions.append(permission_id) else: failed_permissions.append(permission_id) response_message = f"角色创建成功" if success_permissions: response_message += f", 成功分配 {len(success_permissions)} 个权限" if failed_permissions: response_message += f", {len(failed_permissions)} 个权限分配失败" return StandardResponse( code=SUCCESS, message=response_message, records=[{ "id": role.id, "name": role.name, "success_permissions": success_permissions, "failed_permissions": failed_permissions }] ) @router.get("/roles", response_model=StandardResponse) def get_roles_endpoint(db: Session = Depends(get_db)): role_biz = RoleBusiness(db) roles = role_biz.get_all_roles() roles_data = [{ "id": role.id, "name": role.name, "description": role.description, "permission_ids": [perm.id for perm in role_biz.get_role_permissions(role.id)] } for role in roles] return StandardResponse(code=SUCCESS, message="角色列表获取成功", records=roles_data) # Permission Management Endpoints @router.post("/permissions", response_model=StandardResponse) def create_permission_endpoint(request: PermissionCreateRequest, db: Session = Depends(get_db)): perm_name = request.name perm_desc = request.description menu_name = request.menu_name menu_route = request.menu_route menu_icon = request.menu_icon parent_id = request.parent_id if not perm_name: return StandardResponse(code=FAILED, message="Permission name is required") perm_biz = PermissionBusiness(db) existing_perm = perm_biz.get_permission_by_name(perm_name) if existing_perm: return StandardResponse(code=FAILED, message="Permission already exists") permission = perm_biz.create_permission(perm_name, perm_desc, menu_name, menu_route, menu_icon, parent_id) if permission: return StandardResponse(code=SUCCESS, message="Permission created successfully", records=[{"id": permission.id, "name": permission.name}]) return StandardResponse(code=FAILED, message="Failed to create permission") @router.get("/permissions", response_model=StandardResponse) def get_permissions_endpoint(db: Session = Depends(get_db)): perm_biz = PermissionBusiness(db) permissions = perm_biz.get_all_permissions() # 构建权限字典,方便通过ID查找 permission_map = {p.id: { "id": p.id, "name": p.name, "description": p.description, "menu_name": p.menu_name, "menu_route": p.menu_route, "menu_icon": p.menu_icon, "parent_id": p.parent_id, "children": [] } for p in permissions} # 构建树形结构 tree = [] for p_id, p_data in permission_map.items(): parent_id = p_data["parent_id"] if parent_id and parent_id in permission_map: permission_map[parent_id]["children"].append(p_data) else: tree.append(p_data) return StandardResponse(code=SUCCESS, message="权限列表获取成功", records=tree) class UserRoleAssignmentRequest(BaseModel): user_id: int role_ids: list[int] @router.post("/users/roles", response_model=StandardResponse) def assign_roles_to_user_endpoint(request: UserRoleAssignmentRequest, db: Session = Depends(get_db)): user_role_biz = UserRoleBusiness(db) user_id = request.user_id new_role_ids = set(request.role_ids) # 获取用户当前的角色ID current_roles = user_role_biz.get_user_roles(user_id) current_role_ids = {role.id for role in current_roles} if current_roles else set() # 需要添加的角色 roles_to_add = list(new_role_ids - current_role_ids) # 需要移除的角色 roles_to_remove = list(current_role_ids - new_role_ids) success_add_count = 0 failed_add_roles = [] for role_id in roles_to_add: if user_role_biz.assign_role_to_user(user_id, role_id): success_add_count += 1 else: failed_add_roles.append(role_id) success_remove_count = 0 failed_remove_roles = [] for role_id in roles_to_remove: if user_role_biz.revoke_role_from_user(user_id, role_id): success_remove_count += 1 else: failed_remove_roles.append(role_id) message = f"角色分配更新完成。成功添加 {success_add_count} 个角色,成功移除 {success_remove_count} 个角色。" if failed_add_roles: message += f" 添加失败的角色ID: {failed_add_roles}." if failed_remove_roles: message += f" 移除失败的角色ID: {failed_remove_roles}." if not failed_add_roles and not failed_remove_roles: return StandardResponse(code=SUCCESS, message="用户角色更新成功") else: return StandardResponse(code=FAILED, message=message) @router.get("/users", response_model=StandardResponse) def get_users_endpoint( db: Session = Depends(get_db), userName: Optional[str] = Query(None, description="用户名,用于模糊查询"), pageNo: int = Query(1, ge=1, description="页码,从1开始"), pageSize: int = Query(10, ge=1, le=100, description="每页数量,最大100") ): user_biz = UserBusiness(db) user_role_biz = UserRoleBusiness(db) paginated_users, total_count = user_biz.get_users_paginated(userName, pageNo, pageSize) users_data = [] for user in paginated_users: roles = user_role_biz.get_user_roles(user.id) role_ids = [role.id for role in roles] if roles else [] users_data.append({ "id": user.id, "username": user.username, "full_name": user.full_name, "email": user.email, "role_ids": role_ids }) return StandardResponse(code=SUCCESS, message="用户列表获取成功", records=users_data, total=total_count) user_router = router