Android 处理流式响应

对于像 SSE 这种协议的使用,使用官方或者开源社区贡献的轮子无疑是最好的选择,毕竟这些轮子都是进过各种场景验证的,能覆盖更多场景,同时也更加的完善,可以尽可能避免我们被同一块石头再次绊倒。

前言

上一篇 Stream Response 流式返回 中学习了流式返回的概念,并学习了在浏览器端处理流式响应的两种方式。下面看看如何在 Android 原生实现中处理流式返回。

响应 Stream Response

以最常用的 okhttp 为例,我们看看如何实现。

okhttp 处理

  • 创建 client 和 request
kotlin
复制代码
object OkHttpUtil { private var url = "http://localhost:8199/stream_chat" val request = Request.Builder().url(this.url).build() val client = OkHttpClient.Builder().addInterceptor { println("request is ${it.request()}") val response = it.proceed(it.request()) println("response body header n${response.headers}") println("response body length ${response.body?.contentLength()}") response }.build() .... }

这里在拦截器中打印请求和响应的部分信息,可以观察一下流式响应是否符合预期。

  • 进行异步请求
kotlin
复制代码
fun asyncCall() { val call: Call = client.newCall(request) call.enqueue(object : Callback { override fun onFailure(call: Call, e: IOException) { println("onFailure() called with: call = $call, e = $e") if (call.isCanceled().not()) { call.cancel() } } override fun onResponse(call: Call, response: Response) { println("onResponse() called with: call = $call, response = $response") response.body?.let { val reader = BufferedReader(InputStreamReader(it.byteStream())) var line = reader.readLine() while (line != null) { println(line) line = reader.readLine() } } } }) }

通过 okhttp 发起异步请求,按照之前的约定 Response 是按行返回的(这也是服务端生成响应时为什么要手动添加 nn 的原因,就是为了发起请求的一方可以根据换行进行处理),因此这里对于 response 的 body 按照行进行读取,直到结束。

shell
复制代码
request is Request{method=GET, url=http://localhost:8199/stream_chat} response body header Cache-Control: no-cache Connection: keep-alive Content-Type: text/event-stream Date: Sun, 31 Mar 2024 01:24:34 GMT Transfer-Encoding: chunked response body length -1 onResponse() called with: call = okhttp3.internal.connection.RealCall@1f1f0603, response = Response{protocol=http/1.1, code=200, message=OK, url=http://localhost:8199/stream_chat} data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"服"}} data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"务"}} data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"端"}} ... data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"了"}} data: {"code":200,"message":"success","event":{"id":"7e304d9f0ebe42758b4d1cf9a048f295","data":"。"}} EOF

从返回结果可以看到,response 的 header 信息中 Content-Type 就是 text/event-stream 。对于返回结果按行打印可以看到,这里和之前在浏览器上用 fetch 处理的结果是一样的,不是结构化的数据,并不是一个标准的 json ,需要对返回结果进行特殊处理。对于这种情况,我们可以借助 okttp 中对 SSE 的支持来实现,毕竟 SSE 是 Web 中定义的标准组件,对于这种数据格式的处理,除了借助 EventSource 还可以使用其他的方式。

okhttp-sse

这里需要首先需要添加 okttp-sse 的依赖

gradle
复制代码
implementation 'com.squareup.okhttp3:okhttp-sse:4.12.0'

okhttp-sse 的 API 设计和 EventSource 相似,使用方法也很简单

kotlin
复制代码
sseHandler() { val TAG = "OkHttpUtil" val sseListener = object : EventSourceListener() { override fun onOpen(eventSource: EventSource, response: Response) { super.onOpen(eventSource, response) log( TAG, "onOpen() called with: eventSource = $eventSource, response = $response" ) } override fun onEvent( eventSource: EventSource, id: String?, type: String?, data: String ) { super.onEvent(eventSource, id, type, data) log( TAG, "onEvent() called with: id = $id, type = $type, data = $data" ) } override fun onFailure(eventSource: EventSource, t: Throwable?, response: Response?) { super.onFailure(eventSource, t, response) log( TAG, "onFailure() called with: eventSource = $eventSource, t = $t, response = $response" ) } override fun onClosed(eventSource: EventSource) { super.onClosed(eventSource) log(TAG, "onClosed() called with: eventSource = $eventSource") } } val eventSource = RealEventSource(request, sseListener) eventSource.connect(client) }

EventSourceListener 这个抽象类有四个方法,顾名思义就是用来区分整个请求过程的各个阶段。

可以看到这个用法几乎和浏览器上的 EventSource 一模一样,唯一的区别就是浏览器上不用再主动触发 connect 操作,这里需要主动调用 connect 和相应的 client 进行配合。

java
复制代码
OKhttpUtil:onOpen() called with: eventSource = okhttp3.internal.sse.RealEventSource@735957be, response = Response{protocol=http/1.1, code=200, message=OK, url=http://localhost:8199/stream_chat} OKhttpUtil:onEvent() called with: id = null, type = null, data = {"code":200,"message":"success","event":{"id":"a7bb4c79fdd3470bad61c1ade5a15031","data":"服"}} ... OKhttpUtil:onEvent() called with: id = null, type = null, data = {"code":200,"message":"success","event":{"id":"a7bb4c79fdd3470bad61c1ade5a15031","data":"了"}} OKhttpUtil:onEvent() called with: id = null, type = null, data = {"code":200,"message":"success","event":{"id":"a7bb4c79fdd3470bad61c1ade5a15031","data":"。"}} OKhttpUtil:onClosed() called with: eventSource = okhttp3.internal.sse.RealEventSource@735957be

从输出结果可以看到,用 okhttp-sse 非常方便,对于 SSE 协议的支持很完善。data 中返回的内容已经是标准的 Json 格式数据了。

这样我们就可以借助 kotlin 超强的语法糖创建一个通用的方法来处理 SSE 的响应结果了。

支持泛型的 SSE Handler

kotlin
复制代码
inline fun <reified T> sseHandler(noinline callback: ((T) -> Unit)? = null) { val TAG = "OkHttpUtil" val sseListener = object : EventSourceListener() { override fun onEvent( eventSource: EventSource, id: String?, type: String?, data: String ) { super.onEvent(eventSource, id, type, data) val result = jsonToObj<T>(data) result?.let { callback?.invoke(it) } } ... } ... } inline fun <reified T> jsonToObj(json: String): T? { val moshi = Moshi.Builder().add(KotlinJsonAdapterFactory()).build() val adapter = moshi.adapter(T::class.java) return adapter.fromJson(json) }

这里直接对 onEvent 方法返回的 data 字段进行 Json 解析。 可以简单声明一个返回结果的数据类验证一下。

kotlin
复制代码
data class SSEResult(val id: String, val data: String) data class SSResponse(val code: Int, val message: String, val event: SSEResult) OkHttpUtil.sseHandler<SSResponse> { println(it.event) }
shell
复制代码
SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=服) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=务) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=端) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=实) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=时) .... SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=回) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=消) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=息) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=了) SSEResult(id=c4c56c9be22146f289619bdc3d1f4b9d, data=。)

