minio/cmd/xl-storage-format-v2.go
Klaus Post 0012ca8ca5
Fix inconsistent metadata after healing (#14125)
When calculating signatures, empty part ETags were not discarded, leading
to a different signature compared to freshly created ones.

This would mean that after a heal signature of the healed metadata would be 
different. Fixing the calculation of signature will make these consistent.

Furthermore, when inconsistent entries with a zero version ID had the same
mod times but different signatures, the one with the lowest signature would
be picked for the quorum check. Since this is 50/50, we fall back to a simple
quorum count on all signatures.

Each of these fixes by themselves will lead to quorum. Tests were added 
for regressions and expected outcomes.
2022-01-19 10:48:00 -08:00

1981 lines
57 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
"sort"
"strings"
"sync"
"time"
"github.com/cespare/xxhash/v2"
"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/replication"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/tinylib/msgp/msgp"
)
var (
	// XL header specifies the format.
	// checkXL2V1 compares the first four bytes of a buffer against this value.
	xlHeader = [4]byte{'X', 'L', '2', ' '}
	// Current version being written.
	// Filled in by init() as little-endian uint16 major followed by uint16 minor.
	xlVersionCurrent [4]byte
)
//go:generate msgp -file=$GOFILE -unexported
//go:generate stringer -type VersionType,ErasureAlgo -output=xl-storage-format-v2_string.go $GOFILE
const (
	// Breaking changes.
	// Newer versions cannot be read by older software.
	// This will prevent downgrades to incompatible versions.
	// checkXL2V1 rejects any buffer whose major version is greater than this value.
	xlVersionMajor = 1
	// Non breaking changes.
	// Bumping this is informational, but should be done
	// if any change is made to the data stored, bumping this
	// will allow to detect the exact version later.
	xlVersionMinor = 3
)
// init encodes the current major and minor version numbers into
// xlVersionCurrent as two consecutive little-endian uint16 values.
func init() {
	le := binary.LittleEndian
	le.PutUint16(xlVersionCurrent[:2], xlVersionMajor)
	le.PutUint16(xlVersionCurrent[2:], xlVersionMinor)
}
// The []journal contains all the different versions of the object.
//
// This array can have 3 kinds of objects:
//
// ``object``: If the object is uploaded the usual way: putobject, multipart-put, copyobject
//
// ``delete``: This is the delete-marker
//
// ``legacyObject``: This is the legacy object in xlV1 format, preserved until it's overwritten
//
// The most recently updated element in the array is considered the latest version.
// In addition to these we have a special kind called free-version. This is represented
// using a delete-marker and MetaSys entries. It's used to track tiered content of a
// deleted/overwritten version. This version is visible _only_ to the scanner routine, for subsequent deletion.
// This kind of tracking is necessary since a version's tiered content is deleted asynchronously.
// Backend directory tree structure:
// disk1/
// └── bucket
// └── object
// ├── a192c1d5-9bd5-41fd-9a90-ab10e165398d
// │ └── part.1
// ├── c06e0436-f813-447e-ae5e-f2564df9dfd4
// │ └── part.1
// ├── df433928-2dcf-47b1-a786-43efa0f6b424
// │ └── part.1
// ├── legacy
// │ └── part.1
// └── xl.meta
// VersionType identifies which kind of journal entry a version holds.
type VersionType uint8

// Known journal entry kinds. invalidVersionType and lastVersionType are
// exclusive sentinels that bracket the usable range.
const (
	invalidVersionType VersionType = 0
	ObjectType         VersionType = 1
	DeleteType         VersionType = 2
	LegacyType         VersionType = 3
	lastVersionType    VersionType = 4
)

// valid reports whether e lies strictly between the two sentinel values.
func (e VersionType) valid() bool {
	return invalidVersionType < e && e < lastVersionType
}
// ErasureAlgo identifies the erasure coding algorithm of an object version.
type ErasureAlgo uint8

// Supported erasure coding algorithms. invalidErasureAlgo and lastErasureAlgo
// are exclusive sentinels that bracket the usable range.
const (
	invalidErasureAlgo ErasureAlgo = 0
	ReedSolomon        ErasureAlgo = 1
	lastErasureAlgo    ErasureAlgo = 2
)

// valid reports whether e lies strictly between the two sentinel values.
func (e ErasureAlgo) valid() bool {
	return invalidErasureAlgo < e && e < lastErasureAlgo
}
// ChecksumAlgo identifies the bitrot checksum algorithm of an object version.
type ChecksumAlgo uint8

// Supported checksum algorithms. invalidChecksumAlgo and lastChecksumAlgo
// are exclusive sentinels that bracket the usable range.
const (
	invalidChecksumAlgo ChecksumAlgo = 0
	HighwayHash         ChecksumAlgo = 1
	lastChecksumAlgo    ChecksumAlgo = 2
)

// valid reports whether e lies strictly between the two sentinel values.
func (e ChecksumAlgo) valid() bool {
	return invalidChecksumAlgo < e && e < lastChecksumAlgo
}
// xlMetaV2DeleteMarker defines the data struct for the delete marker journal type.
// It is the payload of an xlMetaV2Version whose Type is DeleteType.
type xlMetaV2DeleteMarker struct {
	VersionID [16]byte          `json:"ID" msg:"ID"`                               // Version ID for delete marker; all zero means the "null" version
	ModTime   int64             `json:"MTime" msg:"MTime"`                         // Object delete marker modified time, in Unix nanoseconds
	MetaSys   map[string][]byte `json:"MetaSys,omitempty" msg:"MetaSys,omitempty"` // Delete marker internal metadata
}
// xlMetaV2Object defines the data struct for object journal type.
// It is the payload of an xlMetaV2Version whose Type is ObjectType.
// The Part* slices are indexed by part position; ToFileInfo reads them in parallel.
// Fields tagged "allownil" may be nil after decoding.
type xlMetaV2Object struct {
	VersionID          [16]byte          `json:"ID" msg:"ID"`                                    // Version ID; all zero means the "null" version
	DataDir            [16]byte          `json:"DDir" msg:"DDir"`                                // Data dir ID
	ErasureAlgorithm   ErasureAlgo       `json:"EcAlgo" msg:"EcAlgo"`                            // Erasure coding algorithm
	ErasureM           int               `json:"EcM" msg:"EcM"`                                  // Erasure data blocks
	ErasureN           int               `json:"EcN" msg:"EcN"`                                  // Erasure parity blocks
	ErasureBlockSize   int64             `json:"EcBSize" msg:"EcBSize"`                          // Erasure block size
	ErasureIndex       int               `json:"EcIndex" msg:"EcIndex"`                          // Erasure disk index; varies per disk and is zeroed when computing Signature
	ErasureDist        []uint8           `json:"EcDist" msg:"EcDist"`                            // Erasure distribution
	BitrotChecksumAlgo ChecksumAlgo      `json:"CSumAlgo" msg:"CSumAlgo"`                        // Bitrot checksum algo
	PartNumbers        []int             `json:"PartNums" msg:"PartNums"`                        // Part Numbers
	PartETags          []string          `json:"PartETags" msg:"PartETags,allownil"`             // Part ETags
	PartSizes          []int64           `json:"PartSizes" msg:"PartSizes"`                      // Part Sizes
	PartActualSizes    []int64           `json:"PartASizes,omitempty" msg:"PartASizes,allownil"` // Part ActualSizes (compression)
	Size               int64             `json:"Size" msg:"Size"`                                // Object version size
	ModTime            int64             `json:"MTime" msg:"MTime"`                              // Object version modified time, in Unix nanoseconds
	MetaSys            map[string][]byte `json:"MetaSys,omitempty" msg:"MetaSys,allownil"`       // Object version internal metadata
	MetaUser           map[string]string `json:"MetaUsr,omitempty" msg:"MetaUsr,allownil"`       // Object version metadata set by user
}
// xlMetaV2Version describes the journal entry. Type defines
// the current journal entry type; the other fields might be nil based
// on what the Type field carries, so it is imperative for the caller
// to verify the journal type first before accessing the rest of the fields.
type xlMetaV2Version struct {
	Type         VersionType           `json:"Type" msg:"Type"`                         // Selects which of the pointers below is populated
	ObjectV1     *xlMetaV1Object       `json:"V1Obj,omitempty" msg:"V1Obj,omitempty"`   // Used when Type is LegacyType
	ObjectV2     *xlMetaV2Object       `json:"V2Obj,omitempty" msg:"V2Obj,omitempty"`   // Used when Type is ObjectType
	DeleteMarker *xlMetaV2DeleteMarker `json:"DelObj,omitempty" msg:"DelObj,omitempty"` // Used when Type is DeleteType
}
// xlFlags is a bit set of properties recorded for a version.
// This can be extended up to 64 bits without breaking compatibility.
type xlFlags uint8

// Individual flag bits, assigned in increasing bit order.
const (
	xlFlagFreeVersion xlFlags = 1 << iota
	xlFlagUsesDataDir
	xlFlagInlineData
)

