XL: bring in new storage API. (#1780)

Fixes #1771
This commit is contained in:
Harshavardhana 2016-05-28 15:13:15 -07:00 committed by Harshavardhana
parent c87f259820
commit feb337098d
27 changed files with 634 additions and 993 deletions

66
erasure-appendfile.go Normal file
View File

@ -0,0 +1,66 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "sync"
// AppendFile - append data buffer at path.
func (e erasure) AppendFile(volume, path string, dataBuffer []byte) (n int64, err error) {
// Split the input buffer into data and parity blocks.
var blocks [][]byte
blocks, err = e.ReedSolomon.Split(dataBuffer)
if err != nil {
return 0, err
}
// Encode parity blocks using data blocks.
err = e.ReedSolomon.Encode(blocks)
if err != nil {
return 0, err
}
var wg = &sync.WaitGroup{}
var wErrs = make([]error, len(e.storageDisks))
// Write encoded data to quorum disks in parallel.
for index, disk := range e.storageDisks {
if disk == nil {
continue
}
wg.Add(1)
// Write encoded data in routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
// Pick the block from the distribution.
blockIndex := e.distribution[index] - 1
n, wErr := disk.AppendFile(volume, path, blocks[blockIndex])
if wErr != nil {
wErrs[index] = wErr
return
}
if n != int64(len(blocks[blockIndex])) {
wErrs[index] = errUnexpected
return
}
wErrs[index] = nil
}(index, disk)
}
// Wait for all the appends to finish.
wg.Wait()
return int64(len(dataBuffer)), nil
}

View File

@ -1,186 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"io"
"sync"
)
// cleanupCreateFileOps - cleans up all the temporary files and other
// temporary data upon any failure.
func (e erasure) cleanupCreateFileOps(volume, path string, writers []io.WriteCloser) {
// Close and remove temporary writers.
for _, writer := range writers {
if err := safeCloseAndRemove(writer); err != nil {
errorIf(err, "Failed to close writer.")
}
}
}
// WriteErasure reads predefined blocks, encodes them and writes to configured storage disks.
func (e erasure) writeErasure(volume, path string, reader *io.PipeReader, wcloser *waitCloser) {
// Release the block writer upon function return.
defer wcloser.release()
writers := make([]io.WriteCloser, len(e.storageDisks))
var wwg = &sync.WaitGroup{}
var errs = make([]error, len(e.storageDisks))
// Initialize all writers.
for index, disk := range e.storageDisks {
if disk == nil {
continue
}
wwg.Add(1)
go func(index int, disk StorageAPI) {
defer wwg.Done()
writer, err := disk.CreateFile(volume, path)
if err != nil {
errs[index] = err
return
}
writers[index] = writer
}(index, disk)
}
wwg.Wait() // Wait for all the create file to finish in parallel.
for _, err := range errs {
if err == nil {
continue
}
e.cleanupCreateFileOps(volume, path, writers)
reader.CloseWithError(err)
return
}
// Allocate 4MiB block size buffer for reading.
dataBuffer := make([]byte, erasureBlockSize)
for {
// Read up to allocated block size.
n, err := io.ReadFull(reader, dataBuffer)
if err != nil {
// Any unexpected errors, close the pipe reader with error.
if err != io.ErrUnexpectedEOF && err != io.EOF {
// Remove all temp writers.
e.cleanupCreateFileOps(volume, path, writers)
reader.CloseWithError(err)
return
}
}
// At EOF break out.
if err == io.EOF {
break
}
if n > 0 {
// Split the input buffer into data and parity blocks.
var dataBlocks [][]byte
dataBlocks, err = e.ReedSolomon.Split(dataBuffer[0:n])
if err != nil {
// Remove all temp writers.
e.cleanupCreateFileOps(volume, path, writers)
reader.CloseWithError(err)
return
}
// Encode parity blocks using data blocks.
err = e.ReedSolomon.Encode(dataBlocks)
if err != nil {
// Remove all temp writers upon error.
e.cleanupCreateFileOps(volume, path, writers)
reader.CloseWithError(err)
return
}
var wg = &sync.WaitGroup{}
var wErrs = make([]error, len(writers))
// Write encoded data to quorum disks in parallel.
for index, writer := range writers {
if writer == nil {
continue
}
wg.Add(1)
// Write encoded data in routine.
go func(index int, writer io.Writer) {
defer wg.Done()
// Pick the block from the distribution.
encodedData := dataBlocks[e.distribution[index]-1]
_, wErr := writers[index].Write(encodedData)
if wErr != nil {
wErrs[index] = wErr
return
}
wErrs[index] = nil
}(index, writer)
}
wg.Wait()
// Cleanup and return on first non-nil error.
for _, wErr := range wErrs {
if wErr == nil {
continue
}
// Remove all temp writers upon error.
e.cleanupCreateFileOps(volume, path, writers)
reader.CloseWithError(wErr)
return
}
}
}
// Close all writers and metadata writers in routines.
for _, writer := range writers {
if writer == nil {
continue
}
// Safely wrote, now rename to its actual location.
if err := writer.Close(); err != nil {
// Remove all temp writers upon error.
e.cleanupCreateFileOps(volume, path, writers)
reader.CloseWithError(err)
return
}
}
// Close the pipe reader and return.
reader.Close()
return
}
// CreateFile - create a file.
func (e erasure) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
// Input validation.
if !isValidVolname(volume) {
return nil, errInvalidArgument
}
if !isValidPath(path) {
return nil, errInvalidArgument
}
// Initialize pipe for data pipe line.
pipeReader, pipeWriter := io.Pipe()
// Initialize a new wait closer, implements both Write and Close.
wcloser := newWaitCloser(pipeWriter)
// Start erasure encoding in routine, reading data block by block from pipeReader.
go e.writeErasure(volume, path, pipeReader, wcloser)
// Return the writer, caller should start writing to this.
return wcloser, nil
}

View File

