Implement HealObjects API to simplify healing (#7351)

This commit is contained in:
Harshavardhana 2019-03-13 17:35:09 -07:00 committed by kannappanr
parent 8377d00574
commit 7079abc931
11 changed files with 72 additions and 273 deletions

View File

@ -21,14 +21,12 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/sync/errgroup"
) )
// healStatusSummary - overall short summary of a healing sequence // healStatusSummary - overall short summary of a healing sequence
@ -573,51 +571,22 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
// NOTE: Healing on meta is run regardless // NOTE: Healing on meta is run regardless
// of any bucket being selected, this is to ensure that // of any bucket being selected, this is to ensure that
// meta are always upto date and correct. // meta are always upto date and correct.
marker := "" return objectAPI.HealObjects(h.ctx, minioMetaBucket, metaPrefix, func(bucket string, object string) error {
isTruncated := true if h.isQuitting() {
for isTruncated { return errHealStopSignalled
if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to heal
waitCount := 60
// Any requests in progress, delay the heal.
for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
} }
res, herr := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun, h.settings.Remove)
// Lists all objects under `config` prefix. // Object might have been deleted, by the time heal
objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, minioMetaBucket, metaPrefix, // was attempted we ignore this object an move on.
marker, "", 1000) if isErrObjectNotFound(herr) {
if err != nil { return nil
return errFnHealFromAPIErr(h.ctx, err)
} }
if herr != nil {
for index := range objectInfos.Objects { return herr
if h.isQuitting() {
return errHealStopSignalled
}
o := objectInfos.Objects[index]
res, herr := objectAPI.HealObject(h.ctx, o.Bucket, o.Name, h.settings.DryRun, h.settings.Remove)
// Object might have been deleted, by the time heal
// was attempted we ignore this file an move on.
if isErrObjectNotFound(herr) {
continue
}
if herr != nil {
return herr
}
res.Type = madmin.HealItemBucketMetadata
if err = h.pushHealResultItem(res); err != nil {
return err
}
} }
res.Type = madmin.HealItemBucketMetadata
isTruncated = objectInfos.IsTruncated return h.pushHealResultItem(res)
marker = objectInfos.NextMarker })
}
return nil
} }
} }
@ -720,46 +689,9 @@ func (h *healSequence) healBucket(bucket string) error {
return nil return nil
} }
entries := runtime.NumCPU() if err = objectAPI.HealObjects(h.ctx, bucket,
h.objPrefix, h.healObject); err != nil {
marker := "" return errFnHealFromAPIErr(h.ctx, err)
isTruncated := true
for isTruncated {
if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to heal
waitCount := 60
// Any requests in progress, delay the heal.
for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
// Heal numCPU * nodes objects at a time.
objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, bucket,
h.objPrefix, marker, "", entries)
if err != nil {
return errFnHealFromAPIErr(h.ctx, err)
}
g := errgroup.WithNErrs(len(objectInfos.Objects))
for index := range objectInfos.Objects {
index := index
g.Go(func() error {
o := objectInfos.Objects[index]
return h.healObject(o.Bucket, o.Name)
}, index)
}
for _, err := range g.Wait() {
if err != nil {
return err
}
}
isTruncated = objectInfos.IsTruncated
marker = objectInfos.NextMarker
} }
return nil return nil
} }
@ -770,6 +702,16 @@ func (h *healSequence) healObject(bucket, object string) error {
return errHealStopSignalled return errHealStopSignalled
} }
if globalHTTPServer != nil {
// Wait at max 1 minute for an inprogress request
// before proceeding to heal
waitCount := 60
// Any requests in progress, delay the heal.
for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
// Get current object layer instance. // Get current object layer instance.
objectAPI := newObjectLayerFn() objectAPI := newObjectLayerFn()
if objectAPI == nil { if objectAPI == nil {

View File

@ -423,7 +423,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark
if delimiter == slashSeparator { if delimiter == slashSeparator {
recursive = false recursive = false
} }
walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix, false}) walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix})
if walkResultCh == nil { if walkResultCh == nil {
endWalkCh = make(chan struct{}) endWalkCh = make(chan struct{})
isLeaf := func(bucket, object string) bool { isLeaf := func(bucket, object string) bool {
@ -495,7 +495,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark
} }
} }
params := listParams{bucket, recursive, nextMarker, prefix, false} params := listParams{bucket, recursive, nextMarker, prefix}
if !eof { if !eof {
c.listPool.Set(params, walkResultCh, endWalkCh) c.listPool.Set(params, walkResultCh, endWalkCh)
} }

