开发喵星球

通义千问ChatAPI流式传输案例

通义千问ChatAPI流式传输案例

本次通义千问的流式传输的核心方案是:基于http text/event-stream 事件流

下面直接上代码

请求配置

  /**
   * 通义千问流式请求
   * @param messages
   * @param modelType
   */
  async TYQWSteamRequest(messages: chatBaseMessages[], modelType: string) {
    const payload = {
      model: modelType || 'qwen-max', // 模型名称 必填
      input: {
        messages: messages.map(({ role, content }) => ({ role, content })),
      },
      stream: true,
    };
    // 请求埋点
    this.logger.info(`TYQWSteamRequest: reqBody =>{JSON.stringify(payload)};`);
    const response = await this.ctx.curl(
      'https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation',
      {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          Accept: 'text/event-stream',
          'X-DashScope-SSE': 'enable',
        },
        streaming: true, // 启用流式响应
        // 请求数据类型
        data: payload,
        timeout: 60 * 1000,
      }
    );
    // 处理流式响应
    if (response.status === 200) {
      // 获取node原生的响应对象
      const stream = response.res;
      return stream;
    } else {
      console.error(`请求失败,状态码:{response.status}`);
    }
  }

代码解析

这段代码是一个异步函数 TYQWSteamRequest,用于发起通用千问(TYQW)的流式请求。让我们一起逐行来看一下:

  1. async TYQWSteamRequest(messages: chatBaseMessages[], modelType: string) {: 这是一个异步函数,接受两个参数 messagesmodelType

  2. const payload = { ... }: 创建了一个包含请求参数的对象 payload。其中:

    • model: 模型名称,默认为 ‘qwen-max’。
    • input: 输入对象,包含一个 messages 数组,该数组是根据传入的 messages 参数映射而来的,保留了每个消息的角色和内容。
    • stream: 设置为 true,表示启用流式请求。
  3. this.logger.info(TYQWSteamRequest: reqBody =>${JSON.stringify(payload)};);: 这里记录了请求的详细信息,以便日志跟踪。

  4. const response = await this.ctx.curl('https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation', { ... }: 这里使用了 curl 方法来发起 HTTP POST 请求。请求的 URL 是 https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation。参数配置包括:

    • method: 请求方法为 POST。
    • headers: 请求头包含了 Content-TypeAccept,分别设置为 application/jsontext/event-stream,还有一个自定义的头部 X-DashScope-SSE: enable
    • streaming: true: 启用了流式响应。
    • data: 请求体,设置为之前定义的 payload 对象。
    • timeout: 设置请求超时时间为 60 秒。
  5. if (response.status === 200) { ... } else { ... }: 处理响应。如果响应状态码为 200,表示请求成功,则返回原生响应流 stream;否则输出错误信息。

总体来说,这段代码是用于向指定的 API 发送流式请求,并返回响应流,以便进一步处理。

服务中间流程处理

/**
 * 提取stream流中的data部分
 * @param streamData
 * @returns
 */
export const getStreamDataWithJson = (streamData: string) => {
  const dataRegex = /data:({.*})/;
  const matches = streamData.match(dataRegex);
  if (matches && matches.length > 1) {
    return matches[1];
  }
  return null;
};

/**
 * 统一处理stream流数据格式
 * @param streamData
 * @param modelType
 */
export const streamFormater = (streamData: string, modelType: string) => {
  if (qytwModels.some(({ value }) => value === modelType)) {
    const fromatData = getStreamDataWithJson(streamData);
    const objectData = JSON.parse(fromatData);
    return objectData?.output?.text;
  }
  throw new Error('模型暂不支持');
};


/**
   * 数据流对话接口
   * @param sendChatMessageParams
   */
  @Post('/send_chat_message_by_stream')
  async sendChatMessageByStream(@Body() body: sendChatMessageParams) {
    const { sessionKey, input, modelType } = body;
    if (!sessionKey) throw new Error('缺少参数[sessionKey]');
    if (!input) throw new Error('缺少参数[input]');
    if (!modelType) throw new Error('缺少参数[modelType]');
    // 1.历史消息
    const { messages, creator } = await this.AiChatService.getAiChatDetailData(
      sessionKey
    );
    if (creator && creator !== this.ctx.user.workid)
      throw new Error('禁止操作他人的会话');
    // 2. 开始对话
    let chatResult = ``;
    const reqTime = moment().format('YYYY-MM-DD HH:mm:ss');
    const stream = await this.AiChatService.sendChatMessageByStream(
      messages,
      input,
      modelType
    );
    // 3.处理响应
    // 3.1 设置客户端响应头,这里是返回给浏览器
    this.ctx.status = 200;
    this.ctx.set({
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    });
    // 3.2 监听数据变化
    stream.on('data', chunk => {
      const textData = chunk.toString();
      const formatData = streamFormater(textData, modelType);
      chatResult = formatData;
      this.ctx.res.write(JSON.stringify(this.HttpReturn.success(formatData)));
    });

    // 3.3 监听结束事件
    stream.on('end', async () => {
      const resTime = moment().format('YYYY-MM-DD HH:mm:ss');
      // 3.3.1 记录对话信息
      await this.AiChatService.setMessagesBySessionKey(sessionKey, {
        modelType,
        messages: [
          ...messages,
          {
            role: roleType.user,
            content: input,
            time: reqTime,
          },
          {
            role: roleType.assistant,
            content: chatResult,
            time: resTime,
          },
        ],
      });
      // 3.3.2 标记对话完成
      this.ctx.res.write(
        JSON.stringify(this.HttpReturn.success(`event-stream:end`))
      );
    });

    // 3.4 监听错误事件
    stream.on('error', err => {
      this.ctx.res.write(`event-stream:error[{err.toString()}]`);
      throw new Error(`Stream error:{err}`);
    });

    // 保持长链
    return new Promise(resolve => {
      this.ctx.req.on('end', resolve);
    });
  }

代码解析

这段代码是一个用于处理数据流对话接口的函数 sendChatMessageByStream

  1. async sendChatMessageByStream(@Body() body: sendChatMessageParams) {: 这是一个异步函数,接受一个包含请求体的参数 body

  2. 参数解构:从 body 中提取 sessionKey, input, modelType

  3. 参数验证:确保 sessionKey, input, modelType 均存在,否则抛出错误。

  4. 获取历史消息:调用 AiChatService.getAiChatDetailData 方法获取历史消息和会话创建者。

  5. 校验会话创建者:如果会话创建者不是当前用户,则抛出错误。

  6. 发起对话:调用 AiChatService.sendChatMessageByStream 方法开始对话,获取一个数据流 stream

  7. 处理响应:

    • 设置响应头:设置响应的内容类型为 text/event-stream,并且禁用缓存,保持长连接。
    • 监听数据变化:当数据流传入新的数据时,解析并格式化数据,然后将格式化后的数据写回给客户端。
    • 监听结束事件:当数据流传输结束时,记录对话信息,包括输入、对话内容、时间等,并向客户端发送对话结束的信号。
    • 监听错误事件:如果发生错误,向客户端发送错误信息,并抛出错误。
  8. 保持长链:返回一个 Promise,确保请求保持长连接。

这段代码主要是为了处理数据流式对话接口,它通过监听数据流的变化,不断地将对话内容返回给客户端,同时处理对话的开始、结束和错误等情况。

客户端接收

// 发起 POST 请求并处理 text/event-stream
import { ElMessage, ElNotification } from 'element-plus';

export function sendChatMessageByStreamAPI(payLoad, callback) {
  fetch(`/api/chat/send_chat_message_by_stream`, {
    method: 'POST',
    headers: {
      Accept: 'text/event-stream',
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(payLoad),
  })
    .then((response) => {
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let isFirst = true;
      function read() {
        reader.read().then(({ value, done }) => {
          if (done) return;
          const text = decoder.decode(value, { stream: true });
          const { data, success, message } = JSON.parse(text);
          // 1. 报错
          if (!success) {
            ElNotification({
              title: 'Error',
              message,
              type: 'error',
              dangerouslyUseHTMLString: true,
              zIndex: 9999999,
              customClass: 'network-error-notification',
            });
            callback(`event-stream:error`);
            return;
          }
          // 2. 正常开始
          if (isFirst) {
            callback(`event-stream:start`);
          }
          // 3. 正常流转
          callback(data);
          read();
          isFirst = false;
        });
      }
      read();
    })
    .catch((error) => {
      ElMessage.error(`Fetch error:',${error}`);
      console.error('Fetch error:', error);
    });
}

代码解析

这段代码是一个用于发起 POST 请求并处理 text/event-stream 类型响应的函数 sendChatMessageByStreamAPI

  1. export function sendChatMessageByStreamAPI(payLoad, callback) {: 这是一个导出的函数 sendChatMessageByStreamAPI,接受两个参数 payLoadcallback

  2. 使用 fetch 发起 POST 请求到 /api/chat/send_chat_message_by_stream,请求头包含 Accept: 'text/event-stream'Content-Type: 'application/json'

  3. 处理响应:

    • 通过 response.body.getReader() 获取一个读取器 reader
    • 创建一个解码器 decoder,用于将二进制数据解码为文本。
    • 定义一个内部函数 read() 用于读取响应数据。
    • read() 函数中,调用 reader.read() 来读取数据,然后解码为文本,并根据解码后的内容进行处理。
    • 如果读取完成,则返回;否则解析 JSON 数据,根据情况执行不同的逻辑:
      • 如果请求出现错误,弹出错误通知,并调用回调函数 callback 并传递错误信息。
      • 如果是第一次读取到数据,则调用回调函数 callback 并传递 event-stream:start
      • 对于正常的数据流转,调用回调函数 callback 并传递数据,然后递归调用 read() 继续读取数据。
  4. fetchcatch 块中处理请求错误,显示错误通知,并打印错误信息到控制台。

这段代码主要是为了发起 POST 请求,并监听来自服务器的 text/event-stream 类型响应。它使用 fetch API 发起请求,并通过读取器来处理流式响应,然后根据响应内容执行不同的逻辑。

思路梳理

  1. 客户端调用 sendChatMessageByStreamAPI 函数发起流式请求,将请求参数传递给服务器端。
  2. 服务器端接收到请求后,调用 TYQWSteamRequest 函数向远程服务器发送流式请求,并监听响应流的变化。
  3. 远程服务器根据接收到的请求,生成对应的响应流并返回给服务器端。
  4. 服务器端通过监听响应流的变化,根据不同的响应事件执行相应的操作,如记录对话信息、向客户端发送消息等。
  5. 客户端根据接收到的响应流,通过监听器处理不同的响应事件,如显示对话内容、错误通知等。

这样,整个流程实现了一个完整的流式对话系统,客户端和服务器端通过流式请求和响应进行实时通信,实现了实时的对话交互。

总结

希望这个分享能帮助大家更好的去理解通义千问流式输出的原理,并在此基础上完善自己的功能。

   
分类:玩技术 作者:荡荡, 浩浩 发表于:2024-03-22 14:23:14 阅读量:307
<<   >>


powered by kaifamiao