Stream Response 流式返回

SSE 是一种轻量级的实时通信技术,只支持服务端向客户端单向推送。同时由于浏览器、HTTP 协议版本等因素会有诸多限制,并不是 WebSocket 的替代品。

前言

在 OpenAI 提供的 API 中对于应用的响应有一种称为 Stream Response 的格式,即流式返回。Chat Completions API 和 Assistants API 这两种形式的接口都支持流式返回。

Stream Response 流式返回

通过这里的解释说明,我们知道流式返回时,返回一个整体的一部分,为了实现这个功能,遵循了 Server-sent events 标准。为什么要返回部分?什么是 Sever-sent events ?下面就来了解一下。

为什么需要流式返回

流式返回,就是可以优先返回一个响应中的一部分。使用 ChatGPT 或者是类似产品的过程中,聊天界面的回答并不是一次性返回所有的内容,而是几个字连续不断的蹦出来,有点像打字机的效果。

这其实是由于 ChatGPT 的特性导致的,ChatGPT 本质上是基于用户的输入猜测下一个词,每一次的输出也是输入,他会连带着这个输入再次去获取下一个最合适的词(这里的词泛指一个 token)。这样一来一往,就会有两个非常明显的结论。

  1. 模型返回完整响应会比较耗时,返回结果越长,需要的时间就越多。
  2. 模型返回的响应,已经输出的内容是不会变的。

基于这两个特点,流式返回就显得很必要了。

  • 首先,接口使用流式返回的形式,可以让用户在使用的过程中优先看到一部分内容,不会陷入等待的焦虑情绪中。
  • 其次,通过流式返回部分结果,用户可以提前得知返回的结果是否是自己想要的,可以及时中断或者修改 prompt 之再次重新请求。
  • 最后,通过流式返回可以减轻服务器的压力。根据第二点的情况,用户由于对输出结果的不满意提前终止本次会话,既节省了用户时间,又避免了无效的计算。再有,通过流式返回可以极大的减轻带宽压力,减少了大量流量尖刺的出现几率。

未来随着 LLM 的逐渐发展,算力成本下降之后,响应速度理论上会越来越快,长文本能够快速输出的话,流式返回可能就没有太大的必要了

流式返回 SSE

SSE 是什么

回到使用 ChatGPT 聊天的真实场景,对于用户的一次请求,模型每次只能返回最终响应的一部分,直到模型内部的神经网络认为自己完成了这个问题的回答为止。对于模型之上的应用层服务器来说,要么等模型输出所有结果之后统一返回,要么不断把模型的输出结果实时返回给发起请求的客户端。鉴于响应速度和用户体验的考量,实时返回结果无疑是最好的选择。这里就涉及到 服务端如何把消息实时推送给客户端 这样一个问题。对于这个问题,典型的解决方式是维护长链接,实时推送消息,使用 MQTT,WebSocket 这类标准的组件。但是用这些组件解决这个问题有显得过于重和复杂。而使用 SSE 这种更加轻量级的协议可以更简单一点。

SSE(Server-Send Events)

SSE 是一种在基于浏览器的 Web 应用程序中仅从服务器向客户端发送文本消息的技术。SSE基于 HTTP 协议中的持久连接, 具有由 W3C 标准化的网络协议和 EventSource 客户端接口,作为 HTML5 标准套件的一部分。

SSE 和 WebSocket 对比

Server-Sent Events APIWebSockets API
基于 HTTP 协议基于 TCP 协议
单工,只能服务端单向发送消息全双工,可以同时发送和接收消息
轻量级,使用简单相对复杂
内置断线重连和消息追踪的功能不在协议范围内,需手动实现
文本或使用 Base64 编码和 gzip 压缩的二进制消息类型广泛
支持自定义事件类型不支持自定义事件类型
连接数 HTTP/1.1 6 个,HTTP/2 可协商(默认 100)连接数无限制

服务端协议

发送事件的服务器端脚本需要使用 text/event-stream MIME 类型响应内容。每个通知以文本块形式发送,并以一对换行符结尾。

这里引用知乎回答中对协议的介绍

Stream Response 流式返回

基于上面这个简介明了的介绍,我们就可以模拟实现一个支持 SSE 的服务端。

SSE 服务端

  • 构造 SSE 响应
type SSEResponse struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Event   Event  `json:"event"`
}

type Event struct {
	Id   string `json:"id"`
	Data string `json:"data"`
}

func genSSEEvent(event Event) string {
	sseResponse := SSEResponse{200, "success", event}
	result, err := json.Marshal(sseResponse)
	if err != nil {
		log.Fatal(err)
		return ""
	}
	return fmt.Sprintf("data: %snn", result)
}

这里定义 SSE 响应的数据结构和构建 SSE 的方式,由于客户端解析 SSE 格式的数据需要基于换行符确定,因此一定要要在每一条消息的末尾添加换行符。

  • 返回响应
