chromedp自动操作doubao
文章目录
Go测试代码
package test
import (
"context"
"fmt"
"net/url"
"os"
"slices"
"testing"
"time"
"github.com/chromedp/cdproto/cdp"
"github.com/chromedp/cdproto/network"
"github.com/chromedp/cdproto/page"
"github.com/chromedp/chromedp"
)
// userAgent 浏览器的UA头
//var userAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0"
// stealthJS 随机浏览器指纹信息,来自 https://github.com/requireCool/stealth.min.js
var stealthJS, _ = os.ReadFile("stealth.min.js")
var doubaoSSEJS, _ = os.ReadFile("doubao-sse.js")
func TestDoubao(t *testing.T) {
ctx := context.Background()
chromedpOptions := []chromedp.ExecAllocatorOption{
chromedp.Flag("headless", false), // debug使用 false
chromedp.Flag("disable-hang-monitor", true), // 禁用页面无响应检测
// 核心:禁用自动化指示器
chromedp.Flag("enable-automation", false),
chromedp.Flag("useAutomationExtension", false),
chromedp.Flag("disable-blink-features", "AutomationControlled"),
// 辅助:增强伪装
//chromedp.UserAgent(userAgent),
chromedp.Flag("disable-web-security", false),
chromedp.Flag("ignore-certificate-errors", false),
// 随机化窗口大小,避免所有实例千篇一律
//chromedp.WindowSize(1920+rand.Intn(200), 1080+rand.Intn(200)),
}
//初始化参数,先传一个空的数据
chromedpOptions = append(chromedp.DefaultExecAllocatorOptions[:], chromedpOptions...)
allocatorContext, cancel1 := chromedp.NewExecAllocator(ctx, chromedpOptions...)
//allocatorContext, cancel1 := chromedp.NewRemoteAllocator(ctx, "ws://10.0.0.121:9128")
defer cancel1()
chromeCtx, cancel2 := chromedp.NewContext(allocatorContext)
defer cancel2()
// 执行一个空task, 用提前创建Chrome实例
//chromedp.Run(chromeCtx, make([]chromedp.Action, 0, 1)...)
//创建一个上下文,超时时间为300s
chromeCtx, cancel3 := context.WithTimeout(chromeCtx, time.Duration(300)*time.Second)
defer cancel3()
// 监听deepseek的sse请求
sseStatus := make(chan int, 1)
var content string
var requestIDs []network.RequestID
// 添加监听器
chromedp.ListenTarget(chromeCtx, func(ev interface{}) {
switch ev := ev.(type) {
case *network.EventRequestWillBeSent: //SSE请求发送
uri, _ := url.Parse(ev.Request.URL)
if uri.Path != "/samantha/chat/completion" {
break
}
if ev.Request.Method != "POST" {
break
}
//sseStatus = 1
requestIDs = append(requestIDs, ev.RequestID)
case *network.EventLoadingFinished: //SSE事件结束
i := slices.Index(requestIDs, ev.RequestID)
if i < 0 {
break
}
requestIDs = slices.Delete(requestIDs, i, i+1)
fc := chromedp.FromContext(chromeCtx)
ctx := cdp.WithExecutor(chromeCtx, fc.Target)
go func() { //使用协程,避免错误
// network.GetResponseBody 因为content-type缺少charset=utf-8,CDP解析乱码
/*
bs, err := network.GetResponseBody(ev.RequestID).Do(ctx)
if err != nil {
return
}
//获取SSE的整体返回值
content = content + string(bs)
*/
//等页面js执行1秒
time.Sleep(1 * time.Second)
//var result string
chromedp.Evaluate(`(function() {return sse_message_all;})()`, &content).Do(ctx)
fmt.Println(content)
sseStatus <- 1
}()
}
})
screenshotLogin := make([]byte, 0)
screenshotSession := make([]byte, 0)
// 无需登录,每次重启容器(修改docker service 启动命令,增加 -H tcp://0.0.0.0:2375 -H unix://var/run/docker.sock) curl -X POST http://10.0.0.131:2375/containers/chromedp-doubao/restart
/*
data, err := os.ReadFile("cookies.json")
if err != nil {
return
}
// 定义 map 来存储 cookies
cookies := make(map[string]string)
// 解析 JSON
if err := json.Unmarshal(data, &cookies); err != nil {
return
}
fmt.Println("---豆包的cookies已经获取---")
// 2注入 map[string]string 形式的 cookies
err = chromedp.Run(chromeCtx,
//清理缓存
chromedp.ActionFunc(func(ctx context.Context) error {
cookes, err := storage.GetCookies().Do(ctx)
fmt.Println("cookes", cookes)
return err
}),
//等待三秒
chromedp.Sleep(3*time.Second),
chromedp.ActionFunc(func(ctx context.Context) error {
for name, value := range cookies {
// 构造 SetCookie action 并在正确的 ctx 下执行
c := network.SetCookie(name, value).
WithDomain("www.doubao.com").
WithPath("/").
WithHTTPOnly(false).
WithSecure(false)
// 注意:Do(ctx) 只返回 error
if err := c.Do(ctx); err != nil {
fmt.Errorf("SetCookie %s failed: %w", name, err)
}
}
return nil
}),
)
*/
err := chromedp.Run(chromeCtx, chromedp.Tasks{
/*
chromedp.ActionFunc(func(ctx context.Context) error {
// 构造 SetCookie action 并在正确的 ctx 下执行
c := network.SetCookie("sessionid", "4e18aba71db74332e76f9ea0e10cf05a").
WithDomain("www.doubao.com").
WithPath("/").
WithHTTPOnly(false).
WithSecure(false)
// 注意:Do(ctx) 只返回 error
if err := c.Do(ctx); err != nil {
fmt.Printf("SetCookie %s failed: %v", "sessionid", err)
}
return nil
}),
*/
//chromedp.Evaluate(`Object.defineProperty(navigator, 'webdriver', {get: () => undefined})`, nil),
// 推荐使用 page.AddScriptToEvaluateOnNewDocument
//chromedp.Evaluate(string(stealthJS), nil),
chromedp.ActionFunc(func(ctx context.Context) error {
_, err := page.AddScriptToEvaluateOnNewDocument(string(stealthJS)).Do(ctx)
return err
}),
// 启用网络事件监听,这是关键一步
network.Enable(),
// 覆盖 navigator.userAgent 等
//emulation.SetUserAgentOverride(userAgent),
// 可同时设置额外请求头
//network.SetExtraHTTPHeaders(network.Headers{"Accept-Language": "zh-CN,zh;q=0.9", "User-Agent": userAgent}),
/*
//指定分辨率的窗口
emulation.SetDeviceMetricsOverride(1920, 1080, 1.0, false).
WithScreenOrientation(&emulation.ScreenOrientation{
Type: emulation.OrientationTypePortraitPrimary,
Angle: 0,
}),
*/
// 导航到登录页面
chromedp.Navigate("https://www.doubao.com/chat/"),
//等待三秒
chromedp.Sleep(3 * time.Second),
/*
// 判断页面是否是已经登录
chromedp.ActionFunc(func(ctx context.Context) error {
// 检查是否在登录页面(通过查找登录相关元素).使用相同的chromeCtx
var exists bool
err := chromedp.Run(chromeCtx, chromedp.EvaluateAsDevTools(`document.evaluate('//*[text()=\"密码登录\"]', document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue !== null`, &exists))
if err != nil {
return err
}
if !exists {
return nil
}
// 如果找到登录元素,执行登录流程.使用相同的chromeCtx
return chromedp.Run(chromeCtx, chromedp.Tasks{
// 点击密码登录标签页
chromedp.WaitReady(`//*[text()='密码登录']`, chromedp.BySearch),
chromedp.Click(`//*[text()='密码登录']`, chromedp.BySearch),
//输入账号
chromedp.WaitReady(`//*[@placeholder='请输入手机号/邮箱地址']`, chromedp.BySearch),
chromedp.SendKeys(`//*[@placeholder='请输入手机号/邮箱地址']`, "账号", chromedp.BySearch),
//输入密码
chromedp.WaitReady(`//*[@placeholder='请输入密码']`, chromedp.BySearch),
chromedp.SendKeys(`//*[@placeholder='请输入密码']`, "密码", chromedp.BySearch),
//点击登录
chromedp.WaitReady(`//*[text()='登录']`, chromedp.BySearch),
chromedp.Click(`//*[text()='登录']`, chromedp.BySearch),
// 等待登录完成
chromedp.Sleep(3 * time.Second),
})
}),
*/
// 登录截屏
chromedp.FullScreenshot(&screenshotLogin, 100),
//1.输入问题
// chromedp.SendKeys(`textarea[data-testid="chat_input_input"]`, "今天的热点新闻", chromedp.ByQuery),
// 模拟人类输入问题
chromedp.ActionFunc(func(ctx context.Context) error {
err := typeHumanLike(ctx, `textarea[data-testid="chat_input_input"]`, "只回复你好,不要做其他的任何事情", true)
return err
}),
//等待三秒
chromedp.Sleep(3 * time.Second),
// 2. 等待按钮处于可点击状态 (aria-disabled="false" 且没有 disabled 属性)
chromedp.WaitVisible(`button#flow-end-msg-send[aria-disabled="false"]:not([disabled])`, chromedp.ByQuery),
// 3. 点击按钮
chromedp.Click(`button#flow-end-msg-send`, chromedp.ByQuery),
// 等待大模型回复,最多等待3分钟
chromedp.ActionFunc(func(ctx context.Context) error {
// 设置超时时间为180秒
select {
case <-sseStatus: // 成功状态
return nil
case <-time.After(180 * time.Second):
return nil
}
}),
// 会话截屏
chromedp.FullScreenshot(&screenshotSession, 100),
})
fmt.Println(err)
if len(screenshotLogin) > 0 {
os.Remove("screenshotLogin.png")
os.WriteFile("screenshotLogin.png", screenshotLogin, 0644)
}
if len(screenshotSession) > 0 {
os.Remove("screenshotSession.png")
os.WriteFile("screenshotSession.png", screenshotSession, 0644)
}
fmt.Println(content)
}
// typeHumanLike 模拟人类打字
func typeHumanLike(ctx context.Context, selector, text string, allowMistakes bool) error {
// 1. 等待元素可见 + 可交互
err := chromedp.Run(ctx,
chromedp.WaitVisible(selector, chromedp.ByQuery),
chromedp.WaitEnabled(selector, chromedp.ByQuery),
)
if err != nil {
return err
}
// 2. 强制聚焦(关键!)
err = chromedp.Run(ctx, chromedp.Focus(selector, chromedp.ByQuery))
if err != nil {
return err
}
// 4. 逐字符发送(使用 WithText + input.Char)
for i, r := range text {
// 偶尔误打
if allowMistakes && i > 1 && i < len([]rune(text))-1 && rand.Float32() < 0.07 {
wrong := rune('a' + rand.Intn(26))
if err := sendChar(ctx, string(wrong)); err != nil {
return err
}
time.Sleep(randomTypingDelay())
if err := sendKey(ctx, "Backspace"); err != nil {
return err
}
time.Sleep(randomTypingDelay())
}
// 正常输入
if err := sendChar(ctx, string(r)); err != nil {
return err
}
time.Sleep(randomTypingDelay())
}
// 5. 最后发送 Enter(如果需要提交)
// if err := sendKey(ctx, "Enter"); err != nil {
// return err
// }
return nil
}
// sendChar 发送单个字符
func sendChar(ctx context.Context, char string) error {
return chromedp.Run(ctx,
input.DispatchKeyEvent(input.KeyChar).WithText(char),
)
}
// sendKey 发送功能键(如 Backspace, Enter, Tab)
func sendKey(ctx context.Context, key string) error {
return chromedp.Run(ctx,
input.DispatchKeyEvent(input.KeyDown).WithWindowsVirtualKeyCode(keyCodeMap[key]),
input.DispatchKeyEvent(input.KeyUp).WithWindowsVirtualKeyCode(keyCodeMap[key]),
)
}
// 常用功能键映射
var keyCodeMap = map[string]int64{
"Backspace": 8,
"Tab": 9,
"Enter": 13,
"Shift": 16,
"Control": 17,
"Alt": 18,
"Escape": 27,
"ArrowLeft": 37,
"ArrowUp": 38,
"ArrowRight": 39,
"ArrowDown": 40,
}
// randomTypingDelay 返回人类打字间隔(正态分布)
func randomTypingDelay() time.Duration {
// 正态分布:均值 120ms,标准差 40ms
delay := rand.NormFloat64()*40 + 120
if delay < 30 {
delay = 30
}
if delay > 300 {
delay = 300
}
return time.Duration(delay) * time.Millisecond
}
stealth.min.js
注意修改 ['en-US', 'en'] 为 ['zh-CN', 'zh'],默认中文界面
下载:stealth.min.js.zip
浏览器指纹验证网站:
docker compose
services:
chromedp:
image: chromedp/headless-shell:143.0.7445.3
container_name: chromedp
restart: unless-stopped
command:
- "--headless=new"
- "--window-size=1920,1080"
- "--no-sandbox"
- "--disable-setuid-sandbox"
- "--disable-background-timer-throttling"
- "--disable-backgrounding-occluded-windows"
- "--disable-renderer-backgrounding"
- "--disable-features=VizDisplayCompositor"
- "--enable-unsafe-swiftshader"
- "--disable-hang-monitor"
- "--disable-automation"
- "--disable-extensions"
- "--disable-blink-features=AutomationControlled"
- "--disable-web-security=false"
- "--ignore-certificate-errors=false"
shm_size: 4g
#extra_hosts:
# - "www.doubao.com:10.0.0.131"
environment:
TZ: Asia/Shanghai
LANG: zh_CN.UTF-8
LC_ALL: zh_CN.UTF-8
ports:
- "8222:9222"
deploy:
resources:
limits:
cpus: '4'
memory: 16G
reservations:
cpus: '4'
memory: 8G
中文乱码
使用nginx修改Content-Type
### 豆包的 Content-Type text/event-stream 没有 charset=utf-8,造成乱码.(CDP会严格按照协议执行解析,无法修改Content-Type)
### 指定 www.doubao.com 的Nginx解析IP, 自签https证书
#extra_hosts:
# - "www.doubao.com:10.0.0.131"
location /samantha/chat/completion {
## 设置UTF-8编码
add_header Content-Type "text/event-stream; charset=utf-8";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 取消缓冲
proxy_buffering off;
# 关闭代理缓存
proxy_cache off;
# 代理到真实的豆包服务
proxy_pass https://www.doubao.com;
}
location / {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 取消缓冲
proxy_buffering off;
# 关闭代理缓存
proxy_cache off;
# 代理到真实的豆包服务
proxy_pass https://www.doubao.com;
}
sse.js 监听SSE响应
var sse_message_all = ""; // 全局变量,用于存储所有 SSE 消息
function sleep(delay) {
var start = (new Date()).getTime();
while ((new Date()).getTime() - start < delay) {
continue;
}
}
(function () {
if (window._NET_HOOKED) return;
window._NET_HOOKED = true;
// --- 1. Hook fetch API (保留你原来的逻辑) ---
const originalFetch = window.fetch;
window.fetch = function (...args) {
return originalFetch.apply(this, args).then(response => {
// 不是 SSE 请求, 不处理
const contentType = response.headers.get('content-type');
if (!contentType || contentType.toLowerCase().indexOf("text/event-stream") < 0) {
return response;
}
// 等待获取流,避免流复制异常
sleep(1000);
// 是 SSE 请求, 复制响应体并开始读取
const [stream1, stream2] = response.body.tee();
response = new Response(stream2, response);
const reader = stream1.getReader();
const decoder = new TextDecoder();
let buffer = '';
function read() {
reader.read().then(({ done, value }) => {
if (done) return;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 最后一行可能不完整,保留到下次
lines.forEach(line => {
// 提取并存储 SSE 数据行
// 原始的 SSE 消息是 'data: message\n' 这样的格式
sse_message_all = sse_message_all + line + '\n';
});
read();
}).catch(error => {
// console.error("Fetch SSE read error:", error);
});
}
read();
return response;
});
};
// --- 2. Hook XMLHttpRequest (处理 XHR 发起的 SSE) ---
const originalOpen = XMLHttpRequest.prototype.open;
const originalSend = XMLHttpRequest.prototype.send;
XMLHttpRequest.prototype.open = function (method, url, ...rest) {
// 在 open 时保存 url 和 method
this._method = method;
this._url = url;
originalOpen.apply(this, [method, url, ...rest]);
};
XMLHttpRequest.prototype.send = function(...args) {
// 确保不会重复设置 onreadystatechange
if (this._hooked) {
return originalSend.apply(this, args);
}
// 原始的 onreadystatechange 处理器
const originalOnReadyStateChange = this.onreadystatechange || function() {};
// 检查请求是否可能是一个 SSE 请求 (根据 URL 或其他上下文)
// 简单起见,我们只在 readyState 改变时检查响应头
const xhr = this;
let isSSE = false;
let lastLength = 0; // 用于跟踪上次读取的文本长度
this.onreadystatechange = function() {
// 只有当 isSSE 为 true 或 readyState 已经达到 HEADERS_RECEIVED 时,才进行头信息检查
if (xhr.readyState === XMLHttpRequest.HEADERS_RECEIVED) {
// 检查 Content-Type 响应头
const contentType = xhr.getResponseHeader('Content-Type');
if (contentType && contentType.toLowerCase().indexOf('text/event-stream') > -1) {
isSSE = true;
// console.log("XHR SSE request detected:", xhr._url);
}
}
// 拦截处于 LOADING (readyState = 3) 状态的 SSE 数据
// SSE 通常会在 readyState = 3 时推送数据
if (isSSE && xhr.readyState === XMLHttpRequest.LOADING) {
try {
// XHR SSE 的特点是它会持续在 readyState=3 时更新 responseText
// 我们只处理新增的部分
let currentText = xhr.responseText;
let newText = currentText.substring(lastLength);
lastLength = currentText.length;
if (newText.length > 0) {
// 按行处理新增的 SSE 数据
const lines = newText.split('\n');
lines.forEach(line => {
// 存储 SSE 数据行
sse_message_all = sse_message_all + line + '\n';
});
}
} catch (e) {
// console.error("XHR SSE read error:", e);
}
}
// 调用原始的 onreadystatechange
originalOnReadyStateChange.apply(this, arguments);
};
this._hooked = true;
originalSend.apply(this, args);
};
// 💡 注意: 这种 XHR 拦截方式比较复杂,因为它需要模拟 EventSource 的分段读取行为。
// 它可能无法完美模拟 EventSource 的所有边缘情况(如自动重连逻辑)。
})();
// 现在,不论是 fetch 还是 XHR 发起的 SSE 请求,它们的消息都会被追加到 sse_message_all 变量中。
代码处理乱码字符串(部分字符丢失,无法恢复,废弃)
package main
/**
场景: 后台返回的UTF-8字符,Content-Type里没有声明 charset=UTF-8,被CDP按照Latin-1字符处理.
"系统错误" \u7cfb\u7edf\u9519\u8bef
UTF-8 字节流 错误的编码成了 Latin-1 字符
然后又被 json.Marshal 编码成了 Unicode 转义序列
最终变成了: \u00e7\u00b3\u00bb\u00e7\u00bb\u0178\u00e9\u201d\u2122\u00e8\u00af\u00af
**/
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"testing"
)
/**
Latin-1 vs Windows-1252
ISO-8859-1 (Latin-1):
定义了 0x00–0xFF 的字符
但 0x80–0x9F 是控制字符(不可见)
所以这些字节 没有对应的可见字符
Windows-1252:
是 Latin-1 的超集
0x80–0x9F 被重新定义成了可见字符(如 €、‚、ƒ、„、…、†、‡、ˆ、‰、Š、‹、Œ、Ž、‘、’、“、”、•、–、—、˜、™、š、›、œ、ž、Ÿ)
**/
// unicodeToLatin1 映射 Unicode 字符 -> Latin-1/Windows-1252 (0x80–0x9F) 字节
var unicodeToLatin1 = map[rune]byte{
0x20ac: 0x80, // €
0x201a: 0x82, // ‚
0x0192: 0x83, // ƒ
0x201e: 0x84, // „
0x2026: 0x85, // …
0x2020: 0x86, // †
0x2021: 0x87, // ‡
0x02c6: 0x88, // ˆ
0x2030: 0x89, // ‰
0x0160: 0x8a, // Š
0x2039: 0x8b, // ‹
0x0152: 0x8c, // Œ
0x017d: 0x8e, // Ž
0x2018: 0x91, // ‘
0x2019: 0x92, // ’
0x201c: 0x93, // “
0x201d: 0x94, // ”
0x2022: 0x95, // •
0x2013: 0x96, // –
0x2014: 0x97, // —
0x02dc: 0x98, // ˜
0x2122: 0x99, // ™
0x0161: 0x9a, // š
0x203a: 0x9b, // ›
0x0153: 0x9c, // œ
0x017e: 0x9e, // ž
0x0178: 0x9f, // Ÿ
}
// decodeFakeUnicodeEscape 解码错误的 Unicode 转义序列为原始字符串
func decodeFakeUnicodeEscape(input string) string {
var bytes []byte
// 按 \uXXXX 拆分
parts := strings.Split(input, "\\u")
for _, part := range parts {
if len(part) < 4 {
continue
}
hex := part[:4]
code, err := strconv.ParseInt(hex, 16, 32)
if err != nil {
continue
}
r := rune(code)
// 如果是特殊字符, 反向映射为字节
if b, ok := unicodeToLatin1[r]; ok {
bytes = append(bytes, b)
} else {
// 否则取低字节
bytes = append(bytes, byte(r&0xFF))
}
}
// 解码 UTF-8 字节流为字符串
str := string(bytes)
// 重新编码为标准 Unicode 转义序列
var result strings.Builder
for _, r := range str {
result.WriteString(fmt.Sprintf("\\u%04x", r))
}
return result.String()
}
func Test_Latin1ToUnicode(t *testing.T) {
// 系统错误 的乱码 系统错误
input := `"\u00e7\u00b3\u00bb\u00e7\u00bb\u0178\u00e9\u201d\u2122\u00e8\u00af\u00af"`
var result string
json.Unmarshal([]byte(input), &result)
fmt.Println("乱码汉字:", result)
//result = `系统错误`
// 重新编码为 \uXXXX 形式
ascii := strconv.QuoteToASCII(result)
fmt.Println("乱码汉字重新编码Unicode:", ascii)
input = ascii[1 : len(ascii)-1]
fmt.Println("乱码Unicode去掉两面的引号:", input)
output := decodeFakeUnicodeEscape(input)
fmt.Println("输入乱码Unicode:", input)
fmt.Println("输出正确Unicode:", output)
jsonStr := `"` + output + `"` // 包成 JSON 字符串
var hanzi string
json.Unmarshal([]byte(jsonStr), &hanzi)
fmt.Println("输出正常汉字:", hanzi)
}
文章作者
上次更新 2025-11-14