diff --git a/cmd/batch-expire.go b/cmd/batch-expire.go new file mode 100644 index 000000000..064099294 --- /dev/null +++ b/cmd/batch-expire.go @@ -0,0 +1,677 @@ +// Copyright (c) 2015-2023 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "runtime" + "strconv" + "time" + + "github.com/minio/minio-go/v7/pkg/tags" + "github.com/minio/minio/internal/bucket/versioning" + xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/logger" + "github.com/minio/pkg/env" + "github.com/minio/pkg/wildcard" + "github.com/minio/pkg/workers" +) + +// expire: # Expire objects that match a condition +// apiVersion: v1 +// bucket: mybucket # Bucket where this batch job will expire matching objects from +// prefix: myprefix # (Optional) Prefix under which this job will expire objects matching the rules below. +// rules: +// - type: object # regular objects with zero ore more older versions +// name: NAME # match object names that satisfy the wildcard expression. +// olderThan: 70h # match objects older than this value +// createdBefore: "2006-01-02T15:04:05.00Z" # match objects created before "date" +// tags: +// - key: name +// value: pick* # match objects with tag 'name', all values starting with 'pick' +// metadata: +// - key: content-type +// value: image/* # match objects with 'content-type', all values starting with 'image/' +// size: +// lessThan: "10MiB" # match objects with size less than this value (e.g. 10MiB) +// greaterThan: 1MiB # match objects with size greater than this value (e.g. 1MiB) +// purge: +// # retainVersions: 0 # (default) delete all versions of the object. This option is the fastest. +// # retainVersions: 5 # keep the latest 5 versions of the object. +// +// - type: deleted # objects with delete marker as their latest version +// name: NAME # match object names that satisfy the wildcard expression. +// olderThan: 10h # match objects older than this value (e.g. 7d10h31s) +// createdBefore: "2006-01-02T15:04:05.00Z" # match objects created before "date" +// purge: +// # retainVersions: 0 # (default) delete all versions of the object. This option is the fastest. +// # retainVersions: 5 # keep the latest 5 versions of the object including delete markers. +// +// notify: +// endpoint: https://notify.endpoint # notification endpoint to receive job completion status +// token: Bearer xxxxx # optional authentication token for the notification endpoint +// +// retry: +// attempts: 10 # number of retries for the job before giving up +// delay: 500ms # least amount of delay between each retry + +//go:generate msgp -file $GOFILE + +// BatchJobExpirePurge type accepts non-negative versions to be retained +type BatchJobExpirePurge struct { + RetainVersions int `yaml:"retainVersions" json:"retainVersions"` +} + +// Validate returns nil if value is valid, ie > 0. +func (p BatchJobExpirePurge) Validate() error { + if p.RetainVersions < 0 { + return errors.New("retainVersions must be >= 0") + } + return nil +} + +// BatchJobExpireFilter holds all the filters currently supported for batch replication +type BatchJobExpireFilter struct { + OlderThan time.Duration `yaml:"olderThan,omitempty" json:"olderThan"` + CreatedBefore *time.Time `yaml:"createdBefore,omitempty" json:"createdBefore"` + Tags []BatchJobKV `yaml:"tags,omitempty" json:"tags"` + Metadata []BatchJobKV `yaml:"metadata,omitempty" json:"metadata"` + Size BatchJobSizeFilter `yaml:"size" json:"size"` + Type string `yaml:"type" json:"type"` + Name string `yaml:"name" json:"name"` + Purge BatchJobExpirePurge `yaml:"purge" json:"purge"` +} + +// Matches returns true if obj matches the filter conditions specified in ef. +func (ef BatchJobExpireFilter) Matches(obj ObjectInfo, now time.Time) bool { + switch ef.Type { + case BatchJobExpireObject: + if obj.DeleteMarker { + return false + } + case BatchJobExpireDeleted: + if !obj.DeleteMarker { + return false + } + default: + // we should never come here, Validate should have caught this. + logger.LogOnceIf(context.Background(), fmt.Errorf("invalid filter type: %s", ef.Type), ef.Type) + return false + } + + if len(ef.Name) > 0 && !wildcard.Match(ef.Name, obj.Name) { + return false + } + if ef.OlderThan > 0 && now.Sub(obj.ModTime) <= ef.OlderThan { + return false + } + + if ef.CreatedBefore != nil && !obj.ModTime.Before(*ef.CreatedBefore) { + return false + } + + if len(ef.Tags) > 0 && !obj.DeleteMarker { + // Only parse object tags if tags filter is specified. + var tagMap map[string]string + if len(obj.UserTags) != 0 { + t, err := tags.ParseObjectTags(obj.UserTags) + if err != nil { + return false + } + tagMap = t.ToMap() + } + + for _, kv := range ef.Tags { + // Object (version) must match all tags specified in + // the filter + var match bool + for t, v := range tagMap { + if kv.Match(BatchJobKV{Key: t, Value: v}) { + match = true + } + } + if !match { + return false + } + } + + } + if len(ef.Metadata) > 0 && !obj.DeleteMarker { + for _, kv := range ef.Metadata { + // Object (version) must match all x-amz-meta and + // standard metadata headers + // specified in the filter + var match bool + for k, v := range obj.UserDefined { + if !stringsHasPrefixFold(k, "x-amz-meta-") && !isStandardHeader(k) { + continue + } + // We only need to match x-amz-meta or standardHeaders + if kv.Match(BatchJobKV{Key: k, Value: v}) { + match = true + } + } + if !match { + return false + } + } + } + + return ef.Size.InRange(obj.Size) +} + +const ( + // BatchJobExpireObject - object type + BatchJobExpireObject string = "object" + // BatchJobExpireDeleted - delete marker type + BatchJobExpireDeleted string = "deleted" +) + +// Validate returns nil if ef has valid fields, validation error otherwise. +func (ef BatchJobExpireFilter) Validate() error { + switch ef.Type { + case BatchJobExpireObject: + case BatchJobExpireDeleted: + if len(ef.Tags) > 0 || len(ef.Metadata) > 0 { + return errors.New("invalid batch-expire rule filter") + } + default: + return errors.New("invalid batch-expire type") + } + + for _, tag := range ef.Tags { + if err := tag.Validate(); err != nil { + return err + } + } + + for _, meta := range ef.Metadata { + if err := meta.Validate(); err != nil { + return err + } + } + if err := ef.Purge.Validate(); err != nil { + return err + } + if err := ef.Size.Validate(); err != nil { + return err + } + if ef.CreatedBefore != nil && !ef.CreatedBefore.Before(time.Now()) { + return errors.New("CreatedBefore is in the future") + } + return nil +} + +// BatchJobExpire represents configuration parameters for a batch expiration +// job typically supplied in yaml form +type BatchJobExpire struct { + APIVersion string `yaml:"apiVersion" json:"apiVersion"` + Bucket string `yaml:"bucket" json:"bucket"` + Prefix string `yaml:"prefix" json:"prefix"` + NotificationCfg BatchJobNotification `yaml:"notify" json:"notify"` + Retry BatchJobRetry `yaml:"retry" json:"retry"` + Rules []BatchJobExpireFilter `yaml:"rules" json:"rules"` +} + +// Notify notifies notification endpoint if configured regarding job failure or success. +func (r BatchJobExpire) Notify(ctx context.Context, body io.Reader) error { + if r.NotificationCfg.Endpoint == "" { + return nil + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.NotificationCfg.Endpoint, body) + if err != nil { + return err + } + + if r.NotificationCfg.Token != "" { + req.Header.Set("Authorization", r.NotificationCfg.Token) + } + + clnt := http.Client{Transport: getRemoteInstanceTransport} + resp, err := clnt.Do(req) + if err != nil { + return err + } + + xhttp.DrainBody(resp.Body) + if resp.StatusCode != http.StatusOK { + return errors.New(resp.Status) + } + + return nil +} + +// Expire expires object versions which have already matched supplied filter conditions +func (r *BatchJobExpire) Expire(ctx context.Context, api ObjectLayer, vc *versioning.Versioning, objsToDel []ObjectToDelete) []error { + opts := ObjectOptions{ + PrefixEnabledFn: vc.PrefixEnabled, + VersionSuspended: vc.Suspended(), + } + _, errs := api.DeleteObjects(ctx, r.Bucket, objsToDel, opts) + return errs +} + +const ( + batchExpireName = "batch-expire.bin" + batchExpireFormat = 1 + batchExpireVersionV1 = 1 + batchExpireVersion = batchExpireVersionV1 + batchExpireAPIVersion = "v1" + batchExpireJobDefaultRetries = 3 + batchExpireJobDefaultRetryDelay = 250 * time.Millisecond +) + +type objInfoCache map[string]*ObjectInfo + +func newObjInfoCache() objInfoCache { + return objInfoCache(make(map[string]*ObjectInfo)) +} + +func (oiCache objInfoCache) Add(toDel ObjectToDelete, oi *ObjectInfo) { + oiCache[fmt.Sprintf("%s-%s", toDel.ObjectName, toDel.VersionID)] = oi +} + +func (oiCache objInfoCache) Get(toDel ObjectToDelete) (*ObjectInfo, bool) { + oi, ok := oiCache[fmt.Sprintf("%s-%s", toDel.ObjectName, toDel.VersionID)] + return oi, ok +} + +func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo, job BatchJobRequest, api ObjectLayer, wk *workers.Workers, expireCh <-chan []expireObjInfo) { + vc, _ := globalBucketVersioningSys.Get(r.Bucket) + retryAttempts := r.Retry.Attempts + delay := job.Expire.Retry.Delay + if delay == 0 { + delay = batchExpireJobDefaultRetryDelay + } + + var i int + for toExpire := range expireCh { + select { + case <-ctx.Done(): + return + default: + } + if i > 0 { + if wait := globalBatchConfig.ExpirationWait(); wait > 0 { + time.Sleep(wait) + } + } + i++ + wk.Take() + go func(toExpire []expireObjInfo) { + defer wk.Give() + + toExpireAll := make([]ObjectInfo, 0, len(toExpire)) + toDel := make([]ObjectToDelete, 0, len(toExpire)) + oiCache := newObjInfoCache() + for _, exp := range toExpire { + if exp.ExpireAll { + toExpireAll = append(toExpireAll, exp.ObjectInfo) + continue + } + // Cache ObjectInfo value via pointers for + // subsequent use to track objects which + // couldn't be deleted. + od := ObjectToDelete{ + ObjectV: ObjectV{ + ObjectName: exp.Name, + VersionID: exp.VersionID, + }, + } + toDel = append(toDel, od) + oiCache.Add(od, &exp.ObjectInfo) + } + + var done bool + // DeleteObject(deletePrefix: true) to expire all versions of an object + for _, exp := range toExpireAll { + var success bool + for attempts := 1; attempts <= retryAttempts; attempts++ { + select { + case <-ctx.Done(): + done = true + default: + } + stopFn := globalBatchJobsMetrics.trace(batchJobMetricExpire, ri.JobID, attempts) + _, err := api.DeleteObject(ctx, exp.Bucket, exp.Name, ObjectOptions{ + DeletePrefix: true, + }) + if err != nil { + stopFn(exp, err) + logger.LogIf(ctx, fmt.Errorf("Failed to expire %s/%s versionID=%s due to %v (attempts=%d)", toExpire[i].Bucket, toExpire[i].Name, toExpire[i].VersionID, err, attempts)) + } else { + stopFn(exp, err) + success = true + break + } + } + ri.trackMultipleObjectVersions(r.Bucket, exp, success) + if done { + break + } + } + + if done { + return + } + + // DeleteMultiple objects + toDelCopy := make([]ObjectToDelete, len(toDel)) + for attempts := 1; attempts <= retryAttempts; attempts++ { + select { + case <-ctx.Done(): + return + default: + } + + stopFn := globalBatchJobsMetrics.trace(batchJobMetricExpire, ri.JobID, attempts) + // Copying toDel to select from objects whose + // deletion failed + copy(toDelCopy, toDel) + var failed int + errs := r.Expire(ctx, api, vc, toDel) + // reslice toDel in preparation for next retry + // attempt + toDel = toDel[:0] + for i, err := range errs { + if err != nil { + stopFn(toDelCopy[i], err) + logger.LogIf(ctx, fmt.Errorf("Failed to expire %s/%s versionID=%s due to %v (attempts=%d)", ri.Bucket, toDelCopy[i].ObjectName, toDelCopy[i].VersionID, err, attempts)) + failed++ + if attempts == retryAttempts { // all retry attempts failed, record failure + if oi, ok := oiCache.Get(toDelCopy[i]); ok { + ri.trackCurrentBucketObject(r.Bucket, *oi, false) + } + } else { + toDel = append(toDel, toDelCopy[i]) + } + } else { + stopFn(toDelCopy[i], nil) + if oi, ok := oiCache.Get(toDelCopy[i]); ok { + ri.trackCurrentBucketObject(r.Bucket, *oi, true) + } + } + } + + globalBatchJobsMetrics.save(ri.JobID, ri) + + if failed == 0 { + break + } + + // Add a delay between retry attempts + if attempts < retryAttempts { + time.Sleep(delay) + } + } + }(toExpire) + } +} + +type expireObjInfo struct { + ObjectInfo + ExpireAll bool +} + +// Start the batch expiration job, resumes if there was a pending job via "job.ID" +func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJobRequest) error { + ri := &batchJobInfo{ + JobID: job.ID, + JobType: string(job.Type()), + StartTime: job.Started, + } + if err := ri.load(ctx, api, job); err != nil { + return err + } + + globalBatchJobsMetrics.save(job.ID, ri) + lastObject := ri.Object + + now := time.Now().UTC() + + workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_EXPIRATION_WORKERS", strconv.Itoa(runtime.GOMAXPROCS(0)/2))) + if err != nil { + return err + } + + wk, err := workers.New(workerSize) + if err != nil { + // invalid worker size. + return err + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + results := make(chan ObjectInfo, workerSize) + if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{ + Marker: lastObject, + LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions + VersionsSort: WalkVersionsSortDesc, + }); err != nil { + // Do not need to retry if we can't list objects on source. + return err + } + + // Goroutine to periodically save batch-expire job's in-memory state + saverQuitCh := make(chan struct{}) + go func() { + saveTicker := time.NewTicker(10 * time.Second) + defer saveTicker.Stop() + for { + select { + case <-saveTicker.C: + // persist in-memory state to disk after every 10secs. + logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) + + case <-ctx.Done(): + // persist in-memory state immediately before exiting due to context cancelation. + logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job)) + return + + case <-saverQuitCh: + // persist in-memory state immediately to disk. + logger.LogIf(ctx, ri.updateAfter(ctx, api, 0, job)) + return + } + } + }() + + expireCh := make(chan []expireObjInfo, workerSize) + go batchObjsForDelete(ctx, r, ri, job, api, wk, expireCh) + + var ( + prevObj ObjectInfo + matchedFilter BatchJobExpireFilter + versionsCount int + toDel []expireObjInfo + ) + for result := range results { + // Apply filter to find the matching rule to apply expiry + // actions accordingly. + // nolint:gocritic + if result.IsLatest { + // send down filtered entries to be deleted using + // DeleteObjects method + if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously. + xfer := make([]expireObjInfo, len(toDel)) + copy(xfer, toDel) + + var done bool + select { + case <-ctx.Done(): + done = true + case expireCh <- xfer: + toDel = toDel[:0] // resetting toDel + } + if done { + break + } + } + var match BatchJobExpireFilter + var found bool + for _, rule := range r.Rules { + if rule.Matches(result, now) { + match = rule + found = true + break + } + } + if !found { + continue + } + + prevObj = result + matchedFilter = match + versionsCount = 1 + // Include the latest version + if matchedFilter.Purge.RetainVersions == 0 { + toDel = append(toDel, expireObjInfo{ + ObjectInfo: result, + ExpireAll: true, + }) + continue + } + } else if prevObj.Name == result.Name { + if matchedFilter.Purge.RetainVersions == 0 { + continue // including latest version in toDel suffices, skipping other versions + } + versionsCount++ + } else { + continue + } + + if versionsCount <= matchedFilter.Purge.RetainVersions { + continue // retain versions + } + toDel = append(toDel, expireObjInfo{ + ObjectInfo: result, + }) + } + // Send any remaining objects downstream + if len(toDel) > 0 { + select { + case <-ctx.Done(): + case expireCh <- toDel: + } + } + close(expireCh) + + wk.Wait() // waits for all expire goroutines to complete + + ri.Complete = ri.ObjectsFailed == 0 + ri.Failed = ri.ObjectsFailed > 0 + globalBatchJobsMetrics.save(job.ID, ri) + + // Close the saverQuitCh - this also triggers saving in-memory state + // immediately one last time before we exit this method. + close(saverQuitCh) + + // Notify expire jobs final status to the configured endpoint + buf, _ := json.Marshal(ri) + if err := r.Notify(context.Background(), bytes.NewReader(buf)); err != nil { + logger.LogIf(context.Background(), fmt.Errorf("unable to notify %v", err)) + } + + return nil +} + +//msgp:ignore batchExpireJobError +type batchExpireJobError struct { + Code string + Description string + HTTPStatusCode int +} + +func (e batchExpireJobError) Error() string { + return e.Description +} + +// maxBatchRules maximum number of rules a batch-expiry job supports +const maxBatchRules = 50 + +// Validate validates the job definition input +func (r *BatchJobExpire) Validate(ctx context.Context, job BatchJobRequest, o ObjectLayer) error { + if r == nil { + return nil + } + + if r.APIVersion != batchExpireAPIVersion { + return batchExpireJobError{ + Code: "InvalidArgument", + Description: "Unsupported batch expire API version", + HTTPStatusCode: http.StatusBadRequest, + } + } + + if r.Bucket == "" { + return batchExpireJobError{ + Code: "InvalidArgument", + Description: "Bucket argument missing", + HTTPStatusCode: http.StatusBadRequest, + } + } + + if _, err := o.GetBucketInfo(ctx, r.Bucket, BucketOptions{}); err != nil { + if isErrBucketNotFound(err) { + return batchExpireJobError{ + Code: "NoSuchSourceBucket", + Description: "The specified source bucket does not exist", + HTTPStatusCode: http.StatusNotFound, + } + } + return err + } + + if len(r.Rules) > maxBatchRules { + return batchExpireJobError{ + Code: "InvalidArgument", + Description: "Too many rules. Batch expire job can't have more than 100 rules", + HTTPStatusCode: http.StatusBadRequest, + } + } + + for _, rule := range r.Rules { + if err := rule.Validate(); err != nil { + return batchExpireJobError{ + Code: "InvalidArgument", + Description: fmt.Sprintf("Invalid batch expire rule: %s", err), + HTTPStatusCode: http.StatusBadRequest, + } + } + } + + if err := r.Retry.Validate(); err != nil { + return batchExpireJobError{ + Code: "InvalidArgument", + Description: fmt.Sprintf("Invalid batch expire retry configuration: %s", err), + HTTPStatusCode: http.StatusBadRequest, + } + } + return nil +} diff --git a/cmd/batch-expire_gen.go b/cmd/batch-expire_gen.go new file mode 100644 index 000000000..12ce733a3 --- /dev/null +++ b/cmd/batch-expire_gen.go @@ -0,0 +1,856 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "time" + + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *BatchJobExpire) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "APIVersion": + z.APIVersion, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "APIVersion") + return + } + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "NotificationCfg": + err = z.NotificationCfg.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "NotificationCfg") + return + } + case "Retry": + err = z.Retry.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Retry") + return + } + case "Rules": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Rules") + return + } + if cap(z.Rules) >= int(zb0002) { + z.Rules = (z.Rules)[:zb0002] + } else { + z.Rules = make([]BatchJobExpireFilter, zb0002) + } + for za0001 := range z.Rules { + err = z.Rules[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Rules", za0001) + return + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *BatchJobExpire) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 6 + // write "APIVersion" + err = en.Append(0x86, 0xaa, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteString(z.APIVersion) + if err != nil { + err = msgp.WrapError(err, "APIVersion") + return + } + // write "Bucket" + err = en.Append(0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "Prefix" + err = en.Append(0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + if err != nil { + return + } + err = en.WriteString(z.Prefix) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + // write "NotificationCfg" + err = en.Append(0xaf, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x66, 0x67) + if err != nil { + return + } + err = z.NotificationCfg.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "NotificationCfg") + return + } + // write "Retry" + err = en.Append(0xa5, 0x52, 0x65, 0x74, 0x72, 0x79) + if err != nil { + return + } + err = z.Retry.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Retry") + return + } + // write "Rules" + err = en.Append(0xa5, 0x52, 0x75, 0x6c, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Rules))) + if err != nil { + err = msgp.WrapError(err, "Rules") + return + } + for za0001 := range z.Rules { + err = z.Rules[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Rules", za0001) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *BatchJobExpire) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 6 + // string "APIVersion" + o = append(o, 0x86, 0xaa, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) + o = msgp.AppendString(o, z.APIVersion) + // string "Bucket" + o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "Prefix" + o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) + o = msgp.AppendString(o, z.Prefix) + // string "NotificationCfg" + o = append(o, 0xaf, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x66, 0x67) + o, err = z.NotificationCfg.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "NotificationCfg") + return + } + // string "Retry" + o = append(o, 0xa5, 0x52, 0x65, 0x74, 0x72, 0x79) + o, err = z.Retry.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Retry") + return + } + // string "Rules" + o = append(o, 0xa5, 0x52, 0x75, 0x6c, 0x65, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Rules))) + for za0001 := range z.Rules { + o, err = z.Rules[za0001].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Rules", za0001) + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BatchJobExpire) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "APIVersion": + z.APIVersion, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "APIVersion") + return + } + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Prefix": + z.Prefix, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Prefix") + return + } + case "NotificationCfg": + bts, err = z.NotificationCfg.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "NotificationCfg") + return + } + case "Retry": + bts, err = z.Retry.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Retry") + return + } + case "Rules": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Rules") + return + } + if cap(z.Rules) >= int(zb0002) { + z.Rules = (z.Rules)[:zb0002] + } else { + z.Rules = make([]BatchJobExpireFilter, zb0002) + } + for za0001 := range z.Rules { + bts, err = z.Rules[za0001].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Rules", za0001) + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *BatchJobExpire) Msgsize() (s int) { + s = 1 + 11 + msgp.StringPrefixSize + len(z.APIVersion) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 16 + z.NotificationCfg.Msgsize() + 6 + z.Retry.Msgsize() + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Rules { + s += z.Rules[za0001].Msgsize() + } + return +} + +// DecodeMsg implements msgp.Decodable +func (z *BatchJobExpireFilter) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "OlderThan": + z.OlderThan, err = dc.ReadDuration() + if err != nil { + err = msgp.WrapError(err, "OlderThan") + return + } + case "CreatedBefore": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "CreatedBefore") + return + } + z.CreatedBefore = nil + } else { + if z.CreatedBefore == nil { + z.CreatedBefore = new(time.Time) + } + *z.CreatedBefore, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "CreatedBefore") + return + } + } + case "Tags": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + if cap(z.Tags) >= int(zb0002) { + z.Tags = (z.Tags)[:zb0002] + } else { + z.Tags = make([]BatchJobKV, zb0002) + } + for za0001 := range z.Tags { + err = z.Tags[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + case "Metadata": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Metadata") + return + } + if cap(z.Metadata) >= int(zb0003) { + z.Metadata = (z.Metadata)[:zb0003] + } else { + z.Metadata = make([]BatchJobKV, zb0003) + } + for za0002 := range z.Metadata { + err = z.Metadata[za0002].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Metadata", za0002) + return + } + } + case "Size": + err = z.Size.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + case "Type": + z.Type, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Type") + return + } + case "Name": + z.Name, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Name") + return + } + case "Purge": + var zb0004 uint32 + zb0004, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Purge") + return + } + for zb0004 > 0 { + zb0004-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "Purge") + return + } + switch msgp.UnsafeString(field) { + case "RetainVersions": + z.Purge.RetainVersions, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "Purge", "RetainVersions") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Purge") + return + } + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *BatchJobExpireFilter) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 8 + // write "OlderThan" + err = en.Append(0x88, 0xa9, 0x4f, 0x6c, 0x64, 0x65, 0x72, 0x54, 0x68, 0x61, 0x6e) + if err != nil { + return + } + err = en.WriteDuration(z.OlderThan) + if err != nil { + err = msgp.WrapError(err, "OlderThan") + return + } + // write "CreatedBefore" + err = en.Append(0xad, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x42, 0x65, 0x66, 0x6f, 0x72, 0x65) + if err != nil { + return + } + if z.CreatedBefore == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = en.WriteTime(*z.CreatedBefore) + if err != nil { + err = msgp.WrapError(err, "CreatedBefore") + return + } + } + // write "Tags" + err = en.Append(0xa4, 0x54, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Tags))) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + for za0001 := range z.Tags { + err = z.Tags[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + // write "Metadata" + err = en.Append(0xa8, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Metadata))) + if err != nil { + err = msgp.WrapError(err, "Metadata") + return + } + for za0002 := range z.Metadata { + err = z.Metadata[za0002].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Metadata", za0002) + return + } + } + // write "Size" + err = en.Append(0xa4, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = z.Size.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + // write "Type" + err = en.Append(0xa4, 0x54, 0x79, 0x70, 0x65) + if err != nil { + return + } + err = en.WriteString(z.Type) + if err != nil { + err = msgp.WrapError(err, "Type") + return + } + // write "Name" + err = en.Append(0xa4, 0x4e, 0x61, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteString(z.Name) + if err != nil { + err = msgp.WrapError(err, "Name") + return + } + // write "Purge" + err = en.Append(0xa5, 0x50, 0x75, 0x72, 0x67, 0x65) + if err != nil { + return + } + // map header, size 1 + // write "RetainVersions" + err = en.Append(0x81, 0xae, 0x52, 0x65, 0x74, 0x61, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + err = en.WriteInt(z.Purge.RetainVersions) + if err != nil { + err = msgp.WrapError(err, "Purge", "RetainVersions") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *BatchJobExpireFilter) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 8 + // string "OlderThan" + o = append(o, 0x88, 0xa9, 0x4f, 0x6c, 0x64, 0x65, 0x72, 0x54, 0x68, 0x61, 0x6e) + o = msgp.AppendDuration(o, z.OlderThan) + // string "CreatedBefore" + o = append(o, 0xad, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x42, 0x65, 0x66, 0x6f, 0x72, 0x65) + if z.CreatedBefore == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendTime(o, *z.CreatedBefore) + } + // string "Tags" + o = append(o, 0xa4, 0x54, 0x61, 0x67, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Tags))) + for za0001 := range z.Tags { + o, err = z.Tags[za0001].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + // string "Metadata" + o = append(o, 0xa8, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61) + o = msgp.AppendArrayHeader(o, uint32(len(z.Metadata))) + for za0002 := range z.Metadata { + o, err = z.Metadata[za0002].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Metadata", za0002) + return + } + } + // string "Size" + o = append(o, 0xa4, 0x53, 0x69, 0x7a, 0x65) + o, err = z.Size.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + // string "Type" + o = append(o, 0xa4, 0x54, 0x79, 0x70, 0x65) + o = msgp.AppendString(o, z.Type) + // string "Name" + o = append(o, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.Name) + // string "Purge" + o = append(o, 0xa5, 0x50, 0x75, 0x72, 0x67, 0x65) + // map header, size 1 + // string "RetainVersions" + o = append(o, 0x81, 0xae, 0x52, 0x65, 0x74, 0x61, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) + o = msgp.AppendInt(o, z.Purge.RetainVersions) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BatchJobExpireFilter) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "OlderThan": + z.OlderThan, bts, err = msgp.ReadDurationBytes(bts) + if err != nil { + err = msgp.WrapError(err, "OlderThan") + return + } + case "CreatedBefore": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.CreatedBefore = nil + } else { + if z.CreatedBefore == nil { + z.CreatedBefore = new(time.Time) + } + *z.CreatedBefore, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "CreatedBefore") + return + } + } + case "Tags": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + if cap(z.Tags) >= int(zb0002) { + z.Tags = (z.Tags)[:zb0002] + } else { + z.Tags = make([]BatchJobKV, zb0002) + } + for za0001 := range z.Tags { + bts, err = z.Tags[za0001].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + case "Metadata": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Metadata") + return + } + if cap(z.Metadata) >= int(zb0003) { + z.Metadata = (z.Metadata)[:zb0003] + } else { + z.Metadata = make([]BatchJobKV, zb0003) + } + for za0002 := range z.Metadata { + bts, err = z.Metadata[za0002].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Metadata", za0002) + return + } + } + case "Size": + bts, err = z.Size.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Size") + return + } + case "Type": + z.Type, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Type") + return + } + case "Name": + z.Name, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Name") + return + } + case "Purge": + var zb0004 uint32 + zb0004, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Purge") + return + } + for zb0004 > 0 { + zb0004-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, "Purge") + return + } + switch msgp.UnsafeString(field) { + case "RetainVersions": + z.Purge.RetainVersions, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Purge", "RetainVersions") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "Purge") + return + } + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *BatchJobExpireFilter) Msgsize() (s int) { + s = 1 + 10 + msgp.DurationSize + 14 + if z.CreatedBefore == nil { + s += msgp.NilSize + } else { + s += msgp.TimeSize + } + s += 5 + msgp.ArrayHeaderSize + for za0001 := range z.Tags { + s += z.Tags[za0001].Msgsize() + } + s += 9 + msgp.ArrayHeaderSize + for za0002 := range z.Metadata { + s += z.Metadata[za0002].Msgsize() + } + s += 5 + z.Size.Msgsize() + 5 + msgp.StringPrefixSize + len(z.Type) + 5 + msgp.StringPrefixSize + len(z.Name) + 6 + 1 + 15 + msgp.IntSize + return +} + +// DecodeMsg implements msgp.Decodable +func (z *BatchJobExpirePurge) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "RetainVersions": + z.RetainVersions, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "RetainVersions") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z BatchJobExpirePurge) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "RetainVersions" + err = en.Append(0x81, 0xae, 0x52, 0x65, 0x74, 0x61, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + err = en.WriteInt(z.RetainVersions) + if err != nil { + err = msgp.WrapError(err, "RetainVersions") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z BatchJobExpirePurge) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "RetainVersions" + o = append(o, 0x81, 0xae, 0x52, 0x65, 0x74, 0x61, 0x69, 0x6e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) + o = msgp.AppendInt(o, z.RetainVersions) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BatchJobExpirePurge) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "RetainVersions": + z.RetainVersions, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "RetainVersions") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z BatchJobExpirePurge) Msgsize() (s int) { + s = 1 + 15 + msgp.IntSize + return +} diff --git a/cmd/batch-expire_gen_test.go b/cmd/batch-expire_gen_test.go new file mode 100644 index 000000000..ed5eab6ca --- /dev/null +++ b/cmd/batch-expire_gen_test.go @@ -0,0 +1,349 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalBatchJobExpire(t *testing.T) { + v := BatchJobExpire{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBatchJobExpire(b *testing.B) { + v := BatchJobExpire{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBatchJobExpire(b *testing.B) { + v := BatchJobExpire{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBatchJobExpire(b *testing.B) { + v := BatchJobExpire{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBatchJobExpire(t *testing.T) { + v := BatchJobExpire{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBatchJobExpire Msgsize() is inaccurate") + } + + vn := BatchJobExpire{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBatchJobExpire(b *testing.B) { + v := BatchJobExpire{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBatchJobExpire(b *testing.B) { + v := BatchJobExpire{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalBatchJobExpireFilter(t *testing.T) { + v := BatchJobExpireFilter{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBatchJobExpireFilter(b *testing.B) { + v := BatchJobExpireFilter{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBatchJobExpireFilter(b *testing.B) { + v := BatchJobExpireFilter{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBatchJobExpireFilter(b *testing.B) { + v := BatchJobExpireFilter{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBatchJobExpireFilter(t *testing.T) { + v := BatchJobExpireFilter{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBatchJobExpireFilter Msgsize() is inaccurate") + } + + vn := BatchJobExpireFilter{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBatchJobExpireFilter(b *testing.B) { + v := BatchJobExpireFilter{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBatchJobExpireFilter(b *testing.B) { + v := BatchJobExpireFilter{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalBatchJobExpirePurge(t *testing.T) { + v := BatchJobExpirePurge{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBatchJobExpirePurge(b *testing.B) { + v := BatchJobExpirePurge{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBatchJobExpirePurge(b *testing.B) { + v := BatchJobExpirePurge{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBatchJobExpirePurge(b *testing.B) { + v := BatchJobExpirePurge{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBatchJobExpirePurge(t *testing.T) { + v := BatchJobExpirePurge{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBatchJobExpirePurge Msgsize() is inaccurate") + } + + vn := BatchJobExpirePurge{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBatchJobExpirePurge(b *testing.B) { + v := BatchJobExpirePurge{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBatchJobExpirePurge(b *testing.B) { + v := BatchJobExpirePurge{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/batch-expire_test.go b/cmd/batch-expire_test.go new file mode 100644 index 000000000..d188fa202 --- /dev/null +++ b/cmd/batch-expire_test.go @@ -0,0 +1,71 @@ +// Copyright (c) 2015-2023 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "testing" + + "gopkg.in/yaml.v2" +) + +func TestParseBatchJobExpire(t *testing.T) { + expireYaml := ` +expire: # Expire objects that match a condition + apiVersion: v1 + bucket: mybucket # Bucket where this batch job will expire matching objects from + prefix: myprefix # (Optional) Prefix under which this job will expire objects matching the rules below. + rules: + - type: object # regular objects with zero ore more older versions + name: NAME # match object names that satisfy the wildcard expression. + olderThan: 70h # match objects older than this value + createdBefore: "2006-01-02T15:04:05.00Z" # match objects created before "date" + tags: + - key: name + value: pick* # match objects with tag 'name', all values starting with 'pick' + metadata: + - key: content-type + value: image/* # match objects with 'content-type', all values starting with 'image/' + size: + lessThan: "10MiB" # match objects with size less than this value (e.g. 10MiB) + greaterThan: 1MiB # match objects with size greater than this value (e.g. 1MiB) + purge: + # retainVersions: 0 # (default) delete all versions of the object. This option is the fastest. + # retainVersions: 5 # keep the latest 5 versions of the object. + + - type: deleted # objects with delete marker as their latest version + name: NAME # match object names that satisfy the wildcard expression. + olderThan: 10h # match objects older than this value (e.g. 7d10h31s) + createdBefore: "2006-01-02T15:04:05.00Z" # match objects created before "date" + purge: + # retainVersions: 0 # (default) delete all versions of the object. This option is the fastest. + # retainVersions: 5 # keep the latest 5 versions of the object including delete markers. + + notify: + endpoint: https://notify.endpoint # notification endpoint to receive job completion status + token: Bearer xxxxx # optional authentication token for the notification endpoint + + retry: + attempts: 10 # number of retries for the job before giving up + delay: 500ms # least amount of delay between each retry +` + var job BatchJobRequest + err := yaml.UnmarshalStrict([]byte(expireYaml), &job) + if err != nil { + t.Fatal("Failed to parse batch-job-expire yaml", err) + } +} diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 1f32ef94a..3d60fa5be 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -42,6 +42,7 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/tags" + "github.com/minio/minio/internal/config/batch" "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" @@ -54,6 +55,8 @@ import ( "gopkg.in/yaml.v2" ) +var globalBatchConfig batch.Config + // BatchJobRequest this is an internal data structure not for external consumption. type BatchJobRequest struct { ID string `yaml:"-" json:"name"` @@ -62,6 +65,7 @@ type BatchJobRequest struct { Location string `yaml:"-" json:"location"` Replicate *BatchJobReplicateV1 `yaml:"replicate" json:"replicate"` KeyRotate *BatchJobKeyRotateV1 `yaml:"keyrotate" json:"keyrotate"` + Expire *BatchJobExpire `yaml:"expire" json:"expire"` ctx context.Context `msg:"-"` } @@ -431,23 +435,27 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay wk.Take() go func() { defer wk.Give() - stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, oi) + stopFn := globalBatchJobsMetrics.trace(batchJobMetricReplication, job.ID, attempts) success := true if err := r.ReplicateFromSource(ctx, api, core, oi, retry); err != nil { // object must be deleted concurrently, allow these failures but do not count them if isErrVersionNotFound(err) || isErrObjectNotFound(err) { return } - stopFn(err) + stopFn(oi, err) logger.LogIf(ctx, err) success = false } else { - stopFn(nil) + stopFn(oi, nil) } ri.trackCurrentBucketObject(r.Target.Bucket, oi, success) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) + + if wait := globalBatchConfig.ReplicationWait(); wait > 0 { + time.Sleep(wait) + } }() } wk.Wait() @@ -725,7 +733,12 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR fileName = batchKeyRotationName version = batchKeyRotateVersionV1 format = batchKeyRotationFormat - + case job.Expire != nil: + fileName = batchExpireName + version = batchExpireVersionV1 + format = batchExpireFormat + default: + return errors.New("no supported batch job request specified") } data, err := readConfig(ctx, api, pathJoin(job.Location, fileName)) if err != nil { @@ -742,6 +755,11 @@ func (ri *batchJobInfo) load(ctx context.Context, api ObjectLayer, job BatchJobR if job.KeyRotate.Flags.Retry.Attempts > 0 { ri.RetryAttempts = job.KeyRotate.Flags.Retry.Attempts } + case job.Expire != nil: + ri.RetryAttempts = batchExpireJobDefaultRetries + if job.Expire.Retry.Attempts > 0 { + ri.RetryAttempts = job.Expire.Retry.Attempts + } } return nil } @@ -851,6 +869,12 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati jobTyp = string(job.Type()) fileName = batchKeyRotationName ri.Version = batchKeyRotateVersionV1 + case madmin.BatchJobExpire: + format = batchExpireFormat + version = batchExpireVersion + jobTyp = string(job.Type()) + fileName = batchExpireName + ri.Version = batchExpireVersionV1 default: return errInvalidArgument } @@ -876,7 +900,18 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati return nil } -func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, failed bool) { +// Note: to be used only with batch jobs that affect multiple versions through +// a single action. e.g batch-expire has an option to expire all versions of an +// object which matches the given filters. +func (ri *batchJobInfo) trackMultipleObjectVersions(bucket string, info ObjectInfo, success bool) { + if success { + ri.Objects += int64(info.NumVersions) + } else { + ri.ObjectsFailed += int64(info.NumVersions) + } +} + +func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool) { if ri == nil { return } @@ -886,7 +921,7 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, ri.Bucket = bucket ri.Object = info.Name - ri.countItem(info.Size, info.DeleteMarker, failed) + ri.countItem(info.Size, info.DeleteMarker, success) } // Start start the batch replication job, resumes if there was a pending job via "job.ID" @@ -1115,7 +1150,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba go func() { defer wk.Give() - stopFn := globalBatchJobsMetrics.trace(batchReplicationMetricObject, job.ID, attempts, result) + stopFn := globalBatchJobsMetrics.trace(batchJobMetricReplication, job.ID, attempts) success := true if err := r.ReplicateToTarget(ctx, api, c, result, retry); err != nil { if miniogo.ToErrorResponse(err).Code == "PreconditionFailed" { @@ -1126,16 +1161,20 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba if isErrVersionNotFound(err) || isErrObjectNotFound(err) { return } - stopFn(err) + stopFn(result, err) logger.LogIf(ctx, err) success = false } else { - stopFn(nil) + stopFn(result, nil) } ri.trackCurrentBucketObject(r.Source.Bucket, result, success) globalBatchJobsMetrics.save(job.ID, ri) // persist in-memory state to disk after every 10secs. logger.LogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) + + if wait := globalBatchConfig.ReplicationWait(); wait > 0 { + time.Sleep(wait) + } }() } wk.Wait() @@ -1340,6 +1379,8 @@ func (j BatchJobRequest) Type() madmin.BatchJobType { return madmin.BatchJobReplicate case j.KeyRotate != nil: return madmin.BatchJobKeyRotate + case j.Expire != nil: + return madmin.BatchJobExpire } return madmin.BatchJobType("unknown") } @@ -1352,6 +1393,8 @@ func (j BatchJobRequest) Validate(ctx context.Context, o ObjectLayer) error { return j.Replicate.Validate(ctx, j, o) case j.KeyRotate != nil: return j.KeyRotate.Validate(ctx, j, o) + case j.Expire != nil: + return j.Expire.Validate(ctx, j, o) } return errInvalidArgument } @@ -1362,12 +1405,14 @@ func (j BatchJobRequest) delete(ctx context.Context, api ObjectLayer) { deleteConfig(ctx, api, pathJoin(j.Location, batchReplName)) case j.KeyRotate != nil: deleteConfig(ctx, api, pathJoin(j.Location, batchKeyRotationName)) + case j.Expire != nil: + deleteConfig(ctx, api, pathJoin(j.Location, batchExpireName)) } deleteConfig(ctx, api, j.Location) } func (j *BatchJobRequest) save(ctx context.Context, api ObjectLayer) error { - if j.Replicate == nil && j.KeyRotate == nil { + if j.Replicate == nil && j.KeyRotate == nil && j.Expire == nil { return errInvalidArgument } @@ -1692,7 +1737,8 @@ func (j *BatchJobPool) AddWorker() { if !ok { return } - if job.Replicate != nil { + switch { + case job.Replicate != nil: if job.Replicate.RemoteToLocal() { if err := job.Replicate.StartFromSource(job.ctx, j.objLayer, *job); err != nil { if !isErrBucketNotFound(err) { @@ -1712,14 +1758,20 @@ func (j *BatchJobPool) AddWorker() { // Bucket not found proceed to delete such a job. } } - } - if job.KeyRotate != nil { + case job.KeyRotate != nil: if err := job.KeyRotate.Start(job.ctx, j.objLayer, *job); err != nil { if !isErrBucketNotFound(err) { logger.LogIf(j.ctx, err) continue } } + case job.Expire != nil: + if err := job.Expire.Start(job.ctx, j.objLayer, *job); err != nil { + if !isErrBucketNotFound(err) { + logger.LogIf(j.ctx, err) + continue + } + } } job.delete(j.ctx, j.objLayer) j.canceler(job.ID, false) @@ -1797,24 +1849,26 @@ type batchJobMetrics struct { type batchJobMetric uint8 const ( - batchReplicationMetricObject batchJobMetric = iota - batchKeyRotationMetricObject + batchJobMetricReplication batchJobMetric = iota + batchJobMetricKeyRotation + batchJobMetricExpire ) -func batchJobTrace(d batchJobMetric, job string, startTime time.Time, duration time.Duration, info ObjectInfo, attempts int, err error) madmin.TraceInfo { +func batchJobTrace(d batchJobMetric, job string, startTime time.Time, duration time.Duration, info objTraceInfoer, attempts int, err error) madmin.TraceInfo { var errStr string if err != nil { errStr = err.Error() } - jobKind := "batchReplication" traceType := madmin.TraceBatchReplication - if d == batchKeyRotationMetricObject { - jobKind = "batchKeyRotation" + switch d { + case batchJobMetricKeyRotation: traceType = madmin.TraceBatchKeyRotation + case batchJobMetricExpire: + traceType = madmin.TraceBatchExpire } - funcName := fmt.Sprintf("%s.%s (job-name=%s)", jobKind, d.String(), job) + funcName := fmt.Sprintf("%s() (job-name=%s)", d.String(), job) if attempts > 0 { - funcName = fmt.Sprintf("%s.%s (job-name=%s,attempts=%s)", jobKind, d.String(), job, humanize.Ordinal(attempts)) + funcName = fmt.Sprintf("%s() (job-name=%s,attempts=%s)", d.String(), job, humanize.Ordinal(attempts)) } return madmin.TraceInfo{ TraceType: traceType, @@ -1822,55 +1876,65 @@ func batchJobTrace(d batchJobMetric, job string, startTime time.Time, duration t NodeName: globalLocalNodeName, FuncName: funcName, Duration: duration, - Path: info.Name, + Path: fmt.Sprintf("%s (versionID=%s)", info.TraceObjName(), info.TraceVersionID()), Error: errStr, } } +func (ri *batchJobInfo) metric() madmin.JobMetric { + m := madmin.JobMetric{ + JobID: ri.JobID, + JobType: ri.JobType, + StartTime: ri.StartTime, + LastUpdate: ri.LastUpdate, + RetryAttempts: ri.RetryAttempts, + Complete: ri.Complete, + Failed: ri.Failed, + } + + switch ri.JobType { + case string(madmin.BatchJobReplicate): + m.Replicate = &madmin.ReplicateInfo{ + Bucket: ri.Bucket, + Object: ri.Object, + Objects: ri.Objects, + ObjectsFailed: ri.ObjectsFailed, + BytesTransferred: ri.BytesTransferred, + BytesFailed: ri.BytesFailed, + } + case string(madmin.BatchJobKeyRotate): + m.KeyRotate = &madmin.KeyRotationInfo{ + Bucket: ri.Bucket, + Object: ri.Object, + Objects: ri.Objects, + ObjectsFailed: ri.ObjectsFailed, + } + case string(madmin.BatchJobExpire): + m.Expired = &madmin.ExpirationInfo{ + Bucket: ri.Bucket, + Object: ri.Object, + Objects: ri.Objects, + ObjectsFailed: ri.ObjectsFailed, + } + } + + return m +} + func (m *batchJobMetrics) report(jobID string) (metrics *madmin.BatchJobMetrics) { metrics = &madmin.BatchJobMetrics{CollectedAt: time.Now(), Jobs: make(map[string]madmin.JobMetric)} m.RLock() defer m.RUnlock() - match := true + if jobID != "" { + if job, ok := m.metrics[jobID]; ok { + metrics.Jobs[jobID] = job.metric() + } + return metrics + } + for id, job := range m.metrics { - if jobID != "" { - match = id == jobID - } - if !match { - continue - } - - m := madmin.JobMetric{ - JobID: job.JobID, - JobType: job.JobType, - StartTime: job.StartTime, - LastUpdate: job.LastUpdate, - RetryAttempts: job.RetryAttempts, - Complete: job.Complete, - Failed: job.Failed, - } - - switch job.JobType { - case string(madmin.BatchJobReplicate): - m.Replicate = &madmin.ReplicateInfo{ - Bucket: job.Bucket, - Object: job.Object, - Objects: job.Objects, - ObjectsFailed: job.ObjectsFailed, - BytesTransferred: job.BytesTransferred, - BytesFailed: job.BytesFailed, - } - case string(madmin.BatchJobKeyRotate): - m.KeyRotate = &madmin.KeyRotationInfo{ - Bucket: job.Bucket, - Object: job.Object, - Objects: job.Objects, - ObjectsFailed: job.ObjectsFailed, - } - } - - metrics.Jobs[id] = m + metrics.Jobs[id] = job.metric() } return metrics } @@ -1915,19 +1979,52 @@ func (m *batchJobMetrics) save(jobID string, ri *batchJobInfo) { m.metrics[jobID] = ri.clone() } -func (m *batchJobMetrics) trace(d batchJobMetric, job string, attempts int, info ObjectInfo) func(err error) { +type objTraceInfoer interface { + TraceObjName() string + TraceVersionID() string +} + +// TraceObjName returns name of object being traced +func (td ObjectToDelete) TraceObjName() string { + return td.ObjectName +} + +// TraceVersionID returns version-id of object being traced +func (td ObjectToDelete) TraceVersionID() string { + return td.VersionID +} + +// TraceObjName returns name of object being traced +func (oi ObjectInfo) TraceObjName() string { + return oi.Name +} + +// TraceVersionID returns version-id of object being traced +func (oi ObjectInfo) TraceVersionID() string { + return oi.VersionID +} + +func (m *batchJobMetrics) trace(d batchJobMetric, job string, attempts int) func(info objTraceInfoer, err error) { startTime := time.Now() - return func(err error) { + return func(info objTraceInfoer, err error) { duration := time.Since(startTime) + if globalTrace.NumSubscribers(madmin.TraceBatch) > 0 { + globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err)) + return + } switch d { - case batchReplicationMetricObject: + case batchJobMetricReplication: if globalTrace.NumSubscribers(madmin.TraceBatchReplication) > 0 { globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err)) } - case batchKeyRotationMetricObject: + case batchJobMetricKeyRotation: if globalTrace.NumSubscribers(madmin.TraceBatchKeyRotation) > 0 { globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err)) } + case batchJobMetricExpire: + if globalTrace.NumSubscribers(madmin.TraceBatchExpire) > 0 { + globalTrace.Publish(batchJobTrace(d, job, startTime, duration, info, attempts, err)) + } } } } diff --git a/cmd/batch-handlers_gen.go b/cmd/batch-handlers_gen.go index 54c80baef..40b476a27 100644 --- a/cmd/batch-handlers_gen.go +++ b/cmd/batch-handlers_gen.go @@ -84,6 +84,24 @@ func (z *BatchJobRequest) DecodeMsg(dc *msgp.Reader) (err error) { return } } + case "Expire": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "Expire") + return + } + z.Expire = nil + } else { + if z.Expire == nil { + z.Expire = new(BatchJobExpire) + } + err = z.Expire.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Expire") + return + } + } default: err = dc.Skip() if err != nil { @@ -97,9 +115,9 @@ func (z *BatchJobRequest) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BatchJobRequest) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 6 + // map header, size 7 // write "ID" - err = en.Append(0x86, 0xa2, 0x49, 0x44) + err = en.Append(0x87, 0xa2, 0x49, 0x44) if err != nil { return } @@ -172,15 +190,32 @@ func (z *BatchJobRequest) EncodeMsg(en *msgp.Writer) (err error) { return } } + // write "Expire" + err = en.Append(0xa6, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65) + if err != nil { + return + } + if z.Expire == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = z.Expire.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Expire") + return + } + } return } // MarshalMsg implements msgp.Marshaler func (z *BatchJobRequest) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 6 + // map header, size 7 // string "ID" - o = append(o, 0x86, 0xa2, 0x49, 0x44) + o = append(o, 0x87, 0xa2, 0x49, 0x44) o = msgp.AppendString(o, z.ID) // string "User" o = append(o, 0xa4, 0x55, 0x73, 0x65, 0x72) @@ -213,6 +248,17 @@ func (z *BatchJobRequest) MarshalMsg(b []byte) (o []byte, err error) { return } } + // string "Expire" + o = append(o, 0xa6, 0x45, 0x78, 0x70, 0x69, 0x72, 0x65) + if z.Expire == nil { + o = msgp.AppendNil(o) + } else { + o, err = z.Expire.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Expire") + return + } + } return } @@ -292,6 +338,23 @@ func (z *BatchJobRequest) UnmarshalMsg(bts []byte) (o []byte, err error) { return } } + case "Expire": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.Expire = nil + } else { + if z.Expire == nil { + z.Expire = new(BatchJobExpire) + } + bts, err = z.Expire.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Expire") + return + } + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -318,6 +381,12 @@ func (z *BatchJobRequest) Msgsize() (s int) { } else { s += z.KeyRotate.Msgsize() } + s += 7 + if z.Expire == nil { + s += msgp.NilSize + } else { + s += z.Expire.Msgsize() + } return } diff --git a/cmd/batch-job-common-types.go b/cmd/batch-job-common-types.go index 1b1e379be..38b04c558 100644 --- a/cmd/batch-job-common-types.go +++ b/cmd/batch-job-common-types.go @@ -112,3 +112,52 @@ func (b BatchJobSnowball) Validate() error { _, err := humanize.ParseBytes(*b.SmallerThan) return err } + +// BatchJobSizeFilter supports size based filters - LesserThan and GreaterThan +type BatchJobSizeFilter struct { + UpperBound BatchJobSize `yaml:"lessThan" json:"lessThan"` + LowerBound BatchJobSize `yaml:"greaterThan" json:"greaterThan"` +} + +// InRange returns true in the following cases and false otherwise, +// - sf.LowerBound < sz, when sf.LowerBound alone is specified +// - sz < sf.UpperBound, when sf.UpperBound alone is specified +// - sf.LowerBound < sz < sf.UpperBound when both are specified, +func (sf BatchJobSizeFilter) InRange(sz int64) bool { + if sf.UpperBound > 0 && sz > int64(sf.UpperBound) { + return false + } + + if sf.LowerBound > 0 && sz < int64(sf.LowerBound) { + return false + } + return true +} + +var errInvalidBatchJobSizeFilter = errors.New("invalid batch-job size filter") + +// Validate checks if sf is a valid batch-job size filter +func (sf BatchJobSizeFilter) Validate() error { + if sf.LowerBound > 0 && sf.UpperBound > 0 && sf.LowerBound >= sf.UpperBound { + return errInvalidBatchJobSizeFilter + } + return nil +} + +// BatchJobSize supports humanized byte values in yaml files type BatchJobSize uint64 +type BatchJobSize int64 + +// UnmarshalYAML to parse humanized byte values +func (s *BatchJobSize) UnmarshalYAML(unmarshal func(interface{}) error) error { + var batchExpireSz string + err := unmarshal(&batchExpireSz) + if err != nil { + return err + } + sz, err := humanize.ParseBytes(batchExpireSz) + if err != nil { + return err + } + *s = BatchJobSize(sz) + return nil +} diff --git a/cmd/batch-job-common-types_gen.go b/cmd/batch-job-common-types_gen.go index 8b3b519a3..dc3ee6e70 100644 --- a/cmd/batch-job-common-types_gen.go +++ b/cmd/batch-job-common-types_gen.go @@ -390,6 +390,202 @@ func (z BatchJobRetry) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *BatchJobSize) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 int64 + zb0001, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = BatchJobSize(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z BatchJobSize) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteInt64(int64(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z BatchJobSize) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendInt64(o, int64(z)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BatchJobSize) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 int64 + zb0001, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = BatchJobSize(zb0001) + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z BatchJobSize) Msgsize() (s int) { + s = msgp.Int64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *BatchJobSizeFilter) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "UpperBound": + { + var zb0002 int64 + zb0002, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "UpperBound") + return + } + z.UpperBound = BatchJobSize(zb0002) + } + case "LowerBound": + { + var zb0003 int64 + zb0003, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "LowerBound") + return + } + z.LowerBound = BatchJobSize(zb0003) + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z BatchJobSizeFilter) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "UpperBound" + err = en.Append(0x82, 0xaa, 0x55, 0x70, 0x70, 0x65, 0x72, 0x42, 0x6f, 0x75, 0x6e, 0x64) + if err != nil { + return + } + err = en.WriteInt64(int64(z.UpperBound)) + if err != nil { + err = msgp.WrapError(err, "UpperBound") + return + } + // write "LowerBound" + err = en.Append(0xaa, 0x4c, 0x6f, 0x77, 0x65, 0x72, 0x42, 0x6f, 0x75, 0x6e, 0x64) + if err != nil { + return + } + err = en.WriteInt64(int64(z.LowerBound)) + if err != nil { + err = msgp.WrapError(err, "LowerBound") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z BatchJobSizeFilter) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "UpperBound" + o = append(o, 0x82, 0xaa, 0x55, 0x70, 0x70, 0x65, 0x72, 0x42, 0x6f, 0x75, 0x6e, 0x64) + o = msgp.AppendInt64(o, int64(z.UpperBound)) + // string "LowerBound" + o = append(o, 0xaa, 0x4c, 0x6f, 0x77, 0x65, 0x72, 0x42, 0x6f, 0x75, 0x6e, 0x64) + o = msgp.AppendInt64(o, int64(z.LowerBound)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BatchJobSizeFilter) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "UpperBound": + { + var zb0002 int64 + zb0002, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "UpperBound") + return + } + z.UpperBound = BatchJobSize(zb0002) + } + case "LowerBound": + { + var zb0003 int64 + zb0003, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "LowerBound") + return + } + z.LowerBound = BatchJobSize(zb0003) + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z BatchJobSizeFilter) Msgsize() (s int) { + s = 1 + 11 + msgp.Int64Size + 11 + msgp.Int64Size + return +} + // DecodeMsg implements msgp.Decodable func (z *BatchJobSnowball) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/cmd/batch-job-common-types_gen_test.go b/cmd/batch-job-common-types_gen_test.go index 9354197a9..96d79ef58 100644 --- a/cmd/batch-job-common-types_gen_test.go +++ b/cmd/batch-job-common-types_gen_test.go @@ -348,6 +348,119 @@ func BenchmarkDecodeBatchJobRetry(b *testing.B) { } } +func TestMarshalUnmarshalBatchJobSizeFilter(t *testing.T) { + v := BatchJobSizeFilter{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBatchJobSizeFilter(b *testing.B) { + v := BatchJobSizeFilter{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBatchJobSizeFilter(b *testing.B) { + v := BatchJobSizeFilter{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBatchJobSizeFilter(b *testing.B) { + v := BatchJobSizeFilter{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBatchJobSizeFilter(t *testing.T) { + v := BatchJobSizeFilter{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBatchJobSizeFilter Msgsize() is inaccurate") + } + + vn := BatchJobSizeFilter{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBatchJobSizeFilter(b *testing.B) { + v := BatchJobSizeFilter{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBatchJobSizeFilter(b *testing.B) { + v := BatchJobSizeFilter{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalBatchJobSnowball(t *testing.T) { v := BatchJobSnowball{} bts, err := v.MarshalMsg(nil) diff --git a/cmd/batch-job-common-types_test.go b/cmd/batch-job-common-types_test.go new file mode 100644 index 000000000..ccfc2e39b --- /dev/null +++ b/cmd/batch-job-common-types_test.go @@ -0,0 +1,136 @@ +// Copyright (c) 2015-2023 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "fmt" + "testing" +) + +func TestBatchJobSizeInRange(t *testing.T) { + tests := []struct { + objSize int64 + sizeFilter BatchJobSizeFilter + want bool + }{ + { + // 1Mib < 2Mib < 10MiB -> in range + objSize: 2 << 20, + sizeFilter: BatchJobSizeFilter{ + UpperBound: 10 << 20, + LowerBound: 1 << 20, + }, + want: true, + }, + { + // 2KiB < 1 MiB -> out of range from left + objSize: 2 << 10, + sizeFilter: BatchJobSizeFilter{ + UpperBound: 10 << 20, + LowerBound: 1 << 20, + }, + want: false, + }, + { + // 11MiB > 10 MiB -> out of range from right + objSize: 11 << 20, + sizeFilter: BatchJobSizeFilter{ + UpperBound: 10 << 20, + LowerBound: 1 << 20, + }, + want: false, + }, + { + // 2MiB < 10MiB -> in range + objSize: 2 << 20, + sizeFilter: BatchJobSizeFilter{ + UpperBound: 10 << 20, + }, + want: true, + }, + { + // 2MiB > 1MiB -> in range + objSize: 2 << 20, + sizeFilter: BatchJobSizeFilter{ + LowerBound: 1 << 20, + }, + want: true, + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("test-%d", i+1), func(t *testing.T) { + if got := test.sizeFilter.InRange(test.objSize); got != test.want { + t.Fatalf("Expected %v but got %v", test.want, got) + } + }) + } +} + +func TestBatchJobSizeValidate(t *testing.T) { + tests := []struct { + sizeFilter BatchJobSizeFilter + err error + }{ + { + // Unspecified size filter is a valid filter + sizeFilter: BatchJobSizeFilter{ + UpperBound: 0, + LowerBound: 0, + }, + err: nil, + }, + { + sizeFilter: BatchJobSizeFilter{ + UpperBound: 0, + LowerBound: 1 << 20, + }, + err: nil, + }, + { + sizeFilter: BatchJobSizeFilter{ + UpperBound: 10 << 20, + LowerBound: 0, + }, + err: nil, + }, + { + // LowerBound > UpperBound -> empty range + sizeFilter: BatchJobSizeFilter{ + UpperBound: 1 << 20, + LowerBound: 10 << 20, + }, + err: errInvalidBatchJobSizeFilter, + }, + { + // LowerBound == UpperBound -> empty range + sizeFilter: BatchJobSizeFilter{ + UpperBound: 1 << 20, + LowerBound: 1 << 20, + }, + err: errInvalidBatchJobSizeFilter, + }, + } + for i, test := range tests { + t.Run(fmt.Sprintf("test-%d", i+1), func(t *testing.T) { + if err := test.sizeFilter.Validate(); err != test.err { + t.Fatalf("Expected %v but got %v", test.err, err) + } + }) + } +} diff --git a/cmd/batch-rotate.go b/cmd/batch-rotate.go index f46b49590..491826246 100644 --- a/cmd/batch-rotate.go +++ b/cmd/batch-rotate.go @@ -154,7 +154,6 @@ type BatchJobKeyRotateV1 struct { Flags BatchJobKeyRotateFlags `yaml:"flags" json:"flags"` Bucket string `yaml:"bucket" json:"bucket"` Prefix string `yaml:"prefix" json:"prefix"` - Endpoint string `yaml:"endpoint" json:"endpoint"` Encryption BatchJobKeyRotateEncryption `yaml:"encryption" json:"encryption"` } @@ -380,14 +379,14 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba defer wk.Give() for attempts := 1; attempts <= retryAttempts; attempts++ { attempts := attempts - stopFn := globalBatchJobsMetrics.trace(batchKeyRotationMetricObject, job.ID, attempts, result) + stopFn := globalBatchJobsMetrics.trace(batchJobMetricKeyRotation, job.ID, attempts) success := true if err := r.KeyRotate(ctx, api, result); err != nil { - stopFn(err) + stopFn(result, err) logger.LogIf(ctx, err) success = false } else { - stopFn(nil) + stopFn(result, nil) } ri.trackCurrentBucketObject(r.Bucket, result, success) ri.RetryAttempts = attempts @@ -401,6 +400,10 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba time.Sleep(delay + time.Duration(rnd.Float64()*float64(delay))) } } + + if wait := globalBatchConfig.KeyRotationWait(); wait > 0 { + time.Sleep(wait) + } }() } wk.Wait() diff --git a/cmd/batch-rotate_gen.go b/cmd/batch-rotate_gen.go index b324b83b2..7ddeb5eda 100644 --- a/cmd/batch-rotate_gen.go +++ b/cmd/batch-rotate_gen.go @@ -409,12 +409,6 @@ func (z *BatchJobKeyRotateV1) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Prefix") return } - case "Endpoint": - z.Endpoint, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Endpoint") - return - } case "Encryption": err = z.Encryption.DecodeMsg(dc) if err != nil { @@ -434,9 +428,9 @@ func (z *BatchJobKeyRotateV1) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BatchJobKeyRotateV1) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 6 + // map header, size 5 // write "APIVersion" - err = en.Append(0x86, 0xaa, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) + err = en.Append(0x85, 0xaa, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) if err != nil { return } @@ -501,16 +495,6 @@ func (z *BatchJobKeyRotateV1) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Prefix") return } - // write "Endpoint" - err = en.Append(0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) - if err != nil { - return - } - err = en.WriteString(z.Endpoint) - if err != nil { - err = msgp.WrapError(err, "Endpoint") - return - } // write "Encryption" err = en.Append(0xaa, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e) if err != nil { @@ -527,9 +511,9 @@ func (z *BatchJobKeyRotateV1) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *BatchJobKeyRotateV1) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 6 + // map header, size 5 // string "APIVersion" - o = append(o, 0x86, 0xaa, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) + o = append(o, 0x85, 0xaa, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) o = msgp.AppendString(o, z.APIVersion) // string "Flags" o = append(o, 0xa5, 0x46, 0x6c, 0x61, 0x67, 0x73) @@ -561,9 +545,6 @@ func (z *BatchJobKeyRotateV1) MarshalMsg(b []byte) (o []byte, err error) { // string "Prefix" o = append(o, 0xa6, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78) o = msgp.AppendString(o, z.Prefix) - // string "Endpoint" - o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) - o = msgp.AppendString(o, z.Endpoint) // string "Encryption" o = append(o, 0xaa, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e) o, err = z.Encryption.MarshalMsg(o) @@ -651,12 +632,6 @@ func (z *BatchJobKeyRotateV1) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Prefix") return } - case "Endpoint": - z.Endpoint, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Endpoint") - return - } case "Encryption": bts, err = z.Encryption.UnmarshalMsg(bts) if err != nil { @@ -677,7 +652,7 @@ func (z *BatchJobKeyRotateV1) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BatchJobKeyRotateV1) Msgsize() (s int) { - s = 1 + 11 + msgp.StringPrefixSize + len(z.APIVersion) + 6 + 1 + 7 + z.Flags.Filter.Msgsize() + 7 + z.Flags.Notify.Msgsize() + 6 + z.Flags.Retry.Msgsize() + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 11 + z.Encryption.Msgsize() + s = 1 + 11 + msgp.StringPrefixSize + len(z.APIVersion) + 6 + 1 + 7 + z.Flags.Filter.Msgsize() + 7 + z.Flags.Notify.Msgsize() + 6 + z.Flags.Retry.Msgsize() + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 11 + z.Encryption.Msgsize() return } diff --git a/cmd/batchjobmetric_string.go b/cmd/batchjobmetric_string.go index a127749d3..a1697a19d 100644 --- a/cmd/batchjobmetric_string.go +++ b/cmd/batchjobmetric_string.go @@ -8,13 +8,14 @@ func _() { // An "invalid array index" compiler error signifies that the constant values have changed. // Re-run the stringer command to generate them again. var x [1]struct{} - _ = x[batchReplicationMetricObject-0] - _ = x[batchKeyRotationMetricObject-1] + _ = x[batchJobMetricReplication-0] + _ = x[batchJobMetricKeyRotation-1] + _ = x[batchJobMetricExpire-2] } -const _batchJobMetric_name = "batchReplicationMetricObjectbatchKeyRotationMetricObject" +const _batchJobMetric_name = "ReplicationKeyRotationExpire" -var _batchJobMetric_index = [...]uint8{0, 28, 56} +var _batchJobMetric_index = [...]uint8{0, 11, 22, 28} func (i batchJobMetric) String() string { if i >= batchJobMetric(len(_batchJobMetric_index)-1) { diff --git a/cmd/config-current.go b/cmd/config-current.go index fb206b447..b13bb43b7 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -27,6 +27,7 @@ import ( "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config/api" + "github.com/minio/minio/internal/config/batch" "github.com/minio/minio/internal/config/cache" "github.com/minio/minio/internal/config/callhome" "github.com/minio/minio/internal/config/compress" @@ -72,6 +73,7 @@ func initHelp() { config.CallhomeSubSys: callhome.DefaultKVS, config.DriveSubSys: drive.DefaultKVS, config.CacheSubSys: cache.DefaultKVS, + config.BatchSubSys: batch.DefaultKVS, } for k, v := range notify.DefaultNotificationKVS { kvs[k] = v @@ -115,6 +117,10 @@ func initHelp() { Key: config.ScannerSubSys, Description: "manage namespace scanning for usage calculation, lifecycle, healing and more", }, + config.HelpKV{ + Key: config.BatchSubSys, + Description: "manage batch job workers and wait times", + }, config.HelpKV{ Key: config.CompressionSubSys, Description: "enable server side compression of objects", @@ -241,6 +247,7 @@ func initHelp() { config.EtcdSubSys: etcd.Help, config.CompressionSubSys: compress.Help, config.HealSubSys: heal.Help, + config.BatchSubSys: batch.Help, config.ScannerSubSys: scanner.Help, config.IdentityOpenIDSubSys: openid.Help, config.IdentityLDAPSubSys: xldap.Help, @@ -301,6 +308,10 @@ func validateSubSysConfig(ctx context.Context, s config.Config, subSys string, o if _, err := api.LookupConfig(s[config.APISubSys][config.Default]); err != nil { return err } + case config.BatchSubSys: + if _, err := batch.LookupConfig(s[config.BatchSubSys][config.Default]); err != nil { + return err + } case config.StorageClassSubSys: if objAPI == nil { return errServerNotInitialized @@ -564,6 +575,12 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf return fmt.Errorf("Unable to apply heal config: %w", err) } globalHealConfig.Update(healCfg) + case config.BatchSubSys: + batchCfg, err := batch.LookupConfig(s[config.BatchSubSys][config.Default]) + if err != nil { + return fmt.Errorf("Unable to apply batch config: %w", err) + } + globalBatchConfig.Update(batchCfg) case config.ScannerSubSys: scannerCfg, err := scanner.LookupConfig(s[config.ScannerSubSys][config.Default]) if err != nil { diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index b3762739e..9a5b1c6a1 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -2035,7 +2035,10 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re return } - versionsSorter(fivs.Versions).reverse() + // Note: entry.fileInfoVersions returns versions sorted in reverse chronological order based on ModTime + if opts.VersionsSort == WalkVersionsSortAsc { + versionsSorter(fivs.Versions).reverse() + } for _, version := range fivs.Versions { if opts.Filter != nil { diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index 2fdfe533f..6f1a6cba4 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -29,6 +29,7 @@ import ( "time" "github.com/minio/kes-go" + "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/mcontext" @@ -98,6 +99,7 @@ func init() { getBucketUsageMetrics(), getHTTPMetrics(true), getBucketTTFBMetric(), + getBatchJobsMetrics(), } bucketPeerMetricsGroups = []*MetricsGroup{ @@ -3247,6 +3249,77 @@ func getClusterHealthMetrics() *MetricsGroup { return mg } +func getBatchJobsMetrics() *MetricsGroup { + mg := &MetricsGroup{ + cacheInterval: 10 * time.Second, + } + + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + objLayer := newObjectLayerFn() + // Service not initialized yet + if objLayer == nil { + return + } + + var m madmin.RealtimeMetrics + mLocal := collectLocalMetrics(madmin.MetricsBatchJobs, collectMetricsOpts{}) + m.Merge(&mLocal) + + mRemote := collectRemoteMetrics(ctx, madmin.MetricsBatchJobs, collectMetricsOpts{}) + m.Merge(&mRemote) + + if m.Aggregated.BatchJobs == nil { + return + } + + for _, mj := range m.Aggregated.BatchJobs.Jobs { + jtype := toSnake(mj.JobType) + var objects, objectsFailed float64 + var bucket string + switch madmin.BatchJobType(mj.JobType) { + case madmin.BatchJobReplicate: + objects = float64(mj.Replicate.Objects) + objectsFailed = float64(mj.Replicate.ObjectsFailed) + bucket = mj.Replicate.Bucket + case madmin.BatchJobKeyRotate: + objects = float64(mj.KeyRotate.Objects) + objectsFailed = float64(mj.KeyRotate.ObjectsFailed) + bucket = mj.KeyRotate.Bucket + case madmin.BatchJobExpire: + objects = float64(mj.Expired.Objects) + objectsFailed = float64(mj.Expired.ObjectsFailed) + bucket = mj.Expired.Bucket + } + metrics = append(metrics, + Metric{ + Description: MetricDescription{ + Namespace: bucketMetricNamespace, + Subsystem: "batch", + Name: MetricName(jtype + "_objects"), + Help: "Get successfully completed batch job " + jtype + "objects", + Type: counterMetric, + }, + Value: objects, + VariableLabels: map[string]string{"bucket": bucket, "jobId": mj.JobID}, + }, + Metric{ + Description: MetricDescription{ + Namespace: bucketMetricNamespace, + Subsystem: "batch", + Name: MetricName(jtype + "_objects_failed"), + Help: "Get failed batch job " + jtype + "objects", + Type: counterMetric, + }, + Value: objectsFailed, + VariableLabels: map[string]string{"bucket": bucket, "jobId": mj.JobID}, + }, + ) + } + return + }) + return mg +} + func getClusterStorageMetrics() *MetricsGroup { mg := &MetricsGroup{ cacheInterval: 1 * time.Minute, diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index fe8f40a91..a796723cf 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -44,6 +44,17 @@ type EvalRetentionBypassFn func(o ObjectInfo, gerr error) error // GetObjectInfoFn is the signature of GetObjectInfo function. type GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) +// WalkVersionsSortOrder represents the sort order in which versions of an +// object should be returned by ObjectLayer.Walk method +type WalkVersionsSortOrder uint8 + +const ( + // WalkVersionsSortAsc - Sort in ascending order of ModTime + WalkVersionsSortAsc WalkVersionsSortOrder = iota + // WalkVersionsSortDesc - Sort in descending order of ModTime + WalkVersionsSortDesc +) + // ObjectOptions represents object options for ObjectLayer object operations type ObjectOptions struct { ServerSideEncryption encrypt.ServerSide @@ -111,10 +122,11 @@ type ObjectOptions struct { // WalkOptions provides filtering, marker and other Walk() specific options. type WalkOptions struct { - Filter func(info FileInfo) bool // return WalkFilter returns 'true/false' - Marker string // set to skip until this object - LatestOnly bool // returns only latest versions for all matching objects - AskDisks string // dictates how many disks are being listed + Filter func(info FileInfo) bool // return WalkFilter returns 'true/false' + Marker string // set to skip until this object + LatestOnly bool // returns only latest versions for all matching objects + AskDisks string // dictates how many disks are being listed + VersionsSort WalkVersionsSortOrder // sort order for versions of the same object; default: Ascending order in ModTime } // ExpirationOptions represents object options for object expiration at objectLayer. diff --git a/go.mod b/go.mod index 72620a2da..bedeb7569 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/minio/madmin-go/v3 v3.0.35-0.20231130082526-199918d0ff20 github.com/minio/minio-go/v7 v7.0.65-0.20231122233251-1f7dd6b7e3e1 github.com/minio/mux v1.9.0 + github.com/minio/pkg v1.7.5 github.com/minio/pkg/v2 v2.0.4 github.com/minio/selfupdate v0.6.0 github.com/minio/sha256-simd v1.0.1 @@ -187,7 +188,6 @@ require ( github.com/minio/filepath v1.0.0 // indirect github.com/minio/mc v0.0.0-20231127112613-5e6ae2172e25 // indirect github.com/minio/md5-simd v1.1.2 // indirect - github.com/minio/pkg v1.7.5 // indirect github.com/minio/websocket v1.6.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/internal/config/batch/batch.go b/internal/config/batch/batch.go new file mode 100644 index 000000000..84c3847c0 --- /dev/null +++ b/internal/config/batch/batch.go @@ -0,0 +1,160 @@ +// Copyright (c) 2015-2023 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package batch + +import ( + "sync" + "time" + + "github.com/minio/minio/internal/config" + "github.com/minio/pkg/env" +) + +// Batch job environment variables +const ( + ReplicationWorkersWait = "replication_workers_wait" + KeyRotationWorkersWait = "keyrotation_workers_wait" + ExpirationWorkersWait = "expiration_workers_wait" + + EnvReplicationWorkersWait = "MINIO_BATCH_REPLICATION_WORKERS_WAIT" + EnvKeyRotationWorkersWait = "MINIO_BATCH_KEYROTATION_WORKERS_WAIT" + EnvKeyExpirationWorkersWait = "MINIO_BATCH_EXPIRATION_WORKERS_WAIT" +) + +var configMu sync.RWMutex + +// Config represents the batch job settings. +type Config struct { + ReplicationWorkersWait time.Duration `json:"replicationWorkersWait"` + KeyRotationWorkersWait time.Duration `json:"keyRotationWorkersWait"` + ExpirationWorkersWait time.Duration `json:"expirationWorkersWait"` +} + +// ExpirationWait returns the duration for which a batch expiration worker +// would wait before working on next object. +func (opts Config) ExpirationWait() time.Duration { + configMu.RLock() + defer configMu.RUnlock() + + return opts.ExpirationWorkersWait +} + +// ReplicationWait returns the duration for which a batch replication worker +// would wait before working on next object. +func (opts Config) ReplicationWait() time.Duration { + configMu.RLock() + defer configMu.RUnlock() + + return opts.ReplicationWorkersWait +} + +// KeyRotationWait returns the duration for which a batch key-rotation worker +// would wait before working on next object. +func (opts Config) KeyRotationWait() time.Duration { + configMu.RLock() + defer configMu.RUnlock() + + return opts.KeyRotationWorkersWait +} + +// Clone returns a copy of Config value +func (opts Config) Clone() Config { + configMu.RLock() + defer configMu.RUnlock() + + return Config{ + ReplicationWorkersWait: opts.ReplicationWorkersWait, + KeyRotationWorkersWait: opts.KeyRotationWorkersWait, + ExpirationWorkersWait: opts.ExpirationWorkersWait, + } +} + +// Update updates opts with nopts +func (opts *Config) Update(nopts Config) { + configMu.Lock() + defer configMu.Unlock() + + opts.ReplicationWorkersWait = nopts.ReplicationWorkersWait + opts.KeyRotationWorkersWait = nopts.KeyRotationWorkersWait + opts.ExpirationWorkersWait = nopts.ExpirationWorkersWait +} + +// DefaultKVS - default KV config for batch job settings +var DefaultKVS = config.KVS{ + config.KV{ + Key: ReplicationWorkersWait, + Value: "0ms", // No wait by default between each replication attempts. + }, + config.KV{ + Key: KeyRotationWorkersWait, + Value: "0ms", // No wait by default between each key rotation attempts. + }, + config.KV{ + Key: ExpirationWorkersWait, + Value: "0ms", // No wait by default between each expiration attempts. + }, +} + +// LookupConfig - lookup config and override with valid environment settings if any. +func LookupConfig(kvs config.KVS) (cfg Config, err error) { + if err = config.CheckValidKeys(config.BatchSubSys, kvs, DefaultKVS); err != nil { + return cfg, err + } + + cfg.ReplicationWorkersWait = 0 + cfg.KeyRotationWorkersWait = 0 + cfg.ExpirationWorkersWait = 0 + + rduration, err := time.ParseDuration(env.Get(EnvReplicationWorkersWait, kvs.GetWithDefault(ReplicationWorkersWait, DefaultKVS))) + if err != nil { + return cfg, err + } + if rduration < 0 { + return cfg, config.ErrInvalidBatchReplicationWorkersWait(nil) + } + + kduration, err := time.ParseDuration(env.Get(EnvKeyRotationWorkersWait, kvs.GetWithDefault(KeyRotationWorkersWait, DefaultKVS))) + if err != nil { + return cfg, err + } + if kduration < 0 { + return cfg, config.ErrInvalidBatchKeyRotationWorkersWait(nil) + } + + eduration, err := time.ParseDuration(env.Get(EnvKeyExpirationWorkersWait, kvs.GetWithDefault(ExpirationWorkersWait, DefaultKVS))) + if err != nil { + return cfg, err + } + if eduration < 0 { + return cfg, config.ErrInvalidBatchExpirationWorkersWait(nil) + } + + if rduration > 0 { + cfg.ReplicationWorkersWait = rduration + } + + if kduration > 0 { + cfg.KeyRotationWorkersWait = kduration + } + + if eduration > 0 { + cfg.ExpirationWorkersWait = eduration + } + + return cfg, nil +} diff --git a/internal/config/batch/help.go b/internal/config/batch/help.go new file mode 100644 index 000000000..e218edd14 --- /dev/null +++ b/internal/config/batch/help.go @@ -0,0 +1,49 @@ +// Copyright (c) 2015-2023 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package batch + +import "github.com/minio/minio/internal/config" + +// Help template for batch feature. +var ( + defaultHelpPostfix = func(key string) string { + return config.DefaultHelpPostfix(DefaultKVS, key) + } + + // Help provides help for config values + Help = config.HelpKVS{ + config.HelpKV{ + Key: ReplicationWorkersWait, + Description: `maximum sleep duration between objects to slow down batch replication operation` + defaultHelpPostfix(ReplicationWorkersWait), + Optional: true, + Type: "duration", + }, + config.HelpKV{ + Key: KeyRotationWorkersWait, + Description: `maximum sleep duration between objects to slow down batch keyrotation operation` + defaultHelpPostfix(KeyRotationWorkersWait), + Optional: true, + Type: "duration", + }, + config.HelpKV{ + Key: ExpirationWorkersWait, + Description: "maximum sleep duration between objects to slow down batch expiration operation" + defaultHelpPostfix(ExpirationWorkersWait), + Optional: true, + Type: "duration", + }, + } +) diff --git a/internal/config/config.go b/internal/config/config.go index 05c9a4d20..4da4471be 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -118,6 +118,8 @@ const ( SubnetSubSys = madmin.SubnetSubSys CallhomeSubSys = madmin.CallhomeSubSys DriveSubSys = madmin.DriveSubSys + BatchSubSys = madmin.BatchSubSys + // Add new constants here (similar to above) if you add new fields to config. ) @@ -185,6 +187,7 @@ var SubSystemsDynamic = set.CreateStringSet( AuditKafkaSubSys, StorageClassSubSys, CacheSubSys, + BatchSubSys, ) // SubSystemsSingleTargets - subsystems which only support single target. @@ -207,6 +210,7 @@ var SubSystemsSingleTargets = set.CreateStringSet( CallhomeSubSys, DriveSubSys, CacheSubSys, + BatchSubSys, ) // Constant separators diff --git a/internal/config/errors.go b/internal/config/errors.go index 245259d17..751152081 100644 --- a/internal/config/errors.go +++ b/internal/config/errors.go @@ -224,4 +224,19 @@ Examples: "", "MINIO_API_TRANSITION_WORKERS: should be >= GOMAXPROCS/2", ) + ErrInvalidBatchKeyRotationWorkersWait = newErrFn( + "Invalid value for batch key rotation workers wait", + "Please input a non-negative duration", + "keyrotation_workers_wait should be > 0ms", + ) + ErrInvalidBatchReplicationWorkersWait = newErrFn( + "Invalid value for batch replication workers wait", + "Please input a non-negative duration", + "replication_workers_wait should be > 0ms", + ) + ErrInvalidBatchExpirationWorkersWait = newErrFn( + "Invalid value for batch expiration workers wait", + "Please input a non-negative duration", + "expiration_workers_wait should be > 0ms", + ) )