mirror of
https://github.com/minio/minio.git
synced 2025-01-25 13:43:17 -05:00
200caab82b
This was a regression introduced in '14bb969782'; it has the potential to cause corruption when there are concurrent overwrites attempting to update the content on the namespace. This PR adds a situation where PutObject() and CopyObject() compete properly for the same locks with NewMultipartUpload(); however, it ends up turning off competing locks for the actual object with GetObject() and DeleteObject() - since they do not compete, concurrent I/O on a versioned bucket can lead to loss of versions. This PR fixes this bug, which in a multi-pool setup with replication causes corruption of inlined data due to the lack of competing locks. Instead, CompleteMultipartUpload holds the necessary locks when finishing the transaction; knowing the exact location of an object (a pool ID for the existing object) to schedule the multipart upload doesn't need to compete in this manner.
1554 lines
45 KiB
Go
1554 lines
45 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cespare/xxhash/v2"
|
|
"github.com/google/uuid"
|
|
"github.com/minio/minio/internal/bucket/lifecycle"
|
|
"github.com/minio/minio/internal/bucket/replication"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/tinylib/msgp/msgp"
|
|
)
|
|
|
|
var (
	// xlHeader is the 4-byte magic every XLv2 metadata buffer starts with.
	xlHeader = [4]byte{'X', 'L', '2', ' '}

	// xlVersionCurrent is the version being written: major in bytes 0-1 and
	// minor in bytes 2-3, both little-endian. It is filled in by init.
	xlVersionCurrent [4]byte
)

const (
	// Breaking changes.
	// Newer versions cannot be read by older software.
	// This will prevent downgrades to incompatible versions.
	xlVersionMajor = 1

	// Non breaking changes.
	// Bumping this is informational, but should be done
	// if any change is made to the data stored, bumping this
	// will allow to detect the exact version later.
	xlVersionMinor = 2
)

// init encodes the current major/minor version into xlVersionCurrent.
func init() {
	binary.LittleEndian.PutUint16(xlVersionCurrent[0:2], xlVersionMajor)
	binary.LittleEndian.PutUint16(xlVersionCurrent[2:4], xlVersionMinor)
}

// checkXL2V1 will check if the metadata has correct header and is a known major version.
// The remaining payload and versions are returned.
//
// A buffer of 8 bytes or fewer is rejected as carrying no data. When the
// major version is newer than xlVersionMajor an error is returned, but the
// payload and the parsed versions are still returned alongside it.
func checkXL2V1(buf []byte) (payload []byte, major, minor uint16, err error) {
	if len(buf) <= 8 {
		return payload, 0, 0, fmt.Errorf("xlMeta: no data")
	}

	if !bytes.Equal(buf[:4], xlHeader[:]) {
		return payload, 0, 0, fmt.Errorf("xlMeta: unknown XLv2 header, expected %v, got %v", xlHeader[:4], buf[:4])
	}

	// Legacy textual version: "1" padded with spaces to fill the 4-byte
	// version field. The literal must be exactly 4 bytes long, otherwise it
	// can never equal buf[4:8] and this branch is dead.
	if bytes.Equal(buf[4:8], []byte("1   ")) {
		// Set as 1,0.
		major, minor = 1, 0
	} else {
		major, minor = binary.LittleEndian.Uint16(buf[4:6]), binary.LittleEndian.Uint16(buf[6:8])
	}
	if major > xlVersionMajor {
		return buf[8:], major, minor, fmt.Errorf("xlMeta: unknown major version %d found", major)
	}

	return buf[8:], major, minor, nil
}
|
|
|
|
func isXL2V1Format(buf []byte) bool {
|
|
_, _, _, err := checkXL2V1(buf)
|
|
return err == nil
|
|
}
|
|
|
|
// The []journal contains all the different versions of the object.
|
|
//
|
|
// This array can have 3 kinds of objects:
|
|
//
|
|
// ``object``: If the object is uploaded the usual way: putobject, multipart-put, copyobject
|
|
//
|
|
// ``delete``: This is the delete-marker
|
|
//
|
|
// ``legacyObject``: This is the legacy object in xlV1 format, preserved until it's overwritten
|
|
//
|
|
// The most recently updated element in the array is considered the latest version.
|
|
|
|
// In addition to these we have a special kind called free-version. This is represented
|
|
// using a delete-marker and MetaSys entries. It's used to track tiered content of a
|
|
// deleted/overwritten version. This version is visible _only_ to the scanner routine, for subsequent deletion.
|
|
// This kind of tracking is necessary since a version's tiered content is deleted asynchronously.
|
|
|
|
// Backend directory tree structure:
|
|
// disk1/
|
|
// └── bucket
|
|
// └── object
|
|
// ├── a192c1d5-9bd5-41fd-9a90-ab10e165398d
|
|
// │ └── part.1
|
|
// ├── c06e0436-f813-447e-ae5e-f2564df9dfd4
|
|
// │ └── part.1
|
|
// ├── df433928-2dcf-47b1-a786-43efa0f6b424
|
|
// │ └── part.1
|
|
// ├── legacy
|
|
// │ └── part.1
|
|
// └── xl.meta
|
|
|
|
//go:generate msgp -file=$GOFILE -unexported
|
|
|
|
// VersionType identifies which kind of journal entry a version is.
type VersionType uint8

// Known journal entry types. invalidVersionType and lastVersionType are
// exclusive lower/upper sentinels used by valid(); the numeric values are
// spelled out explicitly.
const (
	invalidVersionType = VersionType(0)
	ObjectType         = VersionType(1)
	DeleteType         = VersionType(2)
	LegacyType         = VersionType(3)
	lastVersionType    = VersionType(4)
)

// valid reports whether e lies strictly between the two sentinels.
func (e VersionType) valid() bool {
	return e != invalidVersionType && e < lastVersionType
}
|
|
|
|
// ErasureAlgo identifies an erasure coding algorithm.
type ErasureAlgo uint8

// Currently supported erasure coding algorithms, bracketed by the
// invalidErasureAlgo / lastErasureAlgo sentinels used by valid().
const (
	invalidErasureAlgo = ErasureAlgo(0)
	ReedSolomon        = ErasureAlgo(1)
	lastErasureAlgo    = ErasureAlgo(2)
)

// valid reports whether e lies strictly between the two sentinels.
func (e ErasureAlgo) valid() bool {
	return e != invalidErasureAlgo && e < lastErasureAlgo
}

// String returns the textual name of the algorithm, or the empty string
// for any value other than ReedSolomon.
func (e ErasureAlgo) String() string {
	if e == ReedSolomon {
		return "reedsolomon"
	}
	return ""
}
|
|
|
|
// ChecksumAlgo identifies a checksum algorithm.
type ChecksumAlgo uint8

// Currently supported checksum algorithms, bracketed by the
// invalidChecksumAlgo / lastChecksumAlgo sentinels used by valid().
const (
	invalidChecksumAlgo = ChecksumAlgo(0)
	HighwayHash         = ChecksumAlgo(1)
	lastChecksumAlgo    = ChecksumAlgo(2)
)

