batcher: send endpoint and derp only updates. (#2856)

This commit is contained in:
Kristoffer Dalby
2025-11-13 13:38:49 -06:00
committed by GitHub
parent 4b25976288
commit 7fb0f9a501
8 changed files with 277 additions and 21 deletions

View File

@@ -8,12 +8,24 @@ import (
"github.com/juanfont/headscale/hscontrol/state"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/juanfont/headscale/hscontrol/types/change"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/zerolog/log"
"tailscale.com/tailcfg"
"tailscale.com/types/ptr"
)
var (
mapResponseGenerated = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "headscale",
Name: "mapresponse_generated_total",
Help: "total count of mapresponses generated by response type and change type",
}, []string{"response_type", "change_type"})
errNodeNotFoundInNodeStore = errors.New("node not found in NodeStore")
)
type batcherFunc func(cfg *types.Config, state *state.State) Batcher
// Batcher defines the common interface for all batcher implementations.
@@ -75,21 +87,32 @@ func generateMapResponse(nodeID types.NodeID, version tailcfg.CapabilityVersion,
}
var (
mapResp *tailcfg.MapResponse
err error
responseType string
)
// Record the metric when the function exits.
defer func() {
if err == nil && mapResp != nil && responseType != "" {
mapResponseGenerated.WithLabelValues(responseType, c.Change.String()).Inc()
}
}()
switch c.Change {
case change.DERP:
responseType = "derp"
mapResp, err = mapper.derpMapResponse(nodeID)
case change.NodeCameOnline, change.NodeWentOffline:
if c.IsSubnetRouter {
// TODO(kradalby): This can potentially be a peer update of the old and new subnet router.
responseType = "full"
mapResp, err = mapper.fullMapResponse(nodeID, version)
} else {
// Trust the change type for online/offline status to avoid race conditions
// between NodeStore updates and change processing
responseType = string(patchResponseDebug)
onlineStatus := c.Change == change.NodeCameOnline
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{
@@ -105,21 +128,26 @@ func generateMapResponse(nodeID types.NodeID, version tailcfg.CapabilityVersion,
// to ensure the node sees changes to its own properties (e.g., hostname/DNS name changes)
// without losing its view of peer status during rapid reconnection cycles
if c.IsSelfUpdate(nodeID) {
responseType = "self"
mapResp, err = mapper.selfMapResponse(nodeID, version)
} else {
responseType = "change"
mapResp, err = mapper.peerChangeResponse(nodeID, version, c.NodeID)
}
case change.NodeRemove:
responseType = "remove"
mapResp, err = mapper.peerRemovedResponse(nodeID, c.NodeID)
case change.NodeKeyExpiry:
// If the node is the one whose key is expiring, we send a "full" self update
// as nodes will ignore patch updates about themselves (?).
if c.IsSelfUpdate(nodeID) {
responseType = "self"
mapResp, err = mapper.selfMapResponse(nodeID, version)
// mapResp, err = mapper.fullMapResponse(nodeID, version)
} else {
responseType = "patch"
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{
{
NodeID: c.NodeID.NodeID(),
@@ -128,9 +156,35 @@ func generateMapResponse(nodeID types.NodeID, version tailcfg.CapabilityVersion,
})
}
case change.NodeEndpoint, change.NodeDERP:
// Endpoint or DERP changes can be sent as lightweight patches.
// Query the NodeStore for the current peer state to construct the PeerChange.
// Even if only the endpoints or only DERP changed, we include both in the patch
// since they're often updated together and the overhead is minimal.
responseType = "patch"
peer, found := mapper.state.GetNodeByID(c.NodeID)
if !found {
return nil, fmt.Errorf("%w: %d", errNodeNotFoundInNodeStore, c.NodeID)
}
peerChange := &tailcfg.PeerChange{
NodeID: c.NodeID.NodeID(),
Endpoints: peer.Endpoints().AsSlice(),
DERPRegion: 0, // Will be set below if available
}
// Extract DERP region from Hostinfo if available
if hi := peer.AsStruct().Hostinfo; hi != nil && hi.NetInfo != nil {
peerChange.DERPRegion = hi.NetInfo.PreferredDERP
}
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{peerChange})
default:
// The following change types always take this path:
// change.Full, change.Policy
responseType = "full"
mapResp, err = mapper.fullMapResponse(nodeID, version)
}
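
The counter above is incremented in a deferred closure so that every successful exit of the switch is counted exactly once, labeled with the response type chosen in the branch that ran. A minimal, self-contained sketch of the same pattern; the names demoResponses and generate are illustrative, not part of headscale:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var demoResponses = promauto.NewCounterVec(prometheus.CounterOpts{
	Namespace: "demo",
	Name:      "mapresponse_generated_total",
	Help:      "Total count of responses generated, by response type and change type.",
}, []string{"response_type", "change_type"})

func generate(changeType string) (resp string, err error) {
	var responseType string

	// Count on exit, but only when a response was actually produced.
	defer func() {
		if err == nil && resp != "" && responseType != "" {
			demoResponses.WithLabelValues(responseType, changeType).Inc()
		}
	}()

	switch changeType {
	case "derp":
		responseType = "derp"
		resp = "derp-map"
	default:
		responseType = "full"
		resp = "full-map"
	}

	return resp, nil
}

func main() {
	_, _ = generate("derp")
	fmt.Println("recorded one derp response")
}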

View File

@@ -1,6 +1,7 @@
package mapper
import (
"errors"
"fmt"
"net/netip"
"strings"
@@ -21,6 +22,8 @@ import (
"zgo.at/zcache/v2"
)
var errNodeNotFoundAfterAdd = errors.New("node not found after adding to batcher")
// batcherTestCase defines a batcher function with a descriptive name for testing.
type batcherTestCase struct {
name string
@@ -50,7 +53,12 @@ func (t *testBatcherWrapper) AddNode(id types.NodeID, c chan<- *tailcfg.MapRespo
// Send the online notification that poll.go would normally send
// This ensures other nodes get notified about this node coming online
node, ok := t.state.GetNodeByID(id)
if !ok {
return fmt.Errorf("%w: %d", errNodeNotFoundAfterAdd, id)
}
t.AddWork(change.NodeOnline(node))
return nil
}
@@ -65,7 +73,10 @@ func (t *testBatcherWrapper) RemoveNode(id types.NodeID, c chan<- *tailcfg.MapRe
// Send the offline notification that poll.go would normally send
// Do this BEFORE removing from batcher so the change can be processed
node, ok := t.state.GetNodeByID(id)
if ok {
t.AddWork(change.NodeOffline(node))
}
// Finally remove from the real batcher
removed := t.Batcher.RemoveNode(id, c)
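
Because change.NodeOnline and change.NodeOffline now take a NodeView rather than a bare ID, the wrapper must resolve the node first. Note the asymmetry: AddNode fails hard when the lookup misses, while RemoveNode merely skips the offline notification (the node may already have been deleted from state). A reduced sketch of that pattern, with stand-in types rather than headscale's:

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("node not found after adding to batcher")

type nodeID uint64

type nodeView struct{ subnetRouter bool }

type store map[nodeID]nodeView

// addNode mirrors the wrapper's AddNode: the lookup must succeed so the
// online ChangeSet can carry IsSubnetRouter.
func addNode(s store, id nodeID) error {
	if _, ok := s[id]; !ok {
		return fmt.Errorf("%w: %d", errNotFound, id)
	}
	// t.AddWork(change.NodeOnline(node)) would go here.
	return nil
}

// removeNode mirrors RemoveNode: a missing node just means no offline
// notification is sent.
func removeNode(s store, id nodeID) {
	if node, ok := s[id]; ok {
		_ = node // t.AddWork(change.NodeOffline(node)) would go here.
	}
}

func main() {
	s := store{1: {subnetRouter: true}}
	fmt.Println(addNode(s, 2)) // node not found after adding to batcher: 2
	removeNode(s, 2)           // silently skips
}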

View File

@@ -0,0 +1,113 @@
package state
import (
"net/netip"
"testing"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"tailscale.com/tailcfg"
)
// TestEndpointStorageInNodeStore verifies that endpoints sent in a MapRequest
// and applied via ApplyPeerChange are correctly stored in the NodeStore and can
// be retrieved for sending to peers.
// This test reproduces the issue reported in https://github.com/juanfont/headscale/issues/2846.
func TestEndpointStorageInNodeStore(t *testing.T) {
// Create two test nodes
node1 := createTestNode(1, 1, "test-user", "node1")
node2 := createTestNode(2, 1, "test-user", "node2")
// Create NodeStore with allow-all peers function
store := NewNodeStore(nil, allowAllPeersFunc)
store.Start()
defer store.Stop()
// Add both nodes to NodeStore
store.PutNode(node1)
store.PutNode(node2)
// Create a MapRequest with endpoints for node1
endpoints := []netip.AddrPort{
netip.MustParseAddrPort("192.168.1.1:41641"),
netip.MustParseAddrPort("10.0.0.1:41641"),
}
mapReq := tailcfg.MapRequest{
NodeKey: node1.NodeKey,
DiscoKey: node1.DiscoKey,
Endpoints: endpoints,
Hostinfo: &tailcfg.Hostinfo{
Hostname: "node1",
},
}
// Simulate what UpdateNodeFromMapRequest does: create PeerChange and apply it
peerChange := node1.PeerChangeFromMapRequest(mapReq)
// Verify PeerChange has endpoints
require.NotNil(t, peerChange.Endpoints, "PeerChange should contain endpoints")
assert.Len(t, peerChange.Endpoints, len(endpoints),
"PeerChange should have same number of endpoints as MapRequest")
// Apply the PeerChange via NodeStore.UpdateNode
updatedNode, ok := store.UpdateNode(node1.ID, func(n *types.Node) {
n.ApplyPeerChange(&peerChange)
})
require.True(t, ok, "UpdateNode should succeed")
require.True(t, updatedNode.Valid(), "Updated node should be valid")
// Verify endpoints are in the updated node view
storedEndpoints := updatedNode.Endpoints().AsSlice()
assert.Len(t, storedEndpoints, len(endpoints),
"NodeStore should have same number of endpoints as sent")
if len(storedEndpoints) == len(endpoints) {
for i, ep := range endpoints {
assert.Equal(t, ep, storedEndpoints[i],
"Endpoint %d should match", i)
}
}
// Verify we can retrieve the node again and endpoints are still there
retrievedNode, found := store.GetNode(node1.ID)
require.True(t, found, "node1 should exist in NodeStore")
retrievedEndpoints := retrievedNode.Endpoints().AsSlice()
assert.Len(t, retrievedEndpoints, len(endpoints),
"Retrieved node should have same number of endpoints")
// Verify that when we get node1 as a peer of node2, it has endpoints
// This is the critical part that was failing in the bug report
peers := store.ListPeers(node2.ID)
require.Positive(t, peers.Len(), "node2 should have at least one peer")
// Find node1 in the peer list
var node1Peer types.NodeView
foundPeer := false
for _, peer := range peers.All() {
if peer.ID() == node1.ID {
node1Peer = peer
foundPeer = true
break
}
}
require.True(t, foundPeer, "node1 should be in node2's peer list")
// Check that node1's endpoints are available in the peer view
peerEndpoints := node1Peer.Endpoints().AsSlice()
assert.Len(t, peerEndpoints, len(endpoints),
"Peer view should have same number of endpoints as sent")
if len(peerEndpoints) == len(endpoints) {
for i, ep := range endpoints {
assert.Equal(t, ep, peerEndpoints[i],
"Peer endpoint %d should match", i)
}
}
}
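
The flow under test, reduced to the tailcfg types alone (the NodeStore plumbing is elided; this is a sketch, not headscale code): a PeerChange carries only the fields that changed, and applying it means copying exactly those fields into the stored node.

package main

import (
	"fmt"
	"net/netip"

	"tailscale.com/tailcfg"
)

func main() {
	endpoints := []netip.AddrPort{
		netip.MustParseAddrPort("192.168.1.1:41641"),
		netip.MustParseAddrPort("10.0.0.1:41641"),
	}

	// A PeerChange as PeerChangeFromMapRequest would produce it for an
	// endpoint-only update.
	pc := &tailcfg.PeerChange{
		NodeID:    1,
		Endpoints: endpoints,
	}

	// Applying it: only non-nil / non-zero fields are copied over.
	var stored []netip.AddrPort
	if pc.Endpoints != nil {
		stored = pc.Endpoints
	}

	fmt.Println(stored) // [192.168.1.1:41641 10.0.0.1:41641]
}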

View File

@@ -459,7 +459,8 @@ func (s *State) Connect(id types.NodeID) []change.ChangeSet {
if !ok {
return nil
}
c := []change.ChangeSet{change.NodeOnline(node)}
log.Info().Uint64("node.id", id.Uint64()).Str("node.name", node.Hostname()).Msg("Node connected")
@@ -505,7 +506,7 @@ func (s *State) Disconnect(id types.NodeID) ([]change.ChangeSet, error) {
// announced are served to any nodes.
routeChange := s.primaryRoutes.SetRoutes(id)
cs := []change.ChangeSet{change.NodeOffline(node), c}
// If we have a policy change or route change, return that as it's more comprehensive
// Otherwise, return the NodeOffline change to ensure nodes are notified
@@ -1343,7 +1344,6 @@ func (s *State) HandleNodeFromPreAuthKey(
Bool("authkey.reusable", pak.Reusable).
Bool("nodekey.rotation", isNodeKeyRotation).
Msg("Existing node re-registering with same NodeKey and auth key, skipping validation")
} else {
// New node or NodeKey rotation: require valid auth key.
err = pak.Validate()
@@ -1631,13 +1631,21 @@ func (s *State) UpdateNodeFromMapRequest(id types.NodeID, req tailcfg.MapRequest
Interface("request", req).
Msg("Processing MapRequest for node")
var (
routeChange bool
hostinfoChanged bool
needsRouteApproval bool
endpointChanged bool
derpChanged bool
)
// We need to ensure we update the node as it is in the NodeStore at
// the time of the request.
updatedNode, ok := s.nodeStore.UpdateNode(id, func(currentNode *types.Node) {
peerChange := currentNode.PeerChangeFromMapRequest(req)
// Track what specifically changed
endpointChanged = peerChange.Endpoints != nil
derpChanged = peerChange.DERPRegion != 0
hostinfoChanged = !hostinfoEqual(currentNode.View(), req.Hostinfo)
// Get the correct NetInfo to use
@@ -1787,6 +1795,24 @@ func (s *State) UpdateNodeFromMapRequest(id types.NodeID, req tailcfg.MapRequest
return nodeRouteChange, nil
}
// Determine the most specific change type based on what actually changed.
// This allows us to send lightweight patch updates instead of full map responses.
// Hostinfo changes require NodeAdded (full update) as they may affect many fields.
if hostinfoChanged {
return change.NodeAdded(id), nil
}
// Return specific change types for endpoint and/or DERP updates.
// The batcher will query NodeStore for current state and include both in PeerChange if both changed.
// Prioritize endpoint changes as they're more common and important for connectivity.
if endpointChanged {
return change.EndpointUpdate(id), nil
}
if derpChanged {
return change.DERPUpdate(id), nil
}
return change.NodeAdded(id), nil
}
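
The return ladder above encodes a precedence: Hostinfo changes force a full update, endpoint changes beat DERP changes, and anything else falls back to a full NodeAdded. A hypothetical helper (classify is not a real headscale function) that makes the ordering explicit:

package main

import "fmt"

// classify returns the change kind for a processed MapRequest, in the same
// priority order as UpdateNodeFromMapRequest above.
func classify(hostinfoChanged, endpointChanged, derpChanged bool) string {
	switch {
	case hostinfoChanged:
		// Hostinfo may affect many fields; only a full update is safe.
		return "NodeNewOrUpdate"
	case endpointChanged:
		// Endpoints win over DERP: more common, critical for connectivity.
		return "NodeEndpoint"
	case derpChanged:
		return "NodeDERP"
	default:
		return "NodeNewOrUpdate"
	}
}

func main() {
	fmt.Println(classify(false, true, true)) // NodeEndpoint
}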

View File

@@ -34,6 +34,8 @@ const (
NodeRemove Change = 23
NodeKeyExpiry Change = 24
NodeNewOrUpdate Change = 25
NodeEndpoint Change = 26
NodeDERP Change = 27
// User changes.
UserNewOrUpdate Change = 51
@@ -174,17 +176,19 @@ func NodeRemoved(id types.NodeID) ChangeSet {
}
}
func NodeOnline(node types.NodeView) ChangeSet {
return ChangeSet{
Change: NodeCameOnline,
NodeID: node.ID(),
IsSubnetRouter: node.IsSubnetRouter(),
}
}
func NodeOffline(node types.NodeView) ChangeSet {
return ChangeSet{
Change: NodeWentOffline,
NodeID: node.ID(),
IsSubnetRouter: node.IsSubnetRouter(),
}
}
@@ -196,6 +200,20 @@ func KeyExpiry(id types.NodeID, expiry time.Time) ChangeSet {
}
}
func EndpointUpdate(id types.NodeID) ChangeSet {
return ChangeSet{
Change: NodeEndpoint,
NodeID: id,
}
}
func DERPUpdate(id types.NodeID) ChangeSet {
return ChangeSet{
Change: NodeDERP,
NodeID: id,
}
}
func UserAdded(id types.UserID) ChangeSet {
return ChangeSet{
Change: UserNewOrUpdate,
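
The split between the constructors is deliberate: EndpointUpdate and DERPUpdate need only the node ID, because the batcher re-queries the NodeStore for the current endpoints and DERP region when building the patch, while NodeOnline/NodeOffline take the full NodeView so IsSubnetRouter is captured when the ChangeSet is built, not when it is consumed. A stand-in sketch (types simplified from headscale's) of what that buys:

package main

import "fmt"

type changeSet struct {
	change         string
	nodeID         uint64
	isSubnetRouter bool
}

type nodeView struct {
	id           uint64
	subnetRouter bool
}

// nodeOnline consults the view once, here, so consumers never race the
// NodeStore to answer "was this node a subnet router?".
func nodeOnline(n nodeView) changeSet {
	return changeSet{change: "NodeCameOnline", nodeID: n.id, isSubnetRouter: n.subnetRouter}
}

// endpointUpdate needs no view: the batcher re-queries the NodeStore for
// the current endpoints when it builds the patch.
func endpointUpdate(id uint64) changeSet {
	return changeSet{change: "NodeEndpoint", nodeID: id}
}

func main() {
	fmt.Printf("%+v\n", nodeOnline(nodeView{id: 7, subnetRouter: true}))
	fmt.Printf("%+v\n", endpointUpdate(7))
}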

View File

@@ -18,6 +18,8 @@ func _() {
_ = x[NodeRemove-23]
_ = x[NodeKeyExpiry-24]
_ = x[NodeNewOrUpdate-25]
_ = x[NodeEndpoint-26]
_ = x[NodeDERP-27]
_ = x[UserNewOrUpdate-51]
_ = x[UserRemove-52]
}
@@ -26,13 +28,13 @@ const (
_Change_name_0 = "ChangeUnknown"
_Change_name_1 = "Full"
_Change_name_2 = "PolicyDERPExtraRecords"
_Change_name_3 = "NodeCameOnlineNodeWentOfflineNodeRemoveNodeKeyExpiryNodeNewOrUpdate"
_Change_name_3 = "NodeCameOnlineNodeWentOfflineNodeRemoveNodeKeyExpiryNodeNewOrUpdateNodeEndpointNodeDERP"
_Change_name_4 = "UserNewOrUpdateUserRemove"
)
var (
_Change_index_2 = [...]uint8{0, 6, 10, 22}
_Change_index_3 = [...]uint8{0, 14, 29, 39, 52, 67, 79, 87}
_Change_index_4 = [...]uint8{0, 15, 25}
)
@@ -45,7 +47,7 @@ func (i Change) String() string {
case 11 <= i && i <= 13:
i -= 11
return _Change_name_2[_Change_index_2[i]:_Change_index_2[i+1]]
case 21 <= i && i <= 27:
i -= 21
return _Change_name_3[_Change_index_3[i]:_Change_index_3[i+1]]
case 51 <= i && i <= 52:

View File

@@ -535,8 +535,10 @@ func (node *Node) PeerChangeFromMapRequest(req tailcfg.MapRequest) tailcfg.PeerC
}
}
// Compare endpoints using order-independent comparison
if EndpointsChanged(node.Endpoints, req.Endpoints) {
ret.Endpoints = req.Endpoints
}
now := time.Now()
ret.LastSeen = &now
@@ -544,6 +546,32 @@ func (node *Node) PeerChangeFromMapRequest(req tailcfg.MapRequest) tailcfg.PeerC
return ret
}
// EndpointsChanged compares two endpoint slices and returns true if they differ.
// The comparison is order-independent - endpoints are sorted before comparison.
func EndpointsChanged(oldEndpoints, newEndpoints []netip.AddrPort) bool {
if len(oldEndpoints) != len(newEndpoints) {
return true
}
if len(oldEndpoints) == 0 {
return false
}
// Make copies to avoid modifying the original slices
oldCopy := slices.Clone(oldEndpoints)
newCopy := slices.Clone(newEndpoints)
// Sort both slices to enable order-independent comparison
slices.SortFunc(oldCopy, func(a, b netip.AddrPort) int {
return a.Compare(b)
})
slices.SortFunc(newCopy, func(a, b netip.AddrPort) int {
return a.Compare(b)
})
return !slices.Equal(oldCopy, newCopy)
}
func (node *Node) RegisterMethodToV1Enum() v1.RegisterMethod {
switch node.RegisterMethod {
case "authkey":