大贤者
精华
|
战斗力 鹅
|
回帖 0
注册时间 2015-6-6
|
可以用python弄一个,问了下豆包
python编程,在“C:\Users\Administrator\Desktop\定时问问题”目录下
定时每天早上7点读取配置文件,包含url和key字段
读取问题.txt,每个问题之间用回车间隔
使用openai库请求url,获取response.choices[0].message.content,每个请求间隔2秒,注意考虑异常请求,重试3次
返回值写入问题答案.txt中,格式是问题: 回车 答案:
得出的回答稍微改了下,因为deepseek没有余额,没法请求成功,就不调了
- import os
- import time
- import schedule
- from openai import OpenAI
- from configparser import ConfigParser
- # 配置文件路径和问题文件路径
- config_file_path = r"C:\Users\Administrator\Desktop\定时问问题\config.ini"
- question_file_path = r"C:\Users\Administrator\Desktop\定时问问题\问题.txt"
- answer_file_path = r"C:\Users\Administrator\Desktop\定时问问题\问题答案.txt"
- def read_config():
- """读取配置文件,获取 url 和 key"""
- config = ConfigParser()
- config.read(config_file_path)
- url = config.get('openai', 'url', fallback=None)
- key = config.get('openai', 'key', fallback=None)
- return url, key
- def read_questions():
- """读取问题文件,返回问题列表"""
- with open(question_file_path, 'r', encoding='utf-8') as file:
- questions = file.read().splitlines()
- return questions
- def request_answer(url, key, question):
- """使用 OpenAI API 请求答案,处理异常并重试"""
- max_retries = 3
- for attempt in range(max_retries):
- try:
- client = OpenAI(api_key=key, base_url=url)
- response = client.chat.completions.create(
- model="deepseek-chat",
- messages=[
- {"role": "system", "content": question},
- {"role": "user", "content": question},
- ],
- stream=False
- )
- return response.choices[0].message.content
- except Exception as e:
- if attempt < max_retries - 1:
- print(f"请求失败,正在重试第 {attempt + 1} 次:{e}")
- time.sleep(2)
- else:
- print(f"请求失败,已达到最大重试次数:{e}")
- return None
- def write_answers(answers):
- """将问题和答案写入文件"""
- with open(answer_file_path, 'w', encoding='utf-8') as file:
- for question, answer in answers:
- file.write(f"问题:{question}\n")
- file.write(f"答案:{answer}\n\n")
- def main():
- """主函数,定时执行任务"""
- url, key = read_config()
- if url is None or key is None:
- print("配置文件中缺少 url 或 key,请检查配置文件。")
- return
- questions = read_questions()
- answers = []
- for question in questions:
- answer = request_answer(url, key, question)
- if answer is None:
- answer = "请求失败,无法获取答案。"
- answers.append((question, answer))
- time.sleep(2)
- write_answers(answers)
- print("任务完成,答案已写入文件。")
- # 定时每天早上 7 点执行任务
- schedule.every().day.at("07:00").do(main)
- while True:
- schedule.run_pending()
- time.sleep(1)
复制代码 |
|