heal: Persist MRF queue to disk during shutdown (#19410)

Anis Eleuch, 2024-08-13 23:26:05 +01:00, committed by GitHub (GPG key ID: B5690EEEBB952194)
parent e7a56f35b9
commit 51b1f41518
10 changed files with 649 additions and 84 deletions


@@ -358,6 +358,9 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
 		globalBackgroundHealState.pushHealLocalDisks(getLocalDisksToHeal()...)
 		go monitorLocalDisksAndHeal(ctx, z)
 	}
+
+	go globalMRFState.startMRFPersistence()
+	go globalMRFState.healRoutine(z)
 }
 
 func getLocalDisksToHeal() (disksToHeal Endpoints) {


@@ -1409,13 +1409,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 	}
 
 	if !opts.Speedtest && len(versions) > 0 {
-		globalMRFState.addPartialOp(partialOperation{
-			bucket:    bucket,
-			object:    object,
-			queued:    time.Now(),
-			versions:  versions,
-			setIndex:  er.setIndex,
-			poolIndex: er.poolIndex,
+		globalMRFState.addPartialOp(PartialOperation{
+			Bucket:    bucket,
+			Object:    object,
+			Queued:    time.Now(),
+			Versions:  versions,
+			SetIndex:  er.setIndex,
+			PoolIndex: er.poolIndex,
 		})
 	}
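A note on the Versions slice queued above: it is the raw concatenation of the 16-byte version UUIDs of the affected versions, which is why the MRF heal routine further down walks it in 16-byte steps (vers := len(u.Versions) / 16). A minimal sketch of that packing, assuming it sits in package cmd; packVersions and unpackVersions are illustrative names, not functions in the codebase:

package cmd

import "github.com/google/uuid"

// Hypothetical helpers (not part of this commit) showing the convention
// behind PartialOperation.Versions: raw 16-byte version UUIDs, concatenated.
func packVersions(ids []uuid.UUID) []byte {
	buf := make([]byte, 0, len(ids)*16)
	for _, id := range ids {
		buf = append(buf, id[:]...)
	}
	return buf
}

func unpackVersions(b []byte) []string {
	out := make([]string, 0, len(b)/16)
	for i := 0; i+16 <= len(b); i += 16 {
		// One version ID per 16 bytes, same slicing healRoutine uses.
		out = append(out, uuid.UUID(b[i:i+16]).String())
	}
	return out
}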


@@ -395,24 +395,16 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
 			// that we have some parts or data blocks missing or corrupted
 			// - attempt a heal to successfully heal them for future calls.
 			if written == partLength {
-				var scan madmin.HealScanMode
-				switch {
-				case errors.Is(err, errFileNotFound):
-					scan = madmin.HealNormalScan
-				case errors.Is(err, errFileCorrupt):
-					scan = madmin.HealDeepScan
-				}
-				switch scan {
-				case madmin.HealNormalScan, madmin.HealDeepScan:
+				if errors.Is(err, errFileNotFound) || errors.Is(err, errFileCorrupt) {
 					healOnce.Do(func() {
-						globalMRFState.addPartialOp(partialOperation{
-							bucket:    bucket,
-							object:    object,
-							versionID: fi.VersionID,
-							queued:    time.Now(),
-							setIndex:  er.setIndex,
-							poolIndex: er.poolIndex,
-							scanMode:  scan,
+						globalMRFState.addPartialOp(PartialOperation{
+							Bucket:     bucket,
+							Object:     object,
+							VersionID:  fi.VersionID,
+							Queued:     time.Now(),
+							SetIndex:   er.setIndex,
+							PoolIndex:  er.poolIndex,
+							BitrotScan: errors.Is(err, errFileCorrupt),
 						})
 					})
 					// Healing is triggered and we have written
@@ -814,13 +806,13 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
 	// additionally do not heal delete markers inline, let them be
 	// healed upon regular heal process.
 	if missingBlocks > 0 && missingBlocks < fi.Erasure.DataBlocks {
-		globalMRFState.addPartialOp(partialOperation{
-			bucket:    fi.Volume,
-			object:    fi.Name,
-			versionID: fi.VersionID,
-			queued:    time.Now(),
-			setIndex:  er.setIndex,
-			poolIndex: er.poolIndex,
+		globalMRFState.addPartialOp(PartialOperation{
+			Bucket:    fi.Volume,
+			Object:    fi.Name,
+			VersionID: fi.VersionID,
+			Queued:    time.Now(),
+			SetIndex:  er.setIndex,
+			PoolIndex: er.poolIndex,
 		})
 	}
@@ -1572,13 +1564,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 				break
 			}
 		} else {
-			globalMRFState.addPartialOp(partialOperation{
-				bucket:    bucket,
-				object:    object,
-				queued:    time.Now(),
-				versions:  versions,
-				setIndex:  er.setIndex,
-				poolIndex: er.poolIndex,
+			globalMRFState.addPartialOp(PartialOperation{
+				Bucket:    bucket,
+				Object:    object,
+				Queued:    time.Now(),
+				Versions:  versions,
+				SetIndex:  er.setIndex,
+				PoolIndex: er.poolIndex,
 			})
 		}
 	}
@@ -2107,11 +2099,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 // Send the successful but partial upload/delete, however ignore
 // if the channel is blocked by other items.
 func (er erasureObjects) addPartial(bucket, object, versionID string) {
-	globalMRFState.addPartialOp(partialOperation{
-		bucket:    bucket,
-		object:    object,
-		versionID: versionID,
-		queued:    time.Now(),
+	globalMRFState.addPartialOp(PartialOperation{
+		Bucket:    bucket,
+		Object:    object,
+		VersionID: versionID,
+		Queued:    time.Now(),
 	})
 }
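The hunks above replace the explicit madmin.HealScanMode field with a single BitrotScan flag: a corrupt part marks the queued operation for a deep (bitrot) scan, a missing part for a normal scan, and the heal routine later maps the flag back to a scan mode. A hedged sketch of the same decision as a standalone helper; scanModeForError is a hypothetical name, assumed to live in package cmd next to errFileNotFound and errFileCorrupt:

package cmd

import (
	"errors"

	"github.com/minio/madmin-go/v3"
)

// scanModeForError is illustrative only (not in the commit): it spells out
// the mapping now carried by PartialOperation.BitrotScan.
func scanModeForError(err error) (queue bool, scan madmin.HealScanMode) {
	switch {
	case errors.Is(err, errFileCorrupt):
		// Corruption needs a deep scan to verify bitrot.
		return true, madmin.HealDeepScan
	case errors.Is(err, errFileNotFound):
		// A missing part only needs a normal scan.
		return true, madmin.HealNormalScan
	default:
		// Other errors are not queued for MRF healing from the read path.
		return false, madmin.HealUnknownScan
	}
}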


@@ -187,19 +187,12 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
 		globalLeaderLock = newSharedLock(GlobalContext, z, "leader.lock")
 	})
 
-	// Enable background operations on
-	//
-	//  - Disk auto healing
-	//  - MRF (most recently failed) healing
-	//  - Background expiration routine for lifecycle policies
+	// Start self healing after the object initialization
+	// so various tasks will be useful
 	bootstrapTrace("initAutoHeal", func() {
 		initAutoHeal(GlobalContext, z)
 	})
 
-	bootstrapTrace("initHealMRF", func() {
-		go globalMRFState.healRoutine(z)
-	})
-
 	// initialize the object layer.
 	defer setObjectLayer(z)


@@ -384,9 +384,7 @@ var (
 	globalBackgroundHealRoutine = newHealRoutine()
 	globalBackgroundHealState   = newHealState(GlobalContext, false)
 
-	globalMRFState = mrfState{
-		opCh: make(chan partialOperation, mrfOpsQueueSize),
-	}
+	globalMRFState = newMRFState()
 
 	// If writes to FS backend should be O_SYNC.
 	globalFSOSync bool


@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
+// Copyright (c) 2015-2024 MinIO, Inc.
 //
 // This file is part of MinIO Object Storage stack
 //
@@ -15,51 +15,203 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+//go:generate msgp -file=$GOFILE
+
 package cmd
 
 import (
 	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/google/uuid"
 	"github.com/minio/madmin-go/v3"
 	"github.com/minio/pkg/v3/wildcard"
+	"github.com/tinylib/msgp/msgp"
 )
 
 const (
 	mrfOpsQueueSize = 100000
 )
 
-// partialOperation is a successful upload/delete of an object
+const (
+	healDir              = ".heal"
+	healMRFDir           = bucketMetaPrefix + SlashSeparator + healDir + SlashSeparator + "mrf"
+	healMRFMetaFormat    = 1
+	healMRFMetaVersionV1 = 1
+)
+
+// PartialOperation is a successful upload/delete of an object
 // but not written in all disks (having quorum)
-type partialOperation struct {
-	bucket              string
-	object              string
-	versionID           string
-	versions            []byte
-	setIndex, poolIndex int
-	queued              time.Time
-	scanMode            madmin.HealScanMode
+type PartialOperation struct {
+	Bucket              string
+	Object              string
+	VersionID           string
+	Versions            []byte
+	SetIndex, PoolIndex int
+	Queued              time.Time
+	BitrotScan          bool
 }
 
 // mrfState sncapsulates all the information
 // related to the global background MRF.
 type mrfState struct {
-	opCh chan partialOperation
+	opCh chan PartialOperation
+
+	closed  int32
+	closing int32
+	wg      sync.WaitGroup
+}
+
+func newMRFState() mrfState {
+	return mrfState{
+		opCh: make(chan PartialOperation, mrfOpsQueueSize),
+	}
 }
 
 // Add a partial S3 operation (put/delete) when one or more disks are offline.
-func (m *mrfState) addPartialOp(op partialOperation) {
+func (m *mrfState) addPartialOp(op PartialOperation) {
 	if m == nil {
 		return
 	}
 
+	if atomic.LoadInt32(&m.closed) == 1 {
+		return
+	}
+
+	m.wg.Add(1)
+	defer m.wg.Done()
+
+	if atomic.LoadInt32(&m.closing) == 1 {
+		return
+	}
+
 	select {
 	case m.opCh <- op:
 	default:
 	}
 }
 
+// Do not accept new MRF operations anymore and start to save
+// the current heal status in one available disk
+func (m *mrfState) shutdown() {
+	atomic.StoreInt32(&m.closing, 1)
+	m.wg.Wait()
+	close(m.opCh)
+	atomic.StoreInt32(&m.closed, 1)
+
+	if len(m.opCh) > 0 {
+		healingLogEvent(context.Background(), "Saving MRF healing data (%d entries)", len(m.opCh))
+	}
+
+	newReader := func() io.ReadCloser {
+		r, w := io.Pipe()
+		go func() {
+			// Initialize MRF meta header.
+			var data [4]byte
+			binary.LittleEndian.PutUint16(data[0:2], healMRFMetaFormat)
+			binary.LittleEndian.PutUint16(data[2:4], healMRFMetaVersionV1)
+			mw := msgp.NewWriter(w)
+			n, err := mw.Write(data[:])
+			if err != nil {
+				w.CloseWithError(err)
+				return
+			}
+			if n != len(data) {
+				w.CloseWithError(io.ErrShortWrite)
+				return
+			}
+			for item := range m.opCh {
+				err = item.EncodeMsg(mw)
+				if err != nil {
+					break
+				}
+			}
+			mw.Flush()
+			w.CloseWithError(err)
+		}()
+		return r
+	}
+
+	globalLocalDrivesMu.RLock()
+	localDrives := cloneDrives(globalLocalDrivesMap)
+	globalLocalDrivesMu.RUnlock()
+
+	for _, localDrive := range localDrives {
+		r := newReader()
+		err := localDrive.CreateFile(context.Background(), "", minioMetaBucket, pathJoin(healMRFDir, "list.bin"), -1, r)
+		r.Close()
+		if err == nil {
+			break
+		}
+	}
+}
+
+func (m *mrfState) startMRFPersistence() {
+	loadMRF := func(rc io.ReadCloser, opCh chan PartialOperation) error {
+		defer rc.Close()
+		var data [4]byte
+		n, err := rc.Read(data[:])
+		if err != nil {
+			return err
+		}
+		if n != len(data) {
+			return errors.New("heal mrf: no data")
+		}
+
+		// Read resync meta header
+		switch binary.LittleEndian.Uint16(data[0:2]) {
+		case healMRFMetaFormat:
+		default:
+			return fmt.Errorf("heal mrf: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
+		}
+		switch binary.LittleEndian.Uint16(data[2:4]) {
+		case healMRFMetaVersionV1:
+		default:
+			return fmt.Errorf("heal mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
+		}
+
+		mr := msgp.NewReader(rc)
+		for {
+			op := PartialOperation{}
+			err = op.DecodeMsg(mr)
+			if err != nil {
+				break
+			}
+			opCh <- op
+		}
+
+		return nil
+	}
+
+	globalLocalDrivesMu.RLock()
+	localDrives := cloneDrives(globalLocalDrivesMap)
+	globalLocalDrivesMu.RUnlock()
+
+	for _, localDrive := range localDrives {
+		if localDrive == nil {
+			continue
+		}
+		rc, err := localDrive.ReadFileStream(context.Background(), minioMetaBucket, pathJoin(healMRFDir, "list.bin"), 0, -1)
+		if err != nil {
+			continue
+		}
+		err = loadMRF(rc, m.opCh)
+		if err != nil {
+			continue
+		}
+		// finally delete the file after processing mrf entries
+		localDrive.Delete(GlobalContext, minioMetaBucket, pathJoin(healMRFDir, "list.bin"), DeleteOptions{})
+		break
+	}
+
+	return
+}
+
 var healSleeper = newDynamicSleeper(5, time.Second, false)
 
 // healRoutine listens to new disks reconnection events and
@@ -78,24 +230,24 @@ func (m *mrfState) healRoutine(z *erasureServerPools) {
 		// We might land at .metacache, .trash, .multipart
 		// no need to heal them skip, only when bucket
 		// is '.minio.sys'
-		if u.bucket == minioMetaBucket {
+		if u.Bucket == minioMetaBucket {
 			// No MRF needed for temporary objects
-			if wildcard.Match("buckets/*/.metacache/*", u.object) {
+			if wildcard.Match("buckets/*/.metacache/*", u.Object) {
 				continue
 			}
-			if wildcard.Match("tmp/*", u.object) {
+			if wildcard.Match("tmp/*", u.Object) {
 				continue
 			}
-			if wildcard.Match("multipart/*", u.object) {
+			if wildcard.Match("multipart/*", u.Object) {
 				continue
 			}
-			if wildcard.Match("tmp-old/*", u.object) {
+			if wildcard.Match("tmp-old/*", u.Object) {
 				continue
 			}
 		}
 
 		now := time.Now()
-		if now.Sub(u.queued) < time.Second {
+		if now.Sub(u.Queued) < time.Second {
 			// let recently failed networks to reconnect
 			// making MRF wait for 1s before retrying,
 			// i.e 4 reconnect attempts.
@@ -106,21 +258,22 @@ func (m *mrfState) healRoutine(z *erasureServerPools) {
 		wait := healSleeper.Timer(context.Background())
 
 		scan := madmin.HealNormalScan
-		if u.scanMode != 0 {
-			scan = u.scanMode
+		if u.BitrotScan {
+			scan = madmin.HealDeepScan
 		}
-		if u.object == "" {
-			healBucket(u.bucket, scan)
+
+		if u.Object == "" {
+			healBucket(u.Bucket, scan)
 		} else {
-			if len(u.versions) > 0 {
-				vers := len(u.versions) / 16
+			if len(u.Versions) > 0 {
+				vers := len(u.Versions) / 16
 				if vers > 0 {
 					for i := 0; i < vers; i++ {
-						healObject(u.bucket, u.object, uuid.UUID(u.versions[16*i:]).String(), scan)
+						healObject(u.Bucket, u.Object, uuid.UUID(u.Versions[16*i:]).String(), scan)
 					}
 				}
 			} else {
-				healObject(u.bucket, u.object, u.versionID, scan)
+				healObject(u.Bucket, u.Object, u.VersionID, scan)
 			}
 		}

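For reference, the queue persisted by shutdown and reloaded by startMRFPersistence is a plain binary file (list.bin under the heal MRF prefix of the .minio.sys bucket): a 4-byte header holding two little-endian uint16 values (healMRFMetaFormat, then healMRFMetaVersionV1), followed by msgp-encoded PartialOperation records until the stream ends. A minimal sketch of a reader for that layout, assuming a plain io.Reader instead of MinIO's storage API; readMRFQueue is an illustrative name, not part of the commit:

package cmd

import (
	"encoding/binary"
	"fmt"
	"io"

	"github.com/tinylib/msgp/msgp"
)

// readMRFQueue is a hypothetical standalone reader for the list.bin layout
// written by (*mrfState).shutdown: 4-byte header, then PartialOperation
// records until the stream runs out.
func readMRFQueue(r io.Reader) ([]PartialOperation, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	if f := binary.LittleEndian.Uint16(hdr[0:2]); f != healMRFMetaFormat {
		return nil, fmt.Errorf("heal mrf: unknown format: %d", f)
	}
	if v := binary.LittleEndian.Uint16(hdr[2:4]); v != healMRFMetaVersionV1 {
		return nil, fmt.Errorf("heal mrf: unknown version: %d", v)
	}

	var ops []PartialOperation
	mr := msgp.NewReader(r)
	for {
		var op PartialOperation
		if err := op.DecodeMsg(mr); err != nil {
			// The writer does not terminate the stream explicitly, so a
			// decode error (typically EOF) ends the loop, as in loadMRF.
			return ops, nil
		}
		ops = append(ops, op)
	}
}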
cmd/mrf_gen.go (new file, 285 lines)

@@ -0,0 +1,285 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *PartialOperation) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Bucket":
z.Bucket, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Bucket")
return
}
case "Object":
z.Object, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Object")
return
}
case "VersionID":
z.VersionID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "VersionID")
return
}
case "Versions":
z.Versions, err = dc.ReadBytes(z.Versions)
if err != nil {
err = msgp.WrapError(err, "Versions")
return
}
case "SetIndex":
z.SetIndex, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "SetIndex")
return
}
case "PoolIndex":
z.PoolIndex, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "PoolIndex")
return
}
case "Queued":
z.Queued, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "Queued")
return
}
case "BitrotScan":
z.BitrotScan, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "BitrotScan")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *PartialOperation) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 8
// write "Bucket"
err = en.Append(0x88, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
if err != nil {
return
}
err = en.WriteString(z.Bucket)
if err != nil {
err = msgp.WrapError(err, "Bucket")
return
}
// write "Object"
err = en.Append(0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74)
if err != nil {
return
}
err = en.WriteString(z.Object)
if err != nil {
err = msgp.WrapError(err, "Object")
return
}
// write "VersionID"
err = en.Append(0xa9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x44)
if err != nil {
return
}
err = en.WriteString(z.VersionID)
if err != nil {
err = msgp.WrapError(err, "VersionID")
return
}
// write "Versions"
err = en.Append(0xa8, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73)
if err != nil {
return
}
err = en.WriteBytes(z.Versions)
if err != nil {
err = msgp.WrapError(err, "Versions")
return
}
// write "SetIndex"
err = en.Append(0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78)
if err != nil {
return
}
err = en.WriteInt(z.SetIndex)
if err != nil {
err = msgp.WrapError(err, "SetIndex")
return
}
// write "PoolIndex"
err = en.Append(0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78)
if err != nil {
return
}
err = en.WriteInt(z.PoolIndex)
if err != nil {
err = msgp.WrapError(err, "PoolIndex")
return
}
// write "Queued"
err = en.Append(0xa6, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteTime(z.Queued)
if err != nil {
err = msgp.WrapError(err, "Queued")
return
}
// write "BitrotScan"
err = en.Append(0xaa, 0x42, 0x69, 0x74, 0x72, 0x6f, 0x74, 0x53, 0x63, 0x61, 0x6e)
if err != nil {
return
}
err = en.WriteBool(z.BitrotScan)
if err != nil {
err = msgp.WrapError(err, "BitrotScan")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *PartialOperation) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 8
// string "Bucket"
o = append(o, 0x88, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
o = msgp.AppendString(o, z.Bucket)
// string "Object"
o = append(o, 0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74)
o = msgp.AppendString(o, z.Object)
// string "VersionID"
o = append(o, 0xa9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x44)
o = msgp.AppendString(o, z.VersionID)
// string "Versions"
o = append(o, 0xa8, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73)
o = msgp.AppendBytes(o, z.Versions)
// string "SetIndex"
o = append(o, 0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78)
o = msgp.AppendInt(o, z.SetIndex)
// string "PoolIndex"
o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78)
o = msgp.AppendInt(o, z.PoolIndex)
// string "Queued"
o = append(o, 0xa6, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64)
o = msgp.AppendTime(o, z.Queued)
// string "BitrotScan"
o = append(o, 0xaa, 0x42, 0x69, 0x74, 0x72, 0x6f, 0x74, 0x53, 0x63, 0x61, 0x6e)
o = msgp.AppendBool(o, z.BitrotScan)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *PartialOperation) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Bucket":
z.Bucket, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Bucket")
return
}
case "Object":
z.Object, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Object")
return
}
case "VersionID":
z.VersionID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "VersionID")
return
}
case "Versions":
z.Versions, bts, err = msgp.ReadBytesBytes(bts, z.Versions)
if err != nil {
err = msgp.WrapError(err, "Versions")
return
}
case "SetIndex":
z.SetIndex, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "SetIndex")
return
}
case "PoolIndex":
z.PoolIndex, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "PoolIndex")
return
}
case "Queued":
z.Queued, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Queued")
return
}
case "BitrotScan":
z.BitrotScan, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "BitrotScan")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *PartialOperation) Msgsize() (s int) {
s = 1 + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 10 + msgp.StringPrefixSize + len(z.VersionID) + 9 + msgp.BytesPrefixSize + len(z.Versions) + 9 + msgp.IntSize + 10 + msgp.IntSize + 7 + msgp.TimeSize + 11 + msgp.BoolSize
return
}

cmd/mrf_gen_test.go (new file, 123 lines)

@@ -0,0 +1,123 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalPartialOperation(t *testing.T) {
v := PartialOperation{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgPartialOperation(b *testing.B) {
v := PartialOperation{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgPartialOperation(b *testing.B) {
v := PartialOperation{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalPartialOperation(b *testing.B) {
v := PartialOperation{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodePartialOperation(t *testing.T) {
v := PartialOperation{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodePartialOperation Msgsize() is inaccurate")
}
vn := PartialOperation{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodePartialOperation(b *testing.B) {
v := PartialOperation{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodePartialOperation(b *testing.B) {
v := PartialOperation{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}


@@ -258,9 +258,9 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu
 	for bktName, count := range bucketsMap {
 		if count < quorum {
 			// Queue a bucket heal task
-			globalMRFState.addPartialOp(partialOperation{
-				bucket: bktName,
-				queued: time.Now(),
+			globalMRFState.addPartialOp(PartialOperation{
+				Bucket: bktName,
+				Queued: time.Now(),
 			})
 		}
 	}
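Note the convention this hunk relies on: a PartialOperation with an empty Object is treated by healRoutine as a whole-bucket heal (the healBucket branch), so queueing only Bucket and Queued is enough. A small hypothetical usage sketch, assuming package cmd; queueBucketHeal and queueObjectHeal do not exist in the codebase:

package cmd

import "time"

// Illustrative wrappers only, not part of the commit.
func queueBucketHeal(bucket string) {
	globalMRFState.addPartialOp(PartialOperation{
		Bucket: bucket, // empty Object => healRoutine calls healBucket
		Queued: time.Now(),
	})
}

func queueObjectHeal(bucket, object, versionID string) {
	globalMRFState.addPartialOp(PartialOperation{
		Bucket:    bucket,
		Object:    object, // non-empty Object => healRoutine calls healObject
		VersionID: versionID,
		Queued:    time.Now(),
	})
}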


@@ -23,11 +23,26 @@ import (
 	"net/http"
 	"os"
 	"strings"
+	"time"
 
 	"github.com/coreos/go-systemd/v22/daemon"
 	"github.com/minio/minio/internal/logger"
 )
 
+func shutdownHealMRFWithTimeout() {
+	const shutdownTimeout = time.Minute
+
+	finished := make(chan struct{})
+	go func() {
+		globalMRFState.shutdown()
+		close(finished)
+	}()
+	select {
+	case <-time.After(shutdownTimeout):
+	case <-finished:
+	}
+}
+
 func handleSignals() {
 	// Custom exit function
 	exit := func(success bool) {
@@ -50,6 +65,9 @@ func handleSignals() {
 	}
 
 	stopProcess := func() bool {
+		shutdownHealMRFWithTimeout() // this can take time sometimes, it needs to be executed
+		// before stopping s3 operations
+
 		// send signal to various go-routines that they need to quit.
 		cancelGlobalContext()