state: introduce state

this commit moves all of the read and write logic, and all different parts
of headscale that manages some sort of persistent and in memory state into
a separate package.

The goal of this is to clearly define the boundry between parts of the app
which accesses and modifies data, and where it happens. Previously, different
state (routes, policy, db and so on) was used directly, and sometime passed to
functions as pointers.

Now all access has to go through state. In the initial implementation,
most of the same functions exists and have just been moved. In the future
centralising this will allow us to optimise bottle necks with the database
(in memory state) and make the different parts talking to eachother do so
in the same way across headscale components.

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
Kristoffer Dalby
2025-05-27 16:27:16 +02:00
committed by Kristoffer Dalby
parent a975b6a8b1
commit 1553f0ab53
17 changed files with 1390 additions and 1067 deletions

View File

@@ -10,7 +10,6 @@ import (
"time"
"github.com/juanfont/headscale/hscontrol/mapper"
"github.com/juanfont/headscale/hscontrol/policy"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/rs/zerolog/log"
"github.com/sasha-s/go-deadlock"
@@ -95,26 +94,6 @@ func (h *Headscale) newMapSession(
}
}
func (m *mapSession) close() {
m.cancelChMu.Lock()
defer m.cancelChMu.Unlock()
if !m.cancelChOpen {
mapResponseClosed.WithLabelValues("chanclosed").Inc()
return
}
m.tracef("mapSession (%p) sending message on cancel chan", m)
select {
case m.cancelCh <- struct{}{}:
mapResponseClosed.WithLabelValues("sent").Inc()
m.tracef("mapSession (%p) sent message on cancel chan", m)
case <-time.After(30 * time.Second):
mapResponseClosed.WithLabelValues("timeout").Inc()
m.tracef("mapSession (%p) timed out sending close message", m)
}
}
func (m *mapSession) isStreaming() bool {
return m.req.Stream && !m.req.ReadOnly
}
@@ -201,14 +180,14 @@ func (m *mapSession) serveLongPoll() {
// reconnects, the channel might be of another connection.
// In that case, it is not closed and the node is still online.
if m.h.nodeNotifier.RemoveNode(m.node.ID, m.ch) {
// Failover the node's routes if any.
m.h.updateNodeOnlineStatus(false, m.node)
// When a node disconnects, and it causes the primary route map to change,
// send a full update to all nodes.
// TODO(kradalby): This can likely be made more effective, but likely most
// nodes has access to the same routes, so it might not be a big deal.
if m.h.primaryRoutes.SetRoutes(m.node.ID) {
change, err := m.h.state.Disconnect(m.node)
if err != nil {
m.errf(err, "Failed to disconnect node %s", m.node.Hostname)
}
if change {
ctx := types.NotifyCtx(context.Background(), "poll-primary-change", m.node.Hostname)
m.h.nodeNotifier.NotifyAll(ctx, types.UpdateFull())
}
@@ -222,7 +201,7 @@ func (m *mapSession) serveLongPoll() {
m.h.pollNetMapStreamWG.Add(1)
defer m.h.pollNetMapStreamWG.Done()
if m.h.primaryRoutes.SetRoutes(m.node.ID, m.node.SubnetRoutes()...) {
if m.h.state.Connect(m.node) {
ctx := types.NotifyCtx(context.Background(), "poll-primary-change", m.node.Hostname)
m.h.nodeNotifier.NotifyAll(ctx, types.UpdateFull())
}
@@ -240,7 +219,14 @@ func (m *mapSession) serveLongPoll() {
m.keepAliveTicker = time.NewTicker(m.keepAlive)
m.h.nodeNotifier.AddNode(m.node.ID, m.ch)
go m.h.updateNodeOnlineStatus(true, m.node)
go func() {
changed := m.h.state.Connect(m.node)
if changed {
ctx := types.NotifyCtx(context.Background(), "poll-primary-change", m.node.Hostname)
m.h.nodeNotifier.NotifyAll(ctx, types.UpdateFull())
}
}()
m.infof("node has connected, mapSession: %p, chan: %p", m, m.ch)
@@ -282,7 +268,7 @@ func (m *mapSession) serveLongPoll() {
// Ensure the node object is updated, for example, there
// might have been a hostinfo update in a sidechannel
// which contains data needed to generate a map response.
m.node, err = m.h.db.GetNodeByID(m.node.ID)
m.node, err = m.h.state.GetNodeByID(m.node.ID)
if err != nil {
m.errf(err, "Could not get machine from db")
@@ -327,7 +313,7 @@ func (m *mapSession) serveLongPoll() {
updateType = "remove"
case types.StateDERPUpdated:
m.tracef("Sending DERPUpdate MapResponse")
data, err = m.mapper.DERPMapResponse(m.req, m.node, m.h.DERPMap)
data, err = m.mapper.DERPMapResponse(m.req, m.node, m.h.state.DERPMap())
updateType = "derp"
}
@@ -392,31 +378,6 @@ func (m *mapSession) serveLongPoll() {
}
}
// updateNodeOnlineStatus records the last seen status of a node and notifies peers
// about change in their online/offline status.
// It takes a StateUpdateType of either StatePeerOnlineChanged or StatePeerOfflineChanged.
func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) {
change := &tailcfg.PeerChange{
NodeID: tailcfg.NodeID(node.ID),
Online: &online,
}
if !online {
now := time.Now()
// lastSeen is only relevant if the node is disconnected.
node.LastSeen = &now
change.LastSeen = &now
}
if node.LastSeen != nil {
h.db.SetLastSeen(node.ID, *node.LastSeen)
}
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-onlinestatus", node.Hostname)
h.nodeNotifier.NotifyWithIgnore(ctx, types.UpdatePeerPatch(change), node.ID)
}
func (m *mapSession) handleEndpointUpdate() {
m.tracef("received endpoint update")
@@ -459,18 +420,13 @@ func (m *mapSession) handleEndpointUpdate() {
// If the hostinfo has changed, but not the routes, just update
// hostinfo and let the function continue.
if routesChanged {
// TODO(kradalby): I am not sure if we need this?
nodesChangedHook(m.h.db, m.h.polMan, m.h.nodeNotifier)
// Approve any route that has been defined in policy as
// auto approved. Any change here is not important as any
// actual state change will be detected when the route manager
// is updated.
policy.AutoApproveRoutes(m.h.polMan, m.node)
// Auto approve any routes that have been defined in policy as
// auto approved. Check if this actually changed the node.
routesAutoApproved := m.h.state.AutoApproveRoutes(m.node)
// Update the routes of the given node in the route manager to
// see if an update needs to be sent.
if m.h.primaryRoutes.SetRoutes(m.node.ID, m.node.SubnetRoutes()...) {
if m.h.state.SetNodeRoutes(m.node.ID, m.node.SubnetRoutes()...) {
ctx := types.NotifyCtx(m.ctx, "poll-primary-change", m.node.Hostname)
m.h.nodeNotifier.NotifyAll(ctx, types.UpdateFull())
} else {
@@ -487,6 +443,16 @@ func (m *mapSession) handleEndpointUpdate() {
types.UpdateSelf(m.node.ID),
m.node.ID)
}
// If routes were auto-approved, we need to save the node to persist the changes
if routesAutoApproved {
if _, _, err := m.h.state.SaveNode(m.node); err != nil {
m.errf(err, "Failed to save auto-approved routes to node")
http.Error(m.w, "", http.StatusInternalServerError)
mapResponseEndpointUpdates.WithLabelValues("error").Inc()
return
}
}
}
// Check if there has been a change to Hostname and update them
@@ -495,7 +461,8 @@ func (m *mapSession) handleEndpointUpdate() {
// the hostname change.
m.node.ApplyHostnameFromHostInfo(m.req.Hostinfo)
if err := m.h.db.DB.Save(m.node).Error; err != nil {
_, policyChanged, err := m.h.state.SaveNode(m.node)
if err != nil {
m.errf(err, "Failed to persist/update node in the database")
http.Error(m.w, "", http.StatusInternalServerError)
mapResponseEndpointUpdates.WithLabelValues("error").Inc()
@@ -503,6 +470,12 @@ func (m *mapSession) handleEndpointUpdate() {
return
}
// Send policy update notifications if needed
if policyChanged {
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-policy", m.node.Hostname)
m.h.nodeNotifier.NotifyAll(ctx, types.UpdateFull())
}
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-peers-patch", m.node.Hostname)
m.h.nodeNotifier.NotifyWithIgnore(
ctx,