fs: Enable returning ETag along with ListObjects() (#4042)

This is to comply with S3 behavior, we previously removed
reading `fs.json` for optimization reasons but we have a
reason to believe that providing ETag and using gjson
provides needed benefit of not having to deal with
unmarshalling overhead of golang stdlib.

Fixes #4028
This commit is contained in:
Harshavardhana 2017-04-04 09:14:03 -07:00 committed by GitHub
parent 52d8f564bf
commit 4747adfcb4
5 changed files with 127 additions and 34 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/minio/minio/pkg/lock" "github.com/minio/minio/pkg/lock"
"github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/mimedb"
"github.com/tidwall/gjson"
) )
const ( const (
@ -66,7 +67,7 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo
Name: object, Name: object,
} }
// We set file into only if its valid. // We set file info only if its valid.
objInfo.ModTime = timeSentinel objInfo.ModTime = timeSentinel
if fi != nil { if fi != nil {
objInfo.ModTime = fi.ModTime() objInfo.ModTime = fi.ModTime()
@ -144,29 +145,76 @@ func (m *fsMetaV1) WriteTo(lk *lock.LockedFile) (n int64, err error) {
return int64(len(metadataBytes)), nil return int64(len(metadataBytes)), nil
} }
func parseFSVersion(fsMetaBuf []byte) string {
return gjson.GetBytes(fsMetaBuf, "version").String()
}
func parseFSFormat(fsMetaBuf []byte) string {
return gjson.GetBytes(fsMetaBuf, "format").String()
}
func parseFSRelease(fsMetaBuf []byte) string {
return gjson.GetBytes(fsMetaBuf, "minio.release").String()
}
func parseFSMetaMap(fsMetaBuf []byte) map[string]string {
// Get xlMetaV1.Meta map.
metaMapResult := gjson.GetBytes(fsMetaBuf, "meta").Map()
metaMap := make(map[string]string)
for key, valResult := range metaMapResult {
metaMap[key] = valResult.String()
}
return metaMap
}
func parseFSParts(fsMetaBuf []byte) []objectPartInfo {
// Parse the FS Parts.
partsResult := gjson.GetBytes(fsMetaBuf, "parts").Array()
partInfo := make([]objectPartInfo, len(partsResult))
for i, p := range partsResult {
info := objectPartInfo{}
info.Number = int(p.Get("number").Int())
info.Name = p.Get("name").String()
info.ETag = p.Get("etag").String()
info.Size = p.Get("size").Int()
partInfo[i] = info
}
return partInfo
}
func (m *fsMetaV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) { func (m *fsMetaV1) ReadFrom(lk *lock.LockedFile) (n int64, err error) {
var metadataBytes []byte var fsMetaBuf []byte
fi, err := lk.Stat() fi, err := lk.Stat()
if err != nil { if err != nil {
return 0, traceError(err) return 0, traceError(err)
} }
metadataBytes, err = ioutil.ReadAll(io.NewSectionReader(lk, 0, fi.Size())) fsMetaBuf, err = ioutil.ReadAll(io.NewSectionReader(lk, 0, fi.Size()))
if err != nil { if err != nil {
return 0, traceError(err) return 0, traceError(err)
} }
if len(metadataBytes) == 0 { if len(fsMetaBuf) == 0 {
return 0, traceError(io.EOF) return 0, traceError(io.EOF)
} }
// Decode `fs.json` into fsMeta structure. // obtain version.
if err = json.Unmarshal(metadataBytes, m); err != nil { m.Version = parseFSVersion(fsMetaBuf)
return 0, traceError(err)
} // obtain format.
m.Format = parseFSFormat(fsMetaBuf)
// obtain metadata.
m.Meta = parseFSMetaMap(fsMetaBuf)
// obtain parts info list.
m.Parts = parseFSParts(fsMetaBuf)
// obtain minio release date.
m.Minio.Release = parseFSRelease(fsMetaBuf)
// Success. // Success.
return int64(len(metadataBytes)), nil return int64(len(fsMetaBuf)), nil
} }
// FS metadata constants. // FS metadata constants.

View File

@ -18,7 +18,6 @@ package cmd
import ( import (
"bytes" "bytes"
"os"
"path/filepath" "path/filepath"
"testing" "testing"
) )
@ -73,18 +72,6 @@ func TestReadFSMetadata(t *testing.T) {
if _, err = fsMeta.ReadFrom(rlk.LockedFile); err != nil { if _, err = fsMeta.ReadFrom(rlk.LockedFile); err != nil {
t.Fatal("Unexpected error ", err) t.Fatal("Unexpected error ", err)
} }
// Corrupted fs.json
file, err := os.OpenFile(preparePath(fsPath), os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
t.Fatal("Unexpected error ", err)
}
file.Write([]byte{'a'})
file.Close()
fsMeta = fsMetaV1{}
if _, err := fsMeta.ReadFrom(rlk.LockedFile); err == nil {
t.Fatal("Should fail", err)
}
} }
// TestWriteFSMetadata - tests of writeFSMetadata with healthy disk. // TestWriteFSMetadata - tests of writeFSMetadata with healthy disk.

