XL: Make allocations simpler avoid redundant allocs. (#1961)

- Reduce 10MiB buffers for loopy calls to use 128KiB.
- start using 128KiB buffer where needed.
This commit is contained in:
Harshavardhana 2016-06-24 02:06:23 -07:00 committed by GitHub
parent ff9fc22c72
commit e8990e42c2
11 changed files with 374 additions and 179 deletions

View File

@ -17,6 +17,7 @@
package main package main
import ( import (
"bytes"
"encoding/hex" "encoding/hex"
"errors" "errors"
"io" "io"
@ -25,6 +26,59 @@ import (
"github.com/klauspost/reedsolomon" "github.com/klauspost/reedsolomon"
) )
// isSuccessDecodeBlocks - do we have all the blocks to be successfully decoded?.
// input disks here are expected to be ordered i.e parityBlocks
// are preceded by dataBlocks. For for information look at getOrderedDisks().
func isSuccessDecodeBlocks(disks []StorageAPI, dataBlocks int) bool {
// Count number of data and parity blocks that were read.
var successDataBlocksCount = 0
var successParityBlocksCount = 0
for index, disk := range disks {
if disk == nil {
continue
}
if index < dataBlocks {
successDataBlocksCount++
continue
}
successParityBlocksCount++
}
// Returns true if we have atleast dataBlocks + 1 parity.
return successDataBlocksCount+successParityBlocksCount >= dataBlocks+1
}
// isSuccessDataBlocks - do we have all the data blocks?
// input disks here are expected to be ordered i.e parityBlocks
// are preceded by dataBlocks. For for information look at getOrderedDisks().
func isSuccessDataBlocks(disks []StorageAPI, dataBlocks int) bool {
// Count number of data blocks that were read.
var successDataBlocksCount = 0
for index, disk := range disks[:dataBlocks] {
if disk == nil {
continue
}
if index < dataBlocks {
successDataBlocksCount++
}
}
// Returns true if we have all the dataBlocks.
return successDataBlocksCount >= dataBlocks
}
// getOrderedDisks - get ordered disks from erasure distribution.
// returns ordered slice of disks from their actual distribution.
func getOrderedDisks(distribution []int, disks []StorageAPI, blockCheckSums []checkSumInfo) (orderedDisks []StorageAPI, orderedBlockCheckSums []checkSumInfo) {
orderedDisks = make([]StorageAPI, len(disks))
orderedBlockCheckSums = make([]checkSumInfo, len(disks))
// From disks gets ordered disks.
for index := range disks {
blockIndex := distribution[index]
orderedDisks[blockIndex-1] = disks[index]
orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index]
}
return orderedDisks, orderedBlockCheckSums
}
// erasureReadFile - read bytes from erasure coded files and writes to given writer. // erasureReadFile - read bytes from erasure coded files and writes to given writer.
// Erasure coded files are read block by block as per given erasureInfo and data chunks // Erasure coded files are read block by block as per given erasureInfo and data chunks
// are decoded into a data block. Data block is trimmed for given offset and length, // are decoded into a data block. Data block is trimmed for given offset and length,
@ -36,26 +90,24 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
// Gather previously calculated block checksums. // Gather previously calculated block checksums.
blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName) blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName)
orderedBlockCheckSums := make([]checkSumInfo, len(disks))
// []orderedDisks will have first eInfo.DataBlocks disks as data disks and rest will be parity. // []orderedDisks will have first eInfo.DataBlocks disks as data
orderedDisks := make([]StorageAPI, len(disks)) // disks and rest will be parity.
for index := range disks { orderedDisks, orderedBlockCheckSums := getOrderedDisks(eInfo.Distribution, disks, blockCheckSums)
blockIndex := eInfo.Distribution[index]
orderedDisks[blockIndex-1] = disks[index]
orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index]
}
// bitrotVerify verifies if the file on a particular disk does not have bitrot by verifying the hash of // bitrotVerify verifies if the file on a particular disk doesn't have bitrot
// the contents of the file. // by verifying the hash of the contents of the file.
bitrotVerify := func() func(diskIndex int) bool { bitrotVerify := func() func(diskIndex int) bool {
verified := make([]bool, len(orderedDisks)) verified := make([]bool, len(orderedDisks))
// Return closure so that we have reference to []verified and not recalculate the hash on it // Return closure so that we have reference to []verified and
// everytime the function is called for the same disk. // not recalculate the hash on it everytime the function is
// called for the same disk.
return func(diskIndex int) bool { return func(diskIndex int) bool {
if verified[diskIndex] { if verified[diskIndex] {
// Already validated.
return true return true
} }
// Is this a valid block?
isValid := isValidBlock(orderedDisks[diskIndex], volume, path, orderedBlockCheckSums[diskIndex]) isValid := isValidBlock(orderedDisks[diskIndex], volume, path, orderedBlockCheckSums[diskIndex])
verified[diskIndex] = isValid verified[diskIndex] = isValid
return isValid return isValid
@ -65,128 +117,166 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
// Total bytes written to writer // Total bytes written to writer
bytesWritten := int64(0) bytesWritten := int64(0)
// Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk.
enBlocks := make([][]byte, len(orderedDisks))
// chunkSize is roughly BlockSize/DataBlocks. // chunkSize is roughly BlockSize/DataBlocks.
// chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes. // chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
// So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by // So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
// DataBlocks. The extra space will have 0-padding. // DataBlocks. The extra space will have 0-padding.
chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks) chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks)
// Get start and end block, also bytes to be skipped based on the input offset.
startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, eInfo.BlockSize) startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, eInfo.BlockSize)
// For each block, read chunk from each disk. If we are able to read all the data disks then we don't // For each block, read chunk from each disk. If we are able to read all the data disks then we don't
// need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number // need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number
// of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer. // of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
for block := startBlock; bytesWritten < length; block++ { for block := startBlock; bytesWritten < length; block++ {
// curChunkSize will be chunkSize except for the last block because the size of the last block // curChunkSize is chunkSize until end block.
// can be less than BlockSize.
curChunkSize := chunkSize curChunkSize := chunkSize
if block == endBlock && (totalLength%eInfo.BlockSize != 0) { if block == endBlock && (totalLength%eInfo.BlockSize != 0) {
// If this is the last block and size of the block is < BlockSize. // If this is the last block and size of the block is < BlockSize.
curChunkSize = getEncodedBlockLen(totalLength%eInfo.BlockSize, eInfo.DataBlocks) curChunkSize = getEncodedBlockLen(totalLength%eInfo.BlockSize, eInfo.DataBlocks)
} }
// Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk. // Block offset.
enBlocks := make([][]byte, len(disks)) // NOTE: That for the offset calculation we have to use chunkSize and
// not curChunkSize. If we use curChunkSize for offset calculation
// then it can result in wrong offset for the last block.
blockOffset := block * chunkSize
// Figure out the number of disks that are needed for the read. // Figure out the number of disks that are needed for the read.
// We will need DataBlocks number of disks if all the data disks are up. // We will need DataBlocks number of disks if all the data disks are up.
// We will need DataBlocks+1 number of disks even if one of the data disks is down. // We will need DataBlocks+1 number of disks even if one of the data disks is down.
diskCount := 0 readableDiskCount := 0
// Count the number of data disks that are up. // Count the number of data disks that are up.
for _, disk := range orderedDisks[:eInfo.DataBlocks] { for _, disk := range orderedDisks[:eInfo.DataBlocks] {
if disk == nil { if disk == nil {
continue continue
} }
diskCount++ readableDiskCount++
} }
if diskCount < eInfo.DataBlocks { // Readable disks..
// Not enough data disks up, so we need DataBlocks+1 number of disks for reed-solomon Reconstruct() if readableDiskCount < eInfo.DataBlocks {
diskCount = eInfo.DataBlocks + 1 // Not enough data disks up, so we need DataBlocks+1 number
// of disks for reed-solomon Reconstruct()
readableDiskCount = eInfo.DataBlocks + 1
} }
wg := &sync.WaitGroup{} // Initialize wait group.
var wg = &sync.WaitGroup{}
// current disk index from which to read, this will be used later in case one of the parallel reads fails. // Current disk index from which to read, this will be used later
// in case one of the parallel reads fails.
index := 0 index := 0
// Read from the disks in parallel. // Read from the disks in parallel.
for _, disk := range orderedDisks { for _, disk := range orderedDisks {
if disk == nil { if disk == nil {
index++ index++
continue continue
} }
// Increment wait group.
wg.Add(1) wg.Add(1)
// Start reading from disk in a go-routine.
go func(index int, disk StorageAPI) { go func(index int, disk StorageAPI) {
defer wg.Done() defer wg.Done()
ok := bitrotVerify(index)
if !ok { // Verify bit rot for this disk slice.
if !bitrotVerify(index) {
// So that we don't read from this disk for the next block. // So that we don't read from this disk for the next block.
orderedDisks[index] = nil orderedDisks[index] = nil
return return
} }
buf := make([]byte, curChunkSize)
// Note that for the offset calculation we have to use chunkSize and not // Chunk writer.
// curChunkSize. If we use curChunkSize for offset calculation then it chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
// can result in wrong offset for the last block.
n, err := disk.ReadFile(volume, path, block*chunkSize, buf) // CopyN copies until current chunk size.
err := copyN(chunkWriter, disk, volume, path, blockOffset, curChunkSize)
if err != nil { if err != nil {
// So that we don't read from this disk for the next block. // So that we don't read from this disk for the next block.
orderedDisks[index] = nil orderedDisks[index] = nil
return return
} }
enBlocks[index] = buf[:n]
// Copy the read blocks.
enBlocks[index] = chunkWriter.Bytes()
// Reset the buffer.
chunkWriter.Reset()
// Successfully read.
}(index, disk) }(index, disk)
index++ index++
diskCount-- readableDiskCount--
if diskCount == 0 { // We have read all the readable disks.
if readableDiskCount == 0 {
break break
} }
} }
// Wait for all the reads to finish.
wg.Wait() wg.Wait()
// Count number of data and parity blocks that were read. // FIXME: make this parallel.
var successDataBlocksCount = 0
var successParityBlocksCount = 0
for bufidx, buf := range enBlocks {
if buf == nil {
continue
}
if bufidx < eInfo.DataBlocks {
successDataBlocksCount++
continue
}
successParityBlocksCount++
}
if successDataBlocksCount < eInfo.DataBlocks { // If we have all the data blocks no need to decode.
// If we don't have DataBlocks number of data blocks we will have to read enough if !isSuccessDataBlocks(orderedDisks, eInfo.DataBlocks) {
// parity blocks such that we have DataBlocks+1 number for blocks for reedsolomon.Reconstruct() // If we don't have DataBlocks number of data blocks we
// will have to read enough parity blocks such that we
// have DataBlocks+1 number for blocks for rs.Reconstruct().
// index is either dataBlocks or dataBlocks + 1.
for ; index < len(orderedDisks); index++ { for ; index < len(orderedDisks); index++ {
if (successDataBlocksCount + successParityBlocksCount) == (eInfo.DataBlocks + 1) { // We have enough blocks to decode, break out.
// We have DataBlocks+1 blocks, enough for reedsolomon.Reconstruct() if isSuccessDecodeBlocks(orderedDisks, eInfo.DataBlocks) {
// We have DataBlocks+1 blocks, enough for rs.Reconstruct()
break break
} }
ok := bitrotVerify(index)
if !ok { // This disk was previously set to nil and ignored, do not read again.
if orderedDisks[index] == nil {
continue
}
// Verify bit-rot for this index.
if !bitrotVerify(index) {
// Mark nil so that we don't read from this disk for the next block. // Mark nil so that we don't read from this disk for the next block.
orderedDisks[index] = nil orderedDisks[index] = nil
continue continue
} }
buf := make([]byte, curChunkSize)
n, err := orderedDisks[index].ReadFile(volume, path, block*chunkSize, buf) // Chunk writer.
chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
// CopyN copies until current chunk size.
err := copyN(chunkWriter, orderedDisks[index], volume, path, blockOffset, curChunkSize)
if err != nil { if err != nil {
// Mark nil so that we don't read from this disk for the next block. // ERROR: Mark nil so that we don't read from
// this disk for the next block.
orderedDisks[index] = nil orderedDisks[index] = nil
continue continue
} }
successParityBlocksCount++
enBlocks[index] = buf[:n] // Copy the read blocks.
chunkWriter.Read(enBlocks[index])
// Reset the buffer.
chunkWriter.Reset()
} }
// Reconstruct the missing data blocks. // Reconstruct the missing data blocks.
err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks) err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks)
if err != nil { if err != nil {
return bytesWritten, err return bytesWritten, err
} }
// Success.
} }
var outSize, outOffset int64 var outSize, outOffset int64
@ -209,14 +299,18 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
// We should not send more data than what was requested. // We should not send more data than what was requested.
outSize = length - bytesWritten outSize = length - bytesWritten
} }
// Write data blocks. // Write data blocks.
n, err := writeDataBlocks(writer, enBlocks, eInfo.DataBlocks, outOffset, outSize) n, err := writeDataBlocks(writer, enBlocks, eInfo.DataBlocks, outOffset, outSize)
if err != nil { if err != nil {
return bytesWritten, err return bytesWritten, err
} }
// Update total bytes written.
bytesWritten += n bytesWritten += n
} }
// Success.
return bytesWritten, nil return bytesWritten, nil
} }
@ -273,14 +367,18 @@ func isValidBlock(disk StorageAPI, volume, path string, blockCheckSum checkSumIn
// decodeData - decode encoded blocks. // decodeData - decode encoded blocks.
func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error { func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error {
// Initialized reedsolomon.
rs, err := reedsolomon.New(dataBlocks, parityBlocks) rs, err := reedsolomon.New(dataBlocks, parityBlocks)
if err != nil { if err != nil {
return err return err
} }
// Reconstruct encoded blocks.
err = rs.Reconstruct(enBlocks) err = rs.Reconstruct(enBlocks)
if err != nil { if err != nil {
return err return err
} }
// Verify reconstructed blocks (parity). // Verify reconstructed blocks (parity).
ok, err := rs.Verify(enBlocks) ok, err := rs.Verify(enBlocks)
if err != nil { if err != nil {
@ -291,5 +389,7 @@ func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error {
err = errors.New("Verification failed after reconstruction, data likely corrupted.") err = errors.New("Verification failed after reconstruction, data likely corrupted.")
return err return err
} }
// Success.
return nil return nil
} }

View File

@ -18,6 +18,7 @@ package main
import ( import (
"bytes" "bytes"
"errors"
"hash" "hash"
"io" "io"
@ -46,21 +47,17 @@ func newHash(algo string) hash.Hash {
} }
} }
// hashSum calculates the hash of the entire path and returns.
func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) { func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) {
startOffset := int64(0) // Allocate staging buffer of 128KiB for copyBuffer.
// Read until io.EOF. buf := make([]byte, 128*1024)
for {
buf := make([]byte, blockSizeV1) // Copy entire buffer to writer.
n, err := disk.ReadFile(volume, path, startOffset, buf) if err := copyBuffer(writer, disk, volume, path, buf); err != nil {
if err == io.EOF {
break
}
if err != nil {
return nil, err return nil, err
} }
writer.Write(buf[:n])
startOffset += n // Return the final hash sum.
}
return writer.Sum(nil), nil return writer.Sum(nil), nil
} }
@ -149,3 +146,86 @@ func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64)
curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks) curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
return curEncBlockSize return curEncBlockSize
} }
// copyN - copies from disk, volume, path to input writer until length
// is reached at volume, path or an error occurs. A success copyN returns
// err == nil, not err == EOF. Additionally offset can be provided to start
// the read at. copyN returns io.EOF if there aren't enough data to be read.
func copyN(writer io.Writer, disk StorageAPI, volume string, path string, offset int64, length int64) (err error) {
// Use 128KiB staging buffer to read upto length.
buf := make([]byte, 128*1024)
// Read into writer until length.
for length > 0 {
nr, er := disk.ReadFile(volume, path, offset, buf)
if nr > 0 {
nw, ew := writer.Write(buf[0:nr])
if nw > 0 {
// Decrement the length.
length -= int64(nw)
// Progress the offset.
offset += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != int64(nw) {
err = io.ErrShortWrite
break
}
}
if er == io.EOF || er == io.ErrUnexpectedEOF {
break
}
if er != nil {
err = er
}
}
// Success.
return err
}
// copyBuffer - copies from disk, volume, path to input writer until either EOF
// is reached at volume, path or an error occurs. A success copyBuffer returns
// err == nil, not err == EOF. Because copyBuffer is defined to read from path
// until EOF. It does not treat an EOF from ReadFile an error to be reported.
// Additionally copyBuffer stages through the provided buffer; otherwise if it
// has zero length, returns error.
func copyBuffer(writer io.Writer, disk StorageAPI, volume string, path string, buf []byte) error {
// Error condition of zero length buffer.
if buf != nil && len(buf) == 0 {
return errors.New("empty buffer in readBuffer")
}
// Starting offset for Reading the file.
startOffset := int64(0)
// Read until io.EOF.
for {
n, err := disk.ReadFile(volume, path, startOffset, buf)
if n > 0 {
var m int
m, err = writer.Write(buf[:n])
if err != nil {
return err
}
if int64(m) != n {
return io.ErrShortWrite
}
}
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return err
}
// Progress the offset.
startOffset += n
}
// Success.
return nil
}

