// NOTE: THIS API IS UNSTABLE RIGHT NOW.
package neutrino
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcutil/gcs"
"github.com/btcsuite/btcutil/gcs/builder"
"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/neutrino/cache"
"github.com/lightninglabs/neutrino/filterdb"
"github.com/lightninglabs/neutrino/pushtx"
)
var (
// QueryTimeout specifies how long to wait for a peer to answer a
// query.
QueryTimeout = time.Second * 10
// QueryBatchTimeout is the total time we'll wait for a batch fetch
// query to complete.
// TODO(halseth): instead use timeout since last received response?
QueryBatchTimeout = time.Second * 30
// QueryPeerCooldown is the time we'll wait before re-assigning a query
// to a peer that previously failed because of a timeout.
QueryPeerCooldown = time.Second * 5
// QueryNumRetries specifies how many times to retry sending a query to
// each peer before we've concluded we aren't going to get a valid
// response. This allows us to make up for missed messages in some
// instances.
QueryNumRetries = 2
// QueryPeerConnectTimeout specifies how long to wait for the
// underlying chain service to connect to a peer before giving up
// on a query in case we don't have any peers.
QueryPeerConnectTimeout = time.Second * 30
// QueryEncoding specifies the default encoding (witness or not) for
// `getdata` and other similar messages.
QueryEncoding = wire.WitnessEncoding
)
// QueryAccess is an interface that gives access to querying a set of peers in
// different ways.
type QueryAccess interface {
queryAllPeers(
queryMsg wire.Message,
checkResponse func(sp *ServerPeer, resp wire.Message,
quit chan<- struct{}, peerQuit chan<- struct{}),
options ...QueryOption)
}
// A compile-time check to ensure that ChainService implements the
// QueryAccess interface.
var _ QueryAccess = (*ChainService)(nil)
// queryOptions are a set of options that can be modified per-query, unlike
// global options.
//
// TODO: Make more query options that override global options.
type queryOptions struct {
// timeout lets the query know how long to wait for a peer to answer
// the query before moving onto the next peer.
timeout time.Duration
// numRetries tells the query how many times to retry asking each peer
// the query.
numRetries uint8
// peerConnectTimeout lets the query know how long to wait for the
// underlying chain service to connect to a peer before giving up
// on a query in case we don't have any peers.
peerConnectTimeout time.Duration
// encoding lets the query know which encoding to use when queueing
// messages to a peer.
encoding wire.MessageEncoding
// doneChan lets the query signal the caller when it's done, in case
// it's run in a goroutine.
doneChan chan<- struct{}
// persistToDisk indicates whether the filter should also be written
// to disk in addition to the memory cache. "Normal" wallets will
// almost never need to re-match a filter once it's been fetched,
// unless they're doing something like a key import.
persistToDisk bool
// optimisticBatch indicates whether we expect more calls to follow,
// and that we should attempt to batch more items with the query such
// that they can be cached, avoiding the extra round trip.
optimisticBatch optimisticBatchType
}
// optimisticBatchType is a type indicating the kind of batching we want to
// execute with a query.
type optimisticBatchType uint8
const (
// noBatch indicates that no items other than the one specified
// should be queried.
noBatch optimisticBatchType = iota
// forwardBatch is used to indicate we should also query for items
// following, as they most likely will be fetched next.
forwardBatch
// reverseBatch is used to indicate we should also query for items
// preceding, as they most likely will be fetched next.
reverseBatch
)
// QueryOption is a functional option argument to any of the network query
// methods, such as GetBlock and GetCFilter (when that resorts to a network
// query). These are always processed in order, with later options overriding
// earlier ones.
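//
// For instance (an illustrative sketch), a later option overrides an earlier
// one:
//
//	qo := defaultQueryOptions()
//	qo.applyQueryOptions(Timeout(time.Second), Timeout(2*time.Second))
//	// qo.timeout is now 2*time.Second: the last Timeout option wins.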
type QueryOption func(*queryOptions)
// defaultQueryOptions returns a queryOptions set to package-level defaults.
func defaultQueryOptions() *queryOptions {
return &queryOptions{
timeout: QueryTimeout,
numRetries: uint8(QueryNumRetries),
peerConnectTimeout: QueryPeerConnectTimeout,
encoding: QueryEncoding,
optimisticBatch: noBatch,
}
}
// applyQueryOptions updates a queryOptions set with functional options.
func (qo *queryOptions) applyQueryOptions(options ...QueryOption) {
for _, option := range options {
option(qo)
}
}
// Timeout is a query option that lets the query know how long to wait for each
// peer we ask the query to answer it before moving on.
func Timeout(timeout time.Duration) QueryOption {
return func(qo *queryOptions) {
qo.timeout = timeout
}
}
// NumRetries is a query option that lets the query know the maximum number of
// times each peer should be queried. The default is the package-level
// QueryNumRetries, except for queryAllPeers, which defaults to one.
func NumRetries(numRetries uint8) QueryOption {
return func(qo *queryOptions) {
qo.numRetries = numRetries
}
}
// PeerConnectTimeout is a query option that lets the query know how long to
// wait for the underlying chain service to connect to a peer before giving up
// on a query in case we don't have any peers.
func PeerConnectTimeout(timeout time.Duration) QueryOption {
return func(qo *queryOptions) {
qo.peerConnectTimeout = timeout
}
}
// Encoding is a query option that allows the caller to set a message encoding
// for the query messages.
func Encoding(encoding wire.MessageEncoding) QueryOption {
return func(qo *queryOptions) {
qo.encoding = encoding
}
}
// DoneChan allows the caller to pass a channel that will get closed when the
// query is finished.
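//
// A minimal sketch of running a query asynchronously (assuming s is a
// *ChainService, and queryMsg/checkResponse are supplied by the caller):
//
//	done := make(chan struct{})
//	go s.queryAllPeers(queryMsg, checkResponse, DoneChan(done))
//	<-done // closed once the query has finished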
func DoneChan(doneChan chan<- struct{}) QueryOption {
return func(qo *queryOptions) {
qo.doneChan = doneChan
}
}
// PersistToDisk allows the caller to request that the filter be written to
// disk once it's found, in addition to the memory cache.
func PersistToDisk() QueryOption {
return func(qo *queryOptions) {
qo.persistToDisk = true
}
}
// OptimisticBatch allows the caller to indicate that items following the
// requested one should be included in the query.
func OptimisticBatch() QueryOption {
return func(qo *queryOptions) {
qo.optimisticBatch = forwardBatch
}
}
// OptimisticReverseBatch allows the caller to indicate that items preceding
// the requested one should be included in the query.
func OptimisticReverseBatch() QueryOption {
return func(qo *queryOptions) {
qo.optimisticBatch = reverseBatch
}
}
// queryState is an atomically updated per-query state for each query in a
// batch.
//
// State transitions are:
//
// * queryWaitSubmit->queryWaitResponse - send query to peer
// * queryWaitResponse->queryWaitSubmit - query timeout with no acceptable
// response
// * queryWaitResponse->queryAnswered - acceptable response to query received
type queryState uint32
const (
// Waiting to be submitted to a peer.
queryWaitSubmit queryState = iota
// Submitted to a peer, waiting for reply.
queryWaitResponse
// Valid reply received.
queryAnswered
)
// We provide 3 kinds of queries:
//
// * queryAllPeers allows a single query to be broadcast to all peers, and
// then waits for as many peers as possible to answer that query within
// a timeout. This allows for doing things like checking cfilter checkpoints.
//
// * queryPeers allows a single query to be passed to one peer at a time until
// the query is deemed answered. This is good for getting a single piece of
// data, such as a filter or a block.
//
// * queryBatch allows a batch of queries to be distributed among all peers,
// recirculating upon timeout.
//
// TODO(aakselrod): maybe abstract the query scheduler into a functional option
// and provide some presets (including the ones below) prior to factoring out
// the query API into its own package?
// queryChainServiceBatch is a helper function that sends a batch of queries to
// the entire pool of peers of the given ChainService, attempting to get them
// all answered unless the quit channel is closed. It continues to update its
// view of the connected peers in case peers connect or disconnect during the
// query. The package-level QueryTimeout parameter, overridable by the Timeout
// option, determines how long we wait for a peer to answer a query before
// handing it back to the pool and letting the peer move on to the next one.
// The NumRetries option and the QueryNumRetries package-level variable are
// ignored; the query continues until it either completes or the passed quit
// channel is closed. For memory efficiency, we attempt to get responses as
// close to ordered as we can, so that the caller can cache as few responses
// as possible before committing to storage.
//
// TODO(aakselrod): support for more than one in-flight query per peer to
// reduce effects of latency.
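//
// A hedged sketch of driving a batch query, where matchesQuery is a
// hypothetical caller-supplied predicate for the message type being fetched:
//
//	quit := make(chan struct{})
//	queryChainServiceBatch(
//		s, queryMsgs,
//		func(sp *ServerPeer, query, resp wire.Message) bool {
//			// Return true if resp answers query.
//			return matchesQuery(query, resp)
//		},
//		quit,
//	)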
func queryChainServiceBatch(
// s is the ChainService to use.
s *ChainService,
// queryMsgs is a slice of queries for which the caller wants responses.
queryMsgs []wire.Message,
// checkResponse is called for every received message to see if it
// answers the query message. It should return true if so.
checkResponse func(sp *ServerPeer, query wire.Message,
resp wire.Message) bool,
// queryQuit forces the query to end before it's complete.
queryQuit <-chan struct{},
// options takes functional options for executing the query.
options ...QueryOption) {
// Starting with the set of default options, we'll apply any specified
// functional options to the query.
qo := defaultQueryOptions()
qo.applyQueryOptions(options...)
// Shared state between this goroutine and the per-peer goroutines.
queryStates := make([]uint32, len(queryMsgs))
// subscription allows us to subscribe to notifications from peers.
msgChan := make(chan spMsg, len(queryMsgs))
subQuit := make(chan struct{})
subscription := spMsgSubscription{
msgChan: msgChan,
quitChan: subQuit,
}
defer close(subQuit)
// peerState holds a query message and a signal channel that gets a
// notice when the query got a match. If it's the peer's match, the
// peer can mark the query a success and move on to the next query
// ahead of the timeout.
type peerState struct {
msg wire.Message
matchSignal chan struct{}
}
// peerStates and its companion mutex allow the peer goroutines to
// tell the main goroutine what query they're currently working on, and
// for the main goroutine to signal the peer when a match has been
// received.
peerStates := make(map[string]peerState)
var mtxPeerStates sync.RWMutex
// allDone is a helper closure we'll use to determine whether we can
// exit due to all of our queries being answered.
allDone := func() bool {
for i := 0; i < len(queryStates); i++ {
if atomic.LoadUint32(&queryStates[i]) !=
uint32(queryAnswered) {
return false
}
}
return true
}
// allDoneSignal is a channel we'll close to signal to the main loop
// that all of the queries have been answered.
var allDoneOnce sync.Once
allDoneSignal := make(chan struct{})
peerGoroutine := func(sp *ServerPeer, quit <-chan struct{},
allDoneSignal chan struct{}) {
// Subscribe to messages from the peer.
sp.subscribeRecvMsg(subscription)
defer sp.unsubscribeRecvMsgs(subscription)
defer func() {
mtxPeerStates.Lock()
delete(peerStates, sp.Addr())
mtxPeerStates.Unlock()
}()
// firstUnfinished tracks the first query that hasn't been answered
// yet, and handleQuery is the index of the query we're currently
// working on (-1 when there is none). Skipping queries that are
// already handled helps prevent most instances of the same peer being
// asked the same query every time.
firstUnfinished, handleQuery := 0, -1
for firstUnfinished < len(queryMsgs) {
select {
case <-queryQuit:
return
case <-s.quit:
return
case <-quit:
return
default:
}
handleQuery = -1
for i := firstUnfinished; i < len(queryMsgs); i++ {
// If this query is finished and we're at
// firstUnfinished, update firstUnfinished.
if i == firstUnfinished &&
atomic.LoadUint32(&queryStates[i]) ==
uint32(queryAnswered) {
firstUnfinished++
log.Tracef("Query #%v already answered, "+
"skipping", i)
continue
}
// We check to see if the query is waiting to
// be handled. If so, we mark it as being
// handled. If not, we move to the next one.
if !atomic.CompareAndSwapUint32(
&queryStates[i],
uint32(queryWaitSubmit),
uint32(queryWaitResponse),
) {
log.Tracef("Query #%v already being "+
"queried for, skipping", i)
continue
}
// The query is now marked as in-process. We
// begin to process it.
handleQuery = i
sp.QueueMessageWithEncoding(queryMsgs[i],
nil, qo.encoding)
break
}
// Regardless of whether we have a query or not, we
// need a timeout.
timeout := time.After(qo.timeout)
if handleQuery == -1 {
if firstUnfinished == len(queryMsgs) {
// We've now answered all the queries.
return
}
// We have nothing to work on but not all
// queries are answered yet. Wait for a query
// timeout, or a quit signal, then see if
// anything needs our help.
select {
case <-queryQuit:
return
case <-s.quit:
return
case <-quit:
return
case <-timeout:
if sp.Connected() {
continue
} else {
return
}
}
}
// We have a query we're working on.
matchSignal := make(chan struct{}, 1)
mtxPeerStates.Lock()
peerStates[sp.Addr()] = peerState{
msg: queryMsgs[handleQuery],
matchSignal: matchSignal,
}
mtxPeerStates.Unlock()
exiting := false
select {
case <-queryQuit:
exiting = true
case <-s.quit:
exiting = true
case <-quit:
exiting = true
case <-timeout:
// We failed, so reset the query state to
// queryWaitSubmit so another peer can pick it up.
atomic.StoreUint32(
&queryStates[handleQuery], uint32(queryWaitSubmit),
)
// Delete the query from our peer states, to
// indicate we are no longer expecting a
// response.
mtxPeerStates.Lock()
delete(peerStates, sp.Addr())
mtxPeerStates.Unlock()
log.Tracef("Query for #%v failed, moving "+
"on: %v", handleQuery,
newLogClosure(func() string {
return spew.Sdump(queryMsgs[handleQuery])
}))
// To allow other peers to pick up this query,
// let the peer that just timed out wait a
// cooldown period before handing it the next
// query.
select {
case <-time.After(QueryPeerCooldown):
case <-queryQuit:
return
case <-s.quit:
return
case <-quit:
return
}
case _, ok := <-matchSignal:
if !ok {
exiting = true
break
}
log.Tracef("Query #%v answered, updating state",
handleQuery)
// We got a match signal so we can mark this
// query a success.
atomic.StoreUint32(&queryStates[handleQuery],
uint32(queryAnswered))
// If we're done answering all of our queries,
// we can exit now.
if allDone() {
allDoneOnce.Do(func() {
close(allDoneSignal)
})
return
}
}
// Before exiting the peer goroutine, reset the query
// state to ensure other peers can pick it up.
if exiting {
atomic.StoreUint32(
&queryStates[handleQuery], uint32(queryWaitSubmit),
)
// Delete the current state of the peer, so we
// won't process any lingering responses from
// it.
mtxPeerStates.Lock()
delete(peerStates, sp.Addr())
mtxPeerStates.Unlock()
return
}
}
}
// peerQuits holds per-peer quit channels so we can kill the goroutines
// when they disconnect.
peerQuits := make(map[string]chan struct{})
// Clean up on exit.
defer func() {
for _, quitChan := range peerQuits {
close(quitChan)
}
}()
for {
// Update our view of peers, starting new workers for new peers
// and removing disconnected/banned peers.
for _, peer := range s.Peers() {
sp := peer.Addr()
if _, ok := peerQuits[sp]; !ok && peer.Connected() {
peerQuits[sp] = make(chan struct{})
go peerGoroutine(
peer, peerQuits[sp], allDoneSignal,
)
}
}
for peer, quitChan := range peerQuits {
p := s.PeerByAddr(peer)
if p == nil || !p.Connected() {
close(quitChan)
delete(peerQuits, peer)
}
}
select {
case msg := <-msgChan:
mtxPeerStates.RLock()
curQuery, ok := peerStates[msg.sp.Addr()]
mtxPeerStates.RUnlock()
// Break if we didn't expect a response from this peer.
if !ok {
break
}
if checkResponse(msg.sp, curQuery.msg, msg.msg) {
select {
case <-queryQuit:
return
case <-s.quit:
return
case curQuery.matchSignal <- struct{}{}:
}
}
case <-time.After(qo.timeout):
case <-allDoneSignal:
return
case <-queryQuit:
return
case <-s.quit:
return
}
}
}
// queryAllPeers is a helper function that sends a query to all peers and waits
// for a timeout specified by the QueryTimeout package-level variable or the
// Timeout functional option. The NumRetries option is set to 1 by default
// unless overridden by the caller.
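//
// An illustrative sketch: end the whole query as soon as any peer sends an
// acceptable response (isAcceptable is a hypothetical predicate; the found
// guard avoids closing quit twice if a second acceptable response races in):
//
//	var found bool
//	s.queryAllPeers(
//		queryMsg,
//		func(sp *ServerPeer, resp wire.Message, quit chan<- struct{},
//			peerQuit chan<- struct{}) {
//			if found || !isAcceptable(resp) {
//				return
//			}
//			found = true
//			close(quit)
//		},
//	)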
func (s *ChainService) queryAllPeers(
// queryMsg is the message to broadcast to all peers.
queryMsg wire.Message,
// checkResponse is called for every message within the timeout period.
// The quit channel lets the query know to terminate because the
// required response has been found. This is done by closing the
// channel. The peerQuit lets the query know to terminate the query for
// the peer which sent the response, allowing resources to be released for
// peers which respond quickly while continuing to wait for slower
// peers to respond and nonresponsive peers to time out.
checkResponse func(sp *ServerPeer, resp wire.Message,
quit chan<- struct{}, peerQuit chan<- struct{}),
// options takes functional options for executing the query.
options ...QueryOption) {
// Starting with the set of default options, we'll apply any specified
// functional options to the query.
qo := defaultQueryOptions()
qo.numRetries = 1
qo.applyQueryOptions(options...)
// We grab the current set of peers up front. This is done in a
// single-threaded fashion because the peer state is held in a single
// thread. This is the only part of the query framework that requires
// access to the peer state, so it's done once per query.
peers := s.Peers()
// This will be shared state between the per-peer goroutines.
queryQuit := make(chan struct{})
allQuit := make(chan struct{})
var wg sync.WaitGroup
msgChan := make(chan spMsg)
subscription := spMsgSubscription{
msgChan: msgChan,
quitChan: allQuit,
}
// Now we start a goroutine for each peer which manages the peer's
// message subscription.
peerQuits := make(map[string]chan struct{})
for _, sp := range peers {
sp.subscribeRecvMsg(subscription)
wg.Add(1)
peerQuits[sp.Addr()] = make(chan struct{})
go func(sp *ServerPeer, peerQuit <-chan struct{}) {
defer wg.Done()
defer sp.unsubscribeRecvMsgs(subscription)
for i := uint8(0); i < qo.numRetries; i++ {
timeout := time.After(qo.timeout)
sp.QueueMessageWithEncoding(queryMsg,
nil, qo.encoding)
select {
case <-queryQuit:
return
case <-s.quit:
return
case <-peerQuit:
return
case <-timeout:
}
}
}(sp, peerQuits[sp.Addr()])
}
// This goroutine will wait until all of the peer-query goroutines have
// terminated, and then initiate a query shutdown.
go func() {
wg.Wait()
// Make sure our main goroutine and the subscription know to
// quit.
close(allQuit)
// Close the done channel, if any.
if qo.doneChan != nil {
close(qo.doneChan)
}
}()
// Loop for any messages sent to us via our subscription channel and
// check them for whether they satisfy the query. Break the loop when
// allQuit is closed.
checkResponses:
for {
select {
case <-queryQuit:
break checkResponses
case <-s.quit:
break checkResponses
case <-allQuit:
break checkResponses
// A message has arrived over the subscription channel, so we
// execute the checkResponses callback to see if this ends our
// query session.
case sm := <-msgChan:
// TODO: This will get stuck if checkResponse gets
// stuck. This is a caveat for callers that should be
// fixed before exposing this function for public use.
select {
case <-peerQuits[sm.sp.Addr()]:
default:
checkResponse(sm.sp, sm.msg, queryQuit,
peerQuits[sm.sp.Addr()])
}
}
}
}
// queryChainServicePeers is a helper function that sends a query to one or
// more peers of the given ChainService, and waits for an answer. The timeout
// for queries is set by the QueryTimeout package-level variable or the Timeout
// functional option.
func queryChainServicePeers(
// s is the ChainService to use.
s *ChainService,
// queryMsg is the message to send to each peer selected by selectPeer.
queryMsg wire.Message,
// checkResponse is called for every message within the timeout period.
// The quit channel lets the query know to terminate because the
// required response has been found. This is done by closing the
// channel.
checkResponse func(sp *ServerPeer, resp wire.Message,
quit chan<- struct{}),
// options takes functional options for executing the query.
options ...QueryOption) {
// Starting with the set of default options, we'll apply any specified
// functional options to the query.
qo := defaultQueryOptions()
qo.applyQueryOptions(options...)
// We get an initial view of our peers, to be updated each time a peer
// query times out.
queryPeer := s.blockManager.SyncPeer()
peerTries := make(map[string]uint8)
// This will be state used by the peer query goroutine.
queryQuit := make(chan struct{})
subQuit := make(chan struct{})
// Increase this channel's buffer size to handle more queries at once:
// each channel gets results for all queries, so if messages are
// missed, a vicious cycle of retries can cause a bigger message
// flood, more of which get missed.
msgChan := make(chan spMsg)
subscription := spMsgSubscription{
msgChan: msgChan,
quitChan: subQuit,
}
// Loop for any messages sent to us via our subscription channel and
// check them for whether they satisfy the query. Break the loop if
// it's time to quit.
peerTimeout := time.NewTimer(qo.timeout)
connectionTimeout := time.NewTimer(qo.peerConnectTimeout)
connectionTicker := connectionTimeout.C
if queryPeer != nil {
peerTries[queryPeer.Addr()]++
queryPeer.subscribeRecvMsg(subscription)
queryPeer.QueueMessageWithEncoding(queryMsg, nil, qo.encoding)
}
checkResponses:
for {
select {
case <-connectionTicker:
// When we time out, we're done.
if queryPeer != nil {
queryPeer.unsubscribeRecvMsgs(subscription)
}
break checkResponses
case <-queryQuit:
// Same when we get a quit signal.
if queryPeer != nil {
queryPeer.unsubscribeRecvMsgs(subscription)
}
break checkResponses
case <-s.quit:
// Same when chain server's quit is signaled.
if queryPeer != nil {
queryPeer.unsubscribeRecvMsgs(subscription)
}
break checkResponses
// A message has arrived over the subscription channel, so we
// execute the checkResponses callback to see if this ends our
// query session.
case sm := <-msgChan:
// TODO: This will get stuck if checkResponse gets
// stuck. This is a caveat for callers that should be
// fixed before exposing this function for public use.
checkResponse(sm.sp, sm.msg, queryQuit)
// Each time we receive a response from the current
// peer, we'll reset the main peer timeout as they're
// being responsive.
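// Note that Stop doesn't drain the timer's channel if
// the timer already fired, so we drain it explicitly
// before calling Reset, avoiding a stale expiry.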
if !peerTimeout.Stop() {
select {
case <-peerTimeout.C:
default:
}
}
peerTimeout.Reset(qo.timeout)
// Also at this point, if the peerConnectTimeout is
// still active, then we can disable it, as we're
// receiving responses from the current peer.
if connectionTicker != nil && !connectionTimeout.Stop() {
select {
case <-connectionTimeout.C:
default:
}
}
connectionTicker = nil
// The current peer we're querying has failed to answer the
// query. Time to select a new peer and query it.
case <-peerTimeout.C:
if queryPeer != nil {
queryPeer.unsubscribeRecvMsgs(subscription)
}
queryPeer = nil
for _, peer := range s.Peers() {
// If the peer is no longer connected, we'll
// skip them.
if !peer.Connected() {
continue
}
// If we've yet to try this peer, we'll make
// sure to do so. If we've already exhausted
// the number of retries for this peer, then
// we'll skip them.
numTries, ok := peerTries[peer.Addr()]
if ok && numTries >= qo.numRetries {
continue
}
queryPeer = peer
// Found a peer we can query.
peerTries[queryPeer.Addr()]++
queryPeer.subscribeRecvMsg(subscription)
queryPeer.QueueMessageWithEncoding(
queryMsg, nil, qo.encoding,
)
break
}
// If at this point, we don't yet have a query peer,
// then we'll exit now as all the peers are exhausted.
if queryPeer == nil {
break checkResponses
}
}
}
// Close the subscription quit channel and the done channel, if any.
close(subQuit)
peerTimeout.Stop()
if qo.doneChan != nil {
close(qo.doneChan)
}
}
// getFilterFromCache returns a filter from ChainService's FilterCache if it
// exists, returning nil and an error if it doesn't.
func (s *ChainService) getFilterFromCache(blockHash *chainhash.Hash,
filterType filterdb.FilterType) (*gcs.Filter, error) {
cacheKey := cache.FilterCacheKey{BlockHash: *blockHash, FilterType: filterType}
filterValue, err := s.FilterCache.Get(cacheKey)
if err != nil {
return nil, err
}
return filterValue.(*cache.CacheableFilter).Filter, nil
}
// putFilterToCache inserts a given filter in ChainService's FilterCache.
func (s *ChainService) putFilterToCache(blockHash *chainhash.Hash,
filterType filterdb.FilterType, filter *gcs.Filter) (bool, error) {
cacheKey := cache.FilterCacheKey{BlockHash: *blockHash, FilterType: filterType}
return s.FilterCache.Put(cacheKey, &cache.CacheableFilter{Filter: filter})
}
// cfiltersQuery is a struct that holds all the information necessary to
// perform a batch GetCFilters request, and to handle the responses.
type cfiltersQuery struct {
filterType wire.FilterType
startHeight int64
stopHeight int64
stopHash *chainhash.Hash
filterHeaders []chainhash.Hash
headerIndex map[chainhash.Hash]int
targetHash chainhash.Hash
filterChan chan *gcs.Filter
options []QueryOption
}
// queryMsg returns the wire message to perform this query.
func (q *cfiltersQuery) queryMsg() wire.Message {
return wire.NewMsgGetCFilters(
q.filterType, uint32(q.startHeight), q.stopHash,
)
}
// prepareCFiltersQuery creates a cfiltersQuery that can be used to fetch a
// CFilter for the given block hash.
func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash,
filterType wire.FilterType, options ...QueryOption) (
*cfiltersQuery, error) {
_, height, err := s.BlockHeaders.FetchHeader(&blockHash)
if err != nil {
return nil, fmt.Errorf("unable to get header for start "+
"block=%v: %v", blockHash, err)
}
bestBlock, err := s.BestBlock()
if err != nil {
return nil, fmt.Errorf("unable to get best block: %v", err)
}
bestHeight := int64(bestBlock.Height)
qo := defaultQueryOptions()
qo.applyQueryOptions(options...)
// If the query specifies an optimistic batch we will attempt to fetch
// the maximum number of filters in anticipation of calls for the
// following or preceding filters.
var startHeight, stopHeight int64
switch qo.optimisticBatch {
// No batching, the start and stop height will be the same.
case noBatch:
startHeight = int64(height)
stopHeight = int64(height)
// Forward batch, fetch as many of the following filters as possible.
case forwardBatch:
startHeight = int64(height)
stopHeight = startHeight + wire.MaxGetCFiltersReqRange - 1
// We need a longer timeout, since we are going to receive more
// than a single response.
options = append(options, Timeout(QueryBatchTimeout))
// Reverse batch, fetch as many of the preceding filters as possible.
case reverseBatch:
stopHeight = int64(height)
startHeight = stopHeight - wire.MaxGetCFiltersReqRange + 1
// We need a longer timeout, since we are going to receive more
// than a single response.
options = append(options, Timeout(QueryBatchTimeout))
}
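// As an illustration, with height=5000 and a wire.MaxGetCFiltersReqRange
// of 1000, a forwardBatch asks for heights [5000, 5999] while a
// reverseBatch asks for [4001, 5000], before the bounds below are
// applied.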
// Block 1 is the earliest one we can fetch.
if startHeight < 1 {
startHeight = 1
}
// If the stop height with the maximum batch size is above our best
// known block, then we use the best block height instead.
if stopHeight > bestHeight {
stopHeight = bestHeight
}
stopHash, err := s.GetBlockHash(stopHeight)
if err != nil {
return nil, fmt.Errorf("unable to get hash for "+
"stopHeight=%d: %v", stopHeight, err)
}
// In order to verify the authenticity of the received filters, we'll
// fetch the block headers and filter headers in the range
// [startHeight-1, stopHeight]. We go one below our startHeight since
// the hash of the previous block is needed for validation.
numFilters := uint32(stopHeight - startHeight + 1)
blockHeaders, _, err := s.BlockHeaders.FetchHeaderAncestors(
numFilters, stopHash,
)
if err != nil {
return nil, fmt.Errorf("unable to get %d block header "+
"ancestors for stopHash=%v: %v", numFilters,
stopHash, err)
}
if len(blockHeaders) != int(numFilters)+1 {
return nil, fmt.Errorf("expected %d block headers, got %d",
numFilters+1, len(blockHeaders))
}
filterHeaders, _, err := s.RegFilterHeaders.FetchHeaderAncestors(
numFilters, stopHash,
)
if err != nil {
return nil, fmt.Errorf("unable to get %d filter header "+
"ancestors for stopHash=%v: %v", numFilters, stopHash,
err)
}
if len(filterHeaders) != int(numFilters)+1 {
return nil, fmt.Errorf("expected %d filter headers, got %d",
numFilters+1, len(filterHeaders))
}
// We create a header index such that we can easily index into our
// header slices for a given block hash in the received response,
// without consulting the database. This also keeps track of which
// blocks we are still awaiting a response for. We start at index=1, as
// 0 is for the block startHeight-1, which is only needed for
// validation.
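// For example, with startHeight=100 and stopHeight=102, blockHeaders
// holds the headers for blocks 99 through 102, and headerIndex maps
// the hashes of blocks 100-102 to indices 1-3.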
headerIndex := make(map[chainhash.Hash]int, len(blockHeaders)-1)
for i := 1; i < len(blockHeaders); i++ {
block := blockHeaders[i]
headerIndex[block.BlockHash()] = i
}
// We'll immediately respond to the caller with the requested filter
// when it is received, so we make a channel to notify on when it's
// ready.
filterChan := make(chan *gcs.Filter, 1)
return &cfiltersQuery{
filterType: filterType,
startHeight: startHeight,
stopHeight: stopHeight,
stopHash: stopHash,
filterHeaders: filterHeaders,
headerIndex: headerIndex,
targetHash: blockHash,
filterChan: filterChan,
options: options,
}, nil
}
// handleCFiltersResponse is called every time we receive a response for the
// GetCFilters request.
func (s *ChainService) handleCFiltersResponse(q *cfiltersQuery,
resp wire.Message, quit chan<- struct{}) {
// We're only interested in "cfilter" messages.
response, ok := resp.(*wire.MsgCFilter)
if !ok {
return
}
// If the response doesn't match our request, ignore this message.
if q.filterType != response.FilterType {
return
}
// If this filter is for a block not in our index, we can ignore it, as
// we either already got it, or it is out of our queried range.
i, ok := q.headerIndex[response.BlockHash]
if !ok {
return
}
gotFilter, err := gcs.FromNBytes(
builder.DefaultP, builder.DefaultM, response.Data,
)
if err != nil {
// Malformed filter data. We can ignore this message.
return
}
// Now that we have a proper filter, ensure that re-calculating the
// filter header hash for the header _after_ the filter in the chain
// checks out. If not, we can ignore this response.
curHeader := q.filterHeaders[i]
prevHeader := q.filterHeaders[i-1]
gotHeader, err := builder.MakeHeaderForFilter(
gotFilter, prevHeader,
)
if err != nil {
return
}
if gotHeader != curHeader {
return
}
// At this point, the filter matches what we know about it and we
// declare it sane. If this is the filter requested initially, send it
// to the caller immediately.
if response.BlockHash == q.targetHash {
q.filterChan <- gotFilter
}
// Put the filter in the cache, and persist it to disk if the caller
// requested it.
// TODO(halseth): for an LRU we could take care to insert the next
// height filter last.
dbFilterType := filterdb.RegularFilter
evict, err := s.putFilterToCache(
&response.BlockHash, dbFilterType, gotFilter,
)
if err != nil {
log.Warnf("Couldn't write filter to cache: %v", err)
}
// TODO(halseth): dynamically increase/decrease the batch size to match
// our cache capacity.
numFilters := q.stopHeight - q.startHeight + 1
if evict && s.FilterCache.Len() < int(numFilters) {
log.Debugf("Items evicted from the cache with less "+
"than %d elements. Consider increasing the "+
"cache size...", numFilters)
}
qo := defaultQueryOptions()
qo.applyQueryOptions(q.options...)
if qo.persistToDisk {
err = s.FilterDB.PutFilter(
&response.BlockHash, gotFilter, dbFilterType,
)
if err != nil {
log.Warnf("Couldn't write filter to filterDB: "+
"%v", err)
}
log.Tracef("Wrote filter for block %s, type %d",
&response.BlockHash, dbFilterType)
}
// Finally, we can delete it from the headerIndex.
delete(q.headerIndex, response.BlockHash)
// If the headerIndex is empty, we got everything we wanted, and can
// exit.
if len(q.headerIndex) == 0 {
close(quit)
}
}
// GetCFilter gets a cfilter from the database. Failing that, it requests the
// cfilter from the network and writes it to the database. Currently, only
// the regular filter type is supported; requests for any other type result
// in an error.
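//
// An illustrative call (blockHash obtained elsewhere), requesting a forward
// batch and on-disk persistence:
//
//	filter, err := s.GetCFilter(
//		blockHash, wire.GCSFilterRegular,
//		OptimisticBatch(), PersistToDisk(),
//	)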
func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
filterType wire.FilterType, options ...QueryOption) (*gcs.Filter, error) {
// The only filter type supported at the moment is the regular filter,
// so we'll reject all other filter types.
if filterType != wire.GCSFilterRegular {
return nil, fmt.Errorf("unknown filter type: %v", filterType)
}
// Only the regular filter type is currently supported, so that's the
// filter type we'll use against both the cache and the database.
dbFilterType := filterdb.RegularFilter
// First check the cache to see if we already have this filter. If
// so, then we can return it and exit early.
filter, err := s.getFilterFromCache(&blockHash, dbFilterType)
if err == nil && filter != nil {
return filter, nil
}
if err != nil && err != cache.ErrElementNotFound {
return nil, err
}
// If not in cache, check if it's in database, returning early if yes.
filter, err = s.FilterDB.FetchFilter(&blockHash, dbFilterType)
if err == nil && filter != nil {
return filter, nil
}
if err != nil && err != filterdb.ErrFilterNotFound {
return nil, err
}
// We acquire the mutex ensuring we don't have several redundant
// CFilter queries running in parallel.
s.mtxCFilter.Lock()
// Since another request might have added the filter to the cache while
// we were waiting for the mutex, we do a final lookup before starting
// our own query.
filter, err = s.getFilterFromCache(&blockHash, dbFilterType)
if err == nil && filter != nil {
s.mtxCFilter.Unlock()
return filter, nil
}
if err != nil && err != cache.ErrElementNotFound {
s.mtxCFilter.Unlock()
return nil, err
}
// The filter was in neither the cache nor the DB, so we'll try to get
// it from the network.
query, err := s.prepareCFiltersQuery(blockHash, filterType, options...)
if err != nil {
s.mtxCFilter.Unlock()
return nil, err
}
// With all the necessary items retrieved, we'll launch our concurrent
// query to the set of connected peers.
log.Debugf("Fetching filters for heights=[%v, %v], stophash=%v",
query.startHeight, query.stopHeight, query.stopHash)
go func() {
defer s.mtxCFilter.Unlock()
defer close(query.filterChan)
s.queryPeers(
// Send a wire.MsgGetCFilters
query.queryMsg(),
// Check responses and if we get one that matches, end
// the query early.
func(_ *ServerPeer, resp wire.Message, quit chan<- struct{}) {
s.handleCFiltersResponse(query, resp, quit)
},
query.options...,
)
// If there are elements left to receive, the query failed.
if len(query.headerIndex) > 0 {
numFilters := query.stopHeight - query.startHeight + 1
log.Errorf("Query failed with %d out of %d filters "+
"received", len(query.headerIndex), numFilters)
return
}
}()
var ok bool
var resultFilter *gcs.Filter
// We will wait for the query to finish before we return the requested
// filter to the caller.
for {
select {
case filter, ok = <-query.filterChan:
if !ok {
// The query has finished; if we have a result, we'll return it.
return resultFilter, nil
}
// We'll store the filter so we can return it later to the caller.
resultFilter = filter
case <-s.quit:
// TODO(halseth): return error?
return nil, nil
}
}
}
// GetBlock gets a block by requesting it from the network, one peer at a
// time, until one answers. If the block is found in the cache, it will be
// returned immediately.
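//
// A minimal sketch (blockHash obtained elsewhere), requesting the block
// without witness data:
//
//	block, err := s.GetBlock(blockHash, Encoding(wire.BaseEncoding))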
func (s *ChainService) GetBlock(blockHash chainhash.Hash,
options ...QueryOption) (*btcutil.Block, error) {
// Fetch the corresponding block header from the database. If this
// isn't found, then we don't have the header for this block so we
// can't request it.
blockHeader, height, err := s.BlockHeaders.FetchHeader(&blockHash)
if err != nil || blockHeader.BlockHash() != blockHash {
return nil, fmt.Errorf("Couldn't get header for block %s "+
"from database", blockHash)
}
// Starting with the set of default options, we'll apply any specified
// functional options to the query so that we can check what inv type
// to use.
qo := defaultQueryOptions()
qo.applyQueryOptions(options...)
invType := wire.InvTypeWitnessBlock
if qo.encoding == wire.BaseEncoding {
invType = wire.InvTypeBlock
}
// Create an inv vector for getting this block.
inv := wire.NewInvVect(invType, &blockHash)
// If the block is already in the cache, we can return it immediately.
blockValue, err := s.BlockCache.Get(*inv)
if err == nil && blockValue != nil {
return blockValue.(*cache.CacheableBlock).Block, err
}
if err != nil && err != cache.ErrElementNotFound {
return nil, err
}
// Construct the appropriate getdata message to fetch the target block.
getData := wire.NewMsgGetData()
getData.AddInvVect(inv)
// The block is only updated from the checkResponse function argument,
// which is always called from a single goroutine. We don't check the
// block until after the query is finished, so we can just write to it
// naively.
var foundBlock *btcutil.Block
s.queryPeers(
// Send a wire.MsgGetData.
getData,
// Check responses and if we get one that matches, end the
// query early.
func(sp *ServerPeer, resp wire.Message,
quit chan<- struct{}) {
switch response := resp.(type) {
// We're only interested in "block" messages.
case *wire.MsgBlock:
// Only keep this going if we haven't already
// found a block, or we risk closing an already
// closed channel.
if foundBlock != nil {
return
}
// If this isn't our block, ignore it.
if response.BlockHash() != blockHash {
return
}
block := btcutil.NewBlock(response)
// Only set height if btcutil hasn't
// automagically put one in.
if block.Height() == btcutil.BlockHeightUnknown {
block.SetHeight(int32(height))
}
// If this claims our block but doesn't pass
// the sanity check, the peer is trying to
// bamboozle us. Disconnect it.
if err := blockchain.CheckBlockSanity(
block,
// We don't need to check PoW because
// by the time we get here, it's been
// checked during header
// synchronization
s.chainParams.PowLimit,
s.timeSource,
); err != nil {
log.Warnf("Invalid block for %s "+
"received from %s -- "+
"disconnecting peer", blockHash,
sp.Addr())
sp.Disconnect()
return
}
// TODO(roasbeef): modify CheckBlockSanity to
// also check witness commitment
// At this point, the block matches what we
// know about it and we declare it sane. We can
// kill the query and pass the response back to
// the caller.
foundBlock = block
close(quit)
default:
}
},
options...,
)
if foundBlock == nil {
return nil, fmt.Errorf("Couldn't retrieve block %s from "+
"network", blockHash)
}
// Add block to the cache before returning it.
_, err = s.BlockCache.Put(*inv, &cache.CacheableBlock{Block: foundBlock})
if err != nil {
log.Warnf("couldn't write block to cache: %v", err)
}
return foundBlock, nil
}
// sendTransaction sends a transaction to all peers. It returns an error if any
// peer rejects the transaction.
//
// TODO: Better privacy by sending to only one random peer and watching
// propagation, requires better peer selection support in query API.
//
// TODO(wilmer): Move to pushtx package after introducing a query package. This
// cannot be done at the moment due to circular dependencies.
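//
// A minimal sketch (tx is a signed *wire.MsgTx); a nil error means no peer
// replied, or that not all replying peers rejected the transaction:
//
//	if err := s.sendTransaction(tx); err != nil {
//		// All replying peers rejected the transaction.
//	}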
func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) error {
// Starting with the set of default options, we'll apply any specified
// functional options to the query so that we can check what inv type
// to use. Broadcast the inv to all peers, responding to any getdata
// messages for the transaction.
qo := defaultQueryOptions()
qo.applyQueryOptions(options...)
invType := wire.InvTypeWitnessTx
if qo.encoding == wire.BaseEncoding {
invType = wire.InvTypeTx
}
// Create an inv.
txHash := tx.TxHash()
inv := wire.NewMsgInv()
inv.AddInvVect(wire.NewInvVect(invType, &txHash))
// We'll gather all of the peers who replied to our query, along with
// the ones who rejected it and their reason for rejecting it. We'll use
// this to determine whether our transaction was actually rejected.
numReplied := 0
rejections := make(map[pushtx.BroadcastError]int)
// Send the peer query and listen for getdata.
s.queryAllPeers(
inv,
func(sp *ServerPeer, resp wire.Message, quit chan<- struct{},
peerQuit chan<- struct{}) {
switch response := resp.(type) {
// A peer has replied with a GetData message, so we'll
// send them the transaction.
case *wire.MsgGetData:
for _, vec := range response.InvList {
if vec.Hash == txHash {
sp.QueueMessageWithEncoding(
tx, nil, qo.encoding,
)
numReplied++
}
}
// A peer has rejected our transaction for whatever
// reason. Rather than returning to the caller upon the
// first rejection, we'll gather them all to determine
// whether it is critical/fatal.
case *wire.MsgReject:
// Ensure this rejection is for the transaction
// we're attempting to broadcast.
if response.Hash != txHash {
return
}
broadcastErr := pushtx.ParseBroadcastError(
response, sp.Addr(),
)
rejections[*broadcastErr]++
}
},
// Default to 500ms timeout. Default for queryAllPeers is a
// single try.
//
// TODO(wilmer): Is this timeout long enough assuming a
// worst-case round trip? Also needs to take into account that
// the other peer must query its own state to determine whether
// it should accept the transaction.
append(
[]QueryOption{Timeout(time.Millisecond * 500)},
options...,
)...,
)
// If none of our peers replied to our query, we'll avoid returning an
// error as the reliable broadcaster will take care of broadcasting this
// transaction upon every block connected/disconnected.
if numReplied == 0 {
log.Debugf("No peers replied to inv message for transaction %v",
tx.TxHash())
return nil
}
// If all of our peers who replied to our query also rejected our
// transaction, we'll deem that there was actually something wrong with
// it so we'll return the most rejected error between all of our peers.
//
// TODO(wilmer): This might be too naive, some rejections are more
// critical than others.
//
// TODO(wilmer): This does not cover the case where a peer also rejected
// our transaction but didn't send the response within our given timeout
// and certain other cases. Due to this, we should probably decide on a
// threshold of rejections instead.
numRejected := 0
for _, count := range rejections {
numRejected += count
}
if numReplied == numRejected {
log.Warnf("All peers rejected transaction %v checking errors",
tx.TxHash())
mostRejectedCount := 0
var mostRejectedErr pushtx.BroadcastError
for broadcastErr, count := range rejections {
if count > mostRejectedCount {
mostRejectedCount = count
mostRejectedErr = broadcastErr
}
}
return &mostRejectedErr
}
return nil
}