From 3dc74b6ad22c45dee4d8aa1a05cbee83e79202b7 Mon Sep 17 00:00:00 2001 From: work Date: Fri, 10 Apr 2026 17:28:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8F=91=E5=B8=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spider/task.py | 203 ++++++++++++++++++++++++++++++---------- test_playwright_post.py | 76 +++++++++++++++ 2 files changed, 228 insertions(+), 51 deletions(-) create mode 100644 test_playwright_post.py diff --git a/spider/task.py b/spider/task.py index 3492037..aca4881 100644 --- a/spider/task.py +++ b/spider/task.py @@ -432,73 +432,173 @@ def retry_get_new_video(page, cookies, post_count): Args: page: Playwright页面对象 - cookies: Cookies + cookies: Cookies (此处未使用,保留接口) post_count: 初始帖子数量 Returns: - 函数执行结果或抛出Timeout(如果超时) + 新视频的页面URL + + Raises: + TimeoutError: 超时未获取到新视频 + OperationFailed: 点击视频时出错 """ - max_duration = 10 * 60 # 5分钟(秒) - retry_interval = 30 # 30秒重试一次 + max_duration = 10 * 60 # 5分钟(300秒) + retry_interval = 30 # 30秒重试一次 start_time = time.time() attempt = 1 while time.time() - start_time < max_duration: - # 获取当前帖子数量 + # 1. 滚动到页面底部,触发懒加载新内容 + print("滚动到页面底部,加载更多内容...") + page.evaluate("window.scrollTo(0, document.body.scrollHeight)") + # 等待新内容加载(可调整时间或使用网络空闲等待) + page.wait_for_timeout(3000) # 等待3秒让新帖子渲染 + # 可选:等待网络空闲确保动态内容加载完成 + # page.wait_for_load_state("networkidle") + + # 2. 获取当前帖子数量 new_post_count = get_post_count(page, cookies) - # 如果新帖子数量大于初始帖子数量,则表示有新帖子上传 + print(f"第{attempt}次检查: 初始帖子数={post_count}, 当前帖子数={new_post_count}") + + # 3. 如果有新帖子(视频) if new_post_count > post_count: + print("检测到新视频,准备点击...") try: - # 尝试点击视频 + # 获取所有带 aria-posinset 的 div(代表每个视频帖子) comment_buttons = page.query_selector_all('//div[@aria-posinset]') if comment_buttons: - # 使用js去点击第一个评论按钮 - element = page.query_selector_all('//div[@aria-posinset]')[0] - # 获取元素位置 + # 使用第一个视频元素(最新发布的通常在第一个) + element = comment_buttons[0] bounding_box = element.bounding_box() if bounding_box: - # 滚动到特定位置 + # 滚动到元素可视区域中央 page.evaluate( - f''' () => {{ window.scrollTo({{ top: {bounding_box['y']} - window.innerHeight / 2, left: {bounding_box['x']} - window.innerWidth / 2, behavior: 'smooth' }}); }} ''') + f''' () => {{ + window.scrollTo({{ + top: {bounding_box['y']} - window.innerHeight / 2, + left: {bounding_box['x']} - window.innerWidth / 2, + behavior: 'smooth' + }}); + }} ''' + ) + page.wait_for_timeout(1000) # 等待滚动完成 - # 等待滚动完成 - page.wait_for_timeout(1000) - - # 点击 + # 点击视频 element.click() + # 等待视频页面加载 time.sleep(random.randint(3, 5)) + # 刷新页面确保视频播放器完全加载(可选) page.reload(timeout=180000) time.sleep(random.randint(3, 5)) return page.url else: - raise OperationFailed(f"未找到视频") + raise OperationFailed("未找到视频元素的有效位置信息") + else: + raise OperationFailed("未找到任何视频元素(div[@aria-posinset])") + except Exception as e: raise OperationFailed(f"点击视频时出错: {e}") - # 计算下一次重试时间 + # 4. 没有新视频,等待下一次重试 elapsed = time.time() - start_time remaining_time = max_duration - elapsed if remaining_time > 0: - # 等待30秒或剩余时间(取较小值) sleep_time = min(retry_interval, remaining_time) - print(f"第{attempt}次尝试,等待 {sleep_time:.1f} 秒后重试... (剩余时间: {remaining_time:.1f}秒)") + print(f"第{attempt}次尝试未发现新视频,等待 {sleep_time:.1f} 秒后重试... (剩余时间: {remaining_time:.1f}秒)") time.sleep(sleep_time) attempt += 1 + # 超时退出 print("5分钟超时,退出重试") raise TimeoutError("未获取到新视频(可能视频上传失败),已超时") -def playwright_post(cookies, content, image_key=None): +def _is_video_media(image_key): + return bool(image_key and image_key.lower().endswith('.mp4')) + + +def _download_post_media(image_key): + if not image_key: + return None + + filename = image_key.split('/')[-1] + unique_filename = f"{uuid.uuid4()}_{filename}" + file_path = os.path.join(BASE_PATH, 'files', unique_filename) + client.fget_object(BUCKET, image_key, file_path) + return file_path + + +def _open_post_composer(page, has_media): + if has_media: + composer = page.locator('//div[contains(@aria-placeholder, "What\'s on your mind")]').first + composer.wait_for(state='visible', timeout=300000) + return composer + + triggers = [ + '//span[contains(text(), "What\'s on your mind")]', + '//div[@role="button"]//span[contains(text(), "What\'s on your mind")]', + ] + last_error = None + for selector in triggers: + try: + page.locator(selector).first.click(timeout=30000) + composer = page.locator('//div[contains(@aria-placeholder, "What\'s on your mind")]').first + composer.wait_for(state='visible', timeout=300000) + return composer + except Error as e: + last_error = e + + raise OperationFailed(f'未能打开发布输入框: {last_error}') + + +def _fill_post_content(page, content, has_media): + composer = _open_post_composer(page, has_media=has_media) + composer.fill(content, timeout=300000) + + +def _wait_post_submit_result(page): + page.wait_for_timeout(15000) + + +def _get_latest_post_url(page): + candidates = [ + '//div[@aria-posinset="1"]//a[@role="link"]', + '//a[contains(@href, "/posts/")]', + '//a[contains(@href, "permalink")]', + ] + for selector in candidates: + locator = page.locator(selector) + count = locator.count() + if count == 0: + continue + for index in range(count): + href = locator.nth(index).get_attribute('href') + if href and ('/posts/' in href or 'permalink' in href): + if href.startswith('/'): + return f'https://www.facebook.com{href}' + return href + + page.reload(timeout=180000) + return page.url + + +def _is_facebook_home(page): + current_url = (page.url or '').rstrip('/') + return current_url in {'https://www.facebook.com', 'https://facebook.com'} + + +def playwright_post(cookies, content, image_key=None, dry_run=False): path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe') with lock: with sync_playwright() as playwright: update_windows_distinguish() max_browser_retries = 3 last_error = None + parsed_cookies = parse_cookies(cookies) + is_video = _is_video_media(image_key) for browser_attempt in range(max_browser_retries): browser = None @@ -509,51 +609,52 @@ def playwright_post(cookies, content, image_key=None): headless=False, args=['--start-maximized'], executable_path=path ) context = browser.new_context(no_viewport=True) - context.add_cookies(parse_cookies(cookies)) + context.add_cookies(parsed_cookies) page = context.new_page() + page.set_default_timeout(30000) + page.set_default_navigation_timeout(180000) - check_account_status(page, parse_cookies(cookies)) - # 声明默认发布视频数量 + check_account_status(page, parsed_cookies) video_count = 0 url = 'https://www.facebook.com' - # 先获取视频数量 - if image_key is not None and ".mp4" in image_key: + if is_video: video_count = get_post_count(page, cookies) - # check_account_status 已经把页面带到 Facebook 首页,避免重复二次跳转导致页面进程崩溃 - if "facebook.com" not in page.url: + if not _is_facebook_home(page): retry_goto(page, url) - time.sleep(random.randint(3, 10)) - time.sleep(5) + sleep(3, 5) if image_key: - filename = image_key.split('/')[-1] - file_path = os.path.join(BASE_PATH, 'files', filename) - client.fget_object(BUCKET, image_key, file_path) - + file_path = _download_post_media(image_key) sleep(1, 2) page.locator('input[accept="image/*,image/heif,image/heic,video/*,video/mp4,video/x-m4v,video/x-matroska,.mkv"]').set_input_files(file_path) - time.sleep(5) + page.locator('//div[contains(@aria-placeholder, "What\'s on your mind")]').first.wait_for( + state='visible', timeout=300000 + ) - if not image_key: - page.click('''//span[contains(text(), "What's on your mind")]''') + _fill_post_content(page, content, has_media=bool(image_key)) _edit_privacy(page) - # 修改后 (使用 fill) - page.fill('//div[contains(@aria-placeholder, "What\'s on your mind")]', content, - timeout=300000) - page.click('//div[@aria-label="Post"]', timeout=300000) - time.sleep(15) - post_index = page.locator('//div[@aria-posinset="1"]//a[@role="link"]').nth(2) - post_index.click(timeout=600000) - time.sleep(5) - page.reload(timeout=180000) - post_url = page.url - # 视频格式要单独去获取链接 - if image_key is not None and ".mp4" in image_key: - post_url = retry_get_new_video(page, cookies, video_count) - time.sleep(random.randint(3, 10)) + post_button = page.locator('//div[@aria-label="Post"]').first + post_button.wait_for(state='visible', timeout=300000) + if dry_run: + screenshot_content = _full_screenshot() + key = f'screenshot/{uuid.uuid4()}.png' + put_object(key, screenshot_content) + return { + 'response_url': page.url, + 'screenshot_key': key, + 'dry_run': True, + 'message': '已完成到发布前校验,未实际点击 Post' + } + post_button.click(timeout=300000) + _wait_post_submit_result(page) + + if is_video: + post_url = retry_get_new_video(page, cookies, video_count) + else: + post_url = _get_latest_post_url(page) screenshot_content = _full_screenshot() key = f'screenshot/{uuid.uuid4()}.png' put_object(key, screenshot_content) diff --git a/test_playwright_post.py b/test_playwright_post.py new file mode 100644 index 0000000..d132bb4 --- /dev/null +++ b/test_playwright_post.py @@ -0,0 +1,76 @@ +import json +import os +import shutil +import uuid +from pathlib import Path + +from loguru import logger + +import spider.task as task_module + + +# 直接在这里填写测试参数 +COOKIES = {"c_user":"61586392053773","datr":"WV6nae8OJRICxw_kijnEpLD1","fr":"0hGCJPLgNUFLUrV6Z.AWcsvWymOjMTEFtBgLfY-pw-Xz-P97RXobgVcM284eq3bj35ub4.Bpp16Z..AAA.0.0.Bpp16Z.AWdJOJ9zB5VzOioMAXgv9kW9VqE","xs":"29:Tu729Jl28NcUNQ:2:1772576414:-1:-1"} + +CONTENT = "International rankings consistently place the Philippines high on corruption perception. This damages foreign investment and our global standing. Let's change this narrative.#PoliticalDynastyCorruption" + +LOCAL_VIDEO_PATH = r"E:\Code\Python\facebook\files\e2b8eaad-f950-46b6-9268-634d697f1ac9.mp4" + +DRY_RUN = False + + +def _validate_config(): + missing = [key for key, value in COOKIES.items() if not str(value).strip()] + if missing: + raise ValueError(f"cookies 缺少字段: {', '.join(missing)}") + + if not CONTENT.strip(): + raise ValueError("CONTENT 不能为空") + + if not LOCAL_VIDEO_PATH.strip(): + raise ValueError("LOCAL_VIDEO_PATH 不能为空") + + video_path = Path(LOCAL_VIDEO_PATH) + if not video_path.exists(): + raise FileNotFoundError(f"视频文件不存在: {video_path}") + + if video_path.suffix.lower() != ".mp4": + raise ValueError(f"当前测试文件仅按 mp4 视频发布流程处理: {video_path}") + + +def _prepare_local_video(video_path_str): + source = Path(video_path_str) + temp_name = f"{uuid.uuid4()}_{source.name}" + target = Path(task_module.BASE_PATH) / "files" / temp_name + target.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(source, target) + return str(target) + + +def main(): + _validate_config() + + logger.add("./log/test_playwright_post.log", rotation="20 MB") + + original_download = task_module._download_post_media + + def _download_post_media_for_test(_image_key): + return _prepare_local_video(LOCAL_VIDEO_PATH) + + task_module._download_post_media = _download_post_media_for_test + + try: + result = task_module.playwright_post( + cookies=COOKIES, + content=CONTENT, + image_key=os.path.basename(LOCAL_VIDEO_PATH), + dry_run=DRY_RUN, + ) + logger.info("发布结果: {}", result) + print(json.dumps(result, ensure_ascii=False, indent=2)) + finally: + task_module._download_post_media = original_download + + +if __name__ == "__main__": + main()