diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index bd17a5e1f..c2a73d07e 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -358,6 +358,9 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) { globalBackgroundHealState.pushHealLocalDisks(getLocalDisksToHeal()...) go monitorLocalDisksAndHeal(ctx, z) } + + go globalMRFState.startMRFPersistence() + go globalMRFState.healRoutine(z) } func getLocalDisksToHeal() (disksToHeal Endpoints) { diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 72a402b23..26f9fea42 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -1409,13 +1409,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str } if !opts.Speedtest && len(versions) > 0 { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - object: object, - queued: time.Now(), - versions: versions, - setIndex: er.setIndex, - poolIndex: er.poolIndex, + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bucket, + Object: object, + Queued: time.Now(), + Versions: versions, + SetIndex: er.setIndex, + PoolIndex: er.poolIndex, }) } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 2be683d52..500644e85 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -395,24 +395,16 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje // that we have some parts or data blocks missing or corrupted // - attempt a heal to successfully heal them for future calls. if written == partLength { - var scan madmin.HealScanMode - switch { - case errors.Is(err, errFileNotFound): - scan = madmin.HealNormalScan - case errors.Is(err, errFileCorrupt): - scan = madmin.HealDeepScan - } - switch scan { - case madmin.HealNormalScan, madmin.HealDeepScan: + if errors.Is(err, errFileNotFound) || errors.Is(err, errFileCorrupt) { healOnce.Do(func() { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - object: object, - versionID: fi.VersionID, - queued: time.Now(), - setIndex: er.setIndex, - poolIndex: er.poolIndex, - scanMode: scan, + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bucket, + Object: object, + VersionID: fi.VersionID, + Queued: time.Now(), + SetIndex: er.setIndex, + PoolIndex: er.poolIndex, + BitrotScan: errors.Is(err, errFileCorrupt), }) }) // Healing is triggered and we have written @@ -814,13 +806,13 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s // additionally do not heal delete markers inline, let them be // healed upon regular heal process. if missingBlocks > 0 && missingBlocks < fi.Erasure.DataBlocks { - globalMRFState.addPartialOp(partialOperation{ - bucket: fi.Volume, - object: fi.Name, - versionID: fi.VersionID, - queued: time.Now(), - setIndex: er.setIndex, - poolIndex: er.poolIndex, + globalMRFState.addPartialOp(PartialOperation{ + Bucket: fi.Volume, + Object: fi.Name, + VersionID: fi.VersionID, + Queued: time.Now(), + SetIndex: er.setIndex, + PoolIndex: er.poolIndex, }) } @@ -1572,13 +1564,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st break } } else { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - object: object, - queued: time.Now(), - versions: versions, - setIndex: er.setIndex, - poolIndex: er.poolIndex, + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bucket, + Object: object, + Queued: time.Now(), + Versions: versions, + SetIndex: er.setIndex, + PoolIndex: er.poolIndex, }) } } @@ -2107,11 +2099,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string // Send the successful but partial upload/delete, however ignore // if the channel is blocked by other items. func (er erasureObjects) addPartial(bucket, object, versionID string) { - globalMRFState.addPartialOp(partialOperation{ - bucket: bucket, - object: object, - versionID: versionID, - queued: time.Now(), + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bucket, + Object: object, + VersionID: versionID, + Queued: time.Now(), }) } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 55dc5f890..e4287a958 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -187,19 +187,12 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ globalLeaderLock = newSharedLock(GlobalContext, z, "leader.lock") }) - // Enable background operations on - // - // - Disk auto healing - // - MRF (most recently failed) healing - // - Background expiration routine for lifecycle policies + // Start self healing after the object initialization + // so various tasks will be useful bootstrapTrace("initAutoHeal", func() { initAutoHeal(GlobalContext, z) }) - bootstrapTrace("initHealMRF", func() { - go globalMRFState.healRoutine(z) - }) - // initialize the object layer. defer setObjectLayer(z) diff --git a/cmd/globals.go b/cmd/globals.go index e5bea1fba..4f18c6b39 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -384,9 +384,7 @@ var ( globalBackgroundHealRoutine = newHealRoutine() globalBackgroundHealState = newHealState(GlobalContext, false) - globalMRFState = mrfState{ - opCh: make(chan partialOperation, mrfOpsQueueSize), - } + globalMRFState = newMRFState() // If writes to FS backend should be O_SYNC. globalFSOSync bool diff --git a/cmd/mrf.go b/cmd/mrf.go index 27ec966fa..f97583ad9 100644 --- a/cmd/mrf.go +++ b/cmd/mrf.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2024 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -15,51 +15,203 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//go:generate msgp -file=$GOFILE + package cmd import ( "context" + "encoding/binary" + "errors" + "fmt" + "io" + "sync" + "sync/atomic" "time" "github.com/google/uuid" "github.com/minio/madmin-go/v3" "github.com/minio/pkg/v3/wildcard" + "github.com/tinylib/msgp/msgp" ) const ( mrfOpsQueueSize = 100000 ) -// partialOperation is a successful upload/delete of an object +const ( + healDir = ".heal" + healMRFDir = bucketMetaPrefix + SlashSeparator + healDir + SlashSeparator + "mrf" + healMRFMetaFormat = 1 + healMRFMetaVersionV1 = 1 +) + +// PartialOperation is a successful upload/delete of an object // but not written in all disks (having quorum) -type partialOperation struct { - bucket string - object string - versionID string - versions []byte - setIndex, poolIndex int - queued time.Time - scanMode madmin.HealScanMode +type PartialOperation struct { + Bucket string + Object string + VersionID string + Versions []byte + SetIndex, PoolIndex int + Queued time.Time + BitrotScan bool } // mrfState sncapsulates all the information // related to the global background MRF. type mrfState struct { - opCh chan partialOperation + opCh chan PartialOperation + + closed int32 + closing int32 + wg sync.WaitGroup +} + +func newMRFState() mrfState { + return mrfState{ + opCh: make(chan PartialOperation, mrfOpsQueueSize), + } } // Add a partial S3 operation (put/delete) when one or more disks are offline. -func (m *mrfState) addPartialOp(op partialOperation) { +func (m *mrfState) addPartialOp(op PartialOperation) { if m == nil { return } + if atomic.LoadInt32(&m.closed) == 1 { + return + } + + m.wg.Add(1) + defer m.wg.Done() + + if atomic.LoadInt32(&m.closing) == 1 { + return + } + select { case m.opCh <- op: default: } } +// Do not accept new MRF operations anymore and start to save +// the current heal status in one available disk +func (m *mrfState) shutdown() { + atomic.StoreInt32(&m.closing, 1) + m.wg.Wait() + close(m.opCh) + atomic.StoreInt32(&m.closed, 1) + + if len(m.opCh) > 0 { + healingLogEvent(context.Background(), "Saving MRF healing data (%d entries)", len(m.opCh)) + } + + newReader := func() io.ReadCloser { + r, w := io.Pipe() + go func() { + // Initialize MRF meta header. + var data [4]byte + binary.LittleEndian.PutUint16(data[0:2], healMRFMetaFormat) + binary.LittleEndian.PutUint16(data[2:4], healMRFMetaVersionV1) + mw := msgp.NewWriter(w) + n, err := mw.Write(data[:]) + if err != nil { + w.CloseWithError(err) + return + } + if n != len(data) { + w.CloseWithError(io.ErrShortWrite) + return + } + for item := range m.opCh { + err = item.EncodeMsg(mw) + if err != nil { + break + } + } + mw.Flush() + w.CloseWithError(err) + }() + return r + } + + globalLocalDrivesMu.RLock() + localDrives := cloneDrives(globalLocalDrivesMap) + globalLocalDrivesMu.RUnlock() + + for _, localDrive := range localDrives { + r := newReader() + err := localDrive.CreateFile(context.Background(), "", minioMetaBucket, pathJoin(healMRFDir, "list.bin"), -1, r) + r.Close() + if err == nil { + break + } + } +} + +func (m *mrfState) startMRFPersistence() { + loadMRF := func(rc io.ReadCloser, opCh chan PartialOperation) error { + defer rc.Close() + var data [4]byte + n, err := rc.Read(data[:]) + if err != nil { + return err + } + if n != len(data) { + return errors.New("heal mrf: no data") + } + // Read resync meta header + switch binary.LittleEndian.Uint16(data[0:2]) { + case healMRFMetaFormat: + default: + return fmt.Errorf("heal mrf: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) + } + switch binary.LittleEndian.Uint16(data[2:4]) { + case healMRFMetaVersionV1: + default: + return fmt.Errorf("heal mrf: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) + } + + mr := msgp.NewReader(rc) + for { + op := PartialOperation{} + err = op.DecodeMsg(mr) + if err != nil { + break + } + opCh <- op + } + + return nil + } + + globalLocalDrivesMu.RLock() + localDrives := cloneDrives(globalLocalDrivesMap) + globalLocalDrivesMu.RUnlock() + + for _, localDrive := range localDrives { + if localDrive == nil { + continue + } + rc, err := localDrive.ReadFileStream(context.Background(), minioMetaBucket, pathJoin(healMRFDir, "list.bin"), 0, -1) + if err != nil { + continue + } + err = loadMRF(rc, m.opCh) + if err != nil { + continue + } + // finally delete the file after processing mrf entries + localDrive.Delete(GlobalContext, minioMetaBucket, pathJoin(healMRFDir, "list.bin"), DeleteOptions{}) + break + } + + return +} + var healSleeper = newDynamicSleeper(5, time.Second, false) // healRoutine listens to new disks reconnection events and @@ -78,24 +230,24 @@ func (m *mrfState) healRoutine(z *erasureServerPools) { // We might land at .metacache, .trash, .multipart // no need to heal them skip, only when bucket // is '.minio.sys' - if u.bucket == minioMetaBucket { + if u.Bucket == minioMetaBucket { // No MRF needed for temporary objects - if wildcard.Match("buckets/*/.metacache/*", u.object) { + if wildcard.Match("buckets/*/.metacache/*", u.Object) { continue } - if wildcard.Match("tmp/*", u.object) { + if wildcard.Match("tmp/*", u.Object) { continue } - if wildcard.Match("multipart/*", u.object) { + if wildcard.Match("multipart/*", u.Object) { continue } - if wildcard.Match("tmp-old/*", u.object) { + if wildcard.Match("tmp-old/*", u.Object) { continue } } now := time.Now() - if now.Sub(u.queued) < time.Second { + if now.Sub(u.Queued) < time.Second { // let recently failed networks to reconnect // making MRF wait for 1s before retrying, // i.e 4 reconnect attempts. @@ -106,21 +258,22 @@ func (m *mrfState) healRoutine(z *erasureServerPools) { wait := healSleeper.Timer(context.Background()) scan := madmin.HealNormalScan - if u.scanMode != 0 { - scan = u.scanMode + if u.BitrotScan { + scan = madmin.HealDeepScan } - if u.object == "" { - healBucket(u.bucket, scan) + + if u.Object == "" { + healBucket(u.Bucket, scan) } else { - if len(u.versions) > 0 { - vers := len(u.versions) / 16 + if len(u.Versions) > 0 { + vers := len(u.Versions) / 16 if vers > 0 { for i := 0; i < vers; i++ { - healObject(u.bucket, u.object, uuid.UUID(u.versions[16*i:]).String(), scan) + healObject(u.Bucket, u.Object, uuid.UUID(u.Versions[16*i:]).String(), scan) } } } else { - healObject(u.bucket, u.object, u.versionID, scan) + healObject(u.Bucket, u.Object, u.VersionID, scan) } } diff --git a/cmd/mrf_gen.go b/cmd/mrf_gen.go new file mode 100644 index 000000000..b2ec14465 --- /dev/null +++ b/cmd/mrf_gen.go @@ -0,0 +1,285 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *PartialOperation) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Object": + z.Object, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "VersionID": + z.VersionID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + case "Versions": + z.Versions, err = dc.ReadBytes(z.Versions) + if err != nil { + err = msgp.WrapError(err, "Versions") + return + } + case "SetIndex": + z.SetIndex, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "SetIndex") + return + } + case "PoolIndex": + z.PoolIndex, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "PoolIndex") + return + } + case "Queued": + z.Queued, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "Queued") + return + } + case "BitrotScan": + z.BitrotScan, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "BitrotScan") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *PartialOperation) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 8 + // write "Bucket" + err = en.Append(0x88, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "Object" + err = en.Append(0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Object) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + // write "VersionID" + err = en.Append(0xa9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x44) + if err != nil { + return + } + err = en.WriteString(z.VersionID) + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + // write "Versions" + err = en.Append(0xa8, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + err = en.WriteBytes(z.Versions) + if err != nil { + err = msgp.WrapError(err, "Versions") + return + } + // write "SetIndex" + err = en.Append(0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78) + if err != nil { + return + } + err = en.WriteInt(z.SetIndex) + if err != nil { + err = msgp.WrapError(err, "SetIndex") + return + } + // write "PoolIndex" + err = en.Append(0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78) + if err != nil { + return + } + err = en.WriteInt(z.PoolIndex) + if err != nil { + err = msgp.WrapError(err, "PoolIndex") + return + } + // write "Queued" + err = en.Append(0xa6, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteTime(z.Queued) + if err != nil { + err = msgp.WrapError(err, "Queued") + return + } + // write "BitrotScan" + err = en.Append(0xaa, 0x42, 0x69, 0x74, 0x72, 0x6f, 0x74, 0x53, 0x63, 0x61, 0x6e) + if err != nil { + return + } + err = en.WriteBool(z.BitrotScan) + if err != nil { + err = msgp.WrapError(err, "BitrotScan") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *PartialOperation) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 8 + // string "Bucket" + o = append(o, 0x88, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "Object" + o = append(o, 0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74) + o = msgp.AppendString(o, z.Object) + // string "VersionID" + o = append(o, 0xa9, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x44) + o = msgp.AppendString(o, z.VersionID) + // string "Versions" + o = append(o, 0xa8, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) + o = msgp.AppendBytes(o, z.Versions) + // string "SetIndex" + o = append(o, 0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78) + o = msgp.AppendInt(o, z.SetIndex) + // string "PoolIndex" + o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78) + o = msgp.AppendInt(o, z.PoolIndex) + // string "Queued" + o = append(o, 0xa6, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64) + o = msgp.AppendTime(o, z.Queued) + // string "BitrotScan" + o = append(o, 0xaa, 0x42, 0x69, 0x74, 0x72, 0x6f, 0x74, 0x53, 0x63, 0x61, 0x6e) + o = msgp.AppendBool(o, z.BitrotScan) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *PartialOperation) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Object": + z.Object, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "VersionID": + z.VersionID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "VersionID") + return + } + case "Versions": + z.Versions, bts, err = msgp.ReadBytesBytes(bts, z.Versions) + if err != nil { + err = msgp.WrapError(err, "Versions") + return + } + case "SetIndex": + z.SetIndex, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "SetIndex") + return + } + case "PoolIndex": + z.PoolIndex, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "PoolIndex") + return + } + case "Queued": + z.Queued, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Queued") + return + } + case "BitrotScan": + z.BitrotScan, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BitrotScan") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *PartialOperation) Msgsize() (s int) { + s = 1 + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 10 + msgp.StringPrefixSize + len(z.VersionID) + 9 + msgp.BytesPrefixSize + len(z.Versions) + 9 + msgp.IntSize + 10 + msgp.IntSize + 7 + msgp.TimeSize + 11 + msgp.BoolSize + return +} diff --git a/cmd/mrf_gen_test.go b/cmd/mrf_gen_test.go new file mode 100644 index 000000000..49ac17340 --- /dev/null +++ b/cmd/mrf_gen_test.go @@ -0,0 +1,123 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalPartialOperation(t *testing.T) { + v := PartialOperation{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgPartialOperation(b *testing.B) { + v := PartialOperation{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgPartialOperation(b *testing.B) { + v := PartialOperation{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalPartialOperation(b *testing.B) { + v := PartialOperation{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodePartialOperation(t *testing.T) { + v := PartialOperation{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodePartialOperation Msgsize() is inaccurate") + } + + vn := PartialOperation{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodePartialOperation(b *testing.B) { + v := PartialOperation{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodePartialOperation(b *testing.B) { + v := PartialOperation{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go index 29e215e85..16395cf5d 100644 --- a/cmd/peer-s3-client.go +++ b/cmd/peer-s3-client.go @@ -258,9 +258,9 @@ func (sys *S3PeerSys) ListBuckets(ctx context.Context, opts BucketOptions) ([]Bu for bktName, count := range bucketsMap { if count < quorum { // Queue a bucket heal task - globalMRFState.addPartialOp(partialOperation{ - bucket: bktName, - queued: time.Now(), + globalMRFState.addPartialOp(PartialOperation{ + Bucket: bktName, + Queued: time.Now(), }) } } diff --git a/cmd/signals.go b/cmd/signals.go index db0ead79e..09f3fe691 100644 --- a/cmd/signals.go +++ b/cmd/signals.go @@ -23,11 +23,26 @@ import ( "net/http" "os" "strings" + "time" "github.com/coreos/go-systemd/v22/daemon" "github.com/minio/minio/internal/logger" ) +func shutdownHealMRFWithTimeout() { + const shutdownTimeout = time.Minute + + finished := make(chan struct{}) + go func() { + globalMRFState.shutdown() + close(finished) + }() + select { + case <-time.After(shutdownTimeout): + case <-finished: + } +} + func handleSignals() { // Custom exit function exit := func(success bool) { @@ -50,6 +65,9 @@ func handleSignals() { } stopProcess := func() bool { + shutdownHealMRFWithTimeout() // this can take time sometimes, it needs to be executed + // before stopping s3 operations + // send signal to various go-routines that they need to quit. cancelGlobalContext()