game_sync/statistics/main.go

240 lines
5.4 KiB
Go

package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"time"
"github.com/spf13/viper"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/mongo"
"mongo.games.com/game/mysql"
mongomodel "mongo.games.com/game/statistics/mongo/model"
mysqlmodel "mongo.games.com/game/statistics/mysql/model"
"mongo.games.com/game/statistics/static"
"mongo.games.com/game/statistics/syn"
"mongo.games.com/game/statistics/tools"
"mongo.games.com/game/util/viperx"
)
var VP *viper.Viper
//func init() {
// // 日志
// *log.StandardLogger() = *log.New()
//
// // 日志等级
// level, err := log.ParseLevel(VP.GetString("log.level"))
// if err != nil {
// panic(err)
// }
// log.SetLevel(level)
//
// // 打印文件路径及行号
// log.AddHook(tools.NewFileLineHook(log.ErrorLevel))
//
// // 日志切分
// for _, v := range VP.Get("log.rotate").([]interface{}) {
// conf := &tools.RotateLogConfig{}
// b, err := json.Marshal(v)
// if err != nil {
// panic(err)
// }
// if err = json.Unmarshal(b, conf); err != nil {
// panic(err)
// }
// log.AddHook(tools.NewRotateLogHook(conf))
// }
//}
// DoTick 定时执行
func DoTick(ctx context.Context, wg *sync.WaitGroup, duration time.Duration, fu func(ctx context.Context)) {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(duration):
tools.RecoverPanicFunc() // 捕获异常
fu(ctx)
}
}
}()
}
func DoTickPlatform(ctx context.Context, wg *sync.WaitGroup, duration time.Duration, batchSize int,
fu func(ctx context.Context, platform string, batchSize int)) {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(duration):
tools.RecoverPanicFunc() // 捕获异常
wg := new(sync.WaitGroup)
for _, v := range VP.GetStringSlice("platforms") {
platform := v
wg.Add(1)
go func() {
defer wg.Done()
fu(ctx, platform, batchSize)
}()
}
wg.Wait()
}
}
}()
}
func main() {
VP = viperx.GetViper("config", "yaml")
// mongo
vp := viperx.GetViper("mongo", "yaml")
// mongo初始化
conf := &mongo.Config{}
err := vp.Unmarshal(conf)
if err != nil {
panic(fmt.Errorf("mongo config error: %v", err))
}
mongo.Init(conf)
defer mongo.Close()
// mysql
vp = viperx.GetViper("mysql", "yaml")
myConf := &mysql.Config{}
err = vp.Unmarshal(myConf)
if err != nil {
panic(fmt.Errorf("mysql config error: %v", err))
}
mysql.Init(myConf)
defer mysql.Close()
mysql.SetAutoMigrateTables(mysqlmodel.Tables)
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
DoTick(ctx, wg, time.Duration(VP.GetInt64("update_second"))*time.Second, SyncSnId)
DoTick(ctx, wg, time.Duration(VP.GetInt64("update_second_snid"))*time.Second, func(ctx context.Context) {
wg := new(sync.WaitGroup)
for _, v := range VP.GetStringSlice("platforms") {
platform := v
wg.Add(1)
go func() {
defer wg.Done()
Static(platform)
}()
}
wg.Wait()
})
DoTick(ctx, wg, time.Duration(VP.GetInt64("update_second_invite"))*time.Second, SyncInvite)
DoTickPlatform(ctx, wg, time.Duration(VP.GetInt64("update_second_item"))*time.Second, VP.GetInt("update_item_num"),
func(ctx context.Context, platform string, batchSize int) {
err := syn.ItemGainDone(&syn.Data[mongomodel.ItemLog]{
Platform: platform,
BatchSize: batchSize,
})
if err != nil {
logger.Logger.Errorf("SyncItem error:%v", err)
}
})
logger.Logger.Info("start")
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
sig := <-c
logger.Logger.Infof("closing down (signal: %v)", sig)
// release
cancel()
wg.Wait()
logger.Logger.Info("closed")
}
// SyncSnId 同步注册和登录日志
func SyncSnId(ctx context.Context) {
wg := new(sync.WaitGroup)
for _, v := range VP.GetStringSlice("platforms") {
platform := v
wg.Add(1)
go func() {
defer wg.Done()
_, err := syn.UserAccount(platform, VP.GetInt("update_account_num"))
if err != nil {
logger.Logger.Errorf("SyncUserAccount error: %v", err)
return
}
_, err = syn.LogLogin(platform, VP.GetInt("update_login_num"))
if err != nil {
logger.Logger.Errorf("SyncLogLogin error: %v", err)
return
}
}()
}
wg.Wait()
}
// Static 玩家id触发数据统计
func Static(platform string) {
// 查询需要更新的玩家id
var ids []*mysqlmodel.UserID
db, err := mysql.GetDatabase(platform)
if err != nil {
logger.Logger.Errorf("GetDatabase error: %v", err)
return
}
if err := db.Limit(VP.GetInt("update_snid_num")).Find(&ids).Error; err != nil {
logger.Logger.Warnf("Get UserID error: %v", err)
return
}
if len(ids) == 0 {
logger.Logger.Tracef("Static: no need to update")
return
}
// 统计玩家跳出记录
if err := static.UserLogin(platform, ids); err != nil {
logger.Logger.Errorf("StaticUserLogin error: %v", err)
return
}
// 删除更新过的玩家id
if err := db.Delete(ids).Error; err != nil {
logger.Logger.Errorf("Delete error: %v", err)
return
}
}
// SyncInvite 同步邀请数据
func SyncInvite(ctx context.Context) {
wg := new(sync.WaitGroup)
for _, v := range VP.GetStringSlice("platforms") {
platform := v
wg.Add(1)
go func() {
defer wg.Done()
err := syn.SyncInviteScore(platform, VP.GetInt("update_invite_num"))
if err != nil {
logger.Logger.Errorf("SyncInviteScore error: %v", err)
return
}
}()
}
wg.Wait()
}