|
- import sys,os
- import uuid
- current_path = os.getcwd()
- sys.path.append(current_path)
- from config.site import SiteConfig
- from fastapi import APIRouter, Depends, Query
- from db.database import get_db
- from sqlalchemy.orm import Session
- from agent.models.web.response import StandardResponse,FAILED,SUCCESS
- from agent.models.web.request import BasicRequest
- from agent.libs.user import UserBusiness,SessionBusiness, UserRoleBusiness, RoleBusiness, PermissionBusiness
- import logging
- from pydantic import BaseModel
- from typing import Optional
# Router for every endpoint defined in this module; all routes are mounted under "/user".
router = APIRouter(prefix="/user", tags=["agent job interface"])
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Site configuration instance.
# NOTE(review): not referenced by any code visible in this file - confirm it is still needed.
config = SiteConfig()
@router.post("/session", response_model=StandardResponse)
def register(request: BasicRequest, db: Session = Depends(get_db)):
    """Dispatch a session-related request according to ``request.action``.

    Supported actions:

    * ``register``: create a user from the ``username`` / ``password``
      parameters. Fails when either is empty, when the username is already
      taken, or when creation returns ``None``.
    * ``login``: verify the credentials, delete the user's existing session
      (if any), create a new one and return it together with the user's
      roles and the user's menu permissions arranged as a tree.
    * ``login_session``: look up an existing session by ``session_id`` and
      return its data.
    * ``logout``: delete the session identified by ``session_id``.

    Any other action produces a ``FAILED`` response with the message
    ``"invalid action"`` instead of falling through and returning ``None``,
    which would not satisfy the declared ``response_model``.

    Args:
        request: Request envelope carrying ``action`` and the parameters
            read through ``get_param``.
        db: Database session injected by FastAPI.

    Returns:
        A ``StandardResponse`` whose ``code`` is ``SUCCESS`` or ``FAILED``.
        On a successful login ``records`` holds a single dict with the keys
        ``session``, ``roles`` and ``menu_permissions``.
    """
    action = request.action

    if action == 'register':
        biz = UserBusiness(db)
        request_username = request.get_param("username", "")
        request_password = request.get_param("password", "")
        # Same precondition the /signin endpoint enforces: never create an
        # account with an empty username or password.
        if not request_username or not request_password:
            return StandardResponse(code=FAILED, message="Username and password are required")
        if biz.get_user_by_username(request_username) is not None:
            return StandardResponse(code=FAILED, message="user already exists")
        user = biz.create_user(request_username, request_password)
        if user is None:
            return StandardResponse(code=FAILED, message="create user failed")
        return StandardResponse(code=SUCCESS, message="create user success")

    if action == 'login':
        request_username = request.get_param("username", "")
        request_password = request.get_param("password", "")
        # Log the username only: credentials must never reach the log.
        logger.info("login: %s", request_username)
        biz = UserBusiness(db)
        user = biz.get_user_by_username(request_username)
        if user is None:
            return StandardResponse(code=FAILED, message="user not exists")
        if not biz.verify_password(request_password, user.hashed_password):
            return StandardResponse(code=FAILED, message="password error")

        # Replace the user's previous session, if there is one.
        session = SessionBusiness(db)
        old_session = session.get_session_by_user_id(user.id)
        if old_session is not None:
            logger.info("delete old session")
            session.delete_session(old_session.session_id)

        logger.info("create new session")
        new_session = session.create_session(user)

        # Get user roles and permissions
        user_role_biz = UserRoleBusiness(db)
        user_roles = user_role_biz.get_user_roles(user.id)
        user_menu_permissions = user_role_biz.get_user_menu_permissions(user.id)

        # Prepare roles for the response
        roles_data = [{
            "id": role.id,
            "name": role.name,
            "description": role.description
        } for role in user_roles]

        # Index the permissions by id so a parent can be found directly.
        permission_map = {p.id: {
            "id": p.id, "name": p.name, "description": p.description,
            "menu_name": p.menu_name, "menu_route": p.menu_route,
            "menu_icon": p.menu_icon, "parent_id": p.parent_id,
            "children": []
        } for p in user_menu_permissions}

        # Build the tree: a node whose parent is among the user's permissions
        # is nested under that parent, every other node becomes a root.
        tree = []
        for p_data in permission_map.values():
            parent_id = p_data["parent_id"]
            if parent_id and parent_id in permission_map:
                permission_map[parent_id]["children"].append(p_data)
            else:
                tree.append(p_data)

        session_data = {
            "session_id": new_session.session_id,
            "user_id": new_session.user_id,
            "username": new_session.username,
            "full_name": new_session.full_name
        }
        return StandardResponse(
            code=SUCCESS,
            message="login success",
            records=[{"session": session_data, "roles": roles_data, "menu_permissions": tree}],
        )

    if action == "login_session":
        session_id = request.get_param("session_id", "")
        session = SessionBusiness(db)
        old_session = session.get_session(session_id)
        if old_session is None:
            return StandardResponse(code=FAILED, message="session not exists")

        old_session_data = {
            "session_id": old_session.session_id,
            "user_id": old_session.user_id,
            "username": old_session.username,
            "full_name": old_session.full_name
        }
        return StandardResponse(code=SUCCESS, message="login success", records=[old_session_data])

    if action == "logout":
        session_id = request.get_param("session_id", "")
        session = SessionBusiness(db)
        session.delete_session(session_id)
        return StandardResponse(code=SUCCESS, message="logout success")

    # Unknown action: answer explicitly rather than returning None.
    return StandardResponse(code=FAILED, message="invalid action")
