diff --git a/fs.go b/fs.go index ba6f90bd4..de977268c 100644 --- a/fs.go +++ b/fs.go @@ -645,7 +645,10 @@ func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) { // If its a directory its not a regular file. if st.Mode().IsDir() { - log.Debugf("File is %s", errIsNotRegular) + log.WithFields(logrus.Fields{ + "diskPath": s.diskPath, + "filePath": filePath, + }).Debugf("File is %s.", errIsNotRegular) return FileInfo{}, errFileNotFound } return FileInfo{ diff --git a/network-fs.go b/network-fs.go index 2c57e1d6e..5c0fff73c 100644 --- a/network-fs.go +++ b/network-fs.go @@ -27,6 +27,8 @@ import ( "strconv" "strings" "time" + + "github.com/Sirupsen/logrus" ) type networkFS struct { @@ -78,7 +80,9 @@ func toStorageErr(err error) error { func newNetworkFS(networkPath string) (StorageAPI, error) { // Input validation. if networkPath == "" && strings.LastIndex(networkPath, ":") != -1 { - log.Debugf("Network path %s is malformed", networkPath) + log.WithFields(logrus.Fields{ + "networkPath": networkPath, + }).Debugf("Network path is malformed, should be of form ::") return nil, errInvalidArgument } @@ -88,7 +92,10 @@ func newNetworkFS(networkPath string) (StorageAPI, error) { // Dial minio rpc storage http path. rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, storageRPCPath) if err != nil { - log.Debugf("RPC HTTP dial failed for %s at path %s", netAddr, storageRPCPath) + log.WithFields(logrus.Fields{ + "netAddr": netAddr, + "storageRPCPath": storageRPCPath, + }).Debugf("RPC HTTP dial failed with %s", err) return nil, err } @@ -118,7 +125,9 @@ func newNetworkFS(networkPath string) (StorageAPI, error) { func (n networkFS) MakeVol(volume string) error { reply := GenericReply{} if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil { - log.Debugf("Storage.MakeVolHandler returned an error %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + }).Debugf("Storage.MakeVolHandler returned an error %s", err) return toStorageErr(err) } return nil @@ -138,7 +147,9 @@ func (n networkFS) ListVols() (vols []VolInfo, err error) { // StatVol - get current Stat volume info. func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) { if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil { - log.Debugf("Storage.StatVolHandler returned an error %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + }).Debugf("Storage.StatVolHandler returned an error %s", err) return VolInfo{}, toStorageErr(err) } return volInfo, nil @@ -148,7 +159,9 @@ func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) { func (n networkFS) DeleteVol(volume string) error { reply := GenericReply{} if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil { - log.Debugf("Storage.DeleteVolHandler returned an error %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + }).Debugf("Storage.DeleteVolHandler returned an error %s", err) return toStorageErr(err) } return nil @@ -168,7 +181,10 @@ func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, go func() { resp, err := n.httpClient.Post(writeURL.String(), contentType, readCloser) if err != nil { - log.Debugf("CreateFile http POST failed to upload the data with error %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("CreateFile http POST failed to upload the data with error %s", err) readCloser.CloseWithError(err) return } @@ -194,7 +210,10 @@ func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error) Vol: volume, Path: path, }, &fileInfo); err != nil { - log.Debugf("Storage.StatFileHandler failed with %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Storage.StatFileHandler failed with %s", err) return FileInfo{}, toStorageErr(err) } return fileInfo, nil @@ -211,7 +230,10 @@ func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io readURL.RawQuery = readQuery.Encode() resp, err := n.httpClient.Get(readURL.String()) if err != nil { - log.Debugf("ReadFile http Get failed with error %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReadFile http Get failed with error %s", err) return nil, err } if resp != nil { @@ -235,7 +257,13 @@ func (n networkFS) ListFiles(volume, prefix, marker string, recursive bool, coun Recursive: recursive, Count: count, }, &listFilesReply); err != nil { - log.Debugf("Storage.ListFilesHandlers failed with %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + "prefix": prefix, + "marker": marker, + "recursive": recursive, + "count": count, + }).Debugf("Storage.ListFilesHandlers failed with %s", err) return nil, true, toStorageErr(err) } // Return successfully unmarshalled results. @@ -249,7 +277,10 @@ func (n networkFS) DeleteFile(volume, path string) (err error) { Vol: volume, Path: path, }, &reply); err != nil { - log.Debugf("Storage.DeleteFileHandler failed with %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Storage.DeleteFileHandler failed with %s", err) return toStorageErr(err) } return nil diff --git a/object-api-getobjectinfo_test.go b/object-api-getobjectinfo_test.go index 784c6170a..82a473e89 100644 --- a/object-api-getobjectinfo_test.go +++ b/object-api-getobjectinfo_test.go @@ -86,7 +86,7 @@ func TestGetObjectInfo(t *testing.T) { {"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false}, {"test-getobjectinfo", "Asia/myfile", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/myfile"}, false}, // Test case with existing bucket but object name set to a directory (Test number 13). - {"test-getobjectinfo", "Asia", ObjectInfo{}, ObjectExistsAsPrefix{Bucket: "test-getobjectinfo", Object: "Asia"}, false}, + {"test-getobjectinfo", "Asia", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia"}, false}, // Valid case with existing object (Test number 14). {"test-getobjectinfo", "Asia/asiapics.jpg", resultCases[0], nil, true}, } diff --git a/storage-errors.go b/storage-errors.go index 068b2ddf5..e17bed43d 100644 --- a/storage-errors.go +++ b/storage-errors.go @@ -48,3 +48,6 @@ var errReadQuorum = errors.New("I/O error. did not meet read quorum.") // errWriteQuorum - did not meet write quorum. var errWriteQuorum = errors.New("I/O error. did not meet write quorum.") + +// errDataCorrupt - err data corrupt. +var errDataCorrupt = errors.New("data likely corrupted, all blocks are zero in length") diff --git a/storage-rpc-server.go b/storage-rpc-server.go index 60bac3121..3329924be 100644 --- a/storage-rpc-server.go +++ b/storage-rpc-server.go @@ -6,6 +6,7 @@ import ( "net/rpc" "strconv" + "github.com/Sirupsen/logrus" router "github.com/gorilla/mux" ) @@ -21,7 +22,9 @@ type storageServer struct { func (s *storageServer) MakeVolHandler(arg *string, reply *GenericReply) error { err := s.storage.MakeVol(*arg) if err != nil { - log.Debugf("MakeVol failed with error %s", err) + log.WithFields(logrus.Fields{ + "volume": *arg, + }).Debugf("MakeVol failed with error %s", err) return err } return nil @@ -42,7 +45,9 @@ func (s *storageServer) ListVolsHandler(arg *string, reply *ListVolsReply) error func (s *storageServer) StatVolHandler(arg *string, reply *VolInfo) error { volInfo, err := s.storage.StatVol(*arg) if err != nil { - log.Debugf("StatVol failed with error %s", err) + log.WithFields(logrus.Fields{ + "volume": *arg, + }).Debugf("StatVol failed with error %s", err) return err } *reply = volInfo @@ -54,7 +59,9 @@ func (s *storageServer) StatVolHandler(arg *string, reply *VolInfo) error { func (s *storageServer) DeleteVolHandler(arg *string, reply *GenericReply) error { err := s.storage.DeleteVol(*arg) if err != nil { - log.Debugf("DeleteVol failed with error %s", err) + log.WithFields(logrus.Fields{ + "volume": *arg, + }).Debugf("DeleteVol failed with error %s", err) return err } return nil @@ -66,7 +73,13 @@ func (s *storageServer) DeleteVolHandler(arg *string, reply *GenericReply) error func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesReply) error { files, eof, err := s.storage.ListFiles(arg.Vol, arg.Prefix, arg.Marker, arg.Recursive, arg.Count) if err != nil { - log.Debugf("ListFiles failed with error %s", err) + log.WithFields(logrus.Fields{ + "volume": arg.Vol, + "prefix": arg.Prefix, + "marker": arg.Marker, + "recursive": arg.Recursive, + "count": arg.Count, + }).Debugf("ListFiles failed with error %s", err) return err } @@ -82,7 +95,10 @@ func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesRep func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) error { fileInfo, err := s.storage.StatFile(arg.Vol, arg.Path) if err != nil { - log.Debugf("StatFile failed with error %s", err) + log.WithFields(logrus.Fields{ + "volume": arg.Vol, + "path": arg.Path, + }).Debugf("StatFile failed with error %s", err) return err } *reply = fileInfo @@ -93,7 +109,10 @@ func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) erro func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericReply) error { err := s.storage.DeleteFile(arg.Vol, arg.Path) if err != nil { - log.Debugf("DeleteFile failed with error %s", err) + log.WithFields(logrus.Fields{ + "volume": arg.Vol, + "path": arg.Path, + }).Debugf("DeleteFile failed with error %s", err) return err } return nil @@ -120,7 +139,10 @@ func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) { path := vars["path"] writeCloser, err := stServer.storage.CreateFile(volume, path) if err != nil { - log.Debugf("CreateFile failed with error %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("CreateFile failed with error %s", err) httpErr := http.StatusInternalServerError if err == errVolumeNotFound { httpErr = http.StatusNotFound @@ -132,7 +154,10 @@ func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) { } reader := r.Body if _, err = io.Copy(writeCloser, reader); err != nil { - log.Debugf("Copying incoming reader to writer failed %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Copying incoming reader to writer failed %s", err) safeCloseAndRemove(writeCloser) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -147,13 +172,19 @@ func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) { path := vars["path"] offset, err := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64) if err != nil { - log.Debugf("Parse offset failure with error %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Parse offset failure with error %s", err) http.Error(w, err.Error(), http.StatusBadRequest) return } readCloser, err := stServer.storage.ReadFile(volume, path, offset) if err != nil { - log.Debugf("ReadFile failed with error %s", err) + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReadFile failed with error %s", err) httpErr := http.StatusBadRequest if err == errVolumeNotFound { httpErr = http.StatusNotFound diff --git a/xl-v1-createfile.go b/xl-v1-createfile.go index e5e759e05..1726b9704 100644 --- a/xl-v1-createfile.go +++ b/xl-v1-createfile.go @@ -26,6 +26,7 @@ import ( "strconv" "time" + "github.com/Sirupsen/logrus" fastSha512 "github.com/minio/minio/pkg/crypto/sha512" ) @@ -85,25 +86,39 @@ func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 { // without allocating. fileQuorumVersionMap := make(map[int]int64) - // TODO - all errors should be logged here. - // Read meta data from all disks for index, disk := range xl.storageDisks { fileQuorumVersionMap[index] = -1 metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReadFile failed with %s", err) continue } else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("JSON decoding failed with %s", err) continue } version, err := metadata.GetFileVersion() if err == errMetadataKeyNotExist { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Missing 'file.version', %s", errMetadataKeyNotExist) fileQuorumVersionMap[index] = 0 continue } if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("'file.version' decoding failed with %s", err) continue } fileQuorumVersionMap[index] = version @@ -133,6 +148,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, bwriter *b erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index)) writer, err := disk.CreateFile(volume, erasurePart) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("CreateFile failed with %s", err) createFileError++ // We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum @@ -152,6 +171,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, bwriter *b var metadataWriter io.WriteCloser metadataWriter, err = disk.CreateFile(volume, metadataFilePath) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("CreateFile failed with %s", err) createFileError++ // We can safely allow CreateFile errors up to diff --git a/xl-v1-healfile.go b/xl-v1-healfile.go index f1060bab0..3bff2f9cf 100644 --- a/xl-v1-healfile.go +++ b/xl-v1-healfile.go @@ -21,11 +21,14 @@ import ( "fmt" "io" slashpath "path" + + "github.com/Sirupsen/logrus" ) -func (xl XL) selfHeal(volume string, path string) error { +// doSelfHeal - heals the file at path. +func (xl XL) doHealFile(volume string, path string) error { totalBlocks := xl.DataBlocks + xl.ParityBlocks - needsSelfHeal := make([]bool, totalBlocks) + needsHeal := make([]bool, totalBlocks) var readers = make([]io.Reader, totalBlocks) var writers = make([]io.WriteCloser, totalBlocks) @@ -34,22 +37,30 @@ func (xl XL) selfHeal(volume string, path string) error { xl.lockNS(volume, path, readLock) defer xl.unlockNS(volume, path, readLock) - quorumDisks, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path) + quorumDisks, metadata, doHeal, err := xl.getReadableDisks(volume, path) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Get readable disks failed with %s", err) return err } - if !doSelfHeal { + if !doHeal { return nil } size, err := metadata.GetSize() if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Failed to get file size, %s", err) return err } for index, disk := range quorumDisks { if disk == nil { - needsSelfHeal[index] = true + needsHeal[index] = true continue } erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index)) @@ -64,26 +75,30 @@ func (xl XL) selfHeal(volume string, path string) error { } // Check if there is atleast one part that needs to be healed. - atleastOneSelfHeal := false - for _, shNeeded := range needsSelfHeal { - if shNeeded { - atleastOneSelfHeal = true + atleastOneHeal := false + for _, healNeeded := range needsHeal { + if healNeeded { + atleastOneHeal = true break } } - if !atleastOneSelfHeal { + if !atleastOneHeal { // Return if healing not needed anywhere. return nil } // create writers for parts where healing is needed. - for index, shNeeded := range needsSelfHeal { - if !shNeeded { + for index, healNeeded := range needsHeal { + if !healNeeded { continue } erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index)) writers[index], err = xl.storageDisks[index].CreateFile(volume, erasurePart) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("CreateFile failed with error %s", err) // Unexpected error closeAndRemoveWriters(writers...) return err @@ -107,63 +122,86 @@ func (xl XL) selfHeal(volume string, path string) error { // ReedSolomon.Verify() expects that slice is not nil even if the particular // part needs healing. enBlocks[index] = make([]byte, curBlockSize) - if needsSelfHeal[index] { + if needsHeal[index] { // Skip reading if the part needs healing. continue } - _, e := io.ReadFull(reader, enBlocks[index]) - if e != nil && e != io.ErrUnexpectedEOF { + _, err = io.ReadFull(reader, enBlocks[index]) + if err != nil && err != io.ErrUnexpectedEOF { enBlocks[index] = nil } } // Check blocks if they are all zero in length. if checkBlockSize(enBlocks) == 0 { - err = errors.New("Data likely corrupted, all blocks are zero in length.") - return err + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("%s", errDataCorrupt) + return errDataCorrupt } // Verify the blocks. - ok, e := xl.ReedSolomon.Verify(enBlocks) - if e != nil { + ok, err := xl.ReedSolomon.Verify(enBlocks) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReedSolomon verify failed with %s", err) closeAndRemoveWriters(writers...) - return e + return err } // Verification failed, blocks require reconstruction. if !ok { - for index, shNeeded := range needsSelfHeal { - if shNeeded { + for index, healNeeded := range needsHeal { + if healNeeded { // Reconstructs() reconstructs the parts if the array is nil. enBlocks[index] = nil } } - e = xl.ReedSolomon.Reconstruct(enBlocks) - if e != nil { + err = xl.ReedSolomon.Reconstruct(enBlocks) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReedSolomon reconstruct failed with %s", err) closeAndRemoveWriters(writers...) - return e + return err } // Verify reconstructed blocks again. - ok, e = xl.ReedSolomon.Verify(enBlocks) - if e != nil { + ok, err = xl.ReedSolomon.Verify(enBlocks) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReedSolomon verify failed with %s", err) closeAndRemoveWriters(writers...) - return e + return err } if !ok { // Blocks cannot be reconstructed, corrupted data. - e = errors.New("Verification failed after reconstruction, data likely corrupted.") + err = errors.New("Verification failed after reconstruction, data likely corrupted.") + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("%s", err) closeAndRemoveWriters(writers...) - return e + return err } } - for index, shNeeded := range needsSelfHeal { - if !shNeeded { + for index, healNeeded := range needsHeal { + if !healNeeded { continue } - _, e := writers[index].Write(enBlocks[index]) - if e != nil { + _, err := writers[index].Write(enBlocks[index]) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Write failed with %s", err) closeAndRemoveWriters(writers...) - return e + return err } } totalLeft = totalLeft - erasureBlockSize @@ -171,17 +209,17 @@ func (xl XL) selfHeal(volume string, path string) error { // After successful healing Close() the writer so that the temp // files are committed to their location. - for index, shNeeded := range needsSelfHeal { - if !shNeeded { + for _, writer := range writers { + if writer == nil { continue } - writers[index].Close() + writer.Close() } // Update the quorum metadata after selfheal. - errs := xl.setPartsMetadata(volume, path, metadata, needsSelfHeal) - for index, shNeeded := range needsSelfHeal { - if shNeeded && errs[index] != nil { + errs := xl.setPartsMetadata(volume, path, metadata, needsHeal) + for index, healNeeded := range needsHeal { + if healNeeded && errs[index] != nil { return errs[index] } } diff --git a/xl-v1-readfile.go b/xl-v1-readfile.go index 54672b966..078892c91 100644 --- a/xl-v1-readfile.go +++ b/xl-v1-readfile.go @@ -21,6 +21,8 @@ import ( "fmt" "io" slashpath "path" + + "github.com/Sirupsen/logrus" ) // ReadFile - read file @@ -33,25 +35,39 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) return nil, errInvalidArgument } + // Acquire a read lock. readLock := true xl.lockNS(volume, path, readLock) quorumDisks, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path) xl.unlockNS(volume, path, readLock) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Get readable disks failed with %s", err) return nil, err } if doSelfHeal { - if err = xl.selfHeal(volume, path); err != nil { + if err = xl.doHealFile(volume, path); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("doHealFile failed with %s", err) return nil, err } } + // Acquire read lock again. xl.lockNS(volume, path, readLock) defer xl.unlockNS(volume, path, readLock) fileSize, err := metadata.GetSize() if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("Failed to get file size, %s", err) return nil, err } @@ -102,8 +118,11 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) // Check blocks if they are all zero in length. if checkBlockSize(enBlocks) == 0 { - err = errors.New("Data likely corrupted, all blocks are zero in length.") - pipeWriter.CloseWithError(err) + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("%s", errDataCorrupt) + pipeWriter.CloseWithError(errDataCorrupt) return } @@ -111,6 +130,10 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) var ok bool ok, err = xl.ReedSolomon.Verify(enBlocks) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReedSolomon verify failed with %s", err) pipeWriter.CloseWithError(err) return } @@ -125,18 +148,30 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) } err = xl.ReedSolomon.Reconstruct(enBlocks) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReedSolomon reconstruct failed with %s", err) pipeWriter.CloseWithError(err) return } // Verify reconstructed blocks again. ok, err = xl.ReedSolomon.Verify(enBlocks) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReedSolomon verify failed with %s", err) pipeWriter.CloseWithError(err) return } if !ok { // Blocks cannot be reconstructed, corrupted data. err = errors.New("Verification failed after reconstruction, data likely corrupted.") + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("%s", err) pipeWriter.CloseWithError(err) return } @@ -145,6 +180,10 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) // Join the decoded blocks. err = xl.ReedSolomon.Join(pipeWriter, enBlocks, curBlockSize) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("ReedSolomon joining decoded blocks failed with %s", err) pipeWriter.CloseWithError(err) return } diff --git a/xl-v1.go b/xl-v1.go index eff948ff8..b0184700b 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -24,6 +24,7 @@ import ( "strings" "sync" + "github.com/Sirupsen/logrus" "github.com/klauspost/reedsolomon" ) @@ -164,6 +165,9 @@ func (xl XL) MakeVol(volume string) error { // Make a volume entry on all underlying storage disks. for index, disk := range xl.storageDisks { if err := disk.MakeVol(volume); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + }).Debugf("MakeVol failed with %s", err) // We ignore error if errVolumeExists and creating a volume again. if err == errVolumeExists { volumeExistsMap[index] = struct{}{} @@ -191,6 +195,9 @@ func (xl XL) DeleteVol(volume string) error { // Remove a volume entry on all underlying storage disks. for index, disk := range xl.storageDisks { if err := disk.DeleteVol(volume); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + }).Debugf("DeleteVol failed with %s", err) // We ignore error if errVolumeNotFound. if err == errVolumeNotFound { volumeNotFoundMap[index] = struct{}{} @@ -260,6 +267,11 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { } else if err == errVolumeNotFound { // Count total amount of volume not found errors. volumeNotFoundErrCnt++ + } else if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + }).Debugf("StatVol failed with %s", err) + return VolInfo{}, err } } @@ -285,8 +297,15 @@ func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) { var allFileInfos []FileInfo var markerPath string for { - fileInfos, eof, e := xl.storageDisks[0].ListFiles(volume, leafPath, markerPath, false, 1000) - if e != nil { + fileInfos, eof, err := xl.storageDisks[0].ListFiles(volume, leafPath, markerPath, false, 1000) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "leafPath": leafPath, + "markerPath": markerPath, + "recursive": false, + "count": 1000, + }).Debugf("ListFiles failed with %s", err) break } allFileInfos = append(allFileInfos, fileInfos...) @@ -318,6 +337,11 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) { disk := xl.storageDisks[0] metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": metadataFilePath, + "offset": offset, + }).Debugf("ReadFile failed with %s", err) return nil, err } // Close metadata reader. @@ -325,6 +349,11 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) { metadata, err := fileMetadataDecode(metadataReader) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": metadataFilePath, + "offset": offset, + }).Debugf("fileMetadataDecode failed with %s", err) return nil, err } return metadata, nil @@ -338,14 +367,26 @@ func (xl XL) extractFileInfo(volume, path string) (FileInfo, error) { metadata, err := xl.extractMetadata(volume, path) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("extractMetadata failed with %s", err) return FileInfo{}, err } fileSize, err := metadata.GetSize() if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("GetSize failed with %s", err) return FileInfo{}, err } fileModTime, err := metadata.GetModTime() if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("GetModTime failed with %s", err) return FileInfo{}, err } fileInfo.Size = fileSize @@ -382,6 +423,13 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int) // List files. fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, count) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "prefix": prefix, + "marker": markerPath, + "recursive": recursive, + "count": count, + }).Debugf("ListFiles failed with %s", err) return nil, true, err } @@ -401,6 +449,10 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int) path := slashpath.Dir(fsFileInfo.Name) fileInfo, err = xl.extractFileInfo(volume, path) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("extractFileInfo failed with %s", err) // For a leaf directory, if err is FileNotFound then // perhaps has a missing metadata. Ignore it and let // healing finish its job it will become available soon. @@ -436,11 +488,19 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) { _, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path) xl.unlockNS(volume, path, readLock) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("getReadableDisks failed with %s", err) return FileInfo{}, err } if doSelfHeal { - if err = xl.selfHeal(volume, path); err != nil { + if err = xl.doHealFile(volume, path); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("doHealFile failed with %s", err) return FileInfo{}, err } } @@ -448,10 +508,18 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) { // Extract metadata. size, err := metadata.GetSize() if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("GetSize failed with %s", err) return FileInfo{}, err } modTime, err := metadata.GetModTime() if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("GetModTime failed with %s", err) return FileInfo{}, err } @@ -478,11 +546,19 @@ func (xl XL) DeleteFile(volume, path string) error { erasureFilePart := slashpath.Join(path, fmt.Sprintf("part.%d", index)) err := disk.DeleteFile(volume, erasureFilePart) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("DeleteFile failed with %s", err) return err } metadataFilePath := slashpath.Join(path, metadataFile) err = disk.DeleteFile(volume, metadataFilePath) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Debugf("DeleteFile failed with %s", err) return err } }