preserve metadata multipart restore (#12139)

Avoid re-reading xl.meta; instead, rely on the success
criteria from PutObjectPart() and check that the ETag
matches for each part. If they all match, the parts have
been restored successfully as is.

Signed-off-by: Harshavardhana <harsha@minio.io>
Author: Harshavardhana (committed by GitHub)
Date:   2021-04-24 19:07:27 -07:00
Parent: f420996dfa
Commit: 4eb9b6eaf8
3 changed files with 22 additions and 43 deletions
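
For illustration only, and not part of the commit: a minimal, self-contained Go sketch of the check described in the commit message. ObjectPart and PartInfo here are simplified placeholder types, not MinIO's own definitions; the idea is to trust the ETag reported back by each successful part upload and compare it against the ETag recorded for that part in xl.meta before the object was transitioned, instead of re-reading the on-disk metadata.

// etagcheck.go: minimal sketch, not MinIO code. ObjectPart and PartInfo are
// simplified stand-ins for the per-part records kept in xl.meta and the
// result returned by a successful part upload.
package main

import "fmt"

// ObjectPart is the per-part record kept before the object was transitioned.
type ObjectPart struct {
	Number int
	ETag   string
	Size   int64
}

// PartInfo is what a successful part upload reports back.
type PartInfo struct {
	PartNumber int
	ETag       string
}

// partsMatch reports whether every restored part carries the ETag recorded
// for the original part; if so, the parts are taken as restored as is and no
// re-read of the on-disk metadata is needed.
func partsMatch(original []ObjectPart, uploaded []PartInfo) bool {
	if len(original) != len(uploaded) {
		return false
	}
	for i, p := range original {
		if uploaded[i].ETag != p.ETag {
			return false
		}
	}
	return true
}

func main() {
	original := []ObjectPart{
		{Number: 1, ETag: "e1", Size: 5 << 20},
		{Number: 2, ETag: "e2", Size: 1 << 20},
	}
	uploaded := []PartInfo{
		{PartNumber: 1, ETag: "e1"},
		{PartNumber: 2, ETag: "e2"},
	}
	fmt.Println("parts match:", partsMatch(original, uploaded)) // prints: parts match: true
}

This mirrors the oi.Parts / uploadedParts comparison introduced in the restoreTransitionedObject hunks below.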

@@ -301,11 +301,9 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
 	partsMetadata := make([]FileInfo, len(onlineDisks))
 	fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)
-	if opts.Versioned {
-		fi.VersionID = opts.VersionID
-		if fi.VersionID == "" {
-			fi.VersionID = mustGetUUID()
-		}
+	fi.VersionID = opts.VersionID
+	if opts.Versioned && fi.VersionID == "" {
+		fi.VersionID = mustGetUUID()
 	}
 	fi.DataDir = mustGetUUID()

@@ -1433,6 +1433,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 	}
 	oi = actualfi.ToObjectInfo(bucket, object)
+	ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)
 	if len(oi.Parts) == 1 {
 		var rs *HTTPRangeSpec
 		gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
@@ -1445,15 +1446,16 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 			return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
 		}
 		pReader := NewPutObjReader(hashReader)
-		ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)
 		ropts.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String()
 		_, err = er.PutObject(ctx, bucket, object, pReader, ropts)
 		return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
 	}
-	uploadID, err := er.NewMultipartUpload(ctx, bucket, object, opts)
+
+	uploadID, err := er.NewMultipartUpload(ctx, bucket, object, ropts)
 	if err != nil {
 		return setRestoreHeaderFn(oi, err)
 	}
+
 	var uploadedParts []CompletePart
 	var rs *HTTPRangeSpec
 	// get reader from the warm backend - note that even in the case of encrypted objects, this stream is still encrypted.
@@ -1462,6 +1464,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 		return setRestoreHeaderFn(oi, err)
 	}
 	defer gr.Close()
+
 	// rehydrate the parts back on disk as per the original xl.meta prior to transition
 	for _, partInfo := range oi.Parts {
 		hr, err := hash.NewReader(gr, partInfo.Size, "", "", partInfo.Size)
@@ -1478,40 +1481,17 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 		})
 	}
-	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
-	storageDisks := er.getDisks()
-	// Read metadata associated with the object from all disks.
-	partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
-	// get Quorum for this object
-	_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
-	if err != nil {
-		return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
-	}
-	reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
-	if reducedErr == errErasureWriteQuorum {
-		return setRestoreHeaderFn(oi, toObjectErr(reducedErr, bucket, object))
-	}
-	_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
-	// Pick one from the first valid metadata.
-	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
-	if err != nil {
-		return setRestoreHeaderFn(oi, err)
-	}
-	//validate parts created via multipart to transitioned object's parts info in xl.meta
 	partsMatch := true
-	if len(actualfi.Parts) != len(fi.Parts) {
-		partsMatch = false
-	}
-	if len(actualfi.Parts) == len(fi.Parts) {
-		for i, pi := range actualfi.Parts {
-			if fi.Parts[i].Size != pi.Size {
+	// validate parts created via multipart
+	if len(oi.Parts) == len(uploadedParts) {
+		for i, pi := range oi.Parts {
+			if uploadedParts[i].ETag != pi.ETag {
 				partsMatch = false
+				break
 			}
 		}
+	} else {
+		partsMatch = false
 	}
 	if !partsMatch {
@@ -1525,6 +1505,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
 		VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
 	})
 	if err != nil {
+		uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
 		return setRestoreHeaderFn(oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath))
 	}
 	return setRestoreHeaderFn(oi, nil)

@@ -52,13 +52,13 @@ type ObjectOptions struct {
 	DeleteMarkerReplicationStatus string                 // Is only set in DELETE operations
 	VersionPurgeStatus            VersionPurgeStatusType // Is only set in DELETE operations for delete marker version to be permanently deleted.
 	Transition                    TransitionOptions
-	NoLock                        bool // indicates to lower layers if the caller is expecting to hold locks.
-	ProxyRequest                  bool // only set for GET/HEAD in active-active replication scenario
-	ProxyHeaderSet                bool // only set for GET/HEAD in active-active replication scenario
-	ParentIsObject                func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object.
-	// Use the maximum parity (N/2), used when
-	// saving server configuration files
+
+	NoLock         bool // indicates to lower layers if the caller is expecting to hold locks.
+	ProxyRequest   bool // only set for GET/HEAD in active-active replication scenario
+	ProxyHeaderSet bool // only set for GET/HEAD in active-active replication scenario
+	ParentIsObject func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object.
+	// Use the maximum parity (N/2), used when saving server configuration files
 	MaxParity bool
 }