mirror of https://github.com/minio/minio.git
do not panic on rebalance during server restarts (#19563)
This PR makes a feasible approach to handle all the scenarios that we must face to avoid returning "panic." Instead, we must return "errServerNotInitialized" when a bucketMetadataSys.Get() is called, allowing the caller to retry their operation and wait. Bonus fix the way data-usage-cache stores the object. Instead of storing usage-cache.bin with the bucket as `.minio.sys/buckets`, the `buckets` must be relative to the bucket `.minio.sys` as part of the object name. Otherwise, there is no way to decommission entries at `.minio.sys/buckets` and their final erasure set positions. A bucket must never have a `/` in it. Adds code to read() from existing data-usage.bin upon upgrade.
This commit is contained in:
parent
6bfff7532e
commit
95c65f4e8f
|
@ -263,6 +263,7 @@ const (
|
|||
ErrInvalidResourceName
|
||||
ErrInvalidLifecycleQueryParameter
|
||||
ErrServerNotInitialized
|
||||
ErrBucketMetadataNotInitialized
|
||||
ErrRequestTimedout
|
||||
ErrClientDisconnected
|
||||
ErrTooManyRequests
|
||||
|
@ -1295,7 +1296,12 @@ var errorCodes = errorCodeMap{
|
|||
},
|
||||
ErrServerNotInitialized: {
|
||||
Code: "XMinioServerNotInitialized",
|
||||
Description: "Server not initialized, please try again.",
|
||||
Description: "Server not initialized yet, please try again.",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
ErrBucketMetadataNotInitialized: {
|
||||
Code: "XMinioBucketMetadataNotInitialized",
|
||||
Description: "Bucket metadata not initialized yet, please try again.",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
ErrMalformedJSON: {
|
||||
|
@ -2211,6 +2217,10 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
|
|||
apiErr = ErrInvalidMaxParts
|
||||
case ioutil.ErrOverread:
|
||||
apiErr = ErrExcessData
|
||||
case errServerNotInitialized:
|
||||
apiErr = ErrServerNotInitialized
|
||||
case errBucketMetadataNotInitialized:
|
||||
apiErr = ErrBucketMetadataNotInitialized
|
||||
}
|
||||
|
||||
// Compression errors
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -46,6 +46,7 @@ type BucketMetadataSys struct {
|
|||
objAPI ObjectLayer
|
||||
|
||||
sync.RWMutex
|
||||
initialized bool
|
||||
metadataMap map[string]BucketMetadata
|
||||
}
|
||||
|
||||
|
@ -433,6 +434,8 @@ func (sys *BucketMetadataSys) GetConfigFromDisk(ctx context.Context, bucket stri
|
|||
return loadBucketMetadata(ctx, objAPI, bucket)
|
||||
}
|
||||
|
||||
var errBucketMetadataNotInitialized = errors.New("bucket metadata not initialized yet")
|
||||
|
||||
// GetConfig returns a specific configuration from the bucket metadata.
|
||||
// The returned object may not be modified.
|
||||
// reloaded will be true if metadata refreshed from disk
|
||||
|
@ -454,6 +457,10 @@ func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (met
|
|||
}
|
||||
meta, err = loadBucketMetadata(ctx, objAPI, bucket)
|
||||
if err != nil {
|
||||
if !sys.Initialized() {
|
||||
// bucket metadata not yet initialized
|
||||
return newBucketMetadata(bucket), reloaded, errBucketMetadataNotInitialized
|
||||
}
|
||||
return meta, reloaded, err
|
||||
}
|
||||
sys.Lock()
|
||||
|
@ -498,9 +505,10 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
|
|||
}
|
||||
|
||||
errs := g.Wait()
|
||||
for _, err := range errs {
|
||||
for index, err := range errs {
|
||||
if err != nil {
|
||||
internalLogIf(ctx, err, logger.WarningKind)
|
||||
internalLogOnceIf(ctx, fmt.Errorf("Unable to load bucket metadata, will be retried: %w", err),
|
||||
"load-bucket-metadata-"+buckets[index].Name, logger.WarningKind)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -583,6 +591,14 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa
|
|||
}
|
||||
}
|
||||
|
||||
// Initialized indicates if bucket metadata sys is initialized atleast once.
|
||||
func (sys *BucketMetadataSys) Initialized() bool {
|
||||
sys.RLock()
|
||||
defer sys.RUnlock()
|
||||
|
||||
return sys.initialized
|
||||
}
|
||||
|
||||
// Loads bucket metadata for all buckets into BucketMetadataSys.
|
||||
func (sys *BucketMetadataSys) init(ctx context.Context, buckets []BucketInfo) {
|
||||
count := 100 // load 100 bucket metadata at a time.
|
||||
|
@ -596,6 +612,10 @@ func (sys *BucketMetadataSys) init(ctx context.Context, buckets []BucketInfo) {
|
|||
buckets = buckets[count:]
|
||||
}
|
||||
|
||||
sys.Lock()
|
||||
sys.initialized = true
|
||||
sys.Unlock()
|
||||
|
||||
if globalIsDistErasure {
|
||||
go sys.refreshBucketsMetadataLoop(ctx, failedBuckets)
|
||||
}
|
||||
|
|
|
@ -44,7 +44,6 @@ func (sys *BucketObjectLockSys) Get(bucketName string) (r objectlock.Retention,
|
|||
if errors.Is(err, errInvalidArgument) {
|
||||
return r, err
|
||||
}
|
||||
logger.CriticalIf(context.Background(), err)
|
||||
return r, err
|
||||
}
|
||||
return config.ToRetention(), nil
|
||||
|
|
|
@ -81,13 +81,10 @@ const (
|
|||
// gets replication config associated to a given bucket name.
|
||||
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
|
||||
rCfg, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
|
||||
if err != nil {
|
||||
if errors.Is(err, BucketReplicationConfigNotFound{Bucket: bucketName}) || errors.Is(err, errInvalidArgument) {
|
||||
return rCfg, err
|
||||
}
|
||||
logger.CriticalIf(ctx, err)
|
||||
if err != nil && !errors.Is(err, BucketReplicationConfigNotFound{Bucket: bucketName}) {
|
||||
return rCfg, err
|
||||
}
|
||||
return rCfg, err
|
||||
return rCfg, nil
|
||||
}
|
||||
|
||||
// validateReplicationDestination returns error if replication destination bucket missing or not configured
|
||||
|
@ -261,10 +258,16 @@ func mustReplicate(ctx context.Context, bucket, object string, mopts mustReplica
|
|||
if mopts.replicationRequest { // incoming replication request on target cluster
|
||||
return
|
||||
}
|
||||
|
||||
cfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
replLogOnceIf(ctx, err, bucket)
|
||||
return
|
||||
}
|
||||
if cfg == nil {
|
||||
return
|
||||
}
|
||||
|
||||
opts := replication.ObjectOpts{
|
||||
Name: object,
|
||||
SSEC: crypto.SSEC.IsEncrypted(mopts.meta),
|
||||
|
@ -312,6 +315,7 @@ var standardHeaders = []string{
|
|||
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
|
||||
c, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil || c == nil {
|
||||
replLogOnceIf(ctx, err, bucket)
|
||||
return false
|
||||
}
|
||||
for _, obj := range objects {
|
||||
|
@ -331,6 +335,7 @@ func isStandardHeader(matchHeaderKey string) bool {
|
|||
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, delOpts ObjectOptions, gerr error) (dsc ReplicateDecision) {
|
||||
rcfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil || rcfg == nil {
|
||||
replLogOnceIf(ctx, err, bucket)
|
||||
return
|
||||
}
|
||||
// If incoming request is a replication request, it does not need to be re-replicated.
|
||||
|
@ -2231,6 +2236,8 @@ func getProxyTargets(ctx context.Context, bucket, object string, opts ObjectOpti
|
|||
}
|
||||
cfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil || cfg == nil {
|
||||
replLogOnceIf(ctx, err, bucket)
|
||||
|
||||
return &madmin.BucketTargets{}
|
||||
}
|
||||
topts := replication.ObjectOpts{Name: object}
|
||||
|
@ -3124,7 +3131,7 @@ func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationR
|
|||
func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (chan madmin.DiffInfo, error) {
|
||||
cfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
replLogIf(ctx, err)
|
||||
replLogOnceIf(ctx, err, bucket)
|
||||
return nil, err
|
||||
}
|
||||
tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
|
||||
|
@ -3217,7 +3224,11 @@ func QueueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, ret
|
|||
if oi.ModTime.IsZero() {
|
||||
return
|
||||
}
|
||||
rcfg, _ := getReplicationConfig(ctx, bucket)
|
||||
rcfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
replLogOnceIf(ctx, err, bucket)
|
||||
return
|
||||
}
|
||||
tgts, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
|
||||
queueReplicationHeal(ctx, bucket, oi, replicationConfig{
|
||||
Config: rcfg,
|
||||
|
|
|
@ -82,7 +82,7 @@ func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r
|
|||
}, r.URL)
|
||||
return
|
||||
}
|
||||
if _, err := getReplicationConfig(ctx, bucket); err == nil && v.Suspended() {
|
||||
if rc, _ := getReplicationConfig(ctx, bucket); rc != nil && v.Suspended() {
|
||||
writeErrorResponse(ctx, w, APIError{
|
||||
Code: "InvalidBucketState",
|
||||
Description: "A replication configuration is present on this bucket, bucket wide versioning cannot be suspended.",
|
||||
|
|
|
@ -36,7 +36,9 @@ import (
|
|||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
"github.com/minio/minio/internal/bucket/object/lock"
|
||||
objectlock "github.com/minio/minio/internal/bucket/object/lock"
|
||||
"github.com/minio/minio/internal/bucket/replication"
|
||||
"github.com/minio/minio/internal/bucket/versioning"
|
||||
"github.com/minio/minio/internal/color"
|
||||
"github.com/minio/minio/internal/config/heal"
|
||||
"github.com/minio/minio/internal/event"
|
||||
|
@ -952,10 +954,32 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
|
|||
}
|
||||
|
||||
versionID := oi.VersionID
|
||||
vcfg, _ := globalBucketVersioningSys.Get(i.bucket)
|
||||
rCfg, _ := globalBucketObjectLockSys.Get(i.bucket)
|
||||
replcfg, _ := getReplicationConfig(ctx, i.bucket)
|
||||
lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, rCfg, replcfg, oi)
|
||||
|
||||
var vc *versioning.Versioning
|
||||
var lr objectlock.Retention
|
||||
var rcfg *replication.Config
|
||||
if i.bucket != minioMetaBucket {
|
||||
vc, err = globalBucketVersioningSys.Get(i.bucket)
|
||||
if err != nil {
|
||||
scannerLogOnceIf(ctx, err, i.bucket)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if bucket is object locked.
|
||||
lr, err = globalBucketObjectLockSys.Get(i.bucket)
|
||||
if err != nil {
|
||||
scannerLogOnceIf(ctx, err, i.bucket)
|
||||
return
|
||||
}
|
||||
|
||||
rcfg, err = getReplicationConfig(ctx, i.bucket)
|
||||
if err != nil {
|
||||
scannerLogOnceIf(ctx, err, i.bucket)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, lr, rcfg, oi)
|
||||
if i.debug {
|
||||
if versionID != "" {
|
||||
console.Debugf(applyActionsLogPrefix+" lifecycle: %q (version-id=%s), Initial scan: %v\n", i.objectPath(), versionID, lcEvt.Action)
|
||||
|
@ -973,7 +997,7 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
|
|||
size = 0
|
||||
case lifecycle.DeleteAction:
|
||||
// On a non-versioned bucket, DeleteObject removes the only version permanently.
|
||||
if !vcfg.PrefixEnabled(oi.Name) {
|
||||
if !vc.PrefixEnabled(oi.Name) {
|
||||
size = 0
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -36,7 +35,6 @@ import (
|
|||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
"github.com/valyala/bytebufferpool"
|
||||
)
|
||||
|
@ -1005,11 +1003,23 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string)
|
|||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, ObjectOptions{NoLock: true})
|
||||
r, err := store.GetObjectNInfo(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, name), nil, http.Header{}, ObjectOptions{NoLock: true})
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case ObjectNotFound, BucketNotFound:
|
||||
return false, nil
|
||||
r, err = store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, ObjectOptions{NoLock: true})
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case ObjectNotFound, BucketNotFound:
|
||||
return false, nil
|
||||
case InsufficientReadQuorum, StorageErr:
|
||||
return true, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
err = d.deserialize(r)
|
||||
r.Close()
|
||||
return err != nil, nil
|
||||
case InsufficientReadQuorum, StorageErr:
|
||||
return true, nil
|
||||
}
|
||||
|
@ -1070,24 +1080,11 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
|
|||
}
|
||||
|
||||
save := func(name string, timeout time.Duration) error {
|
||||
hr, err := hash.NewReader(ctx, bytes.NewReader(buf.Bytes()), int64(buf.Len()), "", "", int64(buf.Len()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Abandon if more than a minute, so we don't hold up scanner.
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
_, err = store.PutObject(ctx,
|
||||
dataUsageBucket,
|
||||
name,
|
||||
NewPutObjReader(hr),
|
||||
ObjectOptions{NoLock: true})
|
||||
if isErrBucketNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
return saveConfig(ctx, store, pathJoin(bucketMetaPrefix, name), buf.Bytes())
|
||||
}
|
||||
defer save(name+".bkp", 5*time.Second) // Keep a backup as well
|
||||
|
||||
|
|
|
@ -1833,9 +1833,18 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
|
|||
var replcfg *replication.Config
|
||||
if opts.Expiration.Expire {
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, _ = globalLifecycleSys.Get(bucket)
|
||||
rcfg, _ = globalBucketObjectLockSys.Get(bucket)
|
||||
replcfg, _ = getReplicationConfig(ctx, bucket)
|
||||
lc, err = globalLifecycleSys.Get(bucket)
|
||||
if err != nil && !errors.Is(err, BucketLifecycleNotFound{Bucket: bucket}) {
|
||||
return objInfo, err
|
||||
}
|
||||
rcfg, err = globalBucketObjectLockSys.Get(bucket)
|
||||
if err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
replcfg, err = getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
}
|
||||
|
||||
// expiration attempted on a bucket with no lifecycle
|
||||
|
|
|
@ -31,6 +31,10 @@ import (
|
|||
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
objectlock "github.com/minio/minio/internal/bucket/object/lock"
|
||||
"github.com/minio/minio/internal/bucket/replication"
|
||||
"github.com/minio/minio/internal/bucket/versioning"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/v2/console"
|
||||
|
@ -754,14 +758,33 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||
return err
|
||||
}
|
||||
|
||||
vc, _ := globalBucketVersioningSys.Get(bi.Name)
|
||||
var vc *versioning.Versioning
|
||||
var lc *lifecycle.Lifecycle
|
||||
var lr objectlock.Retention
|
||||
var rcfg *replication.Config
|
||||
if bi.Name != minioMetaBucket {
|
||||
vc, err = globalBucketVersioningSys.Get(bi.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, _ := globalLifecycleSys.Get(bi.Name)
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, err = globalLifecycleSys.Get(bi.Name)
|
||||
if err != nil && !errors.Is(err, BucketLifecycleNotFound{Bucket: bi.Name}) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if bucket is object locked.
|
||||
lr, _ := globalBucketObjectLockSys.Get(bi.Name)
|
||||
rcfg, _ := getReplicationConfig(ctx, bi.Name)
|
||||
// Check if bucket is object locked.
|
||||
lr, err = globalBucketObjectLockSys.Get(bi.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rcfg, err = getReplicationConfig(ctx, bi.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for setIdx, set := range pool.sets {
|
||||
set := set
|
||||
|
@ -1088,14 +1111,33 @@ func (z *erasureServerPools) checkAfterDecom(ctx context.Context, idx int) error
|
|||
pool := z.serverPools[idx]
|
||||
for _, set := range pool.sets {
|
||||
for _, bi := range buckets {
|
||||
vc, _ := globalBucketVersioningSys.Get(bi.Name)
|
||||
var vc *versioning.Versioning
|
||||
var lc *lifecycle.Lifecycle
|
||||
var lr objectlock.Retention
|
||||
var rcfg *replication.Config
|
||||
if bi.Name != minioMetaBucket {
|
||||
vc, err = globalBucketVersioningSys.Get(bi.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, _ := globalLifecycleSys.Get(bi.Name)
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, err = globalLifecycleSys.Get(bi.Name)
|
||||
if err != nil && !errors.Is(err, BucketLifecycleNotFound{Bucket: bi.Name}) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if bucket is object locked.
|
||||
lr, _ := globalBucketObjectLockSys.Get(bi.Name)
|
||||
rcfg, _ := getReplicationConfig(ctx, bi.Name)
|
||||
// Check if bucket is object locked.
|
||||
lr, err = globalBucketObjectLockSys.Get(bi.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rcfg, err = getReplicationConfig(ctx, bi.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
|
||||
if lc == nil {
|
||||
|
@ -1118,7 +1160,7 @@ func (z *erasureServerPools) checkAfterDecom(ctx context.Context, idx int) error
|
|||
}
|
||||
|
||||
var versionsFound int
|
||||
err := set.listObjectsToDecommission(ctx, bi, func(entry metaCacheEntry) {
|
||||
if err = set.listObjectsToDecommission(ctx, bi, func(entry metaCacheEntry) {
|
||||
if !entry.isObject() {
|
||||
return
|
||||
}
|
||||
|
@ -1146,8 +1188,7 @@ func (z *erasureServerPools) checkAfterDecom(ctx context.Context, idx int) error
|
|||
|
||||
versionsFound++
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,10 @@ import (
|
|||
"github.com/dustin/go-humanize"
|
||||
"github.com/lithammer/shortuuid/v4"
|
||||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
objectlock "github.com/minio/minio/internal/bucket/object/lock"
|
||||
"github.com/minio/minio/internal/bucket/replication"
|
||||
"github.com/minio/minio/internal/bucket/versioning"
|
||||
"github.com/minio/minio/internal/hash"
|
||||
xioutil "github.com/minio/minio/internal/ioutil"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
|
@ -448,9 +452,11 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
|
|||
}
|
||||
|
||||
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
|
||||
err = z.rebalanceBucket(ctx, bucket, poolIdx)
|
||||
if err != nil {
|
||||
if err = z.rebalanceBucket(ctx, bucket, poolIdx); err != nil {
|
||||
stopFn(err)
|
||||
if errors.Is(err, errServerNotInitialized) || errors.Is(err, errBucketMetadataNotInitialized) {
|
||||
continue
|
||||
}
|
||||
rebalanceLogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
|
@ -521,14 +527,36 @@ func (set *erasureObjects) listObjectsToRebalance(ctx context.Context, bucketNam
|
|||
}
|
||||
|
||||
// rebalanceBucket rebalances objects under bucket in poolIdx pool
|
||||
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error {
|
||||
func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) (err error) {
|
||||
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
|
||||
vc, _ := globalBucketVersioningSys.Get(bucket)
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, _ := globalLifecycleSys.Get(bucket)
|
||||
// Check if bucket is object locked.
|
||||
lr, _ := globalBucketObjectLockSys.Get(bucket)
|
||||
rcfg, _ := getReplicationConfig(ctx, bucket)
|
||||
|
||||
var vc *versioning.Versioning
|
||||
var lc *lifecycle.Lifecycle
|
||||
var lr objectlock.Retention
|
||||
var rcfg *replication.Config
|
||||
if bucket != minioMetaBucket {
|
||||
vc, err = globalBucketVersioningSys.Get(bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, err = globalLifecycleSys.Get(bucket)
|
||||
if err != nil && !errors.Is(err, BucketLifecycleNotFound{Bucket: bucket}) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if bucket is object locked.
|
||||
lr, err = globalBucketObjectLockSys.Get(bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rcfg, err = getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
pool := z.serverPools[poolIdx]
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package cmd
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
|
@ -213,14 +214,35 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
|||
continue
|
||||
}
|
||||
|
||||
vc, _ := globalBucketVersioningSys.Get(bucket)
|
||||
vc, err := globalBucketVersioningSys.Get(bucket)
|
||||
if err != nil {
|
||||
retErr = err
|
||||
healingLogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
lc, _ := globalLifecycleSys.Get(bucket)
|
||||
lc, err := globalLifecycleSys.Get(bucket)
|
||||
if err != nil && !errors.Is(err, BucketLifecycleNotFound{Bucket: bucket}) {
|
||||
retErr = err
|
||||
healingLogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if bucket is object locked.
|
||||
lr, _ := globalBucketObjectLockSys.Get(bucket)
|
||||
rcfg, _ := getReplicationConfig(ctx, bucket)
|
||||
lr, err := globalBucketObjectLockSys.Get(bucket)
|
||||
if err != nil {
|
||||
retErr = err
|
||||
healingLogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
rcfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
retErr = err
|
||||
healingLogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if serverDebugLog {
|
||||
console.Debugf(color.Green("healDrive:")+" healing bucket %s content on %s erasure set\n",
|
||||
|
@ -442,7 +464,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
|||
bucket: actualBucket,
|
||||
}
|
||||
|
||||
err := listPathRaw(ctx, listPathRawOptions{
|
||||
err = listPathRaw(ctx, listPathRawOptions{
|
||||
disks: disks,
|
||||
fallbackDisks: fallbackDisks,
|
||||
bucket: actualBucket,
|
||||
|
|
|
@ -556,15 +556,23 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
|
|||
if !proxy.Proxy { // apply lifecycle rules only for local requests
|
||||
// Automatically remove the object/version if an expiry lifecycle rule can be applied
|
||||
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
|
||||
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
|
||||
replcfg, _ := getReplicationConfig(ctx, bucket)
|
||||
rcfg, err := globalBucketObjectLockSys.Get(bucket)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
replcfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
event := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, objInfo)
|
||||
if event.Action.Delete() {
|
||||
// apply whatever the expiry rule is.
|
||||
applyExpiryRule(event, lcEventSrc_s3GetObject, objInfo)
|
||||
if !event.Action.DeleteRestored() {
|
||||
// If the ILM action is not on restored object return error.
|
||||
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchKey), r.URL)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -728,7 +736,7 @@ func (api objectAPIHandlers) getObjectAttributesHandler(ctx context.Context, obj
|
|||
}
|
||||
|
||||
if _, err = DecryptObjectInfo(&objInfo, r); err != nil {
|
||||
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1077,8 +1085,16 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
|
|||
if !proxy.Proxy { // apply lifecycle rules only locally not for proxied requests
|
||||
// Automatically remove the object/version if an expiry lifecycle rule can be applied
|
||||
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
|
||||
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
|
||||
replcfg, _ := getReplicationConfig(ctx, bucket)
|
||||
rcfg, err := globalBucketObjectLockSys.Get(bucket)
|
||||
if err != nil {
|
||||
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
|
||||
return
|
||||
}
|
||||
replcfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
|
||||
return
|
||||
}
|
||||
event := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, objInfo)
|
||||
if event.Action.Delete() {
|
||||
// apply whatever the expiry rule is.
|
||||
|
|
|
@ -108,6 +108,16 @@ func reliableMkdirAll(dirPath string, mode os.FileMode, baseDir string) (err err
|
|||
// Retry only for the first retryable error.
|
||||
if osIsNotExist(err) && i == 0 {
|
||||
i++
|
||||
// Determine if os.NotExist error is because of
|
||||
// baseDir's parent being present, retry it once such
|
||||
// that the MkdirAll is retried once for the parent
|
||||
// of dirPath.
|
||||
// Because it is worth a retry to skip a different
|
||||
// baseDir which is slightly higher up the depth.
|
||||
nbaseDir := path.Dir(baseDir)
|
||||
if baseDir != "" && nbaseDir != "" && nbaseDir != SlashSeparator {
|
||||
baseDir = nbaseDir
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue