minio/cmd/disk-cache-backend.go
Harshavardhana 64f6020854
fix: cleanup locking, cancel context upon lock timeout (#12183)
upon errors to acquire lock context would still leak,
since the cancel would never be called. since the lock
is never acquired - proactively clear it before returning.
2021-04-29 20:55:21 -07:00

1038 lines
30 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
"crypto/md5"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/djherbis/atime"
"github.com/minio/minio/cmd/config/cache"
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/fips"
"github.com/minio/minio/pkg/kms"
"github.com/minio/sio"
)
const (
	// cacheMetaJSONFile is the name of the per-object metadata file stored
	// inside every cache entry directory.
	cacheMetaJSONFile = "cache.json"
	// cacheDataFile is the name of the file that holds a fully cached object
	// inside its cache entry directory.
	cacheDataFile = "part.1"
	// cacheMetaVersion is the version string written into newly created
	// cacheMeta values.
	cacheMetaVersion = "1.0.0"
	// cacheExpiryDays is the age beyond which purge() treats a cache.json that
	// has no cached data files next to it as stale. Despite the name this is
	// a time.Duration.
	cacheExpiryDays = 90 * time.Hour * 24 // defaults to 90 days
	// SSECacheEncrypted is the metadata key indicating that the object
	// is a cache entry encrypted with cache KMS master key in globalCacheKMS.
	SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache"
)
// CacheChecksumInfoV1 - carries checksums of individual blocks on disk.
// It is serialized as the "checksum" member of cache.json.
type CacheChecksumInfoV1 struct {
	// Algorithm is the name of the bitrot hash algorithm used for the blocks.
	Algorithm string `json:"algorithm"`
	// Blocksize is the size in bytes of each checksummed data block.
	Blocksize int64 `json:"blocksize"`
}
// cacheMeta represents the cache metadata persisted as cache.json inside a
// cache entry directory.
type cacheMeta struct {
	// Version is the metadata format version (cacheMetaVersion for new entries).
	Version string `json:"version"`
	Stat StatInfo `json:"stat"` // Stat of the current object `cache.json`.
	// checksums of blocks on disk.
	Checksum CacheChecksumInfoV1 `json:"checksum,omitempty"`
	// Metadata map for current object.
	Meta map[string]string `json:"meta,omitempty"`
	// Ranges maps cached range to associated filename.
	Ranges map[string]string `json:"ranges,omitempty"`
	// Hits is a counter on the number of times this object has been accessed so far.
	Hits int `json:"hits,omitempty"`
	// Bucket and Object name the object this entry belongs to; saveMetadata
	// fills them in when it creates the entry.
	Bucket string `json:"bucket,omitempty"`
	Object string `json:"object,omitempty"`
}
// RangeInfo describes one cached byte range of an object: the range string
// it was stored under, the file holding it and the length of that file's
// payload.
type RangeInfo struct {
	Range string
	File  string
	Size  int64
}

