# Analyze each article and generate labeling/analysis results.
import asyncio
import json
import os
import time
from typing import AsyncGenerator, Dict, List

import httpx
from dotenv import load_dotenv

from test.deepseek_chat import chat_with_deepseek

# Load environment variables
load_dotenv()

# DeepSeek API configuration
DEEPSEEK_API_URL = os.getenv("DEEPSEEK_API_URL")
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")

# Dify app endpoint and key (alternative endpoints kept for reference)
# API_URL = "https://api.siliconflow.cn/v1/chat/completions"
API_URL = "http://localhost/v1/chat-messages"
API_KEY = "app-rXG0orb4Ap1slxQvAZKkAdGk"
# API_URL = "http://localhost/v1/completion-messages"
# API_KEY = "app-U23k5t9iNdwbulCZBCgvJusS"


def load_prompt(filename: str) -> str:
    """Read a prompt template file as a single string."""
    with open(filename, "r", encoding="utf-8") as f:
        return f.read()


async def chat_with_dify(prompt: str) -> AsyncGenerator[str, None]:
    """Stream a chat response from the Dify app endpoint (DeepSeek model behind it)."""
    print(">>> start chat_with_dify")
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json; charset=utf-8",
    }
    data = {
        # "inputs": {"query": prompt},
        "inputs": {},
        "query": prompt,
        "response_mode": "streaming",
        "user": "face2shadow@163.com",
    }
    try:
        async with httpx.AsyncClient() as client:
            async with client.stream("POST", API_URL, json=data, headers=headers) as response:
                response.raise_for_status()
                async for chunk in response.aiter_lines():
                    if chunk and chunk.startswith("data: "):
                        json_data = chunk[6:]
                        if json_data == "[DONE]":
                            continue
                        try:
                            chunk_data = json.loads(json_data)
                        except json.JSONDecodeError:
                            continue
                        # The Dify stream carries the text in "answer"
                        # (unlike the OpenAI-style choices[0].delta.content).
                        if chunk_data.get("answer"):
                            yield chunk_data["answer"]
    except httpx.RequestError as e:
        print("Error:", e)


async def chat(prompt: str) -> str:
    """Collect the streamed answer for a single prompt into one string."""
    messages = [{"role": "user", "content": prompt}]
    call_deepseek = chat_with_deepseek(messages)
    # call_deepseek = chat_with_dify(prompt)
    output = ""
    async for chunk in call_deepseek:
        output += chunk
        print(chunk, end="")
    print("\n")
    return output


if __name__ == "__main__":
    prompt_template = load_prompt("kb/label.txt")

    with open(r"D:\work\03\regulations.json", "r", encoding="utf-8") as f:
        json_data = json.load(f)
    print(">>> finished loading document")

    if json_data:
        index = 1
        total = len(json_data)
        for item in json_data:
            title = item["meta_data"]["ArticleTitle"]
            text = item["article_text"]
            # Sanitize the title so it can be used as a file name.
            title = title.replace("/", "_").replace("\\", "_")
            if os.path.exists(f"d:/work/03/output/doc_label/{title}.txt"):
                print(f"skip {title}")
                continue

            text = prompt_template.format(text=text)
            count = 0
            while count < 3:  # up to three attempts per article
                try:
                    output = asyncio.run(chat(text))
                    out_path = f"d:/work/03/output/doc_label/{title}.txt"
                    if os.path.exists(out_path):
                        print("label file already exists, skip")
                    else:
                        with open(out_path, "w", encoding="utf-8") as f:
                            f.write(output)
                    print(">>> process", title, "ok")
                    break
                except Exception as e:
                    print(e)
                    print(">>> process", title, "failed, retry", count)
                    count += 1
                    time.sleep(3)
            index += 1
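

# ---------------------------------------------------------------------------
# Hedged sketch (not from this repository): chat() above relies on
# chat_with_deepseek from test.deepseek_chat, whose source is not shown in
# this file. The DEEPSEEK_API_URL / DEEPSEEK_API_KEY values loaded above are
# unused here, so they are presumably consumed by that module. The function
# below illustrates one way such an async generator could look, assuming an
# OpenAI-compatible streaming endpoint; the endpoint URL, payload shape, and
# the model name "deepseek-chat" are assumptions, not facts taken from the
# original code. It would normally live in test/deepseek_chat.py.
# ---------------------------------------------------------------------------
async def chat_with_deepseek_sketch(messages: List[Dict]) -> AsyncGenerator[str, None]:
    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "deepseek-chat",  # assumed model name
        "messages": messages,
        "stream": True,
    }
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", DEEPSEEK_API_URL, json=payload, headers=headers) as response:
            response.raise_for_status()
            # OpenAI-compatible SSE stream: "data: {...}" lines, terminated by "data: [DONE]".
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                body = line[len("data: "):]
                if body.strip() == "[DONE]":
                    break
                try:
                    delta = json.loads(body)["choices"][0].get("delta", {})
                except (json.JSONDecodeError, KeyError, IndexError):
                    continue
                if delta.get("content"):
                    yield delta["content"]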