feat: Add support for kakfa audit logger target (#12678)

This commit is contained in:
Harshavardhana
2021-07-13 09:39:13 -07:00
committed by GitHub
parent 559d075627
commit e316873f84
14 changed files with 811 additions and 202 deletions

View File

@@ -28,12 +28,26 @@ import (
"time"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
)
// Timeout for the webhook http call
const webhookCallTimeout = 5 * time.Second
// Config http logger target
type Config struct {
Enabled bool `json:"enabled"`
Name string `json:"name"`
UserAgent string `json:"userAgent"`
Endpoint string `json:"endpoint"`
AuthToken string `json:"authToken"`
ClientCert string `json:"clientCert"`
ClientKey string `json:"clientKey"`
Transport http.RoundTripper `json:"-"`
// Custom logger
LogOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) `json:"-"`
}
// Target implements logger.Target and sends the json
// format of a log entry to the configured http endpoint.
// An internal buffer of logs is maintained but when the
@@ -43,32 +57,24 @@ type Target struct {
// Channel of log entries
logCh chan interface{}
name string
// HTTP(s) endpoint
endpoint string
// Authorization token for `endpoint`
authToken string
// User-Agent to be set on each log to `endpoint`
userAgent string
logKind string
client http.Client
config Config
}
// Endpoint returns the backend endpoint
func (h *Target) Endpoint() string {
return h.endpoint
return h.config.Endpoint
}
func (h *Target) String() string {
return h.name
return h.config.Name
}
// Validate validate the http target
func (h *Target) Validate() error {
// Init validate and initialize the http target
func (h *Target) Init() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*webhookCallTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.endpoint, strings.NewReader(`{}`))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, strings.NewReader(`{}`))
if err != nil {
return err
}
@@ -77,13 +83,14 @@ func (h *Target) Validate() error {
// Set user-agent to indicate MinIO release
// version to the configured log endpoint
req.Header.Set("User-Agent", h.userAgent)
req.Header.Set("User-Agent", h.config.UserAgent)
if h.authToken != "" {
req.Header.Set("Authorization", h.authToken)
if h.config.AuthToken != "" {
req.Header.Set("Authorization", h.config.AuthToken)
}
resp, err := h.client.Do(req)
client := http.Client{Transport: h.config.Transport}
resp, err := client.Do(req)
if err != nil {
return err
}
@@ -95,12 +102,13 @@ func (h *Target) Validate() error {
switch resp.StatusCode {
case http.StatusForbidden:
return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set",
h.endpoint, resp.Status)
h.config.Endpoint, resp.Status)
}
return fmt.Errorf("%s returned '%s', please check your endpoint configuration",
h.endpoint, resp.Status)
h.config.Endpoint, resp.Status)
}
go h.startHTTPLogger()
return nil
}
@@ -116,7 +124,7 @@ func (h *Target) startHTTPLogger() {
ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout)
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
h.endpoint, bytes.NewReader(logJSON))
h.config.Endpoint, bytes.NewReader(logJSON))
if err != nil {
cancel()
continue
@@ -125,17 +133,17 @@ func (h *Target) startHTTPLogger() {
// Set user-agent to indicate MinIO release
// version to the configured log endpoint
req.Header.Set("User-Agent", h.userAgent)
req.Header.Set("User-Agent", h.config.UserAgent)
if h.authToken != "" {
req.Header.Set("Authorization", h.authToken)
if h.config.AuthToken != "" {
req.Header.Set("Authorization", h.config.AuthToken)
}
resp, err := h.client.Do(req)
client := http.Client{Transport: h.config.Transport}
resp, err := client.Do(req)
cancel()
if err != nil {
logger.LogOnceIf(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration",
h.endpoint, err), h.endpoint)
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint)
continue
}
@@ -145,88 +153,28 @@ func (h *Target) startHTTPLogger() {
if resp.StatusCode != http.StatusOK {
switch resp.StatusCode {
case http.StatusForbidden:
logger.LogOnceIf(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set",
h.endpoint, resp.Status), h.endpoint)
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status), h.config.Endpoint)
default:
logger.LogOnceIf(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration",
h.endpoint, resp.Status), h.endpoint)
h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status), h.config.Endpoint)
}
}
}
}()
}
// Option is a function type that accepts a pointer Target
type Option func(*Target)
// WithTargetName target name
func WithTargetName(name string) Option {
return func(t *Target) {
t.name = name
}
}
// WithEndpoint adds a new endpoint
func WithEndpoint(endpoint string) Option {
return func(t *Target) {
t.endpoint = endpoint
}
}
// WithLogKind adds a log type for this target
func WithLogKind(logKind string) Option {
return func(t *Target) {
t.logKind = strings.ToUpper(logKind)
}
}
// WithUserAgent adds a custom user-agent sent to the target.
func WithUserAgent(userAgent string) Option {
return func(t *Target) {
t.userAgent = userAgent
}
}
// WithAuthToken adds a new authorization header to be sent to target.
func WithAuthToken(authToken string) Option {
return func(t *Target) {
t.authToken = authToken
}
}
// WithTransport adds a custom transport with custom timeouts and tuning.
func WithTransport(transport *http.Transport) Option {
return func(t *Target) {
t.client = http.Client{
Transport: transport,
}
}
}
// New initializes a new logger target which
// sends log over http to the specified endpoint
func New(opts ...Option) *Target {
func New(config Config) *Target {
h := &Target{
logCh: make(chan interface{}, 10000),
logCh: make(chan interface{}, 10000),
config: config,
}
// Loop through each option
for _, opt := range opts {
// Call the option giving the instantiated
// *Target as the argument
opt(h)
}
h.startHTTPLogger()
return h
}
// Send log message 'e' to http target.
func (h *Target) Send(entry interface{}, errKind string) error {
if h.logKind != errKind && h.logKind != "ALL" {
return nil
}
select {
case h.logCh <- entry:
default:

View File

@@ -0,0 +1,208 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package kafka
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"net"
sarama "github.com/Shopify/sarama"
saramatls "github.com/Shopify/sarama/tools/tls"
"github.com/minio/minio/internal/logger/message/audit"
xnet "github.com/minio/pkg/net"
)
// Target - Kafka target.
type Target struct {
// Channel of log entries
logCh chan interface{}
producer sarama.SyncProducer
kconfig Config
config *sarama.Config
}
// Send log message 'e' to kafka target.
func (h *Target) Send(entry interface{}, errKind string) error {
select {
case h.logCh <- entry:
default:
// log channel is full, do not wait and return
// an error immediately to the caller
return errors.New("log buffer full")
}
return nil
}
func (h *Target) startKakfaLogger() {
// Create a routine which sends json logs received
// from an internal channel.
go func() {
for entry := range h.logCh {
logJSON, err := json.Marshal(&entry)
if err != nil {
continue
}
ae, ok := entry.(audit.Entry)
if ok {
msg := sarama.ProducerMessage{
Topic: h.kconfig.Topic,
Key: sarama.StringEncoder(ae.RequestID),
Value: sarama.ByteEncoder(logJSON),
}
_, _, err = h.producer.SendMessage(&msg)
if err != nil {
h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic)
continue
}
}
}
}()
}
// Config - kafka target arguments.
type Config struct {
Enabled bool `json:"enable"`
Brokers []xnet.Host `json:"brokers"`
Topic string `json:"topic"`
Version string `json:"version"`
TLS struct {
Enable bool `json:"enable"`
RootCAs *x509.CertPool `json:"-"`
SkipVerify bool `json:"skipVerify"`
ClientAuth tls.ClientAuthType `json:"clientAuth"`
ClientTLSCert string `json:"clientTLSCert"`
ClientTLSKey string `json:"clientTLSKey"`
} `json:"tls"`
SASL struct {
Enable bool `json:"enable"`
User string `json:"username"`
Password string `json:"password"`
Mechanism string `json:"mechanism"`
} `json:"sasl"`
// Custom logger
LogOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) `json:"-"`
}
// Check if atleast one broker in cluster is active
func (k Config) pingBrokers() error {
var err error
for _, broker := range k.Brokers {
_, err1 := net.Dial("tcp", broker.String())
if err1 != nil {
if err == nil {
// Set first error
err = err1
}
}
}
return err
}
// Endpoint - return kafka target
func (h *Target) Endpoint() string {
return "kafka"
}
// String - kafka string
func (h *Target) String() string {
return "kafka"
}
// Init initialize kafka target
func (h *Target) Init() error {
if !h.kconfig.Enabled {
return nil
}
if len(h.kconfig.Brokers) == 0 {
return errors.New("no broker address found")
}
for _, b := range h.kconfig.Brokers {
if _, err := xnet.ParseHost(b.String()); err != nil {
return err
}
}
if err := h.kconfig.pingBrokers(); err != nil {
return err
}
sconfig := sarama.NewConfig()
if h.kconfig.Version != "" {
kafkaVersion, err := sarama.ParseKafkaVersion(h.kconfig.Version)
if err != nil {
return err
}
sconfig.Version = kafkaVersion
}
sconfig.Net.SASL.User = h.kconfig.SASL.User
sconfig.Net.SASL.Password = h.kconfig.SASL.Password
initScramClient(h.kconfig, sconfig) // initializes configured scram client.
sconfig.Net.SASL.Enable = h.kconfig.SASL.Enable
tlsConfig, err := saramatls.NewConfig(h.kconfig.TLS.ClientTLSCert, h.kconfig.TLS.ClientTLSKey)
if err != nil {
return err
}
sconfig.Net.TLS.Enable = h.kconfig.TLS.Enable
sconfig.Net.TLS.Config = tlsConfig
sconfig.Net.TLS.Config.InsecureSkipVerify = h.kconfig.TLS.SkipVerify
sconfig.Net.TLS.Config.ClientAuth = h.kconfig.TLS.ClientAuth
sconfig.Net.TLS.Config.RootCAs = h.kconfig.TLS.RootCAs
sconfig.Producer.RequiredAcks = sarama.WaitForAll
sconfig.Producer.Retry.Max = 10
sconfig.Producer.Return.Successes = true
h.config = sconfig
var brokers []string
for _, broker := range h.kconfig.Brokers {
brokers = append(brokers, broker.String())
}
producer, err := sarama.NewSyncProducer(brokers, sconfig)
if err != nil {
return err
}
h.producer = producer
go h.startKakfaLogger()
return nil
}
// New initializes a new logger target which
// sends log over http to the specified endpoint
func New(config Config) *Target {
target := &Target{
logCh: make(chan interface{}, 10000),
kconfig: config,
}
return target
}

