package crontask

import (
	"strconv"
	"strings"
	"time"

	"wechat-api/ent"
	"wechat-api/ent/contact"
	"wechat-api/ent/custom_types"
	"wechat-api/ent/labelrelationship"
	"wechat-api/ent/messagerecords"
	"wechat-api/ent/usagedetail"
	"wechat-api/ent/usagestatisticday"
	"wechat-api/ent/usagestatistichour"
	"wechat-api/ent/wx"
)

// computeStatistic writes the usage statistics of the previous full hour and
// of the previous calendar day.
//
// For each period it stores one record per bot, one per organization (empty
// bot id) and one global record (organization id 0, empty bot id). Records
// that already exist for the period are left untouched, so running the task
// several times within the same hour does not create duplicates.
func (l *CronTask) computeStatistic() {
	startTime := time.Now()

	// Load every bot; only the wxid, id and organization id columns are needed.
	wxbots, err := l.svcCtx.DB.Wx.Query().Select(wx.FieldWxid, wx.FieldID, wx.FieldOrganizationID).All(l.ctx)
	if err != nil {
		l.Errorf("fetch wxids error:%v\n", err)
		return
	}

	// Group the bots by organization, skipping bots whose wxid starts with "temp-".
	wxbotsSet := make(map[uint64][]*ent.Wx)
	for _, bot := range wxbots {
		if strings.HasPrefix(bot.Wxid, "temp-") {
			continue
		}
		wxbotsSet[bot.OrganizationID] = append(wxbotsSet[bot.OrganizationID], bot)
	}

	now := time.Now()
	labelsCountSet := l.computeHourStatistic(wxbotsSet, now)
	l.computeDayStatistic(wxbotsSet, labelsCountSet, now)

	// NOTE: monthly aggregation is not implemented. An earlier commented-out
	// draft that summed the daily records per month was removed from this
	// function.

	l.Logger.Infof("This process cost %v", time.Since(startTime).String())
}

