bucket-metadata: Reload events/repl-targets for all buckets (#20334)

Currently, the bucket events and replication targets are only reloaded
with buckets that failed to load during the first cluster startup,
which is wrong because if one bucket change was done in one node but
that node was not able to notify other nodes; the other nodes will
reload the bucket metadata config but fails to set the events and bucket
targets in the memory.
This commit is contained in:
Anis Eleuch 2024-08-28 16:32:18 +01:00 committed by GitHub
parent c65e67c357
commit 38c0840834
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 166 additions and 30 deletions

View File

@ -811,6 +811,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
}
bucketMap[bucket].NotificationConfigXML = configData
bucketMap[bucket].NotificationConfigUpdatedAt = updatedAt
rpt.SetStatus(bucket, fileName, nil)
case bucketPolicyConfig:
// Error out if Content-Length is beyond allowed size.

View File

@ -124,6 +124,7 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string,
meta.PolicyConfigUpdatedAt = updatedAt
case bucketNotificationConfig:
meta.NotificationConfigXML = configData
meta.NotificationConfigUpdatedAt = updatedAt
case bucketLifecycleConfig:
meta.LifecycleConfigXML = configData
meta.LifecycleConfigUpdatedAt = updatedAt
@ -153,6 +154,8 @@ func (sys *BucketMetadataSys) updateAndParse(ctx context.Context, bucket string,
if err != nil {
return updatedAt, fmt.Errorf("Error encrypting bucket target metadata %w", err)
}
meta.BucketTargetsConfigUpdatedAt = updatedAt
meta.BucketTargetsConfigMetaUpdatedAt = updatedAt
default:
return updatedAt, fmt.Errorf("Unknown bucket %s metadata update requested %s", bucket, configFile)
}
@ -504,7 +507,7 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []string, objAPI
}
// concurrently load bucket metadata to speed up loading bucket metadata.
func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []string, failedBuckets map[string]struct{}) {
func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []string) {
g := errgroup.WithNErrs(len(buckets))
bucketMetas := make([]BucketMetadata, len(buckets))
for index := range buckets {
@ -545,10 +548,6 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []stri
for i, meta := range bucketMetas {
if errs[i] != nil {
if failedBuckets == nil {
failedBuckets = make(map[string]struct{})
}
failedBuckets[buckets[i]] = struct{}{}
continue
}
globalEventNotifier.set(buckets[i], meta) // set notification targets
@ -556,7 +555,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []stri
}
}
func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, failedBuckets map[string]struct{}) {
func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) {
const bucketMetadataRefresh = 15 * time.Minute
sleeper := newDynamicSleeper(2, 150*time.Millisecond, false)
@ -586,7 +585,10 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa
for i := range buckets {
wait := sleeper.Timer(ctx)
meta, err := loadBucketMetadata(ctx, sys.objAPI, buckets[i].Name)
bucket := buckets[i].Name
updated := false
meta, err := loadBucketMetadata(ctx, sys.objAPI, bucket)
if err != nil {
internalLogIf(ctx, err, logger.WarningKind)
wait() // wait to proceed to next entry.
@ -594,14 +596,16 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa
}
sys.Lock()
sys.metadataMap[buckets[i].Name] = meta
// Update if the bucket metadata in the memory is older than on-disk one
if lu := sys.metadataMap[bucket].lastUpdate(); lu.Before(meta.lastUpdate()) {
updated = true
sys.metadataMap[bucket] = meta
}
sys.Unlock()
// Initialize the failed buckets
if _, ok := failedBuckets[buckets[i].Name]; ok {
globalEventNotifier.set(buckets[i].Name, meta)
globalBucketTargetSys.set(buckets[i].Name, meta)
delete(failedBuckets, buckets[i].Name)
if updated {
globalEventNotifier.set(bucket, meta)
globalBucketTargetSys.set(bucket, meta)
}
wait() // wait to proceed to next entry.
@ -622,13 +626,12 @@ func (sys *BucketMetadataSys) Initialized() bool {
// Loads bucket metadata for all buckets into BucketMetadataSys.
func (sys *BucketMetadataSys) init(ctx context.Context, buckets []string) {
count := globalEndpoints.ESCount() * 10
failedBuckets := make(map[string]struct{})
for {
if len(buckets) < count {
sys.concurrentLoad(ctx, buckets, failedBuckets)
sys.concurrentLoad(ctx, buckets)
break
}
sys.concurrentLoad(ctx, buckets[:count], failedBuckets)
sys.concurrentLoad(ctx, buckets[:count])
buckets = buckets[count:]
}
@ -637,7 +640,7 @@ func (sys *BucketMetadataSys) init(ctx context.Context, buckets []string) {
sys.Unlock()
if globalIsDistErasure {
go sys.refreshBucketsMetadataLoop(ctx, failedBuckets)
go sys.refreshBucketsMetadataLoop(ctx)
}
}

View File

@ -81,14 +81,19 @@ type BucketMetadata struct {
ReplicationConfigXML []byte
BucketTargetsConfigJSON []byte
BucketTargetsConfigMetaJSON []byte
PolicyConfigUpdatedAt time.Time
ObjectLockConfigUpdatedAt time.Time
EncryptionConfigUpdatedAt time.Time
TaggingConfigUpdatedAt time.Time
QuotaConfigUpdatedAt time.Time
ReplicationConfigUpdatedAt time.Time
VersioningConfigUpdatedAt time.Time
LifecycleConfigUpdatedAt time.Time
PolicyConfigUpdatedAt time.Time
ObjectLockConfigUpdatedAt time.Time
EncryptionConfigUpdatedAt time.Time
TaggingConfigUpdatedAt time.Time
QuotaConfigUpdatedAt time.Time
ReplicationConfigUpdatedAt time.Time
VersioningConfigUpdatedAt time.Time
LifecycleConfigUpdatedAt time.Time
NotificationConfigUpdatedAt time.Time
BucketTargetsConfigUpdatedAt time.Time
BucketTargetsConfigMetaUpdatedAt time.Time
// Add a new UpdatedAt field and update lastUpdate function
// Unexported fields. Must be updated atomically.
policyConfig *policy.BucketPolicy
@ -120,6 +125,46 @@ func newBucketMetadata(name string) BucketMetadata {
}
}
// Return the last update of this bucket metadata, which
// means, the last update of any policy document.
func (b BucketMetadata) lastUpdate() (t time.Time) {
if b.PolicyConfigUpdatedAt.After(t) {
t = b.PolicyConfigUpdatedAt
}
if b.ObjectLockConfigUpdatedAt.After(t) {
t = b.ObjectLockConfigUpdatedAt
}
if b.EncryptionConfigUpdatedAt.After(t) {
t = b.EncryptionConfigUpdatedAt
}
if b.TaggingConfigUpdatedAt.After(t) {
t = b.TaggingConfigUpdatedAt
}
if b.QuotaConfigUpdatedAt.After(t) {
t = b.QuotaConfigUpdatedAt
}
if b.ReplicationConfigUpdatedAt.After(t) {
t = b.ReplicationConfigUpdatedAt
}
if b.VersioningConfigUpdatedAt.After(t) {
t = b.VersioningConfigUpdatedAt
}
if b.LifecycleConfigUpdatedAt.After(t) {
t = b.LifecycleConfigUpdatedAt
}
if b.NotificationConfigUpdatedAt.After(t) {
t = b.NotificationConfigUpdatedAt
}
if b.BucketTargetsConfigUpdatedAt.After(t) {
t = b.BucketTargetsConfigUpdatedAt
}
if b.BucketTargetsConfigMetaUpdatedAt.After(t) {
t = b.BucketTargetsConfigMetaUpdatedAt
}
return
}
// Versioning returns true if versioning is enabled
func (b BucketMetadata) Versioning() bool {
return b.LockEnabled || (b.versioningConfig != nil && b.versioningConfig.Enabled()) || (b.objectLockConfig != nil && b.objectLockConfig.Enabled())
@ -440,6 +485,18 @@ func (b *BucketMetadata) defaultTimestamps() {
if b.LifecycleConfigUpdatedAt.IsZero() {
b.LifecycleConfigUpdatedAt = b.Created
}
if b.NotificationConfigUpdatedAt.IsZero() {
b.NotificationConfigUpdatedAt = b.Created
}
if b.BucketTargetsConfigUpdatedAt.IsZero() {
b.BucketTargetsConfigUpdatedAt = b.Created
}
if b.BucketTargetsConfigMetaUpdatedAt.IsZero() {
b.BucketTargetsConfigMetaUpdatedAt = b.Created
}
}
// Save config to supplied ObjectLayer api.

View File

@ -156,6 +156,24 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "LifecycleConfigUpdatedAt")
return
}
case "NotificationConfigUpdatedAt":
z.NotificationConfigUpdatedAt, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "NotificationConfigUpdatedAt")
return
}
case "BucketTargetsConfigUpdatedAt":
z.BucketTargetsConfigUpdatedAt, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "BucketTargetsConfigUpdatedAt")
return
}
case "BucketTargetsConfigMetaUpdatedAt":
z.BucketTargetsConfigMetaUpdatedAt, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "BucketTargetsConfigMetaUpdatedAt")
return
}
default:
err = dc.Skip()
if err != nil {
@ -169,9 +187,9 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 22
// map header, size 25
// write "Name"
err = en.Append(0xde, 0x0, 0x16, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
err = en.Append(0xde, 0x0, 0x19, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
if err != nil {
return
}
@ -390,15 +408,45 @@ func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "LifecycleConfigUpdatedAt")
return
}
// write "NotificationConfigUpdatedAt"
err = en.Append(0xbb, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
if err != nil {
return
}
err = en.WriteTime(z.NotificationConfigUpdatedAt)
if err != nil {
err = msgp.WrapError(err, "NotificationConfigUpdatedAt")
return
}
// write "BucketTargetsConfigUpdatedAt"
err = en.Append(0xbc, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
if err != nil {
return
}
err = en.WriteTime(z.BucketTargetsConfigUpdatedAt)
if err != nil {
err = msgp.WrapError(err, "BucketTargetsConfigUpdatedAt")
return
}
// write "BucketTargetsConfigMetaUpdatedAt"
err = en.Append(0xd9, 0x20, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4d, 0x65, 0x74, 0x61, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
if err != nil {
return
}
err = en.WriteTime(z.BucketTargetsConfigMetaUpdatedAt)
if err != nil {
err = msgp.WrapError(err, "BucketTargetsConfigMetaUpdatedAt")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 22
// map header, size 25
// string "Name"
o = append(o, 0xde, 0x0, 0x16, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = append(o, 0xde, 0x0, 0x19, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = msgp.AppendString(o, z.Name)
// string "Created"
o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64)
@ -463,6 +511,15 @@ func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) {
// string "LifecycleConfigUpdatedAt"
o = append(o, 0xb8, 0x4c, 0x69, 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
o = msgp.AppendTime(o, z.LifecycleConfigUpdatedAt)
// string "NotificationConfigUpdatedAt"
o = append(o, 0xbb, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
o = msgp.AppendTime(o, z.NotificationConfigUpdatedAt)
// string "BucketTargetsConfigUpdatedAt"
o = append(o, 0xbc, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
o = msgp.AppendTime(o, z.BucketTargetsConfigUpdatedAt)
// string "BucketTargetsConfigMetaUpdatedAt"
o = append(o, 0xd9, 0x20, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4d, 0x65, 0x74, 0x61, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74)
o = msgp.AppendTime(o, z.BucketTargetsConfigMetaUpdatedAt)
return
}
@ -616,6 +673,24 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "LifecycleConfigUpdatedAt")
return
}
case "NotificationConfigUpdatedAt":
z.NotificationConfigUpdatedAt, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "NotificationConfigUpdatedAt")
return
}
case "BucketTargetsConfigUpdatedAt":
z.BucketTargetsConfigUpdatedAt, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "BucketTargetsConfigUpdatedAt")
return
}
case "BucketTargetsConfigMetaUpdatedAt":
z.BucketTargetsConfigMetaUpdatedAt, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "BucketTargetsConfigMetaUpdatedAt")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@ -630,6 +705,6 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketMetadata) Msgsize() (s int) {
s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize + 26 + msgp.TimeSize + 25 + msgp.TimeSize
s = 3 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.VersioningConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) + 21 + msgp.BytesPrefixSize + len(z.ReplicationConfigXML) + 24 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigJSON) + 28 + msgp.BytesPrefixSize + len(z.BucketTargetsConfigMetaJSON) + 22 + msgp.TimeSize + 26 + msgp.TimeSize + 26 + msgp.TimeSize + 23 + msgp.TimeSize + 21 + msgp.TimeSize + 27 + msgp.TimeSize + 26 + msgp.TimeSize + 25 + msgp.TimeSize + 28 + msgp.TimeSize + 29 + msgp.TimeSize + 34 + msgp.TimeSize
return
}