listing: improve listing of encrypted objects (#14667)

This commit improves the listing of encrypted objects:
 - Use `etag.Format` and `etag.Decrypt`
 - Detect SSE-S3 single-part objects in a single iteration
 - Fix batch size to `250`
 - Pass request context to `DecryptAll` to not waste resources
   when a client cancels the operation.

Signed-off-by: Andreas Auernhammer <hi@aead.dev>
This commit is contained in:
Andreas Auernhammer
2022-04-04 20:42:03 +02:00
committed by GitHub
parent d4251b2545
commit 6b1c62133d
7 changed files with 87 additions and 102 deletions

View File

@@ -19,12 +19,12 @@ package cmd
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"github.com/gorilla/mux"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/bucket/policy"
@@ -100,7 +100,8 @@ func (api objectAPIHandlers) ListObjectVersionsHandler(w http.ResponseWriter, r
return
}
if err = DecryptETags(ctx, GlobalKMS, listObjectVersionsInfo.Objects, kms.BatchSize()); err != nil {
if err = DecryptETags(ctx, GlobalKMS, listObjectVersionsInfo.Objects); err != nil {
logger.LogIf(ctx, fmt.Errorf("Failed to decrypt ETag: %v", err)) // TODO(aead): Remove once we are confident that decryption does not fail accidentially
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
@@ -165,7 +166,8 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt
return
}
if err = DecryptETags(ctx, GlobalKMS, listObjectsV2Info.Objects, kms.BatchSize()); err != nil {
if err = DecryptETags(ctx, GlobalKMS, listObjectsV2Info.Objects); err != nil {
logger.LogIf(ctx, fmt.Errorf("Failed to decrypt ETag: %v", err)) // TODO(aead): Remove once we are confident that decryption does not fail accidentially
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
@@ -243,7 +245,8 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
return
}
if err = DecryptETags(ctx, GlobalKMS, listObjectsV2Info.Objects, kms.BatchSize()); err != nil {
if err = DecryptETags(ctx, GlobalKMS, listObjectsV2Info.Objects); err != nil {
logger.LogIf(ctx, fmt.Errorf("Failed to decrypt ETag: %v", err)) // TODO(aead): Remove once we are confident that decryption does not fail accidentially
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
@@ -343,7 +346,8 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
return
}
if err = DecryptETags(ctx, GlobalKMS, listObjectsInfo.Objects, kms.BatchSize()); err != nil {
if err = DecryptETags(ctx, GlobalKMS, listObjectsInfo.Objects); err != nil {
logger.LogIf(ctx, fmt.Errorf("Failed to decrypt ETag: %v", err)) // TODO(aead): Remove once we are confident that decryption does not fail accidentially
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}

View File

@@ -96,102 +96,104 @@ func kmsKeyIDFromMetadata(metadata map[string]string) string {
return ARNPrefix + kmsID
}
// DecryptETags dectypts all ObjectInfo ETags, if encrypted, using the KMS.
func DecryptETags(ctx context.Context, KMS kms.KMS, objects []ObjectInfo, batchSize int) error {
// DecryptETags decryptes the ETag of all ObjectInfos using the KMS.
//
// It adjusts the size of all encrypted objects since encrypted
// objects are slightly larger due to encryption overhead.
// Further, it decrypts all single-part SSE-S3 encrypted objects
// and formats ETags of SSE-C / SSE-KMS encrypted objects to
// be AWS S3 compliant.
//
// DecryptETags uses a KMS bulk decryption API, if available, which
// is more efficient than decrypting ETags sequentually.
func DecryptETags(ctx context.Context, KMS kms.KMS, objects []ObjectInfo) error {
const BatchSize = 250 // We process the objects in batches - 250 is a reasonable default.
var (
metadata = make([]map[string]string, 0, batchSize)
buckets = make([]string, 0, batchSize)
names = make([]string, 0, batchSize)
metadata = make([]map[string]string, 0, BatchSize)
buckets = make([]string, 0, BatchSize)
names = make([]string, 0, BatchSize)
)
for len(objects) > 0 {
var N int
if len(objects) < batchSize {
N := BatchSize
if len(objects) < BatchSize {
N = len(objects)
} else {
N = batchSize
}
batch := objects[:N]
// We have to conntect the KMS only if there is at least
// one SSE-S3 single-part object. SSE-C and SSE-KMS objects
// don't return the plaintext MD5 ETag and the ETag of
// SSE-S3 multipart objects is not encrypted.
// Therefore, we can skip the expensive KMS calls whenever
// there is no single-part SSE-S3 object entirely.
var containsSSES3SinglePart bool
for _, object := range objects[:N] {
// We have to decrypt only ETags of SSE-S3 single-part
// objects.
// Therefore, we remember which objects (there index)
// in the current batch are single-part SSE-S3 objects.
metadata = metadata[:0:N]
buckets = buckets[:0:N]
names = names[:0:N]
SSES3SinglePartObjects := make(map[int]bool)
for i, object := range batch {
if kind, ok := crypto.IsEncrypted(object.UserDefined); ok && kind == crypto.S3 && !crypto.IsMultiPart(object.UserDefined) {
containsSSES3SinglePart = true
break
SSES3SinglePartObjects[i] = true
metadata = append(metadata, object.UserDefined)
buckets = append(buckets, object.Bucket)
names = append(names, object.Name)
}
}
if !containsSSES3SinglePart {
for i := range objects[:N] {
size, err := objects[i].GetActualSize()
// If there are no SSE-S3 single-part objects
// we can skip the decryption process. However,
// we still have to adjust the size and ETag
// of SSE-C and SSE-KMS objects.
if len(SSES3SinglePartObjects) == 0 {
for i := range batch {
size, err := batch[i].GetActualSize()
if err != nil {
return err
}
objects[i].Size = size
objects[i].ETag = objects[i].GetActualETag(nil)
batch[i].Size = size
if _, ok := crypto.IsEncrypted(batch[i].UserDefined); ok {
ETag, err := etag.Parse(batch[i].ETag)
if err != nil {
return err
}
batch[i].ETag = ETag.Format().String()
}
}
objects = objects[N:]
continue
}
// Now, there are some SSE-S3 single-part objects.
// We only request the decryption keys for them.
// We don't want to get the decryption keys for multipart
// or non-SSE-S3 objects.
//
// Therefore, we keep a map of indicies to remember which
// object was an SSE-S3 single-part object.
// Then we request the decryption keys for these objects.
// Finally, we decrypt the ETags of these objects using
// the decryption keys.
// However, we must also adjust the size and ETags of all
// objects (not just the SSE-S3 single part objects).
// For example, the ETag of SSE-KMS objects are random values
// and the size of an SSE-KMS object must be adjusted as well.
SSES3Objects := make(map[int]bool, 10)
metadata = metadata[:0:N]
buckets = buckets[:0:N]
names = names[:0:N]
for i, object := range objects[:N] {
if kind, ok := crypto.IsEncrypted(object.UserDefined); ok && kind == crypto.S3 && !crypto.IsMultiPart(object.UserDefined) {
metadata = append(metadata, object.UserDefined)
buckets = append(buckets, object.Bucket)
names = append(names, object.Name)
SSES3Objects[i] = true
}
}
keys, err := crypto.S3.UnsealObjectKeys(KMS, metadata, buckets, names)
// There is at least one SSE-S3 single-part object.
// For all SSE-S3 single-part objects we have to
// fetch their decryption keys. We do this using
// a Bulk-Decryption API call, if available.
keys, err := crypto.S3.UnsealObjectKeys(ctx, KMS, metadata, buckets, names)
if err != nil {
return err
}
var keyIndex int
for i := range objects[:N] {
size, err := objects[i].GetActualSize()
// Now, we have to decrypt the ETags of SSE-S3 single-part
// objects and adjust the size and ETags of all encrypted
// objects.
for i := range batch {
size, err := batch[i].GetActualSize()
if err != nil {
return err
}
objects[i].Size = size
batch[i].Size = size
if !SSES3Objects[i] {
objects[i].ETag = objects[i].GetActualETag(nil)
} else {
ETag, err := etag.Parse(objects[i].ETag)
if _, ok := crypto.IsEncrypted(batch[i].UserDefined); ok {
ETag, err := etag.Parse(batch[i].ETag)
if err != nil {
return err
}
if ETag.IsEncrypted() {
tag, err := keys[keyIndex].UnsealETag(ETag)
if SSES3SinglePartObjects[i] && ETag.IsEncrypted() {
ETag, err = etag.Decrypt(keys[0][:], ETag)
if err != nil {
return err
}
ETag = etag.ETag(tag)
objects[i].ETag = ETag.String()
keys = keys[1:]
}
keyIndex++
batch[i].ETag = ETag.Format().String()
}
}
objects = objects[N:]

View File

@@ -386,13 +386,13 @@ func getHostFromSrv(records []dns.SrvRecord) (host string) {
}
// IsCompressed returns true if the object is marked as compressed.
func (o ObjectInfo) IsCompressed() bool {
func (o *ObjectInfo) IsCompressed() bool {
_, ok := o.UserDefined[ReservedMetadataPrefix+"compression"]
return ok
}
// IsCompressedOK returns whether the object is compressed and can be decompressed.
func (o ObjectInfo) IsCompressedOK() (bool, error) {
func (o *ObjectInfo) IsCompressedOK() (bool, error) {
scheme, ok := o.UserDefined[ReservedMetadataPrefix+"compression"]
if !ok {
return false, nil
@@ -404,17 +404,8 @@ func (o ObjectInfo) IsCompressedOK() (bool, error) {
return true, fmt.Errorf("unknown compression scheme: %s", scheme)
}
// GetActualETag - returns the actual etag of the stored object
// decrypts SSE objects.
func (o ObjectInfo) GetActualETag(h http.Header) string {
if _, ok := crypto.IsEncrypted(o.UserDefined); !ok {
return o.ETag
}
return getDecryptedETag(h, o, false)
}
// GetActualSize - returns the actual size of the stored object
func (o ObjectInfo) GetActualSize() (int64, error) {
func (o *ObjectInfo) GetActualSize() (int64, error) {
if o.IsCompressed() {
sizeStr, ok := o.UserDefined[ReservedMetadataPrefix+"actual-size"]
if !ok {