XL: Cleanup and add more comments. (#1807)

This commit is contained in:
Harshavardhana 2016-05-30 16:51:59 -07:00
parent ffc2b3c304
commit 445dc22118
13 changed files with 318 additions and 234 deletions

View File

@ -115,10 +115,8 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1)
// loadFormat - load format from disk.
func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
buffer := make([]byte, blockSizeV1)
offset := int64(0)
var n int64
n, err = disk.ReadFile(minioMetaBucket, formatConfigFile, offset, buffer)
var buffer []byte
buffer, err = readAll(disk, minioMetaBucket, formatConfigFile)
if err != nil {
// 'file not found' and 'volume not found' as
// same. 'volume not found' usually means it's a fresh disk.
@ -138,7 +136,7 @@ func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
return nil, err
}
format = &formatConfigV1{}
err = json.Unmarshal(buffer[:n], format)
err = json.Unmarshal(buffer, format)
if err != nil {
return nil, err
}

View File

@ -57,12 +57,12 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
// readFSMetadata - returns the object metadata `fs.json` content.
func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) {
buffer := make([]byte, blockSizeV1)
n, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0), buffer)
var buffer []byte
buffer, err = readAll(fs.storage, bucket, path.Join(object, fsMetaJSONFile))
if err != nil {
return fsMetaV1{}, err
}
err = json.Unmarshal(buffer[:n], &fsMeta)
err = json.Unmarshal(buffer, &fsMeta)
if err != nil {
return fsMetaV1{}, err
}

View File

@ -43,8 +43,8 @@ func (fs fsObjects) isBucketExist(bucket string) bool {
return true
}
// newMultipartUploadCommon - initialize a new multipart, is a common function for both object layers.
func (fs fsObjects) newMultipartUploadCommon(bucket string, object string, meta map[string]string) (uploadID string, err error) {
// newMultipartUpload - initialize a new multipart.
func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) {
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
@ -111,8 +111,8 @@ func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, er
return uploads, nil
}
// listMultipartUploadsCommon - lists all multipart uploads, common function for both object layers.
func (fs fsObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
// listMultipartUploads - lists all multipart uploads.
func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
result := ListMultipartsInfo{}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
@ -266,17 +266,17 @@ func (fs fsObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload
// ListMultipartUploads - list multipart uploads.
func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
return fs.listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
}
// NewMultipartUpload - initialize a new multipart upload, returns a unique id.
func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
meta = make(map[string]string) // Reset the meta value, we are not going to save headers for fs.
return fs.newMultipartUploadCommon(bucket, object, meta)
return fs.newMultipartUpload(bucket, object, meta)
}
// putObjectPart - put object part.
func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
@ -364,10 +364,10 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s
// PutObjectPart - writes the multipart upload chunks.
func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
return fs.putObjectPartCommon(bucket, object, uploadID, partID, size, data, md5Hex)
return fs.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex)
}
func (fs fsObjects) listObjectPartsCommon(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket}
@ -432,8 +432,9 @@ func (fs fsObjects) listObjectPartsCommon(bucket, object, uploadID string, partN
return result, nil
}
// ListObjectParts - list all parts.
func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
return fs.listObjectPartsCommon(bucket, object, uploadID, partNumberMarker, maxParts)
return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
}
// isUploadIDExists - verify if a given uploadID exists and is valid.
@ -450,6 +451,7 @@ func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool {
return true
}
// CompleteMultipartUpload - implement complete multipart upload transaction.
func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
@ -533,9 +535,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return s3MD5, nil
}
// abortMultipartUploadCommon - aborts a multipart upload, common
// function used by both object layers.
func (fs fsObjects) abortMultipartUploadCommon(bucket, object, uploadID string) error {
// abortMultipartUpload - aborts a multipart upload.
func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
@ -581,7 +582,7 @@ func (fs fsObjects) abortMultipartUploadCommon(bucket, object, uploadID string)
return nil
}
// AbortMultipartUpload - aborts a multipart upload.
// AbortMultipartUpload - aborts a multipart upload.
func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
return fs.abortMultipartUploadCommon(bucket, object, uploadID)
return fs.abortMultipartUpload(bucket, object, uploadID)
}

View File

