mirror of https://github.com/minio/minio.git
check for context canceled after competing for locks (#13239)
once we have competed for locks, verify if the context is still valid - this is to ensure that we do not start readdir() or read() calls on the drives on canceled connections.
This commit is contained in:
parent
66fcd02aa2
commit
5ed781a330
|
@ -1787,12 +1787,10 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
|
|||
|
||||
// Only return ErrClientDisconnected if the provided context is actually canceled.
|
||||
// This way downstream context.Canceled will still report ErrOperationTimedOut
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
if ctx.Err() == context.Canceled {
|
||||
return ErrClientDisconnected
|
||||
}
|
||||
default:
|
||||
}
|
||||
|
||||
switch err {
|
||||
|
|
|
@ -351,10 +351,8 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
|||
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return entries, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// If many failures, check the cache state.
|
||||
|
@ -424,10 +422,8 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
|||
// We got a stream to start at.
|
||||
loadedPart := 0
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return entries, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if partN != loadedPart {
|
||||
|
|
|
@ -87,7 +87,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||
// Fast exit track to check if we are listing an object with
|
||||
// a trailing slash, this will avoid to list the object content.
|
||||
if HasSuffix(opts.BaseDir, SlashSeparator) {
|
||||
metadata, err := s.readMetadata(pathJoin(volumeDir,
|
||||
metadata, err := s.readMetadata(ctx, pathJoin(volumeDir,
|
||||
opts.BaseDir[:len(opts.BaseDir)-1]+globalDirSuffix,
|
||||
xlStorageFormatFile))
|
||||
if err == nil {
|
||||
|
@ -175,7 +175,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||
if HasSuffix(entry, xlStorageFormatFile) {
|
||||
var meta metaCacheEntry
|
||||
s.walkReadMu.Lock()
|
||||
meta.metadata, err = s.readMetadata(pathJoin(volumeDir, current, entry))
|
||||
meta.metadata, err = s.readMetadata(ctx, pathJoin(volumeDir, current, entry))
|
||||
s.walkReadMu.Unlock()
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
|
@ -250,7 +250,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||
}
|
||||
|
||||
s.walkReadMu.Lock()
|
||||
meta.metadata, err = s.readMetadata(pathJoin(volumeDir, meta.name, xlStorageFormatFile))
|
||||
meta.metadata, err = s.readMetadata(ctx, pathJoin(volumeDir, meta.name, xlStorageFormatFile))
|
||||
s.walkReadMu.Unlock()
|
||||
switch {
|
||||
case err == nil:
|
||||
|
@ -300,9 +300,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
|||
|
||||
func (p *xlStorageDiskIDCheck) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error {
|
||||
defer p.updateStorageMetrics(storageMetricWalkDir, opts.Bucket, opts.BaseDir)()
|
||||
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.storage.WalkDir(ctx, opts, wr)
|
||||
}
|
||||
|
||||
|
|
|
@ -159,10 +159,8 @@ func (p *xlStorageDiskIDCheck) Healing() *healingTracker {
|
|||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return dataUsageCache{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -210,10 +208,8 @@ func (p *xlStorageDiskIDCheck) checkDiskStale() error {
|
|||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return DiskInfo{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
info, err = p.storage.DiskInfo(ctx)
|
||||
|
@ -235,10 +231,8 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context) (info DiskInfo, err
|
|||
func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...string) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricMakeVolBulk, volumes...)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -250,10 +244,8 @@ func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...strin
|
|||
func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricMakeVol, volume)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -265,10 +257,8 @@ func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err
|
|||
func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) ([]VolInfo, error) {
|
||||
defer p.updateStorageMetrics(storageMetricListVols, "/")()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -280,10 +270,8 @@ func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) ([]VolInfo, error)
|
|||
func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol VolInfo, err error) {
|
||||
defer p.updateStorageMetrics(storageMetricStatVol, volume)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return VolInfo{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -295,10 +283,8 @@ func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol
|
|||
func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricDeleteVol, volume)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -310,10 +296,8 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for
|
|||
func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) {
|
||||
defer p.updateStorageMetrics(storageMetricListDir, volume, dirPath)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -326,10 +310,8 @@ func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath stri
|
|||
func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
|
||||
defer p.updateStorageMetrics(storageMetricReadFile, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return 0, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -342,10 +324,8 @@ func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path
|
|||
func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricAppendFile, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -358,10 +338,8 @@ func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, pa
|
|||
func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) error {
|
||||
defer p.updateStorageMetrics(storageMetricCreateFile, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -374,10 +352,8 @@ func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path stri
|
|||
func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
|
||||
defer p.updateStorageMetrics(storageMetricReadFileStream, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -390,10 +366,8 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
|
|||
func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error {
|
||||
defer p.updateStorageMetrics(storageMetricRenameFile, srcVolume, srcPath, dstVolume, dstPath)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -406,10 +380,8 @@ func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPat
|
|||
func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) error {
|
||||
defer p.updateStorageMetrics(storageMetricRenameData, srcPath, fi.DataDir, dstVolume, dstPath)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -422,10 +394,8 @@ func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPat
|
|||
func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricCheckParts, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -438,10 +408,8 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
|
|||
func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path string, recursive bool) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricDelete, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -464,13 +432,11 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string
|
|||
|
||||
errs = make([]error, len(versions))
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
for i := range errs {
|
||||
errs[i] = ctx.Err()
|
||||
}
|
||||
return errs
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -486,10 +452,8 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string
|
|||
func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error {
|
||||
defer p.updateStorageMetrics(storageMetricVerifyFile, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
|
@ -502,10 +466,8 @@ func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path stri
|
|||
func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricWriteAll, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -518,10 +480,8 @@ func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path
|
|||
func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricDeleteVersion, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -534,10 +494,8 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s
|
|||
func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricUpdateMetadata, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -550,10 +508,8 @@ func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path
|
|||
func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
|
||||
defer p.updateStorageMetrics(storageMetricWriteMetadata, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -566,10 +522,8 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
|
|||
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
|
||||
defer p.updateStorageMetrics(storageMetricReadVersion, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return fi, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -582,10 +536,8 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, ve
|
|||
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
|
||||
defer p.updateStorageMetrics(storageMetricReadAll, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
@ -598,10 +550,8 @@ func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path
|
|||
func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
|
||||
defer p.updateStorageMetrics(storageStatInfoFile, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if contextCanceled(ctx) {
|
||||
return StatInfo{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
|
|
|
@ -397,7 +397,11 @@ func (s *xlStorage) Healing() *healingTracker {
|
|||
return &h
|
||||
}
|
||||
|
||||
func (s *xlStorage) readMetadata(itemPath string) ([]byte, error) {
|
||||
func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, error) {
|
||||
if contextCanceled(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
if err := checkPathLength(itemPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -468,7 +472,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
|
|||
return sizeSummary{}, errSkipFile
|
||||
}
|
||||
|
||||
buf, err := s.readMetadata(item.Path)
|
||||
buf, err := s.readMetadata(ctx, item.Path)
|
||||
if err != nil {
|
||||
if intDataUpdateTracker.debug {
|
||||
console.Debugf(color.Green("scannerBucket:")+" object path missing: %v: %w\n", item.Path, err)
|
||||
|
@ -791,6 +795,10 @@ func (s *xlStorage) DeleteVol(ctx context.Context, volume string, forceDelete bo
|
|||
// ListDir - return all the entries at the given directory path.
|
||||
// If an entry is a directory it will be returned with a trailing SlashSeparator.
|
||||
func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
|
||||
if contextCanceled(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
// Verify if volume is valid and it exists.
|
||||
volumeDir, err := s.getVolDir(volume)
|
||||
if err != nil {
|
||||
|
@ -1090,7 +1098,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
|||
if readData {
|
||||
buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
||||
} else {
|
||||
buf, err = s.readMetadata(pathJoin(volumeDir, path, xlStorageFormatFile))
|
||||
buf, err = s.readMetadata(ctx, pathJoin(volumeDir, path, xlStorageFormatFile))
|
||||
if err != nil {
|
||||
if osIsNotExist(err) {
|
||||
if err = Access(volumeDir); err != nil && osIsNotExist(err) {
|
||||
|
|
|
@ -1834,7 +1834,7 @@ func TestXLStorageReadMetadata(t *testing.T) {
|
|||
}
|
||||
|
||||
disk.MakeVol(context.Background(), volume)
|
||||
if _, err := disk.readMetadata(pathJoin(tmpDir, volume, object)); err != errFileNameTooLong {
|
||||
if _, err := disk.readMetadata(context.Background(), pathJoin(tmpDir, volume, object)); err != errFileNameTooLong {
|
||||
t.Fatalf("Unexpected error from readMetadata - expect %v: got %v", errFileNameTooLong, err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue