Browse Source

对齐数据

boweniac 2 days ago
parent
commit
bd110222dc

+ 3 - 2
crontask/compute_historical_credit.go

@@ -38,9 +38,10 @@ func (l *CronTask) computeHistoricalCredit() {
 
 	// 获取当前时间
 	//now := time.Now()
-	//start := time.Date(2024, 12, 25, 0, 0, 0, 0, time.Local)
+	//start := time.Date(2025, 4, 30, 16, 0, 0, 0, time.Local)
+	//end := time.Date(2025, 5, 11, 16, 0, 0, 0, time.Local)
 	start := time.Date(2024, 11, 24, 0, 0, 0, 0, time.Local)
-	end := time.Date(2025, 5, 9, 0, 0, 0, 0, time.Local)
+	end := time.Date(2025, 5, 14, 0, 0, 0, 0, time.Local)
 	//start := time.Date(2025, 3, 18, 0, 0, 0, 0, time.Local)
 	//end := time.Date(2025, 3, 19, 23, 0, 0, 0, time.Local)
 

+ 12 - 1
crontask/compute_historical_statistic.go

@@ -57,8 +57,11 @@ func (l *CronTask) computeHistoricalStatistic() {
 
 	// 获取当前时间
 	//now := time.Now()
+	//start := time.Date(2025, 5, 5, 0, 0, 0, 0, time.Local)
+	//end := time.Date(2025, 5, 8, 0, 0, 0, 0, time.Local)
+
 	start := time.Date(2024, 11, 24, 0, 0, 0, 0, time.Local)
-	end := time.Date(2025, 5, 9, 0, 0, 0, 0, time.Local)
+	end := time.Date(2025, 5, 14, 0, 0, 0, 0, time.Local)
 
 	for now := start; !now.After(end); now = now.Add(time.Hour) {
 		fmt.Println(now.Format("2006-01-02 15:00:00"))
@@ -100,6 +103,11 @@ func (l *CronTask) computeHistoricalStatistic() {
 					usagedetail.CreatedAtGTE(lastHour),
 					usagedetail.CreatedAtLT(currentHour),
 				).Aggregate(ent.Sum("credits")).Float64(l.ctx)
+
+				ush, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(usagestatistichour.AddtimeEQ(uint64(lastHourIntLocation)), usagestatistichour.BotIDEQ(wxinfo), usagestatistichour.OrganizationIDEQ(orgID)).Only(l.ctx)
+				if ush == nil {
+					continue
+				}
 				allHourConsumeCoinFloat += consumeCoinFloat
 				orgHourConsumeCoinFloat += consumeCoinFloat
 
@@ -156,10 +164,13 @@ func (l *CronTask) computeHistoricalStatistic() {
 			var orgDayConsumeCoinFloat float64
 			for _, wxinfo := range wxinfos {
 				l.Logger.Infof("开始计算日数据:%d\n", yesterdayInt)
+				l.Logger.Infof("yesterdayFirstHourInt:%d\n", yesterdayFirstHourInt)
+				l.Logger.Infof("yesterdayLastHourInt:%d\n", yesterdayLastHourInt)
 
 				hourDataBatch, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
 					usagestatistichour.Type(1),
 					usagestatistichour.BotID(wxinfo),
+					usagestatistichour.OrganizationIDEQ(orgID),
 					usagestatistichour.AddtimeGTE(uint64(yesterdayFirstHourInt)),
 					usagestatistichour.AddtimeLT(uint64(yesterdayLastHourInt)),
 				).All(l.ctx)

+ 230 - 0
crontask/compute_historical_statistic_back.go

@@ -0,0 +1,230 @@
+package crontask
+
+//
+//import (
+//	"fmt"
+//	"github.com/zeromicro/go-zero/core/logx"
+//	"strconv"
+//	"time"
+//	"wechat-api/ent"
+//	"wechat-api/ent/usagedetail"
+//	"wechat-api/ent/usagestatisticday"
+//	"wechat-api/ent/usagestatistichour"
+//)
+//
+//type OrgBot struct {
+//	OrganizationID uint64 `json:"organization_id"`
+//	BotID          string `json:"bot_id"`
+//}
+//
+//func (l *CronTask) computeHistoricalStatistic() {
+//	// 获取所有机器人信息
+//	//wxbots, err := l.svcCtx.DB.Wx.Query().Select(wx.FieldWxid, wx.FieldID, wx.FieldOrganizationID).All(l.ctx)
+//	//if err != nil {
+//	//	l.Errorf("fetch wxids error:%v\n", err)
+//	//	return
+//	//}
+//	//
+//	//wxbotsSet := make(map[uint64][]*ent.Wx)
+//	//for _, bot := range wxbots {
+//	//	if !strings.HasPrefix(bot.Wxid, "temp-") {
+//	//		wxbotsSet[bot.OrganizationID] = append(wxbotsSet[bot.OrganizationID], bot)
+//	//	}
+//	//}
+//	orgBots := []OrgBot{}
+//	err := l.svcCtx.DB.UsageDetail.Query().
+//		GroupBy(usagedetail.FieldOrganizationID, usagedetail.FieldBotID).
+//		Aggregate().
+//		Scan(l.ctx, &orgBots)
+//
+//	if err != nil {
+//		l.Errorf("group usage_detail error: %v", err)
+//		return
+//	}
+//
+//	wxbotsSet := make(map[uint64][]string)
+//	for _, ob := range orgBots {
+//		wxbotsSet[ob.OrganizationID] = append(wxbotsSet[ob.OrganizationID], ob.BotID)
+//	}
+//	logx.Info("wxbotsSet: ", wxbotsSet)
+//
+//	/*
+//		计算本小时的数据
+//		1. 查询出上小时里所有 usagedetail 内容
+//		2. 挨个遍历他的 bot_id ,再查询他的 bot_id 相关的参数
+//		3. 遍历的时候可能有重复,所以要先检查是否生成了数据,如果有就忽略,没有再生成
+//		----------------------------------------------------------------------------------------------------------
+//	*/
+//
+//	// 获取当前时间
+//	//now := time.Now()
+//	start := time.Date(2025, 5, 5, 0, 0, 0, 0, time.Local)
+//	end := time.Date(2025, 5, 8, 0, 0, 0, 0, time.Local)
+//
+//	//start := time.Date(2024, 11, 24, 0, 0, 0, 0, time.Local)
+//	//end := time.Date(2025, 5, 9, 0, 0, 0, 0, time.Local)
+//
+//	for now := start; !now.After(end); now = now.Add(time.Hour) {
+//		fmt.Println(now.Format("2006-01-02 15:00:00"))
+//
+//		// 获取本小时的第一分钟
+//		currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()).Add(-8 * time.Hour)
+//		currentHourInt, _ := strconv.Atoi(currentHour.Format("2006010215"))
+//		currentHourLocation := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
+//		currentHourIntLocation, _ := strconv.Atoi(currentHourLocation.Format("2006010215"))
+//
+//		// 上一个小时的起始时间
+//		lastHour := currentHour.Add(-time.Hour * 1)
+//		lastHourLocation := currentHourLocation.Add(-time.Hour * 1)
+//		lastHourInt, _ := strconv.Atoi(lastHour.Format("2006010215"))
+//		lastHourIntLocation, _ := strconv.Atoi(lastHourLocation.Format("2006010215"))
+//
+//		l.Logger.Infof("currentHour:%d\n", currentHour)
+//		l.Logger.Infof("currentHourInt:%d\n", currentHourInt)
+//		l.Logger.Infof("currentHourLocation:%d\n", currentHourLocation)
+//		l.Logger.Infof("currentHourIntLocation:%d\n", currentHourIntLocation)
+//		l.Logger.Infof("lastHour:%d\n", lastHour)
+//		l.Logger.Infof("lastHourLocation:%d\n", lastHourLocation)
+//		l.Logger.Infof("lastHourInt:%d\n", lastHourInt)
+//		l.Logger.Infof("lastHourIntLocation:%d\n", lastHourIntLocation)
+//
+//		var allHourConsumeCoinFloat float64
+//		for orgID, wxinfos := range wxbotsSet {
+//			var orgHourConsumeCoinFloat float64
+//			for _, wxinfo := range wxinfos {
+//				l.Logger.Infof("开始计算小时数据:%d\n", lastHourInt)
+//
+//				// 先判断该账号是否已经统计了小时数据,如果已经统计了,就不需要再统计了
+//				var consumeCoinFloat float64
+//
+//				// 计算积分消耗
+//				consumeCoinFloat, _ = l.svcCtx.DB.UsageDetail.Query().Where(
+//					usagedetail.BotID(wxinfo),
+//					usagedetail.OrganizationIDEQ(orgID),
+//					usagedetail.CreatedAtGTE(lastHour),
+//					usagedetail.CreatedAtLT(currentHour),
+//				).Aggregate(ent.Sum("credits")).Float64(l.ctx)
+//
+//				ush, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(usagestatistichour.AddtimeEQ(uint64(lastHourIntLocation)), usagestatistichour.BotIDEQ(wxinfo), usagestatistichour.OrganizationIDEQ(orgID)).Only(l.ctx)
+//				if ush == nil {
+//					continue
+//				}
+//				allHourConsumeCoinFloat += consumeCoinFloat
+//				orgHourConsumeCoinFloat += consumeCoinFloat
+//
+//				logx.Info("hour consumeCoinFloat: ", consumeCoinFloat)
+//				logx.Info("hour allHourConsumeCoinFloat: ", allHourConsumeCoinFloat)
+//				logx.Info("hour orgHourConsumeCoinFloat: ", orgHourConsumeCoinFloat)
+//
+//				_, err := l.svcCtx.DB.UsageStatisticHour.Update().
+//					Where(usagestatistichour.AddtimeEQ(uint64(lastHourIntLocation)), usagestatistichour.BotIDEQ(wxinfo), usagestatistichour.OrganizationIDEQ(orgID)).
+//					SetConsumeCoin(consumeCoinFloat).
+//					Save(l.ctx)
+//				if err != nil {
+//					l.Errorf("save hour data error:%v \n", err)
+//				}
+//			}
+//
+//			_, err = l.svcCtx.DB.UsageStatisticHour.Update().
+//				Where(usagestatistichour.AddtimeEQ(uint64(lastHourIntLocation)), usagestatistichour.OrganizationIDEQ(orgID), usagestatistichour.BotIDEQ("")).
+//				SetConsumeCoin(orgHourConsumeCoinFloat).
+//				Save(l.ctx)
+//			if err != nil {
+//				l.Errorf("save hour data error:%v \n", err)
+//			}
+//		}
+//
+//		// 先判断该租户是否已经统计了小时数据,如果已经统计了,就不需要再统计了
+//		_, err = l.svcCtx.DB.UsageStatisticHour.Update().
+//			Where(usagestatistichour.AddtimeEQ(uint64(lastHourIntLocation)), usagestatistichour.OrganizationIDEQ(0), usagestatistichour.BotIDEQ("")).
+//			SetConsumeCoin(allHourConsumeCoinFloat).
+//			Save(l.ctx)
+//		if err != nil {
+//			l.Errorf("save hour data error:%v \n", err)
+//		}
+//
+//		/*
+//			计算日数据
+//			----------------------------------------------------------------------------------------------------------
+//		*/
+//		//dayStr := time.Now().Format("20060102")
+//		//day, _ := strconv.Atoi(dayStr)
+//
+//		// 获取昨天的第一小时
+//		yesterday := now.AddDate(0, 0, -1)
+//		yesterdayFirstHour := time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, now.Location())
+//		yesterdayInt, _ := strconv.Atoi(yesterdayFirstHour.Format("20060102"))
+//		yesterdayFirstHourInt, _ := strconv.Atoi(yesterdayFirstHour.Format("2006010215"))
+//
+//		// 获取昨天的最后一小时
+//		yesterdayLastHour := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+//		yesterdayLastHourInt, _ := strconv.Atoi(yesterdayLastHour.Format("2006010215"))
+//
+//		var allDayConsumeCoinFloat float64
+//		for orgID, wxinfos := range wxbotsSet {
+//			var orgDayConsumeCoinFloat float64
+//			for _, wxinfo := range wxinfos {
+//				l.Logger.Infof("开始计算日数据:%d\n", yesterdayInt)
+//				l.Logger.Infof("yesterdayFirstHourInt:%d\n", yesterdayFirstHourInt)
+//				l.Logger.Infof("yesterdayLastHourInt:%d\n", yesterdayLastHourInt)
+//
+//				hourDataBatch, _ := l.svcCtx.DB.UsageStatisticHour.Query().Where(
+//					usagestatistichour.Type(1),
+//					usagestatistichour.BotID(wxinfo),
+//					usagestatistichour.OrganizationIDEQ(orgID),
+//					usagestatistichour.AddtimeGTE(uint64(yesterdayFirstHourInt)),
+//					usagestatistichour.AddtimeLT(uint64(yesterdayLastHourInt)),
+//				).All(l.ctx)
+//
+//				if hourDataBatch == nil {
+//					continue
+//				}
+//
+//				var consumeCoin float64
+//				for _, hourData := range hourDataBatch {
+//					consumeCoin += hourData.ConsumeCoin
+//				}
+//
+//				orgDayConsumeCoinFloat += consumeCoin
+//
+//				allDayConsumeCoinFloat += consumeCoin
+//
+//				// 如果添加过了就略过
+//				if yesterdayLastHourInt <= currentHourInt {
+//					_, err := l.svcCtx.DB.UsageStatisticDay.Update().
+//						Where(usagestatisticday.AddtimeEQ(uint64(yesterdayInt)), usagestatisticday.BotID(wxinfo), usagestatisticday.OrganizationIDEQ(orgID)).
+//						SetConsumeCoin(consumeCoin).
+//						Save(l.ctx)
+//					if err != nil {
+//						l.Errorf("create day data error:%v \n", err)
+//						continue
+//					}
+//				}
+//			}
+//			// 如果添加过了就略过
+//			if yesterdayLastHourInt <= currentHourInt {
+//				_, err := l.svcCtx.DB.UsageStatisticDay.Update().
+//					Where(usagestatisticday.AddtimeEQ(uint64(yesterdayInt)), usagestatisticday.OrganizationIDEQ(orgID), usagestatisticday.BotIDEQ("")).
+//					SetConsumeCoin(orgDayConsumeCoinFloat).
+//					Save(l.ctx)
+//				if err != nil {
+//					l.Errorf("create day data error:%v \n", err)
+//					continue
+//				}
+//			}
+//		}
+//
+//		// 如果添加过了就略过
+//		if yesterdayLastHourInt <= currentHourInt {
+//			_, err = l.svcCtx.DB.UsageStatisticDay.Update().
+//				Where(usagestatisticday.AddtimeEQ(uint64(yesterdayInt)), usagestatisticday.OrganizationIDEQ(0), usagestatisticday.BotIDEQ("")).
+//				SetConsumeCoin(allDayConsumeCoinFloat).
+//				Save(l.ctx)
+//			if err != nil {
+//				l.Errorf("create day data error:%v \n", err)
+//			}
+//		}
+//	}
+//
+//	return
+//}