From a2aabfabd9e6b04adbf663748851a2e1c82c66a9 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Thu, 14 Sep 2023 11:53:52 -0700
Subject: [PATCH] add backups for usage-caches to rely on upon error (#18029)

This allows the scanner to avoid lengthy scans, skip things
appropriately, and avoid losing metrics.

Reduce the deadlines for usage-cache loads/saves to match the disk
timeout, which is now 2 minutes per IOP.
---
 cmd/bucket-replication.go |  17 ++++--
 cmd/data-usage-cache.go   | 110 +++++++++++++++++++++-----------------
 cmd/data-usage.go         |  15 ++++--
 3 files changed, 87 insertions(+), 55 deletions(-)

diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go
index 274a18085..d618483e7 100644
--- a/cmd/bucket-replication.go
+++ b/cmd/bucket-replication.go
@@ -1501,9 +1501,18 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
 	var uploadedParts []minio.CompletePart
 	// new multipart must not set mtime as it may lead to erroneous cleanups at various intervals.
 	opts.Internal.SourceMTime = time.Time{} // this value is saved properly in CompleteMultipartUpload()
-	nctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
-	defer cancel()
-	uploadID, err := c.NewMultipartUpload(nctx, bucket, object, opts)
+	var uploadID string
+	attempts := 1
+	for attempts <= 3 {
+		nctx, cancel := context.WithTimeout(ctx, time.Minute)
+		uploadID, err = c.NewMultipartUpload(nctx, bucket, object, opts)
+		cancel()
+		if err == nil {
+			break
+		}
+		attempts++
+		time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
+	}
 	if err != nil {
 		return err
 	}
@@ -1524,7 +1533,7 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
 					fmt.Errorf("trying %s: Unable to cleanup failed multipart replication %s on remote %s/%s: %w - this may consume space on remote cluster",
 						humanize.Ordinal(attempts), uploadID, bucket, object, aerr))
 				attempts++
-				time.Sleep(time.Second)
+				time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
 			}
 		}
 	}()
diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go
index bf631e050..8bc86052a 100644
--- a/cmd/data-usage-cache.go
+++ b/cmd/data-usage-cache.go
@@ -38,6 +38,7 @@ import (
 	"github.com/minio/minio/internal/hash"
 	"github.com/minio/minio/internal/logger"
 	"github.com/tinylib/msgp/msgp"
+	"github.com/valyala/bytebufferpool"
 )
 
 //go:generate msgp -file $GOFILE -unexported
@@ -927,34 +928,42 @@ type objectIO interface {
 // The loader is optimistic and has no locking, but tries 5 times before giving up.
 // If the object is not found or unable to deserialize d is cleared and nil error is returned.
 func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) error {
-	// Abandon if more than 5 minutes, so we don't hold up scanner.
-	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
-	defer cancel()
+	load := func(name string, timeout time.Duration) (bool, error) {
+		// Abandon if more than time.Minute, so we don't hold up scanner.
+		// Drive timeout by default is 2 minutes; we do not need to wait longer.
+		ctx, cancel := context.WithTimeout(ctx, timeout)
+		defer cancel()
 
-	// Caches are read+written without locks,
-	retries := 0
-	for retries < 5 {
 		r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, ObjectOptions{NoLock: true})
 		if err != nil {
 			switch err.(type) {
 			case ObjectNotFound, BucketNotFound:
+				// Not found is not an error; clear the cache and report success.
+				*d = dataUsageCache{}
+				return false, nil
 			case InsufficientReadQuorum, StorageErr:
-				retries++
-				time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
-				continue
-			default:
-				return toObjectErr(err, dataUsageBucket, name)
+				// Transient: worth retrying.
+				return true, nil
 			}
-			*d = dataUsageCache{}
-			return nil
+			return false, toObjectErr(err, dataUsageBucket, name)
 		}
-		if err := d.deserialize(r); err != nil {
-			r.Close()
+		err = d.deserialize(r)
+		r.Close()
+		return err != nil, nil
+	}
+
+	// Caches are read+written without locks,
+	retries := 0
+	for retries < 5 {
+		retry, err := load(name, time.Minute)
+		if err != nil {
+			return err
+		}
+		if retry {
+			retry, err = load(name+".bkp", 30*time.Second)
+			if err == nil && !retry {
+				// The backup loaded cleanly; use its contents.
+				return nil
+			}
 			retries++
 			time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
 			continue
 		}
-		r.Close()
 		return nil
 	}
 	*d = dataUsageCache{}
@@ -967,47 +976,52 @@ var maxConcurrentScannerSaves = make(chan struct{}, 4)
 // save the content of the cache to minioMetaBackgroundOpsBucket with the provided name.
 // Note that no locking is done when saving.
 func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error {
-	var r io.Reader
-	maxConcurrentScannerSaves <- struct{}{}
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case maxConcurrentScannerSaves <- struct{}{}:
+	}
 	defer func() {
+		// Release unconditionally; a release guarded by ctx.Done() would
+		// leak a semaphore slot whenever the context is canceled.
 		<-maxConcurrentScannerSaves
 	}()
-	// If big, do streaming...
-	size := int64(-1)
-	if len(d.Cache) > 10000 {
-		pr, pw := io.Pipe()
-		go func() {
-			pw.CloseWithError(d.serializeTo(pw))
-		}()
-		defer pr.Close()
-		r = pr
-	} else {
-		var buf bytes.Buffer
-		err := d.serializeTo(&buf)
-		if err != nil {
-			return err
-		}
-		r = &buf
-		size = int64(buf.Len())
-	}
-	hr, err := hash.NewReader(r, size, "", "", size)
-	if err != nil {
-		return err
-	}
-	// Abandon if more than 5 minutes, so we don't hold up scanner.
-	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
-	defer cancel()
-	_, err = store.PutObject(ctx,
-		dataUsageBucket,
-		name,
-		NewPutObjReader(hr),
-		ObjectOptions{NoLock: true})
-	if isErrBucketNotFound(err) {
-		return nil
+
+	buf := bytebufferpool.Get()
+	defer func() {
+		buf.Reset()
+		bytebufferpool.Put(buf)
+	}()
+
+	if err := d.serializeTo(buf); err != nil {
+		return err
+	}
+
+	save := func(name string, timeout time.Duration) error {
+		// Abandon if more than a minute, so we don't hold up scanner.
+		ctx, cancel := context.WithTimeout(ctx, timeout)
+		defer cancel()
+
+		// Build a fresh reader per save; a hash.Reader is consumed by
+		// PutObject and cannot be reused for the backup copy.
+		hr, err := hash.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), "", "", int64(buf.Len()))
+		if err != nil {
+			return err
+		}
+		_, err = store.PutObject(ctx,
+			dataUsageBucket,
+			name,
+			NewPutObjReader(hr),
+			ObjectOptions{NoLock: true})
+		if isErrBucketNotFound(err) {
+			return nil
+		}
+		return err
 	}
-	return err
+	defer save(name+".bkp", 30*time.Second) // Keep a backup as well
+
+	// Drive timeout by default is 2 minutes; we do not need to wait longer.
+	return save(name, time.Minute)
 }
 
 // dataUsageCacheVer indicates the cache version.
diff --git a/cmd/data-usage.go b/cmd/data-usage.go
index cecbb25a1..eff61740d 100644
--- a/cmd/data-usage.go
+++ b/cmd/data-usage.go
@@ -42,6 +42,7 @@ const (
 
-// storeDataUsageInBackend will store all objects sent on the gui channel until closed.
+// storeDataUsageInBackend will store all objects sent on the dui channel until closed.
 func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan DataUsageInfo) {
+	attempts := 1
 	for dataUsageInfo := range dui {
 		json := jsoniter.ConfigCompatibleWithStandardLibrary
 		dataUsageJSON, err := json.Marshal(dataUsageInfo)
@@ -49,9 +50,14 @@ func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan
 			logger.LogIf(ctx, err)
 			continue
 		}
+		if attempts > 10 {
+			saveConfig(ctx, objAPI, dataUsageObjNamePath+".bkp", dataUsageJSON) // Save a backup copy roughly every 10th update.
+			attempts = 1
+		}
 		if err = saveConfig(ctx, objAPI, dataUsageObjNamePath, dataUsageJSON); err != nil {
 			logger.LogIf(ctx, err)
 		}
+		attempts++
 	}
 }
 
@@ -94,10 +100,13 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
 func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
 	buf, err := readConfig(ctx, objAPI, dataUsageObjNamePath)
 	if err != nil {
-		if errors.Is(err, errConfigNotFound) {
-			return DataUsageInfo{}, nil
+		buf, err = readConfig(ctx, objAPI, dataUsageObjNamePath+".bkp")
+		if err != nil {
+			if errors.Is(err, errConfigNotFound) {
+				return DataUsageInfo{}, nil
+			}
+			return DataUsageInfo{}, toObjectErr(err, minioMetaBucket, dataUsageObjNamePath)
 		}
-		return DataUsageInfo{}, toObjectErr(err, minioMetaBucket, dataUsageObjNamePath)
 	}
 
 	var dataUsageInfo DataUsageInfo
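---

Notes: the sketches below are illustrative and self-contained; the
helper names in them (doUpload, writeCache, the in-memory store) are
hypothetical stand-ins, not MinIO APIs. They mirror the control flow of
the patch, not its exact implementation.

The replication change swaps a single 5-minute deadline for up to three
1-minute attempts separated by randomized sleeps, so one stuck call
cannot consume the whole budget and simultaneous retriers drift apart
instead of hammering the remote in lockstep:

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// doUpload stands in for a call such as NewMultipartUpload that can
// fail transiently.
func doUpload(ctx context.Context) (string, error) {
	if rand.Intn(2) == 0 {
		return "", errors.New("transient error")
	}
	return "upload-id", nil
}

func main() {
	ctx := context.Background()

	var uploadID string
	var err error
	attempts := 1
	for attempts <= 3 {
		// Give every attempt its own deadline instead of sharing one.
		nctx, cancel := context.WithTimeout(ctx, time.Minute)
		uploadID, err = doUpload(nctx)
		cancel()
		if err == nil {
			break
		}
		attempts++
		// Full jitter: sleep a random duration in [0, 1s).
		time.Sleep(time.Duration(rand.Int63n(int64(time.Second))))
	}
	if err != nil {
		fmt.Println("all attempts failed:", err)
		return
	}
	fmt.Println("created", uploadID)
}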
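The save path caps concurrent writers with a buffered-channel
semaphore, acquired through a select so a caller whose context is
already canceled never queues up. A sketch of that shape, with
writeCache standing in for the PutObject call:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Four tokens: at most four saves may run at once.
var maxConcurrentSaves = make(chan struct{}, 4)

// writeCache stands in for the actual object write.
func writeCache(ctx context.Context, id int) error {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("saved cache", id)
	return nil
}

func save(ctx context.Context, id int) error {
	// Acquire a token, or bail out if the caller already gave up.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case maxConcurrentSaves <- struct{}{}:
	}
	// Release unconditionally. A release guarded by ctx.Done() would
	// leak the token on cancellation and permanently shrink the
	// semaphore.
	defer func() { <-maxConcurrentSaves }()

	return writeCache(ctx, id)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := save(context.Background(), i); err != nil {
				fmt.Println("save", i, "skipped:", err)
			}
		}(i)
	}
	wg.Wait()
}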
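Finally, both the usage cache and the data-usage JSON follow the same
recovery scheme: write the primary object, keep a best-effort ".bkp"
copy, and fall back to that copy on read, so a single torn or lost
write no longer zeroes the metrics. A sketch against a toy in-memory
store:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

// store stands in for the object layer.
type store map[string][]byte

func (s store) put(name string, b []byte) { s[name] = b }

func (s store) get(name string) ([]byte, error) {
	b, ok := s[name]
	if !ok {
		return nil, errNotFound
	}
	return b, nil
}

// saveWithBackup writes the primary object, then keeps a backup copy
// that later reads can fall back to.
func saveWithBackup(s store, name string, b []byte) {
	s.put(name, b)
	s.put(name+".bkp", b)
}

// loadWithFallback prefers the primary object and silently falls back
// to the ".bkp" copy when the primary is missing or unreadable.
func loadWithFallback(s store, name string) ([]byte, error) {
	if b, err := s.get(name); err == nil {
		return b, nil
	}
	return s.get(name + ".bkp")
}

func main() {
	s := store{}
	saveWithBackup(s, "usage.json", []byte(`{"objectsCount":42}`))
	delete(s, "usage.json") // simulate a lost or corrupted primary copy

	b, err := loadWithFallback(s, "usage.json")
	if err != nil {
		fmt.Println("both copies are gone:", err)
		return
	}
	fmt.Printf("recovered from backup: %s\n", b)
}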