From 0a66a6f1e5d66cfba52b9f33a7979b30f741abe7 Mon Sep 17 00:00:00 2001 From: Poorna K Date: Wed, 8 Dec 2021 14:52:31 -0800 Subject: [PATCH] Avoid cache GC of writebacks before commit syncs (#13860) Save part.1 for writebacks in a separate folder and move it to cache dir atomically while saving the cache metadata. This is to avoid GC mistaking part.1 for orphaned cache entries and purging them. This PR also fixes object size being overwritten during retries for write-back mode. --- cmd/disk-cache-backend.go | 85 +++++++++++++++++++++++++++++++-------- cmd/disk-cache-utils.go | 4 +- cmd/disk-cache.go | 12 +++--- cmd/disk-cache_test.go | 2 +- 4 files changed, 78 insertions(+), 25 deletions(-) diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index 10db2f555..c36b93985 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -59,10 +59,13 @@ const ( cacheExpiryDays = 90 * time.Hour * 24 // defaults to 90 days // SSECacheEncrypted is the metadata key indicating that the object // is a cache entry encrypted with cache KMS master key in globalCacheKMS. - SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache" - cacheMultipartDir = "multipart" + SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache" + cacheMultipartDir = "multipart" + cacheWritebackDir = "writeback" + cacheStaleUploadCleanupInterval = time.Hour * 24 cacheStaleUploadExpiry = time.Hour * 24 + cacheWBStaleUploadExpiry = time.Hour * 24 * 7 ) // CacheChecksumInfoV1 - carries checksums of individual blocks on disk. 
@@ -105,15 +108,15 @@ func (r *RangeInfo) Empty() bool { return r.Range == "" && r.File == "" && r.Size == 0 } -func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) { +func (m *cacheMeta) ToObjectInfo() (o ObjectInfo) { if len(m.Meta) == 0 { m.Meta = make(map[string]string) m.Stat.ModTime = timeSentinel } o = ObjectInfo{ - Bucket: bucket, - Name: object, + Bucket: m.Bucket, + Name: m.Object, CacheStatus: CacheHit, CacheLookupStatus: CacheHit, } @@ -381,7 +384,7 @@ func (c *diskCache) purge(ctx context.Context) { lastAtime := lastAtimeFn(meta.PartNumbers, pathJoin(c.dir, name)) // stat all cached file ranges. cachedRngFiles := fiStatRangesFn(meta.Ranges, pathJoin(c.dir, name)) - objInfo := meta.ToObjectInfo("", "") + objInfo := meta.ToObjectInfo() // prevent gc from clearing un-synced commits. This metadata is present when // cache writeback commit setting is enabled. status, ok := objInfo.UserDefined[writeBackStatusHeader] @@ -408,6 +411,9 @@ func (c *diskCache) purge(ctx context.Context) { } for fname, fi := range cachedRngFiles { + if fi == nil { + continue + } if cc != nil { if cc.isStale(objInfo.ModTime) { removeAll(fname) @@ -425,9 +431,11 @@ func (c *diskCache) purge(ctx context.Context) { } // clean up stale cache.json files for objects that never got cached but access count was maintained in cache.json fi, err := os.Stat(pathJoin(cacheDir, cacheMetaJSONFile)) - if err != nil || (fi.ModTime().Before(expiry) && len(cachedRngFiles) == 0) { + if err != nil || (fi != nil && fi.ModTime().Before(expiry) && len(cachedRngFiles) == 0) { removeAll(cacheDir) - scorer.adjustSaveBytes(-fi.Size()) + if fi != nil { + scorer.adjustSaveBytes(-fi.Size()) + } // Proceed to next file. 
return nil } @@ -486,7 +494,7 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI if partial { return oi, numHits, errFileNotFound } - oi = meta.ToObjectInfo("", "") + oi = meta.ToObjectInfo() oi.Bucket = bucket oi.Name = object @@ -521,7 +529,7 @@ func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HT return } - oi = meta.ToObjectInfo("", "") + oi = meta.ToObjectInfo() oi.Bucket = bucket oi.Name = object if !partial { @@ -573,12 +581,16 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c if _, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile)); err == nil { partial = false } + if writebackInProgress(meta.Meta) { + partial = false + } return meta, partial, meta.Hits, nil } // saves object metadata to disk cache // incHitsOnly is true if metadata update is incrementing only the hit counter -func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error { +// finalizeWB is true only if metadata update accompanied by moving part from temp location to cache dir. 
+func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly, finalizeWB bool) error { cachedPath := getCacheSHADir(c.dir, bucket, object) cLock := c.NewNSLockFn(cachedPath) lkctx, err := cLock.GetLock(ctx, globalOperationTimeout) @@ -587,7 +599,18 @@ func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, met } ctx = lkctx.Context() defer cLock.Unlock(lkctx.Cancel) - return c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly) + if err = c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly); err != nil { + return err + } + // move part saved in writeback directory and cache.json atomically + if finalizeWB { + wbdir := getCacheWriteBackSHADir(c.dir, bucket, object) + if err = renameAll(pathJoin(wbdir, cacheDataFile), pathJoin(cachedPath, cacheDataFile)); err != nil { + return err + } + removeAll(wbdir) // cleanup writeback/shadir + } + return nil } // saves object metadata to disk cache @@ -702,6 +725,11 @@ func getCacheSHADir(dir, bucket, object string) string { return pathJoin(dir, getSHA256Hash([]byte(pathJoin(bucket, object)))) } +// returns temporary writeback cache location. 
+func getCacheWriteBackSHADir(dir, bucket, object string) string { + return pathJoin(dir, minioMetaBucket, "writeback", getSHA256Hash([]byte(pathJoin(bucket, object)))) +} + // Cache data to disk with bitrot checksum added for each block of 1MB func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, string, error) { if err := os.MkdirAll(cachePath, 0777); err != nil { @@ -846,6 +874,11 @@ func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Read if !c.diskSpaceAvailable(size) { return oi, errDiskFull } + + if writeback { + cachePath = getCacheWriteBackSHADir(c.dir, bucket, object) + } + if err := os.MkdirAll(cachePath, 0777); err != nil { return oi, err } @@ -1131,6 +1164,9 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang }() } else { go func() { + if writebackInProgress(objInfo.UserDefined) { + cacheObjPath = getCacheWriteBackSHADir(c.dir, bucket, object) + } filePath := pathJoin(cacheObjPath, cacheFile) err := c.bitrotReadFromCache(ctx, filePath, startOffset, length, pw) if err != nil { @@ -1199,7 +1235,7 @@ func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) { return nil } - objInfo := meta.ToObjectInfo("", "") + objInfo := meta.ToObjectInfo() status, ok := objInfo.UserDefined[writeBackStatusHeader] if !ok || status == CommitComplete.String() { return nil @@ -1497,6 +1533,8 @@ func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object, } uploadMeta.Stat.Size = objectSize uploadMeta.Stat.ModTime = roi.ModTime + uploadMeta.Bucket = bucket + uploadMeta.Object = object // if encrypted - make sure ETag updated uploadMeta.Meta["etag"] = roi.ETag @@ -1532,7 +1570,7 @@ func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object, } renameAll(pathJoin(uploadIDDir, cacheMetaJSONFile), pathJoin(cachePath, cacheMetaJSONFile)) removeAll(uploadIDDir) // clean up any unused parts in the uploadIDDir - return 
uploadMeta.ToObjectInfo(bucket, object), nil + return uploadMeta.ToObjectInfo(), nil } func (c *diskCache) AbortUpload(bucket, object, uploadID string) (err error) { @@ -1579,9 +1617,6 @@ func getMultipartCacheSHADir(dir, bucket, object string) string { // clean up stale cache multipart uploads according to cleanup interval. func (c *diskCache) cleanupStaleUploads(ctx context.Context) { - if !c.commitWritethrough { - return - } timer := time.NewTimer(cacheStaleUploadCleanupInterval) defer timer.Stop() for { @@ -1605,6 +1640,22 @@ func (c *diskCache) cleanupStaleUploads(ctx context.Context) { return nil }) }) + // clean up of writeback folder where cache.json no longer exists in the main c.dir/ path + // and if past upload expiry window. + readDirFn(pathJoin(c.dir, minioMetaBucket, cacheWritebackDir), func(shaDir string, typ os.FileMode) error { + wbdir := pathJoin(c.dir, minioMetaBucket, cacheWritebackDir, shaDir) + cachedir := pathJoin(c.dir, shaDir) + if _, err := os.Stat(cachedir); os.IsNotExist(err) { + fi, err := os.Stat(wbdir) + if err != nil { + return nil + } + if now.Sub(fi.ModTime()) > cacheWBStaleUploadExpiry { + return removeAll(wbdir) + } + } + return nil + }) } } } diff --git a/cmd/disk-cache-utils.go b/cmd/disk-cache-utils.go index 2b184a042..7cd0989c7 100644 --- a/cmd/disk-cache-utils.go +++ b/cmd/disk-cache-utils.go @@ -585,8 +585,8 @@ func cacheMultiWriter(w1 io.Writer, w2 *io.PipeWriter) io.Writer { return &multiWriter{backendWriter: w1, cacheWriter: w2} } -// skipETagVerification returns true if writeback commit is not complete -func skipETagVerification(m map[string]string) bool { +// writebackInProgress returns true if writeback commit is not complete +func writebackInProgress(m map[string]string) bool { if v, ok := m[writeBackStatusHeader]; ok { switch cacheCommitStatus(v) { case CommitPending, CommitFailed: diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index 0064622f5..47a63bfc4 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ 
-132,7 +132,7 @@ type cacheObjects struct { func (c *cacheObjects) incHitsToMeta(ctx context.Context, dcache *diskCache, bucket, object string, size int64, eTag string, rs *HTTPRangeSpec) error { metadata := map[string]string{"etag": eTag} - return dcache.SaveMetadata(ctx, bucket, object, metadata, size, rs, "", true) + return dcache.SaveMetadata(ctx, bucket, object, metadata, size, rs, "", true, false) } // Backend metadata could have changed through server side copy - reset cache metadata if that is the case @@ -159,7 +159,7 @@ func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *disk bkObjectInfo.ETag != cacheObjInfo.ETag || bkObjectInfo.ContentType != cacheObjInfo.ContentType || !bkObjectInfo.Expires.Equal(cacheObjInfo.Expires) { - return dcache.SaveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "", false) + return dcache.SaveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "", false, false) } return c.incHitsToMeta(ctx, dcache, bucket, object, cacheObjInfo.Size, cacheObjInfo.ETag, rs) } @@ -277,7 +277,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string return bReader, err } // serve cached content without ETag verification if writeback commit is not yet complete - if skipETagVerification(cacheReader.ObjInfo.UserDefined) { + if writebackInProgress(cacheReader.ObjInfo.UserDefined) { return cacheReader, nil } } @@ -427,7 +427,7 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string, return cachedObjInfo, nil } // serve cache metadata without ETag verification if writeback commit is not yet complete - if skipETagVerification(cachedObjInfo.UserDefined) { + if writebackInProgress(cachedObjInfo.UserDefined) { return cachedObjInfo, nil } } @@ -794,6 +794,7 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) { opts.UserDefined[xhttp.ContentMD5] = oi.UserDefined["content-md5"] objInfo, err := 
c.InnerPutObjectFn(ctx, oi.Bucket, oi.Name, NewPutObjReader(hashReader), opts) wbCommitStatus := CommitComplete + size := objInfo.Size if err != nil { wbCommitStatus = CommitFailed } @@ -804,12 +805,13 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) { retryCnt, _ = strconv.Atoi(meta[writeBackRetryHeader]) retryCnt++ meta[writeBackRetryHeader] = strconv.Itoa(retryCnt) + size = cReader.ObjInfo.Size } else { delete(meta, writeBackRetryHeader) } meta[writeBackStatusHeader] = wbCommitStatus.String() meta["etag"] = oi.ETag - dcache.SaveMetadata(ctx, oi.Bucket, oi.Name, meta, objInfo.Size, nil, "", false) + dcache.SaveMetadata(ctx, oi.Bucket, oi.Name, meta, size, nil, "", false, wbCommitStatus == CommitComplete) if retryCnt > 0 { // slow down retries time.Sleep(time.Second * time.Duration(retryCnt%10+1)) diff --git a/cmd/disk-cache_test.go b/cmd/disk-cache_test.go index 102cd1cc9..02940dba4 100644 --- a/cmd/disk-cache_test.go +++ b/cmd/disk-cache_test.go @@ -25,7 +25,7 @@ import ( // Tests ToObjectInfo function. func TestCacheMetadataObjInfo(t *testing.T) { m := cacheMeta{Meta: nil} - objInfo := m.ToObjectInfo("testbucket", "testobject") + objInfo := m.ToObjectInfo() if objInfo.Size != 0 { t.Fatal("Unexpected object info value for Size", objInfo.Size) }