rabbitmq优化

This commit is contained in:
sk 2024-09-29 16:25:50 +08:00
parent c628f8876f
commit a38f8fa366
19 changed files with 282 additions and 127 deletions

View File

@ -12,7 +12,7 @@ import (
_ "mongo.games.com/game"
"mongo.games.com/game/common"
_ "mongo.games.com/game/dbproxy/mq"
dbmq "mongo.games.com/game/dbproxy/mq"
"mongo.games.com/game/dbproxy/svc"
"mongo.games.com/game/model"
"mongo.games.com/game/mq"
@ -20,6 +20,7 @@ import (
)
func main() {
dbmq.Init()
// 自定义配置文件
model.InitGameParam()
// package模块

View File

@ -12,7 +12,7 @@ import (
)
func init() {
mq.RegisterSubscriber(model.APILogCollName, func(e broker.Event) (err error) {
mq.RegisterSubscriber(mq.MgrAPILog, func(e broker.Event) (err error) {
msg := e.Message()
if msg != nil {
defer func() {
@ -32,5 +32,5 @@ func init() {
return
}
return nil
}, broker.Queue(model.APILogCollName), broker.DisableAutoAck(), rabbitmq.DurableQueue())
}, broker.Queue(mq.MgrAPILog), broker.DisableAutoAck(), rabbitmq.DurableQueue())
}

View File

@ -26,7 +26,7 @@ func init() {
}
if log.Count == 0 { //玩家冲账探针
mq.Send(model.TopicProbeCoinLogAck, log)
mq.Send(mq.WorldLogCoinAck, log)
} else {
c := svc.CoinLogsCollection(log.Platform)
if c != nil {

View File

@ -1,11 +1,6 @@
package mq
import (
"encoding/json"
"mongo.games.com/goserver/core/broker"
"mongo.games.com/goserver/core/broker/rabbitmq"
"mongo.games.com/game/dbproxy/svc"
"mongo.games.com/game/model"
"mongo.games.com/game/mq"
@ -13,48 +8,39 @@ import (
func init() {
// 竞技馆对局记录
mq.RegisterSubscriber(mq.DBCustomLog, func(e broker.Event) (err error) {
msg := e.Message()
if msg != nil {
defer func() {
e.Ack()
}()
var log model.CustomLog
err = json.Unmarshal(msg.Body, &log)
if err != nil {
return
mq.RegisterHandler(&mq.RegisterHandlerParam{
Name: mq.DBCustomLog,
Data: model.CustomLog{},
Handler: func(data interface{}) (err error) {
log, ok := data.(*model.CustomLog)
if !ok {
return nil
}
c := svc.DbCustomLogCollection(log.Platform)
if c != nil {
err = c.Insert(log)
}
return
}
return nil
}, broker.Queue(mq.DBCustomLog), broker.DisableAutoAck(), rabbitmq.DurableQueue())
return nil
},
})
// 竞技馆奖励记录
mq.RegisterSubscriber(mq.DBCustomLogAward, func(e broker.Event) (err error) {
msg := e.Message()
if msg != nil {
defer func() {
e.Ack()
}()
var log model.CustomLogAward
err = json.Unmarshal(msg.Body, &log)
if err != nil {
return
mq.RegisterHandler(&mq.RegisterHandlerParam{
Name: mq.DBCustomLogAward,
Data: model.CustomLogAward{},
Handler: func(data interface{}) (err error) {
log, ok := data.(*model.CustomLogAward)
if !ok {
return nil
}
c := svc.DbCustomLogAwardCollection(log.Platform)
if c != nil {
err = c.Insert(log)
if err == nil {
mq.Write(log) // 通知ranksrv广播获奖消息
}
}
return
}
return nil
}, broker.Queue(mq.DBCustomLogAward), broker.DisableAutoAck(), rabbitmq.DurableQueue())
},
})
}

View File

@ -20,7 +20,7 @@ import (
)
func init() {
mq.RegisterSubscriber(model.EvtInvite, func(e broker.Event) (err error) {
mq.RegisterSubscriber(mq.DBInvite, func(e broker.Event) (err error) {
msg := e.Message()
if msg != nil {
defer func() {
@ -80,7 +80,7 @@ func init() {
return err
}
msg.Score = 0
mq.Send(model.EvtInviteAck, msg)
mq.Send(mq.WorldInviteAck, msg)
}
return nil
}
@ -301,10 +301,10 @@ func init() {
logger.Logger.Errorf("EvtInvite add find error:%v psnid:%v score:%v rate:%v", err, v, log.Score, log.Rate)
return err
}
mq.Send(model.EvtInviteAck, msg)
mq.Send(mq.WorldInviteAck, msg)
}
}
return nil
}, broker.Queue(model.EvtInvite), broker.DisableAutoAck(), rabbitmq.DurableQueue())
}, broker.Queue(mq.DBInvite), broker.DisableAutoAck(), rabbitmq.DurableQueue())
}

19
dbproxy/mq/init.go Normal file
View File

@ -0,0 +1,19 @@
package mq
import (
"mongo.games.com/game/model"
"mongo.games.com/game/mq"
)
func Init() {
InitHandler()
// 竞技馆获奖通知
mq.RegisterMessage(&mq.RegisterMessageParam{
Name: mq.RankCustomLogAward,
Data: model.CustomLogAward{},
})
}
func InitHandler() {
}

View File

@ -39,7 +39,7 @@ func (this *DBCustomLogAwardSvc) Find(req *model.CustomLogAwardFindReq, res *mod
return ErrCustomLogAwardNotFound
}
if err := c.Find(bson.M{"startts": bson.M{"$gte": req.StartTs, "$lte": req.EndTs}}).Sort("-startts").Limit(50).All(&res.List); err != nil {
if err := c.Find(bson.M{"startts": bson.M{"$gte": req.StartTs, "$lte": req.EndTs}}).Sort("-endts").Limit(50).All(&res.List); err != nil {
return err
}
return nil

View File

@ -53,5 +53,5 @@ func (c *LogChannel) WriteMQData(data *model.RabbitMQData) {
}
func init() {
LogChannelSington.RegisteLogCName(model.APILogCollName, &model.APILog{})
LogChannelSington.RegisteLogCName(mq.MgrAPILog, &model.APILog{})
}

View File

@ -12,9 +12,8 @@ import (
)
var (
CoinLogDBName = "log"
CoinLogCollName = "log_coinex"
TopicProbeCoinLogAck = "ack_logcoin"
CoinLogDBName = "log"
CoinLogCollName = "log_coinex"
)
var COINEX_GLOBAL_SEQ = int64(0)

View File

@ -64,7 +64,6 @@ type EvtInviteMsg struct {
RechargeScore int64 // 充值成功积分
}
const EvtInviteAck = "evt_invite_ack" // 绑定邀请人 dbproxy -> worldsrv
type EvtInviteAckMsg struct {
Platform string
Snid int32

View File

@ -19,10 +19,30 @@ const (
BackSystemJyb = "back_jyblog"
)
// mgrsrv
const (
MgrAPILog = "mgr_apilog"
)
// worldsrv 消息
const (
WorldLogCoinAck = "world_logcoin_ack"
WorldInviteAck = "world_invite_ack"
)
// dbproxy 消息
const (
DBVipGiftLog = "db_vipgift"
DBCustomLog = "db_customlog" // 房卡场对局记录
DBVipGiftLog = "db_vipgift"
DBCustomLog = "db_customlog" // 房卡场对局记录
DBCustomLogAward = "db_customlog_award" // 房卡场对局奖励
DBInvite = "db_invite"
)
// ranksrv 消息
const (
RankCustomLogAward = "rank_customlog_award" // 房卡场对局奖励
)

141
mq/messagemgr.go Normal file
View File

@ -0,0 +1,141 @@
package mq
import (
"encoding/json"
"fmt"
"reflect"
"mongo.games.com/goserver/core/broker"
"mongo.games.com/goserver/core/broker/rabbitmq"
"mongo.games.com/goserver/core/logger"
)
type HandlerFunc func(data interface{}) (err error)
type HandlerData struct {
T reflect.Type
F HandlerFunc
}
type MessageMgr struct {
name map[reflect.Type]string
handler map[string]HandlerData
}
type RegisterMessageParam struct {
Name string
Data interface{}
}
// RegisterMessage 注册消息
func (c *MessageMgr) RegisterMessage(param *RegisterMessageParam) {
if param == nil {
return
}
t := c.getType(param.Data)
c.name[t] = param.Name
}
func (c *MessageMgr) getType(data interface{}) reflect.Type {
return reflect.Indirect(reflect.ValueOf(data)).Type()
}
func (c *MessageMgr) getName(data interface{}) string {
t := c.getType(data)
if name, exist := c.name[t]; exist {
return name
}
return ""
}
// Write 记录消息,需要提前注册
func (c *MessageMgr) Write(data interface{}) {
name := c.getName(data)
if name == "" {
name = "_null_"
}
logger.Logger.Tracef("==> RabbitMQ(%v): %#v", name, data)
Send(name, data)
}
// WriteMQData rabbitMQ消息
func (c *MessageMgr) WriteMQData(name string, data interface{}) {
logger.Logger.Tracef("==> RabbitMQ(%v): %#v", name, data)
Send(name, data)
}
type RegisterHandlerParam struct {
Name string
Data interface{}
Handler HandlerFunc
}
// RegisterHandler 注册消息处理函数
func (c *MessageMgr) RegisterHandler(param *RegisterHandlerParam) {
if param == nil {
return
}
if _, ok := c.handler[param.Name]; ok {
panic(fmt.Sprintf("RabbitMQ RegisterHandler repeatet name:%v", param.Name))
return
}
if param.Data == nil {
return
}
if param.Handler == nil {
return
}
c.handler[param.Name] = HandlerData{
T: c.getType(param.Data),
F: param.Handler,
}
RegisterSubscriber(param.Name, func(e broker.Event) (err error) {
msg := e.Message()
if msg != nil {
defer func() {
e.Ack()
}()
log := reflect.New(c.handler[param.Name].T).Interface()
err = json.Unmarshal(msg.Body, log)
if err != nil {
logger.Logger.Errorf("RabbitMQ Unmarshal error: %v", err)
return
}
logger.Logger.Tracef("==> Receive RabbitMQ(%v): %#v", param.Name, log)
return c.handler[param.Name].F(log)
}
return nil
}, broker.Queue(param.Name), broker.DisableAutoAck(), rabbitmq.DurableQueue())
}
// MessageMgrSingle 消息发送器
var MessageMgrSingle = &MessageMgr{
name: make(map[reflect.Type]string),
handler: make(map[string]HandlerData),
}
// RegisterMessage 注册消息
// name: 消息名称
// data: 消息结构体指针
func RegisterMessage(param *RegisterMessageParam) {
MessageMgrSingle.RegisterMessage(param)
}
// Write 发送消息
// 默认队列名称规则队列前缀_消息结构体名称
func Write(data interface{}) {
MessageMgrSingle.Write(data)
}
// WriteMQData 发送消息
func WriteMQData(name string, data interface{}) {
MessageMgrSingle.WriteMQData(name, data)
}
// RegisterHandler 注册消息处理函数
func RegisterHandler(param *RegisterHandlerParam) {
MessageMgrSingle.RegisterHandler(param)
}

View File

@ -1,53 +0,0 @@
package com
import (
"reflect"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/model"
"mongo.games.com/game/mq"
)
// LogChannelSingleton 日志记录器
var LogChannelSingleton = &LogChannel{
cName: make(map[reflect.Type]string),
}
type LogChannel struct {
cName map[reflect.Type]string
}
func (c *LogChannel) RegisterLogCName(cname string, log interface{}) {
t := c.getLogType(log)
c.cName[t] = cname
}
func (c *LogChannel) getLogType(log interface{}) reflect.Type {
return reflect.Indirect(reflect.ValueOf(log)).Type()
}
func (c *LogChannel) getLogCName(log interface{}) string {
t := c.getLogType(log)
if name, exist := c.cName[t]; exist {
return name
}
return ""
}
func (c *LogChannel) WriteLog(log interface{}) {
cname := c.getLogCName(log)
if cname == "" {
cname = "_null_"
}
logger.Logger.Tracef("LogChannel ==> %#v", log)
mq.Send(cname, log)
}
func (c *LogChannel) WriteMQData(data *model.RabbitMQData) {
mq.Send(data.MQName, data.Data)
}
func init() {
LogChannelSingleton.RegisterLogCName(mq.DBCustomLogAward, &model.CustomLogAward{})
}

View File

@ -11,9 +11,11 @@ import (
"mongo.games.com/game/common"
"mongo.games.com/game/model"
"mongo.games.com/game/mq"
rankmq "mongo.games.com/game/ranksrv/mq"
)
func main() {
rankmq.Init()
// 自定义配置文件
model.InitGameParam()
// package模块

55
ranksrv/mq/init.go Normal file
View File

@ -0,0 +1,55 @@
package mq
import (
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/srvlib"
"mongo.games.com/goserver/srvlib/action"
"mongo.games.com/game/common"
"mongo.games.com/game/model"
"mongo.games.com/game/mq"
rankproto "mongo.games.com/game/protocol/rank"
"mongo.games.com/game/ranksrv/rank"
)
// Init 初始化
func Init() {
InitHandler()
// 注册消息
mq.RegisterMessage(&mq.RegisterMessageParam{
Name: mq.DBCustomLogAward,
Data: model.CustomLogAward{},
})
}
// InitHandler 注册消息处理方法
func InitHandler() {
mq.RegisterHandler(&mq.RegisterHandlerParam{
Name: mq.RankCustomLogAward,
Data: model.CustomLogAward{},
Handler: func(data interface{}) (err error) {
log, ok := data.(*model.CustomLogAward)
if !ok {
return nil
}
rank.CustomAwardMgrInstance.UpdateCache(log.Platform, 0)
var awards []*rankproto.Item
for _, v := range log.Awards {
awards = append(awards, &rankproto.Item{
Id: v.ItemId,
N: v.ItemNum,
})
}
pack := &rankproto.UserAward{
Snid: log.SnId,
Name: log.Name,
Awards: awards,
Ts: log.EndTs,
}
action.BroadcastMessage(common.GetSelfAreaId(), srvlib.GateServerType, int(rankproto.Rank_PACKET_SCRoomAwardOne), pack, nil)
logger.Logger.Tracef("SCRoomAwardOne %v", pack)
return
},
})
}

View File

@ -7,12 +7,10 @@ import (
"github.com/jinzhu/now"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/goserver/core/module"
"mongo.games.com/goserver/core/timer"
"mongo.games.com/goserver/srvlib"
"mongo.games.com/goserver/srvlib/action"
"mongo.games.com/game/common"
"mongo.games.com/game/model"
"mongo.games.com/game/mq"
"mongo.games.com/game/protocol/rank"
"mongo.games.com/game/ranksrv/com"
"mongo.games.com/game/srvdata"
@ -76,7 +74,7 @@ func (c *CustomAwardMgr) Update() {
name = srvdata.PBDB_NameMgr.Datas.GetArr()[n].Name
}
id := common.RandInt(20000000, 99999999)
com.LogChannelSingleton.WriteLog(&model.CustomLogAward{
mq.Write(&model.CustomLogAward{
Platform: k,
CycleId: "",
SnId: int32(id),
@ -85,19 +83,6 @@ func (c *CustomAwardMgr) Update() {
StartTs: nowTime.Add(-time.Minute * 8).Unix(),
EndTs: nowTime.Unix(),
})
timer.AfterTimer(func(h timer.TimerHandle, ud interface{}) bool {
CustomAwardMgrInstance.UpdateCache(k, 0)
return true
}, nil, time.Second*2)
// 通知获奖
pack := &rank.UserAward{
Snid: int32(id),
Name: name,
Awards: items,
Ts: nowTime.Unix(),
}
action.BroadcastMessage(common.GetSelfAreaId(), srvlib.GateServerType, int(rank.Rank_PACKET_SCRoomAwardOne), pack, nil)
logger.Logger.Tracef("BroadcastMessage UserAward: %v", pack)
break
}
}

View File

@ -61,6 +61,7 @@ func (w *WinCoinMgr) Update() {
}
func (w *WinCoinMgr) Shutdown() {
module.UnregisteModule(w)
}
func init() {

View File

@ -61,7 +61,7 @@ func init() {
LogChannelSingleton.RegisterLogCName(model.MQRankPlayerCoin, &model.RankPlayerCoin{})
LogChannelSingleton.RegisterLogCName(mq.BackBankrupt, &model.BankruptLog{})
LogChannelSingleton.RegisterLogCName(mq.BackReliefund, &model.ReliefFundLog{})
LogChannelSingleton.RegisterLogCName(model.EvtInvite, &model.EvtInviteMsg{})
LogChannelSingleton.RegisterLogCName(mq.DBInvite, &model.EvtInviteMsg{})
LogChannelSingleton.RegisterLogCName(model.MQRankPlayerLevel, &model.PlayerLevelInfo{})
LogChannelSingleton.RegisterLogCName(model.MQRankPlayerPermit, &model.PermitScore{})
LogChannelSingleton.RegisterLogCName(mq.BackSystemPermitCycle, &model.BackendPermitCycle{})

View File

@ -15,7 +15,7 @@ import (
)
func init() {
mq.RegisterSubscriber(model.TopicProbeCoinLogAck, func(e broker.Event) (err error) {
mq.RegisterSubscriber(mq.WorldLogCoinAck, func(e broker.Event) (err error) {
msg := e.Message()
if msg != nil {
defer func() {
@ -40,10 +40,10 @@ func init() {
return
}
return nil
}, broker.Queue(model.TopicProbeCoinLogAck), rabbitmq.DurableQueue())
}, broker.Queue(mq.WorldLogCoinAck), rabbitmq.DurableQueue())
// 邀请数据同步
mq.RegisterSubscriber(model.EvtInviteAck, func(e broker.Event) error {
mq.RegisterSubscriber(mq.WorldInviteAck, func(e broker.Event) error {
msg := e.Message()
if msg != nil {
defer func() {
@ -86,5 +86,5 @@ func init() {
}), true)
}
return nil
}, broker.Queue(model.EvtInviteAck), rabbitmq.DurableQueue())
}, broker.Queue(mq.WorldInviteAck), rabbitmq.DurableQueue())
}