View File

@ -1148,8 +1148,7 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de
return fs.getObjectInfo(ctx, bucket, entry) return fs.getObjectInfo(ctx, bucket, entry)
} }
heal := false // true only for xl.ListObjectsHeal() walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix})
walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
if walkResultCh == nil { if walkResultCh == nil {
endWalkCh = make(chan struct{}) endWalkCh = make(chan struct{})
isLeaf := func(bucket, object string) bool { isLeaf := func(bucket, object string) bool {
@ -1203,7 +1202,7 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de
} }
// Save list routine for the next marker if we haven't reached EOF. // Save list routine for the next marker if we haven't reached EOF.
params := listParams{bucket, recursive, nextMarker, prefix, heal} params := listParams{bucket, recursive, nextMarker, prefix}
if !eof { if !eof {
fs.listPool.Set(params, walkResultCh, endWalkCh) fs.listPool.Set(params, walkResultCh, endWalkCh)
} }
@ -1254,10 +1253,10 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo
return madmin.HealResultItem{}, NotImplemented{} return madmin.HealResultItem{}, NotImplemented{}
} }
// ListObjectsHeal - list all objects to be healed. Valid only for XL // HealObjects - no-op for fs. Valid only for XL.
func (fs *FSObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn func(string, string) error) (e error) {
logger.LogIf(ctx, NotImplemented{}) logger.LogIf(ctx, NotImplemented{})
return loi, NotImplemented{} return NotImplemented{}
} }
// ListBucketsHeal - list all buckets to be healed. Valid only for XL // ListBucketsHeal - list all buckets to be healed. Valid only for XL

View File

@ -396,13 +396,13 @@ func TestFSHealObject(t *testing.T) {
} }
} }
// TestFSListObjectHeal - tests for fs ListObjectHeals // TestFSHealObjects - tests for fs HealObjects to return not implemented.
func TestFSListObjectsHeal(t *testing.T) { func TestFSHealObjects(t *testing.T) {
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
defer os.RemoveAll(disk) defer os.RemoveAll(disk)
obj := initFSObjects(disk, t) obj := initFSObjects(disk, t)
_, err := obj.ListObjectsHeal(context.Background(), "bucket", "prefix", "marker", "delimiter", 1000) err := obj.HealObjects(context.Background(), "bucket", "prefix", nil)
if err == nil || !isSameType(err, NotImplemented{}) { if err == nil || !isSameType(err, NotImplemented{}) {
t.Fatalf("Heal Object should return NotImplemented error ") t.Fatalf("Heal Object should return NotImplemented error ")
} }

View File

@ -111,9 +111,9 @@ func (a GatewayUnsupported) ListObjectsV2(ctx context.Context, bucket, prefix, c
return result, NotImplemented{} return result, NotImplemented{}
} }
// ListObjectsHeal - Not implemented stub // HealObjects - Not implemented stub
func (a GatewayUnsupported) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, fn func(string, string) error) (e error) {
return loi, NotImplemented{} return NotImplemented{}
} }
// CopyObject copies a blob from source container to destination container. // CopyObject copies a blob from source container to destination container.

View File

@ -90,7 +90,7 @@ type ObjectLayer interface {
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error)
HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (madmin.HealResultItem, error) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (madmin.HealResultItem, error)
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error) ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)
ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error
// Policy operations // Policy operations
SetBucketPolicy(context.Context, string, *policy.Policy) error SetBucketPolicy(context.Context, string, *policy.Policy) error

