mirror of
https://github.com/minio/minio.git
synced 2024-12-25 06:35:56 -05:00
708cebe7f0
protection was in place. However, it covered only some areas, so we re-arranged the code to ensure we could hold locks properly. Along with this, remove the DataShardFix code altogether, in deployments with many drive replacements, this can affect and lead to quorum loss.
619 lines
19 KiB
Go
619 lines
19 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/minio/minio/internal/amztime"
|
|
"github.com/minio/minio/internal/bucket/replication"
|
|
"github.com/minio/minio/internal/hash/sha256"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/v2/sync/errgroup"
|
|
)
|
|
|
|
// Object was stored with additional erasure codes due to degraded system at upload time
|
|
const minIOErasureUpgraded = "x-minio-internal-erasure-upgraded"
|
|
|
|
const erasureAlgorithm = "rs-vandermonde"
|
|
|
|
// GetChecksumInfo - get checksum of a part.
|
|
func (e ErasureInfo) GetChecksumInfo(partNumber int) (ckSum ChecksumInfo) {
|
|
for _, sum := range e.Checksums {
|
|
if sum.PartNumber == partNumber {
|
|
// Return the checksum
|
|
return sum
|
|
}
|
|
}
|
|
return ChecksumInfo{Algorithm: DefaultBitrotAlgorithm}
|
|
}
|
|
|
|
// ShardFileSize - returns final erasure size from original size.
|
|
func (e ErasureInfo) ShardFileSize(totalLength int64) int64 {
|
|
if totalLength == 0 {
|
|
return 0
|
|
}
|
|
if totalLength == -1 {
|
|
return -1
|
|
}
|
|
numShards := totalLength / e.BlockSize
|
|
lastBlockSize := totalLength % e.BlockSize
|
|
lastShardSize := ceilFrac(lastBlockSize, int64(e.DataBlocks))
|
|
return numShards*e.ShardSize() + lastShardSize
|
|
}
|
|
|
|
// ShardSize - returns actual shared size from erasure blockSize.
|
|
func (e ErasureInfo) ShardSize() int64 {
|
|
return ceilFrac(e.BlockSize, int64(e.DataBlocks))
|
|
}
|
|
|
|
// IsValid - tells if erasure info fields are valid.
|
|
func (fi FileInfo) IsValid() bool {
|
|
if fi.Deleted {
|
|
// Delete marker has no data, no need to check
|
|
// for erasure coding information
|
|
return true
|
|
}
|
|
dataBlocks := fi.Erasure.DataBlocks
|
|
parityBlocks := fi.Erasure.ParityBlocks
|
|
correctIndexes := (fi.Erasure.Index > 0 &&
|
|
fi.Erasure.Index <= dataBlocks+parityBlocks &&
|
|
len(fi.Erasure.Distribution) == (dataBlocks+parityBlocks))
|
|
return ((dataBlocks >= parityBlocks) &&
|
|
(dataBlocks > 0) && (parityBlocks >= 0) &&
|
|
correctIndexes)
|
|
}
|
|
|
|
// ToObjectInfo - Converts metadata to object info.
|
|
func (fi FileInfo) ToObjectInfo(bucket, object string, versioned bool) ObjectInfo {
|
|
object = decodeDirObject(object)
|
|
versionID := fi.VersionID
|
|
if versioned && versionID == "" {
|
|
versionID = nullVersionID
|
|
}
|
|
|
|
objInfo := ObjectInfo{
|
|
IsDir: HasSuffix(object, SlashSeparator),
|
|
Bucket: bucket,
|
|
Name: object,
|
|
ParityBlocks: fi.Erasure.ParityBlocks,
|
|
DataBlocks: fi.Erasure.DataBlocks,
|
|
VersionID: versionID,
|
|
IsLatest: fi.IsLatest,
|
|
DeleteMarker: fi.Deleted,
|
|
Size: fi.Size,
|
|
ModTime: fi.ModTime,
|
|
Legacy: fi.XLV1,
|
|
ContentType: fi.Metadata["content-type"],
|
|
ContentEncoding: fi.Metadata["content-encoding"],
|
|
NumVersions: fi.NumVersions,
|
|
SuccessorModTime: fi.SuccessorModTime,
|
|
CacheControl: fi.Metadata["cache-control"],
|
|
}
|
|
|
|
if exp, ok := fi.Metadata["expires"]; ok {
|
|
if t, err := amztime.ParseHeader(exp); err == nil {
|
|
objInfo.Expires = t.UTC()
|
|
}
|
|
}
|
|
|
|
// Extract etag from metadata.
|
|
objInfo.ETag = extractETag(fi.Metadata)
|
|
|
|
// Add user tags to the object info
|
|
tags := fi.Metadata[xhttp.AmzObjectTagging]
|
|
if len(tags) != 0 {
|
|
objInfo.UserTags = tags
|
|
}
|
|
|
|
// Add replication status to the object info
|
|
objInfo.ReplicationStatusInternal = fi.ReplicationState.ReplicationStatusInternal
|
|
objInfo.VersionPurgeStatusInternal = fi.ReplicationState.VersionPurgeStatusInternal
|
|
objInfo.ReplicationStatus = fi.ReplicationStatus()
|
|
if objInfo.ReplicationStatus.Empty() { // overlay x-amx-replication-status if present for replicas
|
|
if st, ok := fi.Metadata[xhttp.AmzBucketReplicationStatus]; ok && st == string(replication.Replica) {
|
|
objInfo.ReplicationStatus = replication.StatusType(st)
|
|
}
|
|
}
|
|
objInfo.VersionPurgeStatus = fi.VersionPurgeStatus()
|
|
|
|
objInfo.TransitionedObject = TransitionedObject{
|
|
Name: fi.TransitionedObjName,
|
|
VersionID: fi.TransitionVersionID,
|
|
Status: fi.TransitionStatus,
|
|
FreeVersion: fi.TierFreeVersion(),
|
|
Tier: fi.TransitionTier,
|
|
}
|
|
|
|
// etag/md5Sum has already been extracted. We need to
|
|
// remove to avoid it from appearing as part of
|
|
// response headers. e.g, X-Minio-* or X-Amz-*.
|
|
// Tags have also been extracted, we remove that as well.
|
|
objInfo.UserDefined = cleanMetadata(fi.Metadata)
|
|
|
|
// All the parts per object.
|
|
objInfo.Parts = fi.Parts
|
|
|
|
// Update storage class
|
|
if fi.TransitionTier != "" {
|
|
objInfo.StorageClass = fi.TransitionTier
|
|
} else if sc, ok := fi.Metadata[xhttp.AmzStorageClass]; ok {
|
|
objInfo.StorageClass = sc
|
|
} else {
|
|
objInfo.StorageClass = globalMinioDefaultStorageClass
|
|
}
|
|
|
|
// set restore status for transitioned object
|
|
restoreHdr, ok := fi.Metadata[xhttp.AmzRestore]
|
|
if ok {
|
|
if restoreStatus, err := parseRestoreObjStatus(restoreHdr); err == nil {
|
|
objInfo.RestoreOngoing = restoreStatus.Ongoing()
|
|
objInfo.RestoreExpires, _ = restoreStatus.Expiry()
|
|
}
|
|
}
|
|
objInfo.Checksum = fi.Checksum
|
|
objInfo.Inlined = fi.InlineData()
|
|
// Success.
|
|
return objInfo
|
|
}
|
|
|
|
// TransitionInfoEquals returns true if transition related information are equal, false otherwise.
|
|
func (fi FileInfo) TransitionInfoEquals(ofi FileInfo) bool {
|
|
switch {
|
|
case fi.TransitionStatus != ofi.TransitionStatus,
|
|
fi.TransitionTier != ofi.TransitionTier,
|
|
fi.TransitionedObjName != ofi.TransitionedObjName,
|
|
fi.TransitionVersionID != ofi.TransitionVersionID:
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// MetadataEquals returns true if FileInfos Metadata maps are equal, false otherwise.
|
|
func (fi FileInfo) MetadataEquals(ofi FileInfo) bool {
|
|
if len(fi.Metadata) != len(ofi.Metadata) {
|
|
return false
|
|
}
|
|
for k, v := range fi.Metadata {
|
|
if ov, ok := ofi.Metadata[k]; !ok || ov != v {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// ReplicationInfoEquals returns true if server-side replication related fields are equal, false otherwise.
|
|
func (fi FileInfo) ReplicationInfoEquals(ofi FileInfo) bool {
|
|
switch {
|
|
case fi.MarkDeleted != ofi.MarkDeleted,
|
|
!fi.ReplicationState.Equal(ofi.ReplicationState):
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// objectPartIndex - returns the index of matching object part number.
|
|
func objectPartIndex(parts []ObjectPartInfo, partNumber int) int {
|
|
for i, part := range parts {
|
|
if partNumber == part.Number {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|
|
|
|
// AddObjectPart - add a new object part in order.
|
|
func (fi *FileInfo) AddObjectPart(partNumber int, partETag string, partSize, actualSize int64, modTime time.Time, idx []byte, checksums map[string]string) {
|
|
partInfo := ObjectPartInfo{
|
|
Number: partNumber,
|
|
ETag: partETag,
|
|
Size: partSize,
|
|
ActualSize: actualSize,
|
|
ModTime: modTime,
|
|
Index: idx,
|
|
Checksums: checksums,
|
|
}
|
|
|
|
// Update part info if it already exists.
|
|
for i, part := range fi.Parts {
|
|
if partNumber == part.Number {
|
|
fi.Parts[i] = partInfo
|
|
return
|
|
}
|
|
}
|
|
|
|
// Proceed to include new part info.
|
|
fi.Parts = append(fi.Parts, partInfo)
|
|
|
|
// Parts in FileInfo should be in sorted order by part number.
|
|
sort.Slice(fi.Parts, func(i, j int) bool { return fi.Parts[i].Number < fi.Parts[j].Number })
|
|
}
|
|
|
|
// ObjectToPartOffset - translate offset of an object to offset of its individual part.
|
|
func (fi FileInfo) ObjectToPartOffset(ctx context.Context, offset int64) (partIndex int, partOffset int64, err error) {
|
|
if offset == 0 {
|
|
// Special case - if offset is 0, then partIndex and partOffset are always 0.
|
|
return 0, 0, nil
|
|
}
|
|
partOffset = offset
|
|
// Seek until object offset maps to a particular part offset.
|
|
for i, part := range fi.Parts {
|
|
partIndex = i
|
|
// Offset is smaller than size we have reached the proper part offset.
|
|
if partOffset < part.Size {
|
|
return partIndex, partOffset, nil
|
|
}
|
|
// Continue to towards the next part.
|
|
partOffset -= part.Size
|
|
}
|
|
logger.LogIf(ctx, InvalidRange{})
|
|
// Offset beyond the size of the object return InvalidRange.
|
|
return 0, 0, InvalidRange{}
|
|
}
|
|
|
|
func findFileInfoInQuorum(ctx context.Context, metaArr []FileInfo, modTime time.Time, etag string, quorum int) (FileInfo, error) {
|
|
// with less quorum return error.
|
|
if quorum < 1 {
|
|
return FileInfo{}, errErasureReadQuorum
|
|
}
|
|
metaHashes := make([]string, len(metaArr))
|
|
h := sha256.New()
|
|
for i, meta := range metaArr {
|
|
if !meta.IsValid() {
|
|
continue
|
|
}
|
|
etagOnly := modTime.Equal(timeSentinel) && (etag != "" && etag == meta.Metadata["etag"])
|
|
mtimeValid := meta.ModTime.Equal(modTime)
|
|
if mtimeValid || etagOnly {
|
|
fmt.Fprintf(h, "%v", meta.XLV1)
|
|
if !etagOnly {
|
|
// Verify dataDir is same only when mtime is valid and etag is not considered.
|
|
fmt.Fprintf(h, "%v", meta.GetDataDir())
|
|
}
|
|
for _, part := range meta.Parts {
|
|
fmt.Fprintf(h, "part.%d", part.Number)
|
|
}
|
|
|
|
if !meta.Deleted && meta.Size != 0 {
|
|
fmt.Fprintf(h, "%v+%v", meta.Erasure.DataBlocks, meta.Erasure.ParityBlocks)
|
|
fmt.Fprintf(h, "%v", meta.Erasure.Distribution)
|
|
}
|
|
|
|
// ILM transition fields
|
|
fmt.Fprint(h, meta.TransitionStatus)
|
|
fmt.Fprint(h, meta.TransitionTier)
|
|
fmt.Fprint(h, meta.TransitionedObjName)
|
|
fmt.Fprint(h, meta.TransitionVersionID)
|
|
|
|
// Server-side replication fields
|
|
fmt.Fprintf(h, "%v", meta.MarkDeleted)
|
|
fmt.Fprint(h, meta.Metadata[string(meta.ReplicationState.ReplicaStatus)])
|
|
fmt.Fprint(h, meta.Metadata[meta.ReplicationState.ReplicationStatusInternal])
|
|
fmt.Fprint(h, meta.Metadata[meta.ReplicationState.VersionPurgeStatusInternal])
|
|
|
|
metaHashes[i] = hex.EncodeToString(h.Sum(nil))
|
|
h.Reset()
|
|
}
|
|
}
|
|
|
|
metaHashCountMap := make(map[string]int)
|
|
for _, hash := range metaHashes {
|
|
if hash == "" {
|
|
continue
|
|
}
|
|
metaHashCountMap[hash]++
|
|
}
|
|
|
|
maxHash := ""
|
|
maxCount := 0
|
|
for hash, count := range metaHashCountMap {
|
|
if count > maxCount {
|
|
maxCount = count
|
|
maxHash = hash
|
|
}
|
|
}
|
|
|
|
if maxCount < quorum {
|
|
return FileInfo{}, errErasureReadQuorum
|
|
}
|
|
|
|
// Find the successor mod time in quorum, otherwise leave the
|
|
// candidate's successor modTime as found
|
|
succModTimeMap := make(map[time.Time]int)
|
|
var candidate FileInfo
|
|
var found bool
|
|
for i, hash := range metaHashes {
|
|
if hash == maxHash {
|
|
if metaArr[i].IsValid() {
|
|
if !found {
|
|
candidate = metaArr[i]
|
|
found = true
|
|
}
|
|
succModTimeMap[metaArr[i].SuccessorModTime]++
|
|
}
|
|
}
|
|
}
|
|
var succModTime time.Time
|
|
var smodTimeQuorum bool
|
|
for smodTime, count := range succModTimeMap {
|
|
if count >= quorum {
|
|
smodTimeQuorum = true
|
|
succModTime = smodTime
|
|
break
|
|
}
|
|
}
|
|
|
|
if found {
|
|
if smodTimeQuorum {
|
|
candidate.SuccessorModTime = succModTime
|
|
candidate.IsLatest = succModTime.IsZero()
|
|
}
|
|
return candidate, nil
|
|
}
|
|
return FileInfo{}, errErasureReadQuorum
|
|
}
|
|
|
|
// pickValidFileInfo - picks one valid FileInfo content and returns from a
|
|
// slice of FileInfo.
|
|
func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Time, etag string, quorum int) (FileInfo, error) {
|
|
return findFileInfoInQuorum(ctx, metaArr, modTime, etag, quorum)
|
|
}
|
|
|
|
// writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently.
|
|
func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) {
|
|
g := errgroup.WithNErrs(len(disks))
|
|
|
|
// Start writing `xl.meta` to all disks in parallel.
|
|
for index := range disks {
|
|
index := index
|
|
g.Go(func() error {
|
|
if disks[index] == nil {
|
|
return errDiskNotFound
|
|
}
|
|
// Pick one FileInfo for a disk at index.
|
|
fi := files[index]
|
|
fi.Erasure.Index = index + 1
|
|
if fi.IsValid() {
|
|
return disks[index].WriteMetadata(ctx, bucket, prefix, fi)
|
|
}
|
|
return errCorruptedFormat
|
|
}, index)
|
|
}
|
|
|
|
// Wait for all the routines.
|
|
mErrs := g.Wait()
|
|
|
|
err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
|
|
return evalDisks(disks, mErrs), err
|
|
}
|
|
|
|
func commonParity(parities []int, defaultParityCount int) int {
|
|
N := len(parities)
|
|
|
|
occMap := make(map[int]int)
|
|
for _, p := range parities {
|
|
occMap[p]++
|
|
}
|
|
|
|
var maxOcc, cparity int
|
|
for parity, occ := range occMap {
|
|
if parity == -1 {
|
|
// Ignore non defined parity
|
|
continue
|
|
}
|
|
|
|
readQuorum := N - parity
|
|
if defaultParityCount > 0 && parity == 0 {
|
|
// In this case, parity == 0 implies that this object version is a
|
|
// delete marker
|
|
readQuorum = N/2 + 1
|
|
}
|
|
if occ < readQuorum {
|
|
// Ignore this parity since we don't have enough shards for read quorum
|
|
continue
|
|
}
|
|
|
|
if occ > maxOcc {
|
|
maxOcc = occ
|
|
cparity = parity
|
|
}
|
|
}
|
|
|
|
if maxOcc == 0 {
|
|
// Did not found anything useful
|
|
return -1
|
|
}
|
|
return cparity
|
|
}
|
|
|
|
func listObjectParities(partsMetadata []FileInfo, errs []error) (parities []int) {
|
|
parities = make([]int, len(partsMetadata))
|
|
for index, metadata := range partsMetadata {
|
|
if errs[index] != nil {
|
|
parities[index] = -1
|
|
continue
|
|
}
|
|
if !metadata.IsValid() {
|
|
parities[index] = -1
|
|
continue
|
|
}
|
|
// Delete marker or zero byte objects take highest parity.
|
|
if metadata.Deleted || metadata.Size == 0 {
|
|
parities[index] = len(partsMetadata) / 2
|
|
} else {
|
|
parities[index] = metadata.Erasure.ParityBlocks
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Returns per object readQuorum and writeQuorum
|
|
// readQuorum is the min required disks to read data.
|
|
// writeQuorum is the min required disks to write data.
|
|
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error, defaultParityCount int) (objectReadQuorum, objectWriteQuorum int, err error) {
|
|
// There should be at least half correct entries, if not return failure
|
|
expectedRQuorum := len(partsMetaData) / 2
|
|
if defaultParityCount == 0 {
|
|
// if parity count is '0', we expected all entries to be present.
|
|
expectedRQuorum = len(partsMetaData)
|
|
}
|
|
|
|
reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, expectedRQuorum)
|
|
if reducedErr != nil {
|
|
return -1, -1, reducedErr
|
|
}
|
|
|
|
// special case when parity is '0'
|
|
if defaultParityCount == 0 {
|
|
return len(partsMetaData), len(partsMetaData), nil
|
|
}
|
|
|
|
parities := listObjectParities(partsMetaData, errs)
|
|
parityBlocks := commonParity(parities, defaultParityCount)
|
|
if parityBlocks < 0 {
|
|
return -1, -1, errErasureReadQuorum
|
|
}
|
|
|
|
if parityBlocks == 0 {
|
|
// For delete markers do not use 'defaultParityCount' as it is not expected to be the case.
|
|
// Use maximum allowed read quorum instead, writeQuorum+1 is returned for compatibility sake
|
|
// but there are no callers that shall be using this.
|
|
readQuorum := len(partsMetaData) / 2
|
|
return readQuorum, readQuorum + 1, nil
|
|
}
|
|
|
|
dataBlocks := len(partsMetaData) - parityBlocks
|
|
|
|
writeQuorum := dataBlocks
|
|
if dataBlocks == parityBlocks {
|
|
writeQuorum++
|
|
}
|
|
|
|
// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
|
|
// from latestFileInfo to get the quorum
|
|
return dataBlocks, writeQuorum, nil
|
|
}
|
|
|
|
const (
|
|
tierFVID = "tier-free-versionID"
|
|
tierFVMarker = "tier-free-marker"
|
|
)
|
|
|
|
// SetTierFreeVersionID sets free-version's versionID. This method is used by
|
|
// object layer to pass down a versionID to set for a free-version that may be
|
|
// created.
|
|
func (fi *FileInfo) SetTierFreeVersionID(versionID string) {
|
|
if fi.Metadata == nil {
|
|
fi.Metadata = make(map[string]string)
|
|
}
|
|
fi.Metadata[ReservedMetadataPrefixLower+tierFVID] = versionID
|
|
}
|
|
|
|
// TierFreeVersionID returns the free-version's version id.
|
|
func (fi *FileInfo) TierFreeVersionID() string {
|
|
return fi.Metadata[ReservedMetadataPrefixLower+tierFVID]
|
|
}
|
|
|
|
// SetTierFreeVersion sets fi as a free-version. This method is used by
|
|
// lower layers to indicate a free-version.
|
|
func (fi *FileInfo) SetTierFreeVersion() {
|
|
if fi.Metadata == nil {
|
|
fi.Metadata = make(map[string]string)
|
|
}
|
|
fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker] = ""
|
|
}
|
|
|
|
// TierFreeVersion returns true if version is a free-version.
|
|
func (fi *FileInfo) TierFreeVersion() bool {
|
|
_, ok := fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker]
|
|
return ok
|
|
}
|
|
|
|
// IsRestoreObjReq returns true if fi corresponds to a RestoreObject request.
|
|
func (fi *FileInfo) IsRestoreObjReq() bool {
|
|
if restoreHdr, ok := fi.Metadata[xhttp.AmzRestore]; ok {
|
|
if restoreStatus, err := parseRestoreObjStatus(restoreHdr); err == nil {
|
|
if !restoreStatus.Ongoing() {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// VersionPurgeStatus returns overall version purge status for this object version across targets
|
|
func (fi *FileInfo) VersionPurgeStatus() VersionPurgeStatusType {
|
|
return fi.ReplicationState.CompositeVersionPurgeStatus()
|
|
}
|
|
|
|
// ReplicationStatus returns overall version replication status for this object version across targets
|
|
func (fi *FileInfo) ReplicationStatus() replication.StatusType {
|
|
return fi.ReplicationState.CompositeReplicationStatus()
|
|
}
|
|
|
|
// DeleteMarkerReplicationStatus returns overall replication status for this delete marker version across targets
|
|
func (fi *FileInfo) DeleteMarkerReplicationStatus() replication.StatusType {
|
|
if fi.Deleted {
|
|
return fi.ReplicationState.CompositeReplicationStatus()
|
|
}
|
|
return replication.StatusType("")
|
|
}
|
|
|
|
// GetInternalReplicationState is a wrapper method to fetch internal replication state from the map m
|
|
func GetInternalReplicationState(m map[string][]byte) ReplicationState {
|
|
m1 := make(map[string]string, len(m))
|
|
for k, v := range m {
|
|
m1[k] = string(v)
|
|
}
|
|
return getInternalReplicationState(m1)
|
|
}
|
|
|
|
// getInternalReplicationState fetches internal replication state from the map m
|
|
func getInternalReplicationState(m map[string]string) ReplicationState {
|
|
d := ReplicationState{}
|
|
for k, v := range m {
|
|
switch {
|
|
case equals(k, ReservedMetadataPrefixLower+ReplicationTimestamp):
|
|
d.ReplicaTimeStamp, _ = amztime.ParseReplicationTS(v)
|
|
case equals(k, ReservedMetadataPrefixLower+ReplicaTimestamp):
|
|
d.ReplicaTimeStamp, _ = amztime.ParseReplicationTS(v)
|
|
case equals(k, ReservedMetadataPrefixLower+ReplicaStatus):
|
|
d.ReplicaStatus = replication.StatusType(v)
|
|
case equals(k, ReservedMetadataPrefixLower+ReplicationStatus):
|
|
d.ReplicationStatusInternal = v
|
|
d.Targets = replicationStatusesMap(v)
|
|
case equals(k, VersionPurgeStatusKey):
|
|
d.VersionPurgeStatusInternal = v
|
|
d.PurgeTargets = versionPurgeStatusesMap(v)
|
|
case strings.HasPrefix(k, ReservedMetadataPrefixLower+ReplicationReset):
|
|
arn := strings.TrimPrefix(k, fmt.Sprintf("%s-", ReservedMetadataPrefixLower+ReplicationReset))
|
|
if d.ResetStatusesMap == nil {
|
|
d.ResetStatusesMap = make(map[string]string, 1)
|
|
}
|
|
d.ResetStatusesMap[arn] = v
|
|
}
|
|
}
|
|
return d
|
|
}
|