mirror of
https://github.com/minio/minio.git
synced 2025-01-13 16:03:21 -05:00
76e2713ffe
Use separate sync.Pool for writes/reads

Avoid passing buffers to io.CopyBuffer(): if the reader implements io.WriterTo or the writer implements io.ReaderFrom, it is useless for sync.Pool to allocate buffers on its own, since they will be completely ignored by the io.CopyBuffer Go implementation. Improve this wherever we see it to be optimal. This allows us to be more efficient in memory usage.

```
385 // copyBuffer is the actual implementation of Copy and CopyBuffer.
386 // if buf is nil, one is allocated.
387 func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
388 	// If the reader has a WriteTo method, use it to do the copy.
389 	// Avoids an allocation and a copy.
390 	if wt, ok := src.(WriterTo); ok {
391 		return wt.WriteTo(dst)
392 	}
393 	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
394 	if rt, ok := dst.(ReaderFrom); ok {
395 		return rt.ReadFrom(src)
396 	}
```

From the readahead package:

```
// WriteTo writes data to w until there's no more data to write or when an error occurs.
// The return value n is the number of bytes written.
// Any error encountered during the write is also returned.
func (a *reader) WriteTo(w io.Writer) (n int64, err error) {
	if a.err != nil {
		return 0, a.err
	}
	n = 0
	for {
		err = a.fill()
		if err != nil {
			return n, err
		}
		n2, err := w.Write(a.cur.buffer())
		a.cur.inc(n2)
		n += int64(n2)
		if err != nil {
			return n, err
		}
```
159 lines
3.9 KiB
Go
159 lines
3.9 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/minio/cmd/config/api"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/sys"
|
|
)
|
|
|
|
// apiConfig holds the runtime API settings together with the channel used to
// throttle concurrent requests. All fields are guarded by mu.
type apiConfig struct {
	mu sync.RWMutex

	// Request throttling: requestsPool is a buffered channel used as a
	// counting semaphore, requestsDeadline is how long a request may wait
	// for a slot in it.
	requestsDeadline time.Duration
	requestsPool     chan struct{}

	// Values copied from the API configuration on init.
	clusterDeadline  time.Duration
	listQuorum       int
	extendListLife   time.Duration
	corsAllowOrigins []string

	// setDriveCount is the value passed to init; it feeds the per-request
	// memory estimate there.
	setDriveCount int
}
|
|
|
|
func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
t.clusterDeadline = cfg.ClusterDeadline
|
|
t.corsAllowOrigins = cfg.CorsAllowOrigin
|
|
t.setDriveCount = setDriveCount
|
|
|
|
var apiRequestsMaxPerNode int
|
|
if cfg.RequestsMax <= 0 {
|
|
stats, err := sys.GetStats()
|
|
if err != nil {
|
|
logger.LogIf(GlobalContext, err)
|
|
// Default to 16 GiB, not critical.
|
|
stats.TotalRAM = 16 << 30
|
|
}
|
|
// max requests per node is calculated as
|
|
// total_ram / ram_per_request
|
|
// ram_per_request is 4MiB * setDriveCount + 2 * 10MiB (default erasure block size)
|
|
apiRequestsMaxPerNode = int(stats.TotalRAM / uint64(setDriveCount*(writeBlockSize+readBlockSize)+blockSizeV1*2))
|
|
} else {
|
|
apiRequestsMaxPerNode = cfg.RequestsMax
|
|
if len(globalEndpoints.Hostnames()) > 0 {
|
|
apiRequestsMaxPerNode /= len(globalEndpoints.Hostnames())
|
|
}
|
|
}
|
|
if cap(t.requestsPool) < apiRequestsMaxPerNode {
|
|
// Only replace if needed.
|
|
// Existing requests will use the previous limit,
|
|
// but new requests will use the new limit.
|
|
// There will be a short overlap window,
|
|
// but this shouldn't last long.
|
|
t.requestsPool = make(chan struct{}, apiRequestsMaxPerNode)
|
|
}
|
|
t.requestsDeadline = cfg.RequestsDeadline
|
|
t.listQuorum = cfg.GetListQuorum()
|
|
t.extendListLife = cfg.ExtendListLife
|
|
}
|
|
|
|
func (t *apiConfig) getListQuorum() int {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
return t.listQuorum
|
|
}
|
|
|
|
func (t *apiConfig) getSetDriveCount() int {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
return t.setDriveCount
|
|
}
|
|
|
|
func (t *apiConfig) getExtendListLife() time.Duration {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
return t.extendListLife
|
|
}
|
|
|
|
func (t *apiConfig) getCorsAllowOrigins() []string {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
corsAllowOrigins := make([]string, len(t.corsAllowOrigins))
|
|
copy(corsAllowOrigins, t.corsAllowOrigins)
|
|
return corsAllowOrigins
|
|
}
|
|
|
|
func (t *apiConfig) getClusterDeadline() time.Duration {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
if t.clusterDeadline == 0 {
|
|
return 10 * time.Second
|
|
}
|
|
|
|
return t.clusterDeadline
|
|
}
|
|
|
|
func (t *apiConfig) getRequestsPool() (chan struct{}, time.Duration) {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
if t.requestsPool == nil {
|
|
return nil, time.Duration(0)
|
|
}
|
|
|
|
return t.requestsPool, t.requestsDeadline
|
|
}
|
|
|
|
// maxClients throttles the S3 API calls
|
|
func maxClients(f http.HandlerFunc) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
pool, deadline := globalAPIConfig.getRequestsPool()
|
|
if pool == nil {
|
|
f.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
|
|
deadlineTimer := time.NewTimer(deadline)
|
|
defer deadlineTimer.Stop()
|
|
|
|
select {
|
|
case pool <- struct{}{}:
|
|
defer func() { <-pool }()
|
|
f.ServeHTTP(w, r)
|
|
case <-deadlineTimer.C:
|
|
// Send a http timeout message
|
|
writeErrorResponse(r.Context(), w,
|
|
errorCodes.ToAPIErr(ErrOperationMaxedOut),
|
|
r.URL, guessIsBrowserReq(r))
|
|
return
|
|
case <-r.Context().Done():
|
|
return
|
|
}
|
|
}
|
|
}
|