package main import ( "context" "fmt" "os" "os/signal" "sync" "time" "github.com/spf13/viper" "mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/mongox" "mongo.games.com/goserver/core/mysqlx" "mongo.games.com/goserver/core/utils" "mongo.games.com/goserver/core/viperx" mongomodel "mongo.games.com/game/statistics/modelmongo" mysqlmodel "mongo.games.com/game/statistics/modelmysql" "mongo.games.com/game/statistics/static" "mongo.games.com/game/statistics/syn" ) var VP *viper.Viper // DoTick 定时执行 func DoTick(ctx context.Context, wg *sync.WaitGroup, duration time.Duration, fu func(ctx context.Context)) { wg.Add(1) go func() { defer wg.Done() for { select { case <-ctx.Done(): return case <-time.After(duration): utils.RecoverPanicFunc() // 捕获异常 fu(ctx) } } }() } // DoTickPlatform 定时执行,根据platform执行 func DoTickPlatform(ctx context.Context, wg *sync.WaitGroup, duration time.Duration, batchSize int, fu func(ctx context.Context, platform string, batchSize int)) { wg.Add(1) go func() { defer wg.Done() for { select { case <-ctx.Done(): return case <-time.After(duration): utils.RecoverPanicFunc() // 捕获异常 wg := new(sync.WaitGroup) for _, v := range VP.GetStringSlice("platforms") { platform := v wg.Add(1) go func() { defer wg.Done() fu(ctx, platform, batchSize) }() } wg.Wait() } } }() } func main() { VP = viperx.GetViper("config.yaml") // mongo vp := viperx.GetViper("mongo.yaml") // mongo初始化 conf := &mongox.Config{} err := vp.Unmarshal(conf) if err != nil { panic(fmt.Errorf("mongo config error: %v", err)) } mongox.Init(conf) defer mongox.Close() // mysql vp = viperx.GetViper("mysql.yaml") myConf := &mysqlx.Config{} err = vp.Unmarshal(myConf) if err != nil { panic(fmt.Errorf("mysql config error: %v", err)) } mysqlx.Init(myConf) defer mysqlx.Close() mysqlx.SetAutoMigrateTables(mysqlmodel.Tables) wg := &sync.WaitGroup{} ctx, cancel := context.WithCancel(context.Background()) DoTick(ctx, wg, time.Duration(VP.GetInt64("update_second"))*time.Second, SyncSnId) DoTick(ctx, wg, time.Duration(VP.GetInt64("update_second_snid"))*time.Second, func(ctx context.Context) { wg := new(sync.WaitGroup) for _, v := range VP.GetStringSlice("platforms") { platform := v wg.Add(1) go func() { defer wg.Done() Static(platform) }() } wg.Wait() }) DoTick(ctx, wg, time.Duration(VP.GetInt64("update_second_invite"))*time.Second, SyncInvite) DoTickPlatform(ctx, wg, time.Duration(VP.GetInt64("update_second_item"))*time.Second, VP.GetInt("update_item_num"), func(ctx context.Context, platform string, batchSize int) { err := syn.ItemGainDone(&syn.Data[mongomodel.ItemLog]{ Platform: platform, BatchSize: batchSize, }) if err != nil { logger.Logger.Errorf("SyncItem error:%v", err) } }) logger.Logger.Info("start") c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, os.Kill) sig := <-c logger.Logger.Infof("closing down (signal: %v)", sig) // release cancel() wg.Wait() logger.Logger.Info("closed") } // SyncSnId 同步注册和登录日志 func SyncSnId(ctx context.Context) { wg := new(sync.WaitGroup) for _, v := range VP.GetStringSlice("platforms") { platform := v wg.Add(1) go func() { defer wg.Done() _, err := syn.UserAccount(platform, VP.GetInt("update_account_num")) if err != nil { logger.Logger.Errorf("SyncUserAccount error: %v", err) return } _, err = syn.LogLogin(platform, VP.GetInt("update_login_num")) if err != nil { logger.Logger.Errorf("SyncLogLogin error: %v", err) return } }() } wg.Wait() } // Static 玩家id触发数据统计 func Static(platform string) { // 查询需要更新的玩家id var ids []*mysqlmodel.UserID db, err := mysqlx.GetDatabase(platform) if err != nil { logger.Logger.Errorf("GetDatabase error: %v", err) return } if err := db.Limit(VP.GetInt("update_snid_num")).Find(&ids).Error; err != nil { logger.Logger.Warnf("Get UserID error: %v", err) return } if len(ids) == 0 { logger.Logger.Tracef("Static: no need to update") return } // 统计玩家跳出记录 if err := static.UserLogin(platform, ids); err != nil { logger.Logger.Errorf("StaticUserLogin error: %v", err) return } // 删除更新过的玩家id if err := db.Delete(ids).Error; err != nil { logger.Logger.Errorf("Delete error: %v", err) return } } // SyncInvite 同步邀请数据 func SyncInvite(ctx context.Context) { wg := new(sync.WaitGroup) for _, v := range VP.GetStringSlice("platforms") { platform := v wg.Add(1) go func() { defer wg.Done() err := syn.SyncInviteScore(platform, VP.GetInt("update_invite_num")) if err != nil { logger.Logger.Errorf("SyncInviteScore error: %v", err) return } }() } wg.Wait() }