mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
Fix SSEC multipart checksum replication (#19915)
* Multipart SSEC checksums were not transferred. * Remove key mismatch logging. This key is user-controlled with SSEC. * If the source is SSEC and the destination reports ErrSSEEncryptedObject, assume replication is good.
This commit is contained in:
parent
ba9f0f2480
commit
ad04afe381
@ -1418,7 +1418,8 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Set the encrypted size for SSE-C objects
|
// Set the encrypted size for SSE-C objects
|
||||||
if crypto.SSEC.IsEncrypted(objInfo.UserDefined) {
|
isSSEC := crypto.SSEC.IsEncrypted(objInfo.UserDefined)
|
||||||
|
if isSSEC {
|
||||||
size = objInfo.Size
|
size = objInfo.Size
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1478,6 +1479,13 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// SSEC objects will refuse HeadObject without the decryption key.
|
||||||
|
// Ignore the error, since we know the object exists and versioning prevents overwriting existing versions.
|
||||||
|
if isSSEC && strings.Contains(cerr.Error(), errorCodes[ErrSSEEncryptedObject].Description) {
|
||||||
|
rinfo.ReplicationStatus = replication.Completed
|
||||||
|
rinfo.ReplicationAction = replicateNone
|
||||||
|
goto applyAction
|
||||||
|
}
|
||||||
// if target returns error other than NoSuchKey, defer replication attempt
|
// if target returns error other than NoSuchKey, defer replication attempt
|
||||||
if minio.IsNetworkOrHostDown(cerr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
|
if minio.IsNetworkOrHostDown(cerr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
|
||||||
globalBucketTargetSys.markOffline(tgt.EndpointURL())
|
globalBucketTargetSys.markOffline(tgt.EndpointURL())
|
||||||
@ -1505,6 +1513,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
applyAction:
|
||||||
rinfo.ReplicationStatus = replication.Completed
|
rinfo.ReplicationStatus = replication.Completed
|
||||||
rinfo.Size = size
|
rinfo.Size = size
|
||||||
rinfo.ReplicationAction = rAction
|
rinfo.ReplicationAction = rAction
|
||||||
@ -1638,13 +1647,14 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
hr *hash.Reader
|
hr *hash.Reader
|
||||||
pInfo minio.ObjectPart
|
pInfo minio.ObjectPart
|
||||||
|
isSSEC = crypto.SSEC.IsEncrypted(objInfo.UserDefined)
|
||||||
)
|
)
|
||||||
|
|
||||||
var objectSize int64
|
var objectSize int64
|
||||||
for _, partInfo := range objInfo.Parts {
|
for _, partInfo := range objInfo.Parts {
|
||||||
if crypto.SSEC.IsEncrypted(objInfo.UserDefined) {
|
if isSSEC {
|
||||||
hr, err = hash.NewReader(ctx, io.LimitReader(r, partInfo.Size), partInfo.Size, "", "", partInfo.ActualSize)
|
hr, err = hash.NewReader(ctx, io.LimitReader(r, partInfo.Size), partInfo.Size, "", "", partInfo.ActualSize)
|
||||||
} else {
|
} else {
|
||||||
hr, err = hash.NewReader(ctx, io.LimitReader(r, partInfo.ActualSize), partInfo.ActualSize, "", "", partInfo.ActualSize)
|
hr, err = hash.NewReader(ctx, io.LimitReader(r, partInfo.ActualSize), partInfo.ActualSize, "", "", partInfo.ActualSize)
|
||||||
@ -1655,16 +1665,18 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
|
|||||||
|
|
||||||
cHeader := http.Header{}
|
cHeader := http.Header{}
|
||||||
cHeader.Add(xhttp.MinIOSourceReplicationRequest, "true")
|
cHeader.Add(xhttp.MinIOSourceReplicationRequest, "true")
|
||||||
crc := getCRCMeta(objInfo, partInfo.Number, nil) // No SSE-C keys here.
|
if !isSSEC {
|
||||||
for k, v := range crc {
|
crc := getCRCMeta(objInfo, partInfo.Number, nil) // No SSE-C keys here.
|
||||||
cHeader.Add(k, v)
|
for k, v := range crc {
|
||||||
|
cHeader.Add(k, v)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
popts := minio.PutObjectPartOptions{
|
popts := minio.PutObjectPartOptions{
|
||||||
SSE: opts.ServerSideEncryption,
|
SSE: opts.ServerSideEncryption,
|
||||||
CustomHeader: cHeader,
|
CustomHeader: cHeader,
|
||||||
}
|
}
|
||||||
|
|
||||||
if crypto.SSEC.IsEncrypted(objInfo.UserDefined) {
|
if isSSEC {
|
||||||
objectSize += partInfo.Size
|
objectSize += partInfo.Size
|
||||||
pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.Size, popts)
|
pInfo, err = c.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, hr, partInfo.Size, popts)
|
||||||
} else {
|
} else {
|
||||||
@ -1674,7 +1686,7 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !crypto.SSEC.IsEncrypted(objInfo.UserDefined) && pInfo.Size != partInfo.ActualSize {
|
if !isSSEC && pInfo.Size != partInfo.ActualSize {
|
||||||
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
|
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
|
||||||
}
|
}
|
||||||
uploadedParts = append(uploadedParts, minio.CompletePart{
|
uploadedParts = append(uploadedParts, minio.CompletePart{
|
||||||
@ -1686,12 +1698,18 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
|
|||||||
ChecksumSHA256: pInfo.ChecksumSHA256,
|
ChecksumSHA256: pInfo.ChecksumSHA256,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
userMeta := map[string]string{
|
||||||
|
validSSEReplicationHeaders[ReservedMetadataPrefix+"Actual-Object-Size"]: objInfo.UserDefined[ReservedMetadataPrefix+"actual-size"],
|
||||||
|
}
|
||||||
|
if isSSEC && objInfo.UserDefined[ReplicationSsecChecksumHeader] != "" {
|
||||||
|
userMeta[ReplicationSsecChecksumHeader] = objInfo.UserDefined[ReplicationSsecChecksumHeader]
|
||||||
|
}
|
||||||
|
|
||||||
// really big value but its okay on heavily loaded systems. This is just tail end timeout.
|
// really big value but its okay on heavily loaded systems. This is just tail end timeout.
|
||||||
cctx, ccancel := context.WithTimeout(ctx, 10*time.Minute)
|
cctx, ccancel := context.WithTimeout(ctx, 10*time.Minute)
|
||||||
defer ccancel()
|
defer ccancel()
|
||||||
_, err = c.CompleteMultipartUpload(cctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{
|
_, err = c.CompleteMultipartUpload(cctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{
|
||||||
UserMetadata: map[string]string{validSSEReplicationHeaders[ReservedMetadataPrefix+"Actual-Object-Size"]: objInfo.UserDefined[ReservedMetadataPrefix+"actual-size"]},
|
UserMetadata: userMeta,
|
||||||
Internal: minio.AdvancedPutOptions{
|
Internal: minio.AdvancedPutOptions{
|
||||||
SourceMTime: objInfo.ModTime,
|
SourceMTime: objInfo.ModTime,
|
||||||
SourceETag: objInfo.ETag,
|
SourceETag: objInfo.ETag,
|
||||||
|
@ -1025,7 +1025,9 @@ func DecryptObjectInfo(info *ObjectInfo, r *http.Request) (encrypted bool, err e
|
|||||||
if encrypted {
|
if encrypted {
|
||||||
if crypto.SSEC.IsEncrypted(info.UserDefined) {
|
if crypto.SSEC.IsEncrypted(info.UserDefined) {
|
||||||
if !(crypto.SSEC.IsRequested(headers) || crypto.SSECopy.IsRequested(headers)) {
|
if !(crypto.SSEC.IsRequested(headers) || crypto.SSECopy.IsRequested(headers)) {
|
||||||
return encrypted, errEncryptedObject
|
if r.Header.Get(xhttp.MinIOSourceReplicationRequest) != "true" {
|
||||||
|
return encrypted, errEncryptedObject
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1168,7 +1170,9 @@ func (o *ObjectInfo) decryptChecksums(part int, h http.Header) map[string]string
|
|||||||
if _, encrypted := crypto.IsEncrypted(o.UserDefined); encrypted {
|
if _, encrypted := crypto.IsEncrypted(o.UserDefined); encrypted {
|
||||||
decrypted, err := o.metadataDecrypter(h)("object-checksum", data)
|
decrypted, err := o.metadataDecrypter(h)("object-checksum", data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
encLogIf(GlobalContext, err)
|
if err != crypto.ErrSecretKeyMismatch {
|
||||||
|
encLogIf(GlobalContext, err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
data = decrypted
|
data = decrypted
|
||||||
|
@ -483,6 +483,11 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
|
|||||||
}
|
}
|
||||||
fi.DataDir = mustGetUUID()
|
fi.DataDir = mustGetUUID()
|
||||||
|
|
||||||
|
if userDefined[ReplicationSsecChecksumHeader] != "" {
|
||||||
|
fi.Checksum, _ = base64.StdEncoding.DecodeString(userDefined[ReplicationSsecChecksumHeader])
|
||||||
|
delete(userDefined, ReplicationSsecChecksumHeader)
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize erasure metadata.
|
// Initialize erasure metadata.
|
||||||
for index := range partsMetadata {
|
for index := range partsMetadata {
|
||||||
partsMetadata[index] = fi
|
partsMetadata[index] = fi
|
||||||
@ -1294,6 +1299,14 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
defer lk.Unlock(lkctx)
|
defer lk.Unlock(lkctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Accept encrypted checksum from incoming request.
|
||||||
|
if opts.UserDefined[ReplicationSsecChecksumHeader] != "" {
|
||||||
|
if v, err := base64.StdEncoding.DecodeString(opts.UserDefined[ReplicationSsecChecksumHeader]); err == nil {
|
||||||
|
fi.Checksum = v
|
||||||
|
}
|
||||||
|
delete(opts.UserDefined, ReplicationSsecChecksumHeader)
|
||||||
|
}
|
||||||
|
|
||||||
if checksumType.IsSet() {
|
if checksumType.IsSet() {
|
||||||
checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart
|
checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart
|
||||||
var cs *hash.Checksum
|
var cs *hash.Checksum
|
||||||
@ -1303,13 +1316,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
fi.Checksum = opts.EncryptFn("object-checksum", fi.Checksum)
|
fi.Checksum = opts.EncryptFn("object-checksum", fi.Checksum)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if fi.Metadata[ReplicationSsecChecksumHeader] != "" {
|
delete(fi.Metadata, hash.MinIOMultipartChecksum) // Not needed in final object.
|
||||||
if v, err := base64.StdEncoding.DecodeString(fi.Metadata[ReplicationSsecChecksumHeader]); err == nil {
|
|
||||||
fi.Checksum = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
delete(fi.Metadata, ReplicationSsecChecksumHeader) // Transferred above.
|
|
||||||
delete(fi.Metadata, hash.MinIOMultipartChecksum) // Not needed in final object.
|
|
||||||
|
|
||||||
// Save the final object size and modtime.
|
// Save the final object size and modtime.
|
||||||
fi.Size = objectSize
|
fi.Size = objectSize
|
||||||
|
@ -37,6 +37,15 @@ func getDefaultOpts(header http.Header, copySource bool, metadata map[string]str
|
|||||||
var sse encrypt.ServerSide
|
var sse encrypt.ServerSide
|
||||||
|
|
||||||
opts = ObjectOptions{UserDefined: metadata}
|
opts = ObjectOptions{UserDefined: metadata}
|
||||||
|
if v, ok := header[xhttp.MinIOSourceProxyRequest]; ok {
|
||||||
|
opts.ProxyHeaderSet = true
|
||||||
|
opts.ProxyRequest = strings.Join(v, "") == "true"
|
||||||
|
}
|
||||||
|
if _, ok := header[xhttp.MinIOSourceReplicationRequest]; ok {
|
||||||
|
opts.ReplicationRequest = true
|
||||||
|
}
|
||||||
|
opts.Speedtest = header.Get(globalObjectPerfUserMetadata) != ""
|
||||||
|
|
||||||
if copySource {
|
if copySource {
|
||||||
if crypto.SSECopy.IsRequested(header) {
|
if crypto.SSECopy.IsRequested(header) {
|
||||||
clientKey, err = crypto.SSECopy.ParseHTTP(header)
|
clientKey, err = crypto.SSECopy.ParseHTTP(header)
|
||||||
@ -66,14 +75,7 @@ func getDefaultOpts(header http.Header, copySource bool, metadata map[string]str
|
|||||||
if crypto.S3.IsRequested(header) || (metadata != nil && crypto.S3.IsEncrypted(metadata)) {
|
if crypto.S3.IsRequested(header) || (metadata != nil && crypto.S3.IsEncrypted(metadata)) {
|
||||||
opts.ServerSideEncryption = encrypt.NewSSE()
|
opts.ServerSideEncryption = encrypt.NewSSE()
|
||||||
}
|
}
|
||||||
if v, ok := header[xhttp.MinIOSourceProxyRequest]; ok {
|
|
||||||
opts.ProxyHeaderSet = true
|
|
||||||
opts.ProxyRequest = strings.Join(v, "") == "true"
|
|
||||||
}
|
|
||||||
if _, ok := header[xhttp.MinIOSourceReplicationRequest]; ok {
|
|
||||||
opts.ReplicationRequest = true
|
|
||||||
}
|
|
||||||
opts.Speedtest = header.Get(globalObjectPerfUserMetadata) != ""
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -494,5 +496,8 @@ func completeMultipartOpts(ctx context.Context, r *http.Request, bucket, object
|
|||||||
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
|
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
|
||||||
opts.ReplicationRequest = true
|
opts.ReplicationRequest = true
|
||||||
}
|
}
|
||||||
|
if r.Header.Get(ReplicationSsecChecksumHeader) != "" {
|
||||||
|
opts.UserDefined[ReplicationSsecChecksumHeader] = r.Header.Get(ReplicationSsecChecksumHeader)
|
||||||
|
}
|
||||||
return opts, nil
|
return opts, nil
|
||||||
}
|
}
|
||||||
|
@ -1482,9 +1482,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
|||||||
|
|
||||||
// convert copy src encryption options for GET calls
|
// convert copy src encryption options for GET calls
|
||||||
getOpts := ObjectOptions{
|
getOpts := ObjectOptions{
|
||||||
VersionID: srcOpts.VersionID,
|
VersionID: srcOpts.VersionID,
|
||||||
Versioned: srcOpts.Versioned,
|
Versioned: srcOpts.Versioned,
|
||||||
VersionSuspended: srcOpts.VersionSuspended,
|
VersionSuspended: srcOpts.VersionSuspended,
|
||||||
|
ReplicationRequest: r.Header.Get(xhttp.MinIOSourceReplicationRequest) == "true",
|
||||||
}
|
}
|
||||||
getSSE := encrypt.SSE(srcOpts.ServerSideEncryption)
|
getSSE := encrypt.SSE(srcOpts.ServerSideEncryption)
|
||||||
if getSSE != srcOpts.ServerSideEncryption {
|
if getSSE != srcOpts.ServerSideEncryption {
|
||||||
@ -1616,7 +1617,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Encryption parameters not present for this object.
|
// Encryption parameters not present for this object.
|
||||||
if crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && !crypto.SSECopy.IsRequested(r.Header) {
|
if crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && !crypto.SSECopy.IsRequested(r.Header) && r.Header.Get(xhttp.MinIOSourceReplicationRequest) != "true" {
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidSSECustomerAlgorithm), r.URL)
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidSSECustomerAlgorithm), r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user