Fix Mux Connect Error (#18567)

`OpMuxConnectError` was not handled correctly.

Remove the local registration checks for single request handlers, so
requests can be sent before the handler has been registered locally.

Bonus: Only log IAM bootstrap on startup.
Klaus Post, 2023-12-01 00:18:04 -08:00 (committed by GitHub)
commit 5f971fea6e, parent 0d7abe3b9f
11 changed files with 113 additions and 24 deletions
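To make the client-side change concrete: a single request for a handler that only the remote side may have registered now travels over the wire and fails with the remote's error, while streams still fail locally. A minimal sketch of the caller-visible behavior, mirroring the new TestSingleRoundtripNotReady below (conn, ctx and payload are placeholders; Request, NewStream, RemoteErr and ErrUnknownHandler are the grid package names used in this commit):

// Sketch only; assumes a connected grid Connection `conn` as in the test below.
_, err := conn.Request(ctx, handlerTest, payload)
if rerr, ok := err.(*RemoteErr); ok {
	// The remote rejected the mux setup; the rejection arrived as
	// OpMuxConnectError and was surfaced by handleConnectMuxError.
	log.Println("remote error:", rerr.Error())
}

// Streams cannot be set up until the handler is registered locally,
// so this fails with a local ErrUnknownHandler before anything is sent.
if _, err := conn.NewStream(ctx, handlerTest, payload); errors.Is(err, ErrUnknownHandler) {
	log.Println("stream handler not registered locally")
}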


@@ -72,7 +72,7 @@ func (er erasureObjects) listAndHeal(bucket, prefix string, healEntry func(strin
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	disks, _ := er.getOnlineDisksWithHealing()
+	disks, _ := er.getOnlineDisksWithHealing(false)
 	if len(disks) == 0 {
 		return errors.New("listAndHeal: No non-healing drives found")
 	}


@@ -1967,7 +1967,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
 		go func() {
 			defer wg.Done()
-			disks, _ := set.getOnlineDisksWithHealing()
+			disks, _ := set.getOnlineDisksWithHealing(true)
 			if len(disks) == 0 {
 				cancel()
 				return


@@ -274,7 +274,12 @@ func (er erasureObjects) LocalStorageInfo(ctx context.Context) StorageInfo {
 	return getStorageInfo(localDisks, localEndpoints)
 }
 
-func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) {
+// getOnlineDisksWithHealing - returns online disks and overall healing status.
+// Disks are randomly ordered, but in the following groups:
+// - Non-scanning disks
+// - Non-healing disks
+// - Healing disks (if inclHealing is true)
+func (er erasureObjects) getOnlineDisksWithHealing(inclHealing bool) (newDisks []StorageAPI, healing bool) {
 	var wg sync.WaitGroup
 	disks := er.getDisks()
 	infos := make([]DiskInfo, len(disks))
@@ -292,7 +297,7 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) {
 			}
 			di, err := disk.DiskInfo(context.Background(), false)
-			if err != nil || di.Healing {
+			if err != nil {
 				// - Do not consume disks which are not reachable
 				//   unformatted or simply not accessible for some reason.
 				//
@@ -303,21 +308,31 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) {
 				}
 				return
 			}
+			if !inclHealing && di.Healing {
+				return
+			}
 			infos[i] = di
 		}()
 	}
 	wg.Wait()
 
-	var scanningDisks []StorageAPI
+	var scanningDisks, healingDisks []StorageAPI
 	for i, info := range infos {
 		// Check if one of the drives in the set is being healed.
 		// this information is used by scanner to skip healing
 		// this erasure set while it calculates the usage.
-		if info.Healing || info.Error != "" {
-			healing = true
+		if info.Error != "" || disks[i] == nil {
 			continue
 		}
+
+		if info.Healing {
+			healing = true
+			if inclHealing {
+				healingDisks = append(healingDisks, disks[i])
+			}
+			continue
+		}
 		if !info.Scanning {
 			newDisks = append(newDisks, disks[i])
 		} else {
@@ -325,8 +340,10 @@ func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) {
 		}
 	}
 
-	// Prefer new disks over disks which are currently being scanned.
+	// Prefer non-scanning disks over disks which are currently being scanned.
 	newDisks = append(newDisks, scanningDisks...)
+	// Then add healing disks.
+	newDisks = append(newDisks, healingDisks...)
 
 	return newDisks, healing
 }
@@ -364,7 +381,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, wa
 	}
 
 	// Collect disks we can use.
-	disks, healing := er.getOnlineDisksWithHealing()
+	disks, healing := er.getOnlineDisksWithHealing(false)
 	if len(disks) == 0 {
 		logger.LogIf(ctx, errors.New("data-scanner: all drives are offline or being healed, skipping scanner cycle"))
 		return nil
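The resulting ordering is easiest to verify in isolation. Below is a self-contained toy version of the grouping introduced above; diskInfo and group are illustrative stand-ins, not the MinIO types, and the random shuffling within each group is omitted:

package main

import "fmt"

// diskInfo is a stand-in for the DiskInfo fields used above.
type diskInfo struct {
	ID       string
	Scanning bool
	Healing  bool
	Err      bool
}

// group returns disk IDs ordered the way the commit orders disks:
// non-scanning first, then scanning, then healing disks last
// (healing disks are included only when inclHealing is true).
func group(infos []diskInfo, inclHealing bool) (out []string, healing bool) {
	var scanning, healingDisks []string
	for _, di := range infos {
		if di.Err {
			// Unreachable/unformatted disks are never used.
			continue
		}
		if di.Healing {
			healing = true // reported even when the disk is excluded
			if inclHealing {
				healingDisks = append(healingDisks, di.ID)
			}
			continue
		}
		if !di.Scanning {
			out = append(out, di.ID)
		} else {
			scanning = append(scanning, di.ID)
		}
	}
	out = append(out, scanning...)     // prefer non-scanning disks
	out = append(out, healingDisks...) // healing disks as last resort
	return out, healing
}

func main() {
	infos := []diskInfo{
		{ID: "d1", Scanning: true},
		{ID: "d2", Healing: true},
		{ID: "d3"},
		{ID: "d4", Err: true},
	}
	fmt.Println(group(infos, true))  // [d3 d1 d2] true
	fmt.Println(group(infos, false)) // [d3 d1] true
}

In this commit, listPath and Walk pass true while listAndHeal, nsScanner and healErasureSet pass false; presumably a drive that is still healing is acceptable as a last resort for listings, but must not feed the scanner or the healer.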


@@ -209,7 +209,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 				bucket, humanize.Ordinal(er.setIndex+1))
 		}
 
-		disks, _ := er.getOnlineDisksWithHealing()
+		disks, _ := er.getOnlineDisksWithHealing(false)
 		if len(disks) == 0 {
 			logger.LogIf(ctx, fmt.Errorf("no online disks found to heal the bucket `%s`", bucket))
 			continue


@@ -488,7 +488,12 @@ func (store *IAMStoreSys) PurgeExpiredSTS(ctx context.Context) error {
 
 // LoadIAMCache reads all IAM items and populates a new iamCache object and
 // replaces the in-memory cache object.
-func (store *IAMStoreSys) LoadIAMCache(ctx context.Context) error {
+func (store *IAMStoreSys) LoadIAMCache(ctx context.Context, firstTime bool) error {
+	bootstrapTraceMsg := func(s string) {
+		if firstTime {
+			bootstrapTraceMsg(s)
+		}
+	}
 	bootstrapTraceMsg("loading IAM data")
 
 	newCache := newIamCache()
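The closure above relies on a Go scoping subtlety: in a short variable declaration, the new variable's scope begins only after the declaration, so the bootstrapTraceMsg call inside the function literal still resolves to the package-level function, while every later call in LoadIAMCache hits the local wrapper and is silenced unless firstTime is set. A self-contained illustration of the same pattern (trace and load are hypothetical stand-ins for bootstrapTraceMsg and LoadIAMCache):

package main

import "fmt"

// trace stands in for the package-level bootstrapTraceMsg.
func trace(s string) { fmt.Println("trace:", s) }

func load(firstTime bool) {
	// Inside the literal, `trace` still refers to the package-level
	// function: the local variable is not in scope until after the
	// `:=` statement completes. Afterwards, the local wrapper shadows it.
	trace := func(s string) {
		if firstTime {
			trace(s)
		}
	}
	trace("loading IAM data") // printed only on first load
}

func main() {
	load(true)  // trace: loading IAM data
	load(false) // (nothing)
}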


@@ -189,7 +189,7 @@ func (sys *IAMSys) Initialized() bool {
 // Load - loads all credentials, policies and policy mappings.
 func (sys *IAMSys) Load(ctx context.Context, firstTime bool) error {
 	loadStartTime := time.Now()
-	err := sys.store.LoadIAMCache(ctx)
+	err := sys.store.LoadIAMCache(ctx, firstTime)
 	if err != nil {
 		atomic.AddUint64(&sys.TotalRefreshFailures, 1)
 		return err


@@ -606,8 +606,8 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
 	defer close(results)
 	o.debugf(color.Green("listPath:")+" with options: %#v", o)
 
-	// get non-healing disks for listing
-	disks, _ := er.getOnlineDisksWithHealing()
+	// get prioritized non-healing disks for listing
+	disks, _ := er.getOnlineDisksWithHealing(true)
 	askDisks := getListQuorum(o.AskDisks, er.setDriveCount)
 	var fallbackDisks []StorageAPI


@@ -1350,12 +1350,12 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
 		return collectInternodeStats(httpTraceHdrs(f))
 	}
 
+	registered := 0
 	for _, setDisks := range storageDisks {
 		for _, storage := range setDisks {
 			if storage == nil {
 				continue
 			}
 
 			endpoint := storage.Endpoint()
 			server := &storageRESTServer{storage: newXLStorageDiskIDCheck(storage, true)}
@@ -1402,6 +1402,17 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
 				Handle:      server.WalkDirHandler,
 				OutCapacity: 1,
 			}), "unable to register handler")
+			registered++
 		}
 	}
+	if registered == 0 {
+		// Register a dummy handler so remote calls can go out.
+		logger.FatalIf(gm.RegisterStreamingHandler(grid.HandlerWalkDir, grid.StreamHandler{
+			Subroute: fmt.Sprintf("__dummy__%d", time.Now().UnixNano()),
+			Handle: func(ctx context.Context, payload []byte, in <-chan []byte, out chan<- []byte) *grid.RemoteErr {
+				return grid.NewRemoteErr(errDiskNotFound)
+			},
+			OutCapacity: 1,
+		}), "unable to register handler")
+	}
 }


@@ -321,10 +321,7 @@ func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]by
 	if c.State() != StateConnected {
 		return nil, ErrDisconnected
 	}
-	handler := c.handlers.single[h]
-	if handler == nil {
-		return nil, ErrUnknownHandler
-	}
+	// Create mux client and call.
 	client, err := c.newMuxClient(ctx)
 	if err != nil {
 		return nil, err
@@ -349,10 +346,7 @@ func (c *Subroute) Request(ctx context.Context, h HandlerID, req []byte) ([]byte
 	if c.State() != StateConnected {
 		return nil, ErrDisconnected
 	}
-	handler := c.handlers.subSingle[makeZeroSubHandlerID(h)]
-	if handler == nil {
-		return nil, ErrUnknownHandler
-	}
+	// Create mux client and call.
 	client, err := c.newMuxClient(ctx)
 	if err != nil {
 		return nil, err
@@ -1159,6 +1153,8 @@ func (c *Connection) handleMsg(ctx context.Context, m message, subID *subHandler
 		c.handleAckMux(ctx, m)
 	case OpConnectMux:
 		c.handleConnectMux(ctx, m, subID)
+	case OpMuxConnectError:
+		c.handleConnectMuxError(ctx, m)
 	default:
 		logger.LogIf(ctx, fmt.Errorf("unknown message type: %v", m.Op))
 	}
@@ -1210,6 +1206,18 @@ func (c *Connection) handleConnectMux(ctx context.Context, m message, subID *sub
 	}
 }
 
+// handleConnectMuxError when mux connect was rejected.
+func (c *Connection) handleConnectMuxError(ctx context.Context, m message) {
+	if v, ok := c.outgoing.Load(m.MuxID); ok {
+		var cErr muxConnectError
+		_, err := cErr.UnmarshalMsg(m.Payload)
+		logger.LogIf(ctx, err)
+		v.error(RemoteErr(cErr.Error))
+		return
+	}
+	PutByteBuffer(m.Payload)
+}
+
 func (c *Connection) handleAckMux(ctx context.Context, m message) {
 	PutByteBuffer(m.Payload)
 	v, ok := c.outgoing.Load(m.MuxID)


@@ -129,6 +129,54 @@ func TestSingleRoundtrip(t *testing.T) {
 	})
 }
 
+func TestSingleRoundtripNotReady(t *testing.T) {
+	defer testlogger.T.SetLogTB(t)()
+	errFatal := func(t testing.TB, err error) {
+		t.Helper()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	grid, err := SetupTestGrid(2)
+	errFatal(t, err)
+	remoteHost := grid.Hosts[1]
+	local := grid.Managers[0]
+
+	// 1: Echo
+	errFatal(t, local.RegisterSingleHandler(handlerTest, func(payload []byte) ([]byte, *RemoteErr) {
+		t.Log("1: server payload: ", len(payload), "bytes.")
+		return append([]byte{}, payload...), nil
+	}))
+	// 2: Return as error
+	errFatal(t, local.RegisterSingleHandler(handlerTest2, func(payload []byte) ([]byte, *RemoteErr) {
+		t.Log("2: server payload: ", len(payload), "bytes.")
+		err := RemoteErr(payload)
+		return nil, &err
+	}))
+	// Do not register remote handlers
+
+	// local to remote
+	remoteConn := local.Connection(remoteHost)
+	remoteConn.WaitForConnect(context.Background())
+	defer testlogger.T.SetErrorTB(t)()
+
+	t.Run("localToRemote", func(t *testing.T) {
+		const testPayload = "Hello Grid World!"
+
+		// Single requests should have remote errors.
+		_, err := remoteConn.Request(context.Background(), handlerTest, []byte(testPayload))
+		if v, ok := err.(*RemoteErr); !ok || v.Error() != "Invalid Handler for type" {
+			t.Fatalf("Unexpected error: %v, %T", err, err)
+		}
+
+		// Streams should not be able to set up until registered.
+		// Thus, the error is a local error.
+		_, err = remoteConn.NewStream(context.Background(), handlerTest, []byte(testPayload))
+		if !errors.Is(err, ErrUnknownHandler) {
+			t.Fatalf("Unexpected error: %v, %T", err, err)
+		}
+	})
+}
+
 func TestSingleRoundtripGenerics(t *testing.T) {
 	defer testlogger.T.SetLogTB(t)()
 	errFatal := func(err error) {


@@ -89,7 +89,7 @@ var handlerPrefixes = [handlerLast]string{
 }
 
 const (
-	lockPrefix    = "lock"
+	lockPrefix    = "lockR"
 	storagePrefix = "storageR"
 )