mirror of https://github.com/minio/minio.git
Add a 'free' version to track deletion of tiered object content (#12470)
This commit is contained in:
parent
dc6958b6a1
commit
a1df230518
|
@ -235,7 +235,7 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *Ob
|
|||
}
|
||||
|
||||
// Send audit for the lifecycle delete operation
|
||||
auditLogLifecycle(ctx, oi.Bucket, oi.Name)
|
||||
auditLogLifecycle(ctx, oi.Bucket, oi.Name, oi.VersionID, ILMExpiryActivity)
|
||||
|
||||
eventName := event.ObjectRemovedDelete
|
||||
if lcOpts.DeleteMarker {
|
||||
|
|
|
@ -963,11 +963,48 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac
|
|||
return false, size
|
||||
}
|
||||
|
||||
// applyTierObjSweep removes remote object pending deletion and the free-version
|
||||
// tracking this information.
|
||||
func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta actionMeta) {
|
||||
if !meta.oi.tierFreeVersion {
|
||||
// nothing to be done
|
||||
return
|
||||
}
|
||||
|
||||
ignoreNotFoundErr := func(err error) error {
|
||||
switch {
|
||||
case isErrVersionNotFound(err), isErrObjectNotFound(err):
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
// Remove the remote object
|
||||
err := deleteObjectFromRemoteTier(ctx, meta.oi.transitionedObjName, meta.oi.transitionVersionID, meta.oi.TransitionTier)
|
||||
if ignoreNotFoundErr(err) != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Remove this free version
|
||||
opts := ObjectOptions{}
|
||||
opts.VersionID = meta.oi.VersionID
|
||||
_, err = o.DeleteObject(ctx, meta.oi.Bucket, meta.oi.Name, opts)
|
||||
if err == nil {
|
||||
auditLogLifecycle(ctx, meta.oi.Bucket, meta.oi.Name, meta.oi.VersionID, ILMFreeVersionDeleteActivity)
|
||||
}
|
||||
if ignoreNotFoundErr(err) != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// applyActions will apply lifecycle checks on to a scanned item.
|
||||
// The resulting size on disk will always be returned.
|
||||
// The metadata will be compared to consensus on the object layer before any changes are applied.
|
||||
// If no metadata is supplied, -1 is returned if no action is taken.
|
||||
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta, sizeS *sizeSummary) int64 {
|
||||
i.applyTierObjSweep(ctx, o, meta)
|
||||
|
||||
applied, size := i.applyLifecycle(ctx, o, meta)
|
||||
// For instance, an applied lifecycle means we remove/transitioned an object
|
||||
// from the current deployment, which means we don't have to call healing
|
||||
|
@ -1099,7 +1136,7 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay
|
|||
}
|
||||
|
||||
// Send audit for the lifecycle delete operation
|
||||
auditLogLifecycle(ctx, obj.Bucket, obj.Name)
|
||||
auditLogLifecycle(ctx, obj.Bucket, obj.Name, obj.VersionID, ILMExpiryActivity)
|
||||
|
||||
eventName := event.ObjectRemovedDelete
|
||||
if obj.DeleteMarker {
|
||||
|
@ -1341,12 +1378,24 @@ func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ILMExpiryActivity - activity trail for ILM expiry
|
||||
const ILMExpiryActivity = "ilm:expiry"
|
||||
const (
|
||||
// ILMExpiryActivity - activity trail for ILM expiry
|
||||
ILMExpiryActivity = "ilm:expiry"
|
||||
// ILMFreeVersionDeleteActivity - activity trail for ILM free-version delete
|
||||
ILMFreeVersionDeleteActivity = "ilm:free-version-delete"
|
||||
)
|
||||
|
||||
func auditLogLifecycle(ctx context.Context, bucket, object string) {
|
||||
func auditLogLifecycle(ctx context.Context, bucket, object, versionID string, trigger string) {
|
||||
var apiName string
|
||||
switch trigger {
|
||||
case ILMExpiryActivity:
|
||||
apiName = "s3:ExpireObject"
|
||||
case ILMFreeVersionDeleteActivity:
|
||||
apiName = "s3:DeleteFreeVersion"
|
||||
}
|
||||
auditLogInternal(ctx, bucket, object, AuditLogOptions{
|
||||
Trigger: ILMExpiryActivity,
|
||||
APIName: "s3:ExpireObject",
|
||||
Trigger: trigger,
|
||||
APIName: apiName,
|
||||
VersionID: versionID,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -156,6 +156,7 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
|
|||
objInfo.TransitionStatus = fi.TransitionStatus
|
||||
objInfo.transitionedObjName = fi.TransitionedObjName
|
||||
objInfo.transitionVersionID = fi.TransitionVersionID
|
||||
objInfo.tierFreeVersion = fi.TierFreeVersion()
|
||||
objInfo.TransitionTier = fi.TransitionTier
|
||||
|
||||
// etag/md5Sum has already been extracted. We need to
|
||||
|
@ -362,3 +363,38 @@ func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []
|
|||
// from latestFileInfo to get the quorum
|
||||
return dataBlocks, writeQuorum, nil
|
||||
}
|
||||
|
||||
const (
|
||||
tierFVID = "tier-free-versionID"
|
||||
tierFVMarker = "tier-free-marker"
|
||||
)
|
||||
|
||||
// SetTierFreeVersionID sets free-version's versionID. This method is used by
|
||||
// object layer to pass down a versionID to set for a free-version that may be
|
||||
// created.
|
||||
func (fi *FileInfo) SetTierFreeVersionID(versionID string) {
|
||||
if fi.Metadata == nil {
|
||||
fi.Metadata = make(map[string]string)
|
||||
}
|
||||
fi.Metadata[ReservedMetadataPrefixLower+tierFVID] = versionID
|
||||
}
|
||||
|
||||
// TierFreeVersionID returns the free-version's version id.
|
||||
func (fi *FileInfo) TierFreeVersionID() string {
|
||||
return fi.Metadata[ReservedMetadataPrefixLower+tierFVID]
|
||||
}
|
||||
|
||||
// SetTierFreeVersion sets fi as a free-version. This method is used by
|
||||
// lower layers to indicate a free-version.
|
||||
func (fi *FileInfo) SetTierFreeVersion() {
|
||||
if fi.Metadata == nil {
|
||||
fi.Metadata = make(map[string]string)
|
||||
}
|
||||
fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker] = ""
|
||||
}
|
||||
|
||||
// TierFreeVersion returns true if version is a free-version.
|
||||
func (fi *FileInfo) TierFreeVersion() bool {
|
||||
_, ok := fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker]
|
||||
return ok
|
||||
}
|
||||
|
|
|
@ -505,6 +505,10 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
|
|||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
||||
fvID := mustGetUUID()
|
||||
for index := range disks {
|
||||
metadata[index].SetTierFreeVersionID(fvID)
|
||||
}
|
||||
// Rename file on all underlying storage disks.
|
||||
for index := range disks {
|
||||
index := index
|
||||
|
@ -518,6 +522,7 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
|
|||
if fi.Erasure.Index == 0 {
|
||||
fi.Erasure.Index = index + 1
|
||||
}
|
||||
|
||||
if fi.IsValid() {
|
||||
return disks[index].RenameData(ctx, srcBucket, srcEntry, fi, dstBucket, dstEntry)
|
||||
}
|
||||
|
@ -930,6 +935,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
|
|||
DeleteMarkerReplicationStatus: objects[i].DeleteMarkerReplicationStatus,
|
||||
VersionPurgeStatus: objects[i].VersionPurgeStatus,
|
||||
}
|
||||
versions[i].SetTierFreeVersionID(mustGetUUID())
|
||||
}
|
||||
|
||||
// Initialize list of errors.
|
||||
|
@ -1102,7 +1108,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
|
|||
if opts.MTime.IsZero() {
|
||||
modTime = UTCNow()
|
||||
}
|
||||
|
||||
fvID := mustGetUUID()
|
||||
if markDelete {
|
||||
if opts.Versioned || opts.VersionSuspended {
|
||||
fi := FileInfo{
|
||||
|
@ -1115,6 +1121,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
|
|||
TransitionStatus: opts.Transition.Status,
|
||||
ExpireRestored: opts.Transition.ExpireRestored,
|
||||
}
|
||||
fi.SetTierFreeVersionID(fvID)
|
||||
if opts.Versioned {
|
||||
fi.VersionID = mustGetUUID()
|
||||
if opts.VersionID != "" {
|
||||
|
@ -1133,7 +1140,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
|
|||
}
|
||||
|
||||
// Delete the object version on all disks.
|
||||
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{
|
||||
dfi := FileInfo{
|
||||
Name: object,
|
||||
VersionID: opts.VersionID,
|
||||
MarkDeleted: markDelete,
|
||||
|
@ -1143,7 +1150,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
|
|||
VersionPurgeStatus: opts.VersionPurgeStatus,
|
||||
TransitionStatus: opts.Transition.Status,
|
||||
ExpireRestored: opts.Transition.ExpireRestored,
|
||||
}, opts.DeleteMarker); err != nil {
|
||||
}
|
||||
dfi.SetTierFreeVersionID(fvID)
|
||||
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, dfi, opts.DeleteMarker); err != nil {
|
||||
return objInfo, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
|
|
|
@ -113,6 +113,8 @@ type ObjectInfo struct {
|
|||
// to a delete marker on an object.
|
||||
DeleteMarker bool
|
||||
|
||||
// tierFreeVersion is true if this is a free-version
|
||||
tierFreeVersion bool
|
||||
// TransitionStatus indicates if transition is complete/pending
|
||||
TransitionStatus string
|
||||
// Name of transitioned object on remote tier
|
||||
|
|
|
@ -33,14 +33,19 @@ import (
|
|||
)
|
||||
|
||||
//go:generate msgp -file $GOFILE -unexported
|
||||
//msgp:ignore tierJournal walkfn
|
||||
//msgp:ignore tierJournal tierDiskJournal walkfn
|
||||
|
||||
type tierJournal struct {
|
||||
type tierDiskJournal struct {
|
||||
sync.RWMutex
|
||||
diskPath string
|
||||
file *os.File // active journal file
|
||||
}
|
||||
|
||||
type tierJournal struct {
|
||||
*tierDiskJournal // for processing legacy journal entries
|
||||
*tierMemJournal // for processing new journal entries
|
||||
}
|
||||
|
||||
type jentry struct {
|
||||
ObjName string `msg:"obj"`
|
||||
VersionID string `msg:"vid"`
|
||||
|
@ -56,12 +61,20 @@ var (
|
|||
errUnsupportedJournalVersion = errors.New("unsupported pending deletes journal version")
|
||||
)
|
||||
|
||||
func initTierDeletionJournal(ctx context.Context) (*tierJournal, error) {
|
||||
for _, diskPath := range globalEndpoints.LocalDisksPaths() {
|
||||
j := &tierJournal{
|
||||
diskPath: diskPath,
|
||||
}
|
||||
func newTierDiskJournal() *tierDiskJournal {
|
||||
return &tierDiskJournal{}
|
||||
}
|
||||
|
||||
// initTierDeletionJournal intializes an in-memory journal built using a
|
||||
// buffered channel for new journal entries. It also initializes the on-disk
|
||||
// journal only to process existing journal entries made from previous versions.
|
||||
func initTierDeletionJournal(ctx context.Context) (*tierJournal, error) {
|
||||
j := &tierJournal{
|
||||
tierMemJournal: newTierMemJoural(1000),
|
||||
tierDiskJournal: newTierDiskJournal(),
|
||||
}
|
||||
for _, diskPath := range globalEndpoints.LocalDisksPaths() {
|
||||
j.diskPath = diskPath
|
||||
if err := os.MkdirAll(filepath.Dir(j.JournalPath()), os.FileMode(0700)); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
|
@ -73,7 +86,8 @@ func initTierDeletionJournal(ctx context.Context) (*tierJournal, error) {
|
|||
continue
|
||||
}
|
||||
|
||||
go j.deletePending(ctx.Done())
|
||||
go j.deletePending(ctx) // for existing journal entries from previous MinIO versions
|
||||
go j.processEntries(ctx) // for newer journal entries circa free-versions
|
||||
return j, nil
|
||||
}
|
||||
|
||||
|
@ -83,40 +97,40 @@ func initTierDeletionJournal(ctx context.Context) (*tierJournal, error) {
|
|||
// rotate rotates the journal. If a read-only journal already exists it does
|
||||
// nothing. Otherwise renames the active journal to a read-only journal and
|
||||
// opens a new active journal.
|
||||
func (j *tierJournal) rotate() error {
|
||||
func (jd *tierDiskJournal) rotate() error {
|
||||
// Do nothing if a read-only journal file already exists.
|
||||
if _, err := os.Stat(j.ReadOnlyPath()); err == nil {
|
||||
if _, err := os.Stat(jd.ReadOnlyPath()); err == nil {
|
||||
return nil
|
||||
}
|
||||
// Close the active journal if present.
|
||||
j.Close()
|
||||
jd.Close()
|
||||
// Open a new active journal for subsequent journalling.
|
||||
return j.Open()
|
||||
return jd.Open()
|
||||
}
|
||||
|
||||
type walkFn func(objName, rvID, tierName string) error
|
||||
type walkFn func(ctx context.Context, objName, rvID, tierName string) error
|
||||
|
||||
func (j *tierJournal) ReadOnlyPath() string {
|
||||
return filepath.Join(j.diskPath, minioMetaBucket, "ilm", "deletion-journal.ro.bin")
|
||||
func (jd *tierDiskJournal) ReadOnlyPath() string {
|
||||
return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", "deletion-journal.ro.bin")
|
||||
}
|
||||
|
||||
func (j *tierJournal) JournalPath() string {
|
||||
return filepath.Join(j.diskPath, minioMetaBucket, "ilm", "deletion-journal.bin")
|
||||
func (jd *tierDiskJournal) JournalPath() string {
|
||||
return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", "deletion-journal.bin")
|
||||
}
|
||||
|
||||
func (j *tierJournal) WalkEntries(fn walkFn) {
|
||||
err := j.rotate()
|
||||
func (jd *tierDiskJournal) WalkEntries(ctx context.Context, fn walkFn) {
|
||||
err := jd.rotate()
|
||||
if err != nil {
|
||||
logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed to rotate pending deletes journal %s", err))
|
||||
logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to rotate pending deletes journal %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
ro, err := j.OpenRO()
|
||||
ro, err := jd.OpenRO()
|
||||
switch {
|
||||
case errors.Is(err, os.ErrNotExist):
|
||||
return // No read-only journal to process; nothing to do.
|
||||
case err != nil:
|
||||
logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed open read-only journal for processing %s", err))
|
||||
logger.LogIf(ctx, fmt.Errorf("tier-journal: failed open read-only journal for processing %s", err))
|
||||
return
|
||||
}
|
||||
defer ro.Close()
|
||||
|
@ -131,52 +145,52 @@ func (j *tierJournal) WalkEntries(fn walkFn) {
|
|||
break
|
||||
}
|
||||
if err != nil {
|
||||
logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed to decode journal entry %s", err))
|
||||
logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to decode journal entry %s", err))
|
||||
break
|
||||
}
|
||||
err = fn(entry.ObjName, entry.VersionID, entry.TierName)
|
||||
err = fn(ctx, entry.ObjName, entry.VersionID, entry.TierName)
|
||||
if err != nil && !isErrObjectNotFound(err) {
|
||||
logger.LogIf(context.Background(), fmt.Errorf("tier-journal: failed to delete transitioned object %s from %s due to %s", entry.ObjName, entry.TierName, err))
|
||||
logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to delete transitioned object %s from %s due to %s", entry.ObjName, entry.TierName, err))
|
||||
// We add the entry into the active journal to try again
|
||||
// later.
|
||||
j.AddEntry(entry)
|
||||
jd.addEntry(entry)
|
||||
}
|
||||
}
|
||||
if done {
|
||||
os.Remove(j.ReadOnlyPath())
|
||||
os.Remove(jd.ReadOnlyPath())
|
||||
}
|
||||
}
|
||||
|
||||
func deleteObjectFromRemoteTier(objName, rvID, tierName string) error {
|
||||
func deleteObjectFromRemoteTier(ctx context.Context, objName, rvID, tierName string) error {
|
||||
w, err := globalTierConfigMgr.getDriver(tierName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = w.Remove(context.Background(), objName, remoteVersionID(rvID))
|
||||
err = w.Remove(ctx, objName, remoteVersionID(rvID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *tierJournal) deletePending(done <-chan struct{}) {
|
||||
func (jd *tierDiskJournal) deletePending(ctx context.Context) {
|
||||
ticker := time.NewTicker(30 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
j.WalkEntries(deleteObjectFromRemoteTier)
|
||||
jd.WalkEntries(ctx, deleteObjectFromRemoteTier)
|
||||
|
||||
case <-done:
|
||||
j.Close()
|
||||
case <-ctx.Done():
|
||||
jd.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (j *tierJournal) AddEntry(je jentry) error {
|
||||
func (jd *tierDiskJournal) addEntry(je jentry) error {
|
||||
// Open journal if it hasn't been
|
||||
err := j.Open()
|
||||
err := jd.Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -186,21 +200,21 @@ func (j *tierJournal) AddEntry(je jentry) error {
|
|||
return err
|
||||
}
|
||||
|
||||
j.Lock()
|
||||
defer j.Unlock()
|
||||
_, err = j.file.Write(b)
|
||||
jd.Lock()
|
||||
defer jd.Unlock()
|
||||
_, err = jd.file.Write(b)
|
||||
if err != nil {
|
||||
j.file = nil // reset to allow subsequent reopen when file/disk is available.
|
||||
jd.file = nil // reset to allow subsequent reopen when file/disk is available.
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Close closes the active journal and renames it to read-only for pending
|
||||
// deletes processing. Note: calling Close on a closed journal is a no-op.
|
||||
func (j *tierJournal) Close() error {
|
||||
j.Lock()
|
||||
defer j.Unlock()
|
||||
if j.file == nil { // already closed
|
||||
func (jd *tierDiskJournal) Close() error {
|
||||
jd.Lock()
|
||||
defer jd.Unlock()
|
||||
if jd.file == nil { // already closed
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -210,7 +224,7 @@ func (j *tierJournal) Close() error {
|
|||
err error
|
||||
)
|
||||
// Setting j.file to nil
|
||||
f, j.file = j.file, f
|
||||
f, jd.file = jd.file, f
|
||||
if fi, err = f.Stat(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -220,8 +234,8 @@ func (j *tierJournal) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
jPath := j.JournalPath()
|
||||
jroPath := j.ReadOnlyPath()
|
||||
jPath := jd.JournalPath()
|
||||
jroPath := jd.ReadOnlyPath()
|
||||
// Rotate active journal to perform pending deletes.
|
||||
err = os.Rename(jPath, jroPath)
|
||||
if err != nil {
|
||||
|
@ -233,28 +247,28 @@ func (j *tierJournal) Close() error {
|
|||
|
||||
// Open opens a new active journal. Note: calling Open on an opened journal is a
|
||||
// no-op.
|
||||
func (j *tierJournal) Open() error {
|
||||
j.Lock()
|
||||
defer j.Unlock()
|
||||
if j.file != nil { // already open
|
||||
func (jd *tierDiskJournal) Open() error {
|
||||
jd.Lock()
|
||||
defer jd.Unlock()
|
||||
if jd.file != nil { // already open
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
j.file, err = os.OpenFile(j.JournalPath(), os.O_APPEND|os.O_CREATE|os.O_WRONLY|writeMode, 0644)
|
||||
jd.file, err = os.OpenFile(jd.JournalPath(), os.O_APPEND|os.O_CREATE|os.O_WRONLY|writeMode, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// write journal version header if active journal is empty
|
||||
fi, err := j.file.Stat()
|
||||
fi, err := jd.file.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fi.Size() == 0 {
|
||||
var data [tierJournalHdrLen]byte
|
||||
binary.LittleEndian.PutUint16(data[:], tierJournalVersion)
|
||||
_, err = j.file.Write(data[:])
|
||||
_, err = jd.file.Write(data[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -262,8 +276,8 @@ func (j *tierJournal) Open() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (j *tierJournal) OpenRO() (io.ReadCloser, error) {
|
||||
file, err := os.Open(j.ReadOnlyPath())
|
||||
func (jd *tierDiskJournal) OpenRO() (io.ReadCloser, error) {
|
||||
file, err := os.Open(jd.ReadOnlyPath())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
// Copyright (c) 2015-2021 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/minio/minio/internal/logger"
|
||||
)
|
||||
|
||||
type tierMemJournal struct {
|
||||
entries chan jentry
|
||||
}
|
||||
|
||||
func newTierMemJoural(nevents int) *tierMemJournal {
|
||||
return &tierMemJournal{
|
||||
entries: make(chan jentry, nevents),
|
||||
}
|
||||
}
|
||||
|
||||
func (j *tierMemJournal) processEntries(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case entry := <-j.entries:
|
||||
logger.LogIf(ctx, deleteObjectFromRemoteTier(ctx, entry.ObjName, entry.VersionID, entry.TierName))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (j *tierMemJournal) AddEntry(je jentry) error {
|
||||
select {
|
||||
case j.entries <- je:
|
||||
default:
|
||||
return fmt.Errorf("failed to remove tiered content at %s with version %s from tier %s, will be retried later.",
|
||||
je.ObjName, je.VersionID, je.TierName)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -39,6 +39,23 @@ func (v versionsSorter) sort() {
|
|||
}
|
||||
|
||||
func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersions, error) {
|
||||
fivs, err := getAllFileInfoVersions(xlMetaBuf, volume, path)
|
||||
if err != nil {
|
||||
return fivs, err
|
||||
}
|
||||
n := 0
|
||||
for _, fi := range fivs.Versions {
|
||||
// Filter our tier object delete marker
|
||||
if !fi.TierFreeVersion() {
|
||||
fivs.Versions[n] = fi
|
||||
n++
|
||||
}
|
||||
}
|
||||
fivs.Versions = fivs.Versions[:n]
|
||||
return fivs, nil
|
||||
}
|
||||
|
||||
func getAllFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersions, error) {
|
||||
if isXL2V1Format(xlMetaBuf) {
|
||||
var xlMeta xlMetaV2
|
||||
if err := xlMeta.Load(xlMetaBuf); err != nil {
|
||||
|
|
|
@ -102,6 +102,11 @@ func isXL2V1Format(buf []byte) bool {
|
|||
//
|
||||
// The most recently updated element in the array is considered the latest version.
|
||||
|
||||
// In addition to these we have a special kind called free-version. This is represented
|
||||
// using a delete-marker and MetaSys entries. It's used to track tiered content of a
|
||||
// deleted/overwritten version. This version is visible _only_to the scanner routine, for subsequent deletion.
|
||||
// This kind of tracking is necessary since a version's tiered content is deleted asynchronously.
|
||||
|
||||
// Backend directory tree structure:
|
||||
// disk1/
|
||||
// └── bucket
|
||||
|
@ -856,8 +861,17 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
|
|||
ventry.ObjectV2.PartActualSizes[i] = fi.Parts[i].ActualSize
|
||||
}
|
||||
|
||||
tierFVIDKey := ReservedMetadataPrefixLower + tierFVID
|
||||
tierFVMarkerKey := ReservedMetadataPrefixLower + tierFVMarker
|
||||
for k, v := range fi.Metadata {
|
||||
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
||||
// Skip tierFVID, tierFVMarker keys; it's used
|
||||
// only for creating free-version.
|
||||
switch k {
|
||||
case tierFVIDKey, tierFVMarkerKey:
|
||||
continue
|
||||
}
|
||||
|
||||
ventry.ObjectV2.MetaSys[k] = []byte(v)
|
||||
} else {
|
||||
ventry.ObjectV2.MetaUser[k] = v
|
||||
|
@ -942,6 +956,13 @@ func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error)
|
|||
fi.VersionPurgeStatus = VersionPurgeStatusType(string(v))
|
||||
}
|
||||
}
|
||||
if j.FreeVersion() {
|
||||
fi.SetTierFreeVersion()
|
||||
fi.TransitionTier = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionTier])
|
||||
fi.TransitionedObjName = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionedObjectName])
|
||||
fi.TransitionVersionID = string(j.MetaSys[ReservedMetadataPrefixLower+TransitionedVersionID])
|
||||
}
|
||||
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
|
@ -1198,6 +1219,12 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
|
|||
z.Versions[i].ObjectV2.MetaSys[ReservedMetadataPrefixLower+TransitionTier] = []byte(fi.TransitionTier)
|
||||
default:
|
||||
z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)
|
||||
// if uv has tiered content we add a
|
||||
// free-version to track it for
|
||||
// asynchronous deletion via scanner.
|
||||
if freeVersion, toFree := version.ObjectV2.InitFreeVersion(fi); toFree {
|
||||
z.Versions = append(z.Versions, freeVersion)
|
||||
}
|
||||
}
|
||||
|
||||
if fi.Deleted {
|
||||
|
@ -1311,6 +1338,17 @@ func (z xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err e
|
|||
|
||||
orderedVersions := make([]xlMetaV2Version, len(z.Versions))
|
||||
copy(orderedVersions, z.Versions)
|
||||
n := 0
|
||||
for _, version := range orderedVersions {
|
||||
// skip listing free-version unless explicitly requested via versionID
|
||||
if version.FreeVersion() && version.DeleteMarker.VersionID != uv {
|
||||
continue
|
||||
}
|
||||
orderedVersions[n] = version
|
||||
n++
|
||||
|
||||
}
|
||||
orderedVersions = orderedVersions[:n]
|
||||
|
||||
sort.Slice(orderedVersions, func(i, j int) bool {
|
||||
mtime1 := getModTimeFromVersion(orderedVersions[i])
|
||||
|
@ -1340,7 +1378,7 @@ func (z xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err e
|
|||
for i := range orderedVersions {
|
||||
switch orderedVersions[i].Type {
|
||||
case ObjectType:
|
||||
if bytes.Equal(orderedVersions[i].ObjectV2.VersionID[:], uv[:]) {
|
||||
if orderedVersions[i].ObjectV2.VersionID == uv {
|
||||
fi, err = orderedVersions[i].ObjectV2.ToFileInfo(volume, path)
|
||||
foundIndex = i
|
||||
break
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
// Copyright (c) 2015-2021 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
)
|
||||
|
||||
const freeVersion = "free-version"
|
||||
|
||||
// InitFreeVersion creates a free-version to track the tiered-content of j. If j has
|
||||
// no tiered content, it returns false.
|
||||
func (j xlMetaV2Object) InitFreeVersion(fi FileInfo) (xlMetaV2Version, bool) {
|
||||
if status, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus]; ok && bytes.Equal(status, []byte(lifecycle.TransitionComplete)) {
|
||||
vID, err := uuid.Parse(fi.TierFreeVersionID())
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("Invalid Tier Object delete marker versionId %s %v", fi.TierFreeVersionID(), err))
|
||||
}
|
||||
freeEntry := xlMetaV2Version{Type: DeleteType}
|
||||
freeEntry.DeleteMarker = &xlMetaV2DeleteMarker{
|
||||
VersionID: vID,
|
||||
ModTime: j.ModTime, // fi.ModTime may be empty
|
||||
MetaSys: make(map[string][]byte),
|
||||
}
|
||||
|
||||
freeEntry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+freeVersion] = []byte{}
|
||||
tierKey := ReservedMetadataPrefixLower + TransitionTier
|
||||
tierObjKey := ReservedMetadataPrefixLower + TransitionedObjectName
|
||||
tierObjVIDKey := ReservedMetadataPrefixLower + TransitionedVersionID
|
||||
|
||||
for k, v := range j.MetaSys {
|
||||
switch k {
|
||||
case tierKey, tierObjKey, tierObjVIDKey:
|
||||
freeEntry.DeleteMarker.MetaSys[k] = v
|
||||
}
|
||||
}
|
||||
return freeEntry, true
|
||||
}
|
||||
return xlMetaV2Version{}, false
|
||||
}
|
||||
|
||||
// FreeVersion returns true if j represents a free-version, false otherwise.
|
||||
func (j xlMetaV2DeleteMarker) FreeVersion() bool {
|
||||
_, ok := j.MetaSys[ReservedMetadataPrefixLower+freeVersion]
|
||||
return ok
|
||||
}
|
||||
|
||||
// FreeVersion returns true if j represents a free-version, false otherwise.
|
||||
func (j xlMetaV2Version) FreeVersion() bool {
|
||||
switch j.Type {
|
||||
case DeleteType:
|
||||
return j.DeleteMarker.FreeVersion()
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// AddFreeVersion adds a free-version if needed for fi.VersionID version.
|
||||
// Free-version will be added if fi.VersionID has transitioned.
|
||||
func (z *xlMetaV2) AddFreeVersion(fi FileInfo) error {
|
||||
var uv uuid.UUID
|
||||
var err error
|
||||
switch fi.VersionID {
|
||||
case "", nullVersionID:
|
||||
default:
|
||||
uv, err = uuid.Parse(fi.VersionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, version := range z.Versions {
|
||||
switch version.Type {
|
||||
case ObjectType:
|
||||
if version.ObjectV2.VersionID == uv {
|
||||
// if uv has tiered content we add a
|
||||
// free-version to track it for asynchronous
|
||||
// deletion via scanner.
|
||||
if freeVersion, toFree := version.ObjectV2.InitFreeVersion(fi); toFree {
|
||||
z.Versions = append(z.Versions, freeVersion)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,146 @@
|
|||
// Copyright (c) 2015-2021 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
)
|
||||
|
||||
func (z xlMetaV2) listFreeVersions(volume, path string) ([]FileInfo, error) {
|
||||
fivs, _, err := z.ListVersions(volume, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
n := 0
|
||||
for _, fiv := range fivs {
|
||||
if fiv.TierFreeVersion() {
|
||||
fivs[n] = fiv
|
||||
n++
|
||||
}
|
||||
}
|
||||
fivs = fivs[:n]
|
||||
return fivs, nil
|
||||
}
|
||||
|
||||
func TestFreeVersion(t *testing.T) {
|
||||
// Add a version with tiered content, one with local content
|
||||
xl := xlMetaV2{}
|
||||
fi := FileInfo{
|
||||
Volume: "volume",
|
||||
Name: "object-name",
|
||||
VersionID: "00000000-0000-0000-0000-000000000001",
|
||||
IsLatest: true,
|
||||
Deleted: false,
|
||||
TransitionStatus: "",
|
||||
DataDir: "bffea160-ca7f-465f-98bc-9b4f1c3ba1ef",
|
||||
XLV1: false,
|
||||
ModTime: time.Now(),
|
||||
Size: 0,
|
||||
Mode: 0,
|
||||
Metadata: nil,
|
||||
Parts: nil,
|
||||
Erasure: ErasureInfo{
|
||||
Algorithm: ReedSolomon.String(),
|
||||
DataBlocks: 4,
|
||||
ParityBlocks: 2,
|
||||
BlockSize: 10000,
|
||||
Index: 1,
|
||||
Distribution: []int{1, 2, 3, 4, 5, 6, 7, 8},
|
||||
Checksums: []ChecksumInfo{{
|
||||
PartNumber: 1,
|
||||
Algorithm: HighwayHash256S,
|
||||
Hash: nil,
|
||||
}},
|
||||
},
|
||||
MarkDeleted: false,
|
||||
DeleteMarkerReplicationStatus: "",
|
||||
VersionPurgeStatus: "",
|
||||
NumVersions: 1,
|
||||
SuccessorModTime: time.Time{},
|
||||
}
|
||||
// Add a version with local content
|
||||
xl.AddVersion(fi)
|
||||
|
||||
// Add null version with tiered content
|
||||
tierfi := fi
|
||||
tierfi.VersionID = ""
|
||||
xl.AddVersion(tierfi)
|
||||
tierfi.TransitionStatus = lifecycle.TransitionComplete
|
||||
tierfi.TransitionedObjName = mustGetUUID()
|
||||
tierfi.TransitionTier = "MINIOTIER-1"
|
||||
xl.DeleteVersion(tierfi)
|
||||
|
||||
fvIDs := []string{
|
||||
"00000000-0000-0000-0000-0000000000f1",
|
||||
"00000000-0000-0000-0000-0000000000f2",
|
||||
}
|
||||
// Simulate overwrite of null version
|
||||
newtierfi := tierfi
|
||||
newtierfi.SetTierFreeVersionID(fvIDs[0])
|
||||
xl.AddFreeVersion(newtierfi)
|
||||
xl.AddVersion(newtierfi)
|
||||
|
||||
// Simulate removal of null version
|
||||
newtierfi.TransitionTier = ""
|
||||
newtierfi.TransitionedObjName = ""
|
||||
newtierfi.TransitionStatus = ""
|
||||
newtierfi.SetTierFreeVersionID(fvIDs[1])
|
||||
xl.DeleteVersion(newtierfi)
|
||||
|
||||
// Check number of free-versions
|
||||
freeVersions, err := xl.listFreeVersions(newtierfi.Volume, newtierfi.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list free versions %v", err)
|
||||
}
|
||||
if len(freeVersions) != 2 {
|
||||
t.Fatalf("Expected two free versions but got %d", len(freeVersions))
|
||||
}
|
||||
|
||||
// Simulate scanner removing free-version
|
||||
freefi := newtierfi
|
||||
for _, fvID := range fvIDs {
|
||||
freefi.VersionID = fvID
|
||||
xl.DeleteVersion(freefi)
|
||||
}
|
||||
|
||||
// Check number of free-versions
|
||||
freeVersions, err = xl.listFreeVersions(newtierfi.Volume, newtierfi.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list free versions %v", err)
|
||||
}
|
||||
if len(freeVersions) != 0 {
|
||||
t.Fatalf("Expected zero free version but got %d", len(freeVersions))
|
||||
}
|
||||
|
||||
// Adding a free version to a version with no tiered content.
|
||||
newfi := fi
|
||||
newfi.SetTierFreeVersionID("00000000-0000-0000-0000-0000000000f3")
|
||||
xl.AddFreeVersion(newfi) // this shouldn't add a free-version
|
||||
|
||||
// Check number of free-versions
|
||||
freeVersions, err = xl.listFreeVersions(newtierfi.Volume, newtierfi.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list free versions %v", err)
|
||||
}
|
||||
if len(freeVersions) != 0 {
|
||||
t.Fatalf("Expected zero free version but got %d", len(freeVersions))
|
||||
}
|
||||
}
|
|
@ -468,7 +468,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
|
|||
// Remove filename which is the meta file.
|
||||
item.transformMetaDir()
|
||||
|
||||
fivs, err := getFileInfoVersions(buf, item.bucket, item.objectPath())
|
||||
fivs, err := getAllFileInfoVersions(buf, item.bucket, item.objectPath())
|
||||
if err != nil {
|
||||
if intDataUpdateTracker.debug {
|
||||
console.Debugf(color.Green("scannerBucket:")+" reading xl.meta failed: %v: %w\n", item.Path, err)
|
||||
|
@ -2058,6 +2058,11 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
|||
xlMeta.data.remove(nullVersionID, ofi.DataDir)
|
||||
}
|
||||
}
|
||||
// Empty fi.VersionID indicates that versioning is either
|
||||
// suspended or disabled on this bucket. RenameData will replace
|
||||
// the 'null' version. We add a free-version to track its tiered
|
||||
// content for asynchronous deletion.
|
||||
xlMeta.AddFreeVersion(fi)
|
||||
}
|
||||
|
||||
if err = xlMeta.AddVersion(fi); err != nil {
|
||||
|
|
1
go.mod
1
go.mod
|
@ -38,6 +38,7 @@ require (
|
|||
github.com/klauspost/readahead v1.3.1
|
||||
github.com/klauspost/reedsolomon v1.9.11
|
||||
github.com/lib/pq v1.9.0
|
||||
github.com/mattn/go-runewidth v0.0.13 // indirect
|
||||
github.com/miekg/dns v1.1.35
|
||||
github.com/minio/cli v1.22.0
|
||||
github.com/minio/console v0.7.5-0.20210628223511-b51d5505f375
|
||||
|
|
5
go.sum
5
go.sum
|
@ -974,8 +974,9 @@ github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
|
|||
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
||||
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
|
||||
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
|
||||
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
|
||||
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
|
||||
github.com/mattn/go-shellwords v1.0.10/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
|
||||
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
|
@ -1247,6 +1248,8 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn
|
|||
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
|
||||
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rjeczalik/notify v0.9.2 h1:MiTWrPj55mNDHEiIX5YUSKefw/+lCQVoAFmD6oQm5w8=
|
||||
github.com/rjeczalik/notify v0.9.2/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM=
|
||||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||
|
|
Loading…
Reference in New Issue