diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go
index 10db2f555..c36b93985 100644
--- a/cmd/disk-cache-backend.go
+++ b/cmd/disk-cache-backend.go
@@ -59,10 +59,13 @@ const (
 	cacheExpiryDays = 90 * time.Hour * 24 // defaults to 90 days
 	// SSECacheEncrypted is the metadata key indicating that the object
 	// is a cache entry encrypted with cache KMS master key in globalCacheKMS.
-	SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache"
-	cacheMultipartDir = "multipart"
+	SSECacheEncrypted               = "X-Minio-Internal-Encrypted-Cache"
+	cacheMultipartDir               = "multipart"
+	cacheWritebackDir               = "writeback"
+	cacheStaleUploadCleanupInterval = time.Hour * 24
 	cacheStaleUploadExpiry          = time.Hour * 24
+	cacheWBStaleUploadExpiry        = time.Hour * 24 * 7
 )
 
 // CacheChecksumInfoV1 - carries checksums of individual blocks on disk.
@@ -105,15 +108,15 @@ func (r *RangeInfo) Empty() bool {
 	return r.Range == "" && r.File == "" && r.Size == 0
 }
 
-func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
+func (m *cacheMeta) ToObjectInfo() (o ObjectInfo) {
 	if len(m.Meta) == 0 {
 		m.Meta = make(map[string]string)
 		m.Stat.ModTime = timeSentinel
 	}
 
 	o = ObjectInfo{
-		Bucket:            bucket,
-		Name:              object,
+		Bucket:            m.Bucket,
+		Name:              m.Object,
 		CacheStatus:       CacheHit,
 		CacheLookupStatus: CacheHit,
 	}
@@ -381,7 +384,7 @@ func (c *diskCache) purge(ctx context.Context) {
 		lastAtime := lastAtimeFn(meta.PartNumbers, pathJoin(c.dir, name))
 		// stat all cached file ranges.
 		cachedRngFiles := fiStatRangesFn(meta.Ranges, pathJoin(c.dir, name))
-		objInfo := meta.ToObjectInfo("", "")
+		objInfo := meta.ToObjectInfo()
 		// prevent gc from clearing un-synced commits. This metadata is present when
 		// cache writeback commit setting is enabled.
 		status, ok := objInfo.UserDefined[writeBackStatusHeader]
@@ -408,6 +411,9 @@ func (c *diskCache) purge(ctx context.Context) {
 		}
 
 		for fname, fi := range cachedRngFiles {
+			if fi == nil {
+				continue
+			}
 			if cc != nil {
 				if cc.isStale(objInfo.ModTime) {
 					removeAll(fname)
@@ -425,9 +431,11 @@ func (c *diskCache) purge(ctx context.Context) {
 		}
 		// clean up stale cache.json files for objects that never got cached but access count was maintained in cache.json
 		fi, err := os.Stat(pathJoin(cacheDir, cacheMetaJSONFile))
-		if err != nil || (fi.ModTime().Before(expiry) && len(cachedRngFiles) == 0) {
+		if err != nil || (fi != nil && fi.ModTime().Before(expiry) && len(cachedRngFiles) == 0) {
 			removeAll(cacheDir)
-			scorer.adjustSaveBytes(-fi.Size())
+			if fi != nil {
+				scorer.adjustSaveBytes(-fi.Size())
+			}
 			// Proceed to next file.
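
A note on the two nil guards added to purge() above: os.Stat returns a nil FileInfo whenever it returns an error, and the range map built by the fiStatRangesFn closure can likewise carry nil entries for range files that have already disappeared, so both call sites must re-check before dereferencing. A minimal standalone sketch of the pattern (not MinIO code; statRanges is a hypothetical stand-in for fiStatRangesFn):

```go
package main

import (
	"fmt"
	"os"
)

// statRanges mimics a helper that stats a set of cached range files and
// keeps nil entries for files that have already been deleted. It is a
// hypothetical stand-in for the fiStatRangesFn closure in the patch.
func statRanges(paths []string) map[string]os.FileInfo {
	out := make(map[string]os.FileInfo, len(paths))
	for _, p := range paths {
		fi, err := os.Stat(p) // fi is nil whenever err is non-nil
		if err != nil {
			out[p] = nil // keep the entry so the caller still sees the range
			continue
		}
		out[p] = fi
	}
	return out
}

func main() {
	for name, fi := range statRanges([]string{"/tmp/does-not-exist"}) {
		if fi == nil { // the same guard the patch adds inside purge()
			fmt.Println("skipping missing range file:", name)
			continue
		}
		fmt.Println(name, fi.Size())
	}
}
```
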
 			return nil
 		}
 
@@ -486,7 +494,7 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
 	if partial {
 		return oi, numHits, errFileNotFound
 	}
-	oi = meta.ToObjectInfo("", "")
+	oi = meta.ToObjectInfo()
 	oi.Bucket = bucket
 	oi.Name = object
 
@@ -521,7 +529,7 @@ func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HT
 		return
 	}
 
-	oi = meta.ToObjectInfo("", "")
+	oi = meta.ToObjectInfo()
 	oi.Bucket = bucket
 	oi.Name = object
 	if !partial {
@@ -573,12 +581,16 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c
 	if _, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile)); err == nil {
 		partial = false
 	}
+	if writebackInProgress(meta.Meta) {
+		partial = false
+	}
 	return meta, partial, meta.Hits, nil
 }
 
 // saves object metadata to disk cache
 // incHitsOnly is true if metadata update is incrementing only the hit counter
-func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
+// finalizeWB is true only if the metadata update is accompanied by moving the part from the writeback temp location to the cache dir.
+func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly, finalizeWB bool) error {
 	cachedPath := getCacheSHADir(c.dir, bucket, object)
 	cLock := c.NewNSLockFn(cachedPath)
 	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
@@ -587,7 +599,18 @@ func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, met
 	}
 	ctx = lkctx.Context()
 	defer cLock.Unlock(lkctx.Cancel)
-	return c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly)
+	if err = c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly); err != nil {
+		return err
+	}
+	// move the part saved in the writeback directory into place with cache.json atomically
+	if finalizeWB {
+		wbdir := getCacheWriteBackSHADir(c.dir, bucket, object)
+		if err = renameAll(pathJoin(wbdir, cacheDataFile), pathJoin(cachedPath, cacheDataFile)); err != nil {
+			return err
+		}
+		removeAll(wbdir) // cleanup writeback/shadir
+	}
+	return nil
 }
 
 // saves object metadata to disk cache
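
The finalizeWB path above relies on rename being atomic within a filesystem: the part file becomes visible under the cache directory only after cache.json has been persisted, so readers observe either the old state or a fully committed object. A minimal sketch of the same commit pattern using only the standard library (renameAll and removeAll are MinIO helpers; plain os.Rename and os.RemoveAll stand in for them here):

```go
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// commitFromTemp renames the data file from the staging directory into the
// final cache directory and then discards the staging directory. Within a
// single filesystem the rename is atomic, so concurrent readers see either
// the old state or the fully committed file, never a partial write.
func commitFromTemp(tmpDir, finalDir, dataFile string) error {
	if err := os.MkdirAll(finalDir, 0o777); err != nil {
		return err
	}
	if err := os.Rename(filepath.Join(tmpDir, dataFile), filepath.Join(finalDir, dataFile)); err != nil {
		return err
	}
	return os.RemoveAll(tmpDir) // nothing useful remains in the staging dir
}

func main() {
	tmp, err := os.MkdirTemp("", "writeback")
	if err != nil {
		log.Fatal(err)
	}
	final, err := os.MkdirTemp("", "cache")
	if err != nil {
		log.Fatal(err)
	}
	if err := os.WriteFile(filepath.Join(tmp, "part.1"), []byte("data"), 0o644); err != nil {
		log.Fatal(err)
	}
	fmt.Println(commitFromTemp(tmp, final, "part.1")) // <nil> on success
}
```
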
@@ -702,6 +725,11 @@ func getCacheSHADir(dir, bucket, object string) string {
 	return pathJoin(dir, getSHA256Hash([]byte(pathJoin(bucket, object))))
 }
 
+// returns temporary writeback cache location.
+func getCacheWriteBackSHADir(dir, bucket, object string) string {
+	return pathJoin(dir, minioMetaBucket, "writeback", getSHA256Hash([]byte(pathJoin(bucket, object))))
+}
+
 // Cache data to disk with bitrot checksum added for each block of 1MB
 func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, string, error) {
 	if err := os.MkdirAll(cachePath, 0777); err != nil {
@@ -846,6 +874,11 @@ func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Read
 	if !c.diskSpaceAvailable(size) {
 		return oi, errDiskFull
 	}
+
+	if writeback {
+		cachePath = getCacheWriteBackSHADir(c.dir, bucket, object)
+	}
+
 	if err := os.MkdirAll(cachePath, 0777); err != nil {
 		return oi, err
 	}
@@ -1131,6 +1164,9 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 		}()
 	} else {
 		go func() {
+			if writebackInProgress(objInfo.UserDefined) {
+				cacheObjPath = getCacheWriteBackSHADir(c.dir, bucket, object)
+			}
 			filePath := pathJoin(cacheObjPath, cacheFile)
 			err := c.bitrotReadFromCache(ctx, filePath, startOffset, length, pw)
 			if err != nil {
@@ -1199,7 +1235,7 @@ func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
 			return nil
 		}
 
-		objInfo := meta.ToObjectInfo("", "")
+		objInfo := meta.ToObjectInfo()
 		status, ok := objInfo.UserDefined[writeBackStatusHeader]
 		if !ok || status == CommitComplete.String() {
 			return nil
@@ -1497,6 +1533,8 @@ func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object,
 	}
 	uploadMeta.Stat.Size = objectSize
 	uploadMeta.Stat.ModTime = roi.ModTime
+	uploadMeta.Bucket = bucket
+	uploadMeta.Object = object
 	// if encrypted - make sure ETag updated
 
 	uploadMeta.Meta["etag"] = roi.ETag
@@ -1532,7 +1570,7 @@ func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object,
 	}
 	renameAll(pathJoin(uploadIDDir, cacheMetaJSONFile), pathJoin(cachePath, cacheMetaJSONFile))
 	removeAll(uploadIDDir) // clean up any unused parts in the uploadIDDir
-	return uploadMeta.ToObjectInfo(bucket, object), nil
+	return uploadMeta.ToObjectInfo(), nil
 }
 
 func (c *diskCache) AbortUpload(bucket, object, uploadID string) (err error) {
@@ -1579,9 +1617,6 @@ func getMultipartCacheSHADir(dir, bucket, object string) string {
 
 // clean up stale cache multipart uploads according to cleanup interval.
 func (c *diskCache) cleanupStaleUploads(ctx context.Context) {
-	if !c.commitWritethrough {
-		return
-	}
 	timer := time.NewTimer(cacheStaleUploadCleanupInterval)
 	defer timer.Stop()
 	for {
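
Both getCacheSHADir and the new getCacheWriteBackSHADir key an object's on-disk directory off a SHA-256 of bucket/object, so the writeback staging area mirrors the cache layout, just nested under the metadata bucket. A standalone sketch of the derivation; note that the ".minio.sys" value for minioMetaBucket is an assumption here, since its definition sits outside this diff:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"path"
)

// shaDir returns the hex-encoded SHA-256 of "bucket/object", the directory
// name that both the cache and its writeback staging area use for an object.
func shaDir(bucket, object string) string {
	sum := sha256.Sum256([]byte(path.Join(bucket, object)))
	return hex.EncodeToString(sum[:])
}

func main() {
	// ".minio.sys" is assumed here as the value of minioMetaBucket.
	const metaBucket = ".minio.sys"
	cacheDir := path.Join("/drive", shaDir("photos", "2021/cat.png"))
	wbDir := path.Join("/drive", metaBucket, "writeback", shaDir("photos", "2021/cat.png"))
	fmt.Println(cacheDir) // committed cache location
	fmt.Println(wbDir)    // temporary writeback staging location
}
```
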
@@ -1605,6 +1640,22 @@ func (c *diskCache) cleanupStaleUploads(ctx context.Context) {
 					return nil
 				})
 			})
+			// clean up the writeback folder where cache.json no longer exists in the main c.dir/ path
+			// and the upload is past the expiry window.
+			readDirFn(pathJoin(c.dir, minioMetaBucket, cacheWritebackDir), func(shaDir string, typ os.FileMode) error {
+				wbdir := pathJoin(c.dir, minioMetaBucket, cacheWritebackDir, shaDir)
+				cachedir := pathJoin(c.dir, shaDir)
+				if _, err := os.Stat(cachedir); os.IsNotExist(err) {
+					fi, err := os.Stat(wbdir)
+					if err != nil {
+						return nil
+					}
+					if now.Sub(fi.ModTime()) > cacheWBStaleUploadExpiry {
+						return removeAll(wbdir)
+					}
+				}
+				return nil
+			})
 		}
 	}
 }
diff --git a/cmd/disk-cache-utils.go b/cmd/disk-cache-utils.go
index 2b184a042..7cd0989c7 100644
--- a/cmd/disk-cache-utils.go
+++ b/cmd/disk-cache-utils.go
@@ -585,8 +585,8 @@ func cacheMultiWriter(w1 io.Writer, w2 *io.PipeWriter) io.Writer {
 	return &multiWriter{backendWriter: w1, cacheWriter: w2}
 }
 
-// skipETagVerification returns true if writeback commit is not complete
-func skipETagVerification(m map[string]string) bool {
+// writebackInProgress returns true if writeback commit is not complete
+func writebackInProgress(m map[string]string) bool {
 	if v, ok := m[writeBackStatusHeader]; ok {
 		switch cacheCommitStatus(v) {
 		case CommitPending, CommitFailed:
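
The rename from skipETagVerification to writebackInProgress matches what the predicate actually tests: whether the commit status recorded in the object's metadata is still pending or failed. A self-contained sketch of the same check; the header key and status strings are assumptions for illustration, as the real constants are defined elsewhere in cmd/:

```go
package main

import "fmt"

type cacheCommitStatus string

// Assumed values for illustration; the real header key and statuses are
// defined elsewhere in cmd/ and are not part of this diff.
const (
	writeBackStatusHeader                  = "X-Minio-Internal-Writeback-Status"
	CommitPending        cacheCommitStatus = "pending"
	CommitComplete       cacheCommitStatus = "complete"
	CommitFailed         cacheCommitStatus = "failed"
)

// writebackInProgress reports whether the object's writeback to the backend
// is still pending or has failed, mirroring the helper in the patch.
func writebackInProgress(m map[string]string) bool {
	if v, ok := m[writeBackStatusHeader]; ok {
		switch cacheCommitStatus(v) {
		case CommitPending, CommitFailed:
			return true
		}
	}
	return false
}

func main() {
	meta := map[string]string{writeBackStatusHeader: string(CommitPending)}
	fmt.Println(writebackInProgress(meta)) // true: serve from cache, skip ETag check
}
```
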
diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go
index 0064622f5..47a63bfc4 100644
--- a/cmd/disk-cache.go
+++ b/cmd/disk-cache.go
@@ -132,7 +132,7 @@ type cacheObjects struct {
 
 func (c *cacheObjects) incHitsToMeta(ctx context.Context, dcache *diskCache, bucket, object string, size int64, eTag string, rs *HTTPRangeSpec) error {
 	metadata := map[string]string{"etag": eTag}
-	return dcache.SaveMetadata(ctx, bucket, object, metadata, size, rs, "", true)
+	return dcache.SaveMetadata(ctx, bucket, object, metadata, size, rs, "", true, false)
 }
 
 // Backend metadata could have changed through server side copy - reset cache metadata if that is the case
@@ -159,7 +159,7 @@ func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *disk
 		bkObjectInfo.ETag != cacheObjInfo.ETag ||
 		bkObjectInfo.ContentType != cacheObjInfo.ContentType ||
 		!bkObjectInfo.Expires.Equal(cacheObjInfo.Expires) {
-		return dcache.SaveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "", false)
+		return dcache.SaveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "", false, false)
 	}
 	return c.incHitsToMeta(ctx, dcache, bucket, object, cacheObjInfo.Size, cacheObjInfo.ETag, rs)
 }
@@ -277,7 +277,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 			return bReader, err
 		}
 		// serve cached content without ETag verification if writeback commit is not yet complete
-		if skipETagVerification(cacheReader.ObjInfo.UserDefined) {
+		if writebackInProgress(cacheReader.ObjInfo.UserDefined) {
 			return cacheReader, nil
 		}
 	}
@@ -427,7 +427,7 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
 			return cachedObjInfo, nil
 		}
 		// serve cache metadata without ETag verification if writeback commit is not yet complete
-		if skipETagVerification(cachedObjInfo.UserDefined) {
+		if writebackInProgress(cachedObjInfo.UserDefined) {
 			return cachedObjInfo, nil
 		}
 	}
@@ -794,6 +794,7 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
 	opts.UserDefined[xhttp.ContentMD5] = oi.UserDefined["content-md5"]
 	objInfo, err := c.InnerPutObjectFn(ctx, oi.Bucket, oi.Name, NewPutObjReader(hashReader), opts)
 	wbCommitStatus := CommitComplete
+	size := objInfo.Size
 	if err != nil {
 		wbCommitStatus = CommitFailed
 	}
@@ -804,12 +805,13 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
 		retryCnt, _ = strconv.Atoi(meta[writeBackRetryHeader])
 		retryCnt++
 		meta[writeBackRetryHeader] = strconv.Itoa(retryCnt)
+		size = cReader.ObjInfo.Size
 	} else {
 		delete(meta, writeBackRetryHeader)
 	}
 	meta[writeBackStatusHeader] = wbCommitStatus.String()
 	meta["etag"] = oi.ETag
-	dcache.SaveMetadata(ctx, oi.Bucket, oi.Name, meta, objInfo.Size, nil, "", false)
+	dcache.SaveMetadata(ctx, oi.Bucket, oi.Name, meta, size, nil, "", false, wbCommitStatus == CommitComplete)
 	if retryCnt > 0 {
 		// slow down retries
 		time.Sleep(time.Second * time.Duration(retryCnt%10+1))
diff --git a/cmd/disk-cache_test.go b/cmd/disk-cache_test.go
index 102cd1cc9..02940dba4 100644
--- a/cmd/disk-cache_test.go
+++ b/cmd/disk-cache_test.go
@@ -25,7 +25,7 @@ import (
 // Tests ToObjectInfo function.
 func TestCacheMetadataObjInfo(t *testing.T) {
 	m := cacheMeta{Meta: nil}
-	objInfo := m.ToObjectInfo("testbucket", "testobject")
+	objInfo := m.ToObjectInfo()
 	if objInfo.Size != 0 {
 		t.Fatal("Unexpected object info value for Size", objInfo.Size)
 	}
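
One detail worth calling out in uploadObject: failed writebacks are re-queued and paced by time.Sleep(time.Second * time.Duration(retryCnt%10+1)), so the delay cycles through one to ten seconds instead of growing without bound. A tiny sketch of that schedule:

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay reproduces the patch's pacing for writeback retries: the wait
// cycles through 1s..10s as the retry count grows, wrapping instead of
// backing off indefinitely.
func retryDelay(retryCnt int) time.Duration {
	return time.Second * time.Duration(retryCnt%10+1)
}

func main() {
	for i := 1; i <= 12; i++ {
		fmt.Printf("retry %2d -> wait %v\n", i, retryDelay(i))
	}
}
```
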