SGTY 3 週間 前
コミット
98a1c13afe

+ 15 - 0
build/lib/knowledge/model/dict_system.py

@@ -0,0 +1,15 @@
+from sqlalchemy import Column, Integer, String, Identity
+from ..db.base_class import Base
+
+class DictSystem(Base):
+    __tablename__ = 'dict_system'
+
+    id = Column(Integer, Identity(start=1, increment=1), primary_key=True)
+    dict_name = Column(String(255))
+    dict_value = Column(String(255))
+    dict_desc = Column(String(255))
+    #默认值为0,1为删除 
+    is_deleted = Column(Integer, default=0)
+    
+    def __repr__(self):
+        return f"<DictSystem(id={self.id}, dict_name={self.dict_name}, dict_value={self.dict_value})>"

+ 96 - 0
build/lib/knowledge/service/dict_system_service.py

@@ -0,0 +1,96 @@
+import copy
+from sqlalchemy.orm import Session
+from sqlalchemy import or_
+from typing import Optional, List
+from ..model.dict_system import DictSystem
+import logging
+from sqlalchemy.exc import IntegrityError
+from cachetools import TTLCache
+
+logger = logging.getLogger(__name__)
+
+class DictSystemService:
+    _cache = TTLCache(maxsize=100, ttl=60*60*24)
+    def __init__(self, db: Session):
+        self.db = db
+
+    def get_dict(self, dict_id: int):
+        dict_item = self.db.query(DictSystem).get(dict_id)
+        if not dict_item:
+            raise ValueError("Dict not found")
+        return dict_item
+
+    def create_dict(self, dict_data: dict):
+        try:
+            existing = self.db.query(DictSystem).filter(
+                DictSystem.dict_name == dict_data['dict_name'],
+                DictSystem.dict_value == dict_data['dict_value'],
+                DictSystem.is_deleted == 0
+            ).first()
+
+            if existing:
+                raise ValueError("Dict already exists")
+
+            new_dict = DictSystem(**dict_data)
+            self.db.add(new_dict)
+            self.db.commit()
+            return new_dict
+
+        except IntegrityError as e:
+            self.db.rollback()
+            logger.error(f"创建字典失败: {str(e)}")
+            raise ValueError("Database integrity error")
+
+    def update_dict(self, dict_id: int, update_data: dict):
+        dict_item = self.db.query(DictSystem).get(dict_id)
+        if not dict_item:
+            raise ValueError("Dict not found")
+
+        try:
+            for key, value in update_data.items():
+                setattr(dict_item, key, value)
+            self.db.commit()
+            return dict_item
+        except Exception as e:
+            self.db.rollback()
+            logger.error(f"更新字典失败: {str(e)}")
+            raise ValueError("Update failed")
+
+    def delete_dict(self, dict_id: int):
+        dict_item = self.db.query(DictSystem).get(dict_id)
+        if not dict_item:
+            raise ValueError("Dict not found")
+
+        try:
+            dict_item.is_deleted = 1
+            self.db.commit()
+            return None
+        except Exception as e:
+            self.db.rollback()
+            logger.error(f"删除字典失败: {str(e)}")
+            raise ValueError("Delete failed")
+
+    def get_dicts_by_name(self, dict_name):
+        cache_key = f"get_dicts_by_name_{dict_name}"
+        if cache_key in self._cache:
+            return copy.deepcopy(self._cache[cache_key])
+
+        try:
+            filters = [DictSystem.is_deleted == 0]
+            if dict_name:
+                filters.append(DictSystem.dict_name == dict_name)
+            
+            dicts = self.db.query(DictSystem).filter(*filters).all()
+            
+            result = [{
+                'id': d.id,
+                'dict_name': d.dict_name,
+                'dict_value': d.dict_value,
+                'dict_desc': d.dict_desc
+            } for d in dicts]
+            
+            self._cache[cache_key] = copy.deepcopy(result[0])
+            return result
+        except Exception as e:
+            logger.error(f"查询字典失败: {str(e)}")
+            raise e

