implement a safer completeMultipart implementation (#20227)

- optimize writing part.N.meta by writing both part.N
  and its meta in a single storage call, avoiding an
  extra network round trip (see the sketch below).

- remove part.N and part.N.meta that were only partially
  written when renamePart() loses write quorum.

- allow a strict read quorum check, arbitrated via the
  ETag for the given part number; this makes the final
  commit doubly safe.

- return an appropriate error when read quorum is missing,
  instead of returning InvalidPart{}, which is a
  non-retryable error. This situation can occur when many
  nodes go offline in rotation, for example during rolling
  StatefulSet updates in Kubernetes.

fixes #20091
Harshavardhana
2024-08-12 01:38:15 -07:00
committed by GitHub
parent 909b169593
commit 2e0fd2cba9
19 changed files with 1487 additions and 275 deletions
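The first bullet of the commit message is the heart of the change: part.N and its part.N.meta side-car are now committed together through a single RenamePart storage call instead of a rename followed by a separate metadata write over the network. Below is a minimal, standalone sketch of that idea; the localDisk type and its on-disk layout are hypothetical stand-ins for illustration, not MinIO's actual storage layer.

```go
// Sketch only: a toy single-disk model of the combined rename. The real
// RenamePart is a storage-layer call across erasure-set disks; this just
// illustrates that part.N and part.N.meta are committed together.
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

type localDisk struct{ root string }

// RenamePart moves the uploaded part into place and writes its .meta
// side-car in the same local operation, so no extra network round trip
// is needed for the metadata.
func (d localDisk) RenamePart(src, dst string, meta []byte) error {
	srcPath := filepath.Join(d.root, src)
	dstPath := filepath.Join(d.root, dst)
	if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil {
		return err
	}
	if err := os.Rename(srcPath, dstPath); err != nil {
		return err
	}
	// The metadata lands right next to the part; on failure the caller is
	// expected to clean up both files (see renamePart in the diff below).
	return os.WriteFile(dstPath+".meta", meta, 0o644)
}

func main() {
	d := localDisk{root: os.TempDir()}
	_ = os.WriteFile(filepath.Join(d.root, "tmp-part"), []byte("data"), 0o644)
	fmt.Println(d.RenamePart("tmp-part", "uploads/id/part.1", []byte(`{"number":1}`)))
}
```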


@@ -80,6 +80,14 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
return fi, nil, err
}
if readQuorum < 0 {
return fi, nil, errErasureReadQuorum
}
if writeQuorum < 0 {
return fi, nil, errErasureWriteQuorum
}
quorum := readQuorum
if write {
quorum = writeQuorum
@@ -88,14 +96,13 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
// List all online disks.
_, modTime, etag := listOnlineDisks(storageDisks, partsMetadata, errs, quorum)
var reducedErr error
if write {
reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
err = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
} else {
reducedErr = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
err = reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
}
if reducedErr != nil {
return fi, nil, reducedErr
if err != nil {
return fi, nil, err
}
// Pick one from the first valid metadata.
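For context on the hunk above: reduceReadQuorumErrs and reduceWriteQuorumErrs collapse the per-disk error slice into a single verdict, and the refactor simply reuses err instead of a separate reducedErr variable. The sketch below is a simplified illustration of that quorum-reduction idea, not the actual MinIO implementation (which also skips a list of ignorable errors).

```go
// Simplified illustration of quorum error reduction: the outcome (an error
// string, or nil for success) reported by at least `quorum` disks wins; if no
// single outcome reaches quorum, the caller gets the quorum-loss error.
package main

import (
	"errors"
	"fmt"
)

var errQuorumLost = errors.New("quorum lost")

func reduceQuorumErrs(errs []error, quorum int, quorumErr error) error {
	counts := make(map[string]int)
	for _, err := range errs {
		key := "" // nil (success) is counted like any other outcome
		if err != nil {
			key = err.Error()
		}
		counts[key]++
	}
	// The most common outcome wins, provided it reaches quorum.
	var topKey string
	var topCount int
	for key, n := range counts {
		if n > topCount {
			topKey, topCount = key, n
		}
	}
	if topCount >= quorum {
		if topKey == "" {
			return nil
		}
		return errors.New(topKey)
	}
	return quorumErr
}

func main() {
	errs := []error{nil, nil, errors.New("disk offline"), nil}
	fmt.Println(reduceQuorumErrs(errs, 3, errQuorumLost)) // <nil>: 3 successes meet quorum
}
```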
@@ -490,9 +497,10 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
uploadIDPath := er.getUploadIDDir(bucket, object, uploadUUID)
// Write updated `xl.meta` to all disks.
if _, err := writeUniqueFileInfo(ctx, onlineDisks, bucket, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
if _, err := writeAllMetadata(ctx, onlineDisks, bucket, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
return nil, toObjectErr(err, bucket, object)
}
return &NewMultipartUploadResult{
UploadID: uploadID,
ChecksumAlgo: userDefined[hash.MinIOMultipartChecksum],
@@ -513,7 +521,7 @@ func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object
}
// renamePart - renames multipart part to its relevant location under uploadID.
func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) {
func (er erasureObjects) renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, optsMeta []byte, writeQuorum int) ([]StorageAPI, error) {
g := errgroup.WithNErrs(len(disks))
// Rename file on all underlying storage disks.
@@ -523,60 +531,25 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, ds
if disks[index] == nil {
return errDiskNotFound
}
return disks[index].RenameFile(ctx, srcBucket, srcEntry, dstBucket, dstEntry)
return disks[index].RenamePart(ctx, srcBucket, srcEntry, dstBucket, dstEntry, optsMeta)
}, index)
}
// Wait for all renames to finish.
errs := g.Wait()
// Do not need to undo partial successful operation since those will be cleaned up
// in 24hrs via multipart cleaner, never rename() back to `.minio.sys/tmp` as there
// is no way to clean them.
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
// otherwise return failure. Cleanup successful renames.
return evalDisks(disks, errs), reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
}
// writeAllDisks - writes 'b' to all provided disks.
// If write cannot reach quorum, the files will be deleted from all disks.
func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry string, b []byte, writeQuorum int) ([]StorageAPI, error) {
g := errgroup.WithNErrs(len(disks))
// Write file to all underlying storage disks.
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return errDiskNotFound
}
return disks[index].WriteAll(ctx, dstBucket, dstEntry, b)
}, index)
paths := []string{
dstEntry,
dstEntry + ".meta",
}
// Wait for all renames to finish.
errs := g.Wait()
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
// otherwise return failure. Cleanup successful renames.
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
if errors.Is(err, errErasureWriteQuorum) {
// Remove all written
g := errgroup.WithNErrs(len(disks))
for index := range disks {
if disks[index] == nil || errs[index] != nil {
continue
}
index := index
g.Go(func() error {
return disks[index].Delete(ctx, dstBucket, dstEntry, DeleteOptions{Immediate: true})
}, index)
}
// Ignore these errors.
g.WaitErr()
if err != nil {
er.cleanupMultipartPath(ctx, paths...)
}
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
// otherwise return failure. Cleanup successful renames.
return evalDisks(disks, errs), err
}
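This hunk addresses the second bullet of the commit message: when the combined rename fails to reach write quorum, the partially written part.N and part.N.meta are removed again via cleanupMultipartPath rather than left behind. Below is a standalone sketch of that best-effort fan-out cleanup, using a hypothetical deleter interface as a stand-in for the storage API.

```go
// Sketch only, with hypothetical names: delete every given path on every
// disk in parallel, ignoring individual errors, mirroring the
// "clean up, don't fail" intent of cleanupMultipartPath in the diff.
package main

import (
	"context"
	"fmt"
	"sync"
)

type deleter interface {
	Delete(ctx context.Context, path string) error
}

type fakeDisk struct{ name string }

func (d fakeDisk) Delete(_ context.Context, path string) error {
	fmt.Printf("%s: delete %s\n", d.name, path)
	return nil
}

func cleanupPaths(ctx context.Context, disks []deleter, paths ...string) {
	var wg sync.WaitGroup
	for _, d := range disks {
		if d == nil {
			continue
		}
		wg.Add(1)
		go func(d deleter) {
			defer wg.Done()
			for _, p := range paths {
				_ = d.Delete(ctx, p) // best effort; errors are ignored
			}
		}(d)
	}
	wg.Wait()
}

func main() {
	disks := []deleter{fakeDisk{"disk1"}, fakeDisk{"disk2"}}
	cleanupPaths(context.Background(), disks, "uploads/id/part.1", "uploads/id/part.1.meta")
}
```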
@@ -732,19 +705,6 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// Rename temporary part file to its final location.
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
onlineDisks, err = renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
if err != nil {
if errors.Is(err, errFileNotFound) {
// An in-quorum errFileNotFound means that client stream
// prematurely closed and we do not find any xl.meta or
// part.1's - in such a scenario we must return as if client
// disconnected. This means that erasure.Encode() CreateFile()
// did not do anything.
return pi, IncompleteBody{Bucket: bucket, Object: object}
}
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
md5hex := r.MD5CurrentHexString()
if opts.PreserveETag != "" {
@@ -766,15 +726,22 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
Checksums: r.ContentCRC(),
}
fi.Parts = []ObjectPartInfo{partInfo}
partFI, err := fi.MarshalMsg(nil)
partFI, err := partInfo.MarshalMsg(nil)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Write part metadata to all disks.
onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", partFI, writeQuorum)
onlineDisks, err = er.renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, partFI, writeQuorum)
if err != nil {
if errors.Is(err, errFileNotFound) {
// An in-quorum errFileNotFound means that client stream
// prematurely closed and we do not find any xl.meta or
// part.1's - in such a scenario we must return as if client
// disconnected. This means that erasure.Encode() CreateFile()
// did not do anything.
return pi, IncompleteBody{Bucket: bucket, Object: object}
}
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
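A side effect of the new flow, visible in this hunk, is that part.N.meta now carries a single ObjectPartInfo record instead of a whole FileInfo whose Parts slice holds one entry. The round trip below illustrates the shape of that record and the part-number validation done on read; JSON is used purely as a stand-in for the msgp encoding the server actually uses, and the field set shown is abbreviated.

```go
// Illustration only: encode one part record, decode it back, and verify the
// part number matches the part.N.meta file it was read from.
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type partInfo struct {
	Number  int       `json:"number"`
	ETag    string    `json:"etag"`
	Size    int64     `json:"size"`
	ModTime time.Time `json:"modTime"`
}

func main() {
	buf, _ := json.Marshal(partInfo{Number: 1, ETag: "abc", Size: 1 << 20, ModTime: time.Now()})

	var got partInfo
	if err := json.Unmarshal(buf, &got); err != nil {
		panic(err)
	}
	// ListObjectParts and CompleteMultipartUpload reject a part.N.meta whose
	// decoded part number does not match N.
	if got.Number != 1 {
		fmt.Println("part.1.meta has incorrect part number:", got.Number)
		return
	}
	fmt.Println("part", got.Number, "etag", got.ETag)
}
```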
@@ -917,7 +884,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
g := errgroup.WithNErrs(len(req.Files)).WithConcurrency(32)
partsInfo := make([]ObjectPartInfo, len(req.Files))
partsInfo := make([]*ObjectPartInfo, len(req.Files))
for i, file := range req.Files {
file := file
partN := i + start
@@ -929,21 +896,17 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
return err
}
var pfi FileInfo
_, err = pfi.UnmarshalMsg(buf)
pinfo := &ObjectPartInfo{}
_, err = pinfo.UnmarshalMsg(buf)
if err != nil {
return err
}
if len(pfi.Parts) != 1 {
return errors.New("invalid number of parts expected 1, got 0")
if partN != pinfo.Number {
return fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partN, partN, pinfo.Number)
}
if partN != pfi.Parts[0].Number {
return fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partN, partN, pfi.Parts[0].Number)
}
partsInfo[i] = pfi.Parts[0]
partsInfo[i] = pinfo
return nil
}, i)
}
@@ -951,7 +914,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
g.Wait()
for _, part := range partsInfo {
if part.Number != 0 && !part.ModTime.IsZero() {
if part != nil && part.Number != 0 && !part.ModTime.IsZero() {
fi.AddObjectPart(part.Number, part.ETag, part.Size, part.ActualSize, part.ModTime, part.Index, part.Checksums)
}
}
@@ -987,6 +950,106 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
return result, nil
}
func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaPaths []string, partNumbers []int, readQuorum int) ([]*ObjectPartInfo, error) {
g := errgroup.WithNErrs(len(disks))
objectPartInfos := make([][]*ObjectPartInfo, len(disks))
// Read part metadata from all underlying storage disks in parallel.
for index := range disks {
index := index
g.Go(func() (err error) {
if disks[index] == nil {
return errDiskNotFound
}
objectPartInfos[index], err = disks[index].ReadParts(ctx, bucket, partMetaPaths...)
return err
}, index)
}
if err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum); err != nil {
return nil, err
}
partInfosInQuorum := make([]*ObjectPartInfo, len(partMetaPaths))
partMetaQuorumMap := make(map[string]int, len(partNumbers))
for pidx := range partMetaPaths {
var pinfos []*ObjectPartInfo
for idx := range disks {
if len(objectPartInfos[idx]) == 0 {
partMetaQuorumMap[partMetaPaths[pidx]]++
continue
}
pinfo := objectPartInfos[idx][pidx]
if pinfo == nil {
partMetaQuorumMap[partMetaPaths[pidx]]++
continue
}
if pinfo.ETag == "" {
partMetaQuorumMap[partMetaPaths[pidx]]++
} else {
pinfos = append(pinfos, pinfo)
partMetaQuorumMap[pinfo.ETag]++
}
}
var maxQuorum int
var maxETag string
var maxPartMeta string
for etag, quorum := range partMetaQuorumMap {
if maxQuorum < quorum {
maxQuorum = quorum
maxETag = etag
maxPartMeta = etag
}
}
var pinfo *ObjectPartInfo
for _, pinfo = range pinfos {
if pinfo != nil && maxETag != "" && pinfo.ETag == maxETag {
break
}
if maxPartMeta != "" && path.Base(maxPartMeta) == fmt.Sprintf("part.%d.meta", pinfo.Number) {
break
}
}
if pinfo != nil && pinfo.ETag != "" && partMetaQuorumMap[maxETag] >= readQuorum {
partInfosInQuorum[pidx] = pinfo
continue
}
if partMetaQuorumMap[maxPartMeta] == len(disks) {
if pinfo != nil && pinfo.Error != "" {
partInfosInQuorum[pidx] = &ObjectPartInfo{Error: pinfo.Error}
} else {
partInfosInQuorum[pidx] = &ObjectPartInfo{
Error: InvalidPart{
PartNumber: partNumbers[pidx],
}.Error(),
}
}
} else {
partInfosInQuorum[pidx] = &ObjectPartInfo{Error: errErasureReadQuorum.Error()}
}
}
return partInfosInQuorum, nil
}
func errStrToPartErr(errStr string) error {
if strings.Contains(errStr, "file not found") {
return InvalidPart{}
}
if strings.Contains(errStr, "Specified part could not be found") {
return InvalidPart{}
}
if strings.Contains(errStr, errErasureReadQuorum.Error()) {
return errErasureReadQuorum
}
return errors.New(errStr)
}
// CompleteMultipartUpload - completes an ongoing multipart
// transaction after receiving all the parts indicated by the client.
// Returns an md5sum calculated by concatenating all the individual
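errStrToPartErr above is what makes the fourth bullet of the commit message hold: an error string recorded for a part is mapped back to a typed error, so a quorum-loss message surfaces as the retryable errErasureReadQuorum rather than a terminal InvalidPart. Below is a simplified, self-contained version of that mapping; the types and messages are local stand-ins, not the server's own.

```go
// Standalone sketch of the error-string mapping idea from errStrToPartErr.
package main

import (
	"errors"
	"fmt"
	"strings"
)

var errReadQuorum = errors.New("insufficient number of drives online for read quorum")

type invalidPart struct{ Number int }

func (e invalidPart) Error() string { return fmt.Sprintf("invalid part %d", e.Number) }

func partErrFromString(s string, partNumber int) error {
	switch {
	case strings.Contains(s, "file not found"),
		strings.Contains(s, "Specified part could not be found"):
		return invalidPart{Number: partNumber} // client-side error, not retryable
	case strings.Contains(s, errReadQuorum.Error()):
		return errReadQuorum // transient; the client may retry the complete call
	default:
		return errors.New(s)
	}
}

func main() {
	fmt.Println(partErrFromString(errReadQuorum.Error(), 3))
	fmt.Println(partErrFromString("file not found", 3))
}
```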
@@ -1040,24 +1103,22 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
onlineDisks := er.getDisks()
writeQuorum := fi.WriteQuorum(er.defaultWQuorum())
readQuorum := fi.ReadQuorum(er.defaultRQuorum())
// Read Part info for all parts
partPath := pathJoin(uploadIDPath, fi.DataDir) + "/"
req := ReadMultipleReq{
Bucket: minioMetaMultipartBucket,
Prefix: partPath,
MaxSize: 1 << 20, // Each part should realistically not be > 1MiB.
Files: make([]string, 0, len(parts)),
AbortOn404: true,
MetadataOnly: true,
partMetaPaths := make([]string, len(parts))
partNumbers := make([]int, len(parts))
for idx, part := range parts {
partMetaPaths[idx] = pathJoin(partPath, fmt.Sprintf("part.%d.meta", part.PartNumber))
partNumbers[idx] = part.PartNumber
}
for _, part := range parts {
req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber))
}
partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum)
partInfoFiles, err := readParts(ctx, onlineDisks, minioMetaMultipartBucket, partMetaPaths, partNumbers, readQuorum)
if err != nil {
return oi, err
}
if len(partInfoFiles) != len(parts) {
// Should only happen through internal error
err := fmt.Errorf("unexpected part result count: %d, want %d", len(partInfoFiles), len(parts))
@@ -1119,35 +1180,22 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
opts.EncryptFn = metadataEncrypter(key)
}
for i, part := range partInfoFiles {
partID := parts[i].PartNumber
if part.Error != "" || !part.Exists {
return oi, InvalidPart{
PartNumber: partID,
}
}
var pfi FileInfo
_, err := pfi.UnmarshalMsg(part.Data)
if err != nil {
// Maybe crash or similar.
for idx, part := range partInfoFiles {
if part.Error != "" {
err = errStrToPartErr(part.Error)
bugLogIf(ctx, err)
return oi, InvalidPart{
PartNumber: partID,
}
return oi, err
}
partI := pfi.Parts[0]
partNumber := partI.Number
if partID != partNumber {
internalLogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partID, partID, partI.Number))
if parts[idx].PartNumber != part.Number {
internalLogIf(ctx, fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", parts[idx].PartNumber, parts[idx].PartNumber, part.Number))
return oi, InvalidPart{
PartNumber: partID,
PartNumber: part.Number,
}
}
// Add the current part.
fi.AddObjectPart(partI.Number, partI.ETag, partI.Size, partI.ActualSize, partI.ModTime, partI.Index, partI.Checksums)
fi.AddObjectPart(part.Number, part.ETag, part.Size, part.ActualSize, part.ModTime, part.Index, part.Checksums)
}
// Calculate full object size.
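From a client's point of view, the net effect of the commit is that CompleteMultipartUpload can now fail with a retryable quorum error during rolling restarts instead of a misleading InvalidPart. A hedged sketch of the retry loop a caller might use follows; completeUpload and errQuorumLost are hypothetical stand-ins, not a real SDK API.

```go
// Sketch of client-side retry: retry only on the retryable quorum error,
// bail out immediately on anything else (e.g. a genuine InvalidPart).
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errQuorumLost = errors.New("insufficient drives online")

// completeUpload pretends the cluster is mid-rolling-restart for the first
// two attempts, then succeeds.
func completeUpload(_ context.Context, attempt int) error {
	if attempt < 2 {
		return errQuorumLost
	}
	return nil
}

func completeWithRetry(ctx context.Context, maxAttempts int) error {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err := completeUpload(ctx, attempt)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errQuorumLost) {
			return err // non-retryable; retrying will not help
		}
		time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
	}
	return fmt.Errorf("gave up after %d attempts", maxAttempts)
}

func main() {
	fmt.Println(completeWithRetry(context.Background(), 5))
}
```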