mirror of
https://github.com/minio/minio.git
synced 2025-07-29 02:00:59 -04:00
Improve disk formatting stage for large disk sets (#8690)
This commit is contained in:
parent
725172e13b
commit
f68a7005c0
@ -39,8 +39,11 @@ type Err struct {
|
|||||||
// Return the error message
|
// Return the error message
|
||||||
func (u Err) Error() string {
|
func (u Err) Error() string {
|
||||||
if u.detail == "" {
|
if u.detail == "" {
|
||||||
|
if u.msg != "" {
|
||||||
return u.msg
|
return u.msg
|
||||||
}
|
}
|
||||||
|
return "<nil>"
|
||||||
|
}
|
||||||
return u.detail
|
return u.detail
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,6 +80,10 @@ func newErrFn(msg, action, hint string) ErrFn {
|
|||||||
// ErrorToErr inspects the passed error and transforms it
|
// ErrorToErr inspects the passed error and transforms it
|
||||||
// to the appropriate UI error.
|
// to the appropriate UI error.
|
||||||
func ErrorToErr(err error) Err {
|
func ErrorToErr(err error) Err {
|
||||||
|
if err == nil {
|
||||||
|
return Err{}
|
||||||
|
}
|
||||||
|
|
||||||
// If this is already a Err, do nothing
|
// If this is already a Err, do nothing
|
||||||
if e, ok := err.(Err); ok {
|
if e, ok := err.(Err); ok {
|
||||||
return e
|
return e
|
||||||
@ -95,7 +102,6 @@ func ErrorToErr(err error) Err {
|
|||||||
// Failed to identify what type of error this, return a simple UI error
|
// Failed to identify what type of error this, return a simple UI error
|
||||||
return Err{msg: err.Error()}
|
return Err{msg: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// FmtError converts a fatal error message to a more clear error
|
// FmtError converts a fatal error message to a more clear error
|
||||||
|
172
cmd/format-xl.go
172
cmd/format-xl.go
@ -155,24 +155,9 @@ func newFormatXLV3(numSets int, setLen int) *formatXLV3 {
|
|||||||
return format
|
return format
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns formatXL.XL.Version information, this code is specifically
|
// Returns format XL version after reading `format.json`, returns
|
||||||
// used to read XL `format.json` and capture any version information
|
// successfully the version only if the backend is XL.
|
||||||
// that it may have.
|
func formatGetBackendXLVersion(formatPath string) (string, error) {
|
||||||
func formatXLGetVersion(formatPath string) (string, error) {
|
|
||||||
format := &formatXLVersionDetect{}
|
|
||||||
b, err := ioutil.ReadFile(formatPath)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
if err = json.Unmarshal(b, format); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return format.XL.Version, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns format meta format version from `format.json`. This code
|
|
||||||
// is specifically used to detect meta format.
|
|
||||||
func formatMetaGetFormatBackendXL(formatPath string) (string, error) {
|
|
||||||
meta := &formatMetaV1{}
|
meta := &formatMetaV1{}
|
||||||
b, err := ioutil.ReadFile(formatPath)
|
b, err := ioutil.ReadFile(formatPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -184,7 +169,15 @@ func formatMetaGetFormatBackendXL(formatPath string) (string, error) {
|
|||||||
if meta.Version != formatMetaVersionV1 {
|
if meta.Version != formatMetaVersionV1 {
|
||||||
return "", fmt.Errorf(`format.Version expected: %s, got: %s`, formatMetaVersionV1, meta.Version)
|
return "", fmt.Errorf(`format.Version expected: %s, got: %s`, formatMetaVersionV1, meta.Version)
|
||||||
}
|
}
|
||||||
return meta.Format, nil
|
if meta.Format != formatBackendXL {
|
||||||
|
return "", fmt.Errorf(`found backend %s, expected %s`, meta.Format, formatBackendXL)
|
||||||
|
}
|
||||||
|
// XL backend found, proceed to detect version.
|
||||||
|
format := &formatXLVersionDetect{}
|
||||||
|
if err = json.Unmarshal(b, format); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return format.XL.Version, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Migrates all previous versions to latest version of `format.json`,
|
// Migrates all previous versions to latest version of `format.json`,
|
||||||
@ -192,30 +185,27 @@ func formatMetaGetFormatBackendXL(formatPath string) (string, error) {
|
|||||||
// first before it V2 migrates to V3.
|
// first before it V2 migrates to V3.
|
||||||
func formatXLMigrate(export string) error {
|
func formatXLMigrate(export string) error {
|
||||||
formatPath := pathJoin(export, minioMetaBucket, formatConfigFile)
|
formatPath := pathJoin(export, minioMetaBucket, formatConfigFile)
|
||||||
backend, err := formatMetaGetFormatBackendXL(formatPath)
|
version, err := formatGetBackendXLVersion(formatPath)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if backend != formatBackendXL {
|
|
||||||
return fmt.Errorf(`Disk %s: found backend %s, expected %s`, export, backend, formatBackendXL)
|
|
||||||
}
|
|
||||||
version, err := formatXLGetVersion(formatPath)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
switch version {
|
switch version {
|
||||||
case formatXLVersionV1:
|
case formatXLVersionV1:
|
||||||
if err = formatXLMigrateV1ToV2(export); err != nil {
|
if err = formatXLMigrateV1ToV2(export, version); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Migrate successful v1 => v2, proceed to v2 => v3
|
||||||
|
version = formatXLVersionV2
|
||||||
fallthrough
|
fallthrough
|
||||||
case formatXLVersionV2:
|
case formatXLVersionV2:
|
||||||
if err = formatXLMigrateV2ToV3(export); err != nil {
|
if err = formatXLMigrateV2ToV3(export, version); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Migrate successful v2 => v3, v3 is latest
|
||||||
|
version = formatXLVersionV3
|
||||||
fallthrough
|
fallthrough
|
||||||
case formatXLVersionV3:
|
case formatXLVersionV3:
|
||||||
// format-V3 is the latest verion.
|
// v3 is the latest version, return.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf(`%s: unknown format version %s`, export, version)
|
return fmt.Errorf(`%s: unknown format version %s`, export, version)
|
||||||
@ -223,16 +213,13 @@ func formatXLMigrate(export string) error {
|
|||||||
|
|
||||||
// Migrates version V1 of format.json to version V2 of format.json,
|
// Migrates version V1 of format.json to version V2 of format.json,
|
||||||
// migration fails upon any error.
|
// migration fails upon any error.
|
||||||
func formatXLMigrateV1ToV2(export string) error {
|
func formatXLMigrateV1ToV2(export, version string) error {
|
||||||
formatPath := pathJoin(export, minioMetaBucket, formatConfigFile)
|
|
||||||
version, err := formatXLGetVersion(formatPath)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if version != formatXLVersionV1 {
|
if version != formatXLVersionV1 {
|
||||||
return fmt.Errorf(`Disk %s: format version expected %s, found %s`, export, formatXLVersionV1, version)
|
return fmt.Errorf(`Disk %s: format version expected %s, found %s`, export, formatXLVersionV1, version)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
formatPath := pathJoin(export, minioMetaBucket, formatConfigFile)
|
||||||
|
|
||||||
formatV1 := &formatXLV1{}
|
formatV1 := &formatXLV1{}
|
||||||
b, err := ioutil.ReadFile(formatPath)
|
b, err := ioutil.ReadFile(formatPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -260,15 +247,12 @@ func formatXLMigrateV1ToV2(export string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Migrates V2 for format.json to V3 (Flat hierarchy for multipart)
|
// Migrates V2 for format.json to V3 (Flat hierarchy for multipart)
|
||||||
func formatXLMigrateV2ToV3(export string) error {
|
func formatXLMigrateV2ToV3(export, version string) error {
|
||||||
formatPath := pathJoin(export, minioMetaBucket, formatConfigFile)
|
|
||||||
version, err := formatXLGetVersion(formatPath)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if version != formatXLVersionV2 {
|
if version != formatXLVersionV2 {
|
||||||
return fmt.Errorf(`Disk %s: format version expected %s, found %s`, export, formatXLVersionV2, version)
|
return fmt.Errorf(`Disk %s: format version expected %s, found %s`, export, formatXLVersionV2, version)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
formatPath := pathJoin(export, minioMetaBucket, formatConfigFile)
|
||||||
formatV2 := &formatXLV2{}
|
formatV2 := &formatXLV2{}
|
||||||
b, err := ioutil.ReadFile(formatPath)
|
b, err := ioutil.ReadFile(formatPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -360,18 +344,18 @@ func saveFormatXL(disk StorageAPI, format interface{}) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpFormatJSON := mustGetUUID() + ".json"
|
tmpFormat := mustGetUUID()
|
||||||
|
|
||||||
// Purge any existing temporary file, okay to ignore errors here.
|
// Purge any existing temporary file, okay to ignore errors here.
|
||||||
defer disk.DeleteFile(minioMetaBucket, tmpFormatJSON)
|
defer disk.DeleteFile(minioMetaBucket, tmpFormat)
|
||||||
|
|
||||||
// Append file `format.json.tmp`.
|
// write to unique file.
|
||||||
if err = disk.WriteAll(minioMetaBucket, tmpFormatJSON, bytes.NewReader(formatBytes)); err != nil {
|
if err = disk.WriteAll(minioMetaBucket, tmpFormat, bytes.NewReader(formatBytes)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rename file `uuid.json` --> `format.json`.
|
// Rename file `uuid.json` --> `format.json`.
|
||||||
return disk.RenameFile(minioMetaBucket, tmpFormatJSON, minioMetaBucket, formatConfigFile)
|
return disk.RenameFile(minioMetaBucket, tmpFormat, minioMetaBucket, formatConfigFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
var ignoredHiddenDirectories = []string{
|
var ignoredHiddenDirectories = []string{
|
||||||
@ -635,6 +619,39 @@ func formatXLV3Check(reference *formatXLV3, format *formatXLV3) error {
|
|||||||
return fmt.Errorf("Disk ID %s not found in any disk sets %s", this, format.XL.Sets)
|
return fmt.Errorf("Disk ID %s not found in any disk sets %s", this, format.XL.Sets)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initializes meta volume on all input storage disks.
|
||||||
|
func initFormatXLMetaVolume(storageDisks []StorageAPI, formats []*formatXLV3) error {
|
||||||
|
// This happens for the first time, but keep this here since this
|
||||||
|
// is the only place where it can be made expensive optimizing all
|
||||||
|
// other calls. Create minio meta volume, if it doesn't exist yet.
|
||||||
|
|
||||||
|
// Initialize errs to collect errors inside go-routine.
|
||||||
|
g := errgroup.WithNErrs(len(storageDisks))
|
||||||
|
|
||||||
|
// Initialize all disks in parallel.
|
||||||
|
for index := range storageDisks {
|
||||||
|
index := index
|
||||||
|
g.Go(func() error {
|
||||||
|
if formats[index] == nil || storageDisks[index] == nil {
|
||||||
|
// Ignore create meta volume on disks which are not found.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return makeFormatXLMetaVolumes(storageDisks[index])
|
||||||
|
}, index)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return upon first error.
|
||||||
|
for _, err := range g.Wait() {
|
||||||
|
if err == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return toObjectErr(err, minioMetaBucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return success here.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// saveFormatXLAll - populates `format.json` on disks in its order.
|
// saveFormatXLAll - populates `format.json` on disks in its order.
|
||||||
func saveFormatXLAll(ctx context.Context, storageDisks []StorageAPI, formats []*formatXLV3) error {
|
func saveFormatXLAll(ctx context.Context, storageDisks []StorageAPI, formats []*formatXLV3) error {
|
||||||
g := errgroup.WithNErrs(len(storageDisks))
|
g := errgroup.WithNErrs(len(storageDisks))
|
||||||
@ -646,6 +663,9 @@ func saveFormatXLAll(ctx context.Context, storageDisks []StorageAPI, formats []*
|
|||||||
if formats[index] == nil || storageDisks[index] == nil {
|
if formats[index] == nil || storageDisks[index] == nil {
|
||||||
return errDiskNotFound
|
return errDiskNotFound
|
||||||
}
|
}
|
||||||
|
if err := makeFormatXLMetaVolumes(storageDisks[index]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return saveFormatXL(storageDisks[index], formats[index])
|
return saveFormatXL(storageDisks[index], formats[index])
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
@ -745,11 +765,6 @@ func initFormatXL(ctx context.Context, storageDisks []StorageAPI, setCount, driv
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize meta volume, if volume already exists ignores it.
|
|
||||||
if err := initFormatXLMetaVolume(storageDisks, formats); err != nil {
|
|
||||||
return format, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save formats `format.json` across all disks.
|
// Save formats `format.json` across all disks.
|
||||||
if err := saveFormatXLAll(ctx, storageDisks, formats); err != nil {
|
if err := saveFormatXLAll(ctx, storageDisks, formats); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -760,23 +775,9 @@ func initFormatXL(ctx context.Context, storageDisks []StorageAPI, setCount, driv
|
|||||||
|
|
||||||
// Make XL backend meta volumes.
|
// Make XL backend meta volumes.
|
||||||
func makeFormatXLMetaVolumes(disk StorageAPI) error {
|
func makeFormatXLMetaVolumes(disk StorageAPI) error {
|
||||||
// Attempt to create `.minio.sys`.
|
// Attempt to create MinIO internal buckets.
|
||||||
if err := disk.MakeVol(minioMetaBucket); err != nil {
|
err := disk.MakeVolBulk(minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, minioMetaBackgroundOpsBucket)
|
||||||
if !IsErrIgnored(err, initMetaVolIgnoredErrs...) {
|
if err != nil {
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := disk.MakeVol(minioMetaTmpBucket); err != nil {
|
|
||||||
if !IsErrIgnored(err, initMetaVolIgnoredErrs...) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := disk.MakeVol(minioMetaBackgroundOpsBucket); err != nil {
|
|
||||||
if !IsErrIgnored(err, initMetaVolIgnoredErrs...) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := disk.MakeVol(minioMetaMultipartBucket); err != nil {
|
|
||||||
if !IsErrIgnored(err, initMetaVolIgnoredErrs...) {
|
if !IsErrIgnored(err, initMetaVolIgnoredErrs...) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -786,39 +787,6 @@ func makeFormatXLMetaVolumes(disk StorageAPI) error {
|
|||||||
|
|
||||||
var initMetaVolIgnoredErrs = append(baseIgnoredErrs, errVolumeExists)
|
var initMetaVolIgnoredErrs = append(baseIgnoredErrs, errVolumeExists)
|
||||||
|
|
||||||
// Initializes meta volume on all input storage disks.
|
|
||||||
func initFormatXLMetaVolume(storageDisks []StorageAPI, formats []*formatXLV3) error {
|
|
||||||
// This happens for the first time, but keep this here since this
|
|
||||||
// is the only place where it can be made expensive optimizing all
|
|
||||||
// other calls. Create minio meta volume, if it doesn't exist yet.
|
|
||||||
|
|
||||||
// Initialize errs to collect errors inside go-routine.
|
|
||||||
g := errgroup.WithNErrs(len(storageDisks))
|
|
||||||
|
|
||||||
// Initialize all disks in parallel.
|
|
||||||
for index := range storageDisks {
|
|
||||||
index := index
|
|
||||||
g.Go(func() error {
|
|
||||||
if formats[index] == nil || storageDisks[index] == nil {
|
|
||||||
// Ignore create meta volume on disks which are not found.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return makeFormatXLMetaVolumes(storageDisks[index])
|
|
||||||
}, index)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return upon first error.
|
|
||||||
for _, err := range g.Wait() {
|
|
||||||
if err == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return toObjectErr(err, minioMetaBucket)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return success here.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get all UUIDs which are present in reference format should
|
// Get all UUIDs which are present in reference format should
|
||||||
// be present in the list of formats provided, those are considered
|
// be present in the list of formats provided, those are considered
|
||||||
// as online UUIDs.
|
// as online UUIDs.
|
||||||
|
@ -149,95 +149,6 @@ func TestFormatXLEmpty(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests format xl get version.
|
|
||||||
func TestFormatXLGetVersion(t *testing.T) {
|
|
||||||
// Get test root.
|
|
||||||
rootPath, err := getTestRoot()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer os.RemoveAll(rootPath)
|
|
||||||
|
|
||||||
v := &formatXLVersionDetect{}
|
|
||||||
v.XL.Version = "1"
|
|
||||||
b, err := json.Marshal(v)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if err = ioutil.WriteFile(pathJoin(rootPath, formatConfigFile), b, os.FileMode(0644)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = formatXLGetVersion("not-found")
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("Expected to fail but found success")
|
|
||||||
}
|
|
||||||
|
|
||||||
vstr, err := formatXLGetVersion(pathJoin(rootPath, formatConfigFile))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if vstr != "1" {
|
|
||||||
t.Fatalf("Expected version '1', got '%s'", vstr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tests format get backend format.
|
|
||||||
func TestFormatMetaGetFormatBackendXL(t *testing.T) {
|
|
||||||
// Get test root.
|
|
||||||
rootPath, err := getTestRoot()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer os.RemoveAll(rootPath)
|
|
||||||
|
|
||||||
m := &formatMetaV1{
|
|
||||||
Format: "fs",
|
|
||||||
Version: formatMetaVersionV1,
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := json.Marshal(m)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = ioutil.WriteFile(pathJoin(rootPath, formatConfigFile), b, os.FileMode(0644)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = formatMetaGetFormatBackendXL("not-found")
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("Expected to fail but found success")
|
|
||||||
}
|
|
||||||
|
|
||||||
format, err := formatMetaGetFormatBackendXL(pathJoin(rootPath, formatConfigFile))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if format != m.Format {
|
|
||||||
t.Fatalf("Expected format value %s, got %s", m.Format, format)
|
|
||||||
}
|
|
||||||
|
|
||||||
m = &formatMetaV1{
|
|
||||||
Format: "xl",
|
|
||||||
Version: "2",
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err = json.Marshal(m)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = ioutil.WriteFile(pathJoin(rootPath, formatConfigFile), b, os.FileMode(0644)); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = formatMetaGetFormatBackendXL(pathJoin(rootPath, formatConfigFile))
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("Expected to fail with incompatible meta version")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tests xl format migration.
|
// Tests xl format migration.
|
||||||
func TestFormatXLMigrate(t *testing.T) {
|
func TestFormatXLMigrate(t *testing.T) {
|
||||||
// Get test root.
|
// Get test root.
|
||||||
@ -271,10 +182,11 @@ func TestFormatXLMigrate(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
migratedVersion, err := formatXLGetVersion(pathJoin(rootPath, minioMetaBucket, formatConfigFile))
|
migratedVersion, err := formatGetBackendXLVersion(pathJoin(rootPath, minioMetaBucket, formatConfigFile))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if migratedVersion != formatXLVersionV3 {
|
if migratedVersion != formatXLVersionV3 {
|
||||||
t.Fatalf("expected version: %s, got: %s", formatXLVersionV3, migratedVersion)
|
t.Fatalf("expected version: %s, got: %s", formatXLVersionV3, migratedVersion)
|
||||||
}
|
}
|
||||||
|
@ -91,6 +91,13 @@ func (d *naughtyDisk) DiskInfo() (info DiskInfo, err error) {
|
|||||||
return d.disk.DiskInfo()
|
return d.disk.DiskInfo()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *naughtyDisk) MakeVolBulk(volumes ...string) (err error) {
|
||||||
|
if err := d.calcError(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return d.disk.MakeVolBulk(volumes...)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *naughtyDisk) MakeVol(volume string) (err error) {
|
func (d *naughtyDisk) MakeVol(volume string) (err error) {
|
||||||
if err := d.calcError(); err != nil {
|
if err := d.calcError(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -74,6 +74,13 @@ func (p *posixDiskIDCheck) DiskInfo() (info DiskInfo, err error) {
|
|||||||
return p.storage.DiskInfo()
|
return p.storage.DiskInfo()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *posixDiskIDCheck) MakeVolBulk(volumes ...string) (err error) {
|
||||||
|
if p.isDiskStale() {
|
||||||
|
return errDiskNotFound
|
||||||
|
}
|
||||||
|
return p.storage.MakeVolBulk(volumes...)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *posixDiskIDCheck) MakeVol(volume string) (err error) {
|
func (p *posixDiskIDCheck) MakeVol(volume string) (err error) {
|
||||||
if p.isDiskStale() {
|
if p.isDiskStale() {
|
||||||
return errDiskNotFound
|
return errDiskNotFound
|
||||||
|
@ -551,6 +551,15 @@ func (s *posix) SetDiskID(id string) {
|
|||||||
// storage rest server for remote disks.
|
// storage rest server for remote disks.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *posix) MakeVolBulk(volumes ...string) (err error) {
|
||||||
|
for _, volume := range volumes {
|
||||||
|
if err = s.MakeVol(volume); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Make a volume entry.
|
// Make a volume entry.
|
||||||
func (s *posix) MakeVol(volume string) (err error) {
|
func (s *posix) MakeVol(volume string) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -223,13 +223,14 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
|
|||||||
// Assign globalDeploymentID on first run for the
|
// Assign globalDeploymentID on first run for the
|
||||||
// minio server managing the first disk
|
// minio server managing the first disk
|
||||||
globalDeploymentID = format.ID
|
globalDeploymentID = format.ID
|
||||||
} else {
|
return format, nil
|
||||||
|
}
|
||||||
|
|
||||||
// The first will always recreate some directories inside .minio.sys
|
// The first will always recreate some directories inside .minio.sys
|
||||||
// such as, tmp, multipart and background-ops
|
// such as, tmp, multipart and background-ops
|
||||||
if firstDisk {
|
if firstDisk {
|
||||||
initFormatXLMetaVolume(storageDisks, formatConfigs)
|
initFormatXLMetaVolume(storageDisks, formatConfigs)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Return error when quorum unformatted disks - indicating we are
|
// Return error when quorum unformatted disks - indicating we are
|
||||||
// waiting for first server to be online.
|
// waiting for first server to be online.
|
||||||
@ -288,6 +289,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
|
|||||||
if err = formatXLFixLocalDeploymentID(endpoints, storageDisks, format); err != nil {
|
if err = formatXLFixLocalDeploymentID(endpoints, storageDisks, format); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return format, nil
|
return format, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +36,7 @@ type StorageAPI interface {
|
|||||||
|
|
||||||
// Volume operations.
|
// Volume operations.
|
||||||
MakeVol(volume string) (err error)
|
MakeVol(volume string) (err error)
|
||||||
|
MakeVolBulk(volumes ...string) (err error)
|
||||||
ListVols() (vols []VolInfo, err error)
|
ListVols() (vols []VolInfo, err error)
|
||||||
StatVol(volume string) (vol VolInfo, err error)
|
StatVol(volume string) (vol VolInfo, err error)
|
||||||
DeleteVol(volume string) (err error)
|
DeleteVol(volume string) (err error)
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/minio/minio/cmd/http"
|
"github.com/minio/minio/cmd/http"
|
||||||
@ -189,6 +190,15 @@ func (client *storageRESTClient) DiskInfo() (info DiskInfo, err error) {
|
|||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MakeVolBulk - create multiple volumes in a bulk operation.
|
||||||
|
func (client *storageRESTClient) MakeVolBulk(volumes ...string) (err error) {
|
||||||
|
values := make(url.Values)
|
||||||
|
values.Set(storageRESTVolumes, strings.Join(volumes, ","))
|
||||||
|
respBody, err := client.call(storageRESTMethodMakeVolBulk, values, nil, -1)
|
||||||
|
defer http.DrainBody(respBody)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// MakeVol - create a volume on a remote disk.
|
// MakeVol - create a volume on a remote disk.
|
||||||
func (client *storageRESTClient) MakeVol(volume string) (err error) {
|
func (client *storageRESTClient) MakeVol(volume string) (err error) {
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
|
@ -26,6 +26,7 @@ const (
|
|||||||
storageRESTMethodDiskInfo = "/diskinfo"
|
storageRESTMethodDiskInfo = "/diskinfo"
|
||||||
storageRESTMethodCrawlAndGetDataUsage = "/crawlandgetdatausage"
|
storageRESTMethodCrawlAndGetDataUsage = "/crawlandgetdatausage"
|
||||||
storageRESTMethodMakeVol = "/makevol"
|
storageRESTMethodMakeVol = "/makevol"
|
||||||
|
storageRESTMethodMakeVolBulk = "/makevolbulk"
|
||||||
storageRESTMethodStatVol = "/statvol"
|
storageRESTMethodStatVol = "/statvol"
|
||||||
storageRESTMethodDeleteVol = "/deletevol"
|
storageRESTMethodDeleteVol = "/deletevol"
|
||||||
storageRESTMethodListVols = "/listvols"
|
storageRESTMethodListVols = "/listvols"
|
||||||
@ -47,6 +48,7 @@ const (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
storageRESTVolume = "volume"
|
storageRESTVolume = "volume"
|
||||||
|
storageRESTVolumes = "volumes"
|
||||||
storageRESTDirPath = "dir-path"
|
storageRESTDirPath = "dir-path"
|
||||||
storageRESTFilePath = "file-path"
|
storageRESTFilePath = "file-path"
|
||||||
storageRESTSrcVolume = "source-volume"
|
storageRESTSrcVolume = "source-volume"
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
@ -150,6 +151,19 @@ func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Reques
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MakeVolBulkHandler - create multiple volumes as a bulk operation.
|
||||||
|
func (s *storageRESTServer) MakeVolBulkHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !s.IsValid(w, r) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
volumes := strings.Split(vars[storageRESTVolumes], ",")
|
||||||
|
err := s.storage.MakeVolBulk(volumes...)
|
||||||
|
if err != nil {
|
||||||
|
s.writeErrorResponse(w, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ListVolsHandler - list volumes.
|
// ListVolsHandler - list volumes.
|
||||||
func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Request) {
|
func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
if !s.IsValid(w, r) {
|
if !s.IsValid(w, r) {
|
||||||
@ -605,6 +619,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
|
|||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDiskInfo).HandlerFunc(httpTraceHdrs(server.DiskInfoHandler))
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDiskInfo).HandlerFunc(httpTraceHdrs(server.DiskInfoHandler))
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCrawlAndGetDataUsage).HandlerFunc(httpTraceHdrs(server.CrawlAndGetDataUsageHandler))
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCrawlAndGetDataUsage).HandlerFunc(httpTraceHdrs(server.CrawlAndGetDataUsageHandler))
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodMakeVol).HandlerFunc(httpTraceHdrs(server.MakeVolHandler)).Queries(restQueries(storageRESTVolume)...)
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodMakeVol).HandlerFunc(httpTraceHdrs(server.MakeVolHandler)).Queries(restQueries(storageRESTVolume)...)
|
||||||
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodMakeVolBulk).HandlerFunc(httpTraceHdrs(server.MakeVolBulkHandler)).Queries(restQueries(storageRESTVolumes)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatVol).HandlerFunc(httpTraceHdrs(server.StatVolHandler)).Queries(restQueries(storageRESTVolume)...)
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatVol).HandlerFunc(httpTraceHdrs(server.StatVolHandler)).Queries(restQueries(storageRESTVolume)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVol).HandlerFunc(httpTraceHdrs(server.DeleteVolHandler)).Queries(restQueries(storageRESTVolume)...)
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVol).HandlerFunc(httpTraceHdrs(server.DeleteVolHandler)).Queries(restQueries(storageRESTVolume)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListVols).HandlerFunc(httpTraceHdrs(server.ListVolsHandler))
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListVols).HandlerFunc(httpTraceHdrs(server.ListVolsHandler))
|
||||||
|
@ -1491,12 +1491,6 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize meta volume, if volume already exists ignores it, all disks which
|
|
||||||
// are not found are ignored as well.
|
|
||||||
if err = initFormatXLMetaVolume(storageDisks, tmpNewFormats); err != nil {
|
|
||||||
return madmin.HealResultItem{}, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save formats `format.json` across all disks.
|
// Save formats `format.json` across all disks.
|
||||||
if err = saveFormatXLAll(ctx, storageDisks, tmpNewFormats); err != nil {
|
if err = saveFormatXLAll(ctx, storageDisks, tmpNewFormats); err != nil {
|
||||||
return madmin.HealResultItem{}, err
|
return madmin.HealResultItem{}, err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user