Default multipart caching to writethrough (#13613)

This applies when `MINIO_CACHE_COMMIT` is set.

- `writeback` caching applies only to single-part uploads. When the cache
commit mode is `writeback`, multipart uploads therefore default to
synchronous (writethrough) caching.

- Add writethrough caching for single uploads
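
For context, a minimal, self-contained Go sketch of the writethrough idea this PR applies to single uploads: the incoming stream is teed through pipes to the backend and the disk cache concurrently, and the cached copy is dropped if the stream fails. This is not MinIO code; `backendPut`, `cachePut`, and `putWritethrough` are hypothetical stand-ins for the real object-layer and disk-cache calls.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// backendPut stands in for the remote backend PUT; it just drains the stream
// and fabricates an ETag.
func backendPut(r io.Reader) (string, error) {
	n, err := io.Copy(io.Discard, r)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("etag-%d", n), nil
}

// cachePut stands in for the disk cache write.
func cachePut(r io.Reader, cache *bytes.Buffer) error {
	_, err := io.Copy(cache, r)
	return err
}

// putWritethrough tees one upload stream to the backend and the cache
// concurrently and returns the backend result.
func putWritethrough(data io.Reader, cache *bytes.Buffer) (string, error) {
	backendR, backendW := io.Pipe() // stream to backend
	cacheR, cacheW := io.Pipe()     // stream to cache

	etagCh := make(chan string, 1)
	go func() {
		etag, err := backendPut(backendR)
		if err != nil {
			// Fail the writer side so the copy below stops early.
			backendR.CloseWithError(err)
			return
		}
		etagCh <- etag
	}()
	go func() {
		if err := cachePut(cacheR, cache); err != nil {
			cacheR.CloseWithError(err)
		}
	}()

	// Tee the incoming stream into both pipes.
	_, err := io.Copy(io.MultiWriter(backendW, cacheW), data)
	backendW.Close()
	cacheW.Close()
	if err != nil {
		cache.Reset() // the backend or cache write failed: drop the cached copy
		return "", err
	}
	return <-etagCh, nil
}

func main() {
	var cache bytes.Buffer
	etag, err := putWritethrough(bytes.NewReader([]byte("hello world")), &cache)
	fmt.Println(etag, err, cache.Len())
}
```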
Poorna K 2021-11-10 11:12:03 -05:00 committed by GitHub
parent 0a6f9bc1eb
commit 03725dc015
3 changed files with 152 additions and 45 deletions


@@ -654,6 +654,50 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
 	return jsonSave(f, m)
 }
 
+// updates the ETag and ModTime on cache with ETag from backend
+func (c *diskCache) updateMetadata(ctx context.Context, bucket, object, etag string, modTime time.Time, size int64) error {
+	cachedPath := getCacheSHADir(c.dir, bucket, object)
+	metaPath := pathJoin(cachedPath, cacheMetaJSONFile)
+	// Create cache directory if needed
+	if err := os.MkdirAll(cachedPath, 0777); err != nil {
+		return err
+	}
+	f, err := os.OpenFile(metaPath, os.O_RDWR, 0666)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	m := &cacheMeta{
+		Version: cacheMetaVersion,
+		Bucket:  bucket,
+		Object:  object,
+	}
+	if err := jsonLoad(f, m); err != nil && err != io.EOF {
+		return err
+	}
+	if m.Meta == nil {
+		m.Meta = make(map[string]string)
+	}
+	var key []byte
+	var objectEncryptionKey crypto.ObjectKey
+	if globalCacheKMS != nil {
+		// Calculating object encryption key
+		key, err = decryptObjectInfo(key, bucket, object, m.Meta)
+		if err != nil {
+			return err
+		}
+		copy(objectEncryptionKey[:], key)
+		m.Meta["etag"] = hex.EncodeToString(objectEncryptionKey.SealETag([]byte(etag)))
+	} else {
+		m.Meta["etag"] = etag
+	}
+	m.Meta["last-modified"] = modTime.UTC().Format(http.TimeFormat)
+	m.Meta["Content-Length"] = strconv.Itoa(int(size))
+	return jsonSave(f, m)
+}
+
 func getCacheSHADir(dir, bucket, object string) string {
 	return pathJoin(dir, getSHA256Hash([]byte(pathJoin(bucket, object))))
 }
@@ -755,22 +799,36 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
 	metadata[SSECacheEncrypted] = ""
 	return objectKey[:], nil
 }
 
+func (c *diskCache) GetLockContext(ctx context.Context, bucket, object string) (RWLocker, LockContext, error) {
+	cachePath := getCacheSHADir(c.dir, bucket, object)
+	cLock := c.NewNSLockFn(cachePath)
+	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
+	return cLock, lkctx, err
+}
+
 // Caches the object to disk
-func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) (oi ObjectInfo, err error) {
+func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
 	if !c.diskSpaceAvailable(size) {
 		io.Copy(ioutil.Discard, data)
 		return oi, errDiskFull
 	}
-	cachePath := getCacheSHADir(c.dir, bucket, object)
-	cLock := c.NewNSLockFn(cachePath)
-	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
+	cLock, lkctx, err := c.GetLockContext(ctx, bucket, object)
 	if err != nil {
 		return oi, err
 	}
 	ctx = lkctx.Context()
 	defer cLock.Unlock(lkctx.Cancel)
+	return c.put(ctx, bucket, object, data, size, rs, opts, incHitsOnly, writeback)
+}
+
+// Caches the object to disk
+func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
+	if !c.diskSpaceAvailable(size) {
+		io.Copy(ioutil.Discard, data)
+		return oi, errDiskFull
+	}
+	cachePath := getCacheSHADir(c.dir, bucket, object)
 	meta, _, numHits, err := c.statCache(ctx, cachePath)
 	// Case where object not yet cached
 	if osIsNotExist(err) && c.after >= 1 {
@@ -819,7 +877,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
 		removeAll(cachePath)
 		return oi, IncompleteBody{Bucket: bucket, Object: object}
 	}
-	if c.commitWriteback {
+	if writeback {
 		metadata["content-md5"] = md5sum
 		if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
 			metadata["etag"] = hex.EncodeToString(md5bytes)
@@ -1073,7 +1131,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 				// the remaining parts.
 				partOffset = 0
 			} // End of read all parts loop.
-			pr.CloseWithError(err)
+			pw.CloseWithError(err)
 		}()
 	} else {
 		go func() {
@@ -1105,8 +1163,15 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 	return gr, numHits, nil
 }
 
+// deletes the cached object - caller should have taken write lock
+func (c *diskCache) delete(bucket, object string) (err error) {
+	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
+	return removeAll(cacheObjPath)
+}
+
 // Deletes the cached object
-func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
+func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
+	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
 	cLock := c.NewNSLockFn(cacheObjPath)
 	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
 	if err != nil {
@@ -1116,12 +1181,6 @@ func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error)
 	return removeAll(cacheObjPath)
 }
 
-// Deletes the cached object
-func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
-	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
-	return c.delete(ctx, cacheObjPath)
-}
-
 // convenience function to check if object is cached on this diskCache
 func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
 	if _, err := os.Stat(getCacheSHADir(c.dir, bucket, object)); err != nil {
@@ -1199,14 +1258,11 @@ func (c *diskCache) NewMultipartUpload(ctx context.Context, bucket, object, uID
 	}
 	m.Meta = opts.UserDefined
 
-	m.Meta[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
 	m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
-	if c.commitWriteback {
-		m.Meta[writeBackStatusHeader] = CommitPending.String()
-	}
 	m.Stat.ModTime = UTCNow()
 	if globalCacheKMS != nil {
+		m.Meta[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
 		if _, err := newCacheEncryptMetadata(bucket, object, m.Meta); err != nil {
 			return uploadID, err
 		}


@@ -368,7 +368,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 			// use a new context to avoid locker prematurely timing out operation when the GetObjectNInfo returns.
 			dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{
 				UserDefined: getMetadata(bReader.ObjInfo),
-			}, false)
+			}, false, false)
 			return
 		}
 	}()
@@ -386,7 +386,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 			io.LimitReader(pr, bkReader.ObjInfo.Size),
 			bkReader.ObjInfo.Size, rs, ObjectOptions{
 				UserDefined: userDefined,
-			}, false)
+			}, false, false)
 		// close the read end of the pipe, so the error gets
 		// propagated to teeReader
 		pr.CloseWithError(putErr)
@@ -678,31 +678,82 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
 		return putObjectFn(ctx, bucket, object, r, opts)
 	}
 	if c.commitWriteback {
-		oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false)
+		oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false, true)
 		if err != nil {
 			return ObjectInfo{}, err
 		}
 		go c.uploadObject(GlobalContext, oi)
 		return oi, nil
 	}
-	objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
+	if !c.commitWritethrough {
+		objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
 		if err == nil {
 			go func() {
 				// fill cache in the background
 				bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
 				if bErr != nil {
 					return
 				}
 				defer bReader.Close()
 				oi, _, err := dcache.Stat(GlobalContext, bucket, object)
 				// avoid cache overwrite if another background routine filled cache
 				if err != nil || oi.ETag != bReader.ObjInfo.ETag {
-					dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false)
+					dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false, true)
 				}
 			}()
+		}
+		return objInfo, err
 	}
-	return objInfo, err
+	cLock, lkctx, cerr := dcache.GetLockContext(GlobalContext, bucket, object)
+	if cerr != nil {
+		return putObjectFn(ctx, bucket, object, r, opts)
+	}
+	defer cLock.Unlock(lkctx.Cancel)
+	// Initialize pipe to stream data to backend
+	pipeReader, pipeWriter := io.Pipe()
+	hashReader, err := hash.NewReader(pipeReader, size, "", "", r.ActualSize())
+	if err != nil {
+		return
+	}
+	// Initialize pipe to stream data to cache
+	rPipe, wPipe := io.Pipe()
+	infoCh := make(chan ObjectInfo)
+	errorCh := make(chan error)
+	go func() {
+		info, err := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader), opts)
+		if err != nil {
+			close(infoCh)
+			pipeReader.CloseWithError(err)
+			rPipe.CloseWithError(err)
+			errorCh <- err
+			return
+		}
+		close(errorCh)
+		infoCh <- info
+	}()
+	go func() {
+		_, err := dcache.put(lkctx.Context(), bucket, object, rPipe, r.Size(), nil, opts, false, false)
+		if err != nil {
+			rPipe.CloseWithError(err)
+			return
+		}
+	}()
+	mwriter := cacheMultiWriter(pipeWriter, wPipe)
+	_, err = io.Copy(mwriter, r)
+	pipeWriter.Close()
+	wPipe.Close()
+	if err != nil {
+		err = <-errorCh
+		return ObjectInfo{}, err
+	}
+	info := <-infoCh
+	if cerr = dcache.updateMetadata(lkctx.Context(), bucket, object, info.ETag, info.ModTime, info.Size); cerr != nil {
+		dcache.delete(bucket, object)
+	}
+	return info, err
 }
 
 // upload cached object to backend in async commit mode.
@@ -922,7 +973,7 @@ func (c *cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object st
 		dcache.Delete(ctx, bucket, object)
 		return newMultipartUploadFn(ctx, bucket, object, opts)
 	}
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return newMultipartUploadFn(ctx, bucket, object, opts)
 	}
@@ -941,7 +992,7 @@ func (c *cacheObjects) PutObjectPart(ctx context.Context, bucket, object, upload
 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 	}
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
 	}
 	if c.skipCache() {
@@ -1039,7 +1090,7 @@ func (c *cacheObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
 		return copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
 	}
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return copyObjectPartFn(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, startOffset, length, srcInfo, srcOpts, dstOpts)
 	}
 	if err := dcache.uploadIDExists(dstBucket, dstObject, uploadID); err != nil {
@@ -1077,7 +1128,7 @@ func (c *cacheObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
 // finalizes the upload saved in cache multipart dir.
 func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
 	completeMultipartUploadFn := c.InnerCompleteMultipartUploadFn
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts, opts)
 	}
 	dcache, err := c.getCacheToLoc(ctx, bucket, object)
@@ -1102,7 +1153,7 @@ func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
 			oi, _, err := dcache.Stat(GlobalContext, bucket, object)
 			// avoid cache overwrite if another background routine filled cache
 			if err != nil || oi.ETag != bReader.ObjInfo.ETag {
-				dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false)
+				dcache.Put(GlobalContext, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, false, false)
 			}
 		}
 	}()
@@ -1113,7 +1164,7 @@ func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
 // AbortMultipartUpload - aborts multipart upload on backend and cache.
 func (c *cacheObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
 	abortMultipartUploadFn := c.InnerAbortMultipartUploadFn
-	if !c.commitWritethrough {
+	if !c.commitWritethrough && !c.commitWriteback {
 		return abortMultipartUploadFn(ctx, bucket, object, uploadID, opts)
 	}
 	dcache, err := c.getCacheToLoc(ctx, bucket, object)


@@ -93,13 +93,13 @@ master key to automatically encrypt all cached content.
 Note that cache KMS master key is not recommended for use in production deployments. If the MinIO server/gateway machine is ever compromised, the cache KMS master key must also be treated as compromised.
 Support for external KMS to manage cache KMS keys is on the roadmap, and would be ideal for production use cases.
 
-- `MINIO_CACHE_COMMIT` setting of `writethrough` allows caching of multipart uploads synchronously if enabled. By default, single PUT operations are already cached on write without any special setting.
+- `MINIO_CACHE_COMMIT` setting of `writethrough` allows caching of single and multipart uploads synchronously if enabled. By default, however, single PUT operations are cached asynchronously on write without any special setting.
 - Partially cached stale uploads older than 24 hours are automatically cleaned up.
 - Expiration happens automatically based on the configured interval as explained above, frequently accessed objects stay alive in cache for a significantly longer time.
 
-> NOTE: `MINIO_CACHE_COMMIT` also has a value of `writeback` which allows staging single uploads in cache before committing to remote. However, for consistency reasons, `writeback` staging uploads in the cache are not permitted for multipart uploads.
+> NOTE: `MINIO_CACHE_COMMIT` also has a value of `writeback`, which allows staging single uploads in cache before committing to remote. It is not possible to stage multipart uploads in the cache for consistency reasons; hence, multipart uploads will be cached synchronously even if `writeback` is set.
 
 ### Crash Recovery
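
Taken together with the repeated guard change above (`!c.commitWritethrough` becoming `!c.commitWritethrough && !c.commitWriteback`), the documented behavior can be summarized in a small, purely illustrative Go sketch. Neither `cachingPlan` nor `planFor` exists in the codebase; they only restate how `MINIO_CACHE_COMMIT` affects uploads after this change.

```go
// cachingPlan is a hypothetical summary type describing how uploads are
// cached for a given MINIO_CACHE_COMMIT value.
type cachingPlan struct {
	singleUpload    string // how a single PUT is cached
	multipartUpload string // how a multipart upload is cached
}

func planFor(commitMode string) cachingPlan {
	switch commitMode {
	case "writeback":
		// Single uploads are staged in cache and committed to the backend
		// asynchronously; multipart uploads fall back to synchronous caching.
		return cachingPlan{
			singleUpload:    "async (staged in cache, then uploaded)",
			multipartUpload: "sync (writethrough)",
		}
	case "writethrough":
		// Both single and multipart uploads are cached synchronously while
		// being written to the backend.
		return cachingPlan{
			singleUpload:    "sync (writethrough)",
			multipartUpload: "sync (writethrough)",
		}
	default:
		// No commit mode set: single PUTs are cached asynchronously after the
		// backend write; multipart uploads are not cached at upload time.
		return cachingPlan{
			singleUpload:    "async (background fill)",
			multipartUpload: "not cached at upload time",
		}
	}
}
```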