Add Full Object Checksums and CRC64-NVME (#20855)

Backport of AIStor PR 247.

Add support for full object checksums as described here:

https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html

New checksum types are fully supported. Mint tests from https://github.com/minio/minio-go/pull/2026 are now passing.

Includes fixes from https://github.com/minio/minio/pull/20743 for mint tests.

Add using checksums as validation for object content. Fixes #20845 #20849

Fixes checksum replication (downstream PR 250)
This commit is contained in:
Klaus Post
2025-01-20 06:49:07 -08:00
committed by GitHub
parent 779ec8f0d4
commit 827004cd6d
18 changed files with 665 additions and 168 deletions

View File

@@ -480,6 +480,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
if opts.WantChecksum != nil && opts.WantChecksum.Type.IsSet() {
userDefined[hash.MinIOMultipartChecksum] = opts.WantChecksum.Type.String()
userDefined[hash.MinIOMultipartChecksumType] = opts.WantChecksum.Type.ObjType()
}
modTime := opts.MTime
@@ -508,6 +509,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
return &NewMultipartUploadResult{
UploadID: uploadID,
ChecksumAlgo: userDefined[hash.MinIOMultipartChecksum],
ChecksumType: userDefined[hash.MinIOMultipartChecksumType],
}, nil
}
@@ -765,15 +767,16 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// Return success.
return PartInfo{
PartNumber: partInfo.Number,
ETag: partInfo.ETag,
LastModified: partInfo.ModTime,
Size: partInfo.Size,
ActualSize: partInfo.ActualSize,
ChecksumCRC32: partInfo.Checksums["CRC32"],
ChecksumCRC32C: partInfo.Checksums["CRC32C"],
ChecksumSHA1: partInfo.Checksums["SHA1"],
ChecksumSHA256: partInfo.Checksums["SHA256"],
PartNumber: partInfo.Number,
ETag: partInfo.ETag,
LastModified: partInfo.ModTime,
Size: partInfo.Size,
ActualSize: partInfo.ActualSize,
ChecksumCRC32: partInfo.Checksums["CRC32"],
ChecksumCRC32C: partInfo.Checksums["CRC32C"],
ChecksumSHA1: partInfo.Checksums["SHA1"],
ChecksumSHA256: partInfo.Checksums["SHA256"],
ChecksumCRC64NVME: partInfo.Checksums["CRC64NVME"],
}, nil
}
@@ -895,6 +898,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
result.PartNumberMarker = partNumberMarker
result.UserDefined = cloneMSS(fi.Metadata)
result.ChecksumAlgorithm = fi.Metadata[hash.MinIOMultipartChecksum]
result.ChecksumType = fi.Metadata[hash.MinIOMultipartChecksumType]
if maxParts == 0 {
return result, nil
@@ -941,15 +945,16 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
count := maxParts
for _, objPart := range objParts {
result.Parts = append(result.Parts, PartInfo{
PartNumber: objPart.Number,
LastModified: objPart.ModTime,
ETag: objPart.ETag,
Size: objPart.Size,
ActualSize: objPart.ActualSize,
ChecksumCRC32: objPart.Checksums["CRC32"],
ChecksumCRC32C: objPart.Checksums["CRC32C"],
ChecksumSHA1: objPart.Checksums["SHA1"],
ChecksumSHA256: objPart.Checksums["SHA256"],
PartNumber: objPart.Number,
LastModified: objPart.ModTime,
ETag: objPart.ETag,
Size: objPart.Size,
ActualSize: objPart.ActualSize,
ChecksumCRC32: objPart.Checksums["CRC32"],
ChecksumCRC32C: objPart.Checksums["CRC32C"],
ChecksumSHA1: objPart.Checksums["SHA1"],
ChecksumSHA256: objPart.Checksums["SHA256"],
ChecksumCRC64NVME: objPart.Checksums["CRC64NVME"],
})
count--
if count == 0 {
@@ -1131,12 +1136,12 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// Checksum type set when upload started.
var checksumType hash.ChecksumType
if cs := fi.Metadata[hash.MinIOMultipartChecksum]; cs != "" {
checksumType = hash.NewChecksumType(cs)
checksumType = hash.NewChecksumType(cs, fi.Metadata[hash.MinIOMultipartChecksumType])
if opts.WantChecksum != nil && !opts.WantChecksum.Type.Is(checksumType) {
return oi, InvalidArgument{
Bucket: bucket,
Object: fi.Name,
Err: fmt.Errorf("checksum type mismatch"),
Err: fmt.Errorf("checksum type mismatch. got %q (%s) expected %q (%s)", checksumType.String(), checksumType.ObjType(), opts.WantChecksum.Type.String(), opts.WantChecksum.Type.ObjType()),
}
}
}
@@ -1216,6 +1221,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
// Allocate parts similar to incoming slice.
fi.Parts = make([]ObjectPartInfo, len(parts))
var checksum hash.Checksum
checksum.Type = checksumType
// Validate each part and then commit to disk.
for i, part := range parts {
partIdx := objectPartIndex(currentFI.Parts, part.PartNumber)
@@ -1249,10 +1257,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
}
}
wantCS := map[string]string{
hash.ChecksumCRC32.String(): part.ChecksumCRC32,
hash.ChecksumCRC32C.String(): part.ChecksumCRC32C,
hash.ChecksumSHA1.String(): part.ChecksumSHA1,
hash.ChecksumSHA256.String(): part.ChecksumSHA256,
hash.ChecksumCRC32.String(): part.ChecksumCRC32,
hash.ChecksumCRC32C.String(): part.ChecksumCRC32C,
hash.ChecksumSHA1.String(): part.ChecksumSHA1,
hash.ChecksumSHA256.String(): part.ChecksumSHA256,
hash.ChecksumCRC64NVME.String(): part.ChecksumCRC64NVME,
}
if wantCS[checksumType.String()] != crc {
return oi, InvalidPart{
@@ -1267,6 +1276,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
PartNumber: part.PartNumber,
}
}
if checksumType.FullObjectRequested() {
if err := checksum.AddPart(*cs, expPart.ActualSize); err != nil {
return oi, InvalidPart{
PartNumber: part.PartNumber,
ExpETag: "<nil>",
GotETag: err.Error(),
}
}
}
checksumCombined = append(checksumCombined, cs.Raw...)
}
@@ -1297,9 +1315,19 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
}
if opts.WantChecksum != nil {
err := opts.WantChecksum.Matches(checksumCombined, len(parts))
if err != nil {
return oi, err
if checksumType.FullObjectRequested() {
if opts.WantChecksum.Encoded != checksum.Encoded {
err := hash.ChecksumMismatch{
Want: opts.WantChecksum.Encoded,
Got: checksum.Encoded,
}
return oi, err
}
} else {
err := opts.WantChecksum.Matches(checksumCombined, len(parts))
if err != nil {
return oi, err
}
}
}
@@ -1313,14 +1341,18 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
if checksumType.IsSet() {
checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart
var cs *hash.Checksum
cs = hash.NewChecksumFromData(checksumType, checksumCombined)
fi.Checksum = cs.AppendTo(nil, checksumCombined)
checksum.Type = checksumType
if !checksumType.FullObjectRequested() {
checksum = *hash.NewChecksumFromData(checksumType, checksumCombined)
}
fi.Checksum = checksum.AppendTo(nil, checksumCombined)
if opts.EncryptFn != nil {
fi.Checksum = opts.EncryptFn("object-checksum", fi.Checksum)
}
}
delete(fi.Metadata, hash.MinIOMultipartChecksum) // Not needed in final object.
// Remove superfluous internal headers.
delete(fi.Metadata, hash.MinIOMultipartChecksum)
delete(fi.Metadata, hash.MinIOMultipartChecksumType)
// Save the final object size and modtime.
fi.Size = objectSize