Avoid network transfer for bitrot verification during healing (#7375)

This commit is contained in:
Krishna Srinivas 2019-07-08 13:51:18 -07:00 committed by Dee Koder
parent e857b6741d
commit 58d90ed73c
13 changed files with 317 additions and 51 deletions

View File

@ -131,7 +131,7 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
b.h.Write(buf) b.h.Write(buf)
if !bytes.Equal(b.h.Sum(nil), b.hashBytes) { if !bytes.Equal(b.h.Sum(nil), b.hashBytes) {
err = hashMismatchError{hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil))} err = HashMismatchError{hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil))}
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
return 0, err return 0, err
} }

View File

@ -155,33 +155,3 @@ func bitrotWriterSum(w io.Writer) []byte {
} }
return nil return nil
} }
// bitrotCheckFile reports whether the file at volume/filePath fails bitrot
// verification. Whole-file algorithms are verified through a single
// zero-length disk.ReadFile call; HighwayHash256S files are read shard by
// shard through a streaming bitrot reader up to tillOffset.
func bitrotCheckFile(disk StorageAPI, volume string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) (err error) {
	if algo != HighwayHash256S {
		// For whole-file bitrot we don't need to read the entire file as the
		// bitrot verify happens on the server side even if we read 0-bytes.
		_, err = disk.ReadFile(volume, filePath, 0, []byte{}, NewBitrotVerifier(algo, sum))
		return err
	}

	shard := make([]byte, shardSize)
	reader := newStreamingBitrotReader(disk, volume, filePath, tillOffset, algo, shardSize)
	defer closeBitrotReaders([]io.ReaderAt{reader})

	for offset := int64(0); offset != tillOffset; {
		// Never ask for more than what is left before tillOffset.
		chunk := shard
		if remaining := tillOffset - offset; int64(len(chunk)) > remaining {
			chunk = chunk[:remaining]
		}
		n, readErr := reader.ReadAt(chunk, offset)
		if readErr != nil {
			return readErr
		}
		offset += int64(n)
	}
	return nil
}

View File

@ -195,3 +195,10 @@ func (d *naughtyDisk) ReadAll(volume string, path string) (buf []byte, err error
} }
return d.disk.ReadAll(volume, path) return d.disk.ReadAll(volume, path)
} }
// VerifyFile returns the error produced by calcError when there is one;
// otherwise it forwards the call unchanged to the wrapped disk.
func (d *naughtyDisk) VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
	if injected := d.calcError(); injected != nil {
		return injected
	}
	return d.disk.VerifyFile(volume, path, algo, sum, shardSize)
}

View File

