mirror of
https://github.com/minio/minio.git
synced 2025-03-31 01:33:41 -04:00
xl: refactor functions to xl-v1-common.go xl-v1-utils.go. (#1357)
This commit is contained in:
parent
becc814531
commit
8c85815106
@ -86,7 +86,7 @@ func TestGetObjectInfo(t *testing.T) {
|
|||||||
{"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false},
|
{"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false},
|
||||||
{"test-getobjectinfo", "Asia/myfile", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/myfile"}, false},
|
{"test-getobjectinfo", "Asia/myfile", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/myfile"}, false},
|
||||||
// Test case with existing bucket but object name set to a directory (Test number 13).
|
// Test case with existing bucket but object name set to a directory (Test number 13).
|
||||||
{"test-getobjectinfo", "Asia", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia"}, false},
|
{"test-getobjectinfo", "Asia", ObjectInfo{}, ObjectExistsAsPrefix{Bucket: "test-getobjectinfo", Object: "Asia"}, false},
|
||||||
// Valid case with existing object (Test number 14).
|
// Valid case with existing object (Test number 14).
|
||||||
{"test-getobjectinfo", "Asia/asiapics.jpg", resultCases[0], nil, true},
|
{"test-getobjectinfo", "Asia/asiapics.jpg", resultCases[0], nil, true},
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ var _ = Suite(&MySuite{})
|
|||||||
|
|
||||||
func (s *MySuite) TestAPISuite(c *C) {
|
func (s *MySuite) TestAPISuite(c *C) {
|
||||||
var storageList []string
|
var storageList []string
|
||||||
create := func() *objectAPI {
|
create := func() objectAPI {
|
||||||
path, err := ioutil.TempDir(os.TempDir(), "minio-")
|
path, err := ioutil.TempDir(os.TempDir(), "minio-")
|
||||||
c.Check(err, IsNil)
|
c.Check(err, IsNil)
|
||||||
storageAPI, err := newFS(path)
|
storageAPI, err := newFS(path)
|
||||||
|
@ -28,7 +28,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// APITestSuite - collection of API tests
|
// APITestSuite - collection of API tests
|
||||||
func APITestSuite(c *check.C, create func() *objectAPI) {
|
func APITestSuite(c *check.C, create func() objectAPI) {
|
||||||
testMakeBucket(c, create)
|
testMakeBucket(c, create)
|
||||||
testMultipleObjectCreation(c, create)
|
testMultipleObjectCreation(c, create)
|
||||||
testPaging(c, create)
|
testPaging(c, create)
|
||||||
@ -46,14 +46,13 @@ func APITestSuite(c *check.C, create func() *objectAPI) {
|
|||||||
testMultipartObjectAbort(c, create)
|
testMultipartObjectAbort(c, create)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testMakeBucket(c *check.C, create func() *objectAPI) {
|
func testMakeBucket(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("bucket")
|
err := obj.MakeBucket("bucket")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests verifies the functionality of PutObjectPart.
|
func testMultipartObjectCreation(c *check.C, create func() objectAPI) {
|
||||||
func testMultipartObjectCreation(c *check.C, create func() *objectAPI) {
|
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("bucket")
|
err := obj.MakeBucket("bucket")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
@ -83,7 +82,7 @@ func testMultipartObjectCreation(c *check.C, create func() *objectAPI) {
|
|||||||
c.Assert(md5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10")
|
c.Assert(md5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10")
|
||||||
}
|
}
|
||||||
|
|
||||||
func testMultipartObjectAbort(c *check.C, create func() *objectAPI) {
|
func testMultipartObjectAbort(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("bucket")
|
err := obj.MakeBucket("bucket")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
@ -114,7 +113,7 @@ func testMultipartObjectAbort(c *check.C, create func() *objectAPI) {
|
|||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testMultipleObjectCreation(c *check.C, create func() *objectAPI) {
|
func testMultipleObjectCreation(c *check.C, create func() objectAPI) {
|
||||||
objects := make(map[string][]byte)
|
objects := make(map[string][]byte)
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("bucket")
|
err := obj.MakeBucket("bucket")
|
||||||
@ -155,7 +154,7 @@ func testMultipleObjectCreation(c *check.C, create func() *objectAPI) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testPaging(c *check.C, create func() *objectAPI) {
|
func testPaging(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
obj.MakeBucket("bucket")
|
obj.MakeBucket("bucket")
|
||||||
result, err := obj.ListObjects("bucket", "", "", "", 0)
|
result, err := obj.ListObjects("bucket", "", "", "", 0)
|
||||||
@ -255,7 +254,7 @@ func testPaging(c *check.C, create func() *objectAPI) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testObjectOverwriteWorks(c *check.C, create func() *objectAPI) {
|
func testObjectOverwriteWorks(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("bucket")
|
err := obj.MakeBucket("bucket")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
@ -276,13 +275,13 @@ func testObjectOverwriteWorks(c *check.C, create func() *objectAPI) {
|
|||||||
c.Assert(r.Close(), check.IsNil)
|
c.Assert(r.Close(), check.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testNonExistantBucketOperations(c *check.C, create func() *objectAPI) {
|
func testNonExistantBucketOperations(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
_, err := obj.PutObject("bucket", "object", int64(len("one")), bytes.NewBufferString("one"), nil)
|
_, err := obj.PutObject("bucket", "object", int64(len("one")), bytes.NewBufferString("one"), nil)
|
||||||
c.Assert(err, check.Not(check.IsNil))
|
c.Assert(err, check.Not(check.IsNil))
|
||||||
}
|
}
|
||||||
|
|
||||||
func testBucketRecreateFails(c *check.C, create func() *objectAPI) {
|
func testBucketRecreateFails(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("string")
|
err := obj.MakeBucket("string")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
@ -290,7 +289,7 @@ func testBucketRecreateFails(c *check.C, create func() *objectAPI) {
|
|||||||
c.Assert(err, check.Not(check.IsNil))
|
c.Assert(err, check.Not(check.IsNil))
|
||||||
}
|
}
|
||||||
|
|
||||||
func testPutObjectInSubdir(c *check.C, create func() *objectAPI) {
|
func testPutObjectInSubdir(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("bucket")
|
err := obj.MakeBucket("bucket")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
@ -308,7 +307,7 @@ func testPutObjectInSubdir(c *check.C, create func() *objectAPI) {
|
|||||||
c.Assert(r.Close(), check.IsNil)
|
c.Assert(r.Close(), check.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testListBuckets(c *check.C, create func() *objectAPI) {
|
func testListBuckets(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
|
|
||||||
// test empty list
|
// test empty list
|
||||||
@ -340,7 +339,7 @@ func testListBuckets(c *check.C, create func() *objectAPI) {
|
|||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testListBucketsOrder(c *check.C, create func() *objectAPI) {
|
func testListBucketsOrder(c *check.C, create func() objectAPI) {
|
||||||
// if implementation contains a map, order of map keys will vary.
|
// if implementation contains a map, order of map keys will vary.
|
||||||
// this ensures they return in the same order each time
|
// this ensures they return in the same order each time
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@ -358,7 +357,7 @@ func testListBucketsOrder(c *check.C, create func() *objectAPI) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testListObjectsTestsForNonExistantBucket(c *check.C, create func() *objectAPI) {
|
func testListObjectsTestsForNonExistantBucket(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
result, err := obj.ListObjects("bucket", "", "", "", 1000)
|
result, err := obj.ListObjects("bucket", "", "", "", 1000)
|
||||||
c.Assert(err, check.Not(check.IsNil))
|
c.Assert(err, check.Not(check.IsNil))
|
||||||
@ -366,7 +365,7 @@ func testListObjectsTestsForNonExistantBucket(c *check.C, create func() *objectA
|
|||||||
c.Assert(len(result.Objects), check.Equals, 0)
|
c.Assert(len(result.Objects), check.Equals, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testNonExistantObjectInBucket(c *check.C, create func() *objectAPI) {
|
func testNonExistantObjectInBucket(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("bucket")
|
err := obj.MakeBucket("bucket")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
@ -381,7 +380,7 @@ func testNonExistantObjectInBucket(c *check.C, create func() *objectAPI) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() *objectAPI) {
|
func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("bucket")
|
err := obj.MakeBucket("bucket")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
@ -410,7 +409,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() *objectAPI)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testDefaultContentType(c *check.C, create func() *objectAPI) {
|
func testDefaultContentType(c *check.C, create func() objectAPI) {
|
||||||
obj := create()
|
obj := create()
|
||||||
err := obj.MakeBucket("bucket")
|
err := obj.MakeBucket("bucket")
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
|
157
xl-v1-common.go
Normal file
157
xl-v1-common.go
Normal file
@ -0,0 +1,157 @@
|
|||||||
|
/*
|
||||||
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
slashpath "path"
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Returns slice of disks needed for ReadFile operation:
|
||||||
|
// - slice returing readable disks.
|
||||||
|
// - fileMetadata
|
||||||
|
// - bool value indicating if selfHeal is needed.
|
||||||
|
// - error if any.
|
||||||
|
func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, fileMetadata, bool, error) {
|
||||||
|
partsMetadata, errs := xl.getPartsMetadata(volume, path)
|
||||||
|
highestVersion := int64(0)
|
||||||
|
versions := make([]int64, len(xl.storageDisks))
|
||||||
|
quorumDisks := make([]StorageAPI, len(xl.storageDisks))
|
||||||
|
notFoundCount := 0
|
||||||
|
// If quorum says errFileNotFound return errFileNotFound
|
||||||
|
for _, err := range errs {
|
||||||
|
if err == errFileNotFound {
|
||||||
|
notFoundCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if notFoundCount > xl.readQuorum {
|
||||||
|
return nil, fileMetadata{}, false, errFileNotFound
|
||||||
|
}
|
||||||
|
for index, metadata := range partsMetadata {
|
||||||
|
if errs[index] == nil {
|
||||||
|
version, err := metadata.GetFileVersion()
|
||||||
|
if err == errMetadataKeyNotExist {
|
||||||
|
versions[index] = 0
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// Unexpected, return error.
|
||||||
|
return nil, fileMetadata{}, false, err
|
||||||
|
}
|
||||||
|
versions[index] = version
|
||||||
|
} else {
|
||||||
|
versions[index] = -1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
quorumCount := 0
|
||||||
|
for index, version := range versions {
|
||||||
|
if version == highestVersion {
|
||||||
|
quorumDisks[index] = xl.storageDisks[index]
|
||||||
|
quorumCount++
|
||||||
|
} else {
|
||||||
|
quorumDisks[index] = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if quorumCount < xl.readQuorum {
|
||||||
|
return nil, fileMetadata{}, false, errReadQuorum
|
||||||
|
}
|
||||||
|
var metadata fileMetadata
|
||||||
|
for index, disk := range quorumDisks {
|
||||||
|
if disk == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
metadata = partsMetadata[index]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// FIXME: take care of the situation when a disk has failed and been removed
|
||||||
|
// by looking at the error returned from the fs layer. fs-layer will have
|
||||||
|
// to return an error indicating that the disk is not available and should be
|
||||||
|
// different from ErrNotExist.
|
||||||
|
doSelfHeal := quorumCount != len(xl.storageDisks)
|
||||||
|
return quorumDisks, metadata, doSelfHeal, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get parts.json metadata as a map slice.
|
||||||
|
// Returns error slice indicating the failed metadata reads.
|
||||||
|
// Read lockNS() should be done by caller.
|
||||||
|
func (xl XL) getPartsMetadata(volume, path string) ([]fileMetadata, []error) {
|
||||||
|
errs := make([]error, len(xl.storageDisks))
|
||||||
|
metadataArray := make([]fileMetadata, len(xl.storageDisks))
|
||||||
|
metadataFilePath := slashpath.Join(path, metadataFile)
|
||||||
|
for index, disk := range xl.storageDisks {
|
||||||
|
offset := int64(0)
|
||||||
|
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
|
||||||
|
if err != nil {
|
||||||
|
errs[index] = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
defer metadataReader.Close()
|
||||||
|
|
||||||
|
metadata, err := fileMetadataDecode(metadataReader)
|
||||||
|
if err != nil {
|
||||||
|
// Unable to parse parts.json, set error.
|
||||||
|
errs[index] = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
metadataArray[index] = metadata
|
||||||
|
}
|
||||||
|
return metadataArray, errs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Writes/Updates `parts.json` for given file. updateParts carries
|
||||||
|
// index of disks where `parts.json` needs to be updated.
|
||||||
|
//
|
||||||
|
// Returns collection of errors, indexed in accordance with input
|
||||||
|
// updateParts order.
|
||||||
|
// Write lockNS() should be done by caller.
|
||||||
|
func (xl XL) setPartsMetadata(volume, path string, metadata fileMetadata, updateParts []bool) []error {
|
||||||
|
metadataFilePath := filepath.Join(path, metadataFile)
|
||||||
|
errs := make([]error, len(xl.storageDisks))
|
||||||
|
|
||||||
|
for index := range updateParts {
|
||||||
|
errs[index] = errors.New("Metadata not updated")
|
||||||
|
}
|
||||||
|
|
||||||
|
metadataBytes, err := json.Marshal(metadata)
|
||||||
|
if err != nil {
|
||||||
|
for index := range updateParts {
|
||||||
|
errs[index] = err
|
||||||
|
}
|
||||||
|
return errs
|
||||||
|
}
|
||||||
|
|
||||||
|
for index, shouldUpdate := range updateParts {
|
||||||
|
if !shouldUpdate {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
writer, err := xl.storageDisks[index].CreateFile(volume, metadataFilePath)
|
||||||
|
errs[index] = err
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err = writer.Write(metadataBytes)
|
||||||
|
if err != nil {
|
||||||
|
errs[index] = err
|
||||||
|
safeCloseAndRemove(writer)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
writer.Close()
|
||||||
|
}
|
||||||
|
return errs
|
||||||
|
}
|
@ -98,16 +98,15 @@ func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if version := metadata.Get("file.version"); version == nil {
|
version, err := metadata.GetFileVersion()
|
||||||
|
if err == errMetadataKeyNotExist {
|
||||||
fileQuorumVersionMap[index] = 0
|
fileQuorumVersionMap[index] = 0
|
||||||
} else {
|
continue
|
||||||
// Convert string to integer.
|
|
||||||
fileVersion, err := strconv.ParseInt(version[0], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fileQuorumVersionMap[index] = fileVersion
|
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fileQuorumVersionMap[index] = version
|
||||||
}
|
}
|
||||||
return fileQuorumVersionMap
|
return fileQuorumVersionMap
|
||||||
}
|
}
|
||||||
|
@ -96,6 +96,25 @@ func (f fileMetadata) GetModTime() (time.Time, error) {
|
|||||||
return time.Parse(timeFormatAMZ, timeStrs[0])
|
return time.Parse(timeFormatAMZ, timeStrs[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set file Modification time.
|
||||||
|
func (f fileMetadata) SetModTime(modTime time.Time) {
|
||||||
|
f.Set("file.modTime", modTime.Format(timeFormatAMZ))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get file version.
|
||||||
|
func (f fileMetadata) GetFileVersion() (int64, error) {
|
||||||
|
version := f.Get("file.version")
|
||||||
|
if version == nil {
|
||||||
|
return 0, errMetadataKeyNotExist
|
||||||
|
}
|
||||||
|
return strconv.ParseInt(version[0], 10, 64)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set file version.
|
||||||
|
func (f fileMetadata) SetFileVersion(fileVersion int64) {
|
||||||
|
f.Set("file.version", strconv.FormatInt(fileVersion, 10))
|
||||||
|
}
|
||||||
|
|
||||||
// fileMetadataDecode - file metadata decode.
|
// fileMetadataDecode - file metadata decode.
|
||||||
func fileMetadataDecode(reader io.Reader) (fileMetadata, error) {
|
func fileMetadataDecode(reader io.Reader) (fileMetadata, error) {
|
||||||
metadata := make(fileMetadata)
|
metadata := make(fileMetadata)
|
||||||
|
@ -21,96 +21,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
slashpath "path"
|
slashpath "path"
|
||||||
"strconv"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// checkBlockSize return the size of a single block.
|
|
||||||
// The first non-zero size is returned,
|
|
||||||
// or 0 if all blocks are size 0.
|
|
||||||
func checkBlockSize(blocks [][]byte) int {
|
|
||||||
for _, block := range blocks {
|
|
||||||
if len(block) != 0 {
|
|
||||||
return len(block)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// calculate the blockSize based on input length and total number of
|
|
||||||
// data blocks.
|
|
||||||
func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) {
|
|
||||||
curBlockSize = (inputLen + dataBlocks - 1) / dataBlocks
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns slice of disks needed for ReadFile operation:
|
|
||||||
// - slice returing readable disks.
|
|
||||||
// - fileMetadata
|
|
||||||
// - bool value indicating if selfHeal is needed.
|
|
||||||
// - error if any.
|
|
||||||
func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, fileMetadata, bool, error) {
|
|
||||||
partsMetadata, errs := xl.getPartsMetadata(volume, path)
|
|
||||||
highestVersion := int64(0)
|
|
||||||
versions := make([]int64, len(xl.storageDisks))
|
|
||||||
quorumDisks := make([]StorageAPI, len(xl.storageDisks))
|
|
||||||
notFoundCount := 0
|
|
||||||
// If quorum says errFileNotFound return errFileNotFound
|
|
||||||
for _, err := range errs {
|
|
||||||
if err == errFileNotFound {
|
|
||||||
notFoundCount++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if notFoundCount > xl.readQuorum {
|
|
||||||
return nil, fileMetadata{}, false, errFileNotFound
|
|
||||||
}
|
|
||||||
for index, metadata := range partsMetadata {
|
|
||||||
if errs[index] == nil {
|
|
||||||
if version := metadata.Get("file.version"); version != nil {
|
|
||||||
// Convert string to integer.
|
|
||||||
version, err := strconv.ParseInt(version[0], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
// Unexpected, return error.
|
|
||||||
return nil, fileMetadata{}, false, err
|
|
||||||
}
|
|
||||||
if version > highestVersion {
|
|
||||||
highestVersion = version
|
|
||||||
}
|
|
||||||
versions[index] = version
|
|
||||||
} else {
|
|
||||||
versions[index] = 0
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
versions[index] = -1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
quorumCount := 0
|
|
||||||
for index, version := range versions {
|
|
||||||
if version == highestVersion {
|
|
||||||
quorumDisks[index] = xl.storageDisks[index]
|
|
||||||
quorumCount++
|
|
||||||
} else {
|
|
||||||
quorumDisks[index] = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if quorumCount < xl.readQuorum {
|
|
||||||
return nil, fileMetadata{}, false, errReadQuorum
|
|
||||||
}
|
|
||||||
var metadata fileMetadata
|
|
||||||
for index, disk := range quorumDisks {
|
|
||||||
if disk == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
metadata = partsMetadata[index]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// FIXME: take care of the situation when a disk has failed and been removed
|
|
||||||
// by looking at the error returned from the fs layer. fs-layer will have
|
|
||||||
// to return an error indicating that the disk is not available and should be
|
|
||||||
// different from ErrNotExist.
|
|
||||||
doSelfHeal := quorumCount != len(xl.storageDisks)
|
|
||||||
return quorumDisks, metadata, doSelfHeal, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadFile - read file
|
// ReadFile - read file
|
||||||
func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) {
|
func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) {
|
||||||
// Input validation.
|
// Input validation.
|
||||||
|
@ -1,77 +1,36 @@
|
|||||||
|
/*
|
||||||
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
// checkBlockSize return the size of a single block.
|
||||||
"encoding/json"
|
// The first non-zero size is returned,
|
||||||
"errors"
|
// or 0 if all blocks are size 0.
|
||||||
slashpath "path"
|
func checkBlockSize(blocks [][]byte) int {
|
||||||
"path/filepath"
|
for _, block := range blocks {
|
||||||
)
|
if len(block) != 0 {
|
||||||
|
return len(block)
|
||||||
// Get parts.json metadata as a map slice.
|
|
||||||
// Returns error slice indicating the failed metadata reads.
|
|
||||||
// Read lockNS() should be done by caller.
|
|
||||||
func (xl XL) getPartsMetadata(volume, path string) ([]fileMetadata, []error) {
|
|
||||||
errs := make([]error, len(xl.storageDisks))
|
|
||||||
metadataArray := make([]fileMetadata, len(xl.storageDisks))
|
|
||||||
metadataFilePath := slashpath.Join(path, metadataFile)
|
|
||||||
for index, disk := range xl.storageDisks {
|
|
||||||
offset := int64(0)
|
|
||||||
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
|
|
||||||
if err != nil {
|
|
||||||
errs[index] = err
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
defer metadataReader.Close()
|
|
||||||
|
|
||||||
metadata, err := fileMetadataDecode(metadataReader)
|
|
||||||
if err != nil {
|
|
||||||
// Unable to parse parts.json, set error.
|
|
||||||
errs[index] = err
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
metadataArray[index] = metadata
|
|
||||||
}
|
}
|
||||||
return metadataArray, errs
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writes/Updates `parts.json` for given file. updateParts carries
|
// calculate the blockSize based on input length and total number of
|
||||||
// index of disks where `parts.json` needs to be updated.
|
// data blocks.
|
||||||
//
|
func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) {
|
||||||
// Returns collection of errors, indexed in accordance with input
|
curBlockSize = (inputLen + dataBlocks - 1) / dataBlocks
|
||||||
// updateParts order.
|
return
|
||||||
// Write lockNS() should be done by caller.
|
|
||||||
func (xl XL) setPartsMetadata(volume, path string, metadata fileMetadata, updateParts []bool) []error {
|
|
||||||
metadataFilePath := filepath.Join(path, metadataFile)
|
|
||||||
errs := make([]error, len(xl.storageDisks))
|
|
||||||
|
|
||||||
for index := range updateParts {
|
|
||||||
errs[index] = errors.New("Metadata not updated")
|
|
||||||
}
|
|
||||||
|
|
||||||
metadataBytes, err := json.Marshal(metadata)
|
|
||||||
if err != nil {
|
|
||||||
for index := range updateParts {
|
|
||||||
errs[index] = err
|
|
||||||
}
|
|
||||||
return errs
|
|
||||||
}
|
|
||||||
|
|
||||||
for index, shouldUpdate := range updateParts {
|
|
||||||
if !shouldUpdate {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
writer, err := xl.storageDisks[index].CreateFile(volume, metadataFilePath)
|
|
||||||
errs[index] = err
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
_, err = writer.Write(metadataBytes)
|
|
||||||
if err != nil {
|
|
||||||
errs[index] = err
|
|
||||||
safeCloseAndRemove(writer)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
writer.Close()
|
|
||||||
}
|
|
||||||
return errs
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user