Files
py_facebook/spider/task.py
2026-02-06 23:07:46 +08:00

1275 lines
57 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import io
import json
import logging
import os
import random
import sys
import threading
import time
import uuid
from datetime import datetime
from string import ascii_lowercase
import pyotp
import pywintypes
import requests
import win32api
import win32con
from PIL import ImageGrab
from fake_useragent import UserAgent # 导入 UserAgent
from loguru import logger
from playwright._impl._page import Page
from playwright.sync_api import sync_playwright, Error, TimeoutError
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type, retry_if_exception
import const
from const import BUCKET, BASE_PATH
from exceptions import AuthException, OperationFailed
from miniofile import client, put_object
from spider.proxy_valid import ProxyChecker
def sleep(a, b=None):
    """Pause the current thread.

    With only ``a`` given (or a falsy ``b``) this sleeps exactly ``a`` seconds.
    Otherwise it sleeps a random duration drawn uniformly between ``a`` and
    ``b`` and rounded to one decimal place.
    """
    duration = round(random.uniform(a, b), 1) if b else a
    return time.sleep(duration)
def _full_screenshot():
    """Grab the whole screen and return it PNG-encoded in an in-memory buffer.

    Returns:
        io.BytesIO containing the PNG data. No seek is performed here after
        writing, so callers that read from the stream should be aware of the
        current position.
    """
    buffer = io.BytesIO()
    # Encode straight into memory; nothing is written to disk.
    ImageGrab.grab().save(buffer, "png")
    return buffer
def update_windows_distinguish(x=1920, y=1080):
    """Change the Windows display resolution to ``x`` by ``y`` pixels.

    Does nothing on platforms other than ``win32``.
    """
    if sys.platform != "win32":
        return
    mode = pywintypes.DEVMODEType()
    mode.PelsWidth = x
    mode.PelsHeight = y
    # Only the width and height members are flagged as set.
    mode.Fields = win32con.DM_PELSWIDTH | win32con.DM_PELSHEIGHT
    win32api.ChangeDisplaySettings(mode, 0)
def _change_language(page):
    """Switch the Facebook UI to "English (US)" through the account menu.

    Clicks through: avatar button -> settings entry -> language entry ->
    switch-language entry -> "English (US)", with a short random pause
    between steps.
    """
    avatar_selector = '''
div[role="button"] svg image[style*="height: 40px"][style*="width: 40px"],
div[role="button"] svg image[style*="height:40px"][style*="width:40px"]
'''
    sleep(2, 3)
    # Open the account menu via the avatar button in the top bar.
    avatar = page.wait_for_selector(avatar_selector, timeout=10000)
    avatar.click(force=True)
    sleep(1, 2)
    # Settings entry (first list item).
    settings_entry = page.wait_for_selector('//div[@role="list"]/div[@role="listitem"][1]', timeout=30000)
    settings_entry.click(force=True)
    sleep(1, 2)
    # Language entry (second item of the menu).
    language_entry = page.wait_for_selector('//div[@role="menu"]/div[2]', timeout=10000)
    language_entry.click(force=True)
    sleep(1, 2)
    # "Switch language" is taken to be the 7th list item.
    list_items = page.query_selector_all('//div[@role="list"]/div[@role="listitem"]')
    list_items[6].click(force=True)
    sleep(1, 2)
    # Pick English.
    english = page.wait_for_selector('//span[text()="English (US)"][1]', timeout=10000)
    english.click(force=True)
    sleep(3, 5)
def _edit_privacy(page):
    """Set the audience of the post being composed to "Public".

    Handles two optional popups first ("Update settings" and
    "Review audience"); otherwise goes through the "Edit privacy" control.

    Raises:
        OperationFailed: a popup is present but its expected button is not,
            or any Playwright ``Error`` occurs while clicking.
    """
    public_option = '//div[@aria-label="Select audience"]//span[text()="Public"]'
    try:
        time.sleep(5)
        # A popup may be shown before the composer is usable.
        if page.query_selector_all('//div[@aria-label="Update settings" and @role="button"]'):
            set_public = page.query_selector('//span[text()="Set to Public"]')
            if not set_public:
                raise OperationFailed("有弹窗但是无法点击设置公开帖子")
            set_public.click()

        if page.query_selector_all('//h2/span[text()="Review audience"]'):
            continue_btn = page.query_selector('//span[text()="Continue"]')
            if not continue_btn:
                raise OperationFailed("有弹窗但是无法返回")
            continue_btn.click()
            sleep(1, 2)
            page.click(public_option)
            page.click('//div[@aria-label="Save"]')
            sleep(1, 2)
            return

        # Regular path: open the privacy editor and choose Public.
        sleep(1, 2)
        page.click('//div[contains(@aria-label, "Edit privacy")]')
        sleep(1, 2)
        page.click(public_option)
        page.click('//div[@aria-label="Done"]')
        sleep(1, 2)
    except Error as e:
        logger.error(f"Error editing privacy settings: {e}")
        raise OperationFailed("编辑隐私设置失败") from e
import json
from typing import Union, List, Dict, Any
def parse_cookies(cookies: Union[str, dict, list], default_domain: str = '.facebook.com', default_path: str = '/') -> \
        List[Dict[str, Any]]:
    """Normalize cookies given in several formats into a list of cookie dicts.

    A ``locale=en_US`` cookie is appended whenever the input has no cookie
    named ``locale``.

    Args:
        cookies: A list of cookie dicts, a ``{name: value}`` dict, a JSON
            string encoding either of those, or a ``"k=v; k2=v2"`` cookie
            header string.
        default_domain: Domain assigned to every cookie this function builds.
        default_path: Path assigned to every cookie this function builds.

    Returns:
        A new list of cookie dicts. The caller's list is never modified.

    Raises:
        ValueError: ``cookies`` is not a list, dict or str.
    """

    def _make(name, value):
        return {'name': name, 'value': value, 'domain': default_domain, 'path': default_path}

    def _with_locale(items):
        if not any(item.get('name') == 'locale' for item in items):
            items.append(_make("locale", "en_US"))
        return items

    if isinstance(cookies, list):
        # Copy first: appending to the caller's own list would leak the
        # extra cookie back into whatever object they passed in.
        return _with_locale(list(cookies))

    if isinstance(cookies, dict):
        return _with_locale([_make(name, str(value)) for name, value in cookies.items()])

    if isinstance(cookies, str):
        try:
            # JSON text is decoded and fed back through this function.
            return parse_cookies(json.loads(cookies), default_domain, default_path)
        except ValueError:
            # Not JSON (json.JSONDecodeError is a ValueError), or JSON of an
            # unsupported type: fall through and treat it as a cookie header.
            pass
        cookie_list = []
        for chunk in cookies.split(';'):
            chunk = chunk.strip()
            if not chunk:
                continue
            # Split on the first '=' only; a chunk without '=' gets an empty value.
            name, _, value = chunk.partition('=')
            cookie_list.append(_make(name.strip(), value.strip()))
        return _with_locale(cookie_list)

    raise ValueError(f"Unsupported cookies type: {type(cookies)}")
