game_sync/dbproxy/monitormgr.go

86 lines
2.0 KiB
Go

package main
import (
"context"
"fmt"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/module"
"mongo.games.com/goserver/core/mongox"
"mongo.games.com/goserver/core/task"
"mongo.games.com/game/common"
"mongo.games.com/game/model"
)
func init() {
//module.RegisteModule(new(MonitorMgr), time.Second*30, 0)
}
type MonitorData struct {
SrvId int32 //服务器id
SrvType int32 //服务器类型
Key string //自定义key
Time time.Time //时间戳
Data interface{} //数据体
}
type MonitorMgr struct {
}
func (m *MonitorMgr) ModuleName() string {
return "MonitorMgr"
}
func (m *MonitorMgr) Init() {
}
func (m *MonitorMgr) Update() {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
var updates []mongo.WriteModel
for k, v := range GetPPCState() {
d := &MonitorData{
SrvId: int32(common.GetSelfSrvId()),
SrvType: int32(common.GetSelfSrvType()),
Key: k,
Time: time.Now(),
Data: v,
}
updates = append(updates,
mongo.NewUpdateOneModel().
SetFilter(bson.M{"srvid": d.SrvId, "srvtype": d.SrvType, "key": d.Key}).
SetUpdate(bson.M{"$set": d}).SetUpsert(true))
}
if len(updates) == 0 {
return nil
}
c, err := mongox.GetGlobalMonitorCollection(fmt.Sprintf("%s_rpc", model.MonitorPrefixName))
if err != nil {
logger.Logger.Errorf("MonitorMgr Update get collection failed")
return nil
}
opts := options.BulkWrite().SetOrdered(false) // SetOrdered(false) 表示并行执行
_, err = c.BulkWrite(context.Background(), updates, opts)
if err != nil {
logger.Logger.Errorf("MonitorMgr Update bulk write failed: %v", err)
return nil
}
return nil
}), nil, "MonitorMgr_Update").StartByFixExecutor("MonitorMgr")
}
func (m *MonitorMgr) Shutdown() {
module.UnregisteModule(m)
}