Merge branch 'dev_rpc' into develop

This commit is contained in:
sk 2024-12-27 17:37:15 +08:00
commit 500527f45e
14 changed files with 389 additions and 567 deletions

View File

@ -47,6 +47,9 @@
"data":{
"RootPath":"../data"
},
"mongox": {
"Path": "./etc/mgo.json"
},
"etcd": {
"Url": ["127.0.0.1:2379"],
"UserName": "",

85
dbproxy/monitormgr.go Normal file
View File

@ -0,0 +1,85 @@
package main
import (
"context"
"fmt"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/module"
"mongo.games.com/goserver/core/mongox"
"mongo.games.com/goserver/core/task"
"mongo.games.com/game/common"
"mongo.games.com/game/model"
)
func init() {
//module.RegisteModule(new(MonitorMgr), time.Second*30, 0)
}
type MonitorData struct {
SrvId int32 //服务器id
SrvType int32 //服务器类型
Key string //自定义key
Time time.Time //时间戳
Data interface{} //数据体
}
type MonitorMgr struct {
}
func (m *MonitorMgr) ModuleName() string {
return "MonitorMgr"
}
func (m *MonitorMgr) Init() {
}
func (m *MonitorMgr) Update() {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
var updates []mongo.WriteModel
for k, v := range GetPPCState() {
d := &MonitorData{
SrvId: int32(common.GetSelfSrvId()),
SrvType: int32(common.GetSelfSrvType()),
Key: k,
Time: time.Now(),
Data: v,
}
updates = append(updates,
mongo.NewUpdateOneModel().
SetFilter(bson.M{"srvid": d.SrvId, "srvtype": d.SrvType, "key": d.Key}).
SetUpdate(bson.M{"$set": d}).SetUpsert(true))
}
if len(updates) == 0 {
return nil
}
c, err := mongox.GetGlobalMonitorCollection(fmt.Sprintf("%s_rpc", model.MonitorPrefixName))
if err != nil {
logger.Logger.Errorf("MonitorMgr Update get collection failed")
return nil
}
opts := options.BulkWrite().SetOrdered(false) // SetOrdered(false) 表示并行执行
_, err = c.BulkWrite(context.Background(), updates, opts)
if err != nil {
logger.Logger.Errorf("MonitorMgr Update bulk write failed: %v", err)
return nil
}
return nil
}), nil, "MonitorMgr_Update").StartByFixExecutor("MonitorMgr")
}
func (m *MonitorMgr) Shutdown() {
module.UnregisteModule(m)
}

75
dbproxy/rpcstate.go Normal file
View File

@ -0,0 +1,75 @@
package main
import (
"sync"
"time"
)
const Timeout = 30 * time.Second
type RPCState struct {
RunTimes int64 //执行次数
TotalRuningTime int64 //总执行时间
MaxRuningTime int64 //最长执行时间
TimeoutTimes int64 //执行超时次数 大于30秒的次数
FailTimes int64 //执行失败次数
SuccessTimes int64 //执行成功次数
}
var RPCStateMgr = make(map[string]*RPCState)
var RPCStateMgrLock = sync.RWMutex{}
func GetPPCState() map[string]*RPCState {
ret := make(map[string]*RPCState)
RPCStateMgrLock.RLock()
defer RPCStateMgrLock.RUnlock()
for k, v := range RPCStateMgr {
e := *v // 复制一份
ret[k] = &e
}
return ret
}
//func loggingMiddleware(h http.Handler) http.Handler {
// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// RPCStateMgrLock.Lock()
// state, ok := RPCStateMgr[r.URL.Path]
// if !ok {
// state = &RPCState{}
// RPCStateMgr[r.URL.Path] = state
// }
// RPCStateMgrLock.Unlock()
//
// // 记录请求的时间戳
// start := time.Now()
//
// var buf bytes.Buffer
// if r.Body != nil {
// tee := io.TeeReader(r.Body, &buf)
// r.Body = io.NopCloser(tee)
// }
//
// logger.Logger.Infof("==>RPC %s %s %s", r.Method, r.URL.Path, buf.String())
// // 包装响应写入器以捕获响应内容
// rw := &responseWriter{w, http.StatusOK}
// h.ServeHTTP(rw, r)
// // 记录请求完成时间和响应状态码
// duration := time.Since(start).Milliseconds()
//
// RPCStateMgrLock.Lock()
// state.RunTimes++
// state.TotalRuningTime += duration
// if duration > state.MaxRuningTime {
// state.MaxRuningTime = duration
// }
// if duration > Timeout.Milliseconds() {
// state.TimeoutTimes++
// }
// if rw.statusCode != http.StatusOK {
// state.FailTimes++
// } else {
// state.SuccessTimes++
// }
// RPCStateMgrLock.Unlock()
// })
//}

