return quorum error instead of insufficient storage error (#16874)

This commit is contained in:
Anis Eleuch 2023-03-23 00:22:37 +01:00 committed by GitHub
parent 4bc52897b2
commit 1346561b9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 14 deletions

View File

@ -346,11 +346,17 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
g.Wait() g.Wait()
for i, zinfo := range storageInfos { for i, zinfo := range storageInfos {
var available uint64 if zinfo == nil {
if !isMinioMetaBucketName(bucket) && !hasSpaceFor(zinfo, size) {
serverPools[i] = poolAvailableSpace{Index: i} serverPools[i] = poolAvailableSpace{Index: i}
continue continue
} }
var available uint64
if !isMinioMetaBucketName(bucket) {
if avail, err := hasSpaceFor(zinfo, size); err != nil && !avail {
serverPools[i] = poolAvailableSpace{Index: i}
continue
}
}
var maxUsedPct int var maxUsedPct int
for _, disk := range zinfo { for _, disk := range zinfo {
if disk == nil || disk.Total == 0 { if disk == nil || disk.Total == 0 {
@ -934,8 +940,15 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
object = encodeDirObject(object) object = encodeDirObject(object)
if z.SinglePool() { if z.SinglePool() {
if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), data.Size()) { if !isMinioMetaBucketName(bucket) {
return ObjectInfo{}, toObjectErr(errDiskFull) avail, err := hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), data.Size())
if err != nil {
logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(errErasureWriteQuorum)
}
if !avail {
return ObjectInfo{}, toObjectErr(errDiskFull)
}
} }
return z.serverPools[0].PutObject(ctx, bucket, object, data, opts) return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
} }
@ -1399,8 +1412,15 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
} }
if z.SinglePool() { if z.SinglePool() {
if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), -1) { if !isMinioMetaBucketName(bucket) {
return nil, toObjectErr(errDiskFull) avail, err := hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), -1)
if err != nil {
logger.LogIf(ctx, err)
return nil, toObjectErr(errErasureWriteQuorum)
}
if !avail {
return nil, toObjectErr(errDiskFull)
}
} }
return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts) return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
} }

View File

@ -1061,7 +1061,7 @@ func getDiskInfos(ctx context.Context, disks ...StorageAPI) []*DiskInfo {
} }
// hasSpaceFor returns whether the disks in `di` have space for and object of a given size. // hasSpaceFor returns whether the disks in `di` have space for and object of a given size.
func hasSpaceFor(di []*DiskInfo, size int64) bool { func hasSpaceFor(di []*DiskInfo, size int64) (bool, error) {
// We multiply the size by 2 to account for erasure coding. // We multiply the size by 2 to account for erasure coding.
size *= 2 size *= 2
if size < 0 { if size < 0 {
@ -1082,8 +1082,8 @@ func hasSpaceFor(di []*DiskInfo, size int64) bool {
available += disk.Total - disk.Used available += disk.Total - disk.Used
} }
if nDisks == 0 { if nDisks < len(di)/2 {
return false return false, fmt.Errorf("not enough online disks to calculate the available space, expected (%d)/(%d)", (len(di)/2)+1, nDisks)
} }
// Check we have enough on each disk, ignoring diskFillFraction. // Check we have enough on each disk, ignoring diskFillFraction.
@ -1093,13 +1093,13 @@ func hasSpaceFor(di []*DiskInfo, size int64) bool {
continue continue
} }
if int64(disk.Free) <= perDisk { if int64(disk.Free) <= perDisk {
return false return false, nil
} }
} }
// Make sure we can fit "size" on to the disk without getting above the diskFillFraction // Make sure we can fit "size" on to the disk without getting above the diskFillFraction
if available < uint64(size) { if available < uint64(size) {
return false return false, nil
} }
// How much will be left after adding the file. // How much will be left after adding the file.
@ -1107,5 +1107,5 @@ func hasSpaceFor(di []*DiskInfo, size int64) bool {
// wantLeft is how much space there at least must be left. // wantLeft is how much space there at least must be left.
wantLeft := uint64(float64(total) * (1.0 - diskFillFraction)) wantLeft := uint64(float64(total) * (1.0 - diskFillFraction))
return available > wantLeft return available > wantLeft, nil
} }

View File

@ -37,6 +37,6 @@ func DisableDirectIO(f *os.File) error {
} }
// AlignedBlock - pass through to directio implementation. // AlignedBlock - pass through to directio implementation.
func AlignedBlock(BlockSize int) []byte { func AlignedBlock(blockSize int) []byte {
return directio.AlignedBlock(BlockSize) return directio.AlignedBlock(blockSize)
} }