avoid reload of 'format.json' over the network under normal conditions (#18842)

This commit is contained in:
Harshavardhana 2024-01-23 14:11:46 -08:00 committed by GitHub
parent 961f7dea82
commit 52229a21cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 124 additions and 44 deletions

View File

@ -376,7 +376,7 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) {
var newDiskHealingTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint) error {
disk, format, err := connectEndpoint(endpoint)
disk, format, _, err := connectEndpoint(endpoint)
if err != nil {
return fmt.Errorf("Error: %w, %s", err, endpoint)
}

View File

@ -114,26 +114,26 @@ func (s *erasureSets) getDiskMap() map[Endpoint]StorageAPI {
// Initializes a new StorageAPI from the endpoint argument, returns the
// StorageAPI, the parsed `format` and the raw format.json bytes read from the disk.
func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, []byte, error) {
disk, err := newStorageAPI(endpoint, storageOpts{
cleanUp: false,
healthCheck: false,
})
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
format, err := loadFormatErasure(disk)
format, formatData, err := loadFormatErasureWithData(disk)
if err != nil {
if errors.Is(err, errUnformattedDisk) {
info, derr := disk.DiskInfo(context.TODO(), false)
if derr != nil && info.RootDisk {
disk.Close()
return nil, nil, fmt.Errorf("Drive: %s is a root drive", disk)
return nil, nil, nil, fmt.Errorf("Drive: %s is a root drive", disk)
}
}
disk.Close()
return nil, nil, fmt.Errorf("Drive: %s returned %w", disk, err) // make sure to '%w' to wrap the error
return nil, nil, nil, fmt.Errorf("Drive: %s returned %w", disk, err) // make sure to '%w' to wrap the error
}
disk.Close()
@ -142,10 +142,10 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
healthCheck: true,
})
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
return disk, format, nil
return disk, format, formatData, nil
}
// findDiskIndex - returns the i,j'th position of the input `diskID` against the reference
@ -226,7 +226,7 @@ func (s *erasureSets) connectDisks() {
wg.Add(1)
go func(endpoint Endpoint) {
defer wg.Done()
disk, format, err := connectEndpoint(endpoint)
disk, format, formatData, err := connectEndpoint(endpoint)
if err != nil {
if endpoint.IsLocal && errors.Is(err, errUnformattedDisk) {
globalBackgroundHealState.pushHealLocalDisks(endpoint)
@ -261,6 +261,7 @@ func (s *erasureSets) connectDisks() {
disk.SetDiskID(format.Erasure.This)
disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex)
disk.SetFormatData(formatData)
s.erasureDisks[setIndex][diskIndex] = disk
s.erasureDisksMu.Unlock()
@ -1065,7 +1066,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
// HealFormat - heals missing `format.json` on fresh unformatted disks.
func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
storageDisks, _ := initStorageDisksWithErrors(s.endpoints.Endpoints, storageOpts{
cleanUp: true,
cleanUp: false,
healthCheck: false,
})

View File

@ -325,7 +325,7 @@ func loadFormatErasureAll(storageDisks []StorageAPI, heal bool) ([]*formatErasur
if storageDisks[index] == nil {
return errDiskNotFound
}
format, err := loadFormatErasure(storageDisks[index])
format, formatData, err := loadFormatErasureWithData(storageDisks[index])
if err != nil {
return err
}
@ -339,6 +339,7 @@ func loadFormatErasureAll(storageDisks []StorageAPI, heal bool) ([]*formatErasur
// If no healing required, make the disks valid and
// online.
storageDisks[index].SetDiskID(format.Erasure.This)
storageDisks[index].SetFormatData(formatData)
}
return nil
}, index)
@ -354,7 +355,7 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, healID string)
}
// Marshal and write to disk.
formatBytes, err := json.Marshal(format)
formatData, err := json.Marshal(format)
if err != nil {
return err
}
@ -368,7 +369,7 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, healID string)
})
// write to unique file.
if err = disk.WriteAll(context.TODO(), minioMetaBucket, tmpFormat, formatBytes); err != nil {
if err = disk.WriteAll(context.TODO(), minioMetaBucket, tmpFormat, formatData); err != nil {
return err
}
@ -378,6 +379,7 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, healID string)
}
disk.SetDiskID(format.Erasure.This)
disk.SetFormatData(formatData)
if healID != "" {
ctx := context.Background()
ht := initHealingTracker(disk, healID)
@ -386,6 +388,35 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, healID string)
return nil
}
// loadFormatErasureWithData - loads format.json from disk, returning both the
// parsed format and the raw bytes that were read, so callers can cache the raw
// content and avoid re-reading format.json over the network.
func loadFormatErasureWithData(disk StorageAPI) (format *formatErasureV3, data []byte, err error) {
	// Ensure that the grid is online.
	if _, err := disk.DiskInfo(context.Background(), false); err != nil {
		// Only a missing disk is fatal here; other DiskInfo errors are
		// ignored and the read below is still attempted.
		if errors.Is(err, errDiskNotFound) {
			return nil, nil, err
		}
	}

	data, err = disk.ReadAll(context.TODO(), minioMetaBucket, formatConfigFile)
	if err != nil {
		// 'file not found' and 'volume not found' are treated as the
		// same; 'volume not found' usually means it's a fresh disk.
		if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) {
			return nil, nil, errUnformattedDisk
		}
		return nil, nil, err
	}

	// Try to decode format json into formatErasureV3 struct.
	format = &formatErasureV3{}
	if err = json.Unmarshal(data, format); err != nil {
		return nil, nil, err
	}

	// Success.
	return format, data, nil
}
// loadFormatErasure - loads format.json from disk.
func loadFormatErasure(disk StorageAPI) (format *formatErasureV3, err error) {
// Ensure that the grid is online.
@ -480,9 +511,7 @@ func formatErasureGetDeploymentID(refFormat *formatErasureV3, formats []*formatE
}
// formatErasureFixDeploymentID - Add deployment id if it is not present.
func formatErasureFixDeploymentID(endpoints Endpoints, storageDisks []StorageAPI, refFormat *formatErasureV3) (err error) {
// Attempt to load all `format.json` from all disks.
formats, _ := loadFormatErasureAll(storageDisks, false)
func formatErasureFixDeploymentID(endpoints Endpoints, storageDisks []StorageAPI, refFormat *formatErasureV3, formats []*formatErasureV3) (err error) {
for index := range formats {
// If the Erasure sets do not match, set those formats to nil,
// We do not have to update the ID on those format.json file.

View File

@ -108,6 +108,10 @@ func (d *naughtyDisk) GetDiskID() (string, error) {
return d.disk.GetDiskID()
}
// SetFormatData forwards the cached format.json bytes to the wrapped disk.
func (d *naughtyDisk) SetFormatData(b []byte) {
	d.disk.SetFormatData(b)
}
func (d *naughtyDisk) SetDiskID(id string) {
d.disk.SetDiskID(id)
}

View File

@ -83,21 +83,21 @@ func (s *peerS3Server) HealthHandler(w http.ResponseWriter, r *http.Request) {
func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
globalLocalDrivesMu.RLock()
globalLocalDrives := globalLocalDrives
localDrives := globalLocalDrives
globalLocalDrivesMu.RUnlock()
// Initialize sync waitgroup.
g := errgroup.WithNErrs(len(globalLocalDrives))
g := errgroup.WithNErrs(len(localDrives))
// Disk states slices
beforeState := make([]string, len(globalLocalDrives))
afterState := make([]string, len(globalLocalDrives))
beforeState := make([]string, len(localDrives))
afterState := make([]string, len(localDrives))
// Make a volume entry on all underlying storage disks.
for index := range globalLocalDrives {
for index := range localDrives {
index := index
g.Go(func() (serr error) {
if globalLocalDrives[index] == nil {
if localDrives[index] == nil {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
return errDiskNotFound
@ -110,7 +110,7 @@ func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (
return nil
}
_, serr = globalLocalDrives[index].StatVol(ctx, bucket)
_, serr = localDrives[index].StatVol(ctx, bucket)
if serr != nil {
if serr == errDiskNotFound {
beforeState[index] = madmin.DriveStateOffline
@ -138,7 +138,7 @@ func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (
res = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
DiskCount: len(globalLocalDrives),
DiskCount: len(localDrives),
SetCount: -1, // explicitly set an invalid value -1, for bucket heal scenario
}
@ -150,24 +150,24 @@ func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (
for i := range beforeState {
res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: globalLocalDrives[i].String(),
Endpoint: localDrives[i].String(),
State: beforeState[i],
})
}
// check dangling and delete bucket only if it's not a meta bucket
if !isMinioMetaBucketName(bucket) && !isAllBucketsNotFound(errs) && opts.Remove {
g := errgroup.WithNErrs(len(globalLocalDrives))
for index := range globalLocalDrives {
g := errgroup.WithNErrs(len(localDrives))
for index := range localDrives {
index := index
g.Go(func() error {
if globalLocalDrives[index] == nil {
if localDrives[index] == nil {
return errDiskNotFound
}
err := globalLocalDrives[index].DeleteVol(ctx, bucket, false)
err := localDrives[index].DeleteVol(ctx, bucket, false)
if !errors.Is(err, errVolumeNotEmpty) {
logger.LogOnceIf(ctx, fmt.Errorf("While deleting dangling Bucket (%s), Drive %s:%s returned an error (%w)",
bucket, globalLocalDrives[index].Hostname(), globalLocalDrives[index], err), "delete-dangling-bucket-"+bucket)
bucket, localDrives[index].Hostname(), localDrives[index], err), "delete-dangling-bucket-"+bucket)
}
return nil
}, index)
@ -179,14 +179,14 @@ func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (
// Create the quorum-lost volume only if it is not marked for delete
if !opts.Remove {
// Initialize sync waitgroup.
g = errgroup.WithNErrs(len(globalLocalDrives))
g = errgroup.WithNErrs(len(localDrives))
// Make a volume entry on all underlying storage disks.
for index := range globalLocalDrives {
for index := range localDrives {
index := index
g.Go(func() error {
if beforeState[index] == madmin.DriveStateMissing {
makeErr := globalLocalDrives[index].MakeVol(ctx, bucket)
makeErr := localDrives[index].MakeVol(ctx, bucket)
if makeErr == nil {
afterState[index] = madmin.DriveStateOk
}
@ -202,7 +202,7 @@ func healBucketLocal(ctx context.Context, bucket string, opts madmin.HealOpts) (
for i := range afterState {
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: globalLocalDrives[i].String(),
Endpoint: localDrives[i].String(),
State: afterState[i],
})
}

View File

@ -260,7 +260,7 @@ func connectLoadInitFormats(verboseLogging bool, firstDisk bool, endpoints Endpo
if !firstDisk {
return nil, nil, errNotFirstDisk
}
if err = formatErasureFixDeploymentID(endpoints, storageDisks, format); err != nil {
if err = formatErasureFixDeploymentID(endpoints, storageDisks, format, formatConfigs); err != nil {
logger.LogIf(GlobalContext, err)
return nil, nil, err
}

View File

@ -109,6 +109,7 @@ type StorageAPI interface {
ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error)
GetDiskLoc() (poolIdx, setIdx, diskIdx int) // Retrieve location indexes.
SetDiskLoc(poolIdx, setIdx, diskIdx int) // Set location indexes.
SetFormatData(b []byte) // Set formatData cached value
}
type unrecognizedDisk struct {
@ -151,6 +152,9 @@ func (p *unrecognizedDisk) NSScanner(ctx context.Context, cache dataUsageCache,
return dataUsageCache{}, errDiskNotFound
}
// SetFormatData is a no-op: an unrecognized disk has no format to cache.
func (p *unrecognizedDisk) SetFormatData(b []byte) {
}
func (p *unrecognizedDisk) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
return -1, -1, -1
}

View File

@ -144,10 +144,12 @@ type storageRESTClient struct {
// Indicate of NSScanner is in progress in this disk
scanning int32
endpoint Endpoint
restClient *rest.Client
gridConn *grid.Subroute
diskID string
endpoint Endpoint
restClient *rest.Client
gridConn *grid.Subroute
diskID string
formatData []byte
formatMutex sync.RWMutex
// Indexes, will be -1 until assigned a set.
poolIndex, setIndex, diskIndex int
@ -251,7 +253,24 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
return *final, nil
}
// SetFormatData caches the raw format.json content on the client so that
// subsequent reads of format.json can be served from this cached copy instead
// of going over the network. The cache is only updated while the remote disk
// is online.
func (client *storageRESTClient) SetFormatData(b []byte) {
	if client.IsOnline() {
		client.formatMutex.Lock()
		client.formatData = b
		client.formatMutex.Unlock()
	}
}
func (client *storageRESTClient) GetDiskID() (string, error) {
if !client.IsOnline() {
// make sure to check if the disk is offline, since the underlying
// value is cached we should attempt to invalidate it if such calls
// were attempted. This can lead to false success under certain conditions
// - this change attempts to avoid stale information if the underlying
// transport is already down.
return "", errDiskNotFound
}
// This call should never go over the network; this is always
// a cached value - the caller should make sure to use this
// function on a fresh disk, or make sure to look at the error
@ -523,6 +542,19 @@ func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, pat
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
// Specific optimization to avoid re-reading from the drives for `format.json`,
// in case the caller is a network operation.
if volume == minioMetaBucket && path == formatConfigFile {
client.formatMutex.RLock()
formatData := make([]byte, len(client.formatData))
copy(formatData, client.formatData)
client.formatMutex.RUnlock()
if len(formatData) > 0 {
return formatData, nil
}
}
respBody, err := client.call(ctx, storageRESTMethodReadAll, values, nil, -1)
if err != nil {
return nil, err

View File

@ -1390,6 +1390,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
}
storage := newXLStorageDiskIDCheck(xl, true)
storage.SetDiskID(xl.diskID)
// We do not have to do SetFormatData() since 'xl'
// already captures formatData cached.
globalLocalDrivesMu.Lock()
defer globalLocalDrivesMu.Unlock()

View File

@ -100,7 +100,7 @@ type xlStorageDiskIDCheck struct {
metricsCache timedValue
diskCtx context.Context
cancel context.CancelFunc
diskCancel context.CancelFunc
}
func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
@ -235,7 +235,7 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDis
xl.totalDeletes.Store(xl.storage.getDeleteAttribute())
}
xl.diskCtx, xl.cancel = context.WithCancel(context.TODO())
xl.diskCtx, xl.diskCancel = context.WithCancel(context.TODO())
for i := range xl.apiLatencies[:] {
xl.apiLatencies[i] = &lockedLastMinuteLatency{}
}
@ -295,6 +295,10 @@ func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCac
return p.storage.NSScanner(ctx, cache, updates, scanMode, weSleep)
}
// SetFormatData forwards the cached format.json bytes to the underlying storage.
func (p *xlStorageDiskIDCheck) SetFormatData(b []byte) {
	p.storage.SetFormatData(b)
}
func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
return p.storage.GetDiskLoc()
}
@ -304,7 +308,7 @@ func (p *xlStorageDiskIDCheck) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
}
func (p *xlStorageDiskIDCheck) Close() error {
p.cancel()
p.diskCancel()
return p.storage.Close()
}

View File

@ -372,8 +372,6 @@ func (s *xlStorage) IsLocal() bool {
// Retrieve location indexes.
func (s *xlStorage) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
s.RLock()
defer s.RUnlock()
// If unset, see if we can locate it.
if s.poolIndex < 0 || s.setIndex < 0 || s.diskIndex < 0 {
return getXLDiskLoc(s.diskID)
@ -381,6 +379,12 @@ func (s *xlStorage) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
return s.poolIndex, s.setIndex, s.diskIndex
}
// SetFormatData caches the raw format.json bytes, guarded by the storage lock.
func (s *xlStorage) SetFormatData(b []byte) {
	s.Lock()
	defer s.Unlock()
	s.formatData = b
}
// Set location indexes.
func (s *xlStorage) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
s.poolIndex = poolIdx