Merge remote-tracking branch 'origin/feature-gcs'

This commit is contained in:
Harshavardhana
2017-06-22 11:52:12 -07:00
296 changed files with 104333 additions and 425 deletions

View File

@@ -115,6 +115,7 @@ const (
ErrBucketAlreadyOwnedByYou
ErrInvalidDuration
ErrNotSupported
ErrBucketAlreadyExists
// Add new error codes here.
// Bucket notification related errors.
@@ -355,6 +356,11 @@ var errorCodeResponse = map[APIErrorCode]APIError{
Description: "The bucket you tried to delete is not empty",
HTTPStatusCode: http.StatusConflict,
},
ErrBucketAlreadyExists: {
Code: "BucketAlreadyExists",
Description: "The requested bucket name is not available. The bucket namespace is shared by all users of the system. Please select a different name and try again.",
HTTPStatusCode: http.StatusConflict,
},
ErrAllAccessDisabled: {
Code: "AllAccessDisabled",
Description: "All access to this bucket has been disabled.",
@@ -663,6 +669,8 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
apiErr = ErrStorageFull
case BadDigest:
apiErr = ErrBadDigest
case AllAccessDisabled:
apiErr = ErrAllAccessDisabled
case IncompleteBody:
apiErr = ErrIncompleteBody
case ObjectExistsAsDirectory:
@@ -677,6 +685,8 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
apiErr = ErrBucketAlreadyOwnedByYou
case BucketNotEmpty:
apiErr = ErrBucketNotEmpty
case BucketAlreadyExists:
apiErr = ErrBucketAlreadyExists
case BucketExists:
apiErr = ErrBucketAlreadyOwnedByYou
case ObjectNotFound:
@@ -701,6 +711,8 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
apiErr = ErrNoSuchUpload
case PartTooSmall:
apiErr = ErrEntityTooSmall
case SignatureDoesNotMatch:
apiErr = ErrSignatureDoesNotMatch
case SHA256Mismatch:
apiErr = ErrContentSHA256Mismatch
case ObjectTooLarge:

117
cmd/common-main.go Normal file
View File

@@ -0,0 +1,117 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"errors"
"os"
"path/filepath"
"strings"
"time"
"github.com/minio/cli"
)
// Check for updates and print a notification message
func checkUpdate(mode string) {
// Its OK to ignore any errors during getUpdateInfo() here.
if older, downloadURL, err := getUpdateInfo(1*time.Second, mode); err == nil {
if updateMsg := computeUpdateMessage(downloadURL, older); updateMsg != "" {
log.Println(updateMsg)
}
}
}
func enableLoggers() {
fileLogTarget := serverConfig.Logger.GetFile()
if fileLogTarget.Enable {
err := InitFileLogger(&fileLogTarget)
fatalIf(err, "Unable to initialize file logger")
log.AddTarget(fileLogTarget)
}
consoleLogTarget := serverConfig.Logger.GetConsole()
if consoleLogTarget.Enable {
InitConsoleLogger(&consoleLogTarget)
}
log.SetConsoleTarget(consoleLogTarget)
}
func initConfig() {
// Config file does not exist, we create it fresh and return upon success.
if isFile(getConfigFile()) {
fatalIf(migrateConfig(), "Config migration failed.")
fatalIf(loadConfig(), "Unable to load config version: '%s'.", v19)
} else {
fatalIf(newConfig(), "Unable to initialize minio config for the first time.")
log.Println("Created minio configuration file successfully at " + getConfigDir())
}
}
func handleCommonCmdArgs(ctx *cli.Context) {
// Set configuration directory.
{
// Get configuration directory from command line argument.
configDir := ctx.String("config-dir")
if !ctx.IsSet("config-dir") && ctx.GlobalIsSet("config-dir") {
configDir = ctx.GlobalString("config-dir")
}
if configDir == "" {
fatalIf(errors.New("empty directory"), "Configuration directory cannot be empty.")
}
// Disallow relative paths, figure out absolute paths.
configDirAbs, err := filepath.Abs(configDir)
fatalIf(err, "Unable to fetch absolute path for config directory %s", configDir)
setConfigDir(configDirAbs)
}
}
func handleCommonEnvVars() {
// Start profiler if env is set.
if profiler := os.Getenv("_MINIO_PROFILER"); profiler != "" {
globalProfiler = startProfiler(profiler)
}
// Check if object cache is disabled.
globalXLObjCacheDisabled = strings.EqualFold(os.Getenv("_MINIO_CACHE"), "off")
accessKey := os.Getenv("MINIO_ACCESS_KEY")
secretKey := os.Getenv("MINIO_SECRET_KEY")
if accessKey != "" && secretKey != "" {
cred, err := createCredential(accessKey, secretKey)
fatalIf(err, "Invalid access/secret Key set in environment.")
// credential Envs are set globally.
globalIsEnvCreds = true
globalActiveCred = cred
}
if browser := os.Getenv("MINIO_BROWSER"); browser != "" {
browserFlag, err := ParseBrowserFlag(browser)
if err != nil {
fatalIf(errors.New("invalid value"), "Unknown value %s in MINIO_BROWSER environment variable.", browser)
}
// browser Envs are set globally, this does not represent
// if browser is turned off or on.
globalIsEnvBrowser = true
globalIsBrowserEnabled = bool(browserFlag)
}
}

49
cmd/gateway-anonymous.go Normal file
View File

@@ -0,0 +1,49 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "net/http"
func anonErrToObjectErr(statusCode int, params ...string) error {
bucket := ""
object := ""
if len(params) >= 1 {
bucket = params[0]
}
if len(params) == 2 {
object = params[1]
}
switch statusCode {
case http.StatusNotFound:
if object != "" {
return ObjectNotFound{bucket, object}
}
return BucketNotFound{Bucket: bucket}
case http.StatusBadRequest:
if object != "" {
return ObjectNameInvalid{bucket, object}
}
return BucketNameInvalid{Bucket: bucket}
case http.StatusForbidden:
fallthrough
case http.StatusUnauthorized:
return AllAccessDisabled{bucket, object}
}
return errUnexpected
}

View File

@@ -153,15 +153,26 @@ func azureToObjectError(err error, params ...string) error {
return e
}
// Inits azure blob storage client and returns azureObjects.
func newAzureLayer(endPoint string, account, key string, secure bool) (GatewayLayer, error) {
if endPoint == "" {
endPoint = storage.DefaultBaseURL
// Inits azure blob storage client and returns AzureObjects.
func newAzureLayer(host string) (GatewayLayer, error) {
var err error
var endpoint = storage.DefaultBaseURL
var secure = true
// If user provided some parameters
if host != "" {
endpoint, secure, err = parseGatewayEndpoint(host)
if err != nil {
return nil, err
}
}
c, err := storage.NewClient(account, key, endPoint, globalAzureAPIVersion, secure)
creds := serverConfig.GetCredential()
c, err := storage.NewClient(creds.AccessKey, creds.SecretKey, endpoint, globalAzureAPIVersion, secure)
if err != nil {
return &azureObjects{}, err
}
return &azureObjects{
client: c.GetBlobService(),
metaInfo: azureMultipartMetaInfo{
@@ -174,7 +185,6 @@ func newAzureLayer(endPoint string, account, key string, secure bool) (GatewayLa
// Shutdown - save any gateway metadata to disk
// if necessary and reload upon next restart.
func (a *azureObjects) Shutdown() error {
// TODO
return nil
}
@@ -653,31 +663,6 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string,
return a.GetObjectInfo(bucket, object)
}
func anonErrToObjectErr(statusCode int, params ...string) error {
bucket := ""
object := ""
if len(params) >= 1 {
bucket = params[0]
}
if len(params) == 2 {
object = params[1]
}
switch statusCode {
case http.StatusNotFound:
if object != "" {
return ObjectNotFound{bucket, object}
}
return BucketNotFound{Bucket: bucket}
case http.StatusBadRequest:
if object != "" {
return ObjectNameInvalid{bucket, object}
}
return BucketNameInvalid{Bucket: bucket}
}
return errUnexpected
}
// Copied from github.com/Azure/azure-sdk-for-go/storage/blob.go
func azureListBlobsGetParameters(p storage.ListBlobsParameters) url.Values {
out := url.Values{}

View File

@@ -0,0 +1,131 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"fmt"
"io"
"net/http"
"strconv"
"time"
)
func toGCSPublicURL(bucket, object string) string {
return fmt.Sprintf("https://storage.googleapis.com/%s/%s", bucket, object)
}
// AnonPutObject creates a new object anonymously with the incoming data,
func (l *gcsGateway) AnonPutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) {
return ObjectInfo{}, NotImplemented{}
}
// AnonGetObject - Get object anonymously
func (l *gcsGateway) AnonGetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer) error {
req, err := http.NewRequest("GET", toGCSPublicURL(bucket, object), nil)
if err != nil {
return gcsToObjectError(traceError(err), bucket, object)
}
if length > 0 && startOffset > 0 {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
} else if startOffset > 0 {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-", startOffset))
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return gcsToObjectError(traceError(err), bucket, object)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK {
return gcsToObjectError(traceError(anonErrToObjectErr(resp.StatusCode, bucket, object)), bucket, object)
}
_, err = io.Copy(writer, resp.Body)
return gcsToObjectError(traceError(err), bucket, object)
}
// AnonGetObjectInfo - Get object info anonymously
func (l *gcsGateway) AnonGetObjectInfo(bucket string, object string) (objInfo ObjectInfo, err error) {
resp, err := http.Head(toGCSPublicURL(bucket, object))
if err != nil {
return objInfo, gcsToObjectError(traceError(err), bucket, object)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return objInfo, gcsToObjectError(traceError(anonErrToObjectErr(resp.StatusCode, bucket, object)), bucket, object)
}
var contentLength int64
contentLengthStr := resp.Header.Get("Content-Length")
if contentLengthStr != "" {
contentLength, err = strconv.ParseInt(contentLengthStr, 0, 64)
if err != nil {
return objInfo, gcsToObjectError(traceError(errUnexpected), bucket, object)
}
}
t, err := time.Parse(time.RFC1123, resp.Header.Get("Last-Modified"))
if err != nil {
return objInfo, traceError(err)
}
objInfo.ModTime = t
objInfo.Bucket = bucket
objInfo.UserDefined = make(map[string]string)
if resp.Header.Get("Content-Encoding") != "" {
objInfo.UserDefined["Content-Encoding"] = resp.Header.Get("Content-Encoding")
}
objInfo.UserDefined["Content-Type"] = resp.Header.Get("Content-Type")
objInfo.ETag = resp.Header.Get("Etag")
objInfo.ModTime = t
objInfo.Name = object
objInfo.Size = contentLength
return
}
// AnonListObjects - List objects anonymously
func (l *gcsGateway) AnonListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
result, err := l.anonClient.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
return ListObjectsInfo{}, s3ToObjectError(traceError(err), bucket)
}
return fromMinioClientListBucketResult(bucket, result), nil
}
// AnonGetBucketInfo - Get bucket metadata anonymously.
func (l *gcsGateway) AnonGetBucketInfo(bucket string) (bucketInfo BucketInfo, err error) {
resp, err := http.Head(toGCSPublicURL(bucket, ""))
if err != nil {
return bucketInfo, gcsToObjectError(traceError(err))
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return bucketInfo, gcsToObjectError(traceError(anonErrToObjectErr(resp.StatusCode, bucket)), bucket)
}
// Last-Modified date being returned by GCS
return BucketInfo{
Name: bucket,
}, nil
}

27
cmd/gateway-gcs-errors.go Normal file
View File

@@ -0,0 +1,27 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "errors"
var (
// ProjectID format is not valid.
errGCSInvalidProjectID = errors.New("GCS project id is either empty or invalid")
// Multipart identifier is not in the correct form.
errGCSNotValidMultipartIdentifier = errors.New("Not a valid multipart identifier")
)

View File

@@ -0,0 +1,42 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
// HealBucket - Not relevant.
func (l *gcsGateway) HealBucket(bucket string) error {
return traceError(NotImplemented{})
}
// ListBucketsHeal - Not relevant.
func (l *gcsGateway) ListBucketsHeal() (buckets []BucketInfo, err error) {
return []BucketInfo{}, traceError(NotImplemented{})
}
// HealObject - Not relevant.
func (l *gcsGateway) HealObject(bucket string, object string) (int, int, error) {
return 0, 0, traceError(NotImplemented{})
}
// ListObjectsHeal - Not relevant.
func (l *gcsGateway) ListObjectsHeal(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return ListObjectsInfo{}, traceError(NotImplemented{})
}
// ListUploadsHeal - Not relevant.
func (l *gcsGateway) ListUploadsHeal(bucket string, prefix string, marker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
return ListMultipartsInfo{}, traceError(NotImplemented{})
}

896
cmd/gateway-gcs.go Normal file
View File

@@ -0,0 +1,896 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"hash"
"io"
"regexp"
"strings"
"time"
"cloud.google.com/go/storage"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
minio "github.com/minio/minio-go"
"github.com/minio/minio-go/pkg/policy"
)
const (
// gcsMinioMeta is used for multiparts. We have "minio.sys.temp" prefix so that
// listing on the GCS lists this entry in the end. Also in the gateway
// ListObjects we filter out this entry.
gcsMinioPath = "minio.sys.temp"
// Path where multipart objects are saved.
// If we change the backend format we will use a different url path like /multipart/v2
// but we will not migrate old data.
gcsMinioMultipartPathV1 = gcsMinioPath + "/multipart/v1"
// Multipart meta file.
gcsMinioMultipartMeta = "gcs.json"
// gcs.json version number
gcsMultipartMetaCurrentVersion = "1"
// token prefixed with GCS returned marker to differentiate
// from user supplied marker.
gcsTokenPrefix = "##minio"
)
// Stored in gcs.json - Contents of this file is not used anywhere. It can be
// used for debugging purposes.
type gcsMultipartMetaV1 struct {
Version string `json:"version"` // Version number
Bucket string `json:"bucket"` // Bucket name
Object string `json:"object"` // Object name
}
// Check if object prefix is "ZZZZ_Minio".
func isGCSPrefix(prefix string) bool {
return strings.TrimSuffix(prefix, slashSeparator) == gcsMinioPath
}
// Returns name of the multipart meta object.
func gcsMultipartMetaName(uploadID string) string {
return fmt.Sprintf("%s/%s/%s", gcsMinioMultipartPathV1, uploadID, gcsMinioMultipartMeta)
}
// Returns name of the part object.
func gcsMultipartDataName(uploadID, etag string) string {
return fmt.Sprintf("%s/%s/%s", gcsMinioMultipartPathV1, uploadID, etag)
}
// Convert Minio errors to minio object layer errors.
func gcsToObjectError(err error, params ...string) error {
if err == nil {
return nil
}
e, ok := err.(*Error)
if !ok {
// Code should be fixed if this function is called without doing traceError()
// Else handling different situations in this function makes this function complicated.
errorIf(err, "Expected type *Error")
return err
}
err = e.e
bucket := ""
object := ""
if len(params) >= 1 {
bucket = params[0]
}
if len(params) == 2 {
object = params[1]
}
// in some cases just a plain error is being returned
switch err.Error() {
case "storage: bucket doesn't exist":
err = BucketNotFound{
Bucket: bucket,
}
e.e = err
return e
case "storage: object doesn't exist":
err = ObjectNotFound{
Bucket: bucket,
Object: object,
}
e.e = err
return e
}
googleAPIErr, ok := err.(*googleapi.Error)
if !ok {
// We don't interpret non Minio errors. As minio errors will
// have StatusCode to help to convert to object errors.
e.e = err
return e
}
if len(googleAPIErr.Errors) == 0 {
e.e = err
return e
}
reason := googleAPIErr.Errors[0].Reason
message := googleAPIErr.Errors[0].Message
switch reason {
case "required":
// Anonymous users does not have storage.xyz access to project 123.
fallthrough
case "keyInvalid":
fallthrough
case "forbidden":
err = PrefixAccessDenied{
Bucket: bucket,
Object: object,
}
case "invalid":
err = BucketNameInvalid{
Bucket: bucket,
}
case "notFound":
if object != "" {
err = ObjectNotFound{
Bucket: bucket,
Object: object,
}
} else {
err = BucketNotFound{
Bucket: bucket,
}
}
case "conflict":
if message == "You already own this bucket. Please select another name." {
err = BucketAlreadyOwnedByYou{
Bucket: bucket,
}
} else if message == "Sorry, that name is not available. Please try a different one." {
err = BucketAlreadyExists{
Bucket: bucket,
}
} else {
err = BucketNotEmpty{
Bucket: bucket,
}
}
default:
err = fmt.Errorf("Unsupported error reason: %s", reason)
}
e.e = err
return e
}
// gcsProjectIDRegex defines a valid gcs project id format
var gcsProjectIDRegex = regexp.MustCompile("^[a-z][a-z0-9-]{5,29}$")
// isValidGCSProjectId - checks if a given project id is valid or not.
// Project IDs must start with a lowercase letter and can have lowercase
// ASCII letters, digits or hyphens. Project IDs must be between 6 and 30 characters.
// Ref: https://cloud.google.com/resource-manager/reference/rest/v1/projects#Project (projectId section)
func isValidGCSProjectID(projectID string) bool {
return gcsProjectIDRegex.MatchString(projectID)
}
// gcsGateway - Implements gateway for Minio and GCS compatible object storage servers.
type gcsGateway struct {
client *storage.Client
anonClient *minio.Core
projectID string
ctx context.Context
}
const googleStorageEndpoint = "storage.googleapis.com"
// newGCSGateway returns gcs gatewaylayer
func newGCSGateway(projectID string) (GatewayLayer, error) {
ctx := context.Background()
// Initialize a GCS client.
client, err := storage.NewClient(ctx)
if err != nil {
return nil, err
}
// Initialize a anonymous client with minio core APIs.
anonClient, err := minio.NewCore(googleStorageEndpoint, "", "", true)
if err != nil {
return nil, err
}
return &gcsGateway{
client: client,
projectID: projectID,
ctx: ctx,
anonClient: anonClient,
}, nil
}
// Shutdown - save any gateway metadata to disk
// if necessary and reload upon next restart.
func (l *gcsGateway) Shutdown() error {
return nil
}
// StorageInfo - Not relevant to GCS backend.
func (l *gcsGateway) StorageInfo() StorageInfo {
return StorageInfo{}
}
// MakeBucketWithLocation - Create a new container on GCS backend.
func (l *gcsGateway) MakeBucketWithLocation(bucket, location string) error {
bkt := l.client.Bucket(bucket)
// we'll default to the us multi-region in case of us-east-1
if location == "us-east-1" {
location = "us"
}
err := bkt.Create(l.ctx, l.projectID, &storage.BucketAttrs{
Location: location,
})
return gcsToObjectError(traceError(err), bucket)
}
// GetBucketInfo - Get bucket metadata..
func (l *gcsGateway) GetBucketInfo(bucket string) (BucketInfo, error) {
attrs, err := l.client.Bucket(bucket).Attrs(l.ctx)
if err != nil {
return BucketInfo{}, gcsToObjectError(traceError(err), bucket)
}
return BucketInfo{
Name: attrs.Name,
Created: attrs.Created,
}, nil
}
// ListBuckets lists all GCS buckets
func (l *gcsGateway) ListBuckets() ([]BucketInfo, error) {
it := l.client.Buckets(l.ctx, l.projectID)
b := []BucketInfo{}
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return []BucketInfo{}, gcsToObjectError(traceError(err))
}
b = append(b, BucketInfo{
Name: attrs.Name,
Created: attrs.Created,
})
}
return b, nil
}
// DeleteBucket delete a bucket on GCS
func (l *gcsGateway) DeleteBucket(bucket string) error {
err := l.client.Bucket(bucket).Delete(l.ctx)
return gcsToObjectError(traceError(err), bucket)
}
func toGCSPageToken(name string) string {
length := uint16(len(name))
b := []byte{
0xa,
byte(length & 0xFF),
}
length = length >> 7
if length > 0 {
b = append(b, byte(length&0xFF))
}
b = append(b, []byte(name)...)
return base64.StdEncoding.EncodeToString(b)
}
// Returns true if marker was returned by GCS, i.e prefixed with
// ##minio by minio gcs gateway.
func isGCSMarker(marker string) bool {
return strings.HasPrefix(marker, gcsTokenPrefix)
}
// ListObjects - lists all blobs in GCS bucket filtered by prefix
func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
isTruncated := false
nextMarker := ""
prefixes := []string{}
// To accommodate S3-compatible applications using
// ListObjectsV1 to use object keys as markers to control the
// listing of objects, we use the following encoding scheme to
// distinguish between GCS continuation tokens and application
// supplied markers.
//
// - NextMarker in ListObjectsV1 response is constructed by
// prefixing "##minio" to the GCS continuation token,
// e.g, "##minioCgRvYmoz"
//
// - Application supplied markers are used as-is to list
// object keys that appear after it in the lexicographical order.
// If application is using GCS continuation token we should
// strip the gcsTokenPrefix we added.
gcsMarker := isGCSMarker(marker)
if gcsMarker {
it.PageInfo().Token = strings.TrimPrefix(marker, gcsTokenPrefix)
}
it.PageInfo().MaxSize = maxKeys
objects := []ObjectInfo{}
for {
if len(objects) >= maxKeys {
// check if there is one next object and
// if that one next object is our hidden
// metadata folder, then just break
// otherwise we've truncated the output
attrs, _ := it.Next()
if attrs == nil {
} else if isGCSPrefix(attrs.Prefix) {
break
}
isTruncated = true
break
}
attrs, err := it.Next()
if err == iterator.Done {
break
} else if err != nil {
return ListObjectsInfo{}, gcsToObjectError(traceError(err), bucket, prefix)
}
nextMarker = toGCSPageToken(attrs.Name)
if isGCSPrefix(attrs.Prefix) {
// we don't return our metadata prefix
continue
} else if attrs.Prefix != "" {
prefixes = append(prefixes, attrs.Prefix)
continue
} else if !gcsMarker && attrs.Name <= marker {
// if user supplied a marker don't append
// objects until we reach marker (and skip it).
continue
}
objects = append(objects, ObjectInfo{
Name: attrs.Name,
Bucket: attrs.Bucket,
ModTime: attrs.Updated,
Size: attrs.Size,
ETag: fmt.Sprintf("%d", attrs.CRC32C),
UserDefined: attrs.Metadata,
ContentType: attrs.ContentType,
ContentEncoding: attrs.ContentEncoding,
})
}
return ListObjectsInfo{
IsTruncated: isTruncated,
NextMarker: gcsTokenPrefix + nextMarker,
Prefixes: prefixes,
Objects: objects,
}, nil
}
// ListObjectsV2 - lists all blobs in GCS bucket filtered by prefix
func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (ListObjectsV2Info, error) {
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
isTruncated := false
nextMarker := ""
prefixes := []string{}
objects := []ObjectInfo{}
for {
if maxKeys < len(objects) {
isTruncated = true
nextMarker = it.PageInfo().Token
break
}
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return ListObjectsV2Info{}, gcsToObjectError(traceError(err), bucket, prefix)
}
if attrs.Prefix != "" {
prefixes = append(prefixes, attrs.Prefix)
continue
}
objects = append(objects, fromGCSAttrsToObjectInfo(attrs))
}
return ListObjectsV2Info{
IsTruncated: isTruncated,
ContinuationToken: continuationToken,
NextContinuationToken: nextMarker,
Prefixes: prefixes,
Objects: objects,
}, nil
}
// GetObject - reads an object from GCS. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (l *gcsGateway) GetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error {
// if we want to mimic S3 behavior exactly, we need to verify if bucket exists first,
// otherwise gcs will just return object not exist in case of non-existing bucket
if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
return gcsToObjectError(traceError(err), bucket)
}
object := l.client.Bucket(bucket).Object(key)
r, err := object.NewRangeReader(l.ctx, startOffset, length)
if err != nil {
return gcsToObjectError(traceError(err), bucket, key)
}
defer r.Close()
if _, err := io.Copy(writer, r); err != nil {
return gcsToObjectError(traceError(err), bucket, key)
}
return nil
}
// fromGCSAttrsToObjectInfo converts GCS BucketAttrs to gateway ObjectInfo
func fromGCSAttrsToObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo {
// All google cloud storage objects have a CRC32c hash, whereas composite objects may not have a MD5 hash
// Refer https://cloud.google.com/storage/docs/hashes-etags. Use CRC32C for ETag
return ObjectInfo{
Name: attrs.Name,
Bucket: attrs.Bucket,
ModTime: attrs.Updated,
Size: attrs.Size,
ETag: fmt.Sprintf("%d", attrs.CRC32C),
UserDefined: attrs.Metadata,
ContentType: attrs.ContentType,
ContentEncoding: attrs.ContentEncoding,
}
}
// GetObjectInfo - reads object info and replies back ObjectInfo
func (l *gcsGateway) GetObjectInfo(bucket string, object string) (ObjectInfo, error) {
// if we want to mimic S3 behavior exactly, we need to verify if bucket exists first,
// otherwise gcs will just return object not exist in case of non-existing bucket
if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket)
}
attrs, err := l.client.Bucket(bucket).Object(object).Attrs(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, object)
}
objInfo := fromGCSAttrsToObjectInfo(attrs)
objInfo.ETag = fmt.Sprintf("%d", attrs.CRC32C)
return objInfo, nil
}
// PutObject - Create a new object with the incoming data,
func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) {
// if we want to mimic S3 behavior exactly, we need to verify if bucket exists first,
// otherwise gcs will just return object not exist in case of non-existing bucket
if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket)
}
reader := data
var sha256Writer hash.Hash
if sha256sum != "" {
sha256Writer = sha256.New()
reader = io.TeeReader(data, sha256Writer)
}
md5sum := metadata["etag"]
delete(metadata, "etag")
object := l.client.Bucket(bucket).Object(key)
w := object.NewWriter(l.ctx)
w.ContentType = metadata["content-type"]
w.ContentEncoding = metadata["content-encoding"]
if md5sum != "" {
var err error
w.MD5, err = hex.DecodeString(md5sum)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
}
w.Metadata = metadata
_, err := io.Copy(w, reader)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
err = w.Close()
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
attrs, err := object.Attrs(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
if sha256sum != "" {
if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum {
object.Delete(l.ctx)
return ObjectInfo{}, traceError(SHA256Mismatch{})
}
}
return fromGCSAttrsToObjectInfo(attrs), nil
}
// CopyObject - Copies a blob from source container to destination container.
func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string, metadata map[string]string) (ObjectInfo, error) {
src := l.client.Bucket(srcBucket).Object(srcObject)
dst := l.client.Bucket(destBucket).Object(destObject)
attrs, err := dst.CopierFrom(src).Run(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), destBucket, destObject)
}
return fromGCSAttrsToObjectInfo(attrs), nil
}
// DeleteObject - Deletes a blob in bucket
func (l *gcsGateway) DeleteObject(bucket string, object string) error {
err := l.client.Bucket(bucket).Object(object).Delete(l.ctx)
if err != nil {
return gcsToObjectError(traceError(err), bucket, object)
}
return nil
}
// NewMultipartUpload - upload object in multiple parts
func (l *gcsGateway) NewMultipartUpload(bucket string, key string, metadata map[string]string) (uploadID string, err error) {
// generate new uploadid
uploadID = mustGetUUID()
// generate name for part zero
meta := gcsMultipartMetaName(uploadID)
w := l.client.Bucket(bucket).Object(meta).NewWriter(l.ctx)
w.ContentType = metadata["content-type"]
w.ContentEncoding = metadata["content-encoding"]
w.Metadata = metadata
content, err := json.Marshal(gcsMultipartMetaV1{gcsMultipartMetaCurrentVersion, bucket, key})
if err != nil {
return "", gcsToObjectError(traceError(err), bucket, key)
}
if _, err = w.Write(content); err != nil {
return "", gcsToObjectError(traceError(err), bucket, key)
}
if err = w.Close(); err != nil {
return "", gcsToObjectError(traceError(err), bucket, key)
}
return uploadID, nil
}
// ListMultipartUploads - lists all multipart uploads.
func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
return ListMultipartsInfo{
KeyMarker: keyMarker,
UploadIDMarker: uploadIDMarker,
MaxUploads: maxUploads,
Prefix: prefix,
Delimiter: delimiter,
}, nil
}
// CopyObjectPart - copy part of object to other bucket and object
func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBucket string, destObject string, uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) {
return PartInfo{}, traceError(NotSupported{})
}
// PutObjectPart puts a part of object in bucket
func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) {
meta := gcsMultipartMetaName(uploadID)
object := l.client.Bucket(bucket).Object(meta)
_, err := object.Attrs(l.ctx)
if err != nil {
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
var sha256Writer hash.Hash
// Generate random ETag.
etag := getMD5Hash([]byte(mustGetUUID()))
reader := data
if sha256sum != "" {
sha256Writer = sha256.New()
reader = io.TeeReader(data, sha256Writer)
}
dataName := gcsMultipartDataName(uploadID, etag)
object = l.client.Bucket(bucket).Object(dataName)
w := object.NewWriter(l.ctx)
// Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case
// where it tries to upload 0 bytes in the last chunk and get error from server.
w.ChunkSize = 0
if md5Hex != "" {
w.MD5, err = hex.DecodeString(md5Hex)
if err != nil {
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
}
_, err = io.Copy(w, reader)
if err != nil {
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
err = w.Close()
if err != nil {
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
if sha256sum != "" {
if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum {
object.Delete(l.ctx)
return PartInfo{}, traceError(SHA256Mismatch{})
}
}
return PartInfo{
PartNumber: partID,
ETag: etag,
LastModified: time.Now().UTC(),
Size: size,
}, nil
}
// ListObjectParts returns all object parts for specified object in specified bucket
func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) {
meta := gcsMultipartMetaName(uploadID)
object := l.client.Bucket(bucket).Object(meta)
_, err := object.Attrs(l.ctx)
if err != nil {
return ListPartsInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
return ListPartsInfo{}, nil
}
// Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up.
func (l *gcsGateway) cleanupMultipartUpload(bucket, key, uploadID string) error {
meta := gcsMultipartMetaName(uploadID)
object := l.client.Bucket(bucket).Object(meta)
_, err := object.Attrs(l.ctx)
if err != nil {
return gcsToObjectError(traceError(err), bucket, key)
}
prefix := fmt.Sprintf("%s/%s/", gcsMinioMultipartPathV1, uploadID)
// iterate through all parts and delete them
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: prefix, Versions: false})
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return gcsToObjectError(traceError(err), bucket, key)
}
object := l.client.Bucket(bucket).Object(attrs.Name)
// Ignore the error as parallel AbortMultipartUpload might have deleted it.
object.Delete(l.ctx)
}
return nil
}
// AbortMultipartUpload aborts a ongoing multipart upload
func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error {
return l.cleanupMultipartUpload(bucket, key, uploadID)
}
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
// Note that there is a limit (currently 32) to the number of components that can be composed in a single operation.
// There is a limit (currently 1024) to the total number of components for a given composite object. This means you can append to each object at most 1023 times.
// There is a per-project rate limit (currently 200) to the number of components you can compose per second. This rate counts both the components being appended to a composite object as well as the components being copied when the composite object of which they are a part is copied.
func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID string, uploadedParts []completePart) (ObjectInfo, error) {
meta := gcsMultipartMetaName(uploadID)
object := l.client.Bucket(bucket).Object(meta)
partZeroAttrs, err := object.Attrs(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
r, err := object.NewReader(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
// Check version compatibility of the meta file before compose()
multipartMeta := gcsMultipartMetaV1{}
decoder := json.NewDecoder(r)
err = decoder.Decode(&multipartMeta)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
if multipartMeta.Version != gcsMultipartMetaCurrentVersion {
return ObjectInfo{}, gcsToObjectError(traceError(errFormatNotSupported), bucket, key)
}
parts := make([]*storage.ObjectHandle, len(uploadedParts))
for i, uploadedPart := range uploadedParts {
parts[i] = l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.ETag))
}
if len(parts) > 32 {
// we need to split up the compose of more than 32 parts
// into subcomposes. This means that the first 32 parts will
// compose to a composed-object-0, next parts to composed-object-1,
// the final compose will compose composed-object* to 1.
return ObjectInfo{}, traceError(NotSupported{})
}
dst := l.client.Bucket(bucket).Object(key)
composer := dst.ComposerFrom(parts...)
composer.ContentType = partZeroAttrs.ContentType
composer.Metadata = partZeroAttrs.Metadata
attrs, err := composer.Run(l.ctx)
if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
if err = l.cleanupMultipartUpload(bucket, key, uploadID); err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
}
return fromGCSAttrsToObjectInfo(attrs), nil
}
// SetBucketPolicies - Set policy on bucket
func (l *gcsGateway) SetBucketPolicies(bucket string, policyInfo policy.BucketAccessPolicy) error {
var policies []BucketAccessPolicy
for prefix, policy := range policy.GetPolicies(policyInfo.Statements, bucket) {
policies = append(policies, BucketAccessPolicy{
Prefix: prefix,
Policy: policy,
})
}
prefix := bucket + "/*" // For all objects inside the bucket.
if len(policies) != 1 {
return traceError(NotImplemented{})
} else if policies[0].Prefix != prefix {
return traceError(NotImplemented{})
}
acl := l.client.Bucket(bucket).ACL()
if policies[0].Policy == policy.BucketPolicyNone {
if err := acl.Delete(l.ctx, storage.AllUsers); err != nil {
return gcsToObjectError(traceError(err), bucket)
}
return nil
}
var role storage.ACLRole
switch policies[0].Policy {
case policy.BucketPolicyReadOnly:
role = storage.RoleReader
case policy.BucketPolicyWriteOnly:
role = storage.RoleWriter
default:
return traceError(NotImplemented{})
}
if err := acl.Set(l.ctx, storage.AllUsers, role); err != nil {
return gcsToObjectError(traceError(err), bucket)
}
return nil
}
// GetBucketPolicies - Get policy on bucket
func (l *gcsGateway) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy, error) {
acl := l.client.Bucket(bucket).ACL()
rules, err := acl.List(l.ctx)
if err != nil {
return policy.BucketAccessPolicy{}, gcsToObjectError(traceError(err), bucket)
}
policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"}
for _, r := range rules {
if r.Entity != storage.AllUsers || r.Role == storage.RoleOwner {
continue
}
switch r.Role {
case storage.RoleReader:
policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, policy.BucketPolicyReadOnly, bucket, "")
case storage.RoleWriter:
policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, policy.BucketPolicyWriteOnly, bucket, "")
}
}
return policyInfo, nil
}
// DeleteBucketPolicies - Delete all policies on bucket
func (l *gcsGateway) DeleteBucketPolicies(bucket string) error {
acl := l.client.Bucket(bucket).ACL()
// This only removes the storage.AllUsers policies
if err := acl.Delete(l.ctx, storage.AllUsers); err != nil {
return gcsToObjectError(traceError(err), bucket)
}
return nil
}

