Merge pull request #1174 from harshavardhana/copy-object

api: Implement CopyObject s3 API, doing server side copy.
This commit is contained in:
Harshavardhana 2016-02-27 20:44:56 -08:00
commit 0b2e449727
11 changed files with 263 additions and 51 deletions

View File

@ -57,6 +57,9 @@ const (
InvalidMaxUploads InvalidMaxUploads
InvalidMaxParts InvalidMaxParts
InvalidPartNumberMarker InvalidPartNumberMarker
InvalidRequestBody
InvalidCopySource
InvalidCopyDest
MalformedXML MalformedXML
MissingContentLength MissingContentLength
MissingRequestBodyError MissingRequestBodyError
@ -80,6 +83,21 @@ const (
// APIError code to Error structure map // APIError code to Error structure map
var errorCodeResponse = map[int]APIError{ var errorCodeResponse = map[int]APIError{
InvalidCopyDest: {
Code: "InvalidRequest",
Description: "This copy request is illegal because it is trying to copy an object to itself.",
HTTPStatusCode: http.StatusBadRequest,
},
InvalidCopySource: {
Code: "InvalidArgument",
Description: "Copy Source must mention the source bucket and key: sourcebucket/sourcekey.",
HTTPStatusCode: http.StatusBadRequest,
},
InvalidRequestBody: {
Code: "InvalidArgument",
Description: "Body shouldn't be set for this request.",
HTTPStatusCode: http.StatusBadRequest,
},
InvalidMaxUploads: { InvalidMaxUploads: {
Code: "InvalidArgument", Code: "InvalidArgument",
Description: "Argument maxUploads must be an integer between 0 and 2147483647.", Description: "Argument maxUploads must be an integer between 0 and 2147483647.",

View File

@ -76,7 +76,7 @@ func setObjectHeaders(w http.ResponseWriter, metadata fs.ObjectMetadata, content
setCommonHeaders(w) setCommonHeaders(w)
} }
// set object headers // set object headers
lastModified := metadata.Created.Format(http.TimeFormat) lastModified := metadata.LastModified.Format(http.TimeFormat)
// object related headers // object related headers
w.Header().Set("Content-Type", metadata.ContentType) w.Header().Set("Content-Type", metadata.ContentType)
if metadata.MD5 != "" { if metadata.MD5 != "" {

View File

@ -19,6 +19,7 @@ package main
import ( import (
"encoding/xml" "encoding/xml"
"net/http" "net/http"
"time"
"github.com/minio/minio/pkg/fs" "github.com/minio/minio/pkg/fs"
) )
@ -181,6 +182,14 @@ type Object struct {
StorageClass string StorageClass string
} }
// CopyObjectResponse container returns ETag and LastModified of the
// successfully copied object
type CopyObjectResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CopyObjectResult" json:"-"`
ETag string
LastModified string // time string of format "2006-01-02T15:04:05.000Z"
}
// Initiator inherit from Owner struct, fields are same // Initiator inherit from Owner struct, fields are same
type Initiator Owner type Initiator Owner
@ -289,7 +298,7 @@ func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKe
continue continue
} }
content.Key = object.Object content.Key = object.Object
content.LastModified = object.Created.Format(timeFormatAMZ) content.LastModified = object.LastModified.Format(timeFormatAMZ)
if object.MD5 != "" { if object.MD5 != "" {
content.ETag = "\"" + object.MD5 + "\"" content.ETag = "\"" + object.MD5 + "\""
} }
@ -318,6 +327,14 @@ func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKe
return data return data
} }
// generateCopyObjectResponse
func generateCopyObjectResponse(etag string, lastModified time.Time) CopyObjectResponse {
return CopyObjectResponse{
ETag: "\"" + etag + "\"",
LastModified: lastModified.Format(timeFormatAMZ),
}
}
// generateInitiateMultipartUploadResponse // generateInitiateMultipartUploadResponse
func generateInitiateMultipartUploadResponse(bucket, key, uploadID string) InitiateMultipartUploadResponse { func generateInitiateMultipartUploadResponse(bucket, key, uploadID string) InitiateMultipartUploadResponse {
return InitiateMultipartUploadResponse{ return InitiateMultipartUploadResponse{

View File

@ -251,11 +251,6 @@ func (h resourceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeErrorResponse(w, r, NotImplemented, r.URL.Path) writeErrorResponse(w, r, NotImplemented, r.URL.Path)
return return
} }
// X-Amz-Copy-Source should be ignored as NotImplemented.
if _, ok := r.Header[http.CanonicalHeaderKey("x-amz-copy-source")]; ok {
writeErrorResponse(w, r, NotImplemented, r.URL.Path)
return
}
h.handler.ServeHTTP(w, r) h.handler.ServeHTTP(w, r)
} }

View File

@ -17,12 +17,15 @@
package main package main
import ( import (
"io"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
"strings"
"github.com/gorilla/mux" mux "github.com/gorilla/mux"
"github.com/minio/minio/pkg/fs" "github.com/minio/minio/pkg/fs"
"github.com/minio/minio/pkg/probe"
) )
const ( const (
@ -147,14 +150,141 @@ func (api storageAPI) HeadObjectHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
// CopyObjectHandler - Copy Object
// ----------
// This implementation of the PUT operation adds an object to a bucket
// while reading the object from another source.
func (api storageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
if isRequestRequiresACLCheck(r) {
writeErrorResponse(w, r, AccessDenied, r.URL.Path)
return
}
if !isSignV4ReqAuthenticated(api.Signature, r) {
writeErrorResponse(w, r, SignatureDoesNotMatch, r.URL.Path)
return
}
// TODO: Reject requests where body/payload is present, for now we
// don't even read it.
// objectSource
objectSource := r.Header.Get("X-Amz-Copy-Source")
// Skip the first element if it is '/', split the rest.
if strings.HasPrefix(objectSource, "/") {
objectSource = objectSource[1:]
}
splits := strings.SplitN(objectSource, "/", 2)
// Save sourceBucket and sourceObject extracted from url Path.
var sourceBucket, sourceObject string
if len(splits) == 2 {
sourceBucket = splits[0]
sourceObject = splits[1]
}
// If source object is empty, reply back error.
if sourceObject == "" {
writeErrorResponse(w, r, InvalidCopySource, r.URL.Path)
return
}
// Source and destination objects cannot be same, reply back error.
if sourceObject == object && sourceBucket == bucket {
writeErrorResponse(w, r, InvalidCopyDest, r.URL.Path)
return
}
metadata, err := api.Filesystem.GetObjectMetadata(sourceBucket, sourceObject)
if err != nil {
errorIf(err.Trace(), "GetObjectMetadata failed.", nil)
switch err.ToGoError().(type) {
case fs.BucketNameInvalid:
writeErrorResponse(w, r, InvalidBucketName, objectSource)
case fs.BucketNotFound:
writeErrorResponse(w, r, NoSuchBucket, objectSource)
case fs.ObjectNotFound:
writeErrorResponse(w, r, NoSuchKey, objectSource)
case fs.ObjectNameInvalid:
writeErrorResponse(w, r, NoSuchKey, objectSource)
default:
writeErrorResponse(w, r, InternalError, objectSource)
}
return
}
/// maximum Upload size for object in a single CopyObject operation.
if isMaxObjectSize(metadata.Size) {
writeErrorResponse(w, r, EntityTooLarge, objectSource)
return
}
// Initialize a pipe for data pipe line.
reader, writer := io.Pipe()
// Start writing in a routine.
go func() {
defer writer.Close()
if _, getErr := api.Filesystem.GetObject(writer, sourceBucket, sourceObject, 0, 0); getErr != nil {
writer.CloseWithError(probe.WrapError(getErr))
return
}
}()
// Verify md5sum.
expectedMD5Sum := metadata.MD5
// Size of object.
size := metadata.Size
// Create the object.
metadata, err = api.Filesystem.CreateObject(bucket, object, expectedMD5Sum, size, reader, nil)
if err != nil {
errorIf(err.Trace(), "CreateObject failed.", nil)
switch err.ToGoError().(type) {
case fs.RootPathFull:
writeErrorResponse(w, r, RootPathFull, r.URL.Path)
case fs.BucketNotFound:
writeErrorResponse(w, r, NoSuchBucket, r.URL.Path)
case fs.BucketNameInvalid:
writeErrorResponse(w, r, InvalidBucketName, r.URL.Path)
case fs.BadDigest:
writeErrorResponse(w, r, BadDigest, r.URL.Path)
case fs.IncompleteBody:
writeErrorResponse(w, r, IncompleteBody, r.URL.Path)
case fs.InvalidDigest:
writeErrorResponse(w, r, InvalidDigest, r.URL.Path)
case fs.ObjectExistsAsPrefix:
writeErrorResponse(w, r, ObjectExistsAsPrefix, r.URL.Path)
default:
writeErrorResponse(w, r, InternalError, r.URL.Path)
}
return
}
response := generateCopyObjectResponse(metadata.MD5, metadata.LastModified)
encodedSuccessResponse := encodeSuccessResponse(response)
// write headers
setCommonHeaders(w)
// write success response.
writeSuccessResponse(w, encodedSuccessResponse)
}
// PutObjectHandler - PUT Object // PutObjectHandler - PUT Object
// ---------- // ----------
// This implementation of the PUT operation adds an object to a bucket. // This implementation of the PUT operation adds an object to a bucket.
func (api storageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Request) { func (api storageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
var object, bucket string // If the matching failed, it means that the X-Amz-Copy-Source was
// wrong, fail right here.
if _, ok := r.Header["X-Amz-Copy-Source"]; ok {
writeErrorResponse(w, r, InvalidCopySource, r.URL.Path)
return
}
vars := mux.Vars(r) vars := mux.Vars(r)
bucket = vars["bucket"] bucket := vars["bucket"]
object = vars["object"] object := vars["object"]
if isRequestRequiresACLCheck(r) { if isRequestRequiresACLCheck(r) {
if api.Filesystem.IsPrivateBucket(bucket) || api.Filesystem.IsReadOnlyBucket(bucket) { if api.Filesystem.IsPrivateBucket(bucket) || api.Filesystem.IsReadOnlyBucket(bucket) {

View File

@ -103,10 +103,10 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe
objectName := strings.TrimPrefix(path, bucketPathPrefix) objectName := strings.TrimPrefix(path, bucketPathPrefix)
if strings.HasPrefix(objectName, prefix) { if strings.HasPrefix(objectName, prefix) {
object := ObjectMetadata{ object := ObjectMetadata{
Object: objectName, Object: objectName,
Created: info.ModTime(), LastModified: info.ModTime(),
Mode: info.Mode(), Mode: info.Mode(),
Size: info.Size(), Size: info.Size(),
} }
select { select {
// Send object on walker channel. // Send object on walker channel.

View File

@ -33,11 +33,11 @@ type ObjectMetadata struct {
Bucket string Bucket string
Object string Object string
ContentType string ContentType string
Created time.Time LastModified time.Time
Mode os.FileMode Mode os.FileMode
MD5 string MD5 string
Size int64 Size int64
} }
// PartMetadata - various types of individual part resources // PartMetadata - various types of individual part resources

View File

@ -546,12 +546,12 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
} }
} }
newObject := ObjectMetadata{ newObject := ObjectMetadata{
Bucket: bucket, Bucket: bucket,
Object: object, Object: object,
Created: st.ModTime(), LastModified: st.ModTime(),
Size: st.Size(), Size: st.Size(),
ContentType: contentType, ContentType: contentType,
MD5: hex.EncodeToString(md5Hasher.Sum(nil)), MD5: hex.EncodeToString(md5Hasher.Sum(nil)),
} }
return newObject, nil return newObject, nil
} }

View File

@ -165,12 +165,12 @@ func getMetadata(rootPath, bucket, object string) (ObjectMetadata, *probe.Error)
} }
} }
metadata := ObjectMetadata{ metadata := ObjectMetadata{
Bucket: bucket, Bucket: bucket,
Object: object, Object: object,
Created: stat.ModTime(), LastModified: stat.ModTime(),
Size: stat.Size(), Size: stat.Size(),
ContentType: contentType, ContentType: contentType,
Mode: stat.Mode(), Mode: stat.Mode(),
} }
return metadata, nil return metadata, nil
} }
@ -312,12 +312,12 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
} }
} }
newObject := ObjectMetadata{ newObject := ObjectMetadata{
Bucket: bucket, Bucket: bucket,
Object: object, Object: object,
Created: st.ModTime(), LastModified: st.ModTime(),
Size: st.Size(), Size: st.Size(),
ContentType: contentType, ContentType: contentType,
MD5: md5Sum, MD5: md5Sum,
} }
return newObject, nil return newObject, nil
} }

