diff --git a/dbproxy/mongo/newmongo.go b/dbproxy/mongo/newmongo.go new file mode 100644 index 0000000..f5f7e8d --- /dev/null +++ b/dbproxy/mongo/newmongo.go @@ -0,0 +1,53 @@ +package mongo + +import ( + "context" + "errors" + "fmt" + "time" + + newMongo "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "mongo.games.com/goserver/core/logger" +) + +var globalMongoSession *newMongo.Client + +func mongoURI(user, password, host string, port int32, options string) string { + login := "" + if user != "" { + login = user + ":" + password + "@" + } + if host == "" { + host = "localhost" + } + if port == 0 { + port = 27017 + } + if options != "" { + options = "?" + options + } + url := fmt.Sprintf("mongodb://%s%s:%d/admin%s", login, host, port, options) + return url +} + +func NewMongoClient() (*newMongo.Client, error) { + if globalMongoSession != nil { + return globalMongoSession, nil + } + cfg, b := MgoSessionMgrSington.GetCfg(G_P, "user") + if !b { + return nil, errors.New("not db") + } + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + client, err := newMongo.Connect(ctx, options.Client().ApplyURI(mongoURI(cfg.Username, cfg.Password, cfg.HostName, cfg.HostPort, cfg.Options))) + if err != nil { + logger.Logger.Errorf("NewMongoClient error:%v", err) + return nil, err + } + if client == nil { + return nil, errors.New("not db client") + } + globalMongoSession = client + return client, nil +} diff --git a/dbproxy/mq/c_invite.go b/dbproxy/mq/c_invite.go index 90b1e64..f75f10c 100644 --- a/dbproxy/mq/c_invite.go +++ b/dbproxy/mq/c_invite.go @@ -1,15 +1,19 @@ package mq import ( + "context" "encoding/json" "errors" "time" + "go.mongodb.org/mongo-driver/bson" + newMongo "go.mongodb.org/mongo-driver/mongo" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" "mongo.games.com/game/common" + "mongo.games.com/game/dbproxy/mongo" "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" @@ -100,106 +104,206 @@ func init() { } } - // 增加积分 - add := func(psnid, snid, level, tp int32, score, money, rate int64, addMoney bool) error { - if psnid <= 0 { - return nil - } - err = svc.AddInviteScore(log.Platform, psnid, snid, level, tp, score*rate/10000, rate, money, now, addMoney) + mongoClient, err := mongo.NewMongoClient() + if err != nil { + logger.Logger.Errorf("EvtInvite NewMongoClient error:%v", err) + return err + } + session, err := mongoClient.StartSession() + if err != nil { + logger.Logger.Errorf("EvtInvite StartSession error:%v", err) + return err + } + defer session.EndSession(context.Background()) + // 事务不支持创建集合和索引,需要提前创建 + svc.InviteScoreCollection(log.Platform) + svc.RankInviteCollection(log.Platform) + + var notifySnId []int32 + // 修改积分 + err = newMongo.WithSession(context.Background(), session, func(sc newMongo.SessionContext) error { + err := session.StartTransaction() if err != nil { - logger.Logger.Errorf("EvtInvite add error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate) return err } - msg, err := get(psnid) + + // 增加积分 + add := func(psnid, snid, level, tp int32, score, money, rate int64, addMoney bool) error { + if psnid <= 0 { + return nil + } + notifySnId = append(notifySnId, psnid) + err = svc.AddInviteScore(mongoClient, sc, log.Platform, psnid, snid, level, tp, score*rate/10000, rate, money, now, addMoney) + if err != nil { + logger.Logger.Errorf("EvtInvite add error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate) + return err + } + return nil + } + + getPSnId := func(platform string, snid int32) (int32, error) { + cfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, svc.PlayerDBName) + if !b { + return 0, errors.New("not db") + } + playerDB := mongoClient.Database(cfg.Database) + type M struct { + PSnId int32 + } + res := &M{} + c := playerDB.Collection(svc.PlayerCollName) + err = c.FindOne(context.TODO(), bson.D{{"snid", snid}}).Decode(res) + if err != nil && !errors.Is(err, newMongo.ErrNoDocuments) { + return 0, err + } + return res.PSnId, nil + } + + switch log.Tp { + case common.InviteScoreTypeBind: + // 更新绑定数量 + // 更新邀请积分 + // 1.邀请人增加积分 + err = add(log.InviteSnId, log.SnId, 0, log.Tp, log.Score, log.Money, 10000, false) + if err != nil { + session.AbortTransaction(sc) + return err + } + // 2.上级增加积分 + var psnid int32 + if len(log.Rate) > 0 { + psnid, err = getPSnId(log.Platform, log.InviteSnId) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) + return err + } + if psnid > 0 { + err = add(psnid, log.InviteSnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + } + if len(log.Rate) > 1 && psnid > 0 { + psnid, err = getPSnId(log.Platform, psnid) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) + return err + } + if psnid > 0 { + err = add(psnid, log.InviteSnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + } + if len(log.Rate) > 2 && psnid > 0 { + psnid, err = getPSnId(log.Platform, psnid) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) + return err + } + if psnid > 0 { + err = add(psnid, log.InviteSnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + } + + case common.InviteScoreTypePay: + // 更新充值积分,上级积分增加 + add(log.SnId, 0, 0, common.InviteScoreTypePayMe, log.Score, log.Money, 10000, false) + var psnid int32 + if len(log.Rate) > 0 { + err = add(log.InviteSnId, log.SnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], true) + if err != nil { + session.AbortTransaction(sc) + return err + } + psnid, err = getPSnId(log.Platform, log.InviteSnId) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) + return err + } + } + if len(log.Rate) > 1 && psnid > 0 { + err = add(psnid, log.SnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + psnid, err = getPSnId(log.Platform, psnid) + if err != nil { + session.AbortTransaction(sc) + logger.Logger.Errorf("EvtInvite GetPSnId 4 error:%v snid:%v", err, psnid) + return err + } + } + if len(log.Rate) > 2 && psnid > 0 { + err = add(psnid, log.SnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + if addRechargeScore { + err = add(log.InviteSnId, log.SnId, 1, common.InviteScoreTypeRecharge, log.RechargeScore, log.Money, 10000, true) + if err != nil { + session.AbortTransaction(sc) + return err + } + } + + case common.InviteScoreTypeRecharge: + // 更新自己的积分 + if addRechargeScore { + err = add(log.InviteSnId, log.SnId, 1, log.Tp, log.RechargeScore, log.Money, 10000, true) + } else { + // 只增加充值金额 + err = add(log.InviteSnId, log.SnId, 0, log.Tp, 0, log.Money, 10000, true) + } + if err != nil { + session.AbortTransaction(sc) + return err + } + + case common.InviteScoreCheckWeek: + + default: + logger.Logger.Errorf("EvtInvite tp error, %v", log.Tp) + return err + } + + // 提交事务 + err = session.CommitTransaction(sc) if err != nil { - logger.Logger.Errorf("EvtInvite add find error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate) + logger.Logger.Errorf("EvtInvite CommitTransaction error:%v", err) + return err + } + return nil + }) + if err != nil { + logger.Logger.Errorf("EvtInvite WithSession error:%v", err) + return err + } + // 通知变更 + for _, v := range notifySnId { + msg, err := get(v) + if err != nil { + logger.Logger.Errorf("EvtInvite add find error:%v psnid:%v score:%v rate:%v", err, v, log.Score, log.Rate) return err } mq.Send(model.EvtInviteAck, msg) - return nil } - switch log.Tp { - case common.InviteScoreTypeBind: - // 更新绑定数量 - // 更新邀请积分 - // 1.邀请人增加积分 - add(log.InviteSnId, log.SnId, 0, log.Tp, log.Score, log.Money, 10000, false) - // 2.上级增加积分 - var psnid int32 - if len(log.Rate) > 0 { - psnid, err = svc.GetPSnId(log.Platform, log.InviteSnId) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) - return err - } - if psnid > 0 { - add(psnid, log.InviteSnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], false) - } - } - if len(log.Rate) > 1 && psnid > 0 { - psnid, err = svc.GetPSnId(log.Platform, psnid) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId) - return err - } - if psnid > 0 { - add(psnid, log.InviteSnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) - } - } - if len(log.Rate) > 2 && psnid > 0 { - psnid, err = svc.GetPSnId(log.Platform, psnid) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) - return err - } - if psnid > 0 { - add(psnid, log.InviteSnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) - } - } - - case common.InviteScoreTypePay: - // 更新充值积分,上级积分增加 - add(log.SnId, 0, 0, common.InviteScoreTypePayMe, log.Score, log.Money, 10000, false) - var psnid int32 - if len(log.Rate) > 0 { - add(log.InviteSnId, log.SnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], true) - psnid, err = svc.GetPSnId(log.Platform, log.InviteSnId) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId) - return err - } - } - if len(log.Rate) > 1 && psnid > 0 { - add(psnid, log.SnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false) - psnid, err = svc.GetPSnId(log.Platform, psnid) - if err != nil { - logger.Logger.Errorf("EvtInvite GetPSnId 4 error:%v snid:%v", err, psnid) - return err - } - } - if len(log.Rate) > 2 && psnid > 0 { - add(psnid, log.SnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false) - } - if addRechargeScore { - add(log.InviteSnId, log.SnId, 1, common.InviteScoreTypeRecharge, log.RechargeScore, log.Money, 10000, true) - } - - case common.InviteScoreTypeRecharge: - // 更新自己的积分 - if addRechargeScore { - add(log.InviteSnId, log.SnId, 1, log.Tp, log.RechargeScore, log.Money, 10000, true) - } else { - // 只增加充值金额 - add(log.InviteSnId, log.SnId, 0, log.Tp, 0, log.Money, 10000, true) - } - - case common.InviteScoreCheckWeek: - - default: - logger.Logger.Errorf("EvtInvite tp error, %v", log.Tp) - return err - } } return nil }, broker.Queue(model.EvtInvite), broker.DisableAutoAck(), rabbitmq.DurableQueue()) diff --git a/dbproxy/svc/u_player.go b/dbproxy/svc/u_player.go index c6ff636..2b3b7fb 100644 --- a/dbproxy/svc/u_player.go +++ b/dbproxy/svc/u_player.go @@ -11,12 +11,15 @@ import ( "strings" "time" + newMongo "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/task" "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" + newBson "go.mongodb.org/mongo-driver/bson" "mongo.games.com/game/common" "mongo.games.com/game/dbproxy/mongo" @@ -1568,6 +1571,11 @@ func BindInviteSnId(platform string, snId, inviteSnId int32, code string) error logger.Logger.Error("BindInviteSnId error ", err) return err } + err = c.Update(bson.M{"snid": inviteSnId}, bson.M{"$inc": bson.M{"inum": 1}}) + if err != nil { + logger.Logger.Error("BindInviteSnId error ", err) + return err + } return nil } @@ -1586,18 +1594,37 @@ func ClearInviteScore(platform string, snId int32, now time.Time) error { return nil } -func AddInviteScore(platform string, psnId, snid, level, tp int32, num, rate, money int64, now time.Time, addMoney bool) error { +func AddInviteScore(client *newMongo.Client, sc newMongo.SessionContext, platform string, psnId, snid, level, tp int32, num, rate, money int64, now time.Time, addMoney bool) error { logger.Logger.Tracef("AddInviteScore ==> platform %v, psnId %v, snid %v, level %v, tp %v, num %v, rate %v, money %v, now %v, addMoney %v", platform, psnId, snid, level, tp, num, rate, money, now, addMoney) - i := InviteScoreCollection(platform) - if i == nil { + + inviteCfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, InviteScoreDBName) + if !b { return InviteScoreColError } + inviteC := client.Database(inviteCfg.Database).Collection(InviteScoreCollName) + if inviteC == nil { + return InviteScoreColError + } + playerCfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, PlayerDBName) + if !b { + return PlayerColError + } + playerC := client.Database(playerCfg.Database).Collection(PlayerCollName) + if playerC == nil { + return PlayerColError + } + rankCfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, RankInviteDBName) + if !b { + return RankDataDBErr + } + rankC := client.Database(rankCfg.Database).Collection(RankInviteCollName) + if rankC == nil { + return RankDataDBErr + } - id := bson.NewObjectId() - err := i.Insert(&model.LogInviteScore{ - Id: id, - Platform: platform, + // 积分变更记录 + _, err := inviteC.InsertOne(sc, &model.NInviteScore{ UpSnid: psnId, DownSnid: snid, Level: level, @@ -1607,53 +1634,54 @@ func AddInviteScore(platform string, psnId, snid, level, tp int32, num, rate, mo Money: money, Ts: now.Unix(), }) - if err != nil { logger.Logger.Error("AddInviteScore LogInviteScore error ", err) return err } - c := PlayerDataCollection(platform) - if c == nil { - return PlayerColError - } - - myMoney := money - if !addMoney { - myMoney = 0 - } - - addNum := int64(0) - if level == 0 && tp == common.InviteScoreTypeBind { - addNum = 1 - } - - err = c.Update(bson.M{"snid": psnId}, bson.M{"$inc": bson.M{"iscore": num, "imoney": myMoney, "inum": addNum}, "$set": bson.M{"iscorets": now}}) - if err != nil { - i.RemoveId(id) - logger.Logger.Error("AddInviteScore error ", err) - return err - } - - // 更新排行榜 + // 玩家积分和绑定数量 type m struct { IScore int64 INum int64 } res := &m{} - err = c.Find(bson.M{"snid": psnId}).Select(bson.M{"iscore": 1, "inum": 1}).One(res) + err = playerC.FindOne(sc, newBson.M{"snid": psnId}).Decode(res) if err != nil { logger.Logger.Error("AddInviteScore find error ", err) return err } - SaveRankInvite(&model.RankInvite{ + // 修改玩家积分 + myMoney := money + if !addMoney { + myMoney = 0 + } + + //addNum := int64(0) + //if level == 0 && tp == common.InviteScoreTypeBind { + // addNum = 1 + //} + + _, err = playerC.UpdateOne(sc, newBson.M{"snid": psnId}, newBson.M{"$inc": bson.M{"iscore": num, "imoney": myMoney}, "$set": bson.M{"iscorets": now}}) + if err != nil { + logger.Logger.Error("AddInviteScore error ", err) + return err + } + + // 更新排行榜 + data := model.RankInvite{ Platform: platform, SnId: psnId, Num: res.INum, - Score: res.IScore, + Score: res.IScore + num, Ts: now.Unix(), - }) + Week: common.GetWeekStartTs(now.Unix()), + } + _, err = rankC.UpdateOne(sc, newBson.M{"snid": psnId, "week": data.Week}, newBson.M{"$set": data}, options.Update().SetUpsert(true)) + if err != nil { + logger.Logger.Tracef("SaveRankInvite error:%v", err) + return err + } return nil } diff --git a/model/invitecode.go b/model/invitecode.go index 0a7951d..381ba49 100644 --- a/model/invitecode.go +++ b/model/invitecode.go @@ -131,3 +131,14 @@ type LogInviteScore struct { Money int64 // 充值金额 Ts int64 // 时间戳 } + +type NInviteScore struct { + UpSnid int32 // 上级代理 + DownSnid int32 // 下级代理 + Level int32 // 代理层级 例如 1:DownSnid 是 UpSnid 的 1 级代理; 2: DownSnid 是 UpSnid 的 2 级代理 + Tp int32 // 返佣类型 + Rate int64 // 返佣比例 + Score int64 // 积分 + Money int64 // 充值金额 + Ts int64 // 时间戳 +} diff --git a/worldsrv/action_welfare.go b/worldsrv/action_welfare.go index b4516f7..048280a 100644 --- a/worldsrv/action_welfare.go +++ b/worldsrv/action_welfare.go @@ -440,7 +440,6 @@ func CSBindInvite(s *netlib.Session, packetid int, data interface{}, sid int64) break } } - if err == nil { err = model.BindInviteSnId(p.Platform, p.SnId, inviteSnId, msg.Code) }