mirror of
https://github.com/minio/minio.git
synced 2025-11-29 05:19:03 -05:00
Add GetObjectNInfo to object layer (#6449)
The new call combines GetObjectInfo and GetObject, and returns an object with a ReadCloser interface. Also adds a number of end-to-end encryption tests at the handler level.
This commit is contained in:
committed by
Harshavardhana
parent
7d0645fb3a
commit
36e51d0cee
@@ -31,6 +31,7 @@ import (
|
||||
|
||||
"github.com/djherbis/atime"
|
||||
|
||||
"github.com/minio/minio/cmd/crypto"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
"github.com/minio/minio/pkg/wildcard"
|
||||
@@ -57,6 +58,7 @@ type cacheObjects struct {
|
||||
// file path patterns to exclude from cache
|
||||
exclude []string
|
||||
// Object functions pointing to the corresponding functions of backend implementation.
|
||||
GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error)
|
||||
GetObjectFn func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
|
||||
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
PutObjectFn func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
@@ -88,6 +90,7 @@ type CacheObjectLayer interface {
|
||||
ListBuckets(ctx context.Context) (buckets []BucketInfo, err error)
|
||||
DeleteBucket(ctx context.Context, bucket string) error
|
||||
// Object operations.
|
||||
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error)
|
||||
GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
|
||||
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
@@ -103,6 +106,11 @@ type CacheObjectLayer interface {
|
||||
StorageInfo(ctx context.Context) CacheStorageInfo
|
||||
}
|
||||
|
||||
// IsCacheable returns if the object should be saved in the cache.
|
||||
func (o ObjectInfo) IsCacheable() bool {
|
||||
return !crypto.IsEncrypted(o.UserDefined)
|
||||
}
|
||||
|
||||
// backendDownError returns true if err is due to backend failure or faulty disk if in server mode
|
||||
func backendDownError(err error) bool {
|
||||
_, backendDown := err.(BackendDown)
|
||||
@@ -175,6 +183,86 @@ func (c cacheObjects) getMetadata(objInfo ObjectInfo) map[string]string {
|
||||
return metadata
|
||||
}
|
||||
|
||||
func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header) (gr *GetObjectReader, err error) {
|
||||
|
||||
bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h)
|
||||
|
||||
if c.isCacheExclude(bucket, object) || !bkReader.ObjInfo.IsCacheable() {
|
||||
return bkReader, bkErr
|
||||
}
|
||||
|
||||
// fetch cacheFSObjects if object is currently cached or nearest available cache drive
|
||||
dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object)
|
||||
if err != nil {
|
||||
return bkReader, bkErr
|
||||
}
|
||||
|
||||
backendDown := backendDownError(bkErr)
|
||||
if bkErr != nil && !backendDown {
|
||||
if _, ok := err.(ObjectNotFound); ok {
|
||||
// Delete the cached entry if backend object was deleted.
|
||||
dcache.Delete(ctx, bucket, object)
|
||||
}
|
||||
return nil, bkErr
|
||||
}
|
||||
|
||||
if !backendDown && filterFromCache(bkReader.ObjInfo.UserDefined) {
|
||||
return bkReader, bkErr
|
||||
}
|
||||
|
||||
if cacheReader, cacheErr := dcache.GetObjectNInfo(ctx, bucket, object, rs, h); cacheErr == nil {
|
||||
if backendDown {
|
||||
// If the backend is down, serve the request from cache.
|
||||
return cacheReader, nil
|
||||
}
|
||||
|
||||
if cacheReader.ObjInfo.ETag == bkReader.ObjInfo.ETag && !isStaleCache(bkReader.ObjInfo) {
|
||||
// Object is not stale, so serve from cache
|
||||
return cacheReader, nil
|
||||
}
|
||||
|
||||
// Object is stale, so delete from cache
|
||||
dcache.Delete(ctx, bucket, object)
|
||||
}
|
||||
|
||||
// Since we got here, we are serving the request from backend,
|
||||
// and also adding the object to the cache.
|
||||
|
||||
if rs != nil {
|
||||
// We don't cache partial objects.
|
||||
return bkReader, bkErr
|
||||
}
|
||||
if !dcache.diskAvailable(bkReader.ObjInfo.Size * cacheSizeMultiplier) {
|
||||
// cache only objects < 1/100th of disk capacity
|
||||
return bkReader, bkErr
|
||||
}
|
||||
|
||||
if bkErr != nil {
|
||||
return nil, bkErr
|
||||
}
|
||||
|
||||
// Initialize pipe.
|
||||
pipeReader, pipeWriter := io.Pipe()
|
||||
teeReader := io.TeeReader(bkReader, pipeWriter)
|
||||
hashReader, herr := hash.NewReader(pipeReader, bkReader.ObjInfo.Size, "", "")
|
||||
if herr != nil {
|
||||
bkReader.Close()
|
||||
return nil, herr
|
||||
}
|
||||
|
||||
go func() {
|
||||
opts := ObjectOptions{}
|
||||
putErr := dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(bkReader.ObjInfo), opts)
|
||||
// close the write end of the pipe, so the error gets
|
||||
// propagated to getObjReader
|
||||
pipeWriter.CloseWithError(putErr)
|
||||
}()
|
||||
|
||||
cleanupBackend := func() { bkReader.Close() }
|
||||
gr = NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, cleanupBackend)
|
||||
return gr, nil
|
||||
}
|
||||
|
||||
// Uses cached-object to serve the request. If object is not cached it serves the request from the backend and also
|
||||
// stores it in the cache for serving subsequent requests.
|
||||
func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
|
||||
|
||||
Reference in New Issue
Block a user