mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
server: Fixes for various conditions
- Fix distributed branch to be able to run FS version. - Fix distributed branch to be able to run XL local disks. - Ignore initialization failures of notification and bucket policies, the codepath should load whatever is possible.
This commit is contained in:
parent
67b8080144
commit
ba2ba328da
@ -265,12 +265,6 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
|
||||
return
|
||||
}
|
||||
|
||||
objectAPI := api.ObjectAPI()
|
||||
if objectAPI == nil {
|
||||
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
var wg = &sync.WaitGroup{} // Allocate a new wait group.
|
||||
var dErrs = make([]error, len(deleteObjects.Objects))
|
||||
|
||||
@ -279,7 +273,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
|
||||
wg.Add(1)
|
||||
go func(i int, obj ObjectIdentifier) {
|
||||
defer wg.Done()
|
||||
dErr := api.ObjectAPI.DeleteObject(bucket, obj.ObjectName)
|
||||
dErr := objectAPI.DeleteObject(bucket, obj.ObjectName)
|
||||
if dErr != nil {
|
||||
dErrs[i] = dErr
|
||||
}
|
||||
|
@ -68,25 +68,34 @@ func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolic
|
||||
buckets, err := objAPI.ListBuckets()
|
||||
errorIf(err, "Unable to list buckets.")
|
||||
err = errorCause(err)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
policies = make(map[string]*bucketPolicy)
|
||||
var pErrs []error
|
||||
// Loads bucket policy.
|
||||
for _, bucket := range buckets {
|
||||
var policy *bucketPolicy
|
||||
policy, err = readBucketPolicy(bucket.Name, objAPI)
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
policy, pErr := readBucketPolicy(bucket.Name, objAPI)
|
||||
if pErr != nil {
|
||||
switch pErr.(type) {
|
||||
case BucketPolicyNotFound:
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
pErrs = append(pErrs, pErr)
|
||||
// Continue to load other bucket policies if possible.
|
||||
continue
|
||||
}
|
||||
policies[bucket.Name] = policy
|
||||
}
|
||||
|
||||
// Look for any errors occurred while reading bucket policies.
|
||||
for _, pErr := range pErrs {
|
||||
if pErr != nil {
|
||||
return policies, pErr
|
||||
}
|
||||
}
|
||||
|
||||
// Success.
|
||||
return policies, nil
|
||||
}
|
||||
@ -96,6 +105,7 @@ func initBucketPolicies(objAPI ObjectLayer) error {
|
||||
if objAPI == nil {
|
||||
return errInvalidArgument
|
||||
}
|
||||
|
||||
// Read all bucket policies.
|
||||
policies, err := loadAllBucketPolicies(objAPI)
|
||||
if err != nil {
|
||||
@ -130,22 +140,22 @@ func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader
|
||||
}
|
||||
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
|
||||
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, policyPath)
|
||||
errorIf(err, "Unable to get policy for the bucket %s.", bucket)
|
||||
err = errorCause(err)
|
||||
if err != nil {
|
||||
if _, ok := err.(ObjectNotFound); ok {
|
||||
return nil, BucketPolicyNotFound{Bucket: bucket}
|
||||
}
|
||||
errorIf(err, "Unable to load policy for the bucket %s.", bucket)
|
||||
return nil, err
|
||||
}
|
||||
var buffer bytes.Buffer
|
||||
err = objAPI.GetObject(minioMetaBucket, policyPath, 0, objInfo.Size, &buffer)
|
||||
errorIf(err, "Unable to get policy for the bucket %s.", bucket)
|
||||
err = errorCause(err)
|
||||
if err != nil {
|
||||
if _, ok := err.(ObjectNotFound); ok {
|
||||
return nil, BucketPolicyNotFound{Bucket: bucket}
|
||||
}
|
||||
errorIf(err, "Unable to load policy for the bucket %s.", bucket)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -227,7 +227,6 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
|
||||
// Construct the notification config path.
|
||||
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
||||
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath)
|
||||
errorIf(err, "Unable to get bucket-notification for butkcet %s", bucket)
|
||||
err = errorCause(err)
|
||||
if err != nil {
|
||||
// 'notification.xml' not found return 'errNoSuchNotifications'.
|
||||
@ -236,12 +235,12 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
|
||||
case ObjectNotFound:
|
||||
return nil, errNoSuchNotifications
|
||||
}
|
||||
errorIf(err, "Unable to load bucket-notification for bucket %s", bucket)
|
||||
// Returns error for other errors.
|
||||
return nil, err
|
||||
}
|
||||
var buffer bytes.Buffer
|
||||
err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer)
|
||||
errorIf(err, "Unable to get bucket-notification for butkcet %s", bucket)
|
||||
err = errorCause(err)
|
||||
if err != nil {
|
||||
// 'notification.xml' not found return 'errNoSuchNotifications'.
|
||||
@ -250,6 +249,7 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
|
||||
case ObjectNotFound:
|
||||
return nil, errNoSuchNotifications
|
||||
}
|
||||
errorIf(err, "Unable to load bucket-notification for bucket %s", bucket)
|
||||
// Returns error for other errors.
|
||||
return nil, err
|
||||
}
|
||||
@ -277,13 +277,12 @@ func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationCon
|
||||
|
||||
// Loads all bucket notifications.
|
||||
for _, bucket := range buckets {
|
||||
var nCfg *notificationConfig
|
||||
nCfg, err = loadNotificationConfig(bucket.Name, objAPI)
|
||||
if err != nil {
|
||||
if err == errNoSuchNotifications {
|
||||
nCfg, nErr := loadNotificationConfig(bucket.Name, objAPI)
|
||||
if nErr != nil {
|
||||
if nErr == errNoSuchNotifications {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
return nil, nErr
|
||||
}
|
||||
configs[bucket.Name] = nCfg
|
||||
}
|
||||
|
@ -189,7 +189,7 @@ func loadAllFormats(bootstrapDisks []StorageAPI) ([]*formatConfigV1, []error) {
|
||||
}
|
||||
}
|
||||
// Return all formats and nil
|
||||
return formatConfigs, nil
|
||||
return formatConfigs, sErrs
|
||||
}
|
||||
|
||||
// genericFormatCheck - validates and returns error.
|
||||
|
@ -319,7 +319,7 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (ObjectInfo, error) {
|
||||
}
|
||||
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile))
|
||||
// Ignore error if the metadata file is not found, other errors must be returned.
|
||||
if errorCause(err) != errFileNotFound {
|
||||
if err != nil && errorCause(err) != errFileNotFound {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
@ -530,7 +530,7 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
|
||||
return FileInfo{}, traceError(err)
|
||||
}
|
||||
fsMeta, mErr := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, entry, fsMetaJSONFile))
|
||||
if errorCause(mErr) != errFileNotFound {
|
||||
if mErr != nil && errorCause(mErr) != errFileNotFound {
|
||||
return FileInfo{}, traceError(mErr)
|
||||
}
|
||||
if len(fsMeta.Meta) == 0 {
|
||||
|
@ -104,14 +104,9 @@ const (
|
||||
func prepForInit(disks []string, sErrs []error, diskCount int) InitActions {
|
||||
// Count errors by error value.
|
||||
errMap := make(map[error]int)
|
||||
// If loadAllFormats returned successfully
|
||||
if sErrs == nil {
|
||||
errMap[nil] = diskCount
|
||||
} else {
|
||||
for _, err := range sErrs {
|
||||
errMap[err]++
|
||||
}
|
||||
}
|
||||
|
||||
quorum := diskCount/2 + 1
|
||||
disksOffline := errMap[errDiskNotFound]
|
||||
|
@ -17,7 +17,6 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
@ -33,16 +32,19 @@ func newObjectLayerFn() ObjectLayer {
|
||||
|
||||
// newObjectLayer - initialize any object layer depending on the number of disks.
|
||||
func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) {
|
||||
var objAPI ObjectLayer
|
||||
var err error
|
||||
if len(disks) == 1 {
|
||||
exportPath := disks[0]
|
||||
// Initialize FS object layer.
|
||||
return newFSObjects(exportPath)
|
||||
}
|
||||
objAPI, err = newFSObjects(disks[0])
|
||||
} else {
|
||||
// Initialize XL object layer.
|
||||
objAPI, err := newXLObjects(disks, ignoredDisks)
|
||||
if err == errXLWriteQuorum {
|
||||
return objAPI, errors.New("Disks are different with last minio server run.")
|
||||
objAPI, err = newXLObjects(disks, ignoredDisks)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Migrate bucket policy from configDir to .minio.sys/buckets/
|
||||
err = migrateBucketPolicyConfig(objAPI)
|
||||
if err != nil {
|
||||
@ -58,9 +60,11 @@ func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) {
|
||||
|
||||
// Register the callback that should be called when the process shuts down.
|
||||
globalShutdownCBs.AddObjectLayerCB(func() errCode {
|
||||
if objAPI != nil {
|
||||
if sErr := objAPI.Shutdown(); sErr != nil {
|
||||
return exitFailure
|
||||
}
|
||||
}
|
||||
return exitSuccess
|
||||
})
|
||||
|
||||
@ -68,14 +72,12 @@ func newObjectLayer(disks, ignoredDisks []string) (ObjectLayer, error) {
|
||||
err = initEventNotifier(objAPI)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to initialize event notification.")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize and load bucket policies.
|
||||
err = initBucketPolicies(objAPI)
|
||||
if err != nil {
|
||||
errorIf(err, "Unable to load all bucket policies.")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Success.
|
||||
|
Loading…
Reference in New Issue
Block a user