@router.get("/logout/{session_id}", response_model=StandardResponse)
def logout(session_id: str, db: Session = Depends(get_db)):
    """Delete the session named in the URL path and report success.

    The outcome of the delete call is not inspected; the response is always
    ``SUCCESS`` with the message ``"logout success"``.

    Args:
        session_id: Identifier of the session to delete (path parameter).
        db: Database session injected by FastAPI.

    Returns:
        A ``StandardResponse`` with code ``SUCCESS``.
    """
    SessionBusiness(db).delete_session(session_id)
    return StandardResponse(code=SUCCESS, message="logout success")
@router.post("/signin", response_model=StandardResponse)
def signin(request: BasicRequest, db: Session = Depends(get_db)):
    """Create a new user account and give it role 10.

    Only the action ``signin`` is handled; anything else is answered with a
    ``FAILED`` / ``"invalid action"`` response. A failed role assignment is
    logged as a warning but does not turn the sign-up into a failure.

    Args:
        request: Request envelope; the parameters ``username``, ``password``,
            ``full_name`` (defaults to the username) and ``email`` are read
            through ``get_param``.
        db: Database session injected by FastAPI.

    Returns:
        A ``StandardResponse``; on success it carries ``user.to_dict()`` in
        ``data``.
    """
    if request.action != 'signin':
        return StandardResponse(code=FAILED, message="invalid action")

    users = UserBusiness(db)
    username = request.get_param("username", "")
    password = request.get_param("password", "")
    # When full_name is not supplied, the username is used in its place.
    full_name = request.get_param("full_name", username)
    email = request.get_param("email", "")

    # Both a username and a password must be supplied.
    if not (username and password):
        return StandardResponse(code=FAILED, message="Username and password are required")

    if users.get_user_by_username(username) is not None:
        return StandardResponse(code=FAILED, message="用户名已存在")

    created = users.create_user(username=username, password=password, fullname=full_name, email=email)
    if created is None:
        return StandardResponse(code=FAILED, message="创建用户失败")

    # Assign the role with id 10 to the new user.
    role_assigned = UserRoleBusiness(db).assign_role_to_user(created.id, 10)
    if not role_assigned:
        # The sign-up still counts as successful when the role cannot be assigned.
        logger.warning(f"Failed to assign role 10 to user {created.id} during signin")

    return StandardResponse(code=SUCCESS, data=created.to_dict(), message="成功创建用户,请继续登录")
