# coding=utf-8
"""FastAPI service that streams chat completions from the DeepSeek API.

The /chat endpoint optionally routes the latest user message through a
function-calling step first; when the called function returns text, that
text is injected into the prompt as reference material before the final
answer is streamed back to the client as SSE lines.
"""

import codecs
import gc
import json
import os
from typing import AsyncGenerator, Dict, List

import httpx
import psutil
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel

from functions.basic_function import basic_functions
from functions.call import generate_response_with_function_call, parse_function_call
from libs.text_processor import TextProcessor
from utils.es import ElasticsearchOperations

# Load configuration from a local .env file.
load_dotenv()

# DeepSeek API configuration (supplied via environment variables).
DEEPSEEK_API_URL = os.getenv("DEEPSEEK_API_URL")
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")


async def chat_with_openai(prompt: List) -> AsyncGenerator[str, None]:
    """Stream a completion from DeepSeek's OpenAI-compatible endpoint.

    Yields each streamed chunk as one SSE record ("data: ..." followed by
    a blank line).

    NOTE(review): the OpenAI client used here is synchronous, so this
    blocks the event loop between chunks — consider AsyncOpenAI.
    """
    # SECURITY FIX: the API key was hard-coded in source; read it from the
    # environment instead (any key that was committed should be rotated).
    client = OpenAI(api_key=DEEPSEEK_API_KEY, base_url="https://api.deepseek.com")
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=prompt,
        stream=True,
    )
    for item in response:
        # BUG FIX: json.dumps(item) raised TypeError — each chunk is a
        # pydantic model, so serialize via model_dump().
        yield "data: " + json.dumps(item.model_dump(), ensure_ascii=False) + "\n\n"


async def chat_with_deepseek(prompt: List) -> AsyncGenerator[str, None]:
    """Stream a chat completion from the DeepSeek HTTP API.

    Args:
        prompt: full message history as a list of {"role", "content"} dicts.

    Yields:
        Raw SSE lines from the upstream API, re-terminated with a blank
        line (aiter_lines() strips the record separator).
    """
    headers = {
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
        "Content-Type": "application/json; charset=utf-8",
    }
    data = {
        "model": "Pro/deepseek-ai/DeepSeek-V3",
        "messages": prompt,
        "temperature": 0.7,
        "max_tokens": 4000,
        "stream": True,
    }
    try:
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST", DEEPSEEK_API_URL, json=data, headers=headers
            ) as response:
                response.raise_for_status()
                async for chunk in response.aiter_lines():
                    if chunk:
                        yield chunk + "\n\n"
    except httpx.RequestError as e:
        # BUG FIX: print(f"Error: ", e) passed two args with a pointless
        # f-string; format the exception into one message.
        print(f"Error: {e}")


app = FastAPI()

# Allow cross-origin requests from any origin (development setting —
# tighten allow_origins before deploying publicly).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class ChatMessage(BaseModel):
    # One conversation turn: role is "user"/"assistant"/"system".
    role: str
    content: str


class ChatRequest(BaseModel):
    # Full conversation history; the last entry is the current question.
    messages: List[ChatMessage]


@app.get("/")
def hello():
    """Liveness probe."""
    return "hello"


@app.post("/chat")
async def chat_endpoint(request: ChatRequest):
    """Answer the latest user message, streaming the reply as SSE lines.

    Raises:
        HTTPException(400): when no messages are given or the last
        message is empty/whitespace.
    """
    gc.collect()

    user_messages: List[Dict[str, str]] = [
        {"role": msg.role, "content": msg.content} for msg in request.messages
    ]
    # ROBUSTNESS FIX: an empty message list used to raise IndexError (500);
    # reject it with the same 400 the empty-content case gets.
    if not user_messages:
        raise HTTPException(status_code=400, detail="Message cannot be empty")

    user_input = user_messages[-1]["content"]
    if not user_input or not user_input.strip():
        raise HTTPException(status_code=400, detail="Message cannot be empty")

    # First pass: let the model decide whether one of the registered
    # functions should be called to gather reference material.
    first_response = generate_response_with_function_call(
        functions=basic_functions, user_input=[user_input]
    )

    choices = first_response.get("choices")
    if choices and "tool_calls" in choices[0]["message"]:
        call_result = parse_function_call(first_response, user_messages)
        if call_result["result"] != "":
            # (The previous utf-8 encode/decode round-trip was a no-op
            # and has been removed.)
            result_text = call_result["result"]
            # Replace the history with a single prompt that embeds the
            # function result as reference material for the final answer.
            user_messages = [
                {
                    "role": "user",
                    "content": f"以下是你的参考内容,请基于这些内容进行问题回答问题:【{user_input}】\n\n```doc\n{result_text}\n```",
                }
            ]

    async def generate_response():
        # Relay upstream SSE lines to the client unchanged.
        async for chunk in chat_with_deepseek(user_messages):
            yield chunk

    # NOTE(review): SSE is conventionally served as text/event-stream;
    # application/json is kept to preserve the existing client contract.
    return StreamingResponse(generate_response(), media_type="application/json")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)