diff --git a/erasure-createfile.go b/erasure-createfile.go index 3321c6fe6..2bcd11d74 100644 --- a/erasure-createfile.go +++ b/erasure-createfile.go @@ -32,8 +32,7 @@ func (e erasure) cleanupCreateFileOps(volume, path string, writers []io.WriteClo } } -// WriteErasure reads predefined blocks, encodes them and writes to -// configured storage disks. +// WriteErasure reads predefined blocks, encodes them and writes to configured storage disks. func (e erasure) writeErasure(volume, path string, reader *io.PipeReader, wcloser *waitCloser) { // Release the block writer upon function return. defer wcloser.release() @@ -119,7 +118,8 @@ func (e erasure) writeErasure(volume, path string, reader *io.PipeReader, wclose // Write encoded data in routine. go func(index int, writer io.Writer) { defer wg.Done() - encodedData := dataBlocks[index] + // Pick the block from the distribution. + encodedData := dataBlocks[e.distribution[index]-1] _, wErr := writers[index].Write(encodedData) if wErr != nil { wErrs[index] = wErr diff --git a/erasure-readfile.go b/erasure-readfile.go index 0e247082d..6af4bffea 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -82,13 +82,14 @@ func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int6 enBlocks := make([][]byte, len(e.storageDisks)) // Read all the readers. for index, reader := range readers { + blockIndex := e.distribution[index] - 1 // Initialize shard slice and fill the data from each parts. - enBlocks[index] = make([]byte, curEncBlockSize) + enBlocks[blockIndex] = make([]byte, curEncBlockSize) if reader == nil { continue } // Read the necessary blocks. - _, rErr := io.ReadFull(reader, enBlocks[index]) + _, rErr := io.ReadFull(reader, enBlocks[blockIndex]) if rErr != nil && rErr != io.ErrUnexpectedEOF { readers[index].Close() readers[index] = nil diff --git a/erasure.go b/erasure.go index f41a1eb40..80d9c6769 100644 --- a/erasure.go +++ b/erasure.go @@ -24,10 +24,11 @@ type erasure struct { DataBlocks int ParityBlocks int storageDisks []StorageAPI + distribution []int } // newErasure instantiate a new erasure. -func newErasure(disks []StorageAPI) *erasure { +func newErasure(disks []StorageAPI, distribution []int) *erasure { // Initialize E. e := &erasure{} @@ -46,6 +47,9 @@ func newErasure(disks []StorageAPI) *erasure { // Save all the initialized storage disks. e.storageDisks = disks + // Save the distribution. + e.distribution = distribution + // Return successfully initialized. return e } diff --git a/tree-walk-xl.go b/tree-walk-xl.go index 364e54425..eb1c2a683 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -82,7 +82,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) // getRandomDisk - gives a random disk at any point in time from the // available pool of disks. func (xl xlObjects) getRandomDisk() (disk StorageAPI) { - rand.Seed(time.Now().UTC().UnixNano()) + rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time. randIndex := rand.Intn(len(xl.storageDisks) - 1) disk = xl.storageDisks[randIndex] // Pick a random disk. return disk diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 3abf00557..06f82ef10 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -20,6 +20,7 @@ import ( "bytes" "encoding/json" "io" + "math/rand" "path" "sort" "sync" @@ -254,16 +255,15 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro // randErasureDistribution - uses Knuth Fisher-Yates shuffle algorithm. func randErasureDistribution(numBlocks int) []int { + rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time. distribution := make([]int, numBlocks) for i := 0; i < numBlocks; i++ { distribution[i] = i + 1 } - /* - for i := 0; i < numBlocks; i++ { - // Choose index uniformly in [i, numBlocks-1] - r := i + rand.Intn(numBlocks-i) - distribution[r], distribution[i] = distribution[i], distribution[r] - } - */ + for i := 0; i < numBlocks; i++ { + // Choose index uniformly in [i, numBlocks-1] + r := i + rand.Intn(numBlocks-i) + distribution[r], distribution[i] = distribution[i], distribution[r] + } return distribution } diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index d66e31815..4b0dccc06 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -136,11 +136,14 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s if err != nil { return "", toObjectErr(err, bucket, object) } + // Increment version only if we have online disks less than configured storage disks. if diskCount(onlineDisks) < len(xl.storageDisks) { higherVersion++ } - erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks + + // Initialize a new erasure with online disks and new distribution. + erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) partSuffix := fmt.Sprintf("object%d", partID) tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix) diff --git a/xl-v1-object.go b/xl-v1-object.go index 88313dcef..ac242a294 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -41,16 +41,20 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read if err != nil { return nil, toObjectErr(err, bucket, object) } + // List all online disks. onlineDisks, _, err := xl.listOnlineDisks(bucket, object) if err != nil { return nil, toObjectErr(err, bucket, object) } + // For zero byte files, return a null reader. if xlMeta.Stat.Size == 0 { return nullReadCloser{}, nil } - erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks + + // Initialize a new erasure with online disks, with previous block distribution. + erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) // Get part index offset. partIndex, partOffset, err := xlMeta.objectToPartOffset(startOffset) @@ -208,16 +212,22 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object1") tempObj := path.Join(tmpMetaPrefix, bucket, object) + // Initialize xl meta. + xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks) + // List all online disks. onlineDisks, higherVersion, err := xl.listOnlineDisks(bucket, object) if err != nil { return "", toObjectErr(err, bucket, object) } + // Increment version only if we have online disks less than configured storage disks. if diskCount(onlineDisks) < len(xl.storageDisks) { higherVersion++ } - erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks + + // Initialize a new erasure with online disks and new distribution. + erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) fileWriter, err := erasure.CreateFile(minioMetaBucket, tempErasureObj) if err != nil { return "", toObjectErr(err, bucket, object) @@ -301,7 +311,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. return "", toObjectErr(err, bucket, object) } - xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks) + // Fill all the necessary metadata. xlMeta.Meta = metadata xlMeta.Stat.Size = size xlMeta.Stat.ModTime = modTime