kafka: Do not require key when sending a message (#17962)

Keys are helpful to ensure the strict ordering of messages, however currently the
code uses a random request id for every log, hence using the request-id
as a Kafka key is not serve any purpose;

This commit removes the usage of the key, to also fix the audit issue from
internal subsystem that does not have a request ID.
This commit is contained in:
Anis Eleuch 2023-09-01 16:37:22 +01:00 committed by GitHub
parent b1c1f02132
commit 6a8d8f34a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 1 additions and 9 deletions

View File

@ -22,7 +22,7 @@ package cmd
import "errors"
// Rename captures time taken to call os.Rename
// Rename2 is not implemented in a non linux environment
func Rename2(src, dst string) (err error) {
defer updateOSMetrics(osMetricRename2, src, dst)(errors.New("not implemented, skipping"))
return errSkipFile

View File

@ -27,14 +27,12 @@ import (
"net"
"os"
"path/filepath"
"reflect"
"sync"
"sync/atomic"
"time"
"github.com/IBM/sarama"
saramatls "github.com/IBM/sarama/tools/tls"
"github.com/tidwall/gjson"
"github.com/minio/minio/internal/logger/target/types"
"github.com/minio/minio/internal/once"
@ -241,14 +239,8 @@ func (h *Target) send(entry interface{}) error {
if err != nil {
return err
}
requestID := gjson.GetBytes(logJSON, "requestID").Str
if requestID == "" {
// unsupported data structure
return fmt.Errorf("unsupported data structure: %s must be either audit.Entry or log.Entry", reflect.TypeOf(entry))
}
msg := sarama.ProducerMessage{
Topic: h.kconfig.Topic,
Key: sarama.StringEncoder(requestID),
Value: sarama.ByteEncoder(logJSON),
}
_, _, err = h.producer.SendMessage(&msg)