feat: SSE-KMS: use a UUID as the ETag instead of reading all data to compute an MD5. (#17958)

This commit is contained in:
jiuker
2023-09-19 01:00:54 +08:00
committed by GitHub
parent a00db4267c
commit 9947c01c8e
23 changed files with 133 additions and 56 deletions

View File

@@ -567,7 +567,7 @@ func isReqAuthenticated(ctx context.Context, r *http.Request, region string, sty
// Verify 'Content-Md5' and/or 'X-Amz-Content-Sha256' if present.
// The verification happens implicit during reading.
reader, err := hash.NewReader(r.Body, -1, clientETag.String(), hex.EncodeToString(contentSHA256), -1)
reader, err := hash.NewReader(ctx, r.Body, -1, clientETag.String(), hex.EncodeToString(contentSHA256), -1)
if err != nil {
return toAPIErrorCode(ctx, err)
}

View File

@@ -166,7 +166,7 @@ func (r *BatchJobReplicateV1) ReplicateFromSource(ctx context.Context, api Objec
}
defer rd.Close()
hr, err := hash.NewReader(rd, objInfo.Size, "", "", objInfo.Size)
hr, err := hash.NewReader(ctx, rd, objInfo.Size, "", "", objInfo.Size)
if err != nil {
return err
}
@@ -229,7 +229,7 @@ func (r *BatchJobReplicateV1) copyWithMultipartfromSource(ctx context.Context, a
}
defer rd.Close()
hr, err = hash.NewReader(io.LimitReader(rd, objInfo.Size), objInfo.Size, "", "", objInfo.Size)
hr, err = hash.NewReader(ctx, io.LimitReader(rd, objInfo.Size), objInfo.Size, "", "", objInfo.Size)
if err != nil {
return err
}

View File

@@ -1139,7 +1139,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
hashReader, err := hash.NewReader(reader, fileSize, "", "", fileSize)
hashReader, err := hash.NewReader(ctx, reader, fileSize, "", "", fileSize)
if err != nil {
logger.LogIf(ctx, err)
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
@@ -1254,7 +1254,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
// do not try to verify encrypted content
hashReader, err = hash.NewReader(reader, -1, "", "", -1)
hashReader, err = hash.NewReader(ctx, reader, -1, "", "", -1)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return

View File

@@ -1571,7 +1571,7 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
)
for _, partInfo := range objInfo.Parts {
hr, err = hash.NewReader(io.LimitReader(r, partInfo.ActualSize), partInfo.ActualSize, "", "", partInfo.ActualSize)
hr, err = hash.NewReader(ctx, io.LimitReader(r, partInfo.ActualSize), partInfo.ActualSize, "", "", partInfo.ActualSize)
if err != nil {
return err
}

View File

@@ -71,7 +71,7 @@ func deleteConfig(ctx context.Context, objAPI objectDeleter, configFile string)
}
func saveConfigWithOpts(ctx context.Context, store objectIO, configFile string, data []byte, opts ObjectOptions) error {
hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data)))
hashReader, err := hash.NewReader(ctx, bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data)))
if err != nil {
return err
}

View File

@@ -998,7 +998,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
return err
}
hr, err := hash.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), "", "", int64(buf.Len()))
hr, err := hash.NewReader(ctx, bytes.NewReader(buf.Bytes()), int64(buf.Len()), "", "", int64(buf.Len()))
if err != nil {
return err
}

View File

@@ -1441,7 +1441,7 @@ func newCachePartEncryptReader(ctx context.Context, bucket, object string, partI
info := ObjectInfo{Size: size}
wantSize = info.EncryptedSize()
}
hReader, err := hash.NewReader(content, wantSize, "", "", size)
hReader, err := hash.NewReader(ctx, content, wantSize, "", "", size)
if err != nil {
return nil, err
}

View File

@@ -734,7 +734,7 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
defer cLock.Unlock(lkctx)
// Initialize pipe to stream data to backend
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, size, "", "", r.ActualSize())
hashReader, err := hash.NewReader(ctx, pipeReader, size, "", "", r.ActualSize())
if err != nil {
return
}
@@ -795,7 +795,7 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
if st == CommitComplete || st.String() == "" {
return
}
hashReader, err := hash.NewReader(cReader, oi.Size, "", "", oi.Size)
hashReader, err := hash.NewReader(ctx, cReader, oi.Size, "", "", oi.Size)
if err != nil {
return
}
@@ -1059,7 +1059,7 @@ func (c *cacheObjects) PutObjectPart(ctx context.Context, bucket, object, upload
info = PartInfo{}
// Initialize pipe to stream data to backend
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, size, "", "", data.ActualSize())
hashReader, err := hash.NewReader(ctx, pipeReader, size, "", "", data.ActualSize())
if err != nil {
return
}

View File

