mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
remove older deploymentID fix behavior to speed up startup (#19497)
since mid 2018 we do not have any deployments without deployment-id, it is time to put this code to rest, this PR removes this old code as its no longer valuable. on setups with 1000's of drives these are all quite expensive operations.
This commit is contained in:
parent
b8f05b1471
commit
d1c58fc2eb
@ -114,26 +114,26 @@ func (s *erasureSets) getDiskMap() map[Endpoint]StorageAPI {
|
||||
|
||||
// Initializes a new StorageAPI from the endpoint argument, returns
|
||||
// StorageAPI and also `format` which exists on the disk.
|
||||
func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, []byte, error) {
|
||||
func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
|
||||
disk, err := newStorageAPI(endpoint, storageOpts{
|
||||
cleanUp: false,
|
||||
healthCheck: false,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
format, formatData, err := loadFormatErasureWithData(disk, false)
|
||||
format, err := loadFormatErasure(disk, false)
|
||||
if err != nil {
|
||||
if errors.Is(err, errUnformattedDisk) {
|
||||
info, derr := disk.DiskInfo(context.TODO(), DiskInfoOptions{})
|
||||
if derr != nil && info.RootDisk {
|
||||
disk.Close()
|
||||
return nil, nil, nil, fmt.Errorf("Drive: %s is a root drive", disk)
|
||||
return nil, nil, fmt.Errorf("Drive: %s is a root drive", disk)
|
||||
}
|
||||
}
|
||||
disk.Close()
|
||||
return nil, nil, nil, fmt.Errorf("Drive: %s returned %w", disk, err) // make sure to '%w' to wrap the error
|
||||
return nil, nil, fmt.Errorf("Drive: %s returned %w", disk, err) // make sure to '%w' to wrap the error
|
||||
}
|
||||
|
||||
disk.Close()
|
||||
@ -142,10 +142,10 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, []byte, e
|
||||
healthCheck: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return disk, format, formatData, nil
|
||||
return disk, format, nil
|
||||
}
|
||||
|
||||
// findDiskIndex - returns the i,j'th position of the input `diskID` against the reference
|
||||
@ -226,7 +226,7 @@ func (s *erasureSets) connectDisks() {
|
||||
wg.Add(1)
|
||||
go func(endpoint Endpoint) {
|
||||
defer wg.Done()
|
||||
disk, format, formatData, err := connectEndpoint(endpoint)
|
||||
disk, format, err := connectEndpoint(endpoint)
|
||||
if err != nil {
|
||||
if endpoint.IsLocal && errors.Is(err, errUnformattedDisk) {
|
||||
globalBackgroundHealState.pushHealLocalDisks(endpoint)
|
||||
@ -260,7 +260,6 @@ func (s *erasureSets) connectDisks() {
|
||||
}
|
||||
|
||||
disk.SetDiskID(format.Erasure.This)
|
||||
disk.SetFormatData(formatData)
|
||||
s.erasureDisks[setIndex][diskIndex] = disk
|
||||
|
||||
if disk.IsLocal() {
|
||||
|
@ -24,11 +24,9 @@ import (
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/minio/minio/internal/color"
|
||||
"github.com/minio/minio/internal/config"
|
||||
"github.com/minio/minio/internal/config/storageclass"
|
||||
@ -331,7 +329,7 @@ func loadFormatErasureAll(storageDisks []StorageAPI, heal bool) ([]*formatErasur
|
||||
if storageDisks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
format, formatData, err := loadFormatErasureWithData(storageDisks[index], heal)
|
||||
format, err := loadFormatErasure(storageDisks[index], heal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -340,7 +338,6 @@ func loadFormatErasureAll(storageDisks []StorageAPI, heal bool) ([]*formatErasur
|
||||
// If no healing required, make the disks valid and
|
||||
// online.
|
||||
storageDisks[index].SetDiskID(format.Erasure.This)
|
||||
storageDisks[index].SetFormatData(formatData)
|
||||
}
|
||||
return nil
|
||||
}, index)
|
||||
@ -380,7 +377,6 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, healID string)
|
||||
}
|
||||
|
||||
disk.SetDiskID(format.Erasure.This)
|
||||
disk.SetFormatData(formatData)
|
||||
if healID != "" {
|
||||
ctx := context.Background()
|
||||
ht := initHealingTracker(disk, healID)
|
||||
@ -389,56 +385,32 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, healID string)
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadFormatErasureWithData - loads format.json from disk.
|
||||
func loadFormatErasureWithData(disk StorageAPI, heal bool) (format *formatErasureV3, data []byte, err error) {
|
||||
data, err = disk.ReadAll(context.TODO(), minioMetaBucket, formatConfigFile)
|
||||
// loadFormatErasure - loads format.json from disk.
|
||||
func loadFormatErasure(disk StorageAPI, heal bool) (format *formatErasureV3, err error) {
|
||||
data, err := disk.ReadAll(context.TODO(), minioMetaBucket, formatConfigFile)
|
||||
if err != nil {
|
||||
// 'file not found' and 'volume not found' as
|
||||
// same. 'volume not found' usually means its a fresh disk.
|
||||
if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) {
|
||||
return nil, nil, errUnformattedDisk
|
||||
}
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Try to decode format json into formatConfigV1 struct.
|
||||
format = &formatErasureV3{}
|
||||
if err = json.Unmarshal(data, format); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if heal {
|
||||
info, err := disk.DiskInfo(context.Background(), DiskInfoOptions{NoOp: heal})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
format.Info = info
|
||||
}
|
||||
|
||||
// Success.
|
||||
return format, data, nil
|
||||
}
|
||||
|
||||
// loadFormatErasure - loads format.json from disk.
|
||||
func loadFormatErasure(disk StorageAPI) (format *formatErasureV3, err error) {
|
||||
buf, err := disk.ReadAll(context.TODO(), minioMetaBucket, formatConfigFile)
|
||||
if err != nil {
|
||||
// 'file not found' and 'volume not found' as
|
||||
// same. 'volume not found' usually means its a fresh disk.
|
||||
if err == errFileNotFound || err == errVolumeNotFound {
|
||||
return nil, errUnformattedDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
json := jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
|
||||
// Try to decode format json into formatConfigV1 struct.
|
||||
format = &formatErasureV3{}
|
||||
if err = json.Unmarshal(buf, format); err != nil {
|
||||
if err = json.Unmarshal(data, format); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if heal {
|
||||
info, err := disk.DiskInfo(context.Background(), DiskInfoOptions{NoOp: heal})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
format.Info = info
|
||||
}
|
||||
|
||||
// Success.
|
||||
return format, nil
|
||||
}
|
||||
@ -481,110 +453,6 @@ func checkFormatErasureValues(formats []*formatErasureV3, disks []StorageAPI, se
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get Deployment ID for the Erasure sets from format.json.
|
||||
// This need not be in quorum. Even if one of the format.json
|
||||
// file has this value, we assume it is valid.
|
||||
// If more than one format.json's have different id, it is considered a corrupt
|
||||
// backend format.
|
||||
func formatErasureGetDeploymentID(refFormat *formatErasureV3, formats []*formatErasureV3) (string, error) {
|
||||
var deploymentID string
|
||||
for _, format := range formats {
|
||||
if format == nil || format.ID == "" {
|
||||
continue
|
||||
}
|
||||
if reflect.DeepEqual(format.Erasure.Sets, refFormat.Erasure.Sets) {
|
||||
// Found an ID in one of the format.json file
|
||||
// Set deploymentID for the first time.
|
||||
if deploymentID == "" {
|
||||
deploymentID = format.ID
|
||||
} else if deploymentID != format.ID {
|
||||
// DeploymentID found earlier doesn't match with the
|
||||
// current format.json's ID.
|
||||
return "", fmt.Errorf("Deployment IDs do not match expected %s, got %s: %w",
|
||||
deploymentID, format.ID, errCorruptedFormat)
|
||||
}
|
||||
}
|
||||
}
|
||||
return deploymentID, nil
|
||||
}
|
||||
|
||||
// formatErasureFixDeploymentID - Add deployment id if it is not present.
|
||||
func formatErasureFixDeploymentID(endpoints Endpoints, storageDisks []StorageAPI, refFormat *formatErasureV3, formats []*formatErasureV3) (err error) {
|
||||
for index := range formats {
|
||||
// If the Erasure sets do not match, set those formats to nil,
|
||||
// We do not have to update the ID on those format.json file.
|
||||
if formats[index] != nil && !reflect.DeepEqual(formats[index].Erasure.Sets, refFormat.Erasure.Sets) {
|
||||
formats[index] = nil
|
||||
}
|
||||
}
|
||||
|
||||
refFormat.ID, err = formatErasureGetDeploymentID(refFormat, formats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If ID is set, then some other node got the lock
|
||||
// before this node could and generated an ID
|
||||
// for the deployment. No need to generate one.
|
||||
if refFormat.ID != "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ID is generated for the first time,
|
||||
// We set the ID in all the formats and update.
|
||||
refFormat.ID = mustGetUUID()
|
||||
for _, format := range formats {
|
||||
if format != nil {
|
||||
format.ID = refFormat.ID
|
||||
}
|
||||
}
|
||||
// Deployment ID needs to be set on all the disks.
|
||||
// Save `format.json` across all disks.
|
||||
return saveFormatErasureAll(GlobalContext, storageDisks, formats)
|
||||
}
|
||||
|
||||
// Update only the valid local disks which have not been updated before.
|
||||
func formatErasureFixLocalDeploymentID(endpoints Endpoints, storageDisks []StorageAPI, refFormat *formatErasureV3) error {
|
||||
// If this server was down when the deploymentID was updated
|
||||
// then we make sure that we update the local disks with the deploymentID.
|
||||
|
||||
// Initialize errs to collect errors inside go-routine.
|
||||
g := errgroup.WithNErrs(len(storageDisks))
|
||||
|
||||
for index := range storageDisks {
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if endpoints[index].IsLocal && storageDisks[index] != nil && storageDisks[index].IsOnline() {
|
||||
format, err := loadFormatErasure(storageDisks[index])
|
||||
if err != nil {
|
||||
// Disk can be offline etc.
|
||||
// ignore the errors seen here.
|
||||
return nil
|
||||
}
|
||||
if format.ID != "" {
|
||||
return nil
|
||||
}
|
||||
if !reflect.DeepEqual(format.Erasure.Sets, refFormat.Erasure.Sets) {
|
||||
return nil
|
||||
}
|
||||
format.ID = refFormat.ID
|
||||
// Heal the drive if we fixed its deployment ID.
|
||||
if err := saveFormatErasure(storageDisks[index], format, mustGetUUID()); err != nil {
|
||||
bootLogIf(GlobalContext, err)
|
||||
return fmt.Errorf("Unable to save format.json, %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}, index)
|
||||
}
|
||||
for _, err := range g.Wait() {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get backend Erasure format in quorum `format.json`.
|
||||
func getFormatErasureInQuorum(formats []*formatErasureV3) (*formatErasureV3, error) {
|
||||
formatCountMap := make(map[int]int, len(formats))
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
@ -431,62 +430,6 @@ func BenchmarkGetFormatErasureInQuorum(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
// Tests formatErasureGetDeploymentID()
|
||||
func TestGetErasureID(t *testing.T) {
|
||||
setCount := 2
|
||||
setDriveCount := 8
|
||||
|
||||
format := newFormatErasureV3(setCount, setDriveCount)
|
||||
format.Erasure.DistributionAlgo = formatErasureVersionV2DistributionAlgoV1
|
||||
formats := make([]*formatErasureV3, 16)
|
||||
|
||||
for i := 0; i < setCount; i++ {
|
||||
for j := 0; j < setDriveCount; j++ {
|
||||
newFormat := format.Clone()
|
||||
newFormat.Erasure.This = format.Erasure.Sets[i][j]
|
||||
formats[i*setDriveCount+j] = newFormat
|
||||
}
|
||||
}
|
||||
|
||||
// Return a format from list of formats in quorum.
|
||||
quorumFormat, err := getFormatErasureInQuorum(formats)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Check if the reference format and input formats are same.
|
||||
var id string
|
||||
if id, err = formatErasureGetDeploymentID(quorumFormat, formats); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if id == "" {
|
||||
t.Fatal("ID cannot be empty.")
|
||||
}
|
||||
|
||||
formats[0] = nil
|
||||
if id, err = formatErasureGetDeploymentID(quorumFormat, formats); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if id == "" {
|
||||
t.Fatal("ID cannot be empty.")
|
||||
}
|
||||
|
||||
formats[1].Erasure.Sets[0][0] = "bad-uuid"
|
||||
if id, err = formatErasureGetDeploymentID(quorumFormat, formats); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if id == "" {
|
||||
t.Fatal("ID cannot be empty.")
|
||||
}
|
||||
|
||||
formats[2].ID = "bad-id"
|
||||
if _, err = formatErasureGetDeploymentID(quorumFormat, formats); !errors.Is(err, errCorruptedFormat) {
|
||||
t.Fatalf("Unexpected error %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize new format sets.
|
||||
func TestNewFormatSets(t *testing.T) {
|
||||
setCount := 2
|
||||
|
@ -414,7 +414,7 @@ func (p *xlStorageDiskIDCheck) WalkDir(ctx context.Context, opts WalkDirOptions,
|
||||
// On success a meta cache stream will be returned, that should be closed when done.
|
||||
func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error {
|
||||
// Ensure remote has the same disk ID.
|
||||
opts.DiskID = client.diskID
|
||||
opts.DiskID = *client.diskID.Load()
|
||||
b, err := opts.MarshalMsg(grid.GetByteBuffer()[:0])
|
||||
if err != nil {
|
||||
return toStorageErr(err)
|
||||
|
@ -106,10 +106,6 @@ func (d *naughtyDisk) GetDiskID() (string, error) {
|
||||
return d.disk.GetDiskID()
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) SetFormatData(b []byte) {
|
||||
d.disk.SetFormatData(b)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) SetDiskID(id string) {
|
||||
d.disk.SetDiskID(id)
|
||||
}
|
||||
|
@ -245,23 +245,11 @@ func connectLoadInitFormats(verboseLogging bool, firstDisk bool, endpoints Endpo
|
||||
}
|
||||
|
||||
if format.ID == "" {
|
||||
// Not a first disk, wait until first disk fixes deploymentID
|
||||
if !firstDisk {
|
||||
return nil, nil, errNotFirstDisk
|
||||
}
|
||||
if err = formatErasureFixDeploymentID(endpoints, storageDisks, format, formatConfigs); err != nil {
|
||||
storageLogIf(GlobalContext, err)
|
||||
return nil, nil, err
|
||||
}
|
||||
internalLogIf(GlobalContext, errors.New("unexpected error deployment ID is missing, refusing to continue"))
|
||||
return nil, nil, errInvalidArgument
|
||||
}
|
||||
|
||||
globalDeploymentIDPtr.Store(&format.ID)
|
||||
|
||||
if err = formatErasureFixLocalDeploymentID(endpoints, storageDisks, format); err != nil {
|
||||
storageLogIf(GlobalContext, err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return storageDisks, format, nil
|
||||
}
|
||||
|
||||
|
@ -108,5 +108,4 @@ type StorageAPI interface {
|
||||
// Read all.
|
||||
ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error)
|
||||
GetDiskLoc() (poolIdx, setIdx, diskIdx int) // Retrieve location indexes.
|
||||
SetFormatData(b []byte) // Set formatData cached value
|
||||
}
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/minio/madmin-go/v3"
|
||||
@ -155,12 +156,10 @@ func toStorageErr(err error) error {
|
||||
|
||||
// Abstracts a remote disk.
|
||||
type storageRESTClient struct {
|
||||
endpoint Endpoint
|
||||
restClient *rest.Client
|
||||
gridConn *grid.Subroute
|
||||
diskID string
|
||||
formatData []byte
|
||||
formatMutex sync.RWMutex
|
||||
endpoint Endpoint
|
||||
restClient *rest.Client
|
||||
gridConn *grid.Subroute
|
||||
diskID atomic.Pointer[string]
|
||||
|
||||
diskInfoCache *cachevalue.Cache[DiskInfo]
|
||||
}
|
||||
@ -170,14 +169,13 @@ func (client *storageRESTClient) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
|
||||
return client.endpoint.PoolIdx, client.endpoint.SetIdx, client.endpoint.DiskIdx
|
||||
}
|
||||
|
||||
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected
|
||||
// permanently. The only way to restore the storage connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
|
||||
// after verifying format.json
|
||||
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is disconnected
|
||||
// and a healthcheck routine gets invoked that would reconnect.
|
||||
func (client *storageRESTClient) call(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (io.ReadCloser, error) {
|
||||
if values == nil {
|
||||
values = make(url.Values)
|
||||
}
|
||||
values.Set(storageRESTDiskID, client.diskID)
|
||||
values.Set(storageRESTDiskID, *client.diskID.Load())
|
||||
respBody, err := client.restClient.Call(ctx, method, values, body, length)
|
||||
if err != nil {
|
||||
return nil, toStorageErr(err)
|
||||
@ -222,7 +220,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
|
||||
defer xioutil.SafeClose(updates)
|
||||
|
||||
st, err := storageNSScannerRPC.Call(ctx, client.gridConn, &nsScannerOptions{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
ScanMode: int(scanMode),
|
||||
Cache: &cache,
|
||||
})
|
||||
@ -252,14 +250,6 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
|
||||
return *final, nil
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) SetFormatData(b []byte) {
|
||||
if client.IsOnline() {
|
||||
client.formatMutex.Lock()
|
||||
client.formatData = b
|
||||
client.formatMutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) GetDiskID() (string, error) {
|
||||
if !client.IsOnline() {
|
||||
// make sure to check if the disk is offline, since the underlying
|
||||
@ -274,11 +264,11 @@ func (client *storageRESTClient) GetDiskID() (string, error) {
|
||||
// a cached value - caller should make sure to use this
|
||||
// function on a fresh disk or make sure to look at the error
|
||||
// from a different networked call to validate the GetDiskID()
|
||||
return client.diskID, nil
|
||||
return *client.diskID.Load(), nil
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) SetDiskID(id string) {
|
||||
client.diskID = id
|
||||
client.diskID.Store(&id)
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOptions) (info DiskInfo, err error) {
|
||||
@ -296,7 +286,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
opts.DiskID = client.diskID
|
||||
opts.DiskID = *client.diskID.Load()
|
||||
|
||||
infop, err := storageDiskInfoRPC.Call(ctx, client.gridConn, &opts)
|
||||
if err != nil {
|
||||
@ -315,7 +305,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
nopts := DiskInfoOptions{DiskID: client.diskID, Metrics: true}
|
||||
nopts := DiskInfoOptions{DiskID: *client.diskID.Load(), Metrics: true}
|
||||
infop, err := storageDiskInfoRPC.Call(ctx, client.gridConn, &nopts)
|
||||
if err != nil {
|
||||
return info, toStorageErr(err)
|
||||
@ -349,7 +339,7 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo,
|
||||
// StatVol - get volume info over the network.
|
||||
func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vol VolInfo, err error) {
|
||||
v, err := storageStatVolRPC.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{
|
||||
storageRESTDiskID: client.diskID,
|
||||
storageRESTDiskID: *client.diskID.Load(),
|
||||
storageRESTVolume: volume,
|
||||
}))
|
||||
if err != nil {
|
||||
@ -395,7 +385,7 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, origvolume, vol
|
||||
|
||||
func (client *storageRESTClient) WriteMetadata(ctx context.Context, origvolume, volume, path string, fi FileInfo) error {
|
||||
_, err := storageWriteMetadataRPC.Call(ctx, client.gridConn, &MetadataHandlerParams{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
OrigVolume: origvolume,
|
||||
Volume: volume,
|
||||
FilePath: path,
|
||||
@ -406,7 +396,7 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, origvolume,
|
||||
|
||||
func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error {
|
||||
_, err := storageUpdateMetadataRPC.Call(ctx, client.gridConn, &MetadataHandlerParams{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
FilePath: path,
|
||||
UpdateOpts: opts,
|
||||
@ -417,7 +407,7 @@ func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, pat
|
||||
|
||||
func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool, opts DeleteOptions) (err error) {
|
||||
_, err = storageDeleteVersionRPC.Call(ctx, client.gridConn, &DeleteVersionHandlerParams{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
FilePath: path,
|
||||
ForceDelMarker: forceDelMarker,
|
||||
@ -429,14 +419,8 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path
|
||||
|
||||
// WriteAll - write all data to a file.
|
||||
func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, path string, b []byte) error {
|
||||
// Specific optimization to avoid re-read from the drives for `format.json`
|
||||
// in-case the caller is a network operation.
|
||||
if volume == minioMetaBucket && path == formatConfigFile {
|
||||
client.SetFormatData(b)
|
||||
}
|
||||
|
||||
_, err := storageWriteAllRPC.Call(ctx, client.gridConn, &WriteAllHandlerParams{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
FilePath: path,
|
||||
Buf: b,
|
||||
@ -447,7 +431,7 @@ func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, pa
|
||||
// CheckParts - stat all file parts.
|
||||
func (client *storageRESTClient) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) error {
|
||||
_, err := storageCheckPartsRPC.Call(ctx, client.gridConn, &CheckPartsHandlerParams{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
FilePath: path,
|
||||
FI: fi,
|
||||
@ -458,7 +442,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
|
||||
// RenameData - rename source path to destination path atomically, metadata and data file.
|
||||
func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) {
|
||||
params := RenameDataHandlerParams{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
SrcVolume: srcVolume,
|
||||
SrcPath: srcPath,
|
||||
DstPath: dstPath,
|
||||
@ -507,7 +491,7 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, origvolume, vo
|
||||
// Use websocket when not reading data.
|
||||
if !opts.ReadData {
|
||||
resp, err := storageReadVersionRPC.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{
|
||||
storageRESTDiskID: client.diskID,
|
||||
storageRESTDiskID: *client.diskID.Load(),
|
||||
storageRESTOrigVolume: origvolume,
|
||||
storageRESTVolume: volume,
|
||||
storageRESTFilePath: path,
|
||||
@ -547,7 +531,7 @@ func (client *storageRESTClient) ReadXL(ctx context.Context, volume string, path
|
||||
// Use websocket when not reading data.
|
||||
if !readData {
|
||||
resp, err := storageReadXLRPC.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{
|
||||
storageRESTDiskID: client.diskID,
|
||||
storageRESTDiskID: *client.diskID.Load(),
|
||||
storageRESTVolume: volume,
|
||||
storageRESTFilePath: path,
|
||||
storageRESTReadData: "false",
|
||||
@ -577,20 +561,8 @@ func (client *storageRESTClient) ReadXL(ctx context.Context, volume string, path
|
||||
|
||||
// ReadAll - reads all contents of a file.
|
||||
func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, path string) ([]byte, error) {
|
||||
// Specific optimization to avoid re-read from the drives for `format.json`
|
||||
// in-case the caller is a network operation.
|
||||
if volume == minioMetaBucket && path == formatConfigFile {
|
||||
client.formatMutex.RLock()
|
||||
formatData := make([]byte, len(client.formatData))
|
||||
copy(formatData, client.formatData)
|
||||
client.formatMutex.RUnlock()
|
||||
if len(formatData) > 0 {
|
||||
return formatData, nil
|
||||
}
|
||||
}
|
||||
|
||||
gridBytes, err := storageReadAllRPC.Call(ctx, client.gridConn, &ReadAllHandlerParams{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
FilePath: path,
|
||||
})
|
||||
@ -645,7 +617,7 @@ func (client *storageRESTClient) ListDir(ctx context.Context, origvolume, volume
|
||||
values.Set(storageRESTDirPath, dirPath)
|
||||
values.Set(storageRESTCount, strconv.Itoa(count))
|
||||
values.Set(storageRESTOrigVolume, origvolume)
|
||||
values.Set(storageRESTDiskID, client.diskID)
|
||||
values.Set(storageRESTDiskID, *client.diskID.Load())
|
||||
|
||||
st, err := storageListDirRPC.Call(ctx, client.gridConn, values)
|
||||
if err != nil {
|
||||
@ -661,7 +633,7 @@ func (client *storageRESTClient) ListDir(ctx context.Context, origvolume, volume
|
||||
// DeleteFile - deletes a file.
|
||||
func (client *storageRESTClient) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) error {
|
||||
_, err := storageDeleteFileRPC.Call(ctx, client.gridConn, &DeleteFileHandlerParams{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
Volume: volume,
|
||||
FilePath: path,
|
||||
Opts: deleteOpts,
|
||||
@ -726,7 +698,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
|
||||
// RenameFile - renames a file.
|
||||
func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
|
||||
_, err = storageRenameFileRPC.Call(ctx, client.gridConn, &RenameFileHandlerParams{
|
||||
DiskID: client.diskID,
|
||||
DiskID: *client.diskID.Load(),
|
||||
SrcVolume: srcVolume,
|
||||
SrcFilePath: srcPath,
|
||||
DstVolume: dstVolume,
|
||||
@ -855,6 +827,8 @@ func (client *storageRESTClient) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var emptyDiskID = ""
|
||||
|
||||
// Returns a storage rest client.
|
||||
func newStorageRESTClient(endpoint Endpoint, healthCheck bool, gm *grid.Manager) (*storageRESTClient, error) {
|
||||
serverURL := &url.URL{
|
||||
@ -881,10 +855,12 @@ func newStorageRESTClient(endpoint Endpoint, healthCheck bool, gm *grid.Manager)
|
||||
if conn == nil {
|
||||
return nil, fmt.Errorf("unable to find connection for %s in targets: %v", endpoint.GridHost(), gm.Targets())
|
||||
}
|
||||
return &storageRESTClient{
|
||||
client := &storageRESTClient{
|
||||
endpoint: endpoint,
|
||||
restClient: restClient,
|
||||
gridConn: conn,
|
||||
diskInfoCache: cachevalue.New[DiskInfo](),
|
||||
}, nil
|
||||
}
|
||||
client.SetDiskID(emptyDiskID)
|
||||
return client, nil
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ type xlStorageDiskIDCheck struct {
|
||||
// apiCalls should be placed first so alignment is guaranteed for atomic operations.
|
||||
apiCalls [storageMetricLast]uint64
|
||||
apiLatencies [storageMetricLast]*lockedLastMinuteLatency
|
||||
diskID string
|
||||
diskID atomic.Pointer[string]
|
||||
storage *xlStorage
|
||||
health *diskHealthTracker
|
||||
healthCheck bool
|
||||
@ -182,6 +182,7 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDis
|
||||
healthCheck: healthCheck && globalDriveMonitoring,
|
||||
metricsCache: cachevalue.New[DiskMetrics](),
|
||||
}
|
||||
xl.SetDiskID(emptyDiskID)
|
||||
|
||||
xl.totalWrites.Store(xl.storage.getWriteAttribute())
|
||||
xl.totalDeletes.Store(xl.storage.getDeleteAttribute())
|
||||
@ -204,7 +205,7 @@ func (p *xlStorageDiskIDCheck) IsOnline() bool {
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return storedDiskID == p.diskID
|
||||
return storedDiskID == *p.diskID.Load()
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) LastConn() time.Time {
|
||||
@ -245,10 +246,6 @@ func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCac
|
||||
return p.storage.NSScanner(ctx, cache, updates, scanMode, weSleep)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) SetFormatData(b []byte) {
|
||||
p.storage.SetFormatData(b)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
|
||||
return p.storage.GetDiskLoc()
|
||||
}
|
||||
@ -263,11 +260,11 @@ func (p *xlStorageDiskIDCheck) GetDiskID() (string, error) {
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) SetDiskID(id string) {
|
||||
p.diskID = id
|
||||
p.diskID.Store(&id)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) checkDiskStale() error {
|
||||
if p.diskID == "" {
|
||||
if *p.diskID.Load() == emptyDiskID {
|
||||
// For empty disk-id we allow the call as the server might be
|
||||
// coming up and trying to read format.json or create format.json
|
||||
return nil
|
||||
@ -277,7 +274,7 @@ func (p *xlStorageDiskIDCheck) checkDiskStale() error {
|
||||
// return any error generated while reading `format.json`
|
||||
return err
|
||||
}
|
||||
if err == nil && p.diskID == storedDiskID {
|
||||
if err == nil && *p.diskID.Load() == storedDiskID {
|
||||
return nil
|
||||
}
|
||||
// not the same disk we remember, take it offline.
|
||||
@ -331,7 +328,8 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, opts DiskInfoOption
|
||||
|
||||
// check cached diskID against backend
|
||||
// only if its non-empty.
|
||||
if p.diskID != "" && p.diskID != info.ID {
|
||||
cachedID := *p.diskID.Load()
|
||||
if cachedID != "" && cachedID != info.ID {
|
||||
return info, errDiskNotFound
|
||||
}
|
||||
return info, nil
|
||||
|
@ -407,12 +407,6 @@ func (s *xlStorage) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
|
||||
return s.endpoint.PoolIdx, s.endpoint.SetIdx, s.endpoint.DiskIdx
|
||||
}
|
||||
|
||||
func (s *xlStorage) SetFormatData(b []byte) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.formatData = b
|
||||
}
|
||||
|
||||
func (s *xlStorage) Healing() *healingTracker {
|
||||
healingFile := pathJoin(s.drivePath, minioMetaBucket,
|
||||
bucketMetaPrefix, healingTrackerFilename)
|
||||
@ -2201,7 +2195,7 @@ func (s *xlStorage) writeAll(ctx context.Context, volume string, path string, b
|
||||
|
||||
var w *os.File
|
||||
if sync {
|
||||
// Perform directIO along with fdatasync for larger xl.meta, mostly when
|
||||
// Perform DirectIO along with fdatasync for larger xl.meta, mostly when
|
||||
// xl.meta has "inlined data" we prefer writing O_DIRECT and then doing
|
||||
// fdatasync() at the end instead of opening the file with O_DSYNC.
|
||||
//
|
||||
@ -2241,6 +2235,14 @@ func (s *xlStorage) writeAll(ctx context.Context, volume string, path string, b
|
||||
}
|
||||
|
||||
func (s *xlStorage) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
|
||||
// Specific optimization to avoid re-read from the drives for `format.json`
|
||||
// in-case the caller is a network operation.
|
||||
if volume == minioMetaBucket && path == formatConfigFile {
|
||||
s.Lock()
|
||||
s.formatData = b
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
return s.writeAll(ctx, volume, path, b, true)
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user