title: chromedp自动操作doubao CreateTime: 2025-09-26 17:57:53 UpdateTime: 2026-01-06 11:33:17 CategoryName: AI --- # Go测试代码 ```go package test import ( "context" "fmt" "net/url" "os" "slices" "testing" "time" "github.com/chromedp/cdproto/cdp" "github.com/chromedp/cdproto/network" "github.com/chromedp/cdproto/page" "github.com/chromedp/chromedp" ) // userAgent 浏览器的UA头 //var userAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36 Edg/141.0.0.0" // stealthJS 随机浏览器指纹信息,来自 https://github.com/requireCool/stealth.min.js var stealthJS, _ = os.ReadFile("stealth.min.js") var doubaoSSEJS, _ = os.ReadFile("doubao-sse.js") func TestDoubao(t *testing.T) { ctx := context.Background() chromedpOptions := []chromedp.ExecAllocatorOption{ chromedp.Flag("headless", false), // debug使用 false chromedp.Flag("disable-hang-monitor", true), // 禁用页面无响应检测 // 核心:禁用自动化指示器 chromedp.Flag("enable-automation", false), chromedp.Flag("useAutomationExtension", false), chromedp.Flag("disable-blink-features", "AutomationControlled"), // 辅助:增强伪装 //chromedp.UserAgent(userAgent), chromedp.Flag("disable-web-security", false), chromedp.Flag("ignore-certificate-errors", false), // 随机化窗口大小,避免所有实例千篇一律 //chromedp.WindowSize(1920+rand.Intn(200), 1080+rand.Intn(200)), } //初始化参数,先传一个空的数据 chromedpOptions = append(chromedp.DefaultExecAllocatorOptions[:], chromedpOptions...) allocatorContext, cancel1 := chromedp.NewExecAllocator(ctx, chromedpOptions...) //allocatorContext, cancel1 := chromedp.NewRemoteAllocator(ctx, "ws://10.0.0.121:9128") defer cancel1() chromeCtx, cancel2 := chromedp.NewContext(allocatorContext) defer cancel2() // 执行一个空task, 用提前创建Chrome实例 //chromedp.Run(chromeCtx, make([]chromedp.Action, 0, 1)...) //创建一个上下文,超时时间为300s chromeCtx, cancel3 := context.WithTimeout(chromeCtx, time.Duration(300)*time.Second) defer cancel3() // 监听deepseek的sse请求 sseStatus := make(chan int, 1) var content string var requestIDs []network.RequestID // 添加监听器 chromedp.ListenTarget(chromeCtx, func(ev interface{}) { switch ev := ev.(type) { case *network.EventRequestWillBeSent: //SSE请求发送 uri, _ := url.Parse(ev.Request.URL) if uri.Path != "/samantha/chat/completion" { break } if ev.Request.Method != "POST" { break } //sseStatus = 1 requestIDs = append(requestIDs, ev.RequestID) case *network.EventLoadingFinished: //SSE事件结束 i := slices.Index(requestIDs, ev.RequestID) if i < 0 { break } requestIDs = slices.Delete(requestIDs, i, i+1) fc := chromedp.FromContext(chromeCtx) ctx := cdp.WithExecutor(chromeCtx, fc.Target) go func() { //使用协程,避免错误 // network.GetResponseBody 因为content-type缺少charset=utf-8,CDP解析乱码 /* bs, err := network.GetResponseBody(ev.RequestID).Do(ctx) if err != nil { return } //获取SSE的整体返回值 content = content + string(bs) */ //等页面js执行1秒 time.Sleep(1 * time.Second) //var result string chromedp.Evaluate(`(function() {return sse_message_all;})()`, &content).Do(ctx) fmt.Println(content) sseStatus <- 1 }() } }) screenshotLogin := make([]byte, 0) screenshotSession := make([]byte, 0) // 无需登录,每次重启容器(修改docker service 启动命令,增加 -H tcp://0.0.0.0:2375 -H unix://var/run/docker.sock) curl -X POST http://10.0.0.131:2375/containers/chromedp-doubao/restart /* data, err := os.ReadFile("cookies.json") if err != nil { return } // 定义 map 来存储 cookies cookies := make(map[string]string) // 解析 JSON if err := json.Unmarshal(data, &cookies); err != nil { return } fmt.Println("---豆包的cookies已经获取---") // 2注入 map[string]string 形式的 cookies err = chromedp.Run(chromeCtx, //清理缓存 chromedp.ActionFunc(func(ctx context.Context) error { cookes, err := storage.GetCookies().Do(ctx) fmt.Println("cookes", cookes) return err }), //等待三秒 chromedp.Sleep(3*time.Second), chromedp.ActionFunc(func(ctx context.Context) error { for name, value := range cookies { // 构造 SetCookie action 并在正确的 ctx 下执行 c := network.SetCookie(name, value). WithDomain("www.doubao.com"). WithPath("/"). WithHTTPOnly(false). WithSecure(false) // 注意:Do(ctx) 只返回 error if err := c.Do(ctx); err != nil { fmt.Errorf("SetCookie %s failed: %w", name, err) } } return nil }), ) */ err := chromedp.Run(chromeCtx, chromedp.Tasks{ /* chromedp.ActionFunc(func(ctx context.Context) error { // 构造 SetCookie action 并在正确的 ctx 下执行 c := network.SetCookie("sessionid", "4e18aba71db74332e76f9ea0e10cf05a"). WithDomain("www.doubao.com"). WithPath("/"). WithHTTPOnly(false). WithSecure(false) // 注意:Do(ctx) 只返回 error if err := c.Do(ctx); err != nil { fmt.Printf("SetCookie %s failed: %v", "sessionid", err) } return nil }), */ //chromedp.Evaluate(`Object.defineProperty(navigator, 'webdriver', {get: () => undefined})`, nil), // 推荐使用 page.AddScriptToEvaluateOnNewDocument //chromedp.Evaluate(string(stealthJS), nil), chromedp.ActionFunc(func(ctx context.Context) error { _, err := page.AddScriptToEvaluateOnNewDocument(string(stealthJS)).Do(ctx) return err }), // 启用网络事件监听,这是关键一步 network.Enable(), // 覆盖 navigator.userAgent 等 //emulation.SetUserAgentOverride(userAgent), // 可同时设置额外请求头 //network.SetExtraHTTPHeaders(network.Headers{"Accept-Language": "zh-CN,zh;q=0.9", "User-Agent": userAgent}), /* //指定分辨率的窗口 emulation.SetDeviceMetricsOverride(1920, 1080, 1.0, false). WithScreenOrientation(&emulation.ScreenOrientation{ Type: emulation.OrientationTypePortraitPrimary, Angle: 0, }), */ // 导航到登录页面 chromedp.Navigate("https://www.doubao.com/chat/"), //等待三秒 chromedp.Sleep(3 * time.Second), /* // 判断页面是否是已经登录 chromedp.ActionFunc(func(ctx context.Context) error { // 检查是否在登录页面(通过查找登录相关元素).使用相同的chromeCtx var exists bool err := chromedp.Run(chromeCtx, chromedp.EvaluateAsDevTools(`document.evaluate('//*[text()=\"密码登录\"]', document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue !== null`, &exists)) if err != nil { return err } if !exists { return nil } // 如果找到登录元素,执行登录流程.使用相同的chromeCtx return chromedp.Run(chromeCtx, chromedp.Tasks{ // 点击密码登录标签页 chromedp.WaitReady(`//*[text()='密码登录']`, chromedp.BySearch), chromedp.Click(`//*[text()='密码登录']`, chromedp.BySearch), //输入账号 chromedp.WaitReady(`//*[@placeholder='请输入手机号/邮箱地址']`, chromedp.BySearch), chromedp.SendKeys(`//*[@placeholder='请输入手机号/邮箱地址']`, "账号", chromedp.BySearch), //输入密码 chromedp.WaitReady(`//*[@placeholder='请输入密码']`, chromedp.BySearch), chromedp.SendKeys(`//*[@placeholder='请输入密码']`, "密码", chromedp.BySearch), //点击登录 chromedp.WaitReady(`//*[text()='登录']`, chromedp.BySearch), chromedp.Click(`//*[text()='登录']`, chromedp.BySearch), // 等待登录完成 chromedp.Sleep(3 * time.Second), }) }), */ // 登录截屏 chromedp.FullScreenshot(&screenshotLogin, 100), //1.输入问题 // chromedp.SendKeys(`textarea[data-testid="chat_input_input"]`, "今天的热点新闻", chromedp.ByQuery), // 模拟人类输入问题 chromedp.ActionFunc(func(ctx context.Context) error { err := typeHumanLike(ctx, `textarea[data-testid="chat_input_input"]`, "只回复你好,不要做其他的任何事情", true) return err }), //等待三秒 chromedp.Sleep(3 * time.Second), // 2. 等待按钮处于可点击状态 (aria-disabled="false" 且没有 disabled 属性) chromedp.WaitVisible(`button#flow-end-msg-send[aria-disabled="false"]:not([disabled])`, chromedp.ByQuery), // 3. 点击按钮 chromedp.Click(`button#flow-end-msg-send`, chromedp.ByQuery), // 等待大模型回复,最多等待3分钟 chromedp.ActionFunc(func(ctx context.Context) error { // 设置超时时间为180秒 select { case <-sseStatus: // 成功状态 return nil case <-time.After(180 * time.Second): return nil } }), // 会话截屏 chromedp.FullScreenshot(&screenshotSession, 100), }) fmt.Println(err) if len(screenshotLogin) > 0 { os.Remove("screenshotLogin.png") os.WriteFile("screenshotLogin.png", screenshotLogin, 0644) } if len(screenshotSession) > 0 { os.Remove("screenshotSession.png") os.WriteFile("screenshotSession.png", screenshotSession, 0644) } fmt.Println(content) } // typeHumanLike 模拟人类打字 func typeHumanLike(ctx context.Context, selector, text string, allowMistakes bool) error { // 1. 等待元素可见 + 可交互 err := chromedp.Run(ctx, chromedp.WaitVisible(selector, chromedp.ByQuery), chromedp.WaitEnabled(selector, chromedp.ByQuery), ) if err != nil { return err } // 2. 强制聚焦(关键!) err = chromedp.Run(ctx, chromedp.Focus(selector, chromedp.ByQuery)) if err != nil { return err } // 4. 逐字符发送(使用 WithText + input.Char) for i, r := range text { // 偶尔误打 if allowMistakes && i > 1 && i < len([]rune(text))-1 && rand.Float32() < 0.07 { wrong := rune('a' + rand.Intn(26)) if err := sendChar(ctx, string(wrong)); err != nil { return err } time.Sleep(randomTypingDelay()) if err := sendKey(ctx, "Backspace"); err != nil { return err } time.Sleep(randomTypingDelay()) } // 正常输入 if err := sendChar(ctx, string(r)); err != nil { return err } time.Sleep(randomTypingDelay()) } // 5. 最后发送 Enter(如果需要提交) // if err := sendKey(ctx, "Enter"); err != nil { // return err // } return nil } // sendChar 发送单个字符 func sendChar(ctx context.Context, char string) error { return chromedp.Run(ctx, input.DispatchKeyEvent(input.KeyChar).WithText(char), ) } // sendKey 发送功能键(如 Backspace, Enter, Tab) func sendKey(ctx context.Context, key string) error { return chromedp.Run(ctx, input.DispatchKeyEvent(input.KeyDown).WithWindowsVirtualKeyCode(keyCodeMap[key]), input.DispatchKeyEvent(input.KeyUp).WithWindowsVirtualKeyCode(keyCodeMap[key]), ) } // 常用功能键映射 var keyCodeMap = map[string]int64{ "Backspace": 8, "Tab": 9, "Enter": 13, "Shift": 16, "Control": 17, "Alt": 18, "Escape": 27, "ArrowLeft": 37, "ArrowUp": 38, "ArrowRight": 39, "ArrowDown": 40, } // randomTypingDelay 返回人类打字间隔(正态分布) func randomTypingDelay() time.Duration { // 正态分布:均值 120ms,标准差 40ms delay := rand.NormFloat64()*40 + 120 if delay < 30 { delay = 30 } if delay > 300 { delay = 300 } return time.Duration(delay) * time.Millisecond } ``` # stealth.min.js 注意修改 ```['en-US', 'en']``` 为 ```['zh-CN', 'zh']```,默认中文界面 下载:[stealth.min.js.zip](/public/109/stealth.min.js.zip) 浏览器指纹验证网站: - [https://www.browserscan.net/zh](https://www.browserscan.net/zh) - [https://bot.sannysoft.com](https://bot.sannysoft.com) # docker compose ```yaml services: chromedp: image: chromedp/headless-shell:143.0.7445.3 container_name: chromedp restart: unless-stopped command: - "--headless=new" - "--window-size=1920,1080" - "--no-sandbox" - "--disable-setuid-sandbox" - "--disable-background-timer-throttling" - "--disable-backgrounding-occluded-windows" - "--disable-renderer-backgrounding" - "--disable-features=VizDisplayCompositor" - "--enable-unsafe-swiftshader" - "--disable-hang-monitor" - "--disable-automation" - "--disable-extensions" - "--disable-blink-features=AutomationControlled" - "--disable-web-security=false" - "--ignore-certificate-errors=false" shm_size: 4g #extra_hosts: # - "www.doubao.com:10.0.0.131" environment: TZ: Asia/Shanghai LANG: zh_CN.UTF-8 LC_ALL: zh_CN.UTF-8 ports: - "8222:9222" deploy: resources: limits: cpus: '4' memory: 16G reservations: cpus: '4' memory: 8G ``` # 中文乱码 ## 使用nginx修改Content-Type ```conf ### 豆包的 Content-Type text/event-stream 没有 charset=utf-8,造成乱码.(CDP会严格按照协议执行解析,无法修改Content-Type) ### 指定 www.doubao.com 的Nginx解析IP, 自签https证书 #extra_hosts: # - "www.doubao.com:10.0.0.131" location /samantha/chat/completion { ## 设置UTF-8编码 add_header Content-Type "text/event-stream; charset=utf-8"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # 取消缓冲 proxy_buffering off; # 关闭代理缓存 proxy_cache off; # 代理到真实的豆包服务 proxy_pass https://www.doubao.com; } location / { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # 取消缓冲 proxy_buffering off; # 关闭代理缓存 proxy_cache off; # 代理到真实的豆包服务 proxy_pass https://www.doubao.com; } ``` ## sse.js 监听SSE响应 ```js var sse_message_all = ""; // 全局变量,用于存储所有 SSE 消息 function sleep(delay) { var start = (new Date()).getTime(); while ((new Date()).getTime() - start < delay) { continue; } } (function () { if (window._NET_HOOKED) return; window._NET_HOOKED = true; // --- 1. Hook fetch API (保留你原来的逻辑) --- const originalFetch = window.fetch; window.fetch = function (...args) { return originalFetch.apply(this, args).then(response => { // 不是 SSE 请求, 不处理 const contentType = response.headers.get('content-type'); if (!contentType || contentType.toLowerCase().indexOf("text/event-stream") < 0) { return response; } // 等待获取流,避免流复制异常 sleep(1000); // 是 SSE 请求, 复制响应体并开始读取 const [stream1, stream2] = response.body.tee(); response = new Response(stream2, response); const reader = stream1.getReader(); const decoder = new TextDecoder(); let buffer = ''; function read() { reader.read().then(({ done, value }) => { if (done) return; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); buffer = lines.pop(); // 最后一行可能不完整,保留到下次 lines.forEach(line => { // 提取并存储 SSE 数据行 // 原始的 SSE 消息是 'data: message\n' 这样的格式 sse_message_all = sse_message_all + line + '\n'; }); read(); }).catch(error => { // console.error("Fetch SSE read error:", error); }); } read(); return response; }); }; // --- 2. Hook XMLHttpRequest (处理 XHR 发起的 SSE) --- const originalOpen = XMLHttpRequest.prototype.open; const originalSend = XMLHttpRequest.prototype.send; XMLHttpRequest.prototype.open = function (method, url, ...rest) { // 在 open 时保存 url 和 method this._method = method; this._url = url; originalOpen.apply(this, [method, url, ...rest]); }; XMLHttpRequest.prototype.send = function(...args) { // 确保不会重复设置 onreadystatechange if (this._hooked) { return originalSend.apply(this, args); } // 原始的 onreadystatechange 处理器 const originalOnReadyStateChange = this.onreadystatechange || function() {}; // 检查请求是否可能是一个 SSE 请求 (根据 URL 或其他上下文) // 简单起见,我们只在 readyState 改变时检查响应头 const xhr = this; let isSSE = false; let lastLength = 0; // 用于跟踪上次读取的文本长度 this.onreadystatechange = function() { // 只有当 isSSE 为 true 或 readyState 已经达到 HEADERS_RECEIVED 时,才进行头信息检查 if (xhr.readyState === XMLHttpRequest.HEADERS_RECEIVED) { // 检查 Content-Type 响应头 const contentType = xhr.getResponseHeader('Content-Type'); if (contentType && contentType.toLowerCase().indexOf('text/event-stream') > -1) { isSSE = true; // console.log("XHR SSE request detected:", xhr._url); } } // 拦截处于 LOADING (readyState = 3) 状态的 SSE 数据 // SSE 通常会在 readyState = 3 时推送数据 if (isSSE && xhr.readyState === XMLHttpRequest.LOADING) { try { // XHR SSE 的特点是它会持续在 readyState=3 时更新 responseText // 我们只处理新增的部分 let currentText = xhr.responseText; let newText = currentText.substring(lastLength); lastLength = currentText.length; if (newText.length > 0) { // 按行处理新增的 SSE 数据 const lines = newText.split('\n'); lines.forEach(line => { // 存储 SSE 数据行 sse_message_all = sse_message_all + line + '\n'; }); } } catch (e) { // console.error("XHR SSE read error:", e); } } // 调用原始的 onreadystatechange originalOnReadyStateChange.apply(this, arguments); }; this._hooked = true; originalSend.apply(this, args); }; // 💡 注意: 这种 XHR 拦截方式比较复杂,因为它需要模拟 EventSource 的分段读取行为。 // 它可能无法完美模拟 EventSource 的所有边缘情况(如自动重连逻辑)。 })(); // 现在,不论是 fetch 还是 XHR 发起的 SSE 请求,它们的消息都会被追加到 sse_message_all 变量中。 ``` ## ~~代码处理乱码字符串(部分字符丢失,无法恢复,废弃)~~ ```go package main /** 场景: 后台返回的UTF-8字符,Content-Type里没有声明 charset=UTF-8,被CDP按照Latin-1字符处理. "系统错误" \u7cfb\u7edf\u9519\u8bef UTF-8 字节流 错误的编码成了 Latin-1 字符 然后又被 json.Marshal 编码成了 Unicode 转义序列 最终变成了: \u00e7\u00b3\u00bb\u00e7\u00bb\u0178\u00e9\u201d\u2122\u00e8\u00af\u00af **/ import ( "encoding/json" "fmt" "strconv" "strings" "testing" ) /** Latin-1 vs Windows-1252 ISO-8859-1 (Latin-1): 定义了 0x00–0xFF 的字符 但 0x80–0x9F 是控制字符(不可见) 所以这些字节 没有对应的可见字符 Windows-1252: 是 Latin-1 的超集 0x80–0x9F 被重新定义成了可见字符(如 €、‚、ƒ、„、…、†、‡、ˆ、‰、Š、‹、Œ、Ž、‘、’、“、”、•、–、—、˜、™、š、›、œ、ž、Ÿ) **/ // unicodeToLatin1 映射 Unicode 字符 -> Latin-1/Windows-1252 (0x80–0x9F) 字节 var unicodeToLatin1 = map[rune]byte{ 0x20ac: 0x80, // € 0x201a: 0x82, // ‚ 0x0192: 0x83, // ƒ 0x201e: 0x84, // „ 0x2026: 0x85, // … 0x2020: 0x86, // † 0x2021: 0x87, // ‡ 0x02c6: 0x88, // ˆ 0x2030: 0x89, // ‰ 0x0160: 0x8a, // Š 0x2039: 0x8b, // ‹ 0x0152: 0x8c, // Œ 0x017d: 0x8e, // Ž 0x2018: 0x91, // ‘ 0x2019: 0x92, // ’ 0x201c: 0x93, // “ 0x201d: 0x94, // ” 0x2022: 0x95, // • 0x2013: 0x96, // – 0x2014: 0x97, // — 0x02dc: 0x98, // ˜ 0x2122: 0x99, // ™ 0x0161: 0x9a, // š 0x203a: 0x9b, // › 0x0153: 0x9c, // œ 0x017e: 0x9e, // ž 0x0178: 0x9f, // Ÿ } // decodeFakeUnicodeEscape 解码错误的 Unicode 转义序列为原始字符串 func decodeFakeUnicodeEscape(input string) string { var bytes []byte // 按 \uXXXX 拆分 parts := strings.Split(input, "\\u") for _, part := range parts { if len(part) < 4 { continue } hex := part[:4] code, err := strconv.ParseInt(hex, 16, 32) if err != nil { continue } r := rune(code) // 如果是特殊字符, 反向映射为字节 if b, ok := unicodeToLatin1[r]; ok { bytes = append(bytes, b) } else { // 否则取低字节 bytes = append(bytes, byte(r&0xFF)) } } // 解码 UTF-8 字节流为字符串 str := string(bytes) // 重新编码为标准 Unicode 转义序列 var result strings.Builder for _, r := range str { result.WriteString(fmt.Sprintf("\\u%04x", r)) } return result.String() } func Test_Latin1ToUnicode(t *testing.T) { // 系统错误 的乱码 系统错误 input := `"\u00e7\u00b3\u00bb\u00e7\u00bb\u0178\u00e9\u201d\u2122\u00e8\u00af\u00af"` var result string json.Unmarshal([]byte(input), &result) fmt.Println("乱码汉字:", result) //result = `系统错误` // 重新编码为 \uXXXX 形式 ascii := strconv.QuoteToASCII(result) fmt.Println("乱码汉字重新编码Unicode:", ascii) input = ascii[1 : len(ascii)-1] fmt.Println("乱码Unicode去掉两面的引号:", input) output := decodeFakeUnicodeEscape(input) fmt.Println("输入乱码Unicode:", input) fmt.Println("输出正确Unicode:", output) jsonStr := `"` + output + `"` // 包成 JSON 字符串 var hanzi string json.Unmarshal([]byte(jsonStr), &hanzi) fmt.Println("输出正常汉字:", hanzi) } ```