Add x-amz-storage-class support (#5295)

This adds configurable data and parity options on a per-object
basis. To use variable parity:

- Users set environment variables to configure the variable
parity

- Users then add the x-amz-storage-class header to PutObject
requests with the relevant storage class value (a sketch of the
resulting data/parity mapping follows below)

Fixes #4997
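
To make the storage-class → drive-count mapping concrete, here is a minimal, self-contained Go sketch. The function name mirrors getDrivesCount from the diff below, but the body, the rrsParity knob, and the idea of sourcing it from an environment variable are illustrative assumptions, not this commit's actual implementation.

package main

import "fmt"

// reducedRedundancyStorageClass is the S3 storage class value that
// selects the lower-parity configuration.
const reducedRedundancyStorageClass = "REDUCED_REDUNDANCY"

// getDrivesCountSketch maps an object's storage class to data/parity
// drive counts. rrsParity stands in for an operator-configured value
// (hypothetically read from an environment variable at startup);
// STANDARD and unknown classes fall back to an even N/2 split.
func getDrivesCountSketch(storageClass string, totalDrives, rrsParity int) (data, parity int) {
	if storageClass == reducedRedundancyStorageClass && rrsParity > 0 {
		return totalDrives - rrsParity, rrsParity
	}
	return totalDrives / 2, totalDrives / 2
}

func main() {
	// 16 drives with an operator-configured RRS parity of 2.
	data, parity := getDrivesCountSketch(reducedRedundancyStorageClass, 16, 2)
	// The diff derives the write quorum as dataBlocks + 1.
	fmt.Printf("data=%d parity=%d writeQuorum=%d\n", data, parity, data+1)
}

On the wire, the storage class is just a request header: for example, aws s3api put-object --bucket mybucket --key myobject --body ./file --storage-class REDUCED_REDUNDANCY sends x-amz-storage-class: REDUCED_REDUNDANCY with the PutObject call.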
Authored by Nitish Tiwari on 2017-12-22 16:58:13 +05:30; committed by GitHub
parent f1355da72e
commit 1a3dbbc9dd
25 changed files with 1237 additions and 129 deletions


@@ -32,7 +32,7 @@ import (
)
// updateUploadJSON - add or remove upload ID info in all `uploads.json`.
-func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, isRemove bool) error {
+func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, writeQuorum int, isRemove bool) error {
uploadsPath := path.Join(bucket, object, uploadsJSONFile)
tmpUploadsPath := mustGetUUID()
@@ -95,7 +95,7 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
// Wait for all the writes to finish.
wg.Wait()
-err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
+err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum)
if errors.Cause(err) == errXLWriteQuorum {
// No quorum. Perform cleanup on the minority of disks
// on which the operation succeeded.
@@ -151,13 +151,13 @@ func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated
}
// addUploadID - add upload ID and its initiated time to 'uploads.json'.
-func (xl xlObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time) error {
-return xl.updateUploadJSON(bucket, object, uploadID, initiated, false)
+func (xl xlObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time, writeQuorum int) error {
+return xl.updateUploadJSON(bucket, object, uploadID, initiated, writeQuorum, false)
}
// removeUploadID - remove upload ID in 'uploads.json'.
-func (xl xlObjects) removeUploadID(bucket, object string, uploadID string) error {
-return xl.updateUploadJSON(bucket, object, uploadID, time.Time{}, true)
+func (xl xlObjects) removeUploadID(bucket, object string, uploadID string, writeQuorum int) error {
+return xl.updateUploadJSON(bucket, object, uploadID, time.Time{}, writeQuorum, true)
}
// Returns if the prefix is a multipart upload.
@@ -228,7 +228,8 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
}
// If all errors were ignored, reduce to maximal occurrence
// based on the read quorum.
-return FileInfo{}, reduceReadQuorumErrs(ignoredErrs, nil, xl.readQuorum)
+readQuorum := len(xl.storageDisks) / 2
+return FileInfo{}, reduceReadQuorumErrs(ignoredErrs, nil, readQuorum)
}
// commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.
@@ -499,7 +500,15 @@ func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMark
// disks. `uploads.json` carries metadata regarding on-going multipart
// operation(s) on the object.
func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (string, error) {
-xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks)
+dataBlocks, parityBlocks := getDrivesCount(meta[amzStorageClass], xl.storageDisks)
+xlMeta := newXLMetaV1(object, dataBlocks, parityBlocks)
+// We now know the number of blocks this object needs for data and parity.
+// Establish the writeQuorum using this data.
+writeQuorum := dataBlocks + 1
// If not set default to "application/octet-stream"
if meta["content-type"] == "" {
contentType := "application/octet-stream"
@@ -528,7 +537,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
tempUploadIDPath := uploadID
// Write updated `xl.json` to all disks.
-disks, err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum)
+disks, err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, writeQuorum)
if err != nil {
return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
}
@@ -538,14 +547,14 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
defer xl.deleteObject(minioMetaTmpBucket, tempUploadIDPath)
// Attempt to rename temp upload object to actual upload path object
-_, rErr := renameObject(disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum)
+_, rErr := renameObject(disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum)
if rErr != nil {
return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
}
initiated := UTCNow()
// Create or update 'uploads.json'
-if err = xl.addUploadID(bucket, object, uploadID, initiated); err != nil {
+if err = xl.addUploadID(bucket, object, uploadID, initiated, writeQuorum); err != nil {
return "", err
}
// Return success.
@@ -641,7 +650,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
// Read metadata associated with the object from all disks.
partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket,
uploadIDPath)
-reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
+// Get the quorum for this object.
+_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)
+if err != nil {
+preUploadIDLock.RUnlock()
+return pi, toObjectErr(err, bucket, object)
+}
+reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum)
if errors.Cause(reducedErr) == errXLWriteQuorum {
preUploadIDLock.RUnlock()
return pi, toObjectErr(reducedErr, bucket, object)
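
This hunk introduces objectQuorumFromMeta, which replaces the static xl.writeQuorum with a per-object value wherever the metadata has already been read off the disks. A minimal, runnable sketch of the idea follows, assuming xl.json carries the erasure data/parity block counts; the struct, error value, and selection logic are illustrative, not the commit's implementation.

package main

import (
	"errors"
	"fmt"
)

// errReadQuorum mirrors the role of the diff's quorum errors (the
// message is illustrative).
var errReadQuorum = errors.New("read quorum cannot be reached")

// erasure holds only the xl.json fields the quorum computation needs.
type erasure struct {
	DataBlocks   int
	ParityBlocks int
}

// objectQuorumFromMetaSketch derives per-object quorums from the first
// disk that returned readable metadata: reads need DataBlocks disks,
// writes need one more so two racing writers cannot both reach quorum.
func objectQuorumFromMetaSketch(metas []erasure, errs []error) (readQuorum, writeQuorum int, err error) {
	for i, m := range metas {
		if errs[i] != nil || m.DataBlocks == 0 {
			continue // this disk's xl.json was missing or unreadable
		}
		return m.DataBlocks, m.DataBlocks + 1, nil
	}
	return 0, 0, errReadQuorum
}

func main() {
	metas := []erasure{{}, {DataBlocks: 14, ParityBlocks: 2}}
	errs := []error{errors.New("disk offline"), nil}
	r, w, _ := objectQuorumFromMetaSketch(metas, errs)
	fmt.Println(r, w) // prints: 14 15
}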
@@ -669,7 +685,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
defer xl.deleteObject(minioMetaTmpBucket, tmpPart)
if data.Size() > 0 {
-if pErr := xl.prepareFile(minioMetaTmpBucket, tmpPartPath, data.Size(), onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks); pErr != nil {
+if pErr := xl.prepareFile(minioMetaTmpBucket, tmpPartPath, data.Size(), onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); pErr != nil {
return pi, toObjectErr(pErr, bucket, object)
}
@@ -680,7 +696,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
return pi, toObjectErr(err, bucket, object)
}
buffer := make([]byte, xlMeta.Erasure.BlockSize, 2*xlMeta.Erasure.BlockSize) // alloc additional space for parity blocks created while erasure coding
-file, err := storage.CreateFile(data, minioMetaTmpBucket, tmpPartPath, buffer, DefaultBitrotAlgorithm, xl.writeQuorum)
+file, err := storage.CreateFile(data, minioMetaTmpBucket, tmpPartPath, buffer, DefaultBitrotAlgorithm, writeQuorum)
if err != nil {
return pi, toObjectErr(err, bucket, object)
}
@@ -705,14 +721,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
// Rename temporary part file to its final location.
partPath := path.Join(uploadIDPath, partSuffix)
-onlineDisks, err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, xl.writeQuorum)
+onlineDisks, err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Read metadata again because it might be updated with parallel upload of another part.
partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaMultipartBucket, uploadIDPath)
-reducedErr = reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
+reducedErr = reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum)
if errors.Cause(reducedErr) == errXLWriteQuorum {
return pi, toObjectErr(reducedErr, bucket, object)
}
@@ -747,11 +763,11 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
tempXLMetaPath := newUUID
// Writes a unique `xl.json` each disk carrying new checksum related information.
-if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil {
+if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, writeQuorum); err != nil {
return pi, toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath)
}
-if _, err = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum); err != nil {
+if _, err = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum); err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
}
@@ -904,7 +920,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Read metadata associated with the object from all disks.
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath)
-reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
+// Get the quorum for this object.
+_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)
+if err != nil {
+return oi, toObjectErr(err, bucket, object)
+}
+reducedErr := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum)
if errors.Cause(reducedErr) == errXLWriteQuorum {
return oi, toObjectErr(reducedErr, bucket, object)
}
@@ -992,12 +1015,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
}
// Write unique `xl.json` for each disk.
-if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil {
+if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, writeQuorum); err != nil {
return oi, toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
}
var rErr error
-onlineDisks, rErr = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum)
+onlineDisks, rErr = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum)
if rErr != nil {
return oi, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
}
@@ -1025,7 +1048,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// NOTE: Do not use online disks slice here.
// The reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors.
-_, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum)
+_, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, writeQuorum)
if err != nil {
return oi, toObjectErr(err, bucket, object)
}
@@ -1045,7 +1068,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
}
// Rename the multipart object to final location.
-if _, err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, xl.writeQuorum); err != nil {
+if _, err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, writeQuorum); err != nil {
return oi, toObjectErr(err, bucket, object)
}
@@ -1060,7 +1083,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
defer objectMPartPathLock.Unlock()
// remove entry from uploads.json with quorum
-if err = xl.removeUploadID(bucket, object, uploadID); err != nil {
+if err = xl.removeUploadID(bucket, object, uploadID, writeQuorum); err != nil {
return oi, toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
}
@@ -1081,13 +1104,10 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
}
// Wrapper which removes all the uploaded parts.
-func (xl xlObjects) cleanupUploadedParts(bucket, object, uploadID string) error {
+func (xl xlObjects) cleanupUploadedParts(uploadIDPath string, writeQuorum int) error {
var errs = make([]error, len(xl.storageDisks))
var wg = &sync.WaitGroup{}
-// Construct uploadIDPath.
-uploadIDPath := path.Join(bucket, object, uploadID)
// Cleanup uploadID for all disks.
for index, disk := range xl.storageDisks {
if disk == nil {
@@ -1108,7 +1128,7 @@ func (xl xlObjects) cleanupUploadedParts(bucket, object, uploadID string) error
// Wait for all the cleanups to finish.
wg.Wait()
-return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
+return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum)
}
// abortMultipartUpload - wrapper for purging an ongoing multipart
@@ -1116,8 +1136,20 @@ func (xl xlObjects) cleanupUploadedParts(bucket, object, uploadID string) error
// the directory at '.minio.sys/multipart/bucket/object/uploadID' holding
// all the upload parts.
func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err error) {
+// Construct uploadIDPath.
+uploadIDPath := path.Join(bucket, object, uploadID)
+// Read metadata associated with the object from all disks.
+partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaMultipartBucket, uploadIDPath)
+// Get the quorum for this object.
+_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)
+if err != nil {
+return toObjectErr(err, bucket, object)
+}
// Cleanup all uploaded parts.
-if err = xl.cleanupUploadedParts(bucket, object, uploadID); err != nil {
+if err = xl.cleanupUploadedParts(uploadIDPath, writeQuorum); err != nil {
return toObjectErr(err, bucket, object)
}
@@ -1131,7 +1163,7 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
defer objectMPartPathLock.Unlock()
// remove entry from uploads.json with quorum
-if err = xl.removeUploadID(bucket, object, uploadID); err != nil {
+if err = xl.removeUploadID(bucket, object, uploadID, writeQuorum); err != nil {
return toObjectErr(err, bucket, object)
}