mirror of https://github.com/minio/minio.git
cleanup dsync tests and remove net/rpc references (#14118)
This commit is contained in:
parent 70e1cbda21
commit 1a56ebea70
@@ -2,6 +2,7 @@
 set -e
 
-for d in $(go list ./... | grep -v browser); do
-    CGO_ENABLED=1 go test -v -tags kqueue -race --timeout 100m "$d"
+## TODO remove `dsync` from race detector once this is merged and released https://go-review.googlesource.com/c/go/+/333529/
+for d in $(go list ./... | grep -v dsync); do
+    CGO_ENABLED=1 go test -v -race --timeout 100m "$d"
 done
@@ -18,33 +18,55 @@
 package dsync
 
 import (
+	"bytes"
 	"context"
-	"net/rpc"
-	"sync"
+	"errors"
+	"net/http"
+	"net/url"
+	"time"
+
+	xhttp "github.com/minio/minio/internal/http"
+	"github.com/minio/minio/internal/rest"
 )
 
 // ReconnectRPCClient is a wrapper type for rpc.Client which provides reconnect on first failure.
 type ReconnectRPCClient struct {
-	mutex    sync.Mutex
-	rpc      *rpc.Client
-	addr     string
-	endpoint string
+	u   *url.URL
+	rpc *rest.Client
 }
 
 // newClient constructs a ReconnectRPCClient object with addr and endpoint initialized.
 // It _doesn't_ connect to the remote endpoint. See Call method to see when the
 // connect happens.
-func newClient(addr, endpoint string) NetLocker {
+func newClient(endpoint string) NetLocker {
+	u, err := url.Parse(endpoint)
+	if err != nil {
+		panic(err)
+	}
+
+	tr := &http.Transport{
+		Proxy:                 http.ProxyFromEnvironment,
+		MaxIdleConnsPerHost:   1024,
+		WriteBufferSize:       32 << 10, // 32KiB moving up from 4KiB default
+		ReadBufferSize:        32 << 10, // 32KiB moving up from 4KiB default
+		IdleConnTimeout:       15 * time.Second,
+		ResponseHeaderTimeout: 15 * time.Minute, // Set conservative timeouts for MinIO internode.
+		TLSHandshakeTimeout:   15 * time.Second,
+		ExpectContinueTimeout: 15 * time.Second,
+		// Go net/http automatically unzip if content-type is
+		// gzip disable this feature, as we are always interested
+		// in raw stream.
+		DisableCompression: true,
+	}
+
 	return &ReconnectRPCClient{
-		addr:     addr,
-		endpoint: endpoint,
+		u:   u,
+		rpc: rest.NewClient(u, tr, nil),
 	}
 }
 
 // Close closes the underlying socket file descriptor.
 func (rpcClient *ReconnectRPCClient) IsOnline() bool {
-	rpcClient.mutex.Lock()
-	defer rpcClient.mutex.Unlock()
 	// If rpc client has not connected yet there is nothing to close.
 	return rpcClient.rpc != nil
 }
@@ -55,74 +77,73 @@ func (rpcClient *ReconnectRPCClient) IsLocal() bool {
 
 // Close closes the underlying socket file descriptor.
 func (rpcClient *ReconnectRPCClient) Close() error {
-	rpcClient.mutex.Lock()
-	defer rpcClient.mutex.Unlock()
-	// If rpc client has not connected yet there is nothing to close.
-	if rpcClient.rpc == nil {
-		return nil
-	}
-	// Reset rpcClient.rpc to allow for subsequent calls to use a new
-	// (socket) connection.
-	clnt := rpcClient.rpc
-	rpcClient.rpc = nil
-	return clnt.Close()
+	return nil
 }
 
-// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob.
-func (rpcClient *ReconnectRPCClient) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
-	rpcClient.mutex.Lock()
-	defer rpcClient.mutex.Unlock()
-	dialCall := func() error {
-		// If the rpc.Client is nil, we attempt to (re)connect with the remote endpoint.
-		if rpcClient.rpc == nil {
-			clnt, derr := rpc.DialHTTPPath("tcp", rpcClient.addr, rpcClient.endpoint)
-			if derr != nil {
-				return derr
-			}
-			rpcClient.rpc = clnt
-		}
-		// If the RPC fails due to a network-related error, then we reset
-		// rpc.Client for a subsequent reconnect.
-		return rpcClient.rpc.Call(serviceMethod, args, reply)
-	}
-	if err = dialCall(); err == rpc.ErrShutdown {
-		rpcClient.rpc.Close()
-		rpcClient.rpc = nil
-		err = dialCall()
-	}
-	return err
-}
+var (
+	errLockConflict = errors.New("lock conflict")
+	errLockNotFound = errors.New("lock not found")
+)
+
+func toLockError(err error) error {
+	if err == nil {
+		return nil
+	}
+
+	switch err.Error() {
+	case errLockConflict.Error():
+		return errLockConflict
+	case errLockNotFound.Error():
+		return errLockNotFound
+	}
+	return err
+}
+
+// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob.
+func (rpcClient *ReconnectRPCClient) Call(method string, args LockArgs) (status bool, err error) {
+	buf, err := args.MarshalMsg(nil)
+	if err != nil {
+		return false, err
+	}
+	body := bytes.NewReader(buf)
+	respBody, err := rpcClient.rpc.Call(context.Background(), method,
+		url.Values{}, body, body.Size())
+	defer xhttp.DrainBody(respBody)
+
+	switch toLockError(err) {
+	case nil:
+		return true, nil
+	case errLockConflict, errLockNotFound:
+		return false, nil
+	default:
+		return false, err
+	}
+}
 
 func (rpcClient *ReconnectRPCClient) RLock(ctx context.Context, args LockArgs) (status bool, err error) {
-	err = rpcClient.Call("Dsync.RLock", &args, &status)
-	return status, err
+	return rpcClient.Call("/v1/rlock", args)
 }
 
 func (rpcClient *ReconnectRPCClient) Lock(ctx context.Context, args LockArgs) (status bool, err error) {
-	err = rpcClient.Call("Dsync.Lock", &args, &status)
-	return status, err
+	return rpcClient.Call("/v1/lock", args)
 }
 
 func (rpcClient *ReconnectRPCClient) RUnlock(ctx context.Context, args LockArgs) (status bool, err error) {
-	err = rpcClient.Call("Dsync.RUnlock", &args, &status)
-	return status, err
+	return rpcClient.Call("/v1/runlock", args)
 }
 
 func (rpcClient *ReconnectRPCClient) Unlock(ctx context.Context, args LockArgs) (status bool, err error) {
-	err = rpcClient.Call("Dsync.Unlock", &args, &status)
-	return status, err
+	return rpcClient.Call("/v1/unlock", args)
 }
 
 func (rpcClient *ReconnectRPCClient) Refresh(ctx context.Context, args LockArgs) (refreshed bool, err error) {
-	err = rpcClient.Call("Dsync.Refresh", &args, &refreshed)
-	return refreshed, err
+	return rpcClient.Call("/v1/refresh", args)
 }
 
 func (rpcClient *ReconnectRPCClient) ForceUnlock(ctx context.Context, args LockArgs) (reply bool, err error) {
-	err = rpcClient.Call("Dsync.ForceUnlock", &args, &reply)
-	return reply, err
+	return rpcClient.Call("/v1/force-unlock", args)
 }
 
 func (rpcClient *ReconnectRPCClient) String() string {
-	return "http://" + rpcClient.addr + "/" + rpcClient.endpoint
+	return rpcClient.u.String()
 }
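The rewritten Call collapses the RPC round trip into a single HTTP POST: LockArgs is msgp-marshalled into the request body, the method name becomes a URL path such as /v1/lock, and the two well-known denial errors are folded into the boolean result so callers can tell "denied" apart from "broken". A standalone sketch of that convention with stdlib stand-ins — lockCall, the plain-string payload, and the in-memory server below are illustrative, not MinIO's msgp wire format:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

var (
	errLockConflict = errors.New("lock conflict")
	errLockNotFound = errors.New("lock not found")
)

// lockCall returns (true, nil) when the operation succeeded, (false, nil)
// when it was cleanly denied, and (false, err) for real failures.
func lockCall(base, method, resource string) (bool, error) {
	resp, err := http.Post(base+method, "application/octet-stream",
		bytes.NewReader([]byte(resource)))
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusOK {
		return true, nil
	}
	msg, _ := io.ReadAll(resp.Body)
	switch strings.TrimSpace(string(msg)) {
	case errLockConflict.Error(), errLockNotFound.Error():
		return false, nil // denied, but not a transport failure
	}
	return false, fmt.Errorf("unexpected response: %s", msg)
}

func main() {
	held := map[string]bool{}
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		res, _ := io.ReadAll(r.Body)
		if held[string(res)] { // already locked: deny with the known error text
			w.WriteHeader(http.StatusForbidden)
			w.Write([]byte(errLockConflict.Error()))
			return
		}
		held[string(res)] = true // grant and record the lock
	}))
	defer srv.Close()

	fmt.Println(lockCall(srv.URL, "/v1/lock", "bucket/object")) // true <nil>
	fmt.Println(lockCall(srv.URL, "/v1/lock", "bucket/object")) // false <nil>
}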
@@ -19,11 +19,166 @@ package dsync
 
 import (
 	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/gorilla/mux"
 )
 
+const numberOfNodes = 5
+
+var (
+	ds          *Dsync
+	nodes       = make([]*httptest.Server, numberOfNodes) // list of node IP addrs or hostname with ports.
+	lockServers = make([]*lockServer, numberOfNodes)
+)
+
+func getLockArgs(r *http.Request) (args LockArgs, err error) {
+	buf, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		return args, err
+	}
+	_, err = args.UnmarshalMsg(buf)
+	return args, err
+}
+
+type lockServerHandler struct {
+	lsrv *lockServer
+}
+
+func (lh *lockServerHandler) writeErrorResponse(w http.ResponseWriter, err error) {
+	w.WriteHeader(http.StatusForbidden)
+	w.Write([]byte(err.Error()))
+}
+
+func (lh *lockServerHandler) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) {
+	args, err := getLockArgs(r)
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+
+	if _, err = lh.lsrv.ForceUnlock(&args); err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+}
+
+func (lh *lockServerHandler) RefreshHandler(w http.ResponseWriter, r *http.Request) {
+	args, err := getLockArgs(r)
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+
+	reply, err := lh.lsrv.Refresh(&args)
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+
+	if !reply {
+		lh.writeErrorResponse(w, errLockNotFound)
+		return
+	}
+}
+
+func (lh *lockServerHandler) LockHandler(w http.ResponseWriter, r *http.Request) {
+	args, err := getLockArgs(r)
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+	reply, err := lh.lsrv.Lock(&args)
+	if err == nil && !reply {
+		err = errLockConflict
+	}
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+}
+
+func (lh *lockServerHandler) UnlockHandler(w http.ResponseWriter, r *http.Request) {
+	args, err := getLockArgs(r)
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+	_, err = lh.lsrv.Unlock(&args)
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+}
+
+func (lh *lockServerHandler) RUnlockHandler(w http.ResponseWriter, r *http.Request) {
+	args, err := getLockArgs(r)
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+	_, err = lh.lsrv.RUnlock(&args)
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+}
+
+func (lh *lockServerHandler) HealthHandler(w http.ResponseWriter, r *http.Request) {}
+
+func (lh *lockServerHandler) RLockHandler(w http.ResponseWriter, r *http.Request) {
+	args, err := getLockArgs(r)
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+
+	reply, err := lh.lsrv.RLock(&args)
+	if err == nil && !reply {
+		err = errLockConflict
+	}
+	if err != nil {
+		lh.writeErrorResponse(w, err)
+		return
+	}
+}
+
+func stopRPCServers() {
+	for i := 0; i < numberOfNodes; i++ {
+		nodes[i].Close()
+	}
+}
+
+func startRPCServers() {
+	for i := 0; i < numberOfNodes; i++ {
+		lsrv := &lockServer{
+			mutex:   sync.Mutex{},
+			lockMap: make(map[string]int64),
+		}
+		lockServer := lockServerHandler{
+			lsrv: lsrv,
+		}
+		lockServers[i] = lsrv
+
+		router := mux.NewRouter().SkipClean(true)
+		subrouter := router.PathPrefix("/").Subrouter()
+		subrouter.Methods(http.MethodPost).Path("/v1/health").HandlerFunc(lockServer.HealthHandler)
+		subrouter.Methods(http.MethodPost).Path("/v1/refresh").HandlerFunc(lockServer.RefreshHandler)
+		subrouter.Methods(http.MethodPost).Path("/v1/lock").HandlerFunc(lockServer.LockHandler)
+		subrouter.Methods(http.MethodPost).Path("/v1/rlock").HandlerFunc(lockServer.RLockHandler)
+		subrouter.Methods(http.MethodPost).Path("/v1/unlock").HandlerFunc(lockServer.UnlockHandler)
+		subrouter.Methods(http.MethodPost).Path("/v1/runlock").HandlerFunc(lockServer.RUnlockHandler)
+		subrouter.Methods(http.MethodPost).Path("/v1/force-unlock").HandlerFunc(lockServer.ForceUnlockHandler)
+
+		nodes[i] = httptest.NewServer(router)
+	}
+}
+
 const WriteLock = -1
 
 type lockServer struct {
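startRPCServers replaces the old fixed-port net/rpc setup (one HTTP server per node on port 12345+i) with httptest servers: each simulated node gets an OS-assigned port, and its URL is what the lock clients dial, so concurrent test runs cannot collide on ports. A minimal sketch of that topology, with the stdlib http.ServeMux standing in for the gorilla/mux router used above:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

const numberOfNodes = 5

func main() {
	nodes := make([]*httptest.Server, numberOfNodes)
	for i := range nodes {
		m := http.NewServeMux() // stand-in for the gorilla/mux router
		m.HandleFunc("/v1/health", func(w http.ResponseWriter, r *http.Request) {})
		nodes[i] = httptest.NewServer(m) // OS-assigned port, ready on return
		defer nodes[i].Close()
	}
	for _, n := range nodes {
		fmt.Println("node at", n.URL) // e.g. http://127.0.0.1:53817
	}
}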
@@ -49,21 +204,21 @@ func (l *lockServer) setResponseDelay(responseDelay time.Duration) {
 	atomic.StoreInt64(&l.responseDelay, int64(responseDelay))
 }
 
-func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
+func (l *lockServer) Lock(args *LockArgs) (reply bool, err error) {
 	if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
 		time.Sleep(time.Duration(d))
 	}
 
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
-	if _, *reply = l.lockMap[args.Resources[0]]; !*reply {
+	if _, reply = l.lockMap[args.Resources[0]]; !reply {
 		l.lockMap[args.Resources[0]] = WriteLock // No locks held on the given name, so claim write lock
 	}
-	*reply = !*reply // Negate *reply to return true when lock is granted or false otherwise
-	return nil
+	reply = !reply // Negate *reply to return true when lock is granted or false otherwise
+	return reply, nil
 }
 
-func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
+func (l *lockServer) Unlock(args *LockArgs) (reply bool, err error) {
 	if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
 		time.Sleep(time.Duration(d))
 	}
@@ -71,19 +226,19 @@ func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
 	var locksHeld int64
-	if locksHeld, *reply = l.lockMap[args.Resources[0]]; !*reply { // No lock is held on the given name
-		return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Resources[0])
+	if locksHeld, reply = l.lockMap[args.Resources[0]]; !reply { // No lock is held on the given name
+		return false, fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Resources[0])
 	}
-	if *reply = locksHeld == WriteLock; !*reply { // Unless it is a write lock
-		return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Resources[0], locksHeld)
+	if reply = locksHeld == WriteLock; !reply { // Unless it is a write lock
+		return false, fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Resources[0], locksHeld)
 	}
 	delete(l.lockMap, args.Resources[0]) // Remove the write lock
-	return nil
+	return true, nil
 }
 
 const ReadLock = 1
 
-func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
+func (l *lockServer) RLock(args *LockArgs) (reply bool, err error) {
 	if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
 		time.Sleep(time.Duration(d))
 	}
@@ -91,16 +246,16 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
 	var locksHeld int64
-	if locksHeld, *reply = l.lockMap[args.Resources[0]]; !*reply {
+	if locksHeld, reply = l.lockMap[args.Resources[0]]; !reply {
 		l.lockMap[args.Resources[0]] = ReadLock // No locks held on the given name, so claim (first) read lock
-		*reply = true
-	} else if *reply = locksHeld != WriteLock; *reply { // Unless there is a write lock
+		reply = true
+	} else if reply = locksHeld != WriteLock; reply { // Unless there is a write lock
 		l.lockMap[args.Resources[0]] = locksHeld + ReadLock // Grant another read lock
 	}
-	return nil
+	return reply, nil
 }
 
-func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
+func (l *lockServer) RUnlock(args *LockArgs) (reply bool, err error) {
 	if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
 		time.Sleep(time.Duration(d))
 	}
@@ -108,32 +263,32 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
 	var locksHeld int64
-	if locksHeld, *reply = l.lockMap[args.Resources[0]]; !*reply { // No lock is held on the given name
-		return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Resources[0])
+	if locksHeld, reply = l.lockMap[args.Resources[0]]; !reply { // No lock is held on the given name
+		return false, fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Resources[0])
 	}
-	if *reply = locksHeld != WriteLock; !*reply { // A write-lock is held, cannot release a read lock
-		return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Resources[0])
+	if reply = locksHeld != WriteLock; !reply { // A write-lock is held, cannot release a read lock
+		return false, fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Resources[0])
 	}
 	if locksHeld > ReadLock {
 		l.lockMap[args.Resources[0]] = locksHeld - ReadLock // Remove one of the read locks held
 	} else {
 		delete(l.lockMap, args.Resources[0]) // Remove the (last) read lock
 	}
-	return nil
+	return reply, nil
 }
 
-func (l *lockServer) Refresh(args *LockArgs, reply *bool) error {
+func (l *lockServer) Refresh(args *LockArgs) (reply bool, err error) {
 	if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
 		time.Sleep(time.Duration(d))
 	}
 
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
-	*reply = !l.lockNotFound
-	return nil
+	reply = !l.lockNotFound
+	return reply, nil
 }
 
-func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error {
+func (l *lockServer) ForceUnlock(args *LockArgs) (reply bool, err error) {
 	if d := atomic.LoadInt64(&l.responseDelay); d != 0 {
 		time.Sleep(time.Duration(d))
 	}
@@ -141,9 +296,9 @@ func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error {
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
 	if len(args.UID) != 0 {
-		return fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID)
+		return false, fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID)
 	}
 	delete(l.lockMap, args.Resources[0]) // Remove the lock (irrespective of write or read lock)
-	*reply = true
-	return nil
+	reply = true
+	return reply, nil
 }
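All five lockServer methods change shape the same way: net/rpc requires the Method(args *Args, reply *Reply) error form and smuggles the result out through the reply pointer, while plain HTTP handlers can call ordinary methods that return (bool, error) directly. A sketch of the two shapes side by side — Args, server, and the write-lock sentinel below are illustrative reductions of the real types:

package main

import "fmt"

// Args is an illustrative reduction of dsync's LockArgs.
type Args struct{ Resource string }

type server struct{ lockMap map[string]int64 }

// Old shape: result written through *reply, as net/rpc dictates.
func (s *server) LockRPC(args *Args, reply *bool) error {
	_, locked := s.lockMap[args.Resource]
	if !locked {
		s.lockMap[args.Resource] = -1 // claim the write lock (WriteLock sentinel)
	}
	*reply = !locked
	return nil
}

// New shape: ordinary Go, trivially callable from an HTTP handler.
func (s *server) Lock(args *Args) (bool, error) {
	_, locked := s.lockMap[args.Resource]
	if !locked {
		s.lockMap[args.Resource] = -1
	}
	return !locked, nil
}

func main() {
	s := &server{lockMap: make(map[string]int64)}
	granted, _ := s.Lock(&Args{Resource: "bucket/object"})
	fmt.Println(granted) // true: first claim wins
	granted, _ = s.Lock(&Args{Resource: "bucket/object"})
	fmt.Println(granted) // false: already write-locked
}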
@@ -19,14 +19,8 @@ package dsync
 
 import (
 	"context"
-	"fmt"
-	golog "log"
 	"math/rand"
-	"net"
-	"net/http"
-	"net/rpc"
 	"os"
-	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -34,67 +28,23 @@ import (
 	"github.com/google/uuid"
 )
 
-const numberOfNodes = 5
-
-var (
-	ds       *Dsync
-	rpcPaths []string // list of rpc paths where lock server is serving.
-)
-
-var (
-	nodes       = make([]string, numberOfNodes) // list of node IP addrs or hostname with ports.
-	lockServers []*lockServer
-)
-
-func startRPCServers() {
-	for i := range nodes {
-		server := rpc.NewServer()
-		ls := &lockServer{
-			mutex:   sync.Mutex{},
-			lockMap: make(map[string]int64),
-		}
-		server.RegisterName("Dsync", ls)
-		// For some reason the registration paths need to be different (even for different server objs)
-		server.HandleHTTP(rpcPaths[i], fmt.Sprintf("%s-debug", rpcPaths[i]))
-		l, e := net.Listen("tcp", ":"+strconv.Itoa(i+12345))
-		if e != nil {
-			golog.Fatal("listen error:", e)
-		}
-		go http.Serve(l, nil)
-
-		lockServers = append(lockServers, ls)
-	}
-
-	// Let servers start
-	time.Sleep(10 * time.Millisecond)
-}
-
 // TestMain initializes the testing framework
 func TestMain(m *testing.M) {
-	const rpcPath = "/dsync"
-
-	rand.Seed(time.Now().UTC().UnixNano())
-
-	for i := range nodes {
-		nodes[i] = fmt.Sprintf("127.0.0.1:%d", i+12345)
-	}
-	for i := range nodes {
-		rpcPaths = append(rpcPaths, rpcPath+"-"+strconv.Itoa(i))
-	}
+	startRPCServers()
 
 	// Initialize net/rpc clients for dsync.
 	var clnts []NetLocker
 	for i := 0; i < len(nodes); i++ {
-		clnts = append(clnts, newClient(nodes[i], rpcPaths[i]))
+		clnts = append(clnts, newClient(nodes[i].URL))
 	}
 
 	ds = &Dsync{
 		GetLockers: func() ([]NetLocker, string) { return clnts, uuid.New().String() },
 	}
 
-	startRPCServers()
-	os.Exit(m.Run())
+	code := m.Run()
+	stopRPCServers()
+	os.Exit(code)
 }
 
 func TestSimpleLock(t *testing.T) {
@@ -281,7 +231,7 @@ func TestFailedRefreshLock(t *testing.T) {
 	}
 
 	dm := NewDRWMutex(ds, "aap")
-	wg := sync.WaitGroup{}
+	var wg sync.WaitGroup
 	wg.Add(1)
 
 	ctx, cl := context.WithCancel(context.Background())
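TestMain now owns the full server lifecycle, and the order matters: os.Exit terminates the process without running deferred calls, so the exit code from m.Run() has to be captured first and the servers shut down before exiting. A sketch of the pattern — in a real suite this lives in a _test.go file of the package under test, and the two helpers stand in for the ones defined in the diff above:

package example

import (
	"os"
	"testing"
)

// Stand-ins for the helpers defined in the diff above.
func startRPCServers() {}
func stopRPCServers()  {}

func TestMain(m *testing.M) {
	startRPCServers()
	code := m.Run()  // remember the verdict...
	stopRPCServers() // ...tear down before exiting, since os.Exit skips defers...
	os.Exit(code)    // ...then exit with it
}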
@@ -189,6 +189,9 @@ func TestHTTPListenerStartClose(t *testing.T) {
 			// Ignore if IP is unbindable.
 			continue
 		}
+		if strings.Contains(err.Error(), "bind: address already in use") {
+			continue
+		}
 		t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
 	}
 
@@ -232,6 +235,9 @@ func TestHTTPListenerAddr(t *testing.T) {
 			// Ignore if IP is unbindable.
 			continue
 		}
+		if strings.Contains(err.Error(), "bind: address already in use") {
+			continue
+		}
 		t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
 	}
 
@@ -272,6 +278,9 @@ func TestHTTPListenerAddrs(t *testing.T) {
 			// Ignore if IP is unbindable.
 			continue
 		}
+		if strings.Contains(err.Error(), "bind: address already in use") {
+			continue
+		}
 		t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
 	}
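All three listener tests gain the same escape hatch: when another process already holds a port from the test's sample set, the bind failure is an environment problem rather than a listener bug, so the case is skipped instead of failed. The matched text is what a second bind on a busy port actually produces, as this small standalone check shows — errors.Is against syscall.EADDRINUSE is the structured equivalent of the string match on Unix:

package main

import (
	"errors"
	"fmt"
	"net"
	"syscall"
)

func main() {
	l1, err := net.Listen("tcp", "127.0.0.1:0") // let the OS pick a free port
	if err != nil {
		panic(err)
	}
	defer l1.Close()

	// Binding the same address again fails with EADDRINUSE.
	_, err = net.Listen("tcp", l1.Addr().String())
	fmt.Println(err)                                // ...bind: address already in use
	fmt.Println(errors.Is(err, syscall.EADDRINUSE)) // true on Unix
}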
@@ -131,7 +131,9 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
 	if err != nil {
 		return nil, &NetworkError{err}
 	}
-	req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery))
+	if c.newAuthToken != nil {
+		req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery))
+	}
 	req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339))
 	if body != nil {
 		req.Header.Set("Expect", "100-continue")
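This guard exists because the dsync tests above construct their clients with rest.NewClient(u, tr, nil): newAuthToken is an optional callback, and calling a nil function value panics. A reduced sketch of the pattern — the client type below is illustrative, not the internal rest.Client:

package main

import (
	"fmt"
	"net/http"
)

type client struct {
	// newAuthToken may be nil when the client needs no authentication.
	newAuthToken func(audience string) string
}

func (c *client) sign(req *http.Request) {
	if c.newAuthToken != nil { // calling a nil func value would panic
		req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.RawQuery))
	}
}

func main() {
	req, _ := http.NewRequest(http.MethodPost, "http://example.com/v1/lock", nil)

	(&client{}).sign(req) // unauthenticated client: header stays unset
	fmt.Println(req.Header.Get("Authorization") == "")

	(&client{newAuthToken: func(string) string { return "tok" }}).sign(req)
	fmt.Println(req.Header.Get("Authorization")) // Bearer tok
}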