Migrate all Peer communication to common Notification subsystem (#7031)

Deprecate the use of Admin Peers concept and migrate all peer
communication to Notification subsystem. This finally allows
for a common subsystem for all peer notification in case of
distributed server deployments.
This commit is contained in:
Harshavardhana
2019-01-14 12:14:20 +05:30
committed by GitHub
parent 9a71f2fdfa
commit 8757c963ba
18 changed files with 546 additions and 872 deletions

View File

@@ -17,22 +17,22 @@
package cmd
import (
"archive/zip"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/cpu"
@@ -41,9 +41,8 @@ import (
"github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/mem"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/quick"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
const (
@@ -97,6 +96,11 @@ func (a adminAPIHandlers) VersionHandler(w http.ResponseWriter, r *http.Request)
func (a adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ServiceStatus")
if globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
adminAPIErr := checkAdminRequestAuthType(ctx, r, "")
if adminAPIErr != ErrNone {
writeErrorResponseJSON(w, adminAPIErr, r.URL)
@@ -109,13 +113,8 @@ func (a adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r *http.Re
CommitID: CommitID,
}
// Fetch uptimes from all peers. This may fail to due to lack
// of read-quorum availability.
uptime, err := getPeerUptimes(globalAdminPeers)
if err != nil {
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
// Fetch uptimes from all peers and pick the latest.
uptime := getPeerUptimes(globalNotificationSys.ServerInfo(ctx))
// Create API response
serverStatus := madmin.ServiceStatus{
@@ -129,6 +128,7 @@ func (a adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r *http.Re
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
// Reply with storage information (across nodes in a
// distributed setup) as json.
writeSuccessResponseJSON(w, jsonBytes)
@@ -142,6 +142,11 @@ func (a adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r *http.Re
func (a adminAPIHandlers) ServiceStopNRestartHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ServiceStopNRestart")
if globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
adminAPIErr := checkAdminRequestAuthType(ctx, r, "")
if adminAPIErr != ErrNone {
writeErrorResponseJSON(w, adminAPIErr, r.URL)
@@ -151,7 +156,6 @@ func (a adminAPIHandlers) ServiceStopNRestartHandler(w http.ResponseWriter, r *h
var sa madmin.ServiceAction
err := json.NewDecoder(r.Body).Decode(&sa)
if err != nil {
logger.LogIf(ctx, err)
writeErrorResponseJSON(w, ErrRequestBodyParse, r.URL)
return
}
@@ -171,7 +175,15 @@ func (a adminAPIHandlers) ServiceStopNRestartHandler(w http.ResponseWriter, r *h
// Reply to the client before restarting minio server.
writeSuccessResponseHeadersOnly(w)
sendServiceCmd(globalAdminPeers, serviceSig)
// Notify all other Minio peers signal service.
for _, nerr := range globalNotificationSys.SignalService(serviceSig) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
globalServiceSignalCh <- serviceSig
}
// ServerProperties holds some server information such as, version, region
@@ -235,6 +247,12 @@ type ServerInfo struct {
func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ServerInfo")
objectAPI := newObjectLayerFn()
if objectAPI == nil || globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
// Authenticate request
// Setting the region as empty so as the mc server info command is irrespective to the region.
@@ -244,39 +262,33 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
return
}
// Web service response
reply := make([]ServerInfo, len(globalAdminPeers))
var wg sync.WaitGroup
// Gather server information for all nodes
for i, p := range globalAdminPeers {
wg.Add(1)
// Gather information from a peer in a goroutine
go func(idx int, peer adminPeer) {
defer wg.Done()
// Initialize server info at index
reply[idx] = ServerInfo{Addr: peer.addr}
serverInfoData, err := peer.cmdRunner.ServerInfo()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", peer.addr)
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
reply[idx].Error = err.Error()
return
}
reply[idx].Data = &serverInfoData
}(i, p)
thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints))
if err != nil {
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
wg.Wait()
serverInfo := globalNotificationSys.ServerInfo(ctx)
// Once we have recieved all the ServerInfo from peers
// add the local peer server info as well.
serverInfo = append(serverInfo, ServerInfo{
Addr: thisAddr.String(),
Data: &ServerInfoData{
StorageInfo: objectAPI.StorageInfo(ctx),
ConnStats: globalConnStats.toServerConnStats(),
HTTPStats: globalHTTPStats.toServerHTTPStats(),
Properties: ServerProperties{
Uptime: UTCNow().Sub(globalBootTime),
Version: Version,
CommitID: CommitID,
SQSARN: globalNotificationSys.GetARNList(),
Region: globalServerConfig.GetRegion(),
},
},
})
// Marshal API response
jsonBytes, err := json.Marshal(reply)
jsonBytes, err := json.Marshal(serverInfo)
if err != nil {
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
@@ -323,7 +335,7 @@ func (a adminAPIHandlers) PerfInfoHandler(w http.ResponseWriter, r *http.Request
// Get object layer instance.
objLayer := newObjectLayerFn()
if objLayer == nil {
if objLayer == nil || globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
@@ -417,6 +429,11 @@ type StartProfilingResult struct {
func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "StartProfiling")
if globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
adminAPIErr := checkAdminRequestAuthType(ctx, r, "")
if adminAPIErr != ErrNone {
writeErrorResponseJSON(w, adminAPIErr, r.URL)
@@ -426,31 +443,53 @@ func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.R
vars := mux.Vars(r)
profiler := vars["profilerType"]
startProfilingResult := make([]StartProfilingResult, len(globalAdminPeers))
// Call StartProfiling function on all nodes and save results
wg := sync.WaitGroup{}
for i, peer := range globalAdminPeers {
wg.Add(1)
go func(idx int, peer adminPeer) {
defer wg.Done()
result := StartProfilingResult{NodeName: peer.addr}
if err := peer.cmdRunner.StartProfiling(profiler); err != nil {
result.Error = err.Error()
return
}
result.Success = true
startProfilingResult[idx] = result
}(i, peer)
thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints))
if err != nil {
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
// Start profiling on remote servers.
hostErrs := globalNotificationSys.StartProfiling(profiler)
// Start profiling locally as well.
{
if globalProfiler != nil {
globalProfiler.Stop()
}
prof, err := startProfiler(profiler, "")
if err != nil {
hostErrs = append(hostErrs, NotificationPeerErr{
Host: *thisAddr,
Err: err,
})
} else {
globalProfiler = prof
hostErrs = append(hostErrs, NotificationPeerErr{
Host: *thisAddr,
})
}
}
var startProfilingResult []StartProfilingResult
for _, nerr := range hostErrs {
result := StartProfilingResult{NodeName: nerr.Host.String()}
if nerr.Err != nil {
result.Error = nerr.Err.Error()
} else {
result.Success = true
}
startProfilingResult = append(startProfilingResult, result)
}
wg.Wait()
// Create JSON result and send it to the client
startProfilingResultInBytes, err := json.Marshal(startProfilingResult)
if err != nil {
writeCustomErrorResponseJSON(w, http.StatusInternalServerError, err.Error(), r.URL)
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
writeSuccessResponseJSON(w, []byte(startProfilingResultInBytes))
}
@@ -478,51 +517,18 @@ func (f dummyFileInfo) Sys() interface{} { return f.sys }
func (a adminAPIHandlers) DownloadProfilingHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "DownloadProfiling")
if globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
adminAPIErr := checkAdminRequestAuthType(ctx, r, "")
if adminAPIErr != ErrNone {
writeErrorResponseJSON(w, adminAPIErr, r.URL)
return
}
profilingDataFound := false
// Initialize a zip writer which will provide a zipped content
// of profiling data of all nodes
zipWriter := zip.NewWriter(w)
defer zipWriter.Close()
for i, peer := range globalAdminPeers {
// Get profiling data from a node
data, err := peer.cmdRunner.DownloadProfilingData()
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to download profiling data from node `%s`, reason: %s", peer.addr, err.Error()))
continue
}
profilingDataFound = true
// Send profiling data to zip as file
header, err := zip.FileInfoHeader(dummyFileInfo{
name: fmt.Sprintf("profiling-%d", i),
size: int64(len(data)),
mode: 0600,
modTime: time.Now().UTC(),
isDir: false,
sys: nil,
})
if err != nil {
continue
}
writer, err := zipWriter.CreateHeader(header)
if err != nil {
continue
}
if _, err = io.Copy(writer, bytes.NewBuffer(data)); err != nil {
return
}
}
if !profilingDataFound {
if !globalNotificationSys.DownloadProfilingData(ctx, w) {
writeErrorResponseJSON(w, ErrAdminProfilerNotEnabled, r.URL)
return
}
@@ -952,6 +958,11 @@ func (a adminAPIHandlers) ListUsers(w http.ResponseWriter, r *http.Request) {
func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "SetUserStatus")
if globalNotificationSys == nil || globalIAMSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
@@ -988,10 +999,10 @@ func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request)
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
for _, nerr := range globalNotificationSys.LoadUsers() {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
@@ -1002,7 +1013,7 @@ func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) {
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
if objectAPI == nil || globalNotificationSys == nil || globalIAMSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
@@ -1056,10 +1067,10 @@ func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) {
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
for _, nerr := range globalNotificationSys.LoadUsers() {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
@@ -1070,7 +1081,7 @@ func (a adminAPIHandlers) ListCannedPolicies(w http.ResponseWriter, r *http.Requ
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
if objectAPI == nil || globalIAMSys == nil || globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
@@ -1102,7 +1113,7 @@ func (a adminAPIHandlers) RemoveCannedPolicy(w http.ResponseWriter, r *http.Requ
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
if objectAPI == nil || globalIAMSys == nil || globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
@@ -1129,10 +1140,10 @@ func (a adminAPIHandlers) RemoveCannedPolicy(w http.ResponseWriter, r *http.Requ
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
for _, nerr := range globalNotificationSys.LoadUsers() {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
@@ -1143,7 +1154,7 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
if objectAPI == nil || globalIAMSys == nil || globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
@@ -1194,10 +1205,10 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
for _, nerr := range globalNotificationSys.LoadUsers() {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
@@ -1208,7 +1219,7 @@ func (a adminAPIHandlers) SetUserPolicy(w http.ResponseWriter, r *http.Request)
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
if objectAPI == nil || globalIAMSys == nil || globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
@@ -1241,10 +1252,10 @@ func (a adminAPIHandlers) SetUserPolicy(w http.ResponseWriter, r *http.Request)
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
for _, nerr := range globalNotificationSys.LoadUsers() {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
@@ -1255,7 +1266,7 @@ func (a adminAPIHandlers) SetConfigHandler(w http.ResponseWriter, r *http.Reques
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
if objectAPI == nil || globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
@@ -1472,7 +1483,7 @@ func (a adminAPIHandlers) UpdateAdminCredentialsHandler(w http.ResponseWriter,
// Get current object layer instance.
objectAPI := newObjectLayerFn()
if objectAPI == nil {
if objectAPI == nil || globalNotificationSys == nil {
writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL)
return
}
@@ -1535,10 +1546,10 @@ func (a adminAPIHandlers) UpdateAdminCredentialsHandler(w http.ResponseWriter,
}
// Notify all other Minio peers to update credentials
for host, err := range globalNotificationSys.LoadCredentials() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
for _, nerr := range globalNotificationSys.LoadCredentials() {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}