183
cmd/gateway-gcs_test.go Normal file
View File

@@ -0,0 +1,183 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "testing"
func TestToGCSPageToken(t *testing.T) {
testCases := []struct {
Name string
Token string
}{
{
Name: "A",
Token: "CgFB",
},
{
Name: "AAAAAAAAAA",
Token: "CgpBQUFBQUFBQUFB",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CmRBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFB",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CpEDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE=",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CpIDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFB",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CpMDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQ==",
},
{
Name: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
Token: "CvQDQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE=",
},
}
for i, testCase := range testCases {
if toGCSPageToken(testCase.Name) != testCase.Token {
t.Errorf("Test %d: Expected %s, got %s", i+1, toGCSPageToken(testCase.Name), testCase.Token)
}
}
}
// TestValidGCSProjectID tests the behavior of isValidGCSProjectID
func TestValidGCSProjectID(t *testing.T) {
testCases := []struct {
ProjectID string
Valid bool
}{
{"", false},
{"a", false},
{"Abc", false},
{"1bcd", false},
// 5 chars
{"abcdb", false},
// 6 chars
{"abcdbz", true},
// 30 chars
{"project-id-1-project-id-more-1", true},
// 31 chars
{"project-id-1-project-id-more-11", false},
{"storage.googleapis.com", false},
{"http://storage.googleapis.com", false},
{"http://localhost:9000", false},
{"project-id-1", true},
{"project-id-1988832", true},
{"projectid1414", true},
}
for i, testCase := range testCases {
if isValidGCSProjectID(testCase.ProjectID) != testCase.Valid {
t.Errorf("Test %d: Expected %v, got %v", i+1, isValidGCSProjectID(testCase.ProjectID), testCase.Valid)
}
}
}
// Test for isGCSPrefix
func TestIsGCSPrefix(t *testing.T) {
testCases := []struct {
prefix string
expectedRes bool
}{
// Regular prefix without a trailing slash
{
prefix: "hello",
expectedRes: false,
},
// Regular prefix with a trailing slash
{
prefix: "hello/",
expectedRes: false,
},
// GCS prefix without a trailing slash
{
prefix: gcsMinioPath,
expectedRes: true,
},
// GCS prefix with a trailing slash
{
prefix: gcsMinioPath + "/",
expectedRes: true,
},
}
for i, tc := range testCases {
if actualRes := isGCSPrefix(tc.prefix); actualRes != tc.expectedRes {
t.Errorf("%d: Expected isGCSPrefix to return %v but got %v", i, tc.expectedRes, actualRes)
}
}
}
// Test for isGCSMarker.
func TestIsGCSMarker(t *testing.T) {
testCases := []struct {
marker string
expected bool
}{
{
marker: "##miniogcs123",
expected: true,
},
{
marker: "##mini_notgcs123",
expected: false,
},
{
marker: "#minioagainnotgcs123",
expected: false,
},
{
marker: "obj1",
expected: false,
},
}
for i, tc := range testCases {
if actual := isGCSMarker(tc.marker); actual != tc.expected {
t.Errorf("Test %d: marker is %s, expected %v but got %v",
i+1, tc.marker, tc.expected, actual)
}
}
}
// Test for gcsMultipartMetaName.
func TestGCSMultipartMetaName(t *testing.T) {
uploadID := "a"
expected := pathJoin(gcsMinioMultipartPathV1, uploadID, gcsMinioMultipartMeta)
got := gcsMultipartMetaName(uploadID)
if expected != got {
t.Errorf("expected: %s, got: %s", expected, got)
}
}
// Test for gcsMultipartDataName.
func TestGCSMultipartDataName(t *testing.T) {
uploadID := "a"
etag := "b"
expected := pathJoin(gcsMinioMultipartPathV1, uploadID, etag)
got := gcsMultipartDataName(uploadID, etag)
if expected != got {
t.Errorf("expected: %s, got: %s", expected, got)
}
}

