mirror of
https://github.com/minio/minio.git
synced 2024-12-23 21:55:53 -05:00
Return 503 instead of 404 if more than half of disks are not found (#6207)
Fixes #6163
This commit is contained in:
parent
df88421087
commit
264cc4020f
@ -547,7 +547,7 @@ func readConfig(ctx context.Context, objAPI ObjectLayer, configFile string) (*by
|
||||
err := objAPI.GetObject(ctx, minioMetaBucket, configFile, 0, -1, &buffer, "")
|
||||
if err != nil {
|
||||
// Ignore if err is ObjectNotFound or IncompleteBody when bucket is not configured with notification
|
||||
if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
|
||||
if isErrObjectNotFound(err) || isErrIncompleteBody(err) || isInsufficientReadQuorum(err) {
|
||||
return nil, errConfigNotFound
|
||||
}
|
||||
|
||||
|
@ -390,3 +390,9 @@ func isErrObjectNotFound(err error) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// isInsufficientReadQuorum - Check if error type is InsufficientReadQuorum.
|
||||
func isInsufficientReadQuorum(err error) bool {
|
||||
_, ok := err.(InsufficientReadQuorum)
|
||||
return ok
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
@ -196,18 +197,12 @@ func getRedundancyCount(sc string, totalDisks int) (data, parity int) {
|
||||
// Returns per object readQuorum and writeQuorum
|
||||
// readQuorum is the minimum required disks to read data.
|
||||
// writeQuorum is the minimum required disks to write data.
|
||||
func objectQuorumFromMeta(xl xlObjects, partsMetaData []xlMetaV1, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) {
|
||||
|
||||
func objectQuorumFromMeta(ctx context.Context, xl xlObjects, partsMetaData []xlMetaV1, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) {
|
||||
// get the latest updated Metadata and a count of all the latest updated xlMeta(s)
|
||||
latestXLMeta, count := getLatestXLMeta(partsMetaData, errs)
|
||||
latestXLMeta, err := getLatestXLMeta(ctx, partsMetaData, errs)
|
||||
|
||||
// latestXLMeta is updated most recently.
|
||||
// We implicitly assume that all the xlMeta(s) have same dataBlocks and parityBlocks.
|
||||
// We now check that at least dataBlocks number of xlMeta is available. This means count
|
||||
// should be greater than or equal to dataBlocks field of latestXLMeta. If not we throw read quorum error.
|
||||
if count < latestXLMeta.Erasure.DataBlocks {
|
||||
// This is the case when we can't reliably deduce object quorum
|
||||
return 0, 0, errXLReadQuorum
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
|
||||
|
@ -303,7 +303,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
||||
{parts7, errs7, 14, 15, nil},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(*xl, tt.parts, tt.errs)
|
||||
actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(context.Background(), *xl, tt.parts, tt.errs)
|
||||
if tt.expectedError != nil && err == nil {
|
||||
t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err)
|
||||
return
|
||||
|
@ -840,6 +840,7 @@ func (web *webAPIHandlers) GetBucketPolicy(r *http.Request, args *GetBucketPolic
|
||||
if _, ok := err.(BucketPolicyNotFound); !ok {
|
||||
return toJSONError(err, args.BucketName)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
policyInfo, err := PolicyToBucketAccessPolicy(bucketPolicy)
|
||||
|
@ -119,8 +119,14 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error)
|
||||
return onlineDisks, modTime
|
||||
}
|
||||
|
||||
// Returns one of the latest updated xlMeta files and count of total valid xlMeta(s) updated latest
|
||||
func getLatestXLMeta(partsMetadata []xlMetaV1, errs []error) (xlMetaV1, int) {
|
||||
// Returns the latest updated xlMeta files and error in case of failure.
|
||||
func getLatestXLMeta(ctx context.Context, partsMetadata []xlMetaV1, errs []error) (xlMetaV1, error) {
|
||||
|
||||
// There should be atleast half correct entries, if not return failure
|
||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, globalXLSetDriveCount/2); reducedErr != nil {
|
||||
return xlMetaV1{}, reducedErr
|
||||
}
|
||||
|
||||
// List all the file commit ids from parts metadata.
|
||||
modTimes := listObjectModtimes(partsMetadata, errs)
|
||||
|
||||
@ -138,8 +144,11 @@ func getLatestXLMeta(partsMetadata []xlMetaV1, errs []error) (xlMetaV1, int) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
// Return one of the latest xlMetaData, and the count of lastest updated xlMeta files
|
||||
return latestXLMeta, count
|
||||
if count < len(partsMetadata)/2 {
|
||||
return xlMetaV1{}, errXLReadQuorum
|
||||
}
|
||||
|
||||
return latestXLMeta, nil
|
||||
}
|
||||
|
||||
// disksWithAllParts - This function needs to be called with
|
||||
|
@ -611,7 +611,10 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
|
||||
// Read metadata files from all the disks
|
||||
partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
|
||||
|
||||
latestXLMeta, _ := getLatestXLMeta(partsMetadata, errs)
|
||||
latestXLMeta, err := getLatestXLMeta(ctx, partsMetadata, errs)
|
||||
if err != nil {
|
||||
return hr, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Lock the object before healing.
|
||||
objectLock := xl.nsMutex.NewNSLock(bucket, object)
|
||||
|
@ -145,6 +145,6 @@ func TestHealObjectXL(t *testing.T) {
|
||||
_, err = obj.HealObject(context.Background(), bucket, object, false)
|
||||
// since majority of xl.jsons are not available, object quorum can't be read properly and error will be errXLReadQuorum
|
||||
if _, ok := err.(InsufficientReadQuorum); !ok {
|
||||
t.Errorf("Expected %v but received %v", InsufficientWriteQuorum{}, err)
|
||||
t.Errorf("Expected %v but received %v", InsufficientReadQuorum{}, err)
|
||||
}
|
||||
}
|
||||
|
@ -330,7 +330,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
|
||||
uploadIDPath)
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)
|
||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, xl, partsMetadata, errs)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
@ -612,7 +612,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
|
||||
partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), minioMetaMultipartBucket, uploadIDPath)
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)
|
||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, xl, partsMetadata, errs)
|
||||
if err != nil {
|
||||
return oi, toObjectErr(err, bucket, object)
|
||||
}
|
||||
@ -829,7 +829,7 @@ func (xl xlObjects) AbortMultipartUpload(ctx context.Context, bucket, object, up
|
||||
partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), minioMetaMultipartBucket, uploadIDPath)
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)
|
||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, xl, partsMetadata, errs)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
@ -89,7 +89,7 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc
|
||||
metaArr, errs := readAllXLMetadata(ctx, storageDisks, srcBucket, srcObject)
|
||||
|
||||
// get Quorum for this object
|
||||
readQuorum, writeQuorum, err := objectQuorumFromMeta(xl, metaArr, errs)
|
||||
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, xl, metaArr, errs)
|
||||
if err != nil {
|
||||
return oi, toObjectErr(err, srcBucket, srcObject)
|
||||
}
|
||||
@ -208,7 +208,7 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO
|
||||
metaArr, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
|
||||
|
||||
// get Quorum for this object
|
||||
readQuorum, _, err := objectQuorumFromMeta(xl, metaArr, errs)
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, xl, metaArr, errs)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
@ -382,7 +382,7 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o
|
||||
metaArr, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
|
||||
|
||||
// get Quorum for this object
|
||||
readQuorum, _, err := objectQuorumFromMeta(xl, metaArr, errs)
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, xl, metaArr, errs)
|
||||
if err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
@ -771,7 +771,7 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string) err
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err = objectQuorumFromMeta(xl, metaArr, errs)
|
||||
_, writeQuorum, err = objectQuorumFromMeta(ctx, xl, metaArr, errs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user