diff --git a/object-common-multipart.go b/object-common-multipart.go index 33acfb7c5..5602031dc 100644 --- a/object-common-multipart.go +++ b/object-common-multipart.go @@ -312,7 +312,7 @@ func listMetaBucketMultipartFiles(layer ObjectLayer, prefixPath string, markerPa "marker": markerPath, "recursive": recursive, }).Debugf("Walk resulted in an error %s", walkResult.err) - // 'File not found' or 'Disk not found' is a valid case. + // File not found or Disk not found is a valid case. if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound { return nil, true, nil } diff --git a/xl-erasure-v1-common.go b/xl-erasure-v1-common.go index 08df66dd0..ef82300a4 100644 --- a/xl-erasure-v1-common.go +++ b/xl-erasure-v1-common.go @@ -48,6 +48,43 @@ func listFileVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64) return versions } +// errsToStorageErr - convert collection of errors into a single +// error based on total errors and read quorum. +func (xl XL) errsToStorageErr(errs []error) error { + notFoundCount := 0 + diskNotFoundCount := 0 + diskAccessDeniedCount := 0 + for _, err := range errs { + if err == errFileNotFound { + notFoundCount++ + } else if err == errDiskNotFound { + diskNotFoundCount++ + } else if err == errVolumeAccessDenied { + diskAccessDeniedCount++ + } + } + // If we have errors with 'file not found' greater than + // readQuroum, return as errFileNotFound. + if notFoundCount > len(xl.storageDisks)-xl.readQuorum { + return errFileNotFound + } + // If we have errors with disk not found equal to the + // number of disks, return as errDiskNotFound. + if diskNotFoundCount == len(xl.storageDisks) { + return errDiskNotFound + } else if diskNotFoundCount > len(xl.storageDisks)-xl.readQuorum { + // If we have errors with 'disk not found' greater than + // readQuroum, return as errFileNotFound. + return errFileNotFound + } + // If we have errors with disk not found equal to the + // number of disks, return as errDiskNotFound. + if diskAccessDeniedCount == len(xl.storageDisks) { + return errVolumeAccessDenied + } + return nil +} + // Returns slice of online disks needed. // - slice returing readable disks. // - xlMetaV1 @@ -55,27 +92,9 @@ func listFileVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64) // - error if any. func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata xlMetaV1, heal bool, err error) { partsMetadata, errs := xl.getPartsMetadata(volume, path) - notFoundCount := 0 - diskNotFoundCount := 0 - for _, err := range errs { - if err == errFileNotFound || err == errDiskNotFound { - notFoundCount++ - // If we have errors with 'file not found' or 'disk not found' greater than - // writeQuroum, return as errFileNotFound. - if notFoundCount > len(xl.storageDisks)-xl.readQuorum { - return nil, xlMetaV1{}, false, errFileNotFound - } - } - if err == errDiskNotFound { - diskNotFoundCount++ - // If we have errors with disk not found equal to the - // number of disks, return as errDiskNotFound. - if diskNotFoundCount == len(xl.storageDisks) { - return nil, xlMetaV1{}, false, errDiskNotFound - } - } + if err = xl.errsToStorageErr(errs); err != nil { + return nil, xlMetaV1{}, false, err } - highestVersion := int64(0) onlineDisks = make([]StorageAPI, len(xl.storageDisks)) // List all the file versions from partsMetadata list. diff --git a/xl-erasure-v1-createfile.go b/xl-erasure-v1-createfile.go index 2ca770adc..4b6b6c35b 100644 --- a/xl-erasure-v1-createfile.go +++ b/xl-erasure-v1-createfile.go @@ -62,24 +62,18 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w partsMetadata, errs := xl.getPartsMetadata(volume, path) nsMutex.RUnlock(volume, path) - // Count errors other than fileNotFound, bigger than the allowed - // readQuorum, if yes throw an error. - metadataReadErrCount := 0 - for _, err := range errs { - if err != nil && err != errFileNotFound { - metadataReadErrCount++ - if metadataReadErrCount > xl.readQuorum { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - }).Errorf("%s", err) - reader.CloseWithError(err) - return - } - } + // Convert errs into meaningful err to be sent upwards if possible + // based on total number of errors and read quorum. + err := xl.errsToStorageErr(errs) + if err != nil && err != errFileNotFound { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Errorf("%s", err) + reader.CloseWithError(err) + return } - var err error // List all the file versions on existing files. versions := listFileVersions(partsMetadata, errs) // Get highest file version. diff --git a/xl-objects.go b/xl-objects.go index c67c7c366..74ac10a9f 100644 --- a/xl-objects.go +++ b/xl-objects.go @@ -93,12 +93,14 @@ func newXLObjects(exportPaths ...string) (ObjectLayer, error) { } else { log.Errorf("Unable to check backend format %s", err) if err == errReadQuorum { - errMsg := fmt.Sprintf("Not all disks %s on command line are available to meet the read quroum.", exportPaths) - return nil, errors.New(errMsg) - } - if err == errDiskNotFound { - errMsg := fmt.Sprintf("All disks %s on command line are not available.", exportPaths) - return nil, errors.New(errMsg) + errMsg := fmt.Sprintf("Not all disks %s are available, did not meet read quroum.", exportPaths) + err = errors.New(errMsg) + } else if err == errDiskNotFound { + errMsg := fmt.Sprintf("Disks %s not found.", exportPaths) + err = errors.New(errMsg) + } else if err == errVolumeAccessDenied { + errMsg := fmt.Sprintf("Disks %s access permission denied.", exportPaths) + err = errors.New(errMsg) } return nil, err }