mirror of
https://github.com/minio/minio.git
synced 2025-01-27 06:33:18 -05:00
fix: do not fail upon faulty/non-writable drives
gracefully start the server, if there are other drives available - print enough information for administrator to notice the errors in console. Bonus: for really large streams use larger buffer for writes.
This commit is contained in:
parent
d84261aa6d
commit
4d876d03e8
@ -92,8 +92,10 @@ func mkdirAll(dirPath string, mode os.FileMode) (err error) {
|
|||||||
// directory" error message. Handle this specifically here.
|
// directory" error message. Handle this specifically here.
|
||||||
return errFileAccessDenied
|
return errFileAccessDenied
|
||||||
}
|
}
|
||||||
|
return osErrToFileErr(err)
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reliably retries os.MkdirAll if for some reason os.MkdirAll returns
|
// Reliably retries os.MkdirAll if for some reason os.MkdirAll returns
|
||||||
|
@ -120,10 +120,10 @@ func formatErasureCleanupTmpLocalEndpoints(endpoints Endpoints) error {
|
|||||||
tmpOld := pathJoin(epPath, minioMetaTmpBucket+"-old", mustGetUUID())
|
tmpOld := pathJoin(epPath, minioMetaTmpBucket+"-old", mustGetUUID())
|
||||||
if err := renameAll(pathJoin(epPath, minioMetaTmpBucket),
|
if err := renameAll(pathJoin(epPath, minioMetaTmpBucket),
|
||||||
tmpOld); err != nil && err != errFileNotFound {
|
tmpOld); err != nil && err != errFileNotFound {
|
||||||
return fmt.Errorf("unable to rename (%s -> %s) %w",
|
logger.LogIf(GlobalContext, fmt.Errorf("unable to rename (%s -> %s) %w, drive may be faulty please investigate",
|
||||||
pathJoin(epPath, minioMetaTmpBucket),
|
pathJoin(epPath, minioMetaTmpBucket),
|
||||||
tmpOld,
|
tmpOld,
|
||||||
osErrToFileErr(err))
|
osErrToFileErr(err)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Renames and schedules for puring all bucket metacache.
|
// Renames and schedules for puring all bucket metacache.
|
||||||
@ -133,9 +133,9 @@ func formatErasureCleanupTmpLocalEndpoints(endpoints Endpoints) error {
|
|||||||
go removeAll(pathJoin(epPath, minioMetaTmpBucket+"-old"))
|
go removeAll(pathJoin(epPath, minioMetaTmpBucket+"-old"))
|
||||||
|
|
||||||
if err := mkdirAll(pathJoin(epPath, minioMetaTmpBucket), 0777); err != nil {
|
if err := mkdirAll(pathJoin(epPath, minioMetaTmpBucket), 0777); err != nil {
|
||||||
return fmt.Errorf("unable to create (%s) %w",
|
logger.LogIf(GlobalContext, fmt.Errorf("unable to create (%s) %w, drive may be faulty please investigate",
|
||||||
pathJoin(epPath, minioMetaTmpBucket),
|
pathJoin(epPath, minioMetaTmpBucket),
|
||||||
err)
|
err))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}, index)
|
}, index)
|
||||||
@ -224,11 +224,10 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
|
|||||||
|
|
||||||
for i, err := range errs {
|
for i, err := range errs {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != errDiskNotFound {
|
if err == errDiskNotFound && retryCount >= 5 {
|
||||||
return nil, nil, fmt.Errorf("Disk %s: %w", endpoints[i], err)
|
logger.Info("Unable to connect to %s: %v", endpoints[i], isServerResolvable(endpoints[i], time.Second))
|
||||||
}
|
} else {
|
||||||
if retryCount >= 5 {
|
logger.Info("Unable to use the drive %s: %v", endpoints[i], err)
|
||||||
logger.Info("Unable to connect to %s: %v\n", endpoints[i], isServerResolvable(endpoints[i], time.Second))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -292,6 +291,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
|
|||||||
// the disk UUID association. Below function is called to handle and fix
|
// the disk UUID association. Below function is called to handle and fix
|
||||||
// this regression, for more info refer https://github.com/minio/minio/issues/5667
|
// this regression, for more info refer https://github.com/minio/minio/issues/5667
|
||||||
if err = fixFormatErasureV3(storageDisks, endpoints, formatConfigs); err != nil {
|
if err = fixFormatErasureV3(storageDisks, endpoints, formatConfigs); err != nil {
|
||||||
|
logger.LogIf(GlobalContext, err)
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -302,6 +302,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
|
|||||||
|
|
||||||
format, err = getFormatErasureInQuorum(formatConfigs)
|
format, err = getFormatErasureInQuorum(formatConfigs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.LogIf(GlobalContext, err)
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -311,6 +312,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
|
|||||||
return nil, nil, errNotFirstDisk
|
return nil, nil, errNotFirstDisk
|
||||||
}
|
}
|
||||||
if err = formatErasureFixDeploymentID(endpoints, storageDisks, format); err != nil {
|
if err = formatErasureFixDeploymentID(endpoints, storageDisks, format); err != nil {
|
||||||
|
logger.LogIf(GlobalContext, err)
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -318,6 +320,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
|
|||||||
globalDeploymentID = format.ID
|
globalDeploymentID = format.ID
|
||||||
|
|
||||||
if err = formatErasureFixLocalDeploymentID(endpoints, storageDisks, format); err != nil {
|
if err = formatErasureFixLocalDeploymentID(endpoints, storageDisks, format); err != nil {
|
||||||
|
logger.LogIf(GlobalContext, err)
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -941,9 +941,10 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req
|
|||||||
// now, need to revist the overall erroring structure here.
|
// now, need to revist the overall erroring structure here.
|
||||||
// Do not like it :-(
|
// Do not like it :-(
|
||||||
func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
||||||
if errors.Is(err, errMinDiskSize) {
|
switch {
|
||||||
|
case errors.Is(err, errMinDiskSize):
|
||||||
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(err.Error()), "Unable to initialize backend")
|
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(err.Error()), "Unable to initialize backend")
|
||||||
} else if errors.Is(err, errUnsupportedDisk) {
|
case errors.Is(err, errUnsupportedDisk):
|
||||||
var hint string
|
var hint string
|
||||||
if endpoint.URL != nil {
|
if endpoint.URL != nil {
|
||||||
hint = fmt.Sprintf("Disk '%s' does not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support", endpoint.Path)
|
hint = fmt.Sprintf("Disk '%s' does not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support", endpoint.Path)
|
||||||
@ -951,7 +952,7 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
|||||||
hint = "Disks do not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support"
|
hint = "Disks do not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support"
|
||||||
}
|
}
|
||||||
logger.Fatal(config.ErrUnsupportedBackend(err).Hint(hint), "Unable to initialize backend")
|
logger.Fatal(config.ErrUnsupportedBackend(err).Hint(hint), "Unable to initialize backend")
|
||||||
} else if errors.Is(err, errDiskNotDir) {
|
case errors.Is(err, errDiskNotDir):
|
||||||
var hint string
|
var hint string
|
||||||
if endpoint.URL != nil {
|
if endpoint.URL != nil {
|
||||||
hint = fmt.Sprintf("Disk '%s' is not a directory, MinIO erasure coding needs a directory", endpoint.Path)
|
hint = fmt.Sprintf("Disk '%s' is not a directory, MinIO erasure coding needs a directory", endpoint.Path)
|
||||||
@ -959,7 +960,7 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
|||||||
hint = "Disks are not directories, MinIO erasure coding needs directories"
|
hint = "Disks are not directories, MinIO erasure coding needs directories"
|
||||||
}
|
}
|
||||||
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
|
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
|
||||||
} else if errors.Is(err, errFileAccessDenied) {
|
case errors.Is(err, errFileAccessDenied):
|
||||||
// Show a descriptive error with a hint about how to fix it.
|
// Show a descriptive error with a hint about how to fix it.
|
||||||
var username string
|
var username string
|
||||||
if u, err := user.Current(); err == nil {
|
if u, err := user.Current(); err == nil {
|
||||||
@ -974,22 +975,26 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
|||||||
} else {
|
} else {
|
||||||
hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s. <path> && sudo chmod u+rxw <path>`", username)
|
hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s. <path> && sudo chmod u+rxw <path>`", username)
|
||||||
}
|
}
|
||||||
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
|
if !exit {
|
||||||
} else if errors.Is(err, errFaultyDisk) {
|
logger.LogIf(GlobalContext, fmt.Errorf("disk is not writable %s, %s", endpoint, hint))
|
||||||
|
} else {
|
||||||
|
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
|
||||||
|
}
|
||||||
|
case errors.Is(err, errFaultyDisk):
|
||||||
if !exit {
|
if !exit {
|
||||||
logger.LogIf(GlobalContext, fmt.Errorf("disk is faulty at %s, please replace the drive - disk will be offline", endpoint))
|
logger.LogIf(GlobalContext, fmt.Errorf("disk is faulty at %s, please replace the drive - disk will be offline", endpoint))
|
||||||
} else {
|
} else {
|
||||||
logger.Fatal(err, "Unable to initialize backend")
|
logger.Fatal(err, "Unable to initialize backend")
|
||||||
}
|
}
|
||||||
} else if errors.Is(err, errDiskFull) {
|
case errors.Is(err, errDiskFull):
|
||||||
if !exit {
|
if !exit {
|
||||||
logger.LogIf(GlobalContext, fmt.Errorf("disk is already full at %s, incoming I/O will fail - disk will be offline", endpoint))
|
logger.LogIf(GlobalContext, fmt.Errorf("disk is already full at %s, incoming I/O will fail - disk will be offline", endpoint))
|
||||||
} else {
|
} else {
|
||||||
logger.Fatal(err, "Unable to initialize backend")
|
logger.Fatal(err, "Unable to initialize backend")
|
||||||
}
|
}
|
||||||
} else {
|
default:
|
||||||
if !exit {
|
if !exit {
|
||||||
logger.LogIf(GlobalContext, fmt.Errorf("disk returned an unexpected error at %s, please investigate - disk will be offline", endpoint))
|
logger.LogIf(GlobalContext, fmt.Errorf("disk returned an unexpected error at %s, please investigate - disk will be offline (%w)", endpoint, err))
|
||||||
} else {
|
} else {
|
||||||
logger.Fatal(err, "Unable to initialize backend")
|
logger.Fatal(err, "Unable to initialize backend")
|
||||||
}
|
}
|
||||||
|
@ -52,9 +52,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
nullVersionID = "null"
|
nullVersionID = "null"
|
||||||
blockSizeLarge = 2 * humanize.MiByte // Default r/w block size for larger objects.
|
blockSizeSmall = 128 * humanize.KiByte // Default r/w block size for smaller objects.
|
||||||
blockSizeSmall = 128 * humanize.KiByte // Default r/w block size for smaller objects.
|
blockSizeLarge = 2 * humanize.MiByte // Default r/w block size for larger objects.
|
||||||
|
blockSizeReallyLarge = 4 * humanize.MiByte // Default write block size for objects per shard >= 64MiB
|
||||||
|
|
||||||
// On regular files bigger than this;
|
// On regular files bigger than this;
|
||||||
readAheadSize = 16 << 20
|
readAheadSize = 16 << 20
|
||||||
@ -63,6 +64,9 @@ const (
|
|||||||
// Size of each buffer.
|
// Size of each buffer.
|
||||||
readAheadBufSize = 1 << 20
|
readAheadBufSize = 1 << 20
|
||||||
|
|
||||||
|
// Really large streams threshold per shard.
|
||||||
|
reallyLargeFileThreshold = 64 * humanize.MiByte // Optimized for HDDs
|
||||||
|
|
||||||
// Small file threshold below which data accompanies metadata from storage layer.
|
// Small file threshold below which data accompanies metadata from storage layer.
|
||||||
smallFileThreshold = 128 * humanize.KiByte // Optimized for NVMe/SSDs
|
smallFileThreshold = 128 * humanize.KiByte // Optimized for NVMe/SSDs
|
||||||
// For hardrives it is possible to set this to a lower value to avoid any
|
// For hardrives it is possible to set this to a lower value to avoid any
|
||||||
@ -101,8 +105,9 @@ type xlStorage struct {
|
|||||||
|
|
||||||
globalSync bool
|
globalSync bool
|
||||||
|
|
||||||
poolLarge sync.Pool
|
poolReallyLarge sync.Pool
|
||||||
poolSmall sync.Pool
|
poolLarge sync.Pool
|
||||||
|
poolSmall sync.Pool
|
||||||
|
|
||||||
rootDisk bool
|
rootDisk bool
|
||||||
|
|
||||||
@ -179,7 +184,7 @@ func getValidPath(path string) (string, error) {
|
|||||||
}
|
}
|
||||||
if osIsNotExist(err) {
|
if osIsNotExist(err) {
|
||||||
// Disk not found create it.
|
// Disk not found create it.
|
||||||
if err = reliableMkdirAll(path, 0777); err != nil {
|
if err = mkdirAll(path, 0777); err != nil {
|
||||||
return path, err
|
return path, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -251,6 +256,12 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
|
|||||||
p := &xlStorage{
|
p := &xlStorage{
|
||||||
diskPath: path,
|
diskPath: path,
|
||||||
endpoint: ep,
|
endpoint: ep,
|
||||||
|
poolReallyLarge: sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
b := disk.AlignedBlock(blockSizeReallyLarge)
|
||||||
|
return &b
|
||||||
|
},
|
||||||
|
},
|
||||||
poolLarge: sync.Pool{
|
poolLarge: sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
b := disk.AlignedBlock(blockSizeLarge)
|
b := disk.AlignedBlock(blockSizeLarge)
|
||||||
@ -628,7 +639,7 @@ func (s *xlStorage) MakeVol(ctx context.Context, volume string) error {
|
|||||||
// Volume does not exist we proceed to create.
|
// Volume does not exist we proceed to create.
|
||||||
if osIsNotExist(err) {
|
if osIsNotExist(err) {
|
||||||
// Make a volume entry, with mode 0777 mkdir honors system umask.
|
// Make a volume entry, with mode 0777 mkdir honors system umask.
|
||||||
err = reliableMkdirAll(volumeDir, 0777)
|
err = mkdirAll(volumeDir, 0777)
|
||||||
}
|
}
|
||||||
if osIsPermission(err) {
|
if osIsPermission(err) {
|
||||||
return errDiskAccessDenied
|
return errDiskAccessDenied
|
||||||
@ -1540,8 +1551,15 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
|
|||||||
w.Close()
|
w.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
bufp := s.poolLarge.Get().(*[]byte)
|
var bufp *[]byte
|
||||||
defer s.poolLarge.Put(bufp)
|
if fileSize > 0 && fileSize >= reallyLargeFileThreshold {
|
||||||
|
// use a larger 4MiB buffer for really large streams.
|
||||||
|
bufp = s.poolReallyLarge.Get().(*[]byte)
|
||||||
|
defer s.poolReallyLarge.Put(bufp)
|
||||||
|
} else {
|
||||||
|
bufp = s.poolLarge.Get().(*[]byte)
|
||||||
|
defer s.poolLarge.Put(bufp)
|
||||||
|
}
|
||||||
|
|
||||||
written, err := xioutil.CopyAligned(w, r, *bufp, fileSize)
|
written, err := xioutil.CopyAligned(w, r, *bufp, fileSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1984,7 +2002,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
|||||||
|
|
||||||
legacyDataPath := pathJoin(dstVolumeDir, dstPath, legacyDataDir)
|
legacyDataPath := pathJoin(dstVolumeDir, dstPath, legacyDataDir)
|
||||||
// legacy data dir means its old content, honor system umask.
|
// legacy data dir means its old content, honor system umask.
|
||||||
if err = reliableMkdirAll(legacyDataPath, 0777); err != nil {
|
if err = mkdirAll(legacyDataPath, 0777); err != nil {
|
||||||
return osErrToFileErr(err)
|
return osErrToFileErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user