mirror of
https://github.com/minio/minio.git
synced 2025-04-06 04:40:38 -04:00
parent
d8a11c8f4b
commit
db6b6e9518
@ -296,6 +296,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
targetServer := GetLocalPeer(globalEndpoints)
|
||||||
accountID := fmt.Sprintf("%d", UTCNow().UnixNano())
|
accountID := fmt.Sprintf("%d", UTCNow().UnixNano())
|
||||||
accountARN := fmt.Sprintf(
|
accountARN := fmt.Sprintf(
|
||||||
"%s:%s:%s:%s-%s",
|
"%s:%s:%s:%s-%s",
|
||||||
@ -303,8 +304,9 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
|
|||||||
serverConfig.GetRegion(),
|
serverConfig.GetRegion(),
|
||||||
accountID,
|
accountID,
|
||||||
snsTypeMinio,
|
snsTypeMinio,
|
||||||
globalMinioAddr,
|
targetServer,
|
||||||
)
|
)
|
||||||
|
|
||||||
var filterRules []filterRule
|
var filterRules []filterRule
|
||||||
|
|
||||||
for _, prefix := range prefixes {
|
for _, prefix := range prefixes {
|
||||||
@ -357,7 +359,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
|
|||||||
// nEventCh
|
// nEventCh
|
||||||
lc := listenerConfig{
|
lc := listenerConfig{
|
||||||
TopicConfig: *topicCfg,
|
TopicConfig: *topicCfg,
|
||||||
TargetServer: globalMinioAddr,
|
TargetServer: targetServer,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = AddBucketListenerConfig(bucket, &lc, objAPI)
|
err = AddBucketListenerConfig(bucket, &lc, objAPI)
|
||||||
|
@ -419,6 +419,29 @@ func CreateEndpoints(serverAddr string, args ...string) (string, EndpointList, S
|
|||||||
return serverAddr, endpoints, setupType, nil
|
return serverAddr, endpoints, setupType, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLocalPeer - returns local peer value, returns globalMinioAddr
|
||||||
|
// for FS and Erasure mode. In case of distributed server return
|
||||||
|
// the first element from the set of peers which indicate that
|
||||||
|
// they are local. There is always one entry that is local
|
||||||
|
// even with repeated server endpoints.
|
||||||
|
func GetLocalPeer(endpoints EndpointList) (localPeer string) {
|
||||||
|
peerSet := set.NewStringSet()
|
||||||
|
for _, endpoint := range endpoints {
|
||||||
|
if endpoint.Type() != URLEndpointType {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if endpoint.IsLocal && endpoint.Host != "" {
|
||||||
|
peerSet.Add(endpoint.Host)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if peerSet.IsEmpty() {
|
||||||
|
// If local peer is empty can happen in FS or Erasure coded mode.
|
||||||
|
// then set the value to globalMinioAddr instead.
|
||||||
|
return globalMinioAddr
|
||||||
|
}
|
||||||
|
return peerSet.ToSlice()[0]
|
||||||
|
}
|
||||||
|
|
||||||
// GetRemotePeers - get hosts information other than this minio service.
|
// GetRemotePeers - get hosts information other than this minio service.
|
||||||
func GetRemotePeers(endpoints EndpointList) []string {
|
func GetRemotePeers(endpoints EndpointList) []string {
|
||||||
peerSet := set.NewStringSet()
|
peerSet := set.NewStringSet()
|
||||||
|
@ -330,6 +330,38 @@ func TestCreateEndpoints(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests get local peer functionality, local peer is supposed to only return one entry per minio service.
|
||||||
|
// So it means that if you have say localhost:9000 and localhost:9001 as endpointArgs then localhost:9001
|
||||||
|
// is considered a remote service from localhost:9000 perspective.
|
||||||
|
func TestGetLocalPeer(t *testing.T) {
|
||||||
|
tempGlobalMinioAddr := globalMinioAddr
|
||||||
|
defer func() {
|
||||||
|
globalMinioAddr = tempGlobalMinioAddr
|
||||||
|
}()
|
||||||
|
globalMinioAddr = ":9000"
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
endpointArgs []string
|
||||||
|
expectedResult string
|
||||||
|
}{
|
||||||
|
{[]string{"/d1", "/d2", "d3", "d4"}, ":9000"},
|
||||||
|
{[]string{"http://localhost:9000/d1", "http://localhost:9000/d2", "http://example.org:9000/d3", "http://example.com:9000/d4"},
|
||||||
|
"localhost:9000"},
|
||||||
|
{[]string{"http://localhost:9000/d1", "http://example.org:9000/d2", "http://example.com:9000/d3", "http://example.net:9000/d4"},
|
||||||
|
"localhost:9000"},
|
||||||
|
{[]string{"http://localhost:9000/d1", "http://localhost:9001/d2", "http://localhost:9002/d3", "http://localhost:9003/d4"},
|
||||||
|
"localhost:9000"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, testCase := range testCases {
|
||||||
|
endpoints, _ := NewEndpointList(testCase.endpointArgs...)
|
||||||
|
remotePeer := GetLocalPeer(endpoints)
|
||||||
|
if remotePeer != testCase.expectedResult {
|
||||||
|
t.Fatalf("Test %d: expected: %v, got: %v", i+1, testCase.expectedResult, remotePeer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetRemotePeers(t *testing.T) {
|
func TestGetRemotePeers(t *testing.T) {
|
||||||
tempGlobalMinioPort := globalMinioPort
|
tempGlobalMinioPort := globalMinioPort
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -41,12 +41,13 @@ type s3Peers []s3Peer
|
|||||||
// slice. The urls slice is assumed to be non-empty and free of nil
|
// slice. The urls slice is assumed to be non-empty and free of nil
|
||||||
// values.
|
// values.
|
||||||
func makeS3Peers(endpoints EndpointList) (s3PeerList s3Peers) {
|
func makeS3Peers(endpoints EndpointList) (s3PeerList s3Peers) {
|
||||||
|
localAddr := GetLocalPeer(endpoints)
|
||||||
s3PeerList = append(s3PeerList, s3Peer{
|
s3PeerList = append(s3PeerList, s3Peer{
|
||||||
globalMinioAddr,
|
localAddr,
|
||||||
&localBucketMetaState{ObjectAPI: newObjectLayerFn},
|
&localBucketMetaState{ObjectAPI: newObjectLayerFn},
|
||||||
})
|
})
|
||||||
|
|
||||||
hostSet := set.CreateStringSet(globalMinioAddr)
|
hostSet := set.CreateStringSet(localAddr)
|
||||||
cred := serverConfig.GetCredential()
|
cred := serverConfig.GetCredential()
|
||||||
serviceEndpoint := path.Join(minioReservedBucketPath, s3Path)
|
serviceEndpoint := path.Join(minioReservedBucketPath, s3Path)
|
||||||
for _, host := range GetRemotePeers(endpoints) {
|
for _, host := range GetRemotePeers(endpoints) {
|
||||||
@ -56,17 +57,17 @@ func makeS3Peers(endpoints EndpointList) (s3PeerList s3Peers) {
|
|||||||
hostSet.Add(host)
|
hostSet.Add(host)
|
||||||
s3PeerList = append(s3PeerList, s3Peer{
|
s3PeerList = append(s3PeerList, s3Peer{
|
||||||
addr: host,
|
addr: host,
|
||||||
bmsClient: &remoteBucketMetaState{newAuthRPCClient(authConfig{
|
bmsClient: &remoteBucketMetaState{
|
||||||
accessKey: cred.AccessKey,
|
newAuthRPCClient(authConfig{
|
||||||
secretKey: cred.SecretKey,
|
accessKey: cred.AccessKey,
|
||||||
serverAddr: host,
|
secretKey: cred.SecretKey,
|
||||||
serviceEndpoint: serviceEndpoint,
|
serverAddr: host,
|
||||||
secureConn: globalIsSSL,
|
serviceEndpoint: serviceEndpoint,
|
||||||
serviceName: "S3",
|
secureConn: globalIsSSL,
|
||||||
})},
|
serviceName: "S3",
|
||||||
|
})},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return s3PeerList
|
return s3PeerList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,7 +39,6 @@ func TestMakeS3Peers(t *testing.T) {
|
|||||||
peers []string
|
peers []string
|
||||||
}{
|
}{
|
||||||
{"127.0.0.1:9000", mustGetNewEndpointList("/mnt/disk1"), []string{"127.0.0.1:9000"}},
|
{"127.0.0.1:9000", mustGetNewEndpointList("/mnt/disk1"), []string{"127.0.0.1:9000"}},
|
||||||
{"127.0.0.1:9000", mustGetNewEndpointList("http://localhost:9001/d1"), []string{"127.0.0.1:9000", "localhost:9001"}},
|
|
||||||
{"example.org:9000", mustGetNewEndpointList("http://example.org:9000/d1", "http://example.com:9000/d1", "http://example.net:9000/d1", "http://example.edu:9000/d1"), []string{"example.org:9000", "example.com:9000", "example.edu:9000", "example.net:9000"}},
|
{"example.org:9000", mustGetNewEndpointList("http://example.org:9000/d1", "http://example.com:9000/d1", "http://example.net:9000/d1", "http://example.edu:9000/d1"), []string{"example.org:9000", "example.com:9000", "example.edu:9000", "example.net:9000"}},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user