From ce02ab613d21bd777eeb35386d0eb6676c2499a2 Mon Sep 17 00:00:00 2001
From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com>
Date: Mon, 6 Aug 2018 15:14:08 -0700
Subject: [PATCH] Simplify erasure code by separating bitrot from erasure code
 (#5959)
---
 cmd/bitrot.go                  | 192 ++++++++++++++++++++
 cmd/bitrot_test.go             |  71 ++++++++
 cmd/erasure-createfile.go      | 141 ++++++++-------
 cmd/erasure-createfile_test.go |  76 +++++---
 cmd/erasure-healfile.go        | 178 ++-----------------
 cmd/erasure-healfile_test.go   |  60 ++++---
 cmd/erasure-readfile.go        | 314 ++++++++++++++-------------------
 cmd/erasure-readfile_test.go   | 153 +++++++++++-----
 cmd/erasure-utils.go           |  22 +++
 cmd/erasure.go                 |  61 ++-----
 cmd/erasure_test.go            |   3 +-
 cmd/posix.go                   |  63 ++++---
 cmd/posix_test.go              |  24 +--
 cmd/storage-errors.go          |   3 +
 cmd/storage-rpc-client.go      |   1 -
 cmd/storage-rpc_test.go        |   8 +-
 cmd/xl-v1-common.go            |  13 --
 cmd/xl-v1-healing-common.go    |   1 -
 cmd/xl-v1-healing.go           |  33 ++--
 cmd/xl-v1-metadata.go          | 116 ++----------
 cmd/xl-v1-multipart.go         |  26 ++-
 cmd/xl-v1-object.go            |  64 ++++---
 cmd/xl-v1-utils_test.go        |   2 +-
 23 files changed, 874 insertions(+), 751 deletions(-)
 create mode 100644 cmd/bitrot.go
 create mode 100644 cmd/bitrot_test.go

diff --git a/cmd/bitrot.go b/cmd/bitrot.go
new file mode 100644
index 000000000..891583b8e
--- /dev/null
+++ b/cmd/bitrot.go
@@ -0,0 +1,192 @@
+/*
+ * Minio Cloud Storage, (C) 2018 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+	"context"
+	"crypto/sha256"
+	"errors"
+	"hash"
+
+	"github.com/minio/highwayhash"
+	"github.com/minio/minio/cmd/logger"
+	"golang.org/x/crypto/blake2b"
+)
+
+// magic HH-256 key: the HighwayHash-256 hash of the first 100 decimals of π (as a UTF-8 string) computed with a zero key.
+var magicHighwayHash256Key = []byte("\x4b\xe7\x34\xfa\x8e\x23\x8a\xcd\x26\x3e\x83\xe6\xbb\x96\x85\x52\x04\x0f\x93\x5d\xa3\x9f\x44\x14\x97\xe0\x9d\x13\x22\xde\x36\xa0")
+
+// BitrotAlgorithm specifies an algorithm used for bitrot protection.
+type BitrotAlgorithm uint
+
+const (
+	// SHA256 represents the SHA-256 hash function
+	SHA256 BitrotAlgorithm = 1 + iota
+	// HighwayHash256 represents the HighwayHash-256 hash function
+	HighwayHash256
+	// BLAKE2b512 represents the BLAKE2b-512 hash function
+	BLAKE2b512
+)
+
+// DefaultBitrotAlgorithm is the default algorithm used for bitrot protection.
+const (
+	DefaultBitrotAlgorithm = HighwayHash256
+)
+
+var bitrotAlgorithms = map[BitrotAlgorithm]string{
+	SHA256:         "sha256",
+	BLAKE2b512:     "blake2b",
+	HighwayHash256: "highwayhash256",
+}
+
+// New returns a new hash.Hash calculating the given bitrot algorithm.
+func (a BitrotAlgorithm) New() hash.Hash { + switch a { + case SHA256: + return sha256.New() + case BLAKE2b512: + b2, _ := blake2b.New512(nil) // New512 never returns an error if the key is nil + return b2 + case HighwayHash256: + hh, _ := highwayhash.New(magicHighwayHash256Key) // New will never return error since key is 256 bit + return hh + default: + logger.CriticalIf(context.Background(), errors.New("Unsupported bitrot algorithm")) + return nil + } +} + +// Available reports whether the given algorihm is available. +func (a BitrotAlgorithm) Available() bool { + _, ok := bitrotAlgorithms[a] + return ok +} + +// String returns the string identifier for a given bitrot algorithm. +// If the algorithm is not supported String panics. +func (a BitrotAlgorithm) String() string { + name, ok := bitrotAlgorithms[a] + if !ok { + logger.CriticalIf(context.Background(), errors.New("Unsupported bitrot algorithm")) + } + return name +} + +// NewBitrotVerifier returns a new BitrotVerifier implementing the given algorithm. +func NewBitrotVerifier(algorithm BitrotAlgorithm, checksum []byte) *BitrotVerifier { + return &BitrotVerifier{algorithm, checksum} +} + +// BitrotVerifier can be used to verify protected data. +type BitrotVerifier struct { + algorithm BitrotAlgorithm + sum []byte +} + +// BitrotAlgorithmFromString returns a bitrot algorithm from the given string representation. +// It returns 0 if the string representation does not match any supported algorithm. +// The zero value of a bitrot algorithm is never supported. +func BitrotAlgorithmFromString(s string) (a BitrotAlgorithm) { + for alg, name := range bitrotAlgorithms { + if name == s { + return alg + } + } + return +} + +// To read bit-rot verified data. +type bitrotReader struct { + disk StorageAPI + volume string + filePath string + verifier *BitrotVerifier // Holds the bit-rot info + endOffset int64 // Affects the length of data requested in disk.ReadFile depending on Read()'s offset + buf []byte // Holds bit-rot verified data +} + +// newBitrotReader returns bitrotReader. +// Note that the buffer is allocated later in Read(). This is because we will know the buffer length only +// during the bitrotReader.Read(). Depending on when parallelReader fails-over, the buffer length can be different. +func newBitrotReader(disk StorageAPI, volume, filePath string, algo BitrotAlgorithm, endOffset int64, sum []byte) *bitrotReader { + return &bitrotReader{ + disk: disk, + volume: volume, + filePath: filePath, + verifier: &BitrotVerifier{algo, sum}, + endOffset: endOffset, + buf: nil, + } +} + +// ReadChunk returns requested data. +func (b *bitrotReader) ReadChunk(offset int64, length int64) ([]byte, error) { + if b.buf == nil { + b.buf = make([]byte, b.endOffset-offset) + if _, err := b.disk.ReadFile(b.volume, b.filePath, offset, b.buf, b.verifier); err != nil { + logger.LogIf(context.Background(), err) + return nil, err + } + } + if int64(len(b.buf)) < length { + logger.LogIf(context.Background(), errLessData) + return nil, errLessData + } + retBuf := b.buf[:length] + b.buf = b.buf[length:] + return retBuf, nil +} + +// To calculate the bit-rot of the written data. +type bitrotWriter struct { + disk StorageAPI + volume string + filePath string + h hash.Hash +} + +// newBitrotWriter returns bitrotWriter. 
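// For illustration, the intended round trip (it mirrors TestBitrotReaderWriter in
// bitrot_test.go below; volume/path names and the shard variable are placeholders):
// write the shard through a bitrotWriter, keep its Sum(), and later hand that sum
// to a bitrotReader so the data is verified on read:
//
//	w := newBitrotWriter(disk, "testvol", "testfile", HighwayHash256)
//	if err := w.Append(shard); err != nil { // hashes and appends to disk
//		return err
//	}
//	sum := w.Sum()
//	r := newBitrotReader(disk, "testvol", "testfile", HighwayHash256, int64(len(shard)), sum)
//	buf, err := r.ReadChunk(0, int64(len(shard))) // bit-rot verified read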
+func newBitrotWriter(disk StorageAPI, volume, filePath string, algo BitrotAlgorithm) *bitrotWriter { + return &bitrotWriter{ + disk: disk, + volume: volume, + filePath: filePath, + h: algo.New(), + } +} + +// Append appends the data and while calculating the hash. +func (b *bitrotWriter) Append(buf []byte) error { + n, err := b.h.Write(buf) + if err != nil { + return err + } + if n != len(buf) { + logger.LogIf(context.Background(), errUnexpected) + return errUnexpected + } + if err = b.disk.AppendFile(b.volume, b.filePath, buf); err != nil { + logger.LogIf(context.Background(), err) + return err + } + return nil +} + +// Sum returns bit-rot sum. +func (b *bitrotWriter) Sum() []byte { + return b.h.Sum(nil) +} diff --git a/cmd/bitrot_test.go b/cmd/bitrot_test.go new file mode 100644 index 000000000..1e66aa90e --- /dev/null +++ b/cmd/bitrot_test.go @@ -0,0 +1,71 @@ +/* + * Minio Cloud Storage, (C) 2018 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "io/ioutil" + "log" + "os" + "testing" +) + +func TestBitrotReaderWriter(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "") + if err != nil { + log.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + volume := "testvol" + filePath := "testfile" + + disk, err := newPosix(tmpDir) + if err != nil { + t.Fatal(err) + } + + disk.MakeVol(volume) + + writer := newBitrotWriter(disk, volume, filePath, HighwayHash256) + + err = writer.Append([]byte("aaaaaaaaa")) + if err != nil { + log.Fatal(err) + } + err = writer.Append([]byte("a")) + if err != nil { + log.Fatal(err) + } + err = writer.Append([]byte("aaaaaaaaaa")) + if err != nil { + log.Fatal(err) + } + err = writer.Append([]byte("aaaaa")) + if err != nil { + log.Fatal(err) + } + err = writer.Append([]byte("aaaaaaaaaa")) + if err != nil { + log.Fatal(err) + } + + reader := newBitrotReader(disk, volume, filePath, HighwayHash256, 35, writer.Sum()) + + if _, err = reader.ReadChunk(0, 35); err != nil { + log.Fatal(err) + } +} diff --git a/cmd/erasure-createfile.go b/cmd/erasure-createfile.go index 5cb2c3546..1d0e28361 100644 --- a/cmd/erasure-createfile.go +++ b/cmd/erasure-createfile.go @@ -18,85 +18,92 @@ package cmd import ( "context" - "hash" "io" + "sync" + "github.com/minio/minio/cmd/logger" ) -// CreateFile creates a new bitrot encoded file spread over all available disks. CreateFile will create -// the file at the given volume and path. It will read from src until an io.EOF occurs. The given algorithm will -// be used to protect the erasure encoded file. 
-func (s *ErasureStorage) CreateFile(ctx context.Context, src io.Reader, volume, path string, buffer []byte, algorithm BitrotAlgorithm, writeQuorum int) (f ErasureFileInfo, err error) { - if !algorithm.Available() { - logger.LogIf(ctx, errBitrotHashAlgoInvalid) - return f, errBitrotHashAlgoInvalid - } - f.Checksums = make([][]byte, len(s.disks)) - hashers := make([]hash.Hash, len(s.disks)) - for i := range hashers { - hashers[i] = algorithm.New() - } - errChans, errs := make([]chan error, len(s.disks)), make([]error, len(s.disks)) - for i := range errChans { - errChans[i] = make(chan error, 1) // create buffered channel to let finished go-routines die early - } +// Writes in parallel to bitrotWriters +type parallelWriter struct { + writers []*bitrotWriter + writeQuorum int + errs []error +} - var blocks [][]byte - var n = len(buffer) - for n == len(buffer) { - n, err = io.ReadFull(src, buffer) - if n == 0 && err == io.EOF { - if f.Size != 0 { // don't write empty block if we have written to the disks - break - } - blocks = make([][]byte, len(s.disks)) // write empty block - } else if err == nil || (n > 0 && err == io.ErrUnexpectedEOF) { - blocks, err = s.ErasureEncode(ctx, buffer[:n]) - if err != nil { - return f, err - } - } else { - logger.LogIf(ctx, err) - return f, err - } +// Append appends data to bitrotWriters in parallel. +func (p *parallelWriter) Append(ctx context.Context, blocks [][]byte) error { + var wg sync.WaitGroup - for i := range errChans { // span workers - go erasureAppendFile(ctx, s.disks[i], volume, path, hashers[i], blocks[i], errChans[i]) - } - for i := range errChans { // wait until all workers are finished - errs[i] = <-errChans[i] - } - if err = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum); err != nil { - return f, err - } - s.disks = evalDisks(s.disks, errs) - f.Size += int64(n) - } - - f.Algorithm = algorithm - for i, disk := range s.disks { - if disk == OfflineDisk { + for i := range p.writers { + if p.writers[i] == nil { + p.errs[i] = errDiskNotFound continue } - f.Checksums[i] = hashers[i].Sum(nil) + + wg.Add(1) + go func(i int) { + defer wg.Done() + p.errs[i] = p.writers[i].Append(blocks[i]) + if p.errs[i] != nil { + p.writers[i] = nil + } + }(i) } - return f, nil + wg.Wait() + + // If nilCount >= p.writeQuorum, we return nil. This is because HealFile() uses + // CreateFile with p.writeQuorum=1 to accommodate healing of single disk. + // i.e if we do no return here in such a case, reduceWriteQuorumErrs() would + // return a quorum error to HealFile(). + nilCount := 0 + for _, err := range p.errs { + if err == nil { + nilCount++ + } + } + if nilCount >= p.writeQuorum { + return nil + } + return reduceWriteQuorumErrs(ctx, p.errs, objectOpIgnoredErrs, p.writeQuorum) } -// erasureAppendFile appends the content of buf to the file on the given disk and updates computes -// the hash of the written data. It sends the write error (or nil) over the error channel. -func erasureAppendFile(ctx context.Context, disk StorageAPI, volume, path string, hash hash.Hash, buf []byte, errChan chan<- error) { - if disk == OfflineDisk { - logger.LogIf(ctx, errDiskNotFound) - errChan <- errDiskNotFound - return +// CreateFile reads from the reader, erasure-encodes the data and writes to the writers. 
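// For illustration, a caller builds one bitrotWriter per online disk and leaves a
// nil entry for offline ones, as the tests in erasure-createfile_test.go do
// (sketch; bucket/object names are placeholders):
//
//	writers := make([]*bitrotWriter, len(disks))
//	for i, disk := range disks {
//		if disk == OfflineDisk {
//			continue // left nil; parallelWriter records errDiskNotFound for it
//		}
//		writers[i] = newBitrotWriter(disk, "bucket", "object", DefaultBitrotAlgorithm)
//	}
//	n, err := s.CreateFile(ctx, reader, writers, buffer, s.dataBlocks+1)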
+func (s *ErasureStorage) CreateFile(ctx context.Context, src io.Reader, writers []*bitrotWriter, buf []byte, quorum int) (total int64, err error) { + writer := ¶llelWriter{ + writers: writers, + writeQuorum: quorum, + errs: make([]error, len(writers)), } - err := disk.AppendFile(volume, path, buf) - if err != nil { - errChan <- err - return + + for { + var blocks [][]byte + n, err := io.ReadFull(src, buf) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + logger.LogIf(ctx, err) + return 0, err + } + eof := err == io.EOF || err == io.ErrUnexpectedEOF + if n == 0 && total != 0 { + // Reached EOF, nothing more to be done. + break + } + // We take care of the situation where if n == 0 and total == 0 by creating empty data and parity files. + blocks, err = s.ErasureEncode(ctx, buf[:n]) + if err != nil { + logger.LogIf(ctx, err) + return 0, err + } + + if err = writer.Append(ctx, blocks); err != nil { + logger.LogIf(ctx, err) + return 0, err + } + total += int64(n) + if eof { + break + } } - hash.Write(buf) - errChan <- err + return total, nil } diff --git a/cmd/erasure-createfile_test.go b/cmd/erasure-createfile_test.go index 4cd6dd902..15accf800 100644 --- a/cmd/erasure-createfile_test.go +++ b/cmd/erasure-createfile_test.go @@ -54,15 +54,14 @@ var erasureCreateFileTests = []struct { {dataBlocks: 7, onDisks: 14, offDisks: 7, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: true}, // 9 {dataBlocks: 8, onDisks: 16, offDisks: 8, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: true}, // 10 {dataBlocks: 5, onDisks: 10, offDisks: 3, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 11 - {dataBlocks: 6, onDisks: 12, offDisks: 5, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 102, algorithm: 0, shouldFail: true, shouldFailQuorum: false}, // 12 - {dataBlocks: 3, onDisks: 6, offDisks: 1, blocksize: int64(blockSizeV1), data: oneMiByte, offset: oneMiByte / 2, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 13 - {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(oneMiByte / 2), data: oneMiByte, offset: oneMiByte/2 + 1, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 14 - {dataBlocks: 4, onDisks: 8, offDisks: 0, blocksize: int64(oneMiByte - 1), data: oneMiByte, offset: oneMiByte - 1, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 15 - {dataBlocks: 8, onDisks: 12, offDisks: 2, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 2, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 16 - {dataBlocks: 8, onDisks: 10, offDisks: 1, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 17 - {dataBlocks: 10, onDisks: 14, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 17, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 18 - {dataBlocks: 2, onDisks: 6, offDisks: 2, blocksize: int64(oneMiByte), data: oneMiByte, offset: oneMiByte / 2, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 19 - {dataBlocks: 10, onDisks: 16, offDisks: 8, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, algorithm: DefaultBitrotAlgorithm, shouldFail: 
false, shouldFailQuorum: true}, // 20 + {dataBlocks: 3, onDisks: 6, offDisks: 1, blocksize: int64(blockSizeV1), data: oneMiByte, offset: oneMiByte / 2, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 12 + {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(oneMiByte / 2), data: oneMiByte, offset: oneMiByte/2 + 1, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 13 + {dataBlocks: 4, onDisks: 8, offDisks: 0, blocksize: int64(oneMiByte - 1), data: oneMiByte, offset: oneMiByte - 1, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 14 + {dataBlocks: 8, onDisks: 12, offDisks: 2, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 2, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 15 + {dataBlocks: 8, onDisks: 10, offDisks: 1, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 16 + {dataBlocks: 10, onDisks: 14, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 17, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 17 + {dataBlocks: 2, onDisks: 6, offDisks: 2, blocksize: int64(oneMiByte), data: oneMiByte, offset: oneMiByte / 2, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 18 + {dataBlocks: 10, onDisks: 16, offDisks: 8, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: true}, // 19 } func TestErasureCreateFile(t *testing.T) { @@ -71,7 +70,8 @@ func TestErasureCreateFile(t *testing.T) { if err != nil { t.Fatalf("Test %d: failed to create test setup: %v", i, err) } - storage, err := NewErasureStorage(context.Background(), setup.disks, test.dataBlocks, test.onDisks-test.dataBlocks, test.blocksize) + disks := setup.disks + storage, err := NewErasureStorage(context.Background(), test.dataBlocks, test.onDisks-test.dataBlocks, test.blocksize) if err != nil { setup.Remove() t.Fatalf("Test %d: failed to create ErasureStorage: %v", i, err) @@ -83,25 +83,43 @@ func TestErasureCreateFile(t *testing.T) { setup.Remove() t.Fatalf("Test %d: failed to generate random test data: %v", i, err) } - file, err := storage.CreateFile(context.Background(), bytes.NewReader(data[test.offset:]), "testbucket", "object", buffer, test.algorithm, test.dataBlocks+1) + writers := make([]*bitrotWriter, len(disks)) + for i, disk := range disks { + if disk == OfflineDisk { + continue + } + writers[i] = newBitrotWriter(disk, "testbucket", "object", test.algorithm) + } + n, err := storage.CreateFile(context.Background(), bytes.NewReader(data[test.offset:]), writers, buffer, storage.dataBlocks+1) if err != nil && !test.shouldFail { t.Errorf("Test %d: should pass but failed with: %v", i, err) } if err == nil && test.shouldFail { t.Errorf("Test %d: should fail but it passed", i) } - - if err == nil { - if length := int64(len(data[test.offset:])); file.Size != length { - t.Errorf("Test %d: invalid number of bytes written: got: #%d want #%d", i, file.Size, length) + for i, w := range writers { + if w == nil { + disks[i] = OfflineDisk } - for j := range storage.disks[:test.offDisks] { - storage.disks[j] = badDisk{nil} + } + if err == nil { + if length := int64(len(data[test.offset:])); n != length { + t.Errorf("Test %d: invalid number of bytes written: got: #%d want #%d", i, n, length) + } + writers := make([]*bitrotWriter, len(disks)) + for i, disk := 
range disks { + if disk == nil { + continue + } + writers[i] = newBitrotWriter(disk, "testbucket", "object2", test.algorithm) + } + for j := range disks[:test.offDisks] { + writers[j].disk = badDisk{nil} } if test.offDisks > 0 { - storage.disks[0] = OfflineDisk + writers[0] = nil } - file, err = storage.CreateFile(context.Background(), bytes.NewReader(data[test.offset:]), "testbucket", "object2", buffer, test.algorithm, test.dataBlocks+1) + n, err = storage.CreateFile(context.Background(), bytes.NewReader(data[test.offset:]), writers, buffer, storage.dataBlocks+1) if err != nil && !test.shouldFailQuorum { t.Errorf("Test %d: should pass but failed with: %v", i, err) } @@ -109,8 +127,8 @@ func TestErasureCreateFile(t *testing.T) { t.Errorf("Test %d: should fail but it passed", i) } if err == nil { - if length := int64(len(data[test.offset:])); file.Size != length { - t.Errorf("Test %d: invalid number of bytes written: got: #%d want #%d", i, file.Size, length) + if length := int64(len(data[test.offset:])); n != length { + t.Errorf("Test %d: invalid number of bytes written: got: #%d want #%d", i, n, length) } } } @@ -126,25 +144,33 @@ func benchmarkErasureWrite(data, parity, dataDown, parityDown int, size int64, b b.Fatalf("failed to create test setup: %v", err) } defer setup.Remove() - storage, err := NewErasureStorage(context.Background(), setup.disks, data, parity, blockSizeV1) + storage, err := NewErasureStorage(context.Background(), data, parity, blockSizeV1) if err != nil { b.Fatalf("failed to create ErasureStorage: %v", err) } + disks := setup.disks buffer := make([]byte, blockSizeV1, 2*blockSizeV1) content := make([]byte, size) for i := 0; i < dataDown; i++ { - storage.disks[i] = OfflineDisk + disks[i] = OfflineDisk } for i := data; i < data+parityDown; i++ { - storage.disks[i] = OfflineDisk + disks[i] = OfflineDisk } b.ResetTimer() b.SetBytes(size) b.ReportAllocs() for i := 0; i < b.N; i++ { - _, err := storage.CreateFile(context.Background(), bytes.NewReader(content), "testbucket", "object", buffer, DefaultBitrotAlgorithm, data+1) + writers := make([]*bitrotWriter, len(disks)) + for i, disk := range disks { + if disk == OfflineDisk { + continue + } + writers[i] = newBitrotWriter(disk, "testbucket", "object", DefaultBitrotAlgorithm) + } + _, err := storage.CreateFile(context.Background(), bytes.NewReader(content), writers, buffer, storage.dataBlocks+1) if err != nil { panic(err) } diff --git a/cmd/erasure-healfile.go b/cmd/erasure-healfile.go index bb3980bce..7c652798a 100644 --- a/cmd/erasure-healfile.go +++ b/cmd/erasure-healfile.go @@ -18,171 +18,31 @@ package cmd import ( "context" - "fmt" - "hash" - "strings" + "io" "github.com/minio/minio/cmd/logger" ) -// HealFile tries to reconstruct an erasure-coded file spread over all -// available disks. HealFile will read the valid parts of the file, -// reconstruct the missing data and write the reconstructed parts back -// to `staleDisks` at the destination `dstVol/dstPath/`. Parts are -// verified against the given BitrotAlgorithm and checksums. -// -// `staleDisks` is a slice of disks where each non-nil entry has stale -// or no data, and so will be healed. -// -// It is required that `s.disks` have a (read-quorum) majority of -// disks with valid data for healing to work. -// -// In addition, `staleDisks` and `s.disks` must have the same ordering -// of disks w.r.t. erasure coding of the object. -// -// Errors when writing to `staleDisks` are not propagated as long as -// writes succeed for at least one disk. 
This allows partial healing -// despite stale disks being faulty. -// -// It returns bitrot checksums for the non-nil staleDisks on which -// healing succeeded. -func (s ErasureStorage) HealFile(ctx context.Context, staleDisks []StorageAPI, volume, path string, blocksize int64, - dstVol, dstPath string, size int64, alg BitrotAlgorithm, checksums [][]byte) ( - f ErasureFileInfo, err error) { - - if !alg.Available() { - logger.LogIf(ctx, errBitrotHashAlgoInvalid) - return f, errBitrotHashAlgoInvalid - } - - // Initialization - f.Checksums = make([][]byte, len(s.disks)) - hashers := make([]hash.Hash, len(s.disks)) - verifiers := make([]*BitrotVerifier, len(s.disks)) - for i, disk := range s.disks { - switch { - case staleDisks[i] != nil: - hashers[i] = alg.New() - case disk == nil: - // disregard unavailable disk - continue - default: - verifiers[i] = NewBitrotVerifier(alg, checksums[i]) +// HealFile heals the shard files on non-nil writers. Note that the quorum passed is 1 +// as healing should continue even if it has been successful healing only one shard file. +func (s ErasureStorage) HealFile(ctx context.Context, readers []*bitrotReader, writers []*bitrotWriter, size int64) error { + r, w := io.Pipe() + go func() { + if err := s.ReadFile(ctx, w, readers, 0, size, size); err != nil { + w.CloseWithError(err) + return } - } - writeErrors := make([]error, len(s.disks)) - - // Read part file data on each disk - chunksize := ceilFrac(blocksize, int64(s.dataBlocks)) - numBlocks := ceilFrac(size, blocksize) - - readLen := chunksize * (numBlocks - 1) - - lastChunkSize := chunksize - hasSmallerLastBlock := size%blocksize != 0 - if hasSmallerLastBlock { - lastBlockLen := size % blocksize - lastChunkSize = ceilFrac(lastBlockLen, int64(s.dataBlocks)) - } - readLen += lastChunkSize - var buffers [][]byte - buffers, _, err = s.readConcurrent(ctx, volume, path, 0, readLen, verifiers) + w.Close() + }() + buf := make([]byte, s.blockSize) + // quorum is 1 because CreateFile should continue writing as long as we are writing to even 1 disk. + n, err := s.CreateFile(ctx, r, writers, buf, 1) if err != nil { - return f, err + return err } - - // Scan part files on disk, block-by-block reconstruct it and - // write to stale disks. - blocks := make([][]byte, len(s.disks)) - - if numBlocks > 1 { - // Allocate once for all the equal length blocks. The - // last block may have a different length - allocation - // for this happens inside the for loop below. 
- for i := range blocks { - if len(buffers[i]) == 0 { - blocks[i] = make([]byte, chunksize) - } - } + if n != size { + logger.LogIf(ctx, errLessData) + return errLessData } - - var buffOffset int64 - for blockNumber := int64(0); blockNumber < numBlocks; blockNumber++ { - if blockNumber == numBlocks-1 && lastChunkSize != chunksize { - for i := range blocks { - if len(buffers[i]) == 0 { - blocks[i] = make([]byte, lastChunkSize) - } - } - } - - for i := range blocks { - if len(buffers[i]) == 0 { - blocks[i] = blocks[i][0:0] - } - } - - csize := chunksize - if blockNumber == numBlocks-1 { - csize = lastChunkSize - } - for i := range blocks { - if len(buffers[i]) != 0 { - blocks[i] = buffers[i][buffOffset : buffOffset+csize] - } - } - buffOffset += csize - - if err = s.ErasureDecodeDataAndParityBlocks(ctx, blocks); err != nil { - return f, err - } - - // write computed shards as chunks on file in each - // stale disk - writeSucceeded := false - for i, disk := range staleDisks { - // skip nil disk or disk that had error on - // previous write - if disk == nil || writeErrors[i] != nil { - continue - } - - writeErrors[i] = disk.AppendFile(dstVol, dstPath, blocks[i]) - if writeErrors[i] == nil { - hashers[i].Write(blocks[i]) - writeSucceeded = true - } - } - - // If all disks had write errors we quit. - if !writeSucceeded { - // build error from all write errors - err := joinWriteErrors(writeErrors) - logger.LogIf(ctx, err) - return f, err - } - } - - // copy computed file hashes into output variable - f.Size = size - f.Algorithm = alg - for i, disk := range staleDisks { - if disk == nil || writeErrors[i] != nil { - continue - } - f.Checksums[i] = hashers[i].Sum(nil) - } - return f, nil -} - -func joinWriteErrors(errs []error) error { - msgs := []string{} - for i, err := range errs { - if err == nil { - continue - } - msgs = append(msgs, fmt.Sprintf("disk %d: %v", i+1, err)) - } - return fmt.Errorf("all stale disks had write errors during healing: %s", - strings.Join(msgs, ", ")) + return nil } diff --git a/cmd/erasure-healfile_test.go b/cmd/erasure-healfile_test.go index 5206aa867..d70740893 100644 --- a/cmd/erasure-healfile_test.go +++ b/cmd/erasure-healfile_test.go @@ -21,7 +21,6 @@ import ( "context" "crypto/rand" "io" - "reflect" "testing" ) @@ -56,11 +55,9 @@ var erasureHealFileTests = []struct { {dataBlocks: 7, disks: 14, offDisks: 6, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 14 {dataBlocks: 8, disks: 16, offDisks: 4, badDisks: 5, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: true}, // 15 {dataBlocks: 2, disks: 4, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 16 - {dataBlocks: 2, disks: 4, offDisks: 0, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: 0, shouldFail: true}, // 17 - {dataBlocks: 12, disks: 16, offDisks: 2, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 18 - {dataBlocks: 6, disks: 8, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: BLAKE2b512, shouldFail: false}, // 19 - {dataBlocks: 7, disks: 10, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: 0, shouldFail: true}, // 20 - 
{dataBlocks: 2, disks: 4, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte * 64, algorithm: SHA256, shouldFail: false}, // 21 + {dataBlocks: 12, disks: 16, offDisks: 2, badDisks: 1, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false}, // 17 + {dataBlocks: 6, disks: 8, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte, algorithm: BLAKE2b512, shouldFail: false}, // 18 + {dataBlocks: 2, disks: 4, offDisks: 1, badDisks: 0, badStaleDisks: 0, blocksize: int64(blockSizeV1), size: oneMiByte * 64, algorithm: SHA256, shouldFail: false}, // 19 } func TestErasureHealFile(t *testing.T) { @@ -75,7 +72,8 @@ func TestErasureHealFile(t *testing.T) { if err != nil { t.Fatalf("Test %d: failed to setup XL environment: %v", i, err) } - storage, err := NewErasureStorage(context.Background(), setup.disks, test.dataBlocks, test.disks-test.dataBlocks, test.blocksize) + disks := setup.disks + storage, err := NewErasureStorage(context.Background(), test.dataBlocks, test.disks-test.dataBlocks, test.blocksize) if err != nil { setup.Remove() t.Fatalf("Test %d: failed to create ErasureStorage: %v", i, err) @@ -85,36 +83,50 @@ func TestErasureHealFile(t *testing.T) { setup.Remove() t.Fatalf("Test %d: failed to create random test data: %v", i, err) } - algorithm := test.algorithm - if !algorithm.Available() { - algorithm = DefaultBitrotAlgorithm - } buffer := make([]byte, test.blocksize, 2*test.blocksize) - file, err := storage.CreateFile(context.Background(), bytes.NewReader(data), "testbucket", "testobject", buffer, algorithm, test.dataBlocks+1) + writers := make([]*bitrotWriter, len(disks)) + for i, disk := range disks { + writers[i] = newBitrotWriter(disk, "testbucket", "testobject", test.algorithm) + } + _, err = storage.CreateFile(context.Background(), bytes.NewReader(data), writers, buffer, storage.dataBlocks+1) if err != nil { setup.Remove() t.Fatalf("Test %d: failed to create random test data: %v", i, err) } + readers := make([]*bitrotReader, len(disks)) + for i, disk := range disks { + shardFilesize := getErasureShardFileSize(test.blocksize, test.size, storage.dataBlocks) + readers[i] = newBitrotReader(disk, "testbucket", "testobject", test.algorithm, shardFilesize, writers[i].Sum()) + } + // setup stale disks for the test case - staleDisks := make([]StorageAPI, len(storage.disks)) - copy(staleDisks, storage.disks) - for j := 0; j < len(storage.disks); j++ { + staleDisks := make([]StorageAPI, len(disks)) + copy(staleDisks, disks) + for j := 0; j < len(staleDisks); j++ { if j < test.offDisks { - storage.disks[j] = OfflineDisk + readers[j] = nil } else { staleDisks[j] = nil } } for j := 0; j < test.badDisks; j++ { - storage.disks[test.offDisks+j] = badDisk{nil} + readers[test.offDisks+j].disk = badDisk{nil} } for j := 0; j < test.badStaleDisks; j++ { staleDisks[j] = badDisk{nil} } + staleWriters := make([]*bitrotWriter, len(staleDisks)) + for i, disk := range staleDisks { + if disk == nil { + continue + } + staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject", test.algorithm) + } + // test case setup is complete - now call Healfile() - info, err := storage.HealFile(context.Background(), staleDisks, "testbucket", "testobject", test.blocksize, "testbucket", "healedobject", test.size, test.algorithm, file.Checksums) + err = storage.HealFile(context.Background(), readers, staleWriters, test.size) if err != nil && !test.shouldFail { t.Errorf("Test %d: 
should pass but it failed with: %v", i, err) } @@ -122,19 +134,13 @@ func TestErasureHealFile(t *testing.T) { t.Errorf("Test %d: should fail but it passed", i) } if err == nil { - if info.Size != test.size { - t.Errorf("Test %d: healed wrong number of bytes: got: #%d want: #%d", i, info.Size, test.size) - } - if info.Algorithm != test.algorithm { - t.Errorf("Test %d: healed with wrong algorithm: got: %v want: %v", i, info.Algorithm, test.algorithm) - } // Verify that checksums of staleDisks // match expected values - for i, disk := range staleDisks { - if disk == nil || info.Checksums[i] == nil { + for i := range staleWriters { + if staleWriters[i] == nil { continue } - if !reflect.DeepEqual(info.Checksums[i], file.Checksums[i]) { + if !bytes.Equal(staleWriters[i].Sum(), writers[i].Sum()) { t.Errorf("Test %d: heal returned different bitrot checksums", i) } } diff --git a/cmd/erasure-readfile.go b/cmd/erasure-readfile.go index 16e5943a4..d42620b7b 100644 --- a/cmd/erasure-readfile.go +++ b/cmd/erasure-readfile.go @@ -23,214 +23,166 @@ import ( "github.com/minio/minio/cmd/logger" ) -type errIdx struct { - idx int - err error +// Reads in parallel from bitrotReaders. +type parallelReader struct { + readers []*bitrotReader + dataBlocks int + offset int64 + shardSize int64 + shardFileSize int64 } -func (s ErasureStorage) readConcurrent(ctx context.Context, volume, path string, offset, length int64, - verifiers []*BitrotVerifier) (buffers [][]byte, needsReconstruction bool, - err error) { +// newParallelReader returns parallelReader. +func newParallelReader(readers []*bitrotReader, dataBlocks int, offset int64, fileSize int64, blocksize int64) *parallelReader { + shardSize := ceilFrac(blocksize, int64(dataBlocks)) + shardFileSize := getErasureShardFileSize(blocksize, fileSize, dataBlocks) + return ¶llelReader{ + readers, + dataBlocks, + (offset / blocksize) * shardSize, + shardSize, + shardFileSize, + } +} - errChan := make(chan errIdx) - stageBuffers := make([][]byte, len(s.disks)) - buffers = make([][]byte, len(s.disks)) - - readDisk := func(i int) { - stageBuffers[i] = make([]byte, length) - disk := s.disks[i] - if disk == OfflineDisk { - logger.LogIf(ctx, errDiskNotFound) - errChan <- errIdx{i, errDiskNotFound} - return +// Returns if buf can be erasure decoded. +func (p *parallelReader) canDecode(buf [][]byte) bool { + bufCount := 0 + for _, b := range buf { + if b != nil { + bufCount++ } - _, rerr := disk.ReadFile(volume, path, offset, stageBuffers[i], verifiers[i]) - errChan <- errIdx{i, rerr} + } + return bufCount >= p.dataBlocks +} + +// Read reads from bitrotReaders in parallel. Returns p.dataBlocks number of bufs. 
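// The approach: fire off reads on the first dataBlocks available readers; whenever
// one of them fails, its slot is set to nil and the next unused reader is tried,
// until enough shards are in hand to decode (canDecode) or no readers remain, in
// which case errXLReadQuorum is returned.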
+func (p *parallelReader) Read() ([][]byte, error) { + type errIdx struct { + idx int + buf []byte + err error } - var finishedCount, successCount, launchIndex int + errCh := make(chan errIdx) + currReaderIndex := 0 + newBuf := make([][]byte, len(p.readers)) - for ; launchIndex < s.dataBlocks; launchIndex++ { - go readDisk(launchIndex) + if p.offset+p.shardSize > p.shardFileSize { + p.shardSize = p.shardFileSize - p.offset } - for finishedCount < launchIndex { - select { - case errVal := <-errChan: - finishedCount++ - if errVal.err != nil { - // TODO: meaningfully log the disk read error - // A disk failed to return data, so we - // request an additional disk if possible - if launchIndex < s.dataBlocks+s.parityBlocks { - needsReconstruction = true - // requiredBlocks++ - go readDisk(launchIndex) - launchIndex++ - } - } else { - successCount++ - buffers[errVal.idx] = stageBuffers[errVal.idx] - stageBuffers[errVal.idx] = nil + read := func(currReaderIndex int) { + b, err := p.readers[currReaderIndex].ReadChunk(p.offset, p.shardSize) + errCh <- errIdx{currReaderIndex, b, err} + } + + readerCount := 0 + for _, r := range p.readers { + if r != nil { + readerCount++ + } + } + if readerCount < p.dataBlocks { + return nil, errXLReadQuorum + } + + readerCount = 0 + for i, r := range p.readers { + if r == nil { + continue + } + go read(i) + readerCount++ + if readerCount == p.dataBlocks { + currReaderIndex = i + 1 + break + } + } + + for errVal := range errCh { + if errVal.err == nil { + newBuf[errVal.idx] = errVal.buf + if p.canDecode(newBuf) { + p.offset += int64(p.shardSize) + return newBuf, nil } + continue } + p.readers[errVal.idx] = nil + for currReaderIndex < len(p.readers) { + if p.readers[currReaderIndex] != nil { + break + } + currReaderIndex++ + } + + if currReaderIndex == len(p.readers) { + break + } + go read(currReaderIndex) + currReaderIndex++ } - if successCount != s.dataBlocks { - // Not enough disks returns data. - err = errXLReadQuorum - logger.LogIf(ctx, err) - } - return + + return nil, errXLReadQuorum } -// ReadFile reads as much data as requested from the file under the -// given volume and path and writes the data to the provided writer. -// The algorithm and the keys/checksums are used to verify the -// integrity of the given file. ReadFile will read data from the given -// offset up to the given length. If parts of the file are corrupted -// ReadFile tries to reconstruct the data. -func (s ErasureStorage) ReadFile(ctx context.Context, writer io.Writer, volume, path string, offset, - length, totalLength int64, checksums [][]byte, algorithm BitrotAlgorithm, - blocksize int64) (f ErasureFileInfo, err error) { - +// ReadFile reads from readers, reconstructs data if needed and writes the data to the writer. 
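// For illustration, callers construct one bitrotReader per available disk, bounded
// by the shard end offset of the requested range, much like the read tests do
// (sketch; bucket/object names and the stored checksums slice are placeholders):
//
//	readers := make([]*bitrotReader, len(disks))
//	for i, disk := range disks {
//		if disk == OfflineDisk {
//			continue // nil marks an unavailable disk
//		}
//		end := getErasureShardFileEndOffset(offset, length, totalLength, s.blockSize, s.dataBlocks)
//		readers[i] = newBitrotReader(disk, "bucket", "object", DefaultBitrotAlgorithm, end, checksums[i])
//	}
//	err := s.ReadFile(ctx, writer, readers, offset, length, totalLength)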
+func (s ErasureStorage) ReadFile(ctx context.Context, writer io.Writer, readers []*bitrotReader, offset, length, totalLength int64) error { if offset < 0 || length < 0 { - logger.LogIf(ctx, errUnexpected) - return f, errUnexpected + logger.LogIf(ctx, errInvalidArgument) + return errInvalidArgument } if offset+length > totalLength { - logger.LogIf(ctx, errUnexpected) - return f, errUnexpected + logger.LogIf(ctx, errInvalidArgument) + return errInvalidArgument } - if !algorithm.Available() { - logger.LogIf(ctx, errBitrotHashAlgoInvalid) - return f, errBitrotHashAlgoInvalid - } - - f.Checksums = make([][]byte, len(s.disks)) - verifiers := make([]*BitrotVerifier, len(s.disks)) - for i, disk := range s.disks { - if disk == OfflineDisk { - continue - } - verifiers[i] = NewBitrotVerifier(algorithm, checksums[i]) - } - - chunksize := ceilFrac(blocksize, int64(s.dataBlocks)) - - // We read all whole-blocks of erasure coded data containing - // the requested data range. - // - // The start index of the erasure coded block containing the - // `offset` byte of data is: - partDataStartIndex := (offset / blocksize) * chunksize - // The start index of the erasure coded block containing the - // (last) byte of data at the index `offset + length - 1` is: - blockStartIndex := ((offset + length - 1) / blocksize) * chunksize - // However, we need the end index of the e.c. block containing - // the last byte - we need to check if that block is the last - // block in the part (in that case, it may be have a different - // chunk size) - isLastBlock := (totalLength-1)/blocksize == (offset+length-1)/blocksize - var partDataEndIndex int64 - if isLastBlock { - lastBlockChunkSize := chunksize - if totalLength%blocksize != 0 { - lastBlockChunkSize = ceilFrac(totalLength%blocksize, int64(s.dataBlocks)) - } - partDataEndIndex = blockStartIndex + lastBlockChunkSize - 1 - } else { - partDataEndIndex = blockStartIndex + chunksize - 1 - } - - // Thus, the length of data to be read from the part file(s) is: - partDataLength := partDataEndIndex - partDataStartIndex + 1 - // The calculation above does not apply when length == 0: if length == 0 { - partDataLength = 0 + return nil } - var buffers [][]byte - var needsReconstruction bool - buffers, needsReconstruction, err = s.readConcurrent(ctx, volume, path, - partDataStartIndex, partDataLength, verifiers) - if err != nil { - // Could not read enough disks. - return - } + reader := newParallelReader(readers, s.dataBlocks, offset, totalLength, s.blockSize) - numChunks := ceilFrac(partDataLength, chunksize) - blocks := make([][]byte, len(s.disks)) + startBlock := offset / s.blockSize + endBlock := (offset + length) / s.blockSize - if needsReconstruction && numChunks > 1 { - // Allocate once for all the equal length blocks. The - // last block may have a different length - allocation - // for this happens inside the for loop below. 
- for i := range blocks { - if len(buffers[i]) == 0 { - blocks[i] = make([]byte, chunksize) - } + var bytesWritten int64 + for block := startBlock; block <= endBlock; block++ { + var blockOffset, blockLength int64 + switch { + case startBlock == endBlock: + blockOffset = offset % s.blockSize + blockLength = length + case block == startBlock: + blockOffset = offset % s.blockSize + blockLength = s.blockSize - blockOffset + case block == endBlock: + blockOffset = 0 + blockLength = (offset + length) % s.blockSize + default: + blockOffset = 0 + blockLength = s.blockSize } - } - - var buffOffset int64 - for chunkNumber := int64(0); chunkNumber < numChunks; chunkNumber++ { - if chunkNumber == numChunks-1 && partDataLength%chunksize != 0 { - chunksize = partDataLength % chunksize - // We allocate again as the last chunk has a - // different size. - for i := range blocks { - if len(buffers[i]) == 0 { - blocks[i] = make([]byte, chunksize) - } - } + if blockLength == 0 { + break } - - for i := range blocks { - if len(buffers[i]) == 0 { - blocks[i] = blocks[i][0:0] - } - } - - for i := range blocks { - if len(buffers[i]) != 0 { - blocks[i] = buffers[i][buffOffset : buffOffset+chunksize] - } - } - buffOffset += chunksize - - if needsReconstruction { - if err = s.ErasureDecodeDataBlocks(blocks); err != nil { - logger.LogIf(ctx, err) - return f, err - } - } - - var writeStart int64 - if chunkNumber == 0 { - writeStart = offset % blocksize - } - - writeLength := blocksize - writeStart - if chunkNumber == numChunks-1 { - lastBlockLength := (offset + length) % blocksize - if lastBlockLength != 0 { - writeLength = lastBlockLength - writeStart - } - } - n, err := writeDataBlocks(ctx, writer, blocks, s.dataBlocks, writeStart, writeLength) + bufs, err := reader.Read() if err != nil { - return f, err + return err } - - f.Size += n - } - - f.Algorithm = algorithm - for i, disk := range s.disks { - if disk == OfflineDisk || buffers[i] == nil { - continue + if err = s.ErasureDecodeDataBlocks(bufs); err != nil { + logger.LogIf(ctx, err) + return err } - f.Checksums[i] = verifiers[i].Sum(nil) + n, err := writeDataBlocks(ctx, writer, bufs, s.dataBlocks, blockOffset, blockLength) + if err != nil { + return err + } + bytesWritten += n } - return f, nil + if bytesWritten != length { + logger.LogIf(ctx, errLessData) + return errLessData + } + return nil } diff --git a/cmd/erasure-readfile_test.go b/cmd/erasure-readfile_test.go index db164a93a..e18e1a6b8 100644 --- a/cmd/erasure-readfile_test.go +++ b/cmd/erasure-readfile_test.go @@ -19,11 +19,12 @@ package cmd import ( "bytes" "context" - crand "crypto/rand" "io" "math/rand" "testing" + crand "crypto/rand" + humanize "github.com/dustin/go-humanize" ) @@ -66,19 +67,18 @@ var erasureReadFileTests = []struct { {dataBlocks: 8, onDisks: 16, offDisks: 7, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 23 {dataBlocks: 2, onDisks: 4, offDisks: 1, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 24 {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 25 - {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: 0, shouldFail: true, 
shouldFailQuorum: false}, // 26 - {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(blockSizeV1) + 1, offset: 0, length: int64(blockSizeV1) + 1, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 27 - {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 12, length: int64(blockSizeV1) + 17, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 28 - {dataBlocks: 3, onDisks: 6, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 1023, length: int64(blockSizeV1) + 1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 29 - {dataBlocks: 4, onDisks: 8, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 11, length: int64(blockSizeV1) + 2*1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 30 - {dataBlocks: 6, onDisks: 12, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 512, length: int64(blockSizeV1) + 8*1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 31 - {dataBlocks: 8, onDisks: 16, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: int64(blockSizeV1), length: int64(blockSizeV1) - 1, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 32 - {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(oneMiByte), offset: -1, length: 3, algorithm: DefaultBitrotAlgorithm, shouldFail: true, shouldFailQuorum: false}, // 33 - {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(oneMiByte), offset: 1024, length: -1, algorithm: DefaultBitrotAlgorithm, shouldFail: true, shouldFailQuorum: false}, // 34 - {dataBlocks: 4, onDisks: 6, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(blockSizeV1), offset: 0, length: int64(blockSizeV1), algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 35 - {dataBlocks: 4, onDisks: 6, offDisks: 1, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 12, length: int64(blockSizeV1) + 17, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 36 - {dataBlocks: 4, onDisks: 6, offDisks: 3, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 1023, length: int64(blockSizeV1) + 1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: true}, // 37 - {dataBlocks: 8, onDisks: 12, offDisks: 4, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 11, length: int64(blockSizeV1) + 2*1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 38 + {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(blockSizeV1) + 1, offset: 0, length: int64(blockSizeV1) + 1, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 26 + {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 12, length: int64(blockSizeV1) + 17, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 27 + {dataBlocks: 3, onDisks: 6, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 1023, length: int64(blockSizeV1) + 1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 28 + {dataBlocks: 4, onDisks: 8, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * 
blockSizeV1), offset: 11, length: int64(blockSizeV1) + 2*1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 29 + {dataBlocks: 6, onDisks: 12, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 512, length: int64(blockSizeV1) + 8*1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 30 + {dataBlocks: 8, onDisks: 16, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: int64(blockSizeV1), length: int64(blockSizeV1) - 1, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 31 + {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(oneMiByte), offset: -1, length: 3, algorithm: DefaultBitrotAlgorithm, shouldFail: true, shouldFailQuorum: false}, // 32 + {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(oneMiByte), offset: 1024, length: -1, algorithm: DefaultBitrotAlgorithm, shouldFail: true, shouldFailQuorum: false}, // 33 + {dataBlocks: 4, onDisks: 6, offDisks: 0, blocksize: int64(blockSizeV1), data: int64(blockSizeV1), offset: 0, length: int64(blockSizeV1), algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 34 + {dataBlocks: 4, onDisks: 6, offDisks: 1, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 12, length: int64(blockSizeV1) + 17, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 35 + {dataBlocks: 4, onDisks: 6, offDisks: 3, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 1023, length: int64(blockSizeV1) + 1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: true}, // 36 + {dataBlocks: 8, onDisks: 12, offDisks: 4, blocksize: int64(blockSizeV1), data: int64(2 * blockSizeV1), offset: 11, length: int64(blockSizeV1) + 2*1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 37 } func TestErasureReadFile(t *testing.T) { @@ -87,29 +87,54 @@ func TestErasureReadFile(t *testing.T) { if err != nil { t.Fatalf("Test %d: failed to create test setup: %v", i, err) } - storage, err := NewErasureStorage(context.Background(), setup.disks, test.dataBlocks, test.onDisks-test.dataBlocks, test.blocksize) + storage, err := NewErasureStorage(context.Background(), test.dataBlocks, test.onDisks-test.dataBlocks, test.blocksize) if err != nil { setup.Remove() t.Fatalf("Test %d: failed to create ErasureStorage: %v", i, err) } - + disks := setup.disks data := make([]byte, test.data) if _, err = io.ReadFull(crand.Reader, data); err != nil { setup.Remove() t.Fatalf("Test %d: failed to generate random test data: %v", i, err) } + writeAlgorithm := test.algorithm if !test.algorithm.Available() { writeAlgorithm = DefaultBitrotAlgorithm } buffer := make([]byte, test.blocksize, 2*test.blocksize) - file, err := storage.CreateFile(context.Background(), bytes.NewReader(data[:]), "testbucket", "object", buffer, writeAlgorithm, test.dataBlocks+1) + writers := make([]*bitrotWriter, len(disks)) + for i, disk := range disks { + writers[i] = newBitrotWriter(disk, "testbucket", "object", writeAlgorithm) + } + n, err := storage.CreateFile(context.Background(), bytes.NewReader(data[:]), writers, buffer, storage.dataBlocks+1) if err != nil { setup.Remove() t.Fatalf("Test %d: failed to create erasure test file: %v", i, err) } + if n != test.data { + setup.Remove() + t.Fatalf("Test %d: failed to create erasure test file", i) + } + for i, w := range writers { + if w == nil { + 
disks[i] = nil + } + } + + // Get the checksums of the current part. + bitrotReaders := make([]*bitrotReader, len(disks)) + for index, disk := range disks { + if disk == OfflineDisk { + continue + } + endOffset := getErasureShardFileEndOffset(test.offset, test.length, test.data, test.blocksize, storage.dataBlocks) + bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", writeAlgorithm, endOffset, writers[index].Sum()) + } + writer := bytes.NewBuffer(nil) - readInfo, err := storage.ReadFile(context.Background(), writer, "testbucket", "object", test.offset, test.length, test.data, file.Checksums, test.algorithm, test.blocksize) + err = storage.ReadFile(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data) if err != nil && !test.shouldFail { t.Errorf("Test %d: should pass but failed with: %v", i, err) } @@ -117,25 +142,32 @@ func TestErasureReadFile(t *testing.T) { t.Errorf("Test %d: should fail but it passed", i) } if err == nil { - if readInfo.Size != test.length { - t.Errorf("Test %d: read returns wrong number of bytes: got: #%d want: #%d", i, readInfo.Size, test.length) - } - if readInfo.Algorithm != test.algorithm { - t.Errorf("Test %d: read returns wrong algorithm: got: %v want: %v", i, readInfo.Algorithm, test.algorithm) - } if content := writer.Bytes(); !bytes.Equal(content, data[test.offset:test.offset+test.length]) { t.Errorf("Test %d: read retruns wrong file content", i) } } + for i, r := range bitrotReaders { + if r == nil { + disks[i] = OfflineDisk + } + } if err == nil && !test.shouldFail { - writer.Reset() - for j := range storage.disks[:test.offDisks] { - storage.disks[j] = badDisk{nil} + bitrotReaders = make([]*bitrotReader, len(disks)) + for index, disk := range disks { + if disk == OfflineDisk { + continue + } + endOffset := getErasureShardFileEndOffset(test.offset, test.length, test.data, test.blocksize, storage.dataBlocks) + bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", writeAlgorithm, endOffset, writers[index].Sum()) + } + for j := range disks[:test.offDisks] { + bitrotReaders[j].disk = badDisk{nil} } if test.offDisks > 0 { - storage.disks[0] = OfflineDisk + bitrotReaders[0] = nil } - readInfo, err = storage.ReadFile(context.Background(), writer, "testbucket", "object", test.offset, test.length, test.data, file.Checksums, test.algorithm, test.blocksize) + writer.Reset() + err = storage.ReadFile(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data) if err != nil && !test.shouldFailQuorum { t.Errorf("Test %d: should pass but failed with: %v", i, err) } @@ -143,12 +175,6 @@ func TestErasureReadFile(t *testing.T) { t.Errorf("Test %d: should fail but it passed", i) } if !test.shouldFailQuorum { - if readInfo.Size != test.length { - t.Errorf("Test %d: read returns wrong number of bytes: got: #%d want: #%d", i, readInfo.Size, test.length) - } - if readInfo.Algorithm != test.algorithm { - t.Errorf("Test %d: read returns wrong algorithm: got: %v want: %v", i, readInfo.Algorithm, test.algorithm) - } if content := writer.Bytes(); !bytes.Equal(content, data[test.offset:test.offset+test.length]) { t.Errorf("Test %d: read retruns wrong file content", i) } @@ -174,8 +200,8 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) { return } defer setup.Remove() - - storage, err := NewErasureStorage(context.Background(), setup.disks, dataBlocks, parityBlocks, blockSize) + disks := setup.disks + storage, err := NewErasureStorage(context.Background(), dataBlocks, parityBlocks, blockSize) if err 
!= nil { t.Fatalf("failed to create ErasureStorage: %v", err) } @@ -187,17 +213,25 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) { t.Fatal(err) } + writers := make([]*bitrotWriter, len(disks)) + for i, disk := range disks { + if disk == nil { + continue + } + writers[i] = newBitrotWriter(disk, "testbucket", "object", DefaultBitrotAlgorithm) + } + // 10000 iterations with random offsets and lengths. iterations := 10000 // Create a test file to read from. buffer := make([]byte, blockSize, 2*blockSize) - file, err := storage.CreateFile(context.Background(), bytes.NewReader(data), "testbucket", "testobject", buffer, DefaultBitrotAlgorithm, dataBlocks+1) + n, err := storage.CreateFile(context.Background(), bytes.NewReader(data), writers, buffer, storage.dataBlocks+1) if err != nil { t.Fatal(err) } - if file.Size != length { - t.Errorf("erasureCreateFile returned %d, expected %d", file.Size, length) + if n != length { + t.Errorf("erasureCreateFile returned %d, expected %d", n, length) } // To generate random offset/length. @@ -212,7 +246,16 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) { expected := data[offset : offset+readLen] - _, err = storage.ReadFile(context.Background(), buf, "testbucket", "testobject", offset, readLen, length, file.Checksums, DefaultBitrotAlgorithm, blockSize) + // Get the checksums of the current part. + bitrotReaders := make([]*bitrotReader, len(disks)) + for index, disk := range disks { + if disk == OfflineDisk { + continue + } + endOffset := getErasureShardFileEndOffset(offset, readLen, length, blockSize, storage.dataBlocks) + bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", DefaultBitrotAlgorithm, endOffset, writers[index].Sum()) + } + err = storage.ReadFile(context.Background(), buf, bitrotReaders, offset, readLen, length) if err != nil { t.Fatal(err, offset, readLen) } @@ -232,31 +275,47 @@ func benchmarkErasureRead(data, parity, dataDown, parityDown int, size int64, b b.Fatalf("failed to create test setup: %v", err) } defer setup.Remove() - storage, err := NewErasureStorage(context.Background(), setup.disks, data, parity, blockSizeV1) + disks := setup.disks + storage, err := NewErasureStorage(context.Background(), data, parity, blockSizeV1) if err != nil { b.Fatalf("failed to create ErasureStorage: %v", err) } + writers := make([]*bitrotWriter, len(disks)) + for i, disk := range disks { + if disk == nil { + continue + } + writers[i] = newBitrotWriter(disk, "testbucket", "object", DefaultBitrotAlgorithm) + } + content := make([]byte, size) buffer := make([]byte, blockSizeV1, 2*blockSizeV1) - file, err := storage.CreateFile(context.Background(), bytes.NewReader(content), "testbucket", "object", buffer, DefaultBitrotAlgorithm, data+1) + _, err = storage.CreateFile(context.Background(), bytes.NewReader(content), writers, buffer, storage.dataBlocks+1) if err != nil { b.Fatalf("failed to create erasure test file: %v", err) } - checksums := file.Checksums for i := 0; i < dataDown; i++ { - storage.disks[i] = OfflineDisk + writers[i] = nil } for i := data; i < data+parityDown; i++ { - storage.disks[i] = OfflineDisk + writers[i] = nil } b.ResetTimer() b.SetBytes(size) b.ReportAllocs() for i := 0; i < b.N; i++ { - if file, err = storage.ReadFile(context.Background(), bytes.NewBuffer(content[:0]), "testbucket", "object", 0, size, size, checksums, DefaultBitrotAlgorithm, blockSizeV1); err != nil { + bitrotReaders := make([]*bitrotReader, len(disks)) + for index, disk := range disks { + if writers[index] == nil { + continue + } + 
endOffset := getErasureShardFileEndOffset(0, size, size, storage.blockSize, storage.dataBlocks) + bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", DefaultBitrotAlgorithm, endOffset, writers[index].Sum()) + } + if err = storage.ReadFile(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size); err != nil { panic(err) } } diff --git a/cmd/erasure-utils.go b/cmd/erasure-utils.go index 31544debd..f63c19570 100644 --- a/cmd/erasure-utils.go +++ b/cmd/erasure-utils.go @@ -104,3 +104,25 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data // Success. return totalWritten, nil } + +// Returns shard-file size. +func getErasureShardFileSize(blockSize int64, totalLength int64, dataBlocks int) int64 { + shardSize := ceilFrac(int64(blockSize), int64(dataBlocks)) + numShards := totalLength / int64(blockSize) + lastBlockSize := totalLength % int64(blockSize) + lastShardSize := ceilFrac(lastBlockSize, int64(dataBlocks)) + return shardSize*numShards + lastShardSize +} + +// Returns the endOffset till which bitrotReader should read data using disk.ReadFile() +// partOffset, partLength and partSize are values of the object's part file. +func getErasureShardFileEndOffset(partOffset int64, partLength int64, partSize int64, erasureBlockSize int64, dataBlocks int) int64 { + shardSize := ceilFrac(erasureBlockSize, int64(dataBlocks)) + shardFileSize := getErasureShardFileSize(erasureBlockSize, partSize, dataBlocks) + endShard := (partOffset + int64(partLength)) / erasureBlockSize + endOffset := endShard*shardSize + shardSize + if endOffset > shardFileSize { + endOffset = shardFileSize + } + return endOffset +} diff --git a/cmd/erasure.go b/cmd/erasure.go index cc74e50ba..59afbdd2f 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -18,8 +18,6 @@ package cmd import ( "context" - "crypto/subtle" - "hash" "github.com/klauspost/reedsolomon" "github.com/minio/minio/cmd/logger" @@ -28,43 +26,36 @@ import ( // OfflineDisk represents an unavailable disk. var OfflineDisk StorageAPI // zero value is nil -// ErasureFileInfo contains information about an erasure file operation (create, read, heal). -type ErasureFileInfo struct { - Size int64 - Algorithm BitrotAlgorithm - Checksums [][]byte -} - -// ErasureStorage represents an array of disks. -// The disks contain erasure coded and bitrot-protected data. +// ErasureStorage - erasure encoding details. type ErasureStorage struct { - disks []StorageAPI erasure reedsolomon.Encoder dataBlocks, parityBlocks int + blockSize int64 } -// NewErasureStorage creates a new ErasureStorage. The storage erasure codes and protects all data written to -// the disks. -func NewErasureStorage(ctx context.Context, disks []StorageAPI, dataBlocks, parityBlocks int, blockSize int64) (s ErasureStorage, err error) { - shardsize := (int(blockSize) + dataBlocks - 1) / dataBlocks +// NewErasureStorage creates a new ErasureStorage. +func NewErasureStorage(ctx context.Context, dataBlocks, parityBlocks int, blockSize int64) (s ErasureStorage, err error) { + shardsize := int(ceilFrac(blockSize, int64(dataBlocks))) erasure, err := reedsolomon.New(dataBlocks, parityBlocks, reedsolomon.WithAutoGoroutines(shardsize)) if err != nil { logger.LogIf(ctx, err) return s, err } s = ErasureStorage{ - disks: make([]StorageAPI, len(disks)), erasure: erasure, dataBlocks: dataBlocks, parityBlocks: parityBlocks, + blockSize: blockSize, } - copy(s.disks, disks) return } // ErasureEncode encodes the given data and returns the erasure-coded data. 
// It returns an error if the erasure coding failed. func (s *ErasureStorage) ErasureEncode(ctx context.Context, data []byte) ([][]byte, error) { + if len(data) == 0 { + return make([][]byte, s.dataBlocks+s.parityBlocks), nil + } encoded, err := s.erasure.Split(data) if err != nil { logger.LogIf(ctx, err) @@ -81,6 +72,16 @@ func (s *ErasureStorage) ErasureEncode(ctx context.Context, data []byte) ([][]by // It only decodes the data blocks but does not verify them. // It returns an error if the decoding failed. func (s *ErasureStorage) ErasureDecodeDataBlocks(data [][]byte) error { + needsReconstruction := false + for _, b := range data[:s.dataBlocks] { + if b == nil { + needsReconstruction = true + break + } + } + if !needsReconstruction { + return nil + } if err := s.erasure.ReconstructData(data); err != nil { return err } @@ -96,27 +97,3 @@ func (s *ErasureStorage) ErasureDecodeDataAndParityBlocks(ctx context.Context, d } return nil } - -// NewBitrotVerifier returns a new BitrotVerifier implementing the given algorithm. -func NewBitrotVerifier(algorithm BitrotAlgorithm, checksum []byte) *BitrotVerifier { - return &BitrotVerifier{algorithm.New(), algorithm, checksum, false} -} - -// BitrotVerifier can be used to verify protected data. -type BitrotVerifier struct { - hash.Hash - - algorithm BitrotAlgorithm - sum []byte - verified bool -} - -// Verify returns true iff the computed checksum of the verifier matches the the checksum provided when the verifier -// was created. -func (v *BitrotVerifier) Verify() bool { - v.verified = true - return subtle.ConstantTimeCompare(v.Sum(nil), v.sum) == 1 -} - -// IsVerified returns true iff Verify was called at least once. -func (v *BitrotVerifier) IsVerified() bool { return v.verified } diff --git a/cmd/erasure_test.go b/cmd/erasure_test.go index cef78f5d6..ff1e89ee8 100644 --- a/cmd/erasure_test.go +++ b/cmd/erasure_test.go @@ -52,8 +52,7 @@ func TestErasureDecode(t *testing.T) { buffer := make([]byte, len(data), 2*len(data)) copy(buffer, data) - disks := make([]StorageAPI, test.dataBlocks+test.parityBlocks) - storage, err := NewErasureStorage(context.Background(), disks, test.dataBlocks, test.parityBlocks, blockSizeV1) + storage, err := NewErasureStorage(context.Background(), test.dataBlocks, test.parityBlocks, blockSizeV1) if err != nil { t.Fatalf("Test %d: failed to create erasure storage: %v", i, err) } diff --git a/cmd/posix.go b/cmd/posix.go index a23d2cdf4..772a2a6e5 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -31,6 +31,8 @@ import ( "syscall" "time" + "bytes" + humanize "github.com/dustin/go-humanize" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/disk" @@ -736,13 +738,19 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) { // // Additionally ReadFile also starts reading from an offset. ReadFile // semantics are same as io.ReadFull. 
-func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (n int64, err error) { +func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (int64, error) { + var n int + var err error defer func() { if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() + if offset < 0 { + return 0, errInvalidArgument + } + if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError { return 0, errFaultyDisk } @@ -803,35 +811,36 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif return 0, errIsNotRegular } - if verifier != nil { - bufp := s.pool.Get().(*[]byte) - defer s.pool.Put(bufp) - - if offset != 0 { - if _, err = io.CopyBuffer(verifier, io.LimitReader(file, offset), *bufp); err != nil { - return 0, err - } - } - if _, err = file.Read(buffer); err != nil { - return 0, err - } - if _, err = verifier.Write(buffer); err != nil { - return 0, err - } - if _, err = io.CopyBuffer(verifier, file, *bufp); err != nil { - return 0, err - } - if !verifier.Verify() { - return 0, hashMismatchError{hex.EncodeToString(verifier.sum), hex.EncodeToString(verifier.Sum(nil))} - } - return int64(len(buffer)), err + if verifier == nil { + n, err = file.ReadAt(buffer, offset) + return int64(n), err } - m, err := file.ReadAt(buffer, offset) - if m > 0 && m < len(buffer) { - err = io.ErrUnexpectedEOF + bufp := s.pool.Get().(*[]byte) + defer s.pool.Put(bufp) + + h := verifier.algorithm.New() + if _, err = io.CopyBuffer(h, io.LimitReader(file, offset), *bufp); err != nil { + return 0, err } - return int64(m), err + + if n, err = io.ReadFull(file, buffer); err != nil { + return int64(n), err + } + + if _, err = h.Write(buffer); err != nil { + return 0, err + } + + if _, err = io.CopyBuffer(h, file, *bufp); err != nil { + return 0, err + } + + if bytes.Compare(h.Sum(nil), verifier.sum) != 0 { + return 0, hashMismatchError{hex.EncodeToString(verifier.sum), hex.EncodeToString(h.Sum(nil))} + } + + return int64(len(buffer)), nil } func (s *posix) createFile(volume, path string) (f *os.File, err error) { diff --git a/cmd/posix_test.go b/cmd/posix_test.go index bc7d95ae7..c67c33da6 100644 --- a/cmd/posix_test.go +++ b/cmd/posix_test.go @@ -1140,7 +1140,7 @@ func TestPosixReadFile(t *testing.T) { // Create all files needed during testing. appendFiles := testCases[:4] - + v := NewBitrotVerifier(SHA256, getSHA256Sum([]byte("hello, world"))) // Create test files for further reading. for i, appendFile := range appendFiles { err = posixStorage.AppendFile(volume, appendFile.fileName, []byte("hello, world")) @@ -1149,21 +1149,11 @@ func TestPosixReadFile(t *testing.T) { } } - // Check PathError specially. { buf := make([]byte, 5) - if _, err = posixStorage.ReadFile(volume, "myobject", -1, buf, nil); err != nil { - isPathError := false - switch err.(type) { - case *os.PathError: - isPathError = true - } - - if !isPathError { - t.Fatalf("expected: , got: %v", err) - } - } else { - t.Fatalf("expected: , got: ") + // Test for negative offset. + if _, err = posixStorage.ReadFile(volume, "myobject", -1, buf, v); err == nil { + t.Fatalf("expected: error, got: ") } } @@ -1172,7 +1162,7 @@ func TestPosixReadFile(t *testing.T) { var n int64 // Common read buffer. 
var buf = make([]byte, testCase.bufSize) - n, err = posixStorage.ReadFile(testCase.volume, testCase.fileName, testCase.offset, buf, nil) + n, err = posixStorage.ReadFile(testCase.volume, testCase.fileName, testCase.offset, buf, v) if err != nil && testCase.expectedErr != nil { // Validate if the type string of the errors are an exact match. if err.Error() != testCase.expectedErr.Error() { @@ -1201,7 +1191,7 @@ func TestPosixReadFile(t *testing.T) { // results. In this scenario return 'n' is always lesser than the input buffer. if err == io.ErrUnexpectedEOF { if !bytes.Equal(testCase.expectedBuf, buf[:n]) { - t.Errorf("Case: %d %#v, expected: \"%s\", got: \"%s\"", i+1, testCase, string(testCase.expectedBuf), string(buf[:testCase.bufSize])) + t.Errorf("Case: %d %#v, expected: \"%s\", got: \"%s\"", i+1, testCase, string(testCase.expectedBuf), string(buf[:n])) } if n > int64(len(buf)) { t.Errorf("Case: %d %#v, expected: %d, got: %d", i+1, testCase, testCase.bufSize, n) @@ -1245,7 +1235,7 @@ func TestPosixReadFile(t *testing.T) { // Common read buffer. var buf = make([]byte, 10) - if _, err = posixStorage.ReadFile("mybucket", "myobject", 0, buf, nil); err != errFileAccessDenied { + if _, err = posixStorage.ReadFile("mybucket", "myobject", 0, buf, v); err != errFileAccessDenied { t.Errorf("expected: %s, got: %s", errFileAccessDenied, err) } } diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go index 95ff92f3c..1e4573c8f 100644 --- a/cmd/storage-errors.go +++ b/cmd/storage-errors.go @@ -79,6 +79,9 @@ var errCrossDeviceLink = errors.New("Rename across devices not allowed, please f // errMinDiskSize - cannot create volume or files when disk size is less than threshold. var errMinDiskSize = errors.New("The disk size is less than the minimum threshold") +// errLessData - returned when less data available than what was requested. +var errLessData = errors.New("less data available than what was requested") + // hashMisMatchError - represents a bit-rot hash verification failure // error. 
type hashMismatchError struct { diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index ae123ef22..7dd6172f9 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -233,7 +233,6 @@ func (client *StorageRPCClient) ReadFile(volume string, path string, offset int6 if verifier != nil { args.Algo = verifier.algorithm args.ExpectedHash = verifier.sum - args.Verified = verifier.IsVerified() } var reply []byte diff --git a/cmd/storage-rpc_test.go b/cmd/storage-rpc_test.go index 7d2b72921..68c3fbfb6 100644 --- a/cmd/storage-rpc_test.go +++ b/cmd/storage-rpc_test.go @@ -17,6 +17,7 @@ package cmd import ( + "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -347,13 +348,14 @@ func testStorageAPIReadFile(t *testing.T, storage StorageAPI) { {"foo", "yourobject", 0, nil, true}, } + result := make([]byte, 100) for i, testCase := range testCases { - result := make([]byte, 100) - n, err := storage.ReadFile(testCase.volumeName, testCase.objectName, testCase.offset, result, nil) + result = result[testCase.offset:3] + _, err := storage.ReadFile(testCase.volumeName, testCase.objectName, testCase.offset, result, nil) expectErr := (err != nil) - result = result[:n] if expectErr != testCase.expectErr { + fmt.Println(err) t.Fatalf("case %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr) } diff --git a/cmd/xl-v1-common.go b/cmd/xl-v1-common.go index b1f72ef8a..e503061fe 100644 --- a/cmd/xl-v1-common.go +++ b/cmd/xl-v1-common.go @@ -103,16 +103,3 @@ func (xl xlObjects) isObjectDir(bucket, prefix string) (ok bool) { } // Exhausted all disks - return false. return false } - -// Calculate the space occupied by an object in a single disk -func (xl xlObjects) sizeOnDisk(fileSize int64, blockSize int64, dataBlocks int) int64 { - numBlocks := fileSize / blockSize - chunkSize := ceilFrac(blockSize, int64(dataBlocks)) - sizeInDisk := numBlocks * chunkSize - remaining := fileSize % blockSize - if remaining > 0 { - sizeInDisk += ceilFrac(remaining, int64(dataBlocks)) - } - - return sizeInDisk -} diff --git a/cmd/xl-v1-healing-common.go b/cmd/xl-v1-healing-common.go index a1d83da1b..5a8ad2496 100644 --- a/cmd/xl-v1-healing-common.go +++ b/cmd/xl-v1-healing-common.go @@ -163,7 +163,6 @@ func getLatestXLMeta(ctx context.Context, partsMetadata []xlMetaV1, errs []error // other than file not found and not a checksum error). func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs []error, bucket, object string) ([]StorageAPI, []error, error) { - availableDisks := make([]StorageAPI, len(onlineDisks)) buffer := []byte{} dataErrs := make([]error, len(onlineDisks)) diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 963410eeb..757a1d3a3 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -447,28 +447,35 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o // Heal each part. erasureHealFile() will write the healed // part to .minio/tmp/uuid/ which needs to be renamed later to // the final location. 
- storage, err := NewErasureStorage(ctx, latestDisks, latestMeta.Erasure.DataBlocks, + storage, err := NewErasureStorage(ctx, latestMeta.Erasure.DataBlocks, latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize) if err != nil { return result, toObjectErr(err, bucket, object) } - checksums := make([][]byte, len(latestDisks)) + for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ { partName := latestMeta.Parts[partIndex].Name partSize := latestMeta.Parts[partIndex].Size erasure := latestMeta.Erasure var algorithm BitrotAlgorithm - for i, disk := range storage.disks { - if disk != OfflineDisk { - info := partsMetadata[i].Erasure.GetChecksumInfo(partName) - algorithm = info.Algorithm - checksums[i] = info.Hash + bitrotReaders := make([]*bitrotReader, len(latestDisks)) + for i, disk := range latestDisks { + if disk == OfflineDisk { + continue } + info := partsMetadata[i].Erasure.GetChecksumInfo(partName) + algorithm = info.Algorithm + endOffset := getErasureShardFileEndOffset(0, partSize, partSize, erasure.BlockSize, storage.dataBlocks) + bitrotReaders[i] = newBitrotReader(disk, bucket, pathJoin(object, partName), algorithm, endOffset, info.Hash) } - // Heal the part file. - file, hErr := storage.HealFile(ctx, outDatedDisks, bucket, pathJoin(object, partName), - erasure.BlockSize, minioMetaTmpBucket, pathJoin(tmpID, partName), partSize, - algorithm, checksums) + bitrotWriters := make([]*bitrotWriter, len(outDatedDisks)) + for i, disk := range outDatedDisks { + if disk == OfflineDisk { + continue + } + bitrotWriters[i] = newBitrotWriter(disk, minioMetaTmpBucket, pathJoin(tmpID, partName), algorithm) + } + hErr := storage.HealFile(ctx, bitrotReaders, bitrotWriters, partSize) if hErr != nil { return result, toObjectErr(hErr, bucket, object) } @@ -480,14 +487,14 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o } // A non-nil stale disk which did not receive // a healed part checksum had a write error. - if file.Checksums[i] == nil { + if bitrotWriters[i] == nil { outDatedDisks[i] = nil disksToHealCount-- continue } // append part checksums checksumInfos[i] = append(checksumInfos[i], - ChecksumInfo{partName, file.Algorithm, file.Checksums[i]}) + ChecksumInfo{partName, algorithm, bitrotWriters[i].Sum()}) } // If all disks are having errors, we give up. diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 4c9319a4e..a7f8e3ddb 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -20,92 +20,17 @@ import ( "context" "encoding/hex" "encoding/json" - "errors" "fmt" - "hash" "path" "sort" "sync" "time" - "github.com/minio/highwayhash" "github.com/minio/minio/cmd/logger" - sha256 "github.com/minio/sha256-simd" - "golang.org/x/crypto/blake2b" ) const erasureAlgorithmKlauspost = "klauspost/reedsolomon/vandermonde" -// magic HH-256 key as HH-256 hash of the first 100 decimals of π as utf-8 string with a zero key. -var magicHighwayHash256Key = []byte("\x4b\xe7\x34\xfa\x8e\x23\x8a\xcd\x26\x3e\x83\xe6\xbb\x96\x85\x52\x04\x0f\x93\x5d\xa3\x9f\x44\x14\x97\xe0\x9d\x13\x22\xde\x36\xa0") - -// BitrotAlgorithm specifies a algorithm used for bitrot protection. -type BitrotAlgorithm uint - -const ( - // SHA256 represents the SHA-256 hash function - SHA256 BitrotAlgorithm = 1 + iota - // HighwayHash256 represents the HighwayHash-256 hash function - HighwayHash256 - // BLAKE2b512 represents the BLAKE2b-256 hash function - BLAKE2b512 -) - -// DefaultBitrotAlgorithm is the default algorithm used for bitrot protection. 
-var DefaultBitrotAlgorithm = HighwayHash256 - -var bitrotAlgorithms = map[BitrotAlgorithm]string{ - SHA256: "sha256", - BLAKE2b512: "blake2b", - HighwayHash256: "highwayhash256", -} - -// New returns a new hash.Hash calculating the given bitrot algorithm. -// New logs error and exits if the algorithm is not supported or not -// linked into the binary. -func (a BitrotAlgorithm) New() hash.Hash { - switch a { - case SHA256: - return sha256.New() - case BLAKE2b512: - b2, _ := blake2b.New512(nil) // New512 never returns an error if the key is nil - return b2 - case HighwayHash256: - hh, _ := highwayhash.New(magicHighwayHash256Key) // New will never return error since key is 256 bit - return hh - } - logger.CriticalIf(context.Background(), errors.New("Unsupported bitrot algorithm")) - return nil -} - -// Available reports whether the given algorihm is a supported and linked into the binary. -func (a BitrotAlgorithm) Available() bool { - _, ok := bitrotAlgorithms[a] - return ok -} - -// String returns the string identifier for a given bitrot algorithm. -// If the algorithm is not supported String panics. -func (a BitrotAlgorithm) String() string { - name, ok := bitrotAlgorithms[a] - if !ok { - logger.CriticalIf(context.Background(), errors.New("Unsupported bitrot algorithm")) - } - return name -} - -// BitrotAlgorithmFromString returns a bitrot algorithm from the given string representation. -// It returns 0 if the string representation does not match any supported algorithm. -// The zero value of a bitrot algorithm is never supported. -func BitrotAlgorithmFromString(s string) (a BitrotAlgorithm) { - for alg, name := range bitrotAlgorithms { - if name == s { - return alg - } - } - return -} - // objectPartInfo Info of each part kept in the multipart metadata // file after CompleteMultipartUpload() is called. type objectPartInfo struct { @@ -129,15 +54,15 @@ type ChecksumInfo struct { Hash []byte } +type checksumInfoJSON struct { + Name string `json:"name"` + Algorithm string `json:"algorithm"` + Hash string `json:"hash"` +} + // MarshalJSON marshals the ChecksumInfo struct func (c ChecksumInfo) MarshalJSON() ([]byte, error) { - type checksuminfo struct { - Name string `json:"name"` - Algorithm string `json:"algorithm"` - Hash string `json:"hash"` - } - - info := checksuminfo{ + info := checksumInfoJSON{ Name: c.Name, Algorithm: c.Algorithm.String(), Hash: hex.EncodeToString(c.Hash), @@ -145,28 +70,25 @@ func (c ChecksumInfo) MarshalJSON() ([]byte, error) { return json.Marshal(info) } -// UnmarshalJSON unmarshals the the given data into the ChecksumInfo struct +// UnmarshalJSON - should never be called, instead xlMetaV1UnmarshalJSON() should be used. 
func (c *ChecksumInfo) UnmarshalJSON(data []byte) error { - type checksuminfo struct { - Name string `json:"name"` - Algorithm string `json:"algorithm"` - Hash string `json:"hash"` - } - - var info checksuminfo - err := json.Unmarshal(data, &info) - if err != nil { + logger.LogIf(context.Background(), errUnexpected) + var info checksumInfoJSON + if err := json.Unmarshal(data, &info); err != nil { return err } - c.Algorithm = BitrotAlgorithmFromString(info.Algorithm) - if !c.Algorithm.Available() { - return errBitrotHashAlgoInvalid - } - c.Hash, err = hex.DecodeString(info.Hash) + sum, err := hex.DecodeString(info.Hash) if err != nil { return err } c.Name = info.Name + c.Algorithm = BitrotAlgorithmFromString(info.Algorithm) + c.Hash = sum + + if !c.Algorithm.Available() { + logger.LogIf(context.Background(), errBitrotHashAlgoInvalid) + return errBitrotHashAlgoInvalid + } return nil } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index ffecedd40..37f167b1a 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -369,7 +369,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID } } - storage, err := NewErasureStorage(ctx, onlineDisks, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) + storage, err := NewErasureStorage(ctx, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) if err != nil { return pi, toObjectErr(err, bucket, object) } @@ -387,18 +387,34 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID defer xl.bp.Put(buffer) } - file, err := storage.CreateFile(ctx, data, minioMetaTmpBucket, tmpPartPath, buffer, DefaultBitrotAlgorithm, writeQuorum) + if len(buffer) > int(xlMeta.Erasure.BlockSize) { + buffer = buffer[:xlMeta.Erasure.BlockSize] + } + writers := make([]*bitrotWriter, len(onlineDisks)) + for i, disk := range onlineDisks { + if disk == nil { + continue + } + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, DefaultBitrotAlgorithm) + } + n, err := storage.CreateFile(ctx, data, writers, buffer, storage.dataBlocks+1) if err != nil { return pi, toObjectErr(err, bucket, object) } // Should return IncompleteBody{} error when reader has fewer bytes // than specified in request header. - if file.Size < data.Size() { + if n < data.Size() { logger.LogIf(ctx, IncompleteBody{}) return pi, IncompleteBody{} } + for i := range writers { + if writers[i] == nil { + onlineDisks[i] = nil + } + } + // post-upload check (write) lock postUploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath) if err = postUploadIDLock.GetLock(globalOperationTimeout); err != nil { @@ -440,14 +456,14 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID md5hex := hex.EncodeToString(data.MD5Current()) // Add the current part. - xlMeta.AddObjectPart(partID, partSuffix, md5hex, file.Size) + xlMeta.AddObjectPart(partID, partSuffix, md5hex, n) for i, disk := range onlineDisks { if disk == OfflineDisk { continue } partsMetadata[i].Parts = xlMeta.Parts - partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partSuffix, file.Algorithm, file.Checksums[i]}) + partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partSuffix, DefaultBitrotAlgorithm, writers[i].Sum()}) } // Write all the checksum metadata. 
diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index bfdc039b4..1e007be00 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -60,7 +60,7 @@ func (xl xlObjects) putObjectDir(ctx context.Context, bucket, object string, wri func (xl xlObjects) prepareFile(ctx context.Context, bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks, writeQuorum int) error { pErrs := make([]error, len(onlineDisks)) // Calculate the real size of the part in one disk. - actualSize := xl.sizeOnDisk(size, blockSize, dataBlocks) + actualSize := getErasureShardFileSize(blockSize, size, dataBlocks) // Prepare object creation in a all disks for index, disk := range onlineDisks { if disk != nil { @@ -262,11 +262,11 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO } var totalBytesRead int64 - storage, err := NewErasureStorage(ctx, onlineDisks, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) + storage, err := NewErasureStorage(ctx, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) if err != nil { return toObjectErr(err, bucket, object) } - checksums := make([][]byte, len(storage.disks)) + for ; partIndex <= lastPartIndex; partIndex++ { if length == totalBytesRead { break @@ -275,30 +275,34 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO partName := xlMeta.Parts[partIndex].Name partSize := xlMeta.Parts[partIndex].Size - readSize := partSize - partOffset - // readSize should be adjusted so that we don't write more data than what was requested. - if readSize > (length - totalBytesRead) { - readSize = length - totalBytesRead + partLength := partSize - partOffset + // partLength should be adjusted so that we don't write more data than what was requested. + if partLength > (length - totalBytesRead) { + partLength = length - totalBytesRead } // Get the checksums of the current part. - var algorithm BitrotAlgorithm - for index, disk := range storage.disks { + bitrotReaders := make([]*bitrotReader, len(onlineDisks)) + for index, disk := range onlineDisks { if disk == OfflineDisk { continue } checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partName) - algorithm = checksumInfo.Algorithm - checksums[index] = checksumInfo.Hash + endOffset := getErasureShardFileEndOffset(partOffset, partLength, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks) + bitrotReaders[index] = newBitrotReader(disk, bucket, pathJoin(object, partName), checksumInfo.Algorithm, endOffset, checksumInfo.Hash) } - file, err := storage.ReadFile(ctx, writer, bucket, pathJoin(object, partName), partOffset, readSize, partSize, checksums, algorithm, xlMeta.Erasure.BlockSize) + err := storage.ReadFile(ctx, writer, bitrotReaders, partOffset, partLength, partSize) if err != nil { return toObjectErr(err, bucket, object) } - + for i, r := range bitrotReaders { + if r == nil { + onlineDisks[i] = OfflineDisk + } + } // Track total bytes read from disk and written to the client. - totalBytesRead += file.Size + totalBytesRead += partLength // partOffset will be valid only for the first part, hence reset it to 0 for // the remaining parts. 
@@ -605,7 +609,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // Total size of the written object var sizeWritten int64 - storage, err := NewErasureStorage(ctx, onlineDisks, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) + storage, err := NewErasureStorage(ctx, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -623,6 +627,10 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, defer xl.bp.Put(buffer) } + if len(buffer) > int(xlMeta.Erasure.BlockSize) { + buffer = buffer[:xlMeta.Erasure.BlockSize] + } + // Read data and split into parts - similar to multipart mechanism for partIdx := 1; ; partIdx++ { // Compute part name @@ -641,7 +649,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // This is only an optimization. var curPartReader io.Reader if curPartSize > 0 { - pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tempErasureObj, curPartSize, storage.disks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum) + pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tempErasureObj, curPartSize, onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum) if pErr != nil { return ObjectInfo{}, toObjectErr(pErr, bucket, object) } @@ -653,25 +661,35 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, curPartReader = reader } - file, erasureErr := storage.CreateFile(ctx, curPartReader, minioMetaTmpBucket, - tempErasureObj, buffer, DefaultBitrotAlgorithm, writeQuorum) + writers := make([]*bitrotWriter, len(onlineDisks)) + for i, disk := range onlineDisks { + if disk == nil { + continue + } + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, DefaultBitrotAlgorithm) + } + n, erasureErr := storage.CreateFile(ctx, curPartReader, writers, buffer, storage.dataBlocks+1) if erasureErr != nil { return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) } // Should return IncompleteBody{} error when reader has fewer bytes // than specified in request header. - if file.Size < curPartSize { + if n < curPartSize { logger.LogIf(ctx, IncompleteBody{}) return ObjectInfo{}, IncompleteBody{} } // Update the total written size - sizeWritten += file.Size + sizeWritten += n - for i := range partsMetadata { - partsMetadata[i].AddObjectPart(partIdx, partName, "", file.Size) - partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, file.Algorithm, file.Checksums[i]}) + for i, w := range writers { + if w == nil { + onlineDisks[i] = nil + continue + } + partsMetadata[i].AddObjectPart(partIdx, partName, "", n) + partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, DefaultBitrotAlgorithm, w.Sum()}) } // We wrote everything, break out. diff --git a/cmd/xl-v1-utils_test.go b/cmd/xl-v1-utils_test.go index a55171acf..b07eeb036 100644 --- a/cmd/xl-v1-utils_test.go +++ b/cmd/xl-v1-utils_test.go @@ -214,7 +214,6 @@ func getSampleXLMeta(totalParts int) xlMetaV1 { // Compare the unmarshaled XLMetaV1 with the one obtained from gjson parsing. func compareXLMetaV1(t *testing.T, unMarshalXLMeta, gjsonXLMeta xlMetaV1) { - // Start comparing the fields of xlMetaV1 obtained from gjson parsing with one parsed using json unmarshaling. 
if unMarshalXLMeta.Version != gjsonXLMeta.Version { t.Errorf("Expected the Version to be \"%s\", but got \"%s\".", unMarshalXLMeta.Version, gjsonXLMeta.Version) @@ -268,6 +267,7 @@ func compareXLMetaV1(t *testing.T, unMarshalXLMeta, gjsonXLMeta xlMetaV1) { } } } + if unMarshalXLMeta.Minio.Release != gjsonXLMeta.Minio.Release { t.Errorf("Expected the Release string to be \"%s\", but got \"%s\".", unMarshalXLMeta.Minio.Release, gjsonXLMeta.Minio.Release) }
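
A minimal, self-contained sketch (not part of the patch) that mirrors the shard-size arithmetic added to cmd/erasure-utils.go above. ceilFrac is re-declared locally only so the example compiles on its own (the real helper lives in cmd/utils.go), and the sample sizes in main are illustrative values, not taken from the patch.

package main

import "fmt"

// ceilFrac returns ceil(numerator/denominator) for positive inputs,
// matching how the patch uses the cmd/utils.go helper.
func ceilFrac(numerator, denominator int64) int64 {
	return (numerator + denominator - 1) / denominator
}

// getErasureShardFileSize mirrors the patch: the on-disk size of one shard
// file for a part of totalLength bytes, erasure-coded in blockSize blocks
// across dataBlocks data shards.
func getErasureShardFileSize(blockSize, totalLength int64, dataBlocks int) int64 {
	shardSize := ceilFrac(blockSize, int64(dataBlocks))
	numShards := totalLength / blockSize
	lastBlockSize := totalLength % blockSize
	lastShardSize := ceilFrac(lastBlockSize, int64(dataBlocks))
	return shardSize*numShards + lastShardSize
}

// getErasureShardFileEndOffset mirrors the patch: the offset up to which a
// bitrotReader must read so the requested [partOffset, partOffset+partLength)
// window of the part can be reconstructed.
func getErasureShardFileEndOffset(partOffset, partLength, partSize, erasureBlockSize int64, dataBlocks int) int64 {
	shardSize := ceilFrac(erasureBlockSize, int64(dataBlocks))
	shardFileSize := getErasureShardFileSize(erasureBlockSize, partSize, dataBlocks)
	endShard := (partOffset + partLength) / erasureBlockSize
	endOffset := endShard*shardSize + shardSize
	if endOffset > shardFileSize {
		endOffset = shardFileSize
	}
	return endOffset
}

func main() {
	const blockSize = 10 * 1024 * 1024 // 10 MiB erasure block
	const dataBlocks = 5
	partSize := int64(25 * 1024 * 1024) // 25 MiB part

	// Each of the 5 data shards stores 5 MiB for this part.
	fmt.Println(getErasureShardFileSize(blockSize, partSize, dataBlocks)) // 5242880

	// Reading 1 MiB starting at offset 12 MiB touches the second erasure block,
	// so each shard file must be read up to the end of its second 2 MiB chunk.
	fmt.Println(getErasureShardFileEndOffset(12*1024*1024, 1*1024*1024, partSize, blockSize, dataBlocks)) // 4194304
}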
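
A second sketch (also not part of the patch) of the verification strategy the reworked posix.ReadFile uses when a BitrotVerifier is supplied: the whole shard file is hashed -- the prefix before the offset, the requested window, then the tail -- and the digest is compared against the checksum recorded at write time. Here SHA-256 stands in for the configured bitrot algorithm, an in-memory reader stands in for the on-disk file, and readWithVerification is an illustrative helper name, not a function in the codebase.

package main

import (
	"bytes"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
)

// readWithVerification reads len(buffer) bytes at offset from r while hashing
// the entire stream and checking it against expectedSum, mirroring the control
// flow of the verifier branch in the new posix.ReadFile.
func readWithVerification(r io.Reader, offset int64, buffer []byte, expectedSum []byte) (int64, error) {
	h := sha256.New()

	// Hash everything before the requested offset.
	if _, err := io.Copy(h, io.LimitReader(r, offset)); err != nil {
		return 0, err
	}
	// Read the requested window and feed it to the hash as well.
	n, err := io.ReadFull(r, buffer)
	if err != nil {
		return int64(n), err
	}
	if _, err := h.Write(buffer); err != nil {
		return 0, err
	}
	// Hash the remainder of the file.
	if _, err := io.Copy(h, r); err != nil {
		return 0, err
	}
	if !bytes.Equal(h.Sum(nil), expectedSum) {
		return 0, errors.New("bitrot hash mismatch")
	}
	return int64(len(buffer)), nil
}

func main() {
	shard := []byte("hello, world")
	sum := sha256.Sum256(shard) // checksum a bitrotWriter would record at write time

	buf := make([]byte, 5)
	n, err := readWithVerification(bytes.NewReader(shard), 7, buf, sum[:])
	fmt.Println(n, err, string(buf)) // 5 <nil> world
}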