separate lock from common grid to avoid epoll contention (#20180)

epoll contention on TCP causes latency build-up when
we have high volume ingress. This PR is an attempt to
relieve this pressure.

upstream issue https://github.com/golang/go/issues/65064
It seems to be a deeper problem; haven't yet tried the fix
provide in this issue, but however this change without
changing the compiler helps. 

Of course, this is a workaround for now, hoping for a
more comprehensive fix from Go runtime.
This commit is contained in:
Harshavardhana 2024-07-29 11:10:04 -07:00 committed by GitHub
parent 6651c655cb
commit a17f14f73a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 121 additions and 31 deletions

View File

@ -247,8 +247,11 @@ func guessIsRPCReq(req *http.Request) bool {
if req == nil {
return false
}
if req.Method == http.MethodGet && req.URL != nil && req.URL.Path == grid.RoutePath {
return true
if req.Method == http.MethodGet && req.URL != nil {
switch req.URL.Path {
case grid.RoutePath, grid.RouteLockPath:
return true
}
}
return (req.Method == http.MethodPost || req.Method == http.MethodGet) &&

View File

@ -64,6 +64,14 @@ func TestGuessIsRPC(t *testing.T) {
if !guessIsRPCReq(r) {
t.Fatal("Grid RPC path not detected")
}
r = &http.Request{
Proto: "HTTP/1.1",
Method: http.MethodGet,
URL: &url.URL{Path: grid.RouteLockPath},
}
if !guessIsRPCReq(r) {
t.Fatal("Grid RPC path not detected")
}
}
var isHTTPHeaderSizeTooLargeTests = []struct {

View File

@ -31,9 +31,15 @@ import (
// globalGrid is the global grid manager.
var globalGrid atomic.Pointer[grid.Manager]
// globalLockGrid is the global lock grid manager.
var globalLockGrid atomic.Pointer[grid.Manager]
// globalGridStart is a channel that will block startup of grid connections until closed.
var globalGridStart = make(chan struct{})
// globalLockGridStart is a channel that will block startup of lock grid connections until closed.
var globalLockGridStart = make(chan struct{})
func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error {
hosts, local := eps.GridHosts()
lookupHost := globalDNSCache.LookupHost
@ -55,9 +61,10 @@ func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error {
AuthFn: newCachedAuthToken(),
BlockConnect: globalGridStart,
// Record incoming and outgoing bytes.
Incoming: globalConnStats.incInternodeInputBytes,
Outgoing: globalConnStats.incInternodeOutputBytes,
TraceTo: globalTrace,
Incoming: globalConnStats.incInternodeInputBytes,
Outgoing: globalConnStats.incInternodeOutputBytes,
TraceTo: globalTrace,
RoutePath: grid.RoutePath,
})
if err != nil {
return err
@ -65,3 +72,36 @@ func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error {
globalGrid.Store(g)
return nil
}
func initGlobalLockGrid(ctx context.Context, eps EndpointServerPools) error {
hosts, local := eps.GridHosts()
lookupHost := globalDNSCache.LookupHost
g, err := grid.NewManager(ctx, grid.ManagerOptions{
// Pass Dialer for websocket grid, make sure we do not
// provide any DriveOPTimeout() function, as that is not
// useful over persistent connections.
Dialer: grid.ConnectWSWithRoutePath(
grid.ContextDialer(xhttp.DialContextWithLookupHost(lookupHost, xhttp.NewInternodeDialContext(rest.DefaultTimeout, globalTCPOptions.ForWebsocket()))),
newCachedAuthToken(),
&tls.Config{
RootCAs: globalRootCAs,
CipherSuites: fips.TLSCiphers(),
CurvePreferences: fips.TLSCurveIDs(),
}, grid.RouteLockPath),
Local: local,
Hosts: hosts,
AuthToken: validateStorageRequestToken,
AuthFn: newCachedAuthToken(),
BlockConnect: globalGridStart,
// Record incoming and outgoing bytes.
Incoming: globalConnStats.incInternodeInputBytes,
Outgoing: globalConnStats.incInternodeOutputBytes,
TraceTo: globalTrace,
RoutePath: grid.RouteLockPath,
})
if err != nil {
return err
}
globalLockGrid.Store(g)
return nil
}

View File

@ -107,5 +107,5 @@ func newLockAPI(endpoint Endpoint) dsync.NetLocker {
// Returns a lock rest client.
func newlockRESTClient(ep Endpoint) *lockRESTClient {
return &lockRESTClient{globalGrid.Load().Connection(ep.GridHost())}
return &lockRESTClient{globalLockGrid.Load().Connection(ep.GridHost())}
}

View File

@ -39,7 +39,7 @@ func TestLockRESTlient(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = initGlobalGrid(ctx, []PoolEndpoints{{Endpoints: Endpoints{endpoint, endpointLocal}}})
err = initGlobalLockGrid(ctx, []PoolEndpoints{{Endpoints: Endpoints{endpoint, endpointLocal}}})
if err != nil {
t.Fatal(err)
}

View File

@ -111,17 +111,17 @@ func newLockHandler(h grid.HandlerID) *grid.SingleHandler[*dsync.LockArgs, *dsyn
}
// registerLockRESTHandlers - register lock rest router.
func registerLockRESTHandlers() {
func registerLockRESTHandlers(gm *grid.Manager) {
lockServer := &lockRESTServer{
ll: newLocker(),
}
logger.FatalIf(lockRPCForceUnlock.Register(globalGrid.Load(), lockServer.ForceUnlockHandler), "unable to register handler")
logger.FatalIf(lockRPCRefresh.Register(globalGrid.Load(), lockServer.RefreshHandler), "unable to register handler")
logger.FatalIf(lockRPCLock.Register(globalGrid.Load(), lockServer.LockHandler), "unable to register handler")
logger.FatalIf(lockRPCUnlock.Register(globalGrid.Load(), lockServer.UnlockHandler), "unable to register handler")
logger.FatalIf(lockRPCRLock.Register(globalGrid.Load(), lockServer.RLockHandler), "unable to register handler")
logger.FatalIf(lockRPCRUnlock.Register(globalGrid.Load(), lockServer.RUnlockHandler), "unable to register handler")
logger.FatalIf(lockRPCForceUnlock.Register(gm, lockServer.ForceUnlockHandler), "unable to register handler")
logger.FatalIf(lockRPCRefresh.Register(gm, lockServer.RefreshHandler), "unable to register handler")
logger.FatalIf(lockRPCLock.Register(gm, lockServer.LockHandler), "unable to register handler")
logger.FatalIf(lockRPCUnlock.Register(gm, lockServer.UnlockHandler), "unable to register handler")
logger.FatalIf(lockRPCRLock.Register(gm, lockServer.RLockHandler), "unable to register handler")
logger.FatalIf(lockRPCRUnlock.Register(gm, lockServer.RUnlockHandler), "unable to register handler")
globalLockServer = lockServer.ll

View File

@ -26,20 +26,28 @@ import (
// Composed function registering routers for only distributed Erasure setup.
func registerDistErasureRouters(router *mux.Router, endpointServerPools EndpointServerPools) {
var (
lockGrid = globalLockGrid.Load()
commonGrid = globalGrid.Load()
)
// Register storage REST router only if its a distributed setup.
registerStorageRESTHandlers(router, endpointServerPools, globalGrid.Load())
registerStorageRESTHandlers(router, endpointServerPools, commonGrid)
// Register peer REST router only if its a distributed setup.
registerPeerRESTHandlers(router, globalGrid.Load())
registerPeerRESTHandlers(router, commonGrid)
// Register bootstrap REST router for distributed setups.
registerBootstrapRESTHandlers(globalGrid.Load())
registerBootstrapRESTHandlers(commonGrid)
// Register distributed namespace lock routers.
registerLockRESTHandlers()
registerLockRESTHandlers(lockGrid)
// Add lock grid to router
router.Handle(grid.RouteLockPath, adminMiddleware(lockGrid.Handler(storageServerRequestValidate), noGZFlag, noObjLayerFlag))
// Add grid to router
router.Handle(grid.RoutePath, adminMiddleware(globalGrid.Load().Handler(storageServerRequestValidate), noGZFlag, noObjLayerFlag))
router.Handle(grid.RoutePath, adminMiddleware(commonGrid.Handler(storageServerRequestValidate), noGZFlag, noObjLayerFlag))
}
// List of some generic middlewares which are applied for all incoming requests.

View File

@ -856,6 +856,11 @@ func serverMain(ctx *cli.Context) {
logger.FatalIf(initGlobalGrid(GlobalContext, globalEndpoints), "Unable to configure server grid RPC services")
})
// Initialize lock grid
bootstrapTrace("initLockGrid", func() {
logger.FatalIf(initGlobalLockGrid(GlobalContext, globalEndpoints), "Unable to configure server lock grid RPC services")
})
// Configure server.
bootstrapTrace("configureServer", func() {
handler, err := configureServerHandler(globalEndpoints)
@ -863,7 +868,8 @@ func serverMain(ctx *cli.Context) {
logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
}
// Allow grid to start after registering all services.
xioutil.SafeClose(globalGridStart)
close(globalGridStart)
close(globalLockGridStart)
httpServer := xhttp.NewServer(getServerListenAddrs()).
UseHandler(setCriticalErrorHandler(corsHandler(handler))).

View File

@ -1319,7 +1319,11 @@ func (c *Connection) handleConnectMux(ctx context.Context, m message, subID *sub
handler = c.handlers.subStateless[*subID]
}
if handler == nil {
gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler for type"}))
msg := fmt.Sprintf("Invalid Handler for type: %v", m.Handler)
if subID != nil {
msg = fmt.Sprintf("Invalid Handler for type: %v", *subID)
}
gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: msg}))
return
}
_, _ = c.inStream.LoadOrCompute(m.MuxID, func() *muxServer {
@ -1338,7 +1342,11 @@ func (c *Connection) handleConnectMux(ctx context.Context, m message, subID *sub
handler = c.handlers.subStreams[*subID]
}
if handler == nil {
gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler for type"}))
msg := fmt.Sprintf("Invalid Handler for type: %v", m.Handler)
if subID != nil {
msg = fmt.Sprintf("Invalid Handler for type: %v", *subID)
}
gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: msg}))
return
}
@ -1392,7 +1400,11 @@ func (c *Connection) handleRequest(ctx context.Context, m message, subID *subHan
handler = c.handlers.subSingle[*subID]
}
if handler == nil {
gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler for type"}))
msg := fmt.Sprintf("Invalid Handler for type: %v", m.Handler)
if subID != nil {
msg = fmt.Sprintf("Invalid Handler for type: %v", *subID)
}
gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: msg}))
return
}

View File

@ -26,7 +26,6 @@ import (
"sync"
"time"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/mux"
)
@ -90,6 +89,7 @@ func SetupTestGrid(n int) (*TestGrid, error) {
AuthFn: dummyNewToken,
AuthToken: dummyTokenValidate,
BlockConnect: ready,
RoutePath: RoutePath,
})
if err != nil {
return nil, err
@ -101,7 +101,7 @@ func SetupTestGrid(n int) (*TestGrid, error) {
res.Listeners = append(res.Listeners, listeners[i])
res.Mux = append(res.Mux, m)
}
xioutil.SafeClose(ready)
close(ready)
for _, m := range res.Managers {
for _, remote := range m.Targets() {
if err := m.Connection(remote).WaitForConnect(ctx); err != nil {

View File

@ -202,13 +202,12 @@ func bytesOrLength(b []byte) string {
// The net.Conn must support all features as described by the net.Conn interface.
type ConnDialer func(ctx context.Context, address string) (net.Conn, error)
// ConnectWS returns a function that dials a websocket connection to the given address.
// Route and auth are added to the connection.
func ConnectWS(dial ContextDialer, auth AuthFn, tls *tls.Config) func(ctx context.Context, remote string) (net.Conn, error) {
// ConnectWSWithRoutePath is like ConnectWS but with a custom grid route path.
func ConnectWSWithRoutePath(dial ContextDialer, auth AuthFn, tls *tls.Config, routePath string) func(ctx context.Context, remote string) (net.Conn, error) {
return func(ctx context.Context, remote string) (net.Conn, error) {
toDial := strings.Replace(remote, "http://", "ws://", 1)
toDial = strings.Replace(toDial, "https://", "wss://", 1)
toDial += RoutePath
toDial += routePath
dialer := ws.DefaultDialer
dialer.ReadBufferSize = readBufferSize
@ -234,5 +233,11 @@ func ConnectWS(dial ContextDialer, auth AuthFn, tls *tls.Config) func(ctx contex
}
}
// ConnectWS returns a function that dials a websocket connection to the given address.
// Route and auth are added to the connection.
func ConnectWS(dial ContextDialer, auth AuthFn, tls *tls.Config) func(ctx context.Context, remote string) (net.Conn, error) {
return ConnectWSWithRoutePath(dial, auth, tls, RoutePath)
}
// ValidateTokenFn must validate the token and return an error if it is invalid.
type ValidateTokenFn func(token string) error

View File

@ -166,7 +166,7 @@ func TestSingleRoundtripNotReady(t *testing.T) {
const testPayload = "Hello Grid World!"
// Single requests should have remote errors.
_, err := remoteConn.Request(context.Background(), handlerTest, []byte(testPayload))
if v, ok := err.(*RemoteErr); !ok || v.Error() != "Invalid Handler for type" {
if _, ok := err.(*RemoteErr); !ok {
t.Fatalf("Unexpected error: %v, %T", err, err)
}
// Streams should not be able to set up until registered.

View File

@ -45,6 +45,9 @@ const (
// RoutePath is the remote path to connect to.
RoutePath = "/minio/grid/" + apiVersion
// RouteLockPath is the remote lock path to connect to.
RouteLockPath = "/minio/grid/lock/" + apiVersion
)
// Manager will contain all the connections to the grid.
@ -65,6 +68,9 @@ type Manager struct {
// authToken is a function that will validate a token.
authToken ValidateTokenFn
// routePath indicates the dial route path
routePath string
}
// ManagerOptions are options for creating a new grid manager.
@ -74,6 +80,7 @@ type ManagerOptions struct {
Incoming func(n int64) // Record incoming bytes.
Outgoing func(n int64) // Record outgoing bytes.
BlockConnect chan struct{} // If set, incoming and outgoing connections will be blocked until closed.
RoutePath string
TraceTo *pubsub.PubSub[madmin.TraceInfo, madmin.TraceType]
Dialer ConnDialer
// Sign a token for the given audience.
@ -99,6 +106,7 @@ func NewManager(ctx context.Context, o ManagerOptions) (*Manager, error) {
targets: make(map[string]*Connection, len(o.Hosts)),
local: o.Local,
authToken: o.AuthToken,
routePath: o.RoutePath,
}
m.handlers.init()
if ctx == nil {
@ -137,7 +145,7 @@ func NewManager(ctx context.Context, o ManagerOptions) (*Manager, error) {
// AddToMux will add the grid manager to the given mux.
func (m *Manager) AddToMux(router *mux.Router, authReq func(r *http.Request) error) {
router.Handle(RoutePath, m.Handler(authReq))
router.Handle(m.routePath, m.Handler(authReq))
}
// Handler returns a handler that can be used to serve grid requests.