func.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package compapi
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "strings"
  9. "wechat-api/internal/types"
  10. "wechat-api/internal/utils/contextkey"
  11. openai "github.com/openai/openai-go"
  12. "github.com/openai/openai-go/option"
  13. "github.com/openai/openai-go/packages/ssestream"
  14. "github.com/zeromicro/go-zero/rest/httpx"
  15. )
  16. func IsOpenaiModel(model string) bool {
  17. prefixes := []string{"gpt-4", "gpt-3", "o1", "o3"}
  18. // 遍历所有前缀进行检查
  19. for _, prefix := range prefixes {
  20. if strings.HasPrefix(model, prefix) {
  21. return true
  22. }
  23. }
  24. return false
  25. }
// StdChatClient wraps an *openai.Client for talking to a standard
// OpenAI-compatible chat-completion endpoint; all client methods are
// promoted through embedding.
type StdChatClient struct {
	*openai.Client
}
  29. func NewStdChatClient(apiKey string, apiBase string) *StdChatClient {
  30. opts := []option.RequestOption{}
  31. if len(apiKey) > 0 {
  32. opts = append(opts, option.WithAPIKey(apiKey))
  33. }
  34. opts = append(opts, option.WithBaseURL(apiBase))
  35. client := openai.NewClient(opts...)
  36. return &StdChatClient{&client}
  37. }
  38. func NewAiClient(apiKey string, apiBase string) *openai.Client {
  39. opts := []option.RequestOption{}
  40. if len(apiKey) > 0 {
  41. opts = append(opts, option.WithAPIKey(apiKey))
  42. }
  43. opts = append(opts, option.WithBaseURL(apiBase))
  44. client := openai.NewClient(opts...)
  45. return &client
  46. }
  47. func NewFastgptClient(apiKey string) *openai.Client {
  48. //http://fastgpt.ascrm.cn/api/v1/
  49. client := openai.NewClient(option.WithAPIKey(apiKey),
  50. option.WithBaseURL("http://fastgpt.ascrm.cn/api/v1/"))
  51. return &client
  52. }
  53. func NewDeepSeekClient(apiKey string) *openai.Client {
  54. client := openai.NewClient(option.WithAPIKey(apiKey),
  55. option.WithBaseURL("https://api.deepseek.com"))
  56. return &client
  57. }
