# coding:utf-8 import json import re import requests from requests.exceptions import ChunkedEncodingError # 请求地址 qa_url = "http://api-ai-prod.valuesimplex.tech/api/v3/finchat" # 熵简提供的x-api-key api_key = "api@shangjian!" token = "9b5c0b6a5e1e4e8fioouiouqiuioasaz" user_id = 999041 def ask(question, user_id): # 设置请求头 headers = { 'token': token, 'X-API-KEY': api_key, 'Content-Type': 'application/json' } # 构造请求体 payload = json.dumps({ "msg": question, # 历史问答,没有则为空List "history": [], "user_id": user_id, "model": "deepseek-r1", # 默认值 不用改 "using_indicator": True, # 是否用指标 "start_time": "2024-01-01", # 开始时间 "doc_show_type": ["A001", "A002", "A003", "A004"], # 文档类型 "simple_tracking": simple_tracking # 是否简单溯源 }) print(f"******开始提问:[{question}]") # 发送请求 response = requests.request("POST", qa_url, data=payload, headers=headers, stream=True) qa_result = '' # 判断请求是否成功 if response.status_code == 200: if stream_enabled: try: for chunk in response.iter_content(chunk_size=128): try: chunk_event = chunk.decode('utf-8', 'ignore') except UnicodeDecodeError as e: # 自定义处理解码错误,例如替换无法解码的部分 chunk_event = chunk.decode('utf-8', 'replace') print(f"Decoding error occurred: {e}") qa_result += chunk_event print(f"\033[1;32m" + chunk_event) except ChunkedEncodingError: print("Stream ended prematurely. Handling gracefully.") else: # 获取响应内容 qa_result = response.content # 将响应内容解码为utf-8格式 qa_result = qa_result.decode('utf-8') else: print(f"Failed to get stream data. Status code: {response.status_code}") # 返回结果 return qa_result if __name__ == '__main__': # 问题内容 question = '科大讯飞业绩怎么样?' # 关闭吐字模式 stream_enabled = True # 开启简单溯源 simple_tracking = True # 调用函数进行问答 result = ask(question, user_id) # 仅打印最终问答结果 print("**************COT**************") cot_list = re.findall(r'\{"id":"_cot","content":"(.*?)"}', result) cot = "".join(cot_list) print(cot) print("**********************************") # 仅打印最终问答结果 print("**************最终答案**************") print(re.findall(r'\{"id":"_final","content":"(.*?)"}', result)[0]) # print(result['answer']) print("**********************************") if simple_tracking: print("**************溯源文件**************") source_file = re.findall(r'\{"id":"tracking_documents","content":\s*(\[[^]]*])}', result) if source_file and source_file.__len__() > 0: print(source_file[0]) print("**********************************")