add rpc监控
This commit is contained in:
parent
d8bee4bd73
commit
c8df660c82
|
@ -47,6 +47,9 @@
|
|||
"data":{
|
||||
"RootPath":"../data"
|
||||
},
|
||||
"mongox": {
|
||||
"Path": "./etc/mgo.json"
|
||||
},
|
||||
"etcd": {
|
||||
"Url": ["127.0.0.1:2379"],
|
||||
"UserName": "",
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
|
||||
"mongo.games.com/goserver/core/basic"
|
||||
"mongo.games.com/goserver/core/logger"
|
||||
"mongo.games.com/goserver/core/module"
|
||||
"mongo.games.com/goserver/core/mongox"
|
||||
"mongo.games.com/goserver/core/task"
|
||||
|
||||
"mongo.games.com/game/common"
|
||||
"mongo.games.com/game/model"
|
||||
)
|
||||
|
||||
func init() {
|
||||
//module.RegisteModule(new(MonitorMgr), time.Second*30, 0)
|
||||
}
|
||||
|
||||
type MonitorData struct {
|
||||
SrvId int32 //服务器id
|
||||
SrvType int32 //服务器类型
|
||||
Key string //自定义key
|
||||
Time time.Time //时间戳
|
||||
Data interface{} //数据体
|
||||
}
|
||||
|
||||
type MonitorMgr struct {
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) ModuleName() string {
|
||||
return "MonitorMgr"
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) Init() {
|
||||
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) Update() {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
var updates []mongo.WriteModel
|
||||
for k, v := range GetPPCState() {
|
||||
d := &MonitorData{
|
||||
SrvId: int32(common.GetSelfSrvId()),
|
||||
SrvType: int32(common.GetSelfSrvType()),
|
||||
Key: k,
|
||||
Time: time.Now(),
|
||||
Data: v,
|
||||
}
|
||||
updates = append(updates,
|
||||
mongo.NewUpdateOneModel().
|
||||
SetFilter(bson.M{"srvid": d.SrvId, "srvtype": d.SrvType, "key": d.Key}).
|
||||
SetUpdate(bson.M{"$set": d}).SetUpsert(true))
|
||||
}
|
||||
|
||||
if len(updates) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
c, err := mongox.GetGlobalMonitorCollection(fmt.Sprintf("%s_rpc", model.MonitorPrefixName))
|
||||
if err != nil {
|
||||
logger.Logger.Errorf("MonitorMgr Update get collection failed")
|
||||
return nil
|
||||
}
|
||||
|
||||
opts := options.BulkWrite().SetOrdered(false) // SetOrdered(false) 表示并行执行
|
||||
_, err = c.BulkWrite(context.Background(), updates, opts)
|
||||
if err != nil {
|
||||
logger.Logger.Errorf("MonitorMgr Update bulk write failed: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}), nil, "MonitorMgr_Update").StartByFixExecutor("MonitorMgr")
|
||||
}
|
||||
|
||||
func (m *MonitorMgr) Shutdown() {
|
||||
module.UnregisteModule(m)
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const Timeout = 30 * time.Second
|
||||
|
||||
type RPCState struct {
|
||||
RunTimes int64 //执行次数
|
||||
TotalRuningTime int64 //总执行时间
|
||||
MaxRuningTime int64 //最长执行时间
|
||||
TimeoutTimes int64 //执行超时次数 大于30秒的次数
|
||||
FailTimes int64 //执行失败次数
|
||||
SuccessTimes int64 //执行成功次数
|
||||
}
|
||||
|
||||
var RPCStateMgr = make(map[string]*RPCState)
|
||||
var RPCStateMgrLock = sync.RWMutex{}
|
||||
|
||||
func GetPPCState() map[string]*RPCState {
|
||||
ret := make(map[string]*RPCState)
|
||||
RPCStateMgrLock.RLock()
|
||||
defer RPCStateMgrLock.RUnlock()
|
||||
for k, v := range RPCStateMgr {
|
||||
e := *v // 复制一份
|
||||
ret[k] = &e
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
//func loggingMiddleware(h http.Handler) http.Handler {
|
||||
// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// RPCStateMgrLock.Lock()
|
||||
// state, ok := RPCStateMgr[r.URL.Path]
|
||||
// if !ok {
|
||||
// state = &RPCState{}
|
||||
// RPCStateMgr[r.URL.Path] = state
|
||||
// }
|
||||
// RPCStateMgrLock.Unlock()
|
||||
//
|
||||
// // 记录请求的时间戳
|
||||
// start := time.Now()
|
||||
//
|
||||
// var buf bytes.Buffer
|
||||
// if r.Body != nil {
|
||||
// tee := io.TeeReader(r.Body, &buf)
|
||||
// r.Body = io.NopCloser(tee)
|
||||
// }
|
||||
//
|
||||
// logger.Logger.Infof("==>RPC %s %s %s", r.Method, r.URL.Path, buf.String())
|
||||
// // 包装响应写入器以捕获响应内容
|
||||
// rw := &responseWriter{w, http.StatusOK}
|
||||
// h.ServeHTTP(rw, r)
|
||||
// // 记录请求完成时间和响应状态码
|
||||
// duration := time.Since(start).Milliseconds()
|
||||
//
|
||||
// RPCStateMgrLock.Lock()
|
||||
// state.RunTimes++
|
||||
// state.TotalRuningTime += duration
|
||||
// if duration > state.MaxRuningTime {
|
||||
// state.MaxRuningTime = duration
|
||||
// }
|
||||
// if duration > Timeout.Milliseconds() {
|
||||
// state.TimeoutTimes++
|
||||
// }
|
||||
// if rw.statusCode != http.StatusOK {
|
||||
// state.FailTimes++
|
||||
// } else {
|
||||
// state.SuccessTimes++
|
||||
// }
|
||||
// RPCStateMgrLock.Unlock()
|
||||
// })
|
||||
//}
|
|
@ -13,6 +13,7 @@ import (
|
|||
"mongo.games.com/game/dbproxy/mongo"
|
||||
"mongo.games.com/game/mgrsrv/api"
|
||||
"mongo.games.com/game/model"
|
||||
rpcx "mongo.games.com/game/rpc"
|
||||
"mongo.games.com/game/webapi"
|
||||
"mongo.games.com/goserver/core/basic"
|
||||
"mongo.games.com/goserver/core/logger"
|
||||
|
@ -136,6 +137,9 @@ func init() {
|
|||
gob.Register(basic.CmdStats{})
|
||||
gob.Register(map[string]basic.CmdStats{})
|
||||
gob.Register(utils.RuntimeStats{})
|
||||
gob.Register(rpcx.State{})
|
||||
gob.Register(map[string]rpcx.State{})
|
||||
|
||||
//gob registe
|
||||
|
||||
rpc.Register(new(MonitorDataSvc))
|
||||
|
|
|
@ -1,133 +0,0 @@
|
|||
package base
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"mongo.games.com/game/common"
|
||||
"mongo.games.com/game/model"
|
||||
"mongo.games.com/goserver/core"
|
||||
"mongo.games.com/goserver/core/basic"
|
||||
"mongo.games.com/goserver/core/module"
|
||||
"mongo.games.com/goserver/core/netlib"
|
||||
"mongo.games.com/goserver/core/profile"
|
||||
"mongo.games.com/goserver/core/schedule"
|
||||
"mongo.games.com/goserver/core/task"
|
||||
"mongo.games.com/goserver/core/transact"
|
||||
"mongo.games.com/goserver/core/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
var MonitorMgrSington = &MonitorMgr{}
|
||||
|
||||
type MonitorMgr struct {
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) ModuleName() string {
|
||||
return "MonitorMgr"
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Init() {
|
||||
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Update() {
|
||||
//logic stats
|
||||
logicStats := profile.GetStats()
|
||||
if len(logicStats) > 0 {
|
||||
logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats)
|
||||
if logLogic != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("logic", logLogic)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//net session stats
|
||||
netStats := netlib.Stats()
|
||||
if len(netStats) > 0 {
|
||||
logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats)
|
||||
if logNet != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("net", logNet)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//schedule stats
|
||||
jobStats := schedule.Stats()
|
||||
if len(jobStats) > 0 {
|
||||
logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats)
|
||||
if logJob != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("job", logJob)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//trans stats
|
||||
transStats := transact.Stats()
|
||||
if len(transStats) > 0 {
|
||||
logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats)
|
||||
if logTrans != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("transact", logTrans)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//panic stats
|
||||
panicStats := utils.GetPanicStats()
|
||||
if len(panicStats) > 0 {
|
||||
for key, stats := range panicStats {
|
||||
logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats)
|
||||
if logPanic != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("panic", logPanic)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//object command quene stats
|
||||
objStats := core.AppCtx.GetStats()
|
||||
if len(objStats) > 0 {
|
||||
logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats)
|
||||
if logCmd != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("cmdque", logCmd)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//gorouting count, eg. system info
|
||||
runtimeStats := utils.StatsRuntime()
|
||||
logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats)
|
||||
if logRuntime != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.InsertMonitorData("runtime", logRuntime)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Shutdown() {
|
||||
module.UnregisteModule(this)
|
||||
}
|
||||
|
||||
func init() {
|
||||
//gob registe
|
||||
gob.Register(profile.TimeElement{})
|
||||
gob.Register(map[string]profile.TimeElement{})
|
||||
gob.Register(netlib.ServiceStats{})
|
||||
gob.Register(map[int]netlib.ServiceStats{})
|
||||
gob.Register(schedule.TaskStats{})
|
||||
gob.Register(map[string]schedule.TaskStats{})
|
||||
gob.Register(transact.TransStats{})
|
||||
gob.Register(map[int]transact.TransStats{})
|
||||
gob.Register(utils.PanicStackInfo{})
|
||||
gob.Register(map[string]utils.PanicStackInfo{})
|
||||
gob.Register(basic.CmdStats{})
|
||||
gob.Register(map[string]basic.CmdStats{})
|
||||
gob.Register(utils.RuntimeStats{})
|
||||
//gob registe
|
||||
|
||||
module.RegisteModule(MonitorMgrSington, time.Minute*5, 0)
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package win88
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
|
||||
"mongo.games.com/goserver/core/basic"
|
||||
"mongo.games.com/goserver/core/netlib"
|
||||
"mongo.games.com/goserver/core/profile"
|
||||
"mongo.games.com/goserver/core/schedule"
|
||||
"mongo.games.com/goserver/core/transact"
|
||||
"mongo.games.com/goserver/core/utils"
|
||||
|
||||
"mongo.games.com/game/mgrsrv/api"
|
||||
"mongo.games.com/game/model"
|
||||
"mongo.games.com/game/rpc"
|
||||
)
|
||||
|
||||
func init() {
|
||||
gob.Register(utils.RuntimeStats{})
|
||||
|
||||
gob.Register(model.PlayerOLStats{})
|
||||
gob.Register(map[string]*model.APITransactStats{})
|
||||
|
||||
gob.Register(rpc.State{})
|
||||
gob.Register(map[string]rpc.State{})
|
||||
|
||||
gob.Register(api.ApiStats{})
|
||||
gob.Register(map[string]api.ApiStats{})
|
||||
|
||||
gob.Register(profile.TimeElement{})
|
||||
gob.Register(map[string]profile.TimeElement{})
|
||||
|
||||
gob.Register(netlib.ServiceStats{})
|
||||
gob.Register(map[int]netlib.ServiceStats{})
|
||||
|
||||
gob.Register(schedule.TaskStats{})
|
||||
gob.Register(map[string]schedule.TaskStats{})
|
||||
|
||||
gob.Register(transact.TransStats{})
|
||||
gob.Register(map[int]transact.TransStats{})
|
||||
|
||||
gob.Register(utils.PanicStackInfo{})
|
||||
gob.Register(map[string]utils.PanicStackInfo{})
|
||||
|
||||
gob.Register(basic.CmdStats{})
|
||||
gob.Register(map[string]basic.CmdStats{})
|
||||
}
|
|
@ -1,147 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"mongo.games.com/game/common"
|
||||
"mongo.games.com/game/mgrsrv/api"
|
||||
"mongo.games.com/game/model"
|
||||
"mongo.games.com/goserver/core"
|
||||
"mongo.games.com/goserver/core/basic"
|
||||
"mongo.games.com/goserver/core/module"
|
||||
"mongo.games.com/goserver/core/netlib"
|
||||
"mongo.games.com/goserver/core/profile"
|
||||
"mongo.games.com/goserver/core/schedule"
|
||||
"mongo.games.com/goserver/core/task"
|
||||
"mongo.games.com/goserver/core/transact"
|
||||
"mongo.games.com/goserver/core/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
var MonitorMgrSington = &MonitorMgr{}
|
||||
|
||||
type MonitorMgr struct {
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) ModuleName() string {
|
||||
return "MonitorMgr"
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Init() {
|
||||
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Update() {
|
||||
//webapi stats
|
||||
apiStats := api.Stats()
|
||||
if len(apiStats) > 0 {
|
||||
log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", apiStats)
|
||||
if log != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("webapi", log)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//logic stats
|
||||
logicStats := profile.GetStats()
|
||||
if len(logicStats) > 0 {
|
||||
logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats)
|
||||
if logLogic != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("logic", logLogic)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//net session stats
|
||||
netStats := netlib.Stats()
|
||||
if len(netStats) > 0 {
|
||||
logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats)
|
||||
if logNet != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("net", logNet)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//schedule stats
|
||||
jobStats := schedule.Stats()
|
||||
if len(jobStats) > 0 {
|
||||
logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats)
|
||||
if logJob != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("job", logJob)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//trans stats
|
||||
transStats := transact.Stats()
|
||||
if len(transStats) > 0 {
|
||||
logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats)
|
||||
if logTrans != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("transact", logTrans)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//panic stats
|
||||
panicStats := utils.GetPanicStats()
|
||||
if len(panicStats) > 0 {
|
||||
for key, stats := range panicStats {
|
||||
logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats)
|
||||
if logPanic != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("panic", logPanic)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//object command quene stats
|
||||
objStats := core.AppCtx.GetStats()
|
||||
if len(objStats) > 0 {
|
||||
logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats)
|
||||
if logCmd != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("cmdque", logCmd)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//gorouting count, eg. system info
|
||||
runtimeStats := utils.StatsRuntime()
|
||||
logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats)
|
||||
if logRuntime != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.InsertMonitorData("runtime", logRuntime)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Shutdown() {
|
||||
module.UnregisteModule(this)
|
||||
}
|
||||
|
||||
func init() {
|
||||
//gob registe
|
||||
gob.Register(api.ApiStats{})
|
||||
gob.Register(map[string]api.ApiStats{})
|
||||
gob.Register(profile.TimeElement{})
|
||||
gob.Register(map[string]profile.TimeElement{})
|
||||
gob.Register(netlib.ServiceStats{})
|
||||
gob.Register(map[int]netlib.ServiceStats{})
|
||||
gob.Register(schedule.TaskStats{})
|
||||
gob.Register(map[string]schedule.TaskStats{})
|
||||
gob.Register(transact.TransStats{})
|
||||
gob.Register(map[int]transact.TransStats{})
|
||||
gob.Register(utils.PanicStackInfo{})
|
||||
gob.Register(map[string]utils.PanicStackInfo{})
|
||||
gob.Register(basic.CmdStats{})
|
||||
gob.Register(map[string]basic.CmdStats{})
|
||||
gob.Register(utils.RuntimeStats{})
|
||||
//gob registe
|
||||
|
||||
module.RegisteModule(MonitorMgrSington, time.Minute*5, 0)
|
||||
}
|
|
@ -5,7 +5,11 @@ import (
|
|||
|
||||
"github.com/globalsign/mgo"
|
||||
"github.com/globalsign/mgo/bson"
|
||||
"mongo.games.com/goserver/core/basic"
|
||||
"mongo.games.com/goserver/core/logger"
|
||||
"mongo.games.com/goserver/core/task"
|
||||
|
||||
"mongo.games.com/game/common"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -85,6 +89,60 @@ func RemoveMonitorData(t time.Time) (chged []*mgo.ChangeInfo, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
type MonitorTool[T any] struct {
|
||||
CollectionName string
|
||||
}
|
||||
|
||||
func NewMonitorTool[T any](collectionName string) *MonitorTool[T] {
|
||||
return &MonitorTool[T]{
|
||||
CollectionName: collectionName,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MonitorTool[T]) Insert(key string, state *T) {
|
||||
d := NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, state)
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return InsertMonitorData(m.CollectionName, d)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
|
||||
}
|
||||
|
||||
func (m *MonitorTool[T]) InsertKV(kv map[string]*T) {
|
||||
// todo 批量插入
|
||||
for k, v := range kv {
|
||||
m.Insert(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MonitorTool[T]) Upsert(key string, state *T) {
|
||||
d := NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, state)
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return UpsertMonitorData(m.CollectionName, d)
|
||||
}), nil, "UpsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
|
||||
func (m *MonitorTool[T]) UpsertKV(kv map[string]*T) {
|
||||
//todo 批量更新
|
||||
for k, v := range kv {
|
||||
m.Upsert(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove 清理监控日志
|
||||
func Remove(t time.Time) {
|
||||
if t.IsZero() {
|
||||
return
|
||||
}
|
||||
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
_, err := RemoveMonitorData(t)
|
||||
if err != nil {
|
||||
logger.Logger.Error("RemoveMonitorData error:", err)
|
||||
}
|
||||
return nil
|
||||
}), nil, "RemoveMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
|
||||
type PlayerOLStats struct {
|
||||
PlatformStats map[string]*PlayerStats
|
||||
RobotStats PlayerStats
|
||||
|
|
|
@ -1,115 +0,0 @@
|
|||
package base
|
||||
|
||||
import (
|
||||
"mongo.games.com/goserver/core/module"
|
||||
)
|
||||
|
||||
var MonitorMgrSington = &MonitorMgr{}
|
||||
|
||||
type MonitorMgr struct {
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) ModuleName() string {
|
||||
return "MonitorMgr"
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Init() {
|
||||
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Update() {
|
||||
//mongodb stats
|
||||
//mgo.SetStats(true)
|
||||
//mgoStats := mgo.GetStats()
|
||||
//logMgo := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", mgoStats)
|
||||
//if logMgo != nil {
|
||||
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
// return model.InsertMonitorData("mgo", logMgo)
|
||||
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
//}
|
||||
//
|
||||
////logic stats
|
||||
//logicStats := profile.GetStats()
|
||||
//if len(logicStats) > 0 {
|
||||
// logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats)
|
||||
// if logLogic != nil {
|
||||
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
// return model.UpsertMonitorData("logic", logLogic)
|
||||
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
// }
|
||||
//}
|
||||
//
|
||||
////net session stats
|
||||
//netStats := netlib.Stats()
|
||||
//if len(netStats) > 0 {
|
||||
// logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats)
|
||||
// if logNet != nil {
|
||||
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
// return model.UpsertMonitorData("net", logNet)
|
||||
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
// }
|
||||
//}
|
||||
//
|
||||
////schedule stats
|
||||
//jobStats := schedule.Stats()
|
||||
//if len(jobStats) > 0 {
|
||||
// logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats)
|
||||
// if logJob != nil {
|
||||
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
// return model.UpsertMonitorData("job", logJob)
|
||||
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
// }
|
||||
//}
|
||||
//
|
||||
////trans stats
|
||||
//transStats := transact.Stats()
|
||||
//if len(transStats) > 0 {
|
||||
// logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats)
|
||||
// if logTrans != nil {
|
||||
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
// return model.UpsertMonitorData("transact", logTrans)
|
||||
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
// }
|
||||
//}
|
||||
//
|
||||
////panic stats
|
||||
//panicStats := utils.GetPanicStats()
|
||||
//if len(panicStats) > 0 {
|
||||
// for key, stats := range panicStats {
|
||||
// logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats)
|
||||
// if logPanic != nil {
|
||||
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
// return model.UpsertMonitorData("panic", logPanic)
|
||||
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
//
|
||||
////object command quene stats
|
||||
//objStats := core.AppCtx.GetStats()
|
||||
//if len(objStats) > 0 {
|
||||
// logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats)
|
||||
// if logCmd != nil {
|
||||
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
// return model.UpsertMonitorData("cmdque", logCmd)
|
||||
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
// }
|
||||
//}
|
||||
//
|
||||
////gorouting count, eg. system info
|
||||
//runtimeStats := utils.StatsRuntime()
|
||||
//logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats)
|
||||
//if logRuntime != nil {
|
||||
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
// return model.InsertMonitorData("runtime", logRuntime)
|
||||
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
//}
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Shutdown() {
|
||||
module.UnregisteModule(this)
|
||||
}
|
||||
|
||||
func init() {
|
||||
//module.RegisteModule(MonitorMgrSington, time.Minute*5, 0)
|
||||
}
|
|
@ -79,6 +79,41 @@ func (this *RPClient) Call(serviceMethod string, args interface{}, reply interfa
|
|||
}
|
||||
|
||||
func (this *RPClient) CallWithTimeout(serviceMethod string, args interface{}, reply interface{}, d time.Duration) error {
|
||||
var err error
|
||||
start := time.Now()
|
||||
|
||||
defer func() {
|
||||
// 记录请求完成时间和响应状态码
|
||||
duration := time.Since(start).Milliseconds()
|
||||
StateMgrLock.Lock()
|
||||
state, ok := StateMgr[serviceMethod]
|
||||
if !ok {
|
||||
state = new(State)
|
||||
StateMgr[serviceMethod] = state
|
||||
}
|
||||
// 执行次数
|
||||
state.RunTimes++
|
||||
// 执行总时长
|
||||
state.TotalRuningTime += duration
|
||||
// 最长执行时长
|
||||
if duration > state.MaxRuningTime {
|
||||
state.MaxRuningTime = duration
|
||||
}
|
||||
switch {
|
||||
case errors.Is(err, ErrRPClientNoConn):
|
||||
state.NoConnTimes++
|
||||
case errors.Is(err, ErrRPClientCallTimeout):
|
||||
state.TimeoutTimes++
|
||||
default:
|
||||
if err != nil {
|
||||
state.FailTimes++
|
||||
} else {
|
||||
state.SuccessTimes++
|
||||
}
|
||||
}
|
||||
StateMgrLock.Unlock()
|
||||
}()
|
||||
|
||||
if this.client == nil {
|
||||
return ErrRPClientNoConn
|
||||
}
|
||||
|
@ -87,8 +122,6 @@ func (this *RPClient) CallWithTimeout(serviceMethod string, args interface{}, re
|
|||
d = time.Second
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
var err error
|
||||
call := this.client.Go(serviceMethod, args, reply, make(chan *rpc.Call, 1))
|
||||
select {
|
||||
case <-time.After(d):
|
||||
|
@ -96,7 +129,7 @@ func (this *RPClient) CallWithTimeout(serviceMethod string, args interface{}, re
|
|||
case call = <-call.Done:
|
||||
err = call.Error
|
||||
}
|
||||
if err != nil && (err == rpc.ErrShutdown || err == io.ErrUnexpectedEOF) {
|
||||
if err != nil && (errors.Is(err, rpc.ErrShutdown) || errors.Is(err, io.ErrUnexpectedEOF)) {
|
||||
var dailed chan struct{}
|
||||
if atomic.CompareAndSwapInt32(&this.connecting, 0, 1) {
|
||||
dailed = make(chan struct{})
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
package rpc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type State struct {
|
||||
RunTimes int64 //执行次数
|
||||
TotalRuningTime int64 //总执行时间
|
||||
MaxRuningTime int64 //最长执行时间
|
||||
TimeoutTimes int64 //执行超时次数 大于30秒的次数
|
||||
FailTimes int64 //执行失败次数
|
||||
SuccessTimes int64 //执行成功次数
|
||||
NoConnTimes int64 //无连接次数
|
||||
}
|
||||
|
||||
var StateMgr = make(map[string]*State)
|
||||
var StateMgrLock = sync.RWMutex{}
|
||||
|
||||
func GetState() map[string]*State {
|
||||
ret := make(map[string]*State)
|
||||
StateMgrLock.RLock()
|
||||
defer StateMgrLock.RUnlock()
|
||||
for k, v := range StateMgr {
|
||||
e := *v // 复制一份
|
||||
ret[k] = &e
|
||||
}
|
||||
return ret
|
||||
}
|
|
@ -54,5 +54,6 @@ func main() {
|
|||
schedule.StartTask()
|
||||
//启动业务模块
|
||||
waiter := module.Start()
|
||||
StartMonitor()
|
||||
waiter.Wait("main()")
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"mongo.games.com/goserver/core/logger"
|
||||
"mongo.games.com/goserver/core/timer"
|
||||
|
||||
"mongo.games.com/game/model"
|
||||
"mongo.games.com/game/rpc"
|
||||
)
|
||||
|
||||
var (
|
||||
monitorRpc = model.NewMonitorTool[rpc.State]("rpc")
|
||||
)
|
||||
|
||||
func StartMonitor() {
|
||||
timer.StartTimer(timer.TimerActionWrapper(func(h timer.TimerHandle, ud interface{}) bool {
|
||||
//monitorRpc.InsertKV(rpc.GetState())
|
||||
|
||||
//保存到本地文件
|
||||
go func() {
|
||||
err := saveToFile("rpc.json", rpc.GetState())
|
||||
if err != nil {
|
||||
logger.Logger.Errorf("rpc.json failed to save to file: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return true
|
||||
}), nil, time.Second*10, -1)
|
||||
}
|
||||
|
||||
// 保存变量到文件(覆盖方式)
|
||||
func saveToFile(filename string, data interface{}) error {
|
||||
// 将数据转换为JSON字节
|
||||
content, err := json.MarshalIndent(data, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to serialize data: %w", err)
|
||||
}
|
||||
|
||||
// 写入文件,覆盖模式
|
||||
err = os.WriteFile(filename, content, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write to file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,169 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"time"
|
||||
|
||||
"mongo.games.com/goserver/core"
|
||||
"mongo.games.com/goserver/core/basic"
|
||||
"mongo.games.com/goserver/core/module"
|
||||
"mongo.games.com/goserver/core/netlib"
|
||||
"mongo.games.com/goserver/core/profile"
|
||||
"mongo.games.com/goserver/core/schedule"
|
||||
"mongo.games.com/goserver/core/task"
|
||||
"mongo.games.com/goserver/core/transact"
|
||||
"mongo.games.com/goserver/core/utils"
|
||||
|
||||
"mongo.games.com/game/common"
|
||||
"mongo.games.com/game/model"
|
||||
"mongo.games.com/game/webapi"
|
||||
)
|
||||
|
||||
var MonitorMgrSington = &MonitorMgr{}
|
||||
|
||||
type MonitorMgr struct {
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) ModuleName() string {
|
||||
return "MonitorMgr"
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Init() {
|
||||
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Update() {
|
||||
//player online stats
|
||||
olStats := PlayerMgrSington.StatsOnline()
|
||||
log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", olStats)
|
||||
if log != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.InsertMonitorData("online", log)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
|
||||
//api stats
|
||||
if len(WebAPIStats) > 0 {
|
||||
log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", WebAPIStats)
|
||||
if log != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("webapi", log)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
apiStats := webapi.Stats()
|
||||
if len(apiStats) > 0 {
|
||||
log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "api", apiStats)
|
||||
if log != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("webapi", log)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//logic stats
|
||||
logicStats := profile.GetStats()
|
||||
if len(logicStats) > 0 {
|
||||
logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats)
|
||||
if logLogic != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("logic", logLogic)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//net session stats
|
||||
netStats := netlib.Stats()
|
||||
if len(netStats) > 0 {
|
||||
logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats)
|
||||
if logNet != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("net", logNet)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//schedule stats
|
||||
jobStats := schedule.Stats()
|
||||
if len(jobStats) > 0 {
|
||||
logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats)
|
||||
if logJob != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("job", logJob)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//trans stats
|
||||
transStats := transact.Stats()
|
||||
if len(transStats) > 0 {
|
||||
logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats)
|
||||
if logTrans != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("transact", logTrans)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//panic stats
|
||||
panicStats := utils.GetPanicStats()
|
||||
if len(panicStats) > 0 {
|
||||
for key, stats := range panicStats {
|
||||
logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats)
|
||||
if logPanic != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("panic", logPanic)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//object command quene stats
|
||||
objStats := core.AppCtx.GetStats()
|
||||
if len(objStats) > 0 {
|
||||
logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats)
|
||||
if logCmd != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.UpsertMonitorData("cmdque", logCmd)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
//gorouting count, eg. system info
|
||||
runtimeStats := utils.StatsRuntime()
|
||||
logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats)
|
||||
if logRuntime != nil {
|
||||
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
|
||||
return model.InsertMonitorData("runtime", logRuntime)
|
||||
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
|
||||
}
|
||||
}
|
||||
|
||||
func (this *MonitorMgr) Shutdown() {
|
||||
module.UnregisteModule(this)
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
//gob registe
|
||||
gob.Register(model.PlayerOLStats{})
|
||||
gob.Register(map[string]*model.APITransactStats{})
|
||||
gob.Register(webapi.ApiStats{})
|
||||
gob.Register(map[string]webapi.ApiStats{})
|
||||
gob.Register(profile.TimeElement{})
|
||||
gob.Register(map[string]profile.TimeElement{})
|
||||
gob.Register(netlib.ServiceStats{})
|
||||
gob.Register(map[int]netlib.ServiceStats{})
|
||||
gob.Register(schedule.TaskStats{})
|
||||
gob.Register(map[string]schedule.TaskStats{})
|
||||
gob.Register(transact.TransStats{})
|
||||
gob.Register(map[int]transact.TransStats{})
|
||||
gob.Register(utils.PanicStackInfo{})
|
||||
gob.Register(map[string]utils.PanicStackInfo{})
|
||||
gob.Register(basic.CmdStats{})
|
||||
gob.Register(map[string]basic.CmdStats{})
|
||||
gob.Register(utils.RuntimeStats{})
|
||||
//gob registe
|
||||
|
||||
module.RegisteModule(MonitorMgrSington, time.Minute*5, 0)
|
||||
}
|
Loading…
Reference in New Issue