diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go
index cf2aa16f9..b34180f9b 100644
--- a/cmd/background-newdisks-heal-ops.go
+++ b/cmd/background-newdisks-heal-ops.go
@@ -345,18 +345,18 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
 
 	initBackgroundHealing(ctx, objAPI) // start quick background healing
 
-	globalBackgroundHealState.pushHealLocalDisks(getLocalDisksToHeal()...)
-
 	if env.Get("_MINIO_AUTO_DRIVE_HEALING", config.EnableOn) == config.EnableOn || env.Get("_MINIO_AUTO_DISK_HEALING", config.EnableOn) == config.EnableOn {
+		globalBackgroundHealState.pushHealLocalDisks(getLocalDisksToHeal()...)
+
 		go monitorLocalDisksAndHeal(ctx, z)
 	}
 }
 
 func getLocalDisksToHeal() (disksToHeal Endpoints) {
 	globalLocalDrivesMu.RLock()
-	globalLocalDrives := globalLocalDrives
+	localDrives := globalLocalDrives
 	globalLocalDrivesMu.RUnlock()
-	for _, disk := range globalLocalDrives {
+	for _, disk := range localDrives {
 		_, err := disk.GetDiskID()
 		if errors.Is(err, errUnformattedDisk) {
 			disksToHeal = append(disksToHeal, disk.Endpoint())
diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go
index 8169d5cd9..9266d321e 100644
--- a/cmd/bucket-handlers.go
+++ b/cmd/bucket-handlers.go
@@ -1657,9 +1657,11 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
 	}
 
 	// Return an error if the bucket does not exist
-	if _, err := objectAPI.GetBucketInfo(ctx, bucket, BucketOptions{}); err != nil && !forceDelete {
-		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
-		return
+	if !forceDelete {
+		if _, err := objectAPI.GetBucketInfo(ctx, bucket, BucketOptions{}); err != nil {
+			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
+			return
+		}
 	}
 
 	// Attempt to delete bucket.
diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 6ef3d3343..3a967d6e8 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -3263,7 +3263,11 @@ func (p *ReplicationPool) persistToDrive(ctx context.Context, v MRFReplicateEntr
 		return r
 	}
 
-	for _, localDrive := range globalLocalDrives {
+	globalLocalDrivesMu.RLock()
+	localDrives := globalLocalDrives
+	globalLocalDrivesMu.RUnlock()
+
+	for _, localDrive := range localDrives {
 		r := newReader()
 		err := localDrive.CreateFile(ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), -1, r)
 		r.Close()
@@ -3325,7 +3329,12 @@ func (p *ReplicationPool) loadMRF() (mrfRec MRFReplicateEntries, err error) {
 		return re, nil
 	}
 
-	for _, localDrive := range globalLocalDrives {
+
+	globalLocalDrivesMu.RLock()
+	localDrives := globalLocalDrives
+	globalLocalDrivesMu.RUnlock()
+
+	for _, localDrive := range localDrives {
 		rc, err := localDrive.ReadFileStream(p.ctx, minioMetaBucket, pathJoin(replicationMRFDir, globalLocalNodeNameHex+".bin"), 0, -1)
 		if err != nil {
 			continue
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index f20cc319e..6ee016514 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -107,12 +107,6 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 			return nil, err
 		}
 
-		for _, storageDisk := range storageDisks[i] {
-			if storageDisk != nil && storageDisk.IsLocal() {
-				localDrives = append(localDrives, storageDisk)
-			}
-		}
-
 		if deploymentID == "" {
 			// all pools should have same deployment ID
 			deploymentID = formats[i].ID
@@ -139,6 +133,18 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 		if distributionAlgo != "" && z.distributionAlgo == "" {
 			z.distributionAlgo = distributionAlgo
 		}
+
+		for _, storageDisk := range storageDisks[i] {
+			if storageDisk != nil && storageDisk.IsLocal() {
+				localDrives = append(localDrives, storageDisk)
+			}
+		}
+	}
+
+	if !globalIsDistErasure {
+		globalLocalDrivesMu.Lock()
+		globalLocalDrives = localDrives
+		globalLocalDrivesMu.Unlock()
 	}
 
 	z.decommissionCancelers = make([]context.CancelFunc, len(z.serverPools))
@@ -167,10 +173,6 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 		break
 	}
 
-	globalLocalDrivesMu.Lock()
-	globalLocalDrives = localDrives
-	defer globalLocalDrivesMu.Unlock()
-
 	return z, nil
 }
 
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 20987520c..22ed3a7f4 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -262,6 +262,10 @@ func (s *erasureSets) connectDisks() {
 			disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex)
 			s.erasureDisks[setIndex][diskIndex] = disk
 			s.erasureDisksMu.Unlock()
+
+			globalLocalDrivesMu.Lock()
+			globalLocalSetDrives[s.poolIndex][setIndex][diskIndex] = disk
+			globalLocalDrivesMu.Unlock()
 		}(endpoint)
 	}
 
diff --git a/cmd/metrics-resource.go b/cmd/metrics-resource.go
index c4a677469..bb66db7de 100644
--- a/cmd/metrics-resource.go
+++ b/cmd/metrics-resource.go
@@ -252,10 +252,10 @@ func collectDriveMetrics(m madmin.RealtimeMetrics) {
 	}
 
 	globalLocalDrivesMu.RLock()
-	gld := globalLocalDrives
+	localDrives := globalLocalDrives
 	globalLocalDrivesMu.RUnlock()
 
-	for _, d := range gld {
+	for _, d := range localDrives {
 		labels := map[string]string{"drive": d.Endpoint().RawPath}
 		di, err := d.DiskInfo(GlobalContext, false)
 		if err == nil {
diff --git a/cmd/peer-s3-server.go b/cmd/peer-s3-server.go
index 9b1764bdb..80b95eb05 100644
--- a/cmd/peer-s3-server.go
+++ b/cmd/peer-s3-server.go
@@ -80,16 +80,16 @@ func (s *peerS3Server) HealthHandler(w http.ResponseWriter, r *http.Request) {
 
 func listBucketsLocal(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
 	globalLocalDrivesMu.RLock()
-	globalLocalDrives := globalLocalDrives
+	localDrives := globalLocalDrives
 	globalLocalDrivesMu.RUnlock()
 
-	quorum := (len(globalLocalDrives) / 2)
+	quorum := (len(localDrives) / 2)
 
 	buckets = make([]BucketInfo, 0, 32)
 	healBuckets := map[string]VolInfo{}
 
 	// lists all unique buckets across drives.
-	if err := listAllBuckets(ctx, globalLocalDrives, healBuckets, quorum); err != nil {
+	if err := listAllBuckets(ctx, localDrives, healBuckets, quorum); err != nil {
 		return nil, err
 	}
 
@@ -98,7 +98,7 @@ func listBucketsLocal(ctx context.Context, opts BucketOptions) (buckets []Bucket
 
 	if opts.Deleted {
 		// lists all deleted buckets across drives.
-		if err := listDeletedBuckets(ctx, globalLocalDrives, deletedBuckets, quorum); err != nil {
+		if err := listDeletedBuckets(ctx, localDrives, deletedBuckets, quorum); err != nil {
 			return nil, err
 		}
 	}
@@ -128,23 +128,23 @@ func listBucketsLocal(ctx context.Context, opts BucketOptions) (buckets []Bucket
 
 func getBucketInfoLocal(ctx context.Context, bucket string, opts BucketOptions) (BucketInfo, error) {
 	globalLocalDrivesMu.RLock()
-	globalLocalDrives := globalLocalDrives
+	localDrives := globalLocalDrives
 	globalLocalDrivesMu.RUnlock()
 
-	g := errgroup.WithNErrs(len(globalLocalDrives)).WithConcurrency(32)
-	bucketsInfo := make([]BucketInfo, len(globalLocalDrives))
+	g := errgroup.WithNErrs(len(localDrives)).WithConcurrency(32)
+	bucketsInfo := make([]BucketInfo, len(localDrives))
 
 	// Make a volume entry on all underlying storage disks.
-	for index := range globalLocalDrives {
+	for index := range localDrives {
 		index := index
 		g.Go(func() error {
-			if globalLocalDrives[index] == nil {
+			if localDrives[index] == nil {
 				return errDiskNotFound
 			}
-			volInfo, err := globalLocalDrives[index].StatVol(ctx, bucket)
+			volInfo, err := localDrives[index].StatVol(ctx, bucket)
 			if err != nil {
 				if opts.Deleted {
-					dvi, derr := globalLocalDrives[index].StatVol(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucket))
+					dvi, derr := localDrives[index].StatVol(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucket))
 					if derr != nil {
 						return err
 					}
@@ -160,7 +160,7 @@ func getBucketInfoLocal(ctx context.Context, bucket string, opts BucketOptions)
 	}
 
 	errs := g.Wait()
-	if err := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(globalLocalDrives) / 2)); err != nil {
+	if err := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(localDrives) / 2)); err != nil {
 		return BucketInfo{}, err
 	}
 
@@ -177,19 +177,19 @@ func getBucketInfoLocal(ctx context.Context, bucket string, opts BucketOptions)
 
 func deleteBucketLocal(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
 	globalLocalDrivesMu.RLock()
-	globalLocalDrives := globalLocalDrives
+	localDrives := globalLocalDrives
 	globalLocalDrivesMu.RUnlock()
 
-	g := errgroup.WithNErrs(len(globalLocalDrives)).WithConcurrency(32)
+	g := errgroup.WithNErrs(len(localDrives)).WithConcurrency(32)
 
 	// Make a volume entry on all underlying storage disks.
-	for index := range globalLocalDrives {
+	for index := range localDrives {
 		index := index
 		g.Go(func() error {
-			if globalLocalDrives[index] == nil {
+			if localDrives[index] == nil {
 				return errDiskNotFound
 			}
-			return globalLocalDrives[index].DeleteVol(ctx, bucket, opts.Force)
+			return localDrives[index].DeleteVol(ctx, bucket, opts.Force)
 		}, index)
 	}
 
@@ -201,7 +201,7 @@ func deleteBucketLocal(ctx context.Context, bucket string, opts DeleteBucketOpti
 		}
 		if err == nil && recreate {
 			// ignore any errors
-			globalLocalDrives[index].MakeVol(ctx, bucket)
+			localDrives[index].MakeVol(ctx, bucket)
 		}
 	}
 
@@ -210,24 +210,24 @@ func deleteBucketLocal(ctx context.Context, bucket string, opts DeleteBucketOpti
 		return errVolumeNotEmpty
 	}
 	// for all other errors reduce by write quorum.
-	return reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(globalLocalDrives)/2)+1)
+	return reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(localDrives)/2)+1)
 }
 
 func makeBucketLocal(ctx context.Context, bucket string, opts MakeBucketOptions) error {
 	globalLocalDrivesMu.RLock()
-	globalLocalDrives := globalLocalDrives
+	localDrives := globalLocalDrives
 	globalLocalDrivesMu.RUnlock()
 
-	g := errgroup.WithNErrs(len(globalLocalDrives)).WithConcurrency(32)
+	g := errgroup.WithNErrs(len(localDrives)).WithConcurrency(32)
 
 	// Make a volume entry on all underlying storage disks.
-	for index := range globalLocalDrives {
+	for index := range localDrives {
 		index := index
 		g.Go(func() error {
-			if globalLocalDrives[index] == nil {
+			if localDrives[index] == nil {
 				return errDiskNotFound
 			}
-			err := globalLocalDrives[index].MakeVol(ctx, bucket)
+			err := localDrives[index].MakeVol(ctx, bucket)
 			if opts.ForceCreate && errors.Is(err, errVolumeExists) {
 				// No need to return error when force create was
 				// requested.
@@ -238,7 +238,7 @@ func makeBucketLocal(ctx context.Context, bucket string, opts MakeBucketOptions)
 	}
 
 	errs := g.Wait()
-	return reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(globalLocalDrives)/2)+1)
+	return reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(localDrives)/2)+1)
 }
 
 func (s *peerS3Server) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go
index c0b8fd9c0..b686da42a 100644
--- a/cmd/storage-rest-server.go
+++ b/cmd/storage-rest-server.go
@@ -34,7 +34,6 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/minio/minio/internal/grid"
@@ -55,18 +54,13 @@ var errDiskStale = errors.New("drive stale")
 
 // To abstract a disk over network.
 type storageRESTServer struct {
-	storage atomic.Value
+	poolIndex, setIndex, diskIndex int
 }
 
-func (s *storageRESTServer) getStorage() *xlStorageDiskIDCheck {
-	if s, ok := s.storage.Load().(*xlStorageDiskIDCheck); ok {
-		return s
-	}
-	return nil
-}
-
-func (s *storageRESTServer) setStorage(xl *xlStorageDiskIDCheck) {
-	s.storage.Store(xl)
+func (s *storageRESTServer) getStorage() StorageAPI {
+	globalLocalDrivesMu.RLock()
+	defer globalLocalDrivesMu.RUnlock()
+	return globalLocalSetDrives[s.poolIndex][s.setIndex][s.diskIndex]
 }
 
 func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
@@ -218,7 +212,6 @@ func (s *storageRESTServer) DiskInfoHandler(params *grid.MSS) (*DiskInfo, *grid.
 	if err != nil {
 		info.Error = err.Error()
 	}
-	info.Scanning = s.getStorage().storage != nil && atomic.LoadInt32(&s.getStorage().storage.scanning) > 0
 	return &info, nil
 }
 
@@ -735,6 +728,7 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http
 	setEventStreamHeaders(w)
 	encoder := gob.NewEncoder(w)
 	done := keepHTTPResponseAlive(w)
+
 	errs := s.getStorage().DeleteVersions(r.Context(), volume, versions)
 	done(nil)
 	for idx := range versions {
@@ -1332,25 +1326,36 @@ func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request)
 	rw.CloseWithError(err)
 }
 
+// globalLocalSetDrives is used for local drive as well as remote REST
+// API caller for other nodes to talk to this node.
+//
+// Any updates to this must be serialized via globalLocalDrivesMu (locker)
+var globalLocalSetDrives [][][]StorageAPI
+
 // registerStorageRESTHandlers - register storage rpc router.
 func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools, gm *grid.Manager) {
 	h := func(f http.HandlerFunc) http.HandlerFunc {
 		return collectInternodeStats(httpTraceHdrs(f))
 	}
 
-	driveHandlers := make([][]*storageRESTServer, len(endpointServerPools))
-	for pool, serverPool := range endpointServerPools {
-		driveHandlers[pool] = make([]*storageRESTServer, len(serverPool.Endpoints))
+	globalLocalSetDrives = make([][][]StorageAPI, len(endpointServerPools))
+	for pool := range globalLocalSetDrives {
+		globalLocalSetDrives[pool] = make([][]StorageAPI, endpointServerPools[pool].SetCount)
+		for set := range globalLocalSetDrives[pool] {
+			globalLocalSetDrives[pool][set] = make([]StorageAPI, endpointServerPools[pool].DrivesPerSet)
+		}
 	}
-
-	for pool, serverPool := range endpointServerPools {
-		for set, endpoint := range serverPool.Endpoints {
+	for _, serverPool := range endpointServerPools {
+		for _, endpoint := range serverPool.Endpoints {
 			if !endpoint.IsLocal {
 				continue
 			}
 
-			driveHandlers[pool][set] = &storageRESTServer{}
-			server := driveHandlers[pool][set]
+			server := &storageRESTServer{
+				poolIndex: endpoint.PoolIdx,
+				setIndex:  endpoint.SetIdx,
+				diskIndex: endpoint.DiskIdx,
+			}
 
 			subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter()
 
@@ -1404,16 +1409,23 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
 			}
 			storage := newXLStorageDiskIDCheck(xl, true)
 			storage.SetDiskID(xl.diskID)
-			server.setStorage(storage)
+
+			globalLocalDrivesMu.Lock()
+			defer globalLocalDrivesMu.Unlock()
+
+			globalLocalDrives = append(globalLocalDrives, storage)
+			globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx] = storage
 			return true
 		}
+
 		if createStorage(server) {
 			continue
 		}
+
 		// Start async goroutine to create storage.
 		go func(server *storageRESTServer) {
 			for {
-				time.Sleep(5 * time.Second)
+				time.Sleep(3 * time.Second)
 				if createStorage(server) {
 					return
 				}
diff --git a/cmd/storage-rest_test.go b/cmd/storage-rest_test.go
index 0eeebba07..bce633cf0 100644
--- a/cmd/storage-rest_test.go
+++ b/cmd/storage-rest_test.go
@@ -470,13 +470,19 @@ func newStorageRESTHTTPServerClient(t testing.TB) *storageRESTClient {
 		t.Fatalf("UpdateIsLocal failed %v", err)
 	}
 
+	endpoint.PoolIdx = 0
+	endpoint.SetIdx = 0
+	endpoint.DiskIdx = 0
+
+	poolEps := []PoolEndpoints{{
+		Endpoints: Endpoints{endpoint},
+	}}
+	poolEps[0].SetCount = 1
+	poolEps[0].DrivesPerSet = 1
+
 	// Register handlers on newly created servers
-	registerStorageRESTHandlers(tg.Mux[0], []PoolEndpoints{{
-		Endpoints: Endpoints{endpoint},
-	}}, tg.Managers[0])
-	registerStorageRESTHandlers(tg.Mux[1], []PoolEndpoints{{
-		Endpoints: Endpoints{endpoint},
-	}}, tg.Managers[1])
+	registerStorageRESTHandlers(tg.Mux[0], poolEps, tg.Managers[0])
+	registerStorageRESTHandlers(tg.Mux[1], poolEps, tg.Managers[1])
 
 	restClient, err := newStorageRESTClient(endpoint, false, tg.Managers[0])
 	if err != nil {
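
Note (reviewer sketch, not part of the patch): the changes above converge on a single access pattern. Readers of globalLocalDrives take the slice under globalLocalDrivesMu.RLock() and iterate that local copy, writers update both the flat slice and the new globalLocalSetDrives[pool][set][disk] table under the write lock, and storageRESTServer now stores only its pool/set/disk indices and resolves the live drive on every getStorage() call. The following self-contained Go sketch illustrates that pattern; Drive, localDrives, localSetDrives, driveHandle and registerDrive are simplified stand-ins, not MinIO identifiers.

// lockingpattern_sketch.go
package main

import (
	"fmt"
	"sync"
)

// Drive is a simplified stand-in for MinIO's StorageAPI.
type Drive interface {
	Endpoint() string
}

type localDrive struct{ path string }

func (d localDrive) Endpoint() string { return d.path }

var (
	localDrivesMu  sync.RWMutex
	localDrives    []Drive     // flat list, analogous to globalLocalDrives
	localSetDrives [][][]Drive // [pool][set][disk], analogous to globalLocalSetDrives
)

// snapshotDrives copies the slice header under RLock so callers can range
// over it without holding the lock, mirroring the
// "localDrives := globalLocalDrives" pattern used throughout the patch.
func snapshotDrives() []Drive {
	localDrivesMu.RLock()
	defer localDrivesMu.RUnlock()
	return localDrives
}

// driveHandle mirrors the reworked storageRESTServer: it keeps only indices
// and looks up the live drive on each call instead of caching a pointer.
type driveHandle struct{ pool, set, disk int }

func (h driveHandle) getStorage() Drive {
	localDrivesMu.RLock()
	defer localDrivesMu.RUnlock()
	return localSetDrives[h.pool][h.set][h.disk]
}

// registerDrive appends to the flat list and slots the drive into the indexed
// table under the same write lock, as the patched registerStorageRESTHandlers
// and connectDisks do.
func registerDrive(pool, set, disk int, d Drive) {
	localDrivesMu.Lock()
	defer localDrivesMu.Unlock()
	localDrives = append(localDrives, d)
	localSetDrives[pool][set][disk] = d
}

func main() {
	// One pool, one set, two drives.
	localSetDrives = make([][][]Drive, 1)
	localSetDrives[0] = make([][]Drive, 1)
	localSetDrives[0][0] = make([]Drive, 2)

	registerDrive(0, 0, 0, localDrive{"/mnt/drive1"})
	registerDrive(0, 0, 1, localDrive{"/mnt/drive2"})

	for _, d := range snapshotDrives() {
		fmt.Println("local drive:", d.Endpoint())
	}
	fmt.Println("handle resolves:", driveHandle{0, 0, 1}.getStorage().Endpoint())
}

The design point being illustrated: because handlers resolve drives through the indexed table on demand, a drive that reconnects (erasure-sets.go) or is created late (storage-rest-server.go) becomes visible to existing handlers without swapping any per-handler state, at the cost of taking the shared read lock on each lookup.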