mirror of
https://github.com/muun/recovery.git
synced 2025-02-23 19:42:30 -05:00
291 lines
7.9 KiB
Go
291 lines
7.9 KiB
Go
|
package pushtx
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||
|
"github.com/btcsuite/btcd/wire"
|
||
|
"github.com/btcsuite/btcwallet/wtxmgr"
|
||
|
"github.com/lightninglabs/neutrino/blockntfns"
|
||
|
)
|
||
|
|
||
|
var (
	// ErrBroadcasterStopped is an error returned when we attempt to process a
	// request to broadcast a transaction but the Broadcaster has already
	// been stopped.
	ErrBroadcasterStopped = errors.New("broadcaster has been stopped")
)
|
||
|
|
||
|
// DefaultRebroadcastInterval is the default amount of time to wait, in
// between blocks, before attempting another rebroadcast.
const DefaultRebroadcastInterval = 60 * time.Second
|
||
|
|
||
|
// broadcastReq is an internal message the Broadcaster will use to process
|
||
|
// transaction broadcast requests.
|
||
|
type broadcastReq struct {
|
||
|
tx *wire.MsgTx
|
||
|
errChan chan error
|
||
|
}
|
||
|
|
||
|
// Config contains all of the external dependencies required for the Broadcaster
|
||
|
// to properly carry out its duties.
|
||
|
type Config struct {
|
||
|
// Broadcast broadcasts a transaction to the network. We expect certain
|
||
|
// BroadcastError's to be returned to handle special cases, namely
|
||
|
// errors with the codes Mempool and Confirmed.
|
||
|
Broadcast func(*wire.MsgTx) error
|
||
|
|
||
|
// SubscribeBlocks returns a block subscription that delivers block
|
||
|
// notifications in order. This will be used to rebroadcast all
|
||
|
// transactions once a new block arrives.
|
||
|
SubscribeBlocks func() (*blockntfns.Subscription, error)
|
||
|
|
||
|
// RebroadcastInterval is the interval that we'll continually try to
|
||
|
// re-broadcast transactions in-between new block arrival.
|
||
|
RebroadcastInterval time.Duration
|
||
|
}
|
||
|
|
||
|
// Broadcaster is a subsystem responsible for reliably broadcasting transactions
|
||
|
// to the network. Each transaction will be rebroadcast upon every new block
|
||
|
// being connected/disconnected to/from the chain.
|
||
|
type Broadcaster struct {
|
||
|
start sync.Once
|
||
|
stop sync.Once
|
||
|
|
||
|
cfg Config
|
||
|
|
||
|
// broadcastReqs is a channel through which new transaction broadcast
|
||
|
// requests from external callers will be streamed through.
|
||
|
broadcastReqs chan *broadcastReq
|
||
|
|
||
|
quit chan struct{}
|
||
|
wg sync.WaitGroup
|
||
|
}
|
||
|
|
||
|
// NewBroadcaster creates a new Broadcaster backed by the given config.
|
||
|
func NewBroadcaster(cfg *Config) *Broadcaster {
|
||
|
b := &Broadcaster{
|
||
|
cfg: *cfg,
|
||
|
broadcastReqs: make(chan *broadcastReq),
|
||
|
quit: make(chan struct{}),
|
||
|
}
|
||
|
|
||
|
return b
|
||
|
}
|
||
|
|
||
|
// Start starts all of the necessary steps for the Broadcaster to begin properly
|
||
|
// carrying out its duties.
|
||
|
func (b *Broadcaster) Start() error {
|
||
|
var err error
|
||
|
b.start.Do(func() {
|
||
|
sub, err := b.cfg.SubscribeBlocks()
|
||
|
if err != nil {
|
||
|
err = fmt.Errorf("unable to subscribe for block "+
|
||
|
"notifications: %v", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
b.wg.Add(1)
|
||
|
go b.broadcastHandler(sub)
|
||
|
})
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Stop halts the Broadcaster from rebroadcasting pending transactions.
|
||
|
func (b *Broadcaster) Stop() {
|
||
|
b.stop.Do(func() {
|
||
|
close(b.quit)
|
||
|
b.wg.Wait()
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// broadcastHandler is the main event handler of the Broadcaster responsible for
|
||
|
// handling new broadcast requests, rebroadcasting transactions upon every new
|
||
|
// block, etc.
|
||
|
//
|
||
|
// NOTE: This must be run as a goroutine.
|
||
|
func (b *Broadcaster) broadcastHandler(sub *blockntfns.Subscription) {
|
||
|
defer b.wg.Done()
|
||
|
defer sub.Cancel()
|
||
|
|
||
|
log.Infof("Broadcaster now active")
|
||
|
|
||
|
// transactions is the set of transactions we have broadcast so far,
|
||
|
// and are still not confirmed.
|
||
|
transactions := make(map[chainhash.Hash]*wire.MsgTx)
|
||
|
|
||
|
// confChan is a channel used to notify the broadcast handler about
|
||
|
// confirmed transactions.
|
||
|
confChan := make(chan chainhash.Hash)
|
||
|
|
||
|
// The rebroadcast semaphore is used to ensure we have only one
|
||
|
// rebroadcast running at a time.
|
||
|
rebroadcastSem := make(chan struct{}, 1)
|
||
|
rebroadcastSem <- struct{}{}
|
||
|
|
||
|
// triggerRebroadcast is a helper method that checks whether the
|
||
|
// rebroadcast semaphore is available, and if it is spawns a goroutine
|
||
|
// to rebroadcast all pending transactions.
|
||
|
triggerRebroadcast := func() {
|
||
|
select {
|
||
|
// If the rebroadcast semaphore is available, start a
|
||
|
// new goroutine to exectue a rebroadcast.
|
||
|
case <-rebroadcastSem:
|
||
|
default:
|
||
|
log.Tracef("Existing rebroadcast still in " +
|
||
|
"progress")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Make a copy of the current set of transactions to hand to
|
||
|
// the goroutine.
|
||
|
txs := make(map[chainhash.Hash]*wire.MsgTx)
|
||
|
for k, v := range transactions {
|
||
|
txs[k] = v.Copy()
|
||
|
}
|
||
|
|
||
|
b.wg.Add(1)
|
||
|
go func() {
|
||
|
defer b.wg.Done()
|
||
|
|
||
|
b.rebroadcast(txs, confChan)
|
||
|
rebroadcastSem <- struct{}{}
|
||
|
}()
|
||
|
|
||
|
}
|
||
|
|
||
|
reBroadcastTicker := time.NewTicker(b.cfg.RebroadcastInterval)
|
||
|
defer reBroadcastTicker.Stop()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
// A new broadcast request was submitted by an external caller.
|
||
|
case req := <-b.broadcastReqs:
|
||
|
err := b.cfg.Broadcast(req.tx)
|
||
|
if err != nil && !IsBroadcastError(err, Mempool) {
|
||
|
log.Errorf("Broadcast attempt failed: %v", err)
|
||
|
req.errChan <- err
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
transactions[req.tx.TxHash()] = req.tx
|
||
|
req.errChan <- nil
|
||
|
|
||
|
// A tx was confirmed, and we can remove it from our set of
|
||
|
// transactions.
|
||
|
case txHash := <-confChan:
|
||
|
delete(transactions, txHash)
|
||
|
|
||
|
// A new block notification has arrived, so we'll rebroadcast
|
||
|
// all of our pending transactions.
|
||
|
case _, ok := <-sub.Notifications:
|
||
|
if !ok {
|
||
|
log.Warn("Unable to rebroadcast transactions: " +
|
||
|
"block subscription was canceled")
|
||
|
continue
|
||
|
}
|
||
|
triggerRebroadcast()
|
||
|
|
||
|
// Between blocks, we'll also try to attempt additional
|
||
|
// re-broadcasts to ensure a timely confirmation.
|
||
|
case <-reBroadcastTicker.C:
|
||
|
triggerRebroadcast()
|
||
|
|
||
|
case <-b.quit:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// rebroadcast rebroadcasts all of the currently pending transactions. Care has
|
||
|
// been taken to ensure that the transactions are sorted in their dependency
|
||
|
// order to prevent peers from deeming our transactions as invalid due to
|
||
|
// broadcasting them before their pending dependencies.
|
||
|
func (b *Broadcaster) rebroadcast(txs map[chainhash.Hash]*wire.MsgTx,
|
||
|
confChan chan<- chainhash.Hash) {
|
||
|
|
||
|
// Return immediately if there are no transactions to re-broadcast.
|
||
|
if len(txs) == 0 {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
log.Debugf("Re-broadcasting %d transactions", len(txs))
|
||
|
|
||
|
sortedTxs := wtxmgr.DependencySort(txs)
|
||
|
for _, tx := range sortedTxs {
|
||
|
// Before attempting to broadcast this transaction, we check
|
||
|
// whether we are shutting down.
|
||
|
select {
|
||
|
case <-b.quit:
|
||
|
return
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
err := b.cfg.Broadcast(tx)
|
||
|
switch {
|
||
|
// If the transaction has already confirmed on-chain, we can
|
||
|
// stop broadcasting it further.
|
||
|
//
|
||
|
// TODO(wilmer); This should ideally be implemented by checking
|
||
|
// the chain ourselves rather than trusting our peers.
|
||
|
case IsBroadcastError(err, Confirmed):
|
||
|
log.Debugf("Re-broadcast of txid=%v, now confirmed!",
|
||
|
tx.TxHash())
|
||
|
|
||
|
select {
|
||
|
case confChan <- tx.TxHash():
|
||
|
case <-b.quit:
|
||
|
return
|
||
|
}
|
||
|
continue
|
||
|
|
||
|
// If the transaction already exists within our peers' mempool,
|
||
|
// we'll continue to rebroadcast it to ensure it actually
|
||
|
// propagates throughout the network.
|
||
|
//
|
||
|
// TODO(wilmer): Rate limit peers that have already accepted our
|
||
|
// transaction into their mempool to prevent resending to them
|
||
|
// every time.
|
||
|
case IsBroadcastError(err, Mempool):
|
||
|
log.Debugf("Re-broadcast of txid=%v, still "+
|
||
|
"pending...", tx.TxHash())
|
||
|
|
||
|
continue
|
||
|
|
||
|
case err != nil:
|
||
|
log.Errorf("Unable to rebroadcast transaction %v: %v",
|
||
|
tx.TxHash(), err)
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Broadcast submits a request to the Broadcaster to reliably broadcast the
|
||
|
// given transaction. An error won't be returned if the transaction already
|
||
|
// exists within the mempool. Any transaction broadcast through this method will
|
||
|
// be rebroadcast upon every change of the tip of the chain.
|
||
|
func (b *Broadcaster) Broadcast(tx *wire.MsgTx) error {
|
||
|
errChan := make(chan error, 1)
|
||
|
|
||
|
select {
|
||
|
case b.broadcastReqs <- &broadcastReq{
|
||
|
tx: tx,
|
||
|
errChan: errChan,
|
||
|
}:
|
||
|
case <-b.quit:
|
||
|
return ErrBroadcasterStopped
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case err := <-errChan:
|
||
|
return err
|
||
|
case <-b.quit:
|
||
|
return ErrBroadcasterStopped
|
||
|
}
|
||
|
}
|