fix 背包数据丢失恢复

This commit is contained in:
sk 2024-12-30 18:17:11 +08:00
parent 7066d7c441
commit 44acf8b5e7
4 changed files with 131 additions and 110 deletions

View File

@ -30,6 +30,7 @@ func ItemLogsCollection(plt string) *mongo.Collection {
c_itemlog.EnsureIndex(mgo.Index{Key: []string{"typeid", "roomconfigid"}, Background: true, Sparse: true})
c_itemlog.EnsureIndex(mgo.Index{Key: []string{"ts"}, Background: true, Sparse: true})
c_itemlog.EnsureIndex(mgo.Index{Key: []string{"-ts"}, Background: true, Sparse: true})
c_itemlog.EnsureIndex(mgo.Index{Key: []string{"seq"}, Background: true, Sparse: true})
c_itemlog.EnsureIndex(mgo.Index{Key: []string{"gamedif"}, Background: true, Sparse: true})
}
return c_itemlog
@ -43,6 +44,7 @@ type ItemLogSvc struct {
func (svc *ItemLogSvc) InsertItemLog(log *model.ItemLog, ret *bool) (err error) {
clog := ItemLogsCollection(log.Platform)
if clog == nil {
logger.Logger.Errorf("ItemLogSvc.InsertItemLog get collection fail platform:%v", log.Platform)
return
}
err = clog.Insert(log)
@ -54,10 +56,46 @@ func (svc *ItemLogSvc) InsertItemLog(log *model.ItemLog, ret *bool) (err error)
return
}
func (svc *ItemLogSvc) Insert(req *model.InsertItemLogReq, res *bool) error {
if len(req.Logs) == 0 {
return nil
}
clog := ItemLogsCollection(req.Logs[0].Platform)
if clog == nil {
logger.Logger.Errorf("ItemLogSvc.Insert get collection fail platform:%v", req.Logs[0].Platform)
return nil
}
var docs []interface{}
for _, v := range req.Logs {
docs = append(docs, v)
}
if err := clog.Insert(docs...); err != nil {
logger.Logger.Errorf("ItemLogSvc.Insert error: %v", err)
return err
}
*res = true
return nil
}
func (svc *ItemLogSvc) UpdateState(req *model.UpdateParam, res *model.UpdateRes) error {
c := ItemLogsCollection(req.Platform)
if c == nil {
logger.Logger.Errorf("ItemLogSvc.UpdateState get collection fail platform:%v", req.Platform)
return nil
}
err := c.UpdateId(req.LogId, bson.M{"$set": bson.M{"status": req.State}})
if err != nil {
logger.Logger.Errorf("ItemLogSvc.UpdateState error: %v", err)
}
return err
}
// GetItemCount 获取v卡兑换消耗数量
func GetItemCount(platform string, snid, id int32, tp int) (count int64) {
c := ItemLogsCollection(platform)
if c == nil {
logger.Logger.Errorf("ItemLogSvc.GetItemCount get collection fail platform:%v", platform)
return 0
}
var err error
@ -73,7 +111,7 @@ func GetItemCount(platform string, snid, id int32, tp int) (count int64) {
{"$group": bson.M{"_id": nil, "count": bson.M{"$sum": "$count"}}},
}).AllowDiskUse().One(tc)
if err != nil && !errors.Is(err, mgo.ErrNotFound) {
logger.Logger.Warn("GetItemCount swapN error:", err)
logger.Logger.Error("GetItemCount swapN error:", err)
return 0
}
swapN = tc.Count
@ -85,7 +123,7 @@ func GetItemCount(platform string, snid, id int32, tp int) (count int64) {
{"$group": bson.M{"_id": nil, "count": bson.M{"$sum": "$count"}}},
}).AllowDiskUse().One(tc)
if err != nil && !errors.Is(err, mgo.ErrNotFound) {
logger.Logger.Warn("GetItemCount costN error:", err)
logger.Logger.Error("GetItemCount costN error:", err)
return 0
}
costN = tc.Count
@ -107,15 +145,6 @@ func (svc *ItemLogSvc) GetItemCount(req *model.ItemCountParam, count *int64) err
return nil
}
func (svc *ItemLogSvc) UpdateState(req *model.UpdateParam, res *model.UpdateRes) error {
c := ItemLogsCollection(req.Platform)
if c == nil {
return nil
}
err := c.UpdateId(req.LogId, bson.M{"$set": bson.M{"status": req.State}})
return err
}
func (svc *ItemLogSvc) GetClawdollItemLog(args *model.ClawdollItemLogReq, ret *model.GetClawdollItemLogRet) (err error) {
limitDataNum := 200
@ -229,38 +258,18 @@ func (svc *ItemLogSvc) GetClawdollSuccessItemLog(args *model.ClawdollSuccessItem
func (svc *ItemLogSvc) GetItemLog(req *model.GetItemLogParam, res *model.GetItemLogRes) error {
c := ItemLogsCollection(req.Plt)
if c == nil {
logger.Logger.Errorf("ItemLogSvc.GetItemLog get collection fail platform:%v", req.Plt)
return nil
}
err := c.Find(bson.M{"snid": req.SnId, "ts": bson.M{"$gt": req.Ts}}).All(&res.Logs)
err := c.Find(bson.M{"snid": req.SnId, "ts": bson.M{"$gt": req.Ts}}).Sort("ts", "seq").All(&res.Logs)
if err != nil && !errors.Is(err, mgo.ErrNotFound) {
logger.Logger.Errorf("ItemLogSvc.GetItemLog error: %v", err)
return err
}
return nil
}
func (svc *ItemLogSvc) Insert(req *model.InsertItemLogReq, res *bool) error {
if len(req.Logs) == 0 {
return nil
}
clog := ItemLogsCollection(req.Logs[0].Platform)
if clog == nil {
logger.Logger.Errorf("ItemLogSvc.Insert collection not found Platform:%v", req.Logs[0].Platform)
return nil
}
var docs []interface{}
for _, v := range req.Logs {
docs = append(docs, v)
}
if err := clog.Insert(docs...); err != nil {
logger.Logger.Warn("ItemLogSvc.Insert error:", err)
return err
}
*res = true
return nil
}
func init() {
rpc.Register(new(ItemLogSvc))
}

View File

@ -55,7 +55,7 @@ func UpBagItem(args *BagInfo) error {
ret := false
err := rpcCli.CallWithTimeout("BagSvc.UpgradeBag", args, &ret, time.Second*30)
if err != nil {
return fmt.Errorf("UpgradeBag err:%v SnId:%v BagId:%v", err, args.SnId, args.BagId)
return fmt.Errorf("UpgradeBag err:%v SnId:%v BagId:%v Ts:%v", err, args.SnId, args.BagId, args.Ts)
}
return nil

View File

@ -2,16 +2,19 @@ package model
import (
"errors"
"github.com/globalsign/mgo/bson"
"mongo.games.com/game/protocol/server"
"mongo.games.com/goserver/core/logger"
"sync/atomic"
"time"
"github.com/globalsign/mgo/bson"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/protocol/server"
)
var (
ItemLogDBName = "log"
ItemLogCollName = "log_itemlog"
ClawDollItemIds = []int32{40003, 40004, 80001, 80002}
ItemSeq = int64(0)
)
type ItemLog struct {
@ -24,6 +27,7 @@ type ItemLog struct {
Count int64 //个数
CreateTs int64 //记录时间
Ts int64 // 纳秒时间戳
Seq int64 // 序号
Remark string //备注
TypeId int32 // 变化类型
GameId int64 // 游戏id,游戏中获得时有值
@ -55,7 +59,7 @@ type ItemParam struct {
Cost []*Item // 消耗的道具
LogId string // 撤销的id兑换失败
RoomConfigId int32 // 房间配置id
Offline int32 // 离线记录
Offline int32 // 离线记录 1是 0否
}
func NewItemLogEx(param ItemParam) *ItemLog {
@ -69,6 +73,7 @@ func NewItemLogEx(param ItemParam) *ItemLog {
itemLog.Count = param.Count
itemLog.CreateTs = now.Unix()
itemLog.Ts = now.UnixNano()
itemLog.Seq = atomic.AddInt64(&ItemSeq, 1)
itemLog.Remark = param.Remark
itemLog.TypeId = param.TypeId
itemLog.GameId = param.GameId

View File

@ -6,7 +6,6 @@ import (
"time"
"github.com/globalsign/mgo/bson"
"golang.org/x/exp/maps"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/i18n"
"mongo.games.com/goserver/core/logger"
@ -172,8 +171,9 @@ type BagInfo struct {
Ts int64 //更新时间戳
// 临时携带参数
dirty bool `bson:"-"` //是否需要更新数据库
LogId string `bson:"-"` //最后一次保存的日志id
ItemLogsOffline []*model.ItemLog
dirty bool `bson:"-"` //是否需要更新数据库
LogId string `bson:"-"` //最后一次保存的日志id
}
func NewBagInfo(platform string, snid int32) *BagInfo {
@ -205,7 +205,9 @@ var BagMgrSingleton = &BagMgr{
//=============================================================================
type LoadData struct {
BagInfo *model.BagInfo
BagInfo *model.BagInfo
ItemLogsOnline []*model.ItemLog
ItemLogsOffline []*model.ItemLog
}
func (this *BagMgr) Load(platform string, snid int32, player any) *internal.PlayerLoadReplay {
@ -225,12 +227,61 @@ func (this *BagMgr) Load(platform string, snid int32, player any) *internal.Play
bagInfo.Ts = time.Now().UnixNano()
}
// 根据时间戳对账
itemLogs, err := model.GetItemLog(platform, snid, bagInfo.Ts)
if err != nil {
logger.Logger.Errorf("Load GetItemLog err: %v", err)
return &internal.PlayerLoadReplay{
Platform: platform,
Snid: snid,
Err: err,
Data: nil,
}
}
// 恢复道具
endTs := time.Now().UnixNano()
var itemLogsOnline, itemLogsOffline []*model.ItemLog
for _, v := range itemLogs {
if v == nil {
continue
}
if v.Ts >= bagInfo.Ts && v.Ts < endTs {
num := v.Count
if v.LogType == 1 {
num = -num
}
if v.Ts > bagInfo.Ts {
bagInfo.Ts = v.Ts
}
if v.Offline == 0 {
// 在线数据恢复
logger.Logger.Tracef("道具恢复 SnId:%v Item:%+v", snid, *v)
if _, ok := bagInfo.BagItem[v.ItemId]; !ok {
bagInfo.BagItem[v.ItemId] = &model.Item{
ItemId: v.ItemId,
ItemNum: 0,
ObtainTime: v.CreateTs,
}
}
bagInfo.BagItem[v.ItemId].ItemNum += num
itemLogsOnline = append(itemLogsOnline, v)
} else {
// 离线时的变更
logger.Logger.Tracef("处理离线道具变化 SnId:%v Item:%v", snid, *v)
itemLogsOffline = append(itemLogsOffline, v)
}
}
}
return &internal.PlayerLoadReplay{
Platform: platform,
Snid: snid,
Err: err,
Data: &LoadData{
BagInfo: bagInfo,
BagInfo: bagInfo,
ItemLogsOnline: itemLogsOnline,
ItemLogsOffline: itemLogsOffline,
},
}
}
@ -262,43 +313,25 @@ func (this *BagMgr) Callback(player any, ret *internal.PlayerLoadReplay) {
logger.Logger.Error("InitBagInfo err: item is nil. ItemId:", bi.ItemId)
}
}
newBagInfo.ItemLogsOffline = data.ItemLogsOffline
this.PlayerBag[ret.Snid] = newBagInfo
}
type LoadAfterData struct {
GameID []int32
ItemLogs []*model.ItemLog
StarTs, EndTs int64
GameID []int32
}
func (this *BagMgr) LoadAfter(platform string, snid int32) *internal.PlayerLoadReplay {
var err error
// 查询最近游戏
gameID := model.GetRecentGame(platform, snid)
// 道具变更记录
endTs := time.Now().UnixNano()
var itemLogs []*model.ItemLog
itemLogs, err = model.GetItemLog(platform, snid, this.PlayerBag[snid].Ts)
if err != nil {
logger.Logger.Errorf("LoadAfter GetItemLog err: %v", err)
return &internal.PlayerLoadReplay{
Platform: platform,
Snid: snid,
Err: err,
Data: nil,
}
}
return &internal.PlayerLoadReplay{
Platform: platform,
Snid: snid,
Err: nil,
Data: &LoadAfterData{
GameID: gameID,
ItemLogs: itemLogs,
StarTs: this.PlayerBag[snid].Ts,
EndTs: endTs,
GameID: gameID,
},
}
}
@ -330,50 +363,21 @@ func (this *BagMgr) CallbackAfter(ret *internal.PlayerLoadReplay) {
// 道具变更记录
bagInfo := this.PlayerBag[p.SnId]
if bagInfo != nil {
changeItems := make(map[int32]struct{})
for _, v := range param.ItemLogs {
if v == nil {
continue
}
if v.Ts > param.StarTs && v.Ts <= param.EndTs {
bagInfo.dirty = true
num := v.Count
if v.LogType == 1 {
num = -num
}
if v.Ts > bagInfo.Ts {
bagInfo.Ts = v.Ts
}
if v.Offline == 0 {
// 在线数据恢复
logger.Logger.Tracef("道具恢复 SnId:%v Item:%+v", p.SnId, *v)
if _, ok := bagInfo.BagItem[v.ItemId]; !ok {
bagInfo.BagItem[v.ItemId] = &Item{
ItemId: v.ItemId,
ItemNum: 0,
ObtainTime: v.CreateTs,
}
}
bagInfo.BagItem[v.ItemId].ItemNum += num
changeItems[v.ItemId] = struct{}{}
} else {
// 离线时的变更
logger.Logger.Tracef("处理离线道具变化 SnId:%v Item:%v", p.SnId, *v)
this.OnChangeFuncs(&model.ChangeItemParam{
SnId: p.SnId,
ItemId: v.ItemId,
ItemNum: v.Count,
GainWay: v.TypeId,
RoomConfigId: v.RoomConfigId,
GameId: v.GameId,
GameFreeId: v.GameFreeId,
Cost: v.Cost,
})
}
}
// 离线时的变更
for _, v := range bagInfo.ItemLogsOffline {
logger.Logger.Tracef("处理离线道具变化 SnId:%v Item:%v", p.SnId, *v)
this.OnChangeFuncs(&model.ChangeItemParam{
SnId: p.SnId,
ItemId: v.ItemId,
ItemNum: v.Count,
GainWay: v.TypeId,
RoomConfigId: v.RoomConfigId,
GameId: v.GameId,
GameFreeId: v.GameFreeId,
Cost: v.Cost,
})
}
this.SyncBagData(p.SnId, maps.Keys(changeItems)...)
bagInfo.ItemLogsOffline = nil
}
}
@ -399,6 +403,9 @@ func (this *BagMgr) Save(platform string, snid int32, isSync, force bool) {
newBagInfo.BagItem[v.ItemId] = &model.Item{ItemId: v.ItemId, ItemNum: v.ItemNum, ObtainTime: v.ObtainTime}
}
err = model.UpBagItem(newBagInfo)
if err != nil {
logger.Logger.Errorf("SaveBagData err: %v", err)
}
}
cf := func() {