add support for SSE-S3 bulk ETag decryption (#14627)

This commit adds support for bulk ETag
decryption for SSE-S3 encrypted objects.

If KES supports a bulk decryption API, then
MinIO will check whether its policy grants
access to this API. If so, MinIO will use
a bulk API call instead of sending encrypted
ETags serially to KES.

Note that MinIO will not use the KES bulk API
if its client certificate is an admin identity.

MinIO will process object listings in batches.
A batch has a configurable size that can be set
via `MINIO_KMS_KES_BULK_API_BATCH_SIZE=N`.
It defaults to `500`.

This env. variable is experimental and may be
renamed / removed in the future.

Signed-off-by: Andreas Auernhammer <hi@aead.dev>
This commit is contained in:
Andreas Auernhammer
2022-03-25 23:01:41 +01:00
committed by GitHub
parent 3970204009
commit 4d2fc530d0
9 changed files with 252 additions and 60 deletions

View File

@@ -24,28 +24,12 @@ import (
"strings"
"github.com/gorilla/mux"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/bucket/policy"
)
func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) {
g := errgroup.WithNErrs(len(objects)).WithConcurrency(500)
for index := range objects {
index := index
g.Go(func() error {
size, err := objects[index].GetActualSize()
if err == nil {
objects[index].Size = size
}
objects[index].ETag = objects[index].GetActualETag(nil)
return nil
}, index)
}
g.Wait()
}
// Validate all the ListObjects query arguments, returns an APIErrorCode
// if one of the args do not meet the required conditions.
// Special conditions required by MinIO server are as below
@@ -116,7 +100,10 @@ func (api objectAPIHandlers) ListObjectVersionsHandler(w http.ResponseWriter, r
return
}
concurrentDecryptETag(ctx, listObjectVersionsInfo.Objects)
if err = DecryptETags(ctx, GlobalKMS, listObjectVersionsInfo.Objects, kms.BatchSize()); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
response := generateListVersionsResponse(bucket, prefix, marker, versionIDMarker, delimiter, encodingType, maxkeys, listObjectVersionsInfo)
@@ -178,7 +165,10 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt
return
}
concurrentDecryptETag(ctx, listObjectsV2Info.Objects)
if err = DecryptETags(ctx, GlobalKMS, listObjectsV2Info.Objects, kms.BatchSize()); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// The next continuation token has id@node_index format to optimize paginated listing
nextContinuationToken := listObjectsV2Info.NextContinuationToken
@@ -253,7 +243,10 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
return
}
concurrentDecryptETag(ctx, listObjectsV2Info.Objects)
if err = DecryptETags(ctx, GlobalKMS, listObjectsV2Info.Objects, kms.BatchSize()); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
response := generateListObjectsV2Response(bucket, prefix, token, listObjectsV2Info.NextContinuationToken, startAfter,
delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated,
@@ -350,7 +343,10 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
return
}
concurrentDecryptETag(ctx, listObjectsInfo.Objects)
if err = DecryptETags(ctx, GlobalKMS, listObjectsInfo.Objects, kms.BatchSize()); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
response := generateListObjectsV1Response(bucket, prefix, marker, delimiter, encodingType, maxKeys, listObjectsInfo)

View File

@@ -19,6 +19,7 @@ package cmd
import (
"bufio"
"context"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
@@ -35,6 +36,7 @@ import (
"github.com/minio/kes"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/etag"
"github.com/minio/minio/internal/fips"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/kms"
@@ -80,6 +82,7 @@ func (o *MultipartInfo) KMSKeyID() string { return kmsKeyIDFromMetadata(o.UserDe
// metadata, if any. It returns an empty ID if no key ID is
// present.
func kmsKeyIDFromMetadata(metadata map[string]string) string {
const ARNPrefix = "arn:aws:kms:"
if len(metadata) == 0 {
return ""
}
@@ -87,10 +90,96 @@ func kmsKeyIDFromMetadata(metadata map[string]string) string {
if !ok {
return ""
}
if strings.HasPrefix(kmsID, "arn:aws:kms:") {
if strings.HasPrefix(kmsID, ARNPrefix) {
return kmsID
}
return "arn:aws:kms:" + kmsID
return ARNPrefix + kmsID
}
// DecryptETags dectypts all ObjectInfo ETags, if encrypted, using the KMS.
func DecryptETags(ctx context.Context, KMS kms.KMS, objects []ObjectInfo, batchSize int) error {
var (
metadata []map[string]string
buckets []string
names []string
)
for len(objects) > 0 {
var N int
if len(objects) < batchSize {
N = len(objects)
} else {
N = batchSize
}
SSES3Batch := true
for _, object := range objects[:N] {
if kind, ok := crypto.IsEncrypted(object.UserDefined); !ok || kind != crypto.S3 {
SSES3Batch = false
break
}
}
if !SSES3Batch {
for i := range objects[:N] {
size, err := objects[i].GetActualSize()
if err != nil {
return err
}
objects[i].Size = size
objects[i].ETag = objects[i].GetActualETag(nil)
}
objects = objects[N:]
continue
}
// Now, decrypt all ETags using the a specialized bulk decryption API, if available.
// We check the cap of all slices first, to avoid allocating them over and over again.
if cap(metadata) >= N {
metadata = metadata[:0:N]
} else {
metadata = make([]map[string]string, 0, N)
}
if cap(buckets) >= N {
buckets = buckets[:0:N]
} else {
buckets = make([]string, 0, N)
}
if cap(names) >= N {
names = names[:0:N]
} else {
names = make([]string, 0, N)
}
for _, object := range objects[:N] {
metadata = append(metadata, object.UserDefined)
buckets = append(buckets, object.Bucket)
names = append(names, object.Name)
}
keys, err := crypto.S3.UnsealObjectKeys(KMS, metadata, buckets, names)
if err != nil {
return err
}
for i := range objects[:N] {
size, err := objects[i].GetActualSize()
if err != nil {
return err
}
ETag, err := etag.Parse(objects[i].ETag)
if err != nil {
return err
}
if ETag.IsEncrypted() {
tag, err := keys[i].UnsealETag(ETag)
if err != nil {
return err
}
ETag = etag.ETag(tag)
objects[i].Size = size
objects[i].ETag = ETag.String()
}
}
objects = objects[N:]
}
return nil
}
// isMultipart returns true if the current object is