@@ -2155,7 +2155,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
}
defer gr.Close()
hashReader, err := hash.NewReader(gr, gr.ObjInfo.Size, "", "", gr.ObjInfo.Size)
hashReader, err := hash.NewReader(ctx, gr, gr.ObjInfo.Size, "", "", gr.ObjInfo.Size)
if err != nil {
return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
}
@@ -2180,7 +2180,7 @@ func (er erasureObjects) restoreTransitionedObject(ctx context.Context, bucket s
// rehydrate the parts back on disk as per the original xl.meta prior to transition
for _, partInfo := range oi.Parts {
hr, err := hash.NewReader(io.LimitReader(gr, partInfo.Size), partInfo.Size, "", "", partInfo.Size)
hr, err := hash.NewReader(ctx, io.LimitReader(gr, partInfo.Size), partInfo.Size, "", "", partInfo.Size)
if err != nil {
return setRestoreHeaderFn(oi, err)
}

View File

@@ -608,7 +608,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, ObjectOptions{})
parts := make([]CompletePart, len(objInfo.Parts))
for i, part := range objInfo.Parts {
hr, err := hash.NewReader(io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
hr, err := hash.NewReader(ctx, io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
if err != nil {
return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
}
@@ -642,7 +642,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
return err
}
hr, err := hash.NewReader(io.LimitReader(gr, objInfo.Size), objInfo.Size, "", "", actualSize)
hr, err := hash.NewReader(ctx, io.LimitReader(gr, objInfo.Size), objInfo.Size, "", "", actualSize)
if err != nil {
return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
}

View File

@@ -736,7 +736,7 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string,
parts := make([]CompletePart, len(oi.Parts))
for i, part := range oi.Parts {
hr, err := hash.NewReader(io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
hr, err := hash.NewReader(ctx, io.LimitReader(gr, part.Size), part.Size, "", "", part.ActualSize)
if err != nil {
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
}
@@ -766,7 +766,7 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string,
return err
}
hr, err := hash.NewReader(gr, oi.Size, "", "", actualSize)
hr, err := hash.NewReader(ctx, gr, oi.Size, "", "", actualSize)
if err != nil {
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
}

View File

@@ -761,7 +761,7 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
return nil
}
o.debugln(color.Green("saveMetaCacheStream:")+" saving block", b.n, "to", o.objectPath(b.n))
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
r, err := hash.NewReader(ctx, bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
logger.LogIf(ctx, err)
custom := b.headerKV()
_, err = er.putMetacacheObject(ctx, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{

View File

@@ -342,6 +342,15 @@ func mustGetUUID() string {
return u.String()
}
// mustGetUUIDBytes - return a freshly generated random UUID as its raw,
// unencoded 16-byte representation. A generation failure is treated as
// fatal and reported via logger.CriticalIf.
func mustGetUUIDBytes() []byte {
	id, err := uuid.NewRandom()
	if err != nil {
		logger.CriticalIf(GlobalContext, err)
	}
	return id[:]
}
// Create an s3 compatible MD5sum for complete multipart transaction.
func getCompleteMultipartMD5(parts []CompletePart) string {
var finalMD5Bytes []byte

View File

@@ -1182,7 +1182,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
compressMetadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
compressMetadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(actualSize, 10)
reader = etag.NewReader(reader, nil)
reader = etag.NewReader(ctx, reader, nil, nil)
wantEncryption := crypto.Requested(r.Header)
s2c, cb := newS2CompressReader(reader, actualSize, wantEncryption)
dstOpts.IndexCB = cb
@@ -1195,7 +1195,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
reader = gr
}
srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualSize)
srcInfo.Reader, err = hash.NewReader(ctx, reader, length, "", "", actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -1316,7 +1316,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
// do not try to verify encrypted content
srcInfo.Reader, err = hash.NewReader(reader, targetSize, "", "", actualSize)
srcInfo.Reader, err = hash.NewReader(ctx, reader, targetSize, "", "", actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -1743,7 +1743,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
actualReader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -1763,8 +1763,20 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
md5hex = "" // Do not try to verify the content.
sha256hex = ""
}
hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
var hashReader *hash.Reader
// Optimization: if the client did not send Content-Md5 and the request uses SSE-KMS or SSE-C, use a random UUID as the ETag instead of hashing all the data.
if !etag.ContentMD5Requested(r.Header) && (crypto.S3KMS.IsRequested(r.Header) || crypto.SSEC.IsRequested(r.Header)) {
hashReader, err = hash.NewReaderWithOpts(ctx, reader, hash.Options{
Size: size,
MD5Hex: md5hex,
SHA256Hex: sha256hex,
ActualSize: actualSize,
DisableMD5: false,
ForceMD5: mustGetUUIDBytes(),
})
} else {
hashReader, err = hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
}
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -1860,7 +1872,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
// do not try to verify encrypted content
hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
hashReader, err = hash.NewReader(ctx, etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -2077,7 +2089,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
}
}
hreader, err := hash.NewReader(reader, size, md5hex, sha256hex, size)
hreader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, size)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -2128,7 +2140,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
actualReader, err := hash.NewReader(reader, size, "", "", actualSize)
actualReader, err := hash.NewReader(ctx, reader, size, "", "", actualSize)
if err != nil {
return err
}
@@ -2142,7 +2154,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
size = -1 // Since compressed size is un-predictable.
}
hashReader, err := hash.NewReader(reader, size, "", "", actualSize)
hashReader, err := hash.NewReader(ctx, reader, size, "", "", actualSize)
if err != nil {
return err
}
@@ -2212,7 +2224,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
}
// do not try to verify encrypted content
hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
hashReader, err = hash.NewReader(ctx, etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
if err != nil {
return err
}

View File

@@ -448,7 +448,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
}
actualPartSize = length
var reader io.Reader = etag.NewReader(gr, nil)
var reader io.Reader = etag.NewReader(ctx, gr, nil, nil)
mi, err := objectAPI.GetMultipartInfo(ctx, dstBucket, dstObject, uploadID, dstOpts)
if err != nil {
@@ -471,7 +471,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
length = -1
}
srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualPartSize)
srcInfo.Reader, err = hash.NewReader(ctx, reader, length, "", "", actualPartSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -526,7 +526,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
wantSize = info.EncryptedSize()
}
srcInfo.Reader, err = hash.NewReader(reader, wantSize, "", "", actualPartSize)
srcInfo.Reader, err = hash.NewReader(ctx, reader, wantSize, "", "", actualPartSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -715,7 +715,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
_, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"]
var idxCb func() []byte
if isCompressed {
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
actualReader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -736,7 +736,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
sha256hex = ""
}
hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
hashReader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -798,7 +798,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
wantSize = info.EncryptedSize()
}
// do not try to verify encrypted content
hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
hashReader, err = hash.NewReader(ctx, etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return

View File

@@ -61,7 +61,7 @@ func fanOutPutObject(ctx context.Context, bucket string, objectAPI ObjectLayer,
ActualSize: -1,
DisableMD5: true,
}
hr, err := hash.NewReaderWithOpts(bytes.NewReader(fanOutBuf), hopts)
hr, err := hash.NewReaderWithOpts(ctx, bytes.NewReader(fanOutBuf), hopts)
if err != nil {
errs[idx] = err
return
@@ -91,7 +91,7 @@ func fanOutPutObject(ctx context.Context, bucket string, objectAPI ObjectLayer,
}
// do not try to verify encrypted content
hr, err = hash.NewReader(encrd, -1, "", "", -1)
hr, err = hash.NewReader(ctx, encrd, -1, "", "", -1)
if err != nil {
errs[idx] = err
return

View File

@@ -162,7 +162,7 @@ func calculateSignedChunkLength(chunkDataSize int64) int64 {
}
func mustGetPutObjReader(t TestErrHandler, data io.Reader, size int64, md5hex, sha256hex string) *PutObjReader {
hr, err := hash.NewReader(data, size, md5hex, sha256hex, size)
hr, err := hash.NewReader(context.Background(), data, size, md5hex, sha256hex, size)
if err != nil {
t.Fatal(err)
}

View File

@@ -287,7 +287,7 @@ func (config *TierConfigMgr) getDriver(tierName string) (d WarmBackend, err erro
// using a PutObject API. PutObjReader encrypts json encoded tier configurations
// if KMS is enabled, otherwise simply yields the json encoded bytes as is.
// Similarly, ObjectOptions value depends on KMS' status.
func (config *TierConfigMgr) configReader() (*PutObjReader, *ObjectOptions, error) {
func (config *TierConfigMgr) configReader(ctx context.Context) (*PutObjReader, *ObjectOptions, error) {
b, err := config.Bytes()
if err != nil {
return nil, nil, err
@@ -295,7 +295,7 @@ func (config *TierConfigMgr) configReader() (*PutObjReader, *ObjectOptions, erro
payloadSize := int64(len(b))
br := bytes.NewReader(b)
hr, err := hash.NewReader(br, payloadSize, "", "", payloadSize)
hr, err := hash.NewReader(ctx, br, payloadSize, "", "", payloadSize)
if err != nil {
return nil, nil, err
}
@@ -318,7 +318,7 @@ func (config *TierConfigMgr) configReader() (*PutObjReader, *ObjectOptions, erro
Size: payloadSize,
}
encSize := info.EncryptedSize()
encHr, err := hash.NewReader(encBr, encSize, "", "", encSize)
encHr, err := hash.NewReader(ctx, encBr, encSize, "", "", encSize)
if err != nil {
return nil, nil, err
}
@@ -371,7 +371,7 @@ func (config *TierConfigMgr) Save(ctx context.Context, objAPI ObjectLayer) error
return errServerNotInitialized
}
pr, opts, err := globalTierConfigMgr.configReader()
pr, opts, err := globalTierConfigMgr.configReader(ctx)
if err != nil {
return err
}