mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
Avoid cache GC of writebacks before commit syncs (#13860)
Save part.1 for writebacks in a separate folder and move it to cache dir atomically while saving the cache metadata. This is to avoid GC mistaking part.1 as orphaned cache entries and purging them. This PR also fixes object size being overwritten during retries for write-back mode.
This commit is contained in:
parent
e82a5c5c54
commit
0a66a6f1e5
@ -61,8 +61,11 @@ const (
|
||||
// is a cache entry encrypted with cache KMS master key in globalCacheKMS.
|
||||
SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache"
|
||||
cacheMultipartDir = "multipart"
|
||||
cacheWritebackDir = "writeback"
|
||||
|
||||
cacheStaleUploadCleanupInterval = time.Hour * 24
|
||||
cacheStaleUploadExpiry = time.Hour * 24
|
||||
cacheWBStaleUploadExpiry = time.Hour * 24 * 7
|
||||
)
|
||||
|
||||
// CacheChecksumInfoV1 - carries checksums of individual blocks on disk.
|
||||
@ -105,15 +108,15 @@ func (r *RangeInfo) Empty() bool {
|
||||
return r.Range == "" && r.File == "" && r.Size == 0
|
||||
}
|
||||
|
||||
func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
|
||||
func (m *cacheMeta) ToObjectInfo() (o ObjectInfo) {
|
||||
if len(m.Meta) == 0 {
|
||||
m.Meta = make(map[string]string)
|
||||
m.Stat.ModTime = timeSentinel
|
||||
}
|
||||
|
||||
o = ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
Bucket: m.Bucket,
|
||||
Name: m.Object,
|
||||
CacheStatus: CacheHit,
|
||||
CacheLookupStatus: CacheHit,
|
||||
}
|
||||
@ -381,7 +384,7 @@ func (c *diskCache) purge(ctx context.Context) {
|
||||
lastAtime := lastAtimeFn(meta.PartNumbers, pathJoin(c.dir, name))
|
||||
// stat all cached file ranges.
|
||||
cachedRngFiles := fiStatRangesFn(meta.Ranges, pathJoin(c.dir, name))
|
||||
objInfo := meta.ToObjectInfo("", "")
|
||||
objInfo := meta.ToObjectInfo()
|
||||
// prevent gc from clearing un-synced commits. This metadata is present when
|
||||
// cache writeback commit setting is enabled.
|
||||
status, ok := objInfo.UserDefined[writeBackStatusHeader]
|
||||
@ -408,6 +411,9 @@ func (c *diskCache) purge(ctx context.Context) {
|
||||
}
|
||||
|
||||
for fname, fi := range cachedRngFiles {
|
||||
if fi == nil {
|
||||
continue
|
||||
}
|
||||
if cc != nil {
|
||||
if cc.isStale(objInfo.ModTime) {
|
||||
removeAll(fname)
|
||||
@ -425,9 +431,11 @@ func (c *diskCache) purge(ctx context.Context) {
|
||||
}
|
||||
// clean up stale cache.json files for objects that never got cached but access count was maintained in cache.json
|
||||
fi, err := os.Stat(pathJoin(cacheDir, cacheMetaJSONFile))
|
||||
if err != nil || (fi.ModTime().Before(expiry) && len(cachedRngFiles) == 0) {
|
||||
if err != nil || (fi != nil && fi.ModTime().Before(expiry) && len(cachedRngFiles) == 0) {
|
||||
removeAll(cacheDir)
|
||||
if fi != nil {
|
||||
scorer.adjustSaveBytes(-fi.Size())
|
||||
}
|
||||
// Proceed to next file.
|
||||
return nil
|
||||
}
|
||||
@ -486,7 +494,7 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
|
||||
if partial {
|
||||
return oi, numHits, errFileNotFound
|
||||
}
|
||||
oi = meta.ToObjectInfo("", "")
|
||||
oi = meta.ToObjectInfo()
|
||||
oi.Bucket = bucket
|
||||
oi.Name = object
|
||||
|
||||
@ -521,7 +529,7 @@ func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HT
|
||||
return
|
||||
}
|
||||
|
||||
oi = meta.ToObjectInfo("", "")
|
||||
oi = meta.ToObjectInfo()
|
||||
oi.Bucket = bucket
|
||||
oi.Name = object
|
||||
if !partial {
|
||||
@ -573,12 +581,16 @@ func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *c
|
||||
if _, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile)); err == nil {
|
||||
partial = false
|
||||
}
|
||||
if writebackInProgress(meta.Meta) {
|
||||
partial = false
|
||||
}
|
||||
return meta, partial, meta.Hits, nil
|
||||
}
|
||||
|
||||
// saves object metadata to disk cache
|
||||
// incHitsOnly is true if metadata update is incrementing only the hit counter
|
||||
func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
|
||||
// finalizeWB is true only if metadata update accompanied by moving part from temp location to cache dir.
|
||||
func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly, finalizeWB bool) error {
|
||||
cachedPath := getCacheSHADir(c.dir, bucket, object)
|
||||
cLock := c.NewNSLockFn(cachedPath)
|
||||
lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
|
||||
@ -587,7 +599,18 @@ func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, met
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
defer cLock.Unlock(lkctx.Cancel)
|
||||
return c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly)
|
||||
if err = c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly); err != nil {
|
||||
return err
|
||||
}
|
||||
// move part saved in writeback directory and cache.json atomically
|
||||
if finalizeWB {
|
||||
wbdir := getCacheWriteBackSHADir(c.dir, bucket, object)
|
||||
if err = renameAll(pathJoin(wbdir, cacheDataFile), pathJoin(cachedPath, cacheDataFile)); err != nil {
|
||||
return err
|
||||
}
|
||||
removeAll(wbdir) // cleanup writeback/shadir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// saves object metadata to disk cache
|
||||
@ -702,6 +725,11 @@ func getCacheSHADir(dir, bucket, object string) string {
|
||||
return pathJoin(dir, getSHA256Hash([]byte(pathJoin(bucket, object))))
|
||||
}
|
||||
|
||||
// returns temporary writeback cache location.
|
||||
func getCacheWriteBackSHADir(dir, bucket, object string) string {
|
||||
return pathJoin(dir, minioMetaBucket, "writeback", getSHA256Hash([]byte(pathJoin(bucket, object))))
|
||||
}
|
||||
|
||||
// Cache data to disk with bitrot checksum added for each block of 1MB
|
||||
func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, string, error) {
|
||||
if err := os.MkdirAll(cachePath, 0777); err != nil {
|
||||
@ -846,6 +874,11 @@ func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Read
|
||||
if !c.diskSpaceAvailable(size) {
|
||||
return oi, errDiskFull
|
||||
}
|
||||
|
||||
if writeback {
|
||||
cachePath = getCacheWriteBackSHADir(c.dir, bucket, object)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(cachePath, 0777); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
@ -1131,6 +1164,9 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
|
||||
}()
|
||||
} else {
|
||||
go func() {
|
||||
if writebackInProgress(objInfo.UserDefined) {
|
||||
cacheObjPath = getCacheWriteBackSHADir(c.dir, bucket, object)
|
||||
}
|
||||
filePath := pathJoin(cacheObjPath, cacheFile)
|
||||
err := c.bitrotReadFromCache(ctx, filePath, startOffset, length, pw)
|
||||
if err != nil {
|
||||
@ -1199,7 +1235,7 @@ func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
|
||||
return nil
|
||||
}
|
||||
|
||||
objInfo := meta.ToObjectInfo("", "")
|
||||
objInfo := meta.ToObjectInfo()
|
||||
status, ok := objInfo.UserDefined[writeBackStatusHeader]
|
||||
if !ok || status == CommitComplete.String() {
|
||||
return nil
|
||||
@ -1497,6 +1533,8 @@ func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object,
|
||||
}
|
||||
uploadMeta.Stat.Size = objectSize
|
||||
uploadMeta.Stat.ModTime = roi.ModTime
|
||||
uploadMeta.Bucket = bucket
|
||||
uploadMeta.Object = object
|
||||
// if encrypted - make sure ETag updated
|
||||
|
||||
uploadMeta.Meta["etag"] = roi.ETag
|
||||
@ -1532,7 +1570,7 @@ func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object,
|
||||
}
|
||||
renameAll(pathJoin(uploadIDDir, cacheMetaJSONFile), pathJoin(cachePath, cacheMetaJSONFile))
|
||||
removeAll(uploadIDDir) // clean up any unused parts in the uploadIDDir
|
||||
return uploadMeta.ToObjectInfo(bucket, object), nil
|
||||
return uploadMeta.ToObjectInfo(), nil
|
||||
}
|
||||
|
||||
func (c *diskCache) AbortUpload(bucket, object, uploadID string) (err error) {
|
||||
@ -1579,9 +1617,6 @@ func getMultipartCacheSHADir(dir, bucket, object string) string {
|
||||
|
||||
// clean up stale cache multipart uploads according to cleanup interval.
|
||||
func (c *diskCache) cleanupStaleUploads(ctx context.Context) {
|
||||
if !c.commitWritethrough {
|
||||
return
|
||||
}
|
||||
timer := time.NewTimer(cacheStaleUploadCleanupInterval)
|
||||
defer timer.Stop()
|
||||
for {
|
||||
@ -1605,6 +1640,22 @@ func (c *diskCache) cleanupStaleUploads(ctx context.Context) {
|
||||
return nil
|
||||
})
|
||||
})
|
||||
// clean up of writeback folder where cache.json no longer exists in the main c.dir/<sha256(bucket,object> path
|
||||
// and if past upload expiry window.
|
||||
readDirFn(pathJoin(c.dir, minioMetaBucket, cacheWritebackDir), func(shaDir string, typ os.FileMode) error {
|
||||
wbdir := pathJoin(c.dir, minioMetaBucket, cacheWritebackDir, shaDir)
|
||||
cachedir := pathJoin(c.dir, shaDir)
|
||||
if _, err := os.Stat(cachedir); os.IsNotExist(err) {
|
||||
fi, err := os.Stat(wbdir)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if now.Sub(fi.ModTime()) > cacheWBStaleUploadExpiry {
|
||||
return removeAll(wbdir)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -585,8 +585,8 @@ func cacheMultiWriter(w1 io.Writer, w2 *io.PipeWriter) io.Writer {
|
||||
return &multiWriter{backendWriter: w1, cacheWriter: w2}
|
||||
}
|
||||
|
||||
// skipETagVerification returns true if writeback commit is not complete
|
||||
func skipETagVerification(m map[string]string) bool {
|
||||
// writebackInProgress returns true if writeback commit is not complete
|
||||
func writebackInProgress(m map[string]string) bool {
|
||||
if v, ok := m[writeBackStatusHeader]; ok {
|
||||
switch cacheCommitStatus(v) {
|
||||
case CommitPending, CommitFailed:
|
||||
|
@ -132,7 +132,7 @@ type cacheObjects struct {
|
||||
|
||||
func (c *cacheObjects) incHitsToMeta(ctx context.Context, dcache *diskCache, bucket, object string, size int64, eTag string, rs *HTTPRangeSpec) error {
|
||||
metadata := map[string]string{"etag": eTag}
|
||||
return dcache.SaveMetadata(ctx, bucket, object, metadata, size, rs, "", true)
|
||||
return dcache.SaveMetadata(ctx, bucket, object, metadata, size, rs, "", true, false)
|
||||
}
|
||||
|
||||
// Backend metadata could have changed through server side copy - reset cache metadata if that is the case
|
||||
@ -159,7 +159,7 @@ func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *disk
|
||||
bkObjectInfo.ETag != cacheObjInfo.ETag ||
|
||||
bkObjectInfo.ContentType != cacheObjInfo.ContentType ||
|
||||
!bkObjectInfo.Expires.Equal(cacheObjInfo.Expires) {
|
||||
return dcache.SaveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "", false)
|
||||
return dcache.SaveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "", false, false)
|
||||
}
|
||||
return c.incHitsToMeta(ctx, dcache, bucket, object, cacheObjInfo.Size, cacheObjInfo.ETag, rs)
|
||||
}
|
||||
@ -277,7 +277,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
return bReader, err
|
||||
}
|
||||
// serve cached content without ETag verification if writeback commit is not yet complete
|
||||
if skipETagVerification(cacheReader.ObjInfo.UserDefined) {
|
||||
if writebackInProgress(cacheReader.ObjInfo.UserDefined) {
|
||||
return cacheReader, nil
|
||||
}
|
||||
}
|
||||
@ -427,7 +427,7 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
|
||||
return cachedObjInfo, nil
|
||||
}
|
||||
// serve cache metadata without ETag verification if writeback commit is not yet complete
|
||||
if skipETagVerification(cachedObjInfo.UserDefined) {
|
||||
if writebackInProgress(cachedObjInfo.UserDefined) {
|
||||
return cachedObjInfo, nil
|
||||
}
|
||||
}
|
||||
@ -794,6 +794,7 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
|
||||
opts.UserDefined[xhttp.ContentMD5] = oi.UserDefined["content-md5"]
|
||||
objInfo, err := c.InnerPutObjectFn(ctx, oi.Bucket, oi.Name, NewPutObjReader(hashReader), opts)
|
||||
wbCommitStatus := CommitComplete
|
||||
size := objInfo.Size
|
||||
if err != nil {
|
||||
wbCommitStatus = CommitFailed
|
||||
}
|
||||
@ -804,12 +805,13 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
|
||||
retryCnt, _ = strconv.Atoi(meta[writeBackRetryHeader])
|
||||
retryCnt++
|
||||
meta[writeBackRetryHeader] = strconv.Itoa(retryCnt)
|
||||
size = cReader.ObjInfo.Size
|
||||
} else {
|
||||
delete(meta, writeBackRetryHeader)
|
||||
}
|
||||
meta[writeBackStatusHeader] = wbCommitStatus.String()
|
||||
meta["etag"] = oi.ETag
|
||||
dcache.SaveMetadata(ctx, oi.Bucket, oi.Name, meta, objInfo.Size, nil, "", false)
|
||||
dcache.SaveMetadata(ctx, oi.Bucket, oi.Name, meta, size, nil, "", false, wbCommitStatus == CommitComplete)
|
||||
if retryCnt > 0 {
|
||||
// slow down retries
|
||||
time.Sleep(time.Second * time.Duration(retryCnt%10+1))
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
// Tests ToObjectInfo function.
|
||||
func TestCacheMetadataObjInfo(t *testing.T) {
|
||||
m := cacheMeta{Meta: nil}
|
||||
objInfo := m.ToObjectInfo("testbucket", "testobject")
|
||||
objInfo := m.ToObjectInfo()
|
||||
if objInfo.Size != 0 {
|
||||
t.Fatal("Unexpected object info value for Size", objInfo.Size)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user