change fs.json format to include checksum fields (#5685)

This commit is contained in:
poornas 2018-03-27 17:23:10 -07:00 committed by kannappanr
parent 3ebe61abdf
commit 76d1e8bbcd
3 changed files with 115 additions and 28 deletions

View File

@ -17,6 +17,8 @@
package cmd
import (
"encoding/hex"
"encoding/json"
"io"
"io/ioutil"
"os"
@ -41,37 +43,89 @@ const (
fsMetaVersion100 = "1.0.0"
// FS backend meta 1.0.1 version.
fsMetaVersion = "1.0.1"
fsMetaVersion101 = "1.0.1"
// FS backend meta format.
fsMetaFormat = "fs"
// FS backend meta 1.0.2
// Removed the fields "Format" and "Minio" from fsMetaV1 as they were unused. Added "Checksum" field - to be used in future for bit-rot protection.
fsMetaVersion = "1.0.2"
// Add more constants here.
)
// FSChecksumInfoV1 - carries checksums of individual blocks on disk.
type FSChecksumInfoV1 struct {
Algorithm string
Blocksize int64
Hashes [][]byte
}
// MarshalJSON marshals the FSChecksumInfoV1 struct
func (c FSChecksumInfoV1) MarshalJSON() ([]byte, error) {
type checksuminfo struct {
Algorithm string `json:"algorithm"`
Blocksize int64 `json:"blocksize"`
Hashes []string `json:"hashes"`
}
var hashes []string
for _, h := range c.Hashes {
hashes = append(hashes, hex.EncodeToString(h))
}
info := checksuminfo{
Algorithm: c.Algorithm,
Hashes: hashes,
Blocksize: c.Blocksize,
}
return json.Marshal(info)
}
// UnmarshalJSON unmarshals the the given data into the FSChecksumInfoV1 struct
func (c *FSChecksumInfoV1) UnmarshalJSON(data []byte) error {
type checksuminfo struct {
Algorithm string `json:"algorithm"`
Blocksize int64 `json:"blocksize"`
Hashes []string `json:"hashes"`
}
var info checksuminfo
err := json.Unmarshal(data, &info)
if err != nil {
return err
}
c.Algorithm = info.Algorithm
c.Blocksize = info.Blocksize
var hashes [][]byte
for _, hashStr := range info.Hashes {
h, err := hex.DecodeString(hashStr)
if err != nil {
return err
}
hashes = append(hashes, h)
}
c.Hashes = hashes
return nil
}
// A fsMetaV1 represents a metadata header mapping keys to sets of values.
type fsMetaV1 struct {
Version string `json:"version"`
Format string `json:"format"`
Minio struct {
Release string `json:"release"`
} `json:"minio"`
// Metadata map for current object `fs.json`.
// checksums of blocks on disk.
Checksum FSChecksumInfoV1 `json:"checksum,omitempty"`
// Metadata map for current object.
Meta map[string]string `json:"meta,omitempty"`
// parts info for current object - used in encryption.
Parts []objectPartInfo `json:"parts,omitempty"`
}
// IsValid - tells if the format is sane by validating the version
// string and format style.
func (m fsMetaV1) IsValid() bool {
return isFSMetaValid(m.Version, m.Format)
return isFSMetaValid(m.Version)
}
// Verifies if the backend format metadata is sane by validating
// the version string and format style.
func isFSMetaValid(version, format string) bool {
return ((version == fsMetaVersion || version == fsMetaVersion100) &&
format == fsMetaFormat)
// Verifies if the backend format metadata is same by validating
// the version string.
func isFSMetaValid(version string) bool {
return (version == fsMetaVersion || version == fsMetaVersion100 || version == fsMetaVersion101)
}
// Converts metadata to object info.
@ -202,12 +256,9 @@ func (m *fsMetaV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) {
// obtain version.
m.Version = parseFSVersion(fsMetaBuf)
// obtain format.
m.Format = parseFSFormat(fsMetaBuf)
// Verify if the format is valid, return corrupted format
// for unrecognized formats.
if !isFSMetaValid(m.Version, m.Format) {
if !isFSMetaValid(m.Version) {
return 0, errors.Trace(errCorruptedFormat)
}
@ -217,9 +268,6 @@ func (m *fsMetaV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) {
// obtain metadata.
m.Meta = parseFSMetaMap(fsMetaBuf)
// obtain minio release date.
m.Minio.Release = parseFSRelease(fsMetaBuf)
// Success.
return int64(len(fsMetaBuf)), nil
}
@ -228,7 +276,5 @@ func (m *fsMetaV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) {
func newFSMetaV1() (fsMeta fsMetaV1) {
fsMeta = fsMetaV1{}
fsMeta.Version = fsMetaVersion
fsMeta.Format = fsMetaFormat
fsMeta.Minio.Release = ReleaseTag
return fsMeta
}

View File

@ -21,6 +21,7 @@ import (
"context"
"os"
"path/filepath"
"reflect"
"testing"
)
@ -111,7 +112,47 @@ func TestWriteFSMetadata(t *testing.T) {
if fsMeta.Version != fsMetaVersion {
t.Fatalf("Unexpected version %s", fsMeta.Version)
}
if fsMeta.Format != fsMetaFormat {
t.Fatalf("Unexpected format %s", fsMeta.Format)
}
func TestFSChecksumV1MarshalJSON(t *testing.T) {
var cs FSChecksumInfoV1
testCases := []struct {
checksum FSChecksumInfoV1
expectedResult string
}{
{cs, `{"algorithm":"","blocksize":0,"hashes":null}`},
{FSChecksumInfoV1{Algorithm: "highwayhash", Blocksize: 500}, `{"algorithm":"highwayhash","blocksize":500,"hashes":null}`},
{FSChecksumInfoV1{Algorithm: "highwayhash", Blocksize: 10, Hashes: [][]byte{[]byte("hello")}}, `{"algorithm":"highwayhash","blocksize":10,"hashes":["68656c6c6f"]}`},
}
for _, testCase := range testCases {
data, _ := testCase.checksum.MarshalJSON()
if testCase.expectedResult != string(data) {
t.Fatalf("expected: %v, got: %v", testCase.expectedResult, string(data))
}
}
}
func TestFSChecksumV1UnMarshalJSON(t *testing.T) {
var cs FSChecksumInfoV1
testCases := []struct {
data []byte
expectedResult FSChecksumInfoV1
}{
{[]byte(`{"algorithm":"","blocksize":0,"hashes":null}`), cs},
{[]byte(`{"algorithm":"highwayhash","blocksize":500,"hashes":null}`), FSChecksumInfoV1{Algorithm: "highwayhash", Blocksize: 500}},
{[]byte(`{"algorithm":"highwayhash","blocksize":10,"hashes":["68656c6c6f"]}`), FSChecksumInfoV1{Algorithm: "highwayhash", Blocksize: 10, Hashes: [][]byte{[]byte("hello")}}},
}
for _, testCase := range testCases {
err := (&cs).UnmarshalJSON(testCase.data)
if err != nil {
t.Fatal("Unexpected error during checksum unmarshalling ", err)
}
if !reflect.DeepEqual(testCase.expectedResult, cs) {
t.Fatalf("expected: %v, got: %v", testCase.expectedResult, cs)
}
}
}

View File

@ -890,7 +890,7 @@ func (fs *FSObjects) getObjectETag(bucket, entry string, lock bool) (string, err
}
// Check if FS metadata is valid, if not return error.
if !isFSMetaValid(parseFSVersion(fsMetaBuf), parseFSFormat(fsMetaBuf)) {
if !isFSMetaValid(parseFSVersion(fsMetaBuf)) {
return "", toObjectErr(errors.Trace(errCorruptedFormat), bucket, entry)
}