本文整理了几家主要大模型提供商的 API 调用方法(REST 接口、官方 SDK 以及 WebSocket),方便学习使用。
代码
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path = ".env",override = True)
import requests
import json
import time
import sys,traceback
from io import StringIO
import re
class OpenaiLLMs(object):
    """Minimal client for the OpenAI REST API.

    ``models_map`` maps a display name (``Openai-<model id>``) to a dict with
    the API model id (``name``) and the endpoint family (``model``), which is
    either ``"chat"`` or ``"completions"``.
    """

    def __init__(self):
        # Order is significant: models() iterates in insertion order.
        catalog = [
            ("gpt-3.5-turbo-1106", "chat"),
            ("text-davinci-003", "completions"),
            ("gpt-3.5-turbo-0301", "chat"),
            ("davinci-instruct-beta", "completions"),
            ("gpt-3.5-turbo-16k-0613", "chat"),
            ("gpt-3.5-turbo-instruct", "completions"),
            ("gpt-3.5-turbo-16k", "chat"),
            ("text-davinci-001", "completions"),
            ("gpt-3.5-turbo-0613", "chat"),
            ("gpt-3.5-turbo-instruct-0914", "completions"),
            ("gpt-3.5-turbo", "chat"),
            ("text-davinci-002", "completions"),
        ]
        self.models_map = {
            "Openai-" + model_id: {"name": model_id, "model": family}
            for model_id, family in catalog
        }

    def models(self):
        """Return the display names of all configured models."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to ``model_name``.

        Returns ``(model_name, answer)`` on success, or ``None`` when the
        model name is unknown or both attempts fail. The API key is read from
        the ``OPENAI_API_KEY`` environment variable.
        """
        if model_name not in self.models_map:
            return None
        entry = self.models_map[model_name]
        is_chat = entry['model'] == "chat"
        key = os.getenv("OPENAI_API_KEY")
        headers = {
            'Authorization': f'Bearer {key}',
            'Content-Type': 'application/json'
        }
        if is_chat:
            url = "https://api.openai.com/v1/chat/completions"
            query = {
                "model": entry['name'],
                "messages": [
                    {"role": "user", "content": question}
                ],
                "temperature": 0.1,
                "top_p": 0.85
            }
        else:
            url = "https://api.openai.com/v1/completions"
            query = {
                "model": entry['name'],
                "prompt": question,
                "temperature": 0.1,
                "top_p": 0.85
            }
        body = json.dumps(query)
        max_attempts = 2
        for attempt in range(max_attempts):
            try:
                response = requests.post(url=url, data=body, headers=headers, timeout=60)
                if response.status_code == 200:
                    choice = response.json()['choices'][0]
                    content = choice['message']["content"] if is_chat else choice['text']
                    return model_name, content.strip()
                print("请求失败,状态码:", response.status_code)
                print("请求失败,body:", response.text)
            except Exception:
                # Do not touch `response` here: it is unbound when
                # requests.post itself raised (timeout, DNS failure, ...).
                print(traceback.format_exc())
            # Back off between attempts only; waiting after the last one is wasted time.
            if attempt + 1 < max_attempts:
                time.sleep(30)
        return None
question=u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''
infer = OpenaiLLMs()
# Query one fixed model. The earlier loop over infer.models() never used its
# loop variable, and request() already retries internally.
result = infer.request(question, 'Openai-gpt-3.5-turbo-1106')
if result:
    name, content = result
    # Explicit UTF-8: the answer normally contains Chinese text, which the
    # platform default encoding may be unable to encode.
    with open("{}.txt".format(name), "w", encoding="utf-8") as f:
        f.write(content)
