From 08b3a466e8072f3b5f31311d393595e8812fb025 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 12 Sep 2023 12:41:52 -0700 Subject: [PATCH] fix: allow concurrent SFTP connections (#18013) current implementation did not fully implement the concurrent SFTP connection implementation, this PR properly handles this. fixes #17914 --- cmd/ftp-server.go | 126 ++++++++++++++++++++++++++++++---------------- 1 file changed, 82 insertions(+), 44 deletions(-) diff --git a/cmd/ftp-server.go b/cmd/ftp-server.go index 39387120f..245ee6dd6 100644 --- a/cmd/ftp-server.go +++ b/cmd/ftp-server.go @@ -21,11 +21,11 @@ import ( "context" "crypto/subtle" "fmt" - "io" "net" "os" "strconv" "strings" + "time" "github.com/minio/cli" "github.com/minio/minio/internal/logger" @@ -75,6 +75,69 @@ func (log *minioLogger) PrintResponse(sessionID string, code int, message string } } +func acceptSFTPConnection(conn net.Conn, config *ssh.ServerConfig) { + // For SSH handshake keep a 2 minute deadline, as OpensSSH default. + conn.SetDeadline(time.Now().Add(2 * time.Minute)) + + // Before use, a handshake must be performed on the incoming net.Conn. + sconn, chans, reqs, err := ssh.NewServerConn(conn, config) + if err != nil { + return + } + + // Once we are done with SSH handshake, remove deadline. + conn.SetDeadline(time.Time{}) + + // The incoming Request channel must be serviced. + go ssh.DiscardRequests(reqs) + + // Service the incoming Channel channel. + for newChannel := range chans { + // Channels have a type, depending on the application level + // protocol intended. In the case of an SFTP session, this is "subsystem" + // with a payload string of "sftp" + if newChannel.ChannelType() != "session" { + newChannel.Reject(ssh.UnknownChannelType, "unknown channel type") + continue + } + + channel, requests, err := newChannel.Accept() + if err != nil { + logger.LogOnceIf(context.Background(), fmt.Errorf("unable to accept the connection requests channel: %w", err), "accept-channel-sftp") + continue + } + + // Sessions have out-of-band requests such as "shell", + // "pty-req" and "env". Here we handle only the + // "subsystem" request. + go func(in <-chan *ssh.Request) { + for req := range in { + ok := false + if req.Type == "subsystem" { + if len(req.Payload) > 4 && string(req.Payload[4:]) == "sftp" { + ok = true + go handleSFTPConnection(channel, sconn) + } + } + + if req.WantReply { + // We only reply to SSH packets that have `sftp` payload, all other + // packets are rejected + req.Reply(ok, nil) + } + } + }(requests) + } +} + +func handleSFTPConnection(channel ssh.Channel, sconn *ssh.ServerConn) { + // Create the server instance for the channel using the handler we created above. + server := sftp.NewRequestServer(channel, NewSFTPDriver(sconn.Permissions), sftp.WithRSAllocator()) + defer server.Close() + + server.Serve() +} + func startSFTPServer(c *cli.Context) { args := c.StringSlice("sftp") @@ -176,54 +239,29 @@ func startSFTPServer(c *cli.Context) { logger.Info(fmt.Sprintf("MinIO SFTP Server listening on %s", net.JoinHostPort(publicIP, strconv.Itoa(port)))) + var tempDelay time.Duration // how long to sleep on accept failure + for { - nConn, err := listener.Accept() + conn, err := listener.Accept() if err != nil { - logger.LogIf(context.Background(), err) - continue - } - - // Before use, a handshake must be performed on the incoming net.Conn. - sconn, chans, reqs, err := ssh.NewServerConn(nConn, config) - if err != nil { - logger.LogIf(context.Background(), err) - continue - } - - // The incoming Request channel must be serviced. - go ssh.DiscardRequests(reqs) - - // Service the incoming Channel channel. - for newChannel := range chans { - // Channels have a type, depending on the application level - // protocol intended. In the case of an SFTP session, this is "subsystem" - // with a payload string of "sftp" - if newChannel.ChannelType() != "session" { - newChannel.Reject(ssh.UnknownChannelType, "unknown channel type") + // From https://github.com/golang/go/blob/4aa1efed4853ea067d665a952eee77c52faac774/src/net/http/server.go#L3046 + if ne, ok := err.(net.Error); ok && ne.Temporary() { + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + logger.LogOnceIf(context.Background(), fmt.Errorf("error while accepting connections: %w, retrying in %s", err, tempDelay), "accept-limit-sftp") + time.Sleep(tempDelay) continue } - channel, requests, err := newChannel.Accept() - if err != nil { - logger.Fatal(err, "unable to accept the connection requests channel") - } - - // Sessions have out-of-band requests such as "shell", - // "pty-req" and "env". Here we handle only the - // "subsystem" request. - go func(in <-chan *ssh.Request) { - for req := range in { - // We only reply to SSH packets that have `sftp` payload. - req.Reply(req.Type == "subsystem" && string(req.Payload[4:]) == "sftp", nil) - } - }(requests) - - server := sftp.NewRequestServer(channel, NewSFTPDriver(sconn.Permissions)) - if err := server.Serve(); err == io.EOF { - server.Close() - } else if err != nil { - logger.Fatal(err, "unable to start SFTP server") - } + logger.Fatal(err, "unrecoverable error while accepting new connections") } + + go acceptSFTPConnection(conn, config) } }