xl: Add logging. (#1372)

This commit is contained in:
Harshavardhana 2016-04-24 23:12:54 -07:00 committed by Harshavardhana
parent 57f35c2bcc
commit 8bce699dae
9 changed files with 315 additions and 71 deletions

5
fs.go
View File

@ -645,7 +645,10 @@ func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) {
// If its a directory its not a regular file.
if st.Mode().IsDir() {
log.Debugf("File is %s", errIsNotRegular)
log.WithFields(logrus.Fields{
"diskPath": s.diskPath,
"filePath": filePath,
}).Debugf("File is %s.", errIsNotRegular)
return FileInfo{}, errFileNotFound
}
return FileInfo{

View File

@ -27,6 +27,8 @@ import (
"strconv"
"strings"
"time"
"github.com/Sirupsen/logrus"
)
type networkFS struct {
@ -78,7 +80,9 @@ func toStorageErr(err error) error {
func newNetworkFS(networkPath string) (StorageAPI, error) {
// Input validation.
if networkPath == "" && strings.LastIndex(networkPath, ":") != -1 {
log.Debugf("Network path %s is malformed", networkPath)
log.WithFields(logrus.Fields{
"networkPath": networkPath,
}).Debugf("Network path is malformed, should be of form <ip>:<port>:<export_dir>")
return nil, errInvalidArgument
}
@ -88,7 +92,10 @@ func newNetworkFS(networkPath string) (StorageAPI, error) {
// Dial minio rpc storage http path.
rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, storageRPCPath)
if err != nil {
log.Debugf("RPC HTTP dial failed for %s at path %s", netAddr, storageRPCPath)
log.WithFields(logrus.Fields{
"netAddr": netAddr,
"storageRPCPath": storageRPCPath,
}).Debugf("RPC HTTP dial failed with %s", err)
return nil, err
}
@ -118,7 +125,9 @@ func newNetworkFS(networkPath string) (StorageAPI, error) {
func (n networkFS) MakeVol(volume string) error {
reply := GenericReply{}
if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil {
log.Debugf("Storage.MakeVolHandler returned an error %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
}).Debugf("Storage.MakeVolHandler returned an error %s", err)
return toStorageErr(err)
}
return nil
@ -138,7 +147,9 @@ func (n networkFS) ListVols() (vols []VolInfo, err error) {
// StatVol - get current Stat volume info.
func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) {
if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil {
log.Debugf("Storage.StatVolHandler returned an error %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
}).Debugf("Storage.StatVolHandler returned an error %s", err)
return VolInfo{}, toStorageErr(err)
}
return volInfo, nil
@ -148,7 +159,9 @@ func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) {
func (n networkFS) DeleteVol(volume string) error {
reply := GenericReply{}
if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil {
log.Debugf("Storage.DeleteVolHandler returned an error %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
}).Debugf("Storage.DeleteVolHandler returned an error %s", err)
return toStorageErr(err)
}
return nil
@ -168,7 +181,10 @@ func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser,
go func() {
resp, err := n.httpClient.Post(writeURL.String(), contentType, readCloser)
if err != nil {
log.Debugf("CreateFile http POST failed to upload the data with error %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("CreateFile http POST failed to upload the data with error %s", err)
readCloser.CloseWithError(err)
return
}
@ -194,7 +210,10 @@ func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error)
Vol: volume,
Path: path,
}, &fileInfo); err != nil {
log.Debugf("Storage.StatFileHandler failed with %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Storage.StatFileHandler failed with %s", err)
return FileInfo{}, toStorageErr(err)
}
return fileInfo, nil
@ -211,7 +230,10 @@ func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io
readURL.RawQuery = readQuery.Encode()
resp, err := n.httpClient.Get(readURL.String())
if err != nil {
log.Debugf("ReadFile http Get failed with error %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReadFile http Get failed with error %s", err)
return nil, err
}
if resp != nil {
@ -235,7 +257,13 @@ func (n networkFS) ListFiles(volume, prefix, marker string, recursive bool, coun
Recursive: recursive,
Count: count,
}, &listFilesReply); err != nil {
log.Debugf("Storage.ListFilesHandlers failed with %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
"prefix": prefix,
"marker": marker,
"recursive": recursive,
"count": count,
}).Debugf("Storage.ListFilesHandlers failed with %s", err)
return nil, true, toStorageErr(err)
}
// Return successfully unmarshalled results.
@ -249,7 +277,10 @@ func (n networkFS) DeleteFile(volume, path string) (err error) {
Vol: volume,
Path: path,
}, &reply); err != nil {
log.Debugf("Storage.DeleteFileHandler failed with %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Storage.DeleteFileHandler failed with %s", err)
return toStorageErr(err)
}
return nil

View File

@ -86,7 +86,7 @@ func TestGetObjectInfo(t *testing.T) {
{"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false},
{"test-getobjectinfo", "Asia/myfile", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/myfile"}, false},
// Test case with existing bucket but object name set to a directory (Test number 13).
{"test-getobjectinfo", "Asia", ObjectInfo{}, ObjectExistsAsPrefix{Bucket: "test-getobjectinfo", Object: "Asia"}, false},
{"test-getobjectinfo", "Asia", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia"}, false},
// Valid case with existing object (Test number 14).
{"test-getobjectinfo", "Asia/asiapics.jpg", resultCases[0], nil, true},
}

View File

@ -48,3 +48,6 @@ var errReadQuorum = errors.New("I/O error. did not meet read quorum.")
// errWriteQuorum - did not meet write quorum.
var errWriteQuorum = errors.New("I/O error. did not meet write quorum.")
// errDataCorrupt - err data corrupt.
var errDataCorrupt = errors.New("data likely corrupted, all blocks are zero in length")

View File

@ -6,6 +6,7 @@ import (
"net/rpc"
"strconv"
"github.com/Sirupsen/logrus"
router "github.com/gorilla/mux"
)
@ -21,7 +22,9 @@ type storageServer struct {
func (s *storageServer) MakeVolHandler(arg *string, reply *GenericReply) error {
err := s.storage.MakeVol(*arg)
if err != nil {
log.Debugf("MakeVol failed with error %s", err)
log.WithFields(logrus.Fields{
"volume": *arg,
}).Debugf("MakeVol failed with error %s", err)
return err
}
return nil
@ -42,7 +45,9 @@ func (s *storageServer) ListVolsHandler(arg *string, reply *ListVolsReply) error
func (s *storageServer) StatVolHandler(arg *string, reply *VolInfo) error {
volInfo, err := s.storage.StatVol(*arg)
if err != nil {
log.Debugf("StatVol failed with error %s", err)
log.WithFields(logrus.Fields{
"volume": *arg,
}).Debugf("StatVol failed with error %s", err)
return err
}
*reply = volInfo
@ -54,7 +59,9 @@ func (s *storageServer) StatVolHandler(arg *string, reply *VolInfo) error {
func (s *storageServer) DeleteVolHandler(arg *string, reply *GenericReply) error {
err := s.storage.DeleteVol(*arg)
if err != nil {
log.Debugf("DeleteVol failed with error %s", err)
log.WithFields(logrus.Fields{
"volume": *arg,
}).Debugf("DeleteVol failed with error %s", err)
return err
}
return nil
@ -66,7 +73,13 @@ func (s *storageServer) DeleteVolHandler(arg *string, reply *GenericReply) error
func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesReply) error {
files, eof, err := s.storage.ListFiles(arg.Vol, arg.Prefix, arg.Marker, arg.Recursive, arg.Count)
if err != nil {
log.Debugf("ListFiles failed with error %s", err)
log.WithFields(logrus.Fields{
"volume": arg.Vol,
"prefix": arg.Prefix,
"marker": arg.Marker,
"recursive": arg.Recursive,
"count": arg.Count,
}).Debugf("ListFiles failed with error %s", err)
return err
}
@ -82,7 +95,10 @@ func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesRep
func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) error {
fileInfo, err := s.storage.StatFile(arg.Vol, arg.Path)
if err != nil {
log.Debugf("StatFile failed with error %s", err)
log.WithFields(logrus.Fields{
"volume": arg.Vol,
"path": arg.Path,
}).Debugf("StatFile failed with error %s", err)
return err
}
*reply = fileInfo
@ -93,7 +109,10 @@ func (s *storageServer) StatFileHandler(arg *StatFileArgs, reply *FileInfo) erro
func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericReply) error {
err := s.storage.DeleteFile(arg.Vol, arg.Path)
if err != nil {
log.Debugf("DeleteFile failed with error %s", err)
log.WithFields(logrus.Fields{
"volume": arg.Vol,
"path": arg.Path,
}).Debugf("DeleteFile failed with error %s", err)
return err
}
return nil
@ -120,7 +139,10 @@ func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) {
path := vars["path"]
writeCloser, err := stServer.storage.CreateFile(volume, path)
if err != nil {
log.Debugf("CreateFile failed with error %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("CreateFile failed with error %s", err)
httpErr := http.StatusInternalServerError
if err == errVolumeNotFound {
httpErr = http.StatusNotFound
@ -132,7 +154,10 @@ func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) {
}
reader := r.Body
if _, err = io.Copy(writeCloser, reader); err != nil {
log.Debugf("Copying incoming reader to writer failed %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Copying incoming reader to writer failed %s", err)
safeCloseAndRemove(writeCloser)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -147,13 +172,19 @@ func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) {
path := vars["path"]
offset, err := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64)
if err != nil {
log.Debugf("Parse offset failure with error %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Parse offset failure with error %s", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
readCloser, err := stServer.storage.ReadFile(volume, path, offset)
if err != nil {
log.Debugf("ReadFile failed with error %s", err)
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReadFile failed with error %s", err)
httpErr := http.StatusBadRequest
if err == errVolumeNotFound {
httpErr = http.StatusNotFound

View File

@ -26,6 +26,7 @@ import (
"strconv"
"time"
"github.com/Sirupsen/logrus"
fastSha512 "github.com/minio/minio/pkg/crypto/sha512"
)
@ -85,25 +86,39 @@ func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 {
// without allocating.
fileQuorumVersionMap := make(map[int]int64)
// TODO - all errors should be logged here.
// Read meta data from all disks
for index, disk := range xl.storageDisks {
fileQuorumVersionMap[index] = -1
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReadFile failed with %s", err)
continue
} else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("JSON decoding failed with %s", err)
continue
}
version, err := metadata.GetFileVersion()
if err == errMetadataKeyNotExist {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Missing 'file.version', %s", errMetadataKeyNotExist)
fileQuorumVersionMap[index] = 0
continue
}
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("'file.version' decoding failed with %s", err)
continue
}
fileQuorumVersionMap[index] = version
@ -133,6 +148,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, bwriter *b
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
writer, err := disk.CreateFile(volume, erasurePart)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("CreateFile failed with %s", err)
createFileError++
// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
@ -152,6 +171,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, bwriter *b
var metadataWriter io.WriteCloser
metadataWriter, err = disk.CreateFile(volume, metadataFilePath)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("CreateFile failed with %s", err)
createFileError++
// We can safely allow CreateFile errors up to

View File

@ -21,11 +21,14 @@ import (
"fmt"
"io"
slashpath "path"
"github.com/Sirupsen/logrus"
)
func (xl XL) selfHeal(volume string, path string) error {
// doSelfHeal - heals the file at path.
func (xl XL) doHealFile(volume string, path string) error {
totalBlocks := xl.DataBlocks + xl.ParityBlocks
needsSelfHeal := make([]bool, totalBlocks)
needsHeal := make([]bool, totalBlocks)
var readers = make([]io.Reader, totalBlocks)
var writers = make([]io.WriteCloser, totalBlocks)
@ -34,22 +37,30 @@ func (xl XL) selfHeal(volume string, path string) error {
xl.lockNS(volume, path, readLock)
defer xl.unlockNS(volume, path, readLock)
quorumDisks, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path)
quorumDisks, metadata, doHeal, err := xl.getReadableDisks(volume, path)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Get readable disks failed with %s", err)
return err
}
if !doSelfHeal {
if !doHeal {
return nil
}
size, err := metadata.GetSize()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Failed to get file size, %s", err)
return err
}
for index, disk := range quorumDisks {
if disk == nil {
needsSelfHeal[index] = true
needsHeal[index] = true
continue
}
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
@ -64,26 +75,30 @@ func (xl XL) selfHeal(volume string, path string) error {
}
// Check if there is atleast one part that needs to be healed.
atleastOneSelfHeal := false
for _, shNeeded := range needsSelfHeal {
if shNeeded {
atleastOneSelfHeal = true
atleastOneHeal := false
for _, healNeeded := range needsHeal {
if healNeeded {
atleastOneHeal = true
break
}
}
if !atleastOneSelfHeal {
if !atleastOneHeal {
// Return if healing not needed anywhere.
return nil
}
// create writers for parts where healing is needed.
for index, shNeeded := range needsSelfHeal {
if !shNeeded {
for index, healNeeded := range needsHeal {
if !healNeeded {
continue
}
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
writers[index], err = xl.storageDisks[index].CreateFile(volume, erasurePart)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("CreateFile failed with error %s", err)
// Unexpected error
closeAndRemoveWriters(writers...)
return err
@ -107,63 +122,86 @@ func (xl XL) selfHeal(volume string, path string) error {
// ReedSolomon.Verify() expects that slice is not nil even if the particular
// part needs healing.
enBlocks[index] = make([]byte, curBlockSize)
if needsSelfHeal[index] {
if needsHeal[index] {
// Skip reading if the part needs healing.
continue
}
_, e := io.ReadFull(reader, enBlocks[index])
if e != nil && e != io.ErrUnexpectedEOF {
_, err = io.ReadFull(reader, enBlocks[index])
if err != nil && err != io.ErrUnexpectedEOF {
enBlocks[index] = nil
}
}
// Check blocks if they are all zero in length.
if checkBlockSize(enBlocks) == 0 {
err = errors.New("Data likely corrupted, all blocks are zero in length.")
return err
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("%s", errDataCorrupt)
return errDataCorrupt
}
// Verify the blocks.
ok, e := xl.ReedSolomon.Verify(enBlocks)
if e != nil {
ok, err := xl.ReedSolomon.Verify(enBlocks)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReedSolomon verify failed with %s", err)
closeAndRemoveWriters(writers...)
return e
return err
}
// Verification failed, blocks require reconstruction.
if !ok {
for index, shNeeded := range needsSelfHeal {
if shNeeded {
for index, healNeeded := range needsHeal {
if healNeeded {
// Reconstructs() reconstructs the parts if the array is nil.
enBlocks[index] = nil
}
}
e = xl.ReedSolomon.Reconstruct(enBlocks)
if e != nil {
err = xl.ReedSolomon.Reconstruct(enBlocks)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReedSolomon reconstruct failed with %s", err)
closeAndRemoveWriters(writers...)
return e
return err
}
// Verify reconstructed blocks again.
ok, e = xl.ReedSolomon.Verify(enBlocks)
if e != nil {
ok, err = xl.ReedSolomon.Verify(enBlocks)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReedSolomon verify failed with %s", err)
closeAndRemoveWriters(writers...)
return e
return err
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
e = errors.New("Verification failed after reconstruction, data likely corrupted.")
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("%s", err)
closeAndRemoveWriters(writers...)
return e
return err
}
}
for index, shNeeded := range needsSelfHeal {
if !shNeeded {
for index, healNeeded := range needsHeal {
if !healNeeded {
continue
}
_, e := writers[index].Write(enBlocks[index])
if e != nil {
_, err := writers[index].Write(enBlocks[index])
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Write failed with %s", err)
closeAndRemoveWriters(writers...)
return e
return err
}
}
totalLeft = totalLeft - erasureBlockSize
@ -171,17 +209,17 @@ func (xl XL) selfHeal(volume string, path string) error {
// After successful healing Close() the writer so that the temp
// files are committed to their location.
for index, shNeeded := range needsSelfHeal {
if !shNeeded {
for _, writer := range writers {
if writer == nil {
continue
}
writers[index].Close()
writer.Close()
}
// Update the quorum metadata after selfheal.
errs := xl.setPartsMetadata(volume, path, metadata, needsSelfHeal)
for index, shNeeded := range needsSelfHeal {
if shNeeded && errs[index] != nil {
errs := xl.setPartsMetadata(volume, path, metadata, needsHeal)
for index, healNeeded := range needsHeal {
if healNeeded && errs[index] != nil {
return errs[index]
}
}

View File

@ -21,6 +21,8 @@ import (
"fmt"
"io"
slashpath "path"
"github.com/Sirupsen/logrus"
)
// ReadFile - read file
@ -33,25 +35,39 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
return nil, errInvalidArgument
}
// Acquire a read lock.
readLock := true
xl.lockNS(volume, path, readLock)
quorumDisks, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path)
xl.unlockNS(volume, path, readLock)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Get readable disks failed with %s", err)
return nil, err
}
if doSelfHeal {
if err = xl.selfHeal(volume, path); err != nil {
if err = xl.doHealFile(volume, path); err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("doHealFile failed with %s", err)
return nil, err
}
}
// Acquire read lock again.
xl.lockNS(volume, path, readLock)
defer xl.unlockNS(volume, path, readLock)
fileSize, err := metadata.GetSize()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("Failed to get file size, %s", err)
return nil, err
}
@ -102,8 +118,11 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
// Check blocks if they are all zero in length.
if checkBlockSize(enBlocks) == 0 {
err = errors.New("Data likely corrupted, all blocks are zero in length.")
pipeWriter.CloseWithError(err)
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("%s", errDataCorrupt)
pipeWriter.CloseWithError(errDataCorrupt)
return
}
@ -111,6 +130,10 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
var ok bool
ok, err = xl.ReedSolomon.Verify(enBlocks)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReedSolomon verify failed with %s", err)
pipeWriter.CloseWithError(err)
return
}
@ -125,18 +148,30 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
}
err = xl.ReedSolomon.Reconstruct(enBlocks)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReedSolomon reconstruct failed with %s", err)
pipeWriter.CloseWithError(err)
return
}
// Verify reconstructed blocks again.
ok, err = xl.ReedSolomon.Verify(enBlocks)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReedSolomon verify failed with %s", err)
pipeWriter.CloseWithError(err)
return
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("%s", err)
pipeWriter.CloseWithError(err)
return
}
@ -145,6 +180,10 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
// Join the decoded blocks.
err = xl.ReedSolomon.Join(pipeWriter, enBlocks, curBlockSize)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("ReedSolomon joining decoded blocks failed with %s", err)
pipeWriter.CloseWithError(err)
return
}

View File

@ -24,6 +24,7 @@ import (
"strings"
"sync"
"github.com/Sirupsen/logrus"
"github.com/klauspost/reedsolomon"
)
@ -164,6 +165,9 @@ func (xl XL) MakeVol(volume string) error {
// Make a volume entry on all underlying storage disks.
for index, disk := range xl.storageDisks {
if err := disk.MakeVol(volume); err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
}).Debugf("MakeVol failed with %s", err)
// We ignore error if errVolumeExists and creating a volume again.
if err == errVolumeExists {
volumeExistsMap[index] = struct{}{}
@ -191,6 +195,9 @@ func (xl XL) DeleteVol(volume string) error {
// Remove a volume entry on all underlying storage disks.
for index, disk := range xl.storageDisks {
if err := disk.DeleteVol(volume); err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
}).Debugf("DeleteVol failed with %s", err)
// We ignore error if errVolumeNotFound.
if err == errVolumeNotFound {
volumeNotFoundMap[index] = struct{}{}
@ -260,6 +267,11 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
} else if err == errVolumeNotFound {
// Count total amount of volume not found errors.
volumeNotFoundErrCnt++
} else if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
}).Debugf("StatVol failed with %s", err)
return VolInfo{}, err
}
}
@ -285,8 +297,15 @@ func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) {
var allFileInfos []FileInfo
var markerPath string
for {
fileInfos, eof, e := xl.storageDisks[0].ListFiles(volume, leafPath, markerPath, false, 1000)
if e != nil {
fileInfos, eof, err := xl.storageDisks[0].ListFiles(volume, leafPath, markerPath, false, 1000)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"leafPath": leafPath,
"markerPath": markerPath,
"recursive": false,
"count": 1000,
}).Debugf("ListFiles failed with %s", err)
break
}
allFileInfos = append(allFileInfos, fileInfos...)
@ -318,6 +337,11 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) {
disk := xl.storageDisks[0]
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": metadataFilePath,
"offset": offset,
}).Debugf("ReadFile failed with %s", err)
return nil, err
}
// Close metadata reader.
@ -325,6 +349,11 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) {
metadata, err := fileMetadataDecode(metadataReader)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": metadataFilePath,
"offset": offset,
}).Debugf("fileMetadataDecode failed with %s", err)
return nil, err
}
return metadata, nil
@ -338,14 +367,26 @@ func (xl XL) extractFileInfo(volume, path string) (FileInfo, error) {
metadata, err := xl.extractMetadata(volume, path)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("extractMetadata failed with %s", err)
return FileInfo{}, err
}
fileSize, err := metadata.GetSize()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("GetSize failed with %s", err)
return FileInfo{}, err
}
fileModTime, err := metadata.GetModTime()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("GetModTime failed with %s", err)
return FileInfo{}, err
}
fileInfo.Size = fileSize
@ -382,6 +423,13 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int)
// List files.
fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, count)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"prefix": prefix,
"marker": markerPath,
"recursive": recursive,
"count": count,
}).Debugf("ListFiles failed with %s", err)
return nil, true, err
}
@ -401,6 +449,10 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int)
path := slashpath.Dir(fsFileInfo.Name)
fileInfo, err = xl.extractFileInfo(volume, path)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("extractFileInfo failed with %s", err)
// For a leaf directory, if err is FileNotFound then
// perhaps has a missing metadata. Ignore it and let
// healing finish its job it will become available soon.
@ -436,11 +488,19 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
_, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path)
xl.unlockNS(volume, path, readLock)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("getReadableDisks failed with %s", err)
return FileInfo{}, err
}
if doSelfHeal {
if err = xl.selfHeal(volume, path); err != nil {
if err = xl.doHealFile(volume, path); err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("doHealFile failed with %s", err)
return FileInfo{}, err
}
}
@ -448,10 +508,18 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
// Extract metadata.
size, err := metadata.GetSize()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("GetSize failed with %s", err)
return FileInfo{}, err
}
modTime, err := metadata.GetModTime()
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("GetModTime failed with %s", err)
return FileInfo{}, err
}
@ -478,11 +546,19 @@ func (xl XL) DeleteFile(volume, path string) error {
erasureFilePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
err := disk.DeleteFile(volume, erasureFilePart)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("DeleteFile failed with %s", err)
return err
}
metadataFilePath := slashpath.Join(path, metadataFile)
err = disk.DeleteFile(volume, metadataFilePath)
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Debugf("DeleteFile failed with %s", err)
return err
}
}