运行
conda create -n llm python=3.9
conda activate llm
pip install python-dotenv requests
pip install openai==0.28
python llm.py
输出
import cv2
import subprocess
# 接收rtsp音视频数据
rtsp_url = 'rtsp://example.com/live/stream'
cap = cv2.VideoCapture(rtsp_url)
# 将音视频保存为mp4格式
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output.mp4', fourcc, 20.0, (640, 480))
# 当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if ret:
out.write(frame)
frame_count += 1
if frame_count % (5*60*20) == 0: # 5分钟的帧数
out.release()
out = cv2.VideoWriter('output_{}.mp4'.format(frame_count), fourcc, 20.0, (640, 480))
else:
break
# 将音视频推送到rtmp://127.0.0.1/live
rtmp_url = 'rtmp://127.0.0.1/live'
command = ['ffmpeg', '-i', 'output.mp4', '-c', 'copy', '-f', 'flv', rtmp_url]
subprocess.call(command)
cap.release()
out.release()
cv2.destroyAllWindows()
请注意,上述代码中使用了OpenCV库来处理音视频数据,同时使用了ffmpeg来推送音视频到rtmp服务器。在运行代码之前,请确保已经安装了OpenCV和ffmpeg,并且已经正确配置了环境变量。
代码
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path = ".env",override = True)
import requests
import json
import time
import sys,traceback
from io import StringIO
import re
class AzureLLMs(object):
    """Client for Azure OpenAI chat deployments, using the ``openai==0.28`` SDK.

    ``models_map`` maps a display name to a dict with the deployment name
    (``engine``) and the API version (``version``). The ``"xxx"``/``"xxxx"``
    engine values are placeholders to be replaced with real deployment names.
    """

    def __init__(self):
        self.models_map = {
            'Azure-gpt-35-turbo': {"engine": "xxx", "version": "2023-07-01-preview"},
            'Azure-gpt-35-turbo-16k': {"engine": "xxxx", "version": "2023-07-01-preview"},
        }

    def models(self):
        """Return the display names of all configured deployments."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to the deployment behind ``model_name``.

        Returns ``(model_name, answer)`` on success, or ``None`` when the
        name is unknown or both attempts fail. The endpoint and key are read
        from ``AZURE_OPENAI_ENDPOINT`` and ``AZURE_OPENAI_KEY``.
        """
        if model_name not in self.models_map:
            return None
        import openai
        deployment = self.models_map[model_name]
        # These assignments previously had no value; take them from the
        # environment like the other provider scripts do.
        openai.api_type = "azure"
        openai.api_base = os.getenv("AZURE_OPENAI_ENDPOINT")
        openai.api_version = deployment['version']
        openai.api_key = os.getenv("AZURE_OPENAI_KEY")
        max_attempts = 2
        for attempt in range(max_attempts):
            try:
                response = openai.ChatCompletion.create(
                    engine=deployment['engine'],
                    messages=[
                        {"role": "system", "content": "You are an AI assistant that helps people find information."},
                        {'role': 'user', 'content': question},
                    ],
                    temperature=0.1,
                    top_p=0.85,
                    frequency_penalty=0,
                    presence_penalty=0,
                    stop=None)
                message = response["choices"][0]["message"]["content"]
                return model_name, message
            except Exception:
                # Exception (not a bare except) so Ctrl-C still interrupts.
                print(traceback.format_exc())
            if attempt + 1 < max_attempts:
                time.sleep(10)
        return None
question=u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''
infer = AzureLLMs()
# Query one fixed deployment. The earlier loop over infer.models() never used
# its loop variable, and request() already retries internally.
result = infer.request(question, 'Azure-gpt-35-turbo-16k')
if result:
    name, content = result
    # Explicit UTF-8: the answer normally contains Chinese text, which the
    # platform default encoding may be unable to encode.
    with open("{}.txt".format(name), "w", encoding="utf-8") as f:
        f.write(content)
