From 489395ebb159fd5692b054f4594ea225e929abc2 Mon Sep 17 00:00:00 2001 From: sk <123456@qq.com> Date: Tue, 21 May 2024 15:56:11 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=AF=94=E8=B5=9B?= =?UTF-8?q?=E5=9C=BA=E6=AE=B5=E4=BD=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mgrsrv/api/webapi_actthrsrv.go | 203 -------------- mgrsrv/api/webapi_ranksrv.go | 186 ------------- worldsrv/action_game.go | 8 +- worldsrv/matchseasonmgr.go | 393 --------------------------- worldsrv/matchseasonrankmgr.go | 379 -------------------------- worldsrv/player.go | 1 - worldsrv/scene.go | 12 +- worldsrv/thirdPlatformGameMapping.go | 129 --------- worldsrv/thirdplatformmgr.go | 252 ----------------- worldsrv/tmmatch.go | 10 +- worldsrv/tournament.go | 7 +- worldsrv/trascate_webapi.go | 197 -------------- 12 files changed, 7 insertions(+), 1770 deletions(-) delete mode 100644 mgrsrv/api/webapi_actthrsrv.go delete mode 100644 mgrsrv/api/webapi_ranksrv.go delete mode 100644 worldsrv/matchseasonmgr.go delete mode 100644 worldsrv/matchseasonrankmgr.go delete mode 100644 worldsrv/thirdPlatformGameMapping.go delete mode 100644 worldsrv/thirdplatformmgr.go diff --git a/mgrsrv/api/webapi_actthrsrv.go b/mgrsrv/api/webapi_actthrsrv.go deleted file mode 100644 index 3971212..0000000 --- a/mgrsrv/api/webapi_actthrsrv.go +++ /dev/null @@ -1,203 +0,0 @@ -package api - -// -//import ( -// "encoding/json" -// "io/ioutil" -// "net/http" -// "time" -// -// "crypto/md5" -// "encoding/hex" -// "fmt" -// "mongo.games.com/game/common" -// "mongo.games.com/game/model" -// "mongo.games.com/game/webapi" -// "mongo.games.com/goserver/core" -// "mongo.games.com/goserver/core/admin" -// "mongo.games.com/goserver/core/logger" -// "mongo.games.com/goserver/core/netlib" -// "mongo.games.com/goserver/core/transact" -// "mongo.games.com/goserver/core/utils" -// "io" -// "sync/atomic" -//) -// -//// API -//// http://127.0.0.1:9595/api/Report/QueryOnlineReportList?ts=20141024000000&sign=41cc8cee8dd93f7dc70b6426cfd1029d -// -//func ActThrSrvApi(rw http.ResponseWriter, req *http.Request) { -// defer utils.DumpStackIfPanic("api.ActThrSrvApi") -// logger.Logger.Info("ActThrSrvApi receive:", req.URL.Path, req.URL.RawQuery) -// -// if common.RequestCheck(req, model.GameParamData.WhiteHttpAddr) == false { -// logger.Logger.Info("RemoteAddr [%v] require api.", req.RemoteAddr) -// return -// } -// data, err := io.ReadAll(req.Body) -// if err != nil { -// logger.Logger.Info("Body err.", err) -// webApiResponse(rw, map[string]interface{}{webapi.RESPONSE_STATE: webapi.STATE_ERR, webapi.RESPONSE_ERRMSG: "Post data is null!"}) -// return -// } -// m := req.URL.Query() -// timestamp := m.Get("nano") -// if timestamp == "" { -// logger.Logger.Info(req.RemoteAddr, " ActThrSrvApi param error: nano not allow null") -// return -// } -// sign := m.Get("sign") -// if sign == "" { -// logger.Logger.Info(req.RemoteAddr, " ActThrSrvApi param error: sign not allow null") -// return -// } -// startTime := time.Now().UnixNano() -// args := fmt.Sprintf("%v;%v;%v;%v", common.Config.AppId, req.URL.Path, string(data), timestamp) -// h := md5.New() -// io.WriteString(h, args) -// realSign := hex.EncodeToString(h.Sum(nil)) -// if realSign != sign && !common.Config.IsDevMode { -// logger.Logger.Info(req.RemoteAddr, " srvCtrlMain sign error: expect ", realSign, " ; but get ", sign, " raw=", args) -// webApiResponse(rw, map[string]interface{}{webapi.RESPONSE_STATE: webapi.STATE_ERR, webapi.RESPONSE_ERRMSG: "Sign error!"}) -// return -// } -// -// var stats *ApiStats -// if v, exist := WebApiStats.Load(req.URL.Path); exist { -// stats = v.(*ApiStats) -// } else { -// stats = &ApiStats{} -// WebApiStats.Store(req.URL.Path, stats) -// } -// var rep map[string]interface{} -// start := time.Now() -// res := make(chan map[string]interface{}, 1) -// core.CoreObject().SendCommand(&WebApiEvent{req: req, path: req.URL.Path, h: HandlerWrapper(func(event *WebApiEvent, data []byte) bool { -// logger.Logger.Trace("ActThrSrvApi start transcate") -// tnp := &transact.TransNodeParam{ -// Tt: common.TransType_ActThrSrvWebApi, -// Ot: transact.TransOwnerType(common.GetSelfSrvType()), -// Oid: common.GetSelfSrvId(), -// AreaID: common.GetSelfAreaId(), -// } -// tNode := transact.DTCModule.StartTrans(tnp, event, transact.DefaultTransactTimeout) //超时时间30秒 -// if tNode != nil { -// tNode.TransEnv.SetField(WEBAPI_TRANSACTE_EVENT, event) -// tNode.Go(core.CoreObject()) -// } -// return true -// }), body: data, rawQuery: req.URL.RawQuery, res: res}, false) -// select { -// case rep = <-res: -// if rep != nil { -// webApiResponse(rw, rep) -// } -// case <-time.After(ApiDefaultTimeout): -// rep = make(map[string]interface{}) -// rep[webapi.RESPONSE_STATE] = webapi.STATE_ERR -// rep[webapi.RESPONSE_ERRMSG] = "proccess timeout!" -// webApiResponse(rw, rep) -// if stats != nil { -// atomic.AddInt64(&stats.TimeoutTimes, 1) -// } -// } -// ps := int64(time.Now().Sub(start) / time.Millisecond) -// if stats != nil { -// atomic.AddInt64(&stats.RunTimes, 1) -// atomic.AddInt64(&stats.TotalRuningTime, ps) -// if atomic.LoadInt64(&stats.MaxRuningTime) < ps { -// atomic.StoreInt64(&stats.MaxRuningTime, ps) -// } -// } -// result, err := json.Marshal(rep) -// if err == nil { -// log := model.NewAPILog(req.URL.Path, req.URL.RawQuery, string(data[:]), req.RemoteAddr, string(result[:]), startTime, ps) -// APILogChannelSington.Write(log) -// } -// return -//} -// -////-------------------------------------------------------------------------------------- -//func init() { -// transact.RegisteHandler(common.TransType_ActThrSrvWebApi, &transact.TransHanderWrapper{ -// OnExecuteWrapper: transact.OnExecuteWrapper(func(tNode *transact.TransNode, ud interface{}) transact.TransExeResult { -// logger.Logger.Trace("ActThrSrvApi start TransType_WebApi OnExecuteWrapper ") -// tnp := &transact.TransNodeParam{ -// Tt: common.TransType_ActThrSrvWebApi, -// Ot: transact.TransOwnerType(common.ActThrServerType), -// Oid: common.ActThrServerID, -// AreaID: common.GetSelfAreaId(), -// Tct: transact.TransactCommitPolicy_TwoPhase, -// } -// if event, ok := ud.(*WebApiEvent); ok { -// userData := &common.M2GWebApiRequest{Path: event.path, RawQuery: event.rawQuery, Body: event.body, ReqIp: event.req.RemoteAddr} -// tNode.StartChildTrans(tnp, userData, transact.DefaultTransactTimeout) -// -// pid := tNode.MyTnp.TId -// cid := tnp.TId -// logger.Logger.Tracef("ActThrSrvApi start TransType_WebApi OnExecuteWrapper tid:%x childid:%x", pid, cid) -// return transact.TransExeResult_Success -// } -// return transact.TransExeResult_Failed -// }), -// OnCommitWrapper: transact.OnCommitWrapper(func(tNode *transact.TransNode) transact.TransExeResult { -// logger.Logger.Trace("ActThrSrvApi start TransType_WebApi OnCommitWrapper") -// event := tNode.TransEnv.GetField(WEBAPI_TRANSACTE_EVENT).(*WebApiEvent) -// resp := tNode.TransEnv.GetField(WEBAPI_TRANSACTE_RESPONSE) -// if userData, ok := resp.(*common.M2GWebApiResponse); ok { -// if len(userData.Body) > 0 { -// m := make(map[string]interface{}) -// err := json.Unmarshal(userData.Body, &m) -// if err == nil { -// event.Response(m) -// return transact.TransExeResult_Success -// } -// } -// } -// event.Response(map[string]interface{}{webapi.RESPONSE_STATE: webapi.STATE_ERR, webapi.RESPONSE_ERRMSG: "execute failed!"}) -// return transact.TransExeResult_Success -// }), -// OnRollBackWrapper: transact.OnRollBackWrapper(func(tNode *transact.TransNode) transact.TransExeResult { -// logger.Logger.Trace("ActThrSrvApi start TransType_WebApi OnRollBackWrapper") -// event := tNode.TransEnv.GetField(WEBAPI_TRANSACTE_EVENT).(*WebApiEvent) -// resp := tNode.TransEnv.GetField(WEBAPI_TRANSACTE_RESPONSE) -// if userData, ok := resp.(*common.M2GWebApiResponse); ok { -// if len(userData.Body) > 0 { -// m := make(map[string]interface{}) -// err := json.Unmarshal(userData.Body, &m) -// if err == nil { -// event.Response(m) -// return transact.TransExeResult_Success -// } -// } -// return transact.TransExeResult_Success -// } -// event.Response(map[string]interface{}{webapi.RESPONSE_STATE: webapi.STATE_ERR, webapi.RESPONSE_ERRMSG: "execute failed!"}) -// return transact.TransExeResult_Success -// }), -// OnChildRespWrapper: transact.OnChildRespWrapper(func(tNode *transact.TransNode, hChild transact.TransNodeID, retCode int, ud interface{}) transact.TransExeResult { -// logger.Logger.Tracef("ActThrSrvApi start TransType_WebApi OnChildRespWrapper ret:%v childid:%x", retCode, hChild) -// userData := &common.M2GWebApiResponse{} -// err := netlib.UnmarshalPacketNoPackId(ud.([]byte), userData) -// if err == nil { -// tNode.TransEnv.SetField(WEBAPI_TRANSACTE_RESPONSE, userData) -// } else { -// logger.Logger.Trace("trascate.OnChildRespWrapper err:", err) -// } -// return transact.TransExeResult(retCode) -// }), -// }) -// -// //测试 -// admin.MyAdminApp.Route("/api/ActThr/Test", ActThrSrvApi) -// -// //增加自动黑白名单控制 -// admin.MyAdminApp.Route("/api/ActThr/Upsert", ActThrSrvApi) -// -// //删除自动黑白名单控制 -// admin.MyAdminApp.Route("/api/ActThr/Delete", ActThrSrvApi) -// -// //删除所有添加的黑名单 -// admin.MyAdminApp.Route("/api/ActThr/ResetBW", ActThrSrvApi) -// -//} diff --git a/mgrsrv/api/webapi_ranksrv.go b/mgrsrv/api/webapi_ranksrv.go deleted file mode 100644 index e0adcc9..0000000 --- a/mgrsrv/api/webapi_ranksrv.go +++ /dev/null @@ -1,186 +0,0 @@ -package api - -// -//import ( -// "crypto/md5" -// "encoding/hex" -// "encoding/json" -// "fmt" -// "mongo.games.com/game/common" -// "mongo.games.com/game/model" -// "mongo.games.com/game/webapi" -// "mongo.games.com/goserver/core" -// "mongo.games.com/goserver/core/admin" -// "mongo.games.com/goserver/core/logger" -// "mongo.games.com/goserver/core/netlib" -// "mongo.games.com/goserver/core/transact" -// "mongo.games.com/goserver/core/utils" -// "io" -// "io/ioutil" -// "net/http" -// "time" -//) -// -//const ( -// RANKSRVAPI_TRANSACTE_EVENT = "GAMESRVAPI_TRANSACTE_EVENT" -// RANKSRVAPI_TRANSACTE_RESPONSE = "RANKSRVAPI_TRANSACTE_RESPONSE" -//) -// -//// 处理 web 请求 rank server 相关的配置协议, 转发至 rank server 处理 -// -//func RankSrvApi(rw http.ResponseWriter, req *http.Request) { -// defer utils.DumpStackIfPanic("api.RankSrvApi") -// logger.Logger.Info("RankSrvApi receive:", req.URL.Path, req.URL.RawQuery) -// -// if common.RequestCheck(req, model.GameParamData.WhiteHttpAddr) == false { -// logger.Logger.Info("RemoteAddr [%v] require api.", req.RemoteAddr) -// return -// } -// data, err := io.ReadAll(req.Body) -// if err != nil { -// logger.Logger.Info("Body err.", err) -// webApiResponse(rw, map[string]interface{}{ -// webapi.RESPONSE_STATE: webapi.STATE_ERR, -// webapi.RESPONSE_ERRMSG: "Post data is null!", -// }) -// return -// } -// logger.Logger.Info(string(data)) -// m := req.URL.Query() -// timestamp := m.Get("nano") -// if timestamp == "" { -// logger.Logger.Info(req.RemoteAddr, " RankSrvApi param error: nano not allow null") -// return -// } -// sign := m.Get("sign") -// if sign == "" { -// logger.Logger.Info(req.RemoteAddr, " RankSrvApi param error: sign not allow null") -// return -// } -// startTime := time.Now().UnixNano() -// args := fmt.Sprintf("%v;%v;%v;%v", common.Config.AppId, req.URL.Path, string(data), timestamp) -// h := md5.New() -// io.WriteString(h, args) -// realSign := hex.EncodeToString(h.Sum(nil)) -// if realSign != sign && !common.Config.IsDevMode { -// logger.Logger.Info(req.RemoteAddr, " srvCtrlMain sign error: expect ", realSign, " ; but get ", sign, " raw=", args) -// webApiResponse(rw, map[string]interface{}{webapi.RESPONSE_STATE: webapi.STATE_ERR, webapi.RESPONSE_ERRMSG: "Sign error!"}) -// return -// } -// var rep map[string]interface{} -// start := time.Now() -// res := make(chan map[string]interface{}, 1) -// core.CoreObject().SendCommand(&WebApiEvent{req: req, path: req.URL.Path, h: HandlerWrapper(func(event *WebApiEvent, data []byte) bool { -// logger.Logger.Trace("RankSrvApi start transcate") -// tnp := &transact.TransNodeParam{ -// Tt: common.TransType_WebApi_ForRank, -// Ot: transact.TransOwnerType(common.GetSelfSrvType()), -// Oid: common.GetSelfSrvId(), -// AreaID: common.GetSelfAreaId(), -// } -// logger.Info("call info:", common.GetSelfAreaId(), common.GetSelfSrvType(), common.GetSelfSrvId()) -// tNode := transact.DTCModule.StartTrans(tnp, event, transact.DefaultTransactTimeout) //超时时间30秒 -// if tNode != nil { -// tNode.TransEnv.SetField(RANKSRVAPI_TRANSACTE_EVENT, event) -// tNode.Go(core.CoreObject()) -// } -// return true -// }), body: data, rawQuery: req.URL.RawQuery, res: res}, false) -// select { -// case rep = <-res: -// if rep != nil { -// webApiResponse(rw, rep) -// } -// case <-time.After(ApiDefaultTimeout): -// rep = make(map[string]interface{}) -// rep[webapi.RESPONSE_STATE] = webapi.STATE_ERR -// rep[webapi.RESPONSE_ERRMSG] = "proccess timeout!" -// webApiResponse(rw, rep) -// } -// ps := int64(time.Now().Sub(start) / time.Millisecond) -// result, err := json.Marshal(rep) -// if err == nil { -// log := model.NewAPILog(req.URL.Path, req.URL.RawQuery, string(data[:]), req.RemoteAddr, string(result[:]), startTime, ps) -// APILogChannelSington.Write(log) -// } -// return -//} -// -//func init() { -// transact.RegisteHandler(common.TransType_WebApi_ForRank, &transact.TransHanderWrapper{ -// OnExecuteWrapper: transact.OnExecuteWrapper(func(tNode *transact.TransNode, ud interface{}) transact.TransExeResult { -// logger.Logger.Trace("RankSrvApi start TransType_WebApi_ForRank OnExecuteWrapper ") -// tnp := &transact.TransNodeParam{ -// Tt: common.TransType_WebApi_ForRank, -// Ot: transact.TransOwnerType(common.RankServerType), -// Oid: common.GetRankSrvId(), -// AreaID: common.GetSelfAreaId(), -// Tct: transact.TransactCommitPolicy_TwoPhase, -// } -// logger.Infof("params: %+v", tnp) -// if event, ok := ud.(*WebApiEvent); ok { -// userData := &common.M2GWebApiRequest{Path: event.path, RawQuery: event.rawQuery, Body: event.body, ReqIp: event.req.RemoteAddr} -// tNode.StartChildTrans(tnp, userData, transact.DefaultTransactTimeout) -// -// pid := tNode.MyTnp.TId -// cid := tnp.TId -// logger.Logger.Tracef("RankSrvApi start TransType_WebApi_ForRank OnExecuteWrapper tid:%x childid:%x", pid, cid) -// return transact.TransExeResult_Success -// } -// return transact.TransExeResult_Failed -// }), -// OnCommitWrapper: transact.OnCommitWrapper(func(tNode *transact.TransNode) transact.TransExeResult { -// logger.Logger.Trace("RankSrvApi start TransType_WebApi_ForRank OnCommitWrapper") -// event := tNode.TransEnv.GetField(RANKSRVAPI_TRANSACTE_EVENT).(*WebApiEvent) -// resp := tNode.TransEnv.GetField(RANKSRVAPI_TRANSACTE_RESPONSE) -// if userData, ok := resp.(*common.M2GWebApiResponse); ok { -// if len(userData.Body) > 0 { -// m := make(map[string]interface{}) -// err := json.Unmarshal(userData.Body, &m) -// if err == nil { -// event.Response(m) -// return transact.TransExeResult_Success -// } -// } -// } -// event.Response(map[string]interface{}{webapi.RESPONSE_STATE: webapi.STATE_ERR, webapi.RESPONSE_ERRMSG: "execute failed!"}) -// return transact.TransExeResult_Success -// }), -// OnRollBackWrapper: transact.OnRollBackWrapper(func(tNode *transact.TransNode) transact.TransExeResult { -// logger.Logger.Trace("RankSrvApi start TransType_WebApi_ForRank OnRollBackWrapper") -// event := tNode.TransEnv.GetField(RANKSRVAPI_TRANSACTE_EVENT).(*WebApiEvent) -// resp := tNode.TransEnv.GetField(RANKSRVAPI_TRANSACTE_RESPONSE) -// if userData, ok := resp.(*common.M2GWebApiResponse); ok { -// if len(userData.Body) > 0 { -// m := make(map[string]interface{}) -// err := json.Unmarshal(userData.Body, &m) -// if err == nil { -// event.Response(m) -// return transact.TransExeResult_Success -// } -// } -// return transact.TransExeResult_Success -// } -// event.Response(map[string]interface{}{webapi.RESPONSE_STATE: webapi.STATE_ERR, webapi.RESPONSE_ERRMSG: "execute failed!"}) -// return transact.TransExeResult_Success -// }), -// OnChildRespWrapper: transact.OnChildRespWrapper(func(tNode *transact.TransNode, hChild transact.TransNodeID, retCode int, ud interface{}) transact.TransExeResult { -// logger.Logger.Tracef("RankSrvApi start TransType_WebApi_ForRank OnChildRespWrapper ret:%v childid:%x", retCode, hChild) -// userData := &common.M2GWebApiResponse{} -// err := netlib.UnmarshalPacketNoPackId(ud.([]byte), userData) -// if err == nil { -// tNode.TransEnv.SetField(RANKSRVAPI_TRANSACTE_RESPONSE, userData) -// } else { -// logger.Logger.Trace("trascate.OnChildRespWrapper err:", err) -// } -// return transact.TransExeResult(retCode) -// }), -// }) //RegisteHandler -// -// admin.MyAdminApp.Route("/api/rank/getConfig", RankSrvApi) -// admin.MyAdminApp.Route("/api/rank/updateConfig", RankSrvApi) -// admin.MyAdminApp.Route("/api/rank/debug/settings", RankSrvApi) -// admin.MyAdminApp.Route("/api/rank/debug/board", RankSrvApi) -// admin.MyAdminApp.Route("/api/rank/reset", RankSrvApi) -// admin.MyAdminApp.Route("/api/rank/syncUser", RankSrvApi) // 同步主库玩家信息 -//} diff --git a/worldsrv/action_game.go b/worldsrv/action_game.go index 5e94a25..250d930 100644 --- a/worldsrv/action_game.go +++ b/worldsrv/action_game.go @@ -142,11 +142,6 @@ func (this *CSEnterRoomHandler) Process(s *netlib.Session, packetid int, data in if scene.IsMatchScene() && p.IsRob { grade := int32(1000) snid := p.SnId - ms := MatchSeasonMgrSington.GetMatchSeason(snid) // 玩家赛季信息 - lv := MatchSeasonRankMgrSington.CreateRobotLv() // - if ms != nil { - lv = ms.Lv - } roleId := int32(2000001) if p.Roles != nil { roleId = p.Roles.ModId @@ -159,12 +154,11 @@ func (this *CSEnterRoomHandler) Process(s *netlib.Session, packetid int, data in randIndex := rand.Intn(len(tm.copyRobotGrades)) grade = tm.copyRobotGrades[randIndex].grade snid = tm.copyRobotGrades[randIndex].copySnid - lv = tm.copyRobotGrades[randIndex].copyLv roleId = tm.copyRobotGrades[randIndex].copyRoleId tm.copyRobotGrades = append(tm.copyRobotGrades[:randIndex], tm.copyRobotGrades[randIndex+1:]...) } } - mc := NewMatchContext(p, tm, grade, snid, lv, roleId, 0) + mc := NewMatchContext(p, tm, grade, snid, 1, roleId, 0) if mc != nil { mc.gaming = true p.matchCtx = mc diff --git a/worldsrv/matchseasonmgr.go b/worldsrv/matchseasonmgr.go deleted file mode 100644 index b8422d0..0000000 --- a/worldsrv/matchseasonmgr.go +++ /dev/null @@ -1,393 +0,0 @@ -package main - -import ( - "github.com/globalsign/mgo/bson" - "mongo.games.com/game/model" - "mongo.games.com/game/proto" - "mongo.games.com/game/protocol/tournament" - "mongo.games.com/game/srvdata" - "mongo.games.com/goserver/core/basic" - "mongo.games.com/goserver/core/logger" - "mongo.games.com/goserver/core/module" - "mongo.games.com/goserver/core/task" - "sort" - "strconv" - "time" -) - -var MatchSeasonMgrSington = &MatchSeasonMgr{ - MatchSeasonList: make(map[int32]*MatchSeason), - MatchSeasonId: make(map[string]*MatchSeasonId), -} - -type MatchSeasonMgr struct { - BaseClockSinker - MatchSeasonList map[int32]*MatchSeason // snid:玩家赛季信息 - MatchSeasonId map[string]*MatchSeasonId // platform:平台赛季信息 -} - -// MatchSeason 玩家赛季信息 -type MatchSeason struct { - Id bson.ObjectId `bson:"_id"` - Platform string - SnId int32 - Name string - SeasonId int32 //赛季id - Lv int32 //段位 - LastLv int32 //上赛季段位 - IsAward bool //上赛季是否领奖 - AwardTs int64 //领奖时间 - UpdateTs int64 - dirty bool -} - -// MatchSeasonId 赛季信息 -type MatchSeasonId struct { - Id bson.ObjectId `bson:"_id"` - Platform string - SeasonId int32 //赛季id - StartStamp int64 //开始时间戳 - EndStamp int64 //结束时间戳 - UpdateTs int64 //更新时间戳 -} - -func (this *MatchSeasonMgr) exchangeModel2Cache(mms *model.MatchSeason) *MatchSeason { - if mms == nil { - return nil - } - ms := &MatchSeason{ - Id: mms.Id, - Platform: mms.Platform, - SnId: mms.SnId, - Name: mms.Name, - Lv: mms.Lv, - LastLv: mms.LastLv, - IsAward: mms.IsAward, - AwardTs: mms.AwardTs, - SeasonId: mms.SeasonId, - UpdateTs: mms.UpdateTs, - } - return ms -} - -func (this *MatchSeasonMgr) GetMatchSeason(snid int32) *MatchSeason { - return this.MatchSeasonList[snid] -} - -// GetAllMatchSeason 获取所有玩家赛季信息 -func (this *MatchSeasonMgr) GetAllMatchSeason() map[int32]*MatchSeason { - return this.MatchSeasonList -} - -func (this *MatchSeasonMgr) SetMatchSeason(ms *MatchSeason) { - if ms == nil { - return - } - this.MatchSeasonList[ms.SnId] = ms -} - -func (this *MatchSeasonMgr) DelMatchSeasonCache(snid int32) { - if this.MatchSeasonList[snid] == nil { - return - } - delete(this.MatchSeasonList, snid) -} - -// UpdateMatchSeasonLv 修改玩家段位 -// 通知段位变更 -// 更新排行榜 -func (this *MatchSeasonMgr) UpdateMatchSeasonLv(p *Player, addlv int32, dirty bool) { - logger.Logger.Trace("(this *MatchSeasonMgr) UpdateMatchSeasonLv: SnId: ", p.SnId, " addlv: ", addlv) - if p == nil || p.IsRob { - return - } - platform := p.Platform - if platform == DefaultPlatform { - return - } - ms := this.GetMatchSeason(p.SnId) - if ms != nil { - ms.Lv = ms.Lv + addlv - ms.dirty = true - ms.UpdateTs = time.Now().Unix() - msid := this.GetMatchSeasonId(platform) - if msid != nil { - if addlv != 0 || dirty { //段位有变化或者需要继承 - //通知客户端段位更新 - pack := &tournament.SCTMSeasonInfo{ - Id: msid.SeasonId, - SeasonTimeStamp: []int64{msid.StartStamp, msid.EndStamp}, - Lv: ms.Lv, - LastLv: ms.LastLv, - IsAward: ms.IsAward, - } - proto.SetDefaults(pack) - ok := p.SendToClient(int(tournament.TOURNAMENTID_PACKET_TM_SCTMSeasonInfo), pack) - logger.Logger.Trace("SCTMSeasonInfoHandler: ok: ", ok, pack) - } - //更新排行榜 - logger.Logger.Trace("更新排行榜!!!") - msrs := MatchSeasonRankMgrSington.GetMatchSeasonRank(platform) - if msrs == nil { //排行榜没有数据 去缓存中取 - ams := MatchSeasonMgrSington.GetAllMatchSeason() - mss := []*model.MatchSeason{} - if ams != nil { - for _, season := range ams { - if season.Platform == platform { - mms := &model.MatchSeason{ - Id: season.Id, - Platform: season.Platform, - SnId: season.SnId, - Name: season.Name, - SeasonId: season.SeasonId, - Lv: season.Lv, - LastLv: season.LastLv, - IsAward: season.IsAward, - AwardTs: season.AwardTs, - } - mss = append(mss, mms) - } - } - } - if mss != nil && len(mss) > 0 { - cmsrs := []*MatchSeasonRank{} - sort.Slice(mss, func(i, j int) bool { - return mss[i].Lv > mss[j].Lv - }) - if len(mss) > model.GameParamData.MatchSeasonRankMaxNum { - mss = append(mss[:model.GameParamData.MatchSeasonRankMaxNum]) - } - for i := 0; i < len(mss); i++ { - season := mss[i] - msr := &MatchSeasonRank{ - Id: season.Id, - Platform: season.Platform, - SnId: season.SnId, - Name: season.Name, - Lv: season.Lv, - UpdateTs: season.UpdateTs, - } - cmsrs = append(cmsrs, msr) - } - MatchSeasonRankMgrSington.SetMatchSeasonRank(platform, cmsrs) - } - } - MatchSeasonRankMgrSington.UpdateMatchSeasonRank(p, ms.Lv) - } - } -} - -// MatchSeasonInherit 查询段位继承 -func (this *MatchSeasonMgr) MatchSeasonInherit(lv int32) int32 { - logger.Logger.Trace("(this *MatchSeasonMgr) MatchSeasonInherit: lv: ", lv) - destLv := int32(1) - for _, v := range srvdata.PBDB_GamMatchLVMgr.Datas.GetArr() { - if v.Star != nil && len(v.Star) > 1 { - startStar := v.Star[0] - endStar := v.Star[1] - if lv >= startStar && lv <= endStar { //匹配段位 - destLv = v.Star2 //继承后段位 - } - } - } - return destLv -} - -// UpdateMatchSeasonAward 更新领奖时间 -func (this *MatchSeasonMgr) UpdateMatchSeasonAward(snid int32) { - logger.Logger.Trace("(this *MatchSeasonMgr) UpdateMatchSeasonAward ", snid) - ms := this.GetMatchSeason(snid) - if ms != nil { - ms.IsAward = true - ms.AwardTs = time.Now().Unix() - ms.UpdateTs = time.Now().Unix() - ms.dirty = true - } -} - -// SaveMatchSeasonData 保存玩家段位信息 -// logout 删除缓存 -func (this *MatchSeasonMgr) SaveMatchSeasonData(snid int32, logout bool) { - logger.Logger.Trace("(this *MatchSeasonMgr) SaveMatchSeasonData ", snid) - ms := this.MatchSeasonList[snid] - if ms != nil && ms.dirty { - ms.dirty = false - mms := &model.MatchSeason{ - Id: ms.Id, - Platform: ms.Platform, - SnId: ms.SnId, - Name: ms.Name, - Lv: ms.Lv, - LastLv: ms.LastLv, - IsAward: ms.IsAward, - AwardTs: ms.AwardTs, - SeasonId: ms.SeasonId, - UpdateTs: ms.UpdateTs, - } - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMatchSeason(mms) - }), task.CompleteNotifyWrapper(func(data interface{}, tt task.Task) { - logger.Logger.Info("SaveMatchSeasonData!!!") - if logout { - this.DelMatchSeasonCache(snid) - } - })).StartByFixExecutor("SnId:" + strconv.Itoa(int(snid))) - } -} - -// SaveAllMatchSeasonData 保存所有玩家段位信息 -func (this *MatchSeasonMgr) SaveAllMatchSeasonData() { - for _, msl := range this.MatchSeasonList { - this.SaveMatchSeasonData(msl.SnId, false) - } -} - -// UpdateMatchSeasonId 更新比赛场赛季配置 -// 更新赛季配置 -// 更新在线玩家段位 -// 更新排行榜 -func (this *MatchSeasonMgr) UpdateMatchSeasonId(platform string) { - logger.Logger.Info("(this *MatchSeasonMgr) UpdateMatchSeasonId") - if platform == DefaultPlatform { - return - } - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - ret, err := model.QueryMatchSeasonId(platform) - if err != nil { - return nil - } - return ret - }), task.CompleteNotifyWrapper(func(data interface{}, tt task.Task) { - var ret *model.MatchSeasonId - if data == nil || data.(*model.MatchSeasonId) == nil { - sstamp, estamp := this.getNowMonthStartAndEnd() - ret = model.NewMatchSeasonId(platform, int32(1), sstamp, estamp) //初始化赛季 - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMatchSeasonId(ret) - }), nil).StartByFixExecutor("UpsertMatchSeasonId") - } else { - ret = data.(*model.MatchSeasonId) - } - logger.Logger.Info("UpdateMatchSeasonId!!!", ret) - if ret != nil { - nowStamp := time.Now().Unix() - if nowStamp < ret.StartStamp { - logger.Logger.Error("赛季开始时间错误!!!") - } - if nowStamp >= ret.EndStamp { //新赛季 - logger.Logger.Info("新赛季!!!", ret) - sstamp, estamp := this.getNowMonthStartAndEnd() - ret.SeasonId++ - ret.StartStamp = sstamp - ret.EndStamp = estamp - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpsertMatchSeasonId(ret) - }), nil).StartByFixExecutor("UpsertMatchSeasonId") - //排行榜内的段位继承 - MatchSeasonRankMgrSington.MatchSeasonRankInherit(platform) - //通知平台玩家继承后的段位数据 - players := PlayerMgrSington.playerOfPlatform[platform] - for _, p := range players { - if p != nil && p.IsOnLine() && !p.IsRob { - ms := MatchSeasonMgrSington.GetMatchSeason(p.SnId) - if ms != nil { - if ms.SeasonId < ret.SeasonId { //不同赛季段位继承 - num := ret.SeasonId - ms.SeasonId - finalLv := ms.Lv - for i := 0; i < int(num); i++ { //继承几次 - if i == int(num)-1 { //上个赛季 - ms.LastLv = finalLv - } - finalLv = MatchSeasonMgrSington.MatchSeasonInherit(finalLv) - } - ms.Lv = finalLv - ms.SeasonId = ret.SeasonId - ms.IsAward = false - ms.UpdateTs = time.Now().Unix() - ms.dirty = true - MatchSeasonMgrSington.SetMatchSeason(ms) //更新缓存 - pack := &tournament.SCTMSeasonInfo{ - Id: ret.SeasonId, - SeasonTimeStamp: []int64{ret.StartStamp, ret.EndStamp}, - Lv: ms.Lv, - LastLv: ms.LastLv, - IsAward: ms.IsAward, - } - proto.SetDefaults(pack) - logger.Logger.Trace("SCTMSeasonInfo:", p.SnId, " pack: ", pack) - p.SendToClient(int(tournament.TOURNAMENTID_PACKET_TM_SCTMSeasonInfo), pack) - } - } - } - } - } - this.MatchSeasonId[platform] = &MatchSeasonId{ - Id: ret.Id, - Platform: ret.Platform, - SeasonId: ret.SeasonId, - StartStamp: ret.StartStamp, - EndStamp: ret.EndStamp, - UpdateTs: ret.UpdateTs, - } - } - })).StartByFixExecutor("platform: " + platform) -} - -// GetMatchSeasonId 获取比赛场赛季配置 -func (this *MatchSeasonMgr) GetMatchSeasonId(platform string) *MatchSeasonId { - logger.Logger.Info("(this *MatchSeasonMgr) GetMatchSeasonId", platform) - return this.MatchSeasonId[platform] -} - -// 获取当月初和月末时间戳 -func (this *MatchSeasonMgr) getNowMonthStartAndEnd() (int64, int64) { - now := time.Now() - first := now.Format("2006-01") + "-01" - start, _ := time.ParseInLocation("2006-01-02", first, time.Local) - last := start.AddDate(0, 1, 0).Format("2006-01-02") - end, _ := time.ParseInLocation("2006-01-02", last, time.Local) - return start.Unix(), end.Unix() - 1 -} - -func (this *MatchSeasonMgr) ModuleName() string { - return "MatchSeasonMgr" -} - -func (this *MatchSeasonMgr) Init() { - for _, platform := range PlatformMgrSingleton.GetPlatforms() { - if platform.IdStr == DefaultPlatform { - continue - } - this.UpdateMatchSeasonId(platform.IdStr) - } -} - -func (this *MatchSeasonMgr) Update() { - this.SaveAllMatchSeasonData() -} - -func (this *MatchSeasonMgr) Shutdown() { - this.SaveAllMatchSeasonData() - module.UnregisteModule(this) -} - -func (this *MatchSeasonMgr) InterestClockEvent() int { - //TODO implement me - //panic("implement me") - return 1 << CLOCK_EVENT_MONTH -} - -func (this *MatchSeasonMgr) OnMonthTimer() { - logger.Logger.Info("(this *MatchSeasonMgr) OnMonthTimer") - for _, platform := range PlatformMgrSingleton.GetPlatforms() { - if platform.IdStr == DefaultPlatform { - continue - } - this.UpdateMatchSeasonId(platform.IdStr) - } -} - -func init() { - module.RegisteModule(MatchSeasonMgrSington, time.Minute*1, 0) - ClockMgrSington.RegisteSinker(MatchSeasonMgrSington) -} diff --git a/worldsrv/matchseasonrankmgr.go b/worldsrv/matchseasonrankmgr.go deleted file mode 100644 index d570bf8..0000000 --- a/worldsrv/matchseasonrankmgr.go +++ /dev/null @@ -1,379 +0,0 @@ -package main - -import ( - "github.com/globalsign/mgo/bson" - "math/rand" - "mongo.games.com/game/model" - "mongo.games.com/game/srvdata" - "mongo.games.com/goserver/core/basic" - "mongo.games.com/goserver/core/logger" - "mongo.games.com/goserver/core/module" - "mongo.games.com/goserver/core/task" - "sort" - "time" -) - -var MatchSeasonRankMgrSington = &MatchSeasonRankMgr{ - MatchSeasonRank: make(map[string][]*MatchSeasonRank), - MatchSeasonRankDirty: make(map[string]bool), - RobotMatchSeasonRankInit: make(map[string]bool), - RobotMatchSeasonRank: make(map[string][]*MatchSeasonRank), -} - -type MatchSeasonRankMgr struct { - BaseClockSinker - MatchSeasonRank map[string][]*MatchSeasonRank //平台 - MatchSeasonRankDirty map[string]bool - RobotMatchSeasonRankInit map[string]bool - RobotMatchSeasonRank map[string][]*MatchSeasonRank //平台 -} - -type MatchSeasonRank struct { - Id bson.ObjectId `bson:"_id"` - Platform string - SnId int32 - Name string - Lv int32 //段位 - UpdateTs int64 -} - -func (this *MatchSeasonRankMgr) UpdateMatchSeasonRank(p *Player, lv int32) { - logger.Logger.Trace("(this *MatchSeasonRankMgr) UpdateMatchSeasonRank: SnId: ", p.SnId, " lv: ", lv) - platform := p.Platform - msrs := this.GetMatchSeasonRank(platform) - if msrs == nil { - msrs = []*MatchSeasonRank{} - } - have := false - for _, msr := range msrs { - if msr.SnId == p.SnId { - msr.Lv = lv - msr.UpdateTs = time.Now().Unix() - have = true - break - } - } - if !have { - msr := &MatchSeasonRank{ - Id: bson.NewObjectId(), - Platform: platform, - SnId: p.SnId, - Name: p.Name, - Lv: lv, - UpdateTs: time.Now().Unix(), - } - msrs = append(msrs, msr) - } - - sort.Slice(msrs, func(i, j int) bool { - return msrs[i].Lv > msrs[j].Lv - }) - if len(msrs) > model.GameParamData.MatchSeasonRankMaxNum { - if msrs[len(msrs)-1].SnId != p.SnId { //上榜玩家有变化 - this.MatchSeasonRankDirty[platform] = true - } - msrs = append(msrs[:model.GameParamData.MatchSeasonRankMaxNum]) - } else { - this.MatchSeasonRankDirty[platform] = true - } - this.MatchSeasonRank[platform] = msrs -} - -func (this *MatchSeasonRankMgr) GetMatchSeasonRank(platform string) []*MatchSeasonRank { - logger.Logger.Trace("(this *MatchSeasonRankMgr) GetMatchSeasonRank: platform = ", platform) - return this.MatchSeasonRank[platform] -} - -func (this *MatchSeasonRankMgr) SetMatchSeasonRank(platform string, mss []*MatchSeasonRank) { - logger.Logger.Trace("(this *MatchSeasonRankMgr) SetMatchSeasonRank: mss = ", mss) - this.MatchSeasonRank[platform] = mss - this.MatchSeasonRankDirty[platform] = true -} - -// MatchSeasonRankInherit 段位继承 -func (this *MatchSeasonRankMgr) MatchSeasonRankInherit(platform string) { - msr := this.GetMatchSeasonRank(platform) - logger.Logger.Trace("(this *MatchSeasonRankMgr) MatchSeasonRankInherit: msr = ", msr) - if msr == nil { - return - } - for _, rank := range msr { - rank.Lv = MatchSeasonMgrSington.MatchSeasonInherit(rank.Lv) - } - this.SetMatchSeasonRank(platform, msr) -} - -func (this *MatchSeasonRankMgr) InitMatchSeasonRank(platform string) { - logger.Logger.Trace("(this *MatchSeasonRankMgr) InitMatchSeasonRank: ", platform) - if platform == DefaultPlatform { - return - } - if this.MatchSeasonRank[platform] != nil { - return - } - if this.MatchSeasonRank[platform] == nil { - logger.Logger.Trace("(this *MatchSeasonRankMgr) InitMatchSeasonRank: ", this.MatchSeasonRank[platform]) - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - ret, err := model.QueryMatchSeasonRank(platform) - logger.Logger.Trace("(this *MatchSeasonRankMgr) 1 QueryMatchSeasonRank: ", ret) - if err != nil { - return nil - } - return ret - }), task.CompleteNotifyWrapper(func(data interface{}, tt task.Task) { - var ret []*model.MatchSeasonRank - if data == nil || data.([]*model.MatchSeasonRank) == nil { //初始数据去log_matchseason里面取段位前n名 - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - retRank, err := model.QueryMatchSeason(platform) - logger.Logger.Trace("(this *MatchSeasonRankMgr) 1 QueryMatchSeason: ", ret) - if err != nil { - return nil - } - return retRank - }), task.CompleteNotifyWrapper(func(data interface{}, tt task.Task) { - var retRank []*model.MatchSeason - logger.Logger.Trace("(this *MatchSeasonRankMgr) 2 QueryMatchSeason: ", ret) - if data == nil || data.([]*model.MatchSeason) == nil { - ams := MatchSeasonMgrSington.GetAllMatchSeason() - if ams != nil { - for _, season := range ams { - if season.Platform == platform { - mms := &model.MatchSeason{ - Id: season.Id, - Platform: season.Platform, - SnId: season.SnId, - Name: season.Name, - SeasonId: season.SeasonId, - Lv: season.Lv, - LastLv: season.LastLv, - IsAward: season.IsAward, - AwardTs: season.AwardTs, - } - retRank = append(retRank, mms) - } - } - } - } else { - retRank = data.([]*model.MatchSeason) - } - if retRank != nil { - this.MatchSeasonRank[platform] = []*MatchSeasonRank{} - sort.Slice(retRank, func(i, j int) bool { - return retRank[i].Lv > retRank[j].Lv - }) - if len(retRank) > model.GameParamData.MatchSeasonRankMaxNum { - retRank = append(retRank[:model.GameParamData.MatchSeasonRankMaxNum]) - } - for i := 0; i < len(retRank); i++ { - season := retRank[i] - msr := &MatchSeasonRank{ - Id: season.Id, - Platform: season.Platform, - SnId: season.SnId, - Name: season.Name, - Lv: season.Lv, - UpdateTs: season.UpdateTs, - } - this.MatchSeasonRank[platform] = append(this.MatchSeasonRank[platform], msr) - this.MatchSeasonRankDirty[platform] = true - } - logger.Logger.Trace("(this *MatchSeasonRankMgr) 3 QueryMatchSeason: ", this.MatchSeasonRank[platform]) - } - })).StartByFixExecutor("platform:" + platform) - } else { - ret = data.([]*model.MatchSeasonRank) - this.MatchSeasonRank[platform] = []*MatchSeasonRank{} - for _, rank := range ret { - msr := &MatchSeasonRank{ - Id: rank.Id, - Platform: rank.Platform, - SnId: rank.SnId, - Name: rank.Name, - Lv: rank.Lv, - UpdateTs: rank.UpdateTs, - } - this.MatchSeasonRank[platform] = append(this.MatchSeasonRank[platform], msr) - } - logger.Logger.Trace("(this *MatchSeasonRankMgr) 3 QueryMatchSeasonRank: ", this.MatchSeasonRank[platform]) - } - })).StartByFixExecutor("platform:" + platform) - } -} - -// SaveMatchSeasonRank 保存排行榜 -func (this *MatchSeasonRankMgr) SaveMatchSeasonRank(platform string) { - logger.Logger.Trace("(this *MatchSeasonRankMgr) SaveMatchSeasonRank: ", platform) - msrp := this.MatchSeasonRank[platform] - if msrp != nil && this.MatchSeasonRankDirty[platform] { - this.MatchSeasonRankDirty[platform] = false - dirtyMsrs := []*model.MatchSeasonRank{} - for _, rank := range msrp { - msr := &model.MatchSeasonRank{ - Id: rank.Id, - Platform: rank.Platform, - SnId: rank.SnId, - Name: rank.Name, - Lv: rank.Lv, - UpdateTs: rank.UpdateTs, - } - dirtyMsrs = append(dirtyMsrs, msr) - } - if dirtyMsrs != nil && len(dirtyMsrs) > 0 { - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - model.UpsertMatchSeasonRank(platform, dirtyMsrs) - return nil - }), task.CompleteNotifyWrapper(func(data interface{}, tt task.Task) { - })).StartByFixExecutor("platform:" + platform) - } - } -} - -func (this *MatchSeasonRankMgr) SaveAllMatchSeasonRank() { - for _, platform := range PlatformMgrSingleton.GetPlatforms() { - if platform.IdStr == DefaultPlatform { - continue - } - this.SaveMatchSeasonRank(platform.IdStr) - } -} - -func (this *MatchSeasonRankMgr) CreateRobotLv() int32 { - Lv := int32(1) - now := time.Now() - first := now.Format("2006-01") + "-01" - start, _ := time.ParseInLocation("2006-01-02", first, time.Local) - diffUnix := now.Unix() - start.Unix() - diffDay := diffUnix/int64(24*60*60) + 1 - data := srvdata.PBDB_MatchRankMgr.GetData(int32(diffDay)) - if data != nil && data.RankStar != nil && len(data.RankStar) > 0 { - diff := data.RankStar[1] - data.RankStar[0] - min := data.RankStar[0] - if data.RankStar[0] > data.RankStar[1] { - diff = data.RankStar[0] - data.RankStar[1] - min = data.RankStar[1] - } - Lv = rand.Int31n(diff) + min - } - return Lv -} - -func (this *MatchSeasonRankMgr) CreateRobotMatchSeasonRank(platform string) { - if this.RobotMatchSeasonRankInit[platform] { - return - } - this.RobotMatchSeasonRank[platform] = []*MatchSeasonRank{} - for _, player := range PlayerMgrSington.snidMap { - if player != nil && player.IsRob { - msr := &MatchSeasonRank{ - Platform: platform, - SnId: player.SnId, - Name: player.Name, - Lv: this.CreateRobotLv(), - } - this.RobotMatchSeasonRank[platform] = append(this.RobotMatchSeasonRank[platform], msr) - } - if len(this.RobotMatchSeasonRank[platform]) >= model.GameParamData.MatchSeasonRankMaxNum { - break - } - } - if len(this.RobotMatchSeasonRank[platform]) < model.GameParamData.MatchSeasonRankMaxNum { - localSnids := []int32{102917700, 61096000, 21058800, 47291500, 58562600, 22127000, 80639700, 49475400, 60569500, 54746600, 46797900, 88659800, 61118200, 68773200, 92010700, 13305900, 68143500, 86379100, 76177100, 95050900, 23954400, 52524000, 63618100, 31808400, 26929400, 108083700, 50751500, 92179900, 60327700, 69582700, 80156500, 30808000, 53806700, 53235700, 50049200, 30465400, 76672700, 69638500, 12351800, 48705200, 98920500, 32158900, 33519000, 42915300, 30811200, 77037600, 65779800, 22148100, 59819100, 46374600} - pool := srvdata.PBDB_NameMgr.Datas.GetArr() - cnt := int32(len(pool)) - for _, snid := range localSnids { - msr := &MatchSeasonRank{ - Platform: platform, - SnId: snid, - Name: "Guest", - Lv: this.CreateRobotLv(), - } - if cnt > 0 { - msr.Name = pool[rand.Int31n(cnt)].GetName() - } - this.RobotMatchSeasonRank[platform] = append(this.RobotMatchSeasonRank[platform], msr) - if len(this.RobotMatchSeasonRank[platform]) >= model.GameParamData.MatchSeasonRankMaxNum { - break - } - } - } - this.RobotMatchSeasonRankInit[platform] = true -} - -func (this *MatchSeasonRankMgr) GetRobotMatchSeasonRank(platform string) []*MatchSeasonRank { - logger.Logger.Trace("GetRobotMatchSeasonRank: ", platform) - if !this.RobotMatchSeasonRankInit[platform] { - this.CreateRobotMatchSeasonRank(platform) - } - if this.RobotMatchSeasonRank == nil || this.RobotMatchSeasonRank[platform] == nil || len(this.RobotMatchSeasonRank[platform]) < model.GameParamData.MatchSeasonRankMaxNum { - this.CreateRobotMatchSeasonRank(platform) - } - return this.RobotMatchSeasonRank[platform] -} - -func (this *MatchSeasonRankMgr) UpdateRobotMatchSeasonRank(platform string) { - logger.Logger.Trace("UpdateRobotMatchSeasonRank: ", platform) - rmsr := this.GetRobotMatchSeasonRank(platform) - if rmsr != nil { - for _, rank := range rmsr { - diff := rand.Int31n(7) - 3 - rank.Lv += diff - } - } -} - -func (this *MatchSeasonRankMgr) ModuleName() string { - return "MatchSeasonRankMgr" -} - -func (this *MatchSeasonRankMgr) Init() { - for _, platform := range PlatformMgrSingleton.GetPlatforms() { - if platform.IdStr == DefaultPlatform { - continue - } - this.InitMatchSeasonRank(platform.IdStr) - this.RobotMatchSeasonRankInit[platform.IdStr] = false - } -} - -func (this *MatchSeasonRankMgr) Update() { - this.SaveAllMatchSeasonRank() -} - -func (this *MatchSeasonRankMgr) Shutdown() { - this.SaveAllMatchSeasonRank() - module.UnregisteModule(this) -} - -func (this *MatchSeasonRankMgr) InterestClockEvent() int { - //TODO implement me - //panic("implement me") - return 1< thr.NextCoin { - thr.Coin = thr.NextCoin - } else { - thr.NextCoin = thr.Coin - } - } - } - } - } - - this.SaveAll(false) -} - -func (this *ThirdPlatformMgr) SaveAll(bImm bool) { - for _, p := range this.Platforms { - if p != nil && p.dirty { - pCopy := p.Clone() - if pCopy != nil { - if bImm { - err := model.UpdateThirdPlatform(pCopy.PlatformOfThirdPlatform) - if err != nil { - logger.Logger.Warnf("UpdateThirdPlatform err:%v", err) - } else { - p.dirty = false - } - } else { - p.dirty = false - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - return model.UpdateThirdPlatform(pCopy.PlatformOfThirdPlatform) - }), nil, "UpdateThirdPlatform").StartByFixExecutor("ThirdPlatform") - } - } - } - } -} - -func init() { - //module.RegisteModule(ThirdPlatformMgrSington, time.Minute*5, 0) - //ClockMgrSington.RegisteSinker(ThirdPlatformMgrSington) -} diff --git a/worldsrv/tmmatch.go b/worldsrv/tmmatch.go index ab745a0..0676ae0 100644 --- a/worldsrv/tmmatch.go +++ b/worldsrv/tmmatch.go @@ -142,12 +142,7 @@ func (tm *TmMatch) CreateRobotGrades(round int) { } if player != nil && player.IsRob { snids = append(snids, player.SnId) - ms := MatchSeasonMgrSington.GetMatchSeason(player.SnId) - lv := MatchSeasonRankMgrSington.CreateRobotLv() - if ms != nil { - lv = ms.Lv - } - lvs = append(lvs, lv) + lvs = append(lvs, 1) roleId := int32(2000001) if player.Roles != nil && player.Roles.ModId != 0 { roleId = player.Roles.ModId @@ -167,8 +162,7 @@ func (tm *TmMatch) CreateRobotGrades(round int) { tmpSnid = rand.Int31n(max-min) + min } snids = append(snids, tmpSnid) - lv := MatchSeasonRankMgrSington.CreateRobotLv() - lvs = append(lvs, lv) + lvs = append(lvs, 1) roleIds = append(roleIds, int32(2000001)) } } diff --git a/worldsrv/tournament.go b/worldsrv/tournament.go index 4b51d27..c3dddeb 100644 --- a/worldsrv/tournament.go +++ b/worldsrv/tournament.go @@ -728,17 +728,12 @@ func (this *Tournament) GetTm(sortId int64) *TmMatch { // CreatePlayerMatchContext 创建玩家比赛信息 func (this *Tournament) CreatePlayerMatchContext(p *Player, m *TmMatch, seq int) *PlayerMatchContext { - ms := MatchSeasonMgrSington.GetMatchSeason(p.SnId) - var lv int32 - if ms != nil { - lv = ms.Lv - } roleId := int32(2000001) if p.Roles != nil { roleId = p.Roles.ModId } - mc := NewMatchContext(p, m, 1000, p.SnId, lv, roleId, seq) + mc := NewMatchContext(p, m, 1000, p.SnId, 1, roleId, seq) if mc != nil { if this.players[m.SortId] == nil { this.players[m.SortId] = make(map[int32]*PlayerMatchContext) diff --git a/worldsrv/trascate_webapi.go b/worldsrv/trascate_webapi.go index 58dad72..eb8979d 100644 --- a/worldsrv/trascate_webapi.go +++ b/worldsrv/trascate_webapi.go @@ -26,7 +26,6 @@ import ( "mongo.games.com/game/model" "mongo.games.com/game/proto" "mongo.games.com/game/protocol/bag" - "mongo.games.com/game/protocol/gamehall" loginproto "mongo.games.com/game/protocol/login" playerproto "mongo.games.com/game/protocol/player" "mongo.games.com/game/protocol/qpapi" @@ -34,7 +33,6 @@ import ( "mongo.games.com/game/protocol/telegramapi" webapiproto "mongo.games.com/game/protocol/webapi" "mongo.games.com/game/srvdata" - "mongo.games.com/game/webapi" ) const ( @@ -4024,202 +4022,7 @@ func init() { return common.ResponseTag_TransactYield, pack } })) - //更新用户三方的金币到游戏服务器 - WebAPIHandlerMgrSingleton.RegisteWebAPIHandler("/api/thd/UpdatePlayerCoin", WebAPIHandlerWrapper( - func(tNode *transact.TransNode, params []byte) (int, proto.Message) { - msg := &webapiproto.ASThdUpdatePlayerCoin{} - pack := &webapiproto.SAThdUpdatePlayerCoin{} - err := proto.Unmarshal(params, msg) - if err != nil { - pack.Tag = webapiproto.TagCode_FAILED - pack.Msg = "数据序列化失败" + err.Error() - return common.ResponseTag_ParamError, pack - } - member_snid := msg.Snid - BaseGameId := int(msg.BaseGameID) - platform := msg.Platform - plt := webapi.ThridPlatformMgrSington.FindPlatformByPlatformBaseGameId(BaseGameId) - if plt == nil { - pack.Tag = webapiproto.TagCode_FAILED - pack.Msg = "三方不存在" - return common.ResponseTag_ParamError, pack - } - - p := PlayerMgrSington.GetPlayerBySnId(int32(member_snid)) - - if p != nil { - if len(platform) > 0 && p.Platform != platform { - pack.Tag = webapiproto.TagCode_FAILED - pack.Msg = "Platform is err." - return common.ResponseTag_ParamError, pack - } - //请求太快,不做处理,给API减轻一些压力 - if p.thridBalanceRefreshReqing { - pack.Tag = webapiproto.TagCode_FAILED - pack.Msg = "刷新频率太高,稍等" - return common.ResponseTag_ParamError, pack - } - - p.thridBalanceRefreshReqing = true - gainway := common.GainWay_Transfer_Thrid2System - //isSucces := true - timeout := false - timeStamp := time.Now().UnixNano() - noBaseGameId := false - pfConfig := PlatformMgrSingleton.GetPlatform(p.Platform) - if pfConfig != nil { - if pfConfig.ThirdGameMerchant == nil || pfConfig.ThirdGameMerchant[int32(plt.GetPlatformBase().BaseGameID)] == 0 { - // noBaseGameId = true - } - } - task.New(nil, task.CallableWrapper(func(o *basic.Object) interface{} { - var err error - var coinLog *model.PayCoinLog - var coinlogex *model.CoinLog - //var apiHasTransfer = false - remark := "手动刷新" + plt.GetPlatformBase().Name + "转出到系统" - amount := int64(0) - - if pfConfig == nil { - return int64(-2) - } - - if noBaseGameId { - return int64(-3) - } - - oper := plt.GetPlatformBase().Name + "2System" - err, amount = plt.ReqLeaveGame(p.SnId, fmt.Sprintf("%v", BaseGameId), p.Ip, p.Platform, p.Channel) - if err != nil { - goto Rollback - } - if amount <= 0 { - return int64(-4) - } - if plt.GetPlatformBase().TransferInteger { - amount = (amount / 100) * 100 - if amount <= 0 { - return int64(-4) - } - } - - //apiHasTransfer = true - coinLog = model.NewPayCoinLog(time.Now().UnixNano(), int32(p.SnId), amount, int32(gainway), oper, model.PayCoinLogType_Coin, 0) - - // err = model.InsertPayCoinLogs(coinLog) - err = model.InsertPayCoinLogs(p.Platform, coinLog) - if err != nil { - logger.Logger.Tracef("player snid=%v at %v model.InsertPayCoinLogs() err: %v", p.SnId, plt.GetPlatformBase().Name, err) - goto Rollback - } - timeStamp = coinLog.TimeStamp - coinlogex = model.NewCoinLogEx(&model.CoinLogParam{ - Platform: p.Platform, - SnID: p.SnId, - ChangeType: common.BillTypeCoin, - ChangeNum: amount, - RemainNum: p.Coin + amount, - Add: 0, - LogType: int32(gainway), - GameID: 0, - GameFreeID: 0, - BaseCoin: 0, - Operator: oper, - Remark: remark, - }) - err = model.InsertCoinLog(coinlogex) - if err != nil { - logger.Logger.Tracef("player snid=%v at %v model.InsertCoinLogs() err: %v", p.SnId, plt.GetPlatformBase().Name, err) - goto Rollback - } - return amount - Rollback: - if coinLog != nil { - - model.RemovePayCoinLog(p.Platform, coinLog.LogId) - } - if coinlogex != nil { - model.RemoveCoinLogOne(coinlogex.Platform, coinlogex.LogId) - } - if timeout { - logger.Logger.Errorf("web player snid=%v CSThridBalanceRefreshHandler transfer %v to %v timeout!", p.SnId, -amount, plt.GetPlatformBase().Name) - return int64(-1) - } - //if apiHasTransfer { - // err, timeout = plt.ReqTransfer(p.SnId, thirdBalance, strconv.FormatInt(time.Now().UnixNano(), 10), p.Platform, p.Channel) - // if timeout { - // logger.Logger.Errorf("web player snid=%v CSThridBalanceRefreshHandler transfer rollback %v to %v timeout!", p.SnId, thirdBalance, plt.GetPlatformBase().Name) - // } - //} - return int64(-1) - }), task.CompleteNotifyWrapper(func(data interface{}, tt task.Task) { - amount := data.(int64) - statePack := &gamehall.SCThridGameBalanceUpdateState{} - if amount < 0 { - pack.Tag = webapiproto.TagCode_FAILED - // return common.ResponseTag_ParamError, pack - pack.Msg = "刷新金币失败" - if amount == -3 { - pack.Msg = "三方关闭中" - } - statePack.OpRetCode = gamehall.OpResultCode_Game_OPRC_Error_Game - p.thridBalanceReqIsSucces = false - //isSucces = false - logger.Logger.Tracef("player snid=%v at platform=%v CSThridBalanceRefreshHandler third->system transfer fail", p.SnId, plt.GetPlatformBase().Name) - } else if amount > 0 { - pack.Tag = webapiproto.TagCode_SUCCESS - pack.Msg = "刷新金币成功" - statePack.OpRetCode = gamehall.OpResultCode_Game_OPRC_Sucess_Game - p.thridBalanceReqIsSucces = true - - p.Coin += amount - p.SetPayTs(timeStamp) - ThirdPlatformMgrSington.AddThirdPlatformCoin(p.Platform, plt.GetPlatformBase().Tag, amount) - p.dirty = true - logger.Logger.Tracef("player snid=%v at platform=%v CSThridBalanceRefreshHandler third->system transfer succes", p.SnId, plt.GetPlatformBase().Name) - //if !model.GameParamData.CloseOftenSavePlayerData { - // p.Time2Save() - //} - } - - p.diffData.Coin = -1 - p.SendDiffData() - //statePack := &protocol.SCThridGameBalanceUpdateState{} - //pack := &protocol.SCThridGameBalanceUpdate{} - //if isSucces { - // pack.OpRetCode = protocol.OpResultCode_OPRC_Sucess.Enum() - // p.thridBalanceReqIsSucces = true - // statePack.OpRetCode = protocol.OpResultCode_OPRC_Sucess.Enum() - //} - - p.SendToClient(int(gamehall.GameHallPacketID_PACKET_SC_THRIDGAMEBALANCEUPDATESTATE), statePack) - p.dirty = true - - //pack.Coin = proto.Int64(p.Coin) - //p.SendToClient(int(protocol.MmoPacketID_PACKET_SC_THRIDGAMEBALANCEUPDATE), pack) - if p.thrscene != 0 { - p.thrscene = 0 - p.scene = nil - } - p.thridBalanceRefreshReqing = false - logger.Logger.Tracef("SendToClient() player snid=%v at CSThridBalanceRefreshHandler() pack:%v", p.SnId, pack.String()) - - //dataResp := &common.M2GWebApiResponse{} - //dataResp.Body, _ = resp.Marshal() - tNode.TransRep.RetFiels = pack - tNode.Resume() - - }), "ThrUpdatePlayerCoin").Start() - - } else { - pack.Tag = webapiproto.TagCode_FAILED - pack.Msg = "暂不支持用户不在线刷新" - return common.ResponseTag_ParamError, pack - } - return common.ResponseTag_TransactYield, pack - })) - //支付回调 WebAPIHandlerMgrSingleton.RegisteWebAPIHandler("/api/pay/CallbackPayment", WebAPIHandlerWrapper( func(tNode *transact.TransNode, params []byte) (int, proto.Message) { msg := &webapiproto.ASCallbackPayment{} From c8be1b93204e362712e8fc9c9ce6d7ac2875f378 Mon Sep 17 00:00:00 2001 From: sk <123456@qq.com> Date: Wed, 22 May 2024 14:29:32 +0800 Subject: [PATCH 2/2] =?UTF-8?q?etcd,rabbitmq=E9=98=B2=E6=AD=A2=E5=9C=A8ini?= =?UTF-8?q?t=E6=96=B9=E6=B3=95=E9=9A=90=E5=BC=8F=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/action.go | 4 - common/log.go | 7 +- dbproxy/main.go | 67 ++++----- dbproxy/mq/c_apilog.go | 12 +- dbproxy/mq/c_clientlog.go | 56 ------- dbproxy/mq/c_coingivelog.go | 12 +- dbproxy/mq/c_coinlog.go | 14 +- dbproxy/mq/c_friendrecordlog.go | 12 +- dbproxy/mq/c_gamegamedetailedlog.go | 14 +- dbproxy/mq/c_gameplayerlistlog.go | 12 +- dbproxy/mq/c_invite.go | 8 +- dbproxy/mq/c_itemlog.go | 12 +- dbproxy/mq/c_jackpotlog.go | 31 ---- dbproxy/mq/c_loginlog.go | 12 +- dbproxy/mq/c_onlinelog.go | 6 - dbproxy/mq/c_rank.go | 26 +--- dbproxy/mq/c_scenecoinlog.go | 12 +- dbproxy/mq/c_welfarelog.go | 11 +- dbproxy/mq/publisher.go | 33 ---- etcd/client.go | 17 +-- etcd/init.go | 17 --- etcd/manager.go | 122 ++++++++------- gamesrv/action/action_server.go | 2 +- gamesrv/base/init.go | 45 ------ gamesrv/base/logchannel.go | 5 +- gamesrv/base/serverstate.go | 8 - gamesrv/base/srvdatamgrex.go | 10 -- gamesrv/main.go | 34 ++++- gatesrv/logchannel.go | 32 +--- gatesrv/main.go | 23 ++- mgrsrv/api/logchannel.go | 34 +---- mgrsrv/main.go | 34 ++--- mq/consumer.go | 189 ++++++++++++----------- mq/publisher.go | 226 +++++++++++++++++----------- ranksrv/com/register.go | 8 +- ranksrv/init.go | 35 ----- ranksrv/main.go | 31 +++- robot/base/init.go | 19 +-- robot/main.go | 21 ++- worldsrv/action_sign.go | 90 ----------- worldsrv/actsignmgr.go | 4 - worldsrv/blacklistmgr.go | 4 - worldsrv/gamestate.go | 5 - worldsrv/horseracelamp.go | 5 - worldsrv/init.go | 207 ------------------------- worldsrv/logchannel.go | 4 +- worldsrv/main.go | 51 ++++++- worldsrv/mq_coinlog.go | 2 - worldsrv/msgmgr.go | 5 +- worldsrv/platformgamegroup.go | 5 +- worldsrv/player.go | 9 -- 51 files changed, 551 insertions(+), 1113 deletions(-) delete mode 100644 dbproxy/mq/c_clientlog.go delete mode 100644 dbproxy/mq/c_jackpotlog.go delete mode 100644 dbproxy/mq/publisher.go delete mode 100644 etcd/init.go delete mode 100644 gamesrv/base/init.go delete mode 100644 ranksrv/init.go delete mode 100644 worldsrv/action_sign.go delete mode 100644 worldsrv/init.go diff --git a/common/action.go b/common/action.go index f72be8d..30234bb 100644 --- a/common/action.go +++ b/common/action.go @@ -19,10 +19,6 @@ var ActionMgrSington = &ActionMgr{ pool: make(map[int]ActionBase), } -func init() { - -} - type ActionMgr struct { pool map[int]ActionBase } diff --git a/common/log.go b/common/log.go index dbb8522..49a77b0 100644 --- a/common/log.go +++ b/common/log.go @@ -5,13 +5,12 @@ import ( "path/filepath" "github.com/howeyc/fsnotify" + "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/logger" ) -var LastModifyConfig int64 - func init() { core.RegisteHook(core.HOOK_BEFORE_START, func() error { var err error @@ -62,7 +61,7 @@ type loggerParamModifiedCommand struct { } func (lmc *loggerParamModifiedCommand) Done(o *basic.Object) error { - logger.Logger.Info("===reload ", lmc.fileName) + logger.Logger.Info("reload logger.xml:", lmc.fileName) data, err := os.ReadFile(lmc.fileName) if err != nil { return err @@ -70,7 +69,7 @@ func (lmc *loggerParamModifiedCommand) Done(o *basic.Object) error { if len(data) != 0 { err = logger.Reload(lmc.fileName) if err != nil { - logger.Logger.Warn("===reload ", lmc.fileName, err) + logger.Logger.Warnf("reload logger.xml %v err: %v", lmc.fileName, err) } } return err diff --git a/dbproxy/main.go b/dbproxy/main.go index 84c8a5c..5b84967 100644 --- a/dbproxy/main.go +++ b/dbproxy/main.go @@ -5,55 +5,48 @@ import ( "net" "net/http" "net/rpc" + "time" "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/module" _ "mongo.games.com/game" - _ "mongo.games.com/game/dbproxy/mq" - "mongo.games.com/game/common" "mongo.games.com/game/dbproxy/svc" + "mongo.games.com/game/etcd" "mongo.games.com/game/model" "mongo.games.com/game/mq" ) -var rabbitMqConsumer *mq.RabbitMQConsumer - -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - model.InitGameParam() - rabbitMqConsumer = mq.NewRabbitMQConsumer(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}) - if rabbitMqConsumer != nil { - rabbitMqConsumer.Start() - } - - //尝试初始化 - svc.GetOnePlayerIdFromBucket() - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - if rabbitMqConsumer != nil { - rabbitMqConsumer.Stop() - } - model.ShutdownRPClient() - return nil - }) -} - func main() { + // 自定义配置文件 + model.InitGameParam() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - - rpc.HandleHTTP() // 采用http协议作为rpc载体 - lis, err := net.Listen(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr")) - if err != nil { - log.Fatalln("fatal error: ", err) - } - go http.Serve(lis, nil) - - waitor := module.Start() - waitor.Wait("main()") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + etcd.Start(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) + mq.StartConsumer(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true) + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) + // 尝试初始化玩家id + svc.GetOnePlayerIdFromBucket() + // rpc 服务 + rpc.HandleHTTP() // 采用http协议作为rpc载体 + lis, err := net.Listen(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr")) + if err != nil { + log.Fatalln("rpc start fatal error: ", err) + } + go http.Serve(lis, nil) + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + etcd.Close() + mq.StopConsumer() + mq.StopPublisher() + return nil + }) + // module模块 + w := module.Start() + w.Wait("main()") } diff --git a/dbproxy/mq/c_apilog.go b/dbproxy/mq/c_apilog.go index 2f77882..95450de 100644 --- a/dbproxy/mq/c_apilog.go +++ b/dbproxy/mq/c_apilog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.APILog diff --git a/dbproxy/mq/c_clientlog.go b/dbproxy/mq/c_clientlog.go deleted file mode 100644 index 4df5009..0000000 --- a/dbproxy/mq/c_clientlog.go +++ /dev/null @@ -1,56 +0,0 @@ -package mq - -func init() { - //mq.RegisteSubscriber(model.ClientLogCollName, func(e broker.Event) (err error) { - // msg := e.Message() - // if msg != nil { - // defer func() { - // if err != nil { - // mq.BackUp(e, err) - // } - // - // e.Ack() - // - // recover() - // }() - // - // var log model.ClientLog - // err = json.Unmarshal(msg.Body, &log) - // if err != nil { - // logger.Logger.Errorf("[mq] %s %v", model.ClientLogCollName, err) - // return - // } - // - // logger.Logger.Tracef("[mq] %s %v", model.ClientLogCollName, string(msg.Body)) - // - // data := map[string]interface{}{} - // err = json.Unmarshal([]byte(log.Data), &data) - // if err != nil { - // logger.Logger.Errorf("[mq] %s %v", model.ClientLogCollName, err) - // return - // } - // - // // 获取平台id - // platform := log.Platform - // if log.Platform == "" { - // id, ok := data["platform"] - // if ok { - // platform = string(id.([]byte)) - // } - // } - // - // data["ts"] = log.Ts - // if log.Snid > 0 { - // data["snid"] = log.Snid - // } - // - // c := svc.ClientLogStartCollection(platform) - // if c != nil { - // err = c.Insert(data) - // } - // - // return - // } - // return nil - //}, broker.Queue(model.ClientLogCollName), broker.DisableAutoAck(), rabbitmq.DurableQueue()) -} diff --git a/dbproxy/mq/c_coingivelog.go b/dbproxy/mq/c_coingivelog.go index fb427ac..6dcd606 100644 --- a/dbproxy/mq/c_coingivelog.go +++ b/dbproxy/mq/c_coingivelog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.CoinGiveLog diff --git a/dbproxy/mq/c_coinlog.go b/dbproxy/mq/c_coinlog.go index a0948cf..0b77dde 100644 --- a/dbproxy/mq/c_coinlog.go +++ b/dbproxy/mq/c_coinlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.CoinLog @@ -30,7 +26,7 @@ func init() { } if log.Count == 0 { //玩家冲账探针 - RabbitMQPublisher.Send(model.TopicProbeCoinLogAck, log) + mq.Send(model.TopicProbeCoinLogAck, log) } else { c := svc.CoinLogsCollection(log.Platform) if c != nil { diff --git a/dbproxy/mq/c_friendrecordlog.go b/dbproxy/mq/c_friendrecordlog.go index 646521b..862b78b 100644 --- a/dbproxy/mq/c_friendrecordlog.go +++ b/dbproxy/mq/c_friendrecordlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.FriendRecord diff --git a/dbproxy/mq/c_gamegamedetailedlog.go b/dbproxy/mq/c_gamegamedetailedlog.go index 2f4206a..c3cdfac 100644 --- a/dbproxy/mq/c_gamegamedetailedlog.go +++ b/dbproxy/mq/c_gamegamedetailedlog.go @@ -2,12 +2,14 @@ package mq import ( "encoding/json" - "mongo.games.com/game/dbproxy/svc" - "mongo.games.com/game/model" - "mongo.games.com/game/mq" + "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" + + "mongo.games.com/game/dbproxy/svc" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) func init() { @@ -15,13 +17,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.GameDetailedLog diff --git a/dbproxy/mq/c_gameplayerlistlog.go b/dbproxy/mq/c_gameplayerlistlog.go index 63da3cd..bf998fc 100644 --- a/dbproxy/mq/c_gameplayerlistlog.go +++ b/dbproxy/mq/c_gameplayerlistlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.GamePlayerListLog diff --git a/dbproxy/mq/c_invite.go b/dbproxy/mq/c_invite.go index f268d8d..17254c2 100644 --- a/dbproxy/mq/c_invite.go +++ b/dbproxy/mq/c_invite.go @@ -22,13 +22,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.BindInvite @@ -59,7 +53,7 @@ func init() { InviteNumCache.Put(name, n, int64(time.Hour.Seconds())) // 更新绑定数量 - RabbitMQPublisher.Send(model.AckBindNum, &model.BindNum{ + mq.Send(model.AckBindNum, &model.BindNum{ SnId: log.InviteSnId, Num: n, }) diff --git a/dbproxy/mq/c_itemlog.go b/dbproxy/mq/c_itemlog.go index fd47133..d241234 100644 --- a/dbproxy/mq/c_itemlog.go +++ b/dbproxy/mq/c_itemlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.ItemLog diff --git a/dbproxy/mq/c_jackpotlog.go b/dbproxy/mq/c_jackpotlog.go deleted file mode 100644 index 577f1f3..0000000 --- a/dbproxy/mq/c_jackpotlog.go +++ /dev/null @@ -1,31 +0,0 @@ -package mq - -//func init() { -// mq.RegisteSubscriber(model.JackPotLogCollName, func(e broker.Event) (err error) { -// msg := e.Message() -// if msg != nil { -// defer func() { -// if err != nil { -// mq.BackUp(e, err) -// } -// -// e.Ack() -// -// recover() -// }() -// -// var log model.JackPotLog -// err = json.Unmarshal(msg.Body, &log) -// if err != nil { -// return -// } -// -// c := svc.JackPotLogsCollection(log.Platform) -// if c != nil { -// _, err = c.Upsert(bson.M{"_id": log.LogId}, log) -// } -// return -// } -// return nil -// }, broker.Queue(model.JackPotLogCollName), broker.DisableAutoAck(), rabbitmq.DurableQueue()) -//} diff --git a/dbproxy/mq/c_loginlog.go b/dbproxy/mq/c_loginlog.go index 784d3c9..2d88baa 100644 --- a/dbproxy/mq/c_loginlog.go +++ b/dbproxy/mq/c_loginlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.LoginLog diff --git a/dbproxy/mq/c_onlinelog.go b/dbproxy/mq/c_onlinelog.go index d8d5525..3a813e4 100644 --- a/dbproxy/mq/c_onlinelog.go +++ b/dbproxy/mq/c_onlinelog.go @@ -16,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.OnlineLog diff --git a/dbproxy/mq/c_rank.go b/dbproxy/mq/c_rank.go index c9fe997..61abff3 100644 --- a/dbproxy/mq/c_rank.go +++ b/dbproxy/mq/c_rank.go @@ -2,14 +2,16 @@ package mq import ( "encoding/json" - "mongo.games.com/game/dbproxy/svc" - "mongo.games.com/game/model" - "mongo.games.com/game/mq" + "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" + + "mongo.games.com/game/dbproxy/svc" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) func init() { @@ -18,13 +20,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.PlayerRankScore @@ -53,13 +49,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.RankPlayerCoin @@ -88,13 +78,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.PlayerLevelInfo diff --git a/dbproxy/mq/c_scenecoinlog.go b/dbproxy/mq/c_scenecoinlog.go index 2d9f444..bffa82d 100644 --- a/dbproxy/mq/c_scenecoinlog.go +++ b/dbproxy/mq/c_scenecoinlog.go @@ -2,11 +2,13 @@ package mq import ( "encoding/json" + + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -14,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.SceneCoinLog diff --git a/dbproxy/mq/c_welfarelog.go b/dbproxy/mq/c_welfarelog.go index 3bc4bb2..e6aa615 100644 --- a/dbproxy/mq/c_welfarelog.go +++ b/dbproxy/mq/c_welfarelog.go @@ -3,11 +3,12 @@ package mq import ( "encoding/json" + "mongo.games.com/goserver/core/broker" + "mongo.games.com/goserver/core/broker/rabbitmq" + "mongo.games.com/game/dbproxy/svc" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker" - "mongo.games.com/goserver/core/broker/rabbitmq" ) func init() { @@ -15,13 +16,7 @@ func init() { msg := e.Message() if msg != nil { defer func() { - if err != nil { - mq.BackUp(e, err) - } - e.Ack() - - recover() }() var log model.WelfareLog diff --git a/dbproxy/mq/publisher.go b/dbproxy/mq/publisher.go deleted file mode 100644 index c48c666..0000000 --- a/dbproxy/mq/publisher.go +++ /dev/null @@ -1,33 +0,0 @@ -package mq - -import ( - "mongo.games.com/game/common" - "mongo.games.com/game/mq" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" -) - -var RabbitMQPublisher *mq.RabbitMQPublisher - -func init() { - ////首先加载游戏配置 - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err := RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - return nil - }) - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - - return nil - }) -} diff --git a/etcd/client.go b/etcd/client.go index c4ba837..d6b0a4b 100644 --- a/etcd/client.go +++ b/etcd/client.go @@ -5,11 +5,8 @@ import ( "time" "go.etcd.io/etcd/client/v3" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/basic" - "mongo.games.com/goserver/core/logger" - "mongo.games.com/game/model" + "mongo.games.com/goserver/core/logger" ) /* @@ -232,22 +229,12 @@ func (c *Client) GoWatch(ctx context.Context, revision int64, prefix string, f f logger.Logger.Warnf("etcd watch WithPrefix(%v) err:%v", prefix, resp.Err()) continue } - if !model.GameParamData.UseEtcd { - continue - } if len(resp.Events) == 0 { continue } logger.Logger.Tracef("@goWatch %v changed, header:%#v", prefix, resp.Header) - obj := core.CoreObject() - if obj != nil { - func(res clientv3.WatchResponse) { - obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error { - return f(res) - }), true) - }(resp) - } + f(resp) } if skip { diff --git a/etcd/init.go b/etcd/init.go deleted file mode 100644 index 5050c32..0000000 --- a/etcd/init.go +++ /dev/null @@ -1,17 +0,0 @@ -package etcd - -import ( - "mongo.games.com/goserver/core" -) - -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - mgr.Start() - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - mgr.Shutdown() - return nil - }) -} diff --git a/etcd/manager.go b/etcd/manager.go index 3dc6d98..5631a0f 100644 --- a/etcd/manager.go +++ b/etcd/manager.go @@ -7,21 +7,31 @@ import ( "go.etcd.io/etcd/client/v3" + "mongo.games.com/goserver/core" + "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/logger" - "mongo.games.com/game/common" "mongo.games.com/game/proto" ) -var mgr = &Manager{Client: new(Client)} +/* + ETCD Manager +*/ -type Manager struct { - *Client -} +var ( + defaultUrl = []string{"localhost:2379"} + defaultUser = "" + defaultPasswd = "" + defaultDialTimeout = time.Minute +) -// Register . -func (this *Manager) Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) { +var globalClient = new(Client) +// Register 注册etcd监听方法 +// key:监听的key +// msgType:数据类型 +// f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据,类型为msgType,是指针类型 +func Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) { createFunc := func() interface{} { tp := reflect.TypeOf(msgType) if tp.Kind() == reflect.Ptr { @@ -32,7 +42,7 @@ func (this *Manager) Register(key string, msgType interface{}, f func(ctx contex initFunc := func() int64 { logger.Logger.Info("ETCD 拉取数据:", key) - res, err := this.GetValueWithPrefix(key) + res, err := globalClient.GetValueWithPrefix(key) if err == nil { for i := int64(0); i < res.Count; i++ { data := createFunc() @@ -63,61 +73,61 @@ func (this *Manager) Register(key string, msgType interface{}, f func(ctx contex // 监控数据变动 watchFunc := func(ctx context.Context, revision int64) { - this.GoWatch(ctx, revision, key, func(res clientv3.WatchResponse) error { - for _, ev := range res.Events { - switch ev.Type { - case clientv3.EventTypePut: - data := createFunc() - v, ok := data.(proto.Message) - if !ok { - logger.Logger.Errorf("ETCD %v error: not proto message", string(ev.Kv.Key)) - continue + globalClient.GoWatch(ctx, revision, key, func(res clientv3.WatchResponse) error { + obj := core.CoreObject() + if obj != nil { + obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error { + for _, ev := range res.Events { + switch ev.Type { + case clientv3.EventTypePut: + data := createFunc() + v, ok := data.(proto.Message) + if !ok { + logger.Logger.Errorf("ETCD %v error: not proto message", string(ev.Kv.Key)) + continue + } + err := proto.Unmarshal(ev.Kv.Value, v) + if err != nil { + logger.Logger.Errorf("etcd unmarshal(%v) error:%v", string(ev.Kv.Key), err) + continue + } + logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), v) + f(ctx, string(ev.Kv.Key), false, ev, v) + case clientv3.EventTypeDelete: + logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key)) + f(ctx, string(ev.Kv.Key), false, ev, nil) + } } - err := proto.Unmarshal(ev.Kv.Value, v) - if err != nil { - logger.Logger.Errorf("etcd unmarshal(%v) error:%v", string(ev.Kv.Key), err) - continue - } - logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), v) - f(ctx, string(ev.Kv.Key), false, ev, v) - case clientv3.EventTypeDelete: - logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key)) - f(ctx, string(ev.Kv.Key), false, ev, nil) - } + return nil + }), true) } return nil }) } - this.AddFunc(initFunc, watchFunc) -} - -func (this *Manager) Start() { - logger.Logger.Infof("EtcdClient开始连接url:%v;etcduser:%v;etcdpwd:%v", common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd")) - err := this.Open(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) - if err != nil { - logger.Logger.Tracef("Manager.Open err:%v", err) - } - this.ReInitAndWatchAll() -} - -func (this *Manager) Shutdown() { - this.Close() -} - -func (this *Manager) Reset() { - this.Close() - this.Start() -} - -// Register 注册etcd监听方法 -// key:监听的key -// msgType:数据类型 -// f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据,类型为msgType,是指针类型 -func Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) { - mgr.Register(key, msgType, f) + globalClient.AddFunc(initFunc, watchFunc) } func Reset() { - mgr.Reset() + globalClient.Close() + Start(defaultUrl, defaultUser, defaultPasswd, defaultDialTimeout) +} + +func Close() { + globalClient.Close() +} + +func Start(url []string, user, passwd string, dialTimeout time.Duration) { + defaultUrl = url + defaultUser = user + defaultPasswd = passwd + if dialTimeout > 0 { + defaultDialTimeout = dialTimeout + } + + err := globalClient.Open(defaultUrl, defaultUser, defaultPasswd, defaultDialTimeout) + if err != nil { + logger.Logger.Errorf("etcd.Open err:%v", err) + } + globalClient.ReInitAndWatchAll() } diff --git a/gamesrv/action/action_server.go b/gamesrv/action/action_server.go index 234f687..9127452 100644 --- a/gamesrv/action/action_server.go +++ b/gamesrv/action/action_server.go @@ -559,7 +559,7 @@ func init() { return nil })) - //玩家离开 + //同步记牌器过期时间 netlib.RegisterFactory(int(server.SSPacketID_PACKET_WG_BUYRECTIMEITEM), netlib.PacketFactoryWrapper(func() interface{} { return &server.WGBuyRecTimeItem{} })) diff --git a/gamesrv/base/init.go b/gamesrv/base/init.go deleted file mode 100644 index 1863b63..0000000 --- a/gamesrv/base/init.go +++ /dev/null @@ -1,45 +0,0 @@ -package base - -import ( - "time" - - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" - - "mongo.games.com/game/common" - "mongo.games.com/game/model" - "mongo.games.com/game/mq" -) - -var RabbitMQPublisher *mq.RabbitMQPublisher - -func init() { - model.InitGameParam() - model.InitFishingParam() - model.InitNormalParam() - model.InitGMAC() - - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) - model.InitGameKVData() - - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err := RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - return nil - }) -} diff --git a/gamesrv/base/logchannel.go b/gamesrv/base/logchannel.go index 63173f6..b62cd45 100644 --- a/gamesrv/base/logchannel.go +++ b/gamesrv/base/logchannel.go @@ -4,6 +4,7 @@ import ( "reflect" "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) // LogChannelSingleton 日志记录器 @@ -37,11 +38,11 @@ func (c *LogChannel) WriteLog(log interface{}) { if cname == "" { cname = "_null_" } - RabbitMQPublisher.Send(cname, log) + mq.Send(cname, log) } func (c *LogChannel) WriteMQData(data *model.RabbitMQData) { - RabbitMQPublisher.Send(data.MQName, data.Data) + mq.Send(data.MQName, data.Data) } func init() { diff --git a/gamesrv/base/serverstate.go b/gamesrv/base/serverstate.go index 9d375dd..57e8824 100644 --- a/gamesrv/base/serverstate.go +++ b/gamesrv/base/serverstate.go @@ -2,7 +2,6 @@ package base import ( "mongo.games.com/game/common" - "mongo.games.com/goserver/core" ) var ServerStateMgr = &ServerStateManager{ @@ -24,10 +23,3 @@ func (this *ServerStateManager) SetState(state common.GameSessState) { func (this *ServerStateManager) GetState() common.GameSessState { return this.State } - -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - ServerStateMgr.Init() - return nil - }) -} diff --git a/gamesrv/base/srvdatamgrex.go b/gamesrv/base/srvdatamgrex.go index 9759469..e6f700b 100644 --- a/gamesrv/base/srvdatamgrex.go +++ b/gamesrv/base/srvdatamgrex.go @@ -3,7 +3,6 @@ package base import ( "mongo.games.com/game/common" "mongo.games.com/game/srvdata" - "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/logger" ) @@ -186,12 +185,3 @@ func (this *DB_FishOutMgrEx) InitFishAppear() { } } } - -// 初始化在线奖励系统 -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - logger.Logger.Info("初始化牌库[S]") - defer logger.Logger.Info("初始化牌库[E]") - return nil - }) -} diff --git a/gamesrv/main.go b/gamesrv/main.go index 1295456..65e7e28 100644 --- a/gamesrv/main.go +++ b/gamesrv/main.go @@ -1,17 +1,21 @@ package main import ( + "time" + "mongo.games.com/goserver/core" _ "mongo.games.com/goserver/core/i18n" "mongo.games.com/goserver/core/module" _ "mongo.games.com/game" "mongo.games.com/game/common" - _ "mongo.games.com/game/srvdata" - + "mongo.games.com/game/etcd" _ "mongo.games.com/game/gamesrv/action" _ "mongo.games.com/game/gamesrv/base" _ "mongo.games.com/game/gamesrv/transact" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" + _ "mongo.games.com/game/srvdata" // game _ "mongo.games.com/game/gamesrv/chess" @@ -31,10 +35,28 @@ import ( ) func main() { - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + model.InitFishingParam() + model.InitNormalParam() + model.InitGMAC() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - - waiter := module.Start() - waiter.Wait("main()") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + etcd.Start(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) + model.InitGameKVData() + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + etcd.Close() + mq.StopPublisher() + return nil + }) + // module模块 + w := module.Start() + w.Wait("main()") } diff --git a/gatesrv/logchannel.go b/gatesrv/logchannel.go index 0689c8e..f5fc1f1 100644 --- a/gatesrv/logchannel.go +++ b/gatesrv/logchannel.go @@ -3,10 +3,6 @@ package main import ( "reflect" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" - - "mongo.games.com/game/common" "mongo.games.com/game/model" "mongo.games.com/game/mq" ) @@ -42,33 +38,9 @@ func (c *LogChannel) WriteLog(log interface{}) { if cname == "" { cname = "_null_" } - RabbitMQPublisher.Send(cname, log) + mq.Send(cname, log) } func (c *LogChannel) WriteMQData(data *model.RabbitMQData) { - RabbitMQPublisher.Send(data.MQName, data.Data) -} - -var RabbitMQPublisher *mq.RabbitMQPublisher - -func init() { - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err := RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - return nil - }) + mq.Send(data.MQName, data.Data) } diff --git a/gatesrv/main.go b/gatesrv/main.go index f454c79..5027b07 100644 --- a/gatesrv/main.go +++ b/gatesrv/main.go @@ -4,18 +4,27 @@ import ( _ "mongo.games.com/game" "mongo.games.com/game/common" "mongo.games.com/game/model" - + "mongo.games.com/game/mq" "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/module" ) func main() { - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - - model.InitGameParam() - - waiter := module.Start() - waiter.Wait("main()") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + mq.StopPublisher() + return nil + }) + // module模块 + w := module.Start() + w.Wait("main()") } diff --git a/mgrsrv/api/logchannel.go b/mgrsrv/api/logchannel.go index 5993b72..a694462 100644 --- a/mgrsrv/api/logchannel.go +++ b/mgrsrv/api/logchannel.go @@ -3,11 +3,8 @@ package api import ( "reflect" - "mongo.games.com/game/common" "mongo.games.com/game/model" "mongo.games.com/game/mq" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" ) /* @@ -15,11 +12,9 @@ import ( */ const ( - LOGCHANEL_BLACKHOLE = "_null_" + LogChanelBlackHole = "_null_" ) -var RabbitMQPublisher *mq.RabbitMQPublisher - var LogChannelSington = &LogChannel{ cName: make(map[reflect.Type]string), } @@ -48,36 +43,15 @@ func (c *LogChannel) getLogCName(log interface{}) string { func (c *LogChannel) WriteLog(log interface{}) { cname := c.getLogCName(log) if cname == "" { - cname = LOGCHANEL_BLACKHOLE + cname = LogChanelBlackHole } - RabbitMQPublisher.Send(cname, log) + mq.Send(cname, log) } func (c *LogChannel) WriteMQData(data *model.RabbitMQData) { - RabbitMQPublisher.Send(data.MQName, data.Data) + mq.Send(data.MQName, data.Data) } func init() { LogChannelSington.RegisteLogCName(model.APILogCollName, &model.APILog{}) - - //首先加载游戏配置 - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err := RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - return nil - }) } diff --git a/mgrsrv/main.go b/mgrsrv/main.go index 9c6801d..090961e 100644 --- a/mgrsrv/main.go +++ b/mgrsrv/main.go @@ -1,39 +1,37 @@ package main import ( - _ "mongo.games.com/game" - _ "mongo.games.com/game/mgrsrv/api" - _ "mongo.games.com/game/srvdata" "time" - "mongo.games.com/game/common" - "mongo.games.com/game/model" - "mongo.games.com/game/srvdata" "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/module" + + _ "mongo.games.com/game" + "mongo.games.com/game/common" + _ "mongo.games.com/game/mgrsrv/api" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" + _ "mongo.games.com/game/srvdata" ) func main() { - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - - model.InitGameParam() - logger.Logger.Warnf("log data %v", srvdata.Config.RootPath) - waiter := module.Start() - waiter.Wait("main()") -} - -func init() { - //首先加载游戏配置 + // core hook core.RegisteHook(core.HOOK_BEFORE_START, func() error { model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) return nil }) - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + mq.StopPublisher() model.ShutdownRPClient() return nil }) + // module模块 + w := module.Start() + w.Wait("main()") } diff --git a/mq/consumer.go b/mq/consumer.go index 26c49be..4110f6d 100644 --- a/mq/consumer.go +++ b/mq/consumer.go @@ -18,8 +18,11 @@ import ( 为了使用方便,这里统一启动订阅 */ -var subscriberLock sync.RWMutex -var subscriber = make(map[string][]*Subscriber) +var ( + globalConsumer *RabbitMQConsumer + subscriberLock sync.RWMutex + subscriber = make(map[string][]*Subscriber) +) type Subscriber struct { broker.Subscriber @@ -27,6 +30,90 @@ type Subscriber struct { opts []broker.SubscribeOption } +type RabbitMQConsumer struct { + broker.Broker + url string + exchange rabbitmq.Exchange +} + +func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsumer { + mq := &RabbitMQConsumer{ + url: url, + exchange: exchange, + } + options := []broker.Option{ + broker.Addrs(url), rabbitmq.ExchangeName(exchange.Name), + } + if exchange.Durable { + options = append(options, rabbitmq.DurableExchange()) + } + mq.Broker = rabbitmq.NewBroker(options...) + mq.Broker.Init() + return mq +} + +func (c *RabbitMQConsumer) Start() error { + if ok, _ := common.PathExists(BackupPath); !ok { + os.MkdirAll(BackupPath, os.ModePerm) + } + + if err := c.Connect(); err != nil { + return err + } + + for topic, ss := range GetSubscribers() { + for _, s := range ss { + sub, err := c.Subscribe(topic, func(event broker.Event) error { + var err error + defer func() { + e := recover() + if e != nil { + logger.Logger.Errorf("RabbitMQConsumer.Subscriber(%s,%v) recover:%v", event.Topic(), event.Message(), e) + } + if err != nil { + c.backUp(event, err) + } + }() + err = s.h(event) + return err + }, s.opts...) + if err != nil { + return err + } + + s.Subscriber = sub + } + } + + return nil +} + +func (c *RabbitMQConsumer) Stop() error { + for _, ss := range GetSubscribers() { + for _, s := range ss { + s.Unsubscribe() + } + } + return c.Disconnect() +} + +func (c *RabbitMQConsumer) backUp(e broker.Event, err error) { + tNow := time.Now() + filePath := fmt.Sprintf(FilePathFormat, BackupPath, e.Topic(), tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) + f, er := os.Create(filePath) + if er != nil { + logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", e.Topic(), e.Message(), er) + return + } + defer f.Close() + var reason string + if err != nil { + reason = err.Error() + } + f.WriteString("reason:" + reason + "\n") + f.WriteString("data:" + string(e.Message().Body) + "\n") +} + // RegisterSubscriber 注册订阅处理方法 // 不同订阅是在各自的协程中执行的 func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.SubscribeOption) { @@ -39,21 +126,6 @@ func RegisterSubscriber(topic string, h broker.Handler, opts ...broker.Subscribe subscriberLock.Unlock() } -func UnregisterSubscriber(topic string) { - subscriberLock.Lock() - delete(subscriber, topic) - subscriberLock.Unlock() -} - -func GetSubscriber(topic string) []*Subscriber { - subscriberLock.RLock() - defer subscriberLock.RUnlock() - if s, ok := subscriber[topic]; ok { - return s - } - return nil -} - func GetSubscribers() map[string][]*Subscriber { ret := make(map[string][]*Subscriber) subscriberLock.RLock() @@ -66,75 +138,18 @@ func GetSubscribers() map[string][]*Subscriber { return ret } -type RabbitMQConsumer struct { - broker.Broker - url string - exchange rabbitmq.Exchange -} - -func NewRabbitMQConsumer(url string, exchange rabbitmq.Exchange) *RabbitMQConsumer { - mq := &RabbitMQConsumer{ - url: url, - exchange: exchange, - } - - rabbitmq.DefaultRabbitURL = mq.url - rabbitmq.DefaultExchange = mq.exchange - - mq.Broker = rabbitmq.NewBroker() - mq.Broker.Init() - return mq -} - -func (c *RabbitMQConsumer) Start() error { - if err := c.Connect(); err != nil { - return err - } - - sss := GetSubscribers() - for topic, ss := range sss { - for _, s := range ss { - sub, err := c.Subscribe(topic, s.h, s.opts...) - if err != nil { - return err - } - - s.Subscriber = sub - } - } - - return nil -} - -func (c *RabbitMQConsumer) Stop() error { - sss := GetSubscribers() - for _, ss := range sss { - for _, s := range ss { - s.Unsubscribe() - } - } - return c.Disconnect() -} - -func BackUp(e broker.Event, err error) { - tNow := time.Now() - filePath := fmt.Sprintf("%s/%s_%s_%09d_%04d.dat", BACKUP_PATH, e.Topic(), tNow.Format(TIME_FORMAT), tNow.Nanosecond(), rand.Int31n(10000)) - f, err := os.Create(filePath) - if err != nil { - logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", e.Topic(), e.Message(), err) - return - } - defer f.Close() - var reason string - if err != nil { - reason = err.Error() - } - f.WriteString("reason:" + reason + "\n") - f.WriteString("data:" + string(e.Message().Body) + "\n") -} - -func init() { - if ok, _ := common.PathExists(BACKUP_PATH); !ok { - os.MkdirAll(BACKUP_PATH, os.ModePerm) +// StartConsumer 启动消费者 +func StartConsumer(url string, exchange string, durableExchange bool) { + StopConsumer() + globalConsumer = NewRabbitMQConsumer(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}) + if err := globalConsumer.Start(); err != nil { + panic(fmt.Sprintf("RabbitMQConsumer.Start() err:%v", err)) + } +} + +// StopConsumer 停止消费者 +func StopConsumer() { + if globalConsumer != nil { + globalConsumer.Stop() } } diff --git a/mq/publisher.go b/mq/publisher.go index a703bd8..3f27579 100644 --- a/mq/publisher.go +++ b/mq/publisher.go @@ -9,10 +9,11 @@ import ( "sync" "time" - "mongo.games.com/game/common" "mongo.games.com/goserver/core/broker" "mongo.games.com/goserver/core/broker/rabbitmq" "mongo.games.com/goserver/core/logger" + + "mongo.games.com/game/common" ) /* @@ -20,11 +21,14 @@ import ( */ const ( - BACKUP_PATH = "backup" - TIME_FORMAT = "20060102150405" + BackupPath = "backup" + TimeFormat = "20060102150405" + FilePathFormat = "%s/%s_%s_%09d_%04d.dat" ) -var ERR_CLOSED = errors.New("publisher is closed") +var ErrClosed = errors.New("publisher is closed") + +var globalPublisher *RabbitMQPublisher type item struct { topic string @@ -32,115 +36,71 @@ type item struct { } type RabbitMQPublisher struct { - b broker.Broker + broker.Broker exchange rabbitmq.Exchange url string que chan *item - closed bool - waitor sync.WaitGroup + closed chan struct{} + wg sync.WaitGroup } -func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, backlog int) *RabbitMQPublisher { - if backlog <= 0 { - backlog = 1 +func NewRabbitMQPublisher(url string, exchange rabbitmq.Exchange, queueSize int) *RabbitMQPublisher { + if queueSize <= 0 { + queueSize = 1 } mq := &RabbitMQPublisher{ url: url, exchange: exchange, - que: make(chan *item, backlog), + que: make(chan *item, queueSize), + closed: make(chan struct{}), } - - rabbitmq.DefaultRabbitURL = mq.url - rabbitmq.DefaultExchange = mq.exchange - - mq.b = rabbitmq.NewBroker() - mq.b.Init() + options := []broker.Option{ + broker.Addrs(url), rabbitmq.ExchangeName(exchange.Name), + } + if exchange.Durable { + options = append(options, rabbitmq.DurableExchange()) + } + mq.Broker = rabbitmq.NewBroker(options...) + mq.Broker.Init() return mq } -func (p *RabbitMQPublisher) Start() (err error) { - if ok, _ := common.PathExists(BACKUP_PATH); !ok { - err = os.MkdirAll(BACKUP_PATH, os.ModePerm) - if err != nil { - return - } - } - - err = p.b.Connect() - if err != nil { - return - } - - go p.workerRoutine() - - return nil -} - -func (p *RabbitMQPublisher) Stop() error { - if p.closed { - return ERR_CLOSED - } - - p.closed = true - close(p.que) - for item := range p.que { - p.publish(item.topic, item.msg) - } - - //等待所有投递出去的任务全部完成 - p.waitor.Wait() - - return p.b.Disconnect() -} - -// Send 发布消息,异步 -func (p *RabbitMQPublisher) Send(topic string, msg interface{}) (err error) { - if p.closed { - return ERR_CLOSED - } - - i := &item{topic: topic, msg: msg} - select { - case p.que <- i: - default: - //会不会情况更糟糕 - go p.concurrentPublish(topic, msg) - } - return nil -} - -func (p *RabbitMQPublisher) concurrentPublish(topic string, msg interface{}) (err error) { - p.waitor.Add(1) - defer p.waitor.Done() - return p.publish(topic, msg) -} - // 发布消息,同步 func (p *RabbitMQPublisher) publish(topic string, msg interface{}) (err error) { defer func() { - if err != nil { + e := recover() + if e != nil { + logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) recover:%v", topic, msg, e) + } + if err != nil || e != nil { p.backup(topic, msg, err) } - - recover() }() - buf, err := json.Marshal(msg) - if err != nil { - return err + var buf []byte + switch d := msg.(type) { + case []byte: + buf = d + case string: + buf = []byte(d) + default: + buf, err = json.Marshal(msg) + if err != nil { + return err + } } - err = p.b.Publish(topic, &broker.Message{Body: buf}) + err = p.Publish(topic, &broker.Message{Body: buf}) if err != nil { - logger.Logger.Error("RabbitMQPublisher.publish err:", err) + logger.Logger.Error("RabbitMQPublisher.publish(%s,%v) err:%v", topic, msg, err) return } return nil } -func (p *RabbitMQPublisher) workerRoutine() { - p.waitor.Add(1) - defer p.waitor.Done() +func (p *RabbitMQPublisher) publishRoutine() { + p.wg.Add(1) + defer p.wg.Done() for { select { @@ -150,20 +110,81 @@ func (p *RabbitMQPublisher) workerRoutine() { } else { return } + case <-p.closed: + return } } } -func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { - buf, err := json.Marshal(msg) +func (p *RabbitMQPublisher) Start() (err error) { + if ok, _ := common.PathExists(BackupPath); !ok { + err = os.MkdirAll(BackupPath, os.ModePerm) + if err != nil { + return + } + } + + err = p.Connect() if err != nil { return } + + go p.publishRoutine() + + return nil +} + +func (p *RabbitMQPublisher) Stop() error { + select { + case <-p.closed: + return ErrClosed + default: + } + + close(p.closed) + close(p.que) + for item := range p.que { + p.publish(item.topic, item.msg) + } + + //等待所有投递出去的任务全部完成 + p.wg.Wait() + + return p.Disconnect() +} + +// Send 发布消息,异步 +func (p *RabbitMQPublisher) Send(topic string, msg interface{}) (err error) { + select { + case <-p.closed: + return ErrClosed + default: + } + + i := &item{topic: topic, msg: msg} + select { + case p.que <- i: + default: + //会不会情况更糟糕 + go func() { + p.wg.Add(1) + defer p.wg.Done() + p.publish(topic, msg) + }() + } + return nil +} + +func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { + buf, er := json.Marshal(msg) + if er != nil { + return + } tNow := time.Now() - filePath := fmt.Sprintf("%s/%s_%s_%09d_%04d.dat", BACKUP_PATH, topic, tNow.Format(TIME_FORMAT), tNow.Nanosecond(), rand.Int31n(10000)) - f, err := os.Create(filePath) - if err != nil { - logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", topic, msg, err) + filePath := fmt.Sprintf(FilePathFormat, BackupPath, topic, tNow.Format(TimeFormat), tNow.Nanosecond(), rand.Int31n(10000)) + f, er := os.Create(filePath) + if er != nil { + logger.Logger.Errorf("RabbitMQPublisher.public(%s,%v) err:%v", topic, msg, er) return } defer f.Close() @@ -174,3 +195,26 @@ func (p *RabbitMQPublisher) backup(topic string, msg interface{}, err error) { f.WriteString("reason:" + reason + "\n") f.WriteString("data:" + string(buf) + "\n") } + +// StartPublisher 启动发布者 +func StartPublisher(url string, exchange string, durableExchange bool, queueSize int) { + StopPublisher() + globalPublisher = NewRabbitMQPublisher(url, rabbitmq.Exchange{Name: exchange, Durable: durableExchange}, queueSize) + if err := globalPublisher.Start(); err != nil { + panic(fmt.Sprintf("RabbitMQPublisher.Start() err:%v", err)) + } +} + +// StopPublisher 停止发布者 +func StopPublisher() { + if globalPublisher != nil { + globalPublisher.Stop() + } +} + +func Send(topic string, msg interface{}) (err error) { + if globalPublisher != nil { + return globalPublisher.Send(topic, msg) + } + return ErrClosed +} diff --git a/ranksrv/com/register.go b/ranksrv/com/register.go index faef73f..0097509 100644 --- a/ranksrv/com/register.go +++ b/ranksrv/com/register.go @@ -19,7 +19,7 @@ func Register(mainId int, msgType interface{}, h func(s *netlib.Session, g *rank return reflect.New(tp).Interface() } - common.RegisterHandler(mainId, common.HandlerWrapper(func(s *netlib.Session, packetId int, data interface{}, sid int64) error { + common.Register(mainId, rankproto.GateTransmit{}, func(s *netlib.Session, packetId int, data interface{}, sid int64) error { d, ok := data.(*rankproto.GateTransmit) if !ok { return nil @@ -36,9 +36,5 @@ func Register(mainId int, msgType interface{}, h func(s *netlib.Session, g *rank } return h(s, d, packetId, msg, sid) - })) - - netlib.RegisterFactory(mainId, netlib.PacketFactoryWrapper(func() interface{} { - return &rankproto.GateTransmit{} - })) + }) } diff --git a/ranksrv/init.go b/ranksrv/init.go deleted file mode 100644 index f2dce93..0000000 --- a/ranksrv/init.go +++ /dev/null @@ -1,35 +0,0 @@ -package main - -import ( - "mongo.games.com/game/common" - "mongo.games.com/game/model" - "mongo.games.com/game/mq" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/broker/rabbitmq" - "time" -) - -var RabbitMQPublisher *mq.RabbitMQPublisher -var RabbitMqConsumer *mq.RabbitMQConsumer - -func init() { - //首先加载游戏配置 - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - //初始化rpc - model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) - - //rabbitmq打开链接 - RabbitMqConsumer = mq.NewRabbitMQConsumer(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}) - if RabbitMqConsumer != nil { - RabbitMqConsumer.Start() - } - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - if RabbitMqConsumer != nil { - RabbitMqConsumer.Stop() - } - return nil - }) -} diff --git a/ranksrv/main.go b/ranksrv/main.go index 79356de..a3bd2c5 100644 --- a/ranksrv/main.go +++ b/ranksrv/main.go @@ -1,21 +1,38 @@ package main import ( - "mongo.games.com/game/model" + "time" + "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/module" _ "mongo.games.com/game" "mongo.games.com/game/common" + "mongo.games.com/game/etcd" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) func main() { - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + // package模块 core.LoadPackages("config.json") defer core.ClosePackages() - - model.InitGameParam() - - waitor := module.Start() - waitor.Wait("ranksrv") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + etcd.Start(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) + mq.StartConsumer(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true) + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + etcd.Close() + mq.StopConsumer() + model.ShutdownRPClient() + return nil + }) + // module模块 + w := module.Start() + w.Wait("ranksrv") } diff --git a/robot/base/init.go b/robot/base/init.go index 3972dd8..32fbd90 100644 --- a/robot/base/init.go +++ b/robot/base/init.go @@ -6,11 +6,7 @@ import ( "time" "github.com/globalsign/mgo/bson" - "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/logger" - - "mongo.games.com/game/common" - "mongo.games.com/game/model" ) type AccountData struct { @@ -27,19 +23,8 @@ var ( ) var accountFileName = "robotaccount.json" -func init() { - model.InitGameParam() - - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - initAccountData() - // 连接数据库服务 - model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) - return nil - }) -} - -// 初始化机器人账号 -func initAccountData() { +// InitAccountData 初始化机器人账号 +func InitAccountData() { dirty := false newFunc := func(n int) { for i := 0; i < n; i++ { diff --git a/robot/main.go b/robot/main.go index c5b3550..6d2992e 100644 --- a/robot/main.go +++ b/robot/main.go @@ -1,11 +1,15 @@ package main import ( + "time" + "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/module" _ "mongo.games.com/game" - _ "mongo.games.com/game/common" + "mongo.games.com/game/common" + "mongo.games.com/game/model" + "mongo.games.com/game/robot/base" _ "mongo.games.com/game/robot/base" _ "mongo.games.com/game/robot/chess" _ "mongo.games.com/game/robot/thirteen" @@ -14,9 +18,22 @@ import ( ) func main() { + // 自定义配置文件 + model.InitGameParam() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") - + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + base.InitAccountData() + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + model.ShutdownRPClient() + return nil + }) + // module模块 waiter := module.Start() waiter.Wait("main()") } diff --git a/worldsrv/action_sign.go b/worldsrv/action_sign.go deleted file mode 100644 index 55bb774..0000000 --- a/worldsrv/action_sign.go +++ /dev/null @@ -1,90 +0,0 @@ -package main - -import ( - "mongo.games.com/game/common" - "mongo.games.com/game/proto" - "mongo.games.com/game/protocol/activity" - "mongo.games.com/goserver/core/logger" - "mongo.games.com/goserver/core/netlib" -) - -// ------------------------------------------------ -type CSSignPacketFactory struct { -} - -type CSSignHandler struct { -} - -func (this *CSSignPacketFactory) CreatePacket() interface{} { - pack := &activity.CSSign{} - return pack -} - -func (this *CSSignHandler) Process(s *netlib.Session, packetid int, data interface{}, sid int64) error { - logger.Logger.Trace("CSSignHandler Process recv ", data) - if msg, ok := data.(*activity.CSSign); ok { - p := PlayerMgrSington.GetPlayer(sid) - if p == nil { - logger.Logger.Warn("CSSignHandler p == nil") - return nil - } - - pack := &activity.SCSign{} - pack.SignIndex = proto.Int32(msg.GetSignIndex()) - pack.SignType = proto.Int32(msg.GetSignType()) - - retCode := ActSignMgrSington.CanSign(p, int(msg.GetSignIndex())) - if retCode != activity.OpResultCode_ActSign_OPRC_Activity_Sign_Sucess { - pack.OpRetCode = retCode - proto.SetDefaults(pack) - p.SendToClient(int(activity.ActSignPacketID_PACKET_SCSign), pack) - return nil - } - retCode = ActSignMgrSington.Sign(p, int(msg.GetSignIndex()), msg.GetSignType()) - if retCode != activity.OpResultCode_ActSign_OPRC_Activity_Sign_Sucess { - pack.OpRetCode = retCode - proto.SetDefaults(pack) - p.SendToClient(int(activity.ActSignPacketID_PACKET_SCSign), pack) - return nil - } - - pack.OpRetCode = activity.OpResultCode_ActSign_OPRC_Activity_Sign_Sucess - proto.SetDefaults(pack) - p.SendToClient(int(activity.ActSignPacketID_PACKET_SCSign), pack) - } - return nil -} - -// ------------------------------------------------ -type CSSignDataPacketFactory struct { -} - -type CSSignDataHandler struct { -} - -func (this *CSSignDataPacketFactory) CreatePacket() interface{} { - pack := &activity.CSSignData{} - return pack -} - -func (this *CSSignDataHandler) Process(s *netlib.Session, packetid int, data interface{}, sid int64) error { - logger.Logger.Trace("CSSignDataHandler Process recv ", data) - if _, ok := data.(*activity.CSSignData); ok { - p := PlayerMgrSington.GetPlayer(sid) - if p == nil { - logger.Logger.Warn("CSSignDataHandler p == nil") - return nil - } - ActSignMgrSington.SendSignDataToPlayer(p) - } - return nil -} - -func init() { - //签到 - common.RegisterHandler(int(activity.ActSignPacketID_PACKET_CSSign), &CSSignHandler{}) - netlib.RegisterFactory(int(activity.ActSignPacketID_PACKET_CSSign), &CSSignPacketFactory{}) - //签到数据 - common.RegisterHandler(int(activity.ActSignPacketID_PACKET_CSSignData), &CSSignDataHandler{}) - netlib.RegisterFactory(int(activity.ActSignPacketID_PACKET_CSSignData), &CSSignDataPacketFactory{}) -} diff --git a/worldsrv/actsignmgr.go b/worldsrv/actsignmgr.go index 9f6c56e..763e68c 100644 --- a/worldsrv/actsignmgr.go +++ b/worldsrv/actsignmgr.go @@ -152,8 +152,4 @@ func (this *ActSignMgr) Sign(player *Player, signIndex int, signType int32) acti func init() { mgo.SetStats(true) - RegisterParallelLoadFunc("14日签到", func() error { - ActSignMgrSington.Init() - return nil - }) } diff --git a/worldsrv/blacklistmgr.go b/worldsrv/blacklistmgr.go index 9e68aa5..3c2cce4 100644 --- a/worldsrv/blacklistmgr.go +++ b/worldsrv/blacklistmgr.go @@ -538,8 +538,4 @@ func (this *BlackListMgr) CheckDeviceInBlackByPlatfrom(deviceId string, blackTyp func init() { mgo.SetStats(true) - RegisterParallelLoadFunc("平台黑名单", func() error { - BlackListMgrSington.Init() - return nil - }) } diff --git a/worldsrv/gamestate.go b/worldsrv/gamestate.go index 58c9997..2eff9a8 100644 --- a/worldsrv/gamestate.go +++ b/worldsrv/gamestate.go @@ -84,11 +84,6 @@ func (gsm *GameStateManager) BrodcastGameState(gameId int32, platform string, pa } } func init() { - //使用并行加载 - RegisterParallelLoadFunc("选场游戏场次配置", func() error { - gameStateMgr.Init() - return nil - }) //gameStateMgr.gameIds[int32(common.GameId_RollCoin)] = []int32{110030001, 110030002, 110030003, 110030004} //gameStateMgr.gameIds[int32(common.GameId_RollColor)] = []int32{150010001, 150010002, 150010003, 150010004} //gameStateMgr.gameIds[int32(common.GameId_RedVsBlack)] = []int32{140010001, 140010002, 140010003, 140010004} diff --git a/worldsrv/horseracelamp.go b/worldsrv/horseracelamp.go index 574c4f2..85a23e2 100644 --- a/worldsrv/horseracelamp.go +++ b/worldsrv/horseracelamp.go @@ -406,9 +406,4 @@ func (this *HorseRaceLampMgr) BroadcastHorseRaceLampMsg(horseRaceLamp *HorseRace func init() { module.RegisteModule(HorseRaceLampMgrSington, time.Second*3, 0) - - RegisterParallelLoadFunc("平台通知", func() error { - HorseRaceLampMgrSington.InitHorseRaceLamp() - return nil - }) } diff --git a/worldsrv/init.go b/worldsrv/init.go deleted file mode 100644 index 03f6666..0000000 --- a/worldsrv/init.go +++ /dev/null @@ -1,207 +0,0 @@ -package main - -import ( - "fmt" - "math/rand" - "time" - - "mongo.games.com/game/common" - "mongo.games.com/game/gamerule/crash" - "mongo.games.com/game/mq" - "mongo.games.com/goserver/core/broker/rabbitmq" - - "sync" - - "github.com/astaxie/beego/cache" - "mongo.games.com/game/model" - "mongo.games.com/game/webapi" - "mongo.games.com/goserver/core" - "mongo.games.com/goserver/core/logger" - "mongo.games.com/goserver/core/utils" -) - -type ParalleFunc func() error - -var CacheMemory cache.Cache -var wgParalleLoad = &sync.WaitGroup{} -var ParalleLoadModules []*ParalleLoadModule -var RabbitMQPublisher *mq.RabbitMQPublisher -var RabbitMqConsumer *mq.RabbitMQConsumer - -type ParalleLoadModule struct { - name string - f ParalleFunc -} - -func RegisterParallelLoadFunc(name string, f ParalleFunc) { - ParalleLoadModules = append(ParalleLoadModules, &ParalleLoadModule{name: name, f: f}) -} - -func init() { - rand.Seed(time.Now().UnixNano()) - - //首先加载游戏配置 - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) - model.InitGameParam() - model.InitNormalParam() - - hs := model.GetCrashHash(0) - if hs == nil { - for i := 0; i < crash.POKER_CART_CNT; i++ { - model.InsertCrashHash(i, crash.Sha256(fmt.Sprintf("%v%v", i, time.Now().UnixNano()))) - } - hs = model.GetCrashHash(0) - } - model.GameParamData.InitGameHash = []string{} - for _, v := range hs { - model.GameParamData.InitGameHash = append(model.GameParamData.InitGameHash, v.Hash) - } - hsatom := model.GetCrashHashAtom(0) - if hsatom == nil { - for i := 0; i < crash.POKER_CART_CNT; i++ { - model.InsertCrashHashAtom(i, crash.Sha256(fmt.Sprintf("%v%v", i, time.Now().UnixNano()))) - } - hsatom = model.GetCrashHashAtom(0) - } - model.GameParamData.AtomGameHash = []string{} - for _, v := range hsatom { - model.GameParamData.AtomGameHash = append(model.GameParamData.AtomGameHash, v.Hash) - } - //if len(model.GameParamData.AtomGameHash) < crash.POKER_CART_CNT || - // len(model.GameParamData.InitGameHash) < crash.POKER_CART_CNT { - // panic(errors.New("hash is read error")) - //} - return nil - }) - - //RegisterParallelLoadFunc("平台红包数据", func() error { - // actRandCoinMgr.LoadPlatformData() - // return nil - //}) - - RegisterParallelLoadFunc("GMAC", func() error { - model.InitGMAC() - return nil - }) - - RegisterParallelLoadFunc("三方游戏配置", func() error { - model.InitGameConfig() - return nil - }) - - RegisterParallelLoadFunc("GameKVData", func() error { - model.InitGameKVData() - return nil - }) - - RegisterParallelLoadFunc("水池配置", func() error { - return model.GetAllCoinPoolSettingData() - }) - - RegisterParallelLoadFunc("三方平台热载数据设置", func() error { - f := func() { - webapi.ReqCgAddr = model.GameParamData.CgAddr - if plt, ok := webapi.ThridPlatformMgrSington.ThridPlatformMap.Load("XHJ平台"); ok { - //plt.(*webapi.XHJThridPlatform).IsNeedCheckQuota = model.GameParamData.FGCheckPlatformQuota - plt.(*webapi.XHJThridPlatform).ReqTimeOut = model.GameParamData.ThirdPltReqTimeout - } - } - f() - model.GameParamData.Observers = append(model.GameParamData.Observers, f) - return nil - }) - - core.RegisteHook(core.HOOK_BEFORE_START, func() error { - - //for _, v := range data { - // PlatformMgrSingleton.UpsertPlatform(v.Name, v.Isolated, v.GameStatesData) - //} - - //ps := []model.GamePlatformState{model.GamePlatformState{LogicId:130000001,Param:"",State:1},model.GamePlatformState{LogicId:150000001,Param:"",State:1}} - - //model.InsertPlatformGameConfig("360",true,ps) - - var err error - CacheMemory, err = cache.NewCache("memory", `{"interval":60}`) - if err != nil { - return err - } - - //etcd打开连接 - EtcdMgrSington.Init() - - //go func() { - // for { - // time.Sleep(time.Minute) - // EtcdMgrSington.Reset() - // } - //}() - - //rabbitmq打开链接 - RabbitMQPublisher = mq.NewRabbitMQPublisher(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}, common.CustomConfig.GetInt("RMQPublishBacklog")) - if RabbitMQPublisher != nil { - err = RabbitMQPublisher.Start() - if err != nil { - panic(err) - } - } - - RabbitMqConsumer = mq.NewRabbitMQConsumer(common.CustomConfig.GetString("RabbitMQURL"), rabbitmq.Exchange{Name: common.CustomConfig.GetString("RMQExchange"), Durable: true}) - if RabbitMqConsumer != nil { - RabbitMqConsumer.Start() - } - - //开始并行加载数据 - //改为串行加载,后台并发有点扛不住 - if len(ParalleLoadModules) > 0 { - tStart := time.Now() - logger.Logger.Infof("===[开始串行加载]===") - //wgParalleLoad.Add(paralleCnt) - for _, m := range ParalleLoadModules { - /*go*/ func(plm *ParalleLoadModule) { - ts := time.Now() - defer func() { - utils.DumpStackIfPanic(plm.name) - //wgParalleLoad.Done() - logger.Logger.Infof("[串行加载结束][%v] 耗时[%v]", plm.name, time.Now().Sub(ts)) - }() - logger.Logger.Infof("[开始串行加载][%v] ", plm.name) - - err := plm.f() - if err != nil { - logger.Logger.Warnf("[串行加载][%v][error:%v]", plm.name, err) - } - }(m) - } - //wgParalleLoad.Wait() - logger.Logger.Infof("===[串行加载结束,总耗时:%v]===", time.Now().Sub(tStart)) - } - return nil - }) - - core.RegisteHook(core.HOOK_AFTER_STOP, func() error { - //etcd关闭连接 - EtcdMgrSington.Shutdown() - //关闭rabbitmq连接 - if RabbitMQPublisher != nil { - RabbitMQPublisher.Stop() - } - - if RabbitMqConsumer != nil { - RabbitMqConsumer.Stop() - } - - //model.ShutdownRPClient() - return nil - }) - //core.RegisteHook(core.HOOK_BEFORE_START, func() error { - // ThirdPltGameMappingConfig.Init() - // return nil - //}) - //RegisterParallelLoadFunc("分层配置数据", func() error { - // //加载分层配置 - // LogicLevelMgrSington.LoadConfig() - // return nil - //}) -} diff --git a/worldsrv/logchannel.go b/worldsrv/logchannel.go index b4bcc50..d1853b2 100644 --- a/worldsrv/logchannel.go +++ b/worldsrv/logchannel.go @@ -39,12 +39,12 @@ func (c *LogChannel) WriteLog(log interface{}) { if cname == "" { cname = "_null_" } - RabbitMQPublisher.Send(cname, log) + mq.Send(cname, log) } // WriteMQData rabbitMQ消息 func (c *LogChannel) WriteMQData(data *model.RabbitMQData) { - RabbitMQPublisher.Send(data.MQName, data.Data) + mq.Send(data.MQName, data.Data) } func init() { diff --git a/worldsrv/main.go b/worldsrv/main.go index 098ddcb..8bdcc55 100644 --- a/worldsrv/main.go +++ b/worldsrv/main.go @@ -1,26 +1,65 @@ package main import ( - "math/rand" "time" - _ "mongo.games.com/game" - "mongo.games.com/game/common" + "github.com/astaxie/beego/cache" "mongo.games.com/goserver/core" _ "mongo.games.com/goserver/core/i18n" "mongo.games.com/goserver/core/module" "mongo.games.com/goserver/core/schedule" + + _ "mongo.games.com/game" + "mongo.games.com/game/common" + "mongo.games.com/game/etcd" + "mongo.games.com/game/model" + "mongo.games.com/game/mq" ) +var CacheMemory cache.Cache + func main() { - rand.Seed(time.Now().Unix()) - core.RegisterConfigEncryptor(common.ConfigFE) + // 自定义配置文件 + model.InitGameParam() + model.InitNormalParam() + model.InitGMAC() + model.InitGameConfig() + // package模块 defer core.ClosePackages() core.LoadPackages("config.json") + // core hook + core.RegisteHook(core.HOOK_BEFORE_START, func() error { + model.StartupRPClient(common.CustomConfig.GetString("MgoRpcCliNet"), common.CustomConfig.GetString("MgoRpcCliAddr"), time.Duration(common.CustomConfig.GetInt("MgoRpcCliReconnInterV"))*time.Second) + etcd.Start(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute) + mq.StartConsumer(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true) + mq.StartPublisher(common.CustomConfig.GetString("RabbitMQURL"), common.CustomConfig.GetString("RMQExchange"), true, common.CustomConfig.GetInt("RMQPublishBacklog")) + var err error + CacheMemory, err = cache.NewCache("memory", `{"interval":60}`) + if err != nil { + return err + } + + EtcdMgrSington.Init() + BlackListMgrSington.Init() + gameStateMgr.Init() + HorseRaceLampMgrSington.InitHorseRaceLamp() + model.InitGameKVData() + model.GetAllCoinPoolSettingData() + MsgMgrSington.InitMsg() + PlatformGameGroupMgrSington.LoadGameGroup() + return nil + }) + core.RegisteHook(core.HOOK_AFTER_STOP, func() error { + EtcdMgrSington.Close() + etcd.Close() + mq.StopPublisher() + mq.StopConsumer() + model.ShutdownRPClient() + return nil + }) //启动定时任务 schedule.StartTask() - //启动业务模块 waiter := module.Start() waiter.Wait("main()") diff --git a/worldsrv/mq_coinlog.go b/worldsrv/mq_coinlog.go index 9425191..e78259f 100644 --- a/worldsrv/mq_coinlog.go +++ b/worldsrv/mq_coinlog.go @@ -19,7 +19,6 @@ func init() { if msg != nil { defer func() { e.Ack() - recover() }() var log model.CoinLog @@ -48,7 +47,6 @@ func init() { if msg != nil { defer func() { e.Ack() - recover() }() var log model.BindNum diff --git a/worldsrv/msgmgr.go b/worldsrv/msgmgr.go index 90be8f9..518a969 100644 --- a/worldsrv/msgmgr.go +++ b/worldsrv/msgmgr.go @@ -52,8 +52,5 @@ func (mm *MsgMgr) GetSubscribeMsgs(platform string, ts int64) (msgs []*model.Mes } func init() { - RegisterParallelLoadFunc("平台邮件", func() error { - MsgMgrSington.InitMsg() - return nil - }) + } diff --git a/worldsrv/platformgamegroup.go b/worldsrv/platformgamegroup.go index ca35ad2..1f430f8 100644 --- a/worldsrv/platformgamegroup.go +++ b/worldsrv/platformgamegroup.go @@ -125,8 +125,5 @@ func (this *PlatformGameGroupMgr) OnGameGroupUpdate(oldCfg, newCfg *webapi_proto } func init() { - RegisterParallelLoadFunc("平台游戏组数据", func() error { - PlatformGameGroupMgrSington.LoadGameGroup() - return nil - }) + } diff --git a/worldsrv/player.go b/worldsrv/player.go index 957fbda..a92509b 100644 --- a/worldsrv/player.go +++ b/worldsrv/player.go @@ -365,10 +365,6 @@ func (this *Player) OnLogined() { this.OnlineLogLogin() this.SendToRepSrv(this.PlayerData) - - //七日活动 - ActSignMgrSington.OnPlayerLogin(this) - //红点检测 this.CheckShowRed() @@ -423,9 +419,6 @@ func (this *Player) OnRehold() { FriendMgrSington.ApplyList(this.Platform, this.SnId) FriendUnreadMgrSington.CheckSendFriendUnreadData(this.Platform, this.SnId) - //七日活动. - ActSignMgrSington.OnPlayerLogin(this) - this.CheckShowRed() this.SendJackPotInit() @@ -2212,8 +2205,6 @@ func (this *Player) OnDayTimer(login, continuous bool, t int) { this.ShopLastLookTime = make(map[int32]int64) // 福利活动更新 WelfareMgrSington.OnDayChanged(this) - //七日活动 - ActSignMgrSington.OnDayChanged(this) this.VipMatchTimes = 0 //VIP商城数据更新 this.UpdateVipShopData()