package compapi import ( "context" "encoding/json" "errors" "fmt" "net/http" "reflect" "strings" "wechat-api/internal/types" "wechat-api/internal/utils/contextkey" openai "github.com/openai/openai-go" "github.com/openai/openai-go/option" "github.com/openai/openai-go/packages/ssestream" "github.com/zeromicro/go-zero/rest/httpx" ) func IsOpenaiModel(model string) bool { prefixes := []string{"gpt-4", "gpt-3", "o1", "o3"} // 遍历所有前缀进行检查 for _, prefix := range prefixes { if strings.HasPrefix(model, prefix) { return true } } return false } func EntStructGenScanField(structPtr any, ignoredTypes ...reflect.Type) (string, []any, error) { t := reflect.TypeOf(structPtr) v := reflect.ValueOf(structPtr) if t.Kind() != reflect.Ptr || t.Elem().Kind() != reflect.Struct { return "", nil, errors.New("input must be a pointer to a struct") } t = t.Elem() v = v.Elem() var fields []string var scanArgs []any ignoredMap := make(map[reflect.Type]struct{}) // 检查调用者是否传入了任何要忽略的类型 if len(ignoredTypes) > 0 { for _, ignoredType := range ignoredTypes { if ignoredType != nil { // 防止 nil 类型加入 map ignoredMap[ignoredType] = struct{}{} } } } for i := 0; i < t.NumField(); i++ { field := t.Field(i) value := v.Field(i) // Skip unexported fields if !field.IsExported() { continue } // Get json tag jsonTag := field.Tag.Get("json") if jsonTag == "-" || jsonTag == "" { continue } jsonParts := strings.Split(jsonTag, ",") jsonName := jsonParts[0] if jsonName == "" { continue } //传入了要忽略的类型时进行处理 if len(ignoredMap) > 0 { fieldType := field.Type //获取字段的实际 Go 类型 //如果字段是指针,我们通常关心的是指针指向的元素的类型 if fieldType.Kind() == reflect.Ptr { fieldType = fieldType.Elem() // 获取元素类型 } if _, shouldIgnore := ignoredMap[fieldType]; shouldIgnore { continue // 成员类型存在于忽略列表中则忽略 } } fields = append(fields, jsonName) scanArgs = append(scanArgs, value.Addr().Interface()) } return strings.Join(fields, ", "), scanArgs, nil } type StdChatClient struct { *openai.Client } func NewStdChatClient(apiKey string, apiBase string) *StdChatClient { opts := []option.RequestOption{} if len(apiKey) > 0 { opts = append(opts, option.WithAPIKey(apiKey)) } opts = append(opts, option.WithBaseURL(apiBase)) client := openai.NewClient(opts...) return &StdChatClient{&client} } func NewAiClient(apiKey string, apiBase string) *openai.Client { opts := []option.RequestOption{} if len(apiKey) > 0 { opts = append(opts, option.WithAPIKey(apiKey)) } opts = append(opts, option.WithBaseURL(apiBase)) client := openai.NewClient(opts...) return &client } func NewFastgptClient(apiKey string) *openai.Client { //http://fastgpt.ascrm.cn/api/v1/ client := openai.NewClient(option.WithAPIKey(apiKey), option.WithBaseURL("http://fastgpt.ascrm.cn/api/v1/")) return &client } func NewDeepSeekClient(apiKey string) *openai.Client { client := openai.NewClient(option.WithAPIKey(apiKey), option.WithBaseURL("https://api.deepseek.com")) return &client } func DoChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) { var ( jsonBytes []byte err error ) emptyParams := openai.ChatCompletionNewParams{} if jsonBytes, err = json.Marshal(chatInfo); err != nil { return nil, err } //fmt.Printf("In DoChatCompletions, req: '%s'\n", string(jsonBytes)) //也许应该对请求体不规范成员名进行检查 customResp := types.CompOpenApiResp{} reqBodyOps := option.WithRequestBody("application/json", jsonBytes) respBodyOps := option.WithResponseBodyInto(&customResp) if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps); err != nil { return nil, err } if customResp.FgtErrCode != nil && customResp.FgtErrStatusTxt != nil { //针对fastgpt出错但New()不返回错误的情况 return nil, fmt.Errorf("%s(%d)", *customResp.FgtErrStatusTxt, *customResp.FgtErrCode) } return &customResp, nil } func DoChatCompletionsStream(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) { var ( jsonBytes []byte raw *http.Response //raw []byte ok bool hw http.ResponseWriter ) hw, ok = contextkey.HttpResponseWriterKey.GetValue(ctx) //context取出http.ResponseWriter if !ok { return nil, errors.New("content get http writer err") } flusher, ok := (hw).(http.Flusher) if !ok { http.Error(hw, "Streaming unsupported!", http.StatusInternalServerError) } emptyParams := openai.ChatCompletionNewParams{} if jsonBytes, err = json.Marshal(chatInfo); err != nil { return nil, err } reqBodyOps := option.WithRequestBody("application/json", jsonBytes) respBodyOps := option.WithResponseBodyInto(&raw) if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps, option.WithJSONSet("stream", true)); err != nil { return nil, err } //设置流式输出头 http1.1 hw.Header().Set("Content-Type", "text/event-stream;charset=utf-8") hw.Header().Set("Connection", "keep-alive") hw.Header().Set("Cache-Control", "no-cache") chatStream := ssestream.NewStream[ApiRespStreamChunk](ApiRespStreamDecoder(raw), err) defer chatStream.Close() for chatStream.Next() { chunk := chatStream.Current() fmt.Fprintf(hw, "data:%s\n\n", chunk.Data.RAW) flusher.Flush() //time.Sleep(1 * time.Millisecond) } fmt.Fprintf(hw, "data:%s\n\n", "[DONE]") flusher.Flush() httpx.Ok(hw) return nil, nil } func NewChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) { if chatInfo.Stream { return DoChatCompletionsStream(ctx, client, chatInfo) } else { return DoChatCompletions(ctx, client, chatInfo) } } func NewMismatchChatCompletions(ctx context.Context, apiKey string, apiBase string, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) { client := NewAiClient(apiKey, apiBase) return NewChatCompletions(ctx, client, chatInfo) } func NewFastgptChatCompletions(ctx context.Context, apiKey string, apiBase string, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) { client := NewAiClient(apiKey, apiBase) return NewChatCompletions(ctx, client, chatInfo) } func NewDeepSeekChatCompletions(ctx context.Context, apiKey string, chatInfo *types.CompApiReq, chatModel openai.ChatModel) (res *types.CompOpenApiResp, err error) { client := NewDeepSeekClient(apiKey) if chatModel != ChatModelDeepSeekV3 { chatModel = ChatModelDeepSeekR1 } chatInfo.Model = chatModel return NewChatCompletions(ctx, client, chatInfo) } func DoChatCompletionsStreamOld(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) { var ( jsonBytes []byte ) emptyParams := openai.ChatCompletionNewParams{} if jsonBytes, err = json.Marshal(chatInfo); err != nil { return nil, err } reqBodyOps := option.WithRequestBody("application/json", jsonBytes) //customResp := types.CompOpenApiResp{} //respBodyOps := option.WithResponseBodyInto(&customResp) //chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps, respBodyOps) chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps) // optionally, an accumulator helper can be used acc := openai.ChatCompletionAccumulator{} httpWriter, ok := ctx.Value("HttpResp-Writer").(http.ResponseWriter) if !ok { return nil, errors.New("content get writer err") } //httpWriter.Header().Set("Content-Type", "text/event-stream;charset=utf-8") //httpWriter.Header().Set("Connection", "keep-alive") //httpWriter.Header().Set("Cache-Control", "no-cache") idx := 0 for chatStream.Next() { chunk := chatStream.Current() acc.AddChunk(chunk) fmt.Printf("=====>get %d chunk:%v\n", idx, chunk) if _, err := fmt.Fprintf(httpWriter, "%v", chunk); err != nil { fmt.Printf("Error writing to client:%v \n", err) break } if content, ok := acc.JustFinishedContent(); ok { println("Content stream finished:", content) } // if using tool calls if tool, ok := acc.JustFinishedToolCall(); ok { println("Tool call stream finished:", tool.Index, tool.Name, tool.Arguments) } if refusal, ok := acc.JustFinishedRefusal(); ok { println("Refusal stream finished:", refusal) } // it's best to use chunks after handling JustFinished events if len(chunk.Choices) > 0 { idx++ fmt.Printf("idx:%d get =>'%s'\n", idx, chunk.Choices[0].Delta.Content) } } if err := chatStream.Err(); err != nil { return nil, err } return nil, nil } func GetWorkInfoByID(eventType string, workId string) (string, uint) { val, exist := fastgptWorkIdMap[workId] if !exist { val = fastgptWorkIdMap["default"] } return val.Id, val.Idx } // 获取workToken func GetWorkTokenByID(eventType string, workId string) string { id, _ := GetWorkInfoByID(eventType, workId) return id } // 获取workIdx func GetWorkIdxByID(eventType string, workId string) uint { _, idx := GetWorkInfoByID(eventType, workId) return idx }