diff --git a/dbproxy/config.json b/dbproxy/config.json index e97d4bb..f0eccd8 100644 --- a/dbproxy/config.json +++ b/dbproxy/config.json @@ -47,6 +47,9 @@ "data":{ "RootPath":"../data" }, + "mongox": { + "Path": "./etc/mgo.json" + }, "etcd": { "Url": ["127.0.0.1:2379"], "UserName": "", diff --git a/dbproxy/monitormgr.go b/dbproxy/monitormgr.go new file mode 100644 index 0000000..9be8493 --- /dev/null +++ b/dbproxy/monitormgr.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + "fmt" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "mongo.games.com/goserver/core/basic" + "mongo.games.com/goserver/core/logger" + "mongo.games.com/goserver/core/module" + "mongo.games.com/goserver/core/mongox" + "mongo.games.com/goserver/core/task" + + "mongo.games.com/game/common" + "mongo.games.com/game/model" +) + +func init() { + //module.RegisteModule(new(MonitorMgr), time.Second*30, 0) +} + +type MonitorData struct { + SrvId int32 //服务器id + SrvType int32 //服务器类型 + Key string //自定义key + Time time.Time //时间戳 + Data interface{} //数据体 +} + +type MonitorMgr struct { +} + +func (m *MonitorMgr) ModuleName() string { + return "MonitorMgr" +} + +func (m *MonitorMgr) Init() { + +} + +func (m *MonitorMgr) Update() { + task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { + var updates []mongo.WriteModel + for k, v := range GetPPCState() { + d := &MonitorData{ + SrvId: int32(common.GetSelfSrvId()), + SrvType: int32(common.GetSelfSrvType()), + Key: k, + Time: time.Now(), + Data: v, + } + updates = append(updates, + mongo.NewUpdateOneModel(). + SetFilter(bson.M{"srvid": d.SrvId, "srvtype": d.SrvType, "key": d.Key}). + SetUpdate(bson.M{"$set": d}).SetUpsert(true)) + } + + if len(updates) == 0 { + return nil + } + + c, err := mongox.GetGlobalMonitorCollection(fmt.Sprintf("%s_rpc", model.MonitorPrefixName)) + if err != nil { + logger.Logger.Errorf("MonitorMgr Update get collection failed") + return nil + } + + opts := options.BulkWrite().SetOrdered(false) // SetOrdered(false) 表示并行执行 + _, err = c.BulkWrite(context.Background(), updates, opts) + if err != nil { + logger.Logger.Errorf("MonitorMgr Update bulk write failed: %v", err) + return nil + } + + return nil + }), nil, "MonitorMgr_Update").StartByFixExecutor("MonitorMgr") +} + +func (m *MonitorMgr) Shutdown() { + module.UnregisteModule(m) +} diff --git a/dbproxy/rpcstate.go b/dbproxy/rpcstate.go new file mode 100644 index 0000000..4f94637 --- /dev/null +++ b/dbproxy/rpcstate.go @@ -0,0 +1,75 @@ +package main + +import ( + "sync" + "time" +) + +const Timeout = 30 * time.Second + +type RPCState struct { + RunTimes int64 //执行次数 + TotalRuningTime int64 //总执行时间 + MaxRuningTime int64 //最长执行时间 + TimeoutTimes int64 //执行超时次数 大于30秒的次数 + FailTimes int64 //执行失败次数 + SuccessTimes int64 //执行成功次数 +} + +var RPCStateMgr = make(map[string]*RPCState) +var RPCStateMgrLock = sync.RWMutex{} + +func GetPPCState() map[string]*RPCState { + ret := make(map[string]*RPCState) + RPCStateMgrLock.RLock() + defer RPCStateMgrLock.RUnlock() + for k, v := range RPCStateMgr { + e := *v // 复制一份 + ret[k] = &e + } + return ret +} + +//func loggingMiddleware(h http.Handler) http.Handler { +// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// RPCStateMgrLock.Lock() +// state, ok := RPCStateMgr[r.URL.Path] +// if !ok { +// state = &RPCState{} +// RPCStateMgr[r.URL.Path] = state +// } +// RPCStateMgrLock.Unlock() +// +// // 记录请求的时间戳 +// start := time.Now() +// +// var buf bytes.Buffer +// if r.Body != nil { +// tee := io.TeeReader(r.Body, &buf) +// r.Body = io.NopCloser(tee) +// } +// +// logger.Logger.Infof("==>RPC %s %s %s", r.Method, r.URL.Path, buf.String()) +// // 包装响应写入器以捕获响应内容 +// rw := &responseWriter{w, http.StatusOK} +// h.ServeHTTP(rw, r) +// // 记录请求完成时间和响应状态码 +// duration := time.Since(start).Milliseconds() +// +// RPCStateMgrLock.Lock() +// state.RunTimes++ +// state.TotalRuningTime += duration +// if duration > state.MaxRuningTime { +// state.MaxRuningTime = duration +// } +// if duration > Timeout.Milliseconds() { +// state.TimeoutTimes++ +// } +// if rw.statusCode != http.StatusOK { +// state.FailTimes++ +// } else { +// state.SuccessTimes++ +// } +// RPCStateMgrLock.Unlock() +// }) +//} diff --git a/dbproxy/svc/m_monitor.go b/dbproxy/svc/m_monitor.go index f3ffef5..1c84975 100644 --- a/dbproxy/svc/m_monitor.go +++ b/dbproxy/svc/m_monitor.go @@ -13,6 +13,7 @@ import ( "mongo.games.com/game/dbproxy/mongo" "mongo.games.com/game/mgrsrv/api" "mongo.games.com/game/model" + rpcx "mongo.games.com/game/rpc" "mongo.games.com/game/webapi" "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/logger" @@ -136,6 +137,9 @@ func init() { gob.Register(basic.CmdStats{}) gob.Register(map[string]basic.CmdStats{}) gob.Register(utils.RuntimeStats{}) + gob.Register(rpcx.State{}) + gob.Register(map[string]rpcx.State{}) + //gob registe rpc.Register(new(MonitorDataSvc)) diff --git a/gamesrv/base/monitormgr.go b/gamesrv/base/monitormgr.go deleted file mode 100644 index b2e3e39..0000000 --- a/gamesrv/base/monitormgr.go +++ /dev/null @@ -1,133 +0,0 @@ -package base - -import ( - "encoding/gob" - "mongo.games.com/game/common" - "mongo.games.com/game/model" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/basic" - "mongo.games.com/goserver/core/module" - "mongo.games.com/goserver/core/netlib" - "mongo.games.com/goserver/core/profile" - "mongo.games.com/goserver/core/schedule" - "mongo.games.com/goserver/core/task" - "mongo.games.com/goserver/core/transact" - "mongo.games.com/goserver/core/utils" - "time" -) - -var MonitorMgrSington = &MonitorMgr{} - -type MonitorMgr struct { -} - -func (this *MonitorMgr) ModuleName() string { - return "MonitorMgr" -} - -func (this *MonitorMgr) Init() { - -} - -func (this *MonitorMgr) Update() { - //logic stats - logicStats := profile.GetStats() - if len(logicStats) > 0 { - logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats) - if logLogic != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("logic", logLogic) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //net session stats - netStats := netlib.Stats() - if len(netStats) > 0 { - logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats) - if logNet != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("net", logNet) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //schedule stats - jobStats := schedule.Stats() - if len(jobStats) > 0 { - logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats) - if logJob != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("job", logJob) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //trans stats - transStats := transact.Stats() - if len(transStats) > 0 { - logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats) - if logTrans != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("transact", logTrans) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //panic stats - panicStats := utils.GetPanicStats() - if len(panicStats) > 0 { - for key, stats := range panicStats { - logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats) - if logPanic != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("panic", logPanic) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - } - - //object command quene stats - objStats := core.AppCtx.GetStats() - if len(objStats) > 0 { - logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats) - if logCmd != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("cmdque", logCmd) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //gorouting count, eg. system info - runtimeStats := utils.StatsRuntime() - logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats) - if logRuntime != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.InsertMonitorData("runtime", logRuntime) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } -} - -func (this *MonitorMgr) Shutdown() { - module.UnregisteModule(this) -} - -func init() { - //gob registe - gob.Register(profile.TimeElement{}) - gob.Register(map[string]profile.TimeElement{}) - gob.Register(netlib.ServiceStats{}) - gob.Register(map[int]netlib.ServiceStats{}) - gob.Register(schedule.TaskStats{}) - gob.Register(map[string]schedule.TaskStats{}) - gob.Register(transact.TransStats{}) - gob.Register(map[int]transact.TransStats{}) - gob.Register(utils.PanicStackInfo{}) - gob.Register(map[string]utils.PanicStackInfo{}) - gob.Register(basic.CmdStats{}) - gob.Register(map[string]basic.CmdStats{}) - gob.Register(utils.RuntimeStats{}) - //gob registe - - module.RegisteModule(MonitorMgrSington, time.Minute*5, 0) -} diff --git a/gob_register.go b/gob_register.go new file mode 100644 index 0000000..04eabc0 --- /dev/null +++ b/gob_register.go @@ -0,0 +1,47 @@ +package win88 + +import ( + "encoding/gob" + + "mongo.games.com/goserver/core/basic" + "mongo.games.com/goserver/core/netlib" + "mongo.games.com/goserver/core/profile" + "mongo.games.com/goserver/core/schedule" + "mongo.games.com/goserver/core/transact" + "mongo.games.com/goserver/core/utils" + + "mongo.games.com/game/mgrsrv/api" + "mongo.games.com/game/model" + "mongo.games.com/game/rpc" +) + +func init() { + gob.Register(utils.RuntimeStats{}) + + gob.Register(model.PlayerOLStats{}) + gob.Register(map[string]*model.APITransactStats{}) + + gob.Register(rpc.State{}) + gob.Register(map[string]rpc.State{}) + + gob.Register(api.ApiStats{}) + gob.Register(map[string]api.ApiStats{}) + + gob.Register(profile.TimeElement{}) + gob.Register(map[string]profile.TimeElement{}) + + gob.Register(netlib.ServiceStats{}) + gob.Register(map[int]netlib.ServiceStats{}) + + gob.Register(schedule.TaskStats{}) + gob.Register(map[string]schedule.TaskStats{}) + + gob.Register(transact.TransStats{}) + gob.Register(map[int]transact.TransStats{}) + + gob.Register(utils.PanicStackInfo{}) + gob.Register(map[string]utils.PanicStackInfo{}) + + gob.Register(basic.CmdStats{}) + gob.Register(map[string]basic.CmdStats{}) +} diff --git a/mgrsrv/monitormgr.go b/mgrsrv/monitormgr.go deleted file mode 100644 index c24567b..0000000 --- a/mgrsrv/monitormgr.go +++ /dev/null @@ -1,147 +0,0 @@ -package main - -import ( - "encoding/gob" - "mongo.games.com/game/common" - "mongo.games.com/game/mgrsrv/api" - "mongo.games.com/game/model" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/basic" - "mongo.games.com/goserver/core/module" - "mongo.games.com/goserver/core/netlib" - "mongo.games.com/goserver/core/profile" - "mongo.games.com/goserver/core/schedule" - "mongo.games.com/goserver/core/task" - "mongo.games.com/goserver/core/transact" - "mongo.games.com/goserver/core/utils" - "time" -) - -var MonitorMgrSington = &MonitorMgr{} - -type MonitorMgr struct { -} - -func (this *MonitorMgr) ModuleName() string { - return "MonitorMgr" -} - -func (this *MonitorMgr) Init() { - -} - -func (this *MonitorMgr) Update() { - //webapi stats - apiStats := api.Stats() - if len(apiStats) > 0 { - log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", apiStats) - if log != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("webapi", log) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //logic stats - logicStats := profile.GetStats() - if len(logicStats) > 0 { - logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats) - if logLogic != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("logic", logLogic) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //net session stats - netStats := netlib.Stats() - if len(netStats) > 0 { - logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats) - if logNet != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("net", logNet) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //schedule stats - jobStats := schedule.Stats() - if len(jobStats) > 0 { - logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats) - if logJob != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("job", logJob) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //trans stats - transStats := transact.Stats() - if len(transStats) > 0 { - logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats) - if logTrans != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("transact", logTrans) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //panic stats - panicStats := utils.GetPanicStats() - if len(panicStats) > 0 { - for key, stats := range panicStats { - logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats) - if logPanic != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("panic", logPanic) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - } - - //object command quene stats - objStats := core.AppCtx.GetStats() - if len(objStats) > 0 { - logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats) - if logCmd != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("cmdque", logCmd) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //gorouting count, eg. system info - runtimeStats := utils.StatsRuntime() - logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats) - if logRuntime != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.InsertMonitorData("runtime", logRuntime) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } -} - -func (this *MonitorMgr) Shutdown() { - module.UnregisteModule(this) -} - -func init() { - //gob registe - gob.Register(api.ApiStats{}) - gob.Register(map[string]api.ApiStats{}) - gob.Register(profile.TimeElement{}) - gob.Register(map[string]profile.TimeElement{}) - gob.Register(netlib.ServiceStats{}) - gob.Register(map[int]netlib.ServiceStats{}) - gob.Register(schedule.TaskStats{}) - gob.Register(map[string]schedule.TaskStats{}) - gob.Register(transact.TransStats{}) - gob.Register(map[int]transact.TransStats{}) - gob.Register(utils.PanicStackInfo{}) - gob.Register(map[string]utils.PanicStackInfo{}) - gob.Register(basic.CmdStats{}) - gob.Register(map[string]basic.CmdStats{}) - gob.Register(utils.RuntimeStats{}) - //gob registe - - module.RegisteModule(MonitorMgrSington, time.Minute*5, 0) -} diff --git a/model/monitor.go b/model/monitor.go index 1085f9c..db1215a 100644 --- a/model/monitor.go +++ b/model/monitor.go @@ -5,7 +5,11 @@ import ( "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" + "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/logger" + "mongo.games.com/goserver/core/task" + + "mongo.games.com/game/common" ) var ( @@ -85,6 +89,60 @@ func RemoveMonitorData(t time.Time) (chged []*mgo.ChangeInfo, err error) { return } +type MonitorTool[T any] struct { + CollectionName string +} + +func NewMonitorTool[T any](collectionName string) *MonitorTool[T] { + return &MonitorTool[T]{ + CollectionName: collectionName, + } +} + +func (m *MonitorTool[T]) Insert(key string, state *T) { + d := NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, state) + task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { + return InsertMonitorData(m.CollectionName, d) + }), nil, "InsertMonitorData").StartByFixExecutor("monitor") + +} + +func (m *MonitorTool[T]) InsertKV(kv map[string]*T) { + // todo 批量插入 + for k, v := range kv { + m.Insert(k, v) + } +} + +func (m *MonitorTool[T]) Upsert(key string, state *T) { + d := NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, state) + task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { + return UpsertMonitorData(m.CollectionName, d) + }), nil, "UpsertMonitorData").StartByFixExecutor("monitor") +} + +func (m *MonitorTool[T]) UpsertKV(kv map[string]*T) { + //todo 批量更新 + for k, v := range kv { + m.Upsert(k, v) + } +} + +// Remove 清理监控日志 +func Remove(t time.Time) { + if t.IsZero() { + return + } + + task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { + _, err := RemoveMonitorData(t) + if err != nil { + logger.Logger.Error("RemoveMonitorData error:", err) + } + return nil + }), nil, "RemoveMonitorData").StartByFixExecutor("monitor") +} + type PlayerOLStats struct { PlatformStats map[string]*PlayerStats RobotStats PlayerStats diff --git a/robot/base/monitormgr.go b/robot/base/monitormgr.go deleted file mode 100644 index 6e96738..0000000 --- a/robot/base/monitormgr.go +++ /dev/null @@ -1,115 +0,0 @@ -package base - -import ( - "mongo.games.com/goserver/core/module" -) - -var MonitorMgrSington = &MonitorMgr{} - -type MonitorMgr struct { -} - -func (this *MonitorMgr) ModuleName() string { - return "MonitorMgr" -} - -func (this *MonitorMgr) Init() { - -} - -func (this *MonitorMgr) Update() { - //mongodb stats - //mgo.SetStats(true) - //mgoStats := mgo.GetStats() - //logMgo := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", mgoStats) - //if logMgo != nil { - // task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - // return model.InsertMonitorData("mgo", logMgo) - // }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - //} - // - ////logic stats - //logicStats := profile.GetStats() - //if len(logicStats) > 0 { - // logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats) - // if logLogic != nil { - // task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - // return model.UpsertMonitorData("logic", logLogic) - // }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - // } - //} - // - ////net session stats - //netStats := netlib.Stats() - //if len(netStats) > 0 { - // logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats) - // if logNet != nil { - // task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - // return model.UpsertMonitorData("net", logNet) - // }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - // } - //} - // - ////schedule stats - //jobStats := schedule.Stats() - //if len(jobStats) > 0 { - // logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats) - // if logJob != nil { - // task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - // return model.UpsertMonitorData("job", logJob) - // }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - // } - //} - // - ////trans stats - //transStats := transact.Stats() - //if len(transStats) > 0 { - // logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats) - // if logTrans != nil { - // task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - // return model.UpsertMonitorData("transact", logTrans) - // }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - // } - //} - // - ////panic stats - //panicStats := utils.GetPanicStats() - //if len(panicStats) > 0 { - // for key, stats := range panicStats { - // logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats) - // if logPanic != nil { - // task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - // return model.UpsertMonitorData("panic", logPanic) - // }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - // } - // } - //} - // - ////object command quene stats - //objStats := core.AppCtx.GetStats() - //if len(objStats) > 0 { - // logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats) - // if logCmd != nil { - // task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - // return model.UpsertMonitorData("cmdque", logCmd) - // }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - // } - //} - // - ////gorouting count, eg. system info - //runtimeStats := utils.StatsRuntime() - //logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats) - //if logRuntime != nil { - // task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - // return model.InsertMonitorData("runtime", logRuntime) - // }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - //} -} - -func (this *MonitorMgr) Shutdown() { - module.UnregisteModule(this) -} - -func init() { - //module.RegisteModule(MonitorMgrSington, time.Minute*5, 0) -} diff --git a/rpc/rpclient.go b/rpc/rpclient.go index 3b810a2..86ece61 100644 --- a/rpc/rpclient.go +++ b/rpc/rpclient.go @@ -79,6 +79,41 @@ func (this *RPClient) Call(serviceMethod string, args interface{}, reply interfa } func (this *RPClient) CallWithTimeout(serviceMethod string, args interface{}, reply interface{}, d time.Duration) error { + var err error + start := time.Now() + + defer func() { + // 记录请求完成时间和响应状态码 + duration := time.Since(start).Milliseconds() + StateMgrLock.Lock() + state, ok := StateMgr[serviceMethod] + if !ok { + state = new(State) + StateMgr[serviceMethod] = state + } + // 执行次数 + state.RunTimes++ + // 执行总时长 + state.TotalRuningTime += duration + // 最长执行时长 + if duration > state.MaxRuningTime { + state.MaxRuningTime = duration + } + switch { + case errors.Is(err, ErrRPClientNoConn): + state.NoConnTimes++ + case errors.Is(err, ErrRPClientCallTimeout): + state.TimeoutTimes++ + default: + if err != nil { + state.FailTimes++ + } else { + state.SuccessTimes++ + } + } + StateMgrLock.Unlock() + }() + if this.client == nil { return ErrRPClientNoConn } @@ -87,8 +122,6 @@ func (this *RPClient) CallWithTimeout(serviceMethod string, args interface{}, re d = time.Second } - start := time.Now() - var err error call := this.client.Go(serviceMethod, args, reply, make(chan *rpc.Call, 1)) select { case <-time.After(d): @@ -96,7 +129,7 @@ func (this *RPClient) CallWithTimeout(serviceMethod string, args interface{}, re case call = <-call.Done: err = call.Error } - if err != nil && (err == rpc.ErrShutdown || err == io.ErrUnexpectedEOF) { + if err != nil && (errors.Is(err, rpc.ErrShutdown) || errors.Is(err, io.ErrUnexpectedEOF)) { var dailed chan struct{} if atomic.CompareAndSwapInt32(&this.connecting, 0, 1) { dailed = make(chan struct{}) diff --git a/rpc/rpcstate.go b/rpc/rpcstate.go new file mode 100644 index 0000000..f63ef5e --- /dev/null +++ b/rpc/rpcstate.go @@ -0,0 +1,29 @@ +package rpc + +import ( + "sync" +) + +type State struct { + RunTimes int64 //执行次数 + TotalRuningTime int64 //总执行时间 + MaxRuningTime int64 //最长执行时间 + TimeoutTimes int64 //执行超时次数 大于30秒的次数 + FailTimes int64 //执行失败次数 + SuccessTimes int64 //执行成功次数 + NoConnTimes int64 //无连接次数 +} + +var StateMgr = make(map[string]*State) +var StateMgrLock = sync.RWMutex{} + +func GetState() map[string]*State { + ret := make(map[string]*State) + StateMgrLock.RLock() + defer StateMgrLock.RUnlock() + for k, v := range StateMgr { + e := *v // 复制一份 + ret[k] = &e + } + return ret +} diff --git a/worldsrv/main.go b/worldsrv/main.go index 6170ab1..366b8e2 100644 --- a/worldsrv/main.go +++ b/worldsrv/main.go @@ -54,5 +54,6 @@ func main() { schedule.StartTask() //启动业务模块 waiter := module.Start() + StartMonitor() waiter.Wait("main()") } diff --git a/worldsrv/monitor.go b/worldsrv/monitor.go new file mode 100644 index 0000000..ef25742 --- /dev/null +++ b/worldsrv/monitor.go @@ -0,0 +1,51 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "time" + + "mongo.games.com/goserver/core/logger" + "mongo.games.com/goserver/core/timer" + + "mongo.games.com/game/model" + "mongo.games.com/game/rpc" +) + +var ( + monitorRpc = model.NewMonitorTool[rpc.State]("rpc") +) + +func StartMonitor() { + timer.StartTimer(timer.TimerActionWrapper(func(h timer.TimerHandle, ud interface{}) bool { + //monitorRpc.InsertKV(rpc.GetState()) + + //保存到本地文件 + go func() { + err := saveToFile("rpc.json", rpc.GetState()) + if err != nil { + logger.Logger.Errorf("rpc.json failed to save to file: %v", err) + } + }() + + return true + }), nil, time.Second*10, -1) +} + +// 保存变量到文件(覆盖方式) +func saveToFile(filename string, data interface{}) error { + // 将数据转换为JSON字节 + content, err := json.MarshalIndent(data, "", " ") + if err != nil { + return fmt.Errorf("failed to serialize data: %w", err) + } + + // 写入文件,覆盖模式 + err = os.WriteFile(filename, content, 0644) + if err != nil { + return fmt.Errorf("failed to write to file: %w", err) + } + + return nil +} diff --git a/worldsrv/monitormgr.go b/worldsrv/monitormgr.go deleted file mode 100644 index aa5e845..0000000 --- a/worldsrv/monitormgr.go +++ /dev/null @@ -1,169 +0,0 @@ -package main - -import ( - "encoding/gob" - "time" - - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/basic" - "mongo.games.com/goserver/core/module" - "mongo.games.com/goserver/core/netlib" - "mongo.games.com/goserver/core/profile" - "mongo.games.com/goserver/core/schedule" - "mongo.games.com/goserver/core/task" - "mongo.games.com/goserver/core/transact" - "mongo.games.com/goserver/core/utils" - - "mongo.games.com/game/common" - "mongo.games.com/game/model" - "mongo.games.com/game/webapi" -) - -var MonitorMgrSington = &MonitorMgr{} - -type MonitorMgr struct { -} - -func (this *MonitorMgr) ModuleName() string { - return "MonitorMgr" -} - -func (this *MonitorMgr) Init() { - -} - -func (this *MonitorMgr) Update() { - //player online stats - olStats := PlayerMgrSington.StatsOnline() - log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", olStats) - if log != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.InsertMonitorData("online", log) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - - //api stats - if len(WebAPIStats) > 0 { - log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", WebAPIStats) - if log != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("webapi", log) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - apiStats := webapi.Stats() - if len(apiStats) > 0 { - log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "api", apiStats) - if log != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("webapi", log) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //logic stats - logicStats := profile.GetStats() - if len(logicStats) > 0 { - logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats) - if logLogic != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("logic", logLogic) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //net session stats - netStats := netlib.Stats() - if len(netStats) > 0 { - logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats) - if logNet != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("net", logNet) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //schedule stats - jobStats := schedule.Stats() - if len(jobStats) > 0 { - logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats) - if logJob != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("job", logJob) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //trans stats - transStats := transact.Stats() - if len(transStats) > 0 { - logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats) - if logTrans != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("transact", logTrans) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //panic stats - panicStats := utils.GetPanicStats() - if len(panicStats) > 0 { - for key, stats := range panicStats { - logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats) - if logPanic != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("panic", logPanic) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - } - - //object command quene stats - objStats := core.AppCtx.GetStats() - if len(objStats) > 0 { - logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats) - if logCmd != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMonitorData("cmdque", logCmd) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } - } - - //gorouting count, eg. system info - runtimeStats := utils.StatsRuntime() - logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats) - if logRuntime != nil { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.InsertMonitorData("runtime", logRuntime) - }), nil, "InsertMonitorData").StartByFixExecutor("monitor") - } -} - -func (this *MonitorMgr) Shutdown() { - module.UnregisteModule(this) -} - -func init() { - - //gob registe - gob.Register(model.PlayerOLStats{}) - gob.Register(map[string]*model.APITransactStats{}) - gob.Register(webapi.ApiStats{}) - gob.Register(map[string]webapi.ApiStats{}) - gob.Register(profile.TimeElement{}) - gob.Register(map[string]profile.TimeElement{}) - gob.Register(netlib.ServiceStats{}) - gob.Register(map[int]netlib.ServiceStats{}) - gob.Register(schedule.TaskStats{}) - gob.Register(map[string]schedule.TaskStats{}) - gob.Register(transact.TransStats{}) - gob.Register(map[int]transact.TransStats{}) - gob.Register(utils.PanicStackInfo{}) - gob.Register(map[string]utils.PanicStackInfo{}) - gob.Register(basic.CmdStats{}) - gob.Register(map[string]basic.CmdStats{}) - gob.Register(utils.RuntimeStats{}) - //gob registe - - module.RegisteModule(MonitorMgrSington, time.Minute*5, 0) -}