From 303f0817ca8c653416c0c34d2d41250a03d60976 Mon Sep 17 00:00:00 2001 From: sk <123456@qq.com> Date: Fri, 28 Jun 2024 15:07:57 +0800 Subject: [PATCH] =?UTF-8?q?mongodb=E4=BA=8B=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dbproxy/mongo/newmongo.go | 53 ++++++ dbproxy/mq/c_invite.go | 308 ++++++++++++++++++++++------------- dbproxy/svc/l_invitescore.go | 14 +- dbproxy/svc/u_player.go | 104 ++++++++---- model/invitecode.go | 11 ++ model/player.go | 28 ++++ worldsrv/action_welfare.go | 27 +-- 7 files changed, 382 insertions(+), 163 deletions(-) create mode 100644 dbproxy/mongo/newmongo.go diff --git a/dbproxy/mongo/newmongo.go b/dbproxy/mongo/newmongo.go new file mode 100644 index 0000000..f5f7e8d --- /dev/null +++ b/dbproxy/mongo/newmongo.go @@ -0,0 +1,53 @@ +package mongo + +import ( + "context" + "errors" + "fmt" + "time" + + newMongo "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "mongo.games.com/goserver/core/logger" +) + +var globalMongoSession *newMongo.Client + +func mongoURI(user, password, host string, port int32, options string) string { + login := "" + if user != "" { + login = user + ":" + password + "@" + } + if host == "" { + host = "localhost" + } + if port == 0 { + port = 27017 + } + if options != "" { + options = "?" + options + } + url := fmt.Sprintf("mongodb://%s%s:%d/admin%s", login, host, port, options) + return url +} + +func NewMongoClient() (*newMongo.Client, error) { + if globalMongoSession != nil { + return globalMongoSession, nil + } + cfg, b := MgoSessionMgrSington.GetCfg(G_P, "user") + if !b { + return nil, errors.New("not db") + } + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + client, err := newMongo.Connect(ctx, options.Client().ApplyURI(mongoURI(cfg.Username, cfg.Password, cfg.HostName, cfg.HostPort, cfg.Options))) + if err != nil { + logger.Logger.Errorf("NewMongoClient error:%v", err) + return nil, err + } + if client == nil { + return nil, errors.New("not db client") + } + globalMongoSession = client + return client, nil +} diff --git a/dbproxy/mq/c_invite.go b/dbproxy/mq/c_invite.go index 2c84406..3f4b0ea 100644 --- a/dbproxy/mq/c_invite.go +++ b/dbproxy/mq/c_invite.go @@ -1,15 +1,19 @@ package mq import ( + "context" "encoding/json" "errors" "time" + "go.mongodb.org/mongo-driver/bson" + newMongo "go.mongodb.org/mongo-driver/mongo" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" "mongo.games.com/game/common" + "mongo.games.com/game/dbproxy/mongo" "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" @@ -44,25 +48,6 @@ func init() { return n, err } - // 增加积分 - add := func(psnid, snid, level, tp int32, score, money, rate int64, addMoney bool) error { - if psnid <= 0 { - return nil - } - err = svc.AddInviteScore(log.Platform, psnid, snid, level, tp, score*rate/10000, rate, money, now, addMoney) - if err != nil { - logger.Logger.Errorf("EvtInvite add error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate) - return err - } - msg, err := get(psnid) - if err != nil { - logger.Logger.Errorf("EvtInvite add find error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate) - return err - } - mq.Send(model.EvtInviteAck, msg) - return nil - } - // 重置积分 reset := func(snid int32) error { if snid == 0 { @@ -119,98 +104,203 @@ func init() { } } - switch log.Tp { - case common.InviteScoreTypeBind: - code, err := svc.GetCodeBySnId(log.Platform, log.InviteSnId) - if err != nil { - logger.Logger.Errorf("EvtInvite GetCodeBySnId error:%v %v", err, log.InviteSnId) - return err - } - // 绑定关系 - err = svc.BindInviteSnId(log.Platform, log.SnId, log.InviteSnId, code) - if err != nil { - logger.Logger.Errorf("EvtInvite BindInviteSnId error:%v msg:%+v", err, log.InviteScore) - return err - } - // 更新绑定数量 - // 更新邀请积分 - // 1.邀请人增加积分 - add(log.InviteSnId, log.SnId, 0, log.Tp, log.Score, log.Money, 10000, false) - // 2.上级增加积分 - var psnid int32 - if len(log.Rate) > 0 { - psnid, err = svc.GetPSnId(log.Platform, log.InviteSnId) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) - return err - } - if psnid > 0 { - add(psnid, log.InviteSnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], false) - } - } - if len(log.Rate) > 1 && psnid > 0 { - psnid, err = svc.GetPSnId(log.Platform, psnid) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) - return err - } - if psnid > 0 { - add(psnid, log.InviteSnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) - } - } - if len(log.Rate) > 2 && psnid > 0 { - psnid, err = svc.GetPSnId(log.Platform, psnid) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) - return err - } - if psnid > 0 { - add(psnid, log.InviteSnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) - } - } - - case common.InviteScoreTypePay: - // 更新充值积分,上级积分增加 - add(log.SnId, 0, 0, common.InviteScoreTypePayMe, log.Score, log.Money, 10000, false) - var psnid int32 - if len(log.Rate) > 0 { - add(log.InviteSnId, log.SnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], true) - psnid, err = svc.GetPSnId(log.Platform, log.InviteSnId) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) - return err - } - } - if len(log.Rate) > 1 && psnid > 0 { - add(psnid, log.SnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) - psnid, err = svc.GetPSnId(log.Platform, psnid) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 4 error:%v snid:%v", err, psnid) - return err - } - } - if len(log.Rate) > 2 && psnid > 0 { - add(psnid, log.SnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) - } - if addRechargeScore { - add(log.InviteSnId, log.SnId, 1, common.InviteScoreTypeRecharge, log.RechargeScore, log.Money, 10000, true) - } - - case common.InviteScoreTypeRecharge: - // 更新自己的积分 - if addRechargeScore { - add(log.InviteSnId, log.SnId, 1, log.Tp, log.RechargeScore, log.Money, 10000, true) - } else { - // 只增加充值金额 - add(log.InviteSnId, log.SnId, 0, log.Tp, 0, log.Money, 10000, true) - } - - case common.InviteScoreCheckWeek: - - default: - logger.Logger.Errorf("EvtInvite tp error, %v", log.Tp) + mongoClient, err := mongo.NewMongoClient() + if err != nil { + logger.Logger.Errorf("EvtInvite NewMongoClient error:%v", err) return err } + session, err := mongoClient.StartSession() + if err != nil { + logger.Logger.Errorf("EvtInvite StartSession error:%v", err) + return err + } + defer session.EndSession(context.Background()) + + var notifySnId []int32 + // 修改积分 + err = newMongo.WithSession(context.Background(), session, func(sc newMongo.SessionContext) error { + err := session.StartTransaction() + if err != nil { + return err + } + + // 增加积分 + add := func(psnid, snid, level, tp int32, score, money, rate int64, addMoney bool) error { + if psnid <= 0 { + return nil + } + notifySnId = append(notifySnId, psnid) + err = svc.AddInviteScore(mongoClient, sc, log.Platform, psnid, snid, level, tp, score*rate/10000, rate, money, now, addMoney) + if err != nil { + logger.Logger.Errorf("EvtInvite add error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate) + return err + } + return nil + } + + getPSnId := func(platform string, snid int32) (int32, error) { + cfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, svc.PlayerDBName) + if !b { + return 0, errors.New("not db") + } + playerDB := mongoClient.Database(cfg.Database) + type M struct { + PSnId int32 + } + res := &M{} + c := playerDB.Collection(svc.PlayerCollName) + err = c.FindOne(context.TODO(), bson.D{{"snid", snid}}).Decode(res) + if err != nil && !errors.Is(err, newMongo.ErrNoDocuments) { + return 0, err + } + return res.PSnId, nil + } + + switch log.Tp { + case common.InviteScoreTypeBind: + // 更新绑定数量 + // 更新邀请积分 + // 1.邀请人增加积分 + err = add(log.InviteSnId, log.SnId, 0, log.Tp, log.Score, log.Money, 10000, false) + if err != nil { + session.AbortTransaction(sc) + return err + } + // 2.上级增加积分 + var psnid int32 + if len(log.Rate) > 0 { + psnid, err = getPSnId(log.Platform, log.InviteSnId) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) + return err + } + if psnid > 0 { + err = add(psnid, log.InviteSnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + } + if len(log.Rate) > 1 && psnid > 0 { + psnid, err = getPSnId(log.Platform, psnid) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) + return err + } + if psnid > 0 { + err = add(psnid, log.InviteSnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + } + if len(log.Rate) > 2 && psnid > 0 { + psnid, err = getPSnId(log.Platform, psnid) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) + return err + } + if psnid > 0 { + err = add(psnid, log.InviteSnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + } + + case common.InviteScoreTypePay: + // 更新充值积分,上级积分增加 + add(log.SnId, 0, 0, common.InviteScoreTypePayMe, log.Score, log.Money, 10000, false) + var psnid int32 + if len(log.Rate) > 0 { + err = add(log.InviteSnId, log.SnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], true) + if err != nil { + session.AbortTransaction(sc) + return err + } + psnid, err = getPSnId(log.Platform, log.InviteSnId) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) + return err + } + } + if len(log.Rate) > 1 && psnid > 0 { + err = add(psnid, log.SnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + psnid, err = getPSnId(log.Platform, psnid) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 4 error:%v snid:%v", err, psnid) + return err + } + } + if len(log.Rate) > 2 && psnid > 0 { + err = add(psnid, log.SnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + if addRechargeScore { + err = add(log.InviteSnId, log.SnId, 1, common.InviteScoreTypeRecharge, log.RechargeScore, log.Money, 10000, true) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + + case common.InviteScoreTypeRecharge: + // 更新自己的积分 + if addRechargeScore { + err = add(log.InviteSnId, log.SnId, 1, log.Tp, log.RechargeScore, log.Money, 10000, true) + } else { + // 只增加充值金额 + err = add(log.InviteSnId, log.SnId, 0, log.Tp, 0, log.Money, 10000, true) + } + if err != nil { + session.AbortTransaction(sc) + return err + } + + case common.InviteScoreCheckWeek: + + default: + logger.Logger.Errorf("EvtInvite tp error, %v", log.Tp) + return err + } + + // 提交事务 + err = session.CommitTransaction(sc) + if err != nil { + logger.Logger.Errorf("EvtInvite CommitTransaction error:%v", err) + return err + } + return nil + }) + if err != nil { + logger.Logger.Errorf("EvtInvite WithSession error:%v", err) + return err + } + // 通知变更 + for _, v := range notifySnId { + msg, err := get(v) + if err != nil { + logger.Logger.Errorf("EvtInvite add find error:%v psnid:%v score:%v rate:%v", err, v, log.Score, log.Rate) + return err + } + mq.Send(model.EvtInviteAck, msg) + } + } return nil }, broker.Queue(model.EvtInvite), broker.DisableAutoAck(), rabbitmq.DurableQueue()) diff --git a/dbproxy/svc/l_invitescore.go b/dbproxy/svc/l_invitescore.go index 7b29c11..bd4dc30 100644 --- a/dbproxy/svc/l_invitescore.go +++ b/dbproxy/svc/l_invitescore.go @@ -56,13 +56,13 @@ func CheckInviteScore(req *model.InviteScore) (b bool, err error) { } // 不能重复绑定 - if req.Tp == common.InviteScoreTypeBind { - psnid, err := GetPSnId(req.Platform, req.SnId) - if err == nil && psnid > 0 { - // 已经绑定 - return false, errors.New("already bind") - } - } + //if req.Tp == common.InviteScoreTypeBind { + // psnid, err := GetPSnId(req.Platform, req.SnId) + // if err == nil && psnid > 0 { + // // 已经绑定 + // return false, errors.New("already bind") + // } + //} // 必须已经绑定 if req.Tp != common.InviteScoreTypeBind { diff --git a/dbproxy/svc/u_player.go b/dbproxy/svc/u_player.go index 92a5804..7cbbc64 100644 --- a/dbproxy/svc/u_player.go +++ b/dbproxy/svc/u_player.go @@ -11,12 +11,15 @@ import ( "strings" "time" + newMongo "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/task" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" + newBson "go.mongodb.org/mongo-driver/bson" "mongo.games.com/game/common" "mongo.games.com/game/dbproxy/mongo" @@ -1548,6 +1551,15 @@ func (svc *PlayerDataSvc) GetPlayerInviteSnid(req *model.PlayerIsExistBySnIdArgs return nil } +func (svc *PlayerDataSvc) BindInviteSnId(args *model.BindArgs, ret *bool) error { + err := BindInviteSnId(args.Platform, args.SnId, args.PSnId, args.PCode) + if err != nil { + return err + } + *ret = true + return nil +} + func BindInviteSnId(platform string, snId, inviteSnId int32, code string) error { c := PlayerDataCollection(platform) if c == nil { @@ -1559,11 +1571,6 @@ func BindInviteSnId(platform string, snId, inviteSnId int32, code string) error logger.Logger.Error("BindInviteSnId error ", err) return err } - err = c.Update(bson.M{"snid": inviteSnId}, bson.M{"$inc": bson.D{{"inum", 1}}}) - if err != nil { - logger.Logger.Error("BindInviteSnId inc error ", err) - return err - } return nil } @@ -1582,18 +1589,37 @@ func ClearInviteScore(platform string, snId int32, now time.Time) error { return nil } -func AddInviteScore(platform string, psnId, snid, level, tp int32, num, rate, money int64, now time.Time, addMoney bool) error { +func AddInviteScore(client *newMongo.Client, sc newMongo.SessionContext, platform string, psnId, snid, level, tp int32, num, rate, money int64, now time.Time, addMoney bool) error { logger.Logger.Tracef("AddInviteScore ==> platform %v, psnId %v, snid %v, level %v, tp %v, num %v, rate %v, money %v, now %v, addMoney %v", platform, psnId, snid, level, tp, num, rate, money, now, addMoney) - i := InviteScoreCollection(platform) - if i == nil { + + inviteCfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, InviteScoreDBName) + if !b { return InviteScoreColError } + inviteC := client.Database(inviteCfg.Database).Collection(InviteScoreCollName) + if inviteC == nil { + return InviteScoreColError + } + playerCfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, PlayerDBName) + if !b { + return PlayerColError + } + playerC := client.Database(playerCfg.Database).Collection(PlayerCollName) + if playerC == nil { + return PlayerColError + } + rankCfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, RankInviteDBName) + if !b { + return RankDataDBErr + } + rankC := client.Database(rankCfg.Database).Collection(RankInviteCollName) + if rankC == nil { + return RankDataDBErr + } - id := bson.NewObjectId() - err := i.Insert(&model.LogInviteScore{ - Id: id, - Platform: platform, + // 积分变更记录 + _, err := inviteC.InsertOne(sc, &model.NInviteScore{ UpSnid: psnId, DownSnid: snid, Level: level, @@ -1603,48 +1629,54 @@ func AddInviteScore(platform string, psnId, snid, level, tp int32, num, rate, mo Money: money, Ts: now.Unix(), }) - if err != nil { logger.Logger.Error("AddInviteScore LogInviteScore error ", err) return err } - c := PlayerDataCollection(platform) - if c == nil { - return PlayerColError - } - - myMoney := money - if !addMoney { - myMoney = 0 - } - - err = c.Update(bson.M{"snid": psnId}, bson.M{"$inc": bson.M{"iscore": num, "imoney": myMoney}, "$set": bson.M{"iscorets": now}}) - if err != nil { - i.RemoveId(id) - logger.Logger.Error("AddInviteScore error ", err) - return err - } - - // 更新排行榜 + // 玩家积分和绑定数量 type m struct { IScore int64 INum int64 } res := &m{} - err = c.Find(bson.M{"snid": psnId}).Select(bson.M{"iscore": 1, "inum": 1}).One(res) + err = playerC.FindOne(sc, newBson.M{"snid": psnId}).Decode(res) if err != nil { logger.Logger.Error("AddInviteScore find error ", err) return err } - SaveRankInvite(&model.RankInvite{ + // 修改玩家积分 + myMoney := money + if !addMoney { + myMoney = 0 + } + + addNum := int64(0) + if level == 0 && tp == common.InviteScoreTypeBind { + addNum = 1 + } + + _, err = playerC.UpdateOne(sc, newBson.M{"snid": psnId}, newBson.M{"$inc": bson.M{"iscore": num, "imoney": myMoney, "inum": addNum}, "$set": bson.M{"iscorets": now}}) + if err != nil { + logger.Logger.Error("AddInviteScore error ", err) + return err + } + + // 更新排行榜 + data := &model.RankInvite{ Platform: platform, SnId: psnId, - Num: res.INum, - Score: res.IScore, + Num: res.INum + addNum, + Score: res.IScore + num, Ts: now.Unix(), - }) + Week: common.GetWeekStartTs(now.Unix()), + } + _, err = rankC.UpdateOne(sc, newBson.M{"snid": psnId, "week": data.Week}, data, options.Update().SetUpsert(true)) + if err != nil { + logger.Logger.Tracef("SaveRankInvite error:%v", err) + return err + } return nil } diff --git a/model/invitecode.go b/model/invitecode.go index 0a7951d..381ba49 100644 --- a/model/invitecode.go +++ b/model/invitecode.go @@ -131,3 +131,14 @@ type LogInviteScore struct { Money int64 // 充值金额 Ts int64 // 时间戳 } + +type NInviteScore struct { + UpSnid int32 // 上级代理 + DownSnid int32 // 下级代理 + Level int32 // 代理层级 例如 1:DownSnid 是 UpSnid 的 1 级代理; 2: DownSnid 是 UpSnid 的 2 级代理 + Tp int32 // 返佣类型 + Rate int64 // 返佣比例 + Score int64 // 积分 + Money int64 // 充值金额 + Ts int64 // 时间戳 +} diff --git a/model/player.go b/model/player.go index 8b6d39c..b8636d3 100644 --- a/model/player.go +++ b/model/player.go @@ -2941,3 +2941,31 @@ func (this *PlayerData) GetRoleId() int32 { } return common.DefaultRoleId } + +type BindArgs struct { + Platform string + PSnId, SnId int32 + PCode string +} + +func BindInviteSnId(platform string, snId, pSnId int32, code string) error { + if rpcCli == nil { + return fmt.Errorf("db may be close") + } + var args = &BindArgs{ + Platform: platform, + PSnId: pSnId, + SnId: snId, + PCode: code, + } + var ret bool + err := rpcCli.CallWithTimeout("PlayerDataSvc.BindInviteSnId", args, &ret, time.Second*30) + if err != nil { + logger.Logger.Warnf("BindInviteSnId error:%v", err) + return err + } + if ret { + return nil + } + return errors.New("bind error") +} diff --git a/worldsrv/action_welfare.go b/worldsrv/action_welfare.go index 80a7c1b..36120da 100644 --- a/worldsrv/action_welfare.go +++ b/worldsrv/action_welfare.go @@ -403,7 +403,7 @@ func CSBindInvite(s *netlib.Session, packetid int, data interface{}, sid int64) } } - if p.PSnId != 0 { + if p.PSnId != 0 || p.PCode != "" { ret.OpRetCode = welfare.OpResultCode_OPRC_AlreadyBind send() return nil @@ -440,15 +440,20 @@ func CSBindInvite(s *netlib.Session, packetid int, data interface{}, sid int64) break } } - SaveInviteScore(&model.InviteScore{ - Platform: p.Platform, - SnId: p.SnId, - InviteSnId: inviteSnId, - Tp: common.InviteScoreTypeBind, - Score: cfg.GetBindScore(), - Ts: now.Unix(), - Money: 0, - }) + if err == nil { + err = model.BindInviteSnId(p.Platform, p.SnId, inviteSnId, msg.Code) + } + if err == nil { + SaveInviteScore(&model.InviteScore{ + Platform: p.Platform, + SnId: p.SnId, + InviteSnId: inviteSnId, + Tp: common.InviteScoreTypeBind, + Score: cfg.GetBindScore(), + Ts: now.Unix(), + Money: 0, + }) + } return nil }), task.CompleteNotifyWrapper(func(i interface{}, t task.Task) { if err != nil { @@ -459,7 +464,7 @@ func CSBindInvite(s *netlib.Session, packetid int, data interface{}, sid int64) p.PCode = msg.GetCode() ret.OpRetCode = welfare.OpResultCode_OPRC_Sucess send() - })).Start() + })).StartByFixExecutor("invite_score") })).Start() return nil }