运行
conda create -n llm python=3.9
conda activate llm
pip install python-dotenv requests
pip install openai==0.28
python llm.py
输出
要实现这些功能,你可以使用Python中的OpenCV库和FFmpeg工具。下面是一个简单的实现示例:
```python
import cv2
import subprocess
def save_video(rtsp_url, output_path):
cap = cv2.VideoCapture(rtsp_url)
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
current_file = None
current_writer = None
current_duration = 0
while True:
ret, frame = cap.read()
if not ret:
break
if current_file is None or current_duration >= 300:
if current_writer is not None:
current_writer.release()
push_to_rtmp(current_file)
current_file = output_path + f"/output_{int(time.time())}.mp4"
current_writer = cv2.VideoWriter(current_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
current_duration = 0
current_writer.write(frame)
current_duration += 1 / fps
if current_writer is not None:
current_writer.release()
push_to_rtmp(current_file)
cap.release()
def push_to_rtmp(file_path):
subprocess.call(['ffmpeg', '-re', '-i', file_path, '-c:v', 'copy', '-c:a', 'copy', 'rtmp://127.0.0.1/live'])
# 使用示例
save_video('rtsp://your_rtsp_url', 'output_directory')
```
请确保你已经安装了OpenCV和FFmpeg,并将`rtsp://your_rtsp_url`替换为实际的RTSP URL,将`output_directory`替换为保存输出文件的目录。
代码
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path = ".env",override = True)
import requests
import json
import time
import sys,traceback
from io import StringIO
import re
class GoogleLLMs(object):
    """Client for Google Gemini models via the ``google-generativeai`` SDK.

    ``models_map`` maps a display name to the SDK model id.
    """

    def __init__(self):
        self.models_map = {'Google-gemini-pro': "gemini-pro"}

    def models(self):
        """Return the display names of all configured models."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to ``model_name``.

        Returns ``(model_name, answer)`` on success, or ``None`` when the
        model name is unknown or both attempts fail. The API key is read from
        the ``GOOGLE_API_KEY`` environment variable.
        """
        # Validate first so an unknown name never imports or configures the SDK.
        if model_name not in self.models_map:
            return None
        import google.generativeai as genai
        genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
        generation_config = {
            "temperature": 0.1,
            "top_p": 0.85,
            "top_k": 4,
        }
        max_attempts = 2
        for attempt in range(max_attempts):
            try:
                model = genai.GenerativeModel(self.models_map[model_name], generation_config=generation_config)
                response = model.generate_content(question)
                return model_name, response.text
            except Exception:
                # This script defines no `logger`; print like the other providers.
                print(traceback.format_exc())
            if attempt + 1 < max_attempts:
                time.sleep(10)
        return None
question=u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''
infer = GoogleLLMs()
# Query one fixed model. The earlier loop over infer.models() never used its
# loop variable, and request() already retries internally.
result = infer.request(question, 'Google-gemini-pro')
if result:
    name, content = result
    # Explicit UTF-8: the answer may contain non-ASCII text that the platform
    # default encoding cannot encode.
    with open("{}.txt".format(name), "w", encoding="utf-8") as f:
        f.write(content)
