xl/bootup: Upon bootup handle errors loading bucket and event configs. (#3287)

When there are lots of buckets the bootup time may slow down
a bit, and during this window servers quickly going up and
down leave the cluster in an in-transit state.

Certain calls which do not use quorum, like `readXLMetaStat`,
might return `errDiskNotFound` in place of the expected
`errFileNotFound`, which leads to an issue where the server
doesn't start.

To avoid this situation we need to treat such errors as safe
to ignore; for the most part these are network-related errors.

Fixes #3275
This commit is contained in:
Harshavardhana 2016-11-19 17:37:57 -08:00 committed by GitHub
parent 81eb7c0301
commit 1c47365445
6 changed files with 96 additions and 44 deletions

View File

@ -75,9 +75,8 @@ func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolic
// List buckets to proceed loading all notification configuration. // List buckets to proceed loading all notification configuration.
buckets, err := objAPI.ListBuckets() buckets, err := objAPI.ListBuckets()
errorIf(err, "Unable to list buckets.") errorIf(err, "Unable to list buckets.")
err = errorCause(err)
if err != nil { if err != nil {
return nil, err return nil, errorCause(err)
} }
policies = make(map[string]*bucketPolicy) policies = make(map[string]*bucketPolicy)
@ -86,11 +85,15 @@ func loadAllBucketPolicies(objAPI ObjectLayer) (policies map[string]*bucketPolic
for _, bucket := range buckets { for _, bucket := range buckets {
policy, pErr := readBucketPolicy(bucket.Name, objAPI) policy, pErr := readBucketPolicy(bucket.Name, objAPI)
if pErr != nil { if pErr != nil {
switch pErr.(type) { if !isErrIgnored(pErr, []error{
case BucketPolicyNotFound: // net.Dial fails for rpc client or any
continue // other unexpected errors during net.Dial.
} errDiskNotFound,
}) {
if !isErrBucketPolicyNotFound(pErr) {
pErrs = append(pErrs, pErr) pErrs = append(pErrs, pErr)
}
}
// Continue to load other bucket policies if possible. // Continue to load other bucket policies if possible.
continue continue
} }
@ -144,23 +147,21 @@ func getOldBucketsConfigPath() (string, error) {
func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader io.Reader, err error) { func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader io.Reader, err error) {
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON) policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, policyPath) objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, policyPath)
err = errorCause(err)
if err != nil { if err != nil {
if _, ok := err.(ObjectNotFound); ok { if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
return nil, BucketPolicyNotFound{Bucket: bucket} return nil, BucketPolicyNotFound{Bucket: bucket}
} }
errorIf(err, "Unable to load policy for the bucket %s.", bucket) errorIf(err, "Unable to load policy for the bucket %s.", bucket)
return nil, err return nil, errorCause(err)
} }
var buffer bytes.Buffer var buffer bytes.Buffer
err = objAPI.GetObject(minioMetaBucket, policyPath, 0, objInfo.Size, &buffer) err = objAPI.GetObject(minioMetaBucket, policyPath, 0, objInfo.Size, &buffer)
err = errorCause(err)
if err != nil { if err != nil {
if _, ok := err.(ObjectNotFound); ok { if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
return nil, BucketPolicyNotFound{Bucket: bucket} return nil, BucketPolicyNotFound{Bucket: bucket}
} }
errorIf(err, "Unable to load policy for the bucket %s.", bucket) errorIf(err, "Unable to load policy for the bucket %s.", bucket)
return nil, err return nil, errorCause(err)
} }
return &buffer, nil return &buffer, nil
@ -200,8 +201,7 @@ func removeBucketPolicy(bucket string, objAPI ObjectLayer) error {
return nil return nil
} }
// writeBucketPolicy - save a bucket policy that is assumed to be // writeBucketPolicy - save a bucket policy that is assumed to be validated.
// validated.
func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) error { func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) error {
buf, err := json.Marshal(bpy) buf, err := json.Marshal(bpy)
if err != nil { if err != nil {

View File

@ -308,7 +308,7 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
// 'notification.xml' not found return // 'notification.xml' not found return
// 'errNoSuchNotifications'. This is default when no // 'errNoSuchNotifications'. This is default when no
// bucket notifications are found on the bucket. // bucket notifications are found on the bucket.
if isErrObjectNotFound(err) { if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
return nil, errNoSuchNotifications return nil, errNoSuchNotifications
} }
errorIf(err, "Unable to load bucket-notification for bucket %s", bucket) errorIf(err, "Unable to load bucket-notification for bucket %s", bucket)
@ -321,7 +321,7 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
// 'notification.xml' not found return // 'notification.xml' not found return
// 'errNoSuchNotifications'. This is default when no // 'errNoSuchNotifications'. This is default when no
// bucket notifications are found on the bucket. // bucket notifications are found on the bucket.
if isErrObjectNotFound(err) { if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
return nil, errNoSuchNotifications return nil, errNoSuchNotifications
} }
errorIf(err, "Unable to load bucket-notification for bucket %s", bucket) errorIf(err, "Unable to load bucket-notification for bucket %s", bucket)
@ -430,11 +430,34 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer
} }
// Remove listener configuration from storage layer. Used when a bucket is deleted. // Remove listener configuration from storage layer. Used when a bucket is deleted.
func removeListenerConfig(bucket string, obj ObjectLayer) error { func removeListenerConfig(bucket string, objAPI ObjectLayer) error {
// make the path // make the path
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
// remove it // remove it
return obj.DeleteObject(minioMetaBucket, lcPath) return objAPI.DeleteObject(minioMetaBucket, lcPath)
}
// Loads both notification and listener config.
func loadNotificationAndListenerConfig(bucketName string, objAPI ObjectLayer) (nCfg *notificationConfig, lCfg []listenerConfig, err error) {
nConfigErrs := []error{
// When no previous notification configs were found.
errNoSuchNotifications,
// net.Dial fails for rpc client or any
// other unexpected errors during net.Dial.
errDiskNotFound,
}
// Loads notification config if any.
nCfg, err = loadNotificationConfig(bucketName, objAPI)
if err != nil && !isErrIgnored(err, nConfigErrs) {
return nil, nil, err
}
// Loads listener config if any.
lCfg, err = loadListenerConfig(bucketName, objAPI)
if err != nil && !isErrIgnored(err, nConfigErrs) {
return nil, nil, err
}
return nCfg, lCfg, nil
} }
// loads all bucket notifications if present. // loads all bucket notifications if present.
@ -450,21 +473,11 @@ func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationCon
// Loads all bucket notifications. // Loads all bucket notifications.
for _, bucket := range buckets { for _, bucket := range buckets {
nCfg, nErr := loadNotificationConfig(bucket.Name, objAPI) // Load persistent notification and listener configurations
if nErr != nil { // a given bucket name.
if nErr != errNoSuchNotifications { nConfigs[bucket.Name], lConfigs[bucket.Name], err = loadNotificationAndListenerConfig(bucket.Name, objAPI)
return nil, nil, nErr if err != nil {
} return nil, nil, err
} else {
nConfigs[bucket.Name] = nCfg
}
lCfg, lErr := loadListenerConfig(bucket.Name, objAPI)
if lErr != nil {
if lErr != errNoSuchNotifications {
return nil, nil, lErr
}
} else {
lConfigs[bucket.Name] = lCfg
} }
} }

