mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: make complete multipart uploads faster encrypted/compressed backends (#15375)
- Only fetch the parts we need and abort as soon as one is missing. - Only fetch the number of parts requested by "ListObjectParts".
This commit is contained in:
parent
f4d5c861f3
commit
69bf39f42e
@ -830,16 +830,33 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
return result, err
|
||||
}
|
||||
|
||||
if maxParts == 0 {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
if partNumberMarker < 0 {
|
||||
partNumberMarker = 0
|
||||
}
|
||||
|
||||
// Limit output to maxPartsList.
|
||||
if maxParts > maxPartsList-partNumberMarker {
|
||||
maxParts = maxPartsList - partNumberMarker
|
||||
}
|
||||
|
||||
// Read Part info for all parts
|
||||
partPath := pathJoin(uploadIDPath, fi.DataDir) + "/"
|
||||
req := ReadMultipleReq{
|
||||
Bucket: minioMetaMultipartBucket,
|
||||
Prefix: partPath,
|
||||
MaxSize: 1 << 20, // Each part should realistically not be > 1MiB.
|
||||
MaxResults: maxParts + 1,
|
||||
}
|
||||
|
||||
start := partNumberMarker + 1
|
||||
end := start + maxParts
|
||||
|
||||
// Parts are 1 based, so index 0 is part one, etc.
|
||||
for i := 1; i <= maxPartsList; i++ {
|
||||
for i := start; i <= end; i++ {
|
||||
req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i))
|
||||
}
|
||||
|
||||
@ -861,13 +878,9 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Limit output to maxPartsList.
|
||||
if maxParts > maxPartsList {
|
||||
maxParts = maxPartsList
|
||||
}
|
||||
|
||||
var partI ObjectPartInfo
|
||||
for i, part := range partInfoFiles {
|
||||
partN := i + partNumberMarker + 1
|
||||
if part.Error != "" || !part.Exists {
|
||||
continue
|
||||
}
|
||||
@ -879,7 +892,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
continue
|
||||
}
|
||||
|
||||
if i+1 != partI.Number {
|
||||
if partN != partI.Number {
|
||||
logger.LogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", i+1, i+1, partI.Number))
|
||||
continue
|
||||
}
|
||||
@ -889,12 +902,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
}
|
||||
|
||||
// Only parts with higher part numbers will be listed.
|
||||
partIdx := objectPartIndex(fi.Parts, partNumberMarker)
|
||||
parts := fi.Parts
|
||||
if partIdx != -1 {
|
||||
parts = fi.Parts[partIdx+1:]
|
||||
}
|
||||
count := maxParts
|
||||
result.Parts = make([]PartInfo, 0, len(parts))
|
||||
for _, part := range parts {
|
||||
result.Parts = append(result.Parts, PartInfo{
|
||||
@ -904,11 +912,11 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
ActualSize: part.ActualSize,
|
||||
Size: part.Size,
|
||||
})
|
||||
count--
|
||||
if count == 0 {
|
||||
if len(result.Parts) >= maxParts {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// If listed entries are more than maxParts, we set IsTruncated as true.
|
||||
if len(parts) > len(result.Parts) {
|
||||
result.IsTruncated = true
|
||||
@ -976,6 +984,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
Prefix: partPath,
|
||||
MaxSize: 1 << 20, // Each part should realistically not be > 1MiB.
|
||||
Files: make([]string, 0, len(parts)),
|
||||
AbortOn404: true,
|
||||
}
|
||||
for _, part := range parts {
|
||||
req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber))
|
||||
|
@ -614,6 +614,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
//
|
||||
// Therefore, we adjust all ETags sent by the client to match what is stored
|
||||
// on the backend.
|
||||
// TODO(klauspost): This should be done while object is finalized instead of fetching the data twice
|
||||
if objectAPI.IsEncryptionSupported() {
|
||||
mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{})
|
||||
if err != nil {
|
||||
@ -622,8 +623,11 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
}
|
||||
|
||||
if _, ok := crypto.IsEncrypted(mi.UserDefined); ok {
|
||||
const MaxParts = 10000
|
||||
listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, MaxParts, ObjectOptions{})
|
||||
// Only fetch parts in between first and last.
|
||||
// We already checked if we have at least one part.
|
||||
start := complMultipartUpload.Parts[0].PartNumber
|
||||
maxParts := complMultipartUpload.Parts[len(complMultipartUpload.Parts)-1].PartNumber - start + 1
|
||||
listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, start-1, maxParts, ObjectOptions{})
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
@ -631,9 +635,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
sort.Slice(listPartsInfo.Parts, func(i, j int) bool {
|
||||
return listPartsInfo.Parts[i].PartNumber < listPartsInfo.Parts[j].PartNumber
|
||||
})
|
||||
sort.Slice(complMultipartUpload.Parts, func(i, j int) bool {
|
||||
return complMultipartUpload.Parts[i].PartNumber < complMultipartUpload.Parts[j].PartNumber
|
||||
})
|
||||
for i := range listPartsInfo.Parts {
|
||||
for j := range complMultipartUpload.Parts {
|
||||
if listPartsInfo.Parts[i].PartNumber == complMultipartUpload.Parts[j].PartNumber {
|
||||
|
@ -285,6 +285,7 @@ type ReadMultipleReq struct {
|
||||
MaxSize int64 // Return error if size is exceed.
|
||||
MetadataOnly bool // Read as XL meta and truncate data.
|
||||
AbortOn404 bool // Stop reading after first file not found.
|
||||
MaxResults int // Stop after this many successful results. <= 0 means all.
|
||||
}
|
||||
|
||||
// ReadMultipleResp contains a single response from a ReadMultipleReq.
|
||||
|
@ -1745,6 +1745,12 @@ func (z *ReadMultipleReq) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
err = msgp.WrapError(err, "AbortOn404")
|
||||
return
|
||||
}
|
||||
case "MaxResults":
|
||||
z.MaxResults, err = dc.ReadInt()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "MaxResults")
|
||||
return
|
||||
}
|
||||
default:
|
||||
err = dc.Skip()
|
||||
if err != nil {
|
||||
@ -1758,9 +1764,9 @@ func (z *ReadMultipleReq) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *ReadMultipleReq) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// map header, size 6
|
||||
// map header, size 7
|
||||
// write "Bucket"
|
||||
err = en.Append(0x86, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
|
||||
err = en.Append(0x87, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -1826,15 +1832,25 @@ func (z *ReadMultipleReq) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
err = msgp.WrapError(err, "AbortOn404")
|
||||
return
|
||||
}
|
||||
// write "MaxResults"
|
||||
err = en.Append(0xaa, 0x4d, 0x61, 0x78, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteInt(z.MaxResults)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "MaxResults")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *ReadMultipleReq) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// map header, size 6
|
||||
// map header, size 7
|
||||
// string "Bucket"
|
||||
o = append(o, 0x86, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
|
||||
o = append(o, 0x87, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
|
||||
o = msgp.AppendString(o, z.Bucket)
|
||||
// string "Prefix"
|
||||
o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78)
|
||||
@ -1854,6 +1870,9 @@ func (z *ReadMultipleReq) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
// string "AbortOn404"
|
||||
o = append(o, 0xaa, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x4f, 0x6e, 0x34, 0x30, 0x34)
|
||||
o = msgp.AppendBool(o, z.AbortOn404)
|
||||
// string "MaxResults"
|
||||
o = append(o, 0xaa, 0x4d, 0x61, 0x78, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73)
|
||||
o = msgp.AppendInt(o, z.MaxResults)
|
||||
return
|
||||
}
|
||||
|
||||
@ -1924,6 +1943,12 @@ func (z *ReadMultipleReq) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
err = msgp.WrapError(err, "AbortOn404")
|
||||
return
|
||||
}
|
||||
case "MaxResults":
|
||||
z.MaxResults, bts, err = msgp.ReadIntBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "MaxResults")
|
||||
return
|
||||
}
|
||||
default:
|
||||
bts, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
@ -1942,7 +1967,7 @@ func (z *ReadMultipleReq) Msgsize() (s int) {
|
||||
for za0001 := range z.Files {
|
||||
s += msgp.StringPrefixSize + len(z.Files[za0001])
|
||||
}
|
||||
s += 8 + msgp.Int64Size + 13 + msgp.BoolSize + 11 + msgp.BoolSize
|
||||
s += 8 + msgp.Int64Size + 13 + msgp.BoolSize + 11 + msgp.BoolSize + 11 + msgp.IntSize
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -2573,7 +2573,7 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
|
||||
defer close(resp)
|
||||
|
||||
volumeDir := pathJoin(s.diskPath, req.Bucket)
|
||||
|
||||
found := 0
|
||||
for _, f := range req.Files {
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
@ -2617,10 +2617,14 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
|
||||
resp <- r
|
||||
continue
|
||||
}
|
||||
found++
|
||||
r.Exists = true
|
||||
r.Data = data
|
||||
r.Modtime = mt
|
||||
resp <- r
|
||||
if req.MaxResults > 0 && found >= req.MaxResults {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user