// func.go — compapi: chat-completion helpers (OpenAI-compatible, fastgpt, DeepSeek).
  1. package compapi
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "wechat-api/internal/types"
  9. "wechat-api/internal/utils/contextkey"
  10. openai "github.com/openai/openai-go"
  11. "github.com/openai/openai-go/option"
  12. "github.com/openai/openai-go/packages/ssestream"
  13. "github.com/zeromicro/go-zero/rest/httpx"
  14. )
  15. func NewAiClient(apiKey string, apiBase string) *openai.Client {
  16. opts := []option.RequestOption{}
  17. if len(apiKey) > 0 {
  18. opts = append(opts, option.WithAPIKey(apiKey))
  19. }
  20. opts = append(opts, option.WithBaseURL(apiBase))
  21. return openai.NewClient(opts...)
  22. }
  23. func NewFastgptClient(apiKey string) *openai.Client {
  24. //http://fastgpt.ascrm.cn/api/v1/
  25. return openai.NewClient(option.WithAPIKey(apiKey),
  26. option.WithBaseURL("http://fastgpt.ascrm.cn/api/v1/"))
  27. }
  28. func NewDeepSeekClient(apiKey string) *openai.Client {
  29. return openai.NewClient(option.WithAPIKey(apiKey),
  30. option.WithBaseURL("https://api.deepseek.com"))
  31. }
  32. func DoChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
  33. var (
  34. jsonBytes []byte
  35. err error
  36. )
  37. emptyParams := openai.ChatCompletionNewParams{}
  38. if jsonBytes, err = json.Marshal(chatInfo); err != nil {
  39. return nil, err
  40. }
  41. //fmt.Printf("In DoChatCompletions, req: '%s'\n", string(jsonBytes))
  42. //也许应该对请求体不规范成员名进行检查
  43. customResp := types.CompOpenApiResp{}
  44. reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
  45. respBodyOps := option.WithResponseBodyInto(&customResp)
  46. if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps); err != nil {
  47. return nil, err
  48. }
  49. if customResp.FgtErrCode != nil && customResp.FgtErrStatusTxt != nil { //针对fastgpt出错但New()不返回错误的情况
  50. return nil, fmt.Errorf("%s(%d)", *customResp.FgtErrStatusTxt, *customResp.FgtErrCode)
  51. }
  52. return &customResp, nil
  53. }
  54. func DoChatCompletionsStream(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
  55. var (
  56. jsonBytes []byte
  57. raw *http.Response
  58. //raw []byte
  59. ok bool
  60. hw http.ResponseWriter
  61. )
  62. hw, ok = contextkey.HttpResponseWriterKey.GetValue(ctx) //context取出http.ResponseWriter
  63. if !ok {
  64. return nil, errors.New("content get http writer err")
  65. }
  66. flusher, ok := (hw).(http.Flusher)
  67. if !ok {
  68. http.Error(hw, "Streaming unsupported!", http.StatusInternalServerError)
  69. }
  70. emptyParams := openai.ChatCompletionNewParams{}
  71. if jsonBytes, err = json.Marshal(chatInfo); err != nil {
  72. return nil, err
  73. }
  74. reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
  75. respBodyOps := option.WithResponseBodyInto(&raw)
  76. if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps, option.WithJSONSet("stream", true)); err != nil {
  77. return nil, err
  78. }
  79. //设置流式输出头 http1.1
  80. hw.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  81. hw.Header().Set("Connection", "keep-alive")
  82. hw.Header().Set("Cache-Control", "no-cache")
  83. chatStream := ssestream.NewStream[ApiRespStreamChunk](ApiRespStreamDecoder(raw), err)
  84. defer chatStream.Close()
  85. for chatStream.Next() {
  86. chunk := chatStream.Current()
  87. fmt.Fprintf(hw, "data:%s\n\n", chunk.Data.RAW)
  88. flusher.Flush()
  89. //time.Sleep(1 * time.Millisecond)
  90. }
  91. fmt.Fprintf(hw, "data:%s\n\n", "[DONE]")
  92. flusher.Flush()
  93. httpx.Ok(hw)
  94. return nil, nil
  95. }
  96. func NewChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
  97. if chatInfo.Stream {
  98. return DoChatCompletionsStream(ctx, client, chatInfo)
  99. } else {
  100. return DoChatCompletions(ctx, client, chatInfo)
  101. }
  102. }
  103. func NewFastgptChatCompletions(ctx context.Context, apiKey string, apiBase string, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
  104. client := NewAiClient(apiKey, apiBase)
  105. return NewChatCompletions(ctx, client, chatInfo)
  106. }
  107. func NewDeepSeekChatCompletions(ctx context.Context, apiKey string, chatInfo *types.CompApiReq, chatModel openai.ChatModel) (res *types.CompOpenApiResp, err error) {
  108. client := NewDeepSeekClient(apiKey)
  109. if chatModel != ChatModelDeepSeekV3 {
  110. chatModel = ChatModelDeepSeekR1
  111. }
  112. chatInfo.Model = chatModel
  113. return NewChatCompletions(ctx, client, chatInfo)
  114. }
