fix: Use xtime duration to parse batch job (#20117)

This commit is contained in:
jiuker 2024-07-23 15:05:53 +08:00 committed by GitHub
parent 8e618d45fc
commit b3a94c4e85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 152 additions and 39 deletions

View File

@ -36,6 +36,7 @@ import (
"github.com/minio/pkg/v3/env"
"github.com/minio/pkg/v3/wildcard"
"github.com/minio/pkg/v3/workers"
"github.com/minio/pkg/v3/xtime"
"gopkg.in/yaml.v3"
)
@ -116,7 +117,7 @@ func (p BatchJobExpirePurge) Validate() error {
// BatchJobExpireFilter holds all the filters currently supported for batch replication
type BatchJobExpireFilter struct {
line, col int
OlderThan time.Duration `yaml:"olderThan,omitempty" json:"olderThan"`
OlderThan xtime.Duration `yaml:"olderThan,omitempty" json:"olderThan"`
CreatedBefore *time.Time `yaml:"createdBefore,omitempty" json:"createdBefore"`
Tags []BatchJobKV `yaml:"tags,omitempty" json:"tags"`
Metadata []BatchJobKV `yaml:"metadata,omitempty" json:"metadata"`
@ -162,7 +163,7 @@ func (ef BatchJobExpireFilter) Matches(obj ObjectInfo, now time.Time) bool {
if len(ef.Name) > 0 && !wildcard.Match(ef.Name, obj.Name) {
return false
}
if ef.OlderThan > 0 && now.Sub(obj.ModTime) <= ef.OlderThan {
if ef.OlderThan > 0 && now.Sub(obj.ModTime) <= ef.OlderThan.D() {
return false
}

View File

@ -306,7 +306,7 @@ func (z *BatchJobExpireFilter) DecodeMsg(dc *msgp.Reader) (err error) {
}
switch msgp.UnsafeString(field) {
case "OlderThan":
z.OlderThan, err = dc.ReadDuration()
err = z.OlderThan.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "OlderThan")
return
@ -433,7 +433,7 @@ func (z *BatchJobExpireFilter) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
err = en.WriteDuration(z.OlderThan)
err = z.OlderThan.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "OlderThan")
return
@ -544,7 +544,11 @@ func (z *BatchJobExpireFilter) MarshalMsg(b []byte) (o []byte, err error) {
// map header, size 8
// string "OlderThan"
o = append(o, 0x88, 0xa9, 0x4f, 0x6c, 0x64, 0x65, 0x72, 0x54, 0x68, 0x61, 0x6e)
o = msgp.AppendDuration(o, z.OlderThan)
o, err = z.OlderThan.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "OlderThan")
return
}
// string "CreatedBefore"
o = append(o, 0xad, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x42, 0x65, 0x66, 0x6f, 0x72, 0x65)
if z.CreatedBefore == nil {
@ -613,7 +617,7 @@ func (z *BatchJobExpireFilter) UnmarshalMsg(bts []byte) (o []byte, err error) {
}
switch msgp.UnsafeString(field) {
case "OlderThan":
z.OlderThan, bts, err = msgp.ReadDurationBytes(bts)
bts, err = z.OlderThan.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "OlderThan")
return
@ -734,7 +738,7 @@ func (z *BatchJobExpireFilter) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BatchJobExpireFilter) Msgsize() (s int) {
s = 1 + 10 + msgp.DurationSize + 14
s = 1 + 10 + z.OlderThan.Msgsize() + 14
if z.CreatedBefore == nil {
s += msgp.NilSize
} else {

View File

@ -20,7 +20,7 @@ package cmd
import (
"testing"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
)
func TestParseBatchJobExpire(t *testing.T) {
@ -32,7 +32,7 @@ expire: # Expire objects that match a condition
rules:
- type: object # regular objects with zero or more older versions
name: NAME # match object names that satisfy the wildcard expression.
olderThan: 70h # match objects older than this value
olderThan: 7d10h # match objects older than this value
createdBefore: "2006-01-02T15:04:05.00Z" # match objects created before "date"
tags:
- key: name
@ -64,7 +64,7 @@ expire: # Expire objects that match a condition
delay: 500ms # least amount of delay between each retry
`
var job BatchJobRequest
err := yaml.UnmarshalStrict([]byte(expireYaml), &job)
err := yaml.Unmarshal([]byte(expireYaml), &job)
if err != nil {
t.Fatal("Failed to parse batch-job-expire yaml", err)
}

View File

@ -287,12 +287,12 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
isStorageClassOnly := len(r.Flags.Filter.Metadata) == 1 && strings.EqualFold(r.Flags.Filter.Metadata[0].Key, xhttp.AmzStorageClass)
skip := func(oi ObjectInfo) (ok bool) {
if r.Flags.Filter.OlderThan > 0 && time.Since(oi.ModTime) < r.Flags.Filter.OlderThan {
if r.Flags.Filter.OlderThan > 0 && time.Since(oi.ModTime) < r.Flags.Filter.OlderThan.D() {
// skip all objects that are newer than specified older duration
return true
}
if r.Flags.Filter.NewerThan > 0 && time.Since(oi.ModTime) >= r.Flags.Filter.NewerThan {
if r.Flags.Filter.NewerThan > 0 && time.Since(oi.ModTime) >= r.Flags.Filter.NewerThan.D() {
// skip all objects that are older than specified newer duration
return true
}
@ -1022,12 +1022,12 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
selectObj := func(info FileInfo) (ok bool) {
if r.Flags.Filter.OlderThan > 0 && time.Since(info.ModTime) < r.Flags.Filter.OlderThan {
if r.Flags.Filter.OlderThan > 0 && time.Since(info.ModTime) < r.Flags.Filter.OlderThan.D() {
// skip all objects that are newer than specified older duration
return false
}
if r.Flags.Filter.NewerThan > 0 && time.Since(info.ModTime) >= r.Flags.Filter.NewerThan {
if r.Flags.Filter.NewerThan > 0 && time.Since(info.ModTime) >= r.Flags.Filter.NewerThan.D() {
// skip all objects that are older than specified newer duration
return false
}

View File

@ -21,8 +21,8 @@ import (
"time"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio/internal/auth"
"github.com/minio/pkg/v3/xtime"
)
//go:generate msgp -file $GOFILE
@ -65,12 +65,12 @@ import (
// BatchReplicateFilter holds all the filters currently supported for batch replication
type BatchReplicateFilter struct {
NewerThan time.Duration `yaml:"newerThan,omitempty" json:"newerThan"`
OlderThan time.Duration `yaml:"olderThan,omitempty" json:"olderThan"`
CreatedAfter time.Time `yaml:"createdAfter,omitempty" json:"createdAfter"`
CreatedBefore time.Time `yaml:"createdBefore,omitempty" json:"createdBefore"`
Tags []BatchJobKV `yaml:"tags,omitempty" json:"tags"`
Metadata []BatchJobKV `yaml:"metadata,omitempty" json:"metadata"`
NewerThan xtime.Duration `yaml:"newerThan,omitempty" json:"newerThan"`
OlderThan xtime.Duration `yaml:"olderThan,omitempty" json:"olderThan"`
CreatedAfter time.Time `yaml:"createdAfter,omitempty" json:"createdAfter"`
CreatedBefore time.Time `yaml:"createdBefore,omitempty" json:"createdBefore"`
Tags []BatchJobKV `yaml:"tags,omitempty" json:"tags"`
Metadata []BatchJobKV `yaml:"metadata,omitempty" json:"metadata"`
}
// BatchJobReplicateFlags various configurations for replication job definition currently includes

View File

@ -1409,13 +1409,13 @@ func (z *BatchReplicateFilter) DecodeMsg(dc *msgp.Reader) (err error) {
}
switch msgp.UnsafeString(field) {
case "NewerThan":
z.NewerThan, err = dc.ReadDuration()
err = z.NewerThan.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "NewerThan")
return
}
case "OlderThan":
z.OlderThan, err = dc.ReadDuration()
err = z.OlderThan.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "OlderThan")
return
@ -1489,7 +1489,7 @@ func (z *BatchReplicateFilter) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
err = en.WriteDuration(z.NewerThan)
err = z.NewerThan.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "NewerThan")
return
@ -1499,7 +1499,7 @@ func (z *BatchReplicateFilter) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
err = en.WriteDuration(z.OlderThan)
err = z.OlderThan.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "OlderThan")
return
@ -1567,10 +1567,18 @@ func (z *BatchReplicateFilter) MarshalMsg(b []byte) (o []byte, err error) {
// map header, size 6
// string "NewerThan"
o = append(o, 0x86, 0xa9, 0x4e, 0x65, 0x77, 0x65, 0x72, 0x54, 0x68, 0x61, 0x6e)
o = msgp.AppendDuration(o, z.NewerThan)
o, err = z.NewerThan.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "NewerThan")
return
}
// string "OlderThan"
o = append(o, 0xa9, 0x4f, 0x6c, 0x64, 0x65, 0x72, 0x54, 0x68, 0x61, 0x6e)
o = msgp.AppendDuration(o, z.OlderThan)
o, err = z.OlderThan.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "OlderThan")
return
}
// string "CreatedAfter"
o = append(o, 0xac, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x66, 0x74, 0x65, 0x72)
o = msgp.AppendTime(o, z.CreatedAfter)
@ -1619,13 +1627,13 @@ func (z *BatchReplicateFilter) UnmarshalMsg(bts []byte) (o []byte, err error) {
}
switch msgp.UnsafeString(field) {
case "NewerThan":
z.NewerThan, bts, err = msgp.ReadDurationBytes(bts)
bts, err = z.NewerThan.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "NewerThan")
return
}
case "OlderThan":
z.OlderThan, bts, err = msgp.ReadDurationBytes(bts)
bts, err = z.OlderThan.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "OlderThan")
return
@ -1694,7 +1702,7 @@ func (z *BatchReplicateFilter) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BatchReplicateFilter) Msgsize() (s int) {
s = 1 + 10 + msgp.DurationSize + 10 + msgp.DurationSize + 13 + msgp.TimeSize + 14 + msgp.TimeSize + 5 + msgp.ArrayHeaderSize
s = 1 + 10 + z.NewerThan.Msgsize() + 10 + z.OlderThan.Msgsize() + 13 + msgp.TimeSize + 14 + msgp.TimeSize + 5 + msgp.ArrayHeaderSize
for za0001 := range z.Tags {
s += z.Tags[za0001].Msgsize()
}

100
cmd/batch-replicate_test.go Normal file
View File

@ -0,0 +1,100 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"testing"
"gopkg.in/yaml.v3"
)
func TestParseBatchJobReplicate(t *testing.T) {
replicateYaml := `
replicate:
apiVersion: v1
# source of the objects to be replicated
source:
type: minio # valid values are "s3" or "minio"
bucket: mytest
prefix: object-prefix1 # 'PREFIX' is optional
# If your source is the 'local' alias specified to 'mc batch start', then the 'endpoint' and 'credentials' fields are optional and can be omitted
# Either the 'source' or 'remote' *must* be the "local" deployment
# endpoint: "http://127.0.0.1:9000"
# # path: "on|off|auto" # "on" enables path-style bucket lookup. "off" enables virtual host (DNS)-style bucket lookup. Defaults to "auto"
# credentials:
# accessKey: minioadmin # Required
# secretKey: minioadmin # Required
# # sessionToken: SESSION-TOKEN # Optional only available when rotating credentials are used
snowball: # automatically activated if the source is local
disable: true # optionally turn-off snowball archive transfer
# batch: 100 # upto this many objects per archive
# inmemory: true # indicates if the archive must be staged locally or in-memory
# compress: false # S2/Snappy compressed archive
# smallerThan: 5MiB # create archive for all objects smaller than 5MiB
# skipErrs: false # skips any source side read() errors
# target where the objects must be replicated
target:
type: minio # valid values are "s3" or "minio"
bucket: mytest
prefix: stage # 'PREFIX' is optional
# If your source is the 'local' alias specified to 'mc batch start', then the 'endpoint' and 'credentials' fields are optional and can be omitted
# Either the 'source' or 'remote' *must* be the "local" deployment
endpoint: "http://127.0.0.1:9001"
# path: "on|off|auto" # "on" enables path-style bucket lookup. "off" enables virtual host (DNS)-style bucket lookup. Defaults to "auto"
credentials:
accessKey: minioadmin
secretKey: minioadmin
# sessionToken: SESSION-TOKEN # Optional only available when rotating credentials are used
# NOTE: All flags are optional
# - filtering criteria only applies for all source objects match the criteria
# - configurable notification endpoints
# - configurable retries for the job (each retry skips successfully previously replaced objects)
flags:
filter:
newerThan: "7d10h31s" # match objects newer than this value (e.g. 7d10h31s)
olderThan: "7d" # match objects older than this value (e.g. 7d10h31s)
# createdAfter: "date" # match objects created after "date"
# createdBefore: "date" # match objects created before "date"
## NOTE: tags are not supported when "source" is remote.
tags:
- key: "name"
value: "pick*" # match objects with tag 'name', with all values starting with 'pick'
metadata:
- key: "content-type"
value: "image/*" # match objects with 'content-type', with all values starting with 'image/'
# notify:
# endpoint: "https://notify.endpoint" # notification endpoint to receive job status events
# token: "Bearer xxxxx" # optional authentication token for the notification endpoint
#
# retry:
# attempts: 10 # number of retries for the job before giving up
# delay: "500ms" # least amount of delay between each retry
`
var job BatchJobRequest
err := yaml.Unmarshal([]byte(replicateYaml), &job)
if err != nil {
t.Fatal("Failed to parse batch-job-replicate yaml", err)
}
}

6
go.mod
View File

@ -54,7 +54,7 @@ require (
github.com/minio/madmin-go/v3 v3.0.58
github.com/minio/minio-go/v7 v7.0.73
github.com/minio/mux v1.9.0
github.com/minio/pkg/v3 v3.0.8
github.com/minio/pkg/v3 v3.0.9
github.com/minio/selfupdate v0.6.0
github.com/minio/simdjson-go v0.4.5
github.com/minio/sio v0.4.0
@ -66,7 +66,7 @@ require (
github.com/nats-io/stan.go v0.10.4
github.com/ncw/directio v1.0.5
github.com/nsqio/go-nsq v1.1.0
github.com/philhofer/fwd v1.1.2
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.13.6
@ -81,7 +81,7 @@ require (
github.com/rs/cors v1.11.0
github.com/secure-io/sio-go v0.3.1
github.com/shirou/gopsutil/v3 v3.24.5
github.com/tinylib/msgp v1.1.9
github.com/tinylib/msgp v1.2.0
github.com/valyala/bytebufferpool v1.0.0
github.com/xdg/scram v1.0.5
github.com/zeebo/xxh3 v1.0.2

12
go.sum
View File

@ -470,8 +470,8 @@ github.com/minio/mux v1.9.0 h1:dWafQFyEfGhJvK6AwLOt83bIG5bxKxKJnKMCi0XAaoA=
github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0BQ=
github.com/minio/pkg/v2 v2.0.19 h1:r187/k/oVH9H0DDwvLY5WipkJaZ4CLd4KI3KgIUExR0=
github.com/minio/pkg/v2 v2.0.19/go.mod h1:luK9LAhQlAPzSuF6F326XSCKjMc1G3Tbh+a9JYwqh8M=
github.com/minio/pkg/v3 v3.0.8 h1:trJw6D3LzKQ96Hl5nWLwBpstaO56VNdsOmR5rowmDjc=
github.com/minio/pkg/v3 v3.0.8/go.mod h1:njlf539caYrgXqn/CXewqvkqBIMDTQo9oBBEL34LzY0=
github.com/minio/pkg/v3 v3.0.9 h1:LFmPKkmqWYGs8Y689zs0EKkJ/9l6rnBcLtjWNLG0lEI=
github.com/minio/pkg/v3 v3.0.9/go.mod h1:7I+o1o3vbrxVKBiFE5ifUADQMUnhiKdhqmQiq65ylm8=
github.com/minio/selfupdate v0.6.0 h1:i76PgT0K5xO9+hjzKcacQtO7+MjJ4JKA8Ak8XQ9DDwU=
github.com/minio/selfupdate v0.6.0/go.mod h1:bO02GTIPCMQFTEvE5h4DjYB58bCoZ35XLeBf0buTDdM=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
@ -544,8 +544,8 @@ github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzb
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw=
github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0=
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 h1:jYi87L8j62qkXzaYHAQAhEapgukhenIMZRBKTNRLHJ4=
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
@ -659,8 +659,8 @@ github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhso
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw=
github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU=
github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k=
github.com/tinylib/msgp v1.2.0 h1:0uKB/662twsVBpYUPbokj4sTSKhWFKB7LopO2kWK8lY=
github.com/tinylib/msgp v1.2.0/go.mod h1:2vIGs3lcUo8izAATNobrCHevYZC/LMsJtw4JPiYPHro=
github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU=
github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY=
github.com/tklauser/numcpus v0.8.0 h1:Mx4Wwe/FjZLeQsK/6kt2EOepwwSl7SmJrK5bV/dXYgY=