diff --git a/xl-v1-common.go b/xl-v1-common.go index 989484598..9c3cafa1b 100644 --- a/xl-v1-common.go +++ b/xl-v1-common.go @@ -21,70 +21,119 @@ import ( "errors" slashpath "path" "path/filepath" + + "github.com/Sirupsen/logrus" ) -// Returns slice of disks needed for ReadFile operation: -// - slice returing readable disks. -// - fileMetadata -// - bool value indicating if selfHeal is needed. -// - error if any. -func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, fileMetadata, bool, error) { - partsMetadata, errs := xl.getPartsMetadata(volume, path) - highestVersion := int64(0) - versions := make([]int64, len(xl.storageDisks)) - quorumDisks := make([]StorageAPI, len(xl.storageDisks)) - notFoundCount := 0 - // If quorum says errFileNotFound return errFileNotFound - for _, err := range errs { - if err == errFileNotFound { - notFoundCount++ +// Get the highest integer from a given integer slice. +func highestInt(intSlice []int64) (highestInteger int64) { + highestInteger = int64(0) + for _, integer := range intSlice { + if highestInteger < integer { + highestInteger = integer } } - if notFoundCount > xl.readQuorum { - return nil, fileMetadata{}, false, errFileNotFound - } + return highestInteger +} + +// Extracts file versions from partsMetadata slice and returns version slice. +func listFileVersions(partsMetadata []fileMetadata, errs []error) (versions []int64, err error) { + versions = make([]int64, len(partsMetadata)) for index, metadata := range partsMetadata { if errs[index] == nil { - version, err := metadata.GetFileVersion() + var version int64 + version, err = metadata.GetFileVersion() if err == errMetadataKeyNotExist { + log.WithFields(logrus.Fields{ + "metadata": metadata, + }).Errorf("Missing 'file.version', %s", errMetadataKeyNotExist) versions[index] = 0 continue } if err != nil { + log.WithFields(logrus.Fields{ + "metadata": metadata, + }).Errorf("'file.version' decoding failed with %s", err) // Unexpected, return error. - return nil, fileMetadata{}, false, err + return nil, err } versions[index] = version } else { versions[index] = -1 } } - quorumCount := 0 - for index, version := range versions { - if version == highestVersion { - quorumDisks[index] = xl.storageDisks[index] - quorumCount++ - } else { - quorumDisks[index] = nil - } - } - if quorumCount < xl.readQuorum { - return nil, fileMetadata{}, false, errReadQuorum - } - var metadata fileMetadata - for index, disk := range quorumDisks { - if disk == nil { - continue - } - metadata = partsMetadata[index] - break - } + return versions, nil +} + +// Returns slice of online disks needed. +// - slice returing readable disks. +// - fileMetadata +// - bool value indicating if healing is needed. +// - error if any. +func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata fileMetadata, heal bool, err error) { + partsMetadata, errs := xl.getPartsMetadata(volume, path) + notFoundCount := 0 // FIXME: take care of the situation when a disk has failed and been removed // by looking at the error returned from the fs layer. fs-layer will have // to return an error indicating that the disk is not available and should be // different from ErrNotExist. - doSelfHeal := quorumCount != len(xl.storageDisks) - return quorumDisks, metadata, doSelfHeal, nil + for _, err := range errs { + if err == errFileNotFound { + notFoundCount++ + // If we have errors with file not found greater than allowed read + // quorum we return err as errFileNotFound. + if notFoundCount > xl.readQuorum { + return nil, fileMetadata{}, false, errFileNotFound + } + } + } + highestVersion := int64(0) + onlineDisks = make([]StorageAPI, len(xl.storageDisks)) + // List all the file versions from partsMetadata list. + versions, err := listFileVersions(partsMetadata, errs) + if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Errorf("Extracting file versions failed with %s", err) + return nil, fileMetadata{}, false, err + } + + // Get highest file version. + highestVersion = highestInt(versions) + + // Pick online disks with version set to highestVersion. + onlineDiskCount := 0 + for index, version := range versions { + if version == highestVersion { + mdata = partsMetadata[index] + onlineDisks[index] = xl.storageDisks[index] + onlineDiskCount++ + } else { + onlineDisks[index] = nil + } + } + + // If online disks count is lesser than configured disks, most + // probably we need to heal the file, additionally verify if the + // count is lesser than readQuorum, if not we throw an error. + if onlineDiskCount < len(xl.storageDisks) { + // Online disks lesser than total storage disks, needs to be + // healed. unless we do not have readQuorum. + heal = true + // Verify if online disks count are lesser than readQuorum + // threshold, return an error if yes. + if onlineDiskCount < xl.readQuorum { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + "onlineDiskCount": onlineDiskCount, + "readQuorumCount": xl.readQuorum, + }).Errorf("%s", errReadQuorum) + return nil, fileMetadata{}, false, errReadQuorum + } + } + return onlineDisks, mdata, heal, nil } // Get parts.json metadata as a map slice. diff --git a/xl-v1-createfile.go b/xl-v1-createfile.go index c2b38e47d..8ba6f8353 100644 --- a/xl-v1-createfile.go +++ b/xl-v1-createfile.go @@ -18,7 +18,6 @@ package main import ( "encoding/hex" - "encoding/json" "fmt" "hash" "io" @@ -56,91 +55,39 @@ func closeAndRemoveWriters(writers ...io.WriteCloser) { } } -type quorumDisk struct { - disk StorageAPI - index int -} - -// getQuorumDisks - get the current quorum disks. -func (xl XL) getQuorumDisks(volume, path string) (quorumDisks []quorumDisk, higherVersion int64) { - fileQuorumVersionMap := xl.getFileQuorumVersionMap(volume, path) - for diskIndex, formatVersion := range fileQuorumVersionMap { - if formatVersion > higherVersion { - higherVersion = formatVersion - quorumDisks = []quorumDisk{{ - disk: xl.storageDisks[diskIndex], - index: diskIndex, - }} - - } else if formatVersion == higherVersion { - quorumDisks = append(quorumDisks, quorumDisk{ - disk: xl.storageDisks[diskIndex], - index: diskIndex, - }) - - } - } - return quorumDisks, higherVersion -} - -func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 { - metadataFilePath := slashpath.Join(path, metadataFile) - // Set offset to 0 to read entire file. - offset := int64(0) - metadata := make(fileMetadata) - - // Allocate disk index format map - do not use maps directly - // without allocating. - fileQuorumVersionMap := make(map[int]int64) - - // Read meta data from all disks - for index, disk := range xl.storageDisks { - fileQuorumVersionMap[index] = -1 - - metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - }).Debugf("ReadFile failed with %s", err) - continue - } else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - }).Debugf("JSON decoding failed with %s", err) - continue - } - - version, err := metadata.GetFileVersion() - if err == errMetadataKeyNotExist { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - }).Debugf("Missing 'file.version', %s", errMetadataKeyNotExist) - fileQuorumVersionMap[index] = 0 - continue - } - if err != nil { - log.WithFields(logrus.Fields{ - "volume": volume, - "path": path, - }).Debugf("'file.version' decoding failed with %s", err) - continue - } - fileQuorumVersionMap[index] = version - } - return fileQuorumVersionMap -} - // WriteErasure reads predefined blocks, encodes them and writes to // configured storage disks. func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *waitCloser) { // Release the block writer upon function return. defer wcloser.release() - // Get available quorum for existing file path. - _, higherVersion := xl.getQuorumDisks(volume, path) + // Lock right before reading from disk. + readLock := true + xl.lockNS(volume, path, readLock) + partsMetadata, errs := xl.getPartsMetadata(volume, path) + xl.unlockNS(volume, path, readLock) + + // Count errors other than fileNotFound, bigger than the allowed + // readQuorum, if yes throw an error. + metadataReadErrCount := 0 + for _, err := range errs { + if err != nil && err != errFileNotFound { + metadataReadErrCount++ + if metadataReadErrCount > xl.readQuorum { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Errorf("%s", err) + reader.CloseWithError(err) + return + } + } + } + + // List all the file versions on existing files. + versions, err := listFileVersions(partsMetadata, errs) + // Get highest file version. + higherVersion := highestInt(versions) // Increment to have next higher version. higherVersion++ @@ -156,12 +103,13 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w createFileError := 0 for index, disk := range xl.storageDisks { erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index)) - writer, err := disk.CreateFile(volume, erasurePart) + var writer io.WriteCloser + writer, err = disk.CreateFile(volume, erasurePart) if err != nil { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("CreateFile failed with %s", err) + }).Errorf("CreateFile failed with %s", err) createFileError++ // We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum @@ -183,7 +131,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("CreateFile failed with %s", err) + }).Errorf("CreateFile failed with %s", err) createFileError++ // We can safely allow CreateFile errors up to @@ -208,10 +156,15 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w var totalSize int64 // Saves total incoming stream size. for { // Read up to allocated block size. - n, err := io.ReadFull(reader, dataBuffer) + var n int + n, err = io.ReadFull(reader, dataBuffer) if err != nil { // Any unexpected errors, close the pipe reader with error. if err != io.ErrUnexpectedEOF && err != io.EOF { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Errorf("io.ReadFull failed with %s", err) // Remove all temp writers. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) @@ -227,6 +180,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w var dataBlocks [][]byte dataBlocks, err = xl.ReedSolomon.Split(dataBuffer[0:n]) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Errorf("Splitting data buffer into erasure data blocks failed with %s", err) // Remove all temp writers. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) @@ -236,6 +193,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w // Encode parity blocks using data blocks. err = xl.ReedSolomon.Encode(dataBlocks) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + }).Errorf("Encoding erasure data blocks failed with %s", err) // Remove all temp writers upon error. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) @@ -250,6 +211,11 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w encodedData := dataBlocks[index] _, err = writers[index].Write(encodedData) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + "diskIndex": index, + }).Errorf("Writing encoded blocks failed with %s", err) // Remove all temp writers upon error. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) @@ -297,8 +263,13 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w } // Write metadata. - err := metadata.Write(metadataWriter) + err = metadata.Write(metadataWriter) if err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + "diskIndex": index, + }).Errorf("Writing metadata failed with %s", err) // Remove temporary files. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) @@ -307,8 +278,9 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w } // Lock right before commit to disk. - xl.lockNS(volume, path, false) - defer xl.unlockNS(volume, path, false) + readLock = false // false means writeLock. + xl.lockNS(volume, path, readLock) + defer xl.unlockNS(volume, path, readLock) // Close all writers and metadata writers in routines. for index, writer := range writers { @@ -316,13 +288,34 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w continue } // Safely wrote, now rename to its actual location. - writer.Close() + if err = writer.Close(); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + "diskIndex": index, + }).Errorf("Safely committing part failed with %s", err) + // Remove all temp writers upon error. + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) + reader.CloseWithError(err) + return + } if metadataWriters[index] == nil { continue } // Safely wrote, now rename to its actual location. - metadataWriters[index].Close() + if err = metadataWriters[index].Close(); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + "diskIndex": index, + }).Errorf("Safely committing metadata failed with %s", err) + // Remove all temp writers upon error. + xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) + reader.CloseWithError(err) + return + } + } // Close the pipe reader and return. diff --git a/xl-v1-healfile.go b/xl-v1-healfile.go index 3bff2f9cf..c23aa3595 100644 --- a/xl-v1-healfile.go +++ b/xl-v1-healfile.go @@ -25,8 +25,8 @@ import ( "github.com/Sirupsen/logrus" ) -// doSelfHeal - heals the file at path. -func (xl XL) doHealFile(volume string, path string) error { +// healHeal - heals the file at path. +func (xl XL) healFile(volume string, path string) error { totalBlocks := xl.DataBlocks + xl.ParityBlocks needsHeal := make([]bool, totalBlocks) var readers = make([]io.Reader, totalBlocks) @@ -37,15 +37,15 @@ func (xl XL) doHealFile(volume string, path string) error { xl.lockNS(volume, path, readLock) defer xl.unlockNS(volume, path, readLock) - quorumDisks, metadata, doHeal, err := xl.getReadableDisks(volume, path) + onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path) if err != nil { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("Get readable disks failed with %s", err) + }).Errorf("List online disks failed with %s", err) return err } - if !doHeal { + if !heal { return nil } @@ -54,11 +54,11 @@ func (xl XL) doHealFile(volume string, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("Failed to get file size, %s", err) + }).Errorf("Failed to get file size, %s", err) return err } - for index, disk := range quorumDisks { + for index, disk := range onlineDisks { if disk == nil { needsHeal[index] = true continue @@ -98,7 +98,7 @@ func (xl XL) doHealFile(volume string, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("CreateFile failed with error %s", err) + }).Errorf("CreateFile failed with error %s", err) // Unexpected error closeAndRemoveWriters(writers...) return err @@ -137,7 +137,7 @@ func (xl XL) doHealFile(volume string, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("%s", errDataCorrupt) + }).Errorf("%s", errDataCorrupt) return errDataCorrupt } @@ -147,7 +147,7 @@ func (xl XL) doHealFile(volume string, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("ReedSolomon verify failed with %s", err) + }).Errorf("ReedSolomon verify failed with %s", err) closeAndRemoveWriters(writers...) return err } @@ -165,7 +165,7 @@ func (xl XL) doHealFile(volume string, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("ReedSolomon reconstruct failed with %s", err) + }).Errorf("ReedSolomon reconstruct failed with %s", err) closeAndRemoveWriters(writers...) return err } @@ -175,7 +175,7 @@ func (xl XL) doHealFile(volume string, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("ReedSolomon verify failed with %s", err) + }).Errorf("ReedSolomon verify failed with %s", err) closeAndRemoveWriters(writers...) return err } @@ -185,7 +185,7 @@ func (xl XL) doHealFile(volume string, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("%s", err) + }).Errorf("%s", err) closeAndRemoveWriters(writers...) return err } @@ -199,7 +199,7 @@ func (xl XL) doHealFile(volume string, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("Write failed with %s", err) + }).Errorf("Write failed with %s", err) closeAndRemoveWriters(writers...) return err } diff --git a/xl-v1-readfile.go b/xl-v1-readfile.go index 078892c91..5285dc35b 100644 --- a/xl-v1-readfile.go +++ b/xl-v1-readfile.go @@ -38,22 +38,22 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) // Acquire a read lock. readLock := true xl.lockNS(volume, path, readLock) - quorumDisks, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path) + onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path) xl.unlockNS(volume, path, readLock) if err != nil { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("Get readable disks failed with %s", err) + }).Errorf("Get readable disks failed with %s", err) return nil, err } - if doSelfHeal { - if err = xl.doHealFile(volume, path); err != nil { + if heal { + if err = xl.healFile(volume, path); err != nil { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("doHealFile failed with %s", err) + }).Errorf("healFile failed with %s", err) return nil, err } } @@ -67,12 +67,12 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("Failed to get file size, %s", err) + }).Errorf("Failed to get file size, %s", err) return nil, err } readers := make([]io.ReadCloser, len(xl.storageDisks)) - for index, disk := range quorumDisks { + for index, disk := range onlineDisks { if disk == nil { continue } @@ -121,7 +121,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("%s", errDataCorrupt) + }).Errorf("%s", errDataCorrupt) pipeWriter.CloseWithError(errDataCorrupt) return } @@ -133,7 +133,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("ReedSolomon verify failed with %s", err) + }).Errorf("ReedSolomon verify failed with %s", err) pipeWriter.CloseWithError(err) return } @@ -151,7 +151,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("ReedSolomon reconstruct failed with %s", err) + }).Errorf("ReedSolomon reconstruct failed with %s", err) pipeWriter.CloseWithError(err) return } @@ -161,7 +161,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("ReedSolomon verify failed with %s", err) + }).Errorf("ReedSolomon verify failed with %s", err) pipeWriter.CloseWithError(err) return } @@ -171,7 +171,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("%s", err) + }).Errorf("%s", err) pipeWriter.CloseWithError(err) return } @@ -183,7 +183,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("ReedSolomon joining decoded blocks failed with %s", err) + }).Errorf("ReedSolomon joining decoded blocks failed with %s", err) pipeWriter.CloseWithError(err) return } diff --git a/xl-v1.go b/xl-v1.go index 17c80de73..5fbfd07ae 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -166,7 +166,7 @@ func (xl XL) MakeVol(volume string) error { if err := disk.MakeVol(volume); err != nil { log.WithFields(logrus.Fields{ "volume": volume, - }).Debugf("MakeVol failed with %s", err) + }).Errorf("MakeVol failed with %s", err) // We ignore error if errVolumeExists and creating a volume again. if err == errVolumeExists { volumeExistsMap[index] = struct{}{} @@ -196,7 +196,7 @@ func (xl XL) DeleteVol(volume string) error { if err := disk.DeleteVol(volume); err != nil { log.WithFields(logrus.Fields{ "volume": volume, - }).Debugf("DeleteVol failed with %s", err) + }).Errorf("DeleteVol failed with %s", err) // We ignore error if errVolumeNotFound. if err == errVolumeNotFound { volumeNotFoundMap[index] = struct{}{} @@ -269,7 +269,7 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { } else if err != nil { log.WithFields(logrus.Fields{ "volume": volume, - }).Debugf("StatVol failed with %s", err) + }).Errorf("StatVol failed with %s", err) return VolInfo{}, err } } @@ -304,7 +304,7 @@ func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) { "markerPath": markerPath, "recursive": false, "count": 1000, - }).Debugf("ListFiles failed with %s", err) + }).Errorf("ListFiles failed with %s", err) break } allFileInfos = append(allFileInfos, fileInfos...) @@ -340,7 +340,7 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) { "volume": volume, "path": metadataFilePath, "offset": offset, - }).Debugf("ReadFile failed with %s", err) + }).Errorf("ReadFile failed with %s", err) return nil, err } // Close metadata reader. @@ -352,7 +352,7 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) { "volume": volume, "path": metadataFilePath, "offset": offset, - }).Debugf("fileMetadataDecode failed with %s", err) + }).Errorf("fileMetadataDecode failed with %s", err) return nil, err } return metadata, nil @@ -369,7 +369,7 @@ func (xl XL) extractFileInfo(volume, path string) (FileInfo, error) { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("extractMetadata failed with %s", err) + }).Errorf("extractMetadata failed with %s", err) return FileInfo{}, err } fileSize, err := metadata.GetSize() @@ -377,7 +377,7 @@ func (xl XL) extractFileInfo(volume, path string) (FileInfo, error) { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("GetSize failed with %s", err) + }).Errorf("GetSize failed with %s", err) return FileInfo{}, err } fileModTime, err := metadata.GetModTime() @@ -385,7 +385,7 @@ func (xl XL) extractFileInfo(volume, path string) (FileInfo, error) { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("GetModTime failed with %s", err) + }).Errorf("GetModTime failed with %s", err) return FileInfo{}, err } fileInfo.Size = fileSize @@ -428,7 +428,7 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int) "marker": markerPath, "recursive": recursive, "count": count, - }).Debugf("ListFiles failed with %s", err) + }).Errorf("ListFiles failed with %s", err) return nil, true, err } for _, fsFileInfo := range fsFilesInfo { @@ -450,7 +450,7 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int) log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("extractFileInfo failed with %s", err) + }).Errorf("extractFileInfo failed with %s", err) // For a leaf directory, if err is FileNotFound then // perhaps has a missing metadata. Ignore it and let // healing finish its job it will become available soon. @@ -485,7 +485,7 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int) "marker": markerPath, "recursive": recursive, "count": 1, - }).Debugf("ListFiles failed with %s", err) + }).Errorf("ListFiles failed with %s", err) return nil, true, err } if !eof { @@ -499,7 +499,7 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int) "volume": volume, "prefix": prefix, "fsFileInfo.Name": fsFilesInfo[0].Name, - }).Debugf("ListFiles failed with %s, expected %s to be a part.json file.", err, fsFilesInfo[0].Name) + }).Errorf("ListFiles failed with %s, expected %s to be a part.json file.", err, fsFilesInfo[0].Name) return nil, true, errUnexpected } } @@ -525,22 +525,22 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) { // Acquire read lock. readLock := true xl.lockNS(volume, path, readLock) - _, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path) + _, metadata, heal, err := xl.listOnlineDisks(volume, path) xl.unlockNS(volume, path, readLock) if err != nil { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("getReadableDisks failed with %s", err) + }).Errorf("getReadableDisks failed with %s", err) return FileInfo{}, err } - if doSelfHeal { - if err = xl.doHealFile(volume, path); err != nil { + if heal { + if err = xl.healFile(volume, path); err != nil { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("doHealFile failed with %s", err) + }).Errorf("doHealFile failed with %s", err) return FileInfo{}, err } } @@ -551,7 +551,7 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("GetSize failed with %s", err) + }).Errorf("GetSize failed with %s", err) return FileInfo{}, err } modTime, err := metadata.GetModTime() @@ -559,7 +559,7 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("GetModTime failed with %s", err) + }).Errorf("GetModTime failed with %s", err) return FileInfo{}, err } @@ -589,7 +589,7 @@ func (xl XL) DeleteFile(volume, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("DeleteFile failed with %s", err) + }).Errorf("DeleteFile failed with %s", err) return err } metadataFilePath := slashpath.Join(path, metadataFile) @@ -598,7 +598,7 @@ func (xl XL) DeleteFile(volume, path string) error { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Debugf("DeleteFile failed with %s", err) + }).Errorf("DeleteFile failed with %s", err) return err } }