simplify usage of mutexes and atomic constants (#9501)

This commit is contained in:
Harshavardhana
2020-05-03 22:35:40 -07:00
committed by GitHub
parent fbd15cb7b7
commit 27d716c663
18 changed files with 419 additions and 305 deletions

View File

@@ -60,9 +60,8 @@ const (
)
var (
errHealIdleTimeout = fmt.Errorf("healing results were not consumed for too long")
errHealPushStopNDiscard = fmt.Errorf("heal push stopped due to heal stop signal")
errHealStopSignalled = fmt.Errorf("heal stop signaled")
errHealIdleTimeout = fmt.Errorf("healing results were not consumed for too long")
errHealStopSignalled = fmt.Errorf("heal stop signaled")
errFnHealFromAPIErr = func(ctx context.Context, err error) error {
apiErr := toAPIError(ctx, err)
@@ -73,10 +72,6 @@ var (
// healSequenceStatus - accumulated status of the heal sequence
type healSequenceStatus struct {
// lock to update this structure as it is concurrently
// accessed
updateLock *sync.RWMutex
// summary and detail for failures
Summary healStatusSummary `json:"Summary"`
FailureDetail string `json:"Detail,omitempty"`
@@ -114,11 +109,9 @@ func initHealState() *allHealState {
func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {
// Launch clean-up routine to remove this heal sequence (after
// it ends) from the global state after timeout has elapsed.
ticker := time.NewTicker(time.Minute * 5)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-time.After(time.Minute * 5):
now := UTCNow()
ahs.Lock()
for path, h := range ahs.healSeqMap {
@@ -202,9 +195,7 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) (
existsAndLive := false
he, exists := ahs.getHealSequence(h.path)
if exists {
if !he.hasEnded() || len(he.currentStatus.Items) > 0 {
existsAndLive = true
}
existsAndLive = !he.hasEnded()
}
if existsAndLive {
@@ -277,8 +268,8 @@ func (ahs *allHealState) PopHealStatusJSON(path string,
}
// Take lock to access and update the heal-sequence
h.currentStatus.updateLock.Lock()
defer h.currentStatus.updateLock.Unlock()
h.mutex.Lock()
defer h.mutex.Unlock()
numItems := len(h.currentStatus.Items)
@@ -289,20 +280,18 @@ func (ahs *allHealState) PopHealStatusJSON(path string,
lastResultIndex = h.currentStatus.Items[numItems-1].ResultIndex
}
// After sending status to client, and before relinquishing
// the updateLock, reset Item to nil and record the result
// index sent to the client.
defer func(i int64) {
h.lastSentResultIndex = i
h.currentStatus.Items = nil
}(lastResultIndex)
h.lastSentResultIndex = lastResultIndex
jbytes, err := json.Marshal(h.currentStatus)
if err != nil {
h.currentStatus.Items = nil
logger.LogIf(h.ctx, err)
return nil, ErrInternalError
}
h.currentStatus.Items = nil
return jbytes, ErrNone
}
@@ -352,9 +341,8 @@ type healSequence struct {
// completed
traverseAndHealDoneCh chan error
// channel to signal heal sequence to stop (e.g. from the
// heal-stop API)
stopSignalCh chan struct{}
// canceler to cancel heal sequence.
cancelCtx context.CancelFunc
// the last result index sent to client
lastSentResultIndex int64
@@ -380,12 +368,12 @@ type healSequence struct {
// NewHealSequence - creates healSettings, assumes bucket and
// objPrefix are already validated.
func newHealSequence(bucket, objPrefix, clientAddr string,
func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
numDisks int, hs madmin.HealOpts, forceStart bool) *healSequence {
reqInfo := &logger.ReqInfo{RemoteHost: clientAddr, API: "Heal", BucketName: bucket}
reqInfo.AppendTags("prefix", objPrefix)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
ctx, cancel := context.WithCancel(logger.SetReqInfo(ctx, reqInfo))
return &healSequence{
respCh: make(chan healResult),
@@ -402,10 +390,9 @@ func newHealSequence(bucket, objPrefix, clientAddr string,
Summary: healNotStartedStatus,
HealSettings: hs,
NumDisks: numDisks,
updateLock: &sync.RWMutex{},
},
traverseAndHealDoneCh: make(chan error),
stopSignalCh: make(chan struct{}),
cancelCtx: cancel,
ctx: ctx,
scannedItemsMap: make(map[madmin.HealItemType]int64),
healedItemsMap: make(map[madmin.HealItemType]int64),
@@ -488,7 +475,7 @@ func (h *healSequence) gethealFailedItemsMap() map[string]int64 {
// external signal)
func (h *healSequence) isQuitting() bool {
select {
case <-h.stopSignalCh:
case <-h.ctx.Done():
return true
default:
return false
@@ -497,19 +484,15 @@ func (h *healSequence) isQuitting() bool {
// check if the heal sequence has ended
func (h *healSequence) hasEnded() bool {
h.currentStatus.updateLock.RLock()
summary := h.currentStatus.Summary
h.currentStatus.updateLock.RUnlock()
return summary == healStoppedStatus || summary == healFinishedStatus
h.mutex.RLock()
ended := len(h.currentStatus.Items) == 0 || h.currentStatus.Summary == healStoppedStatus || h.currentStatus.Summary == healFinishedStatus
h.mutex.RUnlock()
return ended
}
// stops the heal sequence - safe to call multiple times.
func (h *healSequence) stop() {
select {
case <-h.stopSignalCh:
default:
close(h.stopSignalCh)
}
h.cancelCtx()
}
// pushHealResultItem - pushes a heal result item for consumption in
@@ -536,29 +519,27 @@ func (h *healSequence) pushHealResultItem(r madmin.HealResultItem) error {
var itemsLen int
for {
h.currentStatus.updateLock.Lock()
h.mutex.Lock()
itemsLen = len(h.currentStatus.Items)
if itemsLen == maxUnconsumedHealResultItems {
// unlock and wait to check again if we can push
h.currentStatus.updateLock.Unlock()
// wait for a second, or quit if an external
// stop signal is received or the
// unconsumedTimer fires.
select {
// Check after a second
case <-time.After(time.Second):
h.mutex.Unlock()
continue
case <-h.stopSignalCh:
case <-h.ctx.Done():
h.mutex.Unlock()
// discard result and return.
return errHealPushStopNDiscard
return errHealStopSignalled
// Timeout if no results consumed for too
// long.
// Timeout if no results consumed for too long.
case <-unconsumedTimer.C:
h.mutex.Unlock()
return errHealIdleTimeout
}
}
break
@@ -575,13 +556,7 @@ func (h *healSequence) pushHealResultItem(r madmin.HealResultItem) error {
h.currentStatus.Items = append(h.currentStatus.Items, r)
// release lock
h.currentStatus.updateLock.Unlock()
// This is a "safe" point for the heal sequence to quit if
// signaled externally.
if h.isQuitting() {
return errHealStopSignalled
}
h.mutex.Unlock()
return nil
}
@@ -595,10 +570,10 @@ func (h *healSequence) pushHealResultItem(r madmin.HealResultItem) error {
// the heal-sequence.
func (h *healSequence) healSequenceStart() {
// Set status as running
h.currentStatus.updateLock.Lock()
h.mutex.Lock()
h.currentStatus.Summary = healRunningStatus
h.currentStatus.StartTime = UTCNow()
h.currentStatus.updateLock.Unlock()
h.mutex.Unlock()
if h.sourceCh == nil {
go h.traverseAndHeal()
@@ -608,25 +583,27 @@ func (h *healSequence) healSequenceStart() {
select {
case err, ok := <-h.traverseAndHealDoneCh:
if !ok {
return
}
h.mutex.Lock()
h.endTime = UTCNow()
h.currentStatus.updateLock.Lock()
defer h.currentStatus.updateLock.Unlock()
// Heal traversal is complete.
if ok {
if err == nil {
// heal traversal succeeded.
h.currentStatus.Summary = healFinishedStatus
} else {
// heal traversal had an error.
h.currentStatus.Summary = healStoppedStatus
h.currentStatus.FailureDetail = err.Error()
} else {
// heal traversal succeeded.
h.currentStatus.Summary = healFinishedStatus
}
case <-h.stopSignalCh:
h.mutex.Unlock()
case <-h.ctx.Done():
h.mutex.Lock()
h.endTime = UTCNow()
h.currentStatus.updateLock.Lock()
h.currentStatus.Summary = healStoppedStatus
h.currentStatus.FailureDetail = errHealStopSignalled.Error()
h.currentStatus.updateLock.Unlock()
h.mutex.Unlock()
// drain traverse channel so the traversal
// go-routine does not leak.
@@ -654,13 +631,20 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
select {
case res := <-h.respCh:
if !h.reportProgress {
// Object might have been deleted, by the time heal
// was attempted, we should ignore this object and
// return success.
if isErrObjectNotFound(res.err) {
return nil
}
h.mutex.Lock()
defer h.mutex.Unlock()
// Progress is not reported in case of background heal processing.
// Instead we increment relevant counter based on the heal result
// for prometheus reporting.
if res.err != nil && !isErrObjectNotFound(res.err) {
if res.err != nil {
for _, d := range res.result.After.Drives {
// For failed items we report the endpoint and drive state
// This will help users take corrective actions for drives
@@ -670,7 +654,9 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
// Only object type reported for successful healing
h.healedItemsMap[res.result.Type]++
}
return nil
// Report caller of any failure
return res.err
}
res.result.Type = healType
if res.err != nil {
@@ -688,10 +674,7 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
return h.pushHealResultItem(res.result)
case <-h.ctx.Done():
return nil
case <-h.traverseAndHealDoneCh:
return nil
}
}
func (h *healSequence) healItemsFromSourceCh() error {
@@ -702,7 +685,10 @@ func (h *healSequence) healItemsFromSourceCh() error {
for {
select {
case source := <-h.sourceCh:
case source, ok := <-h.sourceCh:
if !ok {
return nil
}
var itemType madmin.HealItemType
switch {
case source.path == nopHeal:
@@ -721,8 +707,6 @@ func (h *healSequence) healItemsFromSourceCh() error {
h.scannedItemsMap[itemType]++
h.lastHealActivity = UTCNow()
case <-h.traverseAndHealDoneCh:
return nil
case <-h.ctx.Done():
return nil
}
@@ -763,13 +747,7 @@ func (h *healSequence) healItems(bucketsOnly bool) error {
// two objects.
func (h *healSequence) traverseAndHeal() {
bucketsOnly := false // Heals buckets and objects also.
if err := h.healItems(bucketsOnly); err != nil {
if h.isQuitting() {
err = errHealStopSignalled
}
h.traverseAndHealDoneCh <- err
}
h.traverseAndHealDoneCh <- h.healItems(bucketsOnly)
close(h.traverseAndHealDoneCh)
}