@ -958,7 +958,7 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
} }
if !bytes.Equal(h.Sum(nil), verifier.sum) { if !bytes.Equal(h.Sum(nil), verifier.sum) {
return 0, hashMismatchError{hex.EncodeToString(verifier.sum), hex.EncodeToString(h.Sum(nil))} return 0, HashMismatchError{hex.EncodeToString(verifier.sum), hex.EncodeToString(h.Sum(nil))}
} }
return int64(len(buffer)), nil return int64(len(buffer)), nil
@ -1525,3 +1525,104 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
return nil return nil
} }
// VerifyFile checks the file at volume/path for bitrot by reading it directly
// from the local disk.
//
// For every algorithm other than HighwayHash256S the hash of the whole file
// is compared against sum. For HighwayHash256S the file is read as a sequence
// of [hash][shard] records, where each hash is h.Size() bytes long and each
// shard is at most shardSize bytes; sum is not consulted in this mode.
//
// It returns nil when every hash matches, HashMismatchError on the first
// mismatch, errVolumeNotFound / errFileNotFound / errFileAccessDenied /
// errFaultyDisk for the corresponding stat/open failures, and any other
// error unchanged.
func (s *posix) VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte, shardSize int64) (err error) {
	// Count faulty-disk results; once the counter exceeds maxAllowedIOError
	// every subsequent call fails fast below.
	defer func() {
		if err == errFaultyDisk {
			atomic.AddInt32(&s.ioErrCount, 1)
		}
	}()

	if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
		return errFaultyDisk
	}

	if err = s.checkDiskFound(); err != nil {
		return err
	}

	volumeDir, err := s.getVolDir(volume)
	if err != nil {
		return err
	}

	// Stat a volume entry.
	if _, err = os.Stat(volumeDir); err != nil {
		if os.IsNotExist(err) {
			return errVolumeNotFound
		}
		return err
	}

	// Validate effective path length before reading.
	filePath := pathJoin(volumeDir, path)
	if err = checkPathLength(filePath); err != nil {
		return err
	}

	// Open the file for reading.
	file, err := os.Open(filePath)
	if err != nil {
		switch {
		case os.IsNotExist(err):
			return errFileNotFound
		case os.IsPermission(err):
			return errFileAccessDenied
		case isSysErrNotDir(err):
			return errFileAccessDenied
		case isSysErrIO(err):
			return errFaultyDisk
		default:
			return err
		}
	}

	// Close the file descriptor.
	defer file.Close()

	if algo != HighwayHash256S {
		// Whole-file bitrot: hash the complete file using a pooled copy buffer.
		bufp := s.pool.Get().(*[]byte)
		defer s.pool.Put(bufp)

		h := algo.New()
		if _, err = io.CopyBuffer(h, file, *bufp); err != nil {
			return err
		}
		if computed := h.Sum(nil); !bytes.Equal(computed, sum) {
			return HashMismatchError{hex.EncodeToString(sum), hex.EncodeToString(computed)}
		}
		return nil
	}

	// Streaming bitrot: walk the [hash][shard] records until the file size
	// reported by Stat has been consumed.
	buf := make([]byte, shardSize)
	h := algo.New()
	hashBuf := make([]byte, h.Size())
	fi, err := file.Stat()
	if err != nil {
		return err
	}

	var n int
	for remaining := fi.Size(); remaining > 0; {
		h.Reset()

		// A file that ends inside a checksum yields a short "expected" value
		// that can never equal a full-length hash, so it is reported as a
		// mismatch rather than as a read error.
		expected := hashBuf
		if remaining < int64(len(expected)) {
			expected = expected[:remaining]
		}
		// io.ReadFull is required here: a bare Read may legally return fewer
		// bytes than requested, which would desynchronise the record parsing.
		if n, err = io.ReadFull(file, expected); err != nil {
			return err
		}
		remaining -= int64(n)

		// The last shard may be shorter than shardSize.
		shard := buf
		if remaining < int64(len(shard)) {
			shard = shard[:remaining]
		}
		if n, err = io.ReadFull(file, shard); err != nil {
			return err
		}
		remaining -= int64(n)

		h.Write(shard)
		if computed := h.Sum(nil); !bytes.Equal(computed, expected) {
			return HashMismatchError{hex.EncodeToString(expected), hex.EncodeToString(computed)}
		}
	}
	return nil
}

View File

