mirror of
https://github.com/minio/minio.git
synced 2025-01-26 14:13:16 -05:00
9439dfef64
This commit ensures that all tickers are stopped using defer ticker.Stop() style. This will also fix one bug seen when a client starts to listen to event notifications and that case will result a leak in tickers.
539 lines
16 KiB
Go
539 lines
16 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2018 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/disk"
|
|
"github.com/minio/minio/pkg/hash"
|
|
"github.com/minio/minio/pkg/lock"
|
|
)
|
|
|
|
const (
|
|
// cache.json object metadata for cached objects.
|
|
cacheMetaJSONFile = "cache.json"
|
|
|
|
cacheEnvDelimiter = ";"
|
|
)
|
|
|
|
// cacheFSObjects implements the cache backend operations.
|
|
type cacheFSObjects struct {
|
|
*FSObjects
|
|
// caching drive path (from cache "drives" in config.json)
|
|
dir string
|
|
// expiry in days specified in config.json
|
|
expiry int
|
|
// max disk usage pct
|
|
maxDiskUsagePct int
|
|
// purge() listens on this channel to start the cache-purge process
|
|
purgeChan chan struct{}
|
|
// mark false if drive is offline
|
|
online bool
|
|
// mutex to protect updates to online variable
|
|
onlineMutex *sync.RWMutex
|
|
}
|
|
|
|
// Inits the cache directory if it is not init'ed already.
|
|
// Initializing implies creation of new FS Object layer.
|
|
func newCacheFSObjects(dir string, expiry int, maxDiskUsagePct int) (*cacheFSObjects, error) {
|
|
// Assign a new UUID for FS minio mode. Each server instance
|
|
// gets its own UUID for temporary file transaction.
|
|
fsUUID := mustGetUUID()
|
|
|
|
// Initialize meta volume, if volume already exists ignores it.
|
|
if err := initMetaVolumeFS(dir, fsUUID); err != nil {
|
|
return nil, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
|
|
}
|
|
|
|
trashPath := pathJoin(dir, minioMetaBucket, cacheTrashDir)
|
|
if err := os.MkdirAll(trashPath, 0777); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if expiry == 0 {
|
|
expiry = globalCacheExpiry
|
|
}
|
|
|
|
// Initialize fs objects.
|
|
fsObjects := &FSObjects{
|
|
fsPath: dir,
|
|
metaJSONFile: cacheMetaJSONFile,
|
|
fsUUID: fsUUID,
|
|
rwPool: &fsIOPool{
|
|
readersMap: make(map[string]*lock.RLockedFile),
|
|
},
|
|
nsMutex: newNSLock(false),
|
|
listPool: newTreeWalkPool(globalLookupTimeout),
|
|
appendFileMap: make(map[string]*fsAppendFile),
|
|
}
|
|
|
|
go fsObjects.cleanupStaleMultipartUploads(context.Background(), globalMultipartCleanupInterval, globalMultipartExpiry, globalServiceDoneCh)
|
|
|
|
cacheFS := cacheFSObjects{
|
|
FSObjects: fsObjects,
|
|
dir: dir,
|
|
expiry: expiry,
|
|
maxDiskUsagePct: maxDiskUsagePct,
|
|
purgeChan: make(chan struct{}),
|
|
online: true,
|
|
onlineMutex: &sync.RWMutex{},
|
|
}
|
|
return &cacheFS, nil
|
|
}
|
|
|
|
// Returns if the disk usage is low.
|
|
// Disk usage is low if usage is < 80% of cacheMaxDiskUsagePct
|
|
// Ex. for a 100GB disk, if maxUsage is configured as 70% then cacheMaxDiskUsagePct is 70G
|
|
// hence disk usage is low if the disk usage is less than 56G (because 80% of 70G is 56G)
|
|
func (cfs *cacheFSObjects) diskUsageLow() bool {
|
|
|
|
minUsage := cfs.maxDiskUsagePct * 80 / 100
|
|
di, err := disk.GetInfo(cfs.dir)
|
|
if err != nil {
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
return false
|
|
}
|
|
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
|
return int(usedPercent) < minUsage
|
|
}
|
|
|
|
// Return if the disk usage is high.
|
|
// Disk usage is high if disk used is > cacheMaxDiskUsagePct
|
|
func (cfs *cacheFSObjects) diskUsageHigh() bool {
|
|
di, err := disk.GetInfo(cfs.dir)
|
|
if err != nil {
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
return true
|
|
}
|
|
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
|
return int(usedPercent) > cfs.maxDiskUsagePct
|
|
}
|
|
|
|
// Returns if size space can be allocated without exceeding
|
|
// max disk usable for caching
|
|
func (cfs *cacheFSObjects) diskAvailable(size int64) bool {
|
|
di, err := disk.GetInfo(cfs.dir)
|
|
if err != nil {
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", cfs.dir)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
return false
|
|
}
|
|
usedPercent := (di.Total - (di.Free - uint64(size))) * 100 / di.Total
|
|
return int(usedPercent) < cfs.maxDiskUsagePct
|
|
}
|
|
|
|
// purges all content marked trash from the cache.
|
|
func (cfs *cacheFSObjects) purgeTrash() {
|
|
ticker := time.NewTicker(time.Minute * cacheCleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-globalServiceDoneCh:
|
|
return
|
|
case <-ticker.C:
|
|
trashPath := path.Join(cfs.fsPath, minioMetaBucket, cacheTrashDir)
|
|
entries, err := readDir(trashPath)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for _, entry := range entries {
|
|
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{})
|
|
fi, err := fsStatVolume(ctx, pathJoin(trashPath, entry))
|
|
if err != nil {
|
|
continue
|
|
}
|
|
dir := path.Join(trashPath, fi.Name())
|
|
|
|
// Delete all expired cache content.
|
|
fsRemoveAll(ctx, dir)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Purge cache entries that were not accessed.
|
|
func (cfs *cacheFSObjects) purge() {
|
|
delimiter := slashSeparator
|
|
maxKeys := 1000
|
|
ctx := context.Background()
|
|
for {
|
|
olderThan := cfs.expiry
|
|
for !cfs.diskUsageLow() {
|
|
// delete unaccessed objects older than expiry duration
|
|
expiry := UTCNow().AddDate(0, 0, -1*olderThan)
|
|
olderThan /= 2
|
|
if olderThan < 1 {
|
|
break
|
|
}
|
|
deletedCount := 0
|
|
buckets, err := cfs.ListBuckets(ctx)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
// Reset cache online status if drive was offline earlier.
|
|
if !cfs.IsOnline() {
|
|
cfs.setOnline(true)
|
|
}
|
|
for _, bucket := range buckets {
|
|
var continuationToken string
|
|
var marker string
|
|
for {
|
|
objects, err := cfs.ListObjects(ctx, bucket.Name, marker, continuationToken, delimiter, maxKeys)
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
if !objects.IsTruncated {
|
|
break
|
|
}
|
|
marker = objects.NextMarker
|
|
for _, object := range objects.Objects {
|
|
// purge objects that qualify because of cache-control directives or
|
|
// past cache expiry duration.
|
|
if !filterFromCache(object.UserDefined) ||
|
|
!isStaleCache(object) ||
|
|
object.AccTime.After(expiry) {
|
|
continue
|
|
}
|
|
if err = cfs.DeleteObject(ctx, bucket.Name, object.Name); err != nil {
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
deletedCount++
|
|
}
|
|
}
|
|
}
|
|
if deletedCount == 0 {
|
|
// to avoid a busy loop
|
|
time.Sleep(time.Minute * 30)
|
|
}
|
|
}
|
|
<-cfs.purgeChan
|
|
}
|
|
}
|
|
|
|
// sets cache drive status
|
|
func (cfs *cacheFSObjects) setOnline(status bool) {
|
|
cfs.onlineMutex.Lock()
|
|
cfs.online = status
|
|
cfs.onlineMutex.Unlock()
|
|
}
|
|
|
|
// returns true if cache drive is online
|
|
func (cfs *cacheFSObjects) IsOnline() bool {
|
|
cfs.onlineMutex.RLock()
|
|
defer cfs.onlineMutex.RUnlock()
|
|
return cfs.online
|
|
}
|
|
|
|
// Caches the object to disk
|
|
func (cfs *cacheFSObjects) Put(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) error {
|
|
if cfs.diskUsageHigh() {
|
|
select {
|
|
case cfs.purgeChan <- struct{}{}:
|
|
default:
|
|
}
|
|
return errDiskFull
|
|
}
|
|
if !cfs.diskAvailable(data.Size()) {
|
|
return errDiskFull
|
|
}
|
|
if _, err := cfs.GetBucketInfo(ctx, bucket); err != nil {
|
|
pErr := cfs.MakeBucketWithLocation(ctx, bucket, "")
|
|
if pErr != nil {
|
|
return pErr
|
|
}
|
|
}
|
|
_, err := cfs.PutObject(ctx, bucket, object, data, metadata)
|
|
// if err is due to disk being offline , mark cache drive as offline
|
|
if IsErr(err, baseErrs...) {
|
|
cfs.setOnline(false)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Returns the handle for the cached object
|
|
func (cfs *cacheFSObjects) Get(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) {
|
|
return cfs.GetObject(ctx, bucket, object, startOffset, length, writer, etag)
|
|
}
|
|
|
|
// Deletes the cached object
|
|
func (cfs *cacheFSObjects) Delete(ctx context.Context, bucket, object string) (err error) {
|
|
return cfs.DeleteObject(ctx, bucket, object)
|
|
}
|
|
|
|
// convenience function to check if object is cached on this cacheFSObjects
|
|
func (cfs *cacheFSObjects) Exists(ctx context.Context, bucket, object string) bool {
|
|
_, err := cfs.GetObjectInfo(ctx, bucket, object)
|
|
return err == nil
|
|
}
|
|
|
|
// Identical to fs PutObject operation except that it uses ETag in metadata
|
|
// headers.
|
|
func (cfs *cacheFSObjects) PutObject(ctx context.Context, bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, retErr error) {
|
|
fs := cfs.FSObjects
|
|
// Lock the object.
|
|
objectLock := fs.nsMutex.NewNSLock(bucket, object)
|
|
if err := objectLock.GetLock(globalObjectTimeout); err != nil {
|
|
return objInfo, err
|
|
}
|
|
defer objectLock.Unlock()
|
|
|
|
// No metadata is set, allocate a new one.
|
|
meta := make(map[string]string)
|
|
for k, v := range metadata {
|
|
meta[k] = v
|
|
}
|
|
|
|
var err error
|
|
|
|
// Validate if bucket name is valid and exists.
|
|
if _, err = fs.statBucketDir(ctx, bucket); err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket)
|
|
}
|
|
|
|
fsMeta := newFSMetaV1()
|
|
fsMeta.Meta = meta
|
|
|
|
// This is a special case with size as '0' and object ends
|
|
// with a slash separator, we treat it like a valid operation
|
|
// and return success.
|
|
if isObjectDir(object, data.Size()) {
|
|
// Check if an object is present as one of the parent dir.
|
|
if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) {
|
|
return ObjectInfo{}, toObjectErr(errFileAccessDenied, bucket, object)
|
|
}
|
|
if err = mkdirAll(pathJoin(fs.fsPath, bucket, object), 0777); err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
var fi os.FileInfo
|
|
if fi, err = fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object)); err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
|
}
|
|
|
|
if err = checkPutObjectArgs(ctx, bucket, object, fs, data.Size()); err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
|
|
// Check if an object is present as one of the parent dir.
|
|
if fs.parentDirIsObject(ctx, bucket, path.Dir(object)) {
|
|
return ObjectInfo{}, toObjectErr(errFileAccessDenied, bucket, object)
|
|
}
|
|
|
|
// Validate input data size and it can never be less than zero.
|
|
if data.Size() < 0 {
|
|
logger.LogIf(ctx, errInvalidArgument)
|
|
return ObjectInfo{}, errInvalidArgument
|
|
}
|
|
|
|
var wlk *lock.LockedFile
|
|
if bucket != minioMetaBucket {
|
|
bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
|
|
fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
|
|
|
|
wlk, err = fs.rwPool.Create(fsMetaPath)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
// This close will allow for locks to be synchronized on `fs.json`.
|
|
defer wlk.Close()
|
|
defer func() {
|
|
// Remove meta file when PutObject encounters any error
|
|
if retErr != nil {
|
|
tmpDir := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID)
|
|
fsRemoveMeta(ctx, bucketMetaDir, fsMetaPath, tmpDir)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Uploaded object will first be written to the temporary location which will eventually
|
|
// be renamed to the actual location. It is first written to the temporary location
|
|
// so that cleaning it up will be easy if the server goes down.
|
|
tempObj := mustGetUUID()
|
|
|
|
// Allocate a buffer to Read() from request body
|
|
bufSize := int64(readSizeV1)
|
|
if size := data.Size(); size > 0 && bufSize > size {
|
|
bufSize = size
|
|
}
|
|
|
|
buf := make([]byte, int(bufSize))
|
|
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)
|
|
bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, buf, data.Size())
|
|
if err != nil {
|
|
fsRemoveFile(ctx, fsTmpObjPath)
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
if fsMeta.Meta["etag"] == "" {
|
|
fsMeta.Meta["etag"] = hex.EncodeToString(data.MD5Current())
|
|
}
|
|
// Should return IncompleteBody{} error when reader has fewer
|
|
// bytes than specified in request header.
|
|
if bytesWritten < data.Size() {
|
|
fsRemoveFile(ctx, fsTmpObjPath)
|
|
return ObjectInfo{}, IncompleteBody{}
|
|
}
|
|
|
|
// Delete the temporary object in the case of a
|
|
// failure. If PutObject succeeds, then there would be
|
|
// nothing to delete.
|
|
defer fsRemoveFile(ctx, fsTmpObjPath)
|
|
|
|
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
|
|
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
|
|
if err = fsRenameFile(ctx, fsTmpObjPath, fsNSObjPath); err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
if bucket != minioMetaBucket {
|
|
// Write FS metadata after a successful namespace operation.
|
|
if _, err = fsMeta.WriteTo(wlk); err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
}
|
|
|
|
// Stat the file to fetch timestamp, size.
|
|
fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
|
|
if err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
// Success.
|
|
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
|
}
|
|
|
|
// Implements S3 compatible initiate multipart API. Operation here is identical
|
|
// to fs backend implementation - with the exception that cache FS uses the uploadID
|
|
// generated on the backend
|
|
func (cfs *cacheFSObjects) NewMultipartUpload(ctx context.Context, bucket, object string, meta map[string]string, uploadID string) (string, error) {
|
|
if cfs.diskUsageHigh() {
|
|
select {
|
|
case cfs.purgeChan <- struct{}{}:
|
|
default:
|
|
}
|
|
return "", errDiskFull
|
|
}
|
|
|
|
if _, err := cfs.GetBucketInfo(ctx, bucket); err != nil {
|
|
pErr := cfs.MakeBucketWithLocation(ctx, bucket, "")
|
|
if pErr != nil {
|
|
return "", pErr
|
|
}
|
|
}
|
|
fs := cfs.FSObjects
|
|
if err := checkNewMultipartArgs(ctx, bucket, object, fs); err != nil {
|
|
return "", toObjectErr(err, bucket)
|
|
}
|
|
|
|
if _, err := fs.statBucketDir(ctx, bucket); err != nil {
|
|
return "", toObjectErr(err, bucket)
|
|
}
|
|
|
|
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
err := mkdirAll(uploadIDDir, 0755)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return "", err
|
|
}
|
|
|
|
// Initialize fs.json values.
|
|
fsMeta := newFSMetaV1()
|
|
fsMeta.Meta = meta
|
|
|
|
fsMetaBytes, err := json.Marshal(fsMeta)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return "", err
|
|
}
|
|
|
|
if err = ioutil.WriteFile(pathJoin(uploadIDDir, fs.metaJSONFile), fsMetaBytes, 0644); err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return "", err
|
|
}
|
|
return uploadID, nil
|
|
}
|
|
|
|
// moveBucketToTrash clears cacheFSObjects of bucket contents and moves it to trash folder.
|
|
func (cfs *cacheFSObjects) moveBucketToTrash(ctx context.Context, bucket string) (err error) {
|
|
fs := cfs.FSObjects
|
|
bucketLock := fs.nsMutex.NewNSLock(bucket, "")
|
|
if err = bucketLock.GetLock(globalObjectTimeout); err != nil {
|
|
return err
|
|
}
|
|
defer bucketLock.Unlock()
|
|
bucketDir, err := fs.getBucketDir(ctx, bucket)
|
|
if err != nil {
|
|
return toObjectErr(err, bucket)
|
|
}
|
|
trashPath := pathJoin(cfs.fsPath, minioMetaBucket, cacheTrashDir)
|
|
expiredDir := path.Join(trashPath, bucket)
|
|
// Attempt to move regular bucket to expired directory.
|
|
if err = fsRenameDir(bucketDir, expiredDir); err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return toObjectErr(err, bucket)
|
|
}
|
|
// Cleanup all the bucket metadata.
|
|
ominioMetadataBucketDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket)
|
|
nminioMetadataBucketDir := pathJoin(trashPath, MustGetUUID())
|
|
logger.LogIf(ctx, fsRenameDir(ominioMetadataBucketDir, nminioMetadataBucketDir))
|
|
return nil
|
|
}
|
|
|
|
// Removes a directory only if its empty, handles long
|
|
// paths for windows automatically.
|
|
func fsRenameDir(dirPath, newPath string) (err error) {
|
|
if dirPath == "" || newPath == "" {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
if err = checkPathLength(dirPath); err != nil {
|
|
return err
|
|
}
|
|
if err = checkPathLength(newPath); err != nil {
|
|
return err
|
|
}
|
|
if err = os.Rename(dirPath, newPath); err != nil {
|
|
if os.IsNotExist(err) {
|
|
return errVolumeNotFound
|
|
} else if isSysErrNotEmpty(err) {
|
|
return errVolumeNotEmpty
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|