mirror of
https://github.com/minio/minio.git
synced 2025-01-22 20:23:14 -05:00
Fix hanging erasure writes (#12253)
However, this slice is also used for closing the writers, so close is never called on these. Furthermore when an error is returned from a write it is now reported to the reader. bonus: remove unused heal param from `newBitrotWriter`. * Remove copy, now that we don't mutate.
This commit is contained in:
parent
55375fa7f6
commit
cde6469b88
@ -57,9 +57,15 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) {
|
||||
hashBytes := b.h.Sum(nil)
|
||||
_, err := b.iow.Write(hashBytes)
|
||||
if err != nil {
|
||||
b.closeWithErr(err)
|
||||
return 0, err
|
||||
}
|
||||
return b.iow.Write(p)
|
||||
n, err := b.iow.Write(p)
|
||||
if err != nil {
|
||||
b.closeWithErr(err)
|
||||
return n, err
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (b *streamingBitrotWriter) Close() error {
|
||||
@ -77,13 +83,17 @@ func (b *streamingBitrotWriter) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Returns streaming bitrot writer implementation.
|
||||
func newStreamingBitrotWriterBuffer(w io.Writer, algo BitrotAlgorithm, shardSize int64) io.WriteCloser {
|
||||
return &streamingBitrotWriter{iow: ioutil.NopCloser(w), h: algo.New(), shardSize: shardSize, canClose: nil}
|
||||
// newStreamingBitrotWriterBuffer returns streaming bitrot writer implementation.
|
||||
// The output is written to the supplied writer w.
|
||||
func newStreamingBitrotWriterBuffer(w io.Writer, algo BitrotAlgorithm, shardSize int64) io.Writer {
|
||||
return &streamingBitrotWriter{iow: ioutil.NopCloser(w), h: algo.New(), shardSize: shardSize, canClose: nil, closeWithErr: func(err error) error {
|
||||
// Similar to CloseWithError on pipes we always return nil.
|
||||
return nil
|
||||
}}
|
||||
}
|
||||
|
||||
// Returns streaming bitrot writer implementation.
|
||||
func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64, heal bool) io.Writer {
|
||||
func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer {
|
||||
r, w := io.Pipe()
|
||||
h := algo.New()
|
||||
|
||||
|
@ -100,9 +100,9 @@ func BitrotAlgorithmFromString(s string) (a BitrotAlgorithm) {
|
||||
return
|
||||
}
|
||||
|
||||
func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64, heal bool) io.Writer {
|
||||
func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer {
|
||||
if algo == HighwayHash256S {
|
||||
return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize, heal)
|
||||
return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize)
|
||||
}
|
||||
return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize)
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) {
|
||||
|
||||
disk.MakeVol(context.Background(), volume)
|
||||
|
||||
writer := newBitrotWriter(disk, volume, filePath, 35, bitrotAlgo, 10, false)
|
||||
writer := newBitrotWriter(disk, volume, filePath, 35, bitrotAlgo, 10)
|
||||
|
||||
_, err = writer.Write([]byte("aaaaaaaaaa"))
|
||||
if err != nil {
|
||||
|
@ -20,13 +20,12 @@ package cmd
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
crand "crypto/rand"
|
||||
"io"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
crand "crypto/rand"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/dustin/go-humanize"
|
||||
)
|
||||
|
||||
func (a badDisk) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
|
||||
@ -109,8 +108,7 @@ func TestErasureDecode(t *testing.T) {
|
||||
buffer := make([]byte, test.blocksize, 2*test.blocksize)
|
||||
writers := make([]io.Writer, len(disks))
|
||||
for i, disk := range disks {
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object",
|
||||
erasure.ShardFileSize(test.data), writeAlgorithm, erasure.ShardSize(), false)
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(test.data), writeAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
n, err := erasure.Encode(context.Background(), bytes.NewReader(data[:]), writers, buffer, erasure.dataBlocks+1)
|
||||
closeBitrotWriters(writers)
|
||||
@ -236,8 +234,7 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object",
|
||||
erasure.ShardFileSize(length), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(length), DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
// 10000 iterations with random offsets and lengths.
|
||||
@ -307,8 +304,7 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64,
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object",
|
||||
erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
content := make([]byte, size)
|
||||
|
@ -20,7 +20,6 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
|
@ -109,7 +109,7 @@ func TestErasureEncode(t *testing.T) {
|
||||
if disk == OfflineDisk {
|
||||
continue
|
||||
}
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize(), false)
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize())
|
||||
}
|
||||
n, err := erasure.Encode(context.Background(), bytes.NewReader(data[test.offset:]), writers, buffer, erasure.dataBlocks+1)
|
||||
closeBitrotWriters(writers)
|
||||
@ -133,7 +133,7 @@ func TestErasureEncode(t *testing.T) {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object2", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize(), false)
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object2", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize())
|
||||
}
|
||||
for j := range disks[:test.offDisks] {
|
||||
switch w := writers[j].(type) {
|
||||
@ -197,8 +197,7 @@ func benchmarkErasureEncode(data, parity, dataDown, parityDown int, size int64,
|
||||
continue
|
||||
}
|
||||
disk.Delete(context.Background(), "testbucket", "object", false)
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object",
|
||||
erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
_, err := erasure.Encode(context.Background(), bytes.NewReader(content), writers, buffer, erasure.dataBlocks+1)
|
||||
closeBitrotWriters(writers)
|
||||
|
@ -91,8 +91,7 @@ func TestErasureHeal(t *testing.T) {
|
||||
buffer := make([]byte, test.blocksize, 2*test.blocksize)
|
||||
writers := make([]io.Writer, len(disks))
|
||||
for i, disk := range disks {
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "testobject",
|
||||
erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize(), true)
|
||||
writers[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize())
|
||||
}
|
||||
_, err = erasure.Encode(context.Background(), bytes.NewReader(data), writers, buffer, erasure.dataBlocks+1)
|
||||
closeBitrotWriters(writers)
|
||||
@ -135,8 +134,7 @@ func TestErasureHeal(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
os.Remove(pathJoin(disk.String(), "testbucket", "testobject"))
|
||||
staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject",
|
||||
erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize(), true)
|
||||
staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
// Number of buffers, max 2GB
|
||||
|
@ -466,7 +466,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
} else {
|
||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath,
|
||||
tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize(), true)
|
||||
tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
}
|
||||
err = erasure.Heal(ctx, readers, writers, partSize, er.bp)
|
||||
|
@ -482,8 +482,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath,
|
||||
erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
|
||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
n, err := erasure.Encode(rctx, data, writers, buffer, writeQuorum)
|
||||
|
@ -733,8 +733,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
||||
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
continue
|
||||
}
|
||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj,
|
||||
shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize(), false)
|
||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
|
||||
|
@ -1787,7 +1787,7 @@ func TestXLStorageVerifyFile(t *testing.T) {
|
||||
algo = HighwayHash256S
|
||||
shardSize := int64(1024 * 1024)
|
||||
shard := make([]byte, shardSize)
|
||||
w := newStreamingBitrotWriter(storage, volName, fileName, size, algo, shardSize, false)
|
||||
w := newStreamingBitrotWriter(storage, volName, fileName, size, algo, shardSize)
|
||||
reader := bytes.NewReader(data)
|
||||
for {
|
||||
// Using io.Copy instead of this loop will not work for us as io.Copy
|
||||
|
Loading…
x
Reference in New Issue
Block a user