View File

@ -17,6 +17,7 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -346,9 +347,10 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1)
// loadFormat - loads format.json from disk. // loadFormat - loads format.json from disk.
func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) { func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
var buffer []byte // Allocate staging buffer of 32KiB for copyBuffer.
buffer, err = readAll(disk, minioMetaBucket, formatConfigFile) buf := make([]byte, 32*1024)
if err != nil { var buffer = new(bytes.Buffer)
if err = copyBuffer(buffer, disk, minioMetaBucket, formatConfigFile, buf); err != nil {
// 'file not found' and 'volume not found' as // 'file not found' and 'volume not found' as
// same. 'volume not found' usually means its a fresh disk. // same. 'volume not found' usually means its a fresh disk.
if err == errFileNotFound || err == errVolumeNotFound { if err == errFileNotFound || err == errVolumeNotFound {
@ -366,11 +368,15 @@ func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
} }
return nil, err return nil, err
} }
// Try to decode format json into formatConfigV1 struct.
format = &formatConfigV1{} format = &formatConfigV1{}
err = json.Unmarshal(buffer, format) d := json.NewDecoder(buffer)
if err != nil { if err = d.Decode(format); err != nil {
return nil, err return nil, err
} }
// Success.
return format, nil return format, nil
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"path" "path"
"sort" "sort"
@ -57,16 +58,23 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
} }
// readFSMetadata - returns the object metadata `fs.json` content. // readFSMetadata - returns the object metadata `fs.json` content.
func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) { func readFSMetadata(disk StorageAPI, bucket, object string) (fsMeta fsMetaV1, err error) {
var buffer []byte // 32KiB staging buffer for copying `fs.json`.
buffer, err = readAll(fs.storage, bucket, path.Join(object, fsMetaJSONFile)) var buf = make([]byte, 32*1024)
if err != nil {
// `fs.json` writer.
var buffer = new(bytes.Buffer)
if err = copyBuffer(buffer, disk, bucket, path.Join(object, fsMetaJSONFile), buf); err != nil {
return fsMetaV1{}, err return fsMetaV1{}, err
} }
err = json.Unmarshal(buffer, &fsMeta)
if err != nil { // Decode `fs.json` into fsMeta structure.
d := json.NewDecoder(buffer)
if err = d.Decode(&fsMeta); err != nil {
return fsMetaV1{}, err return fsMetaV1{}, err
} }
// Success.
return fsMeta, nil return fsMeta, nil
} }

