From b2293c2bf408cafc77edcac1ca74e06a70791ea4 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 26 May 2016 03:15:01 -0700 Subject: [PATCH] XL: Rename, cleanup and add more comments. (#1769) - xl-v1-bucket.go - removes a whole bunch of code. - {xl-v1,fs-v1}-metadata.go - add a lot of comments and rename functions appropriately. --- docs/backend/README.md | 0 docs/backend/{json-files => }/fs/format.json | 0 docs/backend/{json-files => }/fs/fs.json | 0 docs/backend/{json-files => }/fs/uploads.json | 0 docs/backend/{json-files => }/xl/format.json | 0 docs/backend/{json-files => }/xl/uploads.json | 0 docs/backend/{json-files => }/xl/xl.json | 0 erasure.go | 9 +- fs-v1-metadata.go | 51 ++-- fs-v1-multipart.go | 9 +- httprange.go | 7 +- object-handlers.go | 2 +- tree-walk-xl.go | 5 +- xl-v1-bucket.go | 242 ++++-------------- xl-v1-metadata.go | 97 ++++--- xl-v1-multipart-common.go | 7 +- xl-v1-multipart.go | 37 +-- xl-v1-object.go | 27 +- xl-v1-utils.go | 4 +- 19 files changed, 207 insertions(+), 290 deletions(-) create mode 100644 docs/backend/README.md rename docs/backend/{json-files => }/fs/format.json (100%) rename docs/backend/{json-files => }/fs/fs.json (100%) rename docs/backend/{json-files => }/fs/uploads.json (100%) rename docs/backend/{json-files => }/xl/format.json (100%) rename docs/backend/{json-files => }/xl/uploads.json (100%) rename docs/backend/{json-files => }/xl/xl.json (100%) diff --git a/docs/backend/README.md b/docs/backend/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/docs/backend/json-files/fs/format.json b/docs/backend/fs/format.json similarity index 100% rename from docs/backend/json-files/fs/format.json rename to docs/backend/fs/format.json diff --git a/docs/backend/json-files/fs/fs.json b/docs/backend/fs/fs.json similarity index 100% rename from docs/backend/json-files/fs/fs.json rename to docs/backend/fs/fs.json diff --git a/docs/backend/json-files/fs/uploads.json b/docs/backend/fs/uploads.json similarity index 100% rename from docs/backend/json-files/fs/uploads.json rename to docs/backend/fs/uploads.json diff --git a/docs/backend/json-files/xl/format.json b/docs/backend/xl/format.json similarity index 100% rename from docs/backend/json-files/xl/format.json rename to docs/backend/xl/format.json diff --git a/docs/backend/json-files/xl/uploads.json b/docs/backend/xl/uploads.json similarity index 100% rename from docs/backend/json-files/xl/uploads.json rename to docs/backend/xl/uploads.json diff --git a/docs/backend/json-files/xl/xl.json b/docs/backend/xl/xl.json similarity index 100% rename from docs/backend/json-files/xl/xl.json rename to docs/backend/xl/xl.json diff --git a/erasure.go b/erasure.go index 1eb04b807..f41a1eb40 100644 --- a/erasure.go +++ b/erasure.go @@ -16,11 +16,7 @@ package main -import ( - "errors" - - "github.com/klauspost/reedsolomon" -) +import "github.com/klauspost/reedsolomon" // erasure storage layer. type erasure struct { @@ -30,9 +26,6 @@ type erasure struct { storageDisks []StorageAPI } -// errUnexpected - returned for any unexpected error. -var errUnexpected = errors.New("Unexpected error - please report at https://github.com/minio/minio/issues") - // newErasure instantiate a new erasure. func newErasure(disks []StorageAPI) *erasure { // Initialize E. 
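The erasure layer above now imports only github.com/klauspost/reedsolomon. For context, a minimal, self-contained sketch of how that library splits data into shards and computes parity; the 4+2 layout and the sample payload are illustrative and not part of this patch.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	const dataBlocks, parityBlocks = 4, 2

	// Create an encoder with 4 data and 2 parity shards.
	enc, err := reedsolomon.New(dataBlocks, parityBlocks)
	if err != nil {
		log.Fatal(err)
	}

	// Split the payload into data shards and compute the parity shards.
	payload := bytes.Repeat([]byte("minio"), 1024)
	shards, err := enc.Split(payload)
	if err != nil {
		log.Fatal(err)
	}
	if err = enc.Encode(shards); err != nil {
		log.Fatal(err)
	}

	// Verify that parity is consistent with the data shards.
	ok, err := enc.Verify(shards)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("shards:", len(shards), "verified:", ok)
}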
diff --git a/fs-v1-metadata.go b/fs-v1-metadata.go index 2e5bab9eb..a9c980eac 100644 --- a/fs-v1-metadata.go +++ b/fs-v1-metadata.go @@ -44,28 +44,42 @@ func (m fsMetaV1) WriteTo(writer io.Writer) (n int64, err error) { return int64(p), err } -// SearchObjectPart - search object part name and etag. -func (m fsMetaV1) SearchObjectPart(number int) int { +// ObjectPartIndex - returns the index of matching object part number. +func (m fsMetaV1) ObjectPartIndex(partNumber int) (partIndex int) { for i, part := range m.Parts { - if number == part.Number { - return i + if partNumber == part.Number { + partIndex = i + return partIndex } } return -1 } // AddObjectPart - add a new object part in order. -func (m *fsMetaV1) AddObjectPart(number int, name string, etag string, size int64) { - m.Parts = append(m.Parts, objectPartInfo{ - Number: number, - Name: name, - ETag: etag, - Size: size, - }) +func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) { + partInfo := objectPartInfo{ + Number: partNumber, + Name: partName, + ETag: partETag, + Size: partSize, + } + + // Update part info if it already exists. + for i, part := range m.Parts { + if partNumber == part.Number { + m.Parts[i] = partInfo + return + } + } + + // Proceed to include new part info. + m.Parts = append(m.Parts, partInfo) + + // Parts in fsMeta should be in sorted order by part number. sort.Sort(byPartNumber(m.Parts)) } -// readFSMetadata - read `fs.json`. +// readFSMetadata - returns the object metadata `fs.json` content. func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) { r, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0)) if err != nil { @@ -79,10 +93,17 @@ func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err return fsMeta, nil } -// writeFSMetadata - write `fs.json`. -func (fs fsObjects) writeFSMetadata(bucket, prefix string, fsMeta fsMetaV1) error { - // Initialize metadata map, save all erasure related metadata. +// newFSMetaV1 - initializes new fsMetaV1. +func newFSMetaV1() (fsMeta fsMetaV1) { + fsMeta = fsMetaV1{} + fsMeta.Version = "1" + fsMeta.Format = "fs" fsMeta.Minio.Release = minioReleaseTag + return fsMeta +} + +// writeFSMetadata - writes `fs.json` metadata. +func (fs fsObjects) writeFSMetadata(bucket, prefix string, fsMeta fsMetaV1) error { w, err := fs.storage.CreateFile(bucket, path.Join(prefix, fsMetaJSONFile)) if err != nil { return err diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index a42d055c7..da53477c9 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -63,9 +63,8 @@ func (fs fsObjects) newMultipartUploadCommon(bucket string, object string, meta meta = make(map[string]string) } - fsMeta := fsMetaV1{} - fsMeta.Format = "fs" - fsMeta.Version = "1" + // Initialize `fs.json` values. + fsMeta := newFSMetaV1() // This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/" nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) @@ -454,7 +453,7 @@ func (fs fsObjects) listObjectPartsCommon(bucket, object, uploadID string, partN return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath) } // Only parts with higher part numbers will be listed. 
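The new AddObjectPart above replaces a blind append with update-or-append followed by a sort on part number. A standalone sketch of that bookkeeping, assuming a trimmed-down part struct (partInfo below is illustrative, not the patch's objectPartInfo).

package main

import (
	"fmt"
	"sort"
)

// partInfo is a trimmed-down stand-in for the patch's objectPartInfo.
type partInfo struct {
	Number int
	Name   string
	ETag   string
	Size   int64
}

// addObjectPart updates an existing entry in place or appends a new one,
// keeping the slice sorted by part number.
func addObjectPart(parts []partInfo, p partInfo) []partInfo {
	for i := range parts {
		if parts[i].Number == p.Number {
			parts[i] = p // Overwrite a re-uploaded part.
			return parts
		}
	}
	parts = append(parts, p)
	sort.Slice(parts, func(i, j int) bool { return parts[i].Number < parts[j].Number })
	return parts
}

func main() {
	var parts []partInfo
	parts = addObjectPart(parts, partInfo{Number: 2, Name: "object2", Size: 5 << 20})
	parts = addObjectPart(parts, partInfo{Number: 1, Name: "object1", Size: 5 << 20})
	parts = addObjectPart(parts, partInfo{Number: 1, Name: "object1", Size: 6 << 20}) // Re-upload of part 1.
	fmt.Println(parts)
}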
- partIdx := fsMeta.SearchObjectPart(partNumberMarker) + partIdx := fsMeta.ObjectPartIndex(partNumberMarker) parts := fsMeta.Parts if partIdx != -1 { parts = fsMeta.Parts[partIdx+1:] @@ -642,7 +641,7 @@ func (fs fsObjects) abortMultipartUploadCommon(bucket, object, uploadID string) // the object, if yes do not attempt to delete 'uploads.json'. uploadIDs, err := getUploadIDs(bucket, object, fs.storage) if err == nil { - uploadIDIdx := uploadIDs.SearchUploadID(uploadID) + uploadIDIdx := uploadIDs.Index(uploadID) if uploadIDIdx != -1 { uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...) } diff --git a/httprange.go b/httprange.go index b00a37771..4bb23806e 100644 --- a/httprange.go +++ b/httprange.go @@ -28,13 +28,10 @@ const ( ) // InvalidRange - invalid range -type InvalidRange struct { - Start int64 - Length int64 -} +type InvalidRange struct{} func (e InvalidRange) Error() string { - return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length) + return "The requested range is not satisfiable" } // HttpRange specifies the byte range to be sent to the client. diff --git a/object-handlers.go b/object-handlers.go index b67fc2b2f..f20cc2eb6 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -136,7 +136,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(w, r, apiErr, r.URL.Path) return } - defer readCloser.Close() // Close after this handler returns. + defer readCloser.Close() // Set standard object headers. setObjectHeaders(w, objInfo, hrange) diff --git a/tree-walk-xl.go b/tree-walk-xl.go index 7d4066217..ea55c60c3 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -54,9 +54,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) // Return the first success entry based on the selected random disk. for listErrCount < len(xl.storageDisks) { - // Choose a random disk on each attempt, do not hit the same disk all the time. - randIndex := rand.Intn(len(xl.storageDisks) - 1) - disk := xl.storageDisks[randIndex] // Pick a random disk. + disk := xl.getRandomDisk() // Choose a random disk on each attempt. if entries, err = disk.ListDir(bucket, prefixDir); err == nil { // Skip the entries which do not match the filter. for i, entry := range entries { @@ -85,6 +83,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) // getRandomDisk - gives a random disk at any point in time from the // available pool of disks. func (xl xlObjects) getRandomDisk() (disk StorageAPI) { + rand.Seed(time.Now().UTC().UnixNano()) randIndex := rand.Intn(len(xl.storageDisks) - 1) disk = xl.storageDisks[randIndex] // Pick a random disk. return disk diff --git a/xl-v1-bucket.go b/xl-v1-bucket.go index 001cab790..db2d972de 100644 --- a/xl-v1-bucket.go +++ b/xl-v1-bucket.go @@ -64,234 +64,102 @@ func (xl xlObjects) MakeBucket(bucket string) error { if volumeExistsErrCnt == len(xl.storageDisks) { return toObjectErr(errVolumeExists, bucket) } else if createVolErr > len(xl.storageDisks)-xl.writeQuorum { - // Return errWriteQuorum if errors were more than - // allowed write quorum. + // Return errWriteQuorum if errors were more than allowed write quorum. return toObjectErr(errWriteQuorum, bucket) } return nil } -// getAllBucketInfo - list bucket info from all disks. -// Returns error slice indicating the failed volume stat operations. 
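listDir above now leans on getRandomDisk and keeps retrying until some disk answers. A self-contained sketch of that retry pattern over a hypothetical lister interface; the names are illustrative, and this sketch picks with rand.Intn(len(disks)) so every disk is a candidate.

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// lister is an illustrative stand-in for the StorageAPI call used above.
type lister interface {
	ListDir(bucket, prefixDir string) ([]string, error)
}

// listFromRandomDisk tries random disks until one succeeds or as many
// attempts as there are disks have failed, mirroring the loop in listDir.
func listFromRandomDisk(disks []lister, bucket, prefixDir string) (entries []string, err error) {
	for attempts := 0; attempts < len(disks); attempts++ {
		disk := disks[rand.Intn(len(disks))] // Pick any disk at random.
		if entries, err = disk.ListDir(bucket, prefixDir); err == nil {
			return entries, nil
		}
	}
	return nil, err
}

type fakeDisk struct{ down bool }

func (d fakeDisk) ListDir(bucket, prefixDir string) ([]string, error) {
	if d.down {
		return nil, errors.New("disk offline")
	}
	return []string{"a.txt", "b/"}, nil
}

func main() {
	disks := []lister{fakeDisk{down: true}, fakeDisk{}, fakeDisk{}}
	entries, err := listFromRandomDisk(disks, "bucket", "")
	fmt.Println(entries, err)
}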
-func (xl xlObjects) getAllBucketInfo(bucketName string) ([]BucketInfo, []error) { - // Create errs and volInfo slices of storageDisks size. - var errs = make([]error, len(xl.storageDisks)) - var volsInfo = make([]VolInfo, len(xl.storageDisks)) +// getBucketInfo - returns the BucketInfo from one of the disks picked +// at random. +func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) { + // Count for errors encountered. + var bucketErrCount = 0 - // Allocate a new waitgroup. - var wg = &sync.WaitGroup{} - for index, disk := range xl.storageDisks { - wg.Add(1) - // Stat volume on all the disks in a routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - volInfo, err := disk.StatVol(bucketName) - if err != nil { - errs[index] = err - return - } - volsInfo[index] = volInfo - errs[index] = nil - }(index, disk) - } - - // Wait for all the Stat operations to finish. - wg.Wait() - - // Return the concocted values. - var bucketsInfo = make([]BucketInfo, len(xl.storageDisks)) - for _, volInfo := range volsInfo { - if IsValidBucketName(volInfo.Name) { - bucketsInfo = append(bucketsInfo, BucketInfo{ + // Return the first successful lookup from a random list of disks. + for bucketErrCount < len(xl.storageDisks) { + disk := xl.getRandomDisk() // Choose a random disk on each attempt. + var volInfo VolInfo + volInfo, err = disk.StatVol(bucketName) + if err == nil { + bucketInfo = BucketInfo{ Name: volInfo.Name, Created: volInfo.Created, - }) - } - } - return bucketsInfo, errs -} - -// listAllBucketInfo - list all stat volume info from all disks. -// Returns -// - stat volume info for all online disks. -// - boolean to indicate if healing is necessary. -// - error if any. -func (xl xlObjects) listAllBucketInfo(bucketName string) ([]BucketInfo, bool, error) { - bucketsInfo, errs := xl.getAllBucketInfo(bucketName) - notFoundCount := 0 - for _, err := range errs { - if err == errVolumeNotFound { - notFoundCount++ - // If we have errors with file not found greater than allowed read - // quorum we return err as errFileNotFound. - if notFoundCount > len(xl.storageDisks)-xl.readQuorum { - return nil, false, errVolumeNotFound } + return bucketInfo, nil } + bucketErrCount++ // Update error count. } - - // Calculate online disk count. - onlineDiskCount := 0 - for index := range errs { - if errs[index] == nil { - onlineDiskCount++ - } - } - - var heal bool - // If online disks count is lesser than configured disks, most - // probably we need to heal the file, additionally verify if the - // count is lesser than readQuorum, if not we throw an error. - if onlineDiskCount < len(xl.storageDisks) { - // Online disks lesser than total storage disks, needs to be - // healed. unless we do not have readQuorum. - heal = true - // Verify if online disks count are lesser than readQuorum - // threshold, return an error if yes. - if onlineDiskCount < xl.readQuorum { - return nil, false, errReadQuorum - } - } - - // Return success. - return bucketsInfo, heal, nil + return BucketInfo{}, err } // Checks whether bucket exists. -func (xl xlObjects) isBucketExist(bucketName string) bool { +func (xl xlObjects) isBucketExist(bucket string) bool { + nsMutex.RLock(bucket, "") + defer nsMutex.RUnlock(bucket, "") + // Check whether bucket exists. 
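The bucket operations above boil down to counting per-disk errors against a quorum, as in MakeBucket's write-quorum check. A minimal sketch of that accounting; the helper name and the 8-disk, quorum-of-5 values are made up for the example.

package main

import (
	"errors"
	"fmt"
)

var errWriteQuorum = errors.New("write quorum not met")

// checkWriteQuorum mirrors the accounting in MakeBucket above: the
// operation succeeds only if the failures leave at least writeQuorum
// disks that completed the write.
func checkWriteQuorum(errs []error, writeQuorum int) error {
	failed := 0
	for _, err := range errs {
		if err != nil {
			failed++
		}
	}
	if failed > len(errs)-writeQuorum {
		return errWriteQuorum
	}
	return nil
}

func main() {
	// 8 disks, write quorum of 5 (illustrative values): 3 failures still pass.
	errs := make([]error, 8)
	errs[0], errs[1], errs[2] = errors.New("disk offline"), errors.New("disk offline"), errors.New("disk offline")
	fmt.Println(checkWriteQuorum(errs, 5)) // <nil>
	errs[3] = errors.New("disk offline")
	fmt.Println(checkWriteQuorum(errs, 5)) // write quorum not met
}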
- _, _, err := xl.listAllBucketInfo(bucketName) + _, err := xl.getBucketInfo(bucket) if err != nil { if err == errVolumeNotFound { return false } - errorIf(err, "Stat failed on bucket "+bucketName+".") + errorIf(err, "Stat failed on bucket "+bucket+".") return false } return true } -// GetBucketInfo - get bucket info. +// GetBucketInfo - returns BucketInfo for a bucket. func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { return BucketInfo{}, BucketNameInvalid{Bucket: bucket} } - nsMutex.RLock(bucket, "") defer nsMutex.RUnlock(bucket, "") - - // List and figured out if we need healing. - bucketsInfo, heal, err := xl.listAllBucketInfo(bucket) + bucketInfo, err := xl.getBucketInfo(bucket) if err != nil { return BucketInfo{}, toObjectErr(err, bucket) } + return bucketInfo, nil +} - // Heal for missing entries. - if heal { - go func() { - // Create bucket if missing on disks. - for index, bktInfo := range bucketsInfo { - if bktInfo.Name != "" { +// listBuckets - returns list of all buckets from a disk picked at random. +func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) { + // Count for errors encountered. + var listBucketsErrCount = 0 + + // Return the first successful lookup from a random list of disks. + for listBucketsErrCount < len(xl.storageDisks) { + disk := xl.getRandomDisk() // Choose a random disk on each attempt. + var volsInfo []VolInfo + volsInfo, err = disk.ListVols() + if err == nil { + // NOTE: The assumption here is that volumes across all disks in + // readQuorum have consistent view i.e they all have same number + // of buckets. This is essentially not verified since healing + // should take care of this. + var bucketsInfo []BucketInfo + for _, volInfo := range volsInfo { + // StorageAPI can send volume names which are incompatible + // with buckets, handle it and skip them. + if !IsValidBucketName(volInfo.Name) { continue } - // Bucketinfo name would be an empty string, create it. - xl.storageDisks[index].MakeVol(bucket) + bucketsInfo = append(bucketsInfo, BucketInfo{ + Name: volInfo.Name, + Created: volInfo.Created, + }) } - }() - } - - // From all bucketsInfo, calculate the actual usage values. - var total, free int64 - var bucketInfo BucketInfo - for _, bucketInfo = range bucketsInfo { - if bucketInfo.Name == "" { - continue + return bucketsInfo, nil } - free += bucketInfo.Free - total += bucketInfo.Total + listBucketsErrCount++ // Update error count. } - - // Update the aggregated values. - bucketInfo.Free = free - bucketInfo.Total = total - - return BucketInfo{ - Name: bucket, - Created: bucketInfo.Created, - Total: bucketInfo.Total, - Free: bucketInfo.Free, - }, nil + return nil, err } -func (xl xlObjects) listBuckets() ([]BucketInfo, error) { - // Initialize sync waitgroup. - var wg = &sync.WaitGroup{} - - // Success vols map carries successful results of ListVols from each disks. - var successVols = make([][]VolInfo, len(xl.storageDisks)) - for index, disk := range xl.storageDisks { - wg.Add(1) // Add each go-routine to wait for. - go func(index int, disk StorageAPI) { - // Indicate wait group as finished. - defer wg.Done() - - // Initiate listing. - volsInfo, _ := disk.ListVols() - successVols[index] = volsInfo - }(index, disk) - } - - // Wait for all the list volumes running in parallel to finish. - wg.Wait() - - // From success vols map calculate aggregated usage values. 
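The new listBuckets above asks one random disk for its volumes and skips names that cannot be buckets. A rough, standalone sketch of that filter; looksLikeBucket below is a simplified placeholder, not the project's IsValidBucketName.

package main

import (
	"fmt"
	"regexp"
	"time"
)

// volInfo is a trimmed-down stand-in for StorageAPI's VolInfo.
type volInfo struct {
	Name    string
	Created time.Time
}

// looksLikeBucket is a simplified placeholder for IsValidBucketName:
// 3-63 characters, lowercase letters, digits, dots and hyphens.
var looksLikeBucket = regexp.MustCompile(`^[a-z0-9][a-z0-9.-]{2,62}$`).MatchString

// bucketsFromVols drops volumes whose names cannot be S3 buckets, such
// as internal volumes like ".minio", mirroring the filter above.
func bucketsFromVols(vols []volInfo) (buckets []volInfo) {
	for _, vol := range vols {
		if !looksLikeBucket(vol.Name) {
			continue
		}
		buckets = append(buckets, vol)
	}
	return buckets
}

func main() {
	vols := []volInfo{{Name: ".minio"}, {Name: "photos"}, {Name: "Backups"}}
	fmt.Println(bucketsFromVols(vols)) // Only "photos" survives the filter.
}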
- var volsInfo []VolInfo - var total, free int64 - for _, volsInfo = range successVols { - var volInfo VolInfo - for _, volInfo = range volsInfo { - if volInfo.Name == "" { - continue - } - if !IsValidBucketName(volInfo.Name) { - continue - } - break - } - free += volInfo.Free - total += volInfo.Total - } - - // Save the updated usage values back into the vols. - for index, volInfo := range volsInfo { - volInfo.Free = free - volInfo.Total = total - volsInfo[index] = volInfo - } - - // NOTE: The assumption here is that volumes across all disks in - // readQuorum have consistent view i.e they all have same number - // of buckets. This is essentially not verified since healing - // should take care of this. - var bucketsInfo []BucketInfo - for _, volInfo := range volsInfo { - // StorageAPI can send volume names which are incompatible - // with buckets, handle it and skip them. - if !IsValidBucketName(volInfo.Name) { - continue - } - bucketsInfo = append(bucketsInfo, BucketInfo{ - Name: volInfo.Name, - Created: volInfo.Created, - Total: volInfo.Total, - Free: volInfo.Free, - }) - } - return bucketsInfo, nil -} - -// ListBuckets - list buckets. +// ListBuckets - lists all the buckets, sorted by its name. func (xl xlObjects) ListBuckets() ([]BucketInfo, error) { bucketInfos, err := xl.listBuckets() if err != nil { @@ -302,7 +170,7 @@ func (xl xlObjects) ListBuckets() ([]BucketInfo, error) { return bucketInfos, nil } -// DeleteBucket - delete a bucket. +// DeleteBucket - deletes a bucket. func (xl xlObjects) DeleteBucket(bucket string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 65c447993..3abf00557 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -100,63 +100,73 @@ func (t byPartNumber) Len() int { return len(t) } func (t byPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number } -// SearchObjectPart - searches for part name and etag, returns the -// index if found. -func (m xlMetaV1) SearchObjectPart(number int) int { +// ObjectPartIndex - returns the index of matching object part number. +func (m xlMetaV1) ObjectPartIndex(partNumber int) (index int) { for i, part := range m.Parts { - if number == part.Number { - return i + if partNumber == part.Number { + index = i + return index } } return -1 } // AddObjectPart - add a new object part in order. -func (m *xlMetaV1) AddObjectPart(number int, name string, etag string, size int64) { +func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) { partInfo := objectPartInfo{ - Number: number, - Name: name, - ETag: etag, - Size: size, + Number: partNumber, + Name: partName, + ETag: partETag, + Size: partSize, } + + // Update part info if it already exists. for i, part := range m.Parts { - if number == part.Number { + if partNumber == part.Number { m.Parts[i] = partInfo return } } + + // Proceed to include new part info. m.Parts = append(m.Parts, partInfo) + + // Parts in xlMeta should be in sorted order by part number. sort.Sort(byPartNumber(m.Parts)) } -// getPartIndexOffset - given an offset for the whole object, return the part and offset in that part. -func (m xlMetaV1) getPartIndexOffset(offset int64) (partIndex int, partOffset int64, err error) { +// objectToPartOffset - translate offset of an object to offset of its individual part. 
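ListBuckets above promises buckets sorted by name. A tiny sketch of that sort using sort.Slice over a trimmed-down BucketInfo stand-in.

package main

import (
	"fmt"
	"sort"
	"time"
)

// bucketInfo is a trimmed-down stand-in for the patch's BucketInfo.
type bucketInfo struct {
	Name    string
	Created time.Time
}

func main() {
	buckets := []bucketInfo{{Name: "zebra"}, {Name: "alpha"}, {Name: "photos"}}

	// Sort lexically by bucket name, as ListBuckets promises.
	sort.Slice(buckets, func(i, j int) bool { return buckets[i].Name < buckets[j].Name })

	for _, b := range buckets {
		fmt.Println(b.Name)
	}
}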
+func (m xlMetaV1) objectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) { partOffset = offset + // Seek until object offset maps to a particular part offset. for i, part := range m.Parts { partIndex = i + // Last part can be of '0' bytes, treat it specially and + // return right here. if part.Size == 0 { return partIndex, partOffset, nil } + // Offset is smaller than size we have reached the proper part offset. if partOffset < part.Size { return partIndex, partOffset, nil } + // Continue to towards the next part. partOffset -= part.Size } - // Offset beyond the size of the object - err = errUnexpected - return 0, 0, err + // Offset beyond the size of the object return InvalidRange. + return 0, 0, InvalidRange{} } -// readXLMetadata - read xl metadata. +// readXLMetadata - returns the object metadata `xl.json` content from +// one of the disks picked at random. func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) { // Count for errors encountered. var xlJSONErrCount = 0 - // Return the first success entry based on the selected random disk. + // Return the first successful lookup from a random list of disks. for xlJSONErrCount < len(xl.storageDisks) { var r io.ReadCloser - // Choose a random disk on each attempt, do not hit the same disk all the time. - disk := xl.getRandomDisk() // Pick a random disk. + disk := xl.getRandomDisk() // Choose a random disk on each attempt. r, err = disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), int64(0)) if err == nil { defer r.Close() @@ -170,23 +180,29 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err return xlMetaV1{}, err } -// writeXLJson - write `xl.json` on all disks in order. -func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error { - var wg = &sync.WaitGroup{} - var mErrs = make([]error, len(xl.storageDisks)) - - // Initialize metadata map, save all erasure related metadata. +// newXLMetaV1 - initializes new xlMetaV1. +func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) { + xlMeta = xlMetaV1{} xlMeta.Version = "1" xlMeta.Format = "xl" xlMeta.Minio.Release = minioReleaseTag xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost - xlMeta.Erasure.DataBlocks = xl.dataBlocks - xlMeta.Erasure.ParityBlocks = xl.parityBlocks + xlMeta.Erasure.DataBlocks = dataBlocks + xlMeta.Erasure.ParityBlocks = parityBlocks xlMeta.Erasure.BlockSize = erasureBlockSize - xlMeta.Erasure.Distribution = xl.getDiskDistribution() + xlMeta.Erasure.Distribution = randErasureDistribution(dataBlocks + parityBlocks) + return xlMeta +} +// writeXLMetadata - write `xl.json` on all disks in order. +func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error { + var wg = &sync.WaitGroup{} + var mErrs = make([]error, len(xl.storageDisks)) + + // Start writing `xl.json` to all disks in parallel. for index, disk := range xl.storageDisks { wg.Add(1) + // Write `xl.json` in a routine. go func(index int, disk StorageAPI, metadata xlMetaV1) { defer wg.Done() @@ -197,8 +213,10 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro return } - // Save the order. + // Save the disk order index. metadata.Erasure.Index = index + 1 + + // Marshal metadata to the writer. 
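objectToPartOffset above walks the part list, subtracting sizes until the offset lands inside a part. A standalone sketch with a worked example (5 MiB + 5 MiB + 1 MiB parts); errInvalidRange here stands in for the patch's InvalidRange{}.

package main

import (
	"errors"
	"fmt"
)

var errInvalidRange = errors.New("the requested range is not satisfiable")

// objectToPartOffset maps an offset into the whole object onto the part
// holding that byte, walking parts in order and subtracting their sizes,
// just like the method above.
func objectToPartOffset(partSizes []int64, offset int64) (partIndex int, partOffset int64, err error) {
	partOffset = offset
	for i, size := range partSizes {
		partIndex = i
		// A zero byte part can only be the last part; any remaining
		// offset maps onto it.
		if size == 0 {
			return partIndex, partOffset, nil
		}
		// Offset falls inside this part.
		if partOffset < size {
			return partIndex, partOffset, nil
		}
		partOffset -= size
	}
	return 0, 0, errInvalidRange
}

func main() {
	// Three parts: 5 MiB, 5 MiB and 1 MiB.
	sizes := []int64{5 << 20, 5 << 20, 1 << 20}

	// Byte 6 MiB of the object lives 1 MiB into part index 1.
	fmt.Println(objectToPartOffset(sizes, 6<<20)) // 1 1048576 <nil>

	// Asking past the end of the object yields an invalid-range error.
	fmt.Println(objectToPartOffset(sizes, 12<<20))
}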
_, mErr = metadata.WriteTo(metaWriter) if mErr != nil { if mErr = safeCloseAndRemove(metaWriter); mErr != nil { @@ -208,6 +226,7 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro mErrs[index] = mErr return } + // Verify if close fails with an error. if mErr = metaWriter.Close(); mErr != nil { if mErr = safeCloseAndRemove(metaWriter); mErr != nil { mErrs[index] = mErr @@ -223,7 +242,6 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro // Wait for all the routines. wg.Wait() - // FIXME: check for quorum. // Return the first error. for _, err := range mErrs { if err == nil { @@ -234,11 +252,18 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro return nil } -// getDiskDistribution - get disk distribution. -func (xl xlObjects) getDiskDistribution() []int { - var distribution = make([]int, len(xl.storageDisks)) - for index := range xl.storageDisks { - distribution[index] = index + 1 +// randErasureDistribution - uses Knuth Fisher-Yates shuffle algorithm. +func randErasureDistribution(numBlocks int) []int { + distribution := make([]int, numBlocks) + for i := 0; i < numBlocks; i++ { + distribution[i] = i + 1 } + /* + for i := 0; i < numBlocks; i++ { + // Choose index uniformly in [i, numBlocks-1] + r := i + rand.Intn(numBlocks-i) + distribution[r], distribution[i] = distribution[i], distribution[r] + } + */ return distribution } diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index 3aecfae87..8c39fc602 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -60,7 +60,8 @@ func (u *uploadsV1) AddUploadID(uploadID string, initiated time.Time) { sort.Sort(byInitiatedTime(u.Uploads)) } -func (u uploadsV1) SearchUploadID(uploadID string) int { +// Index - returns the index of matching the upload id. +func (u uploadsV1) Index(uploadID string) int { for i, u := range u.Uploads { if u.UploadID == uploadID { return i @@ -90,7 +91,7 @@ func (u uploadsV1) WriteTo(writer io.Writer) (n int64, err error) { return int64(m), err } -// getUploadIDs - get saved upload id's. +// getUploadIDs - get all the saved upload id's. func getUploadIDs(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) { uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) var errs = make([]error, len(storageDisks)) @@ -258,7 +259,7 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora // listUploadsInfo - list all uploads info. func (xl xlObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) { - disk := xl.getRandomDisk() + disk := xl.getRandomDisk() // Choose a random disk on each attempt. 
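randErasureDistribution above ships with the Knuth / Fisher-Yates shuffle commented out and returns the identity order for now. A standalone sketch of what the shuffled variant could look like; the seeding in main is illustrative only.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randErasureDistribution returns a random permutation of 1..numBlocks
// using the Fisher-Yates shuffle sketched in the commented-out code above.
func randErasureDistribution(numBlocks int) []int {
	distribution := make([]int, numBlocks)
	for i := range distribution {
		distribution[i] = i + 1
	}
	for i := 0; i < numBlocks; i++ {
		// Choose an index uniformly in [i, numBlocks-1] and swap.
		r := i + rand.Intn(numBlocks-i)
		distribution[i], distribution[r] = distribution[r], distribution[i]
	}
	return distribution
}

func main() {
	rand.Seed(time.Now().UnixNano()) // Illustrative seeding, done once at startup.
	fmt.Println(randErasureDistribution(16))
}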
splitPrefixes := strings.SplitN(prefixPath, "/", 3) uploadIDs, err := getUploadIDs(splitPrefixes[1], splitPrefixes[2], disk) if err != nil { diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 380da085f..6e616e747 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -57,7 +57,7 @@ func (xl xlObjects) newMultipartUploadCommon(bucket string, object string, meta meta = make(map[string]string) } - xlMeta := xlMetaV1{} + xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks) // If not set default to "application/octet-stream" if meta["content-type"] == "" { contentType := "application/octet-stream" @@ -125,11 +125,18 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + } + // List all online disks. onlineDisks, higherVersion, err := xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) if err != nil { return "", toObjectErr(err, bucket, object) } + // Increment version only if we have online disks less than configured storage disks. if diskCount(onlineDisks) < len(xl.storageDisks) { higherVersion++ } @@ -193,21 +200,18 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s return "", err } - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) - } - xlMeta.Stat.Version = higherVersion - xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) - partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) if err != nil { return "", toObjectErr(err, minioMetaBucket, partPath) } - if err = xl.writeXLMetadata(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID), xlMeta); err != nil { - return "", toObjectErr(err, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID)) + + // Once part is successfully committed, proceed with updating XL metadata. + xlMeta.Stat.Version = higherVersion + xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) + + if err = xl.writeXLMetadata(minioMetaBucket, uploadIDPath, xlMeta); err != nil { + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } return newMD5Hex, nil } @@ -261,7 +265,7 @@ func (xl xlObjects) listObjectPartsCommon(bucket, object, uploadID string, partN } // Only parts with higher part numbers will be listed. - partIdx := xlMeta.SearchObjectPart(partNumberMarker) + partIdx := xlMeta.ObjectPartIndex(partNumberMarker) parts := xlMeta.Parts if partIdx != -1 { parts = xlMeta.Parts[partIdx+1:] @@ -349,7 +353,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Loop through all parts, validate them and then commit to disk. 
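Several call sites above locate an upload ID with Index and splice it out of Uploads via append. A compact sketch of that lookup-and-remove pattern; uploadInfo below is a trimmed-down stand-in for the uploads.json entries.

package main

import (
	"fmt"
	"time"
)

// uploadInfo is a trimmed-down stand-in for the entries in uploads.json.
type uploadInfo struct {
	UploadID  string
	Initiated time.Time
}

// indexOf returns the position of uploadID, or -1 when it is absent,
// mirroring uploadsV1.Index above.
func indexOf(uploads []uploadInfo, uploadID string) int {
	for i, u := range uploads {
		if u.UploadID == uploadID {
			return i
		}
	}
	return -1
}

func main() {
	uploads := []uploadInfo{{UploadID: "a1"}, {UploadID: "b2"}, {UploadID: "c3"}}

	// Remove "b2" the same way the abort/complete paths do.
	if i := indexOf(uploads, "b2"); i != -1 {
		uploads = append(uploads[:i], uploads[i+1:]...)
	}
	fmt.Println(uploads) // Entries a1 and c3 remain.
}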
for i, part := range parts { - partIdx := currentXLMeta.SearchObjectPart(part.PartNumber) + partIdx := currentXLMeta.ObjectPartIndex(part.PartNumber) if partIdx == -1 { return "", InvalidPart{} } @@ -414,7 +418,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // the object, if yes do not attempt to delete 'uploads.json'. uploadIDs, err := getUploadIDs(bucket, object, xl.storageDisks...) if err == nil { - uploadIDIdx := uploadIDs.SearchUploadID(uploadID) + uploadIDIdx := uploadIDs.Index(uploadID) if uploadIDIdx != -1 { uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...) } @@ -435,8 +439,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return s3MD5, nil } -// abortMultipartUploadCommon - aborts a multipart upload, common -// function used by both object layers. +// abortMultipartUploadCommon - aborts a multipart upload, common function used by both object layers. func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string) error { // Verify if bucket is valid. if !IsValidBucketName(bucket) { @@ -465,7 +468,7 @@ func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string) // the object, if yes do not attempt to delete 'uploads.json'. uploadIDs, err := getUploadIDs(bucket, object, xl.storageDisks...) if err == nil { - uploadIDIdx := uploadIDs.SearchUploadID(uploadID) + uploadIDIdx := uploadIDs.Index(uploadID) if uploadIDIdx != -1 { uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...) } diff --git a/xl-v1-object.go b/xl-v1-object.go index bb08bee69..88313dcef 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -13,6 +13,12 @@ import ( "github.com/minio/minio/pkg/mimedb" ) +// nullReadCloser - returns 0 bytes and io.EOF upon first read attempt. +type nullReadCloser struct{} + +func (n nullReadCloser) Read([]byte) (int, error) { return 0, io.EOF } +func (n nullReadCloser) Close() error { return nil } + /// Object Operations // GetObject - get an object. @@ -35,16 +41,19 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read if err != nil { return nil, toObjectErr(err, bucket, object) } - // List all online disks. onlineDisks, _, err := xl.listOnlineDisks(bucket, object) if err != nil { return nil, toObjectErr(err, bucket, object) } + // For zero byte files, return a null reader. + if xlMeta.Stat.Size == 0 { + return nullReadCloser{}, nil + } erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks // Get part index offset. - partIndex, offset, err := xlMeta.getPartIndexOffset(startOffset) + partIndex, partOffset, err := xlMeta.objectToPartOffset(startOffset) if err != nil { return nil, toObjectErr(err, bucket, object) } @@ -59,13 +68,14 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read defer nsMutex.RUnlock(bucket, object) for ; partIndex < len(xlMeta.Parts); partIndex++ { part := xlMeta.Parts[partIndex] - r, err := erasure.ReadFile(bucket, pathJoin(object, part.Name), offset, part.Size) + r, err := erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size) if err != nil { fileWriter.CloseWithError(toObjectErr(err, bucket, object)) return } - // Reset offset to 0 as it would be non-0 only for the first loop if startOffset is non-0. - offset = 0 + // Reset part offset to 0 to read rest of the parts from + // the beginning. 
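GetObject above short-circuits zero-byte objects with a nullReadCloser. A tiny sketch showing that such a reader satisfies io.ReadCloser and yields no bytes; the surrounding demo is illustrative.

package main

import (
	"fmt"
	"io"
	"io/ioutil"
)

// nullReadCloser returns 0 bytes and io.EOF on the first Read, like the
// type introduced above for zero-byte objects.
type nullReadCloser struct{}

func (nullReadCloser) Read([]byte) (int, error) { return 0, io.EOF }
func (nullReadCloser) Close() error             { return nil }

func main() {
	var body io.ReadCloser = nullReadCloser{}
	defer body.Close()

	data, err := ioutil.ReadAll(body)
	fmt.Println(len(data), err) // 0 <nil>
}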
+ partOffset = 0 if _, err = io.Copy(fileWriter, r); err != nil { switch reader := r.(type) { case *io.PipeReader: @@ -76,7 +86,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read fileWriter.CloseWithError(toObjectErr(err, bucket, object)) return } - // Close the readerCloser that reads multiparts of an object from the xl storage layer. + // Close the readerCloser that reads multiparts of an object. // Not closing leaks underlying file descriptors. r.Close() } @@ -198,12 +208,13 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object1") tempObj := path.Join(tmpMetaPrefix, bucket, object) + // List all online disks. onlineDisks, higherVersion, err := xl.listOnlineDisks(bucket, object) if err != nil { return "", toObjectErr(err, bucket, object) } + // Increment version only if we have online disks less than configured storage disks. if diskCount(onlineDisks) < len(xl.storageDisks) { - // Increment version only if we have online disks less than configured storage disks. higherVersion++ } erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks @@ -290,7 +301,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. return "", toObjectErr(err, bucket, object) } - xlMeta := xlMetaV1{} + xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks) xlMeta.Meta = metadata xlMeta.Stat.Size = size xlMeta.Stat.ModTime = modTime diff --git a/xl-v1-utils.go b/xl-v1-utils.go index bb1edcb0f..beed862a8 100644 --- a/xl-v1-utils.go +++ b/xl-v1-utils.go @@ -73,8 +73,8 @@ func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err // Return the first success entry based on the selected random disk. for xlJSONErrCount < len(xl.storageDisks) { - // Choose a random disk on each attempt, do not hit the same disk all the time. - disk := xl.getRandomDisk() // Pick a random disk. + // Choose a random disk on each attempt. + disk := xl.getRandomDisk() fileInfo, err = disk.StatFile(bucket, objectPart) if err == nil { return fileInfo, nil
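The GetObject read loop above streams each part into one end of a pipe and hands the reader to the caller. A minimal sketch of that io.Pipe pattern with in-memory parts; concatParts and the sample data are illustrative, not the patch's code.

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

// concatParts streams a sequence of part readers through an io.Pipe,
// closing the writer (with the error, if any) once every part is copied,
// the same shape as the GetObject read loop above.
func concatParts(parts []io.Reader) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		for _, part := range parts {
			if _, err := io.Copy(pw, part); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
		pw.Close()
	}()
	return pr
}

func main() {
	parts := []io.Reader{
		strings.NewReader("part-1 bytes, "),
		strings.NewReader("part-2 bytes"),
	}
	body := concatParts(parts)
	defer body.Close()

	data, _ := ioutil.ReadAll(body)
	fmt.Println(string(data))
}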