• 设为首页
  • 收藏本站
  • 积分充值
  • VIP赞助
  • 手机版
  • 微博
  • 微信
    微信公众号 添加方式:
    1:搜索微信号(888888
    2:扫描左侧二维码
  • 快捷导航
    福建二哥 门户 查看主题

    使用Golang实现流式输出

    发布者: 怀卉1097 | 发布时间: 2025-8-14 09:30| 查看数: 114| 评论数: 0|帖子模式

    流式输出的深度剖析
    之前一直在调用openai的key,只是照着文档进行流式调用,也只知其确是流式与api有所不同,而未成体系深究其实现原理。
    就以openai的官方流式输出为切入。
    概述
    流式输出(Streaming Output)是 HTTP 响应中的一种模式,服务器可以在生成部分内容时立即将这些内容发送给客户端,而无需等待整个响应内容生成完成。这种方式常用于实时交互、高延迟操作或长时间任务中,比如 OpenAI 的 GPT 模型生成流式对话。
    1. package main

    2. import (
    3.         "bufio"
    4.         "bytes"
    5.         "encoding/json"
    6.         "fmt"
    7.         "net/http"
    8.         "strings"
    9.         "time"
    10. )

    11. // 定义必要的数据结构
    12. type Message struct {
    13.         Role    string `json:"role"`
    14.         Content string `json:"content"`
    15. }

    16. type RequestBody struct {
    17.         Model       string    `json:"model"`
    18.         Messages    []Message `json:"messages"`
    19.         Temperature float64   `json:"temperature"`
    20.         Stream      bool      `json:"stream"`
    21. }

    22. type Choice struct {
    23.         Delta struct {
    24.                 Content string `json:"content"`
    25.         } `json:"delta"`
    26. }

    27. type ResponseBody struct {
    28.         Choices []Choice `json:"choices"`
    29. }

    30. const (
    31.         apiURL      = "https://api.example.com/v1/chat/completions" // 替换为实际的 API 地址
    32.         authToken   = "your-auth-token"                             // 替换为实际的 Token
    33.         model       = "gpt-3.5-turbo"
    34.         temperature = 0.7
    35. )

    36. func StreamHandler(w http.ResponseWriter, r *http.Request) {
    37.         // 从查询参数获取输入内容
    38.         content := r.URL.Query().Get("content")
    39.         if content == "" {
    40.                 http.Error(w, "Missing 'content' parameter", http.StatusBadRequest)
    41.                 return
    42.         }

    43.         // 构造请求体
    44.         message := Message{
    45.                 Role:    "user",
    46.                 Content: content,
    47.         }
    48.         requestBody := RequestBody{
    49.                 Model:       model,
    50.                 Messages:    []Message{message},
    51.                 Temperature: temperature,
    52.                 Stream:      true,
    53.         }
    54.         jsonData, err := json.Marshal(requestBody)
    55.         if err != nil {
    56.                 http.Error(w, "Failed to marshal request body", http.StatusInternalServerError)
    57.                 return
    58.         }

    59.         // 创建 HTTP 请求
    60.         req, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(jsonData))
    61.         if err != nil {
    62.                 http.Error(w, "Failed to create request", http.StatusInternalServerError)
    63.                 return
    64.         }
    65.         req.Header.Set("Content-Type", "application/json")
    66.         req.Header.Set("Authorization", "Bearer "+authToken)

    67.         // 设置 HTTP 客户端
    68.         client := &http.Client{Timeout: time.Second * 50}
    69.         resp, err := client.Do(req)
    70.         if err != nil {
    71.                 http.Error(w, "Failed to get response", http.StatusInternalServerError)
    72.                 return
    73.         }
    74.         defer resp.Body.Close()

    75.         // 设置响应头,开启流式输出
    76.         w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
    77.         w.Header().Set("Cache-Control", "no-cache")
    78.         w.Header().Set("Connection", "keep-alive")

    79.         // 确保 ResponseWriter 支持 Flusher
    80.         flusher, ok := w.(http.Flusher)
    81.         if !ok {
    82.                 http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
    83.                 return
    84.         }

    85.         // 处理流式响应
    86.         scanner := bufio.NewScanner(resp.Body)
    87.         for scanner.Scan() {
    88.                 line := scanner.Text()

    89.                 // 处理以 "data: " 开头的行
    90.                 if strings.HasPrefix(line, "data: ") {
    91.                         line = strings.TrimPrefix(line, "data: ")
    92.                 }
    93.                 if line == "[DONE]" {
    94.                         break
    95.                 }
    96.                 if line == "" {
    97.                         continue
    98.                 }

    99.                 // 解析响应内容
    100.                 var chunk ResponseBody
    101.                 if err := json.Unmarshal([]byte(line), &chunk); err != nil {
    102.                         continue
    103.                 }

    104.                 // 将响应数据逐步发送给客户端
    105.                 for _, choice := range chunk.Choices {
    106.                         content := choice.Delta.Content
    107.                         _, err := w.Write([]byte(content))
    108.                         if err != nil {
    109.                                 http.Error(w, "Failed to write response", http.StatusInternalServerError)
    110.                                 return
    111.                         }
    112.                         flusher.Flush() // 刷新缓冲区
    113.                 }
    114.         }

    115.         if err := scanner.Err(); err != nil {
    116.                 http.Error(w, "Scanner error", http.StatusInternalServerError)
    117.                 return
    118.         }
    119. }

    120. func main() {
    121.         http.HandleFunc("/stream", StreamHandler)
    122.         fmt.Println("Server started at :8080")
    123.         http.ListenAndServe(":8080", nil)
    124. }
    复制代码
    核心流程
    接收到用户输入后,将其作为 content 参数发送给目标 API。
    开启流式输出模式,设置 Stream: true。
    使用 http.Flusher 将从远程接口接收到的内容逐步发送给客户端。
    关键点
    1.流式响应头设置
    1. w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
    2. w.Header().Set("Cache-Control", "no-cache")
    3. w.Header().Set("Connection", "keep-alive")
    复制代码
    实时输出: 通过 w.Write 输出内容后调用 flusher.Flush() 确保数据实时发送。
    启动服务后,通过浏览器访问类似以下 URL:
    1. http://localhost:8080/stream?content=Hello%20world
    复制代码
    客户端会逐步接收内容,类似命令行实时打印。
    1. HTTP 协议中的流式响应
    流式输出利用 HTTP 协议的特性,不关闭连接,逐步将数据发送给客户端。典型流式响应会设置如下 HTTP Header:
    Content-Type: text/event-stream表示这是一个事件流(Event Stream),用于向客户端连续发送数据片段。
    Cache-Control: no-cache防止响应被缓存,以确保客户端接收到实时内容。
    Connection: keep-alive 保持连接处于活跃状态,支持多次数据传输。
    2. 流式输出的工作原理
    客户端发起请求,服务器在接收到请求后开始响应。
    服务器不一次性生成完整的响应内容,而是将生成的部分数据逐段发送。
    客户端收到数据后立即处理,而无需等待完整响应结束。
    在数据发送完成后,服务器可以选择关闭连接或保持连接以发送后续数据。
    流式输出的常见应用场景
    实时聊天:聊天模型逐词/逐句生成时,可以实时传输数据。
    日志监控:将服务器的实时日志逐行推送到前端。
    流式文件传输:如大文件或视频流传输。
    实时进度更新:如任务进度条更新。
    到此这篇关于使用Golang实现流式输出的文章就介绍到这了,更多相关Golang流式输出内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    来源:互联网
    免责声明:如果侵犯了您的权益,请联系站长(1277306191@qq.com),我们会及时删除侵权内容,谢谢合作!

    最新评论

    QQ Archiver 手机版 小黑屋 福建二哥 ( 闽ICP备2022004717号|闽公网安备35052402000345号 )

    Powered by Discuz! X3.5 © 2001-2023

    快速回复 返回顶部 返回列表