From f98616dce784e42e8bd5c40d0d660b1b019e5b5e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 29 Jan 2020 12:05:44 +0530 Subject: [PATCH] heal: Optimize heal listing by avoiding batches (#8901) Also limit the heal per object if there are incoming requests by suspending heal for longer periods of time. --- cmd/admin-heal-ops.go | 8 ++--- cmd/fs-v1.go | 8 +---- cmd/gateway-unsupported.go | 7 +--- cmd/object-api-interface.go | 3 +- cmd/xl-sets.go | 43 ++++++++++++------------- cmd/xl-v1-list-objects-heal.go | 7 +--- cmd/xl-zones.go | 58 ++++++++++++++++++++++++++++------ 7 files changed, 78 insertions(+), 56 deletions(-) diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 2020ccf59..e33b4655a 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -775,15 +775,15 @@ func (h *healSequence) healBucket(bucket string, bucketsOnly bool) error { // healObject - heal the given object and record result func (h *healSequence) healObject(bucket, object string) error { - if h.isQuitting() { - return errHealStopSignalled - } - // Get current object layer instance. objectAPI := newObjectLayerWithoutSafeModeFn() if objectAPI == nil { return errServerNotInitialized } + if h.isQuitting() { + return errHealStopSignalled + } + return h.queueHealTask(pathJoin(bucket, object), madmin.HealItemObject) } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 9f6c7d9e0..376ce747f 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -1254,7 +1254,7 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo } // HealObjects - no-op for fs. Valid only for XL. 
-func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn func(string, string) error) (e error) { +func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) { logger.LogIf(ctx, NotImplemented{}) return NotImplemented{} } @@ -1265,12 +1265,6 @@ func (fs *FSObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) return []BucketInfo{}, NotImplemented{} } -// ListObjectsHeal - list all objects to be healed. Valid only for XL -func (fs *FSObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) { - logger.LogIf(ctx, NotImplemented{}) - return ListObjectsInfo{}, NotImplemented{} -} - // GetMetrics - no op func (fs *FSObjects) GetMetrics(ctx context.Context) (*Metrics, error) { logger.LogIf(ctx, NotImplemented{}) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index b541bf2dc..fd8a0759c 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -148,11 +148,6 @@ func (a GatewayUnsupported) ListBucketsHeal(ctx context.Context) (buckets []Buck return nil, NotImplemented{} } -// ListObjectsHeal - Not implemented stub -func (a GatewayUnsupported) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) { - return ListObjectsInfo{}, NotImplemented{} -} - // HealObject - Not implemented stub func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (h madmin.HealResultItem, e error) { return h, NotImplemented{} @@ -164,7 +159,7 @@ func (a GatewayUnsupported) ListObjectsV2(ctx context.Context, bucket, prefix, c } // HealObjects - Not implemented stub -func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, fn func(string, string) error) (e error) { +func (a GatewayUnsupported) HealObjects(ctx context.Context, 
bucket, prefix string, fn healObjectFn) (e error) { return NotImplemented{} } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 6f9496c64..8104668a1 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -99,10 +99,9 @@ type ObjectLayer interface { HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error) - HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error + HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error) - ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) // Policy operations SetBucketPolicy(context.Context, string, *policy.Policy) error diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 84c2b07e0..b4b9dff19 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -1626,46 +1626,45 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { // HealObjects - Heal all objects recursively at a specified prefix, any // dangling objects deleted as well automatically. 
-func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error { +func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error { + endWalkCh := make(chan struct{}) + defer close(endWalkCh) - marker := "" + recursive := true + entryChs := s.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh) + + entriesValid := make([]bool, len(entryChs)) + entries := make([]FileInfo, len(entryChs)) for { + entry, quorumCount, ok := leastEntry(entryChs, entries, entriesValid) + if !ok { + break + } + + if quorumCount == s.drivesPerSet { + // Skip good entries. + continue + } + if httpServer := newHTTPServerFn(); httpServer != nil { // Wait at max 10 minute for an inprogress request before proceeding to heal waitCount := 600 // Any requests in progress, delay the heal. - for (httpServer.GetRequestCount() >= int32(s.setCount*s.drivesPerSet)) && + for (httpServer.GetRequestCount() >= int32(s.drivesPerSet)) && waitCount > 0 { waitCount-- time.Sleep(1 * time.Second) } } - res, err := s.ListObjectsHeal(ctx, bucket, prefix, marker, "", maxObjectList) - if err != nil { - continue + if err := healObject(bucket, entry.Name); err != nil { + return toObjectErr(err, bucket, entry.Name) } - - for _, obj := range res.Objects { - if err = healObjectFn(bucket, obj.Name); err != nil { - return toObjectErr(err, bucket, obj.Name) - } - } - - if !res.IsTruncated { - break - } - - marker = res.NextMarker } return nil } -func (s *xlSets) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { - return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, true) -} - // PutObjectTag - replace or add tags to an existing object func (s *xlSets) PutObjectTag(ctx context.Context, bucket, object string, tags string) error { return s.getHashedSet(object).PutObjectTag(ctx, bucket, object, tags) diff --git 
a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index c43f329d7..df2957601 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -23,12 +23,7 @@ func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { return nil, nil } -// This is not implemented, look for xl-sets.ListObjectsHeal() -func (xl xlObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { - return ListObjectsInfo{}, nil -} - // This is not implemented/needed anymore, look for xl-sets.HealObjects() -func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, healFn func(string, string) error) (e error) { +func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error { return nil } diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index cf22f9bbe..2f1697c64 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -24,6 +24,7 @@ import ( "net/http" "strings" "sync" + "time" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" @@ -1309,19 +1310,58 @@ func (z *xlZones) HealBucket(ctx context.Context, bucket string, dryRun, remove return r, nil } -func (z *xlZones) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { - if z.SingleZone() { - return z.zones[0].ListObjectsHeal(ctx, bucket, prefix, marker, delimiter, maxKeys) - } - return z.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, true) -} +type healObjectFn func(string, string) error -func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error { +func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error { + var zonesEntryChs [][]FileInfoCh + + recursive := true for _, zone := range z.zones { - if err := zone.HealObjects(ctx, bucket, prefix, healObjectFn); 
err != nil { - return err + endWalkCh := make(chan struct{}) + defer close(endWalkCh) + zonesEntryChs = append(zonesEntryChs, + zone.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh)) + } + + var zoneDrivesPerSet []int + for _, zone := range z.zones { + zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) + } + + var zonesEntriesInfos [][]FileInfo + var zonesEntriesValid [][]bool + for _, entryChs := range zonesEntryChs { + zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs))) + zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs))) + } + + for { + entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) + if !ok { + break + } + + if quorumCount == zoneDrivesPerSet[zoneIndex] { + // Skip good entries. + continue + } + + if httpServer := newHTTPServerFn(); httpServer != nil { + // Wait at max 10 minute for an inprogress request before proceeding to heal + waitCount := 600 + // Any requests in progress, delay the heal. + for (httpServer.GetRequestCount() >= int32(zoneDrivesPerSet[zoneIndex])) && + waitCount > 0 { + waitCount-- + time.Sleep(1 * time.Second) + } + } + + if err := healObject(bucket, entry.Name); err != nil { + return toObjectErr(err, bucket, entry.Name) } } + return nil }