跳到主要内容

解决SSE中的粘包问题

· 阅读需 2 分钟
素明诚
Full stack development

解决问题的关键在于找到关键问题

  • 确保服务器正确地使用 SSE 协议格式,包括在事件之间使用正确的分隔符。
  • 优化服务器发送事件的逻辑,避免因为过快的发送速度而导致的事件合并。
  • 客户端应当实现健壮的数据处理逻辑,能够处理合并的事件数据,并正确地分割和解析事件。

分隔并延迟

from flask import Flask, Response, stream_with_context
import time
import json

app = Flask(__name__)

def generate_sse_events():
event_id = 1
while True:
# 模拟数据生成
data = {"time": time.time(), "msg": f"Event {event_id}"}
sse_data = f"id: {event_id}\n" \
f"event: message\n" \
f"data: {json.dumps(data)}\n\n"
yield sse_data
event_id += 1
time.sleep(1) # 引入延时以模拟数据处理时间和避免粘包

@app.route('/events')
def sse_request():
return Response(stream_with_context(generate_sse_events()),
content_type='text/event-stream')

if __name__ == '__main__':
app.run(debug=True, threaded=True)

前端优化

这是我在业务中封装的代码,有需要可以试一下

import { currentEnvironment } from "@/api/axios";

function buildApiUrl(): string {
return `${ currentEnvironment() === '/vnet' ? '/vnet/' : '' }api/chat-process`;
}

async function processStreamedData(reader: ReadableStreamDefaultReader<Uint8Array>, onDataReceived: (data: any) => void) {
let accumulatedText = '';

while (true) {
const { done, value } = await reader.read();
if (done) break;
accumulatedText += value;

while (true) {
const newlineIndex = accumulatedText.indexOf('\n\n');
if (newlineIndex === -1) break;

let dataBlock = accumulatedText.substring(0, newlineIndex).trim();
accumulatedText = accumulatedText.substring(newlineIndex + 2);

if (dataBlock.startsWith('data: ')) {
dataBlock = dataBlock.substring(6);
try {
const json = JSON.parse(dataBlock);
onDataReceived(json);
} catch (error) {
console.error('分析JSON时出错:', error, '来自区块:', dataBlock);
}
}
}
}
}

export async function getGeneratedData(data: any, onDataReceived: (data: any) => void) {
const response = await fetch(buildApiUrl(), {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(data),
});

if (!response.body) {
throw new Error("响应体中没有可读流数据。");
}

const reader: any = response.body.pipeThrough(new TextDecoderStream()).getReader();

await processStreamedData(reader, onDataReceived);
}