diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 7ae3f7e12..38c91f061 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -428,7 +428,11 @@ func (s *storageRESTServer) WalkDirHandler(ctx context.Context, payload []byte, return grid.NewRemoteErr(err) } + if !s.checkID(opts.DiskID) { + return grid.NewRemoteErr(errDiskNotFound) + } + ctx, cancel := context.WithCancel(ctx) defer cancel() - return grid.NewRemoteErr(s.storage.WalkDir(ctx, opts, grid.WriterToChannel(ctx, out))) + return grid.NewRemoteErr(s.getStorage().WalkDir(ctx, opts, grid.WriterToChannel(ctx, out))) } diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index e85a1d481..e6322804c 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -54,7 +54,18 @@ var errDiskStale = errors.New("drive stale") // To abstract a disk over network. type storageRESTServer struct { - storage *xlStorageDiskIDCheck + storage atomic.Value +} + +func (s *storageRESTServer) getStorage() *xlStorageDiskIDCheck { + if s, ok := s.storage.Load().(*xlStorageDiskIDCheck); ok { + return s + } + return nil +} + +func (s *storageRESTServer) setStorage(xl *xlStorageDiskIDCheck) { + s.storage.Store(xl) } func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) { @@ -120,7 +131,7 @@ func storageServerRequestValidate(r *http.Request) error { // IsAuthValid - To authenticate and verify the time difference. func (s *storageRESTServer) IsAuthValid(w http.ResponseWriter, r *http.Request) bool { - if s.storage == nil { + if s.getStorage() == nil { s.writeErrorResponse(w, errDiskNotFound) return false } @@ -152,7 +163,7 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool return true } - storedDiskID, err := s.storage.GetDiskID() + storedDiskID, err := s.getStorage().GetDiskID() if err != nil { s.writeErrorResponse(w, err) return false @@ -169,7 +180,7 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool // checkID - check if the disk-id in the request corresponds to the underlying disk. func (s *storageRESTServer) checkID(wantID string) bool { - if s.storage == nil { + if s.getStorage() == nil { return false } if wantID == "" { @@ -179,7 +190,7 @@ func (s *storageRESTServer) checkID(wantID string) bool { return true } - storedDiskID, err := s.storage.GetDiskID() + storedDiskID, err := s.getStorage().GetDiskID() if err != nil { return false } @@ -202,11 +213,11 @@ func (s *storageRESTServer) DiskInfoHandler(params *grid.MSS) (*DiskInfo, *grid. return nil, grid.NewRemoteErr(errDiskNotFound) } withMetrics := params.Get(storageRESTMetrics) == "true" - info, err := s.storage.DiskInfo(context.Background(), withMetrics) + info, err := s.getStorage().DiskInfo(context.Background(), withMetrics) if err != nil { info.Error = err.Error() } - info.Scanning = s.storage != nil && s.storage.storage != nil && atomic.LoadInt32(&s.storage.storage.scanning) > 0 + info.Scanning = s.getStorage().storage != nil && atomic.LoadInt32(&s.getStorage().storage.scanning) > 0 return &info, nil } @@ -236,7 +247,7 @@ func (s *storageRESTServer) NSScannerHandler(ctx context.Context, params *nsScan out <- resp } }() - ui, err := s.storage.NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode)) + ui, err := s.getStorage().NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode)) wg.Wait() if err != nil { return grid.NewRemoteErr(err) @@ -254,7 +265,7 @@ func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Reques return } volume := r.Form.Get(storageRESTVolume) - err := s.storage.MakeVol(r.Context(), volume) + err := s.getStorage().MakeVol(r.Context(), volume) if err != nil { s.writeErrorResponse(w, err) } @@ -266,7 +277,7 @@ func (s *storageRESTServer) MakeVolBulkHandler(w http.ResponseWriter, r *http.Re return } volumes := strings.Split(r.Form.Get(storageRESTVolumes), ",") - err := s.storage.MakeVolBulk(r.Context(), volumes...) + err := s.getStorage().MakeVolBulk(r.Context(), volumes...) if err != nil { s.writeErrorResponse(w, err) } @@ -277,7 +288,7 @@ func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Reque if !s.IsValid(w, r) { return } - infos, err := s.storage.ListVols(r.Context()) + infos, err := s.getStorage().ListVols(r.Context()) if err != nil { s.writeErrorResponse(w, err) return @@ -293,7 +304,7 @@ func (s *storageRESTServer) StatVolHandler(params *grid.MSS) (*VolInfo, *grid.Re if !s.checkID(params.Get(storageRESTDiskID)) { return nil, grid.NewRemoteErr(errDiskNotFound) } - info, err := s.storage.StatVol(context.Background(), params.Get(storageRESTVolume)) + info, err := s.getStorage().StatVol(context.Background(), params.Get(storageRESTVolume)) if err != nil { return nil, grid.NewRemoteErr(err) } @@ -307,7 +318,7 @@ func (s *storageRESTServer) DeleteVolHandler(w http.ResponseWriter, r *http.Requ } volume := r.Form.Get(storageRESTVolume) forceDelete := r.Form.Get(storageRESTForceDelete) == "true" - err := s.storage.DeleteVol(r.Context(), volume, forceDelete) + err := s.getStorage().DeleteVol(r.Context(), volume, forceDelete) if err != nil { s.writeErrorResponse(w, err) } @@ -327,7 +338,7 @@ func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Req s.writeErrorResponse(w, err) return } - err = s.storage.AppendFile(r.Context(), volume, filePath, buf) + err = s.getStorage().AppendFile(r.Context(), volume, filePath, buf) if err != nil { s.writeErrorResponse(w, err) } @@ -349,7 +360,7 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req } done, body := keepHTTPReqResponseAlive(w, r) - done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), body)) + done(s.getStorage().CreateFile(r.Context(), volume, filePath, int64(fileSize), body)) } var storageDeleteVersionHandler = grid.NewSingleHandler[*DeleteVersionHandlerParams, grid.NoPayload](grid.HandlerDeleteVersion, func() *DeleteVersionHandlerParams { @@ -365,7 +376,7 @@ func (s *storageRESTServer) DeleteVersionHandler(p *DeleteVersionHandlerParams) filePath := p.FilePath forceDelMarker := p.ForceDelMarker - err := s.storage.DeleteVersion(context.Background(), volume, filePath, p.FI, forceDelMarker) + err := s.getStorage().DeleteVersion(context.Background(), volume, filePath, p.FI, forceDelMarker) return np, grid.NewRemoteErr(err) } @@ -391,7 +402,7 @@ func (s *storageRESTServer) ReadVersionHandlerWS(params *grid.MSS) (*FileInfo, * return nil, grid.NewRemoteErr(err) } - fi, err := s.storage.ReadVersion(context.Background(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) + fi, err := s.getStorage().ReadVersion(context.Background(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) if err != nil { return nil, grid.NewRemoteErr(err) } @@ -416,7 +427,7 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re s.writeErrorResponse(w, err) return } - fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) + fi, err := s.getStorage().ReadVersion(r.Context(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) if err != nil { s.writeErrorResponse(w, err) return @@ -437,7 +448,7 @@ func (s *storageRESTServer) WriteMetadataHandler(p *MetadataHandlerParams) (np g volume := p.Volume filePath := p.FilePath - err := s.storage.WriteMetadata(context.Background(), volume, filePath, p.FI) + err := s.getStorage().WriteMetadata(context.Background(), volume, filePath, p.FI) return np, grid.NewRemoteErr(err) } @@ -453,7 +464,7 @@ func (s *storageRESTServer) UpdateMetadataHandler(p *MetadataHandlerParams) (gri volume := p.Volume filePath := p.FilePath - return grid.NewNPErr(s.storage.UpdateMetadata(context.Background(), volume, filePath, p.FI, p.UpdateOpts)) + return grid.NewNPErr(s.getStorage().UpdateMetadata(context.Background(), volume, filePath, p.FI, p.UpdateOpts)) } // WriteAllHandler - write to file all content. @@ -474,7 +485,7 @@ func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Reque s.writeErrorResponse(w, err) return } - err = s.storage.WriteAll(r.Context(), volume, filePath, tmp) + err = s.getStorage().WriteAll(r.Context(), volume, filePath, tmp) if err != nil { s.writeErrorResponse(w, err) } @@ -491,7 +502,7 @@ func (s *storageRESTServer) CheckPartsHandler(p *CheckPartsHandlerParams) (grid. } volume := p.Volume filePath := p.FilePath - return grid.NewNPErr(s.storage.CheckParts(context.Background(), volume, filePath, p.FI)) + return grid.NewNPErr(s.getStorage().CheckParts(context.Background(), volume, filePath, p.FI)) } // ReadAllHandler - read all the contents of a file. @@ -502,7 +513,7 @@ func (s *storageRESTServer) ReadAllHandler(w http.ResponseWriter, r *http.Reques volume := r.Form.Get(storageRESTVolume) filePath := r.Form.Get(storageRESTFilePath) - buf, err := s.storage.ReadAll(r.Context(), volume, filePath) + buf, err := s.getStorage().ReadAll(r.Context(), volume, filePath) if err != nil { s.writeErrorResponse(w, err) return @@ -526,7 +537,7 @@ func (s *storageRESTServer) ReadXLHandler(w http.ResponseWriter, r *http.Request return } - rf, err := s.storage.ReadXL(r.Context(), volume, filePath, readData) + rf, err := s.getStorage().ReadXL(r.Context(), volume, filePath, readData) if err != nil { s.writeErrorResponse(w, err) return @@ -551,7 +562,7 @@ func (s *storageRESTServer) ReadXLHandlerWS(params *grid.MSS) (*RawFileInfo, *gr return nil, grid.NewRemoteErr(err) } - rf, err := s.storage.ReadXL(context.Background(), volume, filePath, readData) + rf, err := s.getStorage().ReadXL(context.Background(), volume, filePath, readData) if err != nil { return nil, grid.NewRemoteErr(err) } @@ -593,7 +604,7 @@ func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Reque } buf := make([]byte, length) defer metaDataPoolPut(buf) // Reuse if we can. - _, err = s.storage.ReadFile(r.Context(), volume, filePath, int64(offset), buf, verifier) + _, err = s.getStorage().ReadFile(r.Context(), volume, filePath, int64(offset), buf, verifier) if err != nil { s.writeErrorResponse(w, err) return @@ -622,7 +633,7 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http w.Header().Set(xhttp.ContentLength, strconv.Itoa(length)) - rc, err := s.storage.ReadFileStream(r.Context(), volume, filePath, int64(offset), int64(length)) + rc, err := s.getStorage().ReadFileStream(r.Context(), volume, filePath, int64(offset), int64(length)) if err != nil { s.writeErrorResponse(w, err) return @@ -667,7 +678,7 @@ func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Reques return } - entries, err := s.storage.ListDir(r.Context(), volume, dirPath, count) + entries, err := s.getStorage().ListDir(r.Context(), volume, dirPath, count) if err != nil { s.writeErrorResponse(w, err) return @@ -684,7 +695,7 @@ func (s *storageRESTServer) DeleteFileHandler(p *DeleteFileHandlerParams) (grid. if !s.checkID(p.DiskID) { return grid.NewNPErr(errDiskNotFound) } - return grid.NewNPErr(s.storage.Delete(context.Background(), p.Volume, p.FilePath, p.Opts)) + return grid.NewNPErr(s.getStorage().Delete(context.Background(), p.Volume, p.FilePath, p.Opts)) } // DeleteVersionsErrsResp - collection of delete errors @@ -722,7 +733,7 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http setEventStreamHeaders(w) encoder := gob.NewEncoder(w) done := keepHTTPResponseAlive(w) - errs := s.storage.DeleteVersions(r.Context(), volume, versions) + errs := s.getStorage().DeleteVersions(r.Context(), volume, versions) done(nil) for idx := range versions { if errs[idx] != nil { @@ -744,7 +755,7 @@ func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*Rena return nil, grid.NewRemoteErr(errDiskNotFound) } - sign, err := s.storage.RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath) + sign, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath) resp := &RenameDataResp{ Signature: sign, } @@ -760,7 +771,7 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req srcFilePath := r.Form.Get(storageRESTSrcPath) dstVolume := r.Form.Get(storageRESTDstVolume) dstFilePath := r.Form.Get(storageRESTDstPath) - err := s.storage.RenameFile(r.Context(), srcVolume, srcFilePath, dstVolume, dstFilePath) + err := s.getStorage().RenameFile(r.Context(), srcVolume, srcFilePath, dstVolume, dstFilePath) if err != nil { s.writeErrorResponse(w, err) } @@ -776,7 +787,7 @@ func (s *storageRESTServer) CleanAbandonedDataHandler(w http.ResponseWriter, r * if volume == "" || filePath == "" { return // Ignore } - keepHTTPResponseAlive(w)(s.storage.CleanAbandonedData(r.Context(), volume, filePath)) + keepHTTPResponseAlive(w)(s.getStorage().CleanAbandonedData(r.Context(), volume, filePath)) } // closeNotifier is itself a ReadCloser that will notify when either an error occurs or @@ -1150,7 +1161,7 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req setEventStreamHeaders(w) encoder := gob.NewEncoder(w) done := keepHTTPResponseAlive(w) - err := s.storage.VerifyFile(r.Context(), volume, filePath, fi) + err := s.getStorage().VerifyFile(r.Context(), volume, filePath, fi) done(nil) vresp := &VerifyFileResp{} if err != nil { @@ -1233,25 +1244,25 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) { hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s. && sudo chmod u+rxw `", username) } if !exit { - logger.LogIf(GlobalContext, fmt.Errorf("Drive is not writable %s, %s", endpoint, hint)) + logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive is not writable %s, %s", endpoint, hint), "log-fatal-errs") } else { logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend") } case errors.Is(err, errFaultyDisk): if !exit { - logger.LogIf(GlobalContext, fmt.Errorf("Drive is faulty at %s, please replace the drive - drive will be offline", endpoint)) + logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive is faulty at %s, please replace the drive - drive will be offline", endpoint), "log-fatal-errs") } else { logger.Fatal(err, "Unable to initialize backend") } case errors.Is(err, errDiskFull): if !exit { - logger.LogIf(GlobalContext, fmt.Errorf("Drive is already full at %s, incoming I/O will fail - drive will be offline", endpoint)) + logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive is already full at %s, incoming I/O will fail - drive will be offline", endpoint), "log-fatal-errs") } else { logger.Fatal(err, "Unable to initialize backend") } default: if !exit { - logger.LogIf(GlobalContext, fmt.Errorf("Drive returned an unexpected error at %s, please investigate - drive will be offline (%w)", endpoint, err)) + logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive %s returned an unexpected error: %w, please investigate - drive will be offline", endpoint, err), "log-fatal-errs") } else { logger.Fatal(err, "Unable to initialize backend") } @@ -1267,7 +1278,7 @@ func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request) filePath := r.Form.Get(storageRESTFilePath) glob := r.Form.Get(storageRESTGlob) done := keepHTTPResponseAlive(w) - stats, err := s.storage.StatInfoFile(r.Context(), volume, filePath, glob == "true") + stats, err := s.getStorage().StatInfoFile(r.Context(), volume, filePath, glob == "true") done(err) if err != nil { return @@ -1314,52 +1325,29 @@ func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request) mw.Flush() } }() - err = s.storage.ReadMultiple(r.Context(), req, responses) + err = s.getStorage().ReadMultiple(r.Context(), req, responses) wg.Wait() rw.CloseWithError(err) } // registerStorageRESTHandlers - register storage rpc router. func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools, gm *grid.Manager) { - storageDisks := make([][]*xlStorage, len(endpointServerPools)) - for poolIdx, ep := range endpointServerPools { - storageDisks[poolIdx] = make([]*xlStorage, len(ep.Endpoints)) - } - var wg sync.WaitGroup - for poolIdx, ep := range endpointServerPools { - for setIdx, endpoint := range ep.Endpoints { - if !endpoint.IsLocal { - continue - } - wg.Add(1) - go func(poolIdx, setIdx int, endpoint Endpoint) { - defer wg.Done() - var err error - storageDisks[poolIdx][setIdx], err = newXLStorage(endpoint, false) - if err != nil { - // if supported errors don't fail, we proceed to - // printing message and moving forward. - logFatalErrs(err, endpoint, false) - } - }(poolIdx, setIdx, endpoint) - } - } - wg.Wait() - h := func(f http.HandlerFunc) http.HandlerFunc { return collectInternodeStats(httpTraceHdrs(f)) } - registered := 0 - for _, setDisks := range storageDisks { - for _, storage := range setDisks { - if storage == nil { + driveHandlers := make([][]*storageRESTServer, len(endpointServerPools)) + for pool, serverPool := range endpointServerPools { + driveHandlers[pool] = make([]*storageRESTServer, len(serverPool.Endpoints)) + } + + for pool, serverPool := range endpointServerPools { + for set, endpoint := range serverPool.Endpoints { + if !endpoint.IsLocal { continue } - endpoint := storage.Endpoint() - - server := &storageRESTServer{storage: newXLStorageDiskIDCheck(storage, true)} - server.storage.SetDiskID(storage.diskID) + driveHandlers[pool][set] = &storageRESTServer{} + server := driveHandlers[pool][set] subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter() @@ -1402,17 +1390,39 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin Handle: server.WalkDirHandler, OutCapacity: 1, }), "unable to register handler") - registered++ } } - if registered == 0 { - // Register a dummy handler so remote calls can go out. - logger.FatalIf(gm.RegisterStreamingHandler(grid.HandlerWalkDir, grid.StreamHandler{ - Subroute: fmt.Sprintf("__dummy__%d", time.Now().UnixNano()), - Handle: func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *grid.RemoteErr { - return grid.NewRemoteErr(errDiskNotFound) - }, - OutCapacity: 1, - }), "unable to register handler") + + for pool, serverPool := range endpointServerPools { + for set, endpoint := range serverPool.Endpoints { + if !endpoint.IsLocal { + continue + } + createStorage := func(pool, set int, endpoint Endpoint) bool { + xl, err := newXLStorage(endpoint, false) + if err != nil { + // if supported errors don't fail, we proceed to + // printing message and moving forward. + logFatalErrs(err, endpoint, false) + return false + } + storage := newXLStorageDiskIDCheck(xl, true) + storage.SetDiskID(xl.diskID) + driveHandlers[pool][set].setStorage(storage) + return true + } + if createStorage(pool, set, endpoint) { + continue + } + // Start async goroutine to create storage. + go func(pool, set int, endpoint Endpoint) { + for { + time.Sleep(time.Minute) + if createStorage(pool, set, endpoint) { + return + } + } + }(pool, set, endpoint) + } } } diff --git a/cmd/storage-rest_test.go b/cmd/storage-rest_test.go index a1b5d5489..0eeebba07 100644 --- a/cmd/storage-rest_test.go +++ b/cmd/storage-rest_test.go @@ -20,9 +20,12 @@ package cmd import ( "bytes" "context" + "errors" + "math/rand" "reflect" "runtime" "testing" + "time" "github.com/minio/minio/internal/grid" xnet "github.com/minio/pkg/v2/net" @@ -480,6 +483,14 @@ func newStorageRESTHTTPServerClient(t testing.TB) *storageRESTClient { t.Fatal(err) } + for { + _, err := restClient.DiskInfo(context.Background(), false) + if err == nil || errors.Is(err, errUnformattedDisk) { + break + } + time.Sleep(time.Duration(rand.Float64() * float64(100*time.Millisecond))) + } + return restClient }