From a7acfa6158ea53c7a38f2034d85fbb8c8111e08a Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Wed, 21 Apr 2021 19:06:08 -0700
Subject: [PATCH] fix: pick valid FileInfo additionally based on dataDir
 (#12116)

* fix: pick valid FileInfo additionally based on dataDir

Historically we have relied on modTime alone being consistent and
identical across disks; we now additionally require that the dataDir
value matches before metadata is treated as the same version.

A dataDir is identical across disks for a given version of an object
at a given point in time, so when, for example, a `null` version is
overwritten in quorum we no longer pick up the FileInfos incorrectly
by mistake.

* make sure not to preserve fi.Data

Signed-off-by: Harshavardhana
---
 cmd/erasure-healing-common.go      |  54 +++++++++++---
 cmd/erasure-healing-common_test.go |  48 ++++++------
 cmd/erasure-healing.go             |   6 +-
 cmd/erasure-metadata-utils.go      |  44 +++++++++--
 cmd/erasure-metadata.go            |  10 ++-
 cmd/erasure-metadata_test.go       |  12 ++-
 cmd/erasure-multipart.go           | 113 +++++++++++++----------------
 cmd/erasure-object.go              |  25 +++----
 8 files changed, 189 insertions(+), 123 deletions(-)

diff --git a/cmd/erasure-healing-common.go b/cmd/erasure-healing-common.go
index a834f9b05..e8be444b3 100644
--- a/cmd/erasure-healing-common.go
+++ b/cmd/erasure-healing-common.go
@@ -25,9 +25,11 @@ import (
 )
 
 // commonTime returns a maximally occurring time from a list of times.
-func commonTime(modTimes []time.Time) (modTime time.Time, count int) {
+func commonTime(modTimes []time.Time, dataDirs []string) (modTime time.Time, dataDir string) {
 	var maxima int // Counter for remembering max occurrence of elements.
-	timeOccurenceMap := make(map[int64]int)
+
+	timeOccurenceMap := make(map[int64]int, len(modTimes))
+	dataDirOccurenceMap := make(map[string]int, len(dataDirs))
 	// Ignore the uuid sentinel and count the rest.
 	for _, time := range modTimes {
 		if time.Equal(timeSentinel) {
@@ -36,6 +38,13 @@ func commonTime(modTimes []time.Time) (modTime time.Time, count int) {
 		timeOccurenceMap[time.UnixNano()]++
 	}
 
+	for _, dataDir := range dataDirs {
+		if dataDir == "" {
+			continue
+		}
+		dataDirOccurenceMap[dataDir]++
+	}
+
 	// Find the common cardinality from previously collected
 	// occurrences of elements.
 	for nano, count := range timeOccurenceMap {
@@ -46,8 +55,18 @@
 		}
 	}
 
+	// Find the common cardinality from the previously collected
+	// occurrences of elements.
+	var dmaxima int
+	for ddataDir, count := range dataDirOccurenceMap {
+		if count > dmaxima {
+			dmaxima = count
+			dataDir = ddataDir
+		}
+	}
+
 	// Return the collected common uuid.
-	return modTime, maxima
+	return modTime, dataDir
 }
 
 // Beginning of unix time is treated as sentinel value here.
@@ -101,24 +120,33 @@ func listObjectModtimes(partsMetadata []FileInfo, errs []error) (modTimes []time
 // - a slice of disks where disk having 'older' xl.meta (or nothing)
 //   are set to nil.
 // - latest (in time) of the maximally occurring modTime(s).
-func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time) {
+func listOnlineDisks(disks []StorageAPI, partsMetadata []FileInfo, errs []error) (onlineDisks []StorageAPI, modTime time.Time, dataDir string) {
 	onlineDisks = make([]StorageAPI, len(disks))
 
 	// List all the file commit ids from parts metadata.
 	modTimes := listObjectModtimes(partsMetadata, errs)
 
+	dataDirs := make([]string, len(partsMetadata))
+	for idx, fi := range partsMetadata {
+		if errs[idx] != nil {
+			continue
+		}
+		dataDirs[idx] = fi.DataDir
+	}
+
 	// Reduce list of UUIDs to a single common value.
-	modTime, _ = commonTime(modTimes)
+	modTime, dataDir = commonTime(modTimes, dataDirs)
 
 	// Create a new online disks slice, which shares the common uuid.
 	for index, t := range modTimes {
-		if t.Equal(modTime) {
+		if partsMetadata[index].IsValid() && t.Equal(modTime) && partsMetadata[index].DataDir == dataDir {
 			onlineDisks[index] = disks[index]
 		} else {
 			onlineDisks[index] = nil
 		}
 	}
 
-	return onlineDisks, modTime
+
+	return onlineDisks, modTime, dataDir
 }
 
 // Returns the latest updated FileInfo files and error in case of failure.
@@ -131,16 +159,24 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
 	// List all the file commit ids from parts metadata.
 	modTimes := listObjectModtimes(partsMetadata, errs)
 
+	dataDirs := make([]string, len(partsMetadata))
+	for idx, fi := range partsMetadata {
+		if errs[idx] != nil {
+			continue
+		}
+		dataDirs[idx] = fi.DataDir
+	}
+
 	// Count all latest updated FileInfo values
 	var count int
 	var latestFileInfo FileInfo
 
 	// Reduce list of UUIDs to a single common value - i.e. the last updated Time
-	modTime, _ := commonTime(modTimes)
+	modTime, dataDir := commonTime(modTimes, dataDirs)
 
 	// Iterate through all the modTimes and count the FileInfo(s) with the latest time.
 	for index, t := range modTimes {
-		if t.Equal(modTime) && partsMetadata[index].IsValid() {
+		if partsMetadata[index].IsValid() && t.Equal(modTime) && dataDir == partsMetadata[index].DataDir {
 			latestFileInfo = partsMetadata[index]
 			count++
 		}
diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go
index 6a190f1ab..541c110de 100644
--- a/cmd/erasure-healing-common_test.go
+++ b/cmd/erasure-healing-common_test.go
@@ -82,7 +82,7 @@ func TestCommonTime(t *testing.T) {
 	// common modtime. Tests fail if modtime does not match.
 	for i, testCase := range testCases {
 		// Obtain a common mod time from modTimes slice.
-		ctime, _ := commonTime(testCase.times)
+		ctime, _ := commonTime(testCase.times, nil)
 		if !testCase.time.Equal(ctime) {
 			t.Fatalf("Test case %d, expect to pass but failed. "+
 				"Wanted modTime: %s, got modTime: %s\n", i+1, testCase.time, ctime)
 		}
 	}
@@ -181,8 +181,8 @@ func TestListOnlineDisks(t *testing.T) {
 	z := obj.(*erasureServerPools)
 	erasureDisks := z.serverPools[0].sets[0].getDisks()
 	for i, test := range testCases {
+		test := test
 		t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
-
 			_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
 			if err != nil {
 				t.Fatalf("Failed to putObject %v", err)
 			}
@@ -196,7 +196,7 @@ func TestListOnlineDisks(t *testing.T) {
 
 			for j := range partsMetadata {
 				if errs[j] != nil {
-					t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j])
+					t.Fatalf("expected error to be nil: %s", errs[j])
 				}
 				partsMetadata[j].ModTime = test.modTimes[j]
 			}
@@ -215,8 +215,7 @@ func TestListOnlineDisks(t *testing.T) {
 					tamperedIndex = index
 					dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false)
 					if dErr != nil {
-						t.Fatalf("Test %d: Failed to delete %s - %v", i+1,
-							filepath.Join(object, "part.1"), dErr)
+						t.Fatalf("Failed to delete %s - %v", filepath.Join(object, "part.1"), dErr)
 					}
 					break
 				}
@@ -242,19 +241,22 @@ func TestListOnlineDisks(t *testing.T) {
 			}
 
-			onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
+			onlineDisks, modTime, dataDir := listOnlineDisks(erasureDisks, partsMetadata, test.errs)
 			if !modTime.Equal(test.expectedTime) {
-				t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v",
-					i+1, test.expectedTime, modTime)
+				t.Fatalf("Expected modTime to be equal to %v but was found to be %v",
+					test.expectedTime, modTime)
+			}
+			if fi.DataDir != dataDir {
+				t.Fatalf("Expected dataDir to be equal to %v but was found to be %v",
+					fi.DataDir, dataDir)
 			}
-
 			availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan)
 			test.errs = newErrs
 
 			if test._tamperBackend != noTamper {
 				if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil {
-					t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data",
-						i+1, erasureDisks[tamperedIndex])
+					t.Fatalf("disk (%v) with part.1 missing is not a disk with available data",
+						erasureDisks[tamperedIndex])
 				}
 			}
 		})
@@ -354,22 +356,22 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
 	z := obj.(*erasureServerPools)
 	erasureDisks := z.serverPools[0].sets[0].getDisks()
 	for i, test := range testCases {
+		test := test
 		t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
-
 			_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
 			if err != nil {
 				t.Fatalf("Failed to putObject %v", err)
 			}
 
 			partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
-			_, err := getLatestFileInfo(ctx, partsMetadata, errs)
+			fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
 			if err != nil {
 				t.Fatalf("Failed to getLatestFileInfo %v", err)
 			}
 
 			for j := range partsMetadata {
 				if errs[j] != nil {
-					t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[j])
+					t.Fatalf("expected error to be nil: %s", errs[j])
 				}
 				partsMetadata[j].ModTime = test.modTimes[j]
 			}
@@ -392,8 +394,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
 					tamperedIndex = index
 					dErr := erasureDisks[index].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
 					if dErr != nil {
-						t.Fatalf("Test %d: Failed to delete %s - %v", i+1,
-							pathJoin(object, xlStorageFormatFile), dErr)
t.Fatalf("Failed to delete %s - %v", pathJoin(object, xlStorageFormatFile), dErr) } break } @@ -424,10 +425,15 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { t.Fatalf("Failed to getLatestFileInfo %v", err) } - onlineDisks, modTime := listOnlineDisks(erasureDisks, partsMetadata, test.errs) + onlineDisks, modTime, dataDir := listOnlineDisks(erasureDisks, partsMetadata, test.errs) if !modTime.Equal(test.expectedTime) { - t.Fatalf("Test %d: Expected modTime to be equal to %v but was found to be %v", - i+1, test.expectedTime, modTime) + t.Fatalf("Expected modTime to be equal to %v but was found to be %v", + test.expectedTime, modTime) + } + + if fi.DataDir != dataDir { + t.Fatalf("Expected dataDir to be equal to %v but was found to be %v", + fi.DataDir, dataDir) } availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, bucket, object, madmin.HealDeepScan) @@ -435,8 +441,8 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { if test._tamperBackend != noTamper { if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil { - t.Fatalf("Test %d: disk (%v) with part.1 missing is not a disk with available data", - i+1, erasureDisks[tamperedIndex]) + t.Fatalf("disk (%v) with part.1 missing is not a disk with available data", + erasureDisks[tamperedIndex]) } } }) diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 8aadce618..ab7874c16 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -260,7 +260,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // List of disks having latest version of the object er.meta // (by modtime). - latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + latestDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs) // List of disks having all parts as per latest er.meta. availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode) @@ -350,7 +350,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // Latest FileInfo for reference. If a valid metadata is not // present, it is as good as object not found. - latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks) + latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, result.DataBlocks) if err != nil { return result, toObjectErr(err, bucket, object, versionID) } @@ -471,7 +471,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s Algorithm: checksumAlgo, Hash: bitrotWriterSum(writers[i]), }) - if len(inlineBuffers) > 0 { + if len(inlineBuffers) > 0 && inlineBuffers[i] != nil { partsMetadata[i].Data = inlineBuffers[i].Bytes() } else { partsMetadata[i].Data = nil diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index 75c80c3f6..90f4a2770 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -148,11 +148,15 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve return metadataArray, g.Wait() } +// shuffleDisksAndPartsMetadataByIndex this function should be always used by GetObjectNInfo() +// and CompleteMultipartUpload code path, it is not meant to be used with PutObject, +// NewMultipartUpload metadata shuffling. 
 func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo, fi FileInfo) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
 	shuffledDisks = make([]StorageAPI, len(disks))
 	shuffledPartsMetadata = make([]FileInfo, len(disks))
-	var inconsistent int
 	distribution := fi.Erasure.Distribution
+
+	var inconsistent int
 	for i, meta := range metaArr {
 		if disks[i] == nil {
 			// Assuming offline drives as inconsistent,
@@ -161,6 +165,14 @@ func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo,
 			inconsistent++
 			continue
 		}
+		if !meta.IsValid() {
+			inconsistent++
+			continue
+		}
+		if len(fi.Data) != len(meta.Data) {
+			inconsistent++
+			continue
+		}
 		// check if erasure distribution order matches the index
 		// position if this is not correct we discard the disk
 		// and move to collect others
@@ -180,18 +192,36 @@ func shuffleDisksAndPartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo,
 	}
 
 	// fall back to original distribution based order.
-	return shuffleDisksAndPartsMetadata(disks, metaArr, distribution)
+	return shuffleDisksAndPartsMetadata(disks, metaArr, fi)
 }
 
-// Return shuffled partsMetadata depending on distribution.
-func shuffleDisksAndPartsMetadata(disks []StorageAPI, partsMetadata []FileInfo, distribution []int) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
-	if distribution == nil {
-		return disks, partsMetadata
-	}
+// Return shuffled partsMetadata depending on fi.Distribution.
+// Additional validation is attempted, and invalid metadata is
+// automatically skipped, only when fi.ModTime is non-zero,
+// indicating that this call is part of the read phase.
+func shuffleDisksAndPartsMetadata(disks []StorageAPI, partsMetadata []FileInfo, fi FileInfo) (shuffledDisks []StorageAPI, shuffledPartsMetadata []FileInfo) {
 	shuffledDisks = make([]StorageAPI, len(disks))
 	shuffledPartsMetadata = make([]FileInfo, len(partsMetadata))
+	distribution := fi.Erasure.Distribution
+
+	init := fi.ModTime.IsZero()
 	// Shuffle slice xl metadata for expected distribution.
 	for index := range partsMetadata {
+		if disks[index] == nil {
+			continue
+		}
+		if !init && !partsMetadata[index].IsValid() {
+			// Check parts metadata validity only when
+			// fi.ModTime is not empty - ModTime is always set
+			// if the object was ever written previously.
+			continue
+		}
+		if !init && len(fi.Data) != len(partsMetadata[index].Data) {
+			// Check the length of the data parts only when
+			// fi.ModTime is not empty - ModTime is always set
+			// if the object was ever written previously.
+			continue
+		}
 		blockIndex := distribution[index]
 		shuffledPartsMetadata[blockIndex-1] = partsMetadata[index]
 		shuffledDisks[blockIndex-1] = disks[index]
diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go
index c9a99a6b6..c7bec4db1 100644
--- a/cmd/erasure-metadata.go
+++ b/cmd/erasure-metadata.go
@@ -232,15 +232,17 @@ func (fi FileInfo) ObjectToPartOffset(ctx context.Context, offset int64) (partIn
 	return 0, 0, InvalidRange{}
 }
 
-func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) {
+func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, dataDir string, quorum int) (xmv FileInfo, e error) {
 	metaHashes := make([]string, len(metaArr))
 	h := sha256.New()
 	for i, meta := range metaArr {
-		if meta.IsValid() && meta.ModTime.Equal(modTime) {
+		if meta.IsValid() && meta.ModTime.Equal(modTime) && meta.DataDir == dataDir {
 			for _, part := range meta.Parts {
 				h.Write([]byte(fmt.Sprintf("part.%d", part.Number)))
 			}
 			h.Write([]byte(fmt.Sprintf("%v", meta.Erasure.Distribution)))
+			// make sure that the length of Data is the same
+			h.Write([]byte(fmt.Sprintf("%v", len(meta.Data))))
 			metaHashes[i] = hex.EncodeToString(h.Sum(nil))
 			h.Reset()
 		}
@@ -278,8 +280,8 @@ func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.
 
 // pickValidFileInfo - picks one valid FileInfo content from a
 // slice of FileInfo and returns it.
-func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) {
-	return findFileInfoInQuorum(ctx, metaArr, modTime, quorum)
+func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, dataDir string, quorum int) (xmv FileInfo, e error) {
+	return findFileInfoInQuorum(ctx, metaArr, modTime, dataDir, quorum)
 }
 
 // writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently.
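Note for reviewers: the selection logic patched above is easiest to see in isolation. The following self-contained Go sketch mirrors the patched commonTime() behavior under stated assumptions; the names fileVersion and mostCommonVersion are hypothetical (not part of this patch), zero values stand in for the sentinels the real code skips, and the UUIDs are sample data only.

// Illustrative sketch only, not part of this patch: modTime and dataDir are
// each voted on independently, exactly as commonTime() does with its two
// occurrence maps; a disk's metadata is then usable only if it matches both.
package main

import (
	"fmt"
	"time"
)

type fileVersion struct {
	ModTime time.Time
	DataDir string
}

// mostCommonVersion tallies each field in its own map and combines the
// winners afterwards, mirroring the patched commonTime().
func mostCommonVersion(versions []fileVersion) (modTime time.Time, dataDir string) {
	timeCount := make(map[int64]int, len(versions))
	dirCount := make(map[string]int, len(versions))
	for _, v := range versions {
		if !v.ModTime.IsZero() { // stand-in for the timeSentinel check
			timeCount[v.ModTime.UnixNano()]++
		}
		if v.DataDir != "" { // empty dataDirs are ignored, as in the patch
			dirCount[v.DataDir]++
		}
	}
	maxima := 0
	for nano, count := range timeCount {
		if count > maxima {
			maxima = count
			modTime = time.Unix(0, nano).UTC()
		}
	}
	maxima = 0
	for dir, count := range dirCount {
		if count > maxima {
			maxima = count
			dataDir = dir
		}
	}
	return modTime, dataDir
}

func main() {
	t := time.Unix(1603863445, 0).UTC()
	versions := []fileVersion{
		{ModTime: t, DataDir: "36a21454-a2ca-11eb-bbaa-93a81c686f21"},
		{ModTime: t, DataDir: "36a21454-a2ca-11eb-bbaa-93a81c686f21"},
		// A stale overwrite of a `null` version: same modTime, different dataDir.
		{ModTime: t, DataDir: "0cc63536-a2cb-11eb-af77-93a81c686f21"},
	}
	modTime, dataDir := mostCommonVersion(versions)
	// Only the first two entries match both winners, so only those two
	// disks would be kept online by the patched listOnlineDisks().
	fmt.Println(modTime, dataDir)
}

Because the two fields are tallied separately, a FileInfo whose dataDir lost the vote is dropped even when its modTime matches - which is precisely why a `null` version overwritten in quorum can no longer be picked up by mistake.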
diff --git a/cmd/erasure-metadata_test.go b/cmd/erasure-metadata_test.go
index d18eab792..b5bd79cb1 100644
--- a/cmd/erasure-metadata_test.go
+++ b/cmd/erasure-metadata_test.go
@@ -157,10 +157,11 @@ func TestObjectToPartOffset(t *testing.T) {
 }
 
 func TestFindFileInfoInQuorum(t *testing.T) {
-	getNFInfo := func(n int, quorum int, t int64) []FileInfo {
+	getNFInfo := func(n int, quorum int, t int64, dataDir string) []FileInfo {
 		fi := newFileInfo("test", 8, 8)
 		fi.AddObjectPart(1, "etag", 100, 100)
 		fi.ModTime = time.Unix(t, 0)
+		fi.DataDir = dataDir
 		fis := make([]FileInfo, n)
 		for i := range fis {
 			fis[i] = fi
@@ -176,16 +177,19 @@ func TestFindFileInfoInQuorum(t *testing.T) {
 	tests := []struct {
 		fis         []FileInfo
 		modTime     time.Time
+		dataDir     string
 		expectedErr error
 	}{
 		{
-			fis:         getNFInfo(16, 16, 1603863445),
+			fis:         getNFInfo(16, 16, 1603863445, "36a21454-a2ca-11eb-bbaa-93a81c686f21"),
 			modTime:     time.Unix(1603863445, 0),
+			dataDir:     "36a21454-a2ca-11eb-bbaa-93a81c686f21",
 			expectedErr: nil,
 		},
 		{
-			fis:         getNFInfo(16, 7, 1603863445),
+			fis:         getNFInfo(16, 7, 1603863445, "36a21454-a2ca-11eb-bbaa-93a81c686f21"),
 			modTime:     time.Unix(1603863445, 0),
+			dataDir:     "36a21454-a2ca-11eb-bbaa-93a81c686f21",
 			expectedErr: errErasureReadQuorum,
 		},
 	}
@@ -193,7 +197,7 @@ func TestFindFileInfoInQuorum(t *testing.T) {
 	for _, test := range tests {
 		test := test
 		t.Run("", func(t *testing.T) {
-			_, err := findFileInfoInQuorum(context.Background(), test.fis, test.modTime, 8)
+			_, err := findFileInfoInQuorum(context.Background(), test.fis, test.modTime, test.dataDir, 8)
 			if err != test.expectedErr {
 				t.Errorf("Expected %s, got %s", test.expectedErr, err)
 			}
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index cc124d819..f78699012 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -66,10 +66,10 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
 	}
 
 	// List all online disks.
-	_, modTime := listOnlineDisks(disks, metaArr, errs)
+	_, modTime, dataDir := listOnlineDisks(disks, metaArr, errs)
 
 	// Pick latest valid metadata.
-	_, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
+	_, err = pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
 	return err
 }
 
@@ -283,74 +283,63 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
 // operation(s) on the object.
 func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (string, error) {
 	onlineDisks := er.getDisks()
-	parityBlocks := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
-	if parityBlocks <= 0 {
-		parityBlocks = er.defaultParityCount
+	parityDrives := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
+	if parityDrives <= 0 {
+		parityDrives = er.defaultParityCount
 	}
 
-	dataBlocks := len(onlineDisks) - parityBlocks
-	fi := newFileInfo(pathJoin(bucket, object), dataBlocks, parityBlocks)
-
+	dataDrives := len(onlineDisks) - parityDrives
 	// we now know the number of blocks this object needs for data and parity.
 	// establish the writeQuorum using this data
-	writeQuorum := dataBlocks
-	if dataBlocks == parityBlocks {
+	writeQuorum := dataDrives
+	if dataDrives == parityDrives {
 		writeQuorum++
 	}
 
-	if opts.UserDefined["content-type"] == "" {
-		contentType := mimedb.TypeByExtension(path.Ext(object))
-		opts.UserDefined["content-type"] = contentType
-	}
+	// Initialize parts metadata
+	partsMetadata := make([]FileInfo, len(onlineDisks))
 
-	// Calculate the version to be saved.
+	fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)
 	if opts.Versioned {
 		fi.VersionID = opts.VersionID
 		if fi.VersionID == "" {
 			fi.VersionID = mustGetUUID()
 		}
 	}
-
 	fi.DataDir = mustGetUUID()
-	fi.ModTime = UTCNow()
-	fi.Metadata = cloneMSS(opts.UserDefined)
+
+	// Initialize erasure metadata.
+	for index := range partsMetadata {
+		partsMetadata[index] = fi
+	}
+
+	// Guess content-type from the extension if possible.
+	if opts.UserDefined["content-type"] == "" {
+		opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
+	}
+
+	modTime := opts.MTime
+	if opts.MTime.IsZero() {
+		modTime = UTCNow()
+	}
+
+	onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(onlineDisks, partsMetadata, fi)
+
+	// Fill all the necessary metadata.
+	// Update `xl.meta` content on each disk.
+	for index := range partsMetadata {
+		partsMetadata[index].Metadata = opts.UserDefined
+		partsMetadata[index].ModTime = modTime
+	}
 
 	uploadID := mustGetUUID()
 	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
-	tempUploadIDPath := uploadID
 
-	// Delete the tmp path later in case we fail to commit (ignore
-	// returned errors) - this will be a no-op in case of a commit
-	// success.
-	var online int
-	defer func() {
-		if online != len(onlineDisks) {
-			er.deleteObject(context.Background(), minioMetaTmpBucket, tempUploadIDPath, writeQuorum)
-		}
-	}()
-
-	var partsMetadata = make([]FileInfo, len(onlineDisks))
-	for i := range onlineDisks {
-		partsMetadata[i] = fi
-	}
-
-	onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(onlineDisks, partsMetadata, fi.Erasure.Distribution)
-
-	var err error
 	// Write updated `xl.meta` to all disks.
-	onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, writeQuorum)
-	if err != nil {
-		return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
-	}
-
-	// Attempt to rename temp upload object to actual upload path object
-	_, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, true, writeQuorum, nil)
-	if err != nil {
+	if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
 		return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
 	}
 
-	online = countOnlineDisks(onlineDisks)
-
 	// Return success.
 	return uploadID, nil
 }
@@ -435,10 +424,10 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	}
 
 	// List all online disks.
-	onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
+	onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
 
 	// Pick one from the first valid metadata.
-	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
+	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
 	if err != nil {
 		return pi, err
 	}
@@ -545,10 +534,10 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 	}
 
 	// Get current highest version based on re-read partsMetadata.
-	onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs)
+	onlineDisks, modTime, dataDir = listOnlineDisks(onlineDisks, partsMetadata, errs)
 
 	// Pick one from the first valid metadata.
-	fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
+	fi, err = pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
 	if err != nil {
 		return pi, err
 	}
@@ -633,10 +622,10 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
 		return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
 	}
 
-	_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
+	_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
 
 	// Pick one from the first valid metadata.
-	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum)
+	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, readQuorum)
 	if err != nil {
 		return result, err
 	}
@@ -682,10 +671,10 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
 		return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
 	}
 
-	_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
+	_, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
 
 	// Pick one from the first valid metadata.
-	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
+	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
 	if err != nil {
 		return result, err
 	}
@@ -787,7 +776,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 		return oi, toObjectErr(reducedErr, bucket, object)
 	}
 
-	onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
+	onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, partsMetadata, errs)
+
+	// Pick one from the first valid metadata.
+	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, dataDir, writeQuorum)
+	if err != nil {
+		return oi, err
+	}
 
 	// Calculate full object size.
 	var objectSize int64
@@ -795,12 +790,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 	// Calculate consolidated actual size.
 	var objectActualSize int64
 
-	// Pick one from the first valid metadata.
-	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
-	if err != nil {
-		return oi, err
-	}
-
 	// Order online disks in accordance with distribution order.
 	// Order parts metadata in accordance with distribution order.
 	onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi)
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index b12b1cc90..b8c734113 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -81,10 +81,10 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
 	}
 
 	// List all online disks.
-	onlineDisks, modTime := listOnlineDisks(storageDisks, metaArr, errs)
+	onlineDisks, modTime, dataDir := listOnlineDisks(storageDisks, metaArr, errs)
 
 	// Pick latest valid metadata.
-	fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
+	fi, err := pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
 	if err != nil {
 		return oi, toObjectErr(err, srcBucket, srcObject)
 	}
@@ -421,10 +421,10 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
 	}
 
 	// List all online disks.
-	onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)
+	onlineDisks, modTime, dataDir := listOnlineDisks(disks, metaArr, errs)
 
 	// Pick latest valid metadata.
-	fi, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
+	fi, err = pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
 	if err != nil {
 		return fi, nil, nil, err
 	}
@@ -435,7 +435,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
 			missingBlocks++
 			continue
 		}
-		if metaArr[i].IsValid() && metaArr[i].ModTime.Equal(fi.ModTime) {
+		if metaArr[i].IsValid() && metaArr[i].ModTime.Equal(fi.ModTime) && metaArr[i].DataDir == fi.DataDir {
 			continue
 		}
 		missingBlocks++
@@ -658,7 +658,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 
 	// Order disks according to erasure distribution
 	var onlineDisks []StorageAPI
-	onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi.Erasure.Distribution)
+	onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi)
 
 	erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
 	if err != nil {
@@ -755,6 +755,8 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 		}
 		if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
 			partsMetadata[i].Data = inlineBuffers[i].Bytes()
+		} else {
+			partsMetadata[i].Data = nil
 		}
 		partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize())
 		partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
@@ -783,9 +785,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 		partsMetadata[index].Metadata = opts.UserDefined
 		partsMetadata[index].Size = n
 		partsMetadata[index].ModTime = modTime
-		if len(inlineBuffers) > 0 && inlineBuffers[index] != nil {
-			partsMetadata[index].Data = inlineBuffers[index].Bytes()
-		}
 	}
 
 	// Rename the successfully written temporary object to final location.
@@ -1183,10 +1182,10 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
 	}
 
 	// List all online disks.
-	_, modTime := listOnlineDisks(disks, metaArr, errs)
+	_, modTime, dataDir := listOnlineDisks(disks, metaArr, errs)
 
 	// Pick latest valid metadata.
-	fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
+	fi, err := pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
 	if err != nil {
 		return ObjectInfo{}, toObjectErr(err, bucket, object)
 	}
@@ -1234,10 +1233,10 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
 	}
 
 	// List all online disks.
-	_, modTime := listOnlineDisks(disks, metaArr, errs)
+	_, modTime, dataDir := listOnlineDisks(disks, metaArr, errs)
 
 	// Pick latest valid metadata.
-	fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
+	fi, err := pickValidFileInfo(ctx, metaArr, modTime, dataDir, readQuorum)
 	if err != nil {
 		return ObjectInfo{}, toObjectErr(err, bucket, object)
 	}
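
All of the call sites patched above share the same two-step pattern: listOnlineDisks() derives the reference (modTime, dataDir) pair, and pickValidFileInfo() then enforces quorum against that pair. The following is a minimal, self-contained sketch of that consumer-side pattern; fileInfo and readReference are illustrative stand-ins assumed for this example, not real MinIO APIs, and the quorum check here approximates rather than reproduces findFileInfoInQuorum's hash-based counting.

// Illustrative sketch only, not part of this patch: the read-path pattern
// that CopyObject, PutObjectPart, CompleteMultipartUpload, etc. now share.
package main

import (
	"errors"
	"fmt"
	"time"
)

type fileInfo struct {
	ModTime time.Time
	DataDir string
	Valid   bool
}

// readReference returns the first metadata entry matching the reference
// (modTime, dataDir) pair, failing unless at least quorum entries agree.
// Entries with a matching modTime but a different dataDir no longer count,
// which is the stricter post-patch behavior.
func readReference(metaArr []fileInfo, modTime time.Time, dataDir string, quorum int) (fileInfo, error) {
	var ref fileInfo
	count := 0
	for _, meta := range metaArr {
		if meta.Valid && meta.ModTime.Equal(modTime) && meta.DataDir == dataDir {
			if count == 0 {
				ref = meta
			}
			count++
		}
	}
	if count < quorum {
		return fileInfo{}, errors.New("read quorum not met for the (modTime, dataDir) pair")
	}
	return ref, nil
}

func main() {
	now := time.Now().UTC()
	metaArr := []fileInfo{
		{ModTime: now, DataDir: "dir-a", Valid: true},
		{ModTime: now, DataDir: "dir-a", Valid: true},
		// Same modTime but a different dataDir: excluded from quorum.
		{ModTime: now, DataDir: "dir-b", Valid: true},
	}
	fi, err := readReference(metaArr, now, "dir-a", 2)
	fmt.Println(fi, err)
}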