def check_freeze_account(uid, max_retries=3, retry_delay=2, timeout=5):
    """Check whether an account is frozen, retrying on timeouts and connection errors.

    Requests the account's Graph API picture URL without following
    redirects. A 302 whose ``Location`` is the specific static image URL
    below is treated as "frozen"; any other response counts as not frozen.

    Args:
        uid: Account id used in the Graph API URL.
        max_retries: Maximum number of request attempts.
        retry_delay: Seconds to wait between attempts.
        timeout: Per-request timeout in seconds.

    Raises:
        AuthException: The account is detected as frozen.
        OperationFailed: Every attempt timed out or failed to connect.
    """
    frozen_location = 'https://static.xx.fbcdn.net/rsrc.php/v1/yh/r/C5yt7Cqf3zU.jpg'
    headers = {
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
    }
    url = f"https://graph.facebook.com/{uid}/picture?type=normal"
    for attempt in range(max_retries):
        try:
            # NOTE(review): verify=False disables TLS certificate checks; kept
            # as-is because the deployment may rely on it — confirm.
            response = requests.get(url, headers=headers, allow_redirects=False, verify=False, timeout=timeout)
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            logger.warning(f"请求超时或连接错误,第{attempt + 1}次重试: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
                continue
            # Last attempt failed: surface the failure and keep the cause attached.
            raise OperationFailed("验证账号冻结失败") from e
        if response.status_code == 302 and response.headers.get('Location') == frozen_location:
            raise AuthException('该账号已被冻结', 'frozen')
        # Got a usable answer; no further attempts are needed.
        return
def is_operation_failed(exception):
    """Return True only for an ``OperationFailed`` whose text contains "更改语言异常".

    Used as the retry predicate of ``check_account_status``.
    """
    if not isinstance(exception, OperationFailed):
        return False
    return "更改语言异常" in str(exception)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(2),
    retry=retry_if_exception(is_operation_failed)
)
def check_account_status(page, cookies):
    """Verify that the account is usable and the UI is in English.

    Steps: frozen-account check, login-state check on the home page,
    dismissal of the "automated behaviour" notice, acceptance of the cookie
    choice dialog, and a switch to English when ``<html lang>`` is not "en".
    The decorator retries the whole function (3 attempts, 2 s apart) when the
    language switch fails.

    Args:
        page: Playwright page whose context already carries the cookies.
        cookies: List of cookie dicts with ``name`` and ``value`` keys.

    Raises:
        AuthException: The account is frozen or no longer logged in.
        OperationFailed: The cookie dialog button is missing, or the language
            switch timed out.
    """
    # Use a separate name for the lookup dict: ``cookies`` must stay a list
    # because it is handed to the recursive call below, which iterates it as
    # a list of dicts again.
    cookie_map = {i['name']: i['value'] for i in cookies}
    uid = cookie_map['c_user']
    check_freeze_account(uid)

    # A login form on the home page means the cookies are no longer valid.
    retry_goto(page, 'https://www.facebook.com')
    time.sleep(3)
    login_btn = page.query_selector_all('//button[@name="login"]')
    if login_btn:
        raise AuthException('该账户登录状态失效', 'invalid')
    create_btn = page.query_selector_all('//span[text()="Create new account"]')
    if create_btn:
        raise AuthException('该账户登录状态失效', 'invalid')

    # Automation notice: dismissing it is enough to continue.
    if page.query_selector(
            '//span[text()="We suspect automated behaviour on your account" or text()="We suspect automated behavior on your account"]') is not None:
        page.click('//span[text()="Dismiss"]')
        time.sleep(3)
        retry_goto(page, 'https://www.facebook.com')

    # Cookie-choice interstitial: accept, then run all checks again.
    if "flow=user_cookie_choice_v2&source=pft_user_cookie_choice" in page.url:
        allow_cookies = page.query_selector('//div[@role="dialog"]/div/div/div/div/div[3]/div/div/div[1]')
        if allow_cookies is None:
            raise OperationFailed("允许cookies设置点击失败")
        allow_cookies.click()
        time.sleep(3)
        return check_account_status(page, cookies)

    # Make sure the UI language is English.
    lang = page.locator('html').get_attribute('lang')
    if lang != "en":
        try:
            _change_language(page)
        except TimeoutError as e:
            raise OperationFailed('更改语言异常') from e
class RLock(threading._RLock):
    """Project-local subclass of ``threading._RLock``; adds no behavior."""


# Taken with ``with lock:`` by the tasks that drive a visible browser window.
lock = RLock()
# Allows at most two holders at a time for tasks using ``with login_semaphore:``.
login_semaphore = threading.Semaphore(2)
def playwright_like(cookies, target_url):
    """Like the post, video or reel at ``target_url`` and upload a screenshot.

    Args:
        cookies: Account cookies in any format ``parse_cookies`` accepts.
        target_url: URL of the item to like; its shape selects the button XPath.

    Returns:
        dict with ``response_url`` (the given URL) and ``screenshot_key``.

    Raises:
        OperationFailed: Unsupported URL type, like button not found, or a
            Playwright ``Error`` during the flow.
    """
    chrome_path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with lock, sync_playwright() as playwright:
        update_windows_distinguish()
        browser = playwright.chromium.launch(
            headless=False, args=['--start-maximized'], executable_path=chrome_path
        )
        context = browser.new_context(no_viewport=True)
        context.add_cookies(parse_cookies(cookies))
        page = context.new_page()
        check_account_status(page, parse_cookies(cookies))
        try:
            retry_goto(page, 'https://facebook.com')
            time.sleep(random.randint(3, 10))
            if any(marker in target_url for marker in ('permalink.php?story_fbid', '/permalink/', '/posts/')):
                # Text or image post.
                button_xpath = '//*[@role="dialog"]//span[text()="Like" or @data-ad-rendering-role="like_button"]'
            elif any(marker in target_url for marker in ('watch/?v', '/videos/')):
                # Video.
                button_xpath = '//span[@data-ad-rendering-role="like_button"][1]'
            elif '/reel/' in target_url:
                # Reel (short video).
                button_xpath = '//div[@class="__fb-dark-mode x1afcbsf x1uhb9sk x1swf91x"]//div[@aria-label="Like"]//div[@aria-label="Like"]'
            else:
                raise OperationFailed(f'不支持的帖子类型POST: {target_url}')
            retry_goto(page, target_url)
            sleep(1, 2)
            button = page.query_selector(button_xpath)
            if not button:
                raise OperationFailed("未找到点赞按钮")
            button.scroll_into_view_if_needed()
            # An empty or missing style attribute is treated as "not liked yet".
            style = button.get_attribute('style')
            if style is None or style == "":
                button.click(force=True)
            time.sleep(10)
        except Error as e:
            raise OperationFailed(f'操作超时,请重试{e}')
        screenshot_content = _full_screenshot()
        context.close()
        browser.close()
        key = f'screenshot/{uuid.uuid4()}.png'
        put_object(key, screenshot_content)
        return {
            'response_url': target_url,
            'screenshot_key': key
        }
# Count the posts shown on the account's own profile page.
def get_post_count(page, cookies):
    """Open the account's profile, scroll down, and count ``aria-posinset`` elements.

    Scrolling stops when the page height no longer grows, or once the height
    exceeds 10000 (a guard against scrolling forever).

    Args:
        page: Playwright page to drive.
        cookies: Account cookies; ``c_user`` supplies the profile id.

    Returns:
        Number of elements matching ``//div[@aria-posinset]``.
    """
    cookie_map = {item['name']: item['value'] for item in parse_cookies(cookies)}
    uid = cookie_map['c_user']
    retry_goto(page, f"https://www.facebook.com/profile.php?id={uid}")

    previous_height = page.evaluate("document.body.scrollHeight")
    logger.info(f"初始高度: {previous_height}")
    while True:
        # Jump to the bottom and give new content time to load.
        page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        page.wait_for_timeout(3000)
        current_height = page.evaluate("document.body.scrollHeight")
        logger.info(f"新高度: {current_height}")
        if current_height == previous_height:
            # Height unchanged: nothing more was loaded.
            logger.info("已经滚动到底部")
            break
        previous_height = current_height
        if previous_height > 10000:
            logger.info("达到最大滚动限制")
            break

    post_total = len(page.query_selector_all('//div[@aria-posinset]'))
    logger.info(f"账号{uid} 获取到帖子数量为{post_total}")
    return post_total
