# entity_extract.py
# Analyse articles and generate the analysis results.
import asyncio
import json
import os
import time
from typing import AsyncGenerator, Dict, List

import httpx
from dotenv import load_dotenv
  10. # 加载环境变量
  11. load_dotenv()
  12. # DeepSeek API配置
  13. DEEPSEEK_API_URL = os.getenv("DEEPSEEK_API_URL")
  14. DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
  15. #API_URL = "https://api.siliconflow.cn/v1/chat/completions"
  16. API_URL = "http://localhost/v1/chat-messages"
  17. API_KEY = "app-rXG0orb4Ap1slxQvAZKkAdGk"
  18. #API_URL = "http://localhost/v1/completion-messages"
  19. # #API_KEY = "app-U23k5t9iNdwbulCZBCgvJusS"
  20. def load_prompt(filename):
  21. with open(filename, "r", encoding="utf-8") as f:
  22. return "".join(f.readlines())
  23. async def chat_with_dify(prompt: str) -> AsyncGenerator[List, None]:
  24. print(">>> start chat_with_dify ")
  25. """与DeepSeek模型进行流式对话"""
  26. headers = {
  27. "Authorization": f"Bearer {API_KEY}",
  28. "Content-Type": "application/json; charset=utf-8"
  29. }
  30. data = {
  31. #"inputs": {"query": prompt},
  32. "inputs": {},
  33. "query": prompt,
  34. "response_mode": "streaming",
  35. "user":"face2shadow@163.com"
  36. }
  37. try:
  38. async with httpx.AsyncClient() as client:
  39. async with client.stream("POST", API_URL, json=data, headers=headers) as response:
  40. response.raise_for_status()
  41. async for chunk in response.aiter_lines():
  42. if chunk:
  43. if chunk.startswith("data: "):
  44. json_data = chunk[6:]
  45. if json_data != "[DONE]":
  46. try:
  47. chunk_data = json.loads(json_data)
  48. if "answer" in chunk_data and chunk_data["answer"]:
  49. delta = chunk_data["answer"] #chunk_data["choices"][0].get("delta", {})
  50. yield delta
  51. #if "content" in delta:
  52. # yield delta["content"]
  53. except json.JSONDecodeError:
  54. continue
  55. except httpx.RequestError as e:
  56. print(f"Error: ",e)
  57. del data
  58. del headers
  59. from test.deepseek_chat import chat_with_deepseek
  60. async def chat(prompt: str) -> str:
  61. message = [{'role':'user', 'content': prompt}]
  62. call_deepseek = chat_with_deepseek(message)
  63. #call_deepseek = chat_with_dify(prompt)
  64. output = ""
  65. async for chunk in call_deepseek:
  66. output = output + chunk
  67. print(chunk, end="")
  68. print("\n")
  69. return output
  70. if __name__ == "__main__":
  71. prompt_template = load_prompt("kb/label.txt")
  72. json_data = None
  73. with open(r"D:\work\03\regulations.json","r",encoding="utf-8") as f:
  74. lines = f.readlines()
  75. json_data = json.loads(''.join(lines))
  76. print(">>> finished process document ")
  77. if json_data:
  78. index = 1
  79. total = len(json_data)
  80. for item in json_data:
  81. title = item["meta_data"]['ArticleTitle']
  82. text = item["article_text"]
  83. title = title.replace("/","_")
  84. title = title.replace("\\","_")
  85. if os.path.exists(f"d:/work/03/output/doc_label/{title}.txt"):
  86. print(f"skip {title}")
  87. continue
  88. text = prompt_template.format(text=text)
  89. count = 0
  90. while count < 3:
  91. try:
  92. coro = chat(text)
  93. output = asyncio.run(coro)
  94. title = title.replace("/","_")
  95. title = title.replace("\\","_")
  96. if os.path.exists(f"d:/work/03/output/doc_label/{title}.txt"):
  97. print("abstract file already exists, skip")
  98. else:
  99. with open(f"d:/work/03/output/doc_label/{title}.txt", "w", encoding="utf-8") as f:
  100. f.write(output)
  101. print(">>> process", title, "ok")
  102. count = 3
  103. except Exception as e:
  104. print(e)
  105. print(">>> process", title, "failed, retry", count)
  106. count = count + 1
  107. time.sleep(3)
  108. index = index + 1