Cache only the range requested for range GETs (#8599)

This commit is contained in:
poornas 2019-12-08 13:58:04 -08:00 committed by Harshavardhana
parent 8390bc26db
commit 3c30e4503d
5 changed files with 261 additions and 68 deletions

View File

@ -21,14 +21,12 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"reflect"
"sync"
"time"
@ -68,6 +66,20 @@ type cacheMeta struct {
Checksum CacheChecksumInfoV1 `json:"checksum,omitempty"`
// Metadata map for current object.
Meta map[string]string `json:"meta,omitempty"`
// Ranges maps cached range to associated filename.
Ranges map[string]string `json:"ranges,omitempty"`
}
// RangeInfo has the range, file and range length information for a cached range.
type RangeInfo struct {
Range string
File string
Size int64
}
// Empty returns true if this is an empty struct
func (r *RangeInfo) Empty() bool {
return r.Range == "" && r.File == "" && r.Size == 0
}
func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
@ -190,6 +202,22 @@ func (c *diskCache) diskAvailable(size int64) bool {
// Purge cache entries that were not accessed.
func (c *diskCache) purge() {
// this function returns FileInfo for cached range files and cache data file.
fiStatFn := func(ranges map[string]string, dataFile, pathPrefix string) map[string]os.FileInfo {
fm := make(map[string]os.FileInfo)
fname := pathJoin(pathPrefix, cacheDataFile)
if fi, err := os.Stat(fname); err == nil {
fm[fname] = fi
}
for _, rngFile := range ranges {
fname = pathJoin(pathPrefix, rngFile)
if fi, err := os.Stat(fname); err == nil {
fm[fname] = fi
}
}
return fm
}
ctx := context.Background()
for {
olderThan := c.expiry
@ -211,24 +239,21 @@ func (c *diskCache) purge() {
if obj.Name() == minioMetaBucket {
continue
}
// stat entry to get atime
var fi os.FileInfo
fi, err := os.Stat(pathJoin(c.dir, obj.Name(), cacheDataFile))
if err != nil {
continue
}
objInfo, err := c.statCache(pathJoin(c.dir, obj.Name()))
meta, _, err := c.statCachedMeta(pathJoin(c.dir, obj.Name()))
if err != nil {
// delete any partially filled cache entry left behind.
removeAll(pathJoin(c.dir, obj.Name()))
continue
}
// stat all cached file ranges and cacheDataFile.
fis := fiStatFn(meta.Ranges, cacheDataFile, pathJoin(c.dir, obj.Name()))
objInfo := meta.ToObjectInfo("", "")
cc := cacheControlOpts(objInfo)
for fname, fi := range fis {
if atime.Get(fi).Before(expiry) ||
cc.isStale(objInfo.ModTime) {
if err = removeAll(pathJoin(c.dir, obj.Name())); err != nil {
if err = removeAll(fname); err != nil {
logger.LogIf(ctx, err)
}
deletedCount++
@ -238,6 +263,7 @@ func (c *diskCache) purge() {
}
}
}
}
if deletedCount == 0 {
break
}
@ -283,49 +309,118 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
return
}
// statCachedMeta returns metadata from cache - including ranges cached, partial to indicate
// if partial object is cached.
func (c *diskCache) statCachedMeta(cacheObjPath string) (meta *cacheMeta, partial bool, err error) {
// Stat the file to get file size.
metaPath := pathJoin(cacheObjPath, cacheMetaJSONFile)
f, err := os.Open(metaPath)
if err != nil {
return meta, partial, err
}
defer f.Close()
meta = &cacheMeta{Version: cacheMetaVersion}
if err := jsonLoad(f, meta); err != nil {
return meta, partial, err
}
// get metadata of part.1 if full file has been cached.
partial = true
fi, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile))
if err == nil {
meta.Stat.ModTime = atime.Get(fi)
partial = false
}
return meta, partial, nil
}
// statRange returns ObjectInfo and RangeInfo from disk cache
func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, rngInfo RangeInfo, err error) {
// Stat the file to get file size.
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
if rs == nil {
oi, err = c.statCache(cacheObjPath)
return oi, rngInfo, err
}
meta, _, err := c.statCachedMeta(cacheObjPath)
if err != nil {
return oi, rngInfo, err
}
actualSize := uint64(meta.Stat.Size)
_, length, err := rs.GetOffsetLength(int64(actualSize))
if err != nil {
return oi, rngInfo, err
}
actualRngSize := uint64(length)
if globalCacheKMS != nil {
actualRngSize, _ = sio.EncryptedSize(uint64(length))
}
rng := rs.String(int64(actualSize))
rngFile, ok := meta.Ranges[rng]
if !ok {
return oi, rngInfo, ObjectNotFound{Bucket: bucket, Object: object}
}
rngInfo = RangeInfo{Range: rng, File: rngFile, Size: int64(actualRngSize)}
oi = meta.ToObjectInfo("", "")
oi.Bucket = bucket
oi.Name = object
if err = decryptCacheObjectETag(&oi); err != nil {
return oi, rngInfo, err
}
return
}
// statCache is a convenience function for purge() to get ObjectInfo for cached object
func (c *diskCache) statCache(cacheObjPath string) (oi ObjectInfo, e error) {
// Stat the file to get file size.
metaPath := path.Join(cacheObjPath, cacheMetaJSONFile)
f, err := os.Open(metaPath)
meta, partial, err := c.statCachedMeta(cacheObjPath)
if err != nil {
return oi, err
}
defer f.Close()
meta := &cacheMeta{Version: cacheMetaVersion}
if err := jsonLoad(f, meta); err != nil {
return oi, err
if partial {
return oi, errFileNotFound
}
fi, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile))
if err != nil {
return oi, err
}
meta.Stat.ModTime = atime.Get(fi)
return meta.ToObjectInfo("", ""), nil
}
// saves object metadata to disk cache
func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64) error {
func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string) error {
fileName := getCacheSHADir(c.dir, bucket, object)
metaPath := pathJoin(fileName, cacheMetaJSONFile)
f, err := os.Create(metaPath)
f, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return err
}
defer f.Close()
m := cacheMeta{Meta: meta, Version: cacheMetaVersion}
m.Stat.Size = actualSize
m.Stat.ModTime = UTCNow()
m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
jsonData, err := json.Marshal(m)
if err != nil {
m := &cacheMeta{Version: cacheMetaVersion}
if err := jsonLoad(f, m); err != nil && err != io.EOF {
return err
}
_, err = f.Write(jsonData)
return err
if rs != nil {
if m.Ranges == nil {
m.Ranges = make(map[string]string)
}
m.Ranges[rs.String(actualSize)] = rsFileName
} else {
// this is necessary cleanup of range files if entire object is cached.
for _, f := range m.Ranges {
removeAll(pathJoin(fileName, f))
}
m.Ranges = nil
}
m.Stat.Size = actualSize
m.Stat.ModTime = UTCNow()
m.Meta = meta
m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
return jsonSave(f, m)
}
// Backend metadata could have changed through server side copy - reset cache metadata if that is the case
@ -351,21 +446,21 @@ func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object
bkObjectInfo.ETag != cacheObjInfo.ETag ||
bkObjectInfo.ContentType != cacheObjInfo.ContentType ||
bkObjectInfo.Expires != cacheObjInfo.Expires {
return c.saveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size)
return c.saveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "")
}
return nil
}
func getCacheSHADir(dir, bucket, object string) string {
return path.Join(dir, getSHA256Hash([]byte(path.Join(bucket, object))))
return pathJoin(dir, getSHA256Hash([]byte(pathJoin(bucket, object))))
}
// Cache data to disk with bitrot checksum added for each block of 1MB
func (c *diskCache) bitrotWriteToCache(cachePath string, reader io.Reader, size uint64) (int64, error) {
func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, error) {
if err := os.MkdirAll(cachePath, 0777); err != nil {
return 0, err
}
filePath := path.Join(cachePath, cacheDataFile)
filePath := pathJoin(cachePath, fileName)
if filePath == "" || reader == nil {
return 0, errInvalidArgument
@ -434,7 +529,7 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
if globalCacheKMS == nil {
return nil, errKMSNotConfigured
}
key, encKey, err := globalCacheKMS.GenerateKey(globalCacheKMS.KeyID(), crypto.Context{bucket: path.Join(bucket, object)})
key, encKey, err := globalCacheKMS.GenerateKey(globalCacheKMS.KeyID(), crypto.Context{bucket: pathJoin(bucket, object)})
if err != nil {
return nil, err
}
@ -451,7 +546,7 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
}
// Caches the object to disk
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error {
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error {
if c.diskUsageHigh() {
select {
case c.purgeChan <- struct{}{}:
@ -459,6 +554,9 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
}
return errDiskFull
}
if rs != nil {
return c.putRange(ctx, bucket, object, data, size, rs, opts)
}
if !c.diskAvailable(size) {
return errDiskFull
}
@ -480,7 +578,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
}
actualSize, _ = sio.EncryptedSize(uint64(size))
}
n, err := c.bitrotWriteToCache(cachePath, reader, actualSize)
n, err := c.bitrotWriteToCache(cachePath, cacheDataFile, reader, actualSize)
if IsErr(err, baseErrs...) {
c.setOnline(false)
}
@ -492,7 +590,53 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
removeAll(cachePath)
return IncompleteBody{}
}
return c.saveMetadata(ctx, bucket, object, metadata, n)
return c.saveMetadata(ctx, bucket, object, metadata, n, nil, "")
}
// Caches the range to disk
func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error {
rlen, err := rs.GetLength(size)
if err != nil {
return err
}
if !c.diskAvailable(rlen) {
return errDiskFull
}
cachePath := getCacheSHADir(c.dir, bucket, object)
if err := os.MkdirAll(cachePath, 0777); err != nil {
return err
}
var metadata = make(map[string]string)
for k, v := range opts.UserDefined {
metadata[k] = v
}
var reader = data
var actualSize = uint64(rlen)
// objSize is the actual size of object (with encryption overhead if any)
var objSize = uint64(size)
if globalCacheKMS != nil {
reader, err = newCacheEncryptReader(data, bucket, object, metadata)
if err != nil {
return err
}
actualSize, _ = sio.EncryptedSize(uint64(rlen))
objSize, _ = sio.EncryptedSize(uint64(size))
}
cacheFile := MustGetUUID()
n, err := c.bitrotWriteToCache(cachePath, cacheFile, reader, actualSize)
if IsErr(err, baseErrs...) {
c.setOnline(false)
}
if err != nil {
removeAll(cachePath)
return err
}
if actualSize != uint64(n) {
removeAll(cachePath)
return IncompleteBody{}
}
return c.saveMetadata(ctx, bucket, object, metadata, int64(objSize), rs, cacheFile)
}
// checks streaming bitrot checksum of cached object before returning data
@ -595,11 +739,20 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
var objInfo ObjectInfo
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
var rngInfo RangeInfo
if objInfo, err = c.Stat(ctx, bucket, object); err != nil {
if objInfo, rngInfo, err = c.statRange(ctx, bucket, object, rs); err != nil {
return nil, toObjectErr(err, bucket, object)
}
cacheFile := cacheDataFile
objSize := objInfo.Size
if !rngInfo.Empty() {
// for cached ranges, need to pass actual range file size to GetObjectReader
// and clear out range spec
cacheFile = rngInfo.File
objInfo.Size = rngInfo.Size
rs = nil
}
var nsUnlocker = func() {}
// For a directory, we need to send an reader that returns no bytes.
if HasSuffix(object, SlashSeparator) {
@ -612,8 +765,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
if nErr != nil {
return nil, nErr
}
filePath := path.Join(cacheObjPath, cacheDataFile)
filePath := pathJoin(cacheObjPath, cacheFile)
pr, pw := io.Pipe()
go func() {
err := c.bitrotReadFromCache(ctx, filePath, off, length, pw)
@ -625,7 +777,20 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
// Cleanup function to cause the go routine above to exit, in
// case of incomplete read.
pipeCloser := func() { pr.Close() }
return fn(pr, h, opts.CheckCopyPrecondFn, pipeCloser)
gr, gerr := fn(pr, h, opts.CheckCopyPrecondFn, pipeCloser)
if gerr != nil {
return gr, gerr
}
if globalCacheKMS != nil {
// clean up internal SSE cache metadata
delete(gr.ObjInfo.UserDefined, crypto.SSEHeader)
}
if !rngInfo.Empty() {
// overlay Size with actual object size and not the range size
gr.ObjInfo.Size = objSize
}
return gr, nil
}

View File

@ -96,13 +96,13 @@ func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, ob
return dcache.Delete(ctx, bucket, object)
}
func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error {
func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error {
cLock := c.NewNSLockFn(ctx, bucket, object)
if err := cLock.GetLock(globalObjectTimeout); err != nil {
return err
}
defer cLock.Unlock()
return dcache.Put(ctx, bucket, object, data, size, opts)
return dcache.Put(ctx, bucket, object, data, size, rs, opts)
}
func (c *cacheObjects) get(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
@ -125,6 +125,17 @@ func (c *cacheObjects) stat(ctx context.Context, dcache *diskCache, bucket, obje
return dcache.Stat(ctx, bucket, object)
}
func (c *cacheObjects) statRange(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, err error) {
cLock := c.NewNSLockFn(ctx, bucket, object)
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
return oi, err
}
defer cLock.RUnlock()
oi, _, err = dcache.statRange(ctx, bucket, object, rs)
return oi, err
}
// DeleteObject clears cache entry if backend delete operation succeeds
func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) {
if err = c.DeleteObjectFn(ctx, bucket, object); err != nil {
@ -200,7 +211,14 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
if (!cc.isEmpty() && !cc.isStale(cacheReader.ObjInfo.ModTime)) ||
cc.onlyIfCached {
// This is a cache hit, mark it so
c.incCacheStats(cacheObjSize)
bytesServed := cacheReader.ObjInfo.Size
if rs != nil {
if _, len, err := rs.GetOffsetLength(bytesServed); err == nil {
bytesServed = len
}
}
c.cacheStats.incHit()
c.cacheStats.incBytesServed(bytesServed)
return cacheReader, nil
}
if cc.noStore {
@ -261,20 +279,19 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
if rs != nil {
go func() {
// fill cache in the background for range GET requests
bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, nil, h, lockType, opts)
bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
if bErr != nil {
return
}
defer bReader.Close()
oi, err := c.stat(ctx, dcache, bucket, object)
oi, err := c.statRange(ctx, dcache, bucket, object, rs)
// avoid cache overwrite if another background routine filled cache
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
}
}()
return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
}
bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
if bkErr != nil {
return nil, bkErr
@ -283,7 +300,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
pipeReader, pipeWriter := io.Pipe()
teeReader := io.TeeReader(bkReader, pipeWriter)
go func() {
putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)})
putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)})
// close the write end of the pipe, so the error gets
// propagated to getObjReader
pipeWriter.CloseWithError(putErr)
@ -597,7 +614,7 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
oi, err := c.stat(ctx, dcache, bucket, object)
// avoid cache overwrite if another background routine filled cache
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
}
}()
}

