From 4ae05022439f129124ae2c8a78d7d0071efb4526 Mon Sep 17 00:00:00 2001 From: work Date: Tue, 8 Jul 2025 15:52:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=94=B9=E4=BB=BB=E5=8A=A1=E4=B8=BA?= =?UTF-8?q?=E9=98=9F=E5=88=97+=E5=B9=B6=E5=8F=91=E7=9A=84=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F,=20=E7=99=BB=E5=BD=95,=E8=8E=B7=E5=8F=96=E8=B4=A6?= =?UTF-8?q?=E5=8F=B7=E9=85=8D=E7=BD=AE,=20=E6=9B=B4=E6=96=B0=E8=B4=A6?= =?UTF-8?q?=E5=8F=B7=E9=85=8D=E7=BD=AE=E5=8A=9F=E8=83=BD=E4=B8=BA=E5=8D=95?= =?UTF-8?q?=E7=8B=AC=E7=9A=84=E9=94=81=E8=83=BD=E5=B9=B6=E5=8F=91=E4=B8=A4?= =?UTF-8?q?=E6=AC=A1.=20=E6=9B=B4=E6=94=B9=E8=B4=A6=E5=8F=B7=E8=A2=AB?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E5=88=B0=E8=87=AA=E5=8A=A8=E5=8C=96=E6=97=B6?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E8=BF=9B=E8=A1=8C=E8=A7=A3=E9=94=81=E8=B4=A6?= =?UTF-8?q?=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 66 +++++++++++++++++++++++++++++++++++--------------- spider/task.py | 57 +++++++++++++++++++++++++------------------ 2 files changed, 80 insertions(+), 43 deletions(-) diff --git a/main.py b/main.py index abd2f0b..ca8babc 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,10 @@ from spider.task import * from loguru import logger +from concurrent.futures import ThreadPoolExecutor +from queue import Queue +import threading +import time +import requests logger.add("./log/logging.log", rotation="50 MB") @@ -18,8 +23,6 @@ HOST = "http://118.193.40.152:8002" def get_task(): - if lock._block.locked(): - return url = f'{HOST}/queue/get-data' header = { 'Content-Type': 'application/json' @@ -28,10 +31,16 @@ def get_task(): "include_task_type": [], "exclude_task_type": [] } - response = requests.post(url, headers=header, json=data, proxies=None) - if response.status_code == 200: - result = response.json() - return result + try: + response = requests.post(url, headers=header, json=data, proxies=None) + if response.status_code == 200: + return response.json() + else: + # logger.error(f"Failed to get task. Status code: {response.status_code}, Response: {response.text}") + return None + except Exception as e: + logger.error(f"Error occurred while getting task: {str(e)}") + return None def task_callback(tid, data, status='success', msg='success'): @@ -65,20 +74,39 @@ def execute_task(tid, task_type, **kwargs): def main(): - while True: - try: - task = get_task() - if task is None: - logger.info("无任务") + # 创建任务队列,最大容量为3 + task_queue = Queue(maxsize=2) + # 存储正在运行的任务 + running_tasks = set() + + # 创建线程池执行任务(3个工作线程) + with ThreadPoolExecutor(max_workers=3) as executor: + while True: + try: + # 清理已完成的任务 + running_tasks = {task for task in running_tasks if not task.done()} + + # 如果队列未满,尝试获取新任务 + if len(running_tasks) < 3: + task = get_task() + if task: + logger.info(f"收到任务: {task['id']} - {task['task_type']}") + task_data = task['data'] + task_data['tid'] = task['id'] + task_data['task_type'] = task['task_type'] + # 提交任务到线程池并保存future对象 + future = executor.submit(execute_task, **task_data) + running_tasks.add(future) + else: + logger.info("无更多任务") + time.sleep(10) + else: + # 达到最大并发数时等待 + time.sleep(1) + + except Exception as e: + logger.error(f'Main Error: {e}') time.sleep(10) - continue - logger.info(f"收到任务{task}") - task['data']['tid'] = task['id'] - task['data']['task_type'] = task['task_type'] - execute_task(**task['data']) - except Exception as e: - logger.error(f'Main Error: {e}') - time.sleep(10) if __name__ == '__main__': diff --git a/spider/task.py b/spider/task.py index 86b497a..1297446 100644 --- a/spider/task.py +++ b/spider/task.py @@ -17,6 +17,7 @@ from PIL import ImageGrab from loguru import logger from playwright._impl._page import Page from playwright.sync_api import sync_playwright, Error, TimeoutError +from concurrent.futures import ThreadPoolExecutor from const import BUCKET, BASE_PATH from exceptions import AuthException, OperationFailed @@ -826,9 +827,19 @@ def check_account_status(page, cookies): # 校验Cookies 是否失效 # 登录页面判断 + retry_goto(page, 'https://www.facebook.com') + time.sleep(3) + login_btn = page.query_selector_all('//button[@name="login"]') if login_btn: raise AuthException('该账户登录状态失效', 'invalid') + + # 判断是否被检测到自动化,这种情况只需要点击按钮就可以继续 + if page.query_selector('//span[text()="We suspect automated behavior on your account"]') is not None: + page.click('//span[text()="Dismiss"]') + time.sleep(3) + retry_goto(page, 'https://www.facebook.com') + # 判断是否为英文 lang = page.locator('html').get_attribute('lang') if lang != "en": @@ -843,6 +854,7 @@ class RLock(threading._RLock): lock = RLock() +login_semaphore = threading.Semaphore(2) def playwright_like(cookies, target_url): @@ -856,12 +868,14 @@ def playwright_like(cookies, target_url): context = browser.new_context(no_viewport=True) context.add_cookies(parse_cookies(cookies)) page = context.new_page() + + check_account_status(page, parse_cookies(cookies)) + url = 'https://facebook.com' try: + page.goto(url) time.sleep(random.randint(3, 10)) - check_account_status(page, parse_cookies(cookies)) - if 'permalink.php?story_fbid' in target_url or '/posts/' in target_url: # 文字或图片类型 button_xpath = '//div[@class="__fb-light-mode x1n2onr6 x1vjfegm"]//span[@data-ad-rendering-role="like_button"]' @@ -917,11 +931,13 @@ def playwright_post(cookies, content, image_key=None): context = browser.new_context(no_viewport=True) context.add_cookies(parse_cookies(cookies)) page = context.new_page() + + check_account_status(page, parse_cookies(cookies)) + url = 'https://facebook.com' try: page.goto(url) time.sleep(random.randint(3, 10)) - check_account_status(page, parse_cookies(cookies)) time.sleep(5) if image_key: @@ -973,14 +989,11 @@ def playwright_comment(cookies, target_url, content, image_key=None): context = browser.new_context(no_viewport=True) context.add_cookies(parse_cookies(cookies)) page = context.new_page() - url = 'https://facebook.com' - try: - retry_goto(page, url) - time.sleep(random.randint(3, 10)) - check_account_status(page, parse_cookies(cookies)) - time.sleep(5) - page.goto(target_url) + check_account_status(page, parse_cookies(cookies)) + + try: + retry_goto(page, target_url) sleep(1, 2) if 'permalink.php?story_fbid' in target_url or '/posts/' in target_url or "/permalink/" in target_url: @@ -1038,7 +1051,7 @@ def playwright_comment(cookies, target_url, content, image_key=None): def playwright_get_user_profile(cookies, username=None): path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe') - with lock: + with login_semaphore: with sync_playwright() as playwright: update_windows_distinguish() browser = playwright.chromium.launch( @@ -1047,13 +1060,14 @@ def playwright_get_user_profile(cookies, username=None): context = browser.new_context(no_viewport=True) context.add_cookies(parse_cookies(cookies)) page = context.new_page() + + check_account_status(page, parse_cookies(cookies)) + url = 'https://facebook.com' try: - page.goto(url) + retry_goto(page, url) time.sleep(random.randint(3, 10)) - check_account_status(page, parse_cookies(cookies)) - profile_pic_url = page.locator( '//div[@aria-label="Shortcuts"]//li[1]//*[@preserveAspectRatio="xMidYMid slice"]').get_attribute( 'xlink:href') @@ -1094,13 +1108,10 @@ def playwright_set_user_profile(cookies, username=None, first_name=None, last_na context = browser.new_context(no_viewport=True) context.add_cookies(parse_cookies(cookies)) page = context.new_page() - url = 'https://www.facebook.com' + check_account_status(page, parse_cookies(cookies)) try: - page.goto(url) - check_account_status(page, parse_cookies(cookies)) - url = 'https://accountscenter.facebook.com/?entry_point=app_settings' - page.goto(url) + retry_goto(page, url) page.locator('//div[@role="list"]/div').first.click() if username: @@ -1148,7 +1159,7 @@ def playwright_set_user_profile(cookies, username=None, first_name=None, last_na def playwright_check_account_cookies(cookies): path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe') - with lock: + with login_semaphore: with sync_playwright() as playwright: update_windows_distinguish() browser = playwright.chromium.launch( @@ -1157,10 +1168,8 @@ def playwright_check_account_cookies(cookies): context = browser.new_context(no_viewport=True) context.add_cookies(parse_cookies(cookies)) page = context.new_page() - url = 'https://www.facebook.com' - page.goto(url) - time.sleep(3) check_account_status(page, parse_cookies(cookies)) + context.close() browser.close() return {} @@ -1224,7 +1233,7 @@ def playwright_login(username, password, code_2fa=None): check_freeze_account(username) path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe') - with lock: + with login_semaphore: with sync_playwright() as playwright: update_windows_distinguish() browser = playwright.chromium.launch(