import io
import json
import os
import random
import sys
import threading
import time
import uuid
from datetime import datetime
from string import ascii_lowercase

import pyotp
import pywintypes
import requests
import win32api
import win32con
from PIL import ImageGrab
from fake_useragent import UserAgent  # imports UserAgent
from loguru import logger
from playwright._impl._page import Page
from playwright.sync_api import sync_playwright, Error, TimeoutError

import const
from const import BUCKET, BASE_PATH
from exceptions import AuthException, OperationFailed
from miniofile import client, put_object
from spider.proxy_valid import ProxyChecker


def sleep(a, b=None):
    """Pause for ``a`` seconds, or for a random duration between ``a`` and ``b``.

    Args:
        a: Fixed delay in seconds, or the lower bound when ``b`` is given.
        b: Optional upper bound in seconds. Any falsy value (``None`` or ``0``)
            selects the fixed delay of ``a`` seconds.
    """
    if not b:
        return time.sleep(a)
    # The random delay is rounded to one decimal place.
    return time.sleep(round(random.uniform(a, b), 1))


def _full_screenshot():
    """Grab the whole screen and return it as an in-memory PNG.

    Returns:
        io.BytesIO: PNG bytes, rewound to offset 0 so it can be read directly.
    """
    image = ImageGrab.grab()
    mem_file = io.BytesIO()
    image.save(mem_file, "png")
    # ``save`` leaves the cursor at the end of the data; rewind so a consumer
    # that simply calls ``read()`` does not get an empty payload.
    mem_file.seek(0)
    return mem_file


def update_windows_distinguish(x=1920, y=1080):
    """Change the Windows display resolution; does nothing on other platforms.

    Args:
        x: Target width in pixels.
        y: Target height in pixels.
    """
    if sys.platform != "win32":
        return
    devmode = pywintypes.DEVMODEType()
    devmode.PelsWidth = x
    devmode.PelsHeight = y
    # Only the width and height fields of the DEVMODE structure are applied.
    devmode.Fields = win32con.DM_PELSWIDTH | win32con.DM_PELSHEIGHT
    win32api.ChangeDisplaySettings(devmode, 0)


def _change_language(page):
    """Switch the account's UI language to "English (US)" through the menus.

    Args:
        page: Playwright page that is already logged in.

    Raises:
        OperationFailed: When the settings list has fewer entries than expected.
        playwright TimeoutError: When one of the awaited menu items never shows.
    """
    sleep(2, 3)
    # Avatar button in the top bar (two spellings of the inline style).
    avatar_selector = (
        'div[role="button"] svg image[style*="height: 40px"][style*="width: 40px"], '
        'div[role="button"] svg image[style*="height:40px"][style*="width:40px"]'
    )
    page.wait_for_selector(avatar_selector, timeout=10000).click(force=True)
    sleep(1, 2)
    # Settings entry.
    page.wait_for_selector('//div[@role="list"]/div[@role="listitem"][1]', timeout=30000).click(force=True)
    sleep(1, 2)
    # Language entry.
    page.wait_for_selector('//div[@role="menu"]/div[2]', timeout=10000).click(force=True)
    sleep(1, 2)
    # "Change language" is the 7th list item; guard the index so a changed
    # layout surfaces as OperationFailed instead of a bare IndexError.
    items = page.query_selector_all('//div[@role="list"]/div[@role="listitem"]')
    if len(items) < 7:
        raise OperationFailed('更改语言异常')
    items[6].click(force=True)
    sleep(1, 2)
    # Pick English.
    page.wait_for_selector('//span[text()="English (US)"][1]', timeout=10000).click(force=True)
    sleep(3, 5)


def _edit_privacy(page):
    """Set the audience of the post being composed to "Public".

    Handles two optional dialogs ("Update settings" and "Review audience")
    before falling back to the regular "Edit privacy" control.

    NOTE(review): the source file lost its indentation; the nesting of the two
    dialog checks below is the most plausible reading - confirm.

    Args:
        page: Playwright page with the composer open.

    Raises:
        OperationFailed: When a dialog is shown but its button is missing, or
            when Playwright reports an error while clicking.
    """
    try:
        time.sleep(5)
        # A dialog may pop up first.
        if page.query_selector_all('//div[@aria-label="Update settings" and @role="button"]'):
            set_public = page.query_selector('//span[text()="Set to Public"]')
            if not set_public:
                raise OperationFailed("有弹窗但是无法点击设置公开帖子")
            set_public.click()

        if page.query_selector_all('//h2/span[text()="Review audience"]'):
            continue_btn = page.query_selector('//span[text()="Continue"]')
            if not continue_btn:
                raise OperationFailed("有弹窗但是无法返回")
            continue_btn.click()
            sleep(1, 2)
            page.click('//div[@aria-label="Select audience"]//span[text()="Public"]')
            page.click('//div[@aria-label="Save"]')
            sleep(1, 2)
            return

        sleep(1, 2)
        page.click('//div[contains(@aria-label, "Edit privacy")]')
        sleep(1, 2)
        page.click('//div[@aria-label="Select audience"]//span[text()="Public"]')
        page.click('//div[@aria-label="Done"]')
        sleep(1, 2)
    except Error as e:
        logger.error(f"Error editing privacy settings: {e}")
        raise OperationFailed("编辑隐私设置失败") from e
import json
from typing import Union, List, Dict, Any


def parse_cookies(cookies: Union[str, dict, list], default_domain: str = '.facebook.com',
                  default_path: str = '/') -> List[Dict[str, Any]]:
    """Normalize cookies given in several formats into Playwright cookie dicts.

    A ``locale=en_US`` cookie is appended when no cookie named ``locale`` is
    present. The input is never modified: a list input is shallow-copied
    before the locale cookie is appended.

    Args:
        cookies: One of
            * a list of cookie dicts (returned as a copy, entries untouched),
            * a ``{name: value}`` dict (values are converted with ``str``),
            * a string holding either JSON for one of the above or a
              ``"k1=v1; k2=v2"`` cookie header.
        default_domain: Domain assigned to cookies built by this function.
        default_path: Path assigned to cookies built by this function.

    Returns:
        A new list of cookie dictionaries.

    Raises:
        ValueError: If ``cookies`` is not a str, dict or list.
    """
    if isinstance(cookies, list):
        # Copy so the caller's list does not grow on every call.
        cookie_list = list(cookies)
    elif isinstance(cookies, dict):
        cookie_list = [
            {'name': name, 'value': str(value), 'domain': default_domain, 'path': default_path}
            for name, value in cookies.items()
        ]
    elif isinstance(cookies, str):
        try:
            parsed = json.loads(cookies)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            parsed = None
        if isinstance(parsed, (list, dict)):
            return parse_cookies(parsed, default_domain, default_path)

        # Not JSON (or a JSON scalar): treat the text as a cookie header.
        cookie_list = []
        for pair in cookies.split(';'):
            pair = pair.strip()
            if not pair:
                continue
            # A pair without "=" yields an empty value.
            name, _, value = pair.partition('=')
            cookie_list.append({
                'name': name.strip(),
                'value': value.strip(),
                'domain': default_domain,
                'path': default_path,
            })
    else:
        raise ValueError(f"Unsupported cookies type: {type(cookies)}")

    if not any(cookie.get('name') == 'locale' for cookie in cookie_list):
        cookie_list.append({'name': "locale", 'value': "en_US", 'domain': default_domain, 'path': default_path})
    return cookie_list
