From 409967fa1db3e04c9a8c09ec57054fb9efc22ce1 Mon Sep 17 00:00:00 2001 From: work Date: Fri, 22 Aug 2025 17:17:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=81=A2=E5=A4=8Dmain=E7=9A=84=E5=B9=B6?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 44 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/main.py b/main.py index f1b4f88..13ff7c8 100644 --- a/main.py +++ b/main.py @@ -74,17 +74,41 @@ def execute_task(tid, task_type, **kwargs): def main(): - while True: - try: - task = get_task() - if task is None: + # 创建任务队列,最大容量为3 + task_queue = Queue(maxsize=2) + # 存储正在运行的任务 + running_tasks = set() + + # 创建线程池执行任务(2个工作线程) + with ThreadPoolExecutor(max_workers=2) as executor: + while True: + try: + # 清理已完成的任务 + running_tasks = {task for task in running_tasks if not task.done()} + + # 如果队列未满,尝试获取新任务 + if len(running_tasks) < 2: + task = get_task() + if task: + logger.info(f"收到任务: {task['id']} - {task['task_type']}") + task_data = task['data'] + task_data['tid'] = task['id'] + task_data['task_type'] = task['task_type'] + # 提交任务到线程池并保存future对象 + future = executor.submit(execute_task, **task_data) + running_tasks.add(future) + else: + logger.info("无更多任务") + time.sleep(10) + else: + # 达到最大并发数时等待 + logger.info("等待任务完成") + time.sleep(1) + + except Exception as e: + logger.error(f'Main Error: {e}') time.sleep(10) - continue - task['data']['tid'] = task['id'] - task['data']['task_type'] = task['task_type'] - execute_task(**task['data']) - except Exception as e: - time.sleep(10) + if __name__ == '__main__': main()