add missing wait groups for certain io.Pipe() usage (#12264)

Wait groups are necessary with io.Pipe() to avoid races
when a call may block where it is not expected to, and a
Write() -> Close() on one end can race with a Read() on the
other end. We should avoid such situations.

Co-authored-by: Klaus Post <klauspost@gmail.com>
Harshavardhana 2021-05-11 09:18:37 -07:00 committed by GitHub
parent 0b34dfb479
commit e84f533c6c
9 changed files with 113 additions and 40 deletions
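For context, the race described above can be reproduced outside MinIO. The sketch below is illustrative only (the names sink and consumerDone are not from this codebase): closing the write end of an io.Pipe() does not wait for the goroutine draining the read end, so a caller that does Write() -> Close() and then immediately inspects the result may observe incomplete data unless a wait group bridges the two ends. That is the pattern applied to streamingBitrotWriter and generalized by the new pkg/ioutil.WaitPipe helper in the diff below.

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

func main() {
	r, w := io.Pipe()

	var sink bytes.Buffer // stands in for the file written by the consumer goroutine
	var consumerDone sync.WaitGroup
	consumerDone.Add(1)
	go func() {
		defer consumerDone.Done()
		io.Copy(&sink, r) // keeps processing data even after w.Close() has returned
		r.Close()
	}()

	w.Write([]byte("payload"))
	w.Close() // does not wait for the consumer to finish with what it has already read

	consumerDone.Wait() // without this wait, reading sink here races with the goroutine
	fmt.Println(sink.String())
}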


@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"hash"
 	"io"
+	"sync"
 
 	xhttp "github.com/minio/minio/cmd/http"
 	"github.com/minio/minio/cmd/logger"
@@ -44,7 +45,7 @@ type streamingBitrotWriter struct {
 	closeWithErr func(err error) error
 	h            hash.Hash
 	shardSize    int64
-	canClose     chan struct{} // Needed to avoid race explained in Close() call.
+	canClose     *sync.WaitGroup
 }
 
 func (b *streamingBitrotWriter) Write(p []byte) (int, error) {
@@ -71,7 +72,7 @@ func (b *streamingBitrotWriter) Close() error {
 	// Now pipe.Close() can return before the data is read on the other end of the pipe and written to the disk
 	// Hence an immediate Read() on the file can return incorrect data.
 	if b.canClose != nil {
-		<-b.canClose
+		b.canClose.Wait()
 	}
 	return err
 }
@@ -86,8 +87,8 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i
 	r, w := io.Pipe()
 	h := algo.New()
 
-	bw := &streamingBitrotWriter{iow: w, closeWithErr: w.CloseWithError, h: h, shardSize: shardSize, canClose: make(chan struct{})}
-
+	bw := &streamingBitrotWriter{iow: w, closeWithErr: w.CloseWithError, h: h, shardSize: shardSize, canClose: &sync.WaitGroup{}}
+	bw.canClose.Add(1)
 	go func() {
 		totalFileSize := int64(-1) // For compressed objects length will be unknown (represented by length=-1)
 		if length != -1 {
@@ -95,7 +96,7 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i
 			totalFileSize = bitrotSumsTotalSize + length
 		}
 		r.CloseWithError(disk.CreateFile(context.TODO(), volume, filePath, totalFileSize, r))
-		close(bw.canClose)
+		bw.canClose.Done()
 	}()
 	return bw
 }


@@ -41,6 +41,7 @@ import (
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/disk"
 	"github.com/minio/minio/pkg/fips"
+	xioutil "github.com/minio/minio/pkg/ioutil"
 	"github.com/minio/minio/pkg/kms"
 	"github.com/minio/sio"
 )
@@ -950,7 +951,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 		return nil, numHits, nErr
 	}
 	filePath := pathJoin(cacheObjPath, cacheFile)
-	pr, pw := io.Pipe()
+	pr, pw := xioutil.WaitPipe()
 	go func() {
 		err := c.bitrotReadFromCache(ctx, filePath, off, length, pw)
 		if err != nil {
@@ -960,7 +961,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 	}()
 	// Cleanup function to cause the go routine above to exit, in
 	// case of incomplete read.
-	pipeCloser := func() { pr.Close() }
+	pipeCloser := func() { pr.CloseWithError(nil) }
 	gr, gerr := fn(pr, h, opts.CheckPrecondFn, pipeCloser)
 	if gerr != nil {


@@ -357,22 +357,27 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 	}
 	// Initialize pipe.
-	pipeReader, pipeWriter := io.Pipe()
-	teeReader := io.TeeReader(bkReader, pipeWriter)
+	pr, pw := io.Pipe()
+	var wg sync.WaitGroup
+	teeReader := io.TeeReader(bkReader, pw)
 	userDefined := getMetadata(bkReader.ObjInfo)
+	wg.Add(1)
 	go func() {
 		_, putErr := dcache.Put(ctx, bucket, object,
-			io.LimitReader(pipeReader, bkReader.ObjInfo.Size),
+			io.LimitReader(pr, bkReader.ObjInfo.Size),
 			bkReader.ObjInfo.Size, rs, ObjectOptions{
 				UserDefined: userDefined,
 			}, false)
-		// close the write end of the pipe, so the error gets
-		// propagated to getObjReader
-		pipeWriter.CloseWithError(putErr)
+		// close the read end of the pipe, so the error gets
+		// propagated to teeReader
+		pr.CloseWithError(putErr)
+		wg.Done()
 	}()
-	cleanupBackend := func() { bkReader.Close() }
-	cleanupPipe := func() { pipeWriter.Close() }
-	return NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, opts, cleanupBackend, cleanupPipe)
+	cleanupBackend := func() {
+		pw.CloseWithError(bkReader.Close())
+		wg.Wait()
+	}
+	return NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, opts, cleanupBackend)
 }
 
 // Returns ObjectInfo from cache if available.


@@ -20,20 +20,17 @@ package cmd
 import (
 	"context"
 	"io"
-	"sync"
 
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/bpool"
+	xioutil "github.com/minio/minio/pkg/ioutil"
 )
 
 // Heal heals the shard files on non-nil writers. Note that the quorum passed is 1
 // as healing should continue even if it has been successful healing only one shard file.
 func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64, bp *bpool.BytePoolCap) error {
-	r, w := io.Pipe()
-	var wg sync.WaitGroup
-	wg.Add(1)
+	r, w := xioutil.WaitPipe()
 	go func() {
-		defer wg.Done()
 		_, err := e.Decode(ctx, w, readers, 0, size, size, nil)
 		w.CloseWithError(err)
 	}()
@@ -58,6 +55,5 @@ func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.W
 		err = errLessData
 	}
 	r.CloseWithError(err)
-	wg.Wait()
 	return err
 }


@@ -36,6 +36,7 @@ import (
 	"github.com/minio/minio/pkg/bucket/replication"
 	"github.com/minio/minio/pkg/event"
 	"github.com/minio/minio/pkg/hash"
+	xioutil "github.com/minio/minio/pkg/ioutil"
 	"github.com/minio/minio/pkg/mimedb"
 	"github.com/minio/minio/pkg/sync/errgroup"
 )
@@ -197,14 +198,17 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
 		return nil, err
 	}
 	unlockOnDefer = false
 
-	pr, pw := io.Pipe()
+	pr, pw := xioutil.WaitPipe()
 	go func() {
 		pw.CloseWithError(er.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks))
 	}()
 
 	// Cleanup function to cause the go routine above to exit, in
 	// case of incomplete read.
-	pipeCloser := func() { pr.Close() }
+	pipeCloser := func() {
+		pr.CloseWithError(nil)
+	}
 	return fn(pr, h, opts.CheckPrecondFn, pipeCloser, nsUnlocker)
 }
@@ -1359,17 +1363,18 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
 		return err
 	}
-	pr, pw := io.Pipe()
+	pr, pw := xioutil.WaitPipe()
 	go func() {
 		err := er.getObjectWithFileInfo(ctx, bucket, object, 0, fi.Size, pw, fi, metaArr, onlineDisks)
 		pw.CloseWithError(err)
 	}()
 
-	if err = tgtClient.Put(ctx, destObj, pr, fi.Size); err != nil {
-		pr.Close()
+	err = tgtClient.Put(ctx, destObj, pr, fi.Size)
+	pr.CloseWithError(err)
+	if err != nil {
 		logger.LogIf(ctx, fmt.Errorf("Unable to transition %s/%s(%s) to %s tier: %w", bucket, object, opts.VersionID, opts.Transition.Tier, err))
 		return err
 	}
-	pr.Close()
 	fi.TransitionStatus = lifecycle.TransitionComplete
 	fi.TransitionedObjName = destObj
 	fi.TransitionTier = opts.Transition.Tier


@@ -827,13 +827,15 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 	readers := make([]*metacacheReader, askDisks)
 	for i := range disks {
 		r, w := io.Pipe()
-		d := disks[i]
+		// Make sure we close the pipe so blocked writes doesn't stay around.
+		defer r.CloseWithError(context.Canceled)
+
 		readers[i], err = newMetacacheReader(r)
 		if err != nil {
 			return err
 		}
-		// Make sure we close the pipe so blocked writes doesn't stay around.
-		defer r.CloseWithError(context.Canceled)
+
+		d := disks[i]
 		// Send request to each disk.
 		go func() {
 			werr := d.WalkDir(ctx, WalkDirOptions{


@@ -909,9 +909,9 @@ func CleanMinioInternalMetadataKeys(metadata map[string]string) map[string]strin
 // client closed the stream prematurely.
 func newS2CompressReader(r io.Reader, on int64) io.ReadCloser {
 	pr, pw := io.Pipe()
-	comp := s2.NewWriter(pw)
 	// Copy input to compressor
 	go func() {
+		comp := s2.NewWriter(pw)
 		cn, err := io.Copy(comp, r)
 		if err != nil {
 			comp.Close()
@@ -926,12 +926,7 @@ func newS2CompressReader(r io.Reader, on int64) io.ReadCloser {
 			return
 		}
 		// Close the stream.
-		if err = comp.Close(); err != nil {
-			pw.CloseWithError(err)
-			return
-		}
-		// Everything ok, do regular close.
-		pw.Close()
+		pw.CloseWithError(comp.Close())
 	}()
 	return pr
 }


@@ -208,11 +208,10 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
 	}()
 	respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
 	defer xhttp.DrainBody(respBody)
+	pr.CloseWithError(err)
 	if err != nil {
-		pr.Close()
 		return cache, err
 	}
-	pr.Close()
 
 	var newCache dataUsageCache
 	pr, pw = io.Pipe()

pkg/ioutil/wait_pipe.go (new file)

@@ -0,0 +1,69 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

// Forked from golang.org/pkg/os.ReadFile with NOATIME support.
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the https://golang.org/LICENSE file.

package ioutil

import (
	"io"
	"sync"
)

// PipeWriter is similar to io.PipeWriter with wait group
type PipeWriter struct {
	*io.PipeWriter
	done func()
}

// CloseWithError close with supplied error the writer end.
func (w *PipeWriter) CloseWithError(err error) error {
	err = w.PipeWriter.CloseWithError(err)
	w.done()
	return err
}

// PipeReader is similar to io.PipeReader with wait group
type PipeReader struct {
	*io.PipeReader
	wait func()
}

// CloseWithError close with supplied error the reader end
func (r *PipeReader) CloseWithError(err error) error {
	err = r.PipeReader.CloseWithError(err)
	r.wait()
	return err
}

// WaitPipe implements wait-group backend io.Pipe to provide
// synchronization between read() end with write() end.
func WaitPipe() (*PipeReader, *PipeWriter) {
	r, w := io.Pipe()
	var wg sync.WaitGroup
	wg.Add(1)
	return &PipeReader{
		PipeReader: r,
		wait:       wg.Wait,
	}, &PipeWriter{
		PipeWriter: w,
		done:       wg.Done,
	}
}
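
For reference, a minimal usage sketch of the new helper (illustrative only, not part of this commit): the producer goroutine must close its end exactly once via CloseWithError, and the reader's CloseWithError then blocks until that has happened, which mirrors how the callers converted above use it.

package main

import (
	"fmt"
	"io"

	xioutil "github.com/minio/minio/pkg/ioutil"
)

func main() {
	pr, pw := xioutil.WaitPipe()

	go func() {
		_, err := pw.Write([]byte("hello"))
		// Closing the write end signals the wait group exactly once.
		pw.CloseWithError(err)
	}()

	// Drain the pipe first; ReadAll returns once the writer closes.
	data, err := io.ReadAll(pr)
	fmt.Println(string(data), err)

	// Blocks until the goroutine above has closed its end.
	pr.CloseWithError(nil)
}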