bitrot: Verify file size inside storage interface (#7932)

This commit is contained in:
Anis Elleuch 2019-09-11 21:49:53 +01:00 committed by kannappanr
parent 3d65dc8d94
commit 3f258062d8
9 changed files with 39 additions and 19 deletions

View File

@ -155,3 +155,11 @@ func bitrotWriterSum(w io.Writer) []byte {
} }
return nil return nil
} }
// Returns the size of the file with bitrot protection
func bitrotShardFileSize(size int64, shardSize int64, algo BitrotAlgorithm) int64 {
if algo != HighwayHash256S {
return size
}
return ceilFrac(size, shardSize)*int64(algo.New().Size()) + size
}

View File

@ -196,9 +196,9 @@ func (d *naughtyDisk) ReadAll(volume string, path string) (buf []byte, err error
return d.disk.ReadAll(volume, path) return d.disk.ReadAll(volume, path)
} }
func (d *naughtyDisk) VerifyFile(volume, path string, empty bool, algo BitrotAlgorithm, sum []byte, shardSize int64) error { func (d *naughtyDisk) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
if err := d.calcError(); err != nil { if err := d.calcError(); err != nil {
return err return err
} }
return d.disk.VerifyFile(volume, path, empty, algo, sum, shardSize) return d.disk.VerifyFile(volume, path, size, algo, sum, shardSize)
} }

View File

