verify part.N exists before reading part.N.meta (#20383)

if part.N doesn't exist we do not have to complete
the multipart transaction, it simply means that we
have some partial upload situation at hand.
This commit is contained in:
Harshavardhana 2024-09-05 13:37:19 -07:00 committed by GitHub
parent 6be88a2b99
commit 85f08d7752
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 6 deletions

View File

@ -1106,7 +1106,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
readQuorum := fi.ReadQuorum(er.defaultRQuorum()) readQuorum := fi.ReadQuorum(er.defaultRQuorum())
// Read Part info for all parts // Read Part info for all parts
partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" partPath := pathJoin(uploadIDPath, fi.DataDir) + SlashSeparator
partMetaPaths := make([]string, len(parts)) partMetaPaths := make([]string, len(parts))
partNumbers := make([]int, len(parts)) partNumbers := make([]int, len(parts))
for idx, part := range parts { for idx, part := range parts {

View File

@ -1097,13 +1097,13 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
var legacyJSON bool var legacyJSON bool
buf, err := xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) ([]byte, error) { buf, err := xioutil.WithDeadline[[]byte](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) ([]byte, error) {
buf, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile)) buf, _, err := s.readAllDataWithDMTime(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile))
if err != nil && !errors.Is(err, errFileNotFound) { if err != nil && !errors.Is(err, errFileNotFound) {
return nil, err return nil, err
} }
if errors.Is(err, errFileNotFound) && legacy { if errors.Is(err, errFileNotFound) && legacy {
buf, err = s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1)) buf, _, err = s.readAllDataWithDMTime(ctx, volume, volumeDir, pathJoin(volumeDir, path, xlStorageFormatFileV1))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -3123,12 +3123,24 @@ func (s *xlStorage) ReadParts(ctx context.Context, volume string, partMetaPaths
parts := make([]*ObjectPartInfo, len(partMetaPaths)) parts := make([]*ObjectPartInfo, len(partMetaPaths))
for idx, partMetaPath := range partMetaPaths { for idx, partMetaPath := range partMetaPaths {
var partNumber int var partNumber int
fmt.Sscanf(pathutil.Dir(partMetaPath), "part.%d.meta", &partNumber) fmt.Sscanf(pathutil.Base(partMetaPath), "part.%d.meta", &partNumber)
if contextCanceled(ctx) { if contextCanceled(ctx) {
parts[idx] = &ObjectPartInfo{Error: ctx.Err().Error(), Number: partNumber} parts[idx] = &ObjectPartInfo{
Error: ctx.Err().Error(),
Number: partNumber,
}
continue continue
} }
if err := Access(pathJoin(volumeDir, pathutil.Dir(partMetaPath), fmt.Sprintf("part.%d", partNumber))); err != nil {
parts[idx] = &ObjectPartInfo{
Error: err.Error(),
Number: partNumber,
}
continue
}
data, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, partMetaPath)) data, err := s.readAllData(ctx, volume, volumeDir, pathJoin(volumeDir, partMetaPath))
if err != nil { if err != nil {
parts[idx] = &ObjectPartInfo{ parts[idx] = &ObjectPartInfo{
@ -3137,11 +3149,16 @@ func (s *xlStorage) ReadParts(ctx context.Context, volume string, partMetaPaths
} }
continue continue
} }
pinfo := &ObjectPartInfo{} pinfo := &ObjectPartInfo{}
if _, err = pinfo.UnmarshalMsg(data); err != nil { if _, err = pinfo.UnmarshalMsg(data); err != nil {
parts[idx] = &ObjectPartInfo{Error: err.Error(), Number: partNumber} parts[idx] = &ObjectPartInfo{
Error: err.Error(),
Number: partNumber,
}
continue continue
} }
parts[idx] = pinfo parts[idx] = pinfo
} }
diskHealthCheckOK(ctx, nil) diskHealthCheckOK(ctx, nil)