Allow CopyObjectPart to work in federated setups (#8066)

Fixes #8065
This commit is contained in:
Harshavardhana 2019-08-20 07:19:22 -10:00 committed by GitHub
parent c601cb2f1e
commit 069badc7e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -601,8 +601,21 @@ var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core,
return core, nil
}
// Check if the destination bucket is on a remote site, this code only gets executed
// when federation is enabled, ie when globalDNSConfig is non 'nil'.
//
// This function is similar to isRemoteCallRequired but specifically for COPY object API
// if destination and source are same we do not need to check for destnation bucket
// to exist locally.
func isRemoteCopyRequired(ctx context.Context, srcBucket, dstBucket string, objAPI ObjectLayer) bool {
if srcBucket == dstBucket {
return false
}
return isRemoteCallRequired(ctx, dstBucket, objAPI)
}
// Check if the bucket is on a remote site, this code only gets executed when federation is enabled.
var isRemoteCallRequired = func(ctx context.Context, bucket string, objAPI ObjectLayer) bool {
func isRemoteCallRequired(ctx context.Context, bucket string, objAPI ObjectLayer) bool {
if globalDNSConfig == nil {
return false
}
@ -787,23 +800,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
length = actualSize
}
// Check if the destination bucket is on a remote site, this code only gets executed
// when federation is enabled, ie when globalDNSConfig is non 'nil'.
//
// This function is similar to isRemoteCallRequired but specifically for COPY object API
// if destination and source are same we do not need to check for destnation bucket
// to exist locally.
var isRemoteCopyRequired = func(ctx context.Context, srcBucket, dstBucket string, objAPI ObjectLayer) bool {
if globalDNSConfig == nil {
return false
}
if srcBucket == dstBucket {
return false
}
_, err := objAPI.GetBucketInfo(ctx, dstBucket)
return err == toObjectErr(errVolumeNotFound, dstBucket)
}
var compressMetadata map[string]string
// No need to compress for remote etcd calls
// Pass the decompressed stream to such calls.
@ -1591,6 +1587,36 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
return
}
if isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) {
var dstRecords []dns.SrvRecord
dstRecords, err = globalDNSConfig.Get(dstBucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Send PutObject request to appropriate instance (in federated deployment)
client, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords))
if rerr != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL, guessIsBrowserReq(r))
return
}
partInfo, err := client.PutObjectPart(dstBucket, dstObject, uploadID, partID,
srcInfo.Reader, srcInfo.Size, "", "", dstOpts.ServerSideEncryption)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified)
encodedSuccessResponse := encodeResponse(response)
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)
return
}
actualPartSize = length
var reader io.Reader