hscontrol/state: make NodeStore batch configuration tunable (#2886)

This commit is contained in:
Kristoffer Dalby
2025-11-28 16:38:29 +01:00
committed by GitHub
parent 9c4c017eac
commit db293e0698
11 changed files with 267 additions and 140 deletions

View File

@@ -295,7 +295,8 @@ dns:
# Split DNS (see https://tailscale.com/kb/1054/dns/), # Split DNS (see https://tailscale.com/kb/1054/dns/),
# a map of domains and which DNS server to use for each. # a map of domains and which DNS server to use for each.
split: {} split:
{}
# foo.bar.com: # foo.bar.com:
# - 1.1.1.1 # - 1.1.1.1
# darp.headscale.net: # darp.headscale.net:
@@ -407,3 +408,14 @@ logtail:
# default static port 41641. This option is intended as a workaround for some buggy # default static port 41641. This option is intended as a workaround for some buggy
# firewall devices. See https://tailscale.com/kb/1181/firewalls/ for more information. # firewall devices. See https://tailscale.com/kb/1181/firewalls/ for more information.
randomize_client_port: false randomize_client_port: false
# Advanced performance tuning parameters.
# The defaults are carefully chosen and should rarely need adjustment.
# Only modify these if you have identified a specific performance issue.
#
# tuning:
# # NodeStore write batching configuration.
# # The NodeStore batches write operations before rebuilding peer relationships,
# # which is computationally expensive. Batching reduces rebuild frequency.
# #
# # node_store_batch_size: 100
# # node_store_batch_timeout: 500ms

View File

@@ -749,15 +749,25 @@ func (hsdb *HSDatabase) allocateTestIPs(nodeID types.NodeID) (*netip.Addr, *neti
} }
// Use simple sequential allocation for tests // Use simple sequential allocation for tests
// IPv4: 100.64.0.x (where x is nodeID) // IPv4: 100.64.x.y (where x = nodeID/256, y = nodeID%256)
// IPv6: fd7a:115c:a1e0::x (where x is nodeID) // IPv6: fd7a:115c:a1e0::xxyy (where xx = high byte, yy = low byte, sharing the final 16-bit group)
// This supports up to 65535 nodes
const (
maxTestNodes = 65535
ipv4ByteDivisor = 256
)
if nodeID > 254 { if nodeID > maxTestNodes {
return nil, nil, fmt.Errorf("test node ID %d too large for simple IP allocation", nodeID) return nil, nil, ErrCouldNotAllocateIP
} }
ipv4 := netip.AddrFrom4([4]byte{100, 64, 0, byte(nodeID)}) // Split nodeID into high and low bytes for IPv4 (100.64.high.low)
ipv6 := netip.AddrFrom16([16]byte{0xfd, 0x7a, 0x11, 0x5c, 0xa1, 0xe0, 0, 0, 0, 0, 0, 0, 0, 0, 0, byte(nodeID)}) highByte := byte(nodeID / ipv4ByteDivisor)
lowByte := byte(nodeID % ipv4ByteDivisor)
ipv4 := netip.AddrFrom4([4]byte{100, 64, highByte, lowByte})
// For IPv6, use the last two bytes of the address, which together form the final 16-bit group (fd7a:115c:a1e0::HHLL)
ipv6 := netip.AddrFrom16([16]byte{0xfd, 0x7a, 0x11, 0x5c, 0xa1, 0xe0, 0, 0, 0, 0, 0, 0, 0, 0, highByte, lowByte})
return &ipv4, &ipv6, nil return &ipv4, &ipv6, nil
} }

View File

