From c8be1b93204e362712e8fc9c9ce6d7ac2875f378 Mon Sep 17 00:00:00 2001 From: sk <123456@qq.com> Date: Wed, 22 May 2024 14:29:32 +0800 Subject: [PATCH] =?UTF-8?q?etcd,rabbitmq=E9=98=B2=E6=AD=A2=E5=9C=A8init?= =?UTF-8?q?=E6=96=B9=E6=B3=95=E9=9A=90=E5=BC=8F=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/action.go | 4 - common/log.go | 7 +- dbproxy/main.go | 67 ++++----- dbproxy/mq/c_apilog.go | 12 +- dbproxy/mq/c_clientlog.go | 56 ------- dbproxy/mq/c_coingivelog.go | 12 +- dbproxy/mq/c_coinlog.go | 14 +- dbproxy/mq/c_friendrecordlog.go | 12 +- dbproxy/mq/c_gamegamedetailedlog.go | 14 +- dbproxy/mq/c_gameplayerlistlog.go | 12 +- dbproxy/mq/c_invite.go | 8 +- dbproxy/mq/c_itemlog.go | 12 +- dbproxy/mq/c_jackpotlog.go | 31 ---- dbproxy/mq/c_loginlog.go | 12 +- dbproxy/mq/c_onlinelog.go | 6 - dbproxy/mq/c_rank.go | 26 +--- dbproxy/mq/c_scenecoinlog.go | 12 +- dbproxy/mq/c_welfarelog.go | 11 +- dbproxy/mq/publisher.go | 33 ---- etcd/client.go | 17 +-- etcd/init.go | 17 --- etcd/manager.go | 122 ++++++++------- gamesrv/action/action_server.go | 2 +- gamesrv/base/init.go | 45 ------ gamesrv/base/logchannel.go | 5 +- gamesrv/base/serverstate.go | 8 - gamesrv/base/srvdatamgrex.go | 10 -- gamesrv/main.go | 34 ++++- gatesrv/logchannel.go | 32 +--- gatesrv/main.go | 23 ++- mgrsrv/api/logchannel.go | 34 +---- mgrsrv/main.go | 34 ++--- mq/consumer.go | 189 ++++++++++++----------- mq/publisher.go | 226 +++++++++++++++++----------- ranksrv/com/register.go | 8 +- ranksrv/init.go | 35 ----- ranksrv/main.go | 31 +++- robot/base/init.go | 19 +-- robot/main.go | 21 ++- worldsrv/action_sign.go | 90 ----------- worldsrv/actsignmgr.go | 4 - worldsrv/blacklistmgr.go | 4 - worldsrv/gamestate.go | 5 - worldsrv/horseracelamp.go | 5 - worldsrv/init.go | 207 ------------------------- worldsrv/logchannel.go | 4 +- worldsrv/main.go | 51 ++++++- worldsrv/mq_coinlog.go | 2 - worldsrv/msgmgr.go | 5 +- worldsrv/platformgamegroup.go | 5 +- worldsrv/player.go | 9 -- 51 files changed, 551 insertions(+), 1113 deletions(-) delete mode 100644 dbproxy/mq/c_clientlog.go delete mode 100644 dbproxy/mq/c_jackpotlog.go delete mode 100644 dbproxy/mq/publisher.go delete mode 100644 etcd/init.go delete mode 100644 gamesrv/base/init.go delete mode 100644 ranksrv/init.go delete mode 100644 worldsrv/action_sign.go delete mode 100644 worldsrv/init.go diff --git a/common/action.go b/common/action.go index f72be8d..30234bb 100644 --- a/common/action.go +++ b/common/action.go @@ -19,10 +19,6 @@ var ActionMgrSington = &ActionMgr{ pool: make(map[int]ActionBase), } -func init() { - -} - type ActionMgr struct { pool map[int]ActionBase } diff --git a/common/log.go b/common/log.go index dbb8522..49a77b0 100644 --- a/common/log.go +++ b/common/log.go @@ -5,13 +5,12 @@ import ( "path/filepath" "github.com/howeyc/fsnotify" + "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/logger" ) -var LastModifyConfig int64 - func init() { core.RegisteHook(core.HOOK_BEFORE_START, func() error { var err error @@ -62,7 +61,7 @@ type loggerParamModifiedCommand struct { } func (lmc *loggerParamModifiedCommand) Done(o *basic.Object) error { - logger.Logger.Info("===reload ", lmc.fileName) + logger.Logger.Info("reload logger.xml:", lmc.fileName) data, err := os.ReadFile(lmc.fileName) if err != nil { return err @@ -70,7 +69,7 @@ func (lmc *loggerParamModifiedCommand) Done(o *basic.Object) error { if len(data) != 0 { err = logger.Reload(lmc.fileName) if err != nil { - logger.Logger.Warn("===reload ", lmc.fileName, err) + logger.Logger.Warnf("reload logger.xml %v err: %v", lmc.fileName, err) } } return err diff --git a/dbproxy/main.go b/dbproxy/main.go index 84c8a5c..5b84967 100644 --- a/dbproxy/main.go +++ b/dbproxy/main.go @@ -5,55 +5,48 @@ import ( "net" "net/http" "net/rpc" + "time" "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/module" _ "mongo.games.com/game" - _ "mongo.games.com/game/dbproxy/mq" - "mongo.games.com/game/common" "mongo.games.com/game/dbproxy/svc" + "mongo.games.com/game/etcd" "mongo.games.com/game/model" "mongo.games.com/game/mq" ) -var rabbitMqConsumer *mq.RabbitMQConsumer - -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - model.InitGameParam() - rabbitMqConsumer = mq.NewRabbitMQConsumer(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}) - if rabbitMqConsumer != nil { - rabbitMqConsumer.Start() - } - - //尝试初始化 - svc.GetOnePlayerIdFromBucket() - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - if rabbitMqConsumer != nil { - rabbitMqConsumer.Stop() - } - model.ShutdownRPClient() - return nil - }) -} - func main() { + // 自定义配置文件 + model.InitGameParam() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - - rpc.HandleHTTP() // 采用http协议作为rpc载体 - lis, err := net.Listen(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr")) - if err != nil { - log.Fatalln("fatal error: ", err) - } - go http.Serve(lis, nil) - - waitor := module.Start() - waitor.Wait("main()") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + etcd.Start(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) + mq.StartConsumer(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true) + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) + // 尝试初始化玩家id + svc.GetOnePlayerIdFromBucket() + // rpc 服务 + rpc.HandleHTTP() // 采用http协议作为rpc载体 + lis, err := net.Listen(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr")) + if err != nil { + log.Fatalln("rpc start fatal error: ", err) + } + go http.Serve(lis, nil) + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + etcd.Close() + mq.StopConsumer() + mq.StopPublisher() + return nil + }) + // module模块 + w := module.Start() + w.Wait("main()") } diff --git a/dbproxy/mq/c_apilog.go b/dbproxy/mq/c_apilog.go index 2f77882..95450de 100644 --- a/dbproxy/mq/c_apilog.go +++ b/dbproxy/mq/c_apilog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.APILog diff --git a/dbproxy/mq/c_clientlog.go b/dbproxy/mq/c_clientlog.go deleted file mode 100644 index 4df5009..0000000 --- a/dbproxy/mq/c_clientlog.go +++ /dev/null @@ -1,56 +0,0 @@ -package mq - -func init() { - //mq.RegisteSubscriber(model.ClientLogCollName, func(e broker.Event) (err error) { - // msg := e.Message() - // if msg != nil { - // defer func() { - // if err != nil { - // mq.BackUp(e, err) - // } - // - // e.Ack() - // - // recover() - // }() - // - // var log model.ClientLog - // err = json.Unmarshal(msg.Body, &log) - // if err != nil { - // logger.Logger.Errorf("[mq] %s %v", model.ClientLogCollName, err) - // return - // } - // - // logger.Logger.Tracef("[mq] %s %v", model.ClientLogCollName, string(msg.Body)) - // - // data := map[string]interface{}{} - // err = json.Unmarshal([]byte(log.Data), &data) - // if err != nil { - // logger.Logger.Errorf("[mq] %s %v", model.ClientLogCollName, err) - // return - // } - // - // // 获取平台id - // platform := log.Platform - // if log.Platform == "" { - // id, ok := data["platform"] - // if ok { - // platform = string(id.([]byte)) - // } - // } - // - // data["ts"] = log.Ts - // if log.Snid > 0 { - // data["snid"] = log.Snid - // } - // - // c := svc.ClientLogStartCollection(platform) - // if c != nil { - // err = c.Insert(data) - // } - // - // return - // } - // return nil - //}, broker.Queue(model.ClientLogCollName), broker.DisableAutoAck(), rabbitmq.DurableQueue()) -} diff --git a/dbproxy/mq/c_coingivelog.go b/dbproxy/mq/c_coingivelog.go index fb427ac..6dcd606 100644 --- a/dbproxy/mq/c_coingivelog.go +++ b/dbproxy/mq/c_coingivelog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.CoinGiveLog diff --git a/dbproxy/mq/c_coinlog.go b/dbproxy/mq/c_coinlog.go index a0948cf..0b77dde 100644 --- a/dbproxy/mq/c_coinlog.go +++ b/dbproxy/mq/c_coinlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.CoinLog @@ -30,7 +26,7 @@ func init() { } if log.Count == 0 { //玩家冲账探针 - RabbitMQPublisher.Send(model.TopicProbeCoinLogAck, log) + mq.Send(model.TopicProbeCoinLogAck, log) } else { c := svc.CoinLogsCollection(log.Platform) if c != nil { diff --git a/dbproxy/mq/c_friendrecordlog.go b/dbproxy/mq/c_friendrecordlog.go index 646521b..862b78b 100644 --- a/dbproxy/mq/c_friendrecordlog.go +++ b/dbproxy/mq/c_friendrecordlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.FriendRecord diff --git a/dbproxy/mq/c_gamegamedetailedlog.go b/dbproxy/mq/c_gamegamedetailedlog.go index 2f4206a..c3cdfac 100644 --- a/dbproxy/mq/c_gamegamedetailedlog.go +++ b/dbproxy/mq/c_gamegamedetailedlog.go @@ -2,12 +2,14 @@ package mq import ( "encoding/json" - "mongo.games.com/game/dbproxy/svc" - "mongo.games.com/game/model" - "mongo.games.com/game/mq" + "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" + + "mongo.games.com/game/dbproxy/svc" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) func init() { @@ -15,13 +17,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.GameDetailedLog diff --git a/dbproxy/mq/c_gameplayerlistlog.go b/dbproxy/mq/c_gameplayerlistlog.go index 63da3cd..bf998fc 100644 --- a/dbproxy/mq/c_gameplayerlistlog.go +++ b/dbproxy/mq/c_gameplayerlistlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.GamePlayerListLog diff --git a/dbproxy/mq/c_invite.go b/dbproxy/mq/c_invite.go index f268d8d..17254c2 100644 --- a/dbproxy/mq/c_invite.go +++ b/dbproxy/mq/c_invite.go @@ -22,13 +22,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.BindInvite @@ -59,7 +53,7 @@ func init() { InviteNumCache.Put(name, n, int64(time.Hour.Seconds())) // 更新绑定数量 - RabbitMQPublisher.Send(model.AckBindNum, &model.BindNum{ + mq.Send(model.AckBindNum, &model.BindNum{ SnId: log.InviteSnId, Num: n, }) diff --git a/dbproxy/mq/c_itemlog.go b/dbproxy/mq/c_itemlog.go index fd47133..d241234 100644 --- a/dbproxy/mq/c_itemlog.go +++ b/dbproxy/mq/c_itemlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.ItemLog diff --git a/dbproxy/mq/c_jackpotlog.go b/dbproxy/mq/c_jackpotlog.go deleted file mode 100644 index 577f1f3..0000000 --- a/dbproxy/mq/c_jackpotlog.go +++ /dev/null @@ -1,31 +0,0 @@ -package mq - -//func init() { -// mq.RegisteSubscriber(model.JackPotLogCollName, func(e broker.Event) (err error) { -// msg := e.Message() -// if msg != nil { -// defer func() { -// if err != nil { -// mq.BackUp(e, err) -// } -// -// e.Ack() -// -// recover() -// }() -// -// var log model.JackPotLog -// err = json.Unmarshal(msg.Body, &log) -// if err != nil { -// return -// } -// -// c := svc.JackPotLogsCollection(log.Platform) -// if c != nil { -// _, err = c.Upsert(bson.M{"_id": log.LogId}, log) -// } -// return -// } -// return nil -// }, broker.Queue(model.JackPotLogCollName), broker.DisableAutoAck(), rabbitmq.DurableQueue()) -//} diff --git a/dbproxy/mq/c_loginlog.go b/dbproxy/mq/c_loginlog.go index 784d3c9..2d88baa 100644 --- a/dbproxy/mq/c_loginlog.go +++ b/dbproxy/mq/c_loginlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.LoginLog diff --git a/dbproxy/mq/c_onlinelog.go b/dbproxy/mq/c_onlinelog.go index d8d5525..3a813e4 100644 --- a/dbproxy/mq/c_onlinelog.go +++ b/dbproxy/mq/c_onlinelog.go @@ -16,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.OnlineLog diff --git a/dbproxy/mq/c_rank.go b/dbproxy/mq/c_rank.go index c9fe997..61abff3 100644 --- a/dbproxy/mq/c_rank.go +++ b/dbproxy/mq/c_rank.go @@ -2,14 +2,16 @@ package mq import ( "encoding/json" - "mongo.games.com/game/dbproxy/svc" - "mongo.games.com/game/model" - "mongo.games.com/game/mq" + "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" + + "mongo.games.com/game/dbproxy/svc" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) func init() { @@ -18,13 +20,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.PlayerRankScore @@ -53,13 +49,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.RankPlayerCoin @@ -88,13 +78,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.PlayerLevelInfo diff --git a/dbproxy/mq/c_scenecoinlog.go b/dbproxy/mq/c_scenecoinlog.go index 2d9f444..bffa82d 100644 --- a/dbproxy/mq/c_scenecoinlog.go +++ b/dbproxy/mq/c_scenecoinlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.SceneCoinLog diff --git a/dbproxy/mq/c_welfarelog.go b/dbproxy/mq/c_welfarelog.go index 3bc4bb2..e6aa615 100644 --- a/dbproxy/mq/c_welfarelog.go +++ b/dbproxy/mq/c_welfarelog.go @@ -3,11 +3,12 @@ package mq import ( "encoding/json" + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -15,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.WelfareLog diff --git a/dbproxy/mq/publisher.go b/dbproxy/mq/publisher.go deleted file mode 100644 index c48c666..0000000 --- a/dbproxy/mq/publisher.go +++ /dev/null @@ -1,33 +0,0 @@ -package mq - -import ( - "mongo.games.com/game/common" - "mongo.games.com/game/mq" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" -) - -var RabbitMQPublisher *mq.RabbitMQPublisher - -func init() { - ////首先加载游戏配置 - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err := RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - return nil - }) - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - - return nil - }) -} diff --git a/etcd/client.go b/etcd/client.go index c4ba837..d6b0a4b 100644 --- a/etcd/client.go +++ b/etcd/client.go @@ -5,11 +5,8 @@ import ( "time" "go.etcd.io/etcd/client/v3" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/basic" - "mongo.games.com/goserver/core/logger" - "mongo.games.com/game/model" + "mongo.games.com/goserver/core/logger" ) /* @@ -232,22 +229,12 @@ func (c *Client) GoWatch(ctx context.Context, revision int64, prefix string, f f logger.Logger.Warnf("etcd watch WithPrefix(%v) err:%v", prefix, resp.Err()) continue } - if !model.GameParamData.UseEtcd { - continue - } if len(resp.Events) == 0 { continue } logger.Logger.Tracef("@goWatch %v changed, header:%#v", prefix, resp.Header) - obj := core.CoreObject() - if obj != nil { - func(res clientv3.WatchResponse) { - obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error { - return f(res) - }), true) - }(resp) - } + f(resp) } if skip { diff --git a/etcd/init.go b/etcd/init.go deleted file mode 100644 index 5050c32..0000000 --- a/etcd/init.go +++ /dev/null @@ -1,17 +0,0 @@ -package etcd - -import ( - "mongo.games.com/goserver/core" -) - -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - mgr.Start() - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - mgr.Shutdown() - return nil - }) -} diff --git a/etcd/manager.go b/etcd/manager.go index 3dc6d98..5631a0f 100644 --- a/etcd/manager.go +++ b/etcd/manager.go @@ -7,21 +7,31 @@ import ( "go.etcd.io/etcd/client/v3" + "mongo.games.com/goserver/core" + "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/logger" - "mongo.games.com/game/common" "mongo.games.com/game/proto" ) -var mgr = &Manager{Client: new(Client)} +/* + ETCD Manager +*/ -type Manager struct { - *Client -} +var ( + defaultUrl = []string{"localhost:2379"} + defaultUser = "" + defaultPasswd = "" + defaultDialTimeout = time.Minute +) -// Register . -func (this *Manager) Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) { +var globalClient = new(Client) +// Register 注册etcd监听方法 +// key:监听的key +// msgType:数据类型 +// f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据,类型为msgType,是指针类型 +func Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) { createFunc := func() interface{} { tp := reflect.TypeOf(msgType) if tp.Kind() == reflect.Ptr { @@ -32,7 +42,7 @@ func (this *Manager) Register(key string, msgType interface{}, f func(ctx contex initFunc := func() int64 { logger.Logger.Info("ETCD 拉取数据:", key) - res, err := this.GetValueWithPrefix(key) + res, err := globalClient.GetValueWithPrefix(key) if err == nil { for i := int64(0); i < res.Count; i++ { data := createFunc() @@ -63,61 +73,61 @@ func (this *Manager) Register(key string, msgType interface{}, f func(ctx contex // 监控数据变动 watchFunc := func(ctx context.Context, revision int64) { - this.GoWatch(ctx, revision, key, func(res clientv3.WatchResponse) error { - for _, ev := range res.Events { - switch ev.Type { - case clientv3.EventTypePut: - data := createFunc() - v, ok := data.(proto.Message) - if !ok { - logger.Logger.Errorf("ETCD %v error: not proto message", string(ev.Kv.Key)) - continue + globalClient.GoWatch(ctx, revision, key, func(res clientv3.WatchResponse) error { + obj := core.CoreObject() + if obj != nil { + obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error { + for _, ev := range res.Events { + switch ev.Type { + case clientv3.EventTypePut: + data := createFunc() + v, ok := data.(proto.Message) + if !ok { + logger.Logger.Errorf("ETCD %v error: not proto message", string(ev.Kv.Key)) + continue + } + err := proto.Unmarshal(ev.Kv.Value, v) + if err != nil { + logger.Logger.Errorf("etcd unmarshal(%v) error:%v", string(ev.Kv.Key), err) + continue + } + logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), v) + f(ctx, string(ev.Kv.Key), false, ev, v) + case clientv3.EventTypeDelete: + logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key)) + f(ctx, string(ev.Kv.Key), false, ev, nil) + } } - err := proto.Unmarshal(ev.Kv.Value, v) - if err != nil { - logger.Logger.Errorf("etcd unmarshal(%v) error:%v", string(ev.Kv.Key), err) - continue - } - logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), v) - f(ctx, string(ev.Kv.Key), false, ev, v) - case clientv3.EventTypeDelete: - logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key)) - f(ctx, string(ev.Kv.Key), false, ev, nil) - } + return nil + }), true) } return nil }) } - this.AddFunc(initFunc, watchFunc) -} - -func (this *Manager) Start() { - logger.Logger.Infof("EtcdClient开始连接url:%v;etcduser:%v;etcdpwd:%v", common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd")) - err := this.Open(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) - if err != nil { - logger.Logger.Tracef("Manager.Open err:%v", err) - } - this.ReInitAndWatchAll() -} - -func (this *Manager) Shutdown() { - this.Close() -} - -func (this *Manager) Reset() { - this.Close() - this.Start() -} - -// Register 注册etcd监听方法 -// key:监听的key -// msgType:数据类型 -// f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据,类型为msgType,是指针类型 -func Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) { - mgr.Register(key, msgType, f) + globalClient.AddFunc(initFunc, watchFunc) } func Reset() { - mgr.Reset() + globalClient.Close() + Start(defaultUrl, defaultUser, defaultPasswd, defaultDialTimeout) +} + +func Close() { + globalClient.Close() +} + +func Start(url []string, user, passwd string, dialTimeout time.Duration) { + defaultUrl = url + defaultUser = user + defaultPasswd = passwd + if dialTimeout > 0 { + defaultDialTimeout = dialTimeout + } + + err := globalClient.Open(defaultUrl, defaultUser, defaultPasswd, defaultDialTimeout) + if err != nil { + logger.Logger.Errorf("etcd.Open err:%v", err) + } + globalClient.ReInitAndWatchAll() } diff --git a/gamesrv/action/action_server.go b/gamesrv/action/action_server.go index 234f687..9127452 100644 --- a/gamesrv/action/action_server.go +++ b/gamesrv/action/action_server.go @@ -559,7 +559,7 @@ func init() { return nil })) - //玩家离开 + //同步记牌器过期时间 netlib.RegisterFactory(int(server.SSPacketID_PACKET_WG_BUYRECTIMEITEM), netlib.PacketFactoryWrapper(func() interface{} { return &server.WGBuyRecTimeItem{} })) diff --git a/gamesrv/base/init.go b/gamesrv/base/init.go deleted file mode 100644 index 1863b63..0000000 --- a/gamesrv/base/init.go +++ /dev/null @@ -1,45 +0,0 @@ -package base - -import ( - "time" - - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" - - "mongo.games.com/game/common" - "mongo.games.com/game/model" - "mongo.games.com/game/mq" -) - -var RabbitMQPublisher *mq.RabbitMQPublisher - -func init() { - model.InitGameParam() - model.InitFishingParam() - model.InitNormalParam() - model.InitGMAC() - - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) - model.InitGameKVData() - - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err := RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - return nil - }) -} diff --git a/gamesrv/base/logchannel.go b/gamesrv/base/logchannel.go index 63173f6..b62cd45 100644 --- a/gamesrv/base/logchannel.go +++ b/gamesrv/base/logchannel.go @@ -4,6 +4,7 @@ import ( "reflect" "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) // LogChannelSingleton 日志记录器 @@ -37,11 +38,11 @@ func (c *LogChannel) WriteLog(log interface{}) { if cname == "" { cname = "_null_" } - RabbitMQPublisher.Send(cname, log) + mq.Send(cname, log) } func (c *LogChannel) WriteMQData(data *model.RabbitMQData) { - RabbitMQPublisher.Send(data.MQName, data.Data) + mq.Send(data.MQName, data.Data) } func init() { diff --git a/gamesrv/base/serverstate.go b/gamesrv/base/serverstate.go index 9d375dd..57e8824 100644 --- a/gamesrv/base/serverstate.go +++ b/gamesrv/base/serverstate.go @@ -2,7 +2,6 @@ package base import ( "mongo.games.com/game/common" - "mongo.games.com/goserver/core" ) var ServerStateMgr = &ServerStateManager{ @@ -24,10 +23,3 @@ func (this *ServerStateManager) SetState(state common.GameSessState) { func (this *ServerStateManager) GetState() common.GameSessState { return this.State } - -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - ServerStateMgr.Init() - return nil - }) -} diff --git a/gamesrv/base/srvdatamgrex.go b/gamesrv/base/srvdatamgrex.go index 9759469..e6f700b 100644 --- a/gamesrv/base/srvdatamgrex.go +++ b/gamesrv/base/srvdatamgrex.go @@ -3,7 +3,6 @@ package base import ( "mongo.games.com/game/common" "mongo.games.com/game/srvdata" - "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/logger" ) @@ -186,12 +185,3 @@ func (this *DB_FishOutMgrEx) InitFishAppear() { } } } - -// 初始化在线奖励系统 -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - logger.Logger.Info("初始化牌库[S]") - defer logger.Logger.Info("初始化牌库[E]") - return nil - }) -} diff --git a/gamesrv/main.go b/gamesrv/main.go index 1295456..65e7e28 100644 --- a/gamesrv/main.go +++ b/gamesrv/main.go @@ -1,17 +1,21 @@ package main import ( + "time" + "mongo.games.com/goserver/core" _ "mongo.games.com/goserver/core/i18n" "mongo.games.com/goserver/core/module" _ "mongo.games.com/game" "mongo.games.com/game/common" - _ "mongo.games.com/game/srvdata" - + "mongo.games.com/game/etcd" _ "mongo.games.com/game/gamesrv/action" _ "mongo.games.com/game/gamesrv/base" _ "mongo.games.com/game/gamesrv/transact" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" + _ "mongo.games.com/game/srvdata" // game _ "mongo.games.com/game/gamesrv/chess" @@ -31,10 +35,28 @@ import ( ) func main() { - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + model.InitFishingParam() + model.InitNormalParam() + model.InitGMAC() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - - waiter := module.Start() - waiter.Wait("main()") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + etcd.Start(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) + model.InitGameKVData() + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + etcd.Close() + mq.StopPublisher() + return nil + }) + // module模块 + w := module.Start() + w.Wait("main()") } diff --git a/gatesrv/logchannel.go b/gatesrv/logchannel.go index 0689c8e..f5fc1f1 100644 --- a/gatesrv/logchannel.go +++ b/gatesrv/logchannel.go @@ -3,10 +3,6 @@ package main import ( "reflect" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" - - "mongo.games.com/game/common" "mongo.games.com/game/model" "mongo.games.com/game/mq" ) @@ -42,33 +38,9 @@ func (c *LogChannel) WriteLog(log interface{}) { if cname == "" { cname = "_null_" } - RabbitMQPublisher.Send(cname, log) + mq.Send(cname, log) } func (c *LogChannel) WriteMQData(data *model.RabbitMQData) { - RabbitMQPublisher.Send(data.MQName, data.Data) -} - -var RabbitMQPublisher *mq.RabbitMQPublisher - -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err := RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - return nil - }) + mq.Send(data.MQName, data.Data) } diff --git a/gatesrv/main.go b/gatesrv/main.go index f454c79..5027b07 100644 --- a/gatesrv/main.go +++ b/gatesrv/main.go @@ -4,18 +4,27 @@ import ( _ "mongo.games.com/game" "mongo.games.com/game/common" "mongo.games.com/game/model" - + "mongo.games.com/game/mq" "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/module" ) func main() { - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - - model.InitGameParam() - - waiter := module.Start() - waiter.Wait("main()") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + mq.StopPublisher() + return nil + }) + // module模块 + w := module.Start() + w.Wait("main()") } diff --git a/mgrsrv/api/logchannel.go b/mgrsrv/api/logchannel.go index 5993b72..a694462 100644 --- a/mgrsrv/api/logchannel.go +++ b/mgrsrv/api/logchannel.go @@ -3,11 +3,8 @@ package api import ( "reflect" - "mongo.games.com/game/common" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" ) /* @@ -15,11 +12,9 @@ import ( */ const ( - LOGCHANEL_BLACKHOLE = "_null_" + LogChanelBlackHole = "_null_" ) -var RabbitMQPublisher *mq.RabbitMQPublisher - var LogChannelSington = &LogChannel{ cName: make(map[reflect.Type]string), } @@ -48,36 +43,15 @@ func (c *LogChannel) getLogCName(log interface{}) string { func (c *LogChannel) WriteLog(log interface{}) { cname := c.getLogCName(log) if cname == "" { - cname = LOGCHANEL_BLACKHOLE + cname = LogChanelBlackHole } - RabbitMQPublisher.Send(cname, log) + mq.Send(cname, log) } func (c *LogChannel) WriteMQData(data *model.RabbitMQData) { - RabbitMQPublisher.Send(data.MQName, data.Data) + mq.Send(data.MQName, data.Data) } func init() { LogChannelSington.RegisteLogCName(model.APILogCollName, &model.APILog{}) - - //首先加载游戏配置 - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err := RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - return nil - }) } diff --git a/mgrsrv/main.go b/mgrsrv/main.go index 9c6801d..090961e 100644 --- a/mgrsrv/main.go +++ b/mgrsrv/main.go @@ -1,39 +1,37 @@ package main import ( - _ "mongo.games.com/game" - _ "mongo.games.com/game/mgrsrv/api" - _ "mongo.games.com/game/srvdata" "time" - "mongo.games.com/game/common" - "mongo.games.com/game/model" - "mongo.games.com/game/srvdata" "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/module" + + _ "mongo.games.com/game" + "mongo.games.com/game/common" + _ "mongo.games.com/game/mgrsrv/api" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" + _ "mongo.games.com/game/srvdata" ) func main() { - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - - model.InitGameParam() - logger.Logger.Warnf("log data %v", srvdata.Config.RootPath) - waiter := module.Start() - waiter.Wait("main()") -} - -func init() { - //首先加载游戏配置 + // core hook core.RegisteHook(core.HOOK_BEFORE_START, func() error { model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) return nil }) - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + mq.StopPublisher() model.ShutdownRPClient() return nil }) + // module模块 + w := module.Start() + w.Wait("main()") } diff --git a/mq/consumer.go b/mq/consumer.go index 26c49be..4110f6d 100644 --- a/mq/consumer.go +++ b/mq/consumer.go @@ -18,8 +18,11 @@ import ( 为了使用方便,这里统一启动订阅 */ -var subscriberLock sync.RWMutex -var subscriber = make(map[string][]*Subscriber) +var ( + globalConsumer *RabbitMQConsumer + subscriberLock sync.RWMutex + subscriber = make(map[string][]*Subscriber) +) type Subscriber struct { broker.Subscriber @@ -27,6 +30,90 @@ type Subscriber struct { opts []broker.SubscribeOption } +type RabbitMQConsumer struct { + broker.Broker + url string + exchange rabbitmq.Exchange +} + +func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsumer { + mq := &RabbitMQConsumer{ + url: url, + exchange: exchange, + } + options := []broker.Option{ + broker.Addrs(url), rabbitmq.ExchangeName(exchange.Name), + } + if exchange.Durable { + options = append(options, rabbitmq.DurableExchange()) + } + mq.Broker = rabbitmq.NewBroker(options...) + mq.Broker.Init() + return mq +} + +func (c *RabbitMQConsumer) Start() error { + if ok, _ := common.PathExists(BackupPath); !ok { + os.MkdirAll(BackupPath, os.ModePerm) + } + + if err := c.Connect(); err != nil { + return err + } + + for topic, ss := range GetSubscribers() { + for _, s := range ss { + sub, err := c.Subscribe(topic, func(event broker.Event) error { + var err error + defer func() { + e := recover() + if e != nil { + logger.Logger.Errorf("RabbitMQConsumer.Subscriber(%s,%v) recover:%v", event.Topic(), event.Message(), e) + } + if err != nil { + c.backUp(event, err) + } + }() + err = s.h(event) + return err + }, s.opts...) + if err != nil { + return err + } + + s.Subscriber = sub + } + } + + return nil +} + +func (c *RabbitMQConsumer) Stop() error { + for _, ss := range GetSubscribers() { + for _, s := range ss { + s.Unsubscribe() + } + } + return c.Disconnect() +} + +func (c *RabbitMQConsumer) backUp(e broker.Event, err error) { + tNow := time.Now() + filePath := fmt.Sprintf(FilePathFormat, BackupPath, e.Topic(), tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) + f, er := os.Create(filePath) + if er != nil { + logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", e.Topic(), e.Message(), er) + return + } + defer f.Close() + var reason string + if err != nil { + reason = err.Error() + } + f.WriteString("reason:" + reason + "\n") + f.WriteString("data:" + string(e.Message().Body) + "\n") +} + // RegisterSubscriber 注册订阅处理方法 // 不同订阅是在各自的协程中执行的 func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.SubscribeOption) { @@ -39,21 +126,6 @@ func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.Subscribe subscriberLock.Unlock() } -func UnregisterSubscriber(topic string) { - subscriberLock.Lock() - delete(subscriber, topic) - subscriberLock.Unlock() -} - -func GetSubscriber(topic string) []*Subscriber { - subscriberLock.RLock() - defer subscriberLock.RUnlock() - if s, ok := subscriber[topic]; ok { - return s - } - return nil -} - func GetSubscribers() map[string][]*Subscriber { ret := make(map[string][]*Subscriber) subscriberLock.RLock() @@ -66,75 +138,18 @@ func GetSubscribers() map[string][]*Subscriber { return ret } -type RabbitMQConsumer struct { - broker.Broker - url string - exchange rabbitmq.Exchange -} - -func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsumer { - mq := &RabbitMQConsumer{ - url: url, - exchange: exchange, - } - - rabbitmq.DefaultRabbitURL = mq.url - rabbitmq.DefaultExchange = mq.exchange - - mq.Broker = rabbitmq.NewBroker() - mq.Broker.Init() - return mq -} - -func (c *RabbitMQConsumer) Start() error { - if err := c.Connect(); err != nil { - return err - } - - sss := GetSubscribers() - for topic, ss := range sss { - for _, s := range ss { - sub, err := c.Subscribe(topic, s.h, s.opts...) - if err != nil { - return err - } - - s.Subscriber = sub - } - } - - return nil -} - -func (c *RabbitMQConsumer) Stop() error { - sss := GetSubscribers() - for _, ss := range sss { - for _, s := range ss { - s.Unsubscribe() - } - } - return c.Disconnect() -} - -func BackUp(e broker.Event, err error) { - tNow := time.Now() - filePath := fmt.Sprintf("%s/%s_%s_%09d_%04d.dat", BACKUP_PATH, e.Topic(), tNow.Format(TIME_FORMAT), tNow.Nanosecond(), rand.Int31n(10000)) - f, err := os.Create(filePath) - if err != nil { - logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", e.Topic(), e.Message(), err) - return - } - defer f.Close() - var reason string - if err != nil { - reason = err.Error() - } - f.WriteString("reason:" + reason + "\n") - f.WriteString("data:" + string(e.Message().Body) + "\n") -} - -func init() { - if ok, _ := common.PathExists(BACKUP_PATH); !ok { - os.MkdirAll(BACKUP_PATH, os.ModePerm) +// StartConsumer 启动消费者 +func StartConsumer(url string, exchange string, durableExchange bool) { + StopConsumer() + globalConsumer = NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}) + if err := globalConsumer.Start(); err != nil { + panic(fmt.Sprintf("RabbitMQConsumer.Start() err:%v", err)) + } +} + +// StopConsumer 停止消费者 +func StopConsumer() { + if globalConsumer != nil { + globalConsumer.Stop() } } diff --git a/mq/publisher.go b/mq/publisher.go index a703bd8..3f27579 100644 --- a/mq/publisher.go +++ b/mq/publisher.go @@ -9,10 +9,11 @@ import ( "sync" "time" - "mongo.games.com/game/common" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" + + "mongo.games.com/game/common" ) /* @@ -20,11 +21,14 @@ import ( */ const ( - BACKUP_PATH = "backup" - TIME_FORMAT = "20060102150405" + BackupPath = "backup" + TimeFormat = "20060102150405" + FilePathFormat = "%s/%s_%s_%09d_%04d.dat" ) -var ERR_CLOSED = errors.New("publisher is closed") +var ErrClosed = errors.New("publisher is closed") + +var globalPublisher *RabbitMQPublisher type item struct { topic string @@ -32,115 +36,71 @@ type item struct { } type RabbitMQPublisher struct { - b broker.Broker + broker.Broker exchange rabbitmq.Exchange url string que chan *item - closed bool - waitor sync.WaitGroup + closed chan struct{} + wg sync.WaitGroup } -func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, backlog int) *RabbitMQPublisher { - if backlog <= 0 { - backlog = 1 +func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int) *RabbitMQPublisher { + if queueSize <= 0 { + queueSize = 1 } mq := &RabbitMQPublisher{ url: url, exchange: exchange, - que: make(chan *item, backlog), + que: make(chan *item, queueSize), + closed: make(chan struct{}), } - - rabbitmq.DefaultRabbitURL = mq.url - rabbitmq.DefaultExchange = mq.exchange - - mq.b = rabbitmq.NewBroker() - mq.b.Init() + options := []broker.Option{ + broker.Addrs(url), rabbitmq.ExchangeName(exchange.Name), + } + if exchange.Durable { + options = append(options, rabbitmq.DurableExchange()) + } + mq.Broker = rabbitmq.NewBroker(options...) + mq.Broker.Init() return mq } -func (p *RabbitMQPublisher) Start() (err error) { - if ok, _ := common.PathExists(BACKUP_PATH); !ok { - err = os.MkdirAll(BACKUP_PATH, os.ModePerm) - if err != nil { - return - } - } - - err = p.b.Connect() - if err != nil { - return - } - - go p.workerRoutine() - - return nil -} - -func (p *RabbitMQPublisher) Stop() error { - if p.closed { - return ERR_CLOSED - } - - p.closed = true - close(p.que) - for item := range p.que { - p.publish(item.topic, item.msg) - } - - //等待所有投递出去的任务全部完成 - p.waitor.Wait() - - return p.b.Disconnect() -} - -// Send 发布消息,异步 -func (p *RabbitMQPublisher) Send(topic string, msg interface{}) (err error) { - if p.closed { - return ERR_CLOSED - } - - i := &item{topic: topic, msg: msg} - select { - case p.que <- i: - default: - //会不会情况更糟糕 - go p.concurrentPublish(topic, msg) - } - return nil -} - -func (p *RabbitMQPublisher) concurrentPublish(topic string, msg interface{}) (err error) { - p.waitor.Add(1) - defer p.waitor.Done() - return p.publish(topic, msg) -} - // 发布消息,同步 func (p *RabbitMQPublisher) publish(topic string, msg interface{}) (err error) { defer func() { - if err != nil { + e := recover() + if e != nil { + logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) recover:%v", topic, msg, e) + } + if err != nil || e != nil { p.backup(topic, msg, err) } - - recover() }() - buf, err := json.Marshal(msg) - if err != nil { - return err + var buf []byte + switch d := msg.(type) { + case []byte: + buf = d + case string: + buf = []byte(d) + default: + buf, err = json.Marshal(msg) + if err != nil { + return err + } } - err = p.b.Publish(topic, &broker.Message{Body: buf}) + err = p.Publish(topic, &broker.Message{Body: buf}) if err != nil { - logger.Logger.Error("RabbitMQPublisher.publish err:", err) + logger.Logger.Error("RabbitMQPublisher.publish(%s,%v) err:%v", topic, msg, err) return } return nil } -func (p *RabbitMQPublisher) workerRoutine() { - p.waitor.Add(1) - defer p.waitor.Done() +func (p *RabbitMQPublisher) publishRoutine() { + p.wg.Add(1) + defer p.wg.Done() for { select { @@ -150,20 +110,81 @@ func (p *RabbitMQPublisher) workerRoutine() { } else { return } + case <-p.closed: + return } } } -func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { - buf, err := json.Marshal(msg) +func (p *RabbitMQPublisher) Start() (err error) { + if ok, _ := common.PathExists(BackupPath); !ok { + err = os.MkdirAll(BackupPath, os.ModePerm) + if err != nil { + return + } + } + + err = p.Connect() if err != nil { return } + + go p.publishRoutine() + + return nil +} + +func (p *RabbitMQPublisher) Stop() error { + select { + case <-p.closed: + return ErrClosed + default: + } + + close(p.closed) + close(p.que) + for item := range p.que { + p.publish(item.topic, item.msg) + } + + //等待所有投递出去的任务全部完成 + p.wg.Wait() + + return p.Disconnect() +} + +// Send 发布消息,异步 +func (p *RabbitMQPublisher) Send(topic string, msg interface{}) (err error) { + select { + case <-p.closed: + return ErrClosed + default: + } + + i := &item{topic: topic, msg: msg} + select { + case p.que <- i: + default: + //会不会情况更糟糕 + go func() { + p.wg.Add(1) + defer p.wg.Done() + p.publish(topic, msg) + }() + } + return nil +} + +func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { + buf, er := json.Marshal(msg) + if er != nil { + return + } tNow := time.Now() - filePath := fmt.Sprintf("%s/%s_%s_%09d_%04d.dat", BACKUP_PATH, topic, tNow.Format(TIME_FORMAT), tNow.Nanosecond(), rand.Int31n(10000)) - f, err := os.Create(filePath) - if err != nil { - logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", topic, msg, err) + filePath := fmt.Sprintf(FilePathFormat, BackupPath, topic, tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) + f, er := os.Create(filePath) + if er != nil { + logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", topic, msg, er) return } defer f.Close() @@ -174,3 +195,26 @@ func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { f.WriteString("reason:" + reason + "\n") f.WriteString("data:" + string(buf) + "\n") } + +// StartPublisher 启动发布者 +func StartPublisher(url string, exchange string, durableExchange bool, queueSize int) { + StopPublisher() + globalPublisher = NewRabbitMQPublisher(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, queueSize) + if err := globalPublisher.Start(); err != nil { + panic(fmt.Sprintf("RabbitMQPublisher.Start() err:%v", err)) + } +} + +// StopPublisher 停止发布者 +func StopPublisher() { + if globalPublisher != nil { + globalPublisher.Stop() + } +} + +func Send(topic string, msg interface{}) (err error) { + if globalPublisher != nil { + return globalPublisher.Send(topic, msg) + } + return ErrClosed +} diff --git a/ranksrv/com/register.go b/ranksrv/com/register.go index faef73f..0097509 100644 --- a/ranksrv/com/register.go +++ b/ranksrv/com/register.go @@ -19,7 +19,7 @@ func Register(mainId int, msgType interface{}, h func(s *netlib.Session, g *rank return reflect.New(tp).Interface() } - common.RegisterHandler(mainId, common.HandlerWrapper(func(s *netlib.Session, packetId int, data interface{}, sid int64) error { + common.Register(mainId, rankproto.GateTransmit{}, func(s *netlib.Session, packetId int, data interface{}, sid int64) error { d, ok := data.(*rankproto.GateTransmit) if !ok { return nil @@ -36,9 +36,5 @@ func Register(mainId int, msgType interface{}, h func(s *netlib.Session, g *rank } return h(s, d, packetId, msg, sid) - })) - - netlib.RegisterFactory(mainId, netlib.PacketFactoryWrapper(func() interface{} { - return &rankproto.GateTransmit{} - })) + }) } diff --git a/ranksrv/init.go b/ranksrv/init.go deleted file mode 100644 index f2dce93..0000000 --- a/ranksrv/init.go +++ /dev/null @@ -1,35 +0,0 @@ -package main - -import ( - "mongo.games.com/game/common" - "mongo.games.com/game/model" - "mongo.games.com/game/mq" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" - "time" -) - -var RabbitMQPublisher *mq.RabbitMQPublisher -var RabbitMqConsumer *mq.RabbitMQConsumer - -func init() { - //首先加载游戏配置 - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - //初始化rpc - model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) - - //rabbitmq打开链接 - RabbitMqConsumer = mq.NewRabbitMQConsumer(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}) - if RabbitMqConsumer != nil { - RabbitMqConsumer.Start() - } - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - if RabbitMqConsumer != nil { - RabbitMqConsumer.Stop() - } - return nil - }) -} diff --git a/ranksrv/main.go b/ranksrv/main.go index 79356de..a3bd2c5 100644 --- a/ranksrv/main.go +++ b/ranksrv/main.go @@ -1,21 +1,38 @@ package main import ( - "mongo.games.com/game/model" + "time" + "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/module" _ "mongo.games.com/game" "mongo.games.com/game/common" + "mongo.games.com/game/etcd" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) func main() { - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + // package模块 core.LoadPackages("config.json") defer core.ClosePackages() - - model.InitGameParam() - - waitor := module.Start() - waitor.Wait("ranksrv") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + etcd.Start(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) + mq.StartConsumer(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true) + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + etcd.Close() + mq.StopConsumer() + model.ShutdownRPClient() + return nil + }) + // module模块 + w := module.Start() + w.Wait("ranksrv") } diff --git a/robot/base/init.go b/robot/base/init.go index 3972dd8..32fbd90 100644 --- a/robot/base/init.go +++ b/robot/base/init.go @@ -6,11 +6,7 @@ import ( "time" "github.com/globalsign/mgo/bson" - "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/logger" - - "mongo.games.com/game/common" - "mongo.games.com/game/model" ) type AccountData struct { @@ -27,19 +23,8 @@ var ( ) var accountFileName = "robotaccount.json" -func init() { - model.InitGameParam() - - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - initAccountData() - // 连接数据库服务 - model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) - return nil - }) -} - -// 初始化机器人账号 -func initAccountData() { +// InitAccountData 初始化机器人账号 +func InitAccountData() { dirty := false newFunc := func(n int) { for i := 0; i < n; i++ { diff --git a/robot/main.go b/robot/main.go index c5b3550..6d2992e 100644 --- a/robot/main.go +++ b/robot/main.go @@ -1,11 +1,15 @@ package main import ( + "time" + "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/module" _ "mongo.games.com/game" - _ "mongo.games.com/game/common" + "mongo.games.com/game/common" + "mongo.games.com/game/model" + "mongo.games.com/game/robot/base" _ "mongo.games.com/game/robot/base" _ "mongo.games.com/game/robot/chess" _ "mongo.games.com/game/robot/thirteen" @@ -14,9 +18,22 @@ import ( ) func main() { + // 自定义配置文件 + model.InitGameParam() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + base.InitAccountData() + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + model.ShutdownRPClient() + return nil + }) + // module模块 waiter := module.Start() waiter.Wait("main()") } diff --git a/worldsrv/action_sign.go b/worldsrv/action_sign.go deleted file mode 100644 index 55bb774..0000000 --- a/worldsrv/action_sign.go +++ /dev/null @@ -1,90 +0,0 @@ -package main - -import ( - "mongo.games.com/game/common" - "mongo.games.com/game/proto" - "mongo.games.com/game/protocol/activity" - "mongo.games.com/goserver/core/logger" - "mongo.games.com/goserver/core/netlib" -) - -// ------------------------------------------------ -type CSSignPacketFactory struct { -} - -type CSSignHandler struct { -} - -func (this *CSSignPacketFactory) CreatePacket() interface{} { - pack := &activity.CSSign{} - return pack -} - -func (this *CSSignHandler) Process(s *netlib.Session, packetid int, data interface{}, sid int64) error { - logger.Logger.Trace("CSSignHandler Process recv ", data) - if msg, ok := data.(*activity.CSSign); ok { - p := PlayerMgrSington.GetPlayer(sid) - if p == nil { - logger.Logger.Warn("CSSignHandler p == nil") - return nil - } - - pack := &activity.SCSign{} - pack.SignIndex = proto.Int32(msg.GetSignIndex()) - pack.SignType = proto.Int32(msg.GetSignType()) - - retCode := ActSignMgrSington.CanSign(p, int(msg.GetSignIndex())) - if retCode != activity.OpResultCode_ActSign_OPRC_Activity_Sign_Sucess { - pack.OpRetCode = retCode - proto.SetDefaults(pack) - p.SendToClient(int(activity.ActSignPacketID_PACKET_SCSign), pack) - return nil - } - retCode = ActSignMgrSington.Sign(p, int(msg.GetSignIndex()), msg.GetSignType()) - if retCode != activity.OpResultCode_ActSign_OPRC_Activity_Sign_Sucess { - pack.OpRetCode = retCode - proto.SetDefaults(pack) - p.SendToClient(int(activity.ActSignPacketID_PACKET_SCSign), pack) - return nil - } - - pack.OpRetCode = activity.OpResultCode_ActSign_OPRC_Activity_Sign_Sucess - proto.SetDefaults(pack) - p.SendToClient(int(activity.ActSignPacketID_PACKET_SCSign), pack) - } - return nil -} - -// ------------------------------------------------ -type CSSignDataPacketFactory struct { -} - -type CSSignDataHandler struct { -} - -func (this *CSSignDataPacketFactory) CreatePacket() interface{} { - pack := &activity.CSSignData{} - return pack -} - -func (this *CSSignDataHandler) Process(s *netlib.Session, packetid int, data interface{}, sid int64) error { - logger.Logger.Trace("CSSignDataHandler Process recv ", data) - if _, ok := data.(*activity.CSSignData); ok { - p := PlayerMgrSington.GetPlayer(sid) - if p == nil { - logger.Logger.Warn("CSSignDataHandler p == nil") - return nil - } - ActSignMgrSington.SendSignDataToPlayer(p) - } - return nil -} - -func init() { - //签到 - common.RegisterHandler(int(activity.ActSignPacketID_PACKET_CSSign), &CSSignHandler{}) - netlib.RegisterFactory(int(activity.ActSignPacketID_PACKET_CSSign), &CSSignPacketFactory{}) - //签到数据 - common.RegisterHandler(int(activity.ActSignPacketID_PACKET_CSSignData), &CSSignDataHandler{}) - netlib.RegisterFactory(int(activity.ActSignPacketID_PACKET_CSSignData), &CSSignDataPacketFactory{}) -} diff --git a/worldsrv/actsignmgr.go b/worldsrv/actsignmgr.go index 9f6c56e..763e68c 100644 --- a/worldsrv/actsignmgr.go +++ b/worldsrv/actsignmgr.go @@ -152,8 +152,4 @@ func (this *ActSignMgr) Sign(player *Player, signIndex int, signType int32) acti func init() { mgo.SetStats(true) - RegisterParallelLoadFunc("14日签到", func() error { - ActSignMgrSington.Init() - return nil - }) } diff --git a/worldsrv/blacklistmgr.go b/worldsrv/blacklistmgr.go index 9e68aa5..3c2cce4 100644 --- a/worldsrv/blacklistmgr.go +++ b/worldsrv/blacklistmgr.go @@ -538,8 +538,4 @@ func (this *BlackListMgr) CheckDeviceInBlackByPlatfrom(deviceId string, blackTyp func init() { mgo.SetStats(true) - RegisterParallelLoadFunc("平台黑名单", func() error { - BlackListMgrSington.Init() - return nil - }) } diff --git a/worldsrv/gamestate.go b/worldsrv/gamestate.go index 58c9997..2eff9a8 100644 --- a/worldsrv/gamestate.go +++ b/worldsrv/gamestate.go @@ -84,11 +84,6 @@ func (gsm *GameStateManager) BrodcastGameState(gameId int32, platform string, pa } } func init() { - //使用并行加载 - RegisterParallelLoadFunc("选场游戏场次配置", func() error { - gameStateMgr.Init() - return nil - }) //gameStateMgr.gameIds[int32(common.GameId_RollCoin)] = []int32{110030001, 110030002, 110030003, 110030004} //gameStateMgr.gameIds[int32(common.GameId_RollColor)] = []int32{150010001, 150010002, 150010003, 150010004} //gameStateMgr.gameIds[int32(common.GameId_RedVsBlack)] = []int32{140010001, 140010002, 140010003, 140010004} diff --git a/worldsrv/horseracelamp.go b/worldsrv/horseracelamp.go index 574c4f2..85a23e2 100644 --- a/worldsrv/horseracelamp.go +++ b/worldsrv/horseracelamp.go @@ -406,9 +406,4 @@ func (this *HorseRaceLampMgr) BroadcastHorseRaceLampMsg(horseRaceLamp *HorseRace func init() { module.RegisteModule(HorseRaceLampMgrSington, time.Second*3, 0) - - RegisterParallelLoadFunc("平台通知", func() error { - HorseRaceLampMgrSington.InitHorseRaceLamp() - return nil - }) } diff --git a/worldsrv/init.go b/worldsrv/init.go deleted file mode 100644 index 03f6666..0000000 --- a/worldsrv/init.go +++ /dev/null @@ -1,207 +0,0 @@ -package main - -import ( - "fmt" - "math/rand" - "time" - - "mongo.games.com/game/common" - "mongo.games.com/game/gamerule/crash" - "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker/rabbitmq" - - "sync" - - "github.com/astaxie/beego/cache" - "mongo.games.com/game/model" - "mongo.games.com/game/webapi" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/logger" - "mongo.games.com/goserver/core/utils" -) - -type ParalleFunc func() error - -var CacheMemory cache.Cache -var wgParalleLoad = &sync.WaitGroup{} -var ParalleLoadModules []*ParalleLoadModule -var RabbitMQPublisher *mq.RabbitMQPublisher -var RabbitMqConsumer *mq.RabbitMQConsumer - -type ParalleLoadModule struct { - name string - f ParalleFunc -} - -func RegisterParallelLoadFunc(name string, f ParalleFunc) { - ParalleLoadModules = append(ParalleLoadModules, &ParalleLoadModule{name: name, f: f}) -} - -func init() { - rand.Seed(time.Now().UnixNano()) - - //首先加载游戏配置 - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) - model.InitGameParam() - model.InitNormalParam() - - hs := model.GetCrashHash(0) - if hs == nil { - for i := 0; i < crash.POKER_CART_CNT; i++ { - model.InsertCrashHash(i, crash.Sha256(fmt.Sprintf("%v%v", i, time.Now().UnixNano()))) - } - hs = model.GetCrashHash(0) - } - model.GameParamData.InitGameHash = []string{} - for _, v := range hs { - model.GameParamData.InitGameHash = append(model.GameParamData.InitGameHash, v.Hash) - } - hsatom := model.GetCrashHashAtom(0) - if hsatom == nil { - for i := 0; i < crash.POKER_CART_CNT; i++ { - model.InsertCrashHashAtom(i, crash.Sha256(fmt.Sprintf("%v%v", i, time.Now().UnixNano()))) - } - hsatom = model.GetCrashHashAtom(0) - } - model.GameParamData.AtomGameHash = []string{} - for _, v := range hsatom { - model.GameParamData.AtomGameHash = append(model.GameParamData.AtomGameHash, v.Hash) - } - //if len(model.GameParamData.AtomGameHash) < crash.POKER_CART_CNT || - // len(model.GameParamData.InitGameHash) < crash.POKER_CART_CNT { - // panic(errors.New("hash is read error")) - //} - return nil - }) - - //RegisterParallelLoadFunc("平台红包数据", func() error { - // actRandCoinMgr.LoadPlatformData() - // return nil - //}) - - RegisterParallelLoadFunc("GMAC", func() error { - model.InitGMAC() - return nil - }) - - RegisterParallelLoadFunc("三方游戏配置", func() error { - model.InitGameConfig() - return nil - }) - - RegisterParallelLoadFunc("GameKVData", func() error { - model.InitGameKVData() - return nil - }) - - RegisterParallelLoadFunc("水池配置", func() error { - return model.GetAllCoinPoolSettingData() - }) - - RegisterParallelLoadFunc("三方平台热载数据设置", func() error { - f := func() { - webapi.ReqCgAddr = model.GameParamData.CgAddr - if plt, ok := webapi.ThridPlatformMgrSington.ThridPlatformMap.Load("XHJ平台"); ok { - //plt.(*webapi.XHJThridPlatform).IsNeedCheckQuota = model.GameParamData.FGCheckPlatformQuota - plt.(*webapi.XHJThridPlatform).ReqTimeOut = model.GameParamData.ThirdPltReqTimeout - } - } - f() - model.GameParamData.Observers = append(model.GameParamData.Observers, f) - return nil - }) - - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - - //for _, v := range data { - // PlatformMgrSingleton.UpsertPlatform(v.Name, v.Isolated, v.GameStatesData) - //} - - //ps := []model.GamePlatformState{model.GamePlatformState{LogicId:130000001,Param:"",State:1},model.GamePlatformState{LogicId:150000001,Param:"",State:1}} - - //model.InsertPlatformGameConfig("360",true,ps) - - var err error - CacheMemory, err = cache.NewCache("memory", `{"interval":60}`) - if err != nil { - return err - } - - //etcd打开连接 - EtcdMgrSington.Init() - - //go func() { - // for { - // time.Sleep(time.Minute) - // EtcdMgrSington.Reset() - // } - //}() - - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err = RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - - RabbitMqConsumer = mq.NewRabbitMQConsumer(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}) - if RabbitMqConsumer != nil { - RabbitMqConsumer.Start() - } - - //开始并行加载数据 - //改为串行加载,后台并发有点扛不住 - if len(ParalleLoadModules) > 0 { - tStart := time.Now() - logger.Logger.Infof("===[开始串行加载]===") - //wgParalleLoad.Add(paralleCnt) - for _, m := range ParalleLoadModules { - /*go*/ func(plm *ParalleLoadModule) { - ts := time.Now() - defer func() { - utils.DumpStackIfPanic(plm.name) - //wgParalleLoad.Done() - logger.Logger.Infof("[串行加载结束][%v] 耗时[%v]", plm.name, time.Now().Sub(ts)) - }() - logger.Logger.Infof("[开始串行加载][%v] ", plm.name) - - err := plm.f() - if err != nil { - logger.Logger.Warnf("[串行加载][%v][error:%v]", plm.name, err) - } - }(m) - } - //wgParalleLoad.Wait() - logger.Logger.Infof("===[串行加载结束,总耗时:%v]===", time.Now().Sub(tStart)) - } - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //etcd关闭连接 - EtcdMgrSington.Shutdown() - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - - if RabbitMqConsumer != nil { - RabbitMqConsumer.Stop() - } - - //model.ShutdownRPClient() - return nil - }) - //core.RegisteHook(core.HOOK_BEFORE_START, func() error { - // ThirdPltGameMappingConfig.Init() - // return nil - //}) - //RegisterParallelLoadFunc("分层配置数据", func() error { - // //加载分层配置 - // LogicLevelMgrSington.LoadConfig() - // return nil - //}) -} diff --git a/worldsrv/logchannel.go b/worldsrv/logchannel.go index b4bcc50..d1853b2 100644 --- a/worldsrv/logchannel.go +++ b/worldsrv/logchannel.go @@ -39,12 +39,12 @@ func (c *LogChannel) WriteLog(log interface{}) { if cname == "" { cname = "_null_" } - RabbitMQPublisher.Send(cname, log) + mq.Send(cname, log) } // WriteMQData rabbitMQ消息 func (c *LogChannel) WriteMQData(data *model.RabbitMQData) { - RabbitMQPublisher.Send(data.MQName, data.Data) + mq.Send(data.MQName, data.Data) } func init() { diff --git a/worldsrv/main.go b/worldsrv/main.go index 098ddcb..8bdcc55 100644 --- a/worldsrv/main.go +++ b/worldsrv/main.go @@ -1,26 +1,65 @@ package main import ( - "math/rand" "time" - _ "mongo.games.com/game" - "mongo.games.com/game/common" + "github.com/astaxie/beego/cache" "mongo.games.com/goserver/core" _ "mongo.games.com/goserver/core/i18n" "mongo.games.com/goserver/core/module" "mongo.games.com/goserver/core/schedule" + + _ "mongo.games.com/game" + "mongo.games.com/game/common" + "mongo.games.com/game/etcd" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) +var CacheMemory cache.Cache + func main() { - rand.Seed(time.Now().Unix()) - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + model.InitNormalParam() + model.InitGMAC() + model.InitGameConfig() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + etcd.Start(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) + mq.StartConsumer(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true) + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) + var err error + CacheMemory, err = cache.NewCache("memory", `{"interval":60}`) + if err != nil { + return err + } + + EtcdMgrSington.Init() + BlackListMgrSington.Init() + gameStateMgr.Init() + HorseRaceLampMgrSington.InitHorseRaceLamp() + model.InitGameKVData() + model.GetAllCoinPoolSettingData() + MsgMgrSington.InitMsg() + PlatformGameGroupMgrSington.LoadGameGroup() + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + EtcdMgrSington.Close() + etcd.Close() + mq.StopPublisher() + mq.StopConsumer() + model.ShutdownRPClient() + return nil + }) //启动定时任务 schedule.StartTask() - //启动业务模块 waiter := module.Start() waiter.Wait("main()") diff --git a/worldsrv/mq_coinlog.go b/worldsrv/mq_coinlog.go index 9425191..e78259f 100644 --- a/worldsrv/mq_coinlog.go +++ b/worldsrv/mq_coinlog.go @@ -19,7 +19,6 @@ func init() { if msg != nil { defer func() { e.Ack() - recover() }() var log model.CoinLog @@ -48,7 +47,6 @@ func init() { if msg != nil { defer func() { e.Ack() - recover() }() var log model.BindNum diff --git a/worldsrv/msgmgr.go b/worldsrv/msgmgr.go index 90be8f9..518a969 100644 --- a/worldsrv/msgmgr.go +++ b/worldsrv/msgmgr.go @@ -52,8 +52,5 @@ func (mm *MsgMgr) GetSubscribeMsgs(platform string, ts int64) (msgs []*model.Mes } func init() { - RegisterParallelLoadFunc("平台邮件", func() error { - MsgMgrSington.InitMsg() - return nil - }) + } diff --git a/worldsrv/platformgamegroup.go b/worldsrv/platformgamegroup.go index ca35ad2..1f430f8 100644 --- a/worldsrv/platformgamegroup.go +++ b/worldsrv/platformgamegroup.go @@ -125,8 +125,5 @@ func (this *PlatformGameGroupMgr) OnGameGroupUpdate(oldCfg, newCfg *webapi_proto } func init() { - RegisterParallelLoadFunc("平台游戏组数据", func() error { - PlatformGameGroupMgrSington.LoadGameGroup() - return nil - }) + } diff --git a/worldsrv/player.go b/worldsrv/player.go index 957fbda..a92509b 100644 --- a/worldsrv/player.go +++ b/worldsrv/player.go @@ -365,10 +365,6 @@ func (this *Player) OnLogined() { this.OnlineLogLogin() this.SendToRepSrv(this.PlayerData) - - //七日活动 - ActSignMgrSington.OnPlayerLogin(this) - //红点检测 this.CheckShowRed() @@ -423,9 +419,6 @@ func (this *Player) OnRehold() { FriendMgrSington.ApplyList(this.Platform, this.SnId) FriendUnreadMgrSington.CheckSendFriendUnreadData(this.Platform, this.SnId) - //七日活动. - ActSignMgrSington.OnPlayerLogin(this) - this.CheckShowRed() this.SendJackPotInit() @@ -2212,8 +2205,6 @@ func (this *Player) OnDayTimer(login, continuous bool, t int) { this.ShopLastLookTime = make(map[int32]int64) // 福利活动更新 WelfareMgrSington.OnDayChanged(this) - //七日活动 - ActSignMgrSington.OnDayChanged(this) this.VipMatchTimes = 0 //VIP商城数据更新 this.UpdateVipShopData()