diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index bf0c4faad..59d1fbb56 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -811,6 +811,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * } bucketMap[bucket].NotificationConfigXML = configData + bucketMap[bucket].NotificationConfigUpdatedAt = updatedAt rpt.SetStatus(bucket, fileName, nil) case bucketPolicyConfig: // Error out if Content-Length is beyond allowed size. diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 8abbf3e1f..06972606b 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -124,6 +124,7 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string, meta.PolicyConfigUpdatedAt = updatedAt case bucketNotificationConfig: meta.NotificationConfigXML = configData + meta.NotificationConfigUpdatedAt = updatedAt case bucketLifecycleConfig: meta.LifecycleConfigXML = configData meta.LifecycleConfigUpdatedAt = updatedAt @@ -153,6 +154,8 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string, if err != nil { return updatedAt, fmt.Errorf("Error encrypting bucket target metadata %w", err) } + meta.BucketTargetsConfigUpdatedAt = updatedAt + meta.BucketTargetsConfigMetaUpdatedAt = updatedAt default: return updatedAt, fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile) } @@ -504,7 +507,7 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []string, objAPI } // concurrently load bucket metadata to speed up loading bucket metadata. -func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []string, failedBuckets map[string]struct{}) { +func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []string) { g := errgroup.WithNErrs(len(buckets)) bucketMetas := make([]BucketMetadata, len(buckets)) for index := range buckets { @@ -545,10 +548,6 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []stri for i, meta := range bucketMetas { if errs[i] != nil { - if failedBuckets == nil { - failedBuckets = make(map[string]struct{}) - } - failedBuckets[buckets[i]] = struct{}{} continue } globalEventNotifier.set(buckets[i], meta) // set notification targets @@ -556,7 +555,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []stri } } -func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, failedBuckets map[string]struct{}) { +func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) { const bucketMetadataRefresh = 15 * time.Minute sleeper := newDynamicSleeper(2, 150*time.Millisecond, false) @@ -586,7 +585,10 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa for i := range buckets { wait := sleeper.Timer(ctx) - meta, err := loadBucketMetadata(ctx, sys.objAPI, buckets[i].Name) + bucket := buckets[i].Name + updated := false + + meta, err := loadBucketMetadata(ctx, sys.objAPI, bucket) if err != nil { internalLogIf(ctx, err, logger.WarningKind) wait() // wait to proceed to next entry. @@ -594,14 +596,16 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa } sys.Lock() - sys.metadataMap[buckets[i].Name] = meta + // Update if the bucket metadata in the memory is older than on-disk one + if lu := sys.metadataMap[bucket].lastUpdate(); lu.Before(meta.lastUpdate()) { + updated = true + sys.metadataMap[bucket] = meta + } sys.Unlock() - // Initialize the failed buckets - if _, ok := failedBuckets[buckets[i].Name]; ok { - globalEventNotifier.set(buckets[i].Name, meta) - globalBucketTargetSys.set(buckets[i].Name, meta) - delete(failedBuckets, buckets[i].Name) + if updated { + globalEventNotifier.set(bucket, meta) + globalBucketTargetSys.set(bucket, meta) } wait() // wait to proceed to next entry. @@ -622,13 +626,12 @@ func (sys *BucketMetadataSys) Initialized() bool { // Loads bucket metadata for all buckets into BucketMetadataSys. func (sys *BucketMetadataSys) init(ctx context.Context, buckets []string) { count := globalEndpoints.ESCount() * 10 - failedBuckets := make(map[string]struct{}) for { if len(buckets) < count { - sys.concurrentLoad(ctx, buckets, failedBuckets) + sys.concurrentLoad(ctx, buckets) break } - sys.concurrentLoad(ctx, buckets[:count], failedBuckets) + sys.concurrentLoad(ctx, buckets[:count]) buckets = buckets[count:] } @@ -637,7 +640,7 @@ func (sys *BucketMetadataSys) init(ctx context.Context, buckets []string) { sys.Unlock() if globalIsDistErasure { - go sys.refreshBucketsMetadataLoop(ctx, failedBuckets) + go sys.refreshBucketsMetadataLoop(ctx) } } diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 5e04f08fb..d0132c1c9 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -81,14 +81,19 @@ type BucketMetadata struct { ReplicationConfigXML []byte BucketTargetsConfigJSON []byte BucketTargetsConfigMetaJSON []byte - PolicyConfigUpdatedAt time.Time - ObjectLockConfigUpdatedAt time.Time - EncryptionConfigUpdatedAt time.Time - TaggingConfigUpdatedAt time.Time - QuotaConfigUpdatedAt time.Time - ReplicationConfigUpdatedAt time.Time - VersioningConfigUpdatedAt time.Time - LifecycleConfigUpdatedAt time.Time + + PolicyConfigUpdatedAt time.Time + ObjectLockConfigUpdatedAt time.Time + EncryptionConfigUpdatedAt time.Time + TaggingConfigUpdatedAt time.Time + QuotaConfigUpdatedAt time.Time + ReplicationConfigUpdatedAt time.Time + VersioningConfigUpdatedAt time.Time + LifecycleConfigUpdatedAt time.Time + NotificationConfigUpdatedAt time.Time + BucketTargetsConfigUpdatedAt time.Time + BucketTargetsConfigMetaUpdatedAt time.Time + // Add a new UpdatedAt field and update lastUpdate function // Unexported fields. Must be updated atomically. policyConfig *policy.BucketPolicy @@ -120,6 +125,46 @@ func newBucketMetadata(name string) BucketMetadata { } } +// Return the last update of this bucket metadata, which +// means, the last update of any policy document. +func (b BucketMetadata) lastUpdate() (t time.Time) { + if b.PolicyConfigUpdatedAt.After(t) { + t = b.PolicyConfigUpdatedAt + } + if b.ObjectLockConfigUpdatedAt.After(t) { + t = b.ObjectLockConfigUpdatedAt + } + if b.EncryptionConfigUpdatedAt.After(t) { + t = b.EncryptionConfigUpdatedAt + } + if b.TaggingConfigUpdatedAt.After(t) { + t = b.TaggingConfigUpdatedAt + } + if b.QuotaConfigUpdatedAt.After(t) { + t = b.QuotaConfigUpdatedAt + } + if b.ReplicationConfigUpdatedAt.After(t) { + t = b.ReplicationConfigUpdatedAt + } + if b.VersioningConfigUpdatedAt.After(t) { + t = b.VersioningConfigUpdatedAt + } + if b.LifecycleConfigUpdatedAt.After(t) { + t = b.LifecycleConfigUpdatedAt + } + if b.NotificationConfigUpdatedAt.After(t) { + t = b.NotificationConfigUpdatedAt + } + if b.BucketTargetsConfigUpdatedAt.After(t) { + t = b.BucketTargetsConfigUpdatedAt + } + if b.BucketTargetsConfigMetaUpdatedAt.After(t) { + t = b.BucketTargetsConfigMetaUpdatedAt + } + + return +} + // Versioning returns true if versioning is enabled func (b BucketMetadata) Versioning() bool { return b.LockEnabled || (b.versioningConfig != nil && b.versioningConfig.Enabled()) || (b.objectLockConfig != nil && b.objectLockConfig.Enabled()) @@ -440,6 +485,18 @@ func (b *BucketMetadata) defaultTimestamps() { if b.LifecycleConfigUpdatedAt.IsZero() { b.LifecycleConfigUpdatedAt = b.Created } + + if b.NotificationConfigUpdatedAt.IsZero() { + b.NotificationConfigUpdatedAt = b.Created + } + + if b.BucketTargetsConfigUpdatedAt.IsZero() { + b.BucketTargetsConfigUpdatedAt = b.Created + } + + if b.BucketTargetsConfigMetaUpdatedAt.IsZero() { + b.BucketTargetsConfigMetaUpdatedAt = b.Created + } } // Save config to supplied ObjectLayer api. diff --git a/cmd/bucket-metadata_gen.go b/cmd/bucket-metadata_gen.go index 3e86a80e4..133fda76b 100644 --- a/cmd/bucket-metadata_gen.go +++ b/cmd/bucket-metadata_gen.go @@ -156,6 +156,24 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "LifecycleConfigUpdatedAt") return } + case "NotificationConfigUpdatedAt": + z.NotificationConfigUpdatedAt, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "NotificationConfigUpdatedAt") + return + } + case "BucketTargetsConfigUpdatedAt": + z.BucketTargetsConfigUpdatedAt, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigUpdatedAt") + return + } + case "BucketTargetsConfigMetaUpdatedAt": + z.BucketTargetsConfigMetaUpdatedAt, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigMetaUpdatedAt") + return + } default: err = dc.Skip() if err != nil { @@ -169,9 +187,9 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 22 + // map header, size 25 // write "Name" - err = en.Append(0xde, 0x0, 0x16, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + err = en.Append(0xde, 0x0, 0x19, 0xa4, 0x4e, 0x61, 0x6d, 0x65) if err != nil { return } @@ -390,15 +408,45 @@ func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "LifecycleConfigUpdatedAt") return } + // write "NotificationConfigUpdatedAt" + err = en.Append(0xbb, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.NotificationConfigUpdatedAt) + if err != nil { + err = msgp.WrapError(err, "NotificationConfigUpdatedAt") + return + } + // write "BucketTargetsConfigUpdatedAt" + err = en.Append(0xbc, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.BucketTargetsConfigUpdatedAt) + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigUpdatedAt") + return + } + // write "BucketTargetsConfigMetaUpdatedAt" + err = en.Append(0xd9, 0x20, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4d, 0x65, 0x74, 0x61, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.BucketTargetsConfigMetaUpdatedAt) + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigMetaUpdatedAt") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 22 + // map header, size 25 // string "Name" - o = append(o, 0xde, 0x0, 0x16, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + o = append(o, 0xde, 0x0, 0x19, 0xa4, 0x4e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.Name) // string "Created" o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64) @@ -463,6 +511,15 @@ func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) { // string "LifecycleConfigUpdatedAt" o = append(o, 0xb8, 0x4c, 0x69, 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) o = msgp.AppendTime(o, z.LifecycleConfigUpdatedAt) + // string "NotificationConfigUpdatedAt" + o = append(o, 0xbb, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + o = msgp.AppendTime(o, z.NotificationConfigUpdatedAt) + // string "BucketTargetsConfigUpdatedAt" + o = append(o, 0xbc, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + o = msgp.AppendTime(o, z.BucketTargetsConfigUpdatedAt) + // string "BucketTargetsConfigMetaUpdatedAt" + o = append(o, 0xd9, 0x20, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4d, 0x65, 0x74, 0x61, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74) + o = msgp.AppendTime(o, z.BucketTargetsConfigMetaUpdatedAt) return } @@ -616,6 +673,24 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "LifecycleConfigUpdatedAt") return } + case "NotificationConfigUpdatedAt": + z.NotificationConfigUpdatedAt, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "NotificationConfigUpdatedAt") + return + } + case "BucketTargetsConfigUpdatedAt": + z.BucketTargetsConfigUpdatedAt, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigUpdatedAt") + return + } + case "BucketTargetsConfigMetaUpdatedAt": + z.BucketTargetsConfigMetaUpdatedAt, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BucketTargetsConfigMetaUpdatedAt") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -630,6 +705,6 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BucketMetadata) Msgsize() (s int) { - s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize + 26 + msgp.TimeSize + 25 + msgp.TimeSize + s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize + 26 + msgp.TimeSize + 25 + msgp.TimeSize + 28 + msgp.TimeSize + 29 + msgp.TimeSize + 34 + msgp.TimeSize return }