package compapi

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"

	"wechat-api/internal/types"
	"wechat-api/internal/utils/contextkey"

	openai "github.com/openai/openai-go"
	"github.com/openai/openai-go/option"
	"github.com/openai/openai-go/packages/ssestream"
	"github.com/zeromicro/go-zero/rest/httpx"
)

// NewAiClient builds an OpenAI-compatible client for an arbitrary API base.
func NewAiClient(apiKey string, apiBase string) *openai.Client {
	return openai.NewClient(option.WithAPIKey(apiKey),
		option.WithBaseURL(apiBase))
}

// NewFastgptClient builds a client pinned to the FastGPT endpoint
// (http://fastgpt.ascrm.cn/api/v1/).
func NewFastgptClient(apiKey string) *openai.Client {
	return openai.NewClient(option.WithAPIKey(apiKey),
		option.WithBaseURL("http://fastgpt.ascrm.cn/api/v1/"))
}

// NewDeepSeekClient builds a client pinned to the DeepSeek endpoint.
func NewDeepSeekClient(apiKey string) *openai.Client {
	return openai.NewClient(option.WithAPIKey(apiKey),
		option.WithBaseURL("https://api.deepseek.com"))
}

// DoChatCompletions issues a non-streaming chat completion. The request body
// is marshaled from chatInfo and injected verbatim via option.WithRequestBody
// (the SDK's own params struct is left empty), and the raw response is
// decoded into types.CompOpenApiResp instead of the SDK's response type.
func DoChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
	jsonBytes, err := json.Marshal(chatInfo)
	if err != nil {
		return nil, err
	}
	customResp := types.CompOpenApiResp{}
	emptyParams := openai.ChatCompletionNewParams{}
	reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
	respBodyOps := option.WithResponseBodyInto(&customResp)
	if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps); err != nil {
		return nil, err
	}
	return &customResp, nil
}

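// Minimal usage sketch (illustrative, not part of the production flow). It
// only touches the Model and Stream fields of types.CompApiReq, since those
// are the ones visible in this file; real requests will also need the
// project-specific message fields.
func exampleDoChatCompletions(ctx context.Context, apiKey string) (*types.CompOpenApiResp, error) {
	client := NewDeepSeekClient(apiKey)
	req := &types.CompApiReq{}      // populate messages per the project schema
	req.Model = ChatModelDeepSeekV3 // package-level constant, defined elsewhere
	req.Stream = false              // take the non-streaming path
	return DoChatCompletions(ctx, client, req)
}
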
// DoChatCompletionsStream issues a streaming chat completion and relays the
// upstream SSE chunks to the caller's http.ResponseWriter, which must have
// been stashed in ctx beforehand (see contextkey.HttpResponseWriterKey).
func DoChatCompletionsStream(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
	var (
		jsonBytes []byte
		raw       *http.Response
	)
	// Retrieve the http.ResponseWriter stashed in the context.
	hw, ok := contextkey.HttpResponseWriterKey.GetValue(ctx)
	if !ok {
		return nil, errors.New("failed to get http.ResponseWriter from context")
	}
	flusher, ok := hw.(http.Flusher)
	if !ok {
		http.Error(hw, "Streaming unsupported!", http.StatusInternalServerError)
		return nil, errors.New("http.ResponseWriter does not support flushing")
	}
	emptyParams := openai.ChatCompletionNewParams{}
	if jsonBytes, err = json.Marshal(chatInfo); err != nil {
		return nil, err
	}
	reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
	respBodyOps := option.WithResponseBodyInto(&raw)
	if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps, option.WithJSONSet("stream", true)); err != nil {
		return nil, err
	}
	// Set the SSE response headers (HTTP/1.1).
	hw.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
	hw.Header().Set("Connection", "keep-alive")
	hw.Header().Set("Cache-Control", "no-cache")
	// Decode the upstream SSE stream and forward each chunk as-is.
	chatStream := ssestream.NewStream[ApiRespStreamChunk](ApiRespStreamDecoder(raw), err)
	defer chatStream.Close()
	for chatStream.Next() {
		chunk := chatStream.Current()
		fmt.Fprintf(hw, "event:%s\ndata:%s\n\n", chunk.Event, chunk.Data.RAW)
		flusher.Flush()
	}
	// Signal end-of-stream to the client.
	fmt.Fprintf(hw, "event:%s\ndata:%s\n\n", "answer", "[DONE]")
	flusher.Flush()
	httpx.Ok(hw)
	return nil, nil
}

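// Illustrative handler sketch: how the writer might be stashed before calling
// into this package. The setter name (WithValue) on
// contextkey.HttpResponseWriterKey is an assumption mirroring the GetValue
// accessor used above; adjust it to the project's actual contextkey API.
//
//	func chatHandler(w http.ResponseWriter, r *http.Request) {
//		ctx := contextkey.HttpResponseWriterKey.WithValue(r.Context(), w) // hypothetical setter
//		_, _ = DoChatCompletionsStream(ctx, client, req)
//	}
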
// NewChatCompletions dispatches to the streaming or non-streaming path based
// on the request's Stream flag.
func NewChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
	if chatInfo.Stream {
		return DoChatCompletionsStream(ctx, client, chatInfo)
	}
	return DoChatCompletions(ctx, client, chatInfo)
}

// NewFastgptChatCompletions runs a chat completion against a FastGPT-style
// endpoint identified by apiKey and apiBase.
func NewFastgptChatCompletions(ctx context.Context, apiKey string, apiBase string, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
	client := NewAiClient(apiKey, apiBase)
	return NewChatCompletions(ctx, client, chatInfo)
}

// NewDeepSeekChatCompletions runs a chat completion against DeepSeek. Any
// model other than ChatModelDeepSeekV3 is coerced to ChatModelDeepSeekR1.
func NewDeepSeekChatCompletions(ctx context.Context, apiKey string, chatInfo *types.CompApiReq, chatModel openai.ChatModel) (res *types.CompOpenApiResp, err error) {
	client := NewDeepSeekClient(apiKey)
	if chatModel != ChatModelDeepSeekV3 {
		chatModel = ChatModelDeepSeekR1
	}
	chatInfo.Model = chatModel
	return NewChatCompletions(ctx, client, chatInfo)
}

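// Illustrative call (key and request values hypothetical):
//
//	resp, err := NewDeepSeekChatCompletions(ctx, "sk-...", req, ChatModelDeepSeekV3)
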
// DoChatCompletionsStreamOld is the legacy streaming path, kept for
// reference. It uses the SDK's NewStreaming helper together with a
// ChatCompletionAccumulator, and expects the http.ResponseWriter under the
// raw context key "HttpResp-Writer" rather than the typed contextkey above.
func DoChatCompletionsStreamOld(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
	jsonBytes, err := json.Marshal(chatInfo)
	if err != nil {
		return nil, err
	}
	emptyParams := openai.ChatCompletionNewParams{}
	reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
	chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps)
	// Optionally, an accumulator helper can be used to assemble the chunks.
	acc := openai.ChatCompletionAccumulator{}
	httpWriter, ok := ctx.Value("HttpResp-Writer").(http.ResponseWriter)
	if !ok {
		return nil, errors.New("failed to get http.ResponseWriter from context")
	}
	idx := 0
	for chatStream.Next() {
		chunk := chatStream.Current()
		acc.AddChunk(chunk)
		fmt.Printf("=====>get %d chunk:%v\n", idx, chunk)
		if _, err := fmt.Fprintf(httpWriter, "%v", chunk); err != nil {
			fmt.Printf("Error writing to client:%v \n", err)
			break
		}
		if content, ok := acc.JustFinishedContent(); ok {
			println("Content stream finished:", content)
		}
		// If using tool calls.
		if tool, ok := acc.JustFinishedToolCall(); ok {
			println("Tool call stream finished:", tool.Index, tool.Name, tool.Arguments)
		}
		if refusal, ok := acc.JustFinishedRefusal(); ok {
			println("Refusal stream finished:", refusal)
		}
		// It's best to use chunks after handling JustFinished events.
		if len(chunk.Choices) > 0 {
			idx++
			fmt.Printf("idx:%d get =>'%s'\n", idx, chunk.Choices[0].Delta.Content)
		}
	}
	if err := chatStream.Err(); err != nil {
		return nil, err
	}
	return nil, nil
}
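
// For reference, DoChatCompletionsStream above emits SSE frames in this
// shape, one per upstream chunk, followed by a terminal sentinel frame:
//
//	event:<upstream event name>
//	data:<raw JSON chunk>
//
//	event:answer
//	data:[DONE]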