182 lines
5.3 KiB
Go
182 lines
5.3 KiB
Go
package syn
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
|
||
"go.mongodb.org/mongo-driver/bson"
|
||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||
"go.mongodb.org/mongo-driver/mongo"
|
||
"go.mongodb.org/mongo-driver/mongo/options"
|
||
"gorm.io/gorm"
|
||
|
||
mongomodel "mongo.games.com/game/statistics/modelmongo"
|
||
mysqlmodel "mongo.games.com/game/statistics/modelmysql"
|
||
"mongo.games.com/goserver/core/logger"
|
||
mymongo "mongo.games.com/goserver/core/mongox"
|
||
mymysql "mongo.games.com/goserver/core/mysqlx"
|
||
)
|
||
|
||
/*
Login-log synchronization pages through MongoDB by _id in ascending order, n documents per run.
*/
|
||
|
||
// LogLogin 同步登录日志
|
||
func LogLogin(platform string, batchSize int) ([]*mysqlmodel.LogLogin, error) {
|
||
db, err := mymysql.GetDatabase(platform)
|
||
if err != nil {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to get database: %v", err)
|
||
return nil, err
|
||
}
|
||
loginMID := &mysqlmodel.LogLoginMid{ID: 1}
|
||
var n int64
|
||
err = db.Model(&mysqlmodel.LogLoginMid{}).Find(loginMID).Count(&n).Error
|
||
if err != nil {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to get log_login_mid: %v", err)
|
||
return nil, err
|
||
}
|
||
if n == 0 {
|
||
if err = db.Create(loginMID).Error; err != nil {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to create log_login_mid: %v", err)
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
logger.Logger.Tracef("start SyncLogLogin log_login _id:%v", loginMID.MID)
|
||
|
||
_id, _ := primitive.ObjectIDFromHex(loginMID.MID)
|
||
filter := bson.M{"_id": bson.M{"$gt": _id}}
|
||
c, err := mymongo.GetLogCollection(platform, mongomodel.LogLogin)
|
||
if err != nil {
|
||
logger.Logger.Errorf("get collection %s error %v", mongomodel.LogLogin, err)
|
||
return nil, err
|
||
}
|
||
l, err := c.Find(context.TODO(), filter,
|
||
options.Find().SetSort(bson.D{primitive.E{Key: "_id", Value: 1}}), options.Find().SetLimit(int64(batchSize)))
|
||
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
|
||
logger.Logger.Errorf("mongo: SyncLogLogin failed to get log_login: %v", err)
|
||
return nil, err
|
||
}
|
||
|
||
var logs []*mongomodel.LoginLog
|
||
if err = l.All(context.TODO(), &logs); err != nil {
|
||
l.Close(context.TODO())
|
||
if errors.Is(err, mongo.ErrNoDocuments) {
|
||
return nil, nil
|
||
}
|
||
|
||
logger.Logger.Errorf("mongo: SyncLogLogin failed to get loginlog: %v", err)
|
||
return nil, err
|
||
}
|
||
l.Close(context.TODO())
|
||
|
||
var ls []*mysqlmodel.LogLogin
|
||
for _, v := range logs {
|
||
logger.Logger.Tracef("mongo SyncLogLogin log_login: %+v", *v)
|
||
var e *mysqlmodel.LogLogin
|
||
switch v.LogType {
|
||
case mongomodel.LogTypeLogin, mongomodel.LogTypeRehold:
|
||
onlineType := mysqlmodel.LogTypeLogin
|
||
if v.LogType == mongomodel.LogTypeRehold {
|
||
onlineType = mysqlmodel.LogTypeRehold
|
||
}
|
||
|
||
// 创建数据
|
||
var n int64
|
||
if err = db.Model(&mysqlmodel.LogLogin{}).Where("snid = ? AND online_type = ? AND online_time = ?",
|
||
v.SnId, onlineType, v.Time).Count(&n).Error; err != nil {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to get log_login count: %v", err)
|
||
return ls, err
|
||
}
|
||
|
||
if n == 0 {
|
||
e = &mysqlmodel.LogLogin{
|
||
Snid: int(v.SnId),
|
||
OnlineType: onlineType,
|
||
//OnlineTs: int(v.Ts),
|
||
OnlineTime: v.Time,
|
||
OfflineType: 0,
|
||
//OfflineTs: 0,
|
||
OfflineTime: v.Time,
|
||
DeviceName: v.DeviceName,
|
||
AppVersion: v.AppVersion,
|
||
BuildVersion: v.BuildVersion,
|
||
AppChannel: v.AppChannel,
|
||
ChannelId: v.ChannelId,
|
||
}
|
||
if err = db.Create(e).Error; err != nil {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to create log_login: %v", err)
|
||
return ls, err
|
||
}
|
||
} else {
|
||
continue
|
||
}
|
||
|
||
case mongomodel.LogTypeLogout, mongomodel.LogTypeDrop:
|
||
// 修改数据
|
||
e = &mysqlmodel.LogLogin{}
|
||
err = db.Model(&mysqlmodel.LogLogin{}).Where("snid = ?", v.SnId).Order("online_time DESC").First(e).Error
|
||
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to find log_login: %v", err)
|
||
return ls, err
|
||
}
|
||
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
logger.Logger.Warnf("mysql: SyncLogLogin not found log_login: %v", v)
|
||
continue
|
||
}
|
||
|
||
if e.OfflineType != 0 {
|
||
logger.Logger.Tracef("mysql: SyncLogLogin already offline: %+v", *e)
|
||
continue
|
||
}
|
||
|
||
e.OfflineType = mysqlmodel.LogTypeOffline
|
||
//e.OfflineTs = int(v.Ts)
|
||
e.OfflineTime = v.Time
|
||
if err = db.Model(e).Select("OfflineType", "OfflineTime").Updates(e).Error; err != nil {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to update log_login: %v", err)
|
||
return ls, err
|
||
}
|
||
default:
|
||
continue
|
||
}
|
||
|
||
if e != nil {
|
||
ls = append(ls, e)
|
||
}
|
||
}
|
||
|
||
if len(logs) > 0 {
|
||
err = db.Transaction(func(tx *gorm.DB) error {
|
||
loginMID.MID = logs[len(logs)-1].LogId.Hex()
|
||
if err = tx.Model(loginMID).Updates(loginMID).Error; err != nil {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to update log_login_mid: %v", err)
|
||
return err
|
||
}
|
||
|
||
for _, v := range ls {
|
||
if err = tx.First(&mysqlmodel.UserID{}, "snid = ?", v.Snid).Error; err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to find user_id: %v", err)
|
||
return err
|
||
}
|
||
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
if err = tx.Create(&mysqlmodel.UserID{Snid: v.Snid}).Error; err != nil {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to create user_id: %v", err)
|
||
return err
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
logger.Logger.Errorf("mysql: SyncLogLogin failed to transaction: %v", err)
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
return ls, nil
|
||
}
|