feat: add batch replicate from remote MinIO to local cluster (#16922)

Poorna 2023-03-31 10:48:36 -07:00 committed by GitHub
parent d90d0c8931
commit 407c9ddcbf
1 changed file with 422 additions and 17 deletions
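
For context, a remote-to-local job of this kind is described by a batch job YAML in which the source carries the remote MinIO endpoint and credentials, while the target names a bucket on the local cluster and leaves endpoint and credentials unset. That is the shape RemoteToLocal() and the Validate() changes below key off. The sketch is illustrative only: the endpoint, bucket names, and keys are placeholders, and exact field names should be checked against the yaml tags in this file.

    replicate:
      apiVersion: v1
      source:
        type: minio
        bucket: mybucket
        prefix: myprefix
        endpoint: https://remote-minio:9000
        credentials:
          accessKey: remote-access-key
          secretKey: remote-secret-key
      target:
        type: minio
        bucket: mybucket
      flags:
        retry:
          attempts: 3
          delay: 250ms

Assuming the usual mc batch workflow, such a job would then be submitted with "mc batch start ALIAS ./job.yaml" and monitored with "mc batch status ALIAS JOB-ID".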


@@ -28,6 +28,7 @@ import (
 	"math/rand"
 	"net/http"
 	"net/url"
+	"path"
 	"runtime"
 	"strconv"
 	"strings"
@@ -37,10 +38,14 @@ import (
 	"github.com/dustin/go-humanize"
 	"github.com/lithammer/shortuuid/v4"
 	"github.com/minio/madmin-go/v2"
+	"github.com/minio/minio-go/v7"
 	miniogo "github.com/minio/minio-go/v7"
 	"github.com/minio/minio-go/v7/pkg/credentials"
+	"github.com/minio/minio-go/v7/pkg/encrypt"
 	"github.com/minio/minio-go/v7/pkg/tags"
 	"github.com/minio/minio/internal/auth"
+	"github.com/minio/minio/internal/crypto"
+	"github.com/minio/minio/internal/hash"
 	xhttp "github.com/minio/minio/internal/http"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/minio/internal/workers"
@@ -189,6 +194,11 @@ type BatchJobReplicateCredentials struct {
 	SessionToken string `xml:"SessionToken" json:"sessionToken,omitempty" yaml:"sessionToken"`
 }
 
+// Empty indicates if credentials are not set
+func (c BatchJobReplicateCredentials) Empty() bool {
+	return c.AccessKey == "" && c.SecretKey == "" && c.SessionToken == ""
+}
+
 // Validate validates if credentials are valid
 func (c BatchJobReplicateCredentials) Validate() error {
 	if !auth.IsAccessKeyValid(c.AccessKey) || !auth.IsSecretKeyValid(c.SecretKey) {
@@ -227,6 +237,11 @@ type BatchJobReplicateV1 struct {
 	clnt *miniogo.Core `msg:"-"`
 }
 
+// RemoteToLocal returns true if source is remote and target is local
+func (r BatchJobReplicateV1) RemoteToLocal() bool {
+	return !r.Source.Creds.Empty()
+}
+
 // BatchJobRequest this is an internal data structure not for external consumption.
 type BatchJobRequest struct {
 	ID string `yaml:"-" json:"name"`
@@ -270,10 +285,360 @@ func (r BatchJobReplicateV1) Notify(ctx context.Context, body io.Reader) error {
 }
 
-// ReplicateFromSource - this is not implemented yet where source is 'remote' and target is local.
-func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api ObjectLayer, c *miniogo.Core, srcObject string) error {
+// ReplicateFromSource - replicates an object from a 'remote' source to the local cluster.
+func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api ObjectLayer, core *minio.Core, srcObjInfo ObjectInfo, retry bool) error {
+	srcBucket := r.Source.Bucket
+	tgtBucket := r.Target.Bucket
+	srcObject := srcObjInfo.Name
+	tgtObject := srcObjInfo.Name
+	if r.Target.Prefix != "" {
+		tgtObject = path.Join(r.Target.Prefix, srcObjInfo.Name)
+	}
+
+	versioned := globalBucketVersioningSys.PrefixEnabled(tgtBucket, tgtObject)
+	versionSuspended := globalBucketVersioningSys.PrefixSuspended(tgtBucket, tgtObject)
+	if srcObjInfo.DeleteMarker {
+		_, err := api.DeleteObject(ctx, tgtBucket, tgtObject, ObjectOptions{
+			VersionID:          srcObjInfo.VersionID,
+			VersionSuspended:   versionSuspended,
+			Versioned:          versioned,
+			MTime:              srcObjInfo.ModTime,
+			DeleteMarker:       srcObjInfo.DeleteMarker,
+			ReplicationRequest: true,
+		})
+		return err
+	}
+
+	opts := ObjectOptions{
+		VersionID:        srcObjInfo.VersionID,
+		Versioned:        versioned,
+		VersionSuspended: versionSuspended,
+		MTime:            srcObjInfo.ModTime,
+		PreserveETag:     srcObjInfo.ETag,
+		UserDefined:      srcObjInfo.UserDefined,
+	}
+	if crypto.S3.IsEncrypted(srcObjInfo.UserDefined) {
+		opts.ServerSideEncryption = encrypt.NewSSE()
+	}
+
+	slc := strings.Split(srcObjInfo.ETag, "-")
+	if len(slc) == 2 {
+		partsCount, err := strconv.Atoi(slc[1])
+		if err != nil {
+			return err
+		}
+		return r.copyWithMultipartfromSource(ctx, api, core, srcObjInfo, opts, partsCount)
+	}
+
+	gopts := minio.GetObjectOptions{
+		VersionID: srcObjInfo.VersionID,
+	}
+	if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil {
+		return err
+	}
+	rd, objInfo, _, err := core.GetObject(ctx, srcBucket, srcObject, gopts)
+	if err != nil {
+		return err
+	}
+	defer rd.Close()
+
+	hr, err := hash.NewReader(rd, objInfo.Size, "", "", objInfo.Size)
+	if err != nil {
+		return err
+	}
+	pReader := NewPutObjReader(hr)
+	_, err = api.PutObject(ctx, tgtBucket, tgtObject, pReader, opts)
+	return err
+}
+
+func (r *BatchJobReplicateV1) copyWithMultipartfromSource(ctx context.Context, api ObjectLayer, c *minio.Core, srcObjInfo ObjectInfo, opts ObjectOptions, partsCount int) (err error) {
+	srcBucket := r.Source.Bucket
+	tgtBucket := r.Target.Bucket
+	srcObject := srcObjInfo.Name
+	tgtObject := srcObjInfo.Name
+	if r.Target.Prefix != "" {
+		tgtObject = path.Join(r.Target.Prefix, srcObjInfo.Name)
+	}
+
+	var uploadedParts []CompletePart
+	res, err := api.NewMultipartUpload(context.Background(), tgtBucket, tgtObject, opts)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err != nil {
+			// block and abort remote upload upon failure.
+			attempts := 1
+			for attempts <= 3 {
+				aerr := api.AbortMultipartUpload(ctx, tgtBucket, tgtObject, res.UploadID, ObjectOptions{})
+				if aerr == nil {
+					return
+				}
+				logger.LogIf(ctx,
+					fmt.Errorf("trying %s: Unable to cleanup failed multipart replication %s on remote %s/%s: %w - this may consume space on remote cluster",
+						humanize.Ordinal(attempts), res.UploadID, tgtBucket, tgtObject, aerr))
+				attempts++
+				time.Sleep(time.Second)
+			}
+		}
+	}()
+
+	var (
+		hr    *hash.Reader
+		pInfo PartInfo
+	)
+
+	for i := 0; i < partsCount; i++ {
+		gopts := minio.GetObjectOptions{
+			VersionID:  srcObjInfo.VersionID,
+			PartNumber: i + 1,
+		}
+		if err := gopts.SetMatchETag(srcObjInfo.ETag); err != nil {
+			return err
+		}
+		rd, objInfo, _, err := c.GetObject(ctx, srcBucket, srcObject, gopts)
+		if err != nil {
+			return err
+		}
+		defer rd.Close()
+
+		hr, err = hash.NewReader(rd, objInfo.Size, "", "", objInfo.Size)
+		if err != nil {
+			return err
+		}
+		pReader := NewPutObjReader(hr)
+		opts.PreserveETag = ""
+		pInfo, err = api.PutObjectPart(ctx, tgtBucket, tgtObject, res.UploadID, i+1, pReader, opts)
+		if err != nil {
+			return err
+		}
+		if pInfo.Size != objInfo.Size {
+			return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, objInfo.Size)
+		}
+		uploadedParts = append(uploadedParts, CompletePart{
+			PartNumber: pInfo.PartNumber,
+			ETag:       pInfo.ETag,
+		})
+	}
+	_, err = api.CompleteMultipartUpload(ctx, tgtBucket, tgtObject, res.UploadID, uploadedParts, opts)
+	return err
+}
+
+// StartFromSource starts the batch replication job from remote source, resumes if there was a pending job via "job.ID"
+func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
+	ri := &batchJobInfo{
+		JobID:     job.ID,
+		JobType:   string(job.Type()),
+		StartTime: job.Started,
+	}
+	if err := ri.load(ctx, api, job); err != nil {
+		return err
+	}
+	globalBatchJobsMetrics.save(job.ID, ri)
+
+	delay := job.Replicate.Flags.Retry.Delay
+	if delay == 0 {
+		delay = batchReplJobDefaultRetryDelay
+	}
+	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+	skip := func(oi ObjectInfo) (ok bool) {
+		if r.Flags.Filter.OlderThan > 0 && time.Since(oi.ModTime) < r.Flags.Filter.OlderThan {
+			// skip all objects that are newer than specified older duration
+			return true
+		}
+		if r.Flags.Filter.NewerThan > 0 && time.Since(oi.ModTime) >= r.Flags.Filter.NewerThan {
+			// skip all objects that are older than specified newer duration
+			return true
+		}
+		if !r.Flags.Filter.CreatedAfter.IsZero() && r.Flags.Filter.CreatedAfter.Before(oi.ModTime) {
+			// skip all objects that are created before the specified time.
+			return true
+		}
+		if !r.Flags.Filter.CreatedBefore.IsZero() && r.Flags.Filter.CreatedBefore.After(oi.ModTime) {
+			// skip all objects that are created after the specified time.
+			return true
+		}
+		if len(r.Flags.Filter.Tags) > 0 {
+			// Only parse object tags if tags filter is specified.
+			tagMap := map[string]string{}
+			tagStr := oi.UserTags
+			if len(tagStr) != 0 {
+				t, err := tags.ParseObjectTags(tagStr)
+				if err != nil {
+					return false
+				}
+				tagMap = t.ToMap()
+			}
+			for _, kv := range r.Flags.Filter.Tags {
+				for t, v := range tagMap {
+					if kv.Match(BatchJobReplicateKV{Key: t, Value: v}) {
+						return true
+					}
+				}
+			}
+			// None of the provided tags filter match skip the object
+			return false
+		}
+		if len(r.Flags.Filter.Metadata) > 0 {
+			for _, kv := range r.Flags.Filter.Metadata {
+				for k, v := range oi.UserDefined {
+					if !strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") && !isStandardHeader(k) {
+						continue
+					}
+					// We only need to match x-amz-meta or standardHeaders
+					if kv.Match(BatchJobReplicateKV{Key: k, Value: v}) {
+						return true
+					}
+				}
+			}
+			// None of the provided metadata filters match skip the object.
+			return false
+		}
+		return false
+	}
+
+	u, err := url.Parse(r.Source.Endpoint)
+	if err != nil {
+		return err
+	}
+
+	cred := r.Source.Creds
+
+	c, err := miniogo.New(u.Host, &miniogo.Options{
+		Creds:     credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
+		Secure:    u.Scheme == "https",
+		Transport: getRemoteInstanceTransport,
+	})
+	if err != nil {
+		return err
+	}
+
+	c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
+	core := &minio.Core{Client: c}
+
+	workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2)))
+	if err != nil {
+		return err
+	}
+
+	wk, err := workers.New(workerSize)
+	if err != nil {
+		// invalid worker size.
+		return err
+	}
+
+	retryAttempts := ri.RetryAttempts
+	retry := false
+	for attempts := 1; attempts <= retryAttempts; attempts++ {
+		attempts := attempts
+
+		ctx, cancel := context.WithCancel(ctx)
+
+		objInfoCh := c.ListObjects(ctx, r.Source.Bucket, minio.ListObjectsOptions{
+			Prefix:       r.Source.Prefix,
+			WithVersions: true,
+			Recursive:    true,
+			WithMetadata: true,
+		})
+		for obj := range objInfoCh {
+			oi := toObjectInfo(r.Source.Bucket, obj.Key, obj)
+			if skip(oi) {
+				continue
+			}
+			wk.Take()
+			go func() {
+				defer wk.Give()
+				stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, oi)
+				success := true
+				if err := r.ReplicateFromSource(ctx, api, core, oi, retry); err != nil {
+					// object must be deleted concurrently, allow these failures but do not count them
+					if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
+						return
+					}
+					stopFn(err)
+					logger.LogIf(ctx, err)
+					success = false
+				} else {
+					stopFn(nil)
+				}
+				ri.trackCurrentBucketObject(r.Target.Bucket, oi, success)
+				globalBatchJobsMetrics.save(job.ID, ri)
+				// persist in-memory state to disk after every 10secs.
+				logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job.Location))
+			}()
+		}
+		wk.Wait()
+
+		ri.RetryAttempts = attempts
+		ri.Complete = ri.ObjectsFailed == 0
+		ri.Failed = ri.ObjectsFailed > 0
+
+		globalBatchJobsMetrics.save(job.ID, ri)
+		// persist in-memory state to disk.
+		logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job.Location))
+
+		buf, _ := json.Marshal(ri)
+		if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
+			logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
+		}
+
+		cancel()
+		if ri.Failed {
+			ri.ObjectsFailed = 0
+			ri.Bucket = ""
+			ri.Object = ""
+			ri.Objects = 0
+			ri.BytesFailed = 0
+			ri.BytesTransferred = 0
+			retry = true // indicate we are retrying..
+			time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay)))
+			continue
+		}
+		break
+	}
 	return nil
 }
+
+// toObjectInfo converts minio.ObjectInfo to ObjectInfo
+func toObjectInfo(bucket, object string, objInfo minio.ObjectInfo) ObjectInfo {
+	tags, _ := tags.MapToObjectTags(objInfo.UserTags)
+	oi := ObjectInfo{
+		Bucket:                    bucket,
+		Name:                      object,
+		ModTime:                   objInfo.LastModified,
+		Size:                      objInfo.Size,
+		ETag:                      objInfo.ETag,
+		VersionID:                 objInfo.VersionID,
+		IsLatest:                  objInfo.IsLatest,
+		DeleteMarker:              objInfo.IsDeleteMarker,
+		ContentType:               objInfo.ContentType,
+		Expires:                   objInfo.Expires,
+		StorageClass:              objInfo.StorageClass,
+		ReplicationStatusInternal: objInfo.ReplicationStatus,
+		UserTags:                  tags.String(),
+	}
+
+	oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
+	for k, v := range objInfo.Metadata {
+		oi.UserDefined[k] = v[0]
+	}
+
+	ce, ok := oi.UserDefined[xhttp.ContentEncoding]
+	if !ok {
+		ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
+	}
+	if ok {
+		oi.ContentEncoding = ce
+	}
+	return oi
+}
 
 // ReplicateToTarget read from source and replicate to configured target
 func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectLayer, c *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error {
 	srcBucket := r.Source.Bucket
@@ -705,7 +1070,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
 		buf, _ := json.Marshal(ri)
 		if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
-			logger.LogIf(ctx, fmt.Errorf("Unable to notify %v", err))
+			logger.LogIf(ctx, fmt.Errorf("unable to notify %v", err))
 		}
 
 		cancel()
@@ -751,8 +1116,11 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
 	if r.Source.Bucket == "" {
 		return errInvalidArgument
 	}
-	info, err := o.GetBucketInfo(ctx, r.Source.Bucket, BucketOptions{})
+
+	localBkt := r.Source.Bucket
+	if r.Source.Endpoint != "" {
+		localBkt = r.Target.Bucket
+	}
+	info, err := o.GetBucketInfo(ctx, localBkt, BucketOptions{})
 	if err != nil {
 		if isErrBucketNotFound(err) {
 			return batchReplicationJobError{
@@ -767,8 +1135,20 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
 	if err := r.Source.Type.Validate(); err != nil {
 		return err
 	}
-	if r.Target.Endpoint == "" {
+
+	if r.Source.Creds.Empty() && r.Target.Creds.Empty() {
+		return errInvalidArgument
+	}
+
+	if !r.Source.Creds.Empty() {
+		if err := r.Source.Creds.Validate(); err != nil {
+			return err
+		}
+	}
+
+	if r.Target.Endpoint == "" && !r.Target.Creds.Empty() {
+		return errInvalidArgument
+	}
+
+	if r.Source.Endpoint == "" && !r.Source.Creds.Empty() {
 		return errInvalidArgument
 	}
@ -776,8 +1156,14 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
return errInvalidArgument return errInvalidArgument
} }
if err := r.Target.Creds.Validate(); err != nil { if !r.Target.Creds.Empty() {
return err if err := r.Target.Creds.Validate(); err != nil {
return err
}
}
if r.Source.Creds.Empty() && r.Target.Creds.Empty() {
return errInvalidArgument
} }
if err := r.Target.Type.Validate(); err != nil { if err := r.Target.Type.Validate(); err != nil {
@@ -800,13 +1186,21 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
 		return err
 	}
 
-	u, err := url.Parse(r.Target.Endpoint)
+	remoteEp := r.Target.Endpoint
+	remoteBkt := r.Target.Bucket
+	cred := r.Target.Creds
+
+	if r.Source.Endpoint != "" {
+		remoteEp = r.Source.Endpoint
+		cred = r.Source.Creds
+		remoteBkt = r.Source.Bucket
+	}
+
+	u, err := url.Parse(remoteEp)
 	if err != nil {
 		return err
 	}
 
-	cred := r.Target.Creds
 	c, err := miniogo.NewCore(u.Host, &miniogo.Options{
 		Creds:     credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
 		Secure:    u.Scheme == "https",
@@ -817,7 +1211,7 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
 	}
 	c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
 
-	vcfg, err := c.GetBucketVersioning(ctx, r.Target.Bucket)
+	vcfg, err := c.GetBucketVersioning(ctx, remoteBkt)
 	if err != nil {
 		if miniogo.ToErrorResponse(err).Code == "NoSuchBucket" {
 			return batchReplicationJobError{
@@ -1154,13 +1548,24 @@ func (j *BatchJobPool) AddWorker() {
 				return
 			}
 			if job.Replicate != nil {
-				if err := job.Replicate.Start(job.ctx, j.objLayer, *job); err != nil {
-					if !isErrBucketNotFound(err) {
-						logger.LogIf(j.ctx, err)
-						j.canceler(job.ID, false)
-						continue
-					}
-					// Bucket not found proceed to delete such a job.
-				}
+				if job.Replicate.RemoteToLocal() {
+					if err := job.Replicate.StartFromSource(job.ctx, j.objLayer, *job); err != nil {
+						if !isErrBucketNotFound(err) {
+							logger.LogIf(j.ctx, err)
+							j.canceler(job.ID, false)
+							continue
+						}
+						// Bucket not found proceed to delete such a job.
+					}
+				} else {
+					if err := job.Replicate.Start(job.ctx, j.objLayer, *job); err != nil {
+						if !isErrBucketNotFound(err) {
+							logger.LogIf(j.ctx, err)
+							j.canceler(job.ID, false)
+							continue
+						}
+						// Bucket not found proceed to delete such a job.
+					}
+				}
 			}
 			job.delete(j.ctx, j.objLayer)