Files
py_facebook/main.py

114 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from spider.task import *
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import threading
import time
import requests
# Also write log output to a file; "50 MB" is the rotation threshold handed
# to loguru.
logger.add("./log/logging.log", rotation="50 MB")

# Base URL of the task-queue service that tasks are fetched from and that
# results are reported back to.
HOST = "http://118.193.40.152:8002"

# Dispatch table: task-type name -> callable that carries out the task.
# The callables are not defined in this file (presumably they come from the
# `spider.task` star import -- confirm).
TASK_TYPE = {
    'get_account_profile': playwright_get_user_profile,
    'update_account_profile': playwright_set_user_profile,
    'check_account_cookies': playwright_check_account_cookies,
    'comment': playwright_comment,
    'like': playwright_like,
    'post': playwright_post,
    'login_account': playwright_m_login,
    'forward': playwright_share,
}
def get_task(timeout=30):
    """Fetch the next pending task from the queue service.

    Sends a POST to ``{HOST}/queue/get-data`` with empty include/exclude
    task-type filters.

    Args:
        timeout: Seconds to wait for the server, passed straight to
            ``requests.post``. Without a timeout a stalled connection would
            block the caller indefinitely.

    Returns:
        The decoded JSON body when the server answers with HTTP 200;
        ``None`` on any other status code or when the request or the JSON
        decoding raises (the error is logged in that case).
    """
    url = f'{HOST}/queue/get-data'
    headers = {'Content-Type': 'application/json'}
    payload = {
        "include_task_type": [],
        "exclude_task_type": [],
    }
    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            proxies=None,
            timeout=timeout,
        )
        if response.status_code != 200:
            # NOTE(review): non-200 replies are deliberately not logged
            # (presumably the server uses them to signal an empty queue and
            # logging each one would be noisy -- confirm).
            return None
        return response.json()
    except Exception as e:
        # Best-effort fetch: never let a transient network/decoding problem
        # propagate; the caller treats None as "nothing to do right now".
        logger.error(f"Error occurred while getting task: {str(e)}")
        return None
def task_callback(tid, data, status='success', msg='success', timeout=30):
    """Report the outcome of a task back to the queue service.

    Sends a POST to ``{HOST}/queue/handle-data`` with the task id, status,
    result data and message.

    Args:
        tid: Identifier of the task being reported.
        data: Result payload to send (must be JSON-serialisable).
        status: Outcome label sent to the server; defaults to ``'success'``.
        msg: Human-readable message sent to the server.
        timeout: Seconds to wait for the server, passed to ``requests.post``
            so a stalled connection cannot hang the worker thread forever.

    Raises:
        RuntimeError: If the server answers with a status code other than 200.
        requests.RequestException: If the request itself fails or times out.
    """
    body = {
        'id': tid,
        'status': status,
        'data': data,
        'message': msg,
    }
    logger.info(f"回调任务: tid:{tid}, status:{status}, data:{data}, msg:{msg}")
    response = requests.post(
        f'{HOST}/queue/handle-data',
        json=body,
        proxies=None,
        timeout=timeout,
    )
    # Check the status before touching the body: an error reply is not
    # guaranteed to be JSON, and the raw text is what belongs in the message.
    if response.status_code != 200:
        raise RuntimeError(f"任务回调失败:code={response.status_code} text={response.text}")
def execute_task(tid, task_type, **kwargs):
    """Run one task and report its outcome through ``task_callback``.

    Args:
        tid: Identifier of the task; forwarded to ``task_callback``.
        task_type: Key into ``TASK_TYPE`` selecting the handler to run.
        **kwargs: Keyword arguments forwarded unchanged to the handler.

    The handler's return value is reported with status ``'success'``.
    ``AuthException`` / ``OperationFailed`` are reported with the exception's
    ``error_type`` as status; any other exception (including an unknown
    ``task_type``) is reported with status ``'failed'``. In the error cases
    the reported data is an empty dict and the message is ``str(exception)``.

    This function does not raise: it runs inside a worker thread, so a
    failure of the callback itself is logged instead of being left in a
    future that may never be inspected.
    """
    try:
        handler = TASK_TYPE.get(task_type)
        if handler is None:
            # Fail with a readable message instead of the opaque
            # "'NoneType' object is not callable".
            raise ValueError(f"unknown task type: {task_type!r}")
        result = handler(**kwargs)
    except (AuthException, OperationFailed) as e:
        logger.exception("账号或操作异常")
        result, status, msg = {}, e.error_type, str(e)
    except Exception as e:
        logger.exception("未捕获异常")
        result, status, msg = {}, 'failed', str(e)
    else:
        status, msg = 'success', 'success'

    # Report outside the try block above so that a callback failure is not
    # mistaken for a task failure (which would re-report a successful task
    # as 'failed').
    try:
        task_callback(tid, data=result, status=status, msg=msg)
    except Exception:
        logger.exception(f"Task callback failed: tid={tid}, status={status}")
def main(max_workers=3):
    """Poll the queue service forever and run tasks on a thread pool.

    Each iteration prunes finished futures, and while fewer than
    ``max_workers`` tasks are in flight asks ``get_task`` for a new task and
    submits it to the pool as ``execute_task(tid=..., task_type=..., **data)``.

    Args:
        max_workers: Size of the thread pool and maximum number of tasks in
            flight at once.

    This function never returns normally; errors inside an iteration are
    logged and the loop resumes after a 10-second pause.
    """
    # Futures of tasks that have been submitted and not yet seen as finished.
    running_tasks = set()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while True:
            try:
                # Prune finished tasks, surfacing any exception a worker
                # raised so that it is not silently lost.
                finished = [fut for fut in running_tasks if fut.done()]
                for fut in finished:
                    running_tasks.discard(fut)
                    error = fut.exception()
                    if error is not None:
                        logger.error(f"Task raised an unhandled error: {error!r}")

                if len(running_tasks) >= max_workers:
                    # At the concurrency limit: wait briefly for a free slot.
                    time.sleep(1)
                    continue

                task = get_task()
                if not task:
                    logger.info("无更多任务")
                    time.sleep(10)
                    continue

                logger.info(f"收到任务: {task['id']} - {task['task_type']}")
                # Build the handler kwargs in a fresh dict rather than
                # mutating the payload returned by the server.
                task_data = {
                    **task['data'],
                    'tid': task['id'],
                    'task_type': task['task_type'],
                }
                # Submit to the pool and remember the future for pruning.
                running_tasks.add(executor.submit(execute_task, **task_data))
            except Exception as e:
                logger.error(f'Main Error: {e}')
                time.sleep(10)
# Start the polling loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()