Add ServerDrivesPerfInfo() admin API (#6969)

This is part of implementation for mc admin health command. The
ServerDrivesPerfInfo() admin API returns read and write speed
information for all the drives (local and remote) in a given Minio
server deployment.

Part of minio/mc#2606
This commit is contained in:
Nitish Tiwari
2018-12-31 23:16:44 +05:30
committed by kannappanr
parent 75cd4201b0
commit fcb56d864c
11 changed files with 396 additions and 2 deletions

View File

@@ -35,6 +35,7 @@ import (
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/handlers"
"github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin"
@@ -284,6 +285,57 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
writeSuccessResponseJSON(w, jsonBytes)
}
// ServerDrivesPerfInfo holds informantion about address, performance
// of all drives on one server. It also reports any errors if encountered
// while trying to reach this server.
type ServerDrivesPerfInfo struct {
Addr string `json:"addr"`
Error string `json:"error,omitempty"`
Perf []disk.Performance `json:"perf"`
}
// PerfInfoHandler - GET /minio/admin/v1/performance?perfType={perfType}
// ----------
// Get all performance information based on input type
// Supported types = drive
func (a adminAPIHandlers) PerfInfoHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PerfInfo")
// Authenticate request
// Setting the region as empty so as the mc server info command is irrespective to the region.
adminAPIErr := checkAdminRequestAuthType(ctx, r, "")
if adminAPIErr != ErrNone {
writeErrorResponseJSON(w, adminAPIErr, r.URL)
return
}
vars := mux.Vars(r)
perfType := vars["perfType"]
if perfType == "drive" {
// Get drive performance details from local server's drive(s)
dp := localEndpointsPerf(globalEndpoints)
// Notify all other Minio peers to report drive performance numbers
dps := globalNotificationSys.DrivePerfInfo()
dps = append(dps, dp)
// Marshal API response
jsonBytes, err := json.Marshal(dps)
if err != nil {
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
// Reply with performance information (across nodes in a
// distributed setup) as json.
writeSuccessResponseJSON(w, jsonBytes)
} else {
writeErrorResponseJSON(w, ErrNotImplemented, r.URL)
}
return
}
// StartProfilingResult contains the status of the starting
// profiling action in a given server
type StartProfilingResult struct {

View File

@@ -60,6 +60,11 @@ func registerAdminRouter(router *mux.Router, enableIAM bool) {
adminV1Router.Methods(http.MethodPost).Path("/heal/").HandlerFunc(httpTraceAll(adminAPI.HealHandler))
adminV1Router.Methods(http.MethodPost).Path("/heal/{bucket}").HandlerFunc(httpTraceAll(adminAPI.HealHandler))
adminV1Router.Methods(http.MethodPost).Path("/heal/{bucket}/{prefix:.*}").HandlerFunc(httpTraceAll(adminAPI.HealHandler))
/// Health operations
// Performance command - return performance details based on input type
adminV1Router.Methods(http.MethodGet).Path("/performance").HandlerFunc(httpTraceAll(adminAPI.PerfInfoHandler)).Queries("perfType", "{perfType:.*}")
}
// Profiling operations

View File

@@ -29,6 +29,7 @@ import (
"github.com/minio/minio-go/pkg/set"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/mountinfo"
)
@@ -197,6 +198,34 @@ func (endpoints EndpointList) GetString(i int) string {
return endpoints[i].String()
}
// localEndpointsPerf - returns ServerDrivesPerfInfo for only the
// local endpoints from given list of endpoints
func localEndpointsPerf(endpoints EndpointList) ServerDrivesPerfInfo {
var dps []disk.Performance
var addr string
for _, endpoint := range endpoints {
// Only proceed for local endpoints
if endpoint.IsLocal {
addr = GetLocalPeer(endpoints)
if _, err := os.Stat(endpoint.Path); err != nil {
// Since this drive is not available, add relevant details and proceed
dps = append(dps, disk.Performance{Path: endpoint.Path, Error: err.Error()})
continue
}
tempObj := mustGetUUID()
fsPath := pathJoin(endpoint.Path, minioMetaTmpBucket, tempObj)
dp := disk.GetPerformance(fsPath)
dp.Path = endpoint.Path
dps = append(dps, dp)
}
}
return ServerDrivesPerfInfo{
Addr: addr,
Perf: dps,
}
}
// NewEndpointList - returns new endpoint list based on input args.
func NewEndpointList(args ...string) (endpoints EndpointList, err error) {
var endpointType EndpointType

View File

@@ -512,6 +512,31 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
return sys.send(args.BucketName, args.ToEvent(), targetIDs...)
}
// DrivePerfInfo - Drive speed (read and write) information
func (sys *NotificationSys) DrivePerfInfo() []ServerDrivesPerfInfo {
reply := make([]ServerDrivesPerfInfo, len(sys.peerRPCClientMap))
var wg sync.WaitGroup
var i int
for addr, client := range sys.peerRPCClientMap {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient, idx int) {
defer wg.Done()
di, err := client.DrivePerfInfo()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr.String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
di.Addr = addr.String()
di.Error = err.Error()
}
reply[idx] = di
}(addr, client, i)
i++
}
wg.Wait()
return reply
}
// NewNotificationSys - creates new notification system object.
func NewNotificationSys(config *serverConfig, endpoints EndpointList) *NotificationSys {
targetList := getNotificationTargets(config)

View File

@@ -141,6 +141,15 @@ func (rpcClient *PeerRPCClient) LoadCredentials() error {
return rpcClient.Call(peerServiceName+".LoadCredentials", &args, &reply)
}
// DrivePerfInfo - returns drive performance info for remote server.
func (rpcClient *PeerRPCClient) DrivePerfInfo() (ServerDrivesPerfInfo, error) {
args := AuthArgs{}
var reply ServerDrivesPerfInfo
err := rpcClient.Call(peerServiceName+".DrivePerfInfo", &args, &reply)
return reply, err
}
// NewPeerRPCClient - returns new peer RPC client.
func NewPeerRPCClient(host *xnet.Host) (*PeerRPCClient, error) {
scheme := "http"

View File

@@ -238,6 +238,17 @@ func (receiver *peerRPCReceiver) LoadCredentials(args *AuthArgs, reply *VoidRepl
return globalConfigSys.Load(newObjectLayerFn())
}
// DrivePerfInfo - handles drive performance RPC call
func (receiver *peerRPCReceiver) DrivePerfInfo(args *AuthArgs, reply *ServerDrivesPerfInfo) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
*reply = localEndpointsPerf(globalEndpoints)
return nil
}
// NewPeerRPCServer - returns new peer RPC server.
func NewPeerRPCServer() (*xrpc.Server, error) {
rpcServer := xrpc.NewServer()