diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index ba3e62cef..05f229301 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -46,6 +46,8 @@ const ( TransitionStatus = "transition-status" // TransitionedObjectName name of transitioned object TransitionedObjectName = "transitioned-object" + // TransitionedVersionID is version of remote object + TransitionedVersionID = "transitioned-versionID" // TransitionTier name of transition storage class TransitionTier = "transition-tier" ) @@ -208,41 +210,46 @@ const ( // // 1. when a restored (via PostRestoreObject API) object expires. // 2. when a transitioned object expires (based on an ILM rule). -func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket, object string, lcOpts lifecycle.ObjectOpts, remoteObject, tier string, action expireAction) error { +func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *ObjectInfo, lcOpts lifecycle.ObjectOpts, action expireAction) error { var opts ObjectOptions - opts.Versioned = globalBucketVersioningSys.Enabled(bucket) + opts.Versioned = globalBucketVersioningSys.Enabled(oi.Bucket) opts.VersionID = lcOpts.VersionID switch action { case expireObj: // When an object is past expiry or when a transitioned object is being // deleted, 'mark' the data in the remote tier for delete. - if err := globalTierJournal.AddEntry(jentry{ObjName: remoteObject, TierName: tier}); err != nil { + entry := jentry{ + ObjName: oi.transitionedObjName, + VersionID: oi.transitionVersionID, + TierName: oi.TransitionTier, + } + if err := globalTierJournal.AddEntry(entry); err != nil { logger.LogIf(ctx, err) return err } // Delete metadata on source, now that data in remote tier has been // marked for deletion. - if _, err := objectAPI.DeleteObject(ctx, bucket, object, opts); err != nil { + if _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil { logger.LogIf(ctx, err) return err } // Send audit for the lifecycle delete operation - auditLogLifecycle(ctx, bucket, object) + auditLogLifecycle(ctx, oi.Bucket, oi.Name) eventName := event.ObjectRemovedDelete if lcOpts.DeleteMarker { eventName = event.ObjectRemovedDeleteMarkerCreated } objInfo := ObjectInfo{ - Name: object, + Name: oi.Name, VersionID: lcOpts.VersionID, DeleteMarker: lcOpts.DeleteMarker, } // Notify object deleted event. sendEvent(eventArgs{ EventName: eventName, - BucketName: bucket, + BucketName: oi.Bucket, Object: objInfo, Host: "Internal: [ILM-EXPIRY]", }) @@ -252,7 +259,7 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, bucket // from the source, while leaving metadata behind. The data on // transitioned tier lies untouched and still accessible opts.Transition.ExpireRestored = true - _, err := objectAPI.DeleteObject(ctx, bucket, object, opts) + _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts) return err default: return fmt.Errorf("Unknown expire action %v", action) @@ -286,11 +293,12 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo) UserTags: oi.UserTags, } tierName := getLifeCycleTransitionTier(ctx, lc, oi.Bucket, lcOpts) - opts := ObjectOptions{Transition: TransitionOptions{ - Status: lifecycle.TransitionPending, - Tier: tierName, - ETag: oi.ETag, - }, + opts := ObjectOptions{ + Transition: TransitionOptions{ + Status: lifecycle.TransitionPending, + Tier: tierName, + ETag: oi.ETag, + }, VersionID: oi.VersionID, Versioned: globalBucketVersioningSys.Enabled(oi.Bucket), MTime: oi.ModTime, @@ -327,7 +335,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs gopts.length = length } - reader, err := tgtClient.Get(ctx, oi.transitionedObjName, gopts) + reader, err := tgtClient.Get(ctx, oi.transitionedObjName, remoteVersionID(oi.transitionVersionID), gopts) if err != nil { return nil, err } diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 7bd21b9ba..78d6a2f4f 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -1068,7 +1068,7 @@ func applyExpiryOnTransitionedObject(ctx context.Context, objLayer ObjectLayer, if restoredObject { action = expireRestoredObj } - if err := expireTransitionedObject(ctx, objLayer, obj.Bucket, obj.Name, lcOpts, obj.transitionedObjName, obj.TransitionTier, action); err != nil { + if err := expireTransitionedObject(ctx, objLayer, &obj, lcOpts, action); err != nil { if isErrObjectNotFound(err) || isErrVersionNotFound(err) { return false } diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 0a39c5b62..b44f39444 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -155,6 +155,7 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo { objInfo.TransitionStatus = fi.TransitionStatus objInfo.transitionedObjName = fi.TransitionedObjName + objInfo.transitionVersionID = fi.TransitionVersionID objInfo.TransitionTier = fi.TransitionTier // etag/md5Sum has already been extracted. We need to diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 966697989..51633c9a9 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1348,7 +1348,8 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st pw.CloseWithError(err) }() - err = tgtClient.Put(ctx, destObj, pr, fi.Size) + var rv remoteVersionID + rv, err = tgtClient.Put(ctx, destObj, pr, fi.Size) pr.CloseWithError(err) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to transition %s/%s(%s) to %s tier: %w", bucket, object, opts.VersionID, opts.Transition.Tier, err)) @@ -1357,6 +1358,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st fi.TransitionStatus = lifecycle.TransitionComplete fi.TransitionedObjName = destObj fi.TransitionTier = opts.Transition.Tier + fi.TransitionVersionID = string(rv) eventName := event.ObjectTransitionComplete storageDisks := er.getDisks() diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 3538da6bd..511506d9d 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -117,6 +117,8 @@ type ObjectInfo struct { TransitionStatus string // Name of transitioned object on remote tier transitionedObjName string + // VersionID on the the remote tier + transitionVersionID string // Name of remote tier object has transitioned to TransitionTier string diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index 73daad7aa..e212de928 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -138,6 +138,9 @@ type FileInfo struct { TransitionedObjName string // TransitionTier is the storage class label assigned to remote tier. TransitionTier string + // TransitionVersionID stores a version ID of the object associate + // with the remote tier. + TransitionVersionID string // ExpireRestored indicates that the restored object is to be expired. ExpireRestored bool diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 073255188..875bcec75 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -550,8 +550,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - if zb0001 != 23 { - err = msgp.ArrayError{Wanted: 23, Got: zb0001} + if zb0001 != 24 { + err = msgp.ArrayError{Wanted: 24, Got: zb0001} return } z.Volume, err = dc.ReadString() @@ -594,6 +594,11 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "TransitionTier") return } + z.TransitionVersionID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TransitionVersionID") + return + } z.ExpireRestored, err = dc.ReadBool() if err != nil { err = msgp.WrapError(err, "ExpireRestored") @@ -715,8 +720,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { - // array header, size 23 - err = en.Append(0xdc, 0x0, 0x17) + // array header, size 24 + err = en.Append(0xdc, 0x0, 0x18) if err != nil { return } @@ -760,6 +765,11 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "TransitionTier") return } + err = en.WriteString(z.TransitionVersionID) + if err != nil { + err = msgp.WrapError(err, "TransitionVersionID") + return + } err = en.WriteBool(z.ExpireRestored) if err != nil { err = msgp.WrapError(err, "ExpireRestored") @@ -860,8 +870,8 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // array header, size 23 - o = append(o, 0xdc, 0x0, 0x17) + // array header, size 24 + o = append(o, 0xdc, 0x0, 0x18) o = msgp.AppendString(o, z.Volume) o = msgp.AppendString(o, z.Name) o = msgp.AppendString(o, z.VersionID) @@ -870,6 +880,7 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendString(o, z.TransitionStatus) o = msgp.AppendString(o, z.TransitionedObjName) o = msgp.AppendString(o, z.TransitionTier) + o = msgp.AppendString(o, z.TransitionVersionID) o = msgp.AppendBool(o, z.ExpireRestored) o = msgp.AppendString(o, z.DataDir) o = msgp.AppendBool(o, z.XLV1) @@ -911,8 +922,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err) return } - if zb0001 != 23 { - err = msgp.ArrayError{Wanted: 23, Got: zb0001} + if zb0001 != 24 { + err = msgp.ArrayError{Wanted: 24, Got: zb0001} return } z.Volume, bts, err = msgp.ReadStringBytes(bts) @@ -955,6 +966,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "TransitionTier") return } + z.TransitionVersionID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TransitionVersionID") + return + } z.ExpireRestored, bts, err = msgp.ReadBoolBytes(bts) if err != nil { err = msgp.WrapError(err, "ExpireRestored") @@ -1077,7 +1093,7 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *FileInfo) Msgsize() (s int) { - s = 3 + msgp.StringPrefixSize + len(z.Volume) + msgp.StringPrefixSize + len(z.Name) + msgp.StringPrefixSize + len(z.VersionID) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.TransitionStatus) + msgp.StringPrefixSize + len(z.TransitionedObjName) + msgp.StringPrefixSize + len(z.TransitionTier) + msgp.BoolSize + msgp.StringPrefixSize + len(z.DataDir) + msgp.BoolSize + msgp.TimeSize + msgp.Int64Size + msgp.Uint32Size + msgp.MapHeaderSize + s = 3 + msgp.StringPrefixSize + len(z.Volume) + msgp.StringPrefixSize + len(z.Name) + msgp.StringPrefixSize + len(z.VersionID) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.TransitionStatus) + msgp.StringPrefixSize + len(z.TransitionedObjName) + msgp.StringPrefixSize + len(z.TransitionTier) + msgp.StringPrefixSize + len(z.TransitionVersionID) + msgp.BoolSize + msgp.StringPrefixSize + len(z.DataDir) + msgp.BoolSize + msgp.TimeSize + msgp.Int64Size + msgp.Uint32Size + msgp.MapHeaderSize if z.Metadata != nil { for za0001, za0002 := range z.Metadata { _ = za0002 diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 9ce15f03f..62f8ee7db 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - storageRESTVersion = "v35" // Inline bugfix needs all servers to be updated + storageRESTVersion = "v36" // Changes to FileInfo for tier-journal storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) diff --git a/cmd/tier-journal.go b/cmd/tier-journal.go index 87aa5af7d..97479f2c5 100644 --- a/cmd/tier-journal.go +++ b/cmd/tier-journal.go @@ -42,8 +42,9 @@ type tierJournal struct { } type jentry struct { - ObjName string `msg:"obj"` - TierName string `msg:"tier"` + ObjName string `msg:"obj"` + VersionID string `msg:"vid"` + TierName string `msg:"tier"` } const ( @@ -51,6 +52,10 @@ const ( tierJournalHdrLen = 2 // 2 bytes ) +var ( + errUnsupportedJournalVersion = errors.New("unsupported pending deletes journal version") +) + func initTierDeletionJournal(done <-chan struct{}) (*tierJournal, error) { diskPath := globalEndpoints.FirstLocalDiskPath() j := &tierJournal{ @@ -84,7 +89,7 @@ func (j *tierJournal) rotate() error { return j.Open() } -type walkFn func(objName, tierName string) error +type walkFn func(objName, rvID, tierName string) error func (j *tierJournal) ReadOnlyPath() string { return filepath.Join(j.diskPath, minioMetaBucket, "ilm", "deletion-journal.ro.bin") @@ -111,6 +116,7 @@ func (j *tierJournal) WalkEntries(fn walkFn) { } defer ro.Close() mr := msgp.NewReader(ro) + done := false for { var entry jentry @@ -123,9 +129,11 @@ func (j *tierJournal) WalkEntries(fn walkFn) { logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed to decode journal entry %s", err)) break } - err = fn(entry.ObjName, entry.TierName) + err = fn(entry.ObjName, entry.VersionID, entry.TierName) if err != nil && !isErrObjectNotFound(err) { logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed to delete transitioned object %s from %s due to %s", entry.ObjName, entry.TierName, err)) + // We add the entry into the active journal to try again + // later. j.AddEntry(entry) } } @@ -134,12 +142,12 @@ func (j *tierJournal) WalkEntries(fn walkFn) { } } -func deleteObjectFromRemoteTier(objName, tierName string) error { +func deleteObjectFromRemoteTier(objName, rvID, tierName string) error { w, err := globalTierConfigMgr.getDriver(tierName) if err != nil { return err } - err = w.Remove(context.Background(), objName) + err = w.Remove(context.Background(), objName, remoteVersionID(rvID)) if err != nil { return err } @@ -263,8 +271,15 @@ func (j *tierJournal) OpenRO() (io.ReadCloser, error) { switch binary.LittleEndian.Uint16(data[:]) { case tierJournalVersion: + return file, nil default: - return nil, errors.New("unsupported pending deletes journal version") + return nil, errUnsupportedJournalVersion } - return file, nil +} + +// jentryV1 represents the entry in the journal before RemoteVersionID was +// added. It remains here for use in tests for the struct element addition. +type jentryV1 struct { + ObjName string `msg:"obj"` + TierName string `msg:"tier"` } diff --git a/cmd/tier-journal_gen.go b/cmd/tier-journal_gen.go index 17fba8488..f62c3af13 100644 --- a/cmd/tier-journal_gen.go +++ b/cmd/tier-journal_gen.go @@ -8,6 +8,159 @@ import ( // DecodeMsg implements msgp.Decodable func (z *jentry) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "obj": + z.ObjName, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ObjName") + return + } + case "vid": + z.VersionID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + case "tier": + z.TierName, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TierName") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z jentry) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 3 + // write "obj" + err = en.Append(0x83, 0xa3, 0x6f, 0x62, 0x6a) + if err != nil { + return + } + err = en.WriteString(z.ObjName) + if err != nil { + err = msgp.WrapError(err, "ObjName") + return + } + // write "vid" + err = en.Append(0xa3, 0x76, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteString(z.VersionID) + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + // write "tier" + err = en.Append(0xa4, 0x74, 0x69, 0x65, 0x72) + if err != nil { + return + } + err = en.WriteString(z.TierName) + if err != nil { + err = msgp.WrapError(err, "TierName") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z jentry) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 3 + // string "obj" + o = append(o, 0x83, 0xa3, 0x6f, 0x62, 0x6a) + o = msgp.AppendString(o, z.ObjName) + // string "vid" + o = append(o, 0xa3, 0x76, 0x69, 0x64) + o = msgp.AppendString(o, z.VersionID) + // string "tier" + o = append(o, 0xa4, 0x74, 0x69, 0x65, 0x72) + o = msgp.AppendString(o, z.TierName) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "obj": + z.ObjName, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ObjName") + return + } + case "vid": + z.VersionID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + case "tier": + z.TierName, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TierName") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z jentry) Msgsize() (s int) { + s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 4 + msgp.StringPrefixSize + len(z.VersionID) + 5 + msgp.StringPrefixSize + len(z.TierName) + return +} + +// DecodeMsg implements msgp.Decodable +func (z *jentryV1) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte _ = field var zb0001 uint32 @@ -48,7 +201,7 @@ func (z *jentry) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z jentry) EncodeMsg(en *msgp.Writer) (err error) { +func (z jentryV1) EncodeMsg(en *msgp.Writer) (err error) { // map header, size 2 // write "obj" err = en.Append(0x82, 0xa3, 0x6f, 0x62, 0x6a) @@ -74,7 +227,7 @@ func (z jentry) EncodeMsg(en *msgp.Writer) (err error) { } // MarshalMsg implements msgp.Marshaler -func (z jentry) MarshalMsg(b []byte) (o []byte, err error) { +func (z jentryV1) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) // map header, size 2 // string "obj" @@ -87,7 +240,7 @@ func (z jentry) MarshalMsg(b []byte) (o []byte, err error) { } // UnmarshalMsg implements msgp.Unmarshaler -func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) { +func (z *jentryV1) UnmarshalMsg(bts []byte) (o []byte, err error) { var field []byte _ = field var zb0001 uint32 @@ -129,7 +282,7 @@ func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z jentry) Msgsize() (s int) { +func (z jentryV1) Msgsize() (s int) { s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 5 + msgp.StringPrefixSize + len(z.TierName) return } diff --git a/cmd/tier-journal_gen_test.go b/cmd/tier-journal_gen_test.go index d126ae391..5cff069a5 100644 --- a/cmd/tier-journal_gen_test.go +++ b/cmd/tier-journal_gen_test.go @@ -121,3 +121,116 @@ func BenchmarkDecodejentry(b *testing.B) { } } } + +func TestMarshalUnmarshaljentryV1(t *testing.T) { + v := jentryV1{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgjentryV1(b *testing.B) { + v := jentryV1{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgjentryV1(b *testing.B) { + v := jentryV1{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshaljentryV1(b *testing.B) { + v := jentryV1{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodejentryV1(t *testing.T) { + v := jentryV1{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodejentryV1 Msgsize() is inaccurate") + } + + vn := jentryV1{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodejentryV1(b *testing.B) { + v := jentryV1{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodejentryV1(b *testing.B) { + v := jentryV1{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/tier-journal_test.go b/cmd/tier-journal_test.go new file mode 100644 index 000000000..5c6dd0c75 --- /dev/null +++ b/cmd/tier-journal_test.go @@ -0,0 +1,121 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +// TestJEntryReadOldToNew1 - tests that adding the RemoteVersionID parameter to the +// jentry struct does not cause unexpected errors when reading the serialized +// old version into new version. +func TestJEntryReadOldToNew1(t *testing.T) { + readOldToNewCases := []struct { + je jentryV1 + exp jentry + }{ + {jentryV1{"obj1", "tier1"}, jentry{"obj1", "", "tier1"}}, + {jentryV1{"obj1", ""}, jentry{"obj1", "", ""}}, + {jentryV1{"", "tier1"}, jentry{"", "", "tier1"}}, + {jentryV1{"", ""}, jentry{"", "", ""}}, + } + + var b bytes.Buffer + for _, item := range readOldToNewCases { + bs, err := item.je.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + b.Write(bs) + } + + mr := msgp.NewReader(&b) + for i, item := range readOldToNewCases { + var je jentry + err := je.DecodeMsg(mr) + if err != nil { + t.Fatal(err) + } + if je != item.exp { + t.Errorf("Case %d: Expected: %v Got: %v", i, item.exp, je) + } + } +} + +// TestJEntryWriteNewToOldMix1 - tests that adding the RemoteVersionID parameter +// to the jentry struct does not cause unexpected errors when writing. This +// simulates the case when the active journal has entries in the older version +// struct and due to errors new entries are added in the new version of the +// struct. +func TestJEntryWriteNewToOldMix1(t *testing.T) { + oldStructVals := []jentryV1{ + {"obj1", "tier1"}, + {"obj2", "tier2"}, + {"obj3", "tier3"}, + } + newStructVals := []jentry{ + {"obj4", "", "tier1"}, + {"obj5", "ver2", "tier2"}, + {"obj6", "", "tier3"}, + } + + // Write old struct version values followed by new version values. + var b bytes.Buffer + for _, item := range oldStructVals { + bs, err := item.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + b.Write(bs) + } + for _, item := range newStructVals { + bs, err := item.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + b.Write(bs) + } + + // Read into new struct version and check. + mr := msgp.NewReader(&b) + for i := 0; i < len(oldStructVals)+len(newStructVals); i++ { + var je jentry + err := je.DecodeMsg(mr) + if err != nil { + t.Fatal(err) + } + var expectedJe jentry + if i < len(oldStructVals) { + // For old struct values, the RemoteVersionID will be + // empty + expectedJe = jentry{ + ObjName: oldStructVals[i].ObjName, + VersionID: "", + TierName: oldStructVals[i].TierName, + } + } else { + expectedJe = newStructVals[i-len(oldStructVals)] + } + if expectedJe != je { + t.Errorf("Case %d: Expected: %v, Got: %v", i, expectedJe, je) + } + } +} diff --git a/cmd/tier-sweeper.go b/cmd/tier-sweeper.go index 9b5944a94..24cbcedda 100644 --- a/cmd/tier-sweeper.go +++ b/cmd/tier-sweeper.go @@ -41,14 +41,15 @@ import ( // logger.LogIf(ctx, err) // } type objSweeper struct { - Object string - Bucket string - ReqVersion string // version ID set by application, applies only to DeleteObject and DeleteObjects APIs - Versioned bool - Suspended bool - TransitionStatus string - TransitionTier string - RemoteObject string + Object string + Bucket string + ReqVersion string // version ID set by application, applies only to DeleteObject and DeleteObjects APIs + Versioned bool + Suspended bool + TransitionStatus string + TransitionTier string + TransitionVersionID string + RemoteObject string } // newObjSweeper returns an objSweeper for a given bucket and object. @@ -116,6 +117,7 @@ func (os *objSweeper) SetTransitionState(info ObjectInfo) { os.TransitionTier = info.TransitionTier os.TransitionStatus = info.TransitionStatus os.RemoteObject = info.transitionedObjName + os.TransitionVersionID = info.transitionVersionID } // shouldRemoveRemoteObject determines if a transitioned object should be @@ -142,7 +144,11 @@ func (os *objSweeper) shouldRemoveRemoteObject() (jentry, bool) { delTier = true } if delTier { - return jentry{ObjName: os.RemoteObject, TierName: os.TransitionTier}, true + return jentry{ + ObjName: os.RemoteObject, + VersionID: os.TransitionVersionID, + TierName: os.TransitionTier, + }, true } return jentry{}, false } diff --git a/cmd/warm-backend-azure.go b/cmd/warm-backend-azure.go index 9b4e68979..01ec4033f 100644 --- a/cmd/warm-backend-azure.go +++ b/cmd/warm-backend-azure.go @@ -53,19 +53,23 @@ func (az *warmBackendAzure) tier() azblob.AccessTierType { } return azblob.AccessTierType("") } -func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader, length int64) error { + +// FIXME: add support for remote version ID in Azure remote tier and remove +// this. Currently it's a no-op. + +func (az *warmBackendAzure) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) { blobURL := az.serviceURL.NewContainerURL(az.Bucket).NewBlockBlobURL(az.getDest(object)) // set tier if specified - if az.StorageClass != "" { if _, err := blobURL.SetTier(ctx, az.tier(), azblob.LeaseAccessConditions{}); err != nil { - return azureToObjectError(err, az.Bucket, object) + return "", azureToObjectError(err, az.Bucket, object) } } - _, err := azblob.UploadStreamToBlockBlob(ctx, r, blobURL, azblob.UploadStreamToBlockBlobOptions{}) - return azureToObjectError(err, az.Bucket, object) + res, err := azblob.UploadStreamToBlockBlob(ctx, r, blobURL, azblob.UploadStreamToBlockBlobOptions{}) + return remoteVersionID(res.Version()), azureToObjectError(err, az.Bucket, object) } -func (az *warmBackendAzure) Get(ctx context.Context, object string, opts WarmBackendGetOpts) (r io.ReadCloser, err error) { +func (az *warmBackendAzure) Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (r io.ReadCloser, err error) { if opts.startOffset < 0 { return nil, InvalidRange{} } @@ -79,7 +83,7 @@ func (az *warmBackendAzure) Get(ctx context.Context, object string, opts WarmBac return rc, nil } -func (az *warmBackendAzure) Remove(ctx context.Context, object string) error { +func (az *warmBackendAzure) Remove(ctx context.Context, object string, rv remoteVersionID) error { blob := az.serviceURL.NewContainerURL(az.Bucket).NewBlobURL(az.getDest(object)) _, err := blob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) return azureToObjectError(err, az.Bucket, object) diff --git a/cmd/warm-backend-gcs.go b/cmd/warm-backend-gcs.go index a24f07997..28f2edad5 100644 --- a/cmd/warm-backend-gcs.go +++ b/cmd/warm-backend-gcs.go @@ -43,7 +43,11 @@ func (gcs *warmBackendGCS) getDest(object string) string { } return destObj } -func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, length int64) error { + +// FIXME: add support for remote version ID in GCS remote tier and remove this. +// Currently it's a no-op. + +func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, length int64) (remoteVersionID, error) { object := gcs.client.Bucket(gcs.Bucket).Object(gcs.getDest(key)) //TODO: set storage class w := object.NewWriter(ctx) @@ -51,13 +55,13 @@ func (gcs *warmBackendGCS) Put(ctx context.Context, key string, data io.Reader, w.ObjectAttrs.StorageClass = gcs.StorageClass } if _, err := io.Copy(w, data); err != nil { - return gcsToObjectError(err, gcs.Bucket, key) + return "", gcsToObjectError(err, gcs.Bucket, key) } - return w.Close() + return "", w.Close() } -func (gcs *warmBackendGCS) Get(ctx context.Context, key string, opts WarmBackendGetOpts) (r io.ReadCloser, err error) { +func (gcs *warmBackendGCS) Get(ctx context.Context, key string, rv remoteVersionID, opts WarmBackendGetOpts) (r io.ReadCloser, err error) { // GCS storage decompresses a gzipped object by default and returns the data. // Refer to https://cloud.google.com/storage/docs/transcoding#decompressive_transcoding // Need to set `Accept-Encoding` header to `gzip` when issuing a GetObject call, to be able @@ -73,7 +77,7 @@ func (gcs *warmBackendGCS) Get(ctx context.Context, key string, opts WarmBackend return r, nil } -func (gcs *warmBackendGCS) Remove(ctx context.Context, key string) error { +func (gcs *warmBackendGCS) Remove(ctx context.Context, key string, rv remoteVersionID) error { err := gcs.client.Bucket(gcs.Bucket).Object(gcs.getDest(key)).Delete(ctx) return gcsToObjectError(err, gcs.Bucket, key) } diff --git a/cmd/warm-backend-s3.go b/cmd/warm-backend-s3.go index de11dc469..a5c9ca830 100644 --- a/cmd/warm-backend-s3.go +++ b/cmd/warm-backend-s3.go @@ -56,14 +56,17 @@ func (s3 *warmBackendS3) getDest(object string) string { return destObj } -func (s3 *warmBackendS3) Put(ctx context.Context, object string, r io.Reader, length int64) error { - _, err := s3.client.PutObject(ctx, s3.Bucket, s3.getDest(object), r, length, minio.PutObjectOptions{StorageClass: s3.StorageClass}) - return s3.ToObjectError(err, object) +func (s3 *warmBackendS3) Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) { + res, err := s3.client.PutObject(ctx, s3.Bucket, s3.getDest(object), r, length, minio.PutObjectOptions{StorageClass: s3.StorageClass}) + return remoteVersionID(res.VersionID), s3.ToObjectError(err, object) } -func (s3 *warmBackendS3) Get(ctx context.Context, object string, opts WarmBackendGetOpts) (io.ReadCloser, error) { +func (s3 *warmBackendS3) Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (io.ReadCloser, error) { gopts := minio.GetObjectOptions{} + if rv != "" { + gopts.VersionID = string(rv) + } if opts.startOffset >= 0 && opts.length > 0 { if err := gopts.SetRange(opts.startOffset, opts.startOffset+opts.length-1); err != nil { return nil, s3.ToObjectError(err, object) @@ -78,8 +81,12 @@ func (s3 *warmBackendS3) Get(ctx context.Context, object string, opts WarmBacken return r, nil } -func (s3 *warmBackendS3) Remove(ctx context.Context, object string) error { - err := s3.client.RemoveObject(ctx, s3.Bucket, s3.getDest(object), minio.RemoveObjectOptions{}) +func (s3 *warmBackendS3) Remove(ctx context.Context, object string, rv remoteVersionID) error { + ropts := minio.RemoveObjectOptions{} + if rv != "" { + ropts.VersionID = string(rv) + } + err := s3.client.RemoveObject(ctx, s3.Bucket, s3.getDest(object), ropts) return s3.ToObjectError(err, object) } diff --git a/cmd/warm-backend.go b/cmd/warm-backend.go index c1ccdfecc..441bf27af 100644 --- a/cmd/warm-backend.go +++ b/cmd/warm-backend.go @@ -36,9 +36,9 @@ type WarmBackendGetOpts struct { // WarmBackend provides interface to be implemented by remote tier backends type WarmBackend interface { - Put(ctx context.Context, object string, r io.Reader, length int64) error - Get(ctx context.Context, object string, opts WarmBackendGetOpts) (io.ReadCloser, error) - Remove(ctx context.Context, object string) error + Put(ctx context.Context, object string, r io.Reader, length int64) (remoteVersionID, error) + Get(ctx context.Context, object string, rv remoteVersionID, opts WarmBackendGetOpts) (io.ReadCloser, error) + Remove(ctx context.Context, object string, rv remoteVersionID) error InUse(ctx context.Context) (bool, error) } @@ -48,7 +48,7 @@ const probeObject = "probeobject" // to perform all operations defined in the WarmBackend interface. func checkWarmBackend(ctx context.Context, w WarmBackend) error { var empty bytes.Reader - err := w.Put(ctx, probeObject, &empty, 0) + rv, err := w.Put(ctx, probeObject, &empty, 0) if err != nil { return tierPermErr{ Op: tierPut, @@ -56,7 +56,7 @@ func checkWarmBackend(ctx context.Context, w WarmBackend) error { } } - _, err = w.Get(ctx, probeObject, WarmBackendGetOpts{}) + _, err = w.Get(ctx, probeObject, rv, WarmBackendGetOpts{}) if err != nil { switch { case isErrBucketNotFound(err): @@ -71,7 +71,7 @@ func checkWarmBackend(ctx context.Context, w WarmBackend) error { } } - if err = w.Remove(ctx, probeObject); err != nil { + if err = w.Remove(ctx, probeObject, rv); err != nil { return tierPermErr{ Op: tierDelete, Err: err, @@ -115,6 +115,10 @@ func errIsTierPermError(err error) bool { return errors.As(err, &tpErr) } +// remoteVersionID represents the version id of an object in the remote tier. +// Its usage is remote tier cloud implementation specific. +type remoteVersionID string + // newWarmBackend instantiates the tier type specific WarmBackend, runs // checkWarmBackend on it. func newWarmBackend(ctx context.Context, tier madmin.TierConfig) (d WarmBackend, err error) { diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 9e7866c92..501d7d321 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -875,6 +875,9 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error { if fi.TransitionedObjName != "" { ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName) } + if fi.TransitionVersionID != "" { + ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID) + } if fi.TransitionTier != "" { ventry.ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier) } @@ -1020,6 +1023,9 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) { if o, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName]; ok { fi.TransitionedObjName = string(o) } + if rv, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID]; ok { + fi.TransitionVersionID = string(rv) + } if sc, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionTier]; ok { fi.TransitionTier = string(sc) } @@ -1188,6 +1194,7 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) { case fi.TransitionStatus == lifecycle.TransitionComplete: z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionStatus] = []byte(fi.TransitionStatus) z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName] = []byte(fi.TransitionedObjName) + z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID] = []byte(fi.TransitionVersionID) z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier) default: z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)