|
@@ -2,35 +2,40 @@ package crontask
|
|
|
|
|
|
import (
|
|
|
"strconv"
|
|
|
- "strings"
|
|
|
"time"
|
|
|
"wechat-api/ent"
|
|
|
"wechat-api/ent/contact"
|
|
|
"wechat-api/ent/custom_types"
|
|
|
+ "wechat-api/ent/label"
|
|
|
"wechat-api/ent/labelrelationship"
|
|
|
"wechat-api/ent/messagerecords"
|
|
|
"wechat-api/ent/usagedetail"
|
|
|
"wechat-api/ent/usagestatisticday"
|
|
|
"wechat-api/ent/usagestatistichour"
|
|
|
- "wechat-api/ent/wx"
|
|
|
+ //"wechat-api/ent/wx"
|
|
|
)
|
|
|
|
|
|
+type OrgBot struct {
|
|
|
+ OrganizationID uint64 `json:"organization_id"`
|
|
|
+ BotID string `json:"bot_id"`
|
|
|
+}
|
|
|
+
|
|
|
func (l *CronTask) computeStatistic() {
|
|
|
startTime := time.Now()
|
|
|
|
|
|
// 获取所有机器人信息
|
|
|
- wxbots, err := l.svcCtx.DB.Wx.Query().Select(wx.FieldWxid, wx.FieldID, wx.FieldOrganizationID).All(l.ctx)
|
|
|
- if err != nil {
|
|
|
- l.Errorf("fetch wxids error:%v\n", err)
|
|
|
- return
|
|
|
- }
|
|
|
+ //wxbots, err := l.svcCtx.DB.Wx.Query().Select(wx.FieldWxid, wx.FieldID, wx.FieldOrganizationID).All(l.ctx)
|
|
|
+ //if err != nil {
|
|
|
+ // l.Errorf("fetch wxids error:%v\n", err)
|
|
|
+ // return
|
|
|
+ //}
|
|
|
|
|
|
- wxbotsSet := make(map[uint64][]*ent.Wx)
|
|
|
- for _, bot := range wxbots {
|
|
|
- if !strings.HasPrefix(bot.Wxid, "temp-") {
|
|
|
- wxbotsSet[bot.OrganizationID] = append(wxbotsSet[bot.OrganizationID], bot)
|
|
|
- }
|
|
|
- }
|
|
|
+ //wxbotsSet := make(map[uint64][]*ent.Wx)
|
|
|
+ //for _, bot := range wxbots {
|
|
|
+ // if !strings.HasPrefix(bot.Wxid, "temp-") {
|
|
|
+ // wxbotsSet[bot.OrganizationID] = append(wxbotsSet[bot.OrganizationID], bot)
|
|
|
+ // }
|
|
|
+ //}
|
|
|
LabelsCountSet := make(map[uint64][]custom_types.LabelDist)
|
|
|
|
|
|
/*
|
|
@@ -56,10 +61,30 @@ func (l *CronTask) computeStatistic() {
|
|
|
|
|
|
var allHourAiResponseInt, allHourSopRunInt, allHourFriendCountInt, allHourGroupCountInt, allHourAccountBalanceInt, allHourConsumeTokenInt, allHourActiveUserInt, allHourNewUserInt int
|
|
|
var allHourConsumeCoinFloat float64
|
|
|
- for orgID, wxinfos := range wxbotsSet {
|
|
|
+
|
|
|
+ orgBots := []OrgBot{}
|
|
|
+ err := l.svcCtx.DB.UsageDetail.Query().Where(
|
|
|
+ usagedetail.CreatedAtGTE(lastHour),
|
|
|
+ usagedetail.CreatedAtLT(currentHour),
|
|
|
+ ).
|
|
|
+ GroupBy(usagedetail.FieldOrganizationID, usagedetail.FieldBotID).
|
|
|
+ Aggregate().
|
|
|
+ Scan(l.ctx, &orgBots)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ l.Errorf("group usage_detail error: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ wxbotsSet := make(map[uint64][]string)
|
|
|
+ for _, ob := range orgBots {
|
|
|
+ wxbotsSet[ob.OrganizationID] = append(wxbotsSet[ob.OrganizationID], ob.BotID)
|
|
|
+ }
|
|
|
+
|
|
|
+ for orgID, wxids := range wxbotsSet {
|
|
|
var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt, orgNewUserInt int
|
|
|
var orgHourConsumeCoinFloat float64
|
|
|
- for _, wxinfo := range wxinfos {
|
|
|
+ for _, wxid := range wxids {
|
|
|
l.Logger.Infof("开始计算小时数据:%d\n", lastHourInt)
|
|
|
|
|
|
// 先判断该账号是否已经统计了小时数据,如果已经统计了,就不需要再统计了
|
|
@@ -67,7 +92,8 @@ func (l *CronTask) computeStatistic() {
|
|
|
var consumeCoinFloat float64
|
|
|
hourDataCount, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
|
|
|
usagestatistichour.Type(1),
|
|
|
- usagestatistichour.BotID(wxinfo.Wxid),
|
|
|
+ usagestatistichour.BotID(wxid),
|
|
|
+ usagestatistichour.OrganizationIDEQ(orgID),
|
|
|
usagestatistichour.Addtime(uint64(lastHourInt)),
|
|
|
).Count(l.ctx)
|
|
|
if hourDataCount > 0 {
|
|
@@ -80,13 +106,15 @@ func (l *CronTask) computeStatistic() {
|
|
|
sopresp, _ := l.svcCtx.DB.MessageRecords.Query().Where(
|
|
|
messagerecords.SubSourceID(0),
|
|
|
messagerecords.SourceTypeIn(3, 4),
|
|
|
- messagerecords.BotWxid(wxinfo.Wxid),
|
|
|
+ messagerecords.BotWxid(wxid),
|
|
|
+ messagerecords.OrganizationIDEQ(orgID),
|
|
|
messagerecords.CreatedAtGTE(lastHour),
|
|
|
messagerecords.CreatedAtLT(currentHour),
|
|
|
).Count(l.ctx)
|
|
|
airesp, _ := l.svcCtx.DB.UsageDetail.Query().Where(
|
|
|
usagedetail.AppIn(1, 3),
|
|
|
- usagedetail.BotID(wxinfo.Wxid),
|
|
|
+ usagedetail.BotID(wxid),
|
|
|
+ usagedetail.OrganizationIDEQ(orgID),
|
|
|
usagedetail.CreatedAtGTE(lastHour),
|
|
|
usagedetail.CreatedAtLT(currentHour),
|
|
|
).Count(l.ctx)
|
|
@@ -96,10 +124,10 @@ func (l *CronTask) computeStatistic() {
|
|
|
|
|
|
// SOP执行次数:SOP阶段和节点的执行次数。
|
|
|
sopRunInt, _ = l.svcCtx.DB.MessageRecords.Query().Where(
|
|
|
- messagerecords.BotWxid(wxinfo.Wxid),
|
|
|
messagerecords.SubSourceIDEQ(0),
|
|
|
messagerecords.SourceTypeIn(3, 4),
|
|
|
- messagerecords.BotWxid(wxinfo.Wxid),
|
|
|
+ messagerecords.BotWxid(wxid),
|
|
|
+ messagerecords.OrganizationIDEQ(orgID),
|
|
|
messagerecords.CreatedAtGTE(lastHour),
|
|
|
messagerecords.CreatedAtLT(currentHour),
|
|
|
messagerecords.StatusIn(2, 3, 4),
|
|
@@ -110,7 +138,7 @@ func (l *CronTask) computeStatistic() {
|
|
|
// 好友总数:contact 表中 type=1
|
|
|
friendCountInt, _ = l.svcCtx.DB.Contact.Query().Where(
|
|
|
contact.Type(1),
|
|
|
- contact.WxWxid(wxinfo.Wxid),
|
|
|
+ contact.WxWxid(wxid),
|
|
|
contact.Ctype(1),
|
|
|
).Count(l.ctx)
|
|
|
orgFriendCountInt += friendCountInt
|
|
@@ -119,7 +147,7 @@ func (l *CronTask) computeStatistic() {
|
|
|
// 群总数:contact 表中 type=2
|
|
|
groupCountInt, _ = l.svcCtx.DB.Contact.Query().Where(
|
|
|
contact.Type(2),
|
|
|
- contact.WxWxid(wxinfo.Wxid),
|
|
|
+ contact.WxWxid(wxid),
|
|
|
contact.Ctype(1),
|
|
|
).Count(l.ctx)
|
|
|
orgGroupCountInt += groupCountInt
|
|
@@ -127,7 +155,8 @@ func (l *CronTask) computeStatistic() {
|
|
|
|
|
|
// 消耗Token数:usage_detail 表
|
|
|
consumeTokenInt, _ = l.svcCtx.DB.UsageDetail.Query().Where(
|
|
|
- usagedetail.BotID(wxinfo.Wxid),
|
|
|
+ usagedetail.BotID(wxid),
|
|
|
+ usagedetail.OrganizationIDEQ(orgID),
|
|
|
usagedetail.CreatedAtGTE(lastHour),
|
|
|
usagedetail.CreatedAtLT(currentHour),
|
|
|
).Aggregate(ent.Sum("total_tokens")).Int(l.ctx)
|
|
@@ -136,7 +165,8 @@ func (l *CronTask) computeStatistic() {
|
|
|
|
|
|
// 计算积分消耗
|
|
|
consumeCoinFloat, _ = l.svcCtx.DB.UsageDetail.Query().Where(
|
|
|
- usagedetail.BotID(wxinfo.Wxid),
|
|
|
+ usagedetail.BotID(wxid),
|
|
|
+ usagedetail.OrganizationIDEQ(orgID),
|
|
|
usagedetail.CreatedAtGTE(lastHour),
|
|
|
usagedetail.CreatedAtLT(currentHour),
|
|
|
).Aggregate(ent.Sum("credits")).Float64(l.ctx)
|
|
@@ -151,7 +181,8 @@ func (l *CronTask) computeStatistic() {
|
|
|
// 活跃好友:usage_detail 表 type = 1
|
|
|
activeUsers, _ := l.svcCtx.DB.UsageDetail.Query().Where(
|
|
|
usagedetail.TypeIn(1, 6),
|
|
|
- usagedetail.BotID(wxinfo.Wxid),
|
|
|
+ usagedetail.BotID(wxid),
|
|
|
+ usagedetail.OrganizationIDEQ(orgID),
|
|
|
usagedetail.CreatedAtGTE(lastHour),
|
|
|
usagedetail.CreatedAtLT(currentHour),
|
|
|
).GroupBy(usagedetail.FieldReceiverID).Strings(l.ctx)
|
|
@@ -162,7 +193,8 @@ func (l *CronTask) computeStatistic() {
|
|
|
lastHourData, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
|
|
|
usagestatistichour.AddtimeEQ(uint64(lastHourInt)),
|
|
|
usagestatistichour.Type(1),
|
|
|
- usagestatistichour.BotID(wxinfo.Wxid),
|
|
|
+ usagestatistichour.BotID(wxid),
|
|
|
+ usagestatistichour.OrganizationIDEQ(orgID),
|
|
|
).First(l.ctx)
|
|
|
|
|
|
if lastHourData == nil {
|
|
@@ -175,8 +207,8 @@ func (l *CronTask) computeStatistic() {
|
|
|
|
|
|
_, err := l.svcCtx.DB.UsageStatisticHour.Create().
|
|
|
SetType(1).
|
|
|
- SetBotID(wxinfo.Wxid).
|
|
|
- SetOrganizationID(wxinfo.OrganizationID).
|
|
|
+ SetBotID(wxid).
|
|
|
+ SetOrganizationID(orgID).
|
|
|
SetAiResponse(uint64(aiResponseInt)).
|
|
|
SetSopRun(uint64(sopRunInt)).
|
|
|
SetTotalFriend(uint64(friendCountInt)).
|
|
@@ -204,7 +236,9 @@ func (l *CronTask) computeStatistic() {
|
|
|
|
|
|
LabelsCount := []custom_types.LabelDist{}
|
|
|
err := l.svcCtx.DB.LabelRelationship.Query().Where(
|
|
|
- labelrelationship.OrganizationIDEQ(orgID),
|
|
|
+ labelrelationship.HasLabelsWith(
|
|
|
+ label.OrganizationIDEQ(orgID),
|
|
|
+ ),
|
|
|
).GroupBy(labelrelationship.FieldLabelID).Aggregate(ent.Count()).Scan(l.ctx, &LabelsCount)
|
|
|
l.Errorf("save hour data error:%v \n", err)
|
|
|
LabelsCountSet[orgID] = LabelsCount
|
|
@@ -273,16 +307,18 @@ func (l *CronTask) computeStatistic() {
|
|
|
var allDayAiResponseInt, allDaySopRunInt, allDayFriendCountInt, allDayGroupCountInt, allDayAccountBalanceInt, allDayConsumeTokenInt, allDayActiveUserInt uint64
|
|
|
var allDayNewUserInt int64
|
|
|
var allDayConsumeCoinFloat float64
|
|
|
- for orgID, wxinfos := range wxbotsSet {
|
|
|
+
|
|
|
+ for orgID, wxids := range wxbotsSet {
|
|
|
var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt uint64
|
|
|
var orgNewUserInt int64
|
|
|
var orgDayConsumeCoinFloat float64
|
|
|
- for _, wxinfo := range wxinfos {
|
|
|
+ for _, wxid := range wxids {
|
|
|
l.Logger.Infof("开始计算日数据:%d\n", yesterdayInt)
|
|
|
|
|
|
hourDataBatch, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
|
|
|
usagestatistichour.Type(1),
|
|
|
- usagestatistichour.BotID(wxinfo.Wxid),
|
|
|
+ usagestatistichour.BotID(wxid),
|
|
|
+ usagestatistichour.OrganizationIDEQ(orgID),
|
|
|
usagestatistichour.AddtimeGTE(uint64(yesterdayFirstHourInt)),
|
|
|
usagestatistichour.AddtimeLT(uint64(yesterdayLastHourInt)),
|
|
|
).All(l.ctx)
|
|
@@ -308,7 +344,8 @@ func (l *CronTask) computeStatistic() {
|
|
|
// 活跃好友:usage_detail 表 type = 1
|
|
|
activeUsers, _ := l.svcCtx.DB.UsageDetail.Query().Where(
|
|
|
usagedetail.TypeIn(1, 6),
|
|
|
- usagedetail.BotID(wxinfo.Wxid),
|
|
|
+ usagedetail.BotID(wxid),
|
|
|
+ usagedetail.OrganizationIDEQ(orgID),
|
|
|
usagedetail.CreatedAtGTE(yesterdayFirstHour),
|
|
|
usagedetail.CreatedAtLT(yesterdayLastHour),
|
|
|
).GroupBy(usagedetail.FieldReceiverID).Strings(l.ctx)
|
|
@@ -337,7 +374,8 @@ func (l *CronTask) computeStatistic() {
|
|
|
// 先判断该账号是否已经统计了日数据,如果已经统计了,就不需要再统计了
|
|
|
dayDataCount, _ := l.svcCtx.DB.UsageStatisticDay.Query().Where(
|
|
|
usagestatisticday.Type(1),
|
|
|
- usagestatisticday.BotID(wxinfo.Wxid),
|
|
|
+ usagestatisticday.BotID(wxid),
|
|
|
+ usagestatisticday.OrganizationIDEQ(orgID),
|
|
|
usagestatisticday.Addtime(uint64(yesterdayInt)),
|
|
|
).Count(l.ctx)
|
|
|
|
|
@@ -346,8 +384,8 @@ func (l *CronTask) computeStatistic() {
|
|
|
_, err := l.svcCtx.DB.UsageStatisticDay.Create().
|
|
|
SetAddtime(uint64(yesterdayInt)).
|
|
|
SetType(1).
|
|
|
- SetBotID(wxinfo.Wxid).
|
|
|
- SetOrganizationID(wxinfo.OrganizationID).
|
|
|
+ SetBotID(wxid).
|
|
|
+ SetOrganizationID(orgID).
|
|
|
SetAiResponse(aiResponse).
|
|
|
SetSopRun(sopRun).
|
|
|
SetTotalFriend(totalFriend).
|
|
@@ -438,10 +476,10 @@ func (l *CronTask) computeStatistic() {
|
|
|
//
|
|
|
//var allMonthAiResponseInt, allMonthSopRunInt, allMonthFriendCountInt, allMonthGroupCountInt, allMonthAccountBalanceInt, allMonthConsumeTokenInt, allMonthActiveUserInt uint64
|
|
|
//var allMonthNewUserInt int64
|
|
|
- //for orgID, wxinfos := range wxbotsSet {
|
|
|
+ //for orgID, wxids := range wxbotsSet {
|
|
|
// var orgAiResponseInt, orgSopRunInt, orgFriendCountInt, orgGroupCountInt, orgAccountBalanceInt, orgConsumeTokenInt, orgActiveUserInt uint64
|
|
|
// var orgNewUserInt int64
|
|
|
- // for _, wxinfo := range wxinfos {
|
|
|
+ // for _, wxinfo := range wxids {
|
|
|
// l.Logger.Infof("开始计算月数据:%d\n", month)
|
|
|
//
|
|
|
// // 获取上月的第一天
|