View File

@ -21,6 +21,7 @@ import (
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"errors" "errors"
"fmt"
"io" "io"
"net" "net"
"net/http" "net/http"
@ -80,8 +81,13 @@ func (rpcClient *RPCClient) dialRPCClient() (*rpc.Client, error) {
if rpcClient.secureConn { if rpcClient.secureConn {
hostname, _, splitErr := net.SplitHostPort(rpcClient.node) hostname, _, splitErr := net.SplitHostPort(rpcClient.node)
if splitErr != nil { if splitErr != nil {
return nil, errors.New("Unable to parse RPC address <" + rpcClient.node + "> : " + splitErr.Error()) err = errors.New("Unable to parse RPC address <" + rpcClient.node + "> : " + splitErr.Error())
return nil, &net.OpError{
Op: "dial-http",
Net: rpcClient.node + " " + rpcClient.rpcPath,
Addr: nil,
Err: err,
}
} }
// ServerName in tls.Config needs to be specified to support SNI certificates // ServerName in tls.Config needs to be specified to support SNI certificates
conn, err = tls.Dial("tcp", rpcClient.node, &tls.Config{ServerName: hostname, RootCAs: globalRootCAs}) conn, err = tls.Dial("tcp", rpcClient.node, &tls.Config{ServerName: hostname, RootCAs: globalRootCAs})
@ -95,7 +101,12 @@ func (rpcClient *RPCClient) dialRPCClient() (*rpc.Client, error) {
case x509.HostnameError: case x509.HostnameError:
errorIf(err, "Unable to establish RPC to %s", rpcClient.node) errorIf(err, "Unable to establish RPC to %s", rpcClient.node)
} }
return nil, err return nil, &net.OpError{
Op: "dial-http",
Net: rpcClient.node + " " + rpcClient.rpcPath,
Addr: nil,
Err: err,
}
} }
io.WriteString(conn, "CONNECT "+rpcClient.rpcPath+" HTTP/1.0\n\n") io.WriteString(conn, "CONNECT "+rpcClient.rpcPath+" HTTP/1.0\n\n")
@ -104,7 +115,12 @@ func (rpcClient *RPCClient) dialRPCClient() (*rpc.Client, error) {
if err == nil && resp.Status == "200 Connected to Go RPC" { if err == nil && resp.Status == "200 Connected to Go RPC" {
rpc := rpc.NewClient(conn) rpc := rpc.NewClient(conn)
if rpc == nil { if rpc == nil {
return nil, errors.New("No valid RPC Client created after dial") return nil, &net.OpError{
Op: "dial-http",
Net: rpcClient.node + " " + rpcClient.rpcPath,
Addr: nil,
Err: fmt.Errorf("Unable to initialize new rpcClient, %s", errUnexpected),
}
} }
rpcClient.mu.Lock() rpcClient.mu.Lock()
rpcClient.rpcPrivate = rpc rpcClient.rpcPrivate = rpc

View File

@ -325,6 +325,26 @@ func (e NotImplemented) Error() string {
return "Not Implemented" return "Not Implemented"
} }
// isErrIncompleteBody reports whether the underlying cause of err,
// as unwrapped by errorCause, has the concrete type IncompleteBody.
func isErrIncompleteBody(err error) bool {
	_, matched := errorCause(err).(IncompleteBody)
	return matched
}
// isErrBucketPolicyNotFound reports whether the underlying cause of err,
// as unwrapped by errorCause, has the concrete type BucketPolicyNotFound.
func isErrBucketPolicyNotFound(err error) bool {
	_, matched := errorCause(err).(BucketPolicyNotFound)
	return matched
}
// Check if error type is ObjectNameInvalid. // Check if error type is ObjectNameInvalid.
func isErrObjectNameInvalid(err error) bool { func isErrObjectNameInvalid(err error) bool {
err = errorCause(err) err = errorCause(err)

View File

@ -140,7 +140,7 @@ func (n networkStorage) String() string {
func (n networkStorage) DiskInfo() (info disk.Info, err error) { func (n networkStorage) DiskInfo() (info disk.Info, err error) {
args := GenericArgs{} args := GenericArgs{}
if err = n.rpcClient.Call("Storage.DiskInfoHandler", &args, &info); err != nil { if err = n.rpcClient.Call("Storage.DiskInfoHandler", &args, &info); err != nil {
return disk.Info{}, err return disk.Info{}, toStorageErr(err)
} }
return info, nil return info, nil
} }
@ -160,7 +160,7 @@ func (n networkStorage) ListVols() (vols []VolInfo, err error) {
ListVols := ListVolsReply{} ListVols := ListVolsReply{}
err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{}, &ListVols) err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{}, &ListVols)
if err != nil { if err != nil {
return nil, err return nil, toStorageErr(err)
} }
return ListVols.Vols, nil return ListVols.Vols, nil
} }

View File

@ -77,8 +77,8 @@ func (xl xlObjects) HealBucket(bucket string) error {
return healBucketMetadata(xl.storageDisks, bucket) return healBucketMetadata(xl.storageDisks, bucket)
} }
func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error {
// Heal bucket - create buckets on disks where it does not exist. // Heal bucket - create buckets on disks where it does not exist.
func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error {
bucketLock := nsMutex.NewNSLock(bucket, "") bucketLock := nsMutex.NewNSLock(bucket, "")
bucketLock.Lock() bucketLock.Lock()
defer bucketLock.Unlock() defer bucketLock.Unlock()
@ -139,7 +139,7 @@ func healBucketMetadata(storageDisks []StorageAPI, bucket string) error {
metaLock := nsMutex.NewNSLock(minioMetaBucket, metaPath) metaLock := nsMutex.NewNSLock(minioMetaBucket, metaPath)
metaLock.RLock() metaLock.RLock()
defer metaLock.RUnlock() defer metaLock.RUnlock()
// Heals the metaPath. // Heals the given file at metaPath.
if err := healObject(storageDisks, minioMetaBucket, metaPath); err != nil && !isErrObjectNotFound(err) { if err := healObject(storageDisks, minioMetaBucket, metaPath); err != nil && !isErrObjectNotFound(err) {
return err return err
} // Success. } // Success.
@ -299,10 +299,13 @@ func healObject(storageDisks []StorageAPI, bucket string, object string) error {
return err return err
} }
for index, sum := range checkSums { for index, sum := range checkSums {
if outDatedDisks[index] == nil { if outDatedDisks[index] != nil {
continue checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{
Name: partName,
Algorithm: sumInfo.Algorithm,
Hash: sum,
})
} }
checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
} }
} }