Recreate bucket metacache if corrupted (#10800)

If bucket metadata cannot be read, clean up existing and create a new.
This commit is contained in:
Klaus Post 2020-10-31 10:26:16 -07:00 committed by GitHub
parent 422898d9b3
commit fe9f23e632
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 6 deletions

View File

@ -51,7 +51,17 @@ type bucketMetacache struct {
}
// newBucketMetacache creates a new bucketMetacache.
func newBucketMetacache(bucket string) *bucketMetacache {
// Optionally remove all existing caches.
func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
if cleanup {
// Recursively delete all caches.
objAPI := newObjectLayerFn()
ez, ok := objAPI.(*erasureServerSets)
if ok {
ctx := context.Background()
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
}
}
return &bucketMetacache{
bucket: bucket,
caches: make(map[string]metacache, 10),
@ -97,19 +107,26 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache,
} else {
logger.LogIf(ctx, err)
}
return newBucketMetacache(bucket), err
if errors.Is(err, InsufficientReadQuorum{}) {
// Cache is likely lost. Clean up and return new.
return newBucketMetacache(bucket, true), nil
}
return newBucketMetacache(bucket, false), err
}
wg.Wait()
if decErr != nil {
if errors.Is(err, context.Canceled) {
return newBucketMetacache(bucket, false), err
}
// Log the error, but assume the data is lost and return a fresh bucket.
// Otherwise a broken cache will never recover.
logger.LogIf(ctx, decErr)
return newBucketMetacache(bucket), nil
return newBucketMetacache(bucket, true), nil
}
// Sanity check...
if meta.bucket != bucket {
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
return newBucketMetacache(bucket), nil
return newBucketMetacache(bucket, true), nil
}
return &meta, nil
}
@ -393,6 +410,16 @@ func (b *bucketMetacache) deleteAll() {
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureZones"))
return
}
b.updated = true
if !b.transient {
// Delete all.
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
b.caches = make(map[string]metacache, 10)
return
}
// Transient are in different buckets.
var wg sync.WaitGroup
for id := range b.caches {
wg.Add(1)
@ -400,9 +427,9 @@ func (b *bucketMetacache) deleteAll() {
defer wg.Done()
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
}(b.caches[id])
delete(b.caches, id)
}
wg.Wait()
b.caches = make(map[string]metacache, 10)
}
// deleteCache will delete a specific cache and all files related to it across the cluster.

View File

@ -45,7 +45,7 @@ const metacacheManagerTransientBucket = "**transient**"
// initManager will start async saving the cache.
func (m *metacacheManager) initManager() {
// Add a transient bucket.
tb := newBucketMetacache(metacacheManagerTransientBucket)
tb := newBucketMetacache(metacacheManagerTransientBucket, false)
tb.transient = true
m.buckets[metacacheManagerTransientBucket] = tb