Add krb5 support for HDFS gateway (#7933)

This commit is contained in:
Harshavardhana
2019-07-24 18:05:48 -07:00
committed by GitHub
parent a4ce1daf99
commit 6f2b4675fa
4 changed files with 104 additions and 97 deletions

View File

@@ -18,6 +18,7 @@ package hdfs
import (
"context"
"fmt"
"io"
"net"
"net/http"
@@ -30,7 +31,11 @@ import (
"time"
"github.com/minio/cli"
krb "github.com/minio/gokrb5/v7/client"
"github.com/minio/gokrb5/v7/config"
"github.com/minio/gokrb5/v7/credentials"
"github.com/minio/hdfs/v3"
"github.com/minio/hdfs/v3/hadoopconf"
"github.com/minio/minio-go/v6/pkg/s3utils"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/cmd/logger"
@@ -102,7 +107,7 @@ EXAMPLES:
// Handler for 'minio gateway hdfs' command line.
func hdfsGatewayMain(ctx *cli.Context) {
// Validate gateway arguments.
if !ctx.Args().Present() || ctx.Args().First() == "help" {
if ctx.Args().First() == "help" {
cli.ShowCommandHelpAndExit(ctx, hdfsBackend, 1)
}
@@ -119,6 +124,43 @@ func (g *HDFS) Name() string {
return hdfsBackend
}
func getKerberosClient() (*krb.Client, error) {
configPath := os.Getenv("KRB5_CONFIG")
if configPath == "" {
configPath = "/etc/krb5.conf"
}
cfg, err := config.Load(configPath)
if err != nil {
return nil, err
}
// Determine the ccache location from the environment,
// falling back to the default location.
ccachePath := os.Getenv("KRB5CCNAME")
if strings.Contains(ccachePath, ":") {
if strings.HasPrefix(ccachePath, "FILE:") {
ccachePath = strings.TrimPrefix(ccachePath, "FILE:")
} else {
return nil, fmt.Errorf("unable to use kerberos ccache: %s", ccachePath)
}
} else if ccachePath == "" {
u, err := user.Current()
if err != nil {
return nil, err
}
ccachePath = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid)
}
ccache, err := credentials.LoadCCache(ccachePath)
if err != nil {
return nil, err
}
return krb.NewClientFromCCache(ccache, cfg)
}
// NewGatewayLayer returns hdfs gatewaylayer.
func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
dialFunc := (&net.Dialer{
@@ -127,25 +169,42 @@ func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error
DualStack: true,
}).DialContext
var addresses []string
for _, s := range g.args {
u, err := xnet.ParseURL(s)
if err != nil {
return nil, err
}
addresses = append(addresses, u.Host)
}
user, err := user.Current()
hconfig, err := hadoopconf.LoadFromEnvironment()
if err != nil {
return nil, err
}
opts := hdfs.ClientOptions{
Addresses: addresses,
User: user.Username,
NamenodeDialFunc: dialFunc,
DatanodeDialFunc: dialFunc,
opts := hdfs.ClientOptionsFromConf(hconfig)
opts.NamenodeDialFunc = dialFunc
opts.DatanodeDialFunc = dialFunc
// Not addresses found, load it from command line.
if len(opts.Addresses) == 0 {
var addresses []string
for _, s := range g.args {
u, err := xnet.ParseURL(s)
if err != nil {
return nil, err
}
addresses = append(addresses, u.Host)
}
opts.Addresses = addresses
}
if opts.KerberosClient != nil {
opts.KerberosClient, err = getKerberosClient()
if err != nil {
return nil, fmt.Errorf("Unable to initialize kerberos client: %s", err)
}
} else {
opts.User = os.Getenv("HADOOP_USER_NAME")
if opts.User == "" {
u, err := user.Current()
if err != nil {
return nil, fmt.Errorf("Unable to lookup local user: %s", err)
}
opts.User = u.Username
}
}
clnt, err := hdfs.NewClient(opts)