api: Implement CopyObject s3 API, doing server side copy.

Fixes #1172
This commit is contained in:
Harshavardhana
2016-02-27 03:04:52 -08:00
parent 2520298734
commit 3ff8a1b719
11 changed files with 263 additions and 51 deletions

View File

@@ -17,12 +17,15 @@
package main
import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/gorilla/mux"
mux "github.com/gorilla/mux"
"github.com/minio/minio/pkg/fs"
"github.com/minio/minio/pkg/probe"
)
const (
@@ -147,14 +150,141 @@ func (api storageAPI) HeadObjectHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusOK)
}
// CopyObjectHandler - Copy Object
// ----------
// This implementation of the PUT operation adds an object to a bucket
// while reading the object from another source.
func (api storageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
if isRequestRequiresACLCheck(r) {
writeErrorResponse(w, r, AccessDenied, r.URL.Path)
return
}
if !isSignV4ReqAuthenticated(api.Signature, r) {
writeErrorResponse(w, r, SignatureDoesNotMatch, r.URL.Path)
return
}
// TODO: Reject requests where body/payload is present, for now we
// don't even read it.
// objectSource
objectSource := r.Header.Get("X-Amz-Copy-Source")
// Skip the first element if it is '/', split the rest.
if strings.HasPrefix(objectSource, "/") {
objectSource = objectSource[1:]
}
splits := strings.SplitN(objectSource, "/", 2)
// Save sourceBucket and sourceObject extracted from url Path.
var sourceBucket, sourceObject string
if len(splits) == 2 {
sourceBucket = splits[0]
sourceObject = splits[1]
}
// If source object is empty, reply back error.
if sourceObject == "" {
writeErrorResponse(w, r, InvalidCopySource, r.URL.Path)
return
}
// Source and destination objects cannot be same, reply back error.
if sourceObject == object && sourceBucket == bucket {
writeErrorResponse(w, r, InvalidCopyDest, r.URL.Path)
return
}
metadata, err := api.Filesystem.GetObjectMetadata(sourceBucket, sourceObject)
if err != nil {
errorIf(err.Trace(), "GetObjectMetadata failed.", nil)
switch err.ToGoError().(type) {
case fs.BucketNameInvalid:
writeErrorResponse(w, r, InvalidBucketName, objectSource)
case fs.BucketNotFound:
writeErrorResponse(w, r, NoSuchBucket, objectSource)
case fs.ObjectNotFound:
writeErrorResponse(w, r, NoSuchKey, objectSource)
case fs.ObjectNameInvalid:
writeErrorResponse(w, r, NoSuchKey, objectSource)
default:
writeErrorResponse(w, r, InternalError, objectSource)
}
return
}
/// maximum Upload size for object in a single CopyObject operation.
if isMaxObjectSize(metadata.Size) {
writeErrorResponse(w, r, EntityTooLarge, objectSource)
return
}
// Initialize a pipe for data pipe line.
reader, writer := io.Pipe()
// Start writing in a routine.
go func() {
defer writer.Close()
if _, getErr := api.Filesystem.GetObject(writer, sourceBucket, sourceObject, 0, 0); getErr != nil {
writer.CloseWithError(probe.WrapError(getErr))
return
}
}()
// Verify md5sum.
expectedMD5Sum := metadata.MD5
// Size of object.
size := metadata.Size
// Create the object.
metadata, err = api.Filesystem.CreateObject(bucket, object, expectedMD5Sum, size, reader, nil)
if err != nil {
errorIf(err.Trace(), "CreateObject failed.", nil)
switch err.ToGoError().(type) {
case fs.RootPathFull:
writeErrorResponse(w, r, RootPathFull, r.URL.Path)
case fs.BucketNotFound:
writeErrorResponse(w, r, NoSuchBucket, r.URL.Path)
case fs.BucketNameInvalid:
writeErrorResponse(w, r, InvalidBucketName, r.URL.Path)
case fs.BadDigest:
writeErrorResponse(w, r, BadDigest, r.URL.Path)
case fs.IncompleteBody:
writeErrorResponse(w, r, IncompleteBody, r.URL.Path)
case fs.InvalidDigest:
writeErrorResponse(w, r, InvalidDigest, r.URL.Path)
case fs.ObjectExistsAsPrefix:
writeErrorResponse(w, r, ObjectExistsAsPrefix, r.URL.Path)
default:
writeErrorResponse(w, r, InternalError, r.URL.Path)
}
return
}
response := generateCopyObjectResponse(metadata.MD5, metadata.LastModified)
encodedSuccessResponse := encodeSuccessResponse(response)
// write headers
setCommonHeaders(w)
// write success response.
writeSuccessResponse(w, encodedSuccessResponse)
}
// PutObjectHandler - PUT Object
// ----------
// This implementation of the PUT operation adds an object to a bucket.
func (api storageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
var object, bucket string
// If the matching failed, it means that the X-Amz-Copy-Source was
// wrong, fail right here.
if _, ok := r.Header["X-Amz-Copy-Source"]; ok {
writeErrorResponse(w, r, InvalidCopySource, r.URL.Path)
return
}
vars := mux.Vars(r)
bucket = vars["bucket"]
object = vars["object"]
bucket := vars["bucket"]
object := vars["object"]
if isRequestRequiresACLCheck(r) {
if api.Filesystem.IsPrivateBucket(bucket) || api.Filesystem.IsReadOnlyBucket(bucket) {