Implement batch-expiration for objects (#17946)

Based on an initial PR from -
https://github.com/minio/minio/pull/17792

But fully completes it with newer finalized YAML spec.
This commit is contained in:
Krishnan Parthasarathi
2023-12-02 02:51:33 -08:00
committed by GitHub
parent 69294cf98a
commit a50f26b7f5
22 changed files with 3037 additions and 112 deletions

View File

@@ -42,6 +42,7 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/config/batch"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
@@ -54,6 +55,8 @@ import (
"gopkg.in/yaml.v2"
)
var globalBatchConfig batch.Config
// BatchJobRequest this is an internal data structure not for external consumption.
type BatchJobRequest struct {
ID string `yaml:"-" json:"name"`
@@ -62,6 +65,7 @@ type BatchJobRequest struct {
Location string `yaml:"-" json:"location"`
Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"`
KeyRotate *BatchJobKeyRotateV1 `yaml:"keyrotate" json:"keyrotate"`
Expire *BatchJobExpire `yaml:"expire" json:"expire"`
ctx context.Context `msg:"-"`
}
@@ -431,23 +435,27 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
wk.Take()
go func() {
defer wk.Give()
stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, oi)
stopFn := globalBatchJobsMetrics.trace(batchJobMetricReplication, job.ID, attempts)
success := true
if err := r.ReplicateFromSource(ctx, api, core, oi, retry); err != nil {
// object must be deleted concurrently, allow these failures but do not count them
if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
return
}
stopFn(err)
stopFn(oi, err)
logger.LogIf(ctx, err)
success = false
} else {
stopFn(nil)
stopFn(oi, nil)
}
ri.trackCurrentBucketObject(r.Target.Bucket, oi, success)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
if wait := globalBatchConfig.ReplicationWait(); wait > 0 {
time.Sleep(wait)
}
}()
}
wk.Wait()
@@ -725,7 +733,12 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR
fileName = batchKeyRotationName
version = batchKeyRotateVersionV1
format = batchKeyRotationFormat
case job.Expire != nil:
fileName = batchExpireName
version = batchExpireVersionV1
format = batchExpireFormat
default:
return errors.New("no supported batch job request specified")
}
data, err := readConfig(ctx, api, pathJoin(job.Location, fileName))
if err != nil {
@@ -742,6 +755,11 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR
if job.KeyRotate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts
}
case job.Expire != nil:
ri.RetryAttempts = batchExpireJobDefaultRetries
if job.Expire.Retry.Attempts > 0 {
ri.RetryAttempts = job.Expire.Retry.Attempts
}
}
return nil
}
@@ -851,6 +869,12 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
jobTyp = string(job.Type())
fileName = batchKeyRotationName
ri.Version = batchKeyRotateVersionV1
case madmin.BatchJobExpire:
format = batchExpireFormat
version = batchExpireVersion
jobTyp = string(job.Type())
fileName = batchExpireName
ri.Version = batchExpireVersionV1
default:
return errInvalidArgument
}
@@ -876,7 +900,18 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
return nil
}
func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, failed bool) {
// Note: to be used only with batch jobs that affect multiple versions through
// a single action. e.g batch-expire has an option to expire all versions of an
// object which matches the given filters.
func (ri *batchJobInfo) trackMultipleObjectVersions(bucket string, info ObjectInfo, success bool) {
if success {
ri.Objects += int64(info.NumVersions)
} else {
ri.ObjectsFailed += int64(info.NumVersions)
}
}
func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool) {
if ri == nil {
return
}
@@ -886,7 +921,7 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo,
ri.Bucket = bucket
ri.Object = info.Name
ri.countItem(info.Size, info.DeleteMarker, failed)
ri.countItem(info.Size, info.DeleteMarker, success)
}
// Start start the batch replication job, resumes if there was a pending job via "job.ID"
@@ -1115,7 +1150,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
go func() {
defer wk.Give()
stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, result)
stopFn := globalBatchJobsMetrics.trace(batchJobMetricReplication, job.ID, attempts)
success := true
if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil {
if miniogo.ToErrorResponse(err).Code == "PreconditionFailed" {
@@ -1126,16 +1161,20 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
if isErrVersionNotFound(err) || isErrObjectNotFound(err) {
return
}
stopFn(err)
stopFn(result, err)
logger.LogIf(ctx, err)
success = false
} else {
stopFn(nil)
stopFn(result, nil)
}
ri.trackCurrentBucketObject(r.Source.Bucket, result, success)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
if wait := globalBatchConfig.ReplicationWait(); wait > 0 {
time.Sleep(wait)
}
}()
}
wk.Wait()
@@ -1340,6 +1379,8 @@ func (j BatchJobRequest) Type() madmin.BatchJobType {
return madmin.BatchJobReplicate
case j.KeyRotate != nil:
return madmin.BatchJobKeyRotate
case j.Expire != nil:
return madmin.BatchJobExpire
}
return madmin.BatchJobType("unknown")
}
@@ -1352,6 +1393,8 @@ func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error {
return j.Replicate.Validate(ctx, j, o)
case j.KeyRotate != nil:
return j.KeyRotate.Validate(ctx, j, o)
case j.Expire != nil:
return j.Expire.Validate(ctx, j, o)
}
return errInvalidArgument
}
@@ -1362,12 +1405,14 @@ func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) {
deleteConfig(ctx, api, pathJoin(j.Location, batchReplName))
case j.KeyRotate != nil:
deleteConfig(ctx, api, pathJoin(j.Location, batchKeyRotationName))
case j.Expire != nil:
deleteConfig(ctx, api, pathJoin(j.Location, batchExpireName))
}
deleteConfig(ctx, api, j.Location)
}
func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error {
if j.Replicate == nil && j.KeyRotate == nil {
if j.Replicate == nil && j.KeyRotate == nil && j.Expire == nil {
return errInvalidArgument
}
@@ -1692,7 +1737,8 @@ func (j *BatchJobPool) AddWorker() {
if !ok {
return
}
if job.Replicate != nil {
switch {
case job.Replicate != nil:
if job.Replicate.RemoteToLocal() {
if err := job.Replicate.StartFromSource(job.ctx, j.objLayer, *job); err != nil {
if !isErrBucketNotFound(err) {
@@ -1712,14 +1758,20 @@ func (j *BatchJobPool) AddWorker() {
// Bucket not found proceed to delete such a job.
}
}
}
if job.KeyRotate != nil {
case job.KeyRotate != nil:
if err := job.KeyRotate.Start(job.ctx, j.objLayer, *job); err != nil {
if !isErrBucketNotFound(err) {
logger.LogIf(j.ctx, err)
continue
}
}
case job.Expire != nil:
if err := job.Expire.Start(job.ctx, j.objLayer, *job); err != nil {
if !isErrBucketNotFound(err) {
logger.LogIf(j.ctx, err)
continue
}
}
}
job.delete(j.ctx, j.objLayer)
j.canceler(job.ID, false)
@@ -1797,24 +1849,26 @@ type batchJobMetrics struct {
type batchJobMetric uint8
const (
batchReplicationMetricObject batchJobMetric = iota
batchKeyRotationMetricObject
batchJobMetricReplication batchJobMetric = iota
batchJobMetricKeyRotation
batchJobMetricExpire
)
func batchJobTrace(d batchJobMetric, job string, startTime time.Time, duration time.Duration, info ObjectInfo, attempts int, err error) madmin.TraceInfo {
func batchJobTrace(d batchJobMetric, job string, startTime time.Time, duration time.Duration, info objTraceInfoer, attempts int, err error) madmin.TraceInfo {
var errStr string
if err != nil {
errStr = err.Error()
}
jobKind := "batchReplication"
traceType := madmin.TraceBatchReplication
if d == batchKeyRotationMetricObject {
jobKind = "batchKeyRotation"
switch d {
case batchJobMetricKeyRotation:
traceType = madmin.TraceBatchKeyRotation
case batchJobMetricExpire:
traceType = madmin.TraceBatchExpire
}
funcName := fmt.Sprintf("%s.%s (job-name=%s)", jobKind, d.String(), job)
funcName := fmt.Sprintf("%s() (job-name=%s)", d.String(), job)
if attempts > 0 {
funcName = fmt.Sprintf("%s.%s (job-name=%s,attempts=%s)", jobKind, d.String(), job, humanize.Ordinal(attempts))
funcName = fmt.Sprintf("%s() (job-name=%s,attempts=%s)", d.String(), job, humanize.Ordinal(attempts))
}
return madmin.TraceInfo{
TraceType: traceType,
@@ -1822,55 +1876,65 @@ func batchJobTrace(d batchJobMetric, job string, startTime time.Time, duration t
NodeName: globalLocalNodeName,
FuncName: funcName,
Duration: duration,
Path: info.Name,
Path: fmt.Sprintf("%s (versionID=%s)", info.TraceObjName(), info.TraceVersionID()),
Error: errStr,
}
}
func (ri *batchJobInfo) metric() madmin.JobMetric {
m := madmin.JobMetric{
JobID: ri.JobID,
JobType: ri.JobType,
StartTime: ri.StartTime,
LastUpdate: ri.LastUpdate,
RetryAttempts: ri.RetryAttempts,
Complete: ri.Complete,
Failed: ri.Failed,
}
switch ri.JobType {
case string(madmin.BatchJobReplicate):
m.Replicate = &madmin.ReplicateInfo{
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
ObjectsFailed: ri.ObjectsFailed,
BytesTransferred: ri.BytesTransferred,
BytesFailed: ri.BytesFailed,
}
case string(madmin.BatchJobKeyRotate):
m.KeyRotate = &madmin.KeyRotationInfo{
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
ObjectsFailed: ri.ObjectsFailed,
}
case string(madmin.BatchJobExpire):
m.Expired = &madmin.ExpirationInfo{
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
ObjectsFailed: ri.ObjectsFailed,
}
}
return m
}
func (m *batchJobMetrics) report(jobID string) (metrics *madmin.BatchJobMetrics) {
metrics = &madmin.BatchJobMetrics{CollectedAt: time.Now(), Jobs: make(map[string]madmin.JobMetric)}
m.RLock()
defer m.RUnlock()
match := true
if jobID != "" {
if job, ok := m.metrics[jobID]; ok {
metrics.Jobs[jobID] = job.metric()
}
return metrics
}
for id, job := range m.metrics {
if jobID != "" {
match = id == jobID
}
if !match {
continue
}
m := madmin.JobMetric{
JobID: job.JobID,
JobType: job.JobType,
StartTime: job.StartTime,
LastUpdate: job.LastUpdate,
RetryAttempts: job.RetryAttempts,
Complete: job.Complete,
Failed: job.Failed,
}
switch job.JobType {
case string(madmin.BatchJobReplicate):
m.Replicate = &madmin.ReplicateInfo{
Bucket: job.Bucket,
Object: job.Object,
Objects: job.Objects,
ObjectsFailed: job.ObjectsFailed,
BytesTransferred: job.BytesTransferred,
BytesFailed: job.BytesFailed,
}
case string(madmin.BatchJobKeyRotate):
m.KeyRotate = &madmin.KeyRotationInfo{
Bucket: job.Bucket,
Object: job.Object,
Objects: job.Objects,
ObjectsFailed: job.ObjectsFailed,
}
}
metrics.Jobs[id] = m
metrics.Jobs[id] = job.metric()
}
return metrics
}
@@ -1915,19 +1979,52 @@ func (m *batchJobMetrics) save(jobID string, ri *batchJobInfo) {
m.metrics[jobID] = ri.clone()
}
func (m *batchJobMetrics) trace(d batchJobMetric, job string, attempts int, info ObjectInfo) func(err error) {
type objTraceInfoer interface {
TraceObjName() string
TraceVersionID() string
}
// TraceObjName returns name of object being traced
func (td ObjectToDelete) TraceObjName() string {
return td.ObjectName
}
// TraceVersionID returns version-id of object being traced
func (td ObjectToDelete) TraceVersionID() string {
return td.VersionID
}
// TraceObjName returns name of object being traced
func (oi ObjectInfo) TraceObjName() string {
return oi.Name
}
// TraceVersionID returns version-id of object being traced
func (oi ObjectInfo) TraceVersionID() string {
return oi.VersionID
}
func (m *batchJobMetrics) trace(d batchJobMetric, job string, attempts int) func(info objTraceInfoer, err error) {
startTime := time.Now()
return func(err error) {
return func(info objTraceInfoer, err error) {
duration := time.Since(startTime)
if globalTrace.NumSubscribers(madmin.TraceBatch) > 0 {
globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err))
return
}
switch d {
case batchReplicationMetricObject:
case batchJobMetricReplication:
if globalTrace.NumSubscribers(madmin.TraceBatchReplication) > 0 {
globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err))
}
case batchKeyRotationMetricObject:
case batchJobMetricKeyRotation:
if globalTrace.NumSubscribers(madmin.TraceBatchKeyRotation) > 0 {
globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err))
}
case batchJobMetricExpire:
if globalTrace.NumSubscribers(madmin.TraceBatchExpire) > 0 {
globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err))
}
}
}
}