@ -57,9 +57,8 @@ func newFSObjects(disk string) (ObjectLayer, error) {
}
}
// Initialize object layer - like creating minioMetaBucket,
// cleaning up tmp files etc.
initObjectLayer(storage)
// Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc.
fsHouseKeeping(storage)
// Return successfully initialized object layer.
return fsObjects{
@ -311,7 +310,7 @@ func isBucketExist(storage StorageAPI, bucketName string) bool {
return true
}
func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// Convert entry to FileInfo
entryToFileInfo := func(entry string) (fileInfo FileInfo, err error) {
if strings.HasSuffix(entry, slashSeparator) {
@ -443,5 +442,5 @@ func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxK
// ListObjects - list all objects.
func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return fs.listObjectsFS(bucket, prefix, marker, delimiter, maxKeys)
return fs.listObjects(bucket, prefix, marker, delimiter, maxKeys)
}

View File

@ -26,8 +26,25 @@ const (
blockSizeV1 = 10 * 1024 * 1024 // 10MiB.
)
// Common initialization needed for both object layers.
func initObjectLayer(storageDisks ...StorageAPI) error {
// fsHouseKeeping - house keeping for the FS backend: makes sure the
// minio meta volume exists and purges the temporary entries kept under
// it.
//
// Returns the first error that is not tolerated, nil otherwise.
func fsHouseKeeping(storageDisk StorageAPI) error {
	// Attempt to create the meta volume. A volume that already exists or
	// a disk that is not found is tolerated here; anything else is fatal.
	switch mkErr := storageDisk.MakeVol(minioMetaBucket); mkErr {
	case nil, errVolumeExists, errDiskNotFound:
		// Nothing to do, carry on with the cleanup.
	default:
		return mkErr
	}
	// Cleanup all temp entries upon start; its outcome is the outcome of
	// the whole house keeping run.
	return cleanupDir(storageDisk, minioMetaBucket, tmpMetaPrefix)
}
// House keeping code needed for XL.
func xlHouseKeeping(storageDisks []StorageAPI) error {
// This happens for the first time, but keep this here since this
// is the only place where it can be made expensive optimizing all
// other calls. Create minio meta volume, if it doesn't exist yet.

View File

@ -17,7 +17,6 @@
package main
import (
"math/rand"
"sort"
"strings"
"time"
@ -79,17 +78,8 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
return nil, err
}
// getRandomDisk - gives a random disk at any point in time from the
// available pool of disks (xl.storageDisks).
//
// rand.Intn(n) returns a value in the half-open interval [0, n), so the
// index is drawn against the full slice length: every disk, including
// the last one, can be selected, and a pool holding a single disk works.
// The pool must not be empty, rand.Intn panics when n <= 0.
func (xl xlObjects) getRandomDisk() (disk StorageAPI) {
	rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
	randIndex := rand.Intn(len(xl.storageDisks))
	return xl.storageDisks[randIndex] // Pick a random disk.
}
// treeWalkXL walks directory tree recursively pushing fileInfo into the channel as and when it encounters files.
func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int, isLeaf func(string, string) bool) bool {
// treeWalk walks directory tree recursively pushing fileInfo into the channel as and when it encounters files.
func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int, isLeaf func(string, string) bool) bool {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
@ -133,7 +123,7 @@ func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker strin
if recursive && !strings.HasSuffix(entry, slashSeparator) {
// We should not skip for recursive listing and if markerDir is a directory
// for ex. if marker is "four/five.txt" markerDir will be "four/" which
// should not be skipped, instead it will need to be treeWalkXL()'ed into.
// should not be skipped, instead it will need to be treeWalk()'ed into.
// Skip if it is a file though as it would be listed in previous listing.
*count--
@ -151,7 +141,7 @@ func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker strin
}
*count--
prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories.
if !xl.treeWalkXL(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) {
if !xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) {
return false
}
continue
@ -165,7 +155,7 @@ func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker strin
}
// Initiate a new treeWalk in a goroutine.
func (xl xlObjects) startTreeWalkXL(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalker {
func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalker {
// Example 1
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
@ -202,13 +192,13 @@ func (xl xlObjects) startTreeWalkXL(bucket, prefix, marker string, recursive boo
return false
}
}
xl.treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf)
xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf)
}()
return &walkNotify
}
// Save the goroutine reference in the map
func (xl xlObjects) saveTreeWalkXL(params listParams, walker *treeWalker) {
func (xl xlObjects) saveTreeWalk(params listParams, walker *treeWalker) {
xl.listObjectMapMutex.Lock()
defer xl.listObjectMapMutex.Unlock()
@ -219,7 +209,7 @@ func (xl xlObjects) saveTreeWalkXL(params listParams, walker *treeWalker) {
}
// Lookup the goroutine reference from map
func (xl xlObjects) lookupTreeWalkXL(params listParams) *treeWalker {
func (xl xlObjects) lookupTreeWalk(params listParams) *treeWalker {
xl.listObjectMapMutex.Lock()
defer xl.listObjectMapMutex.Unlock()

112
xl-v1-common.go Normal file
View File

@ -0,0 +1,112 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"math/rand"
"path"
"sync"
"time"
)
// getRandomDisk - gives a random disk at any point in time from the
// available pool of disks (xl.storageDisks).
//
// rand.Intn(n) returns a value in the half-open interval [0, n), so the
// index is drawn against the full slice length: every disk, including
// the last one, can be selected, and a pool holding a single disk works.
// The pool must not be empty, rand.Intn panics when n <= 0.
func (xl xlObjects) getRandomDisk() (disk StorageAPI) {
	rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
	randIndex := rand.Intn(len(xl.storageDisks))
	return xl.storageDisks[randIndex] // Pick a random disk.
}
// parentDirIsObject - reports whether `parent`, or any of its ancestor
// prefixes, is itself an object in `bucket`.
//
// Suppose the object is "a/b/c/d": called with parent "a/b/c" this
// checks "a/b/c", "a/b" and "a" in that order and returns true as soon
// as one of them is found to be an object, false if none is.
func (xl xlObjects) parentDirIsObject(bucket, parent string) bool {
	// path.Dir yields "." once a relative path runs out of elements.
	for p := parent; p != "."; {
		if xl.isObject(bucket, p) {
			// There is already a file at prefix "p".
			return true
		}
		// Move on to the next parent path.
		next := path.Dir(p)
		if next == p {
			// path.Dir is a fixed point at the root ("/"); stop here
			// instead of walking the same path forever.
			return false
		}
		p = next
	}
	return false
}
// isObject - reports whether `prefix` inside `bucket` is an object, by
// stat'ing its `xl.json` on all the disks concurrently.
//
// Returns false when the number of disks answering errFileNotFound
// exceeds len(storageDisks)-readQuorum, or as soon as any other error is
// met (that error is passed to errorIf). Returns true otherwise.
func (xl xlObjects) isObject(bucket, prefix string) bool {
	metaPath := path.Join(prefix, xlMetaJSONFile)

	// One result slot per disk, each routine writes only its own slot.
	statErrs := make([]error, len(xl.storageDisks))
	var wg sync.WaitGroup
	for i, d := range xl.storageDisks {
		wg.Add(1)
		// Stat the metadata file on this disk in a routine.
		go func(i int, d StorageAPI) {
			defer wg.Done()
			_, statErrs[i] = d.StatFile(bucket, metaPath)
		}(i, d)
	}
	// Wait for all the Stat operations to finish.
	wg.Wait()

	// Number of 'file not found' answers that can be tolerated.
	maxNotFound := len(xl.storageDisks) - xl.readQuorum
	notFound := 0
	for _, statErr := range statErrs {
		switch statErr {
		case nil:
			// Metadata file is present on this disk.
		case errFileNotFound:
			notFound++
			if notFound > maxNotFound {
				return false
			}
		default:
			errorIf(statErr, "Unable to access file "+path.Join(bucket, prefix))
			return false
		}
	}
	return true
}
// statPart - stats the part file `objectPart` in `bucket`, returning the
// first successful answer obtained from randomly selected disks.
//
// As many attempts are made as there are disks. On failure of all the
// attempts an empty FileInfo and the error of the last attempt are
// returned.
func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) {
	for attempt := 0; attempt < len(xl.storageDisks); attempt++ {
		// A disk is chosen at random on every attempt, so the same disk
		// may get picked more than once.
		fileInfo, err = xl.getRandomDisk().StatFile(bucket, objectPart)
		if err != nil {
			continue
		}
		return fileInfo, nil
	}
	return FileInfo{}, err
}

View File

@ -1,17 +1,34 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "strings"
func (xl xlObjects) listObjectsXL(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// listObjects - wrapper function implemented over file tree walk.
func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
// Default is recursive, if delimiter is set then list non recursive.
recursive := true
if delimiter == slashSeparator {
recursive = false
}
walker := xl.lookupTreeWalkXL(listParams{bucket, recursive, marker, prefix})
walker := xl.lookupTreeWalk(listParams{bucket, recursive, marker, prefix})
if walker == nil {
walker = xl.startTreeWalkXL(bucket, prefix, marker, recursive, xl.isObject)
walker = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject)
}
var objInfos []ObjectInfo
var eof bool
@ -57,7 +74,7 @@ func (xl xlObjects) listObjectsXL(bucket, prefix, marker, delimiter string, maxK
}
params := listParams{bucket, recursive, nextMarker, prefix}
if !eof {
xl.saveTreeWalkXL(params, walker)
xl.saveTreeWalk(params, walker)
}
result := ListObjectsInfo{IsTruncated: !eof}
@ -128,7 +145,7 @@ func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
}
// Initiate a list operation, if successful filter and return quickly.
listObjInfo, err := xl.listObjectsXL(bucket, prefix, marker, delimiter, maxKeys)
listObjInfo, err := xl.listObjects(bucket, prefix, marker, delimiter, maxKeys)
if err == nil {
// We got the entries successfully return.
return listObjInfo, nil

View File

@ -18,7 +18,6 @@ package main
import (
"encoding/json"
"math/rand"
"path"
"sort"
"sync"
@ -40,6 +39,13 @@ type objectPartInfo struct {
Size int64 `json:"size"`
}
// byPartNumber is a collection satisfying sort.Interface, it orders
// object parts by their ascending part Number.
type byPartNumber []objectPartInfo
func (t byPartNumber) Len() int { return len(t) }
func (t byPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
// A xlMetaV1 represents a metadata header mapping keys to sets of values.
type xlMetaV1 struct {
Version string `json:"version"`
@ -69,12 +75,19 @@ type xlMetaV1 struct {
Parts []objectPartInfo `json:"parts,omitempty"`
}
// byPartNumber is a collection satisfying sort.Interface, it orders
// object parts by their ascending part Number.
type byPartNumber []objectPartInfo
func (t byPartNumber) Len() int { return len(t) }
func (t byPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
// newXLMetaV1 - initializes a new xlMetaV1 for the given number of data
// and parity blocks.
//
// Version, format, release tag, erasure algorithm and block size are
// filled from fixed values; the erasure distribution is obtained from
// randInts over dataBlocks+parityBlocks entries.
func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
	// The named result starts out as the zero value, only the fields
	// that carry information are populated.
	totalBlocks := dataBlocks + parityBlocks

	// Erasure coding parameters.
	xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost
	xlMeta.Erasure.BlockSize = blockSizeV1
	xlMeta.Erasure.DataBlocks = dataBlocks
	xlMeta.Erasure.ParityBlocks = parityBlocks
	xlMeta.Erasure.Distribution = randInts(totalBlocks)

	// Format and release identification.
	xlMeta.Version = "1"
	xlMeta.Format = "xl"
	xlMeta.Minio.Release = minioReleaseTag
	return xlMeta
}
// ObjectPartIndex - returns the index of matching object part number.
func (m xlMetaV1) ObjectPartIndex(partNumber int) (index int) {
@ -139,16 +152,13 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
// Count for errors encountered.
var xlJSONErrCount = 0
// Allocate 10MiB buffer.
buffer := make([]byte, blockSizeV1)
// Return the first successful lookup from a random list of disks.
for xlJSONErrCount < len(xl.storageDisks) {
disk := xl.getRandomDisk() // Choose a random disk on each attempt.
var n int64
n, err = disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), int64(0), buffer)
var buffer []byte
buffer, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile))
if err == nil {
err = json.Unmarshal(buffer[:n], &xlMeta)
err = json.Unmarshal(buffer, &xlMeta)
if err == nil {
return xlMeta, nil
}
@ -158,20 +168,6 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
return xlMetaV1{}, err
}
// newXLMetaV1 - initializes a new xlMetaV1 for the given number of data
// and parity blocks.
//
// Version, format, release tag, erasure algorithm and block size are
// filled from fixed values; the erasure distribution is obtained from
// randErasureDistribution over dataBlocks+parityBlocks entries.
func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
	// The named result starts out as the zero value, only the fields
	// that carry information are populated.
	totalBlocks := dataBlocks + parityBlocks

	// Erasure coding parameters.
	xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost
	xlMeta.Erasure.BlockSize = blockSizeV1
	xlMeta.Erasure.DataBlocks = dataBlocks
	xlMeta.Erasure.ParityBlocks = parityBlocks
	xlMeta.Erasure.Distribution = randErasureDistribution(totalBlocks)

	// Format and release identification.
	xlMeta.Version = "1"
	xlMeta.Format = "xl"
	xlMeta.Minio.Release = minioReleaseTag
	return xlMeta
}
// renameXLMetadata - renames `xl.json` from source prefix to destination prefix.
func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix string) error {
var wg = &sync.WaitGroup{}
@ -234,6 +230,7 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
mErrs[index] = err
return
}
// Persist marshalled data.
n, mErr := disk.AppendFile(bucket, jsonFile, metadataBytes)
if mErr != nil {
mErrs[index] = mErr
@ -259,18 +256,3 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
}
return nil
}
// randErasureDistribution - returns the numbers 1..numBlocks in a random
// order, shuffled with the Knuth Fisher-Yates algorithm.
//
// The global math/rand source is re-seeded with the current time on
// every call.
func randErasureDistribution(numBlocks int) []int {
	rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.

	// Start from the identity arrangement 1, 2, ..., numBlocks.
	shuffled := make([]int, numBlocks)
	for pos := range shuffled {
		shuffled[pos] = pos + 1
	}
	// Swap every position with a partner chosen uniformly from the
	// positions not yet fixed, i.e. from [pos, numBlocks-1].
	for pos := range shuffled {
		pick := pos + rand.Intn(numBlocks-pos)
		shuffled[pos], shuffled[pick] = shuffled[pick], shuffled[pos]
	}
	return shuffled
}

View File

@ -81,13 +81,13 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI
// Read `uploads.json` in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
var buffer = make([]byte, blockSizeV1) // Allocate blockSized buffer.
n, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0), buffer)
// Read all of 'uploads.json'
buffer, rErr := readAll(disk, minioMetaBucket, uploadJSONPath)
if rErr != nil {
errs[index] = rErr
return
}
rErr = json.Unmarshal(buffer[:n], &uploads[index])
rErr = json.Unmarshal(buffer, &uploads[index])
if rErr != nil {
errs[index] = rErr
return
@ -331,9 +331,8 @@ func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo
return uploadsInfo, nil
}
// listMultipartUploadsCommon - lists all multipart uploads, common
// function for both object layers.
func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
// listMultipartUploads - lists all multipart uploads.
func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
result := ListMultipartsInfo{}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
@ -409,9 +408,9 @@ func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload
maxUploads = maxUploads - len(uploads)
}
if maxUploads > 0 {
walker := xl.lookupTreeWalkXL(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
walker := xl.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
if walker == nil {
walker = xl.startTreeWalkXL(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload)
walker = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload)
}
for maxUploads > 0 {
walkResult, ok := <-walker.ch

View File

@ -31,13 +31,11 @@ import (
// ListMultipartUploads - list multipart uploads.
func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
return xl.listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
return xl.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
}
/// Common multipart object layer functions.
// newMultipartUploadCommon - initialize a new multipart, is a common function for both object layers.
func (xl xlObjects) newMultipartUploadCommon(bucket string, object string, meta map[string]string) (uploadID string, err error) {
// newMultipartUpload - initialize a new multipart.
func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) {
// Verify if bucket name is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
@ -96,11 +94,11 @@ func (xl xlObjects) newMultipartUploadCommon(bucket string, object string, meta
// NewMultipartUpload - initialize a new multipart upload, returns a unique id.
func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
return xl.newMultipartUploadCommon(bucket, object, meta)
return xl.newMultipartUpload(bucket, object, meta)
}
// putObjectPartCommon - put object part.
func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
// putObjectPart - put object part.
func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
@ -233,11 +231,11 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
// PutObjectPart - writes the multipart upload chunks.
func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
return xl.putObjectPartCommon(bucket, object, uploadID, partID, size, data, md5Hex)
return xl.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex)
}
// ListObjectParts - list object parts, common function across both object layers.
func (xl xlObjects) listObjectPartsCommon(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// listObjectParts - list object parts.
func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket}
@ -319,7 +317,7 @@ func (xl xlObjects) listObjectPartsCommon(bucket, object, uploadID string, partN
// ListObjectParts - list object parts.
func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
return xl.listObjectPartsCommon(bucket, object, uploadID, partNumberMarker, maxParts)
return xl.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
}
func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
@ -476,8 +474,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
return s3MD5, nil
}
// abortMultipartUploadCommon - aborts a multipart upload, common function used by both object layers.
func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string) error {
// abortMultipartUpload - aborts a multipart upload.
func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
@ -528,5 +526,5 @@ func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string)
// AbortMultipartUpload - aborts a multipart upload.
func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
return xl.abortMultipartUploadCommon(bucket, object, uploadID)
return xl.abortMultipartUpload(bucket, object, uploadID)
}

