Improve listing consistency with version merging (#13723)

This commit is contained in:
Klaus Post 2021-12-02 11:29:16 -08:00 committed by GitHub
parent 8309ddd486
commit 3db931dc0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 926 additions and 294 deletions

View File

@ -40,7 +40,7 @@ lint: ## runs golangci-lint suite of linters
check: test
test: verifiers build ## builds minio, runs linters, tests
@echo "Running unit tests"
@GO111MODULE=on CGO_ENABLED=0 go test -tags kqueue ./... 1>/dev/null
@GO111MODULE=on CGO_ENABLED=0 go test -tags kqueue ./...
test-upgrade: build
@echo "Running minio upgrade tests"

View File

@ -41,11 +41,13 @@ type adminErasureTestBed struct {
erasureDirs []string
objLayer ObjectLayer
router *mux.Router
done context.CancelFunc
}
// prepareAdminErasureTestBed - helper function that setups a single-node
// Erasure backend for admin-handler tests.
func prepareAdminErasureTestBed(ctx context.Context) (*adminErasureTestBed, error) {
ctx, cancel := context.WithCancel(ctx)
// reset global variables to start afresh.
resetTestGlobals()
@ -57,11 +59,13 @@ func prepareAdminErasureTestBed(ctx context.Context) (*adminErasureTestBed, erro
// Initializing objectLayer for HealFormatHandler.
objLayer, erasureDirs, xlErr := initTestErasureObjLayer(ctx)
if xlErr != nil {
cancel()
return nil, xlErr
}
// Initialize minio server config.
if err := newTestConfig(globalMinioDefaultRegion, objLayer); err != nil {
cancel()
return nil, err
}
@ -84,12 +88,14 @@ func prepareAdminErasureTestBed(ctx context.Context) (*adminErasureTestBed, erro
erasureDirs: erasureDirs,
objLayer: objLayer,
router: adminRouter,
done: cancel,
}, nil
}
// TearDown - method that resets the test bed for subsequent unit
// tests to start afresh.
func (atb *adminErasureTestBed) TearDown() {
atb.done()
removeRoots(atb.erasureDirs)
resetTestGlobals()
}

View File

@ -656,6 +656,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
dirQuorum: getReadQuorum(len(f.disks)),
objQuorum: getReadQuorum(len(f.disks)),
bucket: "",
strict: false,
}
healObjectsPrefix := color.Green("healObjects:")

View File

@ -473,27 +473,32 @@ func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) erro
// start a collector that picks up entries from objectUpdatedCh
// and adds them to the current bloom filter.
func (d *dataUpdateTracker) startCollector(ctx context.Context) {
for in := range d.input {
bucket, _ := path2BucketObjectWithBasePath("", in)
if bucket == "" {
if d.debug && len(in) > 0 {
console.Debugf(color.Green("dataUpdateTracker:")+" no bucket (%s)\n", in)
for {
select {
case <-ctx.Done():
return
case in := <-d.input:
bucket, _ := path2BucketObjectWithBasePath("", in)
if bucket == "" {
if d.debug && len(in) > 0 {
console.Debugf(color.Green("dataUpdateTracker:")+" no bucket (%s)\n", in)
}
continue
}
continue
}
if isReservedOrInvalidBucket(bucket, false) {
continue
}
split := splitPathDeterministic(in)
if isReservedOrInvalidBucket(bucket, false) {
continue
}
split := splitPathDeterministic(in)
// Add all paths until done.
d.mu.Lock()
for i := range split {
d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
// Add all paths until done.
d.mu.Lock()
for i := range split {
d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
}
d.dirty = d.dirty || len(split) > 0
d.mu.Unlock()
}
d.dirty = d.dirty || len(split) > 0
d.mu.Unlock()
}
}

View File

@ -24,7 +24,6 @@ import (
"os"
"path"
"reflect"
"runtime"
"testing"
"time"
@ -147,10 +146,6 @@ func TestHealing(t *testing.T) {
}
func TestHealingDanglingObject(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -180,6 +175,9 @@ func TestHealingDanglingObject(t *testing.T) {
t.Fatalf("Failed to make a bucket - %v", err)
}
disks = objLayer.(*erasureServerPools).serverPools[0].erasureDisks[0]
orgDisks := append([]StorageAPI{}, disks...)
// Enable versioning.
globalBucketMetadataSys.Update(bucket, bucketVersioningConfig, []byte(`<VersioningConfiguration><Status>Enabled</Status></VersioningConfiguration>`))
@ -190,11 +188,13 @@ func TestHealingDanglingObject(t *testing.T) {
t.Fatal(err)
}
for _, fsDir := range fsDirs[:4] {
if err = os.Chmod(fsDir, 0400); err != nil {
t.Fatal(err)
}
setDisks := func(newDisks ...StorageAPI) {
objLayer.(*erasureServerPools).serverPools[0].erasureDisksMu.Lock()
copy(disks, newDisks)
objLayer.(*erasureServerPools).serverPools[0].erasureDisksMu.Unlock()
}
// Remove 4 disks.
setDisks(nil, nil, nil, nil)
// Create delete marker under quorum.
objInfo, err := objLayer.DeleteObject(ctx, bucket, object, ObjectOptions{Versioned: true})
@ -202,11 +202,8 @@ func TestHealingDanglingObject(t *testing.T) {
t.Fatal(err)
}
for _, fsDir := range fsDirs[:4] {
if err = os.Chmod(fsDir, 0755); err != nil {
t.Fatal(err)
}
}
// Restore...
setDisks(orgDisks[:4]...)
fileInfoPreHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", false)
if err != nil {
@ -243,11 +240,7 @@ func TestHealingDanglingObject(t *testing.T) {
}
}
for _, fsDir := range fsDirs[:4] {
if err = os.Chmod(fsDir, 0400); err != nil {
t.Fatal(err)
}
}
setDisks(nil, nil, nil, nil)
rd := mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", "")
_, err = objLayer.PutObject(ctx, bucket, object, rd, ObjectOptions{
@ -257,11 +250,7 @@ func TestHealingDanglingObject(t *testing.T) {
t.Fatal(err)
}
for _, fsDir := range fsDirs[:4] {
if err = os.Chmod(fsDir, 0755); err != nil {
t.Fatal(err)
}
}
setDisks(orgDisks[:4]...)
fileInfoPreHeal, err = disks[0].ReadVersion(context.Background(), bucket, object, "", false)
if err != nil {
@ -297,11 +286,7 @@ func TestHealingDanglingObject(t *testing.T) {
t.Fatal(err)
}
for _, fsDir := range fsDirs[:4] {
if err = os.Chmod(fsDir, 0400); err != nil {
t.Fatal(err)
}
}
setDisks(nil, nil, nil, nil)
// Create delete marker under quorum.
_, err = objLayer.DeleteObject(ctx, bucket, object, ObjectOptions{
@ -312,11 +297,7 @@ func TestHealingDanglingObject(t *testing.T) {
t.Fatal(err)
}
for _, fsDir := range fsDirs[:4] {
if err = os.Chmod(fsDir, 0755); err != nil {
t.Fatal(err)
}
}
setDisks(orgDisks[:4]...)
fileInfoPreHeal, err = disks[0].ReadVersion(context.Background(), bucket, object, "", false)
if err != nil {

View File

@ -1751,6 +1751,7 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
dirQuorum: 1,
objQuorum: 1,
bucket: bucket,
strict: false, // Allow less strict matching.
}
path := baseDirFromPrefix(prefix)

View File

@ -37,7 +37,7 @@ type metaCacheEntry struct {
metadata []byte
// cached contains the metadata if decoded.
cached *FileInfo
cached *xlMetaV2
// Indicates the entry can be reused and only one reference to metadata is expected.
reusable bool
@ -58,68 +58,80 @@ func (e metaCacheEntry) hasPrefix(s string) bool {
return strings.HasPrefix(e.name, s)
}
// matches returns if the entries match by comparing their latest version fileinfo.
func (e *metaCacheEntry) matches(other *metaCacheEntry, bucket string) bool {
// matches returns if the entries have the same versions.
// If strict is false we allow signatures to mismatch.
func (e *metaCacheEntry) matches(other *metaCacheEntry, strict bool) (prefer *metaCacheEntry, matches bool) {
if e == nil && other == nil {
return true
return nil, true
}
if e == nil || other == nil {
return false
if e == nil {
return other, false
}
if other == nil {
return e, false
}
// This should reject 99%
if len(e.metadata) != len(other.metadata) || e.name != other.name {
return false
// Name should match...
if e.name != other.name {
if e.name < other.name {
return e, false
}
return other, false
}
eFi, eErr := e.fileInfo(bucket)
oFi, oErr := other.fileInfo(bucket)
eVers, eErr := e.xlmeta()
oVers, oErr := other.xlmeta()
if eErr != nil || oErr != nil {
return eErr == oErr
return nil, false
}
// check both fileInfo's have same number of versions, if not skip
// the `other` entry.
if eFi.NumVersions != oFi.NumVersions {
return false
}
return eFi.ModTime.Equal(oFi.ModTime) && eFi.Size == oFi.Size && eFi.VersionID == oFi.VersionID
}
// resolveEntries returns if the entries match by comparing their latest version fileinfo.
func resolveEntries(a, b *metaCacheEntry, bucket string) *metaCacheEntry {
if b == nil {
return a
}
if a == nil {
return b
}
aFi, err := a.fileInfo(bucket)
if err != nil {
return b
}
bFi, err := b.fileInfo(bucket)
if err != nil {
return a
}
if aFi.NumVersions == bFi.NumVersions {
if aFi.ModTime.Equal(bFi.ModTime) {
return a
if len(eVers.versions) != len(oVers.versions) {
eTime := eVers.latestModtime()
oTime := oVers.latestModtime()
if !eTime.Equal(oTime) {
if eTime.After(oTime) {
return e, false
}
return other, false
}
if aFi.ModTime.After(bFi.ModTime) {
return a
// Tiebreak on version count.
if len(eVers.versions) > len(oVers.versions) {
return e, false
}
return b
return other, false
}
if bFi.NumVersions > aFi.NumVersions {
return b
}
// Check if each version matches...
for i, eVer := range eVers.versions {
oVer := oVers.versions[i]
if eVer.header != oVer.header {
if !strict && eVer.header.matchesNotStrict(oVer.header) {
if prefer == nil {
if eVer.header.sortsBefore(oVer.header) {
prefer = e
} else {
prefer = other
}
}
continue
}
if prefer != nil {
return prefer, false
}
return a
if eVer.header.sortsBefore(oVer.header) {
return e, false
}
return other, false
}
}
// If we match, return e
if prefer == nil {
prefer = e
}
return prefer, true
}
// isInDir returns whether the entry is in the dir when considering the separator.
@ -143,7 +155,10 @@ func (e metaCacheEntry) isInDir(dir, separator string) bool {
// If v2 and UNABLE to load metadata true will be returned.
func (e *metaCacheEntry) isLatestDeletemarker() bool {
if e.cached != nil {
return e.cached.Deleted
if len(e.cached.versions) == 0 {
return true
}
return e.cached.versions[0].header.Type == DeleteType
}
if !isXL2V1Format(e.metadata) {
return false
@ -152,8 +167,8 @@ func (e *metaCacheEntry) isLatestDeletemarker() bool {
return meta.IsLatestDeleteMarker()
}
// Fall back...
var xlMeta xlMetaV2
if err := xlMeta.Load(e.metadata); err != nil || len(xlMeta.versions) == 0 {
xlMeta, err := e.xlmeta()
if err != nil || len(xlMeta.versions) == 0 {
return true
}
return xlMeta.versions[0].header.Type == DeleteType
@ -162,24 +177,37 @@ func (e *metaCacheEntry) isLatestDeletemarker() bool {
// fileInfo returns the decoded metadata.
// If entry is a directory it is returned as that.
// If versioned the latest version will be returned.
func (e *metaCacheEntry) fileInfo(bucket string) (*FileInfo, error) {
func (e *metaCacheEntry) fileInfo(bucket string) (FileInfo, error) {
if e.isDir() {
return &FileInfo{
return FileInfo{
Volume: bucket,
Name: e.name,
Mode: uint32(os.ModeDir),
}, nil
}
if e.cached != nil {
return e.cached.ToFileInfo(bucket, e.name, "")
}
return getFileInfo(e.metadata, bucket, e.name, "", false)
}
// xlmeta returns the decoded metadata.
// This should not be called on directories.
func (e *metaCacheEntry) xlmeta() (*xlMetaV2, error) {
if e.isDir() {
return nil, errFileNotFound
}
if e.cached == nil {
if len(e.metadata) == 0 {
// only happens if the entry is not found.
return nil, errFileNotFound
}
fi, err := getFileInfo(e.metadata, bucket, e.name, "", false)
var xl xlMetaV2
err := xl.LoadOrConvert(e.metadata)
if err != nil {
return nil, err
}
e.cached = &fi
e.cached = &xl
}
return e.cached, nil
}
@ -200,6 +228,7 @@ func (e *metaCacheEntry) fileInfoVersions(bucket string) (FileInfoVersions, erro
},
}, nil
}
// Too small gains to reuse cache here.
return getFileInfoVersions(e.metadata, bucket, e.name)
}
@ -240,16 +269,15 @@ type metadataResolutionParams struct {
dirQuorum int // Number if disks needed for a directory to 'exist'.
objQuorum int // Number of disks needed for an object to 'exist'.
bucket string // Name of the bucket. Used for generating cached fileinfo.
strict bool // Versions must match exactly, including all metadata.
// Reusable slice for resolution
candidates []struct {
n int
e *metaCacheEntry
}
candidates [][]xlMetaV2ShallowVersion
}
// resolve multiple entries.
// entries are resolved by majority, then if tied by mod-time and versions.
// Names must match on all entries in m.
func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCacheEntry, ok bool) {
if len(m) == 0 {
return nil, false
@ -257,14 +285,14 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa
dirExists := 0
if cap(r.candidates) < len(m) {
r.candidates = make([]struct {
n int
e *metaCacheEntry
}, 0, len(m))
r.candidates = make([][]xlMetaV2ShallowVersion, 0, len(m))
}
r.candidates = r.candidates[:0]
objsAgree := 0
objsValid := 0
for i := range m {
entry := &m[i]
// Empty entry
if entry.name == "" {
continue
}
@ -275,72 +303,67 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa
continue
}
// Get new entry metadata
if _, err := entry.fileInfo(r.bucket); err != nil {
// Get new entry metadata,
// shallow decode.
xl, err := entry.xlmeta()
if err != nil {
logger.LogIf(context.Background(), err)
continue
}
objsValid++
found := false
for i, c := range r.candidates {
if c.e.matches(entry, r.bucket) {
c.n++
r.candidates[i] = c
found = true
break
}
}
if !found {
r.candidates = append(r.candidates, struct {
n int
e *metaCacheEntry
}{n: 1, e: entry})
}
}
if selected != nil && selected.isDir() && dirExists > r.dirQuorum {
return selected, true
}
// Add all valid to candidates.
r.candidates = append(r.candidates, xl.versions)
switch len(r.candidates) {
case 0:
// We select the first object we find as a candidate and see if all match that.
// This is to quickly identify if all agree.
if selected == nil {
return nil, false
selected = entry
objsAgree = 1
continue
}
if !selected.isDir() || dirExists < r.dirQuorum {
return nil, false
// Names match, check meta...
if prefer, ok := entry.matches(selected, r.strict); ok {
selected = prefer
objsAgree++
continue
}
return selected, true
case 1:
cand := r.candidates[0]
if cand.n < r.objQuorum {
return nil, false
}
return cand.e, true
default:
// Sort by matches....
sort.Slice(r.candidates, func(i, j int) bool {
return r.candidates[i].n > r.candidates[j].n
})
// Check if we have enough.
if r.candidates[0].n < r.objQuorum {
return nil, false
}
// if r.objQuorum == 1 then it is guaranteed that
// this resolver is for HealObjects(), so use resolveEntries()
// instead to resolve candidates, this check is only useful
// for regular cases of ListObjects()
if r.candidates[0].n > r.candidates[1].n && r.objQuorum > 1 {
ok := r.candidates[0].e != nil && r.candidates[0].e.name != ""
return r.candidates[0].e, ok
}
e := resolveEntries(r.candidates[0].e, r.candidates[1].e, r.bucket)
// Tie between two, resolve using modtime+versions.
ok := e != nil && e.name != ""
return e, ok
}
// Return dir entries, if enough...
if selected != nil && selected.isDir() && dirExists >= r.dirQuorum {
return selected, true
}
// If we would never be able to reach read quorum.
if objsValid < r.objQuorum {
return nil, false
}
// If all objects agree.
if selected != nil && objsAgree == objsValid {
return selected, true
}
// Merge if we have disagreement.
// Create a new merged result.
selected = &metaCacheEntry{
name: selected.name,
reusable: true,
cached: &xlMetaV2{metaV: selected.cached.metaV},
}
selected.cached.versions = mergeXLV2Versions(r.objQuorum, r.strict, r.candidates...)
if len(selected.cached.versions) == 0 {
return nil, false
}
// Reserialize
var err error
selected.metadata, err = selected.cached.AppendTo(metaDataPoolGet())
if err != nil {
logger.LogIf(context.Background(), err)
return nil, false
}
return selected, true
}
// firstFound returns the first found and the number of set entries.

View File

@ -18,9 +18,12 @@
package cmd
import (
"fmt"
"math/rand"
"reflect"
"sort"
"testing"
"time"
)
func Test_metaCacheEntries_sort(t *testing.T) {
@ -219,3 +222,438 @@ func Test_metaCacheEntry_isInDir(t *testing.T) {
})
}
}
func Test_metaCacheEntries_resolve(t *testing.T) {
baseTime, err := time.Parse("2006/01/02", "2015/02/25")
if err != nil {
t.Fatal(err)
}
var inputs = []xlMetaV2{
0: {
versions: []xlMetaV2ShallowVersion{
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
ModTime: baseTime.Add(30 * time.Minute).UnixNano(),
Signature: [4]byte{1, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
},
},
// Mismatches Modtime+Signature and older...
1: {
versions: []xlMetaV2ShallowVersion{
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
ModTime: baseTime.Add(15 * time.Minute).UnixNano(),
Signature: [4]byte{2, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
},
},
// Has another version prior to the one we want.
2: {
versions: []xlMetaV2ShallowVersion{
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
ModTime: baseTime.Add(60 * time.Minute).UnixNano(),
Signature: [4]byte{2, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
ModTime: baseTime.Add(30 * time.Minute).UnixNano(),
Signature: [4]byte{1, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
},
},
// Has a completely different version id
3: {
versions: []xlMetaV2ShallowVersion{
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{3, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
ModTime: baseTime.Add(60 * time.Minute).UnixNano(),
Signature: [4]byte{1, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
},
},
4: {
versions: []xlMetaV2ShallowVersion{},
},
// Has a zero version id
5: {
versions: []xlMetaV2ShallowVersion{
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{},
ModTime: baseTime.Add(60 * time.Minute).UnixNano(),
Signature: [4]byte{5, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
},
},
// Zero version, modtime newer..
6: {
versions: []xlMetaV2ShallowVersion{
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{},
ModTime: baseTime.Add(90 * time.Minute).UnixNano(),
Signature: [4]byte{6, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
},
},
7: {
versions: []xlMetaV2ShallowVersion{
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{},
ModTime: baseTime.Add(90 * time.Minute).UnixNano(),
Signature: [4]byte{6, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
ModTime: baseTime.Add(60 * time.Minute).UnixNano(),
Signature: [4]byte{2, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{3, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
ModTime: baseTime.Add(60 * time.Minute).UnixNano(),
Signature: [4]byte{1, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
ModTime: baseTime.Add(30 * time.Minute).UnixNano(),
Signature: [4]byte{1, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
},
},
// Delete marker.
8: {
versions: []xlMetaV2ShallowVersion{
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7},
ModTime: baseTime.Add(90 * time.Minute).UnixNano(),
Signature: [4]byte{6, 1, 1, 1},
Type: DeleteType,
Flags: 0,
}},
},
},
// Delete marker and version from 1
9: {
versions: []xlMetaV2ShallowVersion{
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7},
ModTime: baseTime.Add(90 * time.Minute).UnixNano(),
Signature: [4]byte{6, 1, 1, 1},
Type: DeleteType,
Flags: 0,
}},
{header: xlMetaV2VersionHeader{
VersionID: [16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1},
ModTime: baseTime.Add(15 * time.Minute).UnixNano(),
Signature: [4]byte{2, 1, 1, 1},
Type: ObjectType,
Flags: 0,
}},
},
},
}
inputSerialized := make([]metaCacheEntry, len(inputs))
for i, xl := range inputs {
xl.sortByModTime()
var err error
var entry = metaCacheEntry{
name: "testobject",
}
entry.metadata, err = xl.AppendTo(nil)
if err != nil {
t.Fatal(err)
}
inputSerialized[i] = entry
}
tests := []struct {
name string
m metaCacheEntries
r metadataResolutionParams
wantSelected *metaCacheEntry
wantOk bool
}{
{
name: "consistent",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[0]},
r: metadataResolutionParams{dirQuorum: 4, objQuorum: 4, strict: false},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "consistent-strict",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[0]},
r: metadataResolutionParams{dirQuorum: 4, objQuorum: 4, strict: false},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "one zero, below quorum",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], metaCacheEntry{}},
r: metadataResolutionParams{dirQuorum: 4, objQuorum: 4, strict: false},
wantSelected: nil,
wantOk: false,
},
{
name: "one zero, below quorum, strict",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], metaCacheEntry{}},
r: metadataResolutionParams{dirQuorum: 4, objQuorum: 4, strict: true},
wantSelected: nil,
wantOk: false,
},
{
name: "one zero, at quorum",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], metaCacheEntry{}},
r: metadataResolutionParams{dirQuorum: 3, objQuorum: 3, strict: false},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "one zero, at quorum, strict",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], metaCacheEntry{}},
r: metadataResolutionParams{dirQuorum: 3, objQuorum: 3, strict: true},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "modtime, signature mismatch",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[1]},
r: metadataResolutionParams{dirQuorum: 4, objQuorum: 4, strict: false},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "modtime,signature mismatch, strict",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[1]},
r: metadataResolutionParams{dirQuorum: 4, objQuorum: 4, strict: true},
wantSelected: nil,
wantOk: false,
},
{
name: "modtime, signature mismatch, at quorum",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[1]},
r: metadataResolutionParams{dirQuorum: 3, objQuorum: 3, strict: false},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "modtime,signature mismatch, at quorum, strict",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[1]},
r: metadataResolutionParams{dirQuorum: 3, objQuorum: 3, strict: true},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "additional version",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[2]},
r: metadataResolutionParams{dirQuorum: 4, objQuorum: 4, strict: false},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
// Since we have the same version in all inputs, that is strictly ok.
name: "additional version, strict",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[2]},
r: metadataResolutionParams{dirQuorum: 4, objQuorum: 4, strict: true},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
// Since we have the same version in all inputs, that is strictly ok.
name: "additional version, quorum one",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[2]},
r: metadataResolutionParams{dirQuorum: 1, objQuorum: 1, strict: true},
// We get the both versions, since we only request quorum 1
wantSelected: &inputSerialized[2],
wantOk: true,
},
{
name: "additional version, quorum two",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[0], inputSerialized[2]},
r: metadataResolutionParams{dirQuorum: 2, objQuorum: 2, strict: true},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "2 additional versions, quorum two",
m: metaCacheEntries{inputSerialized[0], inputSerialized[0], inputSerialized[2], inputSerialized[2]},
r: metadataResolutionParams{dirQuorum: 2, objQuorum: 2, strict: true},
wantSelected: &inputSerialized[2],
wantOk: true,
},
{
// inputSerialized[1] have older versions of the second in inputSerialized[2]
name: "modtimemismatch",
m: metaCacheEntries{inputSerialized[1], inputSerialized[1], inputSerialized[2], inputSerialized[2]},
r: metadataResolutionParams{dirQuorum: 2, objQuorum: 2, strict: false},
wantSelected: &inputSerialized[2],
wantOk: true,
},
{
// inputSerialized[1] have older versions of the second in inputSerialized[2]
name: "modtimemismatch,strict",
m: metaCacheEntries{inputSerialized[1], inputSerialized[1], inputSerialized[2], inputSerialized[2]},
r: metadataResolutionParams{dirQuorum: 2, objQuorum: 2, strict: true},
wantSelected: &inputSerialized[2],
wantOk: true,
},
{
// inputSerialized[1] have older versions of the second in inputSerialized[2], but
// since it is not strict, we should get it that one (with latest modtime)
name: "modtimemismatch,not strict",
m: metaCacheEntries{inputSerialized[1], inputSerialized[1], inputSerialized[2], inputSerialized[2]},
r: metadataResolutionParams{dirQuorum: 4, objQuorum: 4, strict: false},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "one-q1",
m: metaCacheEntries{inputSerialized[0], inputSerialized[4], inputSerialized[4], inputSerialized[4]},
r: metadataResolutionParams{dirQuorum: 1, objQuorum: 1, strict: false},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "one-q1-strict",
m: metaCacheEntries{inputSerialized[0], inputSerialized[4], inputSerialized[4], inputSerialized[4]},
r: metadataResolutionParams{dirQuorum: 1, objQuorum: 1, strict: true},
wantSelected: &inputSerialized[0],
wantOk: true,
},
{
name: "one-q2",
m: metaCacheEntries{inputSerialized[0], inputSerialized[4], inputSerialized[4], inputSerialized[4]},
r: metadataResolutionParams{dirQuorum: 2, objQuorum: 2, strict: false},
wantSelected: nil,
wantOk: false,
},
{
name: "one-q2-strict",
m: metaCacheEntries{inputSerialized[0], inputSerialized[4], inputSerialized[4], inputSerialized[4]},
r: metadataResolutionParams{dirQuorum: 2, objQuorum: 2, strict: true},
wantSelected: nil,
wantOk: false,
},
{
name: "two-diff-q2",
m: metaCacheEntries{inputSerialized[0], inputSerialized[3], inputSerialized[4], inputSerialized[4]},
r: metadataResolutionParams{dirQuorum: 2, objQuorum: 2, strict: false},
wantSelected: nil,
wantOk: false,
},
{
name: "zeroid",
m: metaCacheEntries{inputSerialized[5], inputSerialized[5], inputSerialized[6], inputSerialized[6]},
r: metadataResolutionParams{dirQuorum: 2, objQuorum: 2, strict: false},
wantSelected: &inputSerialized[6],
wantOk: true,
},
{
// When ID is zero, do not allow non-strict matches to reach quorum.
name: "zeroid-belowq",
m: metaCacheEntries{inputSerialized[5], inputSerialized[5], inputSerialized[6], inputSerialized[6]},
r: metadataResolutionParams{dirQuorum: 3, objQuorum: 3, strict: false},
wantSelected: nil,
wantOk: false,
},
{
name: "merge4",
m: metaCacheEntries{inputSerialized[2], inputSerialized[3], inputSerialized[5], inputSerialized[6]},
r: metadataResolutionParams{dirQuorum: 1, objQuorum: 1, strict: false},
wantSelected: &inputSerialized[7],
wantOk: true,
},
{
name: "deletemarker",
m: metaCacheEntries{inputSerialized[8], inputSerialized[4], inputSerialized[4], inputSerialized[4]},
r: metadataResolutionParams{dirQuorum: 1, objQuorum: 1, strict: false},
wantSelected: &inputSerialized[8],
wantOk: true,
},
{
name: "deletemarker-nonq",
m: metaCacheEntries{inputSerialized[8], inputSerialized[8], inputSerialized[4], inputSerialized[4]},
r: metadataResolutionParams{dirQuorum: 3, objQuorum: 3, strict: false},
wantSelected: nil,
wantOk: false,
},
{
name: "deletemarker-nonq",
m: metaCacheEntries{inputSerialized[8], inputSerialized[8], inputSerialized[8], inputSerialized[1]},
r: metadataResolutionParams{dirQuorum: 3, objQuorum: 3, strict: false},
wantSelected: &inputSerialized[8],
wantOk: true,
},
{
name: "deletemarker-mixed",
m: metaCacheEntries{inputSerialized[8], inputSerialized[8], inputSerialized[1], inputSerialized[1]},
r: metadataResolutionParams{dirQuorum: 2, objQuorum: 2, strict: false},
wantSelected: &inputSerialized[9],
wantOk: true,
},
{
name: "deletemarker-q3",
m: metaCacheEntries{inputSerialized[8], inputSerialized[9], inputSerialized[9], inputSerialized[1]},
r: metadataResolutionParams{dirQuorum: 3, objQuorum: 3, strict: false},
wantSelected: &inputSerialized[9],
wantOk: true,
},
{
name: "deletemarker-q3-strict",
m: metaCacheEntries{inputSerialized[8], inputSerialized[9], inputSerialized[9], inputSerialized[1]},
r: metadataResolutionParams{dirQuorum: 3, objQuorum: 3, strict: true},
wantSelected: &inputSerialized[9],
wantOk: true,
},
}
for testID, tt := range tests {
rng := rand.New(rand.NewSource(0))
// Run for a number of times, shuffling the input to ensure that output is consistent.
for i := 0; i < 10; i++ {
t.Run(fmt.Sprintf("test-%d-%s-run-%d", testID, tt.name, i), func(t *testing.T) {
if i > 0 {
rng.Shuffle(len(tt.m), func(i, j int) {
tt.m[i], tt.m[j] = tt.m[j], tt.m[i]
})
}
gotSelected, gotOk := tt.m.resolve(&tt.r)
if gotOk != tt.wantOk {
t.Errorf("resolve() gotOk = %v, want %v", gotOk, tt.wantOk)
}
if gotSelected != nil {
gotSelected.cached = nil
gotSelected.reusable = false
}
if !reflect.DeepEqual(gotSelected, tt.wantSelected) {
wantM, _ := tt.wantSelected.xlmeta()
gotM, _ := gotSelected.xlmeta()
t.Errorf("resolve() gotSelected = \n%#v, want \n%#v", *gotM, *wantM)
}
})
}
}
}

View File

@ -764,7 +764,7 @@ type listPathRawOptions struct {
// agreed is called if all disks agreed.
agreed func(entry metaCacheEntry)
// partial will be returned when there is disagreement between disks.
// partial will be called when there is disagreement between disks.
// if disk did not return any result, but also haven't errored
// the entry will be empty and errs will
partial func(entries metaCacheEntries, nAgreed int, errs []error)
@ -905,7 +905,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
continue
}
// If exact match, we agree.
if current.matches(&entry, opts.bucket) {
if _, ok := current.matches(&entry, true); ok {
topEntries[i] = entry
agree++
continue

View File

@ -31,13 +31,13 @@ func _() {
_ = x[storageMetricUpdateMetadata-20]
_ = x[storageMetricReadVersion-21]
_ = x[storageMetricReadAll-22]
_ = x[storageStatInfoFile-23]
_ = x[storageMetricStatInfoFile-23]
_ = x[storageMetricLast-24]
}
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllstorageStatInfoFileLast"
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllStatInfoFileLast"
var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 224, 243, 247}
var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 224, 236, 240}
func (i storageMetric) String() string {
if i >= storageMetric(len(_storageMetric_index)-1) {

View File

@ -57,7 +57,7 @@ const (
storageMetricUpdateMetadata
storageMetricReadVersion
storageMetricReadAll
storageStatInfoFile
storageMetricStatInfoFile
// .... add more
@ -548,7 +548,7 @@ func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path
}
func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
defer p.updateStorageMetrics(storageStatInfoFile, volume, path)()
defer p.updateStorageMetrics(storageMetricStatInfoFile, volume, path)()
if contextCanceled(ctx) {
return nil, ctx.Err()

View File

@ -18,122 +18,87 @@
package cmd
import (
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/internal/logger"
"github.com/zeebo/xxh3"
)
func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersions, error) {
if isXL2V1Format(xlMetaBuf) {
var versions []FileInfo
var err error
if buf, _ := isIndexedMetaV2(xlMetaBuf); buf != nil {
versions, err = buf.ListVersions(volume, path)
fivs, err := getAllFileInfoVersions(xlMetaBuf, volume, path)
if err != nil {
return fivs, err
}
n := 0
for _, fi := range fivs.Versions {
// Filter our tier object delete marker
if !fi.TierFreeVersion() {
fivs.Versions[n] = fi
n++
} else {
var xlMeta xlMetaV2
if err := xlMeta.Load(xlMetaBuf); err != nil {
return FileInfoVersions{}, err
}
versions, err = xlMeta.ListVersions(volume, path)
fivs.FreeVersions = append(fivs.FreeVersions, fi)
}
if err != nil || len(versions) == 0 {
}
fivs.Versions = fivs.Versions[:n]
// Update numversions
for i := range fivs.Versions {
fivs.Versions[i].NumVersions = n
}
return fivs, nil
}
func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersions, error) {
var versions []FileInfo
var err error
if buf, _ := isIndexedMetaV2(xlMetaBuf); buf != nil {
versions, err = buf.ListVersions(volume, path)
} else {
var xlMeta xlMetaV2
if err := xlMeta.LoadOrConvert(xlMetaBuf); err != nil {
return FileInfoVersions{}, err
}
var freeVersions []FileInfo
n := 0
for _, fi := range versions {
if fi.TierFreeVersion() {
freeVersions = append(freeVersions, fi)
continue
}
versions[n] = fi
n++
}
versions = versions[:n]
for _, ver := range versions {
ver.NumVersions = n
}
return FileInfoVersions{
Volume: volume,
Name: path,
Versions: versions,
FreeVersions: freeVersions,
LatestModTime: versions[0].ModTime,
}, nil
versions, err = xlMeta.ListVersions(volume, path)
}
xlMeta := &xlMetaV1Object{}
var json = jsoniter.ConfigCompatibleWithStandardLibrary
if err := json.Unmarshal(xlMetaBuf, xlMeta); err != nil {
return FileInfoVersions{}, errFileCorrupt
}
fi, err := xlMeta.ToFileInfo(volume, path)
if err != nil {
if err != nil || len(versions) == 0 {
return FileInfoVersions{}, err
}
fi.IsLatest = true // No versions so current version is latest.
fi.NumVersions = 1 // just this version
return FileInfoVersions{
Volume: volume,
Name: path,
Versions: []FileInfo{fi},
LatestModTime: fi.ModTime,
Versions: versions,
LatestModTime: versions[0].ModTime,
}, nil
}
func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (FileInfo, error) {
if isXL2V1Format(xlMetaBuf) {
var fi FileInfo
var err error
var inData xlMetaInlineData
if buf, data := isIndexedMetaV2(xlMetaBuf); buf != nil {
inData = data
fi, err = buf.ToFileInfo(volume, path, versionID)
} else {
var xlMeta xlMetaV2
if err := xlMeta.Load(xlMetaBuf); err != nil {
return FileInfo{}, err
}
inData = xlMeta.data
fi, err = xlMeta.ToFileInfo(volume, path, versionID)
var fi FileInfo
var err error
var inData xlMetaInlineData
if buf, data := isIndexedMetaV2(xlMetaBuf); buf != nil {
inData = data
fi, err = buf.ToFileInfo(volume, path, versionID)
} else {
var xlMeta xlMetaV2
if err := xlMeta.LoadOrConvert(xlMetaBuf); err != nil {
return FileInfo{}, err
}
if !data || err != nil {
return fi, err
}
versionID := fi.VersionID
if versionID == "" {
versionID = nullVersionID
}
fi.Data = inData.find(versionID)
if len(fi.Data) == 0 {
// PR #11758 used DataDir, preserve it
// for users who might have used master
// branch
fi.Data = inData.find(fi.DataDir)
}
return fi, nil
inData = xlMeta.data
fi, err = xlMeta.ToFileInfo(volume, path, versionID)
}
xlMeta := &xlMetaV1Object{}
var json = jsoniter.ConfigCompatibleWithStandardLibrary
if err := json.Unmarshal(xlMetaBuf, xlMeta); err != nil {
logger.LogIf(GlobalContext, fmt.Errorf("unable to unmarshal json object: %v", err))
return FileInfo{}, errFileCorrupt
if !data || err != nil {
return fi, err
}
fi, err := xlMeta.ToFileInfo(volume, path)
if err != nil {
return FileInfo{}, err
versionID = fi.VersionID
if versionID == "" {
versionID = nullVersionID
}
fi.XLV1 = true // indicates older version
fi.IsLatest = true // No versions so current version is latest.
return fi, err
fi.Data = inData.find(versionID)
if len(fi.Data) == 0 {
// PR #11758 used DataDir, preserve it
// for users who might have used master
// branch
fi.Data = inData.find(fi.DataDir)
}
return fi, nil
}
// getXLDiskLoc will return the pool/set/disk id if it can be located in the object layer.

View File

@ -192,16 +192,17 @@ func (m *xlMetaV1Object) ToFileInfo(volume, path string) (FileInfo, error) {
}
fi := FileInfo{
Volume: volume,
Name: path,
ModTime: m.Stat.ModTime,
Size: m.Stat.Size,
Metadata: m.Meta,
Parts: m.Parts,
Erasure: m.Erasure,
VersionID: m.VersionID,
DataDir: m.DataDir,
XLV1: true,
Volume: volume,
Name: path,
ModTime: m.Stat.ModTime,
Size: m.Stat.Size,
Metadata: m.Meta,
Parts: m.Parts,
Erasure: m.Erasure,
VersionID: m.VersionID,
DataDir: m.DataDir,
XLV1: true,
NumVersions: 1,
}
return fi, nil

View File

@ -33,6 +33,7 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/replication"
xhttp "github.com/minio/minio/internal/http"
@ -49,7 +50,7 @@ var (
)
//go:generate msgp -file=$GOFILE -unexported
//go:generate stringer -type VersionType -output=xl-storage-format-v2_string.go $GOFILE
//go:generate stringer -type VersionType,ErasureAlgo -output=xl-storage-format-v2_string.go $GOFILE
const (
// Breaking changes.
@ -130,14 +131,6 @@ func (e ErasureAlgo) valid() bool {
return e > invalidErasureAlgo && e < lastErasureAlgo
}
func (e ErasureAlgo) String() string {
switch e {
case ReedSolomon:
return "reedsolomon"
}
return ""
}
// ChecksumAlgo defines common type of different checksum algorithms
type ChecksumAlgo uint8
@ -269,6 +262,45 @@ func (x xlMetaV2VersionHeader) String() string {
)
}
// matchesNotStrict returns whether x and o have both have non-zero version,
// their versions match and their type match.
func (x xlMetaV2VersionHeader) matchesNotStrict(o xlMetaV2VersionHeader) bool {
return x.VersionID != [16]byte{} &&
x.VersionID == o.VersionID &&
x.Type == o.Type
}
// sortsBefore can be used as a tiebreaker for stable sorting/selecting.
// Returns false on ties.
func (x xlMetaV2VersionHeader) sortsBefore(o xlMetaV2VersionHeader) bool {
if x == o {
return false
}
// Prefer newest modtime.
if x.ModTime != o.ModTime {
return x.ModTime > o.ModTime
}
// The following doesn't make too much sense, but we want sort to be consistent nonetheless.
// Prefer lower types
if x.Type != o.Type {
return x.Type < o.Type
}
// Consistent sort on signature
if v := bytes.Compare(x.Signature[:], o.Signature[:]); v != 0 {
return v > 0
}
// On ID mismatch
if v := bytes.Compare(x.VersionID[:], o.VersionID[:]); v != 0 {
return v > 0
}
// Flags
if x.Flags != o.Flags {
return x.Flags > o.Flags
}
return false
}
// Valid xl meta xlMetaV2Version is valid
func (j xlMetaV2Version) Valid() bool {
if !j.Type.valid() {
@ -372,6 +404,7 @@ func (j xlMetaV2Version) getVersionID() [16]byte {
return [16]byte{}
}
// ToFileInfo returns FileInfo of the underlying type.
func (j *xlMetaV2Version) ToFileInfo(volume, path string) (FileInfo, error) {
switch j.Type {
case ObjectType:
@ -788,6 +821,26 @@ type xlMetaV2 struct {
metaV uint8
}
// LoadOrConvert will load the metadata in the buffer.
// If this is a legacy format, it will automatically be converted to XLV2.
func (x *xlMetaV2) LoadOrConvert(buf []byte) error {
if isXL2V1Format(buf) {
return x.Load(buf)
}
xlMeta := &xlMetaV1Object{}
var json = jsoniter.ConfigCompatibleWithStandardLibrary
if err := json.Unmarshal(buf, xlMeta); err != nil {
return errFileCorrupt
}
if len(x.versions) > 0 {
x.versions = x.versions[:0]
}
x.data = nil
x.metaV = xlMetaVersion
return x.AddLegacy(xlMeta)
}
// Load all versions of the stored data.
// Note that references to the incoming buffer will be kept.
func (x *xlMetaV2) Load(buf []byte) error {
@ -924,6 +977,14 @@ func (x *xlMetaV2) loadLegacy(buf []byte) error {
return nil
}
// latestModtime returns the modtime of the latest version.
func (x *xlMetaV2) latestModtime() time.Time {
if x == nil || len(x.versions) == 0 {
return time.Time{}
}
return time.Unix(0, x.versions[0].header.ModTime)
}
func (x *xlMetaV2) addVersion(ver xlMetaV2Version) error {
modTime := ver.getModTime().UnixNano()
if !ver.Valid() {
@ -1059,14 +1120,14 @@ func (x *xlMetaV2) setIdx(idx int, ver xlMetaV2Version) (err error) {
func (x *xlMetaV2) sortByModTime() {
// Quick check
if len(x.versions) <= 1 || sort.SliceIsSorted(x.versions, func(i, j int) bool {
return x.versions[i].header.ModTime > x.versions[j].header.ModTime
return x.versions[i].header.sortsBefore(x.versions[j].header)
}) {
return
}
// We should sort.
sort.Slice(x.versions, func(i, j int) bool {
return x.versions[i].header.ModTime > x.versions[j].header.ModTime
return x.versions[i].header.sortsBefore(x.versions[j].header)
})
}
@ -1506,7 +1567,6 @@ func (x *xlMetaV2) AddLegacy(m *xlMetaV1Object) error {
return errFileCorrupt
}
m.VersionID = nullVersionID
m.DataDir = legacyDataDir
return x.addVersion(xlMetaV2Version{ObjectV1: m, Type: LegacyType})
}
@ -1605,6 +1665,137 @@ func (x xlMetaV2) ListVersions(volume, path string) ([]FileInfo, error) {
return versions, nil
}
// mergeXLV2Versions will merge all versions, typically from different disks
// that have at least quorum entries in all metas.
// Quorum must be the minimum number of matching metadata files.
// Quorum should be > 1 and <= len(versions).
// If strict is set to false, entries that match type
func mergeXLV2Versions(quorum int, strict bool, versions ...[]xlMetaV2ShallowVersion) (merged []xlMetaV2ShallowVersion) {
if len(versions) < quorum || len(versions) == 0 {
return nil
}
if len(versions) == 1 {
return versions[0]
}
if quorum == 1 {
// No need for non-strict checks if quorum is 1.
strict = true
}
// Our result
merged = make([]xlMetaV2ShallowVersion, 0, len(versions[0]))
tops := make([]xlMetaV2ShallowVersion, len(versions))
for {
// Step 1 create slice with all top versions.
tops = tops[:0]
var topSig [4]byte
var topID [16]byte
consistent := true // Are all signatures consistent (shortcut)
for _, vers := range versions {
if len(vers) == 0 {
consistent = false
continue
}
ver := vers[0]
if len(tops) == 0 {
consistent = true
topSig = ver.header.Signature
topID = ver.header.VersionID
} else {
consistent = consistent && topSig == ver.header.Signature && topID == ver.header.VersionID
}
tops = append(tops, vers[0])
}
// Check if done...
if len(tops) < quorum {
// We couldn't gather enough for quorum
break
}
var latest xlMetaV2ShallowVersion
var latestCount int
if consistent {
// All had the same signature, easy.
latest = tops[0]
latestCount = len(tops)
merged = append(merged, latest)
} else {
// Find latest.
for i, ver := range tops {
if ver.header == latest.header {
latestCount++
continue
}
if i == 0 || ver.header.sortsBefore(latest.header) {
if i == 0 {
latestCount = 1
} else if !strict && ver.header.matchesNotStrict(latest.header) {
latestCount++
} else {
latestCount = 1
}
latest = ver
continue
}
// Mismatch, but older.
if !strict && ver.header.matchesNotStrict(latest.header) {
// If non-nil version ID and it matches, assume match, but keep newest.
if ver.header.sortsBefore(latest.header) {
latest = ver
}
latestCount++
}
}
if latestCount >= quorum {
merged = append(merged, latest)
}
}
// Remove from all streams up until latest modtime or if selected.
for i, vers := range versions {
for _, ver := range vers {
// Truncate later modtimes, not selected.
if ver.header.ModTime > latest.header.ModTime {
versions[i] = versions[i][1:]
continue
}
// Truncate matches
if ver.header == latest.header {
versions[i] = versions[i][1:]
continue
}
// Truncate non-empty version and type matches
if latest.header.VersionID == ver.header.VersionID {
versions[i] = versions[i][1:]
continue
}
// Skip versions with version id we already emitted.
for _, mergedV := range merged {
if ver.header.VersionID == mergedV.header.VersionID {
versions[i] = versions[i][1:]
continue
}
}
// Keep top entry (and remaining)...
break
}
}
}
// Sanity check. Enable if duplicates show up.
if false {
var found = make(map[[16]byte]struct{})
for _, ver := range merged {
if _, ok := found[ver.header.VersionID]; ok {
panic("found dupe")
}
found[ver.header.VersionID] = struct{}{}
}
}
return merged
}
type xlMetaBuf []byte
// ToFileInfo converts xlMetaV2 into a common FileInfo datastructure

View File

@ -1,4 +1,4 @@
// Code generated by "stringer -type VersionType -output=xl-storage-format-v2_string.go xl-storage-format-v2.go"; DO NOT EDIT.
// Code generated by "stringer -type VersionType,ErasureAlgo -output=xl-storage-format-v2_string.go xl-storage-format-v2.go"; DO NOT EDIT.
package cmd
@ -25,3 +25,22 @@ func (i VersionType) String() string {
}
return _VersionType_name[_VersionType_index[i]:_VersionType_index[i+1]]
}
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[invalidErasureAlgo-0]
_ = x[ReedSolomon-1]
_ = x[lastErasureAlgo-2]
}
const _ErasureAlgo_name = "invalidErasureAlgoReedSolomonlastErasureAlgo"
var _ErasureAlgo_index = [...]uint8{0, 18, 29, 44}
func (i ErasureAlgo) String() string {
if i >= ErasureAlgo(len(_ErasureAlgo_index)-1) {
return "ErasureAlgo(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _ErasureAlgo_name[_ErasureAlgo_index[i]:_ErasureAlgo_index[i+1]]
}

View File

@ -2058,6 +2058,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
logger.LogIf(ctx, err)
// Data appears corrupt. Drop data.
} else {
xlMetaLegacy.DataDir = legacyDataDir
if err = xlMeta.AddLegacy(xlMetaLegacy); err != nil {
logger.LogIf(ctx, err)
}