Allow caching only in gateway mode. (#8232)

This PR changes the cache-on-PUT behavior to fill the cache in the background
after PutObject completes. This avoids concurrency issues such as the one in #8219.

Also adds cleanup of a partially filled cache entry to prevent cache corruption.
- Fixes #8208
This commit is contained in:
poornas
2019-09-16 14:24:04 -07:00
committed by kannappanr
parent 208efb843b
commit 76df027264
5 changed files with 42 additions and 112 deletions

View File

@@ -333,28 +333,6 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
return err
}
// updateETag sets the "etag" entry in the cached object's metadata file to
// the given backend ETag.
//
// The metadata file (cacheMetaJSONFile inside the object's cache SHA
// directory) is opened read-write, decoded into a cacheMeta, updated, and
// then rewritten in place. Any error from opening, decoding, encoding,
// truncating or writing the file is returned unchanged.
func (c *diskCache) updateETag(ctx context.Context, bucket, object string, etag string) error {
	metaPath := path.Join(getCacheSHADir(c.dir, bucket, object), cacheMetaJSONFile)
	f, err := os.OpenFile(metaPath, os.O_RDWR, 0)
	if err != nil {
		return err
	}
	defer f.Close()

	meta := &cacheMeta{Version: cacheMetaVersion}
	if err := jsonLoad(f, meta); err != nil {
		return err
	}
	// NOTE(review): assumes jsonLoad always leaves meta.Meta non-nil; an
	// assignment into a nil map would panic — confirm against cacheMeta.
	meta.Meta["etag"] = etag

	// Encode before touching the file so that a marshalling failure leaves
	// the existing metadata intact.
	jsonData, err := json.Marshal(meta)
	if err != nil {
		return err
	}

	// jsonLoad has presumably read from f and moved its offset, so a plain
	// Write would land after (or in the middle of) the old JSON document.
	// Drop the old content and write the new document from offset 0 so the
	// file holds exactly one JSON object.
	if err = f.Truncate(0); err != nil {
		return err
	}
	_, err = f.WriteAt(jsonData, 0)
	return err
}
// Backend metadata could have changed through server side copy - reset cache metadata if that is the case
func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo) error {
@@ -418,7 +396,7 @@ func (c *diskCache) bitrotWriteToCache(ctx context.Context, cachePath string, re
bufp := c.pool.Get().(*[]byte)
defer c.pool.Put(bufp)
var n int
var n, n2 int
for {
n, err = io.ReadFull(reader, *bufp)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
@@ -437,10 +415,10 @@ func (c *diskCache) bitrotWriteToCache(ctx context.Context, cachePath string, re
if _, err = f.Write(hashBytes); err != nil {
return 0, err
}
if _, err = f.Write((*bufp)[:n]); err != nil {
if n2, err = f.Write((*bufp)[:n]); err != nil {
return 0, err
}
bytesWritten += int64(n)
bytesWritten += int64(n2)
if eof {
break
}
@@ -521,9 +499,11 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
c.setOnline(false)
}
if err != nil {
removeAll(cachePath)
return err
}
if actualSize != uint64(n) {
removeAll(cachePath)
return IncompleteBody{}
}
return c.saveMetadata(ctx, bucket, object, metadata, n)
@@ -648,7 +628,11 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
filePath := path.Join(cacheObjPath, cacheDataFile)
pr, pw := io.Pipe()
go func() {
pw.CloseWithError(c.bitrotReadFromCache(ctx, filePath, off, length, pw))
err := c.bitrotReadFromCache(ctx, filePath, off, length, pw)
if err != nil {
removeAll(cacheObjPath)
}
pw.CloseWithError(err)
}()
// Cleanup function to cause the go routine above to exit, in
// case of incomplete read.

View File

@@ -14,7 +14,6 @@ import (
"github.com/djherbis/atime"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/wildcard"
)
@@ -489,7 +488,6 @@ func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) {
// PutObject - caches the uploaded object for single Put operations
func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
putObjectFn := c.PutObjectFn
data := r.rawReader
dcache, err := c.getCacheToLoc(ctx, bucket, object)
if err != nil {
// disk cache could not be located,execute backend call.
@@ -513,52 +511,24 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
dcache.Delete(ctx, bucket, object)
return putObjectFn(ctx, bucket, object, r, opts)
}
// Initialize pipe to stream data to backend
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, size, "", "", data.ActualSize(), globalCLIContext.StrictS3Compat)
if err != nil {
return ObjectInfo{}, err
}
// Initialize pipe to stream data to cache
rPipe, wPipe := io.Pipe()
oinfoCh := make(chan ObjectInfo)
errCh := make(chan error)
go func() {
oinfo, perr := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), opts)
if perr != nil {
pipeWriter.CloseWithError(perr)
wPipe.CloseWithError(perr)
close(oinfoCh)
errCh <- perr
return
}
close(errCh)
oinfoCh <- oinfo
}()
// get a namespace lock on cache until cache is filled.
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
if err := cLock.GetLock(globalObjectTimeout); err != nil {
return ObjectInfo{}, err
}
defer cLock.Unlock()
go func() {
if err = dcache.Put(ctx, bucket, object, rPipe, data.Size(), opts); err != nil {
wPipe.CloseWithError(err)
return
}
}()
objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
mwriter := io.MultiWriter(pipeWriter, wPipe)
_, err = io.Copy(mwriter, data)
if err != nil {
err = <-errCh
return objInfo, err
if err == nil {
go func() {
// fill cache in the background
bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
if bErr != nil {
return
}
defer bReader.Close()
oi, err := c.stat(ctx, dcache, bucket, object)
// avoid cache overwrite if another background routine filled cache
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
}
}()
}
pipeWriter.Close()
wPipe.Close()
objInfo = <-oinfoCh
dcache.updateETag(ctx, bucket, object, objInfo.ETag)
return objInfo, err

View File

@@ -120,14 +120,7 @@ EXAMPLES:
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}miniostorage
{{.Prompt}} {{.HelpName}} http://node{1...32}.example.com/mnt/export/{1...32}
6. Start minio server with edge caching enabled.
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_DRIVES{{.AssignmentOperator}}"/mnt/drive1;/mnt/drive2;/mnt/drive3;/mnt/drive4"
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_EXCLUDE{{.AssignmentOperator}}"bucket1/*;*.png"
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_EXPIRY{{.AssignmentOperator}}40
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_MAXUSE{{.AssignmentOperator}}80
{{.Prompt}} {{.HelpName}} /home/shared
7. Start minio server with KMS enabled.
6. Start minio server with KMS enabled.
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SSE_VAULT_APPROLE_ID{{.AssignmentOperator}}9b56cc08-8258-45d5-24a3-679876769126
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SSE_VAULT_APPROLE_SECRET{{.AssignmentOperator}}4e30c52f-13e4-a6f5-0763-d50e8cb4321f
{{.Prompt}} {{.EnvVarSetCommand}} MINIO_SSE_VAULT_ENDPOINT{{.AssignmentOperator}}https://vault-endpoint-ip:8200
@@ -339,13 +332,6 @@ func serverMain(ctx *cli.Context) {
// Load logger subsystem
loadLoggers()
var cacheConfig = globalServerConfig.GetCacheConfig()
if len(cacheConfig.Drives) > 0 {
// initialize the new disk cache objects.
globalCacheObjectAPI, err = newServerCacheObjects(context.Background(), cacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
}
// Create new IAM system.
globalIAMSys = NewIAMSys()
if err = globalIAMSys.Init(newObject); err != nil {