可以看到使用最终返的数据格式已经可以符合按照预期的结果了。

okhttp-sse 实现原理

这里可以再简单了解一下 okhttp-sse 是如何处理流式响应的。

  • RealEventSource
kotlin
复制代码
fun processResponse(response: Response) { response.use { if (!response.isSuccessful) { listener.onFailure(this, null, response) return } val body = response.body!! if (!body.isEventStream()) { listener.onFailure(this, IllegalStateException("Invalid content-type: ${body.contentType()}"), response) return } // This is a long-lived response. Cancel full-call timeouts. call.timeoutEarlyExit() // Replace the body with an empty one so the callbacks can't see real data. val response = response.newBuilder() .body(EMPTY_RESPONSE) .build() val reader = ServerSentEventReader(body.source(), this) try { listener.onOpen(this, response) while (reader.processNextEvent()) { } } catch (e: Exception) { listener.onFailure(this, e, response) return } listener.onClosed(this) } } private fun ResponseBody.isEventStream(): Boolean { val contentType = contentType() ?: return false return contentType.type == "text" && contentType.subtype == "event-stream" }

这段代码很好理解,简单而优雅的呈现了处理服务端响应的各种场景。

  • 首先,处理请求是否成功相应
  • 其次,判断返回结果是否为流式响应。在没看到这断代码之前,脑海中的实现方式只有 contentType == "text/event-stream" ,而这里的实现方式就显得更加的优雅,完美的诠释了面向对象编程,一切内容可以封装为对象。
  • 流式响应的时间一般会成长,因此这里取消 OkHttp 固有的超时退出机制,避免请求被打断。
  • 对于 ResponseBody 返回了一个空结果,避免下游检测 ResponseBody 为空时出现异常。
  • 创建 ServerSentEventReader 对流式返回结果进行读取
  • 最后关闭连接。

可以看到最关键的实现逻辑就在 reader.processNextEvent()

  • ServerSentEventReader
kotlin
复制代码
class ServerSentEventReader( private val source: BufferedSource, private val callback: Callback ) { private var lastId: String? = null interface Callback { fun onEvent(id: String?, type: String?, data: String) fun onRetryChange(timeMs: Long) } /** * Process the next event. This will result in a single call to [Callback.onEvent] *unless* the * data section was empty. Any number of calls to [Callback.onRetryChange] may occur while * processing an event. * * @return false when EOF is reached */ @Throws(IOException::class) fun processNextEvent(): Boolean { var id = lastId var type: String? = null val data = Buffer() while (true) { when (source.select(options)) { in 0..2 -> { completeEvent(id, type, data) return true } in 3..4 -> { source.readData(data) } ..... } } } @Throws(IOException::class) private fun completeEvent(id: String?, type: String?, data: Buffer) { if (data.size != 0L) { lastId = id data.skip(1L) // Leading newline. callback.onEvent(id, type, data.readUtf8()) } } companion object { val options = Options.of( /* 0 */ "rn".encodeUtf8(), /* 1 */ "r".encodeUtf8(), /* 2 */ "n".encodeUtf8(), /* 3 */ "data: ".encodeUtf8(), /* 4 */ "data:".encodeUtf8(), ) ... } }

这里我们截取部分代码实现,具体其实没有什么特别之处,就是基于返回的 String 字符串内容进行判断处理,遇到行末的换行符就返回结果,否则就一次读取中间的 data,id,event 字段。同时还会兼顾处理重试的逻辑,直到遇到 EOF 为止。相比我们直接用传统的请求方式处理,官方封装的库对于很多细节的容错性明显会更好,很多我们想不到或者未曾遇到的坑早已经处理好了。

渲染数据

我们可以简单绘制一个列表看一下,实际渲染的效果

kotlin
复制代码
@Composable fun ChatScreen(viewModel: ChatViewModel = viewModel()) { var inpuValue by remember { mutableStateOf("") } val msg by viewModel.messageList.observeAsState(ArrayList()) Column(modifier = Modifier.fillMaxSize()) { // 消息列表 MessageList(messages = temp, modifier = Modifier.weight(1f)) // 输入框和发送按钮 InputArea(inpuValue, viewModel) { inpuValue = it } } } @Composable fun InputArea(inputValue: String, viewModel: ChatViewModel, messageText: (String) -> Unit) { Row( modifier = Modifier .fillMaxWidth() .padding(8.dp), verticalAlignment = Alignment.CenterVertically ) { OutlinedTextField(value = inputValue, onValueChange = { messageText(it) }, modifier = Modifier.weight(1f), label = { Text(text = "输入消息") }, textStyle = MaterialTheme.typography.bodyMedium, keyboardOptions = KeyboardOptions(imeAction = ImeAction.Send), keyboardActions = KeyboardActions(onSend = { if (inputValue.isNotEmpty()) { sendMessage(viewModel, inputValue) messageText("") } }) ) Spacer(modifier = Modifier.width(8.dp)) Button(onClick = { if (inputValue.isNotEmpty()) { sendMessage(viewModel, inputValue) messageText("") } }) { Text("Send") } } } @Composable fun MessageList(messages: ArrayList<ChatMessage>, modifier: Modifier) { Log.d("TAG_TAG", "msg $messages") LazyColumn( modifier = modifier ) { items(messages.size, key = { it }) { index -> ChatMessageItem(message = messages[index]) } } }

大概效果是这样

Android 处理流式响应

我们在 ViewModel 中请求数据真实的数据之后,更新 LiveData 中的数据就可以实现列表刷新的效果了。

kotlin
复制代码
fun queryResult(response: String) { val history = _messageList.value ?: ArrayList() val lastMsg = history.last() chatBuffer.append(response) if (lastMsg.sender == "Bot") { val newMsg = ChatMessage("Bot", chatBuffer.toString(), false) history[history.size - 1] = newMsg } else { val newMsg = ChatMessage("Bot", chatBuffer.toString(), false) history.add(newMsg) } Log.d(TAG, "history $history") _messageList.postValue(history) }

可以看一下 Compose 列表刷新的效果。

Android 处理流式响应

可以看到实际效果还是挺流畅的。

ps: 用 Jetpack Compose 写列表是真的方便,不过处理起键盘还真是挺麻烦。

其他

在 OpenAI 官网的 API 介绍中,对于 SSE 的响应也是建议使用官方提供的库直接处理。避免一些不必要的问题。

Android 处理流式响应

OpenAI 推荐的社区库 中,可以看到对主流语言的支持非常好,Java/Kotinl/NodeJs/Python/Swift 等这些都有支持,甚至 Flutter 都有。

Android 处理流式响应

对于像 SSE 这种协议的使用,使用官方或者开源社区贡献的轮子无疑是最好的选择,毕竟这些轮子都是进过各种场景验证的,能覆盖更多场景,同时也更加的完善,可以尽可能避免我们被同一块石头再次绊倒。

参考文档

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

给TA打赏
共{{data.count}}人
人已打赏
人工智能

Stream Response 流式返回

2024-6-2 3:38:27

人工智能

轻松集成所有大模型——一站式大模型应用开发框架Promptulate

2024-6-2 7:38:21

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索