def check_freeze_account(uid, max_retries=3, retry_delay=2, timeout=5):
    """Check whether an account is frozen, retrying on timeouts and connection errors.

    The Graph API profile-picture URL is requested without following
    redirects; a 302 to one specific static image is treated as "frozen".

    Args:
        uid: Account id inserted into the Graph API URL.
        max_retries: Maximum number of request attempts.
        retry_delay: Seconds to wait between attempts.
        timeout: Per-request timeout in seconds.

    Raises:
        AuthException: With code ``'frozen'`` when the account is frozen.
        OperationFailed: When the last attempt times out or cannot connect.
    """
    headers = {
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
    }
    frozen_location = 'https://static.xx.fbcdn.net/rsrc.php/v1/yh/r/C5yt7Cqf3zU.jpg'
    url = f"https://graph.facebook.com/{uid}/picture?type=normal"
    for attempt in range(max_retries):
        try:
            # NOTE(review): verify=False disables TLS certificate validation;
            # kept as in the original - confirm whether it is still required.
            response = requests.get(url, headers=headers, allow_redirects=False, verify=False, timeout=timeout)
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            logger.warning(f"请求超时或连接错误,第{attempt + 1}次重试: {e}")
            if attempt >= max_retries - 1:
                # The final attempt failed as well.
                raise OperationFailed("验证账号冻结失败") from e
            time.sleep(retry_delay)
            continue
        if response.status_code == 302 and response.headers.get('Location') == frozen_location:
            raise AuthException('该账号已被冻结', 'frozen')
        # Any other response means the check completed.
        return


def check_account_status(page, cookies, _cookie_prompt_retries=3):
    """Validate that the account is usable and that the UI language is English.

    Steps: frozen check, open facebook.com, detect the login page, dismiss the
    "automated behaviour" notice, accept the cookie-choice dialog, and switch
    the language to English when ``<html lang>`` is not ``en``.

    NOTE(review): the source file lost its indentation; the extent of the
    "automated behaviour" branch is the most plausible reading - confirm.

    Args:
        page: Playwright page whose context already holds the cookies.
        cookies: Iterable of cookie dicts with ``'name'`` and ``'value'`` keys.
        _cookie_prompt_retries: Internal bound on how many times the check is
            restarted after accepting the cookie-choice dialog.

    Raises:
        AuthException: Account frozen or login state invalid.
        OperationFailed: Cookie dialog could not be handled, or changing the
            language timed out.
        KeyError: If no ``c_user`` cookie is present.
    """
    # Keep the original cookie list intact: it is needed again if the check
    # has to restart (the original rebound ``cookies`` to this dict and then
    # passed the dict to the recursive call, which raised TypeError).
    cookie_map = {c['name']: c['value'] for c in cookies}
    check_freeze_account(cookie_map['c_user'])

    # A visible login form means the cookies are no longer valid.
    retry_goto(page, 'https://www.facebook.com')
    time.sleep(3)
    if page.query_selector_all('//button[@name="login"]'):
        raise AuthException('该账户登录状态失效', 'invalid')
    if page.query_selector_all('//span[text()="Create new account"]'):
        raise AuthException('该账户登录状态失效', 'invalid')

    # Automation warning: dismissing it is enough to continue.
    if page.query_selector(
            '//span[text()="We suspect automated behaviour on your account" or text()="We suspect automated behavior on your account"]') is not None:
        page.click('//span[text()="Dismiss"]')
        time.sleep(3)
        retry_goto(page, 'https://www.facebook.com')

    # Cookie-choice interstitial: accept it, then start over.
    if "flow=user_cookie_choice_v2&source=pft_user_cookie_choice" in page.url:
        allow_cookies = page.query_selector('//div[@role="dialog"]/div/div/div/div/div[3]/div/div/div[1]')
        if allow_cookies is None or _cookie_prompt_retries <= 0:
            raise OperationFailed("允许cookies设置点击失败")
        allow_cookies.click()
        time.sleep(3)
        return check_account_status(page, cookies, _cookie_prompt_retries - 1)

    # Selectors used elsewhere in this file match English text.
    lang = page.locator('html').get_attribute('lang')
    if lang != "en":
        try:
            _change_language(page)
        except TimeoutError as e:
            raise OperationFailed('更改语言异常') from e
class RLock(threading._RLock):
    """Re-entrant lock type built on ``threading._RLock``."""
    pass


# Held by the flows that launch Chrome with headless=False.
lock = RLock()
# Limits the flows that launch Chrome with headless=const.HEADLESS to two at a time.
login_semaphore = threading.Semaphore(2)


