Compare commits

...

4 Commits

Author SHA1 Message Date
sk 1fcb3031ed 解决冲突 2024-06-28 18:57:13 +08:00
sk 446e696d7e 新邀请活动 2024-06-28 18:52:15 +08:00
sk f1ebe57d04 新邀请活动 2024-06-28 18:37:07 +08:00
sk 303f0817ca mongodb事务 2024-06-28 15:07:57 +08:00
5 changed files with 322 additions and 127 deletions

53
dbproxy/mongo/newmongo.go Normal file
View File

@ -0,0 +1,53 @@
package mongo
import (
"context"
"errors"
"fmt"
"time"
newMongo "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"mongo.games.com/goserver/core/logger"
)
var globalMongoSession *newMongo.Client
func mongoURI(user, password, host string, port int32, options string) string {
login := ""
if user != "" {
login = user + ":" + password + "@"
}
if host == "" {
host = "localhost"
}
if port == 0 {
port = 27017
}
if options != "" {
options = "?" + options
}
url := fmt.Sprintf("mongodb://%s%s:%d/admin%s", login, host, port, options)
return url
}
func NewMongoClient() (*newMongo.Client, error) {
if globalMongoSession != nil {
return globalMongoSession, nil
}
cfg, b := MgoSessionMgrSington.GetCfg(G_P, "user")
if !b {
return nil, errors.New("not db")
}
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
client, err := newMongo.Connect(ctx, options.Client().ApplyURI(mongoURI(cfg.Username, cfg.Password, cfg.HostName, cfg.HostPort, cfg.Options)))
if err != nil {
logger.Logger.Errorf("NewMongoClient error:%v", err)
return nil, err
}
if client == nil {
return nil, errors.New("not db client")
}
globalMongoSession = client
return client, nil
}

View File