View File

@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"hash" "hash"
"io" "io"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
@ -687,6 +688,41 @@ func (fs fsObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc {
return listDir return listDir
} }
// getObjectETag is a helper function, which returns only the md5sum
// of the file on the disk.
func (fs fsObjects) getObjectETag(bucket, entry string) (string, error) {
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, entry, fsMetaJSONFile)
// Read `fs.json` to perhaps contend with
// parallel Put() operations.
rlk, err := fs.rwPool.Open(fsMetaPath)
// Ignore if `fs.json` is not available, this is true for pre-existing data.
if err != nil && err != errFileNotFound {
return "", toObjectErr(traceError(err), bucket, entry)
}
// If file is not found, we don't need to proceed forward.
if err == errFileNotFound {
return "", nil
}
// Read from fs metadata only if it exists.
defer fs.rwPool.Close(fsMetaPath)
fsMetaBuf, err := ioutil.ReadAll(rlk.LockedFile)
if err != nil {
// `fs.json` can be empty due to previously failed
// PutObject() transaction, if we arrive at such
// a situation we just ignore and continue.
if errorCause(err) != io.EOF {
return "", toObjectErr(err, bucket, entry)
}
}
fsMetaMap := parseFSMetaMap(fsMetaBuf)
return fsMetaMap["md5Sum"], nil
}
// ListObjects - list all objects at prefix upto maxKeys., optionally delimited by '/'. Maintains the list pool // ListObjects - list all objects at prefix upto maxKeys., optionally delimited by '/'. Maintains the list pool
// state for future re-entrant list requests. // state for future re-entrant list requests.
func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
@ -731,14 +767,33 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
objInfo.IsDir = true objInfo.IsDir = true
return return
} }
// Protect reading `fs.json`.
objectLock := globalNSMutex.NewNSLock(bucket, entry)
objectLock.RLock()
var md5Sum string
md5Sum, err = fs.getObjectETag(bucket, entry)
objectLock.RUnlock()
if err != nil {
return ObjectInfo{}, err
}
// Stat the file to get file size. // Stat the file to get file size.
var fi os.FileInfo var fi os.FileInfo
fi, err = fsStatFile(pathJoin(fs.fsPath, bucket, entry)) fi, err = fsStatFile(pathJoin(fs.fsPath, bucket, entry))
if err != nil { if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, entry) return ObjectInfo{}, toObjectErr(err, bucket, entry)
} }
fsMeta := fsMetaV1{}
return fsMeta.ToObjectInfo(bucket, entry, fi), nil // Success.
return ObjectInfo{
Name: entry,
Bucket: bucket,
Size: fi.Size(),
ModTime: fi.ModTime(),
IsDir: fi.IsDir(),
MD5Sum: md5Sum,
}, nil
} }
heal := false // true only for xl.ListObjectsHeal() heal := false // true only for xl.ListObjectsHeal()

View File

