Inline small file data in xl.meta file (#11758)
Commit 2623338dc5 (parent f5831174e6)
@@ -25,6 +25,7 @@ import (
	"io"

	"github.com/minio/minio/cmd/logger"
	"github.com/minio/minio/pkg/ioutil"
)

type errHashMismatch struct {
@@ -38,6 +39,7 @@ func (err *errHashMismatch) Error() string {
// Calculates bitrot in chunks and writes the hash into the stream.
type streamingBitrotWriter struct {
	iow          io.WriteCloser
	closeWithErr func(err error) error
	h            hash.Hash
	shardSize    int64
	canClose     chan struct{} // Needed to avoid race explained in Close() call.
@@ -66,16 +68,24 @@ func (b *streamingBitrotWriter) Close() error {
	// 2) pipe.Close()
	// Now pipe.Close() can return before the data is read on the other end of the pipe and written to the disk
	// Hence an immediate Read() on the file can return incorrect data.
	if b.canClose != nil {
		<-b.canClose
	}
	return err
}

// Returns streaming bitrot writer implementation.
func newStreamingBitrotWriterBuffer(w io.Writer, algo BitrotAlgorithm, shardSize int64) io.WriteCloser {
	return &streamingBitrotWriter{iow: ioutil.NopCloser(w), h: algo.New(), shardSize: shardSize, canClose: nil}
}

// Returns streaming bitrot writer implementation.
func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64, heal bool) io.Writer {
	r, w := io.Pipe()
	h := algo.New()

-	bw := &streamingBitrotWriter{w, h, shardSize, make(chan struct{})}
	bw := &streamingBitrotWriter{iow: w, closeWithErr: w.CloseWithError, h: h, shardSize: shardSize, canClose: make(chan struct{})}

	go func() {
		totalFileSize := int64(-1) // For compressed objects length will be unknown (represented by length=-1)
		if length != -1 {
@@ -123,7 +133,7 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
		// For the first ReadAt() call we need to open the stream for reading.
		b.currOffset = offset
		streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset
-		if len(b.data) == 0 {
		if len(b.data) == 0 && b.tillOffset != streamOffset {
			b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset)
		} else {
			b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset)
@@ -161,15 +171,13 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
func newStreamingBitrotReader(disk StorageAPI, data []byte, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader {
	h := algo.New()
	return &streamingBitrotReader{
-		disk,
-		data,
-		nil,
-		volume,
-		filePath,
-		ceilFrac(tillOffset, shardSize)*int64(h.Size()) + tillOffset,
-		0,
-		h,
-		shardSize,
-		make([]byte, h.Size()),
		disk:       disk,
		data:       data,
		volume:     volume,
		filePath:   filePath,
		tillOffset: ceilFrac(tillOffset, shardSize)*int64(h.Size()) + tillOffset,
		h:          h,
		shardSize:  shardSize,
		hashBytes:  make([]byte, h.Size()),
	}
}
@@ -17,6 +17,7 @@
package cmd

import (
	"bytes"
	"crypto/sha256"
	"errors"
	"hash"
@@ -143,3 +144,54 @@ func bitrotShardFileSize(size int64, shardSize int64, algo BitrotAlgorithm) int6
	}
	return ceilFrac(size, shardSize)*int64(algo.New().Size()) + size
}

// bitrotVerify a single stream of data.
func bitrotVerify(r io.Reader, wantSize, partSize int64, algo BitrotAlgorithm, want []byte, shardSize int64) error {
	if algo != HighwayHash256S {
		h := algo.New()
		if n, err := io.Copy(h, r); err != nil || n != wantSize {
			// Premature failure in reading the object, file is corrupt.
			return errFileCorrupt
		}
		if !bytes.Equal(h.Sum(nil), want) {
			return errFileCorrupt
		}
		return nil
	}

	h := algo.New()
	hashBuf := make([]byte, h.Size())
	buf := make([]byte, shardSize)
	left := wantSize

	// Calculate the size of the bitrot file and compare
	// it with the actual file size.
	if left != bitrotShardFileSize(partSize, shardSize, algo) {
		return errFileCorrupt
	}

	for left > 0 {
		// Read expected hash...
		h.Reset()
		n, err := io.ReadFull(r, hashBuf)
		if err != nil {
			// Read's failed for object with right size, file is corrupt.
			return err
		}
		// Subtract hash length..
		left -= int64(n)
		if left < shardSize {
			shardSize = left
		}
		read, err := io.CopyBuffer(h, io.LimitReader(r, shardSize), buf)
		if err != nil {
			// Read's failed for object with right size, at different offsets.
			return err
		}
		left -= read
		if !bytes.Equal(h.Sum(nil), hashBuf) {
			return errFileCorrupt
		}
	}
	return nil
}
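The HighwayHash256S branch above assumes the streaming bitrot layout: every shard is preceded by its own checksum, so a part of partSize bytes occupies ceilFrac(partSize, shardSize)*hashSize + partSize bytes on disk. A minimal, self-contained sketch of a writer for that layout, using SHA-256 as a stand-in for MinIO's HighwayHash256S (the stand-in is an assumption for illustration only):

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// appendShards writes hash||shard pairs, the layout bitrotVerify expects.
// sha256 is only a stand-in here; the real code uses HighwayHash256S.
func appendShards(data []byte, shardSize int) []byte {
	var out bytes.Buffer
	for len(data) > 0 {
		n := shardSize
		if len(data) < n {
			n = len(data)
		}
		sum := sha256.Sum256(data[:n]) // checksum precedes its shard
		out.Write(sum[:])
		out.Write(data[:n])
		data = data[n:]
	}
	return out.Bytes()
}

func main() {
	stream := appendShards(bytes.Repeat([]byte{'x'}, 10), 4)
	// ceil(10/4)*32 + 10 = 106 bytes, matching bitrotShardFileSize.
	fmt.Println(len(stream))
}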
@@ -139,7 +139,7 @@ func TestErasureEncode(t *testing.T) {
			case *wholeBitrotWriter:
				w.disk = badDisk{nil}
			case *streamingBitrotWriter:
-				w.iow.(*io.PipeWriter).CloseWithError(errFaultyDisk)
				w.closeWithErr(errFaultyDisk)
			}
		}
		if test.offDisks > 0 {
@@ -17,6 +17,7 @@
package cmd

import (
	"bytes"
	"context"
	"time"

@@ -198,8 +199,8 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
			dataErrs[i] = errDiskNotFound
			continue
		}
-		if erasureDistributionReliable {
		meta := partsMetadata[i]
		if erasureDistributionReliable {
			if !meta.IsValid() {
				continue
			}
@@ -221,6 +222,21 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
			}
		}

		// Always check data, if we got it.
		if len(meta.Data) > 0 || meta.Size == 0 {
			checksumInfo := meta.Erasure.GetChecksumInfo(meta.Parts[0].Number)
			dataErrs[i] = bitrotVerify(bytes.NewBuffer(meta.Data),
				int64(len(meta.Data)),
				meta.Erasure.ShardFileSize(meta.Size),
				checksumInfo.Algorithm,
				checksumInfo.Hash, meta.Erasure.ShardSize())
			if dataErrs[i] == nil {
				// All parts verified, mark it as all data available.
				availableDisks[i] = onlineDisk
			}
			continue
		}

		switch scanMode {
		case madmin.HealDeepScan:
			// disk has a valid xl.meta but may not have all the
@@ -177,10 +177,12 @@ func TestListOnlineDisks(t *testing.T) {
	}

	object := "object"
-	data := bytes.Repeat([]byte("a"), 1024)
	data := bytes.Repeat([]byte("a"), smallFileThreshold*2)
	z := obj.(*erasureServerPools)
	erasureDisks := z.serverPools[0].sets[0].getDisks()
	for i, test := range testCases {
		t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {

			_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
			if err != nil {
				t.Fatalf("Failed to putObject %v", err)
@@ -255,7 +257,189 @@ func TestListOnlineDisks(t *testing.T) {
					i+1, erasureDisks[tamperedIndex])
				}
			}
		})
	}
}

// TestListOnlineDisksSmallObjects - checks if listOnlineDisks and outDatedDisks
// are consistent with each other.
func TestListOnlineDisksSmallObjects(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	obj, disks, err := prepareErasure16(ctx)
	if err != nil {
		t.Fatalf("Prepare Erasure backend failed - %v", err)
	}
	defer obj.Shutdown(context.Background())
	defer removeRoots(disks)

	type tamperKind int
	const (
		noTamper    tamperKind = iota
		deletePart  tamperKind = iota
		corruptPart tamperKind = iota
	)
	timeSentinel := time.Unix(1, 0).UTC()
	threeNanoSecs := time.Unix(3, 0).UTC()
	fourNanoSecs := time.Unix(4, 0).UTC()
	modTimesThreeNone := []time.Time{
		threeNanoSecs, threeNanoSecs, threeNanoSecs, threeNanoSecs,
		threeNanoSecs, threeNanoSecs, threeNanoSecs,
		timeSentinel, timeSentinel, timeSentinel, timeSentinel,
		timeSentinel, timeSentinel, timeSentinel, timeSentinel,
		timeSentinel,
	}
	modTimesThreeFour := []time.Time{
		threeNanoSecs, threeNanoSecs, threeNanoSecs, threeNanoSecs,
		threeNanoSecs, threeNanoSecs, threeNanoSecs, threeNanoSecs,
		fourNanoSecs, fourNanoSecs, fourNanoSecs, fourNanoSecs,
		fourNanoSecs, fourNanoSecs, fourNanoSecs, fourNanoSecs,
	}
	testCases := []struct {
		modTimes       []time.Time
		expectedTime   time.Time
		errs           []error
		_tamperBackend tamperKind
	}{
		{
			modTimes:     modTimesThreeFour,
			expectedTime: fourNanoSecs,
			errs: []error{
				nil, nil, nil, nil, nil, nil, nil, nil, nil,
				nil, nil, nil, nil, nil, nil, nil,
			},
			_tamperBackend: noTamper,
		},
		{
			modTimes:     modTimesThreeNone,
			expectedTime: threeNanoSecs,
			errs: []error{
				// Disks that have a valid xl.meta.
				nil, nil, nil, nil, nil, nil, nil,
				// Majority of disks don't have xl.meta.
				errFileNotFound, errFileNotFound,
				errFileNotFound, errFileNotFound,
				errFileNotFound, errDiskAccessDenied,
				errDiskNotFound, errFileNotFound,
				errFileNotFound,
			},
			_tamperBackend: deletePart,
		},
		{
			modTimes:     modTimesThreeNone,
			expectedTime: threeNanoSecs,
			errs: []error{
				// Disks that have a valid xl.meta.
				nil, nil, nil, nil, nil, nil, nil,
				// Majority of disks don't have xl.meta.
				errFileNotFound, errFileNotFound,
				errFileNotFound, errFileNotFound,
				errFileNotFound, errDiskAccessDenied,
				errDiskNotFound, errFileNotFound,
				errFileNotFound,
			},
			_tamperBackend: corruptPart,
		},
	}

	bucket := "bucket"
	err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
	if err != nil {
		t.Fatalf("Failed to make a bucket %v", err)
	}

	object := "object"
	data := bytes.Repeat([]byte("a"), smallFileThreshold/2)
	z := obj.(*erasureServerPools)
	erasureDisks := z.serverPools[0].sets[0].getDisks()
	for i, test := range testCases {
		t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {

			_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
			if err != nil {
				t.Fatalf("Failed to putObject %v", err)
			}

			partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
			_, err := getLatestFileInfo(ctx, partsMetadata, errs)
			if err != nil {
				t.Fatalf("Failed to getLatestFileInfo %v", err)
			}

			for j := range partsMetadata {
				if errs[j] != nil {
					t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j])
				}
				partsMetadata[j].ModTime = test.modTimes[j]
			}

			if erasureDisks, err = writeUniqueFileInfo(ctx, erasureDisks, bucket, object, partsMetadata, diskCount(erasureDisks)); err != nil {
				t.Fatal(ctx, err)
			}

			tamperedIndex := -1
			switch test._tamperBackend {
			case deletePart:
				for index, err := range test.errs {
					if err != nil {
						continue
					}
					// Remove a part from a disk
					// which has a valid xl.meta,
					// and check if that disk
					// appears in outDatedDisks.
					tamperedIndex = index
					dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
					if dErr != nil {
						t.Fatalf("Test %d: Failed to delete %s - %v", i+1,
							pathJoin(object, xlStorageFormatFile), dErr)
					}
					break
				}
			case corruptPart:
				for index, err := range test.errs {
					if err != nil {
						continue
					}
					// Corrupt a part from a disk
					// which has a valid xl.meta,
					// and check if that disk
					// appears in outDatedDisks.
					tamperedIndex = index
					filePath := pathJoin(erasureDisks[index].String(), bucket, object, xlStorageFormatFile)
					f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_SYNC, 0)
					if err != nil {
						t.Fatalf("Failed to open %s: %s\n", filePath, err)
					}
					f.Write([]byte("oops")) // Will cause bitrot error
					f.Close()
					break
				}

			}
			partsMetadata, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
			_, err = getLatestFileInfo(ctx, partsMetadata, errs)
			if err != nil {
				t.Fatalf("Failed to getLatestFileInfo %v", err)
			}

			onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
			if !modTime.Equal(test.expectedTime) {
				t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v",
					i+1, test.expectedTime, modTime)
			}

			availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan)
			test.errs = newErrs

			if test._tamperBackend != noTamper {
				if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil {
					t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data",
						i+1, erasureDisks[tamperedIndex])
				}
			}
		})
	}
}
@@ -17,6 +17,7 @@
package cmd

import (
	"bytes"
	"context"
	"errors"
	"fmt"
@@ -230,8 +231,7 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, quorumModTime t
}

// Heals an object by re-writing corrupt/missing erasure blocks.
-func (er erasureObjects) healObject(ctx context.Context, bucket string, object string,
-	versionID string, partsMetadata []FileInfo, errs []error, lfi FileInfo, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
func (er erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, partsMetadata []FileInfo, errs []error, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {

	dryRun := opts.DryRun
	scanMode := opts.ScanMode
@@ -379,6 +379,11 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
		dataDir = migrateDataDir
	}

	var inlineBuffers []*bytes.Buffer
	if len(latestMeta.Parts) <= 1 && latestMeta.Size < smallFileThreshold {
		inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks))
	}

	if !latestMeta.Deleted || latestMeta.TransitionStatus != lifecycle.TransitionComplete {
		result.DataBlocks = latestMeta.Erasure.DataBlocks
		result.ParityBlocks = latestMeta.Erasure.ParityBlocks
@@ -398,6 +403,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
		}

		erasureInfo := latestMeta.Erasure

		for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
			partSize := latestMeta.Parts[partIndex].Size
			partActualSize := latestMeta.Parts[partIndex].ActualSize
@@ -414,7 +420,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
				if latestMeta.XLV1 {
					partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber))
				}
-				readers[i] = newBitrotReader(disk, nil, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize())
				readers[i] = newBitrotReader(disk, partsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize())
			}
			writers := make([]io.Writer, len(outDatedDisks))
			for i, disk := range outDatedDisks {
@@ -422,9 +428,14 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
					continue
				}
				partPath := pathJoin(tmpID, dataDir, fmt.Sprintf("part.%d", partNumber))
				if len(inlineBuffers) > 0 {
					inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(latestMeta.Size)))
					writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
				} else {
					writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath,
						tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize(), true)
				}
			}
			err = erasure.Heal(ctx, readers, writers, partSize)
			closeBitrotReaders(readers)
			closeBitrotWriters(writers)
@@ -453,6 +464,11 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
					Algorithm: checksumAlgo,
					Hash:      bitrotWriterSum(writers[i]),
				})
				if len(inlineBuffers) > 0 {
					partsMetadata[i].Data = inlineBuffers[i].Bytes()
				} else {
					partsMetadata[i].Data = nil
				}
			}

			// If all disks are having errors, we give up.
@@ -462,6 +478,25 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
		}
	}

	if len(inlineBuffers) > 0 {
		// Write directly...
		if outDatedDisks, err = writeUniqueFileInfo(ctx, outDatedDisks, bucket, object, partsMetadata, diskCount(outDatedDisks)); err != nil {
			logger.LogIf(ctx, err)
			return result, toObjectErr(err, bucket, object)
		}
		result.ObjectSize = latestMeta.Size
		for _, disk := range outDatedDisks {
			if disk == OfflineDisk {
				continue
			}
			for i, v := range result.Before.Drives {
				if v.Endpoint == disk.String() {
					result.After.Drives[i].State = madmin.DriveStateOk
				}
			}
		}
		return result, nil
	}
	defer er.deleteObject(context.Background(), minioMetaTmpBucket, tmpID, len(storageDisks)/2+1)

	// Generate and write `xl.meta` generated from other disks.
@@ -830,7 +865,13 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
	storageEndpoints := er.getEndpoints()

	// Read metadata files from all the disks
-	partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)

	// When versionID is empty, we read directly from the `null` versionID for healing.
	if versionID == "" {
		versionID = nullVersionID
	}

	partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, true)

	if isAllNotFound(errs) {
		err = toObjectErr(errFileNotFound, bucket, object)
@@ -841,11 +882,11 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
		return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err
	}

-	fi, err := getLatestFileInfo(healCtx, partsMetadata, errs)
	_, err = getLatestFileInfo(healCtx, partsMetadata, errs)
	if err != nil {
		return er.purgeObjectDangling(healCtx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
	}

	// Heal the object.
-	return er.healObject(healCtx, bucket, object, versionID, partsMetadata, errs, fi, opts)
	return er.healObject(healCtx, bucket, object, versionID, partsMetadata, errs, opts)
}
@@ -17,6 +17,7 @@
package cmd

import (
	"bytes"
	"context"
	"errors"
	"fmt"
@@ -735,10 +736,21 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
	}()

	writers := make([]io.Writer, len(onlineDisks))
	dataSize := data.Size()
	var inlineBuffers []*bytes.Buffer
	if dataSize >= 0 && dataSize < smallFileThreshold {
		inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
	}
	for i, disk := range onlineDisks {
		if disk == nil {
			continue
		}

		if len(inlineBuffers) > 0 {
			inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(data.Size())))
			writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
			continue
		}
		writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj,
			erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false)
	}
@@ -770,6 +782,9 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
			onlineDisks[i] = nil
			continue
		}
		if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
			partsMetadata[i].Data = inlineBuffers[i].Bytes()
		}
		partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize())
		partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
			PartNumber: 1,
@@ -797,17 +812,30 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
		partsMetadata[index].Metadata = opts.UserDefined
		partsMetadata[index].Size = n
		partsMetadata[index].ModTime = modTime
		if len(inlineBuffers) > 0 && inlineBuffers[index] != nil {
			partsMetadata[index].Data = inlineBuffers[index].Bytes()
		}
	}

	// Write unique `xl.meta` for each disk.
	if len(inlineBuffers) == 0 {
		if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil {
			logger.LogIf(ctx, err)
			return ObjectInfo{}, toObjectErr(err, bucket, object)
		}

		// Rename the successfully written temporary object to final location.
		if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil {
			logger.LogIf(ctx, err)
			return ObjectInfo{}, toObjectErr(err, bucket, object)
		}
	} else {
		// Write directly...
		if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, bucket, object, partsMetadata, writeQuorum); err != nil {
			logger.LogIf(ctx, err)
			return ObjectInfo{}, toObjectErr(err, bucket, object)
		}
	}

	// Whether a disk was initially or becomes offline
	// during this upload, send it to the MRF list.
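The putObject hunks above are the heart of the write-path change: when an object is below smallFileThreshold, each disk's erasure shard is captured in an in-memory buffer, stored as partsMetadata[i].Data, and the xl.meta is written straight to the final object location, skipping the temporary-file write and the renameData step entirely. A self-contained sketch of that routing decision (the threshold value is an assumption taken from the 32KiB figure mentioned later in this diff):

package main

import (
	"bytes"
	"fmt"
	"io"
)

// Assumed value for illustration; the real constant lives in MinIO.
const smallFileThreshold = 32 << 10

// pickWriter mirrors the putObject routing: small objects get an in-memory
// buffer whose bytes later become partsMetadata[i].Data inside xl.meta;
// anything larger streams to a temporary part file on disk.
func pickWriter(dataSize int64, inlineBuf *bytes.Buffer, partFile io.Writer) io.Writer {
	if dataSize >= 0 && dataSize < smallFileThreshold {
		return inlineBuf
	}
	return partFile
}

func main() {
	var inline bytes.Buffer
	w := pickWriter(1024, &inline, io.Discard)
	w.Write([]byte("erasure shard of a small object"))
	fmt.Println("bytes captured for xl.meta:", inline.Len())
}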
@@ -24,6 +24,7 @@ import (
	"io"
	"io/ioutil"
	"os"
	"strconv"
	"testing"

	humanize "github.com/dustin/go-humanize"
@@ -109,6 +110,10 @@ func TestErasureDeleteObjectBasic(t *testing.T) {
	for _, test := range testCases {
		test := test
		t.Run("", func(t *testing.T) {
			_, err := xl.GetObjectInfo(ctx, "bucket", "dir/obj", ObjectOptions{})
			if err != nil {
				t.Fatal("dir/obj not found before last test")
			}
			_, actualErr := xl.DeleteObject(ctx, test.bucket, test.object, ObjectOptions{})
			if test.expectedErr != nil && actualErr != test.expectedErr {
				t.Errorf("Expected to fail with %s, but failed with %s", test.expectedErr, actualErr)
@@ -462,7 +467,7 @@ func TestPutObjectNoQuorum(t *testing.T) {
	object := "object"
	opts := ObjectOptions{}
	// Create "object" under "bucket".
-	_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
	_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{'a'}, smallFileThreshold*2)), smallFileThreshold*2, "", ""), opts)
	if err != nil {
		t.Fatal(err)
	}
@@ -470,8 +475,8 @@ func TestPutObjectNoQuorum(t *testing.T) {
	// Make 9 disks offline, which leaves less than quorum number of disks
	// in a 16 disk Erasure setup. The original disks are 'replaced' with
	// naughtyDisks that fail after 'f' successful StorageAPI method
-	// invocations, where f - [0,3)
-	for f := 0; f < 3; f++ {
	// invocations, where f - [0,4)
	for f := 0; f < 4; f++ {
		diskErrors := make(map[int]error)
		for i := 0; i <= f; i++ {
			diskErrors[i] = nil
@@ -491,13 +496,78 @@ func TestPutObjectNoQuorum(t *testing.T) {
		}
		z.serverPools[0].erasureDisksMu.Unlock()
		// Upload new content to same object "object"
-		_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
		_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{byte(f)}, smallFileThreshold*2)), smallFileThreshold*2, "", ""), opts)
		if !errors.Is(err, errErasureWriteQuorum) {
			t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
		}
	}
}

func TestPutObjectNoQuorumSmall(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create an instance of xl backend.
	obj, fsDirs, err := prepareErasure16(ctx)
	if err != nil {
		t.Fatal(err)
	}

	// Cleanup backend directories.
	defer obj.Shutdown(context.Background())
	defer removeRoots(fsDirs)

	z := obj.(*erasureServerPools)
	xl := z.serverPools[0].sets[0]

	// Create "bucket"
	err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
	if err != nil {
		t.Fatal(err)
	}

	bucket := "bucket"
	object := "object"
	opts := ObjectOptions{}
	// Create "object" under "bucket".
	_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{'a'}, smallFileThreshold/2)), smallFileThreshold/2, "", ""), opts)
	if err != nil {
		t.Fatal(err)
	}

	// Make 9 disks offline, which leaves less than quorum number of disks
	// in a 16 disk Erasure setup. The original disks are 'replaced' with
	// naughtyDisks that fail after 'f' successful StorageAPI method
	// invocations, where f - [0,2)
	for f := 0; f < 2; f++ {
		t.Run("exec-"+strconv.Itoa(f), func(t *testing.T) {
			diskErrors := make(map[int]error)
			for i := 0; i <= f; i++ {
				diskErrors[i] = nil
			}
			erasureDisks := xl.getDisks()
			for i := range erasureDisks[:9] {
				switch diskType := erasureDisks[i].(type) {
				case *naughtyDisk:
					erasureDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk)
				default:
					erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
				}
			}
			z.serverPools[0].erasureDisksMu.Lock()
			xl.getDisks = func() []StorageAPI {
				return erasureDisks
			}
			z.serverPools[0].erasureDisksMu.Unlock()
			// Upload new content to same object "object"
			_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{byte(f)}, smallFileThreshold/2)), smallFileThreshold/2, "", ""), opts)
			if !errors.Is(err, errErasureWriteQuorum) {
				t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
			}
		})
	}
}

func TestObjectQuorumFromMeta(t *testing.T) {
	ExecObjectLayerTestWithDirs(t, testObjectQuorumFromMeta)
}
@@ -913,7 +913,7 @@ func makeFormatErasureMetaVolumes(disk StorageAPI) error {
		return errDiskNotFound
	}
	// Attempt to create MinIO internal buckets.
-	return disk.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, dataUsageBucket)
	return disk.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, minioMetaTmpDeletedBucket, dataUsageBucket, minioMetaTmpBucket+"-old")
}

// Initialize a new set of set formats which will be written to all disks.
@@ -128,7 +128,7 @@ func (e *metaCacheEntry) fileInfo(bucket string) (*FileInfo, error) {
		}, nil
	}
	if e.cached == nil {
-		fi, err := getFileInfo(e.metadata, bucket, e.name, "")
		fi, err := getFileInfo(e.metadata, bucket, e.name, "", false)
		if err != nil {
			return nil, err
		}
@@ -55,6 +55,7 @@ type WalkDirOptions struct {

// WalkDir will traverse a directory and return all entries found.
// On success a sorted meta cache stream will be returned.
// Metadata has data stripped, if any.
func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error {
	// Verify if volume is valid and it exists.
	volumeDir, err := s.getVolDir(opts.Bucket)
@@ -94,7 +95,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
		// behavior.
		out <- metaCacheEntry{
			name:     opts.BaseDir,
-			metadata: metadata,
			metadata: xlMetaV2TrimData(metadata),
		}
	} else {
		if st, err := os.Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() {
@@ -156,6 +157,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
				logger.LogIf(ctx, err)
				continue
			}
			meta.metadata = xlMetaV2TrimData(meta.metadata)
			meta.name = strings.TrimSuffix(entry, xlStorageFormatFile)
			meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
			meta.name = pathJoin(current, meta.name)
@@ -91,14 +91,16 @@ func testDeleteObject(obj ObjectLayer, instanceType string, t TestErrHandler) {

		for _, object := range testCase.objectToUploads {
			md5Bytes := md5.Sum([]byte(object.content))
-			_, err = obj.PutObject(context.Background(), testCase.bucketName, object.name, mustGetPutObjReader(t, strings.NewReader(object.content),
			oi, err := obj.PutObject(context.Background(), testCase.bucketName, object.name, mustGetPutObjReader(t, strings.NewReader(object.content),
				int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{})
			t.Log(oi)
			if err != nil {
				t.Fatalf("%s : %s", instanceType, err.Error())
			}
		}

-		_, _ = obj.DeleteObject(context.Background(), testCase.bucketName, testCase.pathToDelete, ObjectOptions{})
		oi, err := obj.DeleteObject(context.Background(), testCase.bucketName, testCase.pathToDelete, ObjectOptions{})
		t.Log(oi, err)

		result, err := obj.ListObjects(context.Background(), testCase.bucketName, "", "", "", 1000)
		if err != nil {
@@ -207,6 +207,9 @@ const SlashSeparator = "/"

// retainSlash - retains slash from a path.
func retainSlash(s string) string {
	if s == "" {
		return s
	}
	return strings.TrimSuffix(s, SlashSeparator) + SlashSeparator
}
@@ -54,6 +54,7 @@ import (
	"time"

	"github.com/fatih/color"

	"github.com/gorilla/mux"
	"github.com/minio/minio-go/v7/pkg/s3utils"
	"github.com/minio/minio-go/v7/pkg/signer"
@@ -233,13 +234,7 @@ func initFSObjects(disk string, t *testing.T) (obj ObjectLayer) {
// Using this interface, functionalities to be used in tests can be
// made generalized, and can be integrated in benchmarks/unit tests/go check suite tests.
type TestErrHandler interface {
-	Log(args ...interface{})
-	Logf(format string, args ...interface{})
-	Error(args ...interface{})
-	Errorf(format string, args ...interface{})
-	Failed() bool
-	Fatal(args ...interface{})
-	Fatalf(format string, args ...interface{})
	testing.TB
}

const (
@@ -73,13 +73,18 @@ func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersion
	}, nil
}

-func getFileInfo(xlMetaBuf []byte, volume, path, versionID string) (FileInfo, error) {
func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (FileInfo, error) {
	if isXL2V1Format(xlMetaBuf) {
		var xlMeta xlMetaV2
		if err := xlMeta.Load(xlMetaBuf); err != nil {
			return FileInfo{}, err
		}
-		return xlMeta.ToFileInfo(volume, path, versionID)
		fi, err := xlMeta.ToFileInfo(volume, path, versionID)
		if !data || err != nil {
			return fi, err
		}
		fi.Data = xlMeta.data.find(fi.DataDir)
		return fi, nil
	}

	xlMeta := &xlMetaV1Object{}
@@ -18,12 +18,15 @@ package cmd

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/tinylib/msgp/msgp"

	"github.com/google/uuid"
	xhttp "github.com/minio/minio/cmd/http"
	"github.com/minio/minio/cmd/logger"
@@ -33,28 +36,55 @@ var (
	// XL header specifies the format
	xlHeader = [4]byte{'X', 'L', '2', ' '}

-	// XLv2 version 1
-	xlVersionV1 = [4]byte{'1', ' ', ' ', ' '}
	// Current version being written.
	xlVersionCurrent [4]byte
)

-func checkXL2V1(buf []byte) error {
const (
	// Breaking changes.
	// Newer versions cannot be read by older software.
	// This will prevent downgrades to incompatible versions.
	xlVersionMajor = 1

	// Non breaking changes.
	// Bumping this is informational, but should be done
	// if any change is made to the data stored, bumping this
	// will allow to detect the exact version later.
	xlVersionMinor = 1
)

func init() {
	binary.LittleEndian.PutUint16(xlVersionCurrent[0:2], xlVersionMajor)
	binary.LittleEndian.PutUint16(xlVersionCurrent[2:4], xlVersionMinor)
}

// checkXL2V1 will check if the metadata has correct header and is a known major version.
// The remaining payload and versions are returned.
func checkXL2V1(buf []byte) (payload []byte, major, minor uint16, err error) {
	if len(buf) <= 8 {
-		return fmt.Errorf("xlMeta: no data")
		return payload, 0, 0, fmt.Errorf("xlMeta: no data")
	}

	if !bytes.Equal(buf[:4], xlHeader[:]) {
-		return fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], buf[:4])
		return payload, 0, 0, fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], buf[:4])
	}

-	if !bytes.Equal(buf[4:8], xlVersionV1[:]) {
-		return fmt.Errorf("xlMeta: unknown XLv2 version, expected %v, got %v", xlVersionV1[:4], buf[4:8])
	if bytes.Equal(buf[4:8], []byte("1   ")) {
		// Set as 1,0.
		major, minor = 1, 0
	} else {
		major, minor = binary.LittleEndian.Uint16(buf[4:6]), binary.LittleEndian.Uint16(buf[6:8])
	}
	if major > xlVersionMajor {
		return buf[8:], major, minor, fmt.Errorf("xlMeta: unknown major version %d found", major)
	}

-	return nil
	return buf[8:], major, minor, nil
}

func isXL2V1Format(buf []byte) bool {
-	return checkXL2V1(buf) == nil
	_, _, _, err := checkXL2V1(buf)
	return err == nil
}
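With this change the four bytes after the "XL2 " magic hold little-endian uint16 major and minor numbers, while legacy files that wrote the ASCII version "1   " are read back as (1, 0). A minimal decoding sketch restating checkXL2V1's header logic (illustrative only, not the shipped function):

package main

import (
	"encoding/binary"
	"fmt"
)

// decodeHeader splits the 8-byte xl.meta header into major/minor versions.
// Legacy files wrote the ASCII version "1   "; they map to (1, 0).
func decodeHeader(buf []byte) (major, minor uint16, ok bool) {
	if len(buf) < 8 || string(buf[:4]) != "XL2 " {
		return 0, 0, false
	}
	if string(buf[4:8]) == "1   " { // pre-inline-data format
		return 1, 0, true
	}
	return binary.LittleEndian.Uint16(buf[4:6]), binary.LittleEndian.Uint16(buf[6:8]), true
}

func main() {
	var hdr [8]byte
	copy(hdr[:4], "XL2 ")
	binary.LittleEndian.PutUint16(hdr[4:6], 1)
	binary.LittleEndian.PutUint16(hdr[6:8], 1)
	fmt.Println(decodeHeader(hdr[:])) // 1 1 true
}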
// The []journal contains all the different versions of the object.
@@ -199,6 +229,317 @@ func (j xlMetaV2Version) Valid() bool {
// the journals for the object.
type xlMetaV2 struct {
	Versions []xlMetaV2Version `json:"Versions" msg:"Versions"`

	// data will contain raw data if any.
	// data will be one or more versions indexed by storage dir.
	// To remove all data set to nil.
	data xlMetaInlineData `msg:"-"`
}

// xlMetaInlineData is serialized data in [string][]byte pairs.
//
//msgp:ignore xlMetaInlineData
type xlMetaInlineData []byte

// xlMetaInlineDataVer indicates the version of the inline data structure.
const xlMetaInlineDataVer = 1

// versionOK returns whether the version is ok.
func (x xlMetaInlineData) versionOK() bool {
	if len(x) == 0 {
		return true
	}
	return x[0] > 0 && x[0] <= xlMetaInlineDataVer
}

// afterVersion returns the payload after the version, if any.
func (x xlMetaInlineData) afterVersion() []byte {
	if len(x) == 0 {
		return x
	}
	return x[1:]
}

// find the data with the given key.
// Returns nil if not found or an error occurs.
func (x xlMetaInlineData) find(key string) []byte {
	if len(x) == 0 || !x.versionOK() {
		return nil
	}
	sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion())
	if err != nil || sz == 0 {
		return nil
	}
	for i := uint32(0); i < sz; i++ {
		var found []byte
		found, buf, err = msgp.ReadMapKeyZC(buf)
		if err != nil || sz == 0 {
			return nil
		}
		if string(found) == key {
			val, _, _ := msgp.ReadBytesZC(buf)
			return val
		}
		// Skip it
		_, buf, err = msgp.ReadBytesZC(buf)
		if err != nil {
			return nil
		}
	}
	return nil
}

// validate checks if the data is valid.
// It does not check integrity of the stored data.
func (x xlMetaInlineData) validate() error {
	if len(x) == 0 {
		return nil
	}
	if !x.versionOK() {
		return fmt.Errorf("xlMetaInlineData: unknown version 0x%x", x[0])
	}

	sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion())
	if err != nil {
		return err
	}
	for i := uint32(0); i < sz; i++ {
		var key []byte
		key, buf, err = msgp.ReadMapKeyZC(buf)
		if err != nil {
			return err
		}
		if len(key) == 0 {
			return fmt.Errorf("xlMetaInlineData: key %d is length 0", i)
		}
		_, buf, err = msgp.ReadBytesZC(buf)
		if err != nil {
			return err
		}
	}
	return nil
}

// list returns all keys of the stored data.
// It does not check integrity of the stored data.
func (x xlMetaInlineData) list() ([]string, error) {
	if len(x) == 0 {
		return nil, nil
	}
	if !x.versionOK() {
		return nil, errors.New("xlMetaInlineData: unknown version")
	}

	sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion())
	if err != nil {
		return nil, err
	}
	keys := make([]string, 0, sz)
	for i := uint32(0); i < sz; i++ {
		var key []byte
		key, buf, err = msgp.ReadMapKeyZC(buf)
		if err != nil {
			return keys, err
		}
		if len(key) == 0 {
			return keys, fmt.Errorf("xlMetaInlineData: key %d is length 0", i)
		}
		keys = append(keys, string(key))
		// Skip data...
		_, buf, err = msgp.ReadBytesZC(buf)
		if err != nil {
			return keys, err
		}
	}
	return keys, nil
}

func (x xlMetaInlineData) entries() int {
	if len(x) == 0 || !x.versionOK() {
		return 0
	}
	sz, _, _ := msgp.ReadMapHeaderBytes(x.afterVersion())
	return int(sz)
}

// replace will add or replace a key/value pair.
func (x *xlMetaInlineData) replace(key string, value []byte) {
	in := x.afterVersion()
	sz, buf, _ := msgp.ReadMapHeaderBytes(in)
	keys := make([][]byte, 0, sz+1)
	vals := make([][]byte, 0, sz+1)

	// Version plus header...
	plSize := 1 + msgp.MapHeaderSize
	replaced := false
	for i := uint32(0); i < sz; i++ {
		var found, foundVal []byte
		var err error
		found, buf, err = msgp.ReadMapKeyZC(buf)
		if err != nil {
			break
		}
		foundVal, buf, err = msgp.ReadBytesZC(buf)
		if err != nil {
			break
		}
		plSize += len(found) + msgp.StringPrefixSize + msgp.ArrayHeaderSize
		keys = append(keys, found)
		if string(found) == key {
			vals = append(vals, value)
			plSize += len(value)
			replaced = true
		} else {
			vals = append(vals, foundVal)
			plSize += len(foundVal)
		}
	}
	// Add one more.
	if !replaced {
		keys = append(keys, []byte(key))
		vals = append(vals, value)
		plSize += len(key) + len(value) + msgp.StringPrefixSize + msgp.ArrayHeaderSize
	}

	// Reserialize...
	payload := make([]byte, 1, plSize)
	payload[0] = xlMetaInlineDataVer
	payload = msgp.AppendMapHeader(payload, uint32(len(keys)))
	for i := range keys {
		payload = msgp.AppendStringFromBytes(payload, keys[i])
		payload = msgp.AppendBytes(payload, vals[i])
	}
	*x = payload
	if err := x.validate(); err != nil {
		panic(err)
	}
}

// rename will rename a key.
// Returns whether the key was found.
func (x *xlMetaInlineData) rename(oldKey, newKey string) bool {
	in := x.afterVersion()
	sz, buf, _ := msgp.ReadMapHeaderBytes(in)
	keys := make([][]byte, 0, sz)
	vals := make([][]byte, 0, sz)

	// Version plus header...
	plSize := 1 + msgp.MapHeaderSize
	found := false
	for i := uint32(0); i < sz; i++ {
		var foundKey, foundVal []byte
		var err error
		foundKey, buf, err = msgp.ReadMapKeyZC(buf)
		if err != nil {
			break
		}
		foundVal, buf, err = msgp.ReadBytesZC(buf)
		if err != nil {
			break
		}
		plSize += len(foundVal) + msgp.StringPrefixSize + msgp.ArrayHeaderSize
		vals = append(vals, foundVal)
		if string(foundKey) != oldKey {
			keys = append(keys, foundKey)
			plSize += len(foundKey)
		} else {
			keys = append(keys, []byte(newKey))
			plSize += len(newKey)
			found = true
		}
	}
	// If not found, just return.
	if !found {
		return false
	}

	// Reserialize...
	payload := make([]byte, 1, plSize)
	payload[0] = xlMetaInlineDataVer
	payload = msgp.AppendMapHeader(payload, uint32(len(keys)))
	for i := range keys {
		payload = msgp.AppendStringFromBytes(payload, keys[i])
		payload = msgp.AppendBytes(payload, vals[i])
	}
	*x = payload
	return true
}

// remove will remove a key.
// Returns whether the key was found.
func (x *xlMetaInlineData) remove(key string) bool {
	in := x.afterVersion()
	sz, buf, _ := msgp.ReadMapHeaderBytes(in)
	keys := make([][]byte, 0, sz)
	vals := make([][]byte, 0, sz)

	// Version plus header...
	plSize := 1 + msgp.MapHeaderSize
	found := false
	for i := uint32(0); i < sz; i++ {
		var foundKey, foundVal []byte
		var err error
		foundKey, buf, err = msgp.ReadMapKeyZC(buf)
		if err != nil {
			break
		}
		foundVal, buf, err = msgp.ReadBytesZC(buf)
		if err != nil {
			break
		}
		if string(foundKey) != key {
			plSize += msgp.StringPrefixSize + msgp.ArrayHeaderSize + len(foundKey) + len(foundVal)
			keys = append(keys, foundKey)
			vals = append(vals, foundVal)
		} else {
			found = true
		}
	}
	// If not found, just return.
	if !found {
		return false
	}
	// If none left...
	if len(keys) == 0 {
		*x = nil
		return true
	}

	// Reserialize...
	payload := make([]byte, 1, plSize)
	payload[0] = xlMetaInlineDataVer
	payload = msgp.AppendMapHeader(payload, uint32(len(keys)))
	for i := range keys {
		payload = msgp.AppendStringFromBytes(payload, keys[i])
		payload = msgp.AppendBytes(payload, vals[i])
	}
	*x = payload
	return true
}
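All of these helpers treat the inline blob as one version byte followed by a flat msgp map of string keys (data-dir UUIDs) to raw byte values, and they rewrite the whole payload on every mutation. A small, self-contained sketch of building and walking that wire format with the same msgp primitives (the key and value below are hypothetical, not MinIO data):

package main

import (
	"fmt"

	"github.com/tinylib/msgp/msgp"
)

func main() {
	payload := []byte{1} // xlMetaInlineDataVer
	payload = msgp.AppendMapHeader(payload, 1)
	payload = msgp.AppendString(payload, "bffea160-ca7f-465f-98bc-9b4f1c3ba1ef")
	payload = msgp.AppendBytes(payload, []byte("shard bytes"))

	// Reading mirrors find(): skip the version byte, walk map entries.
	// Errors are ignored here only to keep the sketch short.
	sz, buf, _ := msgp.ReadMapHeaderBytes(payload[1:])
	for i := uint32(0); i < sz; i++ {
		var key, val []byte
		key, buf, _ = msgp.ReadMapKeyZC(buf)
		val, buf, _ = msgp.ReadBytesZC(buf)
		fmt.Printf("%s => %q\n", key, val)
	}
}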
// xlMetaV2TrimData will trim any data from the metadata without unmarshalling it.
// If any error occurs the unmodified data is returned.
func xlMetaV2TrimData(buf []byte) []byte {
	metaBuf, maj, min, err := checkXL2V1(buf)
	if err != nil {
		return buf
	}
	if maj == 1 && min < 1 {
		// First version to carry data.
		return buf
	}
	// Skip header
	_, metaBuf, err = msgp.ReadBytesZC(metaBuf)
	if err != nil {
		logger.LogIf(GlobalContext, err)
		return buf
	}
	// = input - current pos
	ends := len(buf) - len(metaBuf)
	if ends > len(buf) {
		return buf
	}
	return buf[:ends]
}

// AddLegacy adds a legacy version, is only called when no prior
@@ -219,12 +560,69 @@ func (z *xlMetaV2) AddLegacy(m *xlMetaV1Object) error {
}

// Load unmarshal and load the entire message pack.
// Note that references to the incoming buffer may be kept as data.
func (z *xlMetaV2) Load(buf []byte) error {
-	if err := checkXL2V1(buf); err != nil {
-		return err
	buf, _, minor, err := checkXL2V1(buf)
	if err != nil {
		return errFileCorrupt
	}
-	_, err := z.UnmarshalMsg(buf[8:])
-	return err
	switch minor {
	case 0:
		_, err = z.UnmarshalMsg(buf)
		if err != nil {
			return errFileCorrupt
		}
		return nil
	case 1:
		v, buf, err := msgp.ReadBytesZC(buf)
		if err != nil {
			return errFileCorrupt
		}
		_, err = z.UnmarshalMsg(v)
		if err != nil {
			return errFileCorrupt
		}
		// Add remaining data.
		z.data = nil
		if len(buf) > 0 {
			z.data = buf
			if err := z.data.validate(); err != nil {
				return errFileCorrupt
			}
		}
	default:
		return errors.New("unknown metadata version")
	}
	return nil
}

// AppendTo will marshal the data in z and append it to the provided slice.
func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) {
	sz := len(xlHeader) + len(xlVersionCurrent) + msgp.ArrayHeaderSize + z.Msgsize() + len(z.data) + len(dst)
	if cap(dst) < sz {
		buf := make([]byte, len(dst), sz)
		copy(buf, dst)
		dst = buf
	}
	if err := z.data.validate(); err != nil {
		return nil, err
	}

	dst = append(dst, xlHeader[:]...)
	dst = append(dst, xlVersionCurrent[:]...)
	// Add "bin 32" type header to always have enough space.
	// We will fill out the correct size when we know it.
	dst = append(dst, 0xc6, 0, 0, 0, 0)
	dataOffset := len(dst)
	dst, err := z.MarshalMsg(dst)
	if err != nil {
		return nil, err
	}

	// Update size...
	binary.BigEndian.PutUint32(dst[dataOffset-4:dataOffset], uint32(len(dst)-dataOffset))

	return append(dst, z.data...), nil
}
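AppendTo therefore produces a fixed three-part layout: 4-byte magic, 4-byte little-endian version, then the msgp metadata wrapped in a bin-32 envelope (0xc6 plus a big-endian length), with any inline data appended raw at the end. That envelope is what lets xlMetaV2TrimData chop the data off without a full unmarshal. A minimal sketch splitting a buffer along those boundaries (illustrative only, not the shipped parser):

package main

import (
	"encoding/binary"
	"fmt"
)

// split divides a v1.1 xl.meta buffer into its metadata and inline-data
// regions, following the layout AppendTo writes.
func split(buf []byte) (meta, inlineData []byte, err error) {
	if len(buf) < 13 || string(buf[:4]) != "XL2 " {
		return nil, nil, fmt.Errorf("bad header")
	}
	if buf[8] != 0xc6 { // msgp "bin 32" marker wrapping the metadata
		return nil, nil, fmt.Errorf("unexpected type byte 0x%x", buf[8])
	}
	n := binary.BigEndian.Uint32(buf[9:13])
	if uint64(13)+uint64(n) > uint64(len(buf)) {
		return nil, nil, fmt.Errorf("truncated metadata")
	}
	return buf[13 : 13+n], buf[13+n:], nil
}

func main() {
	// 4-byte magic, 4-byte LE version (1,1), bin32-wrapped 2-byte metadata, 3 data bytes.
	buf := append([]byte("XL2 \x01\x00\x01\x00"), 0xc6, 0, 0, 0, 2, 0xAA, 0xBB, 1, 2, 3)
	meta, data, err := split(buf)
	fmt.Println(len(meta), len(data), err) // 2 3 <nil>
}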
||||
// AddVersion adds a new version
|
||||
@ -304,6 +702,10 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
|
||||
ventry.ObjectV2.MetaUser[k] = v
|
||||
}
|
||||
}
|
||||
// If asked to save data.
|
||||
if len(fi.Data) > 0 || fi.Size == 0 {
|
||||
z.data.replace(dd.String(), fi.Data)
|
||||
}
|
||||
}
|
||||
|
||||
if !ventry.Valid() {
|
||||
@ -324,7 +726,7 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
|
||||
return nil
|
||||
}
|
||||
case ObjectType:
|
||||
if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) {
|
||||
if version.ObjectV2.VersionID == uv {
|
||||
z.Versions[i] = ventry
|
||||
return nil
|
||||
}
|
||||
@ -332,7 +734,7 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
|
||||
// Allowing delete marker to replaced with an proper
|
||||
// object data type as well, this is not S3 complaint
|
||||
// behavior but kept here for future flexibility.
|
||||
if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) {
|
||||
if version.DeleteMarker.VersionID == uv {
|
||||
z.Versions[i] = ventry
|
||||
return nil
|
||||
}
|
||||
@ -352,7 +754,7 @@ func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error)
|
||||
versionID := ""
|
||||
var uv uuid.UUID
|
||||
// check if the version is not "null"
|
||||
if !bytes.Equal(j.VersionID[:], uv[:]) {
|
||||
if j.VersionID != uv {
|
||||
versionID = uuid.UUID(j.VersionID).String()
|
||||
}
|
||||
fi := FileInfo{
|
||||
@ -516,7 +918,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
|
||||
return version.ObjectV1.DataDir, len(z.Versions) == 0, nil
|
||||
}
|
||||
case DeleteType:
|
||||
if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) {
|
||||
if version.DeleteMarker.VersionID == uv {
|
||||
if updateVersion {
|
||||
if len(z.Versions[i].DeleteMarker.MetaSys) == 0 {
|
||||
z.Versions[i].DeleteMarker.MetaSys = make(map[string][]byte)
|
||||
@ -538,7 +940,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
|
||||
return "", len(z.Versions) == 0, nil
|
||||
}
|
||||
case ObjectType:
|
||||
if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) && updateVersion {
|
||||
if version.ObjectV2.VersionID == uv && updateVersion {
|
||||
z.Versions[i].ObjectV2.MetaSys[VersionPurgeStatusKey] = []byte(fi.VersionPurgeStatus)
|
||||
return "", len(z.Versions) == 0, nil
|
||||
}
|
||||
@ -550,7 +952,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
|
||||
for _, version := range versions {
|
||||
switch version.Type {
|
||||
case ObjectType:
|
||||
if bytes.Equal(version.ObjectV2.DataDir[:], dataDir[:]) {
|
||||
if version.ObjectV2.DataDir == dataDir {
|
||||
sameDataDirCount++
|
||||
}
|
||||
}
|
||||
@ -564,7 +966,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
|
||||
}
|
||||
switch version.Type {
|
||||
case ObjectType:
|
||||
if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) {
|
||||
if version.ObjectV2.VersionID == uv {
|
||||
if fi.TransitionStatus != "" {
|
||||
z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+"transition-status"] = []byte(fi.TransitionStatus)
|
||||
return uuid.UUID(version.ObjectV2.DataDir).String(), len(z.Versions) == 0, nil
|
||||
|
cmd/xl-storage-format-v2_test.go (new file, 144 lines)
@@ -0,0 +1,144 @@
/*
 * MinIO Cloud Storage, (C) 2021 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"bytes"
	"testing"
	"time"
)

func TestXLV2FormatData(t *testing.T) {
	failOnErr := func(err error) {
		t.Helper()
		if err != nil {
			t.Fatal(err)
		}
	}
	data := []byte("some object data")
	data2 := []byte("some other object data")

	xl := xlMetaV2{}
	fi := FileInfo{
		Volume:           "volume",
		Name:             "object-name",
		VersionID:        "756100c6-b393-4981-928a-d49bbc164741",
		IsLatest:         true,
		Deleted:          false,
		TransitionStatus: "",
		DataDir:          "bffea160-ca7f-465f-98bc-9b4f1c3ba1ef",
		XLV1:             false,
		ModTime:          time.Now(),
		Size:             0,
		Mode:             0,
		Metadata:         nil,
		Parts:            nil,
		Erasure: ErasureInfo{
			Algorithm:    ReedSolomon.String(),
			DataBlocks:   4,
			ParityBlocks: 2,
			BlockSize:    10000,
			Index:        1,
			Distribution: []int{1, 2, 3, 4, 5, 6, 7, 8},
			Checksums: []ChecksumInfo{{
				PartNumber: 1,
				Algorithm:  HighwayHash256S,
				Hash:       nil,
			}},
		},
		MarkDeleted:                   false,
		DeleteMarkerReplicationStatus: "",
		VersionPurgeStatus:            "",
		Data:                          data,
		NumVersions:                   1,
		SuccessorModTime:              time.Time{},
	}

	failOnErr(xl.AddVersion(fi))

	fi.VersionID = mustGetUUID()
	fi.DataDir = mustGetUUID()
	fi.Data = data2
	failOnErr(xl.AddVersion(fi))

	serialized, err := xl.AppendTo(nil)
	failOnErr(err)
	// Roundtrip data
	var xl2 xlMetaV2
	failOnErr(xl2.Load(serialized))

	// We should have two data entries.
	list, err := xl2.data.list()
	failOnErr(err)
	if len(list) != 2 {
		t.Fatalf("want 2 entries, got %d", len(list))
	}

	if !bytes.Equal(xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"), data) {
		t.Fatal("Find data returned", xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"))
	}
	if !bytes.Equal(xl2.data.find(fi.DataDir), data2) {
		t.Fatal("Find data returned", xl2.data.find(fi.DataDir))
	}

	// Remove entry
	xl2.data.remove(fi.DataDir)
	failOnErr(xl2.data.validate())
	if xl2.data.find(fi.DataDir) != nil {
		t.Fatal("Data was not removed:", xl2.data.find(fi.DataDir))
	}
	if xl2.data.entries() != 1 {
		t.Fatal("want 1 entry, got", xl2.data.entries())
	}
	// Re-add
	xl2.data.replace(fi.DataDir, fi.Data)
	failOnErr(xl2.data.validate())
	if xl2.data.entries() != 2 {
		t.Fatal("want 2 entries, got", xl2.data.entries())
	}

	// Replace entry
	xl2.data.replace("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef", data2)
	failOnErr(xl2.data.validate())
	if xl2.data.entries() != 2 {
		t.Fatal("want 2 entries, got", xl2.data.entries())
	}
	if !bytes.Equal(xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"), data2) {
		t.Fatal("Find data returned", xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"))
	}

	if !xl2.data.rename("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef", "new-key") {
		t.Fatal("old key was not found")
	}
	failOnErr(xl2.data.validate())
	if !bytes.Equal(xl2.data.find("new-key"), data2) {
		t.Fatal("Find data returned", xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"))
	}
	if xl2.data.entries() != 2 {
		t.Fatal("want 2 entries, got", xl2.data.entries())
	}
	if !bytes.Equal(xl2.data.find(fi.DataDir), data2) {
		t.Fatal("Find data returned", xl2.data.find(fi.DataDir))
	}

	// Test trimmed
	xl2 = xlMetaV2{}
	failOnErr(xl2.Load(xlMetaV2TrimData(serialized)))
	if len(xl2.data) != 0 {
		t.Fatal("data, was not trimmed, bytes left:", len(xl2.data))
	}
}
@ -843,15 +843,14 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
|
||||
return err
|
||||
}
|
||||
|
||||
buf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// transitioned objects maintains metadata on the source cluster. When transition
|
||||
// status is set, update the metadata to disk.
|
||||
if !lastVersion || fi.TransitionStatus != "" {
|
||||
// when data-dir is specified. Transition leverages existing DeleteObject
|
||||
// api call to mark object as deleted. When object is pending transition,
|
||||
// just update the metadata and avoid deleting data dir.
|
||||
if dataDir != "" && fi.TransitionStatus != lifecycle.TransitionPending {
|
||||
xlMeta.data.remove(dataDir)
|
||||
filePath := pathJoin(volumeDir, path, dataDir)
|
||||
if err = checkPathLength(filePath); err != nil {
|
||||
return err
|
||||
@ -859,24 +858,34 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F

tmpuuid := mustGetUUID()
if err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid)); err != nil {
if err != errFileNotFound {
return err
}
}
}

buf, err = xlMeta.AppendTo(nil)
if err != nil {
return err
}

// Transitioned objects maintain metadata on the source cluster. When transition
// status is set, update the metadata on disk.
if !lastVersion || fi.TransitionStatus != "" {
return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)
}

// Delete the meta file, if there are no more versions the
// top level parent is automatically removed.
filePath := pathJoin(volumeDir, path, xlStorageFormatFile)
// Move everything to trash.
filePath := retainSlash(pathJoin(volumeDir, path))
if err = checkPathLength(filePath); err != nil {
return err
}
err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID()))

return s.deleteFile(volumeDir, filePath, false)
// Delete parents if needed.
filePath = retainSlash(pathutil.Dir(pathJoin(volumeDir, path)))
if filePath == retainSlash(volumeDir) {
return err
}
s.deleteFile(volumeDir, filePath, false)
return err
}
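Note that the delete path above never unlinks files in place: it renames them into a per-disk tmp-deleted area under a fresh UUID, so the visible part of the delete is a single atomic rename and the bytes can be reaped later. A minimal, hedged sketch of that pattern using only the standard library (trashDir stands in for s.diskPath/minioMetaTmpDeletedBucket, and the helper name is illustrative, not MinIO's):

import (
	"crypto/rand"
	"encoding/hex"
	"os"
	"path/filepath"
)

// moveToTrash renames path into trashDir under a fresh random name, so the
// caller observes an atomic disappearance and the payload is reaped later.
func moveToTrash(trashDir, path string) error {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return err
	}
	return os.Rename(path, filepath.Join(trashDir, hex.EncodeToString(b[:])))
}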

// WriteMetadata - writes FileInfo metadata for path at `xl.meta`
@ -890,26 +899,36 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi F
if !isXL2V1Format(buf) {
xlMeta, err = newXLMetaV2(fi)
if err != nil {
logger.LogIf(ctx, err)
return err
}
buf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...))
buf, err = xlMeta.AppendTo(nil)
if err != nil {
logger.LogIf(ctx, err)
return err
}
if err := xlMeta.Load(buf); err != nil {
panic(err)
}
} else {
if err = xlMeta.Load(buf); err != nil {
logger.LogIf(ctx, err)
return err
}
if err = xlMeta.AddVersion(fi); err != nil {
logger.LogIf(ctx, err)
return err
}
buf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...))
buf, err = xlMeta.AppendTo(nil)
if err != nil {
logger.LogIf(ctx, err)
return err
}

}

return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)

}

func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) {
@ -1005,16 +1024,20 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
return fi, errFileNotFound
}

fi, err = getFileInfo(buf, volume, path, versionID)
fi, err = getFileInfo(buf, volume, path, versionID, readData)
if err != nil {
return fi, err
}

if readData {
if len(fi.Data) > 0 || fi.Size == 0 {
return fi, nil
}
// Read data for small objects only when:
// - the object has not yet transitioned
// - the object size is at most 32KiB (smallFileThreshold)
// - the object has at most 1 part

if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 {
// Optionally enable O_DIRECT, only if the drive supports it.
requireDirectIO := globalStorageClass.GetDMA() == storageclass.DMAReadWrite
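The four bullets in the comment above map one-to-one onto the guard in the code. A hedged restatement as a standalone predicate (FileInfo fields and smallFileThreshold are taken from this diff; the helper name is illustrative):

// shouldReadInline mirrors the guard above: only small, untransitioned,
// single-part objects with a data dir qualify for an inline read.
func shouldReadInline(fi FileInfo) bool {
	return fi.TransitionStatus == "" &&
		fi.DataDir != "" &&
		fi.Size <= smallFileThreshold && // 32KiB at the time of this commit
		len(fi.Parts) == 1
}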
@ -1801,8 +1824,9 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
return osErrToFileErr(err)
}

fi, err := getFileInfo(srcBuf, dstVolume, dstPath, "")
fi, err := getFileInfo(srcBuf, dstVolume, dstPath, "", true)
if err != nil {
logger.LogIf(ctx, err)
return err
}

@ -1955,30 +1979,37 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
return err
}

dstBuf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...))
dstBuf, err = xlMeta.AppendTo(nil)
if err != nil {
logger.LogIf(ctx, err)
return errFileCorrupt
}

// Commit data, if any
if srcDataPath != "" {
if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil {
return err
}

// Commit data
if srcDataPath != "" {
tmpuuid := mustGetUUID()
renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
tmpuuid = mustGetUUID()
renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
if err = renameAll(srcDataPath, dstDataPath); err != nil {
logger.LogIf(ctx, err)
return osErrToFileErr(err)
}
}

// Commit meta-file
if err = renameAll(srcFilePath, dstFilePath); err != nil {
return osErrToFileErr(err)
}
} else {
// Write meta-file directly, no data
if err = s.WriteAll(ctx, dstVolume, pathJoin(dstPath, xlStorageFormatFile), dstBuf); err != nil {
return err
}
}

// Remove parent dir of the source file if empty
parentDir := pathutil.Dir(srcFilePath)
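The ordering in the commit phase above is deliberate: retire the previous destination data dirs into the tmp-deleted area, move the new data into place, and only then rename xl.meta, so a reader never observes metadata that points at missing data. A hedged stdlib sketch of that sequence, reusing the moveToTrash helper from the sketch earlier (all parameter names are illustrative, not MinIO's):

// commitRename applies the data-then-metadata ordering described above.
func commitRename(srcData, dstData, oldDstData, srcMeta, dstMeta, trashDir string) error {
	_ = moveToTrash(trashDir, oldDstData) // best effort, may not exist
	_ = moveToTrash(trashDir, dstData)    // best effort, may not exist
	if err := os.Rename(srcData, dstData); err != nil {
		return err
	}
	return os.Rename(srcMeta, dstMeta) // metadata becomes visible last
}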
@ -2074,63 +2105,13 @@ func (s *xlStorage) bitrotVerify(partPath string, partSize int64, algo BitrotAlg

// Close the file descriptor.
defer file.Close()

if algo != HighwayHash256S {
h := algo.New()
if _, err = io.Copy(h, file); err != nil {
// Premature failure in reading the object, file is corrupt.
return errFileCorrupt
}
if !bytes.Equal(h.Sum(nil), sum) {
return errFileCorrupt
}
return nil
}

buf := make([]byte, shardSize)
h := algo.New()
hashBuf := make([]byte, h.Size())
fi, err := file.Stat()
if err != nil {
// Unable to stat the file, return an expected error
// for healing code to fix this file.
return err
}

size := fi.Size()

// Calculate the size of the bitrot file and compare
// it with the actual file size.
if size != bitrotShardFileSize(partSize, shardSize, algo) {
return errFileCorrupt
}

var n int
for {
if size == 0 {
return nil
}
h.Reset()
n, err = file.Read(hashBuf)
if err != nil {
// Reads failed for an object with the right size, file is corrupt.
return err
}
size -= int64(n)
if size < int64(len(buf)) {
buf = buf[:size]
}
n, err = file.Read(buf)
if err != nil {
// Reads failed for an object with the right size, at different offsets.
return err
}
size -= int64(n)
h.Write(buf)
if !bytes.Equal(h.Sum(nil), hashBuf) {
return errFileCorrupt
}
}
return bitrotVerify(file, fi.Size(), partSize, algo, sum, shardSize)
}
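The refactor above moves the shard-walking loop into a shared bitrotVerify helper; the size precheck it performs is worth spelling out. A streaming bitrot file stores, per shard, h.Size() hash bytes followed by the shard payload, so the expected on-disk size is ceil(partSize/shardSize) hashes plus the payload. A hedged sketch of that arithmetic (bitrotShardFileSize is the real helper; this standalone version is illustrative):

// expectedBitrotFileSize returns the per-shard hash overhead plus payload:
// ceil(partSize/shardSize)*hashSize + partSize.
func expectedBitrotFileSize(partSize, shardSize, hashSize int64) int64 {
	shards := (partSize + shardSize - 1) / shardSize
	return shards*hashSize + partSize
}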

func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) (err error) {

@ -18,9 +18,12 @@ package main

import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"

@ -28,8 +31,6 @@ import (
"github.com/tinylib/msgp/msgp"
)

var xlHeader = [4]byte{'X', 'L', '2', ' '}

func main() {
app := cli.NewApp()
app.Copyright = "MinIO, Inc."
@ -53,6 +54,10 @@ GLOBAL FLAGS:
|
||||
Usage: "Print each file as a separate line without formatting",
|
||||
Name: "ndjson",
|
||||
},
|
||||
cli.BoolFlag{
|
||||
Usage: "Display inline data keys and sizes",
|
||||
Name: "data",
|
||||
},
|
||||
}
|
||||
|
||||
app.Action = func(c *cli.Context) error {
|
||||
@ -75,39 +80,57 @@ GLOBAL FLAGS:
r = f
}

// Read header
var tmp [4]byte
_, err := io.ReadFull(r, tmp[:])
b, err := ioutil.ReadAll(r)
if err != nil {
return err
}
if !bytes.Equal(tmp[:], xlHeader[:]) {
return fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], tmp[:4])
}
// Skip version check for now
_, err = io.ReadFull(r, tmp[:])
b, _, minor, err := checkXL2V1(b)
if err != nil {
return err
}

var buf bytes.Buffer
_, err = msgp.CopyToJSON(&buf, r)
buf := bytes.NewBuffer(nil)
var data xlMetaInlineData
switch minor {
case 0:
_, err = msgp.CopyToJSON(buf, bytes.NewBuffer(b))
if err != nil {
return err
}
case 1:
v, b, err := msgp.ReadBytesZC(b)
if err != nil {
return err
}
_, err = msgp.CopyToJSON(buf, bytes.NewBuffer(v))
if err != nil {
return err
}
data = b
default:
return errors.New("unknown metadata version")
}

if c.Bool("data") {
b, err := data.json()
if err != nil {
return err
}
buf = bytes.NewBuffer(b)
}
if c.Bool("ndjson") {
fmt.Println(buf.String())
continue
}
var msi map[string]interface{}
dec := json.NewDecoder(&buf)
dec := json.NewDecoder(buf)
// Use Number to preserve integers.
dec.UseNumber()
err = dec.Decode(&msi)
if err != nil {
return err
}
b, err := json.MarshalIndent(msi, "", " ")
b, err = json.MarshalIndent(msi, "", " ")
if err != nil {
return err
}
@ -120,3 +143,111 @@ GLOBAL FLAGS:
log.Fatal(err)
}
}

var (
// XL header specifies the format
xlHeader = [4]byte{'X', 'L', '2', ' '}

// Current version being written.
xlVersionCurrent [4]byte
)

const (
// Breaking changes.
// Newer versions cannot be read by older software.
// This will prevent downgrades to incompatible versions.
xlVersionMajor = 1

// Non-breaking changes.
// Bumping this is informational, but should be done
// whenever any change is made to the stored data;
// it allows detecting the exact version later.
xlVersionMinor = 1
)

func init() {
binary.LittleEndian.PutUint16(xlVersionCurrent[0:2], xlVersionMajor)
binary.LittleEndian.PutUint16(xlVersionCurrent[2:4], xlVersionMinor)
}
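Concretely, the serialized prefix produced from these values is the four ASCII bytes "XL2 " followed by little-endian uint16 major and minor. A small example of inspecting the 8-byte prefix for version 1.1, derived from the declarations above:

// Compose and print the 8-byte xl.meta prefix.
hdr := append(append([]byte{}, xlHeader[:]...), xlVersionCurrent[:]...)
fmt.Printf("% x\n", hdr) // prints: 58 4c 32 20 01 00 01 00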

// checkXL2V1 will check if the metadata has the correct header and is a known major version.
// The remaining payload and versions are returned.
func checkXL2V1(buf []byte) (payload []byte, major, minor uint16, err error) {
if len(buf) <= 8 {
return payload, 0, 0, fmt.Errorf("xlMeta: no data")
}

if !bytes.Equal(buf[:4], xlHeader[:]) {
return payload, 0, 0, fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], buf[:4])
}

if bytes.Equal(buf[4:8], []byte("1   ")) {
// Set as 1,0.
major, minor = 1, 0
} else {
major, minor = binary.LittleEndian.Uint16(buf[4:6]), binary.LittleEndian.Uint16(buf[6:8])
}
if major > xlVersionMajor {
return buf[8:], major, minor, fmt.Errorf("xlMeta: unknown major version %d found", major)
}

return buf[8:], major, minor, nil
}
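A quick usage sketch for checkXL2V1, mirroring how main wires it up above (serializedXLMeta is a hypothetical input buffer; the error handling is illustrative):

payload, major, minor, err := checkXL2V1(serializedXLMeta)
if err != nil {
	log.Fatalln(err)
}
fmt.Printf("xl.meta v%d.%d, %d payload bytes\n", major, minor, len(payload))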

const xlMetaInlineDataVer = 1

type xlMetaInlineData []byte

// afterVersion returns the payload after the version, if any.
func (x xlMetaInlineData) afterVersion() []byte {
if len(x) == 0 {
return x
}
return x[1:]
}

// versionOK returns whether the version is ok.
func (x xlMetaInlineData) versionOK() bool {
if len(x) == 0 {
return true
}
return x[0] > 0 && x[0] <= xlMetaInlineDataVer
}

func (x xlMetaInlineData) json() ([]byte, error) {
if len(x) == 0 {
return []byte("{}"), nil
}
if !x.versionOK() {
return nil, errors.New("xlMetaInlineData: unknown version")
}

sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion())
if err != nil {
return nil, err
}
res := []byte("{")

for i := uint32(0); i < sz; i++ {
var key, val []byte
key, buf, err = msgp.ReadMapKeyZC(buf)
if err != nil {
return nil, err
}
if len(key) == 0 {
return nil, fmt.Errorf("xlMetaInlineData: key %d is length 0", i)
}
// Skip over the data, recording only its length...
val, buf, err = msgp.ReadBytesZC(buf)
if err != nil {
return nil, err
}
if i > 0 {
res = append(res, ',')
}
s := fmt.Sprintf(`"%s":%d`, string(key), len(val))
res = append(res, []byte(s)...)
}
res = append(res, '}')
return res, nil
}
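To see json() in action, one can hand-build a valid inline-data section: a version byte of 1 followed by a msgp map of version-ID keys to raw byte values. A hedged example using the same msgp helpers the code above reads with (the key and the 128-byte payload are arbitrary):

buf := []byte{1} // xlMetaInlineDataVer
buf = msgp.AppendMapHeader(buf, 1)
buf = msgp.AppendString(buf, "bffea160-ca7f-465f-98bc-9b4f1c3ba1ef")
buf = msgp.AppendBytes(buf, make([]byte, 128))
js, err := xlMetaInlineData(buf).json()
if err != nil {
	log.Fatalln(err)
}
fmt.Println(string(js)) // {"bffea160-ca7f-465f-98bc-9b4f1c3ba1ef":128}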