From feb337098dd019157cb69963444b8ce5c34aa4b9 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 28 May 2016 15:13:15 -0700 Subject: [PATCH] XL: bring in new storage API. (#1780) Fixes #1771 --- erasure-appendfile.go | 66 +++++ erasure-createfile.go | 186 ------------- erasure-readfile.go | 253 +++++++----------- erasure-utils.go | 6 +- format-config-v1.go | 42 +-- fs-v1-metadata.go | 44 +-- fs-v1-multipart.go | 119 +++----- fs-v1.go | 88 +++--- object-api-getobjectinfo_test.go | 9 +- object-api-multipart_test.go | 4 +- object-handlers.go | 61 ++--- object-interface.go | 2 +- object-utils.go | 18 -- object_api_suite_test.go | 35 +-- posix.go | 125 ++++++--- rpc-client.go | 72 ++--- rpc-server-datatypes.go | 23 +- rpc-server.go | 79 ++---- server_test.go | 2 +- storage-errors.go | 3 + ...e-api-interface.go => storage-interface.go | 8 +- web-handlers.go | 7 +- xl-v1-healing.go | 10 +- xl-v1-metadata.go | 96 +++---- xl-v1-multipart-common.go | 57 ++-- xl-v1-multipart.go | 80 +++--- xl-v1-object.go | 132 ++++----- 27 files changed, 634 insertions(+), 993 deletions(-) create mode 100644 erasure-appendfile.go delete mode 100644 erasure-createfile.go rename storage-api-interface.go => storage-interface.go (85%) diff --git a/erasure-appendfile.go b/erasure-appendfile.go new file mode 100644 index 000000000..449633f0e --- /dev/null +++ b/erasure-appendfile.go @@ -0,0 +1,66 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import "sync" + +// AppendFile - append data buffer at path. +func (e erasure) AppendFile(volume, path string, dataBuffer []byte) (n int64, err error) { + // Split the input buffer into data and parity blocks. + var blocks [][]byte + blocks, err = e.ReedSolomon.Split(dataBuffer) + if err != nil { + return 0, err + } + + // Encode parity blocks using data blocks. + err = e.ReedSolomon.Encode(blocks) + if err != nil { + return 0, err + } + + var wg = &sync.WaitGroup{} + var wErrs = make([]error, len(e.storageDisks)) + // Write encoded data to quorum disks in parallel. + for index, disk := range e.storageDisks { + if disk == nil { + continue + } + wg.Add(1) + // Write encoded data in routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + // Pick the block from the distribution. + blockIndex := e.distribution[index] - 1 + n, wErr := disk.AppendFile(volume, path, blocks[blockIndex]) + if wErr != nil { + wErrs[index] = wErr + return + } + if n != int64(len(blocks[blockIndex])) { + wErrs[index] = errUnexpected + return + } + wErrs[index] = nil + }(index, disk) + } + + // Wait for all the appends to finish. + wg.Wait() + + return int64(len(dataBuffer)), nil +} diff --git a/erasure-createfile.go b/erasure-createfile.go deleted file mode 100644 index 2bcd11d74..000000000 --- a/erasure-createfile.go +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "io" - "sync" -) - -// cleanupCreateFileOps - cleans up all the temporary files and other -// temporary data upon any failure. -func (e erasure) cleanupCreateFileOps(volume, path string, writers []io.WriteCloser) { - // Close and remove temporary writers. - for _, writer := range writers { - if err := safeCloseAndRemove(writer); err != nil { - errorIf(err, "Failed to close writer.") - } - } -} - -// WriteErasure reads predefined blocks, encodes them and writes to configured storage disks. -func (e erasure) writeErasure(volume, path string, reader *io.PipeReader, wcloser *waitCloser) { - // Release the block writer upon function return. - defer wcloser.release() - - writers := make([]io.WriteCloser, len(e.storageDisks)) - - var wwg = &sync.WaitGroup{} - var errs = make([]error, len(e.storageDisks)) - - // Initialize all writers. - for index, disk := range e.storageDisks { - if disk == nil { - continue - } - wwg.Add(1) - go func(index int, disk StorageAPI) { - defer wwg.Done() - writer, err := disk.CreateFile(volume, path) - if err != nil { - errs[index] = err - return - } - writers[index] = writer - }(index, disk) - } - - wwg.Wait() // Wait for all the create file to finish in parallel. - for _, err := range errs { - if err == nil { - continue - } - e.cleanupCreateFileOps(volume, path, writers) - reader.CloseWithError(err) - return - } - - // Allocate 4MiB block size buffer for reading. - dataBuffer := make([]byte, erasureBlockSize) - for { - // Read up to allocated block size. - n, err := io.ReadFull(reader, dataBuffer) - if err != nil { - // Any unexpected errors, close the pipe reader with error. - if err != io.ErrUnexpectedEOF && err != io.EOF { - // Remove all temp writers. - e.cleanupCreateFileOps(volume, path, writers) - reader.CloseWithError(err) - return - } - } - // At EOF break out. - if err == io.EOF { - break - } - if n > 0 { - // Split the input buffer into data and parity blocks. - var dataBlocks [][]byte - dataBlocks, err = e.ReedSolomon.Split(dataBuffer[0:n]) - if err != nil { - // Remove all temp writers. - e.cleanupCreateFileOps(volume, path, writers) - reader.CloseWithError(err) - return - } - - // Encode parity blocks using data blocks. - err = e.ReedSolomon.Encode(dataBlocks) - if err != nil { - // Remove all temp writers upon error. - e.cleanupCreateFileOps(volume, path, writers) - reader.CloseWithError(err) - return - } - - var wg = &sync.WaitGroup{} - var wErrs = make([]error, len(writers)) - // Write encoded data to quorum disks in parallel. - for index, writer := range writers { - if writer == nil { - continue - } - wg.Add(1) - // Write encoded data in routine. - go func(index int, writer io.Writer) { - defer wg.Done() - // Pick the block from the distribution. - encodedData := dataBlocks[e.distribution[index]-1] - _, wErr := writers[index].Write(encodedData) - if wErr != nil { - wErrs[index] = wErr - return - } - wErrs[index] = nil - }(index, writer) - } - wg.Wait() - - // Cleanup and return on first non-nil error. - for _, wErr := range wErrs { - if wErr == nil { - continue - } - // Remove all temp writers upon error. - e.cleanupCreateFileOps(volume, path, writers) - reader.CloseWithError(wErr) - return - } - } - } - - // Close all writers and metadata writers in routines. - for _, writer := range writers { - if writer == nil { - continue - } - // Safely wrote, now rename to its actual location. - if err := writer.Close(); err != nil { - // Remove all temp writers upon error. - e.cleanupCreateFileOps(volume, path, writers) - reader.CloseWithError(err) - return - } - } - - // Close the pipe reader and return. - reader.Close() - return -} - -// CreateFile - create a file. -func (e erasure) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { - // Input validation. - if !isValidVolname(volume) { - return nil, errInvalidArgument - } - if !isValidPath(path) { - return nil, errInvalidArgument - } - - // Initialize pipe for data pipe line. - pipeReader, pipeWriter := io.Pipe() - - // Initialize a new wait closer, implements both Write and Close. - wcloser := newWaitCloser(pipeWriter) - - // Start erasure encoding in routine, reading data block by block from pipeReader. - go e.writeErasure(volume, path, pipeReader, wcloser) - - // Return the writer, caller should start writing to this. - return wcloser, nil -} diff --git a/erasure-readfile.go b/erasure-readfile.go index 149bbee01..a20f4dde1 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -17,174 +17,113 @@ package main import ( + "bytes" "errors" "io" - "sync" ) // ReadFile - decoded erasure coded file. -func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int64) (io.ReadCloser, error) { - // Input validation. - if !isValidVolname(volume) { - return nil, errInvalidArgument - } - if !isValidPath(path) { - return nil, errInvalidArgument - } - - var rwg = &sync.WaitGroup{} - var errs = make([]error, len(e.storageDisks)) - - readers := make([]io.ReadCloser, len(e.storageDisks)) - for index, disk := range e.storageDisks { - if disk == nil { - continue +func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int64) (io.Reader, error) { + var totalLeft = totalSize + var bufWriter = new(bytes.Buffer) + for totalLeft > 0 { + // Figure out the right blockSize as it was encoded before. + var curBlockSize int64 + if erasureBlockSize < totalLeft { + curBlockSize = erasureBlockSize + } else { + curBlockSize = totalLeft } - rwg.Add(1) - go func(index int, disk StorageAPI) { - defer rwg.Done() - offset := int64(0) - reader, err := disk.ReadFile(volume, path, offset) - if err == nil { - readers[index] = reader - return + + // Calculate the current encoded block size. + curEncBlockSize := getEncodedBlockLen(curBlockSize, e.DataBlocks) + + // Allocate encoded blocks up to storage disks. + enBlocks := make([][]byte, len(e.storageDisks)) + + // Counter to keep success data blocks. + var successDataBlocksCount = 0 + var noReconstruct bool // Set for no reconstruction. + + // Read from all the disks. + for index, disk := range e.storageDisks { + blockIndex := e.distribution[index] - 1 + // Initialize shard slice and fill the data from each parts. + enBlocks[blockIndex] = make([]byte, curEncBlockSize) + if disk == nil { + enBlocks[blockIndex] = nil + } else { + var offset = int64(0) + // Read the necessary blocks. + _, err := disk.ReadFile(volume, path, offset, enBlocks[blockIndex]) + if err != nil { + enBlocks[blockIndex] = nil + } } - errs[index] = err - }(index, disk) - } + // Verify if we have successfully read all the data blocks. + if blockIndex < e.DataBlocks && enBlocks[blockIndex] != nil { + successDataBlocksCount++ + // Set when we have all the data blocks and no + // reconstruction is needed, so that we can avoid + // erasure reconstruction. + noReconstruct = successDataBlocksCount == e.DataBlocks + if noReconstruct { + // Break out we have read all the data blocks. + break + } + } + } - // Wait for all readers. - rwg.Wait() + // Check blocks if they are all zero in length, we have corruption return error. + if checkBlockSize(enBlocks) == 0 { + return nil, errDataCorrupt + } - // For any errors in reader, we should just error out. - for _, err := range errs { + // Verify if reconstruction is needed, proceed with reconstruction. + if !noReconstruct { + err := e.ReedSolomon.Reconstruct(enBlocks) + if err != nil { + return nil, err + } + // Verify reconstructed blocks (parity). + ok, err := e.ReedSolomon.Verify(enBlocks) + if err != nil { + return nil, err + } + if !ok { + // Blocks cannot be reconstructed, corrupted data. + err = errors.New("Verification failed after reconstruction, data likely corrupted.") + return nil, err + } + } + + // Get data blocks from encoded blocks. + dataBlocks := getDataBlocks(enBlocks, e.DataBlocks, int(curBlockSize)) + + // Verify if the offset is right for the block, if not move to + // the next block. + if startOffset > 0 { + startOffset = startOffset - int64(len(dataBlocks)) + // Start offset is greater than or equal to zero, skip the dataBlocks. + if startOffset >= 0 { + totalLeft = totalLeft - erasureBlockSize + continue + } + // Now get back the remaining offset if startOffset is negative. + startOffset = startOffset + int64(len(dataBlocks)) + } + + // Copy data blocks. + _, err := bufWriter.Write(dataBlocks[startOffset:]) if err != nil { return nil, err } + + // Reset dataBlocks to relenquish memory. + dataBlocks = nil + + // Save what's left after reading erasureBlockSize. + totalLeft = totalLeft - erasureBlockSize } - - // Initialize pipe. - pipeReader, pipeWriter := io.Pipe() - - go func() { - var totalLeft = totalSize - // Read until EOF. - for totalLeft > 0 { - // Figure out the right blockSize as it was encoded before. - var curBlockSize int64 - if erasureBlockSize < totalLeft { - curBlockSize = erasureBlockSize - } else { - curBlockSize = totalLeft - } - - // Calculate the current encoded block size. - curEncBlockSize := getEncodedBlockLen(curBlockSize, e.DataBlocks) - - // Allocate encoded blocks up to storage disks. - enBlocks := make([][]byte, len(e.storageDisks)) - - // Counter to keep success data blocks. - var successDataBlocksCount = 0 - var noReconstruct bool // Set for no reconstruction. - - // Read all the readers. - for index, reader := range readers { - blockIndex := e.distribution[index] - 1 - // Initialize shard slice and fill the data from each parts. - enBlocks[blockIndex] = make([]byte, curEncBlockSize) - if reader == nil { - enBlocks[blockIndex] = nil - continue - } - - // Close the reader when routine returns. - defer reader.Close() - - // Read the necessary blocks. - _, rErr := io.ReadFull(reader, enBlocks[blockIndex]) - if rErr != nil && rErr != io.ErrUnexpectedEOF { - enBlocks[blockIndex] = nil - } - - // Verify if we have successfully all the data blocks. - if blockIndex < e.DataBlocks { - successDataBlocksCount++ - // Set when we have all the data blocks and no - // reconstruction is needed, so that we can avoid - // erasure reconstruction. - noReconstruct = successDataBlocksCount == e.DataBlocks - if noReconstruct { - // Break out we have read all the data blocks. - break - } - } - } - - // Check blocks if they are all zero in length, we have - // corruption return error. - if checkBlockSize(enBlocks) == 0 { - pipeWriter.CloseWithError(errDataCorrupt) - return - } - - // Verify if reconstruction is needed, proceed with reconstruction. - if !noReconstruct { - err := e.ReedSolomon.Reconstruct(enBlocks) - if err != nil { - pipeWriter.CloseWithError(err) - return - } - // Verify reconstructed blocks (parity). - ok, err := e.ReedSolomon.Verify(enBlocks) - if err != nil { - pipeWriter.CloseWithError(err) - return - } - if !ok { - // Blocks cannot be reconstructed, corrupted data. - err = errors.New("Verification failed after reconstruction, data likely corrupted.") - pipeWriter.CloseWithError(err) - return - } - } - - // Get data blocks from encoded blocks. - dataBlocks := getDataBlocks(enBlocks, e.DataBlocks, int(curBlockSize)) - - // Verify if the offset is right for the block, if not move to the next block. - if startOffset > 0 { - startOffset = startOffset - int64(len(dataBlocks)) - // Start offset is greater than or equal to zero, skip the dataBlocks. - if startOffset >= 0 { - totalLeft = totalLeft - erasureBlockSize - continue - } - // Now get back the remaining offset if startOffset is negative. - startOffset = startOffset + int64(len(dataBlocks)) - } - - // Write safely the necessary blocks to the pipe. - _, err := pipeWriter.Write(dataBlocks[int(startOffset):]) - if err != nil { - pipeWriter.CloseWithError(err) - return - } - - // Reset dataBlocks to relenquish memory. - dataBlocks = nil - - // Reset offset to '0' to read rest of the blocks. - startOffset = int64(0) - - // Save what's left after reading erasureBlockSize. - totalLeft = totalLeft - erasureBlockSize - } - - // Cleanly end the pipe after a successful decoding. - pipeWriter.Close() - }() - - // Return the pipe for the top level caller to start reading. - return pipeReader, nil + return bufWriter, nil } diff --git a/erasure-utils.go b/erasure-utils.go index 6a2839bbf..b992983b8 100644 --- a/erasure-utils.go +++ b/erasure-utils.go @@ -39,7 +39,7 @@ func checkBlockSize(blocks [][]byte) int { // calculate the blockSize based on input length and total number of // data blocks. -func getEncodedBlockLen(inputLen int64, dataBlocks int) (curBlockSize int64) { - curBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks) - return curBlockSize +func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64) { + curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks) + return curEncBlockSize } diff --git a/format-config-v1.go b/format-config-v1.go index 823d40cff..f9547adb1 100644 --- a/format-config-v1.go +++ b/format-config-v1.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "strings" "github.com/skyrings/skyring-common/tools/uuid" @@ -116,8 +115,10 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1) // loadFormat - load format from disk. func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) { + buffer := make([]byte, blockSize) offset := int64(0) - r, err := disk.ReadFile(minioMetaBucket, formatConfigFile, offset) + var n int64 + n, err = disk.ReadFile(minioMetaBucket, formatConfigFile, offset, buffer) if err != nil { // 'file not found' and 'volume not found' as // same. 'volume not found' usually means its a fresh disk. @@ -136,15 +137,11 @@ func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) { } return nil, err } - decoder := json.NewDecoder(r) format = &formatConfigV1{} - err = decoder.Decode(&format) + err = json.Unmarshal(buffer[:n], format) if err != nil { return nil, err } - if err = r.Close(); err != nil { - return nil, err - } return format, nil } @@ -215,7 +212,6 @@ func checkFormatXL(formatConfigs []*formatConfigV1) error { func initFormatXL(storageDisks []StorageAPI) (err error) { var ( jbod = make([]string, len(storageDisks)) - formatWriters = make([]io.WriteCloser, len(storageDisks)) formats = make([]*formatConfigV1, len(storageDisks)) saveFormatErrCnt = 0 ) @@ -230,16 +226,6 @@ func initFormatXL(storageDisks []StorageAPI) (err error) { return errWriteQuorum } } - var w io.WriteCloser - w, err = disk.CreateFile(minioMetaBucket, formatConfigFile) - if err != nil { - saveFormatErrCnt++ - // Check for write quorum. - if saveFormatErrCnt <= len(storageDisks)-(len(storageDisks)/2+3) { - continue - } - return err - } var u *uuid.UUID u, err = uuid.New() if err != nil { @@ -250,7 +236,6 @@ func initFormatXL(storageDisks []StorageAPI) (err error) { } return err } - formatWriters[index] = w formats[index] = &formatConfigV1{ Version: "1", Format: "xl", @@ -261,24 +246,19 @@ func initFormatXL(storageDisks []StorageAPI) (err error) { } jbod[index] = formats[index].XL.Disk } - for index, w := range formatWriters { - if formats[index] == nil { - continue - } + for index, disk := range storageDisks { formats[index].XL.JBOD = jbod - encoder := json.NewEncoder(w) - err = encoder.Encode(&formats[index]) + formatBytes, err := json.Marshal(formats[index]) if err != nil { return err } - } - for _, w := range formatWriters { - if w == nil { - continue - } - if err = w.Close(); err != nil { + n, err := disk.AppendFile(minioMetaBucket, formatConfigFile, formatBytes) + if err != nil { return err } + if n != int64(len(formatBytes)) { + return errUnexpected + } } return nil } diff --git a/fs-v1-metadata.go b/fs-v1-metadata.go index a9c980eac..c87d1061f 100644 --- a/fs-v1-metadata.go +++ b/fs-v1-metadata.go @@ -1,9 +1,7 @@ package main import ( - "bytes" "encoding/json" - "io" "path" "sort" ) @@ -22,28 +20,6 @@ type fsMetaV1 struct { Parts []objectPartInfo `json:"parts,omitempty"` } -// ReadFrom - read from implements io.ReaderFrom interface for -// unmarshalling fsMetaV1. -func (m *fsMetaV1) ReadFrom(reader io.Reader) (n int64, err error) { - var buffer bytes.Buffer - n, err = buffer.ReadFrom(reader) - if err != nil { - return 0, err - } - err = json.Unmarshal(buffer.Bytes(), m) - return n, err -} - -// WriteTo - write to implements io.WriterTo interface for marshalling fsMetaV1. -func (m fsMetaV1) WriteTo(writer io.Writer) (n int64, err error) { - metadataBytes, err := json.Marshal(m) - if err != nil { - return 0, err - } - p, err := writer.Write(metadataBytes) - return int64(p), err -} - // ObjectPartIndex - returns the index of matching object part number. func (m fsMetaV1) ObjectPartIndex(partNumber int) (partIndex int) { for i, part := range m.Parts { @@ -81,12 +57,12 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin // readFSMetadata - returns the object metadata `fs.json` content. func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) { - r, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0)) + buffer := make([]byte, blockSize) + n, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0), buffer) if err != nil { return fsMetaV1{}, err } - defer r.Close() - _, err = fsMeta.ReadFrom(r) + err = json.Unmarshal(buffer[:n], &fsMeta) if err != nil { return fsMetaV1{}, err } @@ -104,22 +80,16 @@ func newFSMetaV1() (fsMeta fsMetaV1) { // writeFSMetadata - writes `fs.json` metadata. func (fs fsObjects) writeFSMetadata(bucket, prefix string, fsMeta fsMetaV1) error { - w, err := fs.storage.CreateFile(bucket, path.Join(prefix, fsMetaJSONFile)) + metadataBytes, err := json.Marshal(fsMeta) if err != nil { return err } - _, err = fsMeta.WriteTo(w) + n, err := fs.storage.AppendFile(bucket, path.Join(prefix, fsMetaJSONFile), metadataBytes) if err != nil { - if mErr := safeCloseAndRemove(w); mErr != nil { - return mErr - } return err } - if err = w.Close(); err != nil { - if mErr := safeCloseAndRemove(w); mErr != nil { - return mErr - } - return err + if n != int64(len(metadataBytes)) { + return errUnexpected } return nil } diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index b64c4e6c5..778c33252 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -21,7 +21,6 @@ import ( "encoding/hex" "fmt" "io" - "io/ioutil" "path" "strconv" "strings" @@ -302,61 +301,36 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s partSuffix := fmt.Sprintf("object%d", partID) tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix) - fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tmpPartPath) - if err != nil { - return "", toObjectErr(err, bucket, object) - } // Initialize md5 writer. md5Writer := md5.New() - // Instantiate a new multi writer. - multiWriter := io.MultiWriter(md5Writer, fileWriter) - - // Instantiate checksum hashers and create a multiwriter. - if size > 0 { - if _, err = io.CopyN(multiWriter, data, size); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } + var buf = make([]byte, blockSize) + for { + n, err := io.ReadFull(data, buf) + if err == io.EOF { + break + } + if err != nil && err != io.ErrUnexpectedEOF { return "", toObjectErr(err, bucket, object) } - // Reader shouldn't have more data what mentioned in size argument. - // reading one more byte from the reader to validate it. - // expected to fail, success validates existence of more data in the reader. - if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } - return "", UnExpectedDataSize{Size: int(size)} - } - } else { - var n int64 - if n, err = io.Copy(multiWriter, data); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } + // Update md5 writer. + md5Writer.Write(buf[:n]) + m, err := fs.storage.AppendFile(minioMetaBucket, tmpPartPath, buf[:n]) + if err != nil { return "", toObjectErr(err, bucket, object) } - size = n + if m != int64(len(buf[:n])) { + return "", toObjectErr(errUnexpected, bucket, object) + } } newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) if md5Hex != "" { if newMD5Hex != md5Hex { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } return "", BadDigest{md5Hex, newMD5Hex} } } - err = fileWriter.Close() - if err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } - return "", err - } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath) @@ -373,8 +347,17 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s } return "", toObjectErr(err, minioMetaBucket, partPath) } - if err = fs.writeFSMetadata(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID), fsMeta); err != nil { - return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID)) + uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil { + return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) + } + err = fs.storage.RenameFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile), minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile)) + if err != nil { + if dErr := fs.storage.DeleteFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile)); dErr != nil { + return "", toObjectErr(dErr, minioMetaBucket, tempUploadIDPath) + } + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } return newMD5Hex, nil } @@ -493,10 +476,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID, "object1") - fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tempObj) - if err != nil { - return "", toObjectErr(err, bucket, object) - } + var buffer = make([]byte, blockSize) // Loop through all parts, validate them and then commit to disk. for i, part := range parts { @@ -509,45 +489,30 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload if err == errFileNotFound { return "", InvalidPart{} } - return "", err + return "", toObjectErr(err, minioMetaBucket, multipartPartFile) } // All parts except the last part has to be atleast 5MB. if (i < len(parts)-1) && !isMinAllowedPartSize(fi.Size) { return "", PartTooSmall{} } - var fileReader io.ReadCloser - fileReader, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, 0) - if err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr + offset := int64(0) + totalLeft := fi.Size + for totalLeft > 0 { + var n int64 + n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buffer) + if err != nil { + if err == errFileNotFound { + return "", InvalidPart{} + } + return "", toObjectErr(err, minioMetaBucket, multipartPartFile) } - if err == errFileNotFound { - return "", InvalidPart{} + n, err = fs.storage.AppendFile(minioMetaBucket, tempObj, buffer[:n]) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, tempObj) } - return "", err + offset += n + totalLeft -= n } - _, err = io.Copy(fileWriter, fileReader) - if err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", err - } - err = fileReader.Close() - if err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", err - } - } - - err = fileWriter.Close() - if err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", err } // Rename the file back to original location, if not delete the temporary object. diff --git a/fs-v1.go b/fs-v1.go index 8f4da244d..62df7b062 100644 --- a/fs-v1.go +++ b/fs-v1.go @@ -21,6 +21,7 @@ import ( "encoding/hex" "io" "os" + "path" "path/filepath" "sort" "strings" @@ -146,20 +147,37 @@ func (fs fsObjects) DeleteBucket(bucket string) error { /// Object Operations // GetObject - get an object. -func (fs fsObjects) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, error) { +func (fs fsObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return nil, BucketNameInvalid{Bucket: bucket} + return BucketNameInvalid{Bucket: bucket} } // Verify if object is valid. if !IsValidObjectName(object) { - return nil, ObjectNameInvalid{Bucket: bucket, Object: object} + return ObjectNameInvalid{Bucket: bucket, Object: object} } - fileReader, err := fs.storage.ReadFile(bucket, object, startOffset) - if err != nil { - return nil, toObjectErr(err, bucket, object) + var totalLeft = length + for totalLeft > 0 { + // Figure out the right blockSize as it was encoded before. + var curBlockSize int64 + if blockSize < totalLeft { + curBlockSize = blockSize + } else { + curBlockSize = totalLeft + } + buf := make([]byte, curBlockSize) + n, err := fs.storage.ReadFile(bucket, object, startOffset, buf) + if err != nil { + return toObjectErr(err, bucket, object) + } + _, err = writer.Write(buf[:n]) + if err != nil { + return toObjectErr(err, bucket, object) + } + totalLeft -= n + startOffset += n } - return fileReader, nil + return nil } // GetObjectInfo - get object info. @@ -194,6 +212,10 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { }, nil } +const ( + blockSize = 4 * 1024 * 1024 // 4MiB. +) + // PutObject - create an object. func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { // Verify if bucket is valid. @@ -207,31 +229,38 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. } } - fileWriter, err := fs.storage.CreateFile(bucket, object) - if err != nil { - return "", toObjectErr(err, bucket, object) - } + // Temporary object. + tempObj := path.Join(tmpMetaPrefix, bucket, object) // Initialize md5 writer. md5Writer := md5.New() - // Instantiate a new multi writer. - multiWriter := io.MultiWriter(md5Writer, fileWriter) - - // Instantiate checksum hashers and create a multiwriter. - if size > 0 { - if _, err = io.CopyN(multiWriter, data, size); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } + if size == 0 { + // For size 0 we write a 0byte file. + _, err := fs.storage.AppendFile(minioMetaBucket, tempObj, []byte("")) + if err != nil { return "", toObjectErr(err, bucket, object) } } else { - if _, err = io.Copy(multiWriter, data); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr + // Allocate buffer. + buf := make([]byte, blockSize) + for { + n, rErr := data.Read(buf) + if rErr == io.EOF { + break + } + if rErr != nil { + return "", toObjectErr(rErr, bucket, object) + } + // Update md5 writer. + md5Writer.Write(buf[:n]) + m, wErr := fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]) + if wErr != nil { + return "", toObjectErr(wErr, bucket, object) + } + if m != int64(len(buf[:n])) { + return "", toObjectErr(errUnexpected, bucket, object) } - return "", toObjectErr(err, bucket, object) } } @@ -243,18 +272,13 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. } if md5Hex != "" { if newMD5Hex != md5Hex { - if err = safeCloseAndRemove(fileWriter); err != nil { - return "", err - } return "", BadDigest{md5Hex, newMD5Hex} } } - err = fileWriter.Close() + + err := fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) if err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", err + return "", toObjectErr(err, bucket, object) } // Return md5sum, successfully wrote object. diff --git a/object-api-getobjectinfo_test.go b/object-api-getobjectinfo_test.go index df1109371..963523bed 100644 --- a/object-api-getobjectinfo_test.go +++ b/object-api-getobjectinfo_test.go @@ -20,7 +20,6 @@ import ( "bytes" "crypto/md5" "encoding/hex" - "io" "io/ioutil" "os" "strconv" @@ -111,7 +110,7 @@ func testGetObjectInfo(obj ObjectLayer, instanceType string, t *testing.T) { } } -func BenchmarkGetObject(b *testing.B) { +func BenchmarkGetObjectFS(b *testing.B) { // Make a temporary directory to use as the obj. directory, err := ioutil.TempDir("", "minio-benchmark-getobject") if err != nil { @@ -146,16 +145,12 @@ func BenchmarkGetObject(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { var buffer = new(bytes.Buffer) - r, err := obj.GetObject("bucket", "object"+strconv.Itoa(i%10), 0) + err = obj.GetObject("bucket", "object"+strconv.Itoa(i%10), 0, int64(len([]byte(text))), buffer) if err != nil { b.Error(err) } - if _, err := io.Copy(buffer, r); err != nil { - b.Error(err) - } if buffer.Len() != len(text) { b.Errorf("GetObject returned incorrect length %d (should be %d)\n", buffer.Len(), len(text)) } - r.Close() } } diff --git a/object-api-multipart_test.go b/object-api-multipart_test.go index 68989b660..8f5d2a0c1 100644 --- a/object-api-multipart_test.go +++ b/object-api-multipart_test.go @@ -180,11 +180,11 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t *testing fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated "+"d41d8cd98f00b204e9800998ecf8427e")}, // Test case - 12. // Input with size more than the size of actual data inside the reader. - {bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") + 1), false, "", fmt.Errorf("%s", "EOF")}, + {bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") + 1), false, "", fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated e2fc714c4727ee9395f324cd2e7f331f")}, // Test case - 13. // Input with size less than the size of actual data inside the reader. {bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") - 1), false, "", - fmt.Errorf("%s", "Contains more data than specified size of 3 bytes.")}, + fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated e2fc714c4727ee9395f324cd2e7f331f")}, // Test case - 14-17. // Validating for success cases. {bucket, object, uploadID, 1, "abcd", "e2fc714c4727ee9395f324cd2e7f331f", int64(len("abcd")), true, "", nil}, diff --git a/object-handlers.go b/object-handlers.go index f20cc2eb6..fc2757845 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -124,38 +124,22 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req return } - // Get the object. - startOffset := hrange.start - readCloser, err := api.ObjectAPI.GetObject(bucket, object, startOffset) - if err != nil { - errorIf(err, "Unable to read object.") - apiErr := toAPIErrorCode(err) - if apiErr == ErrNoSuchKey { - apiErr = errAllowableObjectNotFound(bucket, r) - } - writeErrorResponse(w, r, apiErr, r.URL.Path) - return - } - defer readCloser.Close() - // Set standard object headers. setObjectHeaders(w, objInfo, hrange) // Set any additional requested response headers. setGetRespHeaders(w, r.URL.Query()) - if hrange.length > 0 { - if _, err := io.CopyN(w, readCloser, hrange.length); err != nil { - errorIf(err, "Writing to client failed.") - // Do not send error response here, since client could have died. - return - } - } else { - if _, err := io.Copy(w, readCloser); err != nil { - errorIf(err, "Writing to client failed.") - // Do not send error response here, since client could have died. - return - } + // Get the object. + startOffset := hrange.start + length := hrange.length + if length == 0 { + length = objInfo.Size - startOffset + } + if err := api.ObjectAPI.GetObject(bucket, object, startOffset, length, w); err != nil { + errorIf(err, "Writing to client failed.") + // Do not send error response here, client would have already died. + return } } @@ -393,14 +377,19 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re return } - startOffset := int64(0) // Read the whole file. - // Get the object. - readCloser, err := api.ObjectAPI.GetObject(sourceBucket, sourceObject, startOffset) - if err != nil { - errorIf(err, "Unable to read an object.") - writeErrorResponse(w, r, toAPIErrorCode(err), objectSource) - return - } + pipeReader, pipeWriter := io.Pipe() + go func() { + startOffset := int64(0) // Read the whole file. + // Get the object. + gErr := api.ObjectAPI.GetObject(sourceBucket, sourceObject, startOffset, objInfo.Size, pipeWriter) + if gErr != nil { + errorIf(gErr, "Unable to read an object.") + pipeWriter.CloseWithError(gErr) + return + } + pipeWriter.Close() // Close. + }() + // Size of object. size := objInfo.Size @@ -413,7 +402,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // same md5sum as the source. // Create the object. - md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata) + md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, pipeReader, metadata) if err != nil { errorIf(err, "Unable to create an object.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) @@ -434,7 +423,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // write success response. writeSuccessResponse(w, encodedSuccessResponse) // Explicitly close the reader, to avoid fd leaks. - readCloser.Close() + pipeReader.Close() } // checkCopySource implements x-amz-copy-source-if-modified-since and diff --git a/object-interface.go b/object-interface.go index 891fcd616..43e4b6bd5 100644 --- a/object-interface.go +++ b/object-interface.go @@ -31,7 +31,7 @@ type ObjectLayer interface { ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) // Object operations. - GetObject(bucket, object string, startOffset int64) (reader io.ReadCloser, err error) + GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error) GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string) (md5 string, err error) DeleteObject(bucket, object string) error diff --git a/object-utils.go b/object-utils.go index 6d7b6732f..ec7abc2cd 100644 --- a/object-utils.go +++ b/object-utils.go @@ -19,15 +19,12 @@ package main import ( "crypto/md5" "encoding/hex" - "errors" "fmt" - "io" "path" "regexp" "strings" "unicode/utf8" - "github.com/minio/minio/pkg/safe" "github.com/skyrings/skyring-common/tools/uuid" ) @@ -160,18 +157,3 @@ type byBucketName []BucketInfo func (d byBucketName) Len() int { return len(d) } func (d byBucketName) Swap(i, j int) { d[i], d[j] = d[j], d[i] } func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name } - -// safeCloseAndRemove - safely closes and removes underlying temporary -// file writer if possible. -func safeCloseAndRemove(writer io.WriteCloser) error { - // If writer is a safe file, Attempt to close and remove. - safeWriter, ok := writer.(*safe.File) - if ok { - return safeWriter.Abort() - } - wCloser, ok := writer.(*waitCloser) - if ok { - return wCloser.CloseWithError(errors.New("Close and error out.")) - } - return nil -} diff --git a/object_api_suite_test.go b/object_api_suite_test.go index 96e744202..d817b6a8a 100644 --- a/object_api_suite_test.go +++ b/object_api_suite_test.go @@ -20,7 +20,6 @@ import ( "bytes" "crypto/md5" "encoding/hex" - "io" "math/rand" "strconv" @@ -133,24 +132,21 @@ func testMultipleObjectCreation(c *check.C, create func() ObjectLayer) { objects[key] = []byte(randomString) metadata := make(map[string]string) metadata["md5Sum"] = expectedMD5Sumhex - md5Sum, err := obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata) + var md5Sum string + md5Sum, err = obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata) c.Assert(err, check.IsNil) c.Assert(md5Sum, check.Equals, expectedMD5Sumhex) } for key, value := range objects { var byteBuffer bytes.Buffer - r, err := obj.GetObject("bucket", key, 0) + err = obj.GetObject("bucket", key, 0, int64(len(value)), &byteBuffer) c.Assert(err, check.IsNil) - _, e := io.Copy(&byteBuffer, r) - c.Assert(e, check.IsNil) c.Assert(byteBuffer.Bytes(), check.DeepEquals, value) - c.Assert(r.Close(), check.IsNil) objInfo, err := obj.GetObjectInfo("bucket", key) c.Assert(err, check.IsNil) c.Assert(objInfo.Size, check.Equals, int64(len(value))) - r.Close() } } @@ -267,16 +263,14 @@ func testObjectOverwriteWorks(c *check.C, create func() ObjectLayer) { _, err = obj.PutObject("bucket", "object", int64(len("The list of parts was not in ascending order. The parts list must be specified in order by part number.")), bytes.NewBufferString("The list of parts was not in ascending order. The parts list must be specified in order by part number."), nil) c.Assert(err, check.IsNil) - _, err = obj.PutObject("bucket", "object", int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")), bytes.NewBufferString("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."), nil) + length := int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")) + _, err = obj.PutObject("bucket", "object", length, bytes.NewBufferString("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."), nil) c.Assert(err, check.IsNil) var bytesBuffer bytes.Buffer - r, err := obj.GetObject("bucket", "object", 0) + err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer) c.Assert(err, check.IsNil) - _, e := io.Copy(&bytesBuffer, r) - c.Assert(e, check.IsNil) c.Assert(string(bytesBuffer.Bytes()), check.Equals, "The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.") - c.Assert(r.Close(), check.IsNil) } // Tests validate that bucket operation on non-existent bucket fails. @@ -303,17 +297,14 @@ func testPutObjectInSubdir(c *check.C, create func() ObjectLayer) { err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) - _, err = obj.PutObject("bucket", "dir1/dir2/object", int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")), bytes.NewBufferString("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."), nil) + length := int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")) + _, err = obj.PutObject("bucket", "dir1/dir2/object", length, bytes.NewBufferString("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."), nil) c.Assert(err, check.IsNil) var bytesBuffer bytes.Buffer - r, err := obj.GetObject("bucket", "dir1/dir2/object", 0) + err = obj.GetObject("bucket", "dir1/dir2/object", 0, length, &bytesBuffer) c.Assert(err, check.IsNil) - n, e := io.Copy(&bytesBuffer, r) - c.Assert(e, check.IsNil) c.Assert(len(bytesBuffer.Bytes()), check.Equals, len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")) - c.Assert(int64(len(bytesBuffer.Bytes())), check.Equals, int64(n)) - c.Assert(r.Close(), check.IsNil) } // Tests validate ListBuckets. @@ -384,7 +375,8 @@ func testNonExistantObjectInBucket(c *check.C, create func() ObjectLayer) { err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) - _, err = obj.GetObject("bucket", "dir1", 0) + var bytesBuffer bytes.Buffer + err = obj.GetObject("bucket", "dir1", 0, 10, &bytesBuffer) c.Assert(err, check.Not(check.IsNil)) switch err := err.(type) { case ObjectNotFound: @@ -403,7 +395,8 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectLayer _, err = obj.PutObject("bucket", "dir1/dir3/object", int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")), bytes.NewBufferString("One or more of the specified parts could not be found. The part might not have been uploaded, or the specified entity tag might not have matched the part's entity tag."), nil) c.Assert(err, check.IsNil) - _, err = obj.GetObject("bucket", "dir1", 0) + var bytesBuffer bytes.Buffer + err = obj.GetObject("bucket", "dir1", 0, 10, &bytesBuffer) switch err := err.(type) { case ObjectNotFound: c.Assert(err.Bucket, check.Equals, "bucket") @@ -413,7 +406,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectLayer c.Assert(err, check.Equals, "ObjectNotFound") } - _, err = obj.GetObject("bucket", "dir1/", 0) + err = obj.GetObject("bucket", "dir1/", 0, 10, &bytesBuffer) switch err := err.(type) { case ObjectNameInvalid: c.Assert(err.Bucket, check.Equals, "bucket") diff --git a/posix.go b/posix.go index cb3438a9b..218f0755a 100644 --- a/posix.go +++ b/posix.go @@ -17,23 +17,24 @@ package main import ( + "bytes" "io" "os" slashpath "path" + "path/filepath" "runtime" "strings" "syscall" "github.com/minio/minio/pkg/disk" - "github.com/minio/minio/pkg/safe" ) const ( fsMinSpacePercent = 5 ) -// fsStorage - implements StorageAPI interface. -type fsStorage struct { +// posix - implements StorageAPI interface. +type posix struct { diskPath string minFreeDisk int64 } @@ -90,7 +91,7 @@ func newPosix(diskPath string) (StorageAPI, error) { if diskPath == "" { return nil, errInvalidArgument } - fs := fsStorage{ + fs := posix{ diskPath: diskPath, minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free. } @@ -169,7 +170,7 @@ func listVols(dirPath string) ([]VolInfo, error) { // corresponding valid volume names on the backend in a platform // compatible way for all operating systems. If volume is not found // an error is generated. -func (s fsStorage) getVolDir(volume string) (string, error) { +func (s posix) getVolDir(volume string) (string, error) { if !isValidVolname(volume) { return "", errInvalidArgument } @@ -181,7 +182,7 @@ func (s fsStorage) getVolDir(volume string) (string, error) { } // Make a volume entry. -func (s fsStorage) MakeVol(volume string) (err error) { +func (s posix) MakeVol(volume string) (err error) { // Validate if disk is free. if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { return err @@ -201,7 +202,7 @@ func (s fsStorage) MakeVol(volume string) (err error) { } // ListVols - list volumes. -func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) { +func (s posix) ListVols() (volsInfo []VolInfo, err error) { volsInfo, err = listVols(s.diskPath) if err != nil { return nil, err @@ -217,7 +218,7 @@ func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) { } // StatVol - get volume info. -func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) { +func (s posix) StatVol(volume string) (volInfo VolInfo, err error) { // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { @@ -242,7 +243,7 @@ func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) { } // DeleteVol - delete a volume. -func (s fsStorage) DeleteVol(volume string) error { +func (s posix) DeleteVol(volume string) error { // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { @@ -267,7 +268,7 @@ func (s fsStorage) DeleteVol(volume string) error { // ListDir - return all the entries at the given directory path. // If an entry is a directory it will be returned with a trailing "/". -func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) { +func (s posix) ListDir(volume, dirPath string) ([]string, error) { // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { @@ -284,93 +285,128 @@ func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) { return readDir(pathJoin(volumeDir, dirPath)) } -// ReadFile - read a file at a given offset. -func (s fsStorage) ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) { +// ReadFile reads exactly len(buf) bytes into buf. It returns the +// number of bytes copied. The error is EOF only if no bytes were +// read. On return, n == len(buf) if and only if err == nil. n == 0 +// for io.EOF. Additionally ReadFile also starts reading from an +// offset. +func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) { + nsMutex.RLock(volume, path) + defer nsMutex.RUnlock(volume, path) + volumeDir, err := s.getVolDir(volume) if err != nil { - return nil, err + return 0, err } // Stat a volume entry. _, err = os.Stat(volumeDir) if err != nil { if os.IsNotExist(err) { - return nil, errVolumeNotFound + return 0, errVolumeNotFound } - return nil, err + return 0, err } filePath := pathJoin(volumeDir, path) if err = checkPathLength(filePath); err != nil { - return nil, err + return 0, err } file, err := os.Open(filePath) if err != nil { if os.IsNotExist(err) { - return nil, errFileNotFound + return 0, errFileNotFound } else if os.IsPermission(err) { - return nil, errFileAccessDenied + return 0, errFileAccessDenied } else if strings.Contains(err.Error(), "not a directory") { - return nil, errFileNotFound + return 0, errFileNotFound } - return nil, err + return 0, err } st, err := file.Stat() if err != nil { - return nil, err + return 0, err } // Verify if its not a regular file, since subsequent Seek is undefined. if !st.Mode().IsRegular() { - return nil, errFileNotFound + return 0, errFileNotFound } // Seek to requested offset. _, err = file.Seek(offset, os.SEEK_SET) if err != nil { - return nil, err + return 0, err } - return file, nil + + // Close the reader. + defer file.Close() + + // Read file. + m, err := io.ReadFull(file, buf) + + // Error unexpected is valid, set this back to nil. + if err == io.ErrUnexpectedEOF { + err = nil + } + + // Success. + return int64(m), err } -// CreateFile - create a file at path. -func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { +// AppendFile - append a byte array at path, if file doesn't exist at +// path this call explicitly creates it. +func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error) { + nsMutex.Lock(volume, path) + defer nsMutex.Unlock(volume, path) + volumeDir, err := s.getVolDir(volume) if err != nil { - return nil, err + return 0, err } // Stat a volume entry. _, err = os.Stat(volumeDir) if err != nil { if os.IsNotExist(err) { - return nil, errVolumeNotFound + return 0, errVolumeNotFound } - return nil, err + return 0, err } if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { - return nil, err + return 0, err } filePath := pathJoin(volumeDir, path) if err = checkPathLength(filePath); err != nil { - return nil, err + return 0, err } // Verify if the file already exists and is not of regular type. var st os.FileInfo if st, err = os.Stat(filePath); err == nil { if st.IsDir() { - return nil, errIsNotRegular + return 0, errIsNotRegular } } - w, err := safe.CreateFile(filePath) + // Create top level directories if they don't exist. + if err = os.MkdirAll(filepath.Dir(filePath), 0700); err != nil { + return 0, err + } + w, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) if err != nil { // File path cannot be verified since one of the parents is a file. if strings.Contains(err.Error(), "not a directory") { - return nil, errFileAccessDenied + return 0, errFileAccessDenied } - return nil, err + return 0, err } - return w, nil + // Close upon return. + defer w.Close() + + // Return io.Copy + return io.Copy(w, bytes.NewReader(buf)) } // StatFile - get file info. -func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) { +func (s posix) StatFile(volume, path string) (file FileInfo, err error) { + nsMutex.RLock(volume, path) + defer nsMutex.RUnlock(volume, path) + volumeDir, err := s.getVolDir(volume) if err != nil { return FileInfo{}, err @@ -447,7 +483,10 @@ func deleteFile(basePath, deletePath string) error { } // DeleteFile - delete a file at path. -func (s fsStorage) DeleteFile(volume, path string) error { +func (s posix) DeleteFile(volume, path string) error { + nsMutex.Lock(volume, path) + defer nsMutex.Unlock(volume, path) + volumeDir, err := s.getVolDir(volume) if err != nil { return err @@ -472,8 +511,14 @@ func (s fsStorage) DeleteFile(volume, path string) error { return deleteFile(volumeDir, filePath) } -// RenameFile - rename file. -func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { +// RenameFile - rename source path to destination path atomically. +func (s posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { + nsMutex.Lock(srcVolume, srcPath) + defer nsMutex.Unlock(srcVolume, srcPath) + + nsMutex.Lock(dstVolume, dstPath) + defer nsMutex.Unlock(dstVolume, dstPath) + srcVolumeDir, err := s.getVolDir(srcVolume) if err != nil { return err diff --git a/rpc-client.go b/rpc-client.go index 7374ccf04..a2055b4c6 100644 --- a/rpc-client.go +++ b/rpc-client.go @@ -17,14 +17,8 @@ package main import ( - "errors" - "fmt" - "io" "net/http" "net/rpc" - "net/url" - urlpath "path" - "strconv" "strings" "time" ) @@ -151,34 +145,15 @@ func (n networkStorage) DeleteVol(volume string) error { // File operations. // CreateFile - create file. -func (n networkStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { - writeURL := new(url.URL) - writeURL.Scheme = n.netScheme - writeURL.Host = n.netAddr - writeURL.Path = fmt.Sprintf("%s/upload/%s", storageRPCPath, urlpath.Join(volume, path)) - - contentType := "application/octet-stream" - readCloser, writeCloser := io.Pipe() - go func() { - resp, err := n.httpClient.Post(writeURL.String(), contentType, readCloser) - if err != nil { - readCloser.CloseWithError(err) - return - } - if resp != nil { - if resp.StatusCode != http.StatusOK { - if resp.StatusCode == http.StatusNotFound { - readCloser.CloseWithError(errFileNotFound) - return - } - readCloser.CloseWithError(errors.New("Invalid response.")) - return - } - // Close the reader. - readCloser.Close() - } - }() - return writeCloser, nil +func (n networkStorage) AppendFile(volume, path string, buffer []byte) (m int64, err error) { + if err = n.rpcClient.Call("Storage.AppendFileHandler", AppendFileArgs{ + Vol: volume, + Path: path, + Buffer: buffer, + }, &m); err != nil { + return 0, toStorageErr(err) + } + return m, nil } // StatFile - get latest Stat information for a file at path. @@ -193,27 +168,16 @@ func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err er } // ReadFile - reads a file. -func (n networkStorage) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) { - readURL := new(url.URL) - readURL.Scheme = n.netScheme - readURL.Host = n.netAddr - readURL.Path = fmt.Sprintf("%s/download/%s", storageRPCPath, urlpath.Join(volume, path)) - readQuery := make(url.Values) - readQuery.Set("offset", strconv.FormatInt(offset, 10)) - readURL.RawQuery = readQuery.Encode() - resp, err := n.httpClient.Get(readURL.String()) - if err != nil { - return nil, err +func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { + if err = n.rpcClient.Call("Storage.ReadFileHandler", ReadFileArgs{ + Vol: volume, + Path: path, + Offset: offset, + Buffer: buffer, + }, &m); err != nil { + return 0, toStorageErr(err) } - if resp != nil { - if resp.StatusCode != http.StatusOK { - if resp.StatusCode == http.StatusNotFound { - return nil, errFileNotFound - } - return nil, errors.New("Invalid response") - } - } - return resp.Body, nil + return m, nil } // ListDir - list all entries at prefix. diff --git a/rpc-server-datatypes.go b/rpc-server-datatypes.go index 7202e9668..478e0a8f4 100644 --- a/rpc-server-datatypes.go +++ b/rpc-server-datatypes.go @@ -27,25 +27,40 @@ type ListVolsReply struct { Vols []VolInfo } -// StatFileArgs stat file args. +// ReadFileArgs contains read file arguments. +type ReadFileArgs struct { + Vol string + Path string + Offset int64 + Buffer []byte +} + +// AppendFileArgs contains append file arguments. +type AppendFileArgs struct { + Vol string + Path string + Buffer []byte +} + +// StatFileArgs contains stat file arguments. type StatFileArgs struct { Vol string Path string } -// DeleteFileArgs delete file args. +// DeleteFileArgs contains delete file arguments. type DeleteFileArgs struct { Vol string Path string } -// ListDirArgs list dir args. +// ListDirArgs contains list dir arguments. type ListDirArgs struct { Vol string Path string } -// RenameFileArgs rename file args. +// RenameFileArgs contains rename file arguments. type RenameFileArgs struct { SrcVol string SrcPath string diff --git a/rpc-server.go b/rpc-server.go index 1ebe2bfe9..a3ba64e1f 100644 --- a/rpc-server.go +++ b/rpc-server.go @@ -1,10 +1,7 @@ package main import ( - "io" - "net/http" "net/rpc" - "strconv" router "github.com/gorilla/mux" ) @@ -78,6 +75,26 @@ func (s *storageServer) ListDirHandler(arg *ListDirArgs, reply *[]string) error return nil } +// ReadFileHandler - read file handler is rpc wrapper to read file. +func (s *storageServer) ReadFileHandler(arg *ReadFileArgs, reply *int64) error { + n, err := s.storage.ReadFile(arg.Vol, arg.Path, arg.Offset, arg.Buffer) + if err != nil { + return err + } + reply = &n + return nil +} + +// AppendFileHandler - append file handler is rpc wrapper to append file. +func (s *storageServer) AppendFileHandler(arg *AppendFileArgs, reply *int64) error { + n, err := s.storage.AppendFile(arg.Vol, arg.Path, arg.Buffer) + if err != nil { + return err + } + reply = &n + return nil +} + // DeleteFileHandler - delete file handler is rpc wrapper to delete file. func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericReply) error { err := s.storage.DeleteFile(arg.Vol, arg.Path) @@ -115,60 +132,4 @@ func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) { storageRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter() // Add minio storage routes. storageRouter.Path("/storage").Handler(storageRPCServer) - // StreamUpload - stream upload handler. - storageRouter.Methods("POST").Path("/storage/upload/{volume}/{path:.+}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - vars := router.Vars(r) - volume := vars["volume"] - path := vars["path"] - writeCloser, err := stServer.storage.CreateFile(volume, path) - if err != nil { - httpErr := http.StatusInternalServerError - if err == errVolumeNotFound { - httpErr = http.StatusNotFound - } else if err == errIsNotRegular { - httpErr = http.StatusConflict - } - http.Error(w, err.Error(), httpErr) - return - } - reader := r.Body - if _, err = io.Copy(writeCloser, reader); err != nil { - safeCloseAndRemove(writeCloser) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - writeCloser.Close() - reader.Close() - }) - // StreamDownloadHandler - stream download handler. - storageRouter.Methods("GET").Path("/storage/download/{volume}/{path:.+}").Queries("offset", "{offset:.*}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - vars := router.Vars(r) - volume := vars["volume"] - path := vars["path"] - offset, err := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - readCloser, err := stServer.storage.ReadFile(volume, path, offset) - if err != nil { - httpErr := http.StatusBadRequest - if err == errVolumeNotFound { - httpErr = http.StatusNotFound - } else if err == errFileNotFound { - httpErr = http.StatusNotFound - } - http.Error(w, err.Error(), httpErr) - return - } - - // Copy reader to writer. - io.Copy(w, readCloser) - - // Flush out any remaining buffers to client. - w.(http.Flusher).Flush() - - // Close the reader. - readCloser.Close() - }) } diff --git a/server_test.go b/server_test.go index e7c6fba3b..dbfc8ad46 100644 --- a/server_test.go +++ b/server_test.go @@ -444,7 +444,7 @@ func (s *MyAPISuite) TestBucket(c *C) { c.Assert(response.StatusCode, Equals, http.StatusOK) } -func (s *MyAPISuite) TestObject(c *C) { +func (s *MyAPISuite) TestObjectGet(c *C) { buffer := bytes.NewReader([]byte("hello world")) request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/testobject", 0, nil) c.Assert(err, IsNil) diff --git a/storage-errors.go b/storage-errors.go index 95e1fba44..5d9f9c42a 100644 --- a/storage-errors.go +++ b/storage-errors.go @@ -18,6 +18,9 @@ package main import "errors" +// errUnexpected - unexpected error, requires manual intervention. +var errUnexpected = errors.New("Unexpected error, please report this issue at https://github.com/minio/minio/issues") + // errCorruptedFormat - corrupted backend format. var errCorruptedFormat = errors.New("corrupted backend format") diff --git a/storage-api-interface.go b/storage-interface.go similarity index 85% rename from storage-api-interface.go rename to storage-interface.go index c27c798c4..9eefc9a42 100644 --- a/storage-api-interface.go +++ b/storage-interface.go @@ -16,8 +16,6 @@ package main -import "io" - // StorageAPI interface. type StorageAPI interface { // Volume operations. @@ -28,9 +26,9 @@ type StorageAPI interface { // File operations. ListDir(volume, dirPath string) ([]string, error) - ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) - CreateFile(volume string, path string) (writeCloser io.WriteCloser, err error) + ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) + AppendFile(volume string, path string, buf []byte) (n int64, err error) + RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error StatFile(volume string, path string) (file FileInfo, err error) DeleteFile(volume string, path string) (err error) - RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error } diff --git a/web-handlers.go b/web-handlers.go index ffd32bb9f..3249dcb0d 100644 --- a/web-handlers.go +++ b/web-handlers.go @@ -18,7 +18,6 @@ package main import ( "fmt" - "io" "net/http" "os" "path" @@ -383,12 +382,14 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { // Add content disposition. w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filepath.Base(object))) - objReader, err := web.ObjectAPI.GetObject(bucket, object, 0) + objInfo, err := web.ObjectAPI.GetObjectInfo(bucket, object) if err != nil { writeWebErrorResponse(w, err) return } - if _, err := io.Copy(w, objReader); err != nil { + offset := int64(0) + err = web.ObjectAPI.GetObject(bucket, object, offset, objInfo.Size, w) + if err != nil { /// No need to print error, response writer already written to. return } diff --git a/xl-v1-healing.go b/xl-v1-healing.go index a627baad9..76f664921 100644 --- a/xl-v1-healing.go +++ b/xl-v1-healing.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "path" "sync" ) @@ -41,19 +42,20 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro go func(index int, disk StorageAPI) { defer wg.Done() offset := int64(0) - metadataReader, err := disk.ReadFile(bucket, xlMetaPath, offset) + var buffer = make([]byte, blockSize) + n, err := disk.ReadFile(bucket, xlMetaPath, offset, buffer) if err != nil { errs[index] = err return } - defer metadataReader.Close() - - _, err = metadataArray[index].ReadFrom(metadataReader) + err = json.Unmarshal(buffer[:n], &metadataArray[index]) if err != nil { // Unable to parse xl.json, set error. errs[index] = err return } + buffer = nil + errs[index] = nil }(index, disk) } diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 06f82ef10..b89bb79d8 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -17,9 +17,7 @@ package main import ( - "bytes" "encoding/json" - "io" "math/rand" "path" "sort" @@ -72,28 +70,6 @@ type xlMetaV1 struct { Parts []objectPartInfo `json:"parts,omitempty"` } -// ReadFrom - read from implements io.ReaderFrom interface for -// unmarshalling xlMetaV1. -func (m *xlMetaV1) ReadFrom(reader io.Reader) (n int64, err error) { - var buffer bytes.Buffer - n, err = buffer.ReadFrom(reader) - if err != nil { - return 0, err - } - err = json.Unmarshal(buffer.Bytes(), m) - return n, err -} - -// WriteTo - write to implements io.WriterTo interface for marshalling xlMetaV1. -func (m xlMetaV1) WriteTo(writer io.Writer) (n int64, err error) { - metadataBytes, err := json.Marshal(&m) - if err != nil { - return 0, err - } - p, err := writer.Write(metadataBytes) - return int64(p), err -} - // byPartName is a collection satisfying sort.Interface. type byPartNumber []objectPartInfo @@ -164,14 +140,16 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err // Count for errors encountered. var xlJSONErrCount = 0 + // Allocate 4MB buffer. + var buffer = make([]byte, blockSize) + // Return the first successful lookup from a random list of disks. for xlJSONErrCount < len(xl.storageDisks) { - var r io.ReadCloser disk := xl.getRandomDisk() // Choose a random disk on each attempt. - r, err = disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), int64(0)) + var n int64 + n, err = disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), int64(0), buffer) if err == nil { - defer r.Close() - _, err = xlMeta.ReadFrom(r) + err = json.Unmarshal(buffer[:n], &xlMeta) if err == nil { return xlMeta, nil } @@ -195,11 +173,45 @@ func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) { return xlMeta } +func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix string) error { + var wg = &sync.WaitGroup{} + var mErrs = make([]error, len(xl.storageDisks)) + + srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile) + dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile) + // Rename `xl.json` to all disks in parallel. + for index, disk := range xl.storageDisks { + wg.Add(1) + // Rename `xl.json` in a routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) + if rErr != nil { + mErrs[index] = rErr + return + } + mErrs[index] = nil + }(index, disk) + } + // Wait for all the routines. + wg.Wait() + + // Return the first error. + for _, err := range mErrs { + if err == nil { + continue + } + return err + } + return nil +} + // writeXLMetadata - write `xl.json` on all disks in order. func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error { var wg = &sync.WaitGroup{} var mErrs = make([]error, len(xl.storageDisks)) + jsonFile := path.Join(prefix, xlMetaJSONFile) // Start writing `xl.json` to all disks in parallel. for index, disk := range xl.storageDisks { wg.Add(1) @@ -207,33 +219,21 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro go func(index int, disk StorageAPI, metadata xlMetaV1) { defer wg.Done() - metaJSONFile := path.Join(prefix, xlMetaJSONFile) - metaWriter, mErr := disk.CreateFile(bucket, metaJSONFile) - if mErr != nil { - mErrs[index] = mErr - return - } - // Save the disk order index. metadata.Erasure.Index = index + 1 - // Marshal metadata to the writer. - _, mErr = metadata.WriteTo(metaWriter) + metadataBytes, err := json.Marshal(&metadata) + if err != nil { + mErrs[index] = err + return + } + n, mErr := disk.AppendFile(bucket, jsonFile, metadataBytes) if mErr != nil { - if mErr = safeCloseAndRemove(metaWriter); mErr != nil { - mErrs[index] = mErr - return - } mErrs[index] = mErr return } - // Verify if close fails with an error. - if mErr = metaWriter.Close(); mErr != nil { - if mErr = safeCloseAndRemove(metaWriter); mErr != nil { - mErrs[index] = mErr - return - } - mErrs[index] = mErr + if n != int64(len(metadataBytes)) { + mErrs[index] = errUnexpected return } mErrs[index] = nil diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index f09e187e7..6b725f03e 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -17,9 +17,7 @@ package main import ( - "bytes" "encoding/json" - "io" "path" "sort" "strings" @@ -70,27 +68,6 @@ func (u uploadsV1) Index(uploadID string) int { return -1 } -// ReadFrom - read from implements io.ReaderFrom interface for unmarshalling uploads. -func (u *uploadsV1) ReadFrom(reader io.Reader) (n int64, err error) { - var buffer bytes.Buffer - n, err = buffer.ReadFrom(reader) - if err != nil { - return 0, err - } - err = json.Unmarshal(buffer.Bytes(), &u) - return n, err -} - -// WriteTo - write to implements io.WriterTo interface for marshalling uploads. -func (u uploadsV1) WriteTo(writer io.Writer) (n int64, err error) { - metadataBytes, err := json.Marshal(&u) - if err != nil { - return 0, err - } - m, err := writer.Write(metadataBytes) - return int64(m), err -} - // readUploadsJSON - get all the saved uploads JSON. func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) { uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) @@ -104,17 +81,18 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI // Read `uploads.json` in a routine. go func(index int, disk StorageAPI) { defer wg.Done() - r, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0)) + var buffer = make([]byte, blockSize) + n, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0), buffer) if rErr != nil { errs[index] = rErr return } - defer r.Close() - _, rErr = uploads[index].ReadFrom(r) + rErr = json.Unmarshal(buffer[:n], &uploads[index]) if rErr != nil { errs[index] = rErr return } + buffer = nil errs[index] = nil }(index, disk) } @@ -136,6 +114,7 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI // uploadUploadsJSON - update `uploads.json` with new uploadsJSON for all disks. func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) + tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadsJSONFile) var errs = make([]error, len(storageDisks)) var wg = &sync.WaitGroup{} @@ -145,21 +124,21 @@ func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisk // Update `uploads.json` in routine. go func(index int, disk StorageAPI) { defer wg.Done() - w, wErr := disk.CreateFile(minioMetaBucket, uploadsPath) + uploadsBytes, wErr := json.Marshal(uploadsJSON) if wErr != nil { errs[index] = wErr return } - _, wErr = uploadsJSON.WriteTo(w) + n, wErr := disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes) if wErr != nil { errs[index] = wErr return } - if wErr = w.Close(); wErr != nil { - if clErr := safeCloseAndRemove(w); clErr != nil { - errs[index] = clErr - return - } + if n != int64(len(uploadsBytes)) { + errs[index] = errUnexpected + return + } + if wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil { errs[index] = wErr return } @@ -219,22 +198,18 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora // Update `uploads.json` in a routine. go func(index int, disk StorageAPI) { defer wg.Done() - w, wErr := disk.CreateFile(minioMetaBucket, tmpUploadsPath) + uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON) if wErr != nil { errs[index] = wErr return } - _, wErr = uploadsJSON.WriteTo(w) + n, wErr := disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes) if wErr != nil { errs[index] = wErr return } - if wErr = w.Close(); wErr != nil { - if clErr := safeCloseAndRemove(w); clErr != nil { - errs[index] = clErr - return - } - errs[index] = wErr + if n != int64(len(uploadsJSONBytes)) { + errs[index] = errUnexpected return } wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath) diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index d194b9da9..c5d998775 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -21,7 +21,6 @@ import ( "encoding/hex" "fmt" "io" - "io/ioutil" "path" "path/filepath" "strings" @@ -143,62 +142,37 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s partSuffix := fmt.Sprintf("object%d", partID) tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix) - fileWriter, err := erasure.CreateFile(minioMetaBucket, tmpPartPath) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, tmpPartPath) - } // Initialize md5 writer. md5Writer := md5.New() - // Instantiate a new multi writer. - multiWriter := io.MultiWriter(md5Writer, fileWriter) - - // Instantiate checksum hashers and create a multiwriter. - if size > 0 { - if _, err = io.CopyN(multiWriter, data, size); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } + buf := make([]byte, blockSize) + for { + var n int + n, err = io.ReadFull(data, buf) + if err == io.EOF { + break + } + if err != nil && err != io.ErrUnexpectedEOF { return "", toObjectErr(err, bucket, object) } - // Reader shouldn't have more data what mentioned in size argument. - // reading one more byte from the reader to validate it. - // expected to fail, success validates existence of more data in the reader. - if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } - return "", UnExpectedDataSize{Size: int(size)} + // Update md5 writer. + md5Writer.Write(buf[:n]) + var m int64 + m, err = erasure.AppendFile(minioMetaBucket, tmpPartPath, buf[:n]) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, tmpPartPath) } - } else { - var n int64 - if n, err = io.Copy(multiWriter, data); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } - return "", toObjectErr(err, bucket, object) + if m != int64(len(buf[:n])) { + return "", toObjectErr(errUnexpected, bucket, object) } - size = n } - newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) if md5Hex != "" { if newMD5Hex != md5Hex { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } return "", BadDigest{md5Hex, newMD5Hex} } } - err = fileWriter.Close() - if err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } - return "", err - } - partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) if err != nil { @@ -209,9 +183,17 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s xlMeta.Stat.Version = higherVersion xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) - if err = xl.writeXLMetadata(minioMetaBucket, uploadIDPath, xlMeta); err != nil { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { + return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } + rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) + if rErr != nil { + return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) + } + + // Return success. return newMD5Hex, nil } @@ -389,8 +371,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Save successfully calculated md5sum. xlMeta.Meta["md5Sum"] = s3MD5 - if err = xl.writeXLMetadata(minioMetaBucket, uploadIDPath, xlMeta); err != nil { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { + return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) + } + rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) + if rErr != nil { + return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) } // Hold write lock on the destination before rename diff --git a/xl-v1-object.go b/xl-v1-object.go index d6421f072..aa9ae4103 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "crypto/md5" "encoding/hex" "io" @@ -13,23 +14,17 @@ import ( "github.com/minio/minio/pkg/mimedb" ) -// nullReadCloser - returns 0 bytes and io.EOF upon first read attempt. -type nullReadCloser struct{} - -func (n nullReadCloser) Read([]byte) (int, error) { return 0, io.EOF } -func (n nullReadCloser) Close() error { return nil } - /// Object Operations // GetObject - get an object. -func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, error) { +func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return nil, BucketNameInvalid{Bucket: bucket} + return BucketNameInvalid{Bucket: bucket} } // Verify if object is valid. if !IsValidObjectName(object) { - return nil, ObjectNameInvalid{Bucket: bucket, Object: object} + return ObjectNameInvalid{Bucket: bucket, Object: object} } // Lock the object before reading. @@ -39,18 +34,13 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read // Read metadata associated with the object. xlMeta, err := xl.readXLMetadata(bucket, object) if err != nil { - return nil, toObjectErr(err, bucket, object) + return toObjectErr(err, bucket, object) } // List all online disks. onlineDisks, _, err := xl.listOnlineDisks(bucket, object) if err != nil { - return nil, toObjectErr(err, bucket, object) - } - - // For zero byte files, return a null reader. - if xlMeta.Stat.Size == 0 { - return nullReadCloser{}, nil + return toObjectErr(err, bucket, object) } // Initialize a new erasure with online disks, with previous block distribution. @@ -59,44 +49,36 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read // Get part index offset. partIndex, partOffset, err := xlMeta.objectToPartOffset(startOffset) if err != nil { - return nil, toObjectErr(err, bucket, object) + return toObjectErr(err, bucket, object) } - - fileReader, fileWriter := io.Pipe() - - // Hold a read lock once more which can be released after the following go-routine ends. - // We hold RLock once more because the current function would return before the go routine below - // executes and hence releasing the read lock (because of defer'ed nsMutex.RUnlock() call). - nsMutex.RLock(bucket, object) - go func() { - defer nsMutex.RUnlock(bucket, object) - for ; partIndex < len(xlMeta.Parts); partIndex++ { - part := xlMeta.Parts[partIndex] - r, err := erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size) + totalLeft := length + for ; partIndex < len(xlMeta.Parts); partIndex++ { + part := xlMeta.Parts[partIndex] + totalPartSize := part.Size + for totalPartSize > 0 { + var buffer io.Reader + buffer, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size) if err != nil { - fileWriter.CloseWithError(toObjectErr(err, bucket, object)) - return + return err } - // Reset part offset to 0 to read rest of the parts from - // the beginning. - partOffset = 0 - if _, err = io.Copy(fileWriter, r); err != nil { - switch reader := r.(type) { - case *io.PipeReader: - reader.CloseWithError(err) - case io.ReadCloser: - reader.Close() + if int64(buffer.(*bytes.Buffer).Len()) > totalLeft { + if _, err := io.CopyN(writer, buffer, totalLeft); err != nil { + return err } - fileWriter.CloseWithError(toObjectErr(err, bucket, object)) - return + return nil } - // Close the readerCloser that reads multiparts of an object. - // Not closing leaks underlying file descriptors. - r.Close() + n, err := io.Copy(writer, buffer) + if err != nil { + return err + } + totalLeft -= n + totalPartSize -= n + partOffset += n } - fileWriter.Close() - }() - return fileReader, nil + // Reset part offset to 0 to read rest of the parts from the beginning. + partOffset = 0 + } + return nil } // GetObjectInfo - get object info. @@ -240,31 +222,29 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Initialize a new erasure with online disks and new distribution. erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) - fileWriter, err := erasure.CreateFile(minioMetaBucket, tempErasureObj) - if err != nil { - return "", toObjectErr(err, bucket, object) - } // Initialize md5 writer. md5Writer := md5.New() - // Instantiate a new multi writer. - multiWriter := io.MultiWriter(md5Writer, fileWriter) - - // Instantiate checksum hashers and create a multiwriter. - if size > 0 { - if _, err = io.CopyN(multiWriter, data, size); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } + buf := make([]byte, blockSize) + for { + var n int + n, err = io.ReadFull(data, buf) + if err == io.EOF { + break + } + if err != nil && err != io.ErrUnexpectedEOF { return "", toObjectErr(err, bucket, object) } - } else { - if _, err = io.Copy(multiWriter, data); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } - return "", toObjectErr(err, bucket, object) + // Update md5 writer. + md5Writer.Write(buf[:n]) + var m int64 + m, err = erasure.AppendFile(minioMetaBucket, tempErasureObj, buf[:n]) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, tempErasureObj) + } + if m != int64(len(buf[:n])) { + return "", toObjectErr(errUnexpected, bucket, object) } } @@ -292,21 +272,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. md5Hex := metadata["md5Sum"] if md5Hex != "" { if newMD5Hex != md5Hex { - if err = safeCloseAndRemove(fileWriter); err != nil { - return "", toObjectErr(err, bucket, object) - } return "", BadDigest{md5Hex, newMD5Hex} } } - err = fileWriter.Close() - if err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", toObjectErr(clErr, bucket, object) - } - return "", toObjectErr(err, bucket, object) - } - // Check if an object is present as one of the parent dir. if xl.parentDirIsObject(bucket, path.Dir(object)) { return "", toObjectErr(errFileAccessDenied, bucket, object) @@ -329,10 +298,13 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. xlMeta.Stat.ModTime = modTime xlMeta.Stat.Version = higherVersion xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size) - if err = xl.writeXLMetadata(bucket, object, xlMeta); err != nil { + if err = xl.writeXLMetadata(minioMetaBucket, path.Join(tmpMetaPrefix, bucket, object), xlMeta); err != nil { return "", toObjectErr(err, bucket, object) } - + rErr := xl.renameXLMetadata(minioMetaBucket, path.Join(tmpMetaPrefix, bucket, object), bucket, object) + if rErr != nil { + return "", toObjectErr(rErr, bucket, object) + } // Return md5sum, successfully wrote object. return newMD5Hex, nil }