@ -81,7 +81,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) {
{ {
IsTruncated: false, IsTruncated: false,
Objects: []ObjectInfo{ Objects: []ObjectInfo{
{Name: "Asia-maps.png", ContentType: "image/png"}, {Name: "Asia-maps.png"},
{Name: "Asia/India/India-summer-photos-1"}, {Name: "Asia/India/India-summer-photos-1"},
{Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"}, {Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"},
{Name: "newPrefix0"}, {Name: "newPrefix0"},
@ -97,7 +97,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) {
{ {
IsTruncated: true, IsTruncated: true,
Objects: []ObjectInfo{ Objects: []ObjectInfo{
{Name: "Asia-maps.png", ContentType: "image/png"}, {Name: "Asia-maps.png"},
{Name: "Asia/India/India-summer-photos-1"}, {Name: "Asia/India/India-summer-photos-1"},
{Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"}, {Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"},
{Name: "newPrefix0"}, {Name: "newPrefix0"},
@ -109,7 +109,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) {
{ {
IsTruncated: true, IsTruncated: true,
Objects: []ObjectInfo{ Objects: []ObjectInfo{
{Name: "Asia-maps.png", ContentType: "image/png"}, {Name: "Asia-maps.png"},
{Name: "Asia/India/India-summer-photos-1"}, {Name: "Asia/India/India-summer-photos-1"},
{Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"}, {Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"},
{Name: "newPrefix0"}, {Name: "newPrefix0"},
@ -120,7 +120,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) {
{ {
IsTruncated: true, IsTruncated: true,
Objects: []ObjectInfo{ Objects: []ObjectInfo{
{Name: "Asia-maps.png", ContentType: "image/png"}, {Name: "Asia-maps.png"},
{Name: "Asia/India/India-summer-photos-1"}, {Name: "Asia/India/India-summer-photos-1"},
{Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"}, {Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"},
}, },
@ -131,7 +131,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) {
{ {
IsTruncated: true, IsTruncated: true,
Objects: []ObjectInfo{ Objects: []ObjectInfo{
{Name: "Asia-maps.png", ContentType: "image/png"}, {Name: "Asia-maps.png"},
}, },
}, },
// ListObjectsResult-5. // ListObjectsResult-5.
@ -234,7 +234,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) {
{ {
IsTruncated: false, IsTruncated: false,
Objects: []ObjectInfo{ Objects: []ObjectInfo{
{Name: "Asia-maps.png", ContentType: "image/png"}, {Name: "Asia-maps.png"},
{Name: "Asia/India/India-summer-photos-1"}, {Name: "Asia/India/India-summer-photos-1"},
{Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"}, {Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"},
{Name: "newPrefix0"}, {Name: "newPrefix0"},
@ -343,7 +343,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) {
{ {
IsTruncated: false, IsTruncated: false,
Objects: []ObjectInfo{ Objects: []ObjectInfo{
{Name: "Asia-maps.png", ContentType: "image/png"}, {Name: "Asia-maps.png"},
{Name: "Asia/India/India-summer-photos-1"}, {Name: "Asia/India/India-summer-photos-1"},
{Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"}, {Name: "Asia/India/Karnataka/Bangalore/Koramangala/pics"},
}, },
@ -354,7 +354,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) {
{ {
IsTruncated: false, IsTruncated: false,
Objects: []ObjectInfo{ Objects: []ObjectInfo{
{Name: "Asia-maps.png", ContentType: "image/png"}, {Name: "Asia-maps.png"},
}, },
}, },
// ListObjectsResult-26. // ListObjectsResult-26.
@ -549,8 +549,8 @@ func testListObjects(obj ObjectLayer, instanceType string, t TestErrHandler) {
if testCase.result.Objects[j].Name != result.Objects[j].Name { if testCase.result.Objects[j].Name != result.Objects[j].Name {
t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Objects[j].Name, result.Objects[j].Name) t.Errorf("Test %d: %s: Expected object name to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Objects[j].Name, result.Objects[j].Name)
} }
if testCase.result.Objects[j].ContentType != result.Objects[j].ContentType { if result.Objects[j].MD5Sum == "" {
t.Errorf("Test %d: %s: Expected object contentType to be \"%s\", but found \"%s\" instead", i+1, instanceType, testCase.result.Objects[j].ContentType, result.Objects[j].ContentType) t.Errorf("Test %d: %s: Expected md5sum to be not empty, but found empty instead", i+1, instanceType)
} }
} }

View File

@ -67,6 +67,9 @@ func init() {
// Set system resources to maximum. // Set system resources to maximum.
setMaxResources() setMaxResources()
// Quiet logging.
log.logger.Hooks = nil
} }
func prepareFS() (ObjectLayer, string, error) { func prepareFS() (ObjectLayer, string, error) {