package mq import ( "context" "encoding/json" "errors" "time" "go.mongodb.org/mongo-driver/bson" newMongo "go.mongodb.org/mongo-driver/mongo" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" "mongo.games.com/game/common" "mongo.games.com/game/dbproxy/mongo" "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" ) func init() { mq.RegisterSubscriber(model.EvtInvite, func(e broker.Event) (err error) { msg := e.Message() if msg != nil { defer func() { e.Ack() }() var log model.EvtInviteMsg err = json.Unmarshal(msg.Body, &log) if err != nil { logger.Logger.Errorf("EvtInvite json.Unmarshal error:%v msg:%v", err, string(msg.Body)) return } logger.Logger.Tracef("EvtInvite log:%+v", log) var addRechargeScore bool if log.Tp != common.InviteScoreCheckWeek { addRechargeScore, err = svc.CheckInviteScore(&log.InviteScore) if err != nil { logger.Logger.Errorf("EvtInvite SaveInviteScore error:%v msg:%+v %+v", err, log.InviteScore, log) return err } } now := time.Unix(log.Ts, 0).Local() // 数据创建时间 get := func(snid int32) (*model.EvtInviteAckMsg, error) { if snid == 0 { return nil, errors.New("not found") } n := new(model.EvtInviteAckMsg) n.Platform = log.Platform n.Snid = snid n.Score, n.Num, n.Money, _, err = svc.GetInviteData(log.Platform, snid) return n, err } // 重置积分 reset := func(snid int32) error { if snid == 0 { return nil } _, _, _, ts, err := svc.GetInviteData(log.Platform, snid) // 上次更新时间 if err != nil { logger.Logger.Errorf("EvtInvite GetInviteData error:%v snid:%v", err, snid) return err } inSameWeek := common.InSameWeek(ts, now) if !inSameWeek { err = svc.ClearInviteScore(log.Platform, snid, now) if err != nil { logger.Logger.Errorf("EvtInvite ClearInviteScore error:%v snid:%v", err, snid) return err } msg, err := get(snid) if err != nil { logger.Logger.Errorf("EvtInvite get error:%v snid:%v", err, snid) return err } msg.Score = 0 mq.Send(model.EvtInviteAck, msg) } return nil } // 当前玩家积分是否跨周重置 tmpSnid := log.SnId for i := 0; i < 6; i++ { if tmpSnid > 0 { err = reset(tmpSnid) if err != nil { logger.Logger.Errorf("EvtInvite reset %v error:%v snid:%v", i, err, tmpSnid) return err } } if tmpSnid > 0 { tmpSnid, err = svc.GetPSnId(log.Platform, tmpSnid) if err != nil { logger.Logger.Errorf("EvtInvite GetPSnId %v error:%v snid:%v", i, err, tmpSnid) return err } } } mongoClient, err := mongo.NewMongoClient() if err != nil { logger.Logger.Errorf("EvtInvite NewMongoClient error:%v", err) return err } session, err := mongoClient.StartSession() if err != nil { logger.Logger.Errorf("EvtInvite StartSession error:%v", err) return err } defer session.EndSession(context.Background()) // 事务不支持创建集合和索引,需要提前创建 svc.InviteScoreCollection(log.Platform) svc.RankInviteCollection(log.Platform) var notifySnId []int32 // 修改积分 err = newMongo.WithSession(context.Background(), session, func(sc newMongo.SessionContext) error { err := session.StartTransaction() if err != nil { return err } // 增加积分 add := func(psnid, snid, level, tp int32, score, money, rate int64, addMoney bool) error { if psnid <= 0 { return nil } notifySnId = append(notifySnId, psnid) err = svc.AddInviteScore(mongoClient, sc, log.Platform, psnid, snid, level, tp, score*rate/10000, rate, money, now, addMoney) if err != nil { logger.Logger.Errorf("EvtInvite add error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate) return err } return nil } getPSnId := func(platform string, snid int32) (int32, error) { cfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, svc.PlayerDBName) if !b { return 0, errors.New("not db") } playerDB := mongoClient.Database(cfg.Database) type M struct { PSnId int32 } res := &M{} c := playerDB.Collection(svc.PlayerCollName) err = c.FindOne(context.TODO(), bson.D{{"snid", snid}}).Decode(res) if err != nil && !errors.Is(err, newMongo.ErrNoDocuments) { return 0, err } return res.PSnId, nil } switch log.Tp { case common.InviteScoreTypeBind, common.InviteScoreTypeBindTel: // 更新绑定数量 // 更新邀请积分 // 1.邀请人增加积分 err = add(log.InviteSnId, log.SnId, 0, log.Tp, log.Score, log.Money, 10000, false) if err != nil { session.AbortTransaction(sc) return err } // 2.上级增加积分 var psnid int32 if len(log.Rate) > 0 { psnid, err = getPSnId(log.Platform, log.InviteSnId) if err != nil { session.AbortTransaction(sc) logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) return err } if psnid > 0 { err = add(psnid, log.InviteSnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], false) if err != nil { session.AbortTransaction(sc) return err } } } if len(log.Rate) > 1 && psnid > 0 { psnid, err = getPSnId(log.Platform, psnid) if err != nil { session.AbortTransaction(sc) logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) return err } if psnid > 0 { err = add(psnid, log.InviteSnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) if err != nil { session.AbortTransaction(sc) return err } } } if len(log.Rate) > 2 && psnid > 0 { psnid, err = getPSnId(log.Platform, psnid) if err != nil { session.AbortTransaction(sc) logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) return err } if psnid > 0 { err = add(psnid, log.InviteSnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) if err != nil { session.AbortTransaction(sc) return err } } } case common.InviteScoreTypePay: // 更新充值积分,上级积分增加 add(log.SnId, 0, 0, common.InviteScoreTypePayMe, log.Score, log.Money, 10000, false) var psnid int32 if len(log.Rate) > 0 { err = add(log.InviteSnId, log.SnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], true) if err != nil { session.AbortTransaction(sc) return err } psnid, err = getPSnId(log.Platform, log.InviteSnId) if err != nil { session.AbortTransaction(sc) logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) return err } } if len(log.Rate) > 1 && psnid > 0 { err = add(psnid, log.SnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) if err != nil { session.AbortTransaction(sc) return err } psnid, err = getPSnId(log.Platform, psnid) if err != nil { session.AbortTransaction(sc) logger.Logger.Errorf("EvtInvite GetPSnId 4 error:%v snid:%v", err, psnid) return err } } if len(log.Rate) > 2 && psnid > 0 { err = add(psnid, log.SnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) if err != nil { session.AbortTransaction(sc) return err } } if addRechargeScore { err = add(log.InviteSnId, log.SnId, 1, common.InviteScoreTypeRecharge, log.RechargeScore, log.Money, 10000, true) if err != nil { session.AbortTransaction(sc) return err } } case common.InviteScoreTypeRecharge: // 更新自己的积分 if addRechargeScore { err = add(log.InviteSnId, log.SnId, 1, log.Tp, log.RechargeScore, log.Money, 10000, true) } else { // 只增加充值金额 err = add(log.InviteSnId, log.SnId, 0, log.Tp, 0, log.Money, 10000, true) } if err != nil { session.AbortTransaction(sc) return err } case common.InviteScoreCheckWeek: default: logger.Logger.Errorf("EvtInvite tp error, %v", log.Tp) return err } // 提交事务 err = session.CommitTransaction(sc) if err != nil { logger.Logger.Errorf("EvtInvite CommitTransaction error:%v", err) return err } return nil }) if err != nil { logger.Logger.Errorf("EvtInvite WithSession error:%v", err) return err } // 通知变更 for _, v := range notifySnId { msg, err := get(v) if err != nil { logger.Logger.Errorf("EvtInvite add find error:%v psnid:%v score:%v rate:%v", err, v, log.Score, log.Rate) return err } mq.Send(model.EvtInviteAck, msg) } } return nil }, broker.Queue(model.EvtInvite), broker.DisableAutoAck(), rabbitmq.DurableQueue()) }