From 061052786886c468ee3eab0b9411386e2147d982 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Tue, 12 Jul 2016 18:23:40 -0700 Subject: [PATCH] XL: PutObjectPart update checksum, re-read from xl.json for the part being written. (#2191) --- xl-v1-multipart.go | 17 +---------------- xl-v1-utils.go | 38 +++++++++++++++++++++++++++++++++++++ xl-v1-utils_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 16 deletions(-) diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index b5fbe11c3..e4539aec9 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -444,28 +444,13 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s updatedEInfos = append(updatedEInfos, partsMetadata[index].Erasure) } - var checksums []checkSumInfo for index, eInfo := range newEInfos { if eInfo.IsValid() { // Use a map to find union of checksums of parts that // we concurrently written and committed before this // part. N B For a different, concurrent upload of the // same part, the last written content remains. - checksumSet := make(map[string]checkSumInfo) - checksums = newEInfos[index].Checksum - for _, cksum := range checksums { - checksumSet[cksum.Name] = cksum - } - checksums = updatedEInfos[index].Checksum - for _, cksum := range checksums { - checksumSet[cksum.Name] = cksum - } - // Form the checksumInfo to be committed in xl.json - // from the map. - var finalChecksums []checkSumInfo - for _, cksum := range checksumSet { - finalChecksums = append(finalChecksums, cksum) - } + finalChecksums := unionChecksumInfos(newEInfos[index].Checksum, updatedEInfos[index].Checksum, partSuffix) updatedEInfos[index] = eInfo updatedEInfos[index].Checksum = finalChecksums } diff --git a/xl-v1-utils.go b/xl-v1-utils.go index 2f4d73d6e..636e97ff1 100644 --- a/xl-v1-utils.go +++ b/xl-v1-utils.go @@ -132,3 +132,41 @@ func readXLMeta(disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1, // Return structured `xl.json`. return xlMeta, nil } + +// Uses a map to find union of checksums of parts that were concurrently written +// but committed before this part. N B For a different, concurrent upload of +// the same part, the ongoing request's data/metadata prevails. +// cur - corresponds to parts written to disk before the ongoing putObjectPart request +// updated - corresponds to parts written to disk while the ongoing putObjectPart is in progress +// curPartName - name of the part that is being written +// returns []checkSumInfo containing the set union of checksums of parts that +// have been written so far incl. the part being written. +func unionChecksumInfos(cur []checkSumInfo, updated []checkSumInfo, curPartName string) []checkSumInfo { + checksumSet := make(map[string]checkSumInfo) + var checksums []checkSumInfo + + checksums = cur + for _, cksum := range checksums { + checksumSet[cksum.Name] = cksum + } + + checksums = updated + for _, cksum := range checksums { + // skip updating checksum of the part that is + // written in this request because the checksum + // from cur, corresponding to this part, + // should remain. + if cksum.Name == curPartName { + continue + } + checksumSet[cksum.Name] = cksum + } + + // Form the checksumInfo to be committed in xl.json + // from the map. + var finalChecksums []checkSumInfo + for _, cksum := range checksumSet { + finalChecksums = append(finalChecksums, cksum) + } + return finalChecksums +} diff --git a/xl-v1-utils_test.go b/xl-v1-utils_test.go index 03a4378d2..7745c5eb5 100644 --- a/xl-v1-utils_test.go +++ b/xl-v1-utils_test.go @@ -41,3 +41,49 @@ func TestReduceErrs(t *testing.T) { } } } + +// Test for unionChecksums +func TestUnionChecksumInfos(t *testing.T) { + cur := []checkSumInfo{ + {"part.1", "dummy", "cur-hash.1"}, + {"part.2", "dummy", "cur-hash.2"}, + {"part.3", "dummy", "cur-hash.3"}, + {"part.4", "dummy", "cur-hash.4"}, + {"part.5", "dummy", "cur-hash.5"}, + } + updated := []checkSumInfo{ + {"part.1", "dummy", "updated-hash.1"}, + {"part.2", "dummy", "updated-hash.2"}, + {"part.3", "dummy", "updated-hash.3"}, + } + curPartcksum := cur[0] // part.1 is the current part being written + + // Verify that hash of current part being written must be from cur []checkSumInfo + finalChecksums := unionChecksumInfos(cur, updated, curPartcksum.Name) + for _, cksum := range finalChecksums { + if cksum.Name == curPartcksum.Name && cksum.Hash != curPartcksum.Hash { + t.Errorf("expected Hash = %s but received Hash = %s\n", curPartcksum.Hash, cksum.Hash) + } + } + + // Verify that all part checksums are present in the union and nothing more. + // Map to store all unique part names + allPartNames := make(map[string]struct{}) + // Insert part names from cur and updated []checkSumInfo + for _, cksum := range cur { + allPartNames[cksum.Name] = struct{}{} + } + for _, cksum := range updated { + allPartNames[cksum.Name] = struct{}{} + } + // All parts must have an entry in the []checkSumInfo returned from unionChecksums + for _, finalcksum := range finalChecksums { + if _, ok := allPartNames[finalcksum.Name]; !ok { + t.Errorf("expected to find %s but not present in the union, where current part is %s\n", + finalcksum.Name, curPartcksum.Name) + } + } + if len(finalChecksums) != len(allPartNames) { + t.Error("Union of Checksums doesn't have same number of elements as unique parts in total") + } +}