def retry_get_new_video(page, cookies, post_count, max_duration=10 * 60, retry_interval=30):
    """Poll the profile until a new post appears, then open it and return its URL.

    Args:
        page: Playwright page object.
        cookies: Account cookies.
        post_count: Number of posts counted before the upload.
        max_duration: Total time budget in seconds (default 10 minutes).
        retry_interval: Seconds to wait between polls (default 30).

    Returns:
        ``page.url`` after clicking the first post and reloading.

    Raises:
        OperationFailed: Clicking the new post failed.
        TimeoutError: No new post appeared within ``max_duration``.
    """
    start_time = time.time()
    attempt = 1
    while time.time() - start_time < max_duration:
        new_post_count = get_post_count(page, cookies)
        # More posts than before means the upload has been published.
        if new_post_count > post_count:
            try:
                posts = page.query_selector_all('//div[@aria-posinset]')
                if posts:
                    element = posts[0]
                    bounding_box = element.bounding_box()
                    if bounding_box:
                        # Bring the element roughly to the middle of the viewport.
                        page.evaluate(
                            f''' () => {{ window.scrollTo({{ top: {bounding_box['y']} - window.innerHeight / 2, left: {bounding_box['x']} - window.innerWidth / 2, behavior: 'smooth' }}); }} ''')
                        # Let the smooth scroll finish.
                        page.wait_for_timeout(1000)
                    element.click()
                    page.reload(timeout=180000)
                    time.sleep(random.randint(3, 5))
                    return page.url
            except Exception as e:
                raise OperationFailed(f"点击视频时出错: {e}") from e
        # Sleep for the poll interval, or for whatever time is left if shorter.
        elapsed = time.time() - start_time
        remaining_time = max_duration - elapsed
        if remaining_time > 0:
            sleep_time = min(retry_interval, remaining_time)
            logger.info(f"第 {attempt} 次尝试,等待 {sleep_time:.1f} 秒后重试... (剩余时间: {remaining_time:.1f}秒)")
            time.sleep(sleep_time)
            attempt += 1
    # The message is derived from the real limit so it cannot drift from it.
    logger.info(f"{max_duration / 60:g}分钟超时退出重试")
    raise TimeoutError("未获取到新视频(可能视频上传失败),已超时")
def playwright_post(cookies, content, image_key=None):
    """Publish a post (optionally with an image or video) and upload a screenshot.

    Args:
        cookies: Account cookies in any format ``parse_cookies`` accepts.
        content: Text of the post.
        image_key: Optional object-storage key of a media file to attach. A key
            containing ".mp4" is treated as a video, whose URL is resolved by
            polling the profile for a new post.

    Returns:
        dict with ``response_url`` (URL of the new post) and ``screenshot_key``.

    Raises:
        OperationFailed: A Playwright ``Error`` occurred during the flow.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    is_video = image_key is not None and ".mp4" in image_key
    with lock:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=False, args=['--start-maximized'], executable_path=path
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            page.evaluate(f'document.body.style.zoom = "{const.DISPLAY_SCALE}"')
            check_account_status(page, parse_cookies(cookies))
            # Post count before publishing; only needed for videos.
            video_count = 0
            file_path = None
            url = 'https://facebook.com'
            try:
                if is_video:
                    video_count = get_post_count(page, cookies)
                retry_goto(page, url)
                time.sleep(random.randint(3, 10))
                time.sleep(5)
                if image_key:
                    filename = image_key.split('/')[-1]
                    file_path = os.path.join(BASE_PATH, 'files', filename)
                    client.fget_object(BUCKET, image_key, file_path)
                    sleep(1, 2)
                    page.locator('input[accept="image/*,image/heif,image/heic,video/*,video/mp4,video/x-m4v,'
                                 'video/x-matroska,.mkv"]').set_input_files(file_path)
                    time.sleep(5)
                if not image_key:
                    page.click('''//span[contains(text(), "What's on your mind")]''')
                _edit_privacy(page)
                page.type('''//div[contains(@aria-placeholder, "What's on your mind")]''', content, delay=50,
                          timeout=300000)
                page.click('//div[@aria-label="Post"]', timeout=300000)
                time.sleep(15)
                post_index = page.locator('//div[@aria-posinset="1"]//a[@role="link"]').nth(2)
                post_index.click(timeout=600000)
                time.sleep(5)
                page.reload(timeout=180000)
                post_url = page.url
                # A video's link has to be looked up separately.
                if is_video:
                    post_url = retry_get_new_video(page, cookies, video_count)
                time.sleep(random.randint(3, 10))
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e
            finally:
                # Remove the downloaded media on failure as well as on success.
                if file_path and os.path.exists(file_path):
                    os.remove(file_path)
            screenshot_content = _full_screenshot()
            context.close()
            browser.close()
            key = f'screenshot/{uuid.uuid4()}.png'
            put_object(key, screenshot_content)
            return {'response_url': post_url, 'screenshot_key': key}
def playwright_comment(cookies, target_url, content, image_key=None):
    """Comment on the item at ``target_url`` (optionally with media) and upload a screenshot.

    Args:
        cookies: Account cookies in any format ``parse_cookies`` accepts.
        target_url: URL of the post, video or reel; its shape selects the XPaths.
        content: Comment text.
        image_key: Optional object-storage key of a file to attach.

    Returns:
        dict with ``response_url`` (the given URL) and ``screenshot_key``.

    Raises:
        OperationFailed: Unsupported URL type, or a Playwright ``Error``.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with lock:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=False, args=['--start-maximized'], executable_path=path
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))
            file_path = None
            try:
                retry_goto(page, target_url)
                sleep(1, 2)
                if 'permalink.php?story_fbid' in target_url or '/posts/' in target_url or "/permalink/" in target_url:
                    # Text or image post: the comment box must be opened first.
                    input_xpath = '//div[@role="dialog"]//span[text()="Comment" or@data-ad-rendering-role="comment_button"]'
                    attach_xpath = '//div[@id="focused-state-actions-list"]//div[@aria-label="Attach a photo or video"]'
                    comment_xpath = '//div[@aria-label="Comment"]'
                    page.click(input_xpath)
                    sleep(1, 2)
                elif 'watch/?v' in target_url or '/videos/' in target_url:
                    # Video.
                    input_xpath = '//div[@aria-label="Write a comment…"]'
                    attach_xpath = '//div[@aria-label="Attach a photo or video"]'
                    comment_xpath = '//div[@aria-label="Comment"]'
                elif '/reel/' in target_url:
                    # Reel: open the comment panel before typing.
                    input_xpath = '//div[@aria-label="Write a comment…"]'
                    attach_xpath = '//div[@aria-label="Attach a photo or video"]'
                    comment_xpath = '//div[@role="complementary"]//div[@aria-label="Comment"]'
                    page.click('//div[@aria-label="Comment"][1]')
                    sleep(1, 2)
                else:
                    raise OperationFailed(f'不支持的帖子类型POST: {target_url}')
                if image_key:
                    filename = image_key.split('/')[-1]
                    file_path = os.path.join(BASE_PATH, 'files', filename)
                    client.fget_object(BUCKET, image_key, file_path)
                    sleep(1, 2)
                    with page.expect_file_chooser() as fc_info:
                        page.click(attach_xpath)
                    file_chooser = fc_info.value
                    file_chooser.set_files(file_path)
                    time.sleep(5)
                page.type(input_xpath, content, delay=50)
                page.click(comment_xpath)
                time.sleep(5)
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e
            finally:
                # Remove the downloaded attachment on failure as well as on success.
                if file_path and os.path.exists(file_path):
                    os.remove(file_path)
            screenshot_content = _full_screenshot()
            context.close()
            browser.close()
            key = f'screenshot/{uuid.uuid4()}.png'
            put_object(key, screenshot_content)
            return {
                'response_url': target_url,
                'screenshot_key': key
            }
