Add ObjectOptions to ObjectLayer calls (#6382)

This commit is contained in:
poornas
2018-09-10 09:42:43 -07:00
committed by kannappanr
parent 30d4a2cf53
commit 5c0b98abf0
62 changed files with 812 additions and 7981 deletions

View File

@@ -57,16 +57,16 @@ type cacheObjects struct {
// file path patterns to exclude from cache
exclude []string
// Object functions pointing to the corresponding functions of backend implementation.
GetObjectFn func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error)
GetObjectInfoFn func(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error)
PutObjectFn func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error)
GetObjectFn func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObjectFn func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObjectFn func(ctx context.Context, bucket, object string) error
ListObjectsFn func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
ListObjectsV2Fn func(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
ListBucketsFn func(ctx context.Context) (buckets []BucketInfo, err error)
GetBucketInfoFn func(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error)
NewMultipartUploadFn func(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error)
PutObjectPartFn func(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (info PartInfo, err error)
NewMultipartUploadFn func(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error)
PutObjectPartFn func(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader, opts ObjectOptions) (info PartInfo, err error)
AbortMultipartUploadFn func(ctx context.Context, bucket, object, uploadID string) error
CompleteMultipartUploadFn func(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error)
DeleteBucketFn func(ctx context.Context, bucket string) error
@@ -88,14 +88,14 @@ type CacheObjectLayer interface {
ListBuckets(ctx context.Context) (buckets []BucketInfo, err error)
DeleteBucket(ctx context.Context, bucket string) error
// Object operations.
GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error)
GetObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error)
PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error)
GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObject(ctx context.Context, bucket, object string) error
// Multipart operations.
NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error)
PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (info PartInfo, err error)
NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error)
PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader, opts ObjectOptions) (info PartInfo, err error)
AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error
CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error)
@@ -177,20 +177,20 @@ func (c cacheObjects) getMetadata(objInfo ObjectInfo) map[string]string {
// Uses cached-object to serve the request. If object is not cached it serves the request from the backend and also
// stores it in the cache for serving subsequent requests.
func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) {
func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
GetObjectFn := c.GetObjectFn
GetObjectInfoFn := c.GetObjectInfoFn
if c.isCacheExclude(bucket, object) {
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag)
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
// fetch cacheFSObjects if object is currently cached or nearest available cache drive
dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object)
if err != nil {
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag)
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
// stat object on backend
objInfo, err := GetObjectInfoFn(ctx, bucket, object)
objInfo, err := GetObjectInfoFn(ctx, bucket, object, opts)
backendDown := backendDownError(err)
if err != nil && !backendDown {
if _, ok := err.(ObjectNotFound); ok {
@@ -201,27 +201,27 @@ func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, star
}
if !backendDown && filterFromCache(objInfo.UserDefined) {
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag)
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
cachedObjInfo, err := dcache.GetObjectInfo(ctx, bucket, object)
cachedObjInfo, err := dcache.GetObjectInfo(ctx, bucket, object, opts)
if err == nil {
if backendDown {
// If the backend is down, serve the request from cache.
return dcache.Get(ctx, bucket, object, startOffset, length, writer, etag)
return dcache.Get(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
if cachedObjInfo.ETag == objInfo.ETag && !isStaleCache(objInfo) {
return dcache.Get(ctx, bucket, object, startOffset, length, writer, etag)
return dcache.Get(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
dcache.Delete(ctx, bucket, object)
}
if startOffset != 0 || length != objInfo.Size {
// We don't cache partial objects.
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag)
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
if !dcache.diskAvailable(objInfo.Size * cacheSizeMultiplier) {
// cache only objects < 1/100th of disk capacity
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag)
return GetObjectFn(ctx, bucket, object, startOffset, length, writer, etag, opts)
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
@@ -230,13 +230,13 @@ func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, star
return err
}
go func() {
if err = GetObjectFn(ctx, bucket, object, 0, objInfo.Size, io.MultiWriter(writer, pipeWriter), etag); err != nil {
if err = GetObjectFn(ctx, bucket, object, 0, objInfo.Size, io.MultiWriter(writer, pipeWriter), etag, opts); err != nil {
pipeWriter.CloseWithError(err)
return
}
pipeWriter.Close() // Close writer explicitly signaling we wrote all data.
}()
err = dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(objInfo))
err = dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(objInfo), opts)
if err != nil {
return err
}
@@ -245,17 +245,17 @@ func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, star
}
// Returns ObjectInfo from cache if available.
func (c cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string) (ObjectInfo, error) {
func (c cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
getObjectInfoFn := c.GetObjectInfoFn
if c.isCacheExclude(bucket, object) {
return getObjectInfoFn(ctx, bucket, object)
return getObjectInfoFn(ctx, bucket, object, opts)
}
// fetch cacheFSObjects if object is currently cached or nearest available cache drive
dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object)
if err != nil {
return getObjectInfoFn(ctx, bucket, object)
return getObjectInfoFn(ctx, bucket, object, opts)
}
objInfo, err := getObjectInfoFn(ctx, bucket, object)
objInfo, err := getObjectInfoFn(ctx, bucket, object, opts)
if err != nil {
if _, ok := err.(ObjectNotFound); ok {
// Delete the cached entry if backend object was deleted.
@@ -266,14 +266,14 @@ func (c cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string)
return ObjectInfo{}, err
}
// when backend is down, serve from cache.
cachedObjInfo, cerr := dcache.GetObjectInfo(ctx, bucket, object)
cachedObjInfo, cerr := dcache.GetObjectInfo(ctx, bucket, object, opts)
if cerr == nil {
return cachedObjInfo, nil
}
return ObjectInfo{}, BackendDown{}
}
// when backend is up, do a sanity check on cached object
cachedObjInfo, err := dcache.GetObjectInfo(ctx, bucket, object)
cachedObjInfo, err := dcache.GetObjectInfo(ctx, bucket, object, opts)
if err != nil {
return objInfo, nil
}
@@ -560,24 +560,24 @@ func (c cacheObjects) isCacheExclude(bucket, object string) bool {
}
// PutObject - caches the uploaded object for single Put operations
func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) {
func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
putObjectFn := c.PutObjectFn
dcache, err := c.cache.getCacheFS(ctx, bucket, object)
if err != nil {
// disk cache could not be located,execute backend call.
return putObjectFn(ctx, bucket, object, r, metadata)
return putObjectFn(ctx, bucket, object, r, metadata, opts)
}
size := r.Size()
// fetch from backend if there is no space on cache drive
if !dcache.diskAvailable(size * cacheSizeMultiplier) {
return putObjectFn(ctx, bucket, object, r, metadata)
return putObjectFn(ctx, bucket, object, r, metadata, opts)
}
// fetch from backend if cache exclude pattern or cache-control
// directive set to exclude
if c.isCacheExclude(bucket, object) || filterFromCache(metadata) {
dcache.Delete(ctx, bucket, object)
return putObjectFn(ctx, bucket, object, r, metadata)
return putObjectFn(ctx, bucket, object, r, metadata, opts)
}
objInfo = ObjectInfo{}
// Initialize pipe to stream data to backend
@@ -595,7 +595,7 @@ func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *h
oinfoCh := make(chan ObjectInfo)
errCh := make(chan error)
go func() {
oinfo, perr := putObjectFn(ctx, bucket, object, hashReader, metadata)
oinfo, perr := putObjectFn(ctx, bucket, object, hashReader, metadata, opts)
if perr != nil {
pipeWriter.CloseWithError(perr)
wPipe.CloseWithError(perr)
@@ -608,7 +608,7 @@ func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *h
}()
go func() {
if err = dcache.Put(ctx, bucket, object, cHashReader, metadata); err != nil {
if err = dcache.Put(ctx, bucket, object, cHashReader, metadata, opts); err != nil {
wPipe.CloseWithError(err)
return
}
@@ -627,39 +627,39 @@ func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *h
}
// NewMultipartUpload - Starts a new multipart upload operation to backend and cache.
func (c cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
func (c cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error) {
newMultipartUploadFn := c.NewMultipartUploadFn
if c.isCacheExclude(bucket, object) || filterFromCache(metadata) {
return newMultipartUploadFn(ctx, bucket, object, metadata)
return newMultipartUploadFn(ctx, bucket, object, metadata, opts)
}
dcache, err := c.cache.getCacheFS(ctx, bucket, object)
if err != nil {
// disk cache could not be located,execute backend call.
return newMultipartUploadFn(ctx, bucket, object, metadata)
return newMultipartUploadFn(ctx, bucket, object, metadata, opts)
}
uploadID, err = newMultipartUploadFn(ctx, bucket, object, metadata)
uploadID, err = newMultipartUploadFn(ctx, bucket, object, metadata, opts)
if err != nil {
return
}
// create new multipart upload in cache with same uploadID
dcache.NewMultipartUpload(ctx, bucket, object, metadata, uploadID)
dcache.NewMultipartUpload(ctx, bucket, object, metadata, uploadID, opts)
return uploadID, err
}
// PutObjectPart - uploads part to backend and cache simultaneously.
func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (info PartInfo, err error) {
func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader, opts ObjectOptions) (info PartInfo, err error) {
putObjectPartFn := c.PutObjectPartFn
dcache, err := c.cache.getCacheFS(ctx, bucket, object)
if err != nil {
// disk cache could not be located,execute backend call.
return putObjectPartFn(ctx, bucket, object, uploadID, partID, data)
return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
}
if c.isCacheExclude(bucket, object) {
return putObjectPartFn(ctx, bucket, object, uploadID, partID, data)
return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
}
// make sure cache has at least cacheSizeMultiplier * size available
@@ -669,7 +669,7 @@ func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadI
case dcache.purgeChan <- struct{}{}:
default:
}
return putObjectPartFn(ctx, bucket, object, uploadID, partID, data)
return putObjectPartFn(ctx, bucket, object, uploadID, partID, data, opts)
}
info = PartInfo{}
@@ -688,7 +688,7 @@ func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadI
pinfoCh := make(chan PartInfo)
errorCh := make(chan error)
go func() {
info, err = putObjectPartFn(ctx, bucket, object, uploadID, partID, hashReader)
info, err = putObjectPartFn(ctx, bucket, object, uploadID, partID, hashReader, opts)
if err != nil {
close(pinfoCh)
pipeWriter.CloseWithError(err)
@@ -700,7 +700,7 @@ func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadI
pinfoCh <- info
}()
go func() {
if _, perr := dcache.PutObjectPart(ctx, bucket, object, uploadID, partID, cHashReader); perr != nil {
if _, perr := dcache.PutObjectPart(ctx, bucket, object, uploadID, partID, cHashReader, opts); perr != nil {
wPipe.CloseWithError(perr)
return
}
@@ -876,14 +876,14 @@ func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) {
cache: dcache,
exclude: config.Exclude,
listPool: newTreeWalkPool(globalLookupTimeout),
GetObjectFn: func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error {
return newObjectLayerFn().GetObject(ctx, bucket, object, startOffset, length, writer, etag)
GetObjectFn: func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
return newObjectLayerFn().GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts)
},
GetObjectInfoFn: func(ctx context.Context, bucket, object string) (ObjectInfo, error) {
return newObjectLayerFn().GetObjectInfo(ctx, bucket, object)
GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
},
PutObjectFn: func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) {
return newObjectLayerFn().PutObject(ctx, bucket, object, data, metadata)
PutObjectFn: func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
return newObjectLayerFn().PutObject(ctx, bucket, object, data, metadata, opts)
},
DeleteObjectFn: func(ctx context.Context, bucket, object string) error {
return newObjectLayerFn().DeleteObject(ctx, bucket, object)
@@ -900,11 +900,11 @@ func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) {
GetBucketInfoFn: func(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
return newObjectLayerFn().GetBucketInfo(ctx, bucket)
},
NewMultipartUploadFn: func(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
return newObjectLayerFn().NewMultipartUpload(ctx, bucket, object, metadata)
NewMultipartUploadFn: func(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error) {
return newObjectLayerFn().NewMultipartUpload(ctx, bucket, object, metadata, opts)
},
PutObjectPartFn: func(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (info PartInfo, err error) {
return newObjectLayerFn().PutObjectPart(ctx, bucket, object, uploadID, partID, data)
PutObjectPartFn: func(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader, opts ObjectOptions) (info PartInfo, err error) {
return newObjectLayerFn().PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
},
AbortMultipartUploadFn: func(ctx context.Context, bucket, object, uploadID string) error {
return newObjectLayerFn().AbortMultipartUpload(ctx, bucket, object, uploadID)