mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
fix storage info output with unordered endpoints arguments (#9610)
Shuffling arguments that we pass to MinIO server are supported. However, when that happens, Prometheus returns wrong information about disks usage and online/offline status. The commit fixes the issue by avoiding relying on xl.endpoints since it is not ordered.
This commit is contained in:
parent
bd032d13ff
commit
9baeda781a
@ -34,7 +34,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) {
|
||||
volume := "testvol"
|
||||
filePath := "testfile"
|
||||
|
||||
disk, err := newPosix(tmpDir)
|
||||
disk, err := newPosix(tmpDir, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -664,7 +664,7 @@ func initXLMetaVolumesInLocalDisks(storageDisks []StorageAPI, formats []*formatX
|
||||
// Compute the local disks eligible for meta volumes (re)initialization
|
||||
var disksToInit []StorageAPI
|
||||
for index := range storageDisks {
|
||||
if formats[index] == nil || storageDisks[index] == nil || storageDisks[index].Hostname() != "" {
|
||||
if formats[index] == nil || storageDisks[index] == nil || !storageDisks[index].IsLocal() {
|
||||
// Ignore create meta volume on disks which are not found or not local.
|
||||
continue
|
||||
}
|
||||
|
@ -54,8 +54,12 @@ func (d *naughtyDisk) IsOnline() bool {
|
||||
return d.disk.IsOnline()
|
||||
}
|
||||
|
||||
func (*naughtyDisk) Hostname() string {
|
||||
return ""
|
||||
func (d *naughtyDisk) IsLocal() bool {
|
||||
return d.disk.IsLocal()
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) Hostname() string {
|
||||
return d.disk.Hostname()
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) Close() (err error) {
|
||||
|
@ -82,7 +82,7 @@ func dirObjectInfo(bucket, object string, size int64, metadata map[string]string
|
||||
// Depending on the disk type network or local, initialize storage API.
|
||||
func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
|
||||
if endpoint.IsLocal {
|
||||
storage, err := newPosix(endpoint.Path)
|
||||
storage, err := newPosix(endpoint.Path, endpoint.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -39,6 +39,10 @@ func (p *posixDiskIDCheck) IsOnline() bool {
|
||||
return storedDiskID == p.diskID
|
||||
}
|
||||
|
||||
func (p *posixDiskIDCheck) IsLocal() bool {
|
||||
return p.storage.IsLocal()
|
||||
}
|
||||
|
||||
func (p *posixDiskIDCheck) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
|
||||
return p.storage.CrawlAndGetDataUsage(ctx, cache)
|
||||
}
|
||||
|
15
cmd/posix.go
15
cmd/posix.go
@ -87,7 +87,9 @@ type posix struct {
|
||||
activeIOCount int32
|
||||
|
||||
diskPath string
|
||||
pool sync.Pool
|
||||
hostname string
|
||||
|
||||
pool sync.Pool
|
||||
|
||||
diskMount bool // indicates if the path is an actual mount.
|
||||
|
||||
@ -228,7 +230,7 @@ func isDirEmpty(dirname string) bool {
|
||||
}
|
||||
|
||||
// Initialize a new storage disk.
|
||||
func newPosix(path string) (*posix, error) {
|
||||
func newPosix(path string, hostname string) (*posix, error) {
|
||||
var err error
|
||||
if path, err = getValidPath(path, true); err != nil {
|
||||
return nil, err
|
||||
@ -239,6 +241,7 @@ func newPosix(path string) (*posix, error) {
|
||||
}
|
||||
p := &posix{
|
||||
diskPath: path,
|
||||
hostname: hostname,
|
||||
pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := disk.AlignedBlock(readBlockSize)
|
||||
@ -339,8 +342,8 @@ func (s *posix) String() string {
|
||||
return s.diskPath
|
||||
}
|
||||
|
||||
func (*posix) Hostname() string {
|
||||
return ""
|
||||
func (s *posix) Hostname() string {
|
||||
return s.hostname
|
||||
}
|
||||
|
||||
func (s *posix) Close() error {
|
||||
@ -352,6 +355,10 @@ func (s *posix) IsOnline() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *posix) IsLocal() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *posix) waitForLowActiveIO() {
|
||||
for atomic.LoadInt32(&s.activeIOCount) >= s.maxActiveIOCount {
|
||||
time.Sleep(lowActiveIOWaitTick)
|
||||
|
@ -121,7 +121,7 @@ func newPosixTestSetup() (StorageAPI, string, error) {
|
||||
return nil, "", err
|
||||
}
|
||||
// Initialize a new posix layer.
|
||||
storage, err := newPosix(diskPath)
|
||||
storage, err := newPosix(diskPath, "")
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
@ -376,7 +376,7 @@ func TestPosixNewPosix(t *testing.T) {
|
||||
// Validate all test cases.
|
||||
for i, testCase := range testCases {
|
||||
// Initialize a new posix layer.
|
||||
_, err := newPosix(testCase.name)
|
||||
_, err := newPosix(testCase.name, "")
|
||||
if err != testCase.err {
|
||||
t.Fatalf("TestPosix %d failed wanted: %s, got: %s", i+1, err, testCase.err)
|
||||
}
|
||||
@ -453,7 +453,7 @@ func TestPosixMakeVol(t *testing.T) {
|
||||
}
|
||||
|
||||
// Initialize posix storage layer for permission denied error.
|
||||
_, err = newPosix(permDeniedDir)
|
||||
_, err = newPosix(permDeniedDir, "")
|
||||
if err != nil && !os.IsPermission(err) {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -462,7 +462,7 @@ func TestPosixMakeVol(t *testing.T) {
|
||||
t.Fatalf("Unable to change permission to temporary directory %v. %v", permDeniedDir, err)
|
||||
}
|
||||
|
||||
posixStorage, err = newPosix(permDeniedDir)
|
||||
posixStorage, err = newPosix(permDeniedDir, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -555,7 +555,7 @@ func TestPosixDeleteVol(t *testing.T) {
|
||||
}
|
||||
|
||||
// Initialize posix storage layer for permission denied error.
|
||||
_, err = newPosix(permDeniedDir)
|
||||
_, err = newPosix(permDeniedDir, "")
|
||||
if err != nil && !os.IsPermission(err) {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -564,7 +564,7 @@ func TestPosixDeleteVol(t *testing.T) {
|
||||
t.Fatalf("Unable to change permission to temporary directory %v. %v", permDeniedDir, err)
|
||||
}
|
||||
|
||||
posixStorage, err = newPosix(permDeniedDir)
|
||||
posixStorage, err = newPosix(permDeniedDir, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -811,7 +811,7 @@ func TestPosixPosixListDir(t *testing.T) {
|
||||
defer removePermDeniedFile(permDeniedDir)
|
||||
|
||||
// Initialize posix storage layer for permission denied error.
|
||||
_, err = newPosix(permDeniedDir)
|
||||
_, err = newPosix(permDeniedDir, "")
|
||||
if err != nil && !os.IsPermission(err) {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -820,7 +820,7 @@ func TestPosixPosixListDir(t *testing.T) {
|
||||
t.Fatalf("Unable to change permission to temporary directory %v. %v", permDeniedDir, err)
|
||||
}
|
||||
|
||||
posixStorage, err = newPosix(permDeniedDir)
|
||||
posixStorage, err = newPosix(permDeniedDir, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -938,7 +938,7 @@ func TestPosixDeleteFile(t *testing.T) {
|
||||
defer removePermDeniedFile(permDeniedDir)
|
||||
|
||||
// Initialize posix storage layer for permission denied error.
|
||||
_, err = newPosix(permDeniedDir)
|
||||
_, err = newPosix(permDeniedDir, "")
|
||||
if err != nil && !os.IsPermission(err) {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -947,7 +947,7 @@ func TestPosixDeleteFile(t *testing.T) {
|
||||
t.Fatalf("Unable to change permission to temporary directory %v. %v", permDeniedDir, err)
|
||||
}
|
||||
|
||||
posixStorage, err = newPosix(permDeniedDir)
|
||||
posixStorage, err = newPosix(permDeniedDir, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -1136,7 +1136,7 @@ func TestPosixReadFile(t *testing.T) {
|
||||
defer removePermDeniedFile(permDeniedDir)
|
||||
|
||||
// Initialize posix storage layer for permission denied error.
|
||||
_, err = newPosix(permDeniedDir)
|
||||
_, err = newPosix(permDeniedDir, "")
|
||||
if err != nil && !os.IsPermission(err) {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -1145,7 +1145,7 @@ func TestPosixReadFile(t *testing.T) {
|
||||
t.Fatalf("Unable to change permission to temporary directory %v. %v", permDeniedDir, err)
|
||||
}
|
||||
|
||||
posixPermStorage, err := newPosix(permDeniedDir)
|
||||
posixPermStorage, err := newPosix(permDeniedDir, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -1306,7 +1306,7 @@ func TestPosixAppendFile(t *testing.T) {
|
||||
|
||||
var posixPermStorage StorageAPI
|
||||
// Initialize posix storage layer for permission denied error.
|
||||
_, err = newPosix(permDeniedDir)
|
||||
_, err = newPosix(permDeniedDir, "")
|
||||
if err != nil && !os.IsPermission(err) {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
@ -1315,7 +1315,7 @@ func TestPosixAppendFile(t *testing.T) {
|
||||
t.Fatalf("Unable to change permission to temporary directory %v. %v", permDeniedDir, err)
|
||||
}
|
||||
|
||||
posixPermStorage, err = newPosix(permDeniedDir)
|
||||
posixPermStorage, err = newPosix(permDeniedDir, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to initialize posix, %s", err)
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ func TestIsValidUmaskVol(t *testing.T) {
|
||||
testCase := testCases[0]
|
||||
|
||||
// Initialize a new posix layer.
|
||||
disk, err := newPosix(tmpPath)
|
||||
disk, err := newPosix(tmpPath, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Initializing posix failed with %s.", err)
|
||||
}
|
||||
@ -91,7 +91,7 @@ func TestIsValidUmaskFile(t *testing.T) {
|
||||
testCase := testCases[0]
|
||||
|
||||
// Initialize a new posix layer.
|
||||
disk, err := newPosix(tmpPath)
|
||||
disk, err := newPosix(tmpPath, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Initializing posix failed with %s.", err)
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ func TestUNCPaths(t *testing.T) {
|
||||
|
||||
// Instantiate posix object to manage a disk
|
||||
var fs StorageAPI
|
||||
fs, err = newPosix(dir)
|
||||
fs, err = newPosix(dir, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -83,7 +83,7 @@ func TestUNCPathENOTDIR(t *testing.T) {
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
var fs StorageAPI
|
||||
fs, err = newPosix(dir)
|
||||
fs, err = newPosix(dir, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -27,7 +27,8 @@ type StorageAPI interface {
|
||||
String() string
|
||||
|
||||
// Storage operations.
|
||||
IsOnline() bool // Returns true if disk is online.
|
||||
IsOnline() bool // Returns true if disk is online.
|
||||
IsLocal() bool
|
||||
Hostname() string // Returns host name if remote host.
|
||||
Close() error
|
||||
GetDiskID() (string, error)
|
||||
|
@ -147,6 +147,10 @@ func (client *storageRESTClient) IsOnline() bool {
|
||||
return atomic.LoadInt32(&client.connected) == 1
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) IsLocal() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) Hostname() string {
|
||||
return client.endpoint.Host
|
||||
}
|
||||
|
@ -758,7 +758,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
|
||||
if !endpoint.IsLocal {
|
||||
continue
|
||||
}
|
||||
storage, err := newPosix(endpoint.Path)
|
||||
storage, err := newPosix(endpoint.Path, endpoint.Host)
|
||||
if err != nil {
|
||||
if err == errMinDiskSize {
|
||||
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(err.Error()), "Unable to initialize backend")
|
||||
|
@ -317,9 +317,7 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA
|
||||
}
|
||||
|
||||
for i := 0; i < setCount; i++ {
|
||||
var endpoints Endpoints
|
||||
for j := 0; j < drivesPerSet; j++ {
|
||||
endpoints = append(endpoints, s.endpoints[i*drivesPerSet+j])
|
||||
// Rely on endpoints list to initialize, init lockers and available disks.
|
||||
s.xlLockers[i][j] = newLockAPI(s.endpoints[i*drivesPerSet+j])
|
||||
|
||||
@ -344,7 +342,6 @@ func newXLSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageA
|
||||
s.sets[i] = &xlObjects{
|
||||
getDisks: s.GetDisks(i),
|
||||
getLockers: s.GetLockers(i),
|
||||
endpoints: endpoints,
|
||||
nsMutex: mutex,
|
||||
bp: bp,
|
||||
mrfUploadCh: make(chan partialUpload, 10000),
|
||||
|
35
cmd/xl-v1.go
35
cmd/xl-v1.go
@ -59,8 +59,6 @@ type xlObjects struct {
|
||||
// getLockers returns list of remote and local lockers.
|
||||
getLockers func() []dsync.NetLocker
|
||||
|
||||
endpoints Endpoints
|
||||
|
||||
// Locker mutex map.
|
||||
nsMutex *nsLockMap
|
||||
|
||||
@ -95,7 +93,7 @@ func (d byDiskTotal) Less(i, j int) bool {
|
||||
}
|
||||
|
||||
// getDisksInfo - fetch disks info across all other storage API.
|
||||
func getDisksInfo(disks []StorageAPI, endpoints Endpoints) (disksInfo []DiskInfo, onlineDisks, offlineDisks madmin.BackendDisks) {
|
||||
func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks, offlineDisks madmin.BackendDisks) {
|
||||
disksInfo = make([]DiskInfo, len(disks))
|
||||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
@ -125,7 +123,10 @@ func getDisksInfo(disks []StorageAPI, endpoints Endpoints) (disksInfo []DiskInfo
|
||||
|
||||
// Wait for the routines.
|
||||
for i, diskInfoErr := range g.Wait() {
|
||||
peerAddr := endpoints[i].Host
|
||||
if disks[i] == nil {
|
||||
continue
|
||||
}
|
||||
peerAddr := disks[i].Hostname()
|
||||
if _, ok := offlineDisks[peerAddr]; !ok {
|
||||
offlineDisks[peerAddr] = 0
|
||||
}
|
||||
@ -139,13 +140,29 @@ func getDisksInfo(disks []StorageAPI, endpoints Endpoints) (disksInfo []DiskInfo
|
||||
onlineDisks[peerAddr]++
|
||||
}
|
||||
|
||||
// Iterate over the passed endpoints arguments and check
|
||||
// if there are still disks missing from the offline/online lists
|
||||
// and update them accordingly.
|
||||
missingOfflineDisks := make(map[string]int)
|
||||
for _, zone := range globalEndpoints {
|
||||
for _, endpoint := range zone.Endpoints {
|
||||
if _, ok := offlineDisks[endpoint.Host]; !ok {
|
||||
missingOfflineDisks[endpoint.Host]++
|
||||
}
|
||||
}
|
||||
}
|
||||
for missingDisk, n := range missingOfflineDisks {
|
||||
onlineDisks[missingDisk] = 0
|
||||
offlineDisks[missingDisk] = n
|
||||
}
|
||||
|
||||
// Success.
|
||||
return disksInfo, onlineDisks, offlineDisks
|
||||
}
|
||||
|
||||
// Get an aggregated storage info across all disks.
|
||||
func getStorageInfo(disks []StorageAPI, endpoints Endpoints) StorageInfo {
|
||||
disksInfo, onlineDisks, offlineDisks := getDisksInfo(disks, endpoints)
|
||||
func getStorageInfo(disks []StorageAPI) StorageInfo {
|
||||
disksInfo, onlineDisks, offlineDisks := getDisksInfo(disks)
|
||||
|
||||
// Sort so that the first element is the smallest.
|
||||
sort.Sort(byDiskTotal(disksInfo))
|
||||
@ -183,9 +200,9 @@ func (xl xlObjects) StorageInfo(ctx context.Context, local bool) StorageInfo {
|
||||
disks := xl.getDisks()
|
||||
if local {
|
||||
var localDisks []StorageAPI
|
||||
for i, disk := range disks {
|
||||
for _, disk := range disks {
|
||||
if disk != nil {
|
||||
if xl.endpoints[i].IsLocal && disk.Hostname() == "" {
|
||||
if disk.IsLocal() {
|
||||
// Append this local disk since local flag is true
|
||||
localDisks = append(localDisks, disk)
|
||||
}
|
||||
@ -193,7 +210,7 @@ func (xl xlObjects) StorageInfo(ctx context.Context, local bool) StorageInfo {
|
||||
}
|
||||
disks = localDisks
|
||||
}
|
||||
return getStorageInfo(disks, xl.endpoints)
|
||||
return getStorageInfo(disks)
|
||||
}
|
||||
|
||||
// GetMetrics - is not implemented and shouldn't be called.
|
||||
|
Loading…
x
Reference in New Issue
Block a user