From 24fb1bf258b023c098ffb95558471609b5f3ed07 Mon Sep 17 00:00:00 2001 From: Ashish Kumar Sinha Date: Thu, 12 Dec 2019 03:57:03 +0530 Subject: [PATCH] New Admin Info (#8497) --- cmd/admin-handlers.go | 334 ++++++++++++++++++++++++++---- cmd/admin-server-info.go | 75 ++++++- cmd/crypto/kms.go | 19 ++ cmd/crypto/vault.go | 9 + cmd/notification.go | 70 +++---- cmd/peer-rest-client-target.go | 5 + cmd/peer-rest-client.go | 2 +- cmd/peer-rest-server.go | 32 ++- pkg/event/target/amqp.go | 12 ++ pkg/event/target/elasticsearch.go | 28 ++- pkg/event/target/httpclient.go | 5 + pkg/event/target/kafka.go | 19 +- pkg/event/target/mqtt.go | 18 +- pkg/event/target/mysql.go | 23 +- pkg/event/target/nats.go | 25 ++- pkg/event/target/nsq.go | 27 ++- pkg/event/target/postgresql.go | 25 ++- pkg/event/target/redis.go | 23 +- pkg/event/target/webhook.go | 40 ++-- pkg/event/targetlist.go | 8 + pkg/event/targetlist_test.go | 4 + pkg/madmin/info-commands.go | 238 +++++++++++++-------- 22 files changed, 766 insertions(+), 275 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 8f6ef207e..01ad9d2e0 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -20,6 +20,7 @@ import ( "context" "crypto/subtle" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -36,11 +37,13 @@ import ( "github.com/gorilla/mux" "github.com/minio/minio/cmd/config" + "github.com/minio/minio/cmd/config/notify" "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/cpu" + "github.com/minio/minio/pkg/event/target" "github.com/minio/minio/pkg/handlers" iampolicy "github.com/minio/minio/pkg/iam/policy" "github.com/minio/minio/pkg/madmin" @@ -262,48 +265,7 @@ type ServerInfo struct { Data *ServerInfoData `json:"data"` } -// ServerInfoHandler - GET /minio/admin/v2/info -// ---------- -// Get server information -func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "ServerInfo") - objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ListServerInfoAdminAction) - if objectAPI == nil { - return - } - - serverInfo := globalNotificationSys.ServerInfo(ctx) - // Once we have received all the ServerInfo from peers - // add the local peer server info as well. - serverInfo = append(serverInfo, ServerInfo{ - Addr: getHostName(r), - Data: &ServerInfoData{ - ConnStats: globalConnStats.toServerConnStats(), - HTTPStats: globalHTTPStats.toServerHTTPStats(), - Properties: ServerProperties{ - Uptime: UTCNow().Sub(globalBootTime), - Version: Version, - CommitID: CommitID, - DeploymentID: globalDeploymentID, - SQSARN: globalNotificationSys.GetARNList(), - Region: globalServerRegion, - }, - }, - }) - - // Marshal API response - jsonBytes, err := json.Marshal(serverInfo) - if err != nil { - writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) - return - } - - // Reply with storage information (across nodes in a - // distributed setup) as json. - writeSuccessResponseJSON(w, jsonBytes) -} - -// ServerInfoHandler - GET /minio/admin/v2/storageinfo +// StorageInfoHandler - GET /minio/admin/v2/storageinfo // ---------- // Get server information func (a adminAPIHandlers) StorageInfoHandler(w http.ResponseWriter, r *http.Request) { @@ -1303,3 +1265,291 @@ func (a adminAPIHandlers) ServerHardwareInfoHandler(w http.ResponseWriter, r *ht writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL) } } + +// ServerInfoHandler - GET /minio/admin/v2/info +// ---------- +// Get server information +func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ServerInfo") + objectAPI, _ := validateAdminReq(ctx, w, r, "") + if objectAPI == nil { + return + } + + cfg, err := readServerConfig(ctx, objectAPI) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + infoMsg := madmin.InfoMessage{} + vault := madmin.Vault{} + if GlobalKMS != nil { + vault = fetchVaultStatus(cfg) + } + + ldap := madmin.LDAP{} + if globalLDAPConfig.Enabled { + ldapConn, err := globalLDAPConfig.Connect() + if err != nil { + ldap.Status = "offline" + } else if ldapConn == nil { + ldap.Status = "Not Configured" + } else { + ldap.Status = "online" + } + // Close ldap connection to avoid leaks. + defer ldapConn.Close() + } + + log, audit := fetchLoggerInfo(cfg) + + // Get the notification target info + notifyTarget := fetchLambdaInfo(cfg) + + // Fetching the Storage information + storageInfo := objectAPI.StorageInfo(ctx) + + var OnDisks int + var OffDisks int + var backend interface{} + + if storageInfo.Backend.Type == BackendType(madmin.Erasure) { + + for _, v := range storageInfo.Backend.OnlineDisks { + OnDisks += v + } + for _, v := range storageInfo.Backend.OfflineDisks { + OffDisks += v + } + + backend = madmin.XlBackend{ + Type: madmin.ErasureType, + OnlineDisks: OnDisks, + OfflineDisks: OffDisks, + StandardSCData: storageInfo.Backend.StandardSCData, + StandardSCParity: storageInfo.Backend.StandardSCParity, + RRSCData: storageInfo.Backend.RRSCData, + RRSCParity: storageInfo.Backend.RRSCParity, + } + } else { + backend = madmin.FsBackend{ + Type: madmin.FsType, + } + } + + mode := "" + if globalSafeMode { + mode = "safe" + } else { + mode = "online" + } + + server := getLocalServerProperty(globalEndpoints, r) + servers := globalNotificationSys.ServerInfo() + servers = append(servers, server) + + for _, sp := range servers { + for i, di := range sp.Disks { + path := "" + if globalIsXL { + path = di.DrivePath + } + if globalIsDistXL { + path = sp.Endpoint + di.DrivePath + } + // For distributed + for a := range storageInfo.Backend.Sets { + for b := range storageInfo.Backend.Sets[a] { + ep := storageInfo.Backend.Sets[a][b].Endpoint + + if globalIsDistXL { + if strings.Replace(ep, "http://", "", -1) == path || strings.Replace(ep, "https://", "", -1) == path { + sp.Disks[i].State = storageInfo.Backend.Sets[a][b].State + sp.Disks[i].UUID = storageInfo.Backend.Sets[a][b].UUID + } + } + if globalIsXL { + if ep == path { + sp.Disks[i].State = storageInfo.Backend.Sets[a][b].State + sp.Disks[i].UUID = storageInfo.Backend.Sets[a][b].UUID + } + } + } + } + + } + } + + domain := globalDomainNames + buckets := madmin.Buckets{Count: 20} + objects := madmin.Objects{Count: 200} + usage := madmin.Usage{Size: 1024} + services := madmin.Services{ + Vault: vault, + LDAP: ldap, + Logger: log, + Audit: audit, + Notifications: notifyTarget, + } + + infoMsg = madmin.InfoMessage{ + Mode: mode, + Domain: domain, + Region: globalServerRegion, + SQSARN: globalNotificationSys.GetARNList(), + DeploymentID: globalDeploymentID, + Buckets: buckets, + Objects: objects, + Usage: usage, + Services: services, + Backend: backend, + Servers: servers, + } + + // Marshal API response + jsonBytes, err := json.Marshal(infoMsg) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + //Reply with storage information (across nodes in a + // distributed setup) as json. + writeSuccessResponseJSON(w, jsonBytes) +} + +func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus { + lambdaMap := make(map[string][]madmin.TargetIDStatus) + targetList, _ := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, globalRootCAs) + + for targetID, target := range targetList.TargetMap() { + targetIDStatus := make(map[string]madmin.Status) + active, _ := target.IsActive() + if active { + targetIDStatus[targetID.ID] = madmin.Status{Status: "Online"} + } else { + targetIDStatus[targetID.ID] = madmin.Status{Status: "Offline"} + } + list := lambdaMap[targetID.Name] + list = append(list, targetIDStatus) + lambdaMap[targetID.Name] = list + } + + notify := make([]map[string][]madmin.TargetIDStatus, len(lambdaMap)) + counter := 0 + for key, value := range lambdaMap { + v := make(map[string][]madmin.TargetIDStatus) + v[key] = value + notify[counter] = v + counter++ + } + return notify +} + +// fetchVaultStatus fetches Vault Info +func fetchVaultStatus(cfg config.Config) madmin.Vault { + vault := madmin.Vault{} + if GlobalKMS == nil { + vault.Status = "disabled" + return vault + } + keyID := GlobalKMS.KeyID() + kmsInfo := GlobalKMS.Info() + + if kmsInfo.Endpoint == "" { + vault.Status = "KMS configured using master key" + return vault + } + + if err := checkConnection(kmsInfo.Endpoint); err != nil { + + vault.Status = "offline" + } else { + vault.Status = "online" + + kmsContext := crypto.Context{"MinIO admin API": "KMSKeyStatusHandler"} // Context for a test key operation + // 1. Generate a new key using the KMS. + key, sealedKey, err := GlobalKMS.GenerateKey(keyID, kmsContext) + if err != nil { + vault.Encrypt = "Encryption failed" + } else { + vault.Encrypt = "Ok" + } + + // 2. Check whether we can update / re-wrap the sealed key. + sealedKey, err = GlobalKMS.UpdateKey(keyID, sealedKey, kmsContext) + if err != nil { + vault.Update = "Re-wrap failed:" + } else { + vault.Update = "Ok" + } + + // 3. Verify that we can indeed decrypt the (encrypted) key + decryptedKey, decryptErr := GlobalKMS.UnsealKey(keyID, sealedKey, kmsContext) + + // 4. Compare generated key with decrypted key + if subtle.ConstantTimeCompare(key[:], decryptedKey[:]) != 1 || decryptErr != nil { + vault.Decrypt = "Re-wrap failed:" + } else { + vault.Decrypt = "Ok" + } + } + return vault +} + +// fetchLoggerDetails return log info +func fetchLoggerInfo(cfg config.Config) ([]madmin.Logger, []madmin.Audit) { + loggerCfg, _ := logger.LookupConfig(cfg) + + var logger []madmin.Logger + var auditlogger []madmin.Audit + for log, l := range loggerCfg.HTTP { + if l.Enabled { + err := checkConnection(l.Endpoint) + if err == nil { + mapLog := make(map[string]madmin.Status) + mapLog[log] = madmin.Status{Status: "Online"} + logger = append(logger, mapLog) + } else { + mapLog := make(map[string]madmin.Status) + mapLog[log] = madmin.Status{Status: "offline"} + logger = append(logger, mapLog) + } + } + } + + for audit, l := range loggerCfg.Audit { + if l.Enabled { + err := checkConnection(l.Endpoint) + if err == nil { + mapAudit := make(map[string]madmin.Status) + mapAudit[audit] = madmin.Status{Status: "Online"} + auditlogger = append(auditlogger, mapAudit) + } else { + mapAudit := make(map[string]madmin.Status) + mapAudit[audit] = madmin.Status{Status: "Offline"} + auditlogger = append(auditlogger, mapAudit) + } + } + } + return logger, auditlogger +} + +// checkConnection - ping an endpoint , return err in case of no connection +func checkConnection(endpointStr string) error { + u, pErr := xnet.ParseURL(endpointStr) + if pErr != nil { + return pErr + } + if dErr := u.DialHTTP(); dErr != nil { + if urlErr, ok := dErr.(*url.Error); ok { + // To treat "connection refused" errors as un reachable endpoint. + if target.IsConnRefusedErr(urlErr.Err) { + return errors.New("endpoint unreachable, please check your endpoint") + } + } + return dErr + } + return nil +} diff --git a/cmd/admin-server-info.go b/cmd/admin-server-info.go index 59d041f6f..006b712ac 100644 --- a/cmd/admin-server-info.go +++ b/cmd/admin-server-info.go @@ -20,6 +20,7 @@ import ( "net" "net/http" "os" + "strings" "github.com/minio/minio-go/pkg/set" "github.com/minio/minio/pkg/cpu" @@ -115,7 +116,6 @@ func getLocalDrivesPerf(endpointZones EndpointZones, size int64, r *http.Request return madmin.ServerDrivesPerfInfo{ Addr: addr, Perf: dps, - Size: size, } } @@ -186,3 +186,76 @@ func getLocalNetworkInfo(endpointZones EndpointZones, r *http.Request) madmin.Se NetworkInfo: networkHardwares, } } + +// getLocalServerProperty - returns ServerDrivesPerfInfo for only the +// local endpoints from given list of endpoints +func getLocalServerProperty(endpointZones EndpointZones, r *http.Request) madmin.ServerProperties { + var di madmin.Disk + var disks []madmin.Disk + addr := r.Host + if globalIsDistXL { + addr = GetLocalPeer(endpointZones) + } + network := make(map[string]string) + hosts := set.NewStringSet() + for _, ep := range endpointZones { + for _, endpoint := range ep.Endpoints { + + url := strings.Replace(endpoint.URL.String(), endpoint.Path, "", -1) + if url == "" { + url = r.Host + } + hosts.Add(url) + + // Only proceed for local endpoints + if endpoint.IsLocal { + url = fetchAddress(url) + network[url] = "online" + if _, err := os.Stat(endpoint.Path); err != nil { + continue + } + + diInfo, _ := disk.GetInfo(endpoint.Path) + di.State = "ok" + di.DrivePath = endpoint.Path + di.TotalSpace = diInfo.Total + di.UsedSpace = diInfo.Total - diInfo.Free + di.Utilization = float64((diInfo.Total - diInfo.Free) / diInfo.Total * 100) + disks = append(disks, di) + } + } + } + + for host := range hosts { + _, present := network[host] + if !present { + err := checkConnection(host) + host = fetchAddress(host) + if err != nil { + network[host] = "offline" + } else { + network[host] = "online" + } + } + } + + return madmin.ServerProperties{ + State: "ok", + Endpoint: addr, + Uptime: UTCNow().Sub(globalBootTime), + Version: Version, + CommitID: CommitID, + Network: network, + Disks: disks, + } +} + +// Replaces http and https from address +func fetchAddress(address string) string { + if strings.Contains(address, "http://") { + address = strings.Replace(address, "http://", "", -1) + } else if strings.Contains(address, "https://") { + address = strings.Replace(address, "https://", "", -1) + } + return address +} diff --git a/cmd/crypto/kms.go b/cmd/crypto/kms.go index 494c583bf..cc7a097b9 100644 --- a/cmd/crypto/kms.go +++ b/cmd/crypto/kms.go @@ -102,6 +102,9 @@ type KMS interface { // keys this method may behave like a NOP and just return the sealedKey // itself. UpdateKey(keyID string, sealedKey []byte, context Context) (rotatedKey []byte, err error) + + // Returns KMSInfo + Info() (kmsInfo KMSInfo) } type masterKeyKMS struct { @@ -109,6 +112,13 @@ type masterKeyKMS struct { masterKey [32]byte } +// KMSInfo stores the details of KMS +type KMSInfo struct { + Endpoint string + Name string + AuthType string +} + // NewMasterKey returns a basic KMS implementation from a single 256 bit master key. // // The KMS accepts any keyID but binds the keyID and context cryptographically @@ -135,6 +145,15 @@ func (kms *masterKeyKMS) GenerateKey(keyID string, ctx Context) (key [32]byte, s return key, sealedKey, nil } +// KMS is configured directly using master key +func (kms *masterKeyKMS) Info() (info KMSInfo) { + return KMSInfo{ + Endpoint: "", + Name: "", + AuthType: "master-key", + } +} + func (kms *masterKeyKMS) UnsealKey(keyID string, sealedKey []byte, ctx Context) (key [32]byte, err error) { var ( buffer bytes.Buffer diff --git a/cmd/crypto/vault.go b/cmd/crypto/vault.go index 10bff73c5..201beaee6 100644 --- a/cmd/crypto/vault.go +++ b/cmd/crypto/vault.go @@ -195,6 +195,15 @@ func (v *vaultService) KeyID() string { return v.config.Key.Name } +// Returns - vault info +func (v *vaultService) Info() (kmsInfo KMSInfo) { + return KMSInfo{ + Endpoint: v.config.Endpoint, + Name: v.config.Key.Name, + AuthType: v.config.Auth.Type, + } +} + // GenerateKey returns a new plaintext key, generated by the KMS, // and a sealed version of this plaintext key encrypted using the // named key referenced by keyID. It also binds the generated key diff --git a/cmd/notification.go b/cmd/notification.go index b1f4e2bb9..8efc90c63 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -423,51 +423,6 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE return ng.Wait() } -// ServerInfo - calls ServerInfo RPC call on all peers. -func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo { - serverInfo := make([]ServerInfo, len(sys.peerClients)) - - g := errgroup.WithNErrs(len(sys.peerClients)) - for index, client := range sys.peerClients { - if client == nil { - continue - } - index := index - g.Go(func() error { - // Try to fetch serverInfo remotely in three attempts. - for i := 0; i < 3; i++ { - serverInfo[index] = ServerInfo{ - Addr: sys.peerClients[index].host.String(), - } - info, err := sys.peerClients[index].ServerInfo() - if err != nil { - serverInfo[index].Error = err.Error() - } else { - serverInfo[index].Data = &info - } - // Last iteration log the error. - if i == 2 { - return err - } - // Wait for one second and no need wait after last attempt. - if i < 2 { - time.Sleep(1 * time.Second) - } - } - return nil - }, index) - } - for index, err := range g.Wait() { - if err != nil { - addr := sys.peerClients[index].host.String() - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr) - ctx := logger.SetReqInfo(ctx, reqInfo) - logger.LogIf(ctx, err) - } - } - return serverInfo -} - // GetLocks - makes GetLocks RPC call on all peers. func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks { locksResp := make([]*PeerLocks, len(sys.peerClients)) @@ -1213,6 +1168,31 @@ func (sys *NotificationSys) NetworkInfo() []madmin.ServerNetworkHardwareInfo { return reply } +// ServerInfo - calls ServerInfo RPC call on all peers. +func (sys *NotificationSys) ServerInfo() []madmin.ServerProperties { + reply := make([]madmin.ServerProperties, len(sys.peerClients)) + var wg sync.WaitGroup + for i, client := range sys.peerClients { + if client == nil { + continue + } + wg.Add(1) + go func(client *peerRESTClient, idx int) { + defer wg.Done() + info, err := client.ServerInfo() + if err != nil { + info.Endpoint = client.host.String() + info.State = "offline" + } else { + info.State = "ok" + } + reply[idx] = info + }(client, i) + } + wg.Wait() + return reply +} + // NewNotificationSys - creates new notification system object. func NewNotificationSys(endpoints EndpointZones) *NotificationSys { // bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init() diff --git a/cmd/peer-rest-client-target.go b/cmd/peer-rest-client-target.go index 0dc9209d2..7db0827ae 100644 --- a/cmd/peer-rest-client-target.go +++ b/cmd/peer-rest-client-target.go @@ -31,6 +31,11 @@ func (target *PeerRESTClientTarget) ID() event.TargetID { return target.id } +// IsActive - does nothing and available for interface compatibility. +func (target *PeerRESTClientTarget) IsActive() (bool, error) { + return true, nil +} + // Save - Sends event directly without persisting. func (target *PeerRESTClientTarget) Save(eventData event.Event) error { return target.send(eventData) diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index e5719e996..0bf271d92 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -145,7 +145,7 @@ func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) { } // ServerInfo - fetch server information for a remote node. -func (client *peerRESTClient) ServerInfo() (info ServerInfoData, err error) { +func (client *peerRESTClient) ServerInfo() (info madmin.ServerProperties, err error) { respBody, err := client.call(peerRESTMethodServerInfo, nil, nil, -1) if err != nil { return diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 06dcf1223..5addb0e60 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -374,24 +374,6 @@ func (s *peerRESTServer) StartProfilingHandler(w http.ResponseWriter, r *http.Re w.(http.Flusher).Flush() } -// ServerInfoHandler - returns server info. -func (s *peerRESTServer) ServerInfoHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - s.writeErrorResponse(w, errors.New("Invalid request")) - return - } - - ctx := newContext(r, w, "ServerInfo") - info, err := getServerInfo() - if err != nil { - s.writeErrorResponse(w, err) - return - } - - defer w.(http.Flusher).Flush() - logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) -} - // DownloadProflingDataHandler - returns proflied data. func (s *peerRESTServer) DownloadProflingDataHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -452,6 +434,20 @@ func (s *peerRESTServer) NetworkInfoHandler(w http.ResponseWriter, r *http.Reque logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) } +// ServerInfoHandler - returns Server Info +func (s *peerRESTServer) ServerInfoHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + ctx := newContext(r, w, "ServerInfo") + info := getLocalServerProperty(globalEndpoints, r) + + defer w.(http.Flusher).Flush() + logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) +} + // DrivePerfInfoHandler - returns Drive Performance info. func (s *peerRESTServer) DrivePerfInfoHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { diff --git a/pkg/event/target/amqp.go b/pkg/event/target/amqp.go index c8e797214..d48e7c20b 100644 --- a/pkg/event/target/amqp.go +++ b/pkg/event/target/amqp.go @@ -121,6 +121,18 @@ func (target *AMQPTarget) ID() event.TargetID { return target.id } +// IsActive - Return true if target is up and active +func (target *AMQPTarget) IsActive() (bool, error) { + ch, err := target.channel() + if err != nil { + return false, err + } + defer func() { + ch.Close() + }() + return true, nil +} + func (target *AMQPTarget) channel() (*amqp.Channel, error) { var err error var conn *amqp.Connection diff --git a/pkg/event/target/elasticsearch.go b/pkg/event/target/elasticsearch.go index 706dcc040..0fb0139d3 100644 --- a/pkg/event/target/elasticsearch.go +++ b/pkg/event/target/elasticsearch.go @@ -93,16 +93,25 @@ func (target *ElasticsearchTarget) ID() event.TargetID { return target.id } +// IsActive - Return true if target is up and active +func (target *ElasticsearchTarget) IsActive() (bool, error) { + if dErr := target.args.URL.DialHTTP(); dErr != nil { + if xnet.IsNetworkOrHostDown(dErr) { + return false, errNotConnected + } + return false, dErr + } + return true, nil +} + // Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active. func (target *ElasticsearchTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - if dErr := target.args.URL.DialHTTP(); dErr != nil { - if xnet.IsNetworkOrHostDown(dErr) { - return errNotConnected - } - return dErr + _, err := target.IsActive() + if err != nil { + return err } return target.send(eventData) } @@ -167,12 +176,9 @@ func (target *ElasticsearchTarget) Send(eventKey string) error { return err } } - - if dErr := target.args.URL.DialHTTP(); dErr != nil { - if xnet.IsNetworkOrHostDown(dErr) { - return errNotConnected - } - return dErr + _, err = target.IsActive() + if err != nil { + return err } eventData, eErr := target.store.Get(eventKey) diff --git a/pkg/event/target/httpclient.go b/pkg/event/target/httpclient.go index 68420ff01..722ab868b 100644 --- a/pkg/event/target/httpclient.go +++ b/pkg/event/target/httpclient.go @@ -44,6 +44,11 @@ func (target HTTPClientTarget) ID() event.TargetID { return target.id } +// IsActive - does nothing and available for interface compatibility. +func (target *HTTPClientTarget) IsActive() (bool, error) { + return true, nil +} + func (target *HTTPClientTarget) start() { go func() { defer func() { diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index 8afa8132e..3d867b082 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -124,13 +124,22 @@ func (target *KafkaTarget) ID() event.TargetID { return target.id } +// IsActive - Return true if target is up and active +func (target *KafkaTarget) IsActive() (bool, error) { + if !target.args.pingBrokers() { + return false, errNotConnected + } + return true, nil +} + // Save - saves the events to the store which will be replayed when the Kafka connection is active. func (target *KafkaTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - if !target.args.pingBrokers() { - return errNotConnected + _, err := target.IsActive() + if err != nil { + return err } return target.send(eventData) } @@ -162,9 +171,9 @@ func (target *KafkaTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to Kafka. func (target *KafkaTarget) Send(eventKey string) error { var err error - - if !target.args.pingBrokers() { - return errNotConnected + _, err = target.IsActive() + if err != nil { + return err } if target.producer == nil { diff --git a/pkg/event/target/mqtt.go b/pkg/event/target/mqtt.go index 0ad92cab7..671388ea6 100644 --- a/pkg/event/target/mqtt.go +++ b/pkg/event/target/mqtt.go @@ -121,6 +121,14 @@ func (target *MQTTTarget) ID() event.TargetID { return target.id } +// IsActive - Return true if target is up and active +func (target *MQTTTarget) IsActive() (bool, error) { + if !target.client.IsConnectionOpen() { + return false, errNotConnected + } + return true, nil +} + // send - sends an event to the mqtt. func (target *MQTTTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) @@ -144,8 +152,9 @@ func (target *MQTTTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to MQTT. func (target *MQTTTarget) Send(eventKey string) error { // Do not send if the connection is not active. - if !target.client.IsConnectionOpen() { - return errNotConnected + _, err := target.IsActive() + if err != nil { + return err } eventData, err := target.store.Get(eventKey) @@ -174,8 +183,9 @@ func (target *MQTTTarget) Save(eventData event.Event) error { } // Do not send if the connection is not active. - if !target.client.IsConnectionOpen() { - return errNotConnected + _, err := target.IsActive() + if err != nil { + return err } return target.send(eventData) diff --git a/pkg/event/target/mysql.go b/pkg/event/target/mysql.go index 1a216bda0..1c57354da 100644 --- a/pkg/event/target/mysql.go +++ b/pkg/event/target/mysql.go @@ -185,15 +185,24 @@ func (target *MySQLTarget) ID() event.TargetID { return target.id } +// IsActive - Return true if target is up and active +func (target *MySQLTarget) IsActive() (bool, error) { + if err := target.db.Ping(); err != nil { + if IsConnErr(err) { + return false, errNotConnected + } + return false, err + } + return true, nil +} + // Save - saves the events to the store which will be replayed when the SQL connection is active. func (target *MySQLTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - if err := target.db.Ping(); err != nil { - if IsConnErr(err) { - return errNotConnected - } + _, err := target.IsActive() + if err != nil { return err } return target.send(eventData) @@ -244,10 +253,8 @@ func (target *MySQLTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to MySQL. func (target *MySQLTarget) Send(eventKey string) error { - if err := target.db.Ping(); err != nil { - if IsConnErr(err) { - return errNotConnected - } + _, err := target.IsActive() + if err != nil { return err } diff --git a/pkg/event/target/nats.go b/pkg/event/target/nats.go index c840b49af..723960dd5 100644 --- a/pkg/event/target/nats.go +++ b/pkg/event/target/nats.go @@ -197,19 +197,28 @@ func (target *NATSTarget) ID() event.TargetID { return target.id } +// IsActive - Return true if target is up and active +func (target *NATSTarget) IsActive() (bool, error) { + if target.args.Streaming.Enable { + if !target.stanConn.NatsConn().IsConnected() { + return false, errNotConnected + } + } else { + if !target.natsConn.IsConnected() { + return false, errNotConnected + } + } + return true, nil +} + // Save - saves the events to the store which will be replayed when the Nats connection is active. func (target *NATSTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - if target.args.Streaming.Enable { - if !target.stanConn.NatsConn().IsConnected() { - return errNotConnected - } - } else { - if !target.natsConn.IsConnected() { - return errNotConnected - } + _, err := target.IsActive() + if err != nil { + return err } return target.send(eventData) } diff --git a/pkg/event/target/nsq.go b/pkg/event/target/nsq.go index 55718bca9..fa729e5e8 100644 --- a/pkg/event/target/nsq.go +++ b/pkg/event/target/nsq.go @@ -100,16 +100,25 @@ func (target *NSQTarget) ID() event.TargetID { return target.id } +// IsActive - Return true if target is up and active +func (target *NSQTarget) IsActive() (bool, error) { + if err := target.producer.Ping(); err != nil { + // To treat "connection refused" errors as errNotConnected. + if IsConnRefusedErr(err) { + return false, errNotConnected + } + return false, err + } + return true, nil +} + // Save - saves the events to the store which will be replayed when the nsq connection is active. func (target *NSQTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - if err := target.producer.Ping(); err != nil { - // To treat "connection refused" errors as errNotConnected. - if IsConnRefusedErr(err) { - return errNotConnected - } + _, err := target.IsActive() + if err != nil { return err } return target.send(eventData) @@ -133,12 +142,8 @@ func (target *NSQTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to NSQ. func (target *NSQTarget) Send(eventKey string) error { - - if err := target.producer.Ping(); err != nil { - // To treat "connection refused" errors as errNotConnected. - if IsConnRefusedErr(err) { - return errNotConnected - } + _, err := target.IsActive() + if err != nil { return err } diff --git a/pkg/event/target/postgresql.go b/pkg/event/target/postgresql.go index c9a5d80a5..b19aad74b 100644 --- a/pkg/event/target/postgresql.go +++ b/pkg/event/target/postgresql.go @@ -183,15 +183,24 @@ func (target *PostgreSQLTarget) ID() event.TargetID { return target.id } +// IsActive - Return true if target is up and active +func (target *PostgreSQLTarget) IsActive() (bool, error) { + if err := target.db.Ping(); err != nil { + if IsConnErr(err) { + return false, errNotConnected + } + return false, err + } + return true, nil +} + // Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active. func (target *PostgreSQLTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - if err := target.db.Ping(); err != nil { - if IsConnErr(err) { - return errNotConnected - } + _, err := target.IsActive() + if err != nil { return err } return target.send(eventData) @@ -245,14 +254,10 @@ func (target *PostgreSQLTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to PostgreSQL. func (target *PostgreSQLTarget) Send(eventKey string) error { - - if err := target.db.Ping(); err != nil { - if IsConnErr(err) { - return errNotConnected - } + _, err := target.IsActive() + if err != nil { return err } - if !target.firstPing { if err := target.executeStmts(); err != nil { if IsConnErr(err) { diff --git a/pkg/event/target/redis.go b/pkg/event/target/redis.go index d0e8e10d0..3a1b321ff 100644 --- a/pkg/event/target/redis.go +++ b/pkg/event/target/redis.go @@ -125,11 +125,8 @@ func (target *RedisTarget) ID() event.TargetID { return target.id } -// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active. -func (target *RedisTarget) Save(eventData event.Event) error { - if target.store != nil { - return target.store.Put(eventData) - } +// IsActive - Return true if target is up and active +func (target *RedisTarget) IsActive() (bool, error) { conn := target.pool.Get() defer func() { cErr := conn.Close() @@ -138,9 +135,21 @@ func (target *RedisTarget) Save(eventData event.Event) error { _, pingErr := conn.Do("PING") if pingErr != nil { if IsConnRefusedErr(pingErr) { - return errNotConnected + return false, errNotConnected } - return pingErr + return false, pingErr + } + return true, nil +} + +// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active. +func (target *RedisTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + _, err := target.IsActive() + if err != nil { + return err } return target.send(eventData) } diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index 49cb47ba4..292799957 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -93,20 +93,29 @@ func (target WebhookTarget) ID() event.TargetID { return target.id } +// IsActive - Return true if target is up and active +func (target *WebhookTarget) IsActive() (bool, error) { + u, pErr := xnet.ParseHTTPURL(target.args.Endpoint.String()) + if pErr != nil { + return false, pErr + } + if dErr := u.DialHTTP(); dErr != nil { + if xnet.IsNetworkOrHostDown(dErr) { + return false, errNotConnected + } + return false, dErr + } + return true, nil +} + // Save - saves the events to the store if queuestore is configured, which will be replayed when the wenhook connection is active. func (target *WebhookTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - u, pErr := xnet.ParseHTTPURL(target.args.Endpoint.String()) - if pErr != nil { - return pErr - } - if dErr := u.DialHTTP(); dErr != nil { - if xnet.IsNetworkOrHostDown(dErr) { - return errNotConnected - } - return dErr + _, err := target.IsActive() + if err != nil { + return err } return target.send(eventData) } @@ -153,17 +162,10 @@ func (target *WebhookTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to webhook. func (target *WebhookTarget) Send(eventKey string) error { - u, pErr := xnet.ParseHTTPURL(target.args.Endpoint.String()) - if pErr != nil { - return pErr + _, err := target.IsActive() + if err != nil { + return err } - if dErr := u.DialHTTP(); dErr != nil { - if xnet.IsNetworkOrHostDown(dErr) { - return errNotConnected - } - return dErr - } - eventData, eErr := target.store.Get(eventKey) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() diff --git a/pkg/event/targetlist.go b/pkg/event/targetlist.go index 2912ba199..aea9d10a8 100644 --- a/pkg/event/targetlist.go +++ b/pkg/event/targetlist.go @@ -24,6 +24,7 @@ import ( // Target - event target interface type Target interface { ID() TargetID + IsActive() (bool, error) Save(Event) error Send(string) error Close() error @@ -130,6 +131,13 @@ func (list *TargetList) List() []TargetID { return keys } +// TargetMap - returns available targets. +func (list *TargetList) TargetMap() map[TargetID]Target { + list.RLock() + defer list.RUnlock() + return list.targets +} + // Send - sends events to targets identified by target IDs. func (list *TargetList) Send(event Event, targetIDs ...TargetID) <-chan TargetIDErr { errCh := make(chan TargetIDErr) diff --git a/pkg/event/targetlist_test.go b/pkg/event/targetlist_test.go index 149973cf1..aa3815db3 100644 --- a/pkg/event/targetlist_test.go +++ b/pkg/event/targetlist_test.go @@ -67,6 +67,10 @@ func (target ExampleTarget) Close() error { return nil } +func (target ExampleTarget) IsActive() (bool, error) { + return false, errors.New("not connected to target server/service") +} + func TestTargetListAdd(t *testing.T) { targetListCase1 := NewTargetList() diff --git a/pkg/madmin/info-commands.go b/pkg/madmin/info-commands.go index a5111d83a..c7a91df9d 100644 --- a/pkg/madmin/info-commands.go +++ b/pkg/madmin/info-commands.go @@ -111,91 +111,6 @@ func (d1 BackendDisks) Merge(d2 BackendDisks) BackendDisks { return d2 } -// ServerProperties holds some of the server's information such as uptime, -// version, region, .. -type ServerProperties struct { - Uptime time.Duration `json:"uptime"` - Version string `json:"version"` - CommitID string `json:"commitID"` - DeploymentID string `json:"deploymentID"` - Region string `json:"region"` - SQSARN []string `json:"sqsARN"` -} - -// ServerConnStats holds network information -type ServerConnStats struct { - TotalInputBytes uint64 `json:"transferred"` - TotalOutputBytes uint64 `json:"received"` -} - -// ServerHTTPMethodStats holds total number of HTTP operations from/to the server, -// including the average duration the call was spent. -type ServerHTTPMethodStats struct { - Count uint64 `json:"count"` - AvgDuration string `json:"avgDuration"` -} - -// ServerHTTPStats holds all type of http operations performed to/from the server -// including their average execution time. -type ServerHTTPStats struct { - TotalHEADStats ServerHTTPMethodStats `json:"totalHEADs"` - SuccessHEADStats ServerHTTPMethodStats `json:"successHEADs"` - TotalGETStats ServerHTTPMethodStats `json:"totalGETs"` - SuccessGETStats ServerHTTPMethodStats `json:"successGETs"` - TotalPUTStats ServerHTTPMethodStats `json:"totalPUTs"` - SuccessPUTStats ServerHTTPMethodStats `json:"successPUTs"` - TotalPOSTStats ServerHTTPMethodStats `json:"totalPOSTs"` - SuccessPOSTStats ServerHTTPMethodStats `json:"successPOSTs"` - TotalDELETEStats ServerHTTPMethodStats `json:"totalDELETEs"` - SuccessDELETEStats ServerHTTPMethodStats `json:"successDELETEs"` -} - -// ServerInfoData holds storage, connections and other -// information of a given server -type ServerInfoData struct { - StorageInfo StorageInfo `json:"storage"` - ConnStats ServerConnStats `json:"network"` - HTTPStats ServerHTTPStats `json:"http"` - Properties ServerProperties `json:"server"` -} - -// ServerInfo holds server information result of one node -type ServerInfo struct { - Error string `json:"error"` - Addr string `json:"addr"` - Data *ServerInfoData `json:"data"` -} - -// ServerInfo - Connect to a minio server and call Server Info Management API -// to fetch server's information represented by ServerInfo structure -func (adm *AdminClient) ServerInfo() ([]ServerInfo, error) { - resp, err := adm.executeMethod("GET", requestData{relPath: adminAPIPrefix + "/info"}) - defer closeResponse(resp) - if err != nil { - return nil, err - } - - // Check response http status code - if resp.StatusCode != http.StatusOK { - return nil, httpRespToErrorResponse(resp) - } - - // Unmarshal the server's json response - var serversInfo []ServerInfo - - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - err = json.Unmarshal(respBytes, &serversInfo) - if err != nil { - return nil, err - } - - return serversInfo, nil -} - // StorageInfo - Connect to a minio server and call Storage Info Management API // to fetch server's information represented by StorageInfo structure func (adm *AdminClient) StorageInfo() (StorageInfo, error) { @@ -409,3 +324,156 @@ func (adm *AdminClient) NetPerfInfo(size int) (map[string][]NetPerfInfo, error) return info, nil } + +// InfoMessage container to hold server admin related information. +type InfoMessage struct { + Mode string `json:"mode"` + Domain []string `json:"domain,omitempty"` + Region string `json:"region,omitempty"` + SQSARN []string `json:"sqsARN,omitempty"` + DeploymentID string `json:"deploymentID"` + Buckets Buckets `json:"buckets"` + Objects Objects `json:"objects"` + Usage Usage `json:"usage"` + Services Services `json:"services"` + Backend interface{} `json:"backend"` + Servers []ServerProperties `json:"servers"` +} + +// Services contains different services information +type Services struct { + Vault Vault `json:"vault"` + LDAP LDAP `json:"ldap"` + Logger []Logger `json:"logger,omitempty"` + Audit []Audit `json:"audit,omitempty"` + Notifications []map[string][]TargetIDStatus `json:"notifications"` +} + +// Buckets contains the number of buckets +type Buckets struct { + Count int `json:"count"` +} + +// Objects contains the number of objects +type Objects struct { + Count int `json:"count"` +} + +// Usage contains the tottal size used +type Usage struct { + Size uint64 `json:"size"` +} + +// Vault - Fetches the Vault status +type Vault struct { + Status string `json:"status,omitempty"` + Encrypt string `json:"encryp,omitempty"` + Decrypt string `json:"decrypt,omitempty"` + Update string `json:"update,omitempty"` +} + +// LDAP contains ldap status +type LDAP struct { + Status string `json:"status,omitempty"` +} + +// Status of endpoint +type Status struct { + Status string `json:"status"` +} + +// Audit contains audit logger status +type Audit map[string]Status + +// Logger contains logger status +type Logger map[string]Status + +// TargetIDStatus containsid and status +type TargetIDStatus map[string]Status + +// backendType - indicates the type of backend storage +type backendType string + +const ( + // FsType - Backend is FS Type + FsType = backendType("FS") + // ErasureType - Backend is Erasure type + ErasureType = backendType("Erasure") +) + +// FsBackend contains specific FS storage information +type FsBackend struct { + Type backendType `json:"backendType"` +} + +// XlBackend contains specific erasure storage information +type XlBackend struct { + Type backendType `json:"backendType"` + OnlineDisks int `json:"onlineDisks"` + OfflineDisks int `json:"offlineDisks"` + // Data disks for currently configured Standard storage class. + StandardSCData int `json:"standardSCData"` + // Parity disks for currently configured Standard storage class. + StandardSCParity int `json:"standardSCParity"` + // Data disks for currently configured Reduced Redundancy storage class. + RRSCData int `json:"rrSCData"` + // Parity disks for currently configured Reduced Redundancy storage class. + RRSCParity int `json:"rrSCParity"` +} + +// ServerProperties holds server information +type ServerProperties struct { + State string `json:"state"` + Endpoint string `json:"endpoint"` + Uptime time.Duration `json:"uptime"` + Version string `json:"version"` + CommitID string `json:"commitID"` + Network map[string]string `json:"network"` + Disks []Disk `json:"disks"` +} + +// Disk holds Disk information +type Disk struct { + DrivePath string `json:"path"` + State string `json:"state"` + UUID string `json:"uuid,omitempty"` + Model string `json:"model,omitempty"` + TotalSpace uint64 `json:"totalspace"` + UsedSpace uint64 `json:"usedspace"` + ReadThroughput float64 `json:"readthroughput,omitempty"` + WriteThroughPut float64 `json:"writethroughput,omitempty"` + ReadLatency float64 `json:"readlatency,omitempty"` + WriteLatency float64 `json:"writelatency,omitempty"` + Utilization float64 `json:"utilization,omitempty"` +} + +// ServerInfo - Connect to a minio server and call Server Admin Info Management API +// to fetch server's information represented by infoMessage structure +func (adm *AdminClient) ServerInfo() (InfoMessage, error) { + + resp, err := adm.executeMethod("GET", requestData{relPath: adminAPIPrefix + "/info"}) + defer closeResponse(resp) + if err != nil { + return InfoMessage{}, err + } + + // Check response http status code + if resp.StatusCode != http.StatusOK { + return InfoMessage{}, httpRespToErrorResponse(resp) + } + + // Unmarshal the server's json response + var message InfoMessage + + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return InfoMessage{}, err + } + + err = json.Unmarshal(respBytes, &message) + if err != nil { + return InfoMessage{}, err + } + + return message, nil +}