// DoChatCompletionsStreamOld is the legacy streaming implementation, kept for
// reference. It uses the SDK's NewStreaming helper plus a
// ChatCompletionAccumulator, and writes each chunk to an http.ResponseWriter
// fetched from the context under the string key "HttpResp-Writer".
//
// NOTE(review): the newer DoChatCompletionsStream uses the typed
// contextkey.HttpResponseWriterKey instead of a bare string key; the two
// lookups are NOT interchangeable — callers of this function must have stored
// the writer under the string key.
// NOTE(review): unlike the new path, this version never calls Flush(), so
// chunks may be buffered by the server rather than streamed — confirm before
// reviving this code.
func DoChatCompletionsStreamOld(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
	var (
		jsonBytes []byte
	)
	emptyParams := openai.ChatCompletionNewParams{}
	// Marshal the caller's request and inject it verbatim as the request body.
	if jsonBytes, err = json.Marshal(chatInfo); err != nil {
		return nil, err
	}
	reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
	//customResp := types.CompOpenApiResp{}
	//respBodyOps := option.WithResponseBodyInto(&customResp)
	//chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps, respBodyOps)
	chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps)
	// optionally, an accumulator helper can be used
	acc := openai.ChatCompletionAccumulator{}
	// String-keyed context lookup (legacy; see NOTE above).
	httpWriter, ok := ctx.Value("HttpResp-Writer").(http.ResponseWriter)
	if !ok {
		return nil, errors.New("content get writer err")
	}
	//httpWriter.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
	//httpWriter.Header().Set("Connection", "keep-alive")
	//httpWriter.Header().Set("Cache-Control", "no-cache")
	idx := 0
	for chatStream.Next() {
		chunk := chatStream.Current()
		// Accumulate chunks so the JustFinished* helpers below can fire.
		acc.AddChunk(chunk)
		fmt.Printf("=====>get %d chunk:%v\n", idx, chunk)
		// Relay the chunk downstream; on write failure stop reading the stream.
		if _, err := fmt.Fprintf(httpWriter, "%v", chunk); err != nil {
			fmt.Printf("Error writing to client:%v \n", err)
			break
		}
		if content, ok := acc.JustFinishedContent(); ok {
			println("Content stream finished:", content)
		}
		// if using tool calls
		if tool, ok := acc.JustFinishedToolCall(); ok {
			println("Tool call stream finished:", tool.Index, tool.Name, tool.Arguments)
		}
		if refusal, ok := acc.JustFinishedRefusal(); ok {
			println("Refusal stream finished:", refusal)
		}
		// it's best to use chunks after handling JustFinished events
		if len(chunk.Choices) > 0 {
			idx++
			fmt.Printf("idx:%d get =>'%s'\n", idx, chunk.Choices[0].Delta.Content)
		}
	}
	// Surface any transport/decode error from the stream.
	if err := chatStream.Err(); err != nil {
		return nil, err
	}
	return nil, nil
}
  167. func GetWorkInfoByID(eventType string, workId string) (string, uint) {
  168. val, exist := fastgptWorkIdMap[workId]
  169. if !exist {
  170. val = fastgptWorkIdMap["default"]
  171. }
  172. return val.Id, val.Idx
  173. }
  174. // 获取workToken
  175. func GetWorkTokenByID(eventType string, workId string) string {
  176. id, _ := GetWorkInfoByID(eventType, workId)
  177. return id
  178. }
  179. // 获取workIdx
  180. func GetWorkIdxByID(eventType string, workId string) uint {
  181. _, idx := GetWorkInfoByID(eventType, workId)
  182. return idx
  183. }