From 3385bf3da894e5f1da72553ff6d89707a09496c5 Mon Sep 17 00:00:00 2001 From: poornas Date: Fri, 9 Aug 2019 17:09:08 -0700 Subject: [PATCH] Rewrite cache implementation to cache only on GET (#7694) Fixes #7458 Fixes #7573 Fixes #7938 Fixes #6934 Fixes #6265 Fixes #6630 This allows the cache to work consistently for both server and gateway modes. Range GET requests will be cached in the background after the request is served from the backend. - All cached content is automatically bitrot protected. - Avoid ETag verification if a cache-control header is set and the cached content is still valid. - This PR changes the cache backend format, and all existing content will be migrated to the new format. Until the data is migrated completely, all content will be served from the backend. --- cmd/bucket-handlers-listobjects.go | 7 +- cmd/bucket-handlers.go | 15 +- cmd/disk-cache-backend.go | 573 ++++++++++++++ cmd/disk-cache-fs.go | 537 ------------- cmd/disk-cache-utils.go | 170 ++++ cmd/disk-cache-utils_test.go | 60 ++ cmd/disk-cache.go | 1182 +++++++--------------------- cmd/disk-cache_test.go | 179 ++--- cmd/format-disk-cache.go | 239 +++++- cmd/format-disk-cache_test.go | 64 +- cmd/gateway-main.go | 2 +- cmd/object-handlers.go | 20 +- cmd/server-main.go | 2 +- cmd/web-handlers.go | 51 +- docs/disk-caching/DESIGN.md | 8 +- 15 files changed, 1423 insertions(+), 1686 deletions(-) create mode 100644 cmd/disk-cache-backend.go delete mode 100644 cmd/disk-cache-fs.go create mode 100644 cmd/disk-cache-utils.go create mode 100644 cmd/disk-cache-utils_test.go diff --git a/cmd/bucket-handlers-listobjects.go b/cmd/bucket-handlers-listobjects.go index 0eb7e9e6d..3874b2383 100644 --- a/cmd/bucket-handlers-listobjects.go +++ b/cmd/bucket-handlers-listobjects.go @@ -93,9 +93,7 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http } listObjectsV2 := objectAPI.ListObjectsV2 - if api.CacheAPI() != nil { - listObjectsV2 = api.CacheAPI().ListObjectsV2 - } + // Initiate a list objects operation based on the input params. // On success would return back ListObjectsInfo object to be // marshaled into S3 compatible XML header. @@ -172,9 +170,6 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http } listObjects := objectAPI.ListObjects - if api.CacheAPI() != nil { - listObjects = api.CacheAPI().ListObjects - } // Initiate a list objects operation based on the input params. 
// On success would return back ListObjectsInfo object to be diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 62517756a..ead2eaf74 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -106,9 +106,7 @@ func (api objectAPIHandlers) GetBucketLocationHandler(w http.ResponseWriter, r * } getBucketInfo := objectAPI.GetBucketInfo - if api.CacheAPI() != nil { - getBucketInfo = api.CacheAPI().GetBucketInfo - } + if _, err := getBucketInfo(ctx, bucket); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return @@ -203,9 +201,6 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R } listBuckets := objectAPI.ListBuckets - if api.CacheAPI() != nil { - listBuckets = api.CacheAPI().ListBuckets - } if s3Error := checkRequestAuthType(ctx, r, policy.ListAllMyBucketsAction, "", ""); s3Error != ErrNone { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r)) @@ -747,9 +742,7 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re } getBucketInfo := objectAPI.GetBucketInfo - if api.CacheAPI() != nil { - getBucketInfo = api.CacheAPI().GetBucketInfo - } + if _, err := getBucketInfo(ctx, bucket); err != nil { writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) return @@ -779,9 +772,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. } deleteBucket := objectAPI.DeleteBucket - if api.CacheAPI() != nil { - deleteBucket = api.CacheAPI().DeleteBucket - } + // Attempt to delete bucket. if err := deleteBucket(ctx, bucket); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go new file mode 100644 index 000000000..96f6a0597 --- /dev/null +++ b/cmd/disk-cache-backend.go @@ -0,0 +1,573 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "path" + "reflect" + "sync" + "time" + + "github.com/djherbis/atime" + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/disk" + "github.com/ncw/directio" +) + +const ( + // cache.json object metadata for cached objects. + cacheMetaJSONFile = "cache.json" + cacheDataFile = "part.1" + cacheMetaVersion = "1.0.0" + + cacheEnvDelimiter = ";" +) + +// CacheChecksumInfoV1 - carries checksums of individual blocks on disk. +type CacheChecksumInfoV1 struct { + Algorithm string `json:"algorithm"` + Blocksize int64 `json:"blocksize"` +} + +// Represents the cache metadata struct +type cacheMeta struct { + Version string `json:"version"` + Stat statInfo `json:"stat"` // Stat of the current object `cache.json`. + + // checksums of blocks on disk. + Checksum CacheChecksumInfoV1 `json:"checksum,omitempty"` + // Metadata map for current object. 
+ Meta map[string]string `json:"meta,omitempty"` +} + +func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) { + if len(m.Meta) == 0 { + m.Meta = make(map[string]string) + m.Stat.ModTime = timeSentinel + } + + o = ObjectInfo{ + Bucket: bucket, + Name: object, + } + + // We set file info only if it's valid. + o.ModTime = m.Stat.ModTime + o.Size = m.Stat.Size + o.ETag = extractETag(m.Meta) + o.ContentType = m.Meta["content-type"] + o.ContentEncoding = m.Meta["content-encoding"] + if storageClass, ok := m.Meta[amzStorageClass]; ok { + o.StorageClass = storageClass + } else { + o.StorageClass = globalMinioDefaultStorageClass + } + var ( + t time.Time + e error + ) + if exp, ok := m.Meta["expires"]; ok { + if t, e = time.Parse(http.TimeFormat, exp); e == nil { + o.Expires = t.UTC() + } + } + // etag/md5Sum has already been extracted. We need to + // remove it so it does not appear as part of user-defined metadata + o.UserDefined = cleanMetadata(m.Meta) + return o +} + +// represents disk cache struct +type diskCache struct { + dir string // caching directory + maxDiskUsagePct int // max usage in % + expiry int // cache expiry in days + // mark false if drive is offline + online bool + // mutex to protect updates to online variable + onlineMutex *sync.RWMutex + // purge() listens on this channel to start the cache-purge process + purgeChan chan struct{} + pool sync.Pool +} + +// Inits the disk cache dir if it is not initialized already. +func newdiskCache(dir string, expiry int, maxDiskUsagePct int) (*diskCache, error) { + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, fmt.Errorf("Unable to initialize '%s' dir, %s", dir, err) + } + + if expiry == 0 { + expiry = globalCacheExpiry + } + cache := diskCache{ + dir: dir, + expiry: expiry, + maxDiskUsagePct: maxDiskUsagePct, + purgeChan: make(chan struct{}), + online: true, + onlineMutex: &sync.RWMutex{}, + pool: sync.Pool{ + New: func() interface{} { + b := directio.AlignedBlock(int(cacheBlkSize)) + return &b + }, + }, + } + return &cache, nil +} + +// Returns if the disk usage is low. +// Disk usage is low if usage is < 80% of cacheMaxDiskUsagePct +// e.g. for a 100GB disk, if maxUsage is configured as 70% then the space usable for caching is 70GB, +// hence disk usage is low if usage is less than 56GB (because 80% of 70GB is 56GB) +func (c *diskCache) diskUsageLow() bool { + minUsage := c.maxDiskUsagePct * 80 / 100 + di, err := disk.GetInfo(c.dir) + if err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + return false + } + usedPercent := (di.Total - di.Free) * 100 / di.Total + return int(usedPercent) < minUsage +} + +// Returns if the disk usage is high. 
+// Disk usage is high if disk used is > cacheMaxDiskUsagePct +func (c *diskCache) diskUsageHigh() bool { + di, err := disk.GetInfo(c.dir) + if err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + return true + } + usedPercent := (di.Total - di.Free) * 100 / di.Total + return int(usedPercent) > c.maxDiskUsagePct +} + +// Returns if size bytes can be allocated without exceeding +// the max disk space usable for caching +func (c *diskCache) diskAvailable(size int64) bool { + di, err := disk.GetInfo(c.dir) + if err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + return false + } + usedPercent := (di.Total - (di.Free - uint64(size))) * 100 / di.Total + return int(usedPercent) < c.maxDiskUsagePct +} + +// Purge cache entries that have not been accessed within the expiry duration. +func (c *diskCache) purge() { + ctx := context.Background() + for { + olderThan := c.expiry + for !c.diskUsageLow() { + // delete unaccessed objects older than expiry duration + expiry := UTCNow().AddDate(0, 0, -1*olderThan) + olderThan /= 2 + if olderThan < 1 { + break + } + deletedCount := 0 + + objDirs, err := ioutil.ReadDir(c.dir) + if err != nil { + log.Fatal(err) + } + + for _, obj := range objDirs { + if obj.Name() == minioMetaBucket { + continue + } + // stat entry to get atime + var fi os.FileInfo + fi, err := os.Stat(pathJoin(c.dir, obj.Name(), cacheDataFile)) + if err != nil { + continue + } + + objInfo, err := c.statCache(ctx, pathJoin(c.dir, obj.Name())) + if err != nil { + // delete any partially filled cache entry left behind. + removeAll(pathJoin(c.dir, obj.Name())) + continue + } + cc := cacheControlOpts(objInfo) + if atime.Get(fi).Before(expiry) || + cc.isStale(objInfo.ModTime) { + if err = removeAll(pathJoin(c.dir, obj.Name())); err != nil { + logger.LogIf(ctx, err) + } + deletedCount++ + // break early if sufficient disk space reclaimed. + if c.diskUsageLow() { + break + } + } + } + if deletedCount == 0 { + break + } + } + lastRunTime := time.Now() + for { + <-c.purgeChan + timeElapsed := time.Since(lastRunTime) + if timeElapsed > time.Hour { + break + } + } + } +} + +// sets cache drive status +func (c *diskCache) setOnline(status bool) { + c.onlineMutex.Lock() + c.online = status + c.onlineMutex.Unlock() +} + +// returns true if cache drive is online +func (c *diskCache) IsOnline() bool { + c.onlineMutex.RLock() + defer c.onlineMutex.RUnlock() + return c.online +} + +// Stat returns ObjectInfo from disk cache +func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) { + cacheObjPath := getCacheSHADir(c.dir, bucket, object) + oi, err = c.statCache(ctx, cacheObjPath) + if err != nil { + return + } + oi.Bucket = bucket + oi.Name = object + return +} + +// statCache is a convenience function for purge() to get ObjectInfo for cached object +func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (oi ObjectInfo, e error) { + // Load the cache metadata and stat the data file for its access time. 
+ metaPath := path.Join(cacheObjPath, cacheMetaJSONFile) + f, err := os.Open(metaPath) + if err != nil { + return oi, err + } + defer f.Close() + + meta := &cacheMeta{Version: cacheMetaVersion} + if err := jsonLoad(f, meta); err != nil { + return oi, err + } + fi, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile)) + if err != nil { + return oi, err + } + meta.Stat.ModTime = atime.Get(fi) + return meta.ToObjectInfo("", ""), nil +} + +// saves object metadata to disk cache +func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64) error { + fileName := getCacheSHADir(c.dir, bucket, object) + metaPath := pathJoin(fileName, cacheMetaJSONFile) + + f, err := os.Create(metaPath) + if err != nil { + return err + } + defer f.Close() + + m := cacheMeta{Meta: meta, Version: cacheMetaVersion} + m.Stat.Size = actualSize + m.Stat.ModTime = UTCNow() + m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize} + jsonData, err := json.Marshal(m) + if err != nil { + return err + } + _, err = f.Write(jsonData) + return err +} + +// Backend metadata could have changed through server side copy - reset cache metadata if that is the case +func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo) error { + if !reflect.DeepEqual(bkObjectInfo.UserDefined, cacheObjInfo.UserDefined) || + bkObjectInfo.ETag != cacheObjInfo.ETag || + bkObjectInfo.ContentType != cacheObjInfo.ContentType || + bkObjectInfo.Expires != cacheObjInfo.Expires { + return c.saveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size) + } + return nil +} + +func getCacheSHADir(dir, bucket, object string) string { + return path.Join(dir, getSHA256Hash([]byte(path.Join(bucket, object)))) +} + +// Cache data to disk with bitrot checksum added for each block of 1MB +func (c *diskCache) bitrotWriteToCache(ctx context.Context, cachePath string, reader io.Reader, size int64) (int64, error) { + if err := os.MkdirAll(cachePath, 0777); err != nil { + return 0, err + } + bufSize := int64(readSizeV1) + if size > 0 && bufSize > size { + bufSize = size + } + filePath := path.Join(cachePath, cacheDataFile) + + if filePath == "" || reader == nil { + return 0, errInvalidArgument + } + + if err := checkPathLength(filePath); err != nil { + return 0, err + } + f, err := os.Create(filePath) + if err != nil { + return 0, osErrToFSFileErr(err) + } + defer f.Close() + + var bytesWritten int64 + + h := HighwayHash256S.New() + + bufp := c.pool.Get().(*[]byte) + defer c.pool.Put(bufp) + + for { + n, err := io.ReadFull(reader, *bufp) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF && err != io.ErrClosedPipe { + return 0, err + } + eof := err == io.EOF || err == io.ErrUnexpectedEOF || err == io.ErrClosedPipe + if n == 0 && size != 0 { + // Reached EOF, nothing more to be done. 
+ break + } + h.Reset() + if _, err := h.Write((*bufp)[:n]); err != nil { + return 0, err + } + hashBytes := h.Sum(nil) + if _, err = f.Write(hashBytes); err != nil { + return 0, err + } + if _, err = f.Write((*bufp)[:n]); err != nil { + return 0, err + } + bytesWritten += int64(n) + if eof { + break + } + } + return bytesWritten, nil +} + +// Caches the object to disk +func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error { + if c.diskUsageHigh() { + select { + case c.purgeChan <- struct{}{}: + default: + } + return errDiskFull + } + if !c.diskAvailable(size) { + return errDiskFull + } + cachePath := getCacheSHADir(c.dir, bucket, object) + if err := os.MkdirAll(cachePath, 0777); err != nil { + return err + } + bufSize := int64(readSizeV1) + if size > 0 && bufSize > size { + bufSize = size + } + + n, err := c.bitrotWriteToCache(ctx, cachePath, data, size) + if IsErr(err, baseErrs...) { + c.setOnline(false) + } + if err != nil { + return err + } + return c.saveMetadata(ctx, bucket, object, opts.UserDefined, n) +} + +// checks streaming bitrot checksum of cached object before returning data +func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, offset, length int64, writer io.Writer) error { + h := HighwayHash256S.New() + + checksumHash := make([]byte, h.Size()) + + startBlock := offset / cacheBlkSize + endBlock := (offset + length) / cacheBlkSize + + // get block start offset + var blockStartOffset int64 + if startBlock > 0 { + blockStartOffset = (cacheBlkSize + int64(h.Size())) * startBlock + } + + tillLength := (cacheBlkSize + int64(h.Size())) * (endBlock - startBlock + 1) + + // Start offset cannot be negative. + if offset < 0 { + logger.LogIf(ctx, errUnexpected) + return errUnexpected + } + + // Writer cannot be nil. + if writer == nil { + logger.LogIf(ctx, errUnexpected) + return errUnexpected + } + var blockOffset, blockLength int64 + rc, err := readCacheFileStream(filePath, blockStartOffset, tillLength) + if err != nil { + return err + } + bufp := c.pool.Get().(*[]byte) + defer c.pool.Put(bufp) + + for block := startBlock; block <= endBlock; block++ { + switch { + case startBlock == endBlock: + blockOffset = offset % cacheBlkSize + blockLength = length + case block == startBlock: + blockOffset = offset % cacheBlkSize + blockLength = cacheBlkSize - blockOffset + case block == endBlock: + blockOffset = 0 + blockLength = (offset + length) % cacheBlkSize + default: + blockOffset = 0 + blockLength = cacheBlkSize + } + if blockLength == 0 { + break + } + if _, err := io.ReadFull(rc, checksumHash); err != nil { + return err + } + + h.Reset() + n, err := io.ReadFull(rc, *bufp) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + logger.LogIf(ctx, err) + return err + } + eof := err == io.EOF || err == io.ErrUnexpectedEOF + if n == 0 && length != 0 { + // Reached EOF, nothing more to be done. 
+ break + } + + if _, e := h.Write((*bufp)[:n]); e != nil { + return e + } + hashBytes := h.Sum(nil) + + if !bytes.Equal(hashBytes, checksumHash) { + err = HashMismatchError{hex.EncodeToString(checksumHash), hex.EncodeToString(hashBytes)} + logger.LogIf(context.Background(), err) + return err + } + + if _, err := io.Copy(writer, bytes.NewReader((*bufp)[blockOffset:blockOffset+blockLength])); err != nil { + if err != io.ErrClosedPipe { + logger.LogIf(ctx, err) + } + return err + } + if eof { + break + } + } + + return nil +} + +// Get returns ObjectInfo and reader for object from disk cache +func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) { + var objInfo ObjectInfo + cacheObjPath := getCacheSHADir(c.dir, bucket, object) + + if objInfo, err = c.statCache(ctx, cacheObjPath); err != nil { + return nil, toObjectErr(err, bucket, object) + } + + var nsUnlocker = func() {} + // For a directory, we need to send a reader that returns no bytes. + if hasSuffix(object, SlashSeparator) { + // The no-op nsUnlocker above is invoked when + // objReader.Close() is called by the caller. + return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts.CheckCopyPrecondFn, nsUnlocker) + } + + fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts.CheckCopyPrecondFn, nsUnlocker) + if nErr != nil { + return nil, nErr + } + + filePath := path.Join(cacheObjPath, cacheDataFile) + pr, pw := io.Pipe() + go func() { + pw.CloseWithError(c.bitrotReadFromCache(ctx, filePath, off, length, pw)) + }() + // Cleanup function to cause the go routine above to exit, in + // case of incomplete read. + pipeCloser := func() { pr.Close() } + + return fn(pr, h, opts.CheckCopyPrecondFn, pipeCloser) +} + +// Deletes the cached object +func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) { + cachePath := getCacheSHADir(c.dir, bucket, object) + return removeAll(cachePath) +} + +// convenience function to check if object is cached on this diskCache +func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool { + if _, err := os.Stat(getCacheSHADir(c.dir, bucket, object)); err != nil { + return false + } + return true +} diff --git a/cmd/disk-cache-fs.go b/cmd/disk-cache-fs.go deleted file mode 100644 index 36ed85245..000000000 --- a/cmd/disk-cache-fs.go +++ /dev/null @@ -1,537 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2018 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "context" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "os" - "path" - "sync" - "time" - - "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/pkg/disk" - "github.com/minio/minio/pkg/lock" -) - -const ( - // cache.json object metadata for cached objects. - cacheMetaJSONFile = "cache.json" - - cacheEnvDelimiter = ";" -) - -// cacheFSObjects implements the cache backend operations. 
-type cacheFSObjects struct { - *FSObjects - // caching drive path (from cache "drives" in config.json) - dir string - // expiry in days specified in config.json - expiry int - // max disk usage pct - maxDiskUsagePct int - // purge() listens on this channel to start the cache-purge process - purgeChan chan struct{} - // mark false if drive is offline - online bool - // mutex to protect updates to online variable - onlineMutex *sync.RWMutex -} - -// Inits the cache directory if it is not init'ed already. -// Initializing implies creation of new FS Object layer. -func newCacheFSObjects(dir string, expiry int, maxDiskUsagePct int) (*cacheFSObjects, error) { - // Assign a new UUID for FS minio mode. Each server instance - // gets its own UUID for temporary file transaction. - fsUUID := mustGetUUID() - - // Initialize meta volume, if volume already exists ignores it. - if err := initMetaVolumeFS(dir, fsUUID); err != nil { - return nil, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err) - } - - trashPath := pathJoin(dir, minioMetaBucket, cacheTrashDir) - if err := os.MkdirAll(trashPath, 0777); err != nil { - return nil, err - } - - if expiry == 0 { - expiry = globalCacheExpiry - } - - // Initialize fs objects. - fsObjects := &FSObjects{ - fsPath: dir, - metaJSONFile: cacheMetaJSONFile, - fsUUID: fsUUID, - rwPool: &fsIOPool{ - readersMap: make(map[string]*lock.RLockedFile), - }, - nsMutex: newNSLock(false), - listPool: NewTreeWalkPool(globalLookupTimeout), - appendFileMap: make(map[string]*fsAppendFile), - } - - go fsObjects.cleanupStaleMultipartUploads(context.Background(), GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh) - - cacheFS := cacheFSObjects{ - FSObjects: fsObjects, - dir: dir, - expiry: expiry, - maxDiskUsagePct: maxDiskUsagePct, - purgeChan: make(chan struct{}), - online: true, - onlineMutex: &sync.RWMutex{}, - } - return &cacheFS, nil -} - -// Returns if the disk usage is low. -// Disk usage is low if usage is < 80% of cacheMaxDiskUsagePct -// Ex. for a 100GB disk, if maxUsage is configured as 70% then cacheMaxDiskUsagePct is 70G -// hence disk usage is low if the disk usage is less than 56G (because 80% of 70G is 56G) -func (cfs *cacheFSObjects) diskUsageLow() bool { - - minUsage := cfs.maxDiskUsagePct * 80 / 100 - di, err := disk.GetInfo(cfs.dir) - if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - return false - } - usedPercent := (di.Total - di.Free) * 100 / di.Total - return int(usedPercent) < minUsage -} - -// Return if the disk usage is high. 
-// Disk usage is high if disk used is > cacheMaxDiskUsagePct -func (cfs *cacheFSObjects) diskUsageHigh() bool { - di, err := disk.GetInfo(cfs.dir) - if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - return true - } - usedPercent := (di.Total - di.Free) * 100 / di.Total - return int(usedPercent) > cfs.maxDiskUsagePct -} - -// Returns if size space can be allocated without exceeding -// max disk usable for caching -func (cfs *cacheFSObjects) diskAvailable(size int64) bool { - di, err := disk.GetInfo(cfs.dir) - if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - return false - } - usedPercent := (di.Total - (di.Free - uint64(size))) * 100 / di.Total - return int(usedPercent) < cfs.maxDiskUsagePct -} - -// purges all content marked trash from the cache. -func (cfs *cacheFSObjects) purgeTrash() { - ticker := time.NewTicker(time.Minute * cacheCleanupInterval) - defer ticker.Stop() - - for { - select { - case <-GlobalServiceDoneCh: - return - case <-ticker.C: - trashPath := path.Join(cfs.fsPath, minioMetaBucket, cacheTrashDir) - entries, err := readDir(trashPath) - if err != nil { - return - } - for _, entry := range entries { - ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{}) - fi, err := fsStatVolume(ctx, pathJoin(trashPath, entry)) - if err != nil { - continue - } - dir := path.Join(trashPath, fi.Name()) - - // Delete all expired cache content. - fsRemoveAll(ctx, dir) - } - } - } -} - -// Purge cache entries that were not accessed. -func (cfs *cacheFSObjects) purge() { - delimiter := SlashSeparator - maxKeys := 1000 - ctx := context.Background() - for { - olderThan := cfs.expiry - for !cfs.diskUsageLow() { - // delete unaccessed objects older than expiry duration - expiry := UTCNow().AddDate(0, 0, -1*olderThan) - olderThan /= 2 - if olderThan < 1 { - break - } - deletedCount := 0 - buckets, err := cfs.ListBuckets(ctx) - if err != nil { - logger.LogIf(ctx, err) - } - // Reset cache online status if drive was offline earlier. - if !cfs.IsOnline() { - cfs.setOnline(true) - } - for _, bucket := range buckets { - var continuationToken string - var marker string - for { - objects, err := cfs.ListObjects(ctx, bucket.Name, marker, continuationToken, delimiter, maxKeys) - if err != nil { - break - } - - if !objects.IsTruncated { - break - } - marker = objects.NextMarker - for _, object := range objects.Objects { - // purge objects that qualify because of cache-control directives or - // past cache expiry duration. 
- if !filterFromCache(object.UserDefined) || - !isStaleCache(object) || - object.AccTime.After(expiry) { - continue - } - if err = cfs.DeleteObject(ctx, bucket.Name, object.Name); err != nil { - logger.LogIf(ctx, err) - continue - } - deletedCount++ - } - } - } - if deletedCount == 0 { - // to avoid a busy loop - time.Sleep(time.Minute * 30) - } - } - <-cfs.purgeChan - } -} - -// sets cache drive status -func (cfs *cacheFSObjects) setOnline(status bool) { - cfs.onlineMutex.Lock() - cfs.online = status - cfs.onlineMutex.Unlock() -} - -// returns true if cache drive is online -func (cfs *cacheFSObjects) IsOnline() bool { - cfs.onlineMutex.RLock() - defer cfs.onlineMutex.RUnlock() - return cfs.online -} - -// Caches the object to disk -func (cfs *cacheFSObjects) Put(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) error { - if cfs.diskUsageHigh() { - select { - case cfs.purgeChan <- struct{}{}: - default: - } - return errDiskFull - } - if !cfs.diskAvailable(data.Size()) { - return errDiskFull - } - if _, err := cfs.GetBucketInfo(ctx, bucket); err != nil { - pErr := cfs.MakeBucketWithLocation(ctx, bucket, "") - if pErr != nil { - return pErr - } - } - _, err := cfs.PutObject(ctx, bucket, object, data, opts) - // if err is due to disk being offline , mark cache drive as offline - if IsErr(err, baseErrs...) { - cfs.setOnline(false) - } - return err -} - -// Returns the handle for the cached object -func (cfs *cacheFSObjects) Get(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) { - return cfs.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts) -} - -// Deletes the cached object -func (cfs *cacheFSObjects) Delete(ctx context.Context, bucket, object string) (err error) { - return cfs.DeleteObject(ctx, bucket, object) -} - -// convenience function to check if object is cached on this cacheFSObjects -func (cfs *cacheFSObjects) Exists(ctx context.Context, bucket, object string) bool { - _, err := cfs.GetObjectInfo(ctx, bucket, object, ObjectOptions{}) - return err == nil -} - -// Identical to fs PutObject operation except that it uses ETag in metadata -// headers. -func (cfs *cacheFSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, retErr error) { - data := r.Reader - fs := cfs.FSObjects - // Lock the object. - objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object) - if err := objectLock.GetLock(globalObjectTimeout); err != nil { - return objInfo, err - } - defer objectLock.Unlock() - - // No metadata is set, allocate a new one. - meta := make(map[string]string) - for k, v := range opts.UserDefined { - meta[k] = v - } - - var err error - - // Validate if bucket name is valid and exists. - if _, err = fs.statBucketDir(ctx, bucket); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket) - } - - fsMeta := newFSMetaV1() - fsMeta.Meta = meta - - // This is a special case with size as '0' and object ends - // with a slash separator, we treat it like a valid operation - // and return success. - if isObjectDir(object, data.Size()) { - // Check if an object is present as one of the parent dir. 
- if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) { - return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object) - } - if err = mkdirAll(pathJoin(fs.fsPath, bucket, object), 0777); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - var fi os.FileInfo - if fi, err = fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object)); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - return fsMeta.ToObjectInfo(bucket, object, fi), nil - } - - if err = checkPutObjectArgs(ctx, bucket, object, fs, data.Size()); err != nil { - return ObjectInfo{}, err - } - - // Check if an object is present as one of the parent dir. - if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) { - return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object) - } - - // Validate input data size and it can never be less than zero. - if data.Size() < -1 { - logger.LogIf(ctx, errInvalidArgument) - return ObjectInfo{}, errInvalidArgument - } - - var wlk *lock.LockedFile - if bucket != minioMetaBucket { - bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix) - fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile) - - wlk, err = fs.rwPool.Create(fsMetaPath) - if err != nil { - logger.LogIf(ctx, err) - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - // This close will allow for locks to be synchronized on `fs.json`. - defer wlk.Close() - defer func() { - // Remove meta file when PutObject encounters any error - if retErr != nil { - tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID) - fsRemoveMeta(ctx, bucketMetaDir, fsMetaPath, tmpDir) - } - }() - } - - // Uploaded object will first be written to the temporary location which will eventually - // be renamed to the actual location. It is first written to the temporary location - // so that cleaning it up will be easy if the server goes down. - tempObj := mustGetUUID() - - // Allocate a buffer to Read() from request body - bufSize := int64(readSizeV1) - if size := data.Size(); size > 0 && bufSize > size { - bufSize = size - } - - buf := make([]byte, int(bufSize)) - fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj) - bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, buf, data.Size()) - if err != nil { - fsRemoveFile(ctx, fsTmpObjPath) - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - if fsMeta.Meta["etag"] == "" { - fsMeta.Meta["etag"] = r.MD5CurrentHexString() - } - // Should return IncompleteBody{} error when reader has fewer - // bytes than specified in request header. - if bytesWritten < data.Size() { - fsRemoveFile(ctx, fsTmpObjPath) - return ObjectInfo{}, IncompleteBody{} - } - - // Delete the temporary object in the case of a - // failure. If PutObject succeeds, then there would be - // nothing to delete. - defer fsRemoveFile(ctx, fsTmpObjPath) - - // Entire object was written to the temp location, now it's safe to rename it to the actual location. - fsNSObjPath := pathJoin(fs.fsPath, bucket, object) - if err = fsRenameFile(ctx, fsTmpObjPath, fsNSObjPath); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - - if bucket != minioMetaBucket { - // Write FS metadata after a successful namespace operation. - if _, err = fsMeta.WriteTo(wlk); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - } - - // Stat the file to fetch timestamp, size. 
- fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object)) - if err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - // Success. - return fsMeta.ToObjectInfo(bucket, object, fi), nil -} - -// Implements S3 compatible initiate multipart API. Operation here is identical -// to fs backend implementation - with the exception that cache FS uses the uploadID -// generated on the backend -func (cfs *cacheFSObjects) NewMultipartUpload(ctx context.Context, bucket, object string, uploadID string, opts ObjectOptions) (string, error) { - if cfs.diskUsageHigh() { - select { - case cfs.purgeChan <- struct{}{}: - default: - } - return "", errDiskFull - } - - if _, err := cfs.GetBucketInfo(ctx, bucket); err != nil { - pErr := cfs.MakeBucketWithLocation(ctx, bucket, "") - if pErr != nil { - return "", pErr - } - } - fs := cfs.FSObjects - if err := checkNewMultipartArgs(ctx, bucket, object, fs); err != nil { - return "", toObjectErr(err, bucket) - } - - if _, err := fs.statBucketDir(ctx, bucket); err != nil { - return "", toObjectErr(err, bucket) - } - - uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) - - err := mkdirAll(uploadIDDir, 0755) - if err != nil { - logger.LogIf(ctx, err) - return "", err - } - - // Initialize fs.json values. - fsMeta := newFSMetaV1() - fsMeta.Meta = opts.UserDefined - - fsMetaBytes, err := json.Marshal(fsMeta) - if err != nil { - logger.LogIf(ctx, err) - return "", err - } - - if err = ioutil.WriteFile(pathJoin(uploadIDDir, fs.metaJSONFile), fsMetaBytes, 0644); err != nil { - logger.LogIf(ctx, err) - return "", err - } - return uploadID, nil -} - -// moveBucketToTrash clears cacheFSObjects of bucket contents and moves it to trash folder. -func (cfs *cacheFSObjects) moveBucketToTrash(ctx context.Context, bucket string) (err error) { - fs := cfs.FSObjects - bucketLock := fs.nsMutex.NewNSLock(ctx, bucket, "") - if err = bucketLock.GetLock(globalObjectTimeout); err != nil { - return err - } - defer bucketLock.Unlock() - bucketDir, err := fs.getBucketDir(ctx, bucket) - if err != nil { - return toObjectErr(err, bucket) - } - trashPath := pathJoin(cfs.fsPath, minioMetaBucket, cacheTrashDir) - expiredDir := path.Join(trashPath, bucket) - // Attempt to move regular bucket to expired directory. - if err = fsRenameDir(bucketDir, expiredDir); err != nil { - logger.LogIf(ctx, err) - return toObjectErr(err, bucket) - } - // Cleanup all the bucket metadata. - ominioMetadataBucketDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket) - nminioMetadataBucketDir := pathJoin(trashPath, MustGetUUID()) - logger.LogIf(ctx, fsRenameDir(ominioMetadataBucketDir, nminioMetadataBucketDir)) - return nil -} - -// Removes a directory only if its empty, handles long -// paths for windows automatically. -func fsRenameDir(dirPath, newPath string) (err error) { - if dirPath == "" || newPath == "" { - return errInvalidArgument - } - - if err = checkPathLength(dirPath); err != nil { - return err - } - if err = checkPathLength(newPath); err != nil { - return err - } - if err = os.Rename(dirPath, newPath); err != nil { - if os.IsNotExist(err) { - return errVolumeNotFound - } else if isSysErrNotEmpty(err) { - return errVolumeNotEmpty - } - return err - } - return nil -} diff --git a/cmd/disk-cache-utils.go b/cmd/disk-cache-utils.go new file mode 100644 index 000000000..30481be8d --- /dev/null +++ b/cmd/disk-cache-utils.go @@ -0,0 +1,170 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "io" + "os" + "strconv" + "strings" + "time" + + "github.com/minio/minio/cmd/crypto" +) + +type cacheControl struct { + expiry time.Time + maxAge int + sMaxAge int + minFresh int + maxStale int +} + +func (c cacheControl) isEmpty() bool { + return c == cacheControl{} +} + +func (c cacheControl) isStale(modTime time.Time) bool { + if c.isEmpty() { + return false + } + now := time.Now() + + if c.sMaxAge > 0 && c.sMaxAge < int(now.Sub(modTime).Seconds()) { + return true + } + if c.maxAge > 0 && c.maxAge < int(now.Sub(modTime).Seconds()) { + return true + } + + if !c.expiry.Equal(time.Time{}) && now.After(c.expiry.Add(time.Duration(c.maxStale)*time.Second)) { + return true + } + + if c.minFresh > 0 && c.minFresh <= int(now.Sub(modTime).Seconds()) { + return true + } + + return false +} + +// returns struct with cache-control settings from user metadata. +func cacheControlOpts(o ObjectInfo) (c cacheControl) { + m := o.UserDefined + if o.Expires != timeSentinel { + c.expiry = o.Expires + } + + var headerVal string + for k, v := range m { + if strings.ToLower(k) == "cache-control" { + headerVal = v + } + } + if headerVal == "" { + return + } + headerVal = strings.ToLower(headerVal) + headerVal = strings.TrimSpace(headerVal) + + vals := strings.Split(headerVal, ",") + for _, val := range vals { + val = strings.TrimSpace(val) + p := strings.Split(val, "=") + + if len(p) != 2 { + continue + } + if p[0] == "max-age" || + p[0] == "s-maxage" || + p[0] == "min-fresh" || + p[0] == "max-stale" { + i, err := strconv.Atoi(p[1]) + if err != nil { + return cacheControl{} + } + if p[0] == "max-age" { + c.maxAge = i + } + if p[0] == "s-maxage" { + c.sMaxAge = i + } + if p[0] == "min-fresh" { + c.minFresh = i + } + if p[0] == "max-stale" { + c.maxStale = i + } + } + } + return c +} + +// backendDownError returns true if err is due to backend failure, or a faulty disk when in server mode +func backendDownError(err error) bool { + _, backendDown := err.(BackendDown) + return backendDown || IsErr(err, baseErrs...) +} + +// IsCacheable returns if the object should be saved in the cache. +func (o ObjectInfo) IsCacheable() bool { + return !crypto.IsEncrypted(o.UserDefined) +} + +// reads file cached on disk from offset up to length +func readCacheFileStream(filePath string, offset, length int64) (io.ReadCloser, error) { + if filePath == "" || offset < 0 { + return nil, errInvalidArgument + } + if err := checkPathLength(filePath); err != nil { + return nil, err + } + + fr, err := os.Open(filePath) + if err != nil { + return nil, osErrToFSFileErr(err) + } + // Stat to get the size of the file at path. + st, err := fr.Stat() + if err != nil { + err = osErrToFSFileErr(err) + return nil, err + } + + // Verify it's a regular file, since Seek on anything else is undefined. 
+ if !st.Mode().IsRegular() { + return nil, errIsNotRegular + } + + if err = os.Chtimes(filePath, time.Now(), st.ModTime()); err != nil { + return nil, err + } + + // Seek to the requested offset. + if offset > 0 { + _, err = fr.Seek(offset, io.SeekStart) + if err != nil { + return nil, err + } + } + return struct { + io.Reader + io.Closer + }{Reader: io.LimitReader(fr, length), Closer: fr}, nil +} diff --git a/cmd/disk-cache-utils_test.go b/cmd/disk-cache-utils_test.go new file mode 100644 index 000000000..27ec3cf71 --- /dev/null +++ b/cmd/disk-cache-utils_test.go @@ -0,0 +1,60 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "net/http" + "reflect" + "testing" + "time" +) + +func TestGetCacheControlOpts(t *testing.T) { + expiry, _ := time.Parse(http.TimeFormat, "Wed, 21 Oct 2015 07:28:00 GMT") + + testCases := []struct { + cacheControlHeaderVal string + expiryHeaderVal time.Time + expectedCacheControl cacheControl + expectedErr bool + }{ + {"", timeSentinel, cacheControl{}, false}, + {"max-age=2592000, public", timeSentinel, cacheControl{maxAge: 2592000, sMaxAge: 0, minFresh: 0, expiry: time.Time{}}, false}, + {"max-age=2592000, no-store", timeSentinel, cacheControl{maxAge: 2592000, sMaxAge: 0, minFresh: 0, expiry: time.Time{}}, false}, + {"must-revalidate, max-age=600", timeSentinel, cacheControl{maxAge: 600, sMaxAge: 0, minFresh: 0, expiry: time.Time{}}, false}, + {"s-maxAge=2500, max-age=600", timeSentinel, cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}}, false}, + {"s-maxAge=2500, max-age=600", expiry, cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Date(2015, time.October, 21, 07, 28, 00, 00, time.UTC)}, false}, + {"s-maxAge=2500, max-age=600s", timeSentinel, cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}}, true}, + } + var m map[string]string + + for i, testCase := range testCases { + m = make(map[string]string) + m["cache-control"] = testCase.cacheControlHeaderVal + if testCase.expiryHeaderVal != timeSentinel { + m["expires"] = testCase.expiryHeaderVal.String() + } + c := cacheControlOpts(ObjectInfo{UserDefined: m, Expires: testCase.expiryHeaderVal}) + if testCase.expectedErr && (c != cacheControl{}) { + t.Errorf("expected err for case %d", i) + } + if !testCase.expectedErr && !reflect.DeepEqual(c, testCase.expectedCacheControl) { + t.Errorf("expected %v got %v for case %d", testCase.expectedCacheControl, c, i) + } + + } +} diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index 679f87b01..2b226fc5a 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -1,19 +1,3 @@ -/* - * MinIO Cloud Storage, (C) 2018 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package cmd import ( @@ -24,56 +8,19 @@ import ( "io/ioutil" "net/http" "os" - "sort" - "strconv" "strings" + "sync" "time" "github.com/djherbis/atime" - - "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/pkg/hash" "github.com/minio/minio/pkg/wildcard" ) const ( - // disk cache needs to have object size space free for a cache entry to be created. - cacheTrashDir = "trash" - cacheCleanupInterval = 10 // in minutes + cacheBlkSize = int64(1 * 1024 * 1024) ) -// abstract slice of cache drives backed by FS. -type diskCache struct { - cfs []*cacheFSObjects -} - -// Abstracts disk caching - used by the S3 layer -type cacheObjects struct { - // pointer to disk cache - cache *diskCache - // ListObjects pool management. - listPool *TreeWalkPool - // file path patterns to exclude from cache - exclude []string - // Object functions pointing to the corresponding functions of backend implementation. - GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) - GetObjectFn func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) - GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) - PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) - DeleteObjectFn func(ctx context.Context, bucket, object string) error - DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error) - ListObjectsFn func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) - ListObjectsV2Fn func(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) - ListBucketsFn func(ctx context.Context) (buckets []BucketInfo, err error) - GetBucketInfoFn func(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) - NewMultipartUploadFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) - PutObjectPartFn func(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) - AbortMultipartUploadFn func(ctx context.Context, bucket, object, uploadID string) error - CompleteMultipartUploadFn func(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) - DeleteBucketFn func(ctx context.Context, bucket string) error -} - // CacheStorageInfo - represents total, free capacity of // underlying cache storage. type CacheStorageInfo struct { @@ -83,119 +30,138 @@ type CacheStorageInfo struct { // CacheObjectLayer implements primitives for cache object API layer. type CacheObjectLayer interface { - // Bucket operations. 
- ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) - ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) - GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) - ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) - DeleteBucket(ctx context.Context, bucket string) error // Object operations. GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) - GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) - PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) DeleteObject(ctx context.Context, bucket, object string) error DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) - - // Multipart operations. - NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) - PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) - AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error - CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) - // Storage operations. StorageInfo(ctx context.Context) CacheStorageInfo } -// IsCacheable returns if the object should be saved in the cache. -func (o ObjectInfo) IsCacheable() bool { - return !crypto.IsEncrypted(o.UserDefined) +// Abstracts disk caching - used by the S3 layer +type cacheObjects struct { + // slice of cache drives + cache []*diskCache + // file path patterns to exclude from cache + exclude []string + // to manage cache namespace locks + nsMutex *nsLockMap + + // if true migration is in progress from v1 to v2 + migrating bool + // mutex to protect migration bool + migMutex sync.Mutex + + // Object functions pointing to the corresponding functions of backend implementation. + GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) + GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) + DeleteObjectFn func(ctx context.Context, bucket, object string) error + DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error) } -// backendDownError returns true if err is due to backend failure or faulty disk if in server mode -func backendDownError(err error) bool { - _, backendDown := err.(BackendDown) - return backendDown || IsErr(err, baseErrs...) +func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, object string) (err error) { + cLock := c.nsMutex.NewNSLock(ctx, bucket, object) + if err := cLock.GetLock(globalObjectTimeout); err != nil { + return err + } + defer cLock.Unlock() + return dcache.Delete(ctx, bucket, object) } -// get cache disk where object is currently cached for a GET operation. 
If object does not exist at that location, -// treat the list of cache drives as a circular buffer and walk through them starting at hash index -// until an online drive is found.If object is not found, fall back to the first online cache drive -// closest to the hash index, so that object can be recached. -func (c diskCache) getCachedFSLoc(ctx context.Context, bucket, object string) (*cacheFSObjects, error) { - index := c.hashIndex(bucket, object) - numDisks := len(c.cfs) - // save first online cache disk closest to the hint index - var firstOnlineDisk *cacheFSObjects - for k := 0; k < numDisks; k++ { - i := (index + k) % numDisks - if c.cfs[i] == nil { - continue - } - if c.cfs[i].IsOnline() { - if firstOnlineDisk == nil { - firstOnlineDisk = c.cfs[i] - } - if c.cfs[i].Exists(ctx, bucket, object) { - return c.cfs[i], nil - } - } +func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error { + cLock := c.nsMutex.NewNSLock(ctx, bucket, object) + if err := cLock.GetLock(globalObjectTimeout); err != nil { + return err + } + defer cLock.Unlock() + return dcache.Put(ctx, bucket, object, data, size, opts) +} + +func (c *cacheObjects) get(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) { + cLock := c.nsMutex.NewNSLock(ctx, bucket, object) + if err := cLock.GetRLock(globalObjectTimeout); err != nil { + return nil, err } - if firstOnlineDisk != nil { - return firstOnlineDisk, nil - } - return nil, errDiskNotFound + defer cLock.RUnlock() + return dcache.Get(ctx, bucket, object, rs, h, opts) } -// choose a cache deterministically based on hash of bucket,object. The hash index is treated as -// a hint. In the event that the cache drive at hash index is offline, treat the list of cache drives -// as a circular buffer and walk through them starting at hash index until an online drive is found. 
-func (c diskCache) getCacheFS(ctx context.Context, bucket, object string) (*cacheFSObjects, error) { - index := c.hashIndex(bucket, object) - numDisks := len(c.cfs) - for k := 0; k < numDisks; k++ { - i := (index + k) % numDisks - if c.cfs[i] == nil { - continue - } - if c.cfs[i].IsOnline() { - return c.cfs[i], nil - } +func (c *cacheObjects) stat(ctx context.Context, dcache *diskCache, bucket, object string) (oi ObjectInfo, err error) { + cLock := c.nsMutex.NewNSLock(ctx, bucket, object) + if err := cLock.GetRLock(globalObjectTimeout); err != nil { + return oi, err } - return nil, errDiskNotFound + + defer cLock.RUnlock() + return dcache.Stat(ctx, bucket, object) } -// Compute a unique hash sum for bucket and object -func (c diskCache) hashIndex(bucket, object string) int { - return crcHashMod(pathJoin(bucket, object), len(c.cfs)) +// DeleteObject clears cache entry if backend delete operation succeeds +func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) { + if err = c.DeleteObjectFn(ctx, bucket, object); err != nil { + return + } + if c.isCacheExclude(bucket, object) || c.skipCache() { + return + } + + dcache, cerr := c.getCacheLoc(ctx, bucket, object) + if cerr != nil { + return + } + if dcache.Exists(ctx, bucket, object) { + c.delete(ctx, dcache, bucket, object) + } + return +} + +// DeleteObjects batch deletes objects in slice, and clears any cached entries +func (c *cacheObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { + errs := make([]error, len(objects)) + for idx, object := range objects { + errs[idx] = c.DeleteObject(ctx, bucket, object) + } + return errs, nil } // construct a metadata k-v map -func (c cacheObjects) getMetadata(objInfo ObjectInfo) map[string]string { +func getMetadata(objInfo ObjectInfo) map[string]string { metadata := make(map[string]string) metadata["etag"] = objInfo.ETag metadata["content-type"] = objInfo.ContentType - metadata["content-encoding"] = objInfo.ContentEncoding - - for key, val := range objInfo.UserDefined { - metadata[key] = val + if objInfo.ContentEncoding != "" { + metadata["content-encoding"] = objInfo.ContentEncoding + } + if objInfo.Expires != timeSentinel { + metadata["expires"] = objInfo.Expires.Format(http.TimeFormat) + } + for k, v := range objInfo.UserDefined { + metadata[k] = v } return metadata } -func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { - if c.isCacheExclude(bucket, object) { +func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { + if c.isCacheExclude(bucket, object) || c.skipCache() { return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) } + var cc cacheControl - // fetch cacheFSObjects if object is currently cached or nearest available cache drive - dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object) + // fetch diskCache if object is currently cached or nearest available cache drive + dcache, err := c.getCacheToLoc(ctx, bucket, object) if err != nil { return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) } - cacheReader, cacheErr := dcache.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) + cacheReader, cacheErr := c.get(ctx, dcache, bucket, object, rs, h, opts) + if cacheErr == nil { + cc = 
cacheControlOpts(cacheReader.ObjInfo) + if !cc.isEmpty() && !cc.isStale(cacheReader.ObjInfo.ModTime) { + return cacheReader, nil + } + } objInfo, err := c.GetObjectInfoFn(ctx, bucket, object, opts) if backendDownError(err) && cacheErr == nil { @@ -212,660 +178,128 @@ func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, return nil, err } - if !objInfo.IsCacheable() || filterFromCache(objInfo.UserDefined) { + if !objInfo.IsCacheable() { return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) } if cacheErr == nil { - if cacheReader.ObjInfo.ETag == objInfo.ETag && !isStaleCache(objInfo) { - // Object is not stale, so serve from cache + // if ETag matches for stale cache entry, serve from cache + if cacheReader.ObjInfo.ETag == objInfo.ETag { + // Update metadata in case server-side copy might have changed object metadata + dcache.updateMetadataIfChanged(ctx, bucket, object, objInfo, cacheReader.ObjInfo) return cacheReader, nil } cacheReader.Close() // Object is stale, so delete from cache - dcache.Delete(ctx, bucket, object) + c.delete(ctx, dcache, bucket, object) } // Since we got here, we are serving the request from backend, // and also adding the object to the cache. - - if rs != nil { - // We don't cache partial objects. - return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) + if !dcache.diskUsageLow() { + select { + case dcache.purgeChan <- struct{}{}: + default: + } } if !dcache.diskAvailable(objInfo.Size) { return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) } + if rs != nil { + go func() { + // fill cache in the background for range GET requests + bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, nil, h, lockType, opts) + if bErr != nil { + return + } + defer bReader.Close() + oi, err := c.stat(ctx, dcache, bucket, object) + // avoid cache overwrite if another background routine filled cache + if err != nil || oi.ETag != bReader.ObjInfo.ETag { + c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}) + } + }() + return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) + } + bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) if bkErr != nil { return nil, bkErr } - // Initialize pipe. pipeReader, pipeWriter := io.Pipe() teeReader := io.TeeReader(bkReader, pipeWriter) - hashReader, herr := hash.NewReader(pipeReader, bkReader.ObjInfo.Size, "", "", bkReader.ObjInfo.Size, globalCLIContext.StrictS3Compat) - if herr != nil { - bkReader.Close() - return nil, herr - } - go func() { - putErr := dcache.Put(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: c.getMetadata(bkReader.ObjInfo)}) + putErr := dcache.Put(ctx, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)}) // close the write end of the pipe, so the error gets // propagated to getObjReader pipeWriter.CloseWithError(putErr) }() - cleanupBackend := func() { bkReader.Close() } cleanupPipe := func() { pipeReader.Close() } return NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, opts.CheckCopyPrecondFn, cleanupBackend, cleanupPipe) } -// Uses cached-object to serve the request. If object is not cached it serves the request from the backend and also -// stores it in the cache for serving subsequent requests. 
-func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) { - GetObjectFn := c.GetObjectFn - GetObjectInfoFn := c.GetObjectInfoFn - - if c.isCacheExclude(bucket, object) { - return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts) - } - // fetch cacheFSObjects if object is currently cached or nearest available cache drive - dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object) - if err != nil { - return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts) - } - // stat object on backend - objInfo, err := GetObjectInfoFn(ctx, bucket, object, opts) - backendDown := backendDownError(err) - if err != nil && !backendDown { - if _, ok := err.(ObjectNotFound); ok { - // Delete the cached entry if backend object was deleted. - dcache.Delete(ctx, bucket, object) - } - return err - } - - if !backendDown && !objInfo.IsCacheable() { - return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts) - } - - if !backendDown && filterFromCache(objInfo.UserDefined) { - return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts) - } - - cachedObjInfo, err := dcache.GetObjectInfo(ctx, bucket, object, opts) - if err == nil { - if backendDown { - // If the backend is down, serve the request from cache. - return dcache.Get(ctx, bucket, object, startOffset, length, writer, etag, opts) - } - if cachedObjInfo.ETag == objInfo.ETag && !isStaleCache(objInfo) { - return dcache.Get(ctx, bucket, object, startOffset, length, writer, etag, opts) - } - dcache.Delete(ctx, bucket, object) - } - if startOffset != 0 || (length > 0 && length != objInfo.Size) { - // We don't cache partial objects. - return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts) - } - if !dcache.diskAvailable(objInfo.Size) { - return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts) - } - // Initialize pipe. - pipeReader, pipeWriter := io.Pipe() - hashReader, err := hash.NewReader(pipeReader, objInfo.Size, "", "", objInfo.Size, globalCLIContext.StrictS3Compat) - if err != nil { - return err - } - go func() { - gerr := GetObjectFn(ctx, bucket, object, 0, objInfo.Size, io.MultiWriter(writer, pipeWriter), etag, opts) - pipeWriter.CloseWithError(gerr) // Close writer explicitly signaling we wrote all data. - }() - - opts.UserDefined = c.getMetadata(objInfo) - err = dcache.Put(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), opts) - if err != nil { - return err - } - pipeReader.Close() - return -} - // Returns ObjectInfo from cache if available. 
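GetObjectInfo below first consults the cached metadata: when a valid cache-control directive is present and the entry is still fresh, the backend HEAD (and its ETag comparison) is skipped entirely. cacheControlOpts and isStale are defined elsewhere in this patch, so the sketch below is a simplified illustration of the freshness test under those assumptions, not the PR's implementation. (For comparison, the removed isStaleCache near the end of this file appears to compare max-age in the wrong direction, reporting entries as stale while still inside their max-age window.)

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// freshFor extracts an effective freshness lifetime from a Cache-Control
// header value; s-maxage takes precedence over max-age for shared caches.
func freshFor(cacheControl string) (time.Duration, bool) {
	var maxAge, sMaxAge int
	for _, d := range strings.Split(cacheControl, ",") {
		d = strings.TrimSpace(strings.ToLower(d))
		if v, ok := strings.CutPrefix(d, "s-maxage="); ok {
			sMaxAge, _ = strconv.Atoi(v)
		} else if v, ok := strings.CutPrefix(d, "max-age="); ok {
			maxAge, _ = strconv.Atoi(v)
		}
	}
	switch {
	case sMaxAge > 0:
		return time.Duration(sMaxAge) * time.Second, true
	case maxAge > 0:
		return time.Duration(maxAge) * time.Second, true
	}
	return 0, false
}

// isStale reports whether a cached entry written at modTime has outlived
// its freshness lifetime; stale entries force an ETag check on the backend.
func isStale(cacheControl string, modTime time.Time) bool {
	ttl, ok := freshFor(cacheControl)
	if !ok {
		return true // no usable directive: always revalidate
	}
	return time.Since(modTime) > ttl
}

func main() {
	mod := time.Now().Add(-10 * time.Minute)
	fmt.Println(isStale("public, max-age=3600", mod)) // false: still fresh
	fmt.Println(isStale("max-age=60", mod))           // true: revalidate
}

Giving s-maxage precedence over max-age follows the usual HTTP semantics for shared caches, which is what a gateway-side disk cache is.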
-func (c cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { +func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { getObjectInfoFn := c.GetObjectInfoFn - if c.isCacheExclude(bucket, object) { + + if c.isCacheExclude(bucket, object) || c.skipCache() { return getObjectInfoFn(ctx, bucket, object, opts) } - // fetch cacheFSObjects if object is currently cached or nearest available cache drive - dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object) + + // fetch diskCache if object is currently cached or nearest available cache drive + dcache, err := c.getCacheToLoc(ctx, bucket, object) if err != nil { return getObjectInfoFn(ctx, bucket, object, opts) } + var cc cacheControl + // if cache control setting is valid, avoid HEAD operation to backend + cachedObjInfo, cerr := c.stat(ctx, dcache, bucket, object) + if cerr == nil { + cc = cacheControlOpts(cachedObjInfo) + if !cc.isEmpty() && !cc.isStale(cachedObjInfo.ModTime) { + return cachedObjInfo, nil + } + } + objInfo, err := getObjectInfoFn(ctx, bucket, object, opts) if err != nil { if _, ok := err.(ObjectNotFound); ok { // Delete the cached entry if backend object was deleted. - dcache.Delete(ctx, bucket, object) + c.delete(ctx, dcache, bucket, object) return ObjectInfo{}, err } if !backendDownError(err) { return ObjectInfo{}, err } - // when backend is down, serve from cache. - cachedObjInfo, cerr := dcache.GetObjectInfo(ctx, bucket, object, opts) if cerr == nil { return cachedObjInfo, nil } return ObjectInfo{}, BackendDown{} } + // when backend is up, do a sanity check on cached object - cachedObjInfo, err := dcache.GetObjectInfo(ctx, bucket, object, opts) - if err != nil { + if cerr != nil { return objInfo, nil } if cachedObjInfo.ETag != objInfo.ETag { // Delete the cached entry if the backend object was replaced. - dcache.Delete(ctx, bucket, object) + c.delete(ctx, dcache, bucket, object) } return objInfo, nil } -// Returns function "listDir" of the type listDirFunc. -// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. -// disks - list of fsObjects -func listDirCacheFactory(isLeaf func(string, string) bool, disks []*cacheFSObjects) ListDirFunc { - listCacheDirs := func(bucket, prefixDir, prefixEntry string) (dirs []string) { - var entries []string - for _, disk := range disks { - // ignore disk-caches that might be missing/offline - if disk == nil { - continue - } - - fs := disk.FSObjects - var err error - entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir)) - if err != nil { - continue - } - - for i := range entries { - if isLeaf(bucket, entries[i]) { - entries[i] = strings.TrimSuffix(entries[i], SlashSeparator) - } - } - - // Filter entries that have the prefix prefixEntry. - entries = filterMatchingPrefix(entries, prefixEntry) - dirs = append(dirs, entries...) - } - return dirs - } - - // listDir - lists all the entries at a given prefix and given entry in the prefix. - listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string) { - cacheEntries := listCacheDirs(bucket, prefixDir, prefixEntry) - for _, entry := range cacheEntries { - // Find elements in entries which are not in mergedEntries - idx := sort.SearchStrings(mergedEntries, entry) - // if entry is already present in mergedEntries don't add. 
- if idx < len(mergedEntries) && mergedEntries[idx] == entry { - continue - } - mergedEntries = append(mergedEntries, entry) - sort.Strings(mergedEntries) - } - return mergedEntries - } - return listDir -} - -// List all objects at prefix upto maxKeys, optionally delimited by '/' from the cache. Maintains the list pool -// state for future re-entrant list requests. -func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) { - var objInfos []ObjectInfo - var eof bool - var nextMarker string - - recursive := true - if delimiter == SlashSeparator { - recursive = false - } - walkResultCh, endWalkCh := c.listPool.Release(listParams{bucket, recursive, marker, prefix, false}) - if walkResultCh == nil { - endWalkCh = make(chan struct{}) - - listDir := listDirCacheFactory(func(bucket, object string) bool { - fs, err := c.cache.getCacheFS(ctx, bucket, object) - if err != nil { - return false - } - _, err = fs.getObjectInfo(ctx, bucket, object) - return err == nil - }, c.cache.cfs) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, endWalkCh) - } - - for i := 0; i < maxKeys; { - walkResult, ok := <-walkResultCh - if !ok { - // Closed channel. - eof = true - break - } - - entry := walkResult.entry - var objInfo ObjectInfo - if hasSuffix(entry, SlashSeparator) { - // Object name needs to be full path. - objInfo.Bucket = bucket - objInfo.Name = entry - objInfo.IsDir = true - } else { - // Set the Mode to a "regular" file. - var err error - fs, err := c.cache.getCacheFS(ctx, bucket, entry) - if err != nil { - // Ignore errDiskNotFound - if err == errDiskNotFound { - continue - } - return result, toObjectErr(err, bucket, prefix) - } - objInfo, err = fs.getObjectInfo(ctx, bucket, entry) - if err != nil { - // Ignore ObjectNotFound error - if _, ok := err.(ObjectNotFound); ok { - continue - } - return result, toObjectErr(err, bucket, prefix) - } - } - nextMarker = objInfo.Name - objInfos = append(objInfos, objInfo) - i++ - if walkResult.end { - eof = true - break - } - } - - params := listParams{bucket, recursive, nextMarker, prefix, false} - if !eof { - c.listPool.Set(params, walkResultCh, endWalkCh) - } - - result = ListObjectsInfo{IsTruncated: !eof} - for _, objInfo := range objInfos { - result.NextMarker = objInfo.Name - if objInfo.IsDir && delimiter == SlashSeparator { - result.Prefixes = append(result.Prefixes, objInfo.Name) - continue - } - result.Objects = append(result.Objects, objInfo) - } - return result, nil -} - -// listCacheV2Objects lists all blobs in bucket filtered by prefix from the cache -func (c cacheObjects) listCacheV2Objects(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) { - loi, err := c.listCacheObjects(ctx, bucket, prefix, continuationToken, delimiter, maxKeys) - if err != nil { - return result, err - } - - listObjectsV2Info := ListObjectsV2Info{ - IsTruncated: loi.IsTruncated, - ContinuationToken: continuationToken, - NextContinuationToken: loi.NextMarker, - Objects: loi.Objects, - Prefixes: loi.Prefixes, - } - return listObjectsV2Info, err -} - -// List all objects at prefix upto maxKeys., optionally delimited by '/'. Maintains the list pool -// state for future re-entrant list requests. 
Retrieve from cache if backend is down -func (c cacheObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) { - - listObjectsFn := c.ListObjectsFn - - result, err = listObjectsFn(ctx, bucket, prefix, marker, delimiter, maxKeys) - if err != nil { - if backendDownError(err) { - return c.listCacheObjects(ctx, bucket, prefix, marker, delimiter, maxKeys) - } - return - } - return -} - -// ListObjectsV2 lists all blobs in bucket filtered by prefix -func (c cacheObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) { - listObjectsV2Fn := c.ListObjectsV2Fn - - result, err = listObjectsV2Fn(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, fetchOwner, startAfter) - if err != nil { - if backendDownError(err) { - return c.listCacheV2Objects(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, fetchOwner, startAfter) - } - return - } - return -} - -// Lists all the buckets in the cache -func (c cacheObjects) listBuckets(ctx context.Context) (buckets []BucketInfo, err error) { - m := make(map[string]string) - for _, cache := range c.cache.cfs { - // ignore disk-caches that might be missing/offline +// StorageInfo - returns underlying storage statistics. +func (c *cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo) { + var total, free uint64 + for _, cache := range c.cache { if cache == nil { continue } - entries, err := cache.ListBuckets(ctx) - - if err != nil { - return nil, err - } - for _, entry := range entries { - _, ok := m[entry.Name] - if !ok { - m[entry.Name] = entry.Name - buckets = append(buckets, entry) - } - } - } - // Sort bucket infos by bucket name. - sort.Sort(byBucketName(buckets)) - return -} - -// Returns list of buckets from cache or the backend. If the backend is down, buckets -// available on cache are served. -func (c cacheObjects) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) { - listBucketsFn := c.ListBucketsFn - buckets, err = listBucketsFn(ctx) - if err != nil { - if backendDownError(err) { - return c.listBuckets(ctx) - } - return []BucketInfo{}, err - } - return -} - -// Returns bucket info from cache if backend is down. 
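The new StorageInfo, whose first hunk appears above, no longer walks per-drive cacheFSObjects; it simply sums raw disk capacity across the configured cache directories and skips drives that are missing. A stand-alone sketch of that aggregation (diskUsage here is a hypothetical stand-in for the getDiskInfo helper used in the patch):

package main

import "fmt"

// diskInfo is a minimal stand-in for the disk info result used by getDiskInfo.
type diskInfo struct {
	Total, Free uint64
}

// diskUsage is a placeholder; the PR resolves this per cache.dir via getDiskInfo.
func diskUsage(dir string) (diskInfo, error) {
	fake := map[string]diskInfo{
		"/mnt/cache1": {Total: 500 << 30, Free: 120 << 30},
		"/mnt/cache2": {Total: 500 << 30, Free: 300 << 30},
	}
	return fake[dir], nil
}

func main() {
	var total, free uint64
	for _, dir := range []string{"/mnt/cache1", "/mnt/cache2"} {
		info, err := diskUsage(dir)
		if err != nil {
			continue // skip offline drives, keep aggregating the rest
		}
		total += info.Total
		free += info.Free
	}
	fmt.Printf("cache storage: %d GiB total, %d GiB free\n", total>>30, free>>30)
}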
-func (c cacheObjects) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) { - getBucketInfoFn := c.GetBucketInfoFn - bucketInfo, err = getBucketInfoFn(ctx, bucket) - if backendDownError(err) { - for _, cache := range c.cache.cfs { - // ignore disk-caches that might be missing/offline - if cache == nil { - continue - } - if bucketInfo, err = cache.GetBucketInfo(ctx, bucket); err == nil { - return - } - } - } - return -} - -// Delete Object deletes from cache as well if backend operation succeeds -func (c cacheObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) { - if err = c.DeleteObjectFn(ctx, bucket, object); err != nil { - return - } - if c.isCacheExclude(bucket, object) { - return - } - dcache, cerr := c.cache.getCachedFSLoc(ctx, bucket, object) - if cerr == nil { - _ = dcache.DeleteObject(ctx, bucket, object) - } - return -} - -func (c cacheObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { - errs := make([]error, len(objects)) - for idx, object := range objects { - errs[idx] = c.DeleteObject(ctx, bucket, object) - } - return errs, nil -} - -// Returns true if object should be excluded from cache -func (c cacheObjects) isCacheExclude(bucket, object string) bool { - for _, pattern := range c.exclude { - matchStr := fmt.Sprintf("%s/%s", bucket, object) - if ok := wildcard.MatchSimple(pattern, matchStr); ok { - return true - } - } - return false -} - -// PutObject - caches the uploaded object for single Put operations -func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { - putObjectFn := c.PutObjectFn - data := r.Reader - dcache, err := c.cache.getCacheFS(ctx, bucket, object) - if err != nil { - // disk cache could not be located,execute backend call. 
- return putObjectFn(ctx, bucket, object, r, opts) - } - size := r.Size() - - // fetch from backend if there is no space on cache drive - if !dcache.diskAvailable(size) { - return putObjectFn(ctx, bucket, object, r, opts) - } - // fetch from backend if cache exclude pattern or cache-control - // directive set to exclude - if c.isCacheExclude(bucket, object) || filterFromCache(opts.UserDefined) { - dcache.Delete(ctx, bucket, object) - return putObjectFn(ctx, bucket, object, r, opts) - } - objInfo = ObjectInfo{} - // Initialize pipe to stream data to backend - pipeReader, pipeWriter := io.Pipe() - hashReader, err := hash.NewReader(pipeReader, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize(), globalCLIContext.StrictS3Compat) - if err != nil { - return ObjectInfo{}, err - } - // Initialize pipe to stream data to cache - rPipe, wPipe := io.Pipe() - cHashReader, err := hash.NewReader(rPipe, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize(), globalCLIContext.StrictS3Compat) - if err != nil { - return ObjectInfo{}, err - } - oinfoCh := make(chan ObjectInfo) - errCh := make(chan error) - go func() { - oinfo, perr := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), opts) - if perr != nil { - pipeWriter.CloseWithError(perr) - wPipe.CloseWithError(perr) - close(oinfoCh) - errCh <- perr - return - } - close(errCh) - oinfoCh <- oinfo - }() - - go func() { - if err = dcache.Put(ctx, bucket, object, NewPutObjReader(cHashReader, nil, nil), opts); err != nil { - wPipe.CloseWithError(err) - return - } - }() - - mwriter := io.MultiWriter(pipeWriter, wPipe) - _, err = io.Copy(mwriter, data) - if err != nil { - err = <-errCh - return objInfo, err - } - pipeWriter.Close() - wPipe.Close() - objInfo = <-oinfoCh - return objInfo, err -} - -// NewMultipartUpload - Starts a new multipart upload operation to backend and cache. -func (c cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) { - newMultipartUploadFn := c.NewMultipartUploadFn - - if c.isCacheExclude(bucket, object) || filterFromCache(opts.UserDefined) { - return newMultipartUploadFn(ctx, bucket, object, opts) - } - - dcache, err := c.cache.getCacheFS(ctx, bucket, object) - if err != nil { - // disk cache could not be located,execute backend call. - return newMultipartUploadFn(ctx, bucket, object, opts) - } - - uploadID, err = newMultipartUploadFn(ctx, bucket, object, opts) - if err != nil { - return - } - // create new multipart upload in cache with same uploadID - dcache.NewMultipartUpload(ctx, bucket, object, uploadID, opts) - return uploadID, err -} - -// PutObjectPart - uploads part to backend and cache simultaneously. -func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (info PartInfo, err error) { - data := r.Reader - putObjectPartFn := c.PutObjectPartFn - dcache, err := c.cache.getCacheFS(ctx, bucket, object) - if err != nil { - // disk cache could not be located,execute backend call. 
- return putObjectPartFn(ctx, bucket, object, uploadID, partID, r, opts) - } - - if c.isCacheExclude(bucket, object) { - return putObjectPartFn(ctx, bucket, object, uploadID, partID, r, opts) - } - - // make sure cache has at least size space available - size := data.Size() - if !dcache.diskAvailable(size) { - select { - case dcache.purgeChan <- struct{}{}: - default: - } - return putObjectPartFn(ctx, bucket, object, uploadID, partID, r, opts) - } - - info = PartInfo{} - // Initialize pipe to stream data to backend - pipeReader, pipeWriter := io.Pipe() - hashReader, err := hash.NewReader(pipeReader, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize(), globalCLIContext.StrictS3Compat) - if err != nil { - return - } - // Initialize pipe to stream data to cache - rPipe, wPipe := io.Pipe() - cHashReader, err := hash.NewReader(rPipe, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize(), globalCLIContext.StrictS3Compat) - if err != nil { - return - } - pinfoCh := make(chan PartInfo) - errorCh := make(chan error) - go func() { - info, err = putObjectPartFn(ctx, bucket, object, uploadID, partID, NewPutObjReader(hashReader, nil, nil), opts) - if err != nil { - close(pinfoCh) - pipeWriter.CloseWithError(err) - wPipe.CloseWithError(err) - errorCh <- err - return - } - close(errorCh) - pinfoCh <- info - }() - go func() { - if _, perr := dcache.PutObjectPart(ctx, bucket, object, uploadID, partID, NewPutObjReader(cHashReader, nil, nil), opts); perr != nil { - wPipe.CloseWithError(perr) - return - } - }() - - mwriter := io.MultiWriter(pipeWriter, wPipe) - _, err = io.Copy(mwriter, data) - if err != nil { - err = <-errorCh - return PartInfo{}, err - } - pipeWriter.Close() - wPipe.Close() - info = <-pinfoCh - return info, err -} - -// AbortMultipartUpload - aborts multipart upload on backend and cache. -func (c cacheObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error { - abortMultipartUploadFn := c.AbortMultipartUploadFn - - if c.isCacheExclude(bucket, object) { - return abortMultipartUploadFn(ctx, bucket, object, uploadID) - } - - dcache, err := c.cache.getCacheFS(ctx, bucket, object) - if err != nil { - // disk cache could not be located,execute backend call. - return abortMultipartUploadFn(ctx, bucket, object, uploadID) - } - // execute backend operation - err = abortMultipartUploadFn(ctx, bucket, object, uploadID) - if err != nil { - return err - } - // abort multipart upload on cache - dcache.AbortMultipartUpload(ctx, bucket, object, uploadID) - return nil -} - -// CompleteMultipartUpload - completes multipart upload operation on backend and cache. -func (c cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) { - completeMultipartUploadFn := c.CompleteMultipartUploadFn - - if c.isCacheExclude(bucket, object) { - return completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts, opts) - } - - dcache, err := c.cache.getCacheFS(ctx, bucket, object) - if err != nil { - // disk cache could not be located,execute backend call. 
-		return completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts, opts)
-	}
-	// perform backend operation
-	objInfo, err = completeMultipartUploadFn(ctx, bucket, object, uploadID, uploadedParts, opts)
-	if err != nil {
-		return
-	}
-	// create new multipart upload in cache with same uploadID
-	dcache.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
-	return
-}
-
-// StorageInfo - returns underlying storage statistics.
-func (c cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo) {
-	var total, free uint64
-	for _, cfs := range c.cache.cfs {
-		if cfs == nil {
-			continue
-		}
-		info, err := getDiskInfo(cfs.fsPath)
-		logger.GetReqInfo(ctx).AppendTags("cachePath", cfs.fsPath)
+		info, err := getDiskInfo(cache.dir)
+		logger.GetReqInfo(ctx).AppendTags("cachePath", cache.dir)
 		logger.LogIf(ctx, err)
 		total += info.Total
 		free += info.Free
@@ -876,62 +310,113 @@ func (c cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo)
 	}
 }
 
-// DeleteBucket - marks bucket to be deleted from cache if bucket is deleted from backend.
-func (c cacheObjects) DeleteBucket(ctx context.Context, bucket string) (err error) {
-	deleteBucketFn := c.DeleteBucketFn
-	var toDel []*cacheFSObjects
-	for _, cfs := range c.cache.cfs {
-		// ignore disk-caches that might be missing/offline
-		if cfs == nil {
+// skipCache() returns true if cache migration is in progress
+func (c *cacheObjects) skipCache() bool {
+	c.migMutex.Lock()
+	defer c.migMutex.Unlock()
+	return c.migrating
+}
+
+// Returns true if object should be excluded from cache
+func (c *cacheObjects) isCacheExclude(bucket, object string) bool {
+	// exclude directories from cache
+	if strings.HasSuffix(object, SlashSeparator) {
+		return true
+	}
+	for _, pattern := range c.exclude {
+		matchStr := fmt.Sprintf("%s/%s", bucket, object)
+		if ok := wildcard.MatchSimple(pattern, matchStr); ok {
+			return true
+		}
+	}
+	return false
+}
+
+// choose a cache deterministically based on hash of (bucket, object). The hash index is treated as
+// a hint. In the event that the cache drive at hash index is offline, treat the list of cache drives
+// as a circular buffer and walk through them starting at hash index until an online drive is found.
+func (c *cacheObjects) getCacheLoc(ctx context.Context, bucket, object string) (*diskCache, error) {
+	index := c.hashIndex(bucket, object)
+	numDisks := len(c.cache)
+	for k := 0; k < numDisks; k++ {
+		i := (index + k) % numDisks
+		if c.cache[i] == nil {
 			continue
 		}
-		if _, cerr := cfs.GetBucketInfo(ctx, bucket); cerr == nil {
-			toDel = append(toDel, cfs)
+		if c.cache[i].IsOnline() {
+			return c.cache[i], nil
		}
	}
-	// perform backend operation
-	err = deleteBucketFn(ctx, bucket)
-	if err != nil {
-		return
+	return nil, errDiskNotFound
+}
+
+// get cache disk where object is currently cached for a GET operation. If object does not exist at that location,
+// treat the list of cache drives as a circular buffer and walk through them starting at hash index
+// until an online drive is found. If object is not found, fall back to the first online cache drive
+// closest to the hash index, so that object can be re-cached.
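getCacheLoc above, and the getCacheToLoc variant whose doc comment closes this hunk and whose body follows, both treat the cache drive list as a ring anchored at a deterministic hash of bucket and object. A self-contained sketch of that selection, with a CRC-32 hash standing in for the crcHashMod helper referenced by hashIndex below:

package main

import (
	"fmt"
	"hash/crc32"
)

type drive struct {
	dir    string
	online bool
}

// hashIndex maps bucket/object deterministically onto the drive list,
// mirroring crcHashMod(pathJoin(bucket, object), len(drives)).
func hashIndex(bucket, object string, n int) int {
	return int(crc32.ChecksumIEEE([]byte(bucket+"/"+object))) % n
}

// pickDrive walks the ring starting at the hash hint until it finds an
// online drive, the same circular-buffer fallback as getCacheLoc.
func pickDrive(drives []drive, bucket, object string) (*drive, error) {
	idx := hashIndex(bucket, object, len(drives))
	for k := 0; k < len(drives); k++ {
		d := &drives[(idx+k)%len(drives)]
		if d.online {
			return d, nil
		}
	}
	return nil, fmt.Errorf("no cache drive online")
}

func main() {
	drives := []drive{
		{dir: "/mnt/cache1", online: false}, // offline: the hint walks past it
		{dir: "/mnt/cache2", online: true},
		{dir: "/mnt/cache3", online: true},
	}
	d, err := pickDrive(drives, "testbucket", "testobject")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("cache drive:", d.dir)
}

Because the hash only supplies the starting point, an offline drive degrades to its ring neighbor instead of disabling caching for that key.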
+func (c *cacheObjects) getCacheToLoc(ctx context.Context, bucket, object string) (*diskCache, error) { + index := c.hashIndex(bucket, object) + + numDisks := len(c.cache) + // save first online cache disk closest to the hint index + var firstOnlineDisk *diskCache + for k := 0; k < numDisks; k++ { + i := (index + k) % numDisks + if c.cache[i] == nil { + continue + } + if c.cache[i].IsOnline() { + if firstOnlineDisk == nil { + firstOnlineDisk = c.cache[i] + } + if c.cache[i].Exists(ctx, bucket, object) { + return c.cache[i], nil + } + } } - // move bucket metadata and content to cache's trash dir - for _, d := range toDel { - d.moveBucketToTrash(ctx, bucket) + + if firstOnlineDisk != nil { + return firstOnlineDisk, nil } - return + return nil, errDiskNotFound +} + +// Compute a unique hash sum for bucket and object +func (c *cacheObjects) hashIndex(bucket, object string) int { + return crcHashMod(pathJoin(bucket, object), len(c.cache)) } // newCache initializes the cacheFSObjects for the "drives" specified in config.json // or the global env overrides. -func newCache(config CacheConfig) (*diskCache, error) { - var cfsObjects []*cacheFSObjects +func newCache(config CacheConfig) ([]*diskCache, bool, error) { + var caches []*diskCache ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{}) - formats, err := loadAndValidateCacheFormat(ctx, config.Drives) + formats, migrating, err := loadAndValidateCacheFormat(ctx, config.Drives) if err != nil { - return nil, err + return nil, false, err } for i, dir := range config.Drives { - // skip cacheFSObjects creation for cache drives missing a format.json + // skip diskCache creation for cache drives missing a format.json if formats[i] == nil { - cfsObjects = append(cfsObjects, nil) + caches = append(caches, nil) continue } if err := checkAtimeSupport(dir); err != nil { - return nil, errors.New("Atime support required for disk caching") + return nil, false, errors.New("Atime support required for disk caching") } - cache, err := newCacheFSObjects(dir, config.Expiry, config.MaxUse) + + cache, err := newdiskCache(dir, config.Expiry, config.MaxUse) if err != nil { - return nil, err + return nil, false, err + } + // Start the purging go-routine for entries that have expired if no migration in progress + if !migrating { + go cache.purge() } - // Start the purging go-routine for entries that have expired - go cache.purge() - // Start trash purge routine for deleted buckets. - go cache.purgeTrash() - - cfsObjects = append(cfsObjects, cache) + caches = append(caches, cache) } - return &diskCache{cfs: cfsObjects}, nil + return caches, migrating, nil } // Return error if Atime is disabled on the O/S @@ -959,31 +444,65 @@ func checkAtimeSupport(dir string) (err error) { } return } +func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) { + logger.StartupMessage(colorBlue("Cache migration initiated ....")) + var wg = &sync.WaitGroup{} + errs := make([]error, len(c.cache)) + for i, dc := range c.cache { + if dc == nil { + continue + } + wg.Add(1) + // start migration from V1 to V2 + go func(ctx context.Context, dc *diskCache, errs []error, idx int) { + defer wg.Done() + if err := migrateOldCache(ctx, dc); err != nil { + errs[idx] = err + logger.LogIf(ctx, err) + return + } + // start purge routine after migration completes. 
+ go dc.purge() + }(ctx, dc, errs, i) + } + wg.Wait() + + errCnt := 0 + for _, err := range errs { + if err != nil { + errCnt++ + } + } + if errCnt > 0 { + return + } + // update migration status + c.migMutex.Lock() + defer c.migMutex.Unlock() + c.migrating = false + logger.StartupMessage(colorBlue("Cache migration completed successfully.")) +} // Returns cacheObjects for use by Server. -func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) { +func newServerCacheObjects(ctx context.Context, config CacheConfig) (CacheObjectLayer, error) { // list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var. - dcache, err := newCache(config) + cache, migrateSw, err := newCache(config) if err != nil { return nil, err } - return &cacheObjects{ - cache: dcache, - exclude: config.Exclude, - listPool: NewTreeWalkPool(globalLookupTimeout), - GetObjectFn: func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error { - return newObjectLayerFn().GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts) - }, + c := &cacheObjects{ + cache: cache, + exclude: config.Exclude, + nsMutex: newNSLock(false), + migrating: migrateSw, + migMutex: sync.Mutex{}, GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts) }, GetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) }, - PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { - return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts) - }, DeleteObjectFn: func(ctx context.Context, bucket, object string) error { return newObjectLayerFn().DeleteObject(ctx, bucket, object) }, @@ -994,142 +513,9 @@ func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) { } return errs, nil }, - - ListObjectsFn: func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) { - return newObjectLayerFn().ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys) - }, - ListObjectsV2Fn: func(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) { - return newObjectLayerFn().ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, fetchOwner, startAfter) - }, - ListBucketsFn: func(ctx context.Context) (buckets []BucketInfo, err error) { - return newObjectLayerFn().ListBuckets(ctx) - }, - GetBucketInfoFn: func(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) { - return newObjectLayerFn().GetBucketInfo(ctx, bucket) - }, - NewMultipartUploadFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) { - return newObjectLayerFn().NewMultipartUpload(ctx, bucket, object, opts) - }, - PutObjectPartFn: func(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) { - return newObjectLayerFn().PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) - }, - AbortMultipartUploadFn: func(ctx context.Context, 
bucket, object, uploadID string) error { - return newObjectLayerFn().AbortMultipartUpload(ctx, bucket, object, uploadID) - }, - CompleteMultipartUploadFn: func(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) { - return newObjectLayerFn().CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) - }, - DeleteBucketFn: func(ctx context.Context, bucket string) error { - return newObjectLayerFn().DeleteBucket(ctx, bucket) - }, - }, nil -} - -type cacheControl struct { - exclude bool - expiry time.Time - maxAge int - sMaxAge int - minFresh int -} - -// cache exclude directives in cache-control header -var cacheExcludeDirectives = []string{ - "no-cache", - "no-store", - "must-revalidate", -} - -// returns true if cache exclude directives are set. -func isCacheExcludeDirective(s string) bool { - for _, directive := range cacheExcludeDirectives { - if s == directive { - return true - } } - return false -} - -// returns struct with cache-control settings from user metadata. -func getCacheControlOpts(m map[string]string) (c cacheControl, err error) { - var headerVal string - for k, v := range m { - if k == "cache-control" { - headerVal = v - } - if k == "expires" { - if e, err := http.ParseTime(v); err == nil { - c.expiry = e - } - } - } - if headerVal == "" { - return - } - headerVal = strings.ToLower(headerVal) - headerVal = strings.TrimSpace(headerVal) - - vals := strings.Split(headerVal, ",") - for _, val := range vals { - val = strings.TrimSpace(val) - p := strings.Split(val, "=") - if isCacheExcludeDirective(p[0]) { - c.exclude = true - continue - } - - if len(p) != 2 { - continue - } - if p[0] == "max-age" || - p[0] == "s-maxage" || - p[0] == "min-fresh" { - i, err := strconv.Atoi(p[1]) - if err != nil { - return c, err - } - if p[0] == "max-age" { - c.maxAge = i - } - if p[0] == "s-maxage" { - c.sMaxAge = i - } - if p[0] == "min-fresh" { - c.minFresh = i - } - } + if migrateSw { + go c.migrateCacheFromV1toV2(ctx) } return c, nil } - -// return true if metadata has a cache-control header -// directive to exclude object from cache. -func filterFromCache(m map[string]string) bool { - c, err := getCacheControlOpts(m) - if err != nil { - return false - } - return c.exclude -} - -// returns true if cache expiry conditions met in cache-control/expiry metadata. -func isStaleCache(objInfo ObjectInfo) bool { - c, err := getCacheControlOpts(objInfo.UserDefined) - if err != nil { - return false - } - now := time.Now() - if c.sMaxAge > 0 && c.sMaxAge > int(now.Sub(objInfo.ModTime).Seconds()) { - return true - } - if c.maxAge > 0 && c.maxAge > int(now.Sub(objInfo.ModTime).Seconds()) { - return true - } - if !c.expiry.Equal(time.Time{}) && c.expiry.Before(time.Now()) { - return true - } - if c.minFresh > 0 && c.minFresh <= int(now.Sub(objInfo.ModTime).Seconds()) { - return true - } - return false -} diff --git a/cmd/disk-cache_test.go b/cmd/disk-cache_test.go index ffbdc9ba9..80ad46964 100644 --- a/cmd/disk-cache_test.go +++ b/cmd/disk-cache_test.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2018 MinIO, Inc. + * MinIO Cloud Storage, (C) 2018,2019 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,34 +19,52 @@ package cmd import ( "bytes" "context" - "reflect" + "io" + "net/http" "testing" - "time" "github.com/minio/minio/pkg/hash" ) -// Initialize cache FS objects. 
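migrateCacheFromV1toV2, added to cmd/disk-cache.go just above, migrates every cache drive concurrently: one goroutine per drive writes its result into a dedicated slot of an error slice, and the migrating flag is only cleared when every slot is nil. A minimal sketch of that fan-out/fan-in shape (migrateDrive stands in for migrateOldCache):

package main

import (
	"fmt"
	"sync"
)

// migrateDrive is a stand-in for migrateOldCache on a single cache drive.
func migrateDrive(dir string) error {
	if dir == "" {
		return fmt.Errorf("missing drive")
	}
	return nil
}

func main() {
	drives := []string{"/mnt/cache1", "/mnt/cache2", "/mnt/cache3"}

	var wg sync.WaitGroup
	errs := make([]error, len(drives)) // one slot per drive: no locking needed
	for i, dir := range drives {
		wg.Add(1)
		go func(idx int, dir string) {
			defer wg.Done()
			errs[idx] = migrateDrive(dir)
		}(i, dir)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			fmt.Println("migration incomplete; cache stays disabled:", err)
			return // like the hunk above: the migrating flag stays set
		}
	}
	fmt.Println("all drives migrated; cache enabled")
}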
-func initCacheFSObjects(disk string, cacheMaxUse int) (*cacheFSObjects, error) { - return newCacheFSObjects(disk, globalCacheExpiry, cacheMaxUse) +// Initialize cache objects. +func initCacheObjects(disk string, cacheMaxUse int) (*diskCache, error) { + return newdiskCache(disk, globalCacheExpiry, cacheMaxUse) } // inits diskCache struct for nDisks -func initDiskCaches(drives []string, cacheMaxUse int, t *testing.T) (*diskCache, error) { - var cfs []*cacheFSObjects +func initDiskCaches(drives []string, cacheMaxUse int, t *testing.T) ([]*diskCache, error) { + var cb []*diskCache for _, d := range drives { - obj, err := initCacheFSObjects(d, cacheMaxUse) + obj, err := initCacheObjects(d, cacheMaxUse) if err != nil { return nil, err } - cfs = append(cfs, obj) + cb = append(cb, obj) + } + return cb, nil +} + +// Tests ToObjectInfo function. +func TestCacheMetadataObjInfo(t *testing.T) { + m := cacheMeta{Meta: nil} + objInfo := m.ToObjectInfo("testbucket", "testobject") + if objInfo.Size != 0 { + t.Fatal("Unexpected object info value for Size", objInfo.Size) + } + if objInfo.ModTime != timeSentinel { + t.Fatal("Unexpected object info value for ModTime ", objInfo.ModTime) + } + if objInfo.IsDir { + t.Fatal("Unexpected object info value for IsDir", objInfo.IsDir) + } + if !objInfo.Expires.IsZero() { + t.Fatal("Unexpected object info value for Expires ", objInfo.Expires) } - return &diskCache{cfs: cfs}, nil } // test whether a drive being offline causes -// getCacheFS to fetch next online drive -func TestGetCacheFS(t *testing.T) { +// getCachedLoc to fetch next online drive +func TestGetCachedLoc(t *testing.T) { for n := 1; n < 10; n++ { fsDirs, err := getRandomDisks(n) if err != nil { @@ -56,14 +74,15 @@ func TestGetCacheFS(t *testing.T) { if err != nil { t.Fatal(err) } + c := cacheObjects{cache: d} bucketName := "testbucket" objectName := "testobject" ctx := context.Background() // find cache drive where object would be hashed - index := d.hashIndex(bucketName, objectName) + index := c.hashIndex(bucketName, objectName) // turn off drive by setting online status to false - d.cfs[index].online = false - cfs, err := d.getCacheFS(ctx, bucketName, objectName) + c.cache[index].online = false + cfs, err := c.getCacheLoc(ctx, bucketName, objectName) if n == 1 && err == errDiskNotFound { continue } @@ -71,7 +90,7 @@ func TestGetCacheFS(t *testing.T) { t.Fatal(err) } i := -1 - for j, f := range d.cfs { + for j, f := range c.cache { if f == cfs { i = j break @@ -84,8 +103,8 @@ func TestGetCacheFS(t *testing.T) { } // test whether a drive being offline causes -// getCacheFS to fetch next online drive -func TestGetCacheFSMaxUse(t *testing.T) { +// getCachedLoc to fetch next online drive +func TestGetCacheMaxUse(t *testing.T) { for n := 1; n < 10; n++ { fsDirs, err := getRandomDisks(n) if err != nil { @@ -95,14 +114,16 @@ func TestGetCacheFSMaxUse(t *testing.T) { if err != nil { t.Fatal(err) } + c := cacheObjects{cache: d} + bucketName := "testbucket" objectName := "testobject" ctx := context.Background() // find cache drive where object would be hashed - index := d.hashIndex(bucketName, objectName) + index := c.hashIndex(bucketName, objectName) // turn off drive by setting online status to false - d.cfs[index].online = false - cfs, err := d.getCacheFS(ctx, bucketName, objectName) + c.cache[index].online = false + cb, err := c.getCacheLoc(ctx, bucketName, objectName) if n == 1 && err == errDiskNotFound { continue } @@ -110,8 +131,8 @@ func TestGetCacheFSMaxUse(t *testing.T) { t.Fatal(err) } i := -1 - for j, f 
:= range d.cfs { - if f == cfs { + for j, f := range d { + if f == cb { i = j break } @@ -165,7 +186,9 @@ func TestDiskCache(t *testing.T) { if err != nil { t.Fatal(err) } - cache := d.cfs[0] + c := cacheObjects{cache: d} + + cache := c.cache[0] ctx := context.Background() bucketName := "testbucket" objectName := "testobject" @@ -191,14 +214,17 @@ func TestDiskCache(t *testing.T) { if err != nil { t.Fatal(err) } - err = cache.Put(ctx, bucketName, objectName, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: httpMeta}) + err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta}) if err != nil { t.Fatal(err) } - cachedObjInfo, err := cache.GetObjectInfo(ctx, bucketName, objectName, opts) + cReader, err := cache.Get(ctx, bucketName, objectName, nil, http.Header{ + "Content-Type": []string{"application/json"}, + }, opts) if err != nil { t.Fatal(err) } + cachedObjInfo := cReader.ObjInfo if !cache.Exists(ctx, bucketName, objectName) { t.Fatal("Expected object to exist on cache") } @@ -212,17 +238,16 @@ func TestDiskCache(t *testing.T) { t.Fatal("Cached content-type does not match") } writer := bytes.NewBuffer(nil) - err = cache.Get(ctx, bucketName, objectName, 0, int64(size), writer, "", opts) + _, err = io.Copy(writer, cReader) if err != nil { t.Fatal(err) } if ccontent := writer.Bytes(); !bytes.Equal([]byte(content), ccontent) { t.Errorf("wrong cached file content") } - err = cache.Delete(ctx, bucketName, objectName) - if err != nil { - t.Errorf("object missing from cache") - } + cReader.Close() + + cache.Delete(ctx, bucketName, objectName) online := cache.IsOnline() if !online { t.Errorf("expected cache drive to be online") @@ -239,7 +264,7 @@ func TestDiskCacheMaxUse(t *testing.T) { if err != nil { t.Fatal(err) } - cache := d.cfs[0] + cache := d[0] ctx := context.Background() bucketName := "testbucket" objectName := "testobject" @@ -267,19 +292,20 @@ func TestDiskCacheMaxUse(t *testing.T) { t.Fatal(err) } if !cache.diskAvailable(int64(size)) { - err = cache.Put(ctx, bucketName, objectName, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: httpMeta}) + err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta}) if err != errDiskFull { t.Fatal("Cache max-use limit violated.") } } else { - err = cache.Put(ctx, bucketName, objectName, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: httpMeta}) + err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta}) if err != nil { t.Fatal(err) } - cachedObjInfo, err := cache.GetObjectInfo(ctx, bucketName, objectName, opts) + cReader, err := cache.Get(ctx, bucketName, objectName, nil, nil, opts) if err != nil { t.Fatal(err) } + cachedObjInfo := cReader.ObjInfo if !cache.Exists(ctx, bucketName, objectName) { t.Fatal("Expected object to exist on cache") } @@ -293,92 +319,19 @@ func TestDiskCacheMaxUse(t *testing.T) { t.Fatal("Cached content-type does not match") } writer := bytes.NewBuffer(nil) - err = cache.Get(ctx, bucketName, objectName, 0, int64(size), writer, "", opts) + _, err = io.Copy(writer, cReader) if err != nil { t.Fatal(err) } if ccontent := writer.Bytes(); !bytes.Equal([]byte(content), ccontent) { t.Errorf("wrong cached file content") } - err = cache.Delete(ctx, bucketName, objectName) - if err != nil { - t.Errorf("object missing from cache") - } + cReader.Close() + + cache.Delete(ctx, bucketName, objectName) online := cache.IsOnline() 
if !online { t.Errorf("expected cache drive to be online") } } } - -func TestIsCacheExcludeDirective(t *testing.T) { - testCases := []struct { - cacheControlOpt string - expectedResult bool - }{ - {"no-cache", true}, - {"no-store", true}, - {"must-revalidate", true}, - {"no-transform", false}, - {"max-age", false}, - } - - for i, testCase := range testCases { - if isCacheExcludeDirective(testCase.cacheControlOpt) != testCase.expectedResult { - t.Errorf("Cache exclude directive test failed for case %d", i) - } - } -} - -func TestGetCacheControlOpts(t *testing.T) { - testCases := []struct { - cacheControlHeaderVal string - expiryHeaderVal string - expectedCacheControl cacheControl - expectedErr bool - }{ - {"", "", cacheControl{}, false}, - {"max-age=2592000, public", "", cacheControl{maxAge: 2592000, sMaxAge: 0, minFresh: 0, expiry: time.Time{}, exclude: false}, false}, - {"max-age=2592000, no-store", "", cacheControl{maxAge: 2592000, sMaxAge: 0, minFresh: 0, expiry: time.Time{}, exclude: true}, false}, - {"must-revalidate, max-age=600", "", cacheControl{maxAge: 600, sMaxAge: 0, minFresh: 0, expiry: time.Time{}, exclude: true}, false}, - {"s-maxAge=2500, max-age=600", "", cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}, exclude: false}, false}, - {"s-maxAge=2500, max-age=600", "Wed, 21 Oct 2015 07:28:00 GMT", cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Date(2015, time.October, 21, 07, 28, 00, 00, time.UTC), exclude: false}, false}, - {"s-maxAge=2500, max-age=600s", "", cacheControl{maxAge: 600, sMaxAge: 2500, minFresh: 0, expiry: time.Time{}, exclude: false}, true}, - } - var m map[string]string - - for i, testCase := range testCases { - m = make(map[string]string) - m["cache-control"] = testCase.cacheControlHeaderVal - if testCase.expiryHeaderVal != "" { - m["expires"] = testCase.expiryHeaderVal - } - c, err := getCacheControlOpts(m) - if testCase.expectedErr && err == nil { - t.Errorf("expected err for case %d", i) - } - if !testCase.expectedErr && !reflect.DeepEqual(c, testCase.expectedCacheControl) { - t.Errorf("expected %v got %v for case %d", testCase.expectedCacheControl, c, i) - } - - } -} - -func TestFilterFromCache(t *testing.T) { - testCases := []struct { - metadata map[string]string - expectedResult bool - }{ - {map[string]string{"content-type": "application/json"}, false}, - {map[string]string{"cache-control": "private,no-store"}, true}, - {map[string]string{"cache-control": "no-cache,must-revalidate"}, true}, - {map[string]string{"cache-control": "no-transform"}, false}, - {map[string]string{"cache-control": "max-age=3600"}, false}, - } - - for i, testCase := range testCases { - if filterFromCache(testCase.metadata) != testCase.expectedResult { - t.Errorf("Cache exclude directive test failed for case %d", i) - } - } -} diff --git a/cmd/format-disk-cache.go b/cmd/format-disk-cache.go index c007f5a50..b1bfff6f4 100644 --- a/cmd/format-disk-cache.go +++ b/cmd/format-disk-cache.go @@ -18,11 +18,16 @@ package cmd import ( "context" + "encoding/json" "errors" "fmt" "io" + "io/ioutil" "os" + "path" + "path/filepath" "reflect" + "strings" "github.com/minio/minio/cmd/logger" ) @@ -33,6 +38,7 @@ const ( // formatCacheV1.Cache.Version formatCacheVersionV1 = "1" + formatCacheVersionV2 = "2" formatMetaVersion1 = "1" @@ -56,6 +62,9 @@ type formatCacheV1 struct { } `json:"cache"` // Cache field holds cache format. } +// formatCacheV2 is same as formatCacheV1 +type formatCacheV2 = formatCacheV1 + // Used to detect the version of "cache" format. 
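The formatCacheVersionDetect struct below decodes only Cache.Version, so the loader can learn which schema is on disk before committing to a full parse; formatCacheV2 itself is a type alias of V1, since only the embedded version string changes. A compact sketch of that two-phase decode (struct and helper names here are illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

// versionDetect mirrors formatCacheVersionDetect: decode just enough
// of format.json to learn which schema to expect.
type versionDetect struct {
	Cache struct {
		Version string `json:"version"`
	} `json:"cache"`
}

func cacheFormatVersion(data []byte) (string, error) {
	var v versionDetect
	if err := json.Unmarshal(data, &v); err != nil {
		return "", err
	}
	return v.Cache.Version, nil
}

func main() {
	formatJSON := []byte(`{"version":"1","format":"cache","cache":{"version":"2","this":"uuid-1","disks":["uuid-1"]}}`)
	ver, err := cacheFormatVersion(formatJSON)
	if err != nil {
		fmt.Println("unreadable format.json:", err)
		return
	}
	switch ver {
	case "1":
		fmt.Println("v1 on disk: mark drive for migration")
	case "2":
		fmt.Println("v2 on disk: ready to serve")
	default:
		fmt.Println("unsupported cache format version:", ver)
	}
}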
type formatCacheVersionDetect struct { Cache struct { @@ -64,17 +73,17 @@ type formatCacheVersionDetect struct { } // Return a slice of format, to be used to format uninitialized disks. -func newFormatCacheV1(drives []string) []*formatCacheV1 { +func newFormatCacheV2(drives []string) []*formatCacheV2 { diskCount := len(drives) var disks = make([]string, diskCount) - var formats = make([]*formatCacheV1, diskCount) + var formats = make([]*formatCacheV2, diskCount) for i := 0; i < diskCount; i++ { - format := &formatCacheV1{} + format := &formatCacheV2{} format.Version = formatMetaVersion1 format.Format = formatCache - format.Cache.Version = formatCacheVersionV1 + format.Cache.Version = formatCacheVersionV2 format.Cache.DistributionAlgo = formatCacheV1DistributionAlgo format.Cache.This = mustGetUUID() formats[i] = format @@ -87,6 +96,15 @@ func newFormatCacheV1(drives []string) []*formatCacheV1 { return formats } +// Returns formatCache.Cache.Version +func formatCacheGetVersion(r io.ReadSeeker) (string, error) { + format := &formatCacheVersionDetect{} + if err := jsonLoad(r, format); err != nil { + return "", err + } + return format.Cache.Version, nil +} + // Creates a new cache format.json if unformatted. func createFormatCache(fsFormatPath string, format *formatCacheV1) error { // open file using READ & WRITE permission @@ -110,8 +128,8 @@ func createFormatCache(fsFormatPath string, format *formatCacheV1) error { // This function creates a cache format file on disk and returns a slice // of format cache config -func initFormatCache(ctx context.Context, drives []string) (formats []*formatCacheV1, err error) { - nformats := newFormatCacheV1(drives) +func initFormatCache(ctx context.Context, drives []string) (formats []*formatCacheV2, err error) { + nformats := newFormatCacheV2(drives) for _, drive := range drives { _, err = os.Stat(drive) if err == nil { @@ -147,26 +165,33 @@ func initFormatCache(ctx context.Context, drives []string) (formats []*formatCac return nformats, nil } -func loadFormatCache(ctx context.Context, drives []string) ([]*formatCacheV1, error) { - formats := make([]*formatCacheV1, len(drives)) +func loadFormatCache(ctx context.Context, drives []string) ([]*formatCacheV2, bool, error) { + formats := make([]*formatCacheV2, len(drives)) + var formatV2 *formatCacheV2 + migrating := false for i, drive := range drives { cacheFormatPath := pathJoin(drive, minioMetaBucket, formatConfigFile) - f, err := os.Open(cacheFormatPath) + f, err := os.OpenFile(cacheFormatPath, os.O_RDWR, 0) + if err != nil { if os.IsNotExist(err) { continue } logger.LogIf(ctx, err) - return nil, err + return nil, migrating, err } defer f.Close() format, err := formatMetaCacheV1(f) if err != nil { continue } - formats[i] = format + formatV2 = format + if format.Cache.Version != formatCacheVersionV2 { + migrating = true + } + formats[i] = formatV2 } - return formats, nil + return formats, migrating, nil } // unmarshalls the cache format.json into formatCacheV1 @@ -178,26 +203,38 @@ func formatMetaCacheV1(r io.ReadSeeker) (*formatCacheV1, error) { return format, nil } -func checkFormatCacheValue(format *formatCacheV1) error { +func checkFormatCacheValue(format *formatCacheV2, migrating bool) error { + if format.Format != formatCache { + return fmt.Errorf("Unsupported cache format [%s] found", format.Format) + } + + // during migration one or more cache drive(s) formats can be out of sync + if migrating { + // Validate format version and format type. 
+ if format.Version != formatMetaVersion1 { + return fmt.Errorf("Unsupported version of cache format [%s] found", format.Version) + } + if format.Cache.Version != formatCacheVersionV2 && format.Cache.Version != formatCacheVersionV1 { + return fmt.Errorf("Unsupported Cache backend format found [%s]", format.Cache.Version) + } + return nil + } // Validate format version and format type. if format.Version != formatMetaVersion1 { return fmt.Errorf("Unsupported version of cache format [%s] found", format.Version) } - if format.Format != formatCache { - return fmt.Errorf("Unsupported cache format [%s] found", format.Format) - } - if format.Cache.Version != formatCacheVersionV1 { + if format.Cache.Version != formatCacheVersionV2 { return fmt.Errorf("Unsupported Cache backend format found [%s]", format.Cache.Version) } return nil } -func checkFormatCacheValues(formats []*formatCacheV1) (int, error) { +func checkFormatCacheValues(migrating bool, formats []*formatCacheV2) (int, error) { for i, formatCache := range formats { if formatCache == nil { continue } - if err := checkFormatCacheValue(formatCache); err != nil { + if err := checkFormatCacheValue(formatCache, migrating); err != nil { return i, err } if len(formats) != len(formatCache.Cache.Disks) { @@ -210,7 +247,7 @@ func checkFormatCacheValues(formats []*formatCacheV1) (int, error) { // checkCacheDisksConsistency - checks if "This" disk uuid on each disk is consistent with all "Disks" slices // across disks. -func checkCacheDiskConsistency(formats []*formatCacheV1) error { +func checkCacheDiskConsistency(formats []*formatCacheV2) error { var disks = make([]string, len(formats)) // Collect currently available disk uuids. for index, format := range formats { @@ -236,7 +273,7 @@ func checkCacheDiskConsistency(formats []*formatCacheV1) error { } // checkCacheDisksSliceConsistency - validate cache Disks order if they are consistent. -func checkCacheDisksSliceConsistency(formats []*formatCacheV1) error { +func checkCacheDisksSliceConsistency(formats []*formatCacheV2) error { var sentinelDisks []string // Extract first valid Disks slice. for _, format := range formats { @@ -269,7 +306,7 @@ func findCacheDiskIndex(disk string, disks []string) int { } // validate whether cache drives order has changed -func validateCacheFormats(ctx context.Context, formats []*formatCacheV1) error { +func validateCacheFormats(ctx context.Context, migrating bool, formats []*formatCacheV2) error { count := 0 for _, format := range formats { if format == nil { @@ -279,7 +316,7 @@ func validateCacheFormats(ctx context.Context, formats []*formatCacheV1) error { if count == len(formats) { return errors.New("Cache format files missing on all drives") } - if _, err := checkFormatCacheValues(formats); err != nil { + if _, err := checkFormatCacheValues(migrating, formats); err != nil { logger.LogIf(ctx, err) return err } @@ -308,17 +345,161 @@ func cacheDrivesUnformatted(drives []string) bool { // create format.json for each cache drive if fresh disk or load format from disk // Then validate the format for all drives in the cache to ensure order // of cache drives has not changed. 
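checkFormatCacheValue above threads a migrating flag through validation so that a mixed set of v1 and v2 drives passes the checks during the one-time upgrade, while v2 is the only version accepted once migration finishes. The acceptance rule reduces to roughly the following sketch:

package main

import "fmt"

const (
	cacheVersionV1 = "1"
	cacheVersionV2 = "2"
)

// acceptCacheVersion condenses checkFormatCacheValue's version rule:
// while migrating, v1 and v2 drives may coexist; afterwards only v2 is valid.
func acceptCacheVersion(version string, migrating bool) error {
	if migrating {
		if version == cacheVersionV1 || version == cacheVersionV2 {
			return nil
		}
		return fmt.Errorf("unsupported cache backend format [%s]", version)
	}
	if version != cacheVersionV2 {
		return fmt.Errorf("unsupported cache backend format [%s]", version)
	}
	return nil
}

func main() {
	fmt.Println(acceptCacheVersion("1", true))  // <nil>: tolerated mid-migration
	fmt.Println(acceptCacheVersion("1", false)) // error: migration finished
	fmt.Println(acceptCacheVersion("2", false)) // <nil>
}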
-func loadAndValidateCacheFormat(ctx context.Context, drives []string) (formats []*formatCacheV1, err error) { +func loadAndValidateCacheFormat(ctx context.Context, drives []string) (formats []*formatCacheV2, migrating bool, err error) { if cacheDrivesUnformatted(drives) { formats, err = initFormatCache(ctx, drives) } else { - formats, err = loadFormatCache(ctx, drives) + formats, migrating, err = loadFormatCache(ctx, drives) } if err != nil { - return nil, err + return nil, false, err } - if err = validateCacheFormats(ctx, formats); err != nil { - return nil, err + if err = validateCacheFormats(ctx, migrating, formats); err != nil { + return nil, false, err } - return formats, nil + return formats, migrating, nil +} + +// reads cached object on disk and writes it back after adding bitrot +// hashsum per block as per the new disk cache format. +func migrateData(ctx context.Context, c *diskCache, oldfile, destDir string) error { + st, err := os.Stat(oldfile) + if err != nil { + err = osErrToFSFileErr(err) + return err + } + readCloser, err := readCacheFileStream(oldfile, 0, st.Size()) + if err != nil { + return err + } + _, err = c.bitrotWriteToCache(ctx, destDir, readCloser, st.Size()) + return err +} + +// migrate cache contents from old cacheFS format to new backend format +// new format is flat +// sha(bucket,object)/ <== dir name +// - part.1 <== data +// - cache.json <== metadata +func migrateOldCache(ctx context.Context, c *diskCache) error { + oldCacheBucketsPath := path.Join(c.dir, minioMetaBucket, "buckets") + cacheFormatPath := pathJoin(c.dir, minioMetaBucket, formatConfigFile) + + if _, err := os.Stat(oldCacheBucketsPath); err != nil { + // remove .minio.sys sub directories + removeAll(path.Join(c.dir, minioMetaBucket, "multipart")) + removeAll(path.Join(c.dir, minioMetaBucket, "tmp")) + removeAll(path.Join(c.dir, minioMetaBucket, "trash")) + removeAll(path.Join(c.dir, minioMetaBucket, "buckets")) + // just migrate cache format + return migrateCacheFormatJSON(cacheFormatPath) + } + + buckets, err := readDir(oldCacheBucketsPath) + if err != nil { + return err + } + + for _, bucket := range buckets { + var objMetaPaths []string + root := path.Join(oldCacheBucketsPath, bucket) + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if strings.HasSuffix(path, cacheMetaJSONFile) { + objMetaPaths = append(objMetaPaths, path) + } + return nil + }) + if err != nil { + return err + } + for _, oMeta := range objMetaPaths { + objSlice := strings.SplitN(oMeta, cacheMetaJSONFile, 2) + object := strings.TrimPrefix(objSlice[0], path.Join(oldCacheBucketsPath, bucket)) + object = strings.TrimSuffix(object, "/") + + destdir := getCacheSHADir(c.dir, bucket, object) + if err := os.MkdirAll(destdir, 0777); err != nil { + return err + } + prevCachedPath := path.Join(c.dir, bucket, object) + // move cached object to new cache directory path + // migrate cache data and add bit-rot protection hash sum + // at the start of each block + if err := migrateData(ctx, c, prevCachedPath, destdir); err != nil { + continue + } + stat, err := os.Stat(prevCachedPath) + if err != nil { + if err == errFileNotFound { + continue + } + logger.LogIf(ctx, err) + return err + } + // old cached file can now be removed + if err := os.Remove(prevCachedPath); err != nil { + return err + } + + // move cached metadata after changing cache metadata version + oldMetaPath := pathJoin(oldCacheBucketsPath, bucket, object, cacheMetaJSONFile) + metaPath := pathJoin(destdir, cacheMetaJSONFile) + metaBytes, 
err := ioutil.ReadFile(oldMetaPath) + if err != nil { + return err + } + // marshal cache metadata after adding version and stat info + meta := &cacheMeta{} + if err = json.Unmarshal(metaBytes, &meta); err != nil { + return err + } + meta.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize} + meta.Version = cacheMetaVersion + meta.Stat.Size = stat.Size() + meta.Stat.ModTime = stat.ModTime() + jsonData, err := json.Marshal(meta) + if err != nil { + return err + } + + if err = ioutil.WriteFile(metaPath, jsonData, 0644); err != nil { + return err + } + } + + // delete old bucket from cache, now that all contents are cleared + removeAll(path.Join(c.dir, bucket)) + } + + // remove .minio.sys sub directories + removeAll(path.Join(c.dir, minioMetaBucket, "multipart")) + removeAll(path.Join(c.dir, minioMetaBucket, "tmp")) + removeAll(path.Join(c.dir, minioMetaBucket, "trash")) + removeAll(path.Join(c.dir, minioMetaBucket, "buckets")) + + return migrateCacheFormatJSON(cacheFormatPath) + +} + +func migrateCacheFormatJSON(cacheFormatPath string) error { + // now migrate format.json + f, err := os.OpenFile(cacheFormatPath, os.O_RDWR, 0) + if err != nil { + return err + } + defer f.Close() + formatV1 := formatCacheV1{} + if err := jsonLoad(f, &formatV1); err != nil { + return err + } + + formatV2 := &formatCacheV2{} + formatV2.formatMetaV1 = formatV1.formatMetaV1 + formatV2.Version = formatMetaVersion1 + formatV2.Cache = formatV1.Cache + formatV2.Cache.Version = formatCacheVersionV2 + if err := jsonSave(f, formatV2); err != nil { + return err + } + return nil } diff --git a/cmd/format-disk-cache_test.go b/cmd/format-disk-cache_test.go index 6713995ea..d818f0277 100644 --- a/cmd/format-disk-cache_test.go +++ b/cmd/format-disk-cache_test.go @@ -18,20 +18,10 @@ package cmd import ( "context" - "io" "os" "testing" ) -// Returns format.Cache.Version -func formatCacheGetVersion(r io.ReadSeeker) (string, error) { - format := &formatCacheVersionDetect{} - if err := jsonLoad(r, format); err != nil { - return "", err - } - return format.Cache.Version, nil -} - // TestDiskCacheFormat - tests initFormatCache, formatMetaGetFormatBackendCache, formatCacheGetVersion. func TestDiskCacheFormat(t *testing.T) { ctx := context.Background() @@ -55,8 +45,8 @@ func TestDiskCacheFormat(t *testing.T) { if err != nil { t.Fatal(err) } - if version != formatCacheVersionV1 { - t.Fatalf(`expected: %s, got: %s`, formatCacheVersionV1, version) + if version != formatCacheVersionV2 { + t.Fatalf(`expected: %s, got: %s`, formatCacheVersionV2, version) } // Corrupt the format.json file and test the functions. @@ -68,7 +58,7 @@ func TestDiskCacheFormat(t *testing.T) { t.Fatal(err) } - if _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil { + if _, _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil { t.Fatal("expected to fail") } @@ -81,15 +71,15 @@ func TestDiskCacheFormat(t *testing.T) { t.Fatal(err) } - if _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil { + if _, _, err = loadAndValidateCacheFormat(context.Background(), fsDirs); err == nil { t.Fatal("expected to fail") } } // generates a valid format.json for Cache backend. 
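migrateCacheFormatJSON above re-reads format.json through a single read-write handle and writes the bumped version back with jsonSave; jsonLoad and jsonSave are MinIO helpers not shown in this patch, so the stdlib approximation below makes the rewind-and-truncate step explicit (the path in main is hypothetical):

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
)

type cacheFormat struct {
	Version string `json:"version"`
	Format  string `json:"format"`
	Cache   struct {
		Version string   `json:"version"`
		This    string   `json:"this"`
		Disks   []string `json:"disks"`
	} `json:"cache"`
}

// upgradeFormatJSON rewrites format.json in place, bumping only the
// cache backend version, analogous to migrateCacheFormatJSON above.
func upgradeFormatJSON(path string) error {
	f, err := os.OpenFile(path, os.O_RDWR, 0)
	if err != nil {
		return err
	}
	defer f.Close()

	var format cacheFormat
	if err := json.NewDecoder(f).Decode(&format); err != nil {
		return err
	}
	format.Cache.Version = "2"

	// Rewind and truncate before writing the upgraded document back,
	// so the new JSON fully replaces the old bytes in the same file.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}
	if err := f.Truncate(0); err != nil {
		return err
	}
	return json.NewEncoder(f).Encode(&format)
}

func main() {
	fmt.Println(upgradeFormatJSON("/tmp/minio-cache/.minio.sys/format.json"))
}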
-func genFormatCacheValid() []*formatCacheV1 {
+func genFormatCacheValid() []*formatCacheV2 {
 	disks := make([]string, 8)
-	formatConfigs := make([]*formatCacheV1, 8)
+	formatConfigs := make([]*formatCacheV2, 8)
 	for index := range disks {
 		disks[index] = mustGetUUID()
 	}
@@ -97,7 +87,7 @@ func genFormatCacheValid() []*formatCacheV1 {
 		format := &formatCacheV1{}
 		format.Version = formatMetaVersion1
 		format.Format = formatCache
-		format.Cache.Version = formatCacheVersionV1
+		format.Cache.Version = formatCacheVersionV2
 		format.Cache.This = disks[index]
 		format.Cache.Disks = disks
 		formatConfigs[index] = format
@@ -106,9 +96,9 @@
 }
 
 // generates an invalid format.json version for Cache backend.
-func genFormatCacheInvalidVersion() []*formatCacheV1 {
+func genFormatCacheInvalidVersion() []*formatCacheV2 {
 	disks := make([]string, 8)
-	formatConfigs := make([]*formatCacheV1, 8)
+	formatConfigs := make([]*formatCacheV2, 8)
 	for index := range disks {
 		disks[index] = mustGetUUID()
 	}
@@ -128,14 +118,14 @@
 }
 
 // generates an invalid format.json format field for Cache backend.
-func genFormatCacheInvalidFormat() []*formatCacheV1 {
+func genFormatCacheInvalidFormat() []*formatCacheV2 {
 	disks := make([]string, 8)
-	formatConfigs := make([]*formatCacheV1, 8)
+	formatConfigs := make([]*formatCacheV2, 8)
 	for index := range disks {
 		disks[index] = mustGetUUID()
 	}
 	for index := range disks {
-		format := &formatCacheV1{}
+		format := &formatCacheV2{}
 		format.Version = formatMetaVersion1
 		format.Format = formatCache
 		format.Cache.Version = formatCacheVersionV1
@@ -150,14 +140,14 @@
 }
 
 // generates an invalid format.json cache version for Cache backend.
-func genFormatCacheInvalidCacheVersion() []*formatCacheV1 {
+func genFormatCacheInvalidCacheVersion() []*formatCacheV2 {
 	disks := make([]string, 8)
-	formatConfigs := make([]*formatCacheV1, 8)
+	formatConfigs := make([]*formatCacheV2, 8)
 	for index := range disks {
 		disks[index] = mustGetUUID()
 	}
 	for index := range disks {
-		format := &formatCacheV1{}
+		format := &formatCacheV2{}
 		format.Version = formatMetaVersion1
 		format.Format = formatCache
 		format.Cache.Version = formatCacheVersionV1
@@ -172,17 +162,17 @@
 }
 
 // generates an invalid format.json disks count for Cache backend.
-func genFormatCacheInvalidDisksCount() []*formatCacheV1 {
+func genFormatCacheInvalidDisksCount() []*formatCacheV2 {
 	disks := make([]string, 7)
-	formatConfigs := make([]*formatCacheV1, 8)
+	formatConfigs := make([]*formatCacheV2, 8)
 	for index := range disks {
 		disks[index] = mustGetUUID()
 	}
 	for index := range disks {
-		format := &formatCacheV1{}
+		format := &formatCacheV2{}
 		format.Version = formatMetaVersion1
 		format.Format = formatCache
-		format.Cache.Version = formatCacheVersionV1
+		format.Cache.Version = formatCacheVersionV2
 		format.Cache.This = disks[index]
 		format.Cache.Disks = disks
 		formatConfigs[index] = format
@@ -191,9 +181,9 @@
 }
 
 // generates an invalid format.json Disks field for Cache backend.
-func genFormatCacheInvalidDisks() []*formatCacheV1 {
+func genFormatCacheInvalidDisks() []*formatCacheV2 {
 	disks := make([]string, 8)
-	formatConfigs := make([]*formatCacheV1, 8)
+	formatConfigs := make([]*formatCacheV2, 8)
 	for index := range disks {
 		disks[index] = mustGetUUID()
 	}
@@ -201,7 +191,7 @@
 		format := &formatCacheV1{}
 		format.Version = formatMetaVersion1
 		format.Format = formatCache
-		format.Cache.Version = formatCacheVersionV1
+		format.Cache.Version = formatCacheVersionV2
 		format.Cache.This = disks[index]
 		format.Cache.Disks = disks
 		formatConfigs[index] = format
@@ -226,7 +216,7 @@ func genFormatCacheInvalidThis() []*formatCacheV1 {
 		format := &formatCacheV1{}
 		format.Version = formatMetaVersion1
 		format.Format = formatCache
-		format.Cache.Version = formatCacheVersionV1
+		format.Cache.Version = formatCacheVersionV2
 		format.Cache.This = disks[index]
 		format.Cache.Disks = disks
 		formatConfigs[index] = format
@@ -238,9 +228,9 @@
 }
 
 // generates an invalid format.json with disk UUIDs in the wrong order for Cache backend.
-func genFormatCacheInvalidDisksOrder() []*formatCacheV1 {
+func genFormatCacheInvalidDisksOrder() []*formatCacheV2 {
 	disks := make([]string, 8)
-	formatConfigs := make([]*formatCacheV1, 8)
+	formatConfigs := make([]*formatCacheV2, 8)
 	for index := range disks {
 		disks[index] = mustGetUUID()
 	}
@@ -248,7 +238,7 @@
 		format := &formatCacheV1{}
 		format.Version = formatMetaVersion1
 		format.Format = formatCache
-		format.Cache.Version = formatCacheVersionV1
+		format.Cache.Version = formatCacheVersionV2
 		format.Cache.This = disks[index]
 		format.Cache.Disks = disks
 		formatConfigs[index] = format
@@ -319,7 +309,7 @@ func TestFormatCache(t *testing.T) {
 	}
 
 	for i, testCase := range testCases {
-		err := validateCacheFormats(context.Background(), testCase.formatConfigs)
+		err := validateCacheFormats(context.Background(), false, testCase.formatConfigs)
 		if err != nil && testCase.shouldPass {
 			t.Errorf("Test %d: Expected to pass but failed with %s", i+1, err)
 		}
diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go
index dce7e46b4..bc83a9f96 100644
--- a/cmd/gateway-main.go
+++ b/cmd/gateway-main.go
@@ -253,7 +253,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
 	if len(cacheConfig.Drives) > 0 {
 		var err error
 		// initialize the new disk cache objects.
- globalCacheObjectAPI, err = newServerCacheObjects(cacheConfig) + globalCacheObjectAPI, err = newServerCacheObjects(context.Background(), cacheConfig) logger.FatalIf(err, "Unable to initialize disk caching") } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 43059da22..e292542b7 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -337,7 +337,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req return } defer gr.Close() - objInfo := gr.ObjInfo if objectAPI.IsEncryptionSupported() { @@ -379,7 +378,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req statusCodeWritten = true w.WriteHeader(http.StatusPartialContent) } - // Write object content to response body if _, err = io.Copy(httpWriter, gr); err != nil { if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet @@ -1253,10 +1251,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req // Ensure that metadata does not contain sensitive information crypto.RemoveSensitiveEntries(metadata) - if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) { - putObject = api.CacheAPI().PutObject - } - // Create the object.. objInfo, err := putObject(ctx, bucket, object, pReader, opts) if err != nil { @@ -1408,9 +1402,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r return } newMultipartUpload := objectAPI.NewMultipartUpload - if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) { - newMultipartUpload = api.CacheAPI().NewMultipartUpload - } + uploadID, err := newMultipartUpload(ctx, bucket, object, opts) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) @@ -1939,9 +1931,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } putObjectPart := objectAPI.PutObjectPart - if api.CacheAPI() != nil && !isEncrypted { - putObjectPart = api.CacheAPI().PutObjectPart - } + partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts) if err != nil { // Verify if the underlying error is signature mismatch. @@ -1974,9 +1964,6 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, return } abortMultipartUpload := objectAPI.AbortMultipartUpload - if api.CacheAPI() != nil { - abortMultipartUpload = api.CacheAPI().AbortMultipartUpload - } if s3Error := checkRequestAuthType(ctx, r, policy.AbortMultipartUploadAction, bucket, object); s3Error != ErrNone { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r)) @@ -2257,9 +2244,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } completeMultiPartUpload := objectAPI.CompleteMultipartUpload - if api.CacheAPI() != nil { - completeMultiPartUpload = api.CacheAPI().CompleteMultipartUpload - } // This code is specifically to handle the requirements for slow // complete multipart upload operations on FS mode. diff --git a/cmd/server-main.go b/cmd/server-main.go index 18b6bdfd0..881ad41a1 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -346,7 +346,7 @@ func serverMain(ctx *cli.Context) { var cacheConfig = globalServerConfig.GetCacheConfig() if len(cacheConfig.Drives) > 0 { // initialize the new disk cache objects. 
- globalCacheObjectAPI, err = newServerCacheObjects(cacheConfig) + globalCacheObjectAPI, err = newServerCacheObjects(context.Background(), cacheConfig) logger.FatalIf(err, "Unable to initialize disk caching") } diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 7032325fe..465f08c82 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -257,9 +257,6 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs, } deleteBucket := objectAPI.DeleteBucket - if web.CacheAPI() != nil { - deleteBucket = web.CacheAPI().DeleteBucket - } if err := deleteBucket(ctx, args.BucketName); err != nil { return toJSONError(ctx, err, args.BucketName) @@ -302,9 +299,6 @@ func (web *webAPIHandlers) ListBuckets(r *http.Request, args *WebGenericArgs, re return toJSONError(ctx, errServerNotInitialized) } listBuckets := objectAPI.ListBuckets - if web.CacheAPI() != nil { - listBuckets = web.CacheAPI().ListBuckets - } claims, owner, authErr := webRequestAuthenticate(r) if authErr != nil { @@ -407,9 +401,6 @@ func (web *webAPIHandlers) ListObjects(r *http.Request, args *ListObjectsArgs, r } listObjects := objectAPI.ListObjects - if web.CacheAPI() != nil { - listObjects = web.CacheAPI().ListObjects - } if isRemoteCallRequired(ctx, args.BucketName, objectAPI) { sr, err := globalDNSConfig.Get(args.BucketName) @@ -600,9 +591,6 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs, return toJSONError(ctx, errServerNotInitialized) } listObjects := objectAPI.ListObjects - if web.CacheAPI() != nil { - listObjects = web.CacheAPI().ListObjects - } claims, owner, authErr := webRequestAuthenticate(r) if authErr != nil { @@ -1065,10 +1053,8 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } putObject := objectAPI.PutObject - if !hasServerSideEncryptionHeader(r.Header) && web.CacheAPI() != nil { - putObject = web.CacheAPI().PutObject - } - objInfo, err := putObject(ctx, bucket, object, pReader, opts) + + objInfo, err := putObject(context.Background(), bucket, object, pReader, opts) if err != nil { writeWebErrorResponse(w, err) return @@ -1307,32 +1293,29 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { writeWebErrorResponse(w, errInvalidBucketName) return } + getObjectNInfo := objectAPI.GetObjectNInfo + if web.CacheAPI() != nil { + getObjectNInfo = web.CacheAPI().GetObjectNInfo + } - getObject := objectAPI.GetObject - if web.CacheAPI() != nil { - getObject = web.CacheAPI().GetObject - } listObjects := objectAPI.ListObjects - if web.CacheAPI() != nil { - listObjects = web.CacheAPI().ListObjects - } archive := zip.NewWriter(w) defer archive.Close() - getObjectInfo := objectAPI.GetObjectInfo - if web.CacheAPI() != nil { - getObjectInfo = web.CacheAPI().GetObjectInfo - } - opts := ObjectOptions{} var length int64 for _, object := range args.Objects { // Writes compressed object file to the response. 
			zipit := func(objectName string) error {
-				info, err := getObjectInfo(ctx, args.BucketName, objectName, opts)
+				var opts ObjectOptions
+				gr, err := getObjectNInfo(ctx, args.BucketName, objectName, nil, r.Header, readLock, opts)
 				if err != nil {
 					return err
 				}
+				defer gr.Close()
+
+				info := gr.ObjInfo
+
 				length = info.Size
 				if objectAPI.IsEncryptionSupported() {
 					if _, err = DecryptObjectInfo(&info, r.Header); err != nil {
@@ -1395,7 +1378,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
 					// Response writer should be limited early on for decryption upto required length,
 					// additionally also skipping mod(offset)64KiB boundaries.
 					writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length)
-					writer, startOffset, length, err = DecryptBlocksRequest(writer, r,
+					writer, _, length, err = DecryptBlocksRequest(writer, r,
 						args.BucketName, objectName, startOffset, length, info, false)
 					if err != nil {
 						writeWebErrorResponse(w, err)
@@ -1403,14 +1386,20 @@
 				}
 				httpWriter := ioutil.WriteOnClose(writer)
-				if err = getObject(ctx, args.BucketName, objectName, startOffset, length, httpWriter, "", opts); err != nil {
+
+				// Write object content to response body
+				if _, err = io.Copy(httpWriter, gr); err != nil {
 					httpWriter.Close()
 					if info.IsCompressed() {
 						// Wait for decompression go-routine to retire.
 						wg.Wait()
 					}
+					if !httpWriter.HasWritten() { // write error response only if no data or headers have been written to client yet
+						writeWebErrorResponse(w, err)
+					}
 					return err
 				}
+
 				if err = httpWriter.Close(); err != nil {
 					if !httpWriter.HasWritten() { // write error response only if no data has been written to client yet
 						writeWebErrorResponse(w, err)
diff --git a/docs/disk-caching/DESIGN.md b/docs/disk-caching/DESIGN.md
index 27a9fa39d..89dd0fd61 100644
--- a/docs/disk-caching/DESIGN.md
+++ b/docs/disk-caching/DESIGN.md
@@ -34,13 +34,13 @@ minio server -h
 - An object is only cached when drive has sufficient disk space.
 
 ## Behavior
-Disk caching caches objects for both **uploaded** and **downloaded** objects i.e
+Disk caching caches **downloaded** objects, i.e.
 - Caches new objects for entries not found in cache while downloading. Otherwise serves from the cache.
-- Caches all successfully uploaded objects. Replaces existing cached entry of the same object if needed.
+- Bitrot protection is added to cached content and verified when the object is served from the cache.
 - When an object is deleted, corresponding entry in cache if any is deleted as well.
 - Cache continues to work for read-only operations such as GET, HEAD when backend is offline.
-- Cache disallows write operations when backend is offline.
+- Cache-Control and Expires headers can be used to control how long objects stay in the cache.
 
 > NOTE: Expiration happens automatically based on the configured interval as explained above, frequently accessed objects stay alive in cache for a significantly longer time.
 
@@ -51,3 +51,5 @@ Upon restart of minio server after a running minio process is killed or crashes,
 - Bucket policies are not cached, so anonymous operations are not supported when backend is offline.
 - Objects are distributed using deterministic hashing among the list of configured cache drives. If one or more drives go offline, or cache drive configuration is altered in any way, performance may degrade to a linear lookup time depending on the number of disks in cache.
+## TODO +- Encrypt cached objects automatically with a cache encryption master key \ No newline at end of file
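
To make the new on-disk layout concrete, below is a minimal, self-contained sketch of the flat directory naming and the per-block bitrot framing that migrateOldCache and migrateData describe above. It is an illustration, not this PR's code: cacheSHADir and bitrotWrite are hypothetical stand-ins for getCacheSHADir and bitrotWriteToCache, SHA-256 stands in for the HighwayHash256S algorithm recorded in cache.json, and the 1 MiB block size is an assumed value for cacheBlkSize.

```go
// Illustration only: cacheSHADir and bitrotWrite are hypothetical stand-ins
// for getCacheSHADir and bitrotWriteToCache. SHA-256 replaces HighwayHash256S
// so the example has no external dependencies.
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"path/filepath"
	"strings"
)

const blockSize = 1 << 20 // assumed cache block size (1 MiB)

// cacheSHADir mimics the flat naming scheme: one directory per object,
// named by a hash of "bucket/object", holding part.1 and cache.json.
func cacheSHADir(cacheDir, bucket, object string) string {
	sum := sha256.Sum256([]byte(bucket + "/" + object))
	return filepath.Join(cacheDir, hex.EncodeToString(sum[:]))
}

// bitrotWrite frames the data stream as [hash][block][hash][block]... so
// each block carries its own checksum and can be verified independently.
func bitrotWrite(dst io.Writer, src io.Reader) error {
	buf := make([]byte, blockSize)
	for {
		n, err := io.ReadFull(src, buf)
		if n > 0 {
			// hash sum stored at the start of each block
			sum := sha256.Sum256(buf[:n])
			if _, werr := dst.Write(sum[:]); werr != nil {
				return werr
			}
			if _, werr := dst.Write(buf[:n]); werr != nil {
				return werr
			}
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return nil // source drained, last (possibly short) block written
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	// Where a cached copy of photos/2019/beach.jpg would live.
	fmt.Println(cacheSHADir("/mnt/cache", "photos", "2019/beach.jpg"))

	// 11 bytes of data become one 32-byte checksum plus an 11-byte block = 43 bytes.
	var out strings.Builder
	if err := bitrotWrite(&out, strings.NewReader("hello world")); err != nil {
		panic(err)
	}
	fmt.Println(len(out.String()))
}
```

Reading back reverses the framing: recompute each block's hash and compare it against the stored sum before handing bytes to the caller; a mismatch marks the cached copy as corrupt so the object can be re-fetched from the backend.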