Move metadata into ObjectOptions for NewMultipart and PutObject (#7060)

This commit is contained in:
poornas
2019-02-08 21:31:06 -08:00
committed by Nitish Tiwari
parent c1b3f1994b
commit 40b8d11209
44 changed files with 278 additions and 276 deletions

View File

@@ -60,13 +60,13 @@ type cacheObjects struct {
GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
GetObjectFn func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObjectFn func(ctx context.Context, bucket, object string) error
ListObjectsFn func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
ListObjectsV2Fn func(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
ListBucketsFn func(ctx context.Context) (buckets []BucketInfo, err error)
GetBucketInfoFn func(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error)
NewMultipartUploadFn func(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error)
NewMultipartUploadFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error)
PutObjectPartFn func(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error)
AbortMultipartUploadFn func(ctx context.Context, bucket, object, uploadID string) error
CompleteMultipartUploadFn func(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
@@ -92,11 +92,11 @@ type CacheObjectLayer interface {
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error)
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error)
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObject(ctx context.Context, bucket, object string) error
// Multipart operations.
NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error)
NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error)
PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error)
AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error
CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
@@ -246,8 +246,7 @@ func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
}
go func() {
opts := ObjectOptions{}
putErr := dcache.Put(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), c.getMetadata(bkReader.ObjInfo), opts)
putErr := dcache.Put(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), ObjectOptions{UserDefined: c.getMetadata(bkReader.ObjInfo)})
// close the write end of the pipe, so the error gets
// propagated to getObjReader
pipeWriter.CloseWithError(putErr)
@@ -320,7 +319,9 @@ func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, star
gerr := GetObjectFn(ctx, bucket, object, 0, objInfo.Size, io.MultiWriter(writer, pipeWriter), etag, opts)
pipeWriter.CloseWithError(gerr) // Close writer explicitly signaling we wrote all data.
}()
err = dcache.Put(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), c.getMetadata(objInfo), opts)
opts.UserDefined = c.getMetadata(objInfo)
err = dcache.Put(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), opts)
if err != nil {
return err
}
@@ -644,25 +645,25 @@ func (c cacheObjects) isCacheExclude(bucket, object string) bool {
}
// PutObject - caches the uploaded object for single Put operations
func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *PutObjReader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
putObjectFn := c.PutObjectFn
data := r.Reader
dcache, err := c.cache.getCacheFS(ctx, bucket, object)
if err != nil {
// disk cache could not be located,execute backend call.
return putObjectFn(ctx, bucket, object, r, metadata, opts)
return putObjectFn(ctx, bucket, object, r, opts)
}
size := r.Size()
// fetch from backend if there is no space on cache drive
if !dcache.diskAvailable(size) {
return putObjectFn(ctx, bucket, object, r, metadata, opts)
return putObjectFn(ctx, bucket, object, r, opts)
}
// fetch from backend if cache exclude pattern or cache-control
// directive set to exclude
if c.isCacheExclude(bucket, object) || filterFromCache(metadata) {
if c.isCacheExclude(bucket, object) || filterFromCache(opts.UserDefined) {
dcache.Delete(ctx, bucket, object)
return putObjectFn(ctx, bucket, object, r, metadata, opts)
return putObjectFn(ctx, bucket, object, r, opts)
}
objInfo = ObjectInfo{}
// Initialize pipe to stream data to backend
@@ -680,7 +681,7 @@ func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *P
oinfoCh := make(chan ObjectInfo)
errCh := make(chan error)
go func() {
oinfo, perr := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), metadata, opts)
oinfo, perr := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), opts)
if perr != nil {
pipeWriter.CloseWithError(perr)
wPipe.CloseWithError(perr)
@@ -693,7 +694,7 @@ func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *P
}()
go func() {
if err = dcache.Put(ctx, bucket, object, NewPutObjReader(cHashReader, nil, nil), metadata, opts); err != nil {
if err = dcache.Put(ctx, bucket, object, NewPutObjReader(cHashReader, nil, nil), opts); err != nil {
wPipe.CloseWithError(err)
return
}
@@ -712,25 +713,25 @@ func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *P
}
// NewMultipartUpload - Starts a new multipart upload operation to backend and cache.
func (c cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error) {
func (c cacheObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) {
newMultipartUploadFn := c.NewMultipartUploadFn
if c.isCacheExclude(bucket, object) || filterFromCache(metadata) {
return newMultipartUploadFn(ctx, bucket, object, metadata, opts)
if c.isCacheExclude(bucket, object) || filterFromCache(opts.UserDefined) {
return newMultipartUploadFn(ctx, bucket, object, opts)
}
dcache, err := c.cache.getCacheFS(ctx, bucket, object)
if err != nil {
// disk cache could not be located,execute backend call.
return newMultipartUploadFn(ctx, bucket, object, metadata, opts)
return newMultipartUploadFn(ctx, bucket, object, opts)
}
uploadID, err = newMultipartUploadFn(ctx, bucket, object, metadata, opts)
uploadID, err = newMultipartUploadFn(ctx, bucket, object, opts)
if err != nil {
return
}
// create new multipart upload in cache with same uploadID
dcache.NewMultipartUpload(ctx, bucket, object, metadata, uploadID, opts)
dcache.NewMultipartUpload(ctx, bucket, object, uploadID, opts)
return uploadID, err
}
@@ -971,8 +972,8 @@ func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) {
GetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
},
PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, metadata map[string]string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
return newObjectLayerFn().PutObject(ctx, bucket, object, data, metadata, opts)
PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
},
DeleteObjectFn: func(ctx context.Context, bucket, object string) error {
return newObjectLayerFn().DeleteObject(ctx, bucket, object)
@@ -989,8 +990,8 @@ func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) {
GetBucketInfoFn: func(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
return newObjectLayerFn().GetBucketInfo(ctx, bucket)
},
NewMultipartUploadFn: func(ctx context.Context, bucket, object string, metadata map[string]string, opts ObjectOptions) (uploadID string, err error) {
return newObjectLayerFn().NewMultipartUpload(ctx, bucket, object, metadata, opts)
NewMultipartUploadFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) {
return newObjectLayerFn().NewMultipartUpload(ctx, bucket, object, opts)
},
PutObjectPartFn: func(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) {
return newObjectLayerFn().PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)