# Pydantic models for request bodies

# Body of POST /user/roles, which either creates a role or updates an existing one.
class RoleCreateWithPermissionsRequest(BaseModel):
    # Id of the role to update; when it is falsy (None or 0) the endpoint creates a new role.
    role_id: Optional[int] = None
    # Role name; the endpoint rejects a name already used by a different role.
    name: str
    # Optional free-text description of the role.
    description: Optional[str] = None
    # Ids of the permissions the role should hold after the call.
    permission_ids: list[int] = []
# Body of POST /user/permissions; every field is passed on to PermissionBusiness.create_permission.
class PermissionCreateRequest(BaseModel):
    # Permission name; the endpoint rejects an empty name and a name that already exists.
    name: str
    # Optional free-text description of the permission.
    description: Optional[str] = None
    # Menu label, route and icon associated with the permission.
    # NOTE(review): presumably consumed by the front-end menu - confirm.
    menu_name: Optional[str] = None
    menu_route: Optional[str] = None
    menu_icon: Optional[str] = None
    # Id of the parent permission; the tree-building endpoints nest a permission under this id.
    parent_id: Optional[int] = None
# Role Management Endpoints
def _assign_permissions_to_role(role_biz, role_id, permission_ids):
    """Assign each permission to the role and return ``(succeeded, failed)`` id lists."""
    succeeded = []
    failed = []
    for permission_id in permission_ids:
        if role_biz.assign_permission_to_role(role_id, permission_id):
            succeeded.append(permission_id)
        else:
            failed.append(permission_id)
    return succeeded, failed


def _role_permission_response(role, headline, succeeded, failed):
    """Build the SUCCESS response that summarises a role create/update operation."""
    message = headline
    if succeeded:
        message += f", 成功分配 {len(succeeded)} 个权限"
    if failed:
        message += f", {len(failed)} 个权限分配失败"
    return StandardResponse(
        code=SUCCESS,
        message=message,
        records=[{
            "id": role.id,
            "name": role.name,
            "success_permissions": succeeded,
            "failed_permissions": failed
        }]
    )


@router.post("/roles", response_model=StandardResponse)
def create_role_with_permissions_endpoint(request: RoleCreateWithPermissionsRequest, db: Session = Depends(get_db)):
    """Create a role, or update an existing one, and set its permissions.

    When ``request.role_id`` is truthy the role is updated: its name and
    description are changed if they differ, all of its current permissions
    are revoked and the requested ones are assigned again. Otherwise a new
    role is created and the requested permissions are assigned to it.

    Individual permission assignments that fail do not fail the request; the
    ids are reported under ``failed_permissions`` in the response record.

    Args:
        request: Role data and the ids of the permissions to assign.
        db: Database session injected by FastAPI.

    Returns:
        A ``StandardResponse``. ``FAILED`` when the role to update does not
        exist, the (new) name is already taken, the name is empty on
        creation, or the role cannot be created; ``SUCCESS`` otherwise, with
        one record holding ``id``, ``name``, ``success_permissions`` and
        ``failed_permissions``.
    """
    role_id = request.role_id
    role_name = request.name
    role_description = request.description
    permission_ids = request.permission_ids
    role_biz = RoleBusiness(db)

    if role_id:
        # Update an existing role.
        role = role_biz.get_role(role_id)
        if not role:
            return StandardResponse(code=FAILED, message="角色不存在")

        # Update name and description only when they were supplied and differ.
        if role_name and role_name != role.name:
            existing_role_by_name = role_biz.get_role_by_name(role_name)
            if existing_role_by_name and existing_role_by_name.id != role_id:
                return StandardResponse(code=FAILED, message="新角色名称已存在")
            role_biz.update_role(role_id, name=role_name)
        if role_description is not None and role_description != role.description:
            role_biz.update_role(role_id, description=role_description)

        # Replace the permission set: revoke everything, then assign again.
        role_biz.revoke_all_permissions_from_role(role_id)
        succeeded, failed = _assign_permissions_to_role(role_biz, role.id, permission_ids)
        return _role_permission_response(role, f"角色 '{role.name}' 更新成功", succeeded, failed)

    # Create a new role.
    if not role_name:
        return StandardResponse(code=FAILED, message="角色名称不能为空")
    if role_biz.get_role_by_name(role_name):
        return StandardResponse(code=FAILED, message="角色已存在")

    role = role_biz.create_role(role_name, role_description)
    if not role:
        return StandardResponse(code=FAILED, message="创建角色失败")

    succeeded, failed = _assign_permissions_to_role(role_biz, role.id, permission_ids)
    return _role_permission_response(role, "角色创建成功", succeeded, failed)