@ -17,174 +17,113 @@
package main
import (
"bytes"
"errors"
"io"
"sync"
)
// ReadFile - decoded erasure coded file.
func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int64) (io.ReadCloser, error) {
// Input validation.
if !isValidVolname(volume) {
return nil, errInvalidArgument
}
if !isValidPath(path) {
return nil, errInvalidArgument
}
var rwg = &sync.WaitGroup{}
var errs = make([]error, len(e.storageDisks))
readers := make([]io.ReadCloser, len(e.storageDisks))
for index, disk := range e.storageDisks {
if disk == nil {
continue
func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int64) (io.Reader, error) {
var totalLeft = totalSize
var bufWriter = new(bytes.Buffer)
for totalLeft > 0 {
// Figure out the right blockSize as it was encoded before.
var curBlockSize int64
if erasureBlockSize < totalLeft {
curBlockSize = erasureBlockSize
} else {
curBlockSize = totalLeft
}
rwg.Add(1)
go func(index int, disk StorageAPI) {
defer rwg.Done()
offset := int64(0)
reader, err := disk.ReadFile(volume, path, offset)
if err == nil {
readers[index] = reader
return
// Calculate the current encoded block size.
curEncBlockSize := getEncodedBlockLen(curBlockSize, e.DataBlocks)
// Allocate encoded blocks up to storage disks.
enBlocks := make([][]byte, len(e.storageDisks))
// Counter to keep success data blocks.
var successDataBlocksCount = 0
var noReconstruct bool // Set for no reconstruction.
// Read from all the disks.
for index, disk := range e.storageDisks {
blockIndex := e.distribution[index] - 1
// Initialize shard slice and fill the data from each parts.
enBlocks[blockIndex] = make([]byte, curEncBlockSize)
if disk == nil {
enBlocks[blockIndex] = nil
} else {
var offset = int64(0)
// Read the necessary blocks.
_, err := disk.ReadFile(volume, path, offset, enBlocks[blockIndex])
if err != nil {
enBlocks[blockIndex] = nil
}
}
errs[index] = err
}(index, disk)
}
// Verify if we have successfully read all the data blocks.
if blockIndex < e.DataBlocks && enBlocks[blockIndex] != nil {
successDataBlocksCount++
// Set when we have all the data blocks and no
// reconstruction is needed, so that we can avoid
// erasure reconstruction.
noReconstruct = successDataBlocksCount == e.DataBlocks
if noReconstruct {
// Break out we have read all the data blocks.
break
}
}
}
// Wait for all readers.
rwg.Wait()
// Check blocks if they are all zero in length, we have corruption return error.
if checkBlockSize(enBlocks) == 0 {
return nil, errDataCorrupt
}
// For any errors in reader, we should just error out.
for _, err := range errs {
// Verify if reconstruction is needed, proceed with reconstruction.
if !noReconstruct {
err := e.ReedSolomon.Reconstruct(enBlocks)
if err != nil {
return nil, err
}
// Verify reconstructed blocks (parity).
ok, err := e.ReedSolomon.Verify(enBlocks)
if err != nil {
return nil, err
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
return nil, err
}
}
// Get data blocks from encoded blocks.
dataBlocks := getDataBlocks(enBlocks, e.DataBlocks, int(curBlockSize))
// Verify if the offset is right for the block, if not move to
// the next block.
if startOffset > 0 {
startOffset = startOffset - int64(len(dataBlocks))
// Start offset is greater than or equal to zero, skip the dataBlocks.
if startOffset >= 0 {
totalLeft = totalLeft - erasureBlockSize
continue
}
// Now get back the remaining offset if startOffset is negative.
startOffset = startOffset + int64(len(dataBlocks))
}
// Copy data blocks.
_, err := bufWriter.Write(dataBlocks[startOffset:])
if err != nil {
return nil, err
}
// Reset dataBlocks to relenquish memory.
dataBlocks = nil
// Save what's left after reading erasureBlockSize.
totalLeft = totalLeft - erasureBlockSize
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
go func() {
var totalLeft = totalSize
// Read until EOF.
for totalLeft > 0 {
// Figure out the right blockSize as it was encoded before.
var curBlockSize int64
if erasureBlockSize < totalLeft {
curBlockSize = erasureBlockSize
} else {
curBlockSize = totalLeft
}
// Calculate the current encoded block size.
curEncBlockSize := getEncodedBlockLen(curBlockSize, e.DataBlocks)
// Allocate encoded blocks up to storage disks.
enBlocks := make([][]byte, len(e.storageDisks))
// Counter to keep success data blocks.
var successDataBlocksCount = 0
var noReconstruct bool // Set for no reconstruction.
// Read all the readers.
for index, reader := range readers {
blockIndex := e.distribution[index] - 1
// Initialize shard slice and fill the data from each parts.
enBlocks[blockIndex] = make([]byte, curEncBlockSize)
if reader == nil {
enBlocks[blockIndex] = nil
continue
}
// Close the reader when routine returns.
defer reader.Close()
// Read the necessary blocks.
_, rErr := io.ReadFull(reader, enBlocks[blockIndex])
if rErr != nil && rErr != io.ErrUnexpectedEOF {
enBlocks[blockIndex] = nil
}
// Verify if we have successfully all the data blocks.
if blockIndex < e.DataBlocks {
successDataBlocksCount++
// Set when we have all the data blocks and no
// reconstruction is needed, so that we can avoid
// erasure reconstruction.
noReconstruct = successDataBlocksCount == e.DataBlocks
if noReconstruct {
// Break out we have read all the data blocks.
break
}
}
}
// Check blocks if they are all zero in length, we have
// corruption return error.
if checkBlockSize(enBlocks) == 0 {
pipeWriter.CloseWithError(errDataCorrupt)
return
}
// Verify if reconstruction is needed, proceed with reconstruction.
if !noReconstruct {
err := e.ReedSolomon.Reconstruct(enBlocks)
if err != nil {
pipeWriter.CloseWithError(err)
return
}
// Verify reconstructed blocks (parity).
ok, err := e.ReedSolomon.Verify(enBlocks)
if err != nil {
pipeWriter.CloseWithError(err)
return
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
pipeWriter.CloseWithError(err)
return
}
}
// Get data blocks from encoded blocks.
dataBlocks := getDataBlocks(enBlocks, e.DataBlocks, int(curBlockSize))
// Verify if the offset is right for the block, if not move to the next block.
if startOffset > 0 {
startOffset = startOffset - int64(len(dataBlocks))
// Start offset is greater than or equal to zero, skip the dataBlocks.
if startOffset >= 0 {
totalLeft = totalLeft - erasureBlockSize
continue
}
// Now get back the remaining offset if startOffset is negative.
startOffset = startOffset + int64(len(dataBlocks))
}
// Write safely the necessary blocks to the pipe.
_, err := pipeWriter.Write(dataBlocks[int(startOffset):])
if err != nil {
pipeWriter.CloseWithError(err)
return
}
// Reset dataBlocks to relenquish memory.
dataBlocks = nil
// Reset offset to '0' to read rest of the blocks.
startOffset = int64(0)
// Save what's left after reading erasureBlockSize.
totalLeft = totalLeft - erasureBlockSize
}
// Cleanly end the pipe after a successful decoding.
pipeWriter.Close()
}()
// Return the pipe for the top level caller to start reading.
return pipeReader, nil
return bufWriter, nil
}

View File

@ -39,7 +39,7 @@ func checkBlockSize(blocks [][]byte) int {
// calculate the blockSize based on input length and total number of
// data blocks.
func getEncodedBlockLen(inputLen int64, dataBlocks int) (curBlockSize int64) {
curBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
return curBlockSize
func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64) {
curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
return curEncBlockSize
}

View File

@ -20,7 +20,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"github.com/skyrings/skyring-common/tools/uuid"
@ -116,8 +115,10 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1)
// loadFormat - load format from disk.
func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
buffer := make([]byte, blockSize)
offset := int64(0)
r, err := disk.ReadFile(minioMetaBucket, formatConfigFile, offset)
var n int64
n, err = disk.ReadFile(minioMetaBucket, formatConfigFile, offset, buffer)
if err != nil {
// 'file not found' and 'volume not found' as
// same. 'volume not found' usually means its a fresh disk.
@ -136,15 +137,11 @@ func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
}
return nil, err
}
decoder := json.NewDecoder(r)
format = &formatConfigV1{}
err = decoder.Decode(&format)
err = json.Unmarshal(buffer[:n], format)
if err != nil {
return nil, err
}
if err = r.Close(); err != nil {
return nil, err
}
return format, nil
}
@ -215,7 +212,6 @@ func checkFormatXL(formatConfigs []*formatConfigV1) error {
func initFormatXL(storageDisks []StorageAPI) (err error) {
var (
jbod = make([]string, len(storageDisks))
formatWriters = make([]io.WriteCloser, len(storageDisks))
formats = make([]*formatConfigV1, len(storageDisks))
saveFormatErrCnt = 0
)
@ -230,16 +226,6 @@ func initFormatXL(storageDisks []StorageAPI) (err error) {
return errWriteQuorum
}
}
var w io.WriteCloser
w, err = disk.CreateFile(minioMetaBucket, formatConfigFile)
if err != nil {
saveFormatErrCnt++
// Check for write quorum.
if saveFormatErrCnt <= len(storageDisks)-(len(storageDisks)/2+3) {
continue
}
return err
}
var u *uuid.UUID
u, err = uuid.New()
if err != nil {
@ -250,7 +236,6 @@ func initFormatXL(storageDisks []StorageAPI) (err error) {
}
return err
}
formatWriters[index] = w
formats[index] = &formatConfigV1{
Version: "1",
Format: "xl",
@ -261,24 +246,19 @@ func initFormatXL(storageDisks []StorageAPI) (err error) {
}
jbod[index] = formats[index].XL.Disk
}
for index, w := range formatWriters {
if formats[index] == nil {
continue
}
for index, disk := range storageDisks {
formats[index].XL.JBOD = jbod
encoder := json.NewEncoder(w)
err = encoder.Encode(&formats[index])
formatBytes, err := json.Marshal(formats[index])
if err != nil {
return err
}
}
for _, w := range formatWriters {
if w == nil {
continue
}
if err = w.Close(); err != nil {
n, err := disk.AppendFile(minioMetaBucket, formatConfigFile, formatBytes)
if err != nil {
return err
}
if n != int64(len(formatBytes)) {
return errUnexpected
}
}
return nil
}

View File

@ -1,9 +1,7 @@
package main
import (
"bytes"
"encoding/json"
"io"
"path"
"sort"
)
@ -22,28 +20,6 @@ type fsMetaV1 struct {
Parts []objectPartInfo `json:"parts,omitempty"`
}
// ReadFrom - read from implements io.ReaderFrom interface for
// unmarshalling fsMetaV1.
func (m *fsMetaV1) ReadFrom(reader io.Reader) (n int64, err error) {
var buffer bytes.Buffer
n, err = buffer.ReadFrom(reader)
if err != nil {
return 0, err
}
err = json.Unmarshal(buffer.Bytes(), m)
return n, err
}
// WriteTo - write to implements io.WriterTo interface for marshalling fsMetaV1.
func (m fsMetaV1) WriteTo(writer io.Writer) (n int64, err error) {
metadataBytes, err := json.Marshal(m)
if err != nil {
return 0, err
}
p, err := writer.Write(metadataBytes)
return int64(p), err
}
// ObjectPartIndex - returns the index of matching object part number.
func (m fsMetaV1) ObjectPartIndex(partNumber int) (partIndex int) {
for i, part := range m.Parts {
@ -81,12 +57,12 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
// readFSMetadata - returns the object metadata `fs.json` content.
func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) {
r, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0))
buffer := make([]byte, blockSize)
n, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0), buffer)
if err != nil {
return fsMetaV1{}, err
}
defer r.Close()
_, err = fsMeta.ReadFrom(r)
err = json.Unmarshal(buffer[:n], &fsMeta)
if err != nil {
return fsMetaV1{}, err
}
@ -104,22 +80,16 @@ func newFSMetaV1() (fsMeta fsMetaV1) {
// writeFSMetadata - writes `fs.json` metadata.
func (fs fsObjects) writeFSMetadata(bucket, prefix string, fsMeta fsMetaV1) error {
w, err := fs.storage.CreateFile(bucket, path.Join(prefix, fsMetaJSONFile))
metadataBytes, err := json.Marshal(fsMeta)
if err != nil {
return err
}
_, err = fsMeta.WriteTo(w)
n, err := fs.storage.AppendFile(bucket, path.Join(prefix, fsMetaJSONFile), metadataBytes)
if err != nil {
if mErr := safeCloseAndRemove(w); mErr != nil {
return mErr
}
return err
}
if err = w.Close(); err != nil {
if mErr := safeCloseAndRemove(w); mErr != nil {
return mErr
}
return err
if n != int64(len(metadataBytes)) {
return errUnexpected
}
return nil
}

View File

@ -21,7 +21,6 @@ import (
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"path"
"strconv"
"strings"
@ -302,61 +301,36 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s
partSuffix := fmt.Sprintf("object%d", partID)
tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix)
fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tmpPartPath)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, err = io.CopyN(multiWriter, data, size); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
var buf = make([]byte, blockSize)
for {
n, err := io.ReadFull(data, buf)
if err == io.EOF {
break
}
if err != nil && err != io.ErrUnexpectedEOF {
return "", toObjectErr(err, bucket, object)
}
// Reader shouldn't have more data what mentioned in size argument.
// reading one more byte from the reader to validate it.
// expected to fail, success validates existence of more data in the reader.
if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", UnExpectedDataSize{Size: int(size)}
}
} else {
var n int64
if n, err = io.Copy(multiWriter, data); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
// Update md5 writer.
md5Writer.Write(buf[:n])
m, err := fs.storage.AppendFile(minioMetaBucket, tmpPartPath, buf[:n])
if err != nil {
return "", toObjectErr(err, bucket, object)
}
size = n
if m != int64(len(buf[:n])) {
return "", toObjectErr(errUnexpected, bucket, object)
}
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" {
if newMD5Hex != md5Hex {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", BadDigest{md5Hex, newMD5Hex}
}
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", err
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath)
@ -373,8 +347,17 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s
}
return "", toObjectErr(err, minioMetaBucket, partPath)
}
if err = fs.writeFSMetadata(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID), fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID))
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID)
if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
err = fs.storage.RenameFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile), minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile))
if err != nil {
if dErr := fs.storage.DeleteFile(minioMetaBucket, path.Join(tempUploadIDPath, fsMetaJSONFile)); dErr != nil {
return "", toObjectErr(dErr, minioMetaBucket, tempUploadIDPath)
}
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
return newMD5Hex, nil
}
@ -493,10 +476,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID, "object1")
fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tempObj)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
var buffer = make([]byte, blockSize)
// Loop through all parts, validate them and then commit to disk.
for i, part := range parts {
@ -509,45 +489,30 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
if err == errFileNotFound {
return "", InvalidPart{}
}
return "", err
return "", toObjectErr(err, minioMetaBucket, multipartPartFile)
}
// All parts except the last part has to be atleast 5MB.
if (i < len(parts)-1) && !isMinAllowedPartSize(fi.Size) {
return "", PartTooSmall{}
}
var fileReader io.ReadCloser
fileReader, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, 0)
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
offset := int64(0)
totalLeft := fi.Size
for totalLeft > 0 {
var n int64
n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buffer)
if err != nil {
if err == errFileNotFound {
return "", InvalidPart{}
}
return "", toObjectErr(err, minioMetaBucket, multipartPartFile)
}
if err == errFileNotFound {
return "", InvalidPart{}
n, err = fs.storage.AppendFile(minioMetaBucket, tempObj, buffer[:n])
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tempObj)
}
return "", err
offset += n
totalLeft -= n
}
_, err = io.Copy(fileWriter, fileReader)
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
}
err = fileReader.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
}
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
}
// Rename the file back to original location, if not delete the temporary object.

