mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
70fec0a53f
Previously init multipart upload stores metadata of an object which is used for complete multipart. This patch makes azure gateway to store metadata information of init multipart object in azure in the name of 'minio.sys.tmp/multipart/v1/<UPLOAD-ID>/meta.json' and uses this information on complete multipart.
1071 lines
32 KiB
Go
1071 lines
32 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2017 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"golang.org/x/oauth2/google"
|
|
|
|
"cloud.google.com/go/storage"
|
|
cloudresourcemanager "google.golang.org/api/cloudresourcemanager/v1"
|
|
"google.golang.org/api/googleapi"
|
|
"google.golang.org/api/iterator"
|
|
|
|
minio "github.com/minio/minio-go"
|
|
"github.com/minio/minio-go/pkg/policy"
|
|
)
|
|
|
|
const (
|
|
// Path where multipart objects are saved.
|
|
// If we change the backend format we will use a different url path like /multipart/v2
|
|
// but we will not migrate old data.
|
|
gcsMinioMultipartPathV1 = globalMinioSysTmp + "multipart/v1"
|
|
|
|
// Multipart meta file.
|
|
gcsMinioMultipartMeta = "gcs.json"
|
|
|
|
// gcs.json version number
|
|
gcsMinioMultipartMetaCurrentVersion = "1"
|
|
|
|
// token prefixed with GCS returned marker to differentiate
|
|
// from user supplied marker.
|
|
gcsTokenPrefix = "{minio}"
|
|
|
|
// Maximum component object count to create a composite object.
|
|
// Refer https://cloud.google.com/storage/docs/composite-objects
|
|
gcsMaxComponents = 32
|
|
|
|
// gcsMaxPartCount - maximum multipart parts GCS supports which is 32 x 32 = 1024.
|
|
gcsMaxPartCount = 1024
|
|
|
|
// Every 24 hours we scan minio.sys.tmp to delete expired multiparts in minio.sys.tmp
|
|
gcsCleanupInterval = time.Hour * 24
|
|
|
|
// The cleanup routine deletes files older than 2 weeks in minio.sys.tmp
|
|
gcsMultipartExpiry = time.Hour * 24 * 14
|
|
)
|
|
|
|
// Stored in gcs.json - Contents of this file is not used anywhere. It can be
|
|
// used for debugging purposes.
|
|
type gcsMultipartMetaV1 struct {
|
|
Version string `json:"version"` // Version number
|
|
Bucket string `json:"bucket"` // Bucket name
|
|
Object string `json:"object"` // Object name
|
|
}
|
|
|
|
// Returns name of the multipart meta object.
|
|
func gcsMultipartMetaName(uploadID string) string {
|
|
return fmt.Sprintf("%s/%s/%s", gcsMinioMultipartPathV1, uploadID, gcsMinioMultipartMeta)
|
|
}
|
|
|
|
// Returns name of the part object.
|
|
func gcsMultipartDataName(uploadID string, partNumber int, etag string) string {
|
|
return fmt.Sprintf("%s/%s/%05d.%s", gcsMinioMultipartPathV1, uploadID, partNumber, etag)
|
|
}
|
|
|
|
// Convert Minio errors to minio object layer errors.
|
|
func gcsToObjectError(err error, params ...string) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
e, ok := err.(*Error)
|
|
if !ok {
|
|
// Code should be fixed if this function is called without doing traceError()
|
|
// Else handling different situations in this function makes this function complicated.
|
|
errorIf(err, "Expected type *Error")
|
|
return err
|
|
}
|
|
|
|
err = e.e
|
|
|
|
bucket := ""
|
|
object := ""
|
|
if len(params) >= 1 {
|
|
bucket = params[0]
|
|
}
|
|
if len(params) == 2 {
|
|
object = params[1]
|
|
}
|
|
|
|
// in some cases just a plain error is being returned
|
|
switch err.Error() {
|
|
case "storage: bucket doesn't exist":
|
|
err = BucketNotFound{
|
|
Bucket: bucket,
|
|
}
|
|
e.e = err
|
|
return e
|
|
case "storage: object doesn't exist":
|
|
err = ObjectNotFound{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
}
|
|
e.e = err
|
|
return e
|
|
}
|
|
|
|
googleAPIErr, ok := err.(*googleapi.Error)
|
|
if !ok {
|
|
// We don't interpret non Minio errors. As minio errors will
|
|
// have StatusCode to help to convert to object errors.
|
|
e.e = err
|
|
return e
|
|
}
|
|
|
|
if len(googleAPIErr.Errors) == 0 {
|
|
e.e = err
|
|
return e
|
|
}
|
|
|
|
reason := googleAPIErr.Errors[0].Reason
|
|
message := googleAPIErr.Errors[0].Message
|
|
|
|
switch reason {
|
|
case "required":
|
|
// Anonymous users does not have storage.xyz access to project 123.
|
|
fallthrough
|
|
case "keyInvalid":
|
|
fallthrough
|
|
case "forbidden":
|
|
err = PrefixAccessDenied{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
}
|
|
case "invalid":
|
|
err = BucketNameInvalid{
|
|
Bucket: bucket,
|
|
}
|
|
case "notFound":
|
|
if object != "" {
|
|
err = ObjectNotFound{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
}
|
|
break
|
|
}
|
|
err = BucketNotFound{Bucket: bucket}
|
|
case "conflict":
|
|
if message == "You already own this bucket. Please select another name." {
|
|
err = BucketAlreadyOwnedByYou{Bucket: bucket}
|
|
break
|
|
}
|
|
if message == "Sorry, that name is not available. Please try a different one." {
|
|
err = BucketAlreadyExists{Bucket: bucket}
|
|
break
|
|
}
|
|
err = BucketNotEmpty{Bucket: bucket}
|
|
default:
|
|
err = fmt.Errorf("Unsupported error reason: %s", reason)
|
|
}
|
|
|
|
e.e = err
|
|
return e
|
|
}
|
|
|
|
// gcsProjectIDRegex defines a valid gcs project id format
|
|
var gcsProjectIDRegex = regexp.MustCompile("^[a-z][a-z0-9-]{5,29}$")
|
|
|
|
// isValidGCSProjectIDFormat - checks if a given project id format is valid or not.
|
|
// Project IDs must start with a lowercase letter and can have lowercase ASCII letters,
|
|
// digits or hyphens. Project IDs must be between 6 and 30 characters.
|
|
// Ref: https://cloud.google.com/resource-manager/reference/rest/v1/projects#Project (projectId section)
|
|
func isValidGCSProjectIDFormat(projectID string) bool {
|
|
// Checking projectID format
|
|
return gcsProjectIDRegex.MatchString(projectID)
|
|
}
|
|
|
|
// checkGCSProjectID - checks if the project ID does really exist using resource manager API.
|
|
func checkGCSProjectID(ctx context.Context, projectID string) error {
|
|
// Check if a project id associated to the current account does really exist
|
|
resourceManagerClient, err := google.DefaultClient(ctx, cloudresourcemanager.CloudPlatformReadOnlyScope)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
baseSvc, err := cloudresourcemanager.New(resourceManagerClient)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
projectSvc := cloudresourcemanager.NewProjectsService(baseSvc)
|
|
|
|
curPageToken := ""
|
|
|
|
// Iterate over projects list result pages and immediately return nil when
|
|
// the project ID is found.
|
|
for {
|
|
resp, err := projectSvc.List().PageToken(curPageToken).Context(ctx).Do()
|
|
if err != nil {
|
|
return fmt.Errorf("Error getting projects list: %s", err.Error())
|
|
}
|
|
|
|
for _, p := range resp.Projects {
|
|
if p.ProjectId == projectID {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
if resp.NextPageToken != "" {
|
|
curPageToken = resp.NextPageToken
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
return errGCSProjectIDNotFound
|
|
}
|
|
|
|
// gcsGateway - Implements gateway for Minio and GCS compatible object storage servers.
|
|
type gcsGateway struct {
|
|
client *storage.Client
|
|
anonClient *minio.Core
|
|
projectID string
|
|
ctx context.Context
|
|
}
|
|
|
|
const googleStorageEndpoint = "storage.googleapis.com"
|
|
|
|
// newGCSGateway returns gcs gatewaylayer
|
|
func newGCSGateway(projectID string) (GatewayLayer, error) {
|
|
ctx := context.Background()
|
|
|
|
err := checkGCSProjectID(ctx, projectID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Initialize a GCS client.
|
|
client, err := storage.NewClient(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Initialize a anonymous client with minio core APIs.
|
|
anonClient, err := minio.NewCore(googleStorageEndpoint, "", "", true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
anonClient.SetCustomTransport(newCustomHTTPTransport())
|
|
|
|
gateway := &gcsGateway{
|
|
client: client,
|
|
projectID: projectID,
|
|
ctx: ctx,
|
|
anonClient: anonClient,
|
|
}
|
|
// Start background process to cleanup old files in minio.sys.tmp
|
|
go gateway.CleanupGCSMinioSysTmp()
|
|
return gateway, nil
|
|
}
|
|
|
|
// Cleanup old files in minio.sys.tmp of the given bucket.
|
|
func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(bucket string) {
|
|
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: globalMinioSysTmp, Versions: false})
|
|
for {
|
|
attrs, err := it.Next()
|
|
if err != nil {
|
|
if err != iterator.Done {
|
|
errorIf(err, "Object listing error on bucket %s during purging of old files in minio.sys.tmp", bucket)
|
|
}
|
|
return
|
|
}
|
|
if time.Since(attrs.Updated) > gcsMultipartExpiry {
|
|
// Delete files older than 2 weeks.
|
|
err := l.client.Bucket(bucket).Object(attrs.Name).Delete(l.ctx)
|
|
if err != nil {
|
|
errorIf(err, "Unable to delete %s/%s during purging of old files in minio.sys.tmp", bucket, attrs.Name)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Cleanup old files in minio.sys.tmp of all buckets.
|
|
func (l *gcsGateway) CleanupGCSMinioSysTmp() {
|
|
for {
|
|
it := l.client.Buckets(l.ctx, l.projectID)
|
|
for {
|
|
attrs, err := it.Next()
|
|
if err != nil {
|
|
if err != iterator.Done {
|
|
errorIf(err, "Bucket listing error during purging of old files in minio.sys.tmp")
|
|
}
|
|
break
|
|
}
|
|
l.CleanupGCSMinioSysTmpBucket(attrs.Name)
|
|
}
|
|
// Run the cleanup loop every 1 day.
|
|
time.Sleep(gcsCleanupInterval)
|
|
}
|
|
}
|
|
|
|
// Shutdown - save any gateway metadata to disk
|
|
// if necessary and reload upon next restart.
|
|
func (l *gcsGateway) Shutdown() error {
|
|
return nil
|
|
}
|
|
|
|
// StorageInfo - Not relevant to GCS backend.
|
|
func (l *gcsGateway) StorageInfo() StorageInfo {
|
|
return StorageInfo{}
|
|
}
|
|
|
|
// MakeBucketWithLocation - Create a new container on GCS backend.
|
|
func (l *gcsGateway) MakeBucketWithLocation(bucket, location string) error {
|
|
bkt := l.client.Bucket(bucket)
|
|
|
|
// we'll default to the us multi-region in case of us-east-1
|
|
if location == "us-east-1" {
|
|
location = "us"
|
|
}
|
|
|
|
err := bkt.Create(l.ctx, l.projectID, &storage.BucketAttrs{
|
|
Location: location,
|
|
})
|
|
|
|
return gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
|
|
// GetBucketInfo - Get bucket metadata..
|
|
func (l *gcsGateway) GetBucketInfo(bucket string) (BucketInfo, error) {
|
|
attrs, err := l.client.Bucket(bucket).Attrs(l.ctx)
|
|
if err != nil {
|
|
return BucketInfo{}, gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
|
|
return BucketInfo{
|
|
Name: attrs.Name,
|
|
Created: attrs.Created,
|
|
}, nil
|
|
}
|
|
|
|
// ListBuckets lists all buckets under your project-id on GCS.
|
|
func (l *gcsGateway) ListBuckets() (buckets []BucketInfo, err error) {
|
|
it := l.client.Buckets(l.ctx, l.projectID)
|
|
|
|
// Iterate and capture all the buckets.
|
|
for {
|
|
attrs, ierr := it.Next()
|
|
if ierr == iterator.Done {
|
|
break
|
|
}
|
|
|
|
if ierr != nil {
|
|
return buckets, gcsToObjectError(traceError(ierr))
|
|
}
|
|
|
|
buckets = append(buckets, BucketInfo{
|
|
Name: attrs.Name,
|
|
Created: attrs.Created,
|
|
})
|
|
}
|
|
|
|
return buckets, nil
|
|
}
|
|
|
|
// DeleteBucket delete a bucket on GCS.
|
|
func (l *gcsGateway) DeleteBucket(bucket string) error {
|
|
itObject := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: slashSeparator, Versions: false})
|
|
// We list the bucket and if we find any objects we return BucketNotEmpty error. If we
|
|
// find only "minio.sys.tmp/" then we remove it before deleting the bucket.
|
|
gcsMinioPathFound := false
|
|
nonGCSMinioPathFound := false
|
|
for {
|
|
objAttrs, err := itObject.Next()
|
|
if err == iterator.Done {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return gcsToObjectError(traceError(err))
|
|
}
|
|
if objAttrs.Prefix == globalMinioSysTmp {
|
|
gcsMinioPathFound = true
|
|
continue
|
|
}
|
|
nonGCSMinioPathFound = true
|
|
break
|
|
}
|
|
if nonGCSMinioPathFound {
|
|
return gcsToObjectError(traceError(BucketNotEmpty{}))
|
|
}
|
|
if gcsMinioPathFound {
|
|
// Remove minio.sys.tmp before deleting the bucket.
|
|
itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: globalMinioSysTmp})
|
|
for {
|
|
objAttrs, err := itObject.Next()
|
|
if err == iterator.Done {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return gcsToObjectError(traceError(err))
|
|
}
|
|
err = l.client.Bucket(bucket).Object(objAttrs.Name).Delete(l.ctx)
|
|
if err != nil {
|
|
return gcsToObjectError(traceError(err))
|
|
}
|
|
}
|
|
}
|
|
err := l.client.Bucket(bucket).Delete(l.ctx)
|
|
return gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
|
|
func toGCSPageToken(name string) string {
|
|
length := uint16(len(name))
|
|
|
|
b := []byte{
|
|
0xa,
|
|
byte(length & 0xFF),
|
|
}
|
|
|
|
length = length >> 7
|
|
if length > 0 {
|
|
b = append(b, byte(length&0xFF))
|
|
}
|
|
|
|
b = append(b, []byte(name)...)
|
|
|
|
return base64.StdEncoding.EncodeToString(b)
|
|
}
|
|
|
|
// Returns true if marker was returned by GCS, i.e prefixed with
|
|
// ##minio by minio gcs gateway.
|
|
func isGCSMarker(marker string) bool {
|
|
return strings.HasPrefix(marker, gcsTokenPrefix)
|
|
}
|
|
|
|
// ListObjects - lists all blobs in GCS bucket filtered by prefix
|
|
func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
|
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{
|
|
Delimiter: delimiter,
|
|
Prefix: prefix,
|
|
Versions: false,
|
|
})
|
|
|
|
isTruncated := false
|
|
nextMarker := ""
|
|
prefixes := []string{}
|
|
|
|
// To accommodate S3-compatible applications using
|
|
// ListObjectsV1 to use object keys as markers to control the
|
|
// listing of objects, we use the following encoding scheme to
|
|
// distinguish between GCS continuation tokens and application
|
|
// supplied markers.
|
|
//
|
|
// - NextMarker in ListObjectsV1 response is constructed by
|
|
// prefixing "##minio" to the GCS continuation token,
|
|
// e.g, "##minioCgRvYmoz"
|
|
//
|
|
// - Application supplied markers are used as-is to list
|
|
// object keys that appear after it in the lexicographical order.
|
|
|
|
// If application is using GCS continuation token we should
|
|
// strip the gcsTokenPrefix we added.
|
|
gcsMarker := isGCSMarker(marker)
|
|
if gcsMarker {
|
|
it.PageInfo().Token = strings.TrimPrefix(marker, gcsTokenPrefix)
|
|
}
|
|
|
|
it.PageInfo().MaxSize = maxKeys
|
|
|
|
objects := []ObjectInfo{}
|
|
for {
|
|
if len(objects) >= maxKeys {
|
|
// check if there is one next object and
|
|
// if that one next object is our hidden
|
|
// metadata folder, then just break
|
|
// otherwise we've truncated the output
|
|
attrs, _ := it.Next()
|
|
if attrs != nil && attrs.Prefix == globalMinioSysTmp {
|
|
break
|
|
}
|
|
|
|
isTruncated = true
|
|
break
|
|
}
|
|
|
|
attrs, err := it.Next()
|
|
if err == iterator.Done {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return ListObjectsInfo{}, gcsToObjectError(traceError(err), bucket, prefix)
|
|
}
|
|
|
|
nextMarker = toGCSPageToken(attrs.Name)
|
|
|
|
if attrs.Prefix == globalMinioSysTmp {
|
|
// We don't return our metadata prefix.
|
|
continue
|
|
}
|
|
if !strings.HasPrefix(prefix, globalMinioSysTmp) {
|
|
// If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries.
|
|
// But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/
|
|
// which will be helpful to observe the "directory structure" for debugging purposes.
|
|
if strings.HasPrefix(attrs.Prefix, globalMinioSysTmp) ||
|
|
strings.HasPrefix(attrs.Name, globalMinioSysTmp) {
|
|
continue
|
|
}
|
|
}
|
|
if attrs.Prefix != "" {
|
|
prefixes = append(prefixes, attrs.Prefix)
|
|
continue
|
|
}
|
|
if !gcsMarker && attrs.Name <= marker {
|
|
// if user supplied a marker don't append
|
|
// objects until we reach marker (and skip it).
|
|
continue
|
|
}
|
|
|
|
objects = append(objects, ObjectInfo{
|
|
Name: attrs.Name,
|
|
Bucket: attrs.Bucket,
|
|
ModTime: attrs.Updated,
|
|
Size: attrs.Size,
|
|
ETag: fmt.Sprintf("%d", attrs.CRC32C),
|
|
UserDefined: attrs.Metadata,
|
|
ContentType: attrs.ContentType,
|
|
ContentEncoding: attrs.ContentEncoding,
|
|
})
|
|
}
|
|
|
|
return ListObjectsInfo{
|
|
IsTruncated: isTruncated,
|
|
NextMarker: gcsTokenPrefix + nextMarker,
|
|
Prefixes: prefixes,
|
|
Objects: objects,
|
|
}, nil
|
|
}
|
|
|
|
// ListObjectsV2 - lists all blobs in GCS bucket filtered by prefix
|
|
func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool,
|
|
delimiter string, maxKeys int) (ListObjectsV2Info, error) {
|
|
|
|
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{
|
|
Delimiter: delimiter,
|
|
Prefix: prefix,
|
|
Versions: false,
|
|
})
|
|
|
|
isTruncated := false
|
|
it.PageInfo().MaxSize = maxKeys
|
|
|
|
if continuationToken != "" {
|
|
// If client sends continuationToken, set it
|
|
it.PageInfo().Token = continuationToken
|
|
} else {
|
|
// else set the continuationToken to return
|
|
continuationToken = it.PageInfo().Token
|
|
if continuationToken != "" {
|
|
// If GCS SDK sets continuationToken, it means there are more than maxKeys in the current page
|
|
// and the response will be truncated
|
|
isTruncated = true
|
|
}
|
|
}
|
|
|
|
prefixes := []string{}
|
|
objects := []ObjectInfo{}
|
|
|
|
for {
|
|
attrs, err := it.Next()
|
|
if err == iterator.Done {
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
return ListObjectsV2Info{}, gcsToObjectError(traceError(err), bucket, prefix)
|
|
}
|
|
|
|
if attrs.Prefix == globalMinioSysTmp {
|
|
// We don't return our metadata prefix.
|
|
continue
|
|
}
|
|
if !strings.HasPrefix(prefix, globalMinioSysTmp) {
|
|
// If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries.
|
|
// But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/
|
|
// which will be helpful to observe the "directory structure" for debugging purposes.
|
|
if strings.HasPrefix(attrs.Prefix, globalMinioSysTmp) ||
|
|
strings.HasPrefix(attrs.Name, globalMinioSysTmp) {
|
|
continue
|
|
}
|
|
}
|
|
|
|
if attrs.Prefix != "" {
|
|
prefixes = append(prefixes, attrs.Prefix)
|
|
continue
|
|
}
|
|
|
|
objects = append(objects, fromGCSAttrsToObjectInfo(attrs))
|
|
}
|
|
|
|
return ListObjectsV2Info{
|
|
IsTruncated: isTruncated,
|
|
ContinuationToken: continuationToken,
|
|
NextContinuationToken: continuationToken,
|
|
Prefixes: prefixes,
|
|
Objects: objects,
|
|
}, nil
|
|
}
|
|
|
|
// GetObject - reads an object from GCS. Supports additional
|
|
// parameters like offset and length which are synonymous with
|
|
// HTTP Range requests.
|
|
//
|
|
// startOffset indicates the starting read location of the object.
|
|
// length indicates the total length of the object.
|
|
func (l *gcsGateway) GetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error {
|
|
// if we want to mimic S3 behavior exactly, we need to verify if bucket exists first,
|
|
// otherwise gcs will just return object not exist in case of non-existing bucket
|
|
if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
|
|
return gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
|
|
object := l.client.Bucket(bucket).Object(key)
|
|
r, err := object.NewRangeReader(l.ctx, startOffset, length)
|
|
if err != nil {
|
|
return gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
defer r.Close()
|
|
|
|
if _, err := io.Copy(writer, r); err != nil {
|
|
return gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// fromMinioClientListBucketResultToV2Info converts minio ListBucketResult to ListObjectsV2Info
|
|
func fromMinioClientListBucketResultToV2Info(bucket string, result minio.ListBucketResult) ListObjectsV2Info {
|
|
objects := make([]ObjectInfo, len(result.Contents))
|
|
|
|
for i, oi := range result.Contents {
|
|
objects[i] = fromMinioClientObjectInfo(bucket, oi)
|
|
}
|
|
|
|
prefixes := make([]string, len(result.CommonPrefixes))
|
|
for i, p := range result.CommonPrefixes {
|
|
prefixes[i] = p.Prefix
|
|
}
|
|
|
|
return ListObjectsV2Info{
|
|
IsTruncated: result.IsTruncated,
|
|
Prefixes: prefixes,
|
|
Objects: objects,
|
|
ContinuationToken: result.Marker,
|
|
NextContinuationToken: result.NextMarker,
|
|
}
|
|
}
|
|
|
|
// fromGCSAttrsToObjectInfo converts GCS BucketAttrs to gateway ObjectInfo
|
|
func fromGCSAttrsToObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo {
|
|
// All google cloud storage objects have a CRC32c hash, whereas composite objects may not have a MD5 hash
|
|
// Refer https://cloud.google.com/storage/docs/hashes-etags. Use CRC32C for ETag
|
|
return ObjectInfo{
|
|
Name: attrs.Name,
|
|
Bucket: attrs.Bucket,
|
|
ModTime: attrs.Updated,
|
|
Size: attrs.Size,
|
|
ETag: fmt.Sprintf("%d", attrs.CRC32C),
|
|
UserDefined: attrs.Metadata,
|
|
ContentType: attrs.ContentType,
|
|
ContentEncoding: attrs.ContentEncoding,
|
|
}
|
|
}
|
|
|
|
// GetObjectInfo - reads object info and replies back ObjectInfo
|
|
func (l *gcsGateway) GetObjectInfo(bucket string, object string) (ObjectInfo, error) {
|
|
// if we want to mimic S3 behavior exactly, we need to verify if bucket exists first,
|
|
// otherwise gcs will just return object not exist in case of non-existing bucket
|
|
if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
|
|
attrs, err := l.client.Bucket(bucket).Object(object).Attrs(l.ctx)
|
|
if err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, object)
|
|
}
|
|
|
|
return fromGCSAttrsToObjectInfo(attrs), nil
|
|
}
|
|
|
|
// PutObject - Create a new object with the incoming data,
|
|
func (l *gcsGateway) PutObject(bucket string, key string, data *HashReader, metadata map[string]string) (ObjectInfo, error) {
|
|
|
|
// if we want to mimic S3 behavior exactly, we need to verify if bucket exists first,
|
|
// otherwise gcs will just return object not exist in case of non-existing bucket
|
|
if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
|
|
if _, err := hex.DecodeString(metadata["etag"]); err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
delete(metadata, "etag")
|
|
|
|
object := l.client.Bucket(bucket).Object(key)
|
|
|
|
w := object.NewWriter(l.ctx)
|
|
|
|
w.ContentType = metadata["content-type"]
|
|
w.ContentEncoding = metadata["content-encoding"]
|
|
w.Metadata = metadata
|
|
|
|
if _, err := io.Copy(w, data); err != nil {
|
|
// Close the object writer upon error.
|
|
w.Close()
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
// Close the object writer upon success.
|
|
w.Close()
|
|
|
|
if err := data.Verify(); err != nil { // Verify sha256sum after close.
|
|
object.Delete(l.ctx)
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
|
|
attrs, err := object.Attrs(l.ctx)
|
|
if err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
|
|
return fromGCSAttrsToObjectInfo(attrs), nil
|
|
}
|
|
|
|
// CopyObject - Copies a blob from source container to destination container.
|
|
func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string,
|
|
metadata map[string]string) (ObjectInfo, error) {
|
|
|
|
src := l.client.Bucket(srcBucket).Object(srcObject)
|
|
dst := l.client.Bucket(destBucket).Object(destObject)
|
|
|
|
attrs, err := dst.CopierFrom(src).Run(l.ctx)
|
|
if err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), destBucket, destObject)
|
|
}
|
|
|
|
return fromGCSAttrsToObjectInfo(attrs), nil
|
|
}
|
|
|
|
// DeleteObject - Deletes a blob in bucket
|
|
func (l *gcsGateway) DeleteObject(bucket string, object string) error {
|
|
err := l.client.Bucket(bucket).Object(object).Delete(l.ctx)
|
|
if err != nil {
|
|
return gcsToObjectError(traceError(err), bucket, object)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewMultipartUpload - upload object in multiple parts
|
|
func (l *gcsGateway) NewMultipartUpload(bucket string, key string, metadata map[string]string) (uploadID string, err error) {
|
|
// generate new uploadid
|
|
uploadID = mustGetUUID()
|
|
|
|
// generate name for part zero
|
|
meta := gcsMultipartMetaName(uploadID)
|
|
|
|
w := l.client.Bucket(bucket).Object(meta).NewWriter(l.ctx)
|
|
defer w.Close()
|
|
|
|
w.ContentType = metadata["content-type"]
|
|
w.ContentEncoding = metadata["content-encoding"]
|
|
w.Metadata = metadata
|
|
|
|
if err = json.NewEncoder(w).Encode(gcsMultipartMetaV1{
|
|
gcsMinioMultipartMetaCurrentVersion,
|
|
bucket,
|
|
key,
|
|
}); err != nil {
|
|
return "", gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
return uploadID, nil
|
|
}
|
|
|
|
// ListMultipartUploads - lists all multipart uploads.
|
|
func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
|
return ListMultipartsInfo{
|
|
KeyMarker: keyMarker,
|
|
UploadIDMarker: uploadIDMarker,
|
|
MaxUploads: maxUploads,
|
|
Prefix: prefix,
|
|
Delimiter: delimiter,
|
|
}, nil
|
|
}
|
|
|
|
// CopyObjectPart - copy part of object to other bucket and object
|
|
func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBucket string, destObject string, uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) {
|
|
return PartInfo{}, traceError(NotSupported{})
|
|
}
|
|
|
|
// Checks if minio.sys.tmp/multipart/v1/<upload-id>/gcs.json exists, returns
|
|
// an object layer compatible error upon any error.
|
|
func (l *gcsGateway) checkUploadIDExists(bucket string, key string, uploadID string) error {
|
|
_, err := l.client.Bucket(bucket).Object(gcsMultipartMetaName(uploadID)).Attrs(l.ctx)
|
|
return gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
|
|
// PutObjectPart puts a part of object in bucket
|
|
func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, partNumber int, data *HashReader) (PartInfo, error) {
|
|
if err := l.checkUploadIDExists(bucket, key, uploadID); err != nil {
|
|
return PartInfo{}, err
|
|
}
|
|
etag := data.md5Sum
|
|
if etag == "" {
|
|
// Generate random ETag.
|
|
etag = getMD5Hash([]byte(mustGetUUID()))
|
|
}
|
|
object := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, partNumber, etag))
|
|
w := object.NewWriter(l.ctx)
|
|
// Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case
|
|
// where it tries to upload 0 bytes in the last chunk and get error from server.
|
|
w.ChunkSize = 0
|
|
if _, err := io.Copy(w, data); err != nil {
|
|
// Make sure to close object writer upon error.
|
|
w.Close()
|
|
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
// Make sure to close the object writer upon success.
|
|
w.Close()
|
|
|
|
if err := data.Verify(); err != nil {
|
|
object.Delete(l.ctx)
|
|
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
return PartInfo{
|
|
PartNumber: partNumber,
|
|
ETag: etag,
|
|
LastModified: UTCNow(),
|
|
Size: data.Size(),
|
|
}, nil
|
|
|
|
}
|
|
|
|
// ListObjectParts returns all object parts for specified object in specified bucket
|
|
func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) {
|
|
return ListPartsInfo{}, l.checkUploadIDExists(bucket, key, uploadID)
|
|
}
|
|
|
|
// Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up.
|
|
func (l *gcsGateway) cleanupMultipartUpload(bucket, key, uploadID string) error {
|
|
prefix := fmt.Sprintf("%s/%s/", gcsMinioMultipartPathV1, uploadID)
|
|
|
|
// iterate through all parts and delete them
|
|
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: prefix, Versions: false})
|
|
|
|
for {
|
|
attrs, err := it.Next()
|
|
if err == iterator.Done {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
|
|
object := l.client.Bucket(bucket).Object(attrs.Name)
|
|
// Ignore the error as parallel AbortMultipartUpload might have deleted it.
|
|
object.Delete(l.ctx)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AbortMultipartUpload aborts a ongoing multipart upload
|
|
func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error {
|
|
if err := l.checkUploadIDExists(bucket, key, uploadID); err != nil {
|
|
return err
|
|
}
|
|
return l.cleanupMultipartUpload(bucket, key, uploadID)
|
|
}
|
|
|
|
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
|
|
// Note that there is a limit (currently 32) to the number of components that can
|
|
// be composed in a single operation. There is a limit (currently 1024) to the total
|
|
// number of components for a given composite object. This means you can append to
|
|
// each object at most 1023 times. There is a per-project rate limit (currently 200)
|
|
// to the number of components you can compose per second. This rate counts both the
|
|
// components being appended to a composite object as well as the components being
|
|
// copied when the composite object of which they are a part is copied.
|
|
func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID string, uploadedParts []completePart) (ObjectInfo, error) {
|
|
meta := gcsMultipartMetaName(uploadID)
|
|
object := l.client.Bucket(bucket).Object(meta)
|
|
|
|
partZeroAttrs, err := object.Attrs(l.ctx)
|
|
if err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
|
|
r, err := object.NewReader(l.ctx)
|
|
if err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
defer r.Close()
|
|
|
|
// Check version compatibility of the meta file before compose()
|
|
multipartMeta := gcsMultipartMetaV1{}
|
|
if err = json.NewDecoder(r).Decode(&multipartMeta); err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
|
|
if multipartMeta.Version != gcsMinioMultipartMetaCurrentVersion {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(errFormatNotSupported), bucket, key)
|
|
}
|
|
|
|
// Validate if the gcs.json stores valid entries for the bucket and key.
|
|
if multipartMeta.Bucket != bucket || multipartMeta.Object != key {
|
|
return ObjectInfo{}, gcsToObjectError(InvalidUploadID{
|
|
UploadID: uploadID,
|
|
}, bucket, key)
|
|
}
|
|
|
|
var parts []*storage.ObjectHandle
|
|
for _, uploadedPart := range uploadedParts {
|
|
parts = append(parts, l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID,
|
|
uploadedPart.PartNumber, uploadedPart.ETag)))
|
|
}
|
|
|
|
// Returns name of the composed object.
|
|
gcsMultipartComposeName := func(uploadID string, composeNumber int) string {
|
|
return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", globalMinioSysTmp, uploadID, composeNumber)
|
|
}
|
|
|
|
composeCount := int(math.Ceil(float64(len(parts)) / float64(gcsMaxComponents)))
|
|
if composeCount > 1 {
|
|
// Create composes of every 32 parts.
|
|
composeParts := make([]*storage.ObjectHandle, composeCount)
|
|
for i := 0; i < composeCount; i++ {
|
|
// Create 'composed-object-N' using next 32 parts.
|
|
composeParts[i] = l.client.Bucket(bucket).Object(gcsMultipartComposeName(uploadID, i))
|
|
start := i * gcsMaxComponents
|
|
end := start + gcsMaxComponents
|
|
if end > len(parts) {
|
|
end = len(parts)
|
|
}
|
|
|
|
composer := composeParts[i].ComposerFrom(parts[start:end]...)
|
|
composer.ContentType = partZeroAttrs.ContentType
|
|
composer.Metadata = partZeroAttrs.Metadata
|
|
|
|
if _, err = composer.Run(l.ctx); err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
}
|
|
|
|
// As composes are successfully created, final object needs to be created using composes.
|
|
parts = composeParts
|
|
}
|
|
|
|
composer := l.client.Bucket(bucket).Object(key).ComposerFrom(parts...)
|
|
composer.ContentType = partZeroAttrs.ContentType
|
|
composer.Metadata = partZeroAttrs.Metadata
|
|
attrs, err := composer.Run(l.ctx)
|
|
if err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
if err = l.cleanupMultipartUpload(bucket, key, uploadID); err != nil {
|
|
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
|
}
|
|
return fromGCSAttrsToObjectInfo(attrs), nil
|
|
}
|
|
|
|
// SetBucketPolicies - Set policy on bucket
|
|
func (l *gcsGateway) SetBucketPolicies(bucket string, policyInfo policy.BucketAccessPolicy) error {
|
|
var policies []BucketAccessPolicy
|
|
|
|
for prefix, policy := range policy.GetPolicies(policyInfo.Statements, bucket) {
|
|
policies = append(policies, BucketAccessPolicy{
|
|
Prefix: prefix,
|
|
Policy: policy,
|
|
})
|
|
}
|
|
|
|
prefix := bucket + "/*" // For all objects inside the bucket.
|
|
|
|
if len(policies) != 1 {
|
|
return traceError(NotImplemented{})
|
|
}
|
|
if policies[0].Prefix != prefix {
|
|
return traceError(NotImplemented{})
|
|
}
|
|
|
|
acl := l.client.Bucket(bucket).ACL()
|
|
if policies[0].Policy == policy.BucketPolicyNone {
|
|
if err := acl.Delete(l.ctx, storage.AllUsers); err != nil {
|
|
return gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
var role storage.ACLRole
|
|
switch policies[0].Policy {
|
|
case policy.BucketPolicyReadOnly:
|
|
role = storage.RoleReader
|
|
case policy.BucketPolicyWriteOnly:
|
|
role = storage.RoleWriter
|
|
default:
|
|
return traceError(NotImplemented{})
|
|
}
|
|
|
|
if err := acl.Set(l.ctx, storage.AllUsers, role); err != nil {
|
|
return gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetBucketPolicies - Get policy on bucket
|
|
func (l *gcsGateway) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy, error) {
|
|
rules, err := l.client.Bucket(bucket).ACL().List(l.ctx)
|
|
if err != nil {
|
|
return policy.BucketAccessPolicy{}, gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
|
|
policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"}
|
|
for _, r := range rules {
|
|
if r.Entity != storage.AllUsers || r.Role == storage.RoleOwner {
|
|
continue
|
|
}
|
|
switch r.Role {
|
|
case storage.RoleReader:
|
|
policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, policy.BucketPolicyReadOnly, bucket, "")
|
|
case storage.RoleWriter:
|
|
policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, policy.BucketPolicyWriteOnly, bucket, "")
|
|
}
|
|
}
|
|
|
|
return policyInfo, nil
|
|
}
|
|
|
|
// DeleteBucketPolicies - Delete all policies on bucket
|
|
func (l *gcsGateway) DeleteBucketPolicies(bucket string) error {
|
|
// This only removes the storage.AllUsers policies
|
|
if err := l.client.Bucket(bucket).ACL().Delete(l.ctx, storage.AllUsers); err != nil {
|
|
return gcsToObjectError(traceError(err), bucket)
|
|
}
|
|
|
|
return nil
|
|
}
|