View File

@ -299,7 +299,8 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Initialize md5 writer. // Initialize md5 writer.
md5Writer := md5.New() md5Writer := md5.New()
var buf = make([]byte, blockSizeV1) // Allocate 32KiB buffer for staging buffer.
var buf = make([]byte, 128*1024)
for { for {
n, err := io.ReadFull(data, buf) n, err := io.ReadFull(data, buf)
if err == io.EOF { if err == io.EOF {
@ -331,7 +332,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
return "", InvalidUploadID{UploadID: uploadID} return "", InvalidUploadID{UploadID: uploadID}
} }
fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath) fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath)
if err != nil { if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath) return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
} }
@ -367,7 +368,7 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM
result := ListPartsInfo{} result := ListPartsInfo{}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath) fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath)
if err != nil { if err != nil {
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath) return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath)
} }
@ -475,7 +476,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
// Read saved fs metadata for ongoing multipart. // Read saved fs metadata for ongoing multipart.
fsMeta, err := fs.readFSMetadata(minioMetaBucket, uploadIDPath) fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath)
if err != nil { if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath) return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
} }
@ -487,7 +488,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
tempObj := path.Join(tmpMetaPrefix, uploadID, "object1") tempObj := path.Join(tmpMetaPrefix, uploadID, "object1")
var buffer = make([]byte, blockSizeV1)
// Allocate 32KiB buffer for staging buffer.
var buf = make([]byte, 128*1024)
// Loop through all parts, validate them and then commit to disk. // Loop through all parts, validate them and then commit to disk.
for i, part := range parts { for i, part := range parts {
@ -509,16 +512,21 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
totalLeft := fsMeta.Parts[partIdx].Size totalLeft := fsMeta.Parts[partIdx].Size
for totalLeft > 0 { for totalLeft > 0 {
var n int64 var n int64
n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buffer) n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buf)
if n > 0 {
if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempObj)
}
}
if err != nil { if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
if err == errFileNotFound { if err == errFileNotFound {
return "", InvalidPart{} return "", InvalidPart{}
} }
return "", toObjectErr(err, minioMetaBucket, multipartPartFile) return "", toObjectErr(err, minioMetaBucket, multipartPartFile)
} }
if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buffer[:n]); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempObj)
}
offset += n offset += n
totalLeft -= n totalLeft -= n
} }