// Empty reports whether r is the zero RangeInfo, i.e. no range, no file and
// a size of zero.
func (r *RangeInfo) Empty() bool {
	// All fields are comparable, so a single comparison against the zero
	// value covers every field at once.
	return *r == RangeInfo{}
}
// ToObjectInfo converts the cached metadata into an ObjectInfo for the given
// bucket and object, marked as a cache hit.
//
// When m carries no metadata at all, m itself is modified: Meta is replaced
// by an empty map and Stat.ModTime is set to timeSentinel.
func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
	if len(m.Meta) == 0 {
		m.Meta = make(map[string]string)
		m.Stat.ModTime = timeSentinel
	}

	o = ObjectInfo{
		Bucket:            bucket,
		Name:              object,
		CacheStatus:       CacheHit,
		CacheLookupStatus: CacheHit,
		Size:              m.Stat.Size,
		ETag:              extractETag(m.Meta),
		ContentType:       m.Meta["content-type"],
		ContentEncoding:   m.Meta["content-encoding"],
		StorageClass:      globalMinioDefaultStorageClass,
	}
	// An explicit storage class in the metadata wins over the default.
	if sc, found := m.Meta[xhttp.AmzStorageClass]; found {
		o.StorageClass = sc
	}

	// httpTime looks up key in the metadata and parses it as an HTTP date.
	// Missing or unparsable values are reported as absent.
	httpTime := func(key string) (time.Time, bool) {
		raw, found := m.Meta[key]
		if !found {
			return time.Time{}, false
		}
		parsed, err := time.Parse(http.TimeFormat, raw)
		if err != nil {
			return time.Time{}, false
		}
		return parsed.UTC(), true
	}
	if ts, ok := httpTime("expires"); ok {
		o.Expires = ts
	}
	if ts, ok := httpTime("last-modified"); ok {
		o.ModTime = ts
	}

	// The etag has been extracted above; cleanMetadata keeps it from
	// showing up again as part of the user-defined metadata.
	o.UserDefined = cleanMetadata(m.Meta)
	return o
}
// diskCache represents a single cache drive (directory) and the state needed
// to admit, serve and evict cached objects on it.
type diskCache struct {
	// is set to 0 if drive is offline
	online uint32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	// purgeRunning is 1 while purge() is running, 0 otherwise; accessed atomically.
	purgeRunning int32
	// triggerGC is signalled by queueGC() and received by purgeWait().
	triggerGC chan struct{}
	dir string // caching directory
	stats CacheDiskStats // disk cache stats for prometheus
	quotaPct int // max usage in %
	// pool hands out reusable block buffers for the bitrot read/write paths.
	pool sync.Pool
	after int // minimum accesses before an object is cached.
	// lowWatermark and highWatermark are percentages of quotaPct at which
	// garbage collection stops and starts respectively.
	lowWatermark int
	highWatermark int
	// enableRange is copied from the cache config's Range setting.
	enableRange bool
	// commitWriteback enables writeback bookkeeping (md5/etag and commit
	// status metadata) when objects are cached.
	commitWriteback bool
	// retryWritebackCh receives objects whose writeback commit is not
	// complete, as found by scanCacheWritebackFailures().
	retryWritebackCh chan ObjectInfo
	// nsMutex namespace lock
	nsMutex *nsLockMap
	// Object functions pointing to the corresponding functions of backend implementation.
	NewNSLockFn func(cachePath string) RWLocker
}
// newDiskCache initializes the disk cache rooted at dir, creating the
// directory if it does not exist yet, and starts its background workers.
//
// ctx is handed to the background goroutines (the GC waiter and, when
// writeback is enabled, the writeback-failure scan). config supplies the
// quota, watermarks and feature flags; config.MaxUse takes precedence over
// config.Quota when it is non-zero.
//
// It returns an error only when the cache directory cannot be created.
func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCache, error) {
	quotaPct := config.MaxUse
	if quotaPct == 0 {
		quotaPct = config.Quota
	}

	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, fmt.Errorf("Unable to initialize '%s' dir, %w", dir, err)
	}

	dc := &diskCache{
		dir:              dir,
		triggerGC:        make(chan struct{}, 1),
		stats:            CacheDiskStats{Dir: dir},
		quotaPct:         quotaPct,
		after:            config.After,
		lowWatermark:     config.WatermarkLow,
		highWatermark:    config.WatermarkHigh,
		enableRange:      config.Range,
		commitWriteback:  config.CommitWriteback,
		retryWritebackCh: make(chan ObjectInfo, 10000),
		online:           1,
		pool: sync.Pool{
			New: func() interface{} {
				b := disk.AlignedBlock(int(cacheBlkSize))
				return &b
			},
		},
		nsMutex: newNSLock(false),
	}

	// The lock factory must be in place before any goroutine is started:
	// both purge() and the writeback scan take per-entry locks through
	// NewNSLockFn, and would otherwise race with this assignment and could
	// call a nil function.
	dc.NewNSLockFn = func(cachePath string) RWLocker {
		return dc.nsMutex.NewNSLock(nil, cachePath, "")
	}

	go dc.purgeWait(ctx)
	if dc.commitWriteback {
		go dc.scanCacheWritebackFailures(ctx)
	}
	dc.diskSpaceAvailable(0) // update if cache usage is already high.
	return dc, nil
}
// diskUsageLow() returns true if disk usage falls below the low watermark w.r.t configured cache quota.
// Ex. for a 100GB disk, if quota is configured as 70% and watermark_low = 80% and
// watermark_high = 90% then garbage collection starts when 63% of disk is used and
// stops when disk usage drops to 56%
//
// As a side effect the usage percentage in the cache stats is refreshed, and
// the usage state is reset to 0 when usage is low. It returns false when the
// disk information cannot be obtained or reports a total size of zero.
func (c *diskCache) diskUsageLow() bool {
	gcStopPct := c.quotaPct * c.lowWatermark / 100
	di, err := disk.GetInfo(c.dir)
	if err != nil {
		reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
		ctx := logger.SetReqInfo(GlobalContext, reqInfo)
		logger.LogIf(ctx, err)
		return false
	}
	// Same guard as diskSpaceAvailable(): without it the division below
	// produces NaN/Inf and the integer conversions become meaningless.
	if di.Total == 0 {
		logger.Info("diskCache: Received 0 total disk size")
		return false
	}
	usedPercent := float64(di.Used) * 100 / float64(di.Total)
	low := int(usedPercent) < gcStopPct
	atomic.StoreUint64(&c.stats.UsagePercent, uint64(usedPercent))
	if low {
		atomic.StoreInt32(&c.stats.UsageState, 0)
	}
	return low
}
// diskSpaceAvailable reports whether the cache drive would stay below the
// configured quota after size more bytes are added to it.
//
// Independently of size, if the current usage has reached the high watermark
// the usage state is set to 1 and a GC run is queued. The usage percentage in
// the cache stats is refreshed on every successful call. Failure to read the
// disk information, or a reported total size of zero, yields false.
func (c *diskCache) diskSpaceAvailable(size int64) bool {
	info, err := disk.GetInfo(c.dir)
	if err != nil {
		tags := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
		logger.LogIf(logger.SetReqInfo(GlobalContext, tags), err)
		return false
	}
	if info.Total == 0 {
		logger.Info("diskCache: Received 0 total disk size")
		return false
	}

	total := float64(info.Total)
	currentPct := float64(info.Used) * 100 / total

	// The GC threshold is the high watermark expressed as a share of the quota.
	gcThresholdPct := c.quotaPct * c.highWatermark / 100
	if currentPct >= float64(gcThresholdPct) {
		atomic.StoreInt32(&c.stats.UsageState, 1)
		c.queueGC()
	}
	atomic.StoreUint64(&c.stats.UsagePercent, uint64(currentPct))

	// Usage as it would be once the requested size has been written.
	projectedPct := float64(info.Used+uint64(size)) * 100 / total
	return projectedPct < float64(c.quotaPct)
}
// queueGC requests a garbage-collection run without ever blocking the
// caller: if the trigger channel cannot accept the signal right now, the
// request is simply dropped.
func (c *diskCache) queueGC() {
	select {
	case c.triggerGC <- struct{}{}:
		// signal delivered
	default:
		// channel not ready to receive; nothing to do
	}
}
// toClear computes the number of bytes that have to be removed from the
// cache drive to get back to the low watermark of the quota, as determined
// by bytesToClear. It returns 0 when the disk information cannot be read
// (the error is logged).
func (c *diskCache) toClear() uint64 {
	info, err := disk.GetInfo(c.dir)
	if err == nil {
		return bytesToClear(
			int64(info.Total),
			int64(info.Free),
			uint64(c.quotaPct),
			uint64(c.lowWatermark),
			uint64(c.highWatermark),
		)
	}

	tags := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
	logger.LogIf(logger.SetReqInfo(GlobalContext, tags), err)
	return 0
}
// purgeWait blocks until a GC run is requested through triggerGC and then
// runs purge(); it repeats this until ctx is cancelled, at which point it
// returns so the goroutine running it can exit.
func (c *diskCache) purgeWait(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			// Done() stays readable forever once ctx is cancelled; without
			// returning here the loop would spin at full speed.
			return
		case <-c.triggerGC: // wait here until someone triggers.
			c.purge(ctx)
		}
	}
}
// purge evicts cache entries to bring disk usage back under the low
// watermark. It is a no-op when another purge is already running, when usage
// is already low, or when toClear() reports nothing to free.
//
// The cache directory is walked once. For every entry:
//   - entries whose metadata cannot be read are removed as partial leftovers;
//   - entries with an incomplete writeback commit status are left alone;
//   - data files of entries with cache-control options are removed when stale
//     and otherwise skipped;
//   - all other data files are handed to the file scorer together with their
//     access time and hit count;
//   - entries whose cache.json is missing, or is older than cacheExpiryDays
//     with no data files, are removed.
//
// After the walk the scorer's selected files are deleted; removing an
// entry's cacheDataFile removes the whole entry directory.
func (c *diskCache) purge(ctx context.Context) {
	// Claim the "running" flag atomically so two callers can never both get
	// past the check; it is released again on every return path.
	if !atomic.CompareAndSwapInt32(&c.purgeRunning, 0, 1) {
		return
	}
	defer atomic.StoreInt32(&c.purgeRunning, 0)

	if c.diskUsageLow() {
		return
	}
	toFree := c.toClear()
	if toFree == 0 {
		return
	}

	// expiry for cleaning up old cache.json files that
	// need to be cleaned up.
	expiry := UTCNow().Add(-cacheExpiryDays)
	// defaulting max hits count to 100
	// ignore error we know what value we are passing.
	scorer, _ := newFileScorer(toFree, time.Now().Unix(), 100)

	// this function returns FileInfo for cached range files and cache data file.
	fiStatFn := func(ranges map[string]string, dataFile, pathPrefix string) map[string]os.FileInfo {
		fm := make(map[string]os.FileInfo)
		fname := pathJoin(pathPrefix, dataFile)
		if fi, err := os.Stat(fname); err == nil {
			fm[fname] = fi
		}

		for _, rngFile := range ranges {
			fname = pathJoin(pathPrefix, rngFile)
			if fi, err := os.Stat(fname); err == nil {
				fm[fname] = fi
			}
		}
		return fm
	}

	filterFn := func(name string, typ os.FileMode) error {
		if name == minioMetaBucket {
			// Proceed to next file.
			return nil
		}

		cacheDir := pathJoin(c.dir, name)
		meta, _, numHits, err := c.statCachedMeta(ctx, cacheDir)
		if err != nil {
			// delete any partially filled cache entry left behind.
			removeAll(cacheDir)
			// Proceed to next file.
			return nil
		}

		// stat all cached file ranges and cacheDataFile.
		cachedFiles := fiStatFn(meta.Ranges, cacheDataFile, pathJoin(c.dir, name))
		objInfo := meta.ToObjectInfo("", "")
		// prevent gc from clearing un-synced commits. This metadata is present when
		// cache writeback commit setting is enabled.
		status, ok := objInfo.UserDefined[writeBackStatusHeader]
		if ok && status != CommitComplete.String() {
			return nil
		}
		cc := cacheControlOpts(objInfo)
		for fname, fi := range cachedFiles {
			if cc != nil {
				if cc.isStale(objInfo.ModTime) {
					if err = removeAll(fname); err != nil {
						logger.LogIf(ctx, err)
					}
					scorer.adjustSaveBytes(-fi.Size())

					// break early if sufficient disk space reclaimed.
					if c.diskUsageLow() {
						// if we found disk usage is already low, we return nil filtering is complete.
						return errDoneForNow
					}
				}
				continue
			}
			scorer.addFile(fname, atime.Get(fi), fi.Size(), numHits)
		}
		// clean up stale cache.json files for objects that never got cached but access count was maintained in cache.json
		fi, err := os.Stat(pathJoin(cacheDir, cacheMetaJSONFile))
		if err != nil || (fi.ModTime().Before(expiry) && len(cachedFiles) == 0) {
			removeAll(cacheDir)
			// fi is nil when the Stat above failed; only account for the
			// reclaimed bytes when the file size is actually known.
			if fi != nil {
				scorer.adjustSaveBytes(-fi.Size())
			}
			// Proceed to next file.
			return nil
		}

		// if we found disk usage is already low, we return nil filtering is complete.
		if c.diskUsageLow() {
			return errDoneForNow
		}

		// Proceed to next file.
		return nil
	}

	if err := readDirFn(c.dir, filterFn); err != nil {
		logger.LogIf(ctx, err)
		return
	}

	scorer.purgeFunc(func(qfile queuedFile) {
		fileName := qfile.name
		removeAll(fileName)
		slashIdx := strings.LastIndex(fileName, SlashSeparator)
		if slashIdx >= 0 {
			fileNamePrefix := fileName[0:slashIdx]
			fname := fileName[slashIdx+1:]
			// Removing the full-object data file invalidates the whole
			// entry, so drop its directory as well.
			if fname == cacheDataFile {
				removeAll(fileNamePrefix)
			}
		}
	})

	scorer.reset()
}
// setOffline marks the cache drive as offline by atomically clearing the
// online flag.
func (c *diskCache) setOffline() {
	const offline uint32 = 0
	atomic.StoreUint32(&c.online, offline)
}
// IsOnline reports whether the cache drive is currently marked online,
// i.e. its atomically read online flag is non-zero.
func (c *diskCache) IsOnline() bool {
	state := atomic.LoadUint32(&c.online)
	return state != 0
}
// Stat looks up bucket/object in the disk cache and returns its ObjectInfo
// together with the entry's hit count.
//
// An entry that only has partial (range) data yields errFileNotFound. Errors
// from reading the cached metadata or from decrypting the cached ETag are
// returned as-is.
func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectInfo, numHits int, err error) {
	entryPath := getCacheSHADir(c.dir, bucket, object)

	meta, partial, numHits, err := c.statCachedMeta(ctx, entryPath)
	switch {
	case err != nil:
		return oi, numHits, err
	case partial:
		return oi, numHits, errFileNotFound
	}

	oi = meta.ToObjectInfo("", "")
	oi.Bucket, oi.Name = bucket, object
	err = decryptCacheObjectETag(&oi)
	return oi, numHits, err
}
// statCachedMeta reads the cache metadata of the entry at cacheObjPath while
// holding a read lock on that path. It returns the metadata (including the
// cached ranges), whether only partial data is cached, and the hit count.
// Failure to obtain the lock within globalOperationTimeout is returned as
// the error.
func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
	locker := c.NewNSLockFn(cacheObjPath)
	lockCtx, lockErr := locker.GetRLock(ctx, globalOperationTimeout)
	if lockErr != nil {
		return nil, false, 0, lockErr
	}
	defer locker.RUnlock(lockCtx.Cancel)

	return c.statCache(lockCtx.Context(), cacheObjPath)
}
// statRange returns the ObjectInfo of a cached object and, when only ranges
// of it are cached, the RangeInfo matching the requested range rs, plus the
// entry's hit count.
//
// If the full object is cached, rngInfo is left empty. If only ranges are
// cached and rs is not among them (or its file is missing on disk),
// ObjectNotFound is returned.
func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, rngInfo RangeInfo, numHits int, err error) {
	entryPath := getCacheSHADir(c.dir, bucket, object)

	meta, partial, numHits, err := c.statCachedMeta(ctx, entryPath)
	if err != nil {
		return oi, rngInfo, numHits, err
	}

	oi = meta.ToObjectInfo("", "")
	oi.Bucket, oi.Name = bucket, object

	if !partial {
		// Whole object is cached: no range bookkeeping needed.
		err = decryptCacheObjectETag(&oi)
		return oi, rngInfo, numHits, err
	}

	objectSize := meta.Stat.Size
	_, rangeLen, err := rs.GetOffsetLength(objectSize)
	if err != nil {
		return oi, rngInfo, numHits, err
	}

	// With a cache KMS configured, the range file holds the encrypted
	// stream, which is larger than the plain range.
	storedLen := uint64(rangeLen)
	if globalCacheKMS != nil {
		storedLen, _ = sio.EncryptedSize(uint64(rangeLen))
	}

	notFound := ObjectNotFound{Bucket: bucket, Object: object}
	rangeKey := rs.String(objectSize)
	rangeFile, known := meta.Ranges[rangeKey]
	if !known {
		return oi, rngInfo, numHits, notFound
	}
	if _, err = os.Stat(pathJoin(entryPath, rangeFile)); err != nil {
		return oi, rngInfo, numHits, notFound
	}

	rngInfo = RangeInfo{Range: rangeKey, File: rangeFile, Size: int64(storedLen)}
	err = decryptCacheObjectETag(&oi)
	return oi, rngInfo, numHits, err
}
// statCache loads cache.json from the entry directory cacheObjPath without
// taking any lock. It returns the decoded metadata, whether the entry is
// partial (true when the full-object data file is absent) and the hit count
// recorded in the metadata.
func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
	metaFile, err := os.Open(pathJoin(cacheObjPath, cacheMetaJSONFile))
	if err != nil {
		return nil, false, 0, err
	}
	defer metaFile.Close()

	meta = &cacheMeta{Version: cacheMetaVersion}
	if loadErr := jsonLoad(metaFile, meta); loadErr != nil {
		return meta, false, 0, loadErr
	}

	// The entry counts as complete only if the full-object data file can
	// be stat'ed.
	_, statErr := os.Stat(pathJoin(cacheObjPath, cacheDataFile))
	partial = statErr != nil

	return meta, partial, meta.Hits, nil
}
// SaveMetadata persists object metadata for bucket/object in the disk cache
// while holding the write lock on the entry. When incHitsOnly is true the
// update is meant to bump the hit counter only. See saveMetadata for the
// meaning of the remaining arguments.
func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
	locker := c.NewNSLockFn(getCacheSHADir(c.dir, bucket, object))
	lockCtx, err := locker.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return err
	}
	defer locker.Unlock(lockCtx.Cancel)

	return c.saveMetadata(lockCtx.Context(), bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly)
}
// saveMetadata creates or updates cache.json for bucket/object. The caller
// is expected to hold the entry lock.
//
//   - meta replaces the stored metadata, unless incHitsOnly is true, in which
//     case only the "etag" key (if present in meta) is copied over.
//   - actualSize is recorded as the object size.
//   - rs together with a non-empty rsFileName registers a cached range file.
//   - with rs == nil and incHitsOnly false, previously cached range files are
//     deleted if the full-object data file exists.
//
// Every call increments the hit counter and refreshes the checksum info.
func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
	entryDir := getCacheSHADir(c.dir, bucket, object)

	// Create cache directory if needed
	if err := os.MkdirAll(entryDir, 0777); err != nil {
		return err
	}
	metaFile, err := os.OpenFile(pathJoin(entryDir, cacheMetaJSONFile), os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	defer metaFile.Close()

	m := &cacheMeta{
		Version: cacheMetaVersion,
		Bucket:  bucket,
		Object:  object,
	}
	// A freshly created, empty file yields io.EOF; that just means there
	// is nothing to merge with yet.
	if loadErr := jsonLoad(metaFile, m); loadErr != nil && loadErr != io.EOF {
		return loadErr
	}

	switch {
	case rs != nil:
		// rsFileName gets set by putRange. Blank values come from other
		// code paths that set rs only (eg initial creation or hit increment).
		if rsFileName != "" {
			if m.Ranges == nil {
				m.Ranges = make(map[string]string)
			}
			m.Ranges[rs.String(actualSize)] = rsFileName
		}
	case !incHitsOnly:
		// Range files are redundant once the entire object is cached.
		if _, statErr := os.Stat(pathJoin(entryDir, cacheDataFile)); statErr == nil {
			for _, rangeFile := range m.Ranges {
				removeAll(pathJoin(entryDir, rangeFile))
			}
			m.Ranges = nil
		}
	}

	m.Stat.Size = actualSize

	if incHitsOnly {
		if m.Meta == nil {
			m.Meta = make(map[string]string)
		}
		if etag, found := meta["etag"]; found {
			m.Meta["etag"] = etag
		}
	} else {
		// reset meta
		m.Meta = meta
	}

	m.Hits++
	m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
	return jsonSave(metaFile, m)
}
// getCacheSHADir returns the cache entry directory for bucket/object under
// dir: the directory name is the SHA256 hash of the joined bucket/object path.
func getCacheSHADir(dir, bucket, object string) string {
	objectPath := pathJoin(bucket, object)
	entryName := getSHA256Hash([]byte(objectPath))
	return pathJoin(dir, entryName)
}
// bitrotWriteToCache streams reader into the file fileName inside cachePath,
// one pooled buffer at a time, writing each block as its bitrot hash
// followed by the block's data.
//
// size is the number of bytes the caller expects; it is only consulted to
// tell "empty read at end of stream" apart from "zero-length object" (for
// size == 0 a single empty block is still written).
//
// It returns the number of data bytes written (hash bytes are not counted),
// the base64-encoded MD5 of the data, and an error. The MD5 is only fed
// with data when commitWriteback is enabled; otherwise it is the digest of
// empty input. On error the partially written file is left in place for the
// caller to clean up.
func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, string, error) {
	if err := os.MkdirAll(cachePath, 0777); err != nil {
		return 0, "", err
	}
	filePath := pathJoin(cachePath, fileName)
	if filePath == "" || reader == nil {
		return 0, "", errInvalidArgument
	}
	if err := checkPathLength(filePath); err != nil {
		return 0, "", err
	}
	f, err := os.Create(filePath)
	if err != nil {
		return 0, "", osErrToFileErr(err)
	}
	defer f.Close()
	var bytesWritten int64
	h := HighwayHash256S.New()
	// Borrow a block buffer from the pool; it is returned on exit.
	bufp := c.pool.Get().(*[]byte)
	defer c.pool.Put(bufp)
	md5Hash := md5.New()
	var n, n2 int
	for {
		// Fill the buffer completely unless the stream ends first; both
		// EOF flavours mean "this was the last block".
		n, err = io.ReadFull(reader, *bufp)
		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
			return 0, "", err
		}
		eof := err == io.EOF || err == io.ErrUnexpectedEOF
		if n == 0 && size != 0 {
			// Reached EOF, nothing more to be done.
			break
		}
		// The hash covers exactly one block, so start from a clean state.
		h.Reset()
		if _, err = h.Write((*bufp)[:n]); err != nil {
			return 0, "", err
		}
		hashBytes := h.Sum(nil)
		// compute md5Hash of original data stream if writeback commit to cache
		if c.commitWriteback {
			if _, err = md5Hash.Write((*bufp)[:n]); err != nil {
				return 0, "", err
			}
		}
		// On-disk layout per block: hash first, then the data it protects.
		if _, err = f.Write(hashBytes); err != nil {
			return 0, "", err
		}
		if n2, err = f.Write((*bufp)[:n]); err != nil {
			return 0, "", err
		}
		bytesWritten += int64(n2)
		if eof {
			break
		}
	}
	return bytesWritten, base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)), nil
}
// newCacheEncryptReader wraps content in an encrypting reader for caching
// bucket/object. The object encryption key is generated through
// newCacheEncryptMetadata, which also records the encryption details in
// metadata. Any failure to set up the encrypting reader is reported as
// crypto.ErrInvalidCustomerKey.
func newCacheEncryptReader(content io.Reader, bucket, object string, metadata map[string]string) (r io.Reader, err error) {
	key, err := newCacheEncryptMetadata(bucket, object, metadata)
	if err != nil {
		return nil, err
	}

	cfg := sio.Config{
		Key:          key[:],
		MinVersion:   sio.Version20,
		CipherSuites: fips.CipherSuitesDARE(),
	}
	if r, err = sio.EncryptReader(content, cfg); err != nil {
		return nil, crypto.ErrInvalidCustomerKey
	}
	return r, nil
}
func newCacheEncryptMetadata(bucket, object string, metadata map[string]string) ([]byte, error) {
var sealedKey crypto.SealedKey
if globalCacheKMS == nil {
return nil, errKMSNotConfigured
}
key, err := globalCacheKMS.GenerateKey("", kms.Context{bucket: pathJoin(bucket, object)})
if err != nil {
return nil, err
}
objectKey := crypto.GenerateKey(key.Plaintext, rand.Reader)
sealedKey = objectKey.Seal(key.Plaintext, crypto.GenerateIV(rand.Reader), crypto.S3.String(), bucket, object)
crypto.S3.CreateMetadata(metadata, key.KeyID, key.Ciphertext, sealedKey)
if etag, ok := metadata["etag"]; ok {
metadata["etag"] = hex.EncodeToString(objectKey.SealETag([]byte(etag)))
}
metadata[SSECacheEncrypted] = ""
return objectKey[:], nil
}
// Put caches the object bucket/object, read from data, on this cache drive.
//
// size is the object size; rs, when non-nil, selects range caching through
// putRange; opts.UserDefined supplies the object metadata; incHitsOnly is
// passed on to the final metadata save.
//
// Depending on the entry's state only the metadata may be written (hit
// counting) without storing any data:
//   - no cache.json yet and c.after >= 1, or
//   - fewer than c.after hits so far and the cached ETag equals the ETag in
//     opts.UserDefined.
//
// It returns errDiskFull when the quota would be exceeded and IncompleteBody
// when fewer bytes than expected could be written. On write failures the
// whole cache entry directory is removed. The returned ObjectInfo is only
// populated when data was actually written.
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) (oi ObjectInfo, err error) {
	if !c.diskSpaceAvailable(size) {
		// Drain the input before reporting the error.
		io.Copy(ioutil.Discard, data)
		return oi, errDiskFull
	}
	cachePath := getCacheSHADir(c.dir, bucket, object)
	// Hold the write lock on the entry for the rest of the call.
	cLock := c.NewNSLockFn(cachePath)
	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return oi, err
	}
	ctx = lkctx.Context()
	defer cLock.Unlock(lkctx.Cancel)
	meta, _, numHits, err := c.statCache(ctx, cachePath)
	// Case where object not yet cached
	if osIsNotExist(err) && c.after >= 1 {
		return oi, c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
	}
	// Case where object already has a cache metadata entry but not yet cached
	if err == nil && numHits < c.after {
		cETag := extractETag(meta.Meta)
		bETag := extractETag(opts.UserDefined)
		if cETag == bETag {
			return oi, c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
		}
		// ETag differs from what the entry recorded: fall through and cache
		// the data, but let the final save only bump the hit counter.
		incHitsOnly = true
	}
	if rs != nil {
		return oi, c.putRange(ctx, bucket, object, data, size, rs, opts)
	}
	// NOTE(review): unlike the check at the top, this path returns without
	// draining data — confirm callers do not rely on the reader being consumed.
	if !c.diskSpaceAvailable(size) {
		return oi, errDiskFull
	}
	if err := os.MkdirAll(cachePath, 0777); err != nil {
		return oi, err
	}
	// Work on a copy so the caller's metadata map is not modified.
	var metadata = cloneMSS(opts.UserDefined)
	var reader = data
	var actualSize = uint64(size)
	if globalCacheKMS != nil {
		// Encrypt on the fly; the expected byte count becomes the size of
		// the encrypted stream.
		reader, err = newCacheEncryptReader(data, bucket, object, metadata)
		if err != nil {
			return oi, err
		}
		actualSize, _ = sio.EncryptedSize(uint64(size))
	}
	n, md5sum, err := c.bitrotWriteToCache(cachePath, cacheDataFile, reader, actualSize)
	if IsErr(err, baseErrs...) {
		// take the cache drive offline
		c.setOffline()
	}
	if err != nil {
		removeAll(cachePath)
		return oi, err
	}
	if actualSize != uint64(n) {
		removeAll(cachePath)
		return oi, IncompleteBody{Bucket: bucket, Object: object}
	}
	if c.commitWriteback {
		// Record the content MD5 (and the ETag derived from it) and mark the
		// entry as not yet committed.
		metadata["content-md5"] = md5sum
		if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
			metadata["etag"] = hex.EncodeToString(md5bytes)
		}
		metadata[writeBackStatusHeader] = CommitPending.String()
	}
	return ObjectInfo{
		Bucket: bucket,
		Name: object,
		ETag: metadata["etag"],
		Size: n,
		UserDefined: metadata,
	},
		c.saveMetadata(ctx, bucket, object, metadata, n, nil, "", incHitsOnly)
}
// putRange caches the byte range rs of bucket/object, read from data, in a
// newly named range file inside the object's cache entry and registers that
// file in the entry's metadata. size is the size of the whole object.
//
// It returns errDiskFull when the range does not fit into the quota and
// IncompleteBody when fewer bytes than expected were written. On write
// failures the whole cache entry directory is removed.
func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error {
	rangeLen, err := rs.GetLength(size)
	if err != nil {
		return err
	}
	if !c.diskSpaceAvailable(rangeLen) {
		return errDiskFull
	}

	entryDir := getCacheSHADir(c.dir, bucket, object)
	if mkErr := os.MkdirAll(entryDir, 0777); mkErr != nil {
		return mkErr
	}

	metadata := cloneMSS(opts.UserDefined)
	src := data
	// expected is the number of bytes the range file must end up holding;
	// objSize is the actual size of object (with encryption overhead if any)
	expected := uint64(rangeLen)
	objSize := uint64(size)
	if globalCacheKMS != nil {
		src, err = newCacheEncryptReader(data, bucket, object, metadata)
		if err != nil {
			return err
		}
		expected, _ = sio.EncryptedSize(uint64(rangeLen))
		objSize, _ = sio.EncryptedSize(uint64(size))
	}

	rangeFile := MustGetUUID()
	written, _, err := c.bitrotWriteToCache(entryDir, rangeFile, src, expected)
	if IsErr(err, baseErrs...) {
		// take the cache drive offline
		c.setOffline()
	}
	switch {
	case err != nil:
		removeAll(entryDir)
		return err
	case expected != uint64(written):
		removeAll(entryDir)
		return IncompleteBody{Bucket: bucket, Object: object}
	}

	return c.saveMetadata(ctx, bucket, object, metadata, int64(objSize), rs, rangeFile, false)
}
// bitrotReadFromCache streams length bytes starting at offset of the cached
// data in filePath into writer, verifying the bitrot checksum stored in
// front of every block before any byte of that block is handed out.
//
// offset and length refer to the data stream without the interleaved
// checksums. A negative offset or a nil writer yields errUnexpected. A
// checksum mismatch is logged and returned as an error. If the writer
// reports io.ErrClosedPipe (the consumer went away), reading stops and nil is
// returned.
func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, offset, length int64, writer io.Writer) error {
	h := HighwayHash256S.New()

	checksumHash := make([]byte, h.Size())

	startBlock := offset / cacheBlkSize
	endBlock := (offset + length) / cacheBlkSize

	// get block start offset
	var blockStartOffset int64
	if startBlock > 0 {
		// On disk every block is preceded by its checksum.
		blockStartOffset = (cacheBlkSize + int64(h.Size())) * startBlock
	}

	tillLength := (cacheBlkSize + int64(h.Size())) * (endBlock - startBlock + 1)

	// Start offset cannot be negative.
	if offset < 0 {
		logger.LogIf(ctx, errUnexpected)
		return errUnexpected
	}

	// Writer cannot be nil.
	if writer == nil {
		logger.LogIf(ctx, errUnexpected)
		return errUnexpected
	}
	var blockOffset, blockLength int64
	rc, err := readCacheFileStream(filePath, blockStartOffset, tillLength)
	if err != nil {
		return err
	}
	// Release the underlying stream on every return path; it was previously
	// never closed, leaving the file handle open after each read.
	defer rc.Close()

	bufp := c.pool.Get().(*[]byte)
	defer c.pool.Put(bufp)

	for block := startBlock; block <= endBlock; block++ {
		// Work out which slice of this block belongs to the requested range.
		switch {
		case startBlock == endBlock:
			blockOffset = offset % cacheBlkSize
			blockLength = length
		case block == startBlock:
			blockOffset = offset % cacheBlkSize
			blockLength = cacheBlkSize - blockOffset
		case block == endBlock:
			blockOffset = 0
			blockLength = (offset + length) % cacheBlkSize
		default:
			blockOffset = 0
			blockLength = cacheBlkSize
		}
		if blockLength == 0 {
			break
		}
		// Stored checksum first, then the block it protects.
		if _, err := io.ReadFull(rc, checksumHash); err != nil {
			return err
		}
		h.Reset()
		n, err := io.ReadFull(rc, *bufp)
		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
			logger.LogIf(ctx, err)
			return err
		}
		eof := err == io.EOF || err == io.ErrUnexpectedEOF
		if n == 0 && length != 0 {
			// Reached EOF, nothing more to be done.
			break
		}

		if _, e := h.Write((*bufp)[:n]); e != nil {
			return e
		}
		hashBytes := h.Sum(nil)

		if !bytes.Equal(hashBytes, checksumHash) {
			err = fmt.Errorf("hashes do not match expected %s, got %s",
				hex.EncodeToString(checksumHash), hex.EncodeToString(hashBytes))
			logger.LogIf(GlobalContext, err)
			return err
		}

		if _, err := io.Copy(writer, bytes.NewReader((*bufp)[blockOffset:blockOffset+blockLength])); err != nil {
			if err != io.ErrClosedPipe {
				logger.LogIf(ctx, err)
				return err
			}
			// The reading side was closed; stop quietly.
			eof = true
		}
		if eof {
			break
		}
	}

	return nil
}
// Get returns a GetObjectReader for bucket/object served from the disk
// cache, together with the entry's hit count.
//
// rs optionally restricts the read to a byte range. If only ranges of the
// object are cached, the matching range file is served in full and rs is
// cleared. h and opts are passed through to the reader construction.
//
// Data is streamed through a pipe by a background goroutine that verifies
// bitrot checksums; if that read fails, the whole cache entry is removed.
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
	cLock := c.NewNSLockFn(cacheObjPath)
	lkctx, err := cLock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		return nil, numHits, err
	}
	ctx = lkctx.Context()
	// The read lock is held only until Get returns.
	defer cLock.RUnlock(lkctx.Cancel)
	var objInfo ObjectInfo
	var rngInfo RangeInfo
	if objInfo, rngInfo, numHits, err = c.statRange(ctx, bucket, object, rs); err != nil {
		return nil, numHits, toObjectErr(err, bucket, object)
	}
	cacheFile := cacheDataFile
	// Remember the real object size; objInfo.Size may be overridden below.
	objSize := objInfo.Size
	if !rngInfo.Empty() {
		// for cached ranges, need to pass actual range file size to GetObjectReader
		// and clear out range spec
		cacheFile = rngInfo.File
		objInfo.Size = rngInfo.Size
		rs = nil
	}
	// nsUnlocker is a no-op: the lock taken above is released by the
	// deferred RUnlock when Get returns, not when the reader is closed.
	var nsUnlocker = func() {}
	// For a directory, we need to send an reader that returns no bytes.
	if HasSuffix(object, SlashSeparator) {
		gr, gerr := NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts, nsUnlocker)
		return gr, numHits, gerr
	}
	fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts, nsUnlocker)
	if nErr != nil {
		return nil, numHits, nErr
	}
	filePath := pathJoin(cacheObjPath, cacheFile)
	pr, pw := io.Pipe()
	// NOTE(review): this goroutine keeps reading (and using ctx) after Get
	// has returned and released the read lock — confirm this is intended.
	go func() {
		err := c.bitrotReadFromCache(ctx, filePath, off, length, pw)
		if err != nil {
			// A failed or corrupt read invalidates the cached entry.
			removeAll(cacheObjPath)
		}
		pw.CloseWithError(err)
	}()
	// Cleanup function to cause the go routine above to exit, in
	// case of incomplete read.
	pipeCloser := func() { pr.Close() }
	gr, gerr := fn(pr, h, opts.CheckPrecondFn, pipeCloser)
	if gerr != nil {
		return gr, numHits, gerr
	}
	if globalCacheKMS != nil {
		// clean up internal SSE cache metadata
		delete(gr.ObjInfo.UserDefined, xhttp.AmzServerSideEncryption)
	}
	if !rngInfo.Empty() {
		// overlay Size with actual object size and not the range size
		gr.ObjInfo.Size = objSize
	}
	return gr, numHits, nil
}
// delete removes the cache entry directory cacheObjPath while holding the
// write lock on it. Failure to obtain the lock within
// globalOperationTimeout is returned as the error.
func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
	locker := c.NewNSLockFn(cacheObjPath)
	lockCtx, lockErr := locker.GetLock(ctx, globalOperationTimeout)
	if lockErr != nil {
		return lockErr
	}
	defer locker.Unlock(lockCtx.Cancel)

	err = removeAll(cacheObjPath)
	return err
}
// Delete removes the cache entry of bucket/object from this cache drive.
func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
	err = c.delete(ctx, getCacheSHADir(c.dir, bucket, object))
	return err
}
// Exists reports whether a cache entry directory for bucket/object can be
// stat'ed on this diskCache. Any stat error counts as "does not exist".
func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
	_, err := os.Stat(getCacheSHADir(c.dir, bucket, object))
	return err == nil
}
// scanCacheWritebackFailures walks the cache directory once and queues every
// entry whose writeback status is present but not complete onto
// retryWritebackCh. Entries are dropped silently when the channel is full.
// The channel is closed when the scan ends.
func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
	defer close(c.retryWritebackCh)

	queuePending := func(name string, typ os.FileMode) error {
		if name == minioMetaBucket {
			// Proceed to next file.
			return nil
		}

		meta, _, _, err := c.statCachedMeta(ctx, pathJoin(c.dir, name))
		if err != nil {
			// Unreadable entries are skipped.
			return nil
		}

		objInfo := meta.ToObjectInfo("", "")
		if status, found := objInfo.UserDefined[writeBackStatusHeader]; found && status != CommitComplete.String() {
			// Non-blocking send: never stall the scan on a full queue.
			select {
			case c.retryWritebackCh <- objInfo:
			default:
			}
		}
		return nil
	}

	if err := readDirFn(c.dir, queuePending); err != nil {
		logger.LogIf(ctx, err)
	}
}