Revert "Heal buckets at node level (#18504)"

This reverts commit 708296ae1b671efd9531729b741c96b9aaa95801.
Harshavardhana 2023-12-05 22:34:46 -08:00
parent 8fc200c0cc
commit e30c0e7ca3
7 changed files with 183 additions and 314 deletions

View File

@@ -128,6 +128,159 @@ func (er erasureObjects) listAndHeal(bucket, prefix string, healEntry func(strin
return nil
}
// HealBucket heals a bucket if it doesn't exist on one of the disks; additionally
// it also heals any missing entries for the bucket metadata files
// `policy.json, notification.xml, listeners.json`.
func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
result madmin.HealResultItem, err error,
) {
storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints()
// Heal bucket.
return er.healBucket(ctx, storageDisks, storageEndpoints, bucket, opts)
}
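A minimal, self-contained sketch of how a caller might consume the restored HealBucket API above; the bucketHealer interface, healAndReport helper and package name are assumptions for illustration only, not MinIO code, though the madmin types are the ones used in this diff.

package healexample

import (
    "context"
    "fmt"

    "github.com/minio/madmin-go/v3"
)

// bucketHealer is an assumed interface matching the HealBucket signature above.
type bucketHealer interface {
    HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
}

// healAndReport heals one bucket and reports how many drives went from
// "missing" before the heal to "ok" after it.
func healAndReport(ctx context.Context, h bucketHealer, bucket string) error {
    res, err := h.HealBucket(ctx, bucket, madmin.HealOpts{Recreate: true})
    if err != nil {
        return err
    }
    recreated := 0
    for i, d := range res.Before.Drives {
        if d.State == madmin.DriveStateMissing &&
            i < len(res.After.Drives) && res.After.Drives[i].State == madmin.DriveStateOk {
            recreated++
        }
    }
    fmt.Printf("bucket %q: recreated on %d of %d drives\n", bucket, recreated, res.DiskCount)
    return nil
}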
// Heal bucket - create the bucket on the disks where it does not exist.
func (er erasureObjects) healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
// get write quorum for the bucket
writeQuorum := len(storageDisks) - er.defaultParityCount
if writeQuorum == er.defaultParityCount {
writeQuorum++
}
if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
startTime := time.Now()
defer func() {
healTrace(healingMetricBucket, startTime, bucket, "", &opts, err, &res)
}()
}
// Initialize sync waitgroup.
g := errgroup.WithNErrs(len(storageDisks))
// Disk states slices
beforeState := make([]string, len(storageDisks))
afterState := make([]string, len(storageDisks))
// Check the bucket's state on all underlying storage disks.
for index := range storageDisks {
index := index
g.Go(func() error {
if storageDisks[index] == nil {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
return errDiskNotFound
}
beforeState[index] = madmin.DriveStateOk
afterState[index] = madmin.DriveStateOk
if bucket == minioReservedBucket {
return nil
}
if _, serr := storageDisks[index].StatVol(ctx, bucket); serr != nil {
if serr == errDiskNotFound {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
return serr
}
if serr != errVolumeNotFound {
beforeState[index] = madmin.DriveStateCorrupt
afterState[index] = madmin.DriveStateCorrupt
return serr
}
beforeState[index] = madmin.DriveStateMissing
afterState[index] = madmin.DriveStateMissing
// mutate only if not a dry-run
if opts.DryRun {
return nil
}
return serr
}
return nil
}, index)
}
errs := g.Wait()
// Initialize heal result info
res = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
DiskCount: len(storageDisks),
ParityBlocks: er.defaultParityCount,
DataBlocks: len(storageDisks) - er.defaultParityCount,
}
for i := range beforeState {
res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i].String(),
State: beforeState[i],
})
}
reducedErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, res.DataBlocks)
if errors.Is(reducedErr, errVolumeNotFound) && !opts.Recreate {
for i := range beforeState {
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i].String(),
State: madmin.DriveStateOk,
})
}
return res, nil
}
// Initialize sync waitgroup.
g = errgroup.WithNErrs(len(storageDisks))
// Make a volume entry on all underlying storage disks.
for index := range storageDisks {
index := index
g.Go(func() error {
if beforeState[index] == madmin.DriveStateMissing {
makeErr := storageDisks[index].MakeVol(ctx, bucket)
if makeErr == nil {
afterState[index] = madmin.DriveStateOk
}
return makeErr
}
return errs[index]
}, index)
}
errs = g.Wait()
reducedErr = reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum)
if reducedErr != nil {
// If we have exactly half the drives not available,
// we should still allow HealBucket to return without an error;
// this is necessary for starting the server.
readQuorum := res.DataBlocks
switch reduceReadQuorumErrs(ctx, errs, nil, readQuorum) {
case nil:
case errDiskNotFound:
default:
return res, reducedErr
}
}
for i := range afterState {
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i].String(),
State: afterState[i],
})
}
return res, nil
}
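A standalone sketch of the quorum arithmetic used at the top of healBucket above, under a simplifying assumption: a heal step is accepted when at least writeQuorum drives report success. writeQuorumFor and meetsQuorum are illustrative helper names, not MinIO's reduceWriteQuorumErrs.

package healexample

// writeQuorumFor mirrors the computation at the top of healBucket: the write
// quorum is the number of data drives (disks minus parity), bumped by one when
// data and parity counts are equal.
func writeQuorumFor(diskCount, parityCount int) int {
    wq := diskCount - parityCount
    if wq == parityCount {
        wq++
    }
    return wq
}

// meetsQuorum is a simplified stand-in for the error reduction: it counts the
// drives that reported no error and compares that against the quorum.
func meetsQuorum(errs []error, quorum int) bool {
    ok := 0
    for _, err := range errs {
        if err == nil {
            ok++
        }
    }
    return ok >= quorum
}

// Examples: writeQuorumFor(12, 4) == 8, writeQuorumFor(4, 2) == 3.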
// listAllBuckets lists all buckets from all disks. It also
// returns the number of occurrences of each bucket across all disks
func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo, readQuorum int) error {
@@ -867,22 +1020,6 @@ func isAllNotFound(errs []error) bool {
return len(errs) > 0
}
// isAllBucketsNotFound will return true if all the errors
// are errVolumeNotFound.
// A 0 length slice will always return false.
func isAllBucketsNotFound(errs []error) bool {
if len(errs) == 0 {
return false
}
notFoundCount := 0
for _, err := range errs {
if err != nil && errors.Is(err, errVolumeNotFound) {
notFoundCount++
}
}
return len(errs) == notFoundCount
}
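The removed isAllBucketsNotFound above returns true only when every error in the slice is a volume-not-found error. A tiny standalone illustration of those semantics; errNotFound is a stand-in sentinel, not MinIO's errVolumeNotFound.

package healexample

import "errors"

// errNotFound stands in for errVolumeNotFound in this sketch.
var errNotFound = errors.New("volume not found")

// allNotFound mirrors the check above: a single nil error (the bucket exists on
// that drive) or any unrelated error makes the result false.
func allNotFound(errs []error) bool {
    if len(errs) == 0 {
        return false
    }
    for _, err := range errs {
        if err == nil || !errors.Is(err, errNotFound) {
            return false
        }
    }
    return true
}

// allNotFound([]error{errNotFound, errNotFound}) == true
// allNotFound([]error{errNotFound, nil})         == false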
// ObjectDir is considered dangling/corrupted if and only
// if total disks - a combination of corrupted and missing
// files - is less than N/2+1 number of disks.
@@ -908,7 +1045,7 @@ func isObjectDirDangling(errs []error) (ok bool) {
return found < notFound && found > 0
}
// Object is considered dangling/corrupted if and only
// Object is considered dangling/corrupted if any only
// if total disks - a combination of corrupted and missing
// files is lesser than number of data blocks.
func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (validMeta FileInfo, ok bool) {

View File

@@ -362,7 +362,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err)
}
// This would create the bucket.
_, err = obj.HealBucket(ctx, bucket, madmin.HealOpts{
_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
DryRun: false,
Remove: false,
})
@@ -543,7 +543,7 @@ func TestHealingVersioned(t *testing.T) {
t.Fatal(err)
}
// This would create the bucket.
_, err = obj.HealBucket(ctx, bucket, madmin.HealOpts{
_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
DryRun: false,
Remove: false,
})

View File

@@ -1923,55 +1923,8 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
hopts.Recreate = false
defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, bucket, bucketMetadataFile), "", hopts)
type DiskStat struct {
VolInfos []VolInfo
Errs []error
}
for _, pool := range z.serverPools {
// map of node wise disk stats
diskStats := make(map[string]DiskStat)
for _, set := range pool.sets {
vis := []VolInfo{}
errs := []error{}
for _, disk := range set.getDisks() {
if disk == OfflineDisk {
continue
}
vi, err := disk.StatVol(ctx, bucket)
hostName := disk.Hostname()
if disk.IsLocal() {
hostName = "local"
}
ds, ok := diskStats[hostName]
if !ok {
newds := DiskStat{
VolInfos: vis,
Errs: errs,
}
diskStats[hostName] = newds
} else {
ds.VolInfos = append(ds.VolInfos, vi)
ds.Errs = append(ds.Errs, err)
diskStats[hostName] = ds
}
}
}
nodeCount := len(diskStats)
bktNotFoundCount := 0
for _, ds := range diskStats {
if isAllBucketsNotFound(ds.Errs) {
bktNotFoundCount++
}
}
// if the bucket is not found on more than half the number of nodes, it's dangling
if bktNotFoundCount > nodeCount/2 {
opts.Remove = true
} else {
opts.Recreate = true
}
result, err := z.s3Peer.HealBucket(ctx, bucket, opts)
result, err := pool.HealBucket(ctx, bucket, opts)
if err != nil {
if _, ok := err.(BucketNotFound); ok {
continue

View File

@@ -1168,6 +1168,28 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
return res, nil
}
// HealBucket - heals inconsistent buckets and bucket metadata on all sets.
func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
// Initialize heal result info
result = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
DiskCount: s.setCount * s.setDriveCount,
SetCount: s.setCount,
}
for _, set := range s.sets {
healResult, err := set.HealBucket(ctx, bucket, opts)
if err != nil {
return result, toObjectErr(err, bucket)
}
result.Before.Drives = append(result.Before.Drives, healResult.Before.Drives...)
result.After.Drives = append(result.After.Drives, healResult.After.Drives...)
}
return result, nil
}
// HealObject - heals inconsistent object on a hashedSet based on object name.
func (s *erasureSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
return s.getHashedSet(object).HealObject(ctx, bucket, object, versionID, opts)

View File

@@ -143,13 +143,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
healBuckets := make([]string, len(buckets))
copy(healBuckets, buckets)
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
// Heal all buckets first in this erasure set - this is useful
// so that new object uploads to different buckets can succeed
for _, bucket := range healBuckets {
_, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
_, err := er.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
if err != nil {
// Log bucket healing error if any, we shall retry again.
logger.LogIf(ctx, err)
@@ -201,7 +198,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
// Heal the current bucket again in case it failed
// at the beginning of erasure set healing
if _, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
if _, err := er.HealBucket(ctx, bucket, madmin.HealOpts{
ScanMode: scanMode,
}); err != nil {
logger.LogIf(ctx, err)

View File

@@ -25,9 +25,7 @@ import (
"net/url"
"sort"
"strconv"
"time"
"github.com/minio/madmin-go/v3"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/v2/sync/errgroup"
@@ -38,7 +36,6 @@ var errPeerOffline = errors.New("peer is offline")
type peerS3Client interface {
ListBuckets(ctx context.Context, opts BucketOptions) ([]BucketInfo, error)
HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error)
MakeBucket(ctx context.Context, bucket string, opts MakeBucketOptions) error
DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error
@@ -70,10 +67,6 @@ func (l localPeerS3Client) ListBuckets(ctx context.Context, opts BucketOptions)
return listBucketsLocal(ctx, opts)
}
func (l localPeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
return healBucketLocal(ctx, bucket, opts)
}
func (l localPeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
return getBucketInfoLocal(ctx, bucket, opts)
}
@@ -131,51 +124,6 @@ func NewS3PeerSys(endpoints EndpointServerPools) *S3PeerSys {
}
}
// HealBucket - heals buckets at node level
func (sys *S3PeerSys) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
g := errgroup.WithNErrs(len(sys.peerClients))
healBucketResults := make([]madmin.HealResultItem, len(sys.peerClients))
for idx, client := range sys.peerClients {
idx := idx
client := client
g.Go(func() error {
if client == nil {
return errPeerOffline
}
res, err := client.HealBucket(ctx, bucket, opts)
if err != nil {
return err
}
healBucketResults[idx] = res
return nil
}, idx)
}
errs := g.Wait()
for poolIdx := 0; poolIdx < sys.poolsCount; poolIdx++ {
perPoolErrs := make([]error, 0, len(sys.peerClients))
for i, client := range sys.peerClients {
if slices.Contains(client.GetPools(), poolIdx) {
perPoolErrs = append(perPoolErrs, errs[i])
}
}
quorum := len(perPoolErrs) / 2
if poolErr := reduceWriteQuorumErrs(ctx, perPoolErrs, bucketOpIgnoredErrs, quorum); poolErr != nil {
return madmin.HealResultItem{}, poolErr
}
}
for i, err := range errs {
if err == nil {
return healBucketResults[i], nil
}
}
return madmin.HealResultItem{}, toObjectErr(errVolumeNotFound, bucket)
}
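A standalone sketch of the per-pool reduction in the removed S3PeerSys.HealBucket above, simplified to "at least half of the peers serving a pool must succeed"; peerResult and poolsHealthy are assumed names, and the real code funnels the errors through reduceWriteQuorumErrs instead.

package healexample

// peerResult pairs the pools a peer serves with the heal error it returned.
type peerResult struct {
    pools []int
    err   error
}

// poolsHealthy reports, for each pool index, whether at least half of the peers
// that serve that pool healed the bucket without error.
func poolsHealthy(results []peerResult, poolCount int) []bool {
    healthy := make([]bool, poolCount)
    for pool := 0; pool < poolCount; pool++ {
        total, ok := 0, 0
        for _, r := range results {
            for _, p := range r.pools {
                if p == pool {
                    total++
                    if r.err == nil {
                        ok++
                    }
                    break
                }
            }
        }
        healthy[pool] = total > 0 && ok >= total/2
    }
    return healthy
}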
// ListBuckets lists buckets across all nodes and returns a consistent view:
// - Return an error when a pool cannot return N/2+1 valid bucket information
// - For each pool, check if the bucket exists in N/2+1 nodes before including it in the final result
@@ -237,22 +185,6 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
}
}
}
// Loop through the buckets and find any that have lost quorum;
// these could be stale buckets lying around, so queue a heal
// for each such bucket. This is needed here because such
// buckets are identified while listing buckets. The regular
// globalBucketMetadataSys.Init() call would only return valid
// buckets and not the quorum-lost ones, hence the
// explicit call
for bktName, count := range bucketsMap {
if count < quorum {
// Queue a bucket heal task
globalMRFState.addPartialOp(partialOperation{
bucket: bktName,
queued: time.Now(),
})
}
}
}
result := make([]BucketInfo, 0, len(resultMap))
@@ -327,23 +259,6 @@ func (client *remotePeerS3Client) ListBuckets(ctx context.Context, opts BucketOp
return buckets, err
}
func (client *remotePeerS3Client) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
v := url.Values{}
v.Set(peerS3Bucket, bucket)
v.Set(peerS3BucketDeleted, strconv.FormatBool(opts.Remove))
respBody, err := client.call(peerS3MethodHealBucket, v, nil, -1)
if err != nil {
return madmin.HealResultItem{}, err
}
defer xhttp.DrainBody(respBody)
var res madmin.HealResultItem
err = gob.NewDecoder(respBody).Decode(&res)
return res, err
}
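The removed remote client above follows a simple wire pattern: post form values to the peer endpoint, then gob-decode the response body. A self-contained sketch of that pattern using only the standard library; the /heal-bucket path and healResult struct here are stand-ins for illustration, not the real peer RPC types.

package healexample

import (
    "encoding/gob"
    "net/http"
    "net/http/httptest"
    "net/url"
    "strings"
)

// healResult is an illustrative stand-in for the gob-encoded result type.
type healResult struct {
    Bucket string
    Healed bool
}

func healOverHTTP() (healResult, error) {
    // Test server standing in for a peer: it parses the form and gob-encodes a result.
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        _ = r.ParseForm()
        _ = gob.NewEncoder(w).Encode(healResult{Bucket: r.Form.Get("bucket"), Healed: true})
    }))
    defer srv.Close()

    // Client side: send the bucket name as form values, decode the gob response.
    v := url.Values{}
    v.Set("bucket", "photos")
    resp, err := http.Post(srv.URL+"/heal-bucket", "application/x-www-form-urlencoded", strings.NewReader(v.Encode()))
    if err != nil {
        return healResult{}, err
    }
    defer resp.Body.Close()

    var res healResult
    err = gob.NewDecoder(resp.Body).Decode(&res)
    return res, err
}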
// GetBucketInfo returns bucket stat info from a peer
func (client *remotePeerS3Client) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
v := url.Values{}

View File

@@ -21,10 +21,8 @@ import (
"context"
"encoding/gob"
"errors"
"fmt"
"net/http"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/logger"
"github.com/minio/mux"
"github.com/minio/pkg/v2/sync/errgroup"
@@ -44,7 +42,6 @@ const (
peerS3MethodGetBucketInfo = "/get-bucket-info"
peerS3MethodDeleteBucket = "/delete-bucket"
peerS3MethodListBuckets = "/list-buckets"
peerS3MethodHealBucket = "/heal-bucket"
)
const (
@@ -81,133 +78,6 @@ func (s *peerS3Server) HealthHandler(w http.ResponseWriter, r *http.Request) {
s.IsValid(w, r)
}
func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
globalLocalDrivesMu.RLock()
globalLocalDrives := globalLocalDrives
globalLocalDrivesMu.RUnlock()
// Initialize sync waitgroup.
g := errgroup.WithNErrs(len(globalLocalDrives))
// Disk states slices
beforeState := make([]string, len(globalLocalDrives))
afterState := make([]string, len(globalLocalDrives))
// Check the bucket's state on all underlying local drives.
for index := range globalLocalDrives {
index := index
g.Go(func() (serr error) {
if globalLocalDrives[index] == nil {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
return errDiskNotFound
}
beforeState[index] = madmin.DriveStateOk
afterState[index] = madmin.DriveStateOk
if bucket == minioReservedBucket {
return nil
}
_, serr = globalLocalDrives[index].StatVol(ctx, bucket)
if serr != nil {
if serr == errDiskNotFound {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
return serr
}
if serr != errVolumeNotFound {
beforeState[index] = madmin.DriveStateCorrupt
afterState[index] = madmin.DriveStateCorrupt
return serr
}
beforeState[index] = madmin.DriveStateMissing
afterState[index] = madmin.DriveStateMissing
// mutate only if not a dry-run
if opts.DryRun {
return nil
}
return serr
}
return nil
}, index)
}
errs := g.Wait()
// Initialize heal result info
res = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
DiskCount: len(globalLocalDrives),
}
for i := range beforeState {
res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: globalLocalDrives[i].String(),
State: beforeState[i],
})
}
// check dangling and delete the bucket only if it's not a meta bucket
if !isMinioMetaBucketName(bucket) && !isAllBucketsNotFound(errs) && opts.Remove {
g := errgroup.WithNErrs(len(globalLocalDrives))
for index := range globalLocalDrives {
index := index
g.Go(func() error {
if globalLocalDrives[index] == nil {
return errDiskNotFound
}
err := globalLocalDrives[index].DeleteVol(ctx, bucket, false)
if errors.Is(err, errVolumeNotEmpty) {
logger.LogOnceIf(ctx, fmt.Errorf("While deleting dangling Bucket (%s), Drive %s:%s returned an error (%w)",
bucket, globalLocalDrives[index].Hostname(), globalLocalDrives[index], err), "delete-dangling-bucket-"+bucket)
}
return nil
}, index)
}
g.Wait()
}
// Create the quorum-lost volume only if it is not marked for delete
if !opts.Remove {
// Initialize sync waitgroup.
g = errgroup.WithNErrs(len(globalLocalDrives))
// Make a volume entry on all underlying storage disks.
for index := range globalLocalDrives {
index := index
g.Go(func() error {
if beforeState[index] == madmin.DriveStateMissing {
makeErr := globalLocalDrives[index].MakeVol(ctx, bucket)
if makeErr == nil {
afterState[index] = madmin.DriveStateOk
}
return makeErr
}
return errs[index]
}, index)
}
errs = g.Wait()
}
for i := range afterState {
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: globalLocalDrives[i].String(),
State: afterState[i],
})
}
return res, nil
}
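A small illustrative helper (an assumption, not MinIO code) summarizing the drive-state bookkeeping in healBucketLocal above: only a drive whose before-state is "missing" can change state, and it becomes "ok" exactly when MakeVol succeeds; offline, corrupt and already-ok drives are reported unchanged.

package healexample

import "github.com/minio/madmin-go/v3"

// afterStateFor maps a drive's before-state and the MakeVol outcome to the
// after-state reported in the heal result.
func afterStateFor(before string, makeVolErr error) string {
    if before != madmin.DriveStateMissing {
        return before // offline, corrupt and ok drives keep their state
    }
    if makeVolErr == nil {
        return madmin.DriveStateOk
    }
    return madmin.DriveStateMissing
}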
func listBucketsLocal(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
globalLocalDrivesMu.RLock()
globalLocalDrives := globalLocalDrives
@@ -389,30 +259,6 @@ func (s *peerS3Server) ListBucketsHandler(w http.ResponseWriter, r *http.Request
logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(buckets))
}
func (s *peerS3Server) HealBucketHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
bucketDeleted := r.Form.Get(peerS3BucketDeleted) == "true"
bucket := r.Form.Get(peerS3Bucket)
if isMinioMetaBucket(bucket) {
s.writeErrorResponse(w, errInvalidArgument)
return
}
res, err := healBucketLocal(r.Context(), bucket, madmin.HealOpts{
Remove: bucketDeleted,
})
if err != nil {
s.writeErrorResponse(w, err)
return
}
logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(res))
}
// GetBucketInfoHandler implements the peer BucketInfo call; it returns the bucket creation date.
func (s *peerS3Server) GetBucketInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@@ -487,5 +333,4 @@ func registerPeerS3Handlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodDeleteBucket).HandlerFunc(h(server.DeleteBucketHandler))
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodGetBucketInfo).HandlerFunc(h(server.GetBucketInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodListBuckets).HandlerFunc(h(server.ListBucketsHandler))
subrouter.Methods(http.MethodPost).Path(peerS3VersionPrefix + peerS3MethodHealBucket).HandlerFunc(h(server.HealBucketHandler))
}