remove SetDiskLoc() rely on the endpoint values instead (#19475)

the disk location never changes in the lifetime of a
MinIO cluster, even if it did validate this close to the
disk instead at the higher layer.

Return appropriate errors indicating an invalid drive, so
that the drive is not recognized as part of a valid
drive.
This commit is contained in:
Harshavardhana
2024-04-11 10:45:28 -07:00
committed by GitHub
parent aa8d25797b
commit 074febd9e1
9 changed files with 57 additions and 262 deletions

View File

@@ -25,7 +25,6 @@ import (
"errors"
"fmt"
"io"
"net/url"
"os"
pathutil "path"
"path/filepath"
@@ -103,9 +102,6 @@ type xlStorage struct {
diskID string
// Indexes, will be -1 until assigned a set.
poolIndex, setIndex, diskIndex int
formatFileInfo os.FileInfo
formatFile string
formatLegacy bool
@@ -196,15 +192,6 @@ func getValidPath(path string) (string, error) {
return path, nil
}
// Initialize a new storage disk.
func newLocalXLStorage(path string) (*xlStorage, error) {
u := url.URL{Path: path}
return newXLStorage(Endpoint{
URL: &u,
IsLocal: true,
}, true)
}
// Make Erasure backend meta volumes.
func makeFormatErasureMetaVolumes(disk StorageAPI) error {
if disk == nil {
@@ -231,9 +218,6 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
endpoint: ep,
globalSync: globalFSOSync,
diskInfoCache: cachevalue.New[DiskInfo](),
poolIndex: -1,
setIndex: -1,
diskIndex: -1,
immediatePurge: make(chan string, immediatePurgeQueue),
}
@@ -315,7 +299,19 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
if err = json.Unmarshal(s.formatData, &format); err != nil {
return s, errCorruptedFormat
}
s.diskID = format.Erasure.This
m, n, err := findDiskIndexByDiskID(format, format.Erasure.This)
if err != nil {
return s, err
}
diskID := format.Erasure.This
if m != ep.SetIdx || n != ep.DiskIdx {
storageLogOnceIf(context.Background(),
fmt.Errorf("unexpected drive ordering on pool: %s: found drive at (set=%s, drive=%s), expected at (set=%s, drive=%s): %s(%s): %w",
humanize.Ordinal(ep.PoolIdx+1), humanize.Ordinal(m+1), humanize.Ordinal(n+1), humanize.Ordinal(ep.SetIdx+1), humanize.Ordinal(ep.DiskIdx+1),
s, s.diskID, errInconsistentDisk), "drive-order-format-json")
return s, errInconsistentDisk
}
s.diskID = diskID
s.formatLastCheck = time.Now()
s.formatLegacy = format.Erasure.DistributionAlgo == formatErasureVersionV2DistributionAlgoV1
}
@@ -408,11 +404,7 @@ func (s *xlStorage) IsLocal() bool {
// Retrieve location indexes.
func (s *xlStorage) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
// If unset, see if we can locate it.
if s.poolIndex < 0 || s.setIndex < 0 || s.diskIndex < 0 {
return getXLDiskLoc(s.diskID)
}
return s.poolIndex, s.setIndex, s.diskIndex
return s.endpoint.PoolIdx, s.endpoint.SetIdx, s.endpoint.DiskIdx
}
func (s *xlStorage) SetFormatData(b []byte) {
@@ -421,13 +413,6 @@ func (s *xlStorage) SetFormatData(b []byte) {
s.formatData = b
}
// Set location indexes.
func (s *xlStorage) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
s.poolIndex = poolIdx
s.setIndex = setIdx
s.diskIndex = diskIdx
}
func (s *xlStorage) Healing() *healingTracker {
healingFile := pathJoin(s.drivePath, minioMetaBucket,
bucketMetaPrefix, healingTrackerFilename)
@@ -873,14 +858,28 @@ func (s *xlStorage) GetDiskID() (string, error) {
return "", errCorruptedFormat
}
m, n, err := findDiskIndexByDiskID(format, format.Erasure.This)
if err != nil {
return "", err
}
diskID = format.Erasure.This
ep := s.endpoint
if m != ep.SetIdx || n != ep.DiskIdx {
storageLogOnceIf(GlobalContext,
fmt.Errorf("unexpected drive ordering on pool: %s: found drive at (set=%s, drive=%s), expected at (set=%s, drive=%s): %s(%s): %w",
humanize.Ordinal(ep.PoolIdx+1), humanize.Ordinal(m+1), humanize.Ordinal(n+1), humanize.Ordinal(ep.SetIdx+1), humanize.Ordinal(ep.DiskIdx+1),
s, s.diskID, errInconsistentDisk), "drive-order-format-json")
return "", errInconsistentDisk
}
s.Lock()
defer s.Unlock()
s.formatData = b
s.diskID = format.Erasure.This
s.diskID = diskID
s.formatLegacy = format.Erasure.DistributionAlgo == formatErasureVersionV2DistributionAlgoV1
s.formatFileInfo = fi
s.formatData = b
s.formatLastCheck = time.Now()
return s.diskID, nil
s.Unlock()
return diskID, nil
}
// Make a volume entry.