mirror of
https://github.com/minio/minio.git
synced 2024-12-24 22:25:54 -05:00
fix: relax metadata checks for healing (#19165)
we should do this to ensure that we focus on data healing as primary focus, fixing metadata as part of healing must be done but making data available is the main focus. the main reason is metadata inconsistencies can cause data availability issues, which must be avoided at all cost. will be bringing in an additional healing mechanism that involves "metadata-only" heal, for now we do not expect to have these checks. continuation of #19154 Bonus: add a pro-active healthcheck to perform a connection
This commit is contained in:
parent
ef06644799
commit
c599c11e70
@ -27,6 +27,7 @@ import (
|
||||
|
||||
"github.com/minio/minio/internal/amztime"
|
||||
"github.com/minio/minio/internal/bucket/replication"
|
||||
"github.com/minio/minio/internal/crypto"
|
||||
"github.com/minio/minio/internal/hash/sha256"
|
||||
xhttp "github.com/minio/minio/internal/http"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
@ -300,17 +301,23 @@ func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.
|
||||
fmt.Fprintf(h, "%v", meta.Erasure.Distribution)
|
||||
}
|
||||
|
||||
if meta.IsRemote() {
|
||||
// ILM transition fields
|
||||
fmt.Fprint(h, meta.TransitionStatus)
|
||||
fmt.Fprint(h, meta.TransitionTier)
|
||||
fmt.Fprint(h, meta.TransitionedObjName)
|
||||
fmt.Fprint(h, meta.TransitionVersionID)
|
||||
}
|
||||
|
||||
// Server-side replication fields
|
||||
fmt.Fprintf(h, "%v", meta.MarkDeleted)
|
||||
fmt.Fprint(h, meta.Metadata[string(meta.ReplicationState.ReplicaStatus)])
|
||||
fmt.Fprint(h, meta.Metadata[meta.ReplicationState.ReplicationStatusInternal])
|
||||
fmt.Fprint(h, meta.Metadata[meta.ReplicationState.VersionPurgeStatusInternal])
|
||||
// If metadata says encrypted, ask for it in quorum.
|
||||
if etyp, ok := crypto.IsEncrypted(meta.Metadata); ok {
|
||||
fmt.Fprint(h, etyp)
|
||||
}
|
||||
|
||||
// If compressed, look for compressed FileInfo only
|
||||
if meta.IsCompressed() {
|
||||
fmt.Fprint(h, meta.Metadata[ReservedMetadataPrefix+"compression"])
|
||||
}
|
||||
|
||||
metaHashes[i] = hex.EncodeToString(h.Sum(nil))
|
||||
h.Reset()
|
||||
|
@ -19,6 +19,8 @@ package cmd
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/crypto"
|
||||
)
|
||||
|
||||
//go:generate msgp -file=$GOFILE
|
||||
@ -281,10 +283,15 @@ func (fi FileInfo) ReadQuorum(dquorum int) int {
|
||||
|
||||
// Equals checks if fi(FileInfo) matches ofi(FileInfo)
|
||||
func (fi FileInfo) Equals(ofi FileInfo) (ok bool) {
|
||||
if !fi.MetadataEquals(ofi) {
|
||||
typ1, ok1 := crypto.IsEncrypted(fi.Metadata)
|
||||
typ2, ok2 := crypto.IsEncrypted(ofi.Metadata)
|
||||
if ok1 != ok2 {
|
||||
return false
|
||||
}
|
||||
if !fi.ReplicationInfoEquals(ofi) {
|
||||
if typ1 != typ2 {
|
||||
return false
|
||||
}
|
||||
if fi.IsCompressed() != ofi.IsCompressed() {
|
||||
return false
|
||||
}
|
||||
if !fi.TransitionInfoEquals(ofi) {
|
||||
@ -311,6 +318,12 @@ func (fi FileInfo) GetDataDir() string {
|
||||
return fi.DataDir
|
||||
}
|
||||
|
||||
// IsCompressed returns true if the object is marked as compressed.
|
||||
func (fi FileInfo) IsCompressed() bool {
|
||||
_, ok := fi.Metadata[ReservedMetadataPrefix+"compression"]
|
||||
return ok
|
||||
}
|
||||
|
||||
// InlineData returns true if object contents are inlined alongside its metadata.
|
||||
func (fi FileInfo) InlineData() bool {
|
||||
_, ok := fi.Metadata[ReservedMetadataPrefixLower+"inline-data"]
|
||||
|
@ -388,7 +388,7 @@ func NewClient(uu *url.URL, tr http.RoundTripper, newAuthToken func(aud string)
|
||||
|
||||
// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
|
||||
// except custom DialContext and TLSClientConfig.
|
||||
return &Client{
|
||||
clnt := &Client{
|
||||
httpClient: &http.Client{Transport: tr},
|
||||
url: u,
|
||||
lastErr: err,
|
||||
@ -400,6 +400,11 @@ func NewClient(uu *url.URL, tr http.RoundTripper, newAuthToken func(aud string)
|
||||
HealthCheckReconnectUnit: 200 * time.Millisecond,
|
||||
HealthCheckTimeout: time.Second,
|
||||
}
|
||||
if clnt.HealthCheckFn != nil {
|
||||
// make connection pre-emptively.
|
||||
go clnt.HealthCheckFn()
|
||||
}
|
||||
return clnt
|
||||
}
|
||||
|
||||
// IsOnline returns whether the client is likely to be online.
|
||||
@ -441,15 +446,7 @@ func exponentialBackoffWait(r *rand.Rand, unit, cap time.Duration) func(uint) ti
|
||||
}
|
||||
}
|
||||
|
||||
// MarkOffline - will mark a client as being offline and spawns
|
||||
// a goroutine that will attempt to reconnect if HealthCheckFn is set.
|
||||
// returns true if the node changed state from online to offline
|
||||
func (c *Client) MarkOffline(err error) bool {
|
||||
c.Lock()
|
||||
c.lastErr = err
|
||||
c.lastErrTime = time.Now()
|
||||
atomic.StoreInt64(&c.lastConn, time.Now().UnixNano())
|
||||
c.Unlock()
|
||||
func (c *Client) runHealthCheck() bool {
|
||||
// Start goroutine that will attempt to reconnect.
|
||||
// If server is already trying to reconnect this will have no effect.
|
||||
if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) {
|
||||
@ -482,3 +479,16 @@ func (c *Client) MarkOffline(err error) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// MarkOffline - will mark a client as being offline and spawns
|
||||
// a goroutine that will attempt to reconnect if HealthCheckFn is set.
|
||||
// returns true if the node changed state from online to offline
|
||||
func (c *Client) MarkOffline(err error) bool {
|
||||
c.Lock()
|
||||
c.lastErr = err
|
||||
c.lastErrTime = time.Now()
|
||||
atomic.StoreInt64(&c.lastConn, time.Now().UnixNano())
|
||||
c.Unlock()
|
||||
|
||||
return c.runHealthCheck()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user