Mirror of https://github.com/minio/minio.git (synced 2025-01-23 04:33:15 -05:00)
fix: simplify background heal and trigger heal items early (#9928)
Bonus fix: during the versioning merge, one of the PRs dropped the offline/online disk count fix from #9801; port it correctly over to the master branch from the release branch. Additionally, add versionID support for MRF. Fixes #9910. Fixes #9931.
Parent: 7cea3f7da4
Commit: a38ce29137
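Context for the diff below: the commit threads a version ID through the MRF bookkeeping, so an upload or delete that succeeded with quorum but missed some disks can later be healed for the exact version involved, once a disk of that erasure set reconnects. The following is a minimal, self-contained Go sketch of that idea only; every name here (healSourceKey, pending, reconnectedSet) is illustrative and not MinIO's actual code.

```go
package main

import "fmt"

// healSourceKey identifies one object version that still needs healing.
type healSourceKey struct {
	bucket    string
	object    string
	versionID string // the new piece: remember the exact version that was written/deleted
}

func main() {
	// pending maps each partially-failed operation to the erasure set it belongs to.
	pending := map[healSourceKey]int{}

	// A versioned delete succeeded with quorum, but set 3 had an offline disk.
	pending[healSourceKey{bucket: "photos", object: "cat.png", versionID: "v1"}] = 3

	// Later, a disk in set 3 reconnects: replay everything recorded for that set.
	reconnectedSet := 3
	for k, set := range pending {
		if set == reconnectedSet {
			fmt.Printf("queue heal: %s/%s (version %s)\n", k.bucket, k.object, k.versionID)
			delete(pending, k)
		}
	}
}
```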
@@ -33,6 +33,7 @@ function start_minio_3_node() {
     declare -a ARGS
     export MINIO_ACCESS_KEY=minio
     export MINIO_SECRET_KEY=minio123
+    export MINIO_ERASURE_SET_DRIVE_COUNT=6

     start_port=$(shuf -i 10000-65000 -n 1)
     for i in $(seq 1 3); do
@@ -173,7 +173,7 @@ func (a adminAPIHandlers) GetConfigKVHandler(w http.ResponseWriter, r *http.Requ
     }

     cfg := globalServerConfig
-    if globalSafeMode {
+    if newObjectLayerFn() == nil {
         var err error
         cfg, err = getValidConfig(objectAPI)
         if err != nil {
@@ -718,10 +718,6 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
         }
     }

-    // find number of disks in the setup, ignore any errors here.
-    info, _ := objectAPI.StorageInfo(ctx, false)
-    numDisks := info.Backend.OfflineDisks.Sum() + info.Backend.OnlineDisks.Sum()
-
     healPath := pathJoin(hip.bucket, hip.objPrefix)
     if hip.clientToken == "" && !hip.forceStart && !hip.forceStop {
         nh, exists := globalAllHealState.getHealSequence(healPath)
@@ -764,7 +760,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
             respCh <- hr
         }()
     case hip.clientToken == "":
-        nh := newHealSequence(GlobalContext, hip.bucket, hip.objPrefix, handlers.GetSourceIP(r), numDisks, hip.hs, hip.forceStart)
+        nh := newHealSequence(GlobalContext, hip.bucket, hip.objPrefix, handlers.GetSourceIP(r), hip.hs, hip.forceStart)
         go func() {
             respBytes, apiErr, errMsg := globalAllHealState.LaunchNewHealSequence(nh)
             hr := healResp{respBytes, apiErr, errMsg}
@@ -1409,10 +1405,8 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
         }
     }

-    mode := ""
-    if globalSafeMode {
-        mode = "safe"
-    } else {
+    mode := "safe"
+    if newObjectLayerFn() != nil {
         mode = "online"
     }

@@ -76,9 +76,6 @@ type healSequenceStatus struct {
     FailureDetail string `json:"Detail,omitempty"`
     StartTime time.Time `json:"StartTime"`

-    // disk information
-    NumDisks int `json:"NumDisks"`
-
     // settings for the heal sequence
     HealSettings madmin.HealOpts `json:"Settings"`

@@ -94,8 +91,8 @@ type allHealState struct {
     healSeqMap map[string]*healSequence
 }

-// initHealState - initialize healing apparatus
-func initHealState() *allHealState {
+// newHealState - initialize global heal state management
+func newHealState() *allHealState {
     healState := &allHealState{
         healSeqMap: make(map[string]*healSequence),
     }
@@ -367,7 +364,7 @@ type healSequence struct {
 // NewHealSequence - creates healSettings, assumes bucket and
 // objPrefix are already validated.
 func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
-    numDisks int, hs madmin.HealOpts, forceStart bool) *healSequence {
+    hs madmin.HealOpts, forceStart bool) *healSequence {

     reqInfo := &logger.ReqInfo{RemoteHost: clientAddr, API: "Heal", BucketName: bucket}
     reqInfo.AppendTags("prefix", objPrefix)
@@ -386,7 +383,6 @@ func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
         currentStatus: healSequenceStatus{
             Summary:      healNotStartedStatus,
             HealSettings: hs,
-            NumDisks:     numDisks,
         },
         traverseAndHealDoneCh: make(chan error),
         cancelCtx:             cancel,
@@ -677,11 +673,6 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
 }

 func (h *healSequence) healItemsFromSourceCh() error {
-    bucketsOnly := true // heal buckets only, not objects.
-    if err := h.healItems(bucketsOnly); err != nil {
-        logger.LogIf(h.ctx, err)
-    }
-
     for {
         select {
         case source, ok := <-h.sourceCh:
@@ -716,7 +707,7 @@ func (h *healSequence) healFromSourceCh() {
     h.healItemsFromSourceCh()
 }

-func (h *healSequence) healItems(bucketsOnly bool) error {
+func (h *healSequence) healDiskMeta() error {
     // Start with format healing
     if err := h.healDiskFormat(); err != nil {
         return err
@@ -728,7 +719,11 @@ func (h *healSequence) healItems(bucketsOnly bool) error {
     }

     // Start healing the bucket config prefix.
-    if err := h.healMinioSysMeta(bucketConfigPrefix)(); err != nil {
+    return h.healMinioSysMeta(bucketConfigPrefix)()
+}
+
+func (h *healSequence) healItems(bucketsOnly bool) error {
+    if err := h.healDiskMeta(); err != nil {
         return err
     }

@@ -104,7 +104,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
     }
 }

-func initHealRoutine() *healRoutine {
+func newHealRoutine() *healRoutine {
     return &healRoutine{
         tasks:  make(chan healTask),
         doneCh: make(chan struct{}),
@@ -112,22 +112,22 @@ func initHealRoutine() *healRoutine {

 }

-func startBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
+func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
     // Run the background healer
-    globalBackgroundHealRoutine = initHealRoutine()
+    globalBackgroundHealRoutine = newHealRoutine()
     go globalBackgroundHealRoutine.run(ctx, objAPI)

-    // Launch the background healer sequence to track
-    // background healing operations, ignore errors
-    // errors are handled into offline disks already.
-    info, _ := objAPI.StorageInfo(ctx, false)
-    numDisks := info.Backend.OnlineDisks.Sum() + info.Backend.OfflineDisks.Sum()
-    nh := newBgHealSequence(numDisks)
-    globalBackgroundHealState.LaunchNewHealSequence(nh)
-}
+    nh := newBgHealSequence()
+    // Heal any disk format and metadata early, if possible.
+    if err := nh.healDiskMeta(); err != nil {
+        if newObjectLayerFn() != nil {
+            // log only in situations, when object layer
+            // has fully initialized.
+            logger.LogIf(nh.ctx, err)
+        }
+    }

-func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
-    go startBackgroundHealing(ctx, objAPI)
+    globalBackgroundHealState.LaunchNewHealSequence(nh)
 }
@@ -144,12 +144,20 @@ func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpt
     // Healing succeeded notify the peers to reload format and re-initialize disks.
     // We will not notify peers if healing is not required.
     if err == nil {
-        for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) {
-            if nerr.Err != nil {
-                logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
-                logger.LogIf(ctx, nerr.Err)
+        // Notify servers in background and retry if needed.
+        go func() {
+        retry:
+            for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) {
+                if nerr.Err != nil {
+                    if nerr.Err.Error() == errServerNotInitialized.Error() {
+                        time.Sleep(time.Second)
+                        goto retry
+                    }
+                    logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
+                    logger.LogIf(ctx, nerr.Err)
+                }
             }
-        }
+        }()
     }

     return res, nil
@@ -149,7 +149,7 @@ func readServerConfig(ctx context.Context, objAPI ObjectLayer) (config.Config, e
     if err != nil {
         // Config not found for some reason, allow things to continue
         // by initializing a new fresh config in safe mode.
-        if err == errConfigNotFound && globalSafeMode {
+        if err == errConfigNotFound && newObjectLayerFn() == nil {
             return newServerConfig(), nil
         }
         return nil, err
@@ -668,9 +668,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
     }

     // Check if there is any offline disk and add it to the MRF list
-    for i := 0; i < len(onlineDisks); i++ {
-        if onlineDisks[i] == nil || storageDisks[i] == nil {
-            er.addPartialUpload(bucket, object)
+    for i, disk := range onlineDisks {
+        if disk == nil || storageDisks[i] == nil {
+            er.addPartial(bucket, object, fi.VersionID)
             break
         }
     }
@@ -714,7 +714,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
     // during this upload, send it to the MRF list.
     for i := 0; i < len(onlineDisks); i++ {
         if onlineDisks[i] == nil || storageDisks[i] == nil {
-            er.addPartialUpload(bucket, object)
+            er.addPartial(bucket, object, fi.VersionID)
             break
         }
     }
@@ -897,6 +897,23 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
         }
     }

+    // Check failed deletes across multiple objects
+    for _, version := range versions {
+        // Check if there is any offline disk and add it to the MRF list
+        for _, disk := range storageDisks {
+            if disk == nil {
+                // ignore delete markers for quorum
+                if version.Deleted {
+                    continue
+                }
+                // all other direct versionId references we should
+                // ensure no dangling file is left over.
+                er.addPartial(bucket, version.Name, version.VersionID)
+                break
+            }
+        }
+    }
+
     return dobjects, errs
 }

@@ -935,14 +952,21 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
         return objInfo, toObjectErr(err, bucket, object)
     }

+    for _, disk := range storageDisks {
+        if disk == nil {
+            er.addPartial(bucket, object, opts.VersionID)
+            break
+        }
+    }
+
     return ObjectInfo{Bucket: bucket, Name: object, VersionID: opts.VersionID}, nil
 }

-// Send the successful but partial upload, however ignore
+// Send the successful but partial upload/delete, however ignore
 // if the channel is blocked by other items.
-func (er erasureObjects) addPartialUpload(bucket, key string) {
+func (er erasureObjects) addPartial(bucket, object, versionID string) {
     select {
-    case er.mrfUploadCh <- partialUpload{bucket: bucket, object: key}:
+    case er.mrfOpCh <- partialOperation{bucket: bucket, object: object, versionID: versionID}:
     default:
     }
 }
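The addPartial change above keeps the non-blocking enqueue behaviour of the old addPartialUpload: the write/delete path must never stall on MRF bookkeeping, so a full queue simply drops the entry and leaves the object to the regular background heal sweep. Below is a standalone sketch of that select/default pattern; the names (partialOp, enqueue) are illustrative only, not MinIO's types.

```go
package main

import "fmt"

// partialOp mirrors the idea of partialOperation: enough information
// to re-heal one object version later.
type partialOp struct {
	bucket    string
	object    string
	versionID string
}

// enqueue records a failed-write/delete candidate without ever blocking the caller.
func enqueue(ch chan partialOp, op partialOp) bool {
	select {
	case ch <- op:
		return true
	default:
		// Queue full: drop the entry. Correctness is preserved because
		// the periodic background heal will still find the object.
		return false
	}
}

func main() {
	mrf := make(chan partialOp, 2) // deliberately tiny buffer for the demo
	for i := 0; i < 3; i++ {
		op := partialOp{bucket: "photos", object: fmt.Sprintf("img-%d.png", i), versionID: "v1"}
		fmt.Println(op.object, "queued:", enqueue(mrf, op)) // third send is dropped
	}
}
```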
@@ -92,8 +92,8 @@ type erasureSets struct {
     poolSplunk   *MergeWalkPool
     poolVersions *MergeWalkVersionsPool

     mrfMU         sync.Mutex
-    mrfUploads    map[healSource]int
+    mrfOperations map[healSource]int
 }

 func isEndpointConnected(diskMap map[string]StorageAPI, endpoint string) bool {
@@ -307,7 +307,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
         pool:         NewMergeWalkPool(globalMergeLookupTimeout),
         poolSplunk:   NewMergeWalkPool(globalMergeLookupTimeout),
         poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout),
-        mrfUploads:   make(map[healSource]int),
+        mrfOperations: make(map[healSource]int),
     }

     mutex := newNSLock(globalIsDistErasure)
@@ -351,7 +351,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
             getEndpoints: s.GetEndpoints(i),
             nsMutex:      mutex,
             bp:           bp,
-            mrfUploadCh:  make(chan partialUpload, 10000),
+            mrfOpCh:      make(chan partialOperation, 10000),
         }
     }

@@ -1608,9 +1608,9 @@ func (s *erasureSets) IsReady(_ context.Context) bool {
 // from all underlying er.sets and puts them in a global map which
 // should not have more than 10000 entries.
 func (s *erasureSets) maintainMRFList() {
-    var agg = make(chan partialUpload, 10000)
+    var agg = make(chan partialOperation, 10000)
     for i, er := range s.sets {
-        go func(c <-chan partialUpload, setIndex int) {
+        go func(c <-chan partialOperation, setIndex int) {
             for msg := range c {
                 msg.failedSet = setIndex
                 select {
@@ -1618,19 +1618,20 @@ func (s *erasureSets) maintainMRFList() {
                 default:
                 }
             }
-        }(er.mrfUploadCh, i)
+        }(er.mrfOpCh, i)
     }

-    for fUpload := range agg {
+    for fOp := range agg {
         s.mrfMU.Lock()
-        if len(s.mrfUploads) > 10000 {
+        if len(s.mrfOperations) > 10000 {
             s.mrfMU.Unlock()
             continue
         }
-        s.mrfUploads[healSource{
-            bucket: fUpload.bucket,
-            object: fUpload.object,
-        }] = fUpload.failedSet
+        s.mrfOperations[healSource{
+            bucket:    fOp.bucket,
+            object:    fOp.object,
+            versionID: fOp.versionID,
+        }] = fOp.failedSet
         s.mrfMU.Unlock()
     }
 }
@@ -1656,17 +1657,17 @@ func (s *erasureSets) healMRFRoutine() {
     for e := range s.disksConnectEvent {
         // Get the list of objects related the er.set
         // to which the connected disk belongs.
-        var mrfUploads []healSource
+        var mrfOperations []healSource
         s.mrfMU.Lock()
-        for k, v := range s.mrfUploads {
+        for k, v := range s.mrfOperations {
             if v == e.setIndex {
-                mrfUploads = append(mrfUploads, k)
+                mrfOperations = append(mrfOperations, k)
             }
         }
         s.mrfMU.Unlock()

         // Heal objects
-        for _, u := range mrfUploads {
+        for _, u := range mrfOperations {
             // Send an object to be healed with a timeout
             select {
             case bgSeq.sourceCh <- u:
@@ -1674,7 +1675,7 @@ func (s *erasureSets) healMRFRoutine() {
             }

             s.mrfMU.Lock()
-            delete(s.mrfUploads, u)
+            delete(s.mrfOperations, u)
             s.mrfMU.Unlock()
         }
     }
@@ -2018,18 +2018,27 @@ func (z *erasureZones) PutObjectTags(ctx context.Context, bucket, object string,
     if z.SingleZone() {
         return z.zones[0].PutObjectTags(ctx, bucket, object, tags, opts)
     }

     for _, zone := range z.zones {
         err := zone.PutObjectTags(ctx, bucket, object, tags, opts)
         if err != nil {
-            if isErrBucketNotFound(err) {
+            if isErrObjectNotFound(err) {
                 continue
             }
             return err
         }
         return nil
     }
-    return BucketNotFound{
+    if opts.VersionID != "" {
+        return VersionNotFound{
+            Bucket:    bucket,
+            Object:    object,
+            VersionID: opts.VersionID,
+        }
+    }
+    return ObjectNotFound{
         Bucket: bucket,
+        Object: object,
     }
 }

@@ -2041,15 +2050,23 @@ func (z *erasureZones) DeleteObjectTags(ctx context.Context, bucket, object stri
     for _, zone := range z.zones {
         err := zone.DeleteObjectTags(ctx, bucket, object, opts)
         if err != nil {
-            if isErrBucketNotFound(err) {
+            if isErrObjectNotFound(err) {
                 continue
             }
             return err
         }
         return nil
     }
-    return BucketNotFound{
+    if opts.VersionID != "" {
+        return VersionNotFound{
+            Bucket:    bucket,
+            Object:    object,
+            VersionID: opts.VersionID,
+        }
+    }
+    return ObjectNotFound{
         Bucket: bucket,
+        Object: object,
     }
 }

@@ -2061,14 +2078,22 @@ func (z *erasureZones) GetObjectTags(ctx context.Context, bucket, object string,
     for _, zone := range z.zones {
         tags, err := zone.GetObjectTags(ctx, bucket, object, opts)
         if err != nil {
-            if isErrBucketNotFound(err) {
+            if isErrObjectNotFound(err) {
                 continue
             }
             return tags, err
         }
         return tags, nil
     }
-    return nil, BucketNotFound{
+    if opts.VersionID != "" {
+        return nil, VersionNotFound{
+            Bucket:    bucket,
+            Object:    object,
+            VersionID: opts.VersionID,
+        }
+    }
+    return nil, ObjectNotFound{
         Bucket: bucket,
+        Object: object,
     }
 }

@@ -34,11 +34,12 @@ import (
 // OfflineDisk represents an unavailable disk.
 var OfflineDisk StorageAPI // zero value is nil

-// partialUpload is a successful upload of an object
+// partialOperation is a successful upload/delete of an object
 // but not written in all disks (having quorum)
-type partialUpload struct {
+type partialOperation struct {
     bucket    string
     object    string
+    versionID string
     failedSet int
 }

@@ -62,7 +63,7 @@ type erasureObjects struct {
     // Byte pools used for temporary i/o buffers.
     bp *bpool.BytePoolCap

-    mrfUploadCh chan partialUpload
+    mrfOpCh chan partialOperation
 }

 // NewNSLock - initialize a new namespace RWLocker instance.
@@ -87,21 +88,17 @@ func (d byDiskTotal) Less(i, j int) bool {
 }

 // getDisksInfo - fetch disks info across all other storage API.
-func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) {
+func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []DiskInfo, errs []error, onlineDisks, offlineDisks madmin.BackendDisks) {
     disksInfo = make([]DiskInfo, len(disks))
     onlineDisks = make(madmin.BackendDisks)
     offlineDisks = make(madmin.BackendDisks)

-    for _, disk := range disks {
-        if disk == OfflineDisk {
-            continue
-        }
-        peerAddr := disk.Hostname()
-        if _, ok := offlineDisks[peerAddr]; !ok {
-            offlineDisks[peerAddr] = 0
-        }
-        if _, ok := onlineDisks[peerAddr]; !ok {
-            onlineDisks[peerAddr] = 0
+    for _, ep := range endpoints {
+        if _, ok := offlineDisks[ep]; !ok {
+            offlineDisks[ep] = 0
+        }
+        if _, ok := onlineDisks[ep]; !ok {
+            onlineDisks[ep] = 0
         }
     }

@@ -130,36 +127,12 @@ func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs []
     errs = g.Wait()
     // Wait for the routines.
     for i, diskInfoErr := range errs {
-        if disks[i] == OfflineDisk {
-            continue
-        }
+        ep := endpoints[i]
         if diskInfoErr != nil {
-            offlineDisks[disks[i].Hostname()]++
+            offlineDisks[ep]++
             continue
         }
-        onlineDisks[disks[i].Hostname()]++
-    }
-
-    // Iterate over the passed endpoints arguments and check
-    // if there are still disks missing from the offline/online lists
-    // and update them accordingly.
-    missingOfflineDisks := make(map[string]int)
-    for _, zone := range globalEndpoints {
-        for _, endpoint := range zone.Endpoints {
-            // if local is set and endpoint is not local
-            // we are not interested in remote disks.
-            if local && !endpoint.IsLocal {
-                continue
-            }
-
-            if _, ok := offlineDisks[endpoint.Host]; !ok {
-                missingOfflineDisks[endpoint.Host]++
-            }
-        }
-    }
-    for missingDisk, n := range missingOfflineDisks {
-        onlineDisks[missingDisk] = 0
-        offlineDisks[missingDisk] = n
+        onlineDisks[ep]++
     }

     // Success.
@@ -167,8 +140,8 @@ func getDisksInfo(disks []StorageAPI, local bool) (disksInfo []DiskInfo, errs []
 }

 // Get an aggregated storage info across all disks.
-func getStorageInfo(disks []StorageAPI, local bool) (StorageInfo, []error) {
-    disksInfo, errs, onlineDisks, offlineDisks := getDisksInfo(disks, local)
+func getStorageInfo(disks []StorageAPI, endpoints []string) (StorageInfo, []error) {
+    disksInfo, errs, onlineDisks, offlineDisks := getDisksInfo(disks, endpoints)

     // Sort so that the first element is the smallest.
     sort.Sort(byDiskTotal(disksInfo))
@@ -203,19 +176,23 @@ func getStorageInfo(disks []StorageAPI, local bool) (StorageInfo, []error) {
 // StorageInfo - returns underlying storage statistics.
 func (er erasureObjects) StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) {
     disks := er.getDisks()
+    endpoints := er.getEndpoints()
     if local {
         var localDisks []StorageAPI
-        for _, disk := range disks {
+        var localEndpoints []string
+        for i, disk := range disks {
             if disk != nil {
                 if disk.IsLocal() {
                     // Append this local disk since local flag is true
                     localDisks = append(localDisks, disk)
+                    localEndpoints = append(localEndpoints, endpoints[i])
                 }
             }
         }
         disks = localDisks
+        endpoints = localEndpoints
     }
-    return getStorageInfo(disks, local)
+    return getStorageInfo(disks, endpoints)
 }

 // GetMetrics - is not implemented and shouldn't be called.
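The getDisksInfo/StorageInfo hunks above key the online/offline counters by endpoint instead of by the live disk's hostname, which is what restores correct counts when a disk is down: a nil StorageAPI has no hostname to report, but its endpoint is still known and must be counted. A simplified, self-contained sketch of that counting scheme; backendDisks, countDisks, and the sample endpoints are illustrative stand-ins, not the MinIO types.

```go
package main

import "fmt"

// backendDisks mimics the shape of a per-endpoint disk counter map.
type backendDisks map[string]int

func sum(b backendDisks) int {
	total := 0
	for _, v := range b {
		total += v
	}
	return total
}

// countDisks walks endpoints in lockstep with their probe errors; a nil
// error means the disk answered, anything else counts as offline.
func countDisks(endpoints []string, probeErrs []error) (online, offline backendDisks) {
	online, offline = make(backendDisks), make(backendDisks)
	for i, ep := range endpoints {
		// Seed both maps so every endpoint shows up, even with a zero count.
		if _, ok := online[ep]; !ok {
			online[ep] = 0
		}
		if _, ok := offline[ep]; !ok {
			offline[ep] = 0
		}
		if probeErrs[i] != nil {
			offline[ep]++
			continue
		}
		online[ep]++
	}
	return online, offline
}

func main() {
	endpoints := []string{"http://node1/disk1", "http://node2/disk1", "http://node2/disk2"}
	errs := []error{nil, fmt.Errorf("disk not found"), nil}
	online, offline := countDisks(endpoints, errs)
	fmt.Println("online:", sum(online), "offline:", sum(offline)) // online: 2 offline: 1
}
```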
@@ -1552,9 +1552,5 @@ func (fs *FSObjects) IsReady(_ context.Context) bool {
         return false
     }

-    globalObjLayerMutex.RLock()
-    res := globalObjectAPI != nil && !globalSafeMode
-    globalObjLayerMutex.RUnlock()
-
-    return res
+    return newObjectLayerFn() != nil
 }
@@ -34,7 +34,7 @@ var leaderLockTimeout = newDynamicTimeout(time.Minute, time.Minute)

 // NewBgHealSequence creates a background healing sequence
 // operation which crawls all objects and heal them.
-func newBgHealSequence(numDisks int) *healSequence {
+func newBgHealSequence() *healSequence {

     reqInfo := &logger.ReqInfo{API: "BackgroundHeal"}
     ctx, cancelCtx := context.WithCancel(logger.SetReqInfo(GlobalContext, reqInfo))
@@ -54,7 +54,6 @@ func newBgHealSequence(numDisks int) *healSequence {
         currentStatus: healSequenceStatus{
             Summary:      healNotStartedStatus,
             HealSettings: hs,
-            NumDisks:     numDisks,
         },
         cancelCtx: cancelCtx,
         ctx:       ctx,
@@ -207,6 +207,12 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
         }
     }(txnLk)

+    // Enable healing to heal drives if possible
+    if globalIsErasure {
+        initBackgroundHealing(ctx, newObject)
+        initLocalDisksAutoHeal(ctx, newObject)
+    }
+
     // **** WARNING ****
     // Migrating to encrypted backend should happen before initialization of any
     // sub-systems, make sure that we do not move the above codeblock elsewhere.
@@ -422,9 +428,9 @@ func serverMain(ctx *cli.Context) {
     setMaxResources()

     if globalIsErasure {
-        // Init global heal state
-        globalAllHealState = initHealState()
-        globalBackgroundHealState = initHealState()
+        // New global heal state
+        globalAllHealState = newHealState()
+        globalBackgroundHealState = newHealState()
     }

     // Configure server.
@@ -504,12 +510,6 @@ func serverMain(ctx *cli.Context) {

     newAllSubsystems()

-    // Enable healing to heal drives if possible
-    if globalIsErasure {
-        initBackgroundHealing(GlobalContext, newObject)
-        initLocalDisksAutoHeal(GlobalContext, newObject)
-    }
-
     go startBackgroundOps(GlobalContext, newObject)

     logger.FatalIf(initSafeMode(GlobalContext, newObject), "Unable to initialize server switching into safe-mode")
@@ -407,7 +407,7 @@ func resetGlobalIsErasure() {
 func resetGlobalHealState() {
     // Init global heal state
     if globalAllHealState == nil {
-        globalAllHealState = initHealState()
+        globalAllHealState = newHealState()
     } else {
         globalAllHealState.Lock()
         for _, v := range globalAllHealState.healSeqMap {
@@ -420,7 +420,7 @@ func resetGlobalHealState() {

     // Init background heal state
     if globalBackgroundHealState == nil {
-        globalBackgroundHealState = initHealState()
+        globalBackgroundHealState = newHealState()
     } else {
         globalBackgroundHealState.Lock()
         for _, v := range globalBackgroundHealState.healSeqMap {
|
@ -77,7 +77,6 @@ type HealTaskStatus struct {
|
|||||||
FailureDetail string `json:"detail"`
|
FailureDetail string `json:"detail"`
|
||||||
StartTime time.Time `json:"startTime"`
|
StartTime time.Time `json:"startTime"`
|
||||||
HealSettings HealOpts `json:"settings"`
|
HealSettings HealOpts `json:"settings"`
|
||||||
NumDisks int `json:"numDisks"`
|
|
||||||
|
|
||||||
Items []HealResultItem `json:"items,omitempty"`
|
Items []HealResultItem `json:"items,omitempty"`
|
||||||
}
|
}
|
||||||
|