@router.get("/roles", response_model=StandardResponse)
def get_roles_endpoint(db: Session = Depends(get_db)):
    """List every role together with the ids of its permissions.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        A ``SUCCESS`` ``StandardResponse`` whose ``records`` contain one dict
        per role with the keys ``id``, ``name``, ``description`` and
        ``permission_ids``.
    """
    role_biz = RoleBusiness(db)
    payload = []
    for role in role_biz.get_all_roles():
        payload.append({
            "id": role.id,
            "name": role.name,
            "description": role.description,
            # One permission lookup is made per role.
            "permission_ids": [perm.id for perm in role_biz.get_role_permissions(role.id)],
        })
    return StandardResponse(code=SUCCESS, message="角色列表获取成功", records=payload)
# Permission Management Endpoints
@router.post("/permissions", response_model=StandardResponse)
def create_permission_endpoint(request: PermissionCreateRequest, db: Session = Depends(get_db)):
    """Create a permission unless its name is empty or already in use.

    Args:
        request: Name, description, menu attributes and parent id of the
            permission to create.
        db: Database session injected by FastAPI.

    Returns:
        A ``StandardResponse``: ``SUCCESS`` with the new permission's ``id``
        and ``name`` in ``records``, or ``FAILED`` with a reason.
    """
    if not request.name:
        return StandardResponse(code=FAILED, message="Permission name is required")

    perm_biz = PermissionBusiness(db)
    if perm_biz.get_permission_by_name(request.name):
        return StandardResponse(code=FAILED, message="Permission already exists")

    created = perm_biz.create_permission(
        request.name,
        request.description,
        request.menu_name,
        request.menu_route,
        request.menu_icon,
        request.parent_id,
    )
    if not created:
        return StandardResponse(code=FAILED, message="Failed to create permission")

    return StandardResponse(
        code=SUCCESS,
        message="Permission created successfully",
        records=[{"id": created.id, "name": created.name}],
    )
@router.get("/permissions", response_model=StandardResponse)
def get_permissions_endpoint(db: Session = Depends(get_db)):
    """Return all permissions arranged as a parent/child tree.

    A permission is nested under the permission whose id equals its
    ``parent_id``; when ``parent_id`` is falsy or refers to no known
    permission, the permission becomes a root of the tree.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        A ``SUCCESS`` ``StandardResponse`` whose ``records`` are the root
        nodes; each node carries its descendants under ``children``.
    """
    perm_biz = PermissionBusiness(db)

    # Index every permission by id so that parents can be looked up directly.
    nodes = {}
    for perm in perm_biz.get_all_permissions():
        nodes[perm.id] = {
            "id": perm.id, "name": perm.name, "description": perm.description,
            "menu_name": perm.menu_name, "menu_route": perm.menu_route,
            "menu_icon": perm.menu_icon, "parent_id": perm.parent_id,
            "children": [],
        }

    # Attach each node to its parent, or keep it as a root.
    roots = []
    for node in nodes.values():
        parent = nodes.get(node["parent_id"]) if node["parent_id"] else None
        if parent is None:
            roots.append(node)
        else:
            parent["children"].append(node)

    return StandardResponse(code=SUCCESS, message="权限列表获取成功", records=roots)
