diff --git a/cmd/config/cache/config.go b/cmd/config/cache/config.go
index 125860da6..90c504486 100644
--- a/cmd/config/cache/config.go
+++ b/cmd/config/cache/config.go
@@ -28,16 +28,17 @@ import (
 
 // Config represents cache config settings
 type Config struct {
-	Enabled       bool     `json:"-"`
-	Drives        []string `json:"drives"`
-	Expiry        int      `json:"expiry"`
-	MaxUse        int      `json:"maxuse"`
-	Quota         int      `json:"quota"`
-	Exclude       []string `json:"exclude"`
-	After         int      `json:"after"`
-	WatermarkLow  int      `json:"watermark_low"`
-	WatermarkHigh int      `json:"watermark_high"`
-	Range         bool     `json:"range"`
+	Enabled         bool     `json:"-"`
+	Drives          []string `json:"drives"`
+	Expiry          int      `json:"expiry"`
+	MaxUse          int      `json:"maxuse"`
+	Quota           int      `json:"quota"`
+	Exclude         []string `json:"exclude"`
+	After           int      `json:"after"`
+	WatermarkLow    int      `json:"watermark_low"`
+	WatermarkHigh   int      `json:"watermark_high"`
+	Range           bool     `json:"range"`
+	CommitWriteback bool     `json:"-"`
 }
 
 // UnmarshalJSON - implements JSON unmarshal interface for unmarshalling
@@ -152,3 +153,13 @@ func parseCacheExcludes(excludes string) ([]string, error) {
 
 	return excludesSlice, nil
 }
+
+func parseCacheCommitMode(commitStr string) (bool, error) {
+	switch strings.ToLower(commitStr) {
+	case "writeback":
+		return true, nil
+	case "writethrough":
+		return false, nil
+	}
+	return false, config.ErrInvalidCacheCommitValue(nil).Msg("cache commit value must be `writeback` or `writethrough`")
+}
diff --git a/cmd/config/cache/help.go b/cmd/config/cache/help.go
index 0a346461c..100b157c6 100644
--- a/cmd/config/cache/help.go
+++ b/cmd/config/cache/help.go
@@ -74,5 +74,11 @@ var (
 			Optional:    true,
 			Type:        "string",
 		},
+		config.HelpKV{
+			Key:         Commit,
+			Description: `set to control cache commit behavior, defaults to "writethrough"`,
+			Optional:    true,
+			Type:        "string",
+		},
 	}
 )
diff --git a/cmd/config/cache/lookup.go b/cmd/config/cache/lookup.go
index d487c55b6..6e8ea6f2e 100644
--- a/cmd/config/cache/lookup.go
+++ b/cmd/config/cache/lookup.go
@@ -35,6 +35,7 @@ const (
 	WatermarkLow  = "watermark_low"
 	WatermarkHigh = "watermark_high"
 	Range         = "range"
+	Commit        = "commit"
 
 	EnvCacheDrives  = "MINIO_CACHE_DRIVES"
 	EnvCacheExclude = "MINIO_CACHE_EXCLUDE"
@@ -45,6 +46,7 @@ const (
 	EnvCacheWatermarkLow  = "MINIO_CACHE_WATERMARK_LOW"
 	EnvCacheWatermarkHigh = "MINIO_CACHE_WATERMARK_HIGH"
 	EnvCacheRange         = "MINIO_CACHE_RANGE"
+	EnvCacheCommit        = "MINIO_CACHE_COMMIT"
 
 	EnvCacheEncryptionMasterKey = "MINIO_CACHE_ENCRYPTION_MASTER_KEY"
 
@@ -53,6 +55,7 @@ const (
 	DefaultAfter         = "0"
 	DefaultWaterMarkLow  = "70"
 	DefaultWaterMarkHigh = "80"
+	DefaultCacheCommit   = "writethrough"
 )
 
 // DefaultKVS - default KV settings for caching.
@@ -90,6 +93,10 @@ var (
 			Key:   Range,
 			Value: config.EnableOn,
 		},
+		config.KV{
+			Key:   Commit,
+			Value: DefaultCacheCommit,
+		},
 	}
 )
 
@@ -210,6 +217,12 @@ func LookupConfig(kvs config.KVS) (Config, error) {
 		}
 		cfg.Range = rng
 	}
+	if commit := env.Get(EnvCacheCommit, kvs.Get(Commit)); commit != "" {
+		cfg.CommitWriteback, err = parseCacheCommitMode(commit)
+		if err != nil {
+			return cfg, err
+		}
+	}
 	return cfg, nil
 }
diff --git a/cmd/config/errors.go b/cmd/config/errors.go
index d9210a5a4..e6483cc4b 100644
--- a/cmd/config/errors.go
+++ b/cmd/config/errors.go
@@ -102,6 +102,12 @@ var (
 		"MINIO_CACHE_RANGE: Valid expected value is `on` or `off`",
 	)
 
+	ErrInvalidCacheCommitValue = newErrFn(
+		"Invalid cache commit value",
+		"Please check the passed value",
+		"MINIO_CACHE_COMMIT: Valid expected value is `writeback` or `writethrough`",
+	)
+
 	ErrInvalidRotatingCredentialsBackendEncrypted = newErrFn(
 		"Invalid rotating credentials",
 		"Please set correct rotating credentials in the environment for decryption",
diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go
index 856891227..a94a7cc71 100644
--- a/cmd/disk-cache-backend.go
+++ b/cmd/disk-cache-backend.go
@@ -19,7 +19,9 @@ package cmd
 import (
 	"bytes"
 	"context"
+	"crypto/md5"
 	"crypto/rand"
+	"encoding/base64"
 	"encoding/hex"
 	"errors"
 	"fmt"
@@ -70,7 +72,9 @@ type cacheMeta struct {
 	// Ranges maps cached range to associated filename.
 	Ranges map[string]string `json:"ranges,omitempty"`
 	// Hits is a counter on the number of times this object has been accessed so far.
-	Hits int `json:"hits,omitempty"`
+	Hits   int    `json:"hits,omitempty"`
+	Bucket string `json:"bucket,omitempty"`
+	Object string `json:"object,omitempty"`
 }
 
 // RangeInfo has the range, file and range length information for a cached range.
@@ -130,15 +134,17 @@ type diskCache struct {
 	online       uint32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
 	purgeRunning int32
 
-	triggerGC     chan struct{}
-	dir           string         // caching directory
-	stats         CacheDiskStats // disk cache stats for prometheus
-	quotaPct      int            // max usage in %
-	pool          sync.Pool
-	after         int // minimum accesses before an object is cached.
-	lowWatermark  int
-	highWatermark int
-	enableRange   bool
+	triggerGC        chan struct{}
+	dir              string         // caching directory
+	stats            CacheDiskStats // disk cache stats for prometheus
+	quotaPct         int            // max usage in %
+	pool             sync.Pool
+	after            int // minimum accesses before an object is cached.
+	lowWatermark     int
+	highWatermark    int
+	enableRange      bool
+	commitWriteback  bool
+	retryWritebackCh chan ObjectInfo
 	// nsMutex namespace lock
 	nsMutex *nsLockMap
 	// Object functions pointing to the corresponding functions of backend implementation.
@@ -156,15 +162,17 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
 		return nil, fmt.Errorf("Unable to initialize '%s' dir, %w", dir, err)
 	}
 	cache := diskCache{
-		dir:           dir,
-		triggerGC:     make(chan struct{}, 1),
-		stats:         CacheDiskStats{Dir: dir},
-		quotaPct:      quotaPct,
-		after:         config.After,
-		lowWatermark:  config.WatermarkLow,
-		highWatermark: config.WatermarkHigh,
-		enableRange:   config.Range,
-		online:        1,
+		dir:              dir,
+		triggerGC:        make(chan struct{}, 1),
+		stats:            CacheDiskStats{Dir: dir},
+		quotaPct:         quotaPct,
+		after:            config.After,
+		lowWatermark:     config.WatermarkLow,
+		highWatermark:    config.WatermarkHigh,
+		enableRange:      config.Range,
+		commitWriteback:  config.CommitWriteback,
+		retryWritebackCh: make(chan ObjectInfo, 10000),
+		online:           1,
 		pool: sync.Pool{
 			New: func() interface{} {
 				b := disk.AlignedBlock(int(cacheBlkSize))
@@ -174,6 +182,9 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
 		nsMutex: newNSLock(false),
 	}
 	go cache.purgeWait(ctx)
+	if cache.commitWriteback {
+		go cache.scanCacheWritebackFailures(ctx)
+	}
 	cache.diskSpaceAvailable(0) // update if cache usage is already high.
 	cache.NewNSLockFn = func(ctx context.Context, cachePath string) RWLocker {
 		return cache.nsMutex.NewNSLock(ctx, nil, cachePath, "")
@@ -323,6 +334,12 @@ func (c *diskCache) purge(ctx context.Context) {
 			// stat all cached file ranges and cacheDataFile.
 			cachedFiles := fiStatFn(meta.Ranges, cacheDataFile, pathJoin(c.dir, name))
 			objInfo := meta.ToObjectInfo("", "")
+			// prevent gc from clearing un-synced commits. This metadata is present when
+			// cache writeback commit setting is enabled.
+			status, ok := objInfo.UserDefined[writeBackStatusHeader]
+			if ok && status != CommitComplete.String() {
+				return nil
+			}
 			cc := cacheControlOpts(objInfo)
 			for fname, fi := range cachedFiles {
 				if cc != nil {
@@ -524,7 +541,11 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
 	}
 	defer f.Close()
 
-	m := &cacheMeta{Version: cacheMetaVersion}
+	m := &cacheMeta{
+		Version: cacheMetaVersion,
+		Bucket:  bucket,
+		Object:  object,
+	}
 	if err := jsonLoad(f, m); err != nil && err != io.EOF {
 		return err
 	}
@@ -561,7 +582,6 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
 			m.Meta["etag"] = etag
 		}
 	}
-	m.Hits++
 
 	m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
 
@@ -573,22 +593,22 @@ func getCacheSHADir(dir, bucket, object string) string {
 }
 
 // Cache data to disk with bitrot checksum added for each block of 1MB
-func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, error) {
+func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, string, error) {
 	if err := os.MkdirAll(cachePath, 0777); err != nil {
-		return 0, err
+		return 0, "", err
 	}
 	filePath := pathJoin(cachePath, fileName)
 
 	if filePath == "" || reader == nil {
-		return 0, errInvalidArgument
+		return 0, "", errInvalidArgument
 	}
 
 	if err := checkPathLength(filePath); err != nil {
-		return 0, err
+		return 0, "", err
 	}
 	f, err := os.Create(filePath)
 	if err != nil {
-		return 0, osErrToFileErr(err)
+		return 0, "", osErrToFileErr(err)
 	}
 	defer f.Close()
 
@@ -598,12 +618,12 @@ func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Rea
 
 	bufp := c.pool.Get().(*[]byte)
 	defer c.pool.Put(bufp)
-
+	md5Hash := md5.New()
 	var n, n2 int
 	for {
 		n, err = io.ReadFull(reader, *bufp)
 		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
-			return 0, err
+			return 0, "", err
 		}
 		eof := err == io.EOF || err == io.ErrUnexpectedEOF
 		if n == 0 && size != 0 {
@@ -612,21 +632,28 @@
 		}
 		h.Reset()
 		if _, err = h.Write((*bufp)[:n]); err != nil {
-			return 0, err
+			return 0, "", err
 		}
 		hashBytes := h.Sum(nil)
+		// compute md5Hash of original data stream if writeback commit to cache
+		if c.commitWriteback {
+			if _, err = md5Hash.Write((*bufp)[:n]); err != nil {
+				return 0, "", err
+			}
+		}
 		if _, err = f.Write(hashBytes); err != nil {
-			return 0, err
+			return 0, "", err
 		}
 		if n2, err = f.Write((*bufp)[:n]); err != nil {
-			return 0, err
+			return 0, "", err
 		}
 		bytesWritten += int64(n2)
 		if eof {
 			break
 		}
 	}
-	return bytesWritten, nil
+
+	return bytesWritten, base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)), nil
 }
 
 func newCacheEncryptReader(content io.Reader, bucket, object string, metadata map[string]string) (r io.Reader, err error) {
@@ -663,41 +690,41 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
 }
 
 // Caches the object to disk
-func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) error {
+func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) (oi ObjectInfo, err error) {
 	if !c.diskSpaceAvailable(size) {
 		io.Copy(ioutil.Discard, data)
-		return errDiskFull
+		return oi, errDiskFull
 	}
 	cachePath := getCacheSHADir(c.dir, bucket, object)
 	cLock := c.NewNSLockFn(ctx, cachePath)
 	if err := cLock.GetLock(globalOperationTimeout); err != nil {
-		return err
+		return oi, err
 	}
 	defer cLock.Unlock()
 
 	meta, _, numHits, err := c.statCache(ctx, cachePath)
 	// Case where object not yet cached
 	if os.IsNotExist(err) && c.after >= 1 {
-		return c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
+		return oi, c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
 	}
 	// Case where object already has a cache metadata entry but not yet cached
 	if err == nil && numHits < c.after {
 		cETag := extractETag(meta.Meta)
 		bETag := extractETag(opts.UserDefined)
 		if cETag == bETag {
-			return c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
+			return oi, c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
 		}
 		incHitsOnly = true
 	}
 
 	if rs != nil {
-		return c.putRange(ctx, bucket, object, data, size, rs, opts)
+		return oi, c.putRange(ctx, bucket, object, data, size, rs, opts)
 	}
 	if !c.diskSpaceAvailable(size) {
-		return errDiskFull
+		return oi, errDiskFull
 	}
 	if err := os.MkdirAll(cachePath, 0777); err != nil {
-		return err
+		return oi, err
 	}
 	var metadata = cloneMSS(opts.UserDefined)
 	var reader = data
@@ -705,25 +732,39 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
 	if globalCacheKMS != nil {
 		reader, err = newCacheEncryptReader(data, bucket, object, metadata)
 		if err != nil {
-			return err
+			return oi, err
 		}
 		actualSize, _ = sio.EncryptedSize(uint64(size))
 	}
-	n, err := c.bitrotWriteToCache(cachePath, cacheDataFile, reader, actualSize)
+	n, md5sum, err := c.bitrotWriteToCache(cachePath, cacheDataFile, reader, actualSize)
 	if IsErr(err, baseErrs...) {
 		// take the cache drive offline
 		c.setOffline()
 	}
 	if err != nil {
 		removeAll(cachePath)
-		return err
+		return oi, err
 	}
 
 	if actualSize != uint64(n) {
 		removeAll(cachePath)
-		return IncompleteBody{Bucket: bucket, Object: object}
+		return oi, IncompleteBody{Bucket: bucket, Object: object}
 	}
-	return c.saveMetadata(ctx, bucket, object, metadata, n, nil, "", incHitsOnly)
+	if c.commitWriteback {
+		metadata["content-md5"] = md5sum
+		if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
+			metadata["etag"] = hex.EncodeToString(md5bytes)
+		}
+		metadata[writeBackStatusHeader] = CommitPending.String()
+	}
+	return ObjectInfo{
+			Bucket:      bucket,
+			Name:        object,
+			ETag:        metadata["etag"],
+			Size:        n,
+			UserDefined: metadata,
+		},
+		c.saveMetadata(ctx, bucket, object, metadata, n, nil, "", incHitsOnly)
 }
 
 // Caches the range to disk
@@ -754,7 +795,7 @@ func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io
 	}
 	cacheFile := MustGetUUID()
-	n, err := c.bitrotWriteToCache(cachePath, cacheFile, reader, actualSize)
+	n, _, err := c.bitrotWriteToCache(cachePath, cacheFile, reader, actualSize)
 	if IsErr(err, baseErrs...) {
 		// take the cache drive offline
 		c.setOffline()
@@ -954,3 +995,36 @@ func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
 	}
 	return true
 }
+
+// queues writeback upload failures on server startup
+func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
+	defer close(c.retryWritebackCh)
+	filterFn := func(name string, typ os.FileMode) error {
+		if name == minioMetaBucket {
+			// Proceed to next file.
+			return nil
+		}
+		cacheDir := pathJoin(c.dir, name)
+		meta, _, _, err := c.statCachedMeta(ctx, cacheDir)
+		if err != nil {
+			return nil
+		}
+
+		objInfo := meta.ToObjectInfo("", "")
+		status, ok := objInfo.UserDefined[writeBackStatusHeader]
+		if !ok || status == CommitComplete.String() {
+			return nil
+		}
+		select {
+		case c.retryWritebackCh <- objInfo:
+		default:
+		}
+
+		return nil
+	}
+
+	if err := readDirFilterFn(c.dir, filterFn); err != nil {
+		logger.LogIf(ctx, err)
+		return
+	}
+}
diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go
index 3696d8838..97f8b18f7 100644
--- a/cmd/disk-cache.go
+++ b/cmd/disk-cache.go
@@ -22,24 +22,47 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/minio/minio/cmd/config/cache"
+	xhttp "github.com/minio/minio/cmd/http"
 	"github.com/minio/minio/cmd/logger"
 	objectlock "github.com/minio/minio/pkg/bucket/object/lock"
 	"github.com/minio/minio/pkg/color"
+	"github.com/minio/minio/pkg/hash"
 	"github.com/minio/minio/pkg/sync/errgroup"
 	"github.com/minio/minio/pkg/wildcard"
 )
 
 const (
-	cacheBlkSize    = int64(1 * 1024 * 1024)
-	cacheGCInterval = time.Minute * 30
+	cacheBlkSize          = 1 << 20
+	cacheGCInterval       = time.Minute * 30
+	writeBackStatusHeader = ReservedMetadataPrefixLower + "write-back-status"
+	writeBackRetryHeader  = ReservedMetadataPrefixLower + "write-back-retry"
 )
 
+type cacheCommitStatus string
+
+const (
+	// CommitPending - cache writeback with backend is pending.
+	CommitPending cacheCommitStatus = "pending"
+
+	// CommitComplete - cache writeback completed ok.
+	CommitComplete cacheCommitStatus = "complete"
+
+	// CommitFailed - cache writeback needs a retry.
+	CommitFailed cacheCommitStatus = "failed"
+)
+
+// String returns string representation of status
+func (s cacheCommitStatus) String() string {
+	return string(s)
+}
+
 // CacheStorageInfo - represents total, free capacity of
 // underlying cache storage.
 type CacheStorageInfo struct {
@@ -69,11 +92,14 @@ type cacheObjects struct {
 	exclude []string
 	// number of accesses after which to cache an object
 	after int
+	// commit objects in async manner
+	commitWriteback bool
 	// if true migration is in progress from v1 to v2
 	migrating bool
 	// mutex to protect migration bool
 	migMutex sync.Mutex
-
+	// retry queue for writeback cache mode to reattempt upload to backend
+	wbRetryCh chan ObjectInfo
 	// Cache stats
 	cacheStats *CacheStats
 
@@ -333,7 +359,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 		teeReader := io.TeeReader(bkReader, pipeWriter)
 		userDefined := getMetadata(bkReader.ObjInfo)
 		go func() {
-			putErr := dcache.Put(ctx, bucket, object,
+			_, putErr := dcache.Put(ctx, bucket, object,
 				io.LimitReader(pipeReader, bkReader.ObjInfo.Size),
 				bkReader.ObjInfo.Size, rs, ObjectOptions{
 					UserDefined: userDefined,
@@ -629,7 +655,14 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
 		dcache.Delete(ctx, bucket, object)
 		return putObjectFn(ctx, bucket, object, r, opts)
 	}
-
+	if c.commitWriteback {
+		oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false)
+		if err != nil {
+			return ObjectInfo{}, err
+		}
+		go c.uploadObject(GlobalContext, oi)
+		return oi, nil
+	}
 	objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
 
 	if err == nil {
@@ -647,9 +680,68 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
 			}
 		}()
 	}
-
 	return objInfo, err
+}
 
+// upload cached object to backend in async commit mode.
+func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
+	dcache, err := c.getCacheToLoc(ctx, oi.Bucket, oi.Name)
+	if err != nil {
+		// disk cache could not be located.
+		logger.LogIf(ctx, fmt.Errorf("Could not upload %s/%s to backend: %w", oi.Bucket, oi.Name, err))
+		return
+	}
+	cReader, _, bErr := dcache.Get(ctx, oi.Bucket, oi.Name, nil, http.Header{}, ObjectOptions{})
+	if bErr != nil {
+		return
+	}
+	defer cReader.Close()
+
+	if cReader.ObjInfo.ETag != oi.ETag {
+		return
+	}
+	st := cacheCommitStatus(oi.UserDefined[writeBackStatusHeader])
+	if st == CommitComplete || st.String() == "" {
+		return
+	}
+	hashReader, err := hash.NewReader(cReader, oi.Size, "", "", oi.Size, globalCLIContext.StrictS3Compat)
+	if err != nil {
+		return
+	}
+	var opts ObjectOptions
+	opts.UserDefined = make(map[string]string)
+	opts.UserDefined[xhttp.ContentMD5] = oi.UserDefined["content-md5"]
+	objInfo, err := c.InnerPutObjectFn(ctx, oi.Bucket, oi.Name, NewPutObjReader(hashReader, nil, nil), opts)
+	wbCommitStatus := CommitComplete
+	if err != nil {
+		wbCommitStatus = CommitFailed
+	}
+
+	meta := cloneMSS(cReader.ObjInfo.UserDefined)
+	retryCnt := 0
+	if wbCommitStatus == CommitFailed {
+		retryCnt, _ = strconv.Atoi(meta[writeBackRetryHeader])
+		retryCnt++
+		meta[writeBackRetryHeader] = strconv.Itoa(retryCnt)
+	} else {
+		delete(meta, writeBackRetryHeader)
+	}
+	meta[writeBackStatusHeader] = wbCommitStatus.String()
+	meta["etag"] = oi.ETag
+	dcache.SaveMetadata(ctx, oi.Bucket, oi.Name, meta, objInfo.Size, nil, "", false)
+	if retryCnt > 0 {
+		// slow down retries
+		time.Sleep(time.Second * time.Duration(retryCnt%10+1))
+		c.queueWritebackRetry(oi)
+	}
+}
+
+func (c *cacheObjects) queueWritebackRetry(oi ObjectInfo) {
+	select {
+	case c.wbRetryCh <- oi:
+		c.uploadObject(GlobalContext, oi)
+	default:
+	}
 }
 
 // Returns cacheObjects for use by Server.
@@ -660,12 +752,13 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
 		return nil, err
 	}
 	c := &cacheObjects{
-		cache:      cache,
-		exclude:    config.Exclude,
-		after:      config.After,
-		migrating:  migrateSw,
-		migMutex:   sync.Mutex{},
-		cacheStats: newCacheStats(),
+		cache:           cache,
+		exclude:         config.Exclude,
+		after:           config.After,
+		migrating:       migrateSw,
+		migMutex:        sync.Mutex{},
+		commitWriteback: config.CommitWriteback,
+		cacheStats:      newCacheStats(),
 		InnerGetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
 			return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
 		},
@@ -699,6 +792,15 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
 		go c.migrateCacheFromV1toV2(ctx)
 	}
 	go c.gc(ctx)
+	if c.commitWriteback {
+		c.wbRetryCh = make(chan ObjectInfo, 10000)
+		go func() {
+			<-GlobalContext.Done()
+			close(c.wbRetryCh)
+		}()
+		go c.queuePendingWriteback(ctx)
+	}
+
 	return c, nil
 }
 
@@ -724,3 +826,25 @@ func (c *cacheObjects) gc(ctx context.Context) {
 		}
 	}
 }
+
+// queues any pending or failed async commits when server restarts
+func (c *cacheObjects) queuePendingWriteback(ctx context.Context) {
+	for _, dcache := range c.cache {
+		if dcache != nil {
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case oi, ok := <-dcache.retryWritebackCh:
+					if !ok {
+						goto next
+					}
+					c.queueWritebackRetry(oi)
+				default:
+					time.Sleep(time.Second * 1)
+				}
+			}
+		next:
+		}
+	}
+}
diff --git a/cmd/format-disk-cache.go b/cmd/format-disk-cache.go
index e34e7e387..5b064bcf2 100644
--- a/cmd/format-disk-cache.go
+++ b/cmd/format-disk-cache.go
@@ -365,7 +365,7 @@ func migrateCacheData(ctx context.Context, c *diskCache, bucket, object, oldfile
 		}
 		actualSize, _ = sio.EncryptedSize(uint64(st.Size()))
 	}
-	_, err = c.bitrotWriteToCache(destDir, cacheDataFile, reader, actualSize)
+	_, _, err = c.bitrotWriteToCache(destDir, cacheDataFile, reader, actualSize)
 	return err
 }
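
Note (not part of the patch): the sketch below mirrors how the new `commit` setting resolves to the `CommitWriteback` flag added in cmd/config/cache/config.go. The package-internal `config.ErrInvalidCacheCommitValue` error is swapped for a plain `errors.New` so the snippet compiles and runs on its own.

    // Illustrative standalone sketch of the commit-mode parsing introduced above.
    package main

    import (
    	"errors"
    	"fmt"
    	"strings"
    )

    // parseCacheCommitMode maps the MINIO_CACHE_COMMIT / `commit` setting to a bool:
    // "writeback" -> true, "writethrough" -> false, anything else is an error.
    func parseCacheCommitMode(commitStr string) (bool, error) {
    	switch strings.ToLower(commitStr) {
    	case "writeback":
    		return true, nil
    	case "writethrough":
    		return false, nil
    	}
    	return false, errors.New("cache commit value must be `writeback` or `writethrough`")
    }

    func main() {
    	for _, v := range []string{"writethrough", "WRITEBACK", "invalid"} {
    		writeback, err := parseCacheCommitMode(v)
    		fmt.Printf("MINIO_CACHE_COMMIT=%q -> CommitWriteback=%v, err=%v\n", v, writeback, err)
    	}
    }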