Mirror of https://github.com/minio/minio.git (synced 2024-12-25 06:35:56 -05:00)
optimize startup sequence performance (#19009)
- bucket metadata does not need to look for legacy things anymore if b.Created is non-zero
- stagger bucket metadata loads across lots of nodes to avoid the current thundering herd problem
- Remove deadlines for RenameData, RenameFile - these calls should not ever be timed out and should wait until completion or wait for client timeout. Do not choose timeouts for applications during the WRITE phase.
- increase R/W buffer size, increase maxMergeMessages to 30
This commit is contained in:
parent 7ec43bd177
commit 035a3ea4ae
@@ -22,6 +22,7 @@ import (
 	"encoding/xml"
 	"errors"
 	"fmt"
+	"math/rand"
 	"sync"
 	"time"
@@ -482,11 +483,11 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
 	for index := range buckets {
 		index := index
 		g.Go(func() error {
-			_, _ = sys.objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{
-				// Ensure heal opts for bucket metadata be deep healed all the time.
-				ScanMode: madmin.HealDeepScan,
-				Recreate: true,
-			})
+			// Sleep and stagger to avoid blocked CPU and thundering
+			// herd upon start up sequence.
+			time.Sleep(25*time.Millisecond + time.Duration(rand.Int63n(int64(100*time.Millisecond))))
+
+			_, _ = sys.objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{Recreate: true})
 			meta, err := loadBucketMetadata(ctx, sys.objAPI, buckets[index].Name)
 			if err != nil {
 				return err
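The stagger pattern introduced above can be tried in isolation. The sketch below is not MinIO code; loadOne is a hypothetical stand-in for the per-bucket heal-and-load step, but it uses the same 25 ms base delay plus up to 100 ms of random jitter per goroutine.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// loadOne is a placeholder for the real per-bucket work (HealBucket + loadBucketMetadata).
func loadOne(bucket string) {
	fmt.Println("loaded", bucket)
}

func main() {
	buckets := []string{"a", "b", "c", "d"}
	var wg sync.WaitGroup
	for _, b := range buckets {
		b := b
		wg.Add(1)
		go func() {
			defer wg.Done()
			// 25ms base plus up to 100ms of jitter, mirroring the patch,
			// so concurrent workers do not all hit the backend at once.
			time.Sleep(25*time.Millisecond + time.Duration(rand.Int63n(int64(100*time.Millisecond))))
			loadOne(b)
		}()
	}
	wg.Wait()
}
```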
@@ -182,26 +182,34 @@ func loadBucketMetadataParse(ctx context.Context, objectAPI ObjectLayer, bucket
 		b.defaultTimestamps()
 	}
 
-	configs, err := b.getAllLegacyConfigs(ctx, objectAPI)
-	if err != nil {
-		return b, err
-	}
+	// If bucket metadata is missing look for legacy files,
+	// since we only ever had b.Created as non-zero when
+	// migration was complete in 2020-May release. So this
+	// a check to avoid migrating for buckets that already
+	// have this field set.
+	if b.Created.IsZero() {
+		configs, err := b.getAllLegacyConfigs(ctx, objectAPI)
+		if err != nil {
+			return b, err
+		}
 
-	if len(configs) == 0 {
-		if parse {
-			// nothing to update, parse and proceed.
-			err = b.parseAllConfigs(ctx, objectAPI)
+		if len(configs) > 0 {
+			// Old bucket without bucket metadata. Hence we migrate existing settings.
+			if err = b.convertLegacyConfigs(ctx, objectAPI, configs); err != nil {
+				return b, err
+			}
 		}
-	} else {
-		// Old bucket without bucket metadata. Hence we migrate existing settings.
-		err = b.convertLegacyConfigs(ctx, objectAPI, configs)
 	}
-	if err != nil {
-		return b, err
+
+	if parse {
+		// nothing to update, parse and proceed.
+		if err = b.parseAllConfigs(ctx, objectAPI); err != nil {
+			return b, err
+		}
 	}
 
 	// migrate unencrypted remote targets
-	if err := b.migrateTargetConfig(ctx, objectAPI); err != nil {
+	if err = b.migrateTargetConfig(ctx, objectAPI); err != nil {
 		return b, err
 	}
 
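The control-flow change above can be summarized with a small standalone sketch. bucketMeta and getAllLegacyConfigs here are simplified stand-ins, not MinIO's real types or signatures; the point is the guard on a zero Created timestamp, which skips the legacy-config lookup entirely for buckets migrated since the 2020-May release.

```go
package main

import (
	"fmt"
	"time"
)

// bucketMeta is a simplified stand-in for BucketMetadata.
type bucketMeta struct {
	Created time.Time
}

// getAllLegacyConfigs stands in for reading the old per-config objects.
func getAllLegacyConfigs() map[string][]byte {
	return map[string][]byte{"policy.json": []byte("{}")}
}

func load(b *bucketMeta) {
	// Only buckets that predate the metadata migration have a zero Created.
	if b.Created.IsZero() {
		if configs := getAllLegacyConfigs(); len(configs) > 0 {
			fmt.Println("migrating", len(configs), "legacy configs")
		}
	}
	fmt.Println("parse configs and proceed")
}

func main() {
	load(&bucketMeta{})                          // old bucket: looks up and migrates legacy configs
	load(&bucketMeta{Created: time.Now().UTC()}) // already migrated: skips the lookup
}
```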
@@ -331,7 +339,7 @@ func (b *BucketMetadata) getAllLegacyConfigs(ctx context.Context, objectAPI Obje
 	for _, legacyFile := range legacyConfigs {
 		configFile := path.Join(bucketMetaPrefix, b.Name, legacyFile)
 
-		configData, err := readConfig(ctx, objectAPI, configFile)
+		configData, info, err := readConfigWithMetadata(ctx, objectAPI, configFile, ObjectOptions{})
 		if err != nil {
 			if _, ok := err.(ObjectExistsAsDirectory); ok {
 				// in FS mode it possible that we have actual
@@ -346,6 +354,7 @@ func (b *BucketMetadata) getAllLegacyConfigs(ctx context.Context, objectAPI Obje
 			return nil, err
 		}
 		configs[legacyFile] = configData
+		b.Created = info.ModTime
 	}
 
 	return configs, nil
@@ -1958,11 +1958,7 @@ func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmi
 }
 
 func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
-	// Attempt heal on the bucket metadata, ignore any failures
-	hopts := opts
-	hopts.Recreate = false
-	defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, bucket, bucketMetadataFile), "", hopts)
-
+	// .metadata.bin healing is not needed here, it is automatically healed via read() call.
 	return z.s3Peer.HealBucket(ctx, bucket, opts)
 }
 
@@ -313,7 +313,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
 
 	infop, err := storageDiskInfoHandler.Call(ctx, client.gridConn, &opts)
 	if err != nil {
-		return info, err
+		return info, toStorageErr(err)
 	}
 	info = *infop
 	if info.Error != "" {
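The change above routes grid-level errors through toStorageErr before returning them. The sketch below only illustrates the general idea behind such a conversion (map a wire-level error message back to a typed sentinel so callers can match with errors.Is); it is not MinIO's actual toStorageErr implementation, and the sentinel and mapping are assumptions for the example.

```go
package main

import (
	"errors"
	"fmt"
)

var errDiskNotFound = errors.New("drive not found")

// toStorageErrSketch maps an error's message back to a known sentinel.
// Illustrative only; a real mapping would cover many more cases.
func toStorageErrSketch(err error) error {
	if err == nil {
		return nil
	}
	switch err.Error() {
	case errDiskNotFound.Error():
		return errDiskNotFound
	}
	return err
}

func main() {
	// An error re-created from a wire message loses its original identity...
	remote := errors.New("drive not found")
	// ...until it is mapped back to the sentinel the caller checks for.
	fmt.Println(errors.Is(toStorageErrSketch(remote), errDiskNotFound)) // true
}
```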
@@ -442,10 +442,6 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
 
 // RenameData - rename source path to destination path atomically, metadata and data file.
 func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) {
-	// Set a very long timeout for rename data.
-	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
-	defer cancel()
-
 	resp, err := storageRenameDataHandler.Call(ctx, client.gridConn, &RenameDataHandlerParams{
 		DiskID:    client.diskID,
 		SrcVolume: srcVolume,
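Dropping the client-side 5-minute deadline means the rename is bounded only by whatever deadline the caller attached, if any. A minimal sketch of that behavior, with slowRename as a hypothetical stand-in for the RPC:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// slowRename pretends to be a long-running storage call that respects
// only the context it was given, without imposing its own deadline.
func slowRename(ctx context.Context) error {
	select {
	case <-time.After(200 * time.Millisecond): // simulated work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// The caller owns the deadline; the storage client no longer cuts it short.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(slowRename(ctx)) // <nil>: completes within the caller's budget
}
```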
@@ -704,10 +700,6 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
 
 // RenameFile - renames a file.
 func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
-	// Set a very long timeout for rename file
-	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
-	defer cancel()
-
 	_, err = storageRenameFileHandler.Call(ctx, client.gridConn, &RenameFileHandlerParams{
 		DiskID:    client.diskID,
 		SrcVolume: srcVolume,
@@ -175,8 +175,8 @@ func (c ContextDialer) DialContext(ctx context.Context, network, address string)
 
 const (
 	defaultOutQueue    = 10000
-	readBufferSize     = 16 << 10
-	writeBufferSize    = 16 << 10
+	readBufferSize     = 32 << 10 // 32 KiB is the most optimal on Linux
+	writeBufferSize    = 32 << 10 // 32 KiB is the most optimal on Linux
 	defaultDialTimeout = 2 * time.Second
 	connPingInterval   = 10 * time.Second
 	connWriteTimeout   = 3 * time.Second
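For intuition on the 16 KiB to 32 KiB bump, the snippet below is plain bufio over an in-memory reader, not MinIO's grid wiring: it only shows how the buffer size determines how many reads a large message needs, and the constants merely echo the values in the diff.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

const readBufferSize = 32 << 10 // 32 KiB, as in the patch

func main() {
	payload := bytes.Repeat([]byte("x"), 128<<10) // one 128 KiB message
	r := bufio.NewReaderSize(bytes.NewReader(payload), readBufferSize)

	buf := make([]byte, readBufferSize)
	reads := 0
	for {
		n, err := r.Read(buf)
		if n > 0 {
			reads++
		}
		if err != nil {
			break
		}
	}
	// Roughly payload size / buffer size: doubling the buffer halves the read count.
	fmt.Println("reads needed:", reads)
}
```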
@@ -654,7 +654,7 @@ func (c *Connection) connect() {
 			fmt.Printf("%v Connecting to %v: %v. Retrying.\n", c.Local, toDial, err)
 		}
 		sleep := defaultDialTimeout + time.Duration(rng.Int63n(int64(defaultDialTimeout)))
-		next := dialStarted.Add(sleep)
+		next := dialStarted.Add(sleep / 2)
 		sleep = time.Until(next).Round(time.Millisecond)
 		if sleep < 0 {
 			sleep = 0
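The only change in this hunk is that the jittered wait is now measured from dialStarted plus half the sleep, which roughly halves the effective reconnect delay while keeping the jitter. A standalone illustration of that arithmetic, reusing the same constant:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const defaultDialTimeout = 2 * time.Second

func main() {
	dialStarted := time.Now()
	// Jittered sleep between one and two dial timeouts, as in the connect loop.
	sleep := defaultDialTimeout + time.Duration(rand.Int63n(int64(defaultDialTimeout)))

	oldNext := dialStarted.Add(sleep)     // old behavior: wait the full jittered interval
	newNext := dialStarted.Add(sleep / 2) // new behavior: wait only half of it

	fmt.Println("old wait ~", time.Until(oldNext).Round(time.Millisecond))
	fmt.Println("new wait ~", time.Until(newNext).Round(time.Millisecond))
}
```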
@@ -950,7 +950,7 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
 			cancel(ErrDisconnected)
 			return
 		}
-		if cap(msg) > readBufferSize*8 {
+		if cap(msg) > readBufferSize*4 {
 			// Don't keep too much memory around.
 			msg = nil
 		}
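With readBufferSize doubled to 32 KiB, tightening the recycling threshold from 8x to 4x keeps the absolute cutoff at 128 KiB, so one unusually large message still does not pin memory for the life of the connection. A small sketch of the reuse-or-drop decision (illustrative only, not the grid's actual read loop):

```go
package main

import "fmt"

const readBufferSize = 32 << 10

// recycle returns a reusable, truncated buffer, or nil if it has grown too large.
func recycle(msg []byte) []byte {
	if cap(msg) > readBufferSize*4 {
		// Don't keep too much memory around.
		return nil
	}
	return msg[:0]
}

func main() {
	small := make([]byte, 0, readBufferSize)   // kept for reuse
	big := make([]byte, 0, readBufferSize*8)   // dropped, will be reallocated
	fmt.Println(cap(recycle(small)), cap(recycle(big))) // 32768 0
}
```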
@@ -45,7 +45,7 @@ const (
 	maxBufferSize = 64 << 10
 
 	// If there is a queue, merge up to this many messages.
-	maxMergeMessages = 20
+	maxMergeMessages = 30
 
 	// clientPingInterval will ping the remote handler every 15 seconds.
 	// Clients disconnect when we exceed 2 intervals.
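Raising maxMergeMessages lets a backed-up queue coalesce more payloads into a single write. The sketch below is a simplified stand-in for that queue-draining idea, not the grid code itself; drain and its queue shape are assumptions for the example.

```go
package main

import "fmt"

const maxMergeMessages = 30

// drain coalesces up to maxMergeMessages queued payloads into one buffer
// and returns whatever is left for the next write.
func drain(queue [][]byte) (merged []byte, rest [][]byte) {
	n := len(queue)
	if n > maxMergeMessages {
		n = maxMergeMessages
	}
	for _, msg := range queue[:n] {
		merged = append(merged, msg...)
	}
	return merged, queue[n:]
}

func main() {
	queue := make([][]byte, 50)
	for i := range queue {
		queue[i] = []byte{byte(i)}
	}
	merged, rest := drain(queue)
	fmt.Println(len(merged), len(rest)) // 30 20
}
```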