From 4a6c97463f2b8645ddc3ef12e1f7e71c53dab30b Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Wed, 24 Jan 2024 10:08:31 -0800
Subject: [PATCH] Fix all racy use of NewDeadlineWorker (#18861)

Almost all uses of NewDeadlineWorker that relied on secondary values were
racy and could lead to inconsistent errors/data being returned.

Rewrite all of these to use a generic WithDeadline caller that can return
an error alongside a value; it also propagates the deadline downstream.

Remove the stateful aspect of DeadlineWorker - it was racy if reused,
though AFAICT it wasn't.

Fixes races like:

```
WARNING: DATA RACE
Read at 0x00c130b29d10 by goroutine 470237:
  github.com/minio/minio/cmd.(*xlStorageDiskIDCheck).ReadVersion()
      github.com/minio/minio/cmd/xl-storage-disk-id-check.go:702 +0x611
  github.com/minio/minio/cmd.readFileInfo()
      github.com/minio/minio/cmd/erasure-metadata-utils.go:160 +0x122
  github.com/minio/minio/cmd.erasureObjects.getObjectFileInfo.func1.1()
      github.com/minio/minio/cmd/erasure-object.go:809 +0x27a
  github.com/minio/minio/cmd.erasureObjects.getObjectFileInfo.func1.2()
      github.com/minio/minio/cmd/erasure-object.go:828 +0x61

Previous write at 0x00c130b29d10 by goroutine 470298:
  github.com/minio/minio/cmd.(*xlStorageDiskIDCheck).ReadVersion.func1()
      github.com/minio/minio/cmd/xl-storage-disk-id-check.go:698 +0x244
  github.com/minio/minio/internal/ioutil.(*DeadlineWorker).Run.func1()
      github.com/minio/minio/internal/ioutil/ioutil.go:141 +0x33

WARNING: DATA RACE
Write at 0x00c0ba6e6c00 by goroutine 94507:
  github.com/minio/minio/cmd.(*xlStorageDiskIDCheck).StatVol.func1()
      github.com/minio/minio/cmd/xl-storage-disk-id-check.go:419 +0x104
  github.com/minio/minio/internal/ioutil.(*DeadlineWorker).Run.func1()
      github.com/minio/minio/internal/ioutil/ioutil.go:141 +0x33

Previous read at 0x00c0ba6e6c00 by goroutine 94463:
  github.com/minio/minio/cmd.(*xlStorageDiskIDCheck).StatVol()
      github.com/minio/minio/cmd/xl-storage-disk-id-check.go:422 +0x47e
  github.com/minio/minio/cmd.getBucketInfoLocal.func1()
      github.com/minio/minio/cmd/peer-s3-server.go:275 +0x122
  github.com/minio/pkg/v2/sync/errgroup.(*Group).Go.func1()
```

Probably dates back to #17701
---
 cmd/xl-storage-disk-id-check.go | 71 ++++++++-------------------
 cmd/xl-storage.go               |  9 ++---
 internal/ioutil/ioutil.go       | 56 ++++++++++++++++----------
 3 files changed, 54 insertions(+), 82 deletions(-)

diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index 802a3c3cb..a42bbdacb 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -417,13 +417,9 @@ func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol
 	}
 	defer done(&err)
 
-	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
-	err = w.Run(func() error {
-		var ierr error
-		vol, ierr = p.storage.StatVol(ctx, volume)
-		return ierr
+	return xioutil.WithDeadline[VolInfo](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result VolInfo, err error) {
+		return p.storage.StatVol(ctx, volume)
 	})
-	return vol, err
 }
 
 func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) {
@@ -455,13 +451,9 @@ func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path
 	}
 	defer done(&err)
 
-	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
-	err = w.Run(func() error {
-		n, err = p.storage.ReadFile(ctx, volume, path, offset, buf, verifier)
-		return err
+	return xioutil.WithDeadline[int64](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result int64, err error) {
+		return p.storage.ReadFile(ctx, volume, path, offset, buf, verifier)
 	})
-
-	return n, err
 }
 
 // Legacy API - does not have any deadlines
@@ -495,19 +487,9 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
 	}
 	defer done(&err)
 
-	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
-
-	var rc io.ReadCloser
-	err = w.Run(func() error {
-		var ierr error
-		rc, ierr = p.storage.ReadFileStream(ctx, volume, path, offset, length)
-		return ierr
+	return xioutil.WithDeadline[io.ReadCloser](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result io.ReadCloser, err error) {
+		return p.storage.ReadFileStream(ctx, volume, path, offset, length)
 	})
-	if err != nil {
-		return nil, err
-	}
-
-	return rc, nil
 }
 
 func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
@@ -533,13 +515,9 @@ func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPat
 		done(&err)
 	}()
 
-	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
-	err = w.Run(func() error {
-		var ierr error
-		sign, ierr = p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath, opts)
-		return ierr
+	return xioutil.WithDeadline[uint64](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result uint64, err error) {
+		return p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath, opts)
 	})
-	return sign, err
 }
 
 func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) {
@@ -697,15 +675,9 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, ve
 	}
 	defer done(&err)
 
-	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
-	rerr := w.Run(func() error {
-		fi, err = p.storage.ReadVersion(ctx, volume, path, versionID, opts)
-		return err
+	return xioutil.WithDeadline[FileInfo](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result FileInfo, err error) {
+		return p.storage.ReadVersion(ctx, volume, path, versionID, opts)
 	})
-	if rerr != nil {
-		return fi, rerr
-	}
-	return fi, err
 }
 
 func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
@@ -715,15 +687,9 @@ func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path
 	}
 	defer done(&err)
 
-	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
-	rerr := w.Run(func() error {
-		buf, err = p.storage.ReadAll(ctx, volume, path)
-		return err
+	return xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result []byte, err error) {
+		return p.storage.ReadAll(ctx, volume, path)
 	})
-	if rerr != nil {
-		return buf, rerr
-	}
-	return buf, err
 }
 
 func (p *xlStorageDiskIDCheck) ReadXL(ctx context.Context, volume string, path string, readData bool) (rf RawFileInfo, err error) {
@@ -733,15 +699,9 @@ func (p *xlStorageDiskIDCheck) ReadXL(ctx context.Context, volume string, path s
 	}
 	defer done(&err)
 
-	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
-	rerr := w.Run(func() error {
-		rf, err = p.storage.ReadXL(ctx, volume, path, readData)
-		return err
+	return xioutil.WithDeadline[RawFileInfo](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result RawFileInfo, err error) {
+		return p.storage.ReadXL(ctx, volume, path, readData)
 	})
-	if rerr != nil {
-		return rf, rerr
-	}
-	return rf, err
 }
 
 func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
@@ -791,6 +751,7 @@ func storageTrace(s storageMetric, startTime time.Time, duration time.Duration,
 		Duration: duration,
 		Path:     path,
 		Error:    err,
+		Custom:   custom,
 	}
 }
 
@@ -837,7 +798,7 @@ func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...st
 	p.apiLatencies[s].add(duration)
 
 	if trace {
-		custom := make(map[string]string)
+		custom := make(map[string]string, 2)
 		paths = append([]string{p.String()}, paths...)
 		var errStr string
 		if err != nil {
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index a89afd0c9..3c89e9c83 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -469,13 +469,10 @@ func (s *xlStorage) readMetadataWithDMTime(ctx context.Context, itemPath string)
 }
 
 func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, error) {
-	var buf []byte
-	err := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout()).Run(func() error {
-		var rerr error
-		buf, _, rerr = s.readMetadataWithDMTime(ctx, itemPath)
-		return rerr
+	return xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) ([]byte, error) {
+		buf, _, err := s.readMetadataWithDMTime(ctx, itemPath)
+		return buf, err
 	})
-	return buf, err
 }
 
 func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, weSleep func() bool) (dataUsageCache, error) {
diff --git a/internal/ioutil/ioutil.go b/internal/ioutil/ioutil.go
index c2eeb5ab8..3948158fb 100644
--- a/internal/ioutil/ioutil.go
+++ b/internal/ioutil/ioutil.go
@@ -99,8 +99,8 @@ func WriteOnClose(w io.Writer) *WriteOnCloser {
 	return &WriteOnCloser{w, false}
 }
 
-type ioret struct {
-	n   int
+type ioret[V any] struct {
+	val V
 	err error
 }
 
@@ -111,35 +111,51 @@ type DeadlineWriter struct {
 	err error
 }
 
+// WithDeadline will execute a function with a deadline and return a value of a given type.
+// If the deadline/context passes before the function finishes executing,
+// the zero value and the context error are returned.
+func WithDeadline[V any](ctx context.Context, timeout time.Duration, work func(ctx context.Context) (result V, err error)) (result V, err error) {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	c := make(chan ioret[V], 1)
+	go func() {
+		v, err := work(ctx)
+		c <- ioret[V]{val: v, err: err}
+	}()
+
+	select {
+	case v := <-c:
+		return v.val, v.err
+	case <-ctx.Done():
+		var zero V
+		return zero, ctx.Err()
+	}
+}
+
 // DeadlineWorker implements the deadline/timeout resiliency pattern.
 type DeadlineWorker struct {
 	timeout time.Duration
-	err     error
 }
 
 // NewDeadlineWorker constructs a new DeadlineWorker with the given timeout.
+// To return values, use the WithDeadline helper instead.
 func NewDeadlineWorker(timeout time.Duration) *DeadlineWorker {
-	return &DeadlineWorker{
+	dw := &DeadlineWorker{
 		timeout: timeout,
 	}
+	return dw
 }
 
 // Run runs the given function, passing it a stopper channel. If the deadline passes before
-// the function finishes executing, Run returns ErrTimeOut to the caller and closes the stopper
-// channel so that the work function can attempt to exit gracefully. It does not (and cannot)
-// simply kill the running function, so if it doesn't respect the stopper channel then it may
-// keep running after the deadline passes. If the function finishes before the deadline, then
-// the return value of the function is returned from Run.
+// the function finishes executing, Run returns context.DeadlineExceeded to the caller.
+// The work function is not forcibly stopped and may keep running past the deadline.
+// Multiple calls to Run will run independently of each other.
 func (d *DeadlineWorker) Run(work func() error) error {
-	if d.err != nil {
-		return d.err
-	}
-
-	c := make(chan ioret, 1)
+	c := make(chan ioret[struct{}], 1)
 	t := time.NewTimer(d.timeout)
 	go func() {
-		c <- ioret{0, work()}
-		close(c)
+		c <- ioret[struct{}]{val: struct{}{}, err: work()}
 	}()
 
 	select {
@@ -147,10 +163,8 @@ func (d *DeadlineWorker) Run(work func() error) error {
 		if !t.Stop() {
 			<-t.C
 		}
-		d.err = r.err
 		return r.err
	case <-t.C:
-		d.err = context.DeadlineExceeded
 		return context.DeadlineExceeded
 	}
 }
@@ -168,11 +182,11 @@ func (w *DeadlineWriter) Write(buf []byte) (int, error) {
 		return 0, w.err
 	}
 
-	c := make(chan ioret, 1)
+	c := make(chan ioret[int], 1)
 	t := time.NewTimer(w.timeout)
 	go func() {
 		n, err := w.WriteCloser.Write(buf)
-		c <- ioret{n, err}
+		c <- ioret[int]{val: n, err: err}
 		close(c)
 	}()
 
@@ -182,7 +196,7 @@ func (w *DeadlineWriter) Write(buf []byte) (int, error) {
 			<-t.C
 		}
 		w.err = r.err
-		return r.n, r.err
+		return r.val, r.err
 	case <-t.C:
 		w.WriteCloser.Close()
 		w.err = context.DeadlineExceeded
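
For illustration, a minimal, self-contained sketch of the pattern this patch adopts (not part of the change itself). The WithDeadline body mirrors the helper added to internal/ioutil above; the slow work function, the 50ms timeout, and main are made-up stand-ins for a slow drive call. The key point is that the result travels through a buffered channel instead of a variable captured by both goroutines, so the caller never reads a value the worker may still be writing after a timeout:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ioret pairs a value with an error so both can cross a channel together.
type ioret[V any] struct {
	val V
	err error
}

// WithDeadline runs work under a deadline derived from ctx. The worker hands
// its result over a buffered channel; on timeout the caller returns the zero
// value and ctx.Err() without ever touching the worker's data.
func WithDeadline[V any](ctx context.Context, timeout time.Duration, work func(ctx context.Context) (V, error)) (V, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel() // the deadline is also propagated downstream via ctx

	c := make(chan ioret[V], 1) // buffered: the worker never blocks on send
	go func() {
		v, err := work(ctx)
		c <- ioret[V]{val: v, err: err}
	}()

	select {
	case r := <-c:
		return r.val, r.err
	case <-ctx.Done():
		var zero V
		return zero, ctx.Err()
	}
}

func main() {
	// Simulated slow drive call: 1s of work against a 50ms deadline.
	v, err := WithDeadline[int](context.Background(), 50*time.Millisecond,
		func(ctx context.Context) (int, error) {
			select {
			case <-time.After(time.Second):
				return 42, nil
			case <-ctx.Done():
				return 0, ctx.Err()
			}
		})
	fmt.Println(v, errors.Is(err, context.DeadlineExceeded)) // prints: 0 true
}
```

Contrast this with the old shape, where e.g. `vol, ierr = p.storage.StatVol(ctx, volume)` assigned to a variable shared between the worker goroutine and the caller: after a timeout, Run returned while the goroutine could still be writing that variable, which is exactly the race shown in the reports above.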