Browse Source

fix:add agent/data/upload

jimmyyem 4 months ago
parent
commit
af1a54a3e1

+ 14 - 0
desc/wechat/agent.api

@@ -276,8 +276,22 @@ type (
 	DeleteDataReq {
 		ID *string `json:"id" validate:"required"`
 	}
+
+	UploadDataReq {
+		AgentId *string `form:"agentId"`
+	}
+)
+
+@server (
+	group: agent
 )
 
+service Wechat {
+	// upload agent data | 上传data
+	@handler uploadAgentData
+	post /agent/data/upload () returns (BaseDataInfo)
+}
+
 @server(
     jwt: Auth
     group: agent

+ 19 - 0
hook/fastgpt/data.go

@@ -159,6 +159,25 @@ func CreateBulkData(data *CreateBulkDataReq) (response *CreateBulkDataResp, err
 	return
 }
 
+// DiyCreateBulkData 批量创建数据
+func DiyCreateBulkData(token string, data *CreateBulkDataReq) (response *CreateBulkDataResp, err error) {
+	resp, err := NewDiyResty(token).
+		R().
+		SetResult(&response).
+		SetBody(*data).
+		Post("core/dataset/data/pushData")
+
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.IsError() {
+		return nil, errors.New(resp.String())
+	}
+
+	return
+}
+
 // UpdateData 修改数据
 func UpdateData(data *UpdateDataReq) (response *DataResp, err error) {
 	resp, err := NewResty().

+ 72 - 0
internal/handler/agent/upload_agent_data_handler.go

@@ -0,0 +1,72 @@
+package agent
+
+import (
+	"errors"
+	"net/http"
+	"strconv"
+	"wechat-api/internal/types"
+
+	"github.com/zeromicro/go-zero/rest/httpx"
+
+	"wechat-api/internal/logic/agent"
+	"wechat-api/internal/svc"
+)
+
+const (
+	defaultMultipartMemory = 32 << 20 // 32 MB
+)
+
+// swagger:route post /agent/data/upload agent UploadAgentData
+//
+// upload agent data | 上传data
+//
+// upload agent data | 上传data
+//
+// Responses:
+//  200: BaseDataInfo
+
+func UploadAgentDataHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		if err := r.ParseMultipartForm(defaultMultipartMemory); err != nil {
+			httpx.Error(w, err)
+			return
+		}
+
+		l := agent.NewUploadAgentDataLogic(r.Context(), svcCtx)
+
+		file, header, err := r.FormFile("file")
+		agentIds, ok := r.Form["agentId"]
+
+		if err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+			return
+		}
+		if !ok || len(agentIds) != 1 {
+			httpx.ErrorCtx(r.Context(), w, errors.New("agentId cannot be null"))
+			return
+		}
+		if file == nil {
+			httpx.ErrorCtx(r.Context(), w, errors.New("upload file cannot be null"))
+			return
+		}
+		//l.Logger.Infof("header=%+v size=%d filename=%v\n", header.Header, header.Size, header.Filename)
+		if header.Header.Get("Content-Type") != "text/csv" {
+			httpx.ErrorCtx(r.Context(), w, errors.New("file format must be csv"))
+			return
+		}
+		agentId, _ := strconv.Atoi(agentIds[0])
+
+		var req types.UploadDataReq
+		if err := httpx.Parse(r, &req, true); err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+			return
+		}
+
+		resp, err := l.UploadAgentData(&req, file, uint64(agentId))
+		if err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+		} else {
+			httpx.OkJsonCtx(r.Context(), w, resp)
+		}
+	}
+}

+ 10 - 0
internal/handler/routes.go

@@ -137,6 +137,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
 	)
 
 	server.AddRoutes(
+		[]rest.Route{
+			{
+				Method:  http.MethodPost,
+				Path:    "/agent/data/upload",
+				Handler: agent.UploadAgentDataHandler(serverCtx),
+			},
+		},
+	)
+
+	server.AddRoutes(
 		rest.WithMiddlewares(
 			[]rest.Middleware{serverCtx.Authority},
 			[]rest.Route{

+ 91 - 0
internal/logic/agent/upload_agent_data_logic.go

@@ -0,0 +1,91 @@
+package agent
+
+import (
+	"context"
+	"encoding/csv"
+	"fmt"
+	"github.com/suyuan32/simple-admin-common/msg/errormsg"
+	"mime/multipart"
+	agentModel "wechat-api/ent/agent"
+	"wechat-api/hook/fastgpt"
+	"wechat-api/internal/utils/dberrorhandler"
+
+	"wechat-api/internal/svc"
+	"wechat-api/internal/types"
+
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type UploadAgentDataLogic struct {
+	logx.Logger
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewUploadAgentDataLogic(ctx context.Context, svcCtx *svc.ServiceContext) *UploadAgentDataLogic {
+	return &UploadAgentDataLogic{
+		Logger: logx.WithContext(ctx),
+		ctx:    ctx,
+		svcCtx: svcCtx}
+}
+
+func (l *UploadAgentDataLogic) UploadAgentData(req *types.UploadDataReq, file multipart.File, agentId uint64) (*types.BaseDataInfo, error) {
+	var count uint64
+
+	reader := csv.NewReader(file)
+	records, err := reader.ReadAll()
+	if err != nil {
+		return nil, err
+	}
+
+	fmt.Printf("req=%+v", *req)
+
+	agent, err := l.svcCtx.DB.Agent.Query().Where(agentModel.ID(agentId)).Only(l.ctx)
+	if err != nil {
+		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
+	}
+
+	//TODO
+	token := agent.CollectionID
+
+	var params fastgpt.CreateBulkDataReq
+	params.CollectionID = agent.CollectionID
+	params.TrainingMode = "chunk"
+
+	qas := make([]fastgpt.DataQuestion, 0, 100)
+	for idx, record := range records {
+		//l.Logger.Infof("idx=>%d 0=>%s 1=>%s\n", idx, record[0], record[1])
+		qas = append(qas, fastgpt.DataQuestion{
+			Q: record[0],
+			A: record[1],
+		})
+		if idx%100 == 0 && len(qas) > 0 {
+			params.Data = qas
+			//response, err := fastgpt.DiyCreateBulkData(token, &params)
+			response, err := fastgpt.CreateBulkData(&params)
+			if err != nil {
+				l.Logger.Errorf("batch insert data to fastgpt failed. collection=%s error=%s", agent.CollectionID, err.Error())
+				return nil, err
+			}
+			count += response.Data.InsertLen
+			qas = make([]fastgpt.DataQuestion, 0, 100)
+		}
+	}
+
+	if len(qas) > 0 {
+		params.Data = qas
+		response, err := fastgpt.DiyCreateBulkData(token, &params)
+		//response, err := fastgpt.CreateBulkData(&params)
+		if err != nil {
+			l.Logger.Errorf("batch insert data to fastgpt failed. collection=%s error=%s", agent.CollectionID, err.Error())
+			return nil, err
+		}
+		count += response.Data.InsertLen
+	}
+
+	resp := &types.BaseDataInfo{}
+	resp.Code = 0
+	resp.Msg = errormsg.Success
+	resp.Data = fmt.Sprintf("upload %d rows", count)
+	return resp, nil
+}

+ 5 - 0
internal/types/types.go

@@ -707,6 +707,11 @@ type DeleteDataReq struct {
 	ID *string `json:"id" validate:"required"`
 }
 
+// swagger:model UploadDataReq
+type UploadDataReq struct {
+	AgentId *string `form:"agentId"`
+}
+
 // ContactLabelList | Contact标签列表
 type ContactLabelList struct {
 	// label