|
- package crontask
- import (
- "encoding/json"
- "net/url"
- "path"
- "time"
- "wechat-api/ent"
- "wechat-api/ent/batchmsg"
- "wechat-api/ent/contact"
- "wechat-api/ent/custom_types"
- "wechat-api/ent/labelrelationship"
- "wechat-api/ent/msg"
- "wechat-api/ent/wx"
- "wechat-api/hook"
- "wechat-api/internal/utils/dberrorhandler"
- )
- func (l *CronTask) sendMsg() {
-
- batchList, err := l.svcCtx.DB.BatchMsg.Query().Where(
- batchmsg.StartTimeLT(time.Now()),
- batchmsg.StatusIn(0, 1),
- batchmsg.CtypeIn(1, 3),
- ).All(l.ctx)
- l.Logger.Infof("send_msg.go BatchList %v\n", batchList)
- if err != nil {
- l.Logger.Errorf("batchList err: %v\n", err)
- return
- }
- startTime := time.Now()
- for _, batch := range batchList {
-
- l.Logger.Infof("batch start: %s\n", batch.BatchNo)
-
- if batch.Status == 0 {
- userList := make([]*ent.Contact, 0)
- groupList := make([]*ent.Contact, 0)
- tagMap := make(map[string][]uint64)
- err = json.Unmarshal([]byte(batch.Tagids), &tagMap)
- if err != nil {
- continue
- }
- l.Logger.Infof("send_msg.go batch.Tagids = %v\n", tagMap)
- var allContact, allGroup, ok bool
- var contactTags, groupTags []uint64
- if contactTags, ok = tagMap["contact_tag"]; ok {
- allContact = hasAll(contactTags, 0)
- l.Logger.Infof("contactTags=%v \n", contactTags)
- }
- if groupTags, ok = tagMap["group_tag"]; ok {
- allGroup = hasAll(groupTags, 0)
- l.Logger.Infof("groupTags=%v \n", groupTags)
- }
- var err error
- if allContact && allGroup {
-
- userList, err = l.svcCtx.DB.Contact.Query().Where(
- contact.WxWxid(batch.Fromwxid),
- contact.TypeIn(1, 2),
- contact.CtypeIn(1, 3),
- ).All(l.ctx)
- if err != nil {
- l.Logger.Errorf("userlist err: %v \n", err)
- continue
- }
- } else {
- if allContact {
-
- userList, err = l.svcCtx.DB.Contact.Query().Where(
- contact.WxWxid(batch.Fromwxid),
- contact.TypeEQ(1),
- contact.CtypeIn(1, 3),
- ).All(l.ctx)
- if err != nil {
- l.Logger.Errorf("userList err: %v \n", err)
- continue
- }
- } else {
- userList, err = l.getContactList(contactTags, batch.Fromwxid, 1)
- if err != nil {
- l.Logger.Errorf("userList err: %v \n", err)
- continue
- }
- }
- if allGroup {
-
- groupList, err = l.svcCtx.DB.Contact.Query().Where(
- contact.WxWxid(batch.Fromwxid),
- contact.TypeEQ(2),
- contact.CtypeIn(1, 3),
- ).All(l.ctx)
- if err != nil {
- l.Logger.Errorf("groupList err: %v \n", err)
- continue
- }
- } else {
- groupList, err = l.getContactList(groupTags, batch.Fromwxid, 2)
- if err != nil {
- l.Logger.Errorf("groupList err: %v \n", err)
- continue
- }
- }
- if len(groupList) > 0 {
- userList = append(userList, groupList...)
- }
- }
-
- msgs := make([]*ent.MsgCreate, 0)
-
- msgArray := make([]custom_types.Action, 0)
- err = json.Unmarshal([]byte(batch.Msg), &msgArray)
- l.Logger.Infof("msgArray length= %v, err:%v \n", len(msgArray), err)
- if err != nil {
-
- msgArray = make([]custom_types.Action, 0)
- }
- for _, user := range userList {
-
-
-
- if len(msgArray) > 0 {
-
- for _, msgItem := range msgArray {
- msgRow := l.svcCtx.DB.Msg.Create().
- SetNotNilFromwxid(&batch.Fromwxid).
- SetNotNilToid(&user.Wxid).
- SetMsgtype(int32(msgItem.Type)).
- SetNotNilMsg(&msgItem.Content).
- SetStatus(0).
- SetNotNilBatchNo(&batch.BatchNo)
- msgs = append(msgs, msgRow)
- }
- }
- }
- if len(msgs) > 0 {
-
- tx, err := l.svcCtx.DB.Tx(l.ctx)
- if err != nil {
- l.Logger.Errorf("start db transaction err: %v \n", err)
- continue
- }
- _, err = tx.BatchMsg.UpdateOneID(batch.ID).Where(batchmsg.StatusNEQ(1)).SetStatus(1).Save(l.ctx)
- if err != nil {
- _ = tx.Rollback()
- l.Logger.Errorf("batchmsg update err: %v \n", err)
- continue
- }
- _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
- if err != nil {
- _ = tx.Rollback()
- l.Logger.Errorf("msg CreateBulk err: %v \n", err)
- continue
- }
- _ = tx.Commit()
- } else {
-
- _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
- SetStatus(2).
- SetTotal(0).
- SetSuccess(0).
- SetFail(0).
- Save(l.ctx)
- if err != nil {
- l.Logger.Errorf("batchmsg update err: %v \n", err)
- }
- continue
- }
- }
-
- msglist, err := l.svcCtx.DB.Msg.Query().Where(
- msg.BatchNoEQ(batch.BatchNo),
- msg.StatusEQ(0),
- ).All(l.ctx)
- if err != nil {
- l.Logger.Errorf("msglist err: %v \n", err)
- continue
- }
- wxInfo, err := l.svcCtx.DB.Wx.Query().Where(wx.Wxid(batch.Fromwxid)).Only(l.ctx)
- if err != nil {
- l.Logger.Errorf("wxInfo err: %v \n", err)
- continue
- }
- privateIP := ""
- adminPort := ""
- port := ""
- ctype := batch.Ctype
- if wxInfo.ServerID != 0 {
- serverInfo, err := l.svcCtx.DB.Server.Get(l.ctx, wxInfo.ServerID)
- if err != nil {
- l.Logger.Errorf("serverInfo err: %v \n", err)
- continue
- }
- privateIP = serverInfo.PrivateIP
- adminPort = serverInfo.AdminPort
- port = wxInfo.Port
- }
- var hookClient *hook.Hook
- if ctype == 3 {
- hookClient = hook.NewWecomHook("", adminPort, port)
- } else {
- hookClient = hook.NewHook(privateIP, adminPort, port)
- }
-
- for _, msg := range msglist {
-
-
- if msg.Msgtype == 1 {
- err = hookClient.SendTextMsg(msg.Toid, msg.Msg, wxInfo.Wxid)
- } else if msg.Msgtype == 2 {
- diyfilename := getFileName(msg.Msg)
- err = hookClient.SendPicMsg(msg.Toid, msg.Msg, diyfilename, wxInfo.Wxid)
- }
-
- time.Sleep(time.Second)
- if err != nil {
- l.Logger.Errorf("send msg err: %v \n", err)
- _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(2).Save(l.ctx)
- if err != nil {
- l.Logger.Errorf("msg update err: %v \n", err)
- continue
- }
- continue
- }
- _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(1).Save(l.ctx)
- if err != nil {
- l.Logger.Errorf("msg update err: %v \n", err)
- continue
- }
- }
-
- total, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo)).Count(l.ctx)
-
- success, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(1)).Count(l.ctx)
-
- fail, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(2)).Count(l.ctx)
-
- _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
- SetStatus(2).
- SetTotal(int32(total)).
- SetSuccess(int32(success)).
- SetFail(int32(fail)).
- SetStopTime(time.Now()).
- Save(l.ctx)
- if err != nil {
- l.Logger.Errorf("batchmsg update err: %v \n", err)
- continue
- }
- l.Logger.Infof("batch stop:%s \n", batch.BatchNo)
- }
- finishTime := time.Now()
- l.Logger.Infof("This process cost %v \n", finishTime.Sub(startTime).String())
- return
- }
- func getFileName(photoUrl string) string {
- u, err := url.Parse(photoUrl)
- if err != nil {
- return ""
- }
- return path.Base(u.Path)
- }
- func hasAll(array []uint64, target uint64) bool {
- for _, val := range array {
- if val == target {
- return true
- }
- }
- return false
- }
- func (l *CronTask) getContactList(labels []uint64, fromWxId string, stype int) ([]*ent.Contact, error) {
-
- labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(
- labelrelationship.LabelIDIn(labels...),
- ).All(l.ctx)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
- }
- contact_ids := make([]uint64, 0, len(labelrelationships))
- for _, labelrelationship := range labelrelationships {
- contact_ids = append(contact_ids, labelrelationship.ContactID)
- }
- userList := make([]*ent.Contact, 0)
- if len(contact_ids) > 0 {
-
- userList, err = l.svcCtx.DB.Contact.Query().Where(
- contact.WxWxid(fromWxId),
- contact.IDIn(contact_ids...),
- contact.TypeEQ(stype),
- contact.CtypeIn(1, 3),
- ).All(l.ctx)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
- }
- }
- return userList, nil
- }
|