from concurrent.futures.thread import ThreadPoolExecutor from spider.task import * from logger import error_logger, record_full_log TASK_TYPE = { 'get_account_profile': playwright_get_user_profile, 'update_account_profile': playwright_set_user_profile, 'check_account_cookies': playwright_check_account_cookies, 'comment': playwright_comment, 'like': playwright_like, 'post': playwright_post } HOST = "http://192.168.1.69:8001" def get_task(): if lock._block.locked(): return url = f'{HOST}/queue/get-data' header = { 'Content-Type': 'application/json' } response = requests.post(url, headers=header) if response.status_code == 200: result = response.json() return result def task_callback(tid, data, status='success', msg='success'): body = { 'id': tid, 'status': status, 'data': data, 'message': msg, } response = requests.post( f'{HOST}/queue/handle-data', json=body ) result = response.json() if response.status_code != 200: raise RuntimeError(f"任务回调失败:{result['msg']}") def execute_task(tid, task_type, **kwargs): try: result = TASK_TYPE.get(task_type)(**kwargs) task_callback(tid, data=result) except (AuthException, OperationFailed) as e: record_full_log(error_logger, e) task_callback(tid, data={}, status=e.error_type, msg=str(e)) except Exception as e: record_full_log(error_logger, e) task_callback(tid, data={}, status='failed', msg=str(e)) def main(): with ThreadPoolExecutor(max_workers=1) as t: while True: try: task = get_task() if task is None: time.sleep(10) continue task['data']['tid'] = task['id'] task['data']['task_type'] = task['task_type'] t.submit(execute_task, **task['data']) except Exception as e: error_logger.error(f'Main Error: {e}') time.sleep(10) if __name__ == '__main__': main()