func.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package compapi
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "wechat-api/internal/types"
  9. "wechat-api/internal/utils/contextkey"
  10. openai "github.com/openai/openai-go"
  11. "github.com/openai/openai-go/option"
  12. "github.com/openai/openai-go/packages/ssestream"
  13. "github.com/zeromicro/go-zero/rest/httpx"
  14. )
  15. func NewFastgptClient(apiKey string) *openai.Client {
  16. return openai.NewClient(option.WithAPIKey(apiKey),
  17. option.WithBaseURL("http://fastgpt.ascrm.cn/api/v1/"))
  18. }
  19. func NewDeepSeekClient(apiKey string) *openai.Client {
  20. return openai.NewClient(option.WithAPIKey(apiKey),
  21. option.WithBaseURL("https://api.deepseek.com"))
  22. }
  23. func DoChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
  24. var (
  25. jsonBytes []byte
  26. err error
  27. )
  28. emptyParams := openai.ChatCompletionNewParams{}
  29. if jsonBytes, err = json.Marshal(chatInfo); err != nil {
  30. return nil, err
  31. }
  32. customResp := types.CompOpenApiResp{}
  33. reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
  34. respBodyOps := option.WithResponseBodyInto(&customResp)
  35. if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps); err != nil {
  36. return nil, err
  37. }
  38. return &customResp, nil
  39. }
  40. func DoChatCompletionsStream(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
  41. var (
  42. jsonBytes []byte
  43. raw *http.Response
  44. //raw []byte
  45. ok bool
  46. hw http.ResponseWriter
  47. )
  48. hw, ok = contextkey.HttpResponseWriterKey.GetValue(ctx) //context取出http.ResponseWriter
  49. if !ok {
  50. return nil, errors.New("content get http writer err")
  51. }
  52. flusher, ok := (hw).(http.Flusher)
  53. if !ok {
  54. http.Error(hw, "Streaming unsupported!", http.StatusInternalServerError)
  55. }
  56. emptyParams := openai.ChatCompletionNewParams{}
  57. if jsonBytes, err = json.Marshal(chatInfo); err != nil {
  58. return nil, err
  59. }
  60. reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
  61. respBodyOps := option.WithResponseBodyInto(&raw)
  62. if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps, option.WithJSONSet("stream", true)); err != nil {
  63. return nil, err
  64. }
  65. //设置流式输出头 http1.1
  66. hw.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  67. hw.Header().Set("Connection", "keep-alive")
  68. hw.Header().Set("Cache-Control", "no-cache")
  69. chatStream := ssestream.NewStream[ApiRespStreamChunk](ApiRespStreamDecoder(raw), err)
  70. defer chatStream.Close()
  71. for chatStream.Next() {
  72. chunk := chatStream.Current()
  73. fmt.Fprintf(hw, "event:%s\ndata:%s\n\n", chunk.Event, chunk.Data.RAW)
  74. //time.Sleep(1 * time.Millisecond)
  75. }
  76. fmt.Fprintf(hw, "event:%s\ndata:%s\n\n", "answer", "[DONE]")
  77. flusher.Flush()
  78. httpx.Ok(hw)
  79. return nil, nil
  80. }
  81. func NewChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
  82. if chatInfo.Stream {
  83. return DoChatCompletionsStream(ctx, client, chatInfo)
  84. } else {
  85. return DoChatCompletions(ctx, client, chatInfo)
  86. }
  87. }
  88. func NewFastgptChatCompletions(ctx context.Context, apiKey string, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
  89. client := NewFastgptClient(apiKey)
  90. return NewChatCompletions(ctx, client, chatInfo)
  91. }
  92. func NewDeepSeekChatCompletions(ctx context.Context, apiKey string, chatInfo *types.CompApiReq, chatModel openai.ChatModel) (res *types.CompOpenApiResp, err error) {
  93. client := NewDeepSeekClient(apiKey)
  94. if chatModel != ChatModelDeepSeekV3 {
  95. chatModel = ChatModelDeepSeekR1
  96. }
  97. chatInfo.Model = chatModel
  98. return NewChatCompletions(ctx, client, chatInfo)
  99. }
  100. func DoChatCompletionsStreamOld(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
  101. var (
  102. jsonBytes []byte
  103. )
  104. emptyParams := openai.ChatCompletionNewParams{}
  105. if jsonBytes, err = json.Marshal(chatInfo); err != nil {
  106. return nil, err
  107. }
  108. reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
  109. //customResp := types.CompOpenApiResp{}
  110. //respBodyOps := option.WithResponseBodyInto(&customResp)
  111. //chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps, respBodyOps)
  112. chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps)
  113. // optionally, an accumulator helper can be used
  114. acc := openai.ChatCompletionAccumulator{}
  115. httpWriter, ok := ctx.Value("HttpResp-Writer").(http.ResponseWriter)
  116. if !ok {
  117. return nil, errors.New("content get writer err")
  118. }
  119. //httpWriter.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  120. //httpWriter.Header().Set("Connection", "keep-alive")
  121. //httpWriter.Header().Set("Cache-Control", "no-cache")
  122. idx := 0
  123. for chatStream.Next() {
  124. chunk := chatStream.Current()
  125. acc.AddChunk(chunk)
  126. fmt.Printf("=====>get %d chunk:%v\n", idx, chunk)
  127. if _, err := fmt.Fprintf(httpWriter, "%v", chunk); err != nil {
  128. fmt.Printf("Error writing to client:%v \n", err)
  129. break
  130. }
  131. if content, ok := acc.JustFinishedContent(); ok {
  132. println("Content stream finished:", content)
  133. }
  134. // if using tool calls
  135. if tool, ok := acc.JustFinishedToolCall(); ok {
  136. println("Tool call stream finished:", tool.Index, tool.Name, tool.Arguments)
  137. }
  138. if refusal, ok := acc.JustFinishedRefusal(); ok {
  139. println("Refusal stream finished:", refusal)
  140. }
  141. // it's best to use chunks after handling JustFinished events
  142. if len(chunk.Choices) > 0 {
  143. idx++
  144. fmt.Printf("idx:%d get =>'%s'\n", idx, chunk.Choices[0].Delta.Content)
  145. }
  146. }
  147. if err := chatStream.Err(); err != nil {
  148. return nil, err
  149. }
  150. return nil, nil
  151. }