View File

@ -21,6 +21,7 @@ import (
"encoding/hex"
"io"
"os"
"path"
"path/filepath"
"sort"
"strings"
@ -146,20 +147,37 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
/// Object Operations
// GetObject - get an object.
func (fs fsObjects) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, error) {
func (fs fsObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return nil, BucketNameInvalid{Bucket: bucket}
return BucketNameInvalid{Bucket: bucket}
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return nil, ObjectNameInvalid{Bucket: bucket, Object: object}
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
fileReader, err := fs.storage.ReadFile(bucket, object, startOffset)
if err != nil {
return nil, toObjectErr(err, bucket, object)
var totalLeft = length
for totalLeft > 0 {
// Figure out the right blockSize as it was encoded before.
var curBlockSize int64
if blockSize < totalLeft {
curBlockSize = blockSize
} else {
curBlockSize = totalLeft
}
buf := make([]byte, curBlockSize)
n, err := fs.storage.ReadFile(bucket, object, startOffset, buf)
if err != nil {
return toObjectErr(err, bucket, object)
}
_, err = writer.Write(buf[:n])
if err != nil {
return toObjectErr(err, bucket, object)
}
totalLeft -= n
startOffset += n
}
return fileReader, nil
return nil
}
// GetObjectInfo - get object info.
@ -194,6 +212,10 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
}, nil
}
const (
blockSize = 4 * 1024 * 1024 // 4MiB.
)
// PutObject - create an object.
func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) {
// Verify if bucket is valid.
@ -207,31 +229,38 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
}
}
fileWriter, err := fs.storage.CreateFile(bucket, object)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Temporary object.
tempObj := path.Join(tmpMetaPrefix, bucket, object)
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, err = io.CopyN(multiWriter, data, size); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
if size == 0 {
// For size 0 we write a 0byte file.
_, err := fs.storage.AppendFile(minioMetaBucket, tempObj, []byte(""))
if err != nil {
return "", toObjectErr(err, bucket, object)
}
} else {
if _, err = io.Copy(multiWriter, data); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
// Allocate buffer.
buf := make([]byte, blockSize)
for {
n, rErr := data.Read(buf)
if rErr == io.EOF {
break
}
if rErr != nil {
return "", toObjectErr(rErr, bucket, object)
}
// Update md5 writer.
md5Writer.Write(buf[:n])
m, wErr := fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n])
if wErr != nil {
return "", toObjectErr(wErr, bucket, object)
}
if m != int64(len(buf[:n])) {
return "", toObjectErr(errUnexpected, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
}
@ -243,18 +272,13 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
}
if md5Hex != "" {
if newMD5Hex != md5Hex {
if err = safeCloseAndRemove(fileWriter); err != nil {
return "", err
}
return "", BadDigest{md5Hex, newMD5Hex}
}
}
err = fileWriter.Close()
err := fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", clErr
}
return "", err
return "", toObjectErr(err, bucket, object)
}
// Return md5sum, successfully wrote object.