运行
conda create -n google python=3.9
conda activate google
pip install python-dotenv requests google-generativeai
pip install protobuf==3.19
python llm.py
输出
```python
import cv2
import ffmpeg
import time
import os
# RTSP URL
rtsp_url = 'rtsp://192.168.1.100:554/live/ch00_0'
# Output MP4 file path
output_file = 'output.mp4'
# RTMP URL
rtmp_url = 'rtmp://127.0.0.1/live'
# Create a VideoCapture object
cap = cv2.VideoCapture(rtsp_url)
# Check if the VideoCapture object was successfully created
if not cap.isOpened():
print("Error: Could not open RTSP stream.")
exit()
# Create a VideoWriter object for the output MP4 file
writer = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), 30, (640, 480))
# Create a FFmpeg instance for RTMP streaming
ffmpeg_process = ffmpeg.input(rtsp_url).output(rtmp_url, vcodec='libx264', acodec='aac').run_async(pipe_stdin=True)
# Start the RTMP streaming
ffmpeg_process.stdin.write(b'G')
# Initialize the start time and the current file duration
start_time = time.time()
current_duration = 0
# Loop over the frames from the RTSP stream
while True:
# Read the next frame from the RTSP stream
ret, frame = cap.read()
# Check if the frame was successfully read
if not ret:
break
# Write the frame to the output MP4 file
writer.write(frame)
# Update the current file duration
current_duration = time.time() - start_time
# Check if the current file duration is greater than 5 minutes
if current_duration > 300:
# Close the current MP4 file
writer.release()
# Create a new MP4 file
output_file = 'output_{}.mp4'.format(int(time.time()))
writer = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), 30, (640, 480))
# Reset the start time and the current file duration
start_time = time.time()
current_duration = 0
# Release the VideoCapture object
cap.release()
# Close the output MP4 file
writer.release()
# Stop the RTMP streaming
ffmpeg_process.stdin.write(b'q')
ffmpeg_process.wait()
# Delete the MP4 files that are older than 5 minutes
for file in os.listdir('.'):
if file.endswith('.mp4'):
file_time = os.path.getmtime(file)
if time.time() - file_time > 300:
os.remove(file)
```
代码
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path = ".env",override = True)
import requests
import json
import time
import sys,traceback
from io import StringIO
import re
class BaiduLLMs(object):
    '''
    Client for the Baidu Qianfan (wenxinworkshop) chat REST endpoints.

    ``models_map`` maps a display name to the endpoint path suffix.
    https://cloud.baidu.com/doc/WENXINWORKSHOP/s/clntwmv7t
    '''

    def __init__(self):
        # Order is significant: models() iterates in insertion order.
        self.models_map = {
            'Baidu-ERNIE-Bot-4': "chat/completions_pro",
            'Baidu-ERNIE-Bot-8K': "chat/ernie_bot_8k",
            'Baidu-ERNIE-Bot': "chat/completions",
            'Baidu-ERNIE-Bot-turbo': "chat/eb-instant",
            'Baidu-ERNIE-Bot-turbo-AI': "chat/ai_apaas",
            'Baidu-BLOOMZ-7B': "chat/bloomz_7b1",
            'Baidu-Qianfan-BLOOMZ-7B-compressed': "chat/qianfan_bloomz_7b_compressed",
            'Baidu-Llama-2-7b-chat': "chat/llama_2_7b",
            'Baidu-Llama-2-13b-chat': "chat/llama_2_13b",
            'Baidu-Llama-2-70b-chat': "chat/llama_2_70b",
            'Baidu-Qianfan-Chinese-Llama-2-7B': "chat/qianfan_chinese_llama_2_7b",
            'Baidu-Qianfan-Chinese-Llama-2-13B': "chat/qianfan_chinese_llama_2_13b",
            'Baidu-ChatGLM2-6B-32K': "chat/chatglm2_6b_32k",
            'Baidu-XuanYuan-70B-Chat-4bit': "chat/xuanyuan_70b_chat",
            'Baidu-AquilaChat-7B': "chat/aquilachat_7b",
        }

    def get_access_token(self):
        """Exchange ``BAIDU_API_KEY``/``BAIDU_SECRET_KEY`` for an access token.

        Returns the token as a string; a missing ``access_token`` field in
        the reply yields the string ``"None"``.
        """
        url = "https://aip.baidubce.com/oauth/2.0/token"
        params = {"grant_type": "client_credentials",
                  "client_id": os.getenv("BAIDU_API_KEY"),
                  "client_secret": os.getenv("BAIDU_SECRET_KEY")}
        # Bounded wait so a stalled token endpoint cannot hang the script.
        return str(requests.post(url, params=params, timeout=30).json().get("access_token"))

    def models(self):
        """Return the display names of all configured models."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to ``model_name``.

        Returns ``(model_name, answer)`` on success, or ``None`` when the
        model name is unknown or both attempts fail.
        """
        if model_name not in self.models_map:
            return None
        endpoint = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/{}?access_token={}"
        payload = json.dumps({
            "messages": [
                {
                    "role": "user",
                    "content": question
                }
            ],
            "disable_search": False,
            "enable_citation": False,
            "stream": False,
            "temperature": 0.1,
            "top_p": 0.85,
            "penalty_score": 1
        })
        headers = {
            'Content-Type': 'application/json'
        }
        max_attempts = 2
        for attempt in range(max_attempts):
            try:
                # The token is fetched inside the try so a failed token
                # request is retried instead of escaping to the caller.
                url = endpoint.format(self.models_map[model_name], self.get_access_token())
                response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
                if response.status_code == 200:
                    # A reply without "result" raises KeyError and counts as a failed attempt.
                    return model_name, response.json()["result"]
                print("请求失败,状态码:", response.status_code)
                print("请求失败,body:", response.text)
            except Exception:
                print(traceback.format_exc(), model_name)
            if attempt + 1 < max_attempts:
                time.sleep(10)
        return None
question=u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''
infer = BaiduLLMs()
# Query one fixed model. The earlier loop over infer.models() never used its
# loop variable, and request() already retries internally.
result = infer.request(question, 'Baidu-ERNIE-Bot-8K')
if result:
    name, content = result
    # Explicit UTF-8: the answer normally contains Chinese text, which the
    # platform default encoding may be unable to encode.
    with open("{}.txt".format(name), "w", encoding="utf-8") as f:
        f.write(content)