View File

@ -13,6 +13,7 @@ import (
"mongo.games.com/game/dbproxy/mongo"
"mongo.games.com/game/mgrsrv/api"
"mongo.games.com/game/model"
rpcx "mongo.games.com/game/rpc"
"mongo.games.com/game/webapi"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/logger"
@ -136,6 +137,9 @@ func init() {
gob.Register(basic.CmdStats{})
gob.Register(map[string]basic.CmdStats{})
gob.Register(utils.RuntimeStats{})
gob.Register(rpcx.State{})
gob.Register(map[string]rpcx.State{})
//gob registe
rpc.Register(new(MonitorDataSvc))

View File

@ -1,133 +0,0 @@
package base
import (
"encoding/gob"
"mongo.games.com/game/common"
"mongo.games.com/game/model"
"mongo.games.com/goserver/core"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/module"
"mongo.games.com/goserver/core/netlib"
"mongo.games.com/goserver/core/profile"
"mongo.games.com/goserver/core/schedule"
"mongo.games.com/goserver/core/task"
"mongo.games.com/goserver/core/transact"
"mongo.games.com/goserver/core/utils"
"time"
)
var MonitorMgrSington = &MonitorMgr{}
type MonitorMgr struct {
}
func (this *MonitorMgr) ModuleName() string {
return "MonitorMgr"
}
func (this *MonitorMgr) Init() {
}
func (this *MonitorMgr) Update() {
//logic stats
logicStats := profile.GetStats()
if len(logicStats) > 0 {
logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats)
if logLogic != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("logic", logLogic)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//net session stats
netStats := netlib.Stats()
if len(netStats) > 0 {
logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats)
if logNet != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("net", logNet)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//schedule stats
jobStats := schedule.Stats()
if len(jobStats) > 0 {
logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats)
if logJob != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("job", logJob)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//trans stats
transStats := transact.Stats()
if len(transStats) > 0 {
logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats)
if logTrans != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("transact", logTrans)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//panic stats
panicStats := utils.GetPanicStats()
if len(panicStats) > 0 {
for key, stats := range panicStats {
logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats)
if logPanic != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("panic", logPanic)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
}
//object command quene stats
objStats := core.AppCtx.GetStats()
if len(objStats) > 0 {
logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats)
if logCmd != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("cmdque", logCmd)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//gorouting count, eg. system info
runtimeStats := utils.StatsRuntime()
logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats)
if logRuntime != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.InsertMonitorData("runtime", logRuntime)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
func (this *MonitorMgr) Shutdown() {
module.UnregisteModule(this)
}
func init() {
//gob registe
gob.Register(profile.TimeElement{})
gob.Register(map[string]profile.TimeElement{})
gob.Register(netlib.ServiceStats{})
gob.Register(map[int]netlib.ServiceStats{})
gob.Register(schedule.TaskStats{})
gob.Register(map[string]schedule.TaskStats{})
gob.Register(transact.TransStats{})
gob.Register(map[int]transact.TransStats{})
gob.Register(utils.PanicStackInfo{})
gob.Register(map[string]utils.PanicStackInfo{})
gob.Register(basic.CmdStats{})
gob.Register(map[string]basic.CmdStats{})
gob.Register(utils.RuntimeStats{})
//gob registe
module.RegisteModule(MonitorMgrSington, time.Minute*5, 0)
}

47
gob_register.go Normal file
View File

@ -0,0 +1,47 @@
package win88
import (
"encoding/gob"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/netlib"
"mongo.games.com/goserver/core/profile"
"mongo.games.com/goserver/core/schedule"
"mongo.games.com/goserver/core/transact"
"mongo.games.com/goserver/core/utils"
"mongo.games.com/game/mgrsrv/api"
"mongo.games.com/game/model"
"mongo.games.com/game/rpc"
)
func init() {
gob.Register(utils.RuntimeStats{})
gob.Register(model.PlayerOLStats{})
gob.Register(map[string]*model.APITransactStats{})
gob.Register(rpc.State{})
gob.Register(map[string]rpc.State{})
gob.Register(api.ApiStats{})
gob.Register(map[string]api.ApiStats{})
gob.Register(profile.TimeElement{})
gob.Register(map[string]profile.TimeElement{})
gob.Register(netlib.ServiceStats{})
gob.Register(map[int]netlib.ServiceStats{})
gob.Register(schedule.TaskStats{})
gob.Register(map[string]schedule.TaskStats{})
gob.Register(transact.TransStats{})
gob.Register(map[int]transact.TransStats{})
gob.Register(utils.PanicStackInfo{})
gob.Register(map[string]utils.PanicStackInfo{})
gob.Register(basic.CmdStats{})
gob.Register(map[string]basic.CmdStats{})
}

View File

@ -1,147 +0,0 @@
package main
import (
"encoding/gob"
"mongo.games.com/game/common"
"mongo.games.com/game/mgrsrv/api"
"mongo.games.com/game/model"
"mongo.games.com/goserver/core"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/module"
"mongo.games.com/goserver/core/netlib"
"mongo.games.com/goserver/core/profile"
"mongo.games.com/goserver/core/schedule"
"mongo.games.com/goserver/core/task"
"mongo.games.com/goserver/core/transact"
"mongo.games.com/goserver/core/utils"
"time"
)
var MonitorMgrSington = &MonitorMgr{}
type MonitorMgr struct {
}
func (this *MonitorMgr) ModuleName() string {
return "MonitorMgr"
}
func (this *MonitorMgr) Init() {
}
func (this *MonitorMgr) Update() {
//webapi stats
apiStats := api.Stats()
if len(apiStats) > 0 {
log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", apiStats)
if log != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("webapi", log)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//logic stats
logicStats := profile.GetStats()
if len(logicStats) > 0 {
logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats)
if logLogic != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("logic", logLogic)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//net session stats
netStats := netlib.Stats()
if len(netStats) > 0 {
logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats)
if logNet != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("net", logNet)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//schedule stats
jobStats := schedule.Stats()
if len(jobStats) > 0 {
logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats)
if logJob != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("job", logJob)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//trans stats
transStats := transact.Stats()
if len(transStats) > 0 {
logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats)
if logTrans != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("transact", logTrans)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//panic stats
panicStats := utils.GetPanicStats()
if len(panicStats) > 0 {
for key, stats := range panicStats {
logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats)
if logPanic != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("panic", logPanic)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
}
//object command quene stats
objStats := core.AppCtx.GetStats()
if len(objStats) > 0 {
logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats)
if logCmd != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("cmdque", logCmd)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//gorouting count, eg. system info
runtimeStats := utils.StatsRuntime()
logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats)
if logRuntime != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.InsertMonitorData("runtime", logRuntime)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
func (this *MonitorMgr) Shutdown() {
module.UnregisteModule(this)
}
func init() {
//gob registe
gob.Register(api.ApiStats{})
gob.Register(map[string]api.ApiStats{})
gob.Register(profile.TimeElement{})
gob.Register(map[string]profile.TimeElement{})
gob.Register(netlib.ServiceStats{})
gob.Register(map[int]netlib.ServiceStats{})
gob.Register(schedule.TaskStats{})
gob.Register(map[string]schedule.TaskStats{})
gob.Register(transact.TransStats{})
gob.Register(map[int]transact.TransStats{})
gob.Register(utils.PanicStackInfo{})
gob.Register(map[string]utils.PanicStackInfo{})
gob.Register(basic.CmdStats{})
gob.Register(map[string]basic.CmdStats{})
gob.Register(utils.RuntimeStats{})
//gob registe
module.RegisteModule(MonitorMgrSington, time.Minute*5, 0)
}

View File

@ -5,7 +5,11 @@ import (
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/task"
"mongo.games.com/game/common"
)
var (
@ -85,6 +89,60 @@ func RemoveMonitorData(t time.Time) (chged []*mgo.ChangeInfo, err error) {
return
}
type MonitorTool[T any] struct {
CollectionName string
}
func NewMonitorTool[T any](collectionName string) *MonitorTool[T] {
return &MonitorTool[T]{
CollectionName: collectionName,
}
}
func (m *MonitorTool[T]) Insert(key string, state *T) {
d := NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, state)
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return InsertMonitorData(m.CollectionName, d)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
func (m *MonitorTool[T]) InsertKV(kv map[string]*T) {
// todo 批量插入
for k, v := range kv {
m.Insert(k, v)
}
}
func (m *MonitorTool[T]) Upsert(key string, state *T) {
d := NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, state)
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return UpsertMonitorData(m.CollectionName, d)
}), nil, "UpsertMonitorData").StartByFixExecutor("monitor")
}
func (m *MonitorTool[T]) UpsertKV(kv map[string]*T) {
//todo 批量更新
for k, v := range kv {
m.Upsert(k, v)
}
}
// Remove 清理监控日志
func Remove(t time.Time) {
if t.IsZero() {
return
}
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
_, err := RemoveMonitorData(t)
if err != nil {
logger.Logger.Error("RemoveMonitorData error:", err)
}
return nil
}), nil, "RemoveMonitorData").StartByFixExecutor("monitor")
}
type PlayerOLStats struct {
PlatformStats map[string]*PlayerStats
RobotStats PlayerStats

View File

@ -1,115 +0,0 @@
package base
import (
"mongo.games.com/goserver/core/module"
)
var MonitorMgrSington = &MonitorMgr{}
type MonitorMgr struct {
}
func (this *MonitorMgr) ModuleName() string {
return "MonitorMgr"
}
func (this *MonitorMgr) Init() {
}
func (this *MonitorMgr) Update() {
//mongodb stats
//mgo.SetStats(true)
//mgoStats := mgo.GetStats()
//logMgo := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", mgoStats)
//if logMgo != nil {
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
// return model.InsertMonitorData("mgo", logMgo)
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
//}
//
////logic stats
//logicStats := profile.GetStats()
//if len(logicStats) > 0 {
// logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats)
// if logLogic != nil {
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
// return model.UpsertMonitorData("logic", logLogic)
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
// }
//}
//
////net session stats
//netStats := netlib.Stats()
//if len(netStats) > 0 {
// logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats)
// if logNet != nil {
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
// return model.UpsertMonitorData("net", logNet)
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
// }
//}
//
////schedule stats
//jobStats := schedule.Stats()
//if len(jobStats) > 0 {
// logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats)
// if logJob != nil {
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
// return model.UpsertMonitorData("job", logJob)
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
// }
//}
//
////trans stats
//transStats := transact.Stats()
//if len(transStats) > 0 {
// logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats)
// if logTrans != nil {
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
// return model.UpsertMonitorData("transact", logTrans)
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
// }
//}
//
////panic stats
//panicStats := utils.GetPanicStats()
//if len(panicStats) > 0 {
// for key, stats := range panicStats {
// logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats)
// if logPanic != nil {
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
// return model.UpsertMonitorData("panic", logPanic)
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
// }
// }
//}
//
////object command quene stats
//objStats := core.AppCtx.GetStats()
//if len(objStats) > 0 {
// logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats)
// if logCmd != nil {
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
// return model.UpsertMonitorData("cmdque", logCmd)
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
// }
//}
//
////gorouting count, eg. system info
//runtimeStats := utils.StatsRuntime()
//logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats)
//if logRuntime != nil {
// task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
// return model.InsertMonitorData("runtime", logRuntime)
// }), nil, "InsertMonitorData").StartByFixExecutor("monitor")
//}
}
func (this *MonitorMgr) Shutdown() {
module.UnregisteModule(this)
}
func init() {
//module.RegisteModule(MonitorMgrSington, time.Minute*5, 0)
}

View File

@ -79,6 +79,41 @@ func (this *RPClient) Call(serviceMethod string, args interface{}, reply interfa
}
func (this *RPClient) CallWithTimeout(serviceMethod string, args interface{}, reply interface{}, d time.Duration) error {
var err error
start := time.Now()
defer func() {
// 记录请求完成时间和响应状态码
duration := time.Since(start).Milliseconds()
StateMgrLock.Lock()
state, ok := StateMgr[serviceMethod]
if !ok {
state = new(State)
StateMgr[serviceMethod] = state
}
// 执行次数
state.RunTimes++
// 执行总时长
state.TotalRuningTime += duration
// 最长执行时长
if duration > state.MaxRuningTime {
state.MaxRuningTime = duration
}
switch {
case errors.Is(err, ErrRPClientNoConn):
state.NoConnTimes++
case errors.Is(err, ErrRPClientCallTimeout):
state.TimeoutTimes++
default:
if err != nil {
state.FailTimes++
} else {
state.SuccessTimes++
}
}
StateMgrLock.Unlock()
}()
if this.client == nil {
return ErrRPClientNoConn
}
@ -87,8 +122,6 @@ func (this *RPClient) CallWithTimeout(serviceMethod string, args interface{}, re
d = time.Second
}
start := time.Now()
var err error
call := this.client.Go(serviceMethod, args, reply, make(chan *rpc.Call, 1))
select {
case <-time.After(d):
@ -96,7 +129,7 @@ func (this *RPClient) CallWithTimeout(serviceMethod string, args interface{}, re
case call = <-call.Done:
err = call.Error
}
if err != nil && (err == rpc.ErrShutdown || err == io.ErrUnexpectedEOF) {
if err != nil && (errors.Is(err, rpc.ErrShutdown) || errors.Is(err, io.ErrUnexpectedEOF)) {
var dailed chan struct{}
if atomic.CompareAndSwapInt32(&this.connecting, 0, 1) {
dailed = make(chan struct{})

29
rpc/rpcstate.go Normal file
View File

@ -0,0 +1,29 @@
package rpc
import (
"sync"
)
type State struct {
RunTimes int64 //执行次数
TotalRuningTime int64 //总执行时间
MaxRuningTime int64 //最长执行时间
TimeoutTimes int64 //执行超时次数 大于30秒的次数
FailTimes int64 //执行失败次数
SuccessTimes int64 //执行成功次数
NoConnTimes int64 //无连接次数
}
var StateMgr = make(map[string]*State)
var StateMgrLock = sync.RWMutex{}
func GetState() map[string]*State {
ret := make(map[string]*State)
StateMgrLock.RLock()
defer StateMgrLock.RUnlock()
for k, v := range StateMgr {
e := *v // 复制一份
ret[k] = &e
}
return ret
}

View File

@ -54,5 +54,6 @@ func main() {
schedule.StartTask()
//启动业务模块
waiter := module.Start()
StartMonitor()
waiter.Wait("main()")
}

51
worldsrv/monitor.go Normal file
View File

@ -0,0 +1,51 @@
package main
import (
"encoding/json"
"fmt"
"os"
"time"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/timer"
"mongo.games.com/game/model"
"mongo.games.com/game/rpc"
)
var (
monitorRpc = model.NewMonitorTool[rpc.State]("rpc")
)
func StartMonitor() {
timer.StartTimer(timer.TimerActionWrapper(func(h timer.TimerHandle, ud interface{}) bool {
//monitorRpc.InsertKV(rpc.GetState())
//保存到本地文件
go func() {
err := saveToFile("rpc.json", rpc.GetState())
if err != nil {
logger.Logger.Errorf("rpc.json failed to save to file: %v", err)
}
}()
return true
}), nil, time.Second*10, -1)
}
// 保存变量到文件(覆盖方式)
func saveToFile(filename string, data interface{}) error {
// 将数据转换为JSON字节
content, err := json.MarshalIndent(data, "", " ")
if err != nil {
return fmt.Errorf("failed to serialize data: %w", err)
}
// 写入文件,覆盖模式
err = os.WriteFile(filename, content, 0644)
if err != nil {
return fmt.Errorf("failed to write to file: %w", err)
}
return nil
}

View File

@ -1,169 +0,0 @@
package main
import (
"encoding/gob"
"time"
"mongo.games.com/goserver/core"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/module"
"mongo.games.com/goserver/core/netlib"
"mongo.games.com/goserver/core/profile"
"mongo.games.com/goserver/core/schedule"
"mongo.games.com/goserver/core/task"
"mongo.games.com/goserver/core/transact"
"mongo.games.com/goserver/core/utils"
"mongo.games.com/game/common"
"mongo.games.com/game/model"
"mongo.games.com/game/webapi"
)
var MonitorMgrSington = &MonitorMgr{}
type MonitorMgr struct {
}
func (this *MonitorMgr) ModuleName() string {
return "MonitorMgr"
}
func (this *MonitorMgr) Init() {
}
func (this *MonitorMgr) Update() {
//player online stats
olStats := PlayerMgrSington.StatsOnline()
log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", olStats)
if log != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.InsertMonitorData("online", log)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
//api stats
if len(WebAPIStats) > 0 {
log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", WebAPIStats)
if log != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("webapi", log)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
apiStats := webapi.Stats()
if len(apiStats) > 0 {
log := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "api", apiStats)
if log != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("webapi", log)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//logic stats
logicStats := profile.GetStats()
if len(logicStats) > 0 {
logLogic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", logicStats)
if logLogic != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("logic", logLogic)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//net session stats
netStats := netlib.Stats()
if len(netStats) > 0 {
logNet := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", netStats)
if logNet != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("net", logNet)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//schedule stats
jobStats := schedule.Stats()
if len(jobStats) > 0 {
logJob := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", jobStats)
if logJob != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("job", logJob)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//trans stats
transStats := transact.Stats()
if len(transStats) > 0 {
logTrans := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", transStats)
if logTrans != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("transact", logTrans)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//panic stats
panicStats := utils.GetPanicStats()
if len(panicStats) > 0 {
for key, stats := range panicStats {
logPanic := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), key, stats)
if logPanic != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("panic", logPanic)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
}
//object command quene stats
objStats := core.AppCtx.GetStats()
if len(objStats) > 0 {
logCmd := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "obj", objStats)
if logCmd != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.UpsertMonitorData("cmdque", logCmd)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
//gorouting count, eg. system info
runtimeStats := utils.StatsRuntime()
logRuntime := model.NewMonitorData(int32(common.GetSelfSrvId()), int32(common.GetSelfSrvType()), "", runtimeStats)
if logRuntime != nil {
task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} {
return model.InsertMonitorData("runtime", logRuntime)
}), nil, "InsertMonitorData").StartByFixExecutor("monitor")
}
}
func (this *MonitorMgr) Shutdown() {
module.UnregisteModule(this)
}
func init() {
//gob registe
gob.Register(model.PlayerOLStats{})
gob.Register(map[string]*model.APITransactStats{})
gob.Register(webapi.ApiStats{})
gob.Register(map[string]webapi.ApiStats{})
gob.Register(profile.TimeElement{})
gob.Register(map[string]profile.TimeElement{})
gob.Register(netlib.ServiceStats{})
gob.Register(map[int]netlib.ServiceStats{})
gob.Register(schedule.TaskStats{})
gob.Register(map[string]schedule.TaskStats{})
gob.Register(transact.TransStats{})
gob.Register(map[int]transact.TransStats{})
gob.Register(utils.PanicStackInfo{})
gob.Register(map[string]utils.PanicStackInfo{})
gob.Register(basic.CmdStats{})
gob.Register(map[string]basic.CmdStats{})
gob.Register(utils.RuntimeStats{})
//gob registe
module.RegisteModule(MonitorMgrSington, time.Minute*5, 0)
}