2021-04-18 15:41:13 -04:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2020-06-12 23:04:01 -04:00
package cmd
import (
"context"
2022-03-09 14:38:54 -05:00
"errors"
"fmt"
2020-06-12 23:04:01 -04:00
"io"
2022-03-09 14:38:54 -05:00
"math/rand"
2023-07-05 13:40:45 -04:00
"runtime"
2022-03-09 14:38:54 -05:00
"strconv"
2021-03-27 02:24:07 -04:00
"strings"
2021-03-16 23:06:57 -04:00
"sync"
"sync/atomic"
"time"
2023-06-19 20:53:08 -04:00
"github.com/minio/madmin-go/v3"
2023-08-01 13:54:26 -04:00
"github.com/minio/minio/internal/config"
2023-07-13 14:41:55 -04:00
xioutil "github.com/minio/minio/internal/ioutil"
2022-03-09 14:38:54 -05:00
"github.com/minio/minio/internal/logger"
2023-09-04 15:57:37 -04:00
"github.com/minio/pkg/v2/env"
2021-03-16 23:06:57 -04:00
)
//go:generate stringer -type=storageMetric -trimprefix=storageMetric $GOFILE

// storageMetric enumerates every storage-layer API whose call count and
// last-minute latency are tracked per drive (see the apiCalls and
// apiLatencies arrays in xlStorageDiskIDCheck, both sized by
// storageMetricLast). New metrics must be appended before
// storageMetricLast to keep the stringer output and array sizes valid.
type storageMetric uint8