运行
conda create -n llm python=3.7
conda activate llm
pip install python-dotenv requests
python llm.py
输出
要实现上述功能,我们需要使用一些Python库,如`ffmpeg`和`rtmpdump`。首先,确保你已经安装了这些库。如果没有,你可以使用pip来安装:
```bash
pip install ffmpeg-python
```
以下是一个简单的Python脚本,它实现了你描述的功能:
```python
import ffmpeg
import os
import time
def save_to_mp4(input_stream, output_file, duration):
if duration > 5 * 60: # 超过5分钟
# 遇到关键帧时关闭文件
if input_stream.info().key_frame == 1:
input_stream.close()
return
input_stream.write_to(output_file)
def process_rtsp(rtsp_url, output_dir):
input_stream = ffmpeg.input(rtsp_url)
video, audio = ffmpeg.select_streams(input_stream, 'v', 'a')
duration = 0
start_time = time.time()
while True:
packet = video.read()
if not packet: # 读取完毕
break
duration += time.time() - start_time
start_time = time.time()
filename = os.path.join(output_dir, f"output_{int(time.time())}.mp4")
save_to_mp4(video, filename, duration)
# 将音视频推送到rtmp服务器
ffmpeg.input(filename).output('rtmp://127.0.0.1/live/stream', format='flv').run()
os.remove(filename) # 删除已处理文件
if __name__ == "__main__":
rtsp_url = "你的RTSP地址" # 替换为你的RTSP地址
output_dir = "输出目录" # 替换为你希望保存MP4文件的目录
process_rtsp(rtsp_url, output_dir)
```
注意:这个脚本使用了`ffmpeg-python`库来处理RTSP流,并使用`ffmpeg`命令行工具来处理MP4文件和推送到RTMP服务器。确保你的系统上已经安装了`ffmpeg`。此外,这个脚本会持续读取RTSP流,并将数据保存为MP4文件,然后推送到RTMP服务器。当文件时长超过5分钟且遇到关键帧时,它会关闭文件并创建新文件。最后,它删除了已处理文件以节省空间。
代码
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path = ".env",override = True)
import requests
import json
import time
import sys,traceback
from io import StringIO
import re
class AliLLMs(object):
    '''
    Client for Alibaba DashScope text-generation models via the ``dashscope`` SDK.

    ``models_map`` maps a display name to the DashScope model id.
    https://help.aliyun.com/zh/dashscope/developer-reference/api-details?spm=a2c4g.11186623.0.i2
    '''

    # Models called with a bare prompt; their answer is read from output['text'].
    _TEXT_OUTPUT_MODELS = ("Ali-chatglm-6b-v2", "Ali-baichuan-7b-v1", "Ali-dolly-12b-v2")
    # Subset of the above that is called without a `history` argument.
    _NO_HISTORY_MODELS = ("Ali-baichuan-7b-v1", "Ali-dolly-12b-v2")

    def __init__(self):
        # Order is significant: models() iterates in insertion order.
        self.models_map = {
            'Ali-qwen-turbo': "qwen-turbo",
            'Ali-qwen-plus': "qwen-plus",
            'Ali-qwen-max': "qwen-max",
            'Ali-qwen-1.8b-chat': "qwen-1.8b-chat",
            'Ali-qwen-7b-chat': "qwen-7b-chat",
            'Ali-qwen-14b-chat': "qwen-14b-chat",
            'Ali-qwen-72b-chat': "qwen-72b-chat",
            'Ali-chatglm-6b-v2': "chatglm-6b-v2",
            'Ali-chatglm3-6b': "chatglm3-6b",
            'Ali-baichuan-7b-v1': "baichuan-7b-v1",
            'Ali-baichuan2-7b-chat-v1': "baichuan2-7b-chat-v1",
        }

    def models(self):
        """Return the display names of all configured models."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to ``model_name``.

        Returns ``(model_name, answer)`` on success, or ``None`` when the
        model name is unknown or both attempts fail. The API key is read from
        the ``DASHSCOPE_API_KEY`` environment variable.
        """
        # Validate first so an unknown name never imports the SDK.
        if model_name not in self.models_map:
            print("not find")
            return None
        import dashscope
        from http import HTTPStatus
        dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
        model_id = self.models_map[model_name]
        text_output = model_name in self._TEXT_OUTPUT_MODELS
        max_attempts = 2
        for attempt in range(max_attempts):
            try:
                if text_output:
                    call_args = {"model": model_id, "prompt": question}
                    if model_name not in self._NO_HISTORY_MODELS:
                        call_args["history"] = []
                    response = dashscope.Generation.call(**call_args)
                else:
                    response = dashscope.Generation.call(
                        model=model_id,
                        prompt=question,
                        seed=1234,
                        top_p=0.85,
                        top_k=4,
                        result_format='message',
                        enable_search=False,
                        temperature=0.1,
                        repetition_penalty=1.0,
                        stream=False
                    )
                if response.status_code == HTTPStatus.OK:
                    if text_output:
                        return model_name, response.output['text']
                    return model_name, response.output.choices[0]['message']['content']
                # Both call styles report response.message as the error text.
                print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
                    response.request_id, response.status_code,
                    response.code, response.message
                ))
            except Exception:
                print(traceback.format_exc())
            if attempt + 1 < max_attempts:
                time.sleep(10)
        return None
