Store bucket created time as a metadata (#9465)

Fixes #9459
This commit is contained in:
Bala FA 2020-05-01 16:53:14 +00:00 committed by GitHub
parent 086be07bf5
commit 83ccae6c8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 499 additions and 53 deletions

View File

@ -46,8 +46,6 @@ import (
const (
getBucketVersioningResponse = `<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>`
objectLockConfig = "object-lock.xml"
bucketObjectLockEnabledConfigFile = "object-lock-enabled.json"
bucketObjectLockEnabledConfig = `{"x-amz-bucket-object-lock-enabled":true}`
)
// Check if there are buckets on server without corresponding entry in etcd backend and
@ -292,6 +290,14 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
for i := range bucketsInfo {
meta, err := loadBucketMetadata(ctx, objectAPI, bucketsInfo[i].Name)
logger.LogIf(ctx, err)
if err == nil {
bucketsInfo[i].Created = meta.Created
}
}
}
if s3Error == ErrAccessDenied {
@ -532,13 +538,12 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
return
}
if objectLockEnabled {
configFile := path.Join(bucketConfigPrefix, bucket, bucketObjectLockEnabledConfigFile)
if err = saveConfig(ctx, objectAPI, configFile, []byte(bucketObjectLockEnabledConfig)); err != nil {
meta := newBucketMetadata(bucket)
meta.LockEnabled = objectLockEnabled
if err := meta.save(ctx, objectAPI); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
if err = globalDNSConfig.Put(bucket); err != nil {
objectAPI.DeleteBucket(ctx, bucket, false)
@ -576,15 +581,18 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
return
}
if objectLockEnabled && !globalIsGateway {
configFile := path.Join(bucketConfigPrefix, bucket, bucketObjectLockEnabledConfigFile)
if err = saveConfig(ctx, objectAPI, configFile, []byte(bucketObjectLockEnabledConfig)); err != nil {
if !globalIsGateway {
meta := newBucketMetadata(bucket)
meta.LockEnabled = objectLockEnabled
if err := meta.save(ctx, objectAPI); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if objectLockEnabled {
globalBucketObjectLockConfig.Set(bucket, objectlock.Retention{})
globalNotificationSys.PutBucketObjectLockConfig(ctx, bucket, objectlock.Retention{})
}
}
// Make sure to add Location information here only for bucket
w.Header().Set(xhttp.Location, path.Clean(r.URL.Path)) // Clean any trailing slashes.
@ -942,6 +950,9 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
}
}
// Delete metadata, only log errors.
logger.LogIf(ctx, newBucketMetadata(bucket).delete(ctx, objectAPI))
globalNotificationSys.DeleteBucket(ctx, bucket)
// Write success response.
@ -1032,27 +1043,23 @@ func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWri
writeErrorResponse(ctx, w, apiErr, r.URL, guessIsBrowserReq(r))
return
}
configFile := path.Join(bucketConfigPrefix, bucket, bucketObjectLockEnabledConfigFile)
configData, err := readConfig(ctx, objectAPI, configFile)
meta, err := loadBucketMetadata(ctx, objectAPI, bucket)
if err != nil {
aerr := toAPIError(ctx, err)
if err == errConfigNotFound {
aerr = errorCodes.ToAPIErr(ErrObjectLockConfigurationNotAllowed)
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeErrorResponse(ctx, w, aerr, r.URL, guessIsBrowserReq(r))
if !meta.LockEnabled {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrObjectLockConfigurationNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
if string(configData) != bucketObjectLockEnabledConfig {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r))
return
}
data, err := xml.Marshal(config)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
configFile = path.Join(bucketConfigPrefix, bucket, objectLockConfig)
configFile := path.Join(bucketConfigPrefix, bucket, objectLockConfig)
if err = saveConfig(ctx, objectAPI, configFile, data); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
@ -1093,26 +1100,19 @@ func (api objectAPIHandlers) GetBucketObjectLockConfigHandler(w http.ResponseWri
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
configFile := path.Join(bucketConfigPrefix, bucket, bucketObjectLockEnabledConfigFile)
configData, err := readConfig(ctx, objectAPI, configFile)
meta, err := loadBucketMetadata(ctx, objectAPI, bucket)
if err != nil {
var aerr APIError
if err == errConfigNotFound {
aerr = errorCodes.ToAPIErr(ErrObjectLockConfigurationNotFound)
} else {
aerr = toAPIError(ctx, err)
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeErrorResponse(ctx, w, aerr, r.URL, guessIsBrowserReq(r))
if !meta.LockEnabled {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrObjectLockConfigurationNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
if string(configData) != bucketObjectLockEnabledConfig {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r))
return
}
configFile = path.Join(bucketConfigPrefix, bucket, objectLockConfig)
configData, err = readConfig(ctx, objectAPI, configFile)
configFile := path.Join(bucketConfigPrefix, bucket, objectLockConfig)
configData, err := readConfig(ctx, objectAPI, configFile)
if err != nil {
if err != errConfigNotFound {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))

165
cmd/bucket-meta.go Normal file
View File

@ -0,0 +1,165 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"encoding/binary"
"errors"
"fmt"
"path"
"time"
"github.com/minio/minio/cmd/logger"
)
const (
legacyBucketObjectLockEnabledConfigFile = "object-lock-enabled.json"
legacyBucketObjectLockEnabledConfig = `{"x-amz-bucket-object-lock-enabled":true}`
bucketMetadataFile = ".metadata.bin"
bucketMetadataFormat = 1
bucketMetadataVersion = 1
)
//go:generate msgp -file $GOFILE -unexported
// bucketMetadata contains bucket metadata.
// When adding/removing fields, regenerate the marshal code using the go generate above.
// Only changing meaning of fields requires a version bump.
// bucketMetadataFormat refers to the format.
// bucketMetadataVersion can be used to track a rolling upgrade of a field.
type bucketMetadata struct {
Name string
Created time.Time
LockEnabled bool
}
// newBucketMetadata creates bucketMetadata with the supplied name and Created to Now.
func newBucketMetadata(name string) bucketMetadata {
return bucketMetadata{
Name: name,
Created: UTCNow(),
}
}
// loadBucketMeta loads the metadata of bucket by name from ObjectLayer o.
// If an error is returned the returned metadata will be default initialized.
func loadBucketMeta(ctx context.Context, o ObjectLayer, name string) (bucketMetadata, error) {
b := newBucketMetadata(name)
configFile := path.Join(bucketConfigPrefix, name, bucketMetadataFile)
data, err := readConfig(ctx, o, configFile)
if err != nil {
return b, err
}
if len(data) <= 4 {
return b, fmt.Errorf("loadBucketMetadata: no data")
}
// Read header
switch binary.LittleEndian.Uint16(data[0:2]) {
case bucketMetadataFormat:
default:
return b, fmt.Errorf("loadBucketMetadata: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
}
switch binary.LittleEndian.Uint16(data[2:4]) {
case bucketMetadataVersion:
default:
return b, fmt.Errorf("loadBucketMetadata: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
}
// OK, parse data.
_, err = b.UnmarshalMsg(data[4:])
return b, err
}
var errMetaDataConverted = errors.New("metadata converted")
// loadBucketMetadata loads and migrates to bucket metadata.
func loadBucketMetadata(ctx context.Context, objectAPI ObjectLayer, bucket string) (bucketMetadata, error) {
meta, err := loadBucketMeta(ctx, objectAPI, bucket)
if err == nil {
return meta, nil
}
if err != errConfigNotFound {
return meta, err
}
// Control here means old bucket without bucket metadata. Hence we migrate existing settings.
if err = meta.convertLegacyLockconfig(ctx, objectAPI); err != nil {
return meta, err
}
return meta, errMetaDataConverted
}
func (b *bucketMetadata) convertLegacyLockconfig(ctx context.Context, objectAPI ObjectLayer) error {
configFile := path.Join(bucketConfigPrefix, b.Name, legacyBucketObjectLockEnabledConfigFile)
save := func() error {
if err := b.save(ctx, objectAPI); err != nil {
return err
}
logger.LogIf(ctx, deleteConfig(ctx, objectAPI, configFile))
return nil
}
configData, err := readConfig(ctx, objectAPI, configFile)
if err != nil {
if err != errConfigNotFound {
return err
}
return save()
}
if string(configData) != legacyBucketObjectLockEnabledConfig {
return fmt.Errorf("content mismatch in config file %v", configFile)
}
b.LockEnabled = true
return save()
}
// save config to supplied ObjectLayer o.
func (b *bucketMetadata) save(ctx context.Context, o ObjectLayer) error {
data := make([]byte, 4, b.Msgsize()+4)
// Put header
binary.LittleEndian.PutUint16(data[0:2], bucketMetadataFormat)
binary.LittleEndian.PutUint16(data[2:4], bucketMetadataVersion)
// Add data
data, err := b.MarshalMsg(data)
if err != nil {
return err
}
configFile := path.Join(bucketConfigPrefix, b.Name, bucketMetadataFile)
return saveConfig(ctx, o, configFile, data)
}
// delete the config metadata.
// If config does not exist no error is returned.
func (b bucketMetadata) delete(ctx context.Context, o ObjectLayer) error {
configFile := path.Join(bucketConfigPrefix, b.Name, bucketMetadataFile)
err := deleteConfig(ctx, o, configFile)
if err == errConfigNotFound {
// We don't care
err = nil
}
return err
}

160
cmd/bucket-meta_gen.go Normal file
View File

@ -0,0 +1,160 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *bucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Name":
z.Name, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Name")
return
}
case "Created":
z.Created, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "Created")
return
}
case "LockEnabled":
z.LockEnabled, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "LockEnabled")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z bucketMetadata) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 3
// write "Name"
err = en.Append(0x83, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
if err != nil {
return
}
err = en.WriteString(z.Name)
if err != nil {
err = msgp.WrapError(err, "Name")
return
}
// write "Created"
err = en.Append(0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteTime(z.Created)
if err != nil {
err = msgp.WrapError(err, "Created")
return
}
// write "LockEnabled"
err = en.Append(0xab, 0x4c, 0x6f, 0x63, 0x6b, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteBool(z.LockEnabled)
if err != nil {
err = msgp.WrapError(err, "LockEnabled")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z bucketMetadata) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 3
// string "Name"
o = append(o, 0x83, 0xa4, 0x4e, 0x61, 0x6d, 0x65)
o = msgp.AppendString(o, z.Name)
// string "Created"
o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64)
o = msgp.AppendTime(o, z.Created)
// string "LockEnabled"
o = append(o, 0xab, 0x4c, 0x6f, 0x63, 0x6b, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64)
o = msgp.AppendBool(o, z.LockEnabled)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *bucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Name":
z.Name, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Name")
return
}
case "Created":
z.Created, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Created")
return
}
case "LockEnabled":
z.LockEnabled, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "LockEnabled")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z bucketMetadata) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize
return
}