View File

@ -33,7 +33,6 @@ type listParams struct {
recursive bool recursive bool
marker string marker string
prefix string prefix string
heal bool
} }
// errWalkAbort - returned by doTreeWalk() if it returns prematurely. // errWalkAbort - returned by doTreeWalk() if it returns prematurely.

View File

@ -725,7 +725,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi
recursive = false recursive = false
} }
walkResultCh, endWalkCh := s.listPool.Release(listParams{bucket, recursive, marker, prefix, false}) walkResultCh, endWalkCh := s.listPool.Release(listParams{bucket, recursive, marker, prefix})
if walkResultCh == nil { if walkResultCh == nil {
endWalkCh = make(chan struct{}) endWalkCh = make(chan struct{})
isLeaf := func(bucket, entry string) bool { isLeaf := func(bucket, entry string) bool {
@ -798,7 +798,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi
} }
} }
params := listParams{bucket, recursive, nextMarker, prefix, false} params := listParams{bucket, recursive, nextMarker, prefix}
if !eof { if !eof {
s.listPool.Set(params, walkResultCh, endWalkCh) s.listPool.Set(params, walkResultCh, endWalkCh)
} }
@ -1319,110 +1319,48 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
return listBuckets, nil return listBuckets, nil
} }
// listObjectsHeal - wrapper function implemented over file tree walk. // HealObjects - Heal all objects recursively at a specified prefix, any
func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { // dangling objects deleted as well automatically.
// Default is recursive, if delimiter is set then list non recursive. func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) (err error) {
recursive := true recursive := true
if delimiter == slashSeparator {
recursive = false endWalkCh := make(chan struct{})
isLeaf := func(bucket, entry string) bool {
entry = strings.TrimSuffix(entry, slashSeparator)
// Verify if we are at the leaf, a leaf is where we
// see `xl.json` inside a directory.
return s.getHashedSet(entry).isObject(bucket, entry)
} }
// "heal" true for listObjectsHeal() and false for listObjects() isLeafDir := func(bucket, entry string) bool {
walkResultCh, endWalkCh := s.listPool.Release(listParams{bucket, recursive, marker, prefix, true}) var ok bool
if walkResultCh == nil { for _, set := range s.sets {
endWalkCh = make(chan struct{}) ok = set.isObjectDir(bucket, entry)
isLeaf := func(bucket, entry string) bool { if ok {
entry = strings.TrimSuffix(entry, slashSeparator) return true
// Verify if we are at the leaf, a leaf is where we
// see `xl.json` inside a directory.
return s.getHashedSet(entry).isObject(bucket, entry)
}
isLeafDir := func(bucket, entry string) bool {
var ok bool
for _, set := range s.sets {
ok = set.isObjectDir(bucket, entry)
if ok {
return true
}
} }
return false
} }
return false
listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, s.sets...)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
} }
var objInfos []ObjectInfo listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, s.sets...)
var eof bool walkResultCh := startTreeWalk(ctx, bucket, prefix, "", recursive, listDir, isLeaf, isLeafDir, endWalkCh)
var nextMarker string for {
for i := 0; i < maxKeys; {
walkResult, ok := <-walkResultCh walkResult, ok := <-walkResultCh
if !ok { if !ok {
// Closed channel.
eof = true
break break
} }
// For any walk error return right away. // For any walk error return right away.
if walkResult.err != nil { if walkResult.err != nil {
return loi, toObjectErr(walkResult.err, bucket, prefix) return toObjectErr(walkResult.err, bucket, prefix)
}
if err := healObjectFn(bucket, walkResult.entry); err != nil {
return toObjectErr(err, bucket, walkResult.entry)
} }
var objInfo ObjectInfo
objInfo.Bucket = bucket
objInfo.Name = walkResult.entry
nextMarker = objInfo.Name
objInfos = append(objInfos, objInfo)
i++
if walkResult.end { if walkResult.end {
eof = true
break break
} }
} }
params := listParams{bucket, recursive, nextMarker, prefix, true} return nil
if !eof {
s.listPool.Set(params, walkResultCh, endWalkCh)
}
result := ListObjectsInfo{IsTruncated: !eof}
for _, objInfo := range objInfos {
result.NextMarker = objInfo.Name
result.Objects = append(result.Objects, objInfo)
}
return result, nil
}
// This is not implemented yet, will be implemented later to comply with Admin API refactor.
func (s *xlSets) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil {
return loi, err
}
// With max keys of zero we have reached eof, return right here.
if maxKeys == 0 {
return loi, nil
}
// For delimiter and prefix as '/' we do not list anything at all
// since according to s3 spec we stop at the 'delimiter' along
// with the prefix. On a flat namespace with 'prefix' as '/'
// we don't have any entries, since all the keys are of form 'keyName/...'
if delimiter == slashSeparator && prefix == slashSeparator {
return loi, nil
}
// Over flowing count - reset to maxObjectList.
if maxKeys < 0 || maxKeys > maxObjectList {
maxKeys = maxObjectList
}
// Initiate a list operation, if successful filter and return quickly.
listObjInfo, err := s.listObjectsHeal(ctx, bucket, prefix, marker, delimiter, maxKeys)
if err == nil {
// We got the entries successfully return.
return listObjInfo, nil
}
// Return error at the end.
return loi, toObjectErr(err, bucket, prefix)
} }

