Add large bucket support for erasure coded backend (#5160)

This PR implements an object layer that
combines multiple input erasure sets (XL layers)
into a unified namespace.

This object layer extends the existing
erasure coded implementation. The design
assumes that providing more than 16 disks
is a static configuration as well, i.e. if
you started the setup with 32 disks as
4 sets of 8 disks each, then you must
always provide the same 4 sets.
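
To make the static-sets idea concrete, here is a minimal, hypothetical sketch of a sets layer wrapping several erasure coded layers. Names such as xlSets, newXLSets and ObjectLayer are illustrative only, not the PR's actual code:

package main

import "fmt"

// ObjectLayer stands in here for the erasure coded backend of a single set;
// the real interface in the code base is much larger.
type ObjectLayer interface {
	String() string
}

type xlSet struct{ id int }

func (s *xlSet) String() string { return fmt.Sprintf("set-%d", s.id) }

// xlSets combines a fixed number of erasure coded sets into one namespace.
type xlSets struct {
	sets         []ObjectLayer // one erasure coded layer per set
	drivesPerSet int           // at most 16 disks per set
}

// newXLSets builds the static configuration: setCount * drivesPerSet disks in total.
func newXLSets(setCount, drivesPerSet int) *xlSets {
	s := &xlSets{drivesPerSet: drivesPerSet}
	for i := 0; i < setCount; i++ {
		s.sets = append(s.sets, &xlSet{id: i})
	}
	return s
}

func main() {
	// For example, 32 disks started as 4 sets of 8 disks each; the same
	// 4 sets must be provided on every restart.
	s := newXLSets(4, 8)
	fmt.Println(len(s.sets), "sets,", s.drivesPerSet, "drives per set")
}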

Some design details and restrictions:

- Objects are distributed using a consistent
  ordering to a unique erasure coded layer
  (see the sketch after this list).
- Each set has its own dsync, so locks are
  synchronized properly at the set (erasure
  layer) level.
- Each set is still limited to a maximum of
  16 disks; you can start with multiple such
  sets statically.
- Sets are a static configuration of disks
  and cannot be changed; there is no elastic
  expansion or removal allowed.
- ListObjects() across sets can be noticeably
  slower, since listing happens on all servers
  and the results are merged at the sets layer.
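
The "consistent ordering" bullet above refers to deterministically mapping each object key to one set. Below is a hedged sketch of one common approach, a CRC of the key modulo the set count; the PR's exact hash function may differ:

package main

import (
	"fmt"
	"hash/crc32"
)

// getSetIndex maps an object key to a set index using a stable hash, so the
// same key always lands on the same erasure coded layer.
func getSetIndex(key string, setCount int) int {
	return int(crc32.ChecksumIEEE([]byte(key))) % setCount
}

func main() {
	for _, key := range []string{"photos/a.jpg", "photos/b.jpg", "logs/2018.log"} {
		fmt.Printf("%-16s -> set %d\n", key, getSetIndex(key, 4))
	}
}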

Fixes #5465
Fixes #5464
Fixes #5461
Fixes #5460
Fixes #5459
Fixes #5458
Fixes #5488
Fixes #5489
Fixes #5497
Fixes #5496
Harshavardhana
2018-02-15 17:45:57 -08:00
committed by kannappanr
parent dd80256151
commit fb96779a8a
82 changed files with 5046 additions and 4771 deletions
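
A recurring change in the diff below replaces the fixed xl.storageDisks field with a call to xl.getDisks(). Here is a minimal, hypothetical sketch of why an accessor helps; only xlObjects, getDisks and StorageAPI come from the diff, the rest is illustrative. With a closure, the enclosing sets layer can hand each xlObjects its own slice of disks and swap it later:

package main

import "fmt"

// StorageAPI stands in for the per-disk storage interface.
type StorageAPI interface{ String() string }

type posixDisk struct{ path string }

func (d *posixDisk) String() string { return d.path }

// xlObjects holds a getDisks closure instead of a fixed storageDisks slice, so
// the sets layer can supply (and later refresh) the disks backing this set.
type xlObjects struct {
	getDisks func() []StorageAPI
}

func main() {
	disks := []StorageAPI{&posixDisk{"/mnt/disk1"}, &posixDisk{"/mnt/disk2"}}
	xl := xlObjects{getDisks: func() []StorageAPI { return disks }}
	fmt.Println("disks in this set:", len(xl.getDisks()))
}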


@@ -36,12 +36,12 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
tmpUploadsPath := mustGetUUID()
// slice to store errors from disks
-errs := make([]error, len(xl.storageDisks))
+errs := make([]error, len(xl.getDisks()))
// slice to store if it is a delete operation on a disk
-isDelete := make([]bool, len(xl.storageDisks))
+isDelete := make([]bool, len(xl.getDisks()))
wg := sync.WaitGroup{}
-for index, disk := range xl.storageDisks {
+for index, disk := range xl.getDisks() {
if disk == nil {
errs[index] = errors.Trace(errDiskNotFound)
continue
@@ -108,7 +108,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
//
// 2. uploads.json was deleted -> in this case since
// the delete failed, we restore from tmp.
-for index, disk := range xl.storageDisks {
+for index, disk := range xl.getDisks() {
if disk == nil || errs[index] != nil {
continue
}
@@ -134,7 +134,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
// we do have quorum, so in case of delete upload.json file
// operation, we purge from tmp.
-for index, disk := range xl.storageDisks {
+for index, disk := range xl.getDisks() {
if disk == nil || !isDelete[index] {
continue
}
@@ -188,7 +188,7 @@ func (xl xlObjects) isUploadIDExists(bucket, object, uploadID string) bool {
func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) {
curpartPath := path.Join(bucket, object, uploadID, partName)
wg := sync.WaitGroup{}
-for i, disk := range xl.storageDisks {
+for i, disk := range xl.getDisks() {
if disk == nil {
continue
}
@@ -227,7 +227,7 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
}
// If all errors were ignored, reduce to maximal occurrence
// based on the read quorum.
-readQuorum := len(xl.storageDisks) / 2
+readQuorum := len(xl.getDisks()) / 2
return FileInfo{}, reduceReadQuorumErrs(ignoredErrs, nil, readQuorum)
}
@@ -506,7 +506,7 @@ func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMark
// operation(s) on the object.
func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (string, error) {
-dataBlocks, parityBlocks := getRedundancyCount(meta[amzStorageClass], len(xl.storageDisks))
+dataBlocks, parityBlocks := getRedundancyCount(meta[amzStorageClass], len(xl.getDisks()))
xlMeta := newXLMetaV1(object, dataBlocks, parityBlocks)
@@ -542,7 +542,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
tempUploadIDPath := uploadID
// Write updated `xl.json` to all disks.
-disks, err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, writeQuorum)
+disks, err := writeSameXLMetadata(xl.getDisks(), minioMetaTmpBucket, tempUploadIDPath, xlMeta, writeQuorum)
if err != nil {
return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
}
@@ -678,7 +678,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
}
// Read metadata associated with the object from all disks.
-partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket,
+partsMetadata, errs = readAllXLMetadata(xl.getDisks(), minioMetaMultipartBucket,
uploadIDPath)
// get Quorum for this object
@@ -695,7 +695,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
preUploadIDLock.RUnlock()
// List all online disks.
-onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
+onlineDisks, modTime := listOnlineDisks(xl.getDisks(), partsMetadata, errs)
// Pick one from the first valid metadata.
xlMeta, err := pickValidXLMeta(partsMetadata, modTime)
@@ -725,7 +725,11 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
if err != nil {
return pi, toObjectErr(err, bucket, object)
}
-buffer := make([]byte, xlMeta.Erasure.BlockSize, 2*xlMeta.Erasure.BlockSize) // alloc additional space for parity blocks created while erasure coding
+// Fetch a buffer for I/O; reuse one from the pool if available, otherwise allocate a new one.
+buffer := xl.bp.Get()
+defer xl.bp.Put(buffer)
file, err := storage.CreateFile(data, minioMetaTmpBucket, tmpPartPath, buffer, DefaultBitrotAlgorithm, writeQuorum)
if err != nil {
return pi, toObjectErr(err, bucket, object)
@@ -961,7 +965,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
uploadIDPath := pathJoin(bucket, object, uploadID)
// Read metadata associated with the object from all disks.
-partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath)
+partsMetadata, errs := readAllXLMetadata(xl.getDisks(), minioMetaMultipartBucket, uploadIDPath)
// get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)
@@ -974,7 +978,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
return oi, toObjectErr(reducedErr, bucket, object)
}
-onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
+onlineDisks, modTime := listOnlineDisks(xl.getDisks(), partsMetadata, errs)
// Calculate full object size.
var objectSize int64
@@ -1077,7 +1081,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// NOTE: Do not use online disks slice here.
// The reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors.
-_, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, writeQuorum)
+_, err = renameObject(xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, writeQuorum)
if err != nil {
return oi, toObjectErr(err, bucket, object)
}
@@ -1134,11 +1138,11 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Wrapper which removes all the uploaded parts.
func (xl xlObjects) cleanupUploadedParts(uploadIDPath string, writeQuorum int) error {
-var errs = make([]error, len(xl.storageDisks))
+var errs = make([]error, len(xl.getDisks()))
var wg = &sync.WaitGroup{}
// Cleanup uploadID for all disks.
-for index, disk := range xl.storageDisks {
+for index, disk := range xl.getDisks() {
if disk == nil {
errs[index] = errors.Trace(errDiskNotFound)
continue
@@ -1169,7 +1173,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
uploadIDPath := path.Join(bucket, object, uploadID)
// Read metadata associated with the object from all disks.
-partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath)
+partsMetadata, errs := readAllXLMetadata(xl.getDisks(), minioMetaMultipartBucket, uploadIDPath)
// get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)