Use isErrIgnored() function wherever applicable. (#3343)

This commit is contained in:
Bala FA 2016-11-23 20:05:04 -08:00 committed by Harshavardhana
parent 4ef2d8940c
commit 0f2e493c9a
14 changed files with 49 additions and 84 deletions

View File

@ -85,11 +85,9 @@ func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolic
for _, bucket := range buckets {
policy, pErr := readBucketPolicy(bucket.Name, objAPI)
if pErr != nil {
if !isErrIgnored(pErr, []error{
// net.Dial fails for rpc client or any
// other unexpected errors during net.Dial.
errDiskNotFound,
}) {
// net.Dial fails for rpc client or any
// other unexpected errors during net.Dial.
if !isErrIgnored(pErr, errDiskNotFound) {
if !isErrBucketPolicyNotFound(pErr) {
pErrs = append(pErrs, pErr)
}

View File

@ -120,3 +120,20 @@ func errorsCause(errs []error) []error {
}
return cerrs
}
var baseIgnoredErrs = []error{
errDiskNotFound,
errFaultyDisk,
errFaultyRemoteDisk,
}
// isErrIgnored returns whether given error is ignored or not.
func isErrIgnored(err error, ignoredErrs ...error) bool {
err = errorCause(err)
for _, ignoredErr := range ignoredErrs {
if ignoredErr == err {
return true
}
}
return false
}

View File

@ -438,22 +438,15 @@ func removeListenerConfig(bucket string, objAPI ObjectLayer) error {
// Loads both notification and listener config.
func loadNotificationAndListenerConfig(bucketName string, objAPI ObjectLayer) (nCfg *notificationConfig, lCfg []listenerConfig, err error) {
nConfigErrs := []error{
// When no previous notification configs were found.
errNoSuchNotifications,
// net.Dial fails for rpc client or any
// other unexpected errors during net.Dial.
errDiskNotFound,
}
// Loads notification config if any.
nCfg, err = loadNotificationConfig(bucketName, objAPI)
if err != nil && !isErrIgnored(err, nConfigErrs) {
if err != nil && !isErrIgnored(err, errDiskNotFound, errNoSuchNotifications) {
return nil, nil, err
}
// Loads listener config if any.
lCfg, err = loadListenerConfig(bucketName, objAPI)
if err != nil && !isErrIgnored(err, nConfigErrs) {
if err != nil && !isErrIgnored(err, errDiskNotFound, errNoSuchNotifications) {
return nil, nil, err
}
return nCfg, lCfg, nil

View File

@ -90,7 +90,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// For any walk error return right away.
if walkResult.err != nil {
// File not found or Disk not found is a valid case.
if isErrIgnored(walkResult.err, fsTreeWalkIgnoredErrs) {
if isErrIgnored(walkResult.err, fsTreeWalkIgnoredErrs...) {
eof = true
break
}

View File

@ -48,17 +48,6 @@ func init() {
globalObjLayerMutex = &sync.Mutex{}
}
// isErrIgnored should we ignore this error?, takes a list of errors which can be ignored.
func isErrIgnored(err error, ignoredErrs []error) bool {
err = errorCause(err)
for _, ignoredErr := range ignoredErrs {
if ignoredErr == err {
return true
}
}
return false
}
// House keeping code for FS/XL and distributed Minio setup.
func houseKeeping(storageDisks []StorageAPI) error {
var wg = &sync.WaitGroup{}
@ -83,9 +72,7 @@ func houseKeeping(storageDisks []StorageAPI) error {
// Cleanup all temp entries upon start.
err := cleanupDir(disk, minioMetaTmpBucket, "")
if err != nil {
switch errorCause(err) {
case errDiskNotFound, errVolumeNotFound, errFileNotFound:
default:
if !isErrIgnored(errorCause(err), errDiskNotFound, errVolumeNotFound, errFileNotFound) {
errs[index] = err
}
}
@ -196,12 +183,7 @@ func newStorageAPI(ep *url.URL) (storage StorageAPI, err error) {
return newStorageRPC(ep)
}
var initMetaVolIgnoredErrs = []error{
errVolumeExists,
errDiskNotFound,
errFaultyDisk,
errFaultyRemoteDisk,
}
var initMetaVolIgnoredErrs = append(baseIgnoredErrs, errVolumeExists)
// Initializes meta volume on all input storage disks.
func initMetaVolume(storageDisks []StorageAPI) error {
@ -227,21 +209,21 @@ func initMetaVolume(storageDisks []StorageAPI) error {
// Attempt to create `.minio.sys`.
err := disk.MakeVol(minioMetaBucket)
if err != nil {
if !isErrIgnored(err, initMetaVolIgnoredErrs) {
if !isErrIgnored(err, initMetaVolIgnoredErrs...) {
errs[index] = err
}
return
}
err = disk.MakeVol(minioMetaTmpBucket)
if err != nil {
if !isErrIgnored(err, initMetaVolIgnoredErrs) {
if !isErrIgnored(err, initMetaVolIgnoredErrs...) {
errs[index] = err
}
return
}
err = disk.MakeVol(minioMetaMultipartBucket)
if err != nil {
if !isErrIgnored(err, initMetaVolIgnoredErrs) {
if !isErrIgnored(err, initMetaVolIgnoredErrs...) {
errs[index] = err
}
return

View File

@ -133,7 +133,7 @@ func listDirFactory(isLeaf isLeafFunc, treeWalkIgnoredErrs []error, disks ...Sto
}
// For any reason disk was deleted or goes offline, continue
// and list from other disks if possible.
if isErrIgnored(err, treeWalkIgnoredErrs) {
if isErrIgnored(err, treeWalkIgnoredErrs...) {
continue
}
break

View File

@ -21,17 +21,12 @@ import (
"sync"
)
// list all errors that can be ignore in a bucket operation.
var bucketOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)
// list all errors that can be ignored in a bucket metadata operation.
var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound)
// list all errors that can be ignore in a bucket operation.
var bucketOpIgnoredErrs = []error{
errFaultyDisk,
errFaultyRemoteDisk,
errDiskNotFound,
errDiskAccessDenied,
}
/// Bucket operations
// MakeBucket - make a bucket.
@ -143,7 +138,7 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err
}
err = traceError(err)
// For any reason disk went offline continue and pick the next one.
if isErrIgnored(err, bucketMetadataOpIgnoredErrs) {
if isErrIgnored(err, bucketMetadataOpIgnoredErrs...) {
continue
}
break
@ -219,7 +214,7 @@ func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) {
return bucketsInfo, nil
}
// Ignore any disks not found.
if isErrIgnored(err, bucketMetadataOpIgnoredErrs) {
if isErrIgnored(err, bucketMetadataOpIgnoredErrs...) {
continue
}
break

View File

@ -62,7 +62,7 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
return true
}
// Ignore for file not found, disk not found or faulty disk.
if isErrIgnored(err, xlTreeWalkIgnoredErrs) {
if isErrIgnored(err, xlTreeWalkIgnoredErrs...) {
continue
}
errorIf(err, "Unable to stat a file %s/%s/%s", bucket, prefix, xlMetaJSONFile)

View File

@ -184,7 +184,7 @@ func listBucketNames(storageDisks []StorageAPI) (bucketNames map[string]struct{}
continue
}
// Ignore any disks not found.
if isErrIgnored(err, bucketMetadataOpIgnoredErrs) {
if isErrIgnored(err, bucketMetadataOpIgnoredErrs...) {
continue
}
break

View File

@ -206,15 +206,7 @@ func pickValidXLMeta(metaArr []xlMetaV1, modTime time.Time) (xlMetaV1, error) {
}
// list of all errors that can be ignored in a metadata operation.
var objMetadataOpIgnoredErrs = []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errFaultyRemoteDisk,
errVolumeNotFound,
errFileAccessDenied,
errFileNotFound,
}
var objMetadataOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound, errFileAccessDenied)
// readXLMetaParts - returns the XL Metadata Parts from xl.json of one of the disks picked at random.
func (xl xlObjects) readXLMetaParts(bucket, object string) (xlMetaParts []objectPartInfo, err error) {
@ -228,7 +220,7 @@ func (xl xlObjects) readXLMetaParts(bucket, object string) (xlMetaParts []object
}
// For any reason disk or bucket is not available continue
// and read from other disks.
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
continue
}
break
@ -250,7 +242,7 @@ func (xl xlObjects) readXLMetaStat(bucket, object string) (xlStat statInfo, xlMe
}
// For any reason disk or bucket is not available continue
// and read from other disks.
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
continue
}
break

View File

@ -166,7 +166,7 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
return true
}
// For any reason disk was deleted or goes offline, continue
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
continue
}
break
@ -213,7 +213,7 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
}
err = traceError(err)
// For any reason disk was deleted or goes offline we continue to next disk.
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
continue
}

View File

@ -76,7 +76,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
if err == nil {
break
}
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
continue
}
break
@ -110,7 +110,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// For any walk error return right away.
if walkResult.err != nil {
// File not found or Disk not found is a valid case.
if isErrIgnored(walkResult.err, xlTreeWalkIgnoredErrs) {
if isErrIgnored(walkResult.err, xlTreeWalkIgnoredErrs...) {
continue
}
return ListMultipartsInfo{}, err
@ -147,14 +147,14 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
if err == nil {
break
}
if isErrIgnored(err, objMetadataOpIgnoredErrs) {
if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
continue
}
break
}
entryLock.RUnlock()
if err != nil {
if isErrIgnored(err, xlTreeWalkIgnoredErrs) {
if isErrIgnored(err, xlTreeWalkIgnoredErrs...) {
continue
}
return ListMultipartsInfo{}, err

View File

@ -35,7 +35,7 @@ func reduceErrs(errs []error, ignoredErrs []error) (maxCount int, maxErr error)
errorCounts := make(map[error]int)
errs = errorsCause(errs)
for _, err := range errs {
if isErrIgnored(err, ignoredErrs) {
if isErrIgnored(err, ignoredErrs...) {
continue
}
errorCounts[err]++
@ -83,12 +83,7 @@ func reduceWriteQuorumErrs(errs []error, ignoredErrs []error, writeQuorum int) (
}
// List of all errors which are ignored while verifying quorum.
var quorumIgnoredErrs = []error{
errFaultyDisk,
errFaultyRemoteDisk,
errDiskNotFound,
errDiskAccessDenied,
}
var quorumIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)
// Validates if we have quorum based on the errors related to disk only.
// Returns 'true' if we have quorum, 'false' if we don't.
@ -97,7 +92,7 @@ func isDiskQuorum(errs []error, minQuorumCount int) bool {
errs = errorsCause(errs)
for _, err := range errs {
// Check if the error can be ignored for quorum verification.
if !isErrIgnored(err, quorumIgnoredErrs) {
if !isErrIgnored(err, quorumIgnoredErrs...) {
count++
}
}

View File

@ -72,14 +72,7 @@ type xlObjects struct {
}
// list of all errors that can be ignored in tree walk operation in XL
var xlTreeWalkIgnoredErrs = []error{
errFileNotFound,
errVolumeNotFound,
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errFaultyRemoteDisk,
}
var xlTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound)
// newXLObjects - initialize new xl object layer.
func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {