123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554 |
- package crontask
- import (
- "strconv"
- "strings"
- "time"
- "wechat-api/ent"
- "wechat-api/ent/contact"
- "wechat-api/ent/custom_types"
- "wechat-api/ent/labelrelationship"
- "wechat-api/ent/messagerecords"
- "wechat-api/ent/usagedetail"
- "wechat-api/ent/usagestatisticday"
- "wechat-api/ent/usagestatistichour"
- "wechat-api/ent/wx"
- )
- func (l *CronTask) computeStatistic() {
- startTime := time.Now()
-
- wxbots, err := l.svcCtx.DB.Wx.Query().Select(wx.FieldWxid, wx.FieldID, wx.FieldOrganizationID).All(l.ctx)
- if err != nil {
- l.Errorf("fetch wxids error:%v\n", err)
- return
- }
- wxbotsSet := make(map[uint64][]*ent.Wx)
- for _, bot := range wxbots {
- if !strings.HasPrefix(bot.Wxid, "temp-") {
- wxbotsSet[bot.OrganizationID] = append(wxbotsSet[bot.OrganizationID], bot)
- }
- }
- LabelsCountSet := make(map[uint64][]custom_types.LabelDist)
-
-
- now := time.Now()
-
- currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
- currentHourInt, _ := strconv.Atoi(currentHour.Format("2006010215"))
-
- lastHour := currentHour.Add(-time.Hour * 1)
- lastHourInt, _ := strconv.Atoi(lastHour.Format("2006010215"))
- lc := []custom_types.LabelDist{}
- var allHourAiResponseInt, allHourSopRunInt, allHourFriendCountInt, allHourGroupCountInt, allHourAccountBalanceInt, allHourConsumeTokenInt, allHourActiveUserInt, allHourNewUserInt int
- for orgID, wxinfos := range wxbotsSet {
- var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt, orgNewUserInt int
- for _, wxinfo := range wxinfos {
- l.Logger.Infof("开始计算小时数据:%d\n", lastHourInt)
-
- var aiResponseInt, sopRunInt, friendCountInt, groupCountInt, accountBalanceInt, consumeTokenInt, activeUserInt, newUserInt int
- hourDataCount, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
- usagestatistichour.Type(1),
- usagestatistichour.BotID(wxinfo.Wxid),
- usagestatistichour.Addtime(uint64(lastHourInt)),
- ).Count(l.ctx)
- if hourDataCount > 0 {
- continue
- }
-
-
-
- sopresp, _ := l.svcCtx.DB.MessageRecords.Query().Where(
- messagerecords.SubSourceID(0),
- messagerecords.SourceTypeIn(3, 4),
- messagerecords.BotWxid(wxinfo.Wxid),
- messagerecords.CreatedAtGTE(lastHour),
- messagerecords.CreatedAtLT(currentHour),
- ).Count(l.ctx)
- airesp, _ := l.svcCtx.DB.UsageDetail.Query().Where(
- usagedetail.AppIn(1, 3),
- usagedetail.BotID(wxinfo.Wxid),
- usagedetail.CreatedAtGTE(lastHour),
- usagedetail.CreatedAtLT(currentHour),
- ).Count(l.ctx)
- aiResponseInt = sopresp + airesp
- orgAiResponseInt += aiResponseInt
- allHourAiResponseInt += aiResponseInt
-
- sopRunInt, _ = l.svcCtx.DB.MessageRecords.Query().Where(
- messagerecords.BotWxid(wxinfo.Wxid),
- messagerecords.SubSourceIDEQ(0),
- messagerecords.SourceTypeIn(3, 4),
- messagerecords.BotWxid(wxinfo.Wxid),
- messagerecords.CreatedAtGTE(lastHour),
- messagerecords.CreatedAtLT(currentHour),
- ).Count(l.ctx)
- orgSopRunInt += sopRunInt
- allHourSopRunInt += sopRunInt
-
- friendCountInt, _ = l.svcCtx.DB.Contact.Query().Where(
- contact.Type(1),
- contact.WxWxid(wxinfo.Wxid),
- contact.Ctype(1),
- ).Count(l.ctx)
- orgFriendCountInt += friendCountInt
- allHourFriendCountInt += friendCountInt
-
- groupCountInt, _ = l.svcCtx.DB.Contact.Query().Where(
- contact.Type(2),
- contact.WxWxid(wxinfo.Wxid),
- contact.Ctype(1),
- ).Count(l.ctx)
- orgGroupCountInt += groupCountInt
- allHourGroupCountInt += groupCountInt
-
- consumeTokenInt, _ = l.svcCtx.DB.UsageDetail.Query().Where(
- usagedetail.TypeEQ(1),
- usagedetail.BotID(wxinfo.Wxid),
- usagedetail.CreatedAtGTE(lastHour),
- usagedetail.CreatedAtLT(currentHour),
- ).Aggregate(ent.Sum("total_tokens")).Int(l.ctx)
- orgConsumeTokenInt += consumeTokenInt
- allHourConsumeTokenInt += consumeTokenInt
-
- accountBalanceInt = 0
- orgAccountBalanceInt = 0
- allHourAccountBalanceInt = 0
-
- activeUsers, _ := l.svcCtx.DB.UsageDetail.Query().Where(
- usagedetail.Type(1),
- usagedetail.BotID(wxinfo.Wxid),
- usagedetail.CreatedAtGTE(lastHour),
- usagedetail.CreatedAtLT(currentHour),
- ).GroupBy(usagedetail.FieldReceiverID).Strings(l.ctx)
- activeUserInt = len(activeUsers)
- orgActiveUserInt += activeUserInt
- allHourActiveUserInt += activeUserInt
- lastHourData, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
- usagestatistichour.AddtimeEQ(uint64(lastHourInt)),
- usagestatistichour.Type(1),
- usagestatistichour.BotID(wxinfo.Wxid),
- ).First(l.ctx)
- if lastHourData == nil {
- newUserInt = 0
- } else {
- newUserInt = friendCountInt - int(lastHourData.TotalFriend)
- }
- orgNewUserInt += newUserInt
- allHourNewUserInt += newUserInt
- _, err := l.svcCtx.DB.UsageStatisticHour.Create().
- SetType(1).
- SetBotID(wxinfo.Wxid).
- SetOrganizationID(wxinfo.OrganizationID).
- SetAiResponse(uint64(aiResponseInt)).
- SetSopRun(uint64(sopRunInt)).
- SetTotalFriend(uint64(friendCountInt)).
- SetTotalGroup(uint64(groupCountInt)).
- SetAccountBalance(uint64(accountBalanceInt)).
- SetConsumeToken(uint64(consumeTokenInt)).
- SetActiveUser(uint64(activeUserInt)).
- SetNewUser(int64(newUserInt)).
- SetAddtime(uint64(lastHourInt)).
- SetLabelDist(lc).
- Save(l.ctx)
- l.Errorf("save hour data error:%v \n", err)
- }
-
- hourDataCount, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
- usagestatistichour.Type(1),
- usagestatistichour.OrganizationID(orgID),
- usagestatistichour.BotID(""),
- usagestatistichour.Addtime(uint64(lastHourInt)),
- ).Count(l.ctx)
- if hourDataCount > 0 {
- continue
- }
- LabelsCount := []custom_types.LabelDist{}
- err := l.svcCtx.DB.LabelRelationship.Query().Where(
- labelrelationship.OrganizationIDEQ(orgID),
- ).GroupBy(labelrelationship.FieldLabelID).Aggregate(ent.Count()).Scan(l.ctx, &LabelsCount)
- l.Errorf("save hour data error:%v \n", err)
- LabelsCountSet[orgID] = LabelsCount
- _, err = l.svcCtx.DB.UsageStatisticHour.Create().
- SetType(1).
- SetOrganizationID(orgID).
- SetAiResponse(uint64(orgAiResponseInt)).
- SetSopRun(uint64(orgSopRunInt)).
- SetTotalFriend(uint64(orgFriendCountInt)).
- SetTotalGroup(uint64(orgGroupCountInt)).
- SetAccountBalance(uint64(orgAccountBalanceInt)).
- SetConsumeToken(uint64(orgConsumeTokenInt)).
- SetActiveUser(uint64(orgActiveUserInt)).
- SetNewUser(int64(orgNewUserInt)).
- SetAddtime(uint64(lastHourInt)).
- SetNotNilLabelDist(LabelsCount).
- Save(l.ctx)
- l.Errorf("save hour data error:%v \n", err)
- }
-
- hourDataCount, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
- usagestatistichour.Type(1),
- usagestatistichour.OrganizationID(0),
- usagestatistichour.BotID(""),
- usagestatistichour.Addtime(uint64(lastHourInt)),
- ).Count(l.ctx)
- if hourDataCount == 0 {
- _, err = l.svcCtx.DB.UsageStatisticHour.Create().
- SetType(1).
- SetOrganizationID(0).
- SetAiResponse(uint64(allHourAiResponseInt)).
- SetSopRun(uint64(allHourSopRunInt)).
- SetTotalFriend(uint64(allHourFriendCountInt)).
- SetTotalGroup(uint64(allHourGroupCountInt)).
- SetAccountBalance(uint64(allHourAccountBalanceInt)).
- SetConsumeToken(uint64(allHourConsumeTokenInt)).
- SetActiveUser(uint64(allHourActiveUserInt)).
- SetNewUser(int64(allHourNewUserInt)).
- SetAddtime(uint64(lastHourInt)).
- SetLabelDist(lc).
- Save(l.ctx)
- l.Errorf("save hour data error:%v \n", err)
- }
-
-
-
-
- yesterday := now.AddDate(0, 0, -1)
- yesterdayFirstHour := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, now.Location())
- yesterdayInt, _ := strconv.Atoi(yesterdayFirstHour.Format("20060102"))
- yesterdayFirstHourInt, _ := strconv.Atoi(yesterdayFirstHour.Format("2006010215"))
-
- yesterdayLastHour := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
- yesterdayLastHourInt, _ := strconv.Atoi(yesterdayLastHour.Format("2006010215"))
- var allDayAiResponseInt, allDaySopRunInt, allDayFriendCountInt, allDayGroupCountInt, allDayAccountBalanceInt, allDayConsumeTokenInt, allDayActiveUserInt uint64
- var allDayNewUserInt int64
- for orgID, wxinfos := range wxbotsSet {
- var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt uint64
- var orgNewUserInt int64
- for _, wxinfo := range wxinfos {
- l.Logger.Infof("开始计算日数据:%d\n", yesterdayInt)
- hourDataBatch, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
- usagestatistichour.Type(1),
- usagestatistichour.BotID(wxinfo.Wxid),
- usagestatistichour.AddtimeGTE(uint64(yesterdayFirstHourInt)),
- usagestatistichour.AddtimeLT(uint64(yesterdayLastHourInt)),
- ).All(l.ctx)
- if hourDataBatch == nil {
- continue
- }
- var aiResponse, sopRun, totalFriend, totalGroup, accountBalance, consumeToken, activeUser uint64
- var newUser int64
- for _, hourData := range hourDataBatch {
- aiResponse += hourData.AiResponse
- sopRun += hourData.SopRun
- totalFriend = hourData.TotalFriend
- totalGroup = hourData.TotalGroup
- accountBalance = hourData.AccountBalance
- consumeToken += hourData.ConsumeToken
-
- newUser += hourData.NewUser
- }
-
- activeUsers, _ := l.svcCtx.DB.UsageDetail.Query().Where(
- usagedetail.Type(1),
- usagedetail.BotID(wxinfo.Wxid),
- usagedetail.CreatedAtGTE(yesterdayFirstHour),
- usagedetail.CreatedAtLT(yesterdayLastHour),
- ).GroupBy(usagedetail.FieldReceiverID).Strings(l.ctx)
- activeUser = uint64(len(activeUsers))
- orgAiResponseInt += aiResponse
- orgSopRunInt += sopRun
- orgFriendCountInt += totalFriend
- orgGroupCountInt += totalGroup
- orgAccountBalanceInt += accountBalance
- orgConsumeTokenInt += consumeToken
- orgActiveUserInt += activeUser
- orgNewUserInt += newUser
- allDayAiResponseInt += aiResponse
- allDaySopRunInt += sopRun
- allDayFriendCountInt += totalFriend
- allDayGroupCountInt += totalGroup
- allDayAccountBalanceInt += accountBalance
- allDayConsumeTokenInt += consumeToken
- allDayActiveUserInt += activeUser
- allDayNewUserInt += newUser
-
- dayDataCount, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where(
- usagestatisticday.Type(1),
- usagestatisticday.BotID(wxinfo.Wxid),
- usagestatisticday.Addtime(uint64(yesterdayInt)),
- ).Count(l.ctx)
-
- if dayDataCount == 0 && yesterdayLastHourInt <= currentHourInt {
- _, err := l.svcCtx.DB.UsageStatisticDay.Create().
- SetAddtime(uint64(yesterdayInt)).
- SetType(1).
- SetBotID(wxinfo.Wxid).
- SetOrganizationID(wxinfo.OrganizationID).
- SetAiResponse(aiResponse).
- SetSopRun(sopRun).
- SetTotalFriend(totalFriend).
- SetTotalGroup(totalGroup).
- SetAccountBalance(accountBalance).
- SetConsumeToken(consumeToken).
- SetActiveUser(activeUser).
- SetNewUser(newUser).
- SetLabelDist(lc).
- Save(l.ctx)
- if err != nil {
- l.Errorf("create day data error:%v \n", err)
- continue
- }
- }
- }
-
- dayDataCount, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where(
- usagestatisticday.Type(1),
- usagestatisticday.OrganizationID(orgID),
- usagestatisticday.BotID(""),
- usagestatisticday.Addtime(uint64(yesterdayInt)),
- ).Count(l.ctx)
-
- if dayDataCount == 0 && yesterdayLastHourInt <= currentHourInt {
- _, err := l.svcCtx.DB.UsageStatisticDay.Create().
- SetAddtime(uint64(yesterdayInt)).
- SetType(1).
- SetOrganizationID(orgID).
- SetAiResponse(orgAiResponseInt).
- SetSopRun(orgSopRunInt).
- SetTotalFriend(orgFriendCountInt).
- SetTotalGroup(orgGroupCountInt).
- SetAccountBalance(orgAccountBalanceInt).
- SetConsumeToken(orgConsumeTokenInt).
- SetActiveUser(orgActiveUserInt).
- SetNewUser(orgNewUserInt).
- SetNotNilLabelDist(LabelsCountSet[orgID]).
- Save(l.ctx)
- if err != nil {
- l.Errorf("create day data error:%v \n", err)
- continue
- }
- }
- }
-
- dayDataCount, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where(
- usagestatisticday.Type(1),
- usagestatisticday.OrganizationID(0),
- usagestatisticday.BotID(""),
- usagestatisticday.Addtime(uint64(yesterdayInt)),
- ).Count(l.ctx)
-
- if dayDataCount == 0 && yesterdayLastHourInt <= currentHourInt {
- _, err = l.svcCtx.DB.UsageStatisticDay.Create().
- SetAddtime(uint64(yesterdayInt)).
- SetType(1).
- SetOrganizationID(0).
- SetAiResponse(allDayAiResponseInt).
- SetSopRun(allDaySopRunInt).
- SetTotalFriend(allDayFriendCountInt).
- SetTotalGroup(allDayGroupCountInt).
- SetAccountBalance(allDayAccountBalanceInt).
- SetConsumeToken(allDayConsumeTokenInt).
- SetActiveUser(allDayActiveUserInt).
- SetNewUser(allDayNewUserInt).
- SetLabelDist(lc).
- Save(l.ctx)
- if err != nil {
- l.Errorf("create day data error:%v \n", err)
- }
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- finishTime := time.Now()
- l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
- return
- }
|