mirror of
https://github.com/minio/minio.git
synced 2025-11-09 21:49:46 -05:00
This reverts commit a13b58f630.
This commit is contained in:
@@ -100,7 +100,9 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
|
||||
}
|
||||
|
||||
listObjectsV2 := objectAPI.ListObjectsV2
|
||||
|
||||
if api.CacheAPI() != nil {
|
||||
listObjectsV2 = api.CacheAPI().ListObjectsV2
|
||||
}
|
||||
// Inititate a list objects operation based on the input params.
|
||||
// On success would return back ListObjectsInfo object to be
|
||||
// marshaled into S3 compatible XML header.
|
||||
@@ -177,6 +179,9 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
|
||||
}
|
||||
|
||||
listObjects := objectAPI.ListObjects
|
||||
if api.CacheAPI() != nil {
|
||||
listObjects = api.CacheAPI().ListObjects
|
||||
}
|
||||
|
||||
// Inititate a list objects operation based on the input params.
|
||||
// On success would return back ListObjectsInfo object to be
|
||||
|
||||
@@ -105,7 +105,9 @@ func (api objectAPIHandlers) GetBucketLocationHandler(w http.ResponseWriter, r *
|
||||
}
|
||||
|
||||
getBucketInfo := objectAPI.GetBucketInfo
|
||||
|
||||
if api.CacheAPI() != nil {
|
||||
getBucketInfo = api.CacheAPI().GetBucketInfo
|
||||
}
|
||||
if _, err := getBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
@@ -200,6 +202,9 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
|
||||
}
|
||||
|
||||
listBuckets := objectAPI.ListBuckets
|
||||
if api.CacheAPI() != nil {
|
||||
listBuckets = api.CacheAPI().ListBuckets
|
||||
}
|
||||
|
||||
if s3Error := checkRequestAuthType(ctx, r, policy.ListAllMyBucketsAction, "", ""); s3Error != ErrNone {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
|
||||
@@ -737,7 +742,9 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re
|
||||
}
|
||||
|
||||
getBucketInfo := objectAPI.GetBucketInfo
|
||||
|
||||
if api.CacheAPI() != nil {
|
||||
getBucketInfo = api.CacheAPI().GetBucketInfo
|
||||
}
|
||||
if _, err := getBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
|
||||
return
|
||||
@@ -767,7 +774,9 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
|
||||
}
|
||||
|
||||
deleteBucket := objectAPI.DeleteBucket
|
||||
|
||||
if api.CacheAPI() != nil {
|
||||
deleteBucket = api.CacheAPI().DeleteBucket
|
||||
}
|
||||
// Attempt to delete bucket.
|
||||
if err := deleteBucket(ctx, bucket); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
|
||||
@@ -1,417 +0,0 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/djherbis/atime"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
)
|
||||
|
||||
const (
|
||||
// cache.json object metadata for cached objects.
|
||||
cacheMetaJSONFile = "cache.json"
|
||||
cacheDataFile = "data"
|
||||
|
||||
cacheEnvDelimiter = ";"
|
||||
)
|
||||
|
||||
// Represents the cache metadata struct
|
||||
type cacheMeta struct {
|
||||
// Metadata map for current object.
|
||||
Meta map[string]string `json:"meta,omitempty"`
|
||||
}
|
||||
|
||||
func (m *cacheMeta) ToObjectInfo(bucket, object string, fi os.FileInfo) (o ObjectInfo) {
|
||||
if len(m.Meta) == 0 {
|
||||
m.Meta = make(map[string]string)
|
||||
}
|
||||
|
||||
o = ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
}
|
||||
|
||||
// We set file info only if its valid.
|
||||
o.ModTime = timeSentinel
|
||||
if fi != nil {
|
||||
o.ModTime = fi.ModTime()
|
||||
o.Size = fi.Size()
|
||||
if fi.IsDir() {
|
||||
// Directory is always 0 bytes in S3 API, treat it as such.
|
||||
o.Size = 0
|
||||
o.IsDir = fi.IsDir()
|
||||
}
|
||||
}
|
||||
|
||||
o.ETag = extractETag(m.Meta)
|
||||
o.ContentType = m.Meta["content-type"]
|
||||
o.ContentEncoding = m.Meta["content-encoding"]
|
||||
if storageClass, ok := m.Meta[amzStorageClass]; ok {
|
||||
o.StorageClass = storageClass
|
||||
} else {
|
||||
o.StorageClass = globalMinioDefaultStorageClass
|
||||
}
|
||||
var (
|
||||
t time.Time
|
||||
e error
|
||||
)
|
||||
if exp, ok := m.Meta["expires"]; ok {
|
||||
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
|
||||
o.Expires = t.UTC()
|
||||
}
|
||||
}
|
||||
// etag/md5Sum has already been extracted. We need to
|
||||
// remove to avoid it from appearing as part of
|
||||
o.UserDefined = cleanMetadata(m.Meta)
|
||||
return o
|
||||
}
|
||||
|
||||
// disk cache
|
||||
type diskCache struct {
|
||||
dir string // caching directory
|
||||
maxDiskUsagePct int // max usage in %
|
||||
expiry int // cache expiry in days
|
||||
// to manage cache operations
|
||||
nsMutex *nsLockMap
|
||||
// mark false if drive is offline
|
||||
online bool
|
||||
// mutex to protect updates to online variable
|
||||
onlineMutex *sync.RWMutex
|
||||
// purge() listens on this channel to start the cache-purge process
|
||||
purgeChan chan struct{}
|
||||
}
|
||||
|
||||
// Inits the disk cache dir if it is not init'ed already.
|
||||
func newdiskCache(dir string, expiry int, maxDiskUsagePct int) (*diskCache, error) {
|
||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||
return nil, fmt.Errorf("Unable to initialize '%s' dir, %s", dir, err)
|
||||
}
|
||||
|
||||
if expiry == 0 {
|
||||
expiry = globalCacheExpiry
|
||||
}
|
||||
|
||||
cache := diskCache{
|
||||
dir: dir,
|
||||
expiry: expiry,
|
||||
maxDiskUsagePct: maxDiskUsagePct,
|
||||
purgeChan: make(chan struct{}),
|
||||
online: true,
|
||||
onlineMutex: &sync.RWMutex{},
|
||||
nsMutex: newNSLock(false),
|
||||
}
|
||||
return &cache, nil
|
||||
}
|
||||
|
||||
// Returns if the disk usage is low.
|
||||
// Disk usage is low if usage is < 80% of cacheMaxDiskUsagePct
|
||||
// Ex. for a 100GB disk, if maxUsage is configured as 70% then cacheMaxDiskUsagePct is 70G
|
||||
// hence disk usage is low if the disk usage is less than 56G (because 80% of 70G is 56G)
|
||||
func (c *diskCache) diskUsageLow() bool {
|
||||
minUsage := c.maxDiskUsagePct * 80 / 100
|
||||
di, err := disk.GetInfo(c.dir)
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
return false
|
||||
}
|
||||
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
||||
return int(usedPercent) < minUsage
|
||||
}
|
||||
|
||||
// Return if the disk usage is high.
|
||||
// Disk usage is high if disk used is > cacheMaxDiskUsagePct
|
||||
func (c *diskCache) diskUsageHigh() bool {
|
||||
di, err := disk.GetInfo(c.dir)
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
return true
|
||||
}
|
||||
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
||||
return int(usedPercent) > c.maxDiskUsagePct
|
||||
}
|
||||
|
||||
// Returns if size space can be allocated without exceeding
|
||||
// max disk usable for caching
|
||||
func (c *diskCache) diskAvailable(size int64) bool {
|
||||
di, err := disk.GetInfo(c.dir)
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
return false
|
||||
}
|
||||
usedPercent := (di.Total - (di.Free - uint64(size))) * 100 / di.Total
|
||||
return int(usedPercent) < c.maxDiskUsagePct
|
||||
}
|
||||
|
||||
// Purge cache entries that were not accessed.
|
||||
func (c *diskCache) purge() {
|
||||
ctx := context.Background()
|
||||
for {
|
||||
olderThan := c.expiry
|
||||
for !c.diskUsageLow() {
|
||||
// delete unaccessed objects older than expiry duration
|
||||
expiry := UTCNow().AddDate(0, 0, -1*olderThan)
|
||||
olderThan /= 2
|
||||
if olderThan < 1 {
|
||||
break
|
||||
}
|
||||
deletedCount := 0
|
||||
|
||||
objDirs, err := ioutil.ReadDir(c.dir)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, obj := range objDirs {
|
||||
var fi os.FileInfo
|
||||
fi, err := os.Stat(pathJoin(c.dir, obj.Name()))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
objInfo, err := c.statCache(ctx, pathJoin(c.dir, obj.Name()))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if !filterFromCache(objInfo.UserDefined) ||
|
||||
!isStaleCache(objInfo) ||
|
||||
atime.Get(fi).After(expiry) {
|
||||
continue
|
||||
}
|
||||
if err = os.RemoveAll(pathJoin(c.dir, obj.Name())); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
deletedCount++
|
||||
}
|
||||
if deletedCount == 0 {
|
||||
// to avoid a busy loop
|
||||
time.Sleep(time.Minute * 30)
|
||||
}
|
||||
}
|
||||
<-c.purgeChan
|
||||
}
|
||||
}
|
||||
|
||||
// sets cache drive status
|
||||
func (c *diskCache) setOnline(status bool) {
|
||||
c.onlineMutex.Lock()
|
||||
c.online = status
|
||||
c.onlineMutex.Unlock()
|
||||
}
|
||||
|
||||
// returns true if cache drive is online
|
||||
func (c *diskCache) IsOnline() bool {
|
||||
c.onlineMutex.RLock()
|
||||
defer c.onlineMutex.RUnlock()
|
||||
return c.online
|
||||
}
|
||||
|
||||
// Stat returns ObjectInfo from disk cache
|
||||
func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
|
||||
cachePath := c.getCacheSHADir(bucket, object)
|
||||
cLock := c.nsMutex.NewNSLock(cachePath, "")
|
||||
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer cLock.RUnlock()
|
||||
|
||||
oi, err = c.stat(ctx, bucket, object)
|
||||
return
|
||||
}
|
||||
|
||||
// lockless helper to Stat
|
||||
func (c *diskCache) stat(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
|
||||
cacheObjPath := c.getCacheSHADir(bucket, object)
|
||||
oi, e = c.statCache(ctx, cacheObjPath)
|
||||
if e != nil {
|
||||
return
|
||||
}
|
||||
oi.Bucket = bucket
|
||||
oi.Name = object
|
||||
return
|
||||
}
|
||||
|
||||
// statCache is a convenience function for purge() to get ObjectInfo for cached object
|
||||
func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (oi ObjectInfo, e error) {
|
||||
// Stat the file to get file size.
|
||||
fi, err := fsStatFile(ctx, pathJoin(cacheObjPath, cacheDataFile))
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
metaPath := pathJoin(cacheObjPath, cacheMetaJSONFile)
|
||||
|
||||
f, err := os.Open(metaPath)
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
defer f.Close()
|
||||
meta := &cacheMeta{}
|
||||
if err := jsonLoad(f, meta); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
return meta.ToObjectInfo("", "", fi), nil
|
||||
}
|
||||
|
||||
// caches object metadata to disk cache
|
||||
func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string) error {
|
||||
fileName := c.getCacheSHADir(bucket, object)
|
||||
metaPath := pathJoin(fileName, cacheMetaJSONFile)
|
||||
|
||||
f, err := os.Create(metaPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
m := cacheMeta{Meta: meta}
|
||||
jsonData, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f.Write(jsonData)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *diskCache) getCacheSHADir(bucket, object string) string {
|
||||
return path.Join(c.dir, getSHA256Hash([]byte(path.Join(bucket, object))))
|
||||
}
|
||||
|
||||
// Caches the object to disk
|
||||
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error {
|
||||
if c.diskUsageHigh() {
|
||||
select {
|
||||
case c.purgeChan <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return errDiskFull
|
||||
}
|
||||
if !c.diskAvailable(size) {
|
||||
return errDiskFull
|
||||
}
|
||||
cachePath := c.getCacheSHADir(bucket, object)
|
||||
cLock := c.nsMutex.NewNSLock(cachePath, "")
|
||||
if err := cLock.GetLock(globalObjectTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
defer cLock.Unlock()
|
||||
if err := os.MkdirAll(cachePath, 0777); err != nil {
|
||||
return err
|
||||
}
|
||||
bufSize := int64(readSizeV1)
|
||||
if size > 0 && bufSize > size {
|
||||
bufSize = size
|
||||
}
|
||||
filePath := path.Join(cachePath, cacheDataFile)
|
||||
buf := make([]byte, int(bufSize))
|
||||
_, err := fsCreateFile(ctx, filePath, data, buf, size)
|
||||
if IsErr(err, baseErrs...) {
|
||||
c.setOnline(false)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.saveMetadata(ctx, bucket, object, opts.UserDefined)
|
||||
}
|
||||
|
||||
// Get returns ObjectInfo and reader for object from disk cache
|
||||
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
cachePath := c.getCacheSHADir(bucket, object)
|
||||
fileName := path.Join(cachePath, cacheDataFile)
|
||||
|
||||
cLock := c.nsMutex.NewNSLock(cachePath, "")
|
||||
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer cLock.RUnlock()
|
||||
var objInfo ObjectInfo
|
||||
|
||||
if objInfo, err = c.stat(ctx, bucket, object); err != nil {
|
||||
return nil, toObjectErr(err, bucket, object)
|
||||
}
|
||||
var nsUnlocker = func() {}
|
||||
// For a directory, we need to send an reader that returns no bytes.
|
||||
if hasSuffix(object, slashSeparator) {
|
||||
// The lock taken above is released when
|
||||
// objReader.Close() is called by the caller.
|
||||
return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
|
||||
}
|
||||
objReaderFn, off, length, rErr := NewGetObjectReader(rs, objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
|
||||
if rErr != nil {
|
||||
return nil, rErr
|
||||
}
|
||||
|
||||
// Read the object, doesn't exist returns an s3 compatible error.
|
||||
readCloser, size, err := fsOpenFile(ctx, fileName, off)
|
||||
if err != nil {
|
||||
return nil, toObjectErr(err, bucket, object)
|
||||
}
|
||||
reader := io.LimitReader(readCloser, length)
|
||||
closeFn := func() {
|
||||
readCloser.Close()
|
||||
}
|
||||
|
||||
// Check if range is valid
|
||||
if off > size || off+length > size {
|
||||
err = InvalidRange{off, length, size}
|
||||
logger.LogIf(ctx, err)
|
||||
closeFn()
|
||||
nsUnlocker()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return objReaderFn(reader, h, opts.CheckCopyPrecondFn, closeFn)
|
||||
}
|
||||
|
||||
// Deletes the cached object
|
||||
func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
|
||||
cachePath := c.getCacheSHADir(bucket, object)
|
||||
cLock := c.nsMutex.NewNSLock(cachePath, "")
|
||||
if err := cLock.GetLock(globalObjectTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
defer cLock.Unlock()
|
||||
return os.RemoveAll(cachePath)
|
||||
|
||||
}
|
||||
|
||||
// convenience function to check if object is cached on this diskCache
|
||||
func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
|
||||
fileName := getSHA256Hash([]byte(path.Join(bucket, object)))
|
||||
if _, err := os.Stat(path.Join(c.dir, fileName)); err != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
537
cmd/disk-cache-fs.go
Normal file
537
cmd/disk-cache-fs.go
Normal file
@@ -0,0 +1,537 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
"github.com/minio/minio/pkg/lock"
|
||||
)
|
||||
|
||||
const (
|
||||
// cache.json object metadata for cached objects.
|
||||
cacheMetaJSONFile = "cache.json"
|
||||
|
||||
cacheEnvDelimiter = ";"
|
||||
)
|
||||
|
||||
// cacheFSObjects implements the cache backend operations.
|
||||
type cacheFSObjects struct {
|
||||
*FSObjects
|
||||
// caching drive path (from cache "drives" in config.json)
|
||||
dir string
|
||||
// expiry in days specified in config.json
|
||||
expiry int
|
||||
// max disk usage pct
|
||||
maxDiskUsagePct int
|
||||
// purge() listens on this channel to start the cache-purge process
|
||||
purgeChan chan struct{}
|
||||
// mark false if drive is offline
|
||||
online bool
|
||||
// mutex to protect updates to online variable
|
||||
onlineMutex *sync.RWMutex
|
||||
}
|
||||
|
||||
// Inits the cache directory if it is not init'ed already.
|
||||
// Initializing implies creation of new FS Object layer.
|
||||
func newCacheFSObjects(dir string, expiry int, maxDiskUsagePct int) (*cacheFSObjects, error) {
|
||||
// Assign a new UUID for FS minio mode. Each server instance
|
||||
// gets its own UUID for temporary file transaction.
|
||||
fsUUID := mustGetUUID()
|
||||
|
||||
// Initialize meta volume, if volume already exists ignores it.
|
||||
if err := initMetaVolumeFS(dir, fsUUID); err != nil {
|
||||
return nil, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
|
||||
}
|
||||
|
||||
trashPath := pathJoin(dir, minioMetaBucket, cacheTrashDir)
|
||||
if err := os.MkdirAll(trashPath, 0777); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if expiry == 0 {
|
||||
expiry = globalCacheExpiry
|
||||
}
|
||||
|
||||
// Initialize fs objects.
|
||||
fsObjects := &FSObjects{
|
||||
fsPath: dir,
|
||||
metaJSONFile: cacheMetaJSONFile,
|
||||
fsUUID: fsUUID,
|
||||
rwPool: &fsIOPool{
|
||||
readersMap: make(map[string]*lock.RLockedFile),
|
||||
},
|
||||
nsMutex: newNSLock(false),
|
||||
listPool: NewTreeWalkPool(globalLookupTimeout),
|
||||
appendFileMap: make(map[string]*fsAppendFile),
|
||||
}
|
||||
|
||||
go fsObjects.cleanupStaleMultipartUploads(context.Background(), GlobalMultipartCleanupInterval, GlobalMultipartExpiry, GlobalServiceDoneCh)
|
||||
|
||||
cacheFS := cacheFSObjects{
|
||||
FSObjects: fsObjects,
|
||||
dir: dir,
|
||||
expiry: expiry,
|
||||
maxDiskUsagePct: maxDiskUsagePct,
|
||||
purgeChan: make(chan struct{}),
|
||||
online: true,
|
||||
onlineMutex: &sync.RWMutex{},
|
||||
}
|
||||
return &cacheFS, nil
|
||||
}
|
||||
|
||||
// Returns if the disk usage is low.
|
||||
// Disk usage is low if usage is < 80% of cacheMaxDiskUsagePct
|
||||
// Ex. for a 100GB disk, if maxUsage is configured as 70% then cacheMaxDiskUsagePct is 70G
|
||||
// hence disk usage is low if the disk usage is less than 56G (because 80% of 70G is 56G)
|
||||
func (cfs *cacheFSObjects) diskUsageLow() bool {
|
||||
|
||||
minUsage := cfs.maxDiskUsagePct * 80 / 100
|
||||
di, err := disk.GetInfo(cfs.dir)
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
return false
|
||||
}
|
||||
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
||||
return int(usedPercent) < minUsage
|
||||
}
|
||||
|
||||
// Return if the disk usage is high.
|
||||
// Disk usage is high if disk used is > cacheMaxDiskUsagePct
|
||||
func (cfs *cacheFSObjects) diskUsageHigh() bool {
|
||||
di, err := disk.GetInfo(cfs.dir)
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
return true
|
||||
}
|
||||
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
||||
return int(usedPercent) > cfs.maxDiskUsagePct
|
||||
}
|
||||
|
||||
// Returns if size space can be allocated without exceeding
|
||||
// max disk usable for caching
|
||||
func (cfs *cacheFSObjects) diskAvailable(size int64) bool {
|
||||
di, err := disk.GetInfo(cfs.dir)
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
return false
|
||||
}
|
||||
usedPercent := (di.Total - (di.Free - uint64(size))) * 100 / di.Total
|
||||
return int(usedPercent) < cfs.maxDiskUsagePct
|
||||
}
|
||||
|
||||
// purges all content marked trash from the cache.
|
||||
func (cfs *cacheFSObjects) purgeTrash() {
|
||||
ticker := time.NewTicker(time.Minute * cacheCleanupInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-GlobalServiceDoneCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
trashPath := path.Join(cfs.fsPath, minioMetaBucket, cacheTrashDir)
|
||||
entries, err := readDir(trashPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, entry := range entries {
|
||||
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{})
|
||||
fi, err := fsStatVolume(ctx, pathJoin(trashPath, entry))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
dir := path.Join(trashPath, fi.Name())
|
||||
|
||||
// Delete all expired cache content.
|
||||
fsRemoveAll(ctx, dir)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Purge cache entries that were not accessed.
|
||||
func (cfs *cacheFSObjects) purge() {
|
||||
delimiter := slashSeparator
|
||||
maxKeys := 1000
|
||||
ctx := context.Background()
|
||||
for {
|
||||
olderThan := cfs.expiry
|
||||
for !cfs.diskUsageLow() {
|
||||
// delete unaccessed objects older than expiry duration
|
||||
expiry := UTCNow().AddDate(0, 0, -1*olderThan)
|
||||
olderThan /= 2
|
||||
if olderThan < 1 {
|
||||
break
|
||||
}
|
||||
deletedCount := 0
|
||||
buckets, err := cfs.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
// Reset cache online status if drive was offline earlier.
|
||||
if !cfs.IsOnline() {
|
||||
cfs.setOnline(true)
|
||||
}
|
||||
for _, bucket := range buckets {
|
||||
var continuationToken string
|
||||
var marker string
|
||||
for {
|
||||
objects, err := cfs.ListObjects(ctx, bucket.Name, marker, continuationToken, delimiter, maxKeys)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if !objects.IsTruncated {
|
||||
break
|
||||
}
|
||||
marker = objects.NextMarker
|
||||
for _, object := range objects.Objects {
|
||||
// purge objects that qualify because of cache-control directives or
|
||||
// past cache expiry duration.
|
||||
if !filterFromCache(object.UserDefined) ||
|
||||
!isStaleCache(object) ||
|
||||
object.AccTime.After(expiry) {
|
||||
continue
|
||||
}
|
||||
if err = cfs.DeleteObject(ctx, bucket.Name, object.Name); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
deletedCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
if deletedCount == 0 {
|
||||
// to avoid a busy loop
|
||||
time.Sleep(time.Minute * 30)
|
||||
}
|
||||
}
|
||||
<-cfs.purgeChan
|
||||
}
|
||||
}
|
||||
|
||||
// sets cache drive status
|
||||
func (cfs *cacheFSObjects) setOnline(status bool) {
|
||||
cfs.onlineMutex.Lock()
|
||||
cfs.online = status
|
||||
cfs.onlineMutex.Unlock()
|
||||
}
|
||||
|
||||
// returns true if cache drive is online
|
||||
func (cfs *cacheFSObjects) IsOnline() bool {
|
||||
cfs.onlineMutex.RLock()
|
||||
defer cfs.onlineMutex.RUnlock()
|
||||
return cfs.online
|
||||
}
|
||||
|
||||
// Caches the object to disk
|
||||
func (cfs *cacheFSObjects) Put(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) error {
|
||||
if cfs.diskUsageHigh() {
|
||||
select {
|
||||
case cfs.purgeChan <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return errDiskFull
|
||||
}
|
||||
if !cfs.diskAvailable(data.Size()) {
|
||||
return errDiskFull
|
||||
}
|
||||
if _, err := cfs.GetBucketInfo(ctx, bucket); err != nil {
|
||||
pErr := cfs.MakeBucketWithLocation(ctx, bucket, "")
|
||||
if pErr != nil {
|
||||
return pErr
|
||||
}
|
||||
}
|
||||
_, err := cfs.PutObject(ctx, bucket, object, data, opts)
|
||||
// if err is due to disk being offline , mark cache drive as offline
|
||||
if IsErr(err, baseErrs...) {
|
||||
cfs.setOnline(false)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Returns the handle for the cached object
|
||||
func (cfs *cacheFSObjects) Get(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
|
||||
return cfs.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
|
||||
}
|
||||
|
||||
// Deletes the cached object
|
||||
func (cfs *cacheFSObjects) Delete(ctx context.Context, bucket, object string) (err error) {
|
||||
return cfs.DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
|
||||
// convenience function to check if object is cached on this cacheFSObjects
|
||||
func (cfs *cacheFSObjects) Exists(ctx context.Context, bucket, object string) bool {
|
||||
_, err := cfs.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// Identical to fs PutObject operation except that it uses ETag in metadata
|
||||
// headers.
|
||||
func (cfs *cacheFSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, retErr error) {
|
||||
data := r.Reader
|
||||
fs := cfs.FSObjects
|
||||
// Lock the object.
|
||||
objectLock := fs.nsMutex.NewNSLock(bucket, object)
|
||||
if err := objectLock.GetLock(globalObjectTimeout); err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
defer objectLock.Unlock()
|
||||
|
||||
// No metadata is set, allocate a new one.
|
||||
meta := make(map[string]string)
|
||||
for k, v := range opts.UserDefined {
|
||||
meta[k] = v
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
// Validate if bucket name is valid and exists.
|
||||
if _, err = fs.statBucketDir(ctx, bucket); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket)
|
||||
}
|
||||
|
||||
fsMeta := newFSMetaV1()
|
||||
fsMeta.Meta = meta
|
||||
|
||||
// This is a special case with size as '0' and object ends
|
||||
// with a slash separator, we treat it like a valid operation
|
||||
// and return success.
|
||||
if isObjectDir(object, data.Size()) {
|
||||
// Check if an object is present as one of the parent dir.
|
||||
if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) {
|
||||
return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object)
|
||||
}
|
||||
if err = mkdirAll(pathJoin(fs.fsPath, bucket, object), 0777); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
var fi os.FileInfo
|
||||
if fi, err = fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object)); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
||||
}
|
||||
|
||||
if err = checkPutObjectArgs(ctx, bucket, object, fs, data.Size()); err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
|
||||
// Check if an object is present as one of the parent dir.
|
||||
if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) {
|
||||
return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object)
|
||||
}
|
||||
|
||||
// Validate input data size and it can never be less than zero.
|
||||
if data.Size() < -1 {
|
||||
logger.LogIf(ctx, errInvalidArgument)
|
||||
return ObjectInfo{}, errInvalidArgument
|
||||
}
|
||||
|
||||
var wlk *lock.LockedFile
|
||||
if bucket != minioMetaBucket {
|
||||
bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
|
||||
fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
|
||||
|
||||
wlk, err = fs.rwPool.Create(fsMetaPath)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
// This close will allow for locks to be synchronized on `fs.json`.
|
||||
defer wlk.Close()
|
||||
defer func() {
|
||||
// Remove meta file when PutObject encounters any error
|
||||
if retErr != nil {
|
||||
tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID)
|
||||
fsRemoveMeta(ctx, bucketMetaDir, fsMetaPath, tmpDir)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Uploaded object will first be written to the temporary location which will eventually
|
||||
// be renamed to the actual location. It is first written to the temporary location
|
||||
// so that cleaning it up will be easy if the server goes down.
|
||||
tempObj := mustGetUUID()
|
||||
|
||||
// Allocate a buffer to Read() from request body
|
||||
bufSize := int64(readSizeV1)
|
||||
if size := data.Size(); size > 0 && bufSize > size {
|
||||
bufSize = size
|
||||
}
|
||||
|
||||
buf := make([]byte, int(bufSize))
|
||||
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)
|
||||
bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, buf, data.Size())
|
||||
if err != nil {
|
||||
fsRemoveFile(ctx, fsTmpObjPath)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
if fsMeta.Meta["etag"] == "" {
|
||||
fsMeta.Meta["etag"] = r.MD5CurrentHexString()
|
||||
}
|
||||
// Should return IncompleteBody{} error when reader has fewer
|
||||
// bytes than specified in request header.
|
||||
if bytesWritten < data.Size() {
|
||||
fsRemoveFile(ctx, fsTmpObjPath)
|
||||
return ObjectInfo{}, IncompleteBody{}
|
||||
}
|
||||
|
||||
// Delete the temporary object in the case of a
|
||||
// failure. If PutObject succeeds, then there would be
|
||||
// nothing to delete.
|
||||
defer fsRemoveFile(ctx, fsTmpObjPath)
|
||||
|
||||
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
|
||||
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
|
||||
if err = fsRenameFile(ctx, fsTmpObjPath, fsNSObjPath); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
if bucket != minioMetaBucket {
|
||||
// Write FS metadata after a successful namespace operation.
|
||||
if _, err = fsMeta.WriteTo(wlk); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
}
|
||||
|
||||
// Stat the file to fetch timestamp, size.
|
||||
fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
|
||||
if err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
// Success.
|
||||
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
||||
}
|
||||
|
||||
// Implements S3 compatible initiate multipart API. Operation here is identical
|
||||
// to fs backend implementation - with the exception that cache FS uses the uploadID
|
||||
// generated on the backend
|
||||
func (cfs *cacheFSObjects) NewMultipartUpload(ctx context.Context, bucket, object string, uploadID string, opts ObjectOptions) (string, error) {
|
||||
if cfs.diskUsageHigh() {
|
||||
select {
|
||||
case cfs.purgeChan <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return "", errDiskFull
|
||||
}
|
||||
|
||||
if _, err := cfs.GetBucketInfo(ctx, bucket); err != nil {
|
||||
pErr := cfs.MakeBucketWithLocation(ctx, bucket, "")
|
||||
if pErr != nil {
|
||||
return "", pErr
|
||||
}
|
||||
}
|
||||
fs := cfs.FSObjects
|
||||
if err := checkNewMultipartArgs(ctx, bucket, object, fs); err != nil {
|
||||
return "", toObjectErr(err, bucket)
|
||||
}
|
||||
|
||||
if _, err := fs.statBucketDir(ctx, bucket); err != nil {
|
||||
return "", toObjectErr(err, bucket)
|
||||
}
|
||||
|
||||
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
|
||||
|
||||
err := mkdirAll(uploadIDDir, 0755)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Initialize fs.json values.
|
||||
fsMeta := newFSMetaV1()
|
||||
fsMeta.Meta = opts.UserDefined
|
||||
|
||||
fsMetaBytes, err := json.Marshal(fsMeta)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
if err = ioutil.WriteFile(pathJoin(uploadIDDir, fs.metaJSONFile), fsMetaBytes, 0644); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return "", err
|
||||
}
|
||||
return uploadID, nil
|
||||
}
|
||||
|
||||
// moveBucketToTrash clears cacheFSObjects of bucket contents and moves it to trash folder.
|
||||
func (cfs *cacheFSObjects) moveBucketToTrash(ctx context.Context, bucket string) (err error) {
|
||||
fs := cfs.FSObjects
|
||||
bucketLock := fs.nsMutex.NewNSLock(bucket, "")
|
||||
if err = bucketLock.GetLock(globalObjectTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
defer bucketLock.Unlock()
|
||||
bucketDir, err := fs.getBucketDir(ctx, bucket)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket)
|
||||
}
|
||||
trashPath := pathJoin(cfs.fsPath, minioMetaBucket, cacheTrashDir)
|
||||
expiredDir := path.Join(trashPath, bucket)
|
||||
// Attempt to move regular bucket to expired directory.
|
||||
if err = fsRenameDir(bucketDir, expiredDir); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return toObjectErr(err, bucket)
|
||||
}
|
||||
// Cleanup all the bucket metadata.
|
||||
ominioMetadataBucketDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket)
|
||||
nminioMetadataBucketDir := pathJoin(trashPath, MustGetUUID())
|
||||
logger.LogIf(ctx, fsRenameDir(ominioMetadataBucketDir, nminioMetadataBucketDir))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Removes a directory only if its empty, handles long
|
||||
// paths for windows automatically.
|
||||
func fsRenameDir(dirPath, newPath string) (err error) {
|
||||
if dirPath == "" || newPath == "" {
|
||||
return errInvalidArgument
|
||||
}
|
||||
|
||||
if err = checkPathLength(dirPath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = checkPathLength(newPath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = os.Rename(dirPath, newPath); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
} else if isSysErrNotEmpty(err) {
|
||||
return errVolumeNotEmpty
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,146 +0,0 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/crypto"
|
||||
)
|
||||
|
||||
type cacheControl struct {
|
||||
exclude bool
|
||||
expiry time.Time
|
||||
maxAge int
|
||||
sMaxAge int
|
||||
minFresh int
|
||||
}
|
||||
|
||||
// cache exclude directives in cache-control header
|
||||
var cacheExcludeDirectives = []string{
|
||||
"no-cache",
|
||||
"no-store",
|
||||
"must-revalidate",
|
||||
}
|
||||
|
||||
// returns true if cache exclude directives are set.
|
||||
func isCacheExcludeDirective(s string) bool {
|
||||
for _, directive := range cacheExcludeDirectives {
|
||||
if s == directive {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// returns struct with cache-control settings from user metadata.
|
||||
func getCacheControlOpts(m map[string]string) (c cacheControl, err error) {
|
||||
var headerVal string
|
||||
for k, v := range m {
|
||||
if k == "cache-control" {
|
||||
headerVal = v
|
||||
}
|
||||
if k == "expires" {
|
||||
if e, err := http.ParseTime(v); err == nil {
|
||||
c.expiry = e
|
||||
}
|
||||
}
|
||||
}
|
||||
if headerVal == "" {
|
||||
return
|
||||
}
|
||||
headerVal = strings.ToLower(headerVal)
|
||||
headerVal = strings.TrimSpace(headerVal)
|
||||
|
||||
vals := strings.Split(headerVal, ",")
|
||||
for _, val := range vals {
|
||||
val = strings.TrimSpace(val)
|
||||
p := strings.Split(val, "=")
|
||||
if isCacheExcludeDirective(p[0]) {
|
||||
c.exclude = true
|
||||
continue
|
||||
}
|
||||
|
||||
if len(p) != 2 {
|
||||
continue
|
||||
}
|
||||
if p[0] == "max-age" ||
|
||||
p[0] == "s-maxage" ||
|
||||
p[0] == "min-fresh" {
|
||||
i, err := strconv.Atoi(p[1])
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
if p[0] == "max-age" {
|
||||
c.maxAge = i
|
||||
}
|
||||
if p[0] == "s-maxage" {
|
||||
c.sMaxAge = i
|
||||
}
|
||||
if p[0] == "min-fresh" {
|
||||
c.minFresh = i
|
||||
}
|
||||
}
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// return true if metadata has a cache-control header
|
||||
// directive to exclude object from cache.
|
||||
func filterFromCache(m map[string]string) bool {
|
||||
c, err := getCacheControlOpts(m)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return c.exclude
|
||||
}
|
||||
|
||||
// returns true if cache expiry conditions met in cache-control/expiry metadata.
|
||||
func isStaleCache(objInfo ObjectInfo) bool {
|
||||
c, err := getCacheControlOpts(objInfo.UserDefined)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
now := time.Now()
|
||||
if c.sMaxAge > 0 && c.sMaxAge > int(now.Sub(objInfo.ModTime).Seconds()) {
|
||||
return true
|
||||
}
|
||||
if c.maxAge > 0 && c.maxAge > int(now.Sub(objInfo.ModTime).Seconds()) {
|
||||
return true
|
||||
}
|
||||
if !c.expiry.Equal(time.Time{}) && c.expiry.Before(time.Now()) {
|
||||
return true
|
||||
}
|
||||
if c.minFresh > 0 && c.minFresh <= int(now.Sub(objInfo.ModTime).Seconds()) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// backendDownError returns true if err is due to backend failure or faulty disk if in server mode
|
||||
func backendDownError(err error) bool {
|
||||
_, backendDown := err.(BackendDown)
|
||||
return backendDown || IsErr(err, baseErrs...)
|
||||
}
|
||||
|
||||
// IsCacheable returns if the object should be saved in the cache.
|
||||
func (o ObjectInfo) IsCacheable() bool {
|
||||
return !crypto.IsEncrypted(o.UserDefined)
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2018,2019 MinIO, Inc.
|
||||
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -19,8 +19,6 @@ package cmd
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -28,45 +26,27 @@ import (
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
)
|
||||
|
||||
// Initialize cache objects.
|
||||
func initCacheObjects(disk string, cacheMaxUse int) (*diskCache, error) {
|
||||
return newdiskCache(disk, globalCacheExpiry, cacheMaxUse)
|
||||
// Initialize cache FS objects.
|
||||
func initCacheFSObjects(disk string, cacheMaxUse int) (*cacheFSObjects, error) {
|
||||
return newCacheFSObjects(disk, globalCacheExpiry, cacheMaxUse)
|
||||
}
|
||||
|
||||
// inits diskCache struct for nDisks
|
||||
func initDiskCaches(drives []string, cacheMaxUse int, t *testing.T) ([]*diskCache, error) {
|
||||
var cb []*diskCache
|
||||
func initDiskCaches(drives []string, cacheMaxUse int, t *testing.T) (*diskCache, error) {
|
||||
var cfs []*cacheFSObjects
|
||||
for _, d := range drives {
|
||||
obj, err := initCacheObjects(d, cacheMaxUse)
|
||||
obj, err := initCacheFSObjects(d, cacheMaxUse)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cb = append(cb, obj)
|
||||
}
|
||||
return cb, nil
|
||||
}
|
||||
|
||||
// Tests ToObjectInfo function.
|
||||
func TestCacheMetadataObjInfo(t *testing.T) {
|
||||
m := cacheMeta{Meta: nil}
|
||||
objInfo := m.ToObjectInfo("testbucket", "testobject", nil)
|
||||
if objInfo.Size != 0 {
|
||||
t.Fatal("Unexpected object info value for Size", objInfo.Size)
|
||||
}
|
||||
if objInfo.ModTime != timeSentinel {
|
||||
t.Fatal("Unexpected object info value for ModTime ", objInfo.ModTime)
|
||||
}
|
||||
if objInfo.IsDir {
|
||||
t.Fatal("Unexpected object info value for IsDir", objInfo.IsDir)
|
||||
}
|
||||
if !objInfo.Expires.IsZero() {
|
||||
t.Fatal("Unexpected object info value for Expires ", objInfo.Expires)
|
||||
cfs = append(cfs, obj)
|
||||
}
|
||||
return &diskCache{cfs: cfs}, nil
|
||||
}
|
||||
|
||||
// test whether a drive being offline causes
|
||||
// getCachedLoc to fetch next online drive
|
||||
func TestGetCachedLoc(t *testing.T) {
|
||||
// getCacheFS to fetch next online drive
|
||||
func TestGetCacheFS(t *testing.T) {
|
||||
for n := 1; n < 10; n++ {
|
||||
fsDirs, err := getRandomDisks(n)
|
||||
if err != nil {
|
||||
@@ -76,15 +56,14 @@ func TestGetCachedLoc(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c := cacheObjects{cache: d}
|
||||
bucketName := "testbucket"
|
||||
objectName := "testobject"
|
||||
ctx := context.Background()
|
||||
// find cache drive where object would be hashed
|
||||
index := c.hashIndex(bucketName, objectName)
|
||||
index := d.hashIndex(bucketName, objectName)
|
||||
// turn off drive by setting online status to false
|
||||
c.cache[index].online = false
|
||||
cfs, err := c.getCachedLoc(ctx, bucketName, objectName)
|
||||
d.cfs[index].online = false
|
||||
cfs, err := d.getCacheFS(ctx, bucketName, objectName)
|
||||
if n == 1 && err == errDiskNotFound {
|
||||
continue
|
||||
}
|
||||
@@ -92,7 +71,7 @@ func TestGetCachedLoc(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
i := -1
|
||||
for j, f := range c.cache {
|
||||
for j, f := range d.cfs {
|
||||
if f == cfs {
|
||||
i = j
|
||||
break
|
||||
@@ -105,8 +84,8 @@ func TestGetCachedLoc(t *testing.T) {
|
||||
}
|
||||
|
||||
// test whether a drive being offline causes
|
||||
// getCachedLoc to fetch next online drive
|
||||
func TestGetCacheMaxUse(t *testing.T) {
|
||||
// getCacheFS to fetch next online drive
|
||||
func TestGetCacheFSMaxUse(t *testing.T) {
|
||||
for n := 1; n < 10; n++ {
|
||||
fsDirs, err := getRandomDisks(n)
|
||||
if err != nil {
|
||||
@@ -116,16 +95,14 @@ func TestGetCacheMaxUse(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c := cacheObjects{cache: d}
|
||||
|
||||
bucketName := "testbucket"
|
||||
objectName := "testobject"
|
||||
ctx := context.Background()
|
||||
// find cache drive where object would be hashed
|
||||
index := c.hashIndex(bucketName, objectName)
|
||||
index := d.hashIndex(bucketName, objectName)
|
||||
// turn off drive by setting online status to false
|
||||
c.cache[index].online = false
|
||||
cb, err := c.getCachedLoc(ctx, bucketName, objectName)
|
||||
d.cfs[index].online = false
|
||||
cfs, err := d.getCacheFS(ctx, bucketName, objectName)
|
||||
if n == 1 && err == errDiskNotFound {
|
||||
continue
|
||||
}
|
||||
@@ -133,8 +110,8 @@ func TestGetCacheMaxUse(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
i := -1
|
||||
for j, f := range d {
|
||||
if f == cb {
|
||||
for j, f := range d.cfs {
|
||||
if f == cfs {
|
||||
i = j
|
||||
break
|
||||
}
|
||||
@@ -195,9 +172,7 @@ func TestDiskCache(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c := cacheObjects{cache: d}
|
||||
|
||||
cache := c.cache[0]
|
||||
cache := d.cfs[0]
|
||||
ctx := context.Background()
|
||||
bucketName := "testbucket"
|
||||
objectName := "testobject"
|
||||
@@ -223,17 +198,14 @@ func TestDiskCache(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta})
|
||||
err = cache.Put(ctx, bucketName, objectName, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: httpMeta})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cReader, err := cache.Get(ctx, bucketName, objectName, nil, http.Header{
|
||||
"Content-Type": []string{"application/json"},
|
||||
}, opts)
|
||||
cachedObjInfo, err := cache.GetObjectInfo(ctx, bucketName, objectName, opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cachedObjInfo := cReader.ObjInfo
|
||||
if !cache.Exists(ctx, bucketName, objectName) {
|
||||
t.Fatal("Expected object to exist on cache")
|
||||
}
|
||||
@@ -247,7 +219,7 @@ func TestDiskCache(t *testing.T) {
|
||||
t.Fatal("Cached content-type does not match")
|
||||
}
|
||||
writer := bytes.NewBuffer(nil)
|
||||
_, err = io.Copy(writer, cReader)
|
||||
err = cache.Get(ctx, bucketName, objectName, 0, int64(size), writer, "", opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -274,7 +246,7 @@ func TestDiskCacheMaxUse(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cache := d[0]
|
||||
cache := d.cfs[0]
|
||||
ctx := context.Background()
|
||||
bucketName := "testbucket"
|
||||
objectName := "testobject"
|
||||
@@ -302,20 +274,19 @@ func TestDiskCacheMaxUse(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !cache.diskAvailable(int64(size)) {
|
||||
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta})
|
||||
err = cache.Put(ctx, bucketName, objectName, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: httpMeta})
|
||||
if err != errDiskFull {
|
||||
t.Fatal("Cache max-use limit violated.")
|
||||
}
|
||||
} else {
|
||||
err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta})
|
||||
err = cache.Put(ctx, bucketName, objectName, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: httpMeta})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cReader, err := cache.Get(ctx, bucketName, objectName, nil, nil, opts)
|
||||
cachedObjInfo, err := cache.GetObjectInfo(ctx, bucketName, objectName, opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cachedObjInfo := cReader.ObjInfo
|
||||
if !cache.Exists(ctx, bucketName, objectName) {
|
||||
t.Fatal("Expected object to exist on cache")
|
||||
}
|
||||
@@ -329,7 +300,7 @@ func TestDiskCacheMaxUse(t *testing.T) {
|
||||
t.Fatal("Cached content-type does not match")
|
||||
}
|
||||
writer := bytes.NewBuffer(nil)
|
||||
_, err = io.Copy(writer, cReader)
|
||||
err = cache.Get(ctx, bucketName, objectName, 0, int64(size), writer, "", opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -1249,6 +1249,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
// Ensure that metadata does not contain sensitive information
|
||||
crypto.RemoveSensitiveEntries(metadata)
|
||||
|
||||
if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) {
|
||||
putObject = api.CacheAPI().PutObject
|
||||
}
|
||||
|
||||
// Create the object..
|
||||
objInfo, err := putObject(ctx, bucket, object, pReader, opts)
|
||||
if err != nil {
|
||||
@@ -1400,7 +1404,9 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
|
||||
return
|
||||
}
|
||||
newMultipartUpload := objectAPI.NewMultipartUpload
|
||||
|
||||
if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) {
|
||||
newMultipartUpload = api.CacheAPI().NewMultipartUpload
|
||||
}
|
||||
uploadID, err := newMultipartUpload(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
@@ -1929,7 +1935,9 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
||||
}
|
||||
|
||||
putObjectPart := objectAPI.PutObjectPart
|
||||
|
||||
if api.CacheAPI() != nil && !isEncrypted {
|
||||
putObjectPart = api.CacheAPI().PutObjectPart
|
||||
}
|
||||
partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)
|
||||
if err != nil {
|
||||
// Verify if the underlying error is signature mismatch.
|
||||
@@ -1962,6 +1970,9 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter,
|
||||
return
|
||||
}
|
||||
abortMultipartUpload := objectAPI.AbortMultipartUpload
|
||||
if api.CacheAPI() != nil {
|
||||
abortMultipartUpload = api.CacheAPI().AbortMultipartUpload
|
||||
}
|
||||
|
||||
if s3Error := checkRequestAuthType(ctx, r, policy.AbortMultipartUploadAction, bucket, object); s3Error != ErrNone {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
|
||||
@@ -2242,6 +2253,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
}
|
||||
|
||||
completeMultiPartUpload := objectAPI.CompleteMultipartUpload
|
||||
if api.CacheAPI() != nil {
|
||||
completeMultiPartUpload = api.CacheAPI().CompleteMultipartUpload
|
||||
}
|
||||
|
||||
// This code is specifically to handle the requirements for slow
|
||||
// complete multipart upload operations on FS mode.
|
||||
|
||||
@@ -253,6 +253,9 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs,
|
||||
ctx := context.Background()
|
||||
|
||||
deleteBucket := objectAPI.DeleteBucket
|
||||
if web.CacheAPI() != nil {
|
||||
deleteBucket = web.CacheAPI().DeleteBucket
|
||||
}
|
||||
|
||||
if err := deleteBucket(ctx, args.BucketName); err != nil {
|
||||
return toJSONError(err, args.BucketName)
|
||||
@@ -294,6 +297,9 @@ func (web *webAPIHandlers) ListBuckets(r *http.Request, args *WebGenericArgs, re
|
||||
return toJSONError(errServerNotInitialized)
|
||||
}
|
||||
listBuckets := objectAPI.ListBuckets
|
||||
if web.CacheAPI() != nil {
|
||||
listBuckets = web.CacheAPI().ListBuckets
|
||||
}
|
||||
|
||||
claims, owner, authErr := webRequestAuthenticate(r)
|
||||
if authErr != nil {
|
||||
@@ -397,6 +403,9 @@ func (web *webAPIHandlers) ListObjects(r *http.Request, args *ListObjectsArgs, r
|
||||
}
|
||||
|
||||
listObjects := objectAPI.ListObjects
|
||||
if web.CacheAPI() != nil {
|
||||
listObjects = web.CacheAPI().ListObjects
|
||||
}
|
||||
|
||||
if isRemoteCallRequired(context.Background(), args.BucketName, objectAPI) {
|
||||
sr, err := globalDNSConfig.Get(args.BucketName)
|
||||
@@ -568,6 +577,9 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
|
||||
return toJSONError(errServerNotInitialized)
|
||||
}
|
||||
listObjects := objectAPI.ListObjects
|
||||
if web.CacheAPI() != nil {
|
||||
listObjects = web.CacheAPI().ListObjects
|
||||
}
|
||||
|
||||
claims, owner, authErr := webRequestAuthenticate(r)
|
||||
if authErr != nil {
|
||||
@@ -1006,7 +1018,9 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
putObject := objectAPI.PutObject
|
||||
|
||||
if !hasServerSideEncryptionHeader(r.Header) && web.CacheAPI() != nil {
|
||||
putObject = web.CacheAPI().PutObject
|
||||
}
|
||||
objInfo, err := putObject(context.Background(), bucket, object, pReader, opts)
|
||||
if err != nil {
|
||||
writeWebErrorResponse(w, err)
|
||||
@@ -1246,29 +1260,32 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
|
||||
writeWebErrorResponse(w, errInvalidBucketName)
|
||||
return
|
||||
}
|
||||
getObjectNInfo := objectAPI.GetObjectNInfo
|
||||
if web.CacheAPI() != nil {
|
||||
getObjectNInfo = web.CacheAPI().GetObjectNInfo
|
||||
}
|
||||
|
||||
getObject := objectAPI.GetObject
|
||||
if web.CacheAPI() != nil {
|
||||
getObject = web.CacheAPI().GetObject
|
||||
}
|
||||
listObjects := objectAPI.ListObjects
|
||||
if web.CacheAPI() != nil {
|
||||
listObjects = web.CacheAPI().ListObjects
|
||||
}
|
||||
|
||||
archive := zip.NewWriter(w)
|
||||
defer archive.Close()
|
||||
|
||||
getObjectInfo := objectAPI.GetObjectInfo
|
||||
if web.CacheAPI() != nil {
|
||||
getObjectInfo = web.CacheAPI().GetObjectInfo
|
||||
}
|
||||
opts := ObjectOptions{}
|
||||
var length int64
|
||||
for _, object := range args.Objects {
|
||||
// Writes compressed object file to the response.
|
||||
zipit := func(objectName string) error {
|
||||
var opts ObjectOptions
|
||||
gr, err := getObjectNInfo(ctx, args.BucketName, objectName, nil, r.Header, readLock, opts)
|
||||
info, err := getObjectInfo(ctx, args.BucketName, objectName, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer gr.Close()
|
||||
|
||||
info := gr.ObjInfo
|
||||
|
||||
length = info.Size
|
||||
if objectAPI.IsEncryptionSupported() {
|
||||
if _, err = DecryptObjectInfo(&info, r.Header); err != nil {
|
||||
@@ -1331,7 +1348,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
|
||||
// Response writer should be limited early on for decryption upto required length,
|
||||
// additionally also skipping mod(offset)64KiB boundaries.
|
||||
writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length)
|
||||
writer, _, length, err = DecryptBlocksRequest(writer, r,
|
||||
writer, startOffset, length, err = DecryptBlocksRequest(writer, r,
|
||||
args.BucketName, objectName, startOffset, length, info, false)
|
||||
if err != nil {
|
||||
writeWebErrorResponse(w, err)
|
||||
@@ -1339,20 +1356,14 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
httpWriter := ioutil.WriteOnClose(writer)
|
||||
|
||||
// Write object content to response body
|
||||
if _, err = io.Copy(httpWriter, gr); err != nil {
|
||||
if err = getObject(ctx, args.BucketName, objectName, startOffset, length, httpWriter, "", opts); err != nil {
|
||||
httpWriter.Close()
|
||||
if info.IsCompressed() {
|
||||
// Wait for decompression go-routine to retire.
|
||||
wg.Wait()
|
||||
}
|
||||
if !httpWriter.HasWritten() { // write error response only if no data or headers has been written to client yet
|
||||
writeWebErrorResponse(w, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if err = httpWriter.Close(); err != nil {
|
||||
if !httpWriter.HasWritten() { // write error response only if no data has been written to client yet
|
||||
writeWebErrorResponse(w, err)
|
||||
|
||||
Reference in New Issue
Block a user