question=u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''
infer = AliLLMs()
# Query one fixed model. The earlier loop over infer.models() never used its
# loop variable, and request() already retries internally.
result = infer.request(question, 'Ali-qwen-max')
if result:
    name, content = result
    # Explicit UTF-8: the answer normally contains Chinese text, which the
    # platform default encoding may be unable to encode.
    with open("{}.txt".format(name), "w", encoding="utf-8") as f:
        f.write(content)
运行
conda create -n llm python=3.7
conda activate llm
pip install python-dotenv requests dashscope
python llm.py
输出
在Python中,你可以使用OpenCV和moviepy库来实现这个需求。但是需要注意的是,OpenCV并不直接支持RTSP流的音频保存,所以这里只处理视频部分。对于音频部分,你可能需要结合其他库如gstreamer或者ffmpeg进行更深度的开发。以下是一个简化的示例:
```python
import cv2
import time
from moviepy.editor import VideoFileClip
# 1. 接收rtsp音视频数据
cap = cv2.VideoCapture('rtsp://your_rtsp_stream')
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 使用mp4v编解码器
current_file_index = 0
start_time = time.time()
while True:
ret, frame = cap.read()
if not ret:
break
# 2. 将视频保存为mp4格式
current_time = time.time()
duration = current_time - start_time
if duration > 300: # 当超过5分钟(300秒)
if cv2.waitKey(1) & 0xFF == ord('q'): # 检查是否按下'q'键退出
break
# 关闭当前文件并创建新文件
out.release()
current_file_index += 1
file_name = f"output_{current_file_index}.mp4"
out = cv2.VideoWriter(file_name, fourcc, 20.0, (frame.shape[1], frame.shape[0])) # 假设帧率为20fps
start_time = current_time
# 写入视频帧到当前的VideoWriter对象
out.write(frame)
# 在这里添加关键帧检测逻辑,但OpenCV并没有直接提供关键帧检测API,
# 通常需要通过FFmpeg或其他方式解析H.264等编码的NALU单元获取关键帧信息。
# 释放资源
cap.release()
out.release()
# 3. 将音视频推送到rtmp://127.0.0.1/live
# 这部分较为复杂,通常会借助ffmpeg命令行工具完成,Python中可以调用os.system执行命令:
# os.system("ffmpeg -re -i output.mp4 -c copy -f flv rtmp://127.0.0.1/live")
# 若要在Python中完全控制推流过程,可以使用诸如libav或pyav这样的库,但这超出了本问题的范围。
```
注意:上述代码仅处理了视频录制与分段存储的部分,并未涉及音频及RTMP推送,实际项目中可能需要配合FFmpeg或其他工具完成全部功能。
代码
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path = ".env",override = True)
import requests
import json
import time
import sys,traceback
from io import StringIO
import re
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket #
# Module-level buffer for the streamed reply: on_message appends each received
# fragment to it, and XfLLMs.request resets it before a call and returns it after.
answer = ""
class Ws_Param(object):
    """Holds the Spark credentials and builds the signed websocket URL."""

    # Initialise
    def __init__(self, APPID, APIKey, APISecret, Spark_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        parsed = urlparse(Spark_url)
        self.host = parsed.netloc
        self.path = parsed.path
        self.Spark_url = Spark_url

    # Build the URL
    def create_url(self):
        """Return ``Spark_url`` with ``authorization``, ``date`` and ``host`` query parameters."""
        # RFC 1123 formatted timestamp
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # String to sign: host line, date line and request line
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        # Sign with HMAC-SHA256 keyed by the API secret
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # Collect the authentication parameters
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        # Append them to the base URL as a query string
        url = self.Spark_url + '?' + urlencode(v)
        return url
# Websocket error callback
def on_error(ws, error):
    """Print the error reported for the connection ``ws``."""
    print("### error:", error)
# Websocket close callback
def on_close(ws, one, two):
    """Print a single space when the connection closes; the arguments are ignored."""
    print(" ")
# Websocket connection-established callback
def on_open(ws):
    """Start ``run(ws)`` on a new thread once the connection is open."""
    thread.start_new_thread(run, (ws,))
def run(ws, *args):
    """Build the request from the attributes stored on ``ws`` and send it as JSON."""
    data = json.dumps(gen_params(appid=ws.appid, domain=ws.domain, question=ws.question))
    ws.send(data)
# Websocket message callback
def on_message(ws, message):
    """Append the fragment in ``message`` to the global ``answer``.

    Closes ``ws`` when the header code is non-zero (after printing the
    error) or when the choices status is 2.
    """
    global answer
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
        return
    choices = data["payload"]["choices"]
    status = choices["status"]
    content = choices["text"][0]["content"]
    answer += content
    # Status 2 closes the connection; presumably it marks the final fragment -- confirm against the Spark docs.
    if status == 2:
        ws.close()
def gen_params(appid, domain, question):
    """
    Build the request payload from the app id, the model domain and the question.

    ``question`` is placed unchanged under ``payload.message.text``.
    """
    data = {
        "header": {
            "app_id": appid,
            "uid": "1234"
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "temperature": 0.1,
                "top_k": 4,
                "max_tokens": 2048,
                "auditing": "default"
            }
        },
        "payload": {
            "message": {
                "text": question
            }
        }
    }
    return data
