introduce deadlines on READ operations (#17724)

This commit is contained in:
Harshavardhana 2023-07-27 07:33:05 -07:00 committed by GitHub
parent bf3901342c
commit 47dcfcbdd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 16 deletions

View File

@ -538,7 +538,15 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, ve
}
defer done(&err)
return p.storage.ReadVersion(ctx, volume, path, versionID, readData)
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
rerr := w.Run(func() error {
fi, err = p.storage.ReadVersion(ctx, volume, path, versionID, readData)
return err
})
if rerr != nil {
return fi, rerr
}
return fi, err
}
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
@ -548,7 +556,15 @@ func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path
}
defer done(&err)
return p.storage.ReadAll(ctx, volume, path)
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
rerr := w.Run(func() error {
buf, err = p.storage.ReadAll(ctx, volume, path)
return err
})
if rerr != nil {
return buf, rerr
}
return buf, err
}
func (p *xlStorageDiskIDCheck) ReadXL(ctx context.Context, volume string, path string, readData bool) (rf RawFileInfo, err error) {
@ -558,7 +574,15 @@ func (p *xlStorageDiskIDCheck) ReadXL(ctx context.Context, volume string, path s
}
defer done(&err)
return p.storage.ReadXL(ctx, volume, path, readData)
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
rerr := w.Run(func() error {
rf, err = p.storage.ReadXL(ctx, volume, path, readData)
return err
})
if rerr != nil {
return rf, rerr
}
return rf, err
}
func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
@ -571,11 +595,11 @@ func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path st
return p.storage.StatInfoFile(ctx, volume, path, glob)
}
// ReadMultiple will read multiple files and send each back as response.
// ReadMultiple will read multiple files and send each files as response.
// Files are read and returned in the given order.
// The resp channel is closed before the call returns.
// Only a canceled context will return an error.
func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error {
func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) (err error) {
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadMultiple, req.Bucket, req.Prefix)
if err != nil {
close(resp)
@ -588,14 +612,15 @@ func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipl
// CleanAbandonedData will read metadata of the object on disk
// and delete any data directories and inline data that isn't referenced in metadata.
func (p *xlStorageDiskIDCheck) CleanAbandonedData(ctx context.Context, volume string, path string) error {
func (p *xlStorageDiskIDCheck) CleanAbandonedData(ctx context.Context, volume string, path string) (err error) {
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteAbandonedParts, volume, path)
if err != nil {
return err
}
defer done(&err)
return p.storage.CleanAbandonedData(ctx, volume, path)
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
return w.Run(func() error { return p.storage.CleanAbandonedData(ctx, volume, path) })
}
func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string, err string) madmin.TraceInfo {

View File

@ -1049,7 +1049,8 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
errs[i] = ctx.Err()
continue
}
if err := s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...); err != nil {
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
if err := w.Run(func() error { return s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...) }); err != nil {
errs[i] = err
}
diskHealthCheckOK(ctx, errs[i])
@ -2660,15 +2661,16 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
}
var data []byte
var mt time.Time
var err error
fullPath := pathJoin(volumeDir, req.Prefix, f)
if req.MetadataOnly {
data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)
} else {
data, mt, err = s.readAllData(ctx, volumeDir, fullPath)
}
if err != nil {
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
if err := w.Run(func() (err error) {
if req.MetadataOnly {
data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)
} else {
data, mt, err = s.readAllData(ctx, volumeDir, fullPath)
}
return err
}); err != nil {
if !IsErr(err, errFileNotFound, errVolumeNotFound) {
r.Exists = true
r.Error = err.Error()