// String renders the set flags as a comma separated list of short names
// in bit order. Unknown bits are ignored; no flags yields an empty string.
func (x xlFlags) String() string {
	labels := [...]struct {
		bit  xlFlags
		text string
	}{
		{xlFlagFreeVersion, "FreeVersion"},
		{xlFlagUsesDataDir, "UsesDD"},
		{xlFlagInlineData, "Inline"},
	}
	var out strings.Builder
	for _, l := range labels {
		if x&l.bit == 0 {
			continue
		}
		if out.Len() > 0 {
			out.WriteByte(',')
		}
		out.WriteString(l.text)
	}
	return out.String()
}
// checkXL2V1 will check if the metadata has correct header and is a known major version.
// The remaining payload and versions are returned.
//
// The first 4 bytes must equal xlHeader. The next 4 bytes carry the version:
// either the legacy textual marker '1' followed by three spaces (reported as 1.0),
// or two little-endian uint16 values (major, minor).
//
// An error is returned when buf holds no more than the 8 header bytes, when the
// magic does not match, or when the major version is newer than xlVersionMajor.
// In the last case the payload and decoded versions are still returned.
func checkXL2V1(buf []byte) (payload []byte, major, minor uint16, err error) {
	if len(buf) <= 8 {
		return payload, 0, 0, errors.New("xlMeta: no data")
	}
	if !bytes.Equal(buf[:4], xlHeader[:]) {
		return payload, 0, 0, fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], buf[:4])
	}

	// The legacy marker is spelled out byte by byte: it must be exactly as long
	// as the 4-byte version field, otherwise the comparison can never succeed.
	legacyV1 := [4]byte{'1', ' ', ' ', ' '}
	if bytes.Equal(buf[4:8], legacyV1[:]) {
		// Set as 1,0.
		major, minor = 1, 0
	} else {
		major, minor = binary.LittleEndian.Uint16(buf[4:6]), binary.LittleEndian.Uint16(buf[6:8])
	}
	if major > xlVersionMajor {
		return buf[8:], major, minor, fmt.Errorf("xlMeta: unknown major version %d found", major)
	}
	return buf[8:], major, minor, nil
}
// isXL2V1Format reports whether buf passes checkXL2V1, i.e. it carries
// the XL2 header and a major version this code can read.
func isXL2V1Format(buf []byte) bool {
	if _, _, _, err := checkXL2V1(buf); err != nil {
		return false
	}
	return true
}
// xlMetaV2VersionHeader is the shallow, fixed-size summary of a version that
// xlMetaV2Version.header() produces and that is stored next to the full
// version metadata. It is comparable with ==.
//
//msgp:tuple xlMetaV2VersionHeader
type xlMetaV2VersionHeader struct {
	VersionID [16]byte    // Version ID; all zero for the "null" version and for legacy entries
	ModTime   int64       // Modification time in Unix nanoseconds
	Signature [4]byte     // Content signature, expected to be the same across all disks
	Type      VersionType // Journal entry type
	Flags     xlFlags     // Flags derived from the version content
}
// String returns a single-line, human readable rendering of the header.
// The version ID and signature are hex encoded and the modification time
// is shown as a time.Time built from the nanosecond timestamp.
func (x xlMetaV2VersionHeader) String() string {
	var (
		id  = hex.EncodeToString(x.VersionID[:])
		sig = hex.EncodeToString(x.Signature[:])
		mod = time.Unix(0, x.ModTime)
	)
	return fmt.Sprintf("Type: %s, VersionID: %s, Signature: %s, ModTime: %s, Flags: %s",
		x.Type.String(), id, sig, mod, x.Flags.String())
}
// matchesNotStrict reports whether x and o refer to the same version:
// their version IDs and types must be equal.
// When the version ID is all zero, the mod times must match as well.
func (x xlMetaV2VersionHeader) matchesNotStrict(o xlMetaV2VersionHeader) bool {
	if x.VersionID != o.VersionID || x.Type != o.Type {
		return false
	}
	if x.VersionID == [16]byte{} {
		// Zero IDs cannot tell versions apart, so fall back to the mod time.
		return x.ModTime == o.ModTime
	}
	return true
}
// sortsBefore can be used as a tiebreaker for stable sorting/selecting.
// Returns false on ties.
//
// Ordering criteria, applied in turn until one differs:
// newest mod time first, lower type first, then greater signature,
// greater version ID and greater flags first.
func (x xlMetaV2VersionHeader) sortsBefore(o xlMetaV2VersionHeader) bool {
	switch {
	case x == o:
		return false
	case x.ModTime != o.ModTime:
		// Prefer newest modtime.
		return x.ModTime > o.ModTime
	case x.Type != o.Type:
		// The remaining criteria carry little meaning of their own,
		// they only exist to keep the order consistent. Prefer lower types.
		return x.Type < o.Type
	}
	// Consistent sort on signature.
	if c := bytes.Compare(x.Signature[:], o.Signature[:]); c != 0 {
		return c > 0
	}
	// On ID mismatch.
	if c := bytes.Compare(x.VersionID[:], o.VersionID[:]); c != 0 {
		return c > 0
	}
	// Flags; equal flags compare as false, which is the tie result.
	return x.Flags > o.Flags
}
// Valid reports whether the version has a known type and the payload
// matching that type is present and passes its basic checks.
func (j xlMetaV2Version) Valid() bool {
	if !j.Type.valid() {
		return false
	}
	switch j.Type {
	case LegacyType:
		if j.ObjectV1 == nil {
			return false
		}
		return j.ObjectV1.valid()
	case ObjectType:
		obj := j.ObjectV2
		if obj == nil {
			return false
		}
		if !obj.ErasureAlgorithm.valid() || !obj.BitrotChecksumAlgo.valid() {
			return false
		}
		if !isXLMetaErasureInfoValid(obj.ErasureM, obj.ErasureN) {
			return false
		}
		return obj.ModTime > 0
	case DeleteType:
		if j.DeleteMarker == nil {
			return false
		}
		return j.DeleteMarker.ModTime > 0
	}
	return false
}
// header will return a shallow header of the version.
// Flags are derived from the content: the free-version flag from FreeVersion(),
// and for object versions the data-dir and inline-data flags.
func (j *xlMetaV2Version) header() xlMetaV2VersionHeader {
	var flags xlFlags
	if j.FreeVersion() {
		flags |= xlFlagFreeVersion
	}
	// The remaining flags only apply to object versions.
	if j.Type == ObjectType {
		if j.ObjectV2.UsesDataDir() {
			flags |= xlFlagUsesDataDir
		}
		if j.ObjectV2.InlineData() {
			flags |= xlFlagInlineData
		}
	}
	hdr := xlMetaV2VersionHeader{
		VersionID: j.getVersionID(),
		ModTime:   j.getModTime().UnixNano(),
		Signature: j.getSignature(),
		Type:      j.Type,
	}
	hdr.Flags = flags
	return hdr
}
// FreeVersion reports whether the free-version flag is set in the header.
func (x xlMetaV2VersionHeader) FreeVersion() bool {
	return x.Flags&xlFlagFreeVersion == xlFlagFreeVersion
}
// UsesDataDir reports whether the header is flagged as an object version
// that uses its data directory for its contents.
func (x xlMetaV2VersionHeader) UsesDataDir() bool {
	return x.Flags&xlFlagUsesDataDir == xlFlagUsesDataDir
}
// InlineData reports whether the inline-data flag is set in the header.
// Note that false does not mean there is no inline data,
// only that it is unlikely.
func (x xlMetaV2VersionHeader) InlineData() bool {
	return x.Flags&xlFlagInlineData == xlFlagInlineData
}
// signatureErr is a signature returned when an error occurs.
// getSignature returns it for versions whose type is not one of the known types.
var signatureErr = [4]byte{'e', 'r', 'r', 0}
// getSignature will return a signature that is expected to be the same across all disks.
// It delegates to the payload matching the version type; signatureErr is
// returned for any other type.
func (j xlMetaV2Version) getSignature() [4]byte {
	if j.Type == ObjectType {
		return j.ObjectV2.Signature()
	}
	if j.Type == DeleteType {
		return j.DeleteMarker.Signature()
	}
	if j.Type == LegacyType {
		return j.ObjectV1.Signature()
	}
	return signatureErr
}
// getModTime will return the ModTime of the underlying version.
// Object and delete marker timestamps are stored as Unix nanoseconds and are
// converted; legacy objects already carry a time.Time. Unknown types yield
// the zero time.
func (j xlMetaV2Version) getModTime() time.Time {
	var nanos int64
	switch j.Type {
	case ObjectType:
		nanos = j.ObjectV2.ModTime
	case DeleteType:
		nanos = j.DeleteMarker.ModTime
	case LegacyType:
		return j.ObjectV1.Stat.ModTime
	default:
		return time.Time{}
	}
	return time.Unix(0, nanos)
}
// getVersionID will return the versionID of the underlying version.
// Legacy entries and unknown types always report the all-zero ID.
func (j xlMetaV2Version) getVersionID() [16]byte {
	switch j.Type {
	case ObjectType:
		return j.ObjectV2.VersionID
	case DeleteType:
		return j.DeleteMarker.VersionID
	default:
		// LegacyType included: legacy objects carry no version ID.
		return [16]byte{}
	}
}
// ToFileInfo returns FileInfo of the underlying type.
// errFileNotFound is returned when the version type is not a known type.
func (j *xlMetaV2Version) ToFileInfo(volume, path string) (FileInfo, error) {
	if j.Type == ObjectType {
		return j.ObjectV2.ToFileInfo(volume, path)
	}
	if j.Type == DeleteType {
		return j.DeleteMarker.ToFileInfo(volume, path)
	}
	if j.Type == LegacyType {
		return j.ObjectV1.ToFileInfo(volume, path)
	}
	return FileInfo{}, errFileNotFound
}
const (
	// xlHeaderVersion is the version header format version written by AppendTo;
	// decodeXLHeaders rejects anything newer.
	xlHeaderVersion = 2
	// xlMetaVersion is the version metadata format version written by AppendTo;
	// decodeXLHeaders rejects anything newer.
	xlMetaVersion = 1
)
// ToFileInfo converts the delete marker into a FileInfo for the given volume
// and path. The result is always marked as Deleted. For free versions the
// tier related fields are filled from the internal metadata as well.
func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error) {
	fi := FileInfo{
		Volume:  volume,
		Name:    path,
		ModTime: time.Unix(0, j.ModTime).UTC(),
		Deleted: true,
	}
	// An all-zero ID is the "null" version, reported as an empty string.
	if id := uuid.UUID(j.VersionID); id != (uuid.UUID{}) {
		fi.VersionID = id.String()
	}
	fi.ReplicationState = GetInternalReplicationState(j.MetaSys)
	if !j.FreeVersion() {
		return fi, nil
	}
	fi.SetTierFreeVersion()
	fi.TransitionTier = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionTier])
	fi.TransitionedObjName = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName])
	fi.TransitionVersionID = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID])
	return fi, nil
}
// Signature will return a signature that is expected to be the same across all disks.
// The internal metadata map is hashed on its own and XORed with a hash of the
// marshaled remainder; the 64 bit result is folded into 4 bytes.
// A marshal error is ignored, leaving only the metadata hash.
func (j *xlMetaV2DeleteMarker) Signature() [4]byte {
	// Work on a shallow copy so the receiver is left untouched.
	stripped := *j
	sum := hashDeterministicBytes(stripped.MetaSys)
	// The map has been accounted for above; exclude it from the marshaled form.
	stripped.MetaSys = nil
	encoded, err := stripped.MarshalMsg(metaDataPoolGet())
	if err == nil {
		sum ^= xxhash.Sum64(encoded)
		metaDataPoolPut(encoded)
	}
	// Fold upper and lower half together.
	var sig [4]byte
	binary.LittleEndian.PutUint32(sig[:], uint32(sum^(sum>>32)))
	return sig
}
// UsesDataDir returns true if this object version uses its data directory for
// its contents and false otherwise.
// A version that is not marked as completely transitioned always does;
// a transitioned one only when isRestoredObjectOnDisk reports true for
// its user metadata.
func (j xlMetaV2Object) UsesDataDir() bool {
	status := j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus]
	transitioned := bytes.Equal(status, []byte(lifecycle.TransitionComplete))
	return !transitioned || isRestoredObjectOnDisk(j.MetaUser)
}
// InlineData reports whether the "inline-data" key is present in the
// internal metadata.
// Note that false does not mean there is no inline data,
// only that it is unlikely.
func (j xlMetaV2Object) InlineData() bool {
	if _, found := j.MetaSys[ReservedMetadataPrefixLower+"inline-data"]; found {
		return true
	}
	return false
}
// SetTransition records the transition (tiering) state of fi in the internal
// metadata of the object: status, transitioned object name, transitioned
// version ID and tier.
//
// MetaSys may be nil (it is decoded with "allownil"), and writing to a nil map
// panics, so the map is allocated on demand.
func (j *xlMetaV2Object) SetTransition(fi FileInfo) {
	if j.MetaSys == nil {
		j.MetaSys = make(map[string][]byte, 4)
	}
	j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus] = []byte(fi.TransitionStatus)
	j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName)
	j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID)
	j.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier)
}
// RemoveRestoreHdrs deletes the restore related headers from the
// user metadata of the object.
func (j *xlMetaV2Object) RemoveRestoreHdrs() {
	for _, hdr := range []string{
		xhttp.AmzRestore,
		xhttp.AmzRestoreExpiryDays,
		xhttp.AmzRestoreRequestDate,
	} {
		delete(j.MetaUser, hdr)
	}
}
// Signature will return a signature that is expected to be the same across all disks.
//
// Fields that differ per disk or that have equivalent encodings are normalized
// on a copy first: the erasure index is zeroed, part ETags are dropped when
// none of them is non-empty, and empty actual sizes become nil. Both metadata
// maps are hashed on their own and XORed with a hash of the marshaled
// remainder; the 64 bit result is folded into 4 bytes.
// A marshal error is ignored, leaving only the metadata hashes.
func (j *xlMetaV2Object) Signature() [4]byte {
	// Work on a shallow copy so the receiver is left untouched.
	norm := *j

	// The erasure index varies across disks.
	norm.ErasureIndex = 0

	// Do not differentiate between nil, zero length and all-empty ETags.
	hasETag := false
	for i := range norm.PartETags {
		if norm.PartETags[i] != "" {
			hasETag = true
			break
		}
	}
	if !hasETag {
		norm.PartETags = nil
	}
	// Same for nil versus zero length actual sizes.
	if len(norm.PartActualSizes) == 0 {
		norm.PartActualSizes = nil
	}

	// 64 bit hash of both metadata maps.
	sum := hashDeterministicString(norm.MetaUser) ^ hashDeterministicBytes(norm.MetaSys)

	// The maps have been accounted for; exclude them from the marshaled form.
	norm.MetaSys, norm.MetaUser = nil, nil
	encoded, err := norm.MarshalMsg(metaDataPoolGet())
	if err == nil {
		sum ^= xxhash.Sum64(encoded)
		metaDataPoolPut(encoded)
	}

	// Fold upper and lower half together.
	var sig [4]byte
	binary.LittleEndian.PutUint32(sig[:], uint32(sum^(sum>>32)))
	return sig
}
// ToFileInfo converts the object version into a FileInfo for the given volume
// and path.
//
// Part numbers, sizes, actual sizes and (when present) ETags are copied into
// fi.Parts. User metadata and reserved internal metadata are merged into
// fi.Metadata, and erasure, replication and transition details are filled in
// from the stored fields.
//
// An error wrapping errFileCorrupt is returned when PartSizes or
// PartActualSizes hold fewer entries than PartNumbers; previously this caused
// an index out of range panic. An error is also returned when there is at
// least one part and BitrotChecksumAlgo is not a known algorithm.
func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) {
	// The per-part slices are indexed in parallel with PartNumbers below.
	// PartActualSizes in particular may be nil after decoding ("allownil").
	nParts := len(j.PartNumbers)
	if len(j.PartSizes) < nParts || len(j.PartActualSizes) < nParts {
		return FileInfo{}, fmt.Errorf("xlMetaV2Object.ToFileInfo: %d part numbers, but %d part sizes and %d actual sizes: %w",
			nParts, len(j.PartSizes), len(j.PartActualSizes), errFileCorrupt)
	}

	versionID := ""
	var uv uuid.UUID
	// check if the version is not "null"
	if j.VersionID != uv {
		versionID = uuid.UUID(j.VersionID).String()
	}
	fi := FileInfo{
		Volume:    volume,
		Name:      path,
		Size:      j.Size,
		ModTime:   time.Unix(0, j.ModTime).UTC(),
		VersionID: versionID,
	}
	fi.Parts = make([]ObjectPartInfo, nParts)
	for i := range fi.Parts {
		fi.Parts[i].Number = j.PartNumbers[i]
		fi.Parts[i].Size = j.PartSizes[i]
		// ETags are optional; never read past the end of a short slice.
		if i < len(j.PartETags) {
			fi.Parts[i].ETag = j.PartETags[i]
		}
		fi.Parts[i].ActualSize = j.PartActualSizes[i]
	}
	// Sized by PartSizes, which is at least as long as fi.Parts (checked above).
	fi.Erasure.Checksums = make([]ChecksumInfo, len(j.PartSizes))
	for i := range fi.Parts {
		fi.Erasure.Checksums[i].PartNumber = fi.Parts[i].Number
		switch j.BitrotChecksumAlgo {
		case HighwayHash:
			fi.Erasure.Checksums[i].Algorithm = HighwayHash256S
			fi.Erasure.Checksums[i].Hash = []byte{}
		default:
			return FileInfo{}, fmt.Errorf("unknown BitrotChecksumAlgo: %v", j.BitrotChecksumAlgo)
		}
	}
	fi.Metadata = make(map[string]string, len(j.MetaUser)+len(j.MetaSys))
	for k, v := range j.MetaUser {
		// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
		if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
			continue
		}
		fi.Metadata[k] = v
	}
	// Only reserved (internal prefix) keys and the version purge status are
	// exposed from the system metadata.
	for k, v := range j.MetaSys {
		switch {
		case strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower), equals(k, VersionPurgeStatusKey):
			fi.Metadata[k] = string(v)
		}
	}
	fi.ReplicationState = getInternalReplicationState(fi.Metadata)
	replStatus := fi.ReplicationState.CompositeReplicationStatus()
	if replStatus != "" {
		fi.Metadata[xhttp.AmzBucketReplicationStatus] = string(replStatus)
	}
	fi.Erasure.Algorithm = j.ErasureAlgorithm.String()
	fi.Erasure.Index = j.ErasureIndex
	fi.Erasure.BlockSize = j.ErasureBlockSize
	fi.Erasure.DataBlocks = j.ErasureM
	fi.Erasure.ParityBlocks = j.ErasureN
	fi.Erasure.Distribution = make([]int, len(j.ErasureDist))
	for i := range j.ErasureDist {
		fi.Erasure.Distribution[i] = int(j.ErasureDist[i])
	}
	fi.DataDir = uuid.UUID(j.DataDir).String()
	if st, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus]; ok {
		fi.TransitionStatus = string(st)
	}
	if o, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName]; ok {
		fi.TransitionedObjName = string(o)
	}
	if rv, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID]; ok {
		fi.TransitionVersionID = string(rv)
	}
	if sc, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionTier]; ok {
		fi.TransitionTier = string(sc)
	}
	return fi, nil
}
// metaDataReadDefault is the most that is read on the initial read.
const metaDataReadDefault = 4 << 10

