further bootstrap/startup optimization for reading 'format.json' (#18868)

- Move RenameFile to websockets
- Move ReadAll that is primarily is used
  for reading 'format.json' to to websockets
- Optimize DiskInfo calls, and provide a way
  to make a NoOp DiskInfo call.
This commit is contained in:
Harshavardhana 2024-01-25 12:45:46 -08:00 committed by GitHub
parent e377bb949a
commit 74851834c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 998 additions and 111 deletions

View File

@ -42,7 +42,7 @@ func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
if disks[i] == nil { if disks[i] == nil {
return return
} }
di, err := disks[i].DiskInfo(context.Background(), false) di, err := disks[i].DiskInfo(context.Background(), DiskInfoOptions{})
if err != nil || di.Healing { if err != nil || di.Healing {
// - Do not consume disks which are not reachable // - Do not consume disks which are not reachable
// unformatted or simply not accessible for some reason. // unformatted or simply not accessible for some reason.

View File

@ -123,10 +123,10 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, []byte, e
return nil, nil, nil, err return nil, nil, nil, err
} }
format, formatData, err := loadFormatErasureWithData(disk) format, formatData, err := loadFormatErasureWithData(disk, false)
if err != nil { if err != nil {
if errors.Is(err, errUnformattedDisk) { if errors.Is(err, errUnformattedDisk) {
info, derr := disk.DiskInfo(context.TODO(), false) info, derr := disk.DiskInfo(context.TODO(), DiskInfoOptions{})
if derr != nil && info.RootDisk { if derr != nil && info.RootDisk {
disk.Close() disk.Close()
return nil, nil, nil, fmt.Errorf("Drive: %s is a root drive", disk) return nil, nil, nil, fmt.Errorf("Drive: %s is a root drive", disk)

View File

@ -178,7 +178,7 @@ func getDisksInfo(disks []StorageAPI, endpoints []Endpoint, metrics bool) (disks
disksInfo[index] = di disksInfo[index] = di
return nil return nil
} }
info, err := disks[index].DiskInfo(context.TODO(), metrics) info, err := disks[index].DiskInfo(context.TODO(), DiskInfoOptions{Metrics: metrics})
di.DrivePath = info.MountPath di.DrivePath = info.MountPath
di.TotalSpace = info.Total di.TotalSpace = info.Total
di.UsedSpace = info.Used di.UsedSpace = info.Used
@ -290,7 +290,7 @@ func (er erasureObjects) getOnlineDisksWithHealingAndInfo(inclHealing bool) (new
return return
} }
di, err := disk.DiskInfo(context.Background(), false) di, err := disk.DiskInfo(context.Background(), DiskInfoOptions{})
infos[i] = di infos[i] = di
if err != nil { if err != nil {
// - Do not consume disks which are not reachable // - Do not consume disks which are not reachable

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc. // Copyright (c) 2015-2024 MinIO, Inc.
// //
// This file is part of MinIO Object Storage stack // This file is part of MinIO Object Storage stack
// //
@ -28,6 +28,7 @@ import (
"sync" "sync"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/internal/color" "github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/config/storageclass"
@ -325,15 +326,10 @@ func loadFormatErasureAll(storageDisks []StorageAPI, heal bool) ([]*formatErasur
if storageDisks[index] == nil { if storageDisks[index] == nil {
return errDiskNotFound return errDiskNotFound
} }
format, formatData, err := loadFormatErasureWithData(storageDisks[index]) format, formatData, err := loadFormatErasureWithData(storageDisks[index], heal)
if err != nil { if err != nil {
return err return err
} }
info, err := storageDisks[index].DiskInfo(context.Background(), false)
if err != nil {
return err
}
format.Info = info
formats[index] = format formats[index] = format
if !heal { if !heal {
// If no healing required, make the disks valid and // If no healing required, make the disks valid and
@ -389,14 +385,7 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, healID string)
} }
// loadFormatErasureWithData - loads format.json from disk. // loadFormatErasureWithData - loads format.json from disk.
func loadFormatErasureWithData(disk StorageAPI) (format *formatErasureV3, data []byte, err error) { func loadFormatErasureWithData(disk StorageAPI, heal bool) (format *formatErasureV3, data []byte, err error) {
// Ensure that the grid is online.
if _, err := disk.DiskInfo(context.Background(), false); err != nil {
if errors.Is(err, errDiskNotFound) {
return nil, nil, err
}
}
data, err = disk.ReadAll(context.TODO(), minioMetaBucket, formatConfigFile) data, err = disk.ReadAll(context.TODO(), minioMetaBucket, formatConfigFile)
if err != nil { if err != nil {
// 'file not found' and 'volume not found' as // 'file not found' and 'volume not found' as
@ -413,18 +402,20 @@ func loadFormatErasureWithData(disk StorageAPI) (format *formatErasureV3, data [
return nil, nil, err return nil, nil, err
} }
if heal {
info, err := disk.DiskInfo(context.Background(), DiskInfoOptions{NoOp: heal})
if err != nil {
return nil, nil, err
}
format.Info = info
}
// Success. // Success.
return format, data, nil return format, data, nil
} }
// loadFormatErasure - loads format.json from disk. // loadFormatErasure - loads format.json from disk.
func loadFormatErasure(disk StorageAPI) (format *formatErasureV3, err error) { func loadFormatErasure(disk StorageAPI) (format *formatErasureV3, err error) {
// Ensure that the grid is online.
if _, err := disk.DiskInfo(context.Background(), false); err != nil {
if errors.Is(err, errDiskNotFound) {
return nil, err
}
}
buf, err := disk.ReadAll(context.TODO(), minioMetaBucket, formatConfigFile) buf, err := disk.ReadAll(context.TODO(), minioMetaBucket, formatConfigFile)
if err != nil { if err != nil {
// 'file not found' and 'volume not found' as // 'file not found' and 'volume not found' as
@ -435,6 +426,8 @@ func loadFormatErasure(disk StorageAPI) (format *formatErasureV3, err error) {
return nil, err return nil, err
} }
json := jsoniter.ConfigCompatibleWithStandardLibrary
// Try to decode format json into formatConfigV1 struct. // Try to decode format json into formatConfigV1 struct.
format = &formatErasureV3{} format = &formatErasureV3{}
if err = json.Unmarshal(buf, format); err != nil { if err = json.Unmarshal(buf, format); err != nil {
@ -863,7 +856,9 @@ func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int,
newFormats[i][j].Erasure.DistributionAlgo = refFormat.Erasure.DistributionAlgo newFormats[i][j].Erasure.DistributionAlgo = refFormat.Erasure.DistributionAlgo
} }
if format := formats[i*setDriveCount+j]; format != nil && (errs[i*setDriveCount+j] == nil) { if format := formats[i*setDriveCount+j]; format != nil && (errs[i*setDriveCount+j] == nil) {
currentDisksInfo[i][j] = format.Info if format.Info.Endpoint != "" {
currentDisksInfo[i][j] = format.Info
}
} }
} }
} }

View File

@ -156,7 +156,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
} }
} }
info, err := tracker.disk.DiskInfo(ctx, false) info, err := tracker.disk.DiskInfo(ctx, DiskInfoOptions{})
if err != nil { if err != nil {
return fmt.Errorf("unable to get disk information before healing it: %w", err) return fmt.Errorf("unable to get disk information before healing it: %w", err)
} }

View File

@ -257,7 +257,7 @@ func collectDriveMetrics(m madmin.RealtimeMetrics) {
for _, d := range localDrives { for _, d := range localDrives {
labels := map[string]string{"drive": d.Endpoint().RawPath} labels := map[string]string{"drive": d.Endpoint().RawPath}
di, err := d.DiskInfo(GlobalContext, false) di, err := d.DiskInfo(GlobalContext, DiskInfoOptions{})
if err == nil { if err == nil {
updateResourceMetrics(driveSubsystem, usedBytes, float64(di.Used), labels, false) updateResourceMetrics(driveSubsystem, usedBytes, float64(di.Used), labels, false)
updateResourceMetrics(driveSubsystem, totalBytes, float64(di.Total), labels, false) updateResourceMetrics(driveSubsystem, totalBytes, float64(di.Total), labels, false)

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc. // Copyright (c) 2015-2024 MinIO, Inc.
// //
// This file is part of MinIO Object Storage stack // This file is part of MinIO Object Storage stack
// //
@ -123,11 +123,11 @@ func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache, updat
return d.disk.NSScanner(ctx, cache, updates, scanMode, weSleep) return d.disk.NSScanner(ctx, cache, updates, scanMode, weSleep)
} }
func (d *naughtyDisk) DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error) { func (d *naughtyDisk) DiskInfo(ctx context.Context, opts DiskInfoOptions) (info DiskInfo, err error) {
if err := d.calcError(); err != nil { if err := d.calcError(); err != nil {
return info, err return info, err
} }
return d.disk.DiskInfo(ctx, metrics) return d.disk.DiskInfo(ctx, opts)
} }
func (d *naughtyDisk) MakeVolBulk(ctx context.Context, volumes ...string) (err error) { func (d *naughtyDisk) MakeVolBulk(ctx context.Context, volumes ...string) (err error) {

View File

@ -1157,11 +1157,12 @@ func compressSelfTest() {
// If a disk is nil or an error is returned the result will be nil as well. // If a disk is nil or an error is returned the result will be nil as well.
func getDiskInfos(ctx context.Context, disks ...StorageAPI) []*DiskInfo { func getDiskInfos(ctx context.Context, disks ...StorageAPI) []*DiskInfo {
res := make([]*DiskInfo, len(disks)) res := make([]*DiskInfo, len(disks))
opts := DiskInfoOptions{}
for i, disk := range disks { for i, disk := range disks {
if disk == nil { if disk == nil {
continue continue
} }
if di, err := disk.DiskInfo(ctx, false); err == nil { if di, err := disk.DiskInfo(ctx, opts); err == nil {
res[i] = &di res[i] = &di
} }
} }

View File

@ -883,7 +883,7 @@ func canWeRestartNode() map[string]DiskMetrics {
errs := make([]error, len(globalLocalDrives)) errs := make([]error, len(globalLocalDrives))
infos := make([]DiskInfo, len(globalLocalDrives)) infos := make([]DiskInfo, len(globalLocalDrives))
for i, drive := range globalLocalDrives { for i, drive := range globalLocalDrives {
infos[i], errs[i] = drive.DiskInfo(GlobalContext, false) infos[i], errs[i] = drive.DiskInfo(GlobalContext, DiskInfoOptions{})
} }
infoMaps := make(map[string]DiskMetrics) infoMaps := make(map[string]DiskMetrics)
for i := range infos { for i := range infos {

View File

@ -188,6 +188,7 @@ func connectLoadInitFormats(verboseLogging bool, firstDisk bool, endpoints Endpo
// Attempt to load all `format.json` from all disks. // Attempt to load all `format.json` from all disks.
formatConfigs, sErrs := loadFormatErasureAll(storageDisks, false) formatConfigs, sErrs := loadFormatErasureAll(storageDisks, false)
// Check if we have // Check if we have
for i, sErr := range sErrs { for i, sErr := range sErrs {
// print the error, nonetheless, which is perhaps unhandled // print the error, nonetheless, which is perhaps unhandled
@ -271,9 +272,12 @@ func waitForFormatErasure(firstDisk bool, endpoints Endpoints, poolCount, setCou
return time.Now().Round(time.Second).Sub(formatStartTime).String() return time.Now().Round(time.Second).Sub(formatStartTime).String()
} }
var tries int var (
var verboseLogging bool tries int
storageDisks, format, err := connectLoadInitFormats(verboseLogging, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo) verbose bool
)
storageDisks, format, err := connectLoadInitFormats(verbose, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo)
if err == nil { if err == nil {
return storageDisks, format, nil return storageDisks, format, nil
} }
@ -286,12 +290,12 @@ func waitForFormatErasure(firstDisk bool, endpoints Endpoints, poolCount, setCou
for { for {
// Only log once every 10 iterations, then reset the tries count. // Only log once every 10 iterations, then reset the tries count.
verboseLogging = tries >= 10 verbose = tries >= 10
if verboseLogging { if verbose {
tries = 1 tries = 1
} }
storageDisks, format, err := connectLoadInitFormats(verboseLogging, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo) storageDisks, format, err := connectLoadInitFormats(verbose, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo)
if err == nil { if err == nil {
return storageDisks, format, nil return storageDisks, format, nil
} }

View File

@ -37,6 +37,13 @@ type RenameOptions struct {
BaseOptions BaseOptions
} }
// DiskInfoOptions options for requesting custom results.
type DiskInfoOptions struct {
DiskID string `msg:"id"`
Metrics bool `msg:"m"`
NoOp bool `msg:"np"`
}
//go:generate msgp -file=$GOFILE //go:generate msgp -file=$GOFILE
// DiskInfo is an extended type which returns current // DiskInfo is an extended type which returns current
@ -423,6 +430,22 @@ type RenameDataHandlerParams struct {
Opts RenameOptions `msg:"ro"` Opts RenameOptions `msg:"ro"`
} }
// RenameFileHandlerParams are parameters for RenameFileHandler.
type RenameFileHandlerParams struct {
DiskID string `msg:"id"`
SrcVolume string `msg:"sv"`
SrcFilePath string `msg:"sp"`
DstVolume string `msg:"dv"`
DstFilePath string `msg:"dp"`
}
// ReadAllHandlerParams are parameters for ReadAllHandler.
type ReadAllHandlerParams struct {
DiskID string `msg:"id"`
Volume string `msg:"v"`
FilePath string `msg:"fp"`
}
// RenameDataResp - RenameData()'s response. // RenameDataResp - RenameData()'s response.
type RenameDataResp struct { type RenameDataResp struct {
Signature uint64 `msg:"sig"` Signature uint64 `msg:"sig"`

View File

@ -1249,6 +1249,159 @@ func (z *DiskInfo) Msgsize() (s int) {
return return
} }
// DecodeMsg implements msgp.Decodable
func (z *DiskInfoOptions) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.DiskID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "DiskID")
return
}
case "m":
z.Metrics, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "Metrics")
return
}
case "np":
z.NoOp, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "NoOp")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z DiskInfoOptions) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 3
// write "id"
err = en.Append(0x83, 0xa2, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteString(z.DiskID)
if err != nil {
err = msgp.WrapError(err, "DiskID")
return
}
// write "m"
err = en.Append(0xa1, 0x6d)
if err != nil {
return
}
err = en.WriteBool(z.Metrics)
if err != nil {
err = msgp.WrapError(err, "Metrics")
return
}
// write "np"
err = en.Append(0xa2, 0x6e, 0x70)
if err != nil {
return
}
err = en.WriteBool(z.NoOp)
if err != nil {
err = msgp.WrapError(err, "NoOp")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z DiskInfoOptions) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 3
// string "id"
o = append(o, 0x83, 0xa2, 0x69, 0x64)
o = msgp.AppendString(o, z.DiskID)
// string "m"
o = append(o, 0xa1, 0x6d)
o = msgp.AppendBool(o, z.Metrics)
// string "np"
o = append(o, 0xa2, 0x6e, 0x70)
o = msgp.AppendBool(o, z.NoOp)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *DiskInfoOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.DiskID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "DiskID")
return
}
case "m":
z.Metrics, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Metrics")
return
}
case "np":
z.NoOp, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "NoOp")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z DiskInfoOptions) Msgsize() (s int) {
s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 2 + msgp.BoolSize + 3 + msgp.BoolSize
return
}
// DecodeMsg implements msgp.Decodable // DecodeMsg implements msgp.Decodable
func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) { func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte var field []byte
@ -3174,6 +3327,159 @@ func (z *RawFileInfo) Msgsize() (s int) {
return return
} }
// DecodeMsg implements msgp.Decodable
func (z *ReadAllHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.DiskID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "DiskID")
return
}
case "v":
z.Volume, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Volume")
return
}
case "fp":
z.FilePath, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "FilePath")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z ReadAllHandlerParams) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 3
// write "id"
err = en.Append(0x83, 0xa2, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteString(z.DiskID)
if err != nil {
err = msgp.WrapError(err, "DiskID")
return
}
// write "v"
err = en.Append(0xa1, 0x76)
if err != nil {
return
}
err = en.WriteString(z.Volume)
if err != nil {
err = msgp.WrapError(err, "Volume")
return
}
// write "fp"
err = en.Append(0xa2, 0x66, 0x70)
if err != nil {
return
}
err = en.WriteString(z.FilePath)
if err != nil {
err = msgp.WrapError(err, "FilePath")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z ReadAllHandlerParams) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 3
// string "id"
o = append(o, 0x83, 0xa2, 0x69, 0x64)
o = msgp.AppendString(o, z.DiskID)
// string "v"
o = append(o, 0xa1, 0x76)
o = msgp.AppendString(o, z.Volume)
// string "fp"
o = append(o, 0xa2, 0x66, 0x70)
o = msgp.AppendString(o, z.FilePath)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *ReadAllHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.DiskID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "DiskID")
return
}
case "v":
z.Volume, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Volume")
return
}
case "fp":
z.FilePath, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "FilePath")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z ReadAllHandlerParams) Msgsize() (s int) {
s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 2 + msgp.StringPrefixSize + len(z.Volume) + 3 + msgp.StringPrefixSize + len(z.FilePath)
return
}
// DecodeMsg implements msgp.Decodable // DecodeMsg implements msgp.Decodable
func (z *ReadMultipleReq) DecodeMsg(dc *msgp.Reader) (err error) { func (z *ReadMultipleReq) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte var field []byte
@ -4170,6 +4476,209 @@ func (z RenameDataResp) Msgsize() (s int) {
return return
} }
// DecodeMsg implements msgp.Decodable
func (z *RenameFileHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.DiskID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "DiskID")
return
}
case "sv":
z.SrcVolume, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "SrcVolume")
return
}
case "sp":
z.SrcFilePath, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "SrcFilePath")
return
}
case "dv":
z.DstVolume, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "DstVolume")
return
}
case "dp":
z.DstFilePath, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "DstFilePath")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *RenameFileHandlerParams) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 5
// write "id"
err = en.Append(0x85, 0xa2, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteString(z.DiskID)
if err != nil {
err = msgp.WrapError(err, "DiskID")
return
}
// write "sv"
err = en.Append(0xa2, 0x73, 0x76)
if err != nil {
return
}
err = en.WriteString(z.SrcVolume)
if err != nil {
err = msgp.WrapError(err, "SrcVolume")
return
}
// write "sp"
err = en.Append(0xa2, 0x73, 0x70)
if err != nil {
return
}
err = en.WriteString(z.SrcFilePath)
if err != nil {
err = msgp.WrapError(err, "SrcFilePath")
return
}
// write "dv"
err = en.Append(0xa2, 0x64, 0x76)
if err != nil {
return
}
err = en.WriteString(z.DstVolume)
if err != nil {
err = msgp.WrapError(err, "DstVolume")
return
}
// write "dp"
err = en.Append(0xa2, 0x64, 0x70)
if err != nil {
return
}
err = en.WriteString(z.DstFilePath)
if err != nil {
err = msgp.WrapError(err, "DstFilePath")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *RenameFileHandlerParams) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 5
// string "id"
o = append(o, 0x85, 0xa2, 0x69, 0x64)
o = msgp.AppendString(o, z.DiskID)
// string "sv"
o = append(o, 0xa2, 0x73, 0x76)
o = msgp.AppendString(o, z.SrcVolume)
// string "sp"
o = append(o, 0xa2, 0x73, 0x70)
o = msgp.AppendString(o, z.SrcFilePath)
// string "dv"
o = append(o, 0xa2, 0x64, 0x76)
o = msgp.AppendString(o, z.DstVolume)
// string "dp"
o = append(o, 0xa2, 0x64, 0x70)
o = msgp.AppendString(o, z.DstFilePath)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *RenameFileHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "id":
z.DiskID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "DiskID")
return
}
case "sv":
z.SrcVolume, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "SrcVolume")
return
}
case "sp":
z.SrcFilePath, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "SrcFilePath")
return
}
case "dv":
z.DstVolume, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "DstVolume")
return
}
case "dp":
z.DstFilePath, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "DstFilePath")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *RenameFileHandlerParams) Msgsize() (s int) {
s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 3 + msgp.StringPrefixSize + len(z.SrcVolume) + 3 + msgp.StringPrefixSize + len(z.SrcFilePath) + 3 + msgp.StringPrefixSize + len(z.DstVolume) + 3 + msgp.StringPrefixSize + len(z.DstFilePath)
return
}
// DecodeMsg implements msgp.Decodable // DecodeMsg implements msgp.Decodable
func (z *RenameOptions) DecodeMsg(dc *msgp.Reader) (err error) { func (z *RenameOptions) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte var field []byte

View File

@ -687,6 +687,119 @@ func BenchmarkDecodeDiskInfo(b *testing.B) {
} }
} }
func TestMarshalUnmarshalDiskInfoOptions(t *testing.T) {
v := DiskInfoOptions{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgDiskInfoOptions(b *testing.B) {
v := DiskInfoOptions{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgDiskInfoOptions(b *testing.B) {
v := DiskInfoOptions{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalDiskInfoOptions(b *testing.B) {
v := DiskInfoOptions{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeDiskInfoOptions(t *testing.T) {
v := DiskInfoOptions{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeDiskInfoOptions Msgsize() is inaccurate")
}
vn := DiskInfoOptions{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeDiskInfoOptions(b *testing.B) {
v := DiskInfoOptions{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeDiskInfoOptions(b *testing.B) {
v := DiskInfoOptions{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalDiskMetrics(t *testing.T) { func TestMarshalUnmarshalDiskMetrics(t *testing.T) {
v := DiskMetrics{} v := DiskMetrics{}
bts, err := v.MarshalMsg(nil) bts, err := v.MarshalMsg(nil)
@ -1365,6 +1478,119 @@ func BenchmarkDecodeRawFileInfo(b *testing.B) {
} }
} }
func TestMarshalUnmarshalReadAllHandlerParams(t *testing.T) {
v := ReadAllHandlerParams{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgReadAllHandlerParams(b *testing.B) {
v := ReadAllHandlerParams{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgReadAllHandlerParams(b *testing.B) {
v := ReadAllHandlerParams{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalReadAllHandlerParams(b *testing.B) {
v := ReadAllHandlerParams{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeReadAllHandlerParams(t *testing.T) {
v := ReadAllHandlerParams{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeReadAllHandlerParams Msgsize() is inaccurate")
}
vn := ReadAllHandlerParams{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeReadAllHandlerParams(b *testing.B) {
v := ReadAllHandlerParams{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeReadAllHandlerParams(b *testing.B) {
v := ReadAllHandlerParams{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalReadMultipleReq(t *testing.T) { func TestMarshalUnmarshalReadMultipleReq(t *testing.T) {
v := ReadMultipleReq{} v := ReadMultipleReq{}
bts, err := v.MarshalMsg(nil) bts, err := v.MarshalMsg(nil)
@ -1817,6 +2043,119 @@ func BenchmarkDecodeRenameDataResp(b *testing.B) {
} }
} }
func TestMarshalUnmarshalRenameFileHandlerParams(t *testing.T) {
v := RenameFileHandlerParams{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgRenameFileHandlerParams(b *testing.B) {
v := RenameFileHandlerParams{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgRenameFileHandlerParams(b *testing.B) {
v := RenameFileHandlerParams{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalRenameFileHandlerParams(b *testing.B) {
v := RenameFileHandlerParams{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeRenameFileHandlerParams(t *testing.T) {
v := RenameFileHandlerParams{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeRenameFileHandlerParams Msgsize() is inaccurate")
}
vn := RenameFileHandlerParams{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeRenameFileHandlerParams(b *testing.B) {
v := RenameFileHandlerParams{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeRenameFileHandlerParams(b *testing.B) {
v := RenameFileHandlerParams{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalRenameOptions(t *testing.T) { func TestMarshalUnmarshalRenameOptions(t *testing.T) {
v := RenameOptions{} v := RenameOptions{}
bts, err := v.MarshalMsg(nil) bts, err := v.MarshalMsg(nil)

View File

@ -65,7 +65,7 @@ type StorageAPI interface {
// returns 'nil' once healing is complete or if the disk // returns 'nil' once healing is complete or if the disk
// has never been replaced. // has never been replaced.
Healing() *healingTracker Healing() *healingTracker
DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error) DiskInfo(ctx context.Context, opts DiskInfoOptions) (info DiskInfo, err error)
NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, shouldSleep func() bool) (dataUsageCache, error) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode, shouldSleep func() bool) (dataUsageCache, error)
// Volume operations. // Volume operations.
@ -173,7 +173,7 @@ func (p *unrecognizedDisk) GetDiskID() (string, error) {
func (p *unrecognizedDisk) SetDiskID(id string) { func (p *unrecognizedDisk) SetDiskID(id string) {
} }
func (p *unrecognizedDisk) DiskInfo(ctx context.Context, _ bool) (info DiskInfo, err error) { func (p *unrecognizedDisk) DiskInfo(ctx context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
return info, errDiskNotFound return info, errDiskNotFound
} }

View File

@ -282,7 +282,7 @@ func (client *storageRESTClient) SetDiskID(id string) {
client.diskID = id client.diskID = id
} }
func (client *storageRESTClient) DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error) { func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOptions) (info DiskInfo, err error) {
if client.gridConn.State() != grid.StateConnected { if client.gridConn.State() != grid.StateConnected {
// make sure to check if the disk is offline, since the underlying // make sure to check if the disk is offline, since the underlying
// value is cached we should attempt to invalidate it if such calls // value is cached we should attempt to invalidate it if such calls
@ -294,11 +294,9 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, metrics bool) (in
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() defer cancel()
infop, err := storageDiskInfoHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{ opts.DiskID = client.diskID
storageRESTDiskID: client.diskID,
// Always request metrics, since we are caching the result. infop, err := storageDiskInfoHandler.Call(ctx, client.gridConn, &opts)
storageRESTMetrics: strconv.FormatBool(metrics),
}))
if err != nil { if err != nil {
return info, err return info, err
} }
@ -539,10 +537,6 @@ func (client *storageRESTClient) ReadXL(ctx context.Context, volume string, path
// ReadAll - reads all contents of a file. // ReadAll - reads all contents of a file.
func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, path string) ([]byte, error) { func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, path string) ([]byte, error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
// Specific optimization to avoid re-read from the drives for `format.json` // Specific optimization to avoid re-read from the drives for `format.json`
// in-case the caller is a network operation. // in-case the caller is a network operation.
if volume == minioMetaBucket && path == formatConfigFile { if volume == minioMetaBucket && path == formatConfigFile {
@ -555,12 +549,16 @@ func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, pat
} }
} }
respBody, err := client.call(ctx, storageRESTMethodReadAll, values, nil, -1) gridBytes, err := storageReadAllHandler.Call(ctx, client.gridConn, &ReadAllHandlerParams{
DiskID: client.diskID,
Volume: volume,
FilePath: path,
})
if err != nil { if err != nil {
return nil, err return nil, toStorageErr(err)
} }
defer xhttp.DrainBody(respBody)
return io.ReadAll(respBody) return *gridBytes, nil
} }
// ReadFileStream - returns a reader for the requested file. // ReadFileStream - returns a reader for the requested file.
@ -682,14 +680,18 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
// RenameFile - renames a file. // RenameFile - renames a file.
func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) { func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {
values := make(url.Values) // Set a very long timeout for rename file
values.Set(storageRESTSrcVolume, srcVolume) ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
values.Set(storageRESTSrcPath, srcPath) defer cancel()
values.Set(storageRESTDstVolume, dstVolume)
values.Set(storageRESTDstPath, dstPath) _, err = storageRenameFileHandler.Call(ctx, client.gridConn, &RenameFileHandlerParams{
respBody, err := client.call(ctx, storageRESTMethodRenameFile, values, nil, -1) DiskID: client.diskID,
defer xhttp.DrainBody(respBody) SrcVolume: srcVolume,
return err SrcFilePath: srcPath,
DstVolume: dstVolume,
DstFilePath: dstPath,
})
return toStorageErr(err)
} }
func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error { func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error {

View File

@ -20,7 +20,7 @@ package cmd
//go:generate msgp -file $GOFILE -unexported //go:generate msgp -file $GOFILE -unexported
const ( const (
storageRESTVersion = "v54" // Add more metrics per drive storageRESTVersion = "v55" // ReadAll, RenameFile migrate to websockets
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage" storageRESTPrefix = minioReservedBucketPath + "/storage"
) )
@ -68,6 +68,7 @@ const (
storageRESTForceDelete = "force-delete" storageRESTForceDelete = "force-delete"
storageRESTGlob = "glob" storageRESTGlob = "glob"
storageRESTMetrics = "metrics" storageRESTMetrics = "metrics"
storageRESTDriveQuorum = "drive-quorum"
) )
type nsScannerOptions struct { type nsScannerOptions struct {

View File

@ -200,15 +200,15 @@ func (s *storageRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request
// DiskInfo types. // DiskInfo types.
// DiskInfo.Metrics elements are shared, so we cannot reuse. // DiskInfo.Metrics elements are shared, so we cannot reuse.
var storageDiskInfoHandler = grid.NewSingleHandler[*grid.MSS, *DiskInfo](grid.HandlerDiskInfo, grid.NewMSS, func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse() var storageDiskInfoHandler = grid.NewSingleHandler[*DiskInfoOptions, *DiskInfo](grid.HandlerDiskInfo, func() *DiskInfoOptions { return &DiskInfoOptions{} },
func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse()
// DiskInfoHandler - returns disk info. // DiskInfoHandler - returns disk info.
func (s *storageRESTServer) DiskInfoHandler(params *grid.MSS) (*DiskInfo, *grid.RemoteErr) { func (s *storageRESTServer) DiskInfoHandler(opts *DiskInfoOptions) (*DiskInfo, *grid.RemoteErr) {
if !s.checkID(params.Get(storageRESTDiskID)) { if !s.checkID(opts.DiskID) {
return nil, grid.NewRemoteErr(errDiskNotFound) return nil, grid.NewRemoteErr(errDiskNotFound)
} }
withMetrics := params.Get(storageRESTMetrics) == "true" info, err := s.getStorage().DiskInfo(context.Background(), *opts)
info, err := s.getStorage().DiskInfo(context.Background(), withMetrics)
if err != nil { if err != nil {
info.Error = err.Error() info.Error = err.Error()
} }
@ -487,23 +487,21 @@ func (s *storageRESTServer) CheckPartsHandler(p *CheckPartsHandlerParams) (grid.
return grid.NewNPErr(s.getStorage().CheckParts(context.Background(), volume, filePath, p.FI)) return grid.NewNPErr(s.getStorage().CheckParts(context.Background(), volume, filePath, p.FI))
} }
// ReadAllHandler - read all the contents of a file. var storageReadAllHandler = grid.NewSingleHandler[*ReadAllHandlerParams, *grid.Bytes](grid.HandlerReadAll, func() *ReadAllHandlerParams {
func (s *storageRESTServer) ReadAllHandler(w http.ResponseWriter, r *http.Request) { return &ReadAllHandlerParams{}
if !s.IsValid(w, r) { }, grid.NewBytes)
return
}
volume := r.Form.Get(storageRESTVolume)
filePath := r.Form.Get(storageRESTFilePath)
buf, err := s.getStorage().ReadAll(r.Context(), volume, filePath) // ReadAllHandler - read all the contents of a file.
if err != nil { func (s *storageRESTServer) ReadAllHandler(p *ReadAllHandlerParams) (*grid.Bytes, *grid.RemoteErr) {
s.writeErrorResponse(w, err) if !s.checkID(p.DiskID) {
return return nil, grid.NewRemoteErr(errDiskNotFound)
} }
// Reuse after return.
defer metaDataPoolPut(buf) volume := p.Volume
w.Header().Set(xhttp.ContentLength, strconv.Itoa(len(buf))) filePath := p.FilePath
w.Write(buf)
buf, err := s.getStorage().ReadAll(context.Background(), volume, filePath)
return grid.NewBytesWith(buf), grid.NewRemoteErr(err)
} }
// ReadXLHandler - read xl.meta for an object at path. // ReadXLHandler - read xl.meta for an object at path.
@ -744,19 +742,16 @@ func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*Rena
return resp, grid.NewRemoteErr(err) return resp, grid.NewRemoteErr(err)
} }
// RenameFileHandler - rename a file. var storageRenameFileHandler = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams {
func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Request) { return &RenameFileHandlerParams{}
if !s.IsValid(w, r) { }, grid.NewNoPayload)
return
} // RenameFileHandler - rename a file from source to destination
srcVolume := r.Form.Get(storageRESTSrcVolume) func (s *storageRESTServer) RenameFileHandler(p *RenameFileHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
srcFilePath := r.Form.Get(storageRESTSrcPath) if !s.checkID(p.DiskID) {
dstVolume := r.Form.Get(storageRESTDstVolume) return grid.NewNPErr(errDiskNotFound)
dstFilePath := r.Form.Get(storageRESTDstPath)
err := s.getStorage().RenameFile(r.Context(), srcVolume, srcFilePath, dstVolume, dstFilePath)
if err != nil {
s.writeErrorResponse(w, err)
} }
return grid.NewNPErr(s.getStorage().RenameFile(context.Background(), p.SrcVolume, p.SrcFilePath, p.DstVolume, p.DstFilePath))
} }
// CleanAbandonedDataHandler - Clean unused data directories. // CleanAbandonedDataHandler - Clean unused data directories.
@ -1352,17 +1347,17 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(h(server.ReadVersionHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(h(server.ReadVersionHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadXL).HandlerFunc(h(server.ReadXLHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadXL).HandlerFunc(h(server.ReadXLHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCreateFile).HandlerFunc(h(server.CreateFileHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCreateFile).HandlerFunc(h(server.CreateFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadAll).HandlerFunc(h(server.ReadAllHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFile).HandlerFunc(h(server.ReadFileHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFile).HandlerFunc(h(server.ReadFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListDir).HandlerFunc(h(server.ListDirHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListDir).HandlerFunc(h(server.ListDirHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(h(server.DeleteVersionsHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(h(server.DeleteVersionsHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameFile).HandlerFunc(h(server.RenameFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(h(server.VerifyFileHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(h(server.VerifyFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(h(server.StatInfoFile)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(h(server.StatInfoFile))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler)) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler))
logger.FatalIf(storageReadAllHandler.Register(gm, server.ReadAllHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageRenameFileHandler.Register(gm, server.RenameFileHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageRenameDataHandler.Register(gm, server.RenameDataHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageRenameDataHandler.Register(gm, server.RenameDataHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageDeleteFileHandler.Register(gm, server.DeleteFileHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageDeleteFileHandler.Register(gm, server.DeleteFileHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageCheckPartsHandler.Register(gm, server.CheckPartsHandler, endpoint.Path), "unable to register handler") logger.FatalIf(storageCheckPartsHandler.Register(gm, server.CheckPartsHandler, endpoint.Path), "unable to register handler")

View File

@ -41,7 +41,7 @@ func testStorageAPIDiskInfo(t *testing.T, storage StorageAPI) {
} }
for i, testCase := range testCases { for i, testCase := range testCases {
_, err := storage.DiskInfo(context.Background(), true) _, err := storage.DiskInfo(context.Background(), DiskInfoOptions{Metrics: true})
expectErr := (err != nil) expectErr := (err != nil)
if expectErr != testCase.expectErr { if expectErr != testCase.expectErr {
@ -354,7 +354,7 @@ func newStorageRESTHTTPServerClient(t testing.TB) *storageRESTClient {
} }
for { for {
_, err := restClient.DiskInfo(context.Background(), false) _, err := restClient.DiskInfo(context.Background(), DiskInfoOptions{})
if err == nil || errors.Is(err, errUnformattedDisk) { if err == nil || errors.Is(err, errUnformattedDisk) {
break break
} }

View File

@ -338,7 +338,7 @@ func (p *xlStorageDiskIDCheck) checkDiskStale() error {
return errDiskNotFound return errDiskNotFound
} }
func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, metrics bool) (info DiskInfo, err error) { func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, opts DiskInfoOptions) (info DiskInfo, err error) {
if contextCanceled(ctx) { if contextCanceled(ctx) {
return DiskInfo{}, ctx.Err() return DiskInfo{}, ctx.Err()
} }
@ -346,8 +346,20 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, metrics bool) (info
si := p.updateStorageMetrics(storageMetricDiskInfo) si := p.updateStorageMetrics(storageMetricDiskInfo)
defer si(&err) defer si(&err)
if opts.NoOp {
if driveQuorum {
info.Metrics.TotalWrites = p.totalWrites.Load()
info.Metrics.TotalDeletes = p.totalDeletes.Load()
}
info.Metrics.TotalTokens = uint32(p.driveMaxConcurrent)
info.Metrics.TotalWaiting = uint32(p.health.waiting.Load())
info.Metrics.TotalErrorsTimeout = p.totalErrsTimeout.Load()
info.Metrics.TotalErrorsAvailability = p.totalErrsAvailability.Load()
return
}
defer func() { defer func() {
if metrics { if opts.Metrics {
info.Metrics = p.getMetrics() info.Metrics = p.getMetrics()
} }
if driveQuorum { if driveQuorum {
@ -365,7 +377,7 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, metrics bool) (info
return info, errFaultyDisk return info, errFaultyDisk
} }
info, err = p.storage.DiskInfo(ctx, metrics) info, err = p.storage.DiskInfo(ctx, opts)
if err != nil { if err != nil {
return info, err return info, err
} }

View File

@ -728,7 +728,7 @@ func (s *xlStorage) setWriteAttribute(writeCount uint64) error {
// DiskInfo provides current information about disk space usage, // DiskInfo provides current information about disk space usage,
// total free inodes and underlying filesystem. // total free inodes and underlying filesystem.
func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err error) { func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
s.diskInfoCache.Once.Do(func() { s.diskInfoCache.Once.Do(func() {
s.diskInfoCache.TTL = time.Second s.diskInfoCache.TTL = time.Second
s.diskInfoCache.Update = func() (interface{}, error) { s.diskInfoCache.Update = func() (interface{}, error) {

View File

@ -56,6 +56,8 @@ const (
HandlerWriteMetadata HandlerWriteMetadata
HandlerCheckParts HandlerCheckParts
HandlerRenameData HandlerRenameData
HandlerRenameFile
HandlerReadAll
HandlerServerVerify HandlerServerVerify
// Add more above here ^^^ // Add more above here ^^^
@ -87,6 +89,8 @@ var handlerPrefixes = [handlerLast]string{
HandlerWriteMetadata: storagePrefix, HandlerWriteMetadata: storagePrefix,
HandlerCheckParts: storagePrefix, HandlerCheckParts: storagePrefix,
HandlerRenameData: storagePrefix, HandlerRenameData: storagePrefix,
HandlerRenameFile: storagePrefix,
HandlerReadAll: storagePrefix,
HandlerServerVerify: bootstrapPrefix, HandlerServerVerify: bootstrapPrefix,
} }

View File

@ -27,15 +27,17 @@ func _() {
_ = x[HandlerWriteMetadata-16] _ = x[HandlerWriteMetadata-16]
_ = x[HandlerCheckParts-17] _ = x[HandlerCheckParts-17]
_ = x[HandlerRenameData-18] _ = x[HandlerRenameData-18]
_ = x[HandlerServerVerify-19] _ = x[HandlerRenameFile-19]
_ = x[handlerTest-20] _ = x[HandlerReadAll-20]
_ = x[handlerTest2-21] _ = x[HandlerServerVerify-21]
_ = x[handlerLast-22] _ = x[handlerTest-22]
_ = x[handlerTest2-23]
_ = x[handlerLast-24]
} }
const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataServerVerifyhandlerTesthandlerTest2handlerLast" const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyhandlerTesthandlerTest2handlerLast"
var _HandlerID_index = [...]uint8{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 208, 219, 231, 242} var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 236, 248, 259}
func (i HandlerID) String() string { func (i HandlerID) String() string {
if i >= HandlerID(len(_HandlerID_index)-1) { if i >= HandlerID(len(_HandlerID_index)-1) {