reduce unnecessary logging during speedtest (#14387)

- speedtest spuriously logs calls that were
  canceled, in situations where the cancellation
  should simply be ignored (see the sketch after
  this list).

- all errors of interest are already sent back
  to the client; there is no need to also log
  them on the server console.

- PUT failures should undo the per-thread object
  count increments, so that GET is not attempted
  for objects whose upload failed.

- do not attempt MRF on speedtest objects.
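
The common guard in the hunks below treats cancellation as a non-error: a canceled call just means the timed test window closed, so nothing is recorded or logged. A minimal, self-contained sketch of that pattern; runUpload is a hypothetical stand-in for one speedtest PUT, and contextCanceled mirrors the helper the diffs call:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // contextCanceled reports whether ctx is already canceled, without blocking.
    func contextCanceled(ctx context.Context) bool {
        select {
        case <-ctx.Done():
            return true
        default:
            return false
        }
    }

    // runUpload is a hypothetical stand-in for one speedtest PUT call.
    func runUpload(ctx context.Context) error {
        select {
        case <-time.After(time.Second):
            return nil
        case <-ctx.Done():
            return ctx.Err() // context.Canceled when the window closes
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        cancel() // simulate the speedtest window closing mid-call

        if err := runUpload(ctx); err != nil {
            // Report only errors that are not cancellations.
            if !contextCanceled(ctx) && !errors.Is(err, context.Canceled) {
                fmt.Println("real failure:", err)
            } else {
                fmt.Println("canceled: suppressed, not logged")
            }
        }
    }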
Commit 9d7648f02f by Harshavardhana, 2022-02-23 11:59:13 -08:00 (parent 1ef8babfef)
6 changed files with 32 additions and 18 deletions


@@ -67,6 +67,9 @@ func reduceErrs(errs []error, ignoredErrs []error) (maxCount int, maxErr error)
 // values of maximally occurring errors validated against a generic
 // quorum number that can be read or write quorum depending on usage.
 func reduceQuorumErrs(ctx context.Context, errs []error, ignoredErrs []error, quorum int, quorumErr error) error {
+	if contextCanceled(ctx) {
+		return context.Canceled
+	}
 	maxCount, maxErr := reduceErrs(errs, ignoredErrs)
 	if maxCount >= quorum {
 		return maxErr
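
Why the early return matters: once ctx is canceled, the per-disk error slice is an arbitrary mix of cancellation errors, and counting those against quorum would surface a misleading maxErr. A toy model of the guarded reduction, with simplified signatures that are not the actual MinIO implementation:

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    // Toy version: pick the most frequent error (nil counts as success) and
    // validate it against quorum, but refuse to do the arithmetic at all
    // once the context is gone.
    func reduceQuorumErrs(ctx context.Context, errs []error, quorum int, quorumErr error) error {
        if ctx.Err() != nil { // the real code uses a contextCanceled helper
            return context.Canceled
        }
        counts := map[error]int{}
        var maxErr error
        maxCount := 0
        for _, err := range errs {
            counts[err]++
            if counts[err] > maxCount {
                maxCount, maxErr = counts[err], err
            }
        }
        if maxCount >= quorum {
            return maxErr
        }
        return quorumErr
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        cancel()
        errs := []error{context.Canceled, errors.New("disk offline"), nil}
        // Prints "context canceled" instead of a spurious quorum verdict.
        fmt.Println(reduceQuorumErrs(ctx, errs, 2, errors.New("quorum not met")))
    }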


@@ -574,7 +574,7 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
 	// Wait for all renames to finish.
 	errs := g.Wait()
 
-	// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
+	// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
 	// otherwise return failure. Cleanup successful renames.
 	err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
 	return evalDisks(disks, errs), err
@@ -1001,14 +1001,18 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 		}
 	}
 
-	// Whether a disk was initially or becomes offline
-	// during this upload, send it to the MRF list.
-	for i := 0; i < len(onlineDisks); i++ {
-		if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
-			continue
+	// For speedtest objects do not attempt to heal them.
+	if !opts.Speedtest {
+		// Whether a disk was initially or becomes offline
+		// during this upload, send it to the MRF list.
+		for i := 0; i < len(onlineDisks); i++ {
+			if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
+				continue
+			}
+			er.addPartial(bucket, object, fi.VersionID, fi.Size)
+			break
 		}
-		er.addPartial(bucket, object, fi.VersionID, fi.Size)
-		break
 	}
 
 	fi.ReplicationState = opts.PutReplicationState()


@@ -31,6 +31,9 @@ func toObjectErr(err error, params ...string) error {
 	if err == nil {
 		return nil
 	}
+	if errors.Is(err, context.Canceled) {
+		return context.Canceled
+	}
 	switch err.Error() {
 	case errVolumeNotFound.Error():
 		apiErr := BucketNotFound{}
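
The errors.Is check, rather than err == context.Canceled, matters because cancellations often arrive wrapped by lower layers, and errors.Is walks the wrap chain while an identity comparison does not:

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    func main() {
        // A cancellation wrapped with extra context, as lower layers often do.
        err := fmt.Errorf("remote call failed: %w", context.Canceled)

        fmt.Println(err == context.Canceled)          // false: identity misses the wrap
        fmt.Println(errors.Is(err, context.Canceled)) // true: Is unwraps the chain
    }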


@@ -66,7 +66,9 @@ type ObjectOptions struct {
 	ReplicationSourceTaggingTimestamp   time.Time // set if MinIOSourceTaggingTimestamp received
 	ReplicationSourceLegalholdTimestamp time.Time // set if MinIOSourceObjectLegalholdTimestamp received
 	ReplicationSourceRetentionTimestamp time.Time // set if MinIOSourceObjectRetentionTimestamp received
 	DeletePrefix bool // set true to enforce a prefix deletion, only application for DeleteObject API,
+
+	Speedtest bool // object call specifically meant for SpeedTest code, set to 'true' when invoked by SpeedtestHandler.
 
 	// Use the maximum parity (N/2), used when saving server configuration files
 	MaxParity bool


@@ -1169,6 +1169,7 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
 	var totalBytesRead uint64
 	objCountPerThread := make([]uint64, concurrent)
 
 	uploadsCtx, uploadsCancel := context.WithCancel(context.Background())
 	defer uploadsCancel()
@@ -1187,10 +1188,9 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
 			hashReader, err := hash.NewReader(newRandomReader(size),
 				int64(size), "", "", int64(size))
 			if err != nil {
-				if !contextCanceled(uploadsCtx) {
+				if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) {
 					errOnce.Do(func() {
 						retError = err.Error()
-						logger.LogIf(ctx, err)
 					})
 				}
 				uploadsCancel()
@@ -1202,12 +1202,13 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
 				UserDefined: map[string]string{
 					xhttp.AmzStorageClass: storageClass,
 				},
+				Speedtest: true,
 			})
 			if err != nil {
-				if !contextCanceled(uploadsCtx) {
+				objCountPerThread[i]--
+				if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) {
 					errOnce.Do(func() {
 						retError = err.Error()
-						logger.LogIf(ctx, err)
 					})
 				}
 				uploadsCancel()
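
The new objCountPerThread[i]-- works because each uploaded object's name embeds the thread's running count, and the GET phase later walks names from 0 up to that count; rolling the counter back on a failed PUT keeps GET from requesting objects that were never written. A simplified sketch, assuming the increment happens optimistically before the error check (the increment itself sits outside the visible hunk):

    package main

    import (
        "errors"
        "fmt"
    )

    // put is a hypothetical stand-in for a speedtest PUT; it fails on odd attempts.
    func put(attempt uint64) error {
        if attempt%2 == 1 {
            return errors.New("simulated PUT failure")
        }
        return nil
    }

    func main() {
        const concurrent = 1
        objCountPerThread := make([]uint64, concurrent)
        i := 0

        for attempt := uint64(0); attempt < 4; attempt++ {
            objCountPerThread[i]++ // assumed: the attempt is counted up front
            if err := put(attempt); err != nil {
                objCountPerThread[i]-- // roll back, mirroring the diff
            }
        }

        // The GET phase only walks 0 .. objCountPerThread[i]-1, so the two
        // failed uploads above are never requested.
        for j := uint64(0); j < objCountPerThread[i]; j++ {
            fmt.Printf("GET object.%d.%d\n", i, j)
        }
    }

The isErrObjectNotFound/continue added in the download hunk below covers the remaining gap: an object that was counted but still fails to appear is skipped rather than treated as a test failure.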
@@ -1247,10 +1248,12 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
 				r, err := objAPI.GetObjectNInfo(downloadsCtx, minioMetaBucket, fmt.Sprintf("%s.%d.%d",
 					objNamePrefix, i, j), nil, nil, noLock, ObjectOptions{})
 				if err != nil {
-					if !contextCanceled(downloadsCtx) {
+					if isErrObjectNotFound(err) {
+						continue
+					}
+					if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
 						errOnce.Do(func() {
 							retError = err.Error()
-							logger.LogIf(ctx, err)
 						})
 					}
 					downloadsCancel()
@@ -1265,10 +1268,9 @@ func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Dura
 					atomic.AddUint64(&totalBytesRead, uint64(n))
 				}
 				if err != nil {
-					if !contextCanceled(downloadsCtx) {
+					if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
 						errOnce.Do(func() {
 							retError = err.Error()
-							logger.LogIf(ctx, err)
 						})
 					}
 					downloadsCancel()
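
errOnce is a sync.Once guarding retError: only the first real failure across the worker goroutines becomes the reported result, and every later failure merely cancels. A minimal sketch of this first-error-wins pattern:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var errOnce sync.Once
        var retError string
        var wg sync.WaitGroup

        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                err := fmt.Errorf("worker %d failed", i)
                errOnce.Do(func() {
                    retError = err.Error() // only the first failure is recorded
                })
            }(i)
        }
        wg.Wait()
        fmt.Println("reported:", retError)
    }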


@@ -2281,7 +2281,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 		// Any failed rename calls un-roll previous transaction.
 		s.deleteFile(dstVolumeDir, legacyDataPath, true)
 	}
-	return err
+	return osErrToFileErr(err)
 }
 
 // renameAll only for objects that have xl.meta not saved inline.
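
Returning osErrToFileErr(err) instead of the raw OS error matters because upper layers match on the storage layer's canonical errors, as the isErrObjectNotFound check added above does. A hedged sketch of the kind of translation such a helper performs; the mapping below is illustrative, not MinIO's actual table:

    package main

    import (
        "errors"
        "fmt"
        "io/fs"
        "os"
    )

    // Illustrative canonical storage errors (hypothetical stand-ins).
    var (
        errFileNotFound     = errors.New("file not found")
        errFileAccessDenied = errors.New("file access denied")
    )

    // An osErrToFileErr-style translation: raw OS errors become canonical
    // sentinels that callers can compare against reliably.
    func osErrToFileErr(err error) error {
        switch {
        case err == nil:
            return nil
        case errors.Is(err, fs.ErrNotExist):
            return errFileNotFound
        case errors.Is(err, fs.ErrPermission):
            return errFileAccessDenied
        default:
            return err
        }
    }

    func main() {
        _, err := os.Open("/definitely/not/here")
        fmt.Println(osErrToFileErr(err) == errFileNotFound) // true
    }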