123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- # coding=utf-8
- import os
- import gc
- from dotenv import load_dotenv
- import httpx
- from typing import List, Dict, AsyncGenerator
- from libs.text_processor import TextProcessor
- from utils.es import ElasticsearchOperations
- import json
- from fastapi import FastAPI, HTTPException, Request
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import StreamingResponse
- from pydantic import BaseModel
- from functions.call import generate_response_with_function_call, parse_function_call
- from functions.basic_function import basic_functions
- from openai import OpenAI
- import codecs
- import psutil
- #import chardet
# Load environment variables from a local .env file (python-dotenv).
load_dotenv()

# DeepSeek API configuration, read from the environment.
# NOTE(review): both are None when the variables are unset — the service
# will fail at request time, not at startup; consider validating here.
DEEPSEEK_API_URL = os.getenv("DEEPSEEK_API_URL")
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
async def chat_with_openai(prompt: List) -> AsyncGenerator[str, None]:
    """Stream a chat completion from DeepSeek via the OpenAI SDK.

    Yields Server-Sent-Event formatted strings ("data: {json}\n\n"),
    one per streamed chunk.

    Args:
        prompt: OpenAI-style message list of {"role": ..., "content": ...}.
    """
    # SECURITY FIX: the API key must come from the environment — the
    # original embedded a literal secret ("sk-...") in source code.
    client = OpenAI(api_key=DEEPSEEK_API_KEY, base_url="https://api.deepseek.com")
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=prompt,
        stream=True,
    )
    for item in response:
        # BUG FIX: json.dumps(item) raised TypeError — streamed chunks are
        # pydantic models, so serialize them with model_dump_json().
        yield "data: " + item.model_dump_json() + "\n\n"
async def chat_with_deepseek(prompt: List) -> AsyncGenerator[str, None]:
    """Stream a chat completion from the DeepSeek HTTP API.

    POSTs the message list to DEEPSEEK_API_URL with streaming enabled and
    re-yields every non-empty SSE line followed by a blank-line separator
    ("\n\n"), suitable for feeding a StreamingResponse.

    Args:
        prompt: OpenAI-style message list of {"role": ..., "content": ...}.
    """
    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json; charset=utf-8",
    }
    data = {
        "model": "Pro/deepseek-ai/DeepSeek-V3",
        "messages": prompt,
        "temperature": 0.7,
        "max_tokens": 4000,
        "stream": True,
    }
    try:
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST", DEEPSEEK_API_URL, json=data, headers=headers
            ) as response:
                response.raise_for_status()
                async for chunk in response.aiter_lines():
                    if chunk:
                        yield chunk + "\n\n"
    except httpx.RequestError as e:
        # Best-effort: a transport error simply ends the stream; log it
        # instead of crashing the surrounding StreamingResponse.
        print(f"DeepSeek request failed: {e}")
app = FastAPI()

# Allow cross-origin requests from any origin.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers for credentialed requests (the CORS spec forbids a
# wildcard origin with credentials) — confirm whether credentials are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ChatMessage(BaseModel):
    """A single chat message in OpenAI format."""

    role: str  # e.g. "user", "assistant", "system"
    content: str  # the message text
-
class ChatRequest(BaseModel):
    """Request body for POST /chat: the full conversation history."""

    messages: List[ChatMessage]
- @app.get("/")
- def hello():
- return "hello"
- @app.post("/chat")
- async def chat_endpoint(request: ChatRequest):
- gc.collect()
- process = psutil.Process()
- print(">>> start chat_endpoint ")
- user_input = ""
-
- user_messages = []
- for msg in request.messages:
- user_messages.append({'role':msg.role, 'content':msg.content})
- last_message = user_messages[-1]
- user_input = last_message['content']
-
- if not user_input or user_input.strip() == "":
- raise HTTPException(status_code=400, detail="Message cannot be empty")
-
- prompt_text = []
- prompt_text.append(user_input)
- print(">>> user_input ", prompt_text)
- # if len(user_input) > 4:
- # results = es_ops.search_similar_texts(user_input)
- # for result in results:
- # if result['score'] > 1.8:
- # prompt_text.append(result['text'])
-
- # if len(prompt_text) > 0:
- # prompt_text = "\n\n".join(prompt_text)
- # prompt_text ="'''doc\n"+ prompt_text + "'''\n"
- # prompt_text = f"请基于以下的文档内容回复问题\n\n{prompt_text}\n\n{user_input}"
- # else:
- # prompt_text = user_input
- print(process.memory_info().rss)
- first_response = generate_response_with_function_call(functions=basic_functions, user_input=prompt_text)
- print(process.memory_info().rss)
- if 'choices' in first_response.keys():
- if 'tool_calls' in first_response['choices'][0]['message'].keys():
- choice = first_response['choices'][0]
- print(">>> function call response : ",choice['message'])
- #user_messages = user_messages + [choice['message']]
- call_result = parse_function_call(first_response, user_messages)
- if call_result['result'] != "":
- result_text = codecs.encode(call_result['result'], "utf-8")
- result_text = codecs.decode(result_text, "utf-8")
- user_messages = [{
- "role": "user",
- "content": f"以下是你的参考内容,请基于这些内容进行问题回答问题:【{user_input}】\n\n```doc\n{result_text}\n```",
- #"tool_call_id":choice['message']['tool_calls'][0]['id']
- }]
- # user_messages.append({
- # "role": "user",
- # "content": f"以下是你的参考内容,请基于这些内容进行问题回答:\n```doc\n{result_text}\n```",
- # #"tool_call_id":choice['message']['tool_calls'][0]['id']
- # })
- print(process.memory_info().rss)
- async def generate_response():
- async for chunk in chat_with_deepseek(user_messages):
- print(">>> ", chunk)
- yield chunk
- #yield json.dumps({"content": chunk}) + "\n"
-
- return StreamingResponse(generate_response(), media_type="application/json")
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=8000)
|