mirror of
https://github.com/minio/minio.git
synced 2025-04-21 11:04:20 -04:00
allow multipart uploads for single part multipart (#12821)
its possible that some multipart uploads would have uploaded only single parts so relying on `len(o.Parts)` alone is not sufficient, we need to look for ETag pattern to be absolutely sure.
This commit is contained in:
parent
b6cd54779c
commit
3c34e18a4e
@ -801,14 +801,11 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
||||||
if len(objInfo.Parts) > 1 {
|
if objInfo.Multipart {
|
||||||
if uploadID, err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object, r, objInfo, putOpts); err != nil {
|
if err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object,
|
||||||
|
r, objInfo, putOpts); err != nil {
|
||||||
replicationStatus = replication.Failed
|
replicationStatus = replication.Failed
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||||
// block and abort remote upload upon failure.
|
|
||||||
if err = c.AbortMultipartUpload(ctx, dest.Bucket, object, uploadID); err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to clean up failed upload %s on remote %s/%s: %w", uploadID, dest.Bucket, object, err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
|
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
|
||||||
@ -876,38 +873,52 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func replicateObjectWithMultipart(ctx context.Context, c *miniogo.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts miniogo.PutObjectOptions) (uploadID string, err error) {
|
func replicateObjectWithMultipart(ctx context.Context, c *miniogo.Core, bucket, object string, r io.Reader, objInfo ObjectInfo, opts miniogo.PutObjectOptions) (err error) {
|
||||||
var uploadedParts []miniogo.CompletePart
|
var uploadedParts []miniogo.CompletePart
|
||||||
uploadID, err = c.NewMultipartUpload(context.Background(), bucket, object, opts)
|
uploadID, err := c.NewMultipartUpload(context.Background(), bucket, object, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
// block and abort remote upload upon failure.
|
||||||
|
if aerr := c.AbortMultipartUpload(ctx, bucket, object, uploadID); aerr != nil {
|
||||||
|
aerr = fmt.Errorf("Unable to cleanup failed multipart replication %s on remote %s/%s: %w", uploadID, bucket, object, aerr)
|
||||||
|
logger.LogIf(ctx, aerr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
hr *hash.Reader
|
hr *hash.Reader
|
||||||
pInfo miniogo.ObjectPart
|
pInfo miniogo.ObjectPart
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, partInfo := range objInfo.Parts {
|
for _, partInfo := range objInfo.Parts {
|
||||||
hr, err = hash.NewReader(r, partInfo.Size, "", "", partInfo.Size)
|
hr, err = hash.NewReader(r, partInfo.Size, "", "", partInfo.Size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.Size, "", "", opts.ServerSideEncryption)
|
pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.Size, "", "", opts.ServerSideEncryption)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
if pInfo.Size != partInfo.Size {
|
if pInfo.Size != partInfo.Size {
|
||||||
return uploadID, fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.Size)
|
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.Size)
|
||||||
}
|
}
|
||||||
uploadedParts = append(uploadedParts, miniogo.CompletePart{
|
uploadedParts = append(uploadedParts, miniogo.CompletePart{
|
||||||
PartNumber: pInfo.PartNumber,
|
PartNumber: pInfo.PartNumber,
|
||||||
ETag: pInfo.ETag,
|
ETag: pInfo.ETag,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{
|
_, err = c.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, miniogo.PutObjectOptions{
|
||||||
SourceMTime: objInfo.ModTime,
|
Internal: miniogo.AdvancedPutOptions{
|
||||||
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
|
SourceMTime: objInfo.ModTime,
|
||||||
}})
|
// always set this to distinguish between `mc mirror` replication and serverside
|
||||||
return
|
ReplicationRequest: true,
|
||||||
|
}})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
||||||
|
@ -44,7 +44,7 @@ import (
|
|||||||
"github.com/minio/console/restapi"
|
"github.com/minio/console/restapi"
|
||||||
"github.com/minio/console/restapi/operations"
|
"github.com/minio/console/restapi/operations"
|
||||||
"github.com/minio/kes"
|
"github.com/minio/kes"
|
||||||
minio "github.com/minio/minio-go/v7"
|
"github.com/minio/minio-go/v7"
|
||||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||||
"github.com/minio/minio-go/v7/pkg/set"
|
"github.com/minio/minio-go/v7/pkg/set"
|
||||||
"github.com/minio/minio/internal/auth"
|
"github.com/minio/minio/internal/auth"
|
||||||
@ -105,9 +105,10 @@ func init() {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Set number of max retries to 1 for minio-go clients
|
|
||||||
minio.MaxRetry = 1
|
|
||||||
|
|
||||||
|
// All minio-go API operations shall be performed only once,
|
||||||
|
// another way to look at this is we are turning off retries.
|
||||||
|
minio.MaxRetry = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
const consolePrefix = "CONSOLE_"
|
const consolePrefix = "CONSOLE_"
|
||||||
|
@ -27,6 +27,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/bucket/replication"
|
"github.com/minio/minio/internal/bucket/replication"
|
||||||
|
"github.com/minio/minio/internal/crypto"
|
||||||
|
"github.com/minio/minio/internal/etag"
|
||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/minio/internal/sync/errgroup"
|
"github.com/minio/minio/internal/sync/errgroup"
|
||||||
@ -141,6 +143,13 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
|
|||||||
// Extract etag from metadata.
|
// Extract etag from metadata.
|
||||||
objInfo.ETag = extractETag(fi.Metadata)
|
objInfo.ETag = extractETag(fi.Metadata)
|
||||||
|
|
||||||
|
// Verify if Etag is parseable, if yes
|
||||||
|
// then check if its multipart etag.
|
||||||
|
et, e := etag.Parse(objInfo.ETag)
|
||||||
|
if e == nil {
|
||||||
|
objInfo.Multipart = et.IsMultipart()
|
||||||
|
}
|
||||||
|
|
||||||
// Add user tags to the object info
|
// Add user tags to the object info
|
||||||
tags := fi.Metadata[xhttp.AmzObjectTagging]
|
tags := fi.Metadata[xhttp.AmzObjectTagging]
|
||||||
if len(tags) != 0 {
|
if len(tags) != 0 {
|
||||||
@ -165,6 +174,14 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
|
|||||||
// Tags have also been extracted, we remove that as well.
|
// Tags have also been extracted, we remove that as well.
|
||||||
objInfo.UserDefined = cleanMetadata(fi.Metadata)
|
objInfo.UserDefined = cleanMetadata(fi.Metadata)
|
||||||
|
|
||||||
|
// Set multipart for encryption properly if
|
||||||
|
// not set already.
|
||||||
|
if !objInfo.Multipart {
|
||||||
|
if _, ok := crypto.IsEncrypted(objInfo.UserDefined); ok {
|
||||||
|
objInfo.Multipart = crypto.IsMultiPart(objInfo.UserDefined)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// All the parts per object.
|
// All the parts per object.
|
||||||
objInfo.Parts = fi.Parts
|
objInfo.Parts = fi.Parts
|
||||||
|
|
||||||
|
@ -29,6 +29,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
|
"github.com/minio/minio/internal/crypto"
|
||||||
|
"github.com/minio/minio/internal/etag"
|
||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
"github.com/minio/minio/internal/lock"
|
"github.com/minio/minio/internal/lock"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
@ -167,6 +169,12 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo
|
|||||||
}
|
}
|
||||||
|
|
||||||
objInfo.ETag = extractETag(m.Meta)
|
objInfo.ETag = extractETag(m.Meta)
|
||||||
|
|
||||||
|
et, e := etag.Parse(objInfo.ETag)
|
||||||
|
if e == nil {
|
||||||
|
objInfo.Multipart = et.IsMultipart()
|
||||||
|
}
|
||||||
|
|
||||||
objInfo.ContentType = m.Meta["content-type"]
|
objInfo.ContentType = m.Meta["content-type"]
|
||||||
objInfo.ContentEncoding = m.Meta["content-encoding"]
|
objInfo.ContentEncoding = m.Meta["content-encoding"]
|
||||||
if storageClass, ok := m.Meta[xhttp.AmzStorageClass]; ok {
|
if storageClass, ok := m.Meta[xhttp.AmzStorageClass]; ok {
|
||||||
@ -176,7 +184,6 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo
|
|||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
t time.Time
|
t time.Time
|
||||||
e error
|
|
||||||
)
|
)
|
||||||
if exp, ok := m.Meta["expires"]; ok {
|
if exp, ok := m.Meta["expires"]; ok {
|
||||||
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
|
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
|
||||||
@ -193,6 +200,14 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo
|
|||||||
// Tags have also been extracted, we remove that as well.
|
// Tags have also been extracted, we remove that as well.
|
||||||
objInfo.UserDefined = cleanMetadata(m.Meta)
|
objInfo.UserDefined = cleanMetadata(m.Meta)
|
||||||
|
|
||||||
|
// Set multipart for encryption properly if
|
||||||
|
// not set already.
|
||||||
|
if !objInfo.Multipart {
|
||||||
|
if _, ok := crypto.IsEncrypted(objInfo.UserDefined); ok {
|
||||||
|
objInfo.Multipart = crypto.IsMultiPart(objInfo.UserDefined)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// All the parts per object.
|
// All the parts per object.
|
||||||
objInfo.Parts = m.Parts
|
objInfo.Parts = m.Parts
|
||||||
|
|
||||||
|
@ -173,6 +173,8 @@ type ObjectInfo struct {
|
|||||||
|
|
||||||
Legacy bool // indicates object on disk is in legacy data format
|
Legacy bool // indicates object on disk is in legacy data format
|
||||||
|
|
||||||
|
Multipart bool // indicates if object is multipart object.
|
||||||
|
|
||||||
// backendType indicates which backend filled this structure
|
// backendType indicates which backend filled this structure
|
||||||
backendType BackendType
|
backendType BackendType
|
||||||
|
|
||||||
@ -193,6 +195,7 @@ func (o ObjectInfo) Clone() (cinfo ObjectInfo) {
|
|||||||
Size: o.Size,
|
Size: o.Size,
|
||||||
IsDir: o.IsDir,
|
IsDir: o.IsDir,
|
||||||
ETag: o.ETag,
|
ETag: o.ETag,
|
||||||
|
Multipart: o.Multipart,
|
||||||
InnerETag: o.InnerETag,
|
InnerETag: o.InnerETag,
|
||||||
VersionID: o.VersionID,
|
VersionID: o.VersionID,
|
||||||
IsLatest: o.IsLatest,
|
IsLatest: o.IsLatest,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user