ListMultipart fixes (#1392)

* ListMultipart: listLeafEntries() - return early if a directory is found.
* ListMultipart: do listLeafEntries() only for directories.
This commit is contained in:
Krishna Srinivas 2016-04-27 12:45:40 +05:30 committed by Harshavardhana
parent 90987df9b4
commit d0e5470050

View File

@ -52,7 +52,6 @@ func (o objectAPI) isBucketExist(bucketName string) (bool, error) {
// directory, returns error if any - returns empty list if prefixPath // directory, returns error if any - returns empty list if prefixPath
// is not a leaf directory. // is not a leaf directory.
func (o objectAPI) listLeafEntries(prefixPath string) (entries []FileInfo, e error) { func (o objectAPI) listLeafEntries(prefixPath string) (entries []FileInfo, e error) {
var allFileInfos []FileInfo
var markerPath string var markerPath string
for { for {
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, markerPath, false, 1000) fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, markerPath, false, 1000)
@ -63,21 +62,23 @@ func (o objectAPI) listLeafEntries(prefixPath string) (entries []FileInfo, e err
}).Errorf("%s", e) }).Errorf("%s", e)
return nil, e return nil, e
} }
allFileInfos = append(allFileInfos, fileInfos...) for _, fileInfo := range fileInfos {
// Set marker for next batch of ListFiles.
markerPath = fileInfo.Name
if fileInfo.Mode.IsDir() {
// If a directory is found, doesn't return anything.
return nil, nil
}
fileName := path.Base(fileInfo.Name)
if !strings.Contains(fileName, ".") {
// Skip the entry if it is of the pattern bucket/object/uploadID.partNum.md5sum
// and retain entries of the pattern bucket/object/uploadID
entries = append(entries, fileInfo)
}
}
if eof { if eof {
break break
} }
markerPath = allFileInfos[len(allFileInfos)-1].Name
}
for _, fileInfo := range allFileInfos {
if fileInfo.Mode.IsDir() {
// If a directory is found, doesn't return anything.
return nil, nil
}
fileName := path.Base(fileInfo.Name)
if !strings.Contains(fileName, ".") {
entries = append(entries, fileInfo)
}
} }
return entries, nil return entries, nil
} }
@ -106,51 +107,62 @@ func (o objectAPI) listMetaVolumeFiles(prefixPath string, markerPath string, rec
// Loop through and validate individual file. // Loop through and validate individual file.
for _, fi := range fileInfos { for _, fi := range fileInfos {
var entries []FileInfo var entries []FileInfo
// List all the entries if fi.Name is a leaf directory, if if fi.Mode.IsDir() {
// fi.Name is not a leaf directory them the resulting // List all the entries if fi.Name is a leaf directory, if
// entries are empty. // fi.Name is not a leaf directory then the resulting
entries, e = o.listLeafEntries(fi.Name) // entries are empty.
if e != nil { entries, e = o.listLeafEntries(fi.Name)
log.WithFields(logrus.Fields{ if e != nil {
"prefixPath": fi.Name, log.WithFields(logrus.Fields{
}).Errorf("%s", e) "prefixPath": fi.Name,
return nil, false, e }).Errorf("%s", e)
return nil, false, e
}
} }
// Set markerPath for next batch of listing. // Set markerPath for next batch of listing.
markerPath = fi.Name markerPath = fi.Name
if len(entries) > 0 { if len(entries) > 0 {
// We reach here for non-recursive case and a leaf entry.
for _, entry := range entries { for _, entry := range entries {
// Skip the entries for erasure parts if any.
if strings.Contains(path.Base(entry.Name), ".") {
continue
}
allFileInfos = append(allFileInfos, entry) allFileInfos = append(allFileInfos, entry)
newMaxKeys++
// If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here.
if newMaxKeys == maxKeys {
// Return values:
// allFileInfos : "maxKeys" number of entries.
// eof : eof returned by o.storage.ListFiles()
// error : nil
return
}
} }
} else { continue
// Skip special files. }
// We reach here for a non-recursive case non-leaf entry
// OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum]
if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries.
// Skip files matching pattern bucket/object/uploadID.partNum.md5sum
// and retain files matching pattern bucket/object/uploadID
specialFile := path.Base(fi.Name) specialFile := path.Base(fi.Name)
if strings.Contains(specialFile, ".") { if strings.Contains(specialFile, ".") {
// Contains partnumber and md5sum info, skip this. // Contains partnumber and md5sum info, skip this.
continue continue
} }
allFileInfos = append(allFileInfos, fi)
} }
allFileInfos = append(allFileInfos, fi)
newMaxKeys++ newMaxKeys++
// If we have reached the maxKeys, it means we have listed // If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here. // everything that was requested. Return right here.
if newMaxKeys == maxKeys { if newMaxKeys == maxKeys {
// Returns all the entries until maxKeys entries. // Return values:
// // allFileInfos : "maxKeys" number of entries.
// eof is deliberately set as false since most of the // eof : eof returned by o.storage.ListFiles()
// time if newMaxKeys == maxKeys, there are most // error : nil
// probably more than 1000 multipart sessions in return
// progress.
//
// Setting this here allows us to set proper Markers
// so that the subsequent call returns the next set of
// entries.
eof = false
return allFileInfos, eof, nil
} }
} }
// If we have reached eof then we break out. // If we have reached eof then we break out.