def SparkAPI(appid, api_key, api_secret, Spark_url, domain, question):
    """Open a signed websocket to ``Spark_url`` and block until it closes.

    The app id, question and domain are attached to the websocket object so
    the callbacks can read them.
    """
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.question = question
    ws.domain = domain
    # NOTE(review): certificate verification is switched off here; that would
    # matter if a TLS (wss://) URL is passed in.
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
class XfLLMs(object):
    '''
    Client for the iFlytek Spark websocket chat API.

    ``models_map`` maps a display name to a dict with the model ``domain``
    and the websocket ``url``.
    https://www.xfyun.cn/doc/spark/Web.html#_1-接口说明
    '''

    def __init__(self):
        self.models_map = {
            'Xf-Spark-v1.5': {"domain": "general", "url": "ws://spark-api.xf-yun.com/v1.1/chat"},
            'Xf-Spark-v3.1': {"domain": "generalv3", "url": "ws://spark-api.xf-yun.com/v3.1/chat"},
        }

    def models(self):
        """Return the display names of all configured models."""
        return self.models_map.keys()

    def _get_text(self, role, content):
        """Return one chat message dict with the given role and content."""
        return {"role": role, "content": content}

    def _get_length(self, text):
        """Return the total length of the ``content`` fields in ``text``."""
        return sum(len(message["content"]) for message in text)

    def _check_len(self, text):
        """Drop messages from the front of ``text`` (in place) until the total length is at most 8000."""
        while self._get_length(text) > 8000:
            del text[0]
        return text

    def request(self, question, model_name):
        """Send ``question`` to ``model_name``.

        Returns ``(model_name, answer)`` on success, or ``None`` when the
        model name is unknown or both attempts produce no answer.
        Credentials are read from ``APP_ID``, ``API_SECRET`` and ``API_KEY``.
        """
        global answer
        if model_name not in self.models_map:
            print("not find")
            return None
        appid = os.getenv("APP_ID")
        api_secret = os.getenv("API_SECRET")
        api_key = os.getenv("API_KEY")
        target = self.models_map[model_name]
        max_attempts = 2
        for attempt in range(max_attempts):
            try:
                # Build the message list under its own name: rebinding
                # `question` would make a retry wrap the list inside itself.
                messages = self._check_len([self._get_text("user", question)])
                answer = ""
                SparkAPI(
                    appid,
                    api_key,
                    api_secret,
                    target['url'],
                    target['domain'],
                    messages,
                )
                # API errors are only printed by the callbacks, so an empty
                # buffer is the sole sign of failure here.
                if answer:
                    return model_name, answer
            except Exception:
                print(traceback.format_exc())
            if attempt + 1 < max_attempts:
                time.sleep(10)
        return None
question=u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''
infer = XfLLMs()
# Query one fixed model. The earlier loop over infer.models() never used its
# loop variable, and request() already retries internally.
result = infer.request(question, 'Xf-Spark-v3.1')
if result:
    name, content = result
    # Explicit UTF-8: the answer normally contains Chinese text, which the
    # platform default encoding may be unable to encode.
    with open("{}.txt".format(name), "w", encoding="utf-8") as f:
        f.write(content)
