asynctask.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "flag"
  8. "fmt"
  9. "hash/fnv"
  10. "os"
  11. "os/signal"
  12. "reflect"
  13. "runtime"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. "sync/atomic"
  18. "syscall"
  19. "time"
  20. "wechat-api/ent"
  21. "wechat-api/ent/compapiasynctask"
  22. "wechat-api/ent/predicate"
  23. "wechat-api/internal/svc"
  24. "wechat-api/internal/types"
  25. "wechat-api/internal/utils/compapi"
  26. "github.com/suyuan32/simple-admin-common/config"
  27. "github.com/zeromicro/go-zero/core/conf"
  28. "github.com/zeromicro/go-zero/core/logx"
  29. )
  30. const (
  31. Task_Ready = 10 //任务就绪
  32. ReqApi_Done = 20 //请求API完成
  33. Callback_Done = 30 //请求回调完成
  34. All_Done = Callback_Done //全部完成 (暂时将成功状态的终点标注于此)
  35. Task_Suspend = 60 //任务暂停
  36. Task_Fail = 70 //任务失败
  37. LoopTryCount = 3 //循环体内重试次数
  38. LoopDelayFactor = 3
  39. ErrTaskTryCount = 3 //最大允许错误任务重试次数
  40. DefaultDisId = "DIS0001"
  41. )
  42. type Config struct {
  43. BatchLoadTask uint `json:",default=100"`
  44. MaxWorker uint `json:",default=2"`
  45. MaxChannel uint `json:",default=1"`
  46. Debug bool `json:",default=false"`
  47. DatabaseConf config.DatabaseConf
  48. RedisConf config.RedisConf
  49. }
  50. type TaskStat struct {
  51. }
  52. type AsyncTask struct {
  53. logx.Logger
  54. ctx context.Context
  55. svcCtx *svc.ServiceContext
  56. Conf Config
  57. Stats TaskStat
  58. }
  59. type Task struct {
  60. Data *ent.CompapiAsynctask
  61. Idx int
  62. Code int
  63. }
  64. // 带会话管理的任务通道组
  65. type TaskDispatcher struct {
  66. mu sync.Mutex
  67. workerChs []chan Task // 每个worker独立通道
  68. Debug bool
  69. }
  70. var configFile = flag.String("f", "./etc/asynctask.yaml", "the config file")
  71. func getGoroutineId() (int64, error) {
  72. // 堆栈结果中需要消除的前缀符
  73. var goroutineSpace = []byte("goroutine ")
  74. bs := make([]byte, 128)
  75. bs = bs[:runtime.Stack(bs, false)]
  76. bs = bytes.TrimPrefix(bs, goroutineSpace)
  77. i := bytes.IndexByte(bs, ' ')
  78. if i < 0 {
  79. return -1, errors.New("get current goroutine id failed")
  80. }
  81. return strconv.ParseInt(string(bs[:i]), 10, 64)
  82. }
  83. func NewTaskDispatcher(channelCount uint, chanSize uint, debug bool) *TaskDispatcher {
  84. td := &TaskDispatcher{
  85. workerChs: make([]chan Task, channelCount),
  86. Debug: debug,
  87. }
  88. // 初始化worker通道
  89. for i := range td.workerChs {
  90. td.workerChs[i] = make(chan Task, chanSize+1) // 每个worker带缓冲
  91. }
  92. return td
  93. }
  94. // 按woker下标索引选择workerChs
  95. func (td *TaskDispatcher) getWorkerChanByIdx(idx uint) (uint, chan Task) {
  96. nidx := idx % uint(len(td.workerChs))
  97. return nidx, td.workerChs[nidx]
  98. }
  99. // 按哈希分片选择workerChs
  100. func (td *TaskDispatcher) getWorkerChanByHash(disID string) (int, chan Task) {
  101. if len(disID) == 0 {
  102. disID = DefaultDisId
  103. }
  104. idx := 0
  105. if len(td.workerChs) > 1 {
  106. h := fnv.New32a()
  107. h.Write([]byte(disID))
  108. idx = int(h.Sum32()) % len(td.workerChs)
  109. }
  110. if td.Debug {
  111. outStr := fmt.Sprintf("getWorkerChannel by disId Hash:'%s',from workerChs{", disID)
  112. for i := range len(td.workerChs) {
  113. outStr += fmt.Sprintf("#%d", i+1)
  114. if i < len(td.workerChs)-1 {
  115. outStr += ","
  116. }
  117. }
  118. outStr += fmt.Sprintf("} choice chs:'#%d' by '%s'", idx+1, disID)
  119. logx.Debug(outStr)
  120. }
  121. return idx, td.workerChs[idx]
  122. }
  123. // 分配任务到对应的消费channel
  124. func (td *TaskDispatcher) Dispatch(task Task) {
  125. td.mu.Lock()
  126. defer td.mu.Unlock()
  127. // 根据chatId哈希获得其对应的workerChan
  128. workerChIdx, workerCh := td.getWorkerChanByHash(task.Data.EventType)
  129. // 将任务送入该chatid的workerChan
  130. workerCh <- task
  131. if td.Debug {
  132. logx.Debugf("Producer:EventType:[%s] Task Push to WorkerChan:#%d", task.Data.EventType, workerChIdx+1)
  133. }
  134. }
  135. // 更新任务状态
  136. func (me *AsyncTask) updateTaskStatus(taskId uint64, status int) error {
  137. _, err := me.svcCtx.DB.CompapiAsynctask.UpdateOneID(taskId).
  138. SetTaskStatus(int8(status)).
  139. SetUpdatedAt(time.Now()).
  140. Save(me.ctx)
  141. return err
  142. }
  143. // 更新请求大模型后结果
  144. func (me *AsyncTask) updateApiResponse(taskId uint64, apiResponse string) error {
  145. _, err := me.svcCtx.DB.CompapiAsynctask.UpdateOneID(taskId).
  146. SetUpdatedAt(time.Now()).
  147. SetResponseRaw(apiResponse).
  148. SetLastError("").
  149. SetRetryCount(0).
  150. Save(me.ctx)
  151. return err
  152. }
  153. func (me *AsyncTask) updateCallbackResponse(taskId uint64, callRes any) error {
  154. callResStr := ""
  155. switch v := callRes.(type) {
  156. case []byte:
  157. callResStr = string(v)
  158. default:
  159. if bs, err := json.Marshal(v); err == nil {
  160. callResStr = string(bs)
  161. } else {
  162. return err
  163. }
  164. }
  165. _, err := me.svcCtx.DB.CompapiAsynctask.UpdateOneID(taskId).
  166. SetUpdatedAt(time.Now()).
  167. SetCallbackResponseRaw(callResStr).
  168. SetLastError("").
  169. SetRetryCount(0).
  170. Save(me.ctx)
  171. return err
  172. }
  173. func (me *AsyncTask) checkErrRetry(taskData *ent.CompapiAsynctask) (bool, error) {
  174. var err error
  175. var needStop = false
  176. if taskData.RetryCount >= ErrTaskTryCount { //错误任务尝试次数超过约定则将任务状态永久设为失败
  177. _, err = me.svcCtx.DB.CompapiAsynctask.UpdateOneID(taskData.ID).
  178. SetUpdatedAt(time.Now()).
  179. SetTaskStatus(int8(Task_Fail)).
  180. Save(me.ctx)
  181. if err == nil {
  182. needStop = true
  183. taskData.TaskStatus = Task_Fail
  184. }
  185. }
  186. return needStop, err
  187. }
  188. // 错误任务处理
  189. func (me *AsyncTask) dealErrorTask(taskData *ent.CompapiAsynctask, lasterr error) error {
  190. logx.Debug("多次循环之后依然失败,进入错误任务处理环节")
  191. cauo := me.svcCtx.DB.CompapiAsynctask.UpdateOneID(taskData.ID).
  192. SetUpdatedAt(time.Now())
  193. if taskData.RetryCount >= ErrTaskTryCount { //错误任务尝试次数超过约定则将任务状态永久设为失败
  194. taskData.TaskStatus = Task_Fail
  195. cauo = cauo.SetTaskStatus(int8(Task_Fail))
  196. } else {
  197. cauo = cauo.SetRetryCount(taskData.RetryCount + 1).
  198. SetLastError(lasterr.Error())
  199. }
  200. _, err := cauo.Save(me.ctx)
  201. return err
  202. }
  203. func (me *AsyncTask) requestCallback(taskData *ent.CompapiAsynctask) (int, error) {
  204. var workerId int64
  205. if me.Conf.Debug {
  206. workerId, _ = getGoroutineId()
  207. logx.Debugf("Worker:%d INTO requestCallback for task status:%d", workerId, taskData.TaskStatus)
  208. }
  209. if needStop, _ := me.checkErrRetry(taskData); needStop { //重试次数检测,如果超过直接标为永久失败而不再处理
  210. return 1, errors.New("too many err retry")
  211. }
  212. if taskData.TaskStatus != ReqApi_Done {
  213. return 0, fmt.Errorf("invalid task run order for status:%d", taskData.TaskStatus)
  214. }
  215. if len(taskData.CallbackURL) == 0 {
  216. return 0, errors.New("callback url empty")
  217. }
  218. if len(taskData.ResponseRaw) == 0 {
  219. return 0, errors.New("previous status call api response empty")
  220. }
  221. /*
  222. fstr := "mytest-svc:"
  223. if taskData.RetryCount >= 0 && strings.Contains(taskData.CallbackURL, fstr) {
  224. taskData.CallbackURL = strings.Replace(taskData.CallbackURL, fstr, "0.0.0.0:", 1)
  225. }
  226. */
  227. //请求预先给定的callback_url
  228. res := map[string]any{}
  229. var err error
  230. //初始化client
  231. client := compapi.NewClient(me.ctx)
  232. for i := range LoopTryCount { //重试机制
  233. select {
  234. case <-me.ctx.Done(): //接到信号退出
  235. goto endloopTry
  236. default:
  237. res, err = client.Callback(taskData.EventType, taskData.CallbackURL, taskData)
  238. if err == nil {
  239. //call succ
  240. //fmt.Println("callback succ..........")
  241. //fmt.Println(typekit.PrettyPrint(res))
  242. if me.Conf.Debug {
  243. logx.Infof("callback:'%s' succ", taskData.CallbackURL)
  244. }
  245. goto endloopTry
  246. }
  247. logx.Infof("Worker:%d call '%s' fail: '%s',sleep %d Second for next(%d/%d/%d)", workerId,
  248. taskData.CallbackURL, err, 1+i*LoopDelayFactor, i+1, LoopTryCount, taskData.RetryCount)
  249. time.Sleep(time.Duration(1+i*LoopDelayFactor) * time.Second)
  250. }
  251. }
  252. //多次循环之后依然失败,进入错误任务处理环节
  253. endloopTry:
  254. if err != nil {
  255. err1 := me.dealErrorTask(taskData, err) //错误任务处理
  256. et := 1
  257. if err1 != nil {
  258. et = 0
  259. }
  260. return et, err
  261. }
  262. //成功后处理环节
  263. //更新任务状态 => Callback_Done(回调完成)
  264. err = me.updateTaskStatus(taskData.ID, Callback_Done)
  265. if err != nil {
  266. return 0, err
  267. }
  268. //更新taskData.CallbackResponseRaw
  269. if len(res) > 0 {
  270. me.updateCallbackResponse(taskData.ID, res)
  271. }
  272. taskData.TaskStatus = Callback_Done //状态迁移
  273. return 1, nil
  274. }
  275. func (me *AsyncTask) requestAPI(taskData *ent.CompapiAsynctask) (int, error) {
  276. var workerId int64
  277. if me.Conf.Debug {
  278. workerId, _ = getGoroutineId()
  279. logx.Debugf("Worker:%d INTO requestAPI for task status:%d", workerId, taskData.TaskStatus)
  280. }
  281. if needStop, _ := me.checkErrRetry(taskData); needStop { //重试次数检测,如果超过直接标为永久失败而不再处理
  282. return 1, errors.New("too many err retry")
  283. }
  284. if taskData.TaskStatus != Task_Ready {
  285. return 0, fmt.Errorf("invalid task run order for status:%d", taskData.TaskStatus)
  286. }
  287. var (
  288. err error
  289. apiResp *types.CompOpenApiResp
  290. )
  291. req := types.CompApiReq{}
  292. if err = json.Unmarshal([]byte(taskData.RequestRaw), &req); err != nil {
  293. return 0, err
  294. }
  295. //初始化client
  296. if !strings.HasSuffix(taskData.OpenaiBase, "/") {
  297. taskData.OpenaiBase = taskData.OpenaiBase + "/"
  298. }
  299. client := compapi.NewClient(me.ctx, compapi.WithApiBase(taskData.OpenaiBase),
  300. compapi.WithApiKey(taskData.OpenaiKey))
  301. for i := range LoopTryCount { //重试机制
  302. select {
  303. case <-me.ctx.Done(): //接到信号退出
  304. goto endloopTry
  305. default:
  306. apiResp, err = client.Chat(&req)
  307. if err == nil && apiResp != nil && len(apiResp.Choices) > 0 {
  308. //call succ
  309. goto endloopTry
  310. } else if apiResp != nil && len(apiResp.Choices) == 0 {
  311. err = errors.New("返回结果缺失,请检查访问权限")
  312. }
  313. if me.Conf.Debug {
  314. logx.Infof("Worker:%d call '%s' fail: '%s',sleep %d Second for next(%d/%d/%d)", workerId,
  315. taskData.CallbackURL, err, 1+i*LoopDelayFactor, i+1, LoopTryCount, taskData.RetryCount)
  316. }
  317. time.Sleep(time.Duration(1+i*LoopDelayFactor) * time.Second)
  318. }
  319. }
  320. endloopTry:
  321. //多次循环之后依然失败,进入错误任务处理环节
  322. if err != nil || apiResp == nil {
  323. if apiResp == nil && err == nil {
  324. err = errors.New("resp is null")
  325. }
  326. err1 := me.dealErrorTask(taskData, err) //错误任务处理
  327. et := 1
  328. if err1 != nil {
  329. et = 0
  330. }
  331. return et, err
  332. }
  333. //成功后处理环节
  334. //更新任务状态 => ReqApi_Done(请求API完成)
  335. err = me.updateTaskStatus(taskData.ID, ReqApi_Done)
  336. if err != nil {
  337. return 0, err
  338. }
  339. //更新taskData.ResponseRaw
  340. taskData.ResponseRaw, err = (*apiResp).ToString()
  341. if err != nil {
  342. return 0, err
  343. }
  344. err = me.updateApiResponse(taskData.ID, taskData.ResponseRaw)
  345. if err != nil {
  346. return 0, err
  347. }
  348. taskData.TaskStatus = ReqApi_Done //状态迁移
  349. return 1, nil
  350. }
  351. func EntStructGenScanField(structPtr any) (string, []any, error) {
  352. t := reflect.TypeOf(structPtr)
  353. v := reflect.ValueOf(structPtr)
  354. if t.Kind() != reflect.Ptr || t.Elem().Kind() != reflect.Struct {
  355. return "", nil, errors.New("input must be a pointer to a struct")
  356. }
  357. t = t.Elem()
  358. v = v.Elem()
  359. var fields []string
  360. var scanArgs []any
  361. for i := 0; i < t.NumField(); i++ {
  362. field := t.Field(i)
  363. value := v.Field(i)
  364. // Skip unexported fields
  365. if !field.IsExported() {
  366. continue
  367. }
  368. // Get json tag
  369. jsonTag := field.Tag.Get("json")
  370. if jsonTag == "-" || jsonTag == "" {
  371. continue
  372. }
  373. jsonParts := strings.Split(jsonTag, ",")
  374. jsonName := jsonParts[0]
  375. if jsonName == "" {
  376. continue
  377. }
  378. fields = append(fields, jsonName)
  379. scanArgs = append(scanArgs, value.Addr().Interface())
  380. }
  381. return strings.Join(fields, ", "), scanArgs, nil
  382. }
  383. /*
  384. CREATE INDEX idx_compapi_task_status_chat_id_id_desc
  385. ON compapi_asynctask (task_status, chat_id, id DESC);
  386. */
  387. func (me *AsyncTask) getAsyncReqTaskFairList() ([]Task, error) {
  388. fieldListStr, _, err := EntStructGenScanField(&ent.CompapiAsynctask{})
  389. if err != nil {
  390. return nil, err
  391. }
  392. rawQuery := fmt.Sprintf(`
  393. WITH ranked AS (
  394. SELECT %s,
  395. ROW_NUMBER() OVER (PARTITION BY chat_id ORDER BY id DESC) AS rn
  396. FROM compapi_asynctask
  397. WHERE task_status < ?
  398. )
  399. SELECT %s
  400. FROM ranked
  401. WHERE rn <= ?
  402. ORDER BY rn,id DESC
  403. LIMIT ?;
  404. `, fieldListStr, fieldListStr)
  405. // 执行原始查询
  406. rows, err := me.svcCtx.DB.QueryContext(me.ctx, rawQuery, All_Done,
  407. me.Conf.BatchLoadTask, me.Conf.BatchLoadTask)
  408. if err != nil {
  409. return nil, fmt.Errorf("fetch fair tasks query error: %w", err)
  410. }
  411. defer rows.Close()
  412. Idx := 0
  413. tasks := []Task{}
  414. for rows.Next() {
  415. taskrow := ent.CompapiAsynctask{}
  416. var scanParams []any
  417. _, scanParams, err = EntStructGenScanField(&taskrow)
  418. if err != nil {
  419. break
  420. }
  421. err = rows.Scan(scanParams...)
  422. if err != nil {
  423. break
  424. }
  425. task := Task{Data: &taskrow, Idx: Idx}
  426. tasks = append(tasks, task)
  427. Idx++
  428. }
  429. fmt.Printf("getAsyncReqTaskFairList get task:%d\n", len(tasks))
  430. return tasks, nil
  431. }
  432. func (me *AsyncTask) getAsyncReqTaskList() ([]Task, error) {
  433. var predicates []predicate.CompapiAsynctask
  434. predicates = append(predicates, compapiasynctask.TaskStatusLT(All_Done))
  435. var tasks []Task
  436. res, err := me.svcCtx.DB.CompapiAsynctask.Query().Where(predicates...).
  437. Order(ent.Desc(compapiasynctask.FieldID)).
  438. Limit(int(me.Conf.BatchLoadTask)).
  439. All(me.ctx)
  440. if err == nil {
  441. for idx, val := range res {
  442. tasks = append(tasks, Task{Data: val, Idx: idx})
  443. }
  444. }
  445. return tasks, err
  446. }
  447. func (me *AsyncTask) processTask(workerID uint, task Task) {
  448. /*
  449. fmt.Printf("In processTask,Consumer(%d) dealing Task Detail: User(%s/%s/%s) Async Call %s(%s) on Status:%d\n",
  450. workerID, task.Data.EventType, task.Data.ChatID, task.Data.AuthToken,
  451. task.Data.OpenaiBase, task.Data.OpenaiKey, task.Data.TaskStatus)
  452. */
  453. _ = workerID
  454. var err error
  455. rt := 0
  456. for {
  457. select {
  458. case <-me.ctx.Done(): //接到信号退出
  459. return
  460. default:
  461. if task.Data.TaskStatus >= All_Done {
  462. goto endfor //原来的break操作加了switch一层不好使了
  463. }
  464. switch task.Data.TaskStatus {
  465. case Task_Ready:
  466. //请求API平台
  467. // succ: taskStatus Task_Ready => ReqApi_Done
  468. // fail: taskStatus保持当前不变或Task_Fail
  469. rt, err = me.requestAPI(task.Data)
  470. case ReqApi_Done:
  471. //结果回调
  472. // succ: taskStatus ReqApi_Done => Callback_Done(All_Done)
  473. // fail: taskStatus保持当前不变或Task_Fail
  474. rt, err = me.requestCallback(task.Data)
  475. }
  476. if err != nil {
  477. //收集错误
  478. if rt == 0 {
  479. //不可恢复错误处理....
  480. logx.Errorf("Task error by '%s'", err)
  481. } else {
  482. logx.Debugf("Task ignore by '%s'", err)
  483. }
  484. return //先暂时忽略处理,也许应按错误类型分别对待
  485. }
  486. }
  487. }
  488. endfor:
  489. }
  490. func (me *AsyncTask) batchWork() (int64, int64) {
  491. var (
  492. wg sync.WaitGroup
  493. produced int64 //生产数量(原子计数器)
  494. consumed int64 //消费数量(原子计数器)
  495. )
  496. //创建任务分发器
  497. dispatcher := NewTaskDispatcher(me.Conf.MaxChannel,
  498. me.Conf.BatchLoadTask/me.Conf.MaxChannel, me.Conf.Debug)
  499. //启动消费者
  500. for widx := range me.Conf.MaxWorker {
  501. cidx, ch := dispatcher.getWorkerChanByIdx(widx)
  502. wg.Add(1)
  503. go func(workerID uint, channelID uint, taskCh chan Task) {
  504. defer wg.Done()
  505. gidStr := ""
  506. if me.Conf.Debug {
  507. gid, _ := getGoroutineId()
  508. gidStr = fmt.Sprintf("(Goroutine:%d)", gid)
  509. logx.Infof("Consumer @%d%s bind WorkerChan:#%d start......",
  510. workerID, gidStr, channelID)
  511. }
  512. for task := range taskCh {
  513. me.processTask(widx, task)
  514. atomic.AddInt64(&consumed, 1)
  515. }
  516. }(widx+1, cidx+1, ch)
  517. }
  518. // 生产者
  519. wg.Add(1)
  520. go func() {
  521. defer wg.Done()
  522. gidStr := ""
  523. if me.Conf.Debug {
  524. gid, _ := getGoroutineId()
  525. gidStr = fmt.Sprintf("(Goroutine:%d)", gid)
  526. logx.Infof("Producer @1%s start......", gidStr)
  527. }
  528. //获得待处理异步任务列表
  529. //tasks, err := me.getAsyncReqTaskList()
  530. tasks, err := me.getAsyncReqTaskFairList()
  531. if err != nil {
  532. logx.Errorf("getAsyncReqTaskFairList err:%s", err)
  533. return
  534. }
  535. // 分发任务
  536. for _, task := range tasks {
  537. dispatcher.Dispatch(task)
  538. atomic.AddInt64(&produced, 1)
  539. }
  540. fmt.Printf("📦Producer @1 此批次共创建任务%d件\n", len(tasks))
  541. // 关闭所有会话通道
  542. dispatcher.mu.Lock()
  543. for _, ch := range dispatcher.workerChs {
  544. _ = ch
  545. close(ch)
  546. }
  547. dispatcher.mu.Unlock()
  548. }()
  549. wg.Wait()
  550. consumedRatStr := ""
  551. if atomic.LoadInt64(&produced) > 0 {
  552. consumedRatStr = fmt.Sprintf(" (%d/%d)*100=%d%%", atomic.LoadInt64(&produced), atomic.LoadInt64(&consumed),
  553. (atomic.LoadInt64(&consumed)/atomic.LoadInt64(&produced))*100)
  554. }
  555. fmt.Printf("🏁本次任务完成度统计: Task dispatch: %d(%d)(Producer:1),Task deal: %d(Consumer:%d)%s\n", atomic.LoadInt64(&produced), me.Conf.BatchLoadTask, atomic.LoadInt64(&consumed), me.Conf.MaxWorker, consumedRatStr)
  556. return produced, consumed
  557. }
  558. func (me *AsyncTask) InitServiceContext() *svc.ServiceContext {
  559. rds := me.Conf.RedisConf.MustNewUniversalRedis()
  560. dbOpts := []ent.Option{ent.Log(logx.Info),
  561. ent.Driver(me.Conf.DatabaseConf.NewNoCacheDriver())}
  562. if me.Conf.Debug {
  563. dbOpts = append(dbOpts, ent.Debug())
  564. }
  565. db := ent.NewClient(dbOpts...)
  566. svcCtx := svc.ServiceContext{DB: db, Rds: rds}
  567. //svcCtx.Config
  568. return &svcCtx
  569. }
  570. func (me *AsyncTask) adjustConf() {
  571. if me.Conf.MaxWorker <= 0 {
  572. me.Conf.MaxWorker = 2
  573. }
  574. if me.Conf.MaxChannel <= 0 || me.Conf.MaxChannel > me.Conf.MaxWorker {
  575. me.Conf.MaxChannel = me.Conf.MaxWorker
  576. }
  577. }
  578. func (me *AsyncTask) Run(ctx context.Context) {
  579. me.ctx = ctx
  580. me.Logger = logx.WithContext(ctx)
  581. me.adjustConf()
  582. /*
  583. tasks, err := me.getAsyncReqTaskFairList()
  584. if err != nil {
  585. fmt.Println(err)
  586. return
  587. }
  588. for idx, task := range tasks {
  589. if idx > 20 {
  590. break
  591. }
  592. fmt.Printf("#%d=>%d ||[%s]'%s' || '%s' || '%s'|| '%s'\n", idx, task.Data.ID, task.Data.CreatedAt, task.Data.EventType,
  593. task.Data.Model, task.Data.OpenaiBase, task.Data.ChatID)
  594. }
  595. */
  596. batchId := 0
  597. for {
  598. batchId++
  599. select {
  600. case <-ctx.Done():
  601. // 收到了取消信号
  602. fmt.Printf("Main Will Shutting down gracefully... Reason: %v\n", ctx.Err())
  603. return
  604. default:
  605. timeStart := time.Now()
  606. secStart := timeStart.Unix()
  607. fmt.Printf("[%s]batchWork#%d start......\n", timeStart.Format("2006-01-02 15:04:05"), batchId)
  608. produced, _ := me.batchWork()
  609. timeEnd := time.Now()
  610. fmt.Printf("[%s]batchWork#%d end,spent %d sec\n", timeEnd.Format("2006-01-02 15:04:05"),
  611. batchId, timeEnd.Unix()-secStart)
  612. timeDurnum := 1
  613. if produced == 0 {
  614. timeDurnum = 5
  615. }
  616. fmt.Printf("batchWork will sleep %d sec\n", timeDurnum)
  617. time.Sleep(time.Duration(timeDurnum) * time.Second)
  618. }
  619. }
  620. }
  621. func NewAsyncTask() *AsyncTask {
  622. ataskObj := AsyncTask{}
  623. flag.Parse() //将命令行参数也塞入flag.CommandLine结构
  624. //fmt.Println(typekit.PrettyPrint(flag.CommandLine))
  625. conf.MustLoad(*configFile, &ataskObj.Conf, conf.UseEnv())
  626. //fmt.Println(typekit.PrettyPrint(ataskObj.Conf))
  627. ataskObj.svcCtx = ataskObj.InitServiceContext()
  628. return &ataskObj
  629. }
  630. func main() {
  631. fmt.Println("Compapi Asynctask Start......")
  632. //ctx, cancel := context.WithCancel(context.Background())
  633. ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  634. defer cancel()
  635. NewAsyncTask().Run(ctx)
  636. fmt.Println("Compapi Asynctask End......")
  637. }