From 52221db7ef4944fa0a2e0b133c760687ca8fe66e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 31 May 2022 02:57:57 -0700 Subject: [PATCH] fix: for unexpected errors in reading versioning config panic (#14994) We need to make sure if we cannot read bucket metadata for some reason, and bucket metadata is not missing and returning corrupted information we should panic such handlers to disallow I/O to protect the overall state on the system. In-case of such corruption we have a mechanism now to force recreate the metadata on the bucket, using `x-minio-force-create` header with `PUT /bucket` API call. Additionally fix the versioning config updated state to be set properly for the site replication healing to trigger correctly. --- cmd/bucket-lifecycle_test.go | 2 +- cmd/bucket-metadata-sys.go | 42 ++++++++++-------------- cmd/bucket-metadata.go | 6 ++++ cmd/bucket-metadata_gen.go | 35 +++++++++++++++++--- cmd/bucket-policy.go | 3 +- cmd/bucket-replication.go | 6 ++++ cmd/bucket-versioning.go | 38 +++++++++++++-------- cmd/data-scanner.go | 6 +++- cmd/erasure-metadata.go | 4 +-- cmd/erasure-multipart.go | 2 +- cmd/erasure-object.go | 26 +++++++-------- cmd/erasure-server-pool.go | 6 ++-- cmd/erasure-single-drive.go | 34 ++++++++++--------- cmd/metacache-entries.go | 11 +++++-- cmd/metacache-server-pool.go | 7 +++- cmd/server-main.go | 2 -- cmd/site-replication.go | 21 +++++++++++- cmd/xl-storage.go | 10 ++++-- internal/bucket/versioning/versioning.go | 5 +++ 19 files changed, 178 insertions(+), 88 deletions(-) diff --git a/cmd/bucket-lifecycle_test.go b/cmd/bucket-lifecycle_test.go index 38bafee6b..a37e3492d 100644 --- a/cmd/bucket-lifecycle_test.go +++ b/cmd/bucket-lifecycle_test.go @@ -203,7 +203,7 @@ func TestObjectIsRemote(t *testing.T) { if got := fi.IsRemote(); got != tc.remote { t.Fatalf("Test %d.a: expected %v got %v", i+1, tc.remote, got) } - oi := fi.ToObjectInfo("bucket", "object") + oi := fi.ToObjectInfo("bucket", "object", false) if got := oi.IsRemote(); got != tc.remote { t.Fatalf("Test %d.b: expected %v got %v", i+1, tc.remote, got) } diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index c8fbdca84..73c26f8ef 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -131,6 +131,7 @@ func (sys *BucketMetadataSys) Update(ctx context.Context, bucket string, configF meta.ObjectLockConfigUpdatedAt = UTCNow() case bucketVersioningConfig: meta.VersioningConfigXML = configData + meta.VersioningConfigUpdatedAt = UTCNow() case bucketReplicationConfig: meta.ReplicationConfigXML = configData meta.ReplicationConfigUpdatedAt = UTCNow() @@ -184,12 +185,15 @@ func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) { // GetVersioningConfig returns configured versioning config // The returned object may not be modified. -func (sys *BucketMetadataSys) GetVersioningConfig(bucket string) (*versioning.Versioning, error) { +func (sys *BucketMetadataSys) GetVersioningConfig(bucket string) (*versioning.Versioning, time.Time, error) { meta, err := sys.GetConfig(GlobalContext, bucket) if err != nil { - return &versioning.Versioning{}, err + if errors.Is(err, errConfigNotFound) { + return &versioning.Versioning{XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/"}, meta.Created, nil + } + return &versioning.Versioning{XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/"}, time.Time{}, err } - return meta.versioningConfig, nil + return meta.versioningConfig, meta.VersioningConfigUpdatedAt, nil } // GetTaggingConfig returns configured tagging config @@ -306,26 +310,27 @@ func (sys *BucketMetadataSys) CreatedAt(bucket string) (time.Time, error) { // GetPolicyConfig returns configured bucket policy // The returned object may not be modified. -func (sys *BucketMetadataSys) GetPolicyConfig(bucket string) (*policy.Policy, error) { +func (sys *BucketMetadataSys) GetPolicyConfig(bucket string) (*policy.Policy, time.Time, error) { if globalIsGateway { objAPI := newObjectLayerFn() if objAPI == nil { - return nil, errServerNotInitialized + return nil, time.Time{}, errServerNotInitialized } - return objAPI.GetBucketPolicy(GlobalContext, bucket) + p, err := objAPI.GetBucketPolicy(GlobalContext, bucket) + return p, UTCNow(), err } meta, err := sys.GetConfig(GlobalContext, bucket) if err != nil { if errors.Is(err, errConfigNotFound) { - return nil, BucketPolicyNotFound{Bucket: bucket} + return nil, time.Time{}, BucketPolicyNotFound{Bucket: bucket} } - return nil, err + return nil, time.Time{}, err } if meta.policyConfig == nil { - return nil, BucketPolicyNotFound{Bucket: bucket} + return nil, time.Time{}, BucketPolicyNotFound{Bucket: bucket} } - return meta.policyConfig, nil + return meta.policyConfig, meta.PolicyConfigUpdatedAt, nil } // GetQuotaConfig returns configured bucket quota @@ -360,6 +365,9 @@ func (sys *BucketMetadataSys) GetReplicationConfig(ctx context.Context, bucket s func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.BucketTargets, error) { meta, err := sys.GetConfig(GlobalContext, bucket) if err != nil { + if errors.Is(err, errConfigNotFound) { + return nil, BucketRemoteTargetNotFound{Bucket: bucket} + } return nil, err } if meta.bucketTargetConfig == nil { @@ -368,20 +376,6 @@ func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.Buc return meta.bucketTargetConfig, nil } -// GetBucketTarget returns the target for the bucket and arn. -func (sys *BucketMetadataSys) GetBucketTarget(bucket string, arn string) (madmin.BucketTarget, error) { - targets, err := sys.GetBucketTargetsConfig(bucket) - if err != nil { - return madmin.BucketTarget{}, err - } - for _, t := range targets.Targets { - if t.Arn == arn { - return t, nil - } - } - return madmin.BucketTarget{}, errConfigNotFound -} - // GetConfig returns a specific configuration from the bucket metadata. // The returned object may not be modified. func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (BucketMetadata, error) { diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 8837495ff..7029fa6a2 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -87,6 +87,7 @@ type BucketMetadata struct { TaggingConfigUpdatedAt time.Time QuotaConfigUpdatedAt time.Time ReplicationConfigUpdatedAt time.Time + VersioningConfigUpdatedAt time.Time // Unexported fields. Must be updated atomically. policyConfig *policy.Policy @@ -364,6 +365,7 @@ func (b *BucketMetadata) defaultTimestamps() { if b.PolicyConfigUpdatedAt.IsZero() { b.PolicyConfigUpdatedAt = b.Created } + if b.EncryptionConfigUpdatedAt.IsZero() { b.EncryptionConfigUpdatedAt = b.Created } @@ -383,6 +385,10 @@ func (b *BucketMetadata) defaultTimestamps() { if b.ReplicationConfigUpdatedAt.IsZero() { b.ReplicationConfigUpdatedAt = b.Created } + + if b.VersioningConfigUpdatedAt.IsZero() { + b.VersioningConfigUpdatedAt = b.Created + } } // Save config to supplied ObjectLayer api. diff --git a/cmd/bucket-metadata_gen.go b/cmd/bucket-metadata_gen.go index 2907e9d38..43bd3a442 100644 --- a/cmd/bucket-metadata_gen.go +++ b/cmd/bucket-metadata_gen.go @@ -144,6 +144,12 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ReplicationConfigUpdatedAt") return } + case "VersioningConfigUpdatedAt": + z.VersioningConfigUpdatedAt, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "VersioningConfigUpdatedAt") + return + } default: err = dc.Skip() if err != nil { @@ -157,9 +163,9 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 20 + // map header, size 21 // write "Name" - err = en.Append(0xde, 0x0, 0x14, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + err = en.Append(0xde, 0x0, 0x15, 0xa4, 0x4e, 0x61, 0x6d, 0x65) if err != nil { return } @@ -358,15 +364,25 @@ func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ReplicationConfigUpdatedAt") return } + // write "VersioningConfigUpdatedAt" + err = en.Append(0xb9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.VersioningConfigUpdatedAt) + if err != nil { + err = msgp.WrapError(err, "VersioningConfigUpdatedAt") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 20 + // map header, size 21 // string "Name" - o = append(o, 0xde, 0x0, 0x14, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + o = append(o, 0xde, 0x0, 0x15, 0xa4, 0x4e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.Name) // string "Created" o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64) @@ -425,6 +441,9 @@ func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) { // string "ReplicationConfigUpdatedAt" o = append(o, 0xba, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) o = msgp.AppendTime(o, z.ReplicationConfigUpdatedAt) + // string "VersioningConfigUpdatedAt" + o = append(o, 0xb9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + o = msgp.AppendTime(o, z.VersioningConfigUpdatedAt) return } @@ -566,6 +585,12 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "ReplicationConfigUpdatedAt") return } + case "VersioningConfigUpdatedAt": + z.VersioningConfigUpdatedAt, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "VersioningConfigUpdatedAt") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -580,6 +605,6 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BucketMetadata) Msgsize() (s int) { - s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize + s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize + 26 + msgp.TimeSize return } diff --git a/cmd/bucket-policy.go b/cmd/bucket-policy.go index 048e37635..00140379d 100644 --- a/cmd/bucket-policy.go +++ b/cmd/bucket-policy.go @@ -38,7 +38,8 @@ type PolicySys struct{} // Get returns stored bucket policy func (sys *PolicySys) Get(bucket string) (*policy.Policy, error) { - return globalBucketMetadataSys.GetPolicyConfig(bucket) + policy, _, err := globalBucketMetadataSys.GetPolicyConfig(bucket) + return policy, err } // IsAllowed - checks given policy args is allowed to continue the Rest API. diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 260f4c653..becdc9c3d 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -190,11 +190,17 @@ func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplica return } + // object layer not initialized we return with no decision. + if newObjectLayerFn() == nil { + return + } + // Disable server-side replication on object prefixes which are excluded // from versioning via the MinIO bucket versioning extension. if globalBucketVersioningSys.PrefixSuspended(bucket, object) { return } + replStatus := mopts.ReplicationStatus() if replStatus == replication.Replica && !mopts.isMetadataReplication() { return diff --git a/cmd/bucket-versioning.go b/cmd/bucket-versioning.go index 370f41e06..00a2c69ff 100644 --- a/cmd/bucket-versioning.go +++ b/cmd/bucket-versioning.go @@ -17,16 +17,21 @@ package cmd -import "github.com/minio/minio/internal/bucket/versioning" +import ( + "strings" + + "github.com/minio/minio/internal/bucket/versioning" + "github.com/minio/minio/internal/logger" +) // BucketVersioningSys - policy subsystem. type BucketVersioningSys struct{} // Enabled enabled versioning? func (sys *BucketVersioningSys) Enabled(bucket string) bool { - vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + vc, err := sys.Get(bucket) if err != nil { - return false + logger.CriticalIf(GlobalContext, err) } return vc.Enabled() } @@ -35,18 +40,18 @@ func (sys *BucketVersioningSys) Enabled(bucket string) bool { // the given prefix doesn't match any excluded prefixes pattern. This is // part of a MinIO versioning configuration extension. func (sys *BucketVersioningSys) PrefixEnabled(bucket, prefix string) bool { - vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + vc, err := sys.Get(bucket) if err != nil { - return false + logger.CriticalIf(GlobalContext, err) } return vc.PrefixEnabled(prefix) } // Suspended suspended versioning? func (sys *BucketVersioningSys) Suspended(bucket string) bool { - vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + vc, err := sys.Get(bucket) if err != nil { - return false + logger.CriticalIf(GlobalContext, err) } return vc.Suspended() } @@ -54,9 +59,9 @@ func (sys *BucketVersioningSys) Suspended(bucket string) bool { // PrefixSuspended returns true if the given prefix matches an excluded prefix // pattern. This is part of a MinIO versioning configuration extension. func (sys *BucketVersioningSys) PrefixSuspended(bucket, prefix string) bool { - vc, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + vc, err := sys.Get(bucket) if err != nil { - return false + logger.CriticalIf(GlobalContext, err) } return vc.PrefixSuspended(prefix) @@ -64,12 +69,17 @@ func (sys *BucketVersioningSys) PrefixSuspended(bucket, prefix string) bool { // Get returns stored bucket policy func (sys *BucketVersioningSys) Get(bucket string) (*versioning.Versioning, error) { - return globalBucketMetadataSys.GetVersioningConfig(bucket) -} + if globalIsGateway { + // Gateway does not implement versioning. + return &versioning.Versioning{XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/"}, nil + } -// Reset BucketVersioningSys to initial state. -func (sys *BucketVersioningSys) Reset() { - // There is currently no internal state. + if bucket == minioMetaBucket || strings.HasPrefix(bucket, minioMetaBucket) { + return &versioning.Versioning{XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/"}, nil + } + + vcfg, _, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + return vcfg, err } // NewBucketVersioningSys - creates new versioning system. diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 45ad303ba..bb52bd1a5 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1057,9 +1057,13 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob fivs = fivs[:lim+1] rcfg, _ := globalBucketObjectLockSys.Get(i.bucket) + vcfg, _ := globalBucketVersioningSys.Get(i.bucket) + + versioned := vcfg != nil && vcfg.Versioned(i.objectPath()) + toDel := make([]ObjectToDelete, 0, len(overflowVersions)) for _, fi := range overflowVersions { - obj := fi.ToObjectInfo(i.bucket, i.objectPath()) + obj := fi.ToObjectInfo(i.bucket, i.objectPath(), versioned) // skip versions with object locking enabled if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) { if i.debug { diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 6d6c141f6..a92a55fe7 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -104,10 +104,10 @@ func (fi FileInfo) IsValid() bool { } // ToObjectInfo - Converts metadata to object info. -func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { +func (fi FileInfo) ToObjectInfo(bucket, object string, versioned bool) ObjectInfo { object = decodeDirObject(object) versionID := fi.VersionID - if (globalBucketVersioningSys.PrefixEnabled(bucket, object) || globalBucketVersioningSys.PrefixSuspended(bucket, object)) && versionID == "" { + if versioned && versionID == "" { versionID = nullVersionID } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index e2b104180..2c940a85a 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -1027,7 +1027,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str fi.IsLatest = true // Success, return object info. - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } // AbortMultipartUpload - aborts an ongoing multipart operation diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 91ffbd07d..29ef73155 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -117,7 +117,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d if srcOpts.VersionID == "" { return oi, toObjectErr(errFileNotFound, srcBucket, srcObject) } - return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject) + return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), toObjectErr(errMethodNotAllowed, srcBucket, srcObject) } filterOnlineDisksInplace(fi, metaArr, onlineDisks) @@ -178,7 +178,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d return oi, toObjectErr(err, srcBucket, srcObject) } - return fi.ToObjectInfo(srcBucket, srcObject), nil + return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), nil } // GetObjectNInfo - returns object info and an object @@ -245,7 +245,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri } } - objInfo := fi.ToObjectInfo(bucket, object) + objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if objInfo.DeleteMarker { if opts.VersionID == "" { return &GetObjectReader{ @@ -638,7 +638,7 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin if err != nil { return objInfo, toObjectErr(err, bucket, object) } - objInfo = fi.ToObjectInfo(bucket, object) + objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if fi.Deleted { if opts.VersionID == "" || opts.DeleteMarker { return objInfo, toObjectErr(errFileNotFound, bucket, object) @@ -662,7 +662,7 @@ func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, obj wquorum++ } - objInfo = fi.ToObjectInfo(bucket, object) + objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" { // Make sure to return object info to provide extra information. return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object) @@ -854,7 +854,7 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r * return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key) } - return fi.ToObjectInfo(minioMetaBucket, key), nil + return fi.ToObjectInfo(minioMetaBucket, key, opts.Versioned || opts.VersionSuspended), nil } // PutObject - creates an object upon reading from the input stream @@ -1159,7 +1159,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st // we are adding a new version to this object under the namespace lock, so this is the latest version. fi.IsLatest = true - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error { @@ -1533,7 +1533,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil { return objInfo, toObjectErr(err, bucket, object) } - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } } @@ -1638,7 +1638,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s opts.VersionID = fi.VersionID } - objInfo := fi.ToObjectInfo(bucket, object) + objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if opts.EvalMetadataFn != nil { if err := opts.EvalMetadataFn(objInfo); err != nil { return ObjectInfo{}, err @@ -1654,7 +1654,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s return ObjectInfo{}, toObjectErr(err, bucket, object) } - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } // PutObjectTags - replace or add tags to an existing object @@ -1718,7 +1718,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin return ObjectInfo{}, toObjectErr(err, bucket, object) } - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } // updateObjectMeta will update the metadata of a file. @@ -1856,7 +1856,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st break } - objInfo := fi.ToObjectInfo(bucket, object) + objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) sendEvent(eventArgs{ EventName: eventName, BucketName: bucket, @@ -1911,7 +1911,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object)) } - oi = actualfi.ToObjectInfo(bucket, object) + oi = actualfi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi) if len(oi.Parts) == 1 { var rs *HTTPRangeSpec diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index e4e02d784..c8ed788fb 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1756,6 +1756,8 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re defer cancel() defer close(results) + versioned := opts.Versioned || opts.VersionSuspended + for _, erasureSet := range z.serverPools { var wg sync.WaitGroup for _, set := range erasureSet.sets { @@ -1783,12 +1785,12 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re if opts.WalkAscending { for i := len(fivs.Versions) - 1; i >= 0; i-- { version := fivs.Versions[i] - results <- version.ToObjectInfo(bucket, version.Name) + results <- version.ToObjectInfo(bucket, version.Name, versioned) } return } for _, version := range fivs.Versions { - results <- version.ToObjectInfo(bucket, version.Name) + results <- version.ToObjectInfo(bucket, version.Name, versioned) } } diff --git a/cmd/erasure-single-drive.go b/cmd/erasure-single-drive.go index 0f165a5c1..15e089f9f 100644 --- a/cmd/erasure-single-drive.go +++ b/cmd/erasure-single-drive.go @@ -401,7 +401,7 @@ func (es *erasureSingle) CopyObject(ctx context.Context, srcBucket, srcObject, d if srcOpts.VersionID == "" { return oi, toObjectErr(errFileNotFound, srcBucket, srcObject) } - return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject) + return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), toObjectErr(errMethodNotAllowed, srcBucket, srcObject) } filterOnlineDisksInplace(fi, metaArr, onlineDisks) @@ -457,7 +457,7 @@ func (es *erasureSingle) CopyObject(ctx context.Context, srcBucket, srcObject, d return oi, toObjectErr(err, srcBucket, srcObject) } - return fi.ToObjectInfo(srcBucket, srcObject), nil + return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), nil } putOpts := ObjectOptions{ @@ -516,7 +516,7 @@ func (es *erasureSingle) GetObjectNInfo(ctx context.Context, bucket, object stri return nil, toObjectErr(err, bucket, object) } - objInfo := fi.ToObjectInfo(bucket, object) + objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if objInfo.DeleteMarker { if opts.VersionID == "" { return &GetObjectReader{ @@ -716,7 +716,7 @@ func (es *erasureSingle) getObjectInfo(ctx context.Context, bucket, object strin if err != nil { return objInfo, toObjectErr(err, bucket, object) } - objInfo = fi.ToObjectInfo(bucket, object) + objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if fi.Deleted { if opts.VersionID == "" || opts.DeleteMarker { return objInfo, toObjectErr(errFileNotFound, bucket, object) @@ -740,7 +740,7 @@ func (es *erasureSingle) getObjectInfoAndQuorum(ctx context.Context, bucket, obj wquorum++ } - objInfo = fi.ToObjectInfo(bucket, object) + objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" { // Make sure to return object info to provide extra information. return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object) @@ -889,7 +889,7 @@ func (es *erasureSingle) putMetacacheObject(ctx context.Context, key string, r * return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key) } - return fi.ToObjectInfo(minioMetaBucket, key), nil + return fi.ToObjectInfo(minioMetaBucket, key, opts.Versioned || opts.VersionSuspended), nil } // PutObject - creates an object upon reading from the input stream @@ -1148,7 +1148,7 @@ func (es *erasureSingle) putObject(ctx context.Context, bucket string, object st // we are adding a new version to this object under the namespace lock, so this is the latest version. fi.IsLatest = true - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } func (es *erasureSingle) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error { @@ -1475,7 +1475,7 @@ func (es *erasureSingle) DeleteObject(ctx context.Context, bucket, object string if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil { return objInfo, toObjectErr(err, bucket, object) } - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } } @@ -1549,7 +1549,7 @@ func (es *erasureSingle) PutObjectMetadata(ctx context.Context, bucket, object s opts.VersionID = fi.VersionID } - objInfo := fi.ToObjectInfo(bucket, object) + objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) if opts.EvalMetadataFn != nil { if err := opts.EvalMetadataFn(objInfo); err != nil { return ObjectInfo{}, err @@ -1565,7 +1565,7 @@ func (es *erasureSingle) PutObjectMetadata(ctx context.Context, bucket, object s return ObjectInfo{}, toObjectErr(err, bucket, object) } - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } // PutObjectTags - replace or add tags to an existing object @@ -1623,7 +1623,7 @@ func (es *erasureSingle) PutObjectTags(ctx context.Context, bucket, object strin return ObjectInfo{}, toObjectErr(err, bucket, object) } - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } // updateObjectMeta will update the metadata of a file. @@ -1739,7 +1739,7 @@ func (es *erasureSingle) TransitionObject(ctx context.Context, bucket, object st eventName = event.ObjectTransitionFailed } - objInfo := fi.ToObjectInfo(bucket, object) + objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) sendEvent(eventArgs{ EventName: eventName, BucketName: bucket, @@ -1794,7 +1794,7 @@ func (es *erasureSingle) restoreTransitionedObject(ctx context.Context, bucket s return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object)) } - oi = actualfi.ToObjectInfo(bucket, object) + oi = actualfi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended) ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi) if len(oi.Parts) == 1 { var rs *HTTPRangeSpec @@ -2722,7 +2722,7 @@ func (es *erasureSingle) CompleteMultipartUpload(ctx context.Context, bucket str fi.IsLatest = true // Success, return object info. - return fi.ToObjectInfo(bucket, object), nil + return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil } // AbortMultipartUpload - aborts an ongoing multipart operation @@ -2922,6 +2922,8 @@ func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, result defer cancel() defer close(results) + versioned := opts.Versioned || opts.VersionSuspended + var wg sync.WaitGroup wg.Add(1) go func() { @@ -2939,12 +2941,12 @@ func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, result if opts.WalkAscending { for i := len(fivs.Versions) - 1; i >= 0; i-- { version := fivs.Versions[i] - results <- version.ToObjectInfo(bucket, version.Name) + results <- version.ToObjectInfo(bucket, version.Name, versioned) } return } for _, version := range fivs.Versions { - results <- version.ToObjectInfo(bucket, version.Name) + results <- version.ToObjectInfo(bucket, version.Name, versioned) } } diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index c3eec6738..3ced82825 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -438,6 +438,9 @@ func (m metaCacheEntriesSorted) shallowClone() metaCacheEntriesSorted { func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, afterV string) (versions []ObjectInfo) { versions = make([]ObjectInfo, 0, m.len()) prevPrefix := "" + vcfg, _ := globalBucketVersioningSys.Get(bucket) + versioned := vcfg != nil && vcfg.Versioned(prefix) + for _, entry := range m.o { if entry.isObject() { if delimiter != "" { @@ -473,7 +476,7 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft } for _, version := range fiVersions { - versions = append(versions, version.ToObjectInfo(bucket, entry.name)) + versions = append(versions, version.ToObjectInfo(bucket, entry.name, versioned)) } continue @@ -509,6 +512,10 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (objects []ObjectInfo) { objects = make([]ObjectInfo, 0, m.len()) prevPrefix := "" + + vcfg, _ := globalBucketVersioningSys.Get(bucket) + versioned := vcfg != nil && vcfg.Versioned(prefix) + for _, entry := range m.o { if entry.isObject() { if delimiter != "" { @@ -531,7 +538,7 @@ func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (ob fi, err := entry.fileInfo(bucket) if err == nil { - objects = append(objects, fi.ToObjectInfo(bucket, entry.name)) + objects = append(objects, fi.ToObjectInfo(bucket, entry.name, versioned)) } continue } diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go index c28c7d554..82998f86f 100644 --- a/cmd/metacache-server-pool.go +++ b/cmd/metacache-server-pool.go @@ -642,6 +642,8 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions, // function closes 'out' and exits. func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle, lr lock.Retention, in <-chan metaCacheEntry, out chan<- metaCacheEntry) { defer close(out) + + vcfg, _ := globalBucketVersioningSys.Get(bucket) for { var obj metaCacheEntry var ok bool @@ -658,7 +660,10 @@ func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle, if err != nil { continue } - objInfo := fi.ToObjectInfo(bucket, obj.name) + + versioned := vcfg != nil && vcfg.Versioned(obj.name) + + objInfo := fi.ToObjectInfo(bucket, obj.name, versioned) action := evalActionFromLifecycle(ctx, lc, lr, objInfo, false) switch action { case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: diff --git a/cmd/server-main.go b/cmd/server-main.go index 158560cc5..bcbc544f1 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -277,8 +277,6 @@ func initAllSubsystems() { // Create new bucket versioning subsystem if globalBucketVersioningSys == nil { globalBucketVersioningSys = NewBucketVersioningSys() - } else { - globalBucketVersioningSys.Reset() } // Create new bucket replication subsytem diff --git a/cmd/site-replication.go b/cmd/site-replication.go index 69ec63abb..a159733cf 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -2984,8 +2984,9 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI CreatedAt: createdAt, Location: globalSite.Region, } + // Get bucket policy if present. - policy, err := globalPolicySys.Get(bucket) + policy, updatedAt, err := globalBucketMetadataSys.GetPolicyConfig(bucket) found := true if _, ok := err.(BucketPolicyNotFound); ok { found = false @@ -2998,6 +2999,7 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI return info, wrapSRErr(err) } bms.Policy = policyJSON + bms.PolicyUpdatedAt = updatedAt } // Get bucket tags if present. @@ -3018,6 +3020,23 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI bms.TagConfigUpdatedAt = updatedAt } + versioningCfg, updatedAt, err := globalBucketMetadataSys.GetVersioningConfig(bucket) + found = true + if versioningCfg != nil && versioningCfg.Status == "" { + found = false + } else if err != nil { + return info, errSRBackendIssue(err) + } + if found { + versionCfgData, err := xml.Marshal(versioningCfg) + if err != nil { + return info, wrapSRErr(err) + } + versioningCfgStr := base64.StdEncoding.EncodeToString(versionCfgData) + bms.Versioning = &versioningCfgStr + bms.VersioningConfigUpdatedAt = updatedAt + } + // Get object-lock config if present. objLockCfg, updatedAt, err := globalBucketMetadataSys.GetObjectLockConfig(bucket) found = true diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 3283d7e46..d7aecadf9 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -443,6 +443,9 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } } } + + vcfg, _ := globalBucketVersioningSys.Get(cache.Info.Name) + // return initialized object layer objAPI := newObjectLayerFn() // object layer not initialized, return. @@ -494,9 +497,12 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } return sizeSummary{}, errSkipFile } + + versioned := vcfg != nil && vcfg.Versioned(item.objectPath()) + for _, version := range fivs.Versions { atomic.AddUint64(&globalScannerStats.accTotalVersions, 1) - oi := version.ToObjectInfo(item.bucket, item.objectPath()) + oi := version.ToObjectInfo(item.bucket, item.objectPath(), versioned) sz := item.applyActions(ctx, objAPI, oi, &sizeS) if oi.VersionID != "" && sz == oi.Size { sizeS.versions++ @@ -521,7 +527,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates // apply tier sweep action on free versions for _, freeVersion := range fivs.FreeVersions { - oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath()) + oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath(), versioned) item.applyTierObjSweep(ctx, objAPI, oi) } return sizeS, nil diff --git a/internal/bucket/versioning/versioning.go b/internal/bucket/versioning/versioning.go index a986cc82e..bf8e227fc 100644 --- a/internal/bucket/versioning/versioning.go +++ b/internal/bucket/versioning/versioning.go @@ -88,6 +88,11 @@ func (v Versioning) Enabled() bool { return v.Status == Enabled } +// Versioned returns if 'prefix' has versioning enabled or suspended. +func (v Versioning) Versioned(prefix string) bool { + return v.PrefixEnabled(prefix) || v.PrefixSuspended(prefix) +} + // PrefixEnabled - returns true if versioning is enabled at the bucket and given // prefix, false otherwise. func (v Versioning) PrefixEnabled(prefix string) bool {