// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"bytes"
	"context"
	"crypto/md5"
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/djherbis/atime"
	"github.com/minio/minio/internal/amztime"
	"github.com/minio/minio/internal/config/cache"
	"github.com/minio/minio/internal/crypto"
	"github.com/minio/minio/internal/disk"
	"github.com/minio/minio/internal/fips"
	"github.com/minio/minio/internal/hash"
	xhttp "github.com/minio/minio/internal/http"
	xioutil "github.com/minio/minio/internal/ioutil"
	"github.com/minio/minio/internal/kms"
	"github.com/minio/minio/internal/logger"
	"github.com/minio/sio"
)

const (
	// cache.json object metadata for cached objects.
	cacheMetaJSONFile   = "cache.json"
	cacheDataFile       = "part.1"
	cacheDataFilePrefix = "part"

	cacheMetaVersion = "1.0.0"
	cacheExpiryDays  = 90 * time.Hour * 24 // defaults to 90 days
	// SSECacheEncrypted is the metadata key indicating that the object
	// is a cache entry encrypted with cache KMS master key in globalCacheKMS.
	SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache"
	cacheMultipartDir = "multipart"
	cacheWritebackDir = "writeback"

	cacheStaleUploadCleanupInterval = time.Hour * 24
	cacheStaleUploadExpiry          = time.Hour * 24
	cacheWBStaleUploadExpiry        = time.Hour * 24 * 7
)

// CacheChecksumInfoV1 - carries checksums of individual blocks on disk.
type CacheChecksumInfoV1 struct {
	Algorithm string `json:"algorithm"`
	Blocksize int64  `json:"blocksize"`
}

// Represents the cache metadata struct
type cacheMeta struct {
	Version string   `json:"version"`
	Stat    StatInfo `json:"stat"` // Stat of the current object `cache.json`.

	// checksums of blocks on disk.
	Checksum CacheChecksumInfoV1 `json:"checksum,omitempty"`
	// Metadata map for current object.
	Meta map[string]string `json:"meta,omitempty"`
	// Ranges maps cached range to associated filename.
	Ranges map[string]string `json:"ranges,omitempty"`
	// Hits is a counter on the number of times this object has been accessed so far.
	Hits   int    `json:"hits,omitempty"`
	Bucket string `json:"bucket,omitempty"`
	Object string `json:"object,omitempty"`
	// for multipart upload
	PartNumbers     []int    `json:"partNums,omitempty"`   // Part Numbers
	PartETags       []string `json:"partETags,omitempty"`  // Part ETags
	PartSizes       []int64  `json:"partSizes,omitempty"`  // Part Sizes
	PartActualSizes []int64  `json:"partASizes,omitempty"` // Part ActualSizes (compression)
}

// RangeInfo has the range, file and range length information for a cached range.
type RangeInfo struct {
	Range string
	File  string
	Size  int64
}

// Empty returns true if this is an empty struct
func (r *RangeInfo) Empty() bool {
	return r.Range == "" && r.File == "" && r.Size == 0
}

func (m *cacheMeta) ToObjectInfo() (o ObjectInfo) {
	if len(m.Meta) == 0 {
		m.Meta = make(map[string]string)
		m.Stat.ModTime = timeSentinel
	}

	o = ObjectInfo{
		Bucket:            m.Bucket,
		Name:              m.Object,
		CacheStatus:       CacheHit,
		CacheLookupStatus: CacheHit,
	}
	meta := cloneMSS(m.Meta)
	// We set file info only if it's valid.
	o.Size = m.Stat.Size
	o.ETag = extractETag(meta)
	o.ContentType = meta["content-type"]
	o.ContentEncoding = meta["content-encoding"]
	if storageClass, ok := meta[xhttp.AmzStorageClass]; ok {
		o.StorageClass = storageClass
	} else {
		o.StorageClass = globalMinioDefaultStorageClass
	}

	if exp, ok := meta["expires"]; ok {
		if t, e := amztime.ParseHeader(exp); e == nil {
			o.Expires = t.UTC()
		}
	}
	if mtime, ok := meta["last-modified"]; ok {
		if t, e := amztime.ParseHeader(mtime); e == nil {
			o.ModTime = t.UTC()
		}
	}
	o.Parts = make([]ObjectPartInfo, len(m.PartNumbers))
	for i := range m.PartNumbers {
		o.Parts[i].Number = m.PartNumbers[i]
		o.Parts[i].Size = m.PartSizes[i]
		o.Parts[i].ETag = m.PartETags[i]
		o.Parts[i].ActualSize = m.PartActualSizes[i]
	}
	// etag/md5Sum has already been extracted. We need to
	// remove it so it does not appear as part of user-defined metadata.
	o.UserDefined = cleanMetadata(meta)
	return o
}

// represents disk cache struct
type diskCache struct {
	// is set to 0 if drive is offline
	online       uint32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	purgeRunning int32

	triggerGC          chan struct{}
	dir                string         // caching directory
	stats              CacheDiskStats // disk cache stats for prometheus
	quotaPct           int            // max usage in %
	pool               sync.Pool
	after              int // minimum accesses before an object is cached.
	lowWatermark       int
	highWatermark      int
	enableRange        bool
	commitWriteback    bool
	commitWritethrough bool

	retryWritebackCh chan ObjectInfo
	// nsMutex namespace lock
	nsMutex *nsLockMap
	// Object functions pointing to the corresponding functions of backend implementation.
	NewNSLockFn func(cachePath string) RWLocker
}

// Inits the disk cache dir if it is not initialized already.
func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCache, error) {
	quotaPct := config.MaxUse
	if quotaPct == 0 {
		quotaPct = config.Quota
	}

	if err := os.MkdirAll(dir, 0o777); err != nil {
		return nil, fmt.Errorf("Unable to initialize '%s' dir, %w", dir, err)
	}
	cache := diskCache{
		dir:                dir,
		triggerGC:          make(chan struct{}, 1),
		stats:              CacheDiskStats{Dir: dir},
		quotaPct:           quotaPct,
		after:              config.After,
		lowWatermark:       config.WatermarkLow,
		highWatermark:      config.WatermarkHigh,
		enableRange:        config.Range,
		commitWriteback:    config.CacheCommitMode == CommitWriteBack,
		commitWritethrough: config.CacheCommitMode == CommitWriteThrough,

		retryWritebackCh: make(chan ObjectInfo, 10000),
		online:           1,
		pool: sync.Pool{
			New: func() interface{} {
				b := disk.AlignedBlock(int(cacheBlkSize))
				return &b
			},
		},
		nsMutex: newNSLock(false),
	}
	go cache.purgeWait(ctx)
	go cache.cleanupStaleUploads(ctx)
	if cache.commitWriteback {
		go cache.scanCacheWritebackFailures(ctx)
	}
	cache.diskSpaceAvailable(0) // update if cache usage is already high.
	cache.NewNSLockFn = func(cachePath string) RWLocker {
		return cache.nsMutex.NewNSLock(nil, cachePath, "")
	}
	return &cache, nil
}
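
// Illustrative sketch, not part of the upstream source: wiring a diskCache up
// from a cache.Config. The field names (Quota, After, WatermarkLow,
// WatermarkHigh, Range) are the ones newDiskCache reads above; the concrete
// values and the mount path are hypothetical.
//
//	cfg := cache.Config{Quota: 80, After: 3, WatermarkLow: 70, WatermarkHigh: 90, Range: true}
//	dcache, err := newDiskCache(GlobalContext, "/mnt/cache1", cfg)
//	if err != nil {
//		// drive could not be initialized; callers typically skip this cache drive
//	}
//	_ = dcache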

// diskUsageLow() returns true if disk usage falls below the low watermark w.r.t configured cache quota.
// Ex. for a 100GB disk, if quota is configured as 70% and watermark_low = 80% and
// watermark_high = 90% then garbage collection starts when 63% of disk is used and
// stops when disk usage drops to 56%
func (c *diskCache) diskUsageLow() bool {
	gcStopPct := c.quotaPct * c.lowWatermark / 100
	di, err := disk.GetInfo(c.dir)
	if err != nil {
		reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
		ctx := logger.SetReqInfo(GlobalContext, reqInfo)
		logger.LogIf(ctx, err)
		return false
	}
	usedPercent := float64(di.Used) * 100 / float64(di.Total)
	low := int(usedPercent) < gcStopPct
	atomic.StoreUint64(&c.stats.UsagePercent, uint64(usedPercent))
	if low {
		atomic.StoreInt32(&c.stats.UsageState, 0)
	}
	return low
}

// diskSpaceAvailable returns true if disk usage remains below the configured cache quota
// after size is added. If current usage (without size) already exceeds the high watermark,
// a GC is automatically queued.
func (c *diskCache) diskSpaceAvailable(size int64) bool {
	reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
	ctx := logger.SetReqInfo(GlobalContext, reqInfo)

	gcTriggerPct := c.quotaPct * c.highWatermark / 100
	di, err := disk.GetInfo(c.dir)
	if err != nil {
		logger.LogIf(ctx, err)
		return false
	}
	if di.Total == 0 {
		logger.LogIf(ctx, errors.New("diskCache: Received 0 total disk size"))
		return false
	}
	usedPercent := float64(di.Used) * 100 / float64(di.Total)
	if usedPercent >= float64(gcTriggerPct) {
		atomic.StoreInt32(&c.stats.UsageState, 1)
		c.queueGC()
	}
	atomic.StoreUint64(&c.stats.UsagePercent, uint64(usedPercent))

	// Recalculate percentage with provided size added.
	usedPercent = float64(di.Used+uint64(size)) * 100 / float64(di.Total)

	return usedPercent < float64(c.quotaPct)
}

// queueGC will queue a GC.
// Calling this function is always non-blocking.
func (c *diskCache) queueGC() {
	select {
	case c.triggerGC <- struct{}{}:
	default:
	}
}

// toClear returns how many bytes should be cleared to reach the low watermark quota.
// returns 0 if below quota.
func (c *diskCache) toClear() uint64 {
	di, err := disk.GetInfo(c.dir)
	if err != nil {
		reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
		ctx := logger.SetReqInfo(GlobalContext, reqInfo)
		logger.LogIf(ctx, err)
		return 0
	}
	return bytesToClear(int64(di.Total), int64(di.Free), uint64(c.quotaPct), uint64(c.lowWatermark), uint64(c.highWatermark))
}
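
// Illustrative note, not part of the upstream source: with a 100GB drive,
// quotaPct=70 and lowWatermark=80, the GC target is 70% * 80% = 56% of the
// drive (56GB). If 80GB is currently in use, toClear() asks bytesToClear to
// report roughly 80GB - 56GB = 24GB, so that purge() can reclaim space down
// to the low-watermark target described above in diskUsageLow().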

func (c *diskCache) purgeWait(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
		case <-c.triggerGC: // wait here until someone triggers.
			c.purge(ctx)
		}
	}
}

// Purge cache entries that were not accessed.
func (c *diskCache) purge(ctx context.Context) {
	if atomic.LoadInt32(&c.purgeRunning) == 1 || c.diskUsageLow() {
		return
	}

	toFree := c.toClear()
	if toFree == 0 {
		return
	}

	atomic.StoreInt32(&c.purgeRunning, 1) // do not run concurrent purge()
	defer atomic.StoreInt32(&c.purgeRunning, 0)

	// expiry window used to clean up stale cache.json files.
	expiry := UTCNow().Add(-cacheExpiryDays)
	// defaulting max hits count to 100
	// ignore the error, we know what value we are passing.
	scorer, err := newFileScorer(toFree, time.Now().Unix(), 100)
	if err != nil {
		logger.LogIf(ctx, err)
		return
	}

	// this function returns FileInfo for cached range files.
	fiStatRangesFn := func(ranges map[string]string, pathPrefix string) map[string]os.FileInfo {
		fm := make(map[string]os.FileInfo)
		for _, rngFile := range ranges {
			fname := pathJoin(pathPrefix, rngFile)
			if fi, err := os.Stat(fname); err == nil {
				fm[fname] = fi
			}
		}
		return fm
	}

	// this function returns most recent Atime among cached part files.
	lastAtimeFn := func(partNums []int, pathPrefix string) time.Time {
		lastATime := timeSentinel
		for _, pnum := range partNums {
			fname := pathJoin(pathPrefix, fmt.Sprintf("%s.%d", cacheDataFilePrefix, pnum))
			if fi, err := os.Stat(fname); err == nil {
				if atime.Get(fi).After(lastATime) {
					lastATime = atime.Get(fi)
				}
			}
		}
		if len(partNums) == 0 {
			fname := pathJoin(pathPrefix, cacheDataFile)
			if fi, err := os.Stat(fname); err == nil {
				lastATime = atime.Get(fi)
			}
		}
		return lastATime
	}

	filterFn := func(name string, typ os.FileMode) error {
		if name == minioMetaBucket {
			// Proceed to next file.
			return nil
		}

		cacheDir := pathJoin(c.dir, name)
		meta, _, numHits, err := c.statCachedMeta(ctx, cacheDir)
		if err != nil {
			// delete any partially filled cache entry left behind.
			removeAll(cacheDir)
			// Proceed to next file.
			return nil
		}
		// get last access time of cache part files
		lastAtime := lastAtimeFn(meta.PartNumbers, pathJoin(c.dir, name))
		// stat all cached file ranges.
		cachedRngFiles := fiStatRangesFn(meta.Ranges, pathJoin(c.dir, name))
		objInfo := meta.ToObjectInfo()
		// prevent gc from clearing un-synced commits. This metadata is present when
		// cache writeback commit setting is enabled.
		status, ok := objInfo.UserDefined[writeBackStatusHeader]
		if ok && status != CommitComplete.String() {
			return nil
		}
		cc := cacheControlOpts(objInfo)
		switch {
		case cc != nil:
			if cc.isStale(objInfo.ModTime) {
				removeAll(cacheDir)
				scorer.adjustSaveBytes(-objInfo.Size)
				// break early if sufficient disk space reclaimed.
				if c.diskUsageLow() {
					// disk usage is already low, stop the scan; filtering is complete.
					return errDoneForNow
				}
			}
		case lastAtime != timeSentinel:
			// cached multipart or single part
			objInfo.AccTime = lastAtime
			objInfo.Name = pathJoin(c.dir, name, cacheDataFile)
			scorer.addFileWithObjInfo(objInfo, numHits)
		}

		for fname, fi := range cachedRngFiles {
			if fi == nil {
				continue
			}
			if cc != nil {
				if cc.isStale(objInfo.ModTime) {
					removeAll(fname)
					scorer.adjustSaveBytes(-fi.Size())

					// break early if sufficient disk space reclaimed.
					if c.diskUsageLow() {
						// disk usage is already low, stop the scan; filtering is complete.
						return errDoneForNow
					}
				}
				continue
			}
			scorer.addFile(fname, atime.Get(fi), fi.Size(), numHits)
		}
		// clean up stale cache.json files for objects that never got cached but access count was maintained in cache.json
		fi, err := os.Stat(pathJoin(cacheDir, cacheMetaJSONFile))
		if err != nil || (fi != nil && fi.ModTime().Before(expiry) && len(cachedRngFiles) == 0) {
			removeAll(cacheDir)
			if fi != nil {
				scorer.adjustSaveBytes(-fi.Size())
			}
			// Proceed to next file.
			return nil
		}

		// disk usage is already low, stop the scan; filtering is complete.
		if c.diskUsageLow() {
			return errDoneForNow
		}

		// Proceed to next file.
		return nil
	}

	if err := readDirFn(c.dir, filterFn); err != nil {
		logger.LogIf(ctx, err)
		return
	}

	scorer.purgeFunc(func(qfile queuedFile) {
		fileName := qfile.name
		removeAll(fileName)
		slashIdx := strings.LastIndex(fileName, SlashSeparator)
		if slashIdx >= 0 {
			fileNamePrefix := fileName[0:slashIdx]
			fname := fileName[slashIdx+1:]
			if fname == cacheDataFile {
				removeAll(fileNamePrefix)
			}
		}
	})

	scorer.reset()
}

// sets cache drive status
func (c *diskCache) setOffline() {
	atomic.StoreUint32(&c.online, 0)
}

// returns true if cache drive is online
func (c *diskCache) IsOnline() bool {
	return atomic.LoadUint32(&c.online) != 0
}

// Stat returns ObjectInfo from disk cache
func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectInfo, numHits int, err error) {
	var partial bool
	var meta *cacheMeta

	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
	// Stat the file to get file size.
	meta, partial, numHits, err = c.statCachedMeta(ctx, cacheObjPath)
	if err != nil {
		return
	}
	if partial {
		return oi, numHits, errFileNotFound
	}
	oi = meta.ToObjectInfo()
	oi.Bucket = bucket
	oi.Name = object

	if err = decryptCacheObjectETag(&oi); err != nil {
		return
	}
	return
}

// statCachedMeta returns metadata from cache - including ranges cached, partial to indicate
// if partial object is cached.
func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
	cLock := c.NewNSLockFn(cacheObjPath)
	lkctx, err := cLock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		return
	}
	ctx = lkctx.Context()
	defer cLock.RUnlock(lkctx.Cancel)
	return c.statCache(ctx, cacheObjPath)
}

// statRange returns ObjectInfo and RangeInfo from disk cache
func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, rngInfo RangeInfo, numHits int, err error) {
	// Stat the file to get file size.
	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
	var meta *cacheMeta
	var partial bool

	meta, partial, numHits, err = c.statCachedMeta(ctx, cacheObjPath)
	if err != nil {
		return
	}

	oi = meta.ToObjectInfo()
	oi.Bucket = bucket
	oi.Name = object
	if !partial {
		err = decryptCacheObjectETag(&oi)
		return
	}

	actualSize := uint64(meta.Stat.Size)
	var length int64
	_, length, err = rs.GetOffsetLength(int64(actualSize))
	if err != nil {
		return
	}

	actualRngSize := uint64(length)
	if globalCacheKMS != nil {
		actualRngSize, _ = sio.EncryptedSize(uint64(length))
	}

	rng := rs.String(int64(actualSize))
	rngFile, ok := meta.Ranges[rng]
	if !ok {
		return oi, rngInfo, numHits, ObjectNotFound{Bucket: bucket, Object: object}
	}
	if _, err = os.Stat(pathJoin(cacheObjPath, rngFile)); err != nil {
		return oi, rngInfo, numHits, ObjectNotFound{Bucket: bucket, Object: object}
	}
	rngInfo = RangeInfo{Range: rng, File: rngFile, Size: int64(actualRngSize)}

	err = decryptCacheObjectETag(&oi)
	return
}

// statCache is a convenience function for purge() to get the cached object's metadata
func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
	// Stat the file to get file size.
	metaPath := pathJoin(cacheObjPath, cacheMetaJSONFile)
	f, err := os.Open(metaPath)
	if err != nil {
		return meta, partial, 0, err
	}
	defer f.Close()
	meta = &cacheMeta{Version: cacheMetaVersion}
	if err := jsonLoad(f, meta); err != nil {
		return meta, partial, 0, err
	}
	// get metadata of part.1 if full file has been cached.
	partial = true
	if _, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile)); err == nil {
		partial = false
	}
	if writebackInProgress(meta.Meta) {
		partial = false
	}
	return meta, partial, meta.Hits, nil
}

// saves object metadata to disk cache
// incHitsOnly is true if metadata update is incrementing only the hit counter
// finalizeWB is true only if the metadata update is accompanied by moving the part from its temp location to the cache dir.
func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly, finalizeWB bool) error {
	cachedPath := getCacheSHADir(c.dir, bucket, object)
	cLock := c.NewNSLockFn(cachedPath)
	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return err
	}
	ctx = lkctx.Context()
	defer cLock.Unlock(lkctx.Cancel)
	if err = c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly); err != nil {
		return err
	}
	// move the part saved in the writeback directory to the cache dir atomically, now that cache.json is updated
	if finalizeWB {
		wbdir := getCacheWriteBackSHADir(c.dir, bucket, object)
		if err = renameAll(pathJoin(wbdir, cacheDataFile), pathJoin(cachedPath, cacheDataFile)); err != nil {
			return err
		}
		removeAll(wbdir) // cleanup writeback/shadir
	}
	return nil
}

// saves object metadata to disk cache
// incHitsOnly is true if metadata update is incrementing only the hit counter
func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
	cachedPath := getCacheSHADir(c.dir, bucket, object)
	metaPath := pathJoin(cachedPath, cacheMetaJSONFile)
	// Create cache directory if needed
	if err := os.MkdirAll(cachedPath, 0o777); err != nil {
		return err
	}
	f, err := OpenFile(metaPath, os.O_RDWR|os.O_CREATE|writeMode, 0o666)
	if err != nil {
		return err
	}
	defer f.Close()

	m := &cacheMeta{
		Version: cacheMetaVersion,
		Bucket:  bucket,
		Object:  object,
	}
	if err := jsonLoad(f, m); err != nil && err != io.EOF {
		return err
	}
	// increment hits
	if rs != nil {
		// rsFileName gets set by putRange. Check for blank values here
		// coming from other code paths that set rs only (eg initial creation or hit increment).
		if rsFileName != "" {
			if m.Ranges == nil {
				m.Ranges = make(map[string]string)
			}
			m.Ranges[rs.String(actualSize)] = rsFileName
		}
	}
	if rs == nil && !incHitsOnly {
		// this is necessary cleanup of range files if entire object is cached.
		if _, err := os.Stat(pathJoin(cachedPath, cacheDataFile)); err == nil {
			for _, f := range m.Ranges {
				removeAll(pathJoin(cachedPath, f))
			}
			m.Ranges = nil
		}
	}
	m.Stat.Size = actualSize
	if !incHitsOnly {
		// reset meta
		m.Meta = meta
	} else {
		if m.Meta == nil {
			m.Meta = make(map[string]string)
		}
		// save etag in m.Meta if missing
		if _, ok := m.Meta["etag"]; !ok {
			if etag, ok := meta["etag"]; ok {
				m.Meta["etag"] = etag
			}
		}
	}
	m.Hits++

	m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
	return jsonSave(f, m)
}

// updates the ETag and ModTime on cache with ETag from backend
func (c *diskCache) updateMetadata(ctx context.Context, bucket, object, etag string, modTime time.Time, size int64) error {
	cachedPath := getCacheSHADir(c.dir, bucket, object)
	metaPath := pathJoin(cachedPath, cacheMetaJSONFile)
	// Create cache directory if needed
	if err := os.MkdirAll(cachedPath, 0o777); err != nil {
		return err
	}
	f, err := OpenFile(metaPath, os.O_RDWR|writeMode, 0o666)
	if err != nil {
		return err
	}
	defer f.Close()

	m := &cacheMeta{
		Version: cacheMetaVersion,
		Bucket:  bucket,
		Object:  object,
	}
	if err := jsonLoad(f, m); err != nil && err != io.EOF {
		return err
	}
	if m.Meta == nil {
		m.Meta = make(map[string]string)
	}
	var key []byte
	var objectEncryptionKey crypto.ObjectKey

	if globalCacheKMS != nil {
		// Calculating object encryption key
		key, err = decryptObjectMeta(key, bucket, object, m.Meta)
		if err != nil {
			return err
		}
		copy(objectEncryptionKey[:], key)
		m.Meta["etag"] = hex.EncodeToString(objectEncryptionKey.SealETag([]byte(etag)))
	} else {
		m.Meta["etag"] = etag
	}
	m.Meta["last-modified"] = modTime.UTC().Format(http.TimeFormat)
	m.Meta["Content-Length"] = strconv.Itoa(int(size))
	return jsonSave(f, m)
}

func getCacheSHADir(dir, bucket, object string) string {
	return pathJoin(dir, getSHA256Hash([]byte(pathJoin(bucket, object))))
}
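
// Illustrative note, not part of the upstream source: getCacheSHADir maps an
// object to a flat directory named after the SHA-256 of "bucket/object". For
// a hypothetical bucket "photos" and object "2021/a.jpg", the entry would live
// under <cacheDir>/<sha256("photos/2021/a.jpg")>/ and hold cache.json
// (metadata), part.1 (the full object, when cached) and one UUID-named file
// per cached range.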

// returns temporary writeback cache location.
func getCacheWriteBackSHADir(dir, bucket, object string) string {
	return pathJoin(dir, minioMetaBucket, "writeback", getSHA256Hash([]byte(pathJoin(bucket, object))))
}

// Cache data to disk with bitrot checksum added for each block of 1MB
func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, string, error) {
	if err := os.MkdirAll(cachePath, 0o777); err != nil {
		return 0, "", err
	}
	filePath := pathJoin(cachePath, fileName)

	if filePath == "" || reader == nil {
		return 0, "", errInvalidArgument
	}

	if err := checkPathLength(filePath); err != nil {
		return 0, "", err
	}
	f, err := os.Create(filePath)
	if err != nil {
		return 0, "", osErrToFileErr(err)
	}
	defer f.Close()

	var bytesWritten int64

	h := HighwayHash256S.New()

	bufp := c.pool.Get().(*[]byte)
	defer c.pool.Put(bufp)
	md5Hash := md5.New()
	var n, n2 int
	for {
		n, err = io.ReadFull(reader, *bufp)
		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
			return 0, "", err
		}
		eof := err == io.EOF || err == io.ErrUnexpectedEOF
		if n == 0 && size != 0 {
			// Reached EOF, nothing more to be done.
			break
		}
		h.Reset()
		if _, err = h.Write((*bufp)[:n]); err != nil {
			return 0, "", err
		}
		hashBytes := h.Sum(nil)
		// compute md5Hash of original data stream if writeback or writethrough commit mode is enabled
		if c.commitWriteback || c.commitWritethrough {
			if _, err = md5Hash.Write((*bufp)[:n]); err != nil {
				return 0, "", err
			}
		}
		if _, err = f.Write(hashBytes); err != nil {
			return 0, "", err
		}
		if n2, err = f.Write((*bufp)[:n]); err != nil {
			return 0, "", err
		}
		bytesWritten += int64(n2)
		if eof {
			break
		}
	}
	md5sumCurr := md5Hash.Sum(nil)

	return bytesWritten, base64.StdEncoding.EncodeToString(md5sumCurr), nil
}
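
// Illustrative note, not part of the upstream source: bitrotWriteToCache lays
// out the cached file as a sequence of records, each a HighwayHash256S
// checksum (h.Size() bytes, 32 for this algorithm) followed by up to
// cacheBlkSize bytes of data:
//
//	[checksum0][block0][checksum1][block1]...
//
// which is why bitrotReadFromCache below seeks to
// (cacheBlkSize + h.Size()) * startBlock and re-verifies each block's
// checksum before copying it to the caller.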

func newCacheEncryptReader(ctx context.Context, content io.Reader, bucket, object string, metadata map[string]string) (r io.Reader, err error) {
	objectEncryptionKey, err := newCacheEncryptMetadata(ctx, bucket, object, metadata)
	if err != nil {
		return nil, err
	}

	reader, err := sio.EncryptReader(content, sio.Config{Key: objectEncryptionKey, MinVersion: sio.Version20, CipherSuites: fips.DARECiphers()})
	if err != nil {
		return nil, crypto.ErrInvalidCustomerKey
	}
	return reader, nil
}

func newCacheEncryptMetadata(ctx context.Context, bucket, object string, metadata map[string]string) ([]byte, error) {
	var sealedKey crypto.SealedKey
	if globalCacheKMS == nil {
		return nil, errKMSNotConfigured
	}
	key, err := globalCacheKMS.GenerateKey(ctx, "", kms.Context{bucket: pathJoin(bucket, object)})
	if err != nil {
		return nil, err
	}

	objectKey := crypto.GenerateKey(key.Plaintext, rand.Reader)
	sealedKey = objectKey.Seal(key.Plaintext, crypto.GenerateIV(rand.Reader), crypto.S3.String(), bucket, object)
	crypto.S3.CreateMetadata(metadata, key.KeyID, key.Ciphertext, sealedKey)

	if etag, ok := metadata["etag"]; ok {
		metadata["etag"] = hex.EncodeToString(objectKey.SealETag([]byte(etag)))
	}
	metadata[SSECacheEncrypted] = ""
	return objectKey[:], nil
}
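
// Illustrative note, not part of the upstream source: when a cache KMS is
// configured, newCacheEncryptMetadata requests a per-object data key from
// globalCacheKMS, generates a fresh object key, seals it with the data key
// plaintext, and records the KMS key ID, the encrypted data key, the sealed
// object key and a sealed ETag in the cache metadata; newCacheEncryptReader
// above then feeds that object key to sio.EncryptReader so the bytes written
// by bitrotWriteToCache are encrypted at rest.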

func (c *diskCache) GetLockContext(ctx context.Context, bucket, object string) (RWLocker, LockContext, error) {
	cachePath := getCacheSHADir(c.dir, bucket, object)
	cLock := c.NewNSLockFn(cachePath)
	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
	return cLock, lkctx, err
}

// Caches the object to disk
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
	cLock, lkctx, err := c.GetLockContext(ctx, bucket, object)
	if err != nil {
		return oi, err
	}
	ctx = lkctx.Context()
	defer cLock.Unlock(lkctx.Cancel)

	return c.put(ctx, bucket, object, data, size, rs, opts, incHitsOnly, writeback)
}

// Caches the object to disk
func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
	if !c.diskSpaceAvailable(size) {
		io.Copy(io.Discard, data)
		return oi, errDiskFull
	}
	cachePath := getCacheSHADir(c.dir, bucket, object)
	meta, _, numHits, err := c.statCache(ctx, cachePath)
	// Case where object not yet cached
	if osIsNotExist(err) && c.after >= 1 {
		return oi, c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
	}
	// Case where object already has a cache metadata entry but not yet cached
	if err == nil && numHits < c.after {
		cETag := extractETag(meta.Meta)
		bETag := extractETag(opts.UserDefined)
		if cETag == bETag {
			return oi, c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
		}
		incHitsOnly = true
	}

	if rs != nil {
		return oi, c.putRange(ctx, bucket, object, data, size, rs, opts)
	}
	if !c.diskSpaceAvailable(size) {
		return oi, errDiskFull
	}

	if writeback {
		cachePath = getCacheWriteBackSHADir(c.dir, bucket, object)
	}

	if err := os.MkdirAll(cachePath, 0o777); err != nil {
		removeAll(cachePath)
		return oi, err
	}
	metadata := cloneMSS(opts.UserDefined)
	reader := data
	actualSize := uint64(size)
	if globalCacheKMS != nil {
		reader, err = newCacheEncryptReader(ctx, data, bucket, object, metadata)
		if err != nil {
			removeAll(cachePath)
			return oi, err
		}
		actualSize, _ = sio.EncryptedSize(uint64(size))
	}
	n, md5sum, err := c.bitrotWriteToCache(cachePath, cacheDataFile, reader, actualSize)
	if IsErr(err, baseErrs...) {
		// take the cache drive offline
		c.setOffline()
	}
	if err != nil {
		removeAll(cachePath)
		return oi, err
	}

	if actualSize != uint64(n) {
		removeAll(cachePath)
		return oi, IncompleteBody{Bucket: bucket, Object: object}
	}
	if writeback {
		metadata["content-md5"] = md5sum
		if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
			metadata["etag"] = hex.EncodeToString(md5bytes)
		}
		metadata[writeBackStatusHeader] = CommitPending.String()
	}
	return ObjectInfo{
			Bucket:      bucket,
			Name:        object,
			ETag:        metadata["etag"],
			Size:        n,
			UserDefined: metadata,
		},
		c.saveMetadata(ctx, bucket, object, metadata, n, nil, "", incHitsOnly)
}

// Caches the range to disk
func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error {
	rlen, err := rs.GetLength(size)
	if err != nil {
		return err
	}
	if !c.diskSpaceAvailable(rlen) {
		return errDiskFull
	}
	cachePath := getCacheSHADir(c.dir, bucket, object)
	if err := os.MkdirAll(cachePath, 0o777); err != nil {
		return err
	}
	metadata := cloneMSS(opts.UserDefined)
	reader := data
	actualSize := uint64(rlen)
	// objSize is the actual size of object (with encryption overhead if any)
	objSize := uint64(size)
	if globalCacheKMS != nil {
		reader, err = newCacheEncryptReader(ctx, data, bucket, object, metadata)
		if err != nil {
			return err
		}
		actualSize, _ = sio.EncryptedSize(uint64(rlen))
		objSize, _ = sio.EncryptedSize(uint64(size))
	}
	cacheFile := mustGetUUID()
	n, _, err := c.bitrotWriteToCache(cachePath, cacheFile, reader, actualSize)
	if IsErr(err, baseErrs...) {
		// take the cache drive offline
		c.setOffline()
	}
	if err != nil {
		removeAll(cachePath)
		return err
	}
	if actualSize != uint64(n) {
		removeAll(cachePath)
		return IncompleteBody{Bucket: bucket, Object: object}
	}
	return c.saveMetadata(ctx, bucket, object, metadata, int64(objSize), rs, cacheFile, false)
}

// checks streaming bitrot checksum of cached object before returning data
func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, offset, length int64, writer io.Writer) error {
	h := HighwayHash256S.New()

	checksumHash := make([]byte, h.Size())

	startBlock := offset / cacheBlkSize
	endBlock := (offset + length) / cacheBlkSize

	// get block start offset
	var blockStartOffset int64
	if startBlock > 0 {
		blockStartOffset = (cacheBlkSize + int64(h.Size())) * startBlock
	}

	tillLength := (cacheBlkSize + int64(h.Size())) * (endBlock - startBlock + 1)

	// Start offset cannot be negative.
	if offset < 0 {
		logger.LogIf(ctx, errUnexpected)
		return errUnexpected
	}

	// Writer cannot be nil.
	if writer == nil {
		logger.LogIf(ctx, errUnexpected)
		return errUnexpected
	}
	var blockOffset, blockLength int64
	rc, err := readCacheFileStream(filePath, blockStartOffset, tillLength)
	if err != nil {
		return err
	}
	defer rc.Close()
	bufp := c.pool.Get().(*[]byte)
	defer c.pool.Put(bufp)

	for block := startBlock; block <= endBlock; block++ {
		switch {
		case startBlock == endBlock:
			blockOffset = offset % cacheBlkSize
			blockLength = length
		case block == startBlock:
			blockOffset = offset % cacheBlkSize
			blockLength = cacheBlkSize - blockOffset
		case block == endBlock:
			blockOffset = 0
			blockLength = (offset + length) % cacheBlkSize
		default:
			blockOffset = 0
			blockLength = cacheBlkSize
		}
		if blockLength == 0 {
			break
		}
		if _, err := io.ReadFull(rc, checksumHash); err != nil {
			return err
		}
		h.Reset()
		n, err := io.ReadFull(rc, *bufp)
		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
			logger.LogIf(ctx, err)
			return err
		}
		eof := err == io.EOF || err == io.ErrUnexpectedEOF
		if n == 0 && length != 0 {
			// Reached EOF, nothing more to be done.
			break
		}

		if _, e := h.Write((*bufp)[:n]); e != nil {
			return e
		}
		hashBytes := h.Sum(nil)

		if !bytes.Equal(hashBytes, checksumHash) {
			err = fmt.Errorf("hashes do not match expected %s, got %s",
				hex.EncodeToString(checksumHash), hex.EncodeToString(hashBytes))
			logger.LogIf(GlobalContext, err)
			return err
		}

		if _, err = io.Copy(writer, bytes.NewReader((*bufp)[blockOffset:blockOffset+blockLength])); err != nil {
			if err != io.ErrClosedPipe {
				logger.LogIf(ctx, err)
				return err
			}
			eof = true
		}
		if eof {
			break
		}
	}

	return nil
}

// Get returns ObjectInfo and reader for object from disk cache
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
	cLock := c.NewNSLockFn(cacheObjPath)
	lkctx, err := cLock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		return nil, numHits, err
	}
	ctx = lkctx.Context()
	defer cLock.RUnlock(lkctx.Cancel)

	var objInfo ObjectInfo
	var rngInfo RangeInfo
	if objInfo, rngInfo, numHits, err = c.statRange(ctx, bucket, object, rs); err != nil {
		return nil, numHits, toObjectErr(err, bucket, object)
	}
	cacheFile := cacheDataFile
	objSize := objInfo.Size
	if !rngInfo.Empty() {
		// for cached ranges, need to pass actual range file size to GetObjectReader
		// and clear out range spec
		cacheFile = rngInfo.File
		objInfo.Size = rngInfo.Size
		rs = nil
	}

	if objInfo.IsCompressed() {
		// Cache isn't compressed.
		delete(objInfo.UserDefined, ReservedMetadataPrefix+"compression")
	}

	// For a directory, we need to send a reader that returns no bytes.
	if HasSuffix(object, SlashSeparator) {
		// The lock taken above is released when
		// objReader.Close() is called by the caller.
		gr, gerr := NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts)
		return gr, numHits, gerr
	}
	fn, startOffset, length, nErr := NewGetObjectReader(rs, objInfo, opts)
	if nErr != nil {
		return nil, numHits, nErr
	}
	var totalBytesRead int64

	pr, pw := xioutil.WaitPipe()
	if len(objInfo.Parts) > 0 {
		// For negative length read everything.
		if length < 0 {
			length = objInfo.Size - startOffset
		}

		// Reply back invalid range if the input offset and length fall out of range.
		if startOffset > objInfo.Size || startOffset+length > objInfo.Size {
			logger.LogIf(ctx, InvalidRange{startOffset, length, objInfo.Size}, logger.Application)
			return nil, numHits, InvalidRange{startOffset, length, objInfo.Size}
		}
		// Get start part index and offset.
		partIndex, partOffset, err := cacheObjectToPartOffset(objInfo, startOffset)
		if err != nil {
			return nil, numHits, InvalidRange{startOffset, length, objInfo.Size}
		}
		// Calculate endOffset according to length
		endOffset := startOffset
		if length > 0 {
			endOffset += length - 1
		}

		// Get last part index to read given length.
		lastPartIndex, _, err := cacheObjectToPartOffset(objInfo, endOffset)
		if err != nil {
			return nil, numHits, InvalidRange{startOffset, length, objInfo.Size}
		}
		go func() {
			for ; partIndex <= lastPartIndex; partIndex++ {
				if length == totalBytesRead {
					break
				}
				partNumber := objInfo.Parts[partIndex].Number
				// Save the current part name and size.
				partSize := objInfo.Parts[partIndex].Size
				partLength := partSize - partOffset
				// partLength should be adjusted so that we don't write more data than what was requested.
				if partLength > (length - totalBytesRead) {
					partLength = length - totalBytesRead
				}
				filePath := pathJoin(cacheObjPath, fmt.Sprintf("part.%d", partNumber))
				err := c.bitrotReadFromCache(ctx, filePath, partOffset, partLength, pw)
				if err != nil {
					removeAll(cacheObjPath)
					pw.CloseWithError(err)
					break
				}
				totalBytesRead += partLength
				// partOffset will be valid only for the first part, hence reset it to 0 for
				// the remaining parts.
				partOffset = 0
			} // End of read all parts loop.
			pw.CloseWithError(err)
		}()
	} else {
		go func() {
			if writebackInProgress(objInfo.UserDefined) {
				cacheObjPath = getCacheWriteBackSHADir(c.dir, bucket, object)
			}
			filePath := pathJoin(cacheObjPath, cacheFile)
			err := c.bitrotReadFromCache(ctx, filePath, startOffset, length, pw)
			if err != nil {
				removeAll(cacheObjPath)
			}
			pw.CloseWithError(err)
		}()
	}

	// Cleanup function to cause the go routine above to exit, in
	// case of incomplete read.
	pipeCloser := func() { pr.CloseWithError(nil) }

	gr, gerr := fn(pr, h, pipeCloser)
	if gerr != nil {
		return gr, numHits, gerr
	}
	if globalCacheKMS != nil {
		// clean up internal SSE cache metadata
		delete(gr.ObjInfo.UserDefined, xhttp.AmzServerSideEncryption)
	}
	if !rngInfo.Empty() {
		// overlay Size with actual object size and not the range size
		gr.ObjInfo.Size = objSize
	}
	return gr, numHits, nil
}

// deletes the cached object - caller should have taken write lock
func (c *diskCache) delete(bucket, object string) (err error) {
	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
	return removeAll(cacheObjPath)
}

// Deletes the cached object
func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
	cLock := c.NewNSLockFn(cacheObjPath)
	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return err
	}
	defer cLock.Unlock(lkctx.Cancel)
	return removeAll(cacheObjPath)
}

// convenience function to check if object is cached on this diskCache
func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
	if _, err := os.Stat(getCacheSHADir(c.dir, bucket, object)); err != nil {
		return false
	}
	return true
}

// queues writeback upload failures on server startup
func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
	defer close(c.retryWritebackCh)
	filterFn := func(name string, typ os.FileMode) error {
		if name == minioMetaBucket {
			// Proceed to next file.
			return nil
		}
		cacheDir := pathJoin(c.dir, name)
		meta, _, _, err := c.statCachedMeta(ctx, cacheDir)
		if err != nil {
			return nil
		}

		objInfo := meta.ToObjectInfo()
		status, ok := objInfo.UserDefined[writeBackStatusHeader]
		if !ok || status == CommitComplete.String() {
			return nil
		}
		select {
		case c.retryWritebackCh <- objInfo:
		default:
		}

		return nil
	}

	if err := readDirFn(c.dir, filterFn); err != nil {
		logger.LogIf(ctx, err)
		return
	}
}

// NewMultipartUpload caches multipart uploads when MINIO_CACHE_COMMIT mode is writethrough
// multiparts are saved in .minio.sys/multipart/cachePath/uploadID dir until finalized. Then the individual parts
// are moved from the upload dir to the cachePath/ directory.
func (c *diskCache) NewMultipartUpload(ctx context.Context, bucket, object, uID string, opts ObjectOptions) (uploadID string, err error) {
	uploadID = uID
	if uploadID == "" {
		return "", InvalidUploadID{
			Bucket:   bucket,
			Object:   object,
			UploadID: uploadID,
		}
	}

	cachePath := getMultipartCacheSHADir(c.dir, bucket, object)
	uploadIDDir := path.Join(cachePath, uploadID)
	if err := mkdirAll(uploadIDDir, 0o777); err != nil {
		return uploadID, err
	}
	metaPath := pathJoin(uploadIDDir, cacheMetaJSONFile)

	f, err := OpenFile(metaPath, os.O_RDWR|os.O_CREATE|writeMode, 0o666)
	if err != nil {
		return uploadID, err
	}
	defer f.Close()

	m := &cacheMeta{
		Version: cacheMetaVersion,
		Bucket:  bucket,
		Object:  object,
	}
	if err := jsonLoad(f, m); err != nil && err != io.EOF {
		return uploadID, err
	}

	m.Meta = opts.UserDefined

	m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
	m.Stat.ModTime = UTCNow()
	if globalCacheKMS != nil {
		m.Meta[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
		if _, err := newCacheEncryptMetadata(ctx, bucket, object, m.Meta); err != nil {
			return uploadID, err
		}
	}
	err = jsonSave(f, m)
	return uploadID, err
}

// PutObjectPart caches part to cache multipart path.
func (c *diskCache) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data io.Reader, size int64, opts ObjectOptions) (partInfo PartInfo, err error) {
	oi := PartInfo{}
	if !c.diskSpaceAvailable(size) {
		io.Copy(io.Discard, data)
		return oi, errDiskFull
	}
	cachePath := getMultipartCacheSHADir(c.dir, bucket, object)
	uploadIDDir := path.Join(cachePath, uploadID)

	partIDLock := c.NewNSLockFn(pathJoin(uploadIDDir, strconv.Itoa(partID)))
	lkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return oi, err
	}

	ctx = lkctx.Context()
	defer partIDLock.Unlock(lkctx.Cancel)
	meta, _, _, err := c.statCache(ctx, uploadIDDir)
	// Case where object not yet cached
	if err != nil {
		return oi, err
	}

	if !c.diskSpaceAvailable(size) {
		return oi, errDiskFull
	}
	reader := data
	actualSize := uint64(size)
	if globalCacheKMS != nil {
		reader, err = newCachePartEncryptReader(ctx, bucket, object, partID, data, size, meta.Meta)
		if err != nil {
			return oi, err
		}
		actualSize, _ = sio.EncryptedSize(uint64(size))
	}
	n, md5sum, err := c.bitrotWriteToCache(uploadIDDir, fmt.Sprintf("part.%d", partID), reader, actualSize)
	if IsErr(err, baseErrs...) {
		// take the cache drive offline
		c.setOffline()
	}
	if err != nil {
		return oi, err
	}

	if actualSize != uint64(n) {
		return oi, IncompleteBody{Bucket: bucket, Object: object}
	}
	var md5hex string
	if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
		md5hex = hex.EncodeToString(md5bytes)
	}

	pInfo := PartInfo{
		PartNumber:   partID,
		ETag:         md5hex,
		Size:         n,
		ActualSize:   int64(actualSize),
		LastModified: UTCNow(),
	}
	return pInfo, nil
}

// SavePartMetadata saves part upload metadata to uploadID directory on disk cache
func (c *diskCache) SavePartMetadata(ctx context.Context, bucket, object, uploadID string, partID int, pinfo PartInfo) error {
	cachePath := getMultipartCacheSHADir(c.dir, bucket, object)
	uploadDir := path.Join(cachePath, uploadID)

	// acquire a write lock at upload path to update cache.json
	uploadLock := c.NewNSLockFn(uploadDir)
	ulkctx, err := uploadLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return err
	}
	defer uploadLock.Unlock(ulkctx.Cancel)

	metaPath := pathJoin(uploadDir, cacheMetaJSONFile)
	f, err := OpenFile(metaPath, os.O_RDWR|writeMode, 0o666)
	if err != nil {
		return err
	}
	defer f.Close()

	m := &cacheMeta{}
	if err := jsonLoad(f, m); err != nil && err != io.EOF {
		return err
	}
	var key []byte
	var objectEncryptionKey crypto.ObjectKey
	if globalCacheKMS != nil {
		// Calculating object encryption key
		key, err = decryptObjectMeta(key, bucket, object, m.Meta)
		if err != nil {
			return err
		}
		copy(objectEncryptionKey[:], key)
		pinfo.ETag = hex.EncodeToString(objectEncryptionKey.SealETag([]byte(pinfo.ETag)))
	}

	pIdx := cacheObjPartIndex(m, partID)
	if pIdx == -1 {
		m.PartActualSizes = append(m.PartActualSizes, pinfo.ActualSize)
		m.PartNumbers = append(m.PartNumbers, pinfo.PartNumber)
		m.PartETags = append(m.PartETags, pinfo.ETag)
		m.PartSizes = append(m.PartSizes, pinfo.Size)
	} else {
		m.PartActualSizes[pIdx] = pinfo.ActualSize
		m.PartNumbers[pIdx] = pinfo.PartNumber
		m.PartETags[pIdx] = pinfo.ETag
		m.PartSizes[pIdx] = pinfo.Size
	}
	return jsonSave(f, m)
}

// newCachePartEncryptReader returns encrypted cache part reader, with part data encrypted with part encryption key
func newCachePartEncryptReader(ctx context.Context, bucket, object string, partID int, content io.Reader, size int64, metadata map[string]string) (r io.Reader, err error) {
	var key []byte
	var objectEncryptionKey, partEncryptionKey crypto.ObjectKey

	// Calculating object encryption key
	key, err = decryptObjectMeta(key, bucket, object, metadata)
	if err != nil {
		return nil, err
	}
	copy(objectEncryptionKey[:], key)

	partEnckey := objectEncryptionKey.DerivePartKey(uint32(partID))
	copy(partEncryptionKey[:], partEnckey[:])
	wantSize := int64(-1)
	if size >= 0 {
		info := ObjectInfo{Size: size}
		wantSize = info.EncryptedSize()
	}
	hReader, err := hash.NewReader(content, wantSize, "", "", size)
	if err != nil {
		return nil, err
	}

	pReader := NewPutObjReader(hReader)
	content, err = pReader.WithEncryption(hReader, &partEncryptionKey)
	if err != nil {
		return nil, err
	}

	reader, err := sio.EncryptReader(content, sio.Config{Key: partEncryptionKey[:], MinVersion: sio.Version20, CipherSuites: fips.DARECiphers()})
	if err != nil {
		return nil, crypto.ErrInvalidCustomerKey
	}
	return reader, nil
}

// uploadIDExists returns error if uploadID is not being cached.
func (c *diskCache) uploadIDExists(bucket, object, uploadID string) (err error) {
	mpartCachePath := getMultipartCacheSHADir(c.dir, bucket, object)
	uploadIDDir := path.Join(mpartCachePath, uploadID)
	if _, err := Stat(uploadIDDir); err != nil {
		return err
	}
	return nil
}

// CompleteMultipartUpload completes multipart upload on cache. The parts and cache.json are moved from the temporary location in
// .minio.sys/multipart/cacheSHA/.. to cacheSHA path after part verification succeeds.
func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, roi ObjectInfo, opts ObjectOptions) (oi ObjectInfo, err error) {
	cachePath := getCacheSHADir(c.dir, bucket, object)
	cLock := c.NewNSLockFn(cachePath)
	lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return oi, err
	}

	ctx = lkctx.Context()
	defer cLock.Unlock(lkctx.Cancel)
	mpartCachePath := getMultipartCacheSHADir(c.dir, bucket, object)
	uploadIDDir := path.Join(mpartCachePath, uploadID)

	uploadMeta, _, _, uerr := c.statCache(ctx, uploadIDDir)
	if uerr != nil {
		return oi, errUploadIDNotFound
	}

	// Case where object not yet cached
	// Calculate full object size.
	var objectSize int64

	// Calculate consolidated actual size.
	var objectActualSize int64

	var partETags []string
	partETags, err = decryptCachePartETags(uploadMeta)
	if err != nil {
		return oi, err
	}
	for i, pi := range uploadedParts {
		pIdx := cacheObjPartIndex(uploadMeta, pi.PartNumber)
		if pIdx == -1 {
			invp := InvalidPart{
				PartNumber: pi.PartNumber,
				GotETag:    pi.ETag,
			}
			return oi, invp
		}
		pi.ETag = canonicalizeETag(pi.ETag)
		if partETags[pIdx] != pi.ETag {
			invp := InvalidPart{
				PartNumber: pi.PartNumber,
				ExpETag:    partETags[pIdx],
				GotETag:    pi.ETag,
			}
			return oi, invp
		}
		// All parts except the last part have to be at least 5MB.
		if (i < len(uploadedParts)-1) && !isMinAllowedPartSize(uploadMeta.PartActualSizes[pIdx]) {
			return oi, PartTooSmall{
				PartNumber: pi.PartNumber,
				PartSize:   uploadMeta.PartActualSizes[pIdx],
				PartETag:   pi.ETag,
			}
		}

		// Save for total object size.
		objectSize += uploadMeta.PartSizes[pIdx]

		// Save the consolidated actual size.
		objectActualSize += uploadMeta.PartActualSizes[pIdx]
	}
	uploadMeta.Stat.Size = objectSize
	uploadMeta.Stat.ModTime = roi.ModTime
	uploadMeta.Bucket = bucket
	uploadMeta.Object = object
	// if encrypted - make sure ETag updated

	uploadMeta.Meta["etag"] = roi.ETag
	uploadMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
	var cpartETags []string
	var cpartNums []int
	var cpartSizes, cpartActualSizes []int64
	for _, pi := range uploadedParts {
		pIdx := cacheObjPartIndex(uploadMeta, pi.PartNumber)
		if pIdx != -1 {
			cpartETags = append(cpartETags, uploadMeta.PartETags[pIdx])
			cpartNums = append(cpartNums, uploadMeta.PartNumbers[pIdx])
			cpartSizes = append(cpartSizes, uploadMeta.PartSizes[pIdx])
			cpartActualSizes = append(cpartActualSizes, uploadMeta.PartActualSizes[pIdx])
		}
	}
	uploadMeta.PartETags = cpartETags
	uploadMeta.PartSizes = cpartSizes
	uploadMeta.PartActualSizes = cpartActualSizes
	uploadMeta.PartNumbers = cpartNums
	uploadMeta.Hits++
	metaPath := pathJoin(uploadIDDir, cacheMetaJSONFile)

	f, err := OpenFile(metaPath, os.O_RDWR|os.O_CREATE|writeMode, 0o666)
	if err != nil {
		return oi, err
	}
	defer f.Close()
	jsonSave(f, uploadMeta)
	for _, pi := range uploadedParts {
		part := fmt.Sprintf("part.%d", pi.PartNumber)
		renameAll(pathJoin(uploadIDDir, part), pathJoin(cachePath, part))
	}
	renameAll(pathJoin(uploadIDDir, cacheMetaJSONFile), pathJoin(cachePath, cacheMetaJSONFile))
	removeAll(uploadIDDir) // clean up any unused parts in the uploadIDDir
	return uploadMeta.ToObjectInfo(), nil
}

func (c *diskCache) AbortUpload(bucket, object, uploadID string) (err error) {
	mpartCachePath := getMultipartCacheSHADir(c.dir, bucket, object)
	uploadDir := path.Join(mpartCachePath, uploadID)
	return removeAll(uploadDir)
}

// cacheObjPartIndex - returns the index of matching object part number.
func cacheObjPartIndex(m *cacheMeta, partNumber int) int {
	for i, part := range m.PartNumbers {
		if partNumber == part {
			return i
		}
	}
	return -1
}

// cacheObjectToPartOffset calculates part index and part offset for requested offset for content on cache.
func cacheObjectToPartOffset(objInfo ObjectInfo, offset int64) (partIndex int, partOffset int64, err error) {
	if offset == 0 {
		// Special case - if offset is 0, then partIndex and partOffset are always 0.
		return 0, 0, nil
	}
	partOffset = offset
	// Seek until object offset maps to a particular part offset.
	for i, part := range objInfo.Parts {
		partIndex = i
		// Offset is smaller than the part size, we have reached the proper part offset.
		if partOffset < part.Size {
			return partIndex, partOffset, nil
		}
		// Continue towards the next part.
		partOffset -= part.Size
	}
	// Offset is beyond the size of the object, return InvalidRange.
	return 0, 0, InvalidRange{}
}
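
// Illustrative example, not part of the upstream source: for a cached object
// with parts of sizes [5MiB, 5MiB, 2MiB], an offset of 6MiB falls 1MiB into
// the second part, so cacheObjectToPartOffset returns partIndex=1 and
// partOffset=1MiB; an offset of 12MiB or more is past the end of the object
// and yields InvalidRange.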

// get path of on-going multipart caching
func getMultipartCacheSHADir(dir, bucket, object string) string {
	return pathJoin(dir, minioMetaBucket, cacheMultipartDir, getSHA256Hash([]byte(pathJoin(bucket, object))))
}

// clean up stale cache multipart uploads according to cleanup interval.
func (c *diskCache) cleanupStaleUploads(ctx context.Context) {
	timer := time.NewTimer(cacheStaleUploadCleanupInterval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			now := time.Now()
			readDirFn(pathJoin(c.dir, minioMetaBucket, cacheMultipartDir), func(shaDir string, typ os.FileMode) error {
				return readDirFn(pathJoin(c.dir, minioMetaBucket, cacheMultipartDir, shaDir), func(uploadIDDir string, typ os.FileMode) error {
					uploadIDPath := pathJoin(c.dir, minioMetaBucket, cacheMultipartDir, shaDir, uploadIDDir)
					fi, err := Stat(uploadIDPath)
					if err != nil {
						return nil
					}
					if now.Sub(fi.ModTime()) > cacheStaleUploadExpiry {
						removeAll(uploadIDPath)
					}
					return nil
				})
			})
			// clean up the writeback folder where cache.json no longer exists in the main c.dir/<sha256(bucket,object)> path
			// and the upload is past the expiry window.
			readDirFn(pathJoin(c.dir, minioMetaBucket, cacheWritebackDir), func(shaDir string, typ os.FileMode) error {
				wbdir := pathJoin(c.dir, minioMetaBucket, cacheWritebackDir, shaDir)
				cachedir := pathJoin(c.dir, shaDir)
				if _, err := Stat(cachedir); os.IsNotExist(err) {
					fi, err := Stat(wbdir)
					if err != nil {
						return nil
					}
					if now.Sub(fi.ModTime()) > cacheWBStaleUploadExpiry {
						return removeAll(wbdir)
					}
				}
				return nil
			})

			// Reset for the next interval
			timer.Reset(cacheStaleUploadCleanupInterval)
		}
	}
}