Allow CopyObject() in S3 gateway to support metadata (#5000)

Fixes #4924
This commit is contained in:
Harshavardhana
2017-10-03 10:38:25 -07:00
committed by Dee Koder
parent 53f3d2fd65
commit 89d528a4ed
14 changed files with 194 additions and 6045 deletions

View File

@@ -55,11 +55,11 @@ func (l *s3Objects) AnonPutObject(bucket string, object string, size int64, data
// AnonGetObject - Get object anonymously
func (l *s3Objects) AnonGetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error {
r := minio.NewGetReqHeaders()
if err := r.SetRange(startOffset, startOffset+length-1); err != nil {
opts := minio.GetObjectOptions{}
if err := opts.SetRange(startOffset, startOffset+length-1); err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
object, _, err := l.anonClient.GetObject(bucket, key, r)
object, _, err := l.anonClient.GetObject(bucket, key, opts)
if err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
@@ -75,8 +75,7 @@ func (l *s3Objects) AnonGetObject(bucket string, key string, startOffset int64,
// AnonGetObjectInfo - Get object info anonymously
func (l *s3Objects) AnonGetObjectInfo(bucket string, object string) (objInfo ObjectInfo, e error) {
r := minio.NewHeadReqHeaders()
oi, err := l.anonClient.StatObject(bucket, object, r)
oi, err := l.anonClient.StatObject(bucket, object, minio.StatObjectOptions{})
if err != nil {
return objInfo, s3ToObjectError(traceError(err), bucket, object)
}

View File

@@ -19,7 +19,6 @@ package cmd
import (
"io"
"net/http"
"strings"
"encoding/hex"
@@ -290,22 +289,20 @@ func fromMinioClientListBucketResult(bucket string, result minio.ListBucketResul
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (l *s3Objects) GetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error {
r := minio.NewGetReqHeaders()
if length < 0 && length != -1 {
return s3ToObjectError(traceError(errInvalidArgument), bucket, key)
}
opts := minio.GetObjectOptions{}
if startOffset >= 0 && length >= 0 {
if err := r.SetRange(startOffset, startOffset+length-1); err != nil {
if err := opts.SetRange(startOffset, startOffset+length-1); err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
}
object, _, err := l.Client.GetObject(bucket, key, r)
object, _, err := l.Client.GetObject(bucket, key, opts)
if err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
defer object.Close()
if _, err := io.Copy(writer, object); err != nil {
@@ -333,8 +330,7 @@ func fromMinioClientObjectInfo(bucket string, oi minio.ObjectInfo) ObjectInfo {
// GetObjectInfo reads object info and replies back ObjectInfo
func (l *s3Objects) GetObjectInfo(bucket string, object string) (objInfo ObjectInfo, err error) {
r := minio.NewHeadReqHeaders()
oi, err := l.Client.StatObject(bucket, object, r)
oi, err := l.Client.StatObject(bucket, object, minio.StatObjectOptions{})
if err != nil {
return ObjectInfo{}, s3ToObjectError(traceError(err), bucket, object)
}
@@ -361,35 +357,17 @@ func (l *s3Objects) PutObject(bucket string, object string, data *HashReader, me
return fromMinioClientObjectInfo(bucket, oi), nil
}
// CopyObject copies a blob from source container to destination container.
// CopyObject copies an object from source bucket to a destination bucket.
func (l *s3Objects) CopyObject(srcBucket string, srcObject string, dstBucket string, dstObject string, metadata map[string]string) (objInfo ObjectInfo, err error) {
// Source object
src := minio.NewSourceInfo(srcBucket, srcObject, nil)
// Destination object
var xamzMeta = map[string]string{}
for key := range metadata {
for _, prefix := range userMetadataKeyPrefixes {
if strings.HasPrefix(key, prefix) {
xamzMeta[key] = metadata[key]
}
}
}
dst, err := minio.NewDestinationInfo(dstBucket, dstObject, nil, xamzMeta)
if err != nil {
return objInfo, s3ToObjectError(traceError(err), dstBucket, dstObject)
}
if err = l.Client.CopyObject(dst, src); err != nil {
// Set this header such that following CopyObject() always sets the right metadata on the destination.
// metadata input is already a trickled down value from interpreting x-amz-metadata-directive at
// handler layer. So what we have right now is supposed to be applied on the destination object anyways.
// So preserve it by adding "REPLACE" directive to save all the metadata set by CopyObject API.
metadata["x-amz-metadata-directive"] = "REPLACE"
if _, err = l.Client.CopyObject(srcBucket, srcObject, dstBucket, dstObject, metadata); err != nil {
return objInfo, s3ToObjectError(traceError(err), srcBucket, srcObject)
}
oi, err := l.GetObjectInfo(dstBucket, dstObject)
if err != nil {
return objInfo, s3ToObjectError(traceError(err), dstBucket, dstObject)
}
return oi, nil
return l.GetObjectInfo(dstBucket, dstObject)
}
// DeleteObject deletes a blob in bucket

View File

@@ -398,7 +398,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if err != nil {
errorIf(err, "found invalid http request header")
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
// Check if x-amz-metadata-directive was not set to REPLACE and source,
// desination are same objects.
if !isMetadataReplace(r.Header) && cpSrcDstSame {