compapi_callback.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. package crontask
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "hash/fnv"
  8. "runtime"
  9. "strconv"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. "wechat-api/ent"
  14. "wechat-api/ent/compapiasynctask"
  15. "wechat-api/ent/predicate"
  16. "wechat-api/internal/types"
  17. "wechat-api/internal/utils/compapi"
  18. "github.com/zeromicro/go-zero/core/logx"
  19. )
  20. const (
  21. Task_Ready = 10 //任务就绪
  22. ReqApi_Done = 20 //请求API完成
  23. Callback_Done = 30 //请求回调完成
  24. All_Done = Callback_Done //全部完成 (暂时将成功状态的终点标注于此)
  25. Task_Suspend = 60 //任务暂停
  26. Task_Fail = 70 //任务失败
  27. MaxLoadTask = 1000 //一次最大获取任务数量
  28. LoopTryCount = 3 //循环体内重试次数
  29. ErrTaskTryCount = 3 //最大允许错误任务重试次数
  30. DefaultDisId = "DIS0002"
  31. )
  32. type Task struct {
  33. Data *ent.CompapiAsynctask
  34. Idx int
  35. Code int
  36. }
  37. // 带会话管理的任务通道组
  38. type TaskDispatcher struct {
  39. mu sync.Mutex
  40. workerChs []chan Task // 每个worker独立通道
  41. }
  42. func NewTaskDispatcher(channelCount int) *TaskDispatcher {
  43. td := &TaskDispatcher{
  44. workerChs: make([]chan Task, channelCount),
  45. }
  46. // 初始化worker通道
  47. for i := range td.workerChs {
  48. td.workerChs[i] = make(chan Task, MaxLoadTask/channelCount+1) // 每个worker带缓冲
  49. //fmt.Printf("make worker chan:%d\n", i+1)
  50. }
  51. return td
  52. }
  53. // 按woker下标索引选择workerChs
  54. func (td *TaskDispatcher) getWorkerChanByIdx(idx int) (int, chan Task) {
  55. nidx := idx % len(td.workerChs)
  56. return nidx, td.workerChs[nidx]
  57. }
  58. // 按哈希分片选择workerChs
  59. func (td *TaskDispatcher) getWorkerChanByHash(disID string) (int, chan Task) {
  60. if len(disID) == 0 {
  61. disID = DefaultDisId
  62. }
  63. idx := 0
  64. if len(td.workerChs) > 1 {
  65. h := fnv.New32a()
  66. h.Write([]byte(disID))
  67. idx = int(h.Sum32()) % len(td.workerChs)
  68. }
  69. outStr := fmt.Sprintf("getWorkerChannel by disId Hash:'%s',from workerChs{", disID)
  70. for i := range len(td.workerChs) {
  71. outStr += fmt.Sprintf("#%d", i+1)
  72. if i < len(td.workerChs)-1 {
  73. outStr += ","
  74. }
  75. }
  76. outStr += fmt.Sprintf("} choice chs:'#%d' by '%s'", idx+1, disID)
  77. logx.Debug(outStr)
  78. return idx, td.workerChs[idx]
  79. }
  80. // 分配任务到对应的消费channel
  81. func (td *TaskDispatcher) Dispatch(task Task) {
  82. td.mu.Lock()
  83. defer td.mu.Unlock()
  84. // 根据chatId哈希获得其对应的workerChan
  85. workerChIdx, workerCh := td.getWorkerChanByHash(task.Data.EventType)
  86. // 将任务送入该chatid的workerChan
  87. workerCh <- task
  88. logx.Debugf("Producer:EventType:[%s] Task Push to WorkerChan:#%d", task.Data.EventType, workerChIdx+1)
  89. }
  90. func getGoroutineId() (int64, error) {
  91. // 堆栈结果中需要消除的前缀符
  92. var goroutineSpace = []byte("goroutine ")
  93. bs := make([]byte, 128)
  94. bs = bs[:runtime.Stack(bs, false)]
  95. bs = bytes.TrimPrefix(bs, goroutineSpace)
  96. i := bytes.IndexByte(bs, ' ')
  97. if i < 0 {
  98. return -1, errors.New("get current goroutine id failed")
  99. }
  100. return strconv.ParseInt(string(bs[:i]), 10, 64)
  101. }
  102. func (l *CronTask) CliTest(MaxWorker int, MaxChannel int) {
  103. l.compApiCallback(MaxWorker, MaxChannel)
  104. }
  105. func (l *CronTask) compApiCallback(MaxWorker int, MaxChannel int) {
  106. var (
  107. wg sync.WaitGroup
  108. produced int64 //生产数量(原子计数器)
  109. consumed int64 //消费数量(原子计数器)
  110. )
  111. //创建任务分发器
  112. if MaxWorker <= 0 {
  113. MaxWorker = 2
  114. }
  115. if MaxChannel <= 0 || MaxChannel > MaxWorker {
  116. MaxChannel = MaxWorker
  117. }
  118. dispatcher := NewTaskDispatcher(MaxChannel)
  119. //启动消费者
  120. for widx := range MaxWorker {
  121. cidx, ch := dispatcher.getWorkerChanByIdx(widx)
  122. wg.Add(1)
  123. go func(workerID int, channelID int, taskCh chan Task) {
  124. defer wg.Done()
  125. gid, _ := getGoroutineId()
  126. logx.Infof("Consumer %d(Goroutine:%d) bind WorkerChan:#%d start......\n",
  127. workerID, gid, channelID)
  128. for task := range taskCh {
  129. l.processTask(workerID, task)
  130. atomic.AddInt64(&consumed, 1)
  131. }
  132. }(widx+1, cidx+1, ch)
  133. }
  134. // 生产者
  135. wg.Add(1)
  136. go func() {
  137. defer wg.Done()
  138. gid, _ := getGoroutineId()
  139. logx.Infof("Producer 1(Goroutine:%d) start......\n", gid)
  140. //获得待处理异步任务列表
  141. tasks, err := l.getAsyncReqTaskList(MaxLoadTask)
  142. if err != nil {
  143. logx.Errorf("getAsyncReqTaskList err:%s", err)
  144. return
  145. }
  146. // 分发任务
  147. for _, task := range tasks {
  148. dispatcher.Dispatch(task)
  149. atomic.AddInt64(&produced, 1)
  150. }
  151. logx.Infof("📦Producer 1 此批次共创建任务%d件", len(tasks))
  152. // 关闭所有会话通道
  153. dispatcher.mu.Lock()
  154. for _, ch := range dispatcher.workerChs {
  155. _ = ch
  156. close(ch)
  157. }
  158. dispatcher.mu.Unlock()
  159. }()
  160. wg.Wait()
  161. if atomic.LoadInt64(&produced) > 0 {
  162. logx.Infof("🏁本次任务完成度统计: Producer:1,Consumer:%d (%d/%d)*100=%d%%", MaxWorker, atomic.LoadInt64(&consumed), atomic.LoadInt64(&produced),
  163. (atomic.LoadInt64(&consumed)/atomic.LoadInt64(&produced))*100)
  164. }
  165. }
  166. func (l *CronTask) getAsyncReqTaskList(loadCount int) ([]Task, error) {
  167. var predicates []predicate.CompapiAsynctask
  168. predicates = append(predicates, compapiasynctask.TaskStatusLT(All_Done))
  169. var tasks []Task
  170. res, err := l.svcCtx.DB.CompapiAsynctask.Query().Where(predicates...).
  171. Order(ent.Asc(compapiasynctask.FieldID)).
  172. Limit(loadCount).
  173. All(l.ctx)
  174. if err == nil {
  175. for idx, val := range res {
  176. tasks = append(tasks, Task{Data: val, Idx: idx})
  177. }
  178. }
  179. return tasks, err
  180. }
  181. func (l *CronTask) processTask(workerID int, task Task) {
  182. /*
  183. fmt.Printf("In processTask,Consumer(%d) dealing Task Detail: User(%s/%s/%s) Async Call %s(%s) on Status:%d\n",
  184. workerID, task.Data.EventType, task.Data.ChatID, task.Data.AuthToken,
  185. task.Data.OpenaiBase, task.Data.OpenaiKey, task.Data.TaskStatus)*/
  186. _ = workerID
  187. var err error
  188. rt := 0
  189. for {
  190. if task.Data.TaskStatus >= All_Done {
  191. break
  192. }
  193. switch task.Data.TaskStatus {
  194. case Task_Ready:
  195. //请求API平台
  196. // succ: taskStatus Task_Ready => ReqApi_Done
  197. // fail: taskStatus保持当前不变或Task_Fail
  198. rt, err = l.requestAPI(task.Data)
  199. case ReqApi_Done:
  200. //结果回调
  201. // succ: taskStatus ReqApi_Done => Callback_Done(All_Done)
  202. // fail: taskStatus保持当前不变或Task_Fail
  203. rt, err = l.requestCallback(task.Data)
  204. }
  205. if err != nil {
  206. //收集错误
  207. if rt == 0 {
  208. //不可恢复错误处理....
  209. logx.Errorf("Task error by '%s'", err)
  210. } else {
  211. logx.Debugf("Task ignore by '%s'", err)
  212. }
  213. return //先暂时忽略处理,也许应按错误类型分别对待
  214. }
  215. }
  216. }
  217. func (l *CronTask) requestCallback(taskData *ent.CompapiAsynctask) (int, error) {
  218. workerId, _ := getGoroutineId()
  219. logx.Debugf("Worker:%d INTO requestCallback for task status:%d", workerId, taskData.TaskStatus)
  220. if needStop, _ := l.checkErrRetry(taskData); needStop { //重试次数检测,如果超过直接标为永久失败而不再处理
  221. return 1, errors.New("too many err retry")
  222. }
  223. if taskData.TaskStatus != ReqApi_Done {
  224. return 0, fmt.Errorf("invalid task run order for status:%d", taskData.TaskStatus)
  225. }
  226. if len(taskData.CallbackURL) == 0 {
  227. return 0, errors.New("callback url empty")
  228. }
  229. if len(taskData.ResponseRaw) == 0 {
  230. return 0, errors.New("call api response empty")
  231. }
  232. /*
  233. fstr := "mytest-svc:"
  234. if taskData.RetryCount >= 0 && strings.Contains(taskData.CallbackURL, fstr) {
  235. taskData.CallbackURL = strings.Replace(taskData.CallbackURL, fstr, "0.0.0.0:", 1)
  236. }
  237. */
  238. //先开启事务更新任务状态 => Callback_Done(回调完成)
  239. tx, err := l.updateTaskStatusByTx(taskData.ID, Callback_Done)
  240. if err != nil {
  241. return 0, err
  242. }
  243. //请求预先给定的callback_url
  244. var res map[string]any
  245. //初始化client
  246. client := compapi.NewClient(l.ctx)
  247. for i := range LoopTryCount { //重试机制
  248. res, err = client.Callback(taskData.EventType, taskData.CallbackURL, taskData)
  249. //_, err = client.Chat.Completions.New(l.ctx, emptyParams, opts...)
  250. if err == nil {
  251. //call succ
  252. //fmt.Println("callback succ..........")
  253. //fmt.Println(typekit.PrettyPrint(res))
  254. logx.Infof("callback:'%s' succ", taskData.CallbackURL)
  255. break
  256. }
  257. logx.Infof("Worker:%d call '%s' fail: '%s',sleep %d Second for next(%d/%d/%d)", workerId,
  258. taskData.CallbackURL, err, 1+i*5, i+1, LoopTryCount, taskData.RetryCount)
  259. time.Sleep(time.Duration(1+i*5) * time.Second)
  260. }
  261. //多次循环之后依然失败,进入错误任务处理环节
  262. if err != nil {
  263. _ = tx.Rollback() //回滚之前更新状态
  264. //fmt.Printf("Worker:%d client.Chat.Completions.New Fail,Rollback......\n", workerId)
  265. err1 := l.dealErrorTask(taskData, err) //错误任务处理
  266. et := 1
  267. if err1 != nil {
  268. et = 0
  269. }
  270. return et, err
  271. }
  272. //成功后处理环节
  273. err = tx.Commit() //事务提交
  274. if err != nil {
  275. return 0, err
  276. }
  277. //更新taskData.CallbackResponseRaw
  278. l.updateCallbackResponse(taskData.ID, res)
  279. taskData.TaskStatus = Callback_Done //状态迁移
  280. return 1, nil
  281. }
  282. func (l *CronTask) requestAPI(taskData *ent.CompapiAsynctask) (int, error) {
  283. workerId, _ := getGoroutineId()
  284. logx.Debugf("Worker:%d INTO requestAPI for task status:%d", workerId, taskData.TaskStatus)
  285. if needStop, _ := l.checkErrRetry(taskData); needStop { //重试次数检测,如果超过直接标为永久失败而不再处理
  286. return 1, errors.New("too many err retry")
  287. }
  288. if taskData.TaskStatus != Task_Ready {
  289. return 0, fmt.Errorf("invalid task run order for status:%d", taskData.TaskStatus)
  290. }
  291. var (
  292. err error
  293. apiResp *types.CompOpenApiResp
  294. tx *ent.Tx
  295. )
  296. req := types.CompApiReq{}
  297. if err = json.Unmarshal([]byte(taskData.RequestRaw), &req); err != nil {
  298. return 0, err
  299. }
  300. //先开启事务更新任务状态 => ReqApi_Done(请求API完成)
  301. tx, err = l.updateTaskStatusByTx(taskData.ID, ReqApi_Done)
  302. if err != nil {
  303. return 0, err
  304. }
  305. //初始化client
  306. client := compapi.NewClient(l.ctx, compapi.WithApiBase(taskData.OpenaiBase),
  307. compapi.WithApiKey(taskData.OpenaiKey))
  308. for i := range LoopTryCount { //重试机制
  309. apiResp, err = client.Chat(&req)
  310. if err == nil && apiResp != nil && len(apiResp.Choices) > 0 {
  311. //call succ
  312. break
  313. } else if apiResp != nil && len(apiResp.Choices) == 0 {
  314. err = errors.New("返回结果缺失,请检查访问权限")
  315. }
  316. logx.Infof("Worker:%d call '%s' fail: '%s',sleep %d Second for next(%d/%d/%d)", workerId,
  317. taskData.CallbackURL, err, 1+i*5, i+1, LoopTryCount, taskData.RetryCount)
  318. time.Sleep(time.Duration(1+i*5) * time.Second)
  319. }
  320. //多次循环之后依然失败,进入错误任务处理环节
  321. if err != nil || apiResp == nil {
  322. if apiResp == nil && err == nil {
  323. err = errors.New("resp is null")
  324. }
  325. _ = tx.Rollback() //回滚之前更新状态
  326. err1 := l.dealErrorTask(taskData, err) //错误任务处理
  327. et := 1
  328. if err1 != nil {
  329. et = 0
  330. }
  331. return et, err
  332. }
  333. //成功后处理环节
  334. //更新taskData.ResponseRaw
  335. taskData.ResponseRaw, err = (*apiResp).ToString()
  336. if err != nil {
  337. _ = tx.Rollback() //回滚之前更新状态
  338. return 0, err
  339. }
  340. err = l.updateApiResponseByTx(tx, taskData.ID, taskData.ResponseRaw)
  341. if err != nil {
  342. return 0, err
  343. }
  344. err = tx.Commit() //事务提交
  345. //fmt.Printf("Worker:%d requestAPI事务提交\n", workerId)
  346. if err != nil {
  347. return 0, err
  348. }
  349. taskData.TaskStatus = ReqApi_Done //状态迁移
  350. return 1, nil
  351. }
  352. // 更新任务状态事务版
  353. func (l *CronTask) updateTaskStatusByTx(Id uint64, status int) (*ent.Tx, error) {
  354. //开启Mysql事务
  355. tx, _ := l.svcCtx.DB.Tx(l.ctx)
  356. _, err := tx.CompapiAsynctask.UpdateOneID(Id).
  357. SetTaskStatus(int8(status)).
  358. SetUpdatedAt(time.Now()).
  359. Save(l.ctx)
  360. if err != nil {
  361. return nil, err
  362. }
  363. return tx, nil
  364. }
  365. // 更新请求大模型后结果事务版
  366. func (l *CronTask) updateApiResponseByTx(tx *ent.Tx, taskId uint64, apiResponse string) error {
  367. _, err := tx.CompapiAsynctask.UpdateOneID(taskId).
  368. SetUpdatedAt(time.Now()).
  369. SetResponseRaw(apiResponse).
  370. SetLastError("").
  371. SetRetryCount(0).
  372. Save(l.ctx)
  373. if err != nil {
  374. _ = tx.Rollback() //回滚之前更新状态
  375. }
  376. return err
  377. }
  378. func (l *CronTask) updateCallbackResponse(taskId uint64, callRes any) error {
  379. callResStr := ""
  380. switch v := callRes.(type) {
  381. case []byte:
  382. callResStr = string(v)
  383. default:
  384. if bs, err := json.Marshal(v); err == nil {
  385. callResStr = string(bs)
  386. } else {
  387. return err
  388. }
  389. }
  390. _, err := l.svcCtx.DB.CompapiAsynctask.UpdateOneID(taskId).
  391. SetUpdatedAt(time.Now()).
  392. SetCallbackResponseRaw(callResStr).
  393. SetLastError("").
  394. SetRetryCount(0).
  395. Save(l.ctx)
  396. return err
  397. }
  398. func (l *CronTask) checkErrRetry(taskData *ent.CompapiAsynctask) (bool, error) {
  399. var err error
  400. var needStop = false
  401. if taskData.RetryCount >= ErrTaskTryCount { //错误任务尝试次数超过约定则将任务状态永久设为失败
  402. _, err = l.svcCtx.DB.CompapiAsynctask.UpdateOneID(taskData.ID).
  403. SetUpdatedAt(time.Now()).
  404. SetTaskStatus(int8(Task_Fail)).
  405. Save(l.ctx)
  406. if err == nil {
  407. needStop = true
  408. taskData.TaskStatus = Task_Fail
  409. }
  410. }
  411. return needStop, err
  412. }
  413. // 错误任务处理
  414. func (l *CronTask) dealErrorTask(taskData *ent.CompapiAsynctask, lasterr error) error {
  415. logx.Debug("多次循环之后依然失败,进入错误任务处理环节")
  416. cauo := l.svcCtx.DB.CompapiAsynctask.UpdateOneID(taskData.ID).
  417. SetUpdatedAt(time.Now())
  418. if taskData.RetryCount >= ErrTaskTryCount { //错误任务尝试次数超过约定则将任务状态永久设为失败
  419. taskData.TaskStatus = Task_Fail
  420. cauo = cauo.SetTaskStatus(int8(Task_Fail))
  421. } else {
  422. cauo = cauo.SetRetryCount(taskData.RetryCount + 1).
  423. SetLastError(lasterr.Error())
  424. }
  425. _, err := cauo.Save(l.ctx)
  426. return err
  427. }