def playwright_get_user_profile(cookies, username=None):
    """Read the account's name and avatar from Accounts Center and store the avatar.

    Args:
        cookies: Account cookies; ``c_user`` supplies the profile id.
        username: Not used by this function.

    Returns:
        dict with ``avatar_key``, ``username`` (last name + first name),
        ``first_name`` and ``last_name``.

    Raises:
        OperationFailed: A Playwright ``Error`` occurred while reading the profile.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with login_semaphore:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=const.HEADLESS, args=['--start-maximized'], executable_path=path
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))
            cookies_dict = {i['name']: i['value'] for i in parse_cookies(cookies)}
            uid = cookies_dict["c_user"]
            url = f'https://accountscenter.facebook.com/profiles/{uid}'
            try:
                retry_goto(page, url)
                time.sleep(random.randint(3, 10))
                profile_pic_url = page.query_selector('//*[@preserveAspectRatio="xMidYMid slice"]').get_attribute(
                    'xlink:href')
                page.query_selector('//div[@role="dialog"]//div[@role="listitem"]').click()
                firstname = page.locator('//label[text()="First name"]/../input').input_value()
                try:
                    lastname = page.get_by_label("Surname").input_value()
                except Error:
                    # The field is labelled "Last name" in some locales; only a
                    # Playwright failure should trigger this fallback.
                    lastname = page.get_by_label("Last name").input_value()
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e
            context.close()
            browser.close()
            response = requests.get(
                url=profile_pic_url,
                headers={
                    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36'},
                # Bound the download so a stalled connection cannot hold the
                # semaphore forever.
                timeout=60,
            )
            bio = io.BytesIO(response.content)
            # NOTE(review): only the first name has spaces replaced here — confirm
            # whether the last name was meant to be covered too.
            key = f"{lastname + firstname.replace(' ', '_')}.png"
            put_object(key, bio)
            return {'avatar_key': key, 'username': lastname + firstname, 'first_name': firstname, 'last_name': lastname}
def playwright_set_user_profile(cookies, username=None, first_name=None, last_name=None, avatar_key=None):
    """Update the account's name and/or profile picture in Accounts Center.

    Args:
        cookies: Account cookies; ``c_user`` supplies the profile id.
        username: Only checked for emptiness; not otherwise used.
        first_name: New first name; must be given together with ``last_name``.
        last_name: New last name; must be given together with ``first_name``.
        avatar_key: Object-storage key of the new profile picture.

    Returns:
        None when nothing was requested; otherwise a dict echoing
        ``first_name``, ``last_name`` and ``avatar_key``.

    Raises:
        OperationFailed: Only one of the two names was given, or a Playwright
            ``Error`` occurred.
    """
    if not first_name and not last_name and not avatar_key and not username:
        return
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with login_semaphore:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=const.HEADLESS, args=['--start-maximized'], executable_path=path
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))
            try:
                cookies_dict = {i['name']: i['value'] for i in parse_cookies(cookies)}
                uid = cookies_dict["c_user"]
                url = f'https://accountscenter.facebook.com/profiles/{uid}'
                retry_goto(page, url)
                if first_name or last_name:
                    if first_name and last_name:
                        # Open the name editor.
                        page.click('//a[@aria-label="Name"]')
                        input_first_name = page.locator('//input').first.input_value()
                        input_last_name = page.locator('//input').last.input_value()
                        if input_first_name != first_name or input_last_name != last_name:
                            # Only submit when something actually changed.
                            page.locator('//input').first.fill(first_name)
                            page.locator('//input').last.fill(last_name)
                            page.locator('//div[@role="button"]').last.click()
                            page.click('//span[text()="Done"]')
                    else:
                        raise OperationFailed('名称中必须有First name和Last name')
                if avatar_key:
                    # Replace the profile picture.
                    retry_goto(page, url)
                    page.click('//a[@aria-label="Profile picture"]')
                    filename = avatar_key.split('/')[-1]
                    file_path = os.path.join(BASE_PATH, 'files', filename)
                    try:
                        client.fget_object(BUCKET, avatar_key, file_path)
                        sleep(1, 2)
                        with page.expect_file_chooser() as fc_info:
                            page.click('//div[text()="Upload new photo"]')
                        file_chooser = fc_info.value
                        file_chooser.set_files(file_path)
                        page.locator('//span[text()="Save"]').last.click()
                        time.sleep(5)
                    finally:
                        # Remove the downloaded picture even if the upload failed.
                        if os.path.exists(file_path):
                            os.remove(file_path)
            except Error as e:
                raise OperationFailed(f'操作超时,请重试{e}') from e
            context.close()
            browser.close()
            return {
                'first_name': first_name,
                'last_name': last_name,
                'avatar_key': avatar_key,
            }
def playwright_check_account_cookies(cookies):
    """Launch a browser with the given cookies and run the account status checks.

    Returns:
        An empty dict when ``check_account_status`` completes without raising.
    """
    chrome_path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with login_semaphore, sync_playwright() as playwright:
        update_windows_distinguish()
        browser = playwright.chromium.launch(
            headless=const.HEADLESS, args=['--start-maximized'], executable_path=chrome_path
        )
        context = browser.new_context(no_viewport=True)
        context.add_cookies(parse_cookies(cookies))
        page = context.new_page()
        check_account_status(page, parse_cookies(cookies))
        context.close()
        browser.close()
        return {}
def get_login_continue_btn(page):
    """Return the first "Continue" span that is both visible and enabled, else None."""
    candidates = page.query_selector_all('//span[text()="Continue"]')
    return next(
        (btn for btn in candidates if btn.is_visible() and btn.is_enabled()),
        None,
    )
def retry_goto(page: "Page", url: str, max_retries: int = 3, retry_delay: int = 5):
    """Navigate to ``url``, retrying when the navigation times out.

    Each attempt calls ``page.goto`` with a 30 s timeout and waits for
    ``domcontentloaded``.

    Args:
        page: The synchronous Playwright Page object.
        url: The URL to navigate to.
        max_retries: Maximum number of attempts (including the first one).
        retry_delay: Delay in seconds between attempts.

    Raises:
        TimeoutError: Every attempt timed out (the last timeout is re-raised).
        Exception: Any other error from ``page.goto`` is re-raised immediately.
    """
    for attempt in range(max_retries):
        if attempt > 0:
            logger.info(f"Retrying navigation to {url}, attempt {attempt + 1}/{max_retries}...")
        else:
            logger.info(f"Navigating to {url}, initial attempt...")
        try:
            page.goto(url, timeout=30000, wait_until="domcontentloaded")
        except TimeoutError:
            logger.warning(f"Navigation to {url} timed out on attempt {attempt + 1}.")
            if attempt >= max_retries - 1:
                logger.error(f"All {max_retries} attempts failed for {url}.")
                # Bare raise keeps the original traceback intact.
                raise
            logger.info(f"Waiting {retry_delay} seconds before retrying...")
            time.sleep(retry_delay)
        except Exception as e:
            # Only timeouts are retried; anything else is reported and propagated.
            logger.error(f"An unexpected error occurred during navigation to {url} on attempt {attempt + 1}: {e}")
            raise
        else:
            logger.info(f"Successfully navigated to {url} on attempt {attempt + 1}")
            return
def playwright_login(username, password, code_2fa=None):
    """Log in through the desktop site and return the session cookies.

    Handles an image captcha (solved through an external service) and the
    authentication-app 2FA step.

    Args:
        username: Login identifier; also passed to ``check_freeze_account``.
        password: Account password.
        code_2fa: TOTP secret, required when the 2FA step is shown.

    Returns:
        dict with ``cookies``: a JSON string mapping cookie names to values.

    Raises:
        OperationFailed: Unsupported captcha, captcha solving failure,
            unexpected page, missing 2FA secret, or missing ``c_user`` cookie.
        AuthException: Raised by ``check_freeze_account`` for a frozen account.
    """
    logger.info(f"登录账号{username}")
    # Frozen-account check first.
    check_freeze_account(username)
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with login_semaphore:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=const.HEADLESS, args=['--start-maximized'], executable_path=path
            )
            random_user_agent = UserAgent().getBrowser("Chrome").get("useragent")
            logger.info(f"使用ua={random_user_agent}")
            context = browser.new_context(no_viewport=True, user_agent=random_user_agent)
            # Force the English locale.
            context.add_cookies([
                {
                    "name": "locale",
                    "value": "en_US",
                    "domain": ".facebook.com",
                    "path": "/",
                    "expires": -1,
                    "httpOnly": True,
                    "secure": False,
                },
            ])
            page = context.new_page()
            url = 'https://www.facebook.com'
            retry_goto(page, url)
            page.locator('//input[@id="email"]').type(username, delay=30)
            time.sleep(random.randint(1, 3))
            page.locator('//input[@id="pass"]').type(password, delay=30)
            time.sleep(random.randint(1, 3))
            page.click('//button[@name="login"]')
            page.wait_for_load_state()
            time.sleep(random.randint(3, 5))
            # Audio captcha: not supported.
            arkose_captcha = page.query_selector('#arkose-captcha')
            if arkose_captcha:
                logger.info(f"账号{username} 弹语音识别验证")
                raise OperationFailed("操作失败")
            # Google reCAPTCHA: not supported.
            arkose_captcha = page.query_selector('#captcha-recaptcha')
            if arkose_captcha:
                logger.info(f"账号{username} 弹谷歌验证")
                raise OperationFailed("操作失败")
            captcha_img = page.query_selector('//img[contains(@src, "captcha")]')
            if captcha_img:
                logger.info(f"账号{username} 需要验证")
                # NOTE(review): credentials for the captcha service are
                # hard-coded here; consider moving them to configuration.
                data = {
                    'user': 'ycxxkj',
                    'pass2': 'B4DBF06831577C6558F823879061626C',
                    'softid': '951004',
                    'codetype': '3006',
                }
                response = requests.post('http://upload.chaojiying.net/Upload/Processing.php', data=data,
                                         timeout=60, files={'userfile': ('ccc.jpg', captcha_img.screenshot())})
                result = response.json()
                if result['err_no'] == 0:
                    pic_str = result['pic_str']
                    page.fill('//img[contains(@src, "captcha")]/parent::div/parent::div//input', pic_str)
                    page.locator('//img[contains(@src, "captcha")]/parent::div/parent::div/div').nth(4).click()
                else:
                    raise OperationFailed('验证码解析错误')
            # Check whether a captcha is still shown; waits up to 60 seconds.
            page.wait_for_selector(
                '//span[@class="x1lliihq x1plvlek xryxfnj x1n2onr6 x1ji0vk5 x18bv5gf x193iq5w xeuugli x1fj9vlw x13faqbe x1vvkbs x1s928wv xhkezso x1gmr53x x1cpjm7i x1fgarty x1943h6x x1qo61fq x81x36d xa4e6wy x1rhavg7 xzsf02u x1yc453h xudqn12 x3x7a5m x1yztbdb"]',
                timeout=60000)
            h2 = page.wait_for_selector(
                '//div[@class="x1n2onr6 x1ja2u2z x9f619 x78zum5 xdt5ytf x2lah0s x193iq5w"]//h2/span', timeout=60000)
            if h2 is None:
                raise OperationFailed('页面有误')
            else:
                text_contexts = [
                    "Go to your authentication app",
                    "Check your notifications on another device"
                ]
                if h2.text_content() not in text_contexts:
                    logger.info(f"账号{username} 操作失败")
                    raise OperationFailed("操作失败")
            auth_span = page.query_selector('//span[text()="Try Another Way" or text()="Try another way"]')
            if auth_span:
                if not code_2fa:
                    raise OperationFailed('缺少2FA密钥')
                auth_code = pyotp.TOTP(code_2fa).now()
                auth_span.click()
                time.sleep(1)
                page.click('//div[text()="Authentication app"]')
                time.sleep(1)
                # There may be several "Continue" elements; take the usable one.
                btn = get_login_continue_btn(page)
                if btn is None:
                    raise OperationFailed("操作失败")
                btn.click()
                time.sleep(1)
                page.locator('//label[text()="Code"]/preceding-sibling::input').fill(auth_code)
                page.click('//span[text()="Continue"]')
            # Wait for the post-login page. The two alternatives are joined with
            # the XPath union operator "|"; "or" would yield a boolean instead
            # of a node-set, which cannot be used as a selector.
            # NOTE(review): "Youre" looks like it lost an apostrophe — verify
            # the literal against the live page text.
            page.wait_for_selector(
                "xpath=//h2[normalize-space()='Youre logged in. Trust this device?'] | //span[text()='Save']",
                timeout=60000)
            c = {i['name']: i['value'] for i in context.cookies()}
            # A missing c_user cookie means the login did not succeed; .get()
            # reports that as OperationFailed rather than a KeyError.
            if not c.get("c_user"):
                raise OperationFailed("操作失败")
            logger.info(f"登录账号{username} 登录成功")
            context.close()
            browser.close()
            return {'cookies': json.dumps(c)}
def get_proxy_from_api():
    """Fetch a proxy address from the proxy API.

    When the ``dev`` environment variable is set, a fixed local proxy is
    returned and no request is made.

    Returns:
        Proxy URL of the form ``http://<proxy>``.

    Raises:
        OperationFailed: The request failed, returned a non-200 status, or the
            JSON body has no ``proxy`` value.
    """
    if os.getenv("dev") is not None:
        return "http://127.0.0.1:1080"
    try:
        # Bounded so a stalled proxy API cannot block the caller indefinitely.
        response = requests.get(const.PROXY_HOST, timeout=10)
        if response.status_code != 200:
            raise OperationFailed("获取代理时出错")
        proxy_data = response.json()
        # Adjust to the API's response format if it changes.
        p = proxy_data.get("proxy")
        if not p:
            raise OperationFailed(f"获取代理时出错={proxy_data}")
        return f"http://{p}"
    except OperationFailed:
        # Already the right type and carries the detailed message; do not
        # replace it with the generic wrapper below.
        raise
    except Exception as e:
        raise OperationFailed("获取代理时出错") from e
def login_with_2fa_retry(page, code_2fa, max_retries=3):
    """Submit a TOTP code on the 2FA page, retrying when it is rejected.

    Args:
        page: Playwright page showing the code input.
        code_2fa: TOTP secret used to generate the code.
        max_retries: Maximum number of counted failures before giving up.

    Returns:
        True when no error message appears within 3 seconds of submitting the
        code; False once ``max_retries`` failures have been counted.
    """
    retry_count = 0
    while retry_count < max_retries:
        try:
            print(f"第 {retry_count + 1} 次尝试2FA验证...")
            # Generate a fresh code on every attempt.
            auth_code = pyotp.TOTP(code_2fa).now()
            print(f"生成的2FA验证码: {auth_code}")
            code_input = page.wait_for_selector(
                'input[aria-label="Code"][type="text"]',
                timeout=60000
            )
            code_input.fill(auth_code)
            page.query_selector('div[role="button"][aria-label="Continue"]').click()
            try:
                # NOTE(review): "doesnt"/"its" look like they lost apostrophes —
                # verify the literal against the live page text.
                error_element = page.wait_for_selector(
                    'span:has-text("This code doesnt work. Check its correct or try a new one.")', timeout=3000)
                if error_element and error_element.is_visible():
                    print("2FA验证码错误准备重试...")
                    retry_count += 1
                    # Clear the input before the next attempt.
                    code_input.fill("")
                    if retry_count < max_retries:
                        wait_time = 5
                        print(f"等待 {wait_time} 秒后重试...")
                        time.sleep(wait_time)
                    continue
            except TimeoutError:
                # No error message showed up: the code was accepted.
                logger.info("2FA验证成功")
                return True
        except Exception as e:
            logger.info(f"2FA验证发生未知错误: {e}")
            retry_count += 1
            if retry_count < max_retries:
                time.sleep(3)
            continue
    # All attempts used up: report failure explicitly rather than falling
    # off the end with an implicit None.
    return False
def is_timeout_error(exception) -> bool:
    """Tell whether ``exception`` is an instance of ``TimeoutError``."""
    matches = isinstance(exception, TimeoutError)
    return matches
@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(2),
    retry=retry_if_exception_type(TimeoutError)
)
def playwright_m_login(username, password, code_2fa=None):
    """Log in through the mobile site behind a proxy and return the session cookies.

    A "Wrong Credentials" dialog triggers a fresh browser with a new proxy, up
    to three times. The decorator retries the whole function (3 attempts, 2 s
    apart) on ``TimeoutError``.

    Args:
        username: Login identifier; also passed to ``check_freeze_account``.
        password: Account password.
        code_2fa: TOTP secret for the authentication-app step.

    Returns:
        dict with ``cookies``: a JSON string mapping cookie names to values.

    Raises:
        OperationFailed: No proxy available, login page not loaded, credentials
            rejected on every proxy, 2FA failure, or missing ``c_user`` cookie.
        AuthException: Raised by ``check_freeze_account`` for a frozen account.
    """
    logger.info(f"登录账号{username}")
    # Frozen-account check first.
    check_freeze_account(username)
    logger.info(f"账号{username} 未冻结")
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    with lock:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            max_proxy_retries = 3
            proxy_retry_count = 0
            while proxy_retry_count < max_proxy_retries:
                proxy_url = ProxyChecker(timeout=8).get_valid_proxy_url()
                if proxy_url is None:
                    raise OperationFailed("获取代理失败")
                logger.info(f"使用proxy={proxy_url}")
                browser = playwright.chromium.launch(
                    headless=const.HEADLESS, args=['--start-maximized'], executable_path=path, proxy={
                        "server": proxy_url,
                    }
                )
                # random_user_agent = UserAgent().getBrowser(["Chrome Mobile iOS"]).get("useragent")
                random_user_agent = "Mozilla/5.0 (Linux; Android 12; Pixel 6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Mobile Safari/537.36"
                logger.info(f"使用ua={random_user_agent}")
                context = browser.new_context(no_viewport=True, user_agent=random_user_agent)
                # Force the English locale.
                context.add_cookies([
                    {
                        "name": "locale",
                        "value": "en_US",
                        "domain": ".facebook.com",
                        "path": "/",
                        "expires": -1,
                        "httpOnly": True,
                        "secure": False,
                    },
                ])
                page = context.new_page()
                url = 'https://m.facebook.com/login'
                retry_goto(page, url)
                # The site sometimes lands on a start page with this button instead.
                have_account = 'div[role="button"][aria-label="I already have an account"]'
                # Username input.
                input_email = "input#m_login_email"
                try:
                    page.wait_for_selector(f'{have_account}, {input_email}', timeout=60000)
                except Exception as e:
                    # logger.exception attaches the traceback; with loguru an
                    # ``exc_info=True`` kwarg is only a message-format argument.
                    logger.exception("页面加载异常, 未定位到按钮")
                    raise OperationFailed("系统异常") from e
                hava_account_btn = page.query_selector(have_account)
                if hava_account_btn:
                    hava_account_btn.click()
                page.wait_for_selector(f'{input_email}', timeout=60000).type(username, delay=30)
                time.sleep(1)
                page.wait_for_selector(f'//input[@id="m_login_password"]', timeout=60000).type(password, delay=30)
                page.click('div[aria-label="Log in"]')
                success_login_selector1 = 'span:has-text("Check your notifications on another device")'
                success_login_selector2 = 'span:has-text("Go to your authentication app")'
                failed_login_selector = 'div[data-bloks-name="bk.components.dialog.Dialog"] > div[aria-label="Wrong Credentials"] > div:nth-child(1)'
                # Wait for either success marker or the failure dialog.
                page.wait_for_selector(f'{success_login_selector1},{success_login_selector2},{failed_login_selector}',
                                       timeout=60000)
                failed_login = page.query_selector(failed_login_selector)
                if not failed_login:
                    # Credentials accepted: leave the proxy retry loop.
                    break
                page.screenshot(
                    path=f'./log/screenshot/{username}_{time.strftime("%Y-%m-%d%H-%M-%S", time.localtime())}_login_failed.png')
                # Read the dialog text while the page is still alive; the
                # element handle is unusable once the browser is closed.
                failure_text = failed_login.text_content()
                logger.info(f"用户名:{username} 密码错误 {failure_text}")
                browser.close()
                proxy_retry_count += 1
                logger.info(f"代理重试 {proxy_retry_count}/{max_proxy_retries}")
                if proxy_retry_count < max_proxy_retries:
                    time.sleep(3)  # brief pause before trying another proxy
                    continue
                raise OperationFailed(failure_text)
            # "Check your notifications" variant: switch to the authentication app.
            success_login1 = page.query_selector(success_login_selector1)
            if success_login1:
                page.query_selector('div[role="button"][aria-label="Try another way"]').click()
                page.wait_for_selector(
                    'span:has-text("Authentication app")').click()
                page.query_selector('div[role="button"][aria-label="Continue"]').click()
                page.wait_for_selector('span:has-text("Go to your authentication app")', timeout=60000)
            # Enter the 2FA code.
            try:
                login_with_2fa_retry(page, code_2fa)
            except Exception as e:
                logger.exception(f"2FA验证失败: {e}")
                raise OperationFailed(f"2FA验证失败: {e}") from e
            # Wait for the logged-in page. Accounts flagged for automation get
            # stuck here, so a failure of this wait is deliberately ignored.
            try:
                page.wait_for_selector('img[data-bloks-name="bk.components.Image"][src*=".fbcdn.net/v/t"]',
                                       timeout=60000)
            except Exception as e:
                logger.debug(f"post-login wait ignored: {e}")
            logger.info(f"用户名:{username} 账号密码正确")
            c = {i['name']: i['value'] for i in context.cookies()}
            # A missing c_user cookie means the login did not succeed; .get()
            # reports that as OperationFailed rather than a KeyError.
            if not c.get("c_user"):
                raise OperationFailed("操作失败")
            logger.info(f"登录账号{username} 登录成功")
            context.close()
            browser.close()
            return {'cookies': json.dumps(c)}
def playwright_share(cookies, target_url, content):
    """Share a Facebook post, video or reel to the logged-in account's profile.

    Launches the Chrome executable found under ``BASE_PATH`` through
    Playwright, loads the given session cookies, opens ``target_url``, types
    ``content`` into the share dialog and submits it. Afterwards the first
    entry on the account's own profile is opened so that its URL and a
    screenshot can be captured; the screenshot is stored with ``put_object``.

    Args:
        cookies: Session cookies in any form accepted by ``parse_cookies``;
            the parsed result must contain a ``c_user`` entry.
        target_url: URL of the item to share. Supported shapes are
            text/image posts (``permalink.php?story_fbid``, ``/posts/``,
            ``/permalink/``), videos (``watch/?v``, ``watch?v``,
            ``/videos/``) and reels (``/reel/``).
        content: Text typed into the share dialog before submitting.

    Returns:
        dict: ``{'response_url': <URL of the new post>,
        'screenshot_key': <object key of the uploaded screenshot>}``.

    Raises:
        OperationFailed: If the URL type is unsupported, the share button
            cannot be found, the success banner is missing, or Playwright
            reports an error (including timeouts) during the flow.
    """
    path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
    # All supported post types end up in the same share dialog, so these
    # selectors do not depend on the branch taken below.
    input_box = '//form[@method="POST" and count(@*) = 1]/div/div/div[2]'
    share_now_button = '//span[text()="Share now"]'

    # The lock is held for the whole browser session; presumably because the
    # screenshot captures the entire desktop, so only one window may be on
    # screen at a time.
    with lock:
        with sync_playwright() as playwright:
            update_windows_distinguish()
            browser = playwright.chromium.launch(
                headless=False, args=['--start-maximized'], executable_path=path
            )
            context = browser.new_context(no_viewport=True)
            context.add_cookies(parse_cookies(cookies))
            page = context.new_page()
            check_account_status(page, parse_cookies(cookies))
            try:
                retry_goto(page, target_url)
                if ('permalink.php?story_fbid' in target_url
                        or '/posts/' in target_url
                        or '/permalink/' in target_url):
                    # Text or image post: the share button sits inside the
                    # post dialog.
                    share_button = page.query_selector(
                        '//*[@role="dialog"]//div[@data-ad-rendering-role="share_button"]')
                    if share_button is None:
                        # query_selector returns None when nothing matches;
                        # report it as a domain error instead of letting an
                        # AttributeError escape the Playwright error handler.
                        raise OperationFailed(f'未找到分享按钮: {target_url}')
                    share_button.scroll_into_view_if_needed()
                    # Click through JavaScript rather than a simulated mouse
                    # click.
                    page.evaluate("(element) => element.click()", share_button)
                    time.sleep(1)
                elif ('watch/?v' in target_url
                        or '/videos/' in target_url
                        or 'watch?v' in target_url):
                    # Video post
                    share_button = '//span[@dir="auto" and text()="Share"]'
                    page.locator(share_button).first.click()
                elif '/reel/' in target_url:
                    # Reel (short video)
                    share_button = '//div[@aria-label="Share"]'
                    page.locator(share_button).click()
                else:
                    raise OperationFailed(f'不支持的帖子类型: {target_url}')

                page.locator(input_box).type(content, delay=30)
                _edit_privacy(page)
                page.click(share_now_button)
                time.sleep(1)
                # Wait until the "Posting..." indicator is gone.
                page.wait_for_selector('//span[text()="Posting..."]', state='detached')
                time.sleep(1)
                success_tag = page.wait_for_selector('//span[text()="Shared to your profile"]')
                if not success_tag:
                    # Defensive check kept from the original flow.
                    raise OperationFailed('转发失败,原因未知')

                # Keep the caller's ``cookies`` argument intact; use a
                # separate name/value view to look up the account id.
                cookie_map = {item['name']: item['value'] for item in parse_cookies(cookies)}
                uid = cookie_map['c_user']
                retry_goto(page, f'https://facebook.com/profile.php?id={uid}')
                page.wait_for_load_state()
                # Open the first entry on the profile to obtain its URL.
                post_index = page.locator('//div[@aria-posinset="1"]//a[@role="link"]').nth(2)
                post_index.click()
                time.sleep(5)
                page.reload()
                post_url = page.url
                screenshot_content = _full_screenshot()
            except Error as e:
                # Chain the Playwright error so the root cause stays visible
                # in the traceback.
                raise OperationFailed(f'操作超时,请重试{e}') from e
            context.close()
            browser.close()

    # The upload only needs the in-memory screenshot, so it runs after the
    # browser has been closed and the lock released.
    key = f'screenshot/{uuid.uuid4()}.png'
    put_object(key, screenshot_content)
    return {'response_url': post_url, 'screenshot_key': key}
if __name__ == '__main__':
    # Manual driver: session cookies of the account used for the ad-hoc run
    # at the bottom of this block.
    cookies = dict(
        c_user="61587089832795",
        datr="aN5-aZxlIs_oAlMTY-3CMwOc",
        oo="v13:1769922152",
        xs="15:n1l7-jQRvQ2mnA:2:1769922167:-1:-1",
    )
    # Alternative invocations and cookie sets kept for manual experiments:
    # print(playwright_set_user_profile(cookies, "61584735094876", "Inaaya", "Inaaya",
    # "facebook/user_upload/0196f098-851c-7810-b2aa-0833a0a7b09d/8c428558-2d90-4f45-baa3-a25f8a654b5c.png"))
    # cookies = '{"locale": "en_US", "datr": "ZnGnaBBx0yN7pov19-8_A6Gr", "sb": "ZnGnaDQicDSsVuevkudqio1J", "m_pixel_ratio": "1", "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61579364283503", "xs": "34%3AdWeZoaWzFrtdVQ%3A2%3A1755804022%3A-1%3A-1", "oo": "v1%7C3%3A1755804031"}'
    # cookies = {"c_user":"61565823476070","datr":"q13hZowje6bbViFxECQpYyp8","fr":"01C6Lt4VArm5hELvx.AWXg75HOo-QNJgbiDl8qFtw_5lc.Bm4V2r..AAA.0.0.Bm4V25.AWWHzUeMTuI","m_pixel_ratio":"1.875","sb":"q13hZgJARsRIDmNJG8xUauAe","wd":"384x686","xs":"50%3A8luhgQ-Ea0vnhg%3A2%3A1726045627%3A-1%3A-1"}
    # cookies = {"locale": "en_US", "datr": "vBmxaKfb6cm0AhcefMHPSQO6", "sb": "vBmxaGdLX0gW8f4-cRs7nUtk",
    # "m_pixel_ratio": "1", "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61552034433240",
    # "fr": "0WwHWIVyPWFnUQJdK.AWc47U27HRH3lkVnoI2aLvYmWh9OtMnXPu_1tPSnENGUyO5p_4M.BosRm8..AAA.0.0.BosRnW.AWeWlm_2ZdtbsqcgNZYOw4T5QaI",
    # "xs": "15%3AGhxgmwl-LvPNow%3A2%3A1756436950%3A-1%3A-1"}
    # post(cookies, 'cs2025')
    # like(cookies, 'ZmVlZGJhY2s6MTIyMTA5NjE0NjU0NzkzNzc5')
    # comment(cookies, 'ZmVlZGJhY2s6MTIyMTA5NjE0NjU0NzkzNzc5', 'game la', 'xzpq.mp4')
    # playwright_like(cookies, 'https://www.facebook.com/watch/?v=1007800324567828')
    # Publish a test post and show whatever the call returns.
    post_result = playwright_post(cookies, '1111')
    print(post_result)
# playwright_post(cookies, '2025-3-26~like', "")
# playwright_comment(
# cookies,
# 'https://www.facebook.com/permalink.php?story_fbid=122096663738814448&id=61574433449058',
# # 'https://www.facebook.com/watch/?v=1603348023628396',
# # 'https://www.facebook.com/permalink.php?story_fbid=635052906055594&id=100086526695858',
# # 'https://www.facebook.com/reel/3578555425778137',
# '2025-3-26~like',
# # 'rg.jpg'
# )
# print(playwright_get_user_profile(cookies))
# print(_change_language(cookies))
# playwright_set_user_profile(
# cookies,
# username='facebaby66'
# # firstname='Lisa',
# # lastname='Keals',
# # image_key='rg.jpg'
# )
# cookies = '{"c_user":"61565405263653","datr":"-YDhZoLWu5zbUIw5cOB2In9s","fr":"0ZmsqLWbmV0Onlspt.AWW1JRfVxQAF-jl0oGY7lBQLYq4.Bm4YD5..AAA.0.0.Bm4YED.AWVf1ae03r4","m_page_voice":"61565405263653","m_pixel_ratio":"2.625","sb":"-YDhZs8LozUoyLe1gj2MCUwW","wd":"412x759","xs":"21%3A8Gt3CwtjVWJUhQ%3A2%3A1726054660%3A-1%3A-1"}'
# cookies = '{"datr": "mm0taNtaPfOxWhpxdzpkVjV0", "sb": "mm0taFuFnO_L1FpzkKDiA4lw", "wd": "1920x953", "locale": "en_US", "c_user": "61575901481649", "fr": "0c0y2KyMv8lRJ6NNq.AWe7DLt-TSkoOyn3DhRjhA4ByOITAhfSwaiIw4eQE5ilq4Q4KAY.BoLW2a..AAA.0.0.BoLW3M.AWfHVOhZIAGgDh_3BvPFPi8-YhE", "xs": "29%3ASM0qc4U4Ile_MA%3A2%3A1747807693%3A-1%3A-1", "presence": "C%7B%22t3%22%3A%5B%5D%2C%22utc3%22%3A1747807698911%2C%22v%22%3A1%7D"}'
# cookies = '{"locale": "en_US", "datr": "PaB4aGZCgstQYUkBHpEVnEe8", "sb": "PaB4aAgR68sRQtATM6v7gEu5", "m_pixel_ratio": "1", "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "100094571602733", "fr": "0g0qqVhuLyyrKSaUv.AWdif7wExy29FD7aMjwFvrQFqoBzz-S7Qbeg8la4QMVeGv43eLg.BoeKA9..AAA.0.0.BoeKBQ.AWdj3k5XKtwF766wY3n-cro4yw8", "xs": "15%3A52m6IVmYaMzM3Q%3A2%3A1752735825%3A-1%3A-1"}'
# print(playwright_share(cookies, "https://www.facebook.com/groups/1702958116839437/permalink/2210833932718517/", ""))
# print(playwright_get_user_profile(cookies))
# # 永久链接的帖子点赞
# print(playwright_like(cookies, "https://www.facebook.com/groups/1070754870427928/permalink/1873461830157224/"))
# print(playwright_share(cookies,
# "https://www.facebook.com/permalink.php?story_fbid=pfbid02BMYFCJTTG7WedezZzRn6YFEbrBg4gpUJKExMLTTdZycQFF2wuhbr8oWvmvUBiKTNl&id=61556406998616",
# "郭正亮痛斥賴清德笑什麼,全台灣都想問"))
#
# # cookies = playwright_m_login('61576995257902', 'djkufhhh', 'R56AR2LXBK664C6N4AQX2CPV2SL6FOFW')
# # print(cookies)
#
# # 点赞
# # cookies_list = [
# # # {"locale": "en_US", "datr": "edo_aHbvz3EnE6wLxMErpyJN", "sb": "edo_aB7zwegWek0KTQ1tx-FY", "m_pixel_ratio": "1",
# # # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576223713268",
# # # "fr": "0g90xhA5Gl8ZgOqxG.AWfoCPjWPzbDQn6pLym-URV-n2fHl9Ht9QSsvq-N2gHrVT4XfDk.BoP9p5..AAA.0.0.BoP9qH.AWcANS7YIylX14NqadUIosIZKdI",
# # # "xs": "48%3ASpsqohj8-YUsYw%3A2%3A1749015176%3A-1%3A-1"},
# # # {"locale": "en_US", "datr": "Yds_aBYaJ-fce50yt7zP7ar7", "sb": "Yds_aKXiSyiKmG8a3SoFbwkZ", "m_pixel_ratio": "1",
# # # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576668058350",
# # # "fr": "0h7K7g58Qvbr1AK5k.AWfmgzZGs7oGSA3Ix7tGZjU2-UwTs7W2TjY7JW1K2Tq1eZfOgDM.BoP9th..AAA.0.0.BoP9tw.AWe4d26SD_BUw7vAd7hmkFe7Akc",
# # # "xs": "38%3APKGnjRBkSQ_TGg%3A2%3A1749015409%3A-1%3A-1"},
# # {"locale": "en_US", "datr": "n9s_aFz_DGtnnY4ykLeM3K4t", "sb": "n9s_aIU7TDGsUvpTmZ3WEI2z", "m_pixel_ratio": "1",
# # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576555533160",
# # "fr": "0M0A9DWmQqX090mhO.AWfLUhjX_kx8rBpnmFZQexb649CZfCySYWg2W7PxjpQM_ssHfeY.BoP9uf..AAA.0.0.BoP9un.AWeP2pc6-g_ZYLQOq_bUWKseAfo",
# # "xs": "38%3AkHgOiwbxyT7sfw%3A2%3A1749015463%3A-1%3A-1"},
# # {"locale": "en_US", "datr": "_Ns_aFWk_p0EElGrAi_tNXEg", "sb": "_Ns_aEsFh0KeghxVRRafVAUZ", "m_pixel_ratio": "1",
# # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576802091944",
# # "fr": "0Z0WLbZCuM5XHjXDJ.AWdmWevd43Nen9oYwCFjg5iWlx86SAkcCWVXx_4HwQy6-gRgjBc.BoP9v8..AAA.0.0.BoP9wI.AWfK64zgJ71c76MCv2gazDXBzbI",
# # "xs": "31%3AtV4oOWHr4Il5ug%3A2%3A1749015561%3A-1%3A-1"},
# # {"locale": "en_US", "datr": "E-E_aIwPKZ30BUqS4q8JIfQI", "sb": "E-E_aNEmMEV78u_Vnm8tQGX6", "m_pixel_ratio": "1",
# # "wd": "1920x1080", "ps_l": "1", "ps_n": "1", "test_cookie": "CheckForPermission", "c_user": "61576325558767",
# # "fr": "0VvlERRijEeTAnppZ.AWcqoATDLUSVislwUMXIuDQYxxxf7ksmECHv--94j-wGTMQzk6U.BoP-ET..AAA.0.0.BoP-Eg.AWdR6VNnZSfB-_YWaA5XEWcyQvA",
# # "xs": "30%3AxljD-RO8defzCA%3A2%3A1749016865%3A-1%3A-1"},
# # {"locale": "en_US", "datr": "JeE_aAX1P5smkZmQvLInd4ks", "sb": "JeE_aOyuAZsfMGpeUxsovVZD", "m_pixel_ratio": "1",
# # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576711106140",
# # "fr": "07RdoRUs1SsUlq7ef.AWe3jMvGjMqd54IxTDl5vbLvGoOHRT1myhxqRRny7GRaIRszFTA.BoP-El..AAA.0.0.BoP-Eu.AWfyXHJMmkeyX0iwh1UXv_0if8Q",
# # "xs": "10%3AkZhoUr10WNy9ww%3A2%3A1749016879%3A-1%3A-1"},
# # {"locale": "en_US", "datr": "MuE_aAucRyXVDioA3HWs1N0O", "sb": "MuE_aOmJUt_HZCAEkkimKKm4", "m_pixel_ratio": "1",
# # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576218193987",
# # "fr": "0PTX91SU4KYxo8wSl.AWeeaJ5RiZemCmgKMZriUg8ZNd9rfzvzuM8mGYVxApULaImQUXk.BoP-Ey..AAA.0.0.BoP-FB.AWdQzDjj8jtqe9lKIvOExq6qF84",
# # "xs": "37%3ATE2pxPfwNwdj1g%3A2%3A1749016898%3A-1%3A-1"},
# # {"locale": "en_US", "datr": "R-E_aNsGqOYN8pdKKM7W_Uex", "sb": "R-E_aNoFf6JeSlOoOmwMdYI4", "m_pixel_ratio": "1",
# # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576332729468",
# # "fr": "0nU85lvzIaFTsdxjz.AWd0opLHfWkafYnwBhC_CzkdQFUXOQn65cetutGpbG3rSiuJRSo.BoP-FH..AAA.0.0.BoP-FU.AWfrmCKdqXpjSeHcu8z8xOTeKp4",
# # "xs": "50%3AJ9Xi34a6amKV1g%3A2%3A1749016917%3A-1%3A-1"},
# # {"locale": "en_US", "datr": "WeE_aFQbLkcj3fSF25ZeCcfe", "sb": "WeE_aNEPhbw60DgzKrwPX_jE", "m_pixel_ratio": "1",
# # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576570803564",
# # "fr": "0n8aDHJ9N2QuRqfQk.AWcKYZI4zd7qXFvbKAoYZGJ8rhUsOPJ1b8hdOR5fJpuEXRZK5mg.BoP-FZ..AAA.0.0.BoP-Fv.AWd8GCiXev5g05qJV1X_h-Wc5hQ",
# # "xs": "16%3AOYTGTKbKdBko6g%3A2%3A1749016944%3A-1%3A-1"},
# # {"locale": "en_US", "datr": "d-E_aKwMl9BeWWx1cR4MzMad", "sb": "d-E_aBZiULc0sw8LQEoqj--A", "m_pixel_ratio": "1",
# # "wd": "1920x1080", "test_cookie": "CheckForPermission", "c_user": "61576547344605",
# # "fr": "0Kccfs8cK4FoRKsFz.AWcjtGqjJwkD6lC_oTSjduL1w4AKD3ErAk877IvJKI3YI1-pUxQ.BoP-F3..AAA.0.0.BoP-GD.AWeg-MT8xtV9JelBtZuPnCkwg18",
# # "xs": "45%3AI8xuKEPRy222pA%3A2%3A1749016964%3A-1%3A-1"},
# # ]
# # for cookies in cookies_list:
# # # # 视频链接的帖子点赞
# # print(playwright_like(cookies, "https://www.facebook.com/groups/1070754870427928/permalink/1873461830157224/"))
#
# # 评论
# # print(playwright_comment(cookies, "https://www.facebook.com/groups/7423373454348259/permalink/24322822973976709/",
# # "6"))