def playwright_like(cookies, target_url):
    """Like a post, video or reel with the given account and upload a screenshot.

    Args:
        cookies: Account cookies in any format accepted by ``parse_cookies``.
        target_url: URL of the post (``/posts/``, ``/permalink/``,
            ``permalink.php?story_fbid``), video (``watch/?v``, ``watch?v``,
            ``/videos/``) or reel (``/reel/``).

    Returns:
        dict: ``{'response_url': target_url, 'screenshot_key': <object key>}``.

    Raises:
        OperationFailed: Unsupported URL type, like button not found, or a
            Playwright error during the operation.
        AuthException: Raised by ``check_account_status`` for unusable accounts.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with lock:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=False,
                args=['--start-maximized'],
                executable_path=path,
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))
            try:
                # Visit the home page first and idle for a random interval.
                retry_goto(page, 'https://facebook.com')
                time.sleep(random.randint(3, 10))

                if 'permalink.php?story_fbid' in target_url or '/permalink/' in target_url or '/posts/' in target_url:
                    # Text or image post.
                    button_xpath = '//*[@role="dialog"]//span[text()="Like" or @data-ad-rendering-role="like_button"]'
                elif 'watch/?v' in target_url or 'watch?v' in target_url or '/videos/' in target_url:
                    # Video; the "watch?v" form is accepted as in playwright_share.
                    button_xpath = '//span[@data-ad-rendering-role="like_button"][1]'
                elif '/reel/' in target_url:
                    # Reel (short video).
                    button_xpath = '//div[@class="__fb-dark-mode x1afcbsf x1uhb9sk x1swf91x"]//div[@aria-label="Like"]//div[@aria-label="Like"]'
                else:
                    raise OperationFailed(f'不支持的帖子类型POST: {target_url}')

                retry_goto(page, target_url)
                sleep(1, 2)
                button = page.query_selector(button_xpath)
                if not button:
                    raise OperationFailed("未找到点赞按钮")
                button.scroll_into_view_if_needed()
                # The button is clicked only when it has no inline style,
                # which the original treats as "not liked yet".
                if not button.get_attribute('style'):
                    button.click(force=True)
                time.sleep(10)
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e

            screenshot_content = _full_screenshot()
            context.close()
            browser.close()
            key = f'screenshot/{uuid.uuid4()}.png'
            put_object(key, screenshot_content)
            return {
                'response_url': target_url,
                'screenshot_key': key,
            }
def get_post_count(page, cookies):
    """Open the account's profile page and count its "Enlarge" links.

    The count is used by ``retry_get_new_video`` to detect a newly published
    video.

    Args:
        page: Playwright page that is already logged in.
        cookies: Account cookies in any format accepted by ``parse_cookies``;
            only ``c_user`` is read.

    Returns:
        int: Number of ``a[aria-label="Enlarge"]`` elements found.

    Raises:
        KeyError: If no ``c_user`` cookie is present.
    """
    cookie_map = {c['name']: c['value'] for c in parse_cookies(cookies)}
    uid = cookie_map['c_user']
    retry_goto(page, f"https://www.facebook.com/profile.php?id={uid}")
    # Scroll to the bottom, then wait a few seconds before counting.
    page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
    page.wait_for_timeout(random.randint(3, 5) * 1000)
    posts = page.query_selector_all('//a[@aria-label="Enlarge"]')
    logger.info(f"账号{uid} 获取到帖子数量为{len(posts)}")
    return len(posts)


def retry_get_new_video(page, cookies, post_count):
    """Poll the profile every 30 seconds, for up to 5 minutes, for a new video.

    Args:
        page: Playwright page object.
        cookies: Account cookies (passed through to ``get_post_count``).
        post_count: Number of posts counted before the upload.

    Returns:
        str: URL of the page reached after opening the first "Enlarge" link.

    Raises:
        OperationFailed: If opening the video fails.
        TimeoutError: The ``TimeoutError`` imported at the top of this file
            (``playwright.sync_api``), when no new video shows up in time.
    """
    max_duration = 5 * 60  # seconds
    retry_interval = 30  # seconds
    # Monotonic clock: the deadline is unaffected by system clock changes.
    started = time.monotonic()
    attempt = 1

    while time.monotonic() - started < max_duration:
        # A higher count than before the upload means something was published.
        if get_post_count(page, cookies) > post_count:
            try:
                links = page.query_selector_all('//a[@aria-label="Enlarge"]')
                if links:
                    # Click through JavaScript rather than a pointer click.
                    page.evaluate('(element) => element.click()', links[0])
                    time.sleep(random.randint(3, 5))
                    page.reload()
                    time.sleep(random.randint(3, 5))
                    return page.url
            except Exception as e:
                raise OperationFailed(f"点击视频时出错: {e}") from e

        remaining = max_duration - (time.monotonic() - started)
        if remaining > 0:
            # Wait 30 seconds or whatever is left, whichever is smaller.
            pause = min(retry_interval, remaining)
            logger.info(f"第{attempt}次尝试,等待 {pause:.1f} 秒后重试... (剩余时间: {remaining:.1f}秒)")
            time.sleep(pause)
            attempt += 1

    logger.warning("5分钟超时,退出重试")
    raise TimeoutError("未获取到新视频(可能视频上传失败),已超时5分钟")
def playwright_post(cookies, content, image_key=None):
    """Publish a public post, optionally with an image or video attachment.

    Args:
        cookies: Account cookies in any format accepted by ``parse_cookies``.
        content: Text typed into the composer.
        image_key: Optional object-storage key of the attachment. Keys
            containing ``".mp4"`` are treated as videos, whose URL is resolved
            by polling the profile page.

    Returns:
        dict: ``{'response_url': <post url>, 'screenshot_key': <object key>}``.

    Raises:
        OperationFailed: On any Playwright error, including the timeout raised
            while waiting for an uploaded video.
        AuthException: Raised by ``check_account_status`` for unusable accounts.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    # ``image_key`` defaults to None; the original ``".mp4" in image_key``
    # raised TypeError in that case.
    is_video = bool(image_key) and ".mp4" in image_key
    with lock:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=False,
                args=['--start-maximized'],
                executable_path=path,
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            page.evaluate(f'document.body.style.zoom = "{const.DISPLAY_SCALE}"')
            check_account_status(page, parse_cookies(cookies))

            video_count = 0  # posts counted before publishing a video
            file_path = None
            try:
                if is_video:
                    # Baseline for detecting the new video afterwards.
                    video_count = get_post_count(page, cookies)
                retry_goto(page, 'https://facebook.com')
                time.sleep(random.randint(3, 10))
                time.sleep(5)

                if image_key:
                    filename = image_key.split('/')[-1]
                    file_path = os.path.join(BASE_PATH, 'files', filename)
                    client.fget_object(BUCKET, image_key, file_path)
                    sleep(1, 2)
                    page.locator(
                        'input[accept="image/*,image/heif,image/heic,video/*,video/mp4,video/x-m4v,'
                        'video/x-matroska,.mkv"]'
                    ).set_input_files(file_path)
                    time.sleep(5)
                else:
                    # Without an attachment the composer is opened by a click.
                    page.click('''//span[contains(text(), "What's on your mind")]''')

                _edit_privacy(page)
                page.type('''//div[contains(@aria-placeholder, "What's on your mind")]''', content, delay=50)
                page.click('//div[@aria-label="Post"]', timeout=300000)
                time.sleep(15)

                # Open the newest feed entry to obtain its URL.
                post_index = page.locator('//div[@aria-posinset="1"]//a[@role="link"]').nth(2)
                post_index.click(timeout=600000)
                time.sleep(5)
                page.reload()
                post_url = page.url

                if is_video:
                    # Videos get their link from the profile page instead.
                    post_url = retry_get_new_video(page, cookies, video_count)
                time.sleep(random.randint(3, 10))
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e
            finally:
                # Remove the downloaded attachment even when posting fails.
                if file_path and os.path.exists(file_path):
                    os.remove(file_path)

            screenshot_content = _full_screenshot()
            context.close()
            browser.close()
            key = f'screenshot/{uuid.uuid4()}.png'
            put_object(key, screenshot_content)
            return {'response_url': post_url, 'screenshot_key': key}