运行
conda create -n llm python=3.7
conda activate llm
pip install python-dotenv requests websocket-client
python llm.py
输出
```python
import cv2
import numpy as np
import time
import subprocess
def receive_rtsp_data(rtsp_url):
cap = cv2.VideoCapture(rtsp_url)
return cap
def save_video_to_mp4(cap, output_path):
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, 30.0, (int(cap.get(3)), int(cap.get(4))))
return out
def process_video(cap, out):
frame_count = 0
start_time = time.time()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_count += 1
out.write(frame)
if frame_count % 1800 == 0: # Check every 3 seconds for keyframe
keyframe_detected = detect_keyframe(frame)
if keyframe_detected:
out.release()
break
current_time = time.time()
if current_time - start_time > 5 * 60: # 5 minutes
out.release()
break
def detect_keyframe(frame):
# This is a simple example of keyframe detection using color histogram
hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
hist = cv2.normalize(hist, hist).flatten()
return np.max(hist) > 255 / 4
def push_to_rtmp(output_path):
cmd = f"ffmpeg -re -i {output_path} -c copy -f flv rtmp://127.0.0.1/live"
subprocess.call(cmd, shell=True)
if __name__ == "__main__":
rtsp_url = "rtsp://your_rtsp_url"
output_path = "output.mp4"
cap = receive_rtsp_data(rtsp_url)
out = save_video_to_mp4(cap, output_path)
process_video(cap, out)
push_to_rtmp(output_path)
cap.release()
out.release()
```
代码
import os
from dotenv import load_dotenv
load_dotenv(dotenv_path = ".env",override = True)
import requests
import json
import time
import sys,traceback
from io import StringIO
import re
class BaichuanLLMs(object):
    '''
    Client for the Baichuan chat-completions REST API.

    ``models_map`` maps a display name to the API model id.
    https://platform.baichuan-ai.com/docs/api
    '''

    def __init__(self):
        self.models_map = {
            'Baichuan2-Turbo': "Baichuan2-Turbo",
            'Baichuan2-Turbo-192k': "Baichuan2-Turbo-192k",
        }

    def models(self):
        """Return the display names of all configured models."""
        return self.models_map.keys()

    def request(self, question, model_name):
        """Send ``question`` to ``model_name``.

        Returns ``(model_name, answer)`` on success, or ``None`` when the
        model name is unknown or both attempts fail. The API key is read from
        the ``BAICHUAN_KEY`` environment variable.
        """
        if model_name not in self.models_map:
            return None
        url = "https://api.baichuan-ai.com/v1/chat/completions"
        api_key = os.getenv("BAICHUAN_KEY")
        data = {
            "model": self.models_map[model_name],
            "messages": [
                {
                    "role": "user",
                    "content": question
                }
            ],
            "stream": False,
            "top_p": 0.85,
            "top_k": 4,
            "temperature": 0.1,
            "with_search_enhance": False
        }
        json_data = json.dumps(data)
        headers = {
            "Content-Type": "application/json",
            # f-string rather than "+": an unset key must not raise TypeError
            # before the request is even attempted.
            "Authorization": f"Bearer {api_key}"
        }
        max_attempts = 2
        for attempt in range(max_attempts):
            try:
                response = requests.post(url, data=json_data, headers=headers, timeout=60)
                if response.status_code == 200:
                    return model_name, response.json()['choices'][0]['message']["content"]
                print("请求失败,状态码:", response.status_code)
                print("请求失败,body:", response.text)
            except Exception:
                print(traceback.format_exc())
            if attempt + 1 < max_attempts:
                time.sleep(10)
        return None
question=u'''
采用python实现以下功能:
1.接收rtsp音视频数据
2.将音视频保存为mp4格式
3.当mp4文件的时长超过5分钟,且遇到关键帧时,关闭该文件,创建一个新文件
4.将音视频推送到rtmp://127.0.0.1/live
'''
infer = BaichuanLLMs()
# Query one fixed model. The earlier loop over infer.models() never used its
# loop variable, and request() already retries internally.
result = infer.request(question, 'Baichuan2-Turbo')
if result:
    name, content = result
    # Explicit UTF-8: the answer normally contains Chinese text, which the
    # platform default encoding may be unable to encode.
    with open("{}.txt".format(name), "w", encoding="utf-8") as f:
        f.write(content)
运行
conda create -n llm python=3.7
conda activate llm
pip install python-dotenv requests
python llm.py
输出
{"error":{"code":"insufficient_quota","param":null,"type":"insufficient_quota","message":"You exceeded your current quota, please check your plan and billing details."}}
powered by kaifamiao