@ -1,15 +1,19 @@
package mq
import (
"context"
"encoding/json"
"errors"
"time"
"go.mongodb.org/mongo-driver/bson"
newMongo "go.mongodb.org/mongo-driver/mongo"
"mongo.games.com/goserver/core/broker"
"mongo.games.com/goserver/core/broker/rabbitmq"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/common"
"mongo.games.com/game/dbproxy/mongo"
"mongo.games.com/game/dbproxy/svc"
"mongo.games.com/game/model"
"mongo.games.com/game/mq"
@ -100,106 +104,206 @@ func init() {
}
}
// 增加积分
add := func(psnid, snid, level, tp int32, score, money, rate int64, addMoney bool) error {
if psnid <= 0 {
return nil
}
err = svc.AddInviteScore(log.Platform, psnid, snid, level, tp, score*rate/10000, rate, money, now, addMoney)
mongoClient, err := mongo.NewMongoClient()
if err != nil {
logger.Logger.Errorf("EvtInvite NewMongoClient error:%v", err)
return err
}
session, err := mongoClient.StartSession()
if err != nil {
logger.Logger.Errorf("EvtInvite StartSession error:%v", err)
return err
}
defer session.EndSession(context.Background())
// 事务不支持创建集合和索引,需要提前创建
svc.InviteScoreCollection(log.Platform)
svc.RankInviteCollection(log.Platform)
var notifySnId []int32
// 修改积分
err = newMongo.WithSession(context.Background(), session, func(sc newMongo.SessionContext) error {
err := session.StartTransaction()
if err != nil {
logger.Logger.Errorf("EvtInvite add error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate)
return err
}
msg, err := get(psnid)
// 增加积分
add := func(psnid, snid, level, tp int32, score, money, rate int64, addMoney bool) error {
if psnid <= 0 {
return nil
}
notifySnId = append(notifySnId, psnid)
err = svc.AddInviteScore(mongoClient, sc, log.Platform, psnid, snid, level, tp, score*rate/10000, rate, money, now, addMoney)
if err != nil {
logger.Logger.Errorf("EvtInvite add error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate)
return err
}
return nil
}
getPSnId := func(platform string, snid int32) (int32, error) {
cfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, svc.PlayerDBName)
if !b {
return 0, errors.New("not db")
}
playerDB := mongoClient.Database(cfg.Database)
type M struct {
PSnId int32
}
res := &M{}
c := playerDB.Collection(svc.PlayerCollName)
err = c.FindOne(context.TODO(), bson.D{{"snid", snid}}).Decode(res)
if err != nil && !errors.Is(err, newMongo.ErrNoDocuments) {
return 0, err
}
return res.PSnId, nil
}
switch log.Tp {
case common.InviteScoreTypeBind:
// 更新绑定数量
// 更新邀请积分
// 1.邀请人增加积分
err = add(log.InviteSnId, log.SnId, 0, log.Tp, log.Score, log.Money, 10000, false)
if err != nil {
session.AbortTransaction(sc)
return err
}
// 2.上级增加积分
var psnid int32
if len(log.Rate) > 0 {
psnid, err = getPSnId(log.Platform, log.InviteSnId)
if err != nil {
session.AbortTransaction(sc)
logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId)
return err
}
if psnid > 0 {
err = add(psnid, log.InviteSnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], false)
if err != nil {
session.AbortTransaction(sc)
return err
}
}
}
if len(log.Rate) > 1 && psnid > 0 {
psnid, err = getPSnId(log.Platform, psnid)
if err != nil {
session.AbortTransaction(sc)
logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId)
return err
}
if psnid > 0 {
err = add(psnid, log.InviteSnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false)
if err != nil {
session.AbortTransaction(sc)
return err
}
}
}
if len(log.Rate) > 2 && psnid > 0 {
psnid, err = getPSnId(log.Platform, psnid)
if err != nil {
session.AbortTransaction(sc)
logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId)
return err
}
if psnid > 0 {
err = add(psnid, log.InviteSnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false)
if err != nil {
session.AbortTransaction(sc)
return err
}
}
}
case common.InviteScoreTypePay:
// 更新充值积分,上级积分增加
add(log.SnId, 0, 0, common.InviteScoreTypePayMe, log.Score, log.Money, 10000, false)
var psnid int32
if len(log.Rate) > 0 {
err = add(log.InviteSnId, log.SnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], true)
if err != nil {
session.AbortTransaction(sc)
return err
}
psnid, err = getPSnId(log.Platform, log.InviteSnId)
if err != nil {
session.AbortTransaction(sc)
logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId)
return err
}
}
if len(log.Rate) > 1 && psnid > 0 {
err = add(psnid, log.SnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false)
if err != nil {
session.AbortTransaction(sc)
return err
}
psnid, err = getPSnId(log.Platform, psnid)
if err != nil {
session.AbortTransaction(sc)
logger.Logger.Errorf("EvtInvite GetPSnId 4 error:%v snid:%v", err, psnid)
return err
}
}
if len(log.Rate) > 2 && psnid > 0 {
err = add(psnid, log.SnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false)
if err != nil {
session.AbortTransaction(sc)
return err
}
}
if addRechargeScore {
err = add(log.InviteSnId, log.SnId, 1, common.InviteScoreTypeRecharge, log.RechargeScore, log.Money, 10000, true)
if err != nil {
session.AbortTransaction(sc)
return err
}
}
case common.InviteScoreTypeRecharge:
// 更新自己的积分
if addRechargeScore {
err = add(log.InviteSnId, log.SnId, 1, log.Tp, log.RechargeScore, log.Money, 10000, true)
} else {
// 只增加充值金额
err = add(log.InviteSnId, log.SnId, 0, log.Tp, 0, log.Money, 10000, true)
}
if err != nil {
session.AbortTransaction(sc)
return err
}
case common.InviteScoreCheckWeek:
default:
logger.Logger.Errorf("EvtInvite tp error, %v", log.Tp)
return err
}
// 提交事务
err = session.CommitTransaction(sc)
if err != nil {
logger.Logger.Errorf("EvtInvite add find error:%v psnid:%v score:%v rate:%v", err, psnid, score, rate)
logger.Logger.Errorf("EvtInvite CommitTransaction error:%v", err)
return err
}
return nil
})
if err != nil {
logger.Logger.Errorf("EvtInvite WithSession error:%v", err)
return err
}
// 通知变更
for _, v := range notifySnId {
msg, err := get(v)
if err != nil {
logger.Logger.Errorf("EvtInvite add find error:%v psnid:%v score:%v rate:%v", err, v, log.Score, log.Rate)
return err
}
mq.Send(model.EvtInviteAck, msg)
return nil
}
switch log.Tp {
case common.InviteScoreTypeBind:
// 更新绑定数量
// 更新邀请积分
// 1.邀请人增加积分
add(log.InviteSnId, log.SnId, 0, log.Tp, log.Score, log.Money, 10000, false)
// 2.上级增加积分
var psnid int32
if len(log.Rate) > 0 {
psnid, err = svc.GetPSnId(log.Platform, log.InviteSnId)
if err != nil {
logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId)
return err
}
if psnid > 0 {
add(psnid, log.InviteSnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], false)
}
}
if len(log.Rate) > 1 && psnid > 0 {
psnid, err = svc.GetPSnId(log.Platform, psnid)
if err != nil {
logger.Logger.Errorf("EvtInvite GetPSnId 2 error:%v snid:%v", err, log.InviteSnId)
return err
}
if psnid > 0 {
add(psnid, log.InviteSnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false)
}
}
if len(log.Rate) > 2 && psnid > 0 {
psnid, err = svc.GetPSnId(log.Platform, psnid)
if err != nil {
logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId)
return err
}
if psnid > 0 {
add(psnid, log.InviteSnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false)
}
}
case common.InviteScoreTypePay:
// 更新充值积分,上级积分增加
add(log.SnId, 0, 0, common.InviteScoreTypePayMe, log.Score, log.Money, 10000, false)
var psnid int32
if len(log.Rate) > 0 {
add(log.InviteSnId, log.SnId, 1, log.Tp, log.Score, log.Money, log.Rate[0], true)
psnid, err = svc.GetPSnId(log.Platform, log.InviteSnId)
if err != nil {
logger.Logger.Errorf("EvtInvite GetPSnId 3 error:%v snid:%v", err, log.InviteSnId)
return err
}
}
if len(log.Rate) > 1 && psnid > 0 {
add(psnid, log.SnId, 2, log.Tp, log.Score, log.Money, log.Rate[1], false)
psnid, err = svc.GetPSnId(log.Platform, psnid)
if err != nil {
logger.Logger.Errorf("EvtInvite GetPSnId 4 error:%v snid:%v", err, psnid)
return err
}
}
if len(log.Rate) > 2 && psnid > 0 {
add(psnid, log.SnId, 3, log.Tp, log.Score, log.Money, log.Rate[2], false)
}
if addRechargeScore {
add(log.InviteSnId, log.SnId, 1, common.InviteScoreTypeRecharge, log.RechargeScore, log.Money, 10000, true)
}
case common.InviteScoreTypeRecharge:
// 更新自己的积分
if addRechargeScore {
add(log.InviteSnId, log.SnId, 1, log.Tp, log.RechargeScore, log.Money, 10000, true)
} else {
// 只增加充值金额
add(log.InviteSnId, log.SnId, 0, log.Tp, 0, log.Money, 10000, true)
}
case common.InviteScoreCheckWeek:
default:
logger.Logger.Errorf("EvtInvite tp error, %v", log.Tp)
return err
}
}
return nil
}, broker.Queue(model.EvtInvite), broker.DisableAutoAck(), rabbitmq.DurableQueue())