View File

@@ -19,12 +19,12 @@ package cmd
import (
"io"
"io/ioutil"
"net"
"net/http"
"strconv"
"encoding/hex"
"encoding/json"
"encoding/xml"
router "github.com/gorilla/mux"
"github.com/minio/minio-go/pkg/policy"
@@ -140,7 +140,7 @@ func (api gatewayAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Re
}
// Reads the object at startOffset and writes to mw.
if err := getObject(bucket, object, startOffset, length, writer); err != nil {
if err = getObject(bucket, object, startOffset, length, writer); err != nil {
errorIf(err, "Unable to write to client.")
if !dataWritten {
// Error response only if no data has been written to client yet. i.e if
@@ -157,6 +157,23 @@ func (api gatewayAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Re
// call wrter.Write(nil) to set appropriate headers.
writer.Write(nil)
}
// Get host and port from Request.RemoteAddr.
host, port, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
host, port = "", ""
}
// Notify object accessed via a GET request.
eventNotify(eventData{
Type: ObjectAccessedGet,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: host,
Port: port,
})
}
// PutObjectHandler - PUT Object
@@ -180,6 +197,8 @@ func (api gatewayAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Re
bucket = vars["bucket"]
object = vars["object"]
// TODO: we should validate the object name here
// Get Content-Md5 sent by client and verify if valid
md5Bytes, err := checkValidMD5(r.Header.Get("Content-Md5"))
if err != nil {
@@ -236,8 +255,6 @@ func (api gatewayAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Re
// Make sure we hex encode md5sum here.
metadata["etag"] = hex.EncodeToString(md5Bytes)
sha256sum := ""
// Lock the object.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.Lock()
@@ -247,7 +264,7 @@ func (api gatewayAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Re
switch reqAuthType {
case authTypeAnonymous:
// Create anonymous object.
objInfo, err = objectAPI.AnonPutObject(bucket, object, size, r.Body, metadata, sha256sum)
objInfo, err = objectAPI.AnonPutObject(bucket, object, size, r.Body, metadata, "")
case authTypeStreamingSigned:
// Initialize stream signature verifier.
reader, s3Error := newSignV4ChunkedReader(r)
@@ -256,7 +273,7 @@ func (api gatewayAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Re
writeErrorResponse(w, s3Error, r.URL)
return
}
objInfo, err = objectAPI.PutObject(bucket, object, size, reader, metadata, sha256sum)
objInfo, err = objectAPI.PutObject(bucket, object, size, reader, metadata, "")
case authTypeSignedV2, authTypePresignedV2:
s3Error := isReqAuthenticatedV2(r)
if s3Error != ErrNone {
@@ -264,16 +281,19 @@ func (api gatewayAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Re
writeErrorResponse(w, s3Error, r.URL)
return
}
objInfo, err = objectAPI.PutObject(bucket, object, size, r.Body, metadata, sha256sum)
objInfo, err = objectAPI.PutObject(bucket, object, size, r.Body, metadata, "")
case authTypePresigned, authTypeSigned:
if s3Error := reqSignatureV4Verify(r, serverConfig.GetRegion()); s3Error != ErrNone {
errorIf(errSignatureMismatch, "%s", dumpRequest(r))
writeErrorResponse(w, s3Error, r.URL)
return
}
sha256sum := ""
if !skipContentSha256Cksum(r) {
sha256sum = r.Header.Get("X-Amz-Content-Sha256")
sha256sum = getContentSha256Cksum(r)
}
// Create object.
objInfo, err = objectAPI.PutObject(bucket, object, size, r.Body, metadata, sha256sum)
default:
@@ -283,12 +303,30 @@ func (api gatewayAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Re
}
if err != nil {
errorIf(err, "Unable to save an object %s", r.URL.Path)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
w.Header().Set("ETag", "\""+objInfo.ETag+"\"")
writeSuccessResponseHeadersOnly(w)
// Get host and port from Request.RemoteAddr.
host, port, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
host, port = "", ""
}
// Notify object created event.
eventNotify(eventData{
Type: ObjectCreatedPut,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: host,
Port: port,
})
}
// HeadObjectHandler - HEAD Object
@@ -357,97 +395,23 @@ func (api gatewayAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.R
// Successful response.
w.WriteHeader(http.StatusOK)
}
// DeleteMultipleObjectsHandler - deletes multiple objects.
func (api gatewayAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
vars := router.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
// Get host and port from Request.RemoteAddr.
host, port, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
host, port = "", ""
}
if s3Error := checkRequestAuthType(r, bucket, "s3:DeleteObject", serverConfig.GetRegion()); s3Error != ErrNone {
writeErrorResponse(w, s3Error, r.URL)
return
}
// Content-Length is required and should be non-zero
// http://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
if r.ContentLength <= 0 {
writeErrorResponse(w, ErrMissingContentLength, r.URL)
return
}
// Content-Md5 is requied should be set
// http://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
if _, ok := r.Header["Content-Md5"]; !ok {
writeErrorResponse(w, ErrMissingContentMD5, r.URL)
return
}
// Allocate incoming content length bytes.
deleteXMLBytes := make([]byte, r.ContentLength)
// Read incoming body XML bytes.
if _, err := io.ReadFull(r.Body, deleteXMLBytes); err != nil {
errorIf(err, "Unable to read HTTP body.")
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
// Unmarshal list of keys to be deleted.
deleteObjects := &DeleteObjectsRequest{}
if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil {
errorIf(err, "Unable to unmarshal delete objects request XML.")
writeErrorResponse(w, ErrMalformedXML, r.URL)
return
}
var dErrs = make([]error, len(deleteObjects.Objects))
// Delete all requested objects in parallel.
for index, object := range deleteObjects.Objects {
dErr := objectAPI.DeleteObject(bucket, object.ObjectName)
if dErr != nil {
dErrs[index] = dErr
}
}
// Collect deleted objects and errors if any.
var deletedObjects []ObjectIdentifier
var deleteErrors []DeleteError
for index, err := range dErrs {
object := deleteObjects.Objects[index]
// Success deleted objects are collected separately.
if err == nil {
deletedObjects = append(deletedObjects, object)
continue
}
if _, ok := errorCause(err).(ObjectNotFound); ok {
// If the object is not found it should be
// accounted as deleted as per S3 spec.
deletedObjects = append(deletedObjects, object)
continue
}
errorIf(err, "Unable to delete object. %s", object.ObjectName)
// Error during delete should be collected separately.
deleteErrors = append(deleteErrors, DeleteError{
Code: errorCodeResponse[toAPIErrorCode(err)].Code,
Message: errorCodeResponse[toAPIErrorCode(err)].Description,
Key: object.ObjectName,
})
}
// Generate response
response := generateMultiDeleteResponse(deleteObjects.Quiet, deletedObjects, deleteErrors)
encodedSuccessResponse := encodeResponse(response)
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)
// Notify object accessed via a HEAD request.
eventNotify(eventData{
Type: ObjectAccessedHead,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: host,
Port: port,
})
}
// PutBucketPolicyHandler - PUT Bucket policy
@@ -649,15 +613,6 @@ func (api gatewayAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Re
return
}
// validating region here, because isValidLocationConstraint
// reads body which has been read already. So only validating
// region here.
serverRegion := serverConfig.GetRegion()
if serverRegion != location {
writeErrorResponse(w, ErrInvalidRegion, r.URL)
return
}
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
defer bucketLock.Unlock()
@@ -746,15 +701,11 @@ func (api gatewayAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *htt
return
}
// Extract all the litsObjectsV1 query params to their native values.
// Extract all the listObjectsV1 query params to their native
// values. N B We delegate validation of params to respective
// gateway backends.
prefix, marker, delimiter, maxKeys, _ := getListObjectsV1Args(r.URL.Query())
// Validate all the query params before beginning to serve the request.
if s3Error := validateListObjectsArgs(prefix, marker, delimiter, maxKeys); s3Error != ErrNone {
writeErrorResponse(w, s3Error, r.URL)
return
}
listObjects := objectAPI.ListObjects
if reqAuthType == authTypeAnonymous {
listObjects = objectAPI.AnonListObjects

View File

@@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"net/url"
"os"
"runtime"
"strings"
@@ -53,26 +52,13 @@ EXAMPLES:
$ export MINIO_ACCESS_KEY=azureaccountname
$ export MINIO_SECRET_KEY=azureaccountkey
$ {{.HelpName}}
2. Start minio gateway server for Azure Blob Storage backend on custom endpoint.
$ export MINIO_ACCESS_KEY=azureaccountname
$ export MINIO_SECRET_KEY=azureaccountkey
$ {{.HelpName}} https://azure.example.com
`
var azureBackendCmd = cli.Command{
Name: "azure",
Usage: "Microsoft Azure Blob Storage.",
Action: azureGatewayMain,
CustomHelpTemplate: azureGatewayTemplate,
Flags: append(serverFlags,
cli.BoolFlag{
Name: "quiet",
Usage: "Disable startup banner.",
},
),
HideHelpCommand: true,
}
const s3GatewayTemplate = `NAME:
{{.HelpName}} - {{.Usage}}
@@ -105,26 +91,69 @@ EXAMPLES:
$ {{.HelpName}} https://play.minio.io:9000
`
var s3BackendCmd = cli.Command{
Name: "s3",
Usage: "Amazon Simple Storage Service (S3).",
Action: s3GatewayMain,
CustomHelpTemplate: s3GatewayTemplate,
Flags: append(serverFlags,
cli.BoolFlag{
Name: "quiet",
Usage: "Disable startup banner.",
},
),
HideHelpCommand: true,
}
const gcsGatewayTemplate = `NAME:
{{.HelpName}} - {{.Usage}}
var gatewayCmd = cli.Command{
Name: "gateway",
Usage: "Start object storage gateway.",
HideHelpCommand: true,
Subcommands: []cli.Command{azureBackendCmd, s3BackendCmd},
}
USAGE:
{{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} PROJECTID
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
PROJECTID:
GCS project id, there are no defaults this is mandatory.
ENVIRONMENT VARIABLES:
ACCESS:
MINIO_ACCESS_KEY: Username or access key of GCS.
MINIO_SECRET_KEY: Password or secret key of GCS.
BROWSER:
MINIO_BROWSER: To disable web browser access, set this value to "off".
EXAMPLES:
1. Start minio gateway server for GCS backend.
$ export MINIO_ACCESS_KEY=accesskey
$ export MINIO_SECRET_KEY=secretkey
$ {{.HelpName}} mygcsprojectid
`
var (
azureBackendCmd = cli.Command{
Name: "azure",
Usage: "Microsoft Azure Blob Storage.",
Action: azureGatewayMain,
CustomHelpTemplate: azureGatewayTemplate,
Flags: append(serverFlags, globalFlags...),
HideHelpCommand: true,
}
s3BackendCmd = cli.Command{
Name: "s3",
Usage: "Amazon Simple Storage Service (S3).",
Action: s3GatewayMain,
CustomHelpTemplate: s3GatewayTemplate,
Flags: append(serverFlags, globalFlags...),
HideHelpCommand: true,
}
gcsBackendCmd = cli.Command{
Name: "gcs",
Usage: "Google Cloud Storage.",
Action: gcsGatewayMain,
CustomHelpTemplate: gcsGatewayTemplate,
Flags: append(serverFlags, globalFlags...),
HideHelpCommand: true,
}
gatewayCmd = cli.Command{
Name: "gateway",
Usage: "Start object storage gateway.",
Flags: append(serverFlags, globalFlags...),
HideHelpCommand: true,
Subcommands: []cli.Command{azureBackendCmd, s3BackendCmd, gcsBackendCmd},
}
)
// Represents the type of the gateway backend.
type gatewayBackend string
@@ -132,79 +161,30 @@ type gatewayBackend string
const (
azureBackend gatewayBackend = "azure"
s3Backend gatewayBackend = "s3"
gcsBackend gatewayBackend = "gcs"
// Add more backends here.
)
// Returns access and secretkey set from environment variables.
func mustGetGatewayCredsFromEnv() (accessKey, secretKey string) {
// Fetch access keys from environment variables.
accessKey = os.Getenv("MINIO_ACCESS_KEY")
secretKey = os.Getenv("MINIO_SECRET_KEY")
if accessKey == "" || secretKey == "" {
fatalIf(errors.New("Missing credentials"), "Access and secret keys are mandatory to run Minio gateway server.")
}
return accessKey, secretKey
}
// Set browser setting from environment variables
func mustSetBrowserSettingFromEnv() {
if browser := os.Getenv("MINIO_BROWSER"); browser != "" {
browserFlag, err := ParseBrowserFlag(browser)
if err != nil {
fatalIf(errors.New("invalid value"), "Unknown value %s in MINIO_BROWSER environment variable.", browser)
}
// browser Envs are set globally, this does not represent
// if browser is turned off or on.
globalIsEnvBrowser = true
globalIsBrowserEnabled = bool(browserFlag)
}
}
// Initialize gateway layer depending on the backend type.
// Supported backend types are
//
// - Azure Blob Storage.
// - AWS S3.
// - Google Cloud Storage.
// - Add your favorite backend here.
func newGatewayLayer(backendType gatewayBackend, endpoint, accessKey, secretKey string, secure bool) (GatewayLayer, error) {
switch gatewayBackend(backendType) {
func newGatewayLayer(backendType gatewayBackend, arg string) (GatewayLayer, error) {
switch backendType {
case azureBackend:
return newAzureLayer(endpoint, accessKey, secretKey, secure)
return newAzureLayer(arg)
case s3Backend:
return newS3Gateway(endpoint, accessKey, secretKey, secure)
return newS3Gateway(arg)
case gcsBackend:
return newGCSGateway(arg)
}
return nil, fmt.Errorf("Unrecognized backend type %s", backendType)
}
// Initialize a new gateway config.
//
// DO NOT save this config, this is meant to be
// only used in memory.
func newGatewayConfig(accessKey, secretKey, region string) error {
// Initialize server config.
srvCfg := newServerConfigV19()
// If env is set for a fresh start, save them to config file.
srvCfg.SetCredential(credential{
AccessKey: accessKey,
SecretKey: secretKey,
})
// Set custom region.
srvCfg.SetRegion(region)
// hold the mutex lock before a new config is assigned.
// Save the new config globally.
// unlock the mutex.
serverConfigMu.Lock()
serverConfig = srvCfg
serverConfigMu.Unlock()
return nil
}
// Return endpoint.
func parseGatewayEndpoint(arg string) (endPoint string, secure bool, err error) {
schemeSpecified := len(strings.Split(arg, "://")) > 1
@@ -264,6 +244,9 @@ func azureGatewayMain(ctx *cli.Context) {
cli.ShowCommandHelpAndExit(ctx, "azure", 1)
}
// Validate gateway arguments.
fatalIf(validateGatewayArguments(ctx.String("address"), ctx.Args().First()), "Invalid argument")
gatewayMain(ctx, azureBackend)
}
@@ -273,54 +256,72 @@ func s3GatewayMain(ctx *cli.Context) {
cli.ShowCommandHelpAndExit(ctx, "s3", 1)
}
// Validate gateway arguments.
fatalIf(validateGatewayArguments(ctx.String("address"), ctx.Args().First()), "Invalid argument")
gatewayMain(ctx, s3Backend)
}
// Handler for 'minio gateway gcs' command line
func gcsGatewayMain(ctx *cli.Context) {
if ctx.Args().Present() && ctx.Args().First() == "help" {
cli.ShowCommandHelpAndExit(ctx, "gcs", 1)
}
if !isValidGCSProjectID(ctx.Args().First()) {
errorIf(errGCSInvalidProjectID, "Unable to start GCS gateway with %s", ctx.Args().First())
cli.ShowCommandHelpAndExit(ctx, "gcs", 1)
}
gatewayMain(ctx, gcsBackend)
}
// Handler for 'minio gateway'.
func gatewayMain(ctx *cli.Context, backendType gatewayBackend) {
// Fetch access and secret key from env.
accessKey, secretKey := mustGetGatewayCredsFromEnv()
// Fetch browser env setting
mustSetBrowserSettingFromEnv()
// Initialize new gateway config.
newGatewayConfig(accessKey, secretKey, globalMinioDefaultRegion)
// Get quiet flag from command line argument.
quietFlag := ctx.Bool("quiet") || ctx.GlobalBool("quiet")
if quietFlag {
log.EnableQuiet()
}
serverAddr := ctx.String("address")
endpointAddr := ctx.Args().Get(0)
err := validateGatewayArguments(serverAddr, endpointAddr)
fatalIf(err, "Invalid argument")
// Handle common command args.
handleCommonCmdArgs(ctx)
// Second argument is endpoint. If no endpoint is specified then the
// gateway implementation should use a default setting.
endPoint, secure, err := parseGatewayEndpoint(endpointAddr)
fatalIf(err, "Unable to parse endpoint")
// Handle common env vars.
handleCommonEnvVars()
// Create certs path for SSL configuration.
fatalIf(createConfigDir(), "Unable to create configuration directory")
// Validate if we have access, secret set through environment.
if !globalIsEnvCreds {
fatalIf(fmt.Errorf("Access and Secret keys should be set through ENVs for backend [%s]", backendType), "")
}
newObject, err := newGatewayLayer(backendType, endPoint, accessKey, secretKey, secure)
fatalIf(err, "Unable to initialize gateway layer")
// Create certs path.
fatalIf(createConfigDir(), "Unable to create configuration directories.")
// Initialize gateway config.
initConfig()
// Enable loggers as per configuration file.
enableLoggers()
// Init the error tracing module.
initError()
// Check and load SSL certificates.
var err error
globalPublicCerts, globalRootCAs, globalIsSSL, err = getSSLConfig()
fatalIf(err, "Invalid SSL key file")
initNSLock(false) // Enable local namespace lock.
router := mux.NewRouter().SkipClean(true)
newObject, err := newGatewayLayer(backendType, ctx.Args().First())
fatalIf(err, "Unable to initialize gateway layer")
// credentials Envs are set globally.
globalIsEnvCreds = true
router := mux.NewRouter().SkipClean(true)
// Register web router when its enabled.
if globalIsBrowserEnabled {
aerr := registerWebRouter(router)
fatalIf(aerr, "Unable to configure web browser")
fatalIf(registerWebRouter(router), "Unable to configure web browser")
}
registerGatewayAPIRouter(router, newObject)
@@ -353,10 +354,7 @@ func gatewayMain(ctx *cli.Context, backendType gatewayBackend) {
}
apiServer := NewServerMux(serverAddr, registerHandlers(router, handlerFns...))
_, _, globalIsSSL, err = getSSLConfig()
fatalIf(err, "Invalid SSL key file")
apiServer := NewServerMux(ctx.String("address"), registerHandlers(router, handlerFns...))
// Start server, automatically configures TLS if certs are available.
go func() {
@@ -364,9 +362,7 @@ func gatewayMain(ctx *cli.Context, backendType gatewayBackend) {
if globalIsSSL {
cert, key = getPublicCertFile(), getPrivateKeyFile()
}
aerr := apiServer.ListenAndServe(cert, key)
fatalIf(aerr, "Failed to start minio server")
fatalIf(apiServer.ListenAndServe(cert, key), "Failed to start minio server")
}()
// Once endpoints are finalized, initialize the new object api.
@@ -377,14 +373,20 @@ func gatewayMain(ctx *cli.Context, backendType gatewayBackend) {
// Prints the formatted startup message once object layer is initialized.
if !quietFlag {
mode := ""
if gatewayBackend(backendType) == azureBackend {
switch gatewayBackend(backendType) {
case azureBackend:
mode = globalMinioModeGatewayAzure
} else if gatewayBackend(backendType) == s3Backend {
case gcsBackend:
mode = globalMinioModeGatewayGCS
case s3Backend:
mode = globalMinioModeGatewayS3
}
// Check update mode.
checkUpdate(mode)
apiEndpoints := getAPIEndpoints(apiServer.Addr)
printGatewayStartupMessage(apiEndpoints, accessKey, secretKey, backendType)
// Print gateway startup message.
printGatewayStartupMessage(getAPIEndpoints(apiServer.Addr), backendType)
}
<-globalServiceDoneCh

View File

@@ -17,7 +17,6 @@
package cmd
import (
"os"
"strings"
"testing"
)
@@ -53,28 +52,6 @@ func TestParseGatewayEndpoint(t *testing.T) {
}
}
func TestSetBrowserFromEnv(t *testing.T) {
browser := os.Getenv("MINIO_BROWSER")
os.Setenv("MINIO_BROWSER", "on")
mustSetBrowserSettingFromEnv()
if globalIsBrowserEnabled != true {
t.Errorf("Expected the response status to be `%t`, but instead found `%t`", globalIsBrowserEnabled, false)
}
os.Setenv("MINIO_BROWSER", "off")
mustSetBrowserSettingFromEnv()
if globalIsBrowserEnabled != false {
t.Errorf("Expected the response status to be `%t`, but instead found `%t`", globalIsBrowserEnabled, true)
}
os.Setenv("MINIO_BROWSER", "")
mustSetBrowserSettingFromEnv()
if globalIsBrowserEnabled != false {
t.Errorf("Expected the response status to be `%t`, but instead found `%t`", globalIsBrowserEnabled, true)
}
os.Setenv("MINIO_BROWSER", browser)
}
// Test validateGatewayArguments
func TestValidateGatewayArguments(t *testing.T) {
nonLoopBackIPs := localIP4.FuncMatch(func(ip string, matchString string) bool {

View File

@@ -17,11 +17,12 @@
package cmd
import (
"encoding/hex"
"io"
"net/http"
"path"
"encoding/hex"
minio "github.com/minio/minio-go"
"github.com/minio/minio-go/pkg/policy"
)
@@ -97,14 +98,30 @@ type s3Objects struct {
}
// newS3Gateway returns s3 gatewaylayer
func newS3Gateway(endpoint string, accessKey, secretKey string, secure bool) (GatewayLayer, error) {
if endpoint == "" {
endpoint = "s3.amazonaws.com"
secure = true
func newS3Gateway(host string) (GatewayLayer, error) {
var err error
var endpoint string
var secure = true
// Validate host parameters.
if host != "" {
// Override default params if the host is provided
endpoint, secure, err = parseGatewayEndpoint(host)
if err != nil {
return nil, err
}
}
// Default endpoint parameters
if endpoint == "" {
endpoint = "s3.amazonaws.com"
}
creds := serverConfig.GetCredential()
// Initialize minio client object.
client, err := minio.NewCore(endpoint, accessKey, secretKey, secure)
client, err := minio.NewCore(endpoint, creds.AccessKey, creds.SecretKey, secure)
if err != nil {
return nil, err
}

View File

@@ -18,28 +18,19 @@ package cmd
import (
"fmt"
"runtime"
"strings"
)
// Prints the formatted startup message.
func printGatewayStartupMessage(apiEndPoints []string, accessKey, secretKey string, backendType gatewayBackend) {
func printGatewayStartupMessage(apiEndPoints []string, backendType gatewayBackend) {
strippedAPIEndpoints := stripStandardPorts(apiEndPoints)
// Prints credential.
printGatewayCommonMsg(apiEndPoints, accessKey, secretKey)
printGatewayCommonMsg(strippedAPIEndpoints)
// Prints `mc` cli configuration message chooses
// first endpoint as default.
endPoint := apiEndPoints[0]
// Configure 'mc', following block prints platform specific information for minio client.
log.Println(colorBlue("\nCommand-line Access: ") + mcQuickStartGuide)
if runtime.GOOS == globalWindowsOSName {
mcMessage := fmt.Sprintf("$ mc.exe config host add my%s %s %s %s", backendType, endPoint, accessKey, secretKey)
log.Println(fmt.Sprintf(getFormatStr(len(mcMessage), 3), mcMessage))
} else {
mcMessage := fmt.Sprintf("$ mc config host add my%s %s %s %s", backendType, endPoint, accessKey, secretKey)
log.Println(fmt.Sprintf(getFormatStr(len(mcMessage), 3), mcMessage))
}
printCLIAccessMsg(strippedAPIEndpoints[0], fmt.Sprintf("my%s", backendType))
// Prints documentation message.
printObjectAPIMsg()
@@ -52,10 +43,16 @@ func printGatewayStartupMessage(apiEndPoints []string, accessKey, secretKey stri
}
// Prints common server startup message. Prints credential, region and browser access.
func printGatewayCommonMsg(apiEndpoints []string, accessKey, secretKey string) {
func printGatewayCommonMsg(apiEndpoints []string) {
// Get saved credentials.
cred := serverConfig.GetCredential()
apiEndpointStr := strings.Join(apiEndpoints, " ")
// Colorize the message and print.
log.Println(colorBlue("\nEndpoint: ") + colorBold(fmt.Sprintf(getFormatStr(len(apiEndpointStr), 1), apiEndpointStr)))
log.Println(colorBlue("AccessKey: ") + colorBold(fmt.Sprintf("%s ", accessKey)))
log.Println(colorBlue("SecretKey: ") + colorBold(fmt.Sprintf("%s ", secretKey)))
log.Println(colorBlue("AccessKey: ") + colorBold(fmt.Sprintf("%s ", cred.AccessKey)))
log.Println(colorBlue("SecretKey: ") + colorBold(fmt.Sprintf("%s ", cred.SecretKey)))
log.Println(colorBlue("\nBrowser Access:"))
log.Println(fmt.Sprintf(getFormatStr(len(apiEndpointStr), 3), apiEndpointStr))
}

View File

@@ -20,12 +20,24 @@ import "testing"
// Test printing Gateway common message.
func TestPrintGatewayCommonMessage(t *testing.T) {
apiEndpoints := []string{"127.0.0.1:9000"}
printGatewayCommonMsg(apiEndpoints, "abcd1", "abcd123")
root, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
t.Fatal(err)
}
defer removeAll(root)
apiEndpoints := []string{"http://127.0.0.1:9000"}
printGatewayCommonMsg(apiEndpoints)
}
// Test print gateway startup message.
func TestPrintGatewayStartupMessage(t *testing.T) {
apiEndpoints := []string{"127.0.0.1:9000"}
printGatewayStartupMessage(apiEndpoints, "abcd1", "abcd123", "azure")
root, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
t.Fatal(err)
}
defer removeAll(root)
apiEndpoints := []string{"http://127.0.0.1:9000"}
printGatewayStartupMessage(apiEndpoints, "azure")
}

View File

@@ -49,6 +49,7 @@ const (
globalMinioModeDistXL = "mode-server-distributed-xl"
globalMinioModeGatewayAzure = "mode-gateway-azure"
globalMinioModeGatewayS3 = "mode-gateway-s3"
globalMinioModeGatewayGCS = "mode-gateway-gcs"
// Add new global values here.
)

View File

@@ -110,6 +110,13 @@ func (e SHA256Mismatch) Error() string {
return "sha256 computed does not match with what is expected"
}
// SignatureDoesNotMatch - when content md5 does not match with what was sent from client.
type SignatureDoesNotMatch struct{}
func (e SignatureDoesNotMatch) Error() string {
return "The request signature we calculated does not match the signature you provided. Check your key and signing method."
}
// StorageFull storage ran out of space.
type StorageFull struct{}
@@ -144,6 +151,13 @@ func (e BucketNotFound) Error() string {
return "Bucket not found: " + e.Bucket
}
// BucketAlreadyExists the requested bucket name is not available.
type BucketAlreadyExists GenericError
func (e BucketAlreadyExists) Error() string {
return "The requested bucket name is not available. The bucket namespace is shared by all users of the system. Please select a different name and try again."
}
// BucketAlreadyOwnedByYou already owned by you.
type BucketAlreadyOwnedByYou GenericError
@@ -250,6 +264,14 @@ func (e ObjectNameInvalid) Error() string {
return "Object name invalid: " + e.Bucket + "#" + e.Object
}
// AllAccessDisabled All access to this object has been disabled
type AllAccessDisabled GenericError
// Return string an error formatted as the given text.
func (e AllAccessDisabled) Error() string {
return "All access to this object has been disabled"
}
// IncompleteBody You did not provide the number of bytes specified by the Content-Length HTTP header.
type IncompleteBody GenericError

View File

@@ -17,12 +17,8 @@
package cmd
import (
"errors"
"os"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/minio/cli"
"github.com/minio/dsync"
@@ -81,61 +77,9 @@ EXAMPLES:
`,
}
// Check for updates and print a notification message
func checkUpdate(mode string) {
// Its OK to ignore any errors during getUpdateInfo() here.
if older, downloadURL, err := getUpdateInfo(1*time.Second, mode); err == nil {
if updateMsg := computeUpdateMessage(downloadURL, older); updateMsg != "" {
log.Println(updateMsg)
}
}
}
func enableLoggers() {
fileLogTarget := serverConfig.Logger.GetFile()
if fileLogTarget.Enable {
err := InitFileLogger(&fileLogTarget)
fatalIf(err, "Unable to initialize file logger")
log.AddTarget(fileLogTarget)
}
consoleLogTarget := serverConfig.Logger.GetConsole()
if consoleLogTarget.Enable {
InitConsoleLogger(&consoleLogTarget)
}
log.SetConsoleTarget(consoleLogTarget)
}
func initConfig() {
// Config file does not exist, we create it fresh and return upon success.
if isFile(getConfigFile()) {
fatalIf(migrateConfig(), "Config migration failed.")
fatalIf(loadConfig(), "Unable to load config version: '%s'.", v19)
} else {
fatalIf(newConfig(), "Unable to initialize minio config for the first time.")
log.Println("Created minio configuration file successfully at " + getConfigDir())
}
}
func serverHandleCmdArgs(ctx *cli.Context) {
// Set configuration directory.
{
// Get configuration directory from command line argument.
configDir := ctx.String("config-dir")
if !ctx.IsSet("config-dir") && ctx.GlobalIsSet("config-dir") {
configDir = ctx.GlobalString("config-dir")
}
if configDir == "" {
fatalIf(errors.New("empty directory"), "Configuration directory cannot be empty.")
}
// Disallow relative paths, figure out absolute paths.
configDirAbs, err := filepath.Abs(configDir)
fatalIf(err, "Unable to fetch absolute path for config directory %s", configDir)
setConfigDir(configDirAbs)
}
// Handle common command args.
handleCommonCmdArgs(ctx)
// Server address.
serverAddr := ctx.String("address")
@@ -162,36 +106,8 @@ func serverHandleCmdArgs(ctx *cli.Context) {
}
func serverHandleEnvVars() {
// Start profiler if env is set.
if profiler := os.Getenv("_MINIO_PROFILER"); profiler != "" {
globalProfiler = startProfiler(profiler)
}
// Check if object cache is disabled.
globalXLObjCacheDisabled = strings.EqualFold(os.Getenv("_MINIO_CACHE"), "off")
accessKey := os.Getenv("MINIO_ACCESS_KEY")
secretKey := os.Getenv("MINIO_SECRET_KEY")
if accessKey != "" && secretKey != "" {
cred, err := createCredential(accessKey, secretKey)
fatalIf(err, "Invalid access/secret Key set in environment.")
// credential Envs are set globally.
globalIsEnvCreds = true
globalActiveCred = cred
}
if browser := os.Getenv("MINIO_BROWSER"); browser != "" {
browserFlag, err := ParseBrowserFlag(browser)
if err != nil {
fatalIf(errors.New("invalid value"), "Unknown value %s in MINIO_BROWSER environment variable.", browser)
}
// browser Envs are set globally, this does not represent
// if browser is turned off or on.
globalIsEnvBrowser = true
globalIsBrowserEnabled = bool(browserFlag)
}
// Handle common environment variables.
handleCommonEnvVars()
if serverRegion := os.Getenv("MINIO_REGION"); serverRegion != "" {
// region Envs are set globally.
@@ -213,12 +129,16 @@ func serverMain(ctx *cli.Context) {
log.EnableQuiet()
}
// Handle all server command args.
serverHandleCmdArgs(ctx)
// Handle all server environment vars.
serverHandleEnvVars()
// Create certs path.
fatalIf(createConfigDir(), "Unable to create configuration directories.")
// Initialize server config.
initConfig()
// Enable loggers as per configuration file.

View File

@@ -52,7 +52,7 @@ func printStartupMessage(apiEndPoints []string) {
// Prints `mc` cli configuration message chooses
// first endpoint as default.
printCLIAccessMsg(strippedAPIEndpoints[0])
printCLIAccessMsg(strippedAPIEndpoints[0], "myminio")
// Prints documentation message.
printObjectAPIMsg()
@@ -141,17 +141,17 @@ func printEventNotifiers() {
// Prints startup message for command line access. Prints link to our documentation
// and custom platform specific message.
func printCLIAccessMsg(endPoint string) {
func printCLIAccessMsg(endPoint string, alias string) {
// Get saved credentials.
cred := serverConfig.GetCredential()
// Configure 'mc', following block prints platform specific information for minio client.
log.Println(colorBlue("\nCommand-line Access: ") + mcQuickStartGuide)
if runtime.GOOS == globalWindowsOSName {
mcMessage := fmt.Sprintf("$ mc.exe config host add myminio %s %s %s", endPoint, cred.AccessKey, cred.SecretKey)
mcMessage := fmt.Sprintf("$ mc.exe config host add %s %s %s %s", alias, endPoint, cred.AccessKey, cred.SecretKey)
log.Println(fmt.Sprintf(getFormatStr(len(mcMessage), 3), mcMessage))
} else {
mcMessage := fmt.Sprintf("$ mc config host add myminio %s %s %s", endPoint, cred.AccessKey, cred.SecretKey)
mcMessage := fmt.Sprintf("$ mc config host add %s %s %s %s", alias, endPoint, cred.AccessKey, cred.SecretKey)
log.Println(fmt.Sprintf(getFormatStr(len(mcMessage), 3), mcMessage))
}
}

View File

@@ -140,7 +140,7 @@ func TestPrintCLIAccessMsg(t *testing.T) {
defer removeAll(root)
apiEndpoints := []string{"http://127.0.0.1:9000"}
printCLIAccessMsg(apiEndpoints[0])
printCLIAccessMsg(apiEndpoints[0], "myminio")
}
// Test print startup message.

View File

@@ -27,6 +27,10 @@ var errUnexpected = errors.New("Unexpected error, please report this issue at ht
// errCorruptedFormat - corrupted backend format.
var errCorruptedFormat = errors.New("corrupted backend format, please join https://slack.minio.io for assistance")
// errFormatNotSupported - returned when older minio tries to parse metadata
// created by newer minio.
var errFormatNotSupported = errors.New("format not supported")
// errUnformattedDisk - unformatted disk found.
var errUnformattedDisk = errors.New("unformatted disk found")

View File

@@ -690,13 +690,15 @@ func getBucketAccessPolicy(objAPI ObjectLayer, bucketName string) (policy.Bucket
policyInfo, err = layer.GetBucketPolicies(bucketName)
case *azureObjects:
policyInfo, err = layer.GetBucketPolicies(bucketName)
case *gcsGateway:
policyInfo, err = layer.GetBucketPolicies(bucketName)
default:
policyInfo, err = readBucketAccessPolicy(objAPI, bucketName)
}
return policyInfo, err
}
// GetBucketPolicy - get bucket policy.
// GetBucketPolicy - get bucket policy for the requested prefix.
func (web *webAPIHandlers) GetBucketPolicy(r *http.Request, args *GetBucketPolicyArgs, reply *GetBucketPolicyRep) error {
objectAPI := web.ObjectAPI()
if objectAPI == nil {
@@ -707,9 +709,12 @@ func (web *webAPIHandlers) GetBucketPolicy(r *http.Request, args *GetBucketPolic
return toJSONError(errAuthentication)
}
policyInfo, err := readBucketAccessPolicy(objectAPI, args.BucketName)
var policyInfo, err = getBucketAccessPolicy(objectAPI, args.BucketName)
if err != nil {
return toJSONError(err, args.BucketName)
_, ok := errorCause(err).(PolicyNotFound)
if !ok {
return toJSONError(err, args.BucketName)
}
}
reply.UIVersion = browser.UIVersion
@@ -745,8 +750,8 @@ func (web *webAPIHandlers) ListAllBucketPolicies(r *http.Request, args *ListAllB
if !isHTTPRequestValid(r) {
return toJSONError(errAuthentication)
}
var policyInfo, err = getBucketAccessPolicy(objectAPI, args.BucketName)
var policyInfo, err = getBucketAccessPolicy(objectAPI, args.BucketName)
if err != nil {
_, ok := errorCause(err).(PolicyNotFound)
if !ok {
@@ -791,7 +796,6 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
}
var policyInfo, err = getBucketAccessPolicy(objectAPI, args.BucketName)
if err != nil {
if _, ok := errorCause(err).(PolicyNotFound); !ok {
return toJSONError(err, args.BucketName)