@@ -203,8 +203,10 @@ func setupBatcherWithTestData(
}, },
}, },
Tuning: types.Tuning{ Tuning: types.Tuning{
BatchChangeDelay: 10 * time.Millisecond, BatchChangeDelay: 10 * time.Millisecond,
BatcherWorkers: types.DefaultBatcherWorkers(), // Use same logic as config.go BatcherWorkers: types.DefaultBatcherWorkers(), // Use same logic as config.go
NodeStoreBatchSize: state.TestBatchSize,
NodeStoreBatchTimeout: state.TestBatchTimeout,
}, },
} }
@@ -572,14 +574,12 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
name string name string
nodeCount int nodeCount int
}{ }{
{"10_nodes", 10}, {"10_nodes", 10}, // Quick baseline test
{"50_nodes", 50}, {"100_nodes", 100}, // Full scalability test ~2 minutes
{"100_nodes", 100}, // Large-scale tests commented out - uncomment for scalability testing
// Grinds to a halt because of Database bottleneck // {"1000_nodes", 1000}, // ~12 minutes
// {"250_nodes", 250}, // {"2000_nodes", 2000}, // ~60+ minutes
// {"500_nodes", 500}, // {"5000_nodes", 5000}, // Not recommended - database bottleneck
// {"1000_nodes", 1000},
// {"5000_nodes", 5000},
} }
for _, batcherFunc := range allBatcherFunctions { for _, batcherFunc := range allBatcherFunctions {
@@ -600,7 +600,8 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
// Use large buffer to avoid blocking during rapid joins // Use large buffer to avoid blocking during rapid joins
// Buffer needs to handle nodeCount * average_updates_per_node // Buffer needs to handle nodeCount * average_updates_per_node
// Estimate: each node receives ~2*nodeCount updates during all-to-all // Estimate: each node receives ~2*nodeCount updates during all-to-all
bufferSize := max(1000, tc.nodeCount*2) // For very large tests, cap the buffer at 10000 entries (reached at 5000+ nodes) to avoid excessive memory
bufferSize := max(1000, min(tc.nodeCount*2, 10000))
testData, cleanup := setupBatcherWithTestData( testData, cleanup := setupBatcherWithTestData(
t, t,

View File

@@ -15,7 +15,7 @@ func TestNodeStoreDebugString(t *testing.T) {
{ {
name: "empty nodestore", name: "empty nodestore",
setupFn: func() *NodeStore { setupFn: func() *NodeStore {
return NewNodeStore(nil, allowAllPeersFunc) return NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
}, },
contains: []string{ contains: []string{
"=== NodeStore Debug Information ===", "=== NodeStore Debug Information ===",
@@ -30,7 +30,7 @@ func TestNodeStoreDebugString(t *testing.T) {
node1 := createTestNode(1, 1, "user1", "node1") node1 := createTestNode(1, 1, "user1", "node1")
node2 := createTestNode(2, 2, "user2", "node2") node2 := createTestNode(2, 2, "user2", "node2")
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
_ = store.PutNode(node1) _ = store.PutNode(node1)
@@ -66,7 +66,7 @@ func TestNodeStoreDebugString(t *testing.T) {
func TestDebugRegistrationCache(t *testing.T) { func TestDebugRegistrationCache(t *testing.T) {
// Create a minimal NodeStore for testing debug methods // Create a minimal NodeStore for testing debug methods
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
debugStr := store.DebugString() debugStr := store.DebugString()

View File

@@ -19,7 +19,7 @@ func TestEndpointStorageInNodeStore(t *testing.T) {
node2 := createTestNode(2, 1, "test-user", "node2") node2 := createTestNode(2, 1, "test-user", "node2")
// Create NodeStore with allow-all peers function // Create NodeStore with allow-all peers function
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()

View File

@@ -20,7 +20,7 @@ func TestEphemeralNodeDeleteWithConcurrentUpdate(t *testing.T) {
node := createTestNode(1, 1, "test-user", "test-node") node := createTestNode(1, 1, "test-user", "test-node")
// Create NodeStore // Create NodeStore
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
@@ -57,8 +57,6 @@ func TestEphemeralNodeDeleteWithConcurrentUpdate(t *testing.T) {
// Goroutine 2: DeleteNode (simulates handleLogout for ephemeral node) // Goroutine 2: DeleteNode (simulates handleLogout for ephemeral node)
go func() { go func() {
// Small delay to increase chance of batching together
time.Sleep(1 * time.Millisecond)
store.DeleteNode(node.ID) store.DeleteNode(node.ID)
done <- true done <- true
}() }()
@@ -67,15 +65,11 @@ func TestEphemeralNodeDeleteWithConcurrentUpdate(t *testing.T) {
<-done <-done
<-done <-done
// Give batching time to complete // Verify node is eventually deleted
time.Sleep(50 * time.Millisecond) require.EventuallyWithT(t, func(c *assert.CollectT) {
_, found = store.GetNode(node.ID)
// The key assertion: if UpdateNode and DeleteNode were batched together assert.False(c, found, "node should be deleted from NodeStore")
// with DELETE after UPDATE, then UpdateNode should return an invalid node }, 1*time.Second, 10*time.Millisecond, "waiting for node to be deleted")
// OR it should return a valid node but the node should no longer exist in the store
_, found = store.GetNode(node.ID)
assert.False(t, found, "node should be deleted from NodeStore")
// If the update happened before delete in the batch, the returned node might be invalid // If the update happened before delete in the batch, the returned node might be invalid
if updateOk { if updateOk {
@@ -95,22 +89,21 @@ func TestEphemeralNodeDeleteWithConcurrentUpdate(t *testing.T) {
func TestUpdateNodeReturnsInvalidWhenDeletedInSameBatch(t *testing.T) { func TestUpdateNodeReturnsInvalidWhenDeletedInSameBatch(t *testing.T) {
node := createTestNode(2, 1, "test-user", "test-node-2") node := createTestNode(2, 1, "test-user", "test-node-2")
store := NewNodeStore(nil, allowAllPeersFunc) // Use batch size of 2 to guarantee UpdateNode and DeleteNode batch together
store := NewNodeStore(nil, allowAllPeersFunc, 2, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
// Put node in store // Put node in store
_ = store.PutNode(node) _ = store.PutNode(node)
// Simulate the exact sequence: UpdateNode gets queued, then DeleteNode gets queued, // Queue UpdateNode and DeleteNode - with batch size of 2, they will batch together
// they batch together, and we check what UpdateNode returns
resultChan := make(chan struct { resultChan := make(chan struct {
node types.NodeView node types.NodeView
ok bool ok bool
}) })
// Start UpdateNode - it will block until batch is applied // Start UpdateNode in goroutine - it will queue and wait for batch
go func() { go func() {
node, ok := store.UpdateNode(node.ID, func(n *types.Node) { node, ok := store.UpdateNode(node.ID, func(n *types.Node) {
n.LastSeen = ptr.To(time.Now()) n.LastSeen = ptr.To(time.Now())
@@ -121,18 +114,15 @@ func TestUpdateNodeReturnsInvalidWhenDeletedInSameBatch(t *testing.T) {
}{node, ok} }{node, ok}
}() }()
// Give UpdateNode a moment to queue its work // Start DeleteNode in goroutine - it will queue and trigger batch processing
time.Sleep(5 * time.Millisecond) // Since batch size is 2, both operations will be processed together
go func() {
// Now queue DeleteNode - should batch with the UPDATE store.DeleteNode(node.ID)
store.DeleteNode(node.ID) }()
// Get the result from UpdateNode // Get the result from UpdateNode
result := <-resultChan result := <-resultChan
// Wait for batch to complete
time.Sleep(50 * time.Millisecond)
// Node should be deleted // Node should be deleted
_, found := store.GetNode(node.ID) _, found := store.GetNode(node.ID)
assert.False(t, found, "node should be deleted") assert.False(t, found, "node should be deleted")
@@ -157,7 +147,7 @@ func TestUpdateNodeReturnsInvalidWhenDeletedInSameBatch(t *testing.T) {
func TestPersistNodeToDBPreventsRaceCondition(t *testing.T) { func TestPersistNodeToDBPreventsRaceCondition(t *testing.T) {
node := createTestNode(3, 1, "test-user", "test-node-3") node := createTestNode(3, 1, "test-user", "test-node-3")
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
@@ -174,12 +164,11 @@ func TestPersistNodeToDBPreventsRaceCondition(t *testing.T) {
// Now delete the node (simulating ephemeral logout happening concurrently) // Now delete the node (simulating ephemeral logout happening concurrently)
store.DeleteNode(node.ID) store.DeleteNode(node.ID)
// Wait for deletion to complete // Verify node is eventually deleted
time.Sleep(50 * time.Millisecond) require.EventuallyWithT(t, func(c *assert.CollectT) {
_, found := store.GetNode(node.ID)
// Verify node is deleted assert.False(c, found, "node should be deleted")
_, found := store.GetNode(node.ID) }, 1*time.Second, 10*time.Millisecond, "waiting for node to be deleted")
require.False(t, found, "node should be deleted")
// Now try to use the updatedNode from before the deletion // Now try to use the updatedNode from before the deletion
// In the old code, this would re-insert the node into the database // In the old code, this would re-insert the node into the database
@@ -213,7 +202,8 @@ func TestEphemeralNodeLogoutRaceCondition(t *testing.T) {
Ephemeral: true, Ephemeral: true,
} }
store := NewNodeStore(nil, allowAllPeersFunc) // Use batch size of 2 to guarantee UpdateNode and DeleteNode batch together
store := NewNodeStore(nil, allowAllPeersFunc, 2, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
@@ -238,7 +228,6 @@ func TestEphemeralNodeLogoutRaceCondition(t *testing.T) {
// Goroutine 2: DeleteNode (simulates handleLogout for ephemeral node) // Goroutine 2: DeleteNode (simulates handleLogout for ephemeral node)
go func() { go func() {
time.Sleep(1 * time.Millisecond) // Slight delay to batch operations
store.DeleteNode(ephemeralNode.ID) store.DeleteNode(ephemeralNode.ID)
done <- true done <- true
}() }()
@@ -247,12 +236,11 @@ func TestEphemeralNodeLogoutRaceCondition(t *testing.T) {
<-done <-done
<-done <-done
// Give batching time to complete // Verify node is eventually deleted
time.Sleep(50 * time.Millisecond) require.EventuallyWithT(t, func(c *assert.CollectT) {
_, found := store.GetNode(ephemeralNode.ID)
// Node should be deleted from store assert.False(c, found, "ephemeral node should be deleted from NodeStore")
_, found := store.GetNode(ephemeralNode.ID) }, 1*time.Second, 10*time.Millisecond, "waiting for ephemeral node to be deleted")
assert.False(t, found, "ephemeral node should be deleted from NodeStore")
// Critical assertion: if UpdateNode returned before DeleteNode completed, // Critical assertion: if UpdateNode returned before DeleteNode completed,
// the updatedNode might be valid but the node is actually deleted. // the updatedNode might be valid but the node is actually deleted.
@@ -288,51 +276,57 @@ func TestUpdateNodeFromMapRequestEphemeralLogoutSequence(t *testing.T) {
Ephemeral: true, Ephemeral: true,
} }
store := NewNodeStore(nil, allowAllPeersFunc) // Use batch size of 2 to guarantee UpdateNode and DeleteNode batch together
store := NewNodeStore(nil, allowAllPeersFunc, 2, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
// Initial state: ephemeral node exists // Put ephemeral node in store
_ = store.PutNode(ephemeralNode) _ = store.PutNode(ephemeralNode)
// Step 1: UpdateNodeFromMapRequest calls UpdateNode // Step 1: UpdateNodeFromMapRequest calls UpdateNode
// (simulating client sending MapRequest with endpoint updates) // (simulating client sending MapRequest with endpoint updates)
updateStarted := make(chan bool) updateResult := make(chan struct {
var updatedNode types.NodeView node types.NodeView
var updateOk bool ok bool
})
go func() { go func() {
updateStarted <- true node, ok := store.UpdateNode(ephemeralNode.ID, func(n *types.Node) {
updatedNode, updateOk = store.UpdateNode(ephemeralNode.ID, func(n *types.Node) {
n.LastSeen = ptr.To(time.Now()) n.LastSeen = ptr.To(time.Now())
endpoint := netip.MustParseAddrPort("10.0.0.1:41641") endpoint := netip.MustParseAddrPort("10.0.0.1:41641")
n.Endpoints = []netip.AddrPort{endpoint} n.Endpoints = []netip.AddrPort{endpoint}
}) })
updateResult <- struct {
node types.NodeView
ok bool
}{node, ok}
}() }()
<-updateStarted
// Small delay to ensure UpdateNode is queued
time.Sleep(5 * time.Millisecond)
// Step 2: Logout happens - handleLogout calls DeleteNode // Step 2: Logout happens - handleLogout calls DeleteNode
// (simulating client sending logout with past expiry) // With batch size of 2, this will trigger batch processing with UpdateNode
store.DeleteNode(ephemeralNode.ID) go func() {
store.DeleteNode(ephemeralNode.ID)
}()
// Wait for batching to complete // Step 3: Wait and verify node is eventually deleted
time.Sleep(50 * time.Millisecond) require.EventuallyWithT(t, func(c *assert.CollectT) {
_, nodeExists := store.GetNode(ephemeralNode.ID)
assert.False(c, nodeExists, "ephemeral node must be deleted after logout")
}, 1*time.Second, 10*time.Millisecond, "waiting for ephemeral node to be deleted")
// Step 3: Check results // Step 4: Get the update result
_, nodeExists := store.GetNode(ephemeralNode.ID) result := <-updateResult
assert.False(t, nodeExists, "ephemeral node must be deleted after logout")
// Step 4: Simulate what happens if we try to persist the updatedNode // Simulate what happens if we try to persist the updatedNode
if updateOk && updatedNode.Valid() { if result.ok && result.node.Valid() {
// This is the problematic path - UpdateNode returned a valid node // This is the problematic path - UpdateNode returned a valid node
// but the node was deleted in the same batch // but the node was deleted in the same batch
t.Log("UpdateNode returned valid node even though node was deleted") t.Log("UpdateNode returned valid node even though node was deleted")
// The fix: persistNodeToDB must check NodeStore before persisting // The fix: persistNodeToDB must check NodeStore before persisting
_, checkExists := store.GetNode(updatedNode.ID()) _, checkExists := store.GetNode(result.node.ID())
if checkExists { if checkExists {
t.Error("BUG: Node still exists in NodeStore after deletion - should be impossible") t.Error("BUG: Node still exists in NodeStore after deletion - should be impossible")
} else { } else {
@@ -353,14 +347,15 @@ func TestUpdateNodeFromMapRequestEphemeralLogoutSequence(t *testing.T) {
func TestUpdateNodeDeletedInSameBatchReturnsInvalid(t *testing.T) { func TestUpdateNodeDeletedInSameBatchReturnsInvalid(t *testing.T) {
node := createTestNode(6, 1, "test-user", "test-node-6") node := createTestNode(6, 1, "test-user", "test-node-6")
store := NewNodeStore(nil, allowAllPeersFunc) // Use batch size of 2 to guarantee UpdateNode and DeleteNode batch together
store := NewNodeStore(nil, allowAllPeersFunc, 2, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
// Put node in store // Put node in store
_ = store.PutNode(node) _ = store.PutNode(node)
// Queue UpdateNode // Queue UpdateNode and DeleteNode - with batch size of 2, they will batch together
updateDone := make(chan struct { updateDone := make(chan struct {
node types.NodeView node types.NodeView
ok bool ok bool
@@ -376,18 +371,14 @@ func TestUpdateNodeDeletedInSameBatchReturnsInvalid(t *testing.T) {
}{updatedNode, ok} }{updatedNode, ok}
}() }()
// Small delay to ensure UpdateNode is queued // Queue DeleteNode - with batch size of 2, this triggers batch processing
time.Sleep(5 * time.Millisecond) go func() {
store.DeleteNode(node.ID)
// Queue DeleteNode - should batch with UpdateNode }()
store.DeleteNode(node.ID)
// Get UpdateNode result // Get UpdateNode result
result := <-updateDone result := <-updateDone
// Wait for batch to complete
time.Sleep(50 * time.Millisecond)
// Node should be deleted // Node should be deleted
_, exists := store.GetNode(node.ID) _, exists := store.GetNode(node.ID)
assert.False(t, exists, "node should be deleted from store") assert.False(t, exists, "node should be deleted from store")
@@ -417,30 +408,28 @@ func TestPersistNodeToDBChecksNodeStoreBeforePersist(t *testing.T) {
Ephemeral: true, Ephemeral: true,
} }
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
// Put node in store // Put node
_ = store.PutNode(ephemeralNode) _ = store.PutNode(ephemeralNode)
// Simulate the race: // UpdateNode returns a node
// 1. UpdateNode is called (from UpdateNodeFromMapRequest)
updatedNode, ok := store.UpdateNode(ephemeralNode.ID, func(n *types.Node) { updatedNode, ok := store.UpdateNode(ephemeralNode.ID, func(n *types.Node) {
n.LastSeen = ptr.To(time.Now()) n.LastSeen = ptr.To(time.Now())
}) })
require.True(t, ok, "UpdateNode should succeed") require.True(t, ok, "UpdateNode should succeed")
require.True(t, updatedNode.Valid(), "UpdateNode should return valid node") require.True(t, updatedNode.Valid(), "updated node should be valid")
// 2. Node is deleted (from handleLogout for ephemeral node) // Delete the node
store.DeleteNode(ephemeralNode.ID) store.DeleteNode(ephemeralNode.ID)
// Wait for deletion // Verify node is eventually deleted
time.Sleep(50 * time.Millisecond) require.EventuallyWithT(t, func(c *assert.CollectT) {
_, exists := store.GetNode(ephemeralNode.ID)
// 3. Verify node is deleted from store assert.False(c, exists, "node should be deleted from NodeStore")
_, exists := store.GetNode(ephemeralNode.ID) }, 1*time.Second, 10*time.Millisecond, "waiting for node to be deleted")
require.False(t, exists, "node should be deleted from NodeStore")
// 4. Simulate what persistNodeToDB does - check if node still exists // 4. Simulate what persistNodeToDB does - check if node still exists
// The fix in persistNodeToDB checks NodeStore before persisting: // The fix in persistNodeToDB checks NodeStore before persisting:

View File

@@ -14,11 +14,6 @@ import (
"tailscale.com/types/views" "tailscale.com/types/views"
) )
const (
batchSize = 100
batchTimeout = 500 * time.Millisecond
)
const ( const (
put = 1 put = 1
del = 2 del = 2
@@ -92,9 +87,12 @@ type NodeStore struct {
peersFunc PeersFunc peersFunc PeersFunc
writeQueue chan work writeQueue chan work
batchSize int
batchTimeout time.Duration
} }
func NewNodeStore(allNodes types.Nodes, peersFunc PeersFunc) *NodeStore { func NewNodeStore(allNodes types.Nodes, peersFunc PeersFunc, batchSize int, batchTimeout time.Duration) *NodeStore {
nodes := make(map[types.NodeID]types.Node, len(allNodes)) nodes := make(map[types.NodeID]types.Node, len(allNodes))
for _, n := range allNodes { for _, n := range allNodes {
nodes[n.ID] = *n nodes[n.ID] = *n
@@ -102,7 +100,9 @@ func NewNodeStore(allNodes types.Nodes, peersFunc PeersFunc) *NodeStore {
snap := snapshotFromNodes(nodes, peersFunc) snap := snapshotFromNodes(nodes, peersFunc)
store := &NodeStore{ store := &NodeStore{
peersFunc: peersFunc, peersFunc: peersFunc,
batchSize: batchSize,
batchTimeout: batchTimeout,
} }
store.data.Store(&snap) store.data.Store(&snap)
@@ -249,9 +249,10 @@ func (s *NodeStore) Stop() {
// processWrite processes the write queue in batches. // processWrite processes the write queue in batches.
func (s *NodeStore) processWrite() { func (s *NodeStore) processWrite() {
c := time.NewTicker(batchTimeout) c := time.NewTicker(s.batchTimeout)
defer c.Stop() defer c.Stop()
batch := make([]work, 0, batchSize)
batch := make([]work, 0, s.batchSize)
for { for {
select { select {
@@ -264,17 +265,19 @@ func (s *NodeStore) processWrite() {
return return
} }
batch = append(batch, w) batch = append(batch, w)
if len(batch) >= batchSize { if len(batch) >= s.batchSize {
s.applyBatch(batch) s.applyBatch(batch)
batch = batch[:0] batch = batch[:0]
c.Reset(batchTimeout)
c.Reset(s.batchTimeout)
} }
case <-c.C: case <-c.C:
if len(batch) != 0 { if len(batch) != 0 {
s.applyBatch(batch) s.applyBatch(batch)
batch = batch[:0] batch = batch[:0]
} }
c.Reset(batchTimeout)
c.Reset(s.batchTimeout)
} }
} }
} }

View File

@@ -236,7 +236,7 @@ func TestNodeStoreOperations(t *testing.T) {
{ {
name: "create empty store and add single node", name: "create empty store and add single node",
setupFunc: func(t *testing.T) *NodeStore { setupFunc: func(t *testing.T) *NodeStore {
return NewNodeStore(nil, allowAllPeersFunc) return NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
}, },
steps: []testStep{ steps: []testStep{
{ {
@@ -276,7 +276,8 @@ func TestNodeStoreOperations(t *testing.T) {
setupFunc: func(t *testing.T) *NodeStore { setupFunc: func(t *testing.T) *NodeStore {
node1 := createTestNode(1, 1, "user1", "node1") node1 := createTestNode(1, 1, "user1", "node1")
initialNodes := types.Nodes{&node1} initialNodes := types.Nodes{&node1}
return NewNodeStore(initialNodes, allowAllPeersFunc)
return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
}, },
steps: []testStep{ steps: []testStep{
{ {
@@ -346,7 +347,7 @@ func TestNodeStoreOperations(t *testing.T) {
node3 := createTestNode(3, 2, "user2", "node3") node3 := createTestNode(3, 2, "user2", "node3")
initialNodes := types.Nodes{&node1, &node2, &node3} initialNodes := types.Nodes{&node1, &node2, &node3}
return NewNodeStore(initialNodes, allowAllPeersFunc) return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
}, },
steps: []testStep{ steps: []testStep{
{ {
@@ -405,7 +406,8 @@ func TestNodeStoreOperations(t *testing.T) {
node1 := createTestNode(1, 1, "user1", "node1") node1 := createTestNode(1, 1, "user1", "node1")
node2 := createTestNode(2, 1, "user1", "node2") node2 := createTestNode(2, 1, "user1", "node2")
initialNodes := types.Nodes{&node1, &node2} initialNodes := types.Nodes{&node1, &node2}
return NewNodeStore(initialNodes, allowAllPeersFunc)
return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
}, },
steps: []testStep{ steps: []testStep{
{ {
@@ -443,7 +445,7 @@ func TestNodeStoreOperations(t *testing.T) {
{ {
name: "test with odd-even peers filtering", name: "test with odd-even peers filtering",
setupFunc: func(t *testing.T) *NodeStore { setupFunc: func(t *testing.T) *NodeStore {
return NewNodeStore(nil, oddEvenPeersFunc) return NewNodeStore(nil, oddEvenPeersFunc, TestBatchSize, TestBatchTimeout)
}, },
steps: []testStep{ steps: []testStep{
{ {
@@ -502,7 +504,8 @@ func TestNodeStoreOperations(t *testing.T) {
node1 := createTestNode(1, 1, "user1", "node1") node1 := createTestNode(1, 1, "user1", "node1")
node2 := createTestNode(2, 1, "user1", "node2") node2 := createTestNode(2, 1, "user1", "node2")
initialNodes := types.Nodes{&node1, &node2} initialNodes := types.Nodes{&node1, &node2}
return NewNodeStore(initialNodes, allowAllPeersFunc)
return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
}, },
steps: []testStep{ steps: []testStep{
{ {
@@ -673,7 +676,8 @@ func TestNodeStoreOperations(t *testing.T) {
node1 := createTestNode(1, 1, "user1", "node1") node1 := createTestNode(1, 1, "user1", "node1")
node2 := createTestNode(2, 1, "user1", "node2") node2 := createTestNode(2, 1, "user1", "node2")
initialNodes := types.Nodes{&node1, &node2} initialNodes := types.Nodes{&node1, &node2}
return NewNodeStore(initialNodes, allowAllPeersFunc)
return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
}, },
steps: []testStep{ steps: []testStep{
{ {
@@ -861,7 +865,8 @@ func createConcurrentTestNode(id types.NodeID, hostname string) types.Node {
// --- Concurrency: concurrent PutNode operations --- // --- Concurrency: concurrent PutNode operations ---
func TestNodeStoreConcurrentPutNode(t *testing.T) { func TestNodeStoreConcurrentPutNode(t *testing.T) {
const concurrentOps = 20 const concurrentOps = 20
store := NewNodeStore(nil, allowAllPeersFunc)
store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
@@ -892,7 +897,8 @@ func TestNodeStoreConcurrentPutNode(t *testing.T) {
func TestNodeStoreBatchingEfficiency(t *testing.T) { func TestNodeStoreBatchingEfficiency(t *testing.T) {
const batchSize = 10 const batchSize = 10
const ops = 15 // more than batchSize const ops = 15 // more than batchSize
store := NewNodeStore(nil, allowAllPeersFunc)
store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
@@ -921,7 +927,7 @@ func TestNodeStoreBatchingEfficiency(t *testing.T) {
// --- Race conditions: many goroutines on same node --- // --- Race conditions: many goroutines on same node ---
func TestNodeStoreRaceConditions(t *testing.T) { func TestNodeStoreRaceConditions(t *testing.T) {
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
@@ -979,7 +985,7 @@ func TestNodeStoreRaceConditions(t *testing.T) {
// --- Resource cleanup: goroutine leak detection --- // --- Resource cleanup: goroutine leak detection ---
func TestNodeStoreResourceCleanup(t *testing.T) { func TestNodeStoreResourceCleanup(t *testing.T) {
// initialGoroutines := runtime.NumGoroutine() // initialGoroutines := runtime.NumGoroutine()
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
@@ -1011,7 +1017,7 @@ func TestNodeStoreResourceCleanup(t *testing.T) {
// --- Timeout/deadlock: operations complete within reasonable time --- // --- Timeout/deadlock: operations complete within reasonable time ---
func TestNodeStoreOperationTimeout(t *testing.T) { func TestNodeStoreOperationTimeout(t *testing.T) {
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()
@@ -1095,7 +1101,7 @@ func TestNodeStoreOperationTimeout(t *testing.T) {
// --- Edge case: update non-existent node --- // --- Edge case: update non-existent node ---
func TestNodeStoreUpdateNonExistentNode(t *testing.T) { func TestNodeStoreUpdateNonExistentNode(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
nonExistentID := types.NodeID(999 + i) nonExistentID := types.NodeID(999 + i)
updateCallCount := 0 updateCallCount := 0
@@ -1114,7 +1120,7 @@ func TestNodeStoreUpdateNonExistentNode(t *testing.T) {
// --- Allocation benchmark --- // --- Allocation benchmark ---
func BenchmarkNodeStoreAllocations(b *testing.B) { func BenchmarkNodeStoreAllocations(b *testing.B) {
store := NewNodeStore(nil, allowAllPeersFunc) store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout)
store.Start() store.Start()
defer store.Stop() defer store.Stop()

View File

@@ -40,6 +40,14 @@ const (
// registerCacheCleanup defines the interval for cleaning up expired cache entries. // registerCacheCleanup defines the interval for cleaning up expired cache entries.
registerCacheCleanup = time.Minute * 20 registerCacheCleanup = time.Minute * 20
// defaultNodeStoreBatchSize is the default number of write operations to batch
// before rebuilding the in-memory node snapshot.
defaultNodeStoreBatchSize = 100
// defaultNodeStoreBatchTimeout is the default maximum time to wait before
// processing a partial batch of node operations.
defaultNodeStoreBatchTimeout = 500 * time.Millisecond
) )
// ErrUnsupportedPolicyMode is returned for invalid policy modes. Valid modes are "file" and "db". // ErrUnsupportedPolicyMode is returned for invalid policy modes. Valid modes are "file" and "db".
@@ -132,11 +140,27 @@ func NewState(cfg *types.Config) (*State, error) {
return nil, fmt.Errorf("init policy manager: %w", err) return nil, fmt.Errorf("init policy manager: %w", err)
} }
// Apply defaults for NodeStore batch configuration if not set.
// This ensures tests that create Config directly (without viper) still work.
batchSize := cfg.Tuning.NodeStoreBatchSize
if batchSize == 0 {
batchSize = defaultNodeStoreBatchSize
}
batchTimeout := cfg.Tuning.NodeStoreBatchTimeout
if batchTimeout == 0 {
batchTimeout = defaultNodeStoreBatchTimeout
}
// PolicyManager.BuildPeerMap handles both global and per-node filter complexity. // PolicyManager.BuildPeerMap handles both global and per-node filter complexity.
// This moves the complex peer relationship logic into the policy package where it belongs. // This moves the complex peer relationship logic into the policy package where it belongs.
nodeStore := NewNodeStore(nodes, func(nodes []types.NodeView) map[types.NodeID][]types.NodeView { nodeStore := NewNodeStore(
return polMan.BuildPeerMap(views.SliceOf(nodes)) nodes,
}) func(nodes []types.NodeView) map[types.NodeID][]types.NodeView {
return polMan.BuildPeerMap(views.SliceOf(nodes))
},
batchSize,
batchTimeout,
)
nodeStore.Start() nodeStore.Start()
return &State{ return &State{

View File

@@ -0,0 +1,12 @@
package state
import (
"time"
)
// Test configuration for NodeStore batching.
// These values are optimized for test speed rather than production use.
// They are passed as the batch size and batch timeout arguments to
// NewNodeStore (for example in BenchmarkNodeStoreAllocations) and are far
// smaller than the production defaults of 100 operations and 500ms.
const (
// TestBatchSize is the number of write operations per batch used when
// constructing a NodeStore in tests.
TestBatchSize = 5
// TestBatchTimeout is the batch timeout used when constructing a
// NodeStore in tests.
TestBatchTimeout = 5 * time.Millisecond
)

View File

@@ -28,6 +28,8 @@ const (
maxDuration time.Duration = 1<<63 - 1 maxDuration time.Duration = 1<<63 - 1
PKCEMethodPlain string = "plain" PKCEMethodPlain string = "plain"
PKCEMethodS256 string = "S256" PKCEMethodS256 string = "S256"
defaultNodeStoreBatchSize = 100
) )
var ( var (
@@ -230,13 +232,63 @@ type LogConfig struct {
Level zerolog.Level Level zerolog.Level
} }
// Tuning contains advanced performance tuning parameters for Headscale.
// These settings control internal batching, timeouts, and resource allocation.
// The defaults are carefully chosen for typical deployments and should rarely
// need adjustment. Changes to these values can significantly impact performance
// and resource usage.
type Tuning struct {
	// NotifierSendTimeout is the maximum time to wait when sending notifications
	// to connected clients about network changes.
	NotifierSendTimeout time.Duration

	// BatchChangeDelay controls how long to wait before sending batched updates
	// to clients when multiple changes occur in rapid succession.
	BatchChangeDelay time.Duration

	// NodeMapSessionBufferedChanSize sets the buffer size for the channel that
	// queues map updates to be sent to connected clients.
	NodeMapSessionBufferedChanSize int

	// BatcherWorkers controls the number of parallel workers processing map
	// updates for connected clients.
	BatcherWorkers int

	// RegisterCacheCleanup is the interval between cleanup operations for
	// expired registration cache entries.
	RegisterCacheCleanup time.Duration

	// RegisterCacheExpiration is how long registration cache entries remain
	// valid before being eligible for cleanup.
	RegisterCacheExpiration time.Duration

	// NodeStoreBatchSize controls how many write operations are accumulated
	// before rebuilding the in-memory node snapshot.
	//
	// The NodeStore batches write operations (add/update/delete nodes) before
	// rebuilding its in-memory data structures. Rebuilding involves recalculating
	// peer relationships between all nodes based on the current ACL policy, which
	// is computationally expensive and scales with the square of the number of nodes.
	//
	// By batching writes, Headscale can process N operations but only rebuild once,
	// rather than rebuilding N times. This significantly reduces CPU usage during
	// bulk operations like initial sync or policy updates.
	//
	// Trade-off: Higher values reduce CPU usage from rebuilds but increase latency
	// for individual operations waiting for their batch to complete.
	//
	// Configured via tuning.node_store_batch_size (default 100). Configuration
	// validation rejects values that are not positive; a zero value on a Config
	// built without viper is replaced by the default when the state is created.
	NodeStoreBatchSize int

	// NodeStoreBatchTimeout is the maximum time to wait before processing a
	// partial batch of node operations.
	//
	// When NodeStoreBatchSize operations haven't accumulated, this timeout ensures
	// writes don't wait indefinitely. The batch processes when either the size
	// threshold is reached OR this timeout expires, whichever comes first.
	//
	// Trade-off: Lower values provide faster response for individual operations
	// but trigger more frequent (expensive) peer map rebuilds. Higher values
	// optimize for bulk throughput at the cost of individual operation latency.
	//
	// Configured via tuning.node_store_batch_timeout (default 500ms).
	// Configuration validation rejects values that are not positive; a zero
	// value on a Config built without viper is replaced by the default when
	// the state is created.
	NodeStoreBatchTimeout time.Duration
}
func validatePKCEMethod(method string) error { func validatePKCEMethod(method string) error {
@@ -336,6 +388,8 @@ func LoadConfig(path string, isFile bool) error {
viper.SetDefault("tuning.notifier_send_timeout", "800ms") viper.SetDefault("tuning.notifier_send_timeout", "800ms")
viper.SetDefault("tuning.batch_change_delay", "800ms") viper.SetDefault("tuning.batch_change_delay", "800ms")
viper.SetDefault("tuning.node_mapsession_buffered_chan_size", 30) viper.SetDefault("tuning.node_mapsession_buffered_chan_size", 30)
viper.SetDefault("tuning.node_store_batch_size", defaultNodeStoreBatchSize)
viper.SetDefault("tuning.node_store_batch_timeout", "500ms")
viper.SetDefault("prefixes.allocation", string(IPAllocationStrategySequential)) viper.SetDefault("prefixes.allocation", string(IPAllocationStrategySequential))
@@ -437,6 +491,21 @@ func validateServerConfig() error {
} }
} }
// Validate tuning parameters
if size := viper.GetInt("tuning.node_store_batch_size"); size <= 0 {
errorText += fmt.Sprintf(
"Fatal config error: tuning.node_store_batch_size must be positive, got %d\n",
size,
)
}
if timeout := viper.GetDuration("tuning.node_store_batch_timeout"); timeout <= 0 {
errorText += fmt.Sprintf(
"Fatal config error: tuning.node_store_batch_timeout must be positive, got %s\n",
timeout,
)
}
if errorText != "" { if errorText != "" {
// nolint // nolint
return errors.New(strings.TrimSuffix(errorText, "\n")) return errors.New(strings.TrimSuffix(errorText, "\n"))
@@ -991,7 +1060,6 @@ func LoadServerConfig() (*Config, error) {
Log: logConfig, Log: logConfig,
// TODO(kradalby): Document these settings when more stable
Tuning: Tuning{ Tuning: Tuning{
NotifierSendTimeout: viper.GetDuration("tuning.notifier_send_timeout"), NotifierSendTimeout: viper.GetDuration("tuning.notifier_send_timeout"),
BatchChangeDelay: viper.GetDuration("tuning.batch_change_delay"), BatchChangeDelay: viper.GetDuration("tuning.batch_change_delay"),
@@ -1006,6 +1074,8 @@ func LoadServerConfig() (*Config, error) {
}(), }(),
RegisterCacheCleanup: viper.GetDuration("tuning.register_cache_cleanup"), RegisterCacheCleanup: viper.GetDuration("tuning.register_cache_cleanup"),
RegisterCacheExpiration: viper.GetDuration("tuning.register_cache_expiration"), RegisterCacheExpiration: viper.GetDuration("tuning.register_cache_expiration"),
NodeStoreBatchSize: viper.GetInt("tuning.node_store_batch_size"),
NodeStoreBatchTimeout: viper.GetDuration("tuning.node_store_batch_timeout"),
}, },
}, nil }, nil
} }