+ 15 - 0
src/knowledge/model/dict_system.py

@@ -0,0 +1,15 @@
+from sqlalchemy import Column, Integer, String, Identity
+from ..db.base_class import Base
+
+class DictSystem(Base):
+    __tablename__ = 'dict_system'
+
+    id = Column(Integer, Identity(start=1, increment=1), primary_key=True)
+    dict_name = Column(String(255))
+    dict_value = Column(String(255))
+    dict_desc = Column(String(255))
+    #默认值为0,1为删除 
+    is_deleted = Column(Integer, default=0)
+    
+    def __repr__(self):
+        return f"<DictSystem(id={self.id}, dict_name={self.dict_name}, dict_value={self.dict_value})>"

+ 96 - 0
src/knowledge/service/dict_system_service.py

@@ -0,0 +1,96 @@
+import copy
+from sqlalchemy.orm import Session
+from sqlalchemy import or_
+from typing import Optional, List
+from ..model.dict_system import DictSystem
+import logging
+from sqlalchemy.exc import IntegrityError
+from cachetools import TTLCache
+
+logger = logging.getLogger(__name__)
+
+class DictSystemService:
+    _cache = TTLCache(maxsize=100, ttl=60*60*24)
+    def __init__(self, db: Session):
+        self.db = db
+
+    def get_dict(self, dict_id: int):
+        dict_item = self.db.query(DictSystem).get(dict_id)
+        if not dict_item:
+            raise ValueError("Dict not found")
+        return dict_item
+
+    def create_dict(self, dict_data: dict):
+        try:
+            existing = self.db.query(DictSystem).filter(
+                DictSystem.dict_name == dict_data['dict_name'],
+                DictSystem.dict_value == dict_data['dict_value'],
+                DictSystem.is_deleted == 0
+            ).first()
+
+            if existing:
+                raise ValueError("Dict already exists")
+
+            new_dict = DictSystem(**dict_data)
+            self.db.add(new_dict)
+            self.db.commit()
+            return new_dict
+
+        except IntegrityError as e:
+            self.db.rollback()
+            logger.error(f"创建字典失败: {str(e)}")
+            raise ValueError("Database integrity error")
+
+    def update_dict(self, dict_id: int, update_data: dict):
+        dict_item = self.db.query(DictSystem).get(dict_id)
+        if not dict_item:
+            raise ValueError("Dict not found")
+
+        try:
+            for key, value in update_data.items():
+                setattr(dict_item, key, value)
+            self.db.commit()
+            return dict_item
+        except Exception as e:
+            self.db.rollback()
+            logger.error(f"更新字典失败: {str(e)}")
+            raise ValueError("Update failed")
+
+    def delete_dict(self, dict_id: int):
+        dict_item = self.db.query(DictSystem).get(dict_id)
+        if not dict_item:
+            raise ValueError("Dict not found")
+
+        try:
+            dict_item.is_deleted = 1
+            self.db.commit()
+            return None
+        except Exception as e:
+            self.db.rollback()
+            logger.error(f"删除字典失败: {str(e)}")
+            raise ValueError("Delete failed")
+
+    def get_dicts_by_name(self, dict_name):
+        cache_key = f"get_dicts_by_name_{dict_name}"
+        if cache_key in self._cache:
+            return copy.deepcopy(self._cache[cache_key])
+
+        try:
+            filters = [DictSystem.is_deleted == 0]
+            if dict_name:
+                filters.append(DictSystem.dict_name == dict_name)
+            
+            dicts = self.db.query(DictSystem).filter(*filters).all()
+            
+            result = [{
+                'id': d.id,
+                'dict_name': d.dict_name,
+                'dict_value': d.dict_value,
+                'dict_desc': d.dict_desc
+            } for d in dicts]
+            
+            self._cache[cache_key] = copy.deepcopy(result[0])
+            return result
+        except Exception as e:
+            logger.error(f"查询字典失败: {str(e)}")
+            raise e