// metaDataPool holds used metadata byte slices for reuse.
// Fresh slices are empty with a capacity of metaDataReadDefault.
var metaDataPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, metaDataReadDefault)
	},
}

// metaDataPoolGet will return a byte slice with capacity at least metaDataReadDefault.
// It will be length 0.
func metaDataPoolGet() []byte {
	b := metaDataPool.Get().([]byte)
	return b[:0]
}

// metaDataPoolPut will put an unused small buffer back into the pool.
// Buffers with a capacity below metaDataReadDefault, or of four times
// that and more, are dropped instead.
func metaDataPoolPut(buf []byte) {
	c := cap(buf)
	if c < metaDataReadDefault || c >= metaDataReadDefault*4 {
		return
	}
	metaDataPool.Put(buf)
}
// readXLMetaNoData will load the metadata, but skip data segments.
// This should only be used when data is never interesting.
// If data is not xlv2, it is returned in full.
//
// r is read sequentially and size is the total number of bytes available
// from it. At most metaDataReadDefault bytes are read up front; more is read
// only when the header says the metadata is longer.
//
// For versions 1.1 to 1.3 the returned slice ends right after the metadata
// (and its CRC for 1.2+), so inline data is excluded. An error wrapping
// errFileCorrupt is returned when the encoded metadata length exceeds what
// could be read; previously this caused a slice bounds panic.
func readXLMetaNoData(r io.Reader, size int64) ([]byte, error) {
	initial := size
	hasFull := true
	if initial > metaDataReadDefault {
		initial = metaDataReadDefault
		hasFull = false
	}

	buf := metaDataPoolGet()[:initial]
	_, err := io.ReadFull(r, buf)
	if err != nil {
		return nil, fmt.Errorf("readXLMetaNoData.ReadFull: %w", err)
	}

	// readMore grows buf so it holds at least n bytes in total,
	// reading the missing part from r.
	readMore := func(n int64) error {
		has := int64(len(buf))
		if has >= n {
			return nil
		}
		if hasFull || n > size {
			return io.ErrUnexpectedEOF
		}
		extra := n - has
		if int64(cap(buf)) >= n {
			// Extend since we have enough space.
			buf = buf[:n]
		} else {
			buf = append(buf, make([]byte, extra)...)
		}
		_, err := io.ReadFull(r, buf[has:])
		if err != nil {
			if errors.Is(err, io.EOF) {
				// Returned if we read nothing.
				return fmt.Errorf("readXLMetaNoData.readMore: %w", io.ErrUnexpectedEOF)
			}
			return fmt.Errorf("readXLMetaNoData.readMore: %w", err)
		}
		return nil
	}

	tmp, major, minor, err := checkXL2V1(buf)
	if err != nil {
		// Not xlv2 (or unreadable version): return everything.
		err = readMore(size)
		return buf, err
	}
	switch major {
	case 1:
		switch minor {
		case 0:
			// v1.0 has no length prefix, so there is nothing to skip.
			err = readMore(size)
			return buf, err
		case 1, 2, 3:
			sz, tmp, err := msgp.ReadBytesHeader(tmp)
			if err != nil {
				return nil, err
			}
			// Metadata length plus everything in front of it.
			want := int64(sz) + int64(len(buf)-len(tmp))

			// v1.1 does not have CRC.
			if minor < 2 {
				if err := readMore(want); err != nil {
					return nil, err
				}
				return buf[:want], nil
			}

			// CRC is variable length, so we need to truncate exactly that.
			wantMax := want + msgp.Uint32Size
			if wantMax > size {
				wantMax = size
			}
			if err := readMore(wantMax); err != nil {
				return nil, err
			}

			// wantMax was capped at size, so a length field pointing beyond
			// the end of the file leaves buf shorter than want.
			if int64(len(buf)) < want {
				return nil, fmt.Errorf("readXLMetaNoData: buffer shorter than expected (buflen: %d) < (wantlen: %d): %w", len(buf), want, errFileCorrupt)
			}

			tmp = buf[want:]
			_, after, err := msgp.ReadUint32Bytes(tmp)
			if err != nil {
				return nil, err
			}
			// Include the bytes consumed by the CRC.
			want += int64(len(tmp) - len(after))

			return buf[:want], nil

		default:
			return nil, errors.New("unknown minor metadata version")
		}
	default:
		return nil, errors.New("unknown major metadata version")
	}
}
// decodeXLHeaders reads the leading fields of indexed metadata: the header
// format version, the meta format version and the number of versions.
// The unread remainder of buf is returned as b.
//
// An error is returned when a field cannot be decoded, when either format
// version is newer than this code supports, or when the version count is
// negative.
func decodeXLHeaders(buf []byte) (versions int, headerV, metaV uint8, b []byte, err error) {
	hdrVer, buf, err := msgp.ReadUint8Bytes(buf)
	if err != nil {
		return 0, 0, 0, buf, err
	}
	metaVer, buf, err := msgp.ReadUint8Bytes(buf)
	if err != nil {
		return 0, 0, 0, buf, err
	}
	if hdrVer > xlHeaderVersion {
		// Report the offending header version, not the meta version.
		return 0, 0, 0, buf, fmt.Errorf("decodeXLHeaders: Unknown xl header version %d", hdrVer)
	}
	if metaVer > xlMetaVersion {
		return 0, 0, 0, buf, fmt.Errorf("decodeXLHeaders: Unknown xl meta version %d", metaVer)
	}
	versions, buf, err = msgp.ReadIntBytes(buf)
	if err != nil {
		return 0, 0, 0, buf, err
	}
	if versions < 0 {
		return 0, 0, 0, buf, fmt.Errorf("decodeXLHeaders: Negative version count %d", versions)
	}
	return versions, hdrVer, metaVer, buf, nil
}
// decodeVersions will decode a number of versions from a buffer
// and perform a callback for each version in order, newest first.
// Each version is stored as two consecutive byte blobs, header then meta;
// the slices handed to fn reference buf and are not copied.
// Return errDoneForNow to stop processing and return nil.
// Any non-nil error is returned.
func decodeVersions(buf []byte, versions int, fn func(idx int, hdr, meta []byte) error) (err error) {
	for idx := 0; idx < versions; idx++ {
		var hdr, meta []byte // Zero copy bytes
		if hdr, buf, err = msgp.ReadBytesZC(buf); err != nil {
			return err
		}
		if meta, buf, err = msgp.ReadBytesZC(buf); err != nil {
			return err
		}
		cbErr := fn(idx, hdr, meta)
		if cbErr == nil {
			continue
		}
		if cbErr == errDoneForNow {
			// The callback asked to stop early; not a failure.
			return nil
		}
		return cbErr
	}
	return nil
}
// isIndexedMetaV2 returns non-nil result if metadata is indexed.
// If data doesn't validate nil is also returned.
//
// Indexed metadata is version 1.3 or later: a byte blob holding the metadata,
// followed by a uint32 CRC of that blob, followed by inline data. The returned
// slices reference buf. Inline data that fails validation is repaired before
// it is returned.
func isIndexedMetaV2(buf []byte) (meta xlMetaBuf, data xlMetaInlineData) {
	rest, major, minor, err := checkXL2V1(buf)
	if err != nil || major != 1 || minor < 3 {
		return nil, nil
	}
	meta, rest, err = msgp.ReadBytesZC(rest)
	if err != nil {
		return nil, nil
	}
	// Read metadata CRC.
	wantCRC, rest, err := msgp.ReadUint32Bytes(rest)
	if err != nil {
		return nil, nil
	}
	if uint32(xxhash.Sum64(meta)) != wantCRC {
		return nil, nil
	}
	// Whatever follows the CRC is inline data.
	data = rest
	if data.validate() != nil {
		data.repair()
	}
	return meta, data
}
// xlMetaV2ShallowVersion is a version kept in its undecoded form:
// the decoded header plus the marshaled xlMetaV2Version it was derived from.
type xlMetaV2ShallowVersion struct {
	header xlMetaV2VersionHeader // Summary used for searching and sorting
	meta   []byte                // Marshaled full version; may reference the loaded buffer
}
// xlMetaV2 is the in-memory form of an xl.meta file: the list of versions
// of an object together with any inline data.
//
//msgp:ignore xlMetaV2 xlMetaV2ShallowVersion
type xlMetaV2 struct {
	// versions holds all versions; sortByModTime keeps index 0 the latest.
	versions []xlMetaV2ShallowVersion
	// data will contain raw data if any.
	// data will be one or more versions indexed by versionID.
	// To remove all data set to nil.
	data xlMetaInlineData
	// metadata version.
	metaV uint8
}
// LoadOrConvert will load the metadata in the buffer.
// If this is a legacy format, it will automatically be converted to XLV2.
// Anything without a valid XL2 header is parsed as legacy JSON;
// errFileCorrupt is returned when that fails as well.
func (x *xlMetaV2) LoadOrConvert(buf []byte) error {
	if isXL2V1Format(buf) {
		return x.Load(buf)
	}

	var legacy xlMetaV1Object
	if err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(buf, &legacy); err != nil {
		return errFileCorrupt
	}

	// Start from an empty state, keeping the versions storage for reuse.
	x.versions = x.versions[:0]
	x.data = nil
	x.metaV = xlMetaVersion
	return x.AddLegacy(&legacy)
}
// Load all versions of the stored data.
// Note that references to the incoming buffer will be kept.
// Indexed metadata is loaded directly, everything else goes
// through the legacy loader.
func (x *xlMetaV2) Load(buf []byte) error {
	meta, data := isIndexedMetaV2(buf)
	if meta == nil {
		// Convert older format.
		return x.loadLegacy(buf)
	}
	return x.loadIndexed(meta, data)
}
// loadIndexed loads indexed metadata. buf is the metadata blob and data the
// inline data, as returned by isIndexedMetaV2. Only the version headers are
// decoded; the full version metadata is kept as a reference into buf.
// Inline data that fails validation is repaired and the event is logged.
func (x *xlMetaV2) loadIndexed(buf xlMetaBuf, data xlMetaInlineData) error {
	versions, headerV, metaV, buf, err := decodeXLHeaders(buf)
	if err != nil {
		return err
	}

	// Reuse the existing storage when it is big enough.
	if cap(x.versions) < versions {
		x.versions = make([]xlMetaV2ShallowVersion, 0, versions+1)
	}
	x.versions = x.versions[:versions]
	x.data = data
	x.metaV = metaV

	if verr := x.data.validate(); verr != nil {
		x.data.repair()
		logger.Info("xlMetaV2.loadIndexed: data validation failed: %v. %d entries after repair", verr, x.data.entries())
	}

	return decodeVersions(buf, versions, func(i int, hdr, meta []byte) error {
		entry := &x.versions[i]
		if _, uerr := entry.header.unmarshalV(headerV, hdr); uerr != nil {
			return uerr
		}
		entry.meta = meta
		return nil
	})
}
// loadLegacy will load content prior to v1.3
// Note that references to the incoming buffer will be kept.
//
// The versions are decoded one by one from the "Versions" array of the
// metadata map. Each is stored as its derived header plus a slice of the
// original marshaled bytes. All other map keys are skipped.
// On success the versions are sorted by modification time.
func (x *xlMetaV2) loadLegacy(buf []byte) error {
	buf, major, minor, err := checkXL2V1(buf)
	if err != nil {
		return fmt.Errorf("xlMetaV2.Load %w", err)
	}
	var allMeta []byte
	switch major {
	case 1:
		switch minor {
		case 0:
			// v1.0: the whole payload is metadata, there is no inline data.
			allMeta = buf
		case 1, 2:
			// v1.1+: metadata is a byte blob, optionally followed by a CRC and data.
			v, buf, err := msgp.ReadBytesZC(buf)
			if err != nil {
				return fmt.Errorf("xlMetaV2.Load version(%d), bufLen(%d) %w", minor, len(buf), err)
			}
			if minor >= 2 {
				if crc, nbuf, err := msgp.ReadUint32Bytes(buf); err == nil {
					// Read metadata CRC (added in v2)
					buf = nbuf
					if got := uint32(xxhash.Sum64(v)); got != crc {
						return fmt.Errorf("xlMetaV2.Load version(%d), CRC mismatch, want 0x%x, got 0x%x", minor, crc, got)
					}
				} else {
					return fmt.Errorf("xlMetaV2.Load version(%d), loading CRC: %w", minor, err)
				}
			}
			allMeta = v
			// Add remaining data.
			x.data = buf
			if err = x.data.validate(); err != nil {
				x.data.repair()
				logger.Info("xlMetaV2.Load: data validation failed: %v. %d entries after repair", err, x.data.entries())
			}
		default:
			return errors.New("unknown minor metadata version")
		}
	default:
		return errors.New("unknown major metadata version")
	}
	if allMeta == nil {
		return errFileCorrupt
	}
	// bts will shrink as we decode.
	bts := allMeta
	var field []byte
	var zb0001 uint32
	zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
	if err != nil {
		return msgp.WrapError(err, "loadLegacy.ReadMapHeader")
	}
	// tmp is reused as decode target for every version.
	var tmp xlMetaV2Version
	for zb0001 > 0 {
		zb0001--
		field, bts, err = msgp.ReadMapKeyZC(bts)
		if err != nil {
			return msgp.WrapError(err, "loadLegacy.ReadMapKey")
		}
		switch msgp.UnsafeString(field) {
		case "Versions":
			var zb0002 uint32
			zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
			if err != nil {
				return msgp.WrapError(err, "Versions")
			}
			// Reuse the existing storage when it is big enough.
			if cap(x.versions) >= int(zb0002) {
				x.versions = (x.versions)[:zb0002]
			} else {
				x.versions = make([]xlMetaV2ShallowVersion, zb0002, zb0002+1)
			}
			for za0001 := range x.versions {
				// bts is a suffix of allMeta, so the length difference is the
				// offset of the current position within allMeta.
				start := len(allMeta) - len(bts)
				bts, err = tmp.unmarshalV(1, bts)
				if err != nil {
					return msgp.WrapError(err, "Versions", za0001)
				}
				end := len(allMeta) - len(bts)
				// We reference the marshaled data, so we don't have to re-marshal.
				x.versions[za0001] = xlMetaV2ShallowVersion{
					header: tmp.header(),
					meta:   allMeta[start:end],
				}
			}
		default:
			bts, err = msgp.Skip(bts)
			if err != nil {
				return msgp.WrapError(err, "loadLegacy.Skip")
			}
		}
	}
	x.metaV = 1 // Fixed for legacy conversions.
	x.sortByModTime()
	return nil
}
// latestModtime returns the modtime of the latest version, taken from the
// first entry. The zero time is returned for a nil receiver or when there
// are no versions.
func (x *xlMetaV2) latestModtime() time.Time {
	if x != nil && len(x.versions) > 0 {
		return time.Unix(0, x.versions[0].header.ModTime)
	}
	return time.Time{}
}
// addVersion marshals ver and inserts it into x.versions in front of the
// first existing entry whose mod time is not newer, keeping the list ordered
// newest first.
//
// An error is returned when ver is not valid or cannot be marshaled.
// Validation runs before any field of ver is accessed, since the payload
// pointer matching ver.Type may be nil on an invalid version.
func (x *xlMetaV2) addVersion(ver xlMetaV2Version) error {
	if !ver.Valid() {
		return errors.New("attempted to add invalid version")
	}
	modTime := ver.getModTime().UnixNano()

	encoded, err := ver.MarshalMsg(nil)
	if err != nil {
		return err
	}

	// Add space at the end.
	// Will have -1 modtime, so it will be inserted there.
	x.versions = append(x.versions, xlMetaV2ShallowVersion{header: xlMetaV2VersionHeader{ModTime: -1}})

	// Linear search, we likely have to insert at front.
	for i := range x.versions {
		if x.versions[i].header.ModTime <= modTime {
			// Insert at current idx. First move current back.
			copy(x.versions[i+1:], x.versions[i:])
			x.versions[i] = xlMetaV2ShallowVersion{
				header: ver.header(),
				meta:   encoded,
			}
			return nil
		}
	}

	// Not expected to happen. Drop the placeholder again so no bogus
	// entry is left behind.
	x.versions = x.versions[:len(x.versions)-1]
	return fmt.Errorf("addVersion: Internal error, unable to add version")
}
// AppendTo will marshal the data in x and append it to the provided slice.
//
// Layout written after the existing content of dst: the XL header, the current
// version, a "bin 32" blob holding the header/meta format versions, the version
// count and every version as header blob plus meta blob, then a fixed size
// uint32 CRC of that blob, then the inline data.
//
// An error is returned when the inline data does not validate or a version
// header cannot be marshaled.
func (x *xlMetaV2) AppendTo(dst []byte) ([]byte, error) {
	// Header...
	// NOTE(review): len(dst) is included here and again on the next statement;
	// sz is only a capacity estimate, so this merely over-allocates — confirm intent.
	sz := len(xlHeader) + len(xlVersionCurrent) + msgp.ArrayHeaderSize + len(dst) + 3*msgp.Uint32Size
	// Existing + Inline data
	sz += len(dst) + len(x.data)
	// Versions...
	for _, ver := range x.versions {
		sz += 32 + len(ver.meta)
	}
	// Grow once up front, preserving what dst already holds.
	if cap(dst) < sz {
		buf := make([]byte, len(dst), sz)
		copy(buf, dst)
		dst = buf
	}
	if err := x.data.validate(); err != nil {
		return nil, err
	}
	dst = append(dst, xlHeader[:]...)
	dst = append(dst, xlVersionCurrent[:]...)
	// Add "bin 32" type header to always have enough space.
	// We will fill out the correct size when we know it.
	dst = append(dst, 0xc6, 0, 0, 0, 0)
	// dataOffset marks the start of the blob content; the 4 bytes in front
	// of it are the size field patched below.
	dataOffset := len(dst)
	dst = msgp.AppendUint(dst, xlHeaderVersion)
	dst = msgp.AppendUint(dst, xlMetaVersion)
	dst = msgp.AppendInt(dst, len(x.versions))
	// Scratch buffer for marshaling headers, and later the CRC.
	tmp := metaDataPoolGet()
	defer metaDataPoolPut(tmp)
	for _, ver := range x.versions {
		var err error
		// Add header
		tmp, err = ver.header.MarshalMsg(tmp[:0])
		if err != nil {
			return nil, err
		}
		dst = msgp.AppendBytes(dst, tmp)
		// Add full meta
		dst = msgp.AppendBytes(dst, ver.meta)
	}
	// Update size...
	binary.BigEndian.PutUint32(dst[dataOffset-4:dataOffset], uint32(len(dst)-dataOffset))
	// Add CRC of metadata as fixed size (5 bytes)
	// Prior to v1.3 this was variable sized.
	tmp = tmp[:5]
	tmp[0] = 0xce // muint32
	binary.BigEndian.PutUint32(tmp[1:], uint32(xxhash.Sum64(dst[dataOffset:])))
	dst = append(dst, tmp[:5]...)
	return append(dst, x.data...), nil
}
// findVersion looks up the first version whose header carries the version ID
// key and returns its index along with the decoded version.
// When no header matches, -1 and errFileVersionNotFound are returned.
// A decode error is returned together with the index of the match.
func (x *xlMetaV2) findVersion(key [16]byte) (idx int, ver *xlMetaV2Version, err error) {
	for i := range x.versions {
		if x.versions[i].header.VersionID != key {
			continue
		}
		ver, err = x.getIdx(i)
		return i, ver, err
	}
	return -1, nil, errFileVersionNotFound
}
// getIdx decodes and returns the version stored at index idx.
// errFileNotFound is returned when idx is out of range. On a decode error
// the partially decoded version is returned along with the error.
//
// The version ID in the stored header is expected to equal the ID of the
// decoded version; this is not verified here.
func (x *xlMetaV2) getIdx(idx int) (ver *xlMetaV2Version, err error) {
	if idx < 0 || idx >= len(x.versions) {
		return nil, errFileNotFound
	}
	var dst xlMetaV2Version
	_, err = dst.unmarshalV(x.metaV, x.versions[idx].meta)
	return &dst, err
}
// setIdx will replace a version at a given index.
// Note that versions may become re-sorted if modtime changes.
// errFileNotFound is returned when idx is out of range. When marshaling
// fails the stored metadata of the entry is cleared and the error returned.
func (x *xlMetaV2) setIdx(idx int, ver xlMetaV2Version) (err error) {
	if idx < 0 || idx >= len(x.versions) {
		return errFileNotFound
	}
	target := &x.versions[idx]
	oldModTime := target.header.ModTime

	// Reuse the storage of the old metadata. The capacity is capped at the old
	// length, so appending more than that allocates a new array rather than
	// writing past the old metadata in a backing array that may be shared.
	scratch := target.meta[:0:len(target.meta)]
	encoded, err := ver.MarshalMsg(scratch)
	if err != nil {
		target.meta = nil
		return err
	}
	target.meta = encoded
	target.header = ver.header()

	// Only a changed mod time can invalidate the order.
	if target.header.ModTime != oldModTime {
		x.sortByModTime()
	}
	return nil
}
// sortByModTime orders x.versions using header.sortsBefore, which is
// intended to put the latest version at index 0 (modtime descending).
// Nothing is done when there is at most one version or when the slice
// is already in order.
func (x *xlMetaV2) sortByModTime() {
	if len(x.versions) <= 1 {
		return
	}
	less := func(i, j int) bool {
		return x.versions[i].header.sortsBefore(x.versions[j].header)
	}
	// Cheap check first; only sort when needed.
	if sort.SliceIsSorted(x.versions, less) {
		return
	}
	sort.Slice(x.versions, less)
}
// DeleteVersion deletes (or, for replication, updates) the version
// identified by fi.VersionID.
//
// Returns, in order:
//   - the dataDir the caller should delete ("" when there is none, or when
//     another version still references the same dataDir),
//   - whether no versions remain after the operation,
//   - an error; errFileVersionNotFound when the version ID cannot be parsed
//     or no matching version exists and fi.Deleted is not set.
//
// When fi.Deleted is set a delete marker carrying the replication state
// from fi is added in place of (or in addition to) the removed version.
func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
	// This is a situation where versionId is explicitly
	// specified as "null", as we do not save "null"
	// string it is considered empty. But empty also
	// means the version which matches will be purged.
	if fi.VersionID == nullVersionID {
		fi.VersionID = ""
	}

	var uv uuid.UUID
	var err error
	if fi.VersionID != "" {
		uv, err = uuid.Parse(fi.VersionID)
		if err != nil {
			return "", false, errFileVersionNotFound
		}
	}

	// applyReplicationState copies the delete-marker replication state,
	// version purge status and reset statuses from fi into metaSys.
	// metaSys must be non-nil.
	applyReplicationState := func(metaSys map[string][]byte) {
		if !fi.DeleteMarkerReplicationStatus().Empty() {
			switch fi.DeleteMarkerReplicationStatus() {
			case replication.Replica:
				metaSys[ReservedMetadataPrefixLower+ReplicaStatus] = []byte(string(fi.ReplicationState.ReplicaStatus))
				metaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(fi.ReplicationState.ReplicaTimeStamp.Format(http.TimeFormat))
			default:
				metaSys[ReservedMetadataPrefixLower+ReplicationStatus] = []byte(fi.ReplicationState.ReplicationStatusInternal)
				metaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(fi.ReplicationState.ReplicationTimeStamp.Format(http.TimeFormat))
			}
		}
		if !fi.VersionPurgeStatus().Empty() {
			metaSys[VersionPurgeStatusKey] = []byte(fi.ReplicationState.VersionPurgeStatusInternal)
		}
		for k, v := range fi.ReplicationState.ResetStatusesMap {
			metaSys[k] = []byte(v)
		}
	}

	// Prepare the delete marker that may be inserted below.
	var ventry xlMetaV2Version
	if fi.Deleted {
		ventry = xlMetaV2Version{
			Type: DeleteType,
			DeleteMarker: &xlMetaV2DeleteMarker{
				VersionID: uv,
				ModTime:   fi.ModTime.UnixNano(),
				MetaSys:   make(map[string][]byte),
			},
		}
		if !ventry.Valid() {
			return "", false, errors.New("internal error: invalid version entry generated")
		}
	}

	// Decide whether an existing entry should be updated in place
	// instead of being removed.
	updateVersion := false
	if fi.VersionPurgeStatus().Empty() && (fi.DeleteMarkerReplicationStatus() == "REPLICA" || fi.DeleteMarkerReplicationStatus().Empty()) {
		updateVersion = fi.MarkDeleted
	} else {
		// for replication scenario
		if fi.Deleted && fi.VersionPurgeStatus() != Complete {
			if !fi.VersionPurgeStatus().Empty() || fi.DeleteMarkerReplicationStatus().Empty() {
				updateVersion = true
			}
		}
		// object or delete-marker versioned delete is not complete
		if !fi.VersionPurgeStatus().Empty() && fi.VersionPurgeStatus() != Complete {
			updateVersion = true
		}
	}

	if fi.Deleted {
		applyReplicationState(ventry.DeleteMarker.MetaSys)
	}

	// First pass: legacy entries, delete markers and in-place
	// updates of object versions.
	for i, ver := range x.versions {
		if ver.header.VersionID != uv {
			continue
		}
		switch ver.header.Type {
		case LegacyType:
			ver, err := x.getIdx(i)
			if err != nil {
				return "", false, err
			}
			x.versions = append(x.versions[:i], x.versions[i+1:]...)
			if fi.Deleted {
				err = x.addVersion(ventry)
			}
			return ver.ObjectV1.DataDir, len(x.versions) == 0, err
		case DeleteType:
			if updateVersion {
				ver, err := x.getIdx(i)
				if err != nil {
					return "", false, err
				}
				if len(ver.DeleteMarker.MetaSys) == 0 {
					ver.DeleteMarker.MetaSys = make(map[string][]byte)
				}
				applyReplicationState(ver.DeleteMarker.MetaSys)
				err = x.setIdx(i, *ver)
				return "", len(x.versions) == 0, err
			}
			var err error
			x.versions = append(x.versions[:i], x.versions[i+1:]...)
			if fi.MarkDeleted && (fi.VersionPurgeStatus().Empty() || (fi.VersionPurgeStatus() != Complete)) {
				err = x.addVersion(ventry)
			}
			return "", len(x.versions) == 0, err
		case ObjectType:
			if updateVersion && !fi.Deleted {
				ver, err := x.getIdx(i)
				if err != nil {
					return "", false, err
				}
				// The decoded map may be nil when the stored version
				// carried no system metadata; writing to it would panic.
				if ver.ObjectV2.MetaSys == nil {
					ver.ObjectV2.MetaSys = make(map[string][]byte)
				}
				ver.ObjectV2.MetaSys[VersionPurgeStatusKey] = []byte(fi.ReplicationState.VersionPurgeStatusInternal)
				for k, v := range fi.ReplicationState.ResetStatusesMap {
					ver.ObjectV2.MetaSys[k] = []byte(v)
				}
				err = x.setIdx(i, *ver)
				return "", len(x.versions) == 0, err
			}
		}
	}

	// Second pass: object versions that are to be removed, have their
	// restore headers expired, or be marked as transitioned.
	for i, version := range x.versions {
		if version.header.Type != ObjectType || version.header.VersionID != uv {
			continue
		}
		ver, err := x.getIdx(i)
		if err != nil {
			return "", false, err
		}
		switch {
		case fi.ExpireRestored:
			ver.ObjectV2.RemoveRestoreHdrs()
			err = x.setIdx(i, *ver)
		case fi.TransitionStatus == lifecycle.TransitionComplete:
			ver.ObjectV2.SetTransition(fi)
			err = x.setIdx(i, *ver)
		default:
			x.versions = append(x.versions[:i], x.versions[i+1:]...)
			// if uv has tiered content we add a
			// free-version to track it for
			// asynchronous deletion via scanner.
			if freeVersion, toFree := ver.ObjectV2.InitFreeVersion(fi); toFree {
				err = x.addVersion(freeVersion)
			}
		}
		// The error from the switch above is only logged; it is
		// overwritten below when a delete marker is added.
		logger.LogIf(context.Background(), err)

		if fi.Deleted {
			err = x.addVersion(ventry)
		}
		if x.SharedDataDirCount(ver.ObjectV2.VersionID, ver.ObjectV2.DataDir) > 0 {
			// Found that another version references the same dataDir
			// we shouldn't remove it, and only remove the version instead
			return "", len(x.versions) == 0, nil
		}
		return uuid.UUID(ver.ObjectV2.DataDir).String(), len(x.versions) == 0, err
	}

	// No matching version: a delete marker can still be recorded.
	if fi.Deleted {
		err = x.addVersion(ventry)
		return "", false, err
	}
	return "", false, errFileVersionNotFound
}
// xlMetaDataDirDecoder is a shallow decoder for decoding object datadir only.
// It mirrors just the "V2Obj" -> "DDir" path of a serialized version, so
// all other fields of the version are not represented here.
// ObjectV2 is a pointer and is nil-checked by SharedDataDirCount after decoding.
type xlMetaDataDirDecoder struct {
ObjectV2 *struct {
DataDir [16]byte `msg:"DDir"` // Data dir ID
} `msg:"V2Obj,omitempty"`
}
// UpdateObjectVersion updates metadata and modTime for a given
// versionID, NOTE: versionID must be valid and should exist -
// and must not be a DeleteMarker or legacy object, if no
// versionID is specified 'null' versionID is updated instead.
//
// It is callers responsibility to set correct versionID, this
// function shouldn't be further extended to update immutable
// values such as ErasureInfo, ChecksumInfo.
//
// Metadata is only updated to new values, existing values
// stay as is, if you wish to update all values you should
// update all metadata freshly before calling this function
// in-case you wish to clear existing metadata.
//
// Returns the uuid parse error for a malformed version ID,
// errMethodNotAllowed when the version is a delete marker or legacy
// object, and errFileVersionNotFound when no version matches.
func (x *xlMetaV2) UpdateObjectVersion(fi FileInfo) error {
	if fi.VersionID == "" {
		// this means versioning is not yet
		// enabled or suspend i.e all versions
		// are basically default value i.e "null"
		fi.VersionID = nullVersionID
	}

	var uv uuid.UUID
	var err error
	if fi.VersionID != "" && fi.VersionID != nullVersionID {
		uv, err = uuid.Parse(fi.VersionID)
		if err != nil {
			return err
		}
	}

	for i := range x.versions {
		hdr := &x.versions[i].header
		if hdr.VersionID != uv {
			continue
		}
		switch hdr.Type {
		case LegacyType, DeleteType:
			return errMethodNotAllowed
		case ObjectType:
			ver, err := x.getIdx(i)
			if err != nil {
				return err
			}
			obj := ver.ObjectV2
			for k, v := range fi.Metadata {
				// Keys carrying the reserved prefix (case-insensitive) are
				// system metadata, everything else is user metadata.
				if len(k) > len(ReservedMetadataPrefixLower) && strings.EqualFold(k[:len(ReservedMetadataPrefixLower)], ReservedMetadataPrefixLower) {
					// The decoded map may be nil; allocate lazily so an
					// untouched map keeps its original (nil) encoding.
					if obj.MetaSys == nil {
						obj.MetaSys = make(map[string][]byte)
					}
					obj.MetaSys[k] = []byte(v)
				} else {
					if obj.MetaUser == nil {
						obj.MetaUser = make(map[string]string)
					}
					obj.MetaUser[k] = v
				}
			}
			if !fi.ModTime.IsZero() {
				obj.ModTime = fi.ModTime.UnixNano()
			}
			return x.setIdx(i, *ver)
		}
	}

	return errFileVersionNotFound
}
// AddVersion adds the version described by fi. If an entry with the same
// version ID already exists (legacy, object or delete marker) it is
// replaced in place, otherwise a new entry is added.
//
// An empty fi.VersionID is treated as the "null" version. Malformed
// version or data-dir IDs return the uuid parse error.
func (x *xlMetaV2) AddVersion(fi FileInfo) error {
	if fi.VersionID == "" {
		// Versioning is not enabled or is suspended:
		// everything is stored under the default "null" version.
		fi.VersionID = nullVersionID
	}

	var (
		uv, dd uuid.UUID
		err    error
	)
	// The "null" version keeps the zero UUID.
	if fi.VersionID != nullVersionID {
		if uv, err = uuid.Parse(fi.VersionID); err != nil {
			return err
		}
	}
	if fi.DataDir != "" {
		if dd, err = uuid.Parse(fi.DataDir); err != nil {
			return err
		}
	}

	var ventry xlMetaV2Version
	if fi.Deleted {
		ventry.Type = DeleteType
		ventry.DeleteMarker = &xlMetaV2DeleteMarker{
			VersionID: uv,
			ModTime:   fi.ModTime.UnixNano(),
			MetaSys:   make(map[string][]byte),
		}
	} else {
		nParts := len(fi.Parts)
		obj := &xlMetaV2Object{
			VersionID:          uv,
			DataDir:            dd,
			Size:               fi.Size,
			ModTime:            fi.ModTime.UnixNano(),
			ErasureAlgorithm:   ReedSolomon,
			ErasureM:           fi.Erasure.DataBlocks,
			ErasureN:           fi.Erasure.ParityBlocks,
			ErasureBlockSize:   fi.Erasure.BlockSize,
			ErasureIndex:       fi.Erasure.Index,
			BitrotChecksumAlgo: HighwayHash,
			ErasureDist:        make([]uint8, len(fi.Erasure.Distribution)),
			PartNumbers:        make([]int, nParts),
			PartSizes:          make([]int64, nParts),
			PartActualSizes:    make([]int64, nParts),
			MetaSys:            make(map[string][]byte),
			MetaUser:           make(map[string]string, len(fi.Metadata)),
		}
		ventry.Type = ObjectType
		ventry.ObjectV2 = obj

		// PartETags stays nil unless at least one part has an ETag.
		for i := range fi.Parts {
			if fi.Parts[i].ETag != "" {
				obj.PartETags = make([]string, nParts)
				break
			}
		}

		for i, dist := range fi.Erasure.Distribution {
			obj.ErasureDist[i] = uint8(dist)
		}

		for i := range fi.Parts {
			part := &fi.Parts[i]
			obj.PartNumbers[i] = part.Number
			obj.PartSizes[i] = part.Size
			obj.PartActualSizes[i] = part.ActualSize
			if len(obj.PartETags) > 0 {
				obj.PartETags[i] = part.ETag
			}
		}

		tierFVIDKey := ReservedMetadataPrefixLower + tierFVID
		tierFVMarkerKey := ReservedMetadataPrefixLower + tierFVMarker
		for k, v := range fi.Metadata {
			reserved := len(k) > len(ReservedMetadataPrefixLower) &&
				strings.EqualFold(k[:len(ReservedMetadataPrefixLower)], ReservedMetadataPrefixLower)
			switch {
			case !reserved:
				obj.MetaUser[k] = v
			case k == tierFVIDKey || k == tierFVMarkerKey:
				// tierFVID and tierFVMarker are only used for
				// creating a free-version; they are not stored.
			default:
				obj.MetaSys[k] = []byte(v)
			}
		}

		// Store inline data when some was supplied, or for empty objects.
		if len(fi.Data) > 0 || fi.Size == 0 {
			x.data.replace(fi.VersionID, fi.Data)
		}

		// Transition details are written last, so they take precedence
		// over equally named keys copied from fi.Metadata.
		if fi.TransitionStatus != "" {
			obj.MetaSys[ReservedMetadataPrefixLower+TransitionStatus] = []byte(fi.TransitionStatus)
		}
		if fi.TransitionedObjName != "" {
			obj.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName)
		}
		if fi.TransitionVersionID != "" {
			obj.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID)
		}
		if fi.TransitionTier != "" {
			obj.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier)
		}
	}

	if !ventry.Valid() {
		return errors.New("internal error: invalid version entry generated")
	}

	// Replace an existing entry with the same version ID, if any.
	for i := range x.versions {
		hdr := &x.versions[i].header
		if hdr.VersionID != uv {
			continue
		}
		switch hdr.Type {
		case LegacyType, ObjectType, DeleteType:
			// LegacyType: converts the legacy entry into the new
			// ObjectType, i.e. the `null` version is purged.
			// DeleteType: replacing a delete marker with proper object
			// data is not S3 compliant behavior but is kept for
			// future flexibility.
			return x.setIdx(i, ventry)
		}
	}

	// Nothing to replace, add it.
	return x.addVersion(ventry)
}
// SharedDataDirCount returns how many object versions other than
// versionID reference dataDir. Versions that are not of ObjectType, that
// do not use a data dir according to their header, or whose metadata
// cannot be decoded are not counted.
// If inline data is stored for versionID the result is always 0.
func (x *xlMetaV2) SharedDataDirCount(versionID [16]byte, dataDir [16]byte) int {
	// Inlined objects have no data dir to share; skip the check.
	if x.data.entries() > 0 && x.data.find(uuid.UUID(versionID).String()) != nil {
		return 0
	}

	shared := 0
	// Reused across iterations; only the data dir is decoded.
	var probe xlMetaDataDirDecoder
	for i := range x.versions {
		cand := &x.versions[i]
		hdr := &cand.header
		switch {
		case hdr.Type != ObjectType, hdr.VersionID == versionID, !hdr.UsesDataDir():
			continue
		}
		if _, err := probe.UnmarshalMsg(cand.meta); err != nil {
			continue
		}
		if obj := probe.ObjectV2; obj != nil && obj.DataDir == dataDir {
			shared++
		}
	}
	return shared
}
// SharedDataDirCountStr is the string flavour of SharedDataDirCount.
// versionID may be empty or the "null" version ID, both of which map to
// the zero UUID. If either versionID or dataDir cannot be parsed as a
// UUID, 0 is returned.
func (x *xlMetaV2) SharedDataDirCountStr(versionID, dataDir string) int {
	var uv uuid.UUID
	if versionID != "" && versionID != nullVersionID {
		parsed, err := uuid.Parse(versionID)
		if err != nil {
			return 0
		}
		uv = parsed
	}

	ddir, err := uuid.Parse(dataDir)
	if err != nil {
		return 0
	}
	return x.SharedDataDirCount(uv, ddir)
}
// AddLegacy adds a legacy (V1) version. It is only called when no prior
// versions exist and is safe to use by a single function in
// xl-storage (RenameData).
// m.VersionID is overwritten with the "null" version ID.
// Returns errFileCorrupt when m is not valid.
func (x *xlMetaV2) AddLegacy(m *xlMetaV1Object) error {
	if ok := m.valid(); !ok {
		return errFileCorrupt
	}
	m.VersionID = nullVersionID
	legacy := xlMetaV2Version{Type: LegacyType, ObjectV1: m}
	return x.addVersion(legacy)
}
// ToFileInfo converts the requested version of xlMetaV2 into the common
// FileInfo datastructure for consumption across callers.
//
// An empty versionID selects the first non-free version; the "null"
// version ID selects the version with the zero UUID. Free versions are
// skipped unless explicitly requested by ID and are excluded from
// fi.NumVersions.
//
// Returns errFileVersionNotFound for a malformed or unknown versionID
// and errFileNotFound when versionID is empty and nothing matches.
func (x xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err error) {
	var wantID uuid.UUID
	if versionID != "" && versionID != nullVersionID {
		if wantID, err = uuid.Parse(versionID); err != nil {
			logger.LogIf(GlobalContext, fmt.Errorf("invalid versionID specified %s", versionID))
			return fi, errFileVersionNotFound
		}
	}

	var (
		successorModTime int64 // modtime of the entry preceding the match
		skippedAny       bool  // an entry was passed over before the match
		matched          bool
	)
	visible := len(x.versions)
	for i := range x.versions {
		entry := &x.versions[i]
		hdr := &entry.header

		// Free versions are not listed unless asked for by ID.
		if hdr.FreeVersion() {
			visible--
			if hdr.VersionID != wantID {
				continue
			}
		}
		// Keep iterating after a match only to finish counting.
		if matched {
			continue
		}
		// A specific version was requested and this is not it.
		if versionID != "" && hdr.VersionID != wantID {
			skippedAny = true
			successorModTime = hdr.ModTime
			continue
		}

		matched = true
		var decoded xlMetaV2Version
		if _, err = decoded.unmarshalV(x.metaV, entry.meta); err != nil {
			return fi, err
		}
		if fi, err = decoded.ToFileInfo(volume, path); err != nil {
			return fi, err
		}
		fi.IsLatest = !skippedAny
		if successorModTime != 0 {
			fi.SuccessorModTime = time.Unix(0, successorModTime)
		}
	}

	if !matched {
		if versionID == "" {
			return FileInfo{}, errFileNotFound
		}
		return FileInfo{}, errFileVersionNotFound
	}
	fi.NumVersions = visible
	return fi, nil
}
// ListVersions converts every stored version into a FileInfo, in stored
// order. The first entry is flagged as latest and every later entry gets
// the modtime of its predecessor as SuccessorModTime.
// On a decode or conversion error the entries converted so far are
// returned together with the error.
func (x xlMetaV2) ListVersions(volume, path string) ([]FileInfo, error) {
	total := len(x.versions)
	out := make([]FileInfo, 0, total)

	// Decode target reused for all versions.
	var scratch xlMetaV2Version
	for i := range x.versions {
		if _, err := scratch.unmarshalV(x.metaV, x.versions[i].meta); err != nil {
			return out, err
		}
		info, err := scratch.ToFileInfo(volume, path)
		if err != nil {
			return out, err
		}
		info.NumVersions = total
		out = append(out, info)
	}

	// All versions converted; link each entry to its predecessor.
	for i := 1; i < len(out); i++ {
		out[i].SuccessorModTime = out[i-1].ModTime
	}
	if len(out) > 0 {
		out[0].IsLatest = true
	}
	return out, nil
}
// mergeXLV2Versions will merge all versions, typically from different disks
// that have at least quorum entries in all metas.
// Quorum must be the minimum number of matching metadata files.
// Quorum should be > 1 and <= len(versions); a quorum <= 0 is treated as 1.
// If strict is set to false, entries whose headers are accepted by
// matchesNotStrict also count towards the same quorum, and signatures are
// ignored in the fallback counting pass.
// strict is forced to true when quorum is 1.
//
// Returns nil when fewer version lists than quorum are supplied (or none at
// all). A single list is returned unchanged.
func mergeXLV2Versions(quorum int, strict bool, versions ...[]xlMetaV2ShallowVersion) (merged []xlMetaV2ShallowVersion) {
if quorum <= 0 {
quorum = 1
}
if len(versions) < quorum || len(versions) == 0 {
return nil
}
if len(versions) == 1 {
return versions[0]
}
if quorum == 1 {
// No need for non-strict checks if quorum is 1.
strict = true
}
// Shallow copy input
// (the outer slice is copied so truncating versions[i] below does not
// modify the caller's slice of lists).
versions = append(make([][]xlMetaV2ShallowVersion, 0, len(versions)), versions...)
// Our result
merged = make([]xlMetaV2ShallowVersion, 0, len(versions[0]))
tops := make([]xlMetaV2ShallowVersion, len(versions))
// Each round looks at the first remaining entry of every list, picks one
// candidate, emits it if it reaches quorum and then trims the lists.
for {
// Step 1 create slice with all top versions.
tops = tops[:0]
var topSig xlMetaV2VersionHeader
consistent := true // Are all signatures consistent (shortcut)
for _, vers := range versions {
if len(vers) == 0 {
// An exhausted list means the tops cannot all agree.
consistent = false
continue
}
ver := vers[0]
if len(tops) == 0 {
// First non-empty list: it defines the header to compare against.
consistent = true
topSig = ver.header
} else {
consistent = consistent && ver.header == topSig
}
tops = append(tops, vers[0])
}
// Check if done...
if len(tops) < quorum {
// We couldn't gather enough for quorum
break
}
var latest xlMetaV2ShallowVersion
var latestCount int
if consistent {
// All had the same signature, easy.
latest = tops[0]
latestCount = len(tops)
merged = append(merged, latest)
} else {
// Find latest.
for i, ver := range tops {
if ver.header == latest.header {
latestCount++
continue
}
if i == 0 || ver.header.sortsBefore(latest.header) {
// ver becomes the new candidate; the count restarts unless the
// previous candidate matches it non-strictly.
if i == 0 || latestCount == 0 {
latestCount = 1
} else if !strict && ver.header.matchesNotStrict(latest.header) {
latestCount++
} else {
latestCount = 1
}
latest = ver
continue
}
// Mismatch, but older.
if latestCount > 0 && !strict && ver.header.matchesNotStrict(latest.header) {
latestCount++
continue
}
if latestCount > 0 && ver.header.VersionID == latest.header.VersionID {
// Version IDs match, but otherwise unable to resolve.
// We are either strict, or don't have enough information to match.
// Switch to a pure counting algo.
// x counts, per header, the tops sharing this version ID.
x := make(map[xlMetaV2VersionHeader]int, len(tops))
for _, a := range tops {
if a.header.VersionID != ver.header.VersionID {
continue
}
if !strict {
// a is a copy; clearing the signature only affects the map key.
a.header.Signature = [4]byte{}
}
x[a.header]++
}
latestCount = 0
// Pick the header with the highest count.
for k, v := range x {
if v < latestCount {
continue
}
if v == latestCount && latest.header.sortsBefore(k) {
// Tiebreak, use sort.
continue
}
// Find a top entry corresponding to key k (the last match wins).
for _, a := range tops {
hdr := a.header
if !strict {
hdr.Signature = [4]byte{}
}
if hdr == k {
latest = a
}
}
latestCount = v
}
break
}
}
if latestCount >= quorum {
merged = append(merged, latest)
}
}
// Remove from all streams up until latest modtime or if selected.
for i, vers := range versions {
for _, ver := range vers {
// Truncate later modtimes, not selected.
if ver.header.ModTime > latest.header.ModTime {
versions[i] = versions[i][1:]
continue
}
// Truncate matches
if ver.header == latest.header {
versions[i] = versions[i][1:]
continue
}
// Truncate non-empty version and type matches
if latest.header.VersionID == ver.header.VersionID {
versions[i] = versions[i][1:]
continue
}
// Skip versions with version id we already emitted.
// NOTE(review): the continue below applies to this inner loop over
// merged, not to the loop over vers; after it finishes the break
// that follows is always reached. Confirm this is intended.
for _, mergedV := range merged {
if ver.header.VersionID == mergedV.header.VersionID {
versions[i] = versions[i][1:]
continue
}
}
// Keep top entry (and remaining)...
break
}
}
}
// Sanity check. Enable if duplicates show up.
if false {
found := make(map[[16]byte]struct{})
for _, ver := range merged {
if _, ok := found[ver.header.VersionID]; ok {
panic("found dupe")
}
found[ver.header.VersionID] = struct{}{}
}
}
return merged
}
// xlMetaBuf is a byte slice holding serialized xl metadata. Its methods
// read it through decodeXLHeaders and decodeVersions.
type xlMetaBuf []byte
// ToFileInfo converts the requested version stored in the buffer into the
// common FileInfo datastructure for consumption across callers.
//
// An empty versionID selects the first non-free version; the "null"
// version ID selects the version with the zero UUID. Free versions are
// skipped unless explicitly requested by ID and are excluded from
// fi.NumVersions.
//
// Returns errFileVersionNotFound for a malformed or unknown versionID and
// errFileNotFound when versionID is empty and nothing matches. Errors
// from decoding the buffer are returned as-is.
func (x xlMetaBuf) ToFileInfo(volume, path, versionID string) (fi FileInfo, err error) {
	var uv uuid.UUID
	if versionID != "" && versionID != nullVersionID {
		uv, err = uuid.Parse(versionID)
		if err != nil {
			logger.LogIf(GlobalContext, fmt.Errorf("invalid versionID specified %s", versionID))
			return fi, errFileVersionNotFound
		}
	}
	versions, headerV, metaV, buf, err := decodeXLHeaders(x)
	if err != nil {
		return fi, err
	}

	var header xlMetaV2VersionHeader
	var succModTime int64
	isLatest := true
	nonFreeVersions := versions
	found := false
	err = decodeVersions(buf, versions, func(idx int, hdr, meta []byte) error {
		if _, err := header.unmarshalV(headerV, hdr); err != nil {
			return err
		}

		// skip listing free-version unless explicitly requested via versionID
		if header.FreeVersion() {
			nonFreeVersions--
			if header.VersionID != uv {
				return nil
			}
		}
		// Keep iterating after a match only to finish counting.
		if found {
			return nil
		}

		// We need a specific version, skip...
		if versionID != "" && uv != header.VersionID {
			isLatest = false
			succModTime = header.ModTime
			return nil
		}

		// We found what we need.
		found = true
		var version xlMetaV2Version
		if _, err := version.unmarshalV(metaV, meta); err != nil {
			return err
		}
		if fi, err = version.ToFileInfo(volume, path); err != nil {
			return err
		}
		fi.IsLatest = isLatest
		if succModTime != 0 {
			fi.SuccessorModTime = time.Unix(0, succModTime)
		}
		return nil
	})
	// Only report "not found" when the buffer decoded cleanly; a decode
	// failure must not be masked as a missing file or version.
	if !found && err == nil {
		if versionID == "" {
			return FileInfo{}, errFileNotFound
		}
		return FileInfo{}, errFileVersionNotFound
	}
	fi.NumVersions = nonFreeVersions
	return fi, err
}
// ListVersions converts every version stored in the buffer into a
// FileInfo, in stored order. The first entry is flagged as latest and
// every later entry gets the modtime of its predecessor as
// SuccessorModTime. A version that is not valid yields errFileCorrupt.
// On error the entries converted so far are returned with the error,
// except when the headers cannot be decoded, where the result is nil.
func (x xlMetaBuf) ListVersions(volume, path string) ([]FileInfo, error) {
	count, _, metaV, payload, err := decodeXLHeaders(x)
	if err != nil {
		return nil, err
	}

	var (
		infos       = make([]FileInfo, 0, count)
		prevModTime time.Time
		scratch     xlMetaV2Version // decode target reused for all versions
	)
	visit := func(_ int, _, meta []byte) error {
		if _, uerr := scratch.unmarshalV(metaV, meta); uerr != nil {
			return uerr
		}
		if !scratch.Valid() {
			return errFileCorrupt
		}
		info, cerr := scratch.ToFileInfo(volume, path)
		if cerr != nil {
			return cerr
		}
		// Only the first converted entry is the latest.
		info.IsLatest = len(infos) == 0
		info.SuccessorModTime = prevModTime
		info.NumVersions = count
		infos = append(infos, info)
		prevModTime = scratch.getModTime()
		return nil
	}

	err = decodeVersions(payload, count, visit)
	return infos, err
}
// IsLatestDeleteMarker returns true if latest version is a deletemarker
// or there are no versions.
// If any error occurs false is returned.
func (x xlMetaBuf) IsLatestDeleteMarker() bool {
	count, headerV, _, payload, err := decodeXLHeaders(x)
	switch {
	case err != nil:
		return false
	case count == 0:
		return true
	}

	var latestIsMarker bool
	// The callback always returns errDoneForNow, presumably stopping
	// decodeVersions after the first entry; its result is deliberately
	// ignored.
	_ = decodeVersions(payload, count, func(_ int, rawHdr, _ []byte) error {
		var hdr xlMetaV2VersionHeader
		if _, uerr := hdr.unmarshalV(headerV, rawHdr); uerr == nil {
			latestIsMarker = hdr.Type == DeleteType
		}
		return errDoneForNow
	})
	return latestIsMarker
}