103 lines
3.0 KiB
Go
103 lines
3.0 KiB
Go
package syn
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
|
||
"go.mongodb.org/mongo-driver/bson"
|
||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||
"go.mongodb.org/mongo-driver/mongo"
|
||
"go.mongodb.org/mongo-driver/mongo/options"
|
||
"gorm.io/gorm"
|
||
"mongo.games.com/goserver/core/logger"
|
||
|
||
mysqlmodel "mongo.games.com/game/statistics/modelmysql"
|
||
mymongo "mongo.games.com/goserver/core/mongox"
|
||
mymysql "mongo.games.com/goserver/core/mysqlx"
|
||
)
|
||
|
||
// Data 数据同步方法
|
||
// T mongodb数据表结构
|
||
// F mongodb中的每条数据的处理操作,自行实现
|
||
type Data[T any] struct {
|
||
Platform string // 平台
|
||
MidType int // 数据类型 例如 modelmysql.MidTypeItem
|
||
Database string // 库名称
|
||
CollectionName string // 集合名称
|
||
BatchSize int // 一次读取数量
|
||
// F 自定义数据处理方法
|
||
// data: mongodb中的一条日志
|
||
F func(data *T, db *gorm.DB) (string, error)
|
||
}
|
||
|
||
// CommonDone 数据获取方式,根据mongodb集合主键按时间顺序批量读取
|
||
func (d *Data[T]) CommonDone() error {
|
||
db, err := mymysql.GetDatabase(d.Platform)
|
||
if err != nil {
|
||
logger.Logger.Errorf("mysql: failed to get database: %v", err)
|
||
return err
|
||
}
|
||
loginMID := &mysqlmodel.LogMid{Tp: d.MidType}
|
||
var n int64
|
||
err = db.Model(&mysqlmodel.LogMid{}).Find(loginMID).Count(&n).Error
|
||
if err != nil {
|
||
logger.Logger.Errorf("mysql: failed to get log_mid: %v", err)
|
||
return err
|
||
}
|
||
if n == 0 {
|
||
if err = db.Create(loginMID).Error; err != nil {
|
||
logger.Logger.Errorf("mysql: failed to create log_mid: %v", err)
|
||
return err
|
||
}
|
||
}
|
||
|
||
logger.Logger.Tracef("start log_mid tp:%v _id:%v", loginMID.Tp, loginMID.MID)
|
||
|
||
_id, _ := primitive.ObjectIDFromHex(loginMID.MID)
|
||
filter := bson.M{"_id": bson.M{"$gt": _id}}
|
||
c, err := mymongo.GetCollection(d.Platform, mymongo.DatabaseType(d.Database), d.CollectionName)
|
||
if err != nil {
|
||
logger.Logger.Errorf("get collection %s %s error %v", d.Database, d.CollectionName, err)
|
||
return err
|
||
}
|
||
l, err := c.Find(context.TODO(), filter,
|
||
options.Find().SetSort(bson.D{primitive.E{Key: "_id", Value: 1}}), options.Find().SetLimit(int64(d.BatchSize)))
|
||
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
|
||
logger.Logger.Errorf("mongo: failed to get %v: %v", d.CollectionName, err)
|
||
return err
|
||
}
|
||
|
||
var logs []*T
|
||
if err = l.All(context.TODO(), &logs); err != nil {
|
||
l.Close(context.TODO())
|
||
if errors.Is(err, mongo.ErrNoDocuments) {
|
||
return nil
|
||
}
|
||
|
||
logger.Logger.Errorf("mongo: failed to get %v: %v", d.CollectionName, err)
|
||
return err
|
||
}
|
||
l.Close(context.TODO())
|
||
if len(logs) == 0 {
|
||
logger.Logger.Infof("sync %v finished", d.CollectionName)
|
||
return nil
|
||
}
|
||
|
||
err = db.Transaction(func(tx *gorm.DB) error {
|
||
for _, v := range logs {
|
||
loginMID.MID, err = d.F(v, tx)
|
||
if err != nil {
|
||
logger.Logger.Errorf("Process %v error:%v", d.CollectionName, err)
|
||
return err
|
||
}
|
||
if err = tx.Model(loginMID).Updates(loginMID).Error; err != nil {
|
||
logger.Logger.Errorf("mysql: failed to update %v log_mid: %v", d.CollectionName, err)
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
})
|
||
|
||
return err
|
||
}
|