mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
fix: under FanOut API avoid repeated md5sum calculation (#17572)
md5sum calculation has a high CPU overhead, avoid calculating it repeatedly for similar fanOut calls. To fix following CPU profiler result ``` (pprof) top10 Showing nodes accounting for 678.68s, 84.67% of 801.54s total Dropped 1072 nodes (cum <= 4.01s) Showing top 10 nodes out of 156 flat flat% sum% cum cum% 332.54s 41.49% 41.49% 332.54s 41.49% runtime/internal/syscall.Syscall6 228.39s 28.49% 69.98% 228.39s 28.49% crypto/md5.block 48.07s 6.00% 75.98% 48.07s 6.00% runtime.memmove 28.91s 3.61% 79.59% 28.91s 3.61% github.com/minio/highwayhash.updateAVX2 8.25s 1.03% 80.61% 8.25s 1.03% runtime.futex 8.25s 1.03% 81.64% 10.81s 1.35% runtime.step 6.99s 0.87% 82.52% 22.35s 2.79% runtime.pcvalue 6.67s 0.83% 83.35% 38.90s 4.85% runtime.mallocgc 5.77s 0.72% 84.07% 32.61s 4.07% runtime.gentraceback 4.84s 0.6% 84.67% 10.49s 1.31% runtime.lock2 ```
This commit is contained in:
parent
f6b48ed02a
commit
0bc34952eb
@ -20,7 +20,9 @@ package cmd
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
@ -1290,15 +1292,20 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
||||
buf := bytebufferpool.Get()
|
||||
defer bytebufferpool.Put(buf)
|
||||
|
||||
md5w := md5.New()
|
||||
|
||||
// Maximum allowed fan-out object size.
|
||||
const maxFanOutSize = 16 << 20
|
||||
|
||||
n, err := io.Copy(buf, ioutil.HardLimitReader(pReader, maxFanOutSize))
|
||||
n, err := io.Copy(io.MultiWriter(buf, md5w), ioutil.HardLimitReader(pReader, maxFanOutSize))
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Set the correct hex md5sum for the fan-out stream.
|
||||
fanOutOpts.MD5Hex = hex.EncodeToString(md5w.Sum(nil))
|
||||
|
||||
concurrentSize := 100
|
||||
if runtime.GOMAXPROCS(0) < concurrentSize {
|
||||
concurrentSize = runtime.GOMAXPROCS(0)
|
||||
|
@ -36,6 +36,7 @@ type fanOutOptions struct {
|
||||
Key []byte
|
||||
KmsCtx kms.Context
|
||||
Checksum *hash.Checksum
|
||||
MD5Hex string
|
||||
}
|
||||
|
||||
// fanOutPutObject takes an input source reader and fans out multiple PUT operations
|
||||
@ -53,7 +54,14 @@ func fanOutPutObject(ctx context.Context, bucket string, objectAPI ObjectLayer,
|
||||
|
||||
objInfos[idx] = ObjectInfo{Name: req.Key}
|
||||
|
||||
hr, err := hash.NewReader(bytes.NewReader(fanOutBuf), int64(len(fanOutBuf)), "", "", -1)
|
||||
hopts := hash.Options{
|
||||
Size: int64(len(fanOutBuf)),
|
||||
MD5Hex: opts.MD5Hex,
|
||||
SHA256Hex: "",
|
||||
ActualSize: -1,
|
||||
DisableMD5: true,
|
||||
}
|
||||
hr, err := hash.NewReaderWithOpts(bytes.NewReader(fanOutBuf), hopts)
|
||||
if err != nil {
|
||||
errs[idx] = err
|
||||
return
|
||||
|
@ -53,12 +53,33 @@ type Reader struct {
|
||||
// Content checksum
|
||||
contentHash Checksum
|
||||
contentHasher hash.Hash
|
||||
disableMD5 bool
|
||||
|
||||
trailer http.Header
|
||||
|
||||
sha256 hash.Hash
|
||||
}
|
||||
|
||||
// Options are optional arguments to NewReaderWithOpts, Options
|
||||
// simply converts positional arguments to NewReader() into a
|
||||
// more flexible way to provide optional inputs. This is currently
|
||||
// used by the FanOut API call mostly to disable expensive md5sum
|
||||
// calculation repeatedly under hash.Reader.
|
||||
type Options struct {
|
||||
MD5Hex string
|
||||
SHA256Hex string
|
||||
Size int64
|
||||
ActualSize int64
|
||||
DisableMD5 bool
|
||||
}
|
||||
|
||||
// NewReaderWithOpts is like NewReader but takes `Options` as argument, allowing
|
||||
// callers to indicate if they want to disable md5sum checksum.
|
||||
func NewReaderWithOpts(src io.Reader, opts Options) (*Reader, error) {
|
||||
// return hard limited reader
|
||||
return newReader(src, opts.Size, opts.MD5Hex, opts.SHA256Hex, opts.ActualSize, opts.DisableMD5)
|
||||
}
|
||||
|
||||
// NewReader returns a new Reader that wraps src and computes
|
||||
// MD5 checksum of everything it reads as ETag.
|
||||
//
|
||||
@ -75,11 +96,10 @@ type Reader struct {
|
||||
// NewReader enforces S3 compatibility strictly by ensuring caller
|
||||
// does not send more content than specified size.
|
||||
func NewReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize int64) (*Reader, error) {
|
||||
// return hard limited reader
|
||||
return newReader(src, size, md5Hex, sha256Hex, actualSize)
|
||||
return newReader(src, size, md5Hex, sha256Hex, actualSize, false)
|
||||
}
|
||||
|
||||
func newReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize int64) (*Reader, error) {
|
||||
func newReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize int64, disableMD5 bool) (*Reader, error) {
|
||||
MD5, err := hex.DecodeString(md5Hex)
|
||||
if err != nil {
|
||||
return nil, BadDigest{ // TODO(aead): Return an error that indicates that an invalid ETag has been specified
|
||||
@ -131,13 +151,19 @@ func newReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize i
|
||||
|
||||
if size >= 0 {
|
||||
r := ioutil.HardLimitReader(src, size)
|
||||
if _, ok := src.(etag.Tagger); !ok {
|
||||
src = etag.NewReader(r, MD5)
|
||||
if !disableMD5 {
|
||||
if _, ok := src.(etag.Tagger); !ok {
|
||||
src = etag.NewReader(r, MD5)
|
||||
} else {
|
||||
src = etag.Wrap(r, src)
|
||||
}
|
||||
} else {
|
||||
src = etag.Wrap(r, src)
|
||||
src = r
|
||||
}
|
||||
} else if _, ok := src.(etag.Tagger); !ok {
|
||||
src = etag.NewReader(src, MD5)
|
||||
if !disableMD5 {
|
||||
src = etag.NewReader(src, MD5)
|
||||
}
|
||||
}
|
||||
var h hash.Hash
|
||||
if len(SHA256) != 0 {
|
||||
@ -150,6 +176,7 @@ func newReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize i
|
||||
checksum: MD5,
|
||||
contentSHA256: SHA256,
|
||||
sha256: h,
|
||||
disableMD5: disableMD5,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -296,21 +323,15 @@ func (r *Reader) ETag() etag.ETag {
|
||||
return nil
|
||||
}
|
||||
|
||||
// MD5 returns the MD5 checksum set as reference value.
|
||||
//
|
||||
// It corresponds to the checksum that is expected and
|
||||
// not the actual MD5 checksum of the content.
|
||||
// Therefore, refer to MD5Current.
|
||||
func (r *Reader) MD5() []byte {
|
||||
return r.checksum
|
||||
}
|
||||
|
||||
// MD5Current returns the MD5 checksum of the content
|
||||
// that has been read so far.
|
||||
//
|
||||
// Calling MD5Current again after reading more data may
|
||||
// result in a different checksum.
|
||||
func (r *Reader) MD5Current() []byte {
|
||||
if r.disableMD5 {
|
||||
return r.checksum
|
||||
}
|
||||
return r.ETag()[:]
|
||||
}
|
||||
|
||||
@ -322,16 +343,6 @@ func (r *Reader) SHA256() []byte {
|
||||
return r.contentSHA256
|
||||
}
|
||||
|
||||
// MD5HexString returns a hex representation of the MD5.
|
||||
func (r *Reader) MD5HexString() string {
|
||||
return hex.EncodeToString(r.checksum)
|
||||
}
|
||||
|
||||
// MD5Base64String returns a hex representation of the MD5.
|
||||
func (r *Reader) MD5Base64String() string {
|
||||
return base64.StdEncoding.EncodeToString(r.checksum)
|
||||
}
|
||||
|
||||
// SHA256HexString returns a hex representation of the SHA256.
|
||||
func (r *Reader) SHA256HexString() string {
|
||||
return hex.EncodeToString(r.contentSHA256)
|
||||
|
@ -19,6 +19,7 @@ package hash
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
@ -37,14 +38,15 @@ func TestHashReaderHelperMethods(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if r.MD5HexString() != "e2fc714c4727ee9395f324cd2e7f331f" {
|
||||
t.Errorf("Expected md5hex \"e2fc714c4727ee9395f324cd2e7f331f\", got %s", r.MD5HexString())
|
||||
md5sum := r.MD5Current()
|
||||
if hex.EncodeToString(md5sum) != "e2fc714c4727ee9395f324cd2e7f331f" {
|
||||
t.Errorf("Expected md5hex \"e2fc714c4727ee9395f324cd2e7f331f\", got %s", hex.EncodeToString(md5sum))
|
||||
}
|
||||
if r.SHA256HexString() != "88d4266fd4e6338d13b845fcf289579d209c897823b9217da3e161936f031589" {
|
||||
t.Errorf("Expected sha256hex \"88d4266fd4e6338d13b845fcf289579d209c897823b9217da3e161936f031589\", got %s", r.SHA256HexString())
|
||||
}
|
||||
if r.MD5Base64String() != "4vxxTEcn7pOV8yTNLn8zHw==" {
|
||||
t.Errorf("Expected md5base64 \"4vxxTEcn7pOV8yTNLn8zHw==\", got \"%s\"", r.MD5Base64String())
|
||||
if base64.StdEncoding.EncodeToString(md5sum) != "4vxxTEcn7pOV8yTNLn8zHw==" {
|
||||
t.Errorf("Expected md5base64 \"4vxxTEcn7pOV8yTNLn8zHw==\", got \"%s\"", base64.StdEncoding.EncodeToString(md5sum))
|
||||
}
|
||||
if r.Size() != 4 {
|
||||
t.Errorf("Expected size 4, got %d", r.Size())
|
||||
@ -56,9 +58,6 @@ func TestHashReaderHelperMethods(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(r.MD5(), expectedMD5) {
|
||||
t.Errorf("Expected md5hex \"e2fc714c4727ee9395f324cd2e7f331f\", got %s", r.MD5HexString())
|
||||
}
|
||||
if !bytes.Equal(r.MD5Current(), expectedMD5) {
|
||||
t.Errorf("Expected md5hex \"e2fc714c4727ee9395f324cd2e7f331f\", got %s", hex.EncodeToString(r.MD5Current()))
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user