123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- #通过分析文章,生成分析结果
- import asyncio
- import os
- import time
- import json
- from typing import List, Dict, AsyncGenerator
- import httpx
- from dotenv import load_dotenv
- from typing import List, Dict, AsyncGenerator
# Load environment variables (from a .env file, if present).
load_dotenv()

# DeepSeek API configuration, read from the environment.
DEEPSEEK_API_URL = os.getenv("DEEPSEEK_API_URL")
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")

# Endpoint and app key used by chat_with_dify(). Both can be overridden with
# the DIFY_API_URL / DIFY_API_KEY environment variables; the defaults are the
# values this script has always used.
# Other endpoints that were tried previously:
#   https://api.siliconflow.cn/v1/chat/completions
#   http://localhost/v1/completion-messages
# SECURITY: the default key below is a credential committed to source.
# Prefer supplying DIFY_API_KEY via the environment and rotating this key.
API_URL = os.getenv("DIFY_API_URL", "http://localhost/v1/chat-messages")
API_KEY = os.getenv("DIFY_API_KEY", "app-rXG0orb4Ap1slxQvAZKkAdGk")
def load_prompt(filename: str) -> str:
    """Return the full contents of a UTF-8 text file as a single string.

    Args:
        filename: Path of the prompt template file to read.

    Returns:
        The decoded text of the file; an empty string for an empty file.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(filename, "r", encoding="utf-8") as f:
        # read() yields the same text as joining readlines(), without
        # building an intermediate list of lines.
        return f.read()
-
def _extract_dify_answer(line: str) -> str:
    """Return the ``answer`` text carried by one streamed line, or "" if none.

    Only lines of the form ``data: <json object>`` whose object has a truthy
    ``answer`` field produce a result. Blank lines, other fields, the
    ``[DONE]`` marker, malformed JSON and non-object payloads all yield "".
    """
    prefix = "data: "
    if not line.startswith(prefix):
        return ""
    payload = line[len(prefix):]
    if payload == "[DONE]":
        return ""
    try:
        event = json.loads(payload)
    except json.JSONDecodeError:
        # A malformed event is skipped rather than aborting the whole stream.
        return ""
    if not isinstance(event, dict):
        return ""
    return event.get("answer") or ""


async def chat_with_dify(prompt: str) -> AsyncGenerator[str, None]:
    """Stream the reply to *prompt* from the chat endpoint at ``API_URL``.

    Sends a POST request in ``streaming`` response mode, authenticated with
    ``API_KEY``, and yields each non-empty ``answer`` fragment as it arrives.

    Args:
        prompt: The query text sent to the endpoint.

    Yields:
        Successive fragments of the answer text.

    Raises:
        httpx.HTTPStatusError: If the endpoint responds with an error status
            (raised by ``raise_for_status``; it is not a ``RequestError`` and
            is therefore not caught below).

    Note:
        Transport-level failures (``httpx.RequestError``) are printed and end
        the stream early, so a caller may receive a partial or empty reply.
    """
    print(">>> start chat_with_dify ")
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json; charset=utf-8",
    }
    data = {
        "inputs": {},
        "query": prompt,
        "response_mode": "streaming",
        "user": "face2shadow@163.com",
    }
    try:
        async with httpx.AsyncClient() as client:
            async with client.stream("POST", API_URL, json=data, headers=headers) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    answer = _extract_dify_answer(line)
                    if answer:
                        yield answer
    except httpx.RequestError as e:
        print("Error: ", e)
- from test.deepseek_chat import chat_with_deepseek
async def chat(prompt: str) -> str:
    """Send *prompt* as a single user message and return the complete reply.

    The reply is consumed chunk by chunk from ``chat_with_deepseek``; each
    chunk is echoed to stdout as it arrives, followed by a blank line once
    the stream ends.

    Args:
        prompt: Text of the user message.

    Returns:
        All streamed chunks concatenated in arrival order ("" if the stream
        produced nothing).
    """
    # NOTE(review): chat_with_deepseek comes from test.deepseek_chat; this
    # assumes it is an async iterator of str chunks — confirm.
    messages = [{'role': 'user', 'content': prompt}]
    pieces = []
    async for chunk in chat_with_deepseek(messages):
        pieces.append(chunk)
        print(chunk, end="")
    print("\n")
    # Join once at the end instead of growing a string inside the loop.
    return "".join(pieces)
-
def main() -> None:
    """Generate one label file per article in the regulations JSON dump.

    For every article, the prompt template from ``kb/label.txt`` is filled
    with the article text, sent through ``chat()``, and the reply is written
    to ``<output dir>/<title>.txt``. Articles whose output file already
    exists are skipped, so the script can be re-run to resume. Each article
    is attempted up to three times, pausing three seconds after a failure.
    """
    output_dir = "d:/work/03/output/doc_label"
    max_attempts = 3
    retry_delay_seconds = 3

    prompt_template = load_prompt("kb/label.txt")
    with open(r"D:\work\03\regulations.json", "r", encoding="utf-8") as f:
        articles = json.load(f)
    print(">>> finished process document ")

    if not articles:
        return

    # Without this, every write fails (and is retried) when the directory
    # has not been created yet.
    os.makedirs(output_dir, exist_ok=True)

    for item in articles:
        title = item["meta_data"]['ArticleTitle']
        text = item["article_text"]

        # Path separators in a title would be interpreted as directories.
        title = title.replace("/", "_").replace("\\", "_")
        out_path = f"{output_dir}/{title}.txt"
        if os.path.exists(out_path):
            print(f"skip {title}")
            continue

        prompt = prompt_template.format(text=text)
        for attempt in range(max_attempts):
            try:
                output = asyncio.run(chat(prompt))
                if not output:
                    # An empty reply written to disk would make every later
                    # run skip this article; treat it as a failed attempt.
                    raise ValueError("empty response from model")
                # Re-check after the (slow) model call so a file that
                # appeared in the meantime is not overwritten.
                if os.path.exists(out_path):
                    print("abstract file already exists, skip")
                else:
                    with open(out_path, "w", encoding="utf-8") as out_file:
                        out_file.write(output)
            except Exception as e:
                # Top-level boundary: report the failure and retry.
                print(e)
                print(">>> process", title, "failed, retry", attempt)
                time.sleep(retry_delay_seconds)
            else:
                print(">>> process", title, "ok")
                break


if __name__ == "__main__":
    main()
-
|