Convert errors tracer into a separate package (#5221)

Harshavardhana authored on 2017-11-25 11:58:29 -08:00 (committed by GitHub)
parent 6e6aeb6a9e
commit 8efa82126b
82 changed files with 1117 additions and 896 deletions
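For context, here is a minimal sketch of the new package's surface as inferred from the call sites in the multipart code below. Only the three function names (`Trace`, `Cause`, `IsErrIgnored`) and how they are called come from the diff; the concrete types, field names, and stack-capture details are assumptions, not the actual implementation in github.com/minio/minio/pkg/errors.

```go
// Sketch of pkg/errors as inferred from this diff; not the real implementation.
package errors

// Error pairs an underlying error with the trace captured when it was
// wrapped. The field names here are assumptions.
type Error struct {
	Cause error    // the original error
	Trace []string // call sites recorded by Trace, e.g. "file.go:42"
}

func (e *Error) Error() string { return e.Cause.Error() }

// Trace wraps err with caller information; a nil error stays nil so call
// sites can wrap unconditionally.
func Trace(err error) error {
	if err == nil {
		return nil
	}
	return &Error{Cause: err} // the real version also records stack frames
}

// Cause unwraps an error produced by Trace; any other error is returned as-is.
func Cause(err error) error {
	if e, ok := err.(*Error); ok {
		return e.Cause
	}
	return err
}

// IsErrIgnored reports whether the unwrapped error matches any of the given
// sentinel errors.
func IsErrIgnored(err error, ignoredErrs ...error) bool {
	err = Cause(err)
	for _, ignored := range ignoredErrs {
		if err == ignored {
			return true
		}
	}
	return false
}
```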


@@ -26,6 +26,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/minio/minio/pkg/errors"
 	"github.com/minio/minio/pkg/hash"
 	"github.com/minio/minio/pkg/mimedb"
 )
@@ -43,7 +44,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
 	wg := sync.WaitGroup{}
 	for index, disk := range xl.storageDisks {
 		if disk == nil {
-			errs[index] = traceError(errDiskNotFound)
+			errs[index] = errors.Trace(errDiskNotFound)
 			continue
 		}
 		// Update `uploads.json` in a goroutine.
@@ -53,7 +54,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
 			// read and parse uploads.json on this disk
 			uploadsJSON, err := readUploadsJSON(bucket, object, disk)
-			if errorCause(err) == errFileNotFound {
+			if errors.Cause(err) == errFileNotFound {
 				// If file is not found, we assume a
 				// default (empty) upload info.
 				uploadsJSON, err = newUploadsV1("xl"), nil
@@ -84,7 +85,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
 			} else {
 				wErr := disk.RenameFile(minioMetaMultipartBucket, uploadsPath, minioMetaTmpBucket, tmpUploadsPath)
 				if wErr != nil {
-					errs[index] = traceError(wErr)
+					errs[index] = errors.Trace(wErr)
 				}
 			}
@@ -95,7 +96,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
 	wg.Wait()
 	err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
-	if errorCause(err) == errXLWriteQuorum {
+	if errors.Cause(err) == errXLWriteQuorum {
 		// No quorum. Perform cleanup on the minority of disks
 		// on which the operation succeeded.
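This `reduce, then compare the cause` shape recurs at every quorum check in the file. A hypothetical, much-simplified stand-in for `reduceWriteQuorumErrs` (the real reducer also honors an ignored-errors list) shows why the comparison must go through `errors.Cause`; `errXLWriteQuorum` and `errors` here are the identifiers from the diff, assumed to be in package scope:

```go
// reduceErrs counts the disks that succeeded and returns a traced quorum
// error when too few did. Because the sentinel is wrapped by errors.Trace,
// callers receive a wrapper, not errXLWriteQuorum itself...
func reduceErrs(errs []error, quorum int) error {
	success := 0
	for _, err := range errs {
		if err == nil {
			success++
		}
	}
	if success < quorum {
		return errors.Trace(errXLWriteQuorum)
	}
	return nil
}

// ...so callers must unwrap before comparing, exactly as the diff does:
//   if errors.Cause(err) == errXLWriteQuorum { ... }
```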
@@ -170,7 +171,7 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
 			return true
 		}
 		// If the disk was deleted or went offline for any reason, continue.
-		if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
+		if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
 			continue
 		}
 		break
@@ -218,12 +219,12 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
 			return fileInfo, nil
 		}
 		// If the disk was deleted or went offline for any reason, continue to the next disk.
-		if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
+		if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
 			ignoredErrs = append(ignoredErrs, err)
 			continue
 		}
 		// Error is not ignored, return right here.
-		return FileInfo{}, traceError(err)
+		return FileInfo{}, errors.Trace(err)
 	}
 	// If all errors were ignored, reduce to the maximal occurrence
 	// based on the read quorum.
@@ -241,7 +242,7 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
 	// Rename `xl.json` on all disks in parallel.
 	for index, disk := range disks {
 		if disk == nil {
-			mErrs[index] = traceError(errDiskNotFound)
+			mErrs[index] = errors.Trace(errDiskNotFound)
 			continue
 		}
 		wg.Add(1)
@@ -254,7 +255,7 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
 			// Renames `xl.json` from source prefix to destination prefix.
 			rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
 			if rErr != nil {
-				mErrs[index] = traceError(rErr)
+				mErrs[index] = errors.Trace(rErr)
 				return
 			}
 			mErrs[index] = nil
@@ -264,7 +265,7 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
 	wg.Wait()
 	err := reduceWriteQuorumErrs(mErrs, objectOpIgnoredErrs, quorum)
-	if errorCause(err) == errXLWriteQuorum {
+	if errors.Cause(err) == errXLWriteQuorum {
 		// Delete all `xl.json` successfully renamed.
 		deleteAllXLMetadata(disks, dstBucket, dstPrefix, mErrs)
 	}
@@ -317,7 +318,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 			if err == nil {
 				break
 			}
-			if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
+			if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
 				continue
 			}
 			break
@@ -386,14 +387,14 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 				if err == nil {
 					break
 				}
-				if isErrIgnored(err, objMetadataOpIgnoredErrs...) {
+				if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
 					continue
 				}
 				break
 			}
 			entryLock.RUnlock()
 			if err != nil {
-				if isErrIgnored(err, xlTreeWalkIgnoredErrs...) {
+				if errors.IsErrIgnored(err, xlTreeWalkIgnoredErrs...) {
 					continue
 				}
 				return lmi, err
@@ -585,7 +586,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
 	// Validate input data size; it can never be less than zero.
 	if data.Size() < 0 {
-		return pi, toObjectErr(traceError(errInvalidArgument))
+		return pi, toObjectErr(errors.Trace(errInvalidArgument))
 	}
 
 	var partsMetadata []xlMetaV1
@@ -601,14 +602,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
 	// Validates if upload ID exists.
 	if !xl.isUploadIDExists(bucket, object, uploadID) {
 		preUploadIDLock.RUnlock()
-		return pi, traceError(InvalidUploadID{UploadID: uploadID})
+		return pi, errors.Trace(InvalidUploadID{UploadID: uploadID})
 	}
 
 	// Read metadata associated with the object from all disks.
 	partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket,
 		uploadIDPath)
 	reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
-	if errorCause(reducedErr) == errXLWriteQuorum {
+	if errors.Cause(reducedErr) == errXLWriteQuorum {
 		preUploadIDLock.RUnlock()
 		return pi, toObjectErr(reducedErr, bucket, object)
 	}
@@ -654,7 +655,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
 	// Should return IncompleteBody{} error when the reader has fewer bytes
 	// than specified in the request header.
 	if file.Size < data.Size() {
-		return pi, traceError(IncompleteBody{})
+		return pi, errors.Trace(IncompleteBody{})
 	}
 
 	// post-upload check (write) lock
@@ -666,7 +667,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
 	// Validate again if upload ID still exists.
 	if !xl.isUploadIDExists(bucket, object, uploadID) {
-		return pi, traceError(InvalidUploadID{UploadID: uploadID})
+		return pi, errors.Trace(InvalidUploadID{UploadID: uploadID})
 	}
 
 	// Rename temporary part file to its final location.
@@ -679,7 +680,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
 	// Read metadata again because it might have been updated by a parallel upload of another part.
 	partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaMultipartBucket, uploadIDPath)
 	reducedErr = reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
-	if errorCause(reducedErr) == errXLWriteQuorum {
+	if errors.Cause(reducedErr) == errXLWriteQuorum {
 		return pi, toObjectErr(reducedErr, bucket, object)
 	}
@@ -820,7 +821,7 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
 	defer uploadIDLock.Unlock()
 	if !xl.isUploadIDExists(bucket, object, uploadID) {
-		return lpi, traceError(InvalidUploadID{UploadID: uploadID})
+		return lpi, errors.Trace(InvalidUploadID{UploadID: uploadID})
 	}
 	result, err := xl.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
 	return result, err
@@ -851,13 +852,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	defer uploadIDLock.Unlock()
 	if !xl.isUploadIDExists(bucket, object, uploadID) {
-		return oi, traceError(InvalidUploadID{UploadID: uploadID})
+		return oi, errors.Trace(InvalidUploadID{UploadID: uploadID})
 	}
 
 	// Check if an object is present as one of the parent dirs.
 	// -- FIXME. (needs a new kind of lock).
 	if xl.parentDirIsObject(bucket, path.Dir(object)) {
-		return oi, toObjectErr(traceError(errFileAccessDenied), bucket, object)
+		return oi, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object)
 	}
 
 	// Calculate S3-compatible md5sum for complete multipart.
@@ -871,7 +872,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	// Read metadata associated with the object from all disks.
 	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath)
 	reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
-	if errorCause(reducedErr) == errXLWriteQuorum {
+	if errors.Cause(reducedErr) == errXLWriteQuorum {
 		return oi, toObjectErr(reducedErr, bucket, object)
 	}
@@ -903,17 +904,17 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 		partIdx := objectPartIndex(currentXLMeta.Parts, part.PartNumber)
 
 		// All parts should have the same part number.
 		if partIdx == -1 {
-			return oi, traceError(InvalidPart{})
+			return oi, errors.Trace(InvalidPart{})
 		}
 
 		// All parts should have the same ETag as previously generated.
 		if currentXLMeta.Parts[partIdx].ETag != part.ETag {
-			return oi, traceError(InvalidPart{})
+			return oi, errors.Trace(InvalidPart{})
 		}
 
 		// All parts except the last part have to be at least 5MB.
 		if (i < len(parts)-1) && !isMinAllowedPartSize(currentXLMeta.Parts[partIdx].Size) {
-			return oi, traceError(PartTooSmall{
+			return oi, errors.Trace(PartTooSmall{
 				PartNumber: part.PartNumber,
 				PartSize:   currentXLMeta.Parts[partIdx].Size,
 				PartETag:   part.ETag,
@@ -1057,7 +1058,7 @@ func (xl xlObjects) cleanupUploadedParts(bucket, object, uploadID string) error
 	// Cleanup uploadID for all disks.
 	for index, disk := range xl.storageDisks {
 		if disk == nil {
-			errs[index] = traceError(errDiskNotFound)
+			errs[index] = errors.Trace(errDiskNotFound)
 			continue
 		}
 		wg.Add(1)
@@ -1131,7 +1132,7 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error
 	defer uploadIDLock.Unlock()
 	if !xl.isUploadIDExists(bucket, object, uploadID) {
-		return traceError(InvalidUploadID{UploadID: uploadID})
+		return errors.Trace(InvalidUploadID{UploadID: uploadID})
 	}
 	return xl.abortMultipartUpload(bucket, object, uploadID)
 }
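A self-contained usage sketch of the pattern this commit applies throughout: wrap sentinel errors with `errors.Trace` at the point of failure, and unwrap with `errors.Cause` before comparing. The `lookup` helper and the local `errDiskNotFound` sentinel are hypothetical stand-ins for the identifiers used in the diff; only `Trace` and `Cause` come from the package itself.

```go
package main

import (
	goerrors "errors"
	"fmt"

	"github.com/minio/minio/pkg/errors"
)

// Local stand-in for the errDiskNotFound sentinel used in the diff above.
var errDiskNotFound = goerrors.New("disk not found")

// lookup wraps the sentinel at the point of failure so the returned error
// carries the call site in its trace.
func lookup(ok bool) error {
	if !ok {
		return errors.Trace(errDiskNotFound)
	}
	return nil
}

func main() {
	err := lookup(false)

	// The traced wrapper no longer compares equal to the sentinel...
	fmt.Println(err == errDiskNotFound) // false

	// ...so decision points unwrap first, exactly as the diff does.
	fmt.Println(errors.Cause(err) == errDiskNotFound) // true
}
```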