Implement gateway S3 support (#3940)

This commit is contained in:
Remco Verhoef
2017-04-27 20:26:00 +02:00
committed by Harshavardhana
parent 57c5c75611
commit 3a539ce660
61 changed files with 9607 additions and 199 deletions


@@ -114,6 +114,7 @@ const (
ErrInvalidQueryParams
ErrBucketAlreadyOwnedByYou
ErrInvalidDuration
ErrNotSupported
// Add new error codes here.
// Bucket notification related errors.
@@ -666,6 +667,8 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
apiErr = ErrInvalidBucketName
case BucketNotFound:
apiErr = ErrNoSuchBucket
case BucketAlreadyOwnedByYou:
apiErr = ErrBucketAlreadyOwnedByYou
case BucketNotEmpty:
apiErr = ErrBucketNotEmpty
case BucketExists:
@@ -698,8 +701,12 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
apiErr = ErrEntityTooLarge
case ObjectTooSmall:
apiErr = ErrEntityTooSmall
case NotSupported:
apiErr = ErrNotSupported
case NotImplemented:
apiErr = ErrNotImplemented
case PolicyNotFound:
apiErr = ErrNoSuchBucketPolicy
default:
apiErr = ErrInternalError
}


@@ -103,6 +103,10 @@ func TestAPIErrCode(t *testing.T) {
StorageFull{},
ErrStorageFull,
},
{
NotSupported{},
ErrNotSupported,
},
{
NotImplemented{},
ErrNotImplemented,


@@ -154,6 +154,12 @@ func (a AzureObjects) StorageInfo() StorageInfo {
// MakeBucket - Create a new container on azure backend.
func (a AzureObjects) MakeBucket(bucket string) error {
// Will never be called; only satisfies the ObjectLayer interface.
return traceError(NotImplemented{})
}
// MakeBucketWithLocation - Create a new container on azure backend.
func (a AzureObjects) MakeBucketWithLocation(bucket, location string) error {
err := a.client.CreateContainer(bucket, storage.ContainerAccessTypePrivate)
return azureToObjectError(traceError(err), bucket)
}
@@ -592,46 +598,16 @@ func azureListBlobsGetParameters(p storage.ListBlobsParameters) url.Values {
// storage.ContainerAccessTypePrivate - none in minio terminology
// As the common denominator for minio and azure is readonly and none, we support
// these two policies at the bucket level.
func (a AzureObjects) SetBucketPolicies(bucket string, policies []BucketAccessPolicy) error {
prefix := bucket + "/*" // For all objects inside the bucket.
if len(policies) != 1 {
return traceError(NotImplemented{})
}
if policies[0].Prefix != prefix {
return traceError(NotImplemented{})
}
if policies[0].Policy != policy.BucketPolicyReadOnly {
return traceError(NotImplemented{})
}
perm := storage.ContainerPermissions{
AccessType: storage.ContainerAccessTypeContainer,
AccessPolicies: nil,
}
err := a.client.SetContainerPermissions(bucket, perm, 0, "")
return azureToObjectError(traceError(err), bucket)
func (a AzureObjects) SetBucketPolicies(bucket string, policyInfo policy.BucketAccessPolicy) error {
return traceError(NotSupported{})
}
// GetBucketPolicies - Get the container ACL and convert it to canonical []bucketAccessPolicy
func (a AzureObjects) GetBucketPolicies(bucket string) ([]BucketAccessPolicy, error) {
perm, err := a.client.GetContainerPermissions(bucket, 0, "")
if err != nil {
return nil, azureToObjectError(traceError(err), bucket)
}
switch perm.AccessType {
case storage.ContainerAccessTypePrivate:
return nil, nil
case storage.ContainerAccessTypeContainer:
return []BucketAccessPolicy{{"", policy.BucketPolicyReadOnly}}, nil
}
return nil, azureToObjectError(traceError(NotImplemented{}))
func (a AzureObjects) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy, error) {
return policy.BucketAccessPolicy{}, traceError(NotSupported{})
}
// DeleteBucketPolicies - Set the container ACL to "private"
func (a AzureObjects) DeleteBucketPolicies(bucket string) error {
perm := storage.ContainerPermissions{
AccessType: storage.ContainerAccessTypePrivate,
AccessPolicies: nil,
}
err := a.client.SetContainerPermissions(bucket, perm, 0, "")
return azureToObjectError(traceError(err))
return traceError(NotSupported{})
}


@@ -17,7 +17,6 @@
package cmd
import (
"bytes"
"io"
"io/ioutil"
"net/http"
@@ -354,37 +353,13 @@ func (api gatewayAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *h
return
}
{
// FIXME: consolidate bucketPolicy and policy.BucketAccessPolicy so that
// the verification below is done on the same type.
// Parse bucket policy.
policyInfo := &bucketPolicy{}
err = parseBucketPolicy(bytes.NewReader(policyBytes), policyInfo)
if err != nil {
errorIf(err, "Unable to parse bucket policy.")
writeErrorResponse(w, ErrInvalidPolicyDocument, r.URL)
return
}
// Parse check bucket policy.
if s3Error := checkBucketPolicyResources(bucket, policyInfo); s3Error != ErrNone {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
}
policyInfo := &policy.BucketAccessPolicy{}
if err = json.Unmarshal(policyBytes, policyInfo); err != nil {
policyInfo := policy.BucketAccessPolicy{}
if err = json.Unmarshal(policyBytes, &policyInfo); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
var policies []BucketAccessPolicy
for prefix, policy := range policy.GetPolicies(policyInfo.Statements, bucket) {
policies = append(policies, BucketAccessPolicy{
Prefix: prefix,
Policy: policy,
})
}
if err = objAPI.SetBucketPolicies(bucket, policies); err != nil {
if err = objAPI.SetBucketPolicies(bucket, policyInfo); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
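The handler now decodes the raw policy document directly into minio-go's policy.BucketAccessPolicy and hands it to SetBucketPolicies. Below is a minimal, self-contained sketch of that decode step, assuming the standard S3 policy JSON layout; the bucket name and the Allow/GetObject statement are only illustrative:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/minio/minio-go/pkg/policy"
)

func main() {
	// Illustrative read-only bucket policy in the standard S3 JSON form.
	doc := []byte(`{
		"Version": "2012-10-17",
		"Statement": [{
			"Effect": "Allow",
			"Principal": {"AWS": ["*"]},
			"Action": ["s3:GetObject"],
			"Resource": ["arn:aws:s3:::mybucket/*"]
		}]
	}`)

	var policyInfo policy.BucketAccessPolicy
	if err := json.Unmarshal(doc, &policyInfo); err != nil {
		fmt.Println("invalid policy document:", err)
		return
	}
	// The decoded value is what the handler passes to objAPI.SetBucketPolicies.
	fmt.Println(policyInfo.Version, len(policyInfo.Statements))
}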
@@ -453,17 +428,14 @@ func (api gatewayAPIHandlers) GetBucketPolicyHandler(w http.ResponseWriter, r *h
return
}
policies, err := objAPI.GetBucketPolicies(bucket)
bp, err := objAPI.GetBucketPolicies(bucket)
if err != nil {
errorIf(err, "Unable to read bucket policy.")
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"}
for _, p := range policies {
policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, p.Policy, bucket, p.Prefix)
}
policyBytes, err := json.Marshal(&policyInfo)
policyBytes, err := json.Marshal(bp)
if err != nil {
errorIf(err, "Unable to read bucket policy.")
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
@@ -499,6 +471,65 @@ func (api gatewayAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWri
writeErrorResponse(w, ErrNotImplemented, r.URL)
}
// PutBucketHandler - PUT Bucket
// ----------
// This implementation of the PUT operation creates a new bucket for the authenticated request.
func (api gatewayAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Request) {
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// PutBucket does not have any bucket action.
s3Error := checkRequestAuthType(r, "", "", globalMinioDefaultRegion)
if s3Error == ErrInvalidRegion {
// Clients like boto3 send the PutBucket() call signed with the configured region.
s3Error = checkRequestAuthType(r, "", "", serverConfig.GetRegion())
}
if s3Error != ErrNone {
writeErrorResponse(w, s3Error, r.URL)
return
}
vars := router.Vars(r)
bucket := vars["bucket"]
// Validate the incoming location constraint and reject
// requests that do not follow valid region requirements.
location, s3Error := parseLocationConstraint(r)
if s3Error != ErrNone {
writeErrorResponse(w, s3Error, r.URL)
return
}
// Validate the region here; isValidLocationConstraint reads the
// request body, which has already been consumed above, so only the
// region is validated at this point.
serverRegion := serverConfig.GetRegion()
if serverRegion != location {
writeErrorResponse(w, ErrInvalidRegion, r.URL)
return
}
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
defer bucketLock.Unlock()
// Proceed to creating a bucket.
err := objectAPI.MakeBucketWithLocation(bucket, location)
if err != nil {
errorIf(err, "Unable to create a bucket.")
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Make sure to add Location information here only for the bucket.
w.Header().Set("Location", getLocation(r))
writeSuccessResponseHeadersOnly(w)
}
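PutBucketHandler gets the region from parseLocationConstraint, which reads the standard S3 CreateBucketConfiguration XML from the request body. A minimal sketch of that decode, using an illustrative struct rather than the actual helper:

package main

import (
	"encoding/xml"
	"fmt"
)

// createBucketConfiguration mirrors the S3 CreateBucketConfiguration body
// (illustrative type, not the helper used by parseLocationConstraint).
type createBucketConfiguration struct {
	XMLName  xml.Name `xml:"CreateBucketConfiguration"`
	Location string   `xml:"LocationConstraint"`
}

func main() {
	body := []byte(`<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <LocationConstraint>us-east-1</LocationConstraint>
</CreateBucketConfiguration>`)

	var cfg createBucketConfiguration
	if err := xml.Unmarshal(body, &cfg); err != nil {
		fmt.Println("invalid location constraint:", err)
		return
	}
	// The handler rejects the request unless this matches serverConfig.GetRegion().
	fmt.Println("requested location:", cfg.Location)
}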
// DeleteBucketHandler - Delete bucket
func (api gatewayAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
objectAPI := api.ObjectAPI()


@@ -61,7 +61,13 @@ var gatewayCmd = cli.Command{
Flags: append(serverFlags, cli.BoolFlag{
Name: "quiet",
Usage: "Disable startup banner.",
}),
},
cli.StringFlag{
Name: "endpoint",
Usage: "The endpoint.",
Value: "https://s3.amazonaws.com/",
},
),
HideHelpCommand: true,
}
@@ -70,6 +76,7 @@ type gatewayBackend string
const (
azureBackend gatewayBackend = "azure"
s3Backend gatewayBackend = "s3"
// Add more backends here.
)
@@ -89,11 +96,15 @@ func mustGetGatewayCredsFromEnv() (accessKey, secretKey string) {
//
// - Azure Blob Storage.
// - Add your favorite backend here.
func newGatewayLayer(backendType, endPoint, accessKey, secretKey string, secure bool) (GatewayLayer, error) {
if gatewayBackend(backendType) != azureBackend {
return nil, fmt.Errorf("Unrecognized backend type %s", backendType)
func newGatewayLayer(backendType, endpoint, accessKey, secretKey string, secure bool) (GatewayLayer, error) {
switch gatewayBackend(backendType) {
case azureBackend:
return newAzureLayer(endpoint, accessKey, secretKey, secure)
case s3Backend:
return newS3Gateway(endpoint, accessKey, secretKey, secure)
}
return newAzureLayer(endPoint, accessKey, secretKey, secure)
return nil, fmt.Errorf("Unrecognized backend type %s", backendType)
}
// Initialize a new gateway config.
@@ -130,6 +141,7 @@ func parseGatewayEndpoint(arg string) (endPoint string, secure bool, err error)
// Default connection will be "secure".
arg = "https://" + arg
}
u, err := url.Parse(arg)
if err != nil {
return "", false, err
@@ -230,6 +242,8 @@ func gatewayMain(ctx *cli.Context) {
mode := ""
if gatewayBackend(backendType) == azureBackend {
mode = globalMinioModeGatewayAzure
} else if gatewayBackend(backendType) == s3Backend {
mode = globalMinioModeGatewayS3
}
checkUpdate(mode)
apiEndpoints := getAPIEndpoints(apiServer.Addr)


@@ -20,15 +20,19 @@ import (
"io"
router "github.com/gorilla/mux"
"github.com/minio/minio-go/pkg/policy"
)
// GatewayLayer - Interface to implement gateway mode.
type GatewayLayer interface {
ObjectLayer
MakeBucketWithLocation(bucket, location string) error
AnonGetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error)
AnonGetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error)
SetBucketPolicies(string, []BucketAccessPolicy) error
GetBucketPolicies(string) ([]BucketAccessPolicy, error)
SetBucketPolicies(string, policy.BucketAccessPolicy) error
GetBucketPolicies(string) (policy.BucketAccessPolicy, error)
DeleteBucketPolicies(string) error
AnonListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
AnonGetBucketInfo(bucket string) (bucketInfo BucketInfo, err error)


@@ -39,6 +39,7 @@ const (
globalMinioModeXL = "mode-server-xl"
globalMinioModeDistXL = "mode-server-distributed-xl"
globalMinioModeGatewayAzure = "mode-gateway-azure"
globalMinioModeGatewayS3 = "mode-gateway-s3"
// Add new global values here.
)


@@ -221,6 +221,30 @@ type ListObjectsInfo struct {
Prefixes []string
}
// ListObjectsV2Info - container for list objects version 2.
type ListObjectsV2Info struct {
// Indicates whether the returned list objects response is truncated. A
// value of true indicates that the list was truncated. The list can be truncated
// if the number of objects exceeds the limit allowed or specified
// by max keys.
IsTruncated bool
// When the response is truncated (the IsTruncated element value in the
// response is true), you can use the key name in this field as a marker
// in the subsequent request to get the next set of objects.
//
// NOTE: This element is returned only if you have the delimiter request
// parameter specified.
ContinuationToken string
NextContinuationToken string
// List of objects info for this request.
Objects []ObjectInfo
// List of prefixes for this request.
Prefixes []string
}
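ListObjectsV2Info carries the V2 continuation-token pair, so a caller pages by feeding NextContinuationToken back in until IsTruncated is false. A sketch of that loop against the ListObjectsV2 signature used by the gateway; listAllObjects and its lister parameter are hypothetical:

// listAllObjects drains a V2 listing by following NextContinuationToken
// until the response is no longer truncated (hypothetical helper).
func listAllObjects(lister interface {
	ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (ListObjectsV2Info, error)
}, bucket, prefix string) ([]ObjectInfo, error) {
	var all []ObjectInfo
	token := ""
	for {
		result, err := lister.ListObjectsV2(bucket, prefix, token, false, "", 1000)
		if err != nil {
			return nil, err
		}
		all = append(all, result.Objects...)
		if !result.IsTruncated {
			return all, nil
		}
		token = result.NextContinuationToken
	}
}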
// PartInfo - represents individual part metadata.
type PartInfo struct {
// Part number that identifies the part. This is a positive integer between


@@ -144,6 +144,13 @@ func (e BucketNotFound) Error() string {
return "Bucket not found: " + e.Bucket
}
// BucketAlreadyOwnedByYou already owned by you.
type BucketAlreadyOwnedByYou GenericError
func (e BucketAlreadyOwnedByYou) Error() string {
return "Bucket already owned by you: " + e.Bucket
}
// BucketNotEmpty bucket is not empty.
type BucketNotEmpty GenericError
@@ -321,6 +328,13 @@ func (e NotImplemented) Error() string {
return "Not Implemented"
}
// NotSupported If a feature is not supported
type NotSupported struct{}
func (e NotSupported) Error() string {
return "Not Supported"
}
// PolicyNesting - policy nesting conflict.
type PolicyNesting struct{}
@@ -328,6 +342,13 @@ func (e PolicyNesting) Error() string {
return "New bucket policy conflicts with an existing policy. Please try again with new prefix."
}
// PolicyNotFound - policy not found
type PolicyNotFound GenericError
func (e PolicyNotFound) Error() string {
return "Policy not found"
}
// Check if error type is IncompleteBody.
func isErrIncompleteBody(err error) bool {
err = errorCause(err)

cmd/s3-layer-anonymous.go (new file, 83 lines)

@@ -0,0 +1,83 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "io"
// AnonGetObject - Get object anonymously
func (l *s3Gateway) AnonGetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error {
object, err := l.anonClient.GetObject(bucket, key)
if err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
defer object.Close()
if _, err := object.Seek(startOffset, io.SeekStart); err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
if _, err := io.CopyN(writer, object, length); err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
return nil
}
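AnonGetObject turns the startOffset/length pair into a Seek followed by an io.CopyN. The same range-read pattern in isolation, over an in-memory reader with illustrative values:

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

func main() {
	src := bytes.NewReader([]byte("0123456789abcdef"))
	startOffset, length := int64(4), int64(6)

	// Position the reader at the requested offset...
	if _, err := src.Seek(startOffset, io.SeekStart); err != nil {
		fmt.Println("seek failed:", err)
		return
	}
	// ...then copy exactly `length` bytes to the writer, as the gateway does.
	if _, err := io.CopyN(os.Stdout, src, length); err != nil {
		fmt.Println("copy failed:", err)
	}
	// Prints "456789".
}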
// AnonGetObjectInfo - Get object info anonymously
func (l *s3Gateway) AnonGetObjectInfo(bucket string, object string) (ObjectInfo, error) {
oi, err := l.anonClient.StatObject(bucket, object)
if err != nil {
return ObjectInfo{}, s3ToObjectError(traceError(err), bucket, object)
}
return fromMinioClientObjectInfo(bucket, oi), nil
}
// AnonListObjects - List objects anonymously
func (l *s3Gateway) AnonListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
result, err := l.anonClient.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
return ListObjectsInfo{}, s3ToObjectError(traceError(err), bucket)
}
return fromMinioClientListBucketResult(bucket, result), nil
}
// AnonGetBucketInfo - Get bucket metadata anonymously.
func (l *s3Gateway) AnonGetBucketInfo(bucket string) (BucketInfo, error) {
if exists, err := l.anonClient.BucketExists(bucket); err != nil {
return BucketInfo{}, s3ToObjectError(traceError(err), bucket)
} else if !exists {
return BucketInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
buckets, err := l.anonClient.ListBuckets()
if err != nil {
return BucketInfo{}, s3ToObjectError(traceError(err), bucket)
}
for _, bi := range buckets {
if bi.Name != bucket {
continue
}
return BucketInfo{
Name: bi.Name,
Created: bi.CreationDate,
}, nil
}
return BucketInfo{}, traceError(BucketNotFound{Bucket: bucket})
}


@@ -0,0 +1,42 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
// HealBucket - Not relevant.
func (l *s3Gateway) HealBucket(bucket string) error {
return traceError(NotImplemented{})
}
// ListBucketsHeal - Not relevant.
func (l *s3Gateway) ListBucketsHeal() (buckets []BucketInfo, err error) {
return []BucketInfo{}, traceError(NotImplemented{})
}
// HealObject - Not relevant.
func (l *s3Gateway) HealObject(bucket string, object string) (int, int, error) {
return 0, 0, traceError(NotImplemented{})
}
// ListObjectsHeal - Not relevant.
func (l *s3Gateway) ListObjectsHeal(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return ListObjectsInfo{}, traceError(NotImplemented{})
}
// ListUploadsHeal - Not relevant.
func (l *s3Gateway) ListUploadsHeal(bucket string, prefix string, marker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
return ListMultipartsInfo{}, traceError(NotImplemented{})
}

cmd/s3-layer.go (new file, 571 lines)

@@ -0,0 +1,571 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"crypto/sha256"
"hash"
"io"
"net/http"
"path"
"encoding/hex"
minio "github.com/minio/minio-go"
"github.com/minio/minio-go/pkg/policy"
)
// s3ToObjectError converts minio-go errors to Minio object layer errors.
func s3ToObjectError(err error, params ...string) error {
if err == nil {
return nil
}
e, ok := err.(*Error)
if !ok {
// The code should be fixed if this function is called without wrapping the error in traceError();
// otherwise handling the different cases here becomes complicated.
errorIf(err, "Expected type *Error")
return err
}
err = e.e
bucket := ""
object := ""
if len(params) >= 1 {
bucket = params[0]
}
if len(params) == 2 {
object = params[1]
}
minioErr, ok := err.(minio.ErrorResponse)
if !ok {
// We don't interpret non-Minio errors; minio-go errors carry a
// StatusCode that helps convert them to object errors.
return e
}
switch minioErr.Code {
case "BucketAlreadyOwnedByYou":
err = BucketAlreadyOwnedByYou{}
case "BucketNotEmpty":
err = BucketNotEmpty{}
case "NoSuchBucketPolicy":
err = PolicyNotFound{}
case "InvalidBucketName":
err = BucketNameInvalid{Bucket: bucket}
case "NoSuchBucket":
err = BucketNotFound{Bucket: bucket}
case "NoSuchKey":
if object != "" {
err = ObjectNotFound{Bucket: bucket, Object: object}
} else {
err = BucketNotFound{Bucket: bucket}
}
case "XMinioInvalidObjectName":
err = ObjectNameInvalid{}
case "AccessDenied":
err = PrefixAccessDenied{
Bucket: bucket,
Object: object,
}
case "XAmzContentSHA256Mismatch":
err = SHA256Mismatch{}
}
e.e = err
return e
}
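A short sketch of how the conversion behaves, using this package's traceError and errorCause helpers; the bucket name and the standalone exampleS3ErrorMapping function are illustrative:

// exampleS3ErrorMapping shows a minio-go NoSuchBucket error wrapped with
// traceError coming back as the gateway's BucketNotFound error type.
func exampleS3ErrorMapping() bool {
	in := traceError(minio.ErrorResponse{Code: "NoSuchBucket"})
	out := s3ToObjectError(in, "mybucket")
	// toAPIErrorCode would then map BucketNotFound to ErrNoSuchBucket.
	_, isBucketNotFound := errorCause(out).(BucketNotFound)
	return isBucketNotFound
}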
// s3Gateway - Implements gateway for Minio and S3 compatible object storage servers.
type s3Gateway struct {
Client *minio.Core
anonClient *minio.Core
}
// newS3Gateway returns an S3 gateway layer.
func newS3Gateway(endpoint string, accessKey, secretKey string, secure bool) (GatewayLayer, error) {
// Initialize minio client object.
client, err := minio.NewCore(endpoint, accessKey, secretKey, secure)
if err != nil {
return nil, err
}
anonClient, err := minio.NewCore(endpoint, "", "", secure)
if err != nil {
return nil, err
}
return &s3Gateway{
Client: client,
anonClient: anonClient,
}, nil
}
// Shutdown - save any gateway metadata to disk
// if necessary and reload upon next restart.
func (l *s3Gateway) Shutdown() error {
// TODO
return nil
}
// StorageInfo - Not relevant to S3 backend.
func (l *s3Gateway) StorageInfo() StorageInfo {
return StorageInfo{}
}
// MakeBucket - Create a new bucket on the S3 backend.
func (l *s3Gateway) MakeBucket(bucket string) error {
// Will never be called; only satisfies the ObjectLayer interface.
return traceError(NotImplemented{})
}
// MakeBucketWithLocation - Create a new bucket on the S3 backend.
func (l *s3Gateway) MakeBucketWithLocation(bucket, location string) error {
err := l.Client.MakeBucket(bucket, location)
if err != nil {
return s3ToObjectError(traceError(err), bucket)
}
return err
}
// GetBucketInfo - Get bucket metadata.
func (l *s3Gateway) GetBucketInfo(bucket string) (BucketInfo, error) {
buckets, err := l.Client.ListBuckets()
if err != nil {
return BucketInfo{}, s3ToObjectError(traceError(err), bucket)
}
for _, bi := range buckets {
if bi.Name != bucket {
continue
}
return BucketInfo{
Name: bi.Name,
Created: bi.CreationDate,
}, nil
}
return BucketInfo{}, traceError(BucketNotFound{Bucket: bucket})
}
// ListBuckets - Lists all S3 buckets
func (l *s3Gateway) ListBuckets() ([]BucketInfo, error) {
buckets, err := l.Client.ListBuckets()
if err != nil {
return nil, err
}
b := make([]BucketInfo, len(buckets))
for i, bi := range buckets {
b[i] = BucketInfo{
Name: bi.Name,
Created: bi.CreationDate,
}
}
return b, err
}
// DeleteBucket - delete a bucket on S3
func (l *s3Gateway) DeleteBucket(bucket string) error {
err := l.Client.RemoveBucket(bucket)
if err != nil {
return s3ToObjectError(traceError(err), bucket)
}
return nil
}
// ListObjects - lists all objects in an S3 bucket filtered by prefix
func (l *s3Gateway) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
result, err := l.Client.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
return ListObjectsInfo{}, s3ToObjectError(traceError(err), bucket)
}
return fromMinioClientListBucketResult(bucket, result), nil
}
// ListObjectsV2 - lists all objects in an S3 bucket filtered by prefix
func (l *s3Gateway) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (ListObjectsV2Info, error) {
result, err := l.Client.ListObjectsV2(bucket, prefix, continuationToken, fetchOwner, delimiter, maxKeys)
if err != nil {
return ListObjectsV2Info{}, s3ToObjectError(traceError(err), bucket)
}
return fromMinioClientListBucketV2Result(bucket, result), nil
}
// fromMinioClientListBucketV2Result - converts minio ListBucketV2Result to ListObjectsV2Info
func fromMinioClientListBucketV2Result(bucket string, result minio.ListBucketV2Result) ListObjectsV2Info {
objects := make([]ObjectInfo, len(result.Contents))
for i, oi := range result.Contents {
objects[i] = fromMinioClientObjectInfo(bucket, oi)
}
prefixes := make([]string, len(result.CommonPrefixes))
for i, p := range result.CommonPrefixes {
prefixes[i] = p.Prefix
}
return ListObjectsV2Info{
IsTruncated: result.IsTruncated,
Prefixes: prefixes,
Objects: objects,
ContinuationToken: result.ContinuationToken,
NextContinuationToken: result.NextContinuationToken,
}
}
// fromMinioClientListBucketResult - convert minio ListBucketResult to ListObjectsInfo
func fromMinioClientListBucketResult(bucket string, result minio.ListBucketResult) ListObjectsInfo {
objects := make([]ObjectInfo, len(result.Contents))
for i, oi := range result.Contents {
objects[i] = fromMinioClientObjectInfo(bucket, oi)
}
prefixes := make([]string, len(result.CommonPrefixes))
for i, p := range result.CommonPrefixes {
prefixes[i] = p.Prefix
}
return ListObjectsInfo{
IsTruncated: result.IsTruncated,
NextMarker: result.NextMarker,
Prefixes: prefixes,
Objects: objects,
}
}
// GetObject - reads an object from S3. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.
//
// startOffset indicates the starting read location of the object.
// length indicates the number of bytes to be read from the object.
func (l *s3Gateway) GetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error {
object, err := l.Client.GetObject(bucket, key)
if err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
defer object.Close()
if _, err := object.Seek(startOffset, io.SeekStart); err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
if _, err := io.CopyN(writer, object, length); err != nil {
return s3ToObjectError(traceError(err), bucket, key)
}
return nil
}
// fromMinioClientObjectInfo - converts minio ObjectInfo to gateway ObjectInfo
func fromMinioClientObjectInfo(bucket string, oi minio.ObjectInfo) ObjectInfo {
userDefined := fromMinioClientMetadata(oi.Metadata)
userDefined["Content-Type"] = oi.ContentType
return ObjectInfo{
Bucket: bucket,
Name: oi.Key,
ModTime: oi.LastModified,
Size: oi.Size,
MD5Sum: oi.ETag,
UserDefined: userDefined,
ContentType: oi.ContentType,
ContentEncoding: oi.Metadata.Get("Content-Encoding"),
}
}
// GetObjectInfo - reads object info and replies back ObjectInfo
func (l *s3Gateway) GetObjectInfo(bucket string, object string) (objInfo ObjectInfo, err error) {
oi, err := l.Client.StatObject(bucket, object)
if err != nil {
return ObjectInfo{}, s3ToObjectError(traceError(err), bucket, object)
}
return fromMinioClientObjectInfo(bucket, oi), nil
}
// PutObject - Create a new object with the incoming data.
func (l *s3Gateway) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) {
var sha256Writer hash.Hash
sha256sumBytes := []byte{}
teeReader := data
if sha256sum == "" {
} else if b, err := hex.DecodeString(sha256sum); err != nil {
return ObjectInfo{}, s3ToObjectError(traceError(err), bucket, object)
} else {
sha256sumBytes = b
sha256Writer = sha256.New()
teeReader = io.TeeReader(data, sha256Writer)
}
delete(metadata, "md5Sum")
oi, err := l.Client.PutObject(bucket, object, size, teeReader, nil, sha256sumBytes, toMinioClientMetadata(metadata))
if err != nil {
return ObjectInfo{}, s3ToObjectError(traceError(err), bucket, object)
}
if sha256sum != "" {
newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil))
if newSHA256sum != sha256sum {
l.Client.RemoveObject(bucket, object)
return ObjectInfo{}, traceError(SHA256Mismatch{})
}
}
return fromMinioClientObjectInfo(bucket, oi), nil
}
// CopyObject - Copies an object from the source bucket to the destination bucket.
func (l *s3Gateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string, metadata map[string]string) (ObjectInfo, error) {
err := l.Client.CopyObject(destBucket, destObject, path.Join(srcBucket, srcObject), minio.CopyConditions{})
if err != nil {
return ObjectInfo{}, s3ToObjectError(traceError(err), srcBucket, srcObject)
}
oi, err := l.GetObjectInfo(destBucket, destObject)
if err != nil {
return ObjectInfo{}, s3ToObjectError(traceError(err), destBucket, destObject)
}
return oi, nil
}
// DeleteObject - Deletes an object in the bucket.
func (l *s3Gateway) DeleteObject(bucket string, object string) error {
err := l.Client.RemoveObject(bucket, object)
if err != nil {
return s3ToObjectError(traceError(err), bucket, object)
}
return nil
}
// fromMinioClientUploadMetadata converts ObjectMultipartInfo to uploadMetadata
func fromMinioClientUploadMetadata(omi minio.ObjectMultipartInfo) uploadMetadata {
return uploadMetadata{
Object: omi.Key,
UploadID: omi.UploadID,
Initiated: omi.Initiated,
}
}
// fromMinioClientListMultipartsInfo converts minio ListMultipartUploadsResult to ListMultipartsInfo
func fromMinioClientListMultipartsInfo(lmur minio.ListMultipartUploadsResult) ListMultipartsInfo {
uploads := make([]uploadMetadata, len(lmur.Uploads))
for i, um := range lmur.Uploads {
uploads[i] = fromMinioClientUploadMetadata(um)
}
commonPrefixes := make([]string, len(lmur.CommonPrefixes))
for i, cp := range lmur.CommonPrefixes {
commonPrefixes[i] = cp.Prefix
}
return ListMultipartsInfo{
KeyMarker: lmur.KeyMarker,
UploadIDMarker: lmur.UploadIDMarker,
NextKeyMarker: lmur.NextKeyMarker,
NextUploadIDMarker: lmur.NextUploadIDMarker,
MaxUploads: int(lmur.MaxUploads),
IsTruncated: lmur.IsTruncated,
Uploads: uploads,
Prefix: lmur.Prefix,
Delimiter: lmur.Delimiter,
CommonPrefixes: commonPrefixes,
EncodingType: lmur.EncodingType,
}
}
// ListMultipartUploads - lists all multipart uploads.
func (l *s3Gateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
result, err := l.Client.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
if err != nil {
return ListMultipartsInfo{}, err
}
return fromMinioClientListMultipartsInfo(result), nil
}
// fromMinioClientMetadata converts minio metadata to map[string]string
func fromMinioClientMetadata(metadata map[string][]string) map[string]string {
mm := map[string]string{}
for k, v := range metadata {
mm[http.CanonicalHeaderKey(k)] = v[0]
}
return mm
}
// toMinioClientMetadata converts metadata to map[string][]string
func toMinioClientMetadata(metadata map[string]string) map[string][]string {
mm := map[string][]string{}
for k, v := range metadata {
mm[http.CanonicalHeaderKey(k)] = []string{v}
}
return mm
}
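Both converters normalize keys through http.CanonicalHeaderKey so that user metadata round-trips with consistent casing regardless of how the client spelled the header. For example, with illustrative keys:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Arbitrary client-supplied casings normalize to one canonical key.
	fmt.Println(http.CanonicalHeaderKey("content-type"))      // "Content-Type"
	fmt.Println(http.CanonicalHeaderKey("X-AMZ-META-userid")) // "X-Amz-Meta-Userid"
	fmt.Println(http.CanonicalHeaderKey("x-amz-meta-UserID")) // "X-Amz-Meta-Userid"
}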
// NewMultipartUpload - upload object in multiple parts
func (l *s3Gateway) NewMultipartUpload(bucket string, object string, metadata map[string]string) (uploadID string, err error) {
return l.Client.NewMultipartUpload(bucket, object, toMinioClientMetadata(metadata))
}
// CopyObjectPart - copy part of object to other bucket and object
func (l *s3Gateway) CopyObjectPart(srcBucket string, srcObject string, destBucket string, destObject string, uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) {
// FIXME: implement CopyObjectPart
return PartInfo{}, traceError(NotImplemented{})
}
// fromMinioClientObjectPart - converts minio ObjectPart to PartInfo
func fromMinioClientObjectPart(op minio.ObjectPart) PartInfo {
return PartInfo{
Size: op.Size,
ETag: op.ETag,
LastModified: op.LastModified,
PartNumber: op.PartNumber,
}
}
// PutObjectPart puts a part of object in bucket
func (l *s3Gateway) PutObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) {
md5HexBytes, err := hex.DecodeString(md5Hex)
if err != nil {
return PartInfo{}, err
}
sha256sumBytes, err := hex.DecodeString(sha256sum)
if err != nil {
return PartInfo{}, err
}
info, err := l.Client.PutObjectPart(bucket, object, uploadID, partID, size, data, md5HexBytes, sha256sumBytes)
if err != nil {
return PartInfo{}, err
}
return fromMinioClientObjectPart(info), nil
}
// fromMinioClientObjectParts - converts a slice of minio ObjectPart to a slice of PartInfo
func fromMinioClientObjectParts(parts []minio.ObjectPart) []PartInfo {
toParts := make([]PartInfo, len(parts))
for i, part := range parts {
toParts[i] = fromMinioClientObjectPart(part)
}
return toParts
}
// fromMinioClientListPartsInfo converts minio ListObjectPartsResult to ListPartsInfo
func fromMinioClientListPartsInfo(lopr minio.ListObjectPartsResult) ListPartsInfo {
return ListPartsInfo{
UploadID: lopr.UploadID,
Bucket: lopr.Bucket,
Object: lopr.Key,
StorageClass: "",
PartNumberMarker: lopr.PartNumberMarker,
NextPartNumberMarker: lopr.NextPartNumberMarker,
MaxParts: lopr.MaxParts,
IsTruncated: lopr.IsTruncated,
EncodingType: lopr.EncodingType,
Parts: fromMinioClientObjectParts(lopr.ObjectParts),
}
}
// ListObjectParts returns all object parts for specified object in specified bucket
func (l *s3Gateway) ListObjectParts(bucket string, object string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) {
result, err := l.Client.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
if err != nil {
return ListPartsInfo{}, err
}
return fromMinioClientListPartsInfo(result), nil
}
// AbortMultipartUpload aborts an ongoing multipart upload
func (l *s3Gateway) AbortMultipartUpload(bucket string, object string, uploadID string) error {
return l.Client.AbortMultipartUpload(bucket, object, uploadID)
}
// toMinioClientCompletePart converts completePart to minio CompletePart
func toMinioClientCompletePart(part completePart) minio.CompletePart {
return minio.CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
}
}
// toMinioClientCompleteParts converts []completePart to minio []CompletePart
func toMinioClientCompleteParts(parts []completePart) []minio.CompletePart {
mparts := make([]minio.CompletePart, len(parts))
for i, part := range parts {
mparts[i] = toMinioClientCompletePart(part)
}
return mparts
}
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
func (l *s3Gateway) CompleteMultipartUpload(bucket string, object string, uploadID string, uploadedParts []completePart) (ObjectInfo, error) {
err := l.Client.CompleteMultipartUpload(bucket, object, uploadID, toMinioClientCompleteParts(uploadedParts))
if err != nil {
return ObjectInfo{}, s3ToObjectError(traceError(err), bucket, object)
}
return l.GetObjectInfo(bucket, object)
}
// SetBucketPolicies - Set policy on bucket
func (l *s3Gateway) SetBucketPolicies(bucket string, policyInfo policy.BucketAccessPolicy) error {
if err := l.Client.PutBucketPolicy(bucket, policyInfo); err != nil {
return s3ToObjectError(traceError(err), bucket, "")
}
return nil
}
// GetBucketPolicies - Get policy on bucket
func (l *s3Gateway) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy, error) {
policyInfo, err := l.Client.GetBucketPolicy(bucket)
if err != nil {
return policy.BucketAccessPolicy{}, s3ToObjectError(traceError(err), bucket, "")
}
return policyInfo, nil
}
// DeleteBucketPolicies - Delete all policies on bucket
func (l *s3Gateway) DeleteBucketPolicies(bucket string) error {
if err := l.Client.PutBucketPolicy(bucket, policy.BucketAccessPolicy{}); err != nil {
return s3ToObjectError(traceError(err), bucket, "")
}
return nil
}

cmd/s3-layer_test.go (new file, 17 lines)

@@ -0,0 +1,17 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd