diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 1c7d5b293..0632682a5 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -572,6 +572,7 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) { if err == nil { return ErrNone } + err = errorCause(err) // Verify if the underlying error is signature mismatch. switch err { case errSignatureMismatch: diff --git a/cmd/bucket-policy-handlers.go b/cmd/bucket-policy-handlers.go index 161e25eed..e6a5b826e 100644 --- a/cmd/bucket-policy-handlers.go +++ b/cmd/bucket-policy-handlers.go @@ -187,7 +187,6 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht // Save bucket policy. if err = writeBucketPolicy(bucket, objAPI, bytes.NewReader(policyBytes), int64(len(policyBytes))); err != nil { - errorIf(err, "Unable to write bucket policy.") switch err.(type) { case BucketNameInvalid: writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path) @@ -232,7 +231,6 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r // Delete bucket access policy. if err := removeBucketPolicy(bucket, objAPI); err != nil { - errorIf(err, "Unable to remove bucket policy.") switch err.(type) { case BucketNameInvalid: writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path) diff --git a/cmd/bucket-policy.go b/cmd/bucket-policy.go index 8beb362e4..7e8d467bd 100644 --- a/cmd/bucket-policy.go +++ b/cmd/bucket-policy.go @@ -66,6 +66,9 @@ func (bp *bucketPolicies) RemoveBucketPolicy(bucket string) { func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolicy, err error) { // List buckets to proceed loading all notification configuration. buckets, err := objAPI.ListBuckets() + errorIf(err, "Unable to list buckets.") + err = errorCause(err) + if err != nil { return nil, err } @@ -86,7 +89,6 @@ func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolic // Success. return policies, nil - } // Intialize all bucket policies. @@ -128,6 +130,8 @@ func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader } policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, policyPath) + errorIf(err, "Unable to get policy for the bucket %s.", bucket) + err = errorCause(err) if err != nil { if _, ok := err.(ObjectNotFound); ok { return nil, BucketPolicyNotFound{Bucket: bucket} @@ -136,6 +140,8 @@ func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader } var buffer bytes.Buffer err = objAPI.GetObject(minioMetaBucket, policyPath, 0, objInfo.Size, &buffer) + errorIf(err, "Unable to get policy for the bucket %s.", bucket) + err = errorCause(err) if err != nil { if _, ok := err.(ObjectNotFound); ok { return nil, BucketPolicyNotFound{Bucket: bucket} @@ -174,6 +180,8 @@ func removeBucketPolicy(bucket string, objAPI ObjectLayer) error { } policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil { + errorIf(err, "Unable to remove bucket-policy on bucket %s.", bucket) + err = errorCause(err) if _, ok := err.(ObjectNotFound); ok { return BucketPolicyNotFound{Bucket: bucket} } @@ -190,6 +198,9 @@ func writeBucketPolicy(bucket string, objAPI ObjectLayer, reader io.Reader, size } policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) - _, err := objAPI.PutObject(minioMetaBucket, policyPath, size, reader, nil) - return err + if _, err := objAPI.PutObject(minioMetaBucket, policyPath, size, reader, nil); err != nil { + errorIf(err, "Unable to set policy for the bucket %s", bucket) + return errorCause(err) + } + return nil } diff --git a/cmd/erasure-createfile.go b/cmd/erasure-createfile.go index 28b807108..eeadef267 100644 --- a/cmd/erasure-createfile.go +++ b/cmd/erasure-createfile.go @@ -41,7 +41,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader // FIXME: this is a bug in Golang, n == 0 and err == // io.ErrUnexpectedEOF for io.ReadFull function. if n == 0 && rErr == io.ErrUnexpectedEOF { - return 0, nil, rErr + return 0, nil, traceError(rErr) } if rErr == io.EOF { // We have reached EOF on the first byte read, io.Reader @@ -58,7 +58,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader break } if rErr != nil && rErr != io.ErrUnexpectedEOF { - return 0, nil, rErr + return 0, nil, traceError(rErr) } if n > 0 { // Returns encoded blocks. @@ -88,19 +88,19 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, error) { rs, err := reedsolomon.New(dataBlocks, parityBlocks) if err != nil { - return nil, err + return nil, traceError(err) } // Split the input buffer into data and parity blocks. var blocks [][]byte blocks, err = rs.Split(dataBuffer) if err != nil { - return nil, err + return nil, traceError(err) } // Encode parity blocks using data blocks. err = rs.Encode(blocks) if err != nil { - return nil, err + return nil, traceError(err) } // Return encoded blocks. @@ -122,7 +122,7 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash defer wg.Done() wErr := disk.AppendFile(volume, path, enBlocks[index]) if wErr != nil { - wErrs[index] = wErr + wErrs[index] = traceError(wErr) return } @@ -139,7 +139,7 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash // Do we have write quorum?. if !isDiskQuorum(wErrs, writeQuorum) { - return errXLWriteQuorum + return traceError(errXLWriteQuorum) } return nil } diff --git a/cmd/erasure-createfile_test.go b/cmd/erasure-createfile_test.go index 5796202f2..6371473f5 100644 --- a/cmd/erasure-createfile_test.go +++ b/cmd/erasure-createfile_test.go @@ -93,8 +93,8 @@ func TestErasureCreateFile(t *testing.T) { // 1 more disk down. 7 disk down in total. Should return quorum error. disks[10] = AppendDiskDown{disks[10].(*posix)} _, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) - if err != errXLWriteQuorum { - t.Errorf("erasureCreateFile returned expected errXLWriteQuorum error, got %s", err) + if errorCause(err) != errXLWriteQuorum { + t.Errorf("erasureCreateFile return value: expected errXLWriteQuorum, got %s", err) } } @@ -195,7 +195,7 @@ func TestErasureEncode(t *testing.T) { } // Failed as expected, but does it fail for the expected reason. if actualErr != nil && !testCase.shouldPass { - if testCase.expectedErr != actualErr { + if errorCause(actualErr) != testCase.expectedErr { t.Errorf("Test %d: Expected Error to be \"%v\", but instead found \"%v\" ", i+1, testCase.expectedErr, actualErr) } } diff --git a/cmd/erasure-healfile.go b/cmd/erasure-healfile.go index 56ae7de65..5d029ad5c 100644 --- a/cmd/erasure-healfile.go +++ b/cmd/erasure-healfile.go @@ -64,7 +64,7 @@ func erasureHealFile(latestDisks []StorageAPI, outDatedDisks []StorageAPI, volum } err := disk.AppendFile(healBucket, healPath, enBlocks[index]) if err != nil { - return nil, err + return nil, traceError(err) } hashWriters[index].Write(enBlocks[index]) } diff --git a/cmd/erasure-readfile.go b/cmd/erasure-readfile.go index df20100ee..9f521eec6 100644 --- a/cmd/erasure-readfile.go +++ b/cmd/erasure-readfile.go @@ -84,10 +84,10 @@ func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDis // Sanity checks - we should never have this situation. if dataDisks == dataBlocks { - return nil, 0, errUnexpected + return nil, 0, traceError(errUnexpected) } if dataDisks+parityDisks >= dataBlocks { - return nil, 0, errUnexpected + return nil, 0, traceError(errUnexpected) } // Find the disks from which next set of parallel reads should happen. @@ -107,7 +107,7 @@ func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDis return readDisks, i + 1, nil } } - return nil, 0, errXLReadQuorum + return nil, 0, traceError(errXLReadQuorum) } // parallelRead - reads chunks in parallel from the disks specified in []readDisks. @@ -161,12 +161,12 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string, algo string, pool *bpool.BytePool) (int64, error) { // Offset and length cannot be negative. if offset < 0 || length < 0 { - return 0, errUnexpected + return 0, traceError(errUnexpected) } // Can't request more data than what is available. if offset+length > totalLength { - return 0, errUnexpected + return 0, traceError(errUnexpected) } // chunkSize is the amount of data that needs to be read from each disk at a time. @@ -248,7 +248,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s } if nextIndex == len(disks) { // No more disks to read from. - return bytesWritten, errXLReadQuorum + return bytesWritten, traceError(errXLReadQuorum) } // We do not have enough enough data blocks to reconstruct the data // hence continue the for-loop till we have enough data blocks. @@ -325,24 +325,24 @@ func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error { // Initialized reedsolomon. rs, err := reedsolomon.New(dataBlocks, parityBlocks) if err != nil { - return err + return traceError(err) } // Reconstruct encoded blocks. err = rs.Reconstruct(enBlocks) if err != nil { - return err + return traceError(err) } // Verify reconstructed blocks (parity). ok, err := rs.Verify(enBlocks) if err != nil { - return err + return traceError(err) } if !ok { // Blocks cannot be reconstructed, corrupted data. err = errors.New("Verification failed after reconstruction, data likely corrupted.") - return err + return traceError(err) } // Success. diff --git a/cmd/erasure-readfile_test.go b/cmd/erasure-readfile_test.go index 7b9595a4b..0d972062f 100644 --- a/cmd/erasure-readfile_test.go +++ b/cmd/erasure-readfile_test.go @@ -104,7 +104,7 @@ func testGetReadDisks(t *testing.T, xl xlObjects) { for i, test := range testCases { disks, nextIndex, err := getReadDisks(test.argDisks, test.index, xl.dataBlocks) - if err != test.err { + if errorCause(err) != test.err { t.Errorf("test-case %d - expected error : %s, got : %s", i+1, test.err, err) continue } @@ -319,7 +319,7 @@ func TestErasureReadFileDiskFail(t *testing.T) { disks[13] = ReadDiskDown{disks[13].(*posix)} buf.Reset() _, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, bitRotAlgo, pool) - if err != errXLReadQuorum { + if errorCause(err) != errXLReadQuorum { t.Fatal("expected errXLReadQuorum error") } } diff --git a/cmd/erasure-utils.go b/cmd/erasure-utils.go index ec8a55f57..2f05f027a 100644 --- a/cmd/erasure-utils.go +++ b/cmd/erasure-utils.go @@ -76,17 +76,17 @@ func getDataBlockLen(enBlocks [][]byte, dataBlocks int) int { func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, offset int64, length int64) (int64, error) { // Offset and out size cannot be negative. if offset < 0 || length < 0 { - return 0, errUnexpected + return 0, traceError(errUnexpected) } // Do we have enough blocks? if len(enBlocks) < dataBlocks { - return 0, reedsolomon.ErrTooFewShards + return 0, traceError(reedsolomon.ErrTooFewShards) } // Do we have enough data? if int64(getDataBlockLen(enBlocks, dataBlocks)) < length { - return 0, reedsolomon.ErrShortData + return 0, traceError(reedsolomon.ErrShortData) } // Counter to decrement total left to write. @@ -114,7 +114,7 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, offset in if write < int64(len(block)) { n, err := io.Copy(dst, bytes.NewReader(block[:write])) if err != nil { - return 0, err + return 0, traceError(err) } totalWritten += n break @@ -122,7 +122,7 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, offset in // Copy the block. n, err := io.Copy(dst, bytes.NewReader(block)) if err != nil { - return 0, err + return 0, traceError(err) } // Decrement output size. diff --git a/cmd/errors.go b/cmd/errors.go new file mode 100644 index 000000000..b6fbf1984 --- /dev/null +++ b/cmd/errors.go @@ -0,0 +1,122 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "os" + "path/filepath" + "runtime" + "strings" +) + +// Holds the current directory path. Used for trimming path in traceError() +var rootPath string + +// Figure out the rootPath +func initError() { + // Root path is automatically determined from the calling function's source file location. + // Catch the calling function's source file path. + _, file, _, _ := runtime.Caller(1) + // Save the directory alone. + rootPath = filepath.Dir(file) +} + +// Represents a stack frame in the stack trace. +type traceInfo struct { + file string // File where error occurred + line int // Line where error occurred + name string // Name of the function where error occurred +} + +// Error - error type containing cause and the stack trace. +type Error struct { + e error // Holds the cause error + trace []traceInfo // stack trace + errs []error // Useful for XL to hold errors from all disks +} + +// Implement error interface. +func (e Error) Error() string { + return e.e.Error() +} + +// Trace - returns stack trace. +func (e Error) Trace() []string { + var traceArr []string + for _, info := range e.trace { + traceArr = append(traceArr, fmt.Sprintf("%s:%d:%s", + info.file, info.line, info.name)) + } + return traceArr +} + +// NewStorageError - return new Error type. +func traceError(e error, errs ...error) error { + if e == nil { + return nil + } + err := &Error{} + err.e = e + err.errs = errs + + stack := make([]uintptr, 40) + length := runtime.Callers(2, stack) + if length > len(stack) { + length = len(stack) + } + stack = stack[:length] + + for _, pc := range stack { + pc = pc - 1 + fn := runtime.FuncForPC(pc) + file, line := fn.FileLine(pc) + name := fn.Name() + if strings.HasSuffix(name, "ServeHTTP") { + break + } + if strings.HasSuffix(name, "runtime.") { + break + } + + file = strings.TrimPrefix(file, rootPath+string(os.PathSeparator)) + name = strings.TrimPrefix(name, "github.com/minio/minio/cmd.") + err.trace = append(err.trace, traceInfo{file, line, name}) + } + + return err +} + +// Returns the underlying cause error. +func errorCause(err error) error { + if e, ok := err.(*Error); ok { + err = e.e + } + return err +} + +// Returns slice of underlying cause error. +func errorsCause(errs []error) []error { + Errs := make([]error, len(errs)) + for i, err := range errs { + if err == nil { + continue + } + Errs[i] = errorCause(err) + } + return Errs +} diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index e5ed987be..a6d8c8665 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -227,6 +227,8 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon // Construct the notification config path. notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath) + errorIf(err, "Unable to get bucket-notification for butkcet %s", bucket) + err = errorCause(err) if err != nil { // 'notification.xml' not found return 'errNoSuchNotifications'. // This is default when no bucket notifications are found on the bucket. @@ -239,6 +241,8 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon } var buffer bytes.Buffer err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer) + errorIf(err, "Unable to get bucket-notification for butkcet %s", bucket) + err = errorCause(err) if err != nil { // 'notification.xml' not found return 'errNoSuchNotifications'. // This is default when no bucket notifications are found on the bucket. diff --git a/cmd/fs-createfile.go b/cmd/fs-createfile.go index 2e32c6fbb..a0ec0a6c7 100644 --- a/cmd/fs-createfile.go +++ b/cmd/fs-createfile.go @@ -24,13 +24,13 @@ func fsCreateFile(disk StorageAPI, reader io.Reader, buf []byte, tmpBucket, temp for { n, rErr := reader.Read(buf) if rErr != nil && rErr != io.EOF { - return 0, rErr + return 0, traceError(rErr) } bytesWritten += int64(n) if n > 0 { wErr := disk.AppendFile(tmpBucket, tempObj, buf[0:n]) if wErr != nil { - return 0, wErr + return 0, traceError(wErr) } } if rErr == io.EOF { diff --git a/cmd/fs-v1-metadata.go b/cmd/fs-v1-metadata.go index 365b800c7..6f1f6346d 100644 --- a/cmd/fs-v1-metadata.go +++ b/cmd/fs-v1-metadata.go @@ -81,12 +81,12 @@ func readFSMetadata(disk StorageAPI, bucket, filePath string) (fsMeta fsMetaV1, // Read all `fs.json`. buf, err := disk.ReadAll(bucket, filePath) if err != nil { - return fsMetaV1{}, err + return fsMetaV1{}, traceError(err) } // Decode `fs.json` into fsMeta structure. if err = json.Unmarshal(buf, &fsMeta); err != nil { - return fsMetaV1{}, err + return fsMetaV1{}, traceError(err) } // Success. @@ -94,16 +94,23 @@ func readFSMetadata(disk StorageAPI, bucket, filePath string) (fsMeta fsMetaV1, } // Write fsMeta to fs.json or fs-append.json. -func writeFSMetadata(disk StorageAPI, bucket, filePath string, fsMeta fsMetaV1) (err error) { +func writeFSMetadata(disk StorageAPI, bucket, filePath string, fsMeta fsMetaV1) error { tmpPath := path.Join(tmpMetaPrefix, getUUID()) metadataBytes, err := json.Marshal(fsMeta) if err != nil { - return err + return traceError(err) } if err = disk.AppendFile(minioMetaBucket, tmpPath, metadataBytes); err != nil { - return err + return traceError(err) } - return disk.RenameFile(minioMetaBucket, tmpPath, bucket, filePath) + err = disk.RenameFile(minioMetaBucket, tmpPath, bucket, filePath) + if err != nil { + err = disk.DeleteFile(minioMetaBucket, tmpPath) + if err != nil { + return traceError(err) + } + } + return nil } // newFSMetaV1 - initializes new fsMetaV1. diff --git a/cmd/fs-v1-multipart-common.go b/cmd/fs-v1-multipart-common.go index c3ccdbdf7..3baf39242 100644 --- a/cmd/fs-v1-multipart-common.go +++ b/cmd/fs-v1-multipart-common.go @@ -64,8 +64,8 @@ func (fs fsObjects) writeUploadJSON(bucket, object, uploadID string, initiated t var uploadsJSON uploadsV1 uploadsJSON, err = readUploadsJSON(bucket, object, fs.storage) if err != nil { - // For any other errors. - if err != errFileNotFound { + // uploads.json might not exist hence ignore errFileNotFound. + if errorCause(err) != errFileNotFound { return err } // Set uploads format to `fs`. @@ -77,18 +77,18 @@ func (fs fsObjects) writeUploadJSON(bucket, object, uploadID string, initiated t // Update `uploads.json` on all disks. uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON) if wErr != nil { - return wErr + return traceError(wErr) } // Write `uploads.json` to disk. if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil { - return wErr + return traceError(wErr) } wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath) if wErr != nil { if dErr := fs.storage.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil { - return dErr + return traceError(dErr) } - return wErr + return traceError(wErr) } return nil } @@ -100,13 +100,13 @@ func (fs fsObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) uploadsBytes, wErr := json.Marshal(uploadsJSON) if wErr != nil { - return wErr + return traceError(wErr) } if wErr = fs.storage.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil { - return wErr + return traceError(wErr) } if wErr = fs.storage.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil { - return wErr + return traceError(wErr) } return nil } diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index eba83a0ef..e46d1d39d 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -94,7 +94,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark eof = true break } - return ListMultipartsInfo{}, err + return ListMultipartsInfo{}, walkResult.err } entry := strings.TrimPrefix(walkResult.entry, retainSlash(pathJoin(mpartMetaPrefix, bucket))) if strings.HasSuffix(walkResult.entry, slashSeparator) { @@ -176,42 +176,42 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { // Validate input arguments. if !IsValidBucketName(bucket) { - return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket} + return ListMultipartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket}) } if !fs.isBucketExist(bucket) { - return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket} + return ListMultipartsInfo{}, traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectPrefix(prefix) { - return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} + return ListMultipartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix}) } // Verify if delimiter is anything other than '/', which we do not support. if delimiter != "" && delimiter != slashSeparator { - return ListMultipartsInfo{}, UnsupportedDelimiter{ + return ListMultipartsInfo{}, traceError(UnsupportedDelimiter{ Delimiter: delimiter, - } + }) } // Verify if marker has prefix. if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) { - return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{ + return ListMultipartsInfo{}, traceError(InvalidMarkerPrefixCombination{ Marker: keyMarker, Prefix: prefix, - } + }) } if uploadIDMarker != "" { if strings.HasSuffix(keyMarker, slashSeparator) { - return ListMultipartsInfo{}, InvalidUploadIDKeyCombination{ + return ListMultipartsInfo{}, traceError(InvalidUploadIDKeyCombination{ UploadIDMarker: uploadIDMarker, KeyMarker: keyMarker, - } + }) } id, err := uuid.Parse(uploadIDMarker) if err != nil { - return ListMultipartsInfo{}, err + return ListMultipartsInfo{}, traceError(err) } if id.IsZero() { - return ListMultipartsInfo{}, MalformedUploadID{ + return ListMultipartsInfo{}, traceError(MalformedUploadID{ UploadID: uploadIDMarker, - } + }) } } return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) @@ -247,9 +247,9 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st if err = fs.writeUploadJSON(bucket, object, uploadID, initiated); err != nil { return "", err } - fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile) - if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil { - return "", toObjectErr(err, minioMetaBucket, fsMetaPath) + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + if err = writeFSMetadata(fs.storage, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil { + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } // Return success. return uploadID, nil @@ -263,15 +263,15 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) { // Verify if bucket name is valid. if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} + return "", traceError(BucketNameInvalid{Bucket: bucket}) } // Verify whether the bucket exists. if !fs.isBucketExist(bucket) { - return "", BucketNotFound{Bucket: bucket} + return "", traceError(BucketNotFound{Bucket: bucket}) } // Verify if object name is valid. if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} + return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } return fs.newMultipartUpload(bucket, object, meta) } @@ -404,14 +404,14 @@ func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) { func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} + return "", traceError(BucketNameInvalid{Bucket: bucket}) } // Verify whether the bucket exists. if !fs.isBucketExist(bucket) { - return "", BucketNotFound{Bucket: bucket} + return "", traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} + return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) @@ -425,7 +425,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID) nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) if !uploadIDExists { - return "", InvalidUploadID{UploadID: uploadID} + return "", traceError(InvalidUploadID{UploadID: uploadID}) } partSuffix := fmt.Sprintf("object%d", partID) @@ -459,7 +459,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // bytes than specified in request header. if bytesWritten < size { fs.storage.DeleteFile(minioMetaBucket, tmpPartPath) - return "", IncompleteBody{} + return "", traceError(IncompleteBody{}) } // Validate if payload is valid. @@ -468,7 +468,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Incoming payload wrong, delete the temporary object. fs.storage.DeleteFile(minioMetaBucket, tmpPartPath) // Error return. - return "", toObjectErr(err, bucket, object) + return "", toObjectErr(traceError(err), bucket, object) } } @@ -478,7 +478,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // MD5 mismatch, delete the temporary object. fs.storage.DeleteFile(minioMetaBucket, tmpPartPath) // Returns md5 mismatch. - return "", BadDigest{md5Hex, newMD5Hex} + return "", traceError(BadDigest{md5Hex, newMD5Hex}) } } @@ -492,7 +492,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Just check if the uploadID exists to avoid copy if it doesn't. if !fs.isUploadIDExists(bucket, object, uploadID) { - return "", InvalidUploadID{UploadID: uploadID} + return "", traceError(InvalidUploadID{UploadID: uploadID}) } fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile) @@ -506,13 +506,13 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s err = fs.storage.RenameFile(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) if err != nil { if dErr := fs.storage.DeleteFile(minioMetaBucket, tmpPartPath); dErr != nil { - return "", toObjectErr(dErr, minioMetaBucket, tmpPartPath) + return "", toObjectErr(traceError(dErr), minioMetaBucket, tmpPartPath) } - return "", toObjectErr(err, minioMetaBucket, partPath) + return "", toObjectErr(traceError(err), minioMetaBucket, partPath) } - - if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil { - return "", toObjectErr(err, minioMetaBucket, fsMetaPath) + uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) + if err = writeFSMetadata(fs.storage, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil { + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } go appendParts(fs.storage, bucket, object, uploadID, opsID) return newMD5Hex, nil @@ -541,7 +541,7 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM partNamePath := path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name) fi, err = fs.storage.StatFile(minioMetaBucket, partNamePath) if err != nil { - return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, partNamePath) + return ListPartsInfo{}, toObjectErr(traceError(err), minioMetaBucket, partNamePath) } result.Parts = append(result.Parts, partInfo{ PartNumber: part.Number, @@ -579,14 +579,14 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket} + return ListPartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket}) } // Verify whether the bucket exists. if !fs.isBucketExist(bucket) { - return ListPartsInfo{}, BucketNotFound{Bucket: bucket} + return ListPartsInfo{}, traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectName(object) { - return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} + return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -598,7 +598,7 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !fs.isUploadIDExists(bucket, object, uploadID) { - return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} + return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID}) } return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) } @@ -612,17 +612,17 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} + return "", traceError(BucketNameInvalid{Bucket: bucket}) } // Verify whether the bucket exists. if !fs.isBucketExist(bucket) { - return "", BucketNotFound{Bucket: bucket} + return "", traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectName(object) { - return "", ObjectNameInvalid{ + return "", traceError(ObjectNameInvalid{ Bucket: bucket, Object: object, - } + }) } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) @@ -638,7 +638,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID) if !fs.isUploadIDExists(bucket, object, uploadID) { - return "", InvalidUploadID{UploadID: uploadID} + return "", traceError(InvalidUploadID{UploadID: uploadID}) } // fs-append.json path @@ -650,21 +650,21 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // Calculate s3 compatible md5sum for complete multipart. s3MD5, err := completeMultipartMD5(parts...) if err != nil { - return "", err + return "", traceError(err) } // Read saved fs metadata for ongoing multipart. fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile) fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsMetaPath) if err != nil { - return "", toObjectErr(err, minioMetaBucket, fsMetaPath) + return "", toObjectErr(traceError(err), minioMetaBucket, fsMetaPath) } fsAppendMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsAppendMetaPath) if err == nil && isPartsSame(fsAppendMeta.Parts, parts) { fsAppendDataPath := getFSAppendDataPath(uploadID) if err = fs.storage.RenameFile(minioMetaBucket, fsAppendDataPath, bucket, object); err != nil { - return "", toObjectErr(err, minioMetaBucket, fsAppendDataPath) + return "", toObjectErr(traceError(err), minioMetaBucket, fsAppendDataPath) } // Remove the append-file metadata file in tmp location as we no longer need it. fs.storage.DeleteFile(minioMetaBucket, fsAppendMetaPath) @@ -678,18 +678,18 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload for i, part := range parts { partIdx := fsMeta.ObjectPartIndex(part.PartNumber) if partIdx == -1 { - return "", InvalidPart{} + return "", traceError(InvalidPart{}) } if fsMeta.Parts[partIdx].ETag != part.ETag { - return "", BadDigest{} + return "", traceError(BadDigest{}) } // All parts except the last part has to be atleast 5MB. if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) { - return "", PartTooSmall{ + return "", traceError(PartTooSmall{ PartNumber: part.PartNumber, PartSize: fsMeta.Parts[partIdx].Size, PartETag: part.ETag, - } + }) } // Construct part suffix. partSuffix := fmt.Sprintf("object%d", part.PartNumber) @@ -705,7 +705,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buf[:curLeft]) if n > 0 { if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]); err != nil { - return "", toObjectErr(err, minioMetaBucket, tempObj) + return "", toObjectErr(traceError(err), minioMetaBucket, tempObj) } } if err != nil { @@ -713,9 +713,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload break } if err == errFileNotFound { - return "", InvalidPart{} + return "", traceError(InvalidPart{}) } - return "", toObjectErr(err, minioMetaBucket, multipartPartFile) + return "", toObjectErr(traceError(err), minioMetaBucket, multipartPartFile) } offset += n totalLeft -= n @@ -726,9 +726,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) if err != nil { if dErr := fs.storage.DeleteFile(minioMetaBucket, tempObj); dErr != nil { - return "", toObjectErr(dErr, minioMetaBucket, tempObj) + return "", toObjectErr(traceError(dErr), minioMetaBucket, tempObj) } - return "", toObjectErr(err, bucket, object) + return "", toObjectErr(traceError(err), bucket, object) } } @@ -742,7 +742,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } fsMeta.Meta["md5Sum"] = s3MD5 - fsMetaPath = path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile) + fsMetaPath := path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile) + // Write the metadata to a temp file and rename it to the actual location. if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil { return "", toObjectErr(err, bucket, object) } @@ -750,7 +751,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // Cleanup all the parts if everything else has been safely committed. if err = cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil { - return "", toObjectErr(err, bucket, object) + return "", toObjectErr(traceError(err), bucket, object) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -766,7 +767,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // the object, if yes do not attempt to delete 'uploads.json'. uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage) if err != nil { - return "", toObjectErr(err, minioMetaBucket, object) + return "", toObjectErr(traceError(err), minioMetaBucket, object) } // If we have successfully read `uploads.json`, then we proceed to // purge or update `uploads.json`. @@ -776,14 +777,14 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } if len(uploadsJSON.Uploads) > 0 { if err = fs.updateUploadsJSON(bucket, object, uploadsJSON); err != nil { - return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) + return "", toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) } // Return success. return s3MD5, nil } if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil { - return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) + return "", toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) } // Return md5sum. @@ -820,7 +821,7 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error } // No more pending uploads for the object, we purge the entire // entry at '.minio/multipart/bucket/object'. if err = fs.storage.DeleteFile(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)); err != nil { - return toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) + return toObjectErr(traceError(err), minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) } return nil } @@ -840,13 +841,13 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } if !fs.isBucketExist(bucket) { - return BucketNotFound{Bucket: bucket} + return traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectName(object) { - return ObjectNameInvalid{Bucket: bucket, Object: object} + return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -858,7 +859,7 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !fs.isUploadIDExists(bucket, object, uploadID) { - return InvalidUploadID{UploadID: uploadID} + return traceError(InvalidUploadID{UploadID: uploadID}) } fsAppendMetaPath := getFSAppendMetaPath(uploadID) diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 9c2f858b6..6beb2c674 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -159,10 +159,10 @@ func (fs fsObjects) StorageInfo() StorageInfo { func (fs fsObjects) MakeBucket(bucket string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } if err := fs.storage.MakeVol(bucket); err != nil { - return toObjectErr(err, bucket) + return toObjectErr(traceError(err), bucket) } return nil } @@ -171,11 +171,11 @@ func (fs fsObjects) MakeBucket(bucket string) error { func (fs fsObjects) GetBucketInfo(bucket string) (BucketInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketInfo{}, BucketNameInvalid{Bucket: bucket} + return BucketInfo{}, traceError(BucketNameInvalid{Bucket: bucket}) } vi, err := fs.storage.StatVol(bucket) if err != nil { - return BucketInfo{}, toObjectErr(err, bucket) + return BucketInfo{}, toObjectErr(traceError(err), bucket) } return BucketInfo{ Name: bucket, @@ -188,7 +188,7 @@ func (fs fsObjects) ListBuckets() ([]BucketInfo, error) { var bucketInfos []BucketInfo vols, err := fs.storage.ListVols() if err != nil { - return nil, toObjectErr(err) + return nil, toObjectErr(traceError(err)) } for _, vol := range vols { // StorageAPI can send volume names which are incompatible @@ -213,11 +213,11 @@ func (fs fsObjects) ListBuckets() ([]BucketInfo, error) { func (fs fsObjects) DeleteBucket(bucket string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } // Attempt to delete regular bucket. if err := fs.storage.DeleteVol(bucket); err != nil { - return toObjectErr(err, bucket) + return toObjectErr(traceError(err), bucket) } // Cleanup all the previously incomplete multiparts. if err := cleanupDir(fs.storage, path.Join(minioMetaBucket, mpartMetaPrefix), bucket); err != nil && err != errVolumeNotFound { @@ -232,34 +232,34 @@ func (fs fsObjects) DeleteBucket(bucket string) error { func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } // Verify if object is valid. if !IsValidObjectName(object) { - return ObjectNameInvalid{Bucket: bucket, Object: object} + return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } // Offset and length cannot be negative. if offset < 0 || length < 0 { - return toObjectErr(errUnexpected, bucket, object) + return toObjectErr(traceError(errUnexpected), bucket, object) } // Writer cannot be nil. if writer == nil { - return toObjectErr(errUnexpected, bucket, object) + return toObjectErr(traceError(errUnexpected), bucket, object) } // Stat the file to get file size. fi, err := fs.storage.StatFile(bucket, object) if err != nil { - return toObjectErr(err, bucket, object) + return toObjectErr(traceError(err), bucket, object) } // Reply back invalid range if the input offset and length fall out of range. if offset > fi.Size || length > fi.Size { - return InvalidRange{offset, length, fi.Size} + return traceError(InvalidRange{offset, length, fi.Size}) } // Reply if we have inputs with offset and length falling out of file size range. if offset+length > fi.Size { - return InvalidRange{offset, length, fi.Size} + return traceError(InvalidRange{offset, length, fi.Size}) } var totalLeft = length @@ -288,11 +288,11 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, offset += int64(nw) } if ew != nil { - err = ew + err = traceError(ew) break } if nr != int64(nw) { - err = io.ErrShortWrite + err = traceError(io.ErrShortWrite) break } } @@ -300,7 +300,7 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, break } if er != nil { - err = er + err = traceError(er) break } if totalLeft == 0 { @@ -315,18 +315,19 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ObjectInfo{}, (BucketNameInvalid{Bucket: bucket}) + return ObjectInfo{}, traceError(BucketNameInvalid{Bucket: bucket}) } // Verify if object is valid. if !IsValidObjectName(object) { - return ObjectInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: object}) + return ObjectInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } fi, err := fs.storage.StatFile(bucket, object) if err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) + return ObjectInfo{}, toObjectErr(traceError(err), bucket, object) } fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)) - if err != nil && err != errFileNotFound { + // Ignore error if the metadata file is not found, other errors must be returned. + if errorCause(err) != errFileNotFound { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -361,13 +362,13 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} + return "", traceError(BucketNameInvalid{Bucket: bucket}) } if !IsValidObjectName(object) { - return "", ObjectNameInvalid{ + return "", traceError(ObjectNameInvalid{ Bucket: bucket, Object: object, - } + }) } // No metadata is set, allocate a new one. if metadata == nil { @@ -398,7 +399,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. // For size 0 we write a 0byte file. err := fs.storage.AppendFile(minioMetaBucket, tempObj, []byte("")) if err != nil { - return "", toObjectErr(err, bucket, object) + return "", toObjectErr(traceError(err), bucket, object) } } else { // Allocate a buffer to Read() from request body @@ -418,7 +419,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. // bytes than specified in request header. if bytesWritten < size { fs.storage.DeleteFile(minioMetaBucket, tempObj) - return "", IncompleteBody{} + return "", traceError(IncompleteBody{}) } } @@ -434,7 +435,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. // Incoming payload wrong, delete the temporary object. fs.storage.DeleteFile(minioMetaBucket, tempObj) // Error return. - return "", toObjectErr(vErr, bucket, object) + return "", toObjectErr(traceError(vErr), bucket, object) } } @@ -445,14 +446,14 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. // MD5 mismatch, delete the temporary object. fs.storage.DeleteFile(minioMetaBucket, tempObj) // Returns md5 mismatch. - return "", BadDigest{md5Hex, newMD5Hex} + return "", traceError(BadDigest{md5Hex, newMD5Hex}) } } // Entire object was written to the temp location, now it's safe to rename it to the actual location. err := fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) if err != nil { - return "", toObjectErr(err, bucket, object) + return "", toObjectErr(traceError(err), bucket, object) } // Save additional metadata only if extended headers such as "X-Amz-Meta-" are set. @@ -476,17 +477,17 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. func (fs fsObjects) DeleteObject(bucket, object string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } if !IsValidObjectName(object) { - return ObjectNameInvalid{Bucket: bucket, Object: object} + return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } err := fs.storage.DeleteFile(minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)) if err != nil && err != errFileNotFound { - return toObjectErr(err, bucket, object) + return toObjectErr(traceError(err), bucket, object) } if err = fs.storage.DeleteFile(bucket, object); err != nil { - return toObjectErr(err, bucket, object) + return toObjectErr(traceError(err), bucket, object) } return nil } @@ -517,11 +518,11 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey return } if fileInfo, err = fs.storage.StatFile(bucket, entry); err != nil { - return + return FileInfo{}, traceError(err) } fsMeta, mErr := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, entry, fsMetaJSONFile)) - if mErr != nil && mErr != errFileNotFound { - return FileInfo{}, mErr + if errorCause(mErr) != errFileNotFound { + return FileInfo{}, traceError(mErr) } if len(fsMeta.Meta) == 0 { fsMeta.Meta = make(map[string]string) @@ -534,28 +535,28 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} + return ListObjectsInfo{}, traceError(BucketNameInvalid{Bucket: bucket}) } // Verify if bucket exists. if !isBucketExist(fs.storage, bucket) { - return ListObjectsInfo{}, BucketNotFound{Bucket: bucket} + return ListObjectsInfo{}, traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectPrefix(prefix) { - return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} + return ListObjectsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix}) } // Verify if delimiter is anything other than '/', which we do not support. if delimiter != "" && delimiter != slashSeparator { - return ListObjectsInfo{}, UnsupportedDelimiter{ + return ListObjectsInfo{}, traceError(UnsupportedDelimiter{ Delimiter: delimiter, - } + }) } // Verify if marker has prefix. if marker != "" { if !strings.HasPrefix(marker, prefix) { - return ListObjectsInfo{}, InvalidMarkerPrefixCombination{ + return ListObjectsInfo{}, traceError(InvalidMarkerPrefixCombination{ Marker: marker, Prefix: prefix, - } + }) } } @@ -610,7 +611,7 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey // For any walk error return right away. if walkResult.err != nil { // File not found is a valid case. - if walkResult.err == errFileNotFound { + if errorCause(walkResult.err) == errFileNotFound { return ListObjectsInfo{}, nil } return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix) @@ -652,12 +653,12 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey // HealObject - no-op for fs. Valid only for XL. func (fs fsObjects) HealObject(bucket, object string) error { - return NotImplemented{} + return traceError(NotImplemented{}) } // HealListObjects - list objects for healing. Valid only for XL func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { - return ListObjectsInfo{}, NotImplemented{} + return ListObjectsInfo{}, traceError(NotImplemented{}) } // HealDiskMetadata -- heal disk metadata, not supported in FS diff --git a/cmd/logger.go b/cmd/logger.go index dd0191104..d2405807b 100644 --- a/cmd/logger.go +++ b/cmd/logger.go @@ -67,9 +67,10 @@ func errorIf(err error, msg string, data ...interface{}) { fields := logrus.Fields{ "cause": err.Error(), } - if globalTrace { - fields["stack"] = "\n" + stackInfo() + if e, ok := err.(*Error); ok { + fields["stack"] = strings.Join(e.Trace(), " ") } + log.WithFields(fields).Errorf(msg, data...) } diff --git a/cmd/main.go b/cmd/main.go index b75440244..00e8157be 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -163,6 +163,9 @@ func Main() { // Enable all loggers by now. enableLoggers() + // Init the error tracing module. + initError() + // Set global quiet flag. globalQuiet = c.Bool("quiet") || c.GlobalBool("quiet") diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index c2b7fe8d8..8a8b8a5a0 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -92,6 +92,7 @@ func testObjectAPIIsUploadIDExists(obj ObjectLayer, instanceType string, t TestE } err = obj.AbortMultipartUpload(bucket, object, "abc") + err = errorCause(err) switch err.(type) { case InvalidUploadID: default: diff --git a/cmd/object-api-putobject_test.go b/cmd/object-api-putobject_test.go index 11e90aa06..c394c5610 100644 --- a/cmd/object-api-putobject_test.go +++ b/cmd/object-api-putobject_test.go @@ -152,6 +152,7 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl for i, testCase := range testCases { actualMd5Hex, actualErr := obj.PutObject(testCase.bucketName, testCase.objName, testCase.intputDataSize, bytes.NewReader(testCase.inputData), testCase.inputMeta) + actualErr = errorCause(actualErr) if actualErr != nil && testCase.expectedError == nil { t.Errorf("Test %d: %s: Expected to pass, but failed with: error %s.", i+1, instanceType, actualErr.Error()) } @@ -159,7 +160,7 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl t.Errorf("Test %d: %s: Expected to fail with error \"%s\", but passed instead.", i+1, instanceType, testCase.expectedError.Error()) } // Failed as expected, but does it fail for the expected reason. - if actualErr != nil && testCase.expectedError != actualErr { + if actualErr != nil && actualErr != testCase.expectedError { t.Errorf("Test %d: %s: Expected to fail with error \"%s\", but instead failed with error \"%s\" instead.", i+1, instanceType, testCase.expectedError.Error(), actualErr.Error()) } // Test passes as expected, but the output values are verified for correctness here. diff --git a/cmd/object-common.go b/cmd/object-common.go index e3deffa88..9f0f35382 100644 --- a/cmd/object-common.go +++ b/cmd/object-common.go @@ -35,6 +35,7 @@ const ( // isErrIgnored should we ignore this error?, takes a list of errors which can be ignored. func isErrIgnored(err error, ignoredErrs []error) bool { + err = errorCause(err) for _, ignoredErr := range ignoredErrs { if ignoredErr == err { return true @@ -220,7 +221,7 @@ func cleanupDir(storage StorageAPI, volume, dirPath string) error { if err == errFileNotFound { return nil } else if err != nil { // For any other errors fail. - return err + return traceError(err) } // else on success.. // Recurse and delete all other entries. diff --git a/cmd/object-errors.go b/cmd/object-errors.go index a080a52e0..e9ef85753 100644 --- a/cmd/object-errors.go +++ b/cmd/object-errors.go @@ -26,48 +26,57 @@ import ( // handle all cases where we have known types of errors returned by // underlying storage layer. func toObjectErr(err error, params ...string) error { + e, ok := err.(*Error) + if ok { + err = e.e + } + switch err { case errVolumeNotFound: if len(params) >= 1 { - return BucketNotFound{Bucket: params[0]} + err = BucketNotFound{Bucket: params[0]} } case errVolumeNotEmpty: if len(params) >= 1 { - return BucketNotEmpty{Bucket: params[0]} + err = BucketNotEmpty{Bucket: params[0]} } case errVolumeExists: if len(params) >= 1 { - return BucketExists{Bucket: params[0]} + err = BucketExists{Bucket: params[0]} } case errDiskFull: - return StorageFull{} + err = StorageFull{} case errIsNotRegular, errFileAccessDenied: if len(params) >= 2 { - return ObjectExistsAsDirectory{ + err = ObjectExistsAsDirectory{ Bucket: params[0], Object: params[1], } } case errFileNotFound: if len(params) >= 2 { - return ObjectNotFound{ + err = ObjectNotFound{ Bucket: params[0], Object: params[1], } } case errFileNameTooLong: if len(params) >= 2 { - return ObjectNameInvalid{ + err = ObjectNameInvalid{ Bucket: params[0], Object: params[1], } } case errXLReadQuorum: - return InsufficientReadQuorum{} + err = InsufficientReadQuorum{} case errXLWriteQuorum: - return InsufficientWriteQuorum{} + err = InsufficientWriteQuorum{} case io.ErrUnexpectedEOF, io.ErrShortWrite: - return IncompleteBody{} + err = IncompleteBody{} + } + if ok { + e.e = err + return e } return err } diff --git a/cmd/object-multipart-common.go b/cmd/object-multipart-common.go index 482275f33..689a21dae 100644 --- a/cmd/object-multipart-common.go +++ b/cmd/object-multipart-common.go @@ -72,12 +72,12 @@ func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV // Reads entire `uploads.json`. buf, err := disk.ReadAll(minioMetaBucket, uploadJSONPath) if err != nil { - return uploadsV1{}, err + return uploadsV1{}, traceError(err) } // Decode `uploads.json`. if err = json.Unmarshal(buf, &uploadIDs); err != nil { - return uploadsV1{}, err + return uploadsV1{}, traceError(err) } // Success. @@ -103,7 +103,7 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora // Cleanup uploadID for all disks. for index, disk := range storageDisks { if disk == nil { - errs[index] = errDiskNotFound + errs[index] = traceError(errDiskNotFound) continue } wg.Add(1) diff --git a/cmd/object-utils.go b/cmd/object-utils.go index 420099a91..36e71e0c3 100644 --- a/cmd/object-utils.go +++ b/cmd/object-utils.go @@ -148,7 +148,7 @@ func completeMultipartMD5(parts ...completePart) (string, error) { for _, part := range parts { md5Bytes, err := hex.DecodeString(part.ETag) if err != nil { - return "", err + return "", traceError(err) } finalMD5Bytes = append(finalMD5Bytes, md5Bytes...) } diff --git a/cmd/object_api_suite_test.go b/cmd/object_api_suite_test.go index a82bf7734..0b78f27a3 100644 --- a/cmd/object_api_suite_test.go +++ b/cmd/object_api_suite_test.go @@ -707,6 +707,7 @@ func testNonExistantObjectInBucket(obj ObjectLayer, instanceType string, c TestE if err == nil { c.Fatalf("%s: Expected error but found nil", instanceType) } + err = errorCause(err) switch err := err.(type) { case ObjectNotFound: if err.Error() != "Object not found: bucket#dir1" { @@ -740,6 +741,7 @@ func testGetDirectoryReturnsObjectNotFound(obj ObjectLayer, instanceType string, } _, err = obj.GetObjectInfo("bucket", "dir1") + err = errorCause(err) switch err := err.(type) { case ObjectNotFound: if err.Bucket != "bucket" { @@ -755,6 +757,7 @@ func testGetDirectoryReturnsObjectNotFound(obj ObjectLayer, instanceType string, } _, err = obj.GetObjectInfo("bucket", "dir1/") + err = errorCause(err) switch err := err.(type) { case ObjectNameInvalid: if err.Bucket != "bucket" { diff --git a/cmd/tree-walk.go b/cmd/tree-walk.go index d3ca940ad..83b436b94 100644 --- a/cmd/tree-walk.go +++ b/cmd/tree-walk.go @@ -148,7 +148,7 @@ func listDirFactory(isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc { break } // Return error at the end. - return nil, false, err + return nil, false, traceError(err) } return listDir } @@ -173,7 +173,7 @@ func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bo if err != nil { select { case <-endWalkCh: - return errWalkAbort + return traceError(errWalkAbort) case resultCh <- treeWalkResult{err: err}: return err } @@ -235,7 +235,7 @@ func doTreeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bo isEOF := ((i == len(entries)-1) && isEnd) select { case <-endWalkCh: - return errWalkAbort + return traceError(errWalkAbort) case resultCh <- treeWalkResult{entry: pathJoin(prefixDir, entry), end: isEOF}: } } diff --git a/cmd/tree-walk_test.go b/cmd/tree-walk_test.go index b4e6064df..372a1ebb8 100644 --- a/cmd/tree-walk_test.go +++ b/cmd/tree-walk_test.go @@ -337,7 +337,7 @@ func TestListDir(t *testing.T) { } // None of the disks are available, should get errDiskNotFound. _, _, err = listDir(volume, "", "") - if err != errDiskNotFound { + if errorCause(err) != errDiskNotFound { t.Error("expected errDiskNotFound error.") } } diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index 55ee713ea..e3f767ec7 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -28,7 +28,7 @@ import ( func (xl xlObjects) MakeBucket(bucket string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -47,7 +47,7 @@ func (xl xlObjects) MakeBucket(bucket string) error { // Make a volume entry on all underlying storage disks. for index, disk := range xl.storageDisks { if disk == nil { - dErrs[index] = errDiskNotFound + dErrs[index] = traceError(errDiskNotFound) continue } wg.Add(1) @@ -56,7 +56,7 @@ func (xl xlObjects) MakeBucket(bucket string) error { defer wg.Done() err := disk.MakeVol(bucket) if err != nil { - dErrs[index] = err + dErrs[index] = traceError(err) } }(index, disk) } @@ -68,7 +68,7 @@ func (xl xlObjects) MakeBucket(bucket string) error { if !isDiskQuorum(dErrs, xl.writeQuorum) { // Purge successfully created buckets if we don't have writeQuorum. xl.undoMakeBucket(bucket) - return toObjectErr(errXLWriteQuorum, bucket) + return toObjectErr(traceError(errXLWriteQuorum), bucket) } // Verify we have any other errors which should undo make bucket. @@ -146,6 +146,7 @@ func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err } return bucketInfo, nil } + err = traceError(err) // For any reason disk went offline continue and pick the next one. if isErrIgnored(err, bucketMetadataOpIgnoredErrs) { continue @@ -163,7 +164,6 @@ func (xl xlObjects) isBucketExist(bucket string) bool { if err == errVolumeNotFound { return false } - errorIf(err, "Stat failed on bucket "+bucket+".") return false } return true @@ -265,7 +265,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error { // Remove a volume entry on all underlying storage disks. for index, disk := range xl.storageDisks { if disk == nil { - dErrs[index] = errDiskNotFound + dErrs[index] = traceError(errDiskNotFound) continue } wg.Add(1) @@ -275,12 +275,15 @@ func (xl xlObjects) DeleteBucket(bucket string) error { // Attempt to delete bucket. err := disk.DeleteVol(bucket) if err != nil { - dErrs[index] = err + dErrs[index] = traceError(err) return } // Cleanup all the previously incomplete multiparts. err = cleanupDir(disk, path.Join(minioMetaBucket, mpartMetaPrefix), bucket) - if err != nil && err != errVolumeNotFound { + if err != nil { + if errorCause(err) == errVolumeNotFound { + return + } dErrs[index] = err } }(index, disk) @@ -291,7 +294,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error { if !isDiskQuorum(dErrs, xl.writeQuorum) { xl.undoDeleteBucket(bucket) - return toObjectErr(errXLWriteQuorum, bucket) + return toObjectErr(traceError(errXLWriteQuorum), bucket) } if reducedErr := reduceErrs(dErrs, []error{ diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index b9fe83955..1039faf8f 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -162,28 +162,28 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma func (xl xlObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} + return ListObjectsInfo{}, traceError(BucketNameInvalid{Bucket: bucket}) } // Verify if bucket exists. if !xl.isBucketExist(bucket) { - return ListObjectsInfo{}, BucketNotFound{Bucket: bucket} + return ListObjectsInfo{}, traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectPrefix(prefix) { - return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} + return ListObjectsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix}) } // Verify if delimiter is anything other than '/', which we do not support. if delimiter != "" && delimiter != slashSeparator { - return ListObjectsInfo{}, UnsupportedDelimiter{ + return ListObjectsInfo{}, traceError(UnsupportedDelimiter{ Delimiter: delimiter, - } + }) } // Verify if marker has prefix. if marker != "" { if !strings.HasPrefix(marker, prefix) { - return ListObjectsInfo{}, InvalidMarkerPrefixCombination{ + return ListObjectsInfo{}, traceError(InvalidMarkerPrefixCombination{ Marker: marker, Prefix: prefix, - } + }) } } diff --git a/cmd/xl-v1-list-objects.go b/cmd/xl-v1-list-objects.go index 6aa4bc7a2..29e33f932 100644 --- a/cmd/xl-v1-list-objects.go +++ b/cmd/xl-v1-list-objects.go @@ -48,7 +48,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey // For any walk error return right away. if walkResult.err != nil { // File not found is a valid case. - if walkResult.err == errFileNotFound { + if errorCause(walkResult.err) == errFileNotFound { return ListObjectsInfo{}, nil } return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix) @@ -66,8 +66,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey objInfo, err = xl.getObjectInfo(bucket, entry) if err != nil { // Ignore errFileNotFound - if err == errFileNotFound { - errorIf(err, "Unable to get object info", bucket, entry) + if errorCause(err) == errFileNotFound { continue } return ListObjectsInfo{}, toObjectErr(err, bucket, prefix) @@ -109,28 +108,28 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} + return ListObjectsInfo{}, traceError(BucketNameInvalid{Bucket: bucket}) } // Verify if bucket exists. if !xl.isBucketExist(bucket) { - return ListObjectsInfo{}, BucketNotFound{Bucket: bucket} + return ListObjectsInfo{}, traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectPrefix(prefix) { - return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} + return ListObjectsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix}) } // Verify if delimiter is anything other than '/', which we do not support. if delimiter != "" && delimiter != slashSeparator { - return ListObjectsInfo{}, UnsupportedDelimiter{ + return ListObjectsInfo{}, traceError(UnsupportedDelimiter{ Delimiter: delimiter, - } + }) } // Verify if marker has prefix. if marker != "" { if !strings.HasPrefix(marker, prefix) { - return ListObjectsInfo{}, InvalidMarkerPrefixCombination{ + return ListObjectsInfo{}, traceError(InvalidMarkerPrefixCombination{ Marker: marker, Prefix: prefix, - } + }) } } diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 8e2b04c5e..d769e2de5 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -88,7 +88,7 @@ func (e erasureInfo) GetCheckSumInfo(partName string) (ckSum checkSumInfo, err e return sum, nil } } - return checkSumInfo{}, errUnexpected + return checkSumInfo{}, traceError(errUnexpected) } // statInfo - carries stat information of the object. @@ -188,7 +188,7 @@ func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset in partOffset -= part.Size } // Offset beyond the size of the object return InvalidRange. - return 0, 0, InvalidRange{} + return 0, 0, traceError(InvalidRange{}) } // pickValidXLMeta - picks one valid xlMeta content and returns from a @@ -239,7 +239,7 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err // deleteXLMetadata - deletes `xl.json` on a single disk. func deleteXLMetdata(disk StorageAPI, bucket, prefix string) error { jsonFile := path.Join(prefix, xlMetaJSONFile) - return disk.DeleteFile(bucket, jsonFile) + return traceError(disk.DeleteFile(bucket, jsonFile)) } // writeXLMetadata - writes `xl.json` to a single disk. @@ -249,10 +249,10 @@ func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) er // Marshal json. metadataBytes, err := json.Marshal(&xlMeta) if err != nil { - return err + return traceError(err) } // Persist marshalled data. - return disk.AppendFile(bucket, jsonFile, metadataBytes) + return traceError(disk.AppendFile(bucket, jsonFile, metadataBytes)) } // deleteAllXLMetadata - deletes all partially written `xl.json` depending on errs. @@ -284,7 +284,7 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] // Start writing `xl.json` to all disks in parallel. for index, disk := range disks { if disk == nil { - mErrs[index] = errDiskNotFound + mErrs[index] = traceError(errDiskNotFound) continue } wg.Add(1) @@ -310,7 +310,7 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas [] if !isDiskQuorum(mErrs, quorum) { // Delete all `xl.json` successfully renamed. deleteAllXLMetadata(disks, bucket, prefix, mErrs) - return errXLWriteQuorum + return traceError(errXLWriteQuorum) } return reduceErrs(mErrs, []error{ @@ -328,7 +328,7 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet // Start writing `xl.json` to all disks in parallel. for index, disk := range disks { if disk == nil { - mErrs[index] = errDiskNotFound + mErrs[index] = traceError(errDiskNotFound) continue } wg.Add(1) @@ -354,7 +354,7 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet if !isDiskQuorum(mErrs, writeQuorum) { // Delete all `xl.json` successfully renamed. deleteAllXLMetadata(disks, bucket, prefix, mErrs) - return errXLWriteQuorum + return traceError(errXLWriteQuorum) } return reduceErrs(mErrs, []error{ diff --git a/cmd/xl-v1-metadata_test.go b/cmd/xl-v1-metadata_test.go index b45062320..3671f0fb4 100644 --- a/cmd/xl-v1-metadata_test.go +++ b/cmd/xl-v1-metadata_test.go @@ -136,6 +136,7 @@ func TestObjectToPartOffset(t *testing.T) { // Test them. for _, testCase := range testCases { index, offset, err := xlMeta.ObjectToPartOffset(testCase.offset) + err = errorCause(err) if err != testCase.expectedErr { t.Fatalf("%+v: expected = %s, got: %s", testCase, testCase.expectedErr, err) } diff --git a/cmd/xl-v1-multipart-common.go b/cmd/xl-v1-multipart-common.go index 7d85c2693..b637b4a25 100644 --- a/cmd/xl-v1-multipart-common.go +++ b/cmd/xl-v1-multipart-common.go @@ -43,15 +43,15 @@ func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads defer wg.Done() uploadsBytes, wErr := json.Marshal(uploadsJSON) if wErr != nil { - errs[index] = wErr + errs[index] = traceError(wErr) return } if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes); wErr != nil { - errs[index] = wErr + errs[index] = traceError(wErr) return } if wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil { - errs[index] = wErr + errs[index] = traceError(wErr) return } }(index, disk) @@ -82,7 +82,7 @@ func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads }(index, disk) } wg.Wait() - return errXLWriteQuorum + return traceError(errXLWriteQuorum) } return nil } @@ -117,7 +117,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t // Reads `uploads.json` and returns error. uploadsJSON, err := xl.readUploadsJSON(bucket, object) if err != nil { - if err != errFileNotFound { + if errorCause(err) != errFileNotFound { return err } // Set uploads format to `xl` otherwise. @@ -129,7 +129,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t // Update `uploads.json` on all disks. for index, disk := range xl.storageDisks { if disk == nil { - errs[index] = errDiskNotFound + errs[index] = traceError(errDiskNotFound) continue } wg.Add(1) @@ -138,21 +138,21 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t defer wg.Done() uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON) if wErr != nil { - errs[index] = wErr + errs[index] = traceError(wErr) return } // Write `uploads.json` to disk. if wErr = disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes); wErr != nil { - errs[index] = wErr + errs[index] = traceError(wErr) return } wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath) if wErr != nil { if dErr := disk.DeleteFile(minioMetaBucket, tmpUploadsPath); dErr != nil { - errs[index] = dErr + errs[index] = traceError(dErr) return } - errs[index] = wErr + errs[index] = traceError(wErr) return } errs[index] = nil @@ -180,7 +180,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t }(index, disk) } wg.Wait() - return errXLWriteQuorum + return traceError(errXLWriteQuorum) } // Ignored errors list. @@ -248,7 +248,7 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf if err == nil { return fileInfo, nil } - + err = traceError(err) // For any reason disk was deleted or goes offline we continue to next disk. if isErrIgnored(err, objMetadataOpIgnoredErrs) { continue @@ -271,7 +271,7 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in // Rename `xl.json` to all disks in parallel. for index, disk := range disks { if disk == nil { - mErrs[index] = errDiskNotFound + mErrs[index] = traceError(errDiskNotFound) continue } wg.Add(1) @@ -284,7 +284,7 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in // Renames `xl.json` from source prefix to destination prefix. rErr := disk.RenameFile(minioMetaBucket, srcJSONFile, minioMetaBucket, dstJSONFile) if rErr != nil { - mErrs[index] = rErr + mErrs[index] = traceError(rErr) return } mErrs[index] = nil @@ -297,7 +297,7 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, quorum in if !isDiskQuorum(mErrs, quorum) { // Delete all `xl.json` successfully renamed. deleteAllXLMetadata(disks, minioMetaBucket, dstPrefix, mErrs) - return errXLWriteQuorum + return traceError(errXLWriteQuorum) } // List of ignored errors. diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 2dc3a6770..7b21debe4 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -214,42 +214,42 @@ func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket} + return ListMultipartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket}) } if !xl.isBucketExist(bucket) { - return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket} + return ListMultipartsInfo{}, traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectPrefix(prefix) { - return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} + return ListMultipartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: prefix}) } // Verify if delimiter is anything other than '/', which we do not support. if delimiter != "" && delimiter != slashSeparator { - return ListMultipartsInfo{}, UnsupportedDelimiter{ + return ListMultipartsInfo{}, traceError(UnsupportedDelimiter{ Delimiter: delimiter, - } + }) } // Verify if marker has prefix. if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) { - return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{ + return ListMultipartsInfo{}, traceError(InvalidMarkerPrefixCombination{ Marker: keyMarker, Prefix: prefix, - } + }) } if uploadIDMarker != "" { if strings.HasSuffix(keyMarker, slashSeparator) { - return result, InvalidUploadIDKeyCombination{ + return result, traceError(InvalidUploadIDKeyCombination{ UploadIDMarker: uploadIDMarker, KeyMarker: keyMarker, - } + }) } id, err := uuid.Parse(uploadIDMarker) if err != nil { - return result, err + return result, traceError(err) } if id.IsZero() { - return result, MalformedUploadID{ + return result, traceError(MalformedUploadID{ UploadID: uploadIDMarker, - } + }) } } return xl.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) @@ -314,15 +314,15 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) { // Verify if bucket name is valid. if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} + return "", traceError(BucketNameInvalid{Bucket: bucket}) } // Verify whether the bucket exists. if !xl.isBucketExist(bucket) { - return "", BucketNotFound{Bucket: bucket} + return "", traceError(BucketNotFound{Bucket: bucket}) } // Verify if object name is valid. if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} + return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } // No metadata is set, allocate a new one. if meta == nil { @@ -339,14 +339,14 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]st func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} + return "", traceError(BucketNameInvalid{Bucket: bucket}) } // Verify whether the bucket exists. if !xl.isBucketExist(bucket) { - return "", BucketNotFound{Bucket: bucket} + return "", traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} + return "", traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } var partsMetadata []xlMetaV1 @@ -361,14 +361,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Validates if upload ID exists. if !xl.isUploadIDExists(bucket, object, uploadID) { nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) - return "", InvalidUploadID{UploadID: uploadID} + return "", traceError(InvalidUploadID{UploadID: uploadID}) } // Read metadata associated with the object from all disks. partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath) if !isDiskQuorum(errs, xl.writeQuorum) { nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) - return "", toObjectErr(errXLWriteQuorum, bucket, object) + return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object) } nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID) @@ -409,7 +409,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Should return IncompleteBody{} error when reader has fewer bytes // than specified in request header. if sizeWritten < size { - return "", IncompleteBody{} + return "", traceError(IncompleteBody{}) } // For size == -1, perhaps client is sending in chunked encoding @@ -435,7 +435,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // MD5 mismatch, delete the temporary object. xl.deleteObject(minioMetaBucket, tmpPartPath) // Returns md5 mismatch. - return "", BadDigest{md5Hex, newMD5Hex} + return "", traceError(BadDigest{md5Hex, newMD5Hex}) } } @@ -448,7 +448,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Validate again if upload ID still exists. if !xl.isUploadIDExists(bucket, object, uploadID) { - return "", InvalidUploadID{UploadID: uploadID} + return "", traceError(InvalidUploadID{UploadID: uploadID}) } // Rename temporary part file to its final location. @@ -461,7 +461,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Read metadata again because it might be updated with parallel upload of another part. partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket, uploadIDPath) if !isDiskQuorum(errs, xl.writeQuorum) { - return "", toObjectErr(errXLWriteQuorum, bucket, object) + return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object) } // Get current highest version based on re-read partsMetadata. @@ -578,14 +578,14 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket} + return ListPartsInfo{}, traceError(BucketNameInvalid{Bucket: bucket}) } // Verify whether the bucket exists. if !xl.isBucketExist(bucket) { - return ListPartsInfo{}, BucketNotFound{Bucket: bucket} + return ListPartsInfo{}, traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectName(object) { - return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} + return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -597,7 +597,7 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !xl.isUploadIDExists(bucket, object, uploadID) { - return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} + return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID}) } result, err := xl.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) return result, err @@ -612,17 +612,17 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} + return "", traceError(BucketNameInvalid{Bucket: bucket}) } // Verify whether the bucket exists. if !xl.isBucketExist(bucket) { - return "", BucketNotFound{Bucket: bucket} + return "", traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectName(object) { - return "", ObjectNameInvalid{ + return "", traceError(ObjectNameInvalid{ Bucket: bucket, Object: object, - } + }) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -636,7 +636,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !xl.isUploadIDExists(bucket, object, uploadID) { - return "", InvalidUploadID{UploadID: uploadID} + return "", traceError(InvalidUploadID{UploadID: uploadID}) } // Calculate s3 compatible md5sum for complete multipart. s3MD5, err := completeMultipartMD5(parts...) @@ -650,7 +650,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath) // Do we have writeQuorum?. if !isDiskQuorum(errs, xl.writeQuorum) { - return "", toObjectErr(errXLWriteQuorum, bucket, object) + return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object) } onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs) @@ -678,21 +678,21 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload partIdx := currentXLMeta.ObjectPartIndex(part.PartNumber) // All parts should have same part number. if partIdx == -1 { - return "", InvalidPart{} + return "", traceError(InvalidPart{}) } // All parts should have same ETag as previously generated. if currentXLMeta.Parts[partIdx].ETag != part.ETag { - return "", BadDigest{} + return "", traceError(BadDigest{}) } // All parts except the last part has to be atleast 5MB. if (i < len(parts)-1) && !isMinAllowedPartSize(currentXLMeta.Parts[partIdx].Size) { - return "", PartTooSmall{ + return "", traceError(PartTooSmall{ PartNumber: part.PartNumber, PartSize: currentXLMeta.Parts[partIdx].Size, PartETag: part.ETag, - } + }) } // Last part could have been uploaded as 0bytes, do not need @@ -716,7 +716,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Check if an object is present as one of the parent dir. if xl.parentDirIsObject(bucket, path.Dir(object)) { - return "", toObjectErr(errFileAccessDenied, bucket, object) + return "", toObjectErr(traceError(errFileAccessDenied), bucket, object) } // Save the final object size and modtime. @@ -893,13 +893,13 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } if !xl.isBucketExist(bucket) { - return BucketNotFound{Bucket: bucket} + return traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectName(object) { - return ObjectNameInvalid{Bucket: bucket, Object: object} + return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -911,7 +911,7 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID) if !xl.isUploadIDExists(bucket, object, uploadID) { - return InvalidUploadID{UploadID: uploadID} + return traceError(InvalidUploadID{UploadID: uploadID}) } err := xl.abortMultipartUpload(bucket, object, uploadID) return err diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index d1e9bceb3..a04ee3271 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -42,19 +42,19 @@ import ( func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } // Verify if object is valid. if !IsValidObjectName(object) { - return ObjectNameInvalid{Bucket: bucket, Object: object} + return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } // Start offset and length cannot be negative. if startOffset < 0 || length < 0 { - return toObjectErr(errUnexpected, bucket, object) + return traceError(errUnexpected) } // Writer cannot be nil. if writer == nil { - return toObjectErr(errUnexpected, bucket, object) + return traceError(errUnexpected) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -69,7 +69,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) // Do we have read quorum? if !isDiskQuorum(errs, xl.readQuorum) { - return toObjectErr(errXLReadQuorum, bucket, object) + return traceError(InsufficientReadQuorum{}, errs...) } if reducedErr := reduceErrs(errs, []error{ @@ -94,24 +94,24 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Reply back invalid range if the input offset and length fall out of range. if startOffset > xlMeta.Stat.Size || length > xlMeta.Stat.Size { - return InvalidRange{startOffset, length, xlMeta.Stat.Size} + return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size}) } // Reply if we have inputs with offset and length. if startOffset+length > xlMeta.Stat.Size { - return InvalidRange{startOffset, length, xlMeta.Stat.Size} + return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size}) } // Get start part index and offset. partIndex, partOffset, err := xlMeta.ObjectToPartOffset(startOffset) if err != nil { - return toObjectErr(err, bucket, object) + return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size}) } // Get last part index to read given length. lastPartIndex, _, err := xlMeta.ObjectToPartOffset(startOffset + length - 1) if err != nil { - return toObjectErr(err, bucket, object) + return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size}) } // Save the writer. @@ -125,17 +125,17 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i if err == nil { // Cache hit. // Advance the buffer to offset as if it was read. if _, err = cachedBuffer.Seek(startOffset, 0); err != nil { // Seek to the offset. - return err + return traceError(err) } // Write the requested length. if _, err = io.CopyN(writer, cachedBuffer, length); err != nil { - return err + return traceError(err) } return nil } // Cache miss. // For unknown error, return and error out. if err != objcache.ErrKeyNotFoundInCache { - return err + return traceError(err) } // Cache has not been found, fill the cache. // Cache is only set if whole object is being read. @@ -152,7 +152,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Ignore error if cache is full, proceed to write the object. if err != nil && err != objcache.ErrCacheFull { // For any other error return here. - return toObjectErr(err, bucket, object) + return toObjectErr(traceError(err), bucket, object) } } } @@ -223,12 +223,12 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i func (xl xlObjects) HealObject(bucket, object string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } // Verify if object is valid. if !IsValidObjectName(object) { // FIXME: return Invalid prefix. - return ObjectNameInvalid{Bucket: bucket, Object: object} + return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -275,13 +275,13 @@ func (xl xlObjects) HealObject(bucket, object string) error { err := disk.DeleteFile(bucket, pathJoin(object, outDatedMeta.Parts[partIndex].Name)) if err != nil { - return err + return traceError(err) } } // Delete xl.json file. err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile)) if err != nil { - return err + return traceError(err) } } @@ -343,7 +343,7 @@ func (xl xlObjects) HealObject(bucket, object string) error { } err := disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object)) if err != nil { - return err + return traceError(err) } } return nil @@ -447,7 +447,7 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, defer wg.Done() err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry) if err != nil && err != errFileNotFound { - errs[index] = err + errs[index] = traceError(err) } }(index, disk) } @@ -460,7 +460,7 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, if !isDiskQuorum(errs, quorum) { // Undo all the partial rename operations. undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs) - return errXLWriteQuorum + return traceError(errXLWriteQuorum) } // Return on first error, also undo any partially successful rename operations. return reduceErrs(errs, []error{ @@ -495,17 +495,17 @@ func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (md5Sum string, err error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} + return "", traceError(BucketNameInvalid{Bucket: bucket}) } // Verify bucket exists. if !xl.isBucketExist(bucket) { - return "", BucketNotFound{Bucket: bucket} + return "", traceError(BucketNotFound{Bucket: bucket}) } if !IsValidObjectName(object) { - return "", ObjectNameInvalid{ + return "", traceError(ObjectNameInvalid{ Bucket: bucket, Object: object, - } + }) } // No metadata is set, allocate a new one. if metadata == nil { @@ -538,7 +538,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Ignore error if cache is full, proceed to write the object. if err != nil && err != objcache.ErrCacheFull { // For any other error return here. - return "", toObjectErr(err, bucket, object) + return "", toObjectErr(traceError(err), bucket, object) } } else { mw = md5Writer @@ -636,7 +636,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. if xl.parentDirIsObject(bucket, path.Dir(object)) { // Parent (in the namespace) is an object, delete temporary object. xl.deleteObject(minioMetaTmpBucket, tempObj) - return "", toObjectErr(errFileAccessDenied, bucket, object) + return "", toObjectErr(traceError(errFileAccessDenied), bucket, object) } // Rename if an object already exists to temporary location. @@ -706,14 +706,14 @@ func (xl xlObjects) deleteObject(bucket, object string) error { for index, disk := range xl.storageDisks { if disk == nil { - dErrs[index] = errDiskNotFound + dErrs[index] = traceError(errDiskNotFound) continue } wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() err := cleanupDir(disk, bucket, object) - if err != nil && err != errFileNotFound { + if err != nil && errorCause(err) != errVolumeNotFound { dErrs[index] = err } }(index, disk) @@ -725,7 +725,7 @@ func (xl xlObjects) deleteObject(bucket, object string) error { // Do we have write quorum? if !isDiskQuorum(dErrs, xl.writeQuorum) { // Return errXLWriteQuorum if errors were more than allowed write quorum. - return errXLWriteQuorum + return traceError(errXLWriteQuorum) } return nil @@ -737,10 +737,10 @@ func (xl xlObjects) deleteObject(bucket, object string) error { func (xl xlObjects) DeleteObject(bucket, object string) (err error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} + return traceError(BucketNameInvalid{Bucket: bucket}) } if !IsValidObjectName(object) { - return ObjectNameInvalid{Bucket: bucket, Object: object} + return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) } // generates random string on setting MINIO_DEBUG=lock, else returns empty string. @@ -752,7 +752,7 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) { // Validate object exists. if !xl.isObject(bucket, object) { - return ObjectNotFound{bucket, object} + return traceError(ObjectNotFound{bucket, object}) } // else proceed to delete the object. // Delete the object on all disks. diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index ca712d4ca..0b3fb7c52 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -95,6 +95,7 @@ func TestXLDeleteObjectBasic(t *testing.T) { } for i, test := range testCases { actualErr := xl.DeleteObject(test.bucket, test.object) + actualErr = errorCause(actualErr) if test.expectedErr != nil && actualErr != test.expectedErr { t.Errorf("Test %d: Expected to fail with %s, but failed with %s", i+1, test.expectedErr, actualErr) } @@ -146,6 +147,7 @@ func TestXLDeleteObjectDiskNotFound(t *testing.T) { xl.storageDisks[7] = nil xl.storageDisks[8] = nil err = obj.DeleteObject(bucket, object) + err = errorCause(err) if err != toObjectErr(errXLWriteQuorum, bucket, object) { t.Errorf("Expected deleteObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err) } @@ -196,6 +198,7 @@ func TestGetObjectNoQuorum(t *testing.T) { } // Fetch object from store. err = xl.GetObject(bucket, object, 0, int64(len("abcd")), ioutil.Discard) + err = errorCause(err) if err != toObjectErr(errXLReadQuorum, bucket, object) { t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err) } @@ -246,6 +249,7 @@ func TestPutObjectNoQuorum(t *testing.T) { } // Upload new content to same object "object" _, err = obj.PutObject(bucket, object, int64(len("abcd")), bytes.NewReader([]byte("abcd")), nil) + err = errorCause(err) if err != toObjectErr(errXLWriteQuorum, bucket, object) { t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err) } diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go index 66c5ac4a9..35cc58c61 100644 --- a/cmd/xl-v1-utils.go +++ b/cmd/xl-v1-utils.go @@ -32,6 +32,7 @@ import ( func reduceErrs(errs []error, ignoredErrs []error) error { errorCounts := make(map[error]int) + errs = errorsCause(errs) for _, err := range errs { if isErrIgnored(err, ignoredErrs) { continue @@ -46,13 +47,14 @@ func reduceErrs(errs []error, ignoredErrs []error) error { errMax = err } } - return errMax + return traceError(errMax, errs...) } // Validates if we have quorum based on the errors related to disk only. // Returns 'true' if we have quorum, 'false' if we don't. func isDiskQuorum(errs []error, minQuorumCount int) bool { var count int + errs = errorsCause(errs) for _, err := range errs { switch err { case errDiskNotFound, errFaultyDisk, errDiskAccessDenied: @@ -60,6 +62,7 @@ func isDiskQuorum(errs []error, minQuorumCount int) bool { } count++ } + return count >= minQuorumCount } @@ -101,12 +104,12 @@ func readXLMeta(disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1, // Reads entire `xl.json`. buf, err := disk.ReadAll(bucket, path.Join(object, xlMetaJSONFile)) if err != nil { - return xlMetaV1{}, err + return xlMetaV1{}, traceError(err) } // Unmarshal xl metadata. if err = json.Unmarshal(buf, &xlMeta); err != nil { - return xlMetaV1{}, err + return xlMetaV1{}, traceError(err) } // Return structured `xl.json`. diff --git a/cmd/xl-v1-utils_test.go b/cmd/xl-v1-utils_test.go index 85af8d0d9..209a23040 100644 --- a/cmd/xl-v1-utils_test.go +++ b/cmd/xl-v1-utils_test.go @@ -55,7 +55,7 @@ func TestReduceErrs(t *testing.T) { // Validates list of all the testcases for returning valid errors. for i, testCase := range testCases { gotErr := reduceErrs(testCase.errs, testCase.ignoredErrs) - if testCase.err != gotErr { + if errorCause(gotErr) != testCase.err { t.Errorf("Test %d : expected %s, got %s", i+1, testCase.err, gotErr) } }