昨天的文章中有朋友私信我想看service的实现思路和代码,那么这里我就大概的说一下service。
主要有以下几个部分:
ChatCompletionRequest
。该请求包括了模型类型、输入消息等信息。
OpenAiService
的streamChatCompletion()
方法建立与AI服务的流式连接。在这一步,服务将实时发送消息给AI服务,并接收AI服务的响应。
以上是该服务的主要实现思路,通过这些步骤,服务能够与AI服务建立流式连接,并处理来自AI服务的响应,最终将消息发送给客户端。
package org.example.chatgpt.service;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.theokanning.openai.OpenAiApi;
import com.theokanning.openai.completion.chat.ChatCompletionChoice;
import com.theokanning.openai.completion.chat.ChatCompletionRequest;
import com.theokanning.openai.completion.chat.ChatMessage;
import com.theokanning.openai.completion.chat.ChatMessageRole;
import com.theokanning.openai.service.OpenAiService;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import retrofit2.Retrofit;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static com.theokanning.openai.service.OpenAiService.*;
/**
* 会话服务
*/
@Service
public class ChatService {
private static final Logger LOG = LoggerFactory.getLogger(ChatService.class);
String token = "sk-xxx";
String proxyHost = "127.0.0.1";
int proxyPort = 7890;
/**
* 流式对话
* 注:必须使用异步处理(否则发送消息不会及时返回前端)
*
* @param prompt 输入消息
* @param sseEmitter SSE对象
*/
@Async
public void streamChatCompletion(String prompt, SseEmitter sseEmitter) {
LOG.info("发送消息:" + prompt);
final List<ChatMessage> messages = new ArrayList<>();
final ChatMessage systemMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), prompt);
messages.add(systemMessage);
ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest
.builder()
.model("gpt-3.5-turbo")
.messages(messages)
.n(1)
// .maxTokens(500)
.logitBias(new HashMap<>())
.build();
//流式对话(逐Token返回)
StringBuilder receiveMsgBuilder = new StringBuilder();
OpenAiService service = buildOpenAiService(token, proxyHost, proxyPort);
service.streamChatCompletion(chatCompletionRequest)
//正常结束
.doOnComplete(() -> {
LOG.info("连接结束");
//发送连接关闭事件,让客户端主动断开连接避免重连
sendStopEvent(sseEmitter);
//完成请求处理
sseEmitter.complete();
})
//异常结束
.doOnError(throwable -> {
LOG.error("连接异常", throwable);
//发送连接关闭事件,让客户端主动断开连接避免重连
sendStopEvent(sseEmitter);
//完成请求处理携带异常
sseEmitter.completeWithError(throwable);
})
//收到消息后转发到浏览器
.blockingForEach(x -> {
ChatCompletionChoice choice = x.getChoices().get(0);
LOG.debug("收到消息:" + choice);
if (StrUtil.isEmpty(choice.getFinishReason())) {
//未结束时才可以发送消息(结束后,先调用doOnComplete然后还会收到一条结束消息,因连接关闭导致发送消息失败:ResponseBodyEmitter has already completed)
sseEmitter.send(choice.getMessage());
}
String content = choice.getMessage().getContent();
content = content == null ? StrUtil.EMPTY : content;
receiveMsgBuilder.append(content);
});
LOG.info("收到的完整消息:" + receiveMsgBuilder);
}
/**
* 发送连接关闭事件,让客户端主动断开连接避免重连
*
* @param sseEmitter
* @throws IOException
*/
private static void sendStopEvent(SseEmitter sseEmitter) throws IOException {
sseEmitter.send(SseEmitter.event().name("stop").data(""));
}
/**
* 构建OpenAiService
*
* @param token API_KEY
* @param proxyHost 代理域名
* @param proxyPort 代理端口号
* @return OpenAiService
*/
private OpenAiService buildOpenAiService(String token, String proxyHost, int proxyPort) {
//构建HTTP代理
Proxy proxy = null;
if (StrUtil.isNotBlank(proxyHost)) {
proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
}
//构建HTTP客户端
OkHttpClient client = defaultClient(token, Duration.of(60, ChronoUnit.SECONDS))
.newBuilder()
.proxy(proxy)
.build();
ObjectMapper mapper = defaultObjectMapper();
Retrofit retrofit = defaultRetrofit(client, mapper);
OpenAiApi api = retrofit.create(OpenAiApi.class);
OpenAiService service = new OpenAiService(api, client.dispatcher().executorService());
return service;
}
}
让我们来一起逐行来分析一下这个代码
@Service
public class ChatService {
private static final Logger LOG = LoggerFactory.getLogger(ChatService.class);
String token = "sk-xxx";
String proxyHost = "127.0.0.1";
int proxyPort = 7890;
/**
* 流式对话
* 注:必须使用异步处理(否则发送消息不会及时返回前端)
*
* @param prompt 输入消息
* @param sseEmitter SSE对象
*/
@Async
public void streamChatCompletion(String prompt, SseEmitter sseEmitter) {
@Service
注解将 ChatService
类标记为 Spring 的服务类。Logger LOG
是用来记录日志的静态 Logger 对象。token
、proxyHost
和 proxyPort
是用于与AI服务进行通信的凭据和代理设置。streamChatCompletion
方法用于进行流式对话。@Async
注解表示这个方法会异步执行,以避免阻塞主线程。 LOG.info("发送消息:" + prompt);
final List<ChatMessage> messages = new ArrayList<>();
final ChatMessage systemMessage = new ChatMessage(ChatMessageRole.SYSTEM.value(), prompt);
messages.add(systemMessage);
ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest
.builder()
.model("gpt-3.5-turbo")
.messages(messages)
.n(1)
// .maxTokens(500)
.logitBias(new HashMap<>())
.build();
ChatCompletionRequest
对象,包括了对话所需的信息,例如模型类型、消息列表等。 //流式对话(逐Token返回)
StringBuilder receiveMsgBuilder = new StringBuilder();
OpenAiService service = buildOpenAiService(token, proxyHost, proxyPort);
StringBuilder
对象,用于存储接收到的消息。buildOpenAiService
方法创建了一个 OpenAiService
对象,用于与AI服务建立连接。 service.streamChatCompletion(chatCompletionRequest)
//正常结束
.doOnComplete(() -> {
LOG.info("连接结束");
//发送连接关闭事件,让客户端主动断开连接避免重连
sendStopEvent(sseEmitter);
//完成请求处理
sseEmitter.complete();
})
service.streamChatCompletion()
方法与AI服务建立流式连接,并处理连接正常结束的情况。 //异常结束
.doOnError(throwable -> {
LOG.error("连接异常", throwable);
//发送连接关闭事件,让客户端主动断开连接避免重连
sendStopEvent(sseEmitter);
//完成请求处理携带异常
sseEmitter.completeWithError(throwable);
})
//收到消息后转发到浏览器
.blockingForEach(x -> {
ChatCompletionChoice choice = x.getChoices().get(0);
LOG.debug("收到消息:" + choice);
if (StrUtil.isEmpty(choice.getFinishReason())) {
//未结束时才可以发送消息(结束后,先调用doOnComplete然后还会收到一条结束消息,因连接关闭导致发送消息失败:ResponseBodyEmitter has already completed)
sseEmitter.send(choice.getMessage());
}
String content = choice.getMessage().getContent();
content = content == null ? StrUtil.EMPTY : content;
receiveMsgBuilder.append(content);
});
LOG.info("收到的完整消息:" + receiveMsgBuilder);
}
/**
* 发送连接关闭事件,让客户端主动断开连接避免重连
*
* @param sseEmitter
* @throws IOException
*/
private static void sendStopEvent(SseEmitter sseEmitter) throws IOException {
sseEmitter.send(SseEmitter.event().name("stop").data(""));
}
StringBuilder
中,以便记录完整的会话内容。 * @param token API_KEY
* @param proxyHost 代理域名
* @param proxyPort 代理端口号
* @return OpenAiService
*/
private OpenAiService buildOpenAiService(String token, String proxyHost, int proxyPort) {
//构建HTTP代理
Proxy proxy = null;
if (StrUtil.isNotBlank(proxyHost)) {
proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
}
buildOpenAiService
方法用于构建与OpenAI服务的连接。 //构建HTTP客户端
OkHttpClient client = defaultClient(token, Duration.of(60, ChronoUnit.SECONDS))
.newBuilder()
.proxy(proxy)
.build();
ObjectMapper mapper = defaultObjectMapper();
Retrofit retrofit = defaultRetrofit(client, mapper);
OpenAiApi api = retrofit.create(OpenAiApi.class);
OpenAiService service = new OpenAiService(api, client.dispatcher().executorService());
return service;
}
ObjectMapper
对象和 Retrofit
对象,用于将HTTP请求的响应转换为Java对象。Retrofit
创建了一个 OpenAiApi
实例,用于发送HTTP请求到OpenAI服务。OpenAiService
对象,该对象封装了与OpenAI服务通信的方法和逻辑,并返回该对象。这就是对 ChatService
类中方法的详细解析,它实现了与OpenAI服务进行流式对话的功能,同时处理了异常情况,并通过异步处理和记录日志等方式保证了服务的稳定性和可靠性。
powered by kaifamiao