headscale/app.go

641 lines
17 KiB
Go
Raw Normal View History

2020-06-21 06:32:08 -04:00
package headscale
import (
"context"
"crypto/tls"
"errors"
2020-06-21 06:32:08 -04:00
"fmt"
"io"
"net"
"net/http"
2021-10-22 12:55:14 -04:00
"net/url"
2021-02-21 17:54:15 -05:00
"os"
2021-11-02 17:46:15 -04:00
"os/signal"
"sort"
"strings"
"sync"
2021-11-02 17:46:15 -04:00
"syscall"
"time"
2020-06-21 06:32:08 -04:00
2021-10-18 15:27:52 -04:00
"github.com/coreos/go-oidc/v3/oidc"
"github.com/patrickmn/go-cache"
"golang.org/x/oauth2"
2020-06-21 06:32:08 -04:00
"github.com/gin-gonic/gin"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
v1 "github.com/juanfont/headscale/gen/go/headscale/v1"
"github.com/philip-bui/grpc-zerolog"
zl "github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/soheilhy/cmux"
ginprometheus "github.com/zsais/go-gin-prometheus"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
2021-07-04 15:40:46 -04:00
"gorm.io/gorm"
"inet.af/netaddr"
2021-02-20 17:57:06 -05:00
"tailscale.com/tailcfg"
"tailscale.com/types/dnstype"
2021-06-25 12:57:08 -04:00
"tailscale.com/types/wgkey"
2020-06-21 06:32:08 -04:00
)
// AUTH_PREFIX is the prefix that grpcAuthenticationInterceptor and
// httpAuthenticationMiddleware require at the start of an "authorization"
// value.
const (
	AUTH_PREFIX = "Bearer "
)
// Config contains the initial Headscale configuration.
2020-06-21 06:32:08 -04:00
type Config struct {
ServerURL string
Addr string
PrivateKeyPath string
EphemeralNodeInactivityTimeout time.Duration
IPPrefix netaddr.IPPrefix
BaseDomain string
2020-06-21 06:32:08 -04:00
2021-10-22 12:55:14 -04:00
DERP DERPConfig
DBtype string
DBpath string
2020-06-21 06:32:08 -04:00
DBhost string
DBport int
DBname string
DBuser string
DBpass string
TLSLetsEncryptListen string
TLSLetsEncryptHostname string
TLSLetsEncryptCacheDir string
TLSLetsEncryptChallengeType string
TLSCertPath string
TLSKeyPath string
2021-08-24 02:09:47 -04:00
ACMEURL string
ACMEEmail string
2021-08-24 02:09:47 -04:00
DNSConfig *tailcfg.DNSConfig
UnixSocket string
2021-10-31 05:40:43 -04:00
2021-10-18 15:27:52 -04:00
OIDC OIDCConfig
2021-10-08 05:43:52 -04:00
CLI CLIConfig
2021-10-10 05:22:42 -04:00
MaxMachineRegistrationDuration time.Duration
DefaultMachineRegistrationDuration time.Duration
2020-06-21 06:32:08 -04:00
}
2021-10-18 15:27:52 -04:00
// OIDCConfig holds the OpenID Connect settings of the server.
type OIDCConfig struct {
	// Issuer enables OIDC when non-empty: NewHeadscale only calls initOIDC
	// in that case.
	Issuer string

	// ClientID and ClientSecret are not read in this file.
	// NOTE(review): presumably the OAuth2 client credentials - confirm.
	ClientID     string
	ClientSecret string

	// MatchMap is not read in this file.
	// NOTE(review): presumably maps identity patterns to namespaces - confirm.
	MatchMap map[string]string
}
2021-10-22 12:55:14 -04:00
// DERPConfig describes where the DERP map comes from. Serve passes the whole
// value to GetDERPMap.
type DERPConfig struct {
	// URLs and Paths are not read in this file.
	// NOTE(review): presumably remote and local DERP map sources - confirm
	// against GetDERPMap.
	URLs []url.URL
	Paths []string
	// AutoUpdate makes Serve start scheduledDERPMapUpdateWorker.
	AutoUpdate bool
	// UpdateFrequency is not read in this file.
	// NOTE(review): presumably the interval of the update worker - confirm.
	UpdateFrequency time.Duration
}
// CLIConfig groups the settings stored in Config.CLI. None of its fields are
// read in this file.
// NOTE(review): presumably describes how the command line client reaches the
// server (address, API key, TLS verification, timeout) - confirm at the
// call sites.
type CLIConfig struct {
	Address string
	APIKey string
	Insecure bool
	Timeout time.Duration
}
// Headscale represents the base app of the service.
2020-06-21 06:32:08 -04:00
type Headscale struct {
cfg Config
2021-07-04 15:40:46 -04:00
db *gorm.DB
2020-06-21 06:32:08 -04:00
dbString string
dbType string
dbDebug bool
2021-06-25 12:57:08 -04:00
publicKey *wgkey.Key
privateKey *wgkey.Private
2021-10-22 12:55:14 -04:00
DERPMap *tailcfg.DERPMap
2021-07-03 11:31:32 -04:00
aclPolicy *ACLPolicy
aclRules []tailcfg.FilterRule
2021-07-03 11:31:32 -04:00
2021-08-19 13:19:26 -04:00
lastStateChange sync.Map
2021-10-08 05:43:52 -04:00
oidcProvider *oidc.Provider
oauth2Config *oauth2.Config
oidcStateCache *cache.Cache
2020-06-21 06:32:08 -04:00
}
// NewHeadscale returns the Headscale app.
2020-06-21 06:32:08 -04:00
func NewHeadscale(cfg Config) (*Headscale, error) {
2021-02-21 17:54:15 -05:00
content, err := os.ReadFile(cfg.PrivateKeyPath)
2020-06-21 06:32:08 -04:00
if err != nil {
return nil, err
}
2021-06-25 12:57:08 -04:00
privKey, err := wgkey.ParsePrivate(string(content))
2020-06-21 06:32:08 -04:00
if err != nil {
return nil, err
}
pubKey := privKey.Public()
var dbString string
switch cfg.DBtype {
case "postgres":
dbString = fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s sslmode=disable", cfg.DBhost,
cfg.DBport, cfg.DBname, cfg.DBuser, cfg.DBpass)
case "sqlite3":
dbString = cfg.DBpath
default:
2021-07-11 09:10:37 -04:00
return nil, errors.New("unsupported DB")
}
2020-06-21 06:32:08 -04:00
h := Headscale{
cfg: cfg,
dbType: cfg.DBtype,
dbString: dbString,
2020-06-21 06:32:08 -04:00
privateKey: privKey,
publicKey: &pubKey,
aclRules: tailcfg.FilterAllowAll, // default allowall
2020-06-21 06:32:08 -04:00
}
2021-07-04 07:24:05 -04:00
2020-06-21 06:32:08 -04:00
err = h.initDB()
if err != nil {
return nil, err
}
2021-07-04 15:40:46 -04:00
2021-10-18 15:27:52 -04:00
if cfg.OIDC.Issuer != "" {
2021-10-08 05:43:52 -04:00
err = h.initOIDC()
if err != nil {
return nil, err
}
2021-10-18 15:27:52 -04:00
}
2021-10-16 10:31:37 -04:00
if h.cfg.DNSConfig != nil && h.cfg.DNSConfig.Proxied { // if MagicDNS
magicDNSDomains, err := generateMagicDNSRootDomains(h.cfg.IPPrefix, h.cfg.BaseDomain)
if err != nil {
return nil, err
}
// we might have routes already from Split DNS
2021-10-22 12:55:14 -04:00
if h.cfg.DNSConfig.Routes == nil {
h.cfg.DNSConfig.Routes = make(map[string][]dnstype.Resolver)
}
2021-10-10 06:43:41 -04:00
for _, d := range magicDNSDomains {
h.cfg.DNSConfig.Routes[d.WithoutTrailingDot()] = nil
}
}
2020-06-21 06:32:08 -04:00
return &h, nil
}
// redirect answers req with a 302 Found that points at the same request URI
// under the configured ServerURL (our TLS URL).
func (h *Headscale) redirect(w http.ResponseWriter, req *http.Request) {
	http.Redirect(w, req, h.cfg.ServerURL+req.URL.RequestURI(), http.StatusFound)
}
2021-08-12 15:45:40 -04:00
// expireEphemeralNodes deletes ephemeral machine records that have not been
// seen for longer than h.cfg.EphemeralNodeInactivityTimeout.
2021-08-12 15:45:40 -04:00
func (h *Headscale) expireEphemeralNodes(milliSeconds int64) {
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
for range ticker.C {
h.expireEphemeralNodesWorker()
}
}
func (h *Headscale) expireEphemeralNodesWorker() {
namespaces, err := h.ListNamespaces()
if err != nil {
2021-08-05 13:11:26 -04:00
log.Error().Err(err).Msg("Error listing namespaces")
return
}
for _, ns := range namespaces {
machines, err := h.ListMachinesInNamespace(ns.Name)
if err != nil {
2021-08-05 15:57:47 -04:00
log.Error().Err(err).Str("namespace", ns.Name).Msg("Error listing machines in namespace")
return
}
for _, m := range machines {
2021-10-22 12:55:14 -04:00
if m.AuthKey != nil && m.LastSeen != nil && m.AuthKey.Ephemeral &&
time.Now().After(m.LastSeen.Add(h.cfg.EphemeralNodeInactivityTimeout)) {
2021-08-05 15:57:47 -04:00
log.Info().Str("machine", m.Name).Msg("Ephemeral client removed from database")
2021-07-04 15:40:46 -04:00
err = h.db.Unscoped().Delete(m).Error
if err != nil {
2021-10-22 12:55:14 -04:00
log.Error().
Err(err).
Str("machine", m.Name).
Msg("🤮 Cannot delete ephemeral machine from the database")
}
}
}
h.setLastStateChangeToNow(ns.Name)
}
}
// watchForKVUpdates runs watchForKVUpdatesWorker once per tick of a ticker
// whose period is milliSeconds milliseconds. The worker checks the KV DB
// table for requests to perform tailnet updates, which is how the CLI
// communicates with the headscale server. It never returns.
func (h *Headscale) watchForKVUpdates(milliSeconds int64) {
	period := time.Duration(milliSeconds) * time.Millisecond
	for range time.NewTicker(period).C {
		h.watchForKVUpdatesWorker()
	}
}
// watchForKVUpdatesWorker performs one polling pass. Today that is only the
// check for namespaces with pending updates; further checks are expected to
// be added here.
func (h *Headscale) watchForKVUpdatesWorker() {
	h.checkForNamespacesPendingUpdates()
}
// grpcAuthenticationInterceptor is the unary interceptor installed on the
// network-facing gRPC server.
//
// It validates that the request carries metadata with an "authorization"
// value starting with AUTH_PREFIX, and then rejects the request anyway:
// API key authentication is not implemented yet, so every call that reaches
// this interceptor ends with an error status and handler is never invoked.
// The local unix-socket server does not install this interceptor, which is
// what lets the CLI and the grpc-gateway work.
//
// The returned response value is always nil; the error is a gRPC status with
// code InvalidArgument (no metadata) or Unauthenticated (everything else).
func (h *Headscale) grpcAuthenticationInterceptor(ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler) (interface{}, error) {
	// peer.FromContext reports ok == false when the context carries no peer
	// information; use a placeholder rather than dereferencing a nil peer.
	clientAddress := "unknown"
	if p, ok := peer.FromContext(ctx); ok && p != nil && p.Addr != nil {
		clientAddress = p.Addr.String()
	}

	log.Trace().Caller().Str("client_address", clientAddress).Msg("Client is trying to authenticate")

	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		log.Error().Caller().Str("client_address", clientAddress).Msg("Retrieving metadata is failed")

		return nil, status.Errorf(codes.InvalidArgument, "Retrieving metadata is failed")
	}

	// A present key with no values is treated like a missing key so the
	// index below cannot go out of range.
	authHeader, ok := md["authorization"]
	if !ok || len(authHeader) == 0 {
		log.Error().Caller().Str("client_address", clientAddress).Msg("Authorization token is not supplied")

		return nil, status.Errorf(codes.Unauthenticated, "Authorization token is not supplied")
	}

	token := authHeader[0]
	if !strings.HasPrefix(token, AUTH_PREFIX) {
		log.Error().
			Caller().
			Str("client_address", clientAddress).
			Msg(`missing "Bearer " prefix in "Authorization" header`)

		return nil, status.Error(codes.Unauthenticated, `missing "Bearer " prefix in "Authorization" header`)
	}

	// TODO(kradalby): Implement API key backend:
	// - Table in the DB
	// - Key name
	// - Encrypted
	// - Expiry
	//
	// Until then all traffic through this interceptor is unauthorized. This
	// is intentional: it lets the CLI use gRPC over the local socket without
	// having to implement remote capabilities and API key auth. Once keys
	// exist, compare strings.TrimPrefix(token, AUTH_PREFIX) against the
	// stored key and finish with handler(ctx, req) on success.
	return nil, status.Error(codes.Unauthenticated, "Authentication is not implemented yet")
}
// httpAuthenticationMiddleware guards the /api route group.
//
// Every request is aborted with 401 Unauthorized. A request whose
// "authorization" header does not start with AUTH_PREFIX is additionally
// logged as an error.
//
// TODO(kradalby): Implement API key backend. Rejecting all traffic is
// intentional for now: it lets the CLI use gRPC without having to implement
// remote capabilities and API key auth. Once keys exist, compare
// strings.TrimPrefix(header, AUTH_PREFIX) against the stored key and call
// c.Next() on success.
func (h *Headscale) httpAuthenticationMiddleware(c *gin.Context) {
	clientAddress := c.ClientIP()

	log.Trace().
		Caller().
		Str("client_address", clientAddress).
		Msg("HTTP authentication invoked")

	if header := c.GetHeader("authorization"); !strings.HasPrefix(header, AUTH_PREFIX) {
		log.Error().
			Caller().
			Str("client_address", clientAddress).
			Msg(`missing "Bearer " prefix in "Authorization" header`)
	}

	c.AbortWithStatus(http.StatusUnauthorized)
}
// ensureUnixSocketIsAbsent makes sure nothing exists at the configured unix
// socket path, removing whatever is there.
//
// The path is removed directly instead of being checked with os.Stat first:
// that avoids the window between check and removal, and it also removes a
// dangling symlink, which os.Stat would report as "does not exist" and leave
// in place. A path that is already absent is not an error.
func (h *Headscale) ensureUnixSocketIsAbsent() error {
	err := os.Remove(h.cfg.UnixSocket)
	if err == nil || errors.Is(err, os.ErrNotExist) {
		return nil
	}

	return err
}
// Serve launches a GIN server with the Headscale API.
2020-06-21 06:32:08 -04:00
func (h *Headscale) Serve() error {
var err error
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err = h.ensureUnixSocketIsAbsent()
if err != nil {
panic(err)
}
socketListener, err := net.Listen("unix", h.cfg.UnixSocket)
if err != nil {
panic(err)
}
2021-11-02 17:46:15 -04:00
// Handle common process-killing signals so we can gracefully shut down:
sigc := make(chan os.Signal, 1)
2021-11-02 17:49:19 -04:00
signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)
2021-11-02 17:46:15 -04:00
go func(c chan os.Signal) {
// Wait for a SIGINT or SIGKILL:
sig := <-c
log.Printf("Caught signal %s: shutting down.", sig)
// Stop listening (and unlink the socket if unix type):
socketListener.Close()
// And we're done:
os.Exit(0)
}(sigc)
networkListener, err := net.Listen("tcp", h.cfg.Addr)
if err != nil {
panic(err)
}
// Create the cmux object that will multiplex 2 protocols on the same port.
// The two following listeners will be served on the same port below gracefully.
m := cmux.New(networkListener)
// Match gRPC requests here
grpcListener := m.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
)
// Otherwise match regular http requests.
httpListener := m.Match(cmux.Any())
grpcGatewayMux := runtime.NewServeMux()
// Make the grpc-gateway connect to grpc over socket
grpcGatewayConn, err := grpc.Dial(
h.cfg.UnixSocket,
[]grpc.DialOption{
grpc.WithInsecure(),
2021-10-30 10:29:03 -04:00
grpc.WithContextDialer(GrpcSocketDialer),
}...,
)
if err != nil {
return err
}
// Connect to the gRPC server over localhost to skip
// the authentication.
err = v1.RegisterHeadscaleServiceHandler(ctx, grpcGatewayMux, grpcGatewayConn)
if err != nil {
return err
}
2020-06-21 06:32:08 -04:00
r := gin.Default()
p := ginprometheus.NewPrometheus("gin")
p.Use(r)
r.GET("/health", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"healthy": "ok"}) })
2020-06-21 06:32:08 -04:00
r.GET("/key", h.KeyHandler)
r.GET("/register", h.RegisterWebAPI)
r.POST("/machine/:id/map", h.PollNetMapHandler)
r.POST("/machine/:id", h.RegistrationHandler)
r.GET("/oidc/register/:mkey", h.RegisterOIDC)
2021-09-26 04:53:05 -04:00
r.GET("/oidc/callback", h.OIDCCallback)
2021-09-19 12:56:29 -04:00
r.GET("/apple", h.AppleMobileConfig)
r.GET("/apple/:platform", h.ApplePlatformConfig)
2021-10-30 10:29:53 -04:00
r.GET("/swagger", SwaggerUI)
r.GET("/swagger/v1/openapiv2.json", SwaggerAPIv1)
api := r.Group("/api")
api.Use(h.httpAuthenticationMiddleware)
{
api.Any("/v1/*any", gin.WrapF(grpcGatewayMux.ServeHTTP))
}
r.NoRoute(stdoutHandler)
2021-10-22 12:55:14 -04:00
// Fetch an initial DERP Map before we start serving
h.DERPMap = GetDERPMap(h.cfg.DERP)
if h.cfg.DERP.AutoUpdate {
derpMapCancelChannel := make(chan struct{})
defer func() { derpMapCancelChannel <- struct{}{} }()
go h.scheduledDERPMapUpdateWorker(derpMapCancelChannel)
}
// I HATE THIS
updateMillisecondsWait := int64(5000)
go h.watchForKVUpdates(updateMillisecondsWait)
go h.expireEphemeralNodes(updateMillisecondsWait)
httpServer := &http.Server{
Addr: h.cfg.Addr,
Handler: r,
ReadTimeout: 30 * time.Second,
// Go does not handle timeouts in HTTP very well, and there is
// no good way to handle streaming timeouts, therefore we need to
// keep this at unlimited and be careful to clean up connections
// https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/#aboutstreaming
WriteTimeout: 0,
}
if zl.GlobalLevel() == zl.TraceLevel {
zerolog.RespLog = true
} else {
zerolog.RespLog = false
}
grpcOptions := []grpc.ServerOption{
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
h.grpcAuthenticationInterceptor,
zerolog.NewUnaryServerInterceptor(),
),
),
}
tlsConfig, err := h.getTLSSettings()
if err != nil {
log.Error().Err(err).Msg("Failed to set up TLS configuration")
return err
}
if tlsConfig != nil {
httpServer.TLSConfig = tlsConfig
grpcOptions = append(grpcOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
grpcServer := grpc.NewServer(grpcOptions...)
// Start the local gRPC server without TLS and without authentication
grpcSocket := grpc.NewServer(zerolog.UnaryInterceptor())
v1.RegisterHeadscaleServiceServer(grpcServer, newHeadscaleV1APIServer(h))
v1.RegisterHeadscaleServiceServer(grpcSocket, newHeadscaleV1APIServer(h))
reflection.Register(grpcServer)
reflection.Register(grpcSocket)
g := new(errgroup.Group)
g.Go(func() error { return grpcSocket.Serve(socketListener) })
2021-10-31 12:34:20 -04:00
// TODO(kradalby): Verify if we need the same TLS setup for gRPC as HTTP
g.Go(func() error { return grpcServer.Serve(grpcListener) })
2021-10-31 12:19:38 -04:00
if tlsConfig != nil {
g.Go(func() error {
tlsl := tls.NewListener(httpListener, tlsConfig)
return httpServer.Serve(tlsl)
})
} else {
g.Go(func() error { return httpServer.Serve(httpListener) })
}
g.Go(func() error { return m.Serve() })
log.Info().Msgf("listening and serving (multiplexed HTTP and gRPC) on: %s", h.cfg.Addr)
return g.Wait()
}
func (h *Headscale) getTLSSettings() (*tls.Config, error) {
if h.cfg.TLSLetsEncryptHostname != "" {
if !strings.HasPrefix(h.cfg.ServerURL, "https://") {
2021-08-05 13:11:26 -04:00
log.Warn().Msg("Listening with TLS but ServerURL does not start with https://")
}
m := autocert.Manager{
Prompt: autocert.AcceptTOS,
HostPolicy: autocert.HostWhitelist(h.cfg.TLSLetsEncryptHostname),
Cache: autocert.DirCache(h.cfg.TLSLetsEncryptCacheDir),
Client: &acme.Client{
DirectoryURL: h.cfg.ACMEURL,
},
Email: h.cfg.ACMEEmail,
}
if h.cfg.TLSLetsEncryptChallengeType == "TLS-ALPN-01" {
// Configuration via autocert with TLS-ALPN-01 (https://tools.ietf.org/html/rfc8737)
// The RFC requires that the validation is done on port 443; in other words, headscale
// must be reachable on port 443.
return m.TLSConfig(), nil
} else if h.cfg.TLSLetsEncryptChallengeType == "HTTP-01" {
// Configuration via autocert with HTTP-01. This requires listening on
// port 80 for the certificate validation in addition to the headscale
// service, which can be configured to run on any other port.
go func() {
2021-08-05 13:11:26 -04:00
log.Fatal().
Err(http.ListenAndServe(h.cfg.TLSLetsEncryptListen, m.HTTPHandler(http.HandlerFunc(h.redirect)))).
Msg("failed to set up a HTTP server")
}()
return m.TLSConfig(), nil
} else {
return nil, errors.New("unknown value for TLSLetsEncryptChallengeType")
}
} else if h.cfg.TLSCertPath == "" {
if !strings.HasPrefix(h.cfg.ServerURL, "http://") {
2021-08-05 13:11:26 -04:00
log.Warn().Msg("Listening without TLS but ServerURL does not start with http://")
}
return nil, nil
} else {
if !strings.HasPrefix(h.cfg.ServerURL, "https://") {
2021-08-05 13:11:26 -04:00
log.Warn().Msg("Listening with TLS but ServerURL does not start with https://")
}
var err error
tlsConfig := &tls.Config{}
tlsConfig.ClientAuth = tls.RequireAnyClientCert
tlsConfig.NextProtos = []string{"http/1.1"}
tlsConfig.Certificates = make([]tls.Certificate, 1)
tlsConfig.Certificates[0], err = tls.LoadX509KeyPair(h.cfg.TLSCertPath, h.cfg.TLSKeyPath)
return tlsConfig, err
}
2020-06-21 06:32:08 -04:00
}
2021-08-19 13:19:26 -04:00
func (h *Headscale) setLastStateChangeToNow(namespace string) {
now := time.Now().UTC()
lastStateUpdate.WithLabelValues("", "headscale").Set(float64(now.Unix()))
2021-08-19 13:19:26 -04:00
h.lastStateChange.Store(namespace, now)
}
func (h *Headscale) getLastStateChange(namespaces ...string) time.Time {
times := []time.Time{}
for _, namespace := range namespaces {
if wrapped, ok := h.lastStateChange.Load(namespace); ok {
lastChange, _ := wrapped.(time.Time)
times = append(times, lastChange)
}
2021-08-19 13:19:26 -04:00
}
sort.Slice(times, func(i, j int) bool {
return times[i].After(times[j])
})
log.Trace().Msgf("Latest times %#v", times)
if len(times) == 0 {
return time.Now().UTC()
} else {
return times[0]
}
}
// stdoutHandler is the fallback for requests that match no route. It reads
// the request body and writes headers, protocol, URL and body to the trace
// log. A read error is discarded; whatever was read is still logged.
func stdoutHandler(c *gin.Context) {
	request := c.Request
	body, _ := io.ReadAll(request.Body)

	event := log.Trace()
	event = event.Interface("header", request.Header)
	event = event.Interface("proto", request.Proto)
	event = event.Interface("url", request.URL)
	event = event.Bytes("body", body)
	event.Msg("Request did not match")
}