func streamHandler(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
	}
	// 设置响应头,表明响应是流式的
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.WriteHeader(http.StatusOK)

	//
	data := "服务端实时推送数据,除了用 WebSocket 外,还可以用 HTTP 的 Server Send Event。" +
		"只要 http 返回 Content-Type 为 text/event-stream 的 header,就可以通过 stream 的方式多次返回消息了。"
	eventID := strings.ReplaceAll(uuid.New().String(), "-", "")
	runes := []rune(data)
	size := 0
	isEnd := false
	for {
		select {
		case "req done..")
			return
		case 30 * time.Millisecond):
			if size len(runes) {
				value := string(runes[size])
				event := Event{eventID, value}
				result := genSSEEvent(event)
				_, err := w.Write([]byte(result))
				if err != nil {
					fmt.Println(err.Error())
				}
				size++

			} else {
				fmt.Fprintf(w, io.EOF.Error())
				isEnd = true
			}
			flusher.Flush()
			if isEnd {
				log.Println("finish")
				return
			}
		}

	}
}

func StartStreamServer() {
	http.HandleFunc("/stream_chat", streamHandler)
	port := Port
	// 启动HTTP服务器
	err := http.ListenAndServe(port, nil)
	if err != nil {
		fmt.Println("Error:", err)
	}
}

这里通过 select 超时机制延时返回一段固定的内容,模拟 LLM 单字返回数据的效果。

SSE 浏览器端

由于 SSE 是 HTML5 标准套件的一部分,因此对于服务端 SSE 类型的消息,浏览器有天然的支持,可以直接使用特定的组件非常方便处理结果。

首先定义一个模板页面

body>
form method="post" id="simple_form">
    input type="text" id="input" name="input"/>
    input type="submit" value="send"/>
form>
br/>
div id="result" style="width: 600px">div>
script>

    const send = function () {
        let name = document.getElementById("input").value
        let resultArea = document.getElementById("result")
        resultArea.innerText = ""
        let url = "stream_chat?name=" + name
        let eventSource = new EventSource(url)

        eventSource.onopen = function (event) {
            console.log("open ", event)
        }
        eventSource.onmessage = function (event) {
            // console.log(event.data)
            const data = JSON.parse(event.data);
            resultArea.innerText += data.event.data

        }
        eventSource.onerror = function (e) {
            console.log(e)
            eventSource.close()
        }
    };
    document.getElementById("simple_form").addEventListener("submit", function (event) {
        event.preventDefault()
        send()
    })
script>
body>
html>

在这里我们通过 event.preventDefault() 拦截表单提交的默认行为,而是用自定义的 send 方法,通过 EvevtSoource 组件实现对 SSE 的请求和响应的处理。EventSource 可以监听 open ,message ,error 这几个事件,也可以通过 addEventListener 添加自定义事件。

注册一个路由渲染这个页面

	http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		t, err := template.ParseFiles("web/stream/index.html")
		if err != nil {
			log.Fatal(err)
		}
		log.Println(t.Execute(writer, nil))
	})

可以看一下具体的效果

Stream Response 流式返回

同时也可以看到返回的内容

Stream Response 流式返回

可以看到通过 EventSource 已经可以正常接收服务端返回的 SSE 格式的消息了。

Fetch

虽然使用 EventSource 非常方便,但是 ChatGPT 并没有使用,这一点通过浏览器查看请求时的网络调用就可以看到,而是使用了 Fetch。因为浏览器端的 EventSource 有一些限制。包括阿里的通义千问也是类似的实现,每一次请求都是通过 Post 发起。

Stream Response 流式返回

SSE 的缺点

  • 只支持 GET 请求,那么请求长度是有限制的,这对于使用场景就有很打的约束了,用户输入的内容会受限制。
  • 无法自定义 Header
  • 兼容性,当不通过 HTTP/2 使用时,SSE(server-sent events)会受到最大连接数的限制,这在打开多个选项卡时特别麻烦,因为该限制是针对每个浏览器的,并且被设置为一个非常低的数字(6)。该问题在 Chrome 和 Firefox 中被标记为“不会解决”。此限制是针对每个浏览器 + 域的,因此这意味着你可以跨所有选项卡打开 6 个 SSE 连接到 www.example1.com,并打开 6 个 SSE 连接到 www.example2.com。(来自 Stackoverflow)。使用 HTTP/2 时,同一时间内 HTTP 最大连接数由服务器和客户端之间协商(默认为 100)。

因此,我们可以通过浏览器自带的 fetch 进行请求,只不过会稍微有点麻烦。

用 fetch 处理 SSE 消息

可以看一下输出

Stream Response 流式返回

通过日志可以看到,直接通过 fetch 读取响应时,是按照服务端生成 SSE 响应时 fmt.Sprintf("data: %snn", result) 的格式返回的。这里其实就不是一个标准的 json 格式,需要进行一些处理了。当然,也可以在服务端对输出格式进行简化,但是这样又无法被 EventSource 处理了。因此,使用 SSE 要结合实际的场景。

小结

SSE 是一种轻量级的实时通信技术,只支持服务端向客户端单向推送。同时由于浏览器、HTTP 协议版本等因素会有诸多限制,并不是 WebSocket 的替代品。对外暴露接口、API 可以通过 SSE 封装,但是实际使用时,当遇到限制的时候,可以进行恰当的改造,满足不同的场景。当然,对于一些对实时性要求更高的场景,那么就要考虑使用 WebSocket 之类其他的组件了。

参考文档

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

给TA打赏
共{{data.count}}人
人已打赏
人工智能

【AI赋能前端研发】落地全流程分析

2024-6-2 1:38:29

人工智能

Android 处理流式响应

2024-6-2 5:38:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索