mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
XL/objects: Initialize format.json outside of erasure. (#1640)
Fixes #1636 New format now generates a UUID and includes it along with the order of disks. So that UUID is the real order of disks and on command line user is able to specify disks in any order. This pre-dominantly solves our dilemma. ``` { "format" : "xl", "xl" : { "version" : "1", "disk": "00e4cf06-5bf5-4bb5-b885-4b2fff4a7959", "jbod" : [ "00e4cf06-5bf5-4bb5-b885-4b2fff4a7959", .... "c47d2608-5067-4ed7-b1e4-fb81bdbb549f", "a543293e-99f1-4310-b540-1e450878e844", "18f97cbe-529a-456a-b6d4-0feacf64534d" ] }, "version" : "1" } ```
This commit is contained in:
parent
f5dfa895a5
commit
e4240aa58f
@ -18,7 +18,12 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/skyrings/skyring-common/tools/uuid"
|
||||
)
|
||||
|
||||
type fsFormat struct {
|
||||
@ -27,7 +32,8 @@ type fsFormat struct {
|
||||
|
||||
type xlFormat struct {
|
||||
Version string `json:"version"`
|
||||
Disks []string `json:"disks"`
|
||||
Disk string `json:"disk"`
|
||||
JBOD []string `json:"jbod"`
|
||||
}
|
||||
|
||||
type formatConfigV1 struct {
|
||||
@ -37,66 +43,241 @@ type formatConfigV1 struct {
|
||||
XL *xlFormat `json:"xl,omitempty"`
|
||||
}
|
||||
|
||||
// FIXME: currently we don't check single exportPath which uses FS layer.
|
||||
// checkJBODConsistency - validate xl jbod order if they are consistent.
|
||||
func checkJBODConsistency(formatConfigs []*formatConfigV1) error {
|
||||
var firstJBOD []string
|
||||
// Extract first valid JBOD.
|
||||
for _, format := range formatConfigs {
|
||||
if format == nil {
|
||||
continue
|
||||
}
|
||||
firstJBOD = format.XL.JBOD
|
||||
break
|
||||
}
|
||||
jbodStr := strings.Join(firstJBOD, ".")
|
||||
for _, format := range formatConfigs {
|
||||
if format == nil {
|
||||
continue
|
||||
}
|
||||
savedJBODStr := strings.Join(format.XL.JBOD, ".")
|
||||
if jbodStr != savedJBODStr {
|
||||
return errors.New("Inconsistent disks.")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadFormatXL - load XL format.json.
|
||||
func loadFormatXL(storage StorageAPI) (xl *xlFormat, err error) {
|
||||
func findIndex(disk string, jbod []string) int {
|
||||
for index, uuid := range jbod {
|
||||
if uuid == disk {
|
||||
return index
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
// reorderDisks - reorder disks in JBOD order.
|
||||
func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1) ([]StorageAPI, error) {
|
||||
var savedJBOD []string
|
||||
for _, format := range formatConfigs {
|
||||
if format == nil {
|
||||
continue
|
||||
}
|
||||
savedJBOD = format.XL.JBOD
|
||||
break
|
||||
}
|
||||
// Pick the first JBOD list to verify the order and construct new set of disk slice.
|
||||
var newDisks = make([]StorageAPI, len(bootstrapDisks))
|
||||
var unclaimedJBODIndex = make(map[int]struct{})
|
||||
for fIndex, format := range formatConfigs {
|
||||
if format == nil {
|
||||
unclaimedJBODIndex[fIndex] = struct{}{}
|
||||
continue
|
||||
}
|
||||
jIndex := findIndex(format.XL.Disk, savedJBOD)
|
||||
if jIndex == -1 {
|
||||
return nil, errors.New("Unrecognized uuid " + format.XL.Disk + " found")
|
||||
}
|
||||
newDisks[jIndex] = bootstrapDisks[fIndex]
|
||||
}
|
||||
// Save the unclaimed jbods as well.
|
||||
for index, disk := range newDisks {
|
||||
if disk == nil {
|
||||
for fIndex := range unclaimedJBODIndex {
|
||||
newDisks[index] = bootstrapDisks[fIndex]
|
||||
delete(unclaimedJBODIndex, fIndex)
|
||||
break
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
return newDisks, nil
|
||||
}
|
||||
|
||||
// loadFormat - load format from disk.
|
||||
func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
|
||||
offset := int64(0)
|
||||
r, err := storage.ReadFile(minioMetaBucket, formatConfigFile, offset)
|
||||
r, err := disk.ReadFile(minioMetaBucket, formatConfigFile, offset)
|
||||
if err != nil {
|
||||
// 'file not found' and 'volume not found' as
|
||||
// same. 'volume not found' usually means its a fresh disk.
|
||||
if err == errFileNotFound || err == errVolumeNotFound {
|
||||
var vols []VolInfo
|
||||
vols, err = disk.ListVols()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(vols) > 1 {
|
||||
// 'format.json' not found, but we found user data.
|
||||
return nil, errCorruptedFormat
|
||||
}
|
||||
// No other data found, its a fresh disk.
|
||||
return nil, errUnformattedDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
decoder := json.NewDecoder(r)
|
||||
formatXL := formatConfigV1{}
|
||||
err = decoder.Decode(&formatXL)
|
||||
format = &formatConfigV1{}
|
||||
err = decoder.Decode(&format)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = r.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return format, nil
|
||||
}
|
||||
|
||||
// loadFormatXL - load XL format.json.
|
||||
func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) {
|
||||
var unformattedDisksFoundCnt = 0
|
||||
var diskNotFoundCount = 0
|
||||
formatConfigs := make([]*formatConfigV1, len(bootstrapDisks))
|
||||
for index, disk := range bootstrapDisks {
|
||||
var formatXL *formatConfigV1
|
||||
formatXL, err = loadFormat(disk)
|
||||
if err != nil {
|
||||
if err == errUnformattedDisk {
|
||||
unformattedDisksFoundCnt++
|
||||
continue
|
||||
} else if err == errDiskNotFound {
|
||||
diskNotFoundCount++
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
// Save valid formats.
|
||||
formatConfigs[index] = formatXL
|
||||
}
|
||||
// If all disks indicate that 'format.json' is not available
|
||||
// return 'errUnformattedDisk'.
|
||||
if unformattedDisksFoundCnt == len(bootstrapDisks) {
|
||||
return nil, errUnformattedDisk
|
||||
} else if diskNotFoundCount == len(bootstrapDisks) {
|
||||
return nil, errDiskNotFound
|
||||
} else if diskNotFoundCount > len(bootstrapDisks)-(len(bootstrapDisks)/2+1) {
|
||||
return nil, errReadQuorum
|
||||
} else if unformattedDisksFoundCnt > len(bootstrapDisks)-(len(bootstrapDisks)/2+1) {
|
||||
return nil, errReadQuorum
|
||||
}
|
||||
|
||||
if err = checkFormatXL(formatConfigs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Erasure code requires disks to be presented in the same order each time.
|
||||
return reorderDisks(bootstrapDisks, formatConfigs)
|
||||
}
|
||||
|
||||
// checkFormatXL - verifies if format.json format is intact.
|
||||
func checkFormatXL(formatConfigs []*formatConfigV1) error {
|
||||
for _, formatXL := range formatConfigs {
|
||||
if formatXL == nil {
|
||||
continue
|
||||
}
|
||||
// Validate format version and format type.
|
||||
if formatXL.Version != "1" {
|
||||
return nil, fmt.Errorf("Unsupported version of backend format [%s] found.", formatXL.Version)
|
||||
return fmt.Errorf("Unsupported version of backend format [%s] found.", formatXL.Version)
|
||||
}
|
||||
if formatXL.Format != "xl" {
|
||||
return nil, fmt.Errorf("Unsupported backend format [%s] found.", formatXL.Format)
|
||||
return fmt.Errorf("Unsupported backend format [%s] found.", formatXL.Format)
|
||||
}
|
||||
return formatXL.XL, nil
|
||||
if formatXL.XL.Version != "1" {
|
||||
return fmt.Errorf("Unsupported XL backend format found [%s]", formatXL.XL.Version)
|
||||
}
|
||||
if len(formatConfigs) != len(formatXL.XL.JBOD) {
|
||||
return fmt.Errorf("Number of disks %d did not match the backend format %d", len(formatConfigs), len(formatXL.XL.JBOD))
|
||||
}
|
||||
}
|
||||
return checkJBODConsistency(formatConfigs)
|
||||
}
|
||||
|
||||
// checkFormat - validates if format.json file exists.
|
||||
func checkFormat(storage StorageAPI) error {
|
||||
_, err := storage.StatFile(minioMetaBucket, formatConfigFile)
|
||||
// initFormatXL - save XL format configuration on all disks.
|
||||
func initFormatXL(storageDisks []StorageAPI) (err error) {
|
||||
var (
|
||||
jbod = make([]string, len(storageDisks))
|
||||
formatWriters = make([]io.WriteCloser, len(storageDisks))
|
||||
formats = make([]*formatConfigV1, len(storageDisks))
|
||||
saveFormatErrCnt = 0
|
||||
)
|
||||
for index, disk := range storageDisks {
|
||||
if err = disk.MakeVol(minioMetaBucket); err != nil {
|
||||
if err != errVolumeExists {
|
||||
saveFormatErrCnt++
|
||||
// Check for write quorum.
|
||||
if saveFormatErrCnt <= len(storageDisks)-(len(storageDisks)/2+3) {
|
||||
continue
|
||||
}
|
||||
return errWriteQuorum
|
||||
}
|
||||
}
|
||||
var w io.WriteCloser
|
||||
w, err = disk.CreateFile(minioMetaBucket, formatConfigFile)
|
||||
if err != nil {
|
||||
saveFormatErrCnt++
|
||||
// Check for write quorum.
|
||||
if saveFormatErrCnt <= len(storageDisks)-(len(storageDisks)/2+3) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// saveFormatXL - save XL format configuration
|
||||
func saveFormatXL(storage StorageAPI, xl *xlFormat) error {
|
||||
w, err := storage.CreateFile(minioMetaBucket, formatConfigFile)
|
||||
u, err := uuid.New()
|
||||
if err != nil {
|
||||
saveFormatErrCnt++
|
||||
// Check for write quorum.
|
||||
if saveFormatErrCnt <= len(storageDisks)-(len(storageDisks)/2+3) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
formatXL := formatConfigV1{
|
||||
formatWriters[index] = w
|
||||
formats[index] = &formatConfigV1{
|
||||
Version: "1",
|
||||
Format: "xl",
|
||||
XL: xl,
|
||||
XL: &xlFormat{
|
||||
Version: "1",
|
||||
Disk: u.String(),
|
||||
},
|
||||
}
|
||||
jbod[index] = formats[index].XL.Disk
|
||||
}
|
||||
for index, w := range formatWriters {
|
||||
if formats[index] == nil {
|
||||
continue
|
||||
}
|
||||
formats[index].XL.JBOD = jbod
|
||||
encoder := json.NewEncoder(w)
|
||||
err = encoder.Encode(&formatXL)
|
||||
err = encoder.Encode(&formats[index])
|
||||
if err != nil {
|
||||
if clErr := safeCloseAndRemove(w); clErr != nil {
|
||||
return clErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, w := range formatWriters {
|
||||
if w == nil {
|
||||
continue
|
||||
}
|
||||
if err = w.Close(); err != nil {
|
||||
if clErr := safeCloseAndRemove(w); clErr != nil {
|
||||
return clErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ func (s *MySuite) TestXLAPISuite(c *C) {
|
||||
c.Check(err, IsNil)
|
||||
erasureDisks = append(erasureDisks, path)
|
||||
}
|
||||
objAPI, err := newXLObjects(erasureDisks...)
|
||||
objAPI, err := newXLObjects(erasureDisks)
|
||||
c.Check(err, IsNil)
|
||||
return objAPI
|
||||
}
|
||||
|
@ -22,12 +22,13 @@ import (
|
||||
)
|
||||
|
||||
// Common initialization needed for both object layers.
|
||||
func initObjectLayer(storage StorageAPI) error {
|
||||
func initObjectLayer(storageDisks ...StorageAPI) error {
|
||||
// This happens for the first time, but keep this here since this
|
||||
// is the only place where it can be made expensive optimizing all
|
||||
// other calls. Create minio meta volume, if it doesn't exist yet.
|
||||
for _, storage := range storageDisks {
|
||||
if err := storage.MakeVol(minioMetaBucket); err != nil {
|
||||
if err != errVolumeExists {
|
||||
if err != errVolumeExists && err != errDiskNotFound {
|
||||
return toObjectErr(err, minioMetaBucket)
|
||||
}
|
||||
}
|
||||
@ -36,6 +37,7 @@ func initObjectLayer(storage StorageAPI) error {
|
||||
if err != nil {
|
||||
return toObjectErr(err, minioMetaBucket, tmpMetaPrefix)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
9
posix.go
9
posix.go
@ -114,6 +114,9 @@ func checkDiskFree(diskPath string, minFreeDisk int64) (err error) {
|
||||
}
|
||||
di, err := disk.GetInfo(diskPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return errDiskNotFound
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -203,6 +206,9 @@ func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) {
|
||||
var diskInfo disk.Info
|
||||
diskInfo, err = disk.GetInfo(s.diskPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, errDiskNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
volsInfo, err = listVols(s.diskPath)
|
||||
@ -242,6 +248,9 @@ func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||
var diskInfo disk.Info
|
||||
diskInfo, err = disk.GetInfo(s.diskPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return VolInfo{}, errDiskNotFound
|
||||
}
|
||||
return VolInfo{}, err
|
||||
}
|
||||
// As os.Stat() doesn't carry other than ModTime(), use ModTime()
|
||||
|
@ -25,14 +25,14 @@ import (
|
||||
|
||||
// newObjectLayer - initialize any object layer depending on the
|
||||
// number of export paths.
|
||||
func newObjectLayer(exportPaths ...string) (ObjectLayer, error) {
|
||||
func newObjectLayer(exportPaths []string) (ObjectLayer, error) {
|
||||
if len(exportPaths) == 1 {
|
||||
exportPath := exportPaths[0]
|
||||
// Initialize FS object layer.
|
||||
return newFSObjects(exportPath)
|
||||
}
|
||||
// Initialize XL object layer.
|
||||
objAPI, err := newXLObjects(exportPaths...)
|
||||
objAPI, err := newXLObjects(exportPaths)
|
||||
if err == errWriteQuorum {
|
||||
return objAPI, errors.New("Disks are different with last minio server run.")
|
||||
}
|
||||
@ -41,7 +41,7 @@ func newObjectLayer(exportPaths ...string) (ObjectLayer, error) {
|
||||
|
||||
// configureServer handler returns final handler for the http server.
|
||||
func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
|
||||
objAPI, err := newObjectLayer(srvCmdConfig.exportPaths...)
|
||||
objAPI, err := newObjectLayer(srvCmdConfig.exportPaths)
|
||||
fatalIf(err, "Unable to intialize object layer.")
|
||||
|
||||
// Initialize storage rpc server.
|
||||
|
@ -18,6 +18,12 @@ package main
|
||||
|
||||
import "errors"
|
||||
|
||||
// errCorruptedFormat - corrupted backend format.
|
||||
var errCorruptedFormat = errors.New("corrupted backend format")
|
||||
|
||||
// errUnformattedDisk - unformatted disk found.
|
||||
var errUnformattedDisk = errors.New("unformatted disk found")
|
||||
|
||||
// errDiskFull - cannot create volume or files when disk is full.
|
||||
var errDiskFull = errors.New("disk path full")
|
||||
|
||||
|
@ -44,7 +44,7 @@ func ExecObjectLayerTest(t *testing.T, objTest func(obj ObjectLayer, instanceTyp
|
||||
}
|
||||
erasureDisks = append(erasureDisks, path)
|
||||
}
|
||||
objLayer, err := newXLObjects(erasureDisks...)
|
||||
objLayer, err := newXLObjects(erasureDisks)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -1,31 +0,0 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import "errors"
|
||||
|
||||
// errMaxDisks - returned for reached maximum of disks.
|
||||
var errMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'")
|
||||
|
||||
// errMinDisks - returned for minimum number of disks.
|
||||
var errMinDisks = errors.New("Number of disks are smaller than supported minimum count '8'")
|
||||
|
||||
// errNumDisks - returned for odd number of disks.
|
||||
var errNumDisks = errors.New("Number of disks should be multiples of '2'")
|
||||
|
||||
// errUnexpected - returned for any unexpected error.
|
||||
var errUnexpected = errors.New("Unexpected error - please report at https://github.com/minio/minio/issues")
|
@ -17,6 +17,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
@ -32,10 +33,6 @@ import (
|
||||
const (
|
||||
// XL erasure metadata file.
|
||||
xlMetaV1File = "file.json"
|
||||
// Maximum erasure blocks.
|
||||
maxErasureBlocks = 16
|
||||
// Minimum erasure blocks.
|
||||
minErasureBlocks = 8
|
||||
)
|
||||
|
||||
// XL layer structure.
|
||||
@ -48,33 +45,16 @@ type XL struct {
|
||||
writeQuorum int
|
||||
}
|
||||
|
||||
// errUnexpected - returned for any unexpected error.
|
||||
var errUnexpected = errors.New("Unexpected error - please report at https://github.com/minio/minio/issues")
|
||||
|
||||
// newXL instantiate a new XL.
|
||||
func newXL(disks ...string) (StorageAPI, error) {
|
||||
func newXL(disks []StorageAPI) (StorageAPI, error) {
|
||||
// Initialize XL.
|
||||
xl := &XL{}
|
||||
|
||||
// Verify total number of disks.
|
||||
totalDisks := len(disks)
|
||||
if totalDisks > maxErasureBlocks {
|
||||
return nil, errMaxDisks
|
||||
}
|
||||
if totalDisks < minErasureBlocks {
|
||||
return nil, errMinDisks
|
||||
}
|
||||
|
||||
// isEven function to verify if a given number if even.
|
||||
isEven := func(number int) bool {
|
||||
return number%2 == 0
|
||||
}
|
||||
|
||||
// Verify if we have even number of disks.
|
||||
// only combination of 8, 10, 12, 14, 16 are supported.
|
||||
if !isEven(totalDisks) {
|
||||
return nil, errNumDisks
|
||||
}
|
||||
|
||||
// Calculate data and parity blocks.
|
||||
dataBlocks, parityBlocks := totalDisks/2, totalDisks/2
|
||||
dataBlocks, parityBlocks := len(disks)/2, len(disks)/2
|
||||
|
||||
// Initialize reed solomon encoding.
|
||||
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
|
||||
@ -87,23 +67,8 @@ func newXL(disks ...string) (StorageAPI, error) {
|
||||
xl.ParityBlocks = parityBlocks
|
||||
xl.ReedSolomon = rs
|
||||
|
||||
// Initialize all storage disks.
|
||||
storageDisks := make([]StorageAPI, len(disks))
|
||||
for index, disk := range disks {
|
||||
var err error
|
||||
// Intentionally ignore disk not found errors while
|
||||
// initializing POSIX, so that we have successfully
|
||||
// initialized posix Storage.
|
||||
// Subsequent calls to XL/Erasure will manage any errors
|
||||
// related to disks.
|
||||
storageDisks[index], err = newPosix(disk)
|
||||
if err != nil && err != errDiskNotFound {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Save all the initialized storage disks.
|
||||
xl.storageDisks = storageDisks
|
||||
xl.storageDisks = disks
|
||||
|
||||
// Figure out read and write quorum based on number of storage disks.
|
||||
// Read quorum should be always N/2 + 1 (due to Vandermonde matrix
|
||||
|
123
xl-objects.go
123
xl-objects.go
@ -44,68 +44,109 @@ type xlObjects struct {
|
||||
listObjectMapMutex *sync.Mutex
|
||||
}
|
||||
|
||||
// isValidFormat - validates input arguments with backend 'format.json'
|
||||
func isValidFormat(storage StorageAPI, exportPaths ...string) bool {
|
||||
// Load saved XL format.json and validate.
|
||||
xl, err := loadFormatXL(storage)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to load format file 'format.json'.")
|
||||
return false
|
||||
// errMaxDisks - returned for reached maximum of disks.
|
||||
var errMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'")
|
||||
|
||||
// errMinDisks - returned for minimum number of disks.
|
||||
var errMinDisks = errors.New("Number of disks are smaller than supported minimum count '8'")
|
||||
|
||||
// errNumDisks - returned for odd number of disks.
|
||||
var errNumDisks = errors.New("Number of disks should be multiples of '2'")
|
||||
|
||||
const (
|
||||
// Maximum erasure blocks.
|
||||
maxErasureBlocks = 16
|
||||
// Minimum erasure blocks.
|
||||
minErasureBlocks = 8
|
||||
)
|
||||
|
||||
func checkSufficientDisks(disks []string) error {
|
||||
// Verify total number of disks.
|
||||
totalDisks := len(disks)
|
||||
if totalDisks > maxErasureBlocks {
|
||||
return errMaxDisks
|
||||
}
|
||||
if xl.Version != "1" {
|
||||
return false
|
||||
if totalDisks < minErasureBlocks {
|
||||
return errMinDisks
|
||||
}
|
||||
if len(exportPaths) != len(xl.Disks) {
|
||||
return false
|
||||
|
||||
// isEven function to verify if a given number if even.
|
||||
isEven := func(number int) bool {
|
||||
return number%2 == 0
|
||||
}
|
||||
for index, disk := range xl.Disks {
|
||||
if exportPaths[index] != disk {
|
||||
return false
|
||||
|
||||
// Verify if we have even number of disks.
|
||||
// only combination of 8, 10, 12, 14, 16 are supported.
|
||||
if !isEven(totalDisks) {
|
||||
return errNumDisks
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Depending on the disk type network or local, initialize storage layer.
|
||||
func newStorageLayer(disk string) (storage StorageAPI, err error) {
|
||||
if !strings.ContainsRune(disk, ':') || filepath.VolumeName(disk) != "" {
|
||||
// Initialize filesystem storage API.
|
||||
return newPosix(disk)
|
||||
}
|
||||
// Initialize rpc client storage API.
|
||||
return newRPCClient(disk)
|
||||
}
|
||||
|
||||
// Initialize all storage disks to bootstrap.
|
||||
func bootstrapDisks(disks []string) ([]StorageAPI, error) {
|
||||
storageDisks := make([]StorageAPI, len(disks))
|
||||
for index, disk := range disks {
|
||||
var err error
|
||||
// Intentionally ignore disk not found errors while
|
||||
// initializing POSIX, so that we have successfully
|
||||
// initialized posix Storage. Subsequent calls to XL/Erasure
|
||||
// will manage any errors related to disks.
|
||||
storageDisks[index], err = newStorageLayer(disk)
|
||||
if err != nil && err != errDiskNotFound {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return true
|
||||
return storageDisks, nil
|
||||
}
|
||||
|
||||
// newXLObjects - initialize new xl object layer.
|
||||
func newXLObjects(exportPaths ...string) (ObjectLayer, error) {
|
||||
storage, err := newXL(exportPaths...)
|
||||
func newXLObjects(disks []string) (ObjectLayer, error) {
|
||||
if err := checkSufficientDisks(disks); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storageDisks, err := bootstrapDisks(disks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize object layer - like creating minioMetaBucket,
|
||||
// cleaning up tmp files etc.
|
||||
initObjectLayer(storage)
|
||||
// Initialize object layer - like creating minioMetaBucket, cleaning up tmp files etc.
|
||||
initObjectLayer(storageDisks...)
|
||||
|
||||
err = checkFormat(storage)
|
||||
// Load saved XL format.json and validate.
|
||||
newDisks, err := loadFormatXL(storageDisks)
|
||||
if err != nil {
|
||||
if err == errFileNotFound {
|
||||
switch err {
|
||||
case errUnformattedDisk:
|
||||
// Save new XL format.
|
||||
errSave := saveFormatXL(storage, &xlFormat{
|
||||
Version: "1",
|
||||
Disks: exportPaths,
|
||||
})
|
||||
errSave := initFormatXL(storageDisks)
|
||||
if errSave != nil {
|
||||
return nil, errSave
|
||||
}
|
||||
} else {
|
||||
if err == errReadQuorum {
|
||||
errMsg := fmt.Sprintf("Disks %s are offline. Unable to establish quorum.", exportPaths)
|
||||
err = errors.New(errMsg)
|
||||
} else if err == errDiskNotFound {
|
||||
errMsg := fmt.Sprintf("Disks %s not found.", exportPaths)
|
||||
err = errors.New(errMsg)
|
||||
} else if err == errVolumeAccessDenied {
|
||||
errMsg := fmt.Sprintf("Disks %s access permission denied.", exportPaths)
|
||||
err = errors.New(errMsg)
|
||||
}
|
||||
return nil, err
|
||||
newDisks = storageDisks
|
||||
default:
|
||||
// errCorruptedDisk - error.
|
||||
return nil, fmt.Errorf("Unable to recognize backend format, %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate if format exists and input arguments are validated with backend format.
|
||||
if !isValidFormat(storage, exportPaths...) {
|
||||
return nil, fmt.Errorf("Command-line arguments %s is not valid.", exportPaths)
|
||||
// FIXME: healFormatXL(newDisks)
|
||||
|
||||
storage, err := newXL(newDisks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return successfully initialized object layer.
|
||||
|
Loading…
x
Reference in New Issue
Block a user