开发喵星球

Java调用chatGPT流式输出(2)

Java调用chatGPT流式输出(2)

昨天的文章中有朋友私信我想看service的实现思路和代码,那么这里我就大概的说一下service。

现实思路

主要有以下几个部分:

  1. 异步处理消息发送:为了确保发送消息的即时性,服务中的流式对话方法使用了异步处理。这样可以避免因为消息发送阻塞导致客户端等待过长时间。
  2. 构建对话请求:在流式对话方法中,首先构建了对话请求对象ChatCompletionRequest。该请求包括了模型类型、输入消息等信息。
  3. 建立流式连接:通过调用OpenAiServicestreamChatCompletion()方法建立与AI服务的流式连接。在这一步,服务将实时发送消息给AI服务,并接收AI服务的响应。
  4. 消息处理:服务接收到AI服务返回的消息后,将其发送给客户端。同时,服务也会将收到的消息累加到一个字符串中,以便记录完整的会话内容。
  5. 异常处理:服务在连接过程中处理可能出现的异常情况,例如连接异常或连接结束。
  6. 发送连接关闭事件:在连接结束时,服务发送连接关闭事件给客户端,让客户端主动断开连接避免重连。
  7. 构建OpenAiService:服务中包含了构建OpenAiService对象的私有方法,用于构建与OpenAI服务的连接。在这一步中,服务配置了HTTP代理,并构建了HTTP客户端,以及相应的Retrofit实例和OpenAiService对象。

以上是该服务的主要实现思路,通过这些步骤,服务能够与AI服务建立流式连接,并处理来自AI服务的响应,最终将消息发送给客户端。

实现过程

package org.example.chatgpt.service;

import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.theokanning.openai.OpenAiApi;
import com.theokanning.openai.completion.chat.ChatCompletionChoice;
import com.theokanning.openai.completion.chat.ChatCompletionRequest;
import com.theokanning.openai.completion.chat.ChatMessage;
import com.theokanning.openai.completion.chat.ChatMessageRole;
import com.theokanning.openai.service.OpenAiService;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import retrofit2.Retrofit;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import static com.theokanning.openai.service.OpenAiService.*;

/**
 * 会话服务
 */
@Service
public class ChatService {

    private static final Logger LOG = LoggerFactory.getLogger(ChatService.class);

    String token = "sk-xxx";
    String proxyHost = "127.0.0.1";
    int proxyPort = 7890;

    /**
     * 流式对话
     * 注:必须使用异步处理(否则发送消息不会及时返回前端)
     *
     * @param prompt     输入消息
     * @param sseEmitter SSE对象
     */
    @Async
    public void streamChatCompletion(String prompt, SseEmitter sseEmitter) {
        LOG.info("发送消息:" + prompt);
        final List<ChatMessage> messages = new ArrayList<>();
        final ChatMessage systemMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), prompt);
        messages.add(systemMessage);
        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest
                .builder()
                .model("gpt-3.5-turbo")
                .messages(messages)
                .n(1)
//                .maxTokens(500)
                .logitBias(new HashMap<>())
                .build();


        //流式对话(逐Token返回)
        StringBuilder receiveMsgBuilder = new StringBuilder();
        OpenAiService service = buildOpenAiService(token, proxyHost, proxyPort);
        service.streamChatCompletion(chatCompletionRequest)
                //正常结束
                .doOnComplete(() -> {
                    LOG.info("连接结束");

                    //发送连接关闭事件,让客户端主动断开连接避免重连
                    sendStopEvent(sseEmitter);

                    //完成请求处理
                    sseEmitter.complete();
                })
                //异常结束
                .doOnError(throwable -> {
                    LOG.error("连接异常", throwable);

                    //发送连接关闭事件,让客户端主动断开连接避免重连
                    sendStopEvent(sseEmitter);

                    //完成请求处理携带异常
                    sseEmitter.completeWithError(throwable);
                })
                //收到消息后转发到浏览器
                .blockingForEach(x -> {
                    ChatCompletionChoice choice = x.getChoices().get(0);
                    LOG.debug("收到消息:" + choice);
                    if (StrUtil.isEmpty(choice.getFinishReason())) {
                        //未结束时才可以发送消息(结束后,先调用doOnComplete然后还会收到一条结束消息,因连接关闭导致发送消息失败:ResponseBodyEmitter has already completed)
                        sseEmitter.send(choice.getMessage());
                    }
                    String content = choice.getMessage().getContent();
                    content = content == null ? StrUtil.EMPTY : content;
                    receiveMsgBuilder.append(content);
                });
        LOG.info("收到的完整消息:" + receiveMsgBuilder);
    }

    /**
     * 发送连接关闭事件,让客户端主动断开连接避免重连
     *
     * @param sseEmitter
     * @throws IOException
     */
    private static void sendStopEvent(SseEmitter sseEmitter) throws IOException {
        sseEmitter.send(SseEmitter.event().name("stop").data(""));
    }


    /**
     * 构建OpenAiService
     *
     * @param token     API_KEY
     * @param proxyHost 代理域名
     * @param proxyPort 代理端口号
     * @return OpenAiService
     */
    private OpenAiService buildOpenAiService(String token, String proxyHost, int proxyPort) {
        //构建HTTP代理
        Proxy proxy = null;
        if (StrUtil.isNotBlank(proxyHost)) {
            proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
        }
        //构建HTTP客户端
        OkHttpClient client = defaultClient(token, Duration.of(60, ChronoUnit.SECONDS))
                .newBuilder()
                .proxy(proxy)
                .build();
        ObjectMapper mapper = defaultObjectMapper();
        Retrofit retrofit = defaultRetrofit(client, mapper);
        OpenAiApi api = retrofit.create(OpenAiApi.class);
        OpenAiService service = new OpenAiService(api, client.dispatcher().executorService());
        return service;
    }
}

