最近在使用 Trae IDE 进行开发,有一说一,界面的确清爽好用。但因为我个人的工作流是 Windows Trae 客户端 + SSH 远程连接 Linux 服务器,并且配置第三方的 OpenAI 兼容接口,当使用工具内自带的openai接口功能接入gpt-5.4后,发现 AI 开始输出代码或文本时,前端界面的字符会排版严重错乱,导致修改的代码文件出错。
通过抓包脚本进行调试,发现跟返回的流式传输有关,于是写了一个 Go 语言“智能防抖代理”来解决,终于客户端回复正常了。
脚本功能如下:
- 物理节流:接受上游返回后,强制将输出速度限制在
128字节 / 10ms。增加数据返回客户端的延迟,契合 Trae 的前端渲染节奏,根治排版混乱。 - UTF-8 边界识别:在切分数据包时,自动通过二进制位运算检测切口。如果发现切在半个汉字上,自动回退字节,保证发给前端的每一个字符都是完整的,防止 4054 解析报错。
- 接口强转与劫持补全:Trae自带的GPT-5.4选项实际请求模型是gpt-5.4-2026-03-05,为了方便使用,对
/v1/models接口请求时强制注入gpt-5.4-2026-03-05模型,并增加对话模型替换功能,将前端请求的模型替换成后端支持的模型,例如:gpt-5.4,哪怕你的上游接口没有这个模型也能秒过校验并中转为上游支持的模型。
脚本使用方法
脚本需要运行在被 SSH 连接的 Linux 远程服务器上。
1. 保存代码并编译在 Linux 服务器上创建一个文件 main.go,将文末的代码粘贴进去。然后执行编译:
# 确保你的服务器安装了 Go 环境
go build -o trae_proxy main.go
2. 运行代理服务
必须使用 sudo 运行,因为脚本会自动修改 /etc/hosts 劫持域名并生成安装本地根证书。
sudo ./trae_proxy -target "http://你的第三方API地址" -model "你要强制使用的模型" -debug
参数说明:
-target:你的第三方 API 基础地址(不需要加/v1/chat/completions)。-model:(可选)无视你在 Trae 里选的啥,强行把你所有的请求替换为该模型,比如gpt-5.4。-debug:(可选)开启终端对话日志输出,可以在服务器端直观看到 AI 吐字的过程。
3. 最重要的一步:重启客户端!
脚本启动成功后,请务必在 Windows 上关闭 Trae 客户端并重新打开(或者 Reload Window)。只有这样,Trae Server 才会读取到我们新配置的根证书和 Hosts 劫持记录,添加大模型直接填写第三方中转站API秘钥即可,其它保持默认。
4. 安全退出
想要停止代理时,直接在终端按下 Ctrl + C 即可。脚本内置了优雅退出机制,会自动帮你把 /etc/hosts 恢复原样,并卸载临时证书。
附:完整 Go 语言源码 (main.go)
package main
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"encoding/pem"
"flag"
"fmt"
"io"
"log"
"math/big"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/exec"
"os/signal"
"strings"
"sync"
"syscall"
"time"
)
const (
hostsFile = "/etc/hosts"
hijackEntry = "127.0.0.1 api.openai.com #AddedByTraeProxy"
)
var (
systemCAPath string
rootCA tls.Certificate
isDebug bool
forceModel string
)
// --- 流式数据拦截与流控 (智能防错乱版) ---
type streamInterceptor struct {
originalBody io.ReadCloser
dataChan chan []byte
errChan chan error
logChan chan[]byte
closeOnce sync.Once
currentBuf[]byte
}
func newStreamInterceptor(body io.ReadCloser, logChan chan[]byte) *streamInterceptor {
s := &streamInterceptor{
originalBody: body,
dataChan: make(chan[]byte, 10000),
errChan: make(chan error, 1),
logChan: logChan,
}
go s.readFromUpstream()
return s
}
func (s *streamInterceptor) readFromUpstream() {
defer close(s.dataChan)
for {
buf := make([]byte, 4096)
n, err := s.originalBody.Read(buf)
if n > 0 {
chunk := make([]byte, n)
copy(chunk, buf[:n])
s.dataChan <- chunk
}
if err != nil {
s.errChan <- err
return
}
}
}
// 代理发往 IDE 前端的数据读取逻辑
func (s *streamInterceptor) Read(p[]byte) (int, error) {
// 恢复雷打不动的物理节流:最大限制 128 字节
limit := 128
if len(p) < limit {
limit = len(p)
}
if len(s.currentBuf) == 0 {
select {
case chunk, ok := <-s.dataChan:
if !ok {
select {
case err := <-s.errChan:
return 0, err
default:
return 0, io.EOF
}
}
s.currentBuf = chunk
}
}
n := len(s.currentBuf)
if n > limit {
n = limit
}
// 【核心修复:智能 UTF-8 边界识别,彻底告别半个字符乱码导致的 4054】
// UTF-8 的多字节字符后续字节特征是二进制 10xxxxxx (即 16进制的 0x80~0xBF)
// 如果我们要切断的位置(index n)正好位于一个汉字的中间,
// 我们向前回退(n--),直到找到完整的字符边界!
if n < len(s.currentBuf) {
for n > 0 && (s.currentBuf[n]&0xC0) == 0x80 {
n--
}
}
if n == 0 { // 极小概率的安全兜底
n = 1
}
// 切取并输出安全的整字符数据
copy(p, s.currentBuf[:n])
s.currentBuf = s.currentBuf[n:]
if isDebug && s.logChan != nil {
b := make([]byte, n)
copy(b, p[:n])
select {
case s.logChan <- b:
default:
}
}
// 恢复强制休眠防错乱,保障前端渲染完美顺滑
time.Sleep(10 * time.Millisecond)
return n, nil
}
func (s *streamInterceptor) Close() error {
s.closeOnce.Do(func() {
if s.logChan != nil {
close(s.logChan)
}
})
return s.originalBody.Close()
}
// 终端日志打印
func startStreamLogger(ch <-chan[]byte) {
var buffer string
fmt.Print("\n\033[36m🤖[AI回复]: \033[0m\033[32m")
for data := range ch {
buffer += string(data)
for {
idx := strings.Index(buffer, "\n")
if idx == -1 {
break
}
line := strings.TrimSpace(buffer[:idx])
buffer = buffer[idx+1:]
if strings.HasPrefix(line, "data: ") {
payload := strings.TrimPrefix(line, "data: ")
if payload == "[DONE]" {
fmt.Print("\033[0m\n\033[90m[✔️ 传输结束]\033[0m\n")
continue
}
var v struct {
Choices []struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
} `json:"choices"`
}
if err := json.Unmarshal([]byte(payload), &v); err == nil && len(v.Choices) > 0 {
fmt.Print(v.Choices[0].Delta.Content)
}
}
}
}
fmt.Print("\033[0m")
}
// --- 程序主入口 ---
func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "\n🚀 Trae 代理助手\n\n")
fmt.Fprintf(os.Stderr, "用法:\n")
fmt.Fprintf(os.Stderr, " %s [参数]\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, "参数列表:\n")
flag.PrintDefaults()
fmt.Fprintf(os.Stderr, "\n示例:\n")
fmt.Fprintf(os.Stderr, " %s -target http://10.168.188.19:8317 -model gpt-5.4 -debug\n\n", os.Args[0])
}
targetFlag := flag.String("target", "https://api.openai.com", "目标 API 地址")
debugFlag := flag.Bool("debug", false, "开启调试模式(打印请求详情和AI回复)")
modelFlag := flag.String("model", "", "强制指定实际请求的模型名称(覆盖前端请求)")
helpFlag := flag.Bool("h", false, "显示帮助信息")
flag.Parse()
if *helpFlag {
flag.Usage()
os.Exit(0)
}
targetURL := *targetFlag
isDebug = *debugFlag
forceModel = *modelFlag
remote, err := url.Parse(targetURL)
if err != nil {
log.Fatalf("❌ 解析目标地址失败: %v", err)
}
fmt.Println("=====================================================")
fmt.Println("🚀 正在启动 API 代理...")
fmt.Printf("🎯 目标地址 : %s\n", targetURL)
if forceModel != "" {
fmt.Printf("🎭 强制模型 : 已开启 (所有请求将强制转为 \033[33m%s\033[0m)\n", forceModel)
}
if isDebug {
fmt.Println("🐛 运行模式 : 调试模式 (将打印详细日志)")
} else {
fmt.Println("⚡ 运行模式 : 标准模式 (精简输出)")
}
fmt.Println("🛑 按 Ctrl+C 可以安全停止并自动清理环境。")
fmt.Println("\033[31m⚠️ 重要提示 : 代理已启动,请务必【重启或重载 Trae 客户端】使配置生效!\033[0m")
fmt.Println("=====================================================")
generateAndInstallCA()
setupHosts()
go handleGracefulShutdown()
startProxyServer(remote)
}
// --- 核心反向代理 ---
func startProxyServer(remote *url.URL) {
cert := generateFakeServerCert("api.openai.com")
dialer := &net.Dialer{
Timeout: 15 * time.Second,
KeepAlive: 30 * time.Second,
}
customTransport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
if strings.HasPrefix(addr, "api.openai.com") {
r := &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
return net.Dial("udp", "8.8.8.8:53")
},
}
ips, err := r.LookupIPAddr(ctx, "api.openai.com")
if err == nil && len(ips) > 0 {
addr = fmt.Sprintf("%s:%s", ips[0].String(), strings.Split(addr, ":")[1])
}
}
return dialer.DialContext(ctx, network, addr)
},
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
proxy := httputil.NewSingleHostReverseProxy(remote)
proxy.Transport = customTransport
// 【强制同步刷新】:设为与休眠时间一致的 10ms,
// 防止反向代理库(ReverseProxy)自带的 32KB 缓冲将数据憋住导致超时
proxy.FlushInterval = 10 * time.Millisecond
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
if err == context.Canceled || err == io.EOF {
return
}
if isDebug {
log.Printf("❌ 代理错误: %v", err)
}
w.WriteHeader(http.StatusBadGateway)
}
originalDirector := proxy.Director
proxy.Director = func(req *http.Request) {
originalDirector(req)
req.Host = remote.Host
req.Header.Del("Accept-Encoding")
if isDebug {
log.Printf("📥 收到请求 -> %s", req.URL.Path)
}
if forceModel != "" && req.Body != nil && req.Method != http.MethodGet {
bodyBytes, err := io.ReadAll(req.Body)
req.Body.Close()
if err == nil && len(bodyBytes) > 0 {
var payload map[string]interface{}
if err := json.Unmarshal(bodyBytes, &payload); err == nil {
if oldModelInter, hasModel := payload["model"]; hasModel {
oldModel, _ := oldModelInter.(string)
payload["model"] = forceModel
newBodyBytes, _ := json.Marshal(payload)
req.Body = io.NopCloser(bytes.NewReader(newBodyBytes))
req.ContentLength = int64(len(newBodyBytes))
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(newBodyBytes)))
if isDebug {
log.Printf("🔄 [模型篡改] 原始模型: %s -> 强制替换为: %s", oldModel, forceModel)
}
} else {
req.Body = io.NopCloser(bytes.NewReader(bodyBytes))
}
} else {
req.Body = io.NopCloser(bytes.NewReader(bodyBytes))
}
}
}
}
proxy.ModifyResponse = func(resp *http.Response) error {
if resp.Request != nil && resp.Request.URL.Path == "/v1/models" {
bodyBytes, err := io.ReadAll(resp.Body)
resp.Body.Close()
var payload map[string]interface{}
if err == nil && len(bodyBytes) > 0 {
json.Unmarshal(bodyBytes, &payload)
}
if payload == nil {
payload = make(map[string]interface{})
}
if _, ok := payload["object"]; !ok {
payload["object"] = "list"
}
dataInter, ok := payload["data"].([]interface{})
if !ok {
dataInter =[]interface{}{}
}
addModel := func(modelName string) {
for _, m := range dataInter {
if mObj, ok := m.(map[string]interface{}); ok {
if id, _ := mObj["id"].(string); id == modelName {
return
}
}
}
dataInter = append(dataInter, map[string]interface{}{
"id": modelName,
"object": "model",
"created": time.Now().Unix(),
"owned_by": "openai",
})
}
addModel("gpt-5")
if forceModel != "" {
addModel(forceModel)
}
payload["data"] = dataInter
newBodyBytes, _ := json.Marshal(payload)
resp.Body = io.NopCloser(bytes.NewReader(newBodyBytes))
resp.ContentLength = int64(len(newBodyBytes))
resp.Header.Set("Content-Length", fmt.Sprintf("%d", len(newBodyBytes)))
resp.Header.Set("Content-Type", "application/json")
if resp.StatusCode >= 400 {
resp.StatusCode = http.StatusOK
resp.Status = "200 OK"
}
return nil
}
contentType := resp.Header.Get("Content-Type")
if strings.Contains(contentType, "text/event-stream") {
// 流式响应强制干掉 Content-Length 头,防止客户端解析阻塞
resp.Header.Del("Content-Length")
var logChan chan []byte
if isDebug {
logChan = make(chan[]byte, 5000)
go startStreamLogger(logChan)
}
resp.Body = newStreamInterceptor(resp.Body, logChan)
}
return nil
}
server := &http.Server{
Addr: "127.0.0.1:443",
Handler: proxy,
ReadHeaderTimeout: 10 * time.Second,
IdleTimeout: 120 * time.Second,
TLSConfig: &tls.Config{
Certificates:[]tls.Certificate{*cert},
MaxVersion: tls.VersionTLS12,
},
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
}
if isDebug {
log.Println("🛡️ 服务已就绪,正在监听 127.0.0.1:443...")
}
if err := server.ListenAndServeTLS("", ""); err != nil && err != http.ErrServerClosed {
log.Fatalf("❌ 服务绑定失败 (请确保使用 sudo): %v", err)
}
}
// --- 证书管理与系统配置 (保持原样) ---
func generateAndInstallCA() {
priv, _ := rsa.GenerateKey(rand.Reader, 2048)
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, _ := rand.Int(rand.Reader, serialNumberLimit)
caTemplate := &x509.Certificate{
SerialNumber: serialNumber,
Subject: pkix.Name{Organization:[]string{"Proxy Local CA"}, CommonName: "Proxy Local Root CA"},
NotBefore: time.Now(), NotAfter: time.Now().AddDate(10, 0, 0),
IsCA: true, ExtKeyUsage:[]x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, BasicConstraintsValid: true,
}
caBytes, _ := x509.CreateCertificate(rand.Reader, caTemplate, caTemplate, &priv.PublicKey, priv)
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caBytes})
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(priv)})
rootCA, _ = tls.X509KeyPair(certPEM, keyPEM)
if _, err := os.Stat("/usr/local/share/ca-certificates"); err == nil {
systemCAPath = "/usr/local/share/ca-certificates/proxy_local.crt"
os.WriteFile(systemCAPath, certPEM, 0644)
exec.Command("update-ca-certificates").Run()
} else if _, err := os.Stat("/etc/pki/ca-trust/source/anchors"); err == nil {
systemCAPath = "/etc/pki/ca-trust/source/anchors/proxy_local.crt"
os.WriteFile(systemCAPath, certPEM, 0644)
exec.Command("update-ca-trust", "extract").Run()
}
}
func uninstallCA() {
if systemCAPath != "" {
os.Remove(systemCAPath)
if strings.Contains(systemCAPath, "usr/local") {
exec.Command("update-ca-certificates").Run()
} else {
exec.Command("update-ca-trust", "extract").Run()
}
}
}
func generateFakeServerCert(host string) *tls.Certificate {
caLeaf, _ := x509.ParseCertificate(rootCA.Certificate[0])
priv, _ := rsa.GenerateKey(rand.Reader, 2048)
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, _ := rand.Int(rand.Reader, serialNumberLimit)
template := x509.Certificate{
SerialNumber: serialNumber, NotBefore: time.Now().Add(-24 * time.Hour), NotAfter: time.Now().Add(365 * 24 * time.Hour),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, ExtKeyUsage:[]x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
BasicConstraintsValid: true, DNSNames:[]string{host},
}
derBytes, _ := x509.CreateCertificate(rand.Reader, &template, caLeaf, &priv.PublicKey, rootCA.PrivateKey)
return &tls.Certificate{Certificate: [][]byte{derBytes}, PrivateKey: priv}
}
func setupHosts() {
hostsContent, _ := os.ReadFile(hostsFile)
if !strings.Contains(string(hostsContent), "api.openai.com") {
f, err := os.OpenFile(hostsFile, os.O_APPEND|os.O_WRONLY, 0644)
if err == nil {
f.WriteString("\n" + hijackEntry + "\n")
f.Close()
}
}
}
func handleGracefulShutdown() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
<-sigChan
fmt.Println("\n🛑 收到退出信号,正在清理环境 (恢复 hosts 和证书)...")
file, err := os.Open(hostsFile)
if err == nil {
var lines[]string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
if scanner.Text() != hijackEntry {
lines = append(lines, scanner.Text())
}
}
file.Close()
os.WriteFile(hostsFile,[]byte(strings.Join(lines, "\n")+"\n"), 0644)
}
uninstallCA()
os.Exit(0)
}
3 个帖子 - 2 位参与者