feat: drive sub-sys to max timeout reload (#18501)

This commit is contained in:
jiuker 2023-11-28 01:15:06 +08:00 committed by GitHub
parent 506f121576
commit be02333529
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 197 additions and 57 deletions

View File

@ -95,7 +95,7 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i
h := algo.New() h := algo.New()
bw := &streamingBitrotWriter{ bw := &streamingBitrotWriter{
iow: ioutil.NewDeadlineWriter(w, diskMaxTimeout), iow: ioutil.NewDeadlineWriter(w, globalDriveConfig.GetMaxTimeout()),
closeWithErr: w.CloseWithError, closeWithErr: w.CloseWithError,
h: h, h: h,
shardSize: shardSize, shardSize: shardSize,

View File

@ -31,6 +31,7 @@ import (
"github.com/minio/minio/internal/config/callhome" "github.com/minio/minio/internal/config/callhome"
"github.com/minio/minio/internal/config/compress" "github.com/minio/minio/internal/config/compress"
"github.com/minio/minio/internal/config/dns" "github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/config/drive"
"github.com/minio/minio/internal/config/etcd" "github.com/minio/minio/internal/config/etcd"
"github.com/minio/minio/internal/config/heal" "github.com/minio/minio/internal/config/heal"
xldap "github.com/minio/minio/internal/config/identity/ldap" xldap "github.com/minio/minio/internal/config/identity/ldap"
@ -69,6 +70,7 @@ func initHelp() {
config.ScannerSubSys: scanner.DefaultKVS, config.ScannerSubSys: scanner.DefaultKVS,
config.SubnetSubSys: subnet.DefaultKVS, config.SubnetSubSys: subnet.DefaultKVS,
config.CallhomeSubSys: callhome.DefaultKVS, config.CallhomeSubSys: callhome.DefaultKVS,
config.DriveSubSys: drive.DefaultKVS,
config.CacheSubSys: cache.DefaultKVS, config.CacheSubSys: cache.DefaultKVS,
} }
for k, v := range notify.DefaultNotificationKVS { for k, v := range notify.DefaultNotificationKVS {
@ -97,6 +99,10 @@ func initHelp() {
Description: "enable callhome to MinIO SUBNET", Description: "enable callhome to MinIO SUBNET",
Optional: true, Optional: true,
}, },
config.HelpKV{
Key: config.DriveSubSys,
Description: "enable drive specific settings",
},
config.HelpKV{ config.HelpKV{
Key: config.SiteSubSys, Key: config.SiteSubSys,
Description: "label the server and its location", Description: "label the server and its location",
@ -258,6 +264,7 @@ func initHelp() {
config.LambdaWebhookSubSys: lambda.HelpWebhook, config.LambdaWebhookSubSys: lambda.HelpWebhook,
config.SubnetSubSys: subnet.HelpSubnet, config.SubnetSubSys: subnet.HelpSubnet,
config.CallhomeSubSys: callhome.HelpCallhome, config.CallhomeSubSys: callhome.HelpCallhome,
config.DriveSubSys: drive.HelpDrive,
config.CacheSubSys: cache.Help, config.CacheSubSys: cache.Help,
} }
@ -366,6 +373,10 @@ func validateSubSysConfig(ctx context.Context, s config.Config, subSys string, o
if cfg.Enabled() && !globalSubnetConfig.Registered() { if cfg.Enabled() && !globalSubnetConfig.Registered() {
return errors.New("Deployment is not registered with SUBNET. Please register the deployment via 'mc license register ALIAS'") return errors.New("Deployment is not registered with SUBNET. Please register the deployment via 'mc license register ALIAS'")
} }
case config.DriveSubSys:
if _, err := drive.LookupConfig(s[config.DriveSubSys][config.Default]); err != nil {
return err
}
case config.CacheSubSys: case config.CacheSubSys:
if _, err := cache.LookupConfig(s[config.CacheSubSys][config.Default], globalRemoteTargetTransport); err != nil { if _, err := cache.LookupConfig(s[config.CacheSubSys][config.Default], globalRemoteTargetTransport); err != nil {
return err return err
@ -645,6 +656,15 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
initCallhome(ctx, objAPI) initCallhome(ctx, objAPI)
} }
} }
case config.DriveSubSys:
if driveConfig, err := drive.LookupConfig(s[config.DriveSubSys][config.Default]); err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to load drive config: %w", err))
} else {
err := globalDriveConfig.Update(driveConfig)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to update drive config: %v", err))
}
}
case config.CacheSubSys: case config.CacheSubSys:
cacheCfg, err := cache.LookupConfig(s[config.CacheSubSys][config.Default], globalRemoteTargetTransport) cacheCfg, err := cache.LookupConfig(s[config.CacheSubSys][config.Default], globalRemoteTargetTransport)
if err != nil { if err != nil {

View File

@ -204,7 +204,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
if err != nil { if err != nil {
return nil return nil
} }
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return w.Run(func() error {
wait := deletedCleanupSleeper.Timer(ctx) wait := deletedCleanupSleeper.Timer(ctx)
if now.Sub(fi.ModTime) > expiry { if now.Sub(fi.ModTime) > expiry {
@ -218,7 +218,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
if err != nil { if err != nil {
return nil return nil
} }
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return w.Run(func() error {
wait := deletedCleanupSleeper.Timer(ctx) wait := deletedCleanupSleeper.Timer(ctx)
if now.Sub(vi.Created) > expiry { if now.Sub(vi.Created) > expiry {
@ -239,7 +239,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
if err != nil { if err != nil {
return nil return nil
} }
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return w.Run(func() error {
wait := deletedCleanupSleeper.Timer(ctx) wait := deletedCleanupSleeper.Timer(ctx)
if now.Sub(vi.Created) > expiry { if now.Sub(vi.Created) > expiry {

View File

@ -342,7 +342,7 @@ func (er erasureObjects) cleanupDeletedObjects(ctx context.Context) {
defer wg.Done() defer wg.Done()
diskPath := disk.Endpoint().Path diskPath := disk.Endpoint().Path
readDirFn(pathJoin(diskPath, minioMetaTmpDeletedBucket), func(ddir string, typ os.FileMode) error { readDirFn(pathJoin(diskPath, minioMetaTmpDeletedBucket), func(ddir string, typ os.FileMode) error {
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return w.Run(func() error {
wait := deletedCleanupSleeper.Timer(ctx) wait := deletedCleanupSleeper.Timer(ctx)
removeAll(pathJoin(diskPath, minioMetaTmpDeletedBucket, ddir)) removeAll(pathJoin(diskPath, minioMetaTmpDeletedBucket, ddir))

View File

@ -42,6 +42,7 @@ import (
"github.com/minio/minio/internal/config/callhome" "github.com/minio/minio/internal/config/callhome"
"github.com/minio/minio/internal/config/compress" "github.com/minio/minio/internal/config/compress"
"github.com/minio/minio/internal/config/dns" "github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/config/drive"
idplugin "github.com/minio/minio/internal/config/identity/plugin" idplugin "github.com/minio/minio/internal/config/identity/plugin"
polplugin "github.com/minio/minio/internal/config/policy/plugin" polplugin "github.com/minio/minio/internal/config/policy/plugin"
"github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/config/storageclass"
@ -243,6 +244,9 @@ var (
// The global callhome config // The global callhome config
globalCallhomeConfig callhome.Config globalCallhomeConfig callhome.Config
// The global drive config
globalDriveConfig drive.Config
// The global cache config // The global cache config
globalCacheConfig cache.Config globalCacheConfig cache.Config

View File

@ -351,7 +351,7 @@ func (p *xlStorageDiskIDCheck) MakeVolBulk(ctx context.Context, volumes ...strin
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.MakeVolBulk(ctx, volumes...) }) return w.Run(func() error { return p.storage.MakeVolBulk(ctx, volumes...) })
} }
@ -362,7 +362,7 @@ func (p *xlStorageDiskIDCheck) MakeVol(ctx context.Context, volume string) (err
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.MakeVol(ctx, volume) }) return w.Run(func() error { return p.storage.MakeVol(ctx, volume) })
} }
@ -383,7 +383,7 @@ func (p *xlStorageDiskIDCheck) StatVol(ctx context.Context, volume string) (vol
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
err = w.Run(func() error { err = w.Run(func() error {
var ierr error var ierr error
vol, ierr = p.storage.StatVol(ctx, volume) vol, ierr = p.storage.StatVol(ctx, volume)
@ -399,7 +399,7 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.DeleteVol(ctx, volume, forceDelete) }) return w.Run(func() error { return p.storage.DeleteVol(ctx, volume, forceDelete) })
} }
@ -421,7 +421,7 @@ func (p *xlStorageDiskIDCheck) ReadFile(ctx context.Context, volume string, path
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
err = w.Run(func() error { err = w.Run(func() error {
n, err = p.storage.ReadFile(ctx, volume, path, offset, buf, verifier) n, err = p.storage.ReadFile(ctx, volume, path, offset, buf, verifier)
return err return err
@ -438,7 +438,7 @@ func (p *xlStorageDiskIDCheck) AppendFile(ctx context.Context, volume string, pa
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return w.Run(func() error {
return p.storage.AppendFile(ctx, volume, path, buf) return p.storage.AppendFile(ctx, volume, path, buf)
}) })
@ -451,7 +451,7 @@ func (p *xlStorageDiskIDCheck) CreateFile(ctx context.Context, volume, path stri
} }
defer done(&err) defer done(&err)
return p.storage.CreateFile(ctx, volume, path, size, xioutil.NewDeadlineReader(io.NopCloser(reader), diskMaxTimeout)) return p.storage.CreateFile(ctx, volume, path, size, xioutil.NewDeadlineReader(io.NopCloser(reader), globalDriveConfig.GetMaxTimeout()))
} }
func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) { func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error) {
@ -461,7 +461,7 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
var rc io.ReadCloser var rc io.ReadCloser
err = w.Run(func() error { err = w.Run(func() error {
@ -473,7 +473,7 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
return nil, err return nil, err
} }
return xioutil.NewDeadlineReader(rc, diskMaxTimeout), nil return xioutil.NewDeadlineReader(rc, globalDriveConfig.GetMaxTimeout()), nil
} }
func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) { func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
@ -483,7 +483,7 @@ func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPat
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath) }) return w.Run(func() error { return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath) })
} }
@ -494,7 +494,7 @@ func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPat
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
err = w.Run(func() error { err = w.Run(func() error {
var ierr error var ierr error
sign, ierr = p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath) sign, ierr = p.storage.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath)
@ -510,7 +510,7 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.CheckParts(ctx, volume, path, fi) }) return w.Run(func() error { return p.storage.CheckParts(ctx, volume, path, fi) })
} }
@ -521,7 +521,7 @@ func (p *xlStorageDiskIDCheck) Delete(ctx context.Context, volume string, path s
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.Delete(ctx, volume, path, deleteOpts) }) return w.Run(func() error { return p.storage.Delete(ctx, volume, path, deleteOpts) })
} }
@ -571,7 +571,7 @@ func (p *xlStorageDiskIDCheck) WriteAll(ctx context.Context, volume string, path
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.WriteAll(ctx, volume, path, b) }) return w.Run(func() error { return p.storage.WriteAll(ctx, volume, path, b) })
} }
@ -582,7 +582,7 @@ func (p *xlStorageDiskIDCheck) DeleteVersion(ctx context.Context, volume, path s
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker) }) return w.Run(func() error { return p.storage.DeleteVersion(ctx, volume, path, fi, forceDelMarker) })
} }
@ -593,7 +593,7 @@ func (p *xlStorageDiskIDCheck) UpdateMetadata(ctx context.Context, volume, path
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.UpdateMetadata(ctx, volume, path, fi, opts) }) return w.Run(func() error { return p.storage.UpdateMetadata(ctx, volume, path, fi, opts) })
} }
@ -604,7 +604,7 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.WriteMetadata(ctx, volume, path, fi) }) return w.Run(func() error { return p.storage.WriteMetadata(ctx, volume, path, fi) })
} }
@ -615,7 +615,7 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, ve
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
rerr := w.Run(func() error { rerr := w.Run(func() error {
fi, err = p.storage.ReadVersion(ctx, volume, path, versionID, opts) fi, err = p.storage.ReadVersion(ctx, volume, path, versionID, opts)
return err return err
@ -633,7 +633,7 @@ func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
rerr := w.Run(func() error { rerr := w.Run(func() error {
buf, err = p.storage.ReadAll(ctx, volume, path) buf, err = p.storage.ReadAll(ctx, volume, path)
return err return err
@ -651,7 +651,7 @@ func (p *xlStorageDiskIDCheck) ReadXL(ctx context.Context, volume string, path s
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
rerr := w.Run(func() error { rerr := w.Run(func() error {
rf, err = p.storage.ReadXL(ctx, volume, path, readData) rf, err = p.storage.ReadXL(ctx, volume, path, readData)
return err return err
@ -696,7 +696,7 @@ func (p *xlStorageDiskIDCheck) CleanAbandonedData(ctx context.Context, volume st
} }
defer done(&err) defer done(&err)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.CleanAbandonedData(ctx, volume, path) }) return w.Run(func() error { return p.storage.CleanAbandonedData(ctx, volume, path) })
} }
@ -769,27 +769,10 @@ const (
diskHealthFaulty diskHealthFaulty
) )
// diskMaxTimeoutOperation maximum wait time before we consider a drive
// offline under active monitoring.
var diskMaxTimeout = 2 * time.Minute
// diskActiveMonitoring indicates if we have enabled "active" disk monitoring // diskActiveMonitoring indicates if we have enabled "active" disk monitoring
var diskActiveMonitoring = true var diskActiveMonitoring = true
func init() { func init() {
d := env.Get("_MINIO_DRIVE_MAX_TIMEOUT", "")
if d == "" {
d = env.Get("_MINIO_DISK_MAX_TIMEOUT", "")
}
if d != "" {
timeoutOperation, _ := time.ParseDuration(d)
if timeoutOperation < time.Second {
logger.Info("invalid _MINIO_DISK_MAX_TIMEOUT value: %s, minimum allowed is 1s, defaulting to '2 minutes'", d)
} else {
diskMaxTimeout = timeoutOperation
}
}
diskActiveMonitoring = (env.Get("_MINIO_DRIVE_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn) || diskActiveMonitoring = (env.Get("_MINIO_DRIVE_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn) ||
(env.Get("_MINIO_DISK_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn) (env.Get("_MINIO_DISK_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn)
} }
@ -1023,19 +1006,19 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
// if disk max timeout is smaller than checkEvery window // if disk max timeout is smaller than checkEvery window
// reduce checks by a second. // reduce checks by a second.
if diskMaxTimeout <= checkEvery { if globalDriveConfig.GetMaxTimeout() <= checkEvery {
checkEvery = diskMaxTimeout - time.Second checkEvery = globalDriveConfig.GetMaxTimeout() - time.Second
if checkEvery <= 0 { if checkEvery <= 0 {
checkEvery = diskMaxTimeout checkEvery = globalDriveConfig.GetMaxTimeout()
} }
} }
// if disk max timeout is smaller than skipIfSuccessBefore window // if disk max timeout is smaller than skipIfSuccessBefore window
// reduce the skipIfSuccessBefore by a second. // reduce the skipIfSuccessBefore by a second.
if diskMaxTimeout <= skipIfSuccessBefore { if globalDriveConfig.GetMaxTimeout() <= skipIfSuccessBefore {
skipIfSuccessBefore = diskMaxTimeout - time.Second skipIfSuccessBefore = globalDriveConfig.GetMaxTimeout() - time.Second
if skipIfSuccessBefore <= 0 { if skipIfSuccessBefore <= 0 {
skipIfSuccessBefore = diskMaxTimeout skipIfSuccessBefore = globalDriveConfig.GetMaxTimeout()
} }
} }
@ -1074,7 +1057,7 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
dctx, dcancel := context.WithCancel(ctx) dctx, dcancel := context.WithCancel(ctx)
started := time.Now() started := time.Now()
go func() { go func() {
timeout := time.NewTimer(diskMaxTimeout) timeout := time.NewTimer(globalDriveConfig.GetMaxTimeout())
select { select {
case <-dctx.Done(): case <-dctx.Done():
if !timeout.Stop() { if !timeout.Stop() {

View File

@ -447,7 +447,7 @@ func (s *xlStorage) readMetadataWithDMTime(ctx context.Context, itemPath string)
func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, error) { func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte, error) {
var buf []byte var buf []byte
err := xioutil.NewDeadlineWorker(diskMaxTimeout).Run(func() error { err := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout()).Run(func() error {
var rerr error var rerr error
buf, _, rerr = s.readMetadataWithDMTime(ctx, itemPath) buf, _, rerr = s.readMetadataWithDMTime(ctx, itemPath)
return rerr return rerr
@ -1085,7 +1085,7 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
errs[i] = ctx.Err() errs[i] = ctx.Err()
continue continue
} }
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
if err := w.Run(func() error { return s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...) }); err != nil { if err := w.Run(func() error { return s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...) }); err != nil {
errs[i] = err errs[i] = err
} }
@ -2751,7 +2751,7 @@ func (s *xlStorage) ReadMultiple(ctx context.Context, req ReadMultipleReq, resp
var data []byte var data []byte
var mt time.Time var mt time.Time
fullPath := pathJoin(volumeDir, req.Prefix, f) fullPath := pathJoin(volumeDir, req.Prefix, f)
w := xioutil.NewDeadlineWorker(diskMaxTimeout) w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
if err := w.Run(func() (err error) { if err := w.Run(func() (err error) {
if req.MetadataOnly { if req.MetadataOnly {
data, mt, err = s.readMetadataWithDMTime(ctx, fullPath) data, mt, err = s.readMetadataWithDMTime(ctx, fullPath)

2
go.mod
View File

@ -49,7 +49,7 @@ require (
github.com/minio/dperf v0.5.2 github.com/minio/dperf v0.5.2
github.com/minio/highwayhash v1.0.2 github.com/minio/highwayhash v1.0.2
github.com/minio/kes-go v0.2.0 github.com/minio/kes-go v0.2.0
github.com/minio/madmin-go/v3 v3.0.32 github.com/minio/madmin-go/v3 v3.0.33
github.com/minio/minio-go/v7 v7.0.64 github.com/minio/minio-go/v7 v7.0.64
github.com/minio/mux v1.9.0 github.com/minio/mux v1.9.0
github.com/minio/pkg/v2 v2.0.3-0.20231107172951-8a60b89ec9b4 github.com/minio/pkg/v2 v2.0.3-0.20231107172951-8a60b89ec9b4

4
go.sum
View File

@ -484,8 +484,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/kes-go v0.2.0 h1:HA33arq9s3MErbsj3PAXFVfFo4U4yw7lTKQ5kWFrpCA= github.com/minio/kes-go v0.2.0 h1:HA33arq9s3MErbsj3PAXFVfFo4U4yw7lTKQ5kWFrpCA=
github.com/minio/kes-go v0.2.0/go.mod h1:VorHLaIYis9/MxAHAtXN4d8PUMNKhIxTIlvFt0hBOEo= github.com/minio/kes-go v0.2.0/go.mod h1:VorHLaIYis9/MxAHAtXN4d8PUMNKhIxTIlvFt0hBOEo=
github.com/minio/madmin-go/v3 v3.0.32 h1:2O0Up8V/R3eLRQl7wNj+fwoUCRiagYmsGVNUm5V72UA= github.com/minio/madmin-go/v3 v3.0.33 h1:KBVDC2KVkyM/MyvXuSpO09JJ3N+4kXh0hmPC4kcGd6Q=
github.com/minio/madmin-go/v3 v3.0.32/go.mod h1:4QN2NftLSV7MdlT50dkrenOMmNVHluxTvlqJou3hte8= github.com/minio/madmin-go/v3 v3.0.33/go.mod h1:4QN2NftLSV7MdlT50dkrenOMmNVHluxTvlqJou3hte8=
github.com/minio/mc v0.0.0-20231030184332-9f2fb2b6a9f8 h1:3WUMQABG8FytpYHRtLHjrnztcUB09hlIrh7rQI9H+tY= github.com/minio/mc v0.0.0-20231030184332-9f2fb2b6a9f8 h1:3WUMQABG8FytpYHRtLHjrnztcUB09hlIrh7rQI9H+tY=
github.com/minio/mc v0.0.0-20231030184332-9f2fb2b6a9f8/go.mod h1:SoPU55ntH5d6IEq6jRBn6e/7SpwI/eSNdBDWmH7nwHk= github.com/minio/mc v0.0.0-20231030184332-9f2fb2b6a9f8/go.mod h1:SoPU55ntH5d6IEq6jRBn6e/7SpwI/eSNdBDWmH7nwHk=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=

View File

@ -117,7 +117,7 @@ const (
CrawlerSubSys = madmin.CrawlerSubSys CrawlerSubSys = madmin.CrawlerSubSys
SubnetSubSys = madmin.SubnetSubSys SubnetSubSys = madmin.SubnetSubSys
CallhomeSubSys = madmin.CallhomeSubSys CallhomeSubSys = madmin.CallhomeSubSys
DriveSubSys = madmin.DriveSubSys
// Add new constants here (similar to above) if you add new fields to config. // Add new constants here (similar to above) if you add new fields to config.
) )
@ -179,6 +179,7 @@ var SubSystemsDynamic = set.CreateStringSet(
HealSubSys, HealSubSys,
SubnetSubSys, SubnetSubSys,
CallhomeSubSys, CallhomeSubSys,
DriveSubSys,
LoggerWebhookSubSys, LoggerWebhookSubSys,
AuditWebhookSubSys, AuditWebhookSubSys,
AuditKafkaSubSys, AuditKafkaSubSys,
@ -204,6 +205,7 @@ var SubSystemsSingleTargets = set.CreateStringSet(
ScannerSubSys, ScannerSubSys,
SubnetSubSys, SubnetSubSys,
CallhomeSubSys, CallhomeSubSys,
DriveSubSys,
CacheSubSys, CacheSubSys,
) )

View File

@ -0,0 +1,97 @@
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package drive
import (
"sync"
"time"
"github.com/minio/minio/internal/config"
"github.com/minio/pkg/v2/env"
)
// DefaultKVS - default KVS for drive
var DefaultKVS = config.KVS{
config.KV{
Key: MaxTimeout,
Value: "",
},
}
// Config represents the subnet related configuration
type Config struct {
// MaxTimeout - maximum timeout for a drive operation
MaxTimeout time.Duration `json:"maxTimeout"`
mutex sync.RWMutex
}
// Update - updates the config with latest values
func (c *Config) Update(new *Config) error {
c.mutex.Lock()
defer c.mutex.Unlock()
c.MaxTimeout = getMaxTimeout(new.MaxTimeout)
return nil
}
// GetMaxTimeout - returns the max timeout value.
func (c *Config) GetMaxTimeout() time.Duration {
c.mutex.RLock()
defer c.mutex.RUnlock()
return getMaxTimeout(c.MaxTimeout)
}
// LookupConfig - lookup config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg *Config, err error) {
if err = config.CheckValidKeys(config.DriveSubSys, kvs, DefaultKVS); err != nil {
return cfg, err
}
// if not set. Get default value from environment
d := kvs.GetWithDefault(MaxTimeout, DefaultKVS)
if d == "" {
d = env.Get("_MINIO_DRIVE_MAX_TIMEOUT", "")
if d == "" {
d = env.Get("_MINIO_DISK_MAX_TIMEOUT", "")
}
}
cfg = &Config{
mutex: sync.RWMutex{},
}
dur, _ := time.ParseDuration(d)
if dur < time.Second {
cfg.MaxTimeout = time.Minute * 2
} else {
cfg.MaxTimeout = getMaxTimeout(dur)
}
return cfg, err
}
func getMaxTimeout(t time.Duration) time.Duration {
if t < time.Second {
// get default value
d := env.Get("_MINIO_DRIVE_MAX_TIMEOUT", "")
if d == "" {
d = env.Get("_MINIO_DISK_MAX_TIMEOUT", "")
}
dur, _ := time.ParseDuration(d)
if dur < time.Second {
return time.Minute * 2
}
return dur
}
return t
}

View File

@ -0,0 +1,34 @@
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package drive
import "github.com/minio/minio/internal/config"
var (
// MaxTimeout is the max timeout for drive
MaxTimeout = "max_timeout"
// HelpDrive is help for drive
HelpDrive = config.HelpKVS{
config.HelpKV{
Key: MaxTimeout,
Type: "string",
Description: "set per call max_timeout for the drive, defaults to 2 minutes",
Optional: true,
},
}
)