# Body of POST /user/users/roles.
class UserRoleAssignmentRequest(BaseModel):
    # Id of the user whose roles are being set.
    user_id: int
    # Complete set of role ids the user should hold afterwards: the endpoint
    # adds the missing ones and revokes current roles that are not listed.
    role_ids: list[int]
@router.post("/users/roles", response_model=StandardResponse)
def assign_roles_to_user_endpoint(request: UserRoleAssignmentRequest, db: Session = Depends(get_db)):
    """Make a user's roles match the requested set of role ids.

    Roles that are requested but not yet held are assigned; roles that are
    held but not requested are revoked. All assignments are attempted before
    any revocation.

    Args:
        request: The user id and the full list of role ids the user should
            end up with.
        db: Database session injected by FastAPI.

    Returns:
        A ``StandardResponse``: ``SUCCESS`` when every change was applied,
        otherwise ``FAILED`` with a message giving the success counts and
        the role ids that could not be added or removed.
    """
    user_role_biz = UserRoleBusiness(db)
    user_id = request.user_id
    requested_ids = set(request.role_ids)

    # Role ids the user currently holds.
    held_roles = user_role_biz.get_user_roles(user_id)
    held_ids = {role.id for role in held_roles} if held_roles else set()

    to_add = list(requested_ids - held_ids)
    to_remove = list(held_ids - requested_ids)

    def _apply(operation, role_ids):
        """Run ``operation`` for each role id and collect the ids it rejected."""
        rejected = []
        for role_id in role_ids:
            if not operation(user_id, role_id):
                rejected.append(role_id)
        return rejected

    failed_add_roles = _apply(user_role_biz.assign_role_to_user, to_add)
    failed_remove_roles = _apply(user_role_biz.revoke_role_from_user, to_remove)

    if not (failed_add_roles or failed_remove_roles):
        return StandardResponse(code=SUCCESS, message="用户角色更新成功")

    added = len(to_add) - len(failed_add_roles)
    removed = len(to_remove) - len(failed_remove_roles)
    message = f"角色分配更新完成。成功添加 {added} 个角色,成功移除 {removed} 个角色。"
    if failed_add_roles:
        message += f" 添加失败的角色ID: {failed_add_roles}."
    if failed_remove_roles:
        message += f" 移除失败的角色ID: {failed_remove_roles}."
    return StandardResponse(code=FAILED, message=message)
@router.get("/users", response_model=StandardResponse)
def get_users_endpoint(
    db: Session = Depends(get_db),
    userName: Optional[str] = Query(None, description="用户名,用于模糊查询"),
    pageNo: int = Query(1, ge=1, description="页码,从1开始"),
    pageSize: int = Query(10, ge=1, le=100, description="每页数量,最大100")
):
    """Return one page of users, each with the ids of its roles.

    Args:
        db: Database session injected by FastAPI.
        userName: Optional user-name filter handed to the paginated lookup.
        pageNo: Page number, at least 1.
        pageSize: Page size, between 1 and 100.

    Returns:
        A ``SUCCESS`` ``StandardResponse`` with one record per user (``id``,
        ``username``, ``full_name``, ``email``, ``role_ids``) and ``total``
        set to the count returned by the paginated lookup.
    """
    user_biz = UserBusiness(db)
    user_role_biz = UserRoleBusiness(db)
    page, total = user_biz.get_users_paginated(userName, pageNo, pageSize)

    def _describe(user):
        """Serialise one user; an empty or missing role list yields ``[]``."""
        roles = user_role_biz.get_user_roles(user.id)
        return {
            "id": user.id,
            "username": user.username,
            "full_name": user.full_name,
            "email": user.email,
            "role_ids": [role.id for role in roles] if roles else [],
        }

    return StandardResponse(
        code=SUCCESS,
        message="用户列表获取成功",
        records=[_describe(user) for user in page],
        total=total,
    )
# Alias for this module's router.
# NOTE(review): presumably the name the application imports when registering routes - confirm.
user_router = router
|