mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
gateway/gcs: Change in multipart backend format (#4455)
This commit is contained in:
parent
f99f218999
commit
3928c1e14c
@ -156,7 +156,6 @@ func azureToObjectError(err error, params ...string) error {
|
||||
|
||||
// Inits azure blob storage client and returns AzureObjects.
|
||||
func newAzureLayer(host string) (GatewayLayer, error) {
|
||||
|
||||
var err error
|
||||
var endpoint = storage.DefaultBaseURL
|
||||
var secure = true
|
||||
|
@ -29,7 +29,7 @@ func toGCSPublicURL(bucket, object string) string {
|
||||
}
|
||||
|
||||
// AnonPutObject creates a new object anonymously with the incoming data,
|
||||
func (l *gcsGateway) AnonPutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) {
|
||||
func (l *gcsGateway) AnonPutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) {
|
||||
|
||||
return ObjectInfo{}, NotImplemented{}
|
||||
}
|
||||
|
@ -21,13 +21,13 @@ import (
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"path"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"cloud.google.com/go/storage"
|
||||
"google.golang.org/api/googleapi"
|
||||
@ -38,17 +38,42 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// ZZZZMinioPrefix is used for metadata and multiparts. The prefix is being filtered out,
|
||||
// hence the naming of ZZZZ (last prefix)
|
||||
ZZZZMinioPrefix = "ZZZZ-Minio"
|
||||
// gcsMinioMeta is used for multiparts. We have "minio.sys.temp" prefix so that
|
||||
// listing on the GCS lists this entry in the end. Also in the gateway
|
||||
// ListObjects we filter out this entry.
|
||||
gcsMinioPath = "minio.sys.temp"
|
||||
// Path where multipart objects are saved.
|
||||
gcsMinioMultipartPath = gcsMinioPath + "/multipart"
|
||||
// Multipart meta file.
|
||||
gcsMinioMultipartMeta = "gcs.json"
|
||||
// gcs.json version number
|
||||
gcsMultipartMetaCurrentVersion = "1"
|
||||
// token prefixed with GCS returned marker to differentiate
|
||||
// from user supplied marker.
|
||||
gcsTokenPrefix = "##minio"
|
||||
)
|
||||
|
||||
// Stored in gcs.json - Contents of this file is not used anywhere. It can be
|
||||
// used for debugging purposes.
|
||||
type gcsMultipartMetaV1 struct {
|
||||
Version string `json:"version"` // Version number
|
||||
Bucket string `json:"bucket"` // Bucket name
|
||||
Object string `json:"object"` // Object name
|
||||
}
|
||||
|
||||
// Check if object prefix is "ZZZZ_Minio".
|
||||
func isGCSPrefix(prefix string) bool {
|
||||
return strings.TrimSuffix(prefix, slashSeparator) == ZZZZMinioPrefix
|
||||
return strings.TrimSuffix(prefix, slashSeparator) == gcsMinioPath
|
||||
}
|
||||
|
||||
// Returns name of the multipart meta object.
|
||||
func gcsMultipartMetaName(uploadID string) string {
|
||||
return fmt.Sprintf("%s/%s/%s", gcsMinioMultipartPath, uploadID, gcsMinioMultipartMeta)
|
||||
}
|
||||
|
||||
// Returns name of the part object.
|
||||
func gcsMultipartDataName(uploadID, etag string) string {
|
||||
return fmt.Sprintf("%s/%s/%s", gcsMinioMultipartPath, uploadID, etag)
|
||||
}
|
||||
|
||||
// Convert Minio errors to minio object layer errors.
|
||||
@ -97,6 +122,12 @@ func gcsToObjectError(err error, params ...string) error {
|
||||
if !ok {
|
||||
// We don't interpret non Minio errors. As minio errors will
|
||||
// have StatusCode to help to convert to object errors.
|
||||
e.e = err
|
||||
return e
|
||||
}
|
||||
|
||||
if len(googleAPIErr.Errors) == 0 {
|
||||
e.e = err
|
||||
return e
|
||||
}
|
||||
|
||||
@ -148,9 +179,6 @@ func gcsToObjectError(err error, params ...string) error {
|
||||
|
||||
}
|
||||
|
||||
_ = bucket
|
||||
_ = object
|
||||
|
||||
e.e = err
|
||||
return e
|
||||
}
|
||||
@ -211,23 +239,20 @@ func (l *gcsGateway) StorageInfo() StorageInfo {
|
||||
return StorageInfo{}
|
||||
}
|
||||
|
||||
// MakeBucket - Create a new container on GCS backend.
|
||||
func (l *gcsGateway) MakeBucket(bucket string) error {
|
||||
// will never be called, only satisfy ObjectLayer interface
|
||||
return traceError(NotImplemented{})
|
||||
}
|
||||
|
||||
// MakeBucket - Create a new container on GCS backend.
|
||||
// MakeBucketWithLocation - Create a new container on GCS backend.
|
||||
func (l *gcsGateway) MakeBucketWithLocation(bucket, location string) error {
|
||||
bkt := l.client.Bucket(bucket)
|
||||
|
||||
if err := bkt.Create(l.ctx, l.projectID, &storage.BucketAttrs{
|
||||
Location: location,
|
||||
}); err != nil {
|
||||
return gcsToObjectError(traceError(err), bucket)
|
||||
// we'll default to the us multi-region in case of us-east-1
|
||||
if location == "us-east-1" {
|
||||
location = "us"
|
||||
}
|
||||
|
||||
return nil
|
||||
err := bkt.Create(l.ctx, l.projectID, &storage.BucketAttrs{
|
||||
Location: location,
|
||||
})
|
||||
|
||||
return gcsToObjectError(traceError(err), bucket)
|
||||
}
|
||||
|
||||
// GetBucketInfo - Get bucket metadata..
|
||||
@ -270,11 +295,7 @@ func (l *gcsGateway) ListBuckets() ([]BucketInfo, error) {
|
||||
// DeleteBucket delete a bucket on GCS
|
||||
func (l *gcsGateway) DeleteBucket(bucket string) error {
|
||||
err := l.client.Bucket(bucket).Delete(l.ctx)
|
||||
if err != nil {
|
||||
return gcsToObjectError(traceError(err), bucket)
|
||||
}
|
||||
|
||||
return nil
|
||||
return gcsToObjectError(traceError(err), bucket)
|
||||
}
|
||||
|
||||
func toGCSPageToken(name string) string {
|
||||
@ -474,7 +495,6 @@ func fromGCSAttrsToObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo {
|
||||
ContentType: attrs.ContentType,
|
||||
ContentEncoding: attrs.ContentEncoding,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// GetObjectInfo - reads object info and replies back ObjectInfo
|
||||
@ -504,15 +524,12 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket)
|
||||
}
|
||||
|
||||
teeReader := data
|
||||
reader := data
|
||||
|
||||
var sha256Writer hash.Hash
|
||||
if sha256sum == "" {
|
||||
} else if _, err := hex.DecodeString(sha256sum); err != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
} else {
|
||||
if sha256sum != "" {
|
||||
sha256Writer = sha256.New()
|
||||
teeReader = io.TeeReader(teeReader, sha256Writer)
|
||||
reader = io.TeeReader(data, sha256Writer)
|
||||
}
|
||||
|
||||
md5sum := metadata["etag"]
|
||||
@ -524,16 +541,17 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re
|
||||
|
||||
w.ContentType = metadata["content-type"]
|
||||
w.ContentEncoding = metadata["content-encoding"]
|
||||
if md5sum == "" {
|
||||
} else if md5, err := hex.DecodeString(md5sum); err != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
} else {
|
||||
w.MD5 = md5
|
||||
if md5sum != "" {
|
||||
var err error
|
||||
w.MD5, err = hex.DecodeString(md5sum)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
}
|
||||
|
||||
w.Metadata = metadata
|
||||
|
||||
_, err := io.Copy(w, teeReader)
|
||||
_, err := io.Copy(w, reader)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
@ -548,10 +566,11 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
|
||||
if sha256sum == "" {
|
||||
} else if newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil)); newSHA256sum != sha256sum {
|
||||
object.Delete(l.ctx)
|
||||
return ObjectInfo{}, traceError(SHA256Mismatch{})
|
||||
if sha256sum != "" {
|
||||
if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum {
|
||||
object.Delete(l.ctx)
|
||||
return ObjectInfo{}, traceError(SHA256Mismatch{})
|
||||
}
|
||||
}
|
||||
|
||||
return fromGCSAttrsToObjectInfo(attrs), nil
|
||||
@ -580,127 +599,26 @@ func (l *gcsGateway) DeleteObject(bucket string, object string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListMultipartUploads - lists all multipart uploads.
|
||||
func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
||||
prefix = pathJoin(ZZZZMinioPrefix, "multipart-")
|
||||
|
||||
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
|
||||
|
||||
nextMarker := ""
|
||||
isTruncated := false
|
||||
|
||||
it.PageInfo().Token = uploadIDMarker
|
||||
|
||||
uploads := []uploadMetadata{}
|
||||
for {
|
||||
if len(uploads) >= maxUploads {
|
||||
isTruncated = true
|
||||
nextMarker = it.PageInfo().Token
|
||||
break
|
||||
}
|
||||
|
||||
attrs, err := it.Next()
|
||||
if err == iterator.Done {
|
||||
break
|
||||
} else if err != nil {
|
||||
return ListMultipartsInfo{}, gcsToObjectError(traceError(err), bucket)
|
||||
}
|
||||
|
||||
if attrs.Prefix != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
objectKey, uploadID, partID, err := fromGCSMultipartKey(attrs.Name)
|
||||
if err != nil {
|
||||
continue
|
||||
} else if partID != 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
nextMarker = toGCSPageToken(attrs.Name)
|
||||
|
||||
// we count only partID == 0
|
||||
uploads = append(uploads, uploadMetadata{
|
||||
Object: objectKey,
|
||||
UploadID: uploadID,
|
||||
Initiated: attrs.Created,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
return ListMultipartsInfo{
|
||||
Uploads: uploads,
|
||||
IsTruncated: isTruncated,
|
||||
|
||||
KeyMarker: nextMarker,
|
||||
UploadIDMarker: nextMarker,
|
||||
NextKeyMarker: nextMarker,
|
||||
NextUploadIDMarker: nextMarker,
|
||||
MaxUploads: maxUploads,
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
func fromGCSMultipartKey(s string) (key, uploadID string, partID int, err error) {
|
||||
// remove prefixes
|
||||
s = path.Base(s)
|
||||
|
||||
parts := strings.Split(s, "-")
|
||||
if parts[0] != "multipart" {
|
||||
return "", "", 0, errGCSNotValidMultipartIdentifier
|
||||
}
|
||||
|
||||
if len(parts) != 4 {
|
||||
return "", "", 0, errGCSNotValidMultipartIdentifier
|
||||
}
|
||||
|
||||
key = unescape(parts[1])
|
||||
|
||||
uploadID = parts[2]
|
||||
|
||||
partID, err = strconv.Atoi(parts[3])
|
||||
if err != nil {
|
||||
return "", "", 0, err
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func unescape(s string) string {
|
||||
s = strings.Replace(s, "%2D", "-", -1)
|
||||
s = strings.Replace(s, "%2F", "/", -1)
|
||||
s = strings.Replace(s, "%25", "%", -1)
|
||||
return s
|
||||
}
|
||||
|
||||
func escape(s string) string {
|
||||
s = strings.Replace(s, "%", "%25", -1)
|
||||
s = strings.Replace(s, "/", "%2F", -1)
|
||||
s = strings.Replace(s, "-", "%2D", -1)
|
||||
return s
|
||||
}
|
||||
|
||||
func toGCSMultipartKey(key string, uploadID string, partID int) string {
|
||||
// parts are allowed to be numbered from 1 to 10,000 (inclusive)
|
||||
|
||||
// we need to encode the key because of possible slashes
|
||||
return pathJoin(ZZZZMinioPrefix, fmt.Sprintf("multipart-%s-%s-%05d", escape(key), uploadID, partID))
|
||||
}
|
||||
|
||||
// NewMultipartUpload - upload object in multiple parts
|
||||
func (l *gcsGateway) NewMultipartUpload(bucket string, key string, metadata map[string]string) (uploadID string, err error) {
|
||||
// generate new uploadid
|
||||
uploadID = mustGetUUID()
|
||||
uploadID = strings.Replace(uploadID, "-", "", -1)
|
||||
|
||||
// generate name for part zero
|
||||
partZeroKey := toGCSMultipartKey(key, uploadID, 0)
|
||||
meta := gcsMultipartMetaName(uploadID)
|
||||
|
||||
// we are writing a 0 sized object to hold the metadata
|
||||
w := l.client.Bucket(bucket).Object(partZeroKey).NewWriter(l.ctx)
|
||||
w := l.client.Bucket(bucket).Object(meta).NewWriter(l.ctx)
|
||||
w.ContentType = metadata["content-type"]
|
||||
w.ContentEncoding = metadata["content-encoding"]
|
||||
w.Metadata = metadata
|
||||
|
||||
content, err := json.Marshal(gcsMultipartMetaV1{gcsMultipartMetaCurrentVersion, bucket, key})
|
||||
if err != nil {
|
||||
return "", gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
if _, err = w.Write(content); err != nil {
|
||||
return "", gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
if err = w.Close(); err != nil {
|
||||
return "", gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
@ -708,6 +626,17 @@ func (l *gcsGateway) NewMultipartUpload(bucket string, key string, metadata map[
|
||||
return uploadID, nil
|
||||
}
|
||||
|
||||
// ListMultipartUploads - lists all multipart uploads.
|
||||
func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
||||
return ListMultipartsInfo{
|
||||
KeyMarker: keyMarker,
|
||||
UploadIDMarker: uploadIDMarker,
|
||||
MaxUploads: maxUploads,
|
||||
Prefix: prefix,
|
||||
Delimiter: delimiter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CopyObjectPart - copy part of object to other bucket and object
|
||||
func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBucket string, destObject string, uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) {
|
||||
return PartInfo{}, traceError(NotSupported{})
|
||||
@ -715,131 +644,146 @@ func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBuck
|
||||
|
||||
// PutObjectPart puts a part of object in bucket
|
||||
func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) {
|
||||
multipartKey := toGCSMultipartKey(key, uploadID, partID)
|
||||
meta := gcsMultipartMetaName(uploadID)
|
||||
object := l.client.Bucket(bucket).Object(meta)
|
||||
|
||||
_, err := object.Attrs(l.ctx)
|
||||
if err != nil {
|
||||
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
|
||||
var sha256Writer hash.Hash
|
||||
|
||||
// Generate random ETag.
|
||||
etag := getMD5Hash([]byte(mustGetUUID()))
|
||||
|
||||
reader := data
|
||||
|
||||
if sha256sum != "" {
|
||||
sha256Writer = sha256.New()
|
||||
reader = io.TeeReader(data, sha256Writer)
|
||||
}
|
||||
|
||||
dataName := gcsMultipartDataName(uploadID, etag)
|
||||
|
||||
object = l.client.Bucket(bucket).Object(dataName)
|
||||
|
||||
w := object.NewWriter(l.ctx)
|
||||
// Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case
|
||||
// where it tries to upload 0 bytes in the last chunk and get error from server.
|
||||
w.ChunkSize = 0
|
||||
if md5Hex != "" {
|
||||
w.MD5, err = hex.DecodeString(md5Hex)
|
||||
if err != nil {
|
||||
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
}
|
||||
_, err = io.Copy(w, reader)
|
||||
if err != nil {
|
||||
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
|
||||
err = w.Close()
|
||||
if err != nil {
|
||||
return PartInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
|
||||
if sha256sum != "" {
|
||||
if hex.EncodeToString(sha256Writer.Sum(nil)) != sha256sum {
|
||||
object.Delete(l.ctx)
|
||||
return PartInfo{}, traceError(SHA256Mismatch{})
|
||||
}
|
||||
}
|
||||
|
||||
info, err := l.PutObject(bucket, multipartKey, size, data, map[string]string{}, sha256sum)
|
||||
return PartInfo{
|
||||
PartNumber: partID,
|
||||
LastModified: info.ModTime,
|
||||
ETag: info.ETag,
|
||||
Size: info.Size,
|
||||
}, err
|
||||
ETag: etag,
|
||||
LastModified: time.Now().UTC(),
|
||||
Size: size,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ListObjectParts returns all object parts for specified object in specified bucket
|
||||
func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) {
|
||||
delimiter := slashSeparator
|
||||
prefix := pathJoin(ZZZZMinioPrefix, fmt.Sprintf("multipart-%s-%s", escape(key), uploadID))
|
||||
meta := gcsMultipartMetaName(uploadID)
|
||||
object := l.client.Bucket(bucket).Object(meta)
|
||||
|
||||
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
|
||||
|
||||
isTruncated := false
|
||||
|
||||
it.PageInfo().Token = toGCSPageToken(toGCSMultipartKey(key, uploadID, partNumberMarker))
|
||||
it.PageInfo().MaxSize = maxParts
|
||||
|
||||
nextPartnumberMarker := 0
|
||||
|
||||
parts := []PartInfo{}
|
||||
for {
|
||||
if len(parts) >= maxParts {
|
||||
isTruncated = true
|
||||
break
|
||||
}
|
||||
|
||||
attrs, err := it.Next()
|
||||
if err == iterator.Done {
|
||||
break
|
||||
} else if err != nil {
|
||||
return ListPartsInfo{}, gcsToObjectError(traceError(err), bucket, prefix)
|
||||
}
|
||||
|
||||
if attrs.Prefix != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
_, _, partID, err := fromGCSMultipartKey(attrs.Name)
|
||||
if err != nil {
|
||||
continue
|
||||
} else if partID == 0 {
|
||||
// we'll ignore partID 0, it is our zero object, containing
|
||||
// metadata
|
||||
continue
|
||||
}
|
||||
|
||||
nextPartnumberMarker = partID
|
||||
|
||||
parts = append(parts, PartInfo{
|
||||
PartNumber: partID,
|
||||
LastModified: attrs.Updated,
|
||||
ETag: fmt.Sprintf("%d", attrs.CRC32C),
|
||||
Size: attrs.Size,
|
||||
})
|
||||
_, err := object.Attrs(l.ctx)
|
||||
if err != nil {
|
||||
return ListPartsInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
|
||||
return ListPartsInfo{
|
||||
IsTruncated: isTruncated,
|
||||
NextPartNumberMarker: nextPartnumberMarker,
|
||||
Parts: parts,
|
||||
}, nil
|
||||
return ListPartsInfo{}, nil
|
||||
}
|
||||
|
||||
// AbortMultipartUpload aborts a ongoing multipart upload
|
||||
func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error {
|
||||
delimiter := slashSeparator
|
||||
prefix := pathJoin(ZZZZMinioPrefix, fmt.Sprintf("multipart-%s-%s", escape(key), uploadID))
|
||||
// Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up.
|
||||
func (l *gcsGateway) cleanupMultipartUpload(bucket, key, uploadID string) error {
|
||||
meta := gcsMultipartMetaName(uploadID)
|
||||
object := l.client.Bucket(bucket).Object(meta)
|
||||
|
||||
_, err := object.Attrs(l.ctx)
|
||||
if err != nil {
|
||||
return gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
|
||||
prefix := fmt.Sprintf("%s/%s/", gcsMinioMultipartPath, uploadID)
|
||||
|
||||
// iterate through all parts and delete them
|
||||
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
|
||||
|
||||
it.PageInfo().Token = toGCSPageToken(toGCSMultipartKey(key, uploadID, 0))
|
||||
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: prefix, Versions: false})
|
||||
|
||||
for {
|
||||
attrs, err := it.Next()
|
||||
if err == iterator.Done {
|
||||
break
|
||||
} else if err != nil {
|
||||
}
|
||||
if err != nil {
|
||||
return gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
|
||||
// on error continue deleting other parts
|
||||
l.client.Bucket(bucket).Object(attrs.Name).Delete(l.ctx)
|
||||
object := l.client.Bucket(bucket).Object(attrs.Name)
|
||||
// Ignore the error as parallel AbortMultipartUpload might have deleted it.
|
||||
object.Delete(l.ctx)
|
||||
}
|
||||
|
||||
// delete part zero, ignoring errors here, we want to clean up all remains
|
||||
_ = l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, 0)).Delete(l.ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AbortMultipartUpload aborts a ongoing multipart upload
|
||||
func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error {
|
||||
return l.cleanupMultipartUpload(bucket, key, uploadID)
|
||||
}
|
||||
|
||||
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
|
||||
|
||||
// Note that there is a limit (currently 32) to the number of components that can be composed in a single operation.
|
||||
|
||||
// There is a limit (currently 1024) to the total number of components for a given composite object. This means you can append to each object at most 1023 times.
|
||||
|
||||
// There is a per-project rate limit (currently 200) to the number of components you can compose per second. This rate counts both the components being appended to a composite object as well as the components being copied when the composite object of which they are a part is copied.
|
||||
func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID string, uploadedParts []completePart) (ObjectInfo, error) {
|
||||
partZero := l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, 0))
|
||||
meta := gcsMultipartMetaName(uploadID)
|
||||
object := l.client.Bucket(bucket).Object(meta)
|
||||
|
||||
partZeroAttrs, err := partZero.Attrs(l.ctx)
|
||||
partZeroAttrs, err := object.Attrs(l.ctx)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
r, err := object.NewReader(l.ctx)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
|
||||
// Check version compatibility of the meta file before compose()
|
||||
multipartMeta := gcsMultipartMetaV1{}
|
||||
decoder := json.NewDecoder(r)
|
||||
err = decoder.Decode(&multipartMeta)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
if multipartMeta.Version != gcsMultipartMetaCurrentVersion {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(errFormatNotSupported), bucket, key)
|
||||
}
|
||||
|
||||
parts := make([]*storage.ObjectHandle, len(uploadedParts))
|
||||
for i, uploadedPart := range uploadedParts {
|
||||
object := l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber))
|
||||
attrs, partErr := object.Attrs(l.ctx)
|
||||
if partErr != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(partErr), bucket, key)
|
||||
}
|
||||
crc32cStr := fmt.Sprintf("%d", attrs.CRC32C)
|
||||
if crc32cStr != uploadedPart.ETag {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(InvalidPart{}), bucket, key)
|
||||
}
|
||||
|
||||
parts[i] = object
|
||||
parts[i] = l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.ETag))
|
||||
}
|
||||
|
||||
if len(parts) > 32 {
|
||||
@ -858,14 +802,13 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
|
||||
composer.Metadata = partZeroAttrs.Metadata
|
||||
|
||||
attrs, err := composer.Run(l.ctx)
|
||||
// cleanup, delete all parts
|
||||
for _, uploadedPart := range uploadedParts {
|
||||
l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber)).Delete(l.ctx)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
|
||||
partZero.Delete(l.ctx)
|
||||
|
||||
return fromGCSAttrsToObjectInfo(attrs), gcsToObjectError(traceError(err), bucket, key)
|
||||
if err = l.cleanupMultipartUpload(bucket, key, uploadID); err != nil {
|
||||
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
|
||||
}
|
||||
return fromGCSAttrsToObjectInfo(attrs), nil
|
||||
}
|
||||
|
||||
// SetBucketPolicies - Set policy on bucket
|
||||
|
@ -18,41 +18,6 @@ package cmd
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestEscape(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Value string
|
||||
EscapedValue string
|
||||
}{
|
||||
{
|
||||
Value: "test-test",
|
||||
EscapedValue: "test%2Dtest",
|
||||
},
|
||||
{
|
||||
Value: "test/test",
|
||||
EscapedValue: "test%2Ftest",
|
||||
},
|
||||
{
|
||||
Value: "test%test",
|
||||
EscapedValue: "test%25test",
|
||||
},
|
||||
{
|
||||
Value: "%%%////+++",
|
||||
EscapedValue: "%25%25%25%2F%2F%2F%2F+++",
|
||||
},
|
||||
}
|
||||
|
||||
for i, testCase := range testCases {
|
||||
if escape(testCase.Value) != testCase.EscapedValue {
|
||||
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.EscapedValue, escape(testCase.Value))
|
||||
}
|
||||
|
||||
if unescape(testCase.EscapedValue) != testCase.Value {
|
||||
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.Value, unescape(testCase.EscapedValue))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestToGCSPageToken(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Name string
|
||||
@ -122,7 +87,6 @@ func TestValidGCSProjectID(t *testing.T) {
|
||||
{"projectid1414", true},
|
||||
}
|
||||
|
||||
//
|
||||
for i, testCase := range testCases {
|
||||
if isValidGCSProjectID(testCase.ProjectID) != testCase.Valid {
|
||||
t.Errorf("Test %d: Expected %v, got %v", i+1, isValidGCSProjectID(testCase.ProjectID), testCase.Valid)
|
||||
@ -148,12 +112,12 @@ func TestIsGCSPrefix(t *testing.T) {
|
||||
},
|
||||
// GCS prefix without a trailing slash
|
||||
{
|
||||
prefix: ZZZZMinioPrefix,
|
||||
prefix: gcsMinioPath,
|
||||
expectedRes: true,
|
||||
},
|
||||
// GCS prefix with a trailing slash
|
||||
{
|
||||
prefix: ZZZZMinioPrefix + "/",
|
||||
prefix: gcsMinioPath + "/",
|
||||
expectedRes: true,
|
||||
},
|
||||
}
|
||||
@ -196,3 +160,24 @@ func TestIsGCSMarker(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test for gcsMultipartMetaName.
|
||||
func TestGCSMultipartMetaName(t *testing.T) {
|
||||
uploadID := "a"
|
||||
expected := pathJoin(gcsMinioMultipartPath, uploadID, gcsMinioMultipartMeta)
|
||||
got := gcsMultipartMetaName(uploadID)
|
||||
if expected != got {
|
||||
t.Errorf("expected: %s, got: %s", expected, got)
|
||||
}
|
||||
}
|
||||
|
||||
// Test for gcsMultipartDataName.
|
||||
func TestGCSMultipartDataName(t *testing.T) {
|
||||
uploadID := "a"
|
||||
etag := "b"
|
||||
expected := pathJoin(gcsMinioMultipartPath, uploadID, etag)
|
||||
got := gcsMultipartDataName(uploadID, etag)
|
||||
if expected != got {
|
||||
t.Errorf("expected: %s, got: %s", expected, got)
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,10 @@ var errUnexpected = errors.New("Unexpected error, please report this issue at ht
|
||||
// errCorruptedFormat - corrupted backend format.
|
||||
var errCorruptedFormat = errors.New("corrupted backend format, please join https://slack.minio.io for assistance")
|
||||
|
||||
// errFormatNotSupported - returned when older minio tries to parse metadata
|
||||
// created by newer minio.
|
||||
var errFormatNotSupported = errors.New("format not supported")
|
||||
|
||||
// errUnformattedDisk - unformatted disk found.
|
||||
var errUnformattedDisk = errors.New("unformatted disk found")
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user