xl: Heal empty parts (#7860)

posix.VerifyFile() cannot tell whether a file is
corrupted when that file is empty. Since xl.json records
each part's size, we pass an emptiness flag to VerifyFile
so it can return an error when the on-disk size does not
match, allowing healing of empty parts to work properly.
This commit is contained in:
Anis Elleuch 2019-07-13 00:29:44 +01:00 committed by kannappanr
parent bf278ca36f
commit 000a60f238
11 changed files with 65 additions and 18 deletions

View File

@ -196,9 +196,9 @@ func (d *naughtyDisk) ReadAll(volume string, path string) (buf []byte, err error
return d.disk.ReadAll(volume, path)
}
func (d *naughtyDisk) VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
func (d *naughtyDisk) VerifyFile(volume, path string, empty bool, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
if err := d.calcError(); err != nil {
return err
}
return d.disk.VerifyFile(volume, path, algo, sum, shardSize)
return d.disk.VerifyFile(volume, path, empty, algo, sum, shardSize)
}

View File

@ -1526,7 +1526,7 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
return nil
}
func (s *posix) VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte, shardSize int64) (err error) {
func (s *posix) VerifyFile(volume, path string, empty bool, algo BitrotAlgorithm, sum []byte, shardSize int64) (err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
@ -1546,7 +1546,7 @@ func (s *posix) VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte
return err
}
// Stat a volume entry.
_, err = os.Stat((volumeDir))
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return errVolumeNotFound
@ -1556,12 +1556,12 @@ func (s *posix) VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte
// Validate effective path length before reading.
filePath := pathJoin(volumeDir, path)
if err = checkPathLength((filePath)); err != nil {
if err = checkPathLength(filePath); err != nil {
return err
}
// Open the file for reading.
file, err := os.Open((filePath))
file, err := os.Open(filePath)
if err != nil {
switch {
case os.IsNotExist(err):
@ -1601,6 +1601,11 @@ func (s *posix) VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte
if err != nil {
return err
}
if empty && fi.Size() != 0 || !empty && fi.Size() == 0 {
return errFileUnexpectedSize
}
size := fi.Size()
for {
if size == 0 {

View File

@ -1797,7 +1797,7 @@ func TestPosixVerifyFile(t *testing.T) {
if err := posixStorage.WriteAll(volName, fileName, bytes.NewBuffer(data)); err != nil {
t.Fatal(err)
}
if err := posixStorage.VerifyFile(volName, fileName, algo, hashBytes, 0); err != nil {
if err := posixStorage.VerifyFile(volName, fileName, false, algo, hashBytes, 0); err != nil {
t.Fatal(err)
}
@ -1805,7 +1805,7 @@ func TestPosixVerifyFile(t *testing.T) {
if err := posixStorage.AppendFile(volName, fileName, []byte("a")); err != nil {
t.Fatal(err)
}
if err := posixStorage.VerifyFile(volName, fileName, algo, hashBytes, 0); err == nil {
if err := posixStorage.VerifyFile(volName, fileName, false, algo, hashBytes, 0); err == nil {
t.Fatal("expected to fail bitrot check")
}
@ -1835,7 +1835,7 @@ func TestPosixVerifyFile(t *testing.T) {
}
}
w.Close()
if err := posixStorage.VerifyFile(volName, fileName, algo, nil, shardSize); err != nil {
if err := posixStorage.VerifyFile(volName, fileName, false, algo, nil, shardSize); err != nil {
t.Fatal(err)
}
@ -1849,7 +1849,7 @@ func TestPosixVerifyFile(t *testing.T) {
t.Fatal(err)
}
f.Close()
if err := posixStorage.VerifyFile(volName, fileName, algo, nil, shardSize); err == nil {
if err := posixStorage.VerifyFile(volName, fileName, false, algo, nil, shardSize); err == nil {
t.Fatal("expected to fail bitrot check")
}
}

View File

@ -72,6 +72,9 @@ var errVolumeAccessDenied = errors.New("volume access denied")
// errFileAccessDenied - cannot access file, insufficient permissions.
var errFileAccessDenied = errors.New("file access denied")
// errFileUnexpectedSize - file has an unexpected size
var errFileUnexpectedSize = errors.New("file has unexpected size")
// errFileParentIsFile - cannot have overlapping objects, parent is already a file.
var errFileParentIsFile = errors.New("parent is a file")

View File

@ -52,7 +52,7 @@ type StorageAPI interface {
StatFile(volume string, path string) (file FileInfo, err error)
DeleteFile(volume string, path string) (err error)
DeleteFileBulk(volume string, paths []string) (errs []error, err error)
VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte, shardSize int64) error
VerifyFile(volume, path string, empty bool, algo BitrotAlgorithm, sum []byte, shardSize int64) error
// Write all data, syncs the data to disk.
WriteAll(volume string, path string, reader io.Reader) (err error)

View File

@ -67,6 +67,8 @@ func toStorageErr(err error) error {
return io.EOF
case io.ErrUnexpectedEOF.Error():
return io.ErrUnexpectedEOF
case errFileUnexpectedSize.Error():
return errFileUnexpectedSize
case errUnexpected.Error():
return errUnexpected
case errDiskFull.Error():
@ -434,11 +436,12 @@ func (client *storageRESTClient) getInstanceID() (err error) {
return nil
}
func (client *storageRESTClient) VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
func (client *storageRESTClient) VerifyFile(volume, path string, empty bool, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
values.Set(storageRESTBitrotAlgo, algo.String())
values.Set(storageRESTEmpty, strconv.FormatBool(empty))
values.Set(storageRESTLength, strconv.Itoa(int(shardSize)))
if len(sum) != 0 {
values.Set(storageRESTBitrotHash, hex.EncodeToString(sum))

View File

@ -16,7 +16,7 @@
package cmd
const storageRESTVersion = "v7"
const storageRESTVersion = "v8"
const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/"
const (
@ -52,6 +52,7 @@ const (
storageRESTDstPath = "destination-path"
storageRESTOffset = "offset"
storageRESTLength = "length"
storageRESTEmpty = "empty"
storageRESTCount = "count"
storageRESTMarkerPath = "marker"
storageRESTLeafFile = "leaf-file"

View File

@ -520,6 +520,11 @@ func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
empty, err := strconv.ParseBool(vars[storageRESTEmpty])
if err != nil {
s.writeErrorResponse(w, err)
return
}
shardSize, err := strconv.Atoi(vars[storageRESTLength])
if err != nil {
s.writeErrorResponse(w, err)
@ -542,7 +547,7 @@ func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) {
algo := BitrotAlgorithmFromString(algoStr)
w.Header().Set(xhttp.ContentType, "text/event-stream")
doneCh := sendWhiteSpaceVerifyFile(w)
err = s.storage.VerifyFile(volume, filePath, algo, hash, int64(shardSize))
err = s.storage.VerifyFile(volume, filePath, empty, algo, hash, int64(shardSize))
<-doneCh
gob.NewEncoder(w).Encode(VerifyFileResp{err})
}

View File

@ -183,10 +183,10 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
// it needs healing too.
for _, part := range partsMetadata[i].Parts {
checksumInfo := erasureInfo.GetChecksumInfo(part.Name)
err = onlineDisk.VerifyFile(bucket, pathJoin(object, part.Name), checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
err = onlineDisk.VerifyFile(bucket, pathJoin(object, part.Name), part.Size == 0, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
if err != nil {
isCorrupt := strings.HasPrefix(err.Error(), "Bitrot verification mismatch - expected ")
if !isCorrupt && err != errFileNotFound && err != errVolumeNotFound {
if !isCorrupt && err != errFileNotFound && err != errVolumeNotFound && err != errFileUnexpectedSize {
logger.LogIf(ctx, err)
}
dataErrs[i] = err

View File

@ -195,6 +195,9 @@ func shouldHealObjectOnDisk(xlErr, dataErr error, meta xlMetaV1, quorumModTime t
if dataErr == errFileNotFound {
return true
}
if dataErr == errFileUnexpectedSize {
return true
}
if _, ok := dataErr.(HashMismatchError); ok {
return true
}

View File

@ -107,7 +107,7 @@ func TestHealObjectCorrupted(t *testing.T) {
t.Fatalf("Failed to complete multipart upload - %v", err)
}
// Remove the object backend files from the first disk.
// Test 1: Remove the object backend files from the first disk.
xl := obj.(*xlObjects)
firstDisk := xl.storageDisks[0]
err = firstDisk.DeleteFile(bucket, filepath.Join(object, xlMetaJSONFile))
@ -125,7 +125,34 @@ func TestHealObjectCorrupted(t *testing.T) {
t.Errorf("Expected xl.json file to be present but stat failed - %v", err)
}
// Delete xl.json from more than read quorum number of disks, to create a corrupted situation.
// Test 2: Heal when part.1 is empty
partSt1, err := firstDisk.StatFile(bucket, filepath.Join(object, "part.1"))
if err != nil {
t.Errorf("Expected part.1 file to be present but stat failed - %v", err)
}
err = firstDisk.DeleteFile(bucket, filepath.Join(object, "part.1"))
if err != nil {
t.Errorf("Failure during part.1 removal - %v", err)
}
err = firstDisk.AppendFile(bucket, filepath.Join(object, "part.1"), []byte{})
if err != nil {
t.Errorf("Failure during creating part.1 - %v", err)
}
_, err = obj.HealObject(context.Background(), bucket, object, false, true, madmin.HealDeepScan)
if err != nil {
t.Errorf("Expected nil but received %v", err)
}
partSt2, err := firstDisk.StatFile(bucket, filepath.Join(object, "part.1"))
if err != nil {
t.Errorf("Expected from part.1 file to be present but stat failed - %v", err)
}
if partSt1.Size != partSt2.Size {
t.Errorf("part.1 file size is not the same before and after heal")
}
// Test 3: checks if HealObject returns an error when xl.json is not found
// in more than read quorum number of disks, to create a corrupted situation.
for i := 0; i <= len(xl.storageDisks)/2; i++ {
xl.storageDisks[i].DeleteFile(bucket, filepath.Join(object, xlMetaJSONFile))
}