流式输出的深度剖析
之前一直在调用openai的key,只是照着文档进行流式调用,也只知其确是流式与api有所不同,而未成体系深究其实现原理。
就以openai的官方流式输出为切入。
概述
流式输出(Streaming Output)是 HTTP 响应中的一种模式,服务器可以在生成部分内容时立即将这些内容发送给客户端,而无需等待整个响应内容生成完成。这种方式常用于实时交互、高延迟操作或长时间任务中,比如 OpenAI 的 GPT 模型生成流式对话。- package main
- import (
- "bufio"
- "bytes"
- "encoding/json"
- "fmt"
- "net/http"
- "strings"
- "time"
- )
- // 定义必要的数据结构
- type Message struct {
- Role string `json:"role"`
- Content string `json:"content"`
- }
- type RequestBody struct {
- Model string `json:"model"`
- Messages []Message `json:"messages"`
- Temperature float64 `json:"temperature"`
- Stream bool `json:"stream"`
- }
- type Choice struct {
- Delta struct {
- Content string `json:"content"`
- } `json:"delta"`
- }
- type ResponseBody struct {
- Choices []Choice `json:"choices"`
- }
- const (
- apiURL = "https://api.example.com/v1/chat/completions" // 替换为实际的 API 地址
- authToken = "your-auth-token" // 替换为实际的 Token
- model = "gpt-3.5-turbo"
- temperature = 0.7
- )
- func StreamHandler(w http.ResponseWriter, r *http.Request) {
- // 从查询参数获取输入内容
- content := r.URL.Query().Get("content")
- if content == "" {
- http.Error(w, "Missing 'content' parameter", http.StatusBadRequest)
- return
- }
- // 构造请求体
- message := Message{
- Role: "user",
- Content: content,
- }
- requestBody := RequestBody{
- Model: model,
- Messages: []Message{message},
- Temperature: temperature,
- Stream: true,
- }
- jsonData, err := json.Marshal(requestBody)
- if err != nil {
- http.Error(w, "Failed to marshal request body", http.StatusInternalServerError)
- return
- }
- // 创建 HTTP 请求
- req, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(jsonData))
- if err != nil {
- http.Error(w, "Failed to create request", http.StatusInternalServerError)
- return
- }
- req.Header.Set("Content-Type", "application/json")
- req.Header.Set("Authorization", "Bearer "+authToken)
- // 设置 HTTP 客户端
- client := &http.Client{Timeout: time.Second * 50}
- resp, err := client.Do(req)
- if err != nil {
- http.Error(w, "Failed to get response", http.StatusInternalServerError)
- return
- }
- defer resp.Body.Close()
- // 设置响应头,开启流式输出
- w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
- w.Header().Set("Cache-Control", "no-cache")
- w.Header().Set("Connection", "keep-alive")
- // 确保 ResponseWriter 支持 Flusher
- flusher, ok := w.(http.Flusher)
- if !ok {
- http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
- return
- }
- // 处理流式响应
- scanner := bufio.NewScanner(resp.Body)
- for scanner.Scan() {
- line := scanner.Text()
- // 处理以 "data: " 开头的行
- if strings.HasPrefix(line, "data: ") {
- line = strings.TrimPrefix(line, "data: ")
- }
- if line == "[DONE]" {
- break
- }
- if line == "" {
- continue
- }
- // 解析响应内容
- var chunk ResponseBody
- if err := json.Unmarshal([]byte(line), &chunk); err != nil {
- continue
- }
- // 将响应数据逐步发送给客户端
- for _, choice := range chunk.Choices {
- content := choice.Delta.Content
- _, err := w.Write([]byte(content))
- if err != nil {
- http.Error(w, "Failed to write response", http.StatusInternalServerError)
- return
- }
- flusher.Flush() // 刷新缓冲区
- }
- }
- if err := scanner.Err(); err != nil {
- http.Error(w, "Scanner error", http.StatusInternalServerError)
- return
- }
- }
- func main() {
- http.HandleFunc("/stream", StreamHandler)
- fmt.Println("Server started at :8080")
- http.ListenAndServe(":8080", nil)
- }
复制代码 核心流程
接收到用户输入后,将其作为 content 参数发送给目标 API。
开启流式输出模式,设置 Stream: true。
使用 http.Flusher 将从远程接口接收到的内容逐步发送给客户端。
关键点
1.流式响应头设置- w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
- w.Header().Set("Cache-Control", "no-cache")
- w.Header().Set("Connection", "keep-alive")
复制代码 实时输出: 通过 w.Write 输出内容后调用 flusher.Flush() 确保数据实时发送。
启动服务后,通过浏览器访问类似以下 URL:- http://localhost:8080/stream?content=Hello%20world
复制代码 客户端会逐步接收内容,类似命令行实时打印。
1. HTTP 协议中的流式响应
流式输出利用 HTTP 协议的特性,不关闭连接,逐步将数据发送给客户端。典型流式响应会设置如下 HTTP Header:
Content-Type: text/event-stream表示这是一个事件流(Event Stream),用于向客户端连续发送数据片段。
Cache-Control: no-cache防止响应被缓存,以确保客户端接收到实时内容。
Connection: keep-alive 保持连接处于活跃状态,支持多次数据传输。
2. 流式输出的工作原理
客户端发起请求,服务器在接收到请求后开始响应。
服务器不一次性生成完整的响应内容,而是将生成的部分数据逐段发送。
客户端收到数据后立即处理,而无需等待完整响应结束。
在数据发送完成后,服务器可以选择关闭连接或保持连接以发送后续数据。
流式输出的常见应用场景
实时聊天:聊天模型逐词/逐句生成时,可以实时传输数据。
日志监控:将服务器的实时日志逐行推送到前端。
流式文件传输:如大文件或视频流传输。
实时进度更新:如任务进度条更新。
到此这篇关于使用Golang实现流式输出的文章就介绍到这了,更多相关Golang流式输出内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
来源:互联网
免责声明:如果侵犯了您的权益,请联系站长(1277306191@qq.com),我们会及时删除侵权内容,谢谢合作! |
|