View File

@ -17,8 +17,10 @@
package main package main
import ( import (
"bytes"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"encoding/json"
"io" "io"
"os" "os"
"path" "path"
@ -45,8 +47,26 @@ func initFormatFS(storageDisk StorageAPI) error {
} }
// loads format.json from minioMetaBucket if it exists. // loads format.json from minioMetaBucket if it exists.
func loadFormatFS(storageDisk StorageAPI) ([]byte, error) { func loadFormatFS(storageDisk StorageAPI) (format formatConfigV1, err error) {
return readAll(storageDisk, minioMetaBucket, fsFormatJSONFile) // Allocate 32k buffer, this is sufficient for the most of `format.json`.
buf := make([]byte, 32*1024)
// Allocate a new `format.json` buffer writer.
var buffer = new(bytes.Buffer)
// Reads entire `format.json`.
if err = copyBuffer(buffer, storageDisk, minioMetaBucket, fsFormatJSONFile, buf); err != nil {
return formatConfigV1{}, err
}
// Unmarshal format config.
d := json.NewDecoder(buffer)
if err = d.Decode(&format); err != nil {
return formatConfigV1{}, err
}
// Return structured `format.json`.
return format, nil
} }
// Should be called when process shuts down. // Should be called when process shuts down.
@ -74,6 +94,7 @@ func newFSObjects(disk string) (ObjectLayer, error) {
// Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc. // Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc.
fsHouseKeeping(storage) fsHouseKeeping(storage)
// loading format.json from minioMetaBucket. // loading format.json from minioMetaBucket.
// Note: The format.json content is ignored, reserved for future use. // Note: The format.json content is ignored, reserved for future use.
_, err = loadFormatFS(storage) _, err = loadFormatFS(storage)
@ -88,10 +109,12 @@ func newFSObjects(disk string) (ObjectLayer, error) {
return nil, err return nil, err
} }
} }
// Register the callback that should be called when the process shuts down. // Register the callback that should be called when the process shuts down.
registerShutdown(func() { registerShutdown(func() {
shutdownFS(storage) shutdownFS(storage)
}) })
// Return successfully initialized object layer. // Return successfully initialized object layer.
return fsObjects{ return fsObjects{
storage: storage, storage: storage,
@ -178,7 +201,7 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
/// Object Operations /// Object Operations
// GetObject - get an object. // GetObject - get an object.
func (fs fsObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error { func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) error {
// Verify if bucket is valid. // Verify if bucket is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket} return BucketNameInvalid{Bucket: bucket}
@ -188,26 +211,28 @@ func (fs fsObjects) GetObject(bucket, object string, startOffset int64, length i
return ObjectNameInvalid{Bucket: bucket, Object: object} return ObjectNameInvalid{Bucket: bucket, Object: object}
} }
var totalLeft = length var totalLeft = length
buf := make([]byte, 32*1024) // Allocate a 32KiB staging buffer.
for totalLeft > 0 { for totalLeft > 0 {
// Figure out the right blockSize as it was encoded before. // Figure out the right size for the buffer.
var curBlockSize int64 var curSize int64
if blockSizeV1 < totalLeft { if blockSizeV1 < totalLeft {
curBlockSize = blockSizeV1 curSize = blockSizeV1
} else { } else {
curBlockSize = totalLeft curSize = totalLeft
} }
buf := make([]byte, curBlockSize) // Reads the file at offset.
n, err := fs.storage.ReadFile(bucket, object, startOffset, buf) n, err := fs.storage.ReadFile(bucket, object, offset, buf[:curSize])
if err != nil { if err != nil {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }
_, err = writer.Write(buf[:n]) // Write to response writer.
m, err := writer.Write(buf[:n])
if err != nil { if err != nil {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }
totalLeft -= n totalLeft -= int64(m)
startOffset += n offset += int64(m)
} } // Success.
return nil return nil
} }
@ -276,7 +301,7 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
} }
} else { } else {
// Allocate a buffer to Read() the object upload stream. // Allocate a buffer to Read() the object upload stream.
buf := make([]byte, blockSizeV1) buf := make([]byte, 32*1024)
// Read the buffer till io.EOF and append the read data to // Read the buffer till io.EOF and append the read data to
// the temporary file. // the temporary file.
for { for {

View File

@ -418,14 +418,9 @@ func (s *posix) ReadFile(volume string, path string, offset int64, buf []byte) (
// Close the reader. // Close the reader.
defer file.Close() defer file.Close()
// Read file. // Read full until buffer.
m, err := io.ReadFull(file, buf) m, err := io.ReadFull(file, buf)
// Error unexpected is valid, set this back to nil.
if err == io.ErrUnexpectedEOF {
err = nil
}
// Success. // Success.
return int64(m), err return int64(m), err
} }

View File

@ -16,11 +16,7 @@
package main package main
import ( import "sync"
"encoding/json"
"path"
"sync"
)
// Get the highest integer from a given integer slice. // Get the highest integer from a given integer slice.
func highestInt(intSlice []int64, highestInt int64) (highestInteger int64) { func highestInt(intSlice []int64, highestInt int64) (highestInteger int64) {
@ -54,28 +50,23 @@ func listObjectVersions(partsMetadata []xlMetaV1, errs []error) (versions []int6
func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []error) { func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []error) {
errs := make([]error, len(xl.storageDisks)) errs := make([]error, len(xl.storageDisks))
metadataArray := make([]xlMetaV1, len(xl.storageDisks)) metadataArray := make([]xlMetaV1, len(xl.storageDisks))
xlMetaPath := path.Join(object, xlMetaJSONFile)
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
// Read `xl.json` parallelly across disks.
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
if disk == nil { if disk == nil {
errs[index] = errDiskNotFound errs[index] = errDiskNotFound
continue continue
} }
wg.Add(1) wg.Add(1)
// Read `xl.json` in routine.
go func(index int, disk StorageAPI) { go func(index int, disk StorageAPI) {
defer wg.Done() defer wg.Done()
buffer, err := readAll(disk, bucket, xlMetaPath) var err error
metadataArray[index], err = readXLMeta(disk, bucket, object)
if err != nil { if err != nil {
errs[index] = err errs[index] = err
return return
} }
err = json.Unmarshal(buffer, &metadataArray[index])
if err != nil {
// Unable to parse xl.json, set error.
errs[index] = err
return
}
errs[index] = nil
}(index, disk) }(index, disk)
} }

View File

@ -201,8 +201,7 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
if disk == nil { if disk == nil {
continue continue
} }
var buf []byte xlMeta, err = readXLMeta(disk, bucket, object)
buf, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile))
if err != nil { if err != nil {
// For any reason disk is not available continue and read from other disks. // For any reason disk is not available continue and read from other disks.
if err == errDiskNotFound || err == errFaultyDisk { if err == errDiskNotFound || err == errFaultyDisk {
@ -210,10 +209,6 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
} }
return xlMetaV1{}, err return xlMetaV1{}, err
} }
err = json.Unmarshal(buf, &xlMeta)
if err != nil {
return xlMetaV1{}, err
}
break break
} }
return xlMeta, nil return xlMeta, nil