代码解析

让我们来一起逐行来分析一下这个代码

@Service
public class ChatService {

    private static final Logger LOG = LoggerFactory.getLogger(ChatService.class);

    String token = "sk-xxx";
    String proxyHost = "127.0.0.1";
    int proxyPort = 7890;

    /**
     * 流式对话
     * 注:必须使用异步处理(否则发送消息不会及时返回前端)
     *
     * @param prompt     输入消息
     * @param sseEmitter SSE对象
     */
    @Async
    public void streamChatCompletion(String prompt, SseEmitter sseEmitter) {
        LOG.info("发送消息:" + prompt);
        final List<ChatMessage> messages = new ArrayList<>();
        final ChatMessage systemMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), prompt);
        messages.add(systemMessage);
        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest
                .builder()
                .model("gpt-3.5-turbo")
                .messages(messages)
                .n(1)
//                .maxTokens(500)
                .logitBias(new HashMap<>())
                .build();
        //流式对话(逐Token返回)
        StringBuilder receiveMsgBuilder = new StringBuilder();
        OpenAiService service = buildOpenAiService(token, proxyHost, proxyPort);
        service.streamChatCompletion(chatCompletionRequest)
                //正常结束
                .doOnComplete(() -> {
                    LOG.info("连接结束");

                    //发送连接关闭事件,让客户端主动断开连接避免重连
                    sendStopEvent(sseEmitter);

                    //完成请求处理
                    sseEmitter.complete();
                })
                //异常结束
                .doOnError(throwable -> {
                    LOG.error("连接异常", throwable);

                    //发送连接关闭事件,让客户端主动断开连接避免重连
                    sendStopEvent(sseEmitter);

                    //完成请求处理携带异常
                    sseEmitter.completeWithError(throwable);
                })
                //收到消息后转发到浏览器
                .blockingForEach(x -> {
                    ChatCompletionChoice choice = x.getChoices().get(0);
                    LOG.debug("收到消息:" + choice);
                    if (StrUtil.isEmpty(choice.getFinishReason())) {
                        //未结束时才可以发送消息(结束后,先调用doOnComplete然后还会收到一条结束消息,因连接关闭导致发送消息失败:ResponseBodyEmitter has already completed)
                        sseEmitter.send(choice.getMessage());
                    }
                    String content = choice.getMessage().getContent();
                    content = content == null ? StrUtil.EMPTY : content;
                    receiveMsgBuilder.append(content);
                });
        LOG.info("收到的完整消息:" + receiveMsgBuilder);
    }

    /**
     * 发送连接关闭事件,让客户端主动断开连接避免重连
     *
     * @param sseEmitter
     * @throws IOException
     */
    private static void sendStopEvent(SseEmitter sseEmitter) throws IOException {
        sseEmitter.send(SseEmitter.event().name("stop").data(""));
    }
     * @param token     API_KEY
     * @param proxyHost 代理域名
     * @param proxyPort 代理端口号
     * @return OpenAiService
     */
    private OpenAiService buildOpenAiService(String token, String proxyHost, int proxyPort) {
        //构建HTTP代理
        Proxy proxy = null;
        if (StrUtil.isNotBlank(proxyHost)) {
            proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
        }
        //构建HTTP客户端
        OkHttpClient client = defaultClient(token, Duration.of(60, ChronoUnit.SECONDS))
                .newBuilder()
                .proxy(proxy)
                .build();
        ObjectMapper mapper = defaultObjectMapper();
        Retrofit retrofit = defaultRetrofit(client, mapper);
        OpenAiApi api = retrofit.create(OpenAiApi.class);
        OpenAiService service = new OpenAiService(api, client.dispatcher().executorService());
        return service;
    }

这就是对 ChatService 类中方法的详细解析,它实现了与OpenAI服务进行流式对话的功能,同时处理了异常情况,并通过异步处理和记录日志等方式保证了服务的稳定性和可靠性。

   
分类:玩技术 作者:荡荡, 浩浩 发表于:2024-03-25 14:49:18 阅读量:205
<<   >>


powered by kaifamiao