View File

@ -105,29 +105,53 @@ func registerAPIHandlers(mux *router.Router, a storageAPI, w *webAPI) {
// Bucket router // Bucket router
bucket := api.PathPrefix("/{bucket}").Subrouter() bucket := api.PathPrefix("/{bucket}").Subrouter()
// Object operations /// Object operations
// HeadObject
bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(a.HeadObjectHandler) bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(a.HeadObjectHandler)
// PutObjectPart
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(a.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}") bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(a.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// ListObjectPxarts
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(a.ListObjectPartsHandler).Queries("uploadId", "{uploadId:.*}") bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(a.ListObjectPartsHandler).Queries("uploadId", "{uploadId:.*}")
// CompleteMultipartUpload
bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(a.CompleteMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}") bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(a.CompleteMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}")
// NewMultipartUpload
bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(a.NewMultipartUploadHandler).Queries("uploads", "") bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(a.NewMultipartUploadHandler).Queries("uploads", "")
// AbortMultipartUpload
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(a.AbortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}") bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(a.AbortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}")
// GetObject
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(a.GetObjectHandler) bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(a.GetObjectHandler)
// CopyObject
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/).*?").HandlerFunc(a.CopyObjectHandler)
// PutObject
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(a.PutObjectHandler) bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(a.PutObjectHandler)
// DeleteObject
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(a.DeleteObjectHandler) bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(a.DeleteObjectHandler)
// Bucket operations /// Bucket operations
// GetBucketLocation
bucket.Methods("GET").HandlerFunc(a.GetBucketLocationHandler).Queries("location", "") bucket.Methods("GET").HandlerFunc(a.GetBucketLocationHandler).Queries("location", "")
// GetBucketACL
bucket.Methods("GET").HandlerFunc(a.GetBucketACLHandler).Queries("acl", "") bucket.Methods("GET").HandlerFunc(a.GetBucketACLHandler).Queries("acl", "")
// ListMultipartUploads
bucket.Methods("GET").HandlerFunc(a.ListMultipartUploadsHandler).Queries("uploads", "") bucket.Methods("GET").HandlerFunc(a.ListMultipartUploadsHandler).Queries("uploads", "")
// ListObjects
bucket.Methods("GET").HandlerFunc(a.ListObjectsHandler) bucket.Methods("GET").HandlerFunc(a.ListObjectsHandler)
// PutBucketACL
bucket.Methods("PUT").HandlerFunc(a.PutBucketACLHandler).Queries("acl", "") bucket.Methods("PUT").HandlerFunc(a.PutBucketACLHandler).Queries("acl", "")
// PutBucket
bucket.Methods("PUT").HandlerFunc(a.PutBucketHandler) bucket.Methods("PUT").HandlerFunc(a.PutBucketHandler)
// HeadBucket
bucket.Methods("HEAD").HandlerFunc(a.HeadBucketHandler) bucket.Methods("HEAD").HandlerFunc(a.HeadBucketHandler)
// PostPolicy
bucket.Methods("POST").HandlerFunc(a.PostPolicyBucketHandler) bucket.Methods("POST").HandlerFunc(a.PostPolicyBucketHandler)
// DeleteBucket
bucket.Methods("DELETE").HandlerFunc(a.DeleteBucketHandler) bucket.Methods("DELETE").HandlerFunc(a.DeleteBucketHandler)
// Root operation /// Root operation
// ListBuckets
api.Methods("GET").HandlerFunc(a.ListBucketsHandler) api.Methods("GET").HandlerFunc(a.ListBucketsHandler)
} }