def playwright_comment(cookies, target_url, content, image_key=None):
    """Comment on a post, video or reel, optionally attaching a file.

    Args:
        cookies: Account cookies in any format accepted by ``parse_cookies``.
        target_url: URL of the post, video or reel to comment on.
        content: Comment text.
        image_key: Optional object-storage key of a file to attach.

    Returns:
        dict: ``{'response_url': target_url, 'screenshot_key': <object key>}``.

    Raises:
        OperationFailed: Unsupported URL type or a Playwright error.
        AuthException: Raised by ``check_account_status`` for unusable accounts.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with lock:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=False,
                args=['--start-maximized'],
                executable_path=path,
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))

            file_path = None
            try:
                retry_goto(page, target_url)
                sleep(1, 2)

                if 'permalink.php?story_fbid' in target_url or '/posts/' in target_url or "/permalink/" in target_url:
                    # Text or image post: the comment button is clicked first.
                    input_xpath = '//div[@role="dialog"]//span[@data-ad-rendering-role="comment_button"]'
                    attach_xpath = '//div[@id="focused-state-actions-list"]//div[@aria-label="Attach a photo or video"]'
                    comment_xpath = '//div[@aria-label="Comment"]'
                    page.click(input_xpath)
                    sleep(1, 2)
                elif 'watch/?v' in target_url or 'watch?v' in target_url or '/videos/' in target_url:
                    # Video; the "watch?v" form is accepted as in playwright_share.
                    input_xpath = '//div[@aria-label="Write a comment…"]'
                    attach_xpath = '//div[@aria-label="Attach a photo or video"]'
                    comment_xpath = '//div[@aria-label="Comment"]'
                elif '/reel/' in target_url:
                    # Reel: the first "Comment" control is clicked first.
                    input_xpath = '//div[@aria-label="Write a comment…"]'
                    attach_xpath = '//div[@aria-label="Attach a photo or video"]'
                    comment_xpath = '//div[@role="complementary"]//div[@aria-label="Comment"]'
                    page.click('//div[@aria-label="Comment"][1]')
                    sleep(1, 2)
                else:
                    raise OperationFailed(f'不支持的帖子类型POST: {target_url}')

                if image_key:
                    filename = image_key.split('/')[-1]
                    file_path = os.path.join(BASE_PATH, 'files', filename)
                    client.fget_object(BUCKET, image_key, file_path)
                    sleep(1, 2)
                    # The attach button opens a native file chooser.
                    with page.expect_file_chooser() as fc_info:
                        page.click(attach_xpath)
                    fc_info.value.set_files(file_path)
                    time.sleep(5)

                page.type(input_xpath, content, delay=50)
                page.click(comment_xpath)
                time.sleep(5)
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e
            finally:
                # Remove the downloaded attachment even when commenting fails.
                if file_path and os.path.exists(file_path):
                    os.remove(file_path)

            screenshot_content = _full_screenshot()
            context.close()
            browser.close()
            key = f'screenshot/{uuid.uuid4()}.png'
            put_object(key, screenshot_content)
            return {
                'response_url': target_url,
                'screenshot_key': key,
            }
def playwright_get_user_profile(cookies, username=None):
    """Read the account's first name, last name and avatar from Accounts Center.

    The avatar image is downloaded and stored under a key derived from the
    name.

    Args:
        cookies: Account cookies in any format accepted by ``parse_cookies``.
        username: Unused; kept for interface compatibility.

    Returns:
        dict: ``avatar_key``, ``username`` (last name + first name),
        ``first_name`` and ``last_name``.

    Raises:
        OperationFailed: Expected elements are missing, Playwright reports an
            error, or the avatar download fails.
        AuthException: Raised by ``check_account_status`` for unusable accounts.
        KeyError: If no ``c_user`` cookie is present.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with login_semaphore:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=const.HEADLESS,
                args=['--start-maximized'],
                executable_path=path,
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))

            cookies_dict = {c['name']: c['value'] for c in parse_cookies(cookies)}
            uid = cookies_dict["c_user"]
            url = f'https://accountscenter.facebook.com/profiles/{uid}'
            try:
                retry_goto(page, url)
                time.sleep(random.randint(3, 10))
                # query_selector returns None when nothing matches; fail with
                # the module's own exception instead of an AttributeError.
                avatar = page.query_selector('//*[@preserveAspectRatio="xMidYMid slice"]')
                name_item = page.query_selector('//div[@role="dialog"]//div[@role="listitem"]')
                if avatar is None or name_item is None:
                    raise OperationFailed("操作失败")
                profile_pic_url = avatar.get_attribute('xlink:href')
                name_item.click()
                firstname = page.locator('//label[text()="First name"]/../input').input_value()
                lastname = page.locator('//label[text()="Last name"]/../input').input_value()
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e
            context.close()
            browser.close()

            try:
                response = requests.get(
                    url=profile_pic_url,
                    headers={
                        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36'},
                    timeout=30,  # never hang forever on the CDN
                )
                # Do not store an HTTP error page as the avatar.
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                raise OperationFailed("操作失败") from e

            bio = io.BytesIO(response.content)
            # Sanitize the whole name; the original applied ``replace`` to the
            # first name only because of operator precedence.
            key = f"{(lastname + firstname).replace(' ', '_')}.png"
            put_object(key, bio)
            return {'avatar_key': key, 'username': lastname + firstname,
                    'first_name': firstname, 'last_name': lastname}
def playwright_set_user_profile(cookies, username=None, first_name=None, last_name=None, avatar_key=None):
    """Update the account's name and/or profile picture in Accounts Center.

    Args:
        cookies: Account cookies in any format accepted by ``parse_cookies``.
        username: Not applied by this function; it only prevents the early
            no-op return.
        first_name: New first name; must be given together with ``last_name``.
        last_name: New last name; must be given together with ``first_name``.
        avatar_key: Object-storage key of the new profile picture.

    Returns:
        ``None`` when nothing was requested, otherwise a dict echoing
        ``first_name``, ``last_name`` and ``avatar_key``.

    Raises:
        OperationFailed: Only one of the two names was given, or Playwright
            reported an error.
        AuthException: Raised by ``check_account_status`` for unusable accounts.
        KeyError: If no ``c_user`` cookie is present.
    """
    if not first_name and not last_name and not avatar_key and not username:
        return
    # Validate before launching a browser: both names are required together.
    if bool(first_name) != bool(last_name):
        raise OperationFailed('名称中必须有First name和Last name')

    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with login_semaphore:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=const.HEADLESS,
                args=['--start-maximized'],
                executable_path=path,
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))

            file_path = None
            try:
                cookies_dict = {c['name']: c['value'] for c in parse_cookies(cookies)}
                uid = cookies_dict["c_user"]
                retry_goto(page, f'https://accountscenter.facebook.com/profiles/{uid}')

                if first_name and last_name:
                    # Change the name: the first and last <input> are filled.
                    page.click('//a[@aria-label="Name"]')
                    page.locator('//input').first.fill(first_name)
                    page.locator('//input').last.fill(last_name)
                    page.locator('//div[@role="button"]').last.click()
                    page.click('//span[text()="Done"]')

                if avatar_key:
                    # Change the profile picture.
                    page.click('//a[@aria-label="Profile picture"]')
                    filename = avatar_key.split('/')[-1]
                    file_path = os.path.join(BASE_PATH, 'files', filename)
                    client.fget_object(BUCKET, avatar_key, file_path)
                    sleep(1, 2)
                    with page.expect_file_chooser() as fc_info:
                        page.click('//div[text()="Upload new photo"]')
                    fc_info.value.set_files(file_path)
                    page.locator('//span[text()="Save"]').last.click()
                    time.sleep(5)
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e
            finally:
                # Remove the downloaded picture even when the upload fails.
                if file_path and os.path.exists(file_path):
                    os.remove(file_path)

            context.close()
            browser.close()
            return {
                'first_name': first_name,
                'last_name': last_name,
                'avatar_key': avatar_key,
            }
