mirror of
https://github.com/minio/minio.git
synced 2025-11-10 14:09:48 -05:00
skip inconsistent shards if possible (#13945)
data shards were wrong due to a healing bug reported in #13803 mainly with unaligned object sizes. This PR is an attempt to automatically avoid these shards, with available information about the `xl.meta` and actually disk mtime.
This commit is contained in:
@@ -26,29 +26,59 @@ import (
|
||||
)
|
||||
|
||||
// commonTime returns a maximally occurring time from a list of time.
|
||||
func commonTime(modTimes []time.Time) (modTime time.Time) {
|
||||
timeOccurenceMap := make(map[int64]int, len(modTimes))
|
||||
func commonTimeAndOccurence(times []time.Time, group time.Duration) (maxTime time.Time, maxima int) {
|
||||
timeOccurenceMap := make(map[int64]int, len(times))
|
||||
groupNano := group.Nanoseconds()
|
||||
// Ignore the uuid sentinel and count the rest.
|
||||
for _, t := range modTimes {
|
||||
for _, t := range times {
|
||||
if t.Equal(timeSentinel) {
|
||||
continue
|
||||
}
|
||||
timeOccurenceMap[t.UnixNano()]++
|
||||
nano := t.UnixNano()
|
||||
if group > 0 {
|
||||
for k := range timeOccurenceMap {
|
||||
if k == nano {
|
||||
// We add to ourself later
|
||||
continue
|
||||
}
|
||||
diff := k - nano
|
||||
if diff < 0 {
|
||||
diff = -diff
|
||||
}
|
||||
// We are within the limit
|
||||
if diff < groupNano {
|
||||
timeOccurenceMap[k]++
|
||||
}
|
||||
}
|
||||
}
|
||||
// Add ourself...
|
||||
timeOccurenceMap[nano]++
|
||||
}
|
||||
|
||||
var maxima int // Counter for remembering max occurrence of elements.
|
||||
maxima = 0 // Counter for remembering max occurrence of elements.
|
||||
latest := int64(0)
|
||||
|
||||
// Find the common cardinality from previously collected
|
||||
// occurrences of elements.
|
||||
for nano, count := range timeOccurenceMap {
|
||||
t := time.Unix(0, nano).UTC()
|
||||
if count > maxima || (count == maxima && t.After(modTime)) {
|
||||
if count < maxima {
|
||||
continue
|
||||
}
|
||||
|
||||
// We are at or above maxima
|
||||
if count > maxima || nano > latest {
|
||||
maxima = count
|
||||
modTime = t
|
||||
latest = nano
|
||||
}
|
||||
}
|
||||
|
||||
// Return the collected common modTime.
|
||||
// Return the collected common max time, with maxima
|
||||
return time.Unix(0, latest).UTC(), maxima
|
||||
}
|
||||
|
||||
// commonTime returns a maximally occurring time from a list of time.
|
||||
func commonTime(modTimes []time.Time) (modTime time.Time) {
|
||||
modTime, _ = commonTimeAndOccurence(modTimes, 0)
|
||||
return modTime
|
||||
}
|
||||
|
||||
@@ -88,6 +118,19 @@ func filterOnlineDisksInplace(fi FileInfo, partsMetadata []FileInfo, onlineDisks
|
||||
}
|
||||
}
|
||||
|
||||
// Extracts list of disk mtimes from FileInfo slice and returns, skips
|
||||
// slice elements that have errors.
|
||||
func listObjectDiskMtimes(partsMetadata []FileInfo) (diskMTimes []time.Time) {
|
||||
diskMTimes = bootModtimes(len(partsMetadata))
|
||||
for index, metadata := range partsMetadata {
|
||||
if metadata.IsValid() {
|
||||
// Once the file is found, save the disk mtime saved on disk.
|
||||
diskMTimes[index] = metadata.DiskMTime
|
||||
}
|
||||
}
|
||||
return diskMTimes
|
||||
}
|
||||
|
||||
// Notes:
|
||||
// There are 5 possible states a disk could be in,
|
||||
// 1. __online__ - has the latest copy of xl.meta - returned by listOnlineDisks
|
||||
@@ -185,6 +228,13 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
|
||||
errs []error, latestMeta FileInfo,
|
||||
bucket, object string, scanMode madmin.HealScanMode) ([]StorageAPI, []error) {
|
||||
|
||||
var diskMTime time.Time
|
||||
delta := 5 * time.Second
|
||||
if !latestMeta.DataShardFixed() {
|
||||
diskMTime = pickValidDiskTimeWithQuorum(partsMetadata,
|
||||
latestMeta.Erasure.DataBlocks)
|
||||
}
|
||||
|
||||
availableDisks := make([]StorageAPI, len(onlineDisks))
|
||||
dataErrs := make([]error, len(onlineDisks))
|
||||
inconsistent := 0
|
||||
@@ -289,6 +339,14 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
|
||||
}
|
||||
}
|
||||
|
||||
if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
|
||||
if !partsMetadata[i].AcceptableDelta(diskMTime, delta) {
|
||||
// not with in acceptable delta, skip.
|
||||
partsMetadata[i] = FileInfo{}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if dataErrs[i] == nil {
|
||||
// All parts verified, mark it as all data available.
|
||||
availableDisks[i] = onlineDisk
|
||||
|
||||
@@ -24,12 +24,38 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/minio/internal/sync/errgroup"
|
||||
)
|
||||
|
||||
const reservedMetadataPrefixLowerDataShardFix = ReservedMetadataPrefixLower + "data-shard-fix"
|
||||
|
||||
// AcceptableDelta returns 'true' if the fi.DiskMTime is under
|
||||
// acceptable delta of "delta" duration with maxTime.
|
||||
//
|
||||
// This code is primarily used for heuristic detection of
|
||||
// incorrect shards, as per https://github.com/minio/minio/pull/13803
|
||||
//
|
||||
// This check only is active if we could find maximally
|
||||
// occurring disk mtimes that are somewhat same across
|
||||
// the quorum. Allowing to skip those shards which we
|
||||
// might think are wrong.
|
||||
func (fi FileInfo) AcceptableDelta(maxTime time.Time, delta time.Duration) bool {
|
||||
diff := maxTime.Sub(fi.DiskMTime)
|
||||
if diff < 0 {
|
||||
diff = -diff
|
||||
}
|
||||
return diff < delta
|
||||
}
|
||||
|
||||
// DataShardFixed - data shard fixed?
|
||||
func (fi FileInfo) DataShardFixed() bool {
|
||||
return fi.Metadata[reservedMetadataPrefixLowerDataShardFix] == "true"
|
||||
}
|
||||
|
||||
// Heals a bucket if it doesn't exist on one of the disks, additionally
|
||||
// also heals the missing entries for bucket metadata files
|
||||
// `policy.json, notification.xml, listeners.json`.
|
||||
@@ -224,6 +250,11 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta File
|
||||
return true
|
||||
}
|
||||
if erErr == nil {
|
||||
if meta.XLV1 {
|
||||
// Legacy means heal always
|
||||
// always check first.
|
||||
return true
|
||||
}
|
||||
if !meta.Deleted && !meta.IsRemote() {
|
||||
// If xl.meta was read fine but there may be problem with the part.N files.
|
||||
if IsErr(dataErr, []error{
|
||||
@@ -234,19 +265,7 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta File
|
||||
return true
|
||||
}
|
||||
}
|
||||
if !latestMeta.MetadataEquals(meta) {
|
||||
return true
|
||||
}
|
||||
if !latestMeta.TransitionInfoEquals(meta) {
|
||||
return true
|
||||
}
|
||||
if !latestMeta.ReplicationInfoEquals(meta) {
|
||||
return true
|
||||
}
|
||||
if !latestMeta.ModTime.Equal(meta.ModTime) {
|
||||
return true
|
||||
}
|
||||
if meta.XLV1 {
|
||||
if !latestMeta.Equals(meta) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,7 +93,7 @@ func TestHealing(t *testing.T) {
|
||||
}
|
||||
|
||||
// After heal the meta file should be as expected.
|
||||
if !reflect.DeepEqual(fileInfoPreHeal, fileInfoPostHeal) {
|
||||
if !fileInfoPreHeal.Equals(fileInfoPostHeal) {
|
||||
t.Fatal("HealObject failed")
|
||||
}
|
||||
|
||||
@@ -122,7 +122,7 @@ func TestHealing(t *testing.T) {
|
||||
}
|
||||
|
||||
// After heal the meta file should be as expected.
|
||||
if !reflect.DeepEqual(fileInfoPreHeal, fileInfoPostHeal) {
|
||||
if !fileInfoPreHeal.Equals(fileInfoPostHeal) {
|
||||
t.Fatal("HealObject failed")
|
||||
}
|
||||
|
||||
@@ -699,7 +699,8 @@ func TestHealLastDataShard(t *testing.T) {
|
||||
t.Fatalf("Failed to make a bucket - %v", err)
|
||||
}
|
||||
|
||||
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
|
||||
_, err = obj.PutObject(ctx, bucket, object,
|
||||
mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -724,7 +725,9 @@ func TestHealLastDataShard(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to delete a file - %v", err)
|
||||
}
|
||||
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
|
||||
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{
|
||||
ScanMode: madmin.HealNormalScan,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -738,21 +741,23 @@ func TestHealLastDataShard(t *testing.T) {
|
||||
firstHealedH := sha256.New()
|
||||
_, err = io.Copy(firstHealedH, firstGr)
|
||||
if err != nil {
|
||||
t.Fatal()
|
||||
t.Fatal(err)
|
||||
}
|
||||
firstHealedDataSha256 := firstHealedH.Sum(nil)
|
||||
|
||||
if !bytes.Equal(actualSha256, firstHealedDataSha256) {
|
||||
t.Fatal("object healed wrong")
|
||||
t.Fatalf("object healed wrong, expected %x, got %x",
|
||||
actualSha256, firstHealedDataSha256)
|
||||
}
|
||||
|
||||
// remove another data shard
|
||||
err = removeAll(pathJoin(shuffledDisks[1].String(), bucket, object))
|
||||
if err != nil {
|
||||
if err = removeAll(pathJoin(shuffledDisks[1].String(), bucket, object)); err != nil {
|
||||
t.Fatalf("Failed to delete a file - %v", err)
|
||||
}
|
||||
|
||||
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
|
||||
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{
|
||||
ScanMode: madmin.HealNormalScan,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -766,12 +771,13 @@ func TestHealLastDataShard(t *testing.T) {
|
||||
secondHealedH := sha256.New()
|
||||
_, err = io.Copy(secondHealedH, secondGr)
|
||||
if err != nil {
|
||||
t.Fatal()
|
||||
t.Fatal(err)
|
||||
}
|
||||
secondHealedDataSha256 := secondHealedH.Sum(nil)
|
||||
|
||||
if !bytes.Equal(actualSha256, secondHealedDataSha256) {
|
||||
t.Fatal("object healed wrong")
|
||||
t.Fatalf("object healed wrong, expected %x, got %x",
|
||||
actualSha256, secondHealedDataSha256)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -351,6 +351,17 @@ func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.
|
||||
return FileInfo{}, errErasureReadQuorum
|
||||
}
|
||||
|
||||
func pickValidDiskTimeWithQuorum(metaArr []FileInfo, quorum int) time.Time {
|
||||
diskMTimes := listObjectDiskMtimes(metaArr)
|
||||
|
||||
diskMTime, diskMaxima := commonTimeAndOccurence(diskMTimes, 5*time.Second)
|
||||
if diskMaxima >= quorum {
|
||||
return diskMTime
|
||||
}
|
||||
|
||||
return timeSentinel
|
||||
}
|
||||
|
||||
// pickValidFileInfo - picks one valid FileInfo content and returns from a
|
||||
// slice of FileInfo.
|
||||
func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (FileInfo, error) {
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio-go/v7/pkg/tags"
|
||||
@@ -179,6 +180,31 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
||||
return nil, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
if !fi.DataShardFixed() {
|
||||
diskMTime := pickValidDiskTimeWithQuorum(metaArr, fi.Erasure.DataBlocks)
|
||||
delta := 5 * time.Second
|
||||
if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
|
||||
for index := range onlineDisks {
|
||||
if onlineDisks[index] == OfflineDisk {
|
||||
continue
|
||||
}
|
||||
if !metaArr[index].IsValid() {
|
||||
continue
|
||||
}
|
||||
if !metaArr[index].AcceptableDelta(diskMTime, delta) {
|
||||
// If disk mTime mismatches it is considered outdated
|
||||
// https://github.com/minio/minio/pull/13803
|
||||
//
|
||||
// This check only is active if we could find maximally
|
||||
// occurring disk mtimes that are somewhat same across
|
||||
// the quorum. Allowing to skip those shards which we
|
||||
// might think are wrong.
|
||||
onlineDisks[index] = OfflineDisk
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
objInfo := fi.ToObjectInfo(bucket, object)
|
||||
if objInfo.DeleteMarker {
|
||||
if opts.VersionID == "" {
|
||||
@@ -1444,14 +1470,16 @@ func (er erasureObjects) addPartial(bucket, object, versionID string, size int64
|
||||
}
|
||||
|
||||
func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
// Lock the object before updating tags.
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, err
|
||||
if !opts.NoLock {
|
||||
// Lock the object before updating metadata.
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
defer lk.Unlock(lkctx.Cancel)
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
defer lk.Unlock(lkctx.Cancel)
|
||||
|
||||
disks := er.getDisks()
|
||||
|
||||
|
||||
@@ -184,6 +184,25 @@ type FileInfo struct {
|
||||
// no other caller must set this value other than multi-object delete call.
|
||||
// usage in other calls in undefined please avoid.
|
||||
Idx int `msg:"i"`
|
||||
|
||||
// DiskMTime indicates the mtime of the xl.meta on disk
|
||||
// This is mainly used for detecting a particular issue
|
||||
// reported in https://github.com/minio/minio/pull/13803
|
||||
DiskMTime time.Time `msg:"dmt"`
|
||||
}
|
||||
|
||||
// Equals checks if fi(FileInfo) matches ofi(FileInfo)
|
||||
func (fi FileInfo) Equals(ofi FileInfo) (ok bool) {
|
||||
if !fi.MetadataEquals(ofi) {
|
||||
return false
|
||||
}
|
||||
if !fi.ReplicationInfoEquals(ofi) {
|
||||
return false
|
||||
}
|
||||
if !fi.TransitionInfoEquals(ofi) {
|
||||
return false
|
||||
}
|
||||
return fi.ModTime.Equal(ofi.ModTime)
|
||||
}
|
||||
|
||||
// GetDataDir returns an expected dataDir given FileInfo
|
||||
|
||||
@@ -550,8 +550,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
if zb0001 != 25 {
|
||||
err = msgp.ArrayError{Wanted: 25, Got: zb0001}
|
||||
if zb0001 != 26 {
|
||||
err = msgp.ArrayError{Wanted: 26, Got: zb0001}
|
||||
return
|
||||
}
|
||||
z.Volume, err = dc.ReadString()
|
||||
@@ -716,13 +716,18 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
err = msgp.WrapError(err, "Idx")
|
||||
return
|
||||
}
|
||||
z.DiskMTime, err = dc.ReadTime()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "DiskMTime")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// array header, size 25
|
||||
err = en.Append(0xdc, 0x0, 0x19)
|
||||
// array header, size 26
|
||||
err = en.Append(0xdc, 0x0, 0x1a)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -870,14 +875,19 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
err = msgp.WrapError(err, "Idx")
|
||||
return
|
||||
}
|
||||
err = en.WriteTime(z.DiskMTime)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "DiskMTime")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// array header, size 25
|
||||
o = append(o, 0xdc, 0x0, 0x19)
|
||||
// array header, size 26
|
||||
o = append(o, 0xdc, 0x0, 0x1a)
|
||||
o = msgp.AppendString(o, z.Volume)
|
||||
o = msgp.AppendString(o, z.Name)
|
||||
o = msgp.AppendString(o, z.VersionID)
|
||||
@@ -922,6 +932,7 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.AppendTime(o, z.SuccessorModTime)
|
||||
o = msgp.AppendBool(o, z.Fresh)
|
||||
o = msgp.AppendInt(o, z.Idx)
|
||||
o = msgp.AppendTime(o, z.DiskMTime)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -933,8 +944,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
if zb0001 != 25 {
|
||||
err = msgp.ArrayError{Wanted: 25, Got: zb0001}
|
||||
if zb0001 != 26 {
|
||||
err = msgp.ArrayError{Wanted: 26, Got: zb0001}
|
||||
return
|
||||
}
|
||||
z.Volume, bts, err = msgp.ReadStringBytes(bts)
|
||||
@@ -1099,6 +1110,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
err = msgp.WrapError(err, "Idx")
|
||||
return
|
||||
}
|
||||
z.DiskMTime, bts, err = msgp.ReadTimeBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "DiskMTime")
|
||||
return
|
||||
}
|
||||
o = bts
|
||||
return
|
||||
}
|
||||
@@ -1116,7 +1132,7 @@ func (z *FileInfo) Msgsize() (s int) {
|
||||
for za0003 := range z.Parts {
|
||||
s += z.Parts[za0003].Msgsize()
|
||||
}
|
||||
s += z.Erasure.Msgsize() + msgp.BoolSize + z.ReplicationState.Msgsize() + msgp.BytesPrefixSize + len(z.Data) + msgp.IntSize + msgp.TimeSize + msgp.BoolSize + msgp.IntSize
|
||||
s += z.Erasure.Msgsize() + msgp.BoolSize + z.ReplicationState.Msgsize() + msgp.BytesPrefixSize + len(z.Data) + msgp.IntSize + msgp.TimeSize + msgp.BoolSize + msgp.IntSize + msgp.TimeSize
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v42" // Added FreeVersions to FileInfoVersions
|
||||
storageRESTVersion = "v43" // Added DiskMTime field for FileInfo
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
|
||||
@@ -375,32 +375,39 @@ func (s *xlStorage) Healing() *healingTracker {
|
||||
return &h
|
||||
}
|
||||
|
||||
func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, error) {
|
||||
// readsMetadata and returns disk mTime information for xl.meta
|
||||
func (s *xlStorage) readMetadataWithDMTime(ctx context.Context, itemPath string) ([]byte, time.Time, error) {
|
||||
if contextCanceled(ctx) {
|
||||
return nil, ctx.Err()
|
||||
return nil, time.Time{}, ctx.Err()
|
||||
}
|
||||
|
||||
if err := checkPathLength(itemPath); err != nil {
|
||||
return nil, err
|
||||
return nil, time.Time{}, err
|
||||
}
|
||||
|
||||
f, err := OpenFile(itemPath, readMode, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, time.Time{}, err
|
||||
}
|
||||
defer f.Close()
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, time.Time{}, err
|
||||
}
|
||||
if stat.IsDir() {
|
||||
return nil, &os.PathError{
|
||||
return nil, time.Time{}, &os.PathError{
|
||||
Op: "open",
|
||||
Path: itemPath,
|
||||
Err: syscall.EISDIR,
|
||||
}
|
||||
}
|
||||
return readXLMetaNoData(f, stat.Size())
|
||||
buf, err := readXLMetaNoData(f, stat.Size())
|
||||
return buf, stat.ModTime().UTC(), err
|
||||
}
|
||||
|
||||
func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, error) {
|
||||
buf, _, err := s.readMetadataWithDMTime(ctx, itemPath)
|
||||
return buf, err
|
||||
}
|
||||
|
||||
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
|
||||
@@ -1214,11 +1221,18 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
||||
if err != nil {
|
||||
return fi, err
|
||||
}
|
||||
// Validate file path length, before reading.
|
||||
filePath := pathJoin(volumeDir, path)
|
||||
if err = checkPathLength(filePath); err != nil {
|
||||
return fi, err
|
||||
}
|
||||
|
||||
var buf []byte
|
||||
var dmTime time.Time
|
||||
if readData {
|
||||
buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
||||
buf, dmTime, err = s.readAllData(ctx, volumeDir, pathJoin(filePath, xlStorageFormatFile))
|
||||
} else {
|
||||
buf, err = s.readMetadata(ctx, pathJoin(volumeDir, path, xlStorageFormatFile))
|
||||
buf, dmTime, err = s.readMetadataWithDMTime(ctx, pathJoin(filePath, xlStorageFormatFile))
|
||||
if err != nil {
|
||||
if osIsNotExist(err) {
|
||||
if aerr := Access(volumeDir); aerr != nil && osIsNotExist(aerr) {
|
||||
@@ -1231,7 +1245,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
||||
|
||||
if err != nil {
|
||||
if err == errFileNotFound {
|
||||
buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1))
|
||||
buf, dmTime, err = s.readAllData(ctx, volumeDir, pathJoin(filePath, xlStorageFormatFileV1))
|
||||
if err != nil {
|
||||
if err == errFileNotFound {
|
||||
if versionID != "" {
|
||||
@@ -1257,6 +1271,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
||||
if err != nil {
|
||||
return fi, err
|
||||
}
|
||||
fi.DiskMTime = dmTime
|
||||
|
||||
if len(fi.Data) == 0 {
|
||||
// We did not read inline data, so we have no references.
|
||||
@@ -1299,7 +1314,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
||||
len(fi.Parts) == 1 {
|
||||
partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
|
||||
dataPath := pathJoin(volumeDir, path, fi.DataDir, partPath)
|
||||
fi.Data, err = s.readAllData(volumeDir, dataPath)
|
||||
fi.Data, _, err = s.readAllData(ctx, volumeDir, dataPath)
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
@@ -1309,38 +1324,42 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
func (s *xlStorage) readAllData(volumeDir string, filePath string) (buf []byte, err error) {
|
||||
func (s *xlStorage) readAllData(ctx context.Context, volumeDir string, filePath string) (buf []byte, dmTime time.Time, err error) {
|
||||
if contextCanceled(ctx) {
|
||||
return nil, time.Time{}, ctx.Err()
|
||||
}
|
||||
|
||||
f, err := OpenFileDirectIO(filePath, readMode, 0666)
|
||||
if err != nil {
|
||||
if osIsNotExist(err) {
|
||||
// Check if the object doesn't exist because its bucket
|
||||
// is missing in order to return the correct error.
|
||||
if err = Access(volumeDir); err != nil && osIsNotExist(err) {
|
||||
return nil, errVolumeNotFound
|
||||
return nil, dmTime, errVolumeNotFound
|
||||
}
|
||||
return nil, errFileNotFound
|
||||
return nil, dmTime, errFileNotFound
|
||||
} else if osIsPermission(err) {
|
||||
return nil, errFileAccessDenied
|
||||
return nil, dmTime, errFileAccessDenied
|
||||
} else if isSysErrNotDir(err) || isSysErrIsDir(err) {
|
||||
return nil, errFileNotFound
|
||||
return nil, dmTime, errFileNotFound
|
||||
} else if isSysErrHandleInvalid(err) {
|
||||
// This case is special and needs to be handled for windows.
|
||||
return nil, errFileNotFound
|
||||
return nil, dmTime, errFileNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
return nil, dmTime, errFaultyDisk
|
||||
} else if isSysErrTooManyFiles(err) {
|
||||
return nil, errTooManyOpenFiles
|
||||
return nil, dmTime, errTooManyOpenFiles
|
||||
} else if isSysErrInvalidArg(err) {
|
||||
st, _ := Lstat(filePath)
|
||||
if st != nil && st.IsDir() {
|
||||
// Linux returns InvalidArg for directory O_DIRECT
|
||||
// we need to keep this fallback code to return correct
|
||||
// errors upwards.
|
||||
return nil, errFileNotFound
|
||||
return nil, dmTime, errFileNotFound
|
||||
}
|
||||
return nil, errUnsupportedDisk
|
||||
return nil, dmTime, errUnsupportedDisk
|
||||
}
|
||||
return nil, err
|
||||
return nil, dmTime, err
|
||||
}
|
||||
r := &xioutil.ODirectReader{
|
||||
File: f,
|
||||
@@ -1352,10 +1371,10 @@ func (s *xlStorage) readAllData(volumeDir string, filePath string) (buf []byte,
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
buf, err = ioutil.ReadAll(r)
|
||||
return buf, osErrToFileErr(err)
|
||||
return buf, dmTime, osErrToFileErr(err)
|
||||
}
|
||||
if stat.IsDir() {
|
||||
return nil, errFileNotFound
|
||||
return nil, dmTime, errFileNotFound
|
||||
}
|
||||
|
||||
// Read into appropriate buffer.
|
||||
@@ -1369,7 +1388,7 @@ func (s *xlStorage) readAllData(volumeDir string, filePath string) (buf []byte,
|
||||
// Read file...
|
||||
_, err = io.ReadFull(r, buf)
|
||||
|
||||
return buf, osErrToFileErr(err)
|
||||
return buf, stat.ModTime().UTC(), osErrToFileErr(err)
|
||||
}
|
||||
|
||||
// ReadAll reads from r until an error or EOF and returns the data it read.
|
||||
@@ -1390,7 +1409,8 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.readAllData(volumeDir, filePath)
|
||||
buf, _, err = s.readAllData(ctx, volumeDir, filePath)
|
||||
return buf, err
|
||||
}
|
||||
|
||||
// ReadFile reads exactly len(buf) bytes into buf. It returns the
|
||||
|
||||
Reference in New Issue
Block a user