mirror of
https://github.com/minio/minio.git
synced 2025-01-25 13:43:17 -05:00
upgrade: Split in two steps to ensure a stable retry (#15396)
Currently, if one server in a distributed setup fails to upgrade due to any reasons, it is not possible to upgrade again unless nodes are restarted. To fix this, split the upgrade process into two steps : - download the new binary on all servers - If successful, overwrite the old binary with the new one
This commit is contained in:
parent
4c6498d726
commit
e4b51235f8
@ -71,16 +71,6 @@ const (
|
||||
mgmtForceStop = "forceStop"
|
||||
)
|
||||
|
||||
func updateServer(u *url.URL, sha256Sum []byte, lrTime time.Time, releaseInfo string, mode string) (us madmin.ServerUpdateStatus, err error) {
|
||||
if err = doUpdate(u, lrTime, sha256Sum, releaseInfo, mode); err != nil {
|
||||
return us, err
|
||||
}
|
||||
|
||||
us.CurrentVersion = Version
|
||||
us.UpdatedVersion = lrTime.Format(minioReleaseTagTimeLayout)
|
||||
return us, nil
|
||||
}
|
||||
|
||||
// ServerUpdateHandler - POST /minio/admin/v3/update?updateURL={updateURL}
|
||||
// ----------
|
||||
// updates all minio servers and restarts them gracefully.
|
||||
@ -152,7 +142,7 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req
|
||||
return
|
||||
}
|
||||
|
||||
for _, nerr := range globalNotificationSys.ServerUpdate(ctx, u, sha256Sum, lrTime, releaseInfo) {
|
||||
for _, nerr := range globalNotificationSys.DownloadBinary(ctx, u, sha256Sum, releaseInfo) {
|
||||
if nerr.Err != nil {
|
||||
err := AdminError{
|
||||
Code: AdminUpdateApplyFailure,
|
||||
@ -166,13 +156,39 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
}
|
||||
|
||||
updateStatus, err := updateServer(u, sha256Sum, lrTime, releaseInfo, mode)
|
||||
err = downloadBinary(u, sha256Sum, releaseInfo, mode)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("server update failed with %w", err))
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
for _, nerr := range globalNotificationSys.CommitBinary(ctx) {
|
||||
if nerr.Err != nil {
|
||||
err := AdminError{
|
||||
Code: AdminUpdateApplyFailure,
|
||||
Message: nerr.Err.Error(),
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
}
|
||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||
logger.LogIf(ctx, fmt.Errorf("server update failed with %w", err))
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err = commitBinary()
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("server update failed with %w", err))
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
updateStatus := madmin.ServerUpdateStatus{
|
||||
CurrentVersion: Version,
|
||||
UpdatedVersion: lrTime.Format(minioReleaseTagTimeLayout),
|
||||
}
|
||||
|
||||
// Marshal API response
|
||||
jsonBytes, err := json.Marshal(updateStatus)
|
||||
if err != nil {
|
||||
|
@ -356,8 +356,8 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
|
||||
return
|
||||
}
|
||||
|
||||
// ServerUpdate - updates remote peers.
|
||||
func (sys *NotificationSys) ServerUpdate(ctx context.Context, u *url.URL, sha256Sum []byte, lrTime time.Time, releaseInfo string) []NotificationPeerErr {
|
||||
// DownloadBinary - asks remote peers to download a new binary from the URL and to verify the checksum
|
||||
func (sys *NotificationSys) DownloadBinary(ctx context.Context, u *url.URL, sha256Sum []byte, releaseInfo string) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
if client == nil {
|
||||
@ -365,7 +365,22 @@ func (sys *NotificationSys) ServerUpdate(ctx context.Context, u *url.URL, sha256
|
||||
}
|
||||
client := client
|
||||
ng.Go(ctx, func() error {
|
||||
return client.ServerUpdate(ctx, u, sha256Sum, lrTime, releaseInfo)
|
||||
return client.DownloadBinary(ctx, u, sha256Sum, releaseInfo)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
}
|
||||
|
||||
// CommitBinary - asks remote peers to overwrite the old binary with the new one
|
||||
func (sys *NotificationSys) CommitBinary(ctx context.Context) []NotificationPeerErr {
|
||||
ng := WithNPeers(len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
if client == nil {
|
||||
continue
|
||||
}
|
||||
client := client
|
||||
ng.Go(ctx, func() error {
|
||||
return client.CommitBinary(ctx)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
return ng.Wait()
|
||||
|
@ -418,26 +418,34 @@ func (client *peerRESTClient) LoadGroup(group string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type serverUpdateInfo struct {
|
||||
type binaryInfo struct {
|
||||
URL *url.URL
|
||||
Sha256Sum []byte
|
||||
Time time.Time
|
||||
ReleaseInfo string
|
||||
}
|
||||
|
||||
// ServerUpdate - sends server update message to remote peers.
|
||||
func (client *peerRESTClient) ServerUpdate(ctx context.Context, u *url.URL, sha256Sum []byte, lrTime time.Time, releaseInfo string) error {
|
||||
// DownloadBinary - sends download binary message to remote peers.
|
||||
func (client *peerRESTClient) DownloadBinary(ctx context.Context, u *url.URL, sha256Sum []byte, releaseInfo string) error {
|
||||
values := make(url.Values)
|
||||
var reader bytes.Buffer
|
||||
if err := gob.NewEncoder(&reader).Encode(serverUpdateInfo{
|
||||
if err := gob.NewEncoder(&reader).Encode(binaryInfo{
|
||||
URL: u,
|
||||
Sha256Sum: sha256Sum,
|
||||
Time: lrTime,
|
||||
ReleaseInfo: releaseInfo,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
respBody, err := client.callWithContext(ctx, peerRESTMethodServerUpdate, values, &reader, -1)
|
||||
respBody, err := client.callWithContext(ctx, peerRESTMethodDownloadBinary, values, &reader, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
return nil
|
||||
}
|
||||
|
||||
// CommitBinary - sends commit binary message to remote peers.
|
||||
func (client *peerRESTClient) CommitBinary(ctx context.Context) error {
|
||||
respBody, err := client.callWithContext(ctx, peerRESTMethodCommitBinary, nil, nil, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -18,7 +18,7 @@
|
||||
package cmd
|
||||
|
||||
const (
|
||||
peerRESTVersion = "v23" // Added /metrics
|
||||
peerRESTVersion = "v24" // Change ServerUpdate to DownloadBinary and CommitBinary
|
||||
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
|
||||
peerRESTPrefix = minioReservedBucketPath + "/peer"
|
||||
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
|
||||
@ -39,7 +39,8 @@ const (
|
||||
peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata"
|
||||
peerRESTMethodGetBucketStats = "/getbucketstats"
|
||||
peerRESTMethodGetAllBucketStats = "/getallbucketstats"
|
||||
peerRESTMethodServerUpdate = "/serverupdate"
|
||||
peerRESTMethodDownloadBinary = "/downloadbinary"
|
||||
peerRESTMethodCommitBinary = "/commitbinary"
|
||||
peerRESTMethodSignalService = "/signalservice"
|
||||
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"
|
||||
peerRESTMethodGetLocks = "/getlocks"
|
||||
|
@ -751,8 +751,8 @@ func (s *peerRESTServer) GetLocalDiskIDs(w http.ResponseWriter, r *http.Request)
|
||||
logger.LogIf(ctx, gob.NewEncoder(w).Encode(ids))
|
||||
}
|
||||
|
||||
// ServerUpdateHandler - updates the current server.
|
||||
func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// DownloadBinary - updates the current server.
|
||||
func (s *peerRESTServer) DownloadBinaryHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
s.writeErrorResponse(w, errors.New("Invalid request"))
|
||||
return
|
||||
@ -763,14 +763,27 @@ func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Requ
|
||||
return
|
||||
}
|
||||
|
||||
var info serverUpdateInfo
|
||||
var info binaryInfo
|
||||
err := gob.NewDecoder(r.Body).Decode(&info)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = updateServer(info.URL, info.Sha256Sum, info.Time, info.ReleaseInfo, getMinioMode()); err != nil {
|
||||
if err = downloadBinary(info.URL, info.Sha256Sum, info.ReleaseInfo, getMinioMode()); err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// CommitBinary - overwrites the current binary with the new one.
|
||||
func (s *peerRESTServer) CommitBinaryHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
s.writeErrorResponse(w, errors.New("Invalid request"))
|
||||
return
|
||||
}
|
||||
|
||||
if err := commitBinary(); err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
@ -1297,7 +1310,8 @@ func registerPeerRESTHandlers(router *mux.Router) {
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBucketStats).HandlerFunc(httpTraceHdrs(server.GetBucketStatsHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSignalService).HandlerFunc(httpTraceHdrs(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...)
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerUpdate).HandlerFunc(httpTraceHdrs(server.ServerUpdateHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadBinary).HandlerFunc(httpTraceHdrs(server.DownloadBinaryHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCommitBinary).HandlerFunc(httpTraceHdrs(server.CommitBinaryHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeletePolicy).HandlerFunc(httpTraceAll(server.DeletePolicyHandler)).Queries(restQueries(peerRESTPolicy)...)
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadPolicy).HandlerFunc(httpTraceAll(server.LoadPolicyHandler)).Queries(restQueries(peerRESTPolicy)...)
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadPolicyMapping).HandlerFunc(httpTraceAll(server.LoadPolicyMappingHandler)).Queries(restQueries(peerRESTUserOrGroup)...)
|
||||
|
@ -518,7 +518,7 @@ func getUpdateReaderFromURL(u *url.URL, transport http.RoundTripper, mode string
|
||||
|
||||
var updateInProgress uint32
|
||||
|
||||
func doUpdate(u *url.URL, lrTime time.Time, sha256Sum []byte, releaseInfo string, mode string) (err error) {
|
||||
func downloadBinary(u *url.URL, sha256Sum []byte, releaseInfo string, mode string) (err error) {
|
||||
if !atomic.CompareAndSwapUint32(&updateInProgress, 0, 1) {
|
||||
return errors.New("update already in progress")
|
||||
}
|
||||
@ -565,7 +565,35 @@ func doUpdate(u *url.URL, lrTime time.Time, sha256Sum []byte, releaseInfo string
|
||||
opts.Verifier = v
|
||||
}
|
||||
|
||||
if err = selfupdate.Apply(reader, opts); err != nil {
|
||||
if err = selfupdate.PrepareAndCheckBinary(reader, opts); err != nil {
|
||||
var pathErr *os.PathError
|
||||
if errors.As(err, &pathErr) {
|
||||
return AdminError{
|
||||
Code: AdminUpdateApplyFailure,
|
||||
Message: fmt.Sprintf("Unable to update the binary at %s: %v",
|
||||
filepath.Dir(pathErr.Path), pathErr.Err),
|
||||
StatusCode: http.StatusForbidden,
|
||||
}
|
||||
}
|
||||
return AdminError{
|
||||
Code: AdminUpdateApplyFailure,
|
||||
Message: err.Error(),
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func commitBinary() (err error) {
|
||||
if !atomic.CompareAndSwapUint32(&updateInProgress, 0, 1) {
|
||||
return errors.New("update already in progress")
|
||||
}
|
||||
defer atomic.StoreUint32(&updateInProgress, 0)
|
||||
|
||||
opts := selfupdate.Options{}
|
||||
|
||||
if err = selfupdate.CommitBinary(opts); err != nil {
|
||||
if rerr := selfupdate.RollbackError(err); rerr != nil {
|
||||
return AdminError{
|
||||
Code: AdminUpdateApplyFailure,
|
||||
|
2
go.mod
2
go.mod
@ -51,7 +51,7 @@ require (
|
||||
github.com/minio/madmin-go v1.4.3
|
||||
github.com/minio/minio-go/v7 v7.0.32
|
||||
github.com/minio/pkg v1.1.26
|
||||
github.com/minio/selfupdate v0.4.0
|
||||
github.com/minio/selfupdate v0.5.0
|
||||
github.com/minio/sha256-simd v1.0.0
|
||||
github.com/minio/simdjson-go v0.4.2
|
||||
github.com/minio/sio v0.3.0
|
||||
|
4
go.sum
4
go.sum
@ -634,8 +634,8 @@ github.com/minio/minio-go/v7 v7.0.32/go.mod h1:/sjRKkKIA75CKh1iu8E3qBy7ktBmCCDGI
|
||||
github.com/minio/pkg v1.1.20/go.mod h1:Xo7LQshlxGa9shKwJ7NzQbgW4s8T/Wc1cOStR/eUiMY=
|
||||
github.com/minio/pkg v1.1.26 h1:a8x4sHNBxCiHEkxZ/0EBTLqvV3nMtM2G/A6lXNfXN3U=
|
||||
github.com/minio/pkg v1.1.26/go.mod h1:z9PfmEI804KFkF6eY4LoGe8IDVvTCsYGVuaf58Dr0WI=
|
||||
github.com/minio/selfupdate v0.4.0 h1:A7t07pN4Ch1tBTIRStW0KhUVyykz+2muCqFsITQeEW8=
|
||||
github.com/minio/selfupdate v0.4.0/go.mod h1:mcDkzMgq8PRcpCRJo/NlPY7U45O5dfYl2Y0Rg7IustY=
|
||||
github.com/minio/selfupdate v0.5.0 h1:0UH1HlL49+2XByhovKl5FpYTjKfvrQ2sgL1zEXK6mfI=
|
||||
github.com/minio/selfupdate v0.5.0/go.mod h1:mcDkzMgq8PRcpCRJo/NlPY7U45O5dfYl2Y0Rg7IustY=
|
||||
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
|
||||
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
|
||||
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
|
||||
|
Loading…
x
Reference in New Issue
Block a user