diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index 4985a1437..f60f3d3ab 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -128,159 +128,6 @@ func (er erasureObjects) listAndHeal(bucket, prefix string, healEntry func(strin
 	return nil
 }
 
-// HealBucket heals a bucket if it doesn't exist on one of the disks, additionally
-// also heals the missing entries for bucket metadata files
-// `policy.json, notification.xml, listeners.json`.
-func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
-	result madmin.HealResultItem, err error,
-) {
-	storageDisks := er.getDisks()
-	storageEndpoints := er.getEndpoints()
-
-	// Heal bucket.
-	return er.healBucket(ctx, storageDisks, storageEndpoints, bucket, opts)
-}
-
-// Heal bucket - create buckets on disks where it does not exist.
-func (er erasureObjects) healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
-	// get write quorum for an object
-	writeQuorum := len(storageDisks) - er.defaultParityCount
-	if writeQuorum == er.defaultParityCount {
-		writeQuorum++
-	}
-
-	if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
-		startTime := time.Now()
-		defer func() {
-			healTrace(healingMetricBucket, startTime, bucket, "", &opts, err, &res)
-		}()
-	}
-
-	// Initialize sync waitgroup.
-	g := errgroup.WithNErrs(len(storageDisks))
-
-	// Disk states slices
-	beforeState := make([]string, len(storageDisks))
-	afterState := make([]string, len(storageDisks))
-
-	// Make a volume entry on all underlying storage disks.
-	for index := range storageDisks {
-		index := index
-		g.Go(func() error {
-			if storageDisks[index] == nil {
-				beforeState[index] = madmin.DriveStateOffline
-				afterState[index] = madmin.DriveStateOffline
-				return errDiskNotFound
-			}
-
-			beforeState[index] = madmin.DriveStateOk
-			afterState[index] = madmin.DriveStateOk
-
-			if bucket == minioReservedBucket {
-				return nil
-			}
-
-			if _, serr := storageDisks[index].StatVol(ctx, bucket); serr != nil {
-				if serr == errDiskNotFound {
-					beforeState[index] = madmin.DriveStateOffline
-					afterState[index] = madmin.DriveStateOffline
-					return serr
-				}
-				if serr != errVolumeNotFound {
-					beforeState[index] = madmin.DriveStateCorrupt
-					afterState[index] = madmin.DriveStateCorrupt
-					return serr
-				}
-
-				beforeState[index] = madmin.DriveStateMissing
-				afterState[index] = madmin.DriveStateMissing
-
-				// mutate only if not a dry-run
-				if opts.DryRun {
-					return nil
-				}
-
-				return serr
-			}
-			return nil
-		}, index)
-	}
-
-	errs := g.Wait()
-
-	// Initialize heal result info
-	res = madmin.HealResultItem{
-		Type:         madmin.HealItemBucket,
-		Bucket:       bucket,
-		DiskCount:    len(storageDisks),
-		ParityBlocks: er.defaultParityCount,
-		DataBlocks:   len(storageDisks) - er.defaultParityCount,
-	}
-
-	for i := range beforeState {
-		res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
-			UUID:     "",
-			Endpoint: storageEndpoints[i].String(),
-			State:    beforeState[i],
-		})
-	}
-
-	reducedErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, res.DataBlocks)
-	if errors.Is(reducedErr, errVolumeNotFound) && !opts.Recreate {
-		for i := range beforeState {
-			res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
-				UUID:     "",
-				Endpoint: storageEndpoints[i].String(),
-				State:    madmin.DriveStateOk,
-			})
-		}
-		return res, nil
-	}
-
-	// Initialize sync waitgroup.
-	g = errgroup.WithNErrs(len(storageDisks))
-
-	// Make a volume entry on all underlying storage disks.
-	for index := range storageDisks {
-		index := index
-		g.Go(func() error {
-			if beforeState[index] == madmin.DriveStateMissing {
-				makeErr := storageDisks[index].MakeVol(ctx, bucket)
-				if makeErr == nil {
-					afterState[index] = madmin.DriveStateOk
-				}
-				return makeErr
-			}
-			return errs[index]
-		}, index)
-	}
-
-	errs = g.Wait()
-
-	reducedErr = reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum)
-	if reducedErr != nil {
-		// If we have exactly half the drives not available,
-		// we should still allow HealBucket to not return error.
-		// this is necessary for starting the server.
-		readQuorum := res.DataBlocks
-		switch reduceReadQuorumErrs(ctx, errs, nil, readQuorum) {
-		case nil:
-		case errDiskNotFound:
-		default:
-			return res, reducedErr
-		}
-	}
-
-	for i := range afterState {
-		res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
-			UUID:     "",
-			Endpoint: storageEndpoints[i].String(),
-			State:    afterState[i],
-		})
-	}
-	return res, nil
-}
-
 // listAllBuckets lists all buckets from all disks. It also
 // returns the occurrence of each buckets in all disks
 func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo, readQuorum int) error {
@@ -1020,6 +867,22 @@ func isAllNotFound(errs []error) bool {
 	return len(errs) > 0
 }
 
+// isAllBucketsNotFound will return true if all the errors
+// are errVolumeNotFound.
+// A 0 length slice will always return false.
+func isAllBucketsNotFound(errs []error) bool {
+	if len(errs) == 0 {
+		return false
+	}
+	notFoundCount := 0
+	for _, err := range errs {
+		if err != nil && errors.Is(err, errVolumeNotFound) {
+			notFoundCount++
+		}
+	}
+	return len(errs) == notFoundCount
+}
+
 // ObjectDir is considered dangling/corrupted if any only
 // if total disks - a combination of corrupted and missing
 // files is lesser than N/2+1 number of disks.
@@ -1045,7 +908,7 @@ func isObjectDirDangling(errs []error) (ok bool) {
 	return found < notFound && found > 0
 }
 
-// Object is considered dangling/corrupted if any only
+// Object is considered dangling/corrupted if and only
 // if total disks - a combination of corrupted and missing
 // files is lesser than number of data blocks.
 func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (validMeta FileInfo, ok bool) {
diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go
index 57f6915d5..af6caea91 100644
--- a/cmd/erasure-healing_test.go
+++ b/cmd/erasure-healing_test.go
@@ -362,7 +362,7 @@ func TestHealing(t *testing.T) {
 		t.Fatal(err)
 	}
 	// This would create the bucket.
-	_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
+	_, err = obj.HealBucket(ctx, bucket, madmin.HealOpts{
 		DryRun: false,
 		Remove: false,
 	})
@@ -543,7 +543,7 @@ func TestHealingVersioned(t *testing.T) {
 		t.Fatal(err)
 	}
 	// This would create the bucket.
-	_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
+	_, err = obj.HealBucket(ctx, bucket, madmin.HealOpts{
 		DryRun: false,
 		Remove: false,
 	})
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 9a5b1c6a1..0570fb210 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1923,8 +1923,55 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
 	hopts.Recreate = false
 	defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, bucket, bucketMetadataFile), "", hopts)
 
+	type DiskStat struct {
+		VolInfos []VolInfo
+		Errs     []error
+	}
+
 	for _, pool := range z.serverPools {
-		result, err := pool.HealBucket(ctx, bucket, opts)
+		// map of per-node disk stats
+		diskStats := make(map[string]DiskStat)
+		for _, set := range pool.sets {
+			vis := []VolInfo{}
+			errs := []error{}
+			for _, disk := range set.getDisks() {
+				if disk == OfflineDisk {
+					continue
+				}
+				vi, err := disk.StatVol(ctx, bucket)
+				hostName := disk.Hostname()
+				if disk.IsLocal() {
+					hostName = "local"
+				}
+				ds, ok := diskStats[hostName]
+				if !ok {
+					newds := DiskStat{
+						VolInfos: append(vis, vi),
+						Errs:     append(errs, err),
+					}
+					diskStats[hostName] = newds
+				} else {
+					ds.VolInfos = append(ds.VolInfos, vi)
+					ds.Errs = append(ds.Errs, err)
+					diskStats[hostName] = ds
+				}
+			}
+		}
+		nodeCount := len(diskStats)
+		bktNotFoundCount := 0
+		for _, ds := range diskStats {
+			if isAllBucketsNotFound(ds.Errs) {
+				bktNotFoundCount++
+			}
+		}
+		// If the bucket is not found on more than half of the nodes, it is dangling.
+		if bktNotFoundCount > nodeCount/2 {
+			opts.Remove = true
+		} else {
+			opts.Recreate = true
+		}
+
+		result, err := z.s3Peer.HealBucket(ctx, bucket, opts)
 		if err != nil {
 			if _, ok := err.(BucketNotFound); ok {
 				continue
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 20987520c..b32c67f5c 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -1168,28 +1168,6 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 	return res, nil
 }
 
-// HealBucket - heals inconsistent buckets and bucket metadata on all sets.
-func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
-	// Initialize heal result info
-	result = madmin.HealResultItem{
-		Type:      madmin.HealItemBucket,
-		Bucket:    bucket,
-		DiskCount: s.setCount * s.setDriveCount,
-		SetCount:  s.setCount,
-	}
-
-	for _, set := range s.sets {
-		healResult, err := set.HealBucket(ctx, bucket, opts)
-		if err != nil {
-			return result, toObjectErr(err, bucket)
-		}
-		result.Before.Drives = append(result.Before.Drives, healResult.Before.Drives...)
-		result.After.Drives = append(result.After.Drives, healResult.After.Drives...)
-	}
-
-	return result, nil
-}
-
 // HealObject - heals inconsistent object on a hashedSet based on object name.
 func (s *erasureSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
 	return s.getHashedSet(object).HealObject(ctx, bucket, object, versionID, opts)
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index c94706b21..8864ca4ec 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -143,10 +143,13 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 	healBuckets := make([]string, len(buckets))
 	copy(healBuckets, buckets)
 
-	// Heal all buckets first in this erasure set - this is useful
-	// for new objects upload in different buckets to be successful
+	objAPI := newObjectLayerFn()
+	if objAPI == nil {
+		return errServerNotInitialized
+	}
+
 	for _, bucket := range healBuckets {
-		_, err := er.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
+		_, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
 		if err != nil {
 			// Log bucket healing error if any, we shall retry again.
 			logger.LogIf(ctx, err)
@@ -198,7 +201,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 
 	// Heal current bucket again in case if it is failed
 	// in the beginning of erasure set healing
-	if _, err := er.HealBucket(ctx, bucket, madmin.HealOpts{
+	if _, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
 		ScanMode: scanMode,
 	}); err != nil {
 		logger.LogIf(ctx, err)
diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go
index c64eeba96..3c851caf7 100644
--- a/cmd/peer-s3-client.go
+++ b/cmd/peer-s3-client.go
@@ -25,7 +25,9 @@ import (
 	"net/url"
 	"sort"
 	"strconv"
+	"time"
 
+	"github.com/minio/madmin-go/v3"
 	xhttp "github.com/minio/minio/internal/http"
 	"github.com/minio/minio/internal/rest"
 	"github.com/minio/pkg/v2/sync/errgroup"
@@ -36,6 +38,7 @@ var errPeerOffline = errors.New("peer is offline")
 
 type peerS3Client interface {
 	ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error)
+	HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
 	GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error)
 	MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error
 	DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error
@@ -67,6 +70,10 @@ func (l localPeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions)
 	return listBucketsLocal(ctx, opts)
 }
 
+func (l localPeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
+	return healBucketLocal(ctx, bucket, opts)
+}
+
 func (l localPeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
 	return getBucketInfoLocal(ctx, bucket, opts)
 }
@@ -124,6 +131,51 @@ func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
 	}
 }
 
+// HealBucket - heals buckets at node level
+func (sys *S3PeerSys) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
+	g := errgroup.WithNErrs(len(sys.peerClients))
+
+	healBucketResults := make([]madmin.HealResultItem, len(sys.peerClients))
+	for idx, client := range sys.peerClients {
+		idx := idx
+		client := client
+		g.Go(func() error {
+			if client == nil {
+				return errPeerOffline
+			}
+			res, err := client.HealBucket(ctx, bucket, opts)
+			if err != nil {
+				return err
+			}
+			healBucketResults[idx] = res
+			return nil
+		}, idx)
+	}
+
+	errs := g.Wait()
+
+	for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
+		perPoolErrs := make([]error, 0, len(sys.peerClients))
+		for i, client := range sys.peerClients {
+			if slices.Contains(client.GetPools(), poolIdx) {
+				perPoolErrs = append(perPoolErrs, errs[i])
+			}
+		}
+		quorum := len(perPoolErrs) / 2
+		if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum); poolErr != nil {
+			return madmin.HealResultItem{}, poolErr
+		}
+	}
+
+	for i, err := range errs {
+		if err == nil {
+			return healBucketResults[i], nil
+		}
+	}
+
+	return madmin.HealResultItem{}, toObjectErr(errVolumeNotFound, bucket)
+}
+
 // ListBuckets lists buckets across all nodes and returns a consistent view:
 //   - Return an error when a pool cannot return N/2+1 valid bucket information
 //   - For each pool, check if the bucket exists in N/2+1 nodes before including it in the final result
@@ -185,6 +237,22 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
 				}
 			}
 		}
+		// Loop through the buckets and check whether any have lost quorum.
+		// These could be stale buckets lying around, so queue a heal for
+		// such buckets. This is needed here because we identify them while
+		// listing buckets; the regular globalBucketMetadataSys.Init() call
+		// only sees buckets that still have quorum, and not the quorum-lost
+		// ones like these, hence the explicit heal queued from this code
+		// path.
+		for bktName, count := range bucketsMap {
+			if count < quorum {
+				// Queue a bucket heal task
+				globalMRFState.addPartialOp(partialOperation{
+					bucket: bktName,
+					queued: time.Now(),
+				})
+			}
+		}
 	}
 
 	result := make([]BucketInfo, 0, len(resultMap))
@@ -259,6 +327,23 @@ func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOp
 	return buckets, err
 }
 
+func (client *remotePeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
+	v := url.Values{}
+	v.Set(peerS3Bucket, bucket)
+	v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Remove))
+
+	respBody, err := client.call(peerS3MethodHealBucket, v, nil, -1)
+	if err != nil {
+		return madmin.HealResultItem{}, err
+	}
+	defer xhttp.DrainBody(respBody)
+
+	var res madmin.HealResultItem
+	err = gob.NewDecoder(respBody).Decode(&res)
+
+	return res, err
+}
+
 // GetBucketInfo returns bucket stat info from a peer
 func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
 	v := url.Values{}
diff --git a/cmd/peer-s3-server.go b/cmd/peer-s3-server.go
index 9b1764bdb..8e60c559c 100644
--- a/cmd/peer-s3-server.go
+++ b/cmd/peer-s3-server.go
@@ -21,8 +21,10 @@ import (
 	"context"
 	"encoding/gob"
 	"errors"
+	"fmt"
 	"net/http"
 
+	"github.com/minio/madmin-go/v3"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/mux"
 	"github.com/minio/pkg/v2/sync/errgroup"
@@ -42,6 +44,7 @@ const (
 	peerS3MethodGetBucketInfo = "/get-bucket-info"
 	peerS3MethodDeleteBucket  = "/delete-bucket"
 	peerS3MethodListBuckets   = "/list-buckets"
+	peerS3MethodHealBucket    = "/heal-bucket"
 )
 
 const (
@@ -78,6 +81,133 @@ func (s *peerS3Server) HealthHandler(w http.ResponseWriter, r *http.Request) {
 	s.IsValid(w, r)
 }
 
+func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
+	globalLocalDrivesMu.RLock()
+	globalLocalDrives := globalLocalDrives
+	globalLocalDrivesMu.RUnlock()
+
+	// Initialize sync waitgroup.
+	g := errgroup.WithNErrs(len(globalLocalDrives))
+
+	// Disk states slices
+	beforeState := make([]string, len(globalLocalDrives))
+	afterState := make([]string, len(globalLocalDrives))
+
+	// Make a volume entry on all underlying storage disks.
+	for index := range globalLocalDrives {
+		index := index
+		g.Go(func() (serr error) {
+			if globalLocalDrives[index] == nil {
+				beforeState[index] = madmin.DriveStateOffline
+				afterState[index] = madmin.DriveStateOffline
+				return errDiskNotFound
+			}
+
+			beforeState[index] = madmin.DriveStateOk
+			afterState[index] = madmin.DriveStateOk
+
+			if bucket == minioReservedBucket {
+				return nil
+			}
+
+			_, serr = globalLocalDrives[index].StatVol(ctx, bucket)
+			if serr != nil {
+				if serr == errDiskNotFound {
+					beforeState[index] = madmin.DriveStateOffline
+					afterState[index] = madmin.DriveStateOffline
+					return serr
+				}
+				if serr != errVolumeNotFound {
+					beforeState[index] = madmin.DriveStateCorrupt
+					afterState[index] = madmin.DriveStateCorrupt
+					return serr
+				}
+
+				beforeState[index] = madmin.DriveStateMissing
+				afterState[index] = madmin.DriveStateMissing
+
+				// mutate only if not a dry-run
+				if opts.DryRun {
+					return nil
+				}
+
+				return serr
+			}
+			return nil
+		}, index)
+	}
+
+	errs := g.Wait()
+
+	// Initialize heal result info
+	res = madmin.HealResultItem{
+		Type:      madmin.HealItemBucket,
+		Bucket:    bucket,
+		DiskCount: len(globalLocalDrives),
+	}
+
+	for i := range beforeState {
+		res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
+			UUID:     "",
+			Endpoint: globalLocalDrives[i].String(),
+			State:    beforeState[i],
+		})
+	}
+
+	// Check for a dangling bucket and delete it only if it is not a meta bucket.
+	if !isMinioMetaBucketName(bucket) && !isAllBucketsNotFound(errs) && opts.Remove {
+		g := errgroup.WithNErrs(len(globalLocalDrives))
+		for index := range globalLocalDrives {
+			index := index
+			g.Go(func() error {
+				if globalLocalDrives[index] == nil {
+					return errDiskNotFound
+				}
+				err := globalLocalDrives[index].DeleteVol(ctx, bucket, false)
+				if errors.Is(err, errVolumeNotEmpty) {
+					logger.LogOnceIf(ctx, fmt.Errorf("While deleting dangling Bucket (%s), Drive %s:%s returned an error (%w)",
+						bucket, globalLocalDrives[index].Hostname(), globalLocalDrives[index], err), "delete-dangling-bucket-"+bucket)
+				}
+				return nil
+			}, index)
+		}
+
+		g.Wait()
+	}
+
+	// Create the quorum-lost volume only if it is not marked for delete.
+	if !opts.Remove {
+		// Initialize sync waitgroup.
+		g = errgroup.WithNErrs(len(globalLocalDrives))
+
+		// Make a volume entry on all underlying storage disks.
+		for index := range globalLocalDrives {
+			index := index
+			g.Go(func() error {
+				if beforeState[index] == madmin.DriveStateMissing {
+					makeErr := globalLocalDrives[index].MakeVol(ctx, bucket)
+					if makeErr == nil {
+						afterState[index] = madmin.DriveStateOk
+					}
+					return makeErr
+				}
+				return errs[index]
+			}, index)
+		}
+
+		errs = g.Wait()
+	}
+
+	for i := range afterState {
+		res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
+			UUID:     "",
+			Endpoint: globalLocalDrives[i].String(),
+			State:    afterState[i],
+		})
+	}
+	return res, nil
+}
+
 func listBucketsLocal(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
 	globalLocalDrivesMu.RLock()
 	globalLocalDrives := globalLocalDrives
@@ -259,6 +389,30 @@ func (s *peerS3Server) ListBucketsHandler(w http.ResponseWriter, r *http.Request
 	logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(buckets))
 }
 
+func (s *peerS3Server) HealBucketHandler(w http.ResponseWriter, r *http.Request) {
+	if !s.IsValid(w, r) {
+		return
+	}
+
+	bucketDeleted := r.Form.Get(peerS3BucketDeleted) == "true"
+
+	bucket := r.Form.Get(peerS3Bucket)
+	if isMinioMetaBucket(bucket) {
+		s.writeErrorResponse(w, errInvalidArgument)
+		return
+	}
+
+	res, err := healBucketLocal(r.Context(), bucket, madmin.HealOpts{
+		Remove: bucketDeleted,
+	})
+	if err != nil {
+		s.writeErrorResponse(w, err)
+		return
+	}
+
+	logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(res))
+}
+
 // GetBucketInfoHandler implements peer BuckeInfo call, returns bucket create date.
 func (s *peerS3Server) GetBucketInfoHandler(w http.ResponseWriter, r *http.Request) {
 	if !s.IsValid(w, r) {
@@ -333,4 +487,5 @@ func registerPeerS3Handlers(router *mux.Router) {
 	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodDeleteBucket).HandlerFunc(h(server.DeleteBucketHandler))
 	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodGetBucketInfo).HandlerFunc(h(server.GetBucketInfoHandler))
 	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodListBuckets).HandlerFunc(h(server.ListBucketsHandler))
+	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodHealBucket).HandlerFunc(h(server.HealBucketHandler))
 }
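
Note: the heart of the new flow in `erasureServerPools.HealBucket` is a node-majority vote. A bucket is treated as dangling, and queued for removal, only when every reachable drive on more than half of the nodes reports `errVolumeNotFound`; otherwise it is recreated where missing. The following is a minimal, self-contained sketch of that decision using stand-in types and error values rather than MinIO's internal ones.

```go
package main

import (
	"errors"
	"fmt"
)

// errVolumeNotFound stands in for MinIO's internal sentinel error.
var errVolumeNotFound = errors.New("volume not found")

// allBucketsNotFound mirrors isAllBucketsNotFound: true only when every
// entry of a non-empty error slice is errVolumeNotFound.
func allBucketsNotFound(errs []error) bool {
	if len(errs) == 0 {
		return false
	}
	for _, err := range errs {
		if !errors.Is(err, errVolumeNotFound) {
			return false
		}
	}
	return true
}

// shouldRemove reports whether the bucket is dangling (remove) or should be
// recreated, given per-node StatVol errors keyed by host name.
func shouldRemove(perNodeErrs map[string][]error) bool {
	notFoundNodes := 0
	for _, errs := range perNodeErrs {
		if allBucketsNotFound(errs) {
			notFoundNodes++
		}
	}
	// Dangling only when a strict majority of nodes is missing the bucket.
	return notFoundNodes > len(perNodeErrs)/2
}

func main() {
	perNode := map[string][]error{
		"node1": {errVolumeNotFound, errVolumeNotFound},
		"node2": {errVolumeNotFound, errVolumeNotFound},
		"node3": {nil, nil}, // bucket still present on this node
	}
	fmt.Println("remove (dangling):", shouldRemove(perNode)) // true: 2 of 3 nodes miss it
}
```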
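`healBucketLocal` classifies each local drive into a before/after state from the error returned by `StatVol`: a missing drive is offline, a missing volume is missing (and recreatable), any other error counts as corrupt, and no error means ok. Below is a small sketch of that mapping, with stand-in sentinel errors and state strings in place of the `madmin` drive-state constants.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for MinIO's sentinel errors and madmin drive states.
var (
	errDiskNotFound   = errors.New("drive not found")
	errVolumeNotFound = errors.New("volume not found")
)

const (
	driveStateOk      = "ok"
	driveStateOffline = "offline"
	driveStateCorrupt = "corrupt"
	driveStateMissing = "missing"
)

// driveStateFor mirrors the per-drive classification in healBucketLocal:
// nil means the bucket exists, errDiskNotFound means the drive is gone,
// errVolumeNotFound means the bucket is missing and can be recreated, and
// anything else is treated as corruption.
func driveStateFor(statErr error) string {
	switch {
	case statErr == nil:
		return driveStateOk
	case errors.Is(statErr, errDiskNotFound):
		return driveStateOffline
	case errors.Is(statErr, errVolumeNotFound):
		return driveStateMissing
	default:
		return driveStateCorrupt
	}
}

func main() {
	for _, err := range []error{nil, errDiskNotFound, errVolumeNotFound, errors.New("i/o error")} {
		fmt.Printf("%v -> %s\n", err, driveStateFor(err))
	}
}
```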
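The new `peerS3MethodHealBucket` RPC passes its parameters as HTTP form values and ships the heal result back as a gob stream, which `remotePeerS3Client.HealBucket` decodes into `madmin.HealResultItem`. Here is a self-contained sketch of that round trip using only the standard library; the `bucket` and `bucket-deleted` keys, the stand-in result struct, and the `httptest` server are illustrative assumptions, not MinIO's actual REST client or parameter names.

```go
package main

import (
	"encoding/gob"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strconv"
)

// healResult is a stand-in for madmin.HealResultItem.
type healResult struct {
	Type   string
	Bucket string
}

func main() {
	// Server side: read form values, "heal" locally, gob-encode the result.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_ = r.ParseForm()
		bucket := r.Form.Get("bucket")
		remove, _ := strconv.ParseBool(r.Form.Get("bucket-deleted"))
		_ = remove // the real handler maps this to madmin.HealOpts{Remove: remove}
		_ = gob.NewEncoder(w).Encode(healResult{Type: "bucket", Bucket: bucket})
	}))
	defer srv.Close()

	// Client side: send the parameters as form values, gob-decode the reply.
	v := url.Values{}
	v.Set("bucket", "mybucket")
	v.Set("bucket-deleted", "true")
	resp, err := http.PostForm(srv.URL, v)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var res healResult
	if err := gob.NewDecoder(resp.Body).Decode(&res); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", res) // {Type:bucket Bucket:mybucket}
}
```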
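`S3PeerSys.ListBuckets` now also queues a heal for any bucket reported by fewer peers than the per-pool quorum, so stale or partially-created buckets get repaired in the background (the real code enqueues a `partialOperation` on `globalMRFState`). A small sketch of that bookkeeping follows, with a hypothetical task type and a channel standing in for the MRF queue.

```go
package main

import (
	"fmt"
	"time"
)

// healTask is a hypothetical stand-in for MinIO's partialOperation.
type healTask struct {
	Bucket string
	Queued time.Time
}

// queueQuorumLostBuckets inspects how many peers reported each bucket and
// queues a heal for every bucket seen on fewer than `quorum` peers.
func queueQuorumLostBuckets(bucketCounts map[string]int, quorum int, queue chan<- healTask) {
	for bucket, count := range bucketCounts {
		if count < quorum {
			queue <- healTask{Bucket: bucket, Queued: time.Now()}
		}
	}
}

func main() {
	queue := make(chan healTask, 8)
	// Counts of peers that returned each bucket, e.g. gathered during ListBuckets.
	counts := map[string]int{"photos": 4, "stale-bucket": 1}
	queueQuorumLostBuckets(counts, 3, queue)
	close(queue)
	for task := range queue {
		fmt.Println("queued heal for:", task.Bucket)
	}
}
```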