diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go
index b18d71a10..3d12617dc 100644
--- a/cmd/bitrot-streaming.go
+++ b/cmd/bitrot-streaming.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"hash"
 	"io"
+	"sync"
 
 	xhttp "github.com/minio/minio/cmd/http"
 	"github.com/minio/minio/cmd/logger"
@@ -44,7 +45,7 @@ type streamingBitrotWriter struct {
 	closeWithErr func(err error) error
 	h            hash.Hash
 	shardSize    int64
-	canClose     chan struct{} // Needed to avoid race explained in Close() call.
+	canClose     *sync.WaitGroup
 }
 
 func (b *streamingBitrotWriter) Write(p []byte) (int, error) {
@@ -71,7 +72,7 @@ func (b *streamingBitrotWriter) Close() error {
 	// Now pipe.Close() can return before the data is read on the other end of the pipe and written to the disk
 	// Hence an immediate Read() on the file can return incorrect data.
 	if b.canClose != nil {
-		<-b.canClose
+		b.canClose.Wait()
 	}
 	return err
 }
@@ -86,8 +87,8 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i
 	r, w := io.Pipe()
 	h := algo.New()
 
-	bw := &streamingBitrotWriter{iow: w, closeWithErr: w.CloseWithError, h: h, shardSize: shardSize, canClose: make(chan struct{})}
-
+	bw := &streamingBitrotWriter{iow: w, closeWithErr: w.CloseWithError, h: h, shardSize: shardSize, canClose: &sync.WaitGroup{}}
+	bw.canClose.Add(1)
 	go func() {
 		totalFileSize := int64(-1) // For compressed objects length will be unknown (represented by length=-1)
 		if length != -1 {
@@ -95,7 +96,7 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i
 			totalFileSize = bitrotSumsTotalSize + length
 		}
 		r.CloseWithError(disk.CreateFile(context.TODO(), volume, filePath, totalFileSize, r))
-		close(bw.canClose)
+		bw.canClose.Done()
 	}()
 	return bw
 }
diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go
index c3dfd1d48..995641fd1 100644
--- a/cmd/disk-cache-backend.go
+++ b/cmd/disk-cache-backend.go
@@ -41,6 +41,7 @@ import (
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/disk"
 	"github.com/minio/minio/pkg/fips"
+	xioutil "github.com/minio/minio/pkg/ioutil"
 	"github.com/minio/minio/pkg/kms"
 	"github.com/minio/sio"
 )
@@ -950,7 +951,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 		return nil, numHits, nErr
 	}
 	filePath := pathJoin(cacheObjPath, cacheFile)
-	pr, pw := io.Pipe()
+	pr, pw := xioutil.WaitPipe()
 	go func() {
 		err := c.bitrotReadFromCache(ctx, filePath, off, length, pw)
 		if err != nil {
@@ -960,7 +961,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 	}()
 	// Cleanup function to cause the go routine above to exit, in
 	// case of incomplete read.
-	pipeCloser := func() { pr.Close() }
+	pipeCloser := func() { pr.CloseWithError(nil) }
 
 	gr, gerr := fn(pr, h, opts.CheckPrecondFn, pipeCloser)
 	if gerr != nil {
diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go
index 578e52588..e8767eec2 100644
--- a/cmd/disk-cache.go
+++ b/cmd/disk-cache.go
@@ -357,22 +357,27 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 	}
 	// Initialize pipe.
-	pipeReader, pipeWriter := io.Pipe()
-	teeReader := io.TeeReader(bkReader, pipeWriter)
+	pr, pw := io.Pipe()
+	var wg sync.WaitGroup
+	teeReader := io.TeeReader(bkReader, pw)
 	userDefined := getMetadata(bkReader.ObjInfo)
+	wg.Add(1)
 	go func() {
 		_, putErr := dcache.Put(ctx, bucket, object,
-			io.LimitReader(pipeReader, bkReader.ObjInfo.Size),
+			io.LimitReader(pr, bkReader.ObjInfo.Size),
 			bkReader.ObjInfo.Size, rs, ObjectOptions{
 				UserDefined: userDefined,
 			}, false)
-		// close the write end of the pipe, so the error gets
-		// propagated to getObjReader
-		pipeWriter.CloseWithError(putErr)
+		// close the read end of the pipe, so the error gets
+		// propagated to teeReader
+		pr.CloseWithError(putErr)
+		wg.Done()
 	}()
-	cleanupBackend := func() { bkReader.Close() }
-	cleanupPipe := func() { pipeWriter.Close() }
-	return NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, opts, cleanupBackend, cleanupPipe)
+	cleanupBackend := func() {
+		pw.CloseWithError(bkReader.Close())
+		wg.Wait()
+	}
+	return NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, opts, cleanupBackend)
 }
 
 // Returns ObjectInfo from cache if available.
diff --git a/cmd/erasure-lowlevel-heal.go b/cmd/erasure-lowlevel-heal.go
index 30deef702..b586e6176 100644
--- a/cmd/erasure-lowlevel-heal.go
+++ b/cmd/erasure-lowlevel-heal.go
@@ -20,20 +20,17 @@ package cmd
 import (
 	"context"
 	"io"
-	"sync"
 
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/bpool"
+	xioutil "github.com/minio/minio/pkg/ioutil"
 )
 
 // Heal heals the shard files on non-nil writers. Note that the quorum passed is 1
 // as healing should continue even if it has been successful healing only one shard file.
 func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64, bp *bpool.BytePoolCap) error {
-	r, w := io.Pipe()
-	var wg sync.WaitGroup
-	wg.Add(1)
+	r, w := xioutil.WaitPipe()
 	go func() {
-		defer wg.Done()
 		_, err := e.Decode(ctx, w, readers, 0, size, size, nil)
 		w.CloseWithError(err)
 	}()
@@ -58,6 +55,5 @@ func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.W
 		err = errLessData
 	}
 	r.CloseWithError(err)
-	wg.Wait()
 	return err
 }
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index a0932fb0f..59c5e3b01 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -36,6 +36,7 @@ import (
 	"github.com/minio/minio/pkg/bucket/replication"
 	"github.com/minio/minio/pkg/event"
 	"github.com/minio/minio/pkg/hash"
+	xioutil "github.com/minio/minio/pkg/ioutil"
 	"github.com/minio/minio/pkg/mimedb"
 	"github.com/minio/minio/pkg/sync/errgroup"
 )
@@ -197,14 +198,17 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
 		return nil, err
 	}
 	unlockOnDefer = false
-	pr, pw := io.Pipe()
+
+	pr, pw := xioutil.WaitPipe()
 	go func() {
 		pw.CloseWithError(er.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks))
 	}()
 
 	// Cleanup function to cause the go routine above to exit, in
 	// case of incomplete read.
-	pipeCloser := func() { pr.Close() }
+	pipeCloser := func() {
+		pr.CloseWithError(nil)
+	}
 
 	return fn(pr, h, opts.CheckPrecondFn, pipeCloser, nsUnlocker)
 }
@@ -1359,17 +1363,18 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
 		return err
 	}
 
-	pr, pw := io.Pipe()
+	pr, pw := xioutil.WaitPipe()
 	go func() {
 		err := er.getObjectWithFileInfo(ctx, bucket, object, 0, fi.Size, pw, fi, metaArr, onlineDisks)
 		pw.CloseWithError(err)
 	}()
-	if err = tgtClient.Put(ctx, destObj, pr, fi.Size); err != nil {
-		pr.Close()
+
+	err = tgtClient.Put(ctx, destObj, pr, fi.Size)
+	pr.CloseWithError(err)
+	if err != nil {
 		logger.LogIf(ctx, fmt.Errorf("Unable to transition %s/%s(%s) to %s tier: %w", bucket, object, opts.VersionID, opts.Transition.Tier, err))
 		return err
 	}
-	pr.Close()
 	fi.TransitionStatus = lifecycle.TransitionComplete
 	fi.TransitionedObjName = destObj
 	fi.TransitionTier = opts.Transition.Tier
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index c02e73d91..120acbe6b 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -827,13 +827,15 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 	readers := make([]*metacacheReader, askDisks)
 	for i := range disks {
 		r, w := io.Pipe()
-		d := disks[i]
+		// Make sure we close the pipe so blocked writes don't stay around.
+		defer r.CloseWithError(context.Canceled)
+
 		readers[i], err = newMetacacheReader(r)
 		if err != nil {
 			return err
 		}
-		// Make sure we close the pipe so blocked writes doesn't stay around.
-		defer r.CloseWithError(context.Canceled)
+		d := disks[i]
+
 		// Send request to each disk.
 		go func() {
 			werr := d.WalkDir(ctx, WalkDirOptions{
diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go
index 2545863f5..dbbaa6b9b 100644
--- a/cmd/object-api-utils.go
+++ b/cmd/object-api-utils.go
@@ -909,9 +909,9 @@ func CleanMinioInternalMetadataKeys(metadata map[string]string) map[string]strin
 // client closed the stream prematurely.
 func newS2CompressReader(r io.Reader, on int64) io.ReadCloser {
 	pr, pw := io.Pipe()
-	comp := s2.NewWriter(pw)
 	// Copy input to compressor
 	go func() {
+		comp := s2.NewWriter(pw)
 		cn, err := io.Copy(comp, r)
 		if err != nil {
 			comp.Close()
@@ -926,12 +926,7 @@ func newS2CompressReader(r io.Reader, on int64) io.ReadCloser {
 			return
 		}
 		// Close the stream.
-		if err = comp.Close(); err != nil {
-			pw.CloseWithError(err)
-			return
-		}
-		// Everything ok, do regular close.
-		pw.Close()
+		pw.CloseWithError(comp.Close())
 	}()
 	return pr
 }
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 9700c6e70..d2024b968 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -208,11 +208,10 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
 	}()
 	respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
 	defer xhttp.DrainBody(respBody)
+	pr.CloseWithError(err)
 	if err != nil {
-		pr.Close()
 		return cache, err
 	}
-	pr.Close()
 
 	var newCache dataUsageCache
 	pr, pw = io.Pipe()
diff --git a/pkg/ioutil/wait_pipe.go b/pkg/ioutil/wait_pipe.go
new file mode 100644
index 000000000..236121476
--- /dev/null
+++ b/pkg/ioutil/wait_pipe.go
@@ -0,0 +1,69 @@
+// Copyright (c) 2015-2021 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+// Forked from golang.org/pkg/os.ReadFile with NOATIME support.
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the https://golang.org/LICENSE file.
+
+package ioutil
+
+import (
+	"io"
+	"sync"
+)
+
+// PipeWriter is an io.PipeWriter that signals a wait group when closed.
+type PipeWriter struct {
+	*io.PipeWriter
+	done func()
+}
+
+// CloseWithError closes the writer end of the pipe with the supplied error.
+func (w *PipeWriter) CloseWithError(err error) error {
+	err = w.PipeWriter.CloseWithError(err)
+	w.done()
+	return err
+}
+
+// PipeReader is an io.PipeReader that waits for the writer when closed.
+type PipeReader struct {
+	*io.PipeReader
+	wait func()
+}
+
+// CloseWithError closes the reader end of the pipe with the supplied error and waits for the writer end to be closed.
+func (r *PipeReader) CloseWithError(err error) error {
+	err = r.PipeReader.CloseWithError(err)
+	r.wait()
+	return err
+}
+
+// WaitPipe returns an io.Pipe backed by a wait group, so that closing the
+// read() end synchronizes with (waits for) the closing of the write() end.
+func WaitPipe() (*PipeReader, *PipeWriter) {
+	r, w := io.Pipe()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	return &PipeReader{
+		PipeReader: r,
+		wait:       wg.Wait,
+	}, &PipeWriter{
+		PipeWriter: w,
+		done:       wg.Done,
+	}
+}
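
A reviewer's note on how the new helper is intended to be used. The sketch below is illustrative only (the `main` wrapper and sample payload are made up; the import path is the one this diff adds): the read end's `CloseWithError` does not return until the write end has been closed, which is exactly the ordering guarantee the call sites above rely on to ensure the producing goroutine has exited.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"

	xioutil "github.com/minio/minio/pkg/ioutil"
)

func main() {
	pr, pw := xioutil.WaitPipe()

	// Producer: stream data into the write end, then close it with the
	// final error (nil on success). Closing the write end fires the
	// wait group's Done, so it must happen exactly once per pipe.
	go func() {
		_, err := io.Copy(pw, strings.NewReader("hello"))
		pw.CloseWithError(err)
	}()

	// Consumer: drain the read end, then close it. If the consumer bails
	// out early instead, closing pr fails the producer's pending Write
	// with io.ErrClosedPipe, so the producer still reaches CloseWithError.
	var buf bytes.Buffer
	_, err := io.Copy(&buf, pr)

	// Unlike a plain io.Pipe, this blocks until pw.CloseWithError has run,
	// so the goroutine above cannot outlive this call.
	pr.CloseWithError(err)

	fmt.Printf("read %q (err=%v)\n", buf.String(), err)
}
```

One caveat worth keeping in mind: `PipeWriter.CloseWithError` invokes `wg.Done` directly rather than through a `sync.Once`, so calling it twice on the same pipe would panic the wait group; every call site in this diff closes each end exactly once.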