From 69bf39f42e546f5e3bc774fec1eb1699c64f5bf5 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 21 Jul 2022 16:47:58 -0700 Subject: [PATCH] fix: make complete multipart uploads faster encrypted/compressed backends (#15375) - Only fetch the parts we need and abort as soon as one is missing. - Only fetch the number of parts requested by "ListObjectParts". --- cmd/erasure-multipart.go | 51 +++++++++++++++++++------------- cmd/object-multipart-handlers.go | 11 +++---- cmd/storage-datatypes.go | 1 + cmd/storage-datatypes_gen.go | 35 ++++++++++++++++++---- cmd/xl-storage.go | 6 +++- 5 files changed, 72 insertions(+), 32 deletions(-) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 1675d46a3..179259ac2 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -830,16 +830,33 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, err } + if maxParts == 0 { + return result, nil + } + + if partNumberMarker < 0 { + partNumberMarker = 0 + } + + // Limit output to maxPartsList. + if maxParts > maxPartsList-partNumberMarker { + maxParts = maxPartsList - partNumberMarker + } + // Read Part info for all parts partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" req := ReadMultipleReq{ - Bucket: minioMetaMultipartBucket, - Prefix: partPath, - MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. + Bucket: minioMetaMultipartBucket, + Prefix: partPath, + MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. + MaxResults: maxParts + 1, } + start := partNumberMarker + 1 + end := start + maxParts + // Parts are 1 based, so index 0 is part one, etc. - for i := 1; i <= maxPartsList; i++ { + for i := start; i <= end; i++ { req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i)) } @@ -861,13 +878,9 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, nil } - // Limit output to maxPartsList. - if maxParts > maxPartsList { - maxParts = maxPartsList - } - var partI ObjectPartInfo for i, part := range partInfoFiles { + partN := i + partNumberMarker + 1 if part.Error != "" || !part.Exists { continue } @@ -879,7 +892,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up continue } - if i+1 != partI.Number { + if partN != partI.Number { logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", i+1, i+1, partI.Number)) continue } @@ -889,12 +902,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up } // Only parts with higher part numbers will be listed. - partIdx := objectPartIndex(fi.Parts, partNumberMarker) parts := fi.Parts - if partIdx != -1 { - parts = fi.Parts[partIdx+1:] - } - count := maxParts result.Parts = make([]PartInfo, 0, len(parts)) for _, part := range parts { result.Parts = append(result.Parts, PartInfo{ @@ -904,11 +912,11 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up ActualSize: part.ActualSize, Size: part.Size, }) - count-- - if count == 0 { + if len(result.Parts) >= maxParts { break } } + // If listed entries are more than maxParts, we set IsTruncated as true. if len(parts) > len(result.Parts) { result.IsTruncated = true @@ -972,10 +980,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // Read Part info for all parts partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" req := ReadMultipleReq{ - Bucket: minioMetaMultipartBucket, - Prefix: partPath, - MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. - Files: make([]string, 0, len(parts)), + Bucket: minioMetaMultipartBucket, + Prefix: partPath, + MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. + Files: make([]string, 0, len(parts)), + AbortOn404: true, } for _, part := range parts { req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber)) diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go index 40484431c..82497a926 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -614,6 +614,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite // // Therefore, we adjust all ETags sent by the client to match what is stored // on the backend. + // TODO(klauspost): This should be done while object is finalized instead of fetching the data twice if objectAPI.IsEncryptionSupported() { mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{}) if err != nil { @@ -622,8 +623,11 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } if _, ok := crypto.IsEncrypted(mi.UserDefined); ok { - const MaxParts = 10000 - listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, MaxParts, ObjectOptions{}) + // Only fetch parts in between first and last. + // We already checked if we have at least one part. + start := complMultipartUpload.Parts[0].PartNumber + maxParts := complMultipartUpload.Parts[len(complMultipartUpload.Parts)-1].PartNumber - start + 1 + listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, start-1, maxParts, ObjectOptions{}) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return @@ -631,9 +635,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite sort.Slice(listPartsInfo.Parts, func(i, j int) bool { return listPartsInfo.Parts[i].PartNumber < listPartsInfo.Parts[j].PartNumber }) - sort.Slice(complMultipartUpload.Parts, func(i, j int) bool { - return complMultipartUpload.Parts[i].PartNumber < complMultipartUpload.Parts[j].PartNumber - }) for i := range listPartsInfo.Parts { for j := range complMultipartUpload.Parts { if listPartsInfo.Parts[i].PartNumber == complMultipartUpload.Parts[j].PartNumber { diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index b12028664..fb66d488e 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -285,6 +285,7 @@ type ReadMultipleReq struct { MaxSize int64 // Return error if size is exceed. MetadataOnly bool // Read as XL meta and truncate data. AbortOn404 bool // Stop reading after first file not found. + MaxResults int // Stop after this many successful results. <= 0 means all. } // ReadMultipleResp contains a single response from a ReadMultipleReq. diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 92125605f..433d49686 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -1745,6 +1745,12 @@ func (z *ReadMultipleReq) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "AbortOn404") return } + case "MaxResults": + z.MaxResults, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "MaxResults") + return + } default: err = dc.Skip() if err != nil { @@ -1758,9 +1764,9 @@ func (z *ReadMultipleReq) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *ReadMultipleReq) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 6 + // map header, size 7 // write "Bucket" - err = en.Append(0x86, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + err = en.Append(0x87, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) if err != nil { return } @@ -1826,15 +1832,25 @@ func (z *ReadMultipleReq) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "AbortOn404") return } + // write "MaxResults" + err = en.Append(0xaa, 0x4d, 0x61, 0x78, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteInt(z.MaxResults) + if err != nil { + err = msgp.WrapError(err, "MaxResults") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *ReadMultipleReq) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 6 + // map header, size 7 // string "Bucket" - o = append(o, 0x86, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = append(o, 0x87, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) o = msgp.AppendString(o, z.Bucket) // string "Prefix" o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) @@ -1854,6 +1870,9 @@ func (z *ReadMultipleReq) MarshalMsg(b []byte) (o []byte, err error) { // string "AbortOn404" o = append(o, 0xaa, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x4f, 0x6e, 0x34, 0x30, 0x34) o = msgp.AppendBool(o, z.AbortOn404) + // string "MaxResults" + o = append(o, 0xaa, 0x4d, 0x61, 0x78, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73) + o = msgp.AppendInt(o, z.MaxResults) return } @@ -1924,6 +1943,12 @@ func (z *ReadMultipleReq) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "AbortOn404") return } + case "MaxResults": + z.MaxResults, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "MaxResults") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -1942,7 +1967,7 @@ func (z *ReadMultipleReq) Msgsize() (s int) { for za0001 := range z.Files { s += msgp.StringPrefixSize + len(z.Files[za0001]) } - s += 8 + msgp.Int64Size + 13 + msgp.BoolSize + 11 + msgp.BoolSize + s += 8 + msgp.Int64Size + 13 + msgp.BoolSize + 11 + msgp.BoolSize + 11 + msgp.IntSize return } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 6bbe82289..119099f5d 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -2573,7 +2573,7 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp defer close(resp) volumeDir := pathJoin(s.diskPath, req.Bucket) - + found := 0 for _, f := range req.Files { if contextCanceled(ctx) { return ctx.Err() @@ -2617,10 +2617,14 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp resp <- r continue } + found++ r.Exists = true r.Data = data r.Modtime = mt resp <- r + if req.MaxResults > 0 && found >= req.MaxResults { + return nil + } } return nil }