From ad8e611098443a8683482508618f6f58d896a254 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Fri, 6 May 2022 19:05:28 -0700 Subject: [PATCH] feat: implement prefix-level versioning exclusion (#14828) Spark/Hadoop workloads which use Hadoop MR Committer v1/v2 algorithm upload objects to a temporary prefix in a bucket. These objects are 'renamed' to a different prefix on Job commit. Object storage admins are forced to configure separate ILM policies to expire these objects and their versions to reclaim space. Our solution: This can be avoided by simply marking objects under these prefixes to be excluded from versioning, as shown below. Consequently, these objects are excluded from replication, and don't require ILM policies to prune unnecessary versions. - MinIO Extension to Bucket Version Configuration ```xml Enabled true app1-jobs/*/_temporary/ app2-jobs/*/__magic/ ``` Note: `ExcludeFolders` excludes all folders in a bucket from versioning. This is required to prevent the parent folders from accumulating delete markers, especially those which are shared across spark workloads spanning projects/teams. - To enable version exclusion on a list of prefixes ``` mc version enable --excluded-prefixes "app1-jobs/*/_temporary/,app2-jobs/*/_magic," --exclude-prefix-marker myminio/test ``` --- cmd/bucket-handlers.go | 15 +- cmd/bucket-lifecycle.go | 18 +- cmd/bucket-metadata-sys.go | 2 +- cmd/bucket-replication-utils.go | 2 +- cmd/bucket-replication.go | 14 +- cmd/bucket-versioning-handler.go | 2 +- cmd/bucket-versioning.go | 29 +- cmd/data-scanner.go | 5 +- cmd/erasure-metadata.go | 2 +- cmd/erasure-object.go | 11 +- cmd/erasure-server-pool-decom.go | 4 +- cmd/object-api-interface.go | 3 + cmd/object-api-listobjects_test.go | 14 +- cmd/object-api-options.go | 10 +- cmd/object-handlers-common.go | 7 +- cmd/object-handlers.go | 14 +- docs/bucket/replication/README.md | 20 ++ docs/bucket/versioning/README.md | 25 ++ internal/bucket/versioning/versioning.go | 90 ++++++- internal/bucket/versioning/versioning_test.go | 250 ++++++++++++++++++ 20 files changed, 478 insertions(+), 59 deletions(-) create mode 100644 internal/bucket/versioning/versioning_test.go diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index b95834817..eb4642371 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -471,9 +471,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, hasLockEnabled = true } - versioned := globalBucketVersioningSys.Enabled(bucket) - suspended := globalBucketVersioningSys.Suspended(bucket) - type deleteResult struct { delInfo DeletedObject errInfo DeleteError @@ -481,8 +478,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, deleteResults := make([]deleteResult, len(deleteObjectsReq.Objects)) + vc, _ := globalBucketVersioningSys.Get(bucket) oss := make([]*objSweeper, len(deleteObjectsReq.Objects)) - for index, object := range deleteObjectsReq.Objects { if apiErrCode := checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, object.ObjectName); apiErrCode != ErrNone { if apiErrCode == ErrSignatureDoesNotMatch || apiErrCode == ErrInvalidAccessKeyID { @@ -514,8 +511,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, opts := ObjectOptions{ VersionID: object.VersionID, - Versioned: versioned, - VersionSuspended: suspended, + Versioned: vc.PrefixEnabled(object.ObjectName), + VersionSuspended: vc.PrefixSuspended(object.ObjectName), } if replicateDeletes || object.VersionID != "" && hasLockEnabled || !globalTierConfigMgr.Empty() { @@ -526,7 +523,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } if !globalTierConfigMgr.Empty() { - oss[index] = newObjSweeper(bucket, object.ObjectName).WithVersion(opts.VersionID).WithVersioning(versioned, suspended) + oss[index] = newObjSweeper(bucket, object.ObjectName).WithVersion(opts.VersionID).WithVersioning(opts.Versioned, opts.VersionSuspended) oss[index].SetTransitionState(goi.TransitionedObject) } @@ -581,8 +578,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, deleteList := toNames(objectsToDelete) dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{ - Versioned: versioned, - VersionSuspended: suspended, + PrefixEnabledFn: vc.PrefixEnabled, + PrefixSuspendedFn: vc.PrefixSuspended, }) for i := range errs { diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 4a8ec93cd..1b8c83ecc 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -330,7 +330,7 @@ const ( // 2. when a transitioned object expires (based on an ILM rule). func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *ObjectInfo, lcOpts lifecycle.ObjectOpts, action expireAction) error { var opts ObjectOptions - opts.Versioned = globalBucketVersioningSys.Enabled(oi.Bucket) + opts.Versioned = globalBucketVersioningSys.PrefixEnabled(oi.Bucket, oi.Name) opts.VersionID = lcOpts.VersionID opts.Expiration = ExpirationOptions{Expire: true} switch action { @@ -415,8 +415,8 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo) ETag: oi.ETag, }, VersionID: oi.VersionID, - Versioned: globalBucketVersioningSys.Enabled(oi.Bucket), - VersionSuspended: globalBucketVersioningSys.Suspended(oi.Bucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(oi.Bucket, oi.Name), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name), MTime: oi.ModTime, } return tier, objectAPI.TransitionObject(ctx, oi.Bucket, oi.Name, opts) @@ -575,8 +575,8 @@ func (r *RestoreObjectRequest) validate(ctx context.Context, objAPI ObjectLayer) // postRestoreOpts returns ObjectOptions with version-id from the POST restore object request for a given bucket and object. func postRestoreOpts(ctx context.Context, r *http.Request, bucket, object string) (opts ObjectOptions, err error) { - versioned := globalBucketVersioningSys.Enabled(bucket) - versionSuspended := globalBucketVersioningSys.Suspended(bucket) + versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) + versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) vid := strings.TrimSpace(r.Form.Get(xhttp.VersionID)) if vid != "" && vid != nullVersionID { _, err := uuid.Parse(vid) @@ -627,8 +627,8 @@ func putRestoreOpts(bucket, object string, rreq *RestoreObjectRequest, objInfo O meta[xhttp.AmzServerSideEncryption] = xhttp.AmzEncryptionAES } return ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(bucket), - VersionSuspended: globalBucketVersioningSys.Suspended(bucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(bucket, object), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(bucket, object), UserDefined: meta, } } @@ -640,8 +640,8 @@ func putRestoreOpts(bucket, object string, rreq *RestoreObjectRequest, objInfo O } return ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(bucket), - VersionSuspended: globalBucketVersioningSys.Suspended(bucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(bucket, object), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(bucket, object), UserDefined: meta, VersionID: objInfo.VersionID, MTime: objInfo.ModTime, diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index c5e72a896..125d6a2fd 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -196,7 +196,7 @@ func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) { func (sys *BucketMetadataSys) GetVersioningConfig(bucket string) (*versioning.Versioning, error) { meta, err := sys.GetConfig(GlobalContext, bucket) if err != nil { - return nil, err + return &versioning.Versioning{}, err } return meta.versioningConfig, nil } diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index 79e359e02..1a94c1029 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -509,7 +509,7 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl ObjectName: oi.Name, VersionID: oi.VersionID, }, - }, oi, ObjectOptions{}, nil) + }, oi, ObjectOptions{VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name)}, nil) } else { dsc = mustReplicate(GlobalContext, oi.Bucket, oi.Name, getMustReplicateOptions(ObjectInfo{ UserDefined: oi.UserDefined, diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 57cd23850..33c2a6405 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -180,6 +180,11 @@ func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplica return } + // Disable server-side replication on object prefixes which are excluded + // from versioning via the MinIO bucket versioning extension. + if globalBucketVersioningSys.PrefixSuspended(bucket, object) { + return + } replStatus := mopts.ReplicationStatus() if replStatus == replication.Replica && !mopts.isMetadataReplication() { return @@ -264,6 +269,11 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet if delOpts.ReplicationRequest { return } + // Skip replication if this object's prefix is excluded from being + // versioned. + if delOpts.VersionSuspended { + return + } opts := replication.ObjectOpts{ Name: dobj.ObjectName, SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined), @@ -457,8 +467,8 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj VersionID: versionID, MTime: dobj.DeleteMarkerMTime.Time, DeleteReplication: drs, - Versioned: globalBucketVersioningSys.Enabled(bucket), - VersionSuspended: globalBucketVersioningSys.Suspended(bucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(bucket, dobj.ObjectName), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(bucket, dobj.ObjectName), }) if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on. logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err)) diff --git a/cmd/bucket-versioning-handler.go b/cmd/bucket-versioning-handler.go index d121f41d3..746bad119 100644 --- a/cmd/bucket-versioning-handler.go +++ b/cmd/bucket-versioning-handler.go @@ -72,7 +72,7 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r return } - if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled && v.Suspended() { + if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled && (v.Suspended() || v.PrefixesExcluded()) { writeErrorResponse(ctx, w, APIError{ Code: "InvalidBucketState", Description: "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed.", diff --git a/cmd/bucket-versioning.go b/cmd/bucket-versioning.go index 33078af93..370f41e06 100644 --- a/cmd/bucket-versioning.go +++ b/cmd/bucket-versioning.go @@ -31,6 +31,17 @@ func (sys *BucketVersioningSys) Enabled(bucket string) bool { return vc.Enabled() } +// PrefixEnabled returns true is versioning is enabled at bucket level and if +// the given prefix doesn't match any excluded prefixes pattern. This is +// part of a MinIO versioning configuration extension. +func (sys *BucketVersioningSys) PrefixEnabled(bucket, prefix string) bool { + vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + if err != nil { + return false + } + return vc.PrefixEnabled(prefix) +} + // Suspended suspended versioning? func (sys *BucketVersioningSys) Suspended(bucket string) bool { vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) @@ -40,15 +51,19 @@ func (sys *BucketVersioningSys) Suspended(bucket string) bool { return vc.Suspended() } +// PrefixSuspended returns true if the given prefix matches an excluded prefix +// pattern. This is part of a MinIO versioning configuration extension. +func (sys *BucketVersioningSys) PrefixSuspended(bucket, prefix string) bool { + vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + if err != nil { + return false + } + + return vc.PrefixSuspended(prefix) +} + // Get returns stored bucket policy func (sys *BucketVersioningSys) Get(bucket string) (*versioning.Versioning, error) { - if globalIsGateway { - objAPI := newObjectLayerFn() - if objAPI == nil { - return nil, errServerNotInitialized - } - return nil, NotImplemented{} - } return globalBucketMetadataSys.GetVersioningConfig(bucket) } diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index fad4142c3..33c2872cd 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1184,7 +1184,7 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay opts.VersionID = obj.VersionID } if opts.VersionID == "" { - opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket) + opts.Versioned = globalBucketVersioningSys.PrefixEnabled(obj.Bucket, obj.Name) } obj, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts) @@ -1243,6 +1243,9 @@ func (i *scannerItem) objectPath() string { // healReplication will heal a scanned item that has failed replication. func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) { roi := getHealReplicateObjectInfo(oi, i.replication) + if !roi.Dsc.ReplicateAny() { + return + } if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() { // heal delete marker replication failure or versioned delete replication failure diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 85e07b75a..33c8dd1b4 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -107,7 +107,7 @@ func (fi FileInfo) IsValid() bool { func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { object = decodeDirObject(object) versionID := fi.VersionID - if (globalBucketVersioningSys.Enabled(bucket) || globalBucketVersioningSys.Suspended(bucket)) && versionID == "" { + if (globalBucketVersioningSys.PrefixEnabled(bucket, object) || globalBucketVersioningSys.PrefixSuspended(bucket, object)) && versionID == "" { versionID = nullVersionID } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 7b5f6b51d..58bd8b290 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1187,7 +1187,16 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec // VersionID is not set means delete is not specific about // any version, look for if the bucket is versioned or not. if objects[i].VersionID == "" { - if opts.Versioned || opts.VersionSuspended { + // MinIO extension to bucket version configuration + suspended := opts.VersionSuspended + if opts.PrefixSuspendedFn != nil { + suspended = opts.PrefixSuspendedFn(objects[i].ObjectName) + } + versioned := opts.Versioned + if opts.PrefixEnabledFn != nil { + versioned = opts.PrefixEnabledFn(objects[i].ObjectName) + } + if versioned || suspended { // Bucket is versioned and no version was explicitly // mentioned for deletes, create a delete marker instead. vr.ModTime = UTCNow() diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 11c738d1e..a93b14d1a 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -619,7 +619,6 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool parallelWorkers := make(chan struct{}, workerSize) - versioned := globalBucketVersioningSys.Enabled(bName) for _, set := range pool.sets { set := set disks := set.getOnlineDisks() @@ -629,6 +628,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool continue } + vc, _ := globalBucketVersioningSys.Get(bName) decommissionEntry := func(entry metaCacheEntry) { defer func() { <-parallelWorkers @@ -667,7 +667,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool bName, version.Name, ObjectOptions{ - Versioned: versioned, + Versioned: vc.PrefixEnabled(version.Name), VersionID: version.VersionID, MTime: version.ModTime, DeleteReplication: version.ReplicationState, diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 9dfa8ccb4..991ab5070 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -76,6 +76,9 @@ type ObjectOptions struct { // Mutate set to 'true' if the call is namespace mutation call Mutate bool WalkAscending bool // return Walk results in ascending order of versions + + PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix + PrefixSuspendedFn func(prefix string) bool // function which returns true if versioning is suspended on prefix } // ExpirationOptions represents object options for object expiration at objectLayer. diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index a66abbfbc..a808c44df 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -70,7 +70,7 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te md5Bytes := md5.Sum([]byte(object.content)) _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), UserDefined: object.meta, }) if err != nil { @@ -78,7 +78,7 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te } if object.addDeleteMarker { oi, err := obj.DeleteObject(context.Background(), object.parentBucket, object.name, ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), }) if err != nil { t.Fatalf("%s : %s", instanceType, err.Error()) @@ -380,7 +380,7 @@ func _testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler, v _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), UserDefined: object.meta, }) if err != nil { @@ -1065,16 +1065,16 @@ func testDeleteObjectVersion(obj ObjectLayer, instanceType string, t1 TestErrHan _, err := obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), - VersionSuspended: globalBucketVersioningSys.Suspended(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(object.parentBucket, object.name), UserDefined: object.meta, }) if err != nil { t.Fatalf("%s : %s", instanceType, err) } obj, err := obj.DeleteObject(context.Background(), object.parentBucket, object.name, ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), - VersionSuspended: globalBucketVersioningSys.Suspended(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(object.parentBucket, object.name), VersionID: object.versionID, }) if err != nil { diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index f7da0f29b..5fb75fdfc 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -161,13 +161,12 @@ func getOpts(ctx context.Context, r *http.Request, bucket, object string) (Objec } func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts ObjectOptions, err error) { - versioned := globalBucketVersioningSys.Enabled(bucket) opts, err = getOpts(ctx, r, bucket, object) if err != nil { return opts, err } - opts.Versioned = versioned - opts.VersionSuspended = globalBucketVersioningSys.Suspended(bucket) + opts.Versioned = globalBucketVersioningSys.PrefixEnabled(bucket, object) + opts.VersionSuspended = globalBucketVersioningSys.PrefixSuspended(bucket, object) delMarker := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceDeleteMarker)) if delMarker != "" { switch delMarker { @@ -203,8 +202,9 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts // get ObjectOptions for PUT calls from encryption headers and metadata func putOpts(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) { - versioned := globalBucketVersioningSys.Enabled(bucket) - versionSuspended := globalBucketVersioningSys.Suspended(bucket) + versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) + versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) + vid := strings.TrimSpace(r.Form.Get(xhttp.VersionID)) if vid != "" && vid != nullVersionID { _, err := uuid.Parse(vid) diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index f1444f26e..8b503e84f 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -266,8 +266,6 @@ func setPutObjHeaders(w http.ResponseWriter, objInfo ObjectInfo, delete bool) { } func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toDel []ObjectToDelete) { - versioned := globalBucketVersioningSys.Enabled(bucket) - versionSuspended := globalBucketVersioningSys.Suspended(bucket) for remaining := toDel; len(remaining) > 0; toDel = remaining { if len(toDel) > maxDeleteList { remaining = toDel[maxDeleteList:] @@ -275,9 +273,10 @@ func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toD } else { remaining = nil } + vc, _ := globalBucketVersioningSys.Get(bucket) deletedObjs, errs := o.DeleteObjects(ctx, bucket, toDel, ObjectOptions{ - Versioned: versioned, - VersionSuspended: versionSuspended, + PrefixEnabledFn: vc.PrefixEnabled, + PrefixSuspendedFn: vc.PrefixSuspended, }) var logged bool for i, err := range errs { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 2c4e73375..d46146d90 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -204,7 +204,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r objInfo, err := getObjectInfo(ctx, bucket, object, opts) if err != nil { - if globalBucketVersioningSys.Enabled(bucket) { + if globalBucketVersioningSys.PrefixEnabled(bucket, object) { // Versioning enabled quite possibly object is deleted might be delete-marker // if present set the headers, no idea why AWS S3 sets these headers. if objInfo.VersionID != "" && objInfo.DeleteMarker { @@ -445,7 +445,7 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj writeErrorResponse(ctx, w, toAPIError(ctx, proxy.Err), r.URL) return } - if globalBucketVersioningSys.Enabled(bucket) && gr != nil { + if globalBucketVersioningSys.PrefixEnabled(bucket, object) && gr != nil { if !gr.ObjInfo.VersionPurgeStatus.Empty() { // Shows the replication status of a permanent delete of a version w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(gr.ObjInfo.VersionPurgeStatus)} @@ -673,7 +673,7 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob } } if !proxy.Proxy { - if globalBucketVersioningSys.Enabled(bucket) { + if globalBucketVersioningSys.PrefixEnabled(bucket, object) { switch { case !objInfo.VersionPurgeStatus.Empty(): w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(objInfo.VersionPurgeStatus)} @@ -1087,7 +1087,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re if isErrPreconditionFailed(err) { return } - if globalBucketVersioningSys.Enabled(srcBucket) && gr != nil { + if globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) && gr != nil { // Versioning enabled quite possibly object is deleted might be delete-marker // if present set the headers, no idea why AWS S3 sets these headers. if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker { @@ -2480,7 +2480,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt if isErrPreconditionFailed(err) { return } - if globalBucketVersioningSys.Enabled(srcBucket) && gr != nil { + if globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) && gr != nil { // Versioning enabled quite possibly object is deleted might be delete-marker // if present set the headers, no idea why AWS S3 sets these headers. if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker { @@ -3211,8 +3211,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite w.Write(encodedErrorResponse) } - versioned := globalBucketVersioningSys.Enabled(bucket) - suspended := globalBucketVersioningSys.Suspended(bucket) + versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) + suspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) os := newObjSweeper(bucket, object).WithVersioning(versioned, suspended) if !globalTierConfigMgr.Empty() { // Get appropriate object info to identify the remote object to delete diff --git a/docs/bucket/replication/README.md b/docs/bucket/replication/README.md index 142d4ff78..8534f22c1 100644 --- a/docs/bucket/replication/README.md +++ b/docs/bucket/replication/README.md @@ -243,6 +243,26 @@ Replication from a source bucket to multiple destination buckets is supported. F Note that on the source side, the `X-Amz-Replication-Status` changes from `PENDING` to `COMPLETED` after replication succeeds to each of the targets. On the destination side, a `X-Amz-Replication-Status` status of `REPLICA` indicates that the object was replicated successfully. Any replication failures are automatically re-attempted during a periodic disk scanner cycle. +### Interaction with extended Bucket Versioning configuration +When Bucket Versioning with excluded prefixes are configured objects matching these prefixes are excluded from being versioned and replicated. + +``` + + Enabled + true + + + app1-jobs/*/_temporary/ + + + app2-jobs/*/_magic/ + + + + +``` +In the above sample config, objects under prefixes matching any of the `ExcludedPrefixes` glob patterns will neither be versioned nor replicated. + ## Explore Further - [MinIO Bucket Replication Design](https://github.com/minio/minio/blob/master/docs/bucket/replication/DESIGN.md) diff --git a/docs/bucket/versioning/README.md b/docs/bucket/versioning/README.md index 315b09a41..7c1aa44d2 100644 --- a/docs/bucket/versioning/README.md +++ b/docs/bucket/versioning/README.md @@ -63,6 +63,31 @@ Similarly to suspend versioning set the configuration with Status set to `Suspen ``` +## MinIO extension to Bucket Versioning +### Motivation +Spark/Hadoop workloads which use Hadoop MR Committer v1/v2 algorithm upload objects to a temporary prefix in a bucket. These objects are 'renamed' to a different prefix on Job commit. Object storage admins are forced to configure separate ILM policies to expire these objects and their versions to reclaim space. + +### Solution +To exclude objects under a list of prefix (glob) patterns from being versioned, you can send the following versioning configuration with Status set to `Enabled`. + +``` + + Enabled + true + + + app1-jobs/*/_temporary/ + + + app2-jobs/*/_magic/ + + + + +``` + +Note: Objects matching these prefixes will behave as though versioning were suspended. These objects **will not** be replicated if replication is configured. + Only users with explicit permissions or the root credential can configure the versioning state of any bucket. ## Examples of enabling bucket versioning using MinIO Java SDK diff --git a/internal/bucket/versioning/versioning.go b/internal/bucket/versioning/versioning.go index 2141afb2b..4069ab233 100644 --- a/internal/bucket/versioning/versioning.go +++ b/internal/bucket/versioning/versioning.go @@ -20,6 +20,9 @@ package versioning import ( "encoding/xml" "io" + "strings" + + "github.com/minio/pkg/wildcard" ) // State - enabled/disabled/suspended states @@ -33,12 +36,27 @@ const ( Suspended State = "Suspended" ) +var ( + errExcludedPrefixNotSupported = Errorf("excluded prefixes extension supported only when versioning is enabled") + errTooManyExcludedPrefixes = Errorf("too many excluded prefixes") + errInvalidPrefixPattern = Errorf("invalid prefix pattern") +) + +// ExcludedPrefix - holds individual prefixes excluded from being versioned. +type ExcludedPrefix struct { + Prefix string +} + // Versioning - Configuration for bucket versioning. type Versioning struct { XMLNS string `xml:"xmlns,attr,omitempty"` XMLName xml.Name `xml:"VersioningConfiguration"` // MFADelete State `xml:"MFADelete,omitempty"` // not supported yet. Status State `xml:"Status,omitempty"` + // MinIO extension - allows selective, prefix-level versioning exclusion. + // Requires versioning to be enabled + ExcludedPrefixes []ExcludedPrefix `xml:",omitempty"` + ExcludeFolders bool `xml:",omitempty"` } // Validate - validates the versioning configuration @@ -50,7 +68,21 @@ func (v Versioning) Validate() error { // return Errorf("unsupported MFADelete state %s", v.MFADelete) // } switch v.Status { - case Enabled, Suspended: + case Enabled: + const maxExcludedPrefixes = 10 + if len(v.ExcludedPrefixes) > maxExcludedPrefixes { + return errTooManyExcludedPrefixes + } + for _, sprefix := range v.ExcludedPrefixes { + if !strings.HasSuffix(sprefix.Prefix, "/") { + return errInvalidPrefixPattern + } + } + + case Suspended: + if len(v.ExcludedPrefixes) > 0 { + return errExcludedPrefixNotSupported + } default: return Errorf("unsupported Versioning status %s", v.Status) } @@ -62,11 +94,67 @@ func (v Versioning) Enabled() bool { return v.Status == Enabled } +// PrefixEnabled - returns true if versioning is enabled at the bucket and given +// prefix, false otherwise. +func (v Versioning) PrefixEnabled(prefix string) bool { + if v.Status != Enabled { + return false + } + + if prefix == "" { + return true + } + if v.ExcludeFolders && strings.HasSuffix(prefix, "/") { + return false + } + + for _, sprefix := range v.ExcludedPrefixes { + // Note: all excluded prefix patterns end with `/` (See Validate) + sprefix.Prefix += "*" + + if matched := wildcard.MatchSimple(sprefix.Prefix, prefix); matched { + return false + } + } + return true +} + // Suspended - returns true if versioning is suspended func (v Versioning) Suspended() bool { return v.Status == Suspended } +// PrefixSuspended - returns true if versioning is suspended at the bucket level +// or suspended on the given prefix. +func (v Versioning) PrefixSuspended(prefix string) bool { + if v.Status == Suspended { + return true + } + if v.Status == Enabled { + if prefix == "" { + return false + } + if v.ExcludeFolders && strings.HasSuffix(prefix, "/") { + return true + } + + for _, sprefix := range v.ExcludedPrefixes { + // Note: all excluded prefix patterns end with `/` (See Validate) + sprefix.Prefix += "*" + if matched := wildcard.MatchSimple(sprefix.Prefix, prefix); matched { + return true + } + } + } + return false +} + +// PrefixesExcluded returns true if v contains one or more excluded object +// prefixes or if ExcludeFolders is true. +func (v Versioning) PrefixesExcluded() bool { + return len(v.ExcludedPrefixes) > 0 || v.ExcludeFolders +} + // ParseConfig - parses data in given reader to VersioningConfiguration. func ParseConfig(reader io.Reader) (*Versioning, error) { var v Versioning diff --git a/internal/bucket/versioning/versioning_test.go b/internal/bucket/versioning/versioning_test.go new file mode 100644 index 000000000..8f34bbb29 --- /dev/null +++ b/internal/bucket/versioning/versioning_test.go @@ -0,0 +1,250 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package versioning + +import ( + "encoding/xml" + "strings" + "testing" +) + +func TestParseConfig(t *testing.T) { + testcases := []struct { + input string + err error + excludedPrefixes []string + excludeFolders bool + }{ + { + input: ` + Enabled + `, + err: nil, + }, + { + input: ` + Enabled + + path/to/my/workload/_staging/ + + + path/to/my/workload/_temporary/ + + `, + err: nil, + excludedPrefixes: []string{"path/to/my/workload/_staging/", "path/to/my/workload/_temporary/"}, + }, + { + input: ` + Suspended + + path/to/my/workload/_staging + + `, + err: errExcludedPrefixNotSupported, + }, + { + input: ` + Enabled + + path/to/my/workload/_staging/ab/ + + + path/to/my/workload/_staging/cd/ + + + path/to/my/workload/_staging/ef/ + + + path/to/my/workload/_staging/gh/ + + + path/to/my/workload/_staging/ij/ + + + path/to/my/workload/_staging/kl/ + + + path/to/my/workload/_staging/mn/ + + + path/to/my/workload/_staging/op/ + + + path/to/my/workload/_staging/qr/ + + + path/to/my/workload/_staging/st/ + + + path/to/my/workload/_staging/uv/ + + `, + err: errTooManyExcludedPrefixes, + }, + { + input: ` + Enabled + true + + path/to/my/workload/_staging/ + + + path/to/my/workload/_temporary/ + + `, + err: nil, + excludedPrefixes: []string{"path/to/my/workload/_staging/", "path/to/my/workload/_temporary/"}, + excludeFolders: true, + }, + { + input: ` + Enabled + + path/to/my/workload/_staging + + `, + err: errInvalidPrefixPattern, + }, + } + + for i, tc := range testcases { + var v *Versioning + var err error + v, err = ParseConfig(strings.NewReader(tc.input)) + if tc.err != err { + t.Fatalf("Test %d: expected %v but got %v", i+1, tc.err, err) + } + if err != nil { + if tc.err == nil { + t.Fatalf("Test %d: failed due to %v", i+1, err) + } + } else { + if err := v.Validate(); tc.err != err { + t.Fatalf("Test %d: validation failed due to %v", i+1, err) + } + if len(tc.excludedPrefixes) > 0 { + var mismatch bool + if len(v.ExcludedPrefixes) != len(tc.excludedPrefixes) { + t.Fatalf("Test %d: Expected length of excluded prefixes %d but got %d", i+1, len(tc.excludedPrefixes), len(v.ExcludedPrefixes)) + } + var i int + var eprefix string + for i, eprefix = range tc.excludedPrefixes { + if eprefix != v.ExcludedPrefixes[i].Prefix { + mismatch = true + break + } + } + if mismatch { + t.Fatalf("Test %d: Expected excluded prefix %s but got %s", i+1, tc.excludedPrefixes[i], v.ExcludedPrefixes[i].Prefix) + } + } + if tc.excludeFolders != v.ExcludeFolders { + t.Fatalf("Test %d: Expected ExcludeFoldersr=%v but got %v", i+1, tc.excludeFolders, v.ExcludeFolders) + } + } + } +} + +func TestMarshalXML(t *testing.T) { + // Validates if Versioning with no excluded prefixes omits + // ExcludedPrefixes tags + v := Versioning{ + Status: Enabled, + } + buf, err := xml.Marshal(v) + if err != nil { + t.Fatalf("Failed to marshal %v: %v", v, err) + } + + str := string(buf) + if strings.Contains(str, "ExcludedPrefixes") { + t.Fatalf("XML shouldn't contain ExcludedPrefixes tag - %s", str) + } +} + +func TestVersioningZero(t *testing.T) { + var v Versioning + if v.Enabled() { + t.Fatalf("Expected to be disabled but got enabled") + } + if v.Suspended() { + t.Fatalf("Expected to be disabled but got suspended") + } +} + +func TestExcludeFolders(t *testing.T) { + v := Versioning{ + Status: Enabled, + ExcludeFolders: true, + } + testPrefixes := []string{"jobs/output/_temporary/", "jobs/output/", "jobs/"} + for i, prefix := range testPrefixes { + if v.PrefixEnabled(prefix) || !v.PrefixSuspended(prefix) { + t.Fatalf("Test %d: Expected versioning to be excluded for %s", i+1, prefix) + } + } + + // Test applicability for regular objects + if prefix := "prefix-1/obj-1"; !v.PrefixEnabled(prefix) || v.PrefixSuspended(prefix) { + t.Fatalf("Expected versioning to be enabled for %s", prefix) + } + + // Test when ExcludeFolders is disabled + v.ExcludeFolders = false + for i, prefix := range testPrefixes { + if !v.PrefixEnabled(prefix) || v.PrefixSuspended(prefix) { + t.Fatalf("Test %d: Expected versioning to be enabled for %s", i+1, prefix) + } + } +} + +func TestExcludedPrefixesMatch(t *testing.T) { + v := Versioning{ + Status: Enabled, + ExcludedPrefixes: []ExcludedPrefix{{"*/_temporary/"}}, + } + + if err := v.Validate(); err != nil { + t.Fatalf("Invalid test versioning config %v: %v", v, err) + } + tests := []struct { + prefix string + excluded bool + }{ + { + prefix: "app1-jobs/output/_temporary/attempt1/data.csv", + excluded: true, + }, + { + prefix: "app1-jobs/output/final/attempt1/data.csv", + excluded: false, + }, + } + + for i, test := range tests { + if v.PrefixSuspended(test.prefix) != test.excluded { + if test.excluded { + t.Fatalf("Test %d: Expected prefix %s to be excluded from versioning", i+1, test.prefix) + } else { + t.Fatalf("Test %d: Expected prefix %s to have versioning enabled", i+1, test.prefix) + } + } + } +}