fix: add deadlines for all synchronous REST callers (#19741)

add deadlines that can be dynamically changed via
the drive max timeout values.

Bonus: optimize "file not found" case and hung drives/network - circuit break the check and return right
away instead of waiting.
This commit is contained in:
Harshavardhana
2024-05-15 09:52:29 -07:00
committed by GitHub
parent c05ca63158
commit d3db7d31a3
6 changed files with 56 additions and 17 deletions

View File

@@ -899,6 +899,20 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
if success {
validResp++
}
if totalResp >= minDisks && opts.FastGetObjInfo {
rw.Lock()
ok := countErrs(errs, errFileNotFound) >= minDisks || countErrs(errs, errFileVersionNotFound) >= minDisks
rw.Unlock()
if ok {
err = errFileNotFound
if opts.VersionID != "" {
err = errFileVersionNotFound
}
break
}
}
if totalResp < er.setDriveCount {
if !opts.FastGetObjInfo {
continue

View File

@@ -391,6 +391,9 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, origvolume, vol
}
func (client *storageRESTClient) WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) error {
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
defer cancel()
_, err := storageWriteMetadataRPC.Call(ctx, client.gridConn, &MetadataHandlerParams{
DiskID: *client.diskID.Load(),
OrigVolume: origvolume,
@@ -402,6 +405,9 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, origvolume,
}
func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error {
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
defer cancel()
_, err := storageUpdateMetadataRPC.Call(ctx, client.gridConn, &MetadataHandlerParams{
DiskID: *client.diskID.Load(),
Volume: volume,
@@ -413,6 +419,9 @@ func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, pat
}
func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) {
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
defer cancel()
_, err = storageDeleteVersionRPC.Call(ctx, client.gridConn, &DeleteVersionHandlerParams{
DiskID: *client.diskID.Load(),
Volume: volume,
@@ -426,6 +435,9 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path
// WriteAll - write all data to a file.
func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, path string, b []byte) error {
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
defer cancel()
_, err := storageWriteAllRPC.Call(ctx, client.gridConn, &WriteAllHandlerParams{
DiskID: *client.diskID.Load(),
Volume: volume,
@@ -497,6 +509,9 @@ func readMsgpReaderPoolPut(r *msgp.Reader) {
}
func (client *storageRESTClient) ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) {
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
defer cancel()
// Use websocket when not reading data.
if !opts.ReadData {
resp, err := storageReadVersionRPC.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{
@@ -537,6 +552,9 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, origvolume, vo
// ReadXL - reads all contents of xl.meta of a file.
func (client *storageRESTClient) ReadXL(ctx context.Context, volume string, path string, readData bool) (rf RawFileInfo, err error) {
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
defer cancel()
// Use websocket when not reading data.
if !readData {
resp, err := storageReadXLRPC.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{
@@ -570,6 +588,9 @@ func (client *storageRESTClient) ReadXL(ctx context.Context, volume string, path
// ReadAll - reads all contents of a file.
func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, path string) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
defer cancel()
gridBytes, err := storageReadAllRPC.Call(ctx, client.gridConn, &ReadAllHandlerParams{
DiskID: *client.diskID.Load(),
Volume: volume,