mirror of
https://github.com/minio/minio.git
synced 2024-12-25 06:35:56 -05:00
f903cae6ff
Current implementation requires server pools to have same erasure stripe sizes, to facilitate same SLA and expectations. This PR allows server pools to be variadic, i.e they do not have to be same erasure stripe sizes - instead they should have SLA for parity ratio. If the parity ratio cannot be guaranteed by the new server pool, the deployment is rejected i.e server pool expansion is not allowed.
359 lines
11 KiB
Go
359 lines
11 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2016-2019 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"net/http"
|
|
"sort"
|
|
"time"
|
|
|
|
xhttp "github.com/minio/minio/cmd/http"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/bucket/replication"
|
|
"github.com/minio/minio/pkg/sync/errgroup"
|
|
"github.com/minio/sha256-simd"
|
|
)
|
|
|
|
const erasureAlgorithm = "rs-vandermonde"
|
|
|
|
// byObjectPartNumber is a collection satisfying sort.Interface.
|
|
type byObjectPartNumber []ObjectPartInfo
|
|
|
|
func (t byObjectPartNumber) Len() int { return len(t) }
|
|
func (t byObjectPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
|
|
func (t byObjectPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
|
|
|
|
// AddChecksumInfo adds a checksum of a part.
|
|
func (e *ErasureInfo) AddChecksumInfo(ckSumInfo ChecksumInfo) {
|
|
for i, sum := range e.Checksums {
|
|
if sum.PartNumber == ckSumInfo.PartNumber {
|
|
e.Checksums[i] = ckSumInfo
|
|
return
|
|
}
|
|
}
|
|
e.Checksums = append(e.Checksums, ckSumInfo)
|
|
}
|
|
|
|
// GetChecksumInfo - get checksum of a part.
|
|
func (e ErasureInfo) GetChecksumInfo(partNumber int) (ckSum ChecksumInfo) {
|
|
for _, sum := range e.Checksums {
|
|
if sum.PartNumber == partNumber {
|
|
// Return the checksum
|
|
return sum
|
|
}
|
|
}
|
|
return ChecksumInfo{}
|
|
}
|
|
|
|
// ShardFileSize - returns final erasure size from original size.
|
|
func (e ErasureInfo) ShardFileSize(totalLength int64) int64 {
|
|
if totalLength == 0 {
|
|
return 0
|
|
}
|
|
if totalLength == -1 {
|
|
return -1
|
|
}
|
|
numShards := totalLength / e.BlockSize
|
|
lastBlockSize := totalLength % e.BlockSize
|
|
lastShardSize := ceilFrac(lastBlockSize, int64(e.DataBlocks))
|
|
return numShards*e.ShardSize() + lastShardSize
|
|
}
|
|
|
|
// ShardSize - returns actual shared size from erasure blockSize.
|
|
func (e ErasureInfo) ShardSize() int64 {
|
|
return ceilFrac(e.BlockSize, int64(e.DataBlocks))
|
|
}
|
|
|
|
// IsValid - tells if erasure info fields are valid.
|
|
func (fi FileInfo) IsValid() bool {
|
|
if fi.Deleted {
|
|
// Delete marker has no data, no need to check
|
|
// for erasure coding information
|
|
return true
|
|
}
|
|
dataBlocks := fi.Erasure.DataBlocks
|
|
parityBlocks := fi.Erasure.ParityBlocks
|
|
correctIndexes := (fi.Erasure.Index > 0 &&
|
|
fi.Erasure.Index <= dataBlocks+parityBlocks &&
|
|
len(fi.Erasure.Distribution) == (dataBlocks+parityBlocks))
|
|
return ((dataBlocks >= parityBlocks) &&
|
|
(dataBlocks != 0) && (parityBlocks != 0) &&
|
|
correctIndexes)
|
|
}
|
|
|
|
// ToObjectInfo - Converts metadata to object info.
|
|
func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
|
|
object = decodeDirObject(object)
|
|
versionID := fi.VersionID
|
|
if globalBucketVersioningSys.Enabled(bucket) && versionID == "" {
|
|
versionID = nullVersionID
|
|
}
|
|
|
|
objInfo := ObjectInfo{
|
|
IsDir: HasSuffix(object, SlashSeparator),
|
|
Bucket: bucket,
|
|
Name: object,
|
|
VersionID: versionID,
|
|
IsLatest: fi.IsLatest,
|
|
DeleteMarker: fi.Deleted,
|
|
Size: fi.Size,
|
|
ModTime: fi.ModTime,
|
|
Legacy: fi.XLV1,
|
|
ContentType: fi.Metadata["content-type"],
|
|
ContentEncoding: fi.Metadata["content-encoding"],
|
|
}
|
|
// Update expires
|
|
var (
|
|
t time.Time
|
|
e error
|
|
)
|
|
if exp, ok := fi.Metadata["expires"]; ok {
|
|
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
|
|
objInfo.Expires = t.UTC()
|
|
}
|
|
}
|
|
objInfo.backendType = BackendErasure
|
|
|
|
// Extract etag from metadata.
|
|
objInfo.ETag = extractETag(fi.Metadata)
|
|
|
|
// Add user tags to the object info
|
|
objInfo.UserTags = fi.Metadata[xhttp.AmzObjectTagging]
|
|
|
|
// Add replication status to the object info
|
|
objInfo.ReplicationStatus = replication.StatusType(fi.Metadata[xhttp.AmzBucketReplicationStatus])
|
|
if fi.Deleted {
|
|
objInfo.ReplicationStatus = replication.StatusType(fi.DeleteMarkerReplicationStatus)
|
|
}
|
|
|
|
objInfo.TransitionStatus = fi.TransitionStatus
|
|
|
|
// etag/md5Sum has already been extracted. We need to
|
|
// remove to avoid it from appearing as part of
|
|
// response headers. e.g, X-Minio-* or X-Amz-*.
|
|
// Tags have also been extracted, we remove that as well.
|
|
objInfo.UserDefined = cleanMetadata(fi.Metadata)
|
|
|
|
// All the parts per object.
|
|
objInfo.Parts = fi.Parts
|
|
|
|
// Update storage class
|
|
if sc, ok := fi.Metadata[xhttp.AmzStorageClass]; ok {
|
|
objInfo.StorageClass = sc
|
|
} else {
|
|
objInfo.StorageClass = globalMinioDefaultStorageClass
|
|
}
|
|
objInfo.VersionPurgeStatus = fi.VersionPurgeStatus
|
|
// set restore status for transitioned object
|
|
if ongoing, exp, err := parseRestoreHeaderFromMeta(fi.Metadata); err == nil {
|
|
objInfo.RestoreOngoing = ongoing
|
|
objInfo.RestoreExpires = exp
|
|
}
|
|
// Success.
|
|
return objInfo
|
|
}
|
|
|
|
// objectPartIndex - returns the index of matching object part number.
|
|
func objectPartIndex(parts []ObjectPartInfo, partNumber int) int {
|
|
for i, part := range parts {
|
|
if partNumber == part.Number {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// AddObjectPart - add a new object part in order.
|
|
func (fi *FileInfo) AddObjectPart(partNumber int, partETag string, partSize int64, actualSize int64) {
|
|
partInfo := ObjectPartInfo{
|
|
Number: partNumber,
|
|
ETag: partETag,
|
|
Size: partSize,
|
|
ActualSize: actualSize,
|
|
}
|
|
|
|
// Update part info if it already exists.
|
|
for i, part := range fi.Parts {
|
|
if partNumber == part.Number {
|
|
fi.Parts[i] = partInfo
|
|
return
|
|
}
|
|
}
|
|
|
|
// Proceed to include new part info.
|
|
fi.Parts = append(fi.Parts, partInfo)
|
|
|
|
// Parts in FileInfo should be in sorted order by part number.
|
|
sort.Sort(byObjectPartNumber(fi.Parts))
|
|
}
|
|
|
|
// ObjectToPartOffset - translate offset of an object to offset of its individual part.
|
|
func (fi FileInfo) ObjectToPartOffset(ctx context.Context, offset int64) (partIndex int, partOffset int64, err error) {
|
|
if offset == 0 {
|
|
// Special case - if offset is 0, then partIndex and partOffset are always 0.
|
|
return 0, 0, nil
|
|
}
|
|
partOffset = offset
|
|
// Seek until object offset maps to a particular part offset.
|
|
for i, part := range fi.Parts {
|
|
partIndex = i
|
|
// Offset is smaller than size we have reached the proper part offset.
|
|
if partOffset < part.Size {
|
|
return partIndex, partOffset, nil
|
|
}
|
|
// Continue to towards the next part.
|
|
partOffset -= part.Size
|
|
}
|
|
logger.LogIf(ctx, InvalidRange{})
|
|
// Offset beyond the size of the object return InvalidRange.
|
|
return 0, 0, InvalidRange{}
|
|
}
|
|
|
|
func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) {
|
|
metaHashes := make([]string, len(metaArr))
|
|
h := sha256.New()
|
|
for i, meta := range metaArr {
|
|
if meta.IsValid() && meta.ModTime.Equal(modTime) {
|
|
for _, part := range meta.Parts {
|
|
h.Write([]byte(fmt.Sprintf("part.%d", part.Number)))
|
|
}
|
|
h.Write([]byte(fmt.Sprintf("%v", meta.Erasure.Distribution)))
|
|
metaHashes[i] = hex.EncodeToString(h.Sum(nil))
|
|
h.Reset()
|
|
}
|
|
}
|
|
|
|
metaHashCountMap := make(map[string]int)
|
|
for _, hash := range metaHashes {
|
|
if hash == "" {
|
|
continue
|
|
}
|
|
metaHashCountMap[hash]++
|
|
}
|
|
|
|
maxHash := ""
|
|
maxCount := 0
|
|
for hash, count := range metaHashCountMap {
|
|
if count > maxCount {
|
|
maxCount = count
|
|
maxHash = hash
|
|
}
|
|
}
|
|
|
|
if maxCount < quorum {
|
|
return FileInfo{}, errErasureReadQuorum
|
|
}
|
|
|
|
for i, hash := range metaHashes {
|
|
if hash == maxHash {
|
|
return metaArr[i], nil
|
|
}
|
|
}
|
|
|
|
return FileInfo{}, errErasureReadQuorum
|
|
}
|
|
|
|
// pickValidFileInfo - picks one valid FileInfo content and returns from a
|
|
// slice of FileInfo.
|
|
func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, quorum int) (xmv FileInfo, e error) {
|
|
return findFileInfoInQuorum(ctx, metaArr, modTime, quorum)
|
|
}
|
|
|
|
// Rename metadata content to destination location for each disk concurrently.
|
|
func renameFileInfo(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) ([]StorageAPI, error) {
|
|
ignoredErr := []error{errFileNotFound}
|
|
|
|
g := errgroup.WithNErrs(len(disks))
|
|
|
|
// Rename file on all underlying storage disks.
|
|
for index := range disks {
|
|
index := index
|
|
g.Go(func() error {
|
|
if disks[index] == nil {
|
|
return errDiskNotFound
|
|
}
|
|
if err := disks[index].RenameData(ctx, srcBucket, srcEntry, "", dstBucket, dstEntry); err != nil {
|
|
if !IsErrIgnored(err, ignoredErr...) {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}, index)
|
|
}
|
|
|
|
// Wait for all renames to finish.
|
|
errs := g.Wait()
|
|
|
|
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
|
|
// otherwise return failure. Cleanup successful renames.
|
|
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum)
|
|
return evalDisks(disks, errs), err
|
|
}
|
|
|
|
// writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently.
|
|
func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) {
|
|
g := errgroup.WithNErrs(len(disks))
|
|
|
|
// Start writing `xl.meta` to all disks in parallel.
|
|
for index := range disks {
|
|
index := index
|
|
g.Go(func() error {
|
|
if disks[index] == nil {
|
|
return errDiskNotFound
|
|
}
|
|
// Pick one FileInfo for a disk at index.
|
|
files[index].Erasure.Index = index + 1
|
|
return disks[index].WriteMetadata(ctx, bucket, prefix, files[index])
|
|
}, index)
|
|
}
|
|
|
|
// Wait for all the routines.
|
|
mErrs := g.Wait()
|
|
|
|
err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
|
|
return evalDisks(disks, mErrs), err
|
|
}
|
|
|
|
// Returns per object readQuorum and writeQuorum
|
|
// readQuorum is the min required disks to read data.
|
|
// writeQuorum is the min required disks to write data.
|
|
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error, defaultParityCount int) (objectReadQuorum, objectWriteQuorum int, err error) {
|
|
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
|
|
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
dataBlocks := latestFileInfo.Erasure.DataBlocks
|
|
parityBlocks := globalStorageClass.GetParityForSC(latestFileInfo.Metadata[xhttp.AmzStorageClass])
|
|
if parityBlocks <= 0 {
|
|
parityBlocks = defaultParityCount
|
|
}
|
|
|
|
writeQuorum := dataBlocks
|
|
if dataBlocks == parityBlocks {
|
|
writeQuorum++
|
|
}
|
|
|
|
// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
|
|
// from latestFileInfo to get the quorum
|
|
return dataBlocks, writeQuorum, nil
|
|
}
|