// computeHourStatistic stores the statistics of the last full hour before now
// for every bot, every organization and globally.
//
// It returns the label distribution computed for each organization whose
// hourly record was created during this call; organizations that were skipped
// have no entry in the returned map.
func (l *CronTask) computeHourStatistic(wxbotsSet map[uint64][]*ent.Wx, now time.Time) map[uint64][]custom_types.LabelDist {
	// Statistic window is [lastHour, currentHour).
	currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
	lastHour := currentHour.Add(-time.Hour)
	prevHour := lastHour.Add(-time.Hour)

	// The layout contains digits only, so Atoi cannot fail on the formatted value.
	lastHourInt, _ := strconv.Atoi(lastHour.Format("2006010215"))
	prevHourInt, _ := strconv.Atoi(prevHour.Format("2006010215"))
	addtime := uint64(lastHourInt)

	// The account balance is not computed here; every hourly record stores 0.
	const accountBalance uint64 = 0

	// Bot-level and global records carry an empty, non-nil label distribution.
	emptyLabels := []custom_types.LabelDist{}
	labelsCountSet := make(map[uint64][]custom_types.LabelDist)

	var allAiResponse, allSopRun, allFriendCount, allGroupCount, allConsumeToken, allActiveUser, allNewUser int

	for orgID, wxinfos := range wxbotsSet {
		var orgAiResponse, orgSopRun, orgFriendCount, orgGroupCount, orgConsumeToken, orgActiveUser, orgNewUser int

		for _, wxinfo := range wxinfos {
			l.Logger.Infof("开始计算小时数据:%d\n", lastHourInt)

			// Skip bots whose record for this hour already exists. A failed
			// check also skips the bot, so that no duplicate row is inserted.
			exists, err := l.svcCtx.DB.UsageStatisticHour.Query().Where(
				usagestatistichour.Type(1),
				usagestatistichour.BotID(wxinfo.Wxid),
				usagestatistichour.Addtime(addtime),
			).Count(l.ctx)
			if err != nil {
				l.Errorf("check hour data of bot %s error:%v\n", wxinfo.Wxid, err)
				continue
			}
			if exists > 0 {
				continue
			}

			// SOP runs: message records with source_type 3 or 4 and
			// sub_source_id 0 created inside the window.
			sopRunInt, err := l.svcCtx.DB.MessageRecords.Query().Where(
				messagerecords.BotWxid(wxinfo.Wxid),
				messagerecords.SubSourceID(0),
				messagerecords.SourceTypeIn(3, 4),
				messagerecords.CreatedAtGTE(lastHour),
				messagerecords.CreatedAtLT(currentHour),
			).Count(l.ctx)
			if err != nil {
				l.Errorf("count sop messages of bot %s error:%v\n", wxinfo.Wxid, err)
			}

			// AI calls: usage details with app 1 or 3 created inside the window.
			airesp, err := l.svcCtx.DB.UsageDetail.Query().Where(
				usagedetail.AppIn(1, 3),
				usagedetail.BotID(wxinfo.Wxid),
				usagedetail.CreatedAtGTE(lastHour),
				usagedetail.CreatedAtLT(currentHour),
			).Count(l.ctx)
			if err != nil {
				l.Errorf("count ai responses of bot %s error:%v\n", wxinfo.Wxid, err)
			}

			// AI responses = SOP messages + AI calls. The SOP count is reused
			// instead of running the identical query a second time.
			aiResponseInt := sopRunInt + airesp

			// Total friends: contacts with type 1 (and ctype 1).
			friendCountInt, err := l.svcCtx.DB.Contact.Query().Where(
				contact.Type(1),
				contact.WxWxid(wxinfo.Wxid),
				contact.Ctype(1),
			).Count(l.ctx)
			if err != nil {
				l.Errorf("count friends of bot %s error:%v\n", wxinfo.Wxid, err)
			}

			// Total groups: contacts with type 2 (and ctype 1).
			groupCountInt, err := l.svcCtx.DB.Contact.Query().Where(
				contact.Type(2),
				contact.WxWxid(wxinfo.Wxid),
				contact.Ctype(1),
			).Count(l.ctx)
			if err != nil {
				l.Errorf("count groups of bot %s error:%v\n", wxinfo.Wxid, err)
			}

			// Consumed tokens: sum of total_tokens over usage details of type 1.
			// The error is deliberately ignored and the returned value used as is.
			// NOTE(review): SUM over zero rows presumably yields NULL and makes
			// the Int scan fail, so logging here would fire for every idle bot —
			// confirm against the database driver.
			consumeTokenInt, _ := l.svcCtx.DB.UsageDetail.Query().Where(
				usagedetail.TypeEQ(1),
				usagedetail.BotID(wxinfo.Wxid),
				usagedetail.CreatedAtGTE(lastHour),
				usagedetail.CreatedAtLT(currentHour),
			).Aggregate(ent.Sum("total_tokens")).Int(l.ctx)

			// Active friends: distinct receiver ids among usage details of type 1.
			activeUsers, err := l.svcCtx.DB.UsageDetail.Query().Where(
				usagedetail.Type(1),
				usagedetail.BotID(wxinfo.Wxid),
				usagedetail.CreatedAtGTE(lastHour),
				usagedetail.CreatedAtLT(currentHour),
			).GroupBy(usagedetail.FieldReceiverID).Strings(l.ctx)
			if err != nil {
				l.Errorf("query active users of bot %s error:%v\n", wxinfo.Wxid, err)
			}
			activeUserInt := len(activeUsers)

			// New friends: difference to the friend total stored for the hour
			// before the one being computed. The record of the computed hour
			// itself cannot be used because it does not exist yet at this point.
			// Without a previous record the value stays 0.
			newUserInt := 0
			prevHourData, err := l.svcCtx.DB.UsageStatisticHour.Query().Where(
				usagestatistichour.AddtimeEQ(uint64(prevHourInt)),
				usagestatistichour.Type(1),
				usagestatistichour.BotID(wxinfo.Wxid),
			).First(l.ctx)
			switch {
			case err == nil:
				newUserInt = friendCountInt - int(prevHourData.TotalFriend)
			case !ent.IsNotFound(err):
				l.Errorf("query previous hour data of bot %s error:%v\n", wxinfo.Wxid, err)
			}

			orgAiResponse += aiResponseInt
			orgSopRun += sopRunInt
			orgFriendCount += friendCountInt
			orgGroupCount += groupCountInt
			orgConsumeToken += consumeTokenInt
			orgActiveUser += activeUserInt
			orgNewUser += newUserInt

			allAiResponse += aiResponseInt
			allSopRun += sopRunInt
			allFriendCount += friendCountInt
			allGroupCount += groupCountInt
			allConsumeToken += consumeTokenInt
			allActiveUser += activeUserInt
			allNewUser += newUserInt

			if _, err := l.svcCtx.DB.UsageStatisticHour.Create().
				SetType(1).
				SetBotID(wxinfo.Wxid).
				SetOrganizationID(wxinfo.OrganizationID).
				SetAiResponse(uint64(aiResponseInt)).
				SetSopRun(uint64(sopRunInt)).
				SetTotalFriend(uint64(friendCountInt)).
				SetTotalGroup(uint64(groupCountInt)).
				SetAccountBalance(accountBalance).
				SetConsumeToken(uint64(consumeTokenInt)).
				SetActiveUser(uint64(activeUserInt)).
				SetNewUser(int64(newUserInt)).
				SetAddtime(addtime).
				SetLabelDist(emptyLabels).
				Save(l.ctx); err != nil {
				l.Errorf("save hour data error:%v \n", err)
			}
		}

		// Skip organizations whose record for this hour already exists.
		orgExists, err := l.svcCtx.DB.UsageStatisticHour.Query().Where(
			usagestatistichour.Type(1),
			usagestatistichour.OrganizationID(orgID),
			usagestatistichour.BotID(""),
			usagestatistichour.Addtime(addtime),
		).Count(l.ctx)
		if err != nil {
			l.Errorf("check hour data of organization %d error:%v\n", orgID, err)
			continue
		}
		if orgExists > 0 {
			continue
		}

		// Label distribution: number of label relationships per label id.
		labelsCount := []custom_types.LabelDist{}
		if err := l.svcCtx.DB.LabelRelationship.Query().Where(
			labelrelationship.OrganizationIDEQ(orgID),
		).GroupBy(labelrelationship.FieldLabelID).Aggregate(ent.Count()).Scan(l.ctx, &labelsCount); err != nil {
			l.Errorf("count labels of organization %d error:%v\n", orgID, err)
		}
		labelsCountSet[orgID] = labelsCount

		if _, err := l.svcCtx.DB.UsageStatisticHour.Create().
			SetType(1).
			SetOrganizationID(orgID).
			SetAiResponse(uint64(orgAiResponse)).
			SetSopRun(uint64(orgSopRun)).
			SetTotalFriend(uint64(orgFriendCount)).
			SetTotalGroup(uint64(orgGroupCount)).
			SetAccountBalance(accountBalance).
			SetConsumeToken(uint64(orgConsumeToken)).
			SetActiveUser(uint64(orgActiveUser)).
			SetNewUser(int64(orgNewUser)).
			SetAddtime(addtime).
			SetNotNilLabelDist(labelsCount).
			Save(l.ctx); err != nil {
			l.Errorf("save hour data error:%v \n", err)
		}
	}

	// Global record: organization id 0 and empty bot id.
	allExists, err := l.svcCtx.DB.UsageStatisticHour.Query().Where(
		usagestatistichour.Type(1),
		usagestatistichour.OrganizationID(0),
		usagestatistichour.BotID(""),
		usagestatistichour.Addtime(addtime),
	).Count(l.ctx)
	if err != nil {
		l.Errorf("check global hour data error:%v\n", err)
		return labelsCountSet
	}
	if allExists == 0 {
		if _, err := l.svcCtx.DB.UsageStatisticHour.Create().
			SetType(1).
			SetOrganizationID(0).
			SetAiResponse(uint64(allAiResponse)).
			SetSopRun(uint64(allSopRun)).
			SetTotalFriend(uint64(allFriendCount)).
			SetTotalGroup(uint64(allGroupCount)).
			SetAccountBalance(accountBalance).
			SetConsumeToken(uint64(allConsumeToken)).
			SetActiveUser(uint64(allActiveUser)).
			SetNewUser(int64(allNewUser)).
			SetAddtime(addtime).
			SetLabelDist(emptyLabels).
			Save(l.ctx); err != nil {
			l.Errorf("save hour data error:%v \n", err)
		}
	}

	return labelsCountSet
}

