diff --git a/erasure-appendfile.go b/erasure-appendfile.go index 449633f0e..d604fc0d9 100644 --- a/erasure-appendfile.go +++ b/erasure-appendfile.go @@ -19,16 +19,16 @@ package main import "sync" // AppendFile - append data buffer at path. -func (e erasure) AppendFile(volume, path string, dataBuffer []byte) (n int64, err error) { +func (e erasureConfig) AppendFile(volume, path string, dataBuffer []byte) (n int64, err error) { // Split the input buffer into data and parity blocks. var blocks [][]byte - blocks, err = e.ReedSolomon.Split(dataBuffer) + blocks, err = e.reedSolomon.Split(dataBuffer) if err != nil { return 0, err } // Encode parity blocks using data blocks. - err = e.ReedSolomon.Encode(blocks) + err = e.reedSolomon.Encode(blocks) if err != nil { return 0, err } @@ -55,6 +55,10 @@ func (e erasure) AppendFile(volume, path string, dataBuffer []byte) (n int64, er wErrs[index] = errUnexpected return } + // Calculate hash. + e.hashWriters[blockIndex].Write(blocks[blockIndex]) + + // Successfully wrote. wErrs[index] = nil }(index, disk) } diff --git a/erasure-readfile.go b/erasure-readfile.go index 84fd2d255..0f088cd1b 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -16,82 +16,136 @@ package main -import "errors" +import ( + "encoding/hex" + "errors" +) + +// isValidBlock - calculates the checksum hash for the block and +// validates if its correct returns true for valid cases, false otherwise. +func (e erasureConfig) isValidBlock(volume, path string, blockIdx int) bool { + diskIndex := -1 + // Find out the right disk index for the input block index. + for index, blockIndex := range e.distribution { + if blockIndex == blockIdx { + diskIndex = index + } + } + // Unknown block index requested, treat it as error. + if diskIndex == -1 { + return false + } + // Disk is not present, treat entire block to be non existent. + if e.storageDisks[diskIndex] == nil { + return false + } + // Read everything for a given block and calculate hash. + hashBytes, err := hashSum(e.storageDisks[diskIndex], volume, path, newHash(e.checkSumAlgo)) + if err != nil { + return false + } + return hex.EncodeToString(hashBytes) == e.hashChecksums[diskIndex] +} // ReadFile - decoded erasure coded file. -func (e erasure) ReadFile(volume, path string, startOffset int64, buffer []byte) (int64, error) { - // Calculate the current encoded block size. - curEncBlockSize := getEncodedBlockLen(int64(len(buffer)), e.DataBlocks) - offsetEncOffset := getEncodedBlockLen(startOffset, e.DataBlocks) +func (e erasureConfig) ReadFile(volume, path string, size int64, blockSize int64) ([]byte, error) { + // Return data buffer. + var buffer []byte - // Allocate encoded blocks up to storage disks. - enBlocks := make([][]byte, len(e.storageDisks)) + // Total size left + totalSizeLeft := size - // Counter to keep success data blocks. - var successDataBlocksCount = 0 - var noReconstruct bool // Set for no reconstruction. + // Starting offset for reading. + startOffset := int64(0) - // Read from all the disks. - for index, disk := range e.storageDisks { - blockIndex := e.distribution[index] - 1 - if disk == nil { - continue + // Write until each parts are read and exhausted. + for totalSizeLeft > 0 { + // Calculate the proper block size. + var curBlockSize int64 + if blockSize < totalSizeLeft { + curBlockSize = blockSize + } else { + curBlockSize = totalSizeLeft } - // Initialize shard slice and fill the data from each parts. - enBlocks[blockIndex] = make([]byte, curEncBlockSize) - // Read the necessary blocks. - _, err := disk.ReadFile(volume, path, offsetEncOffset, enBlocks[blockIndex]) - if err != nil { - enBlocks[blockIndex] = nil - } - // Verify if we have successfully read all the data blocks. - if blockIndex < e.DataBlocks && enBlocks[blockIndex] != nil { - successDataBlocksCount++ - // Set when we have all the data blocks and no - // reconstruction is needed, so that we can avoid - // erasure reconstruction. - noReconstruct = successDataBlocksCount == e.DataBlocks - if noReconstruct { - // Break out we have read all the data blocks. - break + + // Calculate the current encoded block size. + curEncBlockSize := getEncodedBlockLen(curBlockSize, e.dataBlocks) + offsetEncOffset := getEncodedBlockLen(startOffset, e.dataBlocks) + + // Allocate encoded blocks up to storage disks. + enBlocks := make([][]byte, len(e.storageDisks)) + + // Counter to keep success data blocks. + var successDataBlocksCount = 0 + var noReconstruct bool // Set for no reconstruction. + + // Read from all the disks. + for index, disk := range e.storageDisks { + blockIndex := e.distribution[index] - 1 + if !e.isValidBlock(volume, path, blockIndex) { + continue + } + // Initialize shard slice and fill the data from each parts. + enBlocks[blockIndex] = make([]byte, curEncBlockSize) + // Read the necessary blocks. + _, err := disk.ReadFile(volume, path, offsetEncOffset, enBlocks[blockIndex]) + if err != nil { + enBlocks[blockIndex] = nil + } + // Verify if we have successfully read all the data blocks. + if blockIndex < e.dataBlocks && enBlocks[blockIndex] != nil { + successDataBlocksCount++ + // Set when we have all the data blocks and no + // reconstruction is needed, so that we can avoid + // erasure reconstruction. + noReconstruct = successDataBlocksCount == e.dataBlocks + if noReconstruct { + // Break out we have read all the data blocks. + break + } } } - } - // Check blocks if they are all zero in length, we have corruption return error. - if checkBlockSize(enBlocks) == 0 { - return 0, errDataCorrupt - } + // Check blocks if they are all zero in length, we have corruption return error. + if checkBlockSize(enBlocks) == 0 { + return nil, errDataCorrupt + } - // Verify if reconstruction is needed, proceed with reconstruction. - if !noReconstruct { - err := e.ReedSolomon.Reconstruct(enBlocks) + // Verify if reconstruction is needed, proceed with reconstruction. + if !noReconstruct { + err := e.reedSolomon.Reconstruct(enBlocks) + if err != nil { + return nil, err + } + // Verify reconstructed blocks (parity). + ok, err := e.reedSolomon.Verify(enBlocks) + if err != nil { + return nil, err + } + if !ok { + // Blocks cannot be reconstructed, corrupted data. + err = errors.New("Verification failed after reconstruction, data likely corrupted.") + return nil, err + } + } + + // Get data blocks from encoded blocks. + dataBlocks, err := getDataBlocks(enBlocks, e.dataBlocks, int(curBlockSize)) if err != nil { - return 0, err - } - // Verify reconstructed blocks (parity). - ok, err := e.ReedSolomon.Verify(enBlocks) - if err != nil { - return 0, err - } - if !ok { - // Blocks cannot be reconstructed, corrupted data. - err = errors.New("Verification failed after reconstruction, data likely corrupted.") - return 0, err + return nil, err } + + // Copy data blocks. + buffer = append(buffer, dataBlocks...) + + // Negate the 'n' size written to client. + totalSizeLeft -= int64(len(dataBlocks)) + + // Increase the offset to move forward. + startOffset += int64(len(dataBlocks)) + + // Relenquish memory. + dataBlocks = nil } - - // Get data blocks from encoded blocks. - dataBlocks, err := getDataBlocks(enBlocks, e.DataBlocks, len(buffer)) - if err != nil { - return 0, err - } - - // Copy data blocks. - copy(buffer, dataBlocks) - - // Relenquish memory. - dataBlocks = nil - - return int64(len(buffer)), nil + return buffer, nil } diff --git a/erasure-utils.go b/erasure-utils.go index 86dca895a..625e7f314 100644 --- a/erasure-utils.go +++ b/erasure-utils.go @@ -16,7 +16,42 @@ package main -import "github.com/klauspost/reedsolomon" +import ( + "crypto/sha512" + "hash" + "io" + + "github.com/klauspost/reedsolomon" +) + +// newHash - gives you a newly allocated hash depending on the input algorithm. +func newHash(algo string) hash.Hash { + switch algo { + case "sha512": + return sha512.New() + // Add new hashes here. + default: + return sha512.New() + } +} + +func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) { + startOffset := int64(0) + // Read until io.EOF. + for { + buf := make([]byte, blockSizeV1) + n, err := disk.ReadFile(volume, path, startOffset, buf) + if err == io.EOF { + break + } + if err != nil && err != io.EOF { + return nil, err + } + writer.Write(buf[:n]) + startOffset += n + } + return writer.Sum(nil), nil +} // getDataBlocks - fetches the data block only part of the input encoded blocks. func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []byte, err error) { @@ -31,6 +66,7 @@ func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data [] if size < curBlockSize { return nil, reedsolomon.ErrShortData } + write := curBlockSize for _, block := range blocks { if write < len(block) { diff --git a/erasure.go b/erasure.go index 80d9c6769..980ede8dc 100644 --- a/erasure.go +++ b/erasure.go @@ -16,21 +16,30 @@ package main -import "github.com/klauspost/reedsolomon" +import ( + "encoding/hex" + "hash" + + "github.com/klauspost/reedsolomon" +) // erasure storage layer. -type erasure struct { - ReedSolomon reedsolomon.Encoder // Erasure encoder/decoder. - DataBlocks int - ParityBlocks int - storageDisks []StorageAPI - distribution []int +type erasureConfig struct { + reedSolomon reedsolomon.Encoder // Erasure encoder/decoder. + dataBlocks int // Calculated data disks. + storageDisks []StorageAPI // Initialized storage disks. + distribution []int // Erasure block distribution. + hashWriters []hash.Hash // Allocate hash writers. + + // Carries hex checksums needed for validating Reads. + hashChecksums []string + checkSumAlgo string } // newErasure instantiate a new erasure. -func newErasure(disks []StorageAPI, distribution []int) *erasure { +func newErasure(disks []StorageAPI, distribution []int) *erasureConfig { // Initialize E. - e := &erasure{} + e := &erasureConfig{} // Calculate data and parity blocks. dataBlocks, parityBlocks := len(disks)/2, len(disks)/2 @@ -40,9 +49,8 @@ func newErasure(disks []StorageAPI, distribution []int) *erasure { fatalIf(err, "Unable to initialize reedsolomon package.") // Save the reedsolomon. - e.DataBlocks = dataBlocks - e.ParityBlocks = parityBlocks - e.ReedSolomon = rs + e.dataBlocks = dataBlocks + e.reedSolomon = rs // Save all the initialized storage disks. e.storageDisks = disks @@ -53,3 +61,31 @@ func newErasure(disks []StorageAPI, distribution []int) *erasure { // Return successfully initialized. return e } + +// SaveAlgo - FIXME. +func (e *erasureConfig) SaveAlgo(algo string) { + e.checkSumAlgo = algo +} + +// Save hex encoded hashes - saves hashes that need to be validated +// during reads for each blocks. +func (e *erasureConfig) SaveHashes(hashes []string) { + e.hashChecksums = hashes +} + +// InitHash - initializes new hash for all blocks. +func (e *erasureConfig) InitHash(algo string) { + e.hashWriters = make([]hash.Hash, len(e.storageDisks)) + for index := range e.storageDisks { + e.hashWriters[index] = newHash(algo) + } +} + +// GetHashes - returns a slice of hex encoded hash. +func (e erasureConfig) GetHashes() []string { + var hexHashes = make([]string, len(e.storageDisks)) + for index, hashWriter := range e.hashWriters { + hexHashes[index] = hex.EncodeToString(hashWriter.Sum(nil)) + } + return hexHashes +} diff --git a/fs-v1-metadata.go b/fs-v1-metadata.go index b37252900..94e7c3610 100644 --- a/fs-v1-metadata.go +++ b/fs-v1-metadata.go @@ -52,7 +52,7 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin m.Parts = append(m.Parts, partInfo) // Parts in fsMeta should be in sorted order by part number. - sort.Sort(byPartNumber(m.Parts)) + sort.Sort(byObjectPartNumber(m.Parts)) } // readFSMetadata - returns the object metadata `fs.json` content. diff --git a/xl-v1-healing.go b/xl-v1-healing.go index b29feaed3..a77cd102e 100644 --- a/xl-v1-healing.go +++ b/xl-v1-healing.go @@ -41,19 +41,18 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() - offset := int64(0) - var buffer = make([]byte, blockSizeV1) - n, err := disk.ReadFile(bucket, xlMetaPath, offset, buffer) + buffer, err := readAll(disk, bucket, xlMetaPath) if err != nil { errs[index] = err return } - err = json.Unmarshal(buffer[:n], &metadataArray[index]) + err = json.Unmarshal(buffer, &metadataArray[index]) if err != nil { // Unable to parse xl.json, set error. errs[index] = err return } + // Relinquish buffer. buffer = nil errs[index] = nil }(index, disk) @@ -151,9 +150,8 @@ func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) { // - xlMetaV1 // - bool value indicating if healing is needed. // - error if any. -func (xl xlObjects) listOnlineDisks(bucket, object string) (onlineDisks []StorageAPI, version int64, err error) { +func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, version int64, err error) { onlineDisks = make([]StorageAPI, len(xl.storageDisks)) - partsMetadata, errs := xl.readAllXLMetadata(bucket, object) if err = xl.reduceError(errs); err != nil { if err == errFileNotFound { // For file not found, treat as if disks are available diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 844599eb5..d676893f0 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -18,6 +18,7 @@ package main import ( "encoding/json" + "fmt" "path" "sort" "sync" @@ -39,12 +40,19 @@ type objectPartInfo struct { Size int64 `json:"size"` } -// byPartName is a collection satisfying sort.Interface. -type byPartNumber []objectPartInfo +// byObjectPartNumber is a collection satisfying sort.Interface. +type byObjectPartNumber []objectPartInfo -func (t byPartNumber) Len() int { return len(t) } -func (t byPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] } -func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number } +func (t byObjectPartNumber) Len() int { return len(t) } +func (t byObjectPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t byObjectPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number } + +// checkSumInfo - carries checksums of individual part. +type checkSumInfo struct { + Name string `json:"name"` + Algorithm string `json:"algorithm"` + Hash string `json:"hash"` +} // A xlMetaV1 represents a metadata header mapping keys to sets of values. type xlMetaV1 struct { @@ -56,17 +64,13 @@ type xlMetaV1 struct { Version int64 `json:"version"` } `json:"stat"` Erasure struct { - Algorithm string `json:"algorithm"` - DataBlocks int `json:"data"` - ParityBlocks int `json:"parity"` - BlockSize int64 `json:"blockSize"` - Index int `json:"index"` - Distribution []int `json:"distribution"` - Checksum []struct { - Name string `json:"name"` - Algorithm string `json:"algorithm"` - Hash string `json:"hash"` - } `json:"checksum"` + Algorithm string `json:"algorithm"` + DataBlocks int `json:"data"` + ParityBlocks int `json:"parity"` + BlockSize int64 `json:"blockSize"` + Index int `json:"index"` + Distribution []int `json:"distribution"` + Checksum []checkSumInfo `json:"checksum,omitempty"` } `json:"erasure"` Minio struct { Release string `json:"release"` @@ -89,6 +93,11 @@ func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) { return xlMeta } +// IsValid - is validate tells if the format is sane. +func (m xlMetaV1) IsValid() bool { + return m.Version == "1" && m.Format == "xl" +} + // ObjectPartIndex - returns the index of matching object part number. func (m xlMetaV1) ObjectPartIndex(partNumber int) (index int) { for i, part := range m.Parts { @@ -100,6 +109,17 @@ func (m xlMetaV1) ObjectPartIndex(partNumber int) (index int) { return -1 } +// ObjectCheckIndex - returns the checksum for the part name from the checksum slice. +func (m xlMetaV1) PartObjectChecksum(partNumber int) checkSumInfo { + partName := fmt.Sprintf("object%d", partNumber) + for _, checksum := range m.Erasure.Checksum { + if checksum.Name == partName { + return checksum + } + } + return checkSumInfo{} +} + // AddObjectPart - add a new object part in order. func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) { partInfo := objectPartInfo{ @@ -121,11 +141,11 @@ func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag strin m.Parts = append(m.Parts, partInfo) // Parts in xlMeta should be in sorted order by part number. - sort.Sort(byPartNumber(m.Parts)) + sort.Sort(byObjectPartNumber(m.Parts)) } -// objectToPartOffset - translate offset of an object to offset of its individual part. -func (m xlMetaV1) objectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) { +// ObjectToPartOffset - translate offset of an object to offset of its individual part. +func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) { partOffset = offset // Seek until object offset maps to a particular part offset. for i, part := range m.Parts { @@ -146,6 +166,18 @@ func (m xlMetaV1) objectToPartOffset(offset int64) (partIndex int, partOffset in return 0, 0, InvalidRange{} } +// pickValidXLMeta - picks one valid xlMeta content and returns from a +// slice of xlmeta content. If no value is found this function panics +// and dies. +func pickValidXLMeta(xlMetas []xlMetaV1) xlMetaV1 { + for _, xlMeta := range xlMetas { + if xlMeta.IsValid() { + return xlMeta + } + } + panic("Unable to look for valid XL metadata content") +} + // readXLMetadata - returns the object metadata `xl.json` content from // one of the disks picked at random. func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) { @@ -160,7 +192,10 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err if err == nil { err = json.Unmarshal(buffer, &xlMeta) if err == nil { - return xlMeta, nil + if xlMeta.IsValid() { + return xlMeta, nil + } + err = errDataCorrupt } } xlJSONErrCount++ // Update error count. @@ -209,12 +244,85 @@ func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix return nil } +// writeXLMetadata - writes `xl.json` to a single disk. +func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) error { + jsonFile := path.Join(prefix, xlMetaJSONFile) + + // Marshal json. + metadataBytes, err := json.Marshal(&xlMeta) + if err != nil { + return err + } + // Persist marshalled data. + n, err := disk.AppendFile(bucket, jsonFile, metadataBytes) + if err != nil { + return err + } + if n != int64(len(metadataBytes)) { + return errUnexpected + } + return nil +} + +// checkSumAlgorithm - get the algorithm required for checksum +// verification for a given part. Allocates a new hash and returns. +func checkSumAlgorithm(xlMeta xlMetaV1, partIdx int) string { + partCheckSumInfo := xlMeta.PartObjectChecksum(partIdx) + return partCheckSumInfo.Algorithm +} + +// xlMetaPartBlockChecksums - get block checksums for a given part. +func (xl xlObjects) metaPartBlockChecksums(xlMetas []xlMetaV1, partIdx int) (blockCheckSums []string) { + for index := range xl.storageDisks { + // Save the read checksums for a given part. + blockCheckSums = append(blockCheckSums, xlMetas[index].PartObjectChecksum(partIdx).Hash) + } + return blockCheckSums +} + +// writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order. +func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMetaV1) error { + var wg = &sync.WaitGroup{} + var mErrs = make([]error, len(xl.storageDisks)) + + // Start writing `xl.json` to all disks in parallel. + for index, disk := range xl.storageDisks { + wg.Add(1) + // Write `xl.json` in a routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + + // Pick one xlMeta for a disk at index. + xlMetas[index].Erasure.Index = index + 1 + + // Write unique `xl.json` for a disk at index. + if err := writeXLMetadata(disk, bucket, prefix, xlMetas[index]); err != nil { + mErrs[index] = err + return + } + mErrs[index] = nil + }(index, disk) + } + + // Wait for all the routines. + wg.Wait() + + // Return the first error. + for _, err := range mErrs { + if err == nil { + continue + } + return err + } + + return nil +} + // writeXLMetadata - write `xl.json` on all disks in order. func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error { var wg = &sync.WaitGroup{} var mErrs = make([]error, len(xl.storageDisks)) - jsonFile := path.Join(prefix, xlMetaJSONFile) // Start writing `xl.json` to all disks in parallel. for index, disk := range xl.storageDisks { wg.Add(1) @@ -225,21 +333,11 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro // Save the disk order index. metadata.Erasure.Index = index + 1 - metadataBytes, err := json.Marshal(&metadata) - if err != nil { + // Write xl metadata. + if err := writeXLMetadata(disk, bucket, prefix, metadata); err != nil { mErrs[index] = err return } - // Persist marshalled data. - n, mErr := disk.AppendFile(bucket, jsonFile, metadataBytes) - if mErr != nil { - mErrs[index] = mErr - return - } - if n != int64(len(metadataBytes)) { - mErrs[index] = errUnexpected - return - } mErrs[index] = nil }(index, disk, xlMeta) } diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 96a1a145b..597aa65fd 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -110,39 +110,32 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, if !IsValidObjectName(object) { return "", ObjectNameInvalid{Bucket: bucket, Object: object} } - uploadIDLocked := false - defer func() { - if uploadIDLocked { - nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - } - }() - // Figure out the erasure distribution first. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - uploadIDLocked = true + uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID) + nsMutex.Lock(minioMetaBucket, uploadIDPath) + defer nsMutex.Unlock(minioMetaBucket, uploadIDPath) if !xl.isUploadIDExists(bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) - } + // Read metadata associated with the object from all disks. + partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath) // List all online disks. - onlineDisks, higherVersion, err := xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + onlineDisks, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs) if err != nil { return "", toObjectErr(err, bucket, object) } - // Unlock the uploadID so that parallel uploads of parts can happen. - nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - uploadIDLocked = false + // Pick one from the first valid metadata. + xlMeta := pickValidXLMeta(partsMetadata) // Initialize a new erasure with online disks and new distribution. erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) + // Initialize sha512 hash. + erasure.InitHash("sha512") + partSuffix := fmt.Sprintf("object%d", partID) tmpPartPath := path.Join(tmpMetaPrefix, uploadID, partSuffix) @@ -182,31 +175,12 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, } } - // Hold lock as we are updating UPLODID/xl.json and renaming the part file from tmp location. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - uploadIDLocked = true - if !xl.isUploadIDExists(bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } - // List all online disks. - onlineDisks, higherVersion, err = xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - if err != nil { - return "", toObjectErr(err, bucket, object) - } - - // Increment version only if we have online disks less than configured storage disks. - if diskCount(onlineDisks) < len(xl.storageDisks) { - higherVersion++ - } - - xlMeta, err = xl.readXLMetadata(minioMetaBucket, uploadIDPath) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) - } // Rename temporary part file to its final location. - partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) + partPath := path.Join(uploadIDPath, partSuffix) err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) if err != nil { return "", toObjectErr(err, minioMetaBucket, partPath) @@ -214,10 +188,32 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, // Once part is successfully committed, proceed with updating XL metadata. xlMeta.Stat.Version = higherVersion + // Add the current part. xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) + // Get calculated hash checksums from erasure to save in `xl.json`. + hashChecksums := erasure.GetHashes() + + checkSums := make([]checkSumInfo, len(xl.storageDisks)) + for index := range xl.storageDisks { + blockIndex := xlMeta.Erasure.Distribution[index] - 1 + checkSums[blockIndex] = checkSumInfo{ + Name: partSuffix, + Algorithm: "sha512", + Hash: hashChecksums[blockIndex], + } + } + for index := range partsMetadata { + blockIndex := xlMeta.Erasure.Distribution[index] - 1 + partsMetadata[index].Parts = xlMeta.Parts + partsMetadata[index].Erasure.Checksum = append(partsMetadata[index].Erasure.Checksum, checkSums[blockIndex]) + } + + // Write all the checksum metadata. tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) - if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { + + // Write unique `xl.json` each disk. + if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) @@ -258,6 +254,7 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM result := ListPartsInfo{} uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath) if err != nil { return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath) @@ -352,14 +349,18 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID) - // Read the current `xl.json`. - xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath) - if err != nil { + // Read metadata associated with the object from all disks. + partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath) + if err = xl.reduceError(errs); err != nil { return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } + // Calculate full object size. var objectSize int64 + // Pick one from the first valid metadata. + xlMeta := pickValidXLMeta(partsMetadata) + // Save current xl meta for validation. var currentXLMeta = xlMeta @@ -405,7 +406,16 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload xlMeta.Meta["md5Sum"] = s3MD5 uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) - if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { + + // Update all xl metadata, make sure to not modify fields like + // checksum which are different on each disks. + for index := range partsMetadata { + partsMetadata[index].Stat = xlMeta.Stat + partsMetadata[index].Meta = xlMeta.Meta + partsMetadata[index].Parts = xlMeta.Parts + } + // Write unique `xl.json` for each disk. + if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) diff --git a/xl-v1-object.go b/xl-v1-object.go index 3fc9ef889..684850e50 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -31,63 +31,83 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i nsMutex.RLock(bucket, object) defer nsMutex.RUnlock(bucket, object) - // Read metadata associated with the object. - xlMeta, err := xl.readXLMetadata(bucket, object) - if err != nil { + // Read metadata associated with the object from all disks. + partsMetadata, errs := xl.readAllXLMetadata(bucket, object) + if err := xl.reduceError(errs); err != nil { return toObjectErr(err, bucket, object) } // List all online disks. - onlineDisks, _, err := xl.listOnlineDisks(bucket, object) + onlineDisks, _, err := xl.listOnlineDisks(partsMetadata, errs) if err != nil { return toObjectErr(err, bucket, object) } - // Initialize a new erasure with online disks, with previous block distribution. - erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) + // Pick one from the first valid metadata. + xlMeta := partsMetadata[0] + if !xlMeta.IsValid() { + for _, partMetadata := range partsMetadata { + if partMetadata.IsValid() { + xlMeta = partMetadata + break + } + } + } // Get part index offset. - partIndex, partOffset, err := xlMeta.objectToPartOffset(startOffset) + partIndex, partOffset, err := xlMeta.ObjectToPartOffset(startOffset) if err != nil { return toObjectErr(err, bucket, object) } + + // Read from all parts. for ; partIndex < len(xlMeta.Parts); partIndex++ { - part := xlMeta.Parts[partIndex] - totalLeft := part.Size - beginOffset := int64(0) - for totalLeft > 0 { - var curBlockSize int64 - if xlMeta.Erasure.BlockSize < totalLeft { - curBlockSize = xlMeta.Erasure.BlockSize - } else { - curBlockSize = totalLeft - } - var buffer = make([]byte, curBlockSize) - var n int64 - n, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), beginOffset, buffer) + // Save the current part name and size. + partName := xlMeta.Parts[partIndex].Name + partSize := xlMeta.Parts[partIndex].Size + + // Initialize a new erasure with online disks, with previous + // block distribution for each part reads. + erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) + + // Set previously calculated block checksums and algorithm for validation. + erasure.SaveAlgo(checkSumAlgorithm(xlMeta, partIndex+1)) + erasure.SaveHashes(xl.metaPartBlockChecksums(partsMetadata, partIndex+1)) + + // Data block size. + blockSize := xlMeta.Erasure.BlockSize + + // Start reading the part name. + var buffer []byte + buffer, err = erasure.ReadFile(bucket, pathJoin(object, partName), partSize, blockSize) + if err != nil { + return err + } + + // Copy to client until length requested. + if length > int64(len(buffer)) { + var m int64 + m, err = io.Copy(writer, bytes.NewReader(buffer[partOffset:])) if err != nil { return err } - if length > int64(len(buffer)) { - var m int64 - m, err = io.Copy(writer, bytes.NewReader(buffer[partOffset:])) - if err != nil { - return err - } - length -= m - } else { - _, err = io.CopyN(writer, bytes.NewReader(buffer[partOffset:]), length) - if err != nil { - return err - } - return nil + length -= m + } else { + _, err = io.CopyN(writer, bytes.NewReader(buffer[partOffset:]), length) + if err != nil { + return err } - totalLeft -= n - beginOffset += n - // Reset part offset to 0 to read rest of the part from the beginning. - partOffset = 0 + return nil } + + // Relinquish memory. + buffer = nil + + // Reset part offset to 0 to read rest of the part from the beginning. + partOffset = 0 } + + // Return success. return nil } @@ -220,8 +240,11 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Initialize xl meta. xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks) + // Read metadata associated with the object from all disks. + partsMetadata, errs := xl.readAllXLMetadata(bucket, object) + // List all online disks. - onlineDisks, higherVersion, err := xl.listOnlineDisks(bucket, object) + onlineDisks, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs) if err != nil { return "", toObjectErr(err, bucket, object) } @@ -234,6 +257,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Initialize a new erasure with online disks and new distribution. erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) + // Initialize sha512 hash. + erasure.InitHash("sha512") + // Initialize md5 writer. md5Writer := md5.New() @@ -305,10 +331,33 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. xlMeta.Stat.Size = size xlMeta.Stat.ModTime = modTime xlMeta.Stat.Version = higherVersion + // Add the final part. xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size) - // Write `xl.json` metadata. - if err = xl.writeXLMetadata(minioMetaBucket, tempObj, xlMeta); err != nil { + // Get hash checksums. + hashChecksums := erasure.GetHashes() + + // Save the checksums. + checkSums := make([]checkSumInfo, len(xl.storageDisks)) + for index := range xl.storageDisks { + blockIndex := xlMeta.Erasure.Distribution[index] - 1 + checkSums[blockIndex] = checkSumInfo{ + Name: "object1", + Algorithm: "sha512", + Hash: hashChecksums[blockIndex], + } + } + + // Update all the necessary fields making sure that checkSum field + // is different for each disks. + for index := range partsMetadata { + blockIndex := xlMeta.Erasure.Distribution[index] - 1 + partsMetadata[index] = xlMeta + partsMetadata[index].Erasure.Checksum = append(partsMetadata[index].Erasure.Checksum, checkSums[blockIndex]) + } + + // Write unique `xl.json` for each disk. + if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempObj, partsMetadata); err != nil { return "", toObjectErr(err, bucket, object) }