123
cmd/bucket-meta_gen_test.go Normal file
View File

@ -0,0 +1,123 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalbucketMetadata(t *testing.T) {
v := bucketMetadata{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgbucketMetadata(b *testing.B) {
v := bucketMetadata{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgbucketMetadata(b *testing.B) {
v := bucketMetadata{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalbucketMetadata(b *testing.B) {
v := bucketMetadata{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodebucketMetadata(t *testing.T) {
v := bucketMetadata{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodebucketMetadata Msgsize() is inaccurate")
}
vn := bucketMetadata{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodebucketMetadata(b *testing.B) {
v := bucketMetadata{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodebucketMetadata(b *testing.B) {
v := bucketMetadata{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -376,22 +376,20 @@ func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, obj
func initBucketObjectLockConfig(buckets []BucketInfo, objAPI ObjectLayer) error {
for _, bucket := range buckets {
ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name})
configFile := path.Join(bucketConfigPrefix, bucket.Name, bucketObjectLockEnabledConfigFile)
bucketObjLockData, err := readConfig(ctx, objAPI, configFile)
meta, err := loadBucketMetadata(ctx, objAPI, bucket.Name)
if err != nil {
if errors.Is(err, errConfigNotFound) {
continue
}
if err != errMetaDataConverted {
return err
}
if string(bucketObjLockData) != bucketObjectLockEnabledConfig {
// this should never happen
logger.LogIf(ctx, objectlock.ErrMalformedBucketObjectConfig)
meta.Created = bucket.Created
logger.LogIf(ctx, meta.save(ctx, objAPI))
}
if !meta.LockEnabled {
continue
}
configFile = path.Join(bucketConfigPrefix, bucket.Name, objectLockConfig)
configFile := path.Join(bucketConfigPrefix, bucket.Name, objectLockConfig)
configData, err := readConfig(ctx, objAPI, configFile)
if err != nil {
if errors.Is(err, errConfigNotFound) {