fix: admin info output and improve overall performance (#10015)

- admin info node offline check is now quicker
- admin info now doesn't duplicate the code
  across doing the same checks for disks
- rely on StorageInfo to return appropriate errors
  instead of calling locally.
- diskID checks now return proper errors when
  disk not found v/s format.json missing.
- add more disk states for more clarity on the
  underlying disk errors.
This commit is contained in:
Harshavardhana 2020-07-13 09:51:07 -07:00 committed by GitHub
parent 1d65ef3201
commit e7d7d5232c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 244 additions and 372 deletions

View File

@ -1419,23 +1419,12 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
// Fetching the Storage information, ignore any errors.
storageInfo, _ := objectAPI.StorageInfo(ctx, false)
var OnDisks int
var OffDisks int
var backend interface{}
if storageInfo.Backend.Type == BackendType(madmin.Erasure) {
for _, v := range storageInfo.Backend.OnlineDisks {
OnDisks += v
}
for _, v := range storageInfo.Backend.OfflineDisks {
OffDisks += v
}
backend = madmin.ErasureBackend{
Type: madmin.ErasureType,
OnlineDisks: OnDisks,
OfflineDisks: OffDisks,
OnlineDisks: storageInfo.Backend.OnlineDisks.Sum(),
OfflineDisks: storageInfo.Backend.OfflineDisks.Sum(),
StandardSCData: storageInfo.Backend.StandardSCData,
StandardSCParity: storageInfo.Backend.StandardSCParity,
RRSCData: storageInfo.Backend.RRSCData,
@ -1456,38 +1445,6 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
servers := globalNotificationSys.ServerInfo()
servers = append(servers, server)
for _, sp := range servers {
for i, di := range sp.Disks {
path := ""
if globalIsErasure {
path = di.DrivePath
}
if globalIsDistErasure {
path = sp.Endpoint + di.DrivePath
}
// For distributed
for a := range storageInfo.Backend.Sets {
for b := range storageInfo.Backend.Sets[a] {
ep := storageInfo.Backend.Sets[a][b].Endpoint
if globalIsDistErasure {
if strings.Replace(ep, "http://", "", -1) == path || strings.Replace(ep, "https://", "", -1) == path {
sp.Disks[i].State = storageInfo.Backend.Sets[a][b].State
sp.Disks[i].UUID = storageInfo.Backend.Sets[a][b].UUID
}
}
if globalIsErasure {
if ep == path {
sp.Disks[i].State = storageInfo.Backend.Sets[a][b].State
sp.Disks[i].UUID = storageInfo.Backend.Sets[a][b].UUID
}
}
}
}
}
}
domain := globalDomainNames
services := madmin.Services{
Vault: vault,
@ -1497,6 +1454,21 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
Notifications: notifyTarget,
}
// find all disks which belong to each respective endpoints
for i := range servers {
for _, disk := range storageInfo.Disks {
if strings.Contains(disk.Endpoint, servers[i].Endpoint) {
servers[i].Disks = append(servers[i].Disks, disk)
}
}
}
// add all the disks local to this server.
for _, disk := range storageInfo.Disks {
if disk.Endpoint == disk.DrivePath {
servers[len(servers)-1].Disks = append(servers[len(servers)-1].Disks, disk)
}
}
infoMsg := madmin.InfoMessage{
Mode: mode,
Domain: domain,

View File

@ -18,16 +18,13 @@ package cmd
import (
"net/http"
"os"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/madmin"
)
// getLocalServerProperty - returns madmin.ServerProperties for only the
// local endpoints from given list of endpoints
func getLocalServerProperty(endpointZones EndpointZones, r *http.Request) madmin.ServerProperties {
var disks []madmin.Disk
addr := r.Host
if globalIsDistErasure {
addr = GetLocalPeer(endpointZones)
@ -42,33 +39,14 @@ func getLocalServerProperty(endpointZones EndpointZones, r *http.Request) madmin
if endpoint.IsLocal {
// Only proceed for local endpoints
network[nodeName] = "online"
var di = madmin.Disk{
DrivePath: endpoint.Path,
}
diInfo, err := disk.GetInfo(endpoint.Path)
if err != nil {
if os.IsNotExist(err) || isSysErrPathNotFound(err) {
di.State = madmin.DriveStateMissing
} else {
di.State = madmin.DriveStateCorrupt
}
continue
}
_, present := network[nodeName]
if !present {
if err := IsServerResolvable(endpoint); err == nil {
network[nodeName] = "online"
} else {
di.State = madmin.DriveStateOk
di.DrivePath = endpoint.Path
di.TotalSpace = diInfo.Total
di.UsedSpace = diInfo.Total - diInfo.Free
di.Utilization = float64((diInfo.Total - diInfo.Free) / diInfo.Total * 100)
}
disks = append(disks, di)
} else {
_, present := network[nodeName]
if !present {
err := IsServerResolvable(endpoint)
if err == nil {
network[nodeName] = "online"
} else {
network[nodeName] = "offline"
}
network[nodeName] = "offline"
}
}
}
@ -81,6 +59,5 @@ func getLocalServerProperty(endpointZones EndpointZones, r *http.Request) madmin
Version: Version,
CommitID: CommitID,
Network: network,
Disks: disks,
}
}

View File

@ -395,10 +395,7 @@ func (s *erasureSets) StorageUsageInfo(ctx context.Context) StorageInfo {
g.Wait()
for _, lstorageInfo := range storageInfos {
storageInfo.Used = append(storageInfo.Used, lstorageInfo.Used...)
storageInfo.Total = append(storageInfo.Total, lstorageInfo.Total...)
storageInfo.Available = append(storageInfo.Available, lstorageInfo.Available...)
storageInfo.MountPaths = append(storageInfo.MountPaths, lstorageInfo.MountPaths...)
storageInfo.Disks = append(storageInfo.Disks, lstorageInfo.Disks...)
storageInfo.Backend.OnlineDisks = storageInfo.Backend.OnlineDisks.Merge(lstorageInfo.Backend.OnlineDisks)
storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks.Merge(lstorageInfo.Backend.OfflineDisks)
}
@ -438,10 +435,7 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo,
g.Wait()
for _, lstorageInfo := range storageInfos {
storageInfo.Used = append(storageInfo.Used, lstorageInfo.Used...)
storageInfo.Total = append(storageInfo.Total, lstorageInfo.Total...)
storageInfo.Available = append(storageInfo.Available, lstorageInfo.Available...)
storageInfo.MountPaths = append(storageInfo.MountPaths, lstorageInfo.MountPaths...)
storageInfo.Disks = append(storageInfo.Disks, lstorageInfo.Disks...)
storageInfo.Backend.OnlineDisks = storageInfo.Backend.OnlineDisks.Merge(lstorageInfo.Backend.OnlineDisks)
storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks.Merge(lstorageInfo.Backend.OfflineDisks)
}
@ -457,57 +451,12 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo,
storageInfo.Backend.RRSCData = s.drivesPerSet - rrSCParity
storageInfo.Backend.RRSCParity = rrSCParity
storageInfo.Backend.Sets = make([][]madmin.DriveInfo, s.setCount)
for i := range storageInfo.Backend.Sets {
storageInfo.Backend.Sets[i] = make([]madmin.DriveInfo, s.drivesPerSet)
}
if local {
// if local is true, we are not interested in the drive UUID info.
// this is called primarily by prometheus
return storageInfo, nil
}
for i, set := range s.sets {
storageDisks := set.getDisks()
endpointStrings := set.getEndpoints()
for j, storageErr := range storageInfoErrs[i] {
if storageDisks[j] == OfflineDisk {
storageInfo.Backend.Sets[i][j] = madmin.DriveInfo{
State: madmin.DriveStateOffline,
Endpoint: endpointStrings[j],
}
continue
}
var diskID string
if storageErr == nil {
// No errors returned by storage, look for its DiskID()
diskID, storageErr = storageDisks[j].GetDiskID()
}
if storageErr == nil {
storageInfo.Backend.Sets[i][j] = madmin.DriveInfo{
State: madmin.DriveStateOk,
Endpoint: storageDisks[j].String(),
UUID: diskID,
}
continue
}
if storageErr == errUnformattedDisk {
storageInfo.Backend.Sets[i][j] = madmin.DriveInfo{
State: madmin.DriveStateUnformatted,
Endpoint: storageDisks[j].String(),
UUID: "",
}
} else {
storageInfo.Backend.Sets[i][j] = madmin.DriveInfo{
State: madmin.DriveStateCorrupt,
Endpoint: storageDisks[j].String(),
UUID: "",
}
}
}
}
var errs []error
for i := range s.sets {
errs = append(errs, storageInfoErrs[i]...)
@ -1195,8 +1144,8 @@ else
fi
*/
func formatsToDrivesInfo(endpoints Endpoints, formats []*formatErasureV3, sErrs []error) (beforeDrives []madmin.DriveInfo) {
beforeDrives = make([]madmin.DriveInfo, len(endpoints))
func formatsToDrivesInfo(endpoints Endpoints, formats []*formatErasureV3, sErrs []error) (beforeDrives []madmin.HealDriveInfo) {
beforeDrives = make([]madmin.HealDriveInfo, len(endpoints))
// Existing formats are available (i.e. ok), so save it in
// result, also populate disks to be healed.
for i, format := range formats {
@ -1210,7 +1159,7 @@ func formatsToDrivesInfo(endpoints Endpoints, formats []*formatErasureV3, sErrs
case sErrs[i] == errDiskNotFound:
state = madmin.DriveStateOffline
}
beforeDrives[i] = madmin.DriveInfo{
beforeDrives[i] = madmin.HealDriveInfo{
UUID: func() string {
if format != nil {
return format.Erasure.This

View File

@ -149,12 +149,10 @@ func (z *erasureZones) getZonesAvailableSpace(ctx context.Context, size int64) z
for i, zinfo := range storageInfos {
var available uint64
for _, davailable := range zinfo.Available {
available += davailable
}
var total uint64
for _, dtotal := range zinfo.Total {
total += dtotal
for _, disk := range zinfo.Disks {
total += disk.TotalSpace
available += disk.TotalSpace - disk.UsedSpace
}
// Make sure we can fit "size" on to the disk without getting above the diskFillFraction
if available < uint64(size) {
@ -260,13 +258,9 @@ func (z *erasureZones) StorageInfo(ctx context.Context, local bool) (StorageInfo
g.Wait()
for _, lstorageInfo := range storageInfos {
storageInfo.Used = append(storageInfo.Used, lstorageInfo.Used...)
storageInfo.Total = append(storageInfo.Total, lstorageInfo.Total...)
storageInfo.Available = append(storageInfo.Available, lstorageInfo.Available...)
storageInfo.MountPaths = append(storageInfo.MountPaths, lstorageInfo.MountPaths...)
storageInfo.Disks = append(storageInfo.Disks, lstorageInfo.Disks...)
storageInfo.Backend.OnlineDisks = storageInfo.Backend.OnlineDisks.Merge(lstorageInfo.Backend.OnlineDisks)
storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks.Merge(lstorageInfo.Backend.OfflineDisks)
storageInfo.Backend.Sets = append(storageInfo.Backend.Sets, lstorageInfo.Backend.Sets...)
}
storageInfo.Backend.Type = storageInfos[0].Backend.Type

View File

@ -79,17 +79,36 @@ func (er erasureObjects) Shutdown(ctx context.Context) error {
}
// byDiskTotal is a collection satisfying sort.Interface.
type byDiskTotal []DiskInfo
type byDiskTotal []madmin.Disk
func (d byDiskTotal) Len() int { return len(d) }
func (d byDiskTotal) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
func (d byDiskTotal) Less(i, j int) bool {
return d[i].Total < d[j].Total
return d[i].TotalSpace < d[j].TotalSpace
}
func diskErrToDriveState(err error) (state string) {
state = madmin.DriveStateUnknown
switch err {
case errDiskNotFound:
state = madmin.DriveStateOffline
case errCorruptedFormat:
state = madmin.DriveStateCorrupt
case errUnformattedDisk:
state = madmin.DriveStateUnformatted
case errDiskAccessDenied:
state = madmin.DriveStatePermission
case errFaultyDisk:
state = madmin.DriveStateFaulty
case nil:
state = madmin.DriveStateOk
}
return
}
// getDisksInfo - fetch disks info across all other storage API.
func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []DiskInfo, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) {
disksInfo = make([]DiskInfo, len(disks))
func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Disk, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) {
disksInfo = make([]madmin.Disk, len(disks))
onlineDisks = make(madmin.BackendDisks)
offlineDisks = make(madmin.BackendDisks)
@ -107,6 +126,10 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []DiskInfo,
index := index
g.Go(func() error {
if disks[index] == OfflineDisk {
disksInfo[index] = madmin.Disk{
State: diskErrToDriveState(errDiskNotFound),
Endpoint: endpoints[index],
}
// Storage disk is empty, perhaps ignored disk or not available.
return errDiskNotFound
}
@ -117,10 +140,20 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []DiskInfo,
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
}
return err
}
disksInfo[index] = info
return nil
di := madmin.Disk{
Endpoint: endpoints[index],
DrivePath: info.MountPath,
TotalSpace: info.Total,
UsedSpace: info.Used,
UUID: info.ID,
State: diskErrToDriveState(err),
}
if info.Total > 0 {
di.Utilization = float64(info.Used / info.Total * 100)
}
disksInfo[index] = di
return err
}, index)
}
@ -146,24 +179,8 @@ func getStorageInfo(disks []StorageAPI, endpoints []string) (StorageInfo, []erro
// Sort so that the first element is the smallest.
sort.Sort(byDiskTotal(disksInfo))
// Combine all disks to get total usage
usedList := make([]uint64, len(disksInfo))
totalList := make([]uint64, len(disksInfo))
availableList := make([]uint64, len(disksInfo))
mountPaths := make([]string, len(disksInfo))
for i, di := range disksInfo {
usedList[i] = di.Used
totalList[i] = di.Total
availableList[i] = di.Free
mountPaths[i] = di.MountPath
}
storageInfo := StorageInfo{
Used: usedList,
Total: totalList,
Available: availableList,
MountPaths: mountPaths,
Disks: disksInfo,
}
storageInfo.Backend.Type = BackendErasure

View File

@ -53,9 +53,6 @@ var defaultEtag = "00000000000000000000000000000000-1"
type FSObjects struct {
GatewayUnsupported
// Disk usage metrics
totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// The count of concurrent calls on FSObjects API
activeIOCount int64
// The active IO count ceiling for crawling to work
@ -215,15 +212,15 @@ func (fs *FSObjects) StorageInfo(ctx context.Context, _ bool) (StorageInfo, []er
return StorageInfo{}, []error{err}
}
used := di.Total - di.Free
if !fs.diskMount {
used = atomic.LoadUint64(&fs.totalUsed)
}
storageInfo := StorageInfo{
Used: []uint64{used},
Total: []uint64{di.Total},
Available: []uint64{di.Free},
MountPaths: []string{fs.fsPath},
Disks: []madmin.Disk{
{
TotalSpace: di.Total,
UsedSpace: used,
AvailableSpace: di.Free,
DrivePath: fs.fsPath,
},
},
}
storageInfo.Backend.Type = BackendFS
return storageInfo, nil

View File

@ -39,6 +39,7 @@ import (
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/env"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
krb "gopkg.in/jcmturner/gokrb5.v7/client"
"gopkg.in/jcmturner/gokrb5.v7/config"
@ -210,7 +211,9 @@ func (n *hdfsObjects) StorageInfo(ctx context.Context, _ bool) (si minio.Storage
if err != nil {
return minio.StorageInfo{}, []error{err}
}
si.Used = []uint64{fsInfo.Used}
si.Disks = []madmin.Disk{{
UsedSpace: fsInfo.Used,
}}
si.Backend.Type = minio.BackendGateway
si.Backend.GatewayOnline = true
return si, nil

View File

@ -464,10 +464,7 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
float64(totalDisks.Sum()),
)
for i := 0; i < len(storageInfo.Total); i++ {
mountPath, total, free := storageInfo.MountPaths[i], storageInfo.Total[i],
storageInfo.Available[i]
for _, disk := range storageInfo.Disks {
// Total disk usage by the disk
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
@ -475,8 +472,8 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
"Total disk storage used on the disk",
[]string{"disk"}, nil),
prometheus.GaugeValue,
float64(total-free),
mountPath,
float64(disk.UsedSpace),
disk.DrivePath,
)
// Total available space in the disk
@ -486,8 +483,8 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
"Total available space left on the disk",
[]string{"disk"}, nil),
prometheus.GaugeValue,
float64(free),
mountPath,
float64(disk.AvailableSpace),
disk.DrivePath,
)
// Total storage space of the disk
@ -497,8 +494,8 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) {
"Total space on the disk",
[]string{"disk"}, nil),
prometheus.GaugeValue,
float64(total),
mountPath,
float64(disk.TotalSpace),
disk.DrivePath,
)
}
}

View File

@ -43,13 +43,7 @@ const (
// StorageInfo - represents total capacity of underlying storage.
type StorageInfo struct {
Used []uint64 // Used total used per disk.
Total []uint64 // Total disk space per disk.
Available []uint64 // Total disk space available per disk.
MountPaths []string // Disk mountpoints
Disks []madmin.Disk
// Backend type.
Backend struct {
@ -66,9 +60,6 @@ type StorageInfo struct {
StandardSCParity int // Parity disks for currently configured Standard storage class.
RRSCData int // Data disks for currently configured Reduced Redundancy storage class.
RRSCParity int // Parity disks for currently configured Reduced Redundancy storage class.
// List of all disk status, this is only meaningful if BackendType is Erasure.
Sets [][]madmin.DriveInfo
}
}

View File

@ -30,7 +30,6 @@ import (
"github.com/minio/minio/cmd/config"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
"github.com/minio/minio/pkg/sync/errgroup"
)
@ -183,7 +182,21 @@ func IsServerResolvable(endpoint Endpoint) error {
}
httpClient := &http.Client{
Transport: newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)(),
Transport:
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
&http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: xhttp.NewCustomDialContext(3 * time.Second),
ResponseHeaderTimeout: 5 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 5 * time.Second,
TLSClientConfig: tlsConfig,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
},
}
defer httpClient.CloseIdleConnections()

View File

@ -190,7 +190,13 @@ func (client *storageRESTClient) DiskInfo() (info DiskInfo, err error) {
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
if err != nil {
return info, err
}
if info.Error != "" {
return info, toStorageErr(errors.New(info.Error))
}
return info, nil
}
// MakeVolBulk - create multiple volumes in a bulk operation.

View File

@ -134,8 +134,7 @@ func (s *storageRESTServer) DiskInfoHandler(w http.ResponseWriter, r *http.Reque
}
info, err := s.storage.DiskInfo()
if err != nil {
s.writeErrorResponse(w, err)
return
info.Error = err.Error()
}
defer w.(http.Flusher).Flush()
gob.NewEncoder(w).Encode(info)

View File

@ -36,16 +36,10 @@ import (
///////////////////////////////////////////////////////////////////////////////
func testStorageAPIDiskInfo(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
testCases := []struct {
expectErr bool
}{
{false},
{true},
}
for i, testCase := range testCases {
@ -55,16 +49,13 @@ func testStorageAPIDiskInfo(t *testing.T, storage StorageAPI) {
if expectErr != testCase.expectErr {
t.Fatalf("case %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if err != errUnformattedDisk {
t.Fatalf("case %v: error: expected: %v, got: %v", i+1, errUnformattedDisk, err)
}
}
}
func testStorageAPIMakeVol(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
testCases := []struct {
volumeName string
expectErr bool
@ -85,12 +76,6 @@ func testStorageAPIMakeVol(t *testing.T, storage StorageAPI) {
}
func testStorageAPIListVols(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
testCases := []struct {
volumeNames []string
expectedResult []VolInfo
@ -124,12 +109,6 @@ func testStorageAPIListVols(t *testing.T, storage StorageAPI) {
}
func testStorageAPIStatVol(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
err := storage.MakeVol("foo")
if err != nil {
t.Fatalf("unexpected error %v", err)
@ -161,12 +140,6 @@ func testStorageAPIStatVol(t *testing.T, storage StorageAPI) {
}
func testStorageAPIDeleteVol(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
err := storage.MakeVol("foo")
if err != nil {
t.Fatalf("unexpected error %v", err)
@ -192,12 +165,6 @@ func testStorageAPIDeleteVol(t *testing.T, storage StorageAPI) {
}
func testStorageAPICheckFile(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
err := storage.MakeVol("foo")
if err != nil {
t.Fatalf("unexpected error %v", err)
@ -228,12 +195,6 @@ func testStorageAPICheckFile(t *testing.T, storage StorageAPI) {
}
func testStorageAPIListDir(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
err := storage.MakeVol("foo")
if err != nil {
t.Fatalf("unexpected error %v", err)
@ -271,12 +232,6 @@ func testStorageAPIListDir(t *testing.T, storage StorageAPI) {
}
func testStorageAPIReadAll(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
err := storage.MakeVol("foo")
if err != nil {
t.Fatalf("unexpected error %v", err)
@ -314,12 +269,6 @@ func testStorageAPIReadAll(t *testing.T, storage StorageAPI) {
}
func testStorageAPIReadFile(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
err := storage.MakeVol("foo")
if err != nil {
t.Fatalf("unexpected error %v", err)
@ -361,12 +310,6 @@ func testStorageAPIReadFile(t *testing.T, storage StorageAPI) {
}
func testStorageAPIAppendFile(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
err := storage.MakeVol("foo")
if err != nil {
t.Fatalf("unexpected error %v", err)
@ -395,12 +338,6 @@ func testStorageAPIAppendFile(t *testing.T, storage StorageAPI) {
}
func testStorageAPIDeleteFile(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
err := storage.MakeVol("foo")
if err != nil {
t.Fatalf("unexpected error %v", err)
@ -434,12 +371,6 @@ func testStorageAPIDeleteFile(t *testing.T, storage StorageAPI) {
}
func testStorageAPIRenameFile(t *testing.T, storage StorageAPI) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
globalServerConfig = newServerConfig()
err := storage.MakeVol("foo")
if err != nil {
t.Fatalf("unexpected error %v", err)
@ -518,9 +449,11 @@ func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRES
Endpoints: Endpoints{endpoint},
}})
restClient := newStorageRESTClient(endpoint)
prevGlobalServerConfig := globalServerConfig
globalServerConfig = newServerConfig()
lookupConfigs(globalServerConfig)
restClient := newStorageRESTClient(endpoint)
return httpServer, restClient, prevGlobalServerConfig, endpointPath
}

View File

@ -48,8 +48,8 @@ func (p *xlStorageDiskIDCheck) Hostname() string {
}
func (p *xlStorageDiskIDCheck) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
if p.isDiskStale() {
return dataUsageCache{}, errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return dataUsageCache{}, err
}
return p.storage.CrawlAndGetDataUsage(ctx, cache)
}
@ -66,157 +66,178 @@ func (p *xlStorageDiskIDCheck) SetDiskID(id string) {
p.diskID = id
}
func (p *xlStorageDiskIDCheck) isDiskStale() bool {
func (p *xlStorageDiskIDCheck) checkDiskStale() error {
if p.diskID == "" {
// For empty disk-id we allow the call as the server might be coming up and trying to read format.json
// or create format.json
return false
// For empty disk-id we allow the call as the server might be
// coming up and trying to read format.json or create format.json
return nil
}
storedDiskID, err := p.storage.GetDiskID()
if err == nil && p.diskID == storedDiskID {
return false
if err != nil {
// return any error generated while reading `format.json`
return err
}
return true
if err == nil && p.diskID == storedDiskID {
return nil
}
// not the same disk we remember, take it offline.
return errDiskNotFound
}
func (p *xlStorageDiskIDCheck) DiskInfo() (info DiskInfo, err error) {
if p.isDiskStale() {
info, err = p.storage.DiskInfo()
if err != nil {
return info, err
}
if p.diskID != info.ID {
return info, errDiskNotFound
}
return p.storage.DiskInfo()
return info, nil
}
func (p *xlStorageDiskIDCheck) MakeVolBulk(volumes ...string) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.MakeVolBulk(volumes...)
}
func (p *xlStorageDiskIDCheck) MakeVol(volume string) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.MakeVol(volume)
}
func (p *xlStorageDiskIDCheck) ListVols() ([]VolInfo, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return nil, err
}
return p.storage.ListVols()
}
func (p *xlStorageDiskIDCheck) StatVol(volume string) (vol VolInfo, err error) {
if p.isDiskStale() {
return vol, errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return vol, err
}
return p.storage.StatVol(volume)
}
func (p *xlStorageDiskIDCheck) DeleteVol(volume string, forceDelete bool) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.DeleteVol(volume, forceDelete)
}
func (p *xlStorageDiskIDCheck) WalkVersions(volume, dirPath string, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return nil, err
}
return p.storage.WalkVersions(volume, dirPath, marker, recursive, endWalkCh)
}
func (p *xlStorageDiskIDCheck) Walk(volume, dirPath string, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return nil, err
}
return p.storage.Walk(volume, dirPath, marker, recursive, endWalkCh)
}
func (p *xlStorageDiskIDCheck) WalkSplunk(volume, dirPath string, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return nil, err
}
return p.storage.WalkSplunk(volume, dirPath, marker, endWalkCh)
}
func (p *xlStorageDiskIDCheck) ListDir(volume, dirPath string, count int) ([]string, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return nil, err
}
return p.storage.ListDir(volume, dirPath, count)
}
func (p *xlStorageDiskIDCheck) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
if p.isDiskStale() {
return 0, errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return 0, err
}
return p.storage.ReadFile(volume, path, offset, buf, verifier)
}
func (p *xlStorageDiskIDCheck) AppendFile(volume string, path string, buf []byte) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.AppendFile(volume, path, buf)
}
func (p *xlStorageDiskIDCheck) CreateFile(volume, path string, size int64, reader io.Reader) error {
if p.isDiskStale() {
return errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return err
}
return p.storage.CreateFile(volume, path, size, reader)
}
func (p *xlStorageDiskIDCheck) ReadFileStream(volume, path string, offset, length int64) (io.ReadCloser, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return nil, err
}
return p.storage.ReadFileStream(volume, path, offset, length)
}
func (p *xlStorageDiskIDCheck) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
if p.isDiskStale() {
return errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return err
}
return p.storage.RenameFile(srcVolume, srcPath, dstVolume, dstPath)
}
func (p *xlStorageDiskIDCheck) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath string) error {
if p.isDiskStale() {
return errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return err
}
return p.storage.RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath)
}
func (p *xlStorageDiskIDCheck) CheckParts(volume string, path string, fi FileInfo) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.CheckParts(volume, path, fi)
}
func (p *xlStorageDiskIDCheck) CheckFile(volume string, path string) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.CheckFile(volume, path)
}
func (p *xlStorageDiskIDCheck) DeleteFile(volume string, path string) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.DeleteFile(volume, path)
}
func (p *xlStorageDiskIDCheck) DeleteVersions(volume string, versions []FileInfo) (errs []error) {
if p.isDiskStale() {
if err := p.checkDiskStale(); err != nil {
errs = make([]error, len(versions))
for i := range errs {
errs[i] = errDiskNotFound
errs[i] = err
}
return errs
}
@ -224,43 +245,49 @@ func (p *xlStorageDiskIDCheck) DeleteVersions(volume string, versions []FileInfo
}
func (p *xlStorageDiskIDCheck) VerifyFile(volume, path string, fi FileInfo) error {
if p.isDiskStale() {
return errDiskNotFound
if err := p.checkDiskStale(); err != nil {
return err
}
return p.storage.VerifyFile(volume, path, fi)
}
func (p *xlStorageDiskIDCheck) WriteAll(volume string, path string, reader io.Reader) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.WriteAll(volume, path, reader)
}
func (p *xlStorageDiskIDCheck) DeleteVersion(volume, path string, fi FileInfo) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.DeleteVersion(volume, path, fi)
}
func (p *xlStorageDiskIDCheck) WriteMetadata(volume, path string, fi FileInfo) (err error) {
if p.isDiskStale() {
return errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return err
}
return p.storage.WriteMetadata(volume, path, fi)
}
func (p *xlStorageDiskIDCheck) ReadVersion(volume, path, versionID string) (fi FileInfo, err error) {
if p.isDiskStale() {
return fi, errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return fi, err
}
return p.storage.ReadVersion(volume, path, versionID)
}
func (p *xlStorageDiskIDCheck) ReadAll(volume string, path string) (buf []byte, err error) {
if p.isDiskStale() {
return nil, errDiskNotFound
if err = p.checkDiskStale(); err != nil {
return nil, err
}
return p.storage.ReadAll(volume, path)
}

View File

@ -86,9 +86,6 @@ func isValidVolname(volname string) bool {
// xlStorage - implements StorageAPI interface.
type xlStorage struct {
// Disk usage metrics
totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
maxActiveIOCount int32
activeIOCount int32
@ -408,7 +405,10 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
var totalSize int64
for _, version := range fivs.Versions {
size := item.applyActions(ctx, objAPI, actionMeta{numVersions: len(fivs.Versions), oi: version.ToObjectInfo(item.bucket, item.objectPath())})
size := item.applyActions(ctx, objAPI, actionMeta{
numVersions: len(fivs.Versions),
oi: version.ToObjectInfo(item.bucket, item.objectPath()),
})
if !version.Deleted {
totalSize += size
}
@ -422,12 +422,6 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
}
dataUsageInfo.Info.LastUpdate = time.Now()
total := dataUsageInfo.sizeRecursive(dataUsageInfo.Info.Name)
if total == nil {
total = &dataUsageEntry{}
}
atomic.StoreUint64(&s.totalUsed, uint64(total.Size))
return dataUsageInfo, nil
}
@ -438,8 +432,10 @@ type DiskInfo struct {
Free uint64
Used uint64
RootDisk bool
Endpoint string
MountPath string
Error string // reports any error returned by underlying disk
ID string
Error string // carries the error over the network
}
// DiskInfo provides current information about disk space usage,
@ -455,23 +451,22 @@ func (s *xlStorage) DiskInfo() (info DiskInfo, err error) {
return info, err
}
used := di.Total - di.Free
if !s.diskMount {
used = atomic.LoadUint64(&s.totalUsed)
}
rootDisk, err := disk.IsRootDisk(s.diskPath)
if err != nil {
return info, err
}
return DiskInfo{
info = DiskInfo{
Total: di.Total,
Free: di.Free,
Used: used,
Used: di.Total - di.Free,
RootDisk: rootDisk,
MountPath: s.diskPath,
}, nil
}
diskID, err := s.GetDiskID()
info.ID = diskID
return info, err
}
// getVolDir - will convert incoming volume names to
@ -512,7 +507,17 @@ func (s *xlStorage) GetDiskID() (string, error) {
if err != nil {
// If the disk is still not initialized.
if os.IsNotExist(err) {
return "", errUnformattedDisk
_, err = os.Stat(s.diskPath)
if err == nil {
// Disk is present by missing `format.json`
return "", errUnformattedDisk
}
if os.IsNotExist(err) {
return "", errDiskNotFound
} else if os.IsPermission(err) {
return "", errDiskAccessDenied
}
return "", err
}
return "", errCorruptedFormat
}

View File

@ -1,5 +1,5 @@
/*
* MinIO Cloud Storage, (C) 2017, 2018 MinIO, Inc.
* MinIO Cloud Storage, (C) 2017-2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -99,6 +99,9 @@ const (
DriveStateOffline = "offline"
DriveStateCorrupt = "corrupt"
DriveStateMissing = "missing"
DriveStatePermission = "permission-denied"
DriveStateFaulty = "faulty"
DriveStateUnknown = "unknown"
DriveStateUnformatted = "unformatted" // only returned by disk
)

View File

@ -39,19 +39,9 @@ const (
// Add your own backend.
)
// DriveInfo - represents each drive info, describing
// status, uuid and endpoint.
type DriveInfo HealDriveInfo
// StorageInfo - represents total capacity of underlying storage.
type StorageInfo struct {
Used []uint64 // Used total used per disk.
Total []uint64 // Total disk space per disk.
Available []uint64 // Total disk space available per disk.
MountPaths []string // Disk mountpoints
Disks []Disk
// Backend type.
Backend struct {
@ -65,9 +55,6 @@ type StorageInfo struct {
StandardSCParity int // Parity disks for currently configured Standard storage class.
RRSCData int // Data disks for currently configured Reduced Redundancy storage class.
RRSCParity int // Parity disks for currently configured Reduced Redundancy storage class.
// List of all disk status, this is only meaningful if BackendType is Erasure.
Sets [][]DriveInfo
}
}
@ -282,12 +269,14 @@ type ServerProperties struct {
// Disk holds Disk information
type Disk struct {
Endpoint string `json:"endpoint,omitempty"`
DrivePath string `json:"path,omitempty"`
State string `json:"state,omitempty"`
UUID string `json:"uuid,omitempty"`
Model string `json:"model,omitempty"`
TotalSpace uint64 `json:"totalspace,omitempty"`
UsedSpace uint64 `json:"usedspace,omitempty"`
AvailableSpace uint64 `json:"availspace,omitempty"`
ReadThroughput float64 `json:"readthroughput,omitempty"`
WriteThroughPut float64 `json:"writethroughput,omitempty"`
ReadLatency float64 `json:"readlatency,omitempty"`