mirror of
https://github.com/minio/minio.git
synced 2025-01-26 06:03:17 -05:00
a3e806ed61
This PR adds disk-based edge caching support for the minio server. Cache settings can be configured in config.json to take a list of disk drives, the cache expiry in days and file patterns to exclude from the cache, or via the environment variables MINIO_CACHE_DRIVES, MINIO_CACHE_EXCLUDE and MINIO_CACHE_EXPIRY. The design assumes that Atime support is enabled and that the list of cache drives is fixed. - Objects are cached on both GET and PUT/POST operations. - Expiry is used as a hint to evict older entries from the cache, or if 80% of cache capacity is filled. - When the object storage backend is down, GET, LIST and HEAD operations fetch objects seamlessly from the cache. Current Limitations - Bucket policies are not cached, so anonymous operations are not supported in offline mode. - Objects are distributed using deterministic hashing among the list of cache drives specified. If one or more drives go offline, or the cache drive configuration is altered, performance could degrade to linear lookup. Fixes #4026
368 lines
9.3 KiB
Go
368 lines
9.3 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2015, 2016, 2017 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"encoding/xml"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"reflect"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
|
|
humanize "github.com/dustin/go-humanize"
|
|
"github.com/gorilla/mux"
|
|
"github.com/pkg/profile"
|
|
)
|
|
|
|
// Close Http tracing file.
|
|
func stopHTTPTrace() {
|
|
if globalHTTPTraceFile != nil {
|
|
errorIf(globalHTTPTraceFile.Close(), "Unable to close httpTraceFile %s", globalHTTPTraceFile.Name())
|
|
globalHTTPTraceFile = nil
|
|
}
|
|
}
|
|
|
|
// cloneHeader returns a deep copy of h. Every key receives its own newly
// allocated value slice, so changes made to the copy are never visible
// through the original header (and vice versa).
func cloneHeader(h http.Header) http.Header {
	dup := make(http.Header, len(h))
	for key, values := range h {
		// Appending onto a fresh, exactly-sized slice copies the values.
		dup[key] = append(make([]string, 0, len(values)), values...)
	}
	return dup
}
|
|
|
|
// Convert url path into bucket and object name.
|
|
func urlPath2BucketObjectName(path string) (bucketName, objectName string) {
|
|
// Trim any preceding slash separator.
|
|
urlPath := strings.TrimPrefix(path, slashSeparator)
|
|
|
|
// Split urlpath using slash separator into a given number of
|
|
// expected tokens.
|
|
tokens := strings.SplitN(urlPath, slashSeparator, 2)
|
|
bucketName = tokens[0]
|
|
if len(tokens) == 2 {
|
|
objectName = tokens[1]
|
|
}
|
|
|
|
// Success.
|
|
return bucketName, objectName
|
|
}
|
|
|
|
// URI scheme constants.
const (
	// httpScheme is the scheme string for plain HTTP URLs.
	httpScheme = "http"
	// httpsScheme is the scheme string for HTTPS URLs.
	httpsScheme = "https"
)
|
|
|
|
// xmlDecoder decodes one XML value from body into v and returns the
// decoder's error, if any. When size is greater than zero at most size
// bytes are read from body; otherwise body is read without a limit.
func xmlDecoder(body io.Reader, v interface{}, size int64) error {
	src := body
	if size > 0 {
		src = io.LimitReader(body, size)
	}
	return xml.NewDecoder(src).Decode(v)
}
|
|
|
|
// checkValidMD5 validates the Content-Md5 header in h and returns the
// decoded MD5 bytes.
//
//   - When the header key is absent, an empty (non-nil) byte slice and a
//     nil error are returned.
//   - When the key is present but carries no value, or its first value is
//     the empty string, an error is returned.
//   - Otherwise the first value is base64-decoded (standard encoding) and
//     the decoded bytes, or the decoding error, are returned.
//
// The header is looked up by its canonical key "Content-Md5" directly in
// the map, so only canonicalized headers are matched.
func checkValidMD5(h http.Header) ([]byte, error) {
	md5B64, ok := h["Content-Md5"]
	if !ok {
		return []byte{}, nil
	}
	// A key that is present with an empty value slice must not be indexed;
	// treat it the same as an explicitly empty value.
	if len(md5B64) == 0 || md5B64[0] == "" {
		return nil, fmt.Errorf("Content-Md5 header set to empty value")
	}
	return base64.StdEncoding.DecodeString(md5B64[0])
}
|
|
|
|
// Upload size and part-number limits. For the S3 limits these are based
// on, see http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
const (
	// Maximum object size per PUT request is 5TiB.
	// This is a divergence from S3 limit on purpose to support
	// use cases where users are going to upload large files
	// using 'curl' and presigned URL.
	globalMaxObjectSize = 5 * humanize.TiByte

	// Minimum Part size for multipart upload is 5MiB
	globalMinPartSize = 5 * humanize.MiByte

	// Maximum Part size for multipart upload is 5GiB
	globalMaxPartSize = 5 * humanize.GiByte

	// Maximum Part ID for multipart upload is 10000
	// (Acceptable values range from 1 to 10000 inclusive)
	globalMaxPartID = 10000
)
|
|
|
|
// isMaxObjectSize - verify if max object size
|
|
func isMaxObjectSize(size int64) bool {
|
|
return size > globalMaxObjectSize
|
|
}
|
|
|
|
// // Check if part size is more than maximum allowed size.
|
|
func isMaxAllowedPartSize(size int64) bool {
|
|
return size > globalMaxPartSize
|
|
}
|
|
|
|
// Check if part size is more than or equal to minimum allowed size.
|
|
func isMinAllowedPartSize(size int64) bool {
|
|
return size >= globalMinPartSize
|
|
}
|
|
|
|
// isMaxPartNumber - Check if part ID is greater than the maximum allowed ID.
|
|
func isMaxPartID(partID int) bool {
|
|
return partID > globalMaxPartID
|
|
}
|
|
|
|
// contains reports whether elem is equal (==) to any element of slice.
// It uses reflection, so slice may be a slice of any element type; a
// value that is not a slice always yields false.
func contains(slice interface{}, elem interface{}) bool {
	items := reflect.ValueOf(slice)
	if items.Kind() != reflect.Slice {
		return false
	}
	for i, n := 0, items.Len(); i < n; i++ {
		if items.Index(i).Interface() == elem {
			return true
		}
	}
	return false
}
|
|
|
|
// Starts a profiler returns nil if profiler is not enabled, caller needs to handle this.
|
|
func startProfiler(profiler string) interface {
|
|
Stop()
|
|
} {
|
|
// Enable profiler if ``_MINIO_PROFILER`` is set. Supported options are [cpu, mem, block].
|
|
switch profiler {
|
|
case "cpu":
|
|
return profile.Start(profile.CPUProfile, profile.NoShutdownHook)
|
|
case "mem":
|
|
return profile.Start(profile.MemProfile, profile.NoShutdownHook)
|
|
case "block":
|
|
return profile.Start(profile.BlockProfile, profile.NoShutdownHook)
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Global profiler to be used by service go-routine.
// Its type matches the value returned by startProfiler, so it is nil when
// no profiler has been assigned.
var globalProfiler interface {
	Stop()
}
|
|
|
|
// dump the request into a string in JSON format.
|
|
func dumpRequest(r *http.Request) string {
|
|
header := cloneHeader(r.Header)
|
|
header.Set("Host", r.Host)
|
|
// Replace all '%' to '%%' so that printer format parser
|
|
// to ignore URL encoded values.
|
|
rawURI := strings.Replace(r.RequestURI, "%", "%%", -1)
|
|
req := struct {
|
|
Method string `json:"method"`
|
|
RequestURI string `json:"reqURI"`
|
|
Header http.Header `json:"header"`
|
|
}{r.Method, rawURI, header}
|
|
|
|
var buffer bytes.Buffer
|
|
enc := json.NewEncoder(&buffer)
|
|
enc.SetEscapeHTML(false)
|
|
if err := enc.Encode(&req); err != nil {
|
|
// Upon error just return Go-syntax representation of the value
|
|
return fmt.Sprintf("%#v", req)
|
|
}
|
|
|
|
// Formatted string.
|
|
return strings.TrimSpace(string(buffer.Bytes()))
|
|
}
|
|
|
|
// isFile reports whether path refers to a regular file. Any os.Stat
// failure (for example a path that does not exist) yields false.
func isFile(path string) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false
	}
	return info.Mode().IsRegular()
}
|
|
|
|
// checkURL parses urlStr with url.Parse and returns the parsed URL.
// An empty urlStr, or one that url.Parse rejects, results in a nil URL
// and a descriptive error.
func checkURL(urlStr string) (*url.URL, error) {
	if len(urlStr) == 0 {
		return nil, errors.New("Address cannot be empty")
	}
	parsed, parseErr := url.Parse(urlStr)
	if parseErr == nil {
		return parsed, nil
	}
	return nil, fmt.Errorf("`%s` invalid: %s", urlStr, parseErr.Error())
}
|
|
|
|
// UTCNow returns the current time with its location set to UTC.
func UTCNow() time.Time {
	now := time.Now()
	return now.UTC()
}
|
|
|
|
// GenETag - generate UUID based ETag
|
|
func GenETag() string {
|
|
return ToS3ETag(getMD5Hash([]byte(mustGetUUID())))
|
|
}
|
|
|
|
// ToS3ETag - return checksum to ETag
|
|
func ToS3ETag(etag string) string {
|
|
etag = canonicalizeETag(etag)
|
|
|
|
if !strings.HasSuffix(etag, "-1") {
|
|
// Tools like s3cmd uses ETag as checksum of data to validate.
|
|
// Append "-1" to indicate ETag is not a checksum.
|
|
etag += "-1"
|
|
}
|
|
|
|
return etag
|
|
}
|
|
|
|
// NewCustomHTTPTransport returns a new http configuration
|
|
// used while communicating with the cloud backends.
|
|
// This sets the value for MaxIdleConnsPerHost from 2 (go default)
|
|
// to 100.
|
|
func NewCustomHTTPTransport() http.RoundTripper {
|
|
return &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
DialContext: (&net.Dialer{
|
|
Timeout: 30 * time.Second,
|
|
KeepAlive: 30 * time.Second,
|
|
}).DialContext,
|
|
MaxIdleConns: 100,
|
|
MaxIdleConnsPerHost: 100,
|
|
IdleConnTimeout: 90 * time.Second,
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
ExpectContinueTimeout: 1 * time.Second,
|
|
TLSClientConfig: &tls.Config{RootCAs: globalRootCAs},
|
|
DisableCompression: true,
|
|
}
|
|
}
|
|
|
|
// jsonLoad rewinds r to its beginning and decodes one JSON value from it
// into data (typically r is a file on disk). It returns the seek error or
// the decoding error, whichever occurs first.
func jsonLoad(r io.ReadSeeker, data interface{}) error {
	_, err := r.Seek(0, io.SeekStart)
	if err != nil {
		return err
	}
	decoder := json.NewDecoder(r)
	return decoder.Decode(data)
}
|
|
|
|
// jsonSave replaces the contents of f with the JSON encoding of data.
// The value is marshalled first, so f is left untouched when encoding
// fails; only then is f truncated to zero length, rewound to the start
// and written. The first error encountered is returned.
func jsonSave(f interface {
	io.WriteSeeker
	Truncate(int64) error
}, data interface{}) error {
	encoded, err := json.Marshal(data)
	if err != nil {
		return err
	}
	if err = f.Truncate(0); err != nil {
		return err
	}
	// Truncate does not move the offset, so rewind explicitly.
	if _, err = f.Seek(0, io.SeekStart); err != nil {
		return err
	}
	_, err = f.Write(encoded)
	return err
}
|
|
|
|
// ceilFrac returns the ceiling of the fraction numerator/denominator.
// A zero denominator is treated as invalid input and yields 0 rather than
// a division-by-zero panic.
func ceilFrac(numerator, denominator int64) (ceil int64) {
	switch {
	case denominator == 0:
		// Invalid input: nothing sensible to compute.
		return 0
	case denominator < 0:
		// Normalize so that the denominator is positive.
		numerator, denominator = -numerator, -denominator
	}

	quotient, remainder := numerator/denominator, numerator%denominator
	// Integer division truncates toward zero, which already is the ceiling
	// for non-positive fractions; only positive inexact ones round up.
	if numerator > 0 && remainder != 0 {
		quotient++
	}
	return quotient
}
|
|
|
|
// Returns context with ReqInfo details set in the context.
|
|
func newContext(r *http.Request, api string) context.Context {
|
|
vars := mux.Vars(r)
|
|
bucket := vars["bucket"]
|
|
object := vars["object"]
|
|
prefix := vars["prefix"]
|
|
|
|
if prefix != "" {
|
|
object = prefix
|
|
}
|
|
|
|
return logger.SetContext(context.Background(), &logger.ReqInfo{r.RemoteAddr, r.Header.Get("user-agent"), "", api, bucket, object, nil})
|
|
}
|
|
|
|
// isNetworkOrHostDown reports whether err indicates a network failure or
// an unreachable host.
//
// It returns false for a nil error. It returns true when err's dynamic
// type is *net.DNSError, *net.OpError, net.UnknownNetworkError or
// *url.Error, and for any other error whose message contains one of a
// fixed set of timeout / broken-connection markers. The type check looks
// at err itself only; wrapped errors are not unwrapped.
func isNetworkOrHostDown(err error) bool {
	if err == nil {
		return false
	}

	switch err.(type) {
	case *net.DNSError, *net.OpError, net.UnknownNetworkError, *url.Error:
		// Every *url.Error counts as "down"; the former extra check for
		// "Connection closed by foreign host" was redundant because both
		// of its outcomes returned true.
		return true
	}

	// Errors of other types are classified by their message text.
	msg := err.Error()
	for _, marker := range []string{
		"net/http: TLS handshake timeout",                // tlsHandshakeTimeoutError
		"i/o timeout",                                    // tcp timeoutError
		"connection timed out",                           // net.Dial timeout
		"net/http: HTTP/1.x transport connection broken", // broken transport connection
	} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
|