View File

@ -20,7 +20,6 @@ import (
"bytes"
"crypto/md5"
"encoding/hex"
"io"
"io/ioutil"
"os"
"strconv"
@ -111,7 +110,7 @@ func testGetObjectInfo(obj ObjectLayer, instanceType string, t *testing.T) {
}
}
func BenchmarkGetObject(b *testing.B) {
func BenchmarkGetObjectFS(b *testing.B) {
// Make a temporary directory to use as the obj.
directory, err := ioutil.TempDir("", "minio-benchmark-getobject")
if err != nil {
@ -146,16 +145,12 @@ func BenchmarkGetObject(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
var buffer = new(bytes.Buffer)
r, err := obj.GetObject("bucket", "object"+strconv.Itoa(i%10), 0)
err = obj.GetObject("bucket", "object"+strconv.Itoa(i%10), 0, int64(len([]byte(text))), buffer)
if err != nil {
b.Error(err)
}
if _, err := io.Copy(buffer, r); err != nil {
b.Error(err)
}
if buffer.Len() != len(text) {
b.Errorf("GetObject returned incorrect length %d (should be %d)\n", buffer.Len(), len(text))
}
r.Close()
}
}

View File

@ -180,11 +180,11 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t *testing
fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated "+"d41d8cd98f00b204e9800998ecf8427e")},
// Test case - 12.
// Input with size more than the size of actual data inside the reader.
{bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") + 1), false, "", fmt.Errorf("%s", "EOF")},
{bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") + 1), false, "", fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated e2fc714c4727ee9395f324cd2e7f331f")},
// Test case - 13.
// Input with size less than the size of actual data inside the reader.
{bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") - 1), false, "",
fmt.Errorf("%s", "Contains more data than specified size of 3 bytes.")},
fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated e2fc714c4727ee9395f324cd2e7f331f")},
// Test case - 14-17.
// Validating for success cases.
{bucket, object, uploadID, 1, "abcd", "e2fc714c4727ee9395f324cd2e7f331f", int64(len("abcd")), true, "", nil},

View File

@ -124,38 +124,22 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
return
}
// Get the object.
startOffset := hrange.start
readCloser, err := api.ObjectAPI.GetObject(bucket, object, startOffset)
if err != nil {
errorIf(err, "Unable to read object.")
apiErr := toAPIErrorCode(err)
if apiErr == ErrNoSuchKey {
apiErr = errAllowableObjectNotFound(bucket, r)
}
writeErrorResponse(w, r, apiErr, r.URL.Path)
return
}
defer readCloser.Close()
// Set standard object headers.
setObjectHeaders(w, objInfo, hrange)
// Set any additional requested response headers.
setGetRespHeaders(w, r.URL.Query())
if hrange.length > 0 {
if _, err := io.CopyN(w, readCloser, hrange.length); err != nil {
errorIf(err, "Writing to client failed.")
// Do not send error response here, since client could have died.
return
}
} else {
if _, err := io.Copy(w, readCloser); err != nil {
errorIf(err, "Writing to client failed.")
// Do not send error response here, since client could have died.
return
}
// Get the object.
startOffset := hrange.start
length := hrange.length
if length == 0 {
length = objInfo.Size - startOffset
}
if err := api.ObjectAPI.GetObject(bucket, object, startOffset, length, w); err != nil {
errorIf(err, "Writing to client failed.")
// Do not send error response here, client would have already died.
return
}
}
@ -393,14 +377,19 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
return
}
startOffset := int64(0) // Read the whole file.
// Get the object.
readCloser, err := api.ObjectAPI.GetObject(sourceBucket, sourceObject, startOffset)
if err != nil {
errorIf(err, "Unable to read an object.")
writeErrorResponse(w, r, toAPIErrorCode(err), objectSource)
return
}
pipeReader, pipeWriter := io.Pipe()
go func() {
startOffset := int64(0) // Read the whole file.
// Get the object.
gErr := api.ObjectAPI.GetObject(sourceBucket, sourceObject, startOffset, objInfo.Size, pipeWriter)
if gErr != nil {
errorIf(gErr, "Unable to read an object.")
pipeWriter.CloseWithError(gErr)
return
}
pipeWriter.Close() // Close.
}()
// Size of object.
size := objInfo.Size
@ -413,7 +402,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// same md5sum as the source.
// Create the object.
md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata)
md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, pipeReader, metadata)
if err != nil {
errorIf(err, "Unable to create an object.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
@ -434,7 +423,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// write success response.
writeSuccessResponse(w, encodedSuccessResponse)
// Explicitly close the reader, to avoid fd leaks.
readCloser.Close()
pipeReader.Close()
}
// checkCopySource implements x-amz-copy-source-if-modified-since and

View File

@ -31,7 +31,7 @@ type ObjectLayer interface {
ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
// Object operations.
GetObject(bucket, object string, startOffset int64) (reader io.ReadCloser, err error)
GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error)
GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error)
PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string) (md5 string, err error)
DeleteObject(bucket, object string) error

View File

@ -19,15 +19,12 @@ package main
import (
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"path"
"regexp"
"strings"
"unicode/utf8"
"github.com/minio/minio/pkg/safe"
"github.com/skyrings/skyring-common/tools/uuid"
)
@ -160,18 +157,3 @@ type byBucketName []BucketInfo
func (d byBucketName) Len() int { return len(d) }
func (d byBucketName) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name }
// safeCloseAndRemove - safely closes and removes underlying temporary
// file writer if possible.
func safeCloseAndRemove(writer io.WriteCloser) error {
// If writer is a safe file, Attempt to close and remove.
safeWriter, ok := writer.(*safe.File)
if ok {
return safeWriter.Abort()
}
wCloser, ok := writer.(*waitCloser)
if ok {
return wCloser.CloseWithError(errors.New("Close and error out."))
}
return nil
}

View File