View File

@@ -0,0 +1,84 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package kafka
import (
"crypto/sha256"
"crypto/sha512"
"github.com/Shopify/sarama"
"github.com/xdg/scram"
)
func initScramClient(cfg Config, config *sarama.Config) {
if cfg.SASL.Mechanism == "sha512" {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: KafkaSHA512} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
} else if cfg.SASL.Mechanism == "sha256" {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: KafkaSHA256} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
} else {
// default to PLAIN
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
}
}
// KafkaSHA256 is a function that returns a crypto/sha256 hasher and should be used
// to create Client objects configured for SHA-256 hashing.
var KafkaSHA256 scram.HashGeneratorFcn = sha256.New
// KafkaSHA512 is a function that returns a crypto/sha512 hasher and should be used
// to create Client objects configured for SHA-512 hashing.
var KafkaSHA512 scram.HashGeneratorFcn = sha512.New
// XDGSCRAMClient implements the client-side of an authentication
// conversation with a server. A new conversation must be created for
// each authentication attempt.
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}
// Begin constructs a SCRAM client component based on a given hash.Hash
// factory receiver. This constructor will normalize the username, password
// and authzID via the SASLprep algorithm, as recommended by RFC-5802. If
// SASLprep fails, the method returns an error.
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}
// Step takes a string provided from a server (or just an empty string for the
// very first conversation step) and attempts to move the authentication
// conversation forward. It returns a string to be sent to the server or an
// error if the server message is invalid. Calling Step after a conversation
// completes is also an error.
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}
// Done returns true if the conversation is completed or has errored.
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}