def playwright_check_account_cookies(cookies):
    """Launch a browser with the cookies and run ``check_account_status``.

    Args:
        cookies: Account cookies in any format accepted by ``parse_cookies``.

    Returns:
        dict: Always an empty dict when the check passes.

    Raises:
        AuthException / OperationFailed: Propagated from
            ``check_account_status``.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with login_semaphore:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=const.HEADLESS,
                args=['--start-maximized'],
                executable_path=path,
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))
            context.close()
            browser.close()
            return {}


def get_login_continue_btn(page):
    """Return the first visible and enabled "Continue" element, or ``None``.

    Args:
        page: Object exposing ``query_selector_all`` (a Playwright page).
    """
    candidates = page.query_selector_all('//span[text()="Continue"]')
    return next((btn for btn in candidates if btn.is_visible() and btn.is_enabled()), None)


def retry_goto(page: "Page", url: str, max_retries: int = 3, retry_delay: int = 5):
    """Navigate to ``url``, retrying when the navigation times out.

    Args:
        page: The synchronous Playwright Page object.
        url: The URL to navigate to.
        max_retries: Maximum number of attempts (including the first one).
        retry_delay: Delay in seconds between attempts.

    Raises:
        TimeoutError: The ``TimeoutError`` imported at the top of this file,
            re-raised when the last attempt also times out.
        Exception: Any other error from ``page.goto`` propagates immediately
            without a retry.
    """
    for attempt in range(max_retries):
        if attempt > 0:
            logger.info(f"Retrying navigation to {url}, attempt {attempt + 1}/{max_retries}...")
        else:
            logger.info(f"Navigating to {url}, initial attempt...")
        try:
            # Resolve as soon as the DOM is parsed; a 30 s budget per attempt.
            page.goto(url, timeout=30000, wait_until="domcontentloaded")
        except TimeoutError:
            logger.warning(f"Navigation to {url} timed out on attempt {attempt + 1}.")
            if attempt >= max_retries - 1:
                logger.error(f"All {max_retries} attempts failed for {url}.")
                raise
            logger.info(f"Waiting {retry_delay} seconds before retrying...")
            time.sleep(retry_delay)
        except Exception as e:
            # Only timeouts are retried; log and let everything else propagate.
            logger.error(f"An unexpected error occurred during navigation to {url} on attempt {attempt + 1}: {e}")
            raise
        else:
            logger.info(f"Successfully navigated to {url} on attempt {attempt + 1}")
            return
def playwright_login(username, password, code_2fa=None):
    """Log in on www.facebook.com with username, password and a TOTP code.

    NOTE(review): the source file lost its indentation. Two nesting decisions
    below are reconstructions - confirm: the post-captcha wait is placed
    inside the captcha branch, and the code entry runs whether or not a
    "Try another way" link is present.

    Args:
        username: Account id / login name (also used for the frozen check).
        password: Account password.
        code_2fa: Base32 TOTP secret used to generate the 2FA code.

    Returns:
        dict: ``{'cookies': <JSON string of name -> value>}``.

    Raises:
        OperationFailed: Captcha challenge that cannot be solved, unexpected
            page, missing 2FA secret, or no ``c_user`` cookie after login.
        AuthException: Raised by ``check_freeze_account`` for frozen accounts.
    """
    logger.info(f"登录账号{username}")
    # Frozen accounts are rejected before a browser is started.
    check_freeze_account(username)
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with login_semaphore:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=const.HEADLESS,
                args=['--start-maximized'],
                executable_path=path,
            )
            random_user_agent = UserAgent().getBrowser("Chrome").get("useragent")
            logger.info(f"使用ua={random_user_agent}")
            context = browser.new_context(no_viewport=True, user_agent=random_user_agent)
            # Force the English UI; the selectors below match English text.
            context.add_cookies([
                {
                    "name": "locale",
                    "value": "en_US",
                    "domain": ".facebook.com",
                    "path": "/",
                    "expires": -1,
                    "httpOnly": True,
                    "secure": False,
                },
            ])
            page = context.new_page()
            retry_goto(page, 'https://www.facebook.com')

            page.locator('//input[@id="email"]').type(username, delay=30)
            time.sleep(random.randint(1, 3))
            page.locator('//input[@id="pass"]').type(password, delay=30)
            time.sleep(random.randint(1, 3))
            page.click('//button[@name="login"]')
            page.wait_for_load_state()
            time.sleep(random.randint(3, 5))

            # Challenges that cannot be solved automatically.
            if page.query_selector('#arkose-captcha'):
                logger.info(f"账号{username} 弹语音识别验证")
                raise OperationFailed("操作失败")
            if page.query_selector('#captcha-recaptcha'):
                logger.info(f"账号{username} 弹谷歌验证")
                raise OperationFailed("操作失败")

            captcha_img = page.query_selector('//img[contains(@src, "captcha")]')
            if captcha_img:
                logger.info(f"账号{username} 需要验证")
                # NOTE(review): credentials for the captcha-solving service are
                # hard-coded and sent over plain HTTP; consider moving them to
                # configuration.
                data = {
                    'user': 'ycxxkj',
                    'pass2': 'B4DBF06831577C6558F823879061626C',
                    'softid': '951004',
                    'codetype': '3006',
                }
                response = requests.post('http://upload.chaojiying.net/Upload/Processing.php', data=data,
                                         timeout=60,
                                         files={'userfile': ('ccc.jpg', captcha_img.screenshot())})
                result = response.json()
                if result.get('err_no') != 0:
                    raise OperationFailed('验证码解析错误')
                page.fill('//img[contains(@src, "captcha")]/parent::div/parent::div//input', result['pic_str'])
                page.locator('//img[contains(@src, "captcha")]/parent::div/parent::div/div').nth(4).click()
                # Wait up to 60 s for the page that follows the captcha.
                page.wait_for_selector(
                    '//span[@class="x1lliihq x1plvlek xryxfnj x1n2onr6 x1ji0vk5 x18bv5gf x193iq5w xeuugli x1fj9vlw x13faqbe x1vvkbs x1s928wv xhkezso x1gmr53x x1cpjm7i x1fgarty x1943h6x x1qo61fq x81x36d xa4e6wy x1rhavg7 xzsf02u x1yc453h xudqn12 x3x7a5m x1yztbdb"]',
                    timeout=60000)

            h2 = page.wait_for_selector(
                '//div[@class="x1n2onr6 x1ja2u2z x9f619 x78zum5 xdt5ytf x2lah0s x193iq5w"]//h2/span',
                timeout=60000)
            if h2 is None:
                raise OperationFailed('页面有误')
            expected_titles = [
                "Go to your authentication app",
                "Check your notifications on another device",
            ]
            if h2.text_content() not in expected_titles:
                logger.info(f"账号{username} 操作失败")
                raise OperationFailed("操作失败")

            # Both accepted titles are 2FA prompts, so the secret is required.
            if not code_2fa:
                raise OperationFailed('缺少2FA密钥')

            auth_span = page.query_selector('//span[text()="Try Another Way" or text()="Try another way"]')
            if auth_span:
                auth_span.click()
                time.sleep(1)
                page.click('//div[text()="Authentication app"]')
                time.sleep(1)
                # Several "Continue" elements may exist; use the active one.
                btn = get_login_continue_btn(page)
                if btn is None:
                    raise OperationFailed("操作失败")
                btn.click()
                time.sleep(1)

            # Generate the code right before use so it is as fresh as possible.
            auth_code = pyotp.TOTP(code_2fa).now()
            page.locator('//label[text()="Code"]/preceding-sibling::input').fill(auth_code)
            page.click('//span[text()="Continue"]')

            # Wait for the post-login page. XPath "or" yields a boolean, not a
            # node set, so the two alternatives are joined with the union
            # operator "|".
            page.wait_for_selector(
                "xpath=//h2[normalize-space()='You’re logged in. Trust this device?'] | //span[text()='Save']",
                timeout=60000)

            c = {i['name']: i['value'] for i in context.cookies()}
            # ``.get``: a missing cookie must be reported as a failed login,
            # not as a KeyError.
            if c.get("c_user") is None:
                raise OperationFailed("操作失败")
            logger.info(f"登录账号{username} 登录成功")
            context.close()
            browser.close()
            return {'cookies': json.dumps(c)}
def get_proxy_from_api():
    """Fetch a proxy address from the proxy API.

    When the ``dev`` environment variable is set, a fixed local proxy is
    returned without any network access.

    Returns:
        str: Proxy URL of the form ``http://host:port``.

    Raises:
        OperationFailed: The request failed, or the response did not contain a
            ``proxy`` value.
    """
    if os.getenv("dev") is not None:
        return "http://127.0.0.1:1080"

    try:
        # A timeout keeps a dead proxy API from blocking the caller forever.
        response = requests.get(const.PROXY_HOST, timeout=30)
        proxy_data = response.json() if response.status_code == 200 else None
    except Exception as e:
        raise OperationFailed("获取代理时出错") from e

    proxy = proxy_data.get("proxy") if isinstance(proxy_data, dict) else None
    if not proxy:
        # Raised outside the ``try`` so the detailed message is not replaced
        # by the generic wrapper above.
        raise OperationFailed(f"获取代理时出错={proxy_data}")
    return f"http://{proxy}"


def playwright_m_login(username, password, code_2fa=None):
    """Log in through m.facebook.com behind a proxy, using a mobile user agent.

    NOTE(review): the source file lost its indentation; placing the 2FA code
    entry outside the "Check your notifications" branch is the most plausible
    reading - confirm.

    Args:
        username: Account id / login name (also used for the frozen check).
        password: Account password.
        code_2fa: Base32 TOTP secret used to generate the 2FA code.

    Returns:
        dict: ``{'cookies': <JSON string of name -> value>}``.

    Raises:
        OperationFailed: No proxy available, login page not loaded, wrong
            credentials, missing 2FA secret, or no ``c_user`` cookie after
            login.
        AuthException: Raised by ``check_freeze_account`` for frozen accounts.
    """
    logger.info(f"登录账号{username}")
    check_freeze_account(username)
    logger.info(f"账号{username} 未冻结")
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with lock:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            proxy_url = ProxyChecker(timeout=8).get_valid_proxy_url()
            if proxy_url is None:
                raise OperationFailed("获取代理失败")
            logger.info(f"使用proxy={proxy_url}")
            browser = playwright.chromium.launch(
                headless=const.HEADLESS,
                args=['--start-maximized'],
                executable_path=path,
                proxy={
                    "server": proxy_url,
                },
            )
            # Fixed Android Chrome user agent.
            random_user_agent = "Mozilla/5.0 (Linux; Android 12; Pixel 6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Mobile Safari/537.36"
            logger.info(f"使用ua={random_user_agent}")
            context = browser.new_context(no_viewport=True, user_agent=random_user_agent)
            # Force the English UI; the selectors below match English text.
            context.add_cookies([
                {
                    "name": "locale",
                    "value": "en_US",
                    "domain": ".facebook.com",
                    "path": "/",
                    "expires": -1,
                    "httpOnly": True,
                    "secure": False,
                },
            ])
            page = context.new_page()
            retry_goto(page, 'https://m.facebook.com/login')

            # The site sometimes lands on a page with an
            # "I already have an account" button instead of the login form.
            have_account = 'div[role="button"][aria-label="I already have an account"]'
            input_email = "input#m_login_email"
            try:
                page.wait_for_selector(f'{have_account}, {input_email}', timeout=60000)
            except Exception as e:
                # ``logger.exception`` records the traceback; loguru does not
                # interpret the stdlib ``exc_info=True`` keyword.
                logger.exception("页面加载异常, 未定位到按钮")
                raise OperationFailed("系统异常") from e

            have_account_btn = page.query_selector(have_account)
            if have_account_btn:
                have_account_btn.click()

            page.wait_for_selector(input_email, timeout=60000).type(username, delay=30)
            time.sleep(1)
            page.wait_for_selector('//input[@id="m_login_password"]', timeout=60000).type(password, delay=30)
            page.click('div[aria-label="Log in"]')

            success_login_selector1 = 'span:has-text("Check your notifications on another device")'
            success_login_selector2 = 'span:has-text("Go to your authentication app")'
            failed_login_selector = 'div[data-bloks-name="bk.components.dialog.Dialog"] > div[aria-label="Wrong Credentials"] > div:nth-child(1)'
            # Wait for either 2FA prompt or the wrong-credentials dialog.
            page.wait_for_selector(
                f'{success_login_selector1},{success_login_selector2},{failed_login_selector}',
                timeout=60000)

            failed_login = page.query_selector(failed_login_selector)
            if failed_login:
                # Report the dialog text as the failure reason.
                logger.info(f"用户名:{username} 密码错误 {failed_login.text_content()}")
                raise OperationFailed(failed_login.text_content())

            # Both remaining outcomes are 2FA prompts, so the secret is
            # required (same message as in playwright_login).
            if not code_2fa:
                raise OperationFailed('缺少2FA密钥')

            if page.query_selector(success_login_selector1):
                # Push-notification prompt: switch to the authenticator app.
                # ``page.click`` waits for the element rather than
                # dereferencing a possibly-None handle.
                page.click('div[role="button"][aria-label="Try another way"]')
                page.wait_for_selector(
                    'span[data-bloks-name="bk.components.TextSpan"]:has-text("Authentication app")').click()
                page.click('div[role="button"][aria-label="Continue"]')

            page.wait_for_selector('span:has-text("Go to your authentication app")', timeout=60000)
            auth_code = pyotp.TOTP(code_2fa).now()
            page.wait_for_selector('input[aria-label="Code"][type="text"]', timeout=60000).fill(auth_code)
            page.click('div[role="button"][aria-label="Continue"]')

            # Accounts flagged as automated get stuck here, so a failed wait
            # is deliberately ignored - but logged instead of silently passed.
            try:
                page.wait_for_selector('img[data-bloks-name="bk.components.Image"][src*=".fbcdn.net/v/t"]',
                                       timeout=60000)
            except Exception as e:
                logger.warning(f"post-login page not confirmed for {username}: {e}")

            logger.info(f"用户名:{username} 账号密码正确")
            c = {i['name']: i['value'] for i in context.cookies()}
            # ``.get``: a missing cookie must be reported as a failed login,
            # not as a KeyError.
            if c.get("c_user") is None:
                raise OperationFailed("操作失败")
            logger.info(f"登录账号{username} 登录成功")
            context.close()
            browser.close()
            return {'cookies': json.dumps(c)}
def playwright_share(cookies, target_url, content):
    """Share a post, video or reel to the account's own profile.

    Args:
        cookies: Account cookies in any format accepted by ``parse_cookies``.
        target_url: URL of the post, video or reel to share.
        content: Text typed into the share dialog.

    Returns:
        dict: ``{'response_url': <url of the new post>, 'screenshot_key': <key>}``.

    Raises:
        OperationFailed: Unsupported URL type, share confirmation never shown,
            or any other Playwright error.
        AuthException: Raised by ``check_account_status`` for unusable accounts.
        KeyError: If no ``c_user`` cookie is present.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    # Identical for every supported content type.
    input_box = '//form[@method="POST" and count(@*) = 1]/div/div/div[2]'
    share_now_button = '//span[text()="Share now"]'
    with lock:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=False,
                args=['--start-maximized'],
                executable_path=path,
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))
            try:
                retry_goto(page, target_url)
                if 'permalink.php?story_fbid' in target_url or '/posts/' in target_url or "/permalink/" in target_url:
                    # Text or image post: the last matching share button.
                    page.locator(
                        '//div[@aria-label="Send this to friends or post it on your profile."]//span[@data-ad-rendering-role="share_button"]'
                    ).last.click()
                elif 'watch/?v' in target_url or '/videos/' in target_url or 'watch?v' in target_url:
                    # Video: the first "Share" label.
                    page.locator('//span[@dir="auto" and text()="Share"]').first.click()
                elif '/reel/' in target_url:
                    # Reel (short video).
                    page.locator('//div[@aria-label="Share"]').click()
                else:
                    raise OperationFailed(f'不支持的帖子类型: {target_url}')

                page.locator(input_box).type(content, delay=30)
                _edit_privacy(page)
                page.click(share_now_button)
                time.sleep(1)
                page.wait_for_selector('//span[text()="Posting..."]', state='detached')
                time.sleep(1)
                # wait_for_selector raises on timeout instead of returning a
                # falsy value, so the dedicated message has to be produced in
                # the handler (the original ``if not success_tag`` never ran).
                try:
                    page.wait_for_selector('//span[text()="Shared to your profile"]')
                except TimeoutError as e:
                    raise OperationFailed('转发失败,原因未知') from e

                # Open the newest entry on the profile to obtain its URL.
                cookie_map = {c['name']: c['value'] for c in parse_cookies(cookies)}
                uid = cookie_map['c_user']
                retry_goto(page, f'https://facebook.com/profile.php?id={uid}')
                page.wait_for_load_state()
                page.locator('//div[@aria-posinset="1"]//a[@role="link"]').nth(2).click()
                time.sleep(5)
                page.reload()
                post_url = page.url
                screenshot_content = _full_screenshot()
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e

            context.close()
            browser.close()
            key = f'screenshot/{uuid.uuid4()}.png'
            put_object(key, screenshot_content)
            return {'response_url': post_url, 'screenshot_key': key}
