Support for remote tier management (#12090)
With this change, MinIO's ILM supports transitioning objects to a remote tier. This change includes support for Azure Blob Storage, AWS S3-compatible object storage (including MinIO), and Google Cloud Storage as remote tier storage backends.

Some new additions include:

- Admin APIs for remote tier configuration management.
- A simple journal to track remote objects to be 'collected'. This is used by object API handlers which 'mutate' object versions by overwriting/replacing content (Put/CopyObject) or removing the version itself (e.g. DeleteObjectVersion).
- Rework of the previous ILM transition to fit the new model. In the new model, a storage class (a.k.a. remote tier) is defined by the 'remote' object storage type (one of s3, azure, gcs), a bucket name, and a prefix.

Fixed bugs, review comments, and more unit tests:

- Leverage the inline small-object feature
- Migrate legacy objects to the latest object format before transitioning
- Fix restore to a particular version if specified
- Extend SharedDataDirCount to handle transitioned and restored objects
- Restore-object should accept version-id for a version-suspended bucket (#12091)
- Check if remote tier credentials have sufficient permissions
- Bonus minor fixes to existing error messages

Co-authored-by: Poorna Krishnamoorthy <poorna@minio.io>
Co-authored-by: Krishna Srinivas <krishna@minio.io>
Signed-off-by: Harshavardhana <harsha@minio.io>
Committed by: Harshavardhana
Parent: 069432566f
Commit: c829e3a13b
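The tiering model described above resolves a tier name (the storage class referenced by an ILM rule) to a remote backend defined by its type (s3, azure, gcs), a bucket, and a prefix; the diff below does this via globalTierConfigMgr.getDriver before streaming object data to the tier with Put. The following is a minimal, self-contained sketch of that model; the names TierConfig, warmBackend, tierConfigMgr, and memBackend are illustrative assumptions, not MinIO's actual types.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
)

// TierType mirrors the three remote backends named in the commit message.
type TierType string

const (
	TierS3    TierType = "s3"
	TierAzure TierType = "azure"
	TierGCS   TierType = "gcs"
)

// TierConfig captures the commit's definition of a remote tier
// (a.k.a. storage class): a backend type, a bucket and a prefix.
type TierConfig struct {
	Name   string
	Type   TierType
	Bucket string
	Prefix string
}

// warmBackend is a hypothetical stand-in for the per-tier client the
// diff obtains via globalTierConfigMgr.getDriver(tier); the real
// implementation talks to S3, Azure or GCS.
type warmBackend interface {
	Put(ctx context.Context, object string, r io.Reader, length int64) error
}

// memBackend is an in-memory warmBackend used only for this sketch.
type memBackend struct {
	cfg  TierConfig
	data map[string][]byte
}

func (m *memBackend) Put(ctx context.Context, object string, r io.Reader, length int64) error {
	buf, err := io.ReadAll(io.LimitReader(r, length))
	if err != nil {
		return err
	}
	// Objects land under <bucket>/<prefix>/<object> on the remote tier.
	m.data[strings.Join([]string{m.cfg.Bucket, m.cfg.Prefix, object}, "/")] = buf
	return nil
}

// tierConfigMgr is a minimal analogue of the diff's globalTierConfigMgr:
// it maps a tier name (the storage class used in ILM rules) to a driver.
type tierConfigMgr struct {
	drivers map[string]warmBackend
}

func (m *tierConfigMgr) getDriver(tier string) (warmBackend, error) {
	d, ok := m.drivers[tier]
	if !ok {
		return nil, fmt.Errorf("tier %q not configured", tier)
	}
	return d, nil
}

func main() {
	cfg := TierConfig{Name: "WARM-TIER", Type: TierS3, Bucket: "warm-bucket", Prefix: "minio-tier"}
	mgr := &tierConfigMgr{drivers: map[string]warmBackend{
		cfg.Name: &memBackend{cfg: cfg, data: map[string][]byte{}},
	}}

	// Resolve the tier by name, then stream the object's bytes to it,
	// roughly what TransitionObject does in the diff below.
	tgt, err := mgr.getDriver("WARM-TIER")
	if err != nil {
		panic(err)
	}
	payload := []byte("object bytes streamed during transition")
	if err := tgt.Put(context.Background(), "0b1c5d1e-obj", bytes.NewReader(payload), int64(len(payload))); err != nil {
		panic(err)
	}
	fmt.Println("transitioned object uploaded to remote tier")
}
```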
@@ -27,13 +27,14 @@ import (
	"path"
	"strings"
	"sync"
	"time"

	"github.com/minio/minio-go/v7/pkg/tags"
	xhttp "github.com/minio/minio/cmd/http"
	"github.com/minio/minio/cmd/logger"
	"github.com/minio/minio/pkg/bucket/lifecycle"
	"github.com/minio/minio/pkg/bucket/replication"
	"github.com/minio/minio/pkg/event"
	"github.com/minio/minio/pkg/hash"
	"github.com/minio/minio/pkg/madmin"
	"github.com/minio/minio/pkg/mimedb"
	"github.com/minio/minio/pkg/sync/errgroup"
@@ -63,14 +64,14 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
	}

	defer ObjectPathUpdated(pathJoin(dstBucket, dstObject))

	lk := er.NewNSLock(dstBucket, dstObject)
	ctx, err = lk.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return oi, err
	if !dstOpts.NoLock {
		lk := er.NewNSLock(dstBucket, dstObject)
		ctx, err = lk.GetLock(ctx, globalOperationTimeout)
		if err != nil {
			return oi, err
		}
		defer lk.Unlock()
	}
	defer lk.Unlock()

	// Read metadata associated with the object from all disks.
	storageDisks := er.getDisks()
	metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true)
@@ -181,8 +182,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
	}
	if objInfo.TransitionStatus == lifecycle.TransitionComplete {
		// If transitioned, stream from transition tier unless object is restored locally or restore date is past.
		restoreHdr, ok := caseInsensitiveMap(objInfo.UserDefined).Lookup(xhttp.AmzRestore)
		if !ok || !strings.HasPrefix(restoreHdr, "ongoing-request=false") || (!objInfo.RestoreExpires.IsZero() && time.Now().After(objInfo.RestoreExpires)) {
		if onDisk := isRestoredObjectOnDisk(objInfo.UserDefined); !onDisk {
			return getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts)
		}
	}
@@ -460,13 +460,7 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin

	}
	objInfo = fi.ToObjectInfo(bucket, object)
	if objInfo.TransitionStatus == lifecycle.TransitionComplete {
		// overlay storage class for transitioned objects with transition tier SC Label
		if sc := transitionSC(ctx, bucket); sc != "" {
			objInfo.StorageClass = sc
		}
	}
	if !fi.VersionPurgeStatus.Empty() && opts.VersionID != "" {
	if !fi.VersionPurgeStatus.Empty() {
		// Make sure to return object info to provide extra information.
		return objInfo, toObjectErr(errMethodNotAllowed, bucket, object)
	}
@@ -643,12 +637,9 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
	partsMetadata := make([]FileInfo, len(storageDisks))

	fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)

	if opts.Versioned {
		fi.VersionID = opts.VersionID
		if fi.VersionID == "" {
			fi.VersionID = mustGetUUID()
		}
	fi.VersionID = opts.VersionID
	if opts.Versioned && fi.VersionID == "" {
		fi.VersionID = mustGetUUID()
	}
	fi.DataDir = mustGetUUID()

@@ -1001,7 +992,6 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
				DeleteMarkerReplicationStatus: versions[objIndex].DeleteMarkerReplicationStatus,
				ObjectName:                    versions[objIndex].Name,
				VersionPurgeStatus:            versions[objIndex].VersionPurgeStatus,
				PurgeTransitioned:             objects[objIndex].PurgeTransitioned,
			}
		} else {
			dobjects[objIndex] = DeletedObject{
@@ -1009,7 +999,6 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
				VersionID:                     versions[objIndex].VersionID,
				VersionPurgeStatus:            versions[objIndex].VersionPurgeStatus,
				DeleteMarkerReplicationStatus: versions[objIndex].DeleteMarkerReplicationStatus,
				PurgeTransitioned:             objects[objIndex].PurgeTransitioned,
			}
		}
	}
@@ -1093,6 +1082,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
	if opts.MTime.IsZero() {
		modTime = UTCNow()
	}

	if markDelete {
		if opts.Versioned || opts.VersionSuspended {
			fi := FileInfo{
@@ -1102,6 +1092,8 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
				ModTime:                       modTime,
				DeleteMarkerReplicationStatus: opts.DeleteMarkerReplicationStatus,
				VersionPurgeStatus:            opts.VersionPurgeStatus,
				TransitionStatus:              opts.Transition.Status,
				ExpireRestored:                opts.Transition.ExpireRestored,
			}
			if opts.Versioned {
				fi.VersionID = mustGetUUID()
@@ -1109,8 +1101,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
				fi.VersionID = opts.VersionID
			}
		}
		fi.TransitionStatus = opts.TransitionStatus

		// versioning suspended means we add `null`
		// version as delete marker
		// Add delete marker, since we don't have any version specified explicitly.
@@ -1131,7 +1121,8 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
		ModTime:                       modTime,
		DeleteMarkerReplicationStatus: opts.DeleteMarkerReplicationStatus,
		VersionPurgeStatus:            opts.VersionPurgeStatus,
		TransitionStatus:              opts.TransitionStatus,
		TransitionStatus:              opts.Transition.Status,
		ExpireRestored:                opts.Transition.ExpireRestored,
	}, opts.DeleteMarker); err != nil {
		return objInfo, toObjectErr(err, bucket, object)
	}
@@ -1302,3 +1293,250 @@ func (er erasureObjects) GetObjectTags(ctx context.Context, bucket, object strin

	return tags.ParseObjectTags(oi.UserTags)
}

// TransitionObject - transition object content to target tier.
func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
	tgtClient, err := globalTierConfigMgr.getDriver(opts.Transition.Tier)
	if err != nil {
		return err
	}
	// Acquire write lock before starting to transition the object.
	lk := er.NewNSLock(bucket, object)
	ctx, err = lk.GetLock(ctx, globalDeleteOperationTimeout)
	if err != nil {
		return err
	}
	defer lk.Unlock()

	fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
	if err != nil {
		return toObjectErr(err, bucket, object)
	}
	if fi.Deleted {
		if opts.VersionID == "" {
			return toObjectErr(errFileNotFound, bucket, object)
		}
		// Make sure to return object info to provide extra information.
		return toObjectErr(errMethodNotAllowed, bucket, object)
	}
	// verify that the object queued for transition is identical to that on disk.
	if !opts.MTime.Equal(fi.ModTime) || !strings.EqualFold(opts.Transition.ETag, extractETag(fi.Metadata)) {
		return toObjectErr(errFileNotFound, bucket, object)
	}
	// if object already transitioned, return
	if fi.TransitionStatus == lifecycle.TransitionComplete {
		return nil
	}
	if fi.XLV1 {
		if _, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{NoLock: true}); err != nil {
			return err
		}
		// Fetch FileInfo again. HealObject migrates object the latest
		// format. Among other things this changes fi.DataDir and
		// possibly fi.Data (if data is inlined).
		fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, bucket, object, opts, true)
		if err != nil {
			return toObjectErr(err, bucket, object)
		}
	}
	destObj, err := genTransitionObjName()
	if err != nil {
		return err
	}

	pr, pw := io.Pipe()
	go func() {
		err := er.getObjectWithFileInfo(ctx, bucket, object, 0, fi.Size, pw, fi, metaArr, onlineDisks)
		pw.CloseWithError(err)
	}()
	if err = tgtClient.Put(ctx, destObj, pr, fi.Size); err != nil {
		pr.Close()
		logger.LogIf(ctx, fmt.Errorf("Unable to transition %s/%s(%s) to %s tier: %w", bucket, object, opts.VersionID, opts.Transition.Tier, err))
		return err
	}
	pr.Close()
	fi.TransitionStatus = lifecycle.TransitionComplete
	fi.TransitionedObjName = destObj
	fi.TransitionTier = opts.Transition.Tier
	eventName := event.ObjectTransitionComplete

	storageDisks := er.getDisks()
	writeQuorum := len(storageDisks)/2 + 1
	if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, false); err != nil {
		eventName = event.ObjectTransitionFailed
	}
	for _, disk := range storageDisks {
		if disk != nil && disk.IsOnline() {
			continue
		}
		er.addPartial(bucket, object, opts.VersionID)
		break
	}
	// Notify object deleted event.
	sendEvent(eventArgs{
		EventName:  eventName,
		BucketName: bucket,
		Object: ObjectInfo{
			Name:      object,
			VersionID: opts.VersionID,
		},
		Host: "Internal: [ILM-Transition]",
	})
	return err
}

// RestoreTransitionedObject - restore transitioned object content locally on this cluster.
// This is similar to PostObjectRestore from AWS GLACIER
// storage class. When PostObjectRestore API is called, a temporary copy of the object
// is restored locally to the bucket on source cluster until the restore expiry date.
// The copy that was transitioned continues to reside in the transitioned tier.
func (er erasureObjects) RestoreTransitionedObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
	return er.restoreTransitionedObject(ctx, bucket, object, opts)
}

// update restore status header in the metadata
func (er erasureObjects) updateRestoreMetadata(ctx context.Context, bucket, object string, objInfo ObjectInfo, opts ObjectOptions, noLock bool, rerr error) error {
	oi := objInfo.Clone()
	oi.metadataOnly = true // Perform only metadata updates.

	if rerr == nil {
		oi.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String()
	} else { // allow retry in the case of failure to restore
		delete(oi.UserDefined, xhttp.AmzRestore)
	}
	if _, err := er.CopyObject(ctx, bucket, object, bucket, object, oi, ObjectOptions{
		VersionID: oi.VersionID,
	}, ObjectOptions{
		VersionID: oi.VersionID,
		NoLock:    noLock, // true if lock already taken
	}); err != nil {
		logger.LogIf(ctx, fmt.Errorf("Unable to update transition restore metadata for %s/%s(%s): %s", bucket, object, oi.VersionID, err))
		return err
	}
	return nil
}

// restoreTransitionedObject for multipart object chunks the file stream from remote tier into the same number of parts
// as in the xl.meta for this version and rehydrates the part.n into the fi.DataDir for this version as in the xl.meta
func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket string, object string, opts ObjectOptions) error {
	defer func() {
		ObjectPathUpdated(pathJoin(bucket, object))
	}()
	setRestoreHeaderFn := func(oi ObjectInfo, noLock bool, rerr error) error {
		er.updateRestoreMetadata(ctx, bucket, object, oi, opts, noLock, rerr)
		return rerr
	}
	var oi ObjectInfo
	// get the file info on disk for transitioned object
	actualfi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false)
	if err != nil {
		return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
	}

	oi = actualfi.ToObjectInfo(bucket, object)
	if len(oi.Parts) == 1 {
		var rs *HTTPRangeSpec
		gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
		if err != nil {
			return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
		}
		defer gr.Close()
		hashReader, err := hash.NewReader(gr, gr.ObjInfo.Size, "", "", gr.ObjInfo.Size)
		if err != nil {
			return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
		}
		pReader := NewPutObjReader(hashReader)
		ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)
		ropts.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String()
		_, err = er.PutObject(ctx, bucket, object, pReader, ropts)
		return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
	}
	uploadID, err := er.NewMultipartUpload(ctx, bucket, object, opts)
	if err != nil {
		return setRestoreHeaderFn(oi, false, err)
	}
	var uploadedParts []CompletePart
	var rs *HTTPRangeSpec
	// get reader from the warm backend - note that even in the case of encrypted objects, this stream is still encrypted.
	gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
	if err != nil {
		return setRestoreHeaderFn(oi, false, err)
	}
	defer gr.Close()
	// rehydrate the parts back on disk as per the original xl.meta prior to transition
	for _, partInfo := range oi.Parts {
		hr, err := hash.NewReader(gr, partInfo.Size, "", "", partInfo.Size)
		if err != nil {
			return setRestoreHeaderFn(oi, false, err)
		}
		pInfo, err := er.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, NewPutObjReader(hr), ObjectOptions{})
		if err != nil {
			return setRestoreHeaderFn(oi, false, err)
		}
		uploadedParts = append(uploadedParts, CompletePart{
			PartNumber: pInfo.PartNumber,
			ETag:       pInfo.ETag,
		})
	}

	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
	storageDisks := er.getDisks()
	// Read metadata associated with the object from all disks.
	partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)

	// get Quorum for this object
	_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
	if err != nil {
		return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
	}

	reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
	if reducedErr == errErasureWriteQuorum {
		return setRestoreHeaderFn(oi, false, toObjectErr(reducedErr, bucket, object))
	}

	onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)

	// Pick one from the first valid metadata.
	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
	if err != nil {
		return setRestoreHeaderFn(oi, false, err)
	}
	//validate parts created via multipart to transitioned object's parts info in xl.meta
	partsMatch := true
	if len(actualfi.Parts) != len(fi.Parts) {
		partsMatch = false
	}
	if len(actualfi.Parts) == len(fi.Parts) {
		for i, pi := range actualfi.Parts {
			if fi.Parts[i].Size != pi.Size {
				partsMatch = false
			}
		}
	}

	if !partsMatch {
		return setRestoreHeaderFn(oi, false, InvalidObjectState{Bucket: bucket, Object: object})
	}
	var currentFI = actualfi
	currentFI.DataDir = fi.DataDir

	// Hold namespace to complete the transaction
	lk := er.NewNSLock(bucket, object)
	ctx, err = lk.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return setRestoreHeaderFn(oi, false, err)
	}
	defer lk.Unlock()

	// Attempt to rename temp upload object to actual upload path object
	_, err = rename(ctx, onlineDisks, minioMetaMultipartBucket, path.Join(uploadIDPath, fi.DataDir), bucket, path.Join(object, actualfi.DataDir), true, writeQuorum, nil)
	if err != nil {
		return setRestoreHeaderFn(oi, true, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath))
	}
	// Cleanup multipart upload dir.
	if err = er.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, writeQuorum); err != nil {
		return setRestoreHeaderFn(oi, true, toObjectErr(err, bucket, object, uploadID))
	}
	return setRestoreHeaderFn(oi, true, nil)
}
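The commit message also mentions a simple journal used to track remote ('transitioned') objects that must later be collected once their local version is overwritten or deleted; the PurgeTransitioned fields added in the DeleteObjects/DeleteObject hunks above feed that bookkeeping. Below is a minimal, hypothetical sketch of such an append-only journal; the names tierJournal, journalEntry and the deleteJournal.jsonl path are illustrative assumptions, not MinIO's actual implementation.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// journalEntry records a transitioned object whose local version was
// overwritten or deleted and whose remote copy must eventually be removed.
type journalEntry struct {
	Tier      string `json:"tier"`
	ObjName   string `json:"objName"` // name of the object on the remote tier
	VersionID string `json:"versionID,omitempty"`
}

// tierJournal is a hypothetical append-only, line-delimited JSON journal.
// A background "collector" would read it back, delete the listed remote
// objects, and then truncate the journal.
type tierJournal struct {
	mu   sync.Mutex
	path string
}

func newTierJournal(dir string) (*tierJournal, error) {
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, err
	}
	return &tierJournal{path: filepath.Join(dir, "deleteJournal.jsonl")}, nil
}

// AddEntry appends one entry; per the commit message, handlers that mutate
// object versions (Put/CopyObject, DeleteObjectVersion) would record the
// replaced or removed transitioned version here.
func (j *tierJournal) AddEntry(e journalEntry) error {
	j.mu.Lock()
	defer j.mu.Unlock()
	f, err := os.OpenFile(j.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()
	b, err := json.Marshal(e)
	if err != nil {
		return err
	}
	_, err = f.Write(append(b, '\n'))
	return err
}

// Entries reads back everything still pending collection.
func (j *tierJournal) Entries() ([]journalEntry, error) {
	j.mu.Lock()
	defer j.mu.Unlock()
	f, err := os.Open(j.path)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, err
	}
	defer f.Close()
	var out []journalEntry
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		var e journalEntry
		if err := json.Unmarshal(sc.Bytes(), &e); err != nil {
			return nil, err
		}
		out = append(out, e)
	}
	return out, sc.Err()
}

func main() {
	j, err := newTierJournal(os.TempDir())
	if err != nil {
		panic(err)
	}
	_ = j.AddEntry(journalEntry{Tier: "WARM-TIER", ObjName: "0b1c5d1e-obj", VersionID: "null"})
	entries, _ := j.Entries()
	fmt.Printf("%d remote object(s) pending collection\n", len(entries))
}
```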