xl: Fixes a bug in read quorum ListFiles() (#1412)

Fixes a bug in #1406
This commit is contained in:
Harshavardhana 2016-04-28 17:32:46 -07:00
parent eed756777b
commit 41b35cff7b

View File

@ -293,48 +293,44 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
// isLeafDirectory - check if a given path is leaf directory. i.e // isLeafDirectory - check if a given path is leaf directory. i.e
// there are no more directories inside it. Erasure code backend // there are no more directories inside it. Erasure code backend
// format it means that the parent directory is the actual object name. // format it means that the parent directory is the actual object name.
func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) { func isLeafDirectory(disk StorageAPI, volume, leafPath string) (isLeaf bool) {
var allFileInfos []FileInfo
var markerPath string var markerPath string
var xlListCount = 1000 // Count page.
for { for {
fileInfos, eof, err := xl.storageDisks[0].ListFiles(volume, leafPath, markerPath, false, 1000) fileInfos, eof, err := disk.ListFiles(volume, leafPath, markerPath, false, xlListCount)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"volume": volume, "volume": volume,
"leafPath": leafPath, "leafPath": leafPath,
"markerPath": markerPath, "markerPath": markerPath,
"recursive": false, "recursive": false,
"count": 1000, "count": xlListCount,
}).Errorf("ListFiles failed with %s", err) }).Errorf("ListFiles failed with %s", err)
break break
} }
allFileInfos = append(allFileInfos, fileInfos...) for _, fileInfo := range fileInfos {
if fileInfo.Mode.IsDir() {
// Directory found, not a leaf directory, return right here.
return false
}
}
if eof { if eof {
break break
} }
// MarkerPath to get the next set of files. // MarkerPath to get the next set of files.
markerPath = allFileInfos[len(allFileInfos)-1].Name markerPath = fileInfos[len(fileInfos)-1].Name
}
for _, fileInfo := range allFileInfos {
if fileInfo.Mode.IsDir() {
// Directory found, not a leaf directory, return right here.
isLeaf = false
return isLeaf
}
} }
// Exhausted all the entries, no directories found must be leaf // Exhausted all the entries, no directories found must be leaf
// return right here. // return right here.
isLeaf = true return true
return isLeaf
} }
// extractMetadata - extract file metadata. // extractMetadata - extract file metadata.
func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) { func extractMetadata(disk StorageAPI, volume, path string) (fileMetadata, error) {
metadataFilePath := slashpath.Join(path, metadataFile) metadataFilePath := slashpath.Join(path, metadataFile)
// We are not going to read partial data from metadata file, // We are not going to read partial data from metadata file,
// read the whole file always. // read the whole file always.
offset := int64(0) offset := int64(0)
disk := xl.storageDisks[0]
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
@ -360,12 +356,12 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) {
} }
// Extract file info from paths. // Extract file info from paths.
func (xl XL) extractFileInfo(volume, path string) (FileInfo, error) { func extractFileInfo(disk StorageAPI, volume, path string) (FileInfo, error) {
fileInfo := FileInfo{} fileInfo := FileInfo{}
fileInfo.Volume = volume fileInfo.Volume = volume
fileInfo.Name = path fileInfo.Name = path
metadata, err := xl.extractMetadata(volume, path) metadata, err := extractMetadata(disk, volume, path)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"volume": volume, "volume": volume,
@ -422,7 +418,7 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int)
var firstErr error var firstErr error
for _, disk := range xl.storageDisks { for _, disk := range xl.storageDisks {
if filesInfo, eof, err = xl.listFiles(disk, volume, prefix, marker, recursive, count); err == nil { if filesInfo, eof, err = listFiles(disk, volume, prefix, marker, recursive, count); err == nil {
// we need to return first successful result // we need to return first successful result
if firstFilesInfo == nil { if firstFilesInfo == nil {
firstFilesInfo = filesInfo firstFilesInfo = filesInfo
@ -454,11 +450,11 @@ func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int)
return nil, false, errReadQuorum return nil, false, errReadQuorum
} }
func (xl XL) listFiles(disk StorageAPI, volume, prefix, marker string, recursive bool, count int) (filesInfo []FileInfo, eof bool, err error) { func listFiles(disk StorageAPI, volume, prefix, marker string, recursive bool, count int) (filesInfo []FileInfo, eof bool, err error) {
var fsFilesInfo []FileInfo var fsFilesInfo []FileInfo
var markerPath = marker var markerPath = marker
if marker != "" { if marker != "" {
isLeaf := xl.isLeafDirectory(volume, retainSlash(marker)) isLeaf := isLeafDirectory(disk, volume, retainSlash(marker))
if isLeaf { if isLeaf {
// For leaf for now we just point to the first block, make it // For leaf for now we just point to the first block, make it
// dynamic in future based on the availability of storage disks. // dynamic in future based on the availability of storage disks.
@ -466,6 +462,8 @@ func (xl XL) listFiles(disk StorageAPI, volume, prefix, marker string, recursive
} }
} }
// Loop and capture the proper fileInfos, requires extraction and
// separation of XL related metadata information.
for { for {
fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, count) fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, count)
if err != nil { if err != nil {
@ -486,13 +484,13 @@ func (xl XL) listFiles(disk StorageAPI, volume, prefix, marker string, recursive
var fileInfo FileInfo var fileInfo FileInfo
var isLeaf bool var isLeaf bool
if fsFileInfo.Mode.IsDir() { if fsFileInfo.Mode.IsDir() {
isLeaf = xl.isLeafDirectory(volume, fsFileInfo.Name) isLeaf = isLeafDirectory(disk, volume, fsFileInfo.Name)
} }
if isLeaf || !fsFileInfo.Mode.IsDir() { if isLeaf || !fsFileInfo.Mode.IsDir() {
// Extract the parent of leaf directory or file to get the // Extract the parent of leaf directory or file to get the
// actual name. // actual name.
path := slashpath.Dir(fsFileInfo.Name) path := slashpath.Dir(fsFileInfo.Name)
fileInfo, err = xl.extractFileInfo(volume, path) fileInfo, err = extractFileInfo(disk, volume, path)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"volume": volume, "volume": volume,
@ -516,10 +514,9 @@ func (xl XL) listFiles(disk StorageAPI, volume, prefix, marker string, recursive
break break
} }
} }
lenFsFilesInfo := len(fsFilesInfo) if len(fsFilesInfo) > 0 {
if lenFsFilesInfo != 0 {
// markerPath for the next disk.ListFiles() iteration. // markerPath for the next disk.ListFiles() iteration.
markerPath = fsFilesInfo[lenFsFilesInfo-1].Name markerPath = fsFilesInfo[len(fsFilesInfo)-1].Name
} }
if count == 0 && recursive && !strings.HasSuffix(markerPath, metadataFile) { if count == 0 && recursive && !strings.HasSuffix(markerPath, metadataFile) {
// If last entry is not part.json then loop once more to check if we // If last entry is not part.json then loop once more to check if we