package compapi

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"

	"wechat-api/internal/types"
	"wechat-api/internal/utils/contextkey"

	openai "github.com/openai/openai-go"
	"github.com/openai/openai-go/option"
	"github.com/openai/openai-go/packages/ssestream"
	"github.com/zeromicro/go-zero/rest/httpx"
)

// NewAiClient creates an OpenAI-compatible client for the given API key and base URL.
func NewAiClient(apiKey string, apiBase string) *openai.Client {
	return openai.NewClient(option.WithAPIKey(apiKey),
		option.WithBaseURL(apiBase))
}

// NewFastgptClient creates a client bound to the FastGPT endpoint.
func NewFastgptClient(apiKey string) *openai.Client {
	return openai.NewClient(option.WithAPIKey(apiKey),
		option.WithBaseURL("http://fastgpt.ascrm.cn/api/v1/"))
}

// NewDeepSeekClient creates a client bound to the DeepSeek API.
func NewDeepSeekClient(apiKey string) *openai.Client {
	return openai.NewClient(option.WithAPIKey(apiKey),
		option.WithBaseURL("https://api.deepseek.com"))
}

// DoChatCompletions performs a non-streaming chat completion. The request body
// is the JSON-serialized chatInfo, passed through verbatim, and the response is
// decoded into types.CompOpenApiResp.
func DoChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
	var (
		jsonBytes []byte
		err       error
	)
	// The params struct stays empty; the real body is injected via WithRequestBody below.
	emptyParams := openai.ChatCompletionNewParams{}
	if jsonBytes, err = json.Marshal(chatInfo); err != nil {
		return nil, err
	}
	customResp := types.CompOpenApiResp{}
	reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
	respBodyOps := option.WithResponseBodyInto(&customResp)
	if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps); err != nil {
		return nil, err
	}
	return &customResp, nil
}

// DoChatCompletionsStream performs a streaming chat completion and relays the
// upstream SSE chunks to the http.ResponseWriter stored in the context.
func DoChatCompletionsStream(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
	var (
		jsonBytes []byte
		raw       *http.Response
		ok        bool
		hw        http.ResponseWriter
	)
	// Fetch the http.ResponseWriter previously stashed in the context.
	hw, ok = contextkey.HttpResponseWriterKey.GetValue(ctx)
	if !ok {
		return nil, errors.New("failed to get http.ResponseWriter from context")
	}
	flusher, ok := hw.(http.Flusher)
	if !ok {
		http.Error(hw, "Streaming unsupported!", http.StatusInternalServerError)
		return nil, errors.New("http.ResponseWriter does not implement http.Flusher")
	}

	emptyParams := openai.ChatCompletionNewParams{}
	if jsonBytes, err = json.Marshal(chatInfo); err != nil {
		return nil, err
	}
	reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
	// Capture the raw *http.Response so the SSE body can be decoded manually below.
	respBodyOps := option.WithResponseBodyInto(&raw)
	if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps,
		option.WithJSONSet("stream", true)); err != nil {
		return nil, err
	}

	// Set SSE response headers (HTTP/1.1 streaming).
	hw.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
	hw.Header().Set("Connection", "keep-alive")
	hw.Header().Set("Cache-Control", "no-cache")

	chatStream := ssestream.NewStream[ApiRespStreamChunk](ApiRespStreamDecoder(raw), err)
	defer chatStream.Close()
	// Relay each upstream SSE chunk to the client as soon as it arrives.
	for chatStream.Next() {
		chunk := chatStream.Current()
		fmt.Fprintf(hw, "event:%s\ndata:%s\n\n", chunk.Event, chunk.Data.RAW)
		flusher.Flush()
	}
	// Signal the end of the stream to the client.
	fmt.Fprintf(hw, "event:%s\ndata:%s\n\n", "answer", "[DONE]")
	flusher.Flush()
	httpx.Ok(hw)
	return nil, nil
}

// NewChatCompletions dispatches to the streaming or non-streaming path
// depending on chatInfo.Stream.
func NewChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
	if chatInfo.Stream {
		return DoChatCompletionsStream(ctx, client, chatInfo)
	}
	return DoChatCompletions(ctx, client, chatInfo)
}

// NewFastgptChatCompletions builds a client for the given FastGPT endpoint and
// runs the chat completion.
func NewFastgptChatCompletions(ctx context.Context, apiKey string, apiBase string, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
	client := NewAiClient(apiKey, apiBase)
	return NewChatCompletions(ctx, client, chatInfo)
}

// NewDeepSeekChatCompletions runs a chat completion against DeepSeek,
// defaulting to the R1 model unless V3 is requested explicitly.
func NewDeepSeekChatCompletions(ctx context.Context, apiKey string, chatInfo *types.CompApiReq, chatModel openai.ChatModel) (res *types.CompOpenApiResp, err error) {
	client := NewDeepSeekClient(apiKey)
	if chatModel != ChatModelDeepSeekV3 {
		chatModel = ChatModelDeepSeekR1
	}
	chatInfo.Model = chatModel
	return NewChatCompletions(ctx, client, chatInfo)
}

// DoChatCompletionsStreamOld is the earlier streaming implementation, kept for
// reference. It uses the SDK's NewStreaming helper plus a ChatCompletionAccumulator
// instead of decoding the raw SSE body itself.
func DoChatCompletionsStreamOld(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
	var jsonBytes []byte

	emptyParams := openai.ChatCompletionNewParams{}
	if jsonBytes, err = json.Marshal(chatInfo); err != nil {
		return nil, err
	}
	reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
	// Note: WithResponseBodyInto is not needed here; NewStreaming consumes the SSE body itself.
	chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps)

	// Optionally, an accumulator helper can be used to assemble the full response.
	acc := openai.ChatCompletionAccumulator{}

	httpWriter, ok := ctx.Value("HttpResp-Writer").(http.ResponseWriter)
	if !ok {
		return nil, errors.New("failed to get http.ResponseWriter from context")
	}

	idx := 0
	for chatStream.Next() {
		chunk := chatStream.Current()
		acc.AddChunk(chunk)
		fmt.Printf("=====>get %d chunk:%v\n", idx, chunk)
		if _, err := fmt.Fprintf(httpWriter, "%v", chunk); err != nil {
			fmt.Printf("Error writing to client:%v \n", err)
			break
		}
		if content, ok := acc.JustFinishedContent(); ok {
			println("Content stream finished:", content)
		}
		// If using tool calls:
		if tool, ok := acc.JustFinishedToolCall(); ok {
			println("Tool call stream finished:", tool.Index, tool.Name, tool.Arguments)
		}
		if refusal, ok := acc.JustFinishedRefusal(); ok {
			println("Refusal stream finished:", refusal)
		}
		// It's best to use chunks after handling JustFinished events.
		if len(chunk.Choices) > 0 {
			idx++
			fmt.Printf("idx:%d get =>'%s'\n", idx, chunk.Choices[0].Delta.Content)
		}
	}
	if err := chatStream.Err(); err != nil {
		return nil, err
	}
	return nil, nil
}
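
// Hypothetical usage sketch (illustrative only, not part of the package API):
// a caller might drive the DeepSeek path as below. The API key and the mostly
// empty request are placeholders; real callers populate types.CompApiReq from
// the incoming request before dispatching.
//
//	func callDeepSeekOnce(ctx context.Context) (*types.CompOpenApiResp, error) {
//		req := &types.CompApiReq{Stream: false} // false => non-streaming DoChatCompletions path
//		return NewDeepSeekChatCompletions(ctx, "sk-placeholder", req, ChatModelDeepSeekV3)
//	}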