Browse Source

Merge branch 'develop/apikey_and_agent_v1' into debug

* develop/apikey_and_agent_v1:
  提供最新的compapi_asynctask表结构
  重构了mismatch类型的回调格式,修补了一个数据库类型bug
boweniac 2 weeks ago
parent
commit
d40de35342

+ 5 - 4
crontask/compapi_callback.go

@@ -234,8 +234,10 @@ func (l *CronTask) processTask(workerID int, task Task) {
 			//收集错误
 			if rt == 0 {
 				//不可恢复错误处理....
+				logx.Errorf("Task error by '%s'", err)
+			} else {
+				logx.Debugf("Task ignore by '%s'", err)
 			}
-			logx.Debugf("Task ignore by '%s'", err)
 			return //先暂时忽略处理,也许应按错误类型分别对待
 		}
 	}
@@ -261,7 +263,7 @@ func (l *CronTask) requestCallback(taskData *ent.CompapiAsynctask) (int, error)
 
 	/*
 		fstr := "mytest-svc:"
-		if taskData.RetryCount > 0 && strings.Contains(taskData.CallbackURL, fstr) {
+		if taskData.RetryCount >= 0 && strings.Contains(taskData.CallbackURL, fstr) {
 			taskData.CallbackURL = strings.Replace(taskData.CallbackURL, fstr, "0.0.0.0:", 1)
 		}
 	*/
