mirror of
https://github.com/minio/minio.git
synced 2025-03-27 07:50:58 -04:00
Do not require restart when a disk is unreachable during node boot (#18576)
A disk that is not able to initialize when an instance is started will never have a handler registered, which means a user will need to restart the node after fixing the disk; This will also prevent showing the wrong 'upgrade is needed.' error message in that case. When the disk is still failing, print an error every 30 minutes; Disk reconnection will be retried every 30 seconds. Co-authored-by: Anis Elleuch <anis@min.io>
This commit is contained in:
parent
860fc200b0
commit
961b0b524e
@ -428,7 +428,11 @@ func (s *storageRESTServer) WalkDirHandler(ctx context.Context, payload []byte,
|
||||
return grid.NewRemoteErr(err)
|
||||
}
|
||||
|
||||
if !s.checkID(opts.DiskID) {
|
||||
return grid.NewRemoteErr(errDiskNotFound)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
return grid.NewRemoteErr(s.storage.WalkDir(ctx, opts, grid.WriterToChannel(ctx, out)))
|
||||
return grid.NewRemoteErr(s.getStorage().WalkDir(ctx, opts, grid.WriterToChannel(ctx, out)))
|
||||
}
|
||||
|
@ -54,7 +54,18 @@ var errDiskStale = errors.New("drive stale")
|
||||
|
||||
// To abstract a disk over network.
|
||||
type storageRESTServer struct {
|
||||
storage *xlStorageDiskIDCheck
|
||||
storage atomic.Value
|
||||
}
|
||||
|
||||
func (s *storageRESTServer) getStorage() *xlStorageDiskIDCheck {
|
||||
if s, ok := s.storage.Load().(*xlStorageDiskIDCheck); ok {
|
||||
return s
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *storageRESTServer) setStorage(xl *xlStorageDiskIDCheck) {
|
||||
s.storage.Store(xl)
|
||||
}
|
||||
|
||||
func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
|
||||
@ -120,7 +131,7 @@ func storageServerRequestValidate(r *http.Request) error {
|
||||
|
||||
// IsAuthValid - To authenticate and verify the time difference.
|
||||
func (s *storageRESTServer) IsAuthValid(w http.ResponseWriter, r *http.Request) bool {
|
||||
if s.storage == nil {
|
||||
if s.getStorage() == nil {
|
||||
s.writeErrorResponse(w, errDiskNotFound)
|
||||
return false
|
||||
}
|
||||
@ -152,7 +163,7 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool
|
||||
return true
|
||||
}
|
||||
|
||||
storedDiskID, err := s.storage.GetDiskID()
|
||||
storedDiskID, err := s.getStorage().GetDiskID()
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return false
|
||||
@ -169,7 +180,7 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool
|
||||
|
||||
// checkID - check if the disk-id in the request corresponds to the underlying disk.
|
||||
func (s *storageRESTServer) checkID(wantID string) bool {
|
||||
if s.storage == nil {
|
||||
if s.getStorage() == nil {
|
||||
return false
|
||||
}
|
||||
if wantID == "" {
|
||||
@ -179,7 +190,7 @@ func (s *storageRESTServer) checkID(wantID string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
storedDiskID, err := s.storage.GetDiskID()
|
||||
storedDiskID, err := s.getStorage().GetDiskID()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
@ -202,11 +213,11 @@ func (s *storageRESTServer) DiskInfoHandler(params *grid.MSS) (*DiskInfo, *grid.
|
||||
return nil, grid.NewRemoteErr(errDiskNotFound)
|
||||
}
|
||||
withMetrics := params.Get(storageRESTMetrics) == "true"
|
||||
info, err := s.storage.DiskInfo(context.Background(), withMetrics)
|
||||
info, err := s.getStorage().DiskInfo(context.Background(), withMetrics)
|
||||
if err != nil {
|
||||
info.Error = err.Error()
|
||||
}
|
||||
info.Scanning = s.storage != nil && s.storage.storage != nil && atomic.LoadInt32(&s.storage.storage.scanning) > 0
|
||||
info.Scanning = s.getStorage().storage != nil && atomic.LoadInt32(&s.getStorage().storage.scanning) > 0
|
||||
return &info, nil
|
||||
}
|
||||
|
||||
@ -236,7 +247,7 @@ func (s *storageRESTServer) NSScannerHandler(ctx context.Context, params *nsScan
|
||||
out <- resp
|
||||
}
|
||||
}()
|
||||
ui, err := s.storage.NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode))
|
||||
ui, err := s.getStorage().NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode))
|
||||
wg.Wait()
|
||||
if err != nil {
|
||||
return grid.NewRemoteErr(err)
|
||||
@ -254,7 +265,7 @@ func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Reques
|
||||
return
|
||||
}
|
||||
volume := r.Form.Get(storageRESTVolume)
|
||||
err := s.storage.MakeVol(r.Context(), volume)
|
||||
err := s.getStorage().MakeVol(r.Context(), volume)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
}
|
||||
@ -266,7 +277,7 @@ func (s *storageRESTServer) MakeVolBulkHandler(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
volumes := strings.Split(r.Form.Get(storageRESTVolumes), ",")
|
||||
err := s.storage.MakeVolBulk(r.Context(), volumes...)
|
||||
err := s.getStorage().MakeVolBulk(r.Context(), volumes...)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
}
|
||||
@ -277,7 +288,7 @@ func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Reque
|
||||
if !s.IsValid(w, r) {
|
||||
return
|
||||
}
|
||||
infos, err := s.storage.ListVols(r.Context())
|
||||
infos, err := s.getStorage().ListVols(r.Context())
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
@ -293,7 +304,7 @@ func (s *storageRESTServer) StatVolHandler(params *grid.MSS) (*VolInfo, *grid.Re
|
||||
if !s.checkID(params.Get(storageRESTDiskID)) {
|
||||
return nil, grid.NewRemoteErr(errDiskNotFound)
|
||||
}
|
||||
info, err := s.storage.StatVol(context.Background(), params.Get(storageRESTVolume))
|
||||
info, err := s.getStorage().StatVol(context.Background(), params.Get(storageRESTVolume))
|
||||
if err != nil {
|
||||
return nil, grid.NewRemoteErr(err)
|
||||
}
|
||||
@ -307,7 +318,7 @@ func (s *storageRESTServer) DeleteVolHandler(w http.ResponseWriter, r *http.Requ
|
||||
}
|
||||
volume := r.Form.Get(storageRESTVolume)
|
||||
forceDelete := r.Form.Get(storageRESTForceDelete) == "true"
|
||||
err := s.storage.DeleteVol(r.Context(), volume, forceDelete)
|
||||
err := s.getStorage().DeleteVol(r.Context(), volume, forceDelete)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
}
|
||||
@ -327,7 +338,7 @@ func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Req
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
err = s.storage.AppendFile(r.Context(), volume, filePath, buf)
|
||||
err = s.getStorage().AppendFile(r.Context(), volume, filePath, buf)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
}
|
||||
@ -349,7 +360,7 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
|
||||
done, body := keepHTTPReqResponseAlive(w, r)
|
||||
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), body))
|
||||
done(s.getStorage().CreateFile(r.Context(), volume, filePath, int64(fileSize), body))
|
||||
}
|
||||
|
||||
var storageDeleteVersionHandler = grid.NewSingleHandler[*DeleteVersionHandlerParams, grid.NoPayload](grid.HandlerDeleteVersion, func() *DeleteVersionHandlerParams {
|
||||
@ -365,7 +376,7 @@ func (s *storageRESTServer) DeleteVersionHandler(p *DeleteVersionHandlerParams)
|
||||
filePath := p.FilePath
|
||||
forceDelMarker := p.ForceDelMarker
|
||||
|
||||
err := s.storage.DeleteVersion(context.Background(), volume, filePath, p.FI, forceDelMarker)
|
||||
err := s.getStorage().DeleteVersion(context.Background(), volume, filePath, p.FI, forceDelMarker)
|
||||
return np, grid.NewRemoteErr(err)
|
||||
}
|
||||
|
||||
@ -391,7 +402,7 @@ func (s *storageRESTServer) ReadVersionHandlerWS(params *grid.MSS) (*FileInfo, *
|
||||
return nil, grid.NewRemoteErr(err)
|
||||
}
|
||||
|
||||
fi, err := s.storage.ReadVersion(context.Background(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
|
||||
fi, err := s.getStorage().ReadVersion(context.Background(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
|
||||
if err != nil {
|
||||
return nil, grid.NewRemoteErr(err)
|
||||
}
|
||||
@ -416,7 +427,7 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
|
||||
fi, err := s.getStorage().ReadVersion(r.Context(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
@ -437,7 +448,7 @@ func (s *storageRESTServer) WriteMetadataHandler(p *MetadataHandlerParams) (np g
|
||||
volume := p.Volume
|
||||
filePath := p.FilePath
|
||||
|
||||
err := s.storage.WriteMetadata(context.Background(), volume, filePath, p.FI)
|
||||
err := s.getStorage().WriteMetadata(context.Background(), volume, filePath, p.FI)
|
||||
return np, grid.NewRemoteErr(err)
|
||||
}
|
||||
|
||||
@ -453,7 +464,7 @@ func (s *storageRESTServer) UpdateMetadataHandler(p *MetadataHandlerParams) (gri
|
||||
volume := p.Volume
|
||||
filePath := p.FilePath
|
||||
|
||||
return grid.NewNPErr(s.storage.UpdateMetadata(context.Background(), volume, filePath, p.FI, p.UpdateOpts))
|
||||
return grid.NewNPErr(s.getStorage().UpdateMetadata(context.Background(), volume, filePath, p.FI, p.UpdateOpts))
|
||||
}
|
||||
|
||||
// WriteAllHandler - write to file all content.
|
||||
@ -474,7 +485,7 @@ func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Reque
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
err = s.storage.WriteAll(r.Context(), volume, filePath, tmp)
|
||||
err = s.getStorage().WriteAll(r.Context(), volume, filePath, tmp)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
}
|
||||
@ -491,7 +502,7 @@ func (s *storageRESTServer) CheckPartsHandler(p *CheckPartsHandlerParams) (grid.
|
||||
}
|
||||
volume := p.Volume
|
||||
filePath := p.FilePath
|
||||
return grid.NewNPErr(s.storage.CheckParts(context.Background(), volume, filePath, p.FI))
|
||||
return grid.NewNPErr(s.getStorage().CheckParts(context.Background(), volume, filePath, p.FI))
|
||||
}
|
||||
|
||||
// ReadAllHandler - read all the contents of a file.
|
||||
@ -502,7 +513,7 @@ func (s *storageRESTServer) ReadAllHandler(w http.ResponseWriter, r *http.Reques
|
||||
volume := r.Form.Get(storageRESTVolume)
|
||||
filePath := r.Form.Get(storageRESTFilePath)
|
||||
|
||||
buf, err := s.storage.ReadAll(r.Context(), volume, filePath)
|
||||
buf, err := s.getStorage().ReadAll(r.Context(), volume, filePath)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
@ -526,7 +537,7 @@ func (s *storageRESTServer) ReadXLHandler(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
|
||||
rf, err := s.storage.ReadXL(r.Context(), volume, filePath, readData)
|
||||
rf, err := s.getStorage().ReadXL(r.Context(), volume, filePath, readData)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
@ -551,7 +562,7 @@ func (s *storageRESTServer) ReadXLHandlerWS(params *grid.MSS) (*RawFileInfo, *gr
|
||||
return nil, grid.NewRemoteErr(err)
|
||||
}
|
||||
|
||||
rf, err := s.storage.ReadXL(context.Background(), volume, filePath, readData)
|
||||
rf, err := s.getStorage().ReadXL(context.Background(), volume, filePath, readData)
|
||||
if err != nil {
|
||||
return nil, grid.NewRemoteErr(err)
|
||||
}
|
||||
@ -593,7 +604,7 @@ func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
buf := make([]byte, length)
|
||||
defer metaDataPoolPut(buf) // Reuse if we can.
|
||||
_, err = s.storage.ReadFile(r.Context(), volume, filePath, int64(offset), buf, verifier)
|
||||
_, err = s.getStorage().ReadFile(r.Context(), volume, filePath, int64(offset), buf, verifier)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
@ -622,7 +633,7 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
|
||||
|
||||
w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
|
||||
|
||||
rc, err := s.storage.ReadFileStream(r.Context(), volume, filePath, int64(offset), int64(length))
|
||||
rc, err := s.getStorage().ReadFileStream(r.Context(), volume, filePath, int64(offset), int64(length))
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
@ -667,7 +678,7 @@ func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Reques
|
||||
return
|
||||
}
|
||||
|
||||
entries, err := s.storage.ListDir(r.Context(), volume, dirPath, count)
|
||||
entries, err := s.getStorage().ListDir(r.Context(), volume, dirPath, count)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
@ -684,7 +695,7 @@ func (s *storageRESTServer) DeleteFileHandler(p *DeleteFileHandlerParams) (grid.
|
||||
if !s.checkID(p.DiskID) {
|
||||
return grid.NewNPErr(errDiskNotFound)
|
||||
}
|
||||
return grid.NewNPErr(s.storage.Delete(context.Background(), p.Volume, p.FilePath, p.Opts))
|
||||
return grid.NewNPErr(s.getStorage().Delete(context.Background(), p.Volume, p.FilePath, p.Opts))
|
||||
}
|
||||
|
||||
// DeleteVersionsErrsResp - collection of delete errors
|
||||
@ -722,7 +733,7 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http
|
||||
setEventStreamHeaders(w)
|
||||
encoder := gob.NewEncoder(w)
|
||||
done := keepHTTPResponseAlive(w)
|
||||
errs := s.storage.DeleteVersions(r.Context(), volume, versions)
|
||||
errs := s.getStorage().DeleteVersions(r.Context(), volume, versions)
|
||||
done(nil)
|
||||
for idx := range versions {
|
||||
if errs[idx] != nil {
|
||||
@ -744,7 +755,7 @@ func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*Rena
|
||||
return nil, grid.NewRemoteErr(errDiskNotFound)
|
||||
}
|
||||
|
||||
sign, err := s.storage.RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath)
|
||||
sign, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath)
|
||||
resp := &RenameDataResp{
|
||||
Signature: sign,
|
||||
}
|
||||
@ -760,7 +771,7 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
|
||||
srcFilePath := r.Form.Get(storageRESTSrcPath)
|
||||
dstVolume := r.Form.Get(storageRESTDstVolume)
|
||||
dstFilePath := r.Form.Get(storageRESTDstPath)
|
||||
err := s.storage.RenameFile(r.Context(), srcVolume, srcFilePath, dstVolume, dstFilePath)
|
||||
err := s.getStorage().RenameFile(r.Context(), srcVolume, srcFilePath, dstVolume, dstFilePath)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
}
|
||||
@ -776,7 +787,7 @@ func (s *storageRESTServer) CleanAbandonedDataHandler(w http.ResponseWriter, r *
|
||||
if volume == "" || filePath == "" {
|
||||
return // Ignore
|
||||
}
|
||||
keepHTTPResponseAlive(w)(s.storage.CleanAbandonedData(r.Context(), volume, filePath))
|
||||
keepHTTPResponseAlive(w)(s.getStorage().CleanAbandonedData(r.Context(), volume, filePath))
|
||||
}
|
||||
|
||||
// closeNotifier is itself a ReadCloser that will notify when either an error occurs or
|
||||
@ -1150,7 +1161,7 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req
|
||||
setEventStreamHeaders(w)
|
||||
encoder := gob.NewEncoder(w)
|
||||
done := keepHTTPResponseAlive(w)
|
||||
err := s.storage.VerifyFile(r.Context(), volume, filePath, fi)
|
||||
err := s.getStorage().VerifyFile(r.Context(), volume, filePath, fi)
|
||||
done(nil)
|
||||
vresp := &VerifyFileResp{}
|
||||
if err != nil {
|
||||
@ -1233,25 +1244,25 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
||||
hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s. <path> && sudo chmod u+rxw <path>`", username)
|
||||
}
|
||||
if !exit {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("Drive is not writable %s, %s", endpoint, hint))
|
||||
logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive is not writable %s, %s", endpoint, hint), "log-fatal-errs")
|
||||
} else {
|
||||
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
|
||||
}
|
||||
case errors.Is(err, errFaultyDisk):
|
||||
if !exit {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("Drive is faulty at %s, please replace the drive - drive will be offline", endpoint))
|
||||
logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive is faulty at %s, please replace the drive - drive will be offline", endpoint), "log-fatal-errs")
|
||||
} else {
|
||||
logger.Fatal(err, "Unable to initialize backend")
|
||||
}
|
||||
case errors.Is(err, errDiskFull):
|
||||
if !exit {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("Drive is already full at %s, incoming I/O will fail - drive will be offline", endpoint))
|
||||
logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive is already full at %s, incoming I/O will fail - drive will be offline", endpoint), "log-fatal-errs")
|
||||
} else {
|
||||
logger.Fatal(err, "Unable to initialize backend")
|
||||
}
|
||||
default:
|
||||
if !exit {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("Drive returned an unexpected error at %s, please investigate - drive will be offline (%w)", endpoint, err))
|
||||
logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive %s returned an unexpected error: %w, please investigate - drive will be offline", endpoint, err), "log-fatal-errs")
|
||||
} else {
|
||||
logger.Fatal(err, "Unable to initialize backend")
|
||||
}
|
||||
@ -1267,7 +1278,7 @@ func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request)
|
||||
filePath := r.Form.Get(storageRESTFilePath)
|
||||
glob := r.Form.Get(storageRESTGlob)
|
||||
done := keepHTTPResponseAlive(w)
|
||||
stats, err := s.storage.StatInfoFile(r.Context(), volume, filePath, glob == "true")
|
||||
stats, err := s.getStorage().StatInfoFile(r.Context(), volume, filePath, glob == "true")
|
||||
done(err)
|
||||
if err != nil {
|
||||
return
|
||||
@ -1314,52 +1325,29 @@ func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request)
|
||||
mw.Flush()
|
||||
}
|
||||
}()
|
||||
err = s.storage.ReadMultiple(r.Context(), req, responses)
|
||||
err = s.getStorage().ReadMultiple(r.Context(), req, responses)
|
||||
wg.Wait()
|
||||
rw.CloseWithError(err)
|
||||
}
|
||||
|
||||
// registerStorageRESTHandlers - register storage rpc router.
|
||||
func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools, gm *grid.Manager) {
|
||||
storageDisks := make([][]*xlStorage, len(endpointServerPools))
|
||||
for poolIdx, ep := range endpointServerPools {
|
||||
storageDisks[poolIdx] = make([]*xlStorage, len(ep.Endpoints))
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for poolIdx, ep := range endpointServerPools {
|
||||
for setIdx, endpoint := range ep.Endpoints {
|
||||
if !endpoint.IsLocal {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(poolIdx, setIdx int, endpoint Endpoint) {
|
||||
defer wg.Done()
|
||||
var err error
|
||||
storageDisks[poolIdx][setIdx], err = newXLStorage(endpoint, false)
|
||||
if err != nil {
|
||||
// if supported errors don't fail, we proceed to
|
||||
// printing message and moving forward.
|
||||
logFatalErrs(err, endpoint, false)
|
||||
}
|
||||
}(poolIdx, setIdx, endpoint)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
h := func(f http.HandlerFunc) http.HandlerFunc {
|
||||
return collectInternodeStats(httpTraceHdrs(f))
|
||||
}
|
||||
|
||||
registered := 0
|
||||
for _, setDisks := range storageDisks {
|
||||
for _, storage := range setDisks {
|
||||
if storage == nil {
|
||||
driveHandlers := make([][]*storageRESTServer, len(endpointServerPools))
|
||||
for pool, serverPool := range endpointServerPools {
|
||||
driveHandlers[pool] = make([]*storageRESTServer, len(serverPool.Endpoints))
|
||||
}
|
||||
|
||||
for pool, serverPool := range endpointServerPools {
|
||||
for set, endpoint := range serverPool.Endpoints {
|
||||
if !endpoint.IsLocal {
|
||||
continue
|
||||
}
|
||||
endpoint := storage.Endpoint()
|
||||
|
||||
server := &storageRESTServer{storage: newXLStorageDiskIDCheck(storage, true)}
|
||||
server.storage.SetDiskID(storage.diskID)
|
||||
driveHandlers[pool][set] = &storageRESTServer{}
|
||||
server := driveHandlers[pool][set]
|
||||
|
||||
subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter()
|
||||
|
||||
@ -1402,17 +1390,39 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
|
||||
Handle: server.WalkDirHandler,
|
||||
OutCapacity: 1,
|
||||
}), "unable to register handler")
|
||||
registered++
|
||||
}
|
||||
}
|
||||
if registered == 0 {
|
||||
// Register a dummy handler so remote calls can go out.
|
||||
logger.FatalIf(gm.RegisterStreamingHandler(grid.HandlerWalkDir, grid.StreamHandler{
|
||||
Subroute: fmt.Sprintf("__dummy__%d", time.Now().UnixNano()),
|
||||
Handle: func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *grid.RemoteErr {
|
||||
return grid.NewRemoteErr(errDiskNotFound)
|
||||
},
|
||||
OutCapacity: 1,
|
||||
}), "unable to register handler")
|
||||
|
||||
for pool, serverPool := range endpointServerPools {
|
||||
for set, endpoint := range serverPool.Endpoints {
|
||||
if !endpoint.IsLocal {
|
||||
continue
|
||||
}
|
||||
createStorage := func(pool, set int, endpoint Endpoint) bool {
|
||||
xl, err := newXLStorage(endpoint, false)
|
||||
if err != nil {
|
||||
// if supported errors don't fail, we proceed to
|
||||
// printing message and moving forward.
|
||||
logFatalErrs(err, endpoint, false)
|
||||
return false
|
||||
}
|
||||
storage := newXLStorageDiskIDCheck(xl, true)
|
||||
storage.SetDiskID(xl.diskID)
|
||||
driveHandlers[pool][set].setStorage(storage)
|
||||
return true
|
||||
}
|
||||
if createStorage(pool, set, endpoint) {
|
||||
continue
|
||||
}
|
||||
// Start async goroutine to create storage.
|
||||
go func(pool, set int, endpoint Endpoint) {
|
||||
for {
|
||||
time.Sleep(time.Minute)
|
||||
if createStorage(pool, set, endpoint) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}(pool, set, endpoint)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,9 +20,12 @@ package cmd
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/grid"
|
||||
xnet "github.com/minio/pkg/v2/net"
|
||||
@ -480,6 +483,14 @@ func newStorageRESTHTTPServerClient(t testing.TB) *storageRESTClient {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for {
|
||||
_, err := restClient.DiskInfo(context.Background(), false)
|
||||
if err == nil || errors.Is(err, errUnformattedDisk) {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration(rand.Float64() * float64(100*time.Millisecond)))
|
||||
}
|
||||
|
||||
return restClient
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user