View File

@ -17,6 +17,7 @@
package main package main
import ( import (
"bytes"
"encoding/json" "encoding/json"
"path" "path"
"sort" "sort"
@ -69,16 +70,25 @@ func (u uploadsV1) Index(uploadID string) int {
// readUploadsJSON - get all the saved uploads JSON. // readUploadsJSON - get all the saved uploads JSON.
func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) { func readUploadsJSON(bucket, object string, disk StorageAPI) (uploadIDs uploadsV1, err error) {
// Staging buffer of 128KiB kept for reading `uploads.json`.
var buf = make([]byte, 128*1024)
// Writer holding `uploads.json` content.
var buffer = new(bytes.Buffer)
uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile)
// Read all of 'uploads.json' // Reads entire `uploads.json`.
buffer, rErr := readAll(disk, minioMetaBucket, uploadJSONPath) if err = copyBuffer(buffer, disk, minioMetaBucket, uploadJSONPath, buf); err != nil {
if rErr != nil { return uploadsV1{}, err
return uploadsV1{}, rErr
} }
rErr = json.Unmarshal(buffer, &uploadIDs)
if rErr != nil { // Decode `uploads.json`.
return uploadsV1{}, rErr d := json.NewDecoder(buffer)
if err = d.Decode(&uploadIDs); err != nil {
return uploadsV1{}, err
} }
// Success.
return uploadIDs, nil return uploadIDs, nil
} }

View File

@ -18,7 +18,7 @@ package main
import ( import (
"bytes" "bytes"
"io" "encoding/json"
"math/rand" "math/rand"
"path" "path"
"time" "time"
@ -64,48 +64,25 @@ func randInts(count int) []int {
return ints return ints
} }
// readAll - returns contents from volume/path as byte array.
func readAll(disk StorageAPI, volume string, path string) ([]byte, error) {
var writer = new(bytes.Buffer)
startOffset := int64(0)
// Allocate 10MiB buffer.
buf := make([]byte, blockSizeV1)
// Read until io.EOF.
for {
n, err := disk.ReadFile(volume, path, startOffset, buf)
if err == io.EOF {
break
}
if err != nil && err != io.EOF {
return nil, err
}
writer.Write(buf[:n])
startOffset += n
}
return writer.Bytes(), nil
}
// readXLMeta reads `xl.json` returns contents as byte array. // readXLMeta reads `xl.json` returns contents as byte array.
func readXLMeta(disk StorageAPI, bucket string, object string) ([]byte, error) { func readXLMeta(disk StorageAPI, bucket string, object string) (xlMeta xlMetaV1, err error) {
var writer = new(bytes.Buffer) // Allocate 32k buffer, this is sufficient for the most of `xl.json`.
startOffset := int64(0) buf := make([]byte, 128*1024)
// Allocate 2MiB buffer, this is sufficient for the most of `xl.json`. // Allocate a new `xl.json` buffer writer.
buf := make([]byte, 2*1024*1024) var buffer = new(bytes.Buffer)
// Read until io.EOF. // Reads entire `xl.json`.
for { if err = copyBuffer(buffer, disk, bucket, path.Join(object, xlMetaJSONFile), buf); err != nil {
n, err := disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), startOffset, buf) return xlMetaV1{}, err
if err == io.EOF {
break
} }
if err != nil && err != io.EOF {
return nil, err // Unmarshal xl metadata.
d := json.NewDecoder(buffer)
if err = d.Decode(&xlMeta); err != nil {
return xlMetaV1{}, err
} }
writer.Write(buf[:n])
startOffset += n // Return structured `xl.json`.
} return xlMeta, nil
return writer.Bytes(), nil
} }