# API调用
发布后,可以通过API访问
# 创建API秘钥
注意:每一个API秘钥都和本APP一一对应,即通过不同的API秘钥来访问不同的APP。

# 调用API
可以查看平台上API文档了解详情。这里给一些使用示例。请替换示例中的API秘钥。
# 示例1:调用一次chatflow会话
阻塞模式,即等待所有结果,一次性返回:
# 示例2:调用chatflow时需要传递文件或图片
比如chatflow中自定义了一个输入参数叫fileInput,下面是使用例子
对于远程url文件,可以这样子调用:
import requests
url = 'https://llm.huya.info/v1/chat-messages'
text = 'Hello world' # 你的输入
header = {'Authorization': 'Bearer 你的API秘钥', 'Content-Type': 'application/json'}
data_json = {
'inputs':
'fileInput': { # 这个是你平台上自定义的输入名字
'type': 'image', # 这个是文件类型,比如image,document,audio,video,custom
'transfer_method': 'remote_url', # 远程url固定这个
'url': 'your url', # 你的 url,比如https://example.com/abc.png
},
},
'query': text,
'response_mode': 'blocking',
'conversation_id': "",
'user': 'test_user'
}
response = requests.post(url, headers=header, json=data_json, timeout=20)
print(response.json())
对于本地文件,可以这样子调用:
# TODO带补充这个例子代码
# 需要先调用文件上传接口/files/upload,拿到upload_file_id,然后
'inputs': {
'my_file': {
'type': 'image or document or audio等',
'transfer_method': 'local_file',
'upload_file_id': 'your upload_file_id',
},
},
# 示例3:调用一次workflow运行
import os
import requests
url = 'https://llm.huya.info/v1/workflows/run'
headers = {
'Authorization': 'Bearer your_token_here', # 从环境变量读取
'Content-Type': 'application/json'
}
data = {
"inputs": {},# 这里写输入变量,如:{"test_input": "asdfsff"}
"response_mode": "blocking", # 阻塞模型请求,也支持流失
"user": "user_name"
}
response = requests.post(url, json=data, headers=headers)
print("Status Code:", response.status_code)
print("Response Body:", response.json())
# 示例4:调用workflow时需要传递文件
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"log"
"path/filepath"
)
type UploadResponse struct {
ID string `json:"id"`
}
func main() {
token := "app-xxx" // 替换为实际的API秘钥
// 1. 上传文件
filePath := "/Users/mac/Downloads/test.txt" // 替换为实际文件路径
fileType := "document" // 替换为实际文件类型,如 document, image, audio, video, custom
fileID, err := uploadFile(token, filePath)
if err != nil {
log.Printf("上传文件失败: %v\n", err)
return
}
log.Printf("上传成功,文件ID: %s\n", fileID)
// 2. 运行工作流
inputs := map[string]interface{}{
// 请将test_file替换为您的工作流中实际的输入文件名称
"test_file": map[string]interface{}{
"transfer_method": "local_file", // 如果是远程URL,请参考文档参数修改
"upload_file_id": fileID,
"type": fileType,
},
// 这里可以继续添加其它变量
}
outputs, err := runWorkflow(token, inputs)
if err != nil {
log.Printf("运行工作流失败: %v\n", err)
return
}
log.Println("工作流运行成功")
log.Println("Outputs内容:")
for key, value := range outputs {
log.Printf("%s: %v\n", key, value)
}
}
func uploadFile(token string, filePath string) (string, error) {
form := new(bytes.Buffer)
writer := multipart.NewWriter(form)
fw, err := writer.CreateFormFile("file", filepath.Base(filePath))
if err != nil {
log.Fatal(err)
}
fd, err := os.Open(filePath)
if err != nil {
log.Fatal(err)
}
defer fd.Close()
_, err = io.Copy(fw, fd)
if err != nil {
log.Fatal(err)
}
formField, err := writer.CreateFormField("user")
if err != nil {
log.Fatal(err)
}
_, err = formField.Write([]byte("abc-123"))
writer.Close()
client := &http.Client{}
req, err := http.NewRequest("POST", "https://llm.huya.info/v1/files/upload", form)
if err != nil {
log.Fatal(err)
}
req.Header.Set("Authorization", "Bearer " + token)
req.Header.Set("Content-Type", writer.FormDataContentType())
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
var result UploadResponse
err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
return "", err
}
return result.ID, nil
}
func runWorkflow(token string, inputs map[string]interface{}) (map[string]interface{}, error) {
payload := map[string]interface{}{
"inputs": inputs,
"response_mode": "blocking",
"user": "abc-123",
}
jsonData, err := json.Marshal(payload)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", "https://llm.huya.info/v1/workflows/run", bytes.NewBuffer(jsonData))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer " + token)
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("发送消息失败,状态码: %d", resp.StatusCode)
}
bodyText, err := io.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
var responseData map[string]interface{}
err = json.Unmarshal(bodyText, &responseData)
if err != nil {
return nil, err
}
if data, ok := responseData["data"].(map[string]interface{}); ok {
if outputs, ok := data["outputs"].(map[string]interface{}); ok {
return outputs, nil
}
}
return nil, fmt.Errorf("未找到outputs数据")
}
# 扩展
# 如何异步调用Workflow?
如果workflow一次运行非常久,比如半小时,请求一直保持连接的稳定性可能会比较差,有时可能会被中断,这个时候可能需要异步请求,即请求运行后立刻中断连接,后续通过轮询方式获取结果输出。
当前仅支持Workflow异步调用,ChatFlow暂未支持
- 请求时,设置response_mode为async
curl --max-time 0.5 -X POST 'https://llm.huya.info/v1/workflows/run' \
--header 'Authorization: Bearer app-xxxx' \
--header 'Content-Type: application/json' \
--data-raw '{
"response_mode": "async",
"user": "user_name"
}'
请求后会立刻返回一个workflow_run_id,例如:{"workflow_run_id": "xxx"}
- 轮询状态接口获取运行状态和输出结果
curl -X GET 'https://llm.huya.info/v1/workflows/run/{workflow_run_id}' \
-H 'Authorization: Bearer app-xxxx' \
-H 'Content-Type: application/json'