mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
Move formatting of disks out of object layer initialization (#2572)
This commit is contained in:
parent
5c4dbc966f
commit
de67bca211
@ -28,6 +28,20 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Prepare benchmark backend
|
||||
func prepareBenchmarkBackend(instanceType string) (ObjectLayer, []string, error) {
|
||||
nDisks := 16
|
||||
disks, err := getRandomDisks(nDisks)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
obj, err := makeTestBackend(disks, instanceType)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return obj, disks, nil
|
||||
}
|
||||
|
||||
// Benchmark utility functions for ObjectLayer.PutObject().
|
||||
// Creates Object layer setup ( MakeBucket ) and then runs the PutObject benchmark.
|
||||
func runPutObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) {
|
||||
@ -135,7 +149,7 @@ func runPutObjectPartBenchmark(b *testing.B, obj ObjectLayer, partSize int) {
|
||||
// creates XL/FS backend setup, obtains the object layer and calls the runPutObjectPartBenchmark function.
|
||||
func benchmarkPutObjectPart(b *testing.B, instanceType string, objSize int) {
|
||||
// create a temp XL/FS backend.
|
||||
objLayer, disks, err := makeTestBackend(instanceType)
|
||||
objLayer, disks, err := prepareBenchmarkBackend(instanceType)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
|
||||
}
|
||||
@ -148,7 +162,7 @@ func benchmarkPutObjectPart(b *testing.B, instanceType string, objSize int) {
|
||||
// creates XL/FS backend setup, obtains the object layer and calls the runPutObjectBenchmark function.
|
||||
func benchmarkPutObject(b *testing.B, instanceType string, objSize int) {
|
||||
// create a temp XL/FS backend.
|
||||
objLayer, disks, err := makeTestBackend(instanceType)
|
||||
objLayer, disks, err := prepareBenchmarkBackend(instanceType)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
|
||||
}
|
||||
@ -161,7 +175,7 @@ func benchmarkPutObject(b *testing.B, instanceType string, objSize int) {
|
||||
// creates XL/FS backend setup, obtains the object layer and runs parallel benchmark for put object.
|
||||
func benchmarkPutObjectParallel(b *testing.B, instanceType string, objSize int) {
|
||||
// create a temp XL/FS backend.
|
||||
objLayer, disks, err := makeTestBackend(instanceType)
|
||||
objLayer, disks, err := prepareBenchmarkBackend(instanceType)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
|
||||
}
|
||||
@ -242,7 +256,7 @@ func generateBytesData(size int) []byte {
|
||||
// creates XL/FS backend setup, obtains the object layer and calls the runGetObjectBenchmark function.
|
||||
func benchmarkGetObject(b *testing.B, instanceType string, objSize int) {
|
||||
// create a temp XL/FS backend.
|
||||
objLayer, disks, err := makeTestBackend(instanceType)
|
||||
objLayer, disks, err := prepareBenchmarkBackend(instanceType)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
|
||||
}
|
||||
@ -255,7 +269,7 @@ func benchmarkGetObject(b *testing.B, instanceType string, objSize int) {
|
||||
// creates XL/FS backend setup, obtains the object layer and runs parallel benchmark for ObjectLayer.GetObject() .
|
||||
func benchmarkGetObjectParallel(b *testing.B, instanceType string, objSize int) {
|
||||
// create a temp XL/FS backend.
|
||||
objLayer, disks, err := makeTestBackend(instanceType)
|
||||
objLayer, disks, err := prepareBenchmarkBackend(instanceType)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
|
||||
}
|
||||
|
@ -112,7 +112,14 @@ func (c *controllerAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply
|
||||
if !isRPCTokenValid(args.Token) {
|
||||
return errInvalidToken
|
||||
}
|
||||
return objAPI.HealDiskMetadata()
|
||||
err := objAPI.HealDiskMetadata()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
globalWakeupCh <- struct{}{}
|
||||
}()
|
||||
return err
|
||||
}
|
||||
|
||||
// ShutdownArgs - argument for Shutdown RPC.
|
||||
@ -137,3 +144,12 @@ func (c *controllerAPIHandlers) ShutdownHandler(args *ShutdownArgs, reply *Gener
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controllerAPIHandlers) TryInitHandler(args *GenericArgs, reply *GenericReply) error {
|
||||
go func() {
|
||||
globalWakeupCh <- struct{}{}
|
||||
}()
|
||||
*reply = GenericReply{}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
@ -217,11 +217,16 @@ func TestIsSuccessBlocks(t *testing.T) {
|
||||
|
||||
// Wrapper function for testGetReadDisks, testGetOrderedDisks.
|
||||
func TestErasureReadUtils(t *testing.T) {
|
||||
objLayer, dirs, err := getXLObjectLayer()
|
||||
nDisks := 16
|
||||
disks, err := getRandomDisks(nDisks)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer removeRoots(dirs)
|
||||
objLayer, err := getXLObjectLayer(disks)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer removeRoots(disks)
|
||||
xl := objLayer.(xlObjects)
|
||||
testGetReadDisks(t, xl)
|
||||
testGetOrderedDisks(t, xl)
|
||||
|
@ -86,16 +86,25 @@ func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) {
|
||||
|
||||
// Tests various forms of inititalization of event notifier.
|
||||
func TestInitEventNotifier(t *testing.T) {
|
||||
fs, disk, err := getSingleNodeObjectLayer()
|
||||
disk, err := getRandomDisks(1)
|
||||
if err != nil {
|
||||
t.Fatal("Unable to create directories for FS backend. ", err)
|
||||
}
|
||||
fs, err := getSingleNodeObjectLayer(disk[0])
|
||||
if err != nil {
|
||||
t.Fatal("Unable to initialize FS backend.", err)
|
||||
}
|
||||
xl, disks, err := getXLObjectLayer()
|
||||
nDisks := 16
|
||||
disks, err := getRandomDisks(nDisks)
|
||||
if err != nil {
|
||||
t.Fatal("Unable to create directories for XL backend. ", err)
|
||||
}
|
||||
xl, err := getXLObjectLayer(disks)
|
||||
if err != nil {
|
||||
t.Fatal("Unable to initialize XL backend.", err)
|
||||
}
|
||||
|
||||
disks = append(disks, disk)
|
||||
disks = append(disks, disk...)
|
||||
for _, d := range disks {
|
||||
defer removeAll(d)
|
||||
}
|
||||
|
@ -870,6 +870,11 @@ func initFormatXL(storageDisks []StorageAPI) (err error) {
|
||||
formats[index].XL.JBOD = jbod
|
||||
}
|
||||
|
||||
// Initialize meta volume, if volume already exists ignores it.
|
||||
if err := initMetaVolume(storageDisks); err != nil {
|
||||
return fmt.Errorf("Unable to initialize '.minio' meta volume, %s", err)
|
||||
}
|
||||
|
||||
// Save formats `format.json` across all disks.
|
||||
return saveFormatXL(storageDisks, formats)
|
||||
}
|
||||
|
@ -215,7 +215,6 @@ func genFormatXLInvalidDisksOrder() []*formatConfigV1 {
|
||||
}
|
||||
|
||||
func prepareFormatXLHealFreshDisks(obj ObjectLayer) ([]StorageAPI, error) {
|
||||
|
||||
var err error
|
||||
xl := obj.(xlObjects)
|
||||
|
||||
@ -326,7 +325,7 @@ func TestFormatXLHealFreshDisksErrorExpected(t *testing.T) {
|
||||
// a given disk to test healing a corrupted disk
|
||||
func TestFormatXLHealCorruptedDisks(t *testing.T) {
|
||||
// Create an instance of xl backend.
|
||||
obj, fsDirs, err := getXLObjectLayer()
|
||||
obj, fsDirs, err := prepareXL()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -398,7 +397,7 @@ func TestFormatXLHealCorruptedDisks(t *testing.T) {
|
||||
// some of format.json
|
||||
func TestFormatXLReorderByInspection(t *testing.T) {
|
||||
// Create an instance of xl backend.
|
||||
obj, fsDirs, err := getXLObjectLayer()
|
||||
obj, fsDirs, err := prepareXL()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -41,7 +41,11 @@ func TestNewFS(t *testing.T) {
|
||||
}
|
||||
|
||||
// Initializes all disks with XL
|
||||
_, err := newXLObjects(disks, nil)
|
||||
err := formatDisks(disks, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to format XL %s", err)
|
||||
}
|
||||
_, err = newXLObjects(disks, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to initialize XL object, %s", err)
|
||||
}
|
||||
|
@ -27,10 +27,18 @@ import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
mux "github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
var objLayerMutex *sync.Mutex
|
||||
var globalObjectAPI ObjectLayer
|
||||
|
||||
func init() {
|
||||
objLayerMutex = &sync.Mutex{}
|
||||
}
|
||||
|
||||
// supportedGetReqParams - supported request parameters for GET presigned request.
|
||||
var supportedGetReqParams = map[string]string{
|
||||
"response-expires": "Expires",
|
||||
|
227
cmd/prepare-storage.go
Normal file
227
cmd/prepare-storage.go
Normal file
@ -0,0 +1,227 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio-go/pkg/set"
|
||||
)
|
||||
|
||||
// Channel where minioctl heal handler would notify if it were successful. This
|
||||
// would be used by waitForFormattingDisks routine to check if it's worth
|
||||
// retrying loadAllFormats.
|
||||
var globalWakeupCh chan struct{}
|
||||
|
||||
func init() {
|
||||
globalWakeupCh = make(chan struct{}, 1)
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Following table lists different possible states the backend could be in.
|
||||
|
||||
* In a single-node, multi-disk setup, "Online" would refer to disks' status.
|
||||
|
||||
* In a multi-node setup, it could refer to disks' or network connectivity
|
||||
between the nodes, or both.
|
||||
|
||||
+----------+--------------------------+-----------------------+
|
||||
| Online | Format status | Course of action |
|
||||
| | | |
|
||||
-----------+--------------------------+-----------------------+
|
||||
| All | All Formatted | |
|
||||
+----------+--------------------------+ initObjectLayer |
|
||||
| Quorum | Quorum Formatted | |
|
||||
+----------+--------------------------+-----------------------+
|
||||
| All | Quorum | Print message saying |
|
||||
| | Formatted, | "Heal via minioctl" |
|
||||
| | some unformatted | and initObjectLayer |
|
||||
+----------+--------------------------+-----------------------+
|
||||
| All | None Formatted | FormatDisks |
|
||||
| | | and initObjectLayer |
|
||||
| | | |
|
||||
+----------+--------------------------+-----------------------+
|
||||
| | | Wait for notify from |
|
||||
| Quorum | | "Heal via minioctl" |
|
||||
| | Quorum UnFormatted | |
|
||||
+----------+--------------------------+-----------------------+
|
||||
| No | | Wait till enough |
|
||||
| Quorum | _ | nodes are online and |
|
||||
| | | one of the above |
|
||||
| | | sections apply |
|
||||
+----------+--------------------------+-----------------------+
|
||||
|
||||
N B A disk can be in one of the following states.
|
||||
- Unformatted
|
||||
- Formatted
|
||||
- Corrupted
|
||||
- Offline
|
||||
|
||||
*/
|
||||
|
||||
// InitActions - a type synonym for enumerating initialization activities.
|
||||
type InitActions int
|
||||
|
||||
const (
|
||||
// FormatDisks - see above table for disk states where it is applicable.
|
||||
FormatDisks InitActions = iota
|
||||
|
||||
// WaitForHeal - Wait for disks to heal.
|
||||
WaitForHeal
|
||||
|
||||
// WaitForQuorum - Wait for quorum number of disks to be online.
|
||||
WaitForQuorum
|
||||
|
||||
// WaitForAll - Wait for all disks to be online.
|
||||
WaitForAll
|
||||
|
||||
// WaitForFormatting - Wait for formatting to be triggered from the '1st' server in the cluster.
|
||||
WaitForFormatting
|
||||
|
||||
// InitObjectLayer - Initialize object layer.
|
||||
InitObjectLayer
|
||||
|
||||
// Abort initialization of object layer since there aren't enough good
|
||||
// copies of format.json to recover.
|
||||
Abort
|
||||
)
|
||||
|
||||
func prepForInit(disks []string, sErrs []error, diskCount int) InitActions {
|
||||
// Count errors by error value.
|
||||
errMap := make(map[error]int)
|
||||
// If loadAllFormats returned successfully
|
||||
if sErrs == nil {
|
||||
errMap[nil] = diskCount
|
||||
} else {
|
||||
for _, err := range sErrs {
|
||||
errMap[err]++
|
||||
}
|
||||
}
|
||||
|
||||
quorum := diskCount/2 + 1
|
||||
disksOffline := errMap[errDiskNotFound]
|
||||
disksFormatted := errMap[nil]
|
||||
disksUnformatted := errMap[errUnformattedDisk]
|
||||
disksCorrupted := errMap[errCorruptedFormat]
|
||||
|
||||
// All disks are unformatted, proceed to formatting disks.
|
||||
if disksUnformatted == diskCount {
|
||||
// Only the first server formats an uninitialized setup, others wait for notification.
|
||||
if isLocalStorage(disks[0]) {
|
||||
return FormatDisks
|
||||
}
|
||||
return WaitForFormatting
|
||||
} else if (disksUnformatted >= quorum) && (disksUnformatted+disksOffline == diskCount) {
|
||||
return WaitForAll
|
||||
}
|
||||
|
||||
// Already formatted, proceed to initialization of object layer.
|
||||
if disksFormatted == diskCount {
|
||||
return InitObjectLayer
|
||||
} else if disksFormatted > quorum && disksFormatted+disksOffline == diskCount {
|
||||
return InitObjectLayer
|
||||
} else if disksFormatted > quorum {
|
||||
// TODO: Print minioctl heal command
|
||||
return InitObjectLayer
|
||||
}
|
||||
|
||||
// No Quorum.
|
||||
if disksOffline > quorum {
|
||||
return WaitForQuorum
|
||||
}
|
||||
|
||||
// There is quorum or more corrupted disks, there is not enough good
|
||||
// disks to reconstruct format.json.
|
||||
if disksCorrupted >= quorum {
|
||||
return Abort
|
||||
}
|
||||
// Some of the formatted disks are possibly offline.
|
||||
return WaitForHeal
|
||||
}
|
||||
|
||||
func retryFormattingDisks(disks []string, storageDisks []StorageAPI) ([]StorageAPI, error) {
|
||||
nextBackoff := time.Duration(0)
|
||||
var err error
|
||||
done := false
|
||||
for !done {
|
||||
select {
|
||||
case <-time.After(nextBackoff * time.Second):
|
||||
// Attempt to load all `format.json`.
|
||||
_, sErrs := loadAllFormats(storageDisks)
|
||||
|
||||
switch prepForInit(disks, sErrs, len(storageDisks)) {
|
||||
case Abort:
|
||||
err = errCorruptedFormat
|
||||
done = true
|
||||
case FormatDisks:
|
||||
err = initFormatXL(storageDisks)
|
||||
done = true
|
||||
case InitObjectLayer:
|
||||
err = nil
|
||||
done = true
|
||||
}
|
||||
case <-globalWakeupCh:
|
||||
// Reset nextBackoff to reduce the subsequent wait and re-read
|
||||
// format.json from all disks again.
|
||||
nextBackoff = 0
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return storageDisks, nil
|
||||
}
|
||||
|
||||
func waitForFormattingDisks(disks, ignoredDisks []string) ([]StorageAPI, error) {
|
||||
// FS Setup
|
||||
if len(disks) == 1 {
|
||||
storage, err := newStorageAPI(disks[0])
|
||||
if err != nil && err != errDiskNotFound {
|
||||
return nil, err
|
||||
}
|
||||
return []StorageAPI{storage}, nil
|
||||
}
|
||||
|
||||
// XL Setup
|
||||
if err := checkSufficientDisks(disks); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
disksSet := set.NewStringSet()
|
||||
if len(ignoredDisks) > 0 {
|
||||
disksSet = set.CreateStringSet(ignoredDisks...)
|
||||
}
|
||||
// Bootstrap disks.
|
||||
storageDisks := make([]StorageAPI, len(disks))
|
||||
for index, disk := range disks {
|
||||
// Check if disk is ignored.
|
||||
if disksSet.Contains(disk) {
|
||||
storageDisks[index] = nil
|
||||
continue
|
||||
}
|
||||
// Intentionally ignore disk not found errors. XL is designed
|
||||
// to handle these errors internally.
|
||||
storage, err := newStorageAPI(disk)
|
||||
if err != nil && err != errDiskNotFound {
|
||||
return nil, err
|
||||
}
|
||||
storageDisks[index] = storage
|
||||
}
|
||||
|
||||
return retryFormattingDisks(disks, storageDisks)
|
||||
}
|
@ -25,6 +25,12 @@ import (
|
||||
router "github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
func newObjectLayerFn() ObjectLayer {
|
||||
objLayerMutex.Lock()
|
||||
defer objLayerMutex.Unlock()
|
||||
return globalObjectAPI
|
||||
}
|
||||
|
||||
// newObjectLayer - initialize any object layer depending on the number of disks.
|
||||
func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) {
|
||||
if len(disks) == 1 {
|
||||
@ -37,54 +43,43 @@ func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) {
|
||||
if err == errXLWriteQuorum {
|
||||
return objAPI, errors.New("Disks are different with last minio server run.")
|
||||
}
|
||||
return objAPI, err
|
||||
}
|
||||
|
||||
func newObjectLayerFactory(disks, ignoredDisks []string) func() ObjectLayer {
|
||||
var objAPI ObjectLayer
|
||||
// FIXME: This needs to be go-routine safe.
|
||||
return func() ObjectLayer {
|
||||
var err error
|
||||
if objAPI != nil {
|
||||
return objAPI
|
||||
}
|
||||
|
||||
// Acquire a distributed lock to ensure only one of the nodes
|
||||
// initializes the format.json.
|
||||
nsMutex.Lock(minioMetaBucket, formatConfigFile)
|
||||
defer nsMutex.Unlock(minioMetaBucket, formatConfigFile)
|
||||
objAPI, err = newObjectLayer(disks, ignoredDisks)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to initialize object layer.")
|
||||
// Purposefully do not return error, just return nil.
|
||||
return nil
|
||||
}
|
||||
// Migrate bucket policy from configDir to .minio.sys/buckets/
|
||||
err = migrateBucketPolicyConfig(objAPI)
|
||||
// Migrate bucket policy from configDir to .minio.sys/buckets/
|
||||
err = migrateBucketPolicyConfig(objAPI)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to migrate bucket policy from config directory")
|
||||
|
||||
err = cleanupOldBucketPolicyConfigs()
|
||||
errorIf(err, "Unable to clean up bucket policy from config directory.")
|
||||
|
||||
// Register the callback that should be called when the process shuts down.
|
||||
globalShutdownCBs.AddObjectLayerCB(func() errCode {
|
||||
if sErr := objAPI.Shutdown(); sErr != nil {
|
||||
return exitFailure
|
||||
}
|
||||
return exitSuccess
|
||||
})
|
||||
|
||||
// Initialize a new event notifier.
|
||||
err = initEventNotifier(objAPI)
|
||||
errorIf(err, "Unable to initialize event notification.")
|
||||
|
||||
// Initialize and load bucket policies.
|
||||
err = initBucketPolicies(objAPI)
|
||||
errorIf(err, "Unable to load all bucket policies.")
|
||||
|
||||
// Success.
|
||||
return objAPI
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = cleanupOldBucketPolicyConfigs()
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to clean up bucket policy from config directory.")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Register the callback that should be called when the process shuts down.
|
||||
globalShutdownCBs.AddObjectLayerCB(func() errCode {
|
||||
if sErr := objAPI.Shutdown(); sErr != nil {
|
||||
return exitFailure
|
||||
}
|
||||
return exitSuccess
|
||||
})
|
||||
|
||||
// Initialize a new event notifier.
|
||||
err = initEventNotifier(objAPI)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to initialize event notification.")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize and load bucket policies.
|
||||
err = initBucketPolicies(objAPI)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to load all bucket policies.")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Success.
|
||||
return objAPI, nil
|
||||
}
|
||||
|
||||
// configureServer handler returns final handler for the http server.
|
||||
@ -97,7 +92,6 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
|
||||
err = initGracefulShutdown(os.Exit)
|
||||
fatalIf(err, "Unable to initialize graceful shutdown operation")
|
||||
|
||||
newObjectLayerFn := newObjectLayerFactory(srvCmdConfig.disks, srvCmdConfig.ignoredDisks)
|
||||
// Initialize API.
|
||||
apiHandlers := objectAPIHandlers{
|
||||
ObjectAPI: newObjectLayerFn,
|
||||
|
@ -294,6 +294,35 @@ func isDistributedSetup(disks []string) (isDist bool) {
|
||||
return isDist
|
||||
}
|
||||
|
||||
// Format disks before initialization object layer.
|
||||
func formatDisks(disks, ignoredDisks []string) error {
|
||||
storageDisks, err := waitForFormattingDisks(disks, ignoredDisks)
|
||||
for i := range storageDisks {
|
||||
switch storage := storageDisks[i].(type) {
|
||||
// Closing associated TCP connections since
|
||||
// []StorageAPI is garbage collected eventually.
|
||||
case networkStorage:
|
||||
storage.rpcClient.Close()
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if isLocalStorage(disks[0]) {
|
||||
// notify every one else that they can try init again.
|
||||
for i := range storageDisks {
|
||||
switch storage := storageDisks[i].(type) {
|
||||
// Closing associated TCP connections since
|
||||
// []StorageAPI is garage collected eventually.
|
||||
case networkStorage:
|
||||
var reply GenericReply
|
||||
_ = storage.rpcClient.Call("Storage.TryInitHandler", &GenericArgs{}, &reply)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// serverMain handler called for 'minio server' command.
|
||||
func serverMain(c *cli.Context) {
|
||||
// Check 'server' cli arguments.
|
||||
@ -335,6 +364,11 @@ func serverMain(c *cli.Context) {
|
||||
disks: disks,
|
||||
ignoredDisks: ignoredDisks,
|
||||
}
|
||||
|
||||
// Initialize and monitor shutdown signals.
|
||||
err = initGracefulShutdown(os.Exit)
|
||||
fatalIf(err, "Unable to initialize graceful shutdown operation")
|
||||
|
||||
// Configure server.
|
||||
handler := configureServerHandler(srvConfig)
|
||||
|
||||
@ -354,12 +388,33 @@ func serverMain(c *cli.Context) {
|
||||
|
||||
// Start server.
|
||||
// Configure TLS if certs are available.
|
||||
if tls {
|
||||
err = apiServer.ListenAndServeTLS(mustGetCertFile(), mustGetKeyFile())
|
||||
} else {
|
||||
// Fallback to http.
|
||||
err = apiServer.ListenAndServe()
|
||||
wait := make(chan struct{}, 1)
|
||||
go func(tls bool, wait chan<- struct{}) {
|
||||
if tls {
|
||||
err = apiServer.ListenAndServeTLS(mustGetCertFile(), mustGetKeyFile())
|
||||
} else {
|
||||
// Fallback to http.
|
||||
err = apiServer.ListenAndServe()
|
||||
}
|
||||
wait <- struct{}{}
|
||||
|
||||
}(tls, wait)
|
||||
err = formatDisks(disks, ignoredDisks)
|
||||
if err != nil {
|
||||
// FIXME: call graceful exit
|
||||
errorIf(err, "formatting storage disks failed")
|
||||
return
|
||||
}
|
||||
newObject, err := newObjectLayer(disks, ignoredDisks)
|
||||
if err != nil {
|
||||
// FIXME: call graceful exit
|
||||
errorIf(err, "intializing object layer failed")
|
||||
return
|
||||
}
|
||||
objLayerMutex.Lock()
|
||||
globalObjectAPI = newObject
|
||||
objLayerMutex.Unlock()
|
||||
<-wait
|
||||
|
||||
fatalIf(err, "Failed to start minio server.")
|
||||
}
|
||||
|
@ -51,6 +51,33 @@ func init() {
|
||||
initNSLock(isDist)
|
||||
}
|
||||
|
||||
func prepareFS() (ObjectLayer, string, error) {
|
||||
fsDirs, err := getRandomDisks(1)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
obj, err := getSingleNodeObjectLayer(fsDirs[0])
|
||||
if err != nil {
|
||||
removeRoots(fsDirs)
|
||||
return nil, "", err
|
||||
}
|
||||
return obj, fsDirs[0], nil
|
||||
}
|
||||
|
||||
func prepareXL() (ObjectLayer, []string, error) {
|
||||
nDisks := 16
|
||||
fsDirs, err := getRandomDisks(nDisks)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
obj, err := getXLObjectLayer(fsDirs)
|
||||
if err != nil {
|
||||
removeRoots(fsDirs)
|
||||
return nil, nil, err
|
||||
}
|
||||
return obj, fsDirs, nil
|
||||
}
|
||||
|
||||
// TestErrHandler - Golang Testing.T and Testing.B, and gocheck.C satisfy this interface.
|
||||
// This makes it easy to run the TestServer from any of the tests.
|
||||
// Using this interface, functionalities to be used in tests can be made generalized, and can be integrated in benchmarks/unit tests/go check suite tests.
|
||||
@ -112,6 +139,7 @@ type TestServer struct {
|
||||
AccessKey string
|
||||
SecretKey string
|
||||
Server *httptest.Server
|
||||
Obj ObjectLayer
|
||||
}
|
||||
|
||||
// Starts the test server and returns the TestServer instance.
|
||||
@ -119,10 +147,10 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer {
|
||||
// create an instance of TestServer.
|
||||
testServer := TestServer{}
|
||||
// create temporary backend for the test server.
|
||||
_, erasureDisks, err := makeTestBackend(instanceType)
|
||||
|
||||
nDisks := 16
|
||||
disks, err := getRandomDisks(nDisks)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
|
||||
t.Fatal("Failed to create disks for the backend")
|
||||
}
|
||||
|
||||
root, err := newTestConfig("us-east-1")
|
||||
@ -130,16 +158,25 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer {
|
||||
t.Fatalf("%s", err)
|
||||
}
|
||||
|
||||
// Test Server needs to start before formatting of disks.
|
||||
// Get credential.
|
||||
credentials := serverConfig.GetCredential()
|
||||
|
||||
testServer.Root = root
|
||||
testServer.Disks = erasureDisks
|
||||
testServer.Disks = disks
|
||||
testServer.AccessKey = credentials.AccessKeyID
|
||||
testServer.SecretKey = credentials.SecretAccessKey
|
||||
// Run TestServer.
|
||||
testServer.Server = httptest.NewServer(configureServerHandler(serverCmdConfig{disks: erasureDisks}))
|
||||
testServer.Server = httptest.NewServer(configureServerHandler(serverCmdConfig{disks: disks}))
|
||||
|
||||
objLayer, err := makeTestBackend(disks, instanceType)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed obtaining Temp Backend: <ERROR> %s", err)
|
||||
}
|
||||
testServer.Obj = objLayer
|
||||
objLayerMutex.Lock()
|
||||
globalObjectAPI = objLayer
|
||||
objLayerMutex.Unlock()
|
||||
return testServer
|
||||
}
|
||||
|
||||
@ -419,24 +456,24 @@ func getTestWebRPCResponse(resp *httptest.ResponseRecorder, data interface{}) er
|
||||
// if the option is
|
||||
// FS: Returns a temp single disk setup initializes FS Backend.
|
||||
// XL: Returns a 16 temp single disk setup and initializse XL Backend.
|
||||
func makeTestBackend(instanceType string) (ObjectLayer, []string, error) {
|
||||
func makeTestBackend(disks []string, instanceType string) (ObjectLayer, error) {
|
||||
switch instanceType {
|
||||
case "FS":
|
||||
objLayer, fsroot, err := getSingleNodeObjectLayer()
|
||||
objLayer, err := getSingleNodeObjectLayer(disks[0])
|
||||
if err != nil {
|
||||
return nil, []string{}, err
|
||||
return nil, err
|
||||
}
|
||||
return objLayer, []string{fsroot}, err
|
||||
return objLayer, err
|
||||
|
||||
case "XL":
|
||||
objectLayer, erasureDisks, err := getXLObjectLayer()
|
||||
objectLayer, err := getXLObjectLayer(disks)
|
||||
if err != nil {
|
||||
return nil, []string{}, err
|
||||
return nil, err
|
||||
}
|
||||
return objectLayer, erasureDisks, err
|
||||
return objectLayer, err
|
||||
default:
|
||||
errMsg := "Invalid instance type, Only FS and XL are valid options"
|
||||
return nil, []string{}, fmt.Errorf("Failed obtaining Temp XL layer: <ERROR> %s", errMsg)
|
||||
return nil, fmt.Errorf("Failed obtaining Temp XL layer: <ERROR> %s", errMsg)
|
||||
}
|
||||
}
|
||||
|
||||
@ -771,21 +808,30 @@ func getTestRoot() (string, error) {
|
||||
return ioutil.TempDir(os.TempDir(), "api-")
|
||||
}
|
||||
|
||||
// getXLObjectLayer - Instantiates XL object layer and returns it.
|
||||
func getXLObjectLayer() (ObjectLayer, []string, error) {
|
||||
var nDisks = 16 // Maximum disks.
|
||||
// getRandomDisks - Creates a slice of N random disks, each of the form - minio-XXX
|
||||
func getRandomDisks(N int) ([]string, error) {
|
||||
var erasureDisks []string
|
||||
for i := 0; i < nDisks; i++ {
|
||||
for i := 0; i < N; i++ {
|
||||
path, err := ioutil.TempDir(os.TempDir(), "minio-")
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
// Remove directories created so far.
|
||||
removeRoots(erasureDisks)
|
||||
return nil, err
|
||||
}
|
||||
erasureDisks = append(erasureDisks, path)
|
||||
}
|
||||
return erasureDisks, nil
|
||||
}
|
||||
|
||||
// getXLObjectLayer - Instantiates XL object layer and returns it.
|
||||
func getXLObjectLayer(erasureDisks []string) (ObjectLayer, error) {
|
||||
err := formatDisks(erasureDisks, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
objLayer, err := newXLObjects(erasureDisks, nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
// Disabling the cache for integration tests.
|
||||
// Should use the object layer tests for validating cache.
|
||||
@ -793,23 +839,17 @@ func getXLObjectLayer() (ObjectLayer, []string, error) {
|
||||
xl.objCacheEnabled = false
|
||||
}
|
||||
|
||||
return objLayer, erasureDisks, nil
|
||||
return objLayer, nil
|
||||
}
|
||||
|
||||
// getSingleNodeObjectLayer - Instantiates single node object layer and returns it.
|
||||
func getSingleNodeObjectLayer() (ObjectLayer, string, error) {
|
||||
// Make a temporary directory to use as the obj.
|
||||
fsDir, err := ioutil.TempDir("", "minio-")
|
||||
func getSingleNodeObjectLayer(disk string) (ObjectLayer, error) {
|
||||
// Create the object layer.
|
||||
objLayer, err := newFSObjects(disk)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create the obj.
|
||||
objLayer, err := newFSObjects(fsDir)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
return objLayer, fsDir, nil
|
||||
return objLayer, nil
|
||||
}
|
||||
|
||||
// removeRoots - Cleans up initialized directories during tests.
|
||||
@ -838,14 +878,14 @@ type objTestDiskNotFoundType func(obj ObjectLayer, instanceType string, dirs []s
|
||||
// ExecObjectLayerTest - executes object layer tests.
|
||||
// Creates single node and XL ObjectLayer instance and runs test for both the layers.
|
||||
func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
|
||||
objLayer, fsDir, err := getSingleNodeObjectLayer()
|
||||
objLayer, fsDir, err := prepareFS()
|
||||
if err != nil {
|
||||
t.Fatalf("Initialization of object layer failed for single node setup: %s", err)
|
||||
}
|
||||
// Executing the object layer tests for single node setup.
|
||||
objTest(objLayer, singleNodeTestStr, t)
|
||||
|
||||
objLayer, fsDirs, err := getXLObjectLayer()
|
||||
objLayer, fsDirs, err := prepareXL()
|
||||
if err != nil {
|
||||
t.Fatalf("Initialization of object layer failed for XL setup: %s", err)
|
||||
}
|
||||
@ -857,7 +897,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
|
||||
// ExecObjectLayerDiskNotFoundTest - executes object layer tests while deleting
|
||||
// disks in between tests. Creates XL ObjectLayer instance and runs test for XL layer.
|
||||
func ExecObjectLayerDiskNotFoundTest(t *testing.T, objTest objTestDiskNotFoundType) {
|
||||
objLayer, fsDirs, err := getXLObjectLayer()
|
||||
objLayer, fsDirs, err := prepareXL()
|
||||
if err != nil {
|
||||
t.Fatalf("Initialization of object layer failed for XL setup: %s", err)
|
||||
}
|
||||
@ -872,13 +912,18 @@ type objTestStaleFilesType func(obj ObjectLayer, instanceType string, dirs []str
|
||||
// ExecObjectLayerStaleFilesTest - executes object layer tests those leaves stale
|
||||
// files/directories under .minio/tmp. Creates XL ObjectLayer instance and runs test for XL layer.
|
||||
func ExecObjectLayerStaleFilesTest(t *testing.T, objTest objTestStaleFilesType) {
|
||||
objLayer, fsDirs, err := getXLObjectLayer()
|
||||
nDisks := 16
|
||||
erasureDisks, err := getRandomDisks(nDisks)
|
||||
if err != nil {
|
||||
t.Fatalf("Initialization of disks for XL setup: %s", err)
|
||||
}
|
||||
objLayer, err := getXLObjectLayer(erasureDisks)
|
||||
if err != nil {
|
||||
t.Fatalf("Initialization of object layer failed for XL setup: %s", err)
|
||||
}
|
||||
// Executing the object layer tests for XL.
|
||||
objTest(objLayer, xLTestStr, fsDirs, t)
|
||||
defer removeRoots(fsDirs)
|
||||
objTest(objLayer, xLTestStr, erasureDisks, t)
|
||||
defer removeRoots(erasureDisks)
|
||||
}
|
||||
|
||||
// Takes in XL/FS object layer, and the list of API end points to be tested/required, registers the API end points and returns the HTTP handler.
|
||||
|
@ -565,7 +565,11 @@ func (web *webAPIHandlers) GetBucketPolicy(r *http.Request, args *GetBucketPolic
|
||||
return &json2.Error{Message: "Unauthorized request"}
|
||||
}
|
||||
|
||||
policyInfo, err := readBucketAccessPolicy(web.ObjectAPI, args.BucketName)
|
||||
objectAPI := web.ObjectAPI()
|
||||
if objectAPI == nil {
|
||||
return &json2.Error{Message: "Server not initialized"}
|
||||
}
|
||||
policyInfo, err := readBucketAccessPolicy(objectAPI, args.BucketName)
|
||||
if err != nil {
|
||||
return &json2.Error{Message: err.Error()}
|
||||
}
|
||||
@ -596,7 +600,11 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
|
||||
return &json2.Error{Message: "Invalid policy " + args.Policy}
|
||||
}
|
||||
|
||||
policyInfo, err := readBucketAccessPolicy(web.ObjectAPI, args.BucketName)
|
||||
objectAPI := web.ObjectAPI()
|
||||
if objectAPI == nil {
|
||||
return &json2.Error{Message: "Server not initialized"}
|
||||
}
|
||||
policyInfo, err := readBucketAccessPolicy(objectAPI, args.BucketName)
|
||||
if err != nil {
|
||||
return &json2.Error{Message: err.Error()}
|
||||
}
|
||||
@ -609,7 +617,7 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
|
||||
}
|
||||
|
||||
// TODO: update policy statements according to bucket name, prefix and policy arguments.
|
||||
if err := writeBucketPolicy(args.BucketName, web.ObjectAPI, bytes.NewReader(data), int64(len(data))); err != nil {
|
||||
if err := writeBucketPolicy(args.BucketName, objectAPI, bytes.NewReader(data), int64(len(data))); err != nil {
|
||||
return &json2.Error{Message: err.Error()}
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@ func TestRepeatPutObjectPart(t *testing.T) {
|
||||
var disks []string
|
||||
var err error
|
||||
|
||||
objLayer, disks, err = getXLObjectLayer()
|
||||
objLayer, disks, err = prepareXL()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -77,7 +77,7 @@ func TestXLDeleteObjectBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
// Create an instance of xl backend
|
||||
xl, fsDirs, err := getXLObjectLayer()
|
||||
xl, fsDirs, err := prepareXL()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -108,7 +108,7 @@ func TestXLDeleteObjectBasic(t *testing.T) {
|
||||
|
||||
func TestXLDeleteObjectDiskNotFound(t *testing.T) {
|
||||
// Create an instance of xl backend.
|
||||
obj, fsDirs, err := getXLObjectLayer()
|
||||
obj, fsDirs, err := prepareXL()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -155,7 +155,7 @@ func TestXLDeleteObjectDiskNotFound(t *testing.T) {
|
||||
|
||||
func TestGetObjectNoQuorum(t *testing.T) {
|
||||
// Create an instance of xl backend.
|
||||
obj, fsDirs, err := getXLObjectLayer()
|
||||
obj, fsDirs, err := prepareXL()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -206,7 +206,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
||||
|
||||
func TestPutObjectNoQuorum(t *testing.T) {
|
||||
// Create an instance of xl backend.
|
||||
obj, fsDirs, err := getXLObjectLayer()
|
||||
obj, fsDirs, err := prepareXL()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
10
cmd/xl-v1.go
10
cmd/xl-v1.go
@ -78,22 +78,12 @@ func repairDiskMetadata(storageDisks []StorageAPI) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize meta volume, if volume already exists ignores it.
|
||||
if err := initMetaVolume(storageDisks); err != nil {
|
||||
return fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
|
||||
}
|
||||
|
||||
// Handles different cases properly.
|
||||
switch reduceFormatErrs(sErrs, len(storageDisks)) {
|
||||
case errCorruptedFormat:
|
||||
if err := healFormatXLCorruptedDisks(storageDisks); err != nil {
|
||||
return fmt.Errorf("Unable to repair corrupted format, %s", err)
|
||||
}
|
||||
case errUnformattedDisk:
|
||||
// All drives online but fresh, initialize format.
|
||||
if err := initFormatXL(storageDisks); err != nil {
|
||||
return fmt.Errorf("Unable to initialize format, %s", err)
|
||||
}
|
||||
case errSomeDiskUnformatted:
|
||||
// All drives online but some report missing format.json.
|
||||
if err := healFormatXLFreshDisks(storageDisks); err != nil {
|
||||
|
@ -92,7 +92,7 @@ func TestCheckSufficientDisks(t *testing.T) {
|
||||
|
||||
// TestStorageInfo - tests storage info.
|
||||
func TestStorageInfo(t *testing.T) {
|
||||
objLayer, fsDirs, err := getXLObjectLayer()
|
||||
objLayer, fsDirs, err := prepareXL()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to initialize 'XL' object layer.")
|
||||
}
|
||||
@ -138,6 +138,10 @@ func TestNewXL(t *testing.T) {
|
||||
}
|
||||
|
||||
// Initializes all erasure disks
|
||||
err = formatDisks(erasureDisks, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to format disks for erasure, %s", err)
|
||||
}
|
||||
_, err = newXLObjects(erasureDisks, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to initialize erasure, %s", err)
|
||||
|
Loading…
x
Reference in New Issue
Block a user