fs/xl: Simplify bucket metadata reading. (#3486)

ObjectLayer GetObject() now returns the entire object
if starting offset is 0 and length is negative. This
also allows to simplify handler layer code where
we always had to use GetObjectInfo() before proceeding
to read bucket metadata files examples `policy.json`.

This also reduces one additional call overhead.
This commit is contained in:
Harshavardhana 2016-12-21 11:29:32 -08:00 committed by GitHub
parent f57f773189
commit 15b4c49621
6 changed files with 43 additions and 68 deletions

View File

@ -150,16 +150,8 @@ func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader
objLock.RLock()
defer objLock.RUnlock()
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, policyPath)
if err != nil {
if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
return nil, BucketPolicyNotFound{Bucket: bucket}
}
errorIf(err, "Unable to load policy for the bucket %s.", bucket)
return nil, errorCause(err)
}
var buffer bytes.Buffer
err = objAPI.GetObject(minioMetaBucket, policyPath, 0, objInfo.Size, &buffer)
err = objAPI.GetObject(minioMetaBucket, policyPath, 0, -1, &buffer)
if err != nil {
if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
return nil, BucketPolicyNotFound{Bucket: bucket}

View File

@ -308,20 +308,8 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
objLock.RLock()
defer objLock.RUnlock()
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, ncPath)
if err != nil {
// 'notification.xml' not found return
// 'errNoSuchNotifications'. This is default when no
// bucket notifications are found on the bucket.
if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
return nil, errNoSuchNotifications
}
errorIf(err, "Unable to load bucket-notification for bucket %s", bucket)
// Returns error for other errors.
return nil, err
}
var buffer bytes.Buffer
err = objAPI.GetObject(minioMetaBucket, ncPath, 0, objInfo.Size, &buffer)
err := objAPI.GetObject(minioMetaBucket, ncPath, 0, -1, &buffer) // Read everything.
if err != nil {
// 'notification.xml' not found return
// 'errNoSuchNotifications'. This is default when no
@ -363,20 +351,8 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er
objLock.RLock()
defer objLock.RUnlock()
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, lcPath)
if err != nil {
// 'listener.json' not found return
// 'errNoSuchNotifications'. This is default when no
// bucket notifications are found on the bucket.
if isErrObjectNotFound(err) {
return nil, errNoSuchNotifications
}
errorIf(err, "Unable to load bucket-listeners for bucket %s", bucket)
// Returns error for other errors.
return nil, err
}
var buffer bytes.Buffer
err = objAPI.GetObject(minioMetaBucket, lcPath, 0, objInfo.Size, &buffer)
err := objAPI.GetObject(minioMetaBucket, lcPath, 0, -1, &buffer)
if err != nil {
// 'notification.xml' not found return
// 'errNoSuchNotifications'. This is default when no

View File

@ -69,7 +69,7 @@ func TestInitEventNotifierFaultyDisks(t *testing.T) {
}
// Test initEventNotifier() with faulty disks
for i := 1; i <= 5; i++ {
for i := 1; i <= 3; i++ {
fs.storage = newNaughtyDisk(fsstorage, map[int]error{i: errFaultyDisk}, nil)
if err := initEventNotifier(fs); errorCause(err) != errFaultyDisk {
t.Fatal("Unexpected error:", err)

View File

@ -222,8 +222,8 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
if err = checkGetObjArgs(bucket, object); err != nil {
return err
}
// Offset and length cannot be negative.
if offset < 0 || length < 0 {
// Offset cannot be negative.
if offset < 0 {
return toObjectErr(traceError(errUnexpected), bucket, object)
}
// Writer cannot be nil.
@ -237,12 +237,13 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
return toObjectErr(traceError(err), bucket, object)
}
// Reply back invalid range if the input offset and length fall out of range.
if offset > fi.Size || length > fi.Size {
return traceError(InvalidRange{offset, length, fi.Size})
// For negative length we read everything.
if length < 0 {
length = fi.Size - offset
}
// Reply if we have inputs with offset and length falling out of file size range.
if offset+length > fi.Size {
// Reply back invalid range if the input offset and length fall out of range.
if offset > fi.Size || offset+length > fi.Size {
return traceError(InvalidRange{offset, length, fi.Size})
}
@ -291,6 +292,7 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
break
}
}
// Returns any error.
return toObjectErr(err, bucket, object)
}

View File

@ -121,28 +121,28 @@ func testGetObject(obj ObjectLayer, instanceType string, t TestErrHandler) {
// Fetching the entire object.
// Test case - 8.
{bucketName, objectName, 0, int64(len(bytesData[0].byteData)), buffers[1], buffers[1], true, bytesData[0].byteData, nil},
// Test case with content-range 1 to objectSize .
// Test case with `length` parameter set to a negative value.
// Test case - 9.
{bucketName, objectName, 0, int64(-1), buffers[1], buffers[1], true, bytesData[0].byteData, nil},
// Test case with content-range 1 to objectSize .
// Test case - 10.
{bucketName, objectName, 1, int64(len(bytesData[0].byteData) - 1), buffers[1], buffers[1], true, bytesData[0].byteData[1:], nil},
// Test case with content-range 100 to objectSize - 100.
// Test case - 10.
// Test case - 11.
{bucketName, objectName, 100, int64(len(bytesData[0].byteData) - 200), buffers[1], buffers[1], true,
bytesData[0].byteData[100 : len(bytesData[0].byteData)-100], nil},
// Test case with offset greater than the size of the object
// Test case - 11.
// Test case - 12.
{bucketName, objectName, int64(len(bytesData[0].byteData) + 1), int64(len(bytesData[0].byteData)), buffers[0],
NewEOFWriter(buffers[0], 100), false, []byte{},
InvalidRange{int64(len(bytesData[0].byteData) + 1), int64(len(bytesData[0].byteData)), int64(len(bytesData[0].byteData))}},
// Test case with offset greater than the size of the object.
// Test case - 12.
// Test case - 13.
{bucketName, objectName, -1, int64(len(bytesData[0].byteData)), buffers[0], new(bytes.Buffer), false, []byte{}, errUnexpected},
// Test case length parameter is more than the object size.
// Test case - 13.
// Test case - 14.
{bucketName, objectName, 0, int64(len(bytesData[0].byteData) + 1), buffers[1], buffers[1], false, bytesData[0].byteData,
InvalidRange{0, int64(len(bytesData[0].byteData) + 1), int64(len(bytesData[0].byteData))}},
// Test case with `length` parameter set to a negative value.
// Test case - 14.
{bucketName, objectName, 0, int64(-1), buffers[1], buffers[1], false, bytesData[0].byteData, errUnexpected},
// Test case with offset + length > objectSize parameter set to a negative value.
// Test case - 15.
{bucketName, objectName, 2, int64(len(bytesData[0].byteData)), buffers[1], buffers[1], false, bytesData[0].byteData,
@ -391,34 +391,37 @@ func testGetObjectDiskNotFound(obj ObjectLayer, instanceType string, disks []str
// Fetching the entire object.
// Test case - 8.
{bucketName, objectName, 0, int64(len(bytesData[0].byteData)), buffers[1], buffers[1], true, bytesData[0].byteData, nil},
// Test case with content-range 1 to objectSize .
// Test case with `length` parameter set to a negative value.
// Test case - 9.
{bucketName, objectName, 0, int64(-1), buffers[1], buffers[1], true, bytesData[0].byteData, nil},
// Test case with `length` parameter set to a negative value and offset is positive.
// Test case - 10.
{bucketName, objectName, 1, int64(-1), buffers[1], buffers[1], true, bytesData[0].byteData[1:], nil},
// Test case with content-range 1 to objectSize .
// Test case - 11.
{bucketName, objectName, 1, int64(len(bytesData[0].byteData) - 1), buffers[1], buffers[1], true, bytesData[0].byteData[1:], nil},
// Test case with content-range 100 to objectSize - 100.
// Test case - 10.
// Test case - 12.
{bucketName, objectName, 100, int64(len(bytesData[0].byteData) - 200), buffers[1], buffers[1], true,
bytesData[0].byteData[100 : len(bytesData[0].byteData)-100], nil},
// Test case with offset greater than the size of the object
// Test case - 11.
// Test case - 13.
{bucketName, objectName, int64(len(bytesData[0].byteData) + 1), int64(len(bytesData[0].byteData)), buffers[0],
NewEOFWriter(buffers[0], 100), false, []byte{},
InvalidRange{int64(len(bytesData[0].byteData) + 1), int64(len(bytesData[0].byteData)), int64(len(bytesData[0].byteData))}},
// Test case with offset greater than the size of the object.
// Test case - 12.
// Test case - 14.
{bucketName, objectName, -1, int64(len(bytesData[0].byteData)), buffers[0], new(bytes.Buffer), false, []byte{}, errUnexpected},
// Test case length parameter is more than the object size.
// Test case - 13.
// Test case - 15.
{bucketName, objectName, 0, int64(len(bytesData[0].byteData) + 1), buffers[1], buffers[1], false, bytesData[0].byteData,
InvalidRange{0, int64(len(bytesData[0].byteData) + 1), int64(len(bytesData[0].byteData))}},
// Test case with `length` parameter set to a negative value.
// Test case - 14.
{bucketName, objectName, 0, int64(-1), buffers[1], buffers[1], false, bytesData[0].byteData, errUnexpected},
// Test case with offset + length > objectSize parameter set to a negative value.
// Test case - 15.
// Test case - 16.
{bucketName, objectName, 2, int64(len(bytesData[0].byteData)), buffers[1], buffers[1], false, bytesData[0].byteData,
InvalidRange{2, int64(len(bytesData[0].byteData)), int64(len(bytesData[0].byteData))}},
// Test case with the writer set to nil.
// Test case - 16.
// Test case - 17.
{bucketName, objectName, 0, int64(len(bytesData[0].byteData)), buffers[1], nil, false, bytesData[0].byteData, errUnexpected},
}

View File

@ -53,10 +53,12 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
if err := checkGetObjArgs(bucket, object); err != nil {
return err
}
// Start offset and length cannot be negative.
if startOffset < 0 || length < 0 {
// Start offset cannot be negative.
if startOffset < 0 {
return traceError(errUnexpected)
}
// Writer cannot be nil.
if writer == nil {
return traceError(errUnexpected)
@ -88,13 +90,13 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// Reorder parts metadata based on erasure distribution order.
metaArr = getOrderedPartsMetadata(xlMeta.Erasure.Distribution, metaArr)
// Reply back invalid range if the input offset and length fall out of range.
if startOffset > xlMeta.Stat.Size || length > xlMeta.Stat.Size {
return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size})
// For negative length read everything.
if length < 0 {
length = xlMeta.Stat.Size - startOffset
}
// Reply if we have inputs with offset and length.
if startOffset+length > xlMeta.Stat.Size {
// Reply back invalid range if the input offset and length fall out of range.
if startOffset > xlMeta.Stat.Size || startOffset+length > xlMeta.Stat.Size {
return traceError(InvalidRange{startOffset, length, xlMeta.Stat.Size})
}