@ -1251,13 +1251,13 @@ var posixReadFileWithVerifyTests = []struct {
{file: "myobject", offset: 25, length: 74, algorithm: SHA256, expError: nil}, // 1 {file: "myobject", offset: 25, length: 74, algorithm: SHA256, expError: nil}, // 1
{file: "myobject", offset: 29, length: 70, algorithm: SHA256, expError: nil}, // 2 {file: "myobject", offset: 29, length: 70, algorithm: SHA256, expError: nil}, // 2
{file: "myobject", offset: 100, length: 0, algorithm: SHA256, expError: nil}, // 3 {file: "myobject", offset: 100, length: 0, algorithm: SHA256, expError: nil}, // 3
{file: "myobject", offset: 1, length: 120, algorithm: SHA256, expError: hashMismatchError{}}, // 4 {file: "myobject", offset: 1, length: 120, algorithm: SHA256, expError: HashMismatchError{}}, // 4
{file: "myobject", offset: 3, length: 1100, algorithm: SHA256, expError: nil}, // 5 {file: "myobject", offset: 3, length: 1100, algorithm: SHA256, expError: nil}, // 5
{file: "myobject", offset: 2, length: 100, algorithm: SHA256, expError: hashMismatchError{}}, // 6 {file: "myobject", offset: 2, length: 100, algorithm: SHA256, expError: HashMismatchError{}}, // 6
{file: "myobject", offset: 1000, length: 1001, algorithm: SHA256, expError: nil}, // 7 {file: "myobject", offset: 1000, length: 1001, algorithm: SHA256, expError: nil}, // 7
{file: "myobject", offset: 0, length: 100, algorithm: BLAKE2b512, expError: hashMismatchError{}}, // 8 {file: "myobject", offset: 0, length: 100, algorithm: BLAKE2b512, expError: HashMismatchError{}}, // 8
{file: "myobject", offset: 25, length: 74, algorithm: BLAKE2b512, expError: nil}, // 9 {file: "myobject", offset: 25, length: 74, algorithm: BLAKE2b512, expError: nil}, // 9
{file: "myobject", offset: 29, length: 70, algorithm: BLAKE2b512, expError: hashMismatchError{}}, // 10 {file: "myobject", offset: 29, length: 70, algorithm: BLAKE2b512, expError: HashMismatchError{}}, // 10
{file: "myobject", offset: 100, length: 0, algorithm: BLAKE2b512, expError: nil}, // 11 {file: "myobject", offset: 100, length: 0, algorithm: BLAKE2b512, expError: nil}, // 11
{file: "myobject", offset: 1, length: 120, algorithm: BLAKE2b512, expError: nil}, // 12 {file: "myobject", offset: 1, length: 120, algorithm: BLAKE2b512, expError: nil}, // 12
{file: "myobject", offset: 3, length: 1100, algorithm: BLAKE2b512, expError: nil}, // 13 {file: "myobject", offset: 3, length: 1100, algorithm: BLAKE2b512, expError: nil}, // 13
@ -1296,7 +1296,7 @@ func TestPosixReadFileWithVerify(t *testing.T) {
if test.expError != nil { if test.expError != nil {
expected := h.Sum(nil) expected := h.Sum(nil)
h.Write([]byte{0}) h.Write([]byte{0})
test.expError = hashMismatchError{hex.EncodeToString(h.Sum(nil)), hex.EncodeToString(expected)} test.expError = HashMismatchError{hex.EncodeToString(h.Sum(nil)), hex.EncodeToString(expected)}
} }
buffer := make([]byte, test.length) buffer := make([]byte, test.length)
@ -1763,6 +1763,97 @@ func TestPosixStatFile(t *testing.T) {
} }
} }
// Test posix.VerifyFile()
func TestPosixVerifyFile(t *testing.T) {
	// We test 4 cases:
	// 1) Whole-file bitrot check on proper file
	// 2) Whole-file bitrot check on corrupted file
	// 3) Streaming bitrot check on proper file
	// 4) Streaming bitrot check on corrupted file

	// create posix test setup
	posixStorage, path, err := newPosixTestSetup()
	if err != nil {
		t.Fatalf("Unable to create posix test setup, %s", err)
	}
	defer os.RemoveAll(path)

	volName := "testvol"
	fileName := "testfile"
	if err := posixStorage.MakeVol(volName); err != nil {
		t.Fatal(err)
	}

	// 1) Whole-file bitrot check on proper file
	size := int64(4*1024*1024 + 100*1024) // 4.1 MB
	data := make([]byte, size)
	if _, err := rand.Read(data); err != nil {
		t.Fatal(err)
	}
	algo := HighwayHash256
	h := algo.New()
	h.Write(data)
	hashBytes := h.Sum(nil)
	if err := posixStorage.WriteAll(volName, fileName, bytes.NewBuffer(data)); err != nil {
		t.Fatal(err)
	}
	if err := posixStorage.VerifyFile(volName, fileName, algo, hashBytes, 0); err != nil {
		t.Fatal(err)
	}

	// 2) Whole-file bitrot check on corrupted file
	if err := posixStorage.AppendFile(volName, fileName, []byte("a")); err != nil {
		t.Fatal(err)
	}
	if err := posixStorage.VerifyFile(volName, fileName, algo, hashBytes, 0); err == nil {
		t.Fatal("expected to fail bitrot check")
	}
	if err := posixStorage.DeleteFile(volName, fileName); err != nil {
		t.Fatal(err)
	}

	// 3) Streaming bitrot check on proper file
	algo = HighwayHash256S
	shardSize := int64(1024 * 1024)
	shard := make([]byte, shardSize)
	w := newStreamingBitrotWriter(posixStorage, volName, fileName, size, algo, shardSize)
	reader := bytes.NewReader(data)
	for {
		// Using io.CopyBuffer instead of this loop will not work for us as io.CopyBuffer
		// will use bytes.Buffer.ReadFrom() which will not do shardSize'ed writes causing error.
		n, err := reader.Read(shard)
		if n > 0 {
			// A failed shard write would otherwise only surface later as a
			// confusing verification failure, so stop right here.
			if _, werr := w.Write(shard[:n]); werr != nil {
				t.Fatal(werr)
			}
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
	}
	w.Close()
	if err := posixStorage.VerifyFile(volName, fileName, algo, nil, shardSize); err != nil {
		t.Fatal(err)
	}

	// 4) Streaming bitrot check on corrupted file
	// Overwrite the first byte of the on-disk file in place.
	filePath := pathJoin(posixStorage.String(), volName, fileName)
	f, err := os.OpenFile(filePath, os.O_WRONLY, 0644)
	if err != nil {
		t.Fatal(err)
	}
	if _, err := f.WriteString("a"); err != nil {
		f.Close()
		t.Fatal(err)
	}
	if err := f.Close(); err != nil {
		t.Fatal(err)
	}
	if err := posixStorage.VerifyFile(volName, fileName, algo, nil, shardSize); err == nil {
		t.Fatal("expected to fail bitrot check")
	}
}
// Checks for restrictions for min total disk space and inodes. // Checks for restrictions for min total disk space and inodes.
func TestCheckDiskTotalMin(t *testing.T) { func TestCheckDiskTotalMin(t *testing.T) {
testCases := []struct { testCases := []struct {

View File

@ -18,6 +18,7 @@ package cmd
import ( import (
"context" "context"
"encoding/gob"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -36,6 +37,7 @@ import (
func init() { func init() {
logger.Init(GOPATH, GOROOT) logger.Init(GOPATH, GOROOT)
logger.RegisterUIError(fmtError) logger.RegisterUIError(fmtError)
gob.Register(HashMismatchError{})
} }
// ServerFlags - server command specific flags // ServerFlags - server command specific flags

View File

@ -91,18 +91,17 @@ var errLessData = errors.New("less data available than what was requested")
// errMoreData = returned when more data was sent by the caller than what it was supposed to. // errMoreData = returned when more data was sent by the caller than what it was supposed to.
var errMoreData = errors.New("more data was sent than what was advertised") var errMoreData = errors.New("more data was sent than what was advertised")
// hashMisMatchError - represents a bit-rot hash verification failure // HashMismatchError represents a bit-rot hash verification failure error.
// error. type HashMismatchError struct {
type hashMismatchError struct { Expected string
expected string Computed string
computed string
} }
// error method for the hashMismatchError // Error method for the HashMismatchError
func (h hashMismatchError) Error() string { func (h HashMismatchError) Error() string {
return fmt.Sprintf( return fmt.Sprintf(
"Bitrot verification mismatch - expected %v, received %v", "Bitrot verification mismatch - expected %v, received %v",
h.expected, h.computed) h.Expected, h.Computed)
} }
// Collection of basic errors. // Collection of basic errors.

View File

@ -52,6 +52,7 @@ type StorageAPI interface {
StatFile(volume string, path string) (file FileInfo, err error) StatFile(volume string, path string) (file FileInfo, err error)
DeleteFile(volume string, path string) (err error) DeleteFile(volume string, path string) (err error)
DeleteFileBulk(volume string, paths []string) (errs []error, err error) DeleteFileBulk(volume string, paths []string) (errs []error, err error)
VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte, shardSize int64) error
// Write all data, syncs the data to disk. // Write all data, syncs the data to disk.
WriteAll(volume string, path string, reader io.Reader) (err error) WriteAll(volume string, path string, reader io.Reader) (err error)

View File

@ -17,6 +17,7 @@
package cmd package cmd
import ( import (
"bufio"
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"io" "io"
@ -105,7 +106,7 @@ func toStorageErr(err error) error {
fmt.Sscanf(err.Error(), "Bitrot verification mismatch - expected %s received %s", &expected, &received) fmt.Sscanf(err.Error(), "Bitrot verification mismatch - expected %s received %s", &expected, &received)
// Go's Sscanf %s scans "," that comes after the expected hash, hence remove it. Providing "," in the format string does not help. // Go's Sscanf %s scans "," that comes after the expected hash, hence remove it. Providing "," in the format string does not help.
expected = strings.TrimSuffix(expected, ",") expected = strings.TrimSuffix(expected, ",")
bitrotErr := hashMismatchError{expected, received} bitrotErr := HashMismatchError{expected, received}
return bitrotErr return bitrotErr
} }
return err return err
@ -433,6 +434,39 @@ func (client *storageRESTClient) getInstanceID() (err error) {
return nil return nil
} }
// VerifyFile asks the remote storage server to run bitrot verification on
// volume/path and returns the resulting error, if any.
//
// The volume, path, algorithm name and shard size are always sent as query
// values; the hex-encoded sum is sent only when sum is non-empty. The response
// body is expected to be zero or more ' ' bytes followed by a gob-encoded
// VerifyFileResp; the decoded error is passed through toStorageErr.
func (client *storageRESTClient) VerifyFile(volume, path string, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
	values := make(url.Values)
	values.Set(storageRESTVolume, volume)
	values.Set(storageRESTFilePath, path)
	values.Set(storageRESTBitrotAlgo, algo.String())
	// Format the int64 directly; converting through int would truncate the
	// value on platforms where int is 32 bits wide.
	values.Set(storageRESTLength, strconv.FormatInt(shardSize, 10))
	if len(sum) != 0 {
		values.Set(storageRESTBitrotHash, hex.EncodeToString(sum))
	}

	respBody, err := client.call(storageRESTMethodVerifyFile, values, nil, -1)
	defer http.DrainBody(respBody)
	if err != nil {
		return err
	}

	// Skip leading space bytes until the first byte of the gob payload, then
	// push that byte back so the decoder sees the complete stream.
	reader := bufio.NewReader(respBody)
	for {
		b, err := reader.ReadByte()
		if err != nil {
			return err
		}
		if b != ' ' {
			if err = reader.UnreadByte(); err != nil {
				return err
			}
			break
		}
	}

	verifyResp := &VerifyFileResp{}
	if err = gob.NewDecoder(reader).Decode(verifyResp); err != nil {
		return err
	}
	return toStorageErr(verifyResp.Err)
}
// Close - marks the client as closed. // Close - marks the client as closed.
func (client *storageRESTClient) Close() error { func (client *storageRESTClient) Close() error {
client.connected = false client.connected = false

View File

@ -16,7 +16,7 @@
package cmd package cmd
const storageRESTVersion = "v6" const storageRESTVersion = "v7"
const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/" const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/"
const ( const (
@ -38,6 +38,7 @@ const (
storageRESTMethodDeleteFile = "deletefile" storageRESTMethodDeleteFile = "deletefile"
storageRESTMethodDeleteFileBulk = "deletefilebulk" storageRESTMethodDeleteFileBulk = "deletefilebulk"
storageRESTMethodRenameFile = "renamefile" storageRESTMethodRenameFile = "renamefile"
storageRESTMethodVerifyFile = "verifyfile"
storageRESTMethodGetInstanceID = "getinstanceid" storageRESTMethodGetInstanceID = "getinstanceid"
) )

View File

@ -487,6 +487,65 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
} }
} }
// Send whitespace to the client to avoid timeouts as bitrot verification can take time on spinning/slow disks.
//
// sendWhiteSpaceVerifyFile starts a goroutine that writes a single space to w
// every 10 seconds and flushes it when w implements http.Flusher. The caller
// stops the goroutine by receiving once from the returned channel; because the
// channel is unbuffered, the goroutine performs no further writes to w after
// that receive completes.
func sendWhiteSpaceVerifyFile(w http.ResponseWriter) <-chan struct{} {
	doneCh := make(chan struct{})
	// Resolve the optional Flusher up front: an unchecked type assertion
	// inside the goroutine would panic the whole process for a writer that
	// does not support flushing.
	flusher, canFlush := w.(http.Flusher)
	go func() {
		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				w.Write([]byte(" "))
				if canFlush {
					flusher.Flush()
				}
			case doneCh <- struct{}{}:
				return
			}
		}
	}()
	return doneCh
}
// VerifyFileResp - VerifyFile()'s response.
// It is gob-encoded by the storage REST server and gob-decoded by the storage
// REST client.
type VerifyFileResp struct {
// Err is the verification result; nil means no error was reported.
// Because Err is an interface value, gob can only transmit concrete error
// types that have been registered with gob.Register.
Err error
}
// VerifyFile - Verify the file for bitrot errors.
//
// The handler reads volume, file path, bitrot algorithm, shard length and an
// optional hex-encoded hash from the request variables, runs VerifyFile on the
// local storage, and writes the result as a gob-encoded VerifyFileResp.
// While verification runs, spaces are periodically written to w so the client
// does not time out; the client is expected to skip them.
func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) {
	if !s.IsValid(w, r) {
		return
	}
	vars := mux.Vars(r)
	volume := vars[storageRESTVolume]
	filePath := vars[storageRESTFilePath]

	// Parse as int64 (the type the storage layer takes) and reject negative
	// lengths, which would otherwise reach a make([]byte, shardSize) call in
	// the storage layer and panic.
	shardSize, err := strconv.ParseInt(vars[storageRESTLength], 10, 64)
	if err != nil {
		s.writeErrorResponse(w, err)
		return
	}
	if shardSize < 0 {
		s.writeErrorResponse(w, errInvalidArgument)
		return
	}

	// The hash is optional; an absent value leaves hash nil.
	var hash []byte
	if hashStr := vars[storageRESTBitrotHash]; hashStr != "" {
		hash, err = hex.DecodeString(hashStr)
		if err != nil {
			s.writeErrorResponse(w, err)
			return
		}
	}

	algoStr := vars[storageRESTBitrotAlgo]
	if algoStr == "" {
		s.writeErrorResponse(w, errInvalidArgument)
		return
	}
	algo := BitrotAlgorithmFromString(algoStr)

	doneCh := sendWhiteSpaceVerifyFile(w)
	err = s.storage.VerifyFile(volume, filePath, algo, hash, shardSize)
	// Stop the keep-alive writer before emitting the payload so the two
	// never write to w concurrently.
	<-doneCh

	// NOTE(review): gob can only encode an interface value whose concrete
	// type has been registered; confirm every error VerifyFile can return is
	// covered, otherwise Encode fails here and the client sees a decode error.
	gob.NewEncoder(w).Encode(VerifyFileResp{Err: err})
}
// registerStorageRPCRouter - register storage rpc router. // registerStorageRPCRouter - register storage rpc router.
func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) { func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) {
for _, endpoint := range endpoints { for _, endpoint := range endpoints {
@ -534,6 +593,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) {
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)). subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)).
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...) Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFile)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTBitrotAlgo, storageRESTLength)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodGetInstanceID).HandlerFunc(httpTraceAll(server.GetInstanceID)) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodGetInstanceID).HandlerFunc(httpTraceAll(server.GetInstanceID))
} }