@@ -277,8 +279,7 @@ func (l *CronTask) requestCallback(taskData *ent.CompapiAsynctask) (int, error)
 	//初始化client
 	client := compapi.NewClient(l.ctx)
 	for i := range LoopTryCount { //重试机制
-
-		res, err = client.Callback(taskData.EventType, taskData.CallbackURL, taskData.ResponseRaw)
+		res, err = client.Callback(taskData.EventType, taskData.CallbackURL, taskData)
 		//_, err = client.Chat.Completions.New(l.ctx, emptyParams, opts...)
 		if err == nil {
 			//call succ

+ 48 - 3
internal/logic/chat/chat_completions_logic.go

@@ -109,6 +109,17 @@ func (l *ChatCompletionsLogic) ChatCompletions(req *types.CompApiReq) (asyncMode
 	if err != nil {
 		return false, nil, err
 	}
+	/*
+		switch wf.(type) {
+		case *MismatchChatLogic:
+			fmt.Println("MismatchChatLogic Flow.....")
+		case *FastgptChatLogic:
+			fmt.Println("FastgptChatLogic Flow.....")
+		default:
+			fmt.Println("Other Flow.....")
+		}
+	*/
+
 	//微调部分请求参数
 	wf.AdjustRequest(req, apiKeyObj)
 
@@ -167,11 +178,45 @@ func (l *ChatCompletionsLogic) DoSyncRequest(apiKeyObj *ent.ApiKey, req *types.C
 	resp, err := compapi.NewClient(l.ctx, compapi.WithApiBase(apiKeyObj.Edges.Agent.APIBase),
 		compapi.WithApiKey(apiKeyObj.Edges.Agent.APIKey)).
 		Chat(req)
+
 	/*
+		if err != nil {
+			return nil, err
+		}
+		if req.EventType == "mismatch" {
+
+			client := compapi.MismatchClient{}
+			taskData := ent.CompapiAsynctask{}
+			taskData.ID = 1234
+			taskData.ResponseChatItemID = req.ResponseChatItemId
+			taskData.EventType = req.EventType
+			taskData.ChatID = req.ChatId
+			var err error
+			taskData.ResponseRaw, err = resp.ToString()
+			if err != nil {
+				fmt.Println(err)
+				return nil, err
+			}
+			var bs []byte
+			bs, err = client.CallbackPrepare(&taskData)
+			if err != nil {
+				fmt.Println(err)
+				return nil, err
+			}
+			fmt.Println(string(bs))
+			nres := map[string]string{}
+			err = json.Unmarshal(bs, &nres)
+			if err != nil {
+				fmt.Println(err)
+				return nil, err
+			}
+			fmt.Println(typekit.PrettyPrint(nres))
+
 			res := compapi.MismatchResponse{}
-		err = compapi.NewChatResult(resp).ParseContentAs(&res)
-		fmt.Println(err)
-		fmt.Println(typekit.PrettyPrint(res))
+			err = compapi.NewChatResult(resp).ParseContentAs(&res)
+			fmt.Println(err)
+			fmt.Println(typekit.PrettyPrint(res))
+		}
 	*/
 	return resp, err
 }

+ 23 - 3
internal/utils/compapi/mismatch.go

@@ -1,6 +1,9 @@
 package compapi
 
 import (
+	"errors"
+	"fmt"
+	"wechat-api/ent"
 	"wechat-api/internal/types"
 
 	"github.com/openai/openai-go"
@@ -80,10 +83,27 @@ func (me *MismatchClient) BuildRequest(req *types.CompApiReq) error {
 }
 
 func (me *MismatchClient) CallbackPrepare(params any) ([]byte, error) {
-	res := MismatchResponse{}
-	err := NewChatResult(params).ParseContentAs(&res)
+	taskData, ok := params.(*ent.CompapiAsynctask)
+	if !ok {
+		return nil, errors.New("invalid callback taskdata")
+	}
+	type OutResult struct {
+		InternalID string `json:"internal_id"`
+		ExternalID string `json:"external_id"`
+		ChatID     string `json:"chat_id"`
+		EventType  string `json:"event_type"`
+		Content    string `json:"content"`
+	}
+
+	res := OutResult{}
+	res.InternalID = fmt.Sprintf("%d", taskData.ID)
+	res.ExternalID = taskData.ResponseChatItemID
+	res.EventType = taskData.EventType
+	res.ChatID = taskData.ChatID
+	var err error
+	res.Content, err = NewChatResult(taskData.ResponseRaw).GetContentJsonStr()
 	if err != nil {
 		return nil, err
 	}
-	return WrapJSON(res, "data", true)
+	return WrapJSON(res, "", false)
 }

+ 26 - 10
internal/utils/compapi/result.go

@@ -41,6 +41,27 @@ func (r *ChatResult) GetContentString() (string, error) {
 	return content, err
 }
 
+func (r *ChatResult) GetContentJsonStr() (string, error) {
+	var (
+		content string = ""
+		err     error
+	)
+	if r.err == nil && len(r.Choices) > 0 {
+		content = r.Choices[0].Message.Content
+	} else if r.err == nil && len(r.Choices) == 0 {
+		err = errors.New("choices empty")
+	}
+	if !IsOpenaiModel(r.Model) { //不支持Response Schema的要特殊处理一下
+		var isJsonContent bool
+		content, isJsonContent = ExtractJSONContent(content)
+		if !isJsonContent {
+			return "", errors.New("invalid json content")
+		}
+	}
+
+	return content, err
+}
+
 // ParseContentAs 解析 Message Content 中的 JSON 到指定的 Go 结构体
 // target 必须是一个指向目标结构体实例的指针 (e.g., &MyStruct{})
 func (r *ChatResult) ParseContentAs(target any) error {
@@ -58,13 +79,7 @@ func (r *ChatResult) ParseContentAs(target any) error {
 			return errors.New("invalid json content")
 		}
 	}
-	err = json.Unmarshal([]byte(content), target)
-	if err != nil {
-		return fmt.Errorf("parseContent err: failed to unmarshal content JSON "+
-			"into target type '%w'", err)
-	}
-
-	return nil
+	return ParseContentAs(content, target, false)
 }
 
 func AnyToBytes(in any) ([]byte, error) {
@@ -162,9 +177,10 @@ func WrapJSON(input any, warpKey string, checkValid bool) ([]byte, error) {
 	return outputBytes, nil
 }
 
-func ParseContentAs(content string, target any) error {
-	// 清理可能的 Markdown ```json ``` 包装
-	if strings.HasPrefix(content, "```json") && strings.HasSuffix(content, "```") {
+func ParseContentAs(content string, target any, removeJsonBlock bool) error {
+
+	if removeJsonBlock &&
+		strings.HasPrefix(content, "```json") && strings.HasSuffix(content, "```") {
 		content = strings.TrimSuffix(strings.TrimPrefix(content, "```json"), "```")
 		content = strings.TrimSpace(content)
 	}

+ 6 - 1
internal/utils/compapi/std.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
+	"wechat-api/ent"
 	"wechat-api/internal/types"
 
 	"github.com/openai/openai-go"
@@ -15,7 +16,11 @@ type StdClient struct {
 }
 
 func (me *StdClient) CallbackPrepare(params any) ([]byte, error) {
-	return AnyToBytes(params)
+	taskData, ok := params.(*ent.CompapiAsynctask)
+	if !ok {
+		return nil, errors.New("invalid callback taskdata")
+	}
+	return AnyToBytes(taskData.ResponseRaw)
 }
 
 func (me *StdClient) DoRequestBS(req []byte,

+ 5 - 4
sql/wechat/compapi_asynctask.sql

@@ -4,7 +4,7 @@
 -- https://tableplus.com/
 --
 -- Database: wechat
--- Generation Time: 2025-04-13 16:07:49.8430
+-- Generation Time: 2025-04-16 15:43:45.0380
 -- -------------------------------------------------------------
 
 
@@ -23,7 +23,8 @@ CREATE TABLE `compapi_asynctask` (
   `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create Time | 任务创建日期',
   `auth_token` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '发起请求者的授权token',
   `event_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'fastgpt' COMMENT '请求目标类型',
-  `chat_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '会话ID',
+  `chat_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '' COMMENT '会话ID',
+  `response_chat_item_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '',
   `organization_id` bigint unsigned DEFAULT '1' COMMENT '机构 ID',
   `openai_base` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '大模型服务地址',
   `openai_key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '' COMMENT '大模型服务密钥',
@@ -35,10 +36,10 @@ CREATE TABLE `compapi_asynctask` (
   `task_status` tinyint NOT NULL DEFAULT '10' COMMENT 'callback status |任务完成状态 10 任务就绪 20 请求API完成 30 请求回调完成 50 任务全部完成 60 任务暂停 70 任务失败 ',
   `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'updated Time | 状态更新时间',
   `retry_count` tinyint DEFAULT '0' COMMENT 'retry count | 重试次数',
-  `last_error` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '最后一次错误信息',
+  `last_error` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT '最后一次错误信息',
   PRIMARY KEY (`id`),
   KEY `task_status` (`task_status`) USING BTREE
-) ENGINE=InnoDB AUTO_INCREMENT=816 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+) ENGINE=InnoDB AUTO_INCREMENT=873 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;