mirror of
https://github.com/minio/minio.git
synced 2025-04-19 10:07:30 -04:00
Add fixed timed restarts to updates (#19960)
This commit is contained in:
parent
3e6dc02f8f
commit
fae563b85d
@ -237,6 +237,7 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
|
|||||||
|
|
||||||
if globalIsDistErasure {
|
if globalIsDistErasure {
|
||||||
// Notify all other MinIO peers signal service.
|
// Notify all other MinIO peers signal service.
|
||||||
|
startTime := time.Now().Add(restartUpdateDelay)
|
||||||
ng := WithNPeers(len(globalNotificationSys.peerClients))
|
ng := WithNPeers(len(globalNotificationSys.peerClients))
|
||||||
for idx, client := range globalNotificationSys.peerClients {
|
for idx, client := range globalNotificationSys.peerClients {
|
||||||
_, ok := failedClients[idx]
|
_, ok := failedClients[idx]
|
||||||
@ -247,7 +248,7 @@ func (a adminAPIHandlers) ServerUpdateV2Handler(w http.ResponseWriter, r *http.R
|
|||||||
ng.Go(ctx, func() error {
|
ng.Go(ctx, func() error {
|
||||||
prs, ok := peerResults[client.String()]
|
prs, ok := peerResults[client.String()]
|
||||||
if ok && prs.CurrentVersion != prs.UpdatedVersion && prs.UpdatedVersion != "" {
|
if ok && prs.CurrentVersion != prs.UpdatedVersion && prs.UpdatedVersion != "" {
|
||||||
return client.SignalService(serviceRestart, "", dryRun)
|
return client.SignalService(serviceRestart, "", dryRun, &startTime)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}, idx, *client.host)
|
}, idx, *client.host)
|
||||||
@ -542,9 +543,12 @@ func (a adminAPIHandlers) ServiceV2Handler(w http.ResponseWriter, r *http.Reques
|
|||||||
}
|
}
|
||||||
|
|
||||||
var objectAPI ObjectLayer
|
var objectAPI ObjectLayer
|
||||||
|
var execAt *time.Time
|
||||||
switch serviceSig {
|
switch serviceSig {
|
||||||
case serviceRestart:
|
case serviceRestart:
|
||||||
objectAPI, _ = validateAdminReq(ctx, w, r, policy.ServiceRestartAdminAction)
|
objectAPI, _ = validateAdminReq(ctx, w, r, policy.ServiceRestartAdminAction)
|
||||||
|
t := time.Now().Add(restartUpdateDelay)
|
||||||
|
execAt = &t
|
||||||
case serviceStop:
|
case serviceStop:
|
||||||
objectAPI, _ = validateAdminReq(ctx, w, r, policy.ServiceStopAdminAction)
|
objectAPI, _ = validateAdminReq(ctx, w, r, policy.ServiceStopAdminAction)
|
||||||
case serviceFreeze, serviceUnFreeze:
|
case serviceFreeze, serviceUnFreeze:
|
||||||
@ -571,7 +575,7 @@ func (a adminAPIHandlers) ServiceV2Handler(w http.ResponseWriter, r *http.Reques
|
|||||||
}
|
}
|
||||||
|
|
||||||
if globalIsDistErasure {
|
if globalIsDistErasure {
|
||||||
for _, nerr := range globalNotificationSys.SignalServiceV2(serviceSig, dryRun) {
|
for _, nerr := range globalNotificationSys.SignalServiceV2(serviceSig, dryRun, execAt) {
|
||||||
if nerr.Err != nil && process {
|
if nerr.Err != nil && process {
|
||||||
waitingDrives := map[string]madmin.DiskMetrics{}
|
waitingDrives := map[string]madmin.DiskMetrics{}
|
||||||
jerr := json.Unmarshal([]byte(nerr.Err.Error()), &waitingDrives)
|
jerr := json.Unmarshal([]byte(nerr.Err.Error()), &waitingDrives)
|
||||||
|
@ -424,7 +424,7 @@ func (sys *NotificationSys) SignalConfigReload(subSys string) []NotificationPeer
|
|||||||
}
|
}
|
||||||
client := client
|
client := client
|
||||||
ng.Go(GlobalContext, func() error {
|
ng.Go(GlobalContext, func() error {
|
||||||
return client.SignalService(serviceReloadDynamic, subSys, false)
|
return client.SignalService(serviceReloadDynamic, subSys, false, nil)
|
||||||
}, idx, *client.host)
|
}, idx, *client.host)
|
||||||
}
|
}
|
||||||
return ng.Wait()
|
return ng.Wait()
|
||||||
@ -440,14 +440,14 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE
|
|||||||
client := client
|
client := client
|
||||||
ng.Go(GlobalContext, func() error {
|
ng.Go(GlobalContext, func() error {
|
||||||
// force == true preserves the current behavior
|
// force == true preserves the current behavior
|
||||||
return client.SignalService(sig, "", false)
|
return client.SignalService(sig, "", false, nil)
|
||||||
}, idx, *client.host)
|
}, idx, *client.host)
|
||||||
}
|
}
|
||||||
return ng.Wait()
|
return ng.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SignalServiceV2 - calls signal service RPC call on all peers with v2 API
|
// SignalServiceV2 - calls signal service RPC call on all peers with v2 API
|
||||||
func (sys *NotificationSys) SignalServiceV2(sig serviceSignal, dryRun bool) []NotificationPeerErr {
|
func (sys *NotificationSys) SignalServiceV2(sig serviceSignal, dryRun bool, execAt *time.Time) []NotificationPeerErr {
|
||||||
ng := WithNPeers(len(sys.peerClients))
|
ng := WithNPeers(len(sys.peerClients))
|
||||||
for idx, client := range sys.peerClients {
|
for idx, client := range sys.peerClients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
@ -455,7 +455,7 @@ func (sys *NotificationSys) SignalServiceV2(sig serviceSignal, dryRun bool) []No
|
|||||||
}
|
}
|
||||||
client := client
|
client := client
|
||||||
ng.Go(GlobalContext, func() error {
|
ng.Go(GlobalContext, func() error {
|
||||||
return client.SignalService(sig, "", dryRun)
|
return client.SignalService(sig, "", dryRun, execAt)
|
||||||
}, idx, *client.host)
|
}, idx, *client.host)
|
||||||
}
|
}
|
||||||
return ng.Wait()
|
return ng.Wait()
|
||||||
@ -1314,7 +1314,7 @@ func (sys *NotificationSys) ServiceFreeze(ctx context.Context, freeze bool) []No
|
|||||||
}
|
}
|
||||||
client := client
|
client := client
|
||||||
ng.Go(GlobalContext, func() error {
|
ng.Go(GlobalContext, func() error {
|
||||||
return client.SignalService(serviceSig, "", false)
|
return client.SignalService(serviceSig, "", false, nil)
|
||||||
}, idx, *client.host)
|
}, idx, *client.host)
|
||||||
}
|
}
|
||||||
nerrs := ng.Wait()
|
nerrs := ng.Wait()
|
||||||
|
@ -427,11 +427,14 @@ func (client *peerRESTClient) CommitBinary(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SignalService - sends signal to peer nodes.
|
// SignalService - sends signal to peer nodes.
|
||||||
func (client *peerRESTClient) SignalService(sig serviceSignal, subSys string, dryRun bool) error {
|
func (client *peerRESTClient) SignalService(sig serviceSignal, subSys string, dryRun bool, execAt *time.Time) error {
|
||||||
values := grid.NewMSS()
|
values := grid.NewMSS()
|
||||||
values.Set(peerRESTSignal, strconv.Itoa(int(sig)))
|
values.Set(peerRESTSignal, strconv.Itoa(int(sig)))
|
||||||
values.Set(peerRESTDryRun, strconv.FormatBool(dryRun))
|
values.Set(peerRESTDryRun, strconv.FormatBool(dryRun))
|
||||||
values.Set(peerRESTSubSys, subSys)
|
values.Set(peerRESTSubSys, subSys)
|
||||||
|
if execAt != nil {
|
||||||
|
values.Set(peerRESTExecAt, execAt.Format(time.RFC3339Nano))
|
||||||
|
}
|
||||||
_, err := signalServiceRPC.Call(context.Background(), client.gridConn(), values)
|
_, err := signalServiceRPC.Call(context.Background(), client.gridConn(), values)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
|
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
peerRESTVersion = "v39" // add more flags to speedtest API
|
peerRESTVersion = "v39" // add more flags to speedtest API
|
||||||
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
|
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
|
||||||
@ -69,6 +71,7 @@ const (
|
|||||||
peerRESTURL = "url"
|
peerRESTURL = "url"
|
||||||
peerRESTSha256Sum = "sha256sum"
|
peerRESTSha256Sum = "sha256sum"
|
||||||
peerRESTReleaseInfo = "releaseinfo"
|
peerRESTReleaseInfo = "releaseinfo"
|
||||||
|
peerRESTExecAt = "exec-at"
|
||||||
|
|
||||||
peerRESTListenBucket = "bucket"
|
peerRESTListenBucket = "bucket"
|
||||||
peerRESTListenPrefix = "prefix"
|
peerRESTListenPrefix = "prefix"
|
||||||
@ -76,3 +79,5 @@ const (
|
|||||||
peerRESTListenEvents = "events"
|
peerRESTListenEvents = "events"
|
||||||
peerRESTLogMask = "log-mask"
|
peerRESTLogMask = "log-mask"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const restartUpdateDelay = 250 * time.Millisecond
|
||||||
|
@ -693,6 +693,18 @@ func (s *peerRESTServer) SignalServiceHandler(vars *grid.MSS) (np grid.NoPayload
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return np, grid.NewRemoteErr(err)
|
return np, grid.NewRemoteErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait until the specified time before executing the signal.
|
||||||
|
if t := vars.Get(peerRESTExecAt); t != "" {
|
||||||
|
execAt, err := time.Parse(time.RFC3339Nano, vars.Get(peerRESTExecAt))
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(GlobalContext, "signalservice", err)
|
||||||
|
execAt = time.Now().Add(restartUpdateDelay)
|
||||||
|
}
|
||||||
|
if d := time.Until(execAt); d > 0 {
|
||||||
|
time.Sleep(d)
|
||||||
|
}
|
||||||
|
}
|
||||||
signal := serviceSignal(si)
|
signal := serviceSignal(si)
|
||||||
switch signal {
|
switch signal {
|
||||||
case serviceRestart, serviceStop:
|
case serviceRestart, serviceStop:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user