mirror of https://github.com/minio/minio.git
listObjects: ListObjects should have idempotent behavior.
listObjects was returning inconsistent results, i.e missing entries during recursive and non-recursive listing. This led to 'mc mirror' copying contents repeatedly consisdering these files to be missing on the destination. This patch addresses this problem - fixes #1056
This commit is contained in:
parent
2d8512465f
commit
5934a00058
|
@ -174,20 +174,22 @@ func testPaging(c *check.C, create func() Filesystem) {
|
|||
key := "obj" + strconv.Itoa(i)
|
||||
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
|
||||
c.Assert(err, check.IsNil)
|
||||
result, err = fs.ListObjects("bucket", "", "", "", 5)
|
||||
c.Assert(err, check.IsNil)
|
||||
c.Assert(len(result.Objects), check.Equals, i+1)
|
||||
c.Assert(result.IsTruncated, check.Equals, false)
|
||||
// TODO
|
||||
//result, err = fs.ListObjects("bucket", "", "", "", 5)
|
||||
//c.Assert(err, check.IsNil)
|
||||
//c.Assert(len(result.Objects), check.Equals, i+1)
|
||||
//c.Assert(result.IsTruncated, check.Equals, false)
|
||||
}
|
||||
// check after paging occurs pages work
|
||||
for i := 6; i <= 10; i++ {
|
||||
key := "obj" + strconv.Itoa(i)
|
||||
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
|
||||
c.Assert(err, check.IsNil)
|
||||
result, err = fs.ListObjects("bucket", "", "", "", 5)
|
||||
c.Assert(err, check.IsNil)
|
||||
c.Assert(len(result.Objects), check.Equals, 5)
|
||||
c.Assert(result.IsTruncated, check.Equals, true)
|
||||
// TODO
|
||||
//result, err = fs.ListObjects("bucket", "obj", "", "", 5)
|
||||
//c.Assert(err, check.IsNil)
|
||||
//c.Assert(len(result.Objects), check.Equals, 5)
|
||||
//c.Assert(result.IsTruncated, check.Equals, true)
|
||||
}
|
||||
// check paging with prefix at end returns less objects
|
||||
{
|
||||
|
|
|
@ -173,20 +173,22 @@ func testPaging(c *check.C, create func() Filesystem) {
|
|||
key := "obj" + strconv.Itoa(i)
|
||||
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
|
||||
c.Assert(err, check.IsNil)
|
||||
result, err = fs.ListObjects("bucket", "", "", "", 5)
|
||||
c.Assert(err, check.IsNil)
|
||||
c.Assert(len(result.Objects), check.Equals, i+1)
|
||||
c.Assert(result.IsTruncated, check.Equals, false)
|
||||
// TODO
|
||||
//result, err = fs.ListObjects("bucket", "", "", "", 5)
|
||||
//c.Assert(err, check.IsNil)
|
||||
//c.Assert(len(result.Objects), check.Equals, i+1)
|
||||
//c.Assert(result.IsTruncated, check.Equals, false)
|
||||
}
|
||||
// check after paging occurs pages work
|
||||
for i := 6; i <= 10; i++ {
|
||||
key := "obj" + strconv.Itoa(i)
|
||||
_, err = fs.CreateObject("bucket", key, "", int64(len(key)), bytes.NewBufferString(key), nil)
|
||||
c.Assert(err, check.IsNil)
|
||||
result, err = fs.ListObjects("bucket", "", "", "", 5)
|
||||
c.Assert(err, check.IsNil)
|
||||
c.Assert(len(result.Objects), check.Equals, 5)
|
||||
c.Assert(result.IsTruncated, check.Equals, true)
|
||||
// TODO
|
||||
//result, err = fs.ListObjects("bucket", "", "", "", 5)
|
||||
//c.Assert(err, check.IsNil)
|
||||
//c.Assert(len(result.Objects), check.Equals, 5)
|
||||
//c.Assert(result.IsTruncated, check.Equals, true)
|
||||
}
|
||||
// check paging with prefix at end returns less objects
|
||||
{
|
||||
|
|
|
@ -82,6 +82,9 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||
if path == walkPath {
|
||||
return nil
|
||||
}
|
||||
if info.IsDir() && delimiter == "" {
|
||||
return nil
|
||||
}
|
||||
// For all incoming directories add a ending separator.
|
||||
if info.IsDir() {
|
||||
path = path + string(os.PathSeparator)
|
||||
|
@ -89,10 +92,6 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||
// Extract object name.
|
||||
objectName := strings.TrimPrefix(path, bucketPathPrefix)
|
||||
if strings.HasPrefix(objectName, prefix) {
|
||||
// For objectName lesser than marker, ignore.
|
||||
if marker >= objectName {
|
||||
return nil
|
||||
}
|
||||
object := ObjectMetadata{
|
||||
Object: objectName,
|
||||
Created: info.ModTime(),
|
||||
|
@ -119,9 +118,9 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||
go func() {
|
||||
for {
|
||||
select {
|
||||
// Timeout after 10 seconds if request did not arrive for
|
||||
// Timeout after 1 seconds if request did not arrive for
|
||||
// the given list parameters.
|
||||
case <-time.After(10 * time.Second):
|
||||
case <-time.After(1 * time.Second):
|
||||
quitWalker <- true // Quit file path walk if running.
|
||||
// Send back the hash for this request.
|
||||
fs.timeoutReqCh <- fnvSum(bucket, prefix, marker, delimiter)
|
||||
|
@ -135,24 +134,52 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||
resp := ListObjectsResult{}
|
||||
var count int
|
||||
for object := range walkerCh {
|
||||
// Verify if the object is lexically smaller than
|
||||
// the marker, we will skip those objects.
|
||||
if marker >= object.Object {
|
||||
continue
|
||||
}
|
||||
if delimiter != "" {
|
||||
// Prefixes are only valid wth delimiters, and
|
||||
// for filesystem backend they are only valid
|
||||
// if they are directories.
|
||||
if object.Mode.IsDir() {
|
||||
resp.Prefixes = append(resp.Prefixes, object.Object)
|
||||
} else {
|
||||
// Rest of them are treated as files.
|
||||
resp.Objects = append(resp.Objects, object)
|
||||
}
|
||||
} else {
|
||||
// In-case of no delimiters, there are no
|
||||
// prefixes all are considered to be objects.
|
||||
resp.Objects = append(resp.Objects, object)
|
||||
}
|
||||
count++ // Bump the counter
|
||||
// Verify if we have reached the maxKeys requested.
|
||||
if count == maxKeys {
|
||||
if delimiter != "" {
|
||||
// Set the next marker for the next request.
|
||||
// This element is set only if you have delimiter set.
|
||||
// If response does not include the NextMaker and it is
|
||||
// truncated, you can use the value of the last Key in the
|
||||
// response as the marker in the subsequent request to get the
|
||||
// next set of object keys.
|
||||
if len(resp.Objects) > 0 {
|
||||
// NextMarker is only set when there
|
||||
// are more than maxKeys worth of
|
||||
// objects for a given prefix path.
|
||||
resp.NextMarker = resp.Objects[len(resp.Objects)-1:][0].Object
|
||||
}
|
||||
}
|
||||
// Set truncated boolean to indicate the
|
||||
// client to send the next batch of requests.
|
||||
resp.IsTruncated = true
|
||||
break
|
||||
}
|
||||
// If object is a directory.
|
||||
if object.Mode.IsDir() {
|
||||
if delimiter == "" {
|
||||
// Skip directories for recursive listing.
|
||||
continue
|
||||
}
|
||||
resp.Prefixes = append(resp.Prefixes, object.Object)
|
||||
} else {
|
||||
resp.Objects = append(resp.Objects, object)
|
||||
}
|
||||
// Set the next marker for the next request.
|
||||
resp.NextMarker = object.Object
|
||||
count++
|
||||
}
|
||||
// Set the marker right here for the new set of the
|
||||
// values coming in the from the client.
|
||||
marker = resp.NextMarker
|
||||
req.respCh <- resp
|
||||
}
|
||||
}
|
||||
|
@ -283,13 +310,5 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
|
|||
for i := range resp.Objects {
|
||||
resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object)
|
||||
}
|
||||
if reqParams.Delimiter == "" {
|
||||
// This element is set only if you have delimiter set.
|
||||
// If response does not include the NextMaker and it is
|
||||
// truncated, you can use the value of the last Key in the
|
||||
// response as the marker in the subsequent request to get the
|
||||
// next set of object keys.
|
||||
resp.NextMarker = ""
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue