mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
validate storage class across pools when setting config (#11320)
``` mc admin config set alias/ storage_class standard=EC:3 ``` should only succeed if parity ratio is valid for all server pools, if not we should fail proactively. This PR also needs to bring other changes now that we need to cater for variadic drive counts per pool. Bonus fixes also various bugs reproduced with - GetObjectWithPartNumber() - CopyObjectPartWithOffsets() - CopyObjectWithMetadata() - PutObjectPart,PutObject with truncated streams
This commit is contained in:
parent
a35cbb3ff3
commit
a6c146bd00
@ -130,13 +130,14 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
dynamic, err := cfg.ReadConfig(bytes.NewReader(kvBytes))
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if err = validateConfig(cfg, objectAPI.SetDriveCount()); err != nil {
|
||||
if err = validateConfig(cfg, objectAPI.SetDriveCounts()); err != nil {
|
||||
writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL)
|
||||
return
|
||||
}
|
||||
@ -158,15 +159,14 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ
|
||||
saveConfig(GlobalContext, objectAPI, backendEncryptedFile, backendEncryptedMigrationComplete)
|
||||
}
|
||||
|
||||
if dynamic {
|
||||
// Apply dynamic values.
|
||||
if err := applyDynamicConfig(GlobalContext, cfg); err != nil {
|
||||
if err := applyDynamicConfig(GlobalContext, objectAPI, cfg); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
globalNotificationSys.SignalService(serviceReloadDynamic)
|
||||
|
||||
// If all values were dynamic, tell the client.
|
||||
if dynamic {
|
||||
w.Header().Set(madmin.ConfigAppliedHeader, madmin.ConfigAppliedTrue)
|
||||
}
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
@ -282,7 +282,7 @@ func (a adminAPIHandlers) RestoreConfigHistoryKVHandler(w http.ResponseWriter, r
|
||||
return
|
||||
}
|
||||
|
||||
if err = validateConfig(cfg, objectAPI.SetDriveCount()); err != nil {
|
||||
if err = validateConfig(cfg, objectAPI.SetDriveCounts()); err != nil {
|
||||
writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL)
|
||||
return
|
||||
}
|
||||
@ -394,7 +394,7 @@ func (a adminAPIHandlers) SetConfigHandler(w http.ResponseWriter, r *http.Reques
|
||||
return
|
||||
}
|
||||
|
||||
if err = validateConfig(cfg, objectAPI.SetDriveCount()); err != nil {
|
||||
if err = validateConfig(cfg, objectAPI.SetDriveCounts()); err != nil {
|
||||
writeCustomErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminConfigBadJSON), err.Error(), r.URL)
|
||||
return
|
||||
}
|
||||
|
@ -1561,9 +1561,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
|
||||
Type: madmin.ErasureType,
|
||||
OnlineDisks: onlineDisks.Sum(),
|
||||
OfflineDisks: offlineDisks.Sum(),
|
||||
StandardSCData: backendInfo.StandardSCData,
|
||||
StandardSCParity: backendInfo.StandardSCParity,
|
||||
RRSCData: backendInfo.RRSCData,
|
||||
RRSCParity: backendInfo.RRSCParity,
|
||||
}
|
||||
} else {
|
||||
|
@ -87,6 +87,7 @@ const (
|
||||
ErrInvalidMaxUploads
|
||||
ErrInvalidMaxParts
|
||||
ErrInvalidPartNumberMarker
|
||||
ErrInvalidPartNumber
|
||||
ErrInvalidRequestBody
|
||||
ErrInvalidCopySource
|
||||
ErrInvalidMetadataDirective
|
||||
@ -437,6 +438,11 @@ var errorCodes = errorCodeMap{
|
||||
Description: "Argument partNumberMarker must be an integer.",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrInvalidPartNumber: {
|
||||
Code: "InvalidPartNumber",
|
||||
Description: "The requested partnumber is not satisfiable",
|
||||
HTTPStatusCode: http.StatusRequestedRangeNotSatisfiable,
|
||||
},
|
||||
ErrInvalidPolicyDocument: {
|
||||
Code: "InvalidPolicyDocument",
|
||||
Description: "The content of the form does not meet the conditions specified in the policy document.",
|
||||
|
@ -156,16 +156,16 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp
|
||||
return err
|
||||
}
|
||||
|
||||
if opts.PartNumber > 0 {
|
||||
rs = partNumberToRangeSpec(objInfo, opts.PartNumber)
|
||||
}
|
||||
|
||||
// For providing ranged content
|
||||
start, rangeLen, err = rs.GetOffsetLength(totalObjectSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if rs == nil && opts.PartNumber > 0 {
|
||||
rs = partNumberToRangeSpec(objInfo, opts.PartNumber)
|
||||
}
|
||||
|
||||
// Set content length.
|
||||
w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))
|
||||
if rs != nil {
|
||||
|
@ -228,7 +228,7 @@ var (
|
||||
globalServerConfigMu sync.RWMutex
|
||||
)
|
||||
|
||||
func validateConfig(s config.Config, setDriveCount int) error {
|
||||
func validateConfig(s config.Config, setDriveCounts []int) error {
|
||||
// We must have a global lock for this so nobody else modifies env while we do.
|
||||
defer env.LockSetEnv()()
|
||||
|
||||
@ -251,10 +251,12 @@ func validateConfig(s config.Config, setDriveCount int) error {
|
||||
}
|
||||
|
||||
if globalIsErasure {
|
||||
for _, setDriveCount := range setDriveCounts {
|
||||
if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := cache.LookupConfig(s[config.CacheSubSys][config.Default]); err != nil {
|
||||
return err
|
||||
@ -342,7 +344,7 @@ func validateConfig(s config.Config, setDriveCount int) error {
|
||||
return notify.TestNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs())
|
||||
}
|
||||
|
||||
func lookupConfigs(s config.Config, minSetDriveCount int) {
|
||||
func lookupConfigs(s config.Config, setDriveCounts []int) {
|
||||
ctx := GlobalContext
|
||||
|
||||
var err error
|
||||
@ -429,7 +431,7 @@ func lookupConfigs(s config.Config, minSetDriveCount int) {
|
||||
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
|
||||
}
|
||||
|
||||
globalAPIConfig.init(apiConfig, minSetDriveCount)
|
||||
globalAPIConfig.init(apiConfig, setDriveCounts)
|
||||
|
||||
// Initialize remote instance transport once.
|
||||
getRemoteInstanceTransportOnce.Do(func() {
|
||||
@ -437,9 +439,17 @@ func lookupConfigs(s config.Config, minSetDriveCount int) {
|
||||
})
|
||||
|
||||
if globalIsErasure {
|
||||
globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], minSetDriveCount)
|
||||
for i, setDriveCount := range setDriveCounts {
|
||||
sc, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize storage class config: %w", err))
|
||||
break
|
||||
}
|
||||
// if we validated all setDriveCounts and it was successful
|
||||
// proceed to store the correct storage class globally.
|
||||
if i == len(setDriveCounts)-1 {
|
||||
globalStorageClass = sc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -553,12 +563,16 @@ func lookupConfigs(s config.Config, minSetDriveCount int) {
|
||||
}
|
||||
|
||||
// Apply dynamic config values
|
||||
logger.LogIf(ctx, applyDynamicConfig(ctx, s))
|
||||
logger.LogIf(ctx, applyDynamicConfig(ctx, newObjectLayerFn(), s))
|
||||
}
|
||||
|
||||
// applyDynamicConfig will apply dynamic config values.
|
||||
// Dynamic systems should be in config.SubSystemsDynamic as well.
|
||||
func applyDynamicConfig(ctx context.Context, s config.Config) error {
|
||||
func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config) error {
|
||||
if objAPI == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read all dynamic configs.
|
||||
// API
|
||||
apiConfig, err := api.LookupConfig(s[config.APISubSys][config.Default])
|
||||
@ -571,17 +585,16 @@ func applyDynamicConfig(ctx context.Context, s config.Config) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to setup Compression: %w", err)
|
||||
}
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI != nil {
|
||||
|
||||
// Validate if the object layer supports compression.
|
||||
if cmpCfg.Enabled && !objAPI.IsCompressionSupported() {
|
||||
return fmt.Errorf("Backend does not support compression")
|
||||
}
|
||||
}
|
||||
|
||||
// Heal
|
||||
healCfg, err := heal.LookupConfig(s[config.HealSubSys][config.Default])
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to apply heal config: %w", err))
|
||||
return fmt.Errorf("Unable to apply heal config: %w", err)
|
||||
}
|
||||
|
||||
// Crawler
|
||||
@ -592,7 +605,7 @@ func applyDynamicConfig(ctx context.Context, s config.Config) error {
|
||||
|
||||
// Apply configurations.
|
||||
// We should not fail after this.
|
||||
globalAPIConfig.init(apiConfig, globalAPIConfig.setDriveCount)
|
||||
globalAPIConfig.init(apiConfig, objAPI.SetDriveCounts())
|
||||
|
||||
globalCompressConfigMu.Lock()
|
||||
globalCompressConfig = cmpCfg
|
||||
@ -723,7 +736,7 @@ func loadConfig(objAPI ObjectLayer) error {
|
||||
}
|
||||
|
||||
// Override any values from ENVs.
|
||||
lookupConfigs(srvCfg, objAPI.SetDriveCount())
|
||||
lookupConfigs(srvCfg, objAPI.SetDriveCounts())
|
||||
|
||||
// hold the mutex lock before a new config is assigned.
|
||||
globalServerConfigMu.Lock()
|
||||
|
@ -116,9 +116,9 @@ func NewKes(cfg KesConfig) (KMS, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cfg.Transport.TLSClientConfig != nil {
|
||||
if err = loadCACertificates(cfg.CAPath,
|
||||
cfg.Transport.TLSClientConfig.RootCAs); err != nil {
|
||||
|
||||
if cfg.Transport.TLSClientConfig != nil && cfg.Transport.TLSClientConfig.RootCAs != nil {
|
||||
if err = loadCACertificates(cfg.CAPath, cfg.Transport.TLSClientConfig.RootCAs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
@ -132,9 +132,13 @@ func NewKes(cfg KesConfig) (KMS, error) {
|
||||
if err = loadCACertificates(cfg.CAPath, rootCAs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cfg.Transport.TLSClientConfig == nil {
|
||||
cfg.Transport.TLSClientConfig = &tls.Config{
|
||||
RootCAs: rootCAs,
|
||||
}
|
||||
} else {
|
||||
cfg.Transport.TLSClientConfig.RootCAs = rootCAs
|
||||
}
|
||||
}
|
||||
cfg.Transport.TLSClientConfig.Certificates = []tls.Certificate{cert}
|
||||
cfg.Transport.TLSClientConfig.NextProtos = []string{"h2"}
|
||||
|
@ -167,8 +167,12 @@ func (z *erasureServerPools) GetAllLockers() []dsync.NetLocker {
|
||||
return z.serverPools[0].GetAllLockers()
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) SetDriveCount() int {
|
||||
return z.serverPools[0].SetDriveCount()
|
||||
func (z *erasureServerPools) SetDriveCounts() []int {
|
||||
setDriveCounts := make([]int, len(z.serverPools))
|
||||
for i := range z.serverPools {
|
||||
setDriveCounts[i] = z.serverPools[i].SetDriveCount()
|
||||
}
|
||||
return setDriveCounts
|
||||
}
|
||||
|
||||
type serverPoolsAvailableSpace []poolAvailableSpace
|
||||
@ -320,16 +324,19 @@ func (z *erasureServerPools) Shutdown(ctx context.Context) error {
|
||||
func (z *erasureServerPools) BackendInfo() (b BackendInfo) {
|
||||
b.Type = BackendErasure
|
||||
|
||||
setDriveCount := z.SetDriveCount()
|
||||
scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
|
||||
if scParity <= 0 {
|
||||
scParity = z.serverPools[0].defaultParityCount
|
||||
}
|
||||
b.StandardSCData = setDriveCount - scParity
|
||||
b.StandardSCParity = scParity
|
||||
|
||||
rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
|
||||
b.RRSCData = setDriveCount - rrSCParity
|
||||
|
||||
// Data blocks can vary per pool, but parity is same.
|
||||
for _, setDriveCount := range z.SetDriveCounts() {
|
||||
b.StandardSCData = append(b.StandardSCData, setDriveCount-scParity)
|
||||
b.RRSCData = append(b.RRSCData, setDriveCount-rrSCParity)
|
||||
}
|
||||
|
||||
b.StandardSCParity = scParity
|
||||
b.RRSCParity = rrSCParity
|
||||
return
|
||||
}
|
||||
@ -1360,7 +1367,7 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
|
||||
// knows that its not our first attempt at 'prefix'
|
||||
err = nil
|
||||
|
||||
if quorumCount == z.SetDriveCount() && opts.ScanMode == madmin.HealNormalScan {
|
||||
if quorumCount == set.setDriveCount && opts.ScanMode == madmin.HealNormalScan {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -1482,8 +1489,8 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("maintenance", strconv.FormatBool(opts.Maintenance))
|
||||
|
||||
b := z.BackendInfo()
|
||||
writeQuorum := b.StandardSCData
|
||||
if b.StandardSCData == b.StandardSCParity {
|
||||
writeQuorum := b.StandardSCData[0]
|
||||
if writeQuorum == b.StandardSCParity {
|
||||
writeQuorum++
|
||||
}
|
||||
|
||||
|
15
cmd/fs-v1.go
15
cmd/fs-v1.go
@ -187,9 +187,9 @@ func (fs *FSObjects) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
return fs.nsMutex.NewNSLock(nil, bucket, objects...)
|
||||
}
|
||||
|
||||
// SetDriveCount no-op
|
||||
func (fs *FSObjects) SetDriveCount() int {
|
||||
return 0
|
||||
// SetDriveCounts no-op
|
||||
func (fs *FSObjects) SetDriveCounts() []int {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown - should be called when process shuts down.
|
||||
@ -735,9 +735,9 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
|
||||
rwPoolUnlocker = func() { fs.rwPool.Close(fsMetaPath) }
|
||||
}
|
||||
|
||||
objReaderFn, off, length, rErr := NewGetObjectReader(rs, objInfo, opts, nsUnlocker, rwPoolUnlocker)
|
||||
if rErr != nil {
|
||||
return nil, rErr
|
||||
objReaderFn, off, length, err := NewGetObjectReader(rs, objInfo, opts, nsUnlocker, rwPoolUnlocker)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Read the object, doesn't exist returns an s3 compatible error.
|
||||
@ -748,10 +748,11 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
|
||||
nsUnlocker()
|
||||
return nil, toObjectErr(err, bucket, object)
|
||||
}
|
||||
reader := io.LimitReader(readCloser, length)
|
||||
|
||||
closeFn := func() {
|
||||
readCloser.Close()
|
||||
}
|
||||
reader := io.LimitReader(readCloser, length)
|
||||
|
||||
// Check if range is valid
|
||||
if off > size || off+length > size {
|
||||
|
@ -233,7 +233,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
||||
srvCfg := newServerConfig()
|
||||
|
||||
// Override any values from ENVs.
|
||||
lookupConfigs(srvCfg, 0)
|
||||
lookupConfigs(srvCfg, nil)
|
||||
|
||||
// hold the mutex lock before a new config is assigned.
|
||||
globalServerConfigMu.Lock()
|
||||
|
@ -51,9 +51,9 @@ func (a GatewayUnsupported) NewNSLock(bucket string, objects ...string) RWLocker
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetDriveCount no-op
|
||||
func (a GatewayUnsupported) SetDriveCount() int {
|
||||
return 0
|
||||
// SetDriveCounts no-op
|
||||
func (a GatewayUnsupported) SetDriveCounts() []int {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListMultipartUploads lists all multipart uploads.
|
||||
|
@ -35,16 +35,19 @@ type apiConfig struct {
|
||||
listQuorum int
|
||||
extendListLife time.Duration
|
||||
corsAllowOrigins []string
|
||||
setDriveCount int
|
||||
// total drives per erasure set across pools.
|
||||
totalDriveCount int
|
||||
}
|
||||
|
||||
func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
|
||||
func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
t.clusterDeadline = cfg.ClusterDeadline
|
||||
t.corsAllowOrigins = cfg.CorsAllowOrigin
|
||||
t.setDriveCount = setDriveCount
|
||||
for _, setDriveCount := range setDriveCounts {
|
||||
t.totalDriveCount += setDriveCount
|
||||
}
|
||||
|
||||
var apiRequestsMaxPerNode int
|
||||
if cfg.RequestsMax <= 0 {
|
||||
@ -56,8 +59,8 @@ func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
|
||||
}
|
||||
// max requests per node is calculated as
|
||||
// total_ram / ram_per_request
|
||||
// ram_per_request is 4MiB * setDriveCount + 2 * 10MiB (default erasure block size)
|
||||
apiRequestsMaxPerNode = int(stats.TotalRAM / uint64(setDriveCount*(blockSizeLarge+blockSizeSmall)+blockSizeV1*2))
|
||||
// ram_per_request is 1MiB * driveCount + 2 * 10MiB (default erasure block size)
|
||||
apiRequestsMaxPerNode = int(stats.TotalRAM / uint64(t.totalDriveCount*(blockSizeLarge+blockSizeSmall)+blockSizeV1*2))
|
||||
} else {
|
||||
apiRequestsMaxPerNode = cfg.RequestsMax
|
||||
if len(globalEndpoints.Hostnames()) > 0 {
|
||||
@ -84,13 +87,6 @@ func (t *apiConfig) getListQuorum() int {
|
||||
return t.listQuorum
|
||||
}
|
||||
|
||||
func (t *apiConfig) getSetDriveCount() int {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
return t.setDriveCount
|
||||
}
|
||||
|
||||
func (t *apiConfig) getExtendListLife() time.Duration {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
@ -540,10 +540,6 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
||||
}
|
||||
}
|
||||
|
||||
func (er erasureObjects) SetDriveCount() int {
|
||||
return er.setDriveCount
|
||||
}
|
||||
|
||||
// Will return io.EOF if continuing would not yield more results.
|
||||
func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
|
||||
o.debugf(color.Green("listPath:")+" with options: %#v", o)
|
||||
@ -598,9 +594,9 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
|
||||
askDisks := o.AskDisks
|
||||
listingQuorum := askDisks - 1
|
||||
// Special case: ask all disks if the drive count is 4
|
||||
if askDisks == -1 || er.SetDriveCount() == 4 {
|
||||
if askDisks == -1 || er.setDriveCount == 4 {
|
||||
askDisks = len(disks) // with 'strict' quorum list on all online disks.
|
||||
listingQuorum = getReadQuorum(er.SetDriveCount())
|
||||
listingQuorum = getReadQuorum(er.setDriveCount)
|
||||
}
|
||||
|
||||
if len(disks) < askDisks {
|
||||
|
@ -41,9 +41,7 @@ func TestGetSource(t *testing.T) {
|
||||
|
||||
// Test lock race
|
||||
func TestNSLockRace(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
t.Skip("long test skip it")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
|
@ -31,9 +31,16 @@ func GetTotalCapacity(ctx context.Context) (capacity uint64) {
|
||||
}
|
||||
|
||||
// GetTotalUsableCapacity gets the total usable capacity in the cluster.
|
||||
// This value is not an accurate representation of total usable in a multi-tenant deployment.
|
||||
func GetTotalUsableCapacity(ctx context.Context, s StorageInfo) (capacity float64) {
|
||||
raw := GetTotalCapacity(ctx)
|
||||
ratio := float64(s.Backend.StandardSCData) / float64(s.Backend.StandardSCData+s.Backend.StandardSCParity)
|
||||
var approxDataBlocks float64
|
||||
var actualDisks float64
|
||||
for _, scData := range s.Backend.StandardSCData {
|
||||
approxDataBlocks += float64(scData)
|
||||
actualDisks += float64(scData + s.Backend.StandardSCParity)
|
||||
}
|
||||
ratio := approxDataBlocks / actualDisks
|
||||
return float64(raw) * ratio
|
||||
}
|
||||
|
||||
@ -47,8 +54,15 @@ func GetTotalCapacityFree(ctx context.Context) (capacity uint64) {
|
||||
}
|
||||
|
||||
// GetTotalUsableCapacityFree gets the total usable capacity free in the cluster.
|
||||
// This value is not an accurate representation of total free in a multi-tenant deployment.
|
||||
func GetTotalUsableCapacityFree(ctx context.Context, s StorageInfo) (capacity float64) {
|
||||
raw := GetTotalCapacityFree(ctx)
|
||||
ratio := float64(s.Backend.StandardSCData) / float64(s.Backend.StandardSCData+s.Backend.StandardSCParity)
|
||||
var approxDataBlocks float64
|
||||
var actualDisks float64
|
||||
for _, scData := range s.Backend.StandardSCData {
|
||||
approxDataBlocks += float64(scData)
|
||||
actualDisks += float64(scData + s.Backend.StandardSCParity)
|
||||
}
|
||||
ratio := approxDataBlocks / actualDisks
|
||||
return float64(raw) * ratio
|
||||
}
|
||||
|
@ -51,9 +51,9 @@ type BackendInfo struct {
|
||||
GatewayOnline bool
|
||||
|
||||
// Following fields are only meaningful if BackendType is Erasure.
|
||||
StandardSCData int // Data disks for currently configured Standard storage class.
|
||||
StandardSCData []int // Data disks for currently configured Standard storage class.
|
||||
StandardSCParity int // Parity disks for currently configured Standard storage class.
|
||||
RRSCData int // Data disks for currently configured Reduced Redundancy storage class.
|
||||
RRSCData []int // Data disks for currently configured Reduced Redundancy storage class.
|
||||
RRSCParity int // Parity disks for currently configured Reduced Redundancy storage class.
|
||||
}
|
||||
|
||||
|
@ -81,8 +81,6 @@ type BackendMetrics struct {
|
||||
|
||||
// ObjectLayer implements primitives for object API layer.
|
||||
type ObjectLayer interface {
|
||||
SetDriveCount() int // Only implemented by erasure layer
|
||||
|
||||
// Locking operations on object.
|
||||
NewNSLock(bucket string, objects ...string) RWLocker
|
||||
|
||||
@ -131,12 +129,6 @@ type ObjectLayer interface {
|
||||
AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error
|
||||
CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
|
||||
// Healing operations.
|
||||
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
|
||||
HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
|
||||
HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error)
|
||||
HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn HealObjectFn) error
|
||||
|
||||
// Policy operations
|
||||
SetBucketPolicy(context.Context, string, *policy.Policy) error
|
||||
GetBucketPolicy(context.Context, string) (*policy.Policy, error)
|
||||
@ -149,6 +141,14 @@ type ObjectLayer interface {
|
||||
IsTaggingSupported() bool
|
||||
IsCompressionSupported() bool
|
||||
|
||||
SetDriveCounts() []int // list of erasure stripe size for each pool in order.
|
||||
|
||||
// Healing operations.
|
||||
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
|
||||
HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
|
||||
HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error)
|
||||
HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn HealObjectFn) error
|
||||
|
||||
// Backend related metrics
|
||||
GetMetrics(ctx context.Context) (*BackendMetrics, error)
|
||||
|
||||
|
@ -605,15 +605,24 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
||||
if err != nil {
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
|
||||
// if object is encrypted, transition content without decrypting.
|
||||
if opts.TransitionStatus == lifecycle.TransitionPending && isEncrypted {
|
||||
if opts.TransitionStatus == lifecycle.TransitionPending && (isEncrypted || isCompressed) {
|
||||
isEncrypted = false
|
||||
isCompressed = false
|
||||
}
|
||||
var firstPart = opts.PartNumber
|
||||
var skipLen int64
|
||||
|
||||
// Calculate range to read (different for encrypted/compressed objects)
|
||||
switch {
|
||||
case isCompressed:
|
||||
var firstPart int
|
||||
if opts.PartNumber > 0 {
|
||||
// firstPart is an index to Parts slice,
|
||||
// make sure that PartNumber uses the
|
||||
// index value properly.
|
||||
firstPart = opts.PartNumber - 1
|
||||
}
|
||||
|
||||
// If compressed, we start from the beginning of the part.
|
||||
// Read the decompressed size from the meta.json.
|
||||
actualSize, err := oi.GetActualSize()
|
||||
@ -631,7 +640,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
||||
off, decOff, firstPart = getCompressedOffsets(oi, off)
|
||||
decLength = length
|
||||
length = oi.Size - off
|
||||
|
||||
// For negative length we read everything.
|
||||
if decLength < 0 {
|
||||
decLength = actualSize - decOff
|
||||
@ -663,19 +671,19 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
||||
return nil, err
|
||||
}
|
||||
oi.Size = decLength
|
||||
|
||||
}
|
||||
// Decompression reader.
|
||||
s2Reader := s2.NewReader(inputReader)
|
||||
// Apply the skipLen and limit on the decompressed stream.
|
||||
err = s2Reader.Skip(decOff)
|
||||
if err != nil {
|
||||
if decOff > 0 {
|
||||
if err = s2Reader.Skip(decOff); err != nil {
|
||||
// Call the cleanup funcs
|
||||
for i := len(cFns) - 1; i >= 0; i-- {
|
||||
cFns[i]()
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
decReader := io.LimitReader(s2Reader, decLength)
|
||||
if decLength > compReadAheadSize {
|
||||
@ -702,6 +710,8 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
||||
case isEncrypted:
|
||||
var seqNumber uint32
|
||||
var partStart int
|
||||
var skipLen int64
|
||||
|
||||
off, length, skipLen, seqNumber, partStart, err = oi.GetDecryptedRange(rs)
|
||||
if err != nil {
|
||||
return nil, 0, 0, err
|
||||
@ -899,20 +909,30 @@ func CleanMinioInternalMetadataKeys(metadata map[string]string) map[string]strin
|
||||
|
||||
// newS2CompressReader will read data from r, compress it and return the compressed data as a Reader.
|
||||
// Use Close to ensure resources are released on incomplete streams.
|
||||
func newS2CompressReader(r io.Reader) io.ReadCloser {
|
||||
//
|
||||
// input 'on' is always recommended such that this function works
|
||||
// properly, because we do not wish to create an object even if
|
||||
// client closed the stream prematurely.
|
||||
func newS2CompressReader(r io.Reader, on int64) io.ReadCloser {
|
||||
pr, pw := io.Pipe()
|
||||
comp := s2.NewWriter(pw)
|
||||
// Copy input to compressor
|
||||
go func() {
|
||||
_, err := io.Copy(comp, r)
|
||||
cn, err := io.Copy(comp, r)
|
||||
if err != nil {
|
||||
comp.Close()
|
||||
pw.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
if on > 0 && on != cn {
|
||||
// if client didn't sent all data
|
||||
// from the client verify here.
|
||||
comp.Close()
|
||||
pw.CloseWithError(IncompleteBody{})
|
||||
return
|
||||
}
|
||||
// Close the stream.
|
||||
err = comp.Close()
|
||||
if err != nil {
|
||||
if err = comp.Close(); err != nil {
|
||||
pw.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
|
@ -605,7 +605,7 @@ func TestS2CompressReader(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
buf := make([]byte, 100) // make small buffer to ensure multiple reads are required for large case
|
||||
|
||||
r := newS2CompressReader(bytes.NewReader(tt.data))
|
||||
r := newS2CompressReader(bytes.NewReader(tt.data), int64(len(tt.data)))
|
||||
defer r.Close()
|
||||
|
||||
var rdrBuf bytes.Buffer
|
||||
|
@ -162,8 +162,8 @@ func checkPreconditions(ctx context.Context, w http.ResponseWriter, r *http.Requ
|
||||
|
||||
// Check if the part number is correct.
|
||||
if opts.PartNumber > 1 && opts.PartNumber > len(objInfo.Parts) {
|
||||
writeHeaders()
|
||||
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
|
||||
// According to S3 we don't need to set any object information here.
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartNumber), r.URL, guessIsBrowserReq(r))
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -408,11 +408,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
}
|
||||
|
||||
if checkPreconditions(ctx, w, r, oi, opts) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
return checkPreconditions(ctx, w, r, oi, opts)
|
||||
}
|
||||
|
||||
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
|
||||
@ -997,8 +993,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
// No need to compress for remote etcd calls
|
||||
// Pass the decompressed stream to such calls.
|
||||
isDstCompressed := objectAPI.IsCompressionSupported() &&
|
||||
isCompressible(r.Header, srcObject) &&
|
||||
!isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI)
|
||||
isCompressible(r.Header, dstObject) &&
|
||||
!isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) && !cpSrcDstSame
|
||||
if isDstCompressed {
|
||||
compressMetadata = make(map[string]string, 2)
|
||||
// Preserving the compression metadata.
|
||||
@ -1008,14 +1004,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
// avoid copying them in target object.
|
||||
crypto.RemoveInternalEntries(srcInfo.UserDefined)
|
||||
|
||||
s2c := newS2CompressReader(gr)
|
||||
s2c := newS2CompressReader(gr, actualSize)
|
||||
defer s2c.Close()
|
||||
reader = s2c
|
||||
length = -1
|
||||
} else {
|
||||
// Remove the metadata for remote calls.
|
||||
delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression")
|
||||
delete(srcInfo.UserDefined, ReservedMetadataPrefix+"actual-size")
|
||||
reader = gr
|
||||
}
|
||||
|
||||
@ -1215,7 +1208,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
// if encryption is enabled we do not need explicit "REPLACE" metadata to
|
||||
// be enabled as well - this is to allow for key-rotation.
|
||||
if !isDirectiveReplace(r.Header.Get(xhttp.AmzMetadataDirective)) && !isDirectiveReplace(r.Header.Get(xhttp.AmzTagDirective)) &&
|
||||
srcInfo.metadataOnly && !crypto.IsEncrypted(srcInfo.UserDefined) && srcOpts.VersionID == "" && !objectEncryption {
|
||||
srcInfo.metadataOnly && srcOpts.VersionID == "" && !objectEncryption {
|
||||
// If x-amz-metadata-directive is not set to REPLACE then we need
|
||||
// to error out if source and destination are same.
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopyDest), r.URL, guessIsBrowserReq(r))
|
||||
@ -1243,6 +1236,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
// Remove the metadata for remote calls.
|
||||
delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression")
|
||||
delete(srcInfo.UserDefined, ReservedMetadataPrefix+"actual-size")
|
||||
opts := miniogo.PutObjectOptions{
|
||||
UserMetadata: srcInfo.UserDefined,
|
||||
ServerSideEncryption: dstOpts.ServerSideEncryption,
|
||||
@ -1470,7 +1466,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
|
||||
// Set compression metrics.
|
||||
s2c := newS2CompressReader(actualReader)
|
||||
s2c := newS2CompressReader(actualReader, actualSize)
|
||||
defer s2c.Close()
|
||||
reader = s2c
|
||||
size = -1 // Since compressed size is un-predictable.
|
||||
@ -1971,7 +1967,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
_, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"]
|
||||
// Compress only if the compression is enabled during initial multipart.
|
||||
if isCompressed {
|
||||
s2c := newS2CompressReader(gr)
|
||||
s2c := newS2CompressReader(gr, actualPartSize)
|
||||
defer s2c.Close()
|
||||
reader = s2c
|
||||
length = -1
|
||||
@ -2218,7 +2214,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
||||
}
|
||||
|
||||
// Set compression metrics.
|
||||
s2c := newS2CompressReader(actualReader)
|
||||
s2c := newS2CompressReader(actualReader, actualSize)
|
||||
defer s2c.Close()
|
||||
reader = s2c
|
||||
size = -1 // Since compressed size is un-predictable.
|
||||
|
@ -944,7 +944,12 @@ func testAPIGetObjectWithPartNumberHandler(obj ObjectLayer, instanceType, bucket
|
||||
}
|
||||
|
||||
rs := partNumberToRangeSpec(oinfo, partNumber)
|
||||
off, length, err := rs.GetOffsetLength(oinfo.Size)
|
||||
size, err := oinfo.GetActualSize()
|
||||
if err != nil {
|
||||
t.Fatalf("Object: %s Object Index %d: Unexpected err: %v", object, oindex, err)
|
||||
}
|
||||
|
||||
off, length, err := rs.GetOffsetLength(size)
|
||||
if err != nil {
|
||||
t.Fatalf("Object: %s Object Index %d: Unexpected err: %v", object, oindex, err)
|
||||
}
|
||||
@ -955,6 +960,7 @@ func testAPIGetObjectWithPartNumberHandler(obj ObjectLayer, instanceType, bucket
|
||||
readers = append(readers, NewDummyDataGen(p, cumulativeSum))
|
||||
cumulativeSum += p
|
||||
}
|
||||
|
||||
refReader := io.LimitReader(ioutilx.NewSkipReader(io.MultiReader(readers...), off), length)
|
||||
if ok, msg := cmpReaders(refReader, rec.Body); !ok {
|
||||
t.Fatalf("(%s) Object: %s ObjectIndex %d PartNumber: %d --> data mismatch! (msg: %s)", instanceType, oi.objectName, oindex, partNumber, msg)
|
||||
@ -1247,8 +1253,8 @@ func testAPIPutObjectStreamSigV4Handler(obj ObjectLayer, instanceType, bucketNam
|
||||
apiRouter.ServeHTTP(rec, req)
|
||||
// Assert the response code with the expected status.
|
||||
if rec.Code != testCase.expectedRespStatus {
|
||||
t.Errorf("Test %d %s: Expected the response status to be `%d`, but instead found `%d`",
|
||||
i+1, instanceType, testCase.expectedRespStatus, rec.Code)
|
||||
t.Errorf("Test %d %s: Expected the response status to be `%d`, but instead found `%d`: fault case %d",
|
||||
i+1, instanceType, testCase.expectedRespStatus, rec.Code, testCase.fault)
|
||||
}
|
||||
// read the response body.
|
||||
actualContent, err := ioutil.ReadAll(rec.Body)
|
||||
@ -1274,14 +1280,18 @@ func testAPIPutObjectStreamSigV4Handler(obj ObjectLayer, instanceType, bucketNam
|
||||
t.Fatalf("Test %d: %s: ContentEncoding is set to \"%s\" which is unexpected, expected \"%s\"", i+1, instanceType, objInfo.ContentEncoding, expectedContentEncoding)
|
||||
}
|
||||
buffer := new(bytes.Buffer)
|
||||
err = obj.GetObject(context.Background(), testCase.bucketName, testCase.objectName, 0, int64(testCase.dataLen), buffer, objInfo.ETag, opts)
|
||||
r, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.objectName, nil, nil, readLock, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i+1, instanceType, err)
|
||||
}
|
||||
if _, err = io.Copy(buffer, r); err != nil {
|
||||
r.Close()
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i+1, instanceType, err)
|
||||
}
|
||||
r.Close()
|
||||
if !bytes.Equal(testCase.data, buffer.Bytes()) {
|
||||
t.Errorf("Test %d: %s: Data Mismatch: Data fetched back from the uploaded object doesn't match the original one.", i+1, instanceType)
|
||||
}
|
||||
buffer.Reset()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1445,12 +1455,16 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a
|
||||
}
|
||||
if testCase.expectedRespStatus == http.StatusOK {
|
||||
buffer := new(bytes.Buffer)
|
||||
|
||||
// Fetch the object to check whether the content is same as the one uploaded via PutObject.
|
||||
err = obj.GetObject(context.Background(), testCase.bucketName, testCase.objectName, 0, int64(len(bytesData)), buffer, "", opts)
|
||||
gr, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.objectName, nil, nil, readLock, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i+1, instanceType, err)
|
||||
}
|
||||
if _, err = io.Copy(buffer, gr); err != nil {
|
||||
gr.Close()
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i+1, instanceType, err)
|
||||
}
|
||||
gr.Close()
|
||||
if !bytes.Equal(bytesData, buffer.Bytes()) {
|
||||
t.Errorf("Test %d: %s: Data Mismatch: Data fetched back from the uploaded object doesn't match the original one.", i+1, instanceType)
|
||||
}
|
||||
@ -1490,10 +1504,15 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a
|
||||
if testCase.expectedRespStatus == http.StatusOK {
|
||||
buffer := new(bytes.Buffer)
|
||||
// Fetch the object to check whether the content is same as the one uploaded via PutObject.
|
||||
err = obj.GetObject(context.Background(), testCase.bucketName, testCase.objectName, 0, int64(len(bytesData)), buffer, "", opts)
|
||||
gr, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.objectName, nil, nil, readLock, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i+1, instanceType, err)
|
||||
}
|
||||
if _, err = io.Copy(buffer, gr); err != nil {
|
||||
gr.Close()
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i+1, instanceType, err)
|
||||
}
|
||||
gr.Close()
|
||||
if !bytes.Equal(bytesData, buffer.Bytes()) {
|
||||
t.Errorf("Test %d: %s: Data Mismatch: Data fetched back from the uploaded object doesn't match the original one.", i+1, instanceType)
|
||||
}
|
||||
@ -1638,9 +1657,15 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err = obj.GetObject(context.Background(), bucketName, testObject, 0, int64(len(bytesData[0].byteData)), &buf, "", opts); err != nil {
|
||||
r, err := obj.GetObjectNInfo(context.Background(), bucketName, testObject, nil, nil, readLock, ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Test: %s reading completed file failed: <ERROR> %v", instanceType, err)
|
||||
}
|
||||
if _, err = io.Copy(&buf, r); err != nil {
|
||||
r.Close()
|
||||
t.Fatalf("Test %s: Failed to fetch the copied object: <ERROR> %s", instanceType, err)
|
||||
}
|
||||
r.Close()
|
||||
if !bytes.Equal(buf.Bytes(), bytesData[0].byteData) {
|
||||
t.Fatalf("Test: %s returned data is not expected corruption detected:", instanceType)
|
||||
}
|
||||
@ -2035,6 +2060,7 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
copySourceHeader string // data for "X-Amz-Copy-Source" header. Contains the object to be copied in the URL.
|
||||
copyModifiedHeader string // data for "X-Amz-Copy-Source-If-Modified-Since" header
|
||||
copyUnmodifiedHeader string // data for "X-Amz-Copy-Source-If-Unmodified-Since" header
|
||||
copySourceSame bool
|
||||
metadataGarbage bool
|
||||
metadataReplace bool
|
||||
metadataCopy bool
|
||||
@ -2079,6 +2105,7 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
newObjectName: objectName,
|
||||
copySourceHeader: url.QueryEscape(SlashSeparator + bucketName + SlashSeparator + objectName),
|
||||
accessKey: credentials.AccessKey,
|
||||
copySourceSame: true,
|
||||
secretKey: credentials.SecretKey,
|
||||
|
||||
expectedRespStatus: http.StatusBadRequest,
|
||||
@ -2090,6 +2117,7 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
4: {
|
||||
bucketName: bucketName,
|
||||
newObjectName: objectName,
|
||||
copySourceSame: true,
|
||||
copySourceHeader: url.QueryEscape(bucketName + SlashSeparator + objectName),
|
||||
accessKey: credentials.AccessKey,
|
||||
secretKey: credentials.SecretKey,
|
||||
@ -2140,6 +2168,7 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
metadata: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
copySourceSame: true,
|
||||
metadataCopy: true,
|
||||
accessKey: credentials.AccessKey,
|
||||
secretKey: credentials.SecretKey,
|
||||
@ -2306,9 +2335,17 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
apiRouter.ServeHTTP(rec, req)
|
||||
// Assert the response code with the expected status.
|
||||
if rec.Code != testCase.expectedRespStatus {
|
||||
if testCase.copySourceSame {
|
||||
// encryption will rotate creds, so fail only for non-encryption scenario.
|
||||
if GlobalKMS == nil {
|
||||
t.Errorf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i, instanceType, testCase.expectedRespStatus, rec.Code)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
t.Errorf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i, instanceType, testCase.expectedRespStatus, rec.Code)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if rec.Code == http.StatusOK {
|
||||
var cpObjResp CopyObjectResponse
|
||||
if err = xml.Unmarshal(rec.Body.Bytes(), &cpObjResp); err != nil {
|
||||
@ -2319,22 +2356,19 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
// testing whether the copy was successful.
|
||||
// Note that this goes directly to the file system,
|
||||
// so encryption/compression may interfere at some point.
|
||||
|
||||
globalCompressConfigMu.Lock()
|
||||
cfg := globalCompressConfig
|
||||
globalCompressConfigMu.Unlock()
|
||||
if !cfg.Enabled {
|
||||
err = obj.GetObject(context.Background(), testCase.bucketName, testCase.newObjectName, 0, int64(len(bytesData[0].byteData)), buffers[0], "", opts)
|
||||
buffers[0].Reset()
|
||||
r, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.newObjectName, nil, nil, readLock, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i, instanceType, err)
|
||||
t.Fatalf("Test %d: %s reading completed file failed: <ERROR> %v", i, instanceType, err)
|
||||
}
|
||||
if _, err = io.Copy(buffers[0], r); err != nil {
|
||||
r.Close()
|
||||
t.Fatalf("Test %d %s: Failed to fetch the copied object: <ERROR> %s", i, instanceType, err)
|
||||
}
|
||||
r.Close()
|
||||
if !bytes.Equal(bytesData[0].byteData, buffers[0].Bytes()) {
|
||||
t.Errorf("Test %d: %s: Data Mismatch: Data fetched back from the copied object doesn't match the original one.", i, instanceType)
|
||||
}
|
||||
buffers[0].Reset()
|
||||
} else {
|
||||
t.Log("object not validated due to compression")
|
||||
}
|
||||
}
|
||||
|
||||
// Verify response of the V2 signed HTTP request.
|
||||
@ -2379,7 +2413,14 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
// Call the ServeHTTP to execute the handler.
|
||||
apiRouter.ServeHTTP(recV2, reqV2)
|
||||
if recV2.Code != testCase.expectedRespStatus {
|
||||
t.Errorf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i, instanceType, testCase.expectedRespStatus, recV2.Code)
|
||||
if testCase.copySourceSame {
|
||||
// encryption will rotate creds, so fail only for non-encryption scenario.
|
||||
if GlobalKMS == nil {
|
||||
t.Errorf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i, instanceType, testCase.expectedRespStatus, rec.Code)
|
||||
}
|
||||
} else {
|
||||
t.Errorf("Test %d: %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, testCase.expectedRespStatus, rec.Code)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -508,27 +508,31 @@ func testBucketRecreateFails(obj ObjectLayer, instanceType string, t TestErrHand
|
||||
}
|
||||
}
|
||||
|
||||
func execExtended(t *testing.T, fn func(t *testing.T)) {
|
||||
// Exec with default settings...
|
||||
globalCompressConfigMu.Lock()
|
||||
globalCompressConfig.Enabled = false
|
||||
globalCompressConfigMu.Unlock()
|
||||
t.Run("default", func(t *testing.T) {
|
||||
fn(t)
|
||||
})
|
||||
if testing.Short() {
|
||||
return
|
||||
}
|
||||
|
||||
func enableCompression(t *testing.T, encrypt bool) {
|
||||
// Enable compression and exec...
|
||||
globalCompressConfigMu.Lock()
|
||||
globalCompressConfig.Enabled = true
|
||||
globalCompressConfig.MimeTypes = nil
|
||||
globalCompressConfig.Extensions = nil
|
||||
globalCompressConfig.AllowEncrypted = encrypt
|
||||
globalCompressConfigMu.Unlock()
|
||||
if encrypt {
|
||||
globalAutoEncryption = encrypt
|
||||
os.Setenv("MINIO_KMS_MASTER_KEY", "my-minio-key:6368616e676520746869732070617373776f726420746f206120736563726574")
|
||||
defer os.Setenv("MINIO_KMS_MASTER_KEY", "")
|
||||
var err error
|
||||
GlobalKMS, err = crypto.NewKMS(crypto.KMSConfig{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func enableEncrytion(t *testing.T) {
|
||||
// Exec with default settings...
|
||||
globalCompressConfigMu.Lock()
|
||||
globalCompressConfig.Enabled = false
|
||||
globalCompressConfigMu.Unlock()
|
||||
t.Run("compressed", func(t *testing.T) {
|
||||
fn(t)
|
||||
})
|
||||
|
||||
globalAutoEncryption = true
|
||||
os.Setenv("MINIO_KMS_MASTER_KEY", "my-minio-key:6368616e676520746869732070617373776f726420746f206120736563726574")
|
||||
@ -538,25 +542,46 @@ func execExtended(t *testing.T, fn func(t *testing.T)) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("encrypted", func(t *testing.T) {
|
||||
fn(t)
|
||||
})
|
||||
|
||||
// Enable compression of encrypted and exec...
|
||||
globalCompressConfigMu.Lock()
|
||||
globalCompressConfig.AllowEncrypted = true
|
||||
globalCompressConfigMu.Unlock()
|
||||
t.Run("compressed+encrypted", func(t *testing.T) {
|
||||
fn(t)
|
||||
})
|
||||
|
||||
func resetCompressEncryption() {
|
||||
// Reset...
|
||||
globalCompressConfigMu.Lock()
|
||||
globalCompressConfig.Enabled = false
|
||||
globalCompressConfig.AllowEncrypted = false
|
||||
globalCompressConfigMu.Unlock()
|
||||
globalAutoEncryption = false
|
||||
GlobalKMS = nil
|
||||
}
|
||||
|
||||
func execExtended(t *testing.T, fn func(t *testing.T)) {
|
||||
// Exec with default settings...
|
||||
resetCompressEncryption()
|
||||
t.Run("default", func(t *testing.T) {
|
||||
fn(t)
|
||||
})
|
||||
|
||||
if testing.Short() {
|
||||
return
|
||||
}
|
||||
|
||||
t.Run("compressed", func(t *testing.T) {
|
||||
resetCompressEncryption()
|
||||
enableCompression(t, false)
|
||||
fn(t)
|
||||
})
|
||||
|
||||
t.Run("encrypted", func(t *testing.T) {
|
||||
resetCompressEncryption()
|
||||
enableEncrytion(t)
|
||||
fn(t)
|
||||
})
|
||||
|
||||
t.Run("compressed+encrypted", func(t *testing.T) {
|
||||
resetCompressEncryption()
|
||||
enableCompression(t, true)
|
||||
fn(t)
|
||||
})
|
||||
}
|
||||
|
||||
// ExecExtendedObjectLayerTest will execute the tests with combinations of encrypted & compressed.
|
||||
|
@ -17,7 +17,7 @@
|
||||
package cmd
|
||||
|
||||
const (
|
||||
peerRESTVersion = "v11"
|
||||
peerRESTVersion = "v12"
|
||||
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
|
||||
peerRESTPrefix = minioReservedBucketPath + "/peer"
|
||||
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
|
||||
|
@ -782,13 +782,17 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req
|
||||
case serviceStop:
|
||||
globalServiceSignalCh <- signal
|
||||
case serviceReloadDynamic:
|
||||
srvCfg, err := getValidConfig(newObjectLayerFn())
|
||||
objAPI := newObjectLayerFn()
|
||||
if objAPI == nil {
|
||||
s.writeErrorResponse(w, errServerNotInitialized)
|
||||
return
|
||||
}
|
||||
srvCfg, err := getValidConfig(objAPI)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
err = applyDynamicConfig(r.Context(), srvCfg)
|
||||
if err != nil {
|
||||
if err = applyDynamicConfig(r.Context(), objAPI, srvCfg); err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
}
|
||||
return
|
||||
|
@ -452,7 +452,7 @@ func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRES
|
||||
|
||||
prevGlobalServerConfig := globalServerConfig
|
||||
globalServerConfig = newServerConfig()
|
||||
lookupConfigs(globalServerConfig, 0)
|
||||
lookupConfigs(globalServerConfig, nil)
|
||||
|
||||
restClient := newStorageRESTClient(endpoint, false)
|
||||
|
||||
|
@ -1213,9 +1213,9 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
|
||||
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 {
|
||||
// Storing the compression metadata.
|
||||
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
|
||||
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
|
||||
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(actualSize, 10)
|
||||
|
||||
actualReader, err := hash.NewReader(reader, size, "", "", actualSize, globalCLIContext.StrictS3Compat)
|
||||
actualReader, err := hash.NewReader(reader, actualSize, "", "", actualSize, globalCLIContext.StrictS3Compat)
|
||||
if err != nil {
|
||||
writeWebErrorResponse(w, err)
|
||||
return
|
||||
@ -1223,7 +1223,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Set compression metrics.
|
||||
size = -1 // Since compressed size is un-predictable.
|
||||
s2c := newS2CompressReader(actualReader)
|
||||
s2c := newS2CompressReader(actualReader, actualSize)
|
||||
defer s2c.Close()
|
||||
reader = s2c
|
||||
hashReader, err = hash.NewReader(reader, size, "", "", actualSize, globalCLIContext.StrictS3Compat)
|
||||
|
@ -256,12 +256,8 @@ type ErasureBackend struct {
|
||||
Type backendType `json:"backendType,omitempty"`
|
||||
OnlineDisks int `json:"onlineDisks,omitempty"`
|
||||
OfflineDisks int `json:"offlineDisks,omitempty"`
|
||||
// Data disks for currently configured Standard storage class.
|
||||
StandardSCData int `json:"standardSCData,omitempty"`
|
||||
// Parity disks for currently configured Standard storage class.
|
||||
StandardSCParity int `json:"standardSCParity,omitempty"`
|
||||
// Data disks for currently configured Reduced Redundancy storage class.
|
||||
RRSCData int `json:"rrSCData,omitempty"`
|
||||
// Parity disks for currently configured Reduced Redundancy storage class.
|
||||
RRSCParity int `json:"rrSCParity,omitempty"`
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user