package neutrino

import (
	"container/heap"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcutil"
	"github.com/lightninglabs/neutrino/headerfs"
)

// getUtxoResult is a simple pair type holding a spend report and error.
type getUtxoResult struct {
	report *SpendReport
	err    error
}

// GetUtxoRequest is a request to scan for spends of the given
// InputWithScript, starting from the height BirthHeight.
type GetUtxoRequest struct {
	// Input is the target outpoint with script to watch for spentness.
	Input *InputWithScript

	// BirthHeight is the height at which we expect to find the original
	// unspent outpoint. This is also the height used when starting the
	// search for spends.
	BirthHeight uint32

	// resultChan carries either the spend report or the error for this
	// request.
	resultChan chan *getUtxoResult

	// result caches the first spend report or error returned for this
	// request.
	result *getUtxoResult

	// mu ensures the first response delivered via resultChan is in fact
	// what gets cached in result.
	mu sync.Mutex

	quit chan struct{}
}

// deliver tries to deliver the report or error to any subscribers. If
// resultChan cannot accept a new update, this method will not block.
func (r *GetUtxoRequest) deliver(report *SpendReport, err error) {
	select {
	case r.resultChan <- &getUtxoResult{report, err}:
	default:
		log.Warnf("duplicate getutxo result delivered for "+
			"outpoint=%v, spend=%v, err=%v",
			r.Input.OutPoint, report, err)
	}
}

// Result blocks until a spend report or error is delivered for this request,
// or until either the passed cancel channel or the scanner's quit channel is
// closed. The first result received is cached and returned to any subsequent
// callers.
func (r *GetUtxoRequest) Result(cancel <-chan struct{}) (*SpendReport, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	select {
	case result := <-r.resultChan:
		// Cache the first result returned, in case we have multiple
		// readers calling Result.
		if r.result == nil {
			r.result = result
		}

		return r.result.report, r.result.err

	case <-cancel:
		return nil, ErrGetUtxoCancelled

	case <-r.quit:
		return nil, ErrShuttingDown
	}
}

// UtxoScannerConfig exposes configurable methods for interacting with the
// blockchain.
type UtxoScannerConfig struct {
	// BestSnapshot returns the block stamp of the current chain tip.
	BestSnapshot func() (*headerfs.BlockStamp, error)

	// GetBlockHash returns the block hash at the given height in the main
	// chain.
	GetBlockHash func(height int64) (*chainhash.Hash, error)

	// BlockFilterMatches checks the cfilter for the block hash for matches
	// against the rescan options.
	BlockFilterMatches func(ro *rescanOptions,
		blockHash *chainhash.Hash) (bool, error)

	// GetBlock fetches a block from the p2p network.
	GetBlock func(chainhash.Hash, ...QueryOption) (*btcutil.Block, error)
}
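
// A UtxoScannerConfig is normally populated with closures backed by whatever
// provides chain access. The snippet below is only an illustrative sketch:
// chain, filterMatches, and fetchBlock are hypothetical stand-ins, not part
// of this package's API.
//
//	cfg := &UtxoScannerConfig{
//		BestSnapshot:       chain.BestSnapshot, // assumed chain backend
//		GetBlockHash:       chain.GetBlockHash, // assumed chain backend
//		BlockFilterMatches: filterMatches,      // assumed cfilter check
//		GetBlock:           fetchBlock,         // assumed p2p block fetch
//	}
//	scanner := NewUtxoScanner(cfg)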

// UtxoScanner batches calls to GetUtxo so that a single scan can search for
// multiple outpoints. If a scan is in progress when a new element is added,
// we check whether it can safely be added to the current batch; if not, it
// will be included in the next batch.
type UtxoScanner struct {
	started uint32
	stopped uint32

	cfg *UtxoScannerConfig

	pq        GetUtxoRequestPQ
	nextBatch []*GetUtxoRequest

	mu sync.Mutex
	cv *sync.Cond

	wg sync.WaitGroup

	quit     chan struct{}
	shutdown chan struct{}
}

// NewUtxoScanner creates a new instance of UtxoScanner using the given chain
// interface.
func NewUtxoScanner(cfg *UtxoScannerConfig) *UtxoScanner {
	scanner := &UtxoScanner{
		cfg:      cfg,
		quit:     make(chan struct{}),
		shutdown: make(chan struct{}),
	}
	scanner.cv = sync.NewCond(&scanner.mu)

	return scanner
}

// Start begins running scan batches.
func (s *UtxoScanner) Start() error {
	if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
		return nil
	}

	s.wg.Add(1)
	go s.batchManager()

	return nil
}

// Stop any in-progress scan, shutting down the batch manager and failing any
// requests still pending in the queue with ErrShuttingDown.
func (s *UtxoScanner) Stop() error {
	if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
		return nil
	}

	close(s.quit)

batchShutdown:
	for {
		select {
		case <-s.shutdown:
			break batchShutdown
		case <-time.After(50 * time.Millisecond):
			s.cv.Signal()
		}
	}

	// Cancel all pending get utxo requests that were not pulled into the
	// batchManager's main goroutine.
	for !s.pq.IsEmpty() {
		pendingReq := heap.Pop(&s.pq).(*GetUtxoRequest)
		pendingReq.deliver(nil, ErrShuttingDown)
	}

	return nil
}

// Enqueue takes a GetUtxoRequest and adds it to the next applicable batch.
func (s *UtxoScanner) Enqueue(input *InputWithScript,
	birthHeight uint32) (*GetUtxoRequest, error) {

	log.Debugf("Enqueuing request for %s with birth height %d",
		input.OutPoint.String(), birthHeight)

	req := &GetUtxoRequest{
		Input:       input,
		BirthHeight: birthHeight,
		resultChan:  make(chan *getUtxoResult, 1),
		quit:        s.quit,
	}

	s.cv.L.Lock()
	select {
	case <-s.quit:
		s.cv.L.Unlock()
		return nil, ErrShuttingDown
	default:
	}

	// Insert the request into the queue and signal any goroutines that
	// might be waiting for new elements.
	heap.Push(&s.pq, req)

	s.cv.L.Unlock()
	s.cv.Signal()

	return req, nil
}
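
// A typical caller pairs Enqueue with GetUtxoRequest.Result. The sketch below
// is illustrative only; input, birthHeight, and cancelChan are hypothetical
// values supplied by the caller:
//
//	req, err := scanner.Enqueue(input, birthHeight)
//	if err != nil {
//		return err // the scanner is shutting down
//	}
//
//	// Result blocks until the batch scan delivers a spend report or an
//	// error for this request, cancelChan is closed, or the scanner is
//	// stopped.
//	report, err := req.Result(cancelChan)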

// batchManager is responsible for scheduling batches of UTXOs to scan. Any
// incoming requests whose start height has already been passed will be added
// to the next batch, which gets scheduled after the current batch finishes.
//
// NOTE: This method MUST be spawned as a goroutine.
func (s *UtxoScanner) batchManager() {
	defer close(s.shutdown)

	for {
		s.cv.L.Lock()

		// Re-queue previously skipped requests for next batch.
		for _, request := range s.nextBatch {
			heap.Push(&s.pq, request)
		}
		s.nextBatch = nil

		// Wait for the queue to be non-empty.
		for s.pq.IsEmpty() {
			s.cv.Wait()

			select {
			case <-s.quit:
				s.cv.L.Unlock()
				return
			default:
			}
		}

		req := s.pq.Peek()
		s.cv.L.Unlock()

		// Break out now before starting a scan if a shutdown was
		// requested.
		select {
		case <-s.quit:
			return
		default:
		}

		// Initiate a scan, starting from the birth height of the
		// least-height request currently in the queue.
		err := s.scanFromHeight(req.BirthHeight)
		if err != nil {
			log.Errorf("utxo scan failed: %v", err)
		}
	}
}

// dequeueAtHeight returns all GetUtxoRequests whose starting height is equal
// to the given height.
func (s *UtxoScanner) dequeueAtHeight(height uint32) []*GetUtxoRequest {
	s.cv.L.Lock()
	defer s.cv.L.Unlock()

	// Take any requests whose birth height has already been passed by the
	// current batch and defer them to the next batch.
	for !s.pq.IsEmpty() && s.pq.Peek().BirthHeight < height {
		item := heap.Pop(&s.pq).(*GetUtxoRequest)
		s.nextBatch = append(s.nextBatch, item)
	}

	var requests []*GetUtxoRequest
	for !s.pq.IsEmpty() && s.pq.Peek().BirthHeight == height {
		item := heap.Pop(&s.pq).(*GetUtxoRequest)
		requests = append(requests, item)
	}

	return requests
}
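
// As a worked example of the above (queue contents are hypothetical): if the
// queue holds requests with birth heights 100, 105, 105, and 110 while the
// current batch is scanning height 105, dequeueAtHeight(105) defers the
// height-100 request to nextBatch, since this scan has already passed its
// birth height, and returns the two height-105 requests. The height-110
// request stays queued until the scan reaches that height.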

// scanFromHeight runs a single batch, pulling in any requests that get added
// above the batch's last processed height. If an error occurs, the requests
// still outstanding in the batch are failed with that error.
func (s *UtxoScanner) scanFromHeight(initHeight uint32) error {
	// Before beginning the scan, grab the best block stamp we know of,
	// which will serve as an initial estimate for the end height of the
	// scan.
	bestStamp, err := s.cfg.BestSnapshot()
	if err != nil {
		return err
	}

	var (
		// startHeight and endHeight bound the range of the current
		// scan. If more blocks are found while a scan is running,
		// these values will be updated afterwards to scan for the new
		// blocks.
		startHeight = initHeight
		endHeight   = uint32(bestStamp.Height)
	)

	reporter := newBatchSpendReporter()

scanToEnd:
	// Scan forward through the blockchain and look for any transactions
	// that might spend the given UTXOs.
	for height := startHeight; height <= endHeight; height++ {
		// Before beginning to scan this height, check to see if the
		// utxoscanner has been signaled to exit.
		select {
		case <-s.quit:
			return reporter.FailRemaining(ErrShuttingDown)
		default:
		}

		hash, err := s.cfg.GetBlockHash(int64(height))
		if err != nil {
			return reporter.FailRemaining(err)
		}

		// If there are any new requests that can safely be added to
		// this batch, pull them in now so their originating outputs
		// can be found in this block.
		newReqs := s.dequeueAtHeight(height)

		// If any watched outpoint is expected to be created in this
		// block, then fetch the block regardless. Otherwise check to
		// see if the filter matches any of our watched outpoints.
		fetch := len(newReqs) > 0
		if !fetch {
			options := rescanOptions{
				watchList: reporter.filterEntries,
			}

			match, err := s.cfg.BlockFilterMatches(&options, hash)
			if err != nil {
				return reporter.FailRemaining(err)
			}

			// If still no match is found, we have no reason to
			// fetch this block, and can continue to next height.
			if !match {
				continue
			}
		}

		// At this point, we've determined that we either (1) have new
		// requests which we need the block to scan for originating
		// UTXOs, or (2) the watchlist triggered a match against the
		// neutrino filter. Before fetching the block, check to see if
		// the utxoscanner has been signaled to exit so that we can
		// exit the rescan before performing an expensive operation.
		select {
		case <-s.quit:
			return reporter.FailRemaining(ErrShuttingDown)
		default:
		}

		log.Debugf("Fetching block height=%d hash=%s", height, hash)

		block, err := s.cfg.GetBlock(*hash)
		if err != nil {
			return reporter.FailRemaining(err)
		}

		// Check again to see if the utxoscanner has been signaled to
		// exit.
		select {
		case <-s.quit:
			return reporter.FailRemaining(ErrShuttingDown)
		default:
		}

		log.Debugf("Processing block height=%d hash=%s", height, hash)

		reporter.ProcessBlock(block.MsgBlock(), newReqs, height)
	}

	// We've scanned up to the end height; now check whether we still have
	// any new blocks to process. If this is the first time through, we
	// might have a few blocks that were added since the scan started.
	currStamp, err := s.cfg.BestSnapshot()
	if err != nil {
		return reporter.FailRemaining(err)
	}

	// If the returned height is higher, we still have more blocks to go.
	// Shift the start and end heights and continue scanning.
	if uint32(currStamp.Height) > endHeight {
		startHeight = endHeight + 1
		endHeight = uint32(currStamp.Height)
		goto scanToEnd
	}

	reporter.NotifyUnspentAndUnfound()

	return nil
}
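
// To make the catch-up step above concrete (heights are arbitrary examples):
// if the batch starts with the tip at height 600000 and three more blocks are
// connected while it runs, the second BestSnapshot call reports 600003, so
// the scan re-runs from startHeight=600001 to endHeight=600003 before the
// remaining requests are resolved as unspent or unfound.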

// A GetUtxoRequestPQ implements heap.Interface and holds GetUtxoRequests. The
// queue maintains the invariant that heap.Pop() always returns the GetUtxo
// request with the least starting height. This allows us to add new GetUtxo
// requests to an already running batch.
type GetUtxoRequestPQ []*GetUtxoRequest

func (pq GetUtxoRequestPQ) Len() int { return len(pq) }

func (pq GetUtxoRequestPQ) Less(i, j int) bool {
	// We want Pop to give us the least BirthHeight.
	return pq[i].BirthHeight < pq[j].BirthHeight
}

func (pq GetUtxoRequestPQ) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
}

// Push is called by the heap.Interface implementation to add an element to
// the end of the backing store. The heap library will then maintain the heap
// invariant.
func (pq *GetUtxoRequestPQ) Push(x interface{}) {
	item := x.(*GetUtxoRequest)
	*pq = append(*pq, item)
}

// Peek returns the least height element in the queue without removing it.
func (pq *GetUtxoRequestPQ) Peek() *GetUtxoRequest {
	return (*pq)[0]
}

// Pop is called by the heap.Interface implementation to remove an element
// from the end of the backing store. The heap library will then maintain the
// heap invariant.
func (pq *GetUtxoRequestPQ) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	*pq = old[0 : n-1]
	return item
}

// IsEmpty returns true if the queue has no elements.
func (pq *GetUtxoRequestPQ) IsEmpty() bool {
	return pq.Len() == 0
}
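
// GetUtxoRequestPQ is driven through the standard container/heap package. A
// minimal sketch of the min-BirthHeight ordering (the heights and the pq
// variable are illustrative only):
//
//	var pq GetUtxoRequestPQ
//	heap.Push(&pq, &GetUtxoRequest{BirthHeight: 500})
//	heap.Push(&pq, &GetUtxoRequest{BirthHeight: 300})
//	heap.Push(&pq, &GetUtxoRequest{BirthHeight: 400})
//
//	first := heap.Pop(&pq).(*GetUtxoRequest) // BirthHeight == 300
//	next := pq.Peek()                        // BirthHeight == 400, not removed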