package crontask import ( "encoding/json" "net/url" "path" "time" "wechat-api/ent" "wechat-api/ent/batchmsg" "wechat-api/ent/contact" "wechat-api/ent/custom_types" "wechat-api/ent/labelrelationship" "wechat-api/ent/msg" "wechat-api/ent/wx" "wechat-api/hook" "wechat-api/internal/utils/dberrorhandler" ) func (l *CronTask) sendMsg() { // 获取 BatchMsg 表中 start_time 小于当前时间并且 status 为 0 或 1 的数据 batchList, err := l.svcCtx.DB.BatchMsg.Query().Where( batchmsg.StartTimeLT(time.Now()), batchmsg.StatusIn(0, 1), batchmsg.CtypeIn(1, 3), ).All(l.ctx) if err != nil { l.Logger.Errorf("batchList err: %v", err) return } startTime := time.Now() for _, batch := range batchList { // 记录当前批次开始处理 l.Logger.Info("batch start: ", batch.BatchNo) // 如果 批次 status 为 0,则先产生待发送消息 if batch.Status == 0 { userList := make([]*ent.Contact, 0) groupList := make([]*ent.Contact, 0) tagMap := make(map[string][]uint64) err = json.Unmarshal([]byte(batch.Tagids), &tagMap) if err != nil { continue } l.Logger.Infof("send_msg.go batch.Tagids = %v\n", tagMap) var allContact, allGroup, ok bool var contactTags, groupTags []uint64 if contactTags, ok = tagMap["contact_tag"]; ok { allContact = hasAll(contactTags, 0) l.Logger.Infof("contactTags=%v", contactTags) } if groupTags, ok = tagMap["group_tag"]; ok { allGroup = hasAll(groupTags, 0) l.Logger.Infof("groupTags=%v", groupTags) } var err error if allContact && allGroup { // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1或2的数据 userList, err = l.svcCtx.DB.Contact.Query().Where( contact.WxWxid(batch.Fromwxid), contact.TypeIn(1, 2), contact.CtypeIn(1, 3), ).All(l.ctx) if err != nil { l.Logger.Errorf("userlist err: %v", err) continue } } else { if allContact { // 所有联系人 // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1的数据 userList, err = l.svcCtx.DB.Contact.Query().Where( contact.WxWxid(batch.Fromwxid), contact.TypeEQ(1), contact.CtypeIn(1, 3), ).All(l.ctx) if err != nil { l.Logger.Errorf("userList err: %v", err) continue } } else { //获取指定标签的联系人 userList, err = l.getContactList(contactTags, batch.Fromwxid, 1) if err != nil { l.Logger.Errorf("userList err: %v", err) continue } } if allGroup { //所有群 // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为2的数据 groupList, err = l.svcCtx.DB.Contact.Query().Where( contact.WxWxid(batch.Fromwxid), contact.TypeEQ(2), contact.CtypeIn(1, 3), ).All(l.ctx) if err != nil { l.Logger.Errorf("groupList err: %v", err) continue } } else { //获取指定标签的群 groupList, err = l.getContactList(groupTags, batch.Fromwxid, 2) if err != nil { l.Logger.Errorf("groupList err: %v", err) continue } } if len(groupList) > 0 { userList = append(userList, groupList...) } } // 这里是待插入到 msg 表的数据 msgs := make([]*ent.MsgCreate, 0) // 这里是把 batch.Msg 转换为 json 数组 msgArray := make([]custom_types.Action, 0) err = json.Unmarshal([]byte(batch.Msg), &msgArray) l.Logger.Infof("msgArray length= %v, err:%v", len(msgArray), err) if err != nil { // json 解析失败 msgArray = make([]custom_types.Action, 0) } for _, user := range userList { // 这里改动主要是 batch_msg 目前支持批量添加图文,导致 batch_msg 的 msg 字段为 json // msg 里包括文字和图片,msgtype=1 为文字, msgtype=2 为图片 // 每一条文字或者图片 都是一条单独的消息 if len(msgArray) > 0 { // 这里是新格式(msg内容为json),需要遍历数组 for _, msgItem := range msgArray { msgRow := l.svcCtx.DB.Msg.Create(). SetNotNilFromwxid(&batch.Fromwxid). SetNotNilToid(&user.Wxid). SetMsgtype(int32(msgItem.Type)). SetNotNilMsg(&msgItem.Content). SetStatus(0). SetNotNilBatchNo(&batch.BatchNo) msgs = append(msgs, msgRow) } } } if len(msgs) > 0 { // 加事务,批量操作一条 batch_msg 和 一堆 msg 信息 tx, err := l.svcCtx.DB.Tx(l.ctx) if err != nil { l.Logger.Errorf("start db transaction err: %v", err) continue } _, err = tx.BatchMsg.UpdateOneID(batch.ID).Where(batchmsg.StatusNEQ(1)).SetStatus(1).Save(l.ctx) if err != nil { _ = tx.Rollback() l.Logger.Errorf("batchmsg update err: %v", err) continue } _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx) if err != nil { _ = tx.Rollback() l.Logger.Errorf("msg CreateBulk err: %v", err) continue } _ = tx.Commit() } else { // 如果没有消息,直接更新批次状态为已发送 _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID). SetStatus(2). SetTotal(0). SetSuccess(0). SetFail(0). Save(l.ctx) if err != nil { l.Logger.Errorf("batchmsg update err: %v", err) } continue } } // 获取当前批次的所有待发送消息 msglist, err := l.svcCtx.DB.Msg.Query().Where( msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(0), ).All(l.ctx) if err != nil { l.Logger.Errorf("msglist err: %v", err) continue } wxInfo, err := l.svcCtx.DB.Wx.Query().Where(wx.Wxid(batch.Fromwxid)).Only(l.ctx) if err != nil { l.Logger.Errorf("wxInfo err: %v", err) continue } privateIP := "" adminPort := "" port := "" var ctype uint64 if wxInfo.ServerID != 0 { serverInfo, err := l.svcCtx.DB.Server.Get(l.ctx, wxInfo.ServerID) if err != nil { l.Logger.Errorf("serverInfo err: %v", err) continue } privateIP = serverInfo.PrivateIP adminPort = serverInfo.AdminPort port = wxInfo.Port ctype = wxInfo.Ctype } var hookClient *hook.Hook if ctype == uint64(3) { hookClient = hook.NewWecomHook("", adminPort, port) } else { hookClient = hook.NewHook(privateIP, adminPort, port) } //循环发送消息 for _, msg := range msglist { // 这里之前只有文字消息(既 msgtype=1) 目前增加了图片 所以增加了msgtype=2 // 所以增加了一个判断,判断发送的内容类型,如果是文字就调用SendTextMsg,如果是图片就调用SendPicMsg if msg.Msgtype == 1 { err = hookClient.SendTextMsg(msg.Toid, msg.Msg, wxInfo.Wxid) } else if msg.Msgtype == 2 { diyfilename := getFileName(msg.Msg) err = hookClient.SendPicMsg(msg.Toid, msg.Msg, diyfilename, wxInfo.Wxid) } // 每次发完暂停1秒 time.Sleep(time.Second) if err != nil { l.Logger.Errorf("send msg err: %v", err) _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(2).Save(l.ctx) if err != nil { l.Logger.Errorf("msg update err: %v", err) continue } continue } _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(1).Save(l.ctx) if err != nil { l.Logger.Errorf("msg update err: %v", err) continue } } // 获取当前批次的所有发送的消息总数 total, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo)).Count(l.ctx) // 获取当前批次的所有发送成功的消息总数 success, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(1)).Count(l.ctx) // 获取当前批次的所有发送失败的消息总数 fail, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(2)).Count(l.ctx) // 更新批次状态为已发送,同时更新发送总数、发送成功数量、失败数量、结束时间 _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID). SetStatus(2). SetTotal(int32(total)). SetSuccess(int32(success)). SetFail(int32(fail)). SetStopTime(time.Now()). Save(l.ctx) if err != nil { l.Logger.Errorf("batchmsg update err: %v", err) continue } l.Logger.Info("batch stop: ", batch.BatchNo) } finishTime := time.Now() l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String()) return } // 根据URL获取图片名 func getFileName(photoUrl string) string { u, err := url.Parse(photoUrl) if err != nil { return "" } return path.Base(u.Path) } func hasAll(array []uint64, target uint64) bool { for _, val := range array { if val == target { return true } } return false } func (l *CronTask) getContactList(labels []uint64, fromWxId string, stype int) ([]*ent.Contact, error) { // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where( labelrelationship.LabelIDIn(labels...), ).All(l.ctx) if err != nil { return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil) } contact_ids := make([]uint64, 0, len(labelrelationships)) for _, labelrelationship := range labelrelationships { contact_ids = append(contact_ids, labelrelationship.ContactID) } userList := make([]*ent.Contact, 0) if len(contact_ids) > 0 { // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据 userList, err = l.svcCtx.DB.Contact.Query().Where( contact.WxWxid(fromWxId), contact.IDIn(contact_ids...), contact.TypeEQ(stype), contact.CtypeIn(1, 3), ).All(l.ctx) if err != nil { return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil) } } return userList, nil }