const (
	storageMetricMakeVolBulk storageMetric = iota
	storageMetricMakeVol
	storageMetricListVols
	storageMetricStatVol
	storageMetricDeleteVol
	storageMetricWalkDir
	storageMetricListDir
	storageMetricReadFile
	storageMetricAppendFile
	storageMetricCreateFile
	storageMetricReadFileStream
	storageMetricRenameFile
	storageMetricRenameData
	storageMetricCheckParts
	storageMetricDelete
	storageMetricDeleteVersions
	storageMetricVerifyFile
	storageMetricWriteAll
	storageMetricDeleteVersion
	storageMetricWriteMetadata
	storageMetricUpdateMetadata
	storageMetricReadVersion
	storageMetricReadXL
	storageMetricReadAll
	storageMetricStatInfoFile
	storageMetricReadMultiple
	storageMetricDeleteAbandonedParts
	storageMetricDiskInfo

	// .... add more

	// storageMetricLast is not a metric itself; it is the count of
	// metrics and must remain the final entry.
	storageMetricLast
)
// xlStorageDiskIDCheck wraps an xlStorage and detects change in the
// underlying disk by validating the stored disk-ID on each stale check.
// It also tracks per-API call counts/latencies and drive health.
type xlStorageDiskIDCheck struct {
	totalErrsTimeout      atomic.Uint64 // Captures all timeout only errors
	totalErrsAvailability atomic.Uint64 // Captures all data availability errors such as permission denied, faulty disk and timeout errors.

	// apiCalls should be placed first so alignment is guaranteed for atomic operations.
	apiCalls     [storageMetricLast]uint64
	apiLatencies [storageMetricLast]*lockedLastMinuteLatency

	// diskID is the identity this wrapper expects the backend to present.
	diskID string

	// storage is the wrapped local drive implementation.
	storage *xlStorage

	// health tracks in-flight operations and faultiness of the drive.
	health *diskHealthTracker

	// diskStartChecking is a threshold above which we will start to check
	// the state of disks, generally this value is less than diskMaxConcurrent
	diskStartChecking int

	// diskMaxConcurrent represents maximum number of running concurrent
	// operations for local and (incoming) remote disk operations.
	diskMaxConcurrent int

	// metricsCache caches the DiskMetrics snapshot built by getMetrics.
	metricsCache timedValue

	// diskCtx/cancel bound the lifetime of the background disk-writable
	// monitor goroutine; cancel is invoked from Close.
	diskCtx context.Context
	cancel  context.CancelFunc
}
// getMetrics returns a snapshot of per-API call counts and last-minute
// latencies for this drive. The snapshot is rebuilt at most every 5s via
// metricsCache; the total error counters are always read fresh.
func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
	// Lazily install the cache's TTL and update function exactly once.
	p.metricsCache.Once.Do(func() {
		p.metricsCache.TTL = 5 * time.Second
		p.metricsCache.Update = func() (interface{}, error) {
			diskMetric := DiskMetrics{
				LastMinute: make(map[string]AccElem, len(p.apiLatencies)),
				APICalls:   make(map[string]uint64, len(p.apiCalls)),
			}
			for i, v := range p.apiLatencies {
				diskMetric.LastMinute[storageMetric(i).String()] = v.total()
			}
			for i := range p.apiCalls {
				diskMetric.APICalls[storageMetric(i).String()] = atomic.LoadUint64(&p.apiCalls[i])
			}
			return diskMetric, nil
		}
	})
	m, _ := p.metricsCache.Get()
	diskMetric := DiskMetrics{}
	if m != nil {
		diskMetric = m.(DiskMetrics)
	}
	// Do not need this value to be cached.
	diskMetric.TotalErrorsTimeout = p.totalErrsTimeout.Load()
	diskMetric.TotalErrorsAvailability = p.totalErrsAvailability.Load()
	return diskMetric
}
// lockedLastMinuteLatency accumulates totals lockless for each second.
type lockedLastMinuteLatency struct {
	// cachedSec is the Unix second the current accumulator belongs to
	// (accessed atomically).
	cachedSec int64
	// cached is the accumulator for the current second; its fields are
	// updated atomically.
	cached atomic.Pointer[AccElem]
	// mu guards the embedded lastMinuteLatency ring.
	mu sync.Mutex
	// init ensures the first caller installs an accumulator.
	init sync.Once
	lastMinuteLatency
}
// add records a single latency sample with no associated byte size.
func (e *lockedLastMinuteLatency) add(value time.Duration) {
	e.addSize(value, 0)
}
// addSize will add a duration and size.
//
// Samples within the current second are accumulated locklessly with
// atomic adds on the cached AccElem. When the second rolls over, exactly
// one caller wins the CAS on cachedSec, swaps in a fresh accumulator and
// folds the previous second's totals into lastMinuteLatency under the
// mutex; losers yield and reload the new accumulator.
func (e *lockedLastMinuteLatency) addSize(value time.Duration, sz int64) {
	t := time.Now().Unix()
	e.init.Do(func() {
		e.cached.Store(&AccElem{})
		atomic.StoreInt64(&e.cachedSec, t)
	})
	acc := e.cached.Load()
	if lastT := atomic.LoadInt64(&e.cachedSec); lastT != t {
		// Check if lastT was changed by someone else.
		if atomic.CompareAndSwapInt64(&e.cachedSec, lastT, t) {
			// Now we swap in a new.
			newAcc := &AccElem{}
			old := e.cached.Swap(newAcc)
			// Snapshot the retired accumulator atomically; stragglers
			// may still be adding to it concurrently.
			var a AccElem
			a.Size = atomic.LoadInt64(&old.Size)
			a.Total = atomic.LoadInt64(&old.Total)
			a.N = atomic.LoadInt64(&old.N)
			e.mu.Lock()
			e.lastMinuteLatency.addAll(t-1, a)
			e.mu.Unlock()
			acc = newAcc
		} else {
			// We may be able to grab the new accumulator by yielding.
			runtime.Gosched()
			acc = e.cached.Load()
		}
	}
	atomic.AddInt64(&acc.N, 1)
	atomic.AddInt64(&acc.Total, int64(value))
	atomic.AddInt64(&acc.Size, sz)
}
// total returns the total call count and latency for the last minute.
func (e *lockedLastMinuteLatency) total() AccElem {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.lastMinuteLatency.getTotal()
}
var maxConcurrentOnce sync . Once
2023-07-13 14:41:55 -04:00
func newXLStorageDiskIDCheck ( storage * xlStorage , healthCheck bool ) * xlStorageDiskIDCheck {
2023-10-24 00:42:36 -04:00
// diskMaxConcurrent represents maximum number of running concurrent
// operations for local and (incoming) remote disk operations.
//
// this value is a placeholder it is overridden via ENV for custom settings
// or this default value is used to pick the correct value HDDs v/s NVMe's
diskMaxConcurrent := - 1
maxConcurrentOnce . Do ( func ( ) {
s := env . Get ( "_MINIO_DRIVE_MAX_CONCURRENT" , "" )
if s == "" {
s = env . Get ( "_MINIO_DISK_MAX_CONCURRENT" , "" )
}
if s != "" {
diskMaxConcurrent , _ = strconv . Atoi ( s )
}
} )
2023-10-16 20:18:13 -04:00
if diskMaxConcurrent <= 0 {
diskMaxConcurrent = 512
if storage . rotational {
2023-10-24 00:42:36 -04:00
diskMaxConcurrent = int ( storage . nrRequests ) / 2
2023-10-24 20:21:06 -04:00
if diskMaxConcurrent < 32 {
2023-10-24 00:42:36 -04:00
diskMaxConcurrent = 32
}
2023-10-16 20:18:13 -04:00
}
}
2023-10-24 00:42:36 -04:00
2023-10-16 20:18:13 -04:00
diskStartChecking := 16 + diskMaxConcurrent / 8
if diskStartChecking > diskMaxConcurrent {
diskStartChecking = diskMaxConcurrent
}
2021-03-16 23:06:57 -04:00
xl := xlStorageDiskIDCheck {
2023-10-16 20:18:13 -04:00
storage : storage ,
health : newDiskHealthTracker ( diskMaxConcurrent ) ,
diskMaxConcurrent : diskMaxConcurrent ,
diskStartChecking : diskStartChecking ,
2021-03-16 23:06:57 -04:00
}
2023-07-13 14:41:55 -04:00
xl . diskCtx , xl . cancel = context . WithCancel ( context . TODO ( ) )
2021-03-16 23:06:57 -04:00
for i := range xl . apiLatencies [ : ] {
2022-01-25 19:31:44 -05:00
xl . apiLatencies [ i ] = & lockedLastMinuteLatency { }
2021-03-16 23:06:57 -04:00
}
2023-08-01 13:54:26 -04:00
if healthCheck && diskActiveMonitoring {
2023-07-13 14:41:55 -04:00
go xl . monitorDiskWritable ( xl . diskCtx )
}
2021-03-16 23:06:57 -04:00
return & xl
2020-06-12 23:04:01 -04:00
}
// String returns the underlying storage's string representation.
func (p *xlStorageDiskIDCheck) String() string {
	return p.storage.String()
}
// IsOnline reports whether the backend still presents the disk-ID this
// wrapper remembers; any error while reading the ID counts as offline.
func (p *xlStorageDiskIDCheck) IsOnline() bool {
	id, err := p.storage.GetDiskID()
	return err == nil && id == p.diskID
}
// LastConn returns the time the underlying storage last (re)connected.
func (p *xlStorageDiskIDCheck) LastConn() time.Time {
	return p.storage.LastConn()
}

// IsLocal reports whether the underlying storage is a local drive.
func (p *xlStorageDiskIDCheck) IsLocal() bool {
	return p.storage.IsLocal()
}