@ -20,7 +20,6 @@ import (
"bytes"
"crypto/md5"
"encoding/hex"
"io"
"math/rand"
"strconv"
@ -133,24 +132,21 @@ func testMultipleObjectCreation(c *check.C, create func() ObjectLayer) {
objects[key] = []byte(randomString)
metadata := make(map[string]string)
metadata["md5Sum"] = expectedMD5Sumhex
md5Sum, err := obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata)
var md5Sum string
md5Sum, err = obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata)
c.Assert(err, check.IsNil)
c.Assert(md5Sum, check.Equals, expectedMD5Sumhex)
}
for key, value := range objects {
var byteBuffer bytes.Buffer
r, err := obj.GetObject("bucket", key, 0)
err = obj.GetObject("bucket", key, 0, int64(len(value)), &byteBuffer)
c.Assert(err, check.IsNil)
_, e := io.Copy(&byteBuffer, r)
c.Assert(e, check.IsNil)
c.Assert(byteBuffer.Bytes(), check.DeepEquals, value)
c.Assert(r.Close(), check.IsNil)
objInfo, err := obj.GetObjectInfo("bucket", key)
c.Assert(err, check.IsNil)
c.Assert(objInfo.Size, check.Equals, int64(len(value)))
r.Close()
}
}
@ -267,16 +263,14 @@ func testObjectOverwriteWorks(c *check.C, create func() ObjectLayer) {
_, err = obj.PutObject("bucket", "object", int64(len("The list of parts was not in ascending order. The parts list must be specified in order by part number.")), bytes.NewBufferString("The list of parts was not in ascending order. The parts list must be specified in order by part number."), nil)
c.Assert(err, check.IsNil)
_, err = obj.PutObject("bucket", "object", int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")), bytes.NewBufferString("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."), nil)
length := int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."))
_, err = obj.PutObject("bucket", "object", length, bytes.NewBufferString("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."), nil)
c.Assert(err, check.IsNil)
var bytesBuffer bytes.Buffer
r, err := obj.GetObject("bucket", "object", 0)
err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer)
c.Assert(err, check.IsNil)
_, e := io.Copy(&bytesBuffer, r)
c.Assert(e, check.IsNil)
c.Assert(string(bytesBuffer.Bytes()), check.Equals, "The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")
c.Assert(r.Close(), check.IsNil)
}
// Tests validate that bucket operation on non-existent bucket fails.
@ -303,17 +297,14 @@ func testPutObjectInSubdir(c *check.C, create func() ObjectLayer) {
err := obj.MakeBucket("bucket")
c.Assert(err, check.IsNil)
_, err = obj.PutObject("bucket", "dir1/dir2/object", int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")), bytes.NewBufferString("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."), nil)
length := int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."))
_, err = obj.PutObject("bucket", "dir1/dir2/object", length, bytes.NewBufferString("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."), nil)
c.Assert(err, check.IsNil)
var bytesBuffer bytes.Buffer
r, err := obj.GetObject("bucket", "dir1/dir2/object", 0)
err = obj.GetObject("bucket", "dir1/dir2/object", 0, length, &bytesBuffer)
c.Assert(err, check.IsNil)
n, e := io.Copy(&bytesBuffer, r)
c.Assert(e, check.IsNil)
c.Assert(len(bytesBuffer.Bytes()), check.Equals, len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed."))
c.Assert(int64(len(bytesBuffer.Bytes())), check.Equals, int64(n))
c.Assert(r.Close(), check.IsNil)
}
// Tests validate ListBuckets.
@ -384,7 +375,8 @@ func testNonExistantObjectInBucket(c *check.C, create func() ObjectLayer) {
err := obj.MakeBucket("bucket")
c.Assert(err, check.IsNil)
_, err = obj.GetObject("bucket", "dir1", 0)
var bytesBuffer bytes.Buffer
err = obj.GetObject("bucket", "dir1", 0, 10, &bytesBuffer)
c.Assert(err, check.Not(check.IsNil))
switch err := err.(type) {
case ObjectNotFound:
@ -403,7 +395,8 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectLayer
_, err = obj.PutObject("bucket", "dir1/dir3/object", int64(len("The specified multipart upload does not exist. The upload ID might be invalid, or the multipart upload might have been aborted or completed.")), bytes.NewBufferString("One or more of the specified parts could not be found. The part might not have been uploaded, or the specified entity tag might not have matched the part's entity tag."), nil)
c.Assert(err, check.IsNil)
_, err = obj.GetObject("bucket", "dir1", 0)
var bytesBuffer bytes.Buffer
err = obj.GetObject("bucket", "dir1", 0, 10, &bytesBuffer)
switch err := err.(type) {
case ObjectNotFound:
c.Assert(err.Bucket, check.Equals, "bucket")
@ -413,7 +406,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() ObjectLayer
c.Assert(err, check.Equals, "ObjectNotFound")
}
_, err = obj.GetObject("bucket", "dir1/", 0)
err = obj.GetObject("bucket", "dir1/", 0, 10, &bytesBuffer)
switch err := err.(type) {
case ObjectNameInvalid:
c.Assert(err.Bucket, check.Equals, "bucket")

125
posix.go
View File

@ -17,23 +17,24 @@
package main
import (
"bytes"
"io"
"os"
slashpath "path"
"path/filepath"
"runtime"
"strings"
"syscall"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/safe"
)
const (
fsMinSpacePercent = 5
)
// fsStorage - implements StorageAPI interface.
type fsStorage struct {
// posix - implements StorageAPI interface.
type posix struct {
diskPath string
minFreeDisk int64
}
@ -90,7 +91,7 @@ func newPosix(diskPath string) (StorageAPI, error) {
if diskPath == "" {
return nil, errInvalidArgument
}
fs := fsStorage{
fs := posix{
diskPath: diskPath,
minFreeDisk: fsMinSpacePercent, // Minimum 5% disk should be free.
}
@ -169,7 +170,7 @@ func listVols(dirPath string) ([]VolInfo, error) {
// corresponding valid volume names on the backend in a platform
// compatible way for all operating systems. If volume is not found
// an error is generated.
func (s fsStorage) getVolDir(volume string) (string, error) {
func (s posix) getVolDir(volume string) (string, error) {
if !isValidVolname(volume) {
return "", errInvalidArgument
}
@ -181,7 +182,7 @@ func (s fsStorage) getVolDir(volume string) (string, error) {
}
// Make a volume entry.
func (s fsStorage) MakeVol(volume string) (err error) {
func (s posix) MakeVol(volume string) (err error) {
// Validate if disk is free.
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return err
@ -201,7 +202,7 @@ func (s fsStorage) MakeVol(volume string) (err error) {
}
// ListVols - list volumes.
func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) {
func (s posix) ListVols() (volsInfo []VolInfo, err error) {
volsInfo, err = listVols(s.diskPath)
if err != nil {
return nil, err
@ -217,7 +218,7 @@ func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) {
}
// StatVol - get volume info.
func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
func (s posix) StatVol(volume string) (volInfo VolInfo, err error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -242,7 +243,7 @@ func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
}
// DeleteVol - delete a volume.
func (s fsStorage) DeleteVol(volume string) error {
func (s posix) DeleteVol(volume string) error {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -267,7 +268,7 @@ func (s fsStorage) DeleteVol(volume string) error {
// ListDir - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing "/".
func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) {
func (s posix) ListDir(volume, dirPath string) ([]string, error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -284,93 +285,128 @@ func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) {
return readDir(pathJoin(volumeDir, dirPath))
}
// ReadFile - read a file at a given offset.
func (s fsStorage) ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) {
// ReadFile reads exactly len(buf) bytes into buf. It returns the
// number of bytes copied. The error is EOF only if no bytes were
// read. On return, n == len(buf) if and only if err == nil. n == 0
// for io.EOF. Additionally ReadFile also starts reading from an
// offset.
func (s posix) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) {
nsMutex.RLock(volume, path)
defer nsMutex.RUnlock(volume, path)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
return 0, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
return 0, errVolumeNotFound
}
return nil, err
return 0, err
}
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
return nil, err
return 0, err
}
file, err := os.Open(filePath)
if err != nil {
if os.IsNotExist(err) {
return nil, errFileNotFound
return 0, errFileNotFound
} else if os.IsPermission(err) {
return nil, errFileAccessDenied
return 0, errFileAccessDenied
} else if strings.Contains(err.Error(), "not a directory") {
return nil, errFileNotFound
return 0, errFileNotFound
}
return nil, err
return 0, err
}
st, err := file.Stat()
if err != nil {
return nil, err
return 0, err
}
// Verify if its not a regular file, since subsequent Seek is undefined.
if !st.Mode().IsRegular() {
return nil, errFileNotFound
return 0, errFileNotFound
}
// Seek to requested offset.
_, err = file.Seek(offset, os.SEEK_SET)
if err != nil {
return nil, err
return 0, err
}
return file, nil
// Close the reader.
defer file.Close()
// Read file.
m, err := io.ReadFull(file, buf)
// Error unexpected is valid, set this back to nil.
if err == io.ErrUnexpectedEOF {
err = nil
}
// Success.
return int64(m), err
}
// CreateFile - create a file at path.
func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
// AppendFile - append a byte array at path, if file doesn't exist at
// path this call explicitly creates it.
func (s posix) AppendFile(volume, path string, buf []byte) (n int64, err error) {
nsMutex.Lock(volume, path)
defer nsMutex.Unlock(volume, path)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
return 0, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
return 0, errVolumeNotFound
}
return nil, err
return 0, err
}
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return nil, err
return 0, err
}
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
return nil, err
return 0, err
}
// Verify if the file already exists and is not of regular type.
var st os.FileInfo
if st, err = os.Stat(filePath); err == nil {
if st.IsDir() {
return nil, errIsNotRegular
return 0, errIsNotRegular
}
}
w, err := safe.CreateFile(filePath)
// Create top level directories if they don't exist.
if err = os.MkdirAll(filepath.Dir(filePath), 0700); err != nil {
return 0, err
}
w, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
// File path cannot be verified since one of the parents is a file.
if strings.Contains(err.Error(), "not a directory") {
return nil, errFileAccessDenied
return 0, errFileAccessDenied
}
return nil, err
return 0, err
}
return w, nil
// Close upon return.
defer w.Close()
// Return io.Copy
return io.Copy(w, bytes.NewReader(buf))
}
// StatFile - get file info.
func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) {
func (s posix) StatFile(volume, path string) (file FileInfo, err error) {
nsMutex.RLock(volume, path)
defer nsMutex.RUnlock(volume, path)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return FileInfo{}, err
@ -447,7 +483,10 @@ func deleteFile(basePath, deletePath string) error {
}
// DeleteFile - delete a file at path.
func (s fsStorage) DeleteFile(volume, path string) error {
func (s posix) DeleteFile(volume, path string) error {
nsMutex.Lock(volume, path)
defer nsMutex.Unlock(volume, path)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
@ -472,8 +511,14 @@ func (s fsStorage) DeleteFile(volume, path string) error {
return deleteFile(volumeDir, filePath)
}
// RenameFile - rename file.
func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
// RenameFile - rename source path to destination path atomically.
func (s posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
nsMutex.Lock(srcVolume, srcPath)
defer nsMutex.Unlock(srcVolume, srcPath)
nsMutex.Lock(dstVolume, dstPath)
defer nsMutex.Unlock(dstVolume, dstPath)
srcVolumeDir, err := s.getVolDir(srcVolume)
if err != nil {
return err

View File

@ -17,14 +17,8 @@
package main
import (
"errors"
"fmt"
"io"
"net/http"
"net/rpc"
"net/url"
urlpath "path"
"strconv"
"strings"
"time"
)
@ -151,34 +145,15 @@ func (n networkStorage) DeleteVol(volume string) error {
// File operations.
// CreateFile - create file.
func (n networkStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
writeURL := new(url.URL)
writeURL.Scheme = n.netScheme
writeURL.Host = n.netAddr
writeURL.Path = fmt.Sprintf("%s/upload/%s", storageRPCPath, urlpath.Join(volume, path))
contentType := "application/octet-stream"
readCloser, writeCloser := io.Pipe()
go func() {
resp, err := n.httpClient.Post(writeURL.String(), contentType, readCloser)
if err != nil {
readCloser.CloseWithError(err)
return
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
readCloser.CloseWithError(errFileNotFound)
return
}
readCloser.CloseWithError(errors.New("Invalid response."))
return
}
// Close the reader.
readCloser.Close()
}
}()
return writeCloser, nil
func (n networkStorage) AppendFile(volume, path string, buffer []byte) (m int64, err error) {
if err = n.rpcClient.Call("Storage.AppendFileHandler", AppendFileArgs{
Vol: volume,
Path: path,
Buffer: buffer,
}, &m); err != nil {
return 0, toStorageErr(err)
}
return m, nil
}
// StatFile - get latest Stat information for a file at path.
@ -193,27 +168,16 @@ func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err er
}
// ReadFile - reads a file.
func (n networkStorage) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) {
readURL := new(url.URL)
readURL.Scheme = n.netScheme
readURL.Host = n.netAddr
readURL.Path = fmt.Sprintf("%s/download/%s", storageRPCPath, urlpath.Join(volume, path))
readQuery := make(url.Values)
readQuery.Set("offset", strconv.FormatInt(offset, 10))
readURL.RawQuery = readQuery.Encode()
resp, err := n.httpClient.Get(readURL.String())
if err != nil {
return nil, err
func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) {
if err = n.rpcClient.Call("Storage.ReadFileHandler", ReadFileArgs{
Vol: volume,
Path: path,
Offset: offset,
Buffer: buffer,
}, &m); err != nil {
return 0, toStorageErr(err)
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return nil, errFileNotFound
}
return nil, errors.New("Invalid response")
}
}
return resp.Body, nil
return m, nil
}
// ListDir - list all entries at prefix.

View File

@ -27,25 +27,40 @@ type ListVolsReply struct {
Vols []VolInfo
}
// StatFileArgs stat file args.
// ReadFileArgs contains read file arguments.
type ReadFileArgs struct {
Vol string
Path string
Offset int64
Buffer []byte
}
// AppendFileArgs contains append file arguments.
type AppendFileArgs struct {
Vol string
Path string
Buffer []byte
}
// StatFileArgs contains stat file arguments.
type StatFileArgs struct {
Vol string
Path string
}
// DeleteFileArgs delete file args.
// DeleteFileArgs contains delete file arguments.
type DeleteFileArgs struct {
Vol string
Path string
}
// ListDirArgs list dir args.
// ListDirArgs contains list dir arguments.
type ListDirArgs struct {
Vol string
Path string
}
// RenameFileArgs rename file args.
// RenameFileArgs contains rename file arguments.
type RenameFileArgs struct {
SrcVol string
SrcPath string

View File

@ -1,10 +1,7 @@
package main
import (
"io"
"net/http"
"net/rpc"
"strconv"
router "github.com/gorilla/mux"
)
@ -78,6 +75,26 @@ func (s *storageServer) ListDirHandler(arg *ListDirArgs, reply *[]string) error
return nil
}
// ReadFileHandler - read file handler is rpc wrapper to read file.
func (s *storageServer) ReadFileHandler(arg *ReadFileArgs, reply *int64) error {
n, err := s.storage.ReadFile(arg.Vol, arg.Path, arg.Offset, arg.Buffer)
if err != nil {
return err
}
reply = &n
return nil
}
// AppendFileHandler - append file handler is rpc wrapper to append file.
func (s *storageServer) AppendFileHandler(arg *AppendFileArgs, reply *int64) error {
n, err := s.storage.AppendFile(arg.Vol, arg.Path, arg.Buffer)
if err != nil {
return err
}
reply = &n
return nil
}
// DeleteFileHandler - delete file handler is rpc wrapper to delete file.
func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericReply) error {
err := s.storage.DeleteFile(arg.Vol, arg.Path)
@ -115,60 +132,4 @@ func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) {
storageRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()
// Add minio storage routes.
storageRouter.Path("/storage").Handler(storageRPCServer)
// StreamUpload - stream upload handler.
storageRouter.Methods("POST").Path("/storage/upload/{volume}/{path:.+}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
vars := router.Vars(r)
volume := vars["volume"]
path := vars["path"]
writeCloser, err := stServer.storage.CreateFile(volume, path)
if err != nil {
httpErr := http.StatusInternalServerError
if err == errVolumeNotFound {
httpErr = http.StatusNotFound
} else if err == errIsNotRegular {
httpErr = http.StatusConflict
}
http.Error(w, err.Error(), httpErr)
return
}
reader := r.Body
if _, err = io.Copy(writeCloser, reader); err != nil {
safeCloseAndRemove(writeCloser)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
writeCloser.Close()
reader.Close()
})
// StreamDownloadHandler - stream download handler.
storageRouter.Methods("GET").Path("/storage/download/{volume}/{path:.+}").Queries("offset", "{offset:.*}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
vars := router.Vars(r)
volume := vars["volume"]
path := vars["path"]
offset, err := strconv.ParseInt(r.URL.Query().Get("offset"), 10, 64)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
readCloser, err := stServer.storage.ReadFile(volume, path, offset)
if err != nil {
httpErr := http.StatusBadRequest
if err == errVolumeNotFound {
httpErr = http.StatusNotFound
} else if err == errFileNotFound {
httpErr = http.StatusNotFound
}
http.Error(w, err.Error(), httpErr)
return
}
// Copy reader to writer.
io.Copy(w, readCloser)
// Flush out any remaining buffers to client.
w.(http.Flusher).Flush()
// Close the reader.
readCloser.Close()
})
}

View File

@ -444,7 +444,7 @@ func (s *MyAPISuite) TestBucket(c *C) {
c.Assert(response.StatusCode, Equals, http.StatusOK)
}
func (s *MyAPISuite) TestObject(c *C) {
func (s *MyAPISuite) TestObjectGet(c *C) {
buffer := bytes.NewReader([]byte("hello world"))
request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/testobject", 0, nil)
c.Assert(err, IsNil)

View File

@ -18,6 +18,9 @@ package main
import "errors"
// errUnexpected - unexpected error, requires manual intervention.
var errUnexpected = errors.New("Unexpected error, please report this issue at https://github.com/minio/minio/issues")
// errCorruptedFormat - corrupted backend format.
var errCorruptedFormat = errors.New("corrupted backend format")

View File

@ -16,8 +16,6 @@
package main
import "io"
// StorageAPI interface.
type StorageAPI interface {
// Volume operations.
@ -28,9 +26,9 @@ type StorageAPI interface {
// File operations.
ListDir(volume, dirPath string) ([]string, error)
ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error)
CreateFile(volume string, path string) (writeCloser io.WriteCloser, err error)
ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error)
AppendFile(volume string, path string, buf []byte) (n int64, err error)
RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error
StatFile(volume string, path string) (file FileInfo, err error)
DeleteFile(volume string, path string) (err error)
RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error
}

View File

@ -18,7 +18,6 @@ package main
import (
"fmt"
"io"
"net/http"
"os"
"path"
@ -383,12 +382,14 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
// Add content disposition.
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filepath.Base(object)))
objReader, err := web.ObjectAPI.GetObject(bucket, object, 0)
objInfo, err := web.ObjectAPI.GetObjectInfo(bucket, object)
if err != nil {
writeWebErrorResponse(w, err)
return
}
if _, err := io.Copy(w, objReader); err != nil {
offset := int64(0)
err = web.ObjectAPI.GetObject(bucket, object, offset, objInfo.Size, w)
if err != nil {
/// No need to print error, response writer already written to.
return
}

View File

@ -1,6 +1,7 @@
package main
import (
"encoding/json"
"path"
"sync"
)
@ -41,19 +42,20 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro
go func(index int, disk StorageAPI) {
defer wg.Done()
offset := int64(0)
metadataReader, err := disk.ReadFile(bucket, xlMetaPath, offset)
var buffer = make([]byte, blockSize)
n, err := disk.ReadFile(bucket, xlMetaPath, offset, buffer)
if err != nil {
errs[index] = err
return
}
defer metadataReader.Close()
_, err = metadataArray[index].ReadFrom(metadataReader)
err = json.Unmarshal(buffer[:n], &metadataArray[index])
if err != nil {
// Unable to parse xl.json, set error.
errs[index] = err
return
}
buffer = nil
errs[index] = nil
}(index, disk)
}

View File

@ -17,9 +17,7 @@
package main
import (
"bytes"
"encoding/json"
"io"
"math/rand"
"path"
"sort"
@ -72,28 +70,6 @@ type xlMetaV1 struct {
Parts []objectPartInfo `json:"parts,omitempty"`
}
// ReadFrom - read from implements io.ReaderFrom interface for
// unmarshalling xlMetaV1.
func (m *xlMetaV1) ReadFrom(reader io.Reader) (n int64, err error) {
var buffer bytes.Buffer
n, err = buffer.ReadFrom(reader)
if err != nil {
return 0, err
}
err = json.Unmarshal(buffer.Bytes(), m)
return n, err
}
// WriteTo - write to implements io.WriterTo interface for marshalling xlMetaV1.
func (m xlMetaV1) WriteTo(writer io.Writer) (n int64, err error) {
metadataBytes, err := json.Marshal(&m)
if err != nil {
return 0, err
}
p, err := writer.Write(metadataBytes)
return int64(p), err
}
// byPartName is a collection satisfying sort.Interface.
type byPartNumber []objectPartInfo
@ -164,14 +140,16 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
// Count for errors encountered.
var xlJSONErrCount = 0
// Allocate 4MB buffer.
var buffer = make([]byte, blockSize)
// Return the first successful lookup from a random list of disks.
for xlJSONErrCount < len(xl.storageDisks) {
var r io.ReadCloser
disk := xl.getRandomDisk() // Choose a random disk on each attempt.
r, err = disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), int64(0))
var n int64
n, err = disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), int64(0), buffer)
if err == nil {
defer r.Close()
_, err = xlMeta.ReadFrom(r)
err = json.Unmarshal(buffer[:n], &xlMeta)
if err == nil {
return xlMeta, nil
}
@ -195,11 +173,45 @@ func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
return xlMeta
}
func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix string) error {
var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks))
srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile)
dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile)
// Rename `xl.json` to all disks in parallel.
for index, disk := range xl.storageDisks {
wg.Add(1)
// Rename `xl.json` in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
if rErr != nil {
mErrs[index] = rErr
return
}
mErrs[index] = nil
}(index, disk)
}
// Wait for all the routines.
wg.Wait()
// Return the first error.
for _, err := range mErrs {
if err == nil {
continue
}
return err
}
return nil
}
// writeXLMetadata - write `xl.json` on all disks in order.
func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error {
var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks))
jsonFile := path.Join(prefix, xlMetaJSONFile)
// Start writing `xl.json` to all disks in parallel.
for index, disk := range xl.storageDisks {
wg.Add(1)
@ -207,33 +219,21 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
go func(index int, disk StorageAPI, metadata xlMetaV1) {
defer wg.Done()
metaJSONFile := path.Join(prefix, xlMetaJSONFile)
metaWriter, mErr := disk.CreateFile(bucket, metaJSONFile)
if mErr != nil {
mErrs[index] = mErr
return
}
// Save the disk order index.
metadata.Erasure.Index = index + 1
// Marshal metadata to the writer.
_, mErr = metadata.WriteTo(metaWriter)
metadataBytes, err := json.Marshal(&metadata)
if err != nil {
mErrs[index] = err
return
}
n, mErr := disk.AppendFile(bucket, jsonFile, metadataBytes)
if mErr != nil {
if mErr = safeCloseAndRemove(metaWriter); mErr != nil {
mErrs[index] = mErr
return
}
mErrs[index] = mErr
return
}
// Verify if close fails with an error.
if mErr = metaWriter.Close(); mErr != nil {
if mErr = safeCloseAndRemove(metaWriter); mErr != nil {
mErrs[index] = mErr
return
}
mErrs[index] = mErr
if n != int64(len(metadataBytes)) {
mErrs[index] = errUnexpected
return
}
mErrs[index] = nil

View File

@ -17,9 +17,7 @@
package main
import (
"bytes"
"encoding/json"
"io"
"path"
"sort"
"strings"
@ -70,27 +68,6 @@ func (u uploadsV1) Index(uploadID string) int {
return -1
}
// ReadFrom - read from implements io.ReaderFrom interface for unmarshalling uploads.
func (u *uploadsV1) ReadFrom(reader io.Reader) (n int64, err error) {
var buffer bytes.Buffer
n, err = buffer.ReadFrom(reader)
if err != nil {
return 0, err
}
err = json.Unmarshal(buffer.Bytes(), &u)
return n, err
}
// WriteTo - write to implements io.WriterTo interface for marshalling uploads.
func (u uploadsV1) WriteTo(writer io.Writer) (n int64, err error) {
metadataBytes, err := json.Marshal(&u)
if err != nil {
return 0, err
}
m, err := writer.Write(metadataBytes)
return int64(m), err
}
// readUploadsJSON - get all the saved uploads JSON.
func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) {
uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
@ -104,17 +81,18 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI
// Read `uploads.json` in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
r, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0))
var buffer = make([]byte, blockSize)
n, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0), buffer)
if rErr != nil {
errs[index] = rErr
return
}
defer r.Close()
_, rErr = uploads[index].ReadFrom(r)
rErr = json.Unmarshal(buffer[:n], &uploads[index])
if rErr != nil {
errs[index] = rErr
return
}
buffer = nil
errs[index] = nil
}(index, disk)
}
@ -136,6 +114,7 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI
// uploadUploadsJSON - update `uploads.json` with new uploadsJSON for all disks.
func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error {
uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadsJSONFile)
var errs = make([]error, len(storageDisks))
var wg = &sync.WaitGroup{}
@ -145,21 +124,21 @@ func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisk
// Update `uploads.json` in routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
w, wErr := disk.CreateFile(minioMetaBucket, uploadsPath)
uploadsBytes, wErr := json.Marshal(uploadsJSON)
if wErr != nil {
errs[index] = wErr
return
}
_, wErr = uploadsJSON.WriteTo(w)
n, wErr := disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsBytes)
if wErr != nil {
errs[index] = wErr
return
}
if wErr = w.Close(); wErr != nil {
if clErr := safeCloseAndRemove(w); clErr != nil {
errs[index] = clErr
return
}
if n != int64(len(uploadsBytes)) {
errs[index] = errUnexpected
return
}
if wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath); wErr != nil {
errs[index] = wErr
return
}
@ -219,22 +198,18 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora
// Update `uploads.json` in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
w, wErr := disk.CreateFile(minioMetaBucket, tmpUploadsPath)
uploadsJSONBytes, wErr := json.Marshal(&uploadsJSON)
if wErr != nil {
errs[index] = wErr
return
}
_, wErr = uploadsJSON.WriteTo(w)
n, wErr := disk.AppendFile(minioMetaBucket, tmpUploadsPath, uploadsJSONBytes)
if wErr != nil {
errs[index] = wErr
return
}
if wErr = w.Close(); wErr != nil {
if clErr := safeCloseAndRemove(w); clErr != nil {
errs[index] = clErr
return
}
errs[index] = wErr
if n != int64(len(uploadsJSONBytes)) {
errs[index] = errUnexpected
return
}
wErr = disk.RenameFile(minioMetaBucket, tmpUploadsPath, minioMetaBucket, uploadsPath)

View File

@ -21,7 +21,6 @@ import (
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"path"
"path/filepath"
"strings"
@ -143,62 +142,37 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
partSuffix := fmt.Sprintf("object%d", partID)
tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix)
fileWriter, err := erasure.CreateFile(minioMetaBucket, tmpPartPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, err = io.CopyN(multiWriter, data, size); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
buf := make([]byte, blockSize)
for {
var n int
n, err = io.ReadFull(data, buf)
if err == io.EOF {
break
}
if err != nil && err != io.ErrUnexpectedEOF {
return "", toObjectErr(err, bucket, object)
}
// Reader shouldn't have more data what mentioned in size argument.
// reading one more byte from the reader to validate it.
// expected to fail, success validates existence of more data in the reader.
if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", UnExpectedDataSize{Size: int(size)}
// Update md5 writer.
md5Writer.Write(buf[:n])
var m int64
m, err = erasure.AppendFile(minioMetaBucket, tmpPartPath, buf[:n])
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
}
} else {
var n int64
if n, err = io.Copy(multiWriter, data); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
if m != int64(len(buf[:n])) {
return "", toObjectErr(errUnexpected, bucket, object)
}
size = n
}
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" {
if newMD5Hex != md5Hex {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", BadDigest{md5Hex, newMD5Hex}
}
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", err
}
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
if err != nil {
@ -209,9 +183,17 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
xlMeta.Stat.Version = higherVersion
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
if err = xl.writeXLMetadata(minioMetaBucket, uploadIDPath, xlMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID)
if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
if rErr != nil {
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
}
// Return success.
return newMD5Hex, nil
}
@ -389,8 +371,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Save successfully calculated md5sum.
xlMeta.Meta["md5Sum"] = s3MD5
if err = xl.writeXLMetadata(minioMetaBucket, uploadIDPath, xlMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID)
if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
rErr := xl.renameXLMetadata(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
if rErr != nil {
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
}
// Hold write lock on the destination before rename

View File

@ -1,6 +1,7 @@
package main
import (
"bytes"
"crypto/md5"
"encoding/hex"
"io"
@ -13,23 +14,17 @@ import (
"github.com/minio/minio/pkg/mimedb"
)
// nullReadCloser - returns 0 bytes and io.EOF upon first read attempt.
type nullReadCloser struct{}
func (n nullReadCloser) Read([]byte) (int, error) { return 0, io.EOF }
func (n nullReadCloser) Close() error { return nil }
/// Object Operations
// GetObject - get an object.
func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, error) {
func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return nil, BucketNameInvalid{Bucket: bucket}
return BucketNameInvalid{Bucket: bucket}
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return nil, ObjectNameInvalid{Bucket: bucket, Object: object}
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
// Lock the object before reading.
@ -39,18 +34,13 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
// Read metadata associated with the object.
xlMeta, err := xl.readXLMetadata(bucket, object)
if err != nil {
return nil, toObjectErr(err, bucket, object)
return toObjectErr(err, bucket, object)
}
// List all online disks.
onlineDisks, _, err := xl.listOnlineDisks(bucket, object)
if err != nil {
return nil, toObjectErr(err, bucket, object)
}
// For zero byte files, return a null reader.
if xlMeta.Stat.Size == 0 {
return nullReadCloser{}, nil
return toObjectErr(err, bucket, object)
}
// Initialize a new erasure with online disks, with previous block distribution.
@ -59,44 +49,36 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
// Get part index offset.
partIndex, partOffset, err := xlMeta.objectToPartOffset(startOffset)
if err != nil {
return nil, toObjectErr(err, bucket, object)
return toObjectErr(err, bucket, object)
}
fileReader, fileWriter := io.Pipe()
// Hold a read lock once more which can be released after the following go-routine ends.
// We hold RLock once more because the current function would return before the go routine below
// executes and hence releasing the read lock (because of defer'ed nsMutex.RUnlock() call).
nsMutex.RLock(bucket, object)
go func() {
defer nsMutex.RUnlock(bucket, object)
for ; partIndex < len(xlMeta.Parts); partIndex++ {
part := xlMeta.Parts[partIndex]
r, err := erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size)
totalLeft := length
for ; partIndex < len(xlMeta.Parts); partIndex++ {
part := xlMeta.Parts[partIndex]
totalPartSize := part.Size
for totalPartSize > 0 {
var buffer io.Reader
buffer, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size)
if err != nil {
fileWriter.CloseWithError(toObjectErr(err, bucket, object))
return
return err
}
// Reset part offset to 0 to read rest of the parts from
// the beginning.
partOffset = 0
if _, err = io.Copy(fileWriter, r); err != nil {
switch reader := r.(type) {
case *io.PipeReader:
reader.CloseWithError(err)
case io.ReadCloser:
reader.Close()
if int64(buffer.(*bytes.Buffer).Len()) > totalLeft {
if _, err := io.CopyN(writer, buffer, totalLeft); err != nil {
return err
}
fileWriter.CloseWithError(toObjectErr(err, bucket, object))
return
return nil
}
// Close the readerCloser that reads multiparts of an object.
// Not closing leaks underlying file descriptors.
r.Close()
n, err := io.Copy(writer, buffer)
if err != nil {
return err
}
totalLeft -= n
totalPartSize -= n
partOffset += n
}
fileWriter.Close()
}()
return fileReader, nil
// Reset part offset to 0 to read rest of the parts from the beginning.
partOffset = 0
}
return nil
}
// GetObjectInfo - get object info.
@ -240,31 +222,29 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Initialize a new erasure with online disks and new distribution.
erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)
fileWriter, err := erasure.CreateFile(minioMetaBucket, tempErasureObj)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Initialize md5 writer.
md5Writer := md5.New()
// Instantiate a new multi writer.
multiWriter := io.MultiWriter(md5Writer, fileWriter)
// Instantiate checksum hashers and create a multiwriter.
if size > 0 {
if _, err = io.CopyN(multiWriter, data, size); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
buf := make([]byte, blockSize)
for {
var n int
n, err = io.ReadFull(data, buf)
if err == io.EOF {
break
}
if err != nil && err != io.ErrUnexpectedEOF {
return "", toObjectErr(err, bucket, object)
}
} else {
if _, err = io.Copy(multiWriter, data); err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
// Update md5 writer.
md5Writer.Write(buf[:n])
var m int64
m, err = erasure.AppendFile(minioMetaBucket, tempErasureObj, buf[:n])
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tempErasureObj)
}
if m != int64(len(buf[:n])) {
return "", toObjectErr(errUnexpected, bucket, object)
}
}
@ -292,21 +272,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
md5Hex := metadata["md5Sum"]
if md5Hex != "" {
if newMD5Hex != md5Hex {
if err = safeCloseAndRemove(fileWriter); err != nil {
return "", toObjectErr(err, bucket, object)
}
return "", BadDigest{md5Hex, newMD5Hex}
}
}
err = fileWriter.Close()
if err != nil {
if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
return "", toObjectErr(clErr, bucket, object)
}
return "", toObjectErr(err, bucket, object)
}
// Check if an object is present as one of the parent dir.
if xl.parentDirIsObject(bucket, path.Dir(object)) {
return "", toObjectErr(errFileAccessDenied, bucket, object)
@ -329,10 +298,13 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
xlMeta.Stat.ModTime = modTime
xlMeta.Stat.Version = higherVersion
xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size)
if err = xl.writeXLMetadata(bucket, object, xlMeta); err != nil {
if err = xl.writeXLMetadata(minioMetaBucket, path.Join(tmpMetaPrefix, bucket, object), xlMeta); err != nil {
return "", toObjectErr(err, bucket, object)
}
rErr := xl.renameXLMetadata(minioMetaBucket, path.Join(tmpMetaPrefix, bucket, object), bucket, object)
if rErr != nil {
return "", toObjectErr(rErr, bucket, object)
}
// Return md5sum, successfully wrote object.
return newMD5Hex, nil
}