前言
上一篇 Stream Response 流式返回 中学习了流式返回的概念,并学习了在浏览器端处理流式响应的两种方式。下面看看如何在 Android 原生实现中处理流式返回。
响应 Stream Response
以最常用的 okhttp 为例,我们看看如何实现。
okhttp 处理
- 创建 client 和 request
kotlin复制代码object OkHttpUtil {
private var url = "http://localhost:8199/stream_chat"
val request = Request.Builder().url(this.url).build()
val client = OkHttpClient.Builder().addInterceptor {
println("request is ${it.request()}")
val response = it.proceed(it.request())
println("response body header n${response.headers}")
println("response body length ${response.body?.contentLength()}")
response
}.build()
....
}
这里在拦截器中打印请求和响应的部分信息,可以观察一下流式响应是否符合预期。
- 进行异步请求
kotlin复制代码 fun asyncCall() {
val call: Call = client.newCall(request)
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
println("onFailure() called with: call = $call, e = $e")
if (call.isCanceled().not()) {
call.cancel()
}
}
override fun onResponse(call: Call, response: Response) {
println("onResponse() called with: call = $call, response = $response")
response.body?.let {
val reader = BufferedReader(InputStreamReader(it.byteStream()))
var line = reader.readLine()
while (line != null) {
println(line)
line = reader.readLine()
}
}
}
})
}
通过 okhttp 发起异步请求,按照之前的约定 Response 是按行返回的(这也是服务端生成响应时为什么要手动添加 nn
的原因,就是为了发起请求的一方可以根据换行进行处理),因此这里对于 response 的 body 按照行进行读取,直到结束。
shell复制代码request is Request{method=GET, url=http://localhost:8199/stream_chat} response body header Cache-Control: no-cache Connection: keep-alive Content-Type: text/event-stream Date: Sun, 31 Mar 2024 01:24:34 GMT Transfer-Encoding: chunked response body length -1 onResponse() called with: call = okhttp3.internal.connection.RealCall@1f1f0603, response = Response{protocol=http/1.1, code=200, message=OK, url=http://localhost:8199/stream_chat} data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"服"}} data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"务"}} data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"端"}} ... data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"了"}} data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"。"}} EOF
从返回结果可以看到,response 的 header 信息中 Content-Type 就是 text/event-stream
。对于返回结果按行打印可以看到,这里和之前在浏览器上用 fetch 处理的结果是一样的,不是结构化的数据,并不是一个标准的 json ,需要对返回结果进行特殊处理。对于这种情况,我们可以借助 okttp 中对 SSE 的支持来实现,毕竟 SSE 是 Web 中定义的标准组件,对于这种数据格式的处理,除了借助 EventSource 还可以使用其他的方式。
okhttp-sse
这里需要首先需要添加 okttp-sse 的依赖
gradle复制代码implementation 'com.squareup.okhttp3:okhttp-sse:4.12.0'
okhttp-sse 的 API 设计和 EventSource 相似,使用方法也很简单
kotlin复制代码 sseHandler() {
val TAG = "OkHttpUtil"
val sseListener = object : EventSourceListener() {
override fun onOpen(eventSource: EventSource, response: Response) {
super.onOpen(eventSource, response)
log(
TAG, "onOpen() called with: eventSource = $eventSource, response = $response"
)
}
override fun onEvent(
eventSource: EventSource, id: String?, type: String?, data: String
) {
super.onEvent(eventSource, id, type, data)
log(
TAG,
"onEvent() called with: id = $id, type = $type, data = $data"
)
}
override fun onFailure(eventSource: EventSource, t: Throwable?, response: Response?) {
super.onFailure(eventSource, t, response)
log(
TAG,
"onFailure() called with: eventSource = $eventSource, t = $t, response = $response"
)
}
override fun onClosed(eventSource: EventSource) {
super.onClosed(eventSource)
log(TAG, "onClosed() called with: eventSource = $eventSource")
}
}
val eventSource = RealEventSource(request, sseListener)
eventSource.connect(client)
}
EventSourceListener 这个抽象类有四个方法,顾名思义就是用来区分整个请求过程的各个阶段。
可以看到这个用法几乎和浏览器上的 EventSource 一模一样,唯一的区别就是浏览器上不用再主动触发 connect 操作,这里需要主动调用 connect 和相应的 client 进行配合。
java复制代码OKhttpUtil:onOpen() called with: eventSource = okhttp3.internal.sse.RealEventSource@735957be, response = Response{protocol=http/1.1, code=200, message=OK, url=http://localhost:8199/stream_chat}
OKhttpUtil:onEvent() called with: id = null, type = null, data = {"code":200,"message":"success","event":{"id":"a7bb4c79fdd3470bad61c1ade5a15031","data":"服"}}
...
OKhttpUtil:onEvent() called with: id = null, type = null, data = {"code":200,"message":"success","event":{"id":"a7bb4c79fdd3470bad61c1ade5a15031","data":"了"}}
OKhttpUtil:onEvent() called with: id = null, type = null, data = {"code":200,"message":"success","event":{"id":"a7bb4c79fdd3470bad61c1ade5a15031","data":"。"}}
OKhttpUtil:onClosed() called with: eventSource = okhttp3.internal.sse.RealEventSource@735957be
从输出结果可以看到,用 okhttp-sse 非常方便,对于 SSE 协议的支持很完善。data 中返回的内容已经是标准的 Json 格式数据了。
这样我们就可以借助 kotlin 超强的语法糖创建一个通用的方法来处理 SSE 的响应结果了。
支持泛型的 SSE Handler
kotlin复制代码 inline fun <reified T> sseHandler(noinline callback: ((T) -> Unit)? = null) {
val TAG = "OkHttpUtil"
val sseListener = object : EventSourceListener() {
override fun onEvent(
eventSource: EventSource, id: String?, type: String?, data: String
) {
super.onEvent(eventSource, id, type, data)
val result = jsonToObj<T>(data)
result?.let {
callback?.invoke(it)
}
}
...
}
...
}
inline fun <reified T> jsonToObj(json: String): T? {
val moshi = Moshi.Builder().add(KotlinJsonAdapterFactory()).build()
val adapter = moshi.adapter(T::class.java)
return adapter.fromJson(json)
}
这里直接对 onEvent 方法返回的 data 字段进行 Json 解析。 可以简单声明一个返回结果的数据类验证一下。
kotlin复制代码data class SSEResult(val id: String, val data: String)
data class SSResponse(val code: Int, val message: String, val event: SSEResult)
OkHttpUtil.sseHandler<SSResponse> {
println(it.event)
}
shell复制代码SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=服) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=务) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=端) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=实) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=时) .... SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=回) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=消) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=息) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=了) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=。)
可以看到使用最终返的数据格式已经可以符合按照预期的结果了。
okhttp-sse 实现原理
这里可以再简单了解一下 okhttp-sse 是如何处理流式响应的。
- RealEventSource
kotlin复制代码fun processResponse(response: Response) {
response.use {
if (!response.isSuccessful) {
listener.onFailure(this, null, response)
return
}
val body = response.body!!
if (!body.isEventStream()) {
listener.onFailure(this,
IllegalStateException("Invalid content-type: ${body.contentType()}"), response)
return
}
// This is a long-lived response. Cancel full-call timeouts.
call.timeoutEarlyExit()
// Replace the body with an empty one so the callbacks can't see real data.
val response = response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
val reader = ServerSentEventReader(body.source(), this)
try {
listener.onOpen(this, response)
while (reader.processNextEvent()) {
}
} catch (e: Exception) {
listener.onFailure(this, e, response)
return
}
listener.onClosed(this)
}
}
private fun ResponseBody.isEventStream(): Boolean {
val contentType = contentType() ?: return false
return contentType.type == "text" && contentType.subtype == "event-stream"
}
这段代码很好理解,简单而优雅的呈现了处理服务端响应的各种场景。
- 首先,处理请求是否成功相应
- 其次,判断返回结果是否为流式响应。在没看到这断代码之前,脑海中的实现方式只有
contentType == "text/event-stream"
,而这里的实现方式就显得更加的优雅,完美的诠释了面向对象编程,一切内容可以封装为对象。 - 流式响应的时间一般会成长,因此这里取消 OkHttp 固有的超时退出机制,避免请求被打断。
- 对于 ResponseBody 返回了一个空结果,避免下游检测 ResponseBody 为空时出现异常。
- 创建 ServerSentEventReader 对流式返回结果进行读取
- 最后关闭连接。
可以看到最关键的实现逻辑就在 reader.processNextEvent()
里
- ServerSentEventReader
kotlin复制代码class ServerSentEventReader(
private val source: BufferedSource,
private val callback: Callback
) {
private var lastId: String? = null
interface Callback {
fun onEvent(id: String?, type: String?, data: String)
fun onRetryChange(timeMs: Long)
}
/**
* Process the next event. This will result in a single call to [Callback.onEvent] *unless* the
* data section was empty. Any number of calls to [Callback.onRetryChange] may occur while
* processing an event.
*
* @return false when EOF is reached
*/
@Throws(IOException::class)
fun processNextEvent(): Boolean {
var id = lastId
var type: String? = null
val data = Buffer()
while (true) {
when (source.select(options)) {
in 0..2 -> {
completeEvent(id, type, data)
return true
}
in 3..4 -> {
source.readData(data)
}
.....
}
}
}
@Throws(IOException::class)
private fun completeEvent(id: String?, type: String?, data: Buffer) {
if (data.size != 0L) {
lastId = id
data.skip(1L) // Leading newline.
callback.onEvent(id, type, data.readUtf8())
}
}
companion object {
val options = Options.of(
/* 0 */ "rn".encodeUtf8(),
/* 1 */ "r".encodeUtf8(),
/* 2 */ "n".encodeUtf8(),
/* 3 */ "data: ".encodeUtf8(),
/* 4 */ "data:".encodeUtf8(),
)
...
}
}
这里我们截取部分代码实现,具体其实没有什么特别之处,就是基于返回的 String 字符串内容进行判断处理,遇到行末的换行符就返回结果,否则就一次读取中间的 data,id,event 字段。同时还会兼顾处理重试的逻辑,直到遇到 EOF 为止。相比我们直接用传统的请求方式处理,官方封装的库对于很多细节的容错性明显会更好,很多我们想不到或者未曾遇到的坑早已经处理好了。
渲染数据
我们可以简单绘制一个列表看一下,实际渲染的效果
kotlin复制代码@Composable
fun ChatScreen(viewModel: ChatViewModel = viewModel()) {
var inpuValue by remember { mutableStateOf("") }
val msg by viewModel.messageList.observeAsState(ArrayList())
Column(modifier = Modifier.fillMaxSize()) {
// 消息列表
MessageList(messages = temp, modifier = Modifier.weight(1f))
// 输入框和发送按钮
InputArea(inpuValue, viewModel) {
inpuValue = it
}
}
}
@Composable
fun InputArea(inputValue: String, viewModel: ChatViewModel, messageText: (String) -> Unit) {
Row(
modifier = Modifier
.fillMaxWidth()
.padding(8.dp),
verticalAlignment = Alignment.CenterVertically
) {
OutlinedTextField(value = inputValue,
onValueChange = { messageText(it) },
modifier = Modifier.weight(1f),
label = { Text(text = "输入消息") },
textStyle = MaterialTheme.typography.bodyMedium,
keyboardOptions = KeyboardOptions(imeAction = ImeAction.Send),
keyboardActions = KeyboardActions(onSend = {
if (inputValue.isNotEmpty()) {
sendMessage(viewModel, inputValue)
messageText("")
}
})
)
Spacer(modifier = Modifier.width(8.dp))
Button(onClick = {
if (inputValue.isNotEmpty()) {
sendMessage(viewModel, inputValue)
messageText("")
}
}) {
Text("Send")
}
}
}
@Composable
fun MessageList(messages: ArrayList<ChatMessage>, modifier: Modifier) {
Log.d("TAG_TAG", "msg $messages")
LazyColumn(
modifier = modifier
) {
items(messages.size, key = { it }) { index ->
ChatMessageItem(message = messages[index])
}
}
}
大概效果是这样
我们在 ViewModel 中请求数据真实的数据之后,更新 LiveData 中的数据就可以实现列表刷新的效果了。
kotlin复制代码 fun queryResult(response: String) {
val history = _messageList.value ?: ArrayList()
val lastMsg = history.last()
chatBuffer.append(response)
if (lastMsg.sender == "Bot") {
val newMsg = ChatMessage("Bot", chatBuffer.toString(), false)
history[history.size - 1] = newMsg
} else {
val newMsg = ChatMessage("Bot", chatBuffer.toString(), false)
history.add(newMsg)
}
Log.d(TAG, "history $history")
_messageList.postValue(history)
}
可以看一下 Compose 列表刷新的效果。
可以看到实际效果还是挺流畅的。
ps: 用 Jetpack Compose 写列表是真的方便,不过处理起键盘还真是挺麻烦。
其他
在 OpenAI 官网的 API 介绍中,对于 SSE 的响应也是建议使用官方提供的库直接处理。避免一些不必要的问题。
在OpenAI 推荐的社区库 中,可以看到对主流语言的支持非常好,Java/Kotinl/NodeJs/Python/Swift 等这些都有支持,甚至 Flutter 都有。
对于像 SSE 这种协议的使用,使用官方或者开源社区贡献的轮子无疑是最好的选择,毕竟这些轮子都是进过各种场景验证的,能覆盖更多场景,同时也更加的完善,可以尽可能避免我们被同一块石头再次绊倒。