// computeDayStatistic stores the statistics of the calendar day before now for
// every bot, every organization and globally, aggregated from the hourly
// records of that day.
//
// labelsCountSet supplies the label distribution stored on the organization
// records; organizations without an entry get none.
func (l *CronTask) computeDayStatistic(wxbotsSet map[uint64][]*ent.Wx, labelsCountSet map[uint64][]custom_types.LabelDist, now time.Time) {
	// Statistic window is [dayStart, dayEnd): yesterday 00:00 up to today 00:00.
	yesterday := now.AddDate(0, 0, -1)
	dayStart := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, now.Location())
	dayEnd := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())

	// The layouts contain digits only, so Atoi cannot fail on the formatted values.
	yesterdayInt, _ := strconv.Atoi(dayStart.Format("20060102"))
	dayStartInt, _ := strconv.Atoi(dayStart.Format("2006010215"))
	dayEndInt, _ := strconv.Atoi(dayEnd.Format("2006010215"))
	addtime := uint64(yesterdayInt)

	// Bot-level and global records carry an empty, non-nil label distribution.
	emptyLabels := []custom_types.LabelDist{}

	var allAiResponse, allSopRun, allFriendCount, allGroupCount, allAccountBalance, allConsumeToken, allActiveUser uint64
	var allNewUser int64

	for orgID, wxinfos := range wxbotsSet {
		var orgAiResponse, orgSopRun, orgFriendCount, orgGroupCount, orgAccountBalance, orgConsumeToken, orgActiveUser uint64
		var orgNewUser int64

		for _, wxinfo := range wxinfos {
			l.Logger.Infof("开始计算日数据:%d\n", yesterdayInt)

			hourDataBatch, err := l.svcCtx.DB.UsageStatisticHour.Query().Where(
				usagestatistichour.Type(1),
				usagestatistichour.BotID(wxinfo.Wxid),
				usagestatistichour.AddtimeGTE(uint64(dayStartInt)),
				usagestatistichour.AddtimeLT(uint64(dayEndInt)),
			).All(l.ctx)
			if err != nil {
				l.Errorf("query hour data of bot %s error:%v\n", wxinfo.Wxid, err)
				continue
			}

			// Counters are summed over the day. Totals (friends, groups,
			// balance) are snapshots, so they are taken from the hourly record
			// with the greatest addtime; the query result is unordered.
			var aiResponse, sopRun, totalFriend, totalGroup, accountBalance, consumeToken uint64
			var newUser int64
			var latestAddtime uint64
			for _, hourData := range hourDataBatch {
				aiResponse += hourData.AiResponse
				sopRun += hourData.SopRun
				consumeToken += hourData.ConsumeToken
				newUser += hourData.NewUser
				if hourData.Addtime >= latestAddtime {
					latestAddtime = hourData.Addtime
					totalFriend = hourData.TotalFriend
					totalGroup = hourData.TotalGroup
					accountBalance = hourData.AccountBalance
				}
			}

			// Active friends are recounted over the whole day instead of
			// summing the hourly values, which would count a friend once per
			// active hour.
			activeUsers, err := l.svcCtx.DB.UsageDetail.Query().Where(
				usagedetail.Type(1),
				usagedetail.BotID(wxinfo.Wxid),
				usagedetail.CreatedAtGTE(dayStart),
				usagedetail.CreatedAtLT(dayEnd),
			).GroupBy(usagedetail.FieldReceiverID).Strings(l.ctx)
			if err != nil {
				l.Errorf("query active users of bot %s error:%v\n", wxinfo.Wxid, err)
			}
			activeUser := uint64(len(activeUsers))

			orgAiResponse += aiResponse
			orgSopRun += sopRun
			orgFriendCount += totalFriend
			orgGroupCount += totalGroup
			orgAccountBalance += accountBalance
			orgConsumeToken += consumeToken
			orgActiveUser += activeUser
			orgNewUser += newUser

			allAiResponse += aiResponse
			allSopRun += sopRun
			allFriendCount += totalFriend
			allGroupCount += totalGroup
			allAccountBalance += accountBalance
			allConsumeToken += consumeToken
			allActiveUser += activeUser
			allNewUser += newUser

			// Create the bot record only when none exists for that day. A
			// failed check skips the insert, so that no duplicate row is written.
			dayDataCount, err := l.svcCtx.DB.UsageStatisticDay.Query().Where(
				usagestatisticday.Type(1),
				usagestatisticday.BotID(wxinfo.Wxid),
				usagestatisticday.Addtime(addtime),
			).Count(l.ctx)
			if err != nil {
				l.Errorf("check day data of bot %s error:%v\n", wxinfo.Wxid, err)
				continue
			}
			if dayDataCount > 0 {
				continue
			}
			if _, err := l.svcCtx.DB.UsageStatisticDay.Create().
				SetAddtime(addtime).
				SetType(1).
				SetBotID(wxinfo.Wxid).
				SetOrganizationID(wxinfo.OrganizationID).
				SetAiResponse(aiResponse).
				SetSopRun(sopRun).
				SetTotalFriend(totalFriend).
				SetTotalGroup(totalGroup).
				SetAccountBalance(accountBalance).
				SetConsumeToken(consumeToken).
				SetActiveUser(activeUser).
				SetNewUser(newUser).
				SetLabelDist(emptyLabels).
				Save(l.ctx); err != nil {
				l.Errorf("create day data error:%v \n", err)
			}
		}

		// Create the organization record only when none exists for that day.
		orgDayCount, err := l.svcCtx.DB.UsageStatisticDay.Query().Where(
			usagestatisticday.Type(1),
			usagestatisticday.OrganizationID(orgID),
			usagestatisticday.BotID(""),
			usagestatisticday.Addtime(addtime),
		).Count(l.ctx)
		if err != nil {
			l.Errorf("check day data of organization %d error:%v\n", orgID, err)
			continue
		}
		if orgDayCount > 0 {
			continue
		}
		if _, err := l.svcCtx.DB.UsageStatisticDay.Create().
			SetAddtime(addtime).
			SetType(1).
			SetOrganizationID(orgID).
			SetAiResponse(orgAiResponse).
			SetSopRun(orgSopRun).
			SetTotalFriend(orgFriendCount).
			SetTotalGroup(orgGroupCount).
			SetAccountBalance(orgAccountBalance).
			SetConsumeToken(orgConsumeToken).
			SetActiveUser(orgActiveUser).
			SetNewUser(orgNewUser).
			SetNotNilLabelDist(labelsCountSet[orgID]).
			Save(l.ctx); err != nil {
			l.Errorf("create day data error:%v \n", err)
		}
	}

	// Global record: organization id 0 and empty bot id.
	allDayCount, err := l.svcCtx.DB.UsageStatisticDay.Query().Where(
		usagestatisticday.Type(1),
		usagestatisticday.OrganizationID(0),
		usagestatisticday.BotID(""),
		usagestatisticday.Addtime(addtime),
	).Count(l.ctx)
	if err != nil {
		l.Errorf("check global day data error:%v\n", err)
		return
	}
	if allDayCount > 0 {
		return
	}
	if _, err := l.svcCtx.DB.UsageStatisticDay.Create().
		SetAddtime(addtime).
		SetType(1).
		SetOrganizationID(0).
		SetAiResponse(allAiResponse).
		SetSopRun(allSopRun).
		SetTotalFriend(allFriendCount).
		SetTotalGroup(allGroupCount).
		SetAccountBalance(allAccountBalance).
		SetConsumeToken(allConsumeToken).
		SetActiveUser(allActiveUser).
		SetNewUser(allNewUser).
		SetLabelDist(emptyLabels).
		Save(l.ctx); err != nil {
		l.Errorf("create day data error:%v \n", err)
	}
}