Mirror of https://github.com/minio/minio.git (synced 2025-11-06 20:33:07 -05:00)

Use CompleteMultipartUpload in RestoreTransitionedObject
Signed-off-by: Krishnan Parthasarathi <kp@minio.io>
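
The shape of the refactor is easier to see outside the diff. Below is a minimal sketch, not MinIO's actual code: Backend, Part, and RestoreParts are hypothetical stand-ins with simplified signatures. It models the flow this commit moves to, where each part recorded in the pre-transition metadata is streamed back from the warm tier via PutObjectPart and the object is then committed with CompleteMultipartUpload.

// Sketch only: hypothetical, simplified stand-ins for MinIO's internal API.
package restore

import (
	"context"
	"fmt"
	"io"
)

// Part mirrors the per-part number/size bookkeeping kept in object metadata.
type Part struct {
	Number int
	Size   int64
	ETag   string
}

// Backend is a simplified stand-in for the erasure object layer.
type Backend interface {
	NewMultipartUpload(ctx context.Context, bucket, object string) (uploadID string, err error)
	PutObjectPart(ctx context.Context, bucket, object, uploadID string, partNumber int, r io.Reader, size int64) (etag string, err error)
	CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, parts []Part) error
}

// RestoreParts rehydrates a transitioned object part by part from tierStream,
// a sequential reader over the warm-tier copy.
func RestoreParts(ctx context.Context, b Backend, bucket, object string, srcParts []Part, tierStream io.Reader) error {
	uploadID, err := b.NewMultipartUpload(ctx, bucket, object)
	if err != nil {
		return err
	}
	completed := make([]Part, 0, len(srcParts))
	for _, p := range srcParts {
		// Carve exactly this part's bytes out of the sequential stream.
		etag, err := b.PutObjectPart(ctx, bucket, object, uploadID, p.Number,
			io.LimitReader(tierStream, p.Size), p.Size)
		if err != nil {
			return fmt.Errorf("restore part %d of %s/%s: %w", p.Number, bucket, object, err)
		}
		completed = append(completed, Part{Number: p.Number, Size: p.Size, ETag: etag})
	}
	// CompleteMultipartUpload owns locking, metadata commit, and cleanup of
	// the multipart staging area; the restore path no longer does any of it.
	return b.CompleteMultipartUpload(ctx, bucket, object, uploadID, completed)
}

Routing the commit through CompleteMultipartUpload is also why the diff drops the noLock plumbing from updateRestoreMetadata and setRestoreHeaderFn: the restore path never holds the namespace lock itself anymore.
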
@@ -1395,7 +1395,7 @@ func (er erasureObjects) RestoreTransitionedObject(ctx context.Context, bucket,
 }
 
 // update restore status header in the metadata
-func (er erasureObjects) updateRestoreMetadata(ctx context.Context, bucket, object string, objInfo ObjectInfo, opts ObjectOptions, noLock bool, rerr error) error {
+func (er erasureObjects) updateRestoreMetadata(ctx context.Context, bucket, object string, objInfo ObjectInfo, opts ObjectOptions, rerr error) error {
 	oi := objInfo.Clone()
 	oi.metadataOnly = true // Perform only metadata updates.
 
@@ -1408,7 +1408,6 @@ func (er erasureObjects) updateRestoreMetadata(ctx context.Context, bucket, obje
 		VersionID: oi.VersionID,
 	}, ObjectOptions{
 		VersionID: oi.VersionID,
-		NoLock:    noLock, // true if lock already taken
 	}); err != nil {
 		logger.LogIf(ctx, fmt.Errorf("Unable to update transition restore metadata for %s/%s(%s): %s", bucket, object, oi.VersionID, err))
 		return err
@@ -1422,15 +1421,15 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 	defer func() {
 		ObjectPathUpdated(pathJoin(bucket, object))
 	}()
-	setRestoreHeaderFn := func(oi ObjectInfo, noLock bool, rerr error) error {
-		er.updateRestoreMetadata(ctx, bucket, object, oi, opts, noLock, rerr)
+	setRestoreHeaderFn := func(oi ObjectInfo, rerr error) error {
+		er.updateRestoreMetadata(ctx, bucket, object, oi, opts, rerr)
 		return rerr
 	}
 	var oi ObjectInfo
 	// get the file info on disk for transitioned object
 	actualfi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false)
 	if err != nil {
-		return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
+		return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
 	}
 
 	oi = actualfi.ToObjectInfo(bucket, object)
@@ -1438,40 +1437,40 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 		var rs *HTTPRangeSpec
 		gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
 		if err != nil {
-			return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
+			return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
 		}
 		defer gr.Close()
 		hashReader, err := hash.NewReader(gr, gr.ObjInfo.Size, "", "", gr.ObjInfo.Size)
 		if err != nil {
-			return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
+			return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
 		}
 		pReader := NewPutObjReader(hashReader)
 		ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)
 		ropts.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String()
 		_, err = er.PutObject(ctx, bucket, object, pReader, ropts)
-		return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
+		return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
 	}
 	uploadID, err := er.NewMultipartUpload(ctx, bucket, object, opts)
 	if err != nil {
-		return setRestoreHeaderFn(oi, false, err)
+		return setRestoreHeaderFn(oi, err)
 	}
 	var uploadedParts []CompletePart
 	var rs *HTTPRangeSpec
 	// get reader from the warm backend - note that even in the case of encrypted objects, this stream is still encrypted.
 	gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
 	if err != nil {
-		return setRestoreHeaderFn(oi, false, err)
+		return setRestoreHeaderFn(oi, err)
 	}
 	defer gr.Close()
 	// rehydrate the parts back on disk as per the original xl.meta prior to transition
 	for _, partInfo := range oi.Parts {
 		hr, err := hash.NewReader(gr, partInfo.Size, "", "", partInfo.Size)
 		if err != nil {
-			return setRestoreHeaderFn(oi, false, err)
+			return setRestoreHeaderFn(oi, err)
 		}
 		pInfo, err := er.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, NewPutObjReader(hr), ObjectOptions{})
 		if err != nil {
-			return setRestoreHeaderFn(oi, false, err)
+			return setRestoreHeaderFn(oi, err)
 		}
 		uploadedParts = append(uploadedParts, CompletePart{
 			PartNumber: pInfo.PartNumber,
@@ -1487,20 +1486,20 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 	// get Quorum for this object
 	_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
 	if err != nil {
-		return setRestoreHeaderFn(oi, false, toObjectErr(err, bucket, object))
+		return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
 	}
 
 	reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
 	if reducedErr == errErasureWriteQuorum {
-		return setRestoreHeaderFn(oi, false, toObjectErr(reducedErr, bucket, object))
+		return setRestoreHeaderFn(oi, toObjectErr(reducedErr, bucket, object))
 	}
 
-	onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
+	_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
 
 	// Pick one from the first valid metadata.
 	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
 	if err != nil {
-		return setRestoreHeaderFn(oi, false, err)
+		return setRestoreHeaderFn(oi, err)
 	}
 	//validate parts created via multipart to transitioned object's parts info in xl.meta
 	partsMatch := true
@@ -1516,27 +1515,12 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 	}
 
 	if !partsMatch {
-		return setRestoreHeaderFn(oi, false, InvalidObjectState{Bucket: bucket, Object: object})
+		return setRestoreHeaderFn(oi, InvalidObjectState{Bucket: bucket, Object: object})
 	}
-	var currentFI = actualfi
-	currentFI.DataDir = fi.DataDir
-
-	// Hold namespace to complete the transaction
-	lk := er.NewNSLock(bucket, object)
-	ctx, err = lk.GetLock(ctx, globalOperationTimeout)
+
+	_, err = er.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
 	if err != nil {
-		return setRestoreHeaderFn(oi, false, err)
+		return setRestoreHeaderFn(oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath))
 	}
-	defer lk.Unlock()
-
-	// Attempt to rename temp upload object to actual upload path object
-	_, err = rename(ctx, onlineDisks, minioMetaMultipartBucket, path.Join(uploadIDPath, fi.DataDir), bucket, path.Join(object, actualfi.DataDir), true, writeQuorum, nil)
-	if err != nil {
-		return setRestoreHeaderFn(oi, true, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath))
-	}
-	// Cleanup multipart upload dir.
-	if err = er.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, writeQuorum); err != nil {
-		return setRestoreHeaderFn(oi, true, toObjectErr(err, bucket, object, uploadID))
-	}
-	return setRestoreHeaderFn(oi, true, nil)
+	return setRestoreHeaderFn(oi, nil)
 }