2021-04-18 15:41:13 -04:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2020-06-12 23:04:01 -04:00
package cmd
import (
"context"
"encoding/hex"
"fmt"
"sort"
2021-09-18 16:31:35 -04:00
"strings"
2020-06-12 23:04:01 -04:00
"time"
2022-09-07 10:24:54 -04:00
"github.com/minio/minio/internal/amztime"
2024-07-25 17:02:50 -04:00
"github.com/minio/minio/internal/bucket/lifecycle"
2021-06-01 17:59:40 -04:00
"github.com/minio/minio/internal/bucket/replication"
2024-03-01 01:49:01 -05:00
"github.com/minio/minio/internal/crypto"
2022-05-27 09:00:19 -04:00
"github.com/minio/minio/internal/hash/sha256"
2021-06-01 17:59:40 -04:00
xhttp "github.com/minio/minio/internal/http"
2024-05-24 19:05:23 -04:00
"github.com/minio/pkg/v3/sync/errgroup"
2020-06-12 23:04:01 -04:00
)
2021-05-27 16:38:04 -04:00
// Object was stored with additional erasure codes due to degraded system at upload time
const minIOErasureUpgraded = "x-minio-internal-erasure-upgraded"
2020-06-12 23:04:01 -04:00
const erasureAlgorithm = "rs-vandermonde"
// GetChecksumInfo - get checksum of a part.
func ( e ErasureInfo ) GetChecksumInfo ( partNumber int ) ( ckSum ChecksumInfo ) {
for _ , sum := range e . Checksums {
if sum . PartNumber == partNumber {
// Return the checksum
return sum
}
}
2023-09-01 16:45:58 -04:00
return ChecksumInfo { Algorithm : DefaultBitrotAlgorithm }
2020-06-12 23:04:01 -04:00
}
// ShardFileSize - returns final erasure size from original size.
func ( e ErasureInfo ) ShardFileSize ( totalLength int64 ) int64 {
if totalLength == 0 {
return 0
}
if totalLength == - 1 {
return - 1
}
numShards := totalLength / e . BlockSize
lastBlockSize := totalLength % e . BlockSize
lastShardSize := ceilFrac ( lastBlockSize , int64 ( e . DataBlocks ) )
return numShards * e . ShardSize ( ) + lastShardSize
}
// ShardSize - returns actual shared size from erasure blockSize.
func ( e ErasureInfo ) ShardSize ( ) int64 {
return ceilFrac ( e . BlockSize , int64 ( e . DataBlocks ) )
}
// IsValid - tells if erasure info fields are valid.
func ( fi FileInfo ) IsValid ( ) bool {
if fi . Deleted {
// Delete marker has no data, no need to check
// for erasure coding information
return true
}
2020-08-03 15:15:08 -04:00
dataBlocks := fi . Erasure . DataBlocks
parityBlocks := fi . Erasure . ParityBlocks
2020-10-28 22:24:01 -04:00
correctIndexes := ( fi . Erasure . Index > 0 &&
fi . Erasure . Index <= dataBlocks + parityBlocks &&
len ( fi . Erasure . Distribution ) == ( dataBlocks + parityBlocks ) )
2020-08-03 15:15:08 -04:00
return ( ( dataBlocks >= parityBlocks ) &&
2022-05-30 13:58:37 -04:00
( dataBlocks > 0 ) && ( parityBlocks >= 0 ) &&
2020-10-28 22:24:01 -04:00
correctIndexes )
2020-06-12 23:04:01 -04:00
}
// ToObjectInfo - Converts metadata to object info.
2022-05-31 05:57:57 -04:00
func ( fi FileInfo ) ToObjectInfo ( bucket , object string , versioned bool ) ObjectInfo {
2020-09-19 11:39:41 -04:00
object = decodeDirObject ( object )
2020-09-16 13:21:50 -04:00
versionID := fi . VersionID
2022-05-31 05:57:57 -04:00
if versioned && versionID == "" {
2020-09-16 13:21:50 -04:00
versionID = nullVersionID
}
2020-06-12 23:04:01 -04:00
objInfo := ObjectInfo {
2021-02-01 12:52:11 -05:00
IsDir : HasSuffix ( object , SlashSeparator ) ,
Bucket : bucket ,
Name : object ,
2023-05-19 12:42:45 -04:00
ParityBlocks : fi . Erasure . ParityBlocks ,
DataBlocks : fi . Erasure . DataBlocks ,
2021-02-01 12:52:11 -05:00
VersionID : versionID ,
IsLatest : fi . IsLatest ,
DeleteMarker : fi . Deleted ,
Size : fi . Size ,
ModTime : fi . ModTime ,
Legacy : fi . XLV1 ,
ContentType : fi . Metadata [ "content-type" ] ,
ContentEncoding : fi . Metadata [ "content-encoding" ] ,
NumVersions : fi . NumVersions ,
SuccessorModTime : fi . SuccessorModTime ,
2023-11-22 16:46:17 -05:00
CacheControl : fi . Metadata [ "cache-control" ] ,
2020-06-12 23:04:01 -04:00
}
2021-02-01 12:52:11 -05:00
2020-06-12 23:04:01 -04:00
if exp , ok := fi . Metadata [ "expires" ] ; ok {
2022-09-07 10:24:54 -04:00
if t , err := amztime . ParseHeader ( exp ) ; err == nil {
2022-09-05 22:18:18 -04:00
objInfo . Expires = t . UTC ( )
2020-06-12 23:04:01 -04:00
}
}
// Extract etag from metadata.
objInfo . ETag = extractETag ( fi . Metadata )
// Add user tags to the object info
2021-04-10 12:13:12 -04:00
tags := fi . Metadata [ xhttp . AmzObjectTagging ]
if len ( tags ) != 0 {
objInfo . UserTags = tags
}
2020-06-12 23:04:01 -04:00
2020-07-21 20:49:56 -04:00
// Add replication status to the object info
2021-09-18 16:31:35 -04:00
objInfo . ReplicationStatusInternal = fi . ReplicationState . ReplicationStatusInternal
objInfo . VersionPurgeStatusInternal = fi . ReplicationState . VersionPurgeStatusInternal
2022-12-29 01:48:33 -05:00
objInfo . ReplicationStatus = fi . ReplicationStatus ( )
2023-10-26 00:24:10 -04:00
if objInfo . ReplicationStatus . Empty ( ) { // overlay x-amx-replication-status if present for replicas
if st , ok := fi . Metadata [ xhttp . AmzBucketReplicationStatus ] ; ok && st == string ( replication . Replica ) {
objInfo . ReplicationStatus = replication . StatusType ( st )
}
}
2022-12-29 01:48:33 -05:00
objInfo . VersionPurgeStatus = fi . VersionPurgeStatus ( )
2020-11-12 15:12:09 -05:00
2021-08-17 10:50:00 -04:00
objInfo . TransitionedObject = TransitionedObject {
Name : fi . TransitionedObjName ,
VersionID : fi . TransitionVersionID ,
Status : fi . TransitionStatus ,
FreeVersion : fi . TierFreeVersion ( ) ,
Tier : fi . TransitionTier ,
}
2020-11-12 15:12:09 -05:00
2020-06-12 23:04:01 -04:00
// etag/md5Sum has already been extracted. We need to
// remove to avoid it from appearing as part of
// response headers. e.g, X-Minio-* or X-Amz-*.
// Tags have also been extracted, we remove that as well.
objInfo . UserDefined = cleanMetadata ( fi . Metadata )
// All the parts per object.
objInfo . Parts = fi . Parts
// Update storage class
2022-11-09 18:57:34 -05:00
if fi . TransitionTier != "" {
objInfo . StorageClass = fi . TransitionTier
} else if sc , ok := fi . Metadata [ xhttp . AmzStorageClass ] ; ok {
2020-06-12 23:04:01 -04:00
objInfo . StorageClass = sc
} else {
objInfo . StorageClass = globalMinioDefaultStorageClass
}
2021-04-19 13:30:42 -04:00
2020-11-12 15:12:09 -05:00
// set restore status for transitioned object
2021-04-19 13:30:42 -04:00
restoreHdr , ok := fi . Metadata [ xhttp . AmzRestore ]
if ok {
if restoreStatus , err := parseRestoreObjStatus ( restoreHdr ) ; err == nil {
objInfo . RestoreOngoing = restoreStatus . Ongoing ( )
objInfo . RestoreExpires , _ = restoreStatus . Expiry ( )
}
2020-11-12 15:12:09 -05:00
}
2022-08-29 19:57:16 -04:00
objInfo . Checksum = fi . Checksum
2024-07-29 04:02:16 -04:00
objInfo . decryptPartsChecksums ( nil )
2023-04-17 15:16:37 -04:00
objInfo . Inlined = fi . InlineData ( )
2020-06-12 23:04:01 -04:00
// Success.
return objInfo
}
2021-08-23 16:14:55 -04:00
// TransitionInfoEquals returns true if transition related information are equal, false otherwise.
func ( fi FileInfo ) TransitionInfoEquals ( ofi FileInfo ) bool {
switch {
case fi . TransitionStatus != ofi . TransitionStatus ,
fi . TransitionTier != ofi . TransitionTier ,
fi . TransitionedObjName != ofi . TransitionedObjName ,
fi . TransitionVersionID != ofi . TransitionVersionID :
return false
}
return true
}
// MetadataEquals returns true if FileInfos Metadata maps are equal, false otherwise.
func ( fi FileInfo ) MetadataEquals ( ofi FileInfo ) bool {
if len ( fi . Metadata ) != len ( ofi . Metadata ) {
return false
}
for k , v := range fi . Metadata {
if ov , ok := ofi . Metadata [ k ] ; ! ok || ov != v {
return false
}
}
return true
}
// ReplicationInfoEquals returns true if server-side replication related fields are equal, false otherwise.
func ( fi FileInfo ) ReplicationInfoEquals ( ofi FileInfo ) bool {
switch {
case fi . MarkDeleted != ofi . MarkDeleted ,
2021-09-18 16:31:35 -04:00
! fi . ReplicationState . Equal ( ofi . ReplicationState ) :
2021-08-23 16:14:55 -04:00
return false
}
return true
}
2020-06-12 23:04:01 -04:00
// objectPartIndex - returns the index of matching object part number.
2024-09-06 05:42:21 -04:00
// Returns -1 if the part cannot be found.
2020-06-12 23:04:01 -04:00
func objectPartIndex ( parts [ ] ObjectPartInfo , partNumber int ) int {
for i , part := range parts {
if partNumber == part . Number {
return i
}
}
return - 1
}
2024-09-06 05:42:21 -04:00
// objectPartIndexNums returns the index of the specified part number.
// Returns -1 if the part cannot be found.
func objectPartIndexNums ( parts [ ] int , partNumber int ) int {
for i , part := range parts {
if part != 0 && partNumber == part {
return i
}
}
return - 1
}
2020-06-12 23:04:01 -04:00
// AddObjectPart - add a new object part in order.
2022-08-29 19:57:16 -04:00
func ( fi * FileInfo ) AddObjectPart ( partNumber int , partETag string , partSize , actualSize int64 , modTime time . Time , idx [ ] byte , checksums map [ string ] string ) {
2020-06-12 23:04:01 -04:00
partInfo := ObjectPartInfo {
Number : partNumber ,
ETag : partETag ,
Size : partSize ,
ActualSize : actualSize ,
2022-07-19 21:56:24 -04:00
ModTime : modTime ,
2022-07-11 20:30:56 -04:00
Index : idx ,
2022-08-29 19:57:16 -04:00
Checksums : checksums ,
2020-06-12 23:04:01 -04:00
}
// Update part info if it already exists.
for i , part := range fi . Parts {
if partNumber == part . Number {
fi . Parts [ i ] = partInfo
return
}
}
// Proceed to include new part info.
fi . Parts = append ( fi . Parts , partInfo )
// Parts in FileInfo should be in sorted order by part number.
2023-04-24 16:28:18 -04:00
sort . Slice ( fi . Parts , func ( i , j int ) bool { return fi . Parts [ i ] . Number < fi . Parts [ j ] . Number } )
2020-06-12 23:04:01 -04:00
}
// ObjectToPartOffset - translate offset of an object to offset of its individual part.
func ( fi FileInfo ) ObjectToPartOffset ( ctx context . Context , offset int64 ) ( partIndex int , partOffset int64 , err error ) {
if offset == 0 {
// Special case - if offset is 0, then partIndex and partOffset are always 0.
return 0 , 0 , nil
}
partOffset = offset
// Seek until object offset maps to a particular part offset.
for i , part := range fi . Parts {
partIndex = i
// Offset is smaller than size we have reached the proper part offset.
if partOffset < part . Size {
return partIndex , partOffset , nil
}
// Continue to towards the next part.
partOffset -= part . Size
}
2024-04-04 08:04:40 -04:00
internalLogIf ( ctx , InvalidRange { } )
2020-06-12 23:04:01 -04:00
// Offset beyond the size of the object return InvalidRange.
return 0 , 0 , InvalidRange { }
}
2023-06-17 22:18:20 -04:00
func findFileInfoInQuorum ( ctx context . Context , metaArr [ ] FileInfo , modTime time . Time , etag string , quorum int ) ( FileInfo , error ) {
2021-05-24 21:31:56 -04:00
// with less quorum return error.
2022-05-30 13:58:37 -04:00
if quorum < 1 {
2024-04-25 20:31:12 -04:00
return FileInfo { } , InsufficientReadQuorum { Err : errErasureReadQuorum , Type : RQInsufficientOnlineDrives }
2021-05-24 21:31:56 -04:00
}
2020-06-12 23:04:01 -04:00
metaHashes := make ( [ ] string , len ( metaArr ) )
2020-10-28 03:09:15 -04:00
h := sha256 . New ( )
2020-06-12 23:04:01 -04:00
for i , meta := range metaArr {
2023-06-17 22:18:20 -04:00
if ! meta . IsValid ( ) {
continue
}
etagOnly := modTime . Equal ( timeSentinel ) && ( etag != "" && etag == meta . Metadata [ "etag" ] )
mtimeValid := meta . ModTime . Equal ( modTime )
if mtimeValid || etagOnly {
2021-11-21 13:41:30 -05:00
fmt . Fprintf ( h , "%v" , meta . XLV1 )
2020-06-12 23:04:01 -04:00
for _ , part := range meta . Parts {
2021-11-16 12:28:29 -05:00
fmt . Fprintf ( h , "part.%d" , part . Number )
2024-08-12 15:02:21 -04:00
fmt . Fprintf ( h , "part.%d" , part . Size )
2020-06-12 23:04:01 -04:00
}
2024-08-12 15:02:21 -04:00
// Previously we checked if we had quorum on DataDir value.
// We have removed this check to allow reading objects with different DataDir
// values in a few drives (due to a rebalance-stop race bug)
// provided their their etags or ModTimes match.
2023-07-12 00:53:49 -04:00
if ! meta . Deleted && meta . Size != 0 {
fmt . Fprintf ( h , "%v+%v" , meta . Erasure . DataBlocks , meta . Erasure . ParityBlocks )
fmt . Fprintf ( h , "%v" , meta . Erasure . Distribution )
}
2021-08-19 17:55:42 -04:00
2024-03-01 01:49:01 -05:00
if meta . IsRemote ( ) {
// ILM transition fields
fmt . Fprint ( h , meta . TransitionStatus )
fmt . Fprint ( h , meta . TransitionTier )
fmt . Fprint ( h , meta . TransitionedObjName )
fmt . Fprint ( h , meta . TransitionVersionID )
}
// If metadata says encrypted, ask for it in quorum.
if etyp , ok := crypto . IsEncrypted ( meta . Metadata ) ; ok {
fmt . Fprint ( h , etyp )
}
// If compressed, look for compressed FileInfo only
if meta . IsCompressed ( ) {
fmt . Fprint ( h , meta . Metadata [ ReservedMetadataPrefix + "compression" ] )
}
2021-09-18 16:31:35 -04:00
2020-06-12 23:04:01 -04:00
metaHashes [ i ] = hex . EncodeToString ( h . Sum ( nil ) )
2020-10-28 03:09:15 -04:00
h . Reset ( )
2020-06-12 23:04:01 -04:00
}
}
metaHashCountMap := make ( map [ string ] int )
for _ , hash := range metaHashes {
if hash == "" {
continue
}
metaHashCountMap [ hash ] ++
}
maxHash := ""
maxCount := 0
for hash , count := range metaHashCountMap {
if count > maxCount {
maxCount = count
maxHash = hash
}
}
if maxCount < quorum {
2024-04-25 20:31:12 -04:00
return FileInfo { } , InsufficientReadQuorum { Err : errErasureReadQuorum , Type : RQInconsistentMeta }
2020-06-12 23:04:01 -04:00
}
2024-05-17 16:57:37 -04:00
// objProps represents properties that go beyond a single version
type objProps struct {
succModTime time . Time
numVersions int
}
// Find the successor mod time and numVersions in quorum, otherwise leave the
// candidate as found
otherPropsMap := make ( counterMap [ objProps ] )
2023-09-04 11:24:17 -04:00
var candidate FileInfo
var found bool
2020-06-12 23:04:01 -04:00
for i , hash := range metaHashes {
if hash == maxHash {
2021-05-24 21:31:56 -04:00
if metaArr [ i ] . IsValid ( ) {
2023-09-04 11:24:17 -04:00
if ! found {
candidate = metaArr [ i ]
found = true
}
2024-05-17 16:57:37 -04:00
props := objProps {
succModTime : metaArr [ i ] . SuccessorModTime ,
numVersions : metaArr [ i ] . NumVersions ,
}
otherPropsMap [ props ] ++
2021-05-24 21:31:56 -04:00
}
2020-06-12 23:04:01 -04:00
}
}
2023-09-04 11:24:17 -04:00
if found {
2024-05-17 16:57:37 -04:00
// Update candidate FileInfo with succModTime and numVersions in quorum when available
if props , ok := otherPropsMap . GetValueWithQuorum ( quorum ) ; ok {
candidate . SuccessorModTime = props . succModTime
candidate . IsLatest = props . succModTime . IsZero ( )
candidate . NumVersions = props . numVersions
2023-09-04 11:24:17 -04:00
}
return candidate , nil
}
2024-04-25 20:31:12 -04:00
return FileInfo { } , InsufficientReadQuorum { Err : errErasureReadQuorum , Type : RQInconsistentMeta }
2020-06-12 23:04:01 -04:00
}
// pickValidFileInfo - picks one valid FileInfo content and returns from a
// slice of FileInfo.
2023-06-17 22:18:20 -04:00
func pickValidFileInfo ( ctx context . Context , metaArr [ ] FileInfo , modTime time . Time , etag string , quorum int ) ( FileInfo , error ) {
return findFileInfoInQuorum ( ctx , metaArr , modTime , etag , quorum )
2020-06-12 23:04:01 -04:00
}
2024-08-12 04:38:15 -04:00
func writeAllMetadataWithRevert ( ctx context . Context , disks [ ] StorageAPI , origbucket , bucket , prefix string , files [ ] FileInfo , quorum int , revert bool ) ( [ ] StorageAPI , error ) {
2020-06-12 23:04:01 -04:00
g := errgroup . WithNErrs ( len ( disks ) )
// Start writing `xl.meta` to all disks in parallel.
for index := range disks {
index := index
g . Go ( func ( ) error {
if disks [ index ] == nil {
return errDiskNotFound
}
// Pick one FileInfo for a disk at index.
2021-04-03 12:03:42 -04:00
fi := files [ index ]
fi . Erasure . Index = index + 1
if fi . IsValid ( ) {
2024-01-30 15:43:25 -05:00
return disks [ index ] . WriteMetadata ( ctx , origbucket , bucket , prefix , fi )
2021-04-03 12:03:42 -04:00
}
2024-04-28 13:53:50 -04:00
return errFileCorrupt
2020-06-12 23:04:01 -04:00
} , index )
}
// Wait for all the routines.
mErrs := g . Wait ( )
err := reduceWriteQuorumErrs ( ctx , mErrs , objectOpIgnoredErrs , quorum )
2024-08-12 04:38:15 -04:00
if err != nil && revert {
ng := errgroup . WithNErrs ( len ( disks ) )
for index := range disks {
if mErrs [ index ] != nil {
continue
}
index := index
ng . Go ( func ( ) error {
if disks [ index ] == nil {
return errDiskNotFound
}
return disks [ index ] . Delete ( ctx , bucket , pathJoin ( prefix , xlStorageFormatFile ) , DeleteOptions {
Recursive : true ,
} )
} , index )
}
ng . Wait ( )
}
2020-06-12 23:04:01 -04:00
return evalDisks ( disks , mErrs ) , err
}
2024-08-12 04:38:15 -04:00
func writeAllMetadata ( ctx context . Context , disks [ ] StorageAPI , origbucket , bucket , prefix string , files [ ] FileInfo , quorum int ) ( [ ] StorageAPI , error ) {
return writeAllMetadataWithRevert ( ctx , disks , origbucket , bucket , prefix , files , quorum , true )
}
// writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently.
func writeUniqueFileInfo ( ctx context . Context , disks [ ] StorageAPI , origbucket , bucket , prefix string , files [ ] FileInfo , quorum int ) ( [ ] StorageAPI , error ) {
return writeAllMetadataWithRevert ( ctx , disks , origbucket , bucket , prefix , files , quorum , false )
}
2023-04-14 19:23:28 -04:00
func commonParity ( parities [ ] int , defaultParityCount int ) int {
N := len ( parities )
2022-10-12 19:42:45 -04:00
occMap := make ( map [ int ] int )
for _ , p := range parities {
occMap [ p ] ++
}
2023-05-26 12:57:44 -04:00
var maxOcc , cparity int
2022-10-12 19:42:45 -04:00
for parity , occ := range occMap {
if parity == - 1 {
// Ignore non defined parity
continue
}
2023-04-14 19:23:28 -04:00
readQuorum := N - parity
if defaultParityCount > 0 && parity == 0 {
// In this case, parity == 0 implies that this object version is a
// delete marker
readQuorum = N / 2 + 1
}
if occ < readQuorum {
// Ignore this parity since we don't have enough shards for read quorum
continue
}
if occ > maxOcc {
2022-10-12 19:42:45 -04:00
maxOcc = occ
2023-05-26 12:57:44 -04:00
cparity = parity
2022-10-12 19:42:45 -04:00
}
}
if maxOcc == 0 {
// Did not found anything useful
return - 1
}
2023-05-26 12:57:44 -04:00
return cparity
2022-10-12 19:42:45 -04:00
}
func listObjectParities ( partsMetadata [ ] FileInfo , errs [ ] error ) ( parities [ ] int ) {
2024-07-25 17:02:50 -04:00
totalShards := len ( partsMetadata )
2022-10-12 19:42:45 -04:00
parities = make ( [ ] int , len ( partsMetadata ) )
for index , metadata := range partsMetadata {
if errs [ index ] != nil {
parities [ index ] = - 1
continue
}
2023-01-12 16:58:16 -05:00
if ! metadata . IsValid ( ) {
parities [ index ] = - 1
continue
}
2024-07-25 17:02:50 -04:00
//nolint:gocritic
2023-07-12 00:53:49 -04:00
// Delete marker or zero byte objects take highest parity.
if metadata . Deleted || metadata . Size == 0 {
2024-07-25 17:02:50 -04:00
parities [ index ] = totalShards / 2
} else if metadata . TransitionStatus == lifecycle . TransitionComplete {
// For tiered objects, read quorum is N/2+1 to ensure simple majority on xl.meta. It is not equal to EcM because the data integrity is entrusted with the warm tier.
parities [ index ] = totalShards - ( totalShards / 2 + 1 )
2023-05-26 12:57:44 -04:00
} else {
2023-01-12 16:58:16 -05:00
parities [ index ] = metadata . Erasure . ParityBlocks
}
2022-10-12 19:42:45 -04:00
}
return
}
2020-06-12 23:04:01 -04:00
// Returns per object readQuorum and writeQuorum
// readQuorum is the min required disks to read data.
// writeQuorum is the min required disks to write data.
2021-01-16 15:08:02 -05:00
func objectQuorumFromMeta ( ctx context . Context , partsMetaData [ ] FileInfo , errs [ ] error , defaultParityCount int ) ( objectReadQuorum , objectWriteQuorum int , err error ) {
2024-01-18 02:03:17 -05:00
// There should be at least half correct entries, if not return failure
2022-10-12 19:42:45 -04:00
expectedRQuorum := len ( partsMetaData ) / 2
if defaultParityCount == 0 {
// if parity count is '0', we expected all entries to be present.
expectedRQuorum = len ( partsMetaData )
2020-06-12 23:04:01 -04:00
}
2022-10-12 19:42:45 -04:00
reducedErr := reduceReadQuorumErrs ( ctx , errs , objectOpIgnoredErrs , expectedRQuorum )
if reducedErr != nil {
return - 1 , - 1 , reducedErr
2022-02-04 05:47:36 -05:00
}
2022-10-12 19:42:45 -04:00
// special case when parity is '0'
if defaultParityCount == 0 {
return len ( partsMetaData ) , len ( partsMetaData ) , nil
2020-08-03 15:15:08 -04:00
}
2022-10-12 19:42:45 -04:00
parities := listObjectParities ( partsMetaData , errs )
2023-04-14 19:23:28 -04:00
parityBlocks := commonParity ( parities , defaultParityCount )
2022-10-12 19:42:45 -04:00
if parityBlocks < 0 {
2024-04-25 20:31:12 -04:00
return - 1 , - 1 , InsufficientReadQuorum { Err : errErasureReadQuorum , Type : RQInsufficientOnlineDrives }
2022-02-04 05:47:36 -05:00
}
2022-10-12 19:42:45 -04:00
dataBlocks := len ( partsMetaData ) - parityBlocks
2020-08-03 15:15:08 -04:00
writeQuorum := dataBlocks
if dataBlocks == parityBlocks {
2021-01-16 15:08:02 -05:00
writeQuorum ++
2020-08-03 15:15:08 -04:00
}
2020-06-12 23:04:01 -04:00
// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
// from latestFileInfo to get the quorum
2020-08-03 15:15:08 -04:00
return dataBlocks , writeQuorum , nil
2020-06-12 23:04:01 -04:00
}
2021-06-30 22:32:07 -04:00
const (
tierFVID = "tier-free-versionID"
tierFVMarker = "tier-free-marker"
2024-03-02 00:11:03 -05:00
tierSkipFVID = "tier-skip-fvid"
2021-06-30 22:32:07 -04:00
)
// SetTierFreeVersionID sets free-version's versionID. This method is used by
// object layer to pass down a versionID to set for a free-version that may be
// created.
func ( fi * FileInfo ) SetTierFreeVersionID ( versionID string ) {
if fi . Metadata == nil {
fi . Metadata = make ( map [ string ] string )
}
fi . Metadata [ ReservedMetadataPrefixLower + tierFVID ] = versionID
}
// TierFreeVersionID returns the free-version's version id.
func ( fi * FileInfo ) TierFreeVersionID ( ) string {
return fi . Metadata [ ReservedMetadataPrefixLower + tierFVID ]
}
// SetTierFreeVersion sets fi as a free-version. This method is used by
// lower layers to indicate a free-version.
func ( fi * FileInfo ) SetTierFreeVersion ( ) {
if fi . Metadata == nil {
fi . Metadata = make ( map [ string ] string )
}
fi . Metadata [ ReservedMetadataPrefixLower + tierFVMarker ] = ""
}
2024-03-02 00:11:03 -05:00
// SetSkipTierFreeVersion indicates to skip adding a tier free version id.
// Note: Used only when expiring tiered objects and the remote content has
// already been scheduled for deletion
func ( fi * FileInfo ) SetSkipTierFreeVersion ( ) {
if fi . Metadata == nil {
fi . Metadata = make ( map [ string ] string )
}
fi . Metadata [ ReservedMetadataPrefixLower + tierSkipFVID ] = ""
}
// SkipTierFreeVersion returns true if set, false otherwise.
// See SetSkipTierVersion for its purpose.
func ( fi * FileInfo ) SkipTierFreeVersion ( ) bool {
_ , ok := fi . Metadata [ ReservedMetadataPrefixLower + tierSkipFVID ]
return ok
}
2021-06-30 22:32:07 -04:00
// TierFreeVersion returns true if version is a free-version.
func ( fi * FileInfo ) TierFreeVersion ( ) bool {
_ , ok := fi . Metadata [ ReservedMetadataPrefixLower + tierFVMarker ]
return ok
}
2021-09-18 16:31:35 -04:00
2022-02-17 18:05:19 -05:00
// IsRestoreObjReq returns true if fi corresponds to a RestoreObject request.
func ( fi * FileInfo ) IsRestoreObjReq ( ) bool {
if restoreHdr , ok := fi . Metadata [ xhttp . AmzRestore ] ; ok {
if restoreStatus , err := parseRestoreObjStatus ( restoreHdr ) ; err == nil {
if ! restoreStatus . Ongoing ( ) {
return true
}
}
}
return false
}
2021-09-18 16:31:35 -04:00
// VersionPurgeStatus returns overall version purge status for this object version across targets
func ( fi * FileInfo ) VersionPurgeStatus ( ) VersionPurgeStatusType {
return fi . ReplicationState . CompositeVersionPurgeStatus ( )
}
2022-12-29 01:48:33 -05:00
// ReplicationStatus returns overall version replication status for this object version across targets
func ( fi * FileInfo ) ReplicationStatus ( ) replication . StatusType {
return fi . ReplicationState . CompositeReplicationStatus ( )
}
2021-09-18 16:31:35 -04:00
// DeleteMarkerReplicationStatus returns overall replication status for this delete marker version across targets
func ( fi * FileInfo ) DeleteMarkerReplicationStatus ( ) replication . StatusType {
if fi . Deleted {
return fi . ReplicationState . CompositeReplicationStatus ( )
}
return replication . StatusType ( "" )
}
// GetInternalReplicationState is a wrapper method to fetch internal replication state from the map m
func GetInternalReplicationState ( m map [ string ] [ ] byte ) ReplicationState {
m1 := make ( map [ string ] string , len ( m ) )
for k , v := range m {
m1 [ k ] = string ( v )
}
return getInternalReplicationState ( m1 )
}
// getInternalReplicationState fetches internal replication state from the map m
func getInternalReplicationState ( m map [ string ] string ) ReplicationState {
2021-11-18 15:15:22 -05:00
d := ReplicationState { }
2021-09-18 16:31:35 -04:00
for k , v := range m {
switch {
case equals ( k , ReservedMetadataPrefixLower + ReplicationTimestamp ) :
2023-06-14 10:49:13 -04:00
d . ReplicaTimeStamp , _ = amztime . ParseReplicationTS ( v )
2021-09-18 16:31:35 -04:00
case equals ( k , ReservedMetadataPrefixLower + ReplicaTimestamp ) :
2023-06-14 10:49:13 -04:00
d . ReplicaTimeStamp , _ = amztime . ParseReplicationTS ( v )
2021-09-18 16:31:35 -04:00
case equals ( k , ReservedMetadataPrefixLower + ReplicaStatus ) :
d . ReplicaStatus = replication . StatusType ( v )
case equals ( k , ReservedMetadataPrefixLower + ReplicationStatus ) :
d . ReplicationStatusInternal = v
d . Targets = replicationStatusesMap ( v )
case equals ( k , VersionPurgeStatusKey ) :
d . VersionPurgeStatusInternal = v
d . PurgeTargets = versionPurgeStatusesMap ( v )
case strings . HasPrefix ( k , ReservedMetadataPrefixLower + ReplicationReset ) :
arn := strings . TrimPrefix ( k , fmt . Sprintf ( "%s-" , ReservedMetadataPrefixLower + ReplicationReset ) )
2021-11-18 15:15:22 -05:00
if d . ResetStatusesMap == nil {
d . ResetStatusesMap = make ( map [ string ] string , 1 )
}
2021-09-18 16:31:35 -04:00
d . ResetStatusesMap [ arn ] = v
}
}
return d
}