# Source: py_facebook/main.py (114 lines, 3.6 KiB, Python)

2025-03-28 14:50:37 +08:00
from spider.task import *
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import threading
import time
import requests
logger.add("./log/logging.log", rotation="50 MB")
2025-03-28 14:50:37 +08:00
# Dispatch table: task_type string received from the queue -> handler callable.
# The handlers are not defined in this file; presumably they arrive through
# the star import from spider.task.
TASK_TYPE = {
    # Account inspection / maintenance
    'get_account_profile': playwright_get_user_profile,
    'update_account_profile': playwright_set_user_profile,
    'check_account_cookies': playwright_check_account_cookies,
    # Content interaction
    'comment': playwright_comment,
    'like': playwright_like,
    'post': playwright_post,
    # Session handling and sharing
    'login_account': playwright_m_login,
    'forward': playwright_share,
}

# Base URL of the task-queue service that get_task() and task_callback() call.
HOST = "http://118.193.40.152:8002"
2025-03-28 14:50:37 +08:00
2025-04-08 16:09:45 +08:00
def get_task(timeout=30):
    """Fetch the next pending task from the queue service.

    Sends a POST to ``{HOST}/queue/get-data`` with empty include/exclude
    task-type filters.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            polling loop on a hung connection.

    Returns:
        The decoded JSON body when the server answers with HTTP 200;
        ``None`` on any other status code or when the request/decoding
        raises (the error is logged in that case).
    """
    url = f'{HOST}/queue/get-data'
    headers = {'Content-Type': 'application/json'}
    payload = {
        "include_task_type": [],
        "exclude_task_type": [],
    }
    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            proxies=None,
            timeout=timeout,
        )
        if response.status_code != 200:
            # Non-200 answers are reported as "no task" without logging,
            # matching the previous behaviour (its log line was disabled).
            return None
        return response.json()
    except Exception as e:
        # Boundary handler: a network failure, timeout or malformed body must
        # not crash the caller's polling loop.
        logger.error(f"Error occurred while getting task: {str(e)}")
        return None
2025-03-28 14:50:37 +08:00
2025-04-08 16:09:45 +08:00
def task_callback(tid, data, status='success', msg='success', timeout=30):
    """Report the outcome of a task back to the queue service.

    Sends a POST to ``{HOST}/queue/handle-data`` with the task id, status,
    result data and message.

    Args:
        tid: Identifier of the task being reported.
        data: Result payload to send back; must be JSON-serialisable.
        status: Outcome label sent to the server (defaults to ``'success'``).
        msg: Human-readable message accompanying the status.
        timeout: Seconds to wait for the server before ``requests`` raises,
            so a hung connection cannot block a worker thread forever.

    Raises:
        RuntimeError: If the server answers with a non-200 status code.
        requests.RequestException: If the request itself fails or times out.
    """
    body = {
        'id': tid,
        'status': status,
        'data': data,
        'message': msg,
    }
    logger.info(f"回调任务: tid:{tid}, status:{status}, data:{data}, msg:{msg}")
    response = requests.post(
        f'{HOST}/queue/handle-data',
        json=body,
        proxies=None,
        timeout=timeout,
    )
    # Check the status first and report the raw body text: the previous code
    # decoded JSON before this check and then read ``.text`` from the decoded
    # object, so a failed callback raised an unrelated error instead of this
    # RuntimeError.
    if response.status_code != 200:
        raise RuntimeError(f"任务回调失败:code={response.status_code} text={response.text}")
2025-03-28 14:50:37 +08:00
2025-04-08 16:09:45 +08:00
def execute_task(tid, task_type, **kwargs):
    """Run one task through its registered handler and report the outcome.

    Looks up ``task_type`` in ``TASK_TYPE``, calls the handler with
    ``**kwargs`` and sends the result through ``task_callback``.

    Args:
        tid: Identifier of the task, passed on to ``task_callback``.
        task_type: Key into ``TASK_TYPE`` selecting the handler.
        **kwargs: Forwarded unchanged to the handler.

    Failure reporting:
        * ``AuthException`` / ``OperationFailed``: reported with the
          exception's ``error_type`` as status.
        * Any other exception (including an unknown ``task_type``): reported
          with status ``'failed'``.

    Note that the success callback runs inside the ``try``, so a failing
    success callback is itself reported as ``'failed'``. An exception raised
    by a callback inside an ``except`` branch propagates to the caller.
    """
    try:
        handler = TASK_TYPE.get(task_type)
        if handler is None:
            # Fail with a meaningful message instead of the opaque
            # "'NoneType' object is not callable" from calling None.
            raise ValueError(f"Unsupported task type: {task_type!r}")
        result = handler(**kwargs)
        task_callback(tid, data=result)
    except (AuthException, OperationFailed) as e:
        logger.exception("账号或操作异常")
        task_callback(tid, data={}, status=e.error_type, msg=str(e))
    except Exception as e:
        logger.exception("未捕获异常")
        task_callback(tid, data={}, status='failed', msg=str(e))
2025-03-28 14:50:37 +08:00
def main(max_workers=3):
    """Poll the queue service forever and run tasks on a thread pool.

    Each iteration drops finished futures, and while fewer than
    ``max_workers`` tasks are in flight asks ``get_task`` for a new one and
    submits it to ``execute_task``.

    Args:
        max_workers: Size of the thread pool and the cap on concurrently
            running tasks (previously the literal 3 in both places).

    Pacing: sleeps 1 s when the pool is saturated, and 10 s when the queue
    returned no task or an iteration raised.
    """
    # Futures of tasks that have been submitted and not yet observed as done.
    in_flight = set()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while True:
            try:
                # Forget tasks that have finished since the last iteration.
                in_flight = {future for future in in_flight if not future.done()}

                if len(in_flight) >= max_workers:
                    # Pool saturated: wait briefly for a slot to free up.
                    time.sleep(1)
                    continue

                task = get_task()
                if not task:
                    logger.info("无更多任务")
                    time.sleep(10)
                    continue

                logger.info(f"收到任务: {task['id']} - {task['task_type']}")
                # Build the keyword arguments without mutating the payload
                # received from the server; tid/task_type override any
                # same-named keys in the task data, as before.
                task_kwargs = {
                    **task['data'],
                    'tid': task['id'],
                    'task_type': task['task_type'],
                }
                in_flight.add(executor.submit(execute_task, **task_kwargs))
            except Exception as e:
                # Top-level boundary: keep the loop alive and back off.
                logger.error(f'Main Error: {e}')
                time.sleep(10)
# Start the polling loop only when this file is executed as a script.
if __name__ == '__main__':
    main()