diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index a8b871add..36b837965 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -301,11 +301,9 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
     partsMetadata := make([]FileInfo, len(onlineDisks))
 
     fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)
 
-    if opts.Versioned {
-        fi.VersionID = opts.VersionID
-        if fi.VersionID == "" {
-            fi.VersionID = mustGetUUID()
-        }
+    fi.VersionID = opts.VersionID
+    if opts.Versioned && fi.VersionID == "" {
+        fi.VersionID = mustGetUUID()
     }
     fi.DataDir = mustGetUUID()
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index 919a6c673..efda431fa 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -1433,6 +1433,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
     }
     oi = actualfi.ToObjectInfo(bucket, object)
 
+    ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)
     if len(oi.Parts) == 1 {
         var rs *HTTPRangeSpec
         gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
@@ -1445,15 +1446,16 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
             return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
         }
         pReader := NewPutObjReader(hashReader)
-        ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)
         ropts.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String()
         _, err = er.PutObject(ctx, bucket, object, pReader, ropts)
         return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
     }
-    uploadID, err := er.NewMultipartUpload(ctx, bucket, object, opts)
+
+    uploadID, err := er.NewMultipartUpload(ctx, bucket, object, ropts)
     if err != nil {
         return setRestoreHeaderFn(oi, err)
     }
+
     var uploadedParts []CompletePart
     var rs *HTTPRangeSpec
     // get reader from the warm backend - note that even in the case of encrypted objects, this stream is still encrypted.
@@ -1462,6 +1464,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
         return setRestoreHeaderFn(oi, err)
     }
     defer gr.Close()
+
     // rehydrate the parts back on disk as per the original xl.meta prior to transition
     for _, partInfo := range oi.Parts {
         hr, err := hash.NewReader(gr, partInfo.Size, "", "", partInfo.Size)
@@ -1478,40 +1481,17 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
         })
     }
-    uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
-    storageDisks := er.getDisks()
-    // Read metadata associated with the object from all disks.
-    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
-
-    // get Quorum for this object
-    _, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
-    if err != nil {
-        return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
-    }
-
-    reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
-    if reducedErr == errErasureWriteQuorum {
-        return setRestoreHeaderFn(oi, toObjectErr(reducedErr, bucket, object))
-    }
-
-    _, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
-
-    // Pick one from the first valid metadata.
-    fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
-    if err != nil {
-        return setRestoreHeaderFn(oi, err)
-    }
-
     //validate parts created via multipart to transitioned object's parts info in xl.meta
     partsMatch := true
-    if len(actualfi.Parts) != len(fi.Parts) {
-        partsMatch = false
-    }
-    if len(actualfi.Parts) == len(fi.Parts) {
-        for i, pi := range actualfi.Parts {
-            if fi.Parts[i].Size != pi.Size {
+    // validate parts created via multipart
+    if len(oi.Parts) == len(uploadedParts) {
+        for i, pi := range oi.Parts {
+            if uploadedParts[i].ETag != pi.ETag {
                 partsMatch = false
+                break
             }
         }
+    } else {
+        partsMatch = false
     }
 
     if !partsMatch {
@@ -1525,6 +1505,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
         VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
     })
     if err != nil {
+        uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
         return setRestoreHeaderFn(oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath))
     }
     return setRestoreHeaderFn(oi, nil)
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go
index 2adc9fa0b..0f6dfce4b 100644
--- a/cmd/object-api-interface.go
+++ b/cmd/object-api-interface.go
@@ -52,13 +52,13 @@ type ObjectOptions struct {
     DeleteMarkerReplicationStatus string                 // Is only set in DELETE operations
     VersionPurgeStatus            VersionPurgeStatusType // Is only set in DELETE operations for delete marker version to be permanently deleted.
     Transition                    TransitionOptions
-    NoLock         bool // indicates to lower layers if the caller is expecting to hold locks.
-    ProxyRequest   bool // only set for GET/HEAD in active-active replication scenario
-    ProxyHeaderSet bool // only set for GET/HEAD in active-active replication scenario
-    ParentIsObject func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object.
-    // Use the maximum parity (N/2), used when
-    // saving server configuration files
+    NoLock         bool                                                  // indicates to lower layers if the caller is expecting to hold locks.
+    ProxyRequest   bool                                                  // only set for GET/HEAD in active-active replication scenario
+    ProxyHeaderSet bool                                                  // only set for GET/HEAD in active-active replication scenario
+    ParentIsObject func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object.
+
+    // Use the maximum parity (N/2), used when saving server configuration files
     MaxParity bool
 }
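For context (not part of the patch): the new validation in restoreTransitionedObject compares the re-uploaded parts against the parts recorded in the transitioned object's xl.meta by ETag rather than by size. Below is a minimal standalone sketch of that check; the partInfo type and partsMatch helper are hypothetical stand-ins for MinIO's ObjectPartInfo and CompletePart.

package main

import "fmt"

// partInfo is a stand-in for the fields of interest; the real types in MinIO
// (ObjectPartInfo in xl.meta, CompletePart from the multipart upload) carry more fields.
type partInfo struct {
	Number int
	ETag   string
}

// partsMatch mirrors the loop the patch adds: the restore is considered valid
// only if part counts agree and every uploaded part's ETag matches the
// corresponding part recorded in xl.meta.
func partsMatch(original, uploaded []partInfo) bool {
	if len(original) != len(uploaded) {
		return false
	}
	for i, pi := range original {
		if uploaded[i].ETag != pi.ETag {
			return false
		}
	}
	return true
}

func main() {
	xlMetaParts := []partInfo{{1, "etag-a"}, {2, "etag-b"}}
	restored := []partInfo{{1, "etag-a"}, {2, "etag-b"}}
	fmt.Println(partsMatch(xlMetaParts, restored)) // true

	restored[1].ETag = "etag-x"
	fmt.Println(partsMatch(xlMetaParts, restored)) // false: restore would be aborted
}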