Add support for caching multipart in writethrough mode (#13507)

This commit is contained in:
Poorna K
2021-11-01 11:11:58 -04:00
committed by GitHub
parent 6d53e3c2d7
commit 15dcacc1fc
7 changed files with 975 additions and 91 deletions

View File

@@ -29,6 +29,8 @@ import (
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -39,6 +41,7 @@ import (
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/disk"
"github.com/minio/minio/internal/fips"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/kms"
@@ -48,13 +51,18 @@ import (
const (
// cache.json object metadata for cached objects.
cacheMetaJSONFile = "cache.json"
cacheDataFile = "part.1"
cacheMetaVersion = "1.0.0"
cacheExpiryDays = 90 * time.Hour * 24 // defaults to 90 days
cacheMetaJSONFile = "cache.json"
cacheDataFile = "part.1"
cacheDataFilePrefix = "part"
cacheMetaVersion = "1.0.0"
cacheExpiryDays = 90 * time.Hour * 24 // defaults to 90 days
// SSECacheEncrypted is the metadata key indicating that the object
// is a cache entry encrypted with cache KMS master key in globalCacheKMS.
SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache"
SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache"
cacheMultipartDir = "multipart"
cacheStaleUploadCleanupInterval = time.Hour * 24
cacheStaleUploadExpiry = time.Hour * 24
)
// CacheChecksumInfoV1 - carries checksums of individual blocks on disk.
@@ -78,6 +86,11 @@ type cacheMeta struct {
Hits int `json:"hits,omitempty"`
Bucket string `json:"bucket,omitempty"`
Object string `json:"object,omitempty"`
// for multipart upload
PartNumbers []int `json:"partNums,omitempty"` // Part Numbers
PartETags []string `json:"partETags,omitempty"` // Part ETags
PartSizes []int64 `json:"partSizes,omitempty"` // Part Sizes
PartActualSizes []int64 `json:"partASizes,omitempty"` // Part ActualSizes (compression)
}
// RangeInfo has the range, file and range length information for a cached range.
@@ -104,13 +117,13 @@ func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
CacheStatus: CacheHit,
CacheLookupStatus: CacheHit,
}
meta := cloneMSS(m.Meta)
// We set file info only if its valid.
o.Size = m.Stat.Size
o.ETag = extractETag(m.Meta)
o.ContentType = m.Meta["content-type"]
o.ContentEncoding = m.Meta["content-encoding"]
if storageClass, ok := m.Meta[xhttp.AmzStorageClass]; ok {
o.ETag = extractETag(meta)
o.ContentType = meta["content-type"]
o.ContentEncoding = meta["content-encoding"]
if storageClass, ok := meta[xhttp.AmzStorageClass]; ok {
o.StorageClass = storageClass
} else {
o.StorageClass = globalMinioDefaultStorageClass
@@ -119,20 +132,26 @@ func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
t time.Time
e error
)
if exp, ok := m.Meta["expires"]; ok {
if exp, ok := meta["expires"]; ok {
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
o.Expires = t.UTC()
}
}
if mtime, ok := m.Meta["last-modified"]; ok {
if mtime, ok := meta["last-modified"]; ok {
if t, e = time.Parse(http.TimeFormat, mtime); e == nil {
o.ModTime = t.UTC()
}
}
o.Parts = make([]ObjectPartInfo, len(m.PartNumbers))
for i := range m.PartNumbers {
o.Parts[i].Number = m.PartNumbers[i]
o.Parts[i].Size = m.PartSizes[i]
o.Parts[i].ETag = m.PartETags[i]
o.Parts[i].ActualSize = m.PartActualSizes[i]
}
// etag/md5Sum has already been extracted. We need to
// remove to avoid it from appearing as part of user-defined metadata
o.UserDefined = cleanMetadata(m.Meta)
o.UserDefined = cleanMetadata(meta)
return o
}
@@ -142,16 +161,18 @@ type diskCache struct {
online uint32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
purgeRunning int32
triggerGC chan struct{}
dir string // caching directory
stats CacheDiskStats // disk cache stats for prometheus
quotaPct int // max usage in %
pool sync.Pool
after int // minimum accesses before an object is cached.
lowWatermark int
highWatermark int
enableRange bool
commitWriteback bool
triggerGC chan struct{}
dir string // caching directory
stats CacheDiskStats // disk cache stats for prometheus
quotaPct int // max usage in %
pool sync.Pool
after int // minimum accesses before an object is cached.
lowWatermark int
highWatermark int
enableRange bool
commitWriteback bool
commitWritethrough bool
retryWritebackCh chan ObjectInfo
// nsMutex namespace lock
nsMutex *nsLockMap
@@ -170,15 +191,17 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
return nil, fmt.Errorf("Unable to initialize '%s' dir, %w", dir, err)
}
cache := diskCache{
dir: dir,
triggerGC: make(chan struct{}, 1),
stats: CacheDiskStats{Dir: dir},
quotaPct: quotaPct,
after: config.After,
lowWatermark: config.WatermarkLow,
highWatermark: config.WatermarkHigh,
enableRange: config.Range,
commitWriteback: config.CommitWriteback,
dir: dir,
triggerGC: make(chan struct{}, 1),
stats: CacheDiskStats{Dir: dir},
quotaPct: quotaPct,
after: config.After,
lowWatermark: config.WatermarkLow,
highWatermark: config.WatermarkHigh,
enableRange: config.Range,
commitWriteback: config.CacheCommitMode == CommitWriteBack,
commitWritethrough: config.CacheCommitMode == CommitWriteThrough,
retryWritebackCh: make(chan ObjectInfo, 10000),
online: 1,
pool: sync.Pool{
@@ -190,6 +213,7 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
nsMutex: newNSLock(false),
}
go cache.purgeWait(ctx)
go cache.cleanupStaleUploads(ctx)
if cache.commitWriteback {
go cache.scanCacheWritebackFailures(ctx)
}
@@ -303,16 +327,11 @@ func (c *diskCache) purge(ctx context.Context) {
// ignore error we know what value we are passing.
scorer, _ := newFileScorer(toFree, time.Now().Unix(), 100)
// this function returns FileInfo for cached range files and cache data file.
fiStatFn := func(ranges map[string]string, dataFile, pathPrefix string) map[string]os.FileInfo {
// this function returns FileInfo for cached range files.
fiStatRangesFn := func(ranges map[string]string, pathPrefix string) map[string]os.FileInfo {
fm := make(map[string]os.FileInfo)
fname := pathJoin(pathPrefix, dataFile)
if fi, err := os.Stat(fname); err == nil {
fm[fname] = fi
}
for _, rngFile := range ranges {
fname = pathJoin(pathPrefix, rngFile)
fname := pathJoin(pathPrefix, rngFile)
if fi, err := os.Stat(fname); err == nil {
fm[fname] = fi
}
@@ -320,6 +339,26 @@ func (c *diskCache) purge(ctx context.Context) {
return fm
}
// this function returns most recent Atime among cached part files.
lastAtimeFn := func(partNums []int, pathPrefix string) time.Time {
lastATime := timeSentinel
for _, pnum := range partNums {
fname := pathJoin(pathPrefix, fmt.Sprintf("%s.%d", cacheDataFilePrefix, pnum))
if fi, err := os.Stat(fname); err == nil {
if atime.Get(fi).After(lastATime) {
lastATime = atime.Get(fi)
}
}
}
if len(partNums) == 0 {
fname := pathJoin(pathPrefix, cacheDataFile)
if fi, err := os.Stat(fname); err == nil {
lastATime = atime.Get(fi)
}
}
return lastATime
}
filterFn := func(name string, typ os.FileMode) error {
if name == minioMetaBucket {
// Proceed to next file.
@@ -334,9 +373,10 @@ func (c *diskCache) purge(ctx context.Context) {
// Proceed to next file.
return nil
}
// stat all cached file ranges and cacheDataFile.
cachedFiles := fiStatFn(meta.Ranges, cacheDataFile, pathJoin(c.dir, name))
// get last access time of cache part files
lastAtime := lastAtimeFn(meta.PartNumbers, pathJoin(c.dir, name))
// stat all cached file ranges.
cachedRngFiles := fiStatRangesFn(meta.Ranges, pathJoin(c.dir, name))
objInfo := meta.ToObjectInfo("", "")
// prevent gc from clearing un-synced commits. This metadata is present when
// cache writeback commit setting is enabled.
@@ -345,7 +385,27 @@ func (c *diskCache) purge(ctx context.Context) {
return nil
}
cc := cacheControlOpts(objInfo)
for fname, fi := range cachedFiles {
switch {
case cc != nil:
if cc.isStale(objInfo.ModTime) {
if err = removeAll(cacheDir); err != nil {
logger.LogIf(ctx, err)
}
scorer.adjustSaveBytes(-objInfo.Size)
// break early if sufficient disk space reclaimed.
if c.diskUsageLow() {
// if we found disk usage is already low, we return nil filtering is complete.
return errDoneForNow
}
}
case lastAtime != timeSentinel:
// cached multipart or single part
objInfo.AccTime = lastAtime
objInfo.Name = pathJoin(c.dir, name, cacheDataFile)
scorer.addFileWithObjInfo(objInfo, numHits)
}
for fname, fi := range cachedRngFiles {
if cc != nil {
if cc.isStale(objInfo.ModTime) {
if err = removeAll(fname); err != nil {
@@ -365,7 +425,7 @@ func (c *diskCache) purge(ctx context.Context) {
}
// clean up stale cache.json files for objects that never got cached but access count was maintained in cache.json
fi, err := os.Stat(pathJoin(cacheDir, cacheMetaJSONFile))
if err != nil || (fi.ModTime().Before(expiry) && len(cachedFiles) == 0) {
if err != nil || (fi.ModTime().Before(expiry) && len(cachedRngFiles) == 0) {
removeAll(cacheDir)
scorer.adjustSaveBytes(-fi.Size())
// Proceed to next file.
@@ -581,8 +641,11 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
if m.Meta == nil {
m.Meta = make(map[string]string)
}
if etag, ok := meta["etag"]; ok {
m.Meta["etag"] = etag
// save etag in m.Meta if missing
if _, ok := m.Meta["etag"]; !ok {
if etag, ok := meta["etag"]; ok {
m.Meta["etag"] = etag
}
}
}
m.Hits++
@@ -639,7 +702,7 @@ func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Rea
}
hashBytes := h.Sum(nil)
// compute md5Hash of original data stream if writeback commit to cache
if c.commitWriteback {
if c.commitWriteback || c.commitWritethrough {
if _, err = md5Hash.Write((*bufp)[:n]); err != nil {
return 0, "", err
}
@@ -655,8 +718,9 @@ func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Rea
break
}
}
md5sumCurr := md5Hash.Sum(nil)
return bytesWritten, base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)), nil
return bytesWritten, base64.StdEncoding.EncodeToString(md5sumCurr), nil
}
func newCacheEncryptReader(content io.Reader, bucket, object string, metadata map[string]string) (r io.Reader, err error) {
@@ -897,7 +961,7 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
return err
}
if _, err := io.Copy(writer, bytes.NewReader((*bufp)[blockOffset:blockOffset+blockLength])); err != nil {
if _, err = io.Copy(writer, bytes.NewReader((*bufp)[blockOffset:blockOffset+blockLength])); err != nil {
if err != io.ErrClosedPipe {
logger.LogIf(ctx, err)
return err
@@ -950,19 +1014,78 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
gr, gerr := NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts)
return gr, numHits, gerr
}
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts)
fn, startOffset, length, nErr := NewGetObjectReader(rs, objInfo, opts)
if nErr != nil {
return nil, numHits, nErr
}
filePath := pathJoin(cacheObjPath, cacheFile)
var totalBytesRead int64
pr, pw := xioutil.WaitPipe()
go func() {
err := c.bitrotReadFromCache(ctx, filePath, off, length, pw)
if err != nil {
removeAll(cacheObjPath)
if len(objInfo.Parts) > 0 {
// For negative length read everything.
if length < 0 {
length = objInfo.Size - startOffset
}
pw.CloseWithError(err)
}()
// Reply back invalid range if the input offset and length fall out of range.
if startOffset > objInfo.Size || startOffset+length > objInfo.Size {
logger.LogIf(ctx, InvalidRange{startOffset, length, objInfo.Size}, logger.Application)
return nil, numHits, InvalidRange{startOffset, length, objInfo.Size}
}
// Get start part index and offset.
partIndex, partOffset, err := cacheObjectToPartOffset(objInfo, startOffset)
if err != nil {
return nil, numHits, InvalidRange{startOffset, length, objInfo.Size}
}
// Calculate endOffset according to length
endOffset := startOffset
if length > 0 {
endOffset += length - 1
}
// Get last part index to read given length.
lastPartIndex, _, err := cacheObjectToPartOffset(objInfo, endOffset)
if err != nil {
return nil, numHits, InvalidRange{startOffset, length, objInfo.Size}
}
go func() {
for ; partIndex <= lastPartIndex; partIndex++ {
if length == totalBytesRead {
break
}
partNumber := objInfo.Parts[partIndex].Number
// Save the current part name and size.
partSize := objInfo.Parts[partIndex].Size
partLength := partSize - partOffset
// partLength should be adjusted so that we don't write more data than what was requested.
if partLength > (length - totalBytesRead) {
partLength = length - totalBytesRead
}
filePath := pathJoin(cacheObjPath, fmt.Sprintf("part.%d", partNumber))
err := c.bitrotReadFromCache(ctx, filePath, partOffset, partLength, pw)
if err != nil {
removeAll(cacheObjPath)
pw.CloseWithError(err)
break
}
totalBytesRead += partLength
// partOffset will be valid only for the first part, hence reset it to 0 for
// the remaining parts.
partOffset = 0
} // End of read all parts loop.
pr.CloseWithError(err)
}()
} else {
go func() {
filePath := pathJoin(cacheObjPath, cacheFile)
err := c.bitrotReadFromCache(ctx, filePath, startOffset, length, pw)
if err != nil {
removeAll(cacheObjPath)
}
pw.CloseWithError(err)
}()
}
// Cleanup function to cause the go routine above to exit, in
// case of incomplete read.
pipeCloser := func() { pr.CloseWithError(nil) }
@@ -980,7 +1103,6 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
gr.ObjInfo.Size = objSize
}
return gr, numHits, nil
}
// Deletes the cached object
@@ -1040,3 +1162,397 @@ func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
return
}
}
// NewMultipartUpload caches multipart uploads when writethrough is MINIO_CACHE_COMMIT mode
// multiparts are saved in .minio.sys/multipart/cachePath/uploadID dir until finalized. Then the individual parts
// are moved from the upload dir to cachePath/ directory.
func (c *diskCache) NewMultipartUpload(ctx context.Context, bucket, object, uID string, opts ObjectOptions) (uploadID string, err error) {
uploadID = uID
if uploadID == "" {
return "", InvalidUploadID{
Bucket: bucket,
Object: object,
UploadID: uploadID,
}
}
cachePath := getMultipartCacheSHADir(c.dir, bucket, object)
uploadIDDir := path.Join(cachePath, uploadID)
if err := os.MkdirAll(uploadIDDir, 0777); err != nil {
return uploadID, err
}
metaPath := pathJoin(uploadIDDir, cacheMetaJSONFile)
f, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return uploadID, err
}
defer f.Close()
m := &cacheMeta{
Version: cacheMetaVersion,
Bucket: bucket,
Object: object,
}
if err := jsonLoad(f, m); err != nil && err != io.EOF {
return uploadID, err
}
m.Meta = opts.UserDefined
m.Meta[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
if c.commitWriteback {
m.Meta[writeBackStatusHeader] = CommitPending.String()
}
m.Stat.ModTime = UTCNow()
if globalCacheKMS != nil {
if _, err := newCacheEncryptMetadata(bucket, object, m.Meta); err != nil {
return uploadID, err
}
}
err = jsonSave(f, m)
return uploadID, err
}
// PutObjectPart caches part to cache multipart path.
func (c *diskCache) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data io.Reader, size int64, opts ObjectOptions) (partInfo PartInfo, err error) {
oi := PartInfo{}
if !c.diskSpaceAvailable(size) {
io.Copy(ioutil.Discard, data)
return oi, errDiskFull
}
cachePath := getMultipartCacheSHADir(c.dir, bucket, object)
uploadIDDir := path.Join(cachePath, uploadID)
partIDLock := c.NewNSLockFn(pathJoin(uploadIDDir, strconv.Itoa(partID)))
lkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
ctx = lkctx.Context()
defer partIDLock.Unlock(lkctx.Cancel)
meta, _, _, err := c.statCache(ctx, uploadIDDir)
// Case where object not yet cached
if err != nil {
return oi, err
}
if !c.diskSpaceAvailable(size) {
return oi, errDiskFull
}
reader := data
var actualSize = uint64(size)
if globalCacheKMS != nil {
reader, err = newCachePartEncryptReader(ctx, bucket, object, partID, data, size, meta.Meta)
if err != nil {
return oi, err
}
actualSize, _ = sio.EncryptedSize(uint64(size))
}
n, md5sum, err := c.bitrotWriteToCache(uploadIDDir, fmt.Sprintf("part.%d", partID), reader, actualSize)
if IsErr(err, baseErrs...) {
// take the cache drive offline
c.setOffline()
}
if err != nil {
return oi, err
}
if actualSize != uint64(n) {
return oi, IncompleteBody{Bucket: bucket, Object: object}
}
var md5hex string
if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
md5hex = hex.EncodeToString(md5bytes)
}
pInfo := PartInfo{
PartNumber: partID,
ETag: md5hex,
Size: n,
ActualSize: int64(actualSize),
LastModified: UTCNow(),
}
return pInfo, nil
}
// SavePartMetadata saves part upload metadata to uploadID directory on disk cache
func (c *diskCache) SavePartMetadata(ctx context.Context, bucket, object, uploadID string, partID int, pinfo PartInfo) error {
cachePath := getMultipartCacheSHADir(c.dir, bucket, object)
uploadDir := path.Join(cachePath, uploadID)
// acquire a write lock at upload path to update cache.json
uploadLock := c.NewNSLockFn(uploadDir)
ulkctx, err := uploadLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
defer uploadLock.Unlock(ulkctx.Cancel)
metaPath := pathJoin(uploadDir, cacheMetaJSONFile)
f, err := os.OpenFile(metaPath, os.O_RDWR, 0666)
if err != nil {
return err
}
defer f.Close()
m := &cacheMeta{}
if err := jsonLoad(f, m); err != nil && err != io.EOF {
return err
}
var key []byte
var objectEncryptionKey crypto.ObjectKey
if globalCacheKMS != nil {
// Calculating object encryption key
key, err = decryptObjectInfo(key, bucket, object, m.Meta)
if err != nil {
return err
}
copy(objectEncryptionKey[:], key)
pinfo.ETag = hex.EncodeToString(objectEncryptionKey.SealETag([]byte(pinfo.ETag)))
}
pIdx := cacheObjPartIndex(m, partID)
if pIdx == -1 {
m.PartActualSizes = append(m.PartActualSizes, pinfo.ActualSize)
m.PartNumbers = append(m.PartNumbers, pinfo.PartNumber)
m.PartETags = append(m.PartETags, pinfo.ETag)
m.PartSizes = append(m.PartSizes, pinfo.Size)
} else {
m.PartActualSizes[pIdx] = pinfo.ActualSize
m.PartNumbers[pIdx] = pinfo.PartNumber
m.PartETags[pIdx] = pinfo.ETag
m.PartSizes[pIdx] = pinfo.Size
}
return jsonSave(f, m)
}
// newCachePartEncryptReader returns encrypted cache part reader, with part data encrypted with part encryption key
func newCachePartEncryptReader(ctx context.Context, bucket, object string, partID int, content io.Reader, size int64, metadata map[string]string) (r io.Reader, err error) {
var key []byte
var objectEncryptionKey, partEncryptionKey crypto.ObjectKey
// Calculating object encryption key
key, err = decryptObjectInfo(key, bucket, object, metadata)
if err != nil {
return nil, err
}
copy(objectEncryptionKey[:], key)
partEnckey := objectEncryptionKey.DerivePartKey(uint32(partID))
copy(partEncryptionKey[:], partEnckey[:])
wantSize := int64(-1)
if size >= 0 {
info := ObjectInfo{Size: size}
wantSize = info.EncryptedSize()
}
hReader, err := hash.NewReader(content, wantSize, "", "", size)
if err != nil {
return nil, err
}
pReader := NewPutObjReader(hReader)
content, err = pReader.WithEncryption(hReader, &partEncryptionKey)
if err != nil {
return nil, err
}
reader, err := sio.EncryptReader(content, sio.Config{Key: partEncryptionKey[:], MinVersion: sio.Version20, CipherSuites: fips.CipherSuitesDARE()})
if err != nil {
return nil, crypto.ErrInvalidCustomerKey
}
return reader, nil
}
// uploadIDExists returns error if uploadID is not being cached.
func (c *diskCache) uploadIDExists(bucket, object, uploadID string) (err error) {
mpartCachePath := getMultipartCacheSHADir(c.dir, bucket, object)
uploadIDDir := path.Join(mpartCachePath, uploadID)
if _, err := os.Stat(uploadIDDir); err != nil {
return err
}
return nil
}
// CompleteMultipartUpload completes multipart upload on cache. The parts and cache.json are moved from the temporary location in
// .minio.sys/multipart/cacheSHA/.. to cacheSHA path after part verification succeeds.
func (c *diskCache) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, roi ObjectInfo, opts ObjectOptions) (oi ObjectInfo, err error) {
cachePath := getCacheSHADir(c.dir, bucket, object)
cLock := c.NewNSLockFn(cachePath)
lkctx, err := cLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
ctx = lkctx.Context()
defer cLock.Unlock(lkctx.Cancel)
mpartCachePath := getMultipartCacheSHADir(c.dir, bucket, object)
uploadIDDir := path.Join(mpartCachePath, uploadID)
uploadMeta, _, _, uerr := c.statCache(ctx, uploadIDDir)
if uerr != nil {
return oi, errUploadIDNotFound
}
// Case where object not yet cached
// Calculate full object size.
var objectSize int64
// Calculate consolidated actual size.
var objectActualSize int64
var partETags []string
partETags, err = decryptCachePartETags(uploadMeta)
if err != nil {
return oi, err
}
for i, pi := range uploadedParts {
pIdx := cacheObjPartIndex(uploadMeta, pi.PartNumber)
if pIdx == -1 {
invp := InvalidPart{
PartNumber: pi.PartNumber,
GotETag: pi.ETag,
}
return oi, invp
}
pi.ETag = canonicalizeETag(pi.ETag)
if partETags[pIdx] != pi.ETag {
invp := InvalidPart{
PartNumber: pi.PartNumber,
ExpETag: partETags[pIdx],
GotETag: pi.ETag,
}
return oi, invp
}
// All parts except the last part has to be atleast 5MB.
if (i < len(uploadedParts)-1) && !isMinAllowedPartSize(uploadMeta.PartActualSizes[pIdx]) {
return oi, PartTooSmall{
PartNumber: pi.PartNumber,
PartSize: uploadMeta.PartActualSizes[pIdx],
PartETag: pi.ETag,
}
}
// Save for total object size.
objectSize += uploadMeta.PartSizes[pIdx]
// Save the consolidated actual size.
objectActualSize += uploadMeta.PartActualSizes[pIdx]
}
uploadMeta.Stat.Size = objectSize
uploadMeta.Stat.ModTime = roi.ModTime
// if encrypted - make sure ETag updated
uploadMeta.Meta["etag"] = roi.ETag
uploadMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
var cpartETags []string
var cpartNums []int
var cpartSizes, cpartActualSizes []int64
for _, pi := range uploadedParts {
pIdx := cacheObjPartIndex(uploadMeta, pi.PartNumber)
if pIdx != -1 {
cpartETags = append(cpartETags, uploadMeta.PartETags[pIdx])
cpartNums = append(cpartNums, uploadMeta.PartNumbers[pIdx])
cpartSizes = append(cpartSizes, uploadMeta.PartSizes[pIdx])
cpartActualSizes = append(cpartActualSizes, uploadMeta.PartActualSizes[pIdx])
}
}
uploadMeta.PartETags = cpartETags
uploadMeta.PartSizes = cpartSizes
uploadMeta.PartActualSizes = cpartActualSizes
uploadMeta.PartNumbers = cpartNums
uploadMeta.Hits++
metaPath := pathJoin(uploadIDDir, cacheMetaJSONFile)
f, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return oi, err
}
defer f.Close()
jsonSave(f, uploadMeta)
for _, pi := range uploadedParts {
part := fmt.Sprintf("part.%d", pi.PartNumber)
renameAll(pathJoin(uploadIDDir, part), pathJoin(cachePath, part))
}
renameAll(pathJoin(uploadIDDir, cacheMetaJSONFile), pathJoin(cachePath, cacheMetaJSONFile))
removeAll(uploadIDDir) // clean up any unused parts in the uploadIDDir
return uploadMeta.ToObjectInfo(bucket, object), nil
}
func (c *diskCache) AbortUpload(bucket, object, uploadID string) (err error) {
mpartCachePath := getMultipartCacheSHADir(c.dir, bucket, object)
uploadDir := path.Join(mpartCachePath, uploadID)
return removeAll(uploadDir)
}
// cacheObjPartIndex - returns the index of matching object part number.
func cacheObjPartIndex(m *cacheMeta, partNumber int) int {
for i, part := range m.PartNumbers {
if partNumber == part {
return i
}
}
return -1
}
// cacheObjectToPartOffset calculates part index and part offset for requested offset for content on cache.
func cacheObjectToPartOffset(objInfo ObjectInfo, offset int64) (partIndex int, partOffset int64, err error) {
if offset == 0 {
// Special case - if offset is 0, then partIndex and partOffset are always 0.
return 0, 0, nil
}
partOffset = offset
// Seek until object offset maps to a particular part offset.
for i, part := range objInfo.Parts {
partIndex = i
// Offset is smaller than size we have reached the proper part offset.
if partOffset < part.Size {
return partIndex, partOffset, nil
}
// Continue to towards the next part.
partOffset -= part.Size
}
// Offset beyond the size of the object return InvalidRange.
return 0, 0, InvalidRange{}
}
// get path of on-going multipart caching
func getMultipartCacheSHADir(dir, bucket, object string) string {
return pathJoin(dir, minioMetaBucket, cacheMultipartDir, getSHA256Hash([]byte(pathJoin(bucket, object))))
}
// clean up stale cache multipart uploads according to cleanup interval.
func (c *diskCache) cleanupStaleUploads(ctx context.Context) {
if !c.commitWritethrough {
return
}
timer := time.NewTimer(cacheStaleUploadCleanupInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
// Reset for the next interval
timer.Reset(cacheStaleUploadCleanupInterval)
now := time.Now()
readDirFn(pathJoin(c.dir, minioMetaBucket, cacheMultipartDir), func(shaDir string, typ os.FileMode) error {
return readDirFn(pathJoin(c.dir, minioMetaBucket, cacheMultipartDir, shaDir), func(uploadIDDir string, typ os.FileMode) error {
uploadIDPath := pathJoin(c.dir, minioMetaBucket, cacheMultipartDir, shaDir, uploadIDDir)
fi, err := os.Stat(uploadIDPath)
if err != nil {
return nil
}
if now.Sub(fi.ModTime()) > cacheStaleUploadExpiry {
removeAll(uploadIDPath)
}
return nil
})
})
}
}
}