// Endpoint returns the endpoint of the underlying storage.
func (p *xlStorageDiskIDCheck) Endpoint() Endpoint {
	return p.storage.Endpoint()
}

// Hostname returns the host serving the underlying storage.
func (p *xlStorageDiskIDCheck) Hostname() string {
	return p.storage.Hostname()
}

// Healing delegates to the underlying storage's healing tracker.
func (p *xlStorageDiskIDCheck) Healing() *healingTracker {
	return p.storage.Healing()
}
// NSScanner runs the namespace scanner on this drive after verifying the
// drive is not stale. On early failure the updates channel is closed so
// the consumer never blocks on it.
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
	if contextCanceled(ctx) {
		close(updates)
		return dataUsageCache{}, ctx.Err()
	}

	if err := p.checkDiskStale(); err != nil {
		close(updates)
		return dataUsageCache{}, err
	}
	return p.storage.NSScanner(ctx, cache, updates, scanMode)
}
// GetDiskLoc returns the pool/set/disk indexes of this drive (delegated).
func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
	return p.storage.GetDiskLoc()
}

// SetDiskLoc records the pool/set/disk indexes on the underlying storage.
func (p *xlStorageDiskIDCheck) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
	p.storage.SetDiskLoc(poolIdx, setIdx, diskIdx)
}
// Close stops the background disk monitor (via cancel) and closes the
// underlying storage.
func (p *xlStorageDiskIDCheck) Close() error {
	p.cancel()
	return p.storage.Close()
}

// GetDiskID reads the disk-ID from the backend.
func (p *xlStorageDiskIDCheck) GetDiskID() (string, error) {
	return p.storage.GetDiskID()
}

