heal: Set truncate when no more walk entries (#3932)

This commit is contained in:
Anis Elleuch 2017-03-20 23:31:25 +01:00 committed by Harshavardhana
parent eb02261642
commit 9d6e226692
2 changed files with 61 additions and 15 deletions

View File

@ -26,6 +26,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"testing"
"time"
@ -1451,6 +1452,7 @@ func TestListHealUploadsHandler(t *testing.T) {
delimiter string
maxKeys string
statusCode int
expectedResp ListMultipartUploadsResponse
}{
// 1. Valid params.
{
@ -1460,6 +1462,14 @@ func TestListHealUploadsHandler(t *testing.T) {
delimiter: "/",
maxKeys: "10",
statusCode: http.StatusOK,
expectedResp: ListMultipartUploadsResponse{
XMLName: xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "ListMultipartUploadsResult"},
Bucket: "mybucket",
KeyMarker: "prefix11",
Delimiter: "/",
Prefix: "prefix",
MaxUploads: 10,
},
},
// 2. Valid params with empty prefix.
{
@ -1469,6 +1479,14 @@ func TestListHealUploadsHandler(t *testing.T) {
delimiter: "/",
maxKeys: "10",
statusCode: http.StatusOK,
expectedResp: ListMultipartUploadsResponse{
XMLName: xml.Name{Space: "http://s3.amazonaws.com/doc/2006-03-01/", Local: "ListMultipartUploadsResult"},
Bucket: "mybucket",
KeyMarker: "",
Delimiter: "/",
Prefix: "",
MaxUploads: 10,
},
},
// 3. Invalid params with invalid bucket.
{
@ -1536,8 +1554,29 @@ func TestListHealUploadsHandler(t *testing.T) {
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if test.statusCode != rec.Code {
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code)
}
// Compare result with the expected one only when we receive 200 OK
if rec.Code == http.StatusOK {
resp := rec.Result()
xmlBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("Test %d: Failed to read response %v", i+1, err)
}
var actualResult ListMultipartUploadsResponse
err = xml.Unmarshal(xmlBytes, &actualResult)
if err != nil {
t.Errorf("Test %d: Failed to unmarshal xml %v", i+1, err)
}
if !reflect.DeepEqual(test.expectedResp, actualResult) {
t.Fatalf("Test %d: Unexpected response `%+v`, expected: `%+v`", i+1, test.expectedResp, actualResult)
}
}
}
}

View File

@ -303,6 +303,8 @@ func (xl xlObjects) listMultipartUploadsHeal(bucket, prefix, keyMarker,
var walkerDoneCh chan struct{}
// Check if we have room left to send more uploads.
if maxUploads > 0 {
uploadsLeft := maxUploads
walkerCh, walkerDoneCh = xl.listPool.Release(listParams{
bucket: minioMetaMultipartBucket,
recursive: recursive,
@ -319,10 +321,14 @@ func (xl xlObjects) listMultipartUploadsHeal(bucket, prefix, keyMarker,
multipartPrefixPath, multipartMarkerPath,
recursive, listDir, isLeaf, walkerDoneCh)
}
// Collect uploads until maxUploads limit is reached.
for walkResult := range walkerCh {
// For any error during tree walk we should
// return right away.
// Collect uploads until leftUploads limit is reached.
for {
walkResult, ok := <-walkerCh
if !ok {
truncated = false
break
}
// For any error during tree walk, we should return right away.
if walkResult.err != nil {
return ListMultipartsInfo{}, walkResult.err
}
@ -334,8 +340,8 @@ func (xl xlObjects) listMultipartUploadsHeal(bucket, prefix, keyMarker,
uploads = append(uploads, uploadMetadata{
Object: entry,
})
maxUploads--
if maxUploads == 0 {
uploadsLeft--
if uploadsLeft == 0 {
break
}
continue
@ -347,20 +353,21 @@ func (xl xlObjects) listMultipartUploadsHeal(bucket, prefix, keyMarker,
var end bool
uploadIDMarker = ""
newUploads, end, err = fetchMultipartUploadIDs(bucket, entry, uploadIDMarker,
maxUploads, xl.getLoadBalancedDisks())
uploadsLeft, xl.getLoadBalancedDisks())
if err != nil {
return ListMultipartsInfo{}, err
}
uploads = append(uploads, newUploads...)
maxUploads -= len(newUploads)
uploadsLeft -= len(newUploads)
if end && walkResult.end {
truncated = false
break
}
if maxUploads == 0 {
if uploadsLeft == 0 {
break
}
}
}
// For all received uploads fill in the multiparts result.