diff --git a/cmd/erasure-healfile.go b/cmd/erasure-healfile.go index c77922fc9..b12f5655c 100644 --- a/cmd/erasure-healfile.go +++ b/cmd/erasure-healfile.go @@ -70,68 +70,71 @@ func (s ErasureStorage) HealFile(staleDisks []StorageAPI, volume, path string, b } writeErrors := make([]error, len(s.disks)) + // Read part file data on each disk + chunksize := ceilFrac(blocksize, int64(s.dataBlocks)) + numBlocks := ceilFrac(size, blocksize) + + readLen := chunksize * (numBlocks - 1) + + lastChunkSize := chunksize + hasSmallerLastBlock := size%blocksize != 0 + if hasSmallerLastBlock { + lastBlockLen := size % blocksize + lastChunkSize = ceilFrac(lastBlockLen, int64(s.dataBlocks)) + } + readLen += lastChunkSize + var buffers [][]byte + buffers, _, err = s.readConcurrent(volume, path, 0, readLen, verifiers) + if err != nil { + return f, err + } + // Scan part files on disk, block-by-block reconstruct it and // write to stale disks. - chunksize := getChunkSize(blocksize, s.dataBlocks) blocks := make([][]byte, len(s.disks)) - for i := range blocks { - blocks[i] = make([]byte, chunksize) + + if numBlocks > 1 { + // Allocate once for all the equal length blocks. The + // last block may have a different length - allocation + // for this happens inside the for loop below. + for i := range blocks { + if len(buffers[i]) == 0 { + blocks[i] = make([]byte, chunksize) + } + } } - var chunkOffset, blockOffset int64 - // The for loop below is entered when size == 0 and - // blockOffset == 0 to allow for reconstructing empty files. - for ; blockOffset == 0 || blockOffset < size; blockOffset += blocksize { - // last iteration may have less than blocksize data - // left, so chunksize needs to be recomputed. - if size < blockOffset+blocksize { - chunksize = getChunkSize(size-blockOffset, s.dataBlocks) + var buffOffset int64 + for blockNumber := int64(0); blockNumber < numBlocks; blockNumber++ { + if blockNumber == numBlocks-1 && lastChunkSize != chunksize { for i := range blocks { - blocks[i] = blocks[i][:chunksize] - } - } - // read a chunk from each disk, until we have - // `s.dataBlocks` number of chunks set to non-nil in - // `blocks` - numReads := 0 - for i, disk := range s.disks { - // skip reading from unavailable or stale disks - if disk == nil || staleDisks[i] != nil { - blocks[i] = blocks[i][:0] // mark shard as missing - continue - } - _, err = disk.ReadFile(volume, path, chunkOffset, blocks[i], verifiers[i]) - if err != nil { - // LOG FIXME: add a conditional log - // for read failures, once per-disk - // per-function-invocation. - blocks[i] = blocks[i][:0] // mark shard as missing - continue - } - numReads++ - if numReads == s.dataBlocks { - // we have enough data to reconstruct - // mark all other blocks as missing - for j := i + 1; j < len(blocks); j++ { - blocks[j] = blocks[j][:0] // mark shard as missing + if len(buffers[i]) == 0 { + blocks[i] = make([]byte, lastChunkSize) } - break } } - // advance the chunk offset to prepare for next loop - // iteration - chunkOffset += chunksize - - // reconstruct data - this computes all data and - // parity shards - but we skip this step if we are - // reconstructing an empty file. - if chunksize > 0 { - if err = s.ErasureDecodeDataAndParityBlocks(blocks); err != nil { - return f, err + for i := range blocks { + if len(buffers[i]) == 0 { + blocks[i] = blocks[i][0:0] } } + csize := chunksize + if blockNumber == numBlocks-1 { + csize = lastChunkSize + } + for i := range blocks { + if len(buffers[i]) != 0 { + blocks[i] = buffers[i][buffOffset : buffOffset+csize] + } + } + buffOffset += csize + + if err = s.ErasureDecodeDataAndParityBlocks(blocks); err != nil { + return f, err + } + // write computed shards as chunks on file in each // stale disk writeSucceeded := false diff --git a/cmd/erasure-readfile.go b/cmd/erasure-readfile.go index d74998636..326ea148c 100644 --- a/cmd/erasure-readfile.go +++ b/cmd/erasure-readfile.go @@ -22,10 +22,74 @@ import ( "github.com/minio/minio/pkg/errors" ) -// ReadFile reads as much data as requested from the file under the given volume and path and writes the data to the provided writer. -// The algorithm and the keys/checksums are used to verify the integrity of the given file. ReadFile will read data from the given offset -// up to the given length. If parts of the file are corrupted ReadFile tries to reconstruct the data. -func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset, length int64, totalLength int64, checksums [][]byte, algorithm BitrotAlgorithm, blocksize int64) (f ErasureFileInfo, err error) { +type errIdx struct { + idx int + err error +} + +func (s ErasureStorage) readConcurrent(volume, path string, offset, length int64, + verifiers []*BitrotVerifier) (buffers [][]byte, needsReconstruction bool, + err error) { + + errChan := make(chan errIdx) + stageBuffers := make([][]byte, len(s.disks)) + buffers = make([][]byte, len(s.disks)) + + readDisk := func(i int) { + stageBuffers[i] = make([]byte, length) + disk := s.disks[i] + if disk == OfflineDisk { + errChan <- errIdx{i, errors.Trace(errDiskNotFound)} + return + } + _, rerr := disk.ReadFile(volume, path, offset, stageBuffers[i], verifiers[i]) + errChan <- errIdx{i, rerr} + } + + var finishedCount, successCount, launchIndex int + + for ; launchIndex < s.dataBlocks; launchIndex++ { + go readDisk(launchIndex) + } + for finishedCount < launchIndex { + select { + case errVal := <-errChan: + finishedCount++ + if errVal.err != nil { + // TODO: meaningfully log the disk read error + + // A disk failed to return data, so we + // request an additional disk if possible + if launchIndex < s.dataBlocks+s.parityBlocks { + needsReconstruction = true + // requiredBlocks++ + go readDisk(launchIndex) + launchIndex++ + } + } else { + successCount++ + buffers[errVal.idx] = stageBuffers[errVal.idx] + stageBuffers[errVal.idx] = nil + } + } + } + if successCount != s.dataBlocks { + // Not enough disks returns data. + err = errors.Trace(errXLReadQuorum) + } + return +} + +// ReadFile reads as much data as requested from the file under the +// given volume and path and writes the data to the provided writer. +// The algorithm and the keys/checksums are used to verify the +// integrity of the given file. ReadFile will read data from the given +// offset up to the given length. If parts of the file are corrupted +// ReadFile tries to reconstruct the data. +func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset, + length, totalLength int64, checksums [][]byte, algorithm BitrotAlgorithm, + blocksize int64) (f ErasureFileInfo, err error) { + if offset < 0 || length < 0 { return f, errors.Trace(errUnexpected) } @@ -44,117 +108,122 @@ func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset, } verifiers[i] = NewBitrotVerifier(algorithm, checksums[i]) } - errChans := make([]chan error, len(s.disks)) - for i := range errChans { - errChans[i] = make(chan error, 1) - } - lastBlock := totalLength / blocksize - startOffset := offset % blocksize - chunksize := getChunkSize(blocksize, s.dataBlocks) + chunksize := ceilFrac(blocksize, int64(s.dataBlocks)) + + // We read all whole-blocks of erasure coded data containing + // the requested data range. + // + // The start index of the erasure coded block containing the + // `offset` byte of data is: + partDataStartIndex := (offset / blocksize) * chunksize + // The start index of the erasure coded block containing the + // (last) byte of data at the index `offset + length - 1` is: + blockStartIndex := ((offset + length - 1) / blocksize) * chunksize + // However, we need the end index of the e.c. block containing + // the last byte - we need to check if that block is the last + // block in the part (in that case, it may be have a different + // chunk size) + isLastBlock := (totalLength-1)/blocksize == (offset+length-1)/blocksize + var partDataEndIndex int64 + if isLastBlock { + lastBlockChunkSize := chunksize + if totalLength%blocksize != 0 { + lastBlockChunkSize = ceilFrac(totalLength%blocksize, int64(s.dataBlocks)) + } + partDataEndIndex = blockStartIndex + lastBlockChunkSize - 1 + } else { + partDataEndIndex = blockStartIndex + chunksize - 1 + } + + // Thus, the length of data to be read from the part file(s) is: + partDataLength := partDataEndIndex - partDataStartIndex + 1 + // The calculation above does not apply when length == 0: + if length == 0 { + partDataLength = 0 + } + + var buffers [][]byte + var needsReconstruction bool + buffers, needsReconstruction, err = s.readConcurrent(volume, path, + partDataStartIndex, partDataLength, verifiers) + if err != nil { + // Could not read enough disks. + return + } + + numChunks := ceilFrac(partDataLength, chunksize) blocks := make([][]byte, len(s.disks)) - for i := range blocks { - blocks[i] = make([]byte, chunksize) - } - for off := offset / blocksize; length > 0; off++ { - blockOffset := off * chunksize - if currentBlock := (offset + f.Size) / blocksize; currentBlock == lastBlock { - blocksize = totalLength % blocksize - chunksize = getChunkSize(blocksize, s.dataBlocks) - for i := range blocks { - blocks[i] = blocks[i][:chunksize] + if needsReconstruction && numChunks > 1 { + // Allocate once for all the equal length blocks. The + // last block may have a different length - allocation + // for this happens inside the for loop below. + for i := range blocks { + if len(buffers[i]) == 0 { + blocks[i] = make([]byte, chunksize) } } - err = s.readConcurrent(volume, path, blockOffset, blocks, verifiers, errChans) - if err != nil { - return f, errors.Trace(errXLReadQuorum) + } + + var buffOffset int64 + for chunkNumber := int64(0); chunkNumber < numChunks; chunkNumber++ { + if chunkNumber == numChunks-1 && partDataLength%chunksize != 0 { + chunksize = partDataLength % chunksize + // We allocate again as the last chunk has a + // different size. + for i := range blocks { + if len(buffers[i]) == 0 { + blocks[i] = make([]byte, chunksize) + } + } } - writeLength := blocksize - startOffset - if length < writeLength { - writeLength = length + for i := range blocks { + if len(buffers[i]) == 0 { + blocks[i] = blocks[i][0:0] + } } - n, err := writeDataBlocks(writer, blocks, s.dataBlocks, startOffset, writeLength) + + for i := range blocks { + if len(buffers[i]) != 0 { + blocks[i] = buffers[i][buffOffset : buffOffset+chunksize] + } + } + buffOffset += chunksize + + if needsReconstruction { + if err = s.ErasureDecodeDataBlocks(blocks); err != nil { + return f, errors.Trace(err) + } + } + + var writeStart int64 + if chunkNumber == 0 { + writeStart = offset % blocksize + } + + writeLength := blocksize - writeStart + if chunkNumber == numChunks-1 { + lastBlockLength := (offset + length) % blocksize + if lastBlockLength != 0 { + writeLength = lastBlockLength - writeStart + } + } + n, err := writeDataBlocks(writer, blocks, s.dataBlocks, writeStart, writeLength) if err != nil { return f, err } - startOffset = 0 + f.Size += n - length -= n } f.Algorithm = algorithm for i, disk := range s.disks { - if disk == OfflineDisk { + if disk == OfflineDisk || buffers[i] == nil { continue } f.Checksums[i] = verifiers[i].Sum(nil) } return f, nil } - -func erasureCountMissingBlocks(blocks [][]byte, limit int) int { - missing := 0 - for i := range blocks[:limit] { - if len(blocks[i]) == 0 { - missing++ - } - } - return missing -} - -// readConcurrent reads all requested data concurrently from the disks into blocks. It returns an error if -// too many disks failed while reading. -func (s *ErasureStorage) readConcurrent(volume, path string, offset int64, blocks [][]byte, verifiers []*BitrotVerifier, errChans []chan error) (err error) { - errs := make([]error, len(s.disks)) - - erasureReadBlocksConcurrent(s.disks[:s.dataBlocks], volume, path, offset, blocks[:s.dataBlocks], verifiers[:s.dataBlocks], errs[:s.dataBlocks], errChans[:s.dataBlocks]) - missingDataBlocks := erasureCountMissingBlocks(blocks, s.dataBlocks) - mustReconstruct := missingDataBlocks > 0 - if mustReconstruct { - requiredReads := s.dataBlocks + missingDataBlocks - if requiredReads > s.dataBlocks+s.parityBlocks { - return errXLReadQuorum - } - erasureReadBlocksConcurrent(s.disks[s.dataBlocks:requiredReads], volume, path, offset, blocks[s.dataBlocks:requiredReads], verifiers[s.dataBlocks:requiredReads], errs[s.dataBlocks:requiredReads], errChans[s.dataBlocks:requiredReads]) - if erasureCountMissingBlocks(blocks, requiredReads) > 0 { - erasureReadBlocksConcurrent(s.disks[requiredReads:], volume, path, offset, blocks[requiredReads:], verifiers[requiredReads:], errs[requiredReads:], errChans[requiredReads:]) - } - } - if err = reduceReadQuorumErrs(errs, []error{}, s.dataBlocks); err != nil { - return err - } - if mustReconstruct { - if err = s.ErasureDecodeDataBlocks(blocks); err != nil { - return err - } - } - return nil -} - -// erasureReadBlocksConcurrent reads all data from each disk to each data block in parallel. -// Therefore disks, blocks, verifiers errors and locks must have the same length. -func erasureReadBlocksConcurrent(disks []StorageAPI, volume, path string, offset int64, blocks [][]byte, verifiers []*BitrotVerifier, errors []error, errChans []chan error) { - for i := range errChans { - go erasureReadFromFile(disks[i], volume, path, offset, blocks[i], verifiers[i], errChans[i]) - } - for i := range errChans { - errors[i] = <-errChans[i] // blocks until the go routine 'i' is done - no data race - if errors[i] != nil { - disks[i] = OfflineDisk - blocks[i] = blocks[i][:0] // mark shard as missing - } - } -} - -// erasureReadFromFile reads data from the disk to buffer in parallel. -// It sends the returned error through the error channel. -func erasureReadFromFile(disk StorageAPI, volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier, errChan chan<- error) { - if disk == OfflineDisk { - errChan <- errors.Trace(errDiskNotFound) - return - } - _, err := disk.ReadFile(volume, path, offset, buffer, verifier) - errChan <- err -} diff --git a/cmd/erasure-utils.go b/cmd/erasure-utils.go index edbe0bdfc..a6346e9d5 100644 --- a/cmd/erasure-utils.go +++ b/cmd/erasure-utils.go @@ -98,11 +98,3 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, offset in // Success. return totalWritten, nil } - -// chunkSize is roughly BlockSize/DataBlocks. -// chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes. -// So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by -// DataBlocks. The extra space will have 0-padding. -func getChunkSize(blockSize int64, dataBlocks int) int64 { - return (blockSize + int64(dataBlocks) - 1) / int64(dataBlocks) -} diff --git a/cmd/erasure-utils_test.go b/cmd/erasure-utils_test.go deleted file mode 100644 index cb7065c3c..000000000 --- a/cmd/erasure-utils_test.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import "testing" - -// Tests validate the output of getChunkSize. -func TestGetChunkSize(t *testing.T) { - // Refer to comments on getChunkSize() for details. - testCases := []struct { - blockSize int64 - dataBlocks int - // expected result. - expectedChunkSize int64 - }{ - { - 10, - 10, - 1, - }, - { - 10, - 11, - 1, - }, - { - 10, - 9, - 2, - }, - } - // Verify getChunkSize() for the test cases. - for i, testCase := range testCases { - got := getChunkSize(testCase.blockSize, testCase.dataBlocks) - if testCase.expectedChunkSize != got { - t.Errorf("Test %d : expected=%d got=%d", i+1, testCase.expectedChunkSize, got) - } - } -} diff --git a/cmd/posix.go b/cmd/posix.go index 3ac6ebcdd..ac9f59163 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -610,7 +610,7 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif return 0, errIsNotRegular } - if verifier != nil && !verifier.IsVerified() { + if verifier != nil { bufp := s.pool.Get().(*[]byte) defer s.pool.Put(bufp) diff --git a/cmd/utils.go b/cmd/utils.go index 0602113c9..200e17e73 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -294,3 +294,23 @@ func jsonSave(f interface { } return nil } + +// ceilFrac takes a numerator and denominator representing a fraction +// and returns its ceiling. If denominator is 0, it returns 0 instead +// of crashing. +func ceilFrac(numerator, denominator int64) (ceil int64) { + if denominator == 0 { + // do nothing on invalid input + return + } + // Make denominator positive + if denominator < 0 { + numerator = -numerator + denominator = -denominator + } + ceil = numerator / denominator + if numerator > 0 && numerator%denominator != 0 { + ceil++ + } + return +} diff --git a/cmd/utils_test.go b/cmd/utils_test.go index a09ffe68e..8dc61ba5d 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -409,3 +409,28 @@ func TestJSONSave(t *testing.T) { t.Fatal("Size should not differ after jsonSave()", fi1.Size(), fi2.Size(), f.Name()) } } + +// Test ceilFrac +func TestCeilFrac(t *testing.T) { + cases := []struct { + numerator, denominator, ceiling int64 + }{ + {0, 1, 0}, + {-1, 2, 0}, + {1, 2, 1}, + {1, 1, 1}, + {3, 2, 2}, + {54, 11, 5}, + {45, 11, 5}, + {-4, 3, -1}, + {4, -3, -1}, + {-4, -3, 2}, + {3, 0, 0}, + } + for i, testCase := range cases { + ceiling := ceilFrac(testCase.numerator, testCase.denominator) + if ceiling != testCase.ceiling { + t.Errorf("Case %d: Unexpected result: %d", i, ceiling) + } + } +} diff --git a/cmd/xl-v1-common.go b/cmd/xl-v1-common.go index 222e6b39a..b044a70bb 100644 --- a/cmd/xl-v1-common.go +++ b/cmd/xl-v1-common.go @@ -74,11 +74,11 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) { // Calculate the space occupied by an object in a single disk func (xl xlObjects) sizeOnDisk(fileSize int64, blockSize int64, dataBlocks int) int64 { numBlocks := fileSize / blockSize - chunkSize := getChunkSize(blockSize, dataBlocks) + chunkSize := ceilFrac(blockSize, int64(dataBlocks)) sizeInDisk := numBlocks * chunkSize remaining := fileSize % blockSize if remaining > 0 { - sizeInDisk += getChunkSize(remaining, dataBlocks) + sizeInDisk += ceilFrac(remaining, int64(dataBlocks)) } return sizeInDisk