package crontask import ( "strconv" "time" "wechat-api/ent" "wechat-api/ent/contact" "wechat-api/ent/custom_types" "wechat-api/ent/labelrelationship" "wechat-api/ent/messagerecords" "wechat-api/ent/usagedetail" "wechat-api/ent/usagestatisticday" "wechat-api/ent/usagestatistichour" "wechat-api/ent/usagestatisticmonth" "wechat-api/ent/wx" ) func (l *CronTask) computeStatistic() { startTime := time.Now() // 获取所有机器人信息 wxbots, err := l.svcCtx.DB.Wx.Query().Select(wx.FieldWxid, wx.FieldID, wx.FieldOrganizationID).All(l.ctx) if err != nil { l.Errorf("fetch wxids error:%v\n", err) return } wxbotsSet := make(map[uint64][]*ent.Wx) for _, bot := range wxbots { wxbotsSet[bot.OrganizationID] = append(wxbotsSet[bot.OrganizationID], bot) } LabelsCountSet := make(map[uint64][]custom_types.LabelDist) /* 计算本小时的数据 1. 查询出上小时里所有 usagedetail 内容 2. 挨个遍历他的 bot_id ,再查询他的 bot_id 相关的参数 3. 遍历的时候可能有重复,所以要先检查是否生成了数据,如果有就忽略,没有再生成 ---------------------------------------------------------------------------------------------------------- */ // 获取当前时间 now := time.Now() // 获取本小时的第一分钟 currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) currentHourInt, _ := strconv.Atoi(currentHour.Format("2006010215")) // 上一个小时的起始时间 lastHour := currentHour.Add(-time.Hour * 1) lastHourInt, _ := strconv.Atoi(lastHour.Format("2006010215")) for orgID, wxinfos := range wxbotsSet { var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt, orgNewUserInt int for _, wxinfo := range wxinfos { l.Logger.Infof("开始计算小时数据:%d\n", currentHourInt) // 先判断该账号是否已经统计了小时数据,如果已经统计了,就不需要再统计了 var aiResponseInt, sopRunInt, friendCountInt, groupCountInt, accountBalanceInt, consumeTokenInt, activeUserInt, newUserInt int hourDataCount, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where( usagestatistichour.Type(1), usagestatistichour.BotID(wxinfo.Wxid), usagestatistichour.Addtime(uint64(currentHourInt)), ).Count(l.ctx) if hourDataCount > 0 { continue } // AI回复包括:SOP次数+AI次数 // SOP次数:content 非空,source_type = 3 或 4,sub_source_id = 0 // AI次数:app = 1 或 3 sopresp, _ := l.svcCtx.DB.MessageRecords.Query().Where( messagerecords.SubSourceID(0), messagerecords.SourceTypeIn(3, 4), messagerecords.BotWxid(wxinfo.Wxid), messagerecords.CreatedAtGTE(lastHour), messagerecords.CreatedAtLT(currentHour), ).Count(l.ctx) airesp, _ := l.svcCtx.DB.UsageDetail.Query().Where( usagedetail.AppIn(1, 3), usagedetail.BotID(wxinfo.Wxid), usagedetail.CreatedAtGTE(lastHour), usagedetail.CreatedAtLT(currentHour), ).Count(l.ctx) aiResponseInt = sopresp + airesp orgAiResponseInt += aiResponseInt // SOP执行次数:SOP阶段和节点的执行次数。 sopRunInt, _ = l.svcCtx.DB.MessageRecords.Query().Where( messagerecords.BotWxid(wxinfo.Wxid), messagerecords.SubSourceIDEQ(0), messagerecords.SourceTypeIn(3, 4), messagerecords.BotWxid(wxinfo.Wxid), messagerecords.CreatedAtGTE(lastHour), messagerecords.CreatedAtLT(currentHour), ).Count(l.ctx) orgSopRunInt += sopRunInt // 好友总数:contact 表中 type=1 friendCountInt, _ = l.svcCtx.DB.Contact.Query().Where( contact.Type(1), contact.WxWxid(wxinfo.Wxid), ).Count(l.ctx) orgFriendCountInt += friendCountInt // 群总数:contact 表中 type=2 groupCountInt, _ = l.svcCtx.DB.Contact.Query().Where( contact.Type(2), contact.WxWxid(wxinfo.Wxid), ).Count(l.ctx) orgGroupCountInt += groupCountInt // 消耗Token数:usage_detail 表 consumeTokenInt, _ = l.svcCtx.DB.UsageDetail.Query().Where( usagedetail.TypeEQ(1), usagedetail.BotID(wxinfo.Wxid), usagedetail.CreatedAtGTE(lastHour), usagedetail.CreatedAtLT(currentHour), ).Aggregate(ent.Sum("total_tokens")).Int(l.ctx) orgConsumeTokenInt += consumeTokenInt // 账户余额 accountBalanceInt = 0 orgAccountBalanceInt = 0 // 活跃好友:usage_detail 表 type = 1 activeUserInt, _ = l.svcCtx.DB.UsageDetail.Query().Where( usagedetail.Type(1), usagedetail.BotID(wxinfo.Wxid), usagedetail.CreatedAtGTE(lastHour), usagedetail.CreatedAtLT(currentHour), ).GroupBy(usagedetail.FieldReceiverID).Int(l.ctx) orgActiveUserInt += activeUserInt lastHourData, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where( usagestatistichour.AddtimeEQ(uint64(lastHourInt)), usagestatistichour.Type(1), usagestatistichour.BotID(wxinfo.Wxid), ).First(l.ctx) if lastHourData == nil { newUserInt = friendCountInt } else { newUserInt = int(lastHourData.TotalFriend) - friendCountInt } orgNewUserInt += newUserInt _, err := l.svcCtx.DB.UsageStatisticHour.Create(). SetType(1). SetBotID(wxinfo.Wxid). SetOrganizationID(wxinfo.OrganizationID). SetAiResponse(uint64(aiResponseInt)). SetSopRun(uint64(sopRunInt)). SetTotalFriend(uint64(friendCountInt)). SetTotalGroup(uint64(groupCountInt)). SetAccountBalance(uint64(accountBalanceInt)). SetConsumeToken(uint64(consumeTokenInt)). SetActiveUser(uint64(activeUserInt)). SetNewUser(int64(newUserInt)). SetAddtime(uint64(currentHourInt)). Save(l.ctx) l.Errorf("save hour data error:%v \n", err) } // 先判断该租户是否已经统计了小时数据,如果已经统计了,就不需要再统计了 hourDataCount, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where( usagestatistichour.Type(1), usagestatistichour.OrganizationID(orgID), usagestatistichour.BotIDIsNil(), usagestatistichour.Addtime(uint64(currentHourInt)), ).Count(l.ctx) if hourDataCount > 0 { continue } var LabelsCount []custom_types.LabelDist err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.OrganizationIDEQ(orgID), labelrelationship.DeletedAtIsNil()).GroupBy(labelrelationship.FieldLabelID).Aggregate(ent.Count()).Scan(l.ctx, &LabelsCount) l.Errorf("save hour data error:%v \n", err) LabelsCountSet[orgID] = LabelsCount _, err = l.svcCtx.DB.UsageStatisticHour.Create(). SetType(1). SetOrganizationID(orgID). SetAiResponse(uint64(orgAiResponseInt)). SetSopRun(uint64(orgSopRunInt)). SetTotalFriend(uint64(orgFriendCountInt)). SetTotalGroup(uint64(orgGroupCountInt)). SetAccountBalance(uint64(orgAccountBalanceInt)). SetConsumeToken(uint64(orgConsumeTokenInt)). SetActiveUser(uint64(orgActiveUserInt)). SetNewUser(int64(orgNewUserInt)). SetAddtime(uint64(currentHourInt)). SetNotNilLabelDist(LabelsCount). Save(l.ctx) l.Errorf("save hour data error:%v \n", err) } /* 计算日数据 ---------------------------------------------------------------------------------------------------------- */ dayStr := time.Now().Format("20060102") day, _ := strconv.Atoi(dayStr) // 获取昨天的第一小时 yesterday := now.AddDate(0, 0, -1) yesterdayFirstHour := time.Date(yesterday.Year(), yesterday.Month(), now.Day(), 0, 0, 0, 0, now.Location()) yesterdayFirstHourInt, _ := strconv.Atoi(yesterdayFirstHour.Format("20060102")) // 获取昨天的最后一小时 yesterdayLastHour := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) yesterdayLastHourInt, _ := strconv.Atoi(yesterdayLastHour.Format("20060102")) for orgID, wxinfos := range wxbotsSet { var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt uint64 var orgNewUserInt int64 for _, wxinfo := range wxinfos { l.Logger.Infof("开始计算日数据:%d\n", day) // 先判断该账号是否已经统计了日数据,如果已经统计了,就不需要再统计了 dayDataCount, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where( usagestatisticday.Type(1), usagestatisticday.BotID(wxinfo.Wxid), usagestatisticday.Addtime(uint64(day)), ).Count(l.ctx) // 如果添加过了就略过 if dayDataCount > 0 { continue } hourDataBatch, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where( usagestatistichour.Type(1), usagestatistichour.BotID(wxinfo.Wxid), usagestatistichour.AddtimeGTE(uint64(yesterdayFirstHourInt)), usagestatistichour.AddtimeLT(uint64(yesterdayLastHourInt)), ).All(l.ctx) var aiResponse, sopRun, totalFriend, totalGroup, accountBalance, consumeToken, activeUser uint64 var newUser int64 for _, hourData := range hourDataBatch { aiResponse += hourData.AiResponse sopRun += hourData.SopRun totalFriend += hourData.TotalFriend totalGroup += hourData.TotalGroup accountBalance += hourData.AccountBalance consumeToken += hourData.ConsumeToken activeUser += hourData.ActiveUser newUser += hourData.NewUser } orgAiResponseInt += aiResponse orgSopRunInt += sopRun orgFriendCountInt += totalFriend orgGroupCountInt += totalGroup orgAccountBalanceInt += accountBalance orgConsumeTokenInt += consumeToken orgActiveUserInt += activeUser orgNewUserInt += newUser _, err := l.svcCtx.DB.UsageStatisticDay.Create(). SetAddtime(uint64(day)). SetType(1). SetBotID(wxinfo.Wxid). SetOrganizationID(wxinfo.OrganizationID). SetAiResponse(aiResponse). SetSopRun(sopRun). SetTotalFriend(totalFriend). SetTotalGroup(totalGroup). SetAccountBalance(accountBalance). SetConsumeToken(consumeToken). SetActiveUser(activeUser). SetNewUser(newUser). Save(l.ctx) if err != nil { l.Errorf("create day data error:%v \n", err) continue } } // 先判断该租户是否已经统计了日数据,如果已经统计了,就不需要再统计了 dayDataCount, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where( usagestatisticday.Type(1), usagestatisticday.OrganizationID(orgID), usagestatisticday.BotIDIsNil(), usagestatisticday.Addtime(uint64(day)), ).Count(l.ctx) // 如果添加过了就略过 if dayDataCount > 0 { continue } _, err := l.svcCtx.DB.UsageStatisticDay.Create(). SetAddtime(uint64(day)). SetType(1). SetOrganizationID(orgID). SetAiResponse(orgAiResponseInt). SetSopRun(orgSopRunInt). SetTotalFriend(orgFriendCountInt). SetTotalGroup(orgGroupCountInt). SetAccountBalance(orgAccountBalanceInt). SetConsumeToken(orgConsumeTokenInt). SetActiveUser(orgActiveUserInt). SetNewUser(orgNewUserInt). SetNotNilLabelDist(LabelsCountSet[orgID]). Save(l.ctx) if err != nil { l.Errorf("create day data error:%v \n", err) continue } } /* 查看月表数据是否已经完成 1. 查询出上月里所有 usagedetail 内容 2. 挨个遍历他的 bot_id ,再查询他的 bot_id 相关的参数 ---------------------------------------------------------------------------------------------------------- */ monthStr := time.Now().Format("200601") month, _ := strconv.Atoi(monthStr) for orgID, wxinfos := range wxbotsSet { var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt uint64 var orgNewUserInt int64 for _, wxinfo := range wxinfos { l.Logger.Infof("开始计算月数据:%d\n", month) // 获取上月的第一天 monthFirstDay := time.Date(now.Year(), now.Month()-1, 1, 0, 0, 0, 0, now.Location()) monthFirstDayInt, _ := strconv.Atoi(monthFirstDay.Format("20060102")) // 获取上月的最后一天 monthLastDay := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location()) monthLastDayInt, _ := strconv.Atoi(monthLastDay.Format("20060102")) // 先判断该账号是否已经统计了月数据,如果已经统计了,就不需要再统计了 monthDataCount, _ := l.svcCtx.DB.UsageStatisticMonth.Query().Where( usagestatisticmonth.Type(1), usagestatisticmonth.BotID(wxinfo.Wxid), usagestatisticmonth.Addtime(uint64(month)), ).Count(l.ctx) // 如果添加过了就略过 if monthDataCount > 0 { continue } dayDataBatch, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where( usagestatisticday.Type(1), usagestatisticday.BotID(wxinfo.Wxid), usagestatisticday.AddtimeGTE(uint64(monthFirstDayInt)), usagestatisticday.AddtimeLT(uint64(monthLastDayInt)), ).All(l.ctx) var aiResponse, sopRun, totalFriend, totalGroup, accountBalance, consumeToken, activeUser uint64 var newUser int64 for _, dayData := range dayDataBatch { aiResponse += dayData.AiResponse sopRun += dayData.SopRun totalFriend += dayData.TotalFriend totalGroup += dayData.TotalGroup accountBalance += dayData.AccountBalance consumeToken += dayData.ConsumeToken activeUser += dayData.ActiveUser newUser += dayData.NewUser } orgAiResponseInt += aiResponse orgSopRunInt += sopRun orgFriendCountInt += totalFriend orgGroupCountInt += totalGroup orgAccountBalanceInt += accountBalance orgConsumeTokenInt += consumeToken orgActiveUserInt += activeUser orgNewUserInt += newUser _, err := l.svcCtx.DB.UsageStatisticMonth.Create(). SetAddtime(uint64(month)). SetType(1). SetBotID(wxinfo.Wxid). SetOrganizationID(wxinfo.OrganizationID). SetAiResponse(aiResponse). SetSopRun(sopRun). SetTotalFriend(totalFriend). SetTotalGroup(totalGroup). SetAccountBalance(accountBalance). SetConsumeToken(consumeToken). SetActiveUser(activeUser). SetNewUser(newUser). Save(l.ctx) if err != nil { l.Errorf("create month data error:%v \n", err) continue } } // 先判断该租户是否已经统计了月数据,如果已经统计了,就不需要再统计了 monthDataCount, _ := l.svcCtx.DB.UsageStatisticMonth.Query().Where( usagestatisticmonth.Type(1), usagestatisticmonth.OrganizationID(orgID), usagestatisticmonth.BotIDIsNil(), usagestatisticmonth.Addtime(uint64(month)), ).Count(l.ctx) // 如果添加过了就略过 if monthDataCount > 0 { continue } _, err := l.svcCtx.DB.UsageStatisticMonth.Create(). SetAddtime(uint64(month)). SetType(1). SetOrganizationID(orgID). SetAiResponse(orgAiResponseInt). SetSopRun(orgSopRunInt). SetTotalFriend(orgFriendCountInt). SetTotalGroup(orgGroupCountInt). SetAccountBalance(orgAccountBalanceInt). SetConsumeToken(orgConsumeTokenInt). SetActiveUser(orgActiveUserInt). SetNewUser(orgNewUserInt). SetNotNilLabelDist(LabelsCountSet[orgID]). Save(l.ctx) if err != nil { l.Errorf("create month data error:%v \n", err) continue } } finishTime := time.Now() l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String()) return }