View File

@ -508,15 +508,6 @@ func (s *MyAPIFSCacheSuite) TestNotImplemented(c *C) {
response, err := client.Do(request) response, err := client.Do(request)
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusNotImplemented) c.Assert(response.StatusCode, Equals, http.StatusNotImplemented)
request, err = s.newRequest("POST", testAPIFSCacheServer.URL+"/bucket/object", 0, nil)
request.Header.Set("X-Amz-Copy-Source", "/bucket/object-old")
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusNotImplemented)
} }
func (s *MyAPIFSCacheSuite) TestHeader(c *C) { func (s *MyAPIFSCacheSuite) TestHeader(c *C) {
@ -550,6 +541,44 @@ func (s *MyAPIFSCacheSuite) TestPutBucket(c *C) {
c.Assert(response.StatusCode, Equals, http.StatusOK) c.Assert(response.StatusCode, Equals, http.StatusOK)
} }
func (s *MyAPIFSCacheSuite) TestCopyObject(c *C) {
request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/put-object-copy", 0, nil)
c.Assert(err, IsNil)
request.Header.Add("x-amz-acl", "private")
client := http.Client{}
response, err := client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
buffer1 := bytes.NewReader([]byte("hello world"))
request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/put-object-copy/object", int64(buffer1.Len()), buffer1)
c.Assert(err, IsNil)
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/put-object-copy/object1", 0, nil)
request.Header.Set("X-Amz-Copy-Source", "/put-object-copy/object")
c.Assert(err, IsNil)
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
request, err = s.newRequest("GET", testAPIFSCacheServer.URL+"/put-object-copy/object1", 0, nil)
c.Assert(err, IsNil)
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
object, err := ioutil.ReadAll(response.Body)
c.Assert(err, IsNil)
c.Assert(string(object), Equals, "hello world")
}
func (s *MyAPIFSCacheSuite) TestPutObject(c *C) { func (s *MyAPIFSCacheSuite) TestPutObject(c *C) {
request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/put-object", 0, nil) request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/put-object", 0, nil)
c.Assert(err, IsNil) c.Assert(err, IsNil)
@ -759,7 +788,6 @@ func (s *MyAPIFSCacheSuite) TestPartialContent(c *C) {
// prepare request // prepare request
request, err = s.newRequest("GET", testAPIFSCacheServer.URL+"/partial-content/bar", 0, nil) request, err = s.newRequest("GET", testAPIFSCacheServer.URL+"/partial-content/bar", 0, nil)
c.Assert(err, IsNil) c.Assert(err, IsNil)
request.Header.Add("Accept", "application/json")
request.Header.Add("Range", "bytes=6-7") request.Header.Add("Range", "bytes=6-7")
client = http.Client{} client = http.Client{}