compute_statistic.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. package crontask
  2. import (
  3. "strconv"
  4. "time"
  5. "wechat-api/ent"
  6. "wechat-api/ent/contact"
  7. "wechat-api/ent/custom_types"
  8. "wechat-api/ent/labelrelationship"
  9. "wechat-api/ent/messagerecords"
  10. "wechat-api/ent/usagedetail"
  11. "wechat-api/ent/usagestatisticday"
  12. "wechat-api/ent/usagestatistichour"
  13. "wechat-api/ent/usagestatisticmonth"
  14. "wechat-api/ent/wx"
  15. )
  16. func (l *CronTask) computeStatistic() {
  17. startTime := time.Now()
  18. // 获取所有机器人信息
  19. wxbots, err := l.svcCtx.DB.Wx.Query().Select(wx.FieldWxid, wx.FieldID, wx.FieldOrganizationID).All(l.ctx)
  20. if err != nil {
  21. l.Errorf("fetch wxids error:%v\n", err)
  22. return
  23. }
  24. wxbotsSet := make(map[uint64][]*ent.Wx)
  25. for _, bot := range wxbots {
  26. wxbotsSet[bot.OrganizationID] = append(wxbotsSet[bot.OrganizationID], bot)
  27. }
  28. LabelsCountSet := make(map[uint64][]custom_types.LabelDist)
  29. /*
  30. 计算本小时的数据
  31. 1. 查询出上小时里所有 usagedetail 内容
  32. 2. 挨个遍历他的 bot_id ,再查询他的 bot_id 相关的参数
  33. 3. 遍历的时候可能有重复,所以要先检查是否生成了数据,如果有就忽略,没有再生成
  34. ----------------------------------------------------------------------------------------------------------
  35. */
  36. // 获取当前时间
  37. now := time.Now()
  38. // 获取本小时的第一分钟
  39. currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
  40. currentHourInt, _ := strconv.Atoi(currentHour.Format("2006010215"))
  41. // 上一个小时的起始时间
  42. lastHour := currentHour.Add(-time.Hour * 1)
  43. lastHourInt, _ := strconv.Atoi(lastHour.Format("2006010215"))
  44. for orgID, wxinfos := range wxbotsSet {
  45. var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt, orgNewUserInt int
  46. for _, wxinfo := range wxinfos {
  47. l.Logger.Infof("开始计算小时数据:%d\n", currentHourInt)
  48. // 先判断该账号是否已经统计了小时数据,如果已经统计了,就不需要再统计了
  49. var aiResponseInt, sopRunInt, friendCountInt, groupCountInt, accountBalanceInt, consumeTokenInt, activeUserInt, newUserInt int
  50. hourDataCount, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
  51. usagestatistichour.Type(1),
  52. usagestatistichour.BotID(wxinfo.Wxid),
  53. usagestatistichour.Addtime(uint64(currentHourInt)),
  54. ).Count(l.ctx)
  55. if hourDataCount > 0 {
  56. continue
  57. }
  58. // AI回复包括:SOP次数+AI次数
  59. // SOP次数:content 非空,source_type = 3 或 4,sub_source_id = 0
  60. // AI次数:app = 1 或 3
  61. sopresp, _ := l.svcCtx.DB.MessageRecords.Query().Where(
  62. messagerecords.SubSourceID(0),
  63. messagerecords.SourceTypeIn(3, 4),
  64. messagerecords.BotWxid(wxinfo.Wxid),
  65. messagerecords.CreatedAtGTE(lastHour),
  66. messagerecords.CreatedAtLT(currentHour),
  67. ).Count(l.ctx)
  68. airesp, _ := l.svcCtx.DB.UsageDetail.Query().Where(
  69. usagedetail.AppIn(1, 3),
  70. usagedetail.BotID(wxinfo.Wxid),
  71. usagedetail.CreatedAtGTE(lastHour),
  72. usagedetail.CreatedAtLT(currentHour),
  73. ).Count(l.ctx)
  74. aiResponseInt = sopresp + airesp
  75. orgAiResponseInt += aiResponseInt
  76. // SOP执行次数:SOP阶段和节点的执行次数。
  77. sopRunInt, _ = l.svcCtx.DB.MessageRecords.Query().Where(
  78. messagerecords.BotWxid(wxinfo.Wxid),
  79. messagerecords.SubSourceIDEQ(0),
  80. messagerecords.SourceTypeIn(3, 4),
  81. messagerecords.BotWxid(wxinfo.Wxid),
  82. messagerecords.CreatedAtGTE(lastHour),
  83. messagerecords.CreatedAtLT(currentHour),
  84. ).Count(l.ctx)
  85. orgSopRunInt += sopRunInt
  86. // 好友总数:contact 表中 type=1
  87. friendCountInt, _ = l.svcCtx.DB.Contact.Query().Where(
  88. contact.Type(1),
  89. contact.WxWxid(wxinfo.Wxid),
  90. ).Count(l.ctx)
  91. orgFriendCountInt += friendCountInt
  92. // 群总数:contact 表中 type=2
  93. groupCountInt, _ = l.svcCtx.DB.Contact.Query().Where(
  94. contact.Type(2),
  95. contact.WxWxid(wxinfo.Wxid),
  96. ).Count(l.ctx)
  97. orgGroupCountInt += groupCountInt
  98. // 消耗Token数:usage_detail 表
  99. consumeTokenInt, _ = l.svcCtx.DB.UsageDetail.Query().Where(
  100. usagedetail.TypeEQ(1),
  101. usagedetail.BotID(wxinfo.Wxid),
  102. usagedetail.CreatedAtGTE(lastHour),
  103. usagedetail.CreatedAtLT(currentHour),
  104. ).Aggregate(ent.Sum("total_tokens")).Int(l.ctx)
  105. orgConsumeTokenInt += consumeTokenInt
  106. // 账户余额
  107. accountBalanceInt = 0
  108. orgAccountBalanceInt = 0
  109. // 活跃好友:usage_detail 表 type = 1
  110. activeUserInt, _ = l.svcCtx.DB.UsageDetail.Query().Where(
  111. usagedetail.Type(1),
  112. usagedetail.BotID(wxinfo.Wxid),
  113. usagedetail.CreatedAtGTE(lastHour),
  114. usagedetail.CreatedAtLT(currentHour),
  115. ).GroupBy(usagedetail.FieldReceiverID).Int(l.ctx)
  116. orgActiveUserInt += activeUserInt
  117. lastHourData, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
  118. usagestatistichour.AddtimeEQ(uint64(lastHourInt)),
  119. usagestatistichour.Type(1),
  120. usagestatistichour.BotID(wxinfo.Wxid),
  121. ).First(l.ctx)
  122. if lastHourData == nil {
  123. newUserInt = friendCountInt
  124. } else {
  125. newUserInt = int(lastHourData.TotalFriend) - friendCountInt
  126. }
  127. orgNewUserInt += newUserInt
  128. var lc []custom_types.LabelDist
  129. _, err := l.svcCtx.DB.UsageStatisticHour.Create().
  130. SetType(1).
  131. SetBotID(wxinfo.Wxid).
  132. SetOrganizationID(wxinfo.OrganizationID).
  133. SetAiResponse(uint64(aiResponseInt)).
  134. SetSopRun(uint64(sopRunInt)).
  135. SetTotalFriend(uint64(friendCountInt)).
  136. SetTotalGroup(uint64(groupCountInt)).
  137. SetAccountBalance(uint64(accountBalanceInt)).
  138. SetConsumeToken(uint64(consumeTokenInt)).
  139. SetActiveUser(uint64(activeUserInt)).
  140. SetNewUser(int64(newUserInt)).
  141. SetAddtime(uint64(currentHourInt)).
  142. SetLabelDist(lc).
  143. Save(l.ctx)
  144. l.Errorf("save hour data error:%v \n", err)
  145. }
  146. // 先判断该租户是否已经统计了小时数据,如果已经统计了,就不需要再统计了
  147. hourDataCount, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
  148. usagestatistichour.Type(1),
  149. usagestatistichour.OrganizationID(orgID),
  150. usagestatistichour.BotIDIsNil(),
  151. usagestatistichour.Addtime(uint64(currentHourInt)),
  152. ).Count(l.ctx)
  153. if hourDataCount > 0 {
  154. continue
  155. }
  156. var LabelsCount []custom_types.LabelDist
  157. err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.OrganizationIDEQ(orgID), labelrelationship.DeletedAtIsNil()).GroupBy(labelrelationship.FieldLabelID).Aggregate(ent.Count()).Scan(l.ctx, &LabelsCount)
  158. l.Errorf("save hour data error:%v \n", err)
  159. LabelsCountSet[orgID] = LabelsCount
  160. _, err = l.svcCtx.DB.UsageStatisticHour.Create().
  161. SetType(1).
  162. SetOrganizationID(orgID).
  163. SetAiResponse(uint64(orgAiResponseInt)).
  164. SetSopRun(uint64(orgSopRunInt)).
  165. SetTotalFriend(uint64(orgFriendCountInt)).
  166. SetTotalGroup(uint64(orgGroupCountInt)).
  167. SetAccountBalance(uint64(orgAccountBalanceInt)).
  168. SetConsumeToken(uint64(orgConsumeTokenInt)).
  169. SetActiveUser(uint64(orgActiveUserInt)).
  170. SetNewUser(int64(orgNewUserInt)).
  171. SetAddtime(uint64(currentHourInt)).
  172. SetNotNilLabelDist(LabelsCount).
  173. Save(l.ctx)
  174. l.Errorf("save hour data error:%v \n", err)
  175. }
  176. /*
  177. 计算日数据
  178. ----------------------------------------------------------------------------------------------------------
  179. */
  180. dayStr := time.Now().Format("20060102")
  181. day, _ := strconv.Atoi(dayStr)
  182. // 获取昨天的第一小时
  183. yesterday := now.AddDate(0, 0, -1)
  184. yesterdayFirstHour := time.Date(yesterday.Year(), yesterday.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  185. yesterdayFirstHourInt, _ := strconv.Atoi(yesterdayFirstHour.Format("20060102"))
  186. // 获取昨天的最后一小时
  187. yesterdayLastHour := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  188. yesterdayLastHourInt, _ := strconv.Atoi(yesterdayLastHour.Format("20060102"))
  189. for orgID, wxinfos := range wxbotsSet {
  190. var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt uint64
  191. var orgNewUserInt int64
  192. for _, wxinfo := range wxinfos {
  193. l.Logger.Infof("开始计算日数据:%d\n", day)
  194. // 先判断该账号是否已经统计了日数据,如果已经统计了,就不需要再统计了
  195. dayDataCount, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where(
  196. usagestatisticday.Type(1),
  197. usagestatisticday.BotID(wxinfo.Wxid),
  198. usagestatisticday.Addtime(uint64(day)),
  199. ).Count(l.ctx)
  200. // 如果添加过了就略过
  201. if dayDataCount > 0 {
  202. continue
  203. }
  204. hourDataBatch, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
  205. usagestatistichour.Type(1),
  206. usagestatistichour.BotID(wxinfo.Wxid),
  207. usagestatistichour.AddtimeGTE(uint64(yesterdayFirstHourInt)),
  208. usagestatistichour.AddtimeLT(uint64(yesterdayLastHourInt)),
  209. ).All(l.ctx)
  210. var aiResponse, sopRun, totalFriend, totalGroup, accountBalance, consumeToken, activeUser uint64
  211. var newUser int64
  212. for _, hourData := range hourDataBatch {
  213. aiResponse += hourData.AiResponse
  214. sopRun += hourData.SopRun
  215. totalFriend += hourData.TotalFriend
  216. totalGroup += hourData.TotalGroup
  217. accountBalance += hourData.AccountBalance
  218. consumeToken += hourData.ConsumeToken
  219. activeUser += hourData.ActiveUser
  220. newUser += hourData.NewUser
  221. }
  222. orgAiResponseInt += aiResponse
  223. orgSopRunInt += sopRun
  224. orgFriendCountInt += totalFriend
  225. orgGroupCountInt += totalGroup
  226. orgAccountBalanceInt += accountBalance
  227. orgConsumeTokenInt += consumeToken
  228. orgActiveUserInt += activeUser
  229. orgNewUserInt += newUser
  230. var lc []custom_types.LabelDist
  231. _, err := l.svcCtx.DB.UsageStatisticDay.Create().
  232. SetAddtime(uint64(day)).
  233. SetType(1).
  234. SetBotID(wxinfo.Wxid).
  235. SetOrganizationID(wxinfo.OrganizationID).
  236. SetAiResponse(aiResponse).
  237. SetSopRun(sopRun).
  238. SetTotalFriend(totalFriend).
  239. SetTotalGroup(totalGroup).
  240. SetAccountBalance(accountBalance).
  241. SetConsumeToken(consumeToken).
  242. SetActiveUser(activeUser).
  243. SetNewUser(newUser).
  244. SetLabelDist(lc).
  245. Save(l.ctx)
  246. if err != nil {
  247. l.Errorf("create day data error:%v \n", err)
  248. continue
  249. }
  250. }
  251. // 先判断该租户是否已经统计了日数据,如果已经统计了,就不需要再统计了
  252. dayDataCount, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where(
  253. usagestatisticday.Type(1),
  254. usagestatisticday.OrganizationID(orgID),
  255. usagestatisticday.BotIDIsNil(),
  256. usagestatisticday.Addtime(uint64(day)),
  257. ).Count(l.ctx)
  258. // 如果添加过了就略过
  259. if dayDataCount > 0 {
  260. continue
  261. }
  262. _, err := l.svcCtx.DB.UsageStatisticDay.Create().
  263. SetAddtime(uint64(day)).
  264. SetType(1).
  265. SetOrganizationID(orgID).
  266. SetAiResponse(orgAiResponseInt).
  267. SetSopRun(orgSopRunInt).
  268. SetTotalFriend(orgFriendCountInt).
  269. SetTotalGroup(orgGroupCountInt).
  270. SetAccountBalance(orgAccountBalanceInt).
  271. SetConsumeToken(orgConsumeTokenInt).
  272. SetActiveUser(orgActiveUserInt).
  273. SetNewUser(orgNewUserInt).
  274. SetNotNilLabelDist(LabelsCountSet[orgID]).
  275. Save(l.ctx)
  276. if err != nil {
  277. l.Errorf("create day data error:%v \n", err)
  278. continue
  279. }
  280. }
  281. /*
  282. 查看月表数据是否已经完成
  283. 1. 查询出上月里所有 usagedetail 内容
  284. 2. 挨个遍历他的 bot_id ,再查询他的 bot_id 相关的参数
  285. ----------------------------------------------------------------------------------------------------------
  286. */
  287. monthStr := time.Now().Format("200601")
  288. month, _ := strconv.Atoi(monthStr)
  289. for orgID, wxinfos := range wxbotsSet {
  290. var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt uint64
  291. var orgNewUserInt int64
  292. for _, wxinfo := range wxinfos {
  293. l.Logger.Infof("开始计算月数据:%d\n", month)
  294. // 获取上月的第一天
  295. monthFirstDay := time.Date(now.Year(), now.Month()-1, 1, 0, 0, 0, 0, now.Location())
  296. monthFirstDayInt, _ := strconv.Atoi(monthFirstDay.Format("20060102"))
  297. // 获取上月的最后一天
  298. monthLastDay := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
  299. monthLastDayInt, _ := strconv.Atoi(monthLastDay.Format("20060102"))
  300. // 先判断该账号是否已经统计了月数据,如果已经统计了,就不需要再统计了
  301. monthDataCount, _ := l.svcCtx.DB.UsageStatisticMonth.Query().Where(
  302. usagestatisticmonth.Type(1),
  303. usagestatisticmonth.BotID(wxinfo.Wxid),
  304. usagestatisticmonth.Addtime(uint64(month)),
  305. ).Count(l.ctx)
  306. // 如果添加过了就略过
  307. if monthDataCount > 0 {
  308. continue
  309. }
  310. dayDataBatch, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where(
  311. usagestatisticday.Type(1),
  312. usagestatisticday.BotID(wxinfo.Wxid),
  313. usagestatisticday.AddtimeGTE(uint64(monthFirstDayInt)),
  314. usagestatisticday.AddtimeLT(uint64(monthLastDayInt)),
  315. ).All(l.ctx)
  316. var aiResponse, sopRun, totalFriend, totalGroup, accountBalance, consumeToken, activeUser uint64
  317. var newUser int64
  318. for _, dayData := range dayDataBatch {
  319. aiResponse += dayData.AiResponse
  320. sopRun += dayData.SopRun
  321. totalFriend += dayData.TotalFriend
  322. totalGroup += dayData.TotalGroup
  323. accountBalance += dayData.AccountBalance
  324. consumeToken += dayData.ConsumeToken
  325. activeUser += dayData.ActiveUser
  326. newUser += dayData.NewUser
  327. }
  328. orgAiResponseInt += aiResponse
  329. orgSopRunInt += sopRun
  330. orgFriendCountInt += totalFriend
  331. orgGroupCountInt += totalGroup
  332. orgAccountBalanceInt += accountBalance
  333. orgConsumeTokenInt += consumeToken
  334. orgActiveUserInt += activeUser
  335. orgNewUserInt += newUser
  336. var lc []custom_types.LabelDist
  337. _, err := l.svcCtx.DB.UsageStatisticMonth.Create().
  338. SetAddtime(uint64(month)).
  339. SetType(1).
  340. SetBotID(wxinfo.Wxid).
  341. SetOrganizationID(wxinfo.OrganizationID).
  342. SetAiResponse(aiResponse).
  343. SetSopRun(sopRun).
  344. SetTotalFriend(totalFriend).
  345. SetTotalGroup(totalGroup).
  346. SetAccountBalance(accountBalance).
  347. SetConsumeToken(consumeToken).
  348. SetActiveUser(activeUser).
  349. SetNewUser(newUser).
  350. SetLabelDist(lc).
  351. Save(l.ctx)
  352. if err != nil {
  353. l.Errorf("create month data error:%v \n", err)
  354. continue
  355. }
  356. }
  357. // 先判断该租户是否已经统计了月数据,如果已经统计了,就不需要再统计了
  358. monthDataCount, _ := l.svcCtx.DB.UsageStatisticMonth.Query().Where(
  359. usagestatisticmonth.Type(1),
  360. usagestatisticmonth.OrganizationID(orgID),
  361. usagestatisticmonth.BotIDIsNil(),
  362. usagestatisticmonth.Addtime(uint64(month)),
  363. ).Count(l.ctx)
  364. // 如果添加过了就略过
  365. if monthDataCount > 0 {
  366. continue
  367. }
  368. _, err := l.svcCtx.DB.UsageStatisticMonth.Create().
  369. SetAddtime(uint64(month)).
  370. SetType(1).
  371. SetOrganizationID(orgID).
  372. SetAiResponse(orgAiResponseInt).
  373. SetSopRun(orgSopRunInt).
  374. SetTotalFriend(orgFriendCountInt).
  375. SetTotalGroup(orgGroupCountInt).
  376. SetAccountBalance(orgAccountBalanceInt).
  377. SetConsumeToken(orgConsumeTokenInt).
  378. SetActiveUser(orgActiveUserInt).
  379. SetNewUser(orgNewUserInt).
  380. SetNotNilLabelDist(LabelsCountSet[orgID]).
  381. Save(l.ctx)
  382. if err != nil {
  383. l.Errorf("create month data error:%v \n", err)
  384. continue
  385. }
  386. }
  387. finishTime := time.Now()
  388. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  389. return
  390. }