mirror of
https://github.com/minio/minio.git
synced 2025-05-21 17:43:48 -04:00
api: Allow reconnection of policy/notification rpc clients. (#3368)
Since we moved out reconnection logic from net-rpc-client.go we should do it from the top-layer properly and bring back the code to reconnect properly in-case the connection is lost.
This commit is contained in:
parent
834007728c
commit
d056f19d07
@ -16,7 +16,10 @@
|
|||||||
|
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import "encoding/json"
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/rpc"
|
||||||
|
)
|
||||||
|
|
||||||
// BucketMetaState - Interface to update bucket metadata in-memory
|
// BucketMetaState - Interface to update bucket metadata in-memory
|
||||||
// state.
|
// state.
|
||||||
@ -104,26 +107,62 @@ type remoteBucketMetaState struct {
|
|||||||
// change to remote peer via RPC call.
|
// change to remote peer via RPC call.
|
||||||
func (rc *remoteBucketMetaState) UpdateBucketNotification(args *SetBucketNotificationPeerArgs) error {
|
func (rc *remoteBucketMetaState) UpdateBucketNotification(args *SetBucketNotificationPeerArgs) error {
|
||||||
reply := GenericReply{}
|
reply := GenericReply{}
|
||||||
return rc.Call("S3.SetBucketNotificationPeer", args, &reply)
|
err := rc.Call("S3.SetBucketNotificationPeer", args, &reply)
|
||||||
|
// Check for network error and retry once.
|
||||||
|
if err != nil && err == rpc.ErrShutdown {
|
||||||
|
// Close the underlying connection to attempt once more.
|
||||||
|
rc.Close()
|
||||||
|
|
||||||
|
// Attempt again and proceed.
|
||||||
|
err = rc.Call("S3.SetBucketNotificationPeer", args, &reply)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// remoteBucketMetaState.UpdateBucketListener - sends bucket listener change to
|
// remoteBucketMetaState.UpdateBucketListener - sends bucket listener change to
|
||||||
// remote peer via RPC call.
|
// remote peer via RPC call.
|
||||||
func (rc *remoteBucketMetaState) UpdateBucketListener(args *SetBucketListenerPeerArgs) error {
|
func (rc *remoteBucketMetaState) UpdateBucketListener(args *SetBucketListenerPeerArgs) error {
|
||||||
reply := GenericReply{}
|
reply := GenericReply{}
|
||||||
return rc.Call("S3.SetBucketListenerPeer", args, &reply)
|
err := rc.Call("S3.SetBucketListenerPeer", args, &reply)
|
||||||
|
// Check for network error and retry once.
|
||||||
|
if err != nil && err == rpc.ErrShutdown {
|
||||||
|
// Close the underlying connection to attempt once more.
|
||||||
|
rc.Close()
|
||||||
|
|
||||||
|
// Attempt again and proceed.
|
||||||
|
err = rc.Call("S3.SetBucketListenerPeer", args, &reply)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// remoteBucketMetaState.UpdateBucketPolicy - sends bucket policy change to remote
|
// remoteBucketMetaState.UpdateBucketPolicy - sends bucket policy change to remote
|
||||||
// peer via RPC call.
|
// peer via RPC call.
|
||||||
func (rc *remoteBucketMetaState) UpdateBucketPolicy(args *SetBucketPolicyPeerArgs) error {
|
func (rc *remoteBucketMetaState) UpdateBucketPolicy(args *SetBucketPolicyPeerArgs) error {
|
||||||
reply := GenericReply{}
|
reply := GenericReply{}
|
||||||
return rc.Call("S3.SetBucketPolicyPeer", args, &reply)
|
err := rc.Call("S3.SetBucketPolicyPeer", args, &reply)
|
||||||
|
// Check for network error and retry once.
|
||||||
|
if err != nil && err == rpc.ErrShutdown {
|
||||||
|
// Close the underlying connection to attempt once more.
|
||||||
|
rc.Close()
|
||||||
|
|
||||||
|
// Attempt again and proceed.
|
||||||
|
err = rc.Call("S3.SetBucketPolicyPeer", args, &reply)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// remoteBucketMetaState.SendEvent - sends event for bucket listener to remote
|
// remoteBucketMetaState.SendEvent - sends event for bucket listener to remote
|
||||||
// peer via RPC call.
|
// peer via RPC call.
|
||||||
func (rc *remoteBucketMetaState) SendEvent(args *EventArgs) error {
|
func (rc *remoteBucketMetaState) SendEvent(args *EventArgs) error {
|
||||||
reply := GenericReply{}
|
reply := GenericReply{}
|
||||||
return rc.Call("S3.Event", args, &reply)
|
err := rc.Call("S3.Event", args, &reply)
|
||||||
|
// Check for network error and retry once.
|
||||||
|
if err != nil && err == rpc.ErrShutdown {
|
||||||
|
// Close the underlying connection to attempt once more.
|
||||||
|
rc.Close()
|
||||||
|
|
||||||
|
// Attempt again and proceed.
|
||||||
|
err = rc.Call("S3.Event", args, &reply)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
@ -71,8 +71,8 @@ func makeS3Peers(eps []*url.URL) s3Peers {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ret = append(ret, s3Peer{
|
ret = append(ret, s3Peer{
|
||||||
ep.Host,
|
addr: ep.Host,
|
||||||
&remoteBucketMetaState{newAuthClient(&cfg)},
|
bmsClient: &remoteBucketMetaState{newAuthClient(&cfg)},
|
||||||
})
|
})
|
||||||
seenAddr[ep.Host] = true
|
seenAddr[ep.Host] = true
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user