package syn import ( "context" "errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "gorm.io/gorm" "mongo.games.com/goserver/core/logger" mysqlmodel "mongo.games.com/game/statistics/modelmysql" mymongo "mongo.games.com/goserver/core/mongox" mymysql "mongo.games.com/goserver/core/mysqlx" ) // Data 数据同步方法 // T mongodb数据表结构 // F mongodb中的每条数据的处理操作,自行实现 type Data[T any] struct { Platform string // 平台 MidType int // 数据类型 例如 modelmysql.MidTypeItem Database string // 库名称 CollectionName string // 集合名称 BatchSize int // 一次读取数量 // F 自定义数据处理方法 // data: mongodb中的一条日志 F func(data *T, db *gorm.DB) (string, error) } // CommonDone 数据获取方式,根据mongodb集合主键按时间顺序批量读取 func (d *Data[T]) CommonDone() error { db, err := mymysql.GetDatabase(d.Platform) if err != nil { logger.Logger.Errorf("mysql: failed to get database: %v", err) return err } loginMID := &mysqlmodel.LogMid{Tp: d.MidType} var n int64 err = db.Model(&mysqlmodel.LogMid{}).Find(loginMID).Count(&n).Error if err != nil { logger.Logger.Errorf("mysql: failed to get log_mid: %v", err) return err } if n == 0 { if err = db.Create(loginMID).Error; err != nil { logger.Logger.Errorf("mysql: failed to create log_mid: %v", err) return err } } logger.Logger.Tracef("start log_mid tp:%v _id:%v", loginMID.Tp, loginMID.MID) _id, _ := primitive.ObjectIDFromHex(loginMID.MID) filter := bson.M{"_id": bson.M{"$gt": _id}} c, err := mymongo.GetCollection(d.Platform, mymongo.DatabaseType(d.Database), d.CollectionName) if err != nil { logger.Logger.Errorf("get collection %s %s error %v", d.Database, d.CollectionName, err) return err } l, err := c.Find(context.TODO(), filter, options.Find().SetSort(bson.D{primitive.E{Key: "_id", Value: 1}}), options.Find().SetLimit(int64(d.BatchSize))) if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { logger.Logger.Errorf("mongo: failed to get %v: %v", d.CollectionName, err) return err } var logs []*T if err = l.All(context.TODO(), &logs); err != nil { l.Close(context.TODO()) if errors.Is(err, mongo.ErrNoDocuments) { return nil } logger.Logger.Errorf("mongo: failed to get %v: %v", d.CollectionName, err) return err } l.Close(context.TODO()) if len(logs) == 0 { logger.Logger.Infof("sync %v finished", d.CollectionName) return nil } err = db.Transaction(func(tx *gorm.DB) error { for _, v := range logs { loginMID.MID, err = d.F(v, tx) if err != nil { logger.Logger.Errorf("Process %v error:%v", d.CollectionName, err) return err } if err = tx.Model(loginMID).Updates(loginMID).Error; err != nil { logger.Logger.Errorf("mysql: failed to update %v log_mid: %v", d.CollectionName, err) return err } } return nil }) return err }