'//span[@dir="auto" and text()="Share"]' input_box = '//form[@method="POST" and count(@*) = 1]/div/div/div[2]' share_now_button = '//span[text()="Share now"]' page.locator(share_button).first.click() elif '/reel/' in target_url: # 短视频类型 share_button = '//div[@aria-label="Share"]' input_box = '//form[@method="POST" and count(@*) = 1]/div/div/div[2]' share_now_button = '//span[text()="Share now"]' page.locator(share_button).click() else: raise OperationFailed(f'不支持的帖子类型: {target_url}') page.locator(input_box).type(content, delay=30) _edit_privacy(page) page.click(share_now_button) time.sleep(1) page.wait_for_selector('//span[text()="Posting..."]', state='detached') time.sleep(1) success_tag = page.wait_for_selector('//span[text()="Shared to your profile"]') if not success_tag: raise OperationFailed('转发失败,原因未知') cookies = {i['name']: i['value'] for i in parse_cookies(cookies)} uid = cookies['c_user'] retry_goto(page, f'https://facebook.com/profile.php?id={uid}') page.wait_for_load_state() post_index = page.locator('//div[@aria-posinset="1"]//a[@role="link"]').nth(2) post_index.click() time.sleep(5) page.reload() post_url = page.url screenshot_content = _full_screenshot() except Error as e: raise OperationFailed(f'操作超时,请重试{e}') context.close() browser.close() key = f'screenshot/{uuid.uuid4()}.png' put_object(key, screenshot_content) return {'response_url': post_url, 'screenshot_key': key} if __name__ == '__main__': # cookies = '{"locale": "en_US", "datr": "ZnGnaBBx0yN7pov19-8_A6Gr", "sb": "ZnGnaDQicDSsVuevkudqio1J", "m_pixel_ratio": "1", "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61579364283503", "xs": "34%3AdWeZoaWzFrtdVQ%3A2%3A1755804022%3A-1%3A-1", "oo": "v1%7C3%3A1755804031"}' # cookies = 
{"c_user":"61565823476070","datr":"q13hZowje6bbViFxECQpYyp8","fr":"01C6Lt4VArm5hELvx.AWXg75HOo-QNJgbiDl8qFtw_5lc.Bm4V2r..AAA.0.0.Bm4V25.AWWHzUeMTuI","m_pixel_ratio":"1.875","sb":"q13hZgJARsRIDmNJG8xUauAe","wd":"384x686","xs":"50%3A8luhgQ-Ea0vnhg%3A2%3A1726045627%3A-1%3A-1"} cookies = {"locale": "en_US", "datr": "vBmxaKfb6cm0AhcefMHPSQO6", "sb": "vBmxaGdLX0gW8f4-cRs7nUtk", "m_pixel_ratio": "1", "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61552034433240", "fr": "0WwHWIVyPWFnUQJdK.AWc47U27HRH3lkVnoI2aLvYmWh9OtMnXPu_1tPSnENGUyO5p_4M.BosRm8..AAA.0.0.BosRnW.AWeWlm_2ZdtbsqcgNZYOw4T5QaI", "xs": "15%3AGhxgmwl-LvPNow%3A2%3A1756436950%3A-1%3A-1"} # post(cookies, 'cs2025') # like(cookies, 'ZmVlZGJhY2s6MTIyMTA5NjE0NjU0NzkzNzc5') # comment(cookies, 'ZmVlZGJhY2s6MTIyMTA5NjE0NjU0NzkzNzc5', 'game la', 'xzpq.mp4') # playwright_like(cookies, 'https://www.facebook.com/watch/?v=1007800324567828') # print(playwright_post(cookie, '2025-3-230~like')) # playwright_post(cookies, '2025-3-26~like', "") # playwright_comment( # cookies, # 'https://www.facebook.com/permalink.php?story_fbid=122096663738814448&id=61574433449058', # # 'https://www.facebook.com/watch/?v=1603348023628396', # # 'https://www.facebook.com/permalink.php?story_fbid=635052906055594&id=100086526695858', # # 'https://www.facebook.com/reel/3578555425778137', # '2025-3-26~like', # # 'rg.jpg' # ) # print(playwright_get_user_profile(cookies)) # print(_change_language(cookies)) # playwright_set_user_profile( # cookies, # username='facebaby66' # # firstname='Lisa', # # lastname='Keals', # # image_key='rg.jpg' # ) # cookies = '{"c_user":"61565405263653","datr":"-YDhZoLWu5zbUIw5cOB2In9s","fr":"0ZmsqLWbmV0Onlspt.AWW1JRfVxQAF-jl0oGY7lBQLYq4.Bm4YD5..AAA.0.0.Bm4YED.AWVf1ae03r4","m_page_voice":"61565405263653","m_pixel_ratio":"2.625","sb":"-YDhZs8LozUoyLe1gj2MCUwW","wd":"412x759","xs":"21%3A8Gt3CwtjVWJUhQ%3A2%3A1726054660%3A-1%3A-1"}' # cookies = '{"datr": "mm0taNtaPfOxWhpxdzpkVjV0", "sb": 
"mm0taFuFnO_L1FpzkKDiA4lw", "wd": "1920x953", "locale": "en_US", "c_user": "61575901481649", "fr": "0c0y2KyMv8lRJ6NNq.AWe7DLt-TSkoOyn3DhRjhA4ByOITAhfSwaiIw4eQE5ilq4Q4KAY.BoLW2a..AAA.0.0.BoLW3M.AWfHVOhZIAGgDh_3BvPFPi8-YhE", "xs": "29%3ASM0qc4U4Ile_MA%3A2%3A1747807693%3A-1%3A-1", "presence": "C%7B%22t3%22%3A%5B%5D%2C%22utc3%22%3A1747807698911%2C%22v%22%3A1%7D"}' # cookies = '{"locale": "en_US", "datr": "PaB4aGZCgstQYUkBHpEVnEe8", "sb": "PaB4aAgR68sRQtATM6v7gEu5", "m_pixel_ratio": "1", "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "100094571602733", "fr": "0g0qqVhuLyyrKSaUv.AWdif7wExy29FD7aMjwFvrQFqoBzz-S7Qbeg8la4QMVeGv43eLg.BoeKA9..AAA.0.0.BoeKBQ.AWdj3k5XKtwF766wY3n-cro4yw8", "xs": "15%3A52m6IVmYaMzM3Q%3A2%3A1752735825%3A-1%3A-1"}' # print(playwright_share(cookies, "https://www.facebook.com/groups/1702958116839437/permalink/2210833932718517/", "")) # print(playwright_get_user_profile(cookies)) # # 永久链接的帖子点赞 # print(playwright_like(cookies, "https://www.facebook.com/groups/1070754870427928/permalink/1873461830157224/")) print(playwright_like(cookies, "https://www.facebook.com/groups/267112859525987/permalink/658047397099196/?rdid=LETw8B1KYb7eUKUJ")) # # # # 视频链接的帖子点赞 # print(playwright_like(cookies, "https://www.facebook.com/groups/1070754870427928/permalink/1873461830157224/")) # # cookies = playwright_login('61576195641387', 'osman@@5', 'Z7U32HAU3RDVE6JE5WEXATOOH6DLPUHV') # print(cookies) # pass # 测试获取代理 print(get_proxy_from_api()) # cookies = playwright_m_login('61576995257902', 'djkufhhh', 'R56AR2LXBK664C6N4AQX2CPV2SL6FOFW') # print(cookies) # 点赞 # cookies_list = [ # # {"locale": "en_US", "datr": "edo_aHbvz3EnE6wLxMErpyJN", "sb": "edo_aB7zwegWek0KTQ1tx-FY", "m_pixel_ratio": "1", # # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576223713268", # # "fr": "0g90xhA5Gl8ZgOqxG.AWfoCPjWPzbDQn6pLym-URV-n2fHl9Ht9QSsvq-N2gHrVT4XfDk.BoP9p5..AAA.0.0.BoP9qH.AWcANS7YIylX14NqadUIosIZKdI", # # "xs": 
"48%3ASpsqohj8-YUsYw%3A2%3A1749015176%3A-1%3A-1"}, # # {"locale": "en_US", "datr": "Yds_aBYaJ-fce50yt7zP7ar7", "sb": "Yds_aKXiSyiKmG8a3SoFbwkZ", "m_pixel_ratio": "1", # # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576668058350", # # "fr": "0h7K7g58Qvbr1AK5k.AWfmgzZGs7oGSA3Ix7tGZjU2-UwTs7W2TjY7JW1K2Tq1eZfOgDM.BoP9th..AAA.0.0.BoP9tw.AWe4d26SD_BUw7vAd7hmkFe7Akc", # # "xs": "38%3APKGnjRBkSQ_TGg%3A2%3A1749015409%3A-1%3A-1"}, # {"locale": "en_US", "datr": "n9s_aFz_DGtnnY4ykLeM3K4t", "sb": "n9s_aIU7TDGsUvpTmZ3WEI2z", "m_pixel_ratio": "1", # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576555533160", # "fr": "0M0A9DWmQqX090mhO.AWfLUhjX_kx8rBpnmFZQexb649CZfCySYWg2W7PxjpQM_ssHfeY.BoP9uf..AAA.0.0.BoP9un.AWeP2pc6-g_ZYLQOq_bUWKseAfo", # "xs": "38%3AkHgOiwbxyT7sfw%3A2%3A1749015463%3A-1%3A-1"}, # {"locale": "en_US", "datr": "_Ns_aFWk_p0EElGrAi_tNXEg", "sb": "_Ns_aEsFh0KeghxVRRafVAUZ", "m_pixel_ratio": "1", # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576802091944", # "fr": "0Z0WLbZCuM5XHjXDJ.AWdmWevd43Nen9oYwCFjg5iWlx86SAkcCWVXx_4HwQy6-gRgjBc.BoP9v8..AAA.0.0.BoP9wI.AWfK64zgJ71c76MCv2gazDXBzbI", # "xs": "31%3AtV4oOWHr4Il5ug%3A2%3A1749015561%3A-1%3A-1"}, # {"locale": "en_US", "datr": "E-E_aIwPKZ30BUqS4q8JIfQI", "sb": "E-E_aNEmMEV78u_Vnm8tQGX6", "m_pixel_ratio": "1", # "wd": "1920x1080", "ps_l": "1", "ps_n": "1", "test_cookie": "CheckForPermission", "c_user": "61576325558767", # "fr": "0VvlERRijEeTAnppZ.AWcqoATDLUSVislwUMXIuDQYxxxf7ksmECHv--94j-wGTMQzk6U.BoP-ET..AAA.0.0.BoP-Eg.AWdR6VNnZSfB-_YWaA5XEWcyQvA", # "xs": "30%3AxljD-RO8defzCA%3A2%3A1749016865%3A-1%3A-1"}, # {"locale": "en_US", "datr": "JeE_aAX1P5smkZmQvLInd4ks", "sb": "JeE_aOyuAZsfMGpeUxsovVZD", "m_pixel_ratio": "1", # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576711106140", # "fr": "07RdoRUs1SsUlq7ef.AWe3jMvGjMqd54IxTDl5vbLvGoOHRT1myhxqRRny7GRaIRszFTA.BoP-El..AAA.0.0.BoP-Eu.AWfyXHJMmkeyX0iwh1UXv_0if8Q", # "xs": 
"10%3AkZhoUr10WNy9ww%3A2%3A1749016879%3A-1%3A-1"}, # {"locale": "en_US", "datr": "MuE_aAucRyXVDioA3HWs1N0O", "sb": "MuE_aOmJUt_HZCAEkkimKKm4", "m_pixel_ratio": "1", # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576218193987", # "fr": "0PTX91SU4KYxo8wSl.AWeeaJ5RiZemCmgKMZriUg8ZNd9rfzvzuM8mGYVxApULaImQUXk.BoP-Ey..AAA.0.0.BoP-FB.AWdQzDjj8jtqe9lKIvOExq6qF84", # "xs": "37%3ATE2pxPfwNwdj1g%3A2%3A1749016898%3A-1%3A-1"}, # {"locale": "en_US", "datr": "R-E_aNsGqOYN8pdKKM7W_Uex", "sb": "R-E_aNoFf6JeSlOoOmwMdYI4", "m_pixel_ratio": "1", # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576332729468", # "fr": "0nU85lvzIaFTsdxjz.AWd0opLHfWkafYnwBhC_CzkdQFUXOQn65cetutGpbG3rSiuJRSo.BoP-FH..AAA.0.0.BoP-FU.AWfrmCKdqXpjSeHcu8z8xOTeKp4", # "xs": "50%3AJ9Xi34a6amKV1g%3A2%3A1749016917%3A-1%3A-1"}, # {"locale": "en_US", "datr": "WeE_aFQbLkcj3fSF25ZeCcfe", "sb": "WeE_aNEPhbw60DgzKrwPX_jE", "m_pixel_ratio": "1", # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576570803564", # "fr": "0n8aDHJ9N2QuRqfQk.AWcKYZI4zd7qXFvbKAoYZGJ8rhUsOPJ1b8hdOR5fJpuEXRZK5mg.BoP-FZ..AAA.0.0.BoP-Fv.AWd8GCiXev5g05qJV1X_h-Wc5hQ", # "xs": "16%3AOYTGTKbKdBko6g%3A2%3A1749016944%3A-1%3A-1"}, # {"locale": "en_US", "datr": "d-E_aKwMl9BeWWx1cR4MzMad", "sb": "d-E_aBZiULc0sw8LQEoqj--A", "m_pixel_ratio": "1", # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576547344605", # "fr": "0Kccfs8cK4FoRKsFz.AWcjtGqjJwkD6lC_oTSjduL1w4AKD3ErAk877IvJKI3YI1-pUxQ.BoP-F3..AAA.0.0.BoP-GD.AWeg-MT8xtV9JelBtZuPnCkwg18", # "xs": "45%3AI8xuKEPRy222pA%3A2%3A1749016964%3A-1%3A-1"}, # ] # for cookies in cookies_list: # # # 视频链接的帖子点赞 # print(playwright_like(cookies, "https://www.facebook.com/groups/1070754870427928/permalink/1873461830157224/")) # 评论 # print(playwright_comment(cookies, "https://www.facebook.com/groups/7423373454348259/permalink/24322822973976709/", # "6"))