compute_historical_statistic.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package crontask
  2. import (
  3. "fmt"
  4. "github.com/zeromicro/go-zero/core/logx"
  5. "strconv"
  6. "time"
  7. "wechat-api/ent"
  8. "wechat-api/ent/usagedetail"
  9. "wechat-api/ent/usagestatisticday"
  10. "wechat-api/ent/usagestatistichour"
  11. )
  12. type OrgBot struct {
  13. OrganizationID uint64 `json:"organization_id"`
  14. BotID string `json:"bot_id"`
  15. }
  16. func (l *CronTask) computeHistoricalStatistic() {
  17. // 获取所有机器人信息
  18. //wxbots, err := l.svcCtx.DB.Wx.Query().Select(wx.FieldWxid, wx.FieldID, wx.FieldOrganizationID).All(l.ctx)
  19. //if err != nil {
  20. // l.Errorf("fetch wxids error:%v\n", err)
  21. // return
  22. //}
  23. //
  24. //wxbotsSet := make(map[uint64][]*ent.Wx)
  25. //for _, bot := range wxbots {
  26. // if !strings.HasPrefix(bot.Wxid, "temp-") {
  27. // wxbotsSet[bot.OrganizationID] = append(wxbotsSet[bot.OrganizationID], bot)
  28. // }
  29. //}
  30. orgBots := []OrgBot{}
  31. err := l.svcCtx.DB.UsageDetail.Query().
  32. GroupBy(usagedetail.FieldOrganizationID, usagedetail.FieldBotID).
  33. Aggregate().
  34. Scan(l.ctx, &orgBots)
  35. if err != nil {
  36. l.Errorf("group usage_detail error: %v", err)
  37. return
  38. }
  39. wxbotsSet := make(map[uint64][]string)
  40. for _, ob := range orgBots {
  41. wxbotsSet[ob.OrganizationID] = append(wxbotsSet[ob.OrganizationID], ob.BotID)
  42. }
  43. logx.Info("wxbotsSet: ", wxbotsSet)
  44. /*
  45. 计算本小时的数据
  46. 1. 查询出上小时里所有 usagedetail 内容
  47. 2. 挨个遍历他的 bot_id ,再查询他的 bot_id 相关的参数
  48. 3. 遍历的时候可能有重复,所以要先检查是否生成了数据,如果有就忽略,没有再生成
  49. ----------------------------------------------------------------------------------------------------------
  50. */
  51. // 获取当前时间
  52. //now := time.Now()
  53. //start := time.Date(2025, 5, 5, 0, 0, 0, 0, time.Local)
  54. //end := time.Date(2025, 5, 8, 0, 0, 0, 0, time.Local)
  55. start := time.Date(2024, 11, 24, 0, 0, 0, 0, time.Local)
  56. end := time.Date(2025, 5, 14, 0, 0, 0, 0, time.Local)
  57. for now := start; !now.After(end); now = now.Add(time.Hour) {
  58. fmt.Println(now.Format("2006-01-02 15:00:00"))
  59. // 获取本小时的第一分钟
  60. currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()).Add(-8 * time.Hour)
  61. currentHourInt, _ := strconv.Atoi(currentHour.Format("2006010215"))
  62. currentHourLocation := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
  63. currentHourIntLocation, _ := strconv.Atoi(currentHourLocation.Format("2006010215"))
  64. // 上一个小时的起始时间
  65. lastHour := currentHour.Add(-time.Hour * 1)
  66. lastHourLocation := currentHourLocation.Add(-time.Hour * 1)
  67. lastHourInt, _ := strconv.Atoi(lastHour.Format("2006010215"))
  68. lastHourIntLocation, _ := strconv.Atoi(lastHourLocation.Format("2006010215"))
  69. l.Logger.Infof("currentHour:%d\n", currentHour)
  70. l.Logger.Infof("currentHourInt:%d\n", currentHourInt)
  71. l.Logger.Infof("currentHourLocation:%d\n", currentHourLocation)
  72. l.Logger.Infof("currentHourIntLocation:%d\n", currentHourIntLocation)
  73. l.Logger.Infof("lastHour:%d\n", lastHour)
  74. l.Logger.Infof("lastHourLocation:%d\n", lastHourLocation)
  75. l.Logger.Infof("lastHourInt:%d\n", lastHourInt)
  76. l.Logger.Infof("lastHourIntLocation:%d\n", lastHourIntLocation)
  77. var allHourConsumeCoinFloat float64
  78. for orgID, wxinfos := range wxbotsSet {
  79. var orgHourConsumeCoinFloat float64
  80. for _, wxinfo := range wxinfos {
  81. l.Logger.Infof("开始计算小时数据:%d\n", lastHourInt)
  82. // 先判断该账号是否已经统计了小时数据,如果已经统计了,就不需要再统计了
  83. var consumeCoinFloat float64
  84. // 计算积分消耗
  85. consumeCoinFloat, _ = l.svcCtx.DB.UsageDetail.Query().Where(
  86. usagedetail.BotID(wxinfo),
  87. usagedetail.OrganizationIDEQ(orgID),
  88. usagedetail.CreatedAtGTE(lastHour),
  89. usagedetail.CreatedAtLT(currentHour),
  90. ).Aggregate(ent.Sum("credits")).Float64(l.ctx)
  91. ush, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(usagestatistichour.AddtimeEQ(uint64(lastHourIntLocation)), usagestatistichour.BotIDEQ(wxinfo), usagestatistichour.OrganizationIDEQ(orgID)).Only(l.ctx)
  92. if ush == nil {
  93. continue
  94. }
  95. allHourConsumeCoinFloat += consumeCoinFloat
  96. orgHourConsumeCoinFloat += consumeCoinFloat
  97. logx.Info("hour consumeCoinFloat: ", consumeCoinFloat)
  98. logx.Info("hour allHourConsumeCoinFloat: ", allHourConsumeCoinFloat)
  99. logx.Info("hour orgHourConsumeCoinFloat: ", orgHourConsumeCoinFloat)
  100. _, err := l.svcCtx.DB.UsageStatisticHour.Update().
  101. Where(usagestatistichour.AddtimeEQ(uint64(lastHourIntLocation)), usagestatistichour.BotIDEQ(wxinfo), usagestatistichour.OrganizationIDEQ(orgID)).
  102. SetConsumeCoin(consumeCoinFloat).
  103. Save(l.ctx)
  104. if err != nil {
  105. l.Errorf("save hour data error:%v \n", err)
  106. }
  107. }
  108. _, err = l.svcCtx.DB.UsageStatisticHour.Update().
  109. Where(usagestatistichour.AddtimeEQ(uint64(lastHourIntLocation)), usagestatistichour.OrganizationIDEQ(orgID), usagestatistichour.BotIDEQ("")).
  110. SetConsumeCoin(orgHourConsumeCoinFloat).
  111. Save(l.ctx)
  112. if err != nil {
  113. l.Errorf("save hour data error:%v \n", err)
  114. }
  115. }
  116. // 先判断该租户是否已经统计了小时数据,如果已经统计了,就不需要再统计了
  117. _, err = l.svcCtx.DB.UsageStatisticHour.Update().
  118. Where(usagestatistichour.AddtimeEQ(uint64(lastHourIntLocation)), usagestatistichour.OrganizationIDEQ(0), usagestatistichour.BotIDEQ("")).
  119. SetConsumeCoin(allHourConsumeCoinFloat).
  120. Save(l.ctx)
  121. if err != nil {
  122. l.Errorf("save hour data error:%v \n", err)
  123. }
  124. /*
  125. 计算日数据
  126. ----------------------------------------------------------------------------------------------------------
  127. */
  128. //dayStr := time.Now().Format("20060102")
  129. //day, _ := strconv.Atoi(dayStr)
  130. // 获取昨天的第一小时
  131. yesterday := now.AddDate(0, 0, -1)
  132. yesterdayFirstHour := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, now.Location())
  133. yesterdayInt, _ := strconv.Atoi(yesterdayFirstHour.Format("20060102"))
  134. yesterdayFirstHourInt, _ := strconv.Atoi(yesterdayFirstHour.Format("2006010215"))
  135. // 获取昨天的最后一小时
  136. yesterdayLastHour := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  137. yesterdayLastHourInt, _ := strconv.Atoi(yesterdayLastHour.Format("2006010215"))
  138. var allDayConsumeCoinFloat float64
  139. for orgID, wxinfos := range wxbotsSet {
  140. var orgDayConsumeCoinFloat float64
  141. for _, wxinfo := range wxinfos {
  142. l.Logger.Infof("开始计算日数据:%d\n", yesterdayInt)
  143. l.Logger.Infof("yesterdayFirstHourInt:%d\n", yesterdayFirstHourInt)
  144. l.Logger.Infof("yesterdayLastHourInt:%d\n", yesterdayLastHourInt)
  145. hourDataBatch, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
  146. usagestatistichour.Type(1),
  147. usagestatistichour.BotID(wxinfo),
  148. usagestatistichour.OrganizationIDEQ(orgID),
  149. usagestatistichour.AddtimeGTE(uint64(yesterdayFirstHourInt)),
  150. usagestatistichour.AddtimeLT(uint64(yesterdayLastHourInt)),
  151. ).All(l.ctx)
  152. if hourDataBatch == nil {
  153. continue
  154. }
  155. var consumeCoin float64
  156. for _, hourData := range hourDataBatch {
  157. consumeCoin += hourData.ConsumeCoin
  158. }
  159. orgDayConsumeCoinFloat += consumeCoin
  160. allDayConsumeCoinFloat += consumeCoin
  161. // 如果添加过了就略过
  162. if yesterdayLastHourInt <= currentHourInt {
  163. _, err := l.svcCtx.DB.UsageStatisticDay.Update().
  164. Where(usagestatisticday.AddtimeEQ(uint64(yesterdayInt)), usagestatisticday.BotID(wxinfo), usagestatisticday.OrganizationIDEQ(orgID)).
  165. SetConsumeCoin(consumeCoin).
  166. Save(l.ctx)
  167. if err != nil {
  168. l.Errorf("create day data error:%v \n", err)
  169. continue
  170. }
  171. }
  172. }
  173. // 如果添加过了就略过
  174. if yesterdayLastHourInt <= currentHourInt {
  175. _, err := l.svcCtx.DB.UsageStatisticDay.Update().
  176. Where(usagestatisticday.AddtimeEQ(uint64(yesterdayInt)), usagestatisticday.OrganizationIDEQ(orgID), usagestatisticday.BotIDEQ("")).
  177. SetConsumeCoin(orgDayConsumeCoinFloat).
  178. Save(l.ctx)
  179. if err != nil {
  180. l.Errorf("create day data error:%v \n", err)
  181. continue
  182. }
  183. }
  184. }
  185. // 如果添加过了就略过
  186. if yesterdayLastHourInt <= currentHourInt {
  187. _, err = l.svcCtx.DB.UsageStatisticDay.Update().
  188. Where(usagestatisticday.AddtimeEQ(uint64(yesterdayInt)), usagestatisticday.OrganizationIDEQ(0), usagestatisticday.BotIDEQ("")).
  189. SetConsumeCoin(allDayConsumeCoinFloat).
  190. Save(l.ctx)
  191. if err != nil {
  192. l.Errorf("create day data error:%v \n", err)
  193. }
  194. }
  195. }
  196. return
  197. }