enable xattr capture by default (#18911)

- healing must not set the write xattr,
  because updating it is the job of active
  healing; what we need to preserve are the
  permanent deletes.

- remove the older env var for drive monitoring and
  enable it by default through a single global value.
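
For context, the delete counter is captured as a drive xattr so it survives
restarts; below is a minimal sketch of that idea (the attribute name
"user.total_deletes" and the helper names are illustrative assumptions, not
the actual setDeleteAttribute/getDeleteAttribute implementation):

    // Sketch only: persist a delete counter as an xattr on the drive root
    // (Linux xattr syscalls).
    package main

    import (
        "encoding/binary"
        "fmt"

        "golang.org/x/sys/unix"
    )

    const deleteAttr = "user.total_deletes" // assumed attribute name

    // setDeleteCount stores the running delete count on the drive root path.
    func setDeleteCount(drivePath string, deletes uint64) error {
        buf := make([]byte, 8)
        binary.LittleEndian.PutUint64(buf, deletes)
        return unix.Setxattr(drivePath, deleteAttr, buf, 0)
    }

    // getDeleteCount reads the count back; a missing xattr means zero.
    func getDeleteCount(drivePath string) uint64 {
        buf := make([]byte, 8)
        n, err := unix.Getxattr(drivePath, deleteAttr, buf)
        if err != nil || n != 8 {
            return 0
        }
        return binary.LittleEndian.Uint64(buf)
    }

    func main() {
        _ = setDeleteCount("/mnt/drive1", 42)
        fmt.Println("total_deletes:", getDeleteCount("/mnt/drive1"))
    }

The write xattr, in contrast, is deliberately left for active healing to
refresh, which is why the HealFormat hunk below only seeds the delete side.
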
Harshavardhana 2024-01-29 23:03:58 -08:00 committed by GitHub
parent 2ddf2ca934
commit 486e2e48ea
3 changed files with 21 additions and 38 deletions


@@ -1136,15 +1136,14 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
xldisk, ok := disk.(*xlStorageDiskIDCheck)
if ok {
if driveQuorum {
commonWrites, commonDeletes := calcCommonWritesDeletes(currentDisksInfo[m], (s.setDriveCount+1)/2)
xldisk.totalWrites.Store(commonWrites)
_, commonDeletes := calcCommonWritesDeletes(currentDisksInfo[m], (s.setDriveCount+1)/2)
xldisk.totalDeletes.Store(commonDeletes)
xldisk.storage.setWriteAttribute(commonWrites)
xldisk.storage.setDeleteAttribute(commonDeletes)
}
if globalDriveMonitoring {
go xldisk.monitorDiskWritable(xldisk.diskCtx)
}
}
} else {
disk.Close() // Close the remote storage client, re-initialize with healthchecks.
disk, err = newStorageRESTClient(disk.Endpoint(), true, globalGrid.Load())
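
calcCommonWritesDeletes itself is not part of this diff; a rough sketch of the
idea it serves here (an assumption for illustration, not the project's code)
is to pick the counter value that at least a quorum of drives in the set agree
on:

    // commonCount returns the counter value reported by at least `quorum`
    // drives, or 0 when no such value exists (largest wins on ties).
    func commonCount(counts []uint64, quorum int) uint64 {
        freq := make(map[uint64]int)
        for _, c := range counts {
            freq[c]++
        }
        var common uint64
        for c, n := range freq {
            if n >= quorum && c > common {
                common = c
            }
        }
        return common
    }

With (s.setDriveCount+1)/2 as the quorum argument, a freshly formatted drive
gets seeded with a delete count that a majority of its peers already report.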


@@ -56,6 +56,7 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/pubsub"
"github.com/minio/pkg/v2/certs"
"github.com/minio/pkg/v2/env"
xnet "github.com/minio/pkg/v2/net"
)
@@ -413,6 +414,8 @@ var (
globalLocalDrives []StorageAPI
globalLocalDrivesMu sync.RWMutex
globalDriveMonitoring = env.Get("_MINIO_DRIVE_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn
// Is MINIO_CI_CD set?
globalIsCICD bool
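
The toggle is now read once into this global; a minimal sketch of the
equivalent semantics with plain os.LookupEnv (assuming env.Get returns the
default when the variable is unset or empty): active monitoring stays enabled
unless _MINIO_DRIVE_ACTIVE_MONITORING is explicitly set to something other
than "on".

    package main

    import (
        "fmt"
        "os"
    )

    // driveMonitoringEnabled roughly mirrors
    // env.Get("_MINIO_DRIVE_ACTIVE_MONITORING", "on") == "on":
    // unset or empty keeps active monitoring enabled; any other value
    // besides "on" turns it off.
    func driveMonitoringEnabled() bool {
        v, ok := os.LookupEnv("_MINIO_DRIVE_ACTIVE_MONITORING")
        if !ok || v == "" {
            return true
        }
        return v == "on"
    }

    func main() {
        fmt.Println("drive monitoring:", driveMonitoringEnabled())
    }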


@@ -31,7 +31,6 @@ import (
"time"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/config"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v2/env"
@@ -89,6 +88,7 @@ type xlStorageDiskIDCheck struct {
diskID string
storage *xlStorage
health *diskHealthTracker
healthCheck bool
// driveStartChecking is a threshold above which we will start to check
// the state of disks, generally this value is less than driveMaxConcurrent
@@ -226,20 +226,18 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDis
xl := xlStorageDiskIDCheck{
storage: storage,
health: newDiskHealthTracker(driveMaxConcurrent),
healthCheck: healthCheck && globalDriveMonitoring,
driveMaxConcurrent: driveMaxConcurrent,
driveStartChecking: driveStartChecking,
}
if driveQuorum {
xl.totalWrites.Store(xl.storage.getWriteAttribute())
xl.totalDeletes.Store(xl.storage.getDeleteAttribute())
}
xl.diskCtx, xl.diskCancel = context.WithCancel(context.TODO())
for i := range xl.apiLatencies[:] {
xl.apiLatencies[i] = &lockedLastMinuteLatency{}
}
if healthCheck && diskActiveMonitoring {
if xl.healthCheck {
go xl.monitorDiskWritable(xl.diskCtx)
}
return &xl
@@ -347,10 +345,8 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, opts DiskInfoOption
defer si(&err)
if opts.NoOp {
if driveQuorum {
info.Metrics.TotalWrites = p.totalWrites.Load()
info.Metrics.TotalDeletes = p.totalDeletes.Load()
}
info.Metrics.TotalTokens = uint32(p.driveMaxConcurrent)
info.Metrics.TotalWaiting = uint32(p.health.waiting.Load())
info.Metrics.TotalErrorsTimeout = p.totalErrsTimeout.Load()
@@ -362,10 +358,8 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, opts DiskInfoOption
if opts.Metrics {
info.Metrics = p.getMetrics()
}
if driveQuorum {
info.Metrics.TotalWrites = p.totalWrites.Load()
info.Metrics.TotalDeletes = p.totalDeletes.Load()
}
info.Metrics.TotalTokens = uint32(p.driveMaxConcurrent)
info.Metrics.TotalWaiting = uint32(p.health.waiting.Load())
info.Metrics.TotalErrorsTimeout = p.totalErrsTimeout.Load()
@@ -521,7 +515,7 @@ func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPat
return 0, err
}
defer func() {
if driveQuorum && err == nil && !skipAccessChecks(dstVolume) {
if err == nil && !skipAccessChecks(dstVolume) {
p.storage.setWriteAttribute(p.totalWrites.Add(1))
}
done(&err)
@@ -571,7 +565,7 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(ctx context.Context, volume string
return errs
}
defer func() {
if driveQuorum && !skipAccessChecks(volume) {
if !skipAccessChecks(volume) {
var permanentDeletes uint64
var deleteMarkers uint64
@@ -638,7 +632,7 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s
defer func() {
defer done(&err)
if driveQuorum && err == nil && !skipAccessChecks(volume) {
if err == nil && !skipAccessChecks(volume) {
if opts.UndoWrite {
p.storage.setWriteAttribute(p.totalWrites.Add(^uint64(0)))
return
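
As an aside on the counter handling above: totalWrites/totalDeletes are atomic
in-memory counters that get mirrored into the drive attribute after each
successful call, and Add(^uint64(0)) is the idiomatic atomic decrement for a
uint64. A small self-contained sketch of that pattern (the persist callback is
an illustrative stand-in for setWriteAttribute, not the actual helper):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type driveCounters struct {
        totalWrites atomic.Uint64
    }

    // recordWrite bumps the counter and mirrors the new value via persist
    // (for example into a drive xattr) so it survives a restart.
    func (d *driveCounters) recordWrite(persist func(uint64)) {
        persist(d.totalWrites.Add(1))
    }

    // undoWrite rolls a write back; adding ^uint64(0) (all ones) is the
    // two's-complement way to atomically subtract one from a uint64.
    func (d *driveCounters) undoWrite(persist func(uint64)) {
        persist(d.totalWrites.Add(^uint64(0)))
    }

    func main() {
        var d driveCounters
        store := func(v uint64) { fmt.Println("persisted:", v) }
        d.recordWrite(store) // persisted: 1
        d.recordWrite(store) // persisted: 2
        d.undoWrite(store)   // persisted: 1
    }
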
@@ -828,19 +822,6 @@ const (
diskHealthFaulty
)
// diskActiveMonitoring indicates if we have enabled "active" disk monitoring
var diskActiveMonitoring = true
// Indicates if users want to enable drive_quorum feature
var driveQuorum bool
func init() {
diskActiveMonitoring = (env.Get("_MINIO_DRIVE_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn) ||
(env.Get("_MINIO_DISK_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn)
driveQuorum = env.Get("_MINIO_DRIVE_QUORUM", config.EnableOff) == config.EnableOn
}
type diskHealthTracker struct {
// atomic time of last success
lastSuccess int64