diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 57d9fb9a2..f2f6d7bf6 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -174,8 +174,8 @@ func (p serverPoolsAvailableSpace) TotalAvailable() uint64 { // getAvailablePoolIdx will return an index that can hold size bytes. // -1 is returned if no serverPools have available space for the size given. -func (z *erasureServerPools) getAvailablePoolIdx(ctx context.Context, size int64) int { - serverPools := z.getServerPoolsAvailableSpace(ctx, size) +func (z *erasureServerPools) getAvailablePoolIdx(ctx context.Context, bucket, object string, size int64) int { + serverPools := z.getServerPoolsAvailableSpace(ctx, bucket, object, size) total := serverPools.TotalAvailable() if total == 0 { return -1 @@ -197,18 +197,16 @@ func (z *erasureServerPools) getAvailablePoolIdx(ctx context.Context, size int64 // getServerPoolsAvailableSpace will return the available space of each pool after storing the content. // If there is not enough space the pool will return 0 bytes available. // Negative sizes are seen as 0 bytes. -func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, size int64) serverPoolsAvailableSpace { - if size < 0 { - size = 0 - } +func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, bucket, object string, size int64) serverPoolsAvailableSpace { var serverPools = make(serverPoolsAvailableSpace, len(z.serverPools)) - storageInfos := make([]StorageInfo, len(z.serverPools)) + storageInfos := make([][]*DiskInfo, len(z.serverPools)) g := errgroup.WithNErrs(len(z.serverPools)) for index := range z.serverPools { index := index g.Go(func() error { - storageInfos[index] = z.serverPools[index].StorageUsageInfo(ctx) + // Get the set where it would be placed. + storageInfos[index] = getDiskInfos(ctx, z.serverPools[index].getHashedSet(object).getDisks()) return nil }, index) } @@ -218,24 +216,12 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, s for i, zinfo := range storageInfos { var available uint64 - var total uint64 - for _, disk := range zinfo.Disks { - total += disk.TotalSpace - available += disk.TotalSpace - disk.UsedSpace + if !isMinioMetaBucketName(bucket) && !hasSpaceFor(zinfo, size) { + serverPools[i] = poolAvailableSpace{Index: i} + continue } - // Make sure we can fit "size" on to the disk without getting above the diskFillFraction - if available < uint64(size) { - available = 0 - } - if available > 0 { - // How much will be left after adding the file. - available -= -uint64(size) - - // wantLeft is how much space there at least must be left. - wantLeft := uint64(float64(total) * (1.0 - diskFillFraction)) - if available <= wantLeft { - available = 0 - } + for _, disk := range zinfo { + available += disk.Total - disk.Used } serverPools[i] = poolAvailableSpace{ Index: i, @@ -298,8 +284,7 @@ func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object stri } if isErrObjectNotFound(err) { - // We multiply the size by 2 to account for erasure coding. - idx = z.getAvailablePoolIdx(ctx, size*2) + idx = z.getAvailablePoolIdx(ctx, bucket, object, size) if idx < 0 { return -1, toObjectErr(errDiskFull) } @@ -717,6 +702,9 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec object = encodeDirObject(object) if z.SinglePool() { + if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()), data.Size()) { + return ObjectInfo{}, toObjectErr(errDiskFull) + } return z.serverPools[0].PutObject(ctx, bucket, object, data, opts) } @@ -1012,6 +1000,9 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj } if z.SinglePool() { + if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()), -1) { + return "", toObjectErr(errDiskFull) + } return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts) } @@ -1036,7 +1027,7 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj } } - idx, err := z.getPoolIdx(ctx, bucket, object, 1<<30) + idx, err := z.getPoolIdx(ctx, bucket, object, -1) if err != nil { return "", err } diff --git a/cmd/globals.go b/cmd/globals.go index a302a4143..58756bc79 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -107,6 +107,12 @@ const ( // diskFillFraction is the fraction of a disk we allow to be filled. diskFillFraction = 0.95 + + // diskAssumeUnknownSize is the size to assume when an unknown size upload is requested. + diskAssumeUnknownSize = 1 << 30 + + // diskMinInodes is the minimum number of inodes we want free on a disk to perform writes. + diskMinInodes = 1000 ) var globalCLIContext = struct { diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 3197b77f8..60347dd28 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -76,10 +76,7 @@ const ( // isMinioBucket returns true if given bucket is a MinIO internal // bucket and false otherwise. func isMinioMetaBucketName(bucket string) bool { - return bucket == minioMetaBucket || - bucket == minioMetaMultipartBucket || - bucket == minioMetaTmpBucket || - bucket == dataUsageBucket + return strings.HasPrefix(bucket, minioMetaBucket) } // IsValidBucketName verifies that a bucket name is in accordance with @@ -965,3 +962,68 @@ func compressSelfTest() { } } + +// getDiskInfos returns the disk information for the provided disks. +// If a disk is nil or an error is returned the result will be nil as well. +func getDiskInfos(ctx context.Context, disks []StorageAPI) []*DiskInfo { + res := make([]*DiskInfo, len(disks)) + for i, disk := range disks { + if disk == nil { + continue + } + if di, err := disk.DiskInfo(ctx); err == nil { + res[i] = &di + } + } + return res +} + +// hasSpaceFor returns whether the disks in `di` have space for and object of a given size. +func hasSpaceFor(di []*DiskInfo, size int64) bool { + // We multiply the size by 2 to account for erasure coding. + size *= 2 + if size < 0 { + // If no size, assume diskAssumeUnknownSize. + size = diskAssumeUnknownSize + } + + var available uint64 + var total uint64 + var nDisks int + for _, disk := range di { + if disk == nil || disk.Total == 0 || (disk.FreeInodes < diskMinInodes && disk.UsedInodes > 0) { + // Disk offline, no inodes or something else is wrong. + continue + } + nDisks++ + total += disk.Total + available += disk.Total - disk.Used + } + + if nDisks == 0 { + return false + } + + // Check we have enough on each disk, ignoring diskFillFraction. + perDisk := size / int64(nDisks) + for _, disk := range di { + if disk == nil || disk.Total == 0 || (disk.FreeInodes < diskMinInodes && disk.UsedInodes > 0) { + continue + } + if int64(disk.Free) <= perDisk { + return false + } + } + + // Make sure we can fit "size" on to the disk without getting above the diskFillFraction + if available < uint64(size) { + return false + } + + // How much will be left after adding the file. + available -= uint64(size) + + // wantLeft is how much space there at least must be left. + wantLeft := uint64(float64(total) * (1.0 - diskFillFraction)) + return available > wantLeft +}