From a22ce4550c64d5337756df911f78f1df24b00170 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 24 Jun 2024 10:59:48 -0700 Subject: [PATCH] protect workers and simplify use of atomics (#19982) without atomic load() it is possible that for a slow receiver we would get into a hot-loop, when logCh is full and there are many incoming callers. to avoid this as a workaround enable BATCH_SIZE greater than 100 to ensure that your slow receiver receives data in bulk to avoid being throttled in some manner. this PR however fixes the unprotected access to the current workers value. --- .../replication/setup_3site_replication.sh | 4 +- docs/iam/policies/pbac-tests.sh | 30 +++++----- internal/logger/target/http/http.go | 55 +++++++++++-------- 3 files changed, 49 insertions(+), 40 deletions(-) diff --git a/docs/bucket/replication/setup_3site_replication.sh b/docs/bucket/replication/setup_3site_replication.sh index 869d9f4b8..8cbb104dc 100755 --- a/docs/bucket/replication/setup_3site_replication.sh +++ b/docs/bucket/replication/setup_3site_replication.sh @@ -43,8 +43,8 @@ unset MINIO_KMS_KES_KEY_FILE unset MINIO_KMS_KES_ENDPOINT unset MINIO_KMS_KES_KEY_NAME -wget -q -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc && - chmod +x mc +go install -v github.com/minio/mc@master +cp -a $(go env GOPATH)/bin/mc ./mc if [ ! -f mc.RELEASE.2021-03-12T03-36-59Z ]; then wget -q -O mc.RELEASE.2021-03-12T03-36-59Z https://dl.minio.io/client/mc/release/linux-amd64/archive/mc.RELEASE.2021-03-12T03-36-59Z && diff --git a/docs/iam/policies/pbac-tests.sh b/docs/iam/policies/pbac-tests.sh index c645db281..607abc3eb 100755 --- a/docs/iam/policies/pbac-tests.sh +++ b/docs/iam/policies/pbac-tests.sh @@ -8,10 +8,8 @@ pkill minio pkill kes rm -rf /tmp/xl -if [ ! -f ./mc ]; then - wget --quiet -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc && - chmod +x mc -fi +go install -v github.com/minio/mc@master +cp -a $(go env GOPATH)/bin/mc ./mc if [ ! -f ./kes ]; then wget --quiet -O kes https://github.com/minio/kes/releases/latest/download/kes-linux-amd64 && @@ -39,37 +37,37 @@ export MC_HOST_myminio="http://minioadmin:minioadmin@localhost:9000/" (minio server http://localhost:9000/tmp/xl/{1...10}/disk{0...1} 2>&1 >/dev/null) & pid=$! -./mc ready myminio +mc ready myminio -./mc admin user add myminio/ minio123 minio123 +mc admin user add myminio/ minio123 minio123 -./mc admin policy create myminio/ deny-non-sse-kms-pol ./docs/iam/policies/deny-non-sse-kms-objects.json -./mc admin policy create myminio/ deny-invalid-sse-kms-pol ./docs/iam/policies/deny-objects-with-invalid-sse-kms-key-id.json +mc admin policy create myminio/ deny-non-sse-kms-pol ./docs/iam/policies/deny-non-sse-kms-objects.json +mc admin policy create myminio/ deny-invalid-sse-kms-pol ./docs/iam/policies/deny-objects-with-invalid-sse-kms-key-id.json -./mc admin policy attach myminio deny-non-sse-kms-pol --user minio123 -./mc admin policy attach myminio deny-invalid-sse-kms-pol --user minio123 -./mc admin policy attach myminio consoleAdmin --user minio123 +mc admin policy attach myminio deny-non-sse-kms-pol --user minio123 +mc admin policy attach myminio deny-invalid-sse-kms-pol --user minio123 +mc admin policy attach myminio consoleAdmin --user minio123 -./mc mb -l myminio/test-bucket -./mc mb -l myminio/multi-key-poc +mc mb -l myminio/test-bucket +mc mb -l myminio/multi-key-poc export MC_HOST_myminio1="http://minio123:minio123@localhost:9000/" -./mc cp /etc/issue myminio1/test-bucket +mc cp /etc/issue myminio1/test-bucket ret=$? if [ $ret -ne 0 ]; then echo "BUG: PutObject to bucket: test-bucket should succeed. Failed" exit 1 fi -./mc cp /etc/issue myminio1/multi-key-poc | grep -q "Insufficient permissions to access this path" +mc cp /etc/issue myminio1/multi-key-poc | grep -q "Insufficient permissions to access this path" ret=$? if [ $ret -eq 0 ]; then echo "BUG: PutObject to bucket: multi-key-poc without sse-kms should fail. Succedded" exit 1 fi -./mc cp /etc/hosts myminio1/multi-key-poc/hosts --enc-kms "myminio1/multi-key-poc/hosts=minio-default-key" +mc cp /etc/hosts myminio1/multi-key-poc/hosts --enc-kms "myminio1/multi-key-poc/hosts=minio-default-key" ret=$? if [ $ret -ne 0 ]; then echo "BUG: PutObject to bucket: multi-key-poc with valid sse-kms should succeed. Failed" diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index c29a97786..f1fd35fbb 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -90,13 +90,14 @@ type Config struct { // buffer is full, new logs are just ignored and an error // is returned to the caller. type Target struct { - totalMessages int64 - failedMessages int64 - status int32 + totalMessages atomic.Int64 + failedMessages atomic.Int64 + status atomic.Int32 // Worker control - workers int64 + workers atomic.Int64 maxWorkers int64 + // workerStartMu sync.Mutex lastStarted time.Time @@ -157,7 +158,7 @@ func (h *Target) String() string { // IsOnline returns true if the target is reachable using a cached value func (h *Target) IsOnline(ctx context.Context) bool { - return atomic.LoadInt32(&h.status) == statusOnline + return h.status.Load() == statusOnline } // Stats returns the target statistics. @@ -166,8 +167,8 @@ func (h *Target) Stats() types.TargetStats { queueLength := len(h.logCh) h.logChMu.RUnlock() stats := types.TargetStats{ - TotalMessages: atomic.LoadInt64(&h.totalMessages), - FailedMessages: atomic.LoadInt64(&h.failedMessages), + TotalMessages: h.totalMessages.Load(), + FailedMessages: h.failedMessages.Load(), QueueLength: queueLength, } @@ -221,9 +222,9 @@ func (h *Target) initMemoryStore(ctx context.Context) (err error) { func (h *Target) send(ctx context.Context, payload []byte, payloadType string, timeout time.Duration) (err error) { defer func() { if err != nil { - atomic.StoreInt32(&h.status, statusOffline) + h.status.Store(statusOffline) } else { - atomic.StoreInt32(&h.status, statusOnline) + h.status.Store(statusOnline) } }() @@ -275,8 +276,8 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { } h.logChMu.RUnlock() - atomic.AddInt64(&h.workers, 1) - defer atomic.AddInt64(&h.workers, -1) + h.workers.Add(1) + defer h.workers.Add(-1) h.wg.Add(1) defer h.wg.Done() @@ -353,7 +354,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { } if !isTick { - atomic.AddInt64(&h.totalMessages, 1) + h.totalMessages.Add(1) if !isDirQueue { if err := enc.Encode(&entry); err != nil { @@ -362,7 +363,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry), h.Name(), ) - atomic.AddInt64(&h.failedMessages, 1) + h.failedMessages.Add(1) continue } } @@ -395,7 +396,7 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) { // and when it's been at least 30 seconds since // we launched a new worker. if mainWorker && len(h.logCh) > cap(h.logCh)/2 { - nWorkers := atomic.LoadInt64(&h.workers) + nWorkers := h.workers.Load() if nWorkers < h.maxWorkers { if time.Since(h.lastStarted).Milliseconds() > 10 { h.lastStarted = time.Now() @@ -493,10 +494,10 @@ func New(config Config) (*Target, error) { h := &Target{ logCh: make(chan interface{}, config.QueueSize), config: config, - status: statusOffline, batchSize: config.BatchSize, maxWorkers: int64(maxWorkers), } + h.status.Store(statusOffline) if config.BatchSize > 1 { h.payloadType = "" @@ -528,10 +529,17 @@ func (h *Target) SendFromStore(key store.Key) (err error) { return err } + h.failedMessages.Add(1) + defer func() { + if err == nil { + h.failedMessages.Add(-1) + } + }() + if err := h.send(context.Background(), eventData, h.payloadType, webhookCallTimeout); err != nil { - atomic.AddInt64(&h.failedMessages, 1) return err } + // Delete the event from store. return h.store.Del(key.Name) } @@ -540,7 +548,7 @@ func (h *Target) SendFromStore(key store.Key) (err error) { // Messages are queued in the disk if the store is enabled // If Cancel has been called the message is ignored. func (h *Target) Send(ctx context.Context, entry interface{}) error { - if atomic.LoadInt32(&h.status) == statusClosed { + if h.status.Load() == statusClosed { if h.migrateTarget != nil { return h.migrateTarget.Send(ctx, entry) } @@ -557,7 +565,7 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error { retry: select { case h.logCh <- entry: - atomic.AddInt64(&h.totalMessages, 1) + h.totalMessages.Add(1) case <-ctx.Done(): // return error only for context timedout. if errors.Is(ctx.Err(), context.DeadlineExceeded) { @@ -565,11 +573,14 @@ retry: } return nil default: - if h.workers < h.maxWorkers { + nWorkers := h.workers.Load() + if nWorkers < h.maxWorkers { + // Just sleep to avoid any possible hot-loops. + time.Sleep(50 * time.Millisecond) goto retry } - atomic.AddInt64(&h.totalMessages, 1) - atomic.AddInt64(&h.failedMessages, 1) + h.totalMessages.Add(1) + h.failedMessages.Add(1) return errors.New("log buffer full") } @@ -580,7 +591,7 @@ retry: // All queued messages are flushed and the function returns afterwards. // All messages sent to the target after this function has been called will be dropped. func (h *Target) Cancel() { - atomic.StoreInt32(&h.status, statusClosed) + h.status.Store(statusClosed) h.storeCtxCancel() // Wait for messages to be sent...