Mirror of https://github.com/muun/recovery.git

Update project structure and build process
217  recovery_tool/scanner/scanner.go  Normal file
@@ -0,0 +1,217 @@
package scanner

import (
    "sync"
    "time"

    "github.com/muun/libwallet"
    "github.com/muun/recovery/electrum"
    "github.com/muun/recovery/utils"
)

const taskTimeout = 15 * time.Minute
const batchSize = 100

// Scanner finds unspent outputs and their transactions when given a stream of addresses.
//
// It implements multi-server support, batching detection and use, concurrency control, timeouts
// and cancellations, and provides a channel-based interface.
//
// Servers are provided by a ServerProvider instance, and rotated when unreachable or faulty. We
// trust ServerProvider to prioritize good targets.
//
// Batching is leveraged when supported by a particular server, falling back to sequential requests
// for single addresses (which is much slower, but can get us out of trouble when better servers are
// not available).
//
// Timeouts and cancellations are an internal affair, not configurable by callers. See taskTimeout
// declared above.
//
// Concurrency control works by limiting access to clients through an electrum.Pool, not by using
// an internal worker pool. This is the Go way (limiting access to resources rather than having a
// fixed number of parallel goroutines), and (more to the point) semantically correct. We don't
// care about the number of concurrent workers; what we want to avoid is too many connections to
// Electrum servers.
type Scanner struct {
    pool    *electrum.Pool
    servers *electrum.ServerProvider
    log     *utils.Logger
}
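
// Aside (illustrative only, not part of the original design or the electrum package API): the
// "limit access to resources" idiom the comment above refers to is usually a buffered channel
// acting as a semaphore. A hypothetical pool along those lines could look like the sketch below;
// the real electrum.Pool has its own implementation and signatures (e.g. Acquire returns a channel).
type examplePool struct {
    clients chan *electrum.Client // holds idle clients; the capacity bounds total connections
}

func (p *examplePool) acquire() *electrum.Client {
    return <-p.clients // blocks while every client is in use
}

func (p *examplePool) release(c *electrum.Client) {
    p.clients <- c // hands the client back for the next caller
}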

// Report contains information about an ongoing scan.
type Report struct {
    ScannedAddresses int
    UtxosFound       []*Utxo
    Err              error
}

// Utxo references a transaction output, plus the associated MuunAddress and script.
type Utxo struct {
    TxID        string
    OutputIndex int
    Amount      int64
    Address     libwallet.MuunAddress
    Script      []byte
}

// scanContext contains the synchronization objects for a single Scanner round, to manage Tasks.
type scanContext struct {
    // Task management:
    addresses   chan libwallet.MuunAddress
    results     chan *scanTaskResult
    stopScan    chan struct{}
    stopCollect chan struct{}
    wg          *sync.WaitGroup

    // Progress reporting:
    reports     chan *Report
    reportCache *Report
}

// NewScanner creates an initialized Scanner.
func NewScanner(connectionPool *electrum.Pool, electrumProvider *electrum.ServerProvider) *Scanner {
    return &Scanner{
        pool:    connectionPool,
        servers: electrumProvider,
        log:     utils.NewLogger("Scanner"),
    }
}

// Scan the given addresses and report all unspent outputs relevant for a sweep.
func (s *Scanner) Scan(addresses chan libwallet.MuunAddress) <-chan *Report {
    var waitGroup sync.WaitGroup

    // Create the Context that goroutines will share:
    ctx := &scanContext{
        addresses:   addresses,
        results:     make(chan *scanTaskResult),
        stopScan:    make(chan struct{}),
        stopCollect: make(chan struct{}),
        wg:          &waitGroup,

        reports: make(chan *Report),
        reportCache: &Report{
            ScannedAddresses: 0,
            UtxosFound:       []*Utxo{},
        },
    }

    // Start the scan in the background:
    go s.startCollect(ctx)
    go s.startScan(ctx)

    return ctx.reports
}
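
// Usage sketch (hypothetical helper, not part of the original file): a caller of the channel-based
// interface above typically drains the report channel until it closes and keeps the last report,
// which carries either the cumulative results or the terminal error.
func exampleDrainReports(s *Scanner, addresses chan libwallet.MuunAddress) ([]*Utxo, error) {
    var lastReport *Report

    for report := range s.Scan(addresses) {
        lastReport = report // each report is a fresh snapshot with cumulative progress
    }

    if lastReport == nil {
        return nil, nil // no addresses were scanned
    }
    if lastReport.Err != nil {
        return nil, lastReport.Err // the scan failed after exhausting retries
    }

    return lastReport.UtxosFound, nil
}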

func (s *Scanner) startCollect(ctx *scanContext) {
    // Collect all results until the done signal, or abort on the first error:
    for {
        select {
        case result := <-ctx.results:
            s.log.Printf("Scanned %d, found %d (err %v)", len(result.Task.addresses), len(result.Utxos), result.Err)

            newReport := *ctx.reportCache // create a new private copy
            ctx.reportCache = &newReport

            if result.Err != nil {
                ctx.reportCache.Err = s.log.Errorf("Scan failed: %w", result.Err)
                ctx.reports <- ctx.reportCache

                close(ctx.stopScan) // failed after several retries, we give up and terminate all tasks
                close(ctx.reports)  // close the report channel to let callers know we're done
                return
            }

            ctx.reportCache.ScannedAddresses += len(result.Task.addresses)
            ctx.reportCache.UtxosFound = append(ctx.reportCache.UtxosFound, result.Utxos...)
            ctx.reports <- ctx.reportCache

        case <-ctx.stopCollect:
            close(ctx.reports) // close the report channel to let callers know we're done
            return
        }
    }
}

func (s *Scanner) startScan(ctx *scanContext) {
    s.log.Printf("Scan started")

    batches := streamBatches(ctx.addresses)

    var client *electrum.Client

    for batch := range batches {
        // Block until a client becomes available, or the scan is canceled:
        select {
        case <-ctx.stopScan:
            return

        case client = <-s.pool.Acquire():
        }

        // Start scanning this batch in the background:
        ctx.wg.Add(1)

        go func(batch []libwallet.MuunAddress) {
            defer s.pool.Release(client)
            defer ctx.wg.Done()

            s.scanBatch(ctx, client, batch)
        }(batch)
    }

    // Wait for all tasks that are still executing to complete:
    ctx.wg.Wait()
    s.log.Printf("Scan complete")

    // Signal to the collector that this Context has no more pending work:
    close(ctx.stopCollect)
}

func (s *Scanner) scanBatch(ctx *scanContext, client *electrum.Client, batch []libwallet.MuunAddress) {
    // NOTE:
    // We begin by building the task, passing our selected Client. Since we're choosing the instance,
    // it's our job to control acquisition and release of Clients to prevent sharing (remember,
    // clients are single-user). The task won't enforce this safety measure (it can't); it's fully
    // up to us.
    task := &scanTask{
        servers:   s.servers,
        client:    client,
        addresses: batch,
        timeout:   taskTimeout,
        exit:      ctx.stopCollect,
    }

    // Execute the task and send back the result:
    ctx.results <- task.Execute()
}

func streamBatches(addresses chan libwallet.MuunAddress) chan []libwallet.MuunAddress {
    batches := make(chan []libwallet.MuunAddress)

    go func() {
        var nextBatch []libwallet.MuunAddress

        for address := range addresses {
            // Add items to the batch until we reach the limit:
            nextBatch = append(nextBatch, address)

            if len(nextBatch) < batchSize {
                continue
            }

            // Send back the batch and start over:
            batches <- nextBatch
            nextBatch = []libwallet.MuunAddress{}
        }

        // Send back an incomplete batch with any remaining addresses:
        if len(nextBatch) > 0 {
            batches <- nextBatch
        }

        close(batches)
    }()

    return batches
}
197  recovery_tool/scanner/task.go  Normal file
@@ -0,0 +1,197 @@
package scanner

import (
    "fmt"
    "time"

    "github.com/btcsuite/btcd/chaincfg"
    "github.com/muun/libwallet"
    "github.com/muun/libwallet/btcsuitew/btcutilw"
    "github.com/muun/libwallet/btcsuitew/txscriptw"
    "github.com/muun/recovery/electrum"
)

// scanTask encapsulates a parallelizable Scanner unit of work.
type scanTask struct {
    servers   *electrum.ServerProvider
    client    *electrum.Client
    addresses []libwallet.MuunAddress
    timeout   time.Duration
    exit      chan struct{}
}

// scanTaskResult contains a summary of the execution of a task.
type scanTaskResult struct {
    Task  *scanTask
    Utxos []*Utxo
    Err   error
}

// Execute obtains the Utxo set for the Task addresses, implementing a retry strategy.
func (t *scanTask) Execute() *scanTaskResult {
    results := make(chan *scanTaskResult)
    timeout := time.After(t.timeout)

    // Keep the last error around, in case we reach the timeout and want to know the reason:
    var lastError error

    for {
        // Attempt to run the task:
        go t.tryExecuteAsync(results)

        // Wait until a result is sent, the timeout is reached or the task is canceled, capturing
        // errors along the way:
        select {
        case <-t.exit:
            return t.exitResult() // stop retrying when we get the done signal

        case result := <-results:
            if result.Err == nil {
                return result // we're done! nice work everyone.
            }

            lastError = result.Err // keep retrying when an attempt fails

        case <-timeout:
            return t.errorResult(fmt.Errorf("Task timed out. Last error: %w", lastError)) // stop on timeout
        }
    }
}

func (t *scanTask) tryExecuteAsync(results chan *scanTaskResult) {
    // Errors will almost certainly arise from Electrum server failures, which are extremely
    // common. Unreachable IPs, dropped connections, sudden EOFs, etc. We'll run this task, assuming
    // the servers are at fault when something fails, disconnecting and cycling them as we retry.
    result := t.tryExecute()

    if result.Err != nil {
        t.client.Disconnect()
    }

    results <- result
}

func (t *scanTask) tryExecute() *scanTaskResult {
    // If our client is not connected, make an attempt to connect to a server:
    if !t.client.IsConnected() {
        err := t.client.Connect(t.servers.NextServer())

        if err != nil {
            return t.errorResult(err)
        }
    }

    // Prepare the output scripts for all given addresses:
    outputScripts, err := getOutputScripts(t.addresses)
    if err != nil {
        return t.errorResult(err)
    }

    // Prepare the index hashes that Electrum requires to list outputs:
    indexHashes, err := getIndexHashes(outputScripts)
    if err != nil {
        return t.errorResult(err)
    }

    // Call Electrum to get the unspent output list, grouped by index for each address:
    var unspentRefGroups [][]electrum.UnspentRef

    if t.client.SupportsBatching() {
        unspentRefGroups, err = t.listUnspentWithBatching(indexHashes)
    } else {
        unspentRefGroups, err = t.listUnspentWithoutBatching(indexHashes)
    }

    if err != nil {
        return t.errorResult(err)
    }

    // Compile the results into a list of `Utxos`:
    var utxos []*Utxo

    for i, unspentRefGroup := range unspentRefGroups {
        for _, unspentRef := range unspentRefGroup {
            newUtxo := &Utxo{
                TxID:        unspentRef.TxHash,
                OutputIndex: unspentRef.TxPos,
                Amount:      unspentRef.Value,
                Script:      outputScripts[i],
                Address:     t.addresses[i],
            }

            utxos = append(utxos, newUtxo)
        }
    }

    return t.successResult(utxos)
}

func (t *scanTask) listUnspentWithBatching(indexHashes []string) ([][]electrum.UnspentRef, error) {
    unspentRefGroups, err := t.client.ListUnspentBatch(indexHashes)
    if err != nil {
        return nil, fmt.Errorf("Listing with batching failed: %w", err)
    }

    return unspentRefGroups, nil
}

func (t *scanTask) listUnspentWithoutBatching(indexHashes []string) ([][]electrum.UnspentRef, error) {
    var unspentRefGroups [][]electrum.UnspentRef

    for _, indexHash := range indexHashes {
        newGroup, err := t.client.ListUnspent(indexHash)
        if err != nil {
            return nil, fmt.Errorf("Listing without batching failed: %w", err)
        }

        unspentRefGroups = append(unspentRefGroups, newGroup)
    }

    return unspentRefGroups, nil
}

func (t *scanTask) errorResult(err error) *scanTaskResult {
    return &scanTaskResult{Task: t, Err: err}
}

func (t *scanTask) successResult(utxos []*Utxo) *scanTaskResult {
    return &scanTaskResult{Task: t, Utxos: utxos}
}

func (t *scanTask) exitResult() *scanTaskResult {
    return &scanTaskResult{Task: t}
}

// getIndexHashes calculates all the Electrum index hashes for a list of output scripts.
func getIndexHashes(outputScripts [][]byte) ([]string, error) {
    indexHashes := make([]string, len(outputScripts))

    for i, outputScript := range outputScripts {
        indexHashes[i] = electrum.GetIndexHash(outputScript)
    }

    return indexHashes, nil
}
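
// For context (an assumption about the Electrum protocol, not a description of the actual
// electrum.GetIndexHash implementation): Electrum-style index hashes are typically the SHA-256
// digest of the output script, hex-encoded with the byte order reversed. A stand-alone sketch,
// which would also require importing "crypto/sha256" and "encoding/hex", might look like:
//
//	func exampleIndexHash(outputScript []byte) string {
//		hash := sha256.Sum256(outputScript)
//
//		// Electrum expects the digest in reverse byte order before hex encoding.
//		for i, j := 0, len(hash)-1; i < j; i, j = i+1, j-1 {
//			hash[i], hash[j] = hash[j], hash[i]
//		}
//
//		return hex.EncodeToString(hash[:])
//	}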

// getOutputScripts creates the output scripts that pay to a list of Bitcoin addresses.
func getOutputScripts(addresses []libwallet.MuunAddress) ([][]byte, error) {
    outputScripts := make([][]byte, len(addresses))

    for i, address := range addresses {
        rawAddress := address.Address()

        decodedAddress, err := btcutilw.DecodeAddress(rawAddress, &chaincfg.MainNetParams)
        if err != nil {
            return nil, fmt.Errorf("Failed to decode address %s: %w", rawAddress, err)
        }

        outputScript, err := txscriptw.PayToAddrScript(decodedAddress)
        if err != nil {
            return nil, fmt.Errorf("Failed to craft script for %s: %w", rawAddress, err)
        }

        outputScripts[i] = outputScript
    }

    return outputScripts, nil
}