// SetDiskID records the disk-ID this wrapper expects the backend to
// keep presenting; used by checkDiskStale/IsOnline.
func (p *xlStorageDiskIDCheck) SetDiskID(id string) {
	p.diskID = id
}
2020-07-13 12:51:07 -04:00
func ( p * xlStorageDiskIDCheck ) checkDiskStale ( ) error {
2020-06-12 23:04:01 -04:00
if p . diskID == "" {
2020-07-13 12:51:07 -04:00
// For empty disk-id we allow the call as the server might be
// coming up and trying to read format.json or create format.json
return nil
2020-06-12 23:04:01 -04:00
}
storedDiskID , err := p . storage . GetDiskID ( )
2020-07-13 12:51:07 -04:00
if err != nil {
// return any error generated while reading `format.json`
return err
}
2020-06-12 23:04:01 -04:00
if err == nil && p . diskID == storedDiskID {
2020-07-13 12:51:07 -04:00
return nil
2020-06-12 23:04:01 -04:00
}
2020-07-13 12:51:07 -04:00
// not the same disk we remember, take it offline.
return errDiskNotFound
2020-06-12 23:04:01 -04:00
}
// DiskInfo returns disk information (optionally including metrics).
// A faulty drive short-circuits with errFaultyDisk so 'mc admin info'
// output and prometheus alerts still work, and the cached disk-ID is
// validated against the backend's reported ID.
func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error) {
	if contextCanceled(ctx) {
		return DiskInfo{}, ctx.Err()
	}

	si := p.updateStorageMetrics(storageMetricDiskInfo)
	defer si(&err)

	// Attach metrics on every return path, including errors.
	defer func() {
		if metrics {
			info.Metrics = p.getMetrics()
		}
	}()

	if p.health.isFaulty() {
		// if disk is already faulty return faulty for 'mc admin info' output and prometheus alerts.
		return info, errFaultyDisk
	}

	info, err = p.storage.DiskInfo(ctx, metrics)
	if err != nil {
		return info, err
	}

	// check cached diskID against backend
	// only if its non-empty.
	if p.diskID != "" && p.diskID != info.ID {
		return info, errDiskNotFound
	}
	return info, nil
}
// MakeVolBulk creates the given volumes, tracked for health/metrics and
// bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...string) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricMakeVolBulk, volumes...)
	if err != nil {
		return err
	}
	// done records latency and outcome for this metric on return.
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.MakeVolBulk(ctx, volumes...) })
}
// MakeVol creates a volume, tracked for health/metrics and bounded by
// the global drive timeout.
func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricMakeVol, volume)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.MakeVol(ctx, volume) })
}
// ListVols lists all volumes, tracked for health/metrics. Note: no
// deadline worker is applied here, unlike most other tracked calls.
func (p *xlStorageDiskIDCheck) ListVols(ctx context.Context) (vi []VolInfo, err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricListVols, "/")
	if err != nil {
		return nil, err
	}
	defer done(&err)
	return p.storage.ListVols(ctx)
}
// StatVol returns information about a volume, tracked for health/metrics
// and bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol VolInfo, err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricStatVol, volume)
	if err != nil {
		return vol, err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	err = w.Run(func() error {
		var ierr error
		vol, ierr = p.storage.StatVol(ctx, volume)
		return ierr
	})
	return vol, err
}
// DeleteVol deletes a volume (optionally forced), tracked for
// health/metrics and bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteVol, volume)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.DeleteVol(ctx, volume, forceDelete) })
}
// ListDir lists up to count entries of a directory, tracked for
// health/metrics (no deadline worker applied).
func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) (s []string, err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricListDir, volume, dirPath)
	if err != nil {
		return nil, err
	}
	defer done(&err)

	return p.storage.ListDir(ctx, volume, dirPath, count)
}
// Legacy API - does not have any deadlines
// (the deadline worker below was added later; the comment refers to the
// storage-layer API itself).
func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadFile, volume, path)
	if err != nil {
		return 0, err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	err = w.Run(func() error {
		n, err = p.storage.ReadFile(ctx, volume, path, offset, buf, verifier)
		return err
	})
	return n, err
}
// Legacy API - does not have any deadlines
// (the deadline worker below was added later; the comment refers to the
// storage-layer API itself).
func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricAppendFile, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error {
		return p.storage.AppendFile(ctx, volume, path, buf)
	})
}
// CreateFile streams size bytes from reader into a new file. The reader
// is wrapped in a deadline reader so a stalled source cannot hold the
// drive slot forever.
func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path string, size int64, reader io.Reader) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricCreateFile, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	return p.storage.CreateFile(ctx, volume, path, size, xioutil.NewDeadlineReader(io.NopCloser(reader), globalDriveConfig.GetMaxTimeout()))
}
// ReadFileStream opens a streaming reader over [offset, offset+length).
// Opening the stream is bounded by the deadline worker; the returned
// reader is additionally wrapped so each Read also observes the global
// drive timeout.
func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadFileStream, volume, path)
	if err != nil {
		return nil, err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	var rc io.ReadCloser
	err = w.Run(func() error {
		var ierr error
		rc, ierr = p.storage.ReadFileStream(ctx, volume, path, offset, length)
		return ierr
	})
	if err != nil {
		return nil, err
	}
	return xioutil.NewDeadlineReader(rc, globalDriveConfig.GetMaxTimeout()), nil
}
// RenameFile renames a file across volumes, tracked for health/metrics
// and bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenameFile, srcVolume, srcPath, dstVolume, dstPath)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath) })
}
// RenameData atomically commits object data+metadata by renaming; it
// returns a signature (sign) from the underlying storage. Tracked for
// health/metrics and bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (sign uint64, err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenameData, srcPath, fi.DataDir, dstVolume, dstPath)
	if err != nil {
		return 0, err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	err = w.Run(func() error {
		var ierr error
		sign, ierr = p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath)
		return ierr
	})
	return sign, err
}
// CheckParts verifies that all parts referenced by fi exist on disk,
// tracked for health/metrics and bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricCheckParts, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.CheckParts(ctx, volume, path, fi) })
}
// Delete removes a path according to deleteOpts, tracked for
// health/metrics and bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDelete, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.Delete(ctx, volume, path, deleteOpts) })
}
2021-03-27 02:24:07 -04:00
// DeleteVersions deletes slice of versions, it can be same object
// or multiple objects.
2021-11-01 13:50:07 -04:00
func ( p * xlStorageDiskIDCheck ) DeleteVersions ( ctx context . Context , volume string , versions [ ] FileInfoVersions ) ( errs [ ] error ) {
// Merely for tracing storage
2021-03-27 02:24:07 -04:00
path := ""
if len ( versions ) > 0 {
path = versions [ 0 ] . Name
}
errs = make ( [ ] error , len ( versions ) )
2022-03-09 14:38:54 -05:00
ctx , done , err := p . TrackDiskHealth ( ctx , storageMetricDeleteVersions , volume , path )
if err != nil {
2021-03-16 23:06:57 -04:00
for i := range errs {
errs [ i ] = ctx . Err ( )
}
return errs
}
2022-03-09 14:38:54 -05:00
defer done ( & err )
2023-07-25 19:58:31 -04:00
2022-03-09 14:38:54 -05:00
errs = p . storage . DeleteVersions ( ctx , volume , versions )
for i := range errs {
if errs [ i ] != nil {
err = errs [ i ]
break
2020-06-12 23:04:01 -04:00
}
}
2021-03-27 02:24:07 -04:00
2022-03-09 14:38:54 -05:00
return errs
2020-06-12 23:04:01 -04:00
}
// VerifyFile runs bitrot verification on the object described by fi,
// tracked for health/metrics (no deadline worker — verification of a
// large object may legitimately take longer than the per-call timeout).
func (p *xlStorageDiskIDCheck) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricVerifyFile, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	return p.storage.VerifyFile(ctx, volume, path, fi)
}
// WriteAll writes b to path in one shot, tracked for health/metrics and
// bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricWriteAll, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.WriteAll(ctx, volume, path, b) })
}
// DeleteVersion deletes a single object version (optionally forcing a
// delete marker), tracked for health/metrics and bounded by the global
// drive timeout.
func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteVersion, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker) })
}
// UpdateMetadata updates existing object metadata in place, tracked for
// health/metrics and bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricUpdateMetadata, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.UpdateMetadata(ctx, volume, path, fi, opts) })
}
// WriteMetadata writes object metadata, tracked for health/metrics and
// bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricWriteMetadata, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.WriteMetadata(ctx, volume, path, fi) })
}
// ReadVersion reads metadata of a particular object version, tracked for
// health/metrics and bounded by the global drive timeout. A worker error
// (e.g. deadline exceeded) takes precedence over the storage error.
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadVersion, volume, path)
	if err != nil {
		return fi, err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	rerr := w.Run(func() error {
		fi, err = p.storage.ReadVersion(ctx, volume, path, versionID, opts)
		return err
	})
	if rerr != nil {
		return fi, rerr
	}
	return fi, err
}
// ReadAll reads a file fully into memory, tracked for health/metrics and
// bounded by the global drive timeout. A worker error (e.g. deadline
// exceeded) takes precedence over the storage error.
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadAll, volume, path)
	if err != nil {
		return nil, err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	rerr := w.Run(func() error {
		buf, err = p.storage.ReadAll(ctx, volume, path)
		return err
	})
	if rerr != nil {
		return buf, rerr
	}
	return buf, err
}
// ReadXL reads the raw xl.meta of an object (optionally with inline
// data), tracked for health/metrics and bounded by the global drive
// timeout. A worker error takes precedence over the storage error.
func (p *xlStorageDiskIDCheck) ReadXL(ctx context.Context, volume string, path string, readData bool) (rf RawFileInfo, err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadXL, volume, path)
	if err != nil {
		return RawFileInfo{}, err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	rerr := w.Run(func() error {
		rf, err = p.storage.ReadXL(ctx, volume, path, readData)
		return err
	})
	if rerr != nil {
		return rf, rerr
	}
	return rf, err
}
// StatInfoFile stats a file (or glob of files), tracked for
// health/metrics (no deadline worker applied).
func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricStatInfoFile, volume, path)
	if err != nil {
		return nil, err
	}
	defer done(&err)

	return p.storage.StatInfoFile(ctx, volume, path, glob)
}
// ReadMultiple will read multiple files and send each files as response.
// Files are read and returned in the given order.
// The resp channel is closed before the call returns.
// Only a canceled context will return an error.
func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadMultiple, req.Bucket, req.Prefix)
	if err != nil {
		// Close resp on early failure so the consumer never blocks.
		close(resp)
		return err
	}
	defer done(&err)
	return p.storage.ReadMultiple(ctx, req, resp)
}
// CleanAbandonedData will read metadata of the object on disk
// and delete any data directories and inline data that isn't referenced in metadata.
// Tracked for health/metrics and bounded by the global drive timeout.
func (p *xlStorageDiskIDCheck) CleanAbandonedData(ctx context.Context, volume string, path string) (err error) {
	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteAbandonedParts, volume, path)
	if err != nil {
		return err
	}
	defer done(&err)

	w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
	return w.Run(func() error { return p.storage.CleanAbandonedData(ctx, volume, path) })
}
// storageTrace builds an admin trace event for a storage-layer call.
func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string, err string) madmin.TraceInfo {
	return madmin.TraceInfo{
		TraceType: madmin.TraceStorage,
		Time:      startTime,
		NodeName:  globalLocalNodeName,
		FuncName:  "storage." + s.String(),
		Duration:  duration,
		Path:      path,
		Error:     err,
	}
}
2023-02-21 12:33:33 -05:00
func scannerTrace ( s scannerMetric , startTime time . Time , duration time . Duration , path string , custom map [ string ] string ) madmin . TraceInfo {
2022-07-05 17:45:49 -04:00
return madmin . TraceInfo {
TraceType : madmin . TraceScanner ,
Time : startTime ,
NodeName : globalLocalNodeName ,
FuncName : "scanner." + s . String ( ) ,
Duration : duration ,
Path : path ,
2023-02-21 12:33:33 -05:00
Custom : custom ,
2021-03-27 02:24:07 -04:00
}
}
2021-03-16 23:06:57 -04:00
// Update storage metrics
2022-03-09 14:38:54 -05:00
func ( p * xlStorageDiskIDCheck ) updateStorageMetrics ( s storageMetric , paths ... string ) func ( err * error ) {
2021-03-16 23:06:57 -04:00
startTime := time . Now ( )
2022-07-05 17:45:49 -04:00
trace := globalTrace . NumSubscribers ( madmin . TraceStorage ) > 0
2022-12-01 15:10:54 -05:00
return func ( errp * error ) {
2021-03-27 02:24:07 -04:00
duration := time . Since ( startTime )
2023-07-30 02:26:26 -04:00
var err error
if errp != nil && * errp != nil {
err = * errp
}
2021-03-16 23:06:57 -04:00
atomic . AddUint64 ( & p . apiCalls [ s ] , 1 )
2023-07-30 02:26:26 -04:00
if IsErr ( err , [ ] error {
errVolumeAccessDenied ,
errFileAccessDenied ,
errDiskAccessDenied ,
errFaultyDisk ,
errFaultyRemoteDisk ,
context . DeadlineExceeded ,
context . Canceled ,
} ... ) {
2023-12-04 14:54:13 -05:00
p . totalErrsAvailability . Add ( 1 )
2023-07-30 02:26:26 -04:00
if errors . Is ( err , context . DeadlineExceeded ) || errors . Is ( err , context . Canceled ) {
2023-12-04 14:54:13 -05:00
p . totalErrsTimeout . Add ( 1 )
2023-07-30 02:26:26 -04:00
}
}
2022-01-25 19:31:44 -05:00
p . apiLatencies [ s ] . add ( duration )
2021-03-27 02:24:07 -04:00
if trace {
2023-07-30 02:26:26 -04:00
paths = append ( [ ] string { p . String ( ) } , paths ... )
2022-12-01 15:10:54 -05:00
var errStr string
2023-07-30 02:26:26 -04:00
if err != nil {
errStr = err . Error ( )
2022-12-01 15:10:54 -05:00
}
globalTrace . Publish ( storageTrace ( s , startTime , duration , strings . Join ( paths , " " ) , errStr ) )
2021-03-27 02:24:07 -04:00
}
2021-03-16 23:06:57 -04:00
}
}
2022-03-09 14:38:54 -05:00
// Disk health states stored atomically in diskHealthTracker.status.
const (
	diskHealthOK     = iota // drive is serving requests normally
	diskHealthFaulty        // drive has been taken offline by the health checker
)
2023-08-01 13:54:26 -04:00
// diskActiveMonitoring indicates if we have enabled "active" disk monitoring
var diskActiveMonitoring = true

