mirror of
https://github.com/minio/minio.git
synced 2025-11-07 04:42:56 -05:00
tests: Do not allow forced type asserts (#20905)
This commit is contained in:
45
internal/bpool/pool.go
Normal file
45
internal/bpool/pool.go
Normal file
@@ -0,0 +1,45 @@
|
||||
// Copyright (c) 2015-2025 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package bpool
|
||||
|
||||
import "sync"
|
||||
|
||||
// Pool is a single type sync.Pool with a few extra properties:
|
||||
// If New is not set Get may return the zero value of T.
|
||||
type Pool[T any] struct {
|
||||
New func() T
|
||||
p sync.Pool
|
||||
}
|
||||
|
||||
// Get will retuen a new T
|
||||
func (p *Pool[T]) Get() T {
|
||||
v, ok := p.p.Get().(T)
|
||||
if ok {
|
||||
return v
|
||||
}
|
||||
if p.New == nil {
|
||||
var t T
|
||||
return t
|
||||
}
|
||||
return p.New()
|
||||
}
|
||||
|
||||
// Put a used T.
|
||||
func (p *Pool[T]) Put(t T) {
|
||||
p.p.Put(t)
|
||||
}
|
||||
@@ -127,7 +127,6 @@ func (m *Monitor) getReport(selectBucket SelectionFunction) *BucketBandwidthRepo
|
||||
}
|
||||
}
|
||||
m.tlock.RUnlock()
|
||||
|
||||
}
|
||||
return report
|
||||
}
|
||||
|
||||
@@ -64,7 +64,6 @@ func (r *MonitoredReader) Read(buf []byte) (n int, err error) {
|
||||
r.opts.HeaderSize = 0
|
||||
need = int(math.Min(float64(b-hdr), float64(need))) // use remaining tokens towards payload
|
||||
tokens = need + hdr
|
||||
|
||||
} else { // part of header can be accommodated
|
||||
r.opts.HeaderSize -= b - 1
|
||||
need = 1 // to ensure we read at least one byte for every Read
|
||||
|
||||
@@ -207,7 +207,6 @@ func (lc Lifecycle) HasActiveRules(prefix string) bool {
|
||||
if !rule.Transition.IsNull() { // this allows for Transition.Days to be zero.
|
||||
return true
|
||||
}
|
||||
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -365,7 +365,6 @@ func TestHasActiveRules(t *testing.T) {
|
||||
t.Fatalf("Expected result with recursive set to true: `%v`, got: `%v`", tc.expectedRec, got)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -67,6 +67,5 @@ func TestMetadataReplicate(t *testing.T) {
|
||||
t.Fatalf("Expected result with recursive set to false: `%v`, got: `%v`", tc.expectedResult, got)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,10 +22,11 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func createTempFile(prefix, content string) (tempFile string, err error) {
|
||||
func createTempFile(t testing.TB, prefix, content string) (tempFile string, err error) {
|
||||
t.Helper()
|
||||
var tmpfile *os.File
|
||||
|
||||
if tmpfile, err = os.CreateTemp("", prefix); err != nil {
|
||||
if tmpfile, err = os.CreateTemp(t.TempDir(), prefix); err != nil {
|
||||
return tempFile, err
|
||||
}
|
||||
|
||||
@@ -42,14 +43,13 @@ func createTempFile(prefix, content string) (tempFile string, err error) {
|
||||
}
|
||||
|
||||
func TestParsePublicCertFile(t *testing.T) {
|
||||
tempFile1, err := createTempFile("public-cert-file", "")
|
||||
tempFile1, err := createTempFile(t, "public-cert-file", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to create temporary file. %v", err)
|
||||
}
|
||||
defer os.Remove(tempFile1)
|
||||
|
||||
tempFile2, err := createTempFile("public-cert-file",
|
||||
`-----BEGIN CERTIFICATE-----
|
||||
tempFile2, err := createTempFile(t, "public-cert-file", `-----BEGIN CERTIFICATE-----
|
||||
MIICdTCCAd4CCQCO5G/W1xcE9TANBgkqhkiG9w0BAQUFADB/MQswCQYDVQQGEwJa
|
||||
WTEOMAwGA1UECBMFTWluaW8xETAPBgNVBAcTCEludGVybmV0MQ4wDAYDVQQKEwVN
|
||||
aW5pbzEOMAwGA1UECxMFTWluaW8xDjAMBgNVBAMTBU1pbmlvMR0wGwYJKoZIhvcN
|
||||
@@ -70,8 +70,7 @@ M9ofSEt/bdRD
|
||||
}
|
||||
defer os.Remove(tempFile2)
|
||||
|
||||
tempFile3, err := createTempFile("public-cert-file",
|
||||
`-----BEGIN CERTIFICATE-----
|
||||
tempFile3, err := createTempFile(t, "public-cert-file", `-----BEGIN CERTIFICATE-----
|
||||
MIICdTCCAd4CCQCO5G/W1xcE9TANBgkqhkiG9w0BAQUFADB/MQswCQYDVQQGEwJa
|
||||
WTEOMAwGA1UECBMFTWluaW8xETAPBgNVBAcTCEludGVybmV0MQ4wDAYDVQQKEwVN
|
||||
aW5pbzEOMAwGA1UECxMFTWluaW8xDjAMBgNVBAMTBU1pbmlvMR0wGwYJKoZIhvcN
|
||||
@@ -92,8 +91,7 @@ M9ofSEt/bdRD
|
||||
}
|
||||
defer os.Remove(tempFile3)
|
||||
|
||||
tempFile4, err := createTempFile("public-cert-file",
|
||||
`-----BEGIN CERTIFICATE-----
|
||||
tempFile4, err := createTempFile(t, "public-cert-file", `-----BEGIN CERTIFICATE-----
|
||||
MIICdTCCAd4CCQCO5G/W1xcE9TANBgkqhkiG9w0BAQUFADB/MQswCQYDVQQGEwJa
|
||||
WTEOMAwGA1UECBMFTWluaW8xETAPBgNVBAcTCEludGVybmV0MQ4wDAYDVQQKEwVN
|
||||
aW5pbzEOMAwGA1UECxMFTWluaW8xDjAMBgNVBAMTBU1pbmlvMR0wGwYJKoZIhvcN
|
||||
@@ -114,8 +112,7 @@ M9ofSEt/bdRD
|
||||
}
|
||||
defer os.Remove(tempFile4)
|
||||
|
||||
tempFile5, err := createTempFile("public-cert-file",
|
||||
`-----BEGIN CERTIFICATE-----
|
||||
tempFile5, err := createTempFile(t, "public-cert-file", `-----BEGIN CERTIFICATE-----
|
||||
MIICdTCCAd4CCQCO5G/W1xcE9TANBgkqhkiG9w0BAQUFADB/MQswCQYDVQQGEwJa
|
||||
WTEOMAwGA1UECBMFTWluaW8xETAPBgNVBAcTCEludGVybmV0MQ4wDAYDVQQKEwVN
|
||||
aW5pbzEOMAwGA1UECxMFTWluaW8xDjAMBgNVBAMTBU1pbmlvMR0wGwYJKoZIhvcN
|
||||
@@ -184,11 +181,11 @@ func TestLoadX509KeyPair(t *testing.T) {
|
||||
os.Unsetenv(EnvCertPassword)
|
||||
})
|
||||
for i, testCase := range loadX509KeyPairTests {
|
||||
privateKey, err := createTempFile("private.key", testCase.privateKey)
|
||||
privateKey, err := createTempFile(t, "private.key", testCase.privateKey)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: failed to create tmp private key file: %v", i, err)
|
||||
}
|
||||
certificate, err := createTempFile("public.crt", testCase.certificate)
|
||||
certificate, err := createTempFile(t, "public.crt", testCase.certificate)
|
||||
if err != nil {
|
||||
os.Remove(privateKey)
|
||||
t.Fatalf("Test %d: failed to create tmp certificate file: %v", i, err)
|
||||
|
||||
@@ -143,7 +143,6 @@ func (c *CoreDNS) list(key string, domain bool) ([]SrvRecord, error) {
|
||||
|
||||
srvRecord.Key = msgUnPath(srvRecord.Key)
|
||||
srvRecords = append(srvRecords, srvRecord)
|
||||
|
||||
}
|
||||
sort.Slice(srvRecords, func(i int, j int) bool {
|
||||
return srvRecords[i].Key < srvRecords[j].Key
|
||||
|
||||
@@ -534,7 +534,6 @@ func (r *Config) GetSettings() madmin.OpenIDSettings {
|
||||
HashedClientSecret: hashedSecret,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return res
|
||||
|
||||
@@ -125,7 +125,6 @@ func fetchSubSysTargets(ctx context.Context, cfg config.Config, subSys string, t
|
||||
return nil, err
|
||||
}
|
||||
targets = append(targets, t)
|
||||
|
||||
}
|
||||
case config.NotifyKafkaSubSys:
|
||||
kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys])
|
||||
@@ -142,7 +141,6 @@ func fetchSubSysTargets(ctx context.Context, cfg config.Config, subSys string, t
|
||||
return nil, err
|
||||
}
|
||||
targets = append(targets, t)
|
||||
|
||||
}
|
||||
|
||||
case config.NotifyMQTTSubSys:
|
||||
|
||||
@@ -127,7 +127,6 @@ func LookupConfig(kvs config.KVS, transport http.RoundTripper) (cfg Config, err
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
cfg.License = strings.TrimSpace(env.Get(config.EnvMinIOSubnetLicense, kvs.Get(config.License)))
|
||||
@@ -142,9 +141,11 @@ func LookupConfig(kvs config.KVS, transport http.RoundTripper) (cfg Config, err
|
||||
|
||||
// Make sure to clone the transport before editing the ProxyURL
|
||||
if proxyURL != nil {
|
||||
ctransport := transport.(*http.Transport).Clone()
|
||||
ctransport.Proxy = http.ProxyURL((*url.URL)(proxyURL))
|
||||
cfg.transport = ctransport
|
||||
if tr, ok := transport.(*http.Transport); ok {
|
||||
ctransport := tr.Clone()
|
||||
ctransport.Proxy = http.ProxyURL((*url.URL)(proxyURL))
|
||||
cfg.transport = ctransport
|
||||
}
|
||||
} else {
|
||||
cfg.transport = transport
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ func TestReadDriveStats(t *testing.T) {
|
||||
for _, testCase := range testCases {
|
||||
testCase := testCase
|
||||
t.Run("", func(t *testing.T) {
|
||||
tmpfile, err := os.CreateTemp("", "testfile")
|
||||
tmpfile, err := os.CreateTemp(t.TempDir(), "testfile")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -85,7 +85,6 @@ var _ = uint64(1 << objectSingleTypesEnd)
|
||||
// Expand - returns expanded values of abbreviated event type.
|
||||
func (name Name) Expand() []Name {
|
||||
switch name {
|
||||
|
||||
case ObjectAccessedAll:
|
||||
return []Name{
|
||||
ObjectAccessedGet, ObjectAccessedHead,
|
||||
|
||||
@@ -463,8 +463,7 @@ func (c *esClientV7) createIndex(args ElasticsearchArgs) error {
|
||||
indices, ok := v["indices"].([]interface{})
|
||||
if ok {
|
||||
for _, index := range indices {
|
||||
name := index.(map[string]interface{})["name"]
|
||||
if name == args.Index {
|
||||
if name, ok := index.(map[string]interface{}); ok && name["name"] == args.Index {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
|
||||
@@ -1748,20 +1748,20 @@ func (c *Connection) debugMsg(d debugMsg, args ...any) {
|
||||
case debugSetConnPingDuration:
|
||||
c.connMu.Lock()
|
||||
defer c.connMu.Unlock()
|
||||
c.connPingInterval = args[0].(time.Duration)
|
||||
c.connPingInterval, _ = args[0].(time.Duration)
|
||||
if c.connPingInterval < time.Second {
|
||||
panic("CONN ping interval too low")
|
||||
}
|
||||
case debugSetClientPingDuration:
|
||||
c.connMu.Lock()
|
||||
defer c.connMu.Unlock()
|
||||
c.clientPingInterval = args[0].(time.Duration)
|
||||
c.clientPingInterval, _ = args[0].(time.Duration)
|
||||
case debugAddToDeadline:
|
||||
c.addDeadline = args[0].(time.Duration)
|
||||
c.addDeadline, _ = args[0].(time.Duration)
|
||||
case debugIsOutgoingClosed:
|
||||
// params: muxID uint64, isClosed func(bool)
|
||||
muxID := args[0].(uint64)
|
||||
resp := args[1].(func(b bool))
|
||||
muxID, _ := args[0].(uint64)
|
||||
resp, _ := args[1].(func(b bool))
|
||||
mid, ok := c.outgoing.Load(muxID)
|
||||
if !ok || mid == nil {
|
||||
resp(true)
|
||||
@@ -1772,7 +1772,8 @@ func (c *Connection) debugMsg(d debugMsg, args ...any) {
|
||||
mid.respMu.Unlock()
|
||||
case debugBlockInboundMessages:
|
||||
c.connMu.Lock()
|
||||
block := (<-chan struct{})(args[0].(chan struct{}))
|
||||
a, _ := args[0].(chan struct{})
|
||||
block := (<-chan struct{})(a)
|
||||
c.blockMessages.Store(&block)
|
||||
c.connMu.Unlock()
|
||||
}
|
||||
|
||||
@@ -28,11 +28,11 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gobwas/ws"
|
||||
"github.com/gobwas/ws/wsutil"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
)
|
||||
|
||||
// ErrDisconnected is returned when the connection to the remote has been lost during the call.
|
||||
@@ -68,15 +68,15 @@ const (
|
||||
defaultSingleRequestTimeout = time.Minute
|
||||
)
|
||||
|
||||
var internalByteBuffer = sync.Pool{
|
||||
New: func() any {
|
||||
var internalByteBuffer = bpool.Pool[*[]byte]{
|
||||
New: func() *[]byte {
|
||||
m := make([]byte, 0, defaultBufferSize)
|
||||
return &m
|
||||
},
|
||||
}
|
||||
|
||||
var internal32KByteBuffer = sync.Pool{
|
||||
New: func() any {
|
||||
var internal32KByteBuffer = bpool.Pool[*[]byte]{
|
||||
New: func() *[]byte {
|
||||
m := make([]byte, 0, biggerBufMin)
|
||||
return &m
|
||||
},
|
||||
@@ -87,7 +87,7 @@ var internal32KByteBuffer = sync.Pool{
|
||||
// When replacing PutByteBuffer should also be replaced
|
||||
// There is no minimum size.
|
||||
var GetByteBuffer = func() []byte {
|
||||
b := *internalByteBuffer.Get().(*[]byte)
|
||||
b := *internalByteBuffer.Get()
|
||||
return b[:0]
|
||||
}
|
||||
|
||||
@@ -101,7 +101,7 @@ func GetByteBufferCap(wantSz int) []byte {
|
||||
PutByteBuffer(b)
|
||||
}
|
||||
if wantSz <= maxBufferSize {
|
||||
b := *internal32KByteBuffer.Get().(*[]byte)
|
||||
b := *internal32KByteBuffer.Get()
|
||||
if cap(b) >= wantSz {
|
||||
return b[:0]
|
||||
}
|
||||
|
||||
@@ -23,8 +23,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/minio/minio/internal/hash/sha256"
|
||||
xioutil "github.com/minio/minio/internal/ioutil"
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
@@ -431,12 +431,12 @@ func recycleFunc[RT RoundTripper](newRT func() RT) (newFn func() RT, recycle fun
|
||||
}
|
||||
}
|
||||
}
|
||||
pool := sync.Pool{
|
||||
New: func() interface{} {
|
||||
pool := bpool.Pool[RT]{
|
||||
New: func() RT {
|
||||
return newRT()
|
||||
},
|
||||
}
|
||||
return func() RT { return pool.Get().(RT) },
|
||||
return pool.Get,
|
||||
func(r RT) {
|
||||
if r != rZero {
|
||||
//nolint:staticcheck // SA6002 IT IS A GENERIC VALUE!
|
||||
@@ -632,8 +632,8 @@ type StreamTypeHandler[Payload, Req, Resp RoundTripper] struct {
|
||||
// Will be 0 if newReq is nil.
|
||||
InCapacity int
|
||||
|
||||
reqPool sync.Pool
|
||||
respPool sync.Pool
|
||||
reqPool bpool.Pool[Req]
|
||||
respPool bpool.Pool[Resp]
|
||||
id HandlerID
|
||||
newPayload func() Payload
|
||||
nilReq Req
|
||||
@@ -653,13 +653,13 @@ func NewStream[Payload, Req, Resp RoundTripper](h HandlerID, newPayload func() P
|
||||
|
||||
s := newStreamHandler[Payload, Req, Resp](h)
|
||||
if newReq != nil {
|
||||
s.reqPool.New = func() interface{} {
|
||||
s.reqPool.New = func() Req {
|
||||
return newReq()
|
||||
}
|
||||
} else {
|
||||
s.InCapacity = 0
|
||||
}
|
||||
s.respPool.New = func() interface{} {
|
||||
s.respPool.New = func() Resp {
|
||||
return newResp()
|
||||
}
|
||||
s.newPayload = newPayload
|
||||
@@ -682,7 +682,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) NewPayload() Payload {
|
||||
// NewRequest creates a new request.
|
||||
// The struct may be reused, so caller should clear any fields.
|
||||
func (h *StreamTypeHandler[Payload, Req, Resp]) NewRequest() Req {
|
||||
return h.reqPool.Get().(Req)
|
||||
return h.reqPool.Get()
|
||||
}
|
||||
|
||||
// PutRequest will accept a request for reuse.
|
||||
@@ -706,7 +706,7 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) PutResponse(r Resp) {
|
||||
// NewResponse creates a new response.
|
||||
// Handlers can use this to create a reusable response.
|
||||
func (h *StreamTypeHandler[Payload, Req, Resp]) NewResponse() Resp {
|
||||
return h.respPool.Get().(Resp)
|
||||
return h.respPool.Get()
|
||||
}
|
||||
|
||||
func newStreamHandler[Payload, Req, Resp RoundTripper](h HandlerID) *StreamTypeHandler[Payload, Req, Resp] {
|
||||
|
||||
@@ -388,6 +388,5 @@ func (m *muxServer) close() {
|
||||
if m.outBlock != nil {
|
||||
xioutil.SafeClose(m.outBlock)
|
||||
m.outBlock = nil
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
)
|
||||
|
||||
@@ -53,7 +54,7 @@ func (m *MSS) Get(key string) string {
|
||||
// Set a key, value pair.
|
||||
func (m *MSS) Set(key, value string) {
|
||||
if m == nil {
|
||||
*m = mssPool.Get().(map[string]string)
|
||||
*m = mssPool.Get()
|
||||
}
|
||||
(*m)[key] = value
|
||||
}
|
||||
@@ -130,7 +131,7 @@ func (m *MSS) Msgsize() int {
|
||||
|
||||
// NewMSS returns a new MSS.
|
||||
func NewMSS() *MSS {
|
||||
m := MSS(mssPool.Get().(map[string]string))
|
||||
m := MSS(mssPool.Get())
|
||||
for k := range m {
|
||||
delete(m, k)
|
||||
}
|
||||
@@ -143,8 +144,8 @@ func NewMSSWith(m map[string]string) *MSS {
|
||||
return &m2
|
||||
}
|
||||
|
||||
var mssPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
var mssPool = bpool.Pool[map[string]string]{
|
||||
New: func() map[string]string {
|
||||
return make(map[string]string, 5)
|
||||
},
|
||||
}
|
||||
@@ -152,7 +153,7 @@ var mssPool = sync.Pool{
|
||||
// Recycle the underlying map.
|
||||
func (m *MSS) Recycle() {
|
||||
if m != nil && *m != nil {
|
||||
mssPool.Put(map[string]string(*m))
|
||||
mssPool.Put(*m)
|
||||
*m = nil
|
||||
}
|
||||
}
|
||||
@@ -279,15 +280,15 @@ func (b *Bytes) Recycle() {
|
||||
// URLValues can be used for url.Values.
|
||||
type URLValues map[string][]string
|
||||
|
||||
var urlValuesPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
var urlValuesPool = bpool.Pool[map[string][]string]{
|
||||
New: func() map[string][]string {
|
||||
return make(map[string][]string, 10)
|
||||
},
|
||||
}
|
||||
|
||||
// NewURLValues returns a new URLValues.
|
||||
func NewURLValues() *URLValues {
|
||||
u := URLValues(urlValuesPool.Get().(map[string][]string))
|
||||
u := URLValues(urlValuesPool.Get())
|
||||
return &u
|
||||
}
|
||||
|
||||
@@ -342,7 +343,7 @@ func (u *URLValues) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
return
|
||||
}
|
||||
if *u == nil {
|
||||
*u = urlValuesPool.Get().(map[string][]string)
|
||||
*u = urlValuesPool.Get()
|
||||
}
|
||||
if len(*u) > 0 {
|
||||
for key := range *u {
|
||||
@@ -424,9 +425,11 @@ func NewJSONPool[T any]() *JSONPool[T] {
|
||||
|
||||
func (p *JSONPool[T]) new() *T {
|
||||
var zero T
|
||||
t := p.pool.Get().(*T)
|
||||
*t = zero
|
||||
return t
|
||||
if t, ok := p.pool.Get().(*T); ok {
|
||||
*t = zero
|
||||
return t
|
||||
}
|
||||
return &zero
|
||||
}
|
||||
|
||||
// JSON is a wrapper around a T object that can be serialized.
|
||||
@@ -557,15 +560,15 @@ func (NoPayload) Recycle() {}
|
||||
|
||||
// ArrayOf wraps an array of Messagepack compatible objects.
|
||||
type ArrayOf[T RoundTripper] struct {
|
||||
aPool sync.Pool // Arrays
|
||||
ePool sync.Pool // Elements
|
||||
aPool sync.Pool // Arrays
|
||||
ePool bpool.Pool[T] // Elements
|
||||
}
|
||||
|
||||
// NewArrayOf returns a new ArrayOf.
|
||||
// You must provide a function that returns a new instance of T.
|
||||
func NewArrayOf[T RoundTripper](newFn func() T) *ArrayOf[T] {
|
||||
return &ArrayOf[T]{
|
||||
ePool: sync.Pool{New: func() any {
|
||||
ePool: bpool.Pool[T]{New: func() T {
|
||||
return newFn()
|
||||
}},
|
||||
}
|
||||
@@ -609,7 +612,7 @@ func (p *ArrayOf[T]) putA(v []T) {
|
||||
}
|
||||
|
||||
func (p *ArrayOf[T]) newE() T {
|
||||
return p.ePool.Get().(T)
|
||||
return p.ePool.Get()
|
||||
}
|
||||
|
||||
// Array provides a wrapper for an underlying array of serializable objects.
|
||||
|
||||
@@ -24,8 +24,9 @@ import (
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
)
|
||||
|
||||
const defaultFlushInterval = time.Duration(100) * time.Millisecond
|
||||
@@ -53,7 +54,7 @@ func NewForwarder(f *Forwarder) *Forwarder {
|
||||
|
||||
type bufPool struct {
|
||||
sz int
|
||||
pool sync.Pool
|
||||
pool bpool.Pool[*[]byte]
|
||||
}
|
||||
|
||||
func (b *bufPool) Put(buf []byte) {
|
||||
@@ -66,13 +67,16 @@ func (b *bufPool) Put(buf []byte) {
|
||||
}
|
||||
|
||||
func (b *bufPool) Get() []byte {
|
||||
bufp := b.pool.Get().(*[]byte)
|
||||
bufp := b.pool.Get()
|
||||
if bufp == nil || cap(*bufp) < b.sz {
|
||||
return make([]byte, 0, b.sz)
|
||||
}
|
||||
return (*bufp)[:b.sz]
|
||||
}
|
||||
|
||||
func newBufPool(sz int) httputil.BufferPool {
|
||||
return &bufPool{sz: sz, pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &bufPool{sz: sz, pool: bpool.Pool[*[]byte]{
|
||||
New: func() *[]byte {
|
||||
buf := make([]byte, sz)
|
||||
return &buf
|
||||
},
|
||||
|
||||
27
internal/http/flush.go
Normal file
27
internal/http/flush.go
Normal file
@@ -0,0 +1,27 @@
|
||||
// Copyright (c) 2015-2025 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package http
|
||||
|
||||
import "net/http"
|
||||
|
||||
// Flush the ResponseWriter.
|
||||
func Flush(w http.ResponseWriter) {
|
||||
if f, ok := w.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
@@ -100,13 +100,15 @@ func (listener *httpListener) Addr() (addr net.Addr) {
|
||||
return addr
|
||||
}
|
||||
|
||||
tcpAddr := addr.(*net.TCPAddr)
|
||||
if ip := net.ParseIP("0.0.0.0"); ip != nil {
|
||||
tcpAddr.IP = ip
|
||||
}
|
||||
if tcpAddr, ok := addr.(*net.TCPAddr); ok {
|
||||
if ip := net.ParseIP("0.0.0.0"); ip != nil {
|
||||
tcpAddr.IP = ip
|
||||
}
|
||||
|
||||
addr = tcpAddr
|
||||
return addr
|
||||
addr = tcpAddr
|
||||
return addr
|
||||
}
|
||||
panic("unknown address type on listener")
|
||||
}
|
||||
|
||||
// Addrs - returns all address information of TCP listeners.
|
||||
|
||||
@@ -25,10 +25,10 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/minio/minio/internal/disk"
|
||||
)
|
||||
|
||||
@@ -39,28 +39,39 @@ const (
|
||||
LargeBlock = 1 * humanize.MiByte // Default r/w block size for normal objects.
|
||||
)
|
||||
|
||||
// AlignedBytePool is a pool of fixed size aligned blocks
|
||||
type AlignedBytePool struct {
|
||||
size int
|
||||
p bpool.Pool[*[]byte]
|
||||
}
|
||||
|
||||
// NewAlignedBytePool creates a new pool with the specified size.
|
||||
func NewAlignedBytePool(sz int) *AlignedBytePool {
|
||||
return &AlignedBytePool{size: sz, p: bpool.Pool[*[]byte]{New: func() *[]byte {
|
||||
b := disk.AlignedBlock(sz)
|
||||
return &b
|
||||
}}}
|
||||
}
|
||||
|
||||
// aligned sync.Pool's
|
||||
var (
|
||||
ODirectPoolLarge = sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := disk.AlignedBlock(LargeBlock)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
ODirectPoolMedium = sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := disk.AlignedBlock(MediumBlock)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
ODirectPoolSmall = sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := disk.AlignedBlock(SmallBlock)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
ODirectPoolLarge = NewAlignedBytePool(LargeBlock)
|
||||
ODirectPoolMedium = NewAlignedBytePool(MediumBlock)
|
||||
ODirectPoolSmall = NewAlignedBytePool(SmallBlock)
|
||||
)
|
||||
|
||||
// Get a block.
|
||||
func (p *AlignedBytePool) Get() *[]byte {
|
||||
return p.p.Get()
|
||||
}
|
||||
|
||||
// Put a block.
|
||||
func (p *AlignedBytePool) Put(pb *[]byte) {
|
||||
if pb != nil && len(*pb) == p.size {
|
||||
p.p.Put(pb)
|
||||
}
|
||||
}
|
||||
|
||||
// WriteOnCloser implements io.WriteCloser and always
|
||||
// executes at least one write operation if it is closed.
|
||||
//
|
||||
@@ -250,15 +261,6 @@ func NopCloser(w io.Writer) io.WriteCloser {
|
||||
return nopCloser{w}
|
||||
}
|
||||
|
||||
const copyBufferSize = 32 * 1024
|
||||
|
||||
var copyBufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := make([]byte, copyBufferSize)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
|
||||
// SkipReader skips a given number of bytes and then returns all
|
||||
// remaining data.
|
||||
type SkipReader struct {
|
||||
@@ -274,12 +276,11 @@ func (s *SkipReader) Read(p []byte) (int, error) {
|
||||
}
|
||||
if s.skipCount > 0 {
|
||||
tmp := p
|
||||
if s.skipCount > l && l < copyBufferSize {
|
||||
if s.skipCount > l && l < SmallBlock {
|
||||
// We may get a very small buffer, so we grab a temporary buffer.
|
||||
bufp := copyBufPool.Get().(*[]byte)
|
||||
buf := *bufp
|
||||
tmp = buf[:copyBufferSize]
|
||||
defer copyBufPool.Put(bufp)
|
||||
bufp := ODirectPoolSmall.Get()
|
||||
tmp = *bufp
|
||||
defer ODirectPoolSmall.Put(bufp)
|
||||
l = int64(len(tmp))
|
||||
}
|
||||
for s.skipCount > 0 {
|
||||
@@ -309,7 +310,7 @@ type writerOnly struct {
|
||||
|
||||
// Copy is exactly like io.Copy but with reusable buffers.
|
||||
func Copy(dst io.Writer, src io.Reader) (written int64, err error) {
|
||||
bufp := ODirectPoolMedium.Get().(*[]byte)
|
||||
bufp := ODirectPoolMedium.Get()
|
||||
defer ODirectPoolMedium.Put(bufp)
|
||||
buf := *bufp
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ func TestCloseOnWriter(t *testing.T) {
|
||||
|
||||
// Test for AppendFile.
|
||||
func TestAppendFile(t *testing.T) {
|
||||
f, err := os.CreateTemp("", "")
|
||||
f, err := os.CreateTemp(t.TempDir(), "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -111,7 +111,7 @@ func TestAppendFile(t *testing.T) {
|
||||
f.WriteString("aaaaaaaaaa")
|
||||
f.Close()
|
||||
|
||||
f, err = os.CreateTemp("", "")
|
||||
f, err = os.CreateTemp(t.TempDir(), "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -162,7 +162,7 @@ func TestSkipReader(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSameFile(t *testing.T) {
|
||||
f, err := os.CreateTemp("", "")
|
||||
f, err := os.CreateTemp(t.TempDir(), "")
|
||||
if err != nil {
|
||||
t.Errorf("Error creating tmp file: %v", err)
|
||||
}
|
||||
@@ -193,7 +193,7 @@ func TestSameFile(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCopyAligned(t *testing.T) {
|
||||
f, err := os.CreateTemp("", "")
|
||||
f, err := os.CreateTemp(t.TempDir(), "")
|
||||
if err != nil {
|
||||
t.Errorf("Error creating tmp file: %v", err)
|
||||
}
|
||||
@@ -202,7 +202,7 @@ func TestCopyAligned(t *testing.T) {
|
||||
|
||||
r := strings.NewReader("hello world")
|
||||
|
||||
bufp := ODirectPoolSmall.Get().(*[]byte)
|
||||
bufp := ODirectPoolSmall.Get()
|
||||
defer ODirectPoolSmall.Put(bufp)
|
||||
|
||||
written, err := CopyAligned(f, io.LimitReader(r, 5), *bufp, r.Size(), f)
|
||||
|
||||
@@ -30,13 +30,13 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/buger/jsonparser"
|
||||
"github.com/dustin/go-humanize"
|
||||
jwtgo "github.com/golang-jwt/jwt/v4"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
)
|
||||
|
||||
// SigningMethodHMAC - Implements the HMAC-SHA family of signing methods signing methods
|
||||
@@ -44,7 +44,7 @@ import (
|
||||
type SigningMethodHMAC struct {
|
||||
Name string
|
||||
Hash crypto.Hash
|
||||
HasherPool sync.Pool
|
||||
HasherPool bpool.Pool[hash.Hash]
|
||||
}
|
||||
|
||||
// Specific instances for HS256, HS384, HS512
|
||||
@@ -57,13 +57,13 @@ var (
|
||||
const base64BufferSize = 64 * humanize.KiByte
|
||||
|
||||
var (
|
||||
base64BufPool sync.Pool
|
||||
base64BufPool bpool.Pool[*[]byte]
|
||||
hmacSigners []*SigningMethodHMAC
|
||||
)
|
||||
|
||||
func init() {
|
||||
base64BufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
base64BufPool = bpool.Pool[*[]byte]{
|
||||
New: func() *[]byte {
|
||||
buf := make([]byte, base64BufferSize)
|
||||
return &buf
|
||||
},
|
||||
@@ -76,7 +76,7 @@ func init() {
|
||||
}
|
||||
for i := range hmacSigners {
|
||||
h := hmacSigners[i].Hash
|
||||
hmacSigners[i].HasherPool.New = func() interface{} {
|
||||
hmacSigners[i].HasherPool.New = func() hash.Hash {
|
||||
return h.New()
|
||||
}
|
||||
}
|
||||
@@ -89,13 +89,13 @@ func (s *SigningMethodHMAC) HashBorrower() HashBorrower {
|
||||
|
||||
// HashBorrower keeps track of borrowed hashers and allows to return them all.
|
||||
type HashBorrower struct {
|
||||
pool *sync.Pool
|
||||
pool *bpool.Pool[hash.Hash]
|
||||
borrowed []hash.Hash
|
||||
}
|
||||
|
||||
// Borrow a single hasher.
|
||||
func (h *HashBorrower) Borrow() hash.Hash {
|
||||
hasher := h.pool.Get().(hash.Hash)
|
||||
hasher := h.pool.Get()
|
||||
h.borrowed = append(h.borrowed, hasher)
|
||||
hasher.Reset()
|
||||
return hasher
|
||||
@@ -323,10 +323,10 @@ func ParseWithStandardClaims(tokenStr string, claims *StandardClaims, key []byte
|
||||
return jwtgo.NewValidationError("no key was provided.", jwtgo.ValidationErrorUnverifiable)
|
||||
}
|
||||
|
||||
bufp := base64BufPool.Get().(*[]byte)
|
||||
bufp := base64BufPool.Get()
|
||||
defer base64BufPool.Put(bufp)
|
||||
|
||||
tokenBuf := base64BufPool.Get().(*[]byte)
|
||||
tokenBuf := base64BufPool.Get()
|
||||
defer base64BufPool.Put(tokenBuf)
|
||||
|
||||
token := *tokenBuf
|
||||
@@ -419,10 +419,10 @@ func ParseWithClaims(tokenStr string, claims *MapClaims, fn func(*MapClaims) ([]
|
||||
return jwtgo.NewValidationError("no Keyfunc was provided.", jwtgo.ValidationErrorUnverifiable)
|
||||
}
|
||||
|
||||
bufp := base64BufPool.Get().(*[]byte)
|
||||
bufp := base64BufPool.Get()
|
||||
defer base64BufPool.Put(bufp)
|
||||
|
||||
tokenBuf := base64BufPool.Get().(*[]byte)
|
||||
tokenBuf := base64BufPool.Get()
|
||||
defer base64BufPool.Put(tokenBuf)
|
||||
|
||||
token := *tokenBuf
|
||||
|
||||
@@ -26,7 +26,7 @@ func TestIsPresent(t *testing.T) {
|
||||
for i, test := range isPresentTests {
|
||||
os.Clearenv()
|
||||
for k, v := range test.Env {
|
||||
os.Setenv(k, v)
|
||||
t.Setenv(k, v)
|
||||
}
|
||||
|
||||
ok, err := IsPresent()
|
||||
|
||||
@@ -25,7 +25,7 @@ import (
|
||||
|
||||
// Test lock fails.
|
||||
func TestLockFail(t *testing.T) {
|
||||
f, err := os.CreateTemp("", "lock")
|
||||
f, err := os.CreateTemp(t.TempDir(), "lock")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -55,7 +55,7 @@ func TestLockDirFail(t *testing.T) {
|
||||
|
||||
// Tests rwlock methods.
|
||||
func TestRWLockedFile(t *testing.T) {
|
||||
f, err := os.CreateTemp("", "lock")
|
||||
f, err := os.CreateTemp(t.TempDir(), "lock")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -118,7 +118,7 @@ func TestRWLockedFile(t *testing.T) {
|
||||
|
||||
// Tests lock and unlock semantics.
|
||||
func TestLockAndUnlock(t *testing.T) {
|
||||
f, err := os.CreateTemp("", "lock")
|
||||
f, err := os.CreateTemp(t.TempDir(), "lock")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -370,7 +370,6 @@ func lookupLegacyConfigForSubSys(ctx context.Context, subSys string) Config {
|
||||
Endpoint: url,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
@@ -466,7 +466,6 @@ func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -538,9 +537,11 @@ func New(config Config) (*Target, error) {
|
||||
if h.config.Proxy != "" {
|
||||
proxyURL, _ := url.Parse(h.config.Proxy)
|
||||
transport := h.config.Transport
|
||||
ctransport := transport.(*http.Transport).Clone()
|
||||
ctransport.Proxy = http.ProxyURL(proxyURL)
|
||||
h.config.Transport = ctransport
|
||||
if tr, ok := transport.(*http.Transport); ok {
|
||||
ctransport := tr.Clone()
|
||||
ctransport.Proxy = http.ProxyURL(proxyURL)
|
||||
h.config.Transport = ctransport
|
||||
}
|
||||
}
|
||||
|
||||
h.client = &http.Client{Transport: h.config.Transport}
|
||||
|
||||
@@ -188,7 +188,6 @@ func (h *Target) startKafkaLogger() {
|
||||
// We are not allowed to add when logCh is nil
|
||||
h.wg.Add(1)
|
||||
defer h.wg.Done()
|
||||
|
||||
}
|
||||
h.logChMu.RUnlock()
|
||||
|
||||
|
||||
@@ -56,13 +56,13 @@ func IsLikelyMountPoint(path string) bool {
|
||||
}
|
||||
|
||||
// If the directory has a different device as parent, then it is a mountpoint.
|
||||
if s1.Sys().(*syscall.Stat_t).Dev != s2.Sys().(*syscall.Stat_t).Dev {
|
||||
// path/.. on a different device as path
|
||||
return true
|
||||
}
|
||||
|
||||
// path/.. is the same i-node as path - this check is for bind mounts.
|
||||
return s1.Sys().(*syscall.Stat_t).Ino == s2.Sys().(*syscall.Stat_t).Ino
|
||||
ss1, ok1 := s1.Sys().(*syscall.Stat_t)
|
||||
ss2, ok2 := s2.Sys().(*syscall.Stat_t)
|
||||
return ok1 && ok2 &&
|
||||
// path/.. on a different device as path
|
||||
(ss1.Dev != ss2.Dev ||
|
||||
// path/.. is the same i-node as path - this check is for bind mounts.
|
||||
ss1.Ino == ss2.Ino)
|
||||
}
|
||||
|
||||
// CheckCrossDevice - check if any list of paths has any sub-mounts at /proc/mounts.
|
||||
|
||||
@@ -40,7 +40,9 @@ var mountPointCache sync.Map
|
||||
func IsLikelyMountPoint(path string) bool {
|
||||
path = filepath.Dir(path)
|
||||
if v, ok := mountPointCache.Load(path); ok {
|
||||
return v.(bool)
|
||||
if b, ok := v.(bool); ok {
|
||||
return b
|
||||
}
|
||||
}
|
||||
wpath, _ := windows.UTF16PtrFromString(path)
|
||||
wvolume := make([]uint16, len(path)+1)
|
||||
|
||||
@@ -27,25 +27,26 @@ import (
|
||||
"unicode/utf8"
|
||||
|
||||
csv "github.com/minio/csvparser"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/minio/minio/internal/s3select/sql"
|
||||
)
|
||||
|
||||
// Reader - CSV record reader for S3Select.
|
||||
type Reader struct {
|
||||
args *ReaderArgs
|
||||
readCloser io.ReadCloser // raw input
|
||||
buf *bufio.Reader // input to the splitter
|
||||
columnNames []string // names of columns
|
||||
nameIndexMap map[string]int64 // name to column index
|
||||
current [][]string // current block of results to be returned
|
||||
recordsRead int // number of records read in current slice
|
||||
input chan *queueItem // input for workers
|
||||
queue chan *queueItem // output from workers in order
|
||||
err error // global error state, only touched by Reader.Read
|
||||
bufferPool sync.Pool // pool of []byte objects for input
|
||||
csvDstPool sync.Pool // pool of [][]string used for output
|
||||
close chan struct{} // used for shutting down the splitter before end of stream
|
||||
readerWg sync.WaitGroup // used to keep track of async reader.
|
||||
readCloser io.ReadCloser // raw input
|
||||
buf *bufio.Reader // input to the splitter
|
||||
columnNames []string // names of columns
|
||||
nameIndexMap map[string]int64 // name to column index
|
||||
current [][]string // current block of results to be returned
|
||||
recordsRead int // number of records read in current slice
|
||||
input chan *queueItem // input for workers
|
||||
queue chan *queueItem // output from workers in order
|
||||
err error // global error state, only touched by Reader.Read
|
||||
bufferPool bpool.Pool[[]byte] // pool of []byte objects for input
|
||||
csvDstPool bpool.Pool[[][]string] // pool of [][]string used for output
|
||||
close chan struct{} // used for shutting down the splitter before end of stream
|
||||
readerWg sync.WaitGroup // used to keep track of async reader.
|
||||
}
|
||||
|
||||
// queueItem is an item in the queue.
|
||||
@@ -69,7 +70,7 @@ func (r *Reader) Read(dst sql.Record) (sql.Record, error) {
|
||||
r.err = io.EOF
|
||||
return nil, r.err
|
||||
}
|
||||
//nolint:staticcheck // SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
|
||||
|
||||
r.csvDstPool.Put(r.current)
|
||||
r.current = <-item.dst
|
||||
r.err = item.err
|
||||
@@ -182,12 +183,12 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error {
|
||||
}
|
||||
}
|
||||
|
||||
r.bufferPool.New = func() interface{} {
|
||||
r.bufferPool.New = func() []byte {
|
||||
return make([]byte, csvSplitSize+1024)
|
||||
}
|
||||
|
||||
// Return first block
|
||||
next, nextErr := r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte))
|
||||
next, nextErr := r.nextSplit(csvSplitSize, r.bufferPool.Get())
|
||||
// Check if first block is valid.
|
||||
if !utf8.Valid(next) {
|
||||
return errInvalidTextEncodingError()
|
||||
@@ -224,7 +225,7 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error {
|
||||
// Exit on any error.
|
||||
return
|
||||
}
|
||||
next, nextErr = r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte))
|
||||
next, nextErr = r.nextSplit(csvSplitSize, r.bufferPool.Get())
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -236,8 +237,8 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error {
|
||||
in.dst <- nil
|
||||
continue
|
||||
}
|
||||
dst, ok := r.csvDstPool.Get().([][]string)
|
||||
if !ok {
|
||||
dst := r.csvDstPool.Get()
|
||||
if len(dst) < 1000 {
|
||||
dst = make([][]string, 0, 1000)
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/minio/minio/internal/s3select/jstream"
|
||||
"github.com/minio/minio/internal/s3select/sql"
|
||||
)
|
||||
@@ -32,17 +33,17 @@ import (
|
||||
// Operates concurrently on line-delimited JSON.
|
||||
type PReader struct {
|
||||
args *ReaderArgs
|
||||
readCloser io.ReadCloser // raw input
|
||||
buf *bufio.Reader // input to the splitter
|
||||
current []jstream.KVS // current block of results to be returned
|
||||
recordsRead int // number of records read in current slice
|
||||
input chan *queueItem // input for workers
|
||||
queue chan *queueItem // output from workers in order
|
||||
err error // global error state, only touched by Reader.Read
|
||||
bufferPool sync.Pool // pool of []byte objects for input
|
||||
kvDstPool sync.Pool // pool of []jstream.KV used for output
|
||||
close chan struct{} // used for shutting down the splitter before end of stream
|
||||
readerWg sync.WaitGroup // used to keep track of async reader.
|
||||
readCloser io.ReadCloser // raw input
|
||||
buf *bufio.Reader // input to the splitter
|
||||
current []jstream.KVS // current block of results to be returned
|
||||
recordsRead int // number of records read in current slice
|
||||
input chan *queueItem // input for workers
|
||||
queue chan *queueItem // output from workers in order
|
||||
err error // global error state, only touched by Reader.Read
|
||||
bufferPool bpool.Pool[[]byte] // pool of []byte objects for input
|
||||
kvDstPool bpool.Pool[[]jstream.KVS] // pool of []jstream.KVS used for output
|
||||
close chan struct{} // used for shutting down the splitter before end of stream
|
||||
readerWg sync.WaitGroup // used to keep track of async reader.
|
||||
}
|
||||
|
||||
// queueItem is an item in the queue.
|
||||
@@ -66,7 +67,6 @@ func (r *PReader) Read(dst sql.Record) (sql.Record, error) {
|
||||
r.err = io.EOF
|
||||
return nil, r.err
|
||||
}
|
||||
//nolint:staticcheck // SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
|
||||
r.kvDstPool.Put(r.current)
|
||||
r.current = <-item.dst
|
||||
r.err = item.err
|
||||
@@ -133,7 +133,7 @@ const jsonSplitSize = 128 << 10
|
||||
// and a number of workers based on GOMAXPROCS.
|
||||
// If an error is returned no goroutines have been started and r.err will have been set.
|
||||
func (r *PReader) startReaders() {
|
||||
r.bufferPool.New = func() interface{} {
|
||||
r.bufferPool.New = func() []byte {
|
||||
return make([]byte, jsonSplitSize+1024)
|
||||
}
|
||||
|
||||
@@ -148,7 +148,7 @@ func (r *PReader) startReaders() {
|
||||
defer close(r.queue)
|
||||
defer r.readerWg.Done()
|
||||
for {
|
||||
next, err := r.nextSplit(jsonSplitSize, r.bufferPool.Get().([]byte))
|
||||
next, err := r.nextSplit(jsonSplitSize, r.bufferPool.Get())
|
||||
q := queueItem{
|
||||
input: next,
|
||||
dst: make(chan []jstream.KVS, 1),
|
||||
@@ -180,8 +180,8 @@ func (r *PReader) startReaders() {
|
||||
in.dst <- nil
|
||||
continue
|
||||
}
|
||||
dst, ok := r.kvDstPool.Get().([]jstream.KVS)
|
||||
if !ok {
|
||||
dst := r.kvDstPool.Get()
|
||||
if len(dst) < 1000 {
|
||||
dst = make([]jstream.KVS, 0, 1000)
|
||||
}
|
||||
|
||||
@@ -193,7 +193,7 @@ func (r *PReader) startReaders() {
|
||||
if mv.ValueType == jstream.Object {
|
||||
// This is a JSON object type (that preserves key
|
||||
// order)
|
||||
kvs = mv.Value.(jstream.KVS)
|
||||
kvs, _ = mv.Value.(jstream.KVS)
|
||||
} else {
|
||||
// To be AWS S3 compatible Select for JSON needs to
|
||||
// output non-object JSON as single column value
|
||||
|
||||
@@ -51,7 +51,7 @@ func (r *Reader) Read(dst sql.Record) (sql.Record, error) {
|
||||
if v.ValueType == jstream.Object {
|
||||
// This is a JSON object type (that preserves key
|
||||
// order)
|
||||
kvs = v.Value.(jstream.KVS)
|
||||
kvs, _ = v.Value.(jstream.KVS)
|
||||
} else {
|
||||
// To be AWS S3 compatible Select for JSON needs to
|
||||
// output non-object JSON as single column value
|
||||
|
||||
@@ -26,6 +26,8 @@ import (
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
xhttp "github.com/minio/minio/internal/http"
|
||||
)
|
||||
|
||||
// A message is in the format specified in
|
||||
@@ -262,7 +264,7 @@ func (writer *messageWriter) write(data []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
writer.writer.(http.Flusher).Flush()
|
||||
xhttp.Flush(writer.writer)
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -56,7 +56,6 @@ func (pr *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) {
|
||||
|
||||
kvs := jstream.KVS{}
|
||||
for _, col := range pr.r.Columns() {
|
||||
|
||||
var value interface{}
|
||||
if v, ok := nextRow[col.FlatName()]; ok {
|
||||
value, err = convertFromAnnotation(col.Element(), v)
|
||||
|
||||
@@ -32,6 +32,7 @@ import (
|
||||
"github.com/klauspost/compress/s2"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
gzip "github.com/klauspost/pgzip"
|
||||
"github.com/minio/minio/internal/bpool"
|
||||
"github.com/minio/minio/internal/config"
|
||||
xioutil "github.com/minio/minio/internal/ioutil"
|
||||
"github.com/minio/minio/internal/s3select/csv"
|
||||
@@ -81,15 +82,15 @@ func init() {
|
||||
parquetSupport = env.Get("MINIO_API_SELECT_PARQUET", config.EnableOff) == config.EnableOn
|
||||
}
|
||||
|
||||
var bufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
var bufPool = bpool.Pool[*bytes.Buffer]{
|
||||
New: func() *bytes.Buffer {
|
||||
// make a buffer with a reasonable capacity.
|
||||
return bytes.NewBuffer(make([]byte, 0, maxRecordSize))
|
||||
},
|
||||
}
|
||||
|
||||
var bufioWriterPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
var bufioWriterPool = bpool.Pool[*bufio.Writer]{
|
||||
New: func() *bufio.Writer {
|
||||
// io.Discard is just used to create the writer. Actual destination
|
||||
// writer is set later by Reset() before using it.
|
||||
return bufio.NewWriter(xioutil.Discard)
|
||||
@@ -468,7 +469,7 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
|
||||
switch s3Select.Output.format {
|
||||
case csvFormat:
|
||||
// Use bufio Writer to prevent csv.Writer from allocating a new buffer.
|
||||
bufioWriter := bufioWriterPool.Get().(*bufio.Writer)
|
||||
bufioWriter := bufioWriterPool.Get()
|
||||
defer func() {
|
||||
bufioWriter.Reset(xioutil.Discard)
|
||||
bufioWriterPool.Put(bufioWriter)
|
||||
@@ -530,7 +531,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
|
||||
}
|
||||
var err error
|
||||
sendRecord := func() bool {
|
||||
buf := bufPool.Get().(*bytes.Buffer)
|
||||
buf := bufPool.Get()
|
||||
buf.Reset()
|
||||
|
||||
for _, outputRecord := range outputQueue {
|
||||
|
||||
@@ -107,7 +107,6 @@ func (e *FuncExpr) evalSQLFnNode(r Record, tableAlias string) (res *Value, err e
|
||||
|
||||
case sqlFnDateDiff:
|
||||
return handleDateDiff(r, e.DateDiff, tableAlias)
|
||||
|
||||
}
|
||||
|
||||
// For all simple argument functions, we evaluate the arguments here
|
||||
|
||||
@@ -107,7 +107,6 @@ func evalSQLLike(text, pattern string, escape rune) (match bool, err error) {
|
||||
default:
|
||||
s = append(s, r)
|
||||
}
|
||||
|
||||
}
|
||||
if hasLeadingPercent {
|
||||
return strings.HasSuffix(text, string(s)), nil
|
||||
|
||||
@@ -175,7 +175,6 @@ func dateDiff(timePart string, ts1, ts2 time.Time) (*Value, error) {
|
||||
seconds := duration / time.Second
|
||||
return FromInt(int64(seconds)), nil
|
||||
default:
|
||||
|
||||
}
|
||||
return nil, errNotImplemented
|
||||
}
|
||||
|
||||
@@ -663,8 +663,13 @@ func inferTypeForArithOp(a *Value) error {
|
||||
a.setFloat(f)
|
||||
return nil
|
||||
}
|
||||
|
||||
err := fmt.Errorf("Could not convert %q to a number", string(a.value.([]byte)))
|
||||
var s string
|
||||
if v, ok := a.value.([]byte); ok {
|
||||
s = string(v)
|
||||
} else {
|
||||
s = fmt.Sprint(a.value)
|
||||
}
|
||||
err := fmt.Errorf("Could not convert %q to a number", s)
|
||||
return errInvalidDataType(err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user