diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index f60f3d3ab..4985a1437 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -128,6 +128,159 @@ func (er erasureObjects) listAndHeal(bucket, prefix string, healEntry func(strin
 	return nil
 }
 
+// HealBucket heals a bucket if it doesn't exist on one of the disks, and also
+// heals missing entries for the bucket metadata files
+// `policy.json, notification.xml, listeners.json`.
+func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
+	result madmin.HealResultItem, err error,
+) {
+	storageDisks := er.getDisks()
+	storageEndpoints := er.getEndpoints()
+
+	// Heal bucket.
+	return er.healBucket(ctx, storageDisks, storageEndpoints, bucket, opts)
+}
+
+// Heal bucket - create buckets on disks where it does not exist.
+func (er erasureObjects) healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
+	// get write quorum for an object
+	writeQuorum := len(storageDisks) - er.defaultParityCount
+	if writeQuorum == er.defaultParityCount {
+		writeQuorum++
+	}
+
+	if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
+		startTime := time.Now()
+		defer func() {
+			healTrace(healingMetricBucket, startTime, bucket, "", &opts, err, &res)
+		}()
+	}
+
+	// Initialize sync waitgroup.
+	g := errgroup.WithNErrs(len(storageDisks))
+
+	// Disk states slices
+	beforeState := make([]string, len(storageDisks))
+	afterState := make([]string, len(storageDisks))
+
+	// Check the bucket's presence on all underlying storage disks.
+	for index := range storageDisks {
+		index := index
+		g.Go(func() error {
+			if storageDisks[index] == nil {
+				beforeState[index] = madmin.DriveStateOffline
+				afterState[index] = madmin.DriveStateOffline
+				return errDiskNotFound
+			}
+
+			beforeState[index] = madmin.DriveStateOk
+			afterState[index] = madmin.DriveStateOk
+
+			if bucket == minioReservedBucket {
+				return nil
+			}
+
+			if _, serr := storageDisks[index].StatVol(ctx, bucket); serr != nil {
+				if serr == errDiskNotFound {
+					beforeState[index] = madmin.DriveStateOffline
+					afterState[index] = madmin.DriveStateOffline
+					return serr
+				}
+				if serr != errVolumeNotFound {
+					beforeState[index] = madmin.DriveStateCorrupt
+					afterState[index] = madmin.DriveStateCorrupt
+					return serr
+				}
+
+				beforeState[index] = madmin.DriveStateMissing
+				afterState[index] = madmin.DriveStateMissing
+
+				// mutate only if not a dry-run
+				if opts.DryRun {
+					return nil
+				}
+
+				return serr
+			}
+			return nil
+		}, index)
+	}
+
+	errs := g.Wait()
+
+	// Initialize heal result info
+	res = madmin.HealResultItem{
+		Type:         madmin.HealItemBucket,
+		Bucket:       bucket,
+		DiskCount:    len(storageDisks),
+		ParityBlocks: er.defaultParityCount,
+		DataBlocks:   len(storageDisks) - er.defaultParityCount,
+	}
+
+	for i := range beforeState {
+		res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
+			UUID:     "",
+			Endpoint: storageEndpoints[i].String(),
+			State:    beforeState[i],
+		})
+	}
+
+	reducedErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, res.DataBlocks)
+	if errors.Is(reducedErr, errVolumeNotFound) && !opts.Recreate {
+		for i := range beforeState {
+			res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
+				UUID:     "",
+				Endpoint: storageEndpoints[i].String(),
+				State:    madmin.DriveStateOk,
+			})
+		}
+		return res, nil
+	}
+
+	// Initialize sync waitgroup.
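+	// Second pass: recreate the bucket only on the drives that were marked
+	// missing above; all other drives carry forward their first-pass result.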
+	g = errgroup.WithNErrs(len(storageDisks))
+
+	// Make a volume entry on all underlying storage disks.
+	for index := range storageDisks {
+		index := index
+		g.Go(func() error {
+			if beforeState[index] == madmin.DriveStateMissing {
+				makeErr := storageDisks[index].MakeVol(ctx, bucket)
+				if makeErr == nil {
+					afterState[index] = madmin.DriveStateOk
+				}
+				return makeErr
+			}
+			return errs[index]
+		}, index)
+	}
+
+	errs = g.Wait()
+
+	reducedErr = reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum)
+	if reducedErr != nil {
+		// If exactly half the drives are unavailable, HealBucket should
+		// still succeed; this is necessary for the server to start.
+		readQuorum := res.DataBlocks
+		switch reduceReadQuorumErrs(ctx, errs, nil, readQuorum) {
+		case nil:
+		case errDiskNotFound:
+		default:
+			return res, reducedErr
+		}
+	}
+
+	for i := range afterState {
+		res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
+			UUID:     "",
+			Endpoint: storageEndpoints[i].String(),
+			State:    afterState[i],
+		})
+	}
+	return res, nil
+}
+
 // listAllBuckets lists all buckets from all disks. It also
 // returns the occurrence of each buckets in all disks
 func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo, readQuorum int) error {
@@ -867,22 +1020,6 @@ func isAllNotFound(errs []error) bool {
 	return len(errs) > 0
 }
 
-// isAllBucketsNotFound will return true if all the errors are either errFileNotFound
-// or errFileCorrupt
-// A 0 length slice will always return false.
-func isAllBucketsNotFound(errs []error) bool {
-	if len(errs) == 0 {
-		return false
-	}
-	notFoundCount := 0
-	for _, err := range errs {
-		if err != nil && errors.Is(err, errVolumeNotFound) {
-			notFoundCount++
-		}
-	}
-	return len(errs) == notFoundCount
-}
-
 // ObjectDir is considered dangling/corrupted if any only
 // if total disks - a combination of corrupted and missing
 // files is lesser than N/2+1 number of disks.
@@ -908,7 +1045,7 @@ func isObjectDirDangling(errs []error) (ok bool) {
 	return found < notFound && found > 0
 }
 
-// Object is considered dangling/corrupted if any only
+// Object is considered dangling/corrupted if and only
 // if total disks - a combination of corrupted and missing
 // files is lesser than number of data blocks.
 func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (validMeta FileInfo, ok bool) {
diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go
index af6caea91..57f6915d5 100644
--- a/cmd/erasure-healing_test.go
+++ b/cmd/erasure-healing_test.go
@@ -362,7 +362,7 @@ func TestHealing(t *testing.T) {
 		t.Fatal(err)
 	}
 	// This would create the bucket.
-	_, err = obj.HealBucket(ctx, bucket, madmin.HealOpts{
+	_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
 		DryRun: false,
 		Remove: false,
 	})
@@ -543,7 +543,7 @@ func TestHealingVersioned(t *testing.T) {
 		t.Fatal(err)
 	}
 	// This would create the bucket.
-	_, err = obj.HealBucket(ctx, bucket, madmin.HealOpts{
+	_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
 		DryRun: false,
 		Remove: false,
 	})
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 0570fb210..9a5b1c6a1 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1923,55 +1923,8 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
 	hopts.Recreate = false
 	defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, bucket, bucketMetadataFile), "", hopts)
 
-	type DiskStat struct {
-		VolInfos []VolInfo
-		Errs     []error
-	}
-
 	for _, pool := range z.serverPools {
-		// map of node wise disk stats
-		diskStats := make(map[string]DiskStat)
-		for _, set := range pool.sets {
-			vis := []VolInfo{}
-			errs := []error{}
-			for _, disk := range set.getDisks() {
-				if disk == OfflineDisk {
-					continue
-				}
-				vi, err := disk.StatVol(ctx, bucket)
-				hostName := disk.Hostname()
-				if disk.IsLocal() {
-					hostName = "local"
-				}
-				ds, ok := diskStats[hostName]
-				if !ok {
-					newds := DiskStat{
-						VolInfos: vis,
-						Errs:     errs,
-					}
-					diskStats[hostName] = newds
-				} else {
-					ds.VolInfos = append(ds.VolInfos, vi)
-					ds.Errs = append(ds.Errs, err)
-					diskStats[hostName] = ds
-				}
-			}
-		}
-		nodeCount := len(diskStats)
-		bktNotFoundCount := 0
-		for _, ds := range diskStats {
-			if isAllBucketsNotFound(ds.Errs) {
-				bktNotFoundCount++
-			}
-		}
-		// if the bucket if not found on more than hslf the no of nodes, its dagling
-		if bktNotFoundCount > nodeCount/2 {
-			opts.Remove = true
-		} else {
-			opts.Recreate = true
-		}
-
-		result, err := z.s3Peer.HealBucket(ctx, bucket, opts)
+		result, err := pool.HealBucket(ctx, bucket, opts)
 		if err != nil {
 			if _, ok := err.(BucketNotFound); ok {
 				continue
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index b32c67f5c..20987520c 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -1168,6 +1168,28 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 	return res, nil
 }
 
+// HealBucket - heals inconsistent buckets and bucket metadata on all sets.
+func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
+	// Initialize heal result info
+	result = madmin.HealResultItem{
+		Type:      madmin.HealItemBucket,
+		Bucket:    bucket,
+		DiskCount: s.setCount * s.setDriveCount,
+		SetCount:  s.setCount,
+	}
+
+	for _, set := range s.sets {
+		healResult, err := set.HealBucket(ctx, bucket, opts)
+		if err != nil {
+			return result, toObjectErr(err, bucket)
+		}
+		result.Before.Drives = append(result.Before.Drives, healResult.Before.Drives...)
+		result.After.Drives = append(result.After.Drives, healResult.After.Drives...)
+	}
+
+	return result, nil
+}
+
 // HealObject - heals inconsistent object on a hashedSet based on object name.
 func (s *erasureSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
 	return s.getHashedSet(object).HealObject(ctx, bucket, object, versionID, opts)
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index 8864ca4ec..c94706b21 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -143,13 +143,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 	healBuckets := make([]string, len(buckets))
 	copy(healBuckets, buckets)
 
-	objAPI := newObjectLayerFn()
-	if objAPI == nil {
-		return errServerNotInitialized
-	}
-
+	// Heal all buckets first in this erasure set - this is useful
+	// so that new object uploads to different buckets can succeed
 	for _, bucket := range healBuckets {
-		_, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
+		_, err := er.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
 		if err != nil {
 			// Log bucket healing error if any, we shall retry again.
 			logger.LogIf(ctx, err)
@@ -201,7 +198,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 
 	// Heal current bucket again in case if it is failed
 	// in the beginning of erasure set healing
-	if _, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
+	if _, err := er.HealBucket(ctx, bucket, madmin.HealOpts{
 		ScanMode: scanMode,
 	}); err != nil {
 		logger.LogIf(ctx, err)
diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go
index 3c851caf7..c64eeba96 100644
--- a/cmd/peer-s3-client.go
+++ b/cmd/peer-s3-client.go
@@ -25,9 +25,7 @@ import (
 	"net/url"
 	"sort"
 	"strconv"
-	"time"
 
-	"github.com/minio/madmin-go/v3"
 	xhttp "github.com/minio/minio/internal/http"
 	"github.com/minio/minio/internal/rest"
 	"github.com/minio/pkg/v2/sync/errgroup"
@@ -38,7 +36,6 @@ var errPeerOffline = errors.New("peer is offline")
 
 type peerS3Client interface {
 	ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error)
-	HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
 	GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error)
 	MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error
 	DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error
@@ -70,10 +67,6 @@ func (l localPeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions)
 	return listBucketsLocal(ctx, opts)
 }
 
-func (l localPeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
-	return healBucketLocal(ctx, bucket, opts)
-}
-
 func (l localPeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
 	return getBucketInfoLocal(ctx, bucket, opts)
 }
@@ -131,51 +124,6 @@ func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
 	}
 }
 
-// HealBucket - heals buckets at node level
-func (sys *S3PeerSys) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
-	g := errgroup.WithNErrs(len(sys.peerClients))
-
-	healBucketResults := make([]madmin.HealResultItem, len(sys.peerClients))
-	for idx, client := range sys.peerClients {
-		idx := idx
-		client := client
-		g.Go(func() error {
-			if client == nil {
-				return errPeerOffline
-			}
-			res, err := client.HealBucket(ctx, bucket, opts)
-			if err != nil {
-				return err
-			}
-			healBucketResults[idx] = res
-			return nil
-		}, idx)
-	}
-
-	errs := g.Wait()
-
-	for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
-		perPoolErrs := make([]error, 0, len(sys.peerClients))
-		for i, client := range sys.peerClients {
-			if slices.Contains(client.GetPools(), poolIdx) {
-				perPoolErrs = append(perPoolErrs, errs[i])
-			}
-		}
-		quorum := len(perPoolErrs) / 2
-		if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum); poolErr != nil {
-			return madmin.HealResultItem{}, poolErr
-		}
-	}
-
-	for i, err := range errs {
-		if err == nil {
-			return healBucketResults[i], nil
-		}
-	}
-
-	return madmin.HealResultItem{}, toObjectErr(errVolumeNotFound, bucket)
-}
-
 // ListBuckets lists buckets across all nodes and returns a consistent view:
 // - Return an error when a pool cannot return N/2+1 valid bucket information
 // - For each pool, check if the bucket exists in N/2+1 nodes before including it in the final result
@@ -237,22 +185,6 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
 				}
 			}
 		}
-		// loop through buckets and see if some with lost quorum
-		// these could be stale buckets lying around, queue a heal
-		// of such a bucket. This is needed here as we identify such
-		// buckets here while listing buckets. As part of regular
-		// globalBucketMetadataSys.Init() call would get a valid
-		// buckets only and not the quourum lost ones like this, so
-		// explicit call
-		for bktName, count := range bucketsMap {
-			if count < quorum {
-				// Queue a bucket heal task
-				globalMRFState.addPartialOp(partialOperation{
-					bucket: bktName,
-					queued: time.Now(),
-				})
-			}
-		}
 	}
 
 	result := make([]BucketInfo, 0, len(resultMap))
@@ -327,23 +259,6 @@ func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOp
 	return buckets, err
 }
 
-func (client *remotePeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
-	v := url.Values{}
-	v.Set(peerS3Bucket, bucket)
-	v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Remove))
-
-	respBody, err := client.call(peerS3MethodHealBucket, v, nil, -1)
-	if err != nil {
-		return madmin.HealResultItem{}, err
-	}
-	defer xhttp.DrainBody(respBody)
-
-	var res madmin.HealResultItem
-	err = gob.NewDecoder(respBody).Decode(&res)
-
-	return res, err
-}
-
 // GetBucketInfo returns bucket stat info from a peer
 func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
 	v := url.Values{}
diff --git a/cmd/peer-s3-server.go b/cmd/peer-s3-server.go
index 8e60c559c..9b1764bdb 100644
--- a/cmd/peer-s3-server.go
+++ b/cmd/peer-s3-server.go
@@ -21,10 +21,8 @@ import (
 	"context"
 	"encoding/gob"
 	"errors"
-	"fmt"
 	"net/http"
 
-	"github.com/minio/madmin-go/v3"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/mux"
 	"github.com/minio/pkg/v2/sync/errgroup"
@@ -44,7 +42,6 @@ const (
 	peerS3MethodGetBucketInfo = "/get-bucket-info"
 	peerS3MethodDeleteBucket  = "/delete-bucket"
 	peerS3MethodListBuckets   = "/list-buckets"
-	peerS3MethodHealBucket    = "/heal-bucket"
 )
 
 const (
@@ -81,133 +78,6 @@ func (s *peerS3Server) HealthHandler(w http.ResponseWriter, r *http.Request) {
 	s.IsValid(w, r)
 }
 
-func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
-	globalLocalDrivesMu.RLock()
-	globalLocalDrives := globalLocalDrives
-	globalLocalDrivesMu.RUnlock()
-
-	// Initialize sync waitgroup.
-	g := errgroup.WithNErrs(len(globalLocalDrives))
-
-	// Disk states slices
-	beforeState := make([]string, len(globalLocalDrives))
-	afterState := make([]string, len(globalLocalDrives))
-
-	// Make a volume entry on all underlying storage disks.
-	for index := range globalLocalDrives {
-		index := index
-		g.Go(func() (serr error) {
-			if globalLocalDrives[index] == nil {
-				beforeState[index] = madmin.DriveStateOffline
-				afterState[index] = madmin.DriveStateOffline
-				return errDiskNotFound
-			}
-
-			beforeState[index] = madmin.DriveStateOk
-			afterState[index] = madmin.DriveStateOk
-
-			if bucket == minioReservedBucket {
-				return nil
-			}
-
-			_, serr = globalLocalDrives[index].StatVol(ctx, bucket)
-			if serr != nil {
-				if serr == errDiskNotFound {
-					beforeState[index] = madmin.DriveStateOffline
-					afterState[index] = madmin.DriveStateOffline
-					return serr
-				}
-				if serr != errVolumeNotFound {
-					beforeState[index] = madmin.DriveStateCorrupt
-					afterState[index] = madmin.DriveStateCorrupt
-					return serr
-				}
-
-				beforeState[index] = madmin.DriveStateMissing
-				afterState[index] = madmin.DriveStateMissing
-
-				// mutate only if not a dry-run
-				if opts.DryRun {
-					return nil
-				}
-
-				return serr
-			}
-			return nil
-		}, index)
-	}
-
-	errs := g.Wait()
-
-	// Initialize heal result info
-	res = madmin.HealResultItem{
-		Type:      madmin.HealItemBucket,
-		Bucket:    bucket,
-		DiskCount: len(globalLocalDrives),
-	}
-
-	for i := range beforeState {
-		res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
-			UUID:     "",
-			Endpoint: globalLocalDrives[i].String(),
-			State:    beforeState[i],
-		})
-	}
-
-	// check dangling and delete bucket only if its not a meta bucket
-	if !isMinioMetaBucketName(bucket) && !isAllBucketsNotFound(errs) && opts.Remove {
-		g := errgroup.WithNErrs(len(globalLocalDrives))
-		for index := range globalLocalDrives {
-			index := index
-			g.Go(func() error {
-				if globalLocalDrives[index] == nil {
-					return errDiskNotFound
-				}
-				err := globalLocalDrives[index].DeleteVol(ctx, bucket, false)
-				if errors.Is(err, errVolumeNotEmpty) {
-					logger.LogOnceIf(ctx, fmt.Errorf("While deleting dangling Bucket (%s), Drive %s:%s returned an error (%w)",
-						bucket, globalLocalDrives[index].Hostname(), globalLocalDrives[index], err), "delete-dangling-bucket-"+bucket)
-				}
-				return nil
-			}, index)
-		}
-
-		g.Wait()
-	}
-
-	// Create the quorum lost volume only if its nor makred for delete
-	if !opts.Remove {
-		// Initialize sync waitgroup.
-		g = errgroup.WithNErrs(len(globalLocalDrives))
-
-		// Make a volume entry on all underlying storage disks.
-		for index := range globalLocalDrives {
-			index := index
-			g.Go(func() error {
-				if beforeState[index] == madmin.DriveStateMissing {
-					makeErr := globalLocalDrives[index].MakeVol(ctx, bucket)
-					if makeErr == nil {
-						afterState[index] = madmin.DriveStateOk
-					}
-					return makeErr
-				}
-				return errs[index]
-			}, index)
-		}
-
-		errs = g.Wait()
-	}
-
-	for i := range afterState {
-		res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
-			UUID:     "",
-			Endpoint: globalLocalDrives[i].String(),
-			State:    afterState[i],
-		})
-	}
-	return res, nil
-}
-
 func listBucketsLocal(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
 	globalLocalDrivesMu.RLock()
 	globalLocalDrives := globalLocalDrives
 	globalLocalDrivesMu.RUnlock()
@@ -389,30 +259,6 @@ func (s *peerS3Server) ListBucketsHandler(w http.ResponseWriter, r *http.Request
 	logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(buckets))
 }
 
-func (s *peerS3Server) HealBucketHandler(w http.ResponseWriter, r *http.Request) {
-	if !s.IsValid(w, r) {
-		return
-	}
-
-	bucketDeleted := r.Form.Get(peerS3BucketDeleted) == "true"
-
-	bucket := r.Form.Get(peerS3Bucket)
-	if isMinioMetaBucket(bucket) {
-		s.writeErrorResponse(w, errInvalidArgument)
-		return
-	}
-
-	res, err := healBucketLocal(r.Context(), bucket, madmin.HealOpts{
-		Remove: bucketDeleted,
-	})
-	if err != nil {
-		s.writeErrorResponse(w, err)
-		return
-	}
-
-	logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(res))
-}
-
 // GetBucketInfoHandler implements peer BuckeInfo call, returns bucket create date.
 func (s *peerS3Server) GetBucketInfoHandler(w http.ResponseWriter, r *http.Request) {
 	if !s.IsValid(w, r) {
@@ -487,5 +333,4 @@ func registerPeerS3Handlers(router *mux.Router) {
 	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodDeleteBucket).HandlerFunc(h(server.DeleteBucketHandler))
 	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodGetBucketInfo).HandlerFunc(h(server.GetBucketInfoHandler))
 	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodListBuckets).HandlerFunc(h(server.ListBucketsHandler))
-	subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodHealBucket).HandlerFunc(h(server.HealBucketHandler))
 }