[Tiering] Support remote tiers with object versioning (#12342)

- Adds versioning support for S3 based remote tiers that have versioning
enabled. This ensures that when reading or deleting we specify the specific
version ID of the object. In case of deletion, this is important to ensure that
the object version is actually deleted instead of simply being marked for
deletion.

- Stores the remote object's version id in the tier-journal. Tier-journal file
version is not bumped up as serializing the new struct version is
compatible with old journals without the remote object version id.

- `storageRESTVersion` is bumped up as FileInfo struct now includes a
`TransitionRemoteVersionID` member.

- Azure and GCS support for this feature will be added subsequently.

Co-authored-by: Krishnan Parthasarathi <krisis@users.noreply.github.com>
This commit is contained in:
Aditya Manthramurthy 2021-06-03 14:26:51 -07:00 committed by GitHub
parent 41d4d650e4
commit 30a3921d3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 536 additions and 70 deletions

View File

@ -46,6 +46,8 @@ const (
TransitionStatus = "transition-status"
// TransitionedObjectName name of transitioned object
TransitionedObjectName = "transitioned-object"
// TransitionedVersionID is version of remote object
TransitionedVersionID = "transitioned-versionID"
// TransitionTier name of transition storage class
TransitionTier = "transition-tier"
)
@ -208,41 +210,46 @@ const (
//
// 1. when a restored (via PostRestoreObject API) object expires.
// 2. when a transitioned object expires (based on an ILM rule).
func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, lcOpts lifecycle.ObjectOpts, remoteObject, tier string, action expireAction) error {
func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *ObjectInfo, lcOpts lifecycle.ObjectOpts, action expireAction) error {
var opts ObjectOptions
opts.Versioned = globalBucketVersioningSys.Enabled(bucket)
opts.Versioned = globalBucketVersioningSys.Enabled(oi.Bucket)
opts.VersionID = lcOpts.VersionID
switch action {
case expireObj:
// When an object is past expiry or when a transitioned object is being
// deleted, 'mark' the data in the remote tier for delete.
if err := globalTierJournal.AddEntry(jentry{ObjName: remoteObject, TierName: tier}); err != nil {
entry := jentry{
ObjName: oi.transitionedObjName,
VersionID: oi.transitionVersionID,
TierName: oi.TransitionTier,
}
if err := globalTierJournal.AddEntry(entry); err != nil {
logger.LogIf(ctx, err)
return err
}
// Delete metadata on source, now that data in remote tier has been
// marked for deletion.
if _, err := objectAPI.DeleteObject(ctx, bucket, object, opts); err != nil {
if _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil {
logger.LogIf(ctx, err)
return err
}
// Send audit for the lifecycle delete operation
auditLogLifecycle(ctx, bucket, object)
auditLogLifecycle(ctx, oi.Bucket, oi.Name)
eventName := event.ObjectRemovedDelete
if lcOpts.DeleteMarker {
eventName = event.ObjectRemovedDeleteMarkerCreated
}
objInfo := ObjectInfo{
Name: object,
Name: oi.Name,
VersionID: lcOpts.VersionID,
DeleteMarker: lcOpts.DeleteMarker,
}
// Notify object deleted event.
sendEvent(eventArgs{
EventName: eventName,
BucketName: bucket,
BucketName: oi.Bucket,
Object: objInfo,
Host: "Internal: [ILM-EXPIRY]",
})
@ -252,7 +259,7 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket
// from the source, while leaving metadata behind. The data on
// transitioned tier lies untouched and still accessible
opts.Transition.ExpireRestored = true
_, err := objectAPI.DeleteObject(ctx, bucket, object, opts)
_, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts)
return err
default:
return fmt.Errorf("Unknown expire action %v", action)
@ -286,11 +293,12 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo)
UserTags: oi.UserTags,
}
tierName := getLifeCycleTransitionTier(ctx, lc, oi.Bucket, lcOpts)
opts := ObjectOptions{Transition: TransitionOptions{
Status: lifecycle.TransitionPending,
Tier: tierName,
ETag: oi.ETag,
},
opts := ObjectOptions{
Transition: TransitionOptions{
Status: lifecycle.TransitionPending,
Tier: tierName,
ETag: oi.ETag,
},
VersionID: oi.VersionID,
Versioned: globalBucketVersioningSys.Enabled(oi.Bucket),
MTime: oi.ModTime,
@ -327,7 +335,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs
gopts.length = length
}
reader, err := tgtClient.Get(ctx, oi.transitionedObjName, gopts)
reader, err := tgtClient.Get(ctx, oi.transitionedObjName, remoteVersionID(oi.transitionVersionID), gopts)
if err != nil {
return nil, err
}

View File

@ -1068,7 +1068,7 @@ func applyExpiryOnTransitionedObject(ctx context.Context, objLayer ObjectLayer,
if restoredObject {
action = expireRestoredObj
}
if err := expireTransitionedObject(ctx, objLayer, obj.Bucket, obj.Name, lcOpts, obj.transitionedObjName, obj.TransitionTier, action); err != nil {
if err := expireTransitionedObject(ctx, objLayer, &obj, lcOpts, action); err != nil {
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return false
}

View File

@ -155,6 +155,7 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
objInfo.TransitionStatus = fi.TransitionStatus
objInfo.transitionedObjName = fi.TransitionedObjName
objInfo.transitionVersionID = fi.TransitionVersionID
objInfo.TransitionTier = fi.TransitionTier
// etag/md5Sum has already been extracted. We need to

View File

@ -1348,7 +1348,8 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
pw.CloseWithError(err)
}()
err = tgtClient.Put(ctx, destObj, pr, fi.Size)
var rv remoteVersionID
rv, err = tgtClient.Put(ctx, destObj, pr, fi.Size)
pr.CloseWithError(err)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to transition %s/%s(%s) to %s tier: %w", bucket, object, opts.VersionID, opts.Transition.Tier, err))
@ -1357,6 +1358,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
fi.TransitionStatus = lifecycle.TransitionComplete
fi.TransitionedObjName = destObj
fi.TransitionTier = opts.Transition.Tier
fi.TransitionVersionID = string(rv)
eventName := event.ObjectTransitionComplete
storageDisks := er.getDisks()

View File

@ -117,6 +117,8 @@ type ObjectInfo struct {
TransitionStatus string
// Name of transitioned object on remote tier
transitionedObjName string
// VersionID on the the remote tier
transitionVersionID string
// Name of remote tier object has transitioned to
TransitionTier string

View File

@ -138,6 +138,9 @@ type FileInfo struct {
TransitionedObjName string
// TransitionTier is the storage class label assigned to remote tier.
TransitionTier string
// TransitionVersionID stores a version ID of the object associate
// with the remote tier.
TransitionVersionID string
// ExpireRestored indicates that the restored object is to be expired.
ExpireRestored bool

View File

@ -550,8 +550,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err)
return
}
if zb0001 != 23 {
err = msgp.ArrayError{Wanted: 23, Got: zb0001}
if zb0001 != 24 {
err = msgp.ArrayError{Wanted: 24, Got: zb0001}
return
}
z.Volume, err = dc.ReadString()
@ -594,6 +594,11 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "TransitionTier")
return
}
z.TransitionVersionID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "TransitionVersionID")
return
}
z.ExpireRestored, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "ExpireRestored")
@ -715,8 +720,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 23
err = en.Append(0xdc, 0x0, 0x17)
// array header, size 24
err = en.Append(0xdc, 0x0, 0x18)
if err != nil {
return
}
@ -760,6 +765,11 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "TransitionTier")
return
}
err = en.WriteString(z.TransitionVersionID)
if err != nil {
err = msgp.WrapError(err, "TransitionVersionID")
return
}
err = en.WriteBool(z.ExpireRestored)
if err != nil {
err = msgp.WrapError(err, "ExpireRestored")
@ -860,8 +870,8 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// array header, size 23
o = append(o, 0xdc, 0x0, 0x17)
// array header, size 24
o = append(o, 0xdc, 0x0, 0x18)
o = msgp.AppendString(o, z.Volume)
o = msgp.AppendString(o, z.Name)
o = msgp.AppendString(o, z.VersionID)
@ -870,6 +880,7 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.AppendString(o, z.TransitionStatus)
o = msgp.AppendString(o, z.TransitionedObjName)
o = msgp.AppendString(o, z.TransitionTier)
o = msgp.AppendString(o, z.TransitionVersionID)
o = msgp.AppendBool(o, z.ExpireRestored)
o = msgp.AppendString(o, z.DataDir)
o = msgp.AppendBool(o, z.XLV1)
@ -911,8 +922,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err)
return
}
if zb0001 != 23 {
err = msgp.ArrayError{Wanted: 23, Got: zb0001}
if zb0001 != 24 {
err = msgp.ArrayError{Wanted: 24, Got: zb0001}
return
}
z.Volume, bts, err = msgp.ReadStringBytes(bts)
@ -955,6 +966,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "TransitionTier")
return
}
z.TransitionVersionID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "TransitionVersionID")
return
}
z.ExpireRestored, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "ExpireRestored")
@ -1077,7 +1093,7 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *FileInfo) Msgsize() (s int) {
s = 3 + msgp.StringPrefixSize + len(z.Volume) + msgp.StringPrefixSize + len(z.Name) + msgp.StringPrefixSize + len(z.VersionID) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.TransitionStatus) + msgp.StringPrefixSize + len(z.TransitionedObjName) + msgp.StringPrefixSize + len(z.TransitionTier) + msgp.BoolSize + msgp.StringPrefixSize + len(z.DataDir) + msgp.BoolSize + msgp.TimeSize + msgp.Int64Size + msgp.Uint32Size + msgp.MapHeaderSize
s = 3 + msgp.StringPrefixSize + len(z.Volume) + msgp.StringPrefixSize + len(z.Name) + msgp.StringPrefixSize + len(z.VersionID) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.TransitionStatus) + msgp.StringPrefixSize + len(z.TransitionedObjName) + msgp.StringPrefixSize + len(z.TransitionTier) + msgp.StringPrefixSize + len(z.TransitionVersionID) + msgp.BoolSize + msgp.StringPrefixSize + len(z.DataDir) + msgp.BoolSize + msgp.TimeSize + msgp.Int64Size + msgp.Uint32Size + msgp.MapHeaderSize
if z.Metadata != nil {
for za0001, za0002 := range z.Metadata {
_ = za0002

View File

@ -18,7 +18,7 @@
package cmd
const (
storageRESTVersion = "v35" // Inline bugfix needs all servers to be updated
storageRESTVersion = "v36" // Changes to FileInfo for tier-journal
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)

View File

@ -42,8 +42,9 @@ type tierJournal struct {
}
type jentry struct {
ObjName string `msg:"obj"`
TierName string `msg:"tier"`
ObjName string `msg:"obj"`
VersionID string `msg:"vid"`
TierName string `msg:"tier"`
}
const (
@ -51,6 +52,10 @@ const (
tierJournalHdrLen = 2 // 2 bytes
)
var (
errUnsupportedJournalVersion = errors.New("unsupported pending deletes journal version")
)
func initTierDeletionJournal(done <-chan struct{}) (*tierJournal, error) {
diskPath := globalEndpoints.FirstLocalDiskPath()
j := &tierJournal{
@ -84,7 +89,7 @@ func (j *tierJournal) rotate() error {
return j.Open()
}
type walkFn func(objName, tierName string) error
type walkFn func(objName, rvID, tierName string) error
func (j *tierJournal) ReadOnlyPath() string {
return filepath.Join(j.diskPath, minioMetaBucket, "ilm", "deletion-journal.ro.bin")
@ -111,6 +116,7 @@ func (j *tierJournal) WalkEntries(fn walkFn) {
}
defer ro.Close()
mr := msgp.NewReader(ro)
done := false
for {
var entry jentry
@ -123,9 +129,11 @@ func (j *tierJournal) WalkEntries(fn walkFn) {
logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed to decode journal entry %s", err))
break
}
err = fn(entry.ObjName, entry.TierName)
err = fn(entry.ObjName, entry.VersionID, entry.TierName)
if err != nil && !isErrObjectNotFound(err) {
logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed to delete transitioned object %s from %s due to %s", entry.ObjName, entry.TierName, err))
// We add the entry into the active journal to try again
// later.
j.AddEntry(entry)
}
}
@ -134,12 +142,12 @@ func (j *tierJournal) WalkEntries(fn walkFn) {
}
}
func deleteObjectFromRemoteTier(objName, tierName string) error {
func deleteObjectFromRemoteTier(objName, rvID, tierName string) error {
w, err := globalTierConfigMgr.getDriver(tierName)
if err != nil {
return err
}
err = w.Remove(context.Background(), objName)
err = w.Remove(context.Background(), objName, remoteVersionID(rvID))
if err != nil {
return err
}
@ -263,8 +271,15 @@ func (j *tierJournal) OpenRO() (io.ReadCloser, error) {
switch binary.LittleEndian.Uint16(data[:]) {
case tierJournalVersion:
return file, nil
default:
return nil, errors.New("unsupported pending deletes journal version")
return nil, errUnsupportedJournalVersion
}
return file, nil
}
// jentryV1 represents the entry in the journal before RemoteVersionID was
// added. It remains here for use in tests for the struct element addition.
type jentryV1 struct {
ObjName string `msg:"obj"`
TierName string `msg:"tier"`
}

View File

@ -8,6 +8,159 @@ import (
// DecodeMsg implements msgp.Decodable
func (z *jentry) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "obj":
z.ObjName, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "ObjName")
return
}
case "vid":
z.VersionID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "VersionID")
return
}
case "tier":
z.TierName, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "TierName")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z jentry) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 3
// write "obj"
err = en.Append(0x83, 0xa3, 0x6f, 0x62, 0x6a)
if err != nil {
return
}
err = en.WriteString(z.ObjName)
if err != nil {
err = msgp.WrapError(err, "ObjName")
return
}
// write "vid"
err = en.Append(0xa3, 0x76, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteString(z.VersionID)
if err != nil {
err = msgp.WrapError(err, "VersionID")
return
}
// write "tier"
err = en.Append(0xa4, 0x74, 0x69, 0x65, 0x72)
if err != nil {
return
}
err = en.WriteString(z.TierName)
if err != nil {
err = msgp.WrapError(err, "TierName")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z jentry) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 3
// string "obj"
o = append(o, 0x83, 0xa3, 0x6f, 0x62, 0x6a)
o = msgp.AppendString(o, z.ObjName)
// string "vid"
o = append(o, 0xa3, 0x76, 0x69, 0x64)
o = msgp.AppendString(o, z.VersionID)
// string "tier"
o = append(o, 0xa4, 0x74, 0x69, 0x65, 0x72)
o = msgp.AppendString(o, z.TierName)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "obj":
z.ObjName, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "ObjName")
return
}
case "vid":
z.VersionID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "VersionID")
return
}
case "tier":
z.TierName, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "TierName")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z jentry) Msgsize() (s int) {
s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 4 + msgp.StringPrefixSize + len(z.VersionID) + 5 + msgp.StringPrefixSize + len(z.TierName)
return
}
// DecodeMsg implements msgp.Decodable
func (z *jentryV1) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
@ -48,7 +201,7 @@ func (z *jentry) DecodeMsg(dc *msgp.Reader) (err error) {
}
// EncodeMsg implements msgp.Encodable
func (z jentry) EncodeMsg(en *msgp.Writer) (err error) {
func (z jentryV1) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "obj"
err = en.Append(0x82, 0xa3, 0x6f, 0x62, 0x6a)
@ -74,7 +227,7 @@ func (z jentry) EncodeMsg(en *msgp.Writer) (err error) {
}
// MarshalMsg implements msgp.Marshaler
func (z jentry) MarshalMsg(b []byte) (o []byte, err error) {
func (z jentryV1) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "obj"
@ -87,7 +240,7 @@ func (z jentry) MarshalMsg(b []byte) (o []byte, err error) {
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) {
func (z *jentryV1) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
@ -129,7 +282,7 @@ func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) {
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z jentry) Msgsize() (s int) {
func (z jentryV1) Msgsize() (s int) {
s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 5 + msgp.StringPrefixSize + len(z.TierName)
return
}

View File

@ -121,3 +121,116 @@ func BenchmarkDecodejentry(b *testing.B) {
}
}
}
func TestMarshalUnmarshaljentryV1(t *testing.T) {
v := jentryV1{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgjentryV1(b *testing.B) {
v := jentryV1{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgjentryV1(b *testing.B) {
v := jentryV1{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshaljentryV1(b *testing.B) {
v := jentryV1{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodejentryV1(t *testing.T) {
v := jentryV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodejentryV1 Msgsize() is inaccurate")
}
vn := jentryV1{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodejentryV1(b *testing.B) {
v := jentryV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodejentryV1(b *testing.B) {
v := jentryV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

121
cmd/tier-journal_test.go Normal file
View File

@ -0,0 +1,121 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
// TestJEntryReadOldToNew1 - tests that adding the RemoteVersionID parameter to the
// jentry struct does not cause unexpected errors when reading the serialized
// old version into new version.
func TestJEntryReadOldToNew1(t *testing.T) {
readOldToNewCases := []struct {
je jentryV1
exp jentry
}{
{jentryV1{"obj1", "tier1"}, jentry{"obj1", "", "tier1"}},
{jentryV1{"obj1", ""}, jentry{"obj1", "", ""}},
{jentryV1{"", "tier1"}, jentry{"", "", "tier1"}},
{jentryV1{"", ""}, jentry{"", "", ""}},
}
var b bytes.Buffer
for _, item := range readOldToNewCases {
bs, err := item.je.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
b.Write(bs)
}
mr := msgp.NewReader(&b)
for i, item := range readOldToNewCases {
var je jentry
err := je.DecodeMsg(mr)
if err != nil {
t.Fatal(err)
}
if je != item.exp {
t.Errorf("Case %d: Expected: %v Got: %v", i, item.exp, je)
}
}
}
// TestJEntryWriteNewToOldMix1 - tests that adding the RemoteVersionID parameter
// to the jentry struct does not cause unexpected errors when writing. This
// simulates the case when the active journal has entries in the older version
// struct and due to errors new entries are added in the new version of the
// struct.
func TestJEntryWriteNewToOldMix1(t *testing.T) {
oldStructVals := []jentryV1{
{"obj1", "tier1"},
{"obj2", "tier2"},
{"obj3", "tier3"},
}
newStructVals := []jentry{
{"obj4", "", "tier1"},
{"obj5", "ver2", "tier2"},
{"obj6", "", "tier3"},
}
// Write old struct version values followed by new version values.
var b bytes.Buffer
for _, item := range oldStructVals {
bs, err := item.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
b.Write(bs)
}
for _, item := range newStructVals {
bs, err := item.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
b.Write(bs)
}
// Read into new struct version and check.
mr := msgp.NewReader(&b)
for i := 0; i < len(oldStructVals)+len(newStructVals); i++ {
var je jentry
err := je.DecodeMsg(mr)
if err != nil {
t.Fatal(err)
}
var expectedJe jentry
if i < len(oldStructVals) {
// For old struct values, the RemoteVersionID will be
// empty
expectedJe = jentry{
ObjName: oldStructVals[i].ObjName,
VersionID: "",
TierName: oldStructVals[i].TierName,
}
} else {
expectedJe = newStructVals[i-len(oldStructVals)]
}
if expectedJe != je {
t.Errorf("Case %d: Expected: %v, Got: %v", i, expectedJe, je)
}
}
}

View File

@ -41,14 +41,15 @@ import (
// logger.LogIf(ctx, err)
// }
type objSweeper struct {
Object string
Bucket string
ReqVersion string // version ID set by application, applies only to DeleteObject and DeleteObjects APIs
Versioned bool
Suspended bool
TransitionStatus string
TransitionTier string
RemoteObject string
Object string
Bucket string
ReqVersion string // version ID set by application, applies only to DeleteObject and DeleteObjects APIs
Versioned bool
Suspended bool
TransitionStatus string
TransitionTier string
TransitionVersionID string
RemoteObject string
}
// newObjSweeper returns an objSweeper for a given bucket and object.
@ -116,6 +117,7 @@ func (os *objSweeper) SetTransitionState(info ObjectInfo) {
os.TransitionTier = info.TransitionTier
os.TransitionStatus = info.TransitionStatus
os.RemoteObject = info.transitionedObjName
os.TransitionVersionID = info.transitionVersionID
}
// shouldRemoveRemoteObject determines if a transitioned object should be
@ -142,7 +144,11 @@ func (os *objSweeper) shouldRemoveRemoteObject() (jentry, bool) {
delTier = true
}
if delTier {
return jentry{ObjName: os.RemoteObject, TierName: os.TransitionTier}, true
return jentry{
ObjName: os.RemoteObject,
VersionID: os.TransitionVersionID,
TierName: os.TransitionTier,
}, true
}
return jentry{}, false
}

View File

@ -53,19 +53,23 @@ func (az *warmBackendAzure) tier() azblob.AccessTierType {
}
return azblob.AccessTierType("")
}
func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader, length int64) error {
// FIXME: add support for remote version ID in Azure remote tier and remove
// this. Currently it's a no-op.
func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) {
blobURL := az.serviceURL.NewContainerURL(az.Bucket).NewBlockBlobURL(az.getDest(object))
// set tier if specified -
if az.StorageClass != "" {
if _, err := blobURL.SetTier(ctx, az.tier(), azblob.LeaseAccessConditions{}); err != nil {
return azureToObjectError(err, az.Bucket, object)
return "", azureToObjectError(err, az.Bucket, object)
}
}
_, err := azblob.UploadStreamToBlockBlob(ctx, r, blobURL, azblob.UploadStreamToBlockBlobOptions{})
return azureToObjectError(err, az.Bucket, object)
res, err := azblob.UploadStreamToBlockBlob(ctx, r, blobURL, azblob.UploadStreamToBlockBlobOptions{})
return remoteVersionID(res.Version()), azureToObjectError(err, az.Bucket, object)
}
func (az *warmBackendAzure) Get(ctx context.Context, object string, opts WarmBackendGetOpts) (r io.ReadCloser, err error) {
func (az *warmBackendAzure) Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (r io.ReadCloser, err error) {
if opts.startOffset < 0 {
return nil, InvalidRange{}
}
@ -79,7 +83,7 @@ func (az *warmBackendAzure) Get(ctx context.Context, object string, opts WarmBac
return rc, nil
}
func (az *warmBackendAzure) Remove(ctx context.Context, object string) error {
func (az *warmBackendAzure) Remove(ctx context.Context, object string, rv remoteVersionID) error {
blob := az.serviceURL.NewContainerURL(az.Bucket).NewBlobURL(az.getDest(object))
_, err := blob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
return azureToObjectError(err, az.Bucket, object)

View File

@ -43,7 +43,11 @@ func (gcs *warmBackendGCS) getDest(object string) string {
}
return destObj
}
func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, length int64) error {
// FIXME: add support for remote version ID in GCS remote tier and remove this.
// Currently it's a no-op.
func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, length int64) (remoteVersionID, error) {
object := gcs.client.Bucket(gcs.Bucket).Object(gcs.getDest(key))
//TODO: set storage class
w := object.NewWriter(ctx)
@ -51,13 +55,13 @@ func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader,
w.ObjectAttrs.StorageClass = gcs.StorageClass
}
if _, err := io.Copy(w, data); err != nil {
return gcsToObjectError(err, gcs.Bucket, key)
return "", gcsToObjectError(err, gcs.Bucket, key)
}
return w.Close()
return "", w.Close()
}
func (gcs *warmBackendGCS) Get(ctx context.Context, key string, opts WarmBackendGetOpts) (r io.ReadCloser, err error) {
func (gcs *warmBackendGCS) Get(ctx context.Context, key string, rv remoteVersionID, opts WarmBackendGetOpts) (r io.ReadCloser, err error) {
// GCS storage decompresses a gzipped object by default and returns the data.
// Refer to https://cloud.google.com/storage/docs/transcoding#decompressive_transcoding
// Need to set `Accept-Encoding` header to `gzip` when issuing a GetObject call, to be able
@ -73,7 +77,7 @@ func (gcs *warmBackendGCS) Get(ctx context.Context, key string, opts WarmBackend
return r, nil
}
func (gcs *warmBackendGCS) Remove(ctx context.Context, key string) error {
func (gcs *warmBackendGCS) Remove(ctx context.Context, key string, rv remoteVersionID) error {
err := gcs.client.Bucket(gcs.Bucket).Object(gcs.getDest(key)).Delete(ctx)
return gcsToObjectError(err, gcs.Bucket, key)
}

View File

@ -56,14 +56,17 @@ func (s3 *warmBackendS3) getDest(object string) string {
return destObj
}
func (s3 *warmBackendS3) Put(ctx context.Context, object string, r io.Reader, length int64) error {
_, err := s3.client.PutObject(ctx, s3.Bucket, s3.getDest(object), r, length, minio.PutObjectOptions{StorageClass: s3.StorageClass})
return s3.ToObjectError(err, object)
func (s3 *warmBackendS3) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) {
res, err := s3.client.PutObject(ctx, s3.Bucket, s3.getDest(object), r, length, minio.PutObjectOptions{StorageClass: s3.StorageClass})
return remoteVersionID(res.VersionID), s3.ToObjectError(err, object)
}
func (s3 *warmBackendS3) Get(ctx context.Context, object string, opts WarmBackendGetOpts) (io.ReadCloser, error) {
func (s3 *warmBackendS3) Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (io.ReadCloser, error) {
gopts := minio.GetObjectOptions{}
if rv != "" {
gopts.VersionID = string(rv)
}
if opts.startOffset >= 0 && opts.length > 0 {
if err := gopts.SetRange(opts.startOffset, opts.startOffset+opts.length-1); err != nil {
return nil, s3.ToObjectError(err, object)
@ -78,8 +81,12 @@ func (s3 *warmBackendS3) Get(ctx context.Context, object string, opts WarmBacken
return r, nil
}
func (s3 *warmBackendS3) Remove(ctx context.Context, object string) error {
err := s3.client.RemoveObject(ctx, s3.Bucket, s3.getDest(object), minio.RemoveObjectOptions{})
func (s3 *warmBackendS3) Remove(ctx context.Context, object string, rv remoteVersionID) error {
ropts := minio.RemoveObjectOptions{}
if rv != "" {
ropts.VersionID = string(rv)
}
err := s3.client.RemoveObject(ctx, s3.Bucket, s3.getDest(object), ropts)
return s3.ToObjectError(err, object)
}

View File

@ -36,9 +36,9 @@ type WarmBackendGetOpts struct {
// WarmBackend provides interface to be implemented by remote tier backends
type WarmBackend interface {
Put(ctx context.Context, object string, r io.Reader, length int64) error
Get(ctx context.Context, object string, opts WarmBackendGetOpts) (io.ReadCloser, error)
Remove(ctx context.Context, object string) error
Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error)
Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (io.ReadCloser, error)
Remove(ctx context.Context, object string, rv remoteVersionID) error
InUse(ctx context.Context) (bool, error)
}
@ -48,7 +48,7 @@ const probeObject = "probeobject"
// to perform all operations defined in the WarmBackend interface.
func checkWarmBackend(ctx context.Context, w WarmBackend) error {
var empty bytes.Reader
err := w.Put(ctx, probeObject, &empty, 0)
rv, err := w.Put(ctx, probeObject, &empty, 0)
if err != nil {
return tierPermErr{
Op: tierPut,
@ -56,7 +56,7 @@ func checkWarmBackend(ctx context.Context, w WarmBackend) error {
}
}
_, err = w.Get(ctx, probeObject, WarmBackendGetOpts{})
_, err = w.Get(ctx, probeObject, rv, WarmBackendGetOpts{})
if err != nil {
switch {
case isErrBucketNotFound(err):
@ -71,7 +71,7 @@ func checkWarmBackend(ctx context.Context, w WarmBackend) error {
}
}
if err = w.Remove(ctx, probeObject); err != nil {
if err = w.Remove(ctx, probeObject, rv); err != nil {
return tierPermErr{
Op: tierDelete,
Err: err,
@ -115,6 +115,10 @@ func errIsTierPermError(err error) bool {
return errors.As(err, &tpErr)
}
// remoteVersionID represents the version id of an object in the remote tier.
// Its usage is remote tier cloud implementation specific.
type remoteVersionID string
// newWarmBackend instantiates the tier type specific WarmBackend, runs
// checkWarmBackend on it.
func newWarmBackend(ctx context.Context, tier madmin.TierConfig) (d WarmBackend, err error) {

View File

@ -875,6 +875,9 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
if fi.TransitionedObjName != "" {
ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName)
}
if fi.TransitionVersionID != "" {
ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID)
}
if fi.TransitionTier != "" {
ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier)
}
@ -1020,6 +1023,9 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) {
if o, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName]; ok {
fi.TransitionedObjName = string(o)
}
if rv, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID]; ok {
fi.TransitionVersionID = string(rv)
}
if sc, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionTier]; ok {
fi.TransitionTier = string(sc)
}
@ -1188,6 +1194,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
case fi.TransitionStatus == lifecycle.TransitionComplete:
z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionStatus] = []byte(fi.TransitionStatus)
z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName)
z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID)
z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier)
default:
z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)