From db293e069857eef070cf7b8d37783f823824ca1c Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Fri, 28 Nov 2025 16:38:29 +0100 Subject: [PATCH] hscontrol/state: make NodeStore batch configuration tunable (#2886) --- config-example.yaml | 14 ++- hscontrol/db/node.go | 22 ++-- hscontrol/mapper/batcher_test.go | 23 +++-- hscontrol/state/debug_test.go | 6 +- hscontrol/state/endpoint_test.go | 2 +- hscontrol/state/ephemeral_test.go | 155 ++++++++++++++--------------- hscontrol/state/node_store.go | 27 ++--- hscontrol/state/node_store_test.go | 34 ++++--- hscontrol/state/state.go | 30 +++++- hscontrol/state/test_helpers.go | 12 +++ hscontrol/types/config.go | 82 +++++++++++++-- 11 files changed, 267 insertions(+), 140 deletions(-) create mode 100644 hscontrol/state/test_helpers.go diff --git a/config-example.yaml b/config-example.yaml index ec14dc03..7a60529e 100644 --- a/config-example.yaml +++ b/config-example.yaml @@ -295,7 +295,8 @@ dns: # Split DNS (see https://tailscale.com/kb/1054/dns/), # a map of domains and which DNS server to use for each. - split: {} + split: + {} # foo.bar.com: # - 1.1.1.1 # darp.headscale.net: @@ -407,3 +408,14 @@ logtail: # default static port 41641. This option is intended as a workaround for some buggy # firewall devices. See https://tailscale.com/kb/1181/firewalls/ for more information. randomize_client_port: false +# Advanced performance tuning parameters. +# The defaults are carefully chosen and should rarely need adjustment. +# Only modify these if you have identified a specific performance issue. +# +# tuning: +# # NodeStore write batching configuration. +# # The NodeStore batches write operations before rebuilding peer relationships, +# # which is computationally expensive. Batching reduces rebuild frequency. +# # +# # node_store_batch_size: 100 +# # node_store_batch_timeout: 500ms diff --git a/hscontrol/db/node.go b/hscontrol/db/node.go index 060196a9..cfeefb82 100644 --- a/hscontrol/db/node.go +++ b/hscontrol/db/node.go @@ -749,15 +749,25 @@ func (hsdb *HSDatabase) allocateTestIPs(nodeID types.NodeID) (*netip.Addr, *neti } // Use simple sequential allocation for tests - // IPv4: 100.64.0.x (where x is nodeID) - // IPv6: fd7a:115c:a1e0::x (where x is nodeID) + // IPv4: 100.64.x.y (where x = nodeID/256, y = nodeID%256) + // IPv6: fd7a:115c:a1e0::x:y (where x = high byte, y = low byte) + // This supports up to 65535 nodes + const ( + maxTestNodes = 65535 + ipv4ByteDivisor = 256 + ) - if nodeID > 254 { - return nil, nil, fmt.Errorf("test node ID %d too large for simple IP allocation", nodeID) + if nodeID > maxTestNodes { + return nil, nil, ErrCouldNotAllocateIP } - ipv4 := netip.AddrFrom4([4]byte{100, 64, 0, byte(nodeID)}) - ipv6 := netip.AddrFrom16([16]byte{0xfd, 0x7a, 0x11, 0x5c, 0xa1, 0xe0, 0, 0, 0, 0, 0, 0, 0, 0, 0, byte(nodeID)}) + // Split nodeID into high and low bytes for IPv4 (100.64.high.low) + highByte := byte(nodeID / ipv4ByteDivisor) + lowByte := byte(nodeID % ipv4ByteDivisor) + ipv4 := netip.AddrFrom4([4]byte{100, 64, highByte, lowByte}) + + // For IPv6, use the last two bytes of the address (fd7a:115c:a1e0::high:low) + ipv6 := netip.AddrFrom16([16]byte{0xfd, 0x7a, 0x11, 0x5c, 0xa1, 0xe0, 0, 0, 0, 0, 0, 0, 0, 0, highByte, lowByte}) return &ipv4, &ipv6, nil } diff --git a/hscontrol/mapper/batcher_test.go b/hscontrol/mapper/batcher_test.go index a132e026..24910947 100644 --- a/hscontrol/mapper/batcher_test.go +++ b/hscontrol/mapper/batcher_test.go @@ -203,8 +203,10 @@ func setupBatcherWithTestData( }, }, Tuning: types.Tuning{ - BatchChangeDelay: 10 * time.Millisecond, - BatcherWorkers: types.DefaultBatcherWorkers(), // Use same logic as config.go + BatchChangeDelay: 10 * time.Millisecond, + BatcherWorkers: types.DefaultBatcherWorkers(), // Use same logic as config.go + NodeStoreBatchSize: state.TestBatchSize, + NodeStoreBatchTimeout: state.TestBatchTimeout, }, } @@ -572,14 +574,12 @@ func TestBatcherScalabilityAllToAll(t *testing.T) { name string nodeCount int }{ - {"10_nodes", 10}, - {"50_nodes", 50}, - {"100_nodes", 100}, - // Grinds to a halt because of Database bottleneck - // {"250_nodes", 250}, - // {"500_nodes", 500}, - // {"1000_nodes", 1000}, - // {"5000_nodes", 5000}, + {"10_nodes", 10}, // Quick baseline test + {"100_nodes", 100}, // Full scalability test ~2 minutes + // Large-scale tests commented out - uncomment for scalability testing + // {"1000_nodes", 1000}, // ~12 minutes + // {"2000_nodes", 2000}, // ~60+ minutes + // {"5000_nodes", 5000}, // Not recommended - database bottleneck } for _, batcherFunc := range allBatcherFunctions { @@ -600,7 +600,8 @@ func TestBatcherScalabilityAllToAll(t *testing.T) { // Use large buffer to avoid blocking during rapid joins // Buffer needs to handle nodeCount * average_updates_per_node // Estimate: each node receives ~2*nodeCount updates during all-to-all - bufferSize := max(1000, tc.nodeCount*2) + // For very large tests (>1000 nodes), limit buffer to avoid excessive memory + bufferSize := max(1000, min(tc.nodeCount*2, 10000)) testData, cleanup := setupBatcherWithTestData( t, diff --git a/hscontrol/state/debug_test.go b/hscontrol/state/debug_test.go index 60d77245..6fd528a8 100644 --- a/hscontrol/state/debug_test.go +++ b/hscontrol/state/debug_test.go @@ -15,7 +15,7 @@ func TestNodeStoreDebugString(t *testing.T) { { name: "empty nodestore", setupFn: func() *NodeStore { - return NewNodeStore(nil, allowAllPeersFunc) + return NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) }, contains: []string{ "=== NodeStore Debug Information ===", @@ -30,7 +30,7 @@ func TestNodeStoreDebugString(t *testing.T) { node1 := createTestNode(1, 1, "user1", "node1") node2 := createTestNode(2, 2, "user2", "node2") - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() _ = store.PutNode(node1) @@ -66,7 +66,7 @@ func TestNodeStoreDebugString(t *testing.T) { func TestDebugRegistrationCache(t *testing.T) { // Create a minimal NodeStore for testing debug methods - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) debugStr := store.DebugString() diff --git a/hscontrol/state/endpoint_test.go b/hscontrol/state/endpoint_test.go index a406fb2b..b8905ab7 100644 --- a/hscontrol/state/endpoint_test.go +++ b/hscontrol/state/endpoint_test.go @@ -19,7 +19,7 @@ func TestEndpointStorageInNodeStore(t *testing.T) { node2 := createTestNode(2, 1, "test-user", "node2") // Create NodeStore with allow-all peers function - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() diff --git a/hscontrol/state/ephemeral_test.go b/hscontrol/state/ephemeral_test.go index e3acc9b9..632af13c 100644 --- a/hscontrol/state/ephemeral_test.go +++ b/hscontrol/state/ephemeral_test.go @@ -20,7 +20,7 @@ func TestEphemeralNodeDeleteWithConcurrentUpdate(t *testing.T) { node := createTestNode(1, 1, "test-user", "test-node") // Create NodeStore - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() @@ -57,8 +57,6 @@ func TestEphemeralNodeDeleteWithConcurrentUpdate(t *testing.T) { // Goroutine 2: DeleteNode (simulates handleLogout for ephemeral node) go func() { - // Small delay to increase chance of batching together - time.Sleep(1 * time.Millisecond) store.DeleteNode(node.ID) done <- true }() @@ -67,15 +65,11 @@ func TestEphemeralNodeDeleteWithConcurrentUpdate(t *testing.T) { <-done <-done - // Give batching time to complete - time.Sleep(50 * time.Millisecond) - - // The key assertion: if UpdateNode and DeleteNode were batched together - // with DELETE after UPDATE, then UpdateNode should return an invalid node - // OR it should return a valid node but the node should no longer exist in the store - - _, found = store.GetNode(node.ID) - assert.False(t, found, "node should be deleted from NodeStore") + // Verify node is eventually deleted + require.EventuallyWithT(t, func(c *assert.CollectT) { + _, found = store.GetNode(node.ID) + assert.False(c, found, "node should be deleted from NodeStore") + }, 1*time.Second, 10*time.Millisecond, "waiting for node to be deleted") // If the update happened before delete in the batch, the returned node might be invalid if updateOk { @@ -95,22 +89,21 @@ func TestEphemeralNodeDeleteWithConcurrentUpdate(t *testing.T) { func TestUpdateNodeReturnsInvalidWhenDeletedInSameBatch(t *testing.T) { node := createTestNode(2, 1, "test-user", "test-node-2") - store := NewNodeStore(nil, allowAllPeersFunc) + // Use batch size of 2 to guarantee UpdateNode and DeleteNode batch together + store := NewNodeStore(nil, allowAllPeersFunc, 2, TestBatchTimeout) store.Start() defer store.Stop() // Put node in store _ = store.PutNode(node) - // Simulate the exact sequence: UpdateNode gets queued, then DeleteNode gets queued, - // they batch together, and we check what UpdateNode returns - + // Queue UpdateNode and DeleteNode - with batch size of 2, they will batch together resultChan := make(chan struct { node types.NodeView ok bool }) - // Start UpdateNode - it will block until batch is applied + // Start UpdateNode in goroutine - it will queue and wait for batch go func() { node, ok := store.UpdateNode(node.ID, func(n *types.Node) { n.LastSeen = ptr.To(time.Now()) @@ -121,18 +114,15 @@ func TestUpdateNodeReturnsInvalidWhenDeletedInSameBatch(t *testing.T) { }{node, ok} }() - // Give UpdateNode a moment to queue its work - time.Sleep(5 * time.Millisecond) - - // Now queue DeleteNode - should batch with the UPDATE - store.DeleteNode(node.ID) + // Start DeleteNode in goroutine - it will queue and trigger batch processing + // Since batch size is 2, both operations will be processed together + go func() { + store.DeleteNode(node.ID) + }() // Get the result from UpdateNode result := <-resultChan - // Wait for batch to complete - time.Sleep(50 * time.Millisecond) - // Node should be deleted _, found := store.GetNode(node.ID) assert.False(t, found, "node should be deleted") @@ -157,7 +147,7 @@ func TestUpdateNodeReturnsInvalidWhenDeletedInSameBatch(t *testing.T) { func TestPersistNodeToDBPreventsRaceCondition(t *testing.T) { node := createTestNode(3, 1, "test-user", "test-node-3") - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() @@ -174,12 +164,11 @@ func TestPersistNodeToDBPreventsRaceCondition(t *testing.T) { // Now delete the node (simulating ephemeral logout happening concurrently) store.DeleteNode(node.ID) - // Wait for deletion to complete - time.Sleep(50 * time.Millisecond) - - // Verify node is deleted - _, found := store.GetNode(node.ID) - require.False(t, found, "node should be deleted") + // Verify node is eventually deleted + require.EventuallyWithT(t, func(c *assert.CollectT) { + _, found := store.GetNode(node.ID) + assert.False(c, found, "node should be deleted") + }, 1*time.Second, 10*time.Millisecond, "waiting for node to be deleted") // Now try to use the updatedNode from before the deletion // In the old code, this would re-insert the node into the database @@ -213,7 +202,8 @@ func TestEphemeralNodeLogoutRaceCondition(t *testing.T) { Ephemeral: true, } - store := NewNodeStore(nil, allowAllPeersFunc) + // Use batch size of 2 to guarantee UpdateNode and DeleteNode batch together + store := NewNodeStore(nil, allowAllPeersFunc, 2, TestBatchTimeout) store.Start() defer store.Stop() @@ -238,7 +228,6 @@ func TestEphemeralNodeLogoutRaceCondition(t *testing.T) { // Goroutine 2: DeleteNode (simulates handleLogout for ephemeral node) go func() { - time.Sleep(1 * time.Millisecond) // Slight delay to batch operations store.DeleteNode(ephemeralNode.ID) done <- true }() @@ -247,12 +236,11 @@ func TestEphemeralNodeLogoutRaceCondition(t *testing.T) { <-done <-done - // Give batching time to complete - time.Sleep(50 * time.Millisecond) - - // Node should be deleted from store - _, found := store.GetNode(ephemeralNode.ID) - assert.False(t, found, "ephemeral node should be deleted from NodeStore") + // Verify node is eventually deleted + require.EventuallyWithT(t, func(c *assert.CollectT) { + _, found := store.GetNode(ephemeralNode.ID) + assert.False(c, found, "ephemeral node should be deleted from NodeStore") + }, 1*time.Second, 10*time.Millisecond, "waiting for ephemeral node to be deleted") // Critical assertion: if UpdateNode returned before DeleteNode completed, // the updatedNode might be valid but the node is actually deleted. @@ -288,51 +276,57 @@ func TestUpdateNodeFromMapRequestEphemeralLogoutSequence(t *testing.T) { Ephemeral: true, } - store := NewNodeStore(nil, allowAllPeersFunc) + // Use batch size of 2 to guarantee UpdateNode and DeleteNode batch together + // Use batch size of 2 to guarantee UpdateNode and DeleteNode batch together + store := NewNodeStore(nil, allowAllPeersFunc, 2, TestBatchTimeout) store.Start() defer store.Stop() - // Initial state: ephemeral node exists + // Put ephemeral node in store _ = store.PutNode(ephemeralNode) // Step 1: UpdateNodeFromMapRequest calls UpdateNode // (simulating client sending MapRequest with endpoint updates) - updateStarted := make(chan bool) - var updatedNode types.NodeView - var updateOk bool + updateResult := make(chan struct { + node types.NodeView + ok bool + }) go func() { - updateStarted <- true - updatedNode, updateOk = store.UpdateNode(ephemeralNode.ID, func(n *types.Node) { + node, ok := store.UpdateNode(ephemeralNode.ID, func(n *types.Node) { n.LastSeen = ptr.To(time.Now()) endpoint := netip.MustParseAddrPort("10.0.0.1:41641") n.Endpoints = []netip.AddrPort{endpoint} }) + updateResult <- struct { + node types.NodeView + ok bool + }{node, ok} }() - <-updateStarted - // Small delay to ensure UpdateNode is queued - time.Sleep(5 * time.Millisecond) - // Step 2: Logout happens - handleLogout calls DeleteNode - // (simulating client sending logout with past expiry) - store.DeleteNode(ephemeralNode.ID) + // With batch size of 2, this will trigger batch processing with UpdateNode + go func() { + store.DeleteNode(ephemeralNode.ID) + }() - // Wait for batching to complete - time.Sleep(50 * time.Millisecond) + // Step 3: Wait and verify node is eventually deleted + require.EventuallyWithT(t, func(c *assert.CollectT) { + _, nodeExists := store.GetNode(ephemeralNode.ID) + assert.False(c, nodeExists, "ephemeral node must be deleted after logout") + }, 1*time.Second, 10*time.Millisecond, "waiting for ephemeral node to be deleted") - // Step 3: Check results - _, nodeExists := store.GetNode(ephemeralNode.ID) - assert.False(t, nodeExists, "ephemeral node must be deleted after logout") + // Step 4: Get the update result + result := <-updateResult - // Step 4: Simulate what happens if we try to persist the updatedNode - if updateOk && updatedNode.Valid() { + // Simulate what happens if we try to persist the updatedNode + if result.ok && result.node.Valid() { // This is the problematic path - UpdateNode returned a valid node // but the node was deleted in the same batch t.Log("UpdateNode returned valid node even though node was deleted") // The fix: persistNodeToDB must check NodeStore before persisting - _, checkExists := store.GetNode(updatedNode.ID()) + _, checkExists := store.GetNode(result.node.ID()) if checkExists { t.Error("BUG: Node still exists in NodeStore after deletion - should be impossible") } else { @@ -353,14 +347,15 @@ func TestUpdateNodeFromMapRequestEphemeralLogoutSequence(t *testing.T) { func TestUpdateNodeDeletedInSameBatchReturnsInvalid(t *testing.T) { node := createTestNode(6, 1, "test-user", "test-node-6") - store := NewNodeStore(nil, allowAllPeersFunc) + // Use batch size of 2 to guarantee UpdateNode and DeleteNode batch together + store := NewNodeStore(nil, allowAllPeersFunc, 2, TestBatchTimeout) store.Start() defer store.Stop() // Put node in store _ = store.PutNode(node) - // Queue UpdateNode + // Queue UpdateNode and DeleteNode - with batch size of 2, they will batch together updateDone := make(chan struct { node types.NodeView ok bool @@ -376,18 +371,14 @@ func TestUpdateNodeDeletedInSameBatchReturnsInvalid(t *testing.T) { }{updatedNode, ok} }() - // Small delay to ensure UpdateNode is queued - time.Sleep(5 * time.Millisecond) - - // Queue DeleteNode - should batch with UpdateNode - store.DeleteNode(node.ID) + // Queue DeleteNode - with batch size of 2, this triggers batch processing + go func() { + store.DeleteNode(node.ID) + }() // Get UpdateNode result result := <-updateDone - // Wait for batch to complete - time.Sleep(50 * time.Millisecond) - // Node should be deleted _, exists := store.GetNode(node.ID) assert.False(t, exists, "node should be deleted from store") @@ -417,30 +408,28 @@ func TestPersistNodeToDBChecksNodeStoreBeforePersist(t *testing.T) { Ephemeral: true, } - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() - // Put node in store + // Put node _ = store.PutNode(ephemeralNode) - // Simulate the race: - // 1. UpdateNode is called (from UpdateNodeFromMapRequest) + // UpdateNode returns a node updatedNode, ok := store.UpdateNode(ephemeralNode.ID, func(n *types.Node) { n.LastSeen = ptr.To(time.Now()) }) require.True(t, ok, "UpdateNode should succeed") - require.True(t, updatedNode.Valid(), "UpdateNode should return valid node") + require.True(t, updatedNode.Valid(), "updated node should be valid") - // 2. Node is deleted (from handleLogout for ephemeral node) + // Delete the node store.DeleteNode(ephemeralNode.ID) - // Wait for deletion - time.Sleep(50 * time.Millisecond) - - // 3. Verify node is deleted from store - _, exists := store.GetNode(ephemeralNode.ID) - require.False(t, exists, "node should be deleted from NodeStore") + // Verify node is eventually deleted + require.EventuallyWithT(t, func(c *assert.CollectT) { + _, exists := store.GetNode(ephemeralNode.ID) + assert.False(c, exists, "node should be deleted from NodeStore") + }, 1*time.Second, 10*time.Millisecond, "waiting for node to be deleted") // 4. Simulate what persistNodeToDB does - check if node still exists // The fix in persistNodeToDB checks NodeStore before persisting: diff --git a/hscontrol/state/node_store.go b/hscontrol/state/node_store.go index a06151a5..adcc410a 100644 --- a/hscontrol/state/node_store.go +++ b/hscontrol/state/node_store.go @@ -14,11 +14,6 @@ import ( "tailscale.com/types/views" ) -const ( - batchSize = 100 - batchTimeout = 500 * time.Millisecond -) - const ( put = 1 del = 2 @@ -92,9 +87,12 @@ type NodeStore struct { peersFunc PeersFunc writeQueue chan work + + batchSize int + batchTimeout time.Duration } -func NewNodeStore(allNodes types.Nodes, peersFunc PeersFunc) *NodeStore { +func NewNodeStore(allNodes types.Nodes, peersFunc PeersFunc, batchSize int, batchTimeout time.Duration) *NodeStore { nodes := make(map[types.NodeID]types.Node, len(allNodes)) for _, n := range allNodes { nodes[n.ID] = *n @@ -102,7 +100,9 @@ func NewNodeStore(allNodes types.Nodes, peersFunc PeersFunc) *NodeStore { snap := snapshotFromNodes(nodes, peersFunc) store := &NodeStore{ - peersFunc: peersFunc, + peersFunc: peersFunc, + batchSize: batchSize, + batchTimeout: batchTimeout, } store.data.Store(&snap) @@ -249,9 +249,10 @@ func (s *NodeStore) Stop() { // processWrite processes the write queue in batches. func (s *NodeStore) processWrite() { - c := time.NewTicker(batchTimeout) + c := time.NewTicker(s.batchTimeout) defer c.Stop() - batch := make([]work, 0, batchSize) + + batch := make([]work, 0, s.batchSize) for { select { @@ -264,17 +265,19 @@ func (s *NodeStore) processWrite() { return } batch = append(batch, w) - if len(batch) >= batchSize { + if len(batch) >= s.batchSize { s.applyBatch(batch) batch = batch[:0] - c.Reset(batchTimeout) + + c.Reset(s.batchTimeout) } case <-c.C: if len(batch) != 0 { s.applyBatch(batch) batch = batch[:0] } - c.Reset(batchTimeout) + + c.Reset(s.batchTimeout) } } } diff --git a/hscontrol/state/node_store_test.go b/hscontrol/state/node_store_test.go index 64ee0406..79f3b1e0 100644 --- a/hscontrol/state/node_store_test.go +++ b/hscontrol/state/node_store_test.go @@ -236,7 +236,7 @@ func TestNodeStoreOperations(t *testing.T) { { name: "create empty store and add single node", setupFunc: func(t *testing.T) *NodeStore { - return NewNodeStore(nil, allowAllPeersFunc) + return NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) }, steps: []testStep{ { @@ -276,7 +276,8 @@ func TestNodeStoreOperations(t *testing.T) { setupFunc: func(t *testing.T) *NodeStore { node1 := createTestNode(1, 1, "user1", "node1") initialNodes := types.Nodes{&node1} - return NewNodeStore(initialNodes, allowAllPeersFunc) + + return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) }, steps: []testStep{ { @@ -346,7 +347,7 @@ func TestNodeStoreOperations(t *testing.T) { node3 := createTestNode(3, 2, "user2", "node3") initialNodes := types.Nodes{&node1, &node2, &node3} - return NewNodeStore(initialNodes, allowAllPeersFunc) + return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) }, steps: []testStep{ { @@ -405,7 +406,8 @@ func TestNodeStoreOperations(t *testing.T) { node1 := createTestNode(1, 1, "user1", "node1") node2 := createTestNode(2, 1, "user1", "node2") initialNodes := types.Nodes{&node1, &node2} - return NewNodeStore(initialNodes, allowAllPeersFunc) + + return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) }, steps: []testStep{ { @@ -443,7 +445,7 @@ func TestNodeStoreOperations(t *testing.T) { { name: "test with odd-even peers filtering", setupFunc: func(t *testing.T) *NodeStore { - return NewNodeStore(nil, oddEvenPeersFunc) + return NewNodeStore(nil, oddEvenPeersFunc, TestBatchSize, TestBatchTimeout) }, steps: []testStep{ { @@ -502,7 +504,8 @@ func TestNodeStoreOperations(t *testing.T) { node1 := createTestNode(1, 1, "user1", "node1") node2 := createTestNode(2, 1, "user1", "node2") initialNodes := types.Nodes{&node1, &node2} - return NewNodeStore(initialNodes, allowAllPeersFunc) + + return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) }, steps: []testStep{ { @@ -673,7 +676,8 @@ func TestNodeStoreOperations(t *testing.T) { node1 := createTestNode(1, 1, "user1", "node1") node2 := createTestNode(2, 1, "user1", "node2") initialNodes := types.Nodes{&node1, &node2} - return NewNodeStore(initialNodes, allowAllPeersFunc) + + return NewNodeStore(initialNodes, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) }, steps: []testStep{ { @@ -861,7 +865,8 @@ func createConcurrentTestNode(id types.NodeID, hostname string) types.Node { // --- Concurrency: concurrent PutNode operations --- func TestNodeStoreConcurrentPutNode(t *testing.T) { const concurrentOps = 20 - store := NewNodeStore(nil, allowAllPeersFunc) + + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() @@ -892,7 +897,8 @@ func TestNodeStoreConcurrentPutNode(t *testing.T) { func TestNodeStoreBatchingEfficiency(t *testing.T) { const batchSize = 10 const ops = 15 // more than batchSize - store := NewNodeStore(nil, allowAllPeersFunc) + + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() @@ -921,7 +927,7 @@ func TestNodeStoreBatchingEfficiency(t *testing.T) { // --- Race conditions: many goroutines on same node --- func TestNodeStoreRaceConditions(t *testing.T) { - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() @@ -979,7 +985,7 @@ func TestNodeStoreRaceConditions(t *testing.T) { // --- Resource cleanup: goroutine leak detection --- func TestNodeStoreResourceCleanup(t *testing.T) { // initialGoroutines := runtime.NumGoroutine() - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() @@ -1011,7 +1017,7 @@ func TestNodeStoreResourceCleanup(t *testing.T) { // --- Timeout/deadlock: operations complete within reasonable time --- func TestNodeStoreOperationTimeout(t *testing.T) { - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() @@ -1095,7 +1101,7 @@ func TestNodeStoreOperationTimeout(t *testing.T) { // --- Edge case: update non-existent node --- func TestNodeStoreUpdateNonExistentNode(t *testing.T) { for i := 0; i < 10; i++ { - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() nonExistentID := types.NodeID(999 + i) updateCallCount := 0 @@ -1114,7 +1120,7 @@ func TestNodeStoreUpdateNonExistentNode(t *testing.T) { // --- Allocation benchmark --- func BenchmarkNodeStoreAllocations(b *testing.B) { - store := NewNodeStore(nil, allowAllPeersFunc) + store := NewNodeStore(nil, allowAllPeersFunc, TestBatchSize, TestBatchTimeout) store.Start() defer store.Stop() diff --git a/hscontrol/state/state.go b/hscontrol/state/state.go index 2e361f46..e044241f 100644 --- a/hscontrol/state/state.go +++ b/hscontrol/state/state.go @@ -40,6 +40,14 @@ const ( // registerCacheCleanup defines the interval for cleaning up expired cache entries. registerCacheCleanup = time.Minute * 20 + + // defaultNodeStoreBatchSize is the default number of write operations to batch + // before rebuilding the in-memory node snapshot. + defaultNodeStoreBatchSize = 100 + + // defaultNodeStoreBatchTimeout is the default maximum time to wait before + // processing a partial batch of node operations. + defaultNodeStoreBatchTimeout = 500 * time.Millisecond ) // ErrUnsupportedPolicyMode is returned for invalid policy modes. Valid modes are "file" and "db". @@ -132,11 +140,27 @@ func NewState(cfg *types.Config) (*State, error) { return nil, fmt.Errorf("init policy manager: %w", err) } + // Apply defaults for NodeStore batch configuration if not set. + // This ensures tests that create Config directly (without viper) still work. + batchSize := cfg.Tuning.NodeStoreBatchSize + if batchSize == 0 { + batchSize = defaultNodeStoreBatchSize + } + batchTimeout := cfg.Tuning.NodeStoreBatchTimeout + if batchTimeout == 0 { + batchTimeout = defaultNodeStoreBatchTimeout + } + // PolicyManager.BuildPeerMap handles both global and per-node filter complexity. // This moves the complex peer relationship logic into the policy package where it belongs. - nodeStore := NewNodeStore(nodes, func(nodes []types.NodeView) map[types.NodeID][]types.NodeView { - return polMan.BuildPeerMap(views.SliceOf(nodes)) - }) + nodeStore := NewNodeStore( + nodes, + func(nodes []types.NodeView) map[types.NodeID][]types.NodeView { + return polMan.BuildPeerMap(views.SliceOf(nodes)) + }, + batchSize, + batchTimeout, + ) nodeStore.Start() return &State{ diff --git a/hscontrol/state/test_helpers.go b/hscontrol/state/test_helpers.go new file mode 100644 index 00000000..95203106 --- /dev/null +++ b/hscontrol/state/test_helpers.go @@ -0,0 +1,12 @@ +package state + +import ( + "time" +) + +// Test configuration for NodeStore batching. +// These values are optimized for test speed rather than production use. +const ( + TestBatchSize = 5 + TestBatchTimeout = 5 * time.Millisecond +) diff --git a/hscontrol/types/config.go b/hscontrol/types/config.go index 732b4d5a..dfc3498d 100644 --- a/hscontrol/types/config.go +++ b/hscontrol/types/config.go @@ -28,6 +28,8 @@ const ( maxDuration time.Duration = 1<<63 - 1 PKCEMethodPlain string = "plain" PKCEMethodS256 string = "S256" + + defaultNodeStoreBatchSize = 100 ) var ( @@ -230,13 +232,63 @@ type LogConfig struct { Level zerolog.Level } +// Tuning contains advanced performance tuning parameters for Headscale. +// These settings control internal batching, timeouts, and resource allocation. +// The defaults are carefully chosen for typical deployments and should rarely +// need adjustment. Changes to these values can significantly impact performance +// and resource usage. type Tuning struct { - NotifierSendTimeout time.Duration - BatchChangeDelay time.Duration + // NotifierSendTimeout is the maximum time to wait when sending notifications + // to connected clients about network changes. + NotifierSendTimeout time.Duration + + // BatchChangeDelay controls how long to wait before sending batched updates + // to clients when multiple changes occur in rapid succession. + BatchChangeDelay time.Duration + + // NodeMapSessionBufferedChanSize sets the buffer size for the channel that + // queues map updates to be sent to connected clients. NodeMapSessionBufferedChanSize int - BatcherWorkers int - RegisterCacheCleanup time.Duration - RegisterCacheExpiration time.Duration + + // BatcherWorkers controls the number of parallel workers processing map + // updates for connected clients. + BatcherWorkers int + + // RegisterCacheCleanup is the interval between cleanup operations for + // expired registration cache entries. + RegisterCacheCleanup time.Duration + + // RegisterCacheExpiration is how long registration cache entries remain + // valid before being eligible for cleanup. + RegisterCacheExpiration time.Duration + + // NodeStoreBatchSize controls how many write operations are accumulated + // before rebuilding the in-memory node snapshot. + // + // The NodeStore batches write operations (add/update/delete nodes) before + // rebuilding its in-memory data structures. Rebuilding involves recalculating + // peer relationships between all nodes based on the current ACL policy, which + // is computationally expensive and scales with the square of the number of nodes. + // + // By batching writes, Headscale can process N operations but only rebuild once, + // rather than rebuilding N times. This significantly reduces CPU usage during + // bulk operations like initial sync or policy updates. + // + // Trade-off: Higher values reduce CPU usage from rebuilds but increase latency + // for individual operations waiting for their batch to complete. + NodeStoreBatchSize int + + // NodeStoreBatchTimeout is the maximum time to wait before processing a + // partial batch of node operations. + // + // When NodeStoreBatchSize operations haven't accumulated, this timeout ensures + // writes don't wait indefinitely. The batch processes when either the size + // threshold is reached OR this timeout expires, whichever comes first. + // + // Trade-off: Lower values provide faster response for individual operations + // but trigger more frequent (expensive) peer map rebuilds. Higher values + // optimize for bulk throughput at the cost of individual operation latency. + NodeStoreBatchTimeout time.Duration } func validatePKCEMethod(method string) error { @@ -336,6 +388,8 @@ func LoadConfig(path string, isFile bool) error { viper.SetDefault("tuning.notifier_send_timeout", "800ms") viper.SetDefault("tuning.batch_change_delay", "800ms") viper.SetDefault("tuning.node_mapsession_buffered_chan_size", 30) + viper.SetDefault("tuning.node_store_batch_size", defaultNodeStoreBatchSize) + viper.SetDefault("tuning.node_store_batch_timeout", "500ms") viper.SetDefault("prefixes.allocation", string(IPAllocationStrategySequential)) @@ -437,6 +491,21 @@ func validateServerConfig() error { } } + // Validate tuning parameters + if size := viper.GetInt("tuning.node_store_batch_size"); size <= 0 { + errorText += fmt.Sprintf( + "Fatal config error: tuning.node_store_batch_size must be positive, got %d\n", + size, + ) + } + + if timeout := viper.GetDuration("tuning.node_store_batch_timeout"); timeout <= 0 { + errorText += fmt.Sprintf( + "Fatal config error: tuning.node_store_batch_timeout must be positive, got %s\n", + timeout, + ) + } + if errorText != "" { // nolint return errors.New(strings.TrimSuffix(errorText, "\n")) @@ -991,7 +1060,6 @@ func LoadServerConfig() (*Config, error) { Log: logConfig, - // TODO(kradalby): Document these settings when more stable Tuning: Tuning{ NotifierSendTimeout: viper.GetDuration("tuning.notifier_send_timeout"), BatchChangeDelay: viper.GetDuration("tuning.batch_change_delay"), @@ -1006,6 +1074,8 @@ func LoadServerConfig() (*Config, error) { }(), RegisterCacheCleanup: viper.GetDuration("tuning.register_cache_cleanup"), RegisterCacheExpiration: viper.GetDuration("tuning.register_cache_expiration"), + NodeStoreBatchSize: viper.GetInt("tuning.node_store_batch_size"), + NodeStoreBatchTimeout: viper.GetDuration("tuning.node_store_batch_timeout"), }, }, nil }