mirror of https://github.com/minio/minio.git
XL: listOnlineDisks should use modTime instead of version. (#2166)
This change is needed to make reading from objects future proof in-terms of handling online disks. Our current counter is not based on affirmative knowledge and relies on arithmetic sequence which can lead to bugs. Using modTime simplifies the understanding of `xl.json` and future tooling / debugging of the format.
This commit is contained in:
parent
e5cd35aad0
commit
623e0f9243
|
@ -42,7 +42,6 @@
|
|||
],
|
||||
},
|
||||
"stat": {
|
||||
"version": 0,
|
||||
"modTime": "2016-05-24T00:09:40.122390255Z",
|
||||
"size": 14824084
|
||||
},
|
||||
|
|
|
@ -16,33 +16,65 @@
|
|||
|
||||
package main
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Get the highest integer from a given integer slice.
|
||||
func highestInt(intSlice []int64, highestInt int64) (highestInteger int64) {
|
||||
highestInteger = highestInt
|
||||
for _, integer := range intSlice {
|
||||
if highestInteger < integer {
|
||||
highestInteger = integer
|
||||
break
|
||||
// commonTime returns a maximally occurring time from a list of time.
|
||||
func commonTime(modTimes []time.Time) (modTime time.Time) {
|
||||
var maxima int // Counter for remembering max occurrence of elements.
|
||||
timeOccurenceMap := make(map[time.Time]int)
|
||||
// Ignore the uuid sentinel and count the rest.
|
||||
for _, time := range modTimes {
|
||||
if time == timeSentinel {
|
||||
continue
|
||||
}
|
||||
timeOccurenceMap[time]++
|
||||
}
|
||||
// Find the common cardinality from previously collected
|
||||
// occurrences of elements.
|
||||
for time, count := range timeOccurenceMap {
|
||||
if count > maxima {
|
||||
maxima = count
|
||||
modTime = time
|
||||
}
|
||||
}
|
||||
return highestInteger
|
||||
// Return the collected common uuid.
|
||||
return modTime
|
||||
}
|
||||
|
||||
// Extracts objects versions from xlMetaV1 slice and returns version slice.
|
||||
func listObjectVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64) {
|
||||
versions = make([]int64, len(partsMetadata))
|
||||
var timeSentinel = time.Unix(0, 0).UTC()
|
||||
|
||||
// Boot uuids upto disk count, setting the value to UUID sentinel.
|
||||
func bootModtimes(diskCount int) []time.Time {
|
||||
modTimes := make([]time.Time, diskCount)
|
||||
// Boots up all the uuids.
|
||||
for i := range modTimes {
|
||||
modTimes[i] = timeSentinel
|
||||
}
|
||||
return modTimes
|
||||
}
|
||||
|
||||
// Extracts list of times from xlMetaV1 slice and returns, skips
|
||||
// slice elements which have errors. As a special error
|
||||
// errFileNotFound is treated as a initial good condition.
|
||||
func listObjectModtimes(partsMetadata []xlMetaV1, errs []error) (modTimes []time.Time) {
|
||||
modTimes = bootModtimes(len(partsMetadata))
|
||||
// Set a new time value, specifically set when
|
||||
// error == errFileNotFound (this is needed when this is a
|
||||
// fresh PutObject).
|
||||
timeNow := time.Now().UTC()
|
||||
for index, metadata := range partsMetadata {
|
||||
if errs[index] == nil {
|
||||
versions[index] = metadata.Stat.Version
|
||||
// Once the file is found, save the uuid saved on disk.
|
||||
modTimes[index] = metadata.Stat.ModTime
|
||||
} else if errs[index] == errFileNotFound {
|
||||
versions[index] = 1
|
||||
} else {
|
||||
versions[index] = -1
|
||||
// Once the file is not found then the epoch is current time.
|
||||
modTimes[index] = timeNow
|
||||
}
|
||||
}
|
||||
return versions
|
||||
return modTimes
|
||||
}
|
||||
|
||||
// Reads all `xl.json` metadata as a xlMetaV1 slice.
|
||||
|
@ -100,23 +132,22 @@ func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
|
|||
// - slice returing readable disks.
|
||||
// - xlMetaV1
|
||||
// - bool value indicating if healing is needed.
|
||||
// - error if any.
|
||||
func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, version int64, err error) {
|
||||
func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
|
||||
onlineDisks = make([]StorageAPI, len(xl.storageDisks))
|
||||
|
||||
// List all the file versions from partsMetadata list.
|
||||
versions := listObjectVersions(partsMetadata, errs)
|
||||
// List all the file commit ids from parts metadata.
|
||||
modTimes := listObjectModtimes(partsMetadata, errs)
|
||||
|
||||
// Get highest object version.
|
||||
highestVersion := highestInt(versions, int64(1))
|
||||
// Reduce list of UUIDs to a single common value.
|
||||
modTime = commonTime(modTimes)
|
||||
|
||||
// Pick online disks with version set to highestVersion.
|
||||
for index, version := range versions {
|
||||
if version == highestVersion {
|
||||
// Create a new online disks slice, which have common uuid.
|
||||
for index, t := range modTimes {
|
||||
if t == modTime {
|
||||
onlineDisks[index] = xl.storageDisks[index]
|
||||
} else {
|
||||
onlineDisks[index] = nil
|
||||
}
|
||||
}
|
||||
return onlineDisks, highestVersion, nil
|
||||
return onlineDisks, modTime
|
||||
}
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// validates functionality provided to find most common
|
||||
// time occurrence from a list of time.
|
||||
func TestCommonTime(t *testing.T) {
|
||||
// List of test cases for common modTime.
|
||||
testCases := []struct {
|
||||
times []time.Time
|
||||
time time.Time
|
||||
}{
|
||||
{
|
||||
// 1. Tests common times when slice has varying time elements.
|
||||
[]time.Time{
|
||||
time.Unix(0, 1).UTC(),
|
||||
time.Unix(0, 2).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 2).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 1).UTC(),
|
||||
}, time.Unix(0, 3).UTC(),
|
||||
},
|
||||
{
|
||||
// 2. Tests common time obtained when all elements are equal.
|
||||
[]time.Time{
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
}, time.Unix(0, 3).UTC(),
|
||||
},
|
||||
{
|
||||
// 3. Tests common time obtained when elements have a mixture
|
||||
// of sentinel values.
|
||||
[]time.Time{
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 2).UTC(),
|
||||
time.Unix(0, 1).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
time.Unix(0, 4).UTC(),
|
||||
time.Unix(0, 3).UTC(),
|
||||
timeSentinel,
|
||||
timeSentinel,
|
||||
timeSentinel,
|
||||
}, time.Unix(0, 3).UTC(),
|
||||
},
|
||||
}
|
||||
|
||||
// Tests all the testcases, and validates them against expected
|
||||
// common modtime. Tests fail if modtime does not match.
|
||||
for i, testCase := range testCases {
|
||||
// Obtain a common mod time from modTimes slice.
|
||||
ctime := commonTime(testCase.times)
|
||||
if testCase.time != ctime {
|
||||
t.Fatalf("Test case %d, expect to pass but failed. Wanted modTime: %s, got modTime: %s\n", i+1, testCase.time, ctime)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -87,7 +87,6 @@ func pickValidErasureInfo(eInfos []erasureInfo) erasureInfo {
|
|||
type statInfo struct {
|
||||
Size int64 `json:"size"` // Size of the object `xl.json`.
|
||||
ModTime time.Time `json:"modTime"` // ModTime of the object `xl.json`.
|
||||
Version int64 `json:"version"` // Version of the object `xl.json`, useful to calculate quorum.
|
||||
}
|
||||
|
||||
// A xlMetaV1 represents `xl.json` metadata header.
|
||||
|
|
|
@ -266,7 +266,6 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
|
|||
meta["content-type"] = contentType
|
||||
}
|
||||
xlMeta.Stat.ModTime = time.Now().UTC()
|
||||
xlMeta.Stat.Version = 1
|
||||
xlMeta.Meta = meta
|
||||
|
||||
// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
|
||||
|
@ -355,13 +354,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
|||
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
|
||||
|
||||
// List all online disks.
|
||||
onlineDisks, _, err := xl.listOnlineDisks(partsMetadata, errs)
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
}
|
||||
onlineDisks, _ := xl.listOnlineDisks(partsMetadata, errs)
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
xlMeta := pickValidXLMeta(partsMetadata)
|
||||
|
||||
// Need a unique name for the part being written in minioMetaBucket to
|
||||
// accommodate concurrent PutObjectPart requests
|
||||
|
||||
partSuffix := fmt.Sprintf("part.%d", partID)
|
||||
tmpSuffix := getUUID()
|
||||
tmpPartPath := path.Join(tmpMetaPrefix, tmpSuffix)
|
||||
|
@ -434,8 +434,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
|||
}
|
||||
|
||||
// Read metadata (again) associated with the object from all disks.
|
||||
partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket,
|
||||
uploadIDPath)
|
||||
partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket, uploadIDPath)
|
||||
if !isQuorum(errs, xl.writeQuorum) {
|
||||
return "", toObjectErr(errXLWriteQuorum, bucket, object)
|
||||
}
|
||||
|
@ -448,7 +447,6 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
|||
var checksums []checkSumInfo
|
||||
for index, eInfo := range newEInfos {
|
||||
if eInfo.IsValid() {
|
||||
|
||||
// Use a map to find union of checksums of parts that
|
||||
// we concurrently written and committed before this
|
||||
// part. N B For a different, concurrent upload of the
|
||||
|
@ -474,21 +472,13 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
|||
}
|
||||
|
||||
// Pick one from the first valid metadata.
|
||||
xlMeta := pickValidXLMeta(partsMetadata)
|
||||
xlMeta = pickValidXLMeta(partsMetadata)
|
||||
|
||||
// Get current highest version based on re-read partsMetadata.
|
||||
_, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs)
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Increment version only if we have online disks less than configured storage disks.
|
||||
if diskCount(onlineDisks) < len(xl.storageDisks) {
|
||||
higherVersion++
|
||||
}
|
||||
onlineDisks, _ = xl.listOnlineDisks(partsMetadata, errs)
|
||||
|
||||
// Once part is successfully committed, proceed with updating XL metadata.
|
||||
xlMeta.Stat.Version = higherVersion
|
||||
xlMeta.Stat.ModTime = time.Now().UTC()
|
||||
|
||||
// Add the current part.
|
||||
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
|
||||
|
|
|
@ -73,15 +73,12 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
|
|||
}
|
||||
|
||||
// List all online disks.
|
||||
onlineDisks, highestVersion, err := xl.listOnlineDisks(metaArr, errs)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
onlineDisks, modTime := xl.listOnlineDisks(metaArr, errs)
|
||||
|
||||
// Pick latest valid metadata.
|
||||
var xlMeta xlMetaV1
|
||||
for _, meta := range metaArr {
|
||||
if meta.IsValid() && meta.Stat.Version == highestVersion {
|
||||
if meta.IsValid() && meta.Stat.ModTime == modTime {
|
||||
xlMeta = meta
|
||||
break
|
||||
}
|
||||
|
@ -386,15 +383,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
|||
}
|
||||
|
||||
// List all online disks.
|
||||
onlineDisks, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs)
|
||||
if err != nil {
|
||||
return "", toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Increment version only if we have online disks less than configured storage disks.
|
||||
if diskCount(onlineDisks) < len(xl.storageDisks) {
|
||||
higherVersion++
|
||||
}
|
||||
onlineDisks, _ := xl.listOnlineDisks(partsMetadata, errs)
|
||||
|
||||
var mw io.Writer
|
||||
// Initialize md5 writer.
|
||||
|
@ -513,7 +502,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
|||
xlMeta.Meta = metadata
|
||||
xlMeta.Stat.Size = size
|
||||
xlMeta.Stat.ModTime = modTime
|
||||
xlMeta.Stat.Version = higherVersion
|
||||
|
||||
// Add the final part.
|
||||
xlMeta.AddObjectPart(1, "part.1", newMD5Hex, xlMeta.Stat.Size)
|
||||
|
||||
|
|
|
@ -78,14 +78,14 @@ func reduceErrs(errs []error) error {
|
|||
|
||||
// Validates if we have quorum based on the errors with errDiskNotFound.
|
||||
func isQuorum(errs []error, minQuorumCount int) bool {
|
||||
var diskFoundCount int
|
||||
var errCount int
|
||||
for _, err := range errs {
|
||||
if err == errDiskNotFound {
|
||||
if err == errDiskNotFound || err == errFaultyDisk || err == errDiskAccessDenied {
|
||||
continue
|
||||
}
|
||||
diskFoundCount++
|
||||
errCount++
|
||||
}
|
||||
return diskFoundCount >= minQuorumCount
|
||||
return errCount >= minQuorumCount
|
||||
}
|
||||
|
||||
// Similar to 'len(slice)' but returns the actual elements count
|
||||
|
|
Loading…
Reference in New Issue