Add extensive endpoints validation (#4019)

This commit is contained in:
Bala FA
2017-04-12 04:14:27 +05:30
committed by Harshavardhana
parent 1b1b9e4801
commit de204a0a52
48 changed files with 1432 additions and 2269 deletions

View File

@@ -91,11 +91,7 @@ func prepareXL() (ObjectLayer, []string, error) {
if err != nil {
return nil, nil, err
}
endpoints, err := parseStorageEndpoints(fsDirs)
if err != nil {
return nil, nil, err
}
obj, _, err := initObjectLayer(endpoints)
obj, _, err := initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
removeRoots(fsDirs)
return nil, nil, err
@@ -180,12 +176,12 @@ func isSameType(obj1, obj2 interface{}) bool {
// defer s.Stop()
type TestServer struct {
Root string
Disks []*url.URL
Disks EndpointList
AccessKey string
SecretKey string
Server *httptest.Server
Obj ObjectLayer
SrvCmdCfg serverCmdConfig
endpoints EndpointList
}
// UnstartedTestServer - Configures a temp FS/XL backend,
@@ -210,50 +206,31 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer {
credentials := serverConfig.GetCredential()
testServer.Obj = objLayer
testServer.Disks, err = parseStorageEndpoints(disks)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
testServer.Disks = mustGetNewEndpointList(disks...)
testServer.Root = root
testServer.AccessKey = credentials.AccessKey
testServer.SecretKey = credentials.SecretKey
srvCmdCfg := serverCmdConfig{
endpoints: testServer.Disks,
}
httpHandler, err := configureServerHandler(
srvCmdCfg,
)
httpHandler, err := configureServerHandler(testServer.Disks)
if err != nil {
t.Fatalf("Failed to configure one of the RPC services <ERROR> %s", err)
}
// Run TestServer.
testServer.Server = httptest.NewUnstartedServer(httpHandler)
// obtain server address.
srvCmdCfg.serverAddr = testServer.Server.Listener.Addr().String()
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
globalObjLayerMutex.Unlock()
// initialize peer rpc
host, port, err := net.SplitHostPort(srvCmdCfg.serverAddr)
if err != nil {
t.Fatal("Early setup error:", err)
}
host, port := mustSplitHostPort(testServer.Server.Listener.Addr().String())
globalMinioHost = host
globalMinioPort = port
globalMinioAddr = getLocalAddress(srvCmdCfg)
endpoints, err := parseStorageEndpoints(disks)
if err != nil {
t.Fatal("Early setup error:", err)
}
initGlobalS3Peers(endpoints)
globalMinioAddr = getEndpointsLocalAddr(testServer.Disks)
initGlobalS3Peers(testServer.Disks)
return testServer
}
// testServerCertPEM and testServerKeyPEM are generated by
@@ -339,10 +316,10 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer {
// Initializes storage RPC endpoints.
// The object Layer will be a temp back used for testing purpose.
func initTestStorageRPCEndPoint(srvCmdConfig serverCmdConfig) http.Handler {
func initTestStorageRPCEndPoint(endpoints EndpointList) http.Handler {
// Initialize router.
muxRouter := router.NewRouter()
registerStorageRPCRouters(muxRouter, srvCmdConfig)
registerStorageRPCRouters(muxRouter, endpoints)
return muxRouter
}
@@ -354,10 +331,6 @@ func StartTestStorageRPCServer(t TestErrHandler, instanceType string, diskN int)
if err != nil {
t.Fatal("Failed to create disks for the backend")
}
endpoints, err := parseStorageEndpoints(disks)
if err != nil {
t.Fatalf("%s", err)
}
root, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
@@ -369,15 +342,14 @@ func StartTestStorageRPCServer(t TestErrHandler, instanceType string, diskN int)
// Get credential.
credentials := serverConfig.GetCredential()
endpoints := mustGetNewEndpointList(disks...)
testRPCServer.Root = root
testRPCServer.Disks = endpoints
testRPCServer.AccessKey = credentials.AccessKey
testRPCServer.SecretKey = credentials.SecretKey
// Run TestServer.
testRPCServer.Server = httptest.NewServer(initTestStorageRPCEndPoint(serverCmdConfig{
endpoints: endpoints,
}))
testRPCServer.Server = httptest.NewServer(initTestStorageRPCEndPoint(endpoints))
return testRPCServer
}
@@ -389,10 +361,6 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer {
if err != nil {
t.Fatal("Failed to create disks for the backend")
}
endpoints, err := parseStorageEndpoints(disks)
if err != nil {
t.Fatalf("%s", err)
}
root, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
@@ -404,6 +372,7 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer {
// Get credential.
credentials := serverConfig.GetCredential()
endpoints := mustGetNewEndpointList(disks...)
testRPCServer.Root = root
testRPCServer.Disks = endpoints
testRPCServer.AccessKey = credentials.AccessKey
@@ -420,13 +389,9 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer {
testRPCServer.Obj = objLayer
globalObjLayerMutex.Unlock()
srvCfg := serverCmdConfig{
endpoints: endpoints,
}
mux := router.NewRouter()
// need storage layer for bucket config storage.
registerStorageRPCRouters(mux, srvCfg)
registerStorageRPCRouters(mux, endpoints)
// need API layer to send requests, etc.
registerAPIRouter(mux)
// module being tested is Peer RPCs router.
@@ -436,7 +401,7 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer {
testRPCServer.Server = httptest.NewServer(mux)
// initialize remainder of serverCmdConfig
testRPCServer.SrvCmdCfg = srvCfg
testRPCServer.endpoints = endpoints
return testRPCServer
}
@@ -481,7 +446,7 @@ func resetGlobalEventnotify() {
}
func resetGlobalEndpoints() {
globalEndpoints = []*url.URL{}
globalEndpoints = EndpointList{}
}
func resetGlobalIsXL() {
@@ -1659,7 +1624,7 @@ func getRandomDisks(N int) ([]string, error) {
}
// initObjectLayer - Instantiates object layer and returns it.
func initObjectLayer(endpoints []*url.URL) (ObjectLayer, []StorageAPI, error) {
func initObjectLayer(endpoints EndpointList) (ObjectLayer, []StorageAPI, error) {
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
return nil, nil, err
@@ -1738,12 +1703,8 @@ func prepareXLStorageDisks(t *testing.T) ([]StorageAPI, []string) {
if err != nil {
t.Fatal("Unexpected error: ", err)
}
endpoints, err := parseStorageEndpoints(fsDirs)
if err != nil {
t.Fatal("Unexpected error: ", err)
}
_, storageDisks, err := initObjectLayer(endpoints)
_, storageDisks, err := initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
removeRoots(fsDirs)
t.Fatal("Unable to initialize storage disks", err)
@@ -2077,11 +2038,7 @@ func ExecObjectLayerStaleFilesTest(t *testing.T, objTest objTestStaleFilesType)
if err != nil {
t.Fatalf("Initialization of disks for XL setup: %s", err)
}
endpoints, err := parseStorageEndpoints(erasureDisks)
if err != nil {
t.Fatalf("Initialization of disks for XL setup: %s", err)
}
objLayer, _, err := initObjectLayer(endpoints)
objLayer, _, err := initObjectLayer(mustGetNewEndpointList(erasureDisks...))
if err != nil {
t.Fatalf("Initialization of object layer failed for XL setup: %s", err)
}
@@ -2380,3 +2337,27 @@ func generateTLSCertKey(host string) ([]byte, []byte, error) {
return certOut.Bytes(), keyOut.Bytes(), nil
}
func mustGetNewEndpointList(args ...string) (endpoints EndpointList) {
if len(args) == 1 {
endpoint, err := NewEndpoint(args[0])
fatalIf(err, "unable to create new endpoint")
endpoints = append(endpoints, endpoint)
} else {
var err error
endpoints, err = NewEndpointList(args...)
fatalIf(err, "unable to create new endpoint list")
}
return endpoints
}
func getEndpointsLocalAddr(endpoints EndpointList) string {
for _, endpoint := range endpoints {
if endpoint.IsLocal && endpoint.Type() == URLEndpointType {
return endpoint.Host
}
}
return globalMinioHost + ":" + globalMinioPort
}