// valid reports whether e lies strictly between the two sentinels.
func (e ChecksumAlgo) valid() bool {
	return e != invalidChecksumAlgo && e < lastChecksumAlgo
}
|
|
|
|
// xlMetaV2DeleteMarker is the journal payload stored for a delete marker.
type xlMetaV2DeleteMarker struct {
	// VersionID is the version ID of the delete marker.
	VersionID [16]byte `json:"ID" msg:"ID"`

	// ModTime is the delete marker's modified time.
	ModTime int64 `json:"MTime" msg:"MTime"`

	// MetaSys holds internal metadata of the delete marker.
	MetaSys map[string][]byte `json:"MetaSys,omitempty" msg:"MetaSys,omitempty"`
}
|
|
|
|
// xlMetaV2Object defines the data struct for object journal type
|
|
type xlMetaV2Object struct {
|
|
VersionID [16]byte `json:"ID" msg:"ID"` // Version ID
|
|
DataDir [16]byte `json:"DDir" msg:"DDir"` // Data dir ID
|
|
ErasureAlgorithm ErasureAlgo `json:"EcAlgo" msg:"EcAlgo"` // Erasure coding algorithm
|
|
ErasureM int `json:"EcM" msg:"EcM"` // Erasure data blocks
|
|
ErasureN int `json:"EcN" msg:"EcN"` // Erasure parity blocks
|
|
ErasureBlockSize int64 `json:"EcBSize" msg:"EcBSize"` // Erasure block size
|
|
ErasureIndex int `json:"EcIndex" msg:"EcIndex"` // Erasure disk index
|
|
ErasureDist []uint8 `json:"EcDist" msg:"EcDist"` // Erasure distribution
|
|
BitrotChecksumAlgo ChecksumAlgo `json:"CSumAlgo" msg:"CSumAlgo"` // Bitrot checksum algo
|
|
PartNumbers []int `json:"PartNums" msg:"PartNums"` // Part Numbers
|
|
PartETags []string `json:"PartETags" msg:"PartETags"` // Part ETags
|
|
PartSizes []int64 `json:"PartSizes" msg:"PartSizes"` // Part Sizes
|
|
PartActualSizes []int64 `json:"PartASizes,omitempty" msg:"PartASizes,omitempty"` // Part ActualSizes (compression)
|
|
Size int64 `json:"Size" msg:"Size"` // Object version size
|
|
ModTime int64 `json:"MTime" msg:"MTime"` // Object version modified time
|
|
MetaSys map[string][]byte `json:"MetaSys,omitempty" msg:"MetaSys,omitempty"` // Object version internal metadata
|
|
MetaUser map[string]string `json:"MetaUsr,omitempty" msg:"MetaUsr,omitempty"` // Object version metadata set by user
|
|
}
|
|
|
|
// xlMetaV2Version describes the jouranal entry, Type defines
|
|
// the current journal entry type other types might be nil based
|
|
// on what Type field carries, it is imperative for the caller
|
|
// to verify which journal type first before accessing rest of the fields.
|
|
type xlMetaV2Version struct {
|
|
Type VersionType `json:"Type" msg:"Type"`
|
|
ObjectV1 *xlMetaV1Object `json:"V1Obj,omitempty" msg:"V1Obj,omitempty"`
|
|
ObjectV2 *xlMetaV2Object `json:"V2Obj,omitempty" msg:"V2Obj,omitempty"`
|
|
DeleteMarker *xlMetaV2DeleteMarker `json:"DelObj,omitempty" msg:"DelObj,omitempty"`
|
|
}
|
|
|
|
// Valid xl meta xlMetaV2Version is valid
|
|
func (j xlMetaV2Version) Valid() bool {
|
|
if !j.Type.valid() {
|
|
return false
|
|
}
|
|
switch j.Type {
|
|
case LegacyType:
|
|
return j.ObjectV1 != nil &&
|
|
j.ObjectV1.valid()
|
|
case ObjectType:
|
|
return j.ObjectV2 != nil &&
|
|
j.ObjectV2.ErasureAlgorithm.valid() &&
|
|
j.ObjectV2.BitrotChecksumAlgo.valid() &&
|
|
isXLMetaErasureInfoValid(j.ObjectV2.ErasureM, j.ObjectV2.ErasureN) &&
|
|
j.ObjectV2.ModTime > 0
|
|
case DeleteType:
|
|
return j.DeleteMarker != nil &&
|
|
j.DeleteMarker.ModTime > 0
|
|
}
|
|
return false
|
|
}
|
|
|
|
// getModTime will return the ModTime of the underlying version.
|
|
func (j xlMetaV2Version) getModTime() time.Time {
|
|
switch j.Type {
|
|
case ObjectType:
|
|
return time.Unix(0, j.ObjectV2.ModTime)
|
|
case DeleteType:
|
|
return time.Unix(0, j.DeleteMarker.ModTime)
|
|
case LegacyType:
|
|
return j.ObjectV1.Stat.ModTime
|
|
}
|
|
return time.Time{}
|
|
}
|
|
|
|
// xlMetaV2 - object meta structure defines the format and list of
|
|
// the journals for the object.
|
|
type xlMetaV2 struct {
|
|
Versions []xlMetaV2Version `json:"Versions" msg:"Versions"`
|
|
|
|
// data will contain raw data if any.
|
|
// data will be one or more versions indexed by versionID.
|
|
// To remove all data set to nil.
|
|
data xlMetaInlineData `msg:"-"`
|
|
}
|
|
|
|
// xlMetaInlineData is serialized data in [string][]byte pairs.
// The first byte is a format version; the rest is the payload.
//
//msgp:ignore xlMetaInlineData
type xlMetaInlineData []byte

// xlMetaInlineDataVer indicates the version of the inline data structure.
const xlMetaInlineDataVer = 1

// versionOK reports whether the leading version byte is acceptable.
// Empty data is always considered OK.
func (x xlMetaInlineData) versionOK() bool {
	if len(x) > 0 {
		ver := x[0]
		return ver > 0 && ver <= xlMetaInlineDataVer
	}
	return true
}

