fix: remove parent dirs in RenameData upon failure (#12452)

- it is possible that during I/O failures we might
  leave partially written directories, make sure
  we purge them after.

- rename current data-dir (null) versionId only after
  the newer xl.meta has been written fully.

- attempt removal once for minioMetaTmpBucket/uuid/
  as this folder is empty if all previous operations
  were successful, this allows avoiding recursive os.Remove()
This commit is contained in:
Harshavardhana 2021-06-07 09:35:08 -07:00 committed by GitHub
parent 403f4b9c84
commit dd2831c1a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 15 deletions

View File

@ -356,10 +356,10 @@ func testStorageAPIDeleteFile(t *testing.T, storage StorageAPI) {
expectErr bool expectErr bool
}{ }{
{"foo", "myobject", false}, {"foo", "myobject", false},
// should removed by above case. // file not found not returned
{"foo", "myobject", true}, {"foo", "myobject", false},
// file not found error // file not found not returned
{"foo", "yourobject", true}, {"foo", "yourobject", false},
} }
for i, testCase := range testCases { for i, testCase := range testCases {

View File

@ -1802,7 +1802,7 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool) erro
// error on parent directories. // error on parent directories.
return nil return nil
case osIsNotExist(err): case osIsNotExist(err):
return errFileNotFound return nil
case osIsPermission(err): case osIsPermission(err):
return errFileAccessDenied return errFileAccessDenied
case isSysErrIO(err): case isSysErrIO(err):
@ -2013,6 +2013,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
} }
} }
legacyDataPath := pathJoin(dstVolumeDir, dstPath, legacyDataDir)
if legacyPreserved { if legacyPreserved {
// Preserve all the legacy data, could be slow, but at max there can be 10,000 parts. // Preserve all the legacy data, could be slow, but at max there can be 10,000 parts.
currentDataPath := pathJoin(dstVolumeDir, dstPath) currentDataPath := pathJoin(dstVolumeDir, dstPath)
@ -2021,9 +2022,10 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
return osErrToFileErr(err) return osErrToFileErr(err)
} }
legacyDataPath := pathJoin(dstVolumeDir, dstPath, legacyDataDir)
// legacy data dir means its old content, honor system umask. // legacy data dir means its old content, honor system umask.
if err = mkdirAll(legacyDataPath, 0777); err != nil { if err = mkdirAll(legacyDataPath, 0777); err != nil {
// any failed mkdir-calls delete them.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
return osErrToFileErr(err) return osErrToFileErr(err)
} }
@ -2034,6 +2036,9 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
} }
if err = Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil { if err = Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
return osErrToFileErr(err) return osErrToFileErr(err)
} }
} }
@ -2054,28 +2059,42 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
} }
if err = xlMeta.AddVersion(fi); err != nil { if err = xlMeta.AddVersion(fi); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
}
return err return err
} }
dstBuf, err = xlMeta.AppendTo(nil) dstBuf, err = xlMeta.AppendTo(nil)
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
}
return errFileCorrupt return errFileCorrupt
} }
if srcDataPath != "" { if srcDataPath != "" {
if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil { if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
}
return err return err
} }
if oldDstDataPath != "" {
renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID()))
}
// renameAll only for objects that have xl.meta not saved inline. // renameAll only for objects that have xl.meta not saved inline.
if len(fi.Data) == 0 && fi.Size > 0 { if len(fi.Data) == 0 && fi.Size > 0 {
renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID())) renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID()))
if err = renameAll(srcDataPath, dstDataPath); err != nil { if err = renameAll(srcDataPath, dstDataPath); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
}
s.deleteFile(dstVolumeDir, dstDataPath, false)
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return osErrToFileErr(err) return osErrToFileErr(err)
} }
@ -2083,20 +2102,41 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
// Commit meta-file // Commit meta-file
if err = renameAll(srcFilePath, dstFilePath); err != nil { if err = renameAll(srcFilePath, dstFilePath); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
}
s.deleteFile(dstVolumeDir, dstFilePath, false)
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return osErrToFileErr(err) return osErrToFileErr(err)
} }
// additionally only purge older data at the end of the transaction of new data-dir
// movement, this is to ensure that previous data references can co-exist for
// any recoverability.
if oldDstDataPath != "" {
renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID()))
}
} else { } else {
// Write meta-file directly, no data // Write meta-file directly, no data
if err = s.WriteAll(ctx, dstVolume, pathJoin(dstPath, xlStorageFormatFile), dstBuf); err != nil { if err = s.WriteAll(ctx, dstVolume, pathJoin(dstPath, xlStorageFormatFile), dstBuf); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
}
s.deleteFile(dstVolumeDir, dstFilePath, false)
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return err return err
} }
} }
// Remove parent dir of the source file if empty // srcFilePath is always in minioMetaTmpBucket, an attempt to
parentDir := pathutil.Dir(srcFilePath) // remove the temporary folder is enough since at this point
s.deleteFile(srcVolumeDir, parentDir, false) // ideally all transaction should be complete.
Remove(pathutil.Dir(srcFilePath))
return nil return nil
} }

View File

@ -966,11 +966,11 @@ func TestXLStorageDeleteFile(t *testing.T) {
expectedErr: nil, expectedErr: nil,
}, },
// TestXLStorage case - 2. // TestXLStorage case - 2.
// The file was deleted in the last case, so Delete should fail. // The file was deleted in the last case, so Delete should not fail.
{ {
srcVol: "success-vol", srcVol: "success-vol",
srcPath: "success-file", srcPath: "success-file",
expectedErr: errFileNotFound, expectedErr: nil,
}, },
// TestXLStorage case - 3. // TestXLStorage case - 3.
// TestXLStorage case with segment of the volume name > 255. // TestXLStorage case with segment of the volume name > 255.