本次通义千问的流式传输的核心方案是:基于http text/event-stream 事件流
下面直接上代码
/**
* 通义千问流式请求
* @param messages
* @param modelType
*/
async TYQWSteamRequest(messages: chatBaseMessages[], modelType: string) {
const payload = {
model: modelType || 'qwen-max', // 模型名称 必填
input: {
messages: messages.map(({ role, content }) => ({ role, content })),
},
stream: true,
};
// 请求埋点
this.logger.info(`TYQWSteamRequest: reqBody =>{JSON.stringify(payload)};`);
const response = await this.ctx.curl(
'https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation',
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream',
'X-DashScope-SSE': 'enable',
},
streaming: true, // 启用流式响应
// 请求数据类型
data: payload,
timeout: 60 * 1000,
}
);
// 处理流式响应
if (response.status === 200) {
// 获取node原生的响应对象
const stream = response.res;
return stream;
} else {
console.error(`请求失败,状态码:{response.status}`);
}
}
这段代码是一个异步函数 TYQWSteamRequest
,用于发起通用千问(TYQW)的流式请求。让我们一起逐行来看一下:
async TYQWSteamRequest(messages: chatBaseMessages[], modelType: string) {
: 这是一个异步函数,接受两个参数 messages
和 modelType
。
const payload = { ... }
: 创建了一个包含请求参数的对象 payload
。其中:
model
: 模型名称,默认为 ‘qwen-max’。input
: 输入对象,包含一个 messages
数组,该数组是根据传入的 messages
参数映射而来的,保留了每个消息的角色和内容。stream
: 设置为 true
,表示启用流式请求。this.logger.info(
TYQWSteamRequest: reqBody =>${JSON.stringify(payload)};);
: 这里记录了请求的详细信息,以便日志跟踪。
const response = await this.ctx.curl('https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation', { ... }
: 这里使用了 curl
方法来发起 HTTP POST 请求。请求的 URL 是 https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation
。参数配置包括:
method
: 请求方法为 POST。headers
: 请求头包含了 Content-Type
和 Accept
,分别设置为 application/json
和 text/event-stream
,还有一个自定义的头部 X-DashScope-SSE: enable
。streaming: true
: 启用了流式响应。data
: 请求体,设置为之前定义的 payload
对象。timeout
: 设置请求超时时间为 60 秒。if (response.status === 200) { ... } else { ... }
: 处理响应。如果响应状态码为 200,表示请求成功,则返回原生响应流 stream
;否则输出错误信息。
总体来说,这段代码是用于向指定的 API 发送流式请求,并返回响应流,以便进一步处理。
/**
* 提取stream流中的data部分
* @param streamData
* @returns
*/
export const getStreamDataWithJson = (streamData: string) => {
const dataRegex = /data:({.*})/;
const matches = streamData.match(dataRegex);
if (matches && matches.length > 1) {
return matches[1];
}
return null;
};
/**
* 统一处理stream流数据格式
* @param streamData
* @param modelType
*/
export const streamFormater = (streamData: string, modelType: string) => {
if (qytwModels.some(({ value }) => value === modelType)) {
const fromatData = getStreamDataWithJson(streamData);
const objectData = JSON.parse(fromatData);
return objectData?.output?.text;
}
throw new Error('模型暂不支持');
};
/**
* 数据流对话接口
* @param sendChatMessageParams
*/
@Post('/send_chat_message_by_stream')
async sendChatMessageByStream(@Body() body: sendChatMessageParams) {
const { sessionKey, input, modelType } = body;
if (!sessionKey) throw new Error('缺少参数[sessionKey]');
if (!input) throw new Error('缺少参数[input]');
if (!modelType) throw new Error('缺少参数[modelType]');
// 1.历史消息
const { messages, creator } = await this.AiChatService.getAiChatDetailData(
sessionKey
);
if (creator && creator !== this.ctx.user.workid)
throw new Error('禁止操作他人的会话');
// 2. 开始对话
let chatResult = ``;
const reqTime = moment().format('YYYY-MM-DD HH:mm:ss');
const stream = await this.AiChatService.sendChatMessageByStream(
messages,
input,
modelType
);
// 3.处理响应
// 3.1 设置客户端响应头,这里是返回给浏览器
this.ctx.status = 200;
this.ctx.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
// 3.2 监听数据变化
stream.on('data', chunk => {
const textData = chunk.toString();
const formatData = streamFormater(textData, modelType);
chatResult = formatData;
this.ctx.res.write(JSON.stringify(this.HttpReturn.success(formatData)));
});
// 3.3 监听结束事件
stream.on('end', async () => {
const resTime = moment().format('YYYY-MM-DD HH:mm:ss');
// 3.3.1 记录对话信息
await this.AiChatService.setMessagesBySessionKey(sessionKey, {
modelType,
messages: [
...messages,
{
role: roleType.user,
content: input,
time: reqTime,
},
{
role: roleType.assistant,
content: chatResult,
time: resTime,
},
],
});
// 3.3.2 标记对话完成
this.ctx.res.write(
JSON.stringify(this.HttpReturn.success(`event-stream:end`))
);
});
// 3.4 监听错误事件
stream.on('error', err => {
this.ctx.res.write(`event-stream:error[{err.toString()}]`);
throw new Error(`Stream error:{err}`);
});
// 保持长链
return new Promise(resolve => {
this.ctx.req.on('end', resolve);
});
}
这段代码是一个用于处理数据流对话接口的函数 sendChatMessageByStream
。
async sendChatMessageByStream(@Body() body: sendChatMessageParams) {
: 这是一个异步函数,接受一个包含请求体的参数 body
。
参数解构:从 body
中提取 sessionKey
, input
, modelType
。
参数验证:确保 sessionKey
, input
, modelType
均存在,否则抛出错误。
获取历史消息:调用 AiChatService.getAiChatDetailData
方法获取历史消息和会话创建者。
校验会话创建者:如果会话创建者不是当前用户,则抛出错误。
发起对话:调用 AiChatService.sendChatMessageByStream
方法开始对话,获取一个数据流 stream
。
处理响应:
text/event-stream
,并且禁用缓存,保持长连接。这段代码主要是为了处理数据流式对话接口,它通过监听数据流的变化,不断地将对话内容返回给客户端,同时处理对话的开始、结束和错误等情况。
// 发起 POST 请求并处理 text/event-stream
import { ElMessage, ElNotification } from 'element-plus';
export function sendChatMessageByStreamAPI(payLoad, callback) {
fetch(`/api/chat/send_chat_message_by_stream`, {
method: 'POST',
headers: {
Accept: 'text/event-stream',
'Content-Type': 'application/json',
},
body: JSON.stringify(payLoad),
})
.then((response) => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
let isFirst = true;
function read() {
reader.read().then(({ value, done }) => {
if (done) return;
const text = decoder.decode(value, { stream: true });
const { data, success, message } = JSON.parse(text);
// 1. 报错
if (!success) {
ElNotification({
title: 'Error',
message,
type: 'error',
dangerouslyUseHTMLString: true,
zIndex: 9999999,
customClass: 'network-error-notification',
});
callback(`event-stream:error`);
return;
}
// 2. 正常开始
if (isFirst) {
callback(`event-stream:start`);
}
// 3. 正常流转
callback(data);
read();
isFirst = false;
});
}
read();
})
.catch((error) => {
ElMessage.error(`Fetch error:',${error}`);
console.error('Fetch error:', error);
});
}
这段代码是一个用于发起 POST 请求并处理 text/event-stream
类型响应的函数 sendChatMessageByStreamAPI
。
export function sendChatMessageByStreamAPI(payLoad, callback) {
: 这是一个导出的函数 sendChatMessageByStreamAPI
,接受两个参数 payLoad
和 callback
。
使用 fetch
发起 POST 请求到 /api/chat/send_chat_message_by_stream
,请求头包含 Accept: 'text/event-stream'
和 Content-Type: 'application/json'
。
处理响应:
response.body.getReader()
获取一个读取器 reader
。decoder
,用于将二进制数据解码为文本。read()
用于读取响应数据。read()
函数中,调用 reader.read()
来读取数据,然后解码为文本,并根据解码后的内容进行处理。callback
并传递错误信息。callback
并传递 event-stream:start
。callback
并传递数据,然后递归调用 read()
继续读取数据。fetch
的 catch
块中处理请求错误,显示错误通知,并打印错误信息到控制台。
这段代码主要是为了发起 POST 请求,并监听来自服务器的 text/event-stream
类型响应。它使用 fetch
API 发起请求,并通过读取器来处理流式响应,然后根据响应内容执行不同的逻辑。
sendChatMessageByStreamAPI
函数发起流式请求,将请求参数传递给服务器端。TYQWSteamRequest
函数向远程服务器发送流式请求,并监听响应流的变化。这样,整个流程实现了一个完整的流式对话系统,客户端和服务器端通过流式请求和响应进行实时通信,实现了实时的对话交互。
希望这个分享能帮助大家更好的去理解通义千问流式输出的原理,并在此基础上完善自己的功能。
powered by kaifamiao