feat: support elasticsearch notification endpoint compression codec (#18562)

This commit is contained in:
jiuker 2023-11-30 16:25:03 +08:00 committed by GitHub
parent 0ee722f8c3
commit 34187e047d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 114 additions and 47 deletions

View File

@ -1150,6 +1150,8 @@ MINIO_NOTIFY_KAFKA_QUEUE_DIR (path) staging dir for unde
MINIO_NOTIFY_KAFKA_QUEUE_LIMIT (number) maximum limit for undelivered messages, defaults to '100000' MINIO_NOTIFY_KAFKA_QUEUE_LIMIT (number) maximum limit for undelivered messages, defaults to '100000'
MINIO_NOTIFY_KAFKA_COMMENT (sentence) optionally add a comment to this setting MINIO_NOTIFY_KAFKA_COMMENT (sentence) optionally add a comment to this setting
MINIO_NOTIFY_KAFKA_VERSION (string) specify the version of the Kafka cluster e.g. '2.2.0' MINIO_NOTIFY_KAFKA_VERSION (string) specify the version of the Kafka cluster e.g. '2.2.0'
MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC (none|snappy|gzip|lz4|zstd) compression codec for producer messages
MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_LEVEL (number) compression level for producer messages, defaults to '0'
``` ```
To update the configuration, use `mc admin config get` command to get the current configuration. To update the configuration, use `mc admin config get` command to get the current configuration.

View File

@ -262,6 +262,18 @@ var (
Optional: true, Optional: true,
Type: "sentence", Type: "sentence",
}, },
config.HelpKV{
Key: target.KafkaCompressionCodec,
Description: "specify compression_codec of the Kafka cluster",
Optional: true,
Type: "none|snappy|gzip|lz4|zstd",
},
config.HelpKV{
Key: target.KafkaCompressionLevel,
Description: "specify compression level of the Kafka cluster",
Optional: true,
Type: "number",
},
} }
HelpMQTT = config.HelpKVS{ HelpMQTT = config.HelpKVS{

View File

@ -95,6 +95,14 @@ func SetNotifyKafka(s config.Config, name string, cfg target.KafkaArgs) error {
Key: target.KafkaSASLPassword, Key: target.KafkaSASLPassword,
Value: cfg.SASL.Password, Value: cfg.SASL.Password,
}, },
config.KV{
Key: target.KafkaCompressionCodec,
Value: cfg.Producer.Compression,
},
config.KV{
Key: target.KafkaCompressionLevel,
Value: strconv.Itoa(cfg.Producer.CompressionLevel),
},
} }
return nil return nil
} }

View File

@ -366,6 +366,14 @@ var (
Key: target.KafkaBatchSize, Key: target.KafkaBatchSize,
Value: "0", Value: "0",
}, },
config.KV{
Key: target.KafkaCompressionCodec,
Value: "",
},
config.KV{
Key: target.KafkaCompressionLevel,
Value: "",
},
} }
) )
@ -483,6 +491,19 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(target.KafkaClientTLSCert)) kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(target.KafkaClientTLSCert))
kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(target.KafkaClientTLSKey)) kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(target.KafkaClientTLSKey))
compressionCodecEnv := target.EnvKafkaProducerCompressionCodec
if k != config.Default {
compressionCodecEnv = compressionCodecEnv + config.Default + k
}
kafkaArgs.Producer.Compression = env.Get(compressionCodecEnv, kv.Get(target.KafkaCompressionCodec))
compressionLevelEnv := target.EnvKafkaProducerCompressionLevel
if k != config.Default {
compressionLevelEnv = compressionLevelEnv + config.Default + k
}
compressionLevel, _ := strconv.Atoi(env.Get(compressionLevelEnv, kv.Get(target.KafkaCompressionLevel)))
kafkaArgs.Producer.CompressionLevel = compressionLevel
saslEnableEnv := target.EnvKafkaSASLEnable saslEnableEnv := target.EnvKafkaSASLEnable
if k != config.Default { if k != config.Default {
saslEnableEnv = saslEnableEnv + config.Default + k saslEnableEnv = saslEnableEnv + config.Default + k

View File

@ -28,6 +28,7 @@ import (
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"time" "time"
@ -58,6 +59,8 @@ const (
KafkaClientTLSKey = "client_tls_key" KafkaClientTLSKey = "client_tls_key"
KafkaVersion = "version" KafkaVersion = "version"
KafkaBatchSize = "batch_size" KafkaBatchSize = "batch_size"
KafkaCompressionCodec = "compression_codec"
KafkaCompressionLevel = "compression_level"
EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE" EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE"
EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS" EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS"
@ -75,8 +78,18 @@ const (
EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY" EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION" EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION"
EnvKafkaBatchSize = "MINIO_NOTIFY_KAFKA_BATCH_SIZE" EnvKafkaBatchSize = "MINIO_NOTIFY_KAFKA_BATCH_SIZE"
EnvKafkaProducerCompressionCodec = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC"
EnvKafkaProducerCompressionLevel = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_LEVEL"
) )
var codecs = map[string]sarama.CompressionCodec{
"none": sarama.CompressionNone,
"gzip": sarama.CompressionGZIP,
"snappy": sarama.CompressionSnappy,
"lz4": sarama.CompressionLZ4,
"zstd": sarama.CompressionZSTD,
}
// KafkaArgs - Kafka target arguments. // KafkaArgs - Kafka target arguments.
type KafkaArgs struct { type KafkaArgs struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
@ -100,6 +113,10 @@ type KafkaArgs struct {
Password string `json:"password"` Password string `json:"password"`
Mechanism string `json:"mechanism"` Mechanism string `json:"mechanism"`
} `json:"sasl"` } `json:"sasl"`
Producer struct {
Compression string `json:"compression"`
CompressionLevel int `json:"compressionLevel"`
} `json:"producer"`
} }
// Validate KafkaArgs fields // Validate KafkaArgs fields
@ -391,6 +408,13 @@ func (target *KafkaTarget) initKafka() error {
config.Producer.Return.Errors = true config.Producer.Return.Errors = true
config.Producer.RequiredAcks = 1 config.Producer.RequiredAcks = 1
config.Producer.Timeout = (5 * time.Second) config.Producer.Timeout = (5 * time.Second)
// Set Producer Compression
cc, ok := codecs[strings.ToLower(args.Producer.Compression)]
if ok {
config.Producer.Compression = cc
config.Producer.CompressionLevel = args.Producer.CompressionLevel
}
config.Net.ReadTimeout = (5 * time.Second) config.Net.ReadTimeout = (5 * time.Second)
config.Net.DialTimeout = (5 * time.Second) config.Net.DialTimeout = (5 * time.Second)
config.Net.WriteTimeout = (5 * time.Second) config.Net.WriteTimeout = (5 * time.Second)