add new update v2 that updates per node, allows idempotent behavior (#18859)

add new update v2 that updates per node, allows idempotent behavior

new API ensures that

- binary is correct and can be downloaded checksummed verified
- committed to actual path
- restart returns back the relevant waiting drives
This commit is contained in:
Harshavardhana
2024-01-26 08:40:13 -08:00
committed by GitHub
parent d0283ff354
commit 88837fb753
10 changed files with 286 additions and 66 deletions

View File

@@ -76,6 +76,17 @@ func WithNPeers(nerrs int) *NotificationGroup {
return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), workers: wk, retryCount: 3}
}
// WithNPeersThrottled returns a new NotificationGroup with length of errs slice upto nerrs,
// upon Wait() errors are returned collected from all tasks, optionally allows for X workers
// only "per" parallel task.
func WithNPeersThrottled(nerrs, wks int) *NotificationGroup {
if nerrs <= 0 {
nerrs = 1
}
wk, _ := workers.New(wks)
return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), workers: wk, retryCount: 3}
}
// WithRetries sets the retry count for all function calls from the Go method.
func (g *NotificationGroup) WithRetries(retryCount int) *NotificationGroup {
if g != nil {
@@ -366,7 +377,7 @@ func (sys *NotificationSys) VerifyBinary(ctx context.Context, u *url.URL, sha256
maxWorkers = len(sys.peerClients)
}
ng := WithNPeers(maxWorkers)
ng := WithNPeersThrottled(len(sys.peerClients), maxWorkers)
for idx, client := range sys.peerClients {
if client == nil {
continue
@@ -403,7 +414,7 @@ func (sys *NotificationSys) SignalConfigReload(subSys string) []NotificationPeer
}
client := client
ng.Go(GlobalContext, func() error {
return client.SignalService(serviceReloadDynamic, subSys, false, true)
return client.SignalService(serviceReloadDynamic, subSys, false)
}, idx, *client.host)
}
return ng.Wait()
@@ -419,14 +430,14 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE
client := client
ng.Go(GlobalContext, func() error {
// force == true preserves the current behavior
return client.SignalService(sig, "", false, true)
return client.SignalService(sig, "", false)
}, idx, *client.host)
}
return ng.Wait()
}
// SignalServiceV2 - calls signal service RPC call on all peers with v2 API
func (sys *NotificationSys) SignalServiceV2(sig serviceSignal, dryRun, force bool) []NotificationPeerErr {
func (sys *NotificationSys) SignalServiceV2(sig serviceSignal, dryRun bool) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
@@ -434,7 +445,7 @@ func (sys *NotificationSys) SignalServiceV2(sig serviceSignal, dryRun, force boo
}
client := client
ng.Go(GlobalContext, func() error {
return client.SignalService(sig, "", dryRun, force)
return client.SignalService(sig, "", dryRun)
}, idx, *client.host)
}
return ng.Wait()
@@ -1318,7 +1329,7 @@ func (sys *NotificationSys) ServiceFreeze(ctx context.Context, freeze bool) []No
}
client := client
ng.Go(GlobalContext, func() error {
return client.SignalService(serviceSig, "", false, true)
return client.SignalService(serviceSig, "", false)
}, idx, *client.host)
}
nerrs := ng.Wait()