mirror of
https://github.com/minio/minio.git
synced 2025-03-02 23:09:13 -05:00
Change CopyObject{Part} to single srcInfo argument (#5553)
Refactor such that metadata and etag are combined to a single argument `srcInfo`. This is a precursor change for #5544 making it easier for us to provide encryption/decryption functions.
This commit is contained in:
parent
a00e052606
commit
0ea54c9858
@ -239,7 +239,7 @@ func (fs *FSObjects) NewMultipartUpload(bucket, object string, meta map[string]s
|
||||
// object. Internally incoming data is written to '.minio.sys/tmp' location
|
||||
// and safely renamed to '.minio.sys/multipart' for reach parts.
|
||||
func (fs *FSObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int,
|
||||
startOffset int64, length int64, metadata map[string]string, srcEtag string) (pi PartInfo, e error) {
|
||||
startOffset int64, length int64, srcInfo ObjectInfo) (pi PartInfo, e error) {
|
||||
|
||||
if err := checkNewMultipartArgs(srcBucket, srcObject, fs); err != nil {
|
||||
return pi, toObjectErr(errors.Trace(err))
|
||||
@ -249,7 +249,7 @@ func (fs *FSObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject,
|
||||
pipeReader, pipeWriter := io.Pipe()
|
||||
|
||||
go func() {
|
||||
if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcEtag); gerr != nil {
|
||||
if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil {
|
||||
errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
|
||||
pipeWriter.CloseWithError(gerr)
|
||||
return
|
||||
|
17
cmd/fs-v1.go
17
cmd/fs-v1.go
@ -360,7 +360,7 @@ func (fs *FSObjects) DeleteBucket(bucket string) error {
|
||||
// CopyObject - copy object source object to destination object.
|
||||
// if source object and destination object are same we only
|
||||
// update metadata.
|
||||
func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string, srcEtag string) (oi ObjectInfo, e error) {
|
||||
func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo) (oi ObjectInfo, e error) {
|
||||
cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject
|
||||
// Hold write lock on destination since in both cases
|
||||
// - if source and destination are same
|
||||
@ -392,15 +392,6 @@ func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject strin
|
||||
if err != nil {
|
||||
return oi, toObjectErr(err, srcBucket, srcObject)
|
||||
}
|
||||
if srcEtag != "" {
|
||||
etag, perr := fs.getObjectETag(srcBucket, srcObject)
|
||||
if perr != nil {
|
||||
return oi, toObjectErr(perr, srcBucket, srcObject)
|
||||
}
|
||||
if etag != srcEtag {
|
||||
return oi, toObjectErr(errors.Trace(InvalidETag{}), srcBucket, srcObject)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if this request is only metadata update.
|
||||
cpMetadataOnly := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
|
||||
@ -416,7 +407,7 @@ func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject strin
|
||||
|
||||
// Save objects' metadata in `fs.json`.
|
||||
fsMeta := newFSMetaV1()
|
||||
fsMeta.Meta = metadata
|
||||
fsMeta.Meta = srcInfo.UserDefined
|
||||
if _, err = fsMeta.WriteTo(wlk); err != nil {
|
||||
return oi, toObjectErr(err, srcBucket, srcObject)
|
||||
}
|
||||
@ -433,7 +424,7 @@ func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject strin
|
||||
|
||||
go func() {
|
||||
var startOffset int64 // Read the whole file.
|
||||
if gerr := fs.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, ""); gerr != nil {
|
||||
if gerr := fs.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil {
|
||||
errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject)
|
||||
pipeWriter.CloseWithError(gerr)
|
||||
return
|
||||
@ -446,7 +437,7 @@ func (fs *FSObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject strin
|
||||
return oi, toObjectErr(err, dstBucket, dstObject)
|
||||
}
|
||||
|
||||
objInfo, err := fs.putObject(dstBucket, dstObject, hashReader, metadata)
|
||||
objInfo, err := fs.putObject(dstBucket, dstObject, hashReader, srcInfo.UserDefined)
|
||||
if err != nil {
|
||||
return oi, toObjectErr(err, dstBucket, dstObject)
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ func (a GatewayUnsupported) NewMultipartUpload(bucket string, object string, met
|
||||
}
|
||||
|
||||
// CopyObjectPart copy part of object to uploadID for another object
|
||||
func (a GatewayUnsupported) CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID string, partID int, startOffset, length int64, metadata map[string]string, srcETag string) (pi PartInfo, err error) {
|
||||
func (a GatewayUnsupported) CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID string, partID int, startOffset, length int64, srcInfo ObjectInfo) (pi PartInfo, err error) {
|
||||
return pi, errors.Trace(NotImplemented{})
|
||||
}
|
||||
|
||||
@ -110,7 +110,7 @@ func (a GatewayUnsupported) ListObjectsHeal(bucket, prefix, marker, delimiter st
|
||||
|
||||
// CopyObject copies a blob from source container to destination container.
|
||||
func (a GatewayUnsupported) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string,
|
||||
metadata map[string]string, srcEtag string) (objInfo ObjectInfo, err error) {
|
||||
srcInfo ObjectInfo) (objInfo ObjectInfo, err error) {
|
||||
return objInfo, errors.Trace(NotImplemented{})
|
||||
}
|
||||
|
||||
|
@ -625,10 +625,10 @@ func (a *azureObjects) PutObject(bucket, object string, data *hash.Reader, metad
|
||||
|
||||
// CopyObject - Copies a blob from source container to destination container.
|
||||
// Uses Azure equivalent CopyBlob API.
|
||||
func (a *azureObjects) CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) {
|
||||
func (a *azureObjects) CopyObject(srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
|
||||
srcBlobURL := a.client.GetContainerReference(srcBucket).GetBlobReference(srcObject).GetURL()
|
||||
destBlob := a.client.GetContainerReference(destBucket).GetBlobReference(destObject)
|
||||
azureMeta, props, err := s3MetaToAzureProperties(metadata)
|
||||
azureMeta, props, err := s3MetaToAzureProperties(srcInfo.UserDefined)
|
||||
if err != nil {
|
||||
return objInfo, azureToObjectError(err, srcBucket, srcObject)
|
||||
}
|
||||
|
@ -516,12 +516,6 @@ func (l *b2Objects) PutObject(bucket string, object string, data *h2.Reader, met
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CopyObject copies a blob from source container to destination container.
|
||||
func (l *b2Objects) CopyObject(srcBucket string, srcObject string, dstBucket string,
|
||||
dstObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) {
|
||||
return objInfo, errors.Trace(minio.NotImplemented{})
|
||||
}
|
||||
|
||||
// DeleteObject deletes a blob in bucket
|
||||
func (l *b2Objects) DeleteObject(bucket string, object string) error {
|
||||
bkt, err := l.Bucket(bucket)
|
||||
|
@ -801,13 +801,13 @@ func (l *gcsGateway) PutObject(bucket string, key string, data *hash.Reader, met
|
||||
|
||||
// CopyObject - Copies a blob from source container to destination container.
|
||||
func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string,
|
||||
metadata map[string]string, srcEtag string) (minio.ObjectInfo, error) {
|
||||
srcInfo minio.ObjectInfo) (minio.ObjectInfo, error) {
|
||||
|
||||
src := l.client.Bucket(srcBucket).Object(srcObject)
|
||||
dst := l.client.Bucket(destBucket).Object(destObject)
|
||||
|
||||
copier := dst.CopierFrom(src)
|
||||
copier.ObjectAttrs.Metadata = metadata
|
||||
copier.ObjectAttrs.Metadata = srcInfo.UserDefined
|
||||
|
||||
attrs, err := copier.Run(l.ctx)
|
||||
if err != nil {
|
||||
|
@ -563,7 +563,7 @@ func (t *tritonObjects) PutObject(bucket, object string, data *hash.Reader, meta
|
||||
// Uses Manta Snaplinks API.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#PutSnapLink
|
||||
func (t *tritonObjects) CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) {
|
||||
func (t *tritonObjects) CopyObject(srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
|
||||
ctx := context.Background()
|
||||
if err = t.client.SnapLinks().Put(ctx, &storage.PutSnapLinkInput{
|
||||
SourcePath: path.Join(mantaRoot, srcBucket, srcObject),
|
||||
|
@ -611,13 +611,13 @@ func (l *ossObjects) PutObject(bucket, object string, data *hash.Reader, metadat
|
||||
}
|
||||
|
||||
// CopyObject copies an object from source bucket to a destination bucket.
|
||||
func (l *ossObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) {
|
||||
func (l *ossObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
|
||||
bkt, err := l.Client.Bucket(srcBucket)
|
||||
if err != nil {
|
||||
return objInfo, ossToObjectError(errors.Trace(err), srcBucket, srcObject)
|
||||
}
|
||||
|
||||
opts := make([]oss.Option, 0, len(metadata)+1)
|
||||
opts := make([]oss.Option, 0, len(srcInfo.UserDefined)+1)
|
||||
// Set this header such that following CopyObject() always sets the right metadata on the destination.
|
||||
// metadata input is already a trickled down value from interpreting x-oss-metadata-directive at
|
||||
// handler layer. So what we have right now is supposed to be applied on the destination object anyways.
|
||||
@ -625,7 +625,7 @@ func (l *ossObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject strin
|
||||
opts = append(opts, oss.MetadataDirective(oss.MetaReplace))
|
||||
|
||||
// Build OSS metadata
|
||||
opts, err = appendS3MetaToOSSOptions(opts, metadata)
|
||||
opts, err = appendS3MetaToOSSOptions(opts, srcInfo.UserDefined)
|
||||
if err != nil {
|
||||
return objInfo, ossToObjectError(err, srcBucket, srcObject)
|
||||
}
|
||||
@ -797,7 +797,7 @@ func ossListObjectParts(client *oss.Client, bucket, object, uploadID string, par
|
||||
// CopyObjectPart creates a part in a multipart upload by copying
|
||||
// existing object or a part of it.
|
||||
func (l *ossObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID string,
|
||||
partID int, startOffset, length int64, metadata map[string]string, srcEtag string) (p minio.PartInfo, err error) {
|
||||
partID int, startOffset, length int64, srcInfo minio.ObjectInfo) (p minio.PartInfo, err error) {
|
||||
|
||||
bkt, err := l.Client.Bucket(destBucket)
|
||||
if err != nil {
|
||||
@ -805,7 +805,7 @@ func (l *ossObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObject
|
||||
}
|
||||
|
||||
// Build OSS metadata
|
||||
opts, err := appendS3MetaToOSSOptions(nil, metadata)
|
||||
opts, err := appendS3MetaToOSSOptions(nil, srcInfo.UserDefined)
|
||||
if err != nil {
|
||||
return p, ossToObjectError(err, srcBucket, srcObject)
|
||||
}
|
||||
|
@ -290,13 +290,14 @@ func (l *s3Objects) PutObject(bucket string, object string, data *hash.Reader, m
|
||||
}
|
||||
|
||||
// CopyObject copies an object from source bucket to a destination bucket.
|
||||
func (l *s3Objects) CopyObject(srcBucket string, srcObject string, dstBucket string, dstObject string, metadata map[string]string, srcEtag string) (objInfo minio.ObjectInfo, err error) {
|
||||
func (l *s3Objects) CopyObject(srcBucket string, srcObject string, dstBucket string, dstObject string, srcInfo minio.ObjectInfo) (objInfo minio.ObjectInfo, err error) {
|
||||
// Set this header such that following CopyObject() always sets the right metadata on the destination.
|
||||
// metadata input is already a trickled down value from interpreting x-amz-metadata-directive at
|
||||
// handler layer. So what we have right now is supposed to be applied on the destination object anyways.
|
||||
// So preserve it by adding "REPLACE" directive to save all the metadata set by CopyObject API.
|
||||
metadata["x-amz-metadata-directive"] = "REPLACE"
|
||||
if _, err = l.Client.CopyObject(srcBucket, srcObject, dstBucket, dstObject, metadata); err != nil {
|
||||
srcInfo.UserDefined["x-amz-metadata-directive"] = "REPLACE"
|
||||
srcInfo.UserDefined["x-amz-copy-source-if-match"] = srcInfo.ETag
|
||||
if _, err = l.Client.CopyObject(srcBucket, srcObject, dstBucket, dstObject, srcInfo.UserDefined); err != nil {
|
||||
return objInfo, minio.ErrorRespToObjectError(errors.Trace(err), srcBucket, srcObject)
|
||||
}
|
||||
return l.GetObjectInfo(dstBucket, dstObject)
|
||||
@ -346,10 +347,13 @@ func (l *s3Objects) PutObjectPart(bucket string, object string, uploadID string,
|
||||
// CopyObjectPart creates a part in a multipart upload by copying
|
||||
// existing object or a part of it.
|
||||
func (l *s3Objects) CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID string,
|
||||
partID int, startOffset, length int64, metadata map[string]string, srcEtag string) (p minio.PartInfo, err error) {
|
||||
partID int, startOffset, length int64, srcInfo minio.ObjectInfo) (p minio.PartInfo, err error) {
|
||||
|
||||
srcInfo.UserDefined = map[string]string{
|
||||
"x-amz-copy-source-if-match": srcInfo.ETag,
|
||||
}
|
||||
completePart, err := l.Client.CopyObjectPart(srcBucket, srcObject, destBucket, destObject,
|
||||
uploadID, partID, startOffset, length, metadata)
|
||||
uploadID, partID, startOffset, length, srcInfo.UserDefined)
|
||||
if err != nil {
|
||||
return p, minio.ErrorRespToObjectError(errors.Trace(err), srcBucket, srcObject)
|
||||
}
|
||||
|
@ -43,13 +43,14 @@ type ObjectLayer interface {
|
||||
GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error)
|
||||
GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error)
|
||||
PutObject(bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error)
|
||||
CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string, srcETag string) (objInfo ObjectInfo, err error)
|
||||
CopyObject(srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo) (objInfo ObjectInfo, err error)
|
||||
DeleteObject(bucket, object string) error
|
||||
|
||||
// Multipart operations.
|
||||
ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error)
|
||||
NewMultipartUpload(bucket, object string, metadata map[string]string) (uploadID string, err error)
|
||||
CopyObjectPart(srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, metadata map[string]string, srcEtag string) (info PartInfo, err error)
|
||||
CopyObjectPart(srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int,
|
||||
startOffset int64, length int64, srcInfo ObjectInfo) (info PartInfo, err error)
|
||||
PutObjectPart(bucket, object, uploadID string, partID int, data *hash.Reader) (info PartInfo, err error)
|
||||
ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error)
|
||||
AbortMultipartUpload(bucket, object, uploadID string) error
|
||||
|
@ -288,9 +288,6 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
// Extract metadata relevant for an CopyObject operation based on conditional
|
||||
// header values specified in X-Amz-Metadata-Directive.
|
||||
func getCpObjMetadataFromHeader(header http.Header, defaultMeta map[string]string) (map[string]string, error) {
|
||||
// Make sure to remove saved etag if any, CopyObject calculates a new one.
|
||||
delete(defaultMeta, "etag")
|
||||
|
||||
// if x-amz-metadata-directive says REPLACE then
|
||||
// we extract metadata from the input headers.
|
||||
if isMetadataReplace(header) {
|
||||
@ -357,30 +354,33 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
|
||||
cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject
|
||||
|
||||
objInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject)
|
||||
srcInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject)
|
||||
if err != nil {
|
||||
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Verify before x-amz-copy-source preconditions before continuing with CopyObject.
|
||||
if checkCopyObjectPreconditions(w, r, objInfo) {
|
||||
if checkCopyObjectPreconditions(w, r, srcInfo) {
|
||||
return
|
||||
}
|
||||
|
||||
/// maximum Upload size for object in a single CopyObject operation.
|
||||
if isMaxObjectSize(objInfo.Size) {
|
||||
if isMaxObjectSize(srcInfo.Size) {
|
||||
writeErrorResponse(w, ErrEntityTooLarge, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
newMetadata, err := getCpObjMetadataFromHeader(r.Header, objInfo.UserDefined)
|
||||
srcInfo.UserDefined, err = getCpObjMetadataFromHeader(r.Header, srcInfo.UserDefined)
|
||||
if err != nil {
|
||||
errorIf(err, "found invalid http request header")
|
||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Make sure to remove saved etag if any, CopyObject calculates a new one.
|
||||
delete(srcInfo.UserDefined, "etag")
|
||||
|
||||
// Check if x-amz-metadata-directive was not set to REPLACE and source,
|
||||
// desination are same objects.
|
||||
if !isMetadataReplace(r.Header) && cpSrcDstSame {
|
||||
@ -392,7 +392,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
|
||||
// Copy source object to destination, if source and destination
|
||||
// object is same then only metadata is updated.
|
||||
objInfo, err = objectAPI.CopyObject(srcBucket, srcObject, dstBucket, dstObject, newMetadata, objInfo.ETag)
|
||||
objInfo, err := objectAPI.CopyObject(srcBucket, srcObject, dstBucket, dstObject, srcInfo)
|
||||
if err != nil {
|
||||
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
|
||||
return
|
||||
@ -713,7 +713,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
return
|
||||
}
|
||||
|
||||
objInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject)
|
||||
srcInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject)
|
||||
if err != nil {
|
||||
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
|
||||
return
|
||||
@ -723,7 +723,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
var hrange *httpRange
|
||||
rangeHeader := r.Header.Get("x-amz-copy-source-range")
|
||||
if rangeHeader != "" {
|
||||
if hrange, err = parseCopyPartRange(rangeHeader, objInfo.Size); err != nil {
|
||||
if hrange, err = parseCopyPartRange(rangeHeader, srcInfo.Size); err != nil {
|
||||
// Handle only errInvalidRange
|
||||
// Ignore other parse error and treat it as regular Get request like Amazon S3.
|
||||
errorIf(err, "Unable to extract range %s", rangeHeader)
|
||||
@ -733,13 +733,13 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
}
|
||||
|
||||
// Verify before x-amz-copy-source preconditions before continuing with CopyObject.
|
||||
if checkCopyObjectPartPreconditions(w, r, objInfo) {
|
||||
if checkCopyObjectPartPreconditions(w, r, srcInfo) {
|
||||
return
|
||||
}
|
||||
|
||||
// Get the object.
|
||||
var startOffset int64
|
||||
length := objInfo.Size
|
||||
length := srcInfo.Size
|
||||
if hrange != nil {
|
||||
length = hrange.getLength()
|
||||
startOffset = hrange.offsetBegin
|
||||
@ -751,10 +751,13 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
return
|
||||
}
|
||||
|
||||
// Make sure to remove all metadata from source for for multipart operations.
|
||||
srcInfo.UserDefined = nil
|
||||
|
||||
// Copy source object to destination, if source and destination
|
||||
// object is same then only metadata is updated.
|
||||
partInfo, err := objectAPI.CopyObjectPart(srcBucket, srcObject, dstBucket,
|
||||
dstObject, uploadID, partID, startOffset, length, nil, objInfo.ETag)
|
||||
dstObject, uploadID, partID, startOffset, length, srcInfo)
|
||||
if err != nil {
|
||||
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
|
||||
return
|
||||
|
@ -546,30 +546,21 @@ func (s *xlSets) DeleteObject(bucket string, object string) (err error) {
|
||||
}
|
||||
|
||||
// CopyObject - copies objects from one hashedSet to another hashedSet, on server side.
|
||||
func (s *xlSets) CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string, srcEtag string) (objInfo ObjectInfo, err error) {
|
||||
if len(s.sets) == 1 {
|
||||
return s.sets[0].CopyObject(srcBucket, srcObject, destBucket, destObject, metadata, srcEtag)
|
||||
}
|
||||
|
||||
func (s *xlSets) CopyObject(srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo) (objInfo ObjectInfo, err error) {
|
||||
srcSet := s.getHashedSet(srcObject)
|
||||
destSet := s.getHashedSet(destObject)
|
||||
|
||||
objInfo, err = srcSet.GetObjectInfo(srcBucket, srcObject)
|
||||
if err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
// Check if this request is only metadata update.
|
||||
cpMetadataOnly := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(destBucket, destObject))
|
||||
if cpMetadataOnly {
|
||||
return srcSet.CopyObject(srcBucket, srcObject, destBucket, destObject, metadata, srcEtag)
|
||||
return srcSet.CopyObject(srcBucket, srcObject, destBucket, destObject, srcInfo)
|
||||
}
|
||||
|
||||
// Initialize pipe.
|
||||
pipeReader, pipeWriter := io.Pipe()
|
||||
|
||||
go func() {
|
||||
if gerr := srcSet.GetObject(srcBucket, srcObject, 0, objInfo.Size, pipeWriter, srcEtag); gerr != nil {
|
||||
if gerr := srcSet.GetObject(srcBucket, srcObject, 0, srcInfo.Size, pipeWriter, srcInfo.ETag); gerr != nil {
|
||||
errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject)
|
||||
pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject))
|
||||
return
|
||||
@ -577,13 +568,13 @@ func (s *xlSets) CopyObject(srcBucket, srcObject, destBucket, destObject string,
|
||||
pipeWriter.Close() // Close writer explicitly signalling we wrote all data.
|
||||
}()
|
||||
|
||||
hashReader, err := hash.NewReader(pipeReader, objInfo.Size, "", "")
|
||||
hashReader, err := hash.NewReader(pipeReader, srcInfo.Size, "", "")
|
||||
if err != nil {
|
||||
pipeReader.CloseWithError(err)
|
||||
return objInfo, toObjectErr(errors.Trace(err), destBucket, destObject)
|
||||
return srcInfo, toObjectErr(errors.Trace(err), destBucket, destObject)
|
||||
}
|
||||
|
||||
objInfo, err = destSet.PutObject(destBucket, destObject, hashReader, metadata)
|
||||
objInfo, err = destSet.PutObject(destBucket, destObject, hashReader, srcInfo.UserDefined)
|
||||
if err != nil {
|
||||
pipeReader.CloseWithError(err)
|
||||
return objInfo, err
|
||||
@ -778,11 +769,7 @@ func (s *xlSets) NewMultipartUpload(bucket, object string, metadata map[string]s
|
||||
|
||||
// Copies a part of an object from source hashedSet to destination hashedSet.
|
||||
func (s *xlSets) CopyObjectPart(srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int,
|
||||
startOffset int64, length int64, metadata map[string]string, srcEtag string) (partInfo PartInfo, err error) {
|
||||
if len(s.sets) == 1 {
|
||||
return s.sets[0].CopyObjectPart(srcBucket, srcObject, destBucket, destObject, uploadID, partID, startOffset,
|
||||
length, metadata, srcEtag)
|
||||
}
|
||||
startOffset int64, length int64, srcInfo ObjectInfo) (partInfo PartInfo, err error) {
|
||||
|
||||
srcSet := s.getHashedSet(srcObject)
|
||||
destSet := s.getHashedSet(destObject)
|
||||
@ -790,11 +777,12 @@ func (s *xlSets) CopyObjectPart(srcBucket, srcObject, destBucket, destObject str
|
||||
// Initialize pipe to stream from source.
|
||||
pipeReader, pipeWriter := io.Pipe()
|
||||
go func() {
|
||||
if gerr := srcSet.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcEtag); gerr != nil {
|
||||
if gerr := srcSet.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil {
|
||||
errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject)
|
||||
pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject))
|
||||
return
|
||||
}
|
||||
|
||||
// Close writer explicitly signalling we wrote all data.
|
||||
pipeWriter.Close()
|
||||
return
|
||||
|
@ -587,7 +587,7 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]st
|
||||
// data is read from an existing object.
|
||||
//
|
||||
// Implements S3 compatible Upload Part Copy API.
|
||||
func (xl xlObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, metadata map[string]string, srcEtag string) (pi PartInfo, e error) {
|
||||
func (xl xlObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo) (pi PartInfo, e error) {
|
||||
// Hold read locks on source object only if we are
|
||||
// going to read data from source object.
|
||||
objectSRLock := xl.nsMutex.NewNSLock(srcBucket, srcObject)
|
||||
@ -600,20 +600,11 @@ func (xl xlObjects) CopyObjectPart(srcBucket, srcObject, dstBucket, dstObject, u
|
||||
return pi, err
|
||||
}
|
||||
|
||||
if srcEtag != "" {
|
||||
objInfo, err := xl.getObjectInfo(srcBucket, srcObject)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, srcBucket, srcObject)
|
||||
}
|
||||
if objInfo.ETag != srcEtag {
|
||||
return pi, toObjectErr(errors.Trace(InvalidETag{}), srcBucket, srcObject)
|
||||
}
|
||||
}
|
||||
// Initialize pipe.
|
||||
pipeReader, pipeWriter := io.Pipe()
|
||||
|
||||
go func() {
|
||||
if gerr := xl.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, ""); gerr != nil {
|
||||
if gerr := xl.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil {
|
||||
errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject)
|
||||
pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject))
|
||||
return
|
||||
|
@ -79,7 +79,7 @@ func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks [
|
||||
// CopyObject - copy object source object to destination object.
|
||||
// if source object and destination object are same we only
|
||||
// update metadata.
|
||||
func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string, srcEtag string) (oi ObjectInfo, e error) {
|
||||
func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo) (oi ObjectInfo, e error) {
|
||||
cpSrcDstSame := srcBucket == dstBucket && srcObject == dstObject
|
||||
// Hold write lock on destination since in both cases
|
||||
// - if source and destination are same
|
||||
@ -103,16 +103,6 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
|
||||
defer objectSRLock.RUnlock()
|
||||
}
|
||||
|
||||
if srcEtag != "" {
|
||||
objInfo, perr := xl.getObjectInfo(srcBucket, srcObject)
|
||||
if perr != nil {
|
||||
return oi, toObjectErr(perr, srcBucket, srcObject)
|
||||
}
|
||||
if objInfo.ETag != srcEtag {
|
||||
return oi, toObjectErr(errors.Trace(InvalidETag{}), srcBucket, srcObject)
|
||||
}
|
||||
}
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllXLMetadata(xl.getDisks(), srcBucket, srcObject)
|
||||
|
||||
@ -144,7 +134,7 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
|
||||
// Check if this request is only metadata update.
|
||||
cpMetadataOnly := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
|
||||
if cpMetadataOnly {
|
||||
xlMeta.Meta = metadata
|
||||
xlMeta.Meta = srcInfo.UserDefined
|
||||
partsMetadata := make([]xlMetaV1, len(xl.getDisks()))
|
||||
// Update `xl.json` content on each disks.
|
||||
for index := range partsMetadata {
|
||||
@ -169,7 +159,7 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
|
||||
|
||||
go func() {
|
||||
var startOffset int64 // Read the whole file.
|
||||
if gerr := xl.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, ""); gerr != nil {
|
||||
if gerr := xl.getObject(srcBucket, srcObject, startOffset, length, pipeWriter, srcInfo.ETag); gerr != nil {
|
||||
errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject)
|
||||
pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject))
|
||||
return
|
||||
@ -182,7 +172,7 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
|
||||
return oi, toObjectErr(errors.Trace(err), dstBucket, dstObject)
|
||||
}
|
||||
|
||||
objInfo, err := xl.putObject(dstBucket, dstObject, hashReader, metadata)
|
||||
objInfo, err := xl.putObject(dstBucket, dstObject, hashReader, srcInfo.UserDefined)
|
||||
if err != nil {
|
||||
return oi, toObjectErr(err, dstBucket, dstObject)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user