diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index b95834817..eb4642371 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -471,9 +471,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, hasLockEnabled = true } - versioned := globalBucketVersioningSys.Enabled(bucket) - suspended := globalBucketVersioningSys.Suspended(bucket) - type deleteResult struct { delInfo DeletedObject errInfo DeleteError @@ -481,8 +478,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, deleteResults := make([]deleteResult, len(deleteObjectsReq.Objects)) + vc, _ := globalBucketVersioningSys.Get(bucket) oss := make([]*objSweeper, len(deleteObjectsReq.Objects)) - for index, object := range deleteObjectsReq.Objects { if apiErrCode := checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, object.ObjectName); apiErrCode != ErrNone { if apiErrCode == ErrSignatureDoesNotMatch || apiErrCode == ErrInvalidAccessKeyID { @@ -514,8 +511,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, opts := ObjectOptions{ VersionID: object.VersionID, - Versioned: versioned, - VersionSuspended: suspended, + Versioned: vc.PrefixEnabled(object.ObjectName), + VersionSuspended: vc.PrefixSuspended(object.ObjectName), } if replicateDeletes || object.VersionID != "" && hasLockEnabled || !globalTierConfigMgr.Empty() { @@ -526,7 +523,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, } if !globalTierConfigMgr.Empty() { - oss[index] = newObjSweeper(bucket, object.ObjectName).WithVersion(opts.VersionID).WithVersioning(versioned, suspended) + oss[index] = newObjSweeper(bucket, object.ObjectName).WithVersion(opts.VersionID).WithVersioning(opts.Versioned, opts.VersionSuspended) oss[index].SetTransitionState(goi.TransitionedObject) } @@ -581,8 +578,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, deleteList := toNames(objectsToDelete) dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{ - Versioned: versioned, - VersionSuspended: suspended, + PrefixEnabledFn: vc.PrefixEnabled, + PrefixSuspendedFn: vc.PrefixSuspended, }) for i := range errs { diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 4a8ec93cd..1b8c83ecc 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -330,7 +330,7 @@ const ( // 2. when a transitioned object expires (based on an ILM rule). func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *ObjectInfo, lcOpts lifecycle.ObjectOpts, action expireAction) error { var opts ObjectOptions - opts.Versioned = globalBucketVersioningSys.Enabled(oi.Bucket) + opts.Versioned = globalBucketVersioningSys.PrefixEnabled(oi.Bucket, oi.Name) opts.VersionID = lcOpts.VersionID opts.Expiration = ExpirationOptions{Expire: true} switch action { @@ -415,8 +415,8 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo) ETag: oi.ETag, }, VersionID: oi.VersionID, - Versioned: globalBucketVersioningSys.Enabled(oi.Bucket), - VersionSuspended: globalBucketVersioningSys.Suspended(oi.Bucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(oi.Bucket, oi.Name), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name), MTime: oi.ModTime, } return tier, objectAPI.TransitionObject(ctx, oi.Bucket, oi.Name, opts) @@ -575,8 +575,8 @@ func (r *RestoreObjectRequest) validate(ctx context.Context, objAPI ObjectLayer) // postRestoreOpts returns ObjectOptions with version-id from the POST restore object request for a given bucket and object. func postRestoreOpts(ctx context.Context, r *http.Request, bucket, object string) (opts ObjectOptions, err error) { - versioned := globalBucketVersioningSys.Enabled(bucket) - versionSuspended := globalBucketVersioningSys.Suspended(bucket) + versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) + versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) vid := strings.TrimSpace(r.Form.Get(xhttp.VersionID)) if vid != "" && vid != nullVersionID { _, err := uuid.Parse(vid) @@ -627,8 +627,8 @@ func putRestoreOpts(bucket, object string, rreq *RestoreObjectRequest, objInfo O meta[xhttp.AmzServerSideEncryption] = xhttp.AmzEncryptionAES } return ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(bucket), - VersionSuspended: globalBucketVersioningSys.Suspended(bucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(bucket, object), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(bucket, object), UserDefined: meta, } } @@ -640,8 +640,8 @@ func putRestoreOpts(bucket, object string, rreq *RestoreObjectRequest, objInfo O } return ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(bucket), - VersionSuspended: globalBucketVersioningSys.Suspended(bucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(bucket, object), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(bucket, object), UserDefined: meta, VersionID: objInfo.VersionID, MTime: objInfo.ModTime, diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index c5e72a896..125d6a2fd 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -196,7 +196,7 @@ func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) { func (sys *BucketMetadataSys) GetVersioningConfig(bucket string) (*versioning.Versioning, error) { meta, err := sys.GetConfig(GlobalContext, bucket) if err != nil { - return nil, err + return &versioning.Versioning{}, err } return meta.versioningConfig, nil } diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index 79e359e02..1a94c1029 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -509,7 +509,7 @@ func getHealReplicateObjectInfo(objInfo ObjectInfo, rcfg replicationConfig) Repl ObjectName: oi.Name, VersionID: oi.VersionID, }, - }, oi, ObjectOptions{}, nil) + }, oi, ObjectOptions{VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name)}, nil) } else { dsc = mustReplicate(GlobalContext, oi.Bucket, oi.Name, getMustReplicateOptions(ObjectInfo{ UserDefined: oi.UserDefined, diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 57cd23850..33c2a6405 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -180,6 +180,11 @@ func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplica return } + // Disable server-side replication on object prefixes which are excluded + // from versioning via the MinIO bucket versioning extension. + if globalBucketVersioningSys.PrefixSuspended(bucket, object) { + return + } replStatus := mopts.ReplicationStatus() if replStatus == replication.Replica && !mopts.isMetadataReplication() { return @@ -264,6 +269,11 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet if delOpts.ReplicationRequest { return } + // Skip replication if this object's prefix is excluded from being + // versioned. + if delOpts.VersionSuspended { + return + } opts := replication.ObjectOpts{ Name: dobj.ObjectName, SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined), @@ -457,8 +467,8 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj VersionID: versionID, MTime: dobj.DeleteMarkerMTime.Time, DeleteReplication: drs, - Versioned: globalBucketVersioningSys.Enabled(bucket), - VersionSuspended: globalBucketVersioningSys.Suspended(bucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(bucket, dobj.ObjectName), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(bucket, dobj.ObjectName), }) if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on. logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err)) diff --git a/cmd/bucket-versioning-handler.go b/cmd/bucket-versioning-handler.go index d121f41d3..746bad119 100644 --- a/cmd/bucket-versioning-handler.go +++ b/cmd/bucket-versioning-handler.go @@ -72,7 +72,7 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r return } - if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled && v.Suspended() { + if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled && (v.Suspended() || v.PrefixesExcluded()) { writeErrorResponse(ctx, w, APIError{ Code: "InvalidBucketState", Description: "An Object Lock configuration is present on this bucket, so the versioning state cannot be changed.", diff --git a/cmd/bucket-versioning.go b/cmd/bucket-versioning.go index 33078af93..370f41e06 100644 --- a/cmd/bucket-versioning.go +++ b/cmd/bucket-versioning.go @@ -31,6 +31,17 @@ func (sys *BucketVersioningSys) Enabled(bucket string) bool { return vc.Enabled() } +// PrefixEnabled returns true is versioning is enabled at bucket level and if +// the given prefix doesn't match any excluded prefixes pattern. This is +// part of a MinIO versioning configuration extension. +func (sys *BucketVersioningSys) PrefixEnabled(bucket, prefix string) bool { + vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + if err != nil { + return false + } + return vc.PrefixEnabled(prefix) +} + // Suspended suspended versioning? func (sys *BucketVersioningSys) Suspended(bucket string) bool { vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) @@ -40,15 +51,19 @@ func (sys *BucketVersioningSys) Suspended(bucket string) bool { return vc.Suspended() } +// PrefixSuspended returns true if the given prefix matches an excluded prefix +// pattern. This is part of a MinIO versioning configuration extension. +func (sys *BucketVersioningSys) PrefixSuspended(bucket, prefix string) bool { + vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + if err != nil { + return false + } + + return vc.PrefixSuspended(prefix) +} + // Get returns stored bucket policy func (sys *BucketVersioningSys) Get(bucket string) (*versioning.Versioning, error) { - if globalIsGateway { - objAPI := newObjectLayerFn() - if objAPI == nil { - return nil, errServerNotInitialized - } - return nil, NotImplemented{} - } return globalBucketMetadataSys.GetVersioningConfig(bucket) } diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index fad4142c3..33c2872cd 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1184,7 +1184,7 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay opts.VersionID = obj.VersionID } if opts.VersionID == "" { - opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket) + opts.Versioned = globalBucketVersioningSys.PrefixEnabled(obj.Bucket, obj.Name) } obj, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts) @@ -1243,6 +1243,9 @@ func (i *scannerItem) objectPath() string { // healReplication will heal a scanned item that has failed replication. func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) { roi := getHealReplicateObjectInfo(oi, i.replication) + if !roi.Dsc.ReplicateAny() { + return + } if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() { // heal delete marker replication failure or versioned delete replication failure diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 85e07b75a..33c8dd1b4 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -107,7 +107,7 @@ func (fi FileInfo) IsValid() bool { func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { object = decodeDirObject(object) versionID := fi.VersionID - if (globalBucketVersioningSys.Enabled(bucket) || globalBucketVersioningSys.Suspended(bucket)) && versionID == "" { + if (globalBucketVersioningSys.PrefixEnabled(bucket, object) || globalBucketVersioningSys.PrefixSuspended(bucket, object)) && versionID == "" { versionID = nullVersionID } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 7b5f6b51d..58bd8b290 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1187,7 +1187,16 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec // VersionID is not set means delete is not specific about // any version, look for if the bucket is versioned or not. if objects[i].VersionID == "" { - if opts.Versioned || opts.VersionSuspended { + // MinIO extension to bucket version configuration + suspended := opts.VersionSuspended + if opts.PrefixSuspendedFn != nil { + suspended = opts.PrefixSuspendedFn(objects[i].ObjectName) + } + versioned := opts.Versioned + if opts.PrefixEnabledFn != nil { + versioned = opts.PrefixEnabledFn(objects[i].ObjectName) + } + if versioned || suspended { // Bucket is versioned and no version was explicitly // mentioned for deletes, create a delete marker instead. vr.ModTime = UTCNow() diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 11c738d1e..a93b14d1a 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -619,7 +619,6 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool parallelWorkers := make(chan struct{}, workerSize) - versioned := globalBucketVersioningSys.Enabled(bName) for _, set := range pool.sets { set := set disks := set.getOnlineDisks() @@ -629,6 +628,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool continue } + vc, _ := globalBucketVersioningSys.Get(bName) decommissionEntry := func(entry metaCacheEntry) { defer func() { <-parallelWorkers @@ -667,7 +667,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool bName, version.Name, ObjectOptions{ - Versioned: versioned, + Versioned: vc.PrefixEnabled(version.Name), VersionID: version.VersionID, MTime: version.ModTime, DeleteReplication: version.ReplicationState, diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 9dfa8ccb4..991ab5070 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -76,6 +76,9 @@ type ObjectOptions struct { // Mutate set to 'true' if the call is namespace mutation call Mutate bool WalkAscending bool // return Walk results in ascending order of versions + + PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix + PrefixSuspendedFn func(prefix string) bool // function which returns true if versioning is suspended on prefix } // ExpirationOptions represents object options for object expiration at objectLayer. diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index a66abbfbc..a808c44df 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -70,7 +70,7 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te md5Bytes := md5.Sum([]byte(object.content)) _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), UserDefined: object.meta, }) if err != nil { @@ -78,7 +78,7 @@ func testListObjectsVersionedFolders(obj ObjectLayer, instanceType string, t1 Te } if object.addDeleteMarker { oi, err := obj.DeleteObject(context.Background(), object.parentBucket, object.name, ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), }) if err != nil { t.Fatalf("%s : %s", instanceType, err.Error()) @@ -380,7 +380,7 @@ func _testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler, v _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), UserDefined: object.meta, }) if err != nil { @@ -1065,16 +1065,16 @@ func testDeleteObjectVersion(obj ObjectLayer, instanceType string, t1 TestErrHan _, err := obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), - VersionSuspended: globalBucketVersioningSys.Suspended(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(object.parentBucket, object.name), UserDefined: object.meta, }) if err != nil { t.Fatalf("%s : %s", instanceType, err) } obj, err := obj.DeleteObject(context.Background(), object.parentBucket, object.name, ObjectOptions{ - Versioned: globalBucketVersioningSys.Enabled(object.parentBucket), - VersionSuspended: globalBucketVersioningSys.Suspended(object.parentBucket), + Versioned: globalBucketVersioningSys.PrefixEnabled(object.parentBucket, object.name), + VersionSuspended: globalBucketVersioningSys.PrefixSuspended(object.parentBucket, object.name), VersionID: object.versionID, }) if err != nil { diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index f7da0f29b..5fb75fdfc 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -161,13 +161,12 @@ func getOpts(ctx context.Context, r *http.Request, bucket, object string) (Objec } func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts ObjectOptions, err error) { - versioned := globalBucketVersioningSys.Enabled(bucket) opts, err = getOpts(ctx, r, bucket, object) if err != nil { return opts, err } - opts.Versioned = versioned - opts.VersionSuspended = globalBucketVersioningSys.Suspended(bucket) + opts.Versioned = globalBucketVersioningSys.PrefixEnabled(bucket, object) + opts.VersionSuspended = globalBucketVersioningSys.PrefixSuspended(bucket, object) delMarker := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceDeleteMarker)) if delMarker != "" { switch delMarker { @@ -203,8 +202,9 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts // get ObjectOptions for PUT calls from encryption headers and metadata func putOpts(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) { - versioned := globalBucketVersioningSys.Enabled(bucket) - versionSuspended := globalBucketVersioningSys.Suspended(bucket) + versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) + versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) + vid := strings.TrimSpace(r.Form.Get(xhttp.VersionID)) if vid != "" && vid != nullVersionID { _, err := uuid.Parse(vid) diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index f1444f26e..8b503e84f 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -266,8 +266,6 @@ func setPutObjHeaders(w http.ResponseWriter, objInfo ObjectInfo, delete bool) { } func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toDel []ObjectToDelete) { - versioned := globalBucketVersioningSys.Enabled(bucket) - versionSuspended := globalBucketVersioningSys.Suspended(bucket) for remaining := toDel; len(remaining) > 0; toDel = remaining { if len(toDel) > maxDeleteList { remaining = toDel[maxDeleteList:] @@ -275,9 +273,10 @@ func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toD } else { remaining = nil } + vc, _ := globalBucketVersioningSys.Get(bucket) deletedObjs, errs := o.DeleteObjects(ctx, bucket, toDel, ObjectOptions{ - Versioned: versioned, - VersionSuspended: versionSuspended, + PrefixEnabledFn: vc.PrefixEnabled, + PrefixSuspendedFn: vc.PrefixSuspended, }) var logged bool for i, err := range errs { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 2c4e73375..d46146d90 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -204,7 +204,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r objInfo, err := getObjectInfo(ctx, bucket, object, opts) if err != nil { - if globalBucketVersioningSys.Enabled(bucket) { + if globalBucketVersioningSys.PrefixEnabled(bucket, object) { // Versioning enabled quite possibly object is deleted might be delete-marker // if present set the headers, no idea why AWS S3 sets these headers. if objInfo.VersionID != "" && objInfo.DeleteMarker { @@ -445,7 +445,7 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj writeErrorResponse(ctx, w, toAPIError(ctx, proxy.Err), r.URL) return } - if globalBucketVersioningSys.Enabled(bucket) && gr != nil { + if globalBucketVersioningSys.PrefixEnabled(bucket, object) && gr != nil { if !gr.ObjInfo.VersionPurgeStatus.Empty() { // Shows the replication status of a permanent delete of a version w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(gr.ObjInfo.VersionPurgeStatus)} @@ -673,7 +673,7 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob } } if !proxy.Proxy { - if globalBucketVersioningSys.Enabled(bucket) { + if globalBucketVersioningSys.PrefixEnabled(bucket, object) { switch { case !objInfo.VersionPurgeStatus.Empty(): w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(objInfo.VersionPurgeStatus)} @@ -1087,7 +1087,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re if isErrPreconditionFailed(err) { return } - if globalBucketVersioningSys.Enabled(srcBucket) && gr != nil { + if globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) && gr != nil { // Versioning enabled quite possibly object is deleted might be delete-marker // if present set the headers, no idea why AWS S3 sets these headers. if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker { @@ -2480,7 +2480,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt if isErrPreconditionFailed(err) { return } - if globalBucketVersioningSys.Enabled(srcBucket) && gr != nil { + if globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) && gr != nil { // Versioning enabled quite possibly object is deleted might be delete-marker // if present set the headers, no idea why AWS S3 sets these headers. if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker { @@ -3211,8 +3211,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite w.Write(encodedErrorResponse) } - versioned := globalBucketVersioningSys.Enabled(bucket) - suspended := globalBucketVersioningSys.Suspended(bucket) + versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) + suspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) os := newObjSweeper(bucket, object).WithVersioning(versioned, suspended) if !globalTierConfigMgr.Empty() { // Get appropriate object info to identify the remote object to delete diff --git a/docs/bucket/replication/README.md b/docs/bucket/replication/README.md index 142d4ff78..8534f22c1 100644 --- a/docs/bucket/replication/README.md +++ b/docs/bucket/replication/README.md @@ -243,6 +243,26 @@ Replication from a source bucket to multiple destination buckets is supported. F Note that on the source side, the `X-Amz-Replication-Status` changes from `PENDING` to `COMPLETED` after replication succeeds to each of the targets. On the destination side, a `X-Amz-Replication-Status` status of `REPLICA` indicates that the object was replicated successfully. Any replication failures are automatically re-attempted during a periodic disk scanner cycle. +### Interaction with extended Bucket Versioning configuration +When Bucket Versioning with excluded prefixes are configured objects matching these prefixes are excluded from being versioned and replicated. + +``` + + Enabled + true + + + app1-jobs/*/_temporary/ + + + app2-jobs/*/_magic/ + + + + +``` +In the above sample config, objects under prefixes matching any of the `ExcludedPrefixes` glob patterns will neither be versioned nor replicated. + ## Explore Further - [MinIO Bucket Replication Design](https://github.com/minio/minio/blob/master/docs/bucket/replication/DESIGN.md) diff --git a/docs/bucket/versioning/README.md b/docs/bucket/versioning/README.md index 315b09a41..7c1aa44d2 100644 --- a/docs/bucket/versioning/README.md +++ b/docs/bucket/versioning/README.md @@ -63,6 +63,31 @@ Similarly to suspend versioning set the configuration with Status set to `Suspen ``` +## MinIO extension to Bucket Versioning +### Motivation +Spark/Hadoop workloads which use Hadoop MR Committer v1/v2 algorithm upload objects to a temporary prefix in a bucket. These objects are 'renamed' to a different prefix on Job commit. Object storage admins are forced to configure separate ILM policies to expire these objects and their versions to reclaim space. + +### Solution +To exclude objects under a list of prefix (glob) patterns from being versioned, you can send the following versioning configuration with Status set to `Enabled`. + +``` + + Enabled + true + + + app1-jobs/*/_temporary/ + + + app2-jobs/*/_magic/ + + + + +``` + +Note: Objects matching these prefixes will behave as though versioning were suspended. These objects **will not** be replicated if replication is configured. + Only users with explicit permissions or the root credential can configure the versioning state of any bucket. ## Examples of enabling bucket versioning using MinIO Java SDK diff --git a/internal/bucket/versioning/versioning.go b/internal/bucket/versioning/versioning.go index 2141afb2b..4069ab233 100644 --- a/internal/bucket/versioning/versioning.go +++ b/internal/bucket/versioning/versioning.go @@ -20,6 +20,9 @@ package versioning import ( "encoding/xml" "io" + "strings" + + "github.com/minio/pkg/wildcard" ) // State - enabled/disabled/suspended states @@ -33,12 +36,27 @@ const ( Suspended State = "Suspended" ) +var ( + errExcludedPrefixNotSupported = Errorf("excluded prefixes extension supported only when versioning is enabled") + errTooManyExcludedPrefixes = Errorf("too many excluded prefixes") + errInvalidPrefixPattern = Errorf("invalid prefix pattern") +) + +// ExcludedPrefix - holds individual prefixes excluded from being versioned. +type ExcludedPrefix struct { + Prefix string +} + // Versioning - Configuration for bucket versioning. type Versioning struct { XMLNS string `xml:"xmlns,attr,omitempty"` XMLName xml.Name `xml:"VersioningConfiguration"` // MFADelete State `xml:"MFADelete,omitempty"` // not supported yet. Status State `xml:"Status,omitempty"` + // MinIO extension - allows selective, prefix-level versioning exclusion. + // Requires versioning to be enabled + ExcludedPrefixes []ExcludedPrefix `xml:",omitempty"` + ExcludeFolders bool `xml:",omitempty"` } // Validate - validates the versioning configuration @@ -50,7 +68,21 @@ func (v Versioning) Validate() error { // return Errorf("unsupported MFADelete state %s", v.MFADelete) // } switch v.Status { - case Enabled, Suspended: + case Enabled: + const maxExcludedPrefixes = 10 + if len(v.ExcludedPrefixes) > maxExcludedPrefixes { + return errTooManyExcludedPrefixes + } + for _, sprefix := range v.ExcludedPrefixes { + if !strings.HasSuffix(sprefix.Prefix, "/") { + return errInvalidPrefixPattern + } + } + + case Suspended: + if len(v.ExcludedPrefixes) > 0 { + return errExcludedPrefixNotSupported + } default: return Errorf("unsupported Versioning status %s", v.Status) } @@ -62,11 +94,67 @@ func (v Versioning) Enabled() bool { return v.Status == Enabled } +// PrefixEnabled - returns true if versioning is enabled at the bucket and given +// prefix, false otherwise. +func (v Versioning) PrefixEnabled(prefix string) bool { + if v.Status != Enabled { + return false + } + + if prefix == "" { + return true + } + if v.ExcludeFolders && strings.HasSuffix(prefix, "/") { + return false + } + + for _, sprefix := range v.ExcludedPrefixes { + // Note: all excluded prefix patterns end with `/` (See Validate) + sprefix.Prefix += "*" + + if matched := wildcard.MatchSimple(sprefix.Prefix, prefix); matched { + return false + } + } + return true +} + // Suspended - returns true if versioning is suspended func (v Versioning) Suspended() bool { return v.Status == Suspended } +// PrefixSuspended - returns true if versioning is suspended at the bucket level +// or suspended on the given prefix. +func (v Versioning) PrefixSuspended(prefix string) bool { + if v.Status == Suspended { + return true + } + if v.Status == Enabled { + if prefix == "" { + return false + } + if v.ExcludeFolders && strings.HasSuffix(prefix, "/") { + return true + } + + for _, sprefix := range v.ExcludedPrefixes { + // Note: all excluded prefix patterns end with `/` (See Validate) + sprefix.Prefix += "*" + if matched := wildcard.MatchSimple(sprefix.Prefix, prefix); matched { + return true + } + } + } + return false +} + +// PrefixesExcluded returns true if v contains one or more excluded object +// prefixes or if ExcludeFolders is true. +func (v Versioning) PrefixesExcluded() bool { + return len(v.ExcludedPrefixes) > 0 || v.ExcludeFolders +} + // ParseConfig - parses data in given reader to VersioningConfiguration. func ParseConfig(reader io.Reader) (*Versioning, error) { var v Versioning diff --git a/internal/bucket/versioning/versioning_test.go b/internal/bucket/versioning/versioning_test.go new file mode 100644 index 000000000..8f34bbb29 --- /dev/null +++ b/internal/bucket/versioning/versioning_test.go @@ -0,0 +1,250 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package versioning + +import ( + "encoding/xml" + "strings" + "testing" +) + +func TestParseConfig(t *testing.T) { + testcases := []struct { + input string + err error + excludedPrefixes []string + excludeFolders bool + }{ + { + input: ` + Enabled + `, + err: nil, + }, + { + input: ` + Enabled + + path/to/my/workload/_staging/ + + + path/to/my/workload/_temporary/ + + `, + err: nil, + excludedPrefixes: []string{"path/to/my/workload/_staging/", "path/to/my/workload/_temporary/"}, + }, + { + input: ` + Suspended + + path/to/my/workload/_staging + + `, + err: errExcludedPrefixNotSupported, + }, + { + input: ` + Enabled + + path/to/my/workload/_staging/ab/ + + + path/to/my/workload/_staging/cd/ + + + path/to/my/workload/_staging/ef/ + + + path/to/my/workload/_staging/gh/ + + + path/to/my/workload/_staging/ij/ + + + path/to/my/workload/_staging/kl/ + + + path/to/my/workload/_staging/mn/ + + + path/to/my/workload/_staging/op/ + + + path/to/my/workload/_staging/qr/ + + + path/to/my/workload/_staging/st/ + + + path/to/my/workload/_staging/uv/ + + `, + err: errTooManyExcludedPrefixes, + }, + { + input: ` + Enabled + true + + path/to/my/workload/_staging/ + + + path/to/my/workload/_temporary/ + + `, + err: nil, + excludedPrefixes: []string{"path/to/my/workload/_staging/", "path/to/my/workload/_temporary/"}, + excludeFolders: true, + }, + { + input: ` + Enabled + + path/to/my/workload/_staging + + `, + err: errInvalidPrefixPattern, + }, + } + + for i, tc := range testcases { + var v *Versioning + var err error + v, err = ParseConfig(strings.NewReader(tc.input)) + if tc.err != err { + t.Fatalf("Test %d: expected %v but got %v", i+1, tc.err, err) + } + if err != nil { + if tc.err == nil { + t.Fatalf("Test %d: failed due to %v", i+1, err) + } + } else { + if err := v.Validate(); tc.err != err { + t.Fatalf("Test %d: validation failed due to %v", i+1, err) + } + if len(tc.excludedPrefixes) > 0 { + var mismatch bool + if len(v.ExcludedPrefixes) != len(tc.excludedPrefixes) { + t.Fatalf("Test %d: Expected length of excluded prefixes %d but got %d", i+1, len(tc.excludedPrefixes), len(v.ExcludedPrefixes)) + } + var i int + var eprefix string + for i, eprefix = range tc.excludedPrefixes { + if eprefix != v.ExcludedPrefixes[i].Prefix { + mismatch = true + break + } + } + if mismatch { + t.Fatalf("Test %d: Expected excluded prefix %s but got %s", i+1, tc.excludedPrefixes[i], v.ExcludedPrefixes[i].Prefix) + } + } + if tc.excludeFolders != v.ExcludeFolders { + t.Fatalf("Test %d: Expected ExcludeFoldersr=%v but got %v", i+1, tc.excludeFolders, v.ExcludeFolders) + } + } + } +} + +func TestMarshalXML(t *testing.T) { + // Validates if Versioning with no excluded prefixes omits + // ExcludedPrefixes tags + v := Versioning{ + Status: Enabled, + } + buf, err := xml.Marshal(v) + if err != nil { + t.Fatalf("Failed to marshal %v: %v", v, err) + } + + str := string(buf) + if strings.Contains(str, "ExcludedPrefixes") { + t.Fatalf("XML shouldn't contain ExcludedPrefixes tag - %s", str) + } +} + +func TestVersioningZero(t *testing.T) { + var v Versioning + if v.Enabled() { + t.Fatalf("Expected to be disabled but got enabled") + } + if v.Suspended() { + t.Fatalf("Expected to be disabled but got suspended") + } +} + +func TestExcludeFolders(t *testing.T) { + v := Versioning{ + Status: Enabled, + ExcludeFolders: true, + } + testPrefixes := []string{"jobs/output/_temporary/", "jobs/output/", "jobs/"} + for i, prefix := range testPrefixes { + if v.PrefixEnabled(prefix) || !v.PrefixSuspended(prefix) { + t.Fatalf("Test %d: Expected versioning to be excluded for %s", i+1, prefix) + } + } + + // Test applicability for regular objects + if prefix := "prefix-1/obj-1"; !v.PrefixEnabled(prefix) || v.PrefixSuspended(prefix) { + t.Fatalf("Expected versioning to be enabled for %s", prefix) + } + + // Test when ExcludeFolders is disabled + v.ExcludeFolders = false + for i, prefix := range testPrefixes { + if !v.PrefixEnabled(prefix) || v.PrefixSuspended(prefix) { + t.Fatalf("Test %d: Expected versioning to be enabled for %s", i+1, prefix) + } + } +} + +func TestExcludedPrefixesMatch(t *testing.T) { + v := Versioning{ + Status: Enabled, + ExcludedPrefixes: []ExcludedPrefix{{"*/_temporary/"}}, + } + + if err := v.Validate(); err != nil { + t.Fatalf("Invalid test versioning config %v: %v", v, err) + } + tests := []struct { + prefix string + excluded bool + }{ + { + prefix: "app1-jobs/output/_temporary/attempt1/data.csv", + excluded: true, + }, + { + prefix: "app1-jobs/output/final/attempt1/data.csv", + excluded: false, + }, + } + + for i, test := range tests { + if v.PrefixSuspended(test.prefix) != test.excluded { + if test.excluded { + t.Fatalf("Test %d: Expected prefix %s to be excluded from versioning", i+1, test.prefix) + } else { + t.Fatalf("Test %d: Expected prefix %s to have versioning enabled", i+1, test.prefix) + } + } + } +}