mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
[feature] allow for an odd number of erasure packs (#9221)
Too many deployments come up with an odd number of hosts or drives, to facilitate even distribution among those setups allow for odd and prime numbers based packs.
This commit is contained in:
parent
90c365a174
commit
30707659b5
@ -40,7 +40,7 @@ type endpointSet struct {
|
|||||||
|
|
||||||
// Supported set sizes this is used to find the optimal
|
// Supported set sizes this is used to find the optimal
|
||||||
// single set size.
|
// single set size.
|
||||||
var setSizes = []uint64{4, 6, 8, 10, 12, 14, 16}
|
var setSizes = []uint64{4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
|
||||||
|
|
||||||
// getDivisibleSize - returns a greatest common divisor of
|
// getDivisibleSize - returns a greatest common divisor of
|
||||||
// all the ellipses sizes.
|
// all the ellipses sizes.
|
||||||
@ -60,7 +60,7 @@ func getDivisibleSize(totalSizes []uint64) (result uint64) {
|
|||||||
|
|
||||||
// isValidSetSize - checks whether given count is a valid set size for erasure coding.
|
// isValidSetSize - checks whether given count is a valid set size for erasure coding.
|
||||||
var isValidSetSize = func(count uint64) bool {
|
var isValidSetSize = func(count uint64) bool {
|
||||||
return (count >= setSizes[0] && count <= setSizes[len(setSizes)-1] && count%2 == 0)
|
return (count >= setSizes[0] && count <= setSizes[len(setSizes)-1])
|
||||||
}
|
}
|
||||||
|
|
||||||
// getSetIndexes returns list of indexes which provides the set size
|
// getSetIndexes returns list of indexes which provides the set size
|
||||||
|
@ -130,9 +130,9 @@ func TestGetSetIndexesEnvOverride(t *testing.T) {
|
|||||||
{
|
{
|
||||||
[]string{"http://host{1...11}/data{1...11}"},
|
[]string{"http://host{1...11}/data{1...11}"},
|
||||||
[]uint64{121},
|
[]uint64{121},
|
||||||
nil,
|
[][]uint64{{11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11}},
|
||||||
11,
|
11,
|
||||||
false,
|
true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
[]string{"data{1...60}"},
|
[]string{"data{1...60}"},
|
||||||
@ -183,12 +183,6 @@ func TestGetSetIndexes(t *testing.T) {
|
|||||||
success bool
|
success bool
|
||||||
}{
|
}{
|
||||||
// Invalid inputs.
|
// Invalid inputs.
|
||||||
{
|
|
||||||
[]string{"data{1...27}"},
|
|
||||||
[]uint64{27},
|
|
||||||
nil,
|
|
||||||
false,
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
[]string{"data{1...3}"},
|
[]string{"data{1...3}"},
|
||||||
[]uint64{3},
|
[]uint64{3},
|
||||||
@ -202,6 +196,12 @@ func TestGetSetIndexes(t *testing.T) {
|
|||||||
false,
|
false,
|
||||||
},
|
},
|
||||||
// Valid inputs.
|
// Valid inputs.
|
||||||
|
{
|
||||||
|
[]string{"data{1...27}"},
|
||||||
|
[]uint64{27},
|
||||||
|
[][]uint64{{9, 9, 9}},
|
||||||
|
true,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
[]string{"data/controller1/export{1...4}, data/controller2/export{1...8}, data/controller3/export{1...12}"},
|
[]string{"data/controller1/export{1...4}, data/controller2/export{1...8}, data/controller3/export{1...12}"},
|
||||||
[]uint64{4, 8, 12},
|
[]uint64{4, 8, 12},
|
||||||
@ -223,7 +223,7 @@ func TestGetSetIndexes(t *testing.T) {
|
|||||||
{
|
{
|
||||||
[]string{"data/controller{1...11}/export{1...8}"},
|
[]string{"data/controller{1...11}/export{1...8}"},
|
||||||
[]uint64{88},
|
[]uint64{88},
|
||||||
[][]uint64{{8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8}},
|
[][]uint64{{11, 11, 11, 11, 11, 11, 11, 11}},
|
||||||
true,
|
true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -292,12 +292,6 @@ func TestParseEndpointSet(t *testing.T) {
|
|||||||
endpointSet{},
|
endpointSet{},
|
||||||
false,
|
false,
|
||||||
},
|
},
|
||||||
// Indivisible range.
|
|
||||||
{
|
|
||||||
"{1...27}",
|
|
||||||
endpointSet{},
|
|
||||||
false,
|
|
||||||
},
|
|
||||||
// No range specified.
|
// No range specified.
|
||||||
{
|
{
|
||||||
"{...}",
|
"{...}",
|
||||||
@ -323,6 +317,23 @@ func TestParseEndpointSet(t *testing.T) {
|
|||||||
false,
|
false,
|
||||||
},
|
},
|
||||||
// Tests valid inputs.
|
// Tests valid inputs.
|
||||||
|
{
|
||||||
|
"{1...27}",
|
||||||
|
endpointSet{
|
||||||
|
[]ellipses.ArgPattern{
|
||||||
|
[]ellipses.Pattern{
|
||||||
|
{
|
||||||
|
Prefix: "",
|
||||||
|
Suffix: "",
|
||||||
|
Seq: getSequences(1, 27, 0),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
[][]uint64{{9, 9, 9}},
|
||||||
|
},
|
||||||
|
true,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"/export/set{1...64}",
|
"/export/set{1...64}",
|
||||||
endpointSet{
|
endpointSet{
|
||||||
|
@ -705,7 +705,7 @@ func saveFormatXLAll(ctx context.Context, storageDisks []StorageAPI, formats []*
|
|||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
writeQuorum := len(storageDisks)/2 + 1
|
writeQuorum := getWriteQuorum(len(storageDisks))
|
||||||
// Wait for the routines to finish.
|
// Wait for the routines to finish.
|
||||||
return reduceWriteQuorumErrs(ctx, g.Wait(), nil, writeQuorum)
|
return reduceWriteQuorumErrs(ctx, g.Wait(), nil, writeQuorum)
|
||||||
}
|
}
|
||||||
|
16
cmd/utils.go
16
cmd/utils.go
@ -93,6 +93,22 @@ func path2BucketObject(s string) (bucket, prefix string) {
|
|||||||
return path2BucketObjectWithBasePath("", s)
|
return path2BucketObjectWithBasePath("", s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getDefaultParityBlocks(drive int) int {
|
||||||
|
return drive / 2
|
||||||
|
}
|
||||||
|
|
||||||
|
func getDefaultDataBlocks(drive int) int {
|
||||||
|
return drive - getDefaultParityBlocks(drive)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getReadQuorum(drive int) int {
|
||||||
|
return getDefaultDataBlocks(drive)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getWriteQuorum(drive int) int {
|
||||||
|
return getDefaultDataBlocks(drive) + 1
|
||||||
|
}
|
||||||
|
|
||||||
// URI scheme constants.
|
// URI scheme constants.
|
||||||
const (
|
const (
|
||||||
httpScheme = "http"
|
httpScheme = "http"
|
||||||
|
@ -65,7 +65,7 @@ func (xl xlObjects) MakeBucketWithLocation(ctx context.Context, bucket, location
|
|||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
writeQuorum := len(storageDisks)/2 + 1
|
writeQuorum := getWriteQuorum(len(storageDisks))
|
||||||
err := reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, writeQuorum)
|
err := reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, writeQuorum)
|
||||||
if err == errXLWriteQuorum {
|
if err == errXLWriteQuorum {
|
||||||
// Purge successfully created buckets if we don't have writeQuorum.
|
// Purge successfully created buckets if we don't have writeQuorum.
|
||||||
@ -136,7 +136,7 @@ func (xl xlObjects) getBucketInfo(ctx context.Context, bucketName string) (bucke
|
|||||||
// reduce to one error based on read quorum.
|
// reduce to one error based on read quorum.
|
||||||
// `nil` is deliberately passed for ignoredErrs
|
// `nil` is deliberately passed for ignoredErrs
|
||||||
// because these errors were already ignored.
|
// because these errors were already ignored.
|
||||||
readQuorum := len(xl.getDisks()) / 2
|
readQuorum := getReadQuorum(len(xl.getDisks()))
|
||||||
return BucketInfo{}, reduceReadQuorumErrs(ctx, bucketErrs, nil, readQuorum)
|
return BucketInfo{}, reduceReadQuorumErrs(ctx, bucketErrs, nil, readQuorum)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,7 +259,7 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
writeQuorum := len(storageDisks)/2 + 1
|
writeQuorum := getWriteQuorum(len(storageDisks))
|
||||||
err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum)
|
err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum)
|
||||||
if err == errXLWriteQuorum {
|
if err == errXLWriteQuorum {
|
||||||
undoDeleteBucket(storageDisks, bucket)
|
undoDeleteBucket(storageDisks, bucket)
|
||||||
|
@ -81,7 +81,7 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
|
|||||||
// quorum intentionally, but rely on the default case scenario. Actual quorum
|
// quorum intentionally, but rely on the default case scenario. Actual quorum
|
||||||
// verification will happen by top layer by using getObjectInfo() and will be
|
// verification will happen by top layer by using getObjectInfo() and will be
|
||||||
// ignored if necessary.
|
// ignored if necessary.
|
||||||
readQuorum := len(storageDisks) / 2
|
readQuorum := getReadQuorum(len(storageDisks))
|
||||||
|
|
||||||
return reduceReadQuorumErrs(context.Background(), g.Wait(), objectOpIgnoredErrs, readQuorum) == nil
|
return reduceReadQuorumErrs(context.Background(), g.Wait(), objectOpIgnoredErrs, readQuorum) == nil
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,7 @@ func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remov
|
|||||||
storageDisks := xl.getDisks()
|
storageDisks := xl.getDisks()
|
||||||
|
|
||||||
// get write quorum for an object
|
// get write quorum for an object
|
||||||
writeQuorum := len(storageDisks)/2 + 1
|
writeQuorum := getWriteQuorum(len(storageDisks))
|
||||||
|
|
||||||
// Heal bucket.
|
// Heal bucket.
|
||||||
return healBucket(ctx, storageDisks, bucket, writeQuorum, dryRun)
|
return healBucket(ctx, storageDisks, bucket, writeQuorum, dryRun)
|
||||||
@ -314,7 +314,7 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string
|
|||||||
if m, ok := isObjectDangling(partsMetadata, errs, dataErrs); ok {
|
if m, ok := isObjectDangling(partsMetadata, errs, dataErrs); ok {
|
||||||
writeQuorum := m.Erasure.DataBlocks + 1
|
writeQuorum := m.Erasure.DataBlocks + 1
|
||||||
if m.Erasure.DataBlocks == 0 {
|
if m.Erasure.DataBlocks == 0 {
|
||||||
writeQuorum = len(storageDisks)/2 + 1
|
writeQuorum = getWriteQuorum(len(storageDisks))
|
||||||
}
|
}
|
||||||
if !dryRun && remove {
|
if !dryRun && remove {
|
||||||
err = xl.deleteObject(ctx, bucket, object, writeQuorum, false)
|
err = xl.deleteObject(ctx, bucket, object, writeQuorum, false)
|
||||||
@ -492,8 +492,8 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
|
|||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
Object: object,
|
Object: object,
|
||||||
DiskCount: len(storageDisks),
|
DiskCount: len(storageDisks),
|
||||||
ParityBlocks: len(storageDisks) / 2,
|
ParityBlocks: getDefaultParityBlocks(len(storageDisks)),
|
||||||
DataBlocks: len(storageDisks) / 2,
|
DataBlocks: getDefaultDataBlocks(len(storageDisks)),
|
||||||
ObjectSize: 0,
|
ObjectSize: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -601,8 +601,8 @@ func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs []
|
|||||||
|
|
||||||
if !latestXLMeta.IsValid() {
|
if !latestXLMeta.IsValid() {
|
||||||
// Default to most common configuration for erasure blocks.
|
// Default to most common configuration for erasure blocks.
|
||||||
result.ParityBlocks = len(storageDisks) / 2
|
result.ParityBlocks = getDefaultParityBlocks(len(storageDisks))
|
||||||
result.DataBlocks = len(storageDisks) / 2
|
result.DataBlocks = getDefaultDataBlocks(len(storageDisks))
|
||||||
} else {
|
} else {
|
||||||
result.ParityBlocks = latestXLMeta.Erasure.ParityBlocks
|
result.ParityBlocks = latestXLMeta.Erasure.ParityBlocks
|
||||||
result.DataBlocks = latestXLMeta.Erasure.DataBlocks
|
result.DataBlocks = latestXLMeta.Erasure.DataBlocks
|
||||||
@ -729,7 +729,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts
|
|||||||
if m, ok := isObjectDangling(partsMetadata, errs, []error{}); ok {
|
if m, ok := isObjectDangling(partsMetadata, errs, []error{}); ok {
|
||||||
writeQuorum := m.Erasure.DataBlocks + 1
|
writeQuorum := m.Erasure.DataBlocks + 1
|
||||||
if m.Erasure.DataBlocks == 0 {
|
if m.Erasure.DataBlocks == 0 {
|
||||||
writeQuorum = len(xl.getDisks())/2 + 1
|
writeQuorum = getWriteQuorum(len(storageDisks))
|
||||||
}
|
}
|
||||||
if !opts.DryRun && opts.Remove {
|
if !opts.DryRun && opts.Remove {
|
||||||
xl.deleteObject(healCtx, bucket, object, writeQuorum, false)
|
xl.deleteObject(healCtx, bucket, object, writeQuorum, false)
|
||||||
@ -758,7 +758,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts
|
|||||||
if m, ok := isObjectDangling(partsMetadata, errs, []error{}); ok {
|
if m, ok := isObjectDangling(partsMetadata, errs, []error{}); ok {
|
||||||
writeQuorum := m.Erasure.DataBlocks + 1
|
writeQuorum := m.Erasure.DataBlocks + 1
|
||||||
if m.Erasure.DataBlocks == 0 {
|
if m.Erasure.DataBlocks == 0 {
|
||||||
writeQuorum = len(storageDisks)/2 + 1
|
writeQuorum = getWriteQuorum(len(storageDisks))
|
||||||
}
|
}
|
||||||
if !opts.DryRun && opts.Remove {
|
if !opts.DryRun && opts.Remove {
|
||||||
xl.deleteObject(ctx, bucket, object, writeQuorum, false)
|
xl.deleteObject(ctx, bucket, object, writeQuorum, false)
|
||||||
|
@ -377,61 +377,6 @@ func pickValidXLMeta(ctx context.Context, metaArr []xlMetaV1, modTime time.Time,
|
|||||||
// list of all errors that can be ignored in a metadata operation.
|
// list of all errors that can be ignored in a metadata operation.
|
||||||
var objMetadataOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound, errFileAccessDenied, errCorruptedFormat)
|
var objMetadataOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound, errFileAccessDenied, errCorruptedFormat)
|
||||||
|
|
||||||
// readXLMetaParts - returns the XL Metadata Parts from xl.json of one of the disks picked at random.
|
|
||||||
func (xl xlObjects) readXLMetaParts(ctx context.Context, bucket, object string) (xlMetaParts []ObjectPartInfo, xlMeta map[string]string, err error) {
|
|
||||||
var ignoredErrs []error
|
|
||||||
for _, disk := range xl.getLoadBalancedDisks() {
|
|
||||||
if disk == nil {
|
|
||||||
ignoredErrs = append(ignoredErrs, errDiskNotFound)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
xlMetaParts, xlMeta, err = readXLMetaParts(ctx, disk, bucket, object)
|
|
||||||
if err == nil {
|
|
||||||
return xlMetaParts, xlMeta, nil
|
|
||||||
}
|
|
||||||
// For any reason disk or bucket is not available continue
|
|
||||||
// and read from other disks.
|
|
||||||
if IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
|
|
||||||
ignoredErrs = append(ignoredErrs, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Error is not ignored, return right here.
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
// If all errors were ignored, reduce to maximal occurrence
|
|
||||||
// based on the read quorum.
|
|
||||||
readQuorum := len(xl.getDisks()) / 2
|
|
||||||
return nil, nil, reduceReadQuorumErrs(ctx, ignoredErrs, nil, readQuorum)
|
|
||||||
}
|
|
||||||
|
|
||||||
// readXLMetaStat - return xlMetaV1.Stat and xlMetaV1.Meta from one of the disks picked at random.
|
|
||||||
func (xl xlObjects) readXLMetaStat(ctx context.Context, bucket, object string) (xlStat statInfo, xlMeta map[string]string, err error) {
|
|
||||||
var ignoredErrs []error
|
|
||||||
for _, disk := range xl.getLoadBalancedDisks() {
|
|
||||||
if disk == nil {
|
|
||||||
ignoredErrs = append(ignoredErrs, errDiskNotFound)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// parses only xlMetaV1.Meta and xlMeta.Stat
|
|
||||||
xlStat, xlMeta, err = readXLMetaStat(ctx, disk, bucket, object)
|
|
||||||
if err == nil {
|
|
||||||
return xlStat, xlMeta, nil
|
|
||||||
}
|
|
||||||
// For any reason disk or bucket is not available continue
|
|
||||||
// and read from other disks.
|
|
||||||
if IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
|
|
||||||
ignoredErrs = append(ignoredErrs, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Error is not ignored, return right here.
|
|
||||||
return statInfo{}, nil, err
|
|
||||||
}
|
|
||||||
// If all errors were ignored, reduce to maximal occurrence
|
|
||||||
// based on the read quorum.
|
|
||||||
readQuorum := len(xl.getDisks()) / 2
|
|
||||||
return statInfo{}, nil, reduceReadQuorumErrs(ctx, ignoredErrs, nil, readQuorum)
|
|
||||||
}
|
|
||||||
|
|
||||||
// writeXLMetadata - writes `xl.json` to a single disk.
|
// writeXLMetadata - writes `xl.json` to a single disk.
|
||||||
func writeXLMetadata(ctx context.Context, disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) error {
|
func writeXLMetadata(ctx context.Context, disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) error {
|
||||||
jsonFile := path.Join(prefix, xlMetaJSONFile)
|
jsonFile := path.Join(prefix, xlMetaJSONFile)
|
||||||
|
@ -17,10 +17,7 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -29,166 +26,6 @@ import (
|
|||||||
|
|
||||||
const ActualSize = 1000
|
const ActualSize = 1000
|
||||||
|
|
||||||
// Tests for reading XL object info.
|
|
||||||
func TestXLReadStat(t *testing.T) {
|
|
||||||
ExecObjectLayerDiskAlteredTest(t, testXLReadStat)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testXLReadStat(obj ObjectLayer, instanceType string, disks []string, t *testing.T) {
|
|
||||||
// Setup for the tests.
|
|
||||||
bucketName := getRandomBucketName()
|
|
||||||
objectName := "test-object"
|
|
||||||
// create bucket.
|
|
||||||
err := obj.MakeBucketWithLocation(context.Background(), bucketName, "")
|
|
||||||
// Stop the test if creation of the bucket fails.
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%s : %s", instanceType, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// set of byte data for PutObject.
|
|
||||||
// object has to be created before running tests for GetObject.
|
|
||||||
// this is required even to assert the GetObject data,
|
|
||||||
// since dataInserted === dataFetched back is a primary criteria for any object storage this assertion is critical.
|
|
||||||
bytesData := []struct {
|
|
||||||
byteData []byte
|
|
||||||
}{
|
|
||||||
{generateBytesData(6 * humanize.MiByte)},
|
|
||||||
}
|
|
||||||
// set of inputs for uploading the objects before tests for downloading is done.
|
|
||||||
putObjectInputs := []struct {
|
|
||||||
bucketName string
|
|
||||||
objectName string
|
|
||||||
contentLength int64
|
|
||||||
textData []byte
|
|
||||||
metaData map[string]string
|
|
||||||
}{
|
|
||||||
// case - 1.
|
|
||||||
{bucketName, objectName, int64(len(bytesData[0].byteData)), bytesData[0].byteData, make(map[string]string)},
|
|
||||||
}
|
|
||||||
// iterate through the above set of inputs and upkoad the object.
|
|
||||||
for i, input := range putObjectInputs {
|
|
||||||
// uploading the object.
|
|
||||||
_, err = obj.PutObject(context.Background(), input.bucketName, input.objectName, mustGetPutObjReader(t, bytes.NewBuffer(input.textData), input.contentLength, input.metaData["etag"], ""), ObjectOptions{UserDefined: input.metaData})
|
|
||||||
// if object upload fails stop the test.
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Put Object case %d: Error uploading object: <ERROR> %v", i+1, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
z := obj.(*xlZones)
|
|
||||||
xl := z.zones[0].sets[0]
|
|
||||||
_, _, err = xl.readXLMetaStat(context.Background(), bucketName, objectName)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove one disk.
|
|
||||||
removeDiskN(disks, 7)
|
|
||||||
|
|
||||||
// Removing disk shouldn't affect reading object info.
|
|
||||||
_, _, err = xl.readXLMetaStat(context.Background(), bucketName, objectName)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, disk := range disks {
|
|
||||||
os.RemoveAll(path.Join(disk, bucketName))
|
|
||||||
}
|
|
||||||
|
|
||||||
_, _, err = xl.readXLMetaStat(context.Background(), bucketName, objectName)
|
|
||||||
if err != errVolumeNotFound {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tests for reading XL meta parts.
|
|
||||||
func TestXLReadMetaParts(t *testing.T) {
|
|
||||||
ExecObjectLayerDiskAlteredTest(t, testXLReadMetaParts)
|
|
||||||
}
|
|
||||||
|
|
||||||
// testListObjectParts - Tests validate listing of object parts when disks go offline.
|
|
||||||
func testXLReadMetaParts(obj ObjectLayer, instanceType string, disks []string, t *testing.T) {
|
|
||||||
bucketNames := []string{"minio-bucket", "minio-2-bucket"}
|
|
||||||
objectNames := []string{"minio-object-1.txt"}
|
|
||||||
uploadIDs := []string{}
|
|
||||||
var opts ObjectOptions
|
|
||||||
|
|
||||||
// bucketnames[0].
|
|
||||||
// objectNames[0].
|
|
||||||
// uploadIds [0].
|
|
||||||
// Create bucket before intiating NewMultipartUpload.
|
|
||||||
err := obj.MakeBucketWithLocation(context.Background(), bucketNames[0], "")
|
|
||||||
if err != nil {
|
|
||||||
// Failed to create newbucket, abort.
|
|
||||||
t.Fatalf("%s : %s", instanceType, err.Error())
|
|
||||||
}
|
|
||||||
// Initiate Multipart Upload on the above created bucket.
|
|
||||||
uploadID, err := obj.NewMultipartUpload(context.Background(), bucketNames[0], objectNames[0], opts)
|
|
||||||
if err != nil {
|
|
||||||
// Failed to create NewMultipartUpload, abort.
|
|
||||||
t.Fatalf("%s : %s", instanceType, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
uploadIDs = append(uploadIDs, uploadID)
|
|
||||||
|
|
||||||
// Create multipart parts.
|
|
||||||
// Need parts to be uploaded before MultipartLists can be called and tested.
|
|
||||||
createPartCases := []struct {
|
|
||||||
bucketName string
|
|
||||||
objName string
|
|
||||||
uploadID string
|
|
||||||
PartID int
|
|
||||||
inputReaderData string
|
|
||||||
inputMd5 string
|
|
||||||
intputDataSize int64
|
|
||||||
expectedMd5 string
|
|
||||||
}{
|
|
||||||
// Case 1-4.
|
|
||||||
// Creating sequence of parts for same uploadID.
|
|
||||||
// Used to ensure that the ListMultipartResult produces one output for the four parts uploaded below for the given upload ID.
|
|
||||||
{bucketNames[0], objectNames[0], uploadIDs[0], 1, "abcd", "e2fc714c4727ee9395f324cd2e7f331f", int64(len("abcd")), "e2fc714c4727ee9395f324cd2e7f331f"},
|
|
||||||
{bucketNames[0], objectNames[0], uploadIDs[0], 2, "efgh", "1f7690ebdd9b4caf8fab49ca1757bf27", int64(len("efgh")), "1f7690ebdd9b4caf8fab49ca1757bf27"},
|
|
||||||
{bucketNames[0], objectNames[0], uploadIDs[0], 3, "ijkl", "09a0877d04abf8759f99adec02baf579", int64(len("abcd")), "09a0877d04abf8759f99adec02baf579"},
|
|
||||||
{bucketNames[0], objectNames[0], uploadIDs[0], 4, "mnop", "e132e96a5ddad6da8b07bba6f6131fef", int64(len("abcd")), "e132e96a5ddad6da8b07bba6f6131fef"},
|
|
||||||
}
|
|
||||||
sha256sum := ""
|
|
||||||
// Iterating over creatPartCases to generate multipart chunks.
|
|
||||||
for _, testCase := range createPartCases {
|
|
||||||
_, perr := obj.PutObjectPart(context.Background(), testCase.bucketName, testCase.objName, testCase.uploadID, testCase.PartID, mustGetPutObjReader(t, bytes.NewBufferString(testCase.inputReaderData), testCase.intputDataSize, testCase.inputMd5, sha256sum), opts)
|
|
||||||
if perr != nil {
|
|
||||||
t.Fatalf("%s : %s", instanceType, perr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
z := obj.(*xlZones)
|
|
||||||
xl := z.zones[0].sets[0]
|
|
||||||
uploadIDPath := xl.getUploadIDDir(bucketNames[0], objectNames[0], uploadIDs[0])
|
|
||||||
|
|
||||||
_, _, err = xl.readXLMetaParts(context.Background(), minioMetaMultipartBucket, uploadIDPath)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove one disk.
|
|
||||||
removeDiskN(disks, 7)
|
|
||||||
|
|
||||||
// Removing disk shouldn't affect reading object parts info.
|
|
||||||
_, _, err = xl.readXLMetaParts(context.Background(), minioMetaMultipartBucket, uploadIDPath)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, disk := range disks {
|
|
||||||
os.RemoveAll(path.Join(disk, bucketNames[0]))
|
|
||||||
os.RemoveAll(path.Join(disk, minioMetaMultipartBucket, xl.getMultipartSHADir(bucketNames[0], objectNames[0])))
|
|
||||||
}
|
|
||||||
|
|
||||||
_, _, err = xl.readXLMetaParts(context.Background(), minioMetaMultipartBucket, uploadIDPath)
|
|
||||||
if err != errFileNotFound {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test xlMetaV1.AddObjectPart()
|
// Test xlMetaV1.AddObjectPart()
|
||||||
func TestAddObjectPart(t *testing.T) {
|
func TestAddObjectPart(t *testing.T) {
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
|
@ -808,7 +808,8 @@ func (xl xlObjects) cleanupStaleMultipartUploadsOnDisk(ctx context.Context, disk
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if now.Sub(fi.ModTime) > expiry {
|
if now.Sub(fi.ModTime) > expiry {
|
||||||
xl.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, len(xl.getDisks())/2+1, false)
|
writeQuorum := getWriteQuorum(len(xl.getDisks()))
|
||||||
|
xl.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, writeQuorum, false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -340,7 +340,7 @@ func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string)
|
|||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
readQuorum := len(storageDisks) / 2
|
readQuorum := getReadQuorum(len(storageDisks))
|
||||||
err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum)
|
err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum)
|
||||||
return dirObjectInfo(bucket, object, 0, map[string]string{}), err
|
return dirObjectInfo(bucket, object, 0, map[string]string{}), err
|
||||||
}
|
}
|
||||||
@ -488,7 +488,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
|
|||||||
// Get parity and data drive count based on storage class metadata
|
// Get parity and data drive count based on storage class metadata
|
||||||
parityDrives := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
|
parityDrives := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
|
||||||
if parityDrives == 0 {
|
if parityDrives == 0 {
|
||||||
parityDrives = len(storageDisks) / 2
|
parityDrives = getDefaultParityBlocks(len(storageDisks))
|
||||||
}
|
}
|
||||||
dataDrives := len(storageDisks) - parityDrives
|
dataDrives := len(storageDisks) - parityDrives
|
||||||
|
|
||||||
@ -842,11 +842,13 @@ func (xl xlObjects) deleteObjects(ctx context.Context, bucket string, objects []
|
|||||||
isObjectDirs[i] = HasSuffix(object, SlashSeparator)
|
isObjectDirs[i] = HasSuffix(object, SlashSeparator)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
storageDisks := xl.getDisks()
|
||||||
|
|
||||||
for i, object := range objects {
|
for i, object := range objects {
|
||||||
if isObjectDirs[i] {
|
if isObjectDirs[i] {
|
||||||
_, err := xl.getObjectInfoDir(ctx, bucket, object)
|
_, err := xl.getObjectInfoDir(ctx, bucket, object)
|
||||||
if err == errXLReadQuorum {
|
if err == errXLReadQuorum {
|
||||||
if isObjectDirDangling(statAllDirs(ctx, xl.getDisks(), bucket, object)) {
|
if isObjectDirDangling(statAllDirs(ctx, storageDisks, bucket, object)) {
|
||||||
// If object is indeed dangling, purge it.
|
// If object is indeed dangling, purge it.
|
||||||
errs[i] = nil
|
errs[i] = nil
|
||||||
}
|
}
|
||||||
@ -868,7 +870,7 @@ func (xl xlObjects) deleteObjects(ctx context.Context, bucket string, objects []
|
|||||||
// class for objects which have reduced quorum
|
// class for objects which have reduced quorum
|
||||||
// storage class only needs to be honored for
|
// storage class only needs to be honored for
|
||||||
// Read() requests alone which we already do.
|
// Read() requests alone which we already do.
|
||||||
writeQuorums[i] = len(xl.getDisks())/2 + 1
|
writeQuorums[i] = getWriteQuorum(len(storageDisks))
|
||||||
}
|
}
|
||||||
|
|
||||||
return xl.doDeleteObjects(ctx, bucket, objects, errs, writeQuorums, isObjectDirs)
|
return xl.doDeleteObjects(ctx, bucket, objects, errs, writeQuorums, isObjectDirs)
|
||||||
@ -934,10 +936,12 @@ func (xl xlObjects) DeleteObject(ctx context.Context, bucket, object string) (er
|
|||||||
var writeQuorum int
|
var writeQuorum int
|
||||||
var isObjectDir = HasSuffix(object, SlashSeparator)
|
var isObjectDir = HasSuffix(object, SlashSeparator)
|
||||||
|
|
||||||
|
storageDisks := xl.getDisks()
|
||||||
|
|
||||||
if isObjectDir {
|
if isObjectDir {
|
||||||
_, err = xl.getObjectInfoDir(ctx, bucket, object)
|
_, err = xl.getObjectInfoDir(ctx, bucket, object)
|
||||||
if err == errXLReadQuorum {
|
if err == errXLReadQuorum {
|
||||||
if isObjectDirDangling(statAllDirs(ctx, xl.getDisks(), bucket, object)) {
|
if isObjectDirDangling(statAllDirs(ctx, storageDisks, bucket, object)) {
|
||||||
// If object is indeed dangling, purge it.
|
// If object is indeed dangling, purge it.
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
@ -948,10 +952,10 @@ func (xl xlObjects) DeleteObject(ctx context.Context, bucket, object string) (er
|
|||||||
}
|
}
|
||||||
|
|
||||||
if isObjectDir {
|
if isObjectDir {
|
||||||
writeQuorum = len(xl.getDisks())/2 + 1
|
writeQuorum = getWriteQuorum(len(storageDisks))
|
||||||
} else {
|
} else {
|
||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
|
partsMetadata, errs := readAllXLMetadata(ctx, storageDisks, bucket, object)
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
_, writeQuorum, err = objectQuorumFromMeta(ctx, xl, partsMetadata, errs)
|
_, writeQuorum, err = objectQuorumFromMeta(ctx, xl, partsMetadata, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -122,44 +122,6 @@ func xlMetaV1UnmarshalJSON(ctx context.Context, xlMetaBuf []byte) (xlMeta xlMeta
|
|||||||
return xlMeta, err
|
return xlMeta, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// read xl.json from the given disk, parse and return xlV1MetaV1.Parts.
|
|
||||||
func readXLMetaParts(ctx context.Context, disk StorageAPI, bucket string, object string) ([]ObjectPartInfo, map[string]string, error) {
|
|
||||||
// Reads entire `xl.json`.
|
|
||||||
xlMetaBuf, err := disk.ReadAll(bucket, path.Join(object, xlMetaJSONFile))
|
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var xlMeta xlMetaV1
|
|
||||||
xlMeta, err = xlMetaV1UnmarshalJSON(ctx, xlMetaBuf)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return xlMeta.Parts, xlMeta.Meta, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// read xl.json from the given disk and parse xlV1Meta.Stat and xlV1Meta.Meta using jsoniter.
|
|
||||||
func readXLMetaStat(ctx context.Context, disk StorageAPI, bucket string, object string) (si statInfo,
|
|
||||||
mp map[string]string, e error) {
|
|
||||||
// Reads entire `xl.json`.
|
|
||||||
xlMetaBuf, err := disk.ReadAll(bucket, path.Join(object, xlMetaJSONFile))
|
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return si, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var xlMeta xlMetaV1
|
|
||||||
xlMeta, err = xlMetaV1UnmarshalJSON(ctx, xlMetaBuf)
|
|
||||||
if err != nil {
|
|
||||||
return si, mp, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return structured `xl.json`.
|
|
||||||
return xlMeta.Stat, xlMeta.Meta, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// readXLMeta reads `xl.json` and returns back XL metadata structure.
|
// readXLMeta reads `xl.json` and returns back XL metadata structure.
|
||||||
func readXLMeta(ctx context.Context, disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1, err error) {
|
func readXLMeta(ctx context.Context, disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1, err error) {
|
||||||
// Reads entire `xl.json`.
|
// Reads entire `xl.json`.
|
||||||
|
@ -42,7 +42,7 @@ Expansion of ellipses and choice of erasure sets based on this expansion is an a
|
|||||||
|
|
||||||
- We limited the number of drives to 16 for erasure set because, erasure code shards more than 16 can become chatty and do not have any performance advantages. Additionally since 16 drive erasure set gives you tolerance of 8 disks per object by default which is plenty in any practical scenario.
|
- We limited the number of drives to 16 for erasure set because, erasure code shards more than 16 can become chatty and do not have any performance advantages. Additionally since 16 drive erasure set gives you tolerance of 8 disks per object by default which is plenty in any practical scenario.
|
||||||
|
|
||||||
- Choice of erasure set size is automatic based on the number of disks available, let's say for example if there are 32 servers and 32 disks which is a total of 1024 disks. In this scenario 16 becomes the erasure set size. This is decided based on the greatest common divisor (GCD) of acceptable erasure set sizes ranging from *4, 6, 8, 10, 12, 14, 16*.
|
- Choice of erasure set size is automatic based on the number of disks available, let's say for example if there are 32 servers and 32 disks which is a total of 1024 disks. In this scenario 16 becomes the erasure set size. This is decided based on the greatest common divisor (GCD) of acceptable erasure set sizes ranging from *4 to 16*.
|
||||||
|
|
||||||
- *If total disks has many common divisors the algorithm chooses the minimum amounts of erasure sets possible for a erasure set size of any N*. In the example with 1024 disks - 4, 8, 16 are GCD factors. With 16 disks we get a total of 64 possible sets, with 8 disks we get a total of 128 possible sets, with 4 disks we get a total of 256 possible sets. So algorithm automatically chooses 64 sets, which is *16 * 64 = 1024* disks in total.
|
- *If total disks has many common divisors the algorithm chooses the minimum amounts of erasure sets possible for a erasure set size of any N*. In the example with 1024 disks - 4, 8, 16 are GCD factors. With 16 disks we get a total of 64 possible sets, with 8 disks we get a total of 128 possible sets, with 4 disks we get a total of 256 possible sets. So algorithm automatically chooses 64 sets, which is *16 * 64 = 1024* disks in total.
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ To start a distributed MinIO instance, you just need to pass drive locations as
|
|||||||
__NOTE:__
|
__NOTE:__
|
||||||
|
|
||||||
- All the nodes running distributed MinIO need to have same access key and secret key for the nodes to connect. To achieve this, it is __recommended__ to export access key and secret key as environment variables, `MINIO_ACCESS_KEY` and `MINIO_SECRET_KEY`, on all the nodes before executing MinIO server command.
|
- All the nodes running distributed MinIO need to have same access key and secret key for the nodes to connect. To achieve this, it is __recommended__ to export access key and secret key as environment variables, `MINIO_ACCESS_KEY` and `MINIO_SECRET_KEY`, on all the nodes before executing MinIO server command.
|
||||||
- __MinIO creates erasure-coding sets of 4, 6, 8, 10, 12, 14 or 16 drives. The number of drives you provide must be a multiple of one of those numbers.__
|
- __MinIO creates erasure-coding sets of *4* to *16* drives. The number of drives you provide must be a multiple of one of those numbers.__
|
||||||
- __MinIO chooses the largest EC set size which divides into the total number of drives given. For example, 8 drives will be used as a single EC set of size 8, not two sets of size 4.__
|
- __MinIO chooses the largest EC set size which divides into the total number of drives given. For example, 8 drives will be used as a single EC set of size 8, not two sets of size 4.__
|
||||||
- __Each object is written to a single EC set, and therefore is spread over no more than 16 drives.__
|
- __Each object is written to a single EC set, and therefore is spread over no more than 16 drives.__
|
||||||
- __All the nodes running distributed MinIO setup are recommended to be homogeneous, i.e. same operating system, same number of disks and same network interconnects.__
|
- __All the nodes running distributed MinIO setup are recommended to be homogeneous, i.e. same operating system, same number of disks and same network interconnects.__
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
golog "log"
|
golog "log"
|
||||||
"math"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
@ -237,8 +236,8 @@ func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNa
|
|||||||
done := false
|
done := false
|
||||||
timeout := time.After(DRWMutexAcquireTimeout)
|
timeout := time.After(DRWMutexAcquireTimeout)
|
||||||
|
|
||||||
dquorum := int(len(restClnts)/2) + 1
|
dquorumReads := (len(restClnts) + 1) / 2
|
||||||
dquorumReads := int(math.Ceil(float64(len(restClnts)) / 2.0))
|
dquorum := dquorumReads + 1
|
||||||
|
|
||||||
for ; i < len(restClnts); i++ { // Loop until we acquired all locks
|
for ; i < len(restClnts); i++ { // Loop until we acquired all locks
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user