View File

@ -183,8 +183,7 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
// it needs healing too. // it needs healing too.
for _, part := range partsMetadata[i].Parts { for _, part := range partsMetadata[i].Parts {
checksumInfo := erasureInfo.GetChecksumInfo(part.Name) checksumInfo := erasureInfo.GetChecksumInfo(part.Name)
tillOffset := erasure.ShardFileTillOffset(0, part.Size, part.Size) err = onlineDisk.VerifyFile(bucket, pathJoin(object, part.Name), checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
err = bitrotCheckFile(onlineDisk, bucket, pathJoin(object, part.Name), tillOffset, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
if err != nil { if err != nil {
isCorrupt := strings.HasPrefix(err.Error(), "Bitrot verification mismatch - expected ") isCorrupt := strings.HasPrefix(err.Error(), "Bitrot verification mismatch - expected ")
if !isCorrupt && err != errFileNotFound && err != errVolumeNotFound { if !isCorrupt && err != errFileNotFound && err != errVolumeNotFound {

View File

@ -195,7 +195,7 @@ func shouldHealObjectOnDisk(xlErr, dataErr error, meta xlMetaV1, quorumModTime t
if dataErr == errFileNotFound { if dataErr == errFileNotFound {
return true return true
} }
if _, ok := dataErr.(hashMismatchError); ok { if _, ok := dataErr.(HashMismatchError); ok {
return true return true
} }
if quorumModTime != meta.Stat.ModTime { if quorumModTime != meta.Stat.ModTime {
@ -351,7 +351,7 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string
} }
// Reorder so that we have data disks first and parity disks next. // Reorder so that we have data disks first and parity disks next.
latestDisks = shuffleDisks(latestDisks, latestMeta.Erasure.Distribution) latestDisks = shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)
outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution) outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution) partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
for i := range outDatedDisks { for i := range outDatedDisks {