minio/cmd/disk-cache.go

606 lines
19 KiB
Go

package cmd
import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/djherbis/atime"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/wildcard"
)
const (
cacheBlkSize = int64(1 * 1024 * 1024)
)
// CacheStorageInfo - represents total, free capacity of
// underlying cache storage.
type CacheStorageInfo struct {
Total uint64 // Total cache disk space.
Free uint64 // Free cache available space.
}
// CacheObjectLayer implements primitives for cache object API layer.
type CacheObjectLayer interface {
// Object operations.
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObject(ctx context.Context, bucket, object string) error
DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
// Storage operations.
StorageInfo(ctx context.Context) CacheStorageInfo
}
// Abstracts disk caching - used by the S3 layer
type cacheObjects struct {
// slice of cache drives
cache []*diskCache
// file path patterns to exclude from cache
exclude []string
// to manage cache namespace locks
nsMutex *nsLockMap
// if true migration is in progress from v1 to v2
migrating bool
// mutex to protect migration bool
migMutex sync.Mutex
// Object functions pointing to the corresponding functions of backend implementation.
GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObjectFn func(ctx context.Context, bucket, object string) error
DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error)
PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
}
func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, object string) (err error) {
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
if err := cLock.GetLock(globalObjectTimeout); err != nil {
return err
}
defer cLock.Unlock()
return dcache.Delete(ctx, bucket, object)
}
func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error {
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
if err := cLock.GetLock(globalObjectTimeout); err != nil {
return err
}
defer cLock.Unlock()
return dcache.Put(ctx, bucket, object, data, size, opts)
}
func (c *cacheObjects) get(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
return nil, err
}
defer cLock.RUnlock()
return dcache.Get(ctx, bucket, object, rs, h, opts)
}
func (c *cacheObjects) stat(ctx context.Context, dcache *diskCache, bucket, object string) (oi ObjectInfo, err error) {
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
return oi, err
}
defer cLock.RUnlock()
return dcache.Stat(ctx, bucket, object)
}
// DeleteObject clears cache entry if backend delete operation succeeds
func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) {
if err = c.DeleteObjectFn(ctx, bucket, object); err != nil {
return
}
if c.isCacheExclude(bucket, object) || c.skipCache() {
return
}
dcache, cerr := c.getCacheLoc(ctx, bucket, object)
if cerr != nil {
return
}
if dcache.Exists(ctx, bucket, object) {
c.delete(ctx, dcache, bucket, object)
}
return
}
// DeleteObjects batch deletes objects in slice, and clears any cached entries
func (c *cacheObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
errs := make([]error, len(objects))
for idx, object := range objects {
errs[idx] = c.DeleteObject(ctx, bucket, object)
}
return errs, nil
}
// construct a metadata k-v map
func getMetadata(objInfo ObjectInfo) map[string]string {
metadata := make(map[string]string)
metadata["etag"] = objInfo.ETag
metadata["content-type"] = objInfo.ContentType
if objInfo.ContentEncoding != "" {
metadata["content-encoding"] = objInfo.ContentEncoding
}
if objInfo.Expires != timeSentinel {
metadata["expires"] = objInfo.Expires.Format(http.TimeFormat)
}
for k, v := range objInfo.UserDefined {
metadata[k] = v
}
return metadata
}
func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
if c.isCacheExclude(bucket, object) || c.skipCache() {
return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
}
var cc cacheControl
// fetch diskCache if object is currently cached or nearest available cache drive
dcache, err := c.getCacheToLoc(ctx, bucket, object)
if err != nil {
return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
}
cacheReader, cacheErr := c.get(ctx, dcache, bucket, object, rs, h, opts)
if cacheErr == nil {
cc = cacheControlOpts(cacheReader.ObjInfo)
if !cc.isEmpty() && !cc.isStale(cacheReader.ObjInfo.ModTime) {
return cacheReader, nil
}
}
objInfo, err := c.GetObjectInfoFn(ctx, bucket, object, opts)
if backendDownError(err) && cacheErr == nil {
return cacheReader, nil
} else if err != nil {
if _, ok := err.(ObjectNotFound); ok {
if cacheErr == nil {
cacheReader.Close()
// Delete cached entry if backend object
// was deleted.
dcache.Delete(ctx, bucket, object)
}
}
return nil, err
}
if !objInfo.IsCacheable() {
return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
}
if cacheErr == nil {
// if ETag matches for stale cache entry, serve from cache
if cacheReader.ObjInfo.ETag == objInfo.ETag {
// Update metadata in case server-side copy might have changed object metadata
dcache.updateMetadataIfChanged(ctx, bucket, object, objInfo, cacheReader.ObjInfo)
return cacheReader, nil
}
cacheReader.Close()
// Object is stale, so delete from cache
c.delete(ctx, dcache, bucket, object)
}
// Since we got here, we are serving the request from backend,
// and also adding the object to the cache.
if !dcache.diskUsageLow() {
select {
case dcache.purgeChan <- struct{}{}:
default:
}
}
if !dcache.diskAvailable(objInfo.Size) {
return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
}
if rs != nil {
go func() {
// fill cache in the background for range GET requests
bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, nil, h, lockType, opts)
if bErr != nil {
return
}
defer bReader.Close()
oi, err := c.stat(ctx, dcache, bucket, object)
// avoid cache overwrite if another background routine filled cache
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
}
}()
return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
}
bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
if bkErr != nil {
return nil, bkErr
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
teeReader := io.TeeReader(bkReader, pipeWriter)
go func() {
putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)})
// close the write end of the pipe, so the error gets
// propagated to getObjReader
pipeWriter.CloseWithError(putErr)
}()
cleanupBackend := func() { bkReader.Close() }
cleanupPipe := func() { pipeWriter.Close() }
return NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, opts.CheckCopyPrecondFn, cleanupBackend, cleanupPipe)
}
// Returns ObjectInfo from cache if available.
func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
getObjectInfoFn := c.GetObjectInfoFn
if c.isCacheExclude(bucket, object) || c.skipCache() {
return getObjectInfoFn(ctx, bucket, object, opts)
}
// fetch diskCache if object is currently cached or nearest available cache drive
dcache, err := c.getCacheToLoc(ctx, bucket, object)
if err != nil {
return getObjectInfoFn(ctx, bucket, object, opts)
}
var cc cacheControl
// if cache control setting is valid, avoid HEAD operation to backend
cachedObjInfo, cerr := c.stat(ctx, dcache, bucket, object)
if cerr == nil {
cc = cacheControlOpts(cachedObjInfo)
if !cc.isEmpty() && !cc.isStale(cachedObjInfo.ModTime) {
return cachedObjInfo, nil
}
}
objInfo, err := getObjectInfoFn(ctx, bucket, object, opts)
if err != nil {
if _, ok := err.(ObjectNotFound); ok {
// Delete the cached entry if backend object was deleted.
c.delete(ctx, dcache, bucket, object)
return ObjectInfo{}, err
}
if !backendDownError(err) {
return ObjectInfo{}, err
}
if cerr == nil {
return cachedObjInfo, nil
}
return ObjectInfo{}, BackendDown{}
}
// when backend is up, do a sanity check on cached object
if cerr != nil {
return objInfo, nil
}
if cachedObjInfo.ETag != objInfo.ETag {
// Delete the cached entry if the backend object was replaced.
c.delete(ctx, dcache, bucket, object)
}
return objInfo, nil
}
// StorageInfo - returns underlying storage statistics.
func (c *cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo) {
var total, free uint64
for _, cache := range c.cache {
if cache == nil {
continue
}
info, err := getDiskInfo(cache.dir)
logger.GetReqInfo(ctx).AppendTags("cachePath", cache.dir)
logger.LogIf(ctx, err)
total += info.Total
free += info.Free
}
return CacheStorageInfo{
Total: total,
Free: free,
}
}
// skipCache() returns true if cache migration is in progress
func (c *cacheObjects) skipCache() bool {
c.migMutex.Lock()
defer c.migMutex.Unlock()
return c.migrating
}
// Returns true if object should be excluded from cache
func (c *cacheObjects) isCacheExclude(bucket, object string) bool {
// exclude directories from cache
if strings.HasSuffix(object, SlashSeparator) {
return true
}
for _, pattern := range c.exclude {
matchStr := fmt.Sprintf("%s/%s", bucket, object)
if ok := wildcard.MatchSimple(pattern, matchStr); ok {
return true
}
}
return false
}
// choose a cache deterministically based on hash of bucket,object. The hash index is treated as
// a hint. In the event that the cache drive at hash index is offline, treat the list of cache drives
// as a circular buffer and walk through them starting at hash index until an online drive is found.
func (c *cacheObjects) getCacheLoc(ctx context.Context, bucket, object string) (*diskCache, error) {
index := c.hashIndex(bucket, object)
numDisks := len(c.cache)
for k := 0; k < numDisks; k++ {
i := (index + k) % numDisks
if c.cache[i] == nil {
continue
}
if c.cache[i].IsOnline() {
return c.cache[i], nil
}
}
return nil, errDiskNotFound
}
// get cache disk where object is currently cached for a GET operation. If object does not exist at that location,
// treat the list of cache drives as a circular buffer and walk through them starting at hash index
// until an online drive is found.If object is not found, fall back to the first online cache drive
// closest to the hash index, so that object can be re-cached.
func (c *cacheObjects) getCacheToLoc(ctx context.Context, bucket, object string) (*diskCache, error) {
index := c.hashIndex(bucket, object)
numDisks := len(c.cache)
// save first online cache disk closest to the hint index
var firstOnlineDisk *diskCache
for k := 0; k < numDisks; k++ {
i := (index + k) % numDisks
if c.cache[i] == nil {
continue
}
if c.cache[i].IsOnline() {
if firstOnlineDisk == nil {
firstOnlineDisk = c.cache[i]
}
if c.cache[i].Exists(ctx, bucket, object) {
return c.cache[i], nil
}
}
}
if firstOnlineDisk != nil {
return firstOnlineDisk, nil
}
return nil, errDiskNotFound
}
// Compute a unique hash sum for bucket and object
func (c *cacheObjects) hashIndex(bucket, object string) int {
return crcHashMod(pathJoin(bucket, object), len(c.cache))
}
// newCache initializes the cacheFSObjects for the "drives" specified in config.json
// or the global env overrides.
func newCache(config CacheConfig) ([]*diskCache, bool, error) {
var caches []*diskCache
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{})
formats, migrating, err := loadAndValidateCacheFormat(ctx, config.Drives)
if err != nil {
return nil, false, err
}
for i, dir := range config.Drives {
// skip diskCache creation for cache drives missing a format.json
if formats[i] == nil {
caches = append(caches, nil)
continue
}
if err := checkAtimeSupport(dir); err != nil {
return nil, false, errors.New("Atime support required for disk caching")
}
cache, err := newdiskCache(dir, config.Expiry, config.MaxUse)
if err != nil {
return nil, false, err
}
// Start the purging go-routine for entries that have expired if no migration in progress
if !migrating {
go cache.purge()
}
caches = append(caches, cache)
}
return caches, migrating, nil
}
// Return error if Atime is disabled on the O/S
func checkAtimeSupport(dir string) (err error) {
file, err := ioutil.TempFile(dir, "prefix")
if err != nil {
return
}
defer os.Remove(file.Name())
finfo1, err := os.Stat(file.Name())
if err != nil {
return
}
// add a sleep to ensure atime change is detected
time.Sleep(10 * time.Millisecond)
if _, err = io.Copy(ioutil.Discard, file); err != nil {
return
}
finfo2, err := os.Stat(file.Name())
if atime.Get(finfo2).Equal(atime.Get(finfo1)) {
return errors.New("Atime not supported")
}
return
}
func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) {
logger.StartupMessage(colorBlue("Cache migration initiated ...."))
var wg = &sync.WaitGroup{}
errs := make([]error, len(c.cache))
for i, dc := range c.cache {
if dc == nil {
continue
}
wg.Add(1)
// start migration from V1 to V2
go func(ctx context.Context, dc *diskCache, errs []error, idx int) {
defer wg.Done()
if err := migrateOldCache(ctx, dc); err != nil {
errs[idx] = err
logger.LogIf(ctx, err)
return
}
// start purge routine after migration completes.
go dc.purge()
}(ctx, dc, errs, i)
}
wg.Wait()
errCnt := 0
for _, err := range errs {
if err != nil {
errCnt++
}
}
if errCnt > 0 {
return
}
// update migration status
c.migMutex.Lock()
defer c.migMutex.Unlock()
c.migrating = false
logger.StartupMessage(colorBlue("Cache migration completed successfully."))
}
// PutObject - caches the uploaded object for single Put operations
func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
putObjectFn := c.PutObjectFn
data := r.rawReader
dcache, err := c.getCacheToLoc(ctx, bucket, object)
if err != nil {
// disk cache could not be located,execute backend call.
return putObjectFn(ctx, bucket, object, r, opts)
}
size := r.Size()
if c.skipCache() {
return putObjectFn(ctx, bucket, object, r, opts)
}
// fetch from backend if there is no space on cache drive
if !dcache.diskAvailable(size) {
return putObjectFn(ctx, bucket, object, r, opts)
}
if opts.ServerSideEncryption != nil {
return putObjectFn(ctx, bucket, object, r, opts)
}
// fetch from backend if cache exclude pattern or cache-control
// directive set to exclude
if c.isCacheExclude(bucket, object) {
dcache.Delete(ctx, bucket, object)
return putObjectFn(ctx, bucket, object, r, opts)
}
// Initialize pipe to stream data to backend
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, size, "", "", data.ActualSize(), globalCLIContext.StrictS3Compat)
if err != nil {
return ObjectInfo{}, err
}
// Initialize pipe to stream data to cache
rPipe, wPipe := io.Pipe()
oinfoCh := make(chan ObjectInfo)
errCh := make(chan error)
go func() {
oinfo, perr := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), opts)
if perr != nil {
pipeWriter.CloseWithError(perr)
wPipe.CloseWithError(perr)
close(oinfoCh)
errCh <- perr
return
}
close(errCh)
oinfoCh <- oinfo
}()
// get a namespace lock on cache until cache is filled.
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
if err := cLock.GetLock(globalObjectTimeout); err != nil {
return ObjectInfo{}, err
}
defer cLock.Unlock()
go func() {
if err = dcache.Put(ctx, bucket, object, rPipe, data.Size(), opts); err != nil {
wPipe.CloseWithError(err)
return
}
}()
mwriter := io.MultiWriter(pipeWriter, wPipe)
_, err = io.Copy(mwriter, data)
if err != nil {
err = <-errCh
return objInfo, err
}
pipeWriter.Close()
wPipe.Close()
objInfo = <-oinfoCh
dcache.updateETag(ctx, bucket, object, objInfo.ETag)
return objInfo, err
}
// Returns cacheObjects for use by Server.
func newServerCacheObjects(ctx context.Context, config CacheConfig) (CacheObjectLayer, error) {
// list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var.
cache, migrateSw, err := newCache(config)
if err != nil {
return nil, err
}
c := &cacheObjects{
cache: cache,
exclude: config.Exclude,
nsMutex: newNSLock(false),
migrating: migrateSw,
migMutex: sync.Mutex{},
GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
},
GetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
},
DeleteObjectFn: func(ctx context.Context, bucket, object string) error {
return newObjectLayerFn().DeleteObject(ctx, bucket, object)
},
DeleteObjectsFn: func(ctx context.Context, bucket string, objects []string) ([]error, error) {
errs := make([]error, len(objects))
for idx, object := range objects {
errs[idx] = newObjectLayerFn().DeleteObject(ctx, bucket, object)
}
return errs, nil
},
PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
},
}
if migrateSw {
go c.migrateCacheFromV1toV2(ctx)
}
return c, nil
}