diff --git a/dbproxy/svc/l_itemlog.go b/dbproxy/svc/l_itemlog.go index 3606e7e..ed8eebc 100644 --- a/dbproxy/svc/l_itemlog.go +++ b/dbproxy/svc/l_itemlog.go @@ -30,6 +30,7 @@ func ItemLogsCollection(plt string) *mongo.Collection { c_itemlog.EnsureIndex(mgo.Index{Key: []string{"typeid", "roomconfigid"}, Background: true, Sparse: true}) c_itemlog.EnsureIndex(mgo.Index{Key: []string{"ts"}, Background: true, Sparse: true}) c_itemlog.EnsureIndex(mgo.Index{Key: []string{"-ts"}, Background: true, Sparse: true}) + c_itemlog.EnsureIndex(mgo.Index{Key: []string{"seq"}, Background: true, Sparse: true}) c_itemlog.EnsureIndex(mgo.Index{Key: []string{"gamedif"}, Background: true, Sparse: true}) } return c_itemlog @@ -43,6 +44,7 @@ type ItemLogSvc struct { func (svc *ItemLogSvc) InsertItemLog(log *model.ItemLog, ret *bool) (err error) { clog := ItemLogsCollection(log.Platform) if clog == nil { + logger.Logger.Errorf("ItemLogSvc.InsertItemLog get collection fail platform:%v", log.Platform) return } err = clog.Insert(log) @@ -54,10 +56,46 @@ func (svc *ItemLogSvc) InsertItemLog(log *model.ItemLog, ret *bool) (err error) return } +func (svc *ItemLogSvc) Insert(req *model.InsertItemLogReq, res *bool) error { + if len(req.Logs) == 0 { + return nil + } + + clog := ItemLogsCollection(req.Logs[0].Platform) + if clog == nil { + logger.Logger.Errorf("ItemLogSvc.Insert get collection fail platform:%v", req.Logs[0].Platform) + return nil + } + var docs []interface{} + for _, v := range req.Logs { + docs = append(docs, v) + } + if err := clog.Insert(docs...); err != nil { + logger.Logger.Errorf("ItemLogSvc.Insert error: %v", err) + return err + } + *res = true + return nil +} + +func (svc *ItemLogSvc) UpdateState(req *model.UpdateParam, res *model.UpdateRes) error { + c := ItemLogsCollection(req.Platform) + if c == nil { + logger.Logger.Errorf("ItemLogSvc.UpdateState get collection fail platform:%v", req.Platform) + return nil + } + err := c.UpdateId(req.LogId, bson.M{"$set": bson.M{"status": req.State}}) + if err != nil { + logger.Logger.Errorf("ItemLogSvc.UpdateState error: %v", err) + } + return err +} + // GetItemCount 获取v卡兑换消耗数量 func GetItemCount(platform string, snid, id int32, tp int) (count int64) { c := ItemLogsCollection(platform) if c == nil { + logger.Logger.Errorf("ItemLogSvc.GetItemCount get collection fail platform:%v", platform) return 0 } var err error @@ -73,7 +111,7 @@ func GetItemCount(platform string, snid, id int32, tp int) (count int64) { {"$group": bson.M{"_id": nil, "count": bson.M{"$sum": "$count"}}}, }).AllowDiskUse().One(tc) if err != nil && !errors.Is(err, mgo.ErrNotFound) { - logger.Logger.Warn("GetItemCount swapN error:", err) + logger.Logger.Error("GetItemCount swapN error:", err) return 0 } swapN = tc.Count @@ -85,7 +123,7 @@ func GetItemCount(platform string, snid, id int32, tp int) (count int64) { {"$group": bson.M{"_id": nil, "count": bson.M{"$sum": "$count"}}}, }).AllowDiskUse().One(tc) if err != nil && !errors.Is(err, mgo.ErrNotFound) { - logger.Logger.Warn("GetItemCount costN error:", err) + logger.Logger.Error("GetItemCount costN error:", err) return 0 } costN = tc.Count @@ -107,15 +145,6 @@ func (svc *ItemLogSvc) GetItemCount(req *model.ItemCountParam, count *int64) err return nil } -func (svc *ItemLogSvc) UpdateState(req *model.UpdateParam, res *model.UpdateRes) error { - c := ItemLogsCollection(req.Platform) - if c == nil { - return nil - } - err := c.UpdateId(req.LogId, bson.M{"$set": bson.M{"status": req.State}}) - return err -} - func (svc *ItemLogSvc) GetClawdollItemLog(args *model.ClawdollItemLogReq, ret *model.GetClawdollItemLogRet) (err error) { limitDataNum := 200 @@ -229,38 +258,18 @@ func (svc *ItemLogSvc) GetClawdollSuccessItemLog(args *model.ClawdollSuccessItem func (svc *ItemLogSvc) GetItemLog(req *model.GetItemLogParam, res *model.GetItemLogRes) error { c := ItemLogsCollection(req.Plt) if c == nil { + logger.Logger.Errorf("ItemLogSvc.GetItemLog get collection fail platform:%v", req.Plt) return nil } - err := c.Find(bson.M{"snid": req.SnId, "ts": bson.M{"$gt": req.Ts}}).All(&res.Logs) + err := c.Find(bson.M{"snid": req.SnId, "ts": bson.M{"$gt": req.Ts}}).Sort("ts", "seq").All(&res.Logs) if err != nil && !errors.Is(err, mgo.ErrNotFound) { + logger.Logger.Errorf("ItemLogSvc.GetItemLog error: %v", err) return err } return nil } -func (svc *ItemLogSvc) Insert(req *model.InsertItemLogReq, res *bool) error { - if len(req.Logs) == 0 { - return nil - } - - clog := ItemLogsCollection(req.Logs[0].Platform) - if clog == nil { - logger.Logger.Errorf("ItemLogSvc.Insert collection not found Platform:%v", req.Logs[0].Platform) - return nil - } - var docs []interface{} - for _, v := range req.Logs { - docs = append(docs, v) - } - if err := clog.Insert(docs...); err != nil { - logger.Logger.Warn("ItemLogSvc.Insert error:", err) - return err - } - *res = true - return nil -} - func init() { rpc.Register(new(ItemLogSvc)) } diff --git a/model/baginfo.go b/model/baginfo.go index a44cf26..5df4ec5 100644 --- a/model/baginfo.go +++ b/model/baginfo.go @@ -55,7 +55,7 @@ func UpBagItem(args *BagInfo) error { ret := false err := rpcCli.CallWithTimeout("BagSvc.UpgradeBag", args, &ret, time.Second*30) if err != nil { - return fmt.Errorf("UpgradeBag err:%v SnId:%v BagId:%v", err, args.SnId, args.BagId) + return fmt.Errorf("UpgradeBag err:%v SnId:%v BagId:%v Ts:%v", err, args.SnId, args.BagId, args.Ts) } return nil diff --git a/model/itemdatalog.go b/model/itemdatalog.go index 16415d4..eca24ca 100644 --- a/model/itemdatalog.go +++ b/model/itemdatalog.go @@ -2,16 +2,19 @@ package model import ( "errors" - "github.com/globalsign/mgo/bson" - "mongo.games.com/game/protocol/server" - "mongo.games.com/goserver/core/logger" + "sync/atomic" "time" + + "github.com/globalsign/mgo/bson" + "mongo.games.com/goserver/core/logger" + + "mongo.games.com/game/protocol/server" ) var ( ItemLogDBName = "log" ItemLogCollName = "log_itemlog" - ClawDollItemIds = []int32{40003, 40004, 80001, 80002} + ItemSeq = int64(0) ) type ItemLog struct { @@ -24,6 +27,7 @@ type ItemLog struct { Count int64 //个数 CreateTs int64 //记录时间 Ts int64 // 纳秒时间戳 + Seq int64 // 序号 Remark string //备注 TypeId int32 // 变化类型 GameId int64 // 游戏id,游戏中获得时有值 @@ -55,7 +59,7 @@ type ItemParam struct { Cost []*Item // 消耗的道具 LogId string // 撤销的id,兑换失败 RoomConfigId int32 // 房间配置id - Offline int32 // 离线记录 + Offline int32 // 离线记录 1是 0否 } func NewItemLogEx(param ItemParam) *ItemLog { @@ -69,6 +73,7 @@ func NewItemLogEx(param ItemParam) *ItemLog { itemLog.Count = param.Count itemLog.CreateTs = now.Unix() itemLog.Ts = now.UnixNano() + itemLog.Seq = atomic.AddInt64(&ItemSeq, 1) itemLog.Remark = param.Remark itemLog.TypeId = param.TypeId itemLog.GameId = param.GameId diff --git a/worldsrv/bagmgr.go b/worldsrv/bagmgr.go index eb2ad7e..d936ec5 100644 --- a/worldsrv/bagmgr.go +++ b/worldsrv/bagmgr.go @@ -6,7 +6,6 @@ import ( "time" "github.com/globalsign/mgo/bson" - "golang.org/x/exp/maps" "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/i18n" "mongo.games.com/goserver/core/logger" @@ -172,8 +171,9 @@ type BagInfo struct { Ts int64 //更新时间戳 // 临时携带参数 - dirty bool `bson:"-"` //是否需要更新数据库 - LogId string `bson:"-"` //最后一次保存的日志id + ItemLogsOffline []*model.ItemLog + dirty bool `bson:"-"` //是否需要更新数据库 + LogId string `bson:"-"` //最后一次保存的日志id } func NewBagInfo(platform string, snid int32) *BagInfo { @@ -205,7 +205,9 @@ var BagMgrSingleton = &BagMgr{ //============================================================================= type LoadData struct { - BagInfo *model.BagInfo + BagInfo *model.BagInfo + ItemLogsOnline []*model.ItemLog + ItemLogsOffline []*model.ItemLog } func (this *BagMgr) Load(platform string, snid int32, player any) *internal.PlayerLoadReplay { @@ -225,12 +227,61 @@ func (this *BagMgr) Load(platform string, snid int32, player any) *internal.Play bagInfo.Ts = time.Now().UnixNano() } + // 根据时间戳对账 + itemLogs, err := model.GetItemLog(platform, snid, bagInfo.Ts) + if err != nil { + logger.Logger.Errorf("Load GetItemLog err: %v", err) + return &internal.PlayerLoadReplay{ + Platform: platform, + Snid: snid, + Err: err, + Data: nil, + } + } + + // 恢复道具 + endTs := time.Now().UnixNano() + var itemLogsOnline, itemLogsOffline []*model.ItemLog + for _, v := range itemLogs { + if v == nil { + continue + } + if v.Ts >= bagInfo.Ts && v.Ts < endTs { + num := v.Count + if v.LogType == 1 { + num = -num + } + if v.Ts > bagInfo.Ts { + bagInfo.Ts = v.Ts + } + if v.Offline == 0 { + // 在线数据恢复 + logger.Logger.Tracef("道具恢复 SnId:%v Item:%+v", snid, *v) + if _, ok := bagInfo.BagItem[v.ItemId]; !ok { + bagInfo.BagItem[v.ItemId] = &model.Item{ + ItemId: v.ItemId, + ItemNum: 0, + ObtainTime: v.CreateTs, + } + } + bagInfo.BagItem[v.ItemId].ItemNum += num + itemLogsOnline = append(itemLogsOnline, v) + } else { + // 离线时的变更 + logger.Logger.Tracef("处理离线道具变化 SnId:%v Item:%v", snid, *v) + itemLogsOffline = append(itemLogsOffline, v) + } + } + } + return &internal.PlayerLoadReplay{ Platform: platform, Snid: snid, Err: err, Data: &LoadData{ - BagInfo: bagInfo, + BagInfo: bagInfo, + ItemLogsOnline: itemLogsOnline, + ItemLogsOffline: itemLogsOffline, }, } } @@ -262,43 +313,25 @@ func (this *BagMgr) Callback(player any, ret *internal.PlayerLoadReplay) { logger.Logger.Error("InitBagInfo err: item is nil. ItemId:", bi.ItemId) } } + + newBagInfo.ItemLogsOffline = data.ItemLogsOffline this.PlayerBag[ret.Snid] = newBagInfo } type LoadAfterData struct { - GameID []int32 - ItemLogs []*model.ItemLog - StarTs, EndTs int64 + GameID []int32 } func (this *BagMgr) LoadAfter(platform string, snid int32) *internal.PlayerLoadReplay { - var err error // 查询最近游戏 gameID := model.GetRecentGame(platform, snid) - // 道具变更记录 - endTs := time.Now().UnixNano() - var itemLogs []*model.ItemLog - itemLogs, err = model.GetItemLog(platform, snid, this.PlayerBag[snid].Ts) - if err != nil { - logger.Logger.Errorf("LoadAfter GetItemLog err: %v", err) - return &internal.PlayerLoadReplay{ - Platform: platform, - Snid: snid, - Err: err, - Data: nil, - } - } - return &internal.PlayerLoadReplay{ Platform: platform, Snid: snid, Err: nil, Data: &LoadAfterData{ - GameID: gameID, - ItemLogs: itemLogs, - StarTs: this.PlayerBag[snid].Ts, - EndTs: endTs, + GameID: gameID, }, } } @@ -330,50 +363,21 @@ func (this *BagMgr) CallbackAfter(ret *internal.PlayerLoadReplay) { // 道具变更记录 bagInfo := this.PlayerBag[p.SnId] if bagInfo != nil { - changeItems := make(map[int32]struct{}) - for _, v := range param.ItemLogs { - if v == nil { - continue - } - if v.Ts > param.StarTs && v.Ts <= param.EndTs { - bagInfo.dirty = true - num := v.Count - if v.LogType == 1 { - num = -num - } - if v.Ts > bagInfo.Ts { - bagInfo.Ts = v.Ts - } - if v.Offline == 0 { - // 在线数据恢复 - logger.Logger.Tracef("道具恢复 SnId:%v Item:%+v", p.SnId, *v) - if _, ok := bagInfo.BagItem[v.ItemId]; !ok { - bagInfo.BagItem[v.ItemId] = &Item{ - ItemId: v.ItemId, - ItemNum: 0, - ObtainTime: v.CreateTs, - } - } - bagInfo.BagItem[v.ItemId].ItemNum += num - changeItems[v.ItemId] = struct{}{} - } else { - // 离线时的变更 - logger.Logger.Tracef("处理离线道具变化 SnId:%v Item:%v", p.SnId, *v) - this.OnChangeFuncs(&model.ChangeItemParam{ - SnId: p.SnId, - ItemId: v.ItemId, - ItemNum: v.Count, - GainWay: v.TypeId, - RoomConfigId: v.RoomConfigId, - GameId: v.GameId, - GameFreeId: v.GameFreeId, - Cost: v.Cost, - }) - } - } + // 离线时的变更 + for _, v := range bagInfo.ItemLogsOffline { + logger.Logger.Tracef("处理离线道具变化 SnId:%v Item:%v", p.SnId, *v) + this.OnChangeFuncs(&model.ChangeItemParam{ + SnId: p.SnId, + ItemId: v.ItemId, + ItemNum: v.Count, + GainWay: v.TypeId, + RoomConfigId: v.RoomConfigId, + GameId: v.GameId, + GameFreeId: v.GameFreeId, + Cost: v.Cost, + }) } - - this.SyncBagData(p.SnId, maps.Keys(changeItems)...) + bagInfo.ItemLogsOffline = nil } } @@ -399,6 +403,9 @@ func (this *BagMgr) Save(platform string, snid int32, isSync, force bool) { newBagInfo.BagItem[v.ItemId] = &model.Item{ItemId: v.ItemId, ItemNum: v.ItemNum, ObtainTime: v.ObtainTime} } err = model.UpBagItem(newBagInfo) + if err != nil { + logger.Logger.Errorf("SaveBagData err: %v", err) + } } cf := func() {