mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
rpc: Add RPC client tests. (#2858)
This commit is contained in:
parent
0fc96fa25c
commit
64f37bbf5b
@ -35,7 +35,7 @@ type TestRPCControllerSuite struct {
|
|||||||
// Setting up the test suite.
|
// Setting up the test suite.
|
||||||
// Starting the Test server with temporary FS backend.
|
// Starting the Test server with temporary FS backend.
|
||||||
func (s *TestRPCControllerSuite) SetUpSuite(c *testing.T) {
|
func (s *TestRPCControllerSuite) SetUpSuite(c *testing.T) {
|
||||||
s.testServer = StartTestRPCServer(c, s.serverType)
|
s.testServer = StartTestControlRPCServer(c, s.serverType)
|
||||||
s.testAuthConf = &authConfig{
|
s.testAuthConf = &authConfig{
|
||||||
address: s.testServer.Server.Listener.Addr().String(),
|
address: s.testServer.Server.Listener.Addr().String(),
|
||||||
accessKey: s.testServer.AccessKey,
|
accessKey: s.testServer.AccessKey,
|
||||||
|
@ -85,8 +85,7 @@ func (rpcClient *RPCClient) dialRPCClient() (*rpc.Client, error) {
|
|||||||
}
|
}
|
||||||
io.WriteString(conn, "CONNECT "+rpcClient.rpcPath+" HTTP/1.0\n\n")
|
io.WriteString(conn, "CONNECT "+rpcClient.rpcPath+" HTTP/1.0\n\n")
|
||||||
|
|
||||||
// Require successful HTTP response
|
// Require successful HTTP response before switching to RPC protocol.
|
||||||
// before switching to RPC protocol.
|
|
||||||
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
|
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
|
||||||
if err == nil && resp.Status == "200 Connected to Go RPC" {
|
if err == nil && resp.Status == "200 Connected to Go RPC" {
|
||||||
rpc := rpc.NewClient(conn)
|
rpc := rpc.NewClient(conn)
|
||||||
|
@ -84,11 +84,12 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
|
|||||||
// Initialize router.
|
// Initialize router.
|
||||||
mux := router.NewRouter()
|
mux := router.NewRouter()
|
||||||
|
|
||||||
// Register storage rpc router.
|
|
||||||
registerStorageRPCRouters(mux, srvCmdConfig)
|
|
||||||
|
|
||||||
// Initialize distributed NS lock.
|
// Initialize distributed NS lock.
|
||||||
if isDistributedSetup(srvCmdConfig.disks) {
|
if isDistributedSetup(srvCmdConfig.disks) {
|
||||||
|
// Register storage rpc router only if its a distributed setup.
|
||||||
|
registerStorageRPCRouters(mux, srvCmdConfig)
|
||||||
|
|
||||||
|
// Register distributed namespace lock.
|
||||||
registerDistNSLockRouter(mux, srvCmdConfig)
|
registerDistNSLockRouter(mux, srvCmdConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,6 +243,7 @@ func checkNamingDisks(disks []string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validates remote disks are successfully accessible, ignores networks errors.
|
||||||
func validateRemoteDisks(disks []StorageAPI) error {
|
func validateRemoteDisks(disks []StorageAPI) error {
|
||||||
for _, disk := range disks {
|
for _, disk := range disks {
|
||||||
_, err := disk.DiskInfo()
|
_, err := disk.DiskInfo()
|
||||||
|
@ -111,6 +111,7 @@ func newRPCClient(networkPath string) (StorageAPI, error) {
|
|||||||
path: rpcPath,
|
path: rpcPath,
|
||||||
loginMethod: "Storage.LoginHandler",
|
loginMethod: "Storage.LoginHandler",
|
||||||
})
|
})
|
||||||
|
|
||||||
// Initialize network storage.
|
// Initialize network storage.
|
||||||
ndisk := &networkStorage{
|
ndisk := &networkStorage{
|
||||||
netAddr: netAddr,
|
netAddr: netAddr,
|
||||||
|
@ -17,11 +17,15 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/rpc"
|
"net/rpc"
|
||||||
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -112,3 +116,161 @@ func TestStorageErr(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// API suite container common to both FS and XL.
|
||||||
|
type TestRPCStorageSuite struct {
|
||||||
|
serverType string
|
||||||
|
testServer TestServer
|
||||||
|
remoteDisks []StorageAPI
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setting up the test suite.
|
||||||
|
// Starting the Test server with temporary FS backend.
|
||||||
|
func (s *TestRPCStorageSuite) SetUpSuite(c *testing.T) {
|
||||||
|
s.testServer = StartTestStorageRPCServer(c, s.serverType, 1)
|
||||||
|
splitAddrs := strings.Split(s.testServer.Server.Listener.Addr().String(), ":")
|
||||||
|
var err error
|
||||||
|
globalMinioPort, err = strconv.Atoi(splitAddrs[1])
|
||||||
|
if err != nil {
|
||||||
|
c.Fatalf("Unable to convert %s to its integer representation, %s", splitAddrs[1], err)
|
||||||
|
}
|
||||||
|
for _, disk := range s.testServer.Disks {
|
||||||
|
storageDisk, err := newRPCClient(splitAddrs[0] + ":" + disk)
|
||||||
|
if err != nil {
|
||||||
|
c.Fatal("Unable to initialize RPC client", err)
|
||||||
|
}
|
||||||
|
s.remoteDisks = append(s.remoteDisks, storageDisk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// No longer used with gocheck, but used in explicit teardown code in
|
||||||
|
// each test function. // Called implicitly by "gopkg.in/check.v1"
|
||||||
|
// after all tests are run.
|
||||||
|
func (s *TestRPCStorageSuite) TearDownSuite(c *testing.T) {
|
||||||
|
s.testServer.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRPCStorageClient(t *testing.T) {
|
||||||
|
// Setup code
|
||||||
|
s := &TestRPCStorageSuite{serverType: "XL"}
|
||||||
|
s.SetUpSuite(t)
|
||||||
|
|
||||||
|
// Run the test.
|
||||||
|
s.testRPCStorageClient(t)
|
||||||
|
|
||||||
|
// Teardown code
|
||||||
|
s.TearDownSuite(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestRPCStorageSuite) testRPCStorageClient(t *testing.T) {
|
||||||
|
// TODO - Fix below tests to run on windows.
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.testRPCStorageDisksInfo(t)
|
||||||
|
s.testRPCStorageVolOps(t)
|
||||||
|
s.testRPCStorageFileOps(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test storage disks info.
|
||||||
|
func (s *TestRPCStorageSuite) testRPCStorageDisksInfo(t *testing.T) {
|
||||||
|
for _, storageDisk := range s.remoteDisks {
|
||||||
|
diskInfo, err := storageDisk.DiskInfo()
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate DiskInfo", err)
|
||||||
|
}
|
||||||
|
if diskInfo.Total == 0 {
|
||||||
|
t.Error("Invalid diskInfo total")
|
||||||
|
}
|
||||||
|
if storageDisk.String() == "" {
|
||||||
|
t.Error("Stinger storageAPI should be non empty")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test storage vol operations.
|
||||||
|
func (s *TestRPCStorageSuite) testRPCStorageVolOps(t *testing.T) {
|
||||||
|
for _, storageDisk := range s.remoteDisks {
|
||||||
|
err := storageDisk.MakeVol("myvol")
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate MakeVol", err)
|
||||||
|
}
|
||||||
|
volInfo, err := storageDisk.StatVol("myvol")
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate StatVol", err)
|
||||||
|
}
|
||||||
|
if volInfo.Name != "myvol" {
|
||||||
|
t.Errorf("Expected `myvol` found %s instead", volInfo.Name)
|
||||||
|
}
|
||||||
|
if volInfo.Created.IsZero() {
|
||||||
|
t.Error("Expected created time to be non zero")
|
||||||
|
}
|
||||||
|
err = storageDisk.DeleteVol("myvol")
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate DeleteVol", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests all file operations.
|
||||||
|
func (s *TestRPCStorageSuite) testRPCStorageFileOps(t *testing.T) {
|
||||||
|
for _, storageDisk := range s.remoteDisks {
|
||||||
|
err := storageDisk.MakeVol("myvol")
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate MakeVol", err)
|
||||||
|
}
|
||||||
|
err = storageDisk.AppendFile("myvol", "file1", []byte("Hello, world"))
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate AppendFile", err)
|
||||||
|
}
|
||||||
|
fi, err := storageDisk.StatFile("myvol", "file1")
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate StatFile", err)
|
||||||
|
}
|
||||||
|
if fi.Name != "file1" {
|
||||||
|
t.Errorf("Expected `file1` but got %s", fi.Name)
|
||||||
|
}
|
||||||
|
if fi.Volume != "myvol" {
|
||||||
|
t.Errorf("Expected `myvol` but got %s", fi.Volume)
|
||||||
|
}
|
||||||
|
if fi.Size != 12 {
|
||||||
|
t.Errorf("Expected 12 but got %d", fi.Size)
|
||||||
|
}
|
||||||
|
if !fi.Mode.IsRegular() {
|
||||||
|
t.Error("Expected file to be regular found", fi.Mode)
|
||||||
|
}
|
||||||
|
if fi.ModTime.IsZero() {
|
||||||
|
t.Error("Expected created time to be non zero")
|
||||||
|
}
|
||||||
|
buf, err := storageDisk.ReadAll("myvol", "file1")
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate ReadAll", err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(buf, []byte("Hello, world")) {
|
||||||
|
t.Errorf("Expected `Hello, world`, got %s", string(buf))
|
||||||
|
}
|
||||||
|
buf1 := make([]byte, 5)
|
||||||
|
n, err := storageDisk.ReadFile("myvol", "file1", 4, buf1)
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate ReadFile", err)
|
||||||
|
}
|
||||||
|
if n != 5 {
|
||||||
|
t.Errorf("Expected `5`, got %d", n)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(buf[4:9], buf1) {
|
||||||
|
t.Errorf("Expected %s, got %s", string(buf[4:9]), string(buf1))
|
||||||
|
}
|
||||||
|
err = storageDisk.RenameFile("myvol", "file1", "myvol", "file2")
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate RenameFile", err)
|
||||||
|
}
|
||||||
|
err = storageDisk.DeleteFile("myvol", "file2")
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate DeleteFile", err)
|
||||||
|
}
|
||||||
|
err = storageDisk.DeleteVol("myvol")
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Unable to initiate DeleteVol", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
router "github.com/gorilla/mux"
|
router "github.com/gorilla/mux"
|
||||||
|
"github.com/minio/minio-go/pkg/set"
|
||||||
"github.com/minio/minio/pkg/disk"
|
"github.com/minio/minio/pkg/disk"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -223,14 +224,15 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e
|
|||||||
exports := serverConfig.disks
|
exports := serverConfig.disks
|
||||||
ignoredExports := serverConfig.ignoredDisks
|
ignoredExports := serverConfig.ignoredDisks
|
||||||
|
|
||||||
// Save ignored disks in a map
|
// Initialize ignored disks in a new set.
|
||||||
skipDisks := make(map[string]bool)
|
ignoredSet := set.NewStringSet()
|
||||||
for _, ignoredExport := range ignoredExports {
|
if len(ignoredExports) > 0 {
|
||||||
skipDisks[ignoredExport] = true
|
ignoredSet = set.CreateStringSet(ignoredExports...)
|
||||||
}
|
}
|
||||||
t := time.Now().UTC()
|
t := time.Now().UTC()
|
||||||
for _, export := range exports {
|
for _, export := range exports {
|
||||||
if skipDisks[export] {
|
if ignoredSet.Contains(export) {
|
||||||
|
// Ignore initializing ignored export.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// e.g server:/mnt/disk1
|
// e.g server:/mnt/disk1
|
||||||
@ -253,7 +255,7 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return servers, err
|
return servers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// registerStorageRPCRouter - register storage rpc router.
|
// registerStorageRPCRouter - register storage rpc router.
|
||||||
|
@ -195,7 +195,47 @@ func StartTestServer(t TestErrHandler, instanceType string) TestServer {
|
|||||||
return testServer
|
return testServer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initializes control RPC end points.
|
// Initializes storage RPC endpoints.
|
||||||
|
// The object Layer will be a temp back used for testing purpose.
|
||||||
|
func initTestStorageRPCEndPoint(srvCmdConfig serverCmdConfig) http.Handler {
|
||||||
|
// Initialize router.
|
||||||
|
muxRouter := router.NewRouter()
|
||||||
|
registerStorageRPCRouters(muxRouter, srvCmdConfig)
|
||||||
|
return muxRouter
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartTestStorageRPCServer - Creates a temp XL/FS backend and initializes storage RPC end points,
|
||||||
|
// then starts a test server with those storage RPC end points registered.
|
||||||
|
func StartTestStorageRPCServer(t TestErrHandler, instanceType string, diskN int) TestServer {
|
||||||
|
// create temporary backend for the test server.
|
||||||
|
disks, err := getRandomDisks(diskN)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to create disks for the backend")
|
||||||
|
}
|
||||||
|
|
||||||
|
root, err := newTestConfig("us-east-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an instance of TestServer.
|
||||||
|
testRPCServer := TestServer{}
|
||||||
|
// Get credential.
|
||||||
|
credentials := serverConfig.GetCredential()
|
||||||
|
|
||||||
|
testRPCServer.Root = root
|
||||||
|
testRPCServer.Disks = disks
|
||||||
|
testRPCServer.AccessKey = credentials.AccessKeyID
|
||||||
|
testRPCServer.SecretKey = credentials.SecretAccessKey
|
||||||
|
|
||||||
|
// Run TestServer.
|
||||||
|
testRPCServer.Server = httptest.NewServer(initTestStorageRPCEndPoint(serverCmdConfig{
|
||||||
|
disks: disks,
|
||||||
|
}))
|
||||||
|
return testRPCServer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initializes control RPC endpoints.
|
||||||
// The object Layer will be a temp back used for testing purpose.
|
// The object Layer will be a temp back used for testing purpose.
|
||||||
func initTestControlRPCEndPoint(srvCmdConfig serverCmdConfig) http.Handler {
|
func initTestControlRPCEndPoint(srvCmdConfig serverCmdConfig) http.Handler {
|
||||||
// Initialize router.
|
// Initialize router.
|
||||||
@ -204,9 +244,9 @@ func initTestControlRPCEndPoint(srvCmdConfig serverCmdConfig) http.Handler {
|
|||||||
return muxRouter
|
return muxRouter
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartTestRPCServer - Creates a temp XL/FS backend and initializes control RPC end points,
|
// StartTestControlRPCServer - Creates a temp XL/FS backend and initializes control RPC end points,
|
||||||
// then starts a test server with those control RPC end points registered.
|
// then starts a test server with those control RPC end points registered.
|
||||||
func StartTestRPCServer(t TestErrHandler, instanceType string) TestServer {
|
func StartTestControlRPCServer(t TestErrHandler, instanceType string) TestServer {
|
||||||
// create temporary backend for the test server.
|
// create temporary backend for the test server.
|
||||||
nDisks := 16
|
nDisks := 16
|
||||||
disks, err := getRandomDisks(nDisks)
|
disks, err := getRandomDisks(nDisks)
|
||||||
|
Loading…
Reference in New Issue
Block a user