mirror of
https://github.com/minio/minio.git
synced 2025-01-12 15:33:22 -05:00
Add number of offline disks in quorum errors (#16822)
This commit is contained in:
parent
443250d135
commit
54c5c88fe6
@ -226,12 +226,10 @@ func toAdminAPIErr(ctx context.Context, err error) APIError {
|
|||||||
// toAdminAPIErrCode - converts errErasureWriteQuorum error to admin API
|
// toAdminAPIErrCode - converts errErasureWriteQuorum error to admin API
|
||||||
// specific error.
|
// specific error.
|
||||||
func toAdminAPIErrCode(ctx context.Context, err error) APIErrorCode {
|
func toAdminAPIErrCode(ctx context.Context, err error) APIErrorCode {
|
||||||
switch err {
|
if errors.Is(err, errErasureWriteQuorum) {
|
||||||
case errErasureWriteQuorum:
|
|
||||||
return ErrAdminConfigNoQuorum
|
return ErrAdminConfigNoQuorum
|
||||||
default:
|
|
||||||
return toAPIErrorCode(ctx, err)
|
|
||||||
}
|
}
|
||||||
|
return toAPIErrorCode(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// wraps export error for more context
|
// wraps export error for more context
|
||||||
|
@ -2026,6 +2026,9 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
|
|||||||
return ErrClientDisconnected
|
return ErrClientDisconnected
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unwrap the error first
|
||||||
|
err = unwrapAll(err)
|
||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
case errInvalidArgument:
|
case errInvalidArgument:
|
||||||
apiErr = ErrAdminInvalidArgument
|
apiErr = ErrAdminInvalidArgument
|
||||||
|
@ -20,8 +20,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/crypto"
|
"github.com/minio/minio/internal/crypto"
|
||||||
@ -67,11 +65,6 @@ var toAPIErrorTests = []struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestAPIErrCode(t *testing.T) {
|
func TestAPIErrCode(t *testing.T) {
|
||||||
disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
|
|
||||||
defer os.RemoveAll(disk)
|
|
||||||
|
|
||||||
initFSObjects(disk, t)
|
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
for i, testCase := range toAPIErrorTests {
|
for i, testCase := range toAPIErrorTests {
|
||||||
errCode := toAPIErrorCode(ctx, testCase.err)
|
errCode := toAPIErrorCode(ctx, testCase.err)
|
||||||
|
@ -20,6 +20,7 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -124,6 +125,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
|||||||
readTriggerCh <- true
|
readTriggerCh <- true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
disksNotFound := int32(0)
|
||||||
bitrotHeal := int32(0) // Atomic bool flag.
|
bitrotHeal := int32(0) // Atomic bool flag.
|
||||||
missingPartsHeal := int32(0) // Atomic bool flag.
|
missingPartsHeal := int32(0) // Atomic bool flag.
|
||||||
readerIndex := 0
|
readerIndex := 0
|
||||||
@ -164,10 +166,13 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
|||||||
p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize]
|
p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize]
|
||||||
n, err := rr.ReadAt(p.buf[bufIdx], p.offset)
|
n, err := rr.ReadAt(p.buf[bufIdx], p.offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, errFileNotFound) {
|
switch {
|
||||||
|
case errors.Is(err, errFileNotFound):
|
||||||
atomic.StoreInt32(&missingPartsHeal, 1)
|
atomic.StoreInt32(&missingPartsHeal, 1)
|
||||||
} else if errors.Is(err, errFileCorrupt) {
|
case errors.Is(err, errFileCorrupt):
|
||||||
atomic.StoreInt32(&bitrotHeal, 1)
|
atomic.StoreInt32(&bitrotHeal, 1)
|
||||||
|
case errors.Is(err, errDiskNotFound):
|
||||||
|
atomic.AddInt32(&disksNotFound, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will be communicated upstream.
|
// This will be communicated upstream.
|
||||||
@ -189,16 +194,16 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
if p.canDecode(newBuf) {
|
if p.canDecode(newBuf) {
|
||||||
p.offset += p.shardSize
|
p.offset += p.shardSize
|
||||||
if atomic.LoadInt32(&missingPartsHeal) == 1 {
|
if missingPartsHeal == 1 {
|
||||||
return newBuf, errFileNotFound
|
return newBuf, errFileNotFound
|
||||||
} else if atomic.LoadInt32(&bitrotHeal) == 1 {
|
} else if bitrotHeal == 1 {
|
||||||
return newBuf, errFileCorrupt
|
return newBuf, errFileCorrupt
|
||||||
}
|
}
|
||||||
return newBuf, nil
|
return newBuf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we cannot decode, just return read quorum error.
|
// If we cannot decode, just return read quorum error.
|
||||||
return nil, errErasureReadQuorum
|
return nil, fmt.Errorf("%w (offline-disks=%d/%d)", errErasureReadQuorum, disksNotFound, len(p.readers))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode reads from readers, reconstructs data if needed and writes the data to the writer.
|
// Decode reads from readers, reconstructs data if needed and writes the data to the writer.
|
||||||
|
@ -19,6 +19,7 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@ -70,7 +71,9 @@ func (p *parallelWriter) Write(ctx context.Context, blocks [][]byte) error {
|
|||||||
if nilCount >= p.writeQuorum {
|
if nilCount >= p.writeQuorum {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return reduceWriteQuorumErrs(ctx, p.errs, objectOpIgnoredErrs, p.writeQuorum)
|
|
||||||
|
writeErr := reduceWriteQuorumErrs(ctx, p.errs, objectOpIgnoredErrs, p.writeQuorum)
|
||||||
|
return fmt.Errorf("%w (offline-disks=%d/%d)", writeErr, countErrs(p.errs, errDiskNotFound), len(p.writers))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode reads from the reader, erasure-encodes the data and writes to the writers.
|
// Encode reads from the reader, erasure-encodes the data and writes to the writers.
|
||||||
|
@ -88,7 +88,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
|
|||||||
|
|
||||||
if write {
|
if write {
|
||||||
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||||
if reducedErr == errErasureWriteQuorum {
|
if errors.Is(reducedErr, errErasureWriteQuorum) {
|
||||||
return fi, nil, reducedErr
|
return fi, nil, reducedErr
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -563,7 +563,7 @@ func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry
|
|||||||
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
|
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
|
||||||
// otherwise return failure. Cleanup successful renames.
|
// otherwise return failure. Cleanup successful renames.
|
||||||
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||||
if err == errErasureWriteQuorum {
|
if errors.Is(err, errErasureWriteQuorum) {
|
||||||
// Remove all written
|
// Remove all written
|
||||||
g := errgroup.WithNErrs(len(disks))
|
g := errgroup.WithNErrs(len(disks))
|
||||||
for index := range disks {
|
for index := range disks {
|
||||||
|
@ -31,9 +31,14 @@ func toObjectErr(err error, params ...string) error {
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if errors.Is(err, context.Canceled) {
|
|
||||||
|
// Unwarp the error first
|
||||||
|
err = unwrapAll(err)
|
||||||
|
|
||||||
|
if err == context.Canceled {
|
||||||
return context.Canceled
|
return context.Canceled
|
||||||
}
|
}
|
||||||
|
|
||||||
switch err.Error() {
|
switch err.Error() {
|
||||||
case errVolumeNotFound.Error():
|
case errVolumeNotFound.Error():
|
||||||
apiErr := BucketNotFound{}
|
apiErr := BucketNotFound{}
|
||||||
|
@ -224,19 +224,6 @@ func prepareErasure16(ctx context.Context) (ObjectLayer, []string, error) {
|
|||||||
return prepareErasure(ctx, 16)
|
return prepareErasure(ctx, 16)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize FS objects.
|
|
||||||
func initFSObjects(disk string, t *testing.T) (obj ObjectLayer) {
|
|
||||||
obj, _, err := initObjectLayer(context.Background(), mustGetPoolEndpoints(disk))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
newTestConfig(globalMinioDefaultRegion, obj)
|
|
||||||
|
|
||||||
initAllSubsystems(GlobalContext)
|
|
||||||
return obj
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestErrHandler - Go testing.T satisfy this interface.
|
// TestErrHandler - Go testing.T satisfy this interface.
|
||||||
// This makes it easy to run the TestServer from any of the tests.
|
// This makes it easy to run the TestServer from any of the tests.
|
||||||
// Using this interface, functionalities to be used in tests can be
|
// Using this interface, functionalities to be used in tests can be
|
||||||
|
11
cmd/utils.go
11
cmd/utils.go
@ -1261,3 +1261,14 @@ func MockOpenIDTestUserInteraction(ctx context.Context, pro OpenIDClientAppParam
|
|||||||
// fmt.Printf("TOKEN: %s\n", rawIDToken)
|
// fmt.Printf("TOKEN: %s\n", rawIDToken)
|
||||||
return rawIDToken, nil
|
return rawIDToken, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// unwrapAll will unwrap the returned error completely.
|
||||||
|
func unwrapAll(err error) error {
|
||||||
|
for {
|
||||||
|
werr := errors.Unwrap(err)
|
||||||
|
if werr == nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = werr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package crypto
|
package crypto
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -25,24 +26,29 @@ import (
|
|||||||
// an object. It indicates that the object itself or its metadata was
|
// an object. It indicates that the object itself or its metadata was
|
||||||
// modified accidentally or maliciously.
|
// modified accidentally or maliciously.
|
||||||
type Error struct {
|
type Error struct {
|
||||||
err error
|
msg string
|
||||||
|
cause error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Errorf - formats according to a format specifier and returns
|
// Errorf - formats according to a format specifier and returns
|
||||||
// the string as a value that satisfies error of type crypto.Error
|
// the string as a value that satisfies error of type crypto.Error
|
||||||
func Errorf(format string, a ...interface{}) error {
|
func Errorf(format string, a ...interface{}) error {
|
||||||
return Error{err: fmt.Errorf(format, a...)}
|
e := fmt.Errorf(format, a...)
|
||||||
|
ee := Error{}
|
||||||
|
ee.msg = e.Error()
|
||||||
|
ee.cause = errors.Unwrap(e)
|
||||||
|
return ee
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unwrap the internal error.
|
// Unwrap the internal error.
|
||||||
func (e Error) Unwrap() error { return e.err }
|
func (e Error) Unwrap() error { return e.cause }
|
||||||
|
|
||||||
// Error 'error' compatible method.
|
// Error 'error' compatible method.
|
||||||
func (e Error) Error() string {
|
func (e Error) Error() string {
|
||||||
if e.err == nil {
|
if e.msg == "" {
|
||||||
return "crypto: cause <nil>"
|
return "crypto: cause <nil>"
|
||||||
}
|
}
|
||||||
return e.err.Error()
|
return e.msg
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
Loading…
Reference in New Issue
Block a user