fix: do not log concurrently when multiple disks return errors (#15044)

since the values inside 'context' are mutated internally by
logger, make sure to log serially upon errors not concurrently.
This commit is contained in:
Harshavardhana 2022-06-06 15:15:11 -07:00 committed by GitHub
parent 31c4fdbf79
commit df9eeb7f8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 24 deletions

View File

@ -137,24 +137,26 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
return errDiskNotFound
}
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, readData)
if err != nil {
if !IsErr(err, []error{
errFileNotFound,
errVolumeNotFound,
errFileVersionNotFound,
errDiskNotFound,
}...) {
logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
disks[index], bucket, object, err),
disks[index].String())
}
}
return err
}, index)
}
errs := g.Wait()
for index, err := range errs {
if !IsErr(err, []error{
errFileNotFound,
errVolumeNotFound,
errFileVersionNotFound,
errDiskNotFound,
}...) {
logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
disks[index], bucket, object, err),
disks[index].String())
}
}
// Return all the metadata.
return metadataArray, g.Wait()
return metadataArray, errs
}
// shuffleDisksAndPartsMetadataByIndex this function should be always used by GetObjectNInfo()

View File

@ -476,16 +476,6 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
}
rf, err := disks[index].ReadXL(ctx, bucket, object, readData)
if err != nil {
if !IsErr(err, []error{
errFileNotFound,
errVolumeNotFound,
errFileVersionNotFound,
errDiskNotFound,
}...) {
logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
disks[index], bucket, object, err),
disks[index].String())
}
return err
}
@ -502,6 +492,19 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
}
errs := g.Wait()
for index, err := range errs {
if !IsErr(err, []error{
errFileNotFound,
errVolumeNotFound,
errFileVersionNotFound,
errDiskNotFound,
}...) {
logger.LogOnceIf(ctx, fmt.Errorf("Drive %s, path (%s/%s) returned an error (%w)",
disks[index], bucket, object, err),
disks[index].String())
}
}
for index := range metadataArray {
if metadataArray[index] != nil {
metadataShallowVersions[index] = metadataArray[index].versions

View File

@ -2066,7 +2066,7 @@ func skipAccessChecks(volume string) (ok bool) {
// RenameData - rename source path to destination path atomically, metadata and data directory.
func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (err error) {
defer func() {
if err != nil && !contextCanceled(ctx) {
if err != nil && !contextCanceled(ctx) && !errors.Is(err, errFileNotFound) {
// Only log these errors if context is not yet canceled.
logger.LogIf(ctx, fmt.Errorf("srcVolume: %s, srcPath: %s, dstVolume: %s:, dstPath: %s - error %v",
srcVolume, srcPath,