Enhancements to daily-sweeper routine to reduce CPU load (#8209)

- ListObjectsHeal should list only objects
  which need healing, not the entire namespace.
- DeleteObjects() to be used to delete 1000s of
  objects in bulk instead of serially.
This commit is contained in:
Harshavardhana 2019-09-10 12:08:44 -07:00 committed by kannappanr
parent 432cb38dbd
commit e12f52e2c6
3 changed files with 29 additions and 31 deletions

View File

@ -139,22 +139,24 @@ func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
if err != nil {
continue
}
var objects []string
for _, obj := range res.Objects {
// Find the action that need to be executed
action := l.ComputeAction(obj.Name, obj.ModTime)
switch action {
case lifecycle.DeleteAction:
objAPI.DeleteObject(ctx, bucket.Name, obj.Name)
objects = append(objects, obj.Name)
default:
// Nothing
// Do nothing, for now.
}
}
// Deletes a list of objects.
objAPI.DeleteObjects(ctx, bucket.Name, objects)
if !res.IsTruncated {
// We are done here, proceed to next bucket.
break
} else {
marker = res.NextMarker
}
marker = res.NextMarker
}
}

View File

@ -91,6 +91,7 @@ func sweepRound(ctx context.Context, objAPI ObjectLayer) error {
if err != nil {
continue
}
for _, obj := range res.Objects {
for _, l := range copyDailySweepListeners() {
l <- pathJoin(bucket.Name, obj.Name)
@ -133,7 +134,6 @@ func dailySweeper() {
// Start with random sleep time, so as to avoid "synchronous checks" between servers
time.Sleep(time.Duration(rand.Float64() * float64(time.Hour)))
// Perform a sweep round each month
for {
if time.Since(lastSweepTime) < 30*24*time.Hour {
time.Sleep(time.Hour)
@ -147,13 +147,12 @@ func dailySweeper() {
// instance doing the sweep round
case OperationTimedOut:
lastSweepTime = time.Now()
default:
continue
}
logger.LogIf(ctx, err)
time.Sleep(time.Minute)
continue
}
} else {
lastSweepTime = time.Now()
}
}
}

View File

@ -823,7 +823,7 @@ func (f *FileInfoCh) Push(fi FileInfo) {
// Calculate least entry across multiple FileInfo channels, additionally
// returns a boolean to indicate if the caller needs to call again.
func leastEntry(entriesCh []FileInfoCh, readQuorum int) (FileInfo, bool) {
func leastEntry(entriesCh []FileInfoCh, totalDrives int, heal bool) (FileInfo, bool) {
var entriesValid = make([]bool, len(entriesCh))
var entries = make([]FileInfo, len(entriesCh))
for i := range entriesCh {
@ -879,27 +879,30 @@ func leastEntry(entriesCh []FileInfoCh, readQuorum int) (FileInfo, bool) {
entriesCh[i].Push(entries[i])
}
if readQuorum < 0 {
return lentry, isTruncated
}
quorum := lentry.Quorum
if quorum == 0 {
quorum = readQuorum
quorum = totalDrives / 2
}
if heal {
// When healing is enabled, we should
// list only objects which need healing.
if leastEntryCount != totalDrives {
return lentry, isTruncated
}
} else {
if leastEntryCount >= quorum {
return lentry, isTruncated
}
return leastEntry(entriesCh, readQuorum)
}
return leastEntry(entriesCh, totalDrives, heal)
}
// mergeEntriesCh - merges FileInfo channel to entries upto maxKeys.
func mergeEntriesCh(entriesCh []FileInfoCh, maxKeys int, readQuorum int) (entries FilesInfo) {
func mergeEntriesCh(entriesCh []FileInfoCh, maxKeys int, totalDrives int, heal bool) (entries FilesInfo) {
var i = 0
for {
fi, valid := leastEntry(entriesCh, readQuorum)
fi, valid := leastEntry(entriesCh, totalDrives, heal)
if !valid {
break
}
@ -948,7 +951,6 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
recursive := true
entryChs := s.startMergeWalks(context.Background(), bucket, prefix, "", recursive, endWalkCh)
readQuorum := s.drivesPerSet / 2
var objInfos []ObjectInfo
var eof bool
var prevPrefix string
@ -957,7 +959,7 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
if len(objInfos) == maxKeys {
break
}
result, ok := leastEntry(entryChs, readQuorum)
result, ok := leastEntry(entryChs, s.drivesPerSet, false)
if !ok {
eof = true
break
@ -1087,12 +1089,7 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi
entryChs = s.startMergeWalks(context.Background(), bucket, prefix, marker, recursive, endWalkCh)
}
readQuorum := s.drivesPerSet / 2
if heal {
readQuorum = -1
}
entries := mergeEntriesCh(entryChs, maxKeys, readQuorum)
entries := mergeEntriesCh(entryChs, maxKeys, s.drivesPerSet, heal)
if len(entries.Files) == 0 {
return loi, nil
}