from concurrent.futures.thread import ThreadPoolExecutor from spider.task import * from logger import error_logger, record_full_log TASK_TYPE = { 0: post, } HOST = "http://118.193.40.152:8091" def get_task(platform): # if lock._block.locked(): # return response = requests.get(f'{HOST}/services/task', json={'platform': platform}) result = response.json() if result['code'] == 0: return result def task_callback(task_type, queue_id, data, status=1, err_msg=''): response = requests.post( f'{HOST}/services/task/callback', json={ 'task_type': task_type, 'queue_id': queue_id, 'status': status, 'data': data, 'err_msg': err_msg, } ) result = response.json() if result['code'] != 0: raise RuntimeError(f"任务回调失败:{result['msg']}") def execute_task(queue_id, task_type, **kwargs): try: result = TASK_TYPE.get(task_type)(**kwargs) task_callback(task_type, queue_id, data=result) except AuthException as e: record_full_log(error_logger, e) task_callback(task_type, queue_id, data={}, status=4, err_msg=str(e)) except Exception as e: record_full_log(error_logger, e) task_callback(task_type, queue_id, data={}, status=0, err_msg=str(e)) def main(): with ThreadPoolExecutor(max_workers=1) as t: while True: try: task = get_task(0) if task is None: time.sleep(10) continue task['data']['queue_id'] = task['queue_id'] task['data']['task_type'] = task['task_type'] t.submit(execute_task, **task['data']) except Exception as e: error_logger.error(f'Main Error: {e}') time.sleep(10) if __name__ == '__main__': main()