mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
cache: in writeback mode skip etag verification (#13781)
if the commit is still in pending or failed status This PR also does some minor code cleanup
This commit is contained in:
parent
4f3290309e
commit
d21466f595
@ -808,10 +808,6 @@ func (c *diskCache) GetLockContext(ctx context.Context, bucket, object string) (
|
|||||||
|
|
||||||
// Caches the object to disk
|
// Caches the object to disk
|
||||||
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
|
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly, writeback bool) (oi ObjectInfo, err error) {
|
||||||
if !c.diskSpaceAvailable(size) {
|
|
||||||
io.Copy(ioutil.Discard, data)
|
|
||||||
return oi, errDiskFull
|
|
||||||
}
|
|
||||||
cLock, lkctx, err := c.GetLockContext(ctx, bucket, object)
|
cLock, lkctx, err := c.GetLockContext(ctx, bucket, object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, err
|
return oi, err
|
||||||
|
@ -581,3 +581,14 @@ func (t *multiWriter) Write(p []byte) (n int, err error) {
|
|||||||
func cacheMultiWriter(w1 io.Writer, w2 *io.PipeWriter) io.Writer {
|
func cacheMultiWriter(w1 io.Writer, w2 *io.PipeWriter) io.Writer {
|
||||||
return &multiWriter{backendWriter: w1, cacheWriter: w2}
|
return &multiWriter{backendWriter: w1, cacheWriter: w2}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// skipETagVerification returns true if writeback commit is not complete
|
||||||
|
func skipETagVerification(m map[string]string) bool {
|
||||||
|
if v, ok := m[writeBackStatusHeader]; ok {
|
||||||
|
switch cacheCommitStatus(v) {
|
||||||
|
case CommitPending, CommitFailed:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
@ -276,6 +276,10 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
|||||||
bReader.ObjInfo.CacheStatus = CacheMiss
|
bReader.ObjInfo.CacheStatus = CacheMiss
|
||||||
return bReader, err
|
return bReader, err
|
||||||
}
|
}
|
||||||
|
// serve cached content without ETag verification if writeback commit is not yet complete
|
||||||
|
if skipETagVerification(cacheReader.ObjInfo.UserDefined) {
|
||||||
|
return cacheReader, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo, err := c.InnerGetObjectInfoFn(ctx, bucket, object, opts)
|
objInfo, err := c.InnerGetObjectInfoFn(ctx, bucket, object, opts)
|
||||||
@ -422,6 +426,10 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
|
|||||||
c.cacheStats.incHit()
|
c.cacheStats.incHit()
|
||||||
return cachedObjInfo, nil
|
return cachedObjInfo, nil
|
||||||
}
|
}
|
||||||
|
// serve cache metadata without ETag verification if writeback commit is not yet complete
|
||||||
|
if skipETagVerification(cachedObjInfo.UserDefined) {
|
||||||
|
return cachedObjInfo, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo, err := getObjectInfoFn(ctx, bucket, object, opts)
|
objInfo, err := getObjectInfoFn(ctx, bucket, object, opts)
|
||||||
@ -811,6 +819,8 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
|
|||||||
|
|
||||||
func (c *cacheObjects) queueWritebackRetry(oi ObjectInfo) {
|
func (c *cacheObjects) queueWritebackRetry(oi ObjectInfo) {
|
||||||
select {
|
select {
|
||||||
|
case <-GlobalContext.Done():
|
||||||
|
return
|
||||||
case c.wbRetryCh <- oi:
|
case c.wbRetryCh <- oi:
|
||||||
c.uploadObject(GlobalContext, oi)
|
c.uploadObject(GlobalContext, oi)
|
||||||
default:
|
default:
|
||||||
|
Loading…
Reference in New Issue
Block a user