优化发帖
All checks were successful
Update Code / StopService (windows-101.36.102.136) (push) Successful in 1s
Update Code / StopService (windows-101.36.104.175) (push) Successful in 1s
Update Code / CD (windows-101.36.102.136) (push) Successful in 8s
Update Code / CD (windows-101.36.104.175) (push) Successful in 12s

This commit is contained in:
work
2026-04-10 17:28:52 +08:00
parent e8208b06f1
commit 3dc74b6ad2
2 changed files with 228 additions and 51 deletions

View File

@@ -432,73 +432,173 @@ def retry_get_new_video(page, cookies, post_count):
Args:
page: Playwright页面对象
cookies: Cookies
cookies: Cookies (此处未使用,保留接口)
post_count: 初始帖子数量
Returns:
函数执行结果或抛出Timeout如果超时
新视频的页面URL
Raises:
TimeoutError: 超时未获取到新视频
OperationFailed: 点击视频时出错
"""
max_duration = 10 * 60 # 5分钟
retry_interval = 30 # 30秒重试一次
max_duration = 10 * 60 # 5分钟300秒)
retry_interval = 30 # 30秒重试一次
start_time = time.time()
attempt = 1
while time.time() - start_time < max_duration:
# 获取当前帖子数量
# 1. 滚动到页面底部,触发懒加载新内容
print("滚动到页面底部,加载更多内容...")
page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
# 等待新内容加载(可调整时间或使用网络空闲等待)
page.wait_for_timeout(3000) # 等待3秒让新帖子渲染
# 可选:等待网络空闲确保动态内容加载完成
# page.wait_for_load_state("networkidle")
# 2. 获取当前帖子数量
new_post_count = get_post_count(page, cookies)
# 如果新帖子数量大于初始帖子数量,则表示有新帖子上传
print(f"{attempt}次检查: 初始帖子数={post_count}, 当前帖子数={new_post_count}")
# 3. 如果有新帖子(视频)
if new_post_count > post_count:
print("检测到新视频,准备点击...")
try:
# 尝试点击视频
# 获取所有带 aria-posinset 的 div代表每个视频帖子
comment_buttons = page.query_selector_all('//div[@aria-posinset]')
if comment_buttons:
# 使用js去点击第一个评论按钮
element = page.query_selector_all('//div[@aria-posinset]')[0]
# 获取元素位置
# 使用第一个视频元素(最新发布的通常在第一个)
element = comment_buttons[0]
bounding_box = element.bounding_box()
if bounding_box:
# 滚动到特定位置
# 滚动到元素可视区域中央
page.evaluate(
f''' () => {{ window.scrollTo({{ top: {bounding_box['y']} - window.innerHeight / 2, left: {bounding_box['x']} - window.innerWidth / 2, behavior: 'smooth' }}); }} ''')
f''' () => {{
window.scrollTo({{
top: {bounding_box['y']} - window.innerHeight / 2,
left: {bounding_box['x']} - window.innerWidth / 2,
behavior: 'smooth'
}});
}} '''
)
page.wait_for_timeout(1000) # 等待滚动完成
# 等待滚动完成
page.wait_for_timeout(1000)
# 点击
# 点击视频
element.click()
# 等待视频页面加载
time.sleep(random.randint(3, 5))
# 刷新页面确保视频播放器完全加载(可选)
page.reload(timeout=180000)
time.sleep(random.randint(3, 5))
return page.url
else:
raise OperationFailed(f"未找到视频")
raise OperationFailed("未找到视频元素的有效位置信息")
else:
raise OperationFailed("未找到任何视频元素div[@aria-posinset]")
except Exception as e:
raise OperationFailed(f"点击视频时出错: {e}")
# 计算下一次重试时间
# 4. 没有新视频,等待下一次重试
elapsed = time.time() - start_time
remaining_time = max_duration - elapsed
if remaining_time > 0:
# 等待30秒或剩余时间取较小值
sleep_time = min(retry_interval, remaining_time)
print(f"{attempt}次尝试,等待 {sleep_time:.1f} 秒后重试... (剩余时间: {remaining_time:.1f}秒)")
print(f"{attempt}次尝试未发现新视频,等待 {sleep_time:.1f} 秒后重试... (剩余时间: {remaining_time:.1f}秒)")
time.sleep(sleep_time)
attempt += 1
# 超时退出
print("5分钟超时退出重试")
raise TimeoutError("未获取到新视频(可能视频上传失败),已超时")
def playwright_post(cookies, content, image_key=None):
def _is_video_media(image_key):
return bool(image_key and image_key.lower().endswith('.mp4'))
def _download_post_media(image_key):
if not image_key:
return None
filename = image_key.split('/')[-1]
unique_filename = f"{uuid.uuid4()}_{filename}"
file_path = os.path.join(BASE_PATH, 'files', unique_filename)
client.fget_object(BUCKET, image_key, file_path)
return file_path
def _open_post_composer(page, has_media):
if has_media:
composer = page.locator('//div[contains(@aria-placeholder, "What\'s on your mind")]').first
composer.wait_for(state='visible', timeout=300000)
return composer
triggers = [
'//span[contains(text(), "What\'s on your mind")]',
'//div[@role="button"]//span[contains(text(), "What\'s on your mind")]',
]
last_error = None
for selector in triggers:
try:
page.locator(selector).first.click(timeout=30000)
composer = page.locator('//div[contains(@aria-placeholder, "What\'s on your mind")]').first
composer.wait_for(state='visible', timeout=300000)
return composer
except Error as e:
last_error = e
raise OperationFailed(f'未能打开发布输入框: {last_error}')
def _fill_post_content(page, content, has_media):
composer = _open_post_composer(page, has_media=has_media)
composer.fill(content, timeout=300000)
def _wait_post_submit_result(page):
page.wait_for_timeout(15000)
def _get_latest_post_url(page):
candidates = [
'//div[@aria-posinset="1"]//a[@role="link"]',
'//a[contains(@href, "/posts/")]',
'//a[contains(@href, "permalink")]',
]
for selector in candidates:
locator = page.locator(selector)
count = locator.count()
if count == 0:
continue
for index in range(count):
href = locator.nth(index).get_attribute('href')
if href and ('/posts/' in href or 'permalink' in href):
if href.startswith('/'):
return f'https://www.facebook.com{href}'
return href
page.reload(timeout=180000)
return page.url
def _is_facebook_home(page):
current_url = (page.url or '').rstrip('/')
return current_url in {'https://www.facebook.com', 'https://facebook.com'}
def playwright_post(cookies, content, image_key=None, dry_run=False):
path = os.path.join(BASE_PATH, 'chrome', '130-0008', 'chrome.exe')
with lock:
with sync_playwright() as playwright:
update_windows_distinguish()
max_browser_retries = 3
last_error = None
parsed_cookies = parse_cookies(cookies)
is_video = _is_video_media(image_key)
for browser_attempt in range(max_browser_retries):
browser = None
@@ -509,51 +609,52 @@ def playwright_post(cookies, content, image_key=None):
headless=False, args=['--start-maximized'], executable_path=path
)
context = browser.new_context(no_viewport=True)
context.add_cookies(parse_cookies(cookies))
context.add_cookies(parsed_cookies)
page = context.new_page()
page.set_default_timeout(30000)
page.set_default_navigation_timeout(180000)
check_account_status(page, parse_cookies(cookies))
# 声明默认发布视频数量
check_account_status(page, parsed_cookies)
video_count = 0
url = 'https://www.facebook.com'
# 先获取视频数量
if image_key is not None and ".mp4" in image_key:
if is_video:
video_count = get_post_count(page, cookies)
# check_account_status 已经把页面带到 Facebook 首页,避免重复二次跳转导致页面进程崩溃
if "facebook.com" not in page.url:
if not _is_facebook_home(page):
retry_goto(page, url)
time.sleep(random.randint(3, 10))
time.sleep(5)
sleep(3, 5)
if image_key:
filename = image_key.split('/')[-1]
file_path = os.path.join(BASE_PATH, 'files', filename)
client.fget_object(BUCKET, image_key, file_path)
file_path = _download_post_media(image_key)
sleep(1, 2)
page.locator('input[accept="image/*,image/heif,image/heic,video/*,video/mp4,video/x-m4v,video/x-matroska,.mkv"]').set_input_files(file_path)
time.sleep(5)
page.locator('//div[contains(@aria-placeholder, "What\'s on your mind")]').first.wait_for(
state='visible', timeout=300000
)
if not image_key:
page.click('''//span[contains(text(), "What's on your mind")]''')
_fill_post_content(page, content, has_media=bool(image_key))
_edit_privacy(page)
# 修改后 (使用 fill)
page.fill('//div[contains(@aria-placeholder, "What\'s on your mind")]', content,
timeout=300000)
page.click('//div[@aria-label="Post"]', timeout=300000)
time.sleep(15)
post_index = page.locator('//div[@aria-posinset="1"]//a[@role="link"]').nth(2)
post_index.click(timeout=600000)
time.sleep(5)
page.reload(timeout=180000)
post_url = page.url
# 视频格式要单独去获取链接
if image_key is not None and ".mp4" in image_key:
post_url = retry_get_new_video(page, cookies, video_count)
time.sleep(random.randint(3, 10))
post_button = page.locator('//div[@aria-label="Post"]').first
post_button.wait_for(state='visible', timeout=300000)
if dry_run:
screenshot_content = _full_screenshot()
key = f'screenshot/{uuid.uuid4()}.png'
put_object(key, screenshot_content)
return {
'response_url': page.url,
'screenshot_key': key,
'dry_run': True,
'message': '已完成到发布前校验,未实际点击 Post'
}
post_button.click(timeout=300000)
_wait_post_submit_result(page)
if is_video:
post_url = retry_get_new_video(page, cookies, video_count)
else:
post_url = _get_latest_post_url(page)
screenshot_content = _full_screenshot()
key = f'screenshot/{uuid.uuid4()}.png'
put_object(key, screenshot_content)