fix: re-implement cluster healthcheck (#10101)
@@ -1633,9 +1633,10 @@ func (s *erasureSets) GetMetrics(ctx context.Context) (*Metrics, error) {
 	return &Metrics{}, NotImplemented{}
 }
 
-// IsReady - Returns true if atleast n/2 disks (read quorum) are online
-func (s *erasureSets) IsReady(_ context.Context) bool {
-	return false
+// Health shouldn't be called directly - will panic
+func (s *erasureSets) Health(ctx context.Context, _ HealthOptions) HealthResult {
+	logger.CriticalIf(ctx, NotImplemented{})
+	return HealthResult{}
 }
 
 // maintainMRFList gathers the list of successful partial uploads
@@ -2007,29 +2007,49 @@ func (z *erasureZones) getZoneAndSet(id string) (int, int, error) {
 	return 0, 0, fmt.Errorf("DiskID(%s) %w", id, errDiskNotFound)
 }
 
-// IsReady - Returns true, when all the erasure sets are writable.
-func (z *erasureZones) IsReady(ctx context.Context) bool {
+// HealthOptions takes input options to return sepcific information
+type HealthOptions struct {
+	Maintenance bool
+}
+
+// HealthResult returns the current state of the system, also
+// additionally with any specific heuristic information which
+// was queried
+type HealthResult struct {
+	Healthy       bool
+	ZoneID, SetID int
+	WriteQuorum   int
+}
+
+// Health - returns current status of the object layer health,
+// provides if write access exists across sets, additionally
+// can be used to query scenarios if health may be lost
+// if this node is taken down by an external orchestrator.
+func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthResult {
 	erasureSetUpCount := make([][]int, len(z.zones))
 	for i := range z.zones {
 		erasureSetUpCount[i] = make([]int, len(z.zones[i].sets))
 	}
 
 	diskIDs := globalNotificationSys.GetLocalDiskIDs(ctx)
+	if !opts.Maintenance {
+		diskIDs = append(diskIDs, getLocalDiskIDs(z))
+	}
 
-	diskIDs = append(diskIDs, getLocalDiskIDs(z)...)
-
-	for _, id := range diskIDs {
-		zoneIdx, setIdx, err := z.getZoneAndSet(id)
-		if err != nil {
-			logger.LogIf(ctx, err)
-			continue
+	for _, localDiskIDs := range diskIDs {
+		for _, id := range localDiskIDs {
+			zoneIdx, setIdx, err := z.getZoneAndSet(id)
+			if err != nil {
+				logger.LogIf(ctx, err)
+				continue
+			}
+			erasureSetUpCount[zoneIdx][setIdx]++
 		}
-		erasureSetUpCount[zoneIdx][setIdx]++
 	}
 
 	for zoneIdx := range erasureSetUpCount {
 		parityDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
-		diskCount := len(z.zones[zoneIdx].format.Erasure.Sets[0])
+		diskCount := z.zones[zoneIdx].drivesPerSet
 		if parityDrives == 0 {
 			parityDrives = getDefaultParityBlocks(diskCount)
 		}
@@ -2042,11 +2062,18 @@ func (z *erasureZones) IsReady(ctx context.Context) bool {
 			if erasureSetUpCount[zoneIdx][setIdx] < writeQuorum {
 				logger.LogIf(ctx, fmt.Errorf("Write quorum lost on zone: %d, set: %d, expected write quorum: %d",
 					zoneIdx, setIdx, writeQuorum))
-				return false
+				return HealthResult{
+					Healthy:     false,
+					ZoneID:      zoneIdx,
+					SetID:       setIdx,
+					WriteQuorum: writeQuorum,
+				}
 			}
 		}
 	}
-	return true
+	return HealthResult{
+		Healthy: true,
+	}
 }
 
 // PutObjectTags - replace or add tags to an existing object
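For reference, a standalone sketch of the write-quorum arithmetic this check compares against. The dataDrives/writeQuorum lines sit in context elided between the two hunks above, so the rule used here (quorum equals the data drives, plus one when data and parity counts are equal) and the half-of-the-drives default parity are assumptions standing in for getDefaultParityBlocks and the elided code, not copies of them.

package main

import "fmt"

// writeQuorum sketches the threshold the Health check above compares
// erasureSetUpCount against. The diskCount/2 default parity and the +1
// tie-break when data == parity are assumptions; MinIO's own helpers and
// the elided quorum lines may differ.
func writeQuorum(diskCount, parityDrives int) int {
	if parityDrives == 0 {
		parityDrives = diskCount / 2 // assumed default parity
	}
	dataDrives := diskCount - parityDrives
	quorum := dataDrives
	if dataDrives == parityDrives {
		quorum++ // break the tie so half the drives alone cannot hold quorum
	}
	return quorum
}

func main() {
	for _, drives := range []int{4, 8, 16} {
		fmt.Printf("drives per set: %2d  assumed write quorum: %d\n", drives, writeQuorum(drives, 0))
	}
}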
@@ -391,8 +391,8 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
 	return nil
 }
 
-// IsReady - shouldn't be called will panic.
-func (er erasureObjects) IsReady(ctx context.Context) bool {
+// Health shouldn't be called directly - will panic
+func (er erasureObjects) Health(ctx context.Context, _ HealthOptions) HealthResult {
 	logger.CriticalIf(ctx, NotImplemented{})
-	return true
+	return HealthResult{}
 }
cmd/fs-v1.go
@@ -1557,11 +1557,12 @@ func (fs *FSObjects) IsTaggingSupported() bool {
 	return true
 }
 
-// IsReady - Check if the backend disk is ready to accept traffic.
-func (fs *FSObjects) IsReady(_ context.Context) bool {
+// Health returns health of the object layer
+func (fs *FSObjects) Health(ctx context.Context, opts HealthOptions) HealthResult {
 	if _, err := os.Stat(fs.fsPath); err != nil {
-		return false
+		return HealthResult{}
 	}
+	return HealthResult{
+		Healthy: newObjectLayerFn() != nil,
+	}
-
-	return newObjectLayerFn() != nil
 }
@@ -250,7 +250,7 @@ func (a GatewayUnsupported) IsCompressionSupported() bool {
 	return false
 }
 
-// IsReady - No Op.
-func (a GatewayUnsupported) IsReady(_ context.Context) bool {
-	return false
+// Health - No Op.
+func (a GatewayUnsupported) Health(_ context.Context, _ HealthOptions) HealthResult {
+	return HealthResult{}
 }
@@ -1436,8 +1436,3 @@ func (a *azureObjects) DeleteBucketPolicy(ctx context.Context, bucket string) er
 func (a *azureObjects) IsCompressionSupported() bool {
 	return false
 }
-
-// IsReady returns whether the layer is ready to take requests.
-func (a *azureObjects) IsReady(ctx context.Context) bool {
-	return minio.IsBackendOnline(ctx, a.httpClient, a.endpoint)
-}
@@ -1508,8 +1508,3 @@ func (l *gcsGateway) DeleteBucketPolicy(ctx context.Context, bucket string) erro
 func (l *gcsGateway) IsCompressionSupported() bool {
 	return false
 }
-
-// IsReady returns whether the layer is ready to take requests.
-func (l *gcsGateway) IsReady(ctx context.Context) bool {
-	return minio.IsBackendOnline(ctx, l.httpClient, "https://storage.googleapis.com")
-}
@@ -786,9 +786,3 @@ func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object,
 	}
 	return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)), bucket, object, uploadID)
 }
-
-// IsReady returns whether the layer is ready to take requests.
-func (n *hdfsObjects) IsReady(ctx context.Context) bool {
-	si, _ := n.StorageInfo(ctx, false)
-	return si.Backend.GatewayOnline
-}
@@ -121,12 +121,6 @@ type nasObjects struct {
 	minio.ObjectLayer
 }
 
-// IsReady returns whether the layer is ready to take requests.
-func (n *nasObjects) IsReady(ctx context.Context) bool {
-	si, _ := n.StorageInfo(ctx, false)
-	return si.Backend.GatewayOnline
-}
-
 func (n *nasObjects) IsTaggingSupported() bool {
 	return true
 }
@@ -755,11 +755,6 @@ func (l *s3Objects) IsEncryptionSupported() bool {
 	return minio.GlobalKMS != nil || len(minio.GlobalGatewaySSE) > 0
 }
 
-// IsReady returns whether the layer is ready to take requests.
-func (l *s3Objects) IsReady(ctx context.Context) bool {
-	return minio.IsBackendOnline(ctx, l.HTTPClient, l.Client.EndpointURL().String())
-}
-
 func (l *s3Objects) IsTaggingSupported() bool {
 	return true
 }
@@ -35,8 +35,17 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) {
 	ctx, cancel := context.WithTimeout(ctx, globalAPIConfig.getReadyDeadline())
 	defer cancel()
 
-	if !objLayer.IsReady(ctx) {
-		writeResponse(w, http.StatusServiceUnavailable, nil, mimeNone)
+	opts := HealthOptions{Maintenance: r.URL.Query().Get("maintenance") == "true"}
+	result := objLayer.Health(ctx, opts)
+	if !result.Healthy {
+		// As a maintenance call we are purposefully asked to be taken
+		// down, this is for orchestrators to know if we can safely
+		// take this server down, return appropriate error.
+		if opts.Maintenance {
+			writeResponse(w, http.StatusPreconditionFailed, nil, mimeNone)
+		} else {
+			writeResponse(w, http.StatusServiceUnavailable, nil, mimeNone)
+		}
 		return
 	}
 
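Below is a hedged sketch of how an orchestrator-side probe might consume this handler. The endpoint path /minio/health/cluster and the host/port are assumed deployment details; only the maintenance query parameter and the two failure codes (412 for a maintenance probe, 503 for a plain probe) come from the handler above.

package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	// maintenance=true asks whether this node can be taken down safely;
	// the URL itself is an assumption, not part of the diff.
	const probe = "http://127.0.0.1:9000/minio/health/cluster?maintenance=true"

	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(probe)
	if err != nil {
		fmt.Println("probe failed:", err)
		return
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		fmt.Println("cluster healthy, node can be drained")
	case http.StatusPreconditionFailed:
		// Returned for maintenance probes when taking this node down
		// would drop a set below write quorum.
		fmt.Println("do not take this node down")
	case http.StatusServiceUnavailable:
		// Returned for plain (non-maintenance) probes when quorum is lost.
		fmt.Println("cluster unhealthy")
	default:
		fmt.Println("unexpected status:", resp.Status)
	}
}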
@@ -1164,26 +1164,21 @@ func (sys *NotificationSys) ServerInfo() []madmin.ServerProperties {
 }
 
 // GetLocalDiskIDs - return disk ids of the local disks of the peers.
-func (sys *NotificationSys) GetLocalDiskIDs(ctx context.Context) []string {
-	var diskIDs []string
-	var mu sync.Mutex
-
+func (sys *NotificationSys) GetLocalDiskIDs(ctx context.Context) (localDiskIDs [][]string) {
+	localDiskIDs = make([][]string, len(sys.peerClients))
 	var wg sync.WaitGroup
-	for _, client := range sys.peerClients {
+	for idx, client := range sys.peerClients {
 		if client == nil {
 			continue
 		}
 		wg.Add(1)
-		go func(client *peerRESTClient) {
+		go func(idx int, client *peerRESTClient) {
 			defer wg.Done()
-			ids := client.GetLocalDiskIDs(ctx)
-			mu.Lock()
-			diskIDs = append(diskIDs, ids...)
-			mu.Unlock()
-		}(client)
+			localDiskIDs[idx] = client.GetLocalDiskIDs(ctx)
+		}(idx, client)
 	}
 	wg.Wait()
-	return diskIDs
+	return localDiskIDs
 }
 
 // NewNotificationSys - creates new notification system object.
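The GetLocalDiskIDs rewrite swaps a mutex-guarded append for a slice preallocated with one slot per peer, so each goroutine writes only its own index and no lock is needed, and the per-peer grouping the new [][]string return type expresses is preserved. A minimal standalone sketch of that pattern (the peer names and the fetch function are invented stand-ins for the peer REST clients):

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Hypothetical peers and fetch function standing in for
	// sys.peerClients and client.GetLocalDiskIDs.
	peers := []string{"peer-0", "peer-1", "peer-2"}
	fetch := func(peer string) []string {
		return []string{peer + "-disk-a", peer + "-disk-b"}
	}

	results := make([][]string, len(peers)) // one slot per peer, sized up front
	var wg sync.WaitGroup
	for idx, peer := range peers {
		wg.Add(1)
		go func(idx int, peer string) {
			defer wg.Done()
			results[idx] = fetch(peer) // each goroutine writes a distinct index: no data race
		}(idx, peer)
	}
	wg.Wait()
	fmt.Println(results)
}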
@@ -133,8 +133,8 @@ type ObjectLayer interface {
 	// Backend related metrics
 	GetMetrics(ctx context.Context) (*Metrics, error)
 
-	// Check Readiness
-	IsReady(ctx context.Context) bool
+	// Returns health of the backend
+	Health(ctx context.Context, opts HealthOptions) HealthResult
 
 	// ObjectTagging operations
 	PutObjectTags(context.Context, string, string, string, ObjectOptions) error
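Finally, a hypothetical caller-side snippet showing what the reshaped interface reports compared with the old boolean. The helper name, the objAPI parameter and the printing are invented for illustration, and it assumes the usual package cmd context with an "fmt" import; only Health, HealthOptions and HealthResult come from this change.

// reportHealth is a hypothetical helper: it surfaces which zone/set lost
// write quorum instead of the old yes/no answer from IsReady.
func reportHealth(ctx context.Context, objAPI ObjectLayer) bool {
	result := objAPI.Health(ctx, HealthOptions{Maintenance: false})
	if !result.Healthy {
		fmt.Printf("write quorum lost: zone=%d set=%d required=%d\n",
			result.ZoneID, result.SetID, result.WriteQuorum)
	}
	return result.Healthy
}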