From 7079abc9313478abb1978088277183115301af79 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 13 Mar 2019 17:35:09 -0700 Subject: [PATCH] Implement HealObjects API to simplify healing (#7351) --- cmd/admin-heal-ops.go | 110 +++++---------------- cmd/disk-cache.go | 4 +- cmd/fs-v1.go | 11 +-- cmd/fs-v1_test.go | 6 +- cmd/gateway-unsupported.go | 6 +- cmd/object-api-interface.go | 2 +- cmd/tree-walk-pool.go | 1 - cmd/xl-sets.go | 116 ++++++----------------- cmd/xl-v1-list-objects-heal.go | 6 +- cmd/xl-v1-list-objects.go | 5 +- pkg/madmin/examples/heal-objects-list.go | 78 --------------- 11 files changed, 72 insertions(+), 273 deletions(-) delete mode 100644 pkg/madmin/examples/heal-objects-list.go diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index b083f84ca..3a48b8250 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -21,14 +21,12 @@ import ( "encoding/json" "fmt" "net/http" - "runtime" "strings" "sync" "time" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/madmin" - "github.com/minio/minio/pkg/sync/errgroup" ) // healStatusSummary - overall short summary of a healing sequence @@ -573,51 +571,22 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error { // NOTE: Healing on meta is run regardless // of any bucket being selected, this is to ensure that // meta are always upto date and correct. - marker := "" - isTruncated := true - for isTruncated { - if globalHTTPServer != nil { - // Wait at max 1 minute for an inprogress request - // before proceeding to heal - waitCount := 60 - // Any requests in progress, delay the heal. - for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 { - waitCount-- - time.Sleep(1 * time.Second) - } + return objectAPI.HealObjects(h.ctx, minioMetaBucket, metaPrefix, func(bucket string, object string) error { + if h.isQuitting() { + return errHealStopSignalled } - - // Lists all objects under `config` prefix. - objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, minioMetaBucket, metaPrefix, - marker, "", 1000) - if err != nil { - return errFnHealFromAPIErr(h.ctx, err) + res, herr := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun, h.settings.Remove) + // Object might have been deleted, by the time heal + // was attempted we ignore this object an move on. + if isErrObjectNotFound(herr) { + return nil } - - for index := range objectInfos.Objects { - if h.isQuitting() { - return errHealStopSignalled - } - o := objectInfos.Objects[index] - res, herr := objectAPI.HealObject(h.ctx, o.Bucket, o.Name, h.settings.DryRun, h.settings.Remove) - // Object might have been deleted, by the time heal - // was attempted we ignore this file an move on. - if isErrObjectNotFound(herr) { - continue - } - if herr != nil { - return herr - } - res.Type = madmin.HealItemBucketMetadata - if err = h.pushHealResultItem(res); err != nil { - return err - } + if herr != nil { + return herr } - - isTruncated = objectInfos.IsTruncated - marker = objectInfos.NextMarker - } - return nil + res.Type = madmin.HealItemBucketMetadata + return h.pushHealResultItem(res) + }) } } @@ -720,46 +689,9 @@ func (h *healSequence) healBucket(bucket string) error { return nil } - entries := runtime.NumCPU() - - marker := "" - isTruncated := true - for isTruncated { - if globalHTTPServer != nil { - // Wait at max 1 minute for an inprogress request - // before proceeding to heal - waitCount := 60 - // Any requests in progress, delay the heal. - for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 { - waitCount-- - time.Sleep(1 * time.Second) - } - } - - // Heal numCPU * nodes objects at a time. - objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, bucket, - h.objPrefix, marker, "", entries) - if err != nil { - return errFnHealFromAPIErr(h.ctx, err) - } - - g := errgroup.WithNErrs(len(objectInfos.Objects)) - for index := range objectInfos.Objects { - index := index - g.Go(func() error { - o := objectInfos.Objects[index] - return h.healObject(o.Bucket, o.Name) - }, index) - } - - for _, err := range g.Wait() { - if err != nil { - return err - } - } - - isTruncated = objectInfos.IsTruncated - marker = objectInfos.NextMarker + if err = objectAPI.HealObjects(h.ctx, bucket, + h.objPrefix, h.healObject); err != nil { + return errFnHealFromAPIErr(h.ctx, err) } return nil } @@ -770,6 +702,16 @@ func (h *healSequence) healObject(bucket, object string) error { return errHealStopSignalled } + if globalHTTPServer != nil { + // Wait at max 1 minute for an inprogress request + // before proceeding to heal + waitCount := 60 + // Any requests in progress, delay the heal. + for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 { + waitCount-- + time.Sleep(1 * time.Second) + } + } // Get current object layer instance. objectAPI := newObjectLayerFn() if objectAPI == nil { diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index 7cf2bff4d..099d379f7 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -423,7 +423,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark if delimiter == slashSeparator { recursive = false } - walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix, false}) + walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix}) if walkResultCh == nil { endWalkCh = make(chan struct{}) isLeaf := func(bucket, object string) bool { @@ -495,7 +495,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark } } - params := listParams{bucket, recursive, nextMarker, prefix, false} + params := listParams{bucket, recursive, nextMarker, prefix} if !eof { c.listPool.Set(params, walkResultCh, endWalkCh) } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 080158f5d..6051b340c 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -1148,8 +1148,7 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de return fs.getObjectInfo(ctx, bucket, entry) } - heal := false // true only for xl.ListObjectsHeal() - walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix, heal}) + walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix}) if walkResultCh == nil { endWalkCh = make(chan struct{}) isLeaf := func(bucket, object string) bool { @@ -1203,7 +1202,7 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de } // Save list routine for the next marker if we haven't reached EOF. - params := listParams{bucket, recursive, nextMarker, prefix, heal} + params := listParams{bucket, recursive, nextMarker, prefix} if !eof { fs.listPool.Set(params, walkResultCh, endWalkCh) } @@ -1254,10 +1253,10 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo return madmin.HealResultItem{}, NotImplemented{} } -// ListObjectsHeal - list all objects to be healed. Valid only for XL -func (fs *FSObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { +// HealObjects - no-op for fs. Valid only for XL. +func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn func(string, string) error) (e error) { logger.LogIf(ctx, NotImplemented{}) - return loi, NotImplemented{} + return NotImplemented{} } // ListBucketsHeal - list all buckets to be healed. Valid only for XL diff --git a/cmd/fs-v1_test.go b/cmd/fs-v1_test.go index 973f1f796..c746b8513 100644 --- a/cmd/fs-v1_test.go +++ b/cmd/fs-v1_test.go @@ -396,13 +396,13 @@ func TestFSHealObject(t *testing.T) { } } -// TestFSListObjectHeal - tests for fs ListObjectHeals -func TestFSListObjectsHeal(t *testing.T) { +// TestFSHealObjects - tests for fs HealObjects to return not implemented. +func TestFSHealObjects(t *testing.T) { disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) defer os.RemoveAll(disk) obj := initFSObjects(disk, t) - _, err := obj.ListObjectsHeal(context.Background(), "bucket", "prefix", "marker", "delimiter", 1000) + err := obj.HealObjects(context.Background(), "bucket", "prefix", nil) if err == nil || !isSameType(err, NotImplemented{}) { t.Fatalf("Heal Object should return NotImplemented error ") } diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 62e79d0a9..64cdb6bb1 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -111,9 +111,9 @@ func (a GatewayUnsupported) ListObjectsV2(ctx context.Context, bucket, prefix, c return result, NotImplemented{} } -// ListObjectsHeal - Not implemented stub -func (a GatewayUnsupported) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { - return loi, NotImplemented{} +// HealObjects - Not implemented stub +func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, fn func(string, string) error) (e error) { + return NotImplemented{} } // CopyObject copies a blob from source container to destination container. diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 134d8319b..36e78b546 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -90,7 +90,7 @@ type ObjectLayer interface { HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (madmin.HealResultItem, error) ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error) - ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) + HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error // Policy operations SetBucketPolicy(context.Context, string, *policy.Policy) error diff --git a/cmd/tree-walk-pool.go b/cmd/tree-walk-pool.go index 4dd44ecb5..590e13aa0 100644 --- a/cmd/tree-walk-pool.go +++ b/cmd/tree-walk-pool.go @@ -33,7 +33,6 @@ type listParams struct { recursive bool marker string prefix string - heal bool } // errWalkAbort - returned by doTreeWalk() if it returns prematurely. diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 809cbe4e8..aaad84964 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -725,7 +725,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi recursive = false } - walkResultCh, endWalkCh := s.listPool.Release(listParams{bucket, recursive, marker, prefix, false}) + walkResultCh, endWalkCh := s.listPool.Release(listParams{bucket, recursive, marker, prefix}) if walkResultCh == nil { endWalkCh = make(chan struct{}) isLeaf := func(bucket, entry string) bool { @@ -798,7 +798,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi } } - params := listParams{bucket, recursive, nextMarker, prefix, false} + params := listParams{bucket, recursive, nextMarker, prefix} if !eof { s.listPool.Set(params, walkResultCh, endWalkCh) } @@ -1319,110 +1319,48 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { return listBuckets, nil } -// listObjectsHeal - wrapper function implemented over file tree walk. -func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { - // Default is recursive, if delimiter is set then list non recursive. +// HealObjects - Heal all objects recursively at a specified prefix, any +// dangling objects deleted as well automatically. +func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) (err error) { recursive := true - if delimiter == slashSeparator { - recursive = false + + endWalkCh := make(chan struct{}) + isLeaf := func(bucket, entry string) bool { + entry = strings.TrimSuffix(entry, slashSeparator) + // Verify if we are at the leaf, a leaf is where we + // see `xl.json` inside a directory. + return s.getHashedSet(entry).isObject(bucket, entry) } - // "heal" true for listObjectsHeal() and false for listObjects() - walkResultCh, endWalkCh := s.listPool.Release(listParams{bucket, recursive, marker, prefix, true}) - if walkResultCh == nil { - endWalkCh = make(chan struct{}) - isLeaf := func(bucket, entry string) bool { - entry = strings.TrimSuffix(entry, slashSeparator) - // Verify if we are at the leaf, a leaf is where we - // see `xl.json` inside a directory. - return s.getHashedSet(entry).isObject(bucket, entry) - } - - isLeafDir := func(bucket, entry string) bool { - var ok bool - for _, set := range s.sets { - ok = set.isObjectDir(bucket, entry) - if ok { - return true - } + isLeafDir := func(bucket, entry string) bool { + var ok bool + for _, set := range s.sets { + ok = set.isObjectDir(bucket, entry) + if ok { + return true } - return false } - - listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, s.sets...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) + return false } - var objInfos []ObjectInfo - var eof bool - var nextMarker string - for i := 0; i < maxKeys; { + listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, s.sets...) + walkResultCh := startTreeWalk(ctx, bucket, prefix, "", recursive, listDir, isLeaf, isLeafDir, endWalkCh) + for { walkResult, ok := <-walkResultCh if !ok { - // Closed channel. - eof = true break } // For any walk error return right away. if walkResult.err != nil { - return loi, toObjectErr(walkResult.err, bucket, prefix) + return toObjectErr(walkResult.err, bucket, prefix) + } + if err := healObjectFn(bucket, walkResult.entry); err != nil { + return toObjectErr(err, bucket, walkResult.entry) } - var objInfo ObjectInfo - objInfo.Bucket = bucket - objInfo.Name = walkResult.entry - nextMarker = objInfo.Name - objInfos = append(objInfos, objInfo) - i++ if walkResult.end { - eof = true break } } - params := listParams{bucket, recursive, nextMarker, prefix, true} - if !eof { - s.listPool.Set(params, walkResultCh, endWalkCh) - } - - result := ListObjectsInfo{IsTruncated: !eof} - for _, objInfo := range objInfos { - result.NextMarker = objInfo.Name - result.Objects = append(result.Objects, objInfo) - } - return result, nil -} - -// This is not implemented yet, will be implemented later to comply with Admin API refactor. -func (s *xlSets) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { - if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil { - return loi, err - } - - // With max keys of zero we have reached eof, return right here. - if maxKeys == 0 { - return loi, nil - } - - // For delimiter and prefix as '/' we do not list anything at all - // since according to s3 spec we stop at the 'delimiter' along - // with the prefix. On a flat namespace with 'prefix' as '/' - // we don't have any entries, since all the keys are of form 'keyName/...' - if delimiter == slashSeparator && prefix == slashSeparator { - return loi, nil - } - - // Over flowing count - reset to maxObjectList. - if maxKeys < 0 || maxKeys > maxObjectList { - maxKeys = maxObjectList - } - - // Initiate a list operation, if successful filter and return quickly. - listObjInfo, err := s.listObjectsHeal(ctx, bucket, prefix, marker, delimiter, maxKeys) - if err == nil { - // We got the entries successfully return. - return listObjInfo, nil - } - - // Return error at the end. - return loi, toObjectErr(err, bucket, prefix) + return nil } diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index de64a510b..a720686e3 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -23,7 +23,7 @@ func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { return nil, nil } -// This is not implemented/needed anymore, look for xl-sets.ListObjectsHeal() -func (xl xlObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { - return loi, nil +// This is not implemented/needed anymore, look for xl-sets.HealObjects() +func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, healFn func(string, string) error) (e error) { + return nil } diff --git a/cmd/xl-v1-list-objects.go b/cmd/xl-v1-list-objects.go index 6bd92fdb6..368c2d067 100644 --- a/cmd/xl-v1-list-objects.go +++ b/cmd/xl-v1-list-objects.go @@ -69,8 +69,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del recursive = false } - heal := false // true only for xl.ListObjectsHeal - walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, heal}) + walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix}) if walkResultCh == nil { endWalkCh = make(chan struct{}) isLeaf := xl.isObject @@ -125,7 +124,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del } } - params := listParams{bucket, recursive, nextMarker, prefix, heal} + params := listParams{bucket, recursive, nextMarker, prefix} if !eof { xl.listPool.Set(params, walkResultCh, endWalkCh) } diff --git a/pkg/madmin/examples/heal-objects-list.go b/pkg/madmin/examples/heal-objects-list.go deleted file mode 100644 index ff8a8b290..000000000 --- a/pkg/madmin/examples/heal-objects-list.go +++ /dev/null @@ -1,78 +0,0 @@ -// +build ignore - -package main - -/* - * Minio Cloud Storage, (C) 2017 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import ( - "fmt" - "log" - - "github.com/minio/minio/pkg/madmin" -) - -func main() { - - // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are - // dummy values, please replace them with original values. - - // API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise. - // New returns an Minio Admin client object. - madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) - if err != nil { - log.Fatalln(err) - } - - bucket := "mybucket" - prefix := "myprefix" - - // Create a done channel to control 'ListObjectsHeal' go routine. - doneCh := make(chan struct{}) - // Indicate to our routine to exit cleanly upon return. - defer close(doneCh) - - // Set true if recursive listing is needed. - isRecursive := true - // List objects that need healing for a given bucket and - // prefix. - healObjectsCh, err := madmClnt.ListObjectsHeal(bucket, prefix, isRecursive, doneCh) - if err != nil { - log.Fatalln(err) - } - - for object := range healObjectsCh { - if object.Err != nil { - log.Fatalln(err) - return - } - - if object.HealObjectInfo != nil { - switch healInfo := *object.HealObjectInfo; healInfo.Status { - case madmin.CanHeal: - fmt.Println(object.Key, " can be healed.") - case madmin.CanPartiallyHeal: - fmt.Println(object.Key, " can't be healed completely, some disks are offline.") - case madmin.QuorumUnavailable: - fmt.Println(object.Key, " can't be healed until quorum is available.") - case madmin.Corrupted: - fmt.Println(object.Key, " can't be healed, not enough information.") - } - } - fmt.Println("object: ", object) - } -}