Add support for batch key rotation (#16844)

This commit is contained in:
Poorna
2023-04-04 10:56:54 -07:00
committed by GitHub
parent 51f7f9aaa3
commit 3158f2d12e
8 changed files with 3724 additions and 64 deletions

View File

@@ -249,6 +249,7 @@ type BatchJobRequest struct {
Started time.Time `yaml:"-" json:"started"`
Location string `yaml:"-" json:"location"`
Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"`
KeyRotate *BatchJobKeyRotateV1 `yaml:"keyrotate" json:"keyrotate"`
ctx context.Context `msg:"-"`
}
@@ -570,7 +571,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
ri.trackCurrentBucketObject(r.Target.Bucket, oi, success)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job.Location))
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
}()
}
wk.Wait()
@@ -581,7 +582,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job.Location))
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
buf, _ := json.Marshal(ri)
if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
@@ -765,14 +766,34 @@ const (
)
func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobRequest) error {
data, err := readConfig(ctx, api, pathJoin(job.Location, batchReplName))
var fileName string
var format, version uint16
switch {
case job.Replicate != nil:
fileName = batchReplName
version = batchReplVersionV1
format = batchReplFormat
case job.KeyRotate != nil:
fileName = batchKeyRotationName
version = batchKeyRotateVersionV1
format = batchKeyRotationFormat
}
data, err := readConfig(ctx, api, pathJoin(job.Location, fileName))
if err != nil {
if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
ri.Version = batchReplVersionV1
if job.Replicate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts
} else {
ri.Version = int(version)
switch {
case job.Replicate != nil:
ri.RetryAttempts = batchReplJobDefaultRetries
if job.Replicate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.Replicate.Flags.Retry.Attempts
}
case job.KeyRotate != nil:
ri.RetryAttempts = batchKeyRotateJobDefaultRetries
if job.KeyRotate.Flags.Retry.Attempts > 0 {
ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts
}
}
return nil
}
@@ -783,18 +804,18 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR
return nil
}
if len(data) <= 4 {
return fmt.Errorf("batchRepl: no data")
return fmt.Errorf("%s: no data", ri.JobType)
}
// Read header
switch binary.LittleEndian.Uint16(data[0:2]) {
case batchReplFormat:
case format:
default:
return fmt.Errorf("batchRepl: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
return fmt.Errorf("%s: unknown format: %d", ri.JobType, binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case batchReplVersion:
case version:
default:
return fmt.Errorf("batchRepl: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
return fmt.Errorf("%s: unknown version: %d", ri.JobType, binary.LittleEndian.Uint16(data[2:4]))
}
ri.mu.Lock()
@@ -808,7 +829,7 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR
switch ri.Version {
case batchReplVersionV1:
default:
return fmt.Errorf("unexpected batch repl meta version: %d", ri.Version)
return fmt.Errorf("unexpected batch %s meta version: %d", ri.JobType, ri.Version)
}
return nil
@@ -857,31 +878,51 @@ func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) {
}
}
func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, duration time.Duration, jobLocation string) error {
func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, duration time.Duration, job BatchJobRequest) error {
if ri == nil {
return errInvalidArgument
}
now := UTCNow()
ri.mu.Lock()
var (
format, version uint16
jobTyp, fileName string
)
if now.Sub(ri.LastUpdate) >= duration {
switch job.Type() {
case madmin.BatchJobReplicate:
format = batchReplFormat
version = batchReplVersion
jobTyp = string(job.Type())
fileName = batchReplName
ri.Version = batchReplVersionV1
case madmin.BatchJobKeyRotate:
format = batchKeyRotationFormat
version = batchKeyRotateVersion
jobTyp = string(job.Type())
fileName = batchKeyRotationName
ri.Version = batchKeyRotateVersionV1
default:
return errInvalidArgument
}
if serverDebugLog {
console.Debugf("batchReplicate: persisting batchReplication info on drive: threshold:%s, batchRepl:%#v\n", now.Sub(ri.LastUpdate), ri)
console.Debugf("%s: persisting info on drive: threshold:%s, %s:%#v\n", jobTyp, now.Sub(ri.LastUpdate), jobTyp, ri)
}
ri.LastUpdate = now
ri.Version = batchReplVersionV1
data := make([]byte, 4, ri.Msgsize()+4)
// Initialize the header.
binary.LittleEndian.PutUint16(data[0:2], batchReplFormat)
binary.LittleEndian.PutUint16(data[2:4], batchReplVersion)
binary.LittleEndian.PutUint16(data[0:2], format)
binary.LittleEndian.PutUint16(data[2:4], version)
buf, err := ri.MarshalMsg(data)
ri.mu.Unlock()
if err != nil {
return err
}
return saveConfig(ctx, api, pathJoin(jobLocation, batchReplName), buf)
return saveConfig(ctx, api, pathJoin(job.Location, fileName), buf)
}
ri.mu.Unlock()
return nil
@@ -1055,7 +1096,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
ri.trackCurrentBucketObject(r.Source.Bucket, result, success)
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job.Location))
logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
}()
}
wk.Wait()
@@ -1066,7 +1107,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk.
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job.Location))
logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job))
buf, _ := json.Marshal(ri)
if err := r.Notify(ctx, bytes.NewReader(buf)); err != nil {
@@ -1240,8 +1281,11 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest,
// Type returns the type of batch job: replicate or key-rotate, or "unknown" when neither is set.
func (j BatchJobRequest) Type() madmin.BatchJobType {
if j.Replicate != nil {
switch {
case j.Replicate != nil:
return madmin.BatchJobReplicate
case j.KeyRotate != nil:
return madmin.BatchJobKeyRotate
}
return madmin.BatchJobType("unknown")
}
@@ -1249,20 +1293,28 @@ func (j BatchJobRequest) Type() madmin.BatchJobType {
// Validate validates the current job, used by 'save()' before
// persisting the job request
func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error {
if j.Replicate != nil {
switch {
case j.Replicate != nil:
return j.Replicate.Validate(ctx, j, o)
case j.KeyRotate != nil:
return j.KeyRotate.Validate(ctx, j, o)
}
return errInvalidArgument
}
func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) {
deleteConfig(ctx, api, pathJoin(j.Location, batchReplName))
switch {
case j.Replicate != nil:
deleteConfig(ctx, api, pathJoin(j.Location, batchReplName))
case j.KeyRotate != nil:
deleteConfig(ctx, api, pathJoin(j.Location, batchKeyRotationName))
}
globalBatchJobsMetrics.delete(j.ID)
deleteConfig(ctx, api, j.Location)
}
func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error {
if j.Replicate == nil {
if j.Replicate == nil && j.KeyRotate == nil {
return errInvalidArgument
}
@@ -1524,6 +1576,10 @@ func (j *BatchJobPool) resume() {
return
}
for result := range results {
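// skip entries whose name ends with a slash separator (presumably the prefixes
// holding the batch-replicate.bin and batch-rotate.bin state files - TODO confirm)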
if strings.HasSuffix(result.Name, slashSeparator) {
continue
}
req := &BatchJobRequest{}
if err := req.load(ctx, j.objLayer, result.Name); err != nil {
logger.LogIf(ctx, err)
@@ -1570,6 +1626,14 @@ func (j *BatchJobPool) AddWorker() {
}
}
}
if job.KeyRotate != nil {
if err := job.KeyRotate.Start(job.ctx, j.objLayer, *job); err != nil {
if !isErrBucketNotFound(err) {
logger.LogIf(j.ctx, err)
continue
}
}
}
job.delete(j.ctx, j.objLayer)
j.canceler(job.ID, false)
case <-j.workerKillCh:
@@ -1645,25 +1709,32 @@ var globalBatchJobsMetrics = batchJobMetrics{
metrics: make(map[string]*batchJobInfo),
}
//msgp:ignore batchReplicationMetric
//go:generate stringer -type=batchReplicationMetric -trimprefix=batchReplicationMetric $GOFILE
type batchReplicationMetric uint8
//msgp:ignore batchJobMetric
//go:generate stringer -type=batchJobMetric -trimprefix=batchJobMetric $GOFILE
type batchJobMetric uint8
const (
batchReplicationMetricObject batchReplicationMetric = iota
batchReplicationMetricObject batchJobMetric = iota
batchKeyRotationMetricObject
)
func batchReplicationTrace(d batchReplicationMetric, job string, startTime time.Time, duration time.Duration, info ObjectInfo, attempts int, err error) madmin.TraceInfo {
func batchJobTrace(d batchJobMetric, job string, startTime time.Time, duration time.Duration, info ObjectInfo, attempts int, err error) madmin.TraceInfo {
var errStr string
if err != nil {
errStr = err.Error()
}
funcName := fmt.Sprintf("batchReplication.%s (job-name=%s)", d.String(), job)
jobKind := "batchReplication"
traceType := madmin.TraceBatchReplication
if d == batchKeyRotationMetricObject {
jobKind = "batchKeyRotation"
traceType = madmin.TraceBatchKeyRotation
}
funcName := fmt.Sprintf("%s.%s (job-name=%s)", jobKind, d.String(), job)
if attempts > 0 {
funcName = fmt.Sprintf("batchReplication.%s (job-name=%s,attempts=%s)", d.String(), job, humanize.Ordinal(attempts))
funcName = fmt.Sprintf("%s.%s (job-name=%s,attempts=%s)", jobKind, d.String(), job, humanize.Ordinal(attempts))
}
return madmin.TraceInfo{
TraceType: madmin.TraceBatchReplication,
TraceType: traceType,
Time: startTime,
NodeName: globalLocalNodeName,
FuncName: funcName,
@@ -1695,6 +1766,12 @@ func (m *batchJobMetrics) report(jobID string) (metrics *madmin.BatchJobMetrics)
BytesTransferred: job.BytesTransferred,
BytesFailed: job.BytesFailed,
},
KeyRotate: &madmin.KeyRotationInfo{
Bucket: job.Bucket,
Object: job.Object,
Objects: job.Objects,
ObjectsFailed: job.ObjectsFailed,
},
}
if match {
break
@@ -1717,12 +1794,19 @@ func (m *batchJobMetrics) save(jobID string, ri *batchJobInfo) {
m.metrics[jobID] = ri.clone()
}
func (m *batchJobMetrics) trace(d batchReplicationMetric, job string, attempts int, info ObjectInfo) func(err error) {
func (m *batchJobMetrics) trace(d batchJobMetric, job string, attempts int, info ObjectInfo) func(err error) {
startTime := time.Now()
return func(err error) {
duration := time.Since(startTime)
if globalTrace.NumSubscribers(madmin.TraceBatchReplication) > 0 {
globalTrace.Publish(batchReplicationTrace(d, job, startTime, duration, info, attempts, err))
switch d {
case batchReplicationMetricObject:
if globalTrace.NumSubscribers(madmin.TraceBatchReplication) > 0 {
globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err))
}
case batchKeyRotationMetricObject:
if globalTrace.NumSubscribers(madmin.TraceBatchKeyRotation) > 0 {
globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err))
}
}
}
}