mirror of
https://github.com/minio/minio.git
synced 2025-01-23 20:53:18 -05:00
30707659b5
Too many deployments come up with an odd number of hosts or drives, to facilitate even distribution among those setups allow for odd and prime numbers based packs.
444 lines
14 KiB
Go
444 lines
14 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2016-2019 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"path"
|
|
"sort"
|
|
"time"
|
|
|
|
jsoniter "github.com/json-iterator/go"
|
|
xhttp "github.com/minio/minio/cmd/http"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/sync/errgroup"
|
|
"github.com/minio/sha256-simd"
|
|
)
|
|
|
|
const erasureAlgorithmKlauspost = "klauspost/reedsolomon/vandermonde"
|
|
|
|
// ObjectPartInfo Info of each part kept in the multipart metadata
|
|
// file after CompleteMultipartUpload() is called.
|
|
type ObjectPartInfo struct {
|
|
ETag string `json:"etag,omitempty"`
|
|
Number int `json:"number"`
|
|
Size int64 `json:"size"`
|
|
ActualSize int64 `json:"actualSize"`
|
|
}
|
|
|
|
// byObjectPartNumber is a collection satisfying sort.Interface.
|
|
type byObjectPartNumber []ObjectPartInfo
|
|
|
|
func (t byObjectPartNumber) Len() int { return len(t) }
|
|
func (t byObjectPartNumber) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
|
|
func (t byObjectPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
|
|
|
|
// ChecksumInfo - carries checksums of individual scattered parts per disk.
|
|
type ChecksumInfo struct {
|
|
PartNumber int
|
|
Algorithm BitrotAlgorithm
|
|
Hash []byte
|
|
}
|
|
|
|
type checksumInfoJSON struct {
|
|
Name string `json:"name"`
|
|
Algorithm string `json:"algorithm"`
|
|
Hash string `json:"hash,omitempty"`
|
|
}
|
|
|
|
// MarshalJSON marshals the ChecksumInfo struct
|
|
func (c ChecksumInfo) MarshalJSON() ([]byte, error) {
|
|
info := checksumInfoJSON{
|
|
Name: fmt.Sprintf("part.%d", c.PartNumber),
|
|
Algorithm: c.Algorithm.String(),
|
|
Hash: hex.EncodeToString(c.Hash),
|
|
}
|
|
return json.Marshal(info)
|
|
}
|
|
|
|
// UnmarshalJSON - should never be called, instead xlMetaV1UnmarshalJSON() should be used.
|
|
func (c *ChecksumInfo) UnmarshalJSON(data []byte) error {
|
|
var info checksumInfoJSON
|
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
|
if err := json.Unmarshal(data, &info); err != nil {
|
|
return err
|
|
}
|
|
sum, err := hex.DecodeString(info.Hash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.Algorithm = BitrotAlgorithmFromString(info.Algorithm)
|
|
c.Hash = sum
|
|
if _, err = fmt.Sscanf(info.Name, "part.%d", &c.PartNumber); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !c.Algorithm.Available() {
|
|
logger.LogIf(context.Background(), errBitrotHashAlgoInvalid)
|
|
return errBitrotHashAlgoInvalid
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ErasureInfo holds erasure coding and bitrot related information.
|
|
type ErasureInfo struct {
|
|
// Algorithm is the string representation of erasure-coding-algorithm
|
|
Algorithm string `json:"algorithm"`
|
|
// DataBlocks is the number of data blocks for erasure-coding
|
|
DataBlocks int `json:"data"`
|
|
// ParityBlocks is the number of parity blocks for erasure-coding
|
|
ParityBlocks int `json:"parity"`
|
|
// BlockSize is the size of one erasure-coded block
|
|
BlockSize int64 `json:"blockSize"`
|
|
// Index is the index of the current disk
|
|
Index int `json:"index"`
|
|
// Distribution is the distribution of the data and parity blocks
|
|
Distribution []int `json:"distribution"`
|
|
// Checksums holds all bitrot checksums of all erasure encoded blocks
|
|
Checksums []ChecksumInfo `json:"checksum,omitempty"`
|
|
}
|
|
|
|
// AddChecksumInfo adds a checksum of a part.
|
|
func (e *ErasureInfo) AddChecksumInfo(ckSumInfo ChecksumInfo) {
|
|
for i, sum := range e.Checksums {
|
|
if sum.PartNumber == ckSumInfo.PartNumber {
|
|
e.Checksums[i] = ckSumInfo
|
|
return
|
|
}
|
|
}
|
|
e.Checksums = append(e.Checksums, ckSumInfo)
|
|
}
|
|
|
|
// GetChecksumInfo - get checksum of a part.
|
|
func (e ErasureInfo) GetChecksumInfo(partNumber int) (ckSum ChecksumInfo) {
|
|
for _, sum := range e.Checksums {
|
|
if sum.PartNumber == partNumber {
|
|
// Return the checksum
|
|
return sum
|
|
}
|
|
}
|
|
return ChecksumInfo{}
|
|
}
|
|
|
|
// statInfo - carries stat information of the object.
|
|
type statInfo struct {
|
|
Size int64 `json:"size"` // Size of the object `xl.json`.
|
|
ModTime time.Time `json:"modTime"` // ModTime of the object `xl.json`.
|
|
}
|
|
|
|
// A xlMetaV1 represents `xl.json` metadata header.
|
|
type xlMetaV1 struct {
|
|
Version string `json:"version"` // Version of the current `xl.json`.
|
|
Format string `json:"format"` // Format of the current `xl.json`.
|
|
Stat statInfo `json:"stat"` // Stat of the current object `xl.json`.
|
|
// Erasure coded info for the current object `xl.json`.
|
|
Erasure ErasureInfo `json:"erasure"`
|
|
// MinIO release tag for current object `xl.json`.
|
|
Minio struct {
|
|
Release string `json:"release"`
|
|
} `json:"minio"`
|
|
// Metadata map for current object `xl.json`.
|
|
Meta map[string]string `json:"meta,omitempty"`
|
|
// Captures all the individual object `xl.json`.
|
|
Parts []ObjectPartInfo `json:"parts,omitempty"`
|
|
}
|
|
|
|
// XL metadata constants.
|
|
const (
|
|
// XL meta version.
|
|
xlMetaVersion = "1.0.1"
|
|
|
|
// XL meta version.
|
|
xlMetaVersion100 = "1.0.0"
|
|
|
|
// XL meta format string.
|
|
xlMetaFormat = "xl"
|
|
|
|
// Add new constants here.
|
|
)
|
|
|
|
// newXLMetaV1 - initializes new xlMetaV1, adds version, allocates a fresh erasure info.
|
|
func newXLMetaV1(object string, dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
|
|
xlMeta = xlMetaV1{}
|
|
xlMeta.Version = xlMetaVersion
|
|
xlMeta.Format = xlMetaFormat
|
|
xlMeta.Minio.Release = ReleaseTag
|
|
xlMeta.Erasure = ErasureInfo{
|
|
Algorithm: erasureAlgorithmKlauspost,
|
|
DataBlocks: dataBlocks,
|
|
ParityBlocks: parityBlocks,
|
|
BlockSize: blockSizeV1,
|
|
Distribution: hashOrder(object, dataBlocks+parityBlocks),
|
|
}
|
|
return xlMeta
|
|
}
|
|
|
|
// Return a new xlMetaV1 initialized using the given xlMetaV1. Used in healing to make sure that we do not copy
|
|
// over any part's checksum info which will differ for different disks.
|
|
func newXLMetaFromXLMeta(meta xlMetaV1) xlMetaV1 {
|
|
xlMeta := meta
|
|
xlMeta.Erasure.Checksums = nil
|
|
xlMeta.Parts = nil
|
|
return xlMeta
|
|
}
|
|
|
|
// IsValid - tells if the format is sane by validating the version
|
|
// string, format and erasure info fields.
|
|
func (m xlMetaV1) IsValid() bool {
|
|
return isXLMetaFormatValid(m.Version, m.Format) &&
|
|
isXLMetaErasureInfoValid(m.Erasure.DataBlocks, m.Erasure.ParityBlocks)
|
|
}
|
|
|
|
// Verifies if the backend format metadata is sane by validating
|
|
// the version string and format style.
|
|
func isXLMetaFormatValid(version, format string) bool {
|
|
return ((version == xlMetaVersion || version == xlMetaVersion100) &&
|
|
format == xlMetaFormat)
|
|
}
|
|
|
|
// Verifies if the backend format metadata is sane by validating
|
|
// the ErasureInfo, i.e. data and parity blocks.
|
|
func isXLMetaErasureInfoValid(data, parity int) bool {
|
|
return ((data >= parity) && (data != 0) && (parity != 0))
|
|
}
|
|
|
|
// Converts metadata to object info.
|
|
func (m xlMetaV1) ToObjectInfo(bucket, object string) ObjectInfo {
|
|
objInfo := ObjectInfo{
|
|
IsDir: false,
|
|
Bucket: bucket,
|
|
Name: object,
|
|
Size: m.Stat.Size,
|
|
ModTime: m.Stat.ModTime,
|
|
ContentType: m.Meta["content-type"],
|
|
ContentEncoding: m.Meta["content-encoding"],
|
|
}
|
|
// Update expires
|
|
var (
|
|
t time.Time
|
|
e error
|
|
)
|
|
if exp, ok := m.Meta["expires"]; ok {
|
|
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
|
|
objInfo.Expires = t.UTC()
|
|
}
|
|
}
|
|
objInfo.backendType = BackendErasure
|
|
|
|
// Extract etag from metadata.
|
|
objInfo.ETag = extractETag(m.Meta)
|
|
|
|
// Add user tags to the object info
|
|
objInfo.UserTags = m.Meta[xhttp.AmzObjectTagging]
|
|
|
|
// etag/md5Sum has already been extracted. We need to
|
|
// remove to avoid it from appearing as part of
|
|
// response headers. e.g, X-Minio-* or X-Amz-*.
|
|
// Tags have also been extracted, we remove that as well.
|
|
objInfo.UserDefined = cleanMetadata(m.Meta)
|
|
|
|
// All the parts per object.
|
|
objInfo.Parts = m.Parts
|
|
|
|
// Update storage class
|
|
if sc, ok := m.Meta[xhttp.AmzStorageClass]; ok {
|
|
objInfo.StorageClass = sc
|
|
} else {
|
|
objInfo.StorageClass = globalMinioDefaultStorageClass
|
|
}
|
|
|
|
// Success.
|
|
return objInfo
|
|
}
|
|
|
|
// objectPartIndex - returns the index of matching object part number.
|
|
func objectPartIndex(parts []ObjectPartInfo, partNumber int) int {
|
|
for i, part := range parts {
|
|
if partNumber == part.Number {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// AddObjectPart - add a new object part in order.
|
|
func (m *xlMetaV1) AddObjectPart(partNumber int, partETag string, partSize int64, actualSize int64) {
|
|
partInfo := ObjectPartInfo{
|
|
Number: partNumber,
|
|
ETag: partETag,
|
|
Size: partSize,
|
|
ActualSize: actualSize,
|
|
}
|
|
|
|
// Update part info if it already exists.
|
|
for i, part := range m.Parts {
|
|
if partNumber == part.Number {
|
|
m.Parts[i] = partInfo
|
|
return
|
|
}
|
|
}
|
|
|
|
// Proceed to include new part info.
|
|
m.Parts = append(m.Parts, partInfo)
|
|
|
|
// Parts in xlMeta should be in sorted order by part number.
|
|
sort.Sort(byObjectPartNumber(m.Parts))
|
|
}
|
|
|
|
// ObjectToPartOffset - translate offset of an object to offset of its individual part.
|
|
func (m xlMetaV1) ObjectToPartOffset(ctx context.Context, offset int64) (partIndex int, partOffset int64, err error) {
|
|
if offset == 0 {
|
|
// Special case - if offset is 0, then partIndex and partOffset are always 0.
|
|
return 0, 0, nil
|
|
}
|
|
partOffset = offset
|
|
// Seek until object offset maps to a particular part offset.
|
|
for i, part := range m.Parts {
|
|
partIndex = i
|
|
// Offset is smaller than size we have reached the proper part offset.
|
|
if partOffset < part.Size {
|
|
return partIndex, partOffset, nil
|
|
}
|
|
// Continue to towards the next part.
|
|
partOffset -= part.Size
|
|
}
|
|
logger.LogIf(ctx, InvalidRange{})
|
|
// Offset beyond the size of the object return InvalidRange.
|
|
return 0, 0, InvalidRange{}
|
|
}
|
|
|
|
func getXLMetaInQuorum(ctx context.Context, metaArr []xlMetaV1, modTime time.Time, quorum int) (xmv xlMetaV1, e error) {
|
|
metaHashes := make([]string, len(metaArr))
|
|
for i, meta := range metaArr {
|
|
if meta.IsValid() && meta.Stat.ModTime.Equal(modTime) {
|
|
h := sha256.New()
|
|
for _, part := range meta.Parts {
|
|
h.Write([]byte(fmt.Sprintf("part.%d", part.Number)))
|
|
}
|
|
metaHashes[i] = hex.EncodeToString(h.Sum(nil))
|
|
}
|
|
}
|
|
|
|
metaHashCountMap := make(map[string]int)
|
|
for _, hash := range metaHashes {
|
|
if hash == "" {
|
|
continue
|
|
}
|
|
metaHashCountMap[hash]++
|
|
}
|
|
|
|
maxHash := ""
|
|
maxCount := 0
|
|
for hash, count := range metaHashCountMap {
|
|
if count > maxCount {
|
|
maxCount = count
|
|
maxHash = hash
|
|
}
|
|
}
|
|
|
|
if maxCount < quorum {
|
|
return xlMetaV1{}, errXLReadQuorum
|
|
}
|
|
|
|
for i, hash := range metaHashes {
|
|
if hash == maxHash {
|
|
return metaArr[i], nil
|
|
}
|
|
}
|
|
|
|
return xlMetaV1{}, errXLReadQuorum
|
|
}
|
|
|
|
// pickValidXLMeta - picks one valid xlMeta content and returns from a
|
|
// slice of xlmeta content.
|
|
func pickValidXLMeta(ctx context.Context, metaArr []xlMetaV1, modTime time.Time, quorum int) (xmv xlMetaV1, e error) {
|
|
return getXLMetaInQuorum(ctx, metaArr, modTime, quorum)
|
|
}
|
|
|
|
// list of all errors that can be ignored in a metadata operation.
|
|
var objMetadataOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound, errFileAccessDenied, errCorruptedFormat)
|
|
|
|
// writeXLMetadata - writes `xl.json` to a single disk.
|
|
func writeXLMetadata(ctx context.Context, disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) error {
|
|
jsonFile := path.Join(prefix, xlMetaJSONFile)
|
|
|
|
// Marshal json.
|
|
metadataBytes, err := json.Marshal(&xlMeta)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
|
|
// Persist marshaled data.
|
|
err = disk.WriteAll(bucket, jsonFile, bytes.NewReader(metadataBytes))
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
|
|
// Rename `xl.json` content to destination location for each disk in order.
|
|
func renameXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) ([]StorageAPI, error) {
|
|
isDir := false
|
|
srcXLJSON := path.Join(srcEntry, xlMetaJSONFile)
|
|
dstXLJSON := path.Join(dstEntry, xlMetaJSONFile)
|
|
return rename(ctx, disks, srcBucket, srcXLJSON, dstBucket, dstXLJSON, isDir, quorum, []error{errFileNotFound})
|
|
}
|
|
|
|
// writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.
|
|
func writeUniqueXLMetadata(ctx context.Context, disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) ([]StorageAPI, error) {
|
|
g := errgroup.WithNErrs(len(disks))
|
|
|
|
// Start writing `xl.json` to all disks in parallel.
|
|
for index := range disks {
|
|
index := index
|
|
g.Go(func() error {
|
|
if disks[index] == nil {
|
|
return errDiskNotFound
|
|
}
|
|
// Pick one xlMeta for a disk at index.
|
|
xlMetas[index].Erasure.Index = index + 1
|
|
return writeXLMetadata(ctx, disks[index], bucket, prefix, xlMetas[index])
|
|
}, index)
|
|
}
|
|
|
|
// Wait for all the routines.
|
|
mErrs := g.Wait()
|
|
|
|
err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
|
|
return evalDisks(disks, mErrs), err
|
|
}
|
|
|
|
// Returns per object readQuorum and writeQuorum
|
|
// readQuorum is the min required disks to read data.
|
|
// writeQuorum is the min required disks to write data.
|
|
func objectQuorumFromMeta(ctx context.Context, xl xlObjects, partsMetaData []xlMetaV1, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) {
|
|
// get the latest updated Metadata and a count of all the latest updated xlMeta(s)
|
|
latestXLMeta, err := getLatestXLMeta(ctx, partsMetaData, errs)
|
|
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
|
|
// from latestXLMeta to get the quorum
|
|
return latestXLMeta.Erasure.DataBlocks, latestXLMeta.Erasure.DataBlocks + 1, nil
|
|
}
|