Allow metadata updates on meta bucket even in WORM mode (#8657)

This ensures that we can update the

- .minio.sys is updated for accounting/data usage purposes
- .minio.sys is updated to indicate if backend is encrypted
  or not.
This commit is contained in:
Harshavardhana 2019-12-17 10:13:12 -08:00 committed by GitHub
parent 16ac4a3c64
commit 5f2318567e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 10 additions and 82 deletions

View File

@ -68,13 +68,7 @@ func runDataUsageInfoForFS(ctx context.Context, fsObj *FSObjects, endCh <-chan s
// Save the data usage in the disk
err := storeDataUsageInBackend(ctx, fsObj, usageInfo)
if err != nil {
if globalWORMEnabled {
if _, ok := err.(ObjectAlreadyExists); !ok {
logger.LogIf(ctx, err)
}
} else {
logger.LogIf(ctx, err)
}
logger.LogIf(ctx, err)
}
select {
case <-endCh:
@ -103,13 +97,7 @@ func runDataUsageInfoForXLZones(ctx context.Context, z *xlZones, endCh <-chan st
usageInfo := z.crawlAndGetDataUsage(ctx, endCh)
err := storeDataUsageInBackend(ctx, z, usageInfo)
if err != nil {
if globalWORMEnabled {
if _, ok := err.(ObjectAlreadyExists); !ok {
logger.LogIf(ctx, err)
}
} else {
logger.LogIf(ctx, err)
}
logger.LogIf(ctx, err)
}
select {
case <-endCh:

View File

@ -1,54 +0,0 @@
/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
)
// getDiskUsage walks the file tree rooted at root, calling usageFn
// for each file or directory in the tree, including root.
func getDiskUsage(ctx context.Context, root string, usageFn usageFunc) error {
return walk(ctx, root+SlashSeparator, usageFn)
}
type usageFunc func(ctx context.Context, entry string) error
// walk recursively descends path, calling walkFn.
func walk(ctx context.Context, path string, usageFn usageFunc) error {
if err := usageFn(ctx, path); err != nil {
return err
}
if !HasSuffix(path, SlashSeparator) {
return nil
}
entries, err := readDir(path)
if err != nil {
return usageFn(ctx, path)
}
for _, entry := range entries {
fname := pathJoin(path, entry)
if err = walk(ctx, fname, usageFn); err != nil {
return err
}
}
return nil
}

View File

@ -683,7 +683,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, ok := isWORMEnabled(bucket); ok {
if _, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object)); err == nil {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}

View File

@ -1022,7 +1022,7 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
// Deny if WORM is enabled
if globalWORMEnabled {
if _, ok := isWORMEnabled(bucket); ok {
if _, err := fsStatFile(ctx, fsNSObjPath); err == nil {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}

View File

@ -266,7 +266,6 @@ var (
// list. Feel free to add new relevant fields.
func getGlobalInfo() (globalInfo map[string]interface{}) {
globalInfo = map[string]interface{}{
"isWorm": globalWORMEnabled,
"serverRegion": globalServerRegion,
// Add more relevant global settings here.
}

View File

@ -206,15 +206,7 @@ func initSafeMode(buckets []BucketInfo) (err error) {
// Migrate all backend configs to encrypted backend configs, optionally
// handles rotating keys for encryption.
if err = handleEncryptedConfigBackend(newObject, true); err != nil {
if globalWORMEnabled {
if _, ok := err.(ObjectAlreadyExists); !ok {
return fmt.Errorf("Unable to handle encrypted backend for config, iam and policies: %w",
err)
}
// Ignore ObjectAlreadyExists if globalWORMEnabled is true.
} else {
return fmt.Errorf("Unable to handle encrypted backend for config, iam and policies: %w", err)
}
return fmt.Errorf("Unable to handle encrypted backend for config, iam and policies: %w", err)
}
// **** WARNING ****

View File

@ -540,6 +540,9 @@ func iamPolicyName() string {
}
func isWORMEnabled(bucket string) (Retention, bool) {
if isMinioMetaBucketName(bucket) {
return Retention{}, false
}
if globalWORMEnabled {
return Retention{}, true
}

View File

@ -708,7 +708,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
if xl.isObject(bucket, object) {
// Deny if WORM is enabled
if globalWORMEnabled {
if _, ok := isWORMEnabled(bucket); ok {
if _, err := xl.getObjectInfo(ctx, bucket, object); err == nil {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}

View File

@ -611,7 +611,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
if xl.isObject(bucket, object) {
// Deny if WORM is enabled
if globalWORMEnabled {
if _, ok := isWORMEnabled(bucket); ok {
if _, err := xl.getObjectInfo(ctx, bucket, object); err == nil {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}