remove unnecessary code checking for supported features (#16423)

Harshavardhana 2023-01-17 19:37:47 +05:30 committed by GitHub
parent 3db658e51e
commit b4ef5ff294
14 changed files with 298 additions and 623 deletions
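
The whole commit follows one mechanical pattern: the Is*Supported probes (IsNotificationSupported, IsListenSupported, IsEncryptionSupported, IsTaggingSupported, IsCompressionSupported) always return true on the erasure-coded backends that remain, so every guard built on them is dead code, and both the guards and the interface methods can go. A minimal sketch of the before/after shape, with hypothetical handler and type names standing in for the real MinIO code:

package main

import (
	"fmt"
	"net/http"
)

// ObjectLayer is an illustrative stand-in for MinIO's internal interface;
// before this commit it still carried per-feature capability probes.
type ObjectLayer interface {
	IsEncryptionSupported() bool // removed: always true on erasure backends
}

type erasureBackend struct{}

func (erasureBackend) IsEncryptionSupported() bool { return true }

// Before: every handler opened with a probe that could never fail.
func putBucketEncryptionBefore(w http.ResponseWriter, r *http.Request, objAPI ObjectLayer) {
	if !objAPI.IsEncryptionSupported() { // dead branch on all remaining backends
		http.Error(w, "NotImplemented", http.StatusNotImplemented)
		return
	}
	fmt.Fprintln(w, "bucket encryption configured")
}

// After: the probe is deleted from the interface and handlers proceed directly.
func putBucketEncryptionAfter(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintln(w, "bucket encryption configured")
}

func main() {
	http.HandleFunc("/before", func(w http.ResponseWriter, r *http.Request) {
		putBucketEncryptionBefore(w, r, erasureBackend{})
	})
	http.HandleFunc("/after", putBucketEncryptionAfter)
	fmt.Println(http.ListenAndServe("localhost:8080", nil))
}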

View File

@@ -51,11 +51,6 @@ func (api objectAPIHandlers) PutBucketEncryptionHandler(w http.ResponseWriter, r
return
}
if !objAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]

View File

@@ -890,11 +890,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
bucket := mux.Vars(r)["bucket"]
// Require Content-Length to be set in the request
@@ -1066,7 +1061,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
if objectAPI.IsEncryptionSupported() {
if crypto.Requested(formValues) && !HasSuffix(object, SlashSeparator) { // handle SSE requests
if crypto.SSECopy.IsRequested(r.Header) {
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
@@ -1111,7 +1106,6 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
}
}
objInfo, err := objectAPI.PutObject(ctx, bucket, object, pReader, opts)
if err != nil {

View File

@@ -50,11 +50,6 @@ func (api objectAPIHandlers) GetBucketNotificationHandler(w http.ResponseWriter,
return
}
if !objAPI.IsNotificationSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.GetBucketNotificationAction, bucketName, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
@@ -119,11 +114,6 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
return
}
if !objectAPI.IsNotificationSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
bucketName := vars["bucket"]

View File

@@ -297,16 +297,9 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er
return err
}
case config.CompressionSubSys:
compCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default])
if err != nil {
if _, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default]); err != nil {
return err
}
if objAPI != nil {
if compCfg.Enabled && !objAPI.IsCompressionSupported() {
return fmt.Errorf("Backend does not support compression")
}
}
case config.HealSubSys:
if _, err := heal.LookupConfig(s[config.HealSubSys][config.Default]); err != nil {
return err
@@ -548,10 +541,6 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
if err != nil {
return fmt.Errorf("Unable to setup Compression: %w", err)
}
// Validate if the object layer supports compression.
if cmpCfg.Enabled && !objAPI.IsCompressionSupported() {
return fmt.Errorf("Backend does not support compression")
}
globalCompressConfigMu.Lock()
globalCompressConfig = cmpCfg
globalCompressConfigMu.Unlock()
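
With the "Backend does not support compression" error gone, applying the compression subsystem reduces to the validate-then-publish pattern visible above: parse and validate the config first, then swap it in under the global mutex. A self-contained sketch of that pattern, with a hypothetical Config type standing in for compress.Config and the global pair standing in for globalCompressConfig/globalCompressConfigMu:

package main

import (
	"fmt"
	"strings"
	"sync"
)

// Config is a hypothetical stand-in for compress.Config.
type Config struct {
	Enabled    bool
	Extensions []string
}

var (
	globalConfigMu sync.Mutex
	globalConfig   Config
)

// lookupConfig validates raw key/value settings into a typed config,
// playing the role compress.LookupConfig plays in the hunk above.
func lookupConfig(kvs map[string]string) (Config, error) {
	cfg := Config{Enabled: kvs["enable"] == "on"}
	if exts := kvs["extensions"]; exts != "" {
		cfg.Extensions = strings.Split(exts, ",")
	}
	if cfg.Enabled && len(cfg.Extensions) == 0 {
		return Config{}, fmt.Errorf("compression enabled but no extensions configured")
	}
	return cfg, nil
}

// applyConfig validates first and publishes second, so readers taking the
// mutex never observe a half-applied or invalid configuration.
func applyConfig(kvs map[string]string) error {
	cfg, err := lookupConfig(kvs)
	if err != nil {
		return fmt.Errorf("unable to setup compression: %w", err)
	}
	globalConfigMu.Lock()
	globalConfig = cfg
	globalConfigMu.Unlock()
	return nil
}

func main() {
	if err := applyConfig(map[string]string{"enable": "on", "extensions": ".txt,.csv"}); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%+v\n", globalConfig)
}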

View File

@@ -1,100 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"errors"
"github.com/minio/minio/internal/sync/errgroup"
)
// list all errors that can be ignored in a bucket operation.
var bucketOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnformattedDisk)
// list all errors that can be ignored in a bucket metadata operation.
var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound)
// markDelete creates a vol entry in .minio.sys/buckets/.deleted until site replication
// syncs the delete to peers
func (er erasureObjects) markDelete(ctx context.Context, bucket, prefix string) error {
storageDisks := er.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
// Make a volume entry on all underlying storage disks.
for index := range storageDisks {
index := index
if storageDisks[index] == nil {
continue
}
g.Go(func() error {
if err := storageDisks[index].MakeVol(ctx, pathJoin(bucket, prefix)); err != nil {
if errors.Is(err, errVolumeExists) {
return nil
}
return err
}
return nil
}, index)
}
err := reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, er.defaultWQuorum())
return toObjectErr(err, bucket)
}
// purgeDelete deletes vol entry in .minio.sys/buckets/.deleted after site replication
// syncs the delete to peers OR on a new MakeBucket call.
func (er erasureObjects) purgeDelete(ctx context.Context, bucket, prefix string) error {
storageDisks := er.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
// Make a volume entry on all underlying storage disks.
for index := range storageDisks {
index := index
g.Go(func() error {
if storageDisks[index] != nil {
return storageDisks[index].DeleteVol(ctx, pathJoin(bucket, prefix), true)
}
return errDiskNotFound
}, index)
}
err := reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, er.defaultWQuorum())
return toObjectErr(err, bucket)
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
func (er erasureObjects) IsNotificationSupported() bool {
return true
}
// IsListenSupported returns whether listen bucket notification is applicable for this layer.
func (er erasureObjects) IsListenSupported() bool {
return true
}
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
func (er erasureObjects) IsEncryptionSupported() bool {
return true
}
// IsCompressionSupported returns whether compression is applicable for this layer.
func (er erasureObjects) IsCompressionSupported() bool {
return true
}
// IsTaggingSupported indicates whether erasureObjects implements tagging support.
func (er erasureObjects) IsTaggingSupported() bool {
return true
}
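
Both deleted helpers share the same shape: fan one storage call out to every disk, collect an error per disk, then decide success against a write quorum via reduceWriteQuorumErrs. A rough, self-contained sketch of that shape (hand-rolled with sync.WaitGroup instead of MinIO's internal errgroup.WithNErrs, and with a deliberately simplified quorum rule):

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errDiskNotFound = errors.New("disk not found")

// disk is a hypothetical stand-in for MinIO's StorageAPI.
type disk interface {
	MakeVol(volume string) error
}

// fanOut runs op against every disk in parallel, keeping one error slot per
// disk, which mirrors the errgroup.WithNErrs usage in the deleted helpers.
func fanOut(disks []disk, op func(disk) error) []error {
	errs := make([]error, len(disks))
	var wg sync.WaitGroup
	for i := range disks {
		if disks[i] == nil {
			errs[i] = errDiskNotFound
			continue
		}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs[i] = op(disks[i])
		}(i)
	}
	wg.Wait()
	return errs
}

// reduceQuorumErrs is a simplified stand-in for reduceWriteQuorumErrs:
// the operation succeeds when at least quorum disks reported no error.
func reduceQuorumErrs(errs []error, quorum int) error {
	ok := 0
	for _, err := range errs {
		if err == nil {
			ok++
		}
	}
	if ok >= quorum {
		return nil
	}
	return fmt.Errorf("write quorum not met: %d/%d", ok, quorum)
}

type memDisk struct{ fail bool }

func (d memDisk) MakeVol(volume string) error {
	if d.fail {
		return errors.New("i/o error")
	}
	return nil
}

func main() {
	disks := []disk{memDisk{}, memDisk{}, memDisk{fail: true}, nil}
	errs := fanOut(disks, func(d disk) error { return d.MakeVol(".deleted/bucket") })
	fmt.Println(reduceQuorumErrs(errs, len(disks)/2+1)) // 2/4 ok -> quorum of 3 not met
}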

View File

@@ -1612,30 +1612,6 @@ func (z *erasureServerPools) GetBucketInfo(ctx context.Context, bucket string, o
return bucketInfo, nil
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
func (z *erasureServerPools) IsNotificationSupported() bool {
return true
}
// IsListenSupported returns whether listen bucket notification is applicable for this layer.
func (z *erasureServerPools) IsListenSupported() bool {
return true
}
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
func (z *erasureServerPools) IsEncryptionSupported() bool {
return true
}
// IsCompressionSupported returns whether compression is applicable for this layer.
func (z *erasureServerPools) IsCompressionSupported() bool {
return true
}
func (z *erasureServerPools) IsTaggingSupported() bool {
return true
}
// DeleteBucket - deletes a bucket on all serverPools simultaneously,
// even if one of the serverPools fail to delete buckets, we proceed to
// undo a successful operation.
@@ -1666,7 +1642,7 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, op
// If site replication is configured, hold on to deleted bucket state until sites sync
switch opts.SRDeleteOp {
case MarkDelete:
z.markDelete(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix, bucket))
z.s3Peer.MakeBucket(context.Background(), pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucket), MakeBucketOptions{})
}
}
@@ -1697,27 +1673,6 @@ func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix strin
}
}
// markDelete will create a directory of deleted bucket in .minio.sys/buckets/.deleted across all disks
// in situations where the deleted bucket needs to be held on to until all sites are in sync for
// site replication
func (z *erasureServerPools) markDelete(ctx context.Context, bucket, prefix string) {
for _, servers := range z.serverPools {
for _, set := range servers.sets {
set.markDelete(ctx, bucket, prefix)
}
}
}
// purgeDelete deletes vol entry in .minio.sys/buckets/.deleted after site replication
// syncs the delete to peers.
func (z *erasureServerPools) purgeDelete(ctx context.Context, bucket, prefix string) {
for _, servers := range z.serverPools {
for _, set := range servers.sets {
set.purgeDelete(ctx, bucket, prefix)
}
}
}
// List all buckets from one of the serverPools, we are not doing merge
// sort here just for simplification. As per design it is assumed
// that all buckets are present on all serverPools.

View File

@@ -718,30 +718,6 @@ func (s *erasureSets) getHashedSet(input string) (set *erasureObjects) {
return s.sets[s.getHashedSetIndex(input)]
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
func (s *erasureSets) IsNotificationSupported() bool {
return s.getHashedSet("").IsNotificationSupported()
}
// IsListenSupported returns whether listen bucket notification is applicable for this layer.
func (s *erasureSets) IsListenSupported() bool {
return true
}
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
func (s *erasureSets) IsEncryptionSupported() bool {
return s.getHashedSet("").IsEncryptionSupported()
}
// IsCompressionSupported returns whether compression is applicable for this layer.
func (s *erasureSets) IsCompressionSupported() bool {
return s.getHashedSet("").IsCompressionSupported()
}
func (s *erasureSets) IsTaggingSupported() bool {
return true
}
// listDeletedBuckets lists deleted buckets from all disks.
func listDeletedBuckets(ctx context.Context, storageDisks []StorageAPI, delBuckets map[string]VolInfo, readQuorum int) error {
g := errgroup.WithNErrs(len(storageDisks))

View File

@@ -36,6 +36,12 @@ import (
"github.com/minio/pkg/console"
)
// list all errors that can be ignored in a bucket operation.
var bucketOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnformattedDisk)
// list all errors that can be ignored in a bucket metadata operation.
var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound)
// OfflineDisk represents an unavailable disk.
var OfflineDisk StorageAPI // zero value is nil

View File

@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@@ -41,16 +41,6 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
return
}
if !objAPI.IsNotificationSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
if !objAPI.IsListenSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
bucketName := vars["bucket"]

View File

@@ -239,12 +239,6 @@ type ObjectLayer interface {
AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error
CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
// Supported operations check
IsNotificationSupported() bool
IsListenSupported() bool
IsEncryptionSupported() bool
IsTaggingSupported() bool
IsCompressionSupported() bool
SetDriveCounts() []int // list of erasure stripe size for each pool in order.
// Healing operations.

View File

@@ -116,11 +116,6 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
return
}
if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]
object, err := unescapePath(vars["object"])
@@ -224,12 +219,10 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
// filter object lock metadata if permission does not permit
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
if objectAPI.IsEncryptionSupported() {
if _, err = DecryptObjectInfo(&objInfo, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
}
actualSize, err := objInfo.GetActualSize()
if err != nil {
@@ -288,7 +281,6 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
}
// Set encryption response headers
if objectAPI.IsEncryptionSupported() {
switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
case crypto.S3:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
@@ -307,7 +299,6 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
}
}
s3Select.Evaluate(w)
@@ -328,10 +319,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
@@ -408,12 +395,10 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
// Validate pre-conditions if any.
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
return checkPreconditions(ctx, w, r, oi, opts)
}
@@ -519,7 +504,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
// Set encryption response headers
if objectAPI.IsEncryptionSupported() {
switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
case crypto.S3:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
@@ -533,7 +517,6 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
}
}
if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" {
hash.AddChecksumHeader(w, objInfo.decryptChecksums())
@@ -632,10 +615,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrBadRequest))
return
}
if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
@@ -780,12 +759,10 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
// filter object lock metadata if permission does not permit
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
if objectAPI.IsEncryptionSupported() {
if _, err = DecryptObjectInfo(&objInfo, r); err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
}
// Validate pre-conditions if any.
if checkPreconditions(ctx, w, r, objInfo, opts) {
@@ -813,7 +790,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
}
// Set encryption response headers
if objectAPI.IsEncryptionSupported() {
switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
case crypto.S3:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
@@ -832,7 +808,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
}
}
if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" {
hash.AddChecksumHeader(w, objInfo.decryptChecksums())
@@ -1016,13 +991,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
return
}
if crypto.Requested(r.Header) {
if !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
}
vars := mux.Vars(r)
dstBucket := vars["bucket"]
dstObject, err := unescapePath(vars["object"])
@@ -1128,12 +1096,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
checkCopyPrecondFn := func(o ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&o, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
return checkCopyObjectPreconditions(ctx, w, r, o)
}
getOpts.CheckPrecondFn = checkCopyPrecondFn
@@ -1214,8 +1180,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
var compressMetadata map[string]string
// No need to compress for remote etcd calls
// Pass the decompressed stream to such calls.
isDstCompressed := objectAPI.IsCompressionSupported() &&
isCompressible(r.Header, dstObject) &&
isDstCompressed := isCompressible(r.Header, dstObject) &&
length > minCompressibleSize &&
!isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) && !cpSrcDstSame && !objectEncryption
if isDstCompressed {
@@ -1225,7 +1190,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
compressMetadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(actualSize, 10)
reader = etag.NewReader(reader, nil)
wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header)
wantEncryption := crypto.Requested(r.Header)
s2c, cb := newS2CompressReader(reader, actualSize, wantEncryption)
dstOpts.IndexCB = cb
defer s2c.Close()
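
newS2CompressReader turns the write-oriented S2 compressor into a readable stream and reports a decompression index through the callback; after this change, wantEncryption is driven purely by the request headers. A hedged sketch of the reader-from-writer part using io.Pipe and github.com/klauspost/compress/s2 (the compression library MinIO builds on); the helper name and the omission of the index callback are simplifications, not MinIO's actual newS2CompressReader:

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/klauspost/compress/s2"
)

// compressReader adapts the write-side s2.Writer into a read-side stream,
// so compressed bytes can be handed to code that consumes an io.Reader.
func compressReader(src io.Reader) io.ReadCloser {
	pr, pw := io.Pipe()
	enc := s2.NewWriter(pw)
	go func() {
		_, err := io.Copy(enc, src)
		if err == nil {
			err = enc.Close() // flush any buffered compressed blocks
		}
		pw.CloseWithError(err) // propagate errors to the reader side
	}()
	return pr
}

func main() {
	rc := compressReader(strings.NewReader(strings.Repeat("minio ", 1024)))
	defer rc.Close()
	n, err := io.Copy(io.Discard, rc)
	fmt.Println("compressed bytes:", n, "err:", err)
}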
@@ -1247,7 +1212,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// Handle encryption
encMetadata := make(map[string]string)
if objectAPI.IsEncryptionSupported() {
// Encryption parameters not applicable for this object.
if _, ok := crypto.IsEncrypted(srcInfo.UserDefined); !ok && crypto.SSECopy.IsRequested(r.Header) {
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
@@ -1376,7 +1340,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
}
}
}
srcInfo.PutObjReader = pReader
@@ -1630,13 +1593,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}
if crypto.Requested(r.Header) {
if !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
}
vars := mux.Vars(r)
bucket := vars["bucket"]
object, err := unescapePath(vars["object"])
@@ -1699,11 +1655,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
if objTags := r.Header.Get(xhttp.AmzObjectTagging); objTags != "" {
if !objectAPI.IsTaggingSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
if _, err := tags.ParseObjectTags(objTags); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -1773,7 +1724,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
actualSize := size
var idxCb func() []byte
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > minCompressibleSize {
if isCompressible(r.Header, object) && size > minCompressibleSize {
// Storing the compression metadata.
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
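
Compression is now gated only on the object itself: an isCompressible check over the request headers and object name, plus the minCompressibleSize floor. A simplified sketch of such a gate; the threshold, extension list, and content-type rules below are illustrative assumptions, not MinIO's actual isCompressible logic:

package main

import (
	"fmt"
	"path"
	"strings"
)

// minCompressibleSize is an illustrative floor; very small objects gain
// nothing from compression and only pay metadata overhead.
const minCompressibleSize = 4096

// alreadyCompressed lists extensions whose payloads are typically
// incompressible, so running S2 over them would waste CPU.
var alreadyCompressed = map[string]bool{
	".gz": true, ".zip": true, ".mp4": true, ".jpg": true, ".png": true,
}

// isCompressible is a simplified stand-in for MinIO's header- and
// extension-based check.
func isCompressible(contentType, object string, size int64) bool {
	if size <= minCompressibleSize {
		return false
	}
	if alreadyCompressed[strings.ToLower(path.Ext(object))] {
		return false
	}
	return !strings.HasPrefix(contentType, "video/") &&
		!strings.HasPrefix(contentType, "image/")
}

func main() {
	fmt.Println(isCompressible("text/csv", "data/report.csv", 1<<20))  // true
	fmt.Println(isCompressible("video/mp4", "movies/clip.mp4", 1<<20)) // false
}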
@@ -1789,7 +1740,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
// Set compression metrics.
var s2c io.ReadCloser
wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header)
wantEncryption := crypto.Requested(r.Header)
s2c, idxCb = newS2CompressReader(actualReader, actualSize, wantEncryption)
defer s2c.Close()
@@ -1822,12 +1773,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
if !opts.MTime.IsZero() && opts.PreserveETag != "" {
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
return checkPreconditionsPUT(ctx, w, r, oi, opts)
}
}
@@ -1863,7 +1812,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() {
if crypto.Requested(r.Header) && !HasSuffix(object, SlashSeparator) { // handle SSE requests
if crypto.SSECopy.IsRequested(r.Header) {
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
@@ -1898,7 +1846,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
opts.EncryptFn = metadataEncrypter(objectEncryptionKey)
}
}
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
@@ -2001,13 +1948,6 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
return
}
if crypto.Requested(r.Header) {
if !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
}
vars := mux.Vars(r)
bucket := vars["bucket"]
object, err := unescapePath(vars["object"])
@@ -2145,7 +2085,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
actualSize := size
var idxCb func() []byte
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > minCompressibleSize {
if isCompressible(r.Header, object) && size > minCompressibleSize {
// Storing the compression metadata.
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
@@ -2156,7 +2096,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
}
// Set compression metrics.
wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header)
wantEncryption := crypto.Requested(r.Header)
s2c, cb := newS2CompressReader(actualReader, actualSize, wantEncryption)
defer s2c.Close()
idxCb = cb
@@ -2212,7 +2152,6 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
}
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() {
if crypto.Requested(r.Header) && !HasSuffix(object, SlashSeparator) { // handle SSE requests
if crypto.SSECopy.IsRequested(r.Header) {
return errInvalidEncryptionParameters
@@ -2243,7 +2182,6 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
if opts.IndexCB != nil {
opts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, opts.IndexCB)
}
}
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
@@ -2261,6 +2199,7 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
}
return nil
}
var opts untarOptions
opts.ignoreDirs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreDirs), "true")
opts.ignoreErrs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreErrors), "true")
@@ -2827,11 +2766,6 @@ func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *h
return
}
if !objAPI.IsTaggingSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
// Allow getObjectTagging if policy action is set.
if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectTaggingAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
@@ -2889,10 +2823,6 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
if !objAPI.IsTaggingSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
// Tags XML will not be bigger than 1MiB in size, fail if it's bigger.
tags, err := tags.ParseObjectXML(io.LimitReader(r.Body, 1<<20))
@@ -2971,10 +2901,6 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
if !objAPI.IsTaggingSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]

View File

@@ -71,13 +71,6 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
return
}
if crypto.Requested(r.Header) {
if !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
}
vars := mux.Vars(r)
bucket := vars["bucket"]
object, err := unescapePath(vars["object"])
@@ -107,7 +100,6 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
encMetadata := map[string]string{}
if objectAPI.IsEncryptionSupported() {
if crypto.Requested(r.Header) {
if err = setEncryptionMetadata(r, bucket, object, encMetadata); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
@@ -117,7 +109,6 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
// decryption if the file was actually multipart or not.
encMetadata[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
}
}
// Extract metadata that needs to be saved.
metadata, err := extractMetadata(ctx, r)
@@ -127,11 +118,6 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
}
if objTags := r.Header.Get(xhttp.AmzObjectTagging); objTags != "" {
if !objectAPI.IsTaggingSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
if _, err := tags.ParseObjectTags(objTags); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -176,7 +162,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) {
if isCompressible(r.Header, object) {
// Storing the compression metadata.
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
}
@@ -189,12 +175,10 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
if !opts.MTime.IsZero() && opts.PreserveETag != "" {
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
return checkPreconditionsPUT(ctx, w, r, oi, opts)
}
}
@@ -245,11 +229,6 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
return
}
if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
dstBucket := vars["bucket"]
dstObject, err := unescapePath(vars["object"])
@@ -351,12 +330,10 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
}
checkCopyPartPrecondFn := func(o ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&o, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
if checkCopyObjectPartPreconditions(ctx, w, r, o) {
return true
}
@@ -462,7 +439,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
// Compress only if the compression is enabled during initial multipart.
var idxCb func() []byte
if isCompressed {
wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header)
wantEncryption := crypto.Requested(r.Header)
s2c, cb := newS2CompressReader(reader, actualPartSize, wantEncryption)
idxCb = cb
defer s2c.Close()
@@ -488,7 +465,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
_, isEncrypted := crypto.IsEncrypted(mi.UserDefined)
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() && isEncrypted {
if isEncrypted {
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL)
return
@@ -578,13 +555,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
return
}
if crypto.Requested(r.Header) {
if !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
}
vars := mux.Vars(r)
bucket := vars["bucket"]
object, err := unescapePath(vars["object"])
@@ -711,7 +681,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
_, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"]
var idxCb func() []byte
if objectAPI.IsCompressionSupported() && isCompressed {
if isCompressed {
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
@@ -723,7 +693,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
}
// Set compression metrics.
wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header)
wantEncryption := crypto.Requested(r.Header)
s2c, cb := newS2CompressReader(actualReader, actualSize, wantEncryption)
idxCb = cb
defer s2c.Close()
@@ -747,7 +717,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
_, isEncrypted := crypto.IsEncrypted(mi.UserDefined)
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() && isEncrypted {
if isEncrypted {
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL)
return
@@ -1157,7 +1127,7 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht
// Due to AWS S3, SSE-S3 encrypted parts return the plaintext ETag
// being the content MD5 of that particular part. This is not the
// case for SSE-C and SSE-KMS objects.
if kind, ok := crypto.IsEncrypted(listPartsInfo.UserDefined); ok && objectAPI.IsEncryptionSupported() {
if kind, ok := crypto.IsEncrypted(listPartsInfo.UserDefined); ok {
var objectEncryptionKey []byte
if kind == crypto.S3 {
objectEncryptionKey, err = decryptObjectMeta(nil, bucket, object, listPartsInfo.UserDefined)

View File

@@ -67,10 +67,6 @@ func (api objectAPIHandlers) getObjectInArchiveFileHandler(ctx context.Context,
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
zipPath, object, err := splitZipExtensionPath(object)
if err != nil {
@@ -135,12 +131,10 @@ func (api objectAPIHandlers) getObjectInArchiveFileHandler(ctx context.Context,
// Validate pre-conditions if any.
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
return checkPreconditions(ctx, w, r, oi, opts)
}
@@ -381,10 +375,6 @@ func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context,
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrBadRequest))
return
}
if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
zipPath, object, err := splitZipExtensionPath(object)
if err != nil {

View File

@@ -4206,7 +4206,7 @@ func (c *SiteReplicationSys) purgeDeletedBucket(ctx context.Context, objAPI Obje
if !ok {
return
}
z.purgeDelete(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix, bucket))
z.s3Peer.DeleteBucket(context.Background(), pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucket), DeleteBucketOptions{})
}
// healBucket creates/deletes the bucket according to latest state across clusters participating in site replication.