diff --git a/erasure-appendfile.go b/erasure-appendfile.go deleted file mode 100644 index d604fc0d9..000000000 --- a/erasure-appendfile.go +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import "sync" - -// AppendFile - append data buffer at path. -func (e erasureConfig) AppendFile(volume, path string, dataBuffer []byte) (n int64, err error) { - // Split the input buffer into data and parity blocks. - var blocks [][]byte - blocks, err = e.reedSolomon.Split(dataBuffer) - if err != nil { - return 0, err - } - - // Encode parity blocks using data blocks. - err = e.reedSolomon.Encode(blocks) - if err != nil { - return 0, err - } - - var wg = &sync.WaitGroup{} - var wErrs = make([]error, len(e.storageDisks)) - // Write encoded data to quorum disks in parallel. - for index, disk := range e.storageDisks { - if disk == nil { - continue - } - wg.Add(1) - // Write encoded data in routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - // Pick the block from the distribution. - blockIndex := e.distribution[index] - 1 - n, wErr := disk.AppendFile(volume, path, blocks[blockIndex]) - if wErr != nil { - wErrs[index] = wErr - return - } - if n != int64(len(blocks[blockIndex])) { - wErrs[index] = errUnexpected - return - } - // Calculate hash. - e.hashWriters[blockIndex].Write(blocks[blockIndex]) - - // Successfully wrote. - wErrs[index] = nil - }(index, disk) - } - - // Wait for all the appends to finish. - wg.Wait() - - return int64(len(dataBuffer)), nil -} diff --git a/erasure-createfile.go b/erasure-createfile.go new file mode 100644 index 000000000..46aa99134 --- /dev/null +++ b/erasure-createfile.go @@ -0,0 +1,145 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "encoding/hex" + "hash" + "io" + "sync" + + "github.com/klauspost/reedsolomon" +) + +// encodeData - encodes incoming data buffer into +// dataBlocks+parityBlocks returns a 2 dimensional byte array. +func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, error) { + rs, err := reedsolomon.New(dataBlocks, parityBlocks) + if err != nil { + return nil, err + } + // Split the input buffer into data and parity blocks. + var blocks [][]byte + blocks, err = rs.Split(dataBuffer) + if err != nil { + return nil, err + } + + // Encode parity blocks using data blocks. 
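The new encodeData() above is built on klauspost/reedsolomon's Split and Encode calls. A minimal standalone sketch of that encode step, using an illustrative 4 data + 4 parity layout (the payload and shard counts are examples, not values taken from this patch):

package main

import (
	"fmt"
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	dataBlocks, parityBlocks := 4, 4

	// New erasure encoder for 4 data and 4 parity shards.
	rs, err := reedsolomon.New(dataBlocks, parityBlocks)
	if err != nil {
		log.Fatal(err)
	}

	payload := []byte("example object payload to be erasure coded")

	// Split pads and slices the payload into equally sized shards,
	// allocating empty parity shards at the end.
	shards, err := rs.Split(payload)
	if err != nil {
		log.Fatal(err)
	}

	// Encode computes the parity shards in place from the data shards.
	if err = rs.Encode(shards); err != nil {
		log.Fatal(err)
	}

	// Verify confirms the parity is consistent with the data shards.
	ok, err := rs.Verify(shards)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("shards: %d, verified: %v\n", len(shards), ok)
}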
+ err = rs.Encode(blocks) + if err != nil { + return nil, err + } + + // Return encoded blocks. + return blocks, nil +} + +// erasureCreateFile - take a data stream, reads until io.EOF erasure +// code and writes to all the disks. +func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo) (newEInfos []erasureInfo, err error) { + // Allocated blockSized buffer for reading. + buf := make([]byte, blockSizeV1) + hashWriters := newHashWriters(len(disks)) + + // Just pick one eInfo. + eInfo := eInfos[0] + + // Read until io.EOF, erasure codes data and writes to all disks. + for { + var n int + n, err = io.ReadFull(data, buf) + if err == io.EOF { + break + } + if err != nil && err != io.ErrUnexpectedEOF { + return nil, err + } + var blocks [][]byte + // Returns encoded blocks. + blocks, err = encodeData(buf[:n], eInfo.DataBlocks, eInfo.ParityBlocks) + if err != nil { + return nil, err + } + err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters) + if err != nil { + return nil, err + } + } + + // Save the checksums. + checkSums := make([]checkSumInfo, len(disks)) + for index := range disks { + blockIndex := eInfo.Distribution[index] - 1 + checkSums[blockIndex] = checkSumInfo{ + Name: partName, + Algorithm: "sha512", + Hash: hex.EncodeToString(hashWriters[blockIndex].Sum(nil)), + } + } + + // Erasure info update for checksum for each disks. + newEInfos = make([]erasureInfo, len(disks)) + for index, eInfo := range eInfos { + blockIndex := eInfo.Distribution[index] - 1 + newEInfos[index] = eInfo + newEInfos[index].Checksum = append(newEInfos[index].Checksum, checkSums[blockIndex]) + } + + // Return newEInfos. + return newEInfos, nil +} + +// appendFile - append data buffer at path. +func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash) (err error) { + var wg = &sync.WaitGroup{} + var wErrs = make([]error, len(disks)) + // Write encoded data to quorum disks in parallel. + for index, disk := range disks { + if disk == nil { + continue + } + wg.Add(1) + // Write encoded data in routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + // Pick the block from the distribution. + blockIndex := distribution[index] - 1 + n, wErr := disk.AppendFile(volume, path, enBlocks[blockIndex]) + if wErr != nil { + wErrs[index] = wErr + return + } + if n != int64(len(enBlocks[blockIndex])) { + wErrs[index] = errUnexpected + return + } + + // Calculate hash for each blocks. + hashWriters[blockIndex].Write(enBlocks[blockIndex]) + + // Successfully wrote. + wErrs[index] = nil + }(index, disk) + } + + // Wait for all the appends to finish. + wg.Wait() + + // Return success. + return nil +} diff --git a/erasure-readfile.go b/erasure-readfile.go index 0f088cd1b..27fed9eaf 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -19,36 +19,86 @@ package main import ( "encoding/hex" "errors" + + "github.com/klauspost/reedsolomon" ) -// isValidBlock - calculates the checksum hash for the block and -// validates if its correct returns true for valid cases, false otherwise. -func (e erasureConfig) isValidBlock(volume, path string, blockIdx int) bool { - diskIndex := -1 +// PartObjectChecksum - returns the checksum for the part name from the checksum slice. 
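appendFile() above fans each encoded block out to its disk in a goroutine and records one error slot per disk. A reduced sketch of that fan-out pattern, with blockWriter standing in for the patch's StorageAPI (the interface, memWriter, and the short-write error text are illustrative):

package main

import (
	"fmt"
	"sync"
)

// blockWriter is a stand-in for the subset of StorageAPI used by appendFile.
type blockWriter interface {
	AppendFile(volume, path string, buf []byte) (int64, error)
}

type memWriter struct{ written int64 }

func (m *memWriter) AppendFile(volume, path string, buf []byte) (int64, error) {
	m.written += int64(len(buf))
	return int64(len(buf)), nil
}

// appendBlocks writes blocks[i] to writers[i] concurrently and returns
// one error per writer, mirroring the wErrs slice in the hunk above.
func appendBlocks(writers []blockWriter, volume, path string, blocks [][]byte) []error {
	var wg sync.WaitGroup
	errs := make([]error, len(writers))
	for index, w := range writers {
		if w == nil {
			continue
		}
		wg.Add(1)
		go func(index int, w blockWriter) {
			defer wg.Done()
			n, err := w.AppendFile(volume, path, blocks[index])
			if err != nil {
				errs[index] = err
				return
			}
			if n != int64(len(blocks[index])) {
				errs[index] = fmt.Errorf("short write on disk %d", index)
			}
		}(index, w)
	}
	wg.Wait()
	return errs
}

func main() {
	writers := []blockWriter{&memWriter{}, &memWriter{}, nil, &memWriter{}}
	blocks := [][]byte{[]byte("b0"), []byte("b1"), []byte("b2"), []byte("b3")}
	for i, err := range appendBlocks(writers, "vol", "obj/part.1", blocks) {
		fmt.Printf("disk %d: err=%v\n", i, err)
	}
}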
+func (e erasureInfo) PartObjectChecksum(partName string) checkSumInfo { + for _, checksum := range e.Checksum { + if checksum.Name == partName { + return checksum + } + } + return checkSumInfo{} +} + +// xlMetaPartBlockChecksums - get block checksums for a given part. +func metaPartBlockChecksums(disks []StorageAPI, eInfos []erasureInfo, partName string) (blockCheckSums []checkSumInfo) { + for index := range disks { + // Save the read checksums for a given part. + blockCheckSums = append(blockCheckSums, eInfos[index].PartObjectChecksum(partName)) + } + return blockCheckSums +} + +// Takes block index and block distribution to get the disk index. +func toDiskIndex(blockIdx int, distribution []int) (diskIndex int) { + diskIndex = -1 // Find out the right disk index for the input block index. - for index, blockIndex := range e.distribution { + for index, blockIndex := range distribution { if blockIndex == blockIdx { diskIndex = index } } + return diskIndex +} + +// isValidBlock - calculates the checksum hash for the block and +// validates if its correct returns true for valid cases, false otherwise. +func isValidBlock(disks []StorageAPI, volume, path string, diskIndex int, blockCheckSums []checkSumInfo) bool { // Unknown block index requested, treat it as error. if diskIndex == -1 { return false } // Disk is not present, treat entire block to be non existent. - if e.storageDisks[diskIndex] == nil { + if disks[diskIndex] == nil { return false } // Read everything for a given block and calculate hash. - hashBytes, err := hashSum(e.storageDisks[diskIndex], volume, path, newHash(e.checkSumAlgo)) + hashWriter := newHash(blockCheckSums[diskIndex].Algorithm) + hashBytes, err := hashSum(disks[diskIndex], volume, path, hashWriter) if err != nil { return false } - return hex.EncodeToString(hashBytes) == e.hashChecksums[diskIndex] + return hex.EncodeToString(hashBytes) == blockCheckSums[diskIndex].Hash +} + +// decodeData - decode encoded blocks. +func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error { + rs, err := reedsolomon.New(dataBlocks, parityBlocks) + if err != nil { + return err + } + err = rs.Reconstruct(enBlocks) + if err != nil { + return err + } + // Verify reconstructed blocks (parity). + ok, err := rs.Verify(enBlocks) + if err != nil { + return err + } + if !ok { + // Blocks cannot be reconstructed, corrupted data. + err = errors.New("Verification failed after reconstruction, data likely corrupted.") + return err + } + return nil } // ReadFile - decoded erasure coded file. -func (e erasureConfig) ReadFile(volume, path string, size int64, blockSize int64) ([]byte, error) { +func erasureReadFile(disks []StorageAPI, volume string, path string, partName string, size int64, eInfos []erasureInfo) ([]byte, error) { // Return data buffer. var buffer []byte @@ -58,31 +108,37 @@ func (e erasureConfig) ReadFile(volume, path string, size int64, blockSize int64 // Starting offset for reading. startOffset := int64(0) + // Gather previously calculated block checksums. + blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName) + + // Pick one erasure info. + eInfo := eInfos[0] + // Write until each parts are read and exhausted. for totalSizeLeft > 0 { // Calculate the proper block size. var curBlockSize int64 - if blockSize < totalSizeLeft { - curBlockSize = blockSize + if eInfo.BlockSize < totalSizeLeft { + curBlockSize = eInfo.BlockSize } else { curBlockSize = totalSizeLeft } // Calculate the current encoded block size. 
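isValidBlock() above recomputes a block's hash with the algorithm recorded in `xl.json` and compares it against the stored hex digest. A small self-contained sketch of that comparison for the sha512 case; the checkSum struct only mirrors the relevant checkSumInfo fields:

package main

import (
	"crypto/sha512"
	"encoding/hex"
	"fmt"
)

// checkSum mirrors the checkSumInfo fields used for validation.
type checkSum struct {
	Name      string
	Algorithm string
	Hash      string
}

// validBlock recomputes the sha512 of a block's bytes and compares it
// with the hex digest recorded in `xl.json`, as isValidBlock does per disk.
func validBlock(blockData []byte, expected checkSum) bool {
	if expected.Algorithm != "sha512" {
		return false // only sha512 is illustrated here
	}
	sum := sha512.Sum512(blockData)
	return hex.EncodeToString(sum[:]) == expected.Hash
}

func main() {
	block := []byte("erasure coded block contents")
	sum := sha512.Sum512(block)
	cs := checkSum{Name: "object1", Algorithm: "sha512", Hash: hex.EncodeToString(sum[:])}

	fmt.Println("intact block valid:", validBlock(block, cs))

	corrupt := append([]byte(nil), block...)
	corrupt[0] ^= 0xff // flip a bit to simulate on-disk corruption
	fmt.Println("corrupted block valid:", validBlock(corrupt, cs))
}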
- curEncBlockSize := getEncodedBlockLen(curBlockSize, e.dataBlocks) - offsetEncOffset := getEncodedBlockLen(startOffset, e.dataBlocks) + curEncBlockSize := getEncodedBlockLen(curBlockSize, eInfo.DataBlocks) + offsetEncOffset := getEncodedBlockLen(startOffset, eInfo.DataBlocks) // Allocate encoded blocks up to storage disks. - enBlocks := make([][]byte, len(e.storageDisks)) + enBlocks := make([][]byte, len(disks)) // Counter to keep success data blocks. var successDataBlocksCount = 0 var noReconstruct bool // Set for no reconstruction. // Read from all the disks. - for index, disk := range e.storageDisks { - blockIndex := e.distribution[index] - 1 - if !e.isValidBlock(volume, path, blockIndex) { + for index, disk := range disks { + blockIndex := eInfo.Distribution[index] - 1 + if !isValidBlock(disks, volume, path, toDiskIndex(blockIndex, eInfo.Distribution), blockCheckSums) { continue } // Initialize shard slice and fill the data from each parts. @@ -93,12 +149,12 @@ func (e erasureConfig) ReadFile(volume, path string, size int64, blockSize int64 enBlocks[blockIndex] = nil } // Verify if we have successfully read all the data blocks. - if blockIndex < e.dataBlocks && enBlocks[blockIndex] != nil { + if blockIndex < eInfo.DataBlocks && enBlocks[blockIndex] != nil { successDataBlocksCount++ // Set when we have all the data blocks and no // reconstruction is needed, so that we can avoid // erasure reconstruction. - noReconstruct = successDataBlocksCount == e.dataBlocks + noReconstruct = successDataBlocksCount == eInfo.DataBlocks if noReconstruct { // Break out we have read all the data blocks. break @@ -113,24 +169,14 @@ func (e erasureConfig) ReadFile(volume, path string, size int64, blockSize int64 // Verify if reconstruction is needed, proceed with reconstruction. if !noReconstruct { - err := e.reedSolomon.Reconstruct(enBlocks) + err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks) if err != nil { return nil, err } - // Verify reconstructed blocks (parity). - ok, err := e.reedSolomon.Verify(enBlocks) - if err != nil { - return nil, err - } - if !ok { - // Blocks cannot be reconstructed, corrupted data. - err = errors.New("Verification failed after reconstruction, data likely corrupted.") - return nil, err - } } // Get data blocks from encoded blocks. - dataBlocks, err := getDataBlocks(enBlocks, e.dataBlocks, int(curBlockSize)) + dataBlocks, err := getDataBlocks(enBlocks, eInfo.DataBlocks, int(curBlockSize)) if err != nil { return nil, err } diff --git a/erasure-utils.go b/erasure-utils.go index 625e7f314..c97195351 100644 --- a/erasure-utils.go +++ b/erasure-utils.go @@ -24,6 +24,15 @@ import ( "github.com/klauspost/reedsolomon" ) +// newHashWriters - inititialize a slice of hashes for the disk count. +func newHashWriters(diskCount int) []hash.Hash { + hashWriters := make([]hash.Hash, diskCount) + for index := range hashWriters { + hashWriters[index] = newHash("sha512") + } + return hashWriters +} + // newHash - gives you a newly allocated hash depending on the input algorithm. func newHash(algo string) hash.Hash { switch algo { diff --git a/erasure.go b/erasure.go index 980ede8dc..4e7aad742 100644 --- a/erasure.go +++ b/erasure.go @@ -15,77 +15,3 @@ */ package main - -import ( - "encoding/hex" - "hash" - - "github.com/klauspost/reedsolomon" -) - -// erasure storage layer. -type erasureConfig struct { - reedSolomon reedsolomon.Encoder // Erasure encoder/decoder. - dataBlocks int // Calculated data disks. - storageDisks []StorageAPI // Initialized storage disks. 
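The read path above only falls back to decodeData() when fewer than DataBlocks shards were read intact; decodeData reconstructs the missing shards and then verifies parity. A sketch of that recovery step with two simulated missing shards, again assuming an illustrative 4+4 layout:

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	rs, err := reedsolomon.New(4, 4)
	if err != nil {
		log.Fatal(err)
	}

	shards, err := rs.Split([]byte("object data that will survive two lost shards"))
	if err != nil {
		log.Fatal(err)
	}
	if err = rs.Encode(shards); err != nil {
		log.Fatal(err)
	}
	original := append([]byte(nil), shards[0]...)

	// Simulate two unreadable disks; the read path sets such slots to nil.
	shards[0], shards[5] = nil, nil

	// Reconstruct recomputes the missing shards from the survivors.
	if err = rs.Reconstruct(shards); err != nil {
		log.Fatal(err)
	}
	ok, err := rs.Verify(shards)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("verified after reconstruction:", ok)
	fmt.Println("shard 0 recovered:", bytes.Equal(shards[0], original))
}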
- distribution []int // Erasure block distribution. - hashWriters []hash.Hash // Allocate hash writers. - - // Carries hex checksums needed for validating Reads. - hashChecksums []string - checkSumAlgo string -} - -// newErasure instantiate a new erasure. -func newErasure(disks []StorageAPI, distribution []int) *erasureConfig { - // Initialize E. - e := &erasureConfig{} - - // Calculate data and parity blocks. - dataBlocks, parityBlocks := len(disks)/2, len(disks)/2 - - // Initialize reed solomon encoding. - rs, err := reedsolomon.New(dataBlocks, parityBlocks) - fatalIf(err, "Unable to initialize reedsolomon package.") - - // Save the reedsolomon. - e.dataBlocks = dataBlocks - e.reedSolomon = rs - - // Save all the initialized storage disks. - e.storageDisks = disks - - // Save the distribution. - e.distribution = distribution - - // Return successfully initialized. - return e -} - -// SaveAlgo - FIXME. -func (e *erasureConfig) SaveAlgo(algo string) { - e.checkSumAlgo = algo -} - -// Save hex encoded hashes - saves hashes that need to be validated -// during reads for each blocks. -func (e *erasureConfig) SaveHashes(hashes []string) { - e.hashChecksums = hashes -} - -// InitHash - initializes new hash for all blocks. -func (e *erasureConfig) InitHash(algo string) { - e.hashWriters = make([]hash.Hash, len(e.storageDisks)) - for index := range e.storageDisks { - e.hashWriters[index] = newHash(algo) - } -} - -// GetHashes - returns a slice of hex encoded hash. -func (e erasureConfig) GetHashes() []string { - var hexHashes = make([]string, len(e.storageDisks)) - for index, hashWriter := range e.hashWriters { - hexHashes[index] = hex.EncodeToString(hashWriter.Sum(nil)) - } - return hexHashes -} diff --git a/format-config-v1.go b/format-config-v1.go index cedea9961..bfebb7764 100644 --- a/format-config-v1.go +++ b/format-config-v1.go @@ -190,10 +190,15 @@ func healFormatXL(bootstrapDisks []StorageAPI) error { // All disks are fresh, format.json will be written by initFormatXL() return nil } + + // From reference config update UUID's not be in use. for index, diskUUID := range referenceConfig.XL.JBOD { uuidUsage[index].uuid = diskUUID uuidUsage[index].inuse = false } + + // For all config formats validate if they are in use and update + // the uuidUsage values. for _, config := range formatConfigs { if config == nil { continue @@ -205,6 +210,9 @@ func healFormatXL(bootstrapDisks []StorageAPI) error { } } } + + // This section heals the format.json and updates the fresh disks + // by reapply the unused UUID's . for index, heal := range needHeal { if !heal { // Previously we detected that heal is not needed on the disk. @@ -214,7 +222,8 @@ func healFormatXL(bootstrapDisks []StorageAPI) error { *config = *referenceConfig config.XL.Disk = getUnusedUUID() if config.XL.Disk == "" { - // getUnusedUUID() should have returned an unused uuid, if not return error. + // getUnusedUUID() should have returned an unused uuid, it + // is an unexpected error. return errUnexpected } @@ -222,6 +231,7 @@ func healFormatXL(bootstrapDisks []StorageAPI) error { if err != nil { return err } + // Fresh disk without format.json _, _ = bootstrapDisks[index].AppendFile(minioMetaBucket, formatConfigFile, formatBytes) // Ignore any error from AppendFile() as quorum might still be there to be operational. @@ -229,7 +239,8 @@ func healFormatXL(bootstrapDisks []StorageAPI) error { return nil } -// loadFormatXL - load XL format.json. 
+// loadFormatXL - loads XL `format.json` and returns back properly +// ordered storage slice based on `format.json`. func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { var unformattedDisksFoundCnt = 0 var diskNotFoundCount = 0 @@ -238,9 +249,10 @@ func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { // Heal missing format.json on the drives. if err = healFormatXL(bootstrapDisks); err != nil { // There was an unexpected unrecoverable error during healing. - return + return nil, err } + // Try to load `format.json` bootstrap disks. for index, disk := range bootstrapDisks { var formatXL *formatConfigV1 formatXL, err = loadFormat(disk) @@ -257,6 +269,7 @@ func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { // Save valid formats. formatConfigs[index] = formatXL } + // If all disks indicate that 'format.json' is not available // return 'errUnformattedDisk'. if unformattedDisksFoundCnt == len(bootstrapDisks) { @@ -269,6 +282,7 @@ func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { return nil, errReadQuorum } + // Validate the format configs read are correct. if err = checkFormatXL(formatConfigs); err != nil { return nil, err } diff --git a/namespace-lock.go b/namespace-lock.go index c49fa1f14..07ca23691 100644 --- a/namespace-lock.go +++ b/namespace-lock.go @@ -64,7 +64,7 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) { } n.lockMap[param] = nsLk } - nsLk.ref++ + nsLk.ref++ // Update ref count here to avoid multiple races. // Unlock map before Locking NS which might block. n.mutex.Unlock() diff --git a/posix.go b/posix.go index 218f0755a..03007df61 100644 --- a/posix.go +++ b/posix.go @@ -291,9 +291,6 @@ func (s posix) ListDir(volume, dirPath string) ([]string, error) { // for io.EOF. Additionally ReadFile also starts reading from an // offset. func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) { - nsMutex.RLock(volume, path) - defer nsMutex.RUnlock(volume, path) - volumeDir, err := s.getVolDir(volume) if err != nil { return 0, err @@ -354,9 +351,6 @@ func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n // AppendFile - append a byte array at path, if file doesn't exist at // path this call explicitly creates it. func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error) { - nsMutex.Lock(volume, path) - defer nsMutex.Unlock(volume, path) - volumeDir, err := s.getVolDir(volume) if err != nil { return 0, err @@ -404,9 +398,6 @@ func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error) // StatFile - get file info. func (s posix) StatFile(volume, path string) (file FileInfo, err error) { - nsMutex.RLock(volume, path) - defer nsMutex.RUnlock(volume, path) - volumeDir, err := s.getVolDir(volume) if err != nil { return FileInfo{}, err @@ -484,9 +475,6 @@ func deleteFile(basePath, deletePath string) error { // DeleteFile - delete a file at path. func (s posix) DeleteFile(volume, path string) error { - nsMutex.Lock(volume, path) - defer nsMutex.Unlock(volume, path) - volumeDir, err := s.getVolDir(volume) if err != nil { return err @@ -513,12 +501,6 @@ func (s posix) DeleteFile(volume, path string) error { // RenameFile - rename source path to destination path atomically. 
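The posix.go hunks above drop the per-call nsMutex locking because callers now hold namespace locks, and the namespace-lock.go change increments the ref count while the map mutex is still held. A simplified, illustrative ref-counted lock map (not the patch's nsLockMap) showing why the count must be updated before the map mutex is released, so a concurrent unlock cannot delete the entry early:

package main

import (
	"fmt"
	"sync"
)

type nsParam struct{ volume, path string }

type nsLock struct {
	sync.RWMutex
	ref uint // number of holders/waiters referencing this entry
}

type lockMap struct {
	mu    sync.Mutex
	locks map[nsParam]*nsLock
}

func (l *lockMap) lock(volume, path string) {
	param := nsParam{volume, path}
	l.mu.Lock()
	nsLk, found := l.locks[param]
	if !found {
		nsLk = &nsLock{}
		l.locks[param] = nsLk
	}
	nsLk.ref++ // counted while l.mu is held, avoiding the race
	l.mu.Unlock()

	nsLk.Lock() // may block; taken outside the map mutex
}

func (l *lockMap) unlock(volume, path string) {
	param := nsParam{volume, path}
	l.mu.Lock()
	defer l.mu.Unlock()
	if nsLk, found := l.locks[param]; found {
		nsLk.Unlock()
		nsLk.ref--
		if nsLk.ref == 0 {
			delete(l.locks, param) // last reference, drop the entry
		}
	}
}

func main() {
	l := &lockMap{locks: make(map[nsParam]*nsLock)}
	l.lock("bucket", "object")
	fmt.Println("locked bucket/object")
	l.unlock("bucket", "object")
	fmt.Println("unlocked bucket/object")
}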
func (s posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { - nsMutex.Lock(srcVolume, srcPath) - defer nsMutex.Unlock(srcVolume, srcPath) - - nsMutex.Lock(dstVolume, dstPath) - defer nsMutex.Unlock(dstVolume, dstPath) - srcVolumeDir, err := s.getVolDir(srcVolume) if err != nil { return err diff --git a/test-utils_test.go b/test-utils_test.go index 6589e90cb..7f45bf127 100644 --- a/test-utils_test.go +++ b/test-utils_test.go @@ -85,7 +85,6 @@ func ExecObjectLayerTest(t *testing.T, objTest func(obj ObjectLayer, instanceTyp if err != nil { t.Fatalf("Initialization of object layer failed for single node setup: %s", err.Error()) } - // FIXME: enable FS tests after fixing it. // Executing the object layer tests for single node setup. objTest(objLayer, singleNodeTestStr, t) diff --git a/tree-walk-xl.go b/tree-walk-xl.go index 85d86a474..804fcccb6 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -47,31 +47,27 @@ type treeWalker struct { // listDir - listDir. func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) { - // Count for list errors encountered. - var listErrCount = 0 - - // Return the first success entry based on the selected random disk. - for listErrCount < len(xl.storageDisks) { - disk := xl.getRandomDisk() // Choose a random disk on each attempt. - if entries, err = disk.ListDir(bucket, prefixDir); err == nil { - // Skip the entries which do not match the filter. - for i, entry := range entries { - if filter(entry) { - entries[i] = "" - continue - } - if strings.HasSuffix(entry, slashSeparator) && isLeaf(bucket, pathJoin(prefixDir, entry)) { - entries[i] = strings.TrimSuffix(entry, slashSeparator) - } - } - sort.Strings(entries) - // Skip the empty strings - for len(entries) > 0 && entries[0] == "" { - entries = entries[1:] - } - return entries, nil + for _, disk := range xl.getLoadBalancedQuorumDisks() { + entries, err = disk.ListDir(bucket, prefixDir) + if err != nil { + break } - listErrCount++ // Update list error count. + // Skip the entries which do not match the filter. + for i, entry := range entries { + if filter(entry) { + entries[i] = "" + continue + } + if strings.HasSuffix(entry, slashSeparator) && isLeaf(bucket, pathJoin(prefixDir, entry)) { + entries[i] = strings.TrimSuffix(entry, slashSeparator) + } + } + sort.Strings(entries) + // Skip the empty strings + for len(entries) > 0 && entries[0] == "" { + entries = entries[1:] + } + return entries, nil } // Return error at the end. diff --git a/xl-v1-bucket.go b/xl-v1-bucket.go index ac88167c8..4ec22f020 100644 --- a/xl-v1-bucket.go +++ b/xl-v1-bucket.go @@ -1,3 +1,19 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main import ( @@ -70,27 +86,21 @@ func (xl xlObjects) MakeBucket(bucket string) error { return nil } -// getBucketInfo - returns the BucketInfo from one of the disks picked -// at random. 
+// getBucketInfo - returns the BucketInfo from one of the load balanced disks. func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) { - // Count for errors encountered. - var bucketErrCount = 0 - - // Return the first successful lookup from a random list of disks. - for bucketErrCount < len(xl.storageDisks) { - disk := xl.getRandomDisk() // Choose a random disk on each attempt. + for _, disk := range xl.getLoadBalancedQuorumDisks() { var volInfo VolInfo volInfo, err = disk.StatVol(bucketName) - if err == nil { - bucketInfo = BucketInfo{ - Name: volInfo.Name, - Created: volInfo.Created, - } - return bucketInfo, nil + if err != nil { + return BucketInfo{}, err } - bucketErrCount++ // Update error count. + bucketInfo = BucketInfo{ + Name: volInfo.Name, + Created: volInfo.Created, + } + break } - return BucketInfo{}, err + return bucketInfo, nil } // Checks whether bucket exists. @@ -127,12 +137,7 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) { // listBuckets - returns list of all buckets from a disk picked at random. func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) { - // Count for errors encountered. - var listBucketsErrCount = 0 - - // Return the first successful lookup from a random list of disks. - for listBucketsErrCount < len(xl.storageDisks) { - disk := xl.getRandomDisk() // Choose a random disk on each attempt. + for _, disk := range xl.getLoadBalancedQuorumDisks() { var volsInfo []VolInfo volsInfo, err = disk.ListVols() if err == nil { @@ -154,7 +159,7 @@ func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) { } return bucketsInfo, nil } - listBucketsErrCount++ // Update error count. + break } return nil, err } diff --git a/xl-v1-common.go b/xl-v1-common.go index 2e0352636..d79edf5f4 100644 --- a/xl-v1-common.go +++ b/xl-v1-common.go @@ -16,20 +16,23 @@ package main -import ( - "math/rand" - "path" - "sync" - "time" -) +import "path" -// getRandomDisk - gives a random disk at any point in time from the -// available pool of disks. -func (xl xlObjects) getRandomDisk() (disk StorageAPI) { - rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time. - randIndex := rand.Intn(len(xl.storageDisks) - 1) - disk = xl.storageDisks[randIndex] // Pick a random disk. - return disk +// getLoadBalancedQuorumDisks - fetches load balanced sufficiently +// randomized quorum disk slice. +func (xl xlObjects) getLoadBalancedQuorumDisks() (disks []StorageAPI) { + // It is okay to have readQuorum disks. + return xl.getLoadBalancedDisks()[:xl.readQuorum-1] +} + +// getLoadBalancedDisks - fetches load balanced (sufficiently +// randomized) disk slice. +func (xl xlObjects) getLoadBalancedDisks() (disks []StorageAPI) { + // Based on the random shuffling return back randomized disks. + for _, i := range randInts(len(xl.storageDisks)) { + disks = append(disks, xl.storageDisks[i-1]) + } + return disks } // This function does the following check, suppose @@ -51,62 +54,27 @@ func (xl xlObjects) parentDirIsObject(bucket, parent string) bool { return isParentDirObject(parent) } +// isObject - returns `true` if the prefix is an object i.e if +// `xl.json` exists at the leaf, false otherwise. func (xl xlObjects) isObject(bucket, prefix string) bool { - // Create errs and volInfo slices of storageDisks size. - var errs = make([]error, len(xl.storageDisks)) - - // Allocate a new waitgroup. 
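getLoadBalancedDisks() above indexes xl.storageDisks with i-1, which suggests randInts() yields a 1-based permutation; randInts itself is not part of this diff, so that behaviour is an assumption in the sketch below, which rebuilds the idea on math/rand:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randIntsOneBased returns a random permutation of 1..count, assuming
// that is what the patch's randInts() provides.
func randIntsOneBased(count int) []int {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	ints := r.Perm(count) // 0..count-1 in random order
	for i := range ints {
		ints[i]++ // shift to 1..count
	}
	return ints
}

func main() {
	disks := []string{"disk0", "disk1", "disk2", "disk3"}

	// Return disks in a randomized order, as getLoadBalancedDisks does.
	var shuffled []string
	for _, i := range randIntsOneBased(len(disks)) {
		shuffled = append(shuffled, disks[i-1])
	}
	fmt.Println(shuffled)

	// A read-quorum prefix of that order, mirroring
	// getLoadBalancedQuorumDisks (readQuorum here is an example value).
	readQuorum := 3
	fmt.Println(shuffled[:readQuorum-1])
}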
- var wg = &sync.WaitGroup{} - for index, disk := range xl.storageDisks { - wg.Add(1) - // Stat file on all the disks in a routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - _, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile)) - if err != nil { - errs[index] = err - return - } - errs[index] = nil - }(index, disk) - } - - // Wait for all the Stat operations to finish. - wg.Wait() - - var errFileNotFoundCount int - for _, err := range errs { + for _, disk := range xl.getLoadBalancedQuorumDisks() { + _, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile)) if err != nil { - if err == errFileNotFound { - errFileNotFoundCount++ - // If we have errors with file not found greater than allowed read - // quorum we return err as errFileNotFound. - if errFileNotFoundCount > len(xl.storageDisks)-xl.readQuorum { - return false - } - continue - } - errorIf(err, "Unable to access file "+path.Join(bucket, prefix)) return false } + break } return true } -// statPart - stat a part file. +// statPart - returns fileInfo structure for a successful stat on part file. func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) { - // Count for errors encountered. - var xlJSONErrCount = 0 - - // Return the first success entry based on the selected random disk. - for xlJSONErrCount < len(xl.storageDisks) { - // Choose a random disk on each attempt. - disk := xl.getRandomDisk() + for _, disk := range xl.getLoadBalancedQuorumDisks() { fileInfo, err = disk.StatFile(bucket, objectPart) - if err == nil { - return fileInfo, nil + if err != nil { + return FileInfo{}, err } - xlJSONErrCount++ // Update error count. + break } - return FileInfo{}, err + return fileInfo, nil } diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index d676893f0..22ea5eaa5 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -18,7 +18,6 @@ package main import ( "encoding/json" - "fmt" "path" "sort" "sync" @@ -47,53 +46,68 @@ func (t byObjectPartNumber) Len() int { return len(t) } func (t byObjectPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (t byObjectPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number } -// checkSumInfo - carries checksums of individual part. +// checkSumInfo - carries checksums of individual scattered parts per disk. type checkSumInfo struct { Name string `json:"name"` Algorithm string `json:"algorithm"` Hash string `json:"hash"` } -// A xlMetaV1 represents a metadata header mapping keys to sets of values. +// erasureInfo - carries erasure coding related information, block +// distribution and checksums. +type erasureInfo struct { + Algorithm string `json:"algorithm"` + DataBlocks int `json:"data"` + ParityBlocks int `json:"parity"` + BlockSize int64 `json:"blockSize"` + Index int `json:"index"` + Distribution []int `json:"distribution"` + Checksum []checkSumInfo `json:"checksum,omitempty"` +} + +// statInfo - carries stat information of the object. +type statInfo struct { + Size int64 `json:"size"` // Size of the object `xl.json`. + ModTime time.Time `json:"modTime"` // ModTime of the object `xl.json`. + Version int64 `json:"version"` // Version of the object `xl.json`, useful to calculate quorum. +} + +// A xlMetaV1 represents `xl.json` metadata header. 
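The erasureInfo, statInfo and checkSumInfo types above define the new `xl.json` layout. A short sketch that marshals an equivalent struct, with the JSON tags copied from the hunk and all values illustrative (including the algorithm string, which only stands in for erasureAlgorithmKlauspost):

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

type checkSumInfo struct {
	Name      string `json:"name"`
	Algorithm string `json:"algorithm"`
	Hash      string `json:"hash"`
}

type erasureInfo struct {
	Algorithm    string         `json:"algorithm"`
	DataBlocks   int            `json:"data"`
	ParityBlocks int            `json:"parity"`
	BlockSize    int64          `json:"blockSize"`
	Index        int            `json:"index"`
	Distribution []int          `json:"distribution"`
	Checksum     []checkSumInfo `json:"checksum,omitempty"`
}

func main() {
	eInfo := erasureInfo{
		Algorithm:    "klauspost/reedsolomon", // illustrative value only
		DataBlocks:   4,
		ParityBlocks: 4,
		BlockSize:    10 * 1024 * 1024,
		Index:        1,
		Distribution: []int{3, 1, 4, 2, 7, 5, 8, 6},
		Checksum: []checkSumInfo{
			{Name: "object1", Algorithm: "sha512", Hash: "<hex sha512>"},
		},
	}
	out, err := json.MarshalIndent(eInfo, "", "  ")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(out))
}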
type xlMetaV1 struct { - Version string `json:"version"` - Format string `json:"format"` - Stat struct { - Size int64 `json:"size"` - ModTime time.Time `json:"modTime"` - Version int64 `json:"version"` - } `json:"stat"` - Erasure struct { - Algorithm string `json:"algorithm"` - DataBlocks int `json:"data"` - ParityBlocks int `json:"parity"` - BlockSize int64 `json:"blockSize"` - Index int `json:"index"` - Distribution []int `json:"distribution"` - Checksum []checkSumInfo `json:"checksum,omitempty"` - } `json:"erasure"` + Version string `json:"version"` // Version of the current `xl.json`. + Format string `json:"format"` // Format of the current `xl.json`. + Stat statInfo `json:"stat"` // Stat of the current object `xl.json`. + // Erasure coded info for the current object `xl.json`. + Erasure erasureInfo `json:"erasure"` + // Minio release tag for current object `xl.json`. Minio struct { Release string `json:"release"` } `json:"minio"` - Meta map[string]string `json:"meta"` - Parts []objectPartInfo `json:"parts,omitempty"` + // Metadata map for current object `xl.json`. + Meta map[string]string `json:"meta"` + // Captures all the individual object `xl.json`. + Parts []objectPartInfo `json:"parts,omitempty"` } -// newXLMetaV1 - initializes new xlMetaV1. +// newXLMetaV1 - initializes new xlMetaV1, adds version, allocates a +// fresh erasure info. func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) { xlMeta = xlMetaV1{} xlMeta.Version = "1" xlMeta.Format = "xl" xlMeta.Minio.Release = minioReleaseTag - xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost - xlMeta.Erasure.DataBlocks = dataBlocks - xlMeta.Erasure.ParityBlocks = parityBlocks - xlMeta.Erasure.BlockSize = blockSizeV1 - xlMeta.Erasure.Distribution = randInts(dataBlocks + parityBlocks) + xlMeta.Erasure = erasureInfo{ + Algorithm: erasureAlgorithmKlauspost, + DataBlocks: dataBlocks, + ParityBlocks: parityBlocks, + BlockSize: blockSizeV1, + Distribution: randInts(dataBlocks + parityBlocks), + } return xlMeta } -// IsValid - is validate tells if the format is sane. +// IsValid - tells if the format is sane by validating the version +// string and format style. func (m xlMetaV1) IsValid() bool { return m.Version == "1" && m.Format == "xl" } @@ -109,17 +123,6 @@ func (m xlMetaV1) ObjectPartIndex(partNumber int) (index int) { return -1 } -// ObjectCheckIndex - returns the checksum for the part name from the checksum slice. -func (m xlMetaV1) PartObjectChecksum(partNumber int) checkSumInfo { - partName := fmt.Sprintf("object%d", partNumber) - for _, checksum := range m.Erasure.Checksum { - if checksum.Name == partName { - return checksum - } - } - return checkSumInfo{} -} - // AddObjectPart - add a new object part in order. func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) { partInfo := objectPartInfo{ @@ -181,26 +184,19 @@ func pickValidXLMeta(xlMetas []xlMetaV1) xlMetaV1 { // readXLMetadata - returns the object metadata `xl.json` content from // one of the disks picked at random. func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) { - // Count for errors encountered. - var xlJSONErrCount = 0 - - // Return the first successful lookup from a random list of disks. - for xlJSONErrCount < len(xl.storageDisks) { - disk := xl.getRandomDisk() // Choose a random disk on each attempt. 
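AddObjectPart above replaces or appends a part entry and keeps the slice ordered via the byObjectPartNumber sort defined earlier in this file. A compact sketch of that insert-then-sort behaviour; only the Number field of objectPartInfo is visible in these hunks, so the other part fields here are assumed:

package main

import (
	"fmt"
	"sort"
)

// partInfo is an illustrative stand-in for objectPartInfo.
type partInfo struct {
	Number int
	Name   string
	ETag   string
	Size   int64
}

type byNumber []partInfo

func (t byNumber) Len() int           { return len(t) }
func (t byNumber) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
func (t byNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }

// addPart updates an existing part or appends a new one, then keeps the
// slice ordered by part number, mirroring AddObjectPart.
func addPart(parts []partInfo, p partInfo) []partInfo {
	for i := range parts {
		if parts[i].Number == p.Number {
			parts[i] = p
			sort.Sort(byNumber(parts))
			return parts
		}
	}
	parts = append(parts, p)
	sort.Sort(byNumber(parts))
	return parts
}

func main() {
	var parts []partInfo
	parts = addPart(parts, partInfo{Number: 3, Name: "object3", Size: 5 << 20})
	parts = addPart(parts, partInfo{Number: 1, Name: "object1", Size: 5 << 20})
	parts = addPart(parts, partInfo{Number: 2, Name: "object2", Size: 1 << 20})
	fmt.Println(parts)
}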
- var buffer []byte - buffer, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile)) - if err == nil { - err = json.Unmarshal(buffer, &xlMeta) - if err == nil { - if xlMeta.IsValid() { - return xlMeta, nil - } - err = errDataCorrupt - } + for _, disk := range xl.getLoadBalancedQuorumDisks() { + var buf []byte + buf, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile)) + if err != nil { + return xlMetaV1{}, err } - xlJSONErrCount++ // Update error count. + err = json.Unmarshal(buf, &xlMeta) + if err != nil { + return xlMetaV1{}, err + } + break } - return xlMetaV1{}, err + return xlMeta, nil } // renameXLMetadata - renames `xl.json` from source prefix to destination prefix. @@ -264,22 +260,6 @@ func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) er return nil } -// checkSumAlgorithm - get the algorithm required for checksum -// verification for a given part. Allocates a new hash and returns. -func checkSumAlgorithm(xlMeta xlMetaV1, partIdx int) string { - partCheckSumInfo := xlMeta.PartObjectChecksum(partIdx) - return partCheckSumInfo.Algorithm -} - -// xlMetaPartBlockChecksums - get block checksums for a given part. -func (xl xlObjects) metaPartBlockChecksums(xlMetas []xlMetaV1, partIdx int) (blockCheckSums []string) { - for index := range xl.storageDisks { - // Save the read checksums for a given part. - blockCheckSums = append(blockCheckSums, xlMetas[index].PartObjectChecksum(partIdx).Hash) - } - return blockCheckSums -} - // writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order. func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMetaV1) error { var wg = &sync.WaitGroup{} diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index 5550b3fe5..c380231a1 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -23,22 +23,20 @@ import ( "strings" "sync" "time" - - "github.com/skyrings/skyring-common/tools/uuid" ) -// uploadInfo - +// A uploadInfo represents the s3 compatible spec. type uploadInfo struct { - UploadID string `json:"uploadId"` - Deleted bool `json:"deleted"` // Currently unused. - Initiated time.Time `json:"initiated"` + UploadID string `json:"uploadId"` // UploadID for the active multipart upload. + Deleted bool `json:"deleted"` // Currently unused, for future use. + Initiated time.Time `json:"initiated"` // Indicates when the uploadID was initiated. } -// uploadsV1 - +// A uploadsV1 represents `uploads.json` metadata header. type uploadsV1 struct { - Version string `json:"version"` - Format string `json:"format"` - Uploads []uploadInfo `json:"uploadIds"` + Version string `json:"version"` // Version of the current `uploads.json` + Format string `json:"format"` // Format of the current `uploads.json` + Uploads []uploadInfo `json:"uploadIds"` // Captures all the upload ids for a given object. } // byInitiatedTime is a collection satisfying sort.Interface. @@ -70,49 +68,21 @@ func (u uploadsV1) Index(uploadID string) int { } // readUploadsJSON - get all the saved uploads JSON. -func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) { +func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) { uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) - var errs = make([]error, len(storageDisks)) - var uploads = make([]uploadsV1, len(storageDisks)) - var wg = &sync.WaitGroup{} - - // Read `uploads.json` from all disks. 
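The uploadInfo and uploadsV1 types above describe `uploads.json`. A round-trip sketch showing the JSON that the new single-disk readUploadsJSON() parses; the version, format and upload-id values are illustrative:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"
)

type uploadInfo struct {
	UploadID  string    `json:"uploadId"`
	Deleted   bool      `json:"deleted"`
	Initiated time.Time `json:"initiated"`
}

type uploadsV1 struct {
	Version string       `json:"version"`
	Format  string       `json:"format"`
	Uploads []uploadInfo `json:"uploadIds"`
}

func main() {
	uploads := uploadsV1{
		Version: "1",
		Format:  "xl",
		Uploads: []uploadInfo{
			{UploadID: "9aa132aa-ecbc-4f2e-bfa1-4b74dfd1a4a2", Initiated: time.Now().UTC()},
		},
	}
	buf, err := json.Marshal(uploads)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(buf))

	// Parsing mirrors the json.Unmarshal(buffer, &uploadIDs) call above.
	var parsed uploadsV1
	if err := json.Unmarshal(buf, &parsed); err != nil {
		log.Fatal(err)
	}
	fmt.Println("pending uploads for object:", len(parsed.Uploads))
}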
- for index, disk := range storageDisks { - wg.Add(1) - // Read `uploads.json` in a routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - // Read all of 'uploads.json' - buffer, rErr := readAll(disk, minioMetaBucket, uploadJSONPath) - if rErr != nil { - errs[index] = rErr - return - } - rErr = json.Unmarshal(buffer, &uploads[index]) - if rErr != nil { - errs[index] = rErr - return - } - buffer = nil - errs[index] = nil - }(index, disk) + // Read all of 'uploads.json' + buffer, rErr := readAll(disk, minioMetaBucket, uploadJSONPath) + if rErr != nil { + return uploadsV1{}, rErr } - - // Wait for all the routines. - wg.Wait() - - // Return for first error. - for _, err = range errs { - if err != nil { - return uploadsV1{}, err - } + rErr = json.Unmarshal(buffer, &uploadIDs) + if rErr != nil { + return uploadsV1{}, rErr } - - // FIXME: Do not know if it should pick the picks the first successful one and returns. - return uploads[0], nil + return uploadIDs, nil } -// uploadUploadsJSON - update `uploads.json` with new uploadsJSON for all disks. +// updateUploadsJSON - update `uploads.json` with new uploadsJSON for all disks. func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) uniqueID := getUUID() @@ -178,7 +148,10 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora var wg = &sync.WaitGroup{} var uploadsJSON uploadsV1 - uploadsJSON, err = readUploadsJSON(bucket, object, storageDisks...) + for _, disk := range storageDisks { + uploadsJSON, err = readUploadsJSON(bucket, object, disk) + break + } if err != nil { // For any other errors. if err != errFileNotFound { @@ -206,6 +179,7 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora errs[index] = wErr return } + // Write `uploads.json` to disk. n, wErr := disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes) if wErr != nil { errs[index] = wErr @@ -312,184 +286,33 @@ func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count // Returns if the prefix is a multipart upload. func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { - disk := xl.getRandomDisk() // Choose a random disk. - _, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) - return err == nil + for _, disk := range xl.getLoadBalancedQuorumDisks() { + _, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) + if err != nil { + return false + } + break + } + return true } // listUploadsInfo - list all uploads info. func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo, err error) { - disk := xl.getRandomDisk() // Choose a random disk on each attempt. - splitPrefixes := strings.SplitN(prefixPath, "/", 3) - uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk) - if err != nil { - if err == errFileNotFound { - return []uploadInfo{}, nil + for _, disk := range xl.getLoadBalancedQuorumDisks() { + splitPrefixes := strings.SplitN(prefixPath, "/", 3) + uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk) + if err != nil { + if err == errFileNotFound { + return []uploadInfo{}, nil + } + return nil, err } - return nil, err + uploadsInfo = uploadsJSON.Uploads + break } - uploadsInfo = uploadsJSON.Uploads return uploadsInfo, nil } -// listMultipartUploads - lists all multipart uploads. 
-func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { - result := ListMultipartsInfo{} - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket} - } - if !xl.isBucketExist(bucket) { - return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket} - } - if !IsValidObjectPrefix(prefix) { - return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} - } - // Verify if delimiter is anything other than '/', which we do not support. - if delimiter != "" && delimiter != slashSeparator { - return ListMultipartsInfo{}, UnsupportedDelimiter{ - Delimiter: delimiter, - } - } - // Verify if marker has prefix. - if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) { - return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{ - Marker: keyMarker, - Prefix: prefix, - } - } - if uploadIDMarker != "" { - if strings.HasSuffix(keyMarker, slashSeparator) { - return result, InvalidUploadIDKeyCombination{ - UploadIDMarker: uploadIDMarker, - KeyMarker: keyMarker, - } - } - id, err := uuid.Parse(uploadIDMarker) - if err != nil { - return result, err - } - if id.IsZero() { - return result, MalformedUploadID{ - UploadID: uploadIDMarker, - } - } - } - - recursive := true - if delimiter == slashSeparator { - recursive = false - } - - result.IsTruncated = true - result.MaxUploads = maxUploads - result.KeyMarker = keyMarker - result.Prefix = prefix - result.Delimiter = delimiter - - // Not using path.Join() as it strips off the trailing '/'. - multipartPrefixPath := pathJoin(mpartMetaPrefix, bucket, prefix) - if prefix == "" { - // Should have a trailing "/" if prefix is "" - // For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is "" - multipartPrefixPath += slashSeparator - } - multipartMarkerPath := "" - if keyMarker != "" { - multipartMarkerPath = pathJoin(mpartMetaPrefix, bucket, keyMarker) - } - var uploads []uploadMetadata - var err error - var eof bool - if uploadIDMarker != "" { - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) - uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, xl.getRandomDisk()) - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) - - if err != nil { - return ListMultipartsInfo{}, err - } - maxUploads = maxUploads - len(uploads) - } - if maxUploads > 0 { - walker := xl.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) - if walker == nil { - walker = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload) - } - for maxUploads > 0 { - walkResult, ok := <-walker.ch - if !ok { - // Closed channel. - eof = true - break - } - // For any walk error return right away. - if walkResult.err != nil { - // File not found or Disk not found is a valid case. 
- if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound { - continue - } - return ListMultipartsInfo{}, err - } - entry := strings.TrimPrefix(walkResult.entry, retainSlash(pathJoin(mpartMetaPrefix, bucket))) - if strings.HasSuffix(walkResult.entry, slashSeparator) { - uploads = append(uploads, uploadMetadata{ - Object: entry, - }) - maxUploads-- - if maxUploads == 0 { - if walkResult.end { - eof = true - break - } - } - continue - } - var newUploads []uploadMetadata - var end bool - uploadIDMarker = "" - nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) - newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, xl.getRandomDisk()) - nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) - if err != nil { - if err == errFileNotFound || walkResult.err == errDiskNotFound { - continue - } - return ListMultipartsInfo{}, err - } - uploads = append(uploads, newUploads...) - maxUploads -= len(newUploads) - if walkResult.end && end { - eof = true - break - } - } - } - // Loop through all the received uploads fill in the multiparts result. - for _, upload := range uploads { - var objectName string - var uploadID string - if strings.HasSuffix(upload.Object, slashSeparator) { - // All directory entries are common prefixes. - uploadID = "" // Upload ids are empty for CommonPrefixes. - objectName = upload.Object - result.CommonPrefixes = append(result.CommonPrefixes, objectName) - } else { - uploadID = upload.UploadID - objectName = upload.Object - result.Uploads = append(result.Uploads, upload) - } - result.NextKeyMarker = objectName - result.NextUploadIDMarker = uploadID - } - result.IsTruncated = !eof - if !result.IsTruncated { - result.NextKeyMarker = "" - result.NextUploadIDMarker = "" - } - return result, nil -} - // isUploadIDExists - verify if a given uploadID exists and is valid. func (xl xlObjects) isUploadIDExists(bucket, object, uploadID string) bool { uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 597aa65fd..c3b12af9e 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -27,32 +27,199 @@ import ( "time" "github.com/minio/minio/pkg/mimedb" + "github.com/skyrings/skyring-common/tools/uuid" ) -// ListMultipartUploads - list multipart uploads. +// listMultipartUploads - lists all multipart uploads. +func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { + result := ListMultipartsInfo{ + IsTruncated: true, + MaxUploads: maxUploads, + KeyMarker: keyMarker, + Prefix: prefix, + Delimiter: delimiter, + } + + recursive := true + if delimiter == slashSeparator { + recursive = false + } + + // Not using path.Join() as it strips off the trailing '/'. + multipartPrefixPath := pathJoin(mpartMetaPrefix, bucket, prefix) + if prefix == "" { + // Should have a trailing "/" if prefix is "" + // For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is "" + multipartPrefixPath += slashSeparator + } + multipartMarkerPath := "" + if keyMarker != "" { + multipartMarkerPath = pathJoin(mpartMetaPrefix, bucket, keyMarker) + } + var uploads []uploadMetadata + var err error + var eof bool + // List all upload ids for the keyMarker starting from + // uploadIDMarker first. 
+ if uploadIDMarker != "" { + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) + disk := xl.getLoadBalancedQuorumDisks()[0] + uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk) + nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) + if err != nil { + return ListMultipartsInfo{}, err + } + maxUploads = maxUploads - len(uploads) + } + // Validate if we need to list further depending on maxUploads. + if maxUploads > 0 { + walker := xl.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) + if walker == nil { + walker = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload) + } + // Collect uploads until we have reached maxUploads count to 0. + for maxUploads > 0 { + walkResult, ok := <-walker.ch + if !ok { + // Closed channel. + eof = true + break + } + // For any walk error return right away. + if walkResult.err != nil { + // File not found or Disk not found is a valid case. + if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound { + continue + } + return ListMultipartsInfo{}, err + } + entry := strings.TrimPrefix(walkResult.entry, retainSlash(pathJoin(mpartMetaPrefix, bucket))) + // For an entry looking like a directory, store and + // continue the loop not need to fetch uploads. + if strings.HasSuffix(walkResult.entry, slashSeparator) { + uploads = append(uploads, uploadMetadata{ + Object: entry, + }) + maxUploads-- + if maxUploads == 0 { + if walkResult.end { + eof = true + break + } + } + continue + } + var newUploads []uploadMetadata + var end bool + uploadIDMarker = "" + // For the new object entry we get all its pending uploadIDs. + nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) + disk := xl.getLoadBalancedQuorumDisks()[0] + newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk) + nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) + if err != nil { + if err == errFileNotFound || walkResult.err == errDiskNotFound { + continue + } + return ListMultipartsInfo{}, err + } + uploads = append(uploads, newUploads...) + maxUploads -= len(newUploads) + if walkResult.end && end { + eof = true + break + } + } + } + // For all received uploads fill in the multiparts result. + for _, upload := range uploads { + var objectName string + var uploadID string + if strings.HasSuffix(upload.Object, slashSeparator) { + // All directory entries are common prefixes. + uploadID = "" // For common prefixes, upload ids are empty. + objectName = upload.Object + result.CommonPrefixes = append(result.CommonPrefixes, objectName) + } else { + uploadID = upload.UploadID + objectName = upload.Object + result.Uploads = append(result.Uploads, upload) + } + result.NextKeyMarker = objectName + result.NextUploadIDMarker = uploadID + } + result.IsTruncated = !eof + // Result is not truncated, reset the markers. + if !result.IsTruncated { + result.NextKeyMarker = "" + result.NextUploadIDMarker = "" + } + return result, nil +} + +// ListMultipartUploads - lists all the pending multipart uploads on a +// bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a +// delimiter' which allows us to list uploads match a particular +// prefix or lexically starting from 'keyMarker' or delimiting the +// output to get a directory like listing. +// +// Implements S3 compatible ListMultipartUploads API. 
The resulting +// ListMultipartsInfo structure is unmarshalled directly into XML and +// replied back to the client. func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { + result := ListMultipartsInfo{} + + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket} + } + if !xl.isBucketExist(bucket) { + return ListMultipartsInfo{}, BucketNotFound{Bucket: bucket} + } + if !IsValidObjectPrefix(prefix) { + return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} + } + // Verify if delimiter is anything other than '/', which we do not support. + if delimiter != "" && delimiter != slashSeparator { + return ListMultipartsInfo{}, UnsupportedDelimiter{ + Delimiter: delimiter, + } + } + // Verify if marker has prefix. + if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) { + return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{ + Marker: keyMarker, + Prefix: prefix, + } + } + if uploadIDMarker != "" { + if strings.HasSuffix(keyMarker, slashSeparator) { + return result, InvalidUploadIDKeyCombination{ + UploadIDMarker: uploadIDMarker, + KeyMarker: keyMarker, + } + } + id, err := uuid.Parse(uploadIDMarker) + if err != nil { + return result, err + } + if id.IsZero() { + return result, MalformedUploadID{ + UploadID: uploadIDMarker, + } + } + } return xl.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) } -// newMultipartUpload - initialize a new multipart. +// newMultipartUpload - wrapper for initializing a new multipart +// request, returns back a unique upload id. +// +// Internally this function creates 'uploads.json' associated for the +// incoming object at '.minio/multipart/bucket/object/uploads.json' on +// all the disks. `uploads.json` carries metadata regarding on going +// multipart operation on the object. func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) { - // Verify if bucket name is valid. - if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} - } - // Verify whether the bucket exists. - if !xl.isBucketExist(bucket) { - return "", BucketNotFound{Bucket: bucket} - } - // Verify if object name is valid. - if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} - } - // No metadata is set, allocate a new one. - if meta == nil { - meta = make(map[string]string) - } - xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks) // If not set default to "application/octet-stream" if meta["content-type"] == "" { @@ -92,14 +259,13 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) } -// NewMultipartUpload - initialize a new multipart upload, returns a unique id. +// NewMultipartUpload - initialize a new multipart upload, returns a +// unique id. The unique id returned here is of UUID form, for each +// subsequent request each UUID is unique. +// +// Implements S3 compatible initiate multipart API. func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) { - return xl.newMultipartUpload(bucket, object, meta) -} - -// putObjectPart - put object part. 
-func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { - // Verify if bucket is valid. + // Verify if bucket name is valid. if !IsValidBucketName(bucket) { return "", BucketNameInvalid{Bucket: bucket} } @@ -107,9 +273,22 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, if !xl.isBucketExist(bucket) { return "", BucketNotFound{Bucket: bucket} } + // Verify if object name is valid. if !IsValidObjectName(object) { return "", ObjectNameInvalid{Bucket: bucket, Object: object} } + // No metadata is set, allocate a new one. + if meta == nil { + meta = make(map[string]string) + } + return xl.newMultipartUpload(bucket, object, meta) +} + +// putObjectPart - reads incoming data until EOF for the part file on +// an ongoing multipart transaction. Internally incoming data is +// erasure coded and written across all disks. +func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { + // Hold the lock and start the operation. uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID) nsMutex.Lock(minioMetaBucket, uploadIDPath) defer nsMutex.Unlock(minioMetaBucket, uploadIDPath) @@ -130,51 +309,39 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, // Pick one from the first valid metadata. xlMeta := pickValidXLMeta(partsMetadata) - // Initialize a new erasure with online disks and new distribution. - erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) - - // Initialize sha512 hash. - erasure.InitHash("sha512") - partSuffix := fmt.Sprintf("object%d", partID) tmpPartPath := path.Join(tmpMetaPrefix, uploadID, partSuffix) // Initialize md5 writer. md5Writer := md5.New() - // Allocate blocksized buffer for reading. - buf := make([]byte, blockSizeV1) + // Construct a tee reader for md5sum. + teeReader := io.TeeReader(data, md5Writer) - // Read until io.EOF, fill the allocated buf. - for { - var n int - n, err = io.ReadFull(data, buf) - if err == io.EOF { - break - } - if err != nil && err != io.ErrUnexpectedEOF { - return "", toObjectErr(err, bucket, object) - } - // Update md5 writer. - md5Writer.Write(buf[:n]) - var m int64 - m, err = erasure.AppendFile(minioMetaBucket, tmpPartPath, buf[:n]) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, tmpPartPath) - } - if m != int64(len(buf[:n])) { - return "", toObjectErr(errUnexpected, bucket, object) - } + // Collect all the previous erasure infos across the disk. + var eInfos []erasureInfo + for index := range onlineDisks { + eInfos = append(eInfos, partsMetadata[index].Erasure) + } + + // Erasure code data and write across all disks. + newEInfos, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, partSuffix, teeReader, eInfos) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, tmpPartPath) } // Calculate new md5sum. newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) if md5Hex != "" { if newMD5Hex != md5Hex { + // MD5 mismatch, delete the temporary object. + xl.deleteObject(minioMetaBucket, tmpPartPath) + // Returns md5 mismatch. return "", BadDigest{md5Hex, newMD5Hex} } } + // Validates if upload ID exists again. if !xl.isUploadIDExists(bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } @@ -191,28 +358,17 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, // Add the current part. 
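putObjectPart above now threads the request body through io.TeeReader so the md5 digest accumulates while erasureCreateFile consumes the stream, replacing the old manual read loop. A minimal sketch of that tee pattern, with io.Discard standing in for the erasure writer:

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"log"
	"strings"
)

func main() {
	data := strings.NewReader("simulated part payload streamed by the client")

	// Everything read from teeReader is also written into md5Writer,
	// so the digest is ready as soon as the consumer reaches io.EOF.
	md5Writer := md5.New()
	teeReader := io.TeeReader(data, md5Writer)

	// Stand-in for erasureCreateFile draining the stream.
	n, err := io.Copy(io.Discard, teeReader)
	if err != nil {
		log.Fatal(err)
	}

	newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
	fmt.Printf("consumed %d bytes, md5 %s\n", n, newMD5Hex)
}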
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) - // Get calculated hash checksums from erasure to save in `xl.json`. - hashChecksums := erasure.GetHashes() - - checkSums := make([]checkSumInfo, len(xl.storageDisks)) - for index := range xl.storageDisks { - blockIndex := xlMeta.Erasure.Distribution[index] - 1 - checkSums[blockIndex] = checkSumInfo{ - Name: partSuffix, - Algorithm: "sha512", - Hash: hashChecksums[blockIndex], - } - } + // Update `xl.json` content for each disks. for index := range partsMetadata { - blockIndex := xlMeta.Erasure.Distribution[index] - 1 partsMetadata[index].Parts = xlMeta.Parts - partsMetadata[index].Erasure.Checksum = append(partsMetadata[index].Erasure.Checksum, checkSums[blockIndex]) + partsMetadata[index].Erasure = newEInfos[index] } // Write all the checksum metadata. tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) - // Write unique `xl.json` each disk. + // Writes a unique `xl.json` each disk carrying new checksum + // related information. if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } @@ -225,32 +381,29 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, return newMD5Hex, nil } -// PutObjectPart - writes the multipart upload chunks. +// PutObjectPart - reads incoming stream and internally erasure codes +// them. This call is similar to single put operation but it is part +// of the multipart transcation. +// +// Implements S3 compatible Upload Part API. func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { - return xl.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex) -} - -// ListObjectParts - list object parts. -func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket} + return "", BucketNameInvalid{Bucket: bucket} } // Verify whether the bucket exists. if !xl.isBucketExist(bucket) { - return ListPartsInfo{}, BucketNotFound{Bucket: bucket} + return "", BucketNotFound{Bucket: bucket} } if !IsValidObjectName(object) { - return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} - } - // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - - if !xl.isUploadIDExists(bucket, object, uploadID) { - return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} + return "", ObjectNameInvalid{Bucket: bucket, Object: object} } + return xl.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex) +} +// listObjectParts - wrapper reading `xl.json` for a given object and +// uploadID. Lists all the parts captured inside `xl.json` content. +func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { result := ListPartsInfo{} uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) @@ -312,11 +465,42 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM return result, nil } -// ListObjectParts - list object parts. 
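xlMeta.AddObjectPart, used above, has to cope with a client re-uploading the same part number, so the natural shape of such a helper is replace-or-insert followed by a sort on part number. A rough sketch of that behaviour under that assumption (objectPart and addObjectPart are simplified stand-ins, not the actual xlMetaV1 types):

// Sketch: replace an existing entry for the part number or insert a new
// one, keeping the slice ordered so a later complete-multipart can walk
// the parts in sequence.
package main

import (
	"fmt"
	"sort"
)

type objectPart struct {
	Number int
	Name   string
	ETag   string
	Size   int64
}

func addObjectPart(parts []objectPart, p objectPart) []objectPart {
	for i := range parts {
		if parts[i].Number == p.Number {
			parts[i] = p // Re-uploading a part number overwrites it.
			return parts
		}
	}
	parts = append(parts, p)
	sort.Slice(parts, func(i, j int) bool { return parts[i].Number < parts[j].Number })
	return parts
}

func main() {
	var parts []objectPart
	parts = addObjectPart(parts, objectPart{2, "object2", "etag2", 5 << 20})
	parts = addObjectPart(parts, objectPart{1, "object1", "etag1", 5 << 20})
	parts = addObjectPart(parts, objectPart{1, "object1", "etag1b", 6 << 20})
	fmt.Printf("%+v\n", parts)
}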
+// ListObjectParts - lists all previously uploaded parts for a given +// object and uploadID. Takes additional input of part-number-marker +// to indicate where the listing should begin from. +// +// Implements S3 compatible ListObjectParts API. The resulting +// ListPartsInfo structure is unmarshalled directly into XML and +// replied back to the client. func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { - return xl.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket} + } + // Verify whether the bucket exists. + if !xl.isBucketExist(bucket) { + return ListPartsInfo{}, BucketNotFound{Bucket: bucket} + } + if !IsValidObjectName(object) { + return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} + } + // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + + if !xl.isUploadIDExists(bucket, object, uploadID) { + return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} + } + result, err := xl.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) + return result, err } +// CompleteMultipartUpload - completes an ongoing multipart +// transaction after receiving all the parts indicated by the client. +// Returns an md5sum calculated by concatenating all the individual +// md5sums of all the parts. +// +// Implements S3 compatible Complete multipart API. func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { @@ -367,7 +551,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Allocate parts similar to incoming slice. xlMeta.Parts = make([]objectPartInfo, len(parts)) - // Loop through all parts, validate them and then commit to disk. + // Validate each part and then commit to disk. for i, part := range parts { partIdx := currentXLMeta.ObjectPartIndex(part.PartNumber) if partIdx == -1 { @@ -414,6 +598,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload partsMetadata[index].Meta = xlMeta.Meta partsMetadata[index].Parts = xlMeta.Parts } + // Write unique `xl.json` for each disk. if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) @@ -461,20 +646,25 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. - uploadsJSON, err := readUploadsJSON(bucket, object, xl.storageDisks...) - if err == nil { - uploadIDIdx := uploadsJSON.Index(uploadID) - if uploadIDIdx != -1 { - uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) 
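The comment above says the returned md5sum is derived from the individual part md5sums. The usual S3 convention for that aggregate ETag is the md5 of the concatenated binary part digests, suffixed with the part count; assuming this tree follows the same convention, a sketch of the computation (multipartETag is a hypothetical helper):

// Sketch: an S3-style aggregate ETag for a completed multipart upload.
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"strconv"
)

func multipartETag(partMD5s []string) (string, error) {
	var all []byte
	for _, hexSum := range partMD5s {
		sum, err := hex.DecodeString(hexSum)
		if err != nil {
			return "", err
		}
		all = append(all, sum...)
	}
	final := md5.Sum(all)
	return hex.EncodeToString(final[:]) + "-" + strconv.Itoa(len(partMD5s)), nil
}

func main() {
	part1 := md5.Sum([]byte("part one payload"))
	part2 := md5.Sum([]byte("part two payload"))
	etag, err := multipartETag([]string{
		hex.EncodeToString(part1[:]),
		hex.EncodeToString(part2[:]),
	})
	fmt.Println(etag, err)
}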
- } - if len(uploadsJSON.Uploads) > 0 { - if err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...); err != nil { - return "", err - } - return s3MD5, nil - } + disk := xl.getLoadBalancedQuorumDisks()[0] + uploadsJSON, err := readUploadsJSON(bucket, object, disk) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, object) } - + // If we have successfully read `uploads.json`, then we proceed to + // purge or update `uploads.json`. + uploadIDIdx := uploadsJSON.Index(uploadID) + if uploadIDIdx != -1 { + uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) + } + if len(uploadsJSON.Uploads) > 0 { + if err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...); err != nil { + return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) + } + // Return success. + return s3MD5, nil + } // No more pending uploads for the object, proceed to delete + // object completely from '.minio/multipart'. err = xl.deleteObject(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) if err != nil { return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) @@ -484,8 +674,59 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return s3MD5, nil } -// abortMultipartUpload - aborts a multipart upload. -func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) error { +// abortMultipartUpload - wrapper for purging an ongoing multipart +// transaction, deletes uploadID entry from `uploads.json` and purges +// the directory at '.minio/multipart/bucket/object/uploadID' holding +// all the upload parts. +func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err error) { + // Cleanup all uploaded parts. + if err = cleanupUploadedParts(bucket, object, uploadID, xl.storageDisks...); err != nil { + return toObjectErr(err, bucket, object) + } + + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + // Validate if there are other incomplete upload-id's present for + // the object, if yes do not attempt to delete 'uploads.json'. + disk := xl.getLoadBalancedQuorumDisks()[0] + uploadsJSON, err := readUploadsJSON(bucket, object, disk) + if err != nil { + return toObjectErr(err, bucket, object) + } + uploadIDIdx := uploadsJSON.Index(uploadID) + if uploadIDIdx != -1 { + uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) + } + if len(uploadsJSON.Uploads) > 0 { + // There are pending uploads for the same object, preserve + // them update 'uploads.json' in-place. + err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...) + if err != nil { + return toObjectErr(err, bucket, object) + } + return nil + } // No more pending uploads for the object, we purge the entire + // entry at '.minio/multipart/bucket/object'. + if err = xl.deleteObject(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)); err != nil { + return toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) + } + + // Successfully purged. + return nil +} + +// AbortMultipartUpload - aborts an ongoing multipart operation +// signified by the input uploadID. This is an atomic operation +// doesn't require clients to initiate multiple such requests. 
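Both the complete and abort paths above end with the same `uploads.json` bookkeeping: drop the finished uploadID entry and only purge the multipart directory when nothing else is pending for the object. The slice splice in isolation (uploadInfo and removeUploadID are simplified stand-ins for the real uploads.json types):

// Sketch: pruning an uploadID from the pending-uploads list and
// reporting whether uploads.json still needs to be kept.
package main

import "fmt"

type uploadInfo struct {
	UploadID  string
	Initiated string
}

// removeUploadID deletes the entry for uploadID, if present, and returns
// the remaining entries plus whether any uploads are still pending
// (true: update uploads.json in place, false: purge the directory).
func removeUploadID(uploads []uploadInfo, uploadID string) ([]uploadInfo, bool) {
	idx := -1
	for i, u := range uploads {
		if u.UploadID == uploadID {
			idx = i
			break
		}
	}
	if idx != -1 {
		uploads = append(uploads[:idx], uploads[idx+1:]...)
	}
	return uploads, len(uploads) > 0
}

func main() {
	uploads := []uploadInfo{{"id-1", "2016-05-01"}, {"id-2", "2016-05-02"}}
	remaining, pending := removeUploadID(uploads, "id-1")
	fmt.Println(remaining, pending)
}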
+// +// All parts are purged from all disks and reference to the uploadID +// would be removed from the system, rollback is not possible on this +// operation. +// +// Implements S3 compatible Abort multipart API, slight difference is +// that this is an atomic idempotent operation. Subsequent calls have +// no affect and further requests to the same uploadID would not be honored. +func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { return BucketNameInvalid{Bucket: bucket} @@ -504,37 +745,6 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) error if !xl.isUploadIDExists(bucket, object, uploadID) { return InvalidUploadID{UploadID: uploadID} } - - // Cleanup all uploaded parts. - if err := cleanupUploadedParts(bucket, object, uploadID, xl.storageDisks...); err != nil { - return toObjectErr(err, bucket, object) - } - - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) - // Validate if there are other incomplete upload-id's present for - // the object, if yes do not attempt to delete 'uploads.json'. - uploadsJSON, err := readUploadsJSON(bucket, object, xl.storageDisks...) - if err == nil { - uploadIDIdx := uploadsJSON.Index(uploadID) - if uploadIDIdx != -1 { - uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) - } - if len(uploadsJSON.Uploads) > 0 { - err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...) - if err != nil { - return toObjectErr(err, bucket, object) - } - return nil - } - } - if err = xl.deleteObject(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)); err != nil { - return toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object)) - } - return nil -} - -// AbortMultipartUpload - aborts a multipart upload. -func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error { - return xl.abortMultipartUpload(bucket, object, uploadID) + err := xl.abortMultipartUpload(bucket, object, uploadID) + return err } diff --git a/xl-v1-object.go b/xl-v1-object.go index 684850e50..ea398eaa2 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -1,3 +1,19 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package main import ( @@ -16,7 +32,13 @@ import ( /// Object Operations -// GetObject - get an object. +// GetObject - reads an object erasured coded across multiple +// disks. Supports additional parameters like offset and length +// which is synonymous with HTTP Range requests. +// +// startOffset indicates the location at which the client requested +// object to be read at. length indicates the total length of the +// object requested by client. func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error { // Verify if bucket is valid. 
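A ranged GetObject such as the one documented above first has to turn startOffset into a part index and an offset inside that part. A minimal sketch of that translation over a list of part sizes (objectToPartOffset is an illustrative helper, not necessarily the one used in this tree):

// Sketch: mapping an object-level offset to (part index, offset in part).
package main

import (
	"errors"
	"fmt"
)

// objectToPartOffset walks the part sizes until the remaining offset
// fits inside a part, returning that part's index and the local offset.
func objectToPartOffset(partSizes []int64, startOffset int64) (partIndex int, partOffset int64, err error) {
	partOffset = startOffset
	for i, size := range partSizes {
		if partOffset < size {
			return i, partOffset, nil
		}
		partOffset -= size
	}
	return 0, 0, errors.New("offset is beyond the end of the object")
}

func main() {
	// Three parts of 5 MiB each; an offset that lands inside the second part.
	sizes := []int64{5 << 20, 5 << 20, 5 << 20}
	idx, off, err := objectToPartOffset(sizes, 6<<20)
	fmt.Println(idx, off, err) // 1 1048576 <nil>
}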
if !IsValidBucketName(bucket) { @@ -60,26 +82,21 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return toObjectErr(err, bucket, object) } + // Collect all the previous erasure infos across the disk. + var eInfos []erasureInfo + for index := range onlineDisks { + eInfos = append(eInfos, partsMetadata[index].Erasure) + } + // Read from all parts. for ; partIndex < len(xlMeta.Parts); partIndex++ { // Save the current part name and size. partName := xlMeta.Parts[partIndex].Name partSize := xlMeta.Parts[partIndex].Size - // Initialize a new erasure with online disks, with previous - // block distribution for each part reads. - erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) - - // Set previously calculated block checksums and algorithm for validation. - erasure.SaveAlgo(checkSumAlgorithm(xlMeta, partIndex+1)) - erasure.SaveHashes(xl.metaPartBlockChecksums(partsMetadata, partIndex+1)) - - // Data block size. - blockSize := xlMeta.Erasure.BlockSize - // Start reading the part name. var buffer []byte - buffer, err = erasure.ReadFile(bucket, pathJoin(object, partName), partSize, blockSize) + buffer, err = erasureReadFile(onlineDisks, bucket, pathJoin(object, partName), partName, partSize, eInfos) if err != nil { return err } @@ -100,18 +117,15 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return nil } - // Relinquish memory. - buffer = nil - // Reset part offset to 0 to read rest of the part from the beginning. partOffset = 0 - } + } // End of read all parts loop. // Return success. return nil } -// GetObjectInfo - get object info. +// GetObjectInfo - reads object metadata and replies back ObjectInfo. func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { @@ -130,7 +144,7 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { return info, nil } -// getObjectInfo - get object info. +// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) { var xlMeta xlMetaV1 xlMeta, err = xl.readXLMetadata(bucket, object) @@ -138,19 +152,23 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er // Return error. return ObjectInfo{}, err } - objInfo = ObjectInfo{} - objInfo.IsDir = false - objInfo.Bucket = bucket - objInfo.Name = object - objInfo.Size = xlMeta.Stat.Size - objInfo.ModTime = xlMeta.Stat.ModTime - objInfo.MD5Sum = xlMeta.Meta["md5Sum"] - objInfo.ContentType = xlMeta.Meta["content-type"] - objInfo.ContentEncoding = xlMeta.Meta["content-encoding"] + objInfo = ObjectInfo{ + IsDir: false, + Bucket: bucket, + Name: object, + Size: xlMeta.Stat.Size, + ModTime: xlMeta.Stat.ModTime, + MD5Sum: xlMeta.Meta["md5Sum"], + ContentType: xlMeta.Meta["content-type"], + ContentEncoding: xlMeta.Meta["content-encoding"], + } return objInfo, nil } -// renameObject - renaming all source objects to destination object across all disks. +// renameObject - renames all source objects to destination object +// across all disks in parallel. Additionally if we have errors and do +// not have a readQuorum partially renamed files are renamed back to +// its proper location. func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject string) error { // Initialize sync waitgroup. 
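Underneath erasureReadFile sits the same Reed-Solomon property the write path relies on: any dataBlocks of the dataBlocks+parityBlocks shards are enough to rebuild the original bytes. An in-memory round trip with github.com/klauspost/reedsolomon, leaving out the block-sized streaming and the per-disk sha512 verification that the real read path performs:

// Sketch: lose as many shards as there are parity blocks, then
// reconstruct and reassemble the payload.
package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	const dataBlocks, parityBlocks = 4, 4

	rs, err := reedsolomon.New(dataBlocks, parityBlocks)
	if err != nil {
		panic(err)
	}

	payload := []byte("object payload that was erasure coded across eight disks")
	shards, err := rs.Split(payload)
	if err != nil {
		panic(err)
	}
	if err = rs.Encode(shards); err != nil {
		panic(err)
	}

	// Simulate losing as many shards as we have parity blocks.
	shards[0], shards[3], shards[5], shards[7] = nil, nil, nil, nil

	// Rebuild the missing shards and join the data back together.
	if err = rs.Reconstruct(shards); err != nil {
		panic(err)
	}
	var out bytes.Buffer
	if err = rs.Join(&out, shards, len(payload)); err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(out.Bytes(), payload)) // true
}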
var wg = &sync.WaitGroup{} @@ -167,14 +185,13 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri go func(index int, disk StorageAPI) { defer wg.Done() err := disk.RenameFile(srcBucket, retainSlash(srcObject), dstBucket, retainSlash(dstObject)) - if err != nil { + if err != nil && err != errFileNotFound { errs[index] = err } - errs[index] = nil }(index, disk) } - // Wait for all RenameFile to finish. + // Wait for all renames to finish. wg.Wait() // Gather err count. @@ -188,13 +205,14 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum // otherwise return failure. Cleanup successful renames. if errCount > len(xl.storageDisks)-xl.writeQuorum { - // Special condition if readQuorum exists, then return success. + // Check we have successful read quorum. if errCount <= len(xl.storageDisks)-xl.readQuorum { - return nil - } - // Rename back the object on disks where RenameFile succeeded + return nil // Return success. + } // else - failed to acquire read quorum. + + // Undo rename object on disks where RenameFile succeeded. for index, disk := range xl.storageDisks { - // Rename back the object in parallel to reduce overall disk latency + // Undo rename object in parallel. wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() @@ -210,7 +228,10 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri return nil } -// PutObject - create an object. +// PutObject - creates an object upon reading from the input stream +// until EOF, erasure codes the data across all disk and additionally +// writes `xl.json` which carries the necessary metadata for future +// object operations. func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { @@ -254,36 +275,23 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. higherVersion++ } - // Initialize a new erasure with online disks and new distribution. - erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) - - // Initialize sha512 hash. - erasure.InitHash("sha512") - // Initialize md5 writer. md5Writer := md5.New() - // Allocated blockSized buffer for reading. - buf := make([]byte, blockSizeV1) - for { - var n int - n, err = io.ReadFull(data, buf) - if err == io.EOF { - break - } - if err != nil && err != io.ErrUnexpectedEOF { - return "", toObjectErr(err, bucket, object) - } - // Update md5 writer. - md5Writer.Write(buf[:n]) - var m int64 - m, err = erasure.AppendFile(minioMetaBucket, tempErasureObj, buf[:n]) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, tempErasureObj) - } - if m != int64(len(buf[:n])) { - return "", toObjectErr(errUnexpected, bucket, object) - } + // Tee reader combines incoming data stream and md5, data read + // from input stream is written to md5. + teeReader := io.TeeReader(data, md5Writer) + + // Collect all the previous erasure infos across the disk. + var eInfos []erasureInfo + for range onlineDisks { + eInfos = append(eInfos, xlMeta.Erasure) + } + + // Erasure code and write across all disks. + newEInfos, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "object1", teeReader, eInfos) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, tempErasureObj) } // Save additional erasureMetadata. 
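The rename fan-out above succeeds or fails on quorum arithmetic: up to len(disks)-writeQuorum failures are tolerated outright, read quorum alone still counts as success, and anything worse triggers the undo. The decision in isolation (evalQuorum and the 4/5 quorum values are illustrative only):

// Sketch: classifying a parallel disk operation by quorum.
package main

import (
	"errors"
	"fmt"
)

type quorumResult int

const (
	writeOK quorumResult = iota // Enough disks succeeded for a durable write.
	readOK                      // Write quorum missed, but read quorum holds.
	failed                      // Not even read quorum; undo partial work.
)

func evalQuorum(errs []error, readQuorum, writeQuorum int) quorumResult {
	errCount := 0
	for _, err := range errs {
		if err != nil {
			errCount++
		}
	}
	switch {
	case errCount <= len(errs)-writeQuorum:
		return writeOK
	case errCount <= len(errs)-readQuorum:
		return readOK
	default:
		return failed
	}
}

func main() {
	// 8 disks, read quorum 4, write quorum 5, three failures.
	errs := make([]error, 8)
	errs[1], errs[4], errs[6] = errors.New("disk down"), errors.New("disk down"), errors.New("disk down")
	fmt.Println(evalQuorum(errs, 4, 5)) // 0 (writeOK): 3 failures <= 8-5 tolerated
}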
@@ -294,6 +302,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. if len(metadata["md5Sum"]) == 0 { metadata["md5Sum"] = newMD5Hex } + // If not set default to "application/octet-stream" if metadata["content-type"] == "" { contentType := "application/octet-stream" @@ -310,11 +319,15 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. md5Hex := metadata["md5Sum"] if md5Hex != "" { if newMD5Hex != md5Hex { + // MD5 mismatch, delete the temporary object. + xl.deleteObject(minioMetaBucket, tempObj) + // Returns md5 mismatch. return "", BadDigest{md5Hex, newMD5Hex} } } // Check if an object is present as one of the parent dir. + // -- FIXME. (needs a new kind of lock). if xl.parentDirIsObject(bucket, path.Dir(object)) { return "", toObjectErr(errFileAccessDenied, bucket, object) } @@ -334,26 +347,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Add the final part. xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size) - // Get hash checksums. - hashChecksums := erasure.GetHashes() - - // Save the checksums. - checkSums := make([]checkSumInfo, len(xl.storageDisks)) - for index := range xl.storageDisks { - blockIndex := xlMeta.Erasure.Distribution[index] - 1 - checkSums[blockIndex] = checkSumInfo{ - Name: "object1", - Algorithm: "sha512", - Hash: hashChecksums[blockIndex], - } - } - - // Update all the necessary fields making sure that checkSum field - // is different for each disks. + // Update `xl.json` content on each disks. for index := range partsMetadata { - blockIndex := xlMeta.Erasure.Distribution[index] - 1 partsMetadata[index] = xlMeta - partsMetadata[index].Erasure.Checksum = append(partsMetadata[index].Erasure.Checksum, checkSums[blockIndex]) + partsMetadata[index].Erasure = newEInfos[index] } // Write unique `xl.json` for each disk. @@ -361,7 +358,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. return "", toObjectErr(err, bucket, object) } - // Rename the successfully written tempoary object to final location. + // Rename the successfully written temporary object to final location. err = xl.renameObject(minioMetaBucket, tempObj, bucket, object) if err != nil { return "", toObjectErr(err, bucket, object) @@ -374,7 +371,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. return newMD5Hex, nil } -// deleteObject - deletes a regular object. +// deleteObject - wrapper for delete object, deletes an object from +// all the disks in parallel, including `xl.json` associated with the +// object. func (xl xlObjects) deleteObject(bucket, object string) error { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} @@ -413,7 +412,6 @@ func (xl xlObjects) deleteObject(bucket, object string) error { // Update error counter separately. deleteFileErr++ } - // Return err if all disks report file not found. if fileNotFoundCnt == len(xl.storageDisks) { return errFileNotFound @@ -426,7 +424,9 @@ func (xl xlObjects) deleteObject(bucket, object string) error { return nil } -// DeleteObject - delete the object. +// DeleteObject - deletes an object, this call doesn't necessary reply +// any error as it is not necessary for the handler to reply back a +// response to the client request. func (xl xlObjects) DeleteObject(bucket, object string) error { // Verify if bucket is valid. 
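deleteObject and renameObject above share the same fan-out skeleton: one goroutine per disk, a per-index error slice so the goroutines never contend, and a WaitGroup to join them. A stripped-down version of that skeleton (forEachDisk and the string disks are stand-ins for the StorageAPI calls):

// Sketch: the per-disk parallel fan-out pattern with per-index errors.
package main

import (
	"fmt"
	"sync"
)

// forEachDisk runs op against every disk in parallel and returns one
// error slot per disk; each goroutine writes only its own slot.
func forEachDisk(disks []string, op func(disk string) error) []error {
	var wg sync.WaitGroup
	errs := make([]error, len(disks))
	for index, disk := range disks {
		wg.Add(1)
		go func(index int, disk string) {
			defer wg.Done()
			errs[index] = op(disk)
		}(index, disk)
	}
	wg.Wait()
	return errs
}

func main() {
	disks := []string{"/mnt/disk1", "/mnt/disk2", "/mnt/disk3", "/mnt/disk4"}
	errs := forEachDisk(disks, func(disk string) error {
		fmt.Println("deleting object on", disk)
		return nil // Pretend the delete succeeded.
	})
	fmt.Println(errs)
}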
 	if !IsValidBucketName(bucket) {
diff --git a/xl-v1.go b/xl-v1.go
index 1278007f6..1d8005c9f 100644
--- a/xl-v1.go
+++ b/xl-v1.go
@@ -39,8 +39,8 @@ const (
 
 // xlObjects - Implements XL object layer.
 type xlObjects struct {
-	storageDisks  []StorageAPI // Collection of initialized backend disks.
 	physicalDisks []string     // Collection of regular disks.
+	storageDisks  []StorageAPI // Collection of initialized backend disks.
 	dataBlocks    int          // dataBlocks count caculated for erasure.
 	parityBlocks  int          // parityBlocks count calculated for erasure.
 	readQuorum    int          // readQuorum minimum required disks to read data.
@@ -141,14 +141,13 @@ func newXLObjects(disks []string) (ObjectLayer, error) {
 		}
 	}
 
-	// FIXME: healFormatXL(newDisks)
-
 	// Calculate data and parity blocks.
 	dataBlocks, parityBlocks := len(newPosixDisks)/2, len(newPosixDisks)/2
 
+	// Initialize xl objects.
 	xl := xlObjects{
-		storageDisks:  newPosixDisks,
 		physicalDisks: disks,
+		storageDisks:  newPosixDisks,
 		dataBlocks:    dataBlocks,
 		parityBlocks:  parityBlocks,
 		listObjectMap: make(map[listParams][]*treeWalker),
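newXLObjects splits the disk set evenly into data and parity blocks as shown above, and the read and write quorums are derived from the same disk count. A sketch of that derivation (the half and half-plus-one quorum formulas here are an assumption for illustration, not taken from this patch):

// Sketch: deriving erasure and quorum parameters from the disk count.
package main

import "fmt"

func erasureParams(diskCount int) (dataBlocks, parityBlocks, readQuorum, writeQuorum int) {
	dataBlocks = diskCount / 2
	parityBlocks = diskCount / 2
	readQuorum = diskCount / 2        // Assumed: half the disks to read.
	writeQuorum = diskCount/2 + 1     // Assumed: half plus one to write.
	return dataBlocks, parityBlocks, readQuorum, writeQuorum
}

func main() {
	for _, n := range []int{4, 8, 16} {
		d, p, r, w := erasureParams(n)
		fmt.Printf("disks=%d data=%d parity=%d readQuorum=%d writeQuorum=%d\n", n, d, p, r, w)
	}
}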