View File

@ -1,85 +1,59 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"path"
"sync"
"bytes"
"io"
"math/rand"
"time"
)
// This function does the following check, suppose
// object is "a/b/c/d", stat makes sure that objects "a/b/c",
// "a/b" and "a" do not exist.
func (xl xlObjects) parentDirIsObject(bucket, parent string) bool {
var isParentDirObject func(string) bool
isParentDirObject = func(p string) bool {
if p == "." {
return false
}
if xl.isObject(bucket, p) {
// If there is already a file at prefix "p" return error.
return true
}
// Check if there is a file as one of the parent paths.
return isParentDirObject(path.Dir(p))
// randInts - uses Knuth Fisher-Yates shuffle algorithm for generating uniform shuffling.
func randInts(count int) []int {
rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
ints := make([]int, count)
for i := 0; i < count; i++ {
ints[i] = i + 1
}
return isParentDirObject(parent)
for i := 0; i < count; i++ {
// Choose index uniformly in [i, count-1]
r := i + rand.Intn(count-i)
ints[r], ints[i] = ints[i], ints[r]
}
return ints
}
func (xl xlObjects) isObject(bucket, prefix string) bool {
// Create errs and volInfo slices of storageDisks size.
var errs = make([]error, len(xl.storageDisks))
// Allocate a new waitgroup.
var wg = &sync.WaitGroup{}
for index, disk := range xl.storageDisks {
wg.Add(1)
// Stat file on all the disks in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
_, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile))
if err != nil {
errs[index] = err
return
}
errs[index] = nil
}(index, disk)
}
// Wait for all the Stat operations to finish.
wg.Wait()
var errFileNotFoundCount int
for _, err := range errs {
if err != nil {
if err == errFileNotFound {
errFileNotFoundCount++
// If we have errors with file not found greater than allowed read
// quorum we return err as errFileNotFound.
if errFileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
return false
}
continue
}
errorIf(err, "Unable to access file "+path.Join(bucket, prefix))
return false
// readAll reads bucket/object from the given disk until io.EOF and returns the data read; any other error is returned as is.
func readAll(disk StorageAPI, bucket, object string) ([]byte, error) {
var writer = new(bytes.Buffer)
startOffset := int64(0)
// Read until io.EOF.
for {
buf := make([]byte, blockSizeV1)
n, err := disk.ReadFile(bucket, object, startOffset, buf)
if err == io.EOF {
break
}
}
return true
}
// statPart - stat a part file.
func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) {
// Count for errors encountered.
var xlJSONErrCount = 0
// Return the first success entry based on the selected random disk.
for xlJSONErrCount < len(xl.storageDisks) {
// Choose a random disk on each attempt.
disk := xl.getRandomDisk()
fileInfo, err = disk.StatFile(bucket, objectPart)
if err == nil {
return fileInfo, nil
if err != nil && err != io.EOF {
return nil, err
}
xlJSONErrCount++ // Update error count.
writer.Write(buf[:n])
startOffset += n
}
return FileInfo{}, err
return writer.Bytes(), nil
}

View File

@ -27,32 +27,38 @@ import (
"github.com/minio/minio/pkg/disk"
)
// XL constants - names of the metadata files used by the XL backend.
const (
	// Format config file carries backend format specific details.
	formatConfigFile = "format.json"

	// XL metadata file carries per object metadata.
	xlMetaJSONFile = "xl.json"

	// Uploads metadata file carries per multipart object metadata.
	uploadsJSONFile = "uploads.json"
)
// xlObjects - Implements XL object layer.
type xlObjects struct {
	storageDisks  []StorageAPI // Collection of initialized backend disks.
	physicalDisks []string     // Collection of regular disks.
	dataBlocks    int          // dataBlocks count calculated for erasure.
	parityBlocks  int          // parityBlocks count calculated for erasure.
	readQuorum    int          // readQuorum minimum required disks to read data.
	writeQuorum   int          // writeQuorum minimum required disks to write data.

	// List pool management.
	listObjectMap      map[listParams][]*treeWalker
	listObjectMapMutex *sync.Mutex
}
// Errors describing an unsupported number of disks.
var (
	// errMaxDisks - returned when the disk count is above the supported maximum.
	errMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'")

	// errXLMaxDisks - returned when the disk count is above the supported maximum.
	errXLMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'")

	// errMinDisks - returned when the disk count is below the supported minimum.
	errMinDisks = errors.New("Number of disks are smaller than supported minimum count '8'")

	// errXLMinDisks - returned when the disk count is below the supported minimum.
	errXLMinDisks = errors.New("Number of disks are smaller than supported minimum count '8'")

	// errNumDisks - returned when the disk count is odd.
	errNumDisks = errors.New("Number of disks should be multiples of '2'")

	// errXLNumDisks - returned when the disk count is odd.
	errXLNumDisks = errors.New("Number of disks should be multiples of '2'")
)
const (
// Maximum erasure blocks.
@ -61,14 +67,15 @@ const (
minErasureBlocks = 8
)
// Validate if input disks are sufficient for initializing XL.
func checkSufficientDisks(disks []string) error {
// Verify total number of disks.
totalDisks := len(disks)
if totalDisks > maxErasureBlocks {
return errMaxDisks
return errXLMaxDisks
}
if totalDisks < minErasureBlocks {
return errMinDisks
return errXLMinDisks
}
// isEven function to verify if a given number is even.
@ -77,16 +84,16 @@ func checkSufficientDisks(disks []string) error {
}
// Verify if we have even number of disks.
// only combination of 8, 10, 12, 14, 16 are supported.
// only combination of 8, 12, 16 are supported.
if !isEven(totalDisks) {
return errNumDisks
return errXLNumDisks
}
return nil
}
// Depending on the disk type network or local, initialize storage layer.
func newStorageLayer(disk string) (storage StorageAPI, err error) {
// Depending on the disk type network or local, initialize storage API.
func newStorageAPI(disk string) (storage StorageAPI, err error) {
if !strings.ContainsRune(disk, ':') || filepath.VolumeName(disk) != "" {
// Initialize filesystem storage API.
return newPosix(disk)
@ -95,37 +102,27 @@ func newStorageLayer(disk string) (storage StorageAPI, err error) {
return newRPCClient(disk)
}
// bootstrapDisks - builds one storage handle per entry in disks, in the
// same order as the input.
//
// An errDiskNotFound result is tolerated on purpose: whatever
// newStorageLayer returned for that disk is kept in its slot, leaving
// later XL/Erasure calls to deal with the missing disk. Any other error
// stops the bootstrap and is returned with a nil slice.
func bootstrapDisks(disks []string) ([]StorageAPI, error) {
	initialized := make([]StorageAPI, len(disks))
	for i := range disks {
		api, err := newStorageLayer(disks[i])
		initialized[i] = api
		if err == nil || err == errDiskNotFound {
			continue
		}
		return nil, err
	}
	return initialized, nil
}
// newXLObjects - initialize new xl object layer.
func newXLObjects(disks []string) (ObjectLayer, error) {
// Validate if input disks are sufficient.
if err := checkSufficientDisks(disks); err != nil {
return nil, err
}
// Bootstrap disks.
storageDisks, err := bootstrapDisks(disks)
if err != nil {
return nil, err
storageDisks := make([]StorageAPI, len(disks))
for index, disk := range disks {
var err error
// Intentionally ignore disk not found errors. XL will
// manage such errors internally.
storageDisks[index], err = newStorageAPI(disk)
if err != nil && err != errDiskNotFound {
return nil, err
}
}
// Initialize object layer - like creating minioMetaBucket, cleaning up tmp files etc.
initObjectLayer(storageDisks...)
// Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc.
xlHouseKeeping(storageDisks)
// Load saved XL format.json and validate.
newPosixDisks, err := loadFormatXL(storageDisks)