// afterVersion returns the bytes following the version byte. Empty data is
// returned as is.
func (x xlMetaInlineData) afterVersion() []byte {
	if len(x) > 0 {
		return x[1:]
	}
	return x
}
|
|
|
|
// find the data with key s.
|
|
// Returns nil if not for or an error occurs.
|
|
func (x xlMetaInlineData) find(key string) []byte {
|
|
if len(x) == 0 || !x.versionOK() {
|
|
return nil
|
|
}
|
|
sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion())
|
|
if err != nil || sz == 0 {
|
|
return nil
|
|
}
|
|
for i := uint32(0); i < sz; i++ {
|
|
var found []byte
|
|
found, buf, err = msgp.ReadMapKeyZC(buf)
|
|
if err != nil || sz == 0 {
|
|
return nil
|
|
}
|
|
if string(found) == key {
|
|
val, _, _ := msgp.ReadBytesZC(buf)
|
|
return val
|
|
}
|
|
// Skip it
|
|
_, buf, err = msgp.ReadBytesZC(buf)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// validate checks if the data is valid.
|
|
// It does not check integrity of the stored data.
|
|
func (x xlMetaInlineData) validate() error {
|
|
if len(x) == 0 {
|
|
return nil
|
|
}
|
|
|
|
if !x.versionOK() {
|
|
return fmt.Errorf("xlMetaInlineData: unknown version 0x%x", x[0])
|
|
}
|
|
|
|
sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion())
|
|
if err != nil {
|
|
return fmt.Errorf("xlMetaInlineData: %w", err)
|
|
}
|
|
|
|
for i := uint32(0); i < sz; i++ {
|
|
var key []byte
|
|
key, buf, err = msgp.ReadMapKeyZC(buf)
|
|
if err != nil {
|
|
return fmt.Errorf("xlMetaInlineData: %w", err)
|
|
}
|
|
if len(key) == 0 {
|
|
return fmt.Errorf("xlMetaInlineData: key %d is length 0", i)
|
|
}
|
|
_, buf, err = msgp.ReadBytesZC(buf)
|
|
if err != nil {
|
|
return fmt.Errorf("xlMetaInlineData: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// repair will copy all seemingly valid data entries from a corrupted set.
|
|
// This does not ensure that data is correct, but will allow all operations to complete.
|
|
func (x *xlMetaInlineData) repair() {
|
|
data := *x
|
|
if len(data) == 0 {
|
|
return
|
|
}
|
|
|
|
if !data.versionOK() {
|
|
*x = nil
|
|
return
|
|
}
|
|
|
|
sz, buf, err := msgp.ReadMapHeaderBytes(data.afterVersion())
|
|
if err != nil {
|
|
*x = nil
|
|
return
|
|
}
|
|
|
|
// Remove all current data
|
|
keys := make([][]byte, 0, sz)
|
|
vals := make([][]byte, 0, sz)
|
|
for i := uint32(0); i < sz; i++ {
|
|
var key, val []byte
|
|
key, buf, err = msgp.ReadMapKeyZC(buf)
|
|
if err != nil {
|
|
break
|
|
}
|
|
if len(key) == 0 {
|
|
break
|
|
}
|
|
val, buf, err = msgp.ReadBytesZC(buf)
|
|
if err != nil {
|
|
break
|
|
}
|
|
keys = append(keys, key)
|
|
vals = append(vals, val)
|
|
}
|
|
x.serialize(-1, keys, vals)
|
|
}
|
|
|
|
// validate checks if the data is valid.
|
|
// It does not check integrity of the stored data.
|
|
func (x xlMetaInlineData) list() ([]string, error) {
|
|
if len(x) == 0 {
|
|
return nil, nil
|
|
}
|
|
if !x.versionOK() {
|
|
return nil, errors.New("xlMetaInlineData: unknown version")
|
|
}
|
|
|
|
sz, buf, err := msgp.ReadMapHeaderBytes(x.afterVersion())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
keys := make([]string, 0, sz)
|
|
for i := uint32(0); i < sz; i++ {
|
|
var key []byte
|
|
key, buf, err = msgp.ReadMapKeyZC(buf)
|
|
if err != nil {
|
|
return keys, err
|
|
}
|
|
if len(key) == 0 {
|
|
return keys, fmt.Errorf("xlMetaInlineData: key %d is length 0", i)
|
|
}
|
|
keys = append(keys, string(key))
|
|
// Skip data...
|
|
_, buf, err = msgp.ReadBytesZC(buf)
|
|
if err != nil {
|
|
return keys, err
|
|
}
|
|
}
|
|
return keys, nil
|
|
}
|
|
|
|
// serialize will serialize the provided keys and values.
|
|
// The function will panic if keys/value slices aren't of equal length.
|
|
// Payload size can give an indication of expected payload size.
|
|
// If plSize is <= 0 it will be calculated.
|
|
func (x *xlMetaInlineData) serialize(plSize int, keys [][]byte, vals [][]byte) {
|
|
if len(keys) != len(vals) {
|
|
panic(fmt.Errorf("xlMetaInlineData.serialize: keys/value number mismatch"))
|
|
}
|
|
if len(keys) == 0 {
|
|
*x = nil
|
|
return
|
|
}
|
|
if plSize <= 0 {
|
|
plSize = 1 + msgp.MapHeaderSize
|
|
for i := range keys {
|
|
plSize += len(keys[i]) + len(vals[i]) + msgp.StringPrefixSize + msgp.ArrayHeaderSize
|
|
}
|
|
}
|
|
payload := make([]byte, 1, plSize)
|
|
payload[0] = xlMetaInlineDataVer
|
|
payload = msgp.AppendMapHeader(payload, uint32(len(keys)))
|
|
for i := range keys {
|
|
payload = msgp.AppendStringFromBytes(payload, keys[i])
|
|
payload = msgp.AppendBytes(payload, vals[i])
|
|
}
|
|
*x = payload
|
|
}
|
|
|
|
// entries returns the number of entries in the data.
|
|
func (x xlMetaInlineData) entries() int {
|
|
if len(x) == 0 || !x.versionOK() {
|
|
return 0
|
|
}
|
|
sz, _, _ := msgp.ReadMapHeaderBytes(x.afterVersion())
|
|
return int(sz)
|
|
}
|
|
|
|
// replace will add or replace a key/value pair.
|
|
func (x *xlMetaInlineData) replace(key string, value []byte) {
|
|
in := x.afterVersion()
|
|
sz, buf, _ := msgp.ReadMapHeaderBytes(in)
|
|
keys := make([][]byte, 0, sz+1)
|
|
vals := make([][]byte, 0, sz+1)
|
|
|
|
// Version plus header...
|
|
plSize := 1 + msgp.MapHeaderSize
|
|
replaced := false
|
|
for i := uint32(0); i < sz; i++ {
|
|
var found, foundVal []byte
|
|
var err error
|
|
found, buf, err = msgp.ReadMapKeyZC(buf)
|
|
if err != nil {
|
|
break
|
|
}
|
|
foundVal, buf, err = msgp.ReadBytesZC(buf)
|
|
if err != nil {
|
|
break
|
|
}
|
|
plSize += len(found) + msgp.StringPrefixSize + msgp.ArrayHeaderSize
|
|
keys = append(keys, found)
|
|
if string(found) == key {
|
|
vals = append(vals, value)
|
|
plSize += len(value)
|
|
replaced = true
|
|
} else {
|
|
vals = append(vals, foundVal)
|
|
plSize += len(foundVal)
|
|
}
|
|
}
|
|
|
|
// Add one more.
|
|
if !replaced {
|
|
keys = append(keys, []byte(key))
|
|
vals = append(vals, value)
|
|
plSize += len(key) + len(value) + msgp.StringPrefixSize + msgp.ArrayHeaderSize
|
|
}
|
|
|
|
// Reserialize...
|
|
x.serialize(plSize, keys, vals)
|
|
}
|
|
|
|
// rename will rename a key.
|
|
// Returns whether the key was found.
|
|
func (x *xlMetaInlineData) rename(oldKey, newKey string) bool {
|
|
in := x.afterVersion()
|
|
sz, buf, _ := msgp.ReadMapHeaderBytes(in)
|
|
keys := make([][]byte, 0, sz)
|
|
vals := make([][]byte, 0, sz)
|
|
|
|
// Version plus header...
|
|
plSize := 1 + msgp.MapHeaderSize
|
|
found := false
|
|
for i := uint32(0); i < sz; i++ {
|
|
var foundKey, foundVal []byte
|
|
var err error
|
|
foundKey, buf, err = msgp.ReadMapKeyZC(buf)
|
|
if err != nil {
|
|
break
|
|
}
|
|
foundVal, buf, err = msgp.ReadBytesZC(buf)
|
|
if err != nil {
|
|
break
|
|
}
|
|
plSize += len(foundVal) + msgp.StringPrefixSize + msgp.ArrayHeaderSize
|
|
vals = append(vals, foundVal)
|
|
if string(foundKey) != oldKey {
|
|
keys = append(keys, foundKey)
|
|
plSize += len(foundKey)
|
|
} else {
|
|
keys = append(keys, []byte(newKey))
|
|
plSize += len(newKey)
|
|
found = true
|
|
}
|
|
}
|
|
// If not found, just return.
|
|
if !found {
|
|
return false
|
|
}
|
|
|
|
// Reserialize...
|
|
x.serialize(plSize, keys, vals)
|
|
return true
|
|
}
|
|
|
|
// remove will remove one or more keys.
|
|
// Returns true if any key was found.
|
|
func (x *xlMetaInlineData) remove(keys ...string) bool {
|
|
in := x.afterVersion()
|
|
sz, buf, _ := msgp.ReadMapHeaderBytes(in)
|
|
newKeys := make([][]byte, 0, sz)
|
|
newVals := make([][]byte, 0, sz)
|
|
var removeKey func(s []byte) bool
|
|
|
|
// Copy if big number of compares...
|
|
if len(keys) > 5 && sz > 5 {
|
|
mKeys := make(map[string]struct{}, len(keys))
|
|
for _, key := range keys {
|
|
mKeys[key] = struct{}{}
|
|
}
|
|
removeKey = func(s []byte) bool {
|
|
_, ok := mKeys[string(s)]
|
|
return ok
|
|
}
|
|
} else {
|
|
removeKey = func(s []byte) bool {
|
|
for _, key := range keys {
|
|
if key == string(s) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Version plus header...
|
|
plSize := 1 + msgp.MapHeaderSize
|
|
found := false
|
|
for i := uint32(0); i < sz; i++ {
|
|
var foundKey, foundVal []byte
|
|
var err error
|
|
foundKey, buf, err = msgp.ReadMapKeyZC(buf)
|
|
if err != nil {
|
|
break
|
|
}
|
|
foundVal, buf, err = msgp.ReadBytesZC(buf)
|
|
if err != nil {
|
|
break
|
|
}
|
|
if !removeKey(foundKey) {
|
|
plSize += msgp.StringPrefixSize + msgp.ArrayHeaderSize + len(foundKey) + len(foundVal)
|
|
newKeys = append(newKeys, foundKey)
|
|
newVals = append(newVals, foundVal)
|
|
} else {
|
|
found = true
|
|
}
|
|
}
|
|
// If not found, just return.
|
|
if !found {
|
|
return false
|
|
}
|
|
// If none left...
|
|
if len(newKeys) == 0 {
|
|
*x = nil
|
|
return true
|
|
}
|
|
|
|
// Reserialize...
|
|
x.serialize(plSize, newKeys, newVals)
|
|
return true
|
|
}
|
|
|
|
// xlMetaV2TrimData will trim any data from the metadata without unmarshalling it.
|
|
// If any error occurs the unmodified data is returned.
|
|
func xlMetaV2TrimData(buf []byte) []byte {
|
|
metaBuf, min, maj, err := checkXL2V1(buf)
|
|
if err != nil {
|
|
return buf
|
|
}
|
|
if maj == 1 && min < 1 {
|
|
// First version to carry data.
|
|
return buf
|
|
}
|
|
// Skip header
|
|
_, metaBuf, err = msgp.ReadBytesZC(metaBuf)
|
|
if err != nil {
|
|
logger.LogIf(GlobalContext, err)
|
|
return buf
|
|
}
|
|
// Skip CRC
|
|
if maj > 1 || min >= 2 {
|
|
_, metaBuf, err = msgp.ReadUint32Bytes(metaBuf)
|
|
logger.LogIf(GlobalContext, err)
|
|
}
|
|
// = input - current pos
|
|
ends := len(buf) - len(metaBuf)
|
|
if ends > len(buf) {
|
|
return buf
|
|
}
|
|
|
|
return buf[:ends]
|
|
}
|
|
|
|
// AddLegacy adds a legacy version, is only called when no prior
|
|
// versions exist, safe to use it by only one function in xl-storage(RenameData)
|
|
func (z *xlMetaV2) AddLegacy(m *xlMetaV1Object) error {
|
|
if !m.valid() {
|
|
return errFileCorrupt
|
|
}
|
|
m.VersionID = nullVersionID
|
|
m.DataDir = legacyDataDir
|
|
z.Versions = []xlMetaV2Version{
|
|
{
|
|
Type: LegacyType,
|
|
ObjectV1: m,
|
|
},
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Load unmarshal and load the entire message pack.
|
|
// Note that references to the incoming buffer may be kept as data.
|
|
func (z *xlMetaV2) Load(buf []byte) error {
|
|
buf, major, minor, err := checkXL2V1(buf)
|
|
if err != nil {
|
|
return fmt.Errorf("xlMetaV2.Load %w", err)
|
|
}
|
|
switch major {
|
|
case 1:
|
|
switch minor {
|
|
case 0:
|
|
_, err = z.UnmarshalMsg(buf)
|
|
if err != nil {
|
|
return fmt.Errorf("xlMetaV2.Load %w", err)
|
|
}
|
|
return nil
|
|
case 1, 2:
|
|
v, buf, err := msgp.ReadBytesZC(buf)
|
|
if err != nil {
|
|
return fmt.Errorf("xlMetaV2.Load version(%d), bufLen(%d) %w", minor, len(buf), err)
|
|
}
|
|
if minor >= 2 {
|
|
if crc, nbuf, err := msgp.ReadUint32Bytes(buf); err == nil {
|
|
// Read metadata CRC (added in v2)
|
|
buf = nbuf
|
|
if got := uint32(xxhash.Sum64(v)); got != crc {
|
|
return fmt.Errorf("xlMetaV2.Load version(%d), CRC mismatch, want 0x%x, got 0x%x", minor, crc, got)
|
|
}
|
|
} else {
|
|
return fmt.Errorf("xlMetaV2.Load version(%d), loading CRC: %w", minor, err)
|
|
}
|
|
}
|
|
|
|
if _, err = z.UnmarshalMsg(v); err != nil {
|
|
return fmt.Errorf("xlMetaV2.Load version(%d), vLen(%d), %w", minor, len(v), err)
|
|
}
|
|
// Add remaining data.
|
|
z.data = buf
|
|
if err = z.data.validate(); err != nil {
|
|
z.data.repair()
|
|
logger.Info("xlMetaV2.Load: data validation failed: %v. %d entries after repair", err, z.data.entries())
|
|
}
|
|
default:
|
|
return errors.New("unknown minor metadata version")
|
|
}
|
|
default:
|
|
return errors.New("unknown major metadata version")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AppendTo will marshal the data in z and append it to the provided slice.
|
|
func (z *xlMetaV2) AppendTo(dst []byte) ([]byte, error) {
|
|
sz := len(xlHeader) + len(xlVersionCurrent) + msgp.ArrayHeaderSize + z.Msgsize() + len(z.data) + len(dst) + msgp.Uint32Size
|
|
if cap(dst) < sz {
|
|
buf := make([]byte, len(dst), sz)
|
|
copy(buf, dst)
|
|
dst = buf
|
|
}
|
|
if err := z.data.validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dst = append(dst, xlHeader[:]...)
|
|
dst = append(dst, xlVersionCurrent[:]...)
|
|
// Add "bin 32" type header to always have enough space.
|
|
// We will fill out the correct size when we know it.
|
|
dst = append(dst, 0xc6, 0, 0, 0, 0)
|
|
dataOffset := len(dst)
|
|
dst, err := z.MarshalMsg(dst)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Update size...
|
|
binary.BigEndian.PutUint32(dst[dataOffset-4:dataOffset], uint32(len(dst)-dataOffset))
|
|
|
|
// Add CRC of metadata.
|
|
dst = msgp.AppendUint32(dst, uint32(xxhash.Sum64(dst[dataOffset:])))
|
|
return append(dst, z.data...), nil
|
|
}
|
|
|
|
// UpdateObjectVersion updates metadata and modTime for a given
|
|
// versionID, NOTE: versionID must be valid and should exist -
|
|
// and must not be a DeleteMarker or legacy object, if no
|
|
// versionID is specified 'null' versionID is updated instead.
|
|
//
|
|
// It is callers responsibility to set correct versionID, this
|
|
// function shouldn't be further extended to update immutable
|
|
// values such as ErasureInfo, ChecksumInfo.
|
|
//
|
|
// Metadata is only updated to new values, existing values
|
|
// stay as is, if you wish to update all values you should
|
|
// update all metadata freshly before calling this function
|
|
// in-case you wish to clear existing metadata.
|
|
func (z *xlMetaV2) UpdateObjectVersion(fi FileInfo) error {
|
|
if fi.VersionID == "" {
|
|
// this means versioning is not yet
|
|
// enabled or suspend i.e all versions
|
|
// are basically default value i.e "null"
|
|
fi.VersionID = nullVersionID
|
|
}
|
|
|
|
var uv uuid.UUID
|
|
var err error
|
|
if fi.VersionID != "" && fi.VersionID != nullVersionID {
|
|
uv, err = uuid.Parse(fi.VersionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
for i, version := range z.Versions {
|
|
if !version.Valid() {
|
|
return errFileCorrupt
|
|
}
|
|
switch version.Type {
|
|
case LegacyType:
|
|
if version.ObjectV1.VersionID == fi.VersionID {
|
|
return errMethodNotAllowed
|
|
}
|
|
case ObjectType:
|
|
if version.ObjectV2.VersionID == uv {
|
|
for k, v := range fi.Metadata {
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
|
z.Versions[i].ObjectV2.MetaSys[k] = []byte(v)
|
|
} else {
|
|
z.Versions[i].ObjectV2.MetaUser[k] = v
|
|
}
|
|
}
|
|
if !fi.ModTime.IsZero() {
|
|
z.Versions[i].ObjectV2.ModTime = fi.ModTime.UnixNano()
|
|
}
|
|
return nil
|
|
}
|
|
case DeleteType:
|
|
if version.DeleteMarker.VersionID == uv {
|
|
return errMethodNotAllowed
|
|
}
|
|
}
|
|
}
|
|
|
|
return errFileVersionNotFound
|
|
}
|
|
|
|
// AddVersion adds a new version
|
|
func (z *xlMetaV2) AddVersion(fi FileInfo) error {
|
|
if fi.VersionID == "" {
|
|
// this means versioning is not yet
|
|
// enabled or suspend i.e all versions
|
|
// are basically default value i.e "null"
|
|
fi.VersionID = nullVersionID
|
|
}
|
|
|
|
var uv uuid.UUID
|
|
var err error
|
|
if fi.VersionID != "" && fi.VersionID != nullVersionID {
|
|
uv, err = uuid.Parse(fi.VersionID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
var dd uuid.UUID
|
|
if fi.DataDir != "" {
|
|
dd, err = uuid.Parse(fi.DataDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
ventry := xlMetaV2Version{}
|
|
|
|
if fi.Deleted {
|
|
ventry.Type = DeleteType
|
|
ventry.DeleteMarker = &xlMetaV2DeleteMarker{
|
|
VersionID: uv,
|
|
ModTime: fi.ModTime.UnixNano(),
|
|
MetaSys: make(map[string][]byte),
|
|
}
|
|
} else {
|
|
ventry.Type = ObjectType
|
|
ventry.ObjectV2 = &xlMetaV2Object{
|
|
VersionID: uv,
|
|
DataDir: dd,
|
|
Size: fi.Size,
|
|
ModTime: fi.ModTime.UnixNano(),
|
|
ErasureAlgorithm: ReedSolomon,
|
|
ErasureM: fi.Erasure.DataBlocks,
|
|
ErasureN: fi.Erasure.ParityBlocks,
|
|
ErasureBlockSize: fi.Erasure.BlockSize,
|
|
ErasureIndex: fi.Erasure.Index,
|
|
BitrotChecksumAlgo: HighwayHash,
|
|
ErasureDist: make([]uint8, len(fi.Erasure.Distribution)),
|
|
PartNumbers: make([]int, len(fi.Parts)),
|
|
PartETags: make([]string, len(fi.Parts)),
|
|
PartSizes: make([]int64, len(fi.Parts)),
|
|
PartActualSizes: make([]int64, len(fi.Parts)),
|
|
MetaSys: make(map[string][]byte),
|
|
MetaUser: make(map[string]string, len(fi.Metadata)),
|
|
}
|
|
|
|
for i := range fi.Erasure.Distribution {
|
|
ventry.ObjectV2.ErasureDist[i] = uint8(fi.Erasure.Distribution[i])
|
|
}
|
|
|
|
for i := range fi.Parts {
|
|
ventry.ObjectV2.PartSizes[i] = fi.Parts[i].Size
|
|
if fi.Parts[i].ETag != "" {
|
|
ventry.ObjectV2.PartETags[i] = fi.Parts[i].ETag
|
|
}
|
|
ventry.ObjectV2.PartNumbers[i] = fi.Parts[i].Number
|
|
ventry.ObjectV2.PartActualSizes[i] = fi.Parts[i].ActualSize
|
|
}
|
|
|
|
tierFVIDKey := ReservedMetadataPrefixLower + tierFVID
|
|
tierFVMarkerKey := ReservedMetadataPrefixLower + tierFVMarker
|
|
for k, v := range fi.Metadata {
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
|
// Skip tierFVID, tierFVMarker keys; it's used
|
|
// only for creating free-version.
|
|
switch k {
|
|
case tierFVIDKey, tierFVMarkerKey:
|
|
continue
|
|
}
|
|
|
|
ventry.ObjectV2.MetaSys[k] = []byte(v)
|
|
} else {
|
|
ventry.ObjectV2.MetaUser[k] = v
|
|
}
|
|
}
|
|
|
|
// If asked to save data.
|
|
if len(fi.Data) > 0 || fi.Size == 0 {
|
|
z.data.replace(fi.VersionID, fi.Data)
|
|
}
|
|
|
|
if fi.TransitionStatus != "" {
|
|
ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionStatus] = []byte(fi.TransitionStatus)
|
|
}
|
|
if fi.TransitionedObjName != "" {
|
|
ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName)
|
|
}
|
|
if fi.TransitionVersionID != "" {
|
|
ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID)
|
|
}
|
|
if fi.TransitionTier != "" {
|
|
ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier)
|
|
}
|
|
}
|
|
|
|
if !ventry.Valid() {
|
|
return errors.New("internal error: invalid version entry generated")
|
|
}
|
|
|
|
for i, version := range z.Versions {
|
|
if !version.Valid() {
|
|
return errFileCorrupt
|
|
}
|
|
switch version.Type {
|
|
case LegacyType:
|
|
// This would convert legacy type into new ObjectType
|
|
// this means that we are basically purging the `null`
|
|
// version of the object.
|
|
if version.ObjectV1.VersionID == fi.VersionID {
|
|
z.Versions[i] = ventry
|
|
return nil
|
|
}
|
|
case ObjectType:
|
|
if version.ObjectV2.VersionID == uv {
|
|
z.Versions[i] = ventry
|
|
return nil
|
|
}
|
|
case DeleteType:
|
|
// Allowing delete marker to replaced with an proper
|
|
// object data type as well, this is not S3 complaint
|
|
// behavior but kept here for future flexibility.
|
|
if version.DeleteMarker.VersionID == uv {
|
|
z.Versions[i] = ventry
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
z.Versions = append(z.Versions, ventry)
|
|
return nil
|
|
}
|
|
|
|
func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error) {
|
|
versionID := ""
|
|
var uv uuid.UUID
|
|
// check if the version is not "null"
|
|
if j.VersionID != uv {
|
|
versionID = uuid.UUID(j.VersionID).String()
|
|
}
|
|
fi := FileInfo{
|
|
Volume: volume,
|
|
Name: path,
|
|
ModTime: time.Unix(0, j.ModTime).UTC(),
|
|
VersionID: versionID,
|
|
Deleted: true,
|
|
}
|
|
fi.ReplicationState = GetInternalReplicationState(j.MetaSys)
|
|
|
|
if j.FreeVersion() {
|
|
fi.SetTierFreeVersion()
|
|
fi.TransitionTier = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionTier])
|
|
fi.TransitionedObjName = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName])
|
|
fi.TransitionVersionID = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID])
|
|
}
|
|
|
|
return fi, nil
|
|
}
|
|
|
|
// UsesDataDir returns true if this object version uses its data directory for
|
|
// its contents and false otherwise.
|
|
func (j xlMetaV2Object) UsesDataDir() bool {
|
|
// Skip if this version is not transitioned, i.e it uses its data directory.
|
|
if !bytes.Equal(j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus], []byte(lifecycle.TransitionComplete)) {
|
|
return true
|
|
}
|
|
|
|
// Check if this transitioned object has been restored on disk.
|
|
return isRestoredObjectOnDisk(j.MetaUser)
|
|
}
|
|
|
|
func (j *xlMetaV2Object) SetTransition(fi FileInfo) {
|
|
j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus] = []byte(fi.TransitionStatus)
|
|
j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName)
|
|
j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID)
|
|
j.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier)
|
|
}
|
|
|
|
func (j *xlMetaV2Object) RemoveRestoreHdrs() {
|
|
delete(j.MetaUser, xhttp.AmzRestore)
|
|
delete(j.MetaUser, xhttp.AmzRestoreExpiryDays)
|
|
delete(j.MetaUser, xhttp.AmzRestoreRequestDate)
|
|
}
|
|
|
|
func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) {
|
|
versionID := ""
|
|
var uv uuid.UUID
|
|
// check if the version is not "null"
|
|
if j.VersionID != uv {
|
|
versionID = uuid.UUID(j.VersionID).String()
|
|
}
|
|
fi := FileInfo{
|
|
Volume: volume,
|
|
Name: path,
|
|
Size: j.Size,
|
|
ModTime: time.Unix(0, j.ModTime).UTC(),
|
|
VersionID: versionID,
|
|
}
|
|
fi.Parts = make([]ObjectPartInfo, len(j.PartNumbers))
|
|
for i := range fi.Parts {
|
|
fi.Parts[i].Number = j.PartNumbers[i]
|
|
fi.Parts[i].Size = j.PartSizes[i]
|
|
fi.Parts[i].ETag = j.PartETags[i]
|
|
fi.Parts[i].ActualSize = j.PartActualSizes[i]
|
|
}
|
|
fi.Erasure.Checksums = make([]ChecksumInfo, len(j.PartSizes))
|
|
for i := range fi.Parts {
|
|
fi.Erasure.Checksums[i].PartNumber = fi.Parts[i].Number
|
|
switch j.BitrotChecksumAlgo {
|
|
case HighwayHash:
|
|
fi.Erasure.Checksums[i].Algorithm = HighwayHash256S
|
|
fi.Erasure.Checksums[i].Hash = []byte{}
|
|
default:
|
|
return FileInfo{}, fmt.Errorf("unknown BitrotChecksumAlgo: %v", j.BitrotChecksumAlgo)
|
|
}
|
|
}
|
|
fi.Metadata = make(map[string]string, len(j.MetaUser)+len(j.MetaSys))
|
|
for k, v := range j.MetaUser {
|
|
// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
|
|
if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) {
|
|
continue
|
|
}
|
|
|
|
fi.Metadata[k] = v
|
|
}
|
|
for k, v := range j.MetaSys {
|
|
switch {
|
|
case strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower), equals(k, VersionPurgeStatusKey):
|
|
fi.Metadata[k] = string(v)
|
|
}
|
|
}
|
|
fi.ReplicationState = getInternalReplicationState(fi.Metadata)
|
|
replStatus := fi.ReplicationState.CompositeReplicationStatus()
|
|
if replStatus != "" {
|
|
fi.Metadata[xhttp.AmzBucketReplicationStatus] = string(replStatus)
|
|
}
|
|
fi.Erasure.Algorithm = j.ErasureAlgorithm.String()
|
|
fi.Erasure.Index = j.ErasureIndex
|
|
fi.Erasure.BlockSize = j.ErasureBlockSize
|
|
fi.Erasure.DataBlocks = j.ErasureM
|
|
fi.Erasure.ParityBlocks = j.ErasureN
|
|
fi.Erasure.Distribution = make([]int, len(j.ErasureDist))
|
|
for i := range j.ErasureDist {
|
|
fi.Erasure.Distribution[i] = int(j.ErasureDist[i])
|
|
}
|
|
fi.DataDir = uuid.UUID(j.DataDir).String()
|
|
|
|
if st, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus]; ok {
|
|
fi.TransitionStatus = string(st)
|
|
}
|
|
if o, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName]; ok {
|
|
fi.TransitionedObjName = string(o)
|
|
}
|
|
if rv, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID]; ok {
|
|
fi.TransitionVersionID = string(rv)
|
|
}
|
|
if sc, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionTier]; ok {
|
|
fi.TransitionTier = string(sc)
|
|
}
|
|
return fi, nil
|
|
}
|
|
|
|
func (z *xlMetaV2) SharedDataDirCountStr(versionID, dataDir string) int {
|
|
var (
|
|
uv uuid.UUID
|
|
ddir uuid.UUID
|
|
err error
|
|
)
|
|
if versionID == nullVersionID {
|
|
versionID = ""
|
|
}
|
|
if versionID != "" {
|
|
uv, err = uuid.Parse(versionID)
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
}
|
|
ddir, err = uuid.Parse(dataDir)
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
return z.SharedDataDirCount(uv, ddir)
|
|
}
|
|
|
|
func (z *xlMetaV2) SharedDataDirCount(versionID [16]byte, dataDir [16]byte) int {
|
|
// v2 object is inlined, if it is skip dataDir share check.
|
|
if z.data.find(uuid.UUID(versionID).String()) != nil {
|
|
return 0
|
|
}
|
|
var sameDataDirCount int
|
|
for _, version := range z.Versions {
|
|
switch version.Type {
|
|
case ObjectType:
|
|
if version.ObjectV2.VersionID == versionID {
|
|
continue
|
|
}
|
|
if version.ObjectV2.DataDir != dataDir {
|
|
continue
|
|
}
|
|
if version.ObjectV2.UsesDataDir() {
|
|
sameDataDirCount++
|
|
}
|
|
}
|
|
}
|
|
return sameDataDirCount
|
|
}
|
|
|
|
// DeleteVersion deletes the version specified by version id.
|
|
// returns to the caller which dataDir to delete, also
|
|
// indicates if this is the last version.
|
|
func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
|
|
// This is a situation where versionId is explicitly
|
|
// specified as "null", as we do not save "null"
|
|
// string it is considered empty. But empty also
|
|
// means the version which matches will be purged.
|
|
if fi.VersionID == nullVersionID {
|
|
fi.VersionID = ""
|
|
}
|
|
|
|
var uv uuid.UUID
|
|
var err error
|
|
if fi.VersionID != "" {
|
|
uv, err = uuid.Parse(fi.VersionID)
|
|
if err != nil {
|
|
return "", false, errFileVersionNotFound
|
|
}
|
|
}
|
|
|
|
var ventry xlMetaV2Version
|
|
if fi.Deleted {
|
|
ventry = xlMetaV2Version{
|
|
Type: DeleteType,
|
|
DeleteMarker: &xlMetaV2DeleteMarker{
|
|
VersionID: uv,
|
|
ModTime: fi.ModTime.UnixNano(),
|
|
MetaSys: make(map[string][]byte),
|
|
},
|
|
}
|
|
if !ventry.Valid() {
|
|
return "", false, errors.New("internal error: invalid version entry generated")
|
|
}
|
|
}
|
|
updateVersion := false
|
|
if fi.VersionPurgeStatus().Empty() && (fi.DeleteMarkerReplicationStatus() == "REPLICA" || fi.DeleteMarkerReplicationStatus().Empty()) {
|
|
updateVersion = fi.MarkDeleted
|
|
} else {
|
|
// for replication scenario
|
|
if fi.Deleted && fi.VersionPurgeStatus() != Complete {
|
|
if !fi.VersionPurgeStatus().Empty() || fi.DeleteMarkerReplicationStatus().Empty() {
|
|
updateVersion = true
|
|
}
|
|
}
|
|
// object or delete-marker versioned delete is not complete
|
|
if !fi.VersionPurgeStatus().Empty() && fi.VersionPurgeStatus() != Complete {
|
|
updateVersion = true
|
|
}
|
|
}
|
|
|
|
if fi.Deleted {
|
|
if !fi.DeleteMarkerReplicationStatus().Empty() {
|
|
switch fi.DeleteMarkerReplicationStatus() {
|
|
case replication.Replica:
|
|
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaStatus] = []byte(string(fi.ReplicationState.ReplicaStatus))
|
|
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(fi.ReplicationState.ReplicaTimeStamp.Format(http.TimeFormat))
|
|
default:
|
|
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationStatus] = []byte(fi.ReplicationState.ReplicationStatusInternal)
|
|
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(fi.ReplicationState.ReplicationTimeStamp.Format(http.TimeFormat))
|
|
}
|
|
}
|
|
if !fi.VersionPurgeStatus().Empty() {
|
|
ventry.DeleteMarker.MetaSys[VersionPurgeStatusKey] = []byte(fi.ReplicationState.VersionPurgeStatusInternal)
|
|
}
|
|
for k, v := range fi.ReplicationState.ResetStatusesMap {
|
|
ventry.DeleteMarker.MetaSys[k] = []byte(v)
|
|
}
|
|
}
|
|
|
|
for i, version := range z.Versions {
|
|
if !version.Valid() {
|
|
return "", false, errFileCorrupt
|
|
}
|
|
switch version.Type {
|
|
case LegacyType:
|
|
if version.ObjectV1.VersionID == fi.VersionID {
|
|
z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)
|
|
if fi.Deleted {
|
|
z.Versions = append(z.Versions, ventry)
|
|
}
|
|
return version.ObjectV1.DataDir, len(z.Versions) == 0, nil
|
|
}
|
|
case DeleteType:
|
|
if version.DeleteMarker.VersionID == uv {
|
|
if updateVersion {
|
|
if len(z.Versions[i].DeleteMarker.MetaSys) == 0 {
|
|
z.Versions[i].DeleteMarker.MetaSys = make(map[string][]byte)
|
|
}
|
|
if !fi.DeleteMarkerReplicationStatus().Empty() {
|
|
switch fi.DeleteMarkerReplicationStatus() {
|
|
case replication.Replica:
|
|
z.Versions[i].DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaStatus] = []byte(string(fi.ReplicationState.ReplicaStatus))
|
|
z.Versions[i].DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(fi.ReplicationState.ReplicaTimeStamp.Format(http.TimeFormat))
|
|
default:
|
|
z.Versions[i].DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationStatus] = []byte(fi.ReplicationState.ReplicationStatusInternal)
|
|
z.Versions[i].DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(fi.ReplicationState.ReplicationTimeStamp.Format(http.TimeFormat))
|
|
}
|
|
}
|
|
if !fi.VersionPurgeStatus().Empty() {
|
|
z.Versions[i].DeleteMarker.MetaSys[VersionPurgeStatusKey] = []byte(fi.ReplicationState.VersionPurgeStatusInternal)
|
|
}
|
|
for k, v := range fi.ReplicationState.ResetStatusesMap {
|
|
z.Versions[i].DeleteMarker.MetaSys[k] = []byte(v)
|
|
}
|
|
} else {
|
|
z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)
|
|
if fi.MarkDeleted && (fi.VersionPurgeStatus().Empty() || (fi.VersionPurgeStatus() != Complete)) {
|
|
z.Versions = append(z.Versions, ventry)
|
|
}
|
|
}
|
|
return "", len(z.Versions) == 0, nil
|
|
}
|
|
case ObjectType:
|
|
if version.ObjectV2.VersionID == uv && updateVersion {
|
|
z.Versions[i].ObjectV2.MetaSys[VersionPurgeStatusKey] = []byte(fi.ReplicationState.VersionPurgeStatusInternal)
|
|
for k, v := range fi.ReplicationState.ResetStatusesMap {
|
|
z.Versions[i].ObjectV2.MetaSys[k] = []byte(v)
|
|
}
|
|
return "", len(z.Versions) == 0, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
for i, version := range z.Versions {
|
|
if !version.Valid() {
|
|
return "", false, errFileCorrupt
|
|
}
|
|
switch version.Type {
|
|
case ObjectType:
|
|
if version.ObjectV2.VersionID == uv {
|
|
switch {
|
|
case fi.ExpireRestored:
|
|
z.Versions[i].ObjectV2.RemoveRestoreHdrs()
|
|
|
|
case fi.TransitionStatus == lifecycle.TransitionComplete:
|
|
z.Versions[i].ObjectV2.SetTransition(fi)
|
|
|
|
default:
|
|
z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)
|
|
// if uv has tiered content we add a
|
|
// free-version to track it for
|
|
// asynchronous deletion via scanner.
|
|
if freeVersion, toFree := version.ObjectV2.InitFreeVersion(fi); toFree {
|
|
z.Versions = append(z.Versions, freeVersion)
|
|
}
|
|
}
|
|
|
|
if fi.Deleted {
|
|
z.Versions = append(z.Versions, ventry)
|
|
}
|
|
if z.SharedDataDirCount(version.ObjectV2.VersionID, version.ObjectV2.DataDir) > 0 {
|
|
// Found that another version references the same dataDir
|
|
// we shouldn't remove it, and only remove the version instead
|
|
return "", len(z.Versions) == 0, nil
|
|
}
|
|
return uuid.UUID(version.ObjectV2.DataDir).String(), len(z.Versions) == 0, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
if fi.Deleted {
|
|
z.Versions = append(z.Versions, ventry)
|
|
return "", false, nil
|
|
}
|
|
return "", false, errFileVersionNotFound
|
|
}
|
|
|
|
// TotalSize returns the total size of all versions.
|
|
func (z xlMetaV2) TotalSize() int64 {
|
|
var total int64
|
|
for i := range z.Versions {
|
|
switch z.Versions[i].Type {
|
|
case ObjectType:
|
|
total += z.Versions[i].ObjectV2.Size
|
|
case LegacyType:
|
|
total += z.Versions[i].ObjectV1.Stat.Size
|
|
}
|
|
}
|
|
return total
|
|
}
|
|
|
|
// ListVersions lists current versions, and current deleted
|
|
// versions returns error for unexpected entries.
|
|
// showPendingDeletes is set to true if ListVersions needs to list objects marked deleted
|
|
// but waiting to be replicated
|
|
func (z xlMetaV2) ListVersions(volume, path string) ([]FileInfo, time.Time, error) {
|
|
versions := make([]FileInfo, 0, len(z.Versions))
|
|
var err error
|
|
|
|
for _, version := range z.Versions {
|
|
if !version.Valid() {
|
|
return nil, time.Time{}, errFileCorrupt
|
|
}
|
|
var fi FileInfo
|
|
switch version.Type {
|
|
case ObjectType:
|
|
fi, err = version.ObjectV2.ToFileInfo(volume, path)
|
|
case DeleteType:
|
|
fi, err = version.DeleteMarker.ToFileInfo(volume, path)
|
|
case LegacyType:
|
|
fi, err = version.ObjectV1.ToFileInfo(volume, path)
|
|
}
|
|
if err != nil {
|
|
return nil, time.Time{}, err
|
|
}
|
|
versions = append(versions, fi)
|
|
}
|
|
|
|
versionsSorter(versions).sort()
|
|
|
|
for i := range versions {
|
|
versions[i].NumVersions = len(versions)
|
|
if i > 0 {
|
|
versions[i].SuccessorModTime = versions[i-1].ModTime
|
|
}
|
|
}
|
|
|
|
versions[0].IsLatest = true
|
|
return versions, versions[0].ModTime, nil
|
|
}
|
|
|
|
// ToFileInfo converts xlMetaV2 into a common FileInfo datastructure
|
|
// for consumption across callers.
|
|
func (z xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err error) {
|
|
var uv uuid.UUID
|
|
if versionID != "" && versionID != nullVersionID {
|
|
uv, err = uuid.Parse(versionID)
|
|
if err != nil {
|
|
logger.LogIf(GlobalContext, fmt.Errorf("invalid versionID specified %s", versionID))
|
|
return FileInfo{}, errFileVersionNotFound
|
|
}
|
|
}
|
|
|
|
orderedVersions := make([]xlMetaV2Version, 0, len(z.Versions))
|
|
for _, version := range z.Versions {
|
|
if !version.Valid() {
|
|
logger.LogIf(GlobalContext, fmt.Errorf("invalid version detected %#v", version))
|
|
if versionID == "" {
|
|
return FileInfo{}, errFileNotFound
|
|
}
|
|
return FileInfo{}, errFileVersionNotFound
|
|
|
|
}
|
|
// skip listing free-version unless explicitly requested via versionID
|
|
if version.FreeVersion() && version.DeleteMarker.VersionID != uv {
|
|
continue
|
|
}
|
|
orderedVersions = append(orderedVersions, version)
|
|
}
|
|
|
|
if len(orderedVersions) > 1 {
|
|
sort.Slice(orderedVersions, func(i, j int) bool {
|
|
return orderedVersions[i].getModTime().After(orderedVersions[j].getModTime())
|
|
})
|
|
}
|
|
|
|
if versionID == "" {
|
|
if len(orderedVersions) >= 1 {
|
|
switch orderedVersions[0].Type {
|
|
case ObjectType:
|
|
fi, err = orderedVersions[0].ObjectV2.ToFileInfo(volume, path)
|
|
case DeleteType:
|
|
fi, err = orderedVersions[0].DeleteMarker.ToFileInfo(volume, path)
|
|
case LegacyType:
|
|
fi, err = orderedVersions[0].ObjectV1.ToFileInfo(volume, path)
|
|
}
|
|
fi.IsLatest = true
|
|
fi.NumVersions = len(orderedVersions)
|
|
return fi, err
|
|
}
|
|
return FileInfo{}, errFileNotFound
|
|
}
|
|
|
|
var foundIndex = -1
|
|
|
|
for i := range orderedVersions {
|
|
switch orderedVersions[i].Type {
|
|
case ObjectType:
|
|
if orderedVersions[i].ObjectV2.VersionID == uv {
|
|
fi, err = orderedVersions[i].ObjectV2.ToFileInfo(volume, path)
|
|
foundIndex = i
|
|
break
|
|
}
|
|
case LegacyType:
|
|
if orderedVersions[i].ObjectV1.VersionID == versionID {
|
|
fi, err = orderedVersions[i].ObjectV1.ToFileInfo(volume, path)
|
|
foundIndex = i
|
|
break
|
|
}
|
|
case DeleteType:
|
|
if orderedVersions[i].DeleteMarker.VersionID == uv {
|
|
fi, err = orderedVersions[i].DeleteMarker.ToFileInfo(volume, path)
|
|
foundIndex = i
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if err != nil {
|
|
return fi, err
|
|
}
|
|
|
|
if foundIndex >= 0 {
|
|
// A version is found, fill dynamic fields
|
|
fi.IsLatest = foundIndex == 0
|
|
fi.NumVersions = len(z.Versions)
|
|
if foundIndex > 0 {
|
|
fi.SuccessorModTime = orderedVersions[foundIndex-1].getModTime()
|
|
}
|
|
return fi, nil
|
|
}
|
|
|
|
if versionID == "" {
|
|
return FileInfo{}, errFileNotFound
|
|
}
|
|
|
|
return FileInfo{}, errFileVersionNotFound
|
|
}
|
|
|
|
// metaDataReadDefault is the upper bound, in bytes, on the first read of a
// metadata file.
const metaDataReadDefault = 4 << 10

// metaDataPool recycles metadata byte slices. Fresh slices are empty with a
// capacity of metaDataReadDefault.
var metaDataPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, metaDataReadDefault)
	},
}
|
|
|
|
// metaDataPoolGet will return a byte slice with capacity at least metaDataReadDefault.
|
|
// It will be length 0.
|
|
func metaDataPoolGet() []byte {
|
|
return metaDataPool.Get().([]byte)[:0]
|
|
}
|
|
|
|
// metaDataPoolPut will put an unused small buffer back into the pool.
|
|
func metaDataPoolPut(buf []byte) {
|
|
if cap(buf) >= metaDataReadDefault && cap(buf) < metaDataReadDefault*4 {
|
|
metaDataPool.Put(buf)
|
|
}
|
|
}
|
|
|
|
// readXLMetaNoData will load the metadata, but skip data segments.
|
|
// This should only be used when data is never interesting.
|
|
// If data is not xlv2, it is returned in full.
|
|
func readXLMetaNoData(r io.Reader, size int64) ([]byte, error) {
|
|
initial := size
|
|
hasFull := true
|
|
if initial > metaDataReadDefault {
|
|
initial = metaDataReadDefault
|
|
hasFull = false
|
|
}
|
|
|
|
buf := metaDataPoolGet()[:initial]
|
|
_, err := io.ReadFull(r, buf)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("readXLMetaNoData.ReadFull: %w", err)
|
|
}
|
|
readMore := func(n int64) error {
|
|
has := int64(len(buf))
|
|
if has >= n {
|
|
return nil
|
|
}
|
|
if hasFull || n > size {
|
|
return io.ErrUnexpectedEOF
|
|
}
|
|
extra := n - has
|
|
buf = append(buf, make([]byte, extra)...)
|
|
_, err := io.ReadFull(r, buf[has:])
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
// Returned if we read nothing.
|
|
return fmt.Errorf("readXLMetaNoData.readMore: %w", io.ErrUnexpectedEOF)
|
|
}
|
|
return fmt.Errorf("readXLMetaNoData.readMore: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
tmp, major, minor, err := checkXL2V1(buf)
|
|
if err != nil {
|
|
err = readMore(size)
|
|
return buf, err
|
|
}
|
|
switch major {
|
|
case 1:
|
|
switch minor {
|
|
case 0:
|
|
err = readMore(size)
|
|
return buf, err
|
|
case 1, 2:
|
|
sz, tmp, err := msgp.ReadBytesHeader(tmp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
want := int64(sz) + int64(len(buf)-len(tmp))
|
|
|
|
// v1.1 does not have CRC.
|
|
if minor < 2 {
|
|
if err := readMore(want); err != nil {
|
|
return nil, err
|
|
}
|
|
return buf[:want], nil
|
|
}
|
|
|
|
// CRC is variable length, so we need to truncate exactly that.
|
|
wantMax := want + msgp.Uint32Size
|
|
if wantMax > size {
|
|
wantMax = size
|
|
}
|
|
if err := readMore(wantMax); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tmp = buf[want:]
|
|
_, after, err := msgp.ReadUint32Bytes(tmp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
want += int64(len(tmp) - len(after))
|
|
|
|
return buf[:want], err
|
|
|
|
default:
|
|
return nil, errors.New("unknown minor metadata version")
|
|
}
|
|
default:
|
|
return nil, errors.New("unknown major metadata version")
|
|
}
|
|
}
|