improve server update behavior by re-using memory properly (#18831)

This commit is contained in:
Harshavardhana 2024-01-19 18:27:58 -08:00 committed by GitHub
parent e11d851aee
commit f9b4a8d6e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 42 additions and 48 deletions

View File

@ -149,7 +149,7 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req
} }
// Download Binary Once // Download Binary Once
reader, err := downloadBinary(u, mode) bin, err := downloadBinary(u, mode)
if err != nil { if err != nil {
logger.LogIf(ctx, fmt.Errorf("server update failed with %w", err)) logger.LogIf(ctx, fmt.Errorf("server update failed with %w", err))
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
@ -157,7 +157,7 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req
} }
// Push binary to other servers // Push binary to other servers
for _, nerr := range globalNotificationSys.VerifyBinary(ctx, u, sha256Sum, releaseInfo, reader) { for _, nerr := range globalNotificationSys.VerifyBinary(ctx, u, sha256Sum, releaseInfo, bin) {
if nerr.Err != nil { if nerr.Err != nil {
err := AdminError{ err := AdminError{
Code: AdminUpdateApplyFailure, Code: AdminUpdateApplyFailure,
@ -171,7 +171,7 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req
} }
} }
err = verifyBinary(u, sha256Sum, releaseInfo, mode, reader) err = verifyBinary(u, sha256Sum, releaseInfo, mode, bytes.NewReader(bin))
if err != nil { if err != nil {
logger.LogIf(ctx, fmt.Errorf("server update failed with %w", err)) logger.LogIf(ctx, fmt.Errorf("server update failed with %w", err))
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)

View File

@ -18,6 +18,7 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -338,7 +339,7 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io
} }
// VerifyBinary - asks remote peers to verify the checksum // VerifyBinary - asks remote peers to verify the checksum
func (sys *NotificationSys) VerifyBinary(ctx context.Context, u *url.URL, sha256Sum []byte, releaseInfo string, reader []byte) []NotificationPeerErr { func (sys *NotificationSys) VerifyBinary(ctx context.Context, u *url.URL, sha256Sum []byte, releaseInfo string, bin []byte) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients)) ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients { for idx, client := range sys.peerClients {
if client == nil { if client == nil {
@ -346,7 +347,7 @@ func (sys *NotificationSys) VerifyBinary(ctx context.Context, u *url.URL, sha256
} }
client := client client := client
ng.Go(ctx, func() error { ng.Go(ctx, func() error {
return client.VerifyBinary(ctx, u, sha256Sum, releaseInfo, reader) return client.VerifyBinary(ctx, u, sha256Sum, releaseInfo, bytes.NewReader(bin))
}, idx, *client.host) }, idx, *client.host)
} }
return ng.Wait() return ng.Wait()

View File

@ -21,6 +21,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/gob" "encoding/gob"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -475,26 +476,14 @@ func (client *peerRESTClient) LoadGroup(group string) error {
return nil return nil
} }
type binaryInfo struct {
URL *url.URL
Sha256Sum []byte
ReleaseInfo string
BinaryFile []byte
}
// VerifyBinary - sends verify binary message to remote peers. // VerifyBinary - sends verify binary message to remote peers.
func (client *peerRESTClient) VerifyBinary(ctx context.Context, u *url.URL, sha256Sum []byte, releaseInfo string, readerInput []byte) error { func (client *peerRESTClient) VerifyBinary(ctx context.Context, u *url.URL, sha256Sum []byte, releaseInfo string, reader io.Reader) error {
values := make(url.Values) values := make(url.Values)
var reader bytes.Buffer values.Set(peerRESTURL, u.String())
if err := gob.NewEncoder(&reader).Encode(binaryInfo{ values.Set(peerRESTSha256Sum, hex.EncodeToString(sha256Sum))
URL: u, values.Set(peerRESTReleaseInfo, releaseInfo)
Sha256Sum: sha256Sum,
ReleaseInfo: releaseInfo, respBody, err := client.callWithContext(ctx, peerRESTMethodVerifyBinary, values, reader, -1)
BinaryFile: readerInput,
}); err != nil {
return err
}
respBody, err := client.callWithContext(ctx, peerRESTMethodDownloadBinary, values, &reader, -1)
if err != nil { if err != nil {
return err return err
} }

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc. // Copyright (c) 2015-2024 MinIO, Inc.
// //
// This file is part of MinIO Object Storage stack // This file is part of MinIO Object Storage stack
// //
@ -18,8 +18,7 @@
package cmd package cmd
const ( const (
peerRESTVersion = "v35" // Add new service restart behavior peerRESTVersion = "v36" // Rewrite VerifyBinaryHandler()
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer" peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@ -42,7 +41,7 @@ const (
peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata" peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata"
peerRESTMethodGetBucketStats = "/getbucketstats" peerRESTMethodGetBucketStats = "/getbucketstats"
peerRESTMethodGetAllBucketStats = "/getallbucketstats" peerRESTMethodGetAllBucketStats = "/getallbucketstats"
peerRESTMethodDownloadBinary = "/downloadbinary" peerRESTMethodVerifyBinary = "/verifybinary"
peerRESTMethodCommitBinary = "/commitbinary" peerRESTMethodCommitBinary = "/commitbinary"
peerRESTMethodSignalService = "/signalservice" peerRESTMethodSignalService = "/signalservice"
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus" peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"
@ -111,6 +110,10 @@ const (
peerRESTDryRun = "dry-run" peerRESTDryRun = "dry-run"
peerRESTForce = "force" peerRESTForce = "force"
peerRESTURL = "url"
peerRESTSha256Sum = "sha256sum"
peerRESTReleaseInfo = "releaseinfo"
peerRESTListenBucket = "bucket" peerRESTListenBucket = "bucket"
peerRESTListenPrefix = "prefix" peerRESTListenPrefix = "prefix"
peerRESTListenSuffix = "suffix" peerRESTListenSuffix = "suffix"

View File

@ -20,11 +20,13 @@ package cmd
import ( import (
"context" "context"
"encoding/gob" "encoding/gob"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"net/url"
"strconv" "strconv"
"strings" "strings"
"sync/atomic" "sync/atomic"
@ -824,8 +826,8 @@ func (s *peerRESTServer) GetLocalDiskIDs(w http.ResponseWriter, r *http.Request)
logger.LogIf(ctx, gob.NewEncoder(w).Encode(ids)) logger.LogIf(ctx, gob.NewEncoder(w).Encode(ids))
} }
// DownloadBinary - updates the current server. // VerifyBinary - verifies the downloaded binary is in-tact
func (s *peerRESTServer) DownloadBinaryHandler(w http.ResponseWriter, r *http.Request) { func (s *peerRESTServer) VerifyBinaryHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) { if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request")) s.writeErrorResponse(w, errors.New("Invalid request"))
return return
@ -836,14 +838,19 @@ func (s *peerRESTServer) DownloadBinaryHandler(w http.ResponseWriter, r *http.Re
return return
} }
var info binaryInfo u, err := url.Parse(r.Form.Get(peerRESTURL))
err := gob.NewDecoder(r.Body).Decode(&info)
if err != nil { if err != nil {
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
sha256Sum, err := hex.DecodeString(r.Form.Get(peerRESTSha256Sum))
if err != nil {
s.writeErrorResponse(w, err)
return
}
releaseInfo := r.Form.Get(peerRESTReleaseInfo)
if err = verifyBinary(info.URL, info.Sha256Sum, info.ReleaseInfo, getMinioMode(), info.BinaryFile); err != nil { if err = verifyBinary(u, sha256Sum, releaseInfo, getMinioMode(), r.Body); err != nil {
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
@ -1524,7 +1531,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(h(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(h(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBucketStats).HandlerFunc(h(server.GetBucketStatsHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBucketStats).HandlerFunc(h(server.GetBucketStatsHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSignalService).HandlerFunc(h(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSignalService).HandlerFunc(h(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDownloadBinary).HandlerFunc(h(server.DownloadBinaryHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodVerifyBinary).HandlerFunc(h(server.VerifyBinaryHandler)).Queries(restQueries(peerRESTURL, peerRESTSha256Sum, peerRESTReleaseInfo)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCommitBinary).HandlerFunc(h(server.CommitBinaryHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCommitBinary).HandlerFunc(h(server.CommitBinaryHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeletePolicy).HandlerFunc(h(server.DeletePolicyHandler)).Queries(restQueries(peerRESTPolicy)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeletePolicy).HandlerFunc(h(server.DeletePolicyHandler)).Queries(restQueries(peerRESTPolicy)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadPolicy).HandlerFunc(h(server.LoadPolicyHandler)).Queries(restQueries(peerRESTPolicy)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadPolicy).HandlerFunc(h(server.LoadPolicyHandler)).Queries(restQueries(peerRESTPolicy)...)

View File

@ -19,7 +19,6 @@ package cmd
import ( import (
"bufio" "bufio"
"bytes"
"crypto" "crypto"
"crypto/tls" "crypto/tls"
"encoding/hex" "encoding/hex"
@ -508,10 +507,10 @@ func getUpdateReaderFromURL(u *url.URL, transport http.RoundTripper, mode string
return resp.Body, nil return resp.Body, nil
} }
var updateInProgress uint32 var updateInProgress atomic.Uint32
// Function to get the reader from an architecture // Function to get the reader from an architecture
func downloadBinary(u *url.URL, mode string) (readerReturn []byte, err error) { func downloadBinary(u *url.URL, mode string) (bin []byte, err error) {
transport := getUpdateTransport(30 * time.Second) transport := getUpdateTransport(30 * time.Second)
var reader io.ReadCloser var reader io.ReadCloser
if u.Scheme == "https" || u.Scheme == "http" { if u.Scheme == "https" || u.Scheme == "http" {
@ -523,13 +522,8 @@ func downloadBinary(u *url.URL, mode string) (readerReturn []byte, err error) {
return nil, fmt.Errorf("unsupported protocol scheme: %s", u.Scheme) return nil, fmt.Errorf("unsupported protocol scheme: %s", u.Scheme)
} }
defer xhttp.DrainBody(reader) defer xhttp.DrainBody(reader)
// convert a Reader to bytes
binaryFile, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
return binaryFile, nil return io.ReadAll(reader)
} }
const ( const (
@ -537,11 +531,11 @@ const (
defaultMinisignPubkey = "RWTx5Zr1tiHQLwG9keckT0c45M3AGeHD6IvimQHpyRywVWGbP1aVSGav" defaultMinisignPubkey = "RWTx5Zr1tiHQLwG9keckT0c45M3AGeHD6IvimQHpyRywVWGbP1aVSGav"
) )
func verifyBinary(u *url.URL, sha256Sum []byte, releaseInfo, mode string, reader []byte) (err error) { func verifyBinary(u *url.URL, sha256Sum []byte, releaseInfo, mode string, reader io.Reader) (err error) {
if !atomic.CompareAndSwapUint32(&updateInProgress, 0, 1) { if !updateInProgress.CompareAndSwap(0, 1) {
return errors.New("update already in progress") return errors.New("update already in progress")
} }
defer atomic.StoreUint32(&updateInProgress, 0) defer updateInProgress.Store(0)
transport := getUpdateTransport(30 * time.Second) transport := getUpdateTransport(30 * time.Second)
opts := selfupdate.Options{ opts := selfupdate.Options{
@ -571,7 +565,7 @@ func verifyBinary(u *url.URL, sha256Sum []byte, releaseInfo, mode string, reader
opts.Verifier = v opts.Verifier = v
} }
if err = selfupdate.PrepareAndCheckBinary(bytes.NewReader(reader), opts); err != nil { if err = selfupdate.PrepareAndCheckBinary(reader, opts); err != nil {
var pathErr *os.PathError var pathErr *os.PathError
if errors.As(err, &pathErr) { if errors.As(err, &pathErr) {
return AdminError{ return AdminError{
@ -592,10 +586,10 @@ func verifyBinary(u *url.URL, sha256Sum []byte, releaseInfo, mode string, reader
} }
func commitBinary() (err error) { func commitBinary() (err error) {
if !atomic.CompareAndSwapUint32(&updateInProgress, 0, 1) { if !updateInProgress.CompareAndSwap(0, 1) {
return errors.New("update already in progress") return errors.New("update already in progress")
} }
defer atomic.StoreUint32(&updateInProgress, 0) defer updateInProgress.Store(0)
opts := selfupdate.Options{} opts := selfupdate.Options{}