View File

@ -11,12 +11,15 @@ import (
"strings"
"time"
newMongo "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/task"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
newBson "go.mongodb.org/mongo-driver/bson"
"mongo.games.com/game/common"
"mongo.games.com/game/dbproxy/mongo"
@ -1568,6 +1571,11 @@ func BindInviteSnId(platform string, snId, inviteSnId int32, code string) error
logger.Logger.Error("BindInviteSnId error ", err)
return err
}
err = c.Update(bson.M{"snid": inviteSnId}, bson.M{"$inc": bson.M{"inum": 1}})
if err != nil {
logger.Logger.Error("BindInviteSnId error ", err)
return err
}
return nil
}
@ -1586,18 +1594,37 @@ func ClearInviteScore(platform string, snId int32, now time.Time) error {
return nil
}
func AddInviteScore(platform string, psnId, snid, level, tp int32, num, rate, money int64, now time.Time, addMoney bool) error {
func AddInviteScore(client *newMongo.Client, sc newMongo.SessionContext, platform string, psnId, snid, level, tp int32, num, rate, money int64, now time.Time, addMoney bool) error {
logger.Logger.Tracef("AddInviteScore ==> platform %v, psnId %v, snid %v, level %v, tp %v, num %v, rate %v, money %v, now %v, addMoney %v",
platform, psnId, snid, level, tp, num, rate, money, now, addMoney)
i := InviteScoreCollection(platform)
if i == nil {
inviteCfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, InviteScoreDBName)
if !b {
return InviteScoreColError
}
inviteC := client.Database(inviteCfg.Database).Collection(InviteScoreCollName)
if inviteC == nil {
return InviteScoreColError
}
playerCfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, PlayerDBName)
if !b {
return PlayerColError
}
playerC := client.Database(playerCfg.Database).Collection(PlayerCollName)
if playerC == nil {
return PlayerColError
}
rankCfg, b := mongo.MgoSessionMgrSington.GetCfg(platform, RankInviteDBName)
if !b {
return RankDataDBErr
}
rankC := client.Database(rankCfg.Database).Collection(RankInviteCollName)
if rankC == nil {
return RankDataDBErr
}
id := bson.NewObjectId()
err := i.Insert(&model.LogInviteScore{
Id: id,
Platform: platform,
// 积分变更记录
_, err := inviteC.InsertOne(sc, &model.NInviteScore{
UpSnid: psnId,
DownSnid: snid,
Level: level,
@ -1607,53 +1634,54 @@ func AddInviteScore(platform string, psnId, snid, level, tp int32, num, rate, mo
Money: money,
Ts: now.Unix(),
})
if err != nil {
logger.Logger.Error("AddInviteScore LogInviteScore error ", err)
return err
}
c := PlayerDataCollection(platform)
if c == nil {
return PlayerColError
}
myMoney := money
if !addMoney {
myMoney = 0
}
addNum := int64(0)
if level == 0 && tp == common.InviteScoreTypeBind {
addNum = 1
}
err = c.Update(bson.M{"snid": psnId}, bson.M{"$inc": bson.M{"iscore": num, "imoney": myMoney, "inum": addNum}, "$set": bson.M{"iscorets": now}})
if err != nil {
i.RemoveId(id)
logger.Logger.Error("AddInviteScore error ", err)
return err
}
// 更新排行榜
// 玩家积分和绑定数量
type m struct {
IScore int64
INum int64
}
res := &m{}
err = c.Find(bson.M{"snid": psnId}).Select(bson.M{"iscore": 1, "inum": 1}).One(res)
err = playerC.FindOne(sc, newBson.M{"snid": psnId}).Decode(res)
if err != nil {
logger.Logger.Error("AddInviteScore find error ", err)
return err
}
SaveRankInvite(&model.RankInvite{
// 修改玩家积分
myMoney := money
if !addMoney {
myMoney = 0
}
//addNum := int64(0)
//if level == 0 && tp == common.InviteScoreTypeBind {
// addNum = 1
//}
_, err = playerC.UpdateOne(sc, newBson.M{"snid": psnId}, newBson.M{"$inc": bson.M{"iscore": num, "imoney": myMoney}, "$set": bson.M{"iscorets": now}})
if err != nil {
logger.Logger.Error("AddInviteScore error ", err)
return err
}
// 更新排行榜
data := model.RankInvite{
Platform: platform,
SnId: psnId,
Num: res.INum,
Score: res.IScore,
Score: res.IScore + num,
Ts: now.Unix(),
})
Week: common.GetWeekStartTs(now.Unix()),
}
_, err = rankC.UpdateOne(sc, newBson.M{"snid": psnId, "week": data.Week}, newBson.M{"$set": data}, options.Update().SetUpsert(true))
if err != nil {
logger.Logger.Tracef("SaveRankInvite error:%v", err)
return err
}
return nil
}

View File

@ -131,3 +131,14 @@ type LogInviteScore struct {
Money int64 // 充值金额
Ts int64 // 时间戳
}
type NInviteScore struct {
UpSnid int32 // 上级代理
DownSnid int32 // 下级代理
Level int32 // 代理层级 例如 1DownSnid 是 UpSnid 的 1 级代理; 2: DownSnid 是 UpSnid 的 2 级代理
Tp int32 // 返佣类型
Rate int64 // 返佣比例
Score int64 // 积分
Money int64 // 充值金额
Ts int64 // 时间戳
}

View File

@ -440,7 +440,6 @@ func CSBindInvite(s *netlib.Session, packetid int, data interface{}, sid int64)
break
}
}
if err == nil {
err = model.BindInviteSnId(p.Platform, p.SnId, inviteSnId, msg.Code)
}