mirror of
https://github.com/minio/minio.git
synced 2025-11-09 13:39:46 -05:00
support encrypted/compressed objects properly during decommission (#15320)
fixes #15314
This commit is contained in:
@@ -658,6 +658,9 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
fi.ModTime = UTCNow()
|
||||
|
||||
md5hex := r.MD5CurrentHexString()
|
||||
if opts.PreserveETag != "" {
|
||||
md5hex = opts.PreserveETag
|
||||
}
|
||||
var index []byte
|
||||
if opts.IndexCB != nil {
|
||||
index = opts.IndexCB()
|
||||
|
||||
@@ -1099,8 +1099,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
||||
Hash: bitrotWriterSum(w),
|
||||
})
|
||||
}
|
||||
|
||||
if userDefined["etag"] == "" {
|
||||
userDefined["etag"] = r.MD5CurrentHexString()
|
||||
if opts.PreserveETag != "" {
|
||||
userDefined["etag"] = opts.PreserveETag
|
||||
}
|
||||
}
|
||||
|
||||
// Guess content-type from the extension if possible.
|
||||
|
||||
@@ -581,6 +581,11 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
|
||||
auditLogDecom(ctx, "DecomCopyData", objInfo.Bucket, objInfo.Name, objInfo.VersionID, err)
|
||||
}()
|
||||
|
||||
actualSize, err := objInfo.GetActualSize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if objInfo.isMultipart() {
|
||||
uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{
|
||||
VersionID: objInfo.VersionID,
|
||||
@@ -593,14 +598,19 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
|
||||
defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{})
|
||||
parts := make([]CompletePart, len(objInfo.Parts))
|
||||
for i, part := range objInfo.Parts {
|
||||
hr, err := hash.NewReader(gr, part.Size, "", "", part.Size)
|
||||
hr, err := hash.NewReader(gr, part.Size, "", "", part.ActualSize)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
|
||||
}
|
||||
pi, err := z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID,
|
||||
part.Number,
|
||||
NewPutObjReader(hr),
|
||||
ObjectOptions{})
|
||||
ObjectOptions{
|
||||
PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
|
||||
IndexCB: func() []byte {
|
||||
return part.Index // Preserve part Index to ensure decompression works.
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("decommissionObject: PutObjectPart() %w", err)
|
||||
}
|
||||
@@ -617,7 +627,8 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
|
||||
}
|
||||
return err
|
||||
}
|
||||
hr, err := hash.NewReader(gr, objInfo.Size, "", "", objInfo.Size)
|
||||
|
||||
hr, err := hash.NewReader(gr, objInfo.Size, "", "", actualSize)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
|
||||
}
|
||||
@@ -626,9 +637,13 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
|
||||
objInfo.Name,
|
||||
NewPutObjReader(hr),
|
||||
ObjectOptions{
|
||||
VersionID: objInfo.VersionID,
|
||||
MTime: objInfo.ModTime,
|
||||
UserDefined: objInfo.UserDefined,
|
||||
VersionID: objInfo.VersionID,
|
||||
MTime: objInfo.ModTime,
|
||||
UserDefined: objInfo.UserDefined,
|
||||
PreserveETag: objInfo.ETag, // Preserve original ETag to ensure same metadata.
|
||||
IndexCB: func() []byte {
|
||||
return objInfo.Parts[0].Index // Preserve part Index to ensure decompression works.
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("decommissionObject: PutObject() %w", err)
|
||||
@@ -741,11 +756,12 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
||||
bi.Name,
|
||||
version.Name,
|
||||
ObjectOptions{
|
||||
Versioned: vc.PrefixEnabled(version.Name),
|
||||
VersionID: version.VersionID,
|
||||
MTime: version.ModTime,
|
||||
DeleteReplication: version.ReplicationState,
|
||||
DeleteMarker: true, // make sure we create a delete marker
|
||||
Versioned: vc.PrefixEnabled(version.Name),
|
||||
VersionID: version.VersionID,
|
||||
MTime: version.ModTime,
|
||||
DeleteReplication: version.ReplicationState,
|
||||
DeleteMarker: true, // make sure we create a delete marker
|
||||
SkipDecommissioned: true, // make sure we skip the decommissioned pool
|
||||
})
|
||||
var failure bool
|
||||
if err != nil {
|
||||
@@ -772,7 +788,8 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
||||
http.Header{},
|
||||
noLock, // all mutations are blocked reads are safe without locks.
|
||||
ObjectOptions{
|
||||
VersionID: version.VersionID,
|
||||
VersionID: version.VersionID,
|
||||
NoDecryption: true,
|
||||
})
|
||||
if isErrObjectNotFound(err) {
|
||||
// object deleted by the application, nothing to do here we move on.
|
||||
|
||||
@@ -427,8 +427,9 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
|
||||
})
|
||||
|
||||
for _, pinfo := range poolObjInfos {
|
||||
// skip all objects from suspended pools for mutating calls.
|
||||
if z.IsSuspended(pinfo.PoolIndex) && opts.Mutate {
|
||||
// skip all objects from suspended pools if asked by the
|
||||
// caller.
|
||||
if z.IsSuspended(pinfo.PoolIndex) && opts.SkipDecommissioned {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -461,8 +462,8 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
|
||||
// The check is skipped if there is only one pool, and 0, nil is always returned in that case.
|
||||
func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) {
|
||||
return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{
|
||||
NoLock: true,
|
||||
Mutate: true,
|
||||
NoLock: true,
|
||||
SkipDecommissioned: true,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -486,7 +487,7 @@ func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, objec
|
||||
// if none are found falls back to most available space pool, this function is
|
||||
// designed to be only used by PutObject, CopyObject (newObject creation) and NewMultipartUpload.
|
||||
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
|
||||
idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{Mutate: true})
|
||||
idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{SkipDecommissioned: true})
|
||||
if err != nil && !isErrObjectNotFound(err) {
|
||||
return idx, err
|
||||
}
|
||||
@@ -2275,7 +2276,6 @@ func (z *erasureServerPools) GetObjectTags(ctx context.Context, bucket, object s
|
||||
return z.serverPools[0].GetObjectTags(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
opts.Mutate = false
|
||||
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -2291,7 +2291,8 @@ func (z *erasureServerPools) TransitionObject(ctx context.Context, bucket, objec
|
||||
return z.serverPools[0].TransitionObject(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
opts.Mutate = true // Avoid transitioning an object from a pool being decommissioned.
|
||||
// Avoid transitioning an object from a pool being decommissioned.
|
||||
opts.SkipDecommissioned = true
|
||||
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -2307,7 +2308,8 @@ func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, buck
|
||||
return z.serverPools[0].RestoreTransitionedObject(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
opts.Mutate = true // Avoid restoring object from a pool being decommissioned.
|
||||
// Avoid restoring object from a pool being decommissioned.
|
||||
opts.SkipDecommissioned = true
|
||||
idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -59,6 +59,8 @@ type ObjectOptions struct {
|
||||
Transition TransitionOptions
|
||||
Expiration ExpirationOptions
|
||||
|
||||
NoDecryption bool // indicates if the stream must be decrypted.
|
||||
PreserveETag string // preserves this etag during a PUT call.
|
||||
NoLock bool // indicates to lower layers if the caller is expecting to hold locks.
|
||||
ProxyRequest bool // only set for GET/HEAD in active-active replication scenario
|
||||
ProxyHeaderSet bool // only set for GET/HEAD in active-active replication scenario
|
||||
@@ -73,10 +75,11 @@ type ObjectOptions struct {
|
||||
// Use the maximum parity (N/2), used when saving server configuration files
|
||||
MaxParity bool
|
||||
|
||||
// Mutate set to 'true' if the call is namespace mutation call
|
||||
Mutate bool
|
||||
WalkAscending bool // return Walk results in ascending order of versions
|
||||
// SkipDecommissioned set to 'true' if the call requires skipping the pool being decommissioned.
|
||||
// mainly set for certain WRITE operations.
|
||||
SkipDecommissioned bool
|
||||
|
||||
WalkAscending bool // return Walk results in ascending order of versions
|
||||
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
|
||||
|
||||
// IndexCB will return any index created but the compression.
|
||||
|
||||
@@ -74,6 +74,7 @@ func getDefaultOpts(header http.Header, copySource bool, metadata map[string]str
|
||||
if _, ok := header[xhttp.MinIOSourceReplicationRequest]; ok {
|
||||
opts.ReplicationRequest = true
|
||||
}
|
||||
opts.Speedtest = header.Get(globalObjectPerfUserMetadata) != ""
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -646,8 +646,9 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
|
||||
// if object is encrypted and it is a restore request, fetch content without decrypting.
|
||||
if opts.Transition.RestoreRequest != nil {
|
||||
// if object is encrypted and it is a restore request or if NoDecryption
|
||||
// was requested, fetch content without decrypting.
|
||||
if opts.Transition.RestoreRequest != nil || opts.NoDecryption {
|
||||
isEncrypted = false
|
||||
isCompressed = false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user