View File

@ -23,7 +23,7 @@ func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
return nil, nil return nil, nil
} }
// This is not implemented/needed anymore, look for xl-sets.ListObjectsHeal() // This is not implemented/needed anymore, look for xl-sets.HealObjects()
func (xl xlObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, healFn func(string, string) error) (e error) {
return loi, nil return nil
} }

View File

@ -69,8 +69,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
recursive = false recursive = false
} }
heal := false // true only for xl.ListObjectsHeal walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
if walkResultCh == nil { if walkResultCh == nil {
endWalkCh = make(chan struct{}) endWalkCh = make(chan struct{})
isLeaf := xl.isObject isLeaf := xl.isObject
@ -125,7 +124,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
} }
} }
params := listParams{bucket, recursive, nextMarker, prefix, heal} params := listParams{bucket, recursive, nextMarker, prefix}
if !eof { if !eof {
xl.listPool.Set(params, walkResultCh, endWalkCh) xl.listPool.Set(params, walkResultCh, endWalkCh)
} }

View File

@ -1,78 +0,0 @@
// +build ignore
package main
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import (
"fmt"
"log"
"github.com/minio/minio/pkg/madmin"
)
func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise.
// New returns an Minio Admin client object.
madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
if err != nil {
log.Fatalln(err)
}
bucket := "mybucket"
prefix := "myprefix"
// Create a done channel to control 'ListObjectsHeal' go routine.
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)
// Set true if recursive listing is needed.
isRecursive := true
// List objects that need healing for a given bucket and
// prefix.
healObjectsCh, err := madmClnt.ListObjectsHeal(bucket, prefix, isRecursive, doneCh)
if err != nil {
log.Fatalln(err)
}
for object := range healObjectsCh {
if object.Err != nil {
log.Fatalln(err)
return
}
if object.HealObjectInfo != nil {
switch healInfo := *object.HealObjectInfo; healInfo.Status {
case madmin.CanHeal:
fmt.Println(object.Key, " can be healed.")
case madmin.CanPartiallyHeal:
fmt.Println(object.Key, " can't be healed completely, some disks are offline.")
case madmin.QuorumUnavailable:
fmt.Println(object.Key, " can't be healed until quorum is available.")
case madmin.Corrupted:
fmt.Println(object.Key, " can't be healed, not enough information.")
}
}
fmt.Println("object: ", object)
}
}