// DoChatCompletions performs a non-streaming chat-completion request.
// chatInfo is marshaled verbatim as the request body (bypassing the SDK's
// own parameter struct) and the response is decoded into our
// types.CompOpenApiResp instead of the SDK's response type.
func DoChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
	var (
		jsonBytes []byte
		err       error
	)
	// Params are intentionally empty: the real body is injected below.
	emptyParams := openai.ChatCompletionNewParams{}
	if jsonBytes, err = json.Marshal(chatInfo); err != nil {
		return nil, err
	}
	//fmt.Printf("In DoChatCompletions, req: '%s'\n", string(jsonBytes))
	// TODO(review): perhaps validate non-standard member names in the request body.
	customResp := types.CompOpenApiResp{}
	// Override the outgoing body with our raw JSON and have the SDK decode
	// the response straight into customResp.
	reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
	respBodyOps := option.WithResponseBodyInto(&customResp)
	if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps); err != nil {
		return nil, err
	}
	// fastgpt can report an error in the body even when New() returns no error.
	if customResp.FgtErrCode != nil && customResp.FgtErrStatusTxt != nil {
		return nil, fmt.Errorf("%s(%d)", *customResp.FgtErrStatusTxt, *customResp.FgtErrCode)
	}
	return &customResp, nil
}
  80. func DoChatCompletionsStream(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
  81. var (
  82. jsonBytes []byte
  83. raw *http.Response
  84. //raw []byte
  85. ok bool
  86. hw http.ResponseWriter
  87. )
  88. hw, ok = contextkey.HttpResponseWriterKey.GetValue(ctx) //context取出http.ResponseWriter
  89. if !ok {
  90. return nil, errors.New("content get http writer err")
  91. }
  92. flusher, ok := (hw).(http.Flusher)
  93. if !ok {
  94. http.Error(hw, "Streaming unsupported!", http.StatusInternalServerError)
  95. }
  96. emptyParams := openai.ChatCompletionNewParams{}
  97. if jsonBytes, err = json.Marshal(chatInfo); err != nil {
  98. return nil, err
  99. }
  100. reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
  101. respBodyOps := option.WithResponseBodyInto(&raw)
  102. if _, err = client.Chat.Completions.New(ctx, emptyParams, reqBodyOps, respBodyOps, option.WithJSONSet("stream", true)); err != nil {
  103. return nil, err
  104. }
  105. //设置流式输出头 http1.1
  106. hw.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  107. hw.Header().Set("Connection", "keep-alive")
  108. hw.Header().Set("Cache-Control", "no-cache")
  109. chatStream := ssestream.NewStream[ApiRespStreamChunk](ApiRespStreamDecoder(raw), err)
  110. defer chatStream.Close()
  111. for chatStream.Next() {
  112. chunk := chatStream.Current()
  113. fmt.Fprintf(hw, "data:%s\n\n", chunk.Data.RAW)
  114. flusher.Flush()
  115. //time.Sleep(1 * time.Millisecond)
  116. }
  117. fmt.Fprintf(hw, "data:%s\n\n", "[DONE]")
  118. flusher.Flush()
  119. httpx.Ok(hw)
  120. return nil, nil
  121. }
  122. func NewChatCompletions(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
  123. if chatInfo.Stream {
  124. return DoChatCompletionsStream(ctx, client, chatInfo)
  125. } else {
  126. return DoChatCompletions(ctx, client, chatInfo)
  127. }
  128. }
  129. func NewMismatchChatCompletions(ctx context.Context, apiKey string, apiBase string, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
  130. client := NewAiClient(apiKey, apiBase)
  131. return NewChatCompletions(ctx, client, chatInfo)
  132. }
  133. func NewFastgptChatCompletions(ctx context.Context, apiKey string, apiBase string, chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) {
  134. client := NewAiClient(apiKey, apiBase)
  135. return NewChatCompletions(ctx, client, chatInfo)
  136. }
  137. func NewDeepSeekChatCompletions(ctx context.Context, apiKey string, chatInfo *types.CompApiReq, chatModel openai.ChatModel) (res *types.CompOpenApiResp, err error) {
  138. client := NewDeepSeekClient(apiKey)
  139. if chatModel != ChatModelDeepSeekV3 {
  140. chatModel = ChatModelDeepSeekR1
  141. }
  142. chatInfo.Model = chatModel
  143. return NewChatCompletions(ctx, client, chatInfo)
  144. }
  145. func DoChatCompletionsStreamOld(ctx context.Context, client *openai.Client, chatInfo *types.CompApiReq) (res *types.CompOpenApiResp, err error) {
  146. var (
  147. jsonBytes []byte
  148. )
  149. emptyParams := openai.ChatCompletionNewParams{}
  150. if jsonBytes, err = json.Marshal(chatInfo); err != nil {
  151. return nil, err
  152. }
  153. reqBodyOps := option.WithRequestBody("application/json", jsonBytes)
  154. //customResp := types.CompOpenApiResp{}
  155. //respBodyOps := option.WithResponseBodyInto(&customResp)
  156. //chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps, respBodyOps)
  157. chatStream := client.Chat.Completions.NewStreaming(ctx, emptyParams, reqBodyOps)
  158. // optionally, an accumulator helper can be used
  159. acc := openai.ChatCompletionAccumulator{}
  160. httpWriter, ok := ctx.Value("HttpResp-Writer").(http.ResponseWriter)
  161. if !ok {
  162. return nil, errors.New("content get writer err")
  163. }
  164. //httpWriter.Header().Set("Content-Type", "text/event-stream;charset=utf-8")
  165. //httpWriter.Header().Set("Connection", "keep-alive")
  166. //httpWriter.Header().Set("Cache-Control", "no-cache")
  167. idx := 0
  168. for chatStream.Next() {
  169. chunk := chatStream.Current()
  170. acc.AddChunk(chunk)
  171. fmt.Printf("=====>get %d chunk:%v\n", idx, chunk)
  172. if _, err := fmt.Fprintf(httpWriter, "%v", chunk); err != nil {
  173. fmt.Printf("Error writing to client:%v \n", err)
  174. break
  175. }
  176. if content, ok := acc.JustFinishedContent(); ok {
  177. println("Content stream finished:", content)
  178. }
  179. // if using tool calls
  180. if tool, ok := acc.JustFinishedToolCall(); ok {
  181. println("Tool call stream finished:", tool.Index, tool.Name, tool.Arguments)
  182. }
  183. if refusal, ok := acc.JustFinishedRefusal(); ok {
  184. println("Refusal stream finished:", refusal)
  185. }
  186. // it's best to use chunks after handling JustFinished events
  187. if len(chunk.Choices) > 0 {
  188. idx++
  189. fmt.Printf("idx:%d get =>'%s'\n", idx, chunk.Choices[0].Delta.Content)
  190. }
  191. }
  192. if err := chatStream.Err(); err != nil {
  193. return nil, err
  194. }
  195. return nil, nil
  196. }
  197. func GetWorkInfoByID(eventType string, workId string) (string, uint) {
  198. val, exist := fastgptWorkIdMap[workId]
  199. if !exist {
  200. val = fastgptWorkIdMap["default"]
  201. }
  202. return val.Id, val.Idx
  203. }
  204. // 获取workToken
  205. func GetWorkTokenByID(eventType string, workId string) string {
  206. id, _ := GetWorkInfoByID(eventType, workId)
  207. return id
  208. }
  209. // 获取workIdx
  210. func GetWorkIdxByID(eventType string, workId string) uint {
  211. _, idx := GetWorkInfoByID(eventType, workId)
  212. return idx
  213. }