diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 7c93a4b17..ed6047f28 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -527,6 +527,67 @@ func toObjectInfo(bucket, object string, objInfo miniogo.ObjectInfo) ObjectInfo return oi } +// writeAsArchive uploads entries to the target bucket through a single +// PutObjectsSnowball call, feeding it one SnowballObject per source entry. +// Entries that cannot be opened, or whose replication options cannot be +// built, are logged and left out of the archive. +func (r BatchJobReplicateV1) writeAsArchive(ctx context.Context, objAPI ObjectLayer, remoteClnt *minio.Client, entries []ObjectInfo) error { + input := make(chan minio.SnowballObject, 1) + opts := minio.SnowballOptions{ + Opts: minio.PutObjectOptions{}, + InMemory: *r.Source.Snowball.InMemory, + Compress: *r.Source.Snowball.Compress, + SkipErrs: *r.Source.Snowball.SkipErrs, + } + + go func() { + defer close(input) + + for _, entry := range entries { + gr, err := objAPI.GetObjectNInfo(ctx, r.Source.Bucket, + entry.Name, nil, nil, ObjectOptions{ + VersionID: entry.VersionID, + }) + if err != nil { + logger.LogIf(ctx, err) + continue + } + + snowballObj := minio.SnowballObject{ + // Create path to store objects within the bucket. + Key: entry.Name, + Size: entry.Size, + ModTime: entry.ModTime, + VersionID: entry.VersionID, + Content: gr, + Headers: make(http.Header), + Close: func() { + gr.Close() + }, + } + + opts, err := batchReplicationOpts(ctx, "", gr.ObjInfo) + if err != nil { + logger.LogIf(ctx, err) + // The object never reaches the uploader, so nothing else closes gr. + gr.Close() + continue + } + + for k, vals := range opts.Header() { + for _, v := range vals { + snowballObj.Headers.Add(k, v) + } + } + + input <- snowballObj + } + }() + + // Collect and upload all entries.
+ return remoteClnt.PutObjectsSnowball(ctx, r.Target.Bucket, opts, input) +} + // ReplicateToTarget read from source and replicate to configured target func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectLayer, c *miniogo.Core, srcObjInfo ObjectInfo, retry bool) error { srcBucket := r.Source.Bucket @@ -941,8 +996,73 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba if err != nil { return err } + c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) + var ( + walkCh = make(chan ObjectInfo, 100) + slowCh = make(chan ObjectInfo, 100) + ) + + if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { + go func() { + defer close(slowCh) + + // Snowball currently needs the high level minio-go Client, not the Core one + cl, err := miniogo.New(u.Host, &miniogo.Options{ + Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken), + Secure: u.Scheme == "https", + Transport: getRemoteInstanceTransport, + BucketLookup: lookupStyle(r.Target.Path), + }) + if err != nil { + logger.LogIf(ctx, err) + return + } + + // Already validated before arriving here + smallerThan, _ := humanize.ParseBytes(*r.Source.Snowball.SmallerThan) + + var ( + obj = ObjectInfo{} + batch = make([]ObjectInfo, 0, *r.Source.Snowball.Batch) + valid = true + ) + + for valid { + obj, valid = <-walkCh + + if !valid { + goto write + } + + if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) { + slowCh <- obj + continue + } + + batch = append(batch, obj) + + if len(batch) < *r.Source.Snowball.Batch { + continue + } + + write: + if len(batch) > 0 { + if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { + logger.LogIf(ctx, err) + for _, b := range batch { + slowCh <- b + } + } + batch = batch[:0] + } + } + }() + } else { + slowCh = walkCh + } + workerSize, err := strconv.Atoi(env.Get("_MINIO_BATCH_REPLICATION_WORKERS",
strconv.Itoa(runtime.GOMAXPROCS(0)/2))) if err != nil { return err @@ -963,8 +1083,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba // one of source/target is s3, skip delete marker and all versions under the same object name. s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 - results := make(chan ObjectInfo, 100) - if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, ObjectOptions{ + if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, walkCh, ObjectOptions{ WalkMarker: lastObject, WalkFilter: selectObj, }); err != nil { @@ -976,7 +1095,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba prevObj := "" skipReplicate := false - for result := range results { + for result := range slowCh { result := result if result.Name != prevObj { prevObj = result.Name @@ -1090,6 +1209,9 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, job BatchJobRequest, if err := r.Source.Type.Validate(); err != nil { return err } + if err := r.Source.Snowball.Validate(); err != nil { + return err + } if r.Source.Creds.Empty() && r.Target.Creds.Empty() { return errInvalidArgument } @@ -1399,10 +1521,38 @@ func (a adminAPIHandlers) StartBatchJob(w http.ResponseWriter, r *http.Request) return } + // Validate the incoming job request + if err := job.Validate(ctx, objectAPI); err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + job.ID = fmt.Sprintf("%s:%d", shortuuid.New(), GetProxyEndpointLocalIndex(globalProxyEndpoints)) job.User = user job.Started = time.Now() + // Fill with default values + if job.Replicate != nil { + if job.Replicate.Source.Snowball.Disable == nil { + job.Replicate.Source.Snowball.Disable = ptr(false) + } + if job.Replicate.Source.Snowball.Batch == nil { + job.Replicate.Source.Snowball.Batch = ptr(100) + } + if job.Replicate.Source.Snowball.InMemory == nil { + 
job.Replicate.Source.Snowball.InMemory = ptr(true) + } + if job.Replicate.Source.Snowball.Compress == nil { + job.Replicate.Source.Snowball.Compress = ptr(false) + } + if job.Replicate.Source.Snowball.SmallerThan == nil { + job.Replicate.Source.Snowball.SmallerThan = ptr("5MiB") + } + if job.Replicate.Source.Snowball.SkipErrs == nil { + job.Replicate.Source.Snowball.SkipErrs = ptr(true) + } + } + if err := job.save(ctx, objectAPI); err != nil { writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return diff --git a/cmd/batch-job-common-types.go b/cmd/batch-job-common-types.go index 796a5cf7f..1b1e379be 100644 --- a/cmd/batch-job-common-types.go +++ b/cmd/batch-job-common-types.go @@ -18,9 +18,11 @@ package cmd import ( + "errors" "strings" "time" + "github.com/dustin/go-humanize" "github.com/minio/pkg/v2/wildcard" ) @@ -81,3 +83,37 @@ func (r BatchJobRetry) Validate() error { return nil } + +// # snowball based archive transfer is by default enabled when source +// # is local and target is remote which is also minio.
+// snowball: +// disable: false # optionally turn-off snowball archive transfer +// batch: 100 # up to this many objects per archive +// inmemory: true # indicates if the archive must be staged locally or in-memory +// compress: true # S2/Snappy compressed archive +// smallerThan: 5MiB # create archive for all objects smaller than 5MiB +// skipErrs: false # skips any source side read() errors + +// BatchJobSnowball describes the snowball feature when replicating objects from a local source to a remote target +type BatchJobSnowball struct { + Disable *bool `yaml:"disable" json:"disable"` + Batch *int `yaml:"batch" json:"batch"` + InMemory *bool `yaml:"inmemory" json:"inmemory"` + Compress *bool `yaml:"compress" json:"compress"` + SmallerThan *string `yaml:"smallerThan" json:"smallerThan"` + SkipErrs *bool `yaml:"skipErrs" json:"skipErrs"` +} + +// Validate the snowball parameters in the job description. +// Fields left unset (nil) are skipped: StartBatchJob validates the request +// before it fills in the defaults, so nil pointers are expected here. +func (b BatchJobSnowball) Validate() error { + if b.Batch != nil && *b.Batch <= 0 { + return errors.New("batch number should be a positive, non-zero value") + } + if b.SmallerThan == nil { + return nil + } + _, err := humanize.ParseBytes(*b.SmallerThan) + return err +} diff --git a/cmd/batch-job-common-types_gen.go b/cmd/batch-job-common-types_gen.go index ca3fb48c6..8b3b519a3 100644 --- a/cmd/batch-job-common-types_gen.go +++ b/cmd/batch-job-common-types_gen.go @@ -389,3 +389,470 @@ func (z BatchJobRetry) Msgsize() (s int) { s = 1 + 9 + msgp.IntSize + 6 + msgp.DurationSize return } + +// DecodeMsg implements msgp.Decodable +func (z *BatchJobSnowball) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Disable": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "Disable") + return + } + z.Disable = nil + }
else { + if z.Disable == nil { + z.Disable = new(bool) + } + *z.Disable, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Disable") + return + } + } + case "Batch": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "Batch") + return + } + z.Batch = nil + } else { + if z.Batch == nil { + z.Batch = new(int) + } + *z.Batch, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "Batch") + return + } + } + case "InMemory": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "InMemory") + return + } + z.InMemory = nil + } else { + if z.InMemory == nil { + z.InMemory = new(bool) + } + *z.InMemory, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "InMemory") + return + } + } + case "Compress": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "Compress") + return + } + z.Compress = nil + } else { + if z.Compress == nil { + z.Compress = new(bool) + } + *z.Compress, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Compress") + return + } + } + case "SmallerThan": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "SmallerThan") + return + } + z.SmallerThan = nil + } else { + if z.SmallerThan == nil { + z.SmallerThan = new(string) + } + *z.SmallerThan, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "SmallerThan") + return + } + } + case "SkipErrs": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "SkipErrs") + return + } + z.SkipErrs = nil + } else { + if z.SkipErrs == nil { + z.SkipErrs = new(bool) + } + *z.SkipErrs, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "SkipErrs") + return + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *BatchJobSnowball) EncodeMsg(en *msgp.Writer) (err 
error) { + // map header, size 6 + // write "Disable" + err = en.Append(0x86, 0xa7, 0x44, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65) + if err != nil { + return + } + if z.Disable == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = en.WriteBool(*z.Disable) + if err != nil { + err = msgp.WrapError(err, "Disable") + return + } + } + // write "Batch" + err = en.Append(0xa5, 0x42, 0x61, 0x74, 0x63, 0x68) + if err != nil { + return + } + if z.Batch == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = en.WriteInt(*z.Batch) + if err != nil { + err = msgp.WrapError(err, "Batch") + return + } + } + // write "InMemory" + err = en.Append(0xa8, 0x49, 0x6e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79) + if err != nil { + return + } + if z.InMemory == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = en.WriteBool(*z.InMemory) + if err != nil { + err = msgp.WrapError(err, "InMemory") + return + } + } + // write "Compress" + err = en.Append(0xa8, 0x43, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73) + if err != nil { + return + } + if z.Compress == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = en.WriteBool(*z.Compress) + if err != nil { + err = msgp.WrapError(err, "Compress") + return + } + } + // write "SmallerThan" + err = en.Append(0xab, 0x53, 0x6d, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x54, 0x68, 0x61, 0x6e) + if err != nil { + return + } + if z.SmallerThan == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = en.WriteString(*z.SmallerThan) + if err != nil { + err = msgp.WrapError(err, "SmallerThan") + return + } + } + // write "SkipErrs" + err = en.Append(0xa8, 0x53, 0x6b, 0x69, 0x70, 0x45, 0x72, 0x72, 0x73) + if err != nil { + return + } + if z.SkipErrs == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = en.WriteBool(*z.SkipErrs) + if err != nil { + err = msgp.WrapError(err, "SkipErrs") + return + } + } + return +} + +// MarshalMsg 
implements msgp.Marshaler +func (z *BatchJobSnowball) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 6 + // string "Disable" + o = append(o, 0x86, 0xa7, 0x44, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65) + if z.Disable == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendBool(o, *z.Disable) + } + // string "Batch" + o = append(o, 0xa5, 0x42, 0x61, 0x74, 0x63, 0x68) + if z.Batch == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendInt(o, *z.Batch) + } + // string "InMemory" + o = append(o, 0xa8, 0x49, 0x6e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79) + if z.InMemory == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendBool(o, *z.InMemory) + } + // string "Compress" + o = append(o, 0xa8, 0x43, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73) + if z.Compress == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendBool(o, *z.Compress) + } + // string "SmallerThan" + o = append(o, 0xab, 0x53, 0x6d, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x54, 0x68, 0x61, 0x6e) + if z.SmallerThan == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendString(o, *z.SmallerThan) + } + // string "SkipErrs" + o = append(o, 0xa8, 0x53, 0x6b, 0x69, 0x70, 0x45, 0x72, 0x72, 0x73) + if z.SkipErrs == nil { + o = msgp.AppendNil(o) + } else { + o = msgp.AppendBool(o, *z.SkipErrs) + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BatchJobSnowball) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Disable": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.Disable = nil + } else { + if z.Disable == nil { + z.Disable = new(bool) + } + *z.Disable, bts, 
err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Disable") + return + } + } + case "Batch": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.Batch = nil + } else { + if z.Batch == nil { + z.Batch = new(int) + } + *z.Batch, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Batch") + return + } + } + case "InMemory": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.InMemory = nil + } else { + if z.InMemory == nil { + z.InMemory = new(bool) + } + *z.InMemory, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "InMemory") + return + } + } + case "Compress": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.Compress = nil + } else { + if z.Compress == nil { + z.Compress = new(bool) + } + *z.Compress, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Compress") + return + } + } + case "SmallerThan": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.SmallerThan = nil + } else { + if z.SmallerThan == nil { + z.SmallerThan = new(string) + } + *z.SmallerThan, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "SmallerThan") + return + } + } + case "SkipErrs": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.SkipErrs = nil + } else { + if z.SkipErrs == nil { + z.SkipErrs = new(bool) + } + *z.SkipErrs, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "SkipErrs") + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *BatchJobSnowball) Msgsize() (s int) { + s = 1 + 8 + if 
z.Disable == nil { + s += msgp.NilSize + } else { + s += msgp.BoolSize + } + s += 6 + if z.Batch == nil { + s += msgp.NilSize + } else { + s += msgp.IntSize + } + s += 9 + if z.InMemory == nil { + s += msgp.NilSize + } else { + s += msgp.BoolSize + } + s += 9 + if z.Compress == nil { + s += msgp.NilSize + } else { + s += msgp.BoolSize + } + s += 12 + if z.SmallerThan == nil { + s += msgp.NilSize + } else { + s += msgp.StringPrefixSize + len(*z.SmallerThan) + } + s += 9 + if z.SkipErrs == nil { + s += msgp.NilSize + } else { + s += msgp.BoolSize + } + return +} diff --git a/cmd/batch-job-common-types_gen_test.go b/cmd/batch-job-common-types_gen_test.go index 6c50a7708..9354197a9 100644 --- a/cmd/batch-job-common-types_gen_test.go +++ b/cmd/batch-job-common-types_gen_test.go @@ -347,3 +347,116 @@ func BenchmarkDecodeBatchJobRetry(b *testing.B) { } } } + +func TestMarshalUnmarshalBatchJobSnowball(t *testing.T) { + v := BatchJobSnowball{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBatchJobSnowball(b *testing.B) { + v := BatchJobSnowball{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBatchJobSnowball(b *testing.B) { + v := BatchJobSnowball{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBatchJobSnowball(b *testing.B) { + v := BatchJobSnowball{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + 
b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBatchJobSnowball(t *testing.T) { + v := BatchJobSnowball{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBatchJobSnowball Msgsize() is inaccurate") + } + + vn := BatchJobSnowball{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBatchJobSnowball(b *testing.B) { + v := BatchJobSnowball{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBatchJobSnowball(b *testing.B) { + v := BatchJobSnowball{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/batch-replicate.go b/cmd/batch-replicate.go index 3728096a4..2e90b0f36 100644 --- a/cmd/batch-replicate.go +++ b/cmd/batch-replicate.go @@ -155,6 +155,7 @@ type BatchJobReplicateSource struct { Endpoint string `yaml:"endpoint" json:"endpoint"` Path string `yaml:"path" json:"path"` Creds BatchJobReplicateCredentials `yaml:"credentials" json:"credentials"` + Snowball BatchJobSnowball `yaml:"snowball" json:"snowball"` } // ValidPath returns true if path is valid diff --git a/cmd/batch-replicate_gen.go b/cmd/batch-replicate_gen.go index d6be8d555..26a433ddf 100644 --- a/cmd/batch-replicate_gen.go +++ b/cmd/batch-replicate_gen.go @@ -469,6 +469,12 @@ func (z *BatchJobReplicateSource) DecodeMsg(dc *msgp.Reader) (err error) { 
} } } + case "Snowball": + err = z.Snowball.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Snowball") + return + } default: err = dc.Skip() if err != nil { @@ -482,9 +488,9 @@ func (z *BatchJobReplicateSource) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *BatchJobReplicateSource) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 6 + // map header, size 7 // write "Type" - err = en.Append(0x86, 0xa4, 0x54, 0x79, 0x70, 0x65) + err = en.Append(0x87, 0xa4, 0x54, 0x79, 0x70, 0x65) if err != nil { return } @@ -569,15 +575,25 @@ func (z *BatchJobReplicateSource) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Creds", "SessionToken") return } + // write "Snowball" + err = en.Append(0xa8, 0x53, 0x6e, 0x6f, 0x77, 0x62, 0x61, 0x6c, 0x6c) + if err != nil { + return + } + err = z.Snowball.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Snowball") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *BatchJobReplicateSource) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 6 + // map header, size 7 // string "Type" - o = append(o, 0x86, 0xa4, 0x54, 0x79, 0x70, 0x65) + o = append(o, 0x87, 0xa4, 0x54, 0x79, 0x70, 0x65) o = msgp.AppendString(o, string(z.Type)) // string "Bucket" o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) @@ -603,6 +619,13 @@ func (z *BatchJobReplicateSource) MarshalMsg(b []byte) (o []byte, err error) { // string "SessionToken" o = append(o, 0xac, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e) o = msgp.AppendString(o, z.Creds.SessionToken) + // string "Snowball" + o = append(o, 0xa8, 0x53, 0x6e, 0x6f, 0x77, 0x62, 0x61, 0x6c, 0x6c) + o, err = z.Snowball.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Snowball") + return + } return } @@ -699,6 +722,12 @@ func (z *BatchJobReplicateSource) UnmarshalMsg(bts []byte) (o []byte, err error) } } } + case 
"Snowball": + bts, err = z.Snowball.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Snowball") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -713,7 +742,7 @@ func (z *BatchJobReplicateSource) UnmarshalMsg(bts []byte) (o []byte, err error) // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *BatchJobReplicateSource) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 5 + msgp.StringPrefixSize + len(z.Path) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken) + s = 1 + 5 + msgp.StringPrefixSize + len(string(z.Type)) + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Prefix) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 5 + msgp.StringPrefixSize + len(z.Path) + 6 + 1 + 10 + msgp.StringPrefixSize + len(z.Creds.AccessKey) + 10 + msgp.StringPrefixSize + len(z.Creds.SecretKey) + 13 + msgp.StringPrefixSize + len(z.Creds.SessionToken) + 9 + z.Snowball.Msgsize() return } diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 29a39a724..4b9ddd18d 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -1192,7 +1192,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h }) var opts ObjectOptions - opts, err = putOpts(ctx, r, bucket, object, metadata) + opts, err = putOptsFromReq(ctx, r, bucket, object, metadata) if err != nil { writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) return diff --git a/cmd/handler-utils.go b/cmd/handler-utils.go index d1c15f6e4..40b4fc2a1 100644 --- a/cmd/handler-utils.go +++ b/cmd/handler-utils.go @@ -111,21 +111,20 @@ var userMetadataKeyPrefixes = []string{ "x-minio-meta-", } -// 
extractMetadata extracts metadata from HTTP header and HTTP queryString. -func extractMetadata(ctx context.Context, r *http.Request) (metadata map[string]string, err error) { - query := r.Form - header := r.Header - metadata = make(map[string]string) - // Extract all query values. - err = extractMetadataFromMime(ctx, textproto.MIMEHeader(query), metadata) - if err != nil { - return nil, err - } +// extractMetadataFromReq extracts metadata from HTTP header and HTTP queryString. +func extractMetadataFromReq(ctx context.Context, r *http.Request) (metadata map[string]string, err error) { + return extractMetadata(ctx, textproto.MIMEHeader(r.Form), textproto.MIMEHeader(r.Header)) +} - // Extract all header values. - err = extractMetadataFromMime(ctx, textproto.MIMEHeader(header), metadata) - if err != nil { - return nil, err +func extractMetadata(ctx context.Context, mimesHeader ...textproto.MIMEHeader) (metadata map[string]string, err error) { + metadata = make(map[string]string) + + for _, hdr := range mimesHeader { + // Extract all query values. + err = extractMetadataFromMime(ctx, hdr, metadata) + if err != nil { + return nil, err + } } // Set content-type to default value if it is not set. 
diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index f432b7188..0a08dfc46 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -199,11 +199,15 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts } // get ObjectOptions for PUT calls from encryption headers and metadata -func putOpts(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) { +func putOptsFromReq(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) { + return putOpts(ctx, bucket, object, r.Form.Get(xhttp.VersionID), r.Header, metadata) +} + +func putOpts(ctx context.Context, bucket, object, vid string, hdrs http.Header, metadata map[string]string) (opts ObjectOptions, err error) { versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object) versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object) - vid := strings.TrimSpace(r.Form.Get(xhttp.VersionID)) + vid = strings.TrimSpace(vid) if vid != "" && vid != nullVersionID { _, err := uuid.Parse(vid) if err != nil { @@ -221,54 +225,59 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada } } } + opts, err = putOptsFromHeaders(ctx, hdrs, metadata) + if err != nil { + return opts, InvalidArgument{ + Bucket: bucket, + Object: object, + Err: err, + } + } - mtimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceMTime)) + opts.VersionID = vid + opts.Versioned = versioned + opts.VersionSuspended = versionSuspended + + // For directory objects skip creating new versions. 
+ if isDirObject(object) && vid == "" { + opts.VersionID = nullVersionID + } + + return opts, nil +} + +func putOptsFromHeaders(ctx context.Context, hdr http.Header, metadata map[string]string) (opts ObjectOptions, err error) { + mtimeStr := strings.TrimSpace(hdr.Get(xhttp.MinIOSourceMTime)) var mtime time.Time if mtimeStr != "" { mtime, err = time.Parse(time.RFC3339Nano, mtimeStr) if err != nil { - return opts, InvalidArgument{ - Bucket: bucket, - Object: object, - Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceMTime, err), - } + return opts, fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceMTime, err) } } - retaintimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceObjectRetentionTimestamp)) + retaintimeStr := strings.TrimSpace(hdr.Get(xhttp.MinIOSourceObjectRetentionTimestamp)) var retaintimestmp time.Time if retaintimeStr != "" { retaintimestmp, err = time.Parse(time.RFC3339, retaintimeStr) if err != nil { - return opts, InvalidArgument{ - Bucket: bucket, - Object: object, - Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceObjectRetentionTimestamp, err), - } + return opts, fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceObjectRetentionTimestamp, err) } } - lholdtimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceObjectLegalHoldTimestamp)) + lholdtimeStr := strings.TrimSpace(hdr.Get(xhttp.MinIOSourceObjectLegalHoldTimestamp)) var lholdtimestmp time.Time if lholdtimeStr != "" { lholdtimestmp, err = time.Parse(time.RFC3339, lholdtimeStr) if err != nil { - return opts, InvalidArgument{ - Bucket: bucket, - Object: object, - Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceObjectLegalHoldTimestamp, err), - } + return opts, fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceObjectLegalHoldTimestamp, err) } } - tagtimeStr := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceTaggingTimestamp)) + tagtimeStr := 
strings.TrimSpace(hdr.Get(xhttp.MinIOSourceTaggingTimestamp)) var taggingtimestmp time.Time if tagtimeStr != "" { taggingtimestmp, err = time.Parse(time.RFC3339, tagtimeStr) if err != nil { - return opts, InvalidArgument{ - Bucket: bucket, - Object: object, - Err: fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceTaggingTimestamp, err), - } + return opts, fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOSourceTaggingTimestamp, err) } } @@ -276,18 +285,14 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada metadata = make(map[string]string) } - wantCRC, err := hash.GetContentChecksum(r.Header) + wantCRC, err := hash.GetContentChecksum(hdr) if err != nil { - return opts, InvalidArgument{ - Bucket: bucket, - Object: object, - Err: fmt.Errorf("invalid/unknown checksum sent: %v", err), - } + return opts, fmt.Errorf("invalid/unknown checksum sent: %v", err) } - etag := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceETag)) + etag := strings.TrimSpace(hdr.Get(xhttp.MinIOSourceETag)) - if crypto.S3KMS.IsRequested(r.Header) { - keyID, context, err := crypto.S3KMS.ParseHTTP(r.Header) + if crypto.S3KMS.IsRequested(hdr) { + keyID, context, err := crypto.S3KMS.ParseHTTP(hdr) if err != nil { return ObjectOptions{}, err } @@ -298,29 +303,17 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada return ObjectOptions{ ServerSideEncryption: sseKms, UserDefined: metadata, - VersionID: vid, - Versioned: versioned, - VersionSuspended: versionSuspended, MTime: mtime, WantChecksum: wantCRC, PreserveETag: etag, }, nil } // default case of passing encryption headers and UserDefined metadata to backend - opts, err = getDefaultOpts(r.Header, false, metadata) + opts, err = getDefaultOpts(hdr, false, metadata) if err != nil { return opts, err } - opts.VersionID = vid - opts.Versioned = versioned - opts.VersionSuspended = versionSuspended - - // For directory objects skip creating new versions. 
- if isDirObject(object) && vid == "" { - opts.VersionID = nullVersionID - } - opts.MTime = mtime opts.ReplicationSourceLegalholdTimestamp = lholdtimestmp opts.ReplicationSourceRetentionTimestamp = retaintimestmp @@ -333,7 +326,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada // get ObjectOptions for Copy calls with encryption headers provided on the target side and source side metadata func copyDstOpts(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string) (opts ObjectOptions, err error) { - return putOpts(ctx, r, bucket, object, metadata) + return putOptsFromReq(ctx, r, bucket, object, metadata) } // get ObjectOptions for Copy calls with encryption headers provided on the source side diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 8ebdd17ac..181382f7c 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -18,6 +18,7 @@ package cmd import ( + "archive/tar" "context" "encoding/hex" "encoding/xml" @@ -26,6 +27,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/textproto" "net/url" "os" "sort" @@ -873,7 +875,7 @@ func getCpObjMetadataFromHeader(ctx context.Context, r *http.Request, userMeta m // if x-amz-metadata-directive says REPLACE then // we extract metadata from the input headers. 
if isDirectiveReplace(r.Header.Get(xhttp.AmzMetadataDirective)) { - emetadata, err := extractMetadata(ctx, r) + emetadata, err := extractMetadataFromReq(ctx, r) if err != nil { return nil, err } @@ -1613,7 +1615,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } - metadata, err := extractMetadata(ctx, r) + metadata, err := extractMetadataFromReq(ctx, r) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return @@ -1754,7 +1756,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req pReader := NewPutObjReader(rawReader) var opts ObjectOptions - opts, err = putOpts(ctx, r, bucket, object, metadata) + opts, err = putOptsFromReq(ctx, r, bucket, object, metadata) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return @@ -2126,11 +2128,40 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h metadata[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano) } - // get encryption options - opts, err := putOpts(ctx, r, bucket, object, metadata) + var ( + versionID string + hdrs http.Header + ) + + if tarHdrs, ok := info.Sys().(*tar.Header); ok && len(tarHdrs.PAXRecords) > 0 { + versionID = tarHdrs.PAXRecords["minio.versionId"] + hdrs = make(http.Header) + for k, v := range tarHdrs.PAXRecords { + if k == "minio.versionId" { + continue + } + if strings.HasPrefix(k, "minio.metadata.") { + k = strings.TrimPrefix(k, "minio.metadata.") + hdrs.Set(k, v) + } + } + m, err := extractMetadata(ctx, textproto.MIMEHeader(hdrs)) + if err != nil { + return err + } + for k, v := range m { + metadata[k] = v + } + } else { + versionID = r.Form.Get(xhttp.VersionID) + hdrs = r.Header + } + + opts, err := putOpts(ctx, bucket, object, versionID, hdrs, metadata) if err != nil { return err } + opts.MTime = info.ModTime() if opts.MTime.Unix() <= 0 { opts.MTime = UTCNow() diff --git a/cmd/object-multipart-handlers.go 
b/cmd/object-multipart-handlers.go index 261ef04f1..3dd05dcb4 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -129,7 +129,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r } // Extract metadata that needs to be saved. - metadata, err := extractMetadata(ctx, r) + metadata, err := extractMetadataFromReq(ctx, r) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return @@ -183,7 +183,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 } - opts, err := putOpts(ctx, r, bucket, object, metadata) + opts, err := putOptsFromReq(ctx, r, bucket, object, metadata) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return @@ -748,7 +748,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } - opts, err = putOpts(ctx, r, bucket, object, mi.UserDefined) + opts, err = putOptsFromReq(ctx, r, bucket, object, mi.UserDefined) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return diff --git a/cmd/utils.go b/cmd/utils.go index d8c5c9546..863153aee 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -1269,3 +1269,7 @@ func stringsHasPrefixFold(s, prefix string) bool { // Test match with case first. 
return len(s) >= len(prefix) && (s[0:len(prefix)] == prefix || strings.EqualFold(s[0:len(prefix)], prefix)) } + +func ptr[T any](a T) *T { + return &a +} diff --git a/go.mod b/go.mod index 42653a69e..fb708578f 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/minio/highwayhash v1.0.2 github.com/minio/kes-go v0.2.0 github.com/minio/madmin-go/v3 v3.0.32 - github.com/minio/minio-go/v7 v7.0.64-0.20231119012610-6eebdd6d5eba + github.com/minio/minio-go/v7 v7.0.64 github.com/minio/mux v1.9.0 github.com/minio/pkg/v2 v2.0.3-0.20231107172951-8a60b89ec9b4 github.com/minio/selfupdate v0.6.0 diff --git a/go.sum b/go.sum index 36c2a7125..4c0ce6b9c 100644 --- a/go.sum +++ b/go.sum @@ -491,8 +491,8 @@ github.com/minio/mc v0.0.0-20231030184332-9f2fb2b6a9f8/go.mod h1:SoPU55ntH5d6IEq github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v6 v6.0.46/go.mod h1:qD0lajrGW49lKZLtXKtCB4X/qkMf0a5tBvN2PaZg7Gg= -github.com/minio/minio-go/v7 v7.0.64-0.20231119012610-6eebdd6d5eba h1:OF4FT9fUnXReMpMLQyoydIYJYPN3W3omFuD/TTL8cZg= -github.com/minio/minio-go/v7 v7.0.64-0.20231119012610-6eebdd6d5eba/go.mod h1:R4WVUR6ZTedlCcGwZRauLMIKjgyaWxhs4Mqi/OMPmEc= +github.com/minio/minio-go/v7 v7.0.64 h1:Zdza8HwOzkld0ZG/og50w56fKi6AAyfqfifmasD9n2Q= +github.com/minio/minio-go/v7 v7.0.64/go.mod h1:R4WVUR6ZTedlCcGwZRauLMIKjgyaWxhs4Mqi/OMPmEc= github.com/minio/mux v1.9.0 h1:dWafQFyEfGhJvK6AwLOt83bIG5bxKxKJnKMCi0XAaoA= github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0BQ= github.com/minio/pkg v1.7.5 h1:UOUJjewE5zoaDPlCMJtNx/swc1jT1ZR+IajT7hrLd44=