View File

@ -212,7 +212,7 @@ func TestDiskCache(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta})
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta})
if err != nil {
t.Fatal(err)
}
@ -290,12 +290,12 @@ func TestDiskCacheMaxUse(t *testing.T) {
t.Fatal(err)
}
if !cache.diskAvailable(int64(size)) {
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta})
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta})
if err != errDiskFull {
t.Fatal("Cache max-use limit violated.")
}
} else {
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta})
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta})
if err != nil {
t.Fatal(err)
}

View File

@ -383,8 +383,7 @@ func migrateCacheData(ctx context.Context, c *diskCache, bucket, object, oldfile
}
actualSize, _ = sio.EncryptedSize(uint64(st.Size()))
}
_, err = c.bitrotWriteToCache(destDir, reader, uint64(actualSize))
_, err = c.bitrotWriteToCache(destDir, cacheDataFile, reader, uint64(actualSize))
return err
}

View File

@ -161,3 +161,15 @@ func parseRequestRangeSpec(rangeString string) (hrange *HTTPRangeSpec, err error
return nil, fmt.Errorf("'%s' does not have valid range value", rangeString)
}
}
// String returns stringified representation of range for a particular resource size.
func (h *HTTPRangeSpec) String(resourceSize int64) string {
if h == nil {
return ""
}
off, length, err := h.GetOffsetLength(resourceSize)
if err != nil {
return ""
}
return fmt.Sprintf("%d-%d", off, off+length-1)
}