func init() {
	// Both the current (_MINIO_DRIVE_*) and older (_MINIO_DISK_*) env names
	// are honored; each defaults to enabled.
	// NOTE(review): because both default to EnableOn and are OR-ed, active
	// monitoring stays on unless BOTH variables are set off — confirm this
	// is the intended way to disable it.
	diskActiveMonitoring = (env.Get("_MINIO_DRIVE_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn) ||
		(env.Get("_MINIO_DISK_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn)
}
// diskHealthTracker holds lock-free health state for a single disk.
// All numeric fields are accessed only via the sync/atomic package.
type diskHealthTracker struct {
	// atomic time of last success (UnixNano)
	lastSuccess int64

	// atomic time of last time a token was grabbed. (UnixNano)
	lastStarted int64

	// Atomic status of disk (diskHealthOK or diskHealthFaulty).
	status int32

	// Atomic number of requests blocking for a token.
	blocked int32

	// Concurrency tokens; capacity bounds in-flight operations per disk.
	tokens chan struct{}
}
// newDiskHealthTracker creates a new disk health tracker.
2023-10-16 20:18:13 -04:00
func newDiskHealthTracker ( diskMaxConcurrent int ) * diskHealthTracker {
2022-03-09 14:38:54 -05:00
d := diskHealthTracker {
lastSuccess : time . Now ( ) . UnixNano ( ) ,
lastStarted : time . Now ( ) . UnixNano ( ) ,
status : diskHealthOK ,
tokens : make ( chan struct { } , diskMaxConcurrent ) ,
}
for i := 0 ; i < diskMaxConcurrent ; i ++ {
d . tokens <- struct { } { }
}
return & d
}
// logSuccess will update the last successful operation time,
// stored as UnixNano for atomic access.
func (d *diskHealthTracker) logSuccess() {
	atomic.StoreInt64(&d.lastSuccess, time.Now().UnixNano())
}
// isFaulty reports whether the disk is currently marked offline.
func (d *diskHealthTracker) isFaulty() bool {
	return atomic.LoadInt32(&d.status) == diskHealthFaulty
}
type (
	// healthDiskCtxKey is the context key under which a *healthDiskCtxValue
	// is stored while an operation is being health-tracked.
	healthDiskCtxKey struct{}

	// healthDiskCtxValue points at the owning tracker's lastSuccess field so
	// nested operations can record successes without taking another token.
	healthDiskCtxValue struct {
		lastSuccess *int64
	}
)
// logSuccess will update the last successful operation time.
// It writes through the shared pointer into the owning tracker.
func (h *healthDiskCtxValue) logSuccess() {
	atomic.StoreInt64(h.lastSuccess, time.Now().UnixNano())
}
// noopDoneFunc is a no-op done func.
// Can be reused; returned from TrackDiskHealth on its error paths so
// callers may always safely invoke done.
var noopDoneFunc = func(_ *error) {}
// TrackDiskHealth for this request.
// When a non-nil error is returned 'done' MUST be called
// with the status of the response, if it corresponds to disk health.
// If the pointer sent to done is non-nil AND the error
// is either nil or io.EOF the disk is considered good.
// So if unsure if the disk status is ok, return nil as a parameter to done.
// Shadowing will work as long as return error is named: https://go.dev/play/p/sauq86SsTN2
func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMetric, paths ...string) (c context.Context, done func(*error), err error) {
	// done is a no-op by default, so every error return below still hands
	// the caller something safe to invoke.
	done = noopDoneFunc
	if contextCanceled(ctx) {
		return ctx, done, ctx.Err()
	}

	// Return early if disk is faulty already.
	if err := p.checkHealth(ctx); err != nil {
		return ctx, done, err
	}

	// Verify if the disk is not stale
	// - missing format.json (unformatted drive)
	// - format.json is valid but invalid 'uuid'
	if err = p.checkDiskStale(); err != nil {
		return ctx, done, err
	}

	// Disallow recursive tracking to avoid deadlocks:
	// an outer call already holds a token, so only record metrics here.
	if ctx.Value(healthDiskCtxKey{}) != nil {
		done = p.updateStorageMetrics(s, paths...)
		return ctx, done, nil
	}

	select {
	case <-ctx.Done():
		return ctx, done, ctx.Err()
	case <-p.health.tokens:
		// Fast path, got token.
	default:
		// We ran out of tokens, check health before blocking.
		err = p.waitForToken(ctx)
		if err != nil {
			return ctx, done, err
		}
	}
	// We only progress here if we got a token.

	atomic.StoreInt64(&p.health.lastStarted, time.Now().UnixNano())
	// Attach the tracker to the context so nested calls take the
	// recursive-tracking branch above instead of grabbing another token.
	ctx = context.WithValue(ctx, healthDiskCtxKey{}, &healthDiskCtxValue{lastSuccess: &p.health.lastSuccess})

	si := p.updateStorageMetrics(s, paths...)
	var once sync.Once
	return ctx, func(errp *error) {
		// once protects against a double done() returning the token twice.
		once.Do(func() {
			p.health.tokens <- struct{}{}
			if errp != nil {
				err := *errp
				// nil and io.EOF both count as a healthy response.
				if err == nil || errors.Is(err, io.EOF) {
					p.health.logSuccess()
				}
			}
			si(errp)
		})
	}, nil
}
// waitForToken blocks until a concurrency token becomes available, while
// periodically re-checking the disk's health status.
// A nil return means a token was acquired and must later be returned.
func (p *xlStorageDiskIDCheck) waitForToken(ctx context.Context) (err error) {
	atomic.AddInt32(&p.health.blocked, 1)
	defer atomic.AddInt32(&p.health.blocked, -1)

	// Randomize the re-check period (5-10s) to avoid a stampeding herd
	// of simultaneous health checks.
	t := time.NewTicker(5*time.Second + time.Duration(rand.Int63n(int64(5*time.Second))))
	defer t.Stop()
	for {
		if err = p.checkHealth(ctx); err != nil {
			return err
		}
		select {
		case <-t.C:
			// Ticker fired; loop around and check health again.
		case <-ctx.Done():
			return ctx.Err()
		case <-p.health.tokens:
			return nil
		}
	}
}
// checkHealth should only be called when tokens have run out.
// This will check if disk should be taken offline.
func (p *xlStorageDiskIDCheck) checkHealth(ctx context.Context) (err error) {
	if atomic.LoadInt32(&p.health.status) == diskHealthFaulty {
		return errFaultyDisk
	}
	// Check if there are tokens.
	// Only apply the timing heuristics below once enough operations are in
	// flight (>= diskStartChecking) for the signal to be meaningful.
	if p.diskMaxConcurrent-len(p.health.tokens) < p.diskStartChecking {
		return nil
	}

	const maxTimeSinceLastSuccess = 30 * time.Second
	const minTimeSinceLastOpStarted = 15 * time.Second

	// To avoid stampeding herd (100s of simultaneous starting requests)
	// there must be a delay between the last started request and now
	// for the last lastSuccess to be useful.
	t := time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastStarted)))
	if t < minTimeSinceLastOpStarted {
		return nil
	}

	// If also more than 30 seconds since last success, take disk offline.
	t = time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess)))
	if t > maxTimeSinceLastSuccess {
		// CAS ensures exactly one goroutine flips the status and spawns the
		// recovery monitor.
		if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
			logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline, time since last response %v", globalLocalNodeName, p.storage.String(), t.Round(time.Millisecond)))
			go p.monitorDiskStatus(t)
		}
		return errFaultyDisk
	}
	return nil
}
// monitorDiskStatus should be called once when a drive has been marked offline.
// Once the disk has been deemed ok, it will return to online status.
2023-07-25 19:58:31 -04:00
func ( p * xlStorageDiskIDCheck ) monitorDiskStatus ( spent time . Duration ) {
2022-03-09 14:38:54 -05:00
t := time . NewTicker ( 5 * time . Second )
defer t . Stop ( )
2023-07-25 19:58:31 -04:00
2022-03-09 14:38:54 -05:00
fn := mustGetUUID ( )
for range t . C {
if len ( p . health . tokens ) == 0 {
// Queue is still full, no need to check.
continue
}
err := p . storage . WriteAll ( context . Background ( ) , minioMetaTmpBucket , fn , [ ] byte { 10000 : 42 } )
if err != nil {
continue
}
b , err := p . storage . ReadAll ( context . Background ( ) , minioMetaTmpBucket , fn )
if err != nil || len ( b ) != 10001 {
continue
}
2022-07-11 12:15:54 -04:00
err = p . storage . Delete ( context . Background ( ) , minioMetaTmpBucket , fn , DeleteOptions {
Recursive : false ,
2023-11-29 01:35:16 -05:00
Immediate : false ,
2022-07-11 12:15:54 -04:00
} )
2022-03-09 14:38:54 -05:00
if err == nil {
2023-07-25 19:58:31 -04:00
t := time . Unix ( 0 , atomic . LoadInt64 ( & p . health . lastSuccess ) )
if spent > 0 {
t = t . Add ( spent )
}
logger . Info ( "node(%s): Read/Write/Delete successful, bringing drive %s online. Drive was offline for %s." , globalLocalNodeName , p . storage . String ( ) , time . Since ( t ) )
2022-03-09 14:38:54 -05:00
atomic . StoreInt32 ( & p . health . status , diskHealthOK )
return
}
}
}
2023-07-13 14:41:55 -04:00
// monitorDiskWritable periodically verifies that the drive accepts a
// direct-io-sized write and that the data can be read back. If a
// write+read cycle does not complete within the configured drive timeout,
// the drive is taken offline and monitorDiskStatus takes over bringing it
// back. Runs until ctx is canceled.
func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
	var (
		// We check every 15 seconds if the disk is writable and we can read back.
		checkEvery = 15 * time.Second

		// If the disk has completed an operation successfully within last 5 seconds, don't check it.
		skipIfSuccessBefore = 5 * time.Second
	)

	// if disk max timeout is smaller than checkEvery window
	// reduce checks by a second.
	if globalDriveConfig.GetMaxTimeout() <= checkEvery {
		checkEvery = globalDriveConfig.GetMaxTimeout() - time.Second
		if checkEvery <= 0 {
			checkEvery = globalDriveConfig.GetMaxTimeout()
		}
	}

	// if disk max timeout is smaller than skipIfSuccessBefore window
	// reduce the skipIfSuccessBefore by a second.
	if globalDriveConfig.GetMaxTimeout() <= skipIfSuccessBefore {
		skipIfSuccessBefore = globalDriveConfig.GetMaxTimeout() - time.Second
		if skipIfSuccessBefore <= 0 {
			skipIfSuccessBefore = globalDriveConfig.GetMaxTimeout()
		}
	}

	t := time.NewTicker(checkEvery)
	defer t.Stop()
	fn := mustGetUUID()

	// Be just above directio size.
	toWrite := []byte{xioutil.DirectioAlignSize + 1: 42}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	// monitor runs one probe cycle; returns false when monitoring should stop.
	monitor := func() bool {
		if contextCanceled(ctx) {
			return false
		}

		// Already offline; monitorDiskStatus owns recovery.
		if atomic.LoadInt32(&p.health.status) != diskHealthOK {
			return true
		}

		if time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess))) < skipIfSuccessBefore {
			// We recently saw a success - no need to check.
			return true
		}

		// goOffline flips the disk to faulty exactly once (CAS) and spawns
		// the recovery monitor.
		goOffline := func(err error, spent time.Duration) {
			if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
				logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline: %v", globalLocalNodeName, p.storage.String(), err))
				go p.monitorDiskStatus(spent)
			}
		}

		// Offset checks a bit to de-synchronize drives.
		time.Sleep(time.Duration(rng.Int63n(int64(1 * time.Second))))

		dctx, dcancel := context.WithCancel(ctx)
		started := time.Now()
		// Watchdog: if the write+read below outlives the drive timeout,
		// take the drive offline. dcancel stops the watchdog on completion.
		go func() {
			timeout := time.NewTimer(globalDriveConfig.GetMaxTimeout())
			select {
			case <-dctx.Done():
				if !timeout.Stop() {
					// Drain an already-fired timer so it can be GC'd cleanly.
					<-timeout.C
				}
			case <-timeout.C:
				spent := time.Since(started)
				goOffline(fmt.Errorf("unable to write+read for %v", spent.Round(time.Millisecond)), spent)
			}
		}()

		func() {
			defer dcancel()

			err := p.storage.WriteAll(ctx, minioMetaTmpBucket, fn, toWrite)
			if err != nil {
				if osErrToFileErr(err) == errFaultyDisk {
					goOffline(fmt.Errorf("unable to write: %w", err), 0)
				}
				return
			}
			// NOTE(review): the read uses context.Background() while the write
			// uses ctx — confirm whether the read should also honor shutdown
			// cancellation.
			b, err := p.storage.ReadAll(context.Background(), minioMetaTmpBucket, fn)
			if err != nil || len(b) != len(toWrite) {
				if osErrToFileErr(err) == errFaultyDisk {
					goOffline(fmt.Errorf("unable to read: %w", err), 0)
				}
				return
			}
		}()

		// Continue to monitor
		return true
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if !monitor() {
				return
			}
		}
	}
}
perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it, given
the remote address, using:
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
performs a single request and returns the result. Any deadline provided on the request is
forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
// the appropriate error will be returned.
type Stream struct {
// Responses from the remote server.
// Channel will be closed after an error or when the remote closes.
// All responses *must* be read by the caller until either an error is returned or the channel is closed.
// Canceling the context will cause the context cancellation error to be returned.
Responses <-chan Response
// Requests sent to the server.
// If the handler is defined with 0 incoming capacity this will be nil.
// Channel *must* be closed to signal the end of the stream.
// If the request context is canceled, the stream will no longer process requests.
Requests chan<- []byte
}
type Response struct {
Msg []byte
Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
// checkID returns an error unless the disk reports exactly wantID.
// An empty wantID disables the check.
func (p *xlStorageDiskIDCheck) checkID(wantID string) (err error) {
	if wantID == "" {
		return nil
	}
	id, err := p.storage.GetDiskID()
	if err != nil {
		return err
	}
	if id == wantID {
		return nil
	}
	return fmt.Errorf("disk ID %s does not match. disk reports %s", wantID, id)
}
2022-03-09 14:38:54 -05:00
// diskHealthCheckOK will check if the provided error is nil
// and update disk status if good.
// For convenience a bool is returned to indicate any error state
// that is not io.EOF.
func diskHealthCheckOK(ctx context.Context, err error) bool {
	// nil and io.EOF both count as a healthy response.
	healthy := err == nil || errors.Is(err, io.EOF)
	tracker, ok := ctx.Value(healthDiskCtxKey{}).(*healthDiskCtxValue)
	if !ok {
		// No tracker attached to the context; just report the state.
		return healthy
	}
	if healthy {
		tracker.logSuccess()
	}
	return healthy
}
// diskHealthWrapper provides either a io.Reader or io.Writer
// that updates status of the provided tracker.
// Use through diskHealthReader or diskHealthWriter.
type diskHealthWrapper struct {
	tracker *healthDiskCtxValue
	r       io.Reader // set when wrapping a reader; nil otherwise
	w       io.Writer // set when wrapping a writer; nil otherwise
}
// Read forwards to the wrapped reader and records a disk-health success
// when data was read without error, or io.EOF arrived alongside data.
func (d *diskHealthWrapper) Read(p []byte) (int, error) {
	if d.r == nil {
		return 0, fmt.Errorf("diskHealthWrapper: Read with no reader")
	}
	n, err := d.r.Read(p)
	// Parenthesized to make the operator precedence explicit:
	// success is (no error) OR (EOF with at least one byte read).
	if err == nil || (err == io.EOF && n > 0) {
		d.tracker.logSuccess()
	}
	return n, err
}
// Write forwards to the wrapped writer and records a disk-health success
// only when the full buffer was written without error.
func (d *diskHealthWrapper) Write(p []byte) (int, error) {
	if d.w == nil {
		return 0, fmt.Errorf("diskHealthWrapper: Write with no writer")
	}
	n, err := d.w.Write(p)
	if err == nil && n == len(p) {
		d.tracker.logSuccess()
	}
	return n, err
}
// diskHealthReader provides a wrapper that will update disk health on
// ctx, on every successful read.
// This should only be used directly at the os/syscall level,
// otherwise buffered operations may return false health checks.
func diskHealthReader(ctx context.Context, r io.Reader) io.Reader {
	// Only wrap when a disk health tracker is attached to the context.
	if tracker, ok := ctx.Value(healthDiskCtxKey{}).(*healthDiskCtxValue); ok {
		return &diskHealthWrapper{r: r, tracker: tracker}
	}
	return r
}
// diskHealthWriter provides a wrapper that will update disk health on
// ctx, on every successful write.
// This should only be used directly at the os/syscall level,
// otherwise buffered operations may return false health checks.
func diskHealthWriter(ctx context.Context, w io.Writer) io.Writer {
	// Only wrap when a disk health tracker is attached to the context.
	if tracker, ok := ctx.Value(healthDiskCtxKey{}).(*healthDiskCtxValue); ok {
		return &diskHealthWrapper{w: w, tracker: tracker}
	}
	return w
}