fix: healing across pools removing dangling objects (#13990)

Also adds other simplifications to the code that runs
namespace heals across pools.
This commit is contained in:
Harshavardhana
2021-12-25 09:01:44 -08:00
committed by GitHub
parent 7e3a7d7044
commit b883803b21
4 changed files with 307 additions and 112 deletions

View File

@@ -1684,112 +1684,120 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
// HealObjectFn is the callback that heals a single object version.
// It receives the bucket, the object name and the version ID to heal
// (an empty versionID is passed when the entry's versions could not be
// read) and returns a non-nil error if healing failed.
type HealObjectFn func(bucket, object, versionID string) error
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
errCh := make(chan error)
// listAndHeal recursively lists bucket/prefix on the disks returned by
// set.getOnlineDisksWithHealing() and calls healEntry for every entry the
// listing yields, whether all disks agreed on it or only some did.
//
// Failures are reported on errCh:
//   - no disks were returned for the set,
//   - healEntry returned an error for an entry,
//   - listPathRaw itself returned an error.
//
// errCh may be unbuffered and its consumer may stop receiving after the first
// error. To avoid parking this goroutine forever in that case, a report is
// abandoned once ctx is cancelled. Callers must therefore cancel ctx when they
// stop draining errCh.
func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(metaCacheEntry) error, errCh chan<- error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// report delivers err on errCh without risking a permanent block.
	// A receiver that is already waiting always gets the error (first
	// select); otherwise we wait for either a receiver or cancellation.
	report := func(err error) {
		select {
		case errCh <- err:
			return
		default:
		}
		select {
		case errCh <- err:
		case <-ctx.Done():
		}
	}

	disks, _ := set.getOnlineDisksWithHealing()
	if len(disks) == 0 {
		report(errors.New("listAndHeal: No non-healing disks found"))
		return
	}

	// How to resolve partial results.
	resolver := metadataResolutionParams{
		dirQuorum: 1,
		objQuorum: 1,
		bucket:    bucket,
		strict:    false, // Allow less strict matching.
	}

	// Fall back to the raw prefix when it has no base directory component.
	path := baseDirFromPrefix(prefix)
	if path == "" {
		path = prefix
	}

	lopts := listPathRawOptions{
		disks:          disks,
		bucket:         bucket,
		path:           path,
		recursive:      true,
		forwardTo:      "",
		minDisks:       1,
		reportNotFound: false,
		agreed: func(entry metaCacheEntry) {
			if err := healEntry(entry); err != nil {
				report(err)
			}
		},
		partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
			entry, ok := entries.resolve(&resolver)
			if !ok {
				// Check if we can get at least one entry and
				// proceed to heal nonetheless.
				entry, _ = entries.firstFound()
			}
			if entry == nil {
				// Nothing resolved and nothing found: there is no
				// entry to heal, and dereferencing would panic.
				return
			}
			if err := healEntry(*entry); err != nil {
				report(err)
			}
		},
		finished: nil,
	}

	if err := listPathRaw(ctx, lopts); err != nil {
		report(fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
		return
	}
}
// HealObjects lists bucket/prefix on every erasure set of every server pool
// and calls healObjectFn for each object version found.
//
// It returns the first value received from errCh: the first error reported
// while listing or healing, or nil once the channel has been closed by the
// background goroutine.
//
// NOTE(review): this span appears to be a rendered diff in which removed and
// added lines are interleaved without +/- markers (for example two
// `var wg sync.WaitGroup` declarations, two `go func` openers and two
// `wg.Wait()` calls) - confirm against the actual file before relying on the
// exact statement order shown here.
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error {
errCh := make(chan error)
// healEntry heals one listing entry: directories and internal housekeeping
// paths are skipped, everything else is handed to healObjectFn.
healEntry := func(entry metaCacheEntry) error {
if entry.isDir() {
return nil
}
// We might land at .metacache, .trash, .multipart
// no need to heal them skip, only when bucket
// is '.minio.sys'
if bucket == minioMetaBucket {
if wildcard.Match("buckets/*/.metacache/*", entry.name) {
return nil
}
if wildcard.Match("tmp/*", entry.name) {
return nil
}
if wildcard.Match("multipart/*", entry.name) {
return nil
}
if wildcard.Match("tmp-old/*", entry.name) {
return nil
}
}
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
// Versions could not be obtained from the entry; heal by name
// with an empty version ID instead.
return healObjectFn(bucket, entry.name, "")
}
// Heal each version individually, stopping at the first failure.
for _, version := range fivs.Versions {
if err := healObjectFn(bucket, version.Name, version.VersionID); err != nil {
return err
}
}
return nil
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
// Closing errCh unblocks the final receive below with a nil error.
defer close(errCh)
defer cancel()
var wg sync.WaitGroup
for _, erasureSet := range z.serverPools {
var wg sync.WaitGroup
for _, set := range erasureSet.sets {
set := set
wg.Add(1)
go func() {
go func(set *erasureObjects) {
defer wg.Done()
disks, _ := set.getOnlineDisksWithHealing()
if len(disks) == 0 {
cancel()
errCh <- errors.New("HealObjects: No non-healing disks found")
return
}
healEntry := func(entry metaCacheEntry) {
if entry.isDir() {
return
}
// We might land at .metacache, .trash, .multipart
// no need to heal them skip, only when bucket
// is '.minio.sys'
if bucket == minioMetaBucket {
if wildcard.Match("buckets/*/.metacache/*", entry.name) {
return
}
if wildcard.Match("tmp/*", entry.name) {
return
}
if wildcard.Match("multipart/*", entry.name) {
return
}
if wildcard.Match("tmp-old/*", entry.name) {
return
}
}
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
if err := healObject(bucket, entry.name, ""); err != nil {
cancel()
errCh <- err
return
}
return
}
for _, version := range fivs.Versions {
if err := healObject(bucket, version.Name, version.VersionID); err != nil {
cancel()
errCh <- err
return
}
}
}
// How to resolve partial results.
resolver := metadataResolutionParams{
dirQuorum: 1,
objQuorum: 1,
bucket: bucket,
strict: false, // Allow less strict matching.
}
path := baseDirFromPrefix(prefix)
if path == "" {
path = prefix
}
lopts := listPathRawOptions{
disks: disks,
bucket: bucket,
path: path,
recursive: true,
forwardTo: "",
minDisks: 1,
reportNotFound: false,
agreed: healEntry,
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
entry, ok := entries.resolve(&resolver)
if !ok {
// Check if we can get at least one entry and
// proceed to heal nonetheless.
entry, _ = entries.firstFound()
}
healEntry(*entry)
},
finished: nil,
}
if err := listPathRaw(ctx, lopts); err != nil {
cancel()
errCh <- fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
return
}
}()
listAndHeal(ctx, bucket, prefix, set, healEntry, errCh)
}(set)
}
wg.Wait()
}
wg.Wait()
}()
// Blocks until the first error is reported or errCh is closed.
return <-errCh
}
@@ -1797,14 +1805,33 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
object = encodeDirObject(object)
for _, pool := range z.serverPools {
result, err := pool.HealObject(ctx, bucket, object, versionID, opts)
result.Object = decodeDirObject(result.Object)
if err != nil {
return result, err
}
return result, nil
errs := make([]error, len(z.serverPools))
results := make([]madmin.HealResultItem, len(z.serverPools))
var wg sync.WaitGroup
for idx, pool := range z.serverPools {
wg.Add(1)
go func(idx int, pool *erasureSets) {
defer wg.Done()
result, err := pool.HealObject(ctx, bucket, object, versionID, opts)
result.Object = decodeDirObject(result.Object)
errs[idx] = err
results[idx] = result
}(idx, pool)
}
wg.Wait()
for _, err := range errs {
if err != nil {
return madmin.HealResultItem{}, err
}
}
for _, result := range results {
if result.Object != "" {
return result, nil
}
}
if versionID != "" {
return madmin.HealResultItem{}, VersionNotFound{
Bucket: bucket,