diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index c7e503cd9..86061d405 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -25,6 +25,7 @@ import ( "io" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/ioutil" ) type errHashMismatch struct { @@ -37,10 +38,11 @@ func (err *errHashMismatch) Error() string { // Calculates bitrot in chunks and writes the hash into the stream. type streamingBitrotWriter struct { - iow io.WriteCloser - h hash.Hash - shardSize int64 - canClose chan struct{} // Needed to avoid race explained in Close() call. + iow io.WriteCloser + closeWithErr func(err error) error + h hash.Hash + shardSize int64 + canClose chan struct{} // Needed to avoid race explained in Close() call. } func (b *streamingBitrotWriter) Write(p []byte) (int, error) { @@ -66,16 +68,24 @@ func (b *streamingBitrotWriter) Close() error { // 2) pipe.Close() // Now pipe.Close() can return before the data is read on the other end of the pipe and written to the disk // Hence an immediate Read() on the file can return incorrect data. - <-b.canClose + if b.canClose != nil { + <-b.canClose + } return err } +// Returns streaming bitrot writer implementation. +func newStreamingBitrotWriterBuffer(w io.Writer, algo BitrotAlgorithm, shardSize int64) io.WriteCloser { + return &streamingBitrotWriter{iow: ioutil.NopCloser(w), h: algo.New(), shardSize: shardSize, canClose: nil} +} + // Returns streaming bitrot writer implementation. func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64, heal bool) io.Writer { r, w := io.Pipe() h := algo.New() - bw := &streamingBitrotWriter{w, h, shardSize, make(chan struct{})} + bw := &streamingBitrotWriter{iow: w, closeWithErr: w.CloseWithError, h: h, shardSize: shardSize, canClose: make(chan struct{})} + go func() { totalFileSize := int64(-1) // For compressed objects length will be unknown (represented by length=-1) if length != -1 { @@ -123,7 +133,7 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { // For the first ReadAt() call we need to open the stream for reading. b.currOffset = offset streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset - if len(b.data) == 0 { + if len(b.data) == 0 && b.tillOffset != streamOffset { b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) } else { b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset) @@ -161,15 +171,13 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { func newStreamingBitrotReader(disk StorageAPI, data []byte, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader { h := algo.New() return &streamingBitrotReader{ - disk, - data, - nil, - volume, - filePath, - ceilFrac(tillOffset, shardSize)*int64(h.Size()) + tillOffset, - 0, - h, - shardSize, - make([]byte, h.Size()), + disk: disk, + data: data, + volume: volume, + filePath: filePath, + tillOffset: ceilFrac(tillOffset, shardSize)*int64(h.Size()) + tillOffset, + h: h, + shardSize: shardSize, + hashBytes: make([]byte, h.Size()), } } diff --git a/cmd/bitrot.go b/cmd/bitrot.go index e29e478bd..899b8b56a 100644 --- a/cmd/bitrot.go +++ b/cmd/bitrot.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "crypto/sha256" "errors" "hash" @@ -143,3 +144,54 @@ func bitrotShardFileSize(size int64, shardSize int64, algo BitrotAlgorithm) int6 } return ceilFrac(size, shardSize)*int64(algo.New().Size()) + size } + +// bitrotVerify a single stream of data. +func bitrotVerify(r io.Reader, wantSize, partSize int64, algo BitrotAlgorithm, want []byte, shardSize int64) error { + if algo != HighwayHash256S { + h := algo.New() + if n, err := io.Copy(h, r); err != nil || n != wantSize { + // Premature failure in reading the object, file is corrupt. + return errFileCorrupt + } + if !bytes.Equal(h.Sum(nil), want) { + return errFileCorrupt + } + return nil + } + + h := algo.New() + hashBuf := make([]byte, h.Size()) + buf := make([]byte, shardSize) + left := wantSize + + // Calculate the size of the bitrot file and compare + // it with the actual file size. + if left != bitrotShardFileSize(partSize, shardSize, algo) { + return errFileCorrupt + } + + for left > 0 { + // Read expected hash... + h.Reset() + n, err := io.ReadFull(r, hashBuf) + if err != nil { + // Read's failed for object with right size, file is corrupt. + return err + } + // Subtract hash length.. + left -= int64(n) + if left < shardSize { + shardSize = left + } + read, err := io.CopyBuffer(h, io.LimitReader(r, shardSize), buf) + if err != nil { + // Read's failed for object with right size, at different offsets. + return err + } + left -= read + if !bytes.Equal(h.Sum(nil), hashBuf) { + return errFileCorrupt + } + } + return nil +} diff --git a/cmd/erasure-encode_test.go b/cmd/erasure-encode_test.go index 8788b51ed..cc542a128 100644 --- a/cmd/erasure-encode_test.go +++ b/cmd/erasure-encode_test.go @@ -139,7 +139,7 @@ func TestErasureEncode(t *testing.T) { case *wholeBitrotWriter: w.disk = badDisk{nil} case *streamingBitrotWriter: - w.iow.(*io.PipeWriter).CloseWithError(errFaultyDisk) + w.closeWithErr(errFaultyDisk) } } if test.offDisks > 0 { diff --git a/cmd/erasure-healing-common.go b/cmd/erasure-healing-common.go index ee8be3f2f..0d7e1db9a 100644 --- a/cmd/erasure-healing-common.go +++ b/cmd/erasure-healing-common.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "context" "time" @@ -198,8 +199,8 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad dataErrs[i] = errDiskNotFound continue } + meta := partsMetadata[i] if erasureDistributionReliable { - meta := partsMetadata[i] if !meta.IsValid() { continue } @@ -221,6 +222,21 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad } } + // Always check data, if we got it. + if len(meta.Data) > 0 || meta.Size == 0 { + checksumInfo := meta.Erasure.GetChecksumInfo(meta.Parts[0].Number) + dataErrs[i] = bitrotVerify(bytes.NewBuffer(meta.Data), + int64(len(meta.Data)), + meta.Erasure.ShardFileSize(meta.Size), + checksumInfo.Algorithm, + checksumInfo.Hash, meta.Erasure.ShardSize()) + if dataErrs[i] == nil { + // All parts verified, mark it as all data available. + availableDisks[i] = onlineDisk + } + continue + } + switch scanMode { case madmin.HealDeepScan: // disk has a valid xl.meta but may not have all the diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index 6c9f81c3a..02b85eaf2 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ -177,85 +177,269 @@ func TestListOnlineDisks(t *testing.T) { } object := "object" - data := bytes.Repeat([]byte("a"), 1024) + data := bytes.Repeat([]byte("a"), smallFileThreshold*2) z := obj.(*erasureServerPools) erasureDisks := z.serverPools[0].sets[0].getDisks() for i, test := range testCases { - _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{}) - if err != nil { - t.Fatalf("Failed to putObject %v", err) - } + t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) - fi, err := getLatestFileInfo(ctx, partsMetadata, errs) - if err != nil { - t.Fatalf("Failed to getLatestFileInfo %v", err) - } - - for j := range partsMetadata { - if errs[j] != nil { - t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j]) - } - partsMetadata[j].ModTime = test.modTimes[j] - } - - tamperedIndex := -1 - switch test._tamperBackend { - case deletePart: - for index, err := range test.errs { - if err != nil { - continue - } - // Remove a part from a disk - // which has a valid xl.meta, - // and check if that disk - // appears in outDatedDisks. - tamperedIndex = index - dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false) - if dErr != nil { - t.Fatalf("Test %d: Failed to delete %s - %v", i+1, - filepath.Join(object, "part.1"), dErr) - } - break - } - case corruptPart: - for index, err := range test.errs { - if err != nil { - continue - } - // Corrupt a part from a disk - // which has a valid xl.meta, - // and check if that disk - // appears in outDatedDisks. - tamperedIndex = index - filePath := pathJoin(erasureDisks[index].String(), bucket, object, fi.DataDir, "part.1") - f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_SYNC, 0) - if err != nil { - t.Fatalf("Failed to open %s: %s\n", filePath, err) - } - f.Write([]byte("oops")) // Will cause bitrot error - f.Close() - break + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{}) + if err != nil { + t.Fatalf("Failed to putObject %v", err) } - } - - onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs) - if !modTime.Equal(test.expectedTime) { - t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v", - i+1, test.expectedTime, modTime) - } - - availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan) - test.errs = newErrs - - if test._tamperBackend != noTamper { - if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil { - t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data", - i+1, erasureDisks[tamperedIndex]) + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fi, err := getLatestFileInfo(ctx, partsMetadata, errs) + if err != nil { + t.Fatalf("Failed to getLatestFileInfo %v", err) } - } + for j := range partsMetadata { + if errs[j] != nil { + t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j]) + } + partsMetadata[j].ModTime = test.modTimes[j] + } + + tamperedIndex := -1 + switch test._tamperBackend { + case deletePart: + for index, err := range test.errs { + if err != nil { + continue + } + // Remove a part from a disk + // which has a valid xl.meta, + // and check if that disk + // appears in outDatedDisks. + tamperedIndex = index + dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false) + if dErr != nil { + t.Fatalf("Test %d: Failed to delete %s - %v", i+1, + filepath.Join(object, "part.1"), dErr) + } + break + } + case corruptPart: + for index, err := range test.errs { + if err != nil { + continue + } + // Corrupt a part from a disk + // which has a valid xl.meta, + // and check if that disk + // appears in outDatedDisks. + tamperedIndex = index + filePath := pathJoin(erasureDisks[index].String(), bucket, object, fi.DataDir, "part.1") + f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_SYNC, 0) + if err != nil { + t.Fatalf("Failed to open %s: %s\n", filePath, err) + } + f.Write([]byte("oops")) // Will cause bitrot error + f.Close() + break + } + + } + + onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs) + if !modTime.Equal(test.expectedTime) { + t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v", + i+1, test.expectedTime, modTime) + } + + availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan) + test.errs = newErrs + + if test._tamperBackend != noTamper { + if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil { + t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data", + i+1, erasureDisks[tamperedIndex]) + } + } + }) + } +} + +// TestListOnlineDisksSmallObjects - checks if listOnlineDisks and outDatedDisks +// are consistent with each other. +func TestListOnlineDisksSmallObjects(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + obj, disks, err := prepareErasure16(ctx) + if err != nil { + t.Fatalf("Prepare Erasure backend failed - %v", err) + } + defer obj.Shutdown(context.Background()) + defer removeRoots(disks) + + type tamperKind int + const ( + noTamper tamperKind = iota + deletePart tamperKind = iota + corruptPart tamperKind = iota + ) + timeSentinel := time.Unix(1, 0).UTC() + threeNanoSecs := time.Unix(3, 0).UTC() + fourNanoSecs := time.Unix(4, 0).UTC() + modTimesThreeNone := []time.Time{ + threeNanoSecs, threeNanoSecs, threeNanoSecs, threeNanoSecs, + threeNanoSecs, threeNanoSecs, threeNanoSecs, + timeSentinel, timeSentinel, timeSentinel, timeSentinel, + timeSentinel, timeSentinel, timeSentinel, timeSentinel, + timeSentinel, + } + modTimesThreeFour := []time.Time{ + threeNanoSecs, threeNanoSecs, threeNanoSecs, threeNanoSecs, + threeNanoSecs, threeNanoSecs, threeNanoSecs, threeNanoSecs, + fourNanoSecs, fourNanoSecs, fourNanoSecs, fourNanoSecs, + fourNanoSecs, fourNanoSecs, fourNanoSecs, fourNanoSecs, + } + testCases := []struct { + modTimes []time.Time + expectedTime time.Time + errs []error + _tamperBackend tamperKind + }{ + { + modTimes: modTimesThreeFour, + expectedTime: fourNanoSecs, + errs: []error{ + nil, nil, nil, nil, nil, nil, nil, nil, nil, + nil, nil, nil, nil, nil, nil, nil, + }, + _tamperBackend: noTamper, + }, + { + modTimes: modTimesThreeNone, + expectedTime: threeNanoSecs, + errs: []error{ + // Disks that have a valid xl.meta. + nil, nil, nil, nil, nil, nil, nil, + // Majority of disks don't have xl.meta. + errFileNotFound, errFileNotFound, + errFileNotFound, errFileNotFound, + errFileNotFound, errDiskAccessDenied, + errDiskNotFound, errFileNotFound, + errFileNotFound, + }, + _tamperBackend: deletePart, + }, + { + modTimes: modTimesThreeNone, + expectedTime: threeNanoSecs, + errs: []error{ + // Disks that have a valid xl.meta. + nil, nil, nil, nil, nil, nil, nil, + // Majority of disks don't have xl.meta. + errFileNotFound, errFileNotFound, + errFileNotFound, errFileNotFound, + errFileNotFound, errDiskAccessDenied, + errDiskNotFound, errFileNotFound, + errFileNotFound, + }, + _tamperBackend: corruptPart, + }, + } + + bucket := "bucket" + err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{}) + if err != nil { + t.Fatalf("Failed to make a bucket %v", err) + } + + object := "object" + data := bytes.Repeat([]byte("a"), smallFileThreshold/2) + z := obj.(*erasureServerPools) + erasureDisks := z.serverPools[0].sets[0].getDisks() + for i, test := range testCases { + t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { + + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{}) + if err != nil { + t.Fatalf("Failed to putObject %v", err) + } + + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true) + _, err := getLatestFileInfo(ctx, partsMetadata, errs) + if err != nil { + t.Fatalf("Failed to getLatestFileInfo %v", err) + } + + for j := range partsMetadata { + if errs[j] != nil { + t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j]) + } + partsMetadata[j].ModTime = test.modTimes[j] + } + + if erasureDisks, err = writeUniqueFileInfo(ctx, erasureDisks, bucket, object, partsMetadata, diskCount(erasureDisks)); err != nil { + t.Fatal(ctx, err) + } + + tamperedIndex := -1 + switch test._tamperBackend { + case deletePart: + for index, err := range test.errs { + if err != nil { + continue + } + // Remove a part from a disk + // which has a valid xl.meta, + // and check if that disk + // appears in outDatedDisks. + tamperedIndex = index + dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false) + if dErr != nil { + t.Fatalf("Test %d: Failed to delete %s - %v", i+1, + pathJoin(object, xlStorageFormatFile), dErr) + } + break + } + case corruptPart: + for index, err := range test.errs { + if err != nil { + continue + } + // Corrupt a part from a disk + // which has a valid xl.meta, + // and check if that disk + // appears in outDatedDisks. + tamperedIndex = index + filePath := pathJoin(erasureDisks[index].String(), bucket, object, xlStorageFormatFile) + f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_SYNC, 0) + if err != nil { + t.Fatalf("Failed to open %s: %s\n", filePath, err) + } + f.Write([]byte("oops")) // Will cause bitrot error + f.Close() + break + } + + } + partsMetadata, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", true) + _, err = getLatestFileInfo(ctx, partsMetadata, errs) + if err != nil { + t.Fatalf("Failed to getLatestFileInfo %v", err) + } + + onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs) + if !modTime.Equal(test.expectedTime) { + t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v", + i+1, test.expectedTime, modTime) + } + + availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan) + test.errs = newErrs + + if test._tamperBackend != noTamper { + if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil { + t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data", + i+1, erasureDisks[tamperedIndex]) + } + } + }) } } diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index c9fddec55..8ecc8e500 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "context" "errors" "fmt" @@ -230,8 +231,7 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, quorumModTime t } // Heals an object by re-writing corrupt/missing erasure blocks. -func (er erasureObjects) healObject(ctx context.Context, bucket string, object string, - versionID string, partsMetadata []FileInfo, errs []error, lfi FileInfo, opts madmin.HealOpts) (result madmin.HealResultItem, err error) { +func (er erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, partsMetadata []FileInfo, errs []error, opts madmin.HealOpts) (result madmin.HealResultItem, err error) { dryRun := opts.DryRun scanMode := opts.ScanMode @@ -379,6 +379,11 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s dataDir = migrateDataDir } + var inlineBuffers []*bytes.Buffer + if len(latestMeta.Parts) <= 1 && latestMeta.Size < smallFileThreshold { + inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks)) + } + if !latestMeta.Deleted || latestMeta.TransitionStatus != lifecycle.TransitionComplete { result.DataBlocks = latestMeta.Erasure.DataBlocks result.ParityBlocks = latestMeta.Erasure.ParityBlocks @@ -398,6 +403,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s } erasureInfo := latestMeta.Erasure + for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ { partSize := latestMeta.Parts[partIndex].Size partActualSize := latestMeta.Parts[partIndex].ActualSize @@ -414,7 +420,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s if latestMeta.XLV1 { partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber)) } - readers[i] = newBitrotReader(disk, nil, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) + readers[i] = newBitrotReader(disk, partsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) } writers := make([]io.Writer, len(outDatedDisks)) for i, disk := range outDatedDisks { @@ -422,8 +428,13 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s continue } partPath := pathJoin(tmpID, dataDir, fmt.Sprintf("part.%d", partNumber)) - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, - tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize(), true) + if len(inlineBuffers) > 0 { + inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(latestMeta.Size))) + writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) + } else { + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, + tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize(), true) + } } err = erasure.Heal(ctx, readers, writers, partSize) closeBitrotReaders(readers) @@ -453,6 +464,11 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s Algorithm: checksumAlgo, Hash: bitrotWriterSum(writers[i]), }) + if len(inlineBuffers) > 0 { + partsMetadata[i].Data = inlineBuffers[i].Bytes() + } else { + partsMetadata[i].Data = nil + } } // If all disks are having errors, we give up. @@ -462,6 +478,25 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s } } + if len(inlineBuffers) > 0 { + // Write directly... + if outDatedDisks, err = writeUniqueFileInfo(ctx, outDatedDisks, bucket, object, partsMetadata, diskCount(outDatedDisks)); err != nil { + logger.LogIf(ctx, err) + return result, toObjectErr(err, bucket, object) + } + result.ObjectSize = latestMeta.Size + for _, disk := range outDatedDisks { + if disk == OfflineDisk { + continue + } + for i, v := range result.Before.Drives { + if v.Endpoint == disk.String() { + result.After.Drives[i].State = madmin.DriveStateOk + } + } + } + return result, nil + } defer er.deleteObject(context.Background(), minioMetaTmpBucket, tmpID, len(storageDisks)/2+1) // Generate and write `xl.meta` generated from other disks. @@ -830,7 +865,13 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version storageEndpoints := er.getEndpoints() // Read metadata files from all the disks - partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false) + + // When versionID is empty, we read directly from the `null` versionID for healing. + if versionID == "" { + versionID = nullVersionID + } + + partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, true) if isAllNotFound(errs) { err = toObjectErr(errFileNotFound, bucket, object) @@ -841,11 +882,11 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID, er.defaultParityCount), err } - fi, err := getLatestFileInfo(healCtx, partsMetadata, errs) + _, err = getLatestFileInfo(healCtx, partsMetadata, errs) if err != nil { return er.purgeObjectDangling(healCtx, bucket, object, versionID, partsMetadata, errs, []error{}, opts) } // Heal the object. - return er.healObject(healCtx, bucket, object, versionID, partsMetadata, errs, fi, opts) + return er.healObject(healCtx, bucket, object, versionID, partsMetadata, errs, opts) } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 2eec27276..65d14edc7 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "context" "errors" "fmt" @@ -735,10 +736,21 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st }() writers := make([]io.Writer, len(onlineDisks)) + dataSize := data.Size() + var inlineBuffers []*bytes.Buffer + if dataSize >= 0 && dataSize < smallFileThreshold { + inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) + } for i, disk := range onlineDisks { if disk == nil { continue } + + if len(inlineBuffers) > 0 { + inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(data.Size()))) + writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) + continue + } writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize(), false) } @@ -770,6 +782,9 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st onlineDisks[i] = nil continue } + if len(inlineBuffers) > 0 && inlineBuffers[i] != nil { + partsMetadata[i].Data = inlineBuffers[i].Bytes() + } partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize()) partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ PartNumber: 1, @@ -797,16 +812,29 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st partsMetadata[index].Metadata = opts.UserDefined partsMetadata[index].Size = n partsMetadata[index].ModTime = modTime + if len(inlineBuffers) > 0 && inlineBuffers[index] != nil { + partsMetadata[index].Data = inlineBuffers[index].Bytes() + } } // Write unique `xl.meta` for each disk. - if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) - } + if len(inlineBuffers) == 0 { + if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil { + logger.LogIf(ctx, err) + return ObjectInfo{}, toObjectErr(err, bucket, object) + } - // Rename the successfully written temporary object to final location. - if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil { - return ObjectInfo{}, toObjectErr(err, bucket, object) + // Rename the successfully written temporary object to final location. + if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil { + logger.LogIf(ctx, err) + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + } else { + // Write directly... + if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, bucket, object, partsMetadata, writeQuorum); err != nil { + logger.LogIf(ctx, err) + return ObjectInfo{}, toObjectErr(err, bucket, object) + } } // Whether a disk was initially or becomes offline diff --git a/cmd/erasure-object_test.go b/cmd/erasure-object_test.go index 8373b283a..1feaa625a 100644 --- a/cmd/erasure-object_test.go +++ b/cmd/erasure-object_test.go @@ -24,6 +24,7 @@ import ( "io" "io/ioutil" "os" + "strconv" "testing" humanize "github.com/dustin/go-humanize" @@ -109,6 +110,10 @@ func TestErasureDeleteObjectBasic(t *testing.T) { for _, test := range testCases { test := test t.Run("", func(t *testing.T) { + _, err := xl.GetObjectInfo(ctx, "bucket", "dir/obj", ObjectOptions{}) + if err != nil { + t.Fatal("dir/obj not found before last test") + } _, actualErr := xl.DeleteObject(ctx, test.bucket, test.object, ObjectOptions{}) if test.expectedErr != nil && actualErr != test.expectedErr { t.Errorf("Expected to fail with %s, but failed with %s", test.expectedErr, actualErr) @@ -462,7 +467,7 @@ func TestPutObjectNoQuorum(t *testing.T) { object := "object" opts := ObjectOptions{} // Create "object" under "bucket". - _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{'a'}, smallFileThreshold*2)), smallFileThreshold*2, "", ""), opts) if err != nil { t.Fatal(err) } @@ -470,8 +475,8 @@ func TestPutObjectNoQuorum(t *testing.T) { // Make 9 disks offline, which leaves less than quorum number of disks // in a 16 disk Erasure setup. The original disks are 'replaced' with // naughtyDisks that fail after 'f' successful StorageAPI method - // invocations, where f - [0,3) - for f := 0; f < 3; f++ { + // invocations, where f - [0,4) + for f := 0; f < 4; f++ { diskErrors := make(map[int]error) for i := 0; i <= f; i++ { diskErrors[i] = nil @@ -491,13 +496,78 @@ func TestPutObjectNoQuorum(t *testing.T) { } z.serverPools[0].erasureDisksMu.Unlock() // Upload new content to same object "object" - _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{byte(f)}, smallFileThreshold*2)), smallFileThreshold*2, "", ""), opts) if !errors.Is(err, errErasureWriteQuorum) { t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err) } } } +func TestPutObjectNoQuorumSmall(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create an instance of xl backend. + obj, fsDirs, err := prepareErasure16(ctx) + if err != nil { + t.Fatal(err) + } + + // Cleanup backend directories. + defer obj.Shutdown(context.Background()) + defer removeRoots(fsDirs) + + z := obj.(*erasureServerPools) + xl := z.serverPools[0].sets[0] + + // Create "bucket" + err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{}) + if err != nil { + t.Fatal(err) + } + + bucket := "bucket" + object := "object" + opts := ObjectOptions{} + // Create "object" under "bucket". + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{'a'}, smallFileThreshold/2)), smallFileThreshold/2, "", ""), opts) + if err != nil { + t.Fatal(err) + } + + // Make 9 disks offline, which leaves less than quorum number of disks + // in a 16 disk Erasure setup. The original disks are 'replaced' with + // naughtyDisks that fail after 'f' successful StorageAPI method + // invocations, where f - [0,2) + for f := 0; f < 2; f++ { + t.Run("exec-"+strconv.Itoa(f), func(t *testing.T) { + diskErrors := make(map[int]error) + for i := 0; i <= f; i++ { + diskErrors[i] = nil + } + erasureDisks := xl.getDisks() + for i := range erasureDisks[:9] { + switch diskType := erasureDisks[i].(type) { + case *naughtyDisk: + erasureDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk) + default: + erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk) + } + } + z.serverPools[0].erasureDisksMu.Lock() + xl.getDisks = func() []StorageAPI { + return erasureDisks + } + z.serverPools[0].erasureDisksMu.Unlock() + // Upload new content to same object "object" + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(bytes.Repeat([]byte{byte(f)}, smallFileThreshold/2)), smallFileThreshold/2, "", ""), opts) + if !errors.Is(err, errErasureWriteQuorum) { + t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err) + } + }) + } +} + func TestObjectQuorumFromMeta(t *testing.T) { ExecObjectLayerTestWithDirs(t, testObjectQuorumFromMeta) } diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go index 06cb03847..eea951582 100644 --- a/cmd/format-erasure.go +++ b/cmd/format-erasure.go @@ -913,7 +913,7 @@ func makeFormatErasureMetaVolumes(disk StorageAPI) error { return errDiskNotFound } // Attempt to create MinIO internal buckets. - return disk.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, dataUsageBucket) + return disk.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, minioMetaTmpDeletedBucket, dataUsageBucket, minioMetaTmpBucket+"-old") } // Initialize a new set of set formats which will be written to all disks. diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index f19992c64..1c603f404 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -128,7 +128,7 @@ func (e *metaCacheEntry) fileInfo(bucket string) (*FileInfo, error) { }, nil } if e.cached == nil { - fi, err := getFileInfo(e.metadata, bucket, e.name, "") + fi, err := getFileInfo(e.metadata, bucket, e.name, "", false) if err != nil { return nil, err } diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 9e66b7065..16d5c3ebd 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -55,6 +55,7 @@ type WalkDirOptions struct { // WalkDir will traverse a directory and return all entries found. // On success a sorted meta cache stream will be returned. +// Metadata has data stripped, if any. func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error { // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(opts.Bucket) @@ -94,7 +95,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ // behavior. out <- metaCacheEntry{ name: opts.BaseDir, - metadata: metadata, + metadata: xlMetaV2TrimData(metadata), } } else { if st, err := os.Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() { @@ -156,6 +157,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ logger.LogIf(ctx, err) continue } + meta.metadata = xlMetaV2TrimData(meta.metadata) meta.name = strings.TrimSuffix(entry, xlStorageFormatFile) meta.name = strings.TrimSuffix(meta.name, SlashSeparator) meta.name = pathJoin(current, meta.name) diff --git a/cmd/object-api-deleteobject_test.go b/cmd/object-api-deleteobject_test.go index deefdbbf9..a7e7b8e37 100644 --- a/cmd/object-api-deleteobject_test.go +++ b/cmd/object-api-deleteobject_test.go @@ -91,14 +91,16 @@ func testDeleteObject(obj ObjectLayer, instanceType string, t TestErrHandler) { for _, object := range testCase.objectToUploads { md5Bytes := md5.Sum([]byte(object.content)) - _, err = obj.PutObject(context.Background(), testCase.bucketName, object.name, mustGetPutObjReader(t, strings.NewReader(object.content), + oi, err := obj.PutObject(context.Background(), testCase.bucketName, object.name, mustGetPutObjReader(t, strings.NewReader(object.content), int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{}) + t.Log(oi) if err != nil { t.Fatalf("%s : %s", instanceType, err.Error()) } } - _, _ = obj.DeleteObject(context.Background(), testCase.bucketName, testCase.pathToDelete, ObjectOptions{}) + oi, err := obj.DeleteObject(context.Background(), testCase.bucketName, testCase.pathToDelete, ObjectOptions{}) + t.Log(oi, err) result, err := obj.ListObjects(context.Background(), testCase.bucketName, "", "", "", 1000) if err != nil { diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 699a00cbc..7dd9c0c2a 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -207,6 +207,9 @@ const SlashSeparator = "/" // retainSlash - retains slash from a path. func retainSlash(s string) string { + if s == "" { + return s + } return strings.TrimSuffix(s, SlashSeparator) + SlashSeparator } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 6e90e5ca0..b7faaaa1e 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -54,6 +54,7 @@ import ( "time" "github.com/fatih/color" + "github.com/gorilla/mux" "github.com/minio/minio-go/v7/pkg/s3utils" "github.com/minio/minio-go/v7/pkg/signer" @@ -233,13 +234,7 @@ func initFSObjects(disk string, t *testing.T) (obj ObjectLayer) { // Using this interface, functionalities to be used in tests can be // made generalized, and can be integrated in benchmarks/unit tests/go check suite tests. type TestErrHandler interface { - Log(args ...interface{}) - Logf(format string, args ...interface{}) - Error(args ...interface{}) - Errorf(format string, args ...interface{}) - Failed() bool - Fatal(args ...interface{}) - Fatalf(format string, args ...interface{}) + testing.TB } const ( diff --git a/cmd/xl-storage-format-utils.go b/cmd/xl-storage-format-utils.go index 9a1b284d3..c6df24078 100644 --- a/cmd/xl-storage-format-utils.go +++ b/cmd/xl-storage-format-utils.go @@ -73,13 +73,18 @@ func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersion }, nil } -func getFileInfo(xlMetaBuf []byte, volume, path, versionID string) (FileInfo, error) { +func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (FileInfo, error) { if isXL2V1Format(xlMetaBuf) { var xlMeta xlMetaV2 if err := xlMeta.Load(xlMetaBuf); err != nil { return FileInfo{}, err } - return xlMeta.ToFileInfo(volume, path, versionID) + fi, err := xlMeta.ToFileInfo(volume, path, versionID) + if !data || err != nil { + return fi, err + } + fi.Data = xlMeta.data.find(fi.DataDir) + return fi, nil } xlMeta := &xlMetaV1Object{} diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 4f9f415ea..9b07fb186 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -18,12 +18,15 @@ package cmd import ( "bytes" + "encoding/binary" "errors" "fmt" "sort" "strings" "time" + "github.com/tinylib/msgp/msgp" + "github.com/google/uuid" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" @@ -33,28 +36,55 @@ var ( // XL header specifies the format xlHeader = [4]byte{'X', 'L', '2', ' '} - // XLv2 version 1 - xlVersionV1 = [4]byte{'1', ' ', ' ', ' '} + // Current version being written. + xlVersionCurrent [4]byte ) -func checkXL2V1(buf []byte) error { +const ( + // Breaking changes. + // Newer versions cannot be read by older software. + // This will prevent downgrades to incompatible versions. + xlVersionMajor = 1 + + // Non breaking changes. + // Bumping this is informational, but should be done + // if any change is made to the data stored, bumping this + // will allow to detect the exact version later. + xlVersionMinor = 1 +) + +func init() { + binary.LittleEndian.PutUint16(xlVersionCurrent[0:2], xlVersionMajor) + binary.LittleEndian.PutUint16(xlVersionCurrent[2:4], xlVersionMinor) +} + +// checkXL2V1 will check if the metadata has correct header and is a known major version. +// The remaining payload and versions are returned. +func checkXL2V1(buf []byte) (payload []byte, major, minor uint16, err error) { if len(buf) <= 8 { - return fmt.Errorf("xlMeta: no data") + return payload, 0, 0, fmt.Errorf("xlMeta: no data") } if !bytes.Equal(buf[:4], xlHeader[:]) { - return fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], buf[:4]) + return payload, 0, 0, fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], buf[:4]) } - if !bytes.Equal(buf[4:8], xlVersionV1[:]) { - return fmt.Errorf("xlMeta: unknown XLv2 version, expected %v, got %v", xlVersionV1[:4], buf[4:8]) + if bytes.Equal(buf[4:8], []byte("1 ")) { + // Set as 1,0. + major, minor = 1, 0 + } else { + major, minor = binary.LittleEndian.Uint16(buf[4:6]), binary.LittleEndian.Uint16(buf[6:8]) + } + if major > xlVersionMajor { + return buf[8:], major, minor, fmt.Errorf("xlMeta: unknown major version %d found", major) } - return nil + return buf[8:], major, minor, nil } func isXL2V1Format(buf []byte) bool { - return checkXL2V1(buf) == nil + _, _, _, err := checkXL2V1(buf) + return err == nil } // The []journal contains all the different versions of the object. @@ -199,6 +229,317 @@ func (j xlMetaV2Version) Valid() bool { // the journals for the object. type xlMetaV2 struct { Versions []xlMetaV2Version `json:"Versions" msg:"Versions"` + + // data will contain raw data if any. + // data will be one or more versions indexed by storage dir. + // To remove all data set to nil. + data xlMetaInlineData `msg:"-"` +} + +// xlMetaInlineData is serialized data in [string][]byte pairs. +// +//msgp:ignore xlMetaInlineData +type xlMetaInlineData []byte + +// xlMetaInlineDataVer indicates the vesrion of the inline data structure. +const xlMetaInlineDataVer = 1 + +// versionOK returns whether the version is ok. +func (x xlMetaInlineData) versionOK() bool { + if len(x) == 0 { + return true + } + return x[0] > 0 && x[0] <= xlMetaInlineDataVer +} + +// afterVersion returns the payload after the version, if any. +func (x xlMetaInlineData) afterVersion() []byte { + if len(x) == 0 { + return x + } + return x[1:] +} + +// find the data with key s. +// Returns nil if not for or an error occurs. +func (x xlMetaInlineData) find(key string) []byte { + if len(x) == 0 || !x.versionOK() { + return nil + } + sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion()) + if err != nil || sz == 0 { + return nil + } + for i := uint32(0); i < sz; i++ { + var found []byte + found, buf, err = msgp.ReadMapKeyZC(buf) + if err != nil || sz == 0 { + return nil + } + if string(found) == key { + val, _, _ := msgp.ReadBytesZC(buf) + return val + } + // Skip it + _, buf, err = msgp.ReadBytesZC(buf) + if err != nil { + return nil + } + } + return nil +} + +// validate checks if the data is valid. +// It does not check integrity of the stored data. +func (x xlMetaInlineData) validate() error { + if len(x) == 0 { + return nil + } + if !x.versionOK() { + return fmt.Errorf("xlMetaInlineData: unknown version 0x%x", x[0]) + } + + sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion()) + if err != nil { + return err + } + for i := uint32(0); i < sz; i++ { + var key []byte + key, buf, err = msgp.ReadMapKeyZC(buf) + if err != nil { + return err + } + if len(key) == 0 { + return fmt.Errorf("xlMetaInlineData: key %d is length 0", i) + } + _, buf, err = msgp.ReadBytesZC(buf) + if err != nil { + return err + } + } + return nil +} + +// validate checks if the data is valid. +// It does not check integrity of the stored data. +func (x xlMetaInlineData) list() ([]string, error) { + if len(x) == 0 { + return nil, nil + } + if !x.versionOK() { + return nil, errors.New("xlMetaInlineData: unknown version") + } + + sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion()) + if err != nil { + return nil, err + } + keys := make([]string, 0, sz) + for i := uint32(0); i < sz; i++ { + var key []byte + key, buf, err = msgp.ReadMapKeyZC(buf) + if err != nil { + return keys, err + } + if len(key) == 0 { + return keys, fmt.Errorf("xlMetaInlineData: key %d is length 0", i) + } + keys = append(keys, string(key)) + // Skip data... + _, buf, err = msgp.ReadBytesZC(buf) + if err != nil { + return keys, err + } + } + return keys, nil +} + +func (x xlMetaInlineData) entries() int { + if len(x) == 0 || !x.versionOK() { + return 0 + } + sz, _, _ := msgp.ReadMapHeaderBytes(x.afterVersion()) + return int(sz) +} + +// replace will add or replace a key/value pair. +func (x *xlMetaInlineData) replace(key string, value []byte) { + in := x.afterVersion() + sz, buf, _ := msgp.ReadMapHeaderBytes(in) + keys := make([][]byte, 0, sz+1) + vals := make([][]byte, 0, sz+1) + + // Version plus header... + plSize := 1 + msgp.MapHeaderSize + replaced := false + for i := uint32(0); i < sz; i++ { + var found, foundVal []byte + var err error + found, buf, err = msgp.ReadMapKeyZC(buf) + if err != nil { + break + } + foundVal, buf, err = msgp.ReadBytesZC(buf) + if err != nil { + break + } + plSize += len(found) + msgp.StringPrefixSize + msgp.ArrayHeaderSize + keys = append(keys, found) + if string(found) == key { + vals = append(vals, value) + plSize += len(value) + replaced = true + } else { + vals = append(vals, foundVal) + plSize += len(foundVal) + } + } + // Add one more. + if !replaced { + keys = append(keys, []byte(key)) + vals = append(vals, value) + plSize += len(key) + len(value) + msgp.StringPrefixSize + msgp.ArrayHeaderSize + } + + // Reserialize... + payload := make([]byte, 1, plSize) + payload[0] = xlMetaInlineDataVer + payload = msgp.AppendMapHeader(payload, uint32(len(keys))) + for i := range keys { + payload = msgp.AppendStringFromBytes(payload, keys[i]) + payload = msgp.AppendBytes(payload, vals[i]) + } + *x = payload + if err := x.validate(); err != nil { + panic(err) + } +} + +// rename will rename a key. +// Returns whether the key was found. +func (x *xlMetaInlineData) rename(oldKey, newKey string) bool { + in := x.afterVersion() + sz, buf, _ := msgp.ReadMapHeaderBytes(in) + keys := make([][]byte, 0, sz) + vals := make([][]byte, 0, sz) + + // Version plus header... + plSize := 1 + msgp.MapHeaderSize + found := false + for i := uint32(0); i < sz; i++ { + var foundKey, foundVal []byte + var err error + foundKey, buf, err = msgp.ReadMapKeyZC(buf) + if err != nil { + break + } + foundVal, buf, err = msgp.ReadBytesZC(buf) + if err != nil { + break + } + plSize += len(foundVal) + msgp.StringPrefixSize + msgp.ArrayHeaderSize + vals = append(vals, foundVal) + if string(foundKey) != oldKey { + keys = append(keys, foundKey) + plSize += len(foundKey) + } else { + keys = append(keys, []byte(newKey)) + plSize += len(newKey) + found = true + } + } + // If not found, just return. + if !found { + return false + } + + // Reserialize... + payload := make([]byte, 1, plSize) + payload[0] = xlMetaInlineDataVer + payload = msgp.AppendMapHeader(payload, uint32(len(keys))) + for i := range keys { + payload = msgp.AppendStringFromBytes(payload, keys[i]) + payload = msgp.AppendBytes(payload, vals[i]) + } + *x = payload + return true +} + +// remove will remove a key. +// Returns whether the key was found. +func (x *xlMetaInlineData) remove(key string) bool { + in := x.afterVersion() + sz, buf, _ := msgp.ReadMapHeaderBytes(in) + keys := make([][]byte, 0, sz) + vals := make([][]byte, 0, sz) + + // Version plus header... + plSize := 1 + msgp.MapHeaderSize + found := false + for i := uint32(0); i < sz; i++ { + var foundKey, foundVal []byte + var err error + foundKey, buf, err = msgp.ReadMapKeyZC(buf) + if err != nil { + break + } + foundVal, buf, err = msgp.ReadBytesZC(buf) + if err != nil { + break + } + if string(foundKey) != key { + plSize += msgp.StringPrefixSize + msgp.ArrayHeaderSize + len(foundKey) + len(foundVal) + keys = append(keys, foundKey) + vals = append(vals, foundVal) + } else { + found = true + } + } + // If not found, just return. + if !found { + return false + } + // If none left... + if len(keys) == 0 { + *x = nil + return true + } + + // Reserialize... + payload := make([]byte, 1, plSize) + payload[0] = xlMetaInlineDataVer + payload = msgp.AppendMapHeader(payload, uint32(len(keys))) + for i := range keys { + payload = msgp.AppendStringFromBytes(payload, keys[i]) + payload = msgp.AppendBytes(payload, vals[i]) + } + *x = payload + return true +} + +// xlMetaV2TrimData will trim any data from the metadata without unmarshalling it. +// If any error occurs the unmodified data is returned. +func xlMetaV2TrimData(buf []byte) []byte { + metaBuf, min, maj, err := checkXL2V1(buf) + if err != nil { + return buf + } + if maj == 1 && min < 1 { + // First version to carry data. + return buf + } + // Skip header + _, metaBuf, err = msgp.ReadBytesZC(metaBuf) + if err != nil { + logger.LogIf(GlobalContext, err) + return buf + } + // = input - current pos + ends := len(buf) - len(metaBuf) + if ends > len(buf) { + return buf + } + return buf[:ends] } // AddLegacy adds a legacy version, is only called when no prior @@ -219,12 +560,69 @@ func (z *xlMetaV2) AddLegacy(m *xlMetaV1Object) error { } // Load unmarshal and load the entire message pack. +// Note that references to the incoming buffer may be kept as data. func (z *xlMetaV2) Load(buf []byte) error { - if err := checkXL2V1(buf); err != nil { - return err + buf, _, minor, err := checkXL2V1(buf) + if err != nil { + return errFileCorrupt } - _, err := z.UnmarshalMsg(buf[8:]) - return err + switch minor { + case 0: + _, err = z.UnmarshalMsg(buf) + if err != nil { + return errFileCorrupt + } + return nil + case 1: + v, buf, err := msgp.ReadBytesZC(buf) + if err != nil { + return errFileCorrupt + } + _, err = z.UnmarshalMsg(v) + if err != nil { + return errFileCorrupt + } + // Add remaining data. + z.data = nil + if len(buf) > 0 { + z.data = buf + if err := z.data.validate(); err != nil { + return errFileCorrupt + } + } + default: + return errors.New("unknown metadata version") + } + return nil +} + +// AppendTo will marshal the data in z and append it to the provided slice. +func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) { + sz := len(xlHeader) + len(xlVersionCurrent) + msgp.ArrayHeaderSize + z.Msgsize() + len(z.data) + len(dst) + if cap(dst) < sz { + buf := make([]byte, len(dst), sz) + copy(buf, dst) + dst = buf + } + if err := z.data.validate(); err != nil { + return nil, err + } + + dst = append(dst, xlHeader[:]...) + dst = append(dst, xlVersionCurrent[:]...) + // Add "bin 32" type header to always have enough space. + // We will fill out the correct size when we know it. + dst = append(dst, 0xc6, 0, 0, 0, 0) + dataOffset := len(dst) + dst, err := z.MarshalMsg(dst) + if err != nil { + return nil, err + } + + // Update size... + binary.BigEndian.PutUint32(dst[dataOffset-4:dataOffset], uint32(len(dst)-dataOffset)) + + return append(dst, z.data...), nil } // AddVersion adds a new version @@ -304,6 +702,10 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error { ventry.ObjectV2.MetaUser[k] = v } } + // If asked to save data. + if len(fi.Data) > 0 || fi.Size == 0 { + z.data.replace(dd.String(), fi.Data) + } } if !ventry.Valid() { @@ -324,7 +726,7 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error { return nil } case ObjectType: - if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) { + if version.ObjectV2.VersionID == uv { z.Versions[i] = ventry return nil } @@ -332,7 +734,7 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error { // Allowing delete marker to replaced with an proper // object data type as well, this is not S3 complaint // behavior but kept here for future flexibility. - if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) { + if version.DeleteMarker.VersionID == uv { z.Versions[i] = ventry return nil } @@ -352,7 +754,7 @@ func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error) versionID := "" var uv uuid.UUID // check if the version is not "null" - if !bytes.Equal(j.VersionID[:], uv[:]) { + if j.VersionID != uv { versionID = uuid.UUID(j.VersionID).String() } fi := FileInfo{ @@ -516,7 +918,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { return version.ObjectV1.DataDir, len(z.Versions) == 0, nil } case DeleteType: - if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) { + if version.DeleteMarker.VersionID == uv { if updateVersion { if len(z.Versions[i].DeleteMarker.MetaSys) == 0 { z.Versions[i].DeleteMarker.MetaSys = make(map[string][]byte) @@ -538,7 +940,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { return "", len(z.Versions) == 0, nil } case ObjectType: - if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) && updateVersion { + if version.ObjectV2.VersionID == uv && updateVersion { z.Versions[i].ObjectV2.MetaSys[VersionPurgeStatusKey] = []byte(fi.VersionPurgeStatus) return "", len(z.Versions) == 0, nil } @@ -550,7 +952,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { for _, version := range versions { switch version.Type { case ObjectType: - if bytes.Equal(version.ObjectV2.DataDir[:], dataDir[:]) { + if version.ObjectV2.DataDir == dataDir { sameDataDirCount++ } } @@ -564,7 +966,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { } switch version.Type { case ObjectType: - if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) { + if version.ObjectV2.VersionID == uv { if fi.TransitionStatus != "" { z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+"transition-status"] = []byte(fi.TransitionStatus) return uuid.UUID(version.ObjectV2.DataDir).String(), len(z.Versions) == 0, nil diff --git a/cmd/xl-storage-format-v2_test.go b/cmd/xl-storage-format-v2_test.go new file mode 100644 index 000000000..f10d6c6ae --- /dev/null +++ b/cmd/xl-storage-format-v2_test.go @@ -0,0 +1,144 @@ +/* + * MinIO Cloud Storage, (C) 2021 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "testing" + "time" +) + +func TestXLV2FormatData(t *testing.T) { + failOnErr := func(err error) { + t.Helper() + if err != nil { + t.Fatal(err) + } + } + data := []byte("some object data") + data2 := []byte("some other object data") + + xl := xlMetaV2{} + fi := FileInfo{ + Volume: "volume", + Name: "object-name", + VersionID: "756100c6-b393-4981-928a-d49bbc164741", + IsLatest: true, + Deleted: false, + TransitionStatus: "", + DataDir: "bffea160-ca7f-465f-98bc-9b4f1c3ba1ef", + XLV1: false, + ModTime: time.Now(), + Size: 0, + Mode: 0, + Metadata: nil, + Parts: nil, + Erasure: ErasureInfo{ + Algorithm: ReedSolomon.String(), + DataBlocks: 4, + ParityBlocks: 2, + BlockSize: 10000, + Index: 1, + Distribution: []int{1, 2, 3, 4, 5, 6, 7, 8}, + Checksums: []ChecksumInfo{{ + PartNumber: 1, + Algorithm: HighwayHash256S, + Hash: nil, + }}, + }, + MarkDeleted: false, + DeleteMarkerReplicationStatus: "", + VersionPurgeStatus: "", + Data: data, + NumVersions: 1, + SuccessorModTime: time.Time{}, + } + + failOnErr(xl.AddVersion(fi)) + + fi.VersionID = mustGetUUID() + fi.DataDir = mustGetUUID() + fi.Data = data2 + failOnErr(xl.AddVersion(fi)) + + serialized, err := xl.AppendTo(nil) + failOnErr(err) + // Roundtrip data + var xl2 xlMetaV2 + failOnErr(xl2.Load(serialized)) + + // We should have one data entry + list, err := xl2.data.list() + failOnErr(err) + if len(list) != 2 { + t.Fatalf("want 1 entry, got %d", len(list)) + } + + if !bytes.Equal(xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"), data) { + t.Fatal("Find data returned", xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef")) + } + if !bytes.Equal(xl2.data.find(fi.DataDir), data2) { + t.Fatal("Find data returned", xl2.data.find(fi.DataDir)) + } + + // Remove entry + xl2.data.remove(fi.DataDir) + failOnErr(xl2.data.validate()) + if xl2.data.find(fi.DataDir) != nil { + t.Fatal("Data was not removed:", xl2.data.find(fi.DataDir)) + } + if xl2.data.entries() != 1 { + t.Fatal("want 1 entry, got", xl2.data.entries()) + } + // Re-add + xl2.data.replace(fi.DataDir, fi.Data) + failOnErr(xl2.data.validate()) + if xl2.data.entries() != 2 { + t.Fatal("want 2 entries, got", xl2.data.entries()) + } + + // Replace entry + xl2.data.replace("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef", data2) + failOnErr(xl2.data.validate()) + if xl2.data.entries() != 2 { + t.Fatal("want 2 entries, got", xl2.data.entries()) + } + if !bytes.Equal(xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"), data2) { + t.Fatal("Find data returned", xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef")) + } + + if !xl2.data.rename("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef", "new-key") { + t.Fatal("old key was not found") + } + failOnErr(xl2.data.validate()) + if !bytes.Equal(xl2.data.find("new-key"), data2) { + t.Fatal("Find data returned", xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef")) + } + if xl2.data.entries() != 2 { + t.Fatal("want 2 entries, got", xl2.data.entries()) + } + if !bytes.Equal(xl2.data.find(fi.DataDir), data2) { + t.Fatal("Find data returned", xl2.data.find(fi.DataDir)) + } + + // Test trimmed + xl2 = xlMetaV2{} + failOnErr(xl2.Load(xlMetaV2TrimData(serialized))) + if len(xl2.data) != 0 { + t.Fatal("data, was not trimmed, bytes left:", len(xl2.data)) + } +} diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index f703aadae..a3020b99d 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -843,40 +843,49 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F return err } - buf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...)) - if err != nil { - return err - } - - // when data-dir is specified. Transition leverages existing DeleteObject - // api call to mark object as deleted. When object is pending transition, - // just update the metadata and avoid deleting data dir. - if dataDir != "" && fi.TransitionStatus != lifecycle.TransitionPending { - filePath := pathJoin(volumeDir, path, dataDir) - if err = checkPathLength(filePath); err != nil { - return err - } - - tmpuuid := mustGetUUID() - if err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid)); err != nil { - return err - } - } - // transitioned objects maintains metadata on the source cluster. When transition // status is set, update the metadata to disk. if !lastVersion || fi.TransitionStatus != "" { + // when data-dir is specified. Transition leverages existing DeleteObject + // api call to mark object as deleted. When object is pending transition, + // just update the metadata and avoid deleting data dir. + if dataDir != "" && fi.TransitionStatus != lifecycle.TransitionPending { + xlMeta.data.remove(dataDir) + filePath := pathJoin(volumeDir, path, dataDir) + if err = checkPathLength(filePath); err != nil { + return err + } + + tmpuuid := mustGetUUID() + if err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid)); err != nil { + if err != errFileNotFound { + return err + } + } + } + + buf, err = xlMeta.AppendTo(nil) + if err != nil { + return err + } + return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) } - // Delete the meta file, if there are no more versions the - // top level parent is automatically removed. - filePath := pathJoin(volumeDir, path, xlStorageFormatFile) + // Move everything to trash. + filePath := retainSlash(pathJoin(volumeDir, path)) if err = checkPathLength(filePath); err != nil { return err } + err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID())) - return s.deleteFile(volumeDir, filePath, false) + // Delete parents if needed. + filePath = retainSlash(pathutil.Dir(pathJoin(volumeDir, path))) + if filePath == retainSlash(volumeDir) { + return err + } + s.deleteFile(volumeDir, filePath, false) + return err } // WriteMetadata - writes FileInfo metadata for path at `xl.meta` @@ -890,26 +899,36 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi F if !isXL2V1Format(buf) { xlMeta, err = newXLMetaV2(fi) if err != nil { + logger.LogIf(ctx, err) return err } - buf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...)) + buf, err = xlMeta.AppendTo(nil) if err != nil { + logger.LogIf(ctx, err) return err } + if err := xlMeta.Load(buf); err != nil { + panic(err) + } } else { if err = xlMeta.Load(buf); err != nil { + logger.LogIf(ctx, err) return err } if err = xlMeta.AddVersion(fi); err != nil { + logger.LogIf(ctx, err) return err } - buf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...)) + buf, err = xlMeta.AppendTo(nil) if err != nil { + logger.LogIf(ctx, err) return err } + } return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf) + } func (s *xlStorage) renameLegacyMetadata(volumeDir, path string) (err error) { @@ -1005,16 +1024,20 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str return fi, errFileNotFound } - fi, err = getFileInfo(buf, volume, path, versionID) + fi, err = getFileInfo(buf, volume, path, versionID, readData) if err != nil { return fi, err } if readData { + if len(fi.Data) > 0 || fi.Size == 0 { + return fi, nil + } // Reading data for small objects when // - object has not yet transitioned // - object size lesser than 32KiB // - object has maximum of 1 parts + if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 { // Enable O_DIRECT optionally only if drive supports it. requireDirectIO := globalStorageClass.GetDMA() == storageclass.DMAReadWrite @@ -1801,8 +1824,9 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, return osErrToFileErr(err) } - fi, err := getFileInfo(srcBuf, dstVolume, dstPath, "") + fi, err := getFileInfo(srcBuf, dstVolume, dstPath, "", true) if err != nil { + logger.LogIf(ctx, err) return err } @@ -1955,29 +1979,36 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, return err } - dstBuf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...)) + dstBuf, err = xlMeta.AppendTo(nil) if err != nil { + logger.LogIf(ctx, err) return errFileCorrupt } - if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil { - return err - } - - // Commit data + // Commit data, if any if srcDataPath != "" { + if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil { + return err + } + tmpuuid := mustGetUUID() renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid)) tmpuuid = mustGetUUID() renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid)) if err = renameAll(srcDataPath, dstDataPath); err != nil { + logger.LogIf(ctx, err) return osErrToFileErr(err) } - } - // Commit meta-file - if err = renameAll(srcFilePath, dstFilePath); err != nil { - return osErrToFileErr(err) + // Commit meta-file + if err = renameAll(srcFilePath, dstFilePath); err != nil { + return osErrToFileErr(err) + } + } else { + // Write meta-file directly, no data + if err = s.WriteAll(ctx, dstVolume, pathJoin(dstPath, xlStorageFormatFile), dstBuf); err != nil { + return err + } } // Remove parent dir of the source file if empty @@ -2074,63 +2105,13 @@ func (s *xlStorage) bitrotVerify(partPath string, partSize int64, algo BitrotAlg // Close the file descriptor. defer file.Close() - - if algo != HighwayHash256S { - h := algo.New() - if _, err = io.Copy(h, file); err != nil { - // Premature failure in reading the object,file is corrupt. - return errFileCorrupt - } - if !bytes.Equal(h.Sum(nil), sum) { - return errFileCorrupt - } - return nil - } - - buf := make([]byte, shardSize) - h := algo.New() - hashBuf := make([]byte, h.Size()) fi, err := file.Stat() if err != nil { // Unable to stat on the file, return an expected error // for healing code to fix this file. return err } - - size := fi.Size() - - // Calculate the size of the bitrot file and compare - // it with the actual file size. - if size != bitrotShardFileSize(partSize, shardSize, algo) { - return errFileCorrupt - } - - var n int - for { - if size == 0 { - return nil - } - h.Reset() - n, err = file.Read(hashBuf) - if err != nil { - // Read's failed for object with right size, file is corrupt. - return err - } - size -= int64(n) - if size < int64(len(buf)) { - buf = buf[:size] - } - n, err = file.Read(buf) - if err != nil { - // Read's failed for object with right size, at different offsets. - return err - } - size -= int64(n) - h.Write(buf) - if !bytes.Equal(h.Sum(nil), hashBuf) { - return errFileCorrupt - } - } + return bitrotVerify(file, fi.Size(), partSize, algo, sum, shardSize) } func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) (err error) { diff --git a/docs/bucket/versioning/xl-meta.go b/docs/bucket/versioning/xl-meta.go index 090fc7b68..817030b42 100644 --- a/docs/bucket/versioning/xl-meta.go +++ b/docs/bucket/versioning/xl-meta.go @@ -18,9 +18,12 @@ package main import ( "bytes" + "encoding/binary" "encoding/json" + "errors" "fmt" "io" + "io/ioutil" "log" "os" @@ -28,8 +31,6 @@ import ( "github.com/tinylib/msgp/msgp" ) -var xlHeader = [4]byte{'X', 'L', '2', ' '} - func main() { app := cli.NewApp() app.Copyright = "MinIO, Inc." @@ -53,6 +54,10 @@ GLOBAL FLAGS: Usage: "Print each file as a separate line without formatting", Name: "ndjson", }, + cli.BoolFlag{ + Usage: "Display inline data keys and sizes", + Name: "data", + }, } app.Action = func(c *cli.Context) error { @@ -75,39 +80,57 @@ GLOBAL FLAGS: r = f } - // Read header - var tmp [4]byte - _, err := io.ReadFull(r, tmp[:]) + b, err := ioutil.ReadAll(r) if err != nil { return err } - if !bytes.Equal(tmp[:], xlHeader[:]) { - return fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], tmp[:4]) - } - // Skip version check for now - _, err = io.ReadFull(r, tmp[:]) + b, _, minor, err := checkXL2V1(b) if err != nil { return err } - var buf bytes.Buffer - _, err = msgp.CopyToJSON(&buf, r) - if err != nil { - return err + buf := bytes.NewBuffer(nil) + var data xlMetaInlineData + switch minor { + case 0: + _, err = msgp.CopyToJSON(buf, bytes.NewBuffer(b)) + if err != nil { + return err + } + case 1: + v, b, err := msgp.ReadBytesZC(b) + if err != nil { + return err + } + _, err = msgp.CopyToJSON(buf, bytes.NewBuffer(v)) + if err != nil { + return err + } + data = b + default: + return errors.New("unknown metadata version") + } + + if c.Bool("data") { + b, err := data.json() + if err != nil { + return err + } + buf = bytes.NewBuffer(b) } if c.Bool("ndjson") { fmt.Println(buf.String()) continue } var msi map[string]interface{} - dec := json.NewDecoder(&buf) + dec := json.NewDecoder(buf) // Use number to preserve integers. dec.UseNumber() err = dec.Decode(&msi) if err != nil { return err } - b, err := json.MarshalIndent(msi, "", " ") + b, err = json.MarshalIndent(msi, "", " ") if err != nil { return err } @@ -120,3 +143,111 @@ GLOBAL FLAGS: log.Fatal(err) } } + +var ( + // XL header specifies the format + xlHeader = [4]byte{'X', 'L', '2', ' '} + + // Current version being written. + xlVersionCurrent [4]byte +) + +const ( + // Breaking changes. + // Newer versions cannot be read by older software. + // This will prevent downgrades to incompatible versions. + xlVersionMajor = 1 + + // Non breaking changes. + // Bumping this is informational, but should be done + // if any change is made to the data stored, bumping this + // will allow to detect the exact version later. + xlVersionMinor = 1 +) + +func init() { + binary.LittleEndian.PutUint16(xlVersionCurrent[0:2], xlVersionMajor) + binary.LittleEndian.PutUint16(xlVersionCurrent[2:4], xlVersionMinor) +} + +// checkXL2V1 will check if the metadata has correct header and is a known major version. +// The remaining payload and versions are returned. +func checkXL2V1(buf []byte) (payload []byte, major, minor uint16, err error) { + if len(buf) <= 8 { + return payload, 0, 0, fmt.Errorf("xlMeta: no data") + } + + if !bytes.Equal(buf[:4], xlHeader[:]) { + return payload, 0, 0, fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], buf[:4]) + } + + if bytes.Equal(buf[4:8], []byte("1 ")) { + // Set as 1,0. + major, minor = 1, 0 + } else { + major, minor = binary.LittleEndian.Uint16(buf[4:6]), binary.LittleEndian.Uint16(buf[6:8]) + } + if major > xlVersionMajor { + return buf[8:], major, minor, fmt.Errorf("xlMeta: unknown major version %d found", major) + } + + return buf[8:], major, minor, nil +} + +const xlMetaInlineDataVer = 1 + +type xlMetaInlineData []byte + +// afterVersion returns the payload after the version, if any. +func (x xlMetaInlineData) afterVersion() []byte { + if len(x) == 0 { + return x + } + return x[1:] +} + +// versionOK returns whether the version is ok. +func (x xlMetaInlineData) versionOK() bool { + if len(x) == 0 { + return true + } + return x[0] > 0 && x[0] <= xlMetaInlineDataVer +} + +func (x xlMetaInlineData) json() ([]byte, error) { + if len(x) == 0 { + return []byte("{}"), nil + } + if !x.versionOK() { + return nil, errors.New("xlMetaInlineData: unknown version") + } + + sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion()) + if err != nil { + return nil, err + } + res := []byte("{") + + for i := uint32(0); i < sz; i++ { + var key, val []byte + key, buf, err = msgp.ReadMapKeyZC(buf) + if err != nil { + return nil, err + } + if len(key) == 0 { + return nil, fmt.Errorf("xlMetaInlineData: key %d is length 0", i) + } + // Skip data... + val, buf, err = msgp.ReadBytesZC(buf) + if err != nil { + return nil, err + } + if i > 0 { + res = append(res, ',') + } + s := fmt.Sprintf(`"%s":%d`, string(key), len(val)) + res = append(res, []byte(s)...) + } + res = append(res, '}') + return res, nil +}