@ -1541,7 +1541,7 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
return nil return nil
} }
func (s *posix) VerifyFile(volume, path string, empty bool, algo BitrotAlgorithm, sum []byte, shardSize int64) (err error) { func (s *posix) VerifyFile(volume, path string, fileSize int64, algo BitrotAlgorithm, sum []byte, shardSize int64) (err error) {
defer func() { defer func() {
if err == errFaultyDisk { if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1) atomic.AddInt32(&s.ioErrCount, 1)
@ -1617,7 +1617,9 @@ func (s *posix) VerifyFile(volume, path string, empty bool, algo BitrotAlgorithm
return err return err
} }
if empty && fi.Size() != 0 || !empty && fi.Size() == 0 { // Calculate the size of the bitrot file and compare
// it with the actual file size.
if fi.Size() != bitrotShardFileSize(fileSize, shardSize, algo) {
return errFileUnexpectedSize return errFileUnexpectedSize
} }

View File

@ -1797,7 +1797,7 @@ func TestPosixVerifyFile(t *testing.T) {
if err := posixStorage.WriteAll(volName, fileName, bytes.NewBuffer(data)); err != nil { if err := posixStorage.WriteAll(volName, fileName, bytes.NewBuffer(data)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := posixStorage.VerifyFile(volName, fileName, false, algo, hashBytes, 0); err != nil { if err := posixStorage.VerifyFile(volName, fileName, size, algo, hashBytes, 0); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1805,7 +1805,14 @@ func TestPosixVerifyFile(t *testing.T) {
if err := posixStorage.AppendFile(volName, fileName, []byte("a")); err != nil { if err := posixStorage.AppendFile(volName, fileName, []byte("a")); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := posixStorage.VerifyFile(volName, fileName, false, algo, hashBytes, 0); err == nil {
// Check if VerifyFile reports the incorrect file length (the correct length is `size+1`)
if err := posixStorage.VerifyFile(volName, fileName, size, algo, hashBytes, 0); err == nil {
t.Fatal("expected to fail bitrot check")
}
// Check if bitrot fails
if err := posixStorage.VerifyFile(volName, fileName, size+1, algo, hashBytes, 0); err == nil {
t.Fatal("expected to fail bitrot check") t.Fatal("expected to fail bitrot check")
} }
@ -1833,7 +1840,7 @@ func TestPosixVerifyFile(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
w.Close() w.Close()
if err := posixStorage.VerifyFile(volName, fileName, false, algo, nil, shardSize); err != nil { if err := posixStorage.VerifyFile(volName, fileName, size, algo, nil, shardSize); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1847,7 +1854,10 @@ func TestPosixVerifyFile(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
f.Close() f.Close()
if err := posixStorage.VerifyFile(volName, fileName, false, algo, nil, shardSize); err == nil { if err := posixStorage.VerifyFile(volName, fileName, size, algo, nil, shardSize); err == nil {
t.Fatal("expected to fail bitrot check")
}
if err := posixStorage.VerifyFile(volName, fileName, size+1, algo, nil, shardSize); err == nil {
t.Fatal("expected to fail bitrot check") t.Fatal("expected to fail bitrot check")
} }
} }

View File

@ -52,7 +52,7 @@ type StorageAPI interface {
StatFile(volume string, path string) (file FileInfo, err error) StatFile(volume string, path string) (file FileInfo, err error)
DeleteFile(volume string, path string) (err error) DeleteFile(volume string, path string) (err error)
DeleteFileBulk(volume string, paths []string) (errs []error, err error) DeleteFileBulk(volume string, paths []string) (errs []error, err error)
VerifyFile(volume, path string, empty bool, algo BitrotAlgorithm, sum []byte, shardSize int64) error VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error
// Write all data, syncs the data to disk. // Write all data, syncs the data to disk.
WriteAll(volume string, path string, reader io.Reader) (err error) WriteAll(volume string, path string, reader io.Reader) (err error)

View File

@ -439,13 +439,13 @@ func (client *storageRESTClient) getInstanceID() (err error) {
return nil return nil
} }
func (client *storageRESTClient) VerifyFile(volume, path string, empty bool, algo BitrotAlgorithm, sum []byte, shardSize int64) error { func (client *storageRESTClient) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
values := make(url.Values) values := make(url.Values)
values.Set(storageRESTVolume, volume) values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path) values.Set(storageRESTFilePath, path)
values.Set(storageRESTBitrotAlgo, algo.String()) values.Set(storageRESTBitrotAlgo, algo.String())
values.Set(storageRESTEmpty, strconv.FormatBool(empty)) values.Set(storageRESTLength, strconv.FormatInt(size, 10))
values.Set(storageRESTLength, strconv.Itoa(int(shardSize))) values.Set(storageRESTShardSize, strconv.Itoa(int(shardSize)))
if len(sum) != 0 { if len(sum) != 0 {
values.Set(storageRESTBitrotHash, hex.EncodeToString(sum)) values.Set(storageRESTBitrotHash, hex.EncodeToString(sum))
} }

View File

@ -16,7 +16,7 @@
package cmd package cmd
const storageRESTVersion = "v8" const storageRESTVersion = "v9"
const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + SlashSeparator const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + SlashSeparator
const ( const (
@ -52,7 +52,7 @@ const (
storageRESTDstPath = "destination-path" storageRESTDstPath = "destination-path"
storageRESTOffset = "offset" storageRESTOffset = "offset"
storageRESTLength = "length" storageRESTLength = "length"
storageRESTEmpty = "empty" storageRESTShardSize = "shard-size"
storageRESTCount = "count" storageRESTCount = "count"
storageRESTMarkerPath = "marker" storageRESTMarkerPath = "marker"
storageRESTLeafFile = "leaf-file" storageRESTLeafFile = "leaf-file"

View File

@ -520,12 +520,12 @@ func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
volume := vars[storageRESTVolume] volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath] filePath := vars[storageRESTFilePath]
empty, err := strconv.ParseBool(vars[storageRESTEmpty]) size, err := strconv.ParseInt(vars[storageRESTLength], 10, 0)
if err != nil { if err != nil {
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
shardSize, err := strconv.Atoi(vars[storageRESTLength]) shardSize, err := strconv.Atoi(vars[storageRESTShardSize])
if err != nil { if err != nil {
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
@ -547,7 +547,7 @@ func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) {
algo := BitrotAlgorithmFromString(algoStr) algo := BitrotAlgorithmFromString(algoStr)
w.Header().Set(xhttp.ContentType, "text/event-stream") w.Header().Set(xhttp.ContentType, "text/event-stream")
doneCh := sendWhiteSpaceVerifyFile(w) doneCh := sendWhiteSpaceVerifyFile(w)
err = s.storage.VerifyFile(volume, filePath, empty, algo, hash, int64(shardSize)) err = s.storage.VerifyFile(volume, filePath, size, algo, hash, int64(shardSize))
<-doneCh <-doneCh
gob.NewEncoder(w).Encode(VerifyFileResp{err}) gob.NewEncoder(w).Encode(VerifyFileResp{err})
} }
@ -600,7 +600,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) {
subrouter.Methods(http.MethodPost).Path(SlashSeparator + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)). subrouter.Methods(http.MethodPost).Path(SlashSeparator + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)).
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...) Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...)
subrouter.Methods(http.MethodPost).Path(SlashSeparator + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFile)). subrouter.Methods(http.MethodPost).Path(SlashSeparator + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFile)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTBitrotAlgo, storageRESTLength, storageRESTEmpty)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTBitrotAlgo, storageRESTLength, storageRESTShardSize)...)
subrouter.Methods(http.MethodPost).Path(SlashSeparator + storageRESTMethodGetInstanceID).HandlerFunc(httpTraceAll(server.GetInstanceID)) subrouter.Methods(http.MethodPost).Path(SlashSeparator + storageRESTMethodGetInstanceID).HandlerFunc(httpTraceAll(server.GetInstanceID))
} }

View File

@ -183,7 +183,7 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
// it needs healing too. // it needs healing too.
for _, part := range partsMetadata[i].Parts { for _, part := range partsMetadata[i].Parts {
checksumInfo := erasureInfo.GetChecksumInfo(part.Name) checksumInfo := erasureInfo.GetChecksumInfo(part.Name)
err = onlineDisk.VerifyFile(bucket, pathJoin(object, part.Name), part.Size == 0, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) err = onlineDisk.VerifyFile(bucket, pathJoin(object, part.Name), erasure.ShardFileSize(part.Size), checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
if err != nil { if err != nil {
isCorrupt := strings.HasPrefix(err.Error(), "Bitrot verification mismatch - expected ") isCorrupt := strings.HasPrefix(err.Error(), "Bitrot verification mismatch - expected ")
if !isCorrupt && err != errFileNotFound && err != errVolumeNotFound && err != errFileUnexpectedSize { if !isCorrupt && err != errFileNotFound && err != errVolumeNotFound && err != errFileUnexpectedSize {