mirror of
https://github.com/minio/minio.git
synced 2025-04-25 12:34:03 -04:00
rpc: Our rpcClient should make an attempt to reconnect. (#3221)
rpcClient should attempt a reconnect if the call fails with 'rpc.ErrShutdown' this is needed since at times when the servers are taken down and brought back up. The hijacked connection from net.Dial is usually closed. So upon first attempt rpcClient might falsely indicate that disk to be down, to avoid this state make another dial attempt to really fail. Fixes #3206 Fixes #3205
This commit is contained in:
parent
cf2fb30ac7
commit
2f7fb78692
@ -166,11 +166,9 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface {
|
|||||||
// Call the underlying rpc.
|
// Call the underlying rpc.
|
||||||
err = authClient.rpc.Call(serviceMethod, args, reply)
|
err = authClient.rpc.Call(serviceMethod, args, reply)
|
||||||
|
|
||||||
// Invalidate token to mark for re-login on subsequent reconnect.
|
// Invalidate token, and mark it for re-login on subsequent reconnect.
|
||||||
if err != nil {
|
if err != nil && err == rpc.ErrShutdown {
|
||||||
if err.Error() == rpc.ErrShutdown.Error() {
|
authClient.isLoggedIn = false
|
||||||
authClient.isLoggedIn = false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -16,10 +16,7 @@
|
|||||||
|
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import "encoding/json"
|
||||||
"encoding/json"
|
|
||||||
"net/rpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
// BucketMetaState - Interface to update bucket metadata in-memory
|
// BucketMetaState - Interface to update bucket metadata in-memory
|
||||||
// state.
|
// state.
|
||||||
@ -107,46 +104,26 @@ type remoteBMS struct {
|
|||||||
// change to remote peer via RPC call.
|
// change to remote peer via RPC call.
|
||||||
func (rc *remoteBMS) UpdateBucketNotification(args *SetBNPArgs) error {
|
func (rc *remoteBMS) UpdateBucketNotification(args *SetBNPArgs) error {
|
||||||
reply := GenericReply{}
|
reply := GenericReply{}
|
||||||
err := rc.Call("S3.SetBucketNotificationPeer", args, &reply)
|
return rc.Call("S3.SetBucketNotificationPeer", args, &reply)
|
||||||
// Check for network error and retry once.
|
|
||||||
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
|
|
||||||
err = rc.Call("S3.SetBucketNotificationPeer", args, &reply)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// remoteBMS.UpdateBucketListener - sends bucket listener change to
|
// remoteBMS.UpdateBucketListener - sends bucket listener change to
|
||||||
// remote peer via RPC call.
|
// remote peer via RPC call.
|
||||||
func (rc *remoteBMS) UpdateBucketListener(args *SetBLPArgs) error {
|
func (rc *remoteBMS) UpdateBucketListener(args *SetBLPArgs) error {
|
||||||
reply := GenericReply{}
|
reply := GenericReply{}
|
||||||
err := rc.Call("S3.SetBucketListenerPeer", args, &reply)
|
return rc.Call("S3.SetBucketListenerPeer", args, &reply)
|
||||||
// Check for network error and retry once.
|
|
||||||
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
|
|
||||||
err = rc.Call("S3.SetBucketListenerPeer", args, &reply)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// remoteBMS.UpdateBucketPolicy - sends bucket policy change to remote
|
// remoteBMS.UpdateBucketPolicy - sends bucket policy change to remote
|
||||||
// peer via RPC call.
|
// peer via RPC call.
|
||||||
func (rc *remoteBMS) UpdateBucketPolicy(args *SetBPPArgs) error {
|
func (rc *remoteBMS) UpdateBucketPolicy(args *SetBPPArgs) error {
|
||||||
reply := GenericReply{}
|
reply := GenericReply{}
|
||||||
err := rc.Call("S3.SetBucketPolicyPeer", args, &reply)
|
return rc.Call("S3.SetBucketPolicyPeer", args, &reply)
|
||||||
// Check for network error and retry once.
|
|
||||||
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
|
|
||||||
err = rc.Call("S3.SetBucketPolicyPeer", args, &reply)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// remoteBMS.SendEvent - sends event for bucket listener to remote
|
// remoteBMS.SendEvent - sends event for bucket listener to remote
|
||||||
// peer via RPC call.
|
// peer via RPC call.
|
||||||
func (rc *remoteBMS) SendEvent(args *EventArgs) error {
|
func (rc *remoteBMS) SendEvent(args *EventArgs) error {
|
||||||
reply := GenericReply{}
|
reply := GenericReply{}
|
||||||
err := rc.Call("S3.Event", args, &reply)
|
return rc.Call("S3.Event", args, &reply)
|
||||||
// Check for network error and retry once.
|
|
||||||
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
|
|
||||||
err = rc.Call("S3.Event", args, &reply)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
@ -246,7 +246,7 @@ func genericFormatCheck(formatConfigs []*formatConfigV1, sErrs []error) (err err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Calculate read quorum.
|
// Calculate read quorum.
|
||||||
readQuorum := len(formatConfigs)/2 + 1
|
readQuorum := len(formatConfigs) / 2
|
||||||
|
|
||||||
// Validate the err count under tolerant limit.
|
// Validate the err count under tolerant limit.
|
||||||
if errCount > len(formatConfigs)-readQuorum {
|
if errCount > len(formatConfigs)-readQuorum {
|
||||||
|
@ -142,17 +142,26 @@ func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply i
|
|||||||
// rpc.Client for a subsequent reconnect.
|
// rpc.Client for a subsequent reconnect.
|
||||||
err := rpcLocalStack.Call(serviceMethod, args, reply)
|
err := rpcLocalStack.Call(serviceMethod, args, reply)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err.Error() == rpc.ErrShutdown.Error() {
|
// Any errors other than rpc.ErrShutdown just return quickly.
|
||||||
// Reset rpcClient.rpc to nil to trigger a reconnect in future
|
if err != rpc.ErrShutdown {
|
||||||
// and close the underlying connection.
|
return err
|
||||||
rpcClient.clearRPCClient()
|
} // else rpc.ErrShutdown returned by rpc.Call
|
||||||
|
|
||||||
// Close the underlying connection.
|
// Reset the underlying rpc connection before
|
||||||
rpcLocalStack.Close()
|
// moving to reconnect.
|
||||||
|
rpcClient.clearRPCClient()
|
||||||
|
|
||||||
// Set rpc error as rpc.ErrShutdown type.
|
// Close the underlying connection before reconnect.
|
||||||
err = rpc.ErrShutdown
|
rpcLocalStack.Close()
|
||||||
|
|
||||||
|
// Try once more to re-connect.
|
||||||
|
rpcLocalStack, err = rpcClient.dialRPCClient()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Attempt the rpc.Call once again, upon any error now just give up.
|
||||||
|
err = rpcLocalStack.Call(serviceMethod, args, reply)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/rpc"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
)
|
)
|
||||||
@ -71,15 +70,7 @@ func (lc listenerConn) Fire(entry *logrus.Entry) error {
|
|||||||
|
|
||||||
// Send Event RPC call and return error
|
// Send Event RPC call and return error
|
||||||
arg := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN}
|
arg := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN}
|
||||||
err := lc.BMSClient.SendEvent(&arg)
|
return lc.BMSClient.SendEvent(&arg)
|
||||||
|
|
||||||
// In case connection is shutdown, retry once.
|
|
||||||
if err != nil {
|
|
||||||
if err.Error() == rpc.ErrShutdown.Error() {
|
|
||||||
err = lc.BMSClient.SendEvent(&arg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lc listenerConn) Levels() []logrus.Level {
|
func (lc listenerConn) Levels() []logrus.Level {
|
||||||
|
@ -639,7 +639,7 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PrepareFile - run prior actions before creating a new file for optimization purposes
|
// PrepareFile - run prior actions before creating a new file for optimization purposes
|
||||||
// Currenty we use fallocate when available to avoid disk fragmentation as much as possible
|
// Currently we use fallocate when available to avoid disk fragmentation as much as possible
|
||||||
func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
|
func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
|
||||||
|
|
||||||
// It doesn't make sense to create a negative-sized file
|
// It doesn't make sense to create a negative-sized file
|
||||||
|
@ -28,7 +28,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The value choosen below is longest word choosen
|
// The value chosen below is longest word chosen
|
||||||
// from all the http verbs comprising of
|
// from all the http verbs comprising of
|
||||||
// "PRI", "OPTIONS", "GET", "HEAD", "POST",
|
// "PRI", "OPTIONS", "GET", "HEAD", "POST",
|
||||||
// "PUT", "DELETE", "TRACE", "CONNECT".
|
// "PUT", "DELETE", "TRACE", "CONNECT".
|
||||||
|
@ -107,7 +107,7 @@ func dial(addr string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests initalizing listeners.
|
// Tests initializing listeners.
|
||||||
func TestInitListeners(t *testing.T) {
|
func TestInitListeners(t *testing.T) {
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
serverAddr string
|
serverAddr string
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
# Minio Environmental varaibles
|
# Minio Environmental variables
|
||||||
|
|
||||||
#### MINIO_ENABLE_FSMETA
|
#### MINIO_ENABLE_FSMETA
|
||||||
When enabled, minio-FS saves the HTTP headers that start with `X-Amz-Meta-` and `X-Minio-Meta`. These header meta data can be retrieved on HEAD and GET requests on the object.
|
When enabled, minio-FS saves the HTTP headers that start with `X-Amz-Meta-` and `X-Minio-Meta`. These header meta data can be retrieved on HEAD and GET requests on the object.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user