diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index d7d865536..ba74cac11 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -23,8 +23,11 @@ import ( "fmt" "hash" "io" + "time" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/env" + xioutil "github.com/minio/minio/pkg/ioutil" ) type errHashMismatch struct { @@ -37,7 +40,7 @@ func (err *errHashMismatch) Error() string { // Calculates bitrot in chunks and writes the hash into the stream. type streamingBitrotWriter struct { - iow *io.PipeWriter + iow io.WriteCloser h hash.Hash shardSize int64 canClose chan struct{} // Needed to avoid race explained in Close() call. @@ -70,19 +73,28 @@ func (b *streamingBitrotWriter) Close() error { return err } +var ( + ioDeadline, _ = time.ParseDuration(env.Get("MINIO_IO_DEADLINE", "")) +) + // Returns streaming bitrot writer implementation. -func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.WriteCloser { +func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64, heal bool) io.Writer { r, w := io.Pipe() h := algo.New() - bw := &streamingBitrotWriter{w, h, shardSize, make(chan struct{})} + + var wc io.WriteCloser = w + if ioDeadline > 0 && !heal { + wc = xioutil.NewDeadlineWriter(w, ioDeadline) + } + + bw := &streamingBitrotWriter{wc, h, shardSize, make(chan struct{})} go func() { totalFileSize := int64(-1) // For compressed objects length will be unknown (represented by length=-1) if length != -1 { bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums. totalFileSize = bitrotSumsTotalSize + length } - err := disk.CreateFile(context.TODO(), volume, filePath, totalFileSize, r) - r.CloseWithError(err) + r.CloseWithError(disk.CreateFile(context.TODO(), volume, filePath, totalFileSize, r)) close(bw.canClose) }() return bw diff --git a/cmd/bitrot.go b/cmd/bitrot.go index ec478727e..e29e478bd 100644 --- a/cmd/bitrot.go +++ b/cmd/bitrot.go @@ -96,9 +96,9 @@ func BitrotAlgorithmFromString(s string) (a BitrotAlgorithm) { return } -func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer { +func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64, heal bool) io.Writer { if algo == HighwayHash256S { - return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize) + return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize, heal) } return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize) } diff --git a/cmd/bitrot_test.go b/cmd/bitrot_test.go index 86d3a0b9b..54aa13508 100644 --- a/cmd/bitrot_test.go +++ b/cmd/bitrot_test.go @@ -41,7 +41,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) { disk.MakeVol(context.Background(), volume) - writer := newBitrotWriter(disk, volume, filePath, 35, bitrotAlgo, 10) + writer := newBitrotWriter(disk, volume, filePath, 35, bitrotAlgo, 10, false) _, err = writer.Write([]byte("aaaaaaaaaa")) if err != nil { diff --git a/cmd/erasure-decode_test.go b/cmd/erasure-decode_test.go index 30ae3f6b6..4e4f1df81 100644 --- a/cmd/erasure-decode_test.go +++ b/cmd/erasure-decode_test.go @@ -108,7 +108,8 @@ func TestErasureDecode(t *testing.T) { buffer := make([]byte, test.blocksize, 2*test.blocksize) writers := make([]io.Writer, len(disks)) for i, disk := range disks { - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(test.data), writeAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "testbucket", "object", + erasure.ShardFileSize(test.data), writeAlgorithm, erasure.ShardSize(), false) } n, err := erasure.Encode(context.Background(), bytes.NewReader(data[:]), writers, buffer, erasure.dataBlocks+1) closeBitrotWriters(writers) @@ -234,7 +235,8 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) { if disk == nil { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(length), DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "testbucket", "object", + erasure.ShardFileSize(length), DefaultBitrotAlgorithm, erasure.ShardSize(), false) } // 10000 iterations with random offsets and lengths. @@ -304,7 +306,8 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64, if disk == nil { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "testbucket", "object", + erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize(), false) } content := make([]byte, size) diff --git a/cmd/erasure-encode_test.go b/cmd/erasure-encode_test.go index 7daf3ce1a..8788b51ed 100644 --- a/cmd/erasure-encode_test.go +++ b/cmd/erasure-encode_test.go @@ -108,7 +108,7 @@ func TestErasureEncode(t *testing.T) { if disk == OfflineDisk { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize(), false) } n, err := erasure.Encode(context.Background(), bytes.NewReader(data[test.offset:]), writers, buffer, erasure.dataBlocks+1) closeBitrotWriters(writers) @@ -132,14 +132,14 @@ func TestErasureEncode(t *testing.T) { if disk == nil { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object2", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "testbucket", "object2", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize(), false) } for j := range disks[:test.offDisks] { switch w := writers[j].(type) { case *wholeBitrotWriter: w.disk = badDisk{nil} case *streamingBitrotWriter: - w.iow.CloseWithError(errFaultyDisk) + w.iow.(*io.PipeWriter).CloseWithError(errFaultyDisk) } } if test.offDisks > 0 { @@ -196,7 +196,8 @@ func benchmarkErasureEncode(data, parity, dataDown, parityDown int, size int64, continue } disk.Delete(context.Background(), "testbucket", "object", false) - writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "testbucket", "object", + erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize(), false) } _, err := erasure.Encode(context.Background(), bytes.NewReader(content), writers, buffer, erasure.dataBlocks+1) closeBitrotWriters(writers) diff --git a/cmd/erasure-heal_test.go b/cmd/erasure-heal_test.go index 1c220f952..00bb3b5c5 100644 --- a/cmd/erasure-heal_test.go +++ b/cmd/erasure-heal_test.go @@ -87,7 +87,8 @@ func TestErasureHeal(t *testing.T) { buffer := make([]byte, test.blocksize, 2*test.blocksize) writers := make([]io.Writer, len(disks)) for i, disk := range disks { - writers[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, "testbucket", "testobject", + erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize(), true) } _, err = erasure.Encode(context.Background(), bytes.NewReader(data), writers, buffer, erasure.dataBlocks+1) closeBitrotWriters(writers) @@ -130,7 +131,8 @@ func TestErasureHeal(t *testing.T) { continue } os.Remove(pathJoin(disk.String(), "testbucket", "testobject")) - staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize()) + staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject", + erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize(), true) } // test case setup is complete - now call Heal() diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 44a0cbdbb..c9fddec55 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -422,7 +422,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s continue } partPath := pathJoin(tmpID, dataDir, fmt.Sprintf("part.%d", partNumber)) - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, + tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize(), true) } err = erasure.Heal(ctx, readers, writers, partSize) closeBitrotReaders(readers) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 11a38393e..0681a67de 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -476,7 +476,8 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo if disk == nil { continue } - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, + erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false) } n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index b2c8140ae..1e7328d7e 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -718,7 +718,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st if disk == nil { continue } - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, + erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false) } n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum) diff --git a/cmd/iam.go b/cmd/iam.go index b6c20b175..d24e22a14 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -577,15 +577,6 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { // Hold the lock for migration only. txnLk := objAPI.NewNSLock(minioMetaBucket, minioConfigPrefix+"/iam.lock") - // Initializing IAM sub-system needs a retry mechanism for - // the following reasons: - // - Read quorum is lost just after the initialization - // of the object layer. - // - Write quorum not met when upgrading configuration - // version is needed, migration is needed etc. - rquorum := InsufficientReadQuorum{} - wquorum := InsufficientWriteQuorum{} - // allocate dynamic timeout once before the loop iamLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second) @@ -620,12 +611,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { // Migrate IAM configuration, if necessary. if err := sys.doIAMConfigMigration(ctx); err != nil { txnLk.Unlock() - if errors.Is(err, errDiskNotFound) || - errors.Is(err, errConfigNotFound) || - errors.Is(err, context.DeadlineExceeded) || - errors.As(err, &rquorum) || - errors.As(err, &wquorum) || - isErrBucketNotFound(err) { + if configRetriableErrors(err) { logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err) continue } @@ -641,12 +627,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { for { if err := sys.store.loadAll(ctx, sys); err != nil { - if errors.Is(err, errDiskNotFound) || - errors.Is(err, errConfigNotFound) || - errors.Is(err, context.DeadlineExceeded) || - errors.As(err, &rquorum) || - errors.As(err, &wquorum) || - isErrBucketNotFound(err) { + if configRetriableErrors(err) { logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err) time.Sleep(time.Duration(r.Float64() * float64(5*time.Second))) continue diff --git a/cmd/server-main.go b/cmd/server-main.go index 100503c0a..d6b841d82 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -237,6 +237,28 @@ func newAllSubsystems() { globalBucketTargetSys = NewBucketTargetSys() } +func configRetriableErrors(err error) bool { + // Initializing sub-systems needs a retry mechanism for + // the following reasons: + // - Read quorum is lost just after the initialization + // of the object layer. + // - Write quorum not met when upgrading configuration + // version is needed, migration is needed etc. + rquorum := InsufficientReadQuorum{} + wquorum := InsufficientWriteQuorum{} + + // One of these retriable errors shall be retried. + return errors.Is(err, errDiskNotFound) || + errors.Is(err, errConfigNotFound) || + errors.Is(err, context.DeadlineExceeded) || + errors.Is(err, errErasureWriteQuorum) || + errors.Is(err, errErasureReadQuorum) || + errors.As(err, &rquorum) || + errors.As(err, &wquorum) || + isErrBucketNotFound(err) || + errors.Is(err, os.ErrDeadlineExceeded) +} + func initServer(ctx context.Context, newObject ObjectLayer) error { // Once the config is fully loaded, initialize the new object layer. setObjectLayer(newObject) @@ -252,15 +274,6 @@ func initServer(ctx context.Context, newObject ObjectLayer) error { // Migrating to encrypted backend should happen before initialization of any // sub-systems, make sure that we do not move the above codeblock elsewhere. - // Initializing sub-systems needs a retry mechanism for - // the following reasons: - // - Read quorum is lost just after the initialization - // of the object layer. - // - Write quorum not met when upgrading configuration - // version is needed, migration is needed etc. - rquorum := InsufficientReadQuorum{} - wquorum := InsufficientWriteQuorum{} - r := rand.New(rand.NewSource(time.Now().UnixNano())) lockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second) @@ -307,15 +320,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error { txnLk.Unlock() // Unlock the transaction lock and allow other nodes to acquire the lock if possible. - // One of these retriable errors shall be retried. - if errors.Is(err, errDiskNotFound) || - errors.Is(err, errConfigNotFound) || - errors.Is(err, context.DeadlineExceeded) || - errors.Is(err, errErasureWriteQuorum) || - errors.Is(err, errErasureReadQuorum) || - errors.As(err, &rquorum) || - errors.As(err, &wquorum) || - isErrBucketNotFound(err) { + if configRetriableErrors(err) { logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err) time.Sleep(time.Duration(r.Float64() * float64(5*time.Second))) continue @@ -333,8 +338,6 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { // you want to add extra context to your error. This // ensures top level retry works accordingly. // List buckets to heal, and be re-used for loading configs. - rquorum := InsufficientReadQuorum{} - wquorum := InsufficientWriteQuorum{} buckets, err := newObject.ListBuckets(ctx) if err != nil { @@ -368,14 +371,7 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { // Initialize config system. if err = globalConfigSys.Init(newObject); err != nil { - if errors.Is(err, errDiskNotFound) || - errors.Is(err, errConfigNotFound) || - errors.Is(err, context.DeadlineExceeded) || - errors.Is(err, errErasureWriteQuorum) || - errors.Is(err, errErasureReadQuorum) || - errors.As(err, &rquorum) || - errors.As(err, &wquorum) || - isErrBucketNotFound(err) { + if configRetriableErrors(err) { return fmt.Errorf("Unable to initialize config system: %w", err) } // Any other config errors we simply print a message and proceed forward. diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 36bacf283..85c0ee6e6 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -62,7 +62,7 @@ const ( // Detects change in underlying disk. type xlStorageDiskIDCheck struct { - storage *xlStorage + storage StorageAPI diskID string apiCalls [metricLast]uint64 diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index a7153d46e..df3bb2196 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -71,6 +71,13 @@ const ( xlStorageFormatFile = "xl.meta" ) +var alignedBuf []byte + +func init() { + alignedBuf = disk.AlignedBlock(4096) + _, _ = rand.Read(alignedBuf) +} + // isValidVolname verifies a volname name in accordance with object // layer requirements. func isValidVolname(volname string) bool { @@ -282,10 +289,17 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) { var rnd [8]byte _, _ = rand.Read(rnd[:]) tmpFile := ".writable-check-" + hex.EncodeToString(rnd[:]) + ".tmp" - if err = p.CreateFile(GlobalContext, minioMetaTmpBucket, tmpFile, 1, strings.NewReader("0")); err != nil { + filePath := pathJoin(p.diskPath, minioMetaTmpBucket, tmpFile) + w, err := disk.OpenFileDirectIO(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0666) + if err != nil { return p, err } - defer os.Remove(pathJoin(p.diskPath, minioMetaTmpBucket, tmpFile)) + if _, err = w.Write(alignedBuf[:]); err != nil { + w.Close() + return p, err + } + w.Close() + defer os.Remove(filePath) volumeDir, err := p.getVolDir(minioMetaTmpBucket) if err != nil { @@ -294,7 +308,7 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) { // Check if backend is readable, and optionally supports O_DIRECT. if _, err = p.readAllData(volumeDir, pathJoin(volumeDir, tmpFile), true); err != nil { - if err != errUnsupportedDisk { + if !errors.Is(err, errUnsupportedDisk) { return p, err } // error is unsupported disk, turn-off directIO for reads @@ -1432,7 +1446,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz return errInvalidArgument } - if fileSize <= smallFileThreshold { + if fileSize >= 0 && fileSize <= smallFileThreshold { // For streams smaller than 128KiB we simply write them as O_DSYNC (fdatasync) // and not O_DIRECT to avoid the complexities of aligned I/O. w, err := s.openFile(volume, path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC) @@ -1483,9 +1497,9 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz return err } - if written < fileSize { + if written < fileSize && fileSize >= 0 { return errLessData - } else if written > fileSize { + } else if written > fileSize && fileSize >= 0 { return errMoreData } diff --git a/cmd/xl-storage_test.go b/cmd/xl-storage_test.go index 9635341b4..8a112cd3b 100644 --- a/cmd/xl-storage_test.go +++ b/cmd/xl-storage_test.go @@ -1730,7 +1730,7 @@ func TestXLStorageVerifyFile(t *testing.T) { // 4) Streaming bitrot check on corrupted file // create xlStorage test setup - xlStorage, path, err := newXLStorageTestSetup() + storage, path, err := newXLStorageTestSetup() if err != nil { t.Fatalf("Unable to create xlStorage test setup, %s", err) } @@ -1738,7 +1738,7 @@ func TestXLStorageVerifyFile(t *testing.T) { volName := "testvol" fileName := "testfile" - if err := xlStorage.MakeVol(context.Background(), volName); err != nil { + if err := storage.MakeVol(context.Background(), volName); err != nil { t.Fatal(err) } @@ -1752,29 +1752,29 @@ func TestXLStorageVerifyFile(t *testing.T) { h := algo.New() h.Write(data) hashBytes := h.Sum(nil) - if err := xlStorage.WriteAll(context.Background(), volName, fileName, data); err != nil { + if err := storage.WriteAll(context.Background(), volName, fileName, data); err != nil { t.Fatal(err) } - if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err != nil { + if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err != nil { t.Fatal(err) } // 2) Whole-file bitrot check on corrupted file - if err := xlStorage.AppendFile(context.Background(), volName, fileName, []byte("a")); err != nil { + if err := storage.AppendFile(context.Background(), volName, fileName, []byte("a")); err != nil { t.Fatal(err) } // Check if VerifyFile reports the incorrect file length (the correct length is `size+1`) - if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err == nil { + if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, hashBytes, 0); err == nil { t.Fatal("expected to fail bitrot check") } // Check if bitrot fails - if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, hashBytes, 0); err == nil { + if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, hashBytes, 0); err == nil { t.Fatal("expected to fail bitrot check") } - if err := xlStorage.Delete(context.Background(), volName, fileName, false); err != nil { + if err := storage.Delete(context.Background(), volName, fileName, false); err != nil { t.Fatal(err) } @@ -1782,7 +1782,7 @@ func TestXLStorageVerifyFile(t *testing.T) { algo = HighwayHash256S shardSize := int64(1024 * 1024) shard := make([]byte, shardSize) - w := newStreamingBitrotWriter(xlStorage, volName, fileName, size, algo, shardSize) + w := newStreamingBitrotWriter(storage, volName, fileName, size, algo, shardSize, false) reader := bytes.NewReader(data) for { // Using io.Copy instead of this loop will not work for us as io.Copy @@ -1798,13 +1798,13 @@ func TestXLStorageVerifyFile(t *testing.T) { } t.Fatal(err) } - w.Close() - if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err != nil { + w.(io.Closer).Close() + if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err != nil { t.Fatal(err) } // 4) Streaming bitrot check on corrupted file - filePath := pathJoin(xlStorage.String(), volName, fileName) + filePath := pathJoin(storage.String(), volName, fileName) f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_SYNC, 0644) if err != nil { t.Fatal(err) @@ -1814,10 +1814,10 @@ func TestXLStorageVerifyFile(t *testing.T) { t.Fatal(err) } f.Close() - if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err == nil { + if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size, algo, nil, shardSize); err == nil { t.Fatal("expected to fail bitrot check") } - if err := xlStorage.storage.bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, nil, shardSize); err == nil { + if err := storage.storage.(*xlStorage).bitrotVerify(pathJoin(path, volName, fileName), size+1, algo, nil, shardSize); err == nil { t.Fatal("expected to fail bitrot check") } } diff --git a/pkg/ioutil/ioutil.go b/pkg/ioutil/ioutil.go index 0ceb7295b..31771ebbc 100644 --- a/pkg/ioutil/ioutil.go +++ b/pkg/ioutil/ioutil.go @@ -20,8 +20,10 @@ package ioutil import ( "bytes" + "context" "io" "os" + "time" "github.com/minio/minio/pkg/disk" ) @@ -64,6 +66,56 @@ func WriteOnClose(w io.Writer) *WriteOnCloser { return &WriteOnCloser{w, false} } +type ioret struct { + n int + err error +} + +// DeadlineWriter deadline writer with context +type DeadlineWriter struct { + io.WriteCloser + timeout time.Duration + err error +} + +// NewDeadlineWriter wraps a writer to make it respect given deadline +// value per Write(). If there is a blocking write, the returned Writer +// will return whenever the timer hits (the return values are n=0 +// and err=context.Canceled.) +func NewDeadlineWriter(w io.WriteCloser, timeout time.Duration) io.WriteCloser { + return &DeadlineWriter{WriteCloser: w, timeout: timeout} +} + +func (w *DeadlineWriter) Write(buf []byte) (int, error) { + if w.err != nil { + return 0, w.err + } + + c := make(chan ioret, 1) + t := time.NewTimer(w.timeout) + defer t.Stop() + + go func() { + n, err := w.WriteCloser.Write(buf) + c <- ioret{n, err} + close(c) + }() + + select { + case r := <-c: + w.err = r.err + return r.n, r.err + case <-t.C: + w.err = context.Canceled + return 0, context.Canceled + } +} + +// Close closer interface to close the underlying closer +func (w *DeadlineWriter) Close() error { + return w.WriteCloser.Close() +} + // LimitWriter implements io.WriteCloser. // // This is implemented such that we want to restrict diff --git a/pkg/ioutil/ioutil_test.go b/pkg/ioutil/ioutil_test.go index c2aaee7cb..aad6066ef 100644 --- a/pkg/ioutil/ioutil_test.go +++ b/pkg/ioutil/ioutil_test.go @@ -18,12 +18,49 @@ package ioutil import ( "bytes" + "context" "io" goioutil "io/ioutil" "os" "testing" + "time" ) +type sleepWriter struct { + timeout time.Duration +} + +func (w *sleepWriter) Write(p []byte) (n int, err error) { + time.Sleep(w.timeout) + return len(p), nil +} + +func (w *sleepWriter) Close() error { + return nil +} + +func TestDeadlineWriter(t *testing.T) { + w := NewDeadlineWriter(&sleepWriter{timeout: 500 * time.Millisecond}, 450*time.Millisecond) + _, err := w.Write([]byte("1")) + w.Close() + if err != context.Canceled { + t.Error("DeadlineWriter shouldn't be successful - should return context.Canceled") + } + _, err = w.Write([]byte("1")) + if err != context.Canceled { + t.Error("DeadlineWriter shouldn't be successful - should return context.Canceled") + } + w = NewDeadlineWriter(&sleepWriter{timeout: 500 * time.Millisecond}, 600*time.Millisecond) + n, err := w.Write([]byte("abcd")) + w.Close() + if err != nil { + t.Errorf("DeadlineWriter should succeed but failed with %s", err) + } + if n != 4 { + t.Errorf("DeadlineWriter should succeed but should have only written 0 bytes, but returned %d instead", n) + } +} + func TestCloseOnWriter(t *testing.T) { writer := WriteOnClose(goioutil.Discard) if writer.HasWritten() {