Allow usage check to be configurable (#6006)

This commit is contained in:
Harshavardhana 2018-06-04 18:35:41 -07:00 committed by kannappanr
parent df1b33013f
commit 6fb0604502
12 changed files with 332 additions and 55 deletions

View File

@ -153,6 +153,15 @@ func handleCommonEnvVars() {
globalCacheExpiry = expiry
}
if intervalStr := os.Getenv("MINIO_USAGE_CHECK_INTERVAL"); intervalStr != "" {
interval, err := parseDuration(intervalStr)
if err != nil {
logger.Fatal(uiErrInvalidUsageCheckIntervalValue(err), "Invalid MINIO_USAGE_CHECK_INTERVAL value (`%s`)", intervalStr)
}
globalUsageCheckInterval = interval
globalIsEnvUsageCheck = true
}
// In place update is true by default if the MINIO_UPDATE is not set
// or is not set to 'off', if MINIO_UPDATE is set to 'off' then
// in-place update is off.

View File

@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"sync"
"time"
"github.com/minio/minio/cmd/logger"
@ -39,9 +40,9 @@ import (
// 6. Make changes in config-current_test.go for any test change
// Config version
const serverConfigVersion = "23"
const serverConfigVersion = "24"
type serverConfig = serverConfigV23
type serverConfig = serverConfigV24
var (
// globalServerConfig server config.
@ -104,6 +105,17 @@ func (s *serverConfig) GetBrowser() bool {
return bool(s.Browser)
}
// Set new usage configuration, currently only supports configuring
// usage check interval.
func (s *serverConfig) SetUsageConfig(checkUsageInterval time.Duration) {
s.Usage = usageConfig{checkUsageInterval}
}
// Get current usage configuration.
func (s *serverConfig) GetUsageConfig() usageConfig {
return s.Usage
}
// SetCacheConfig sets the current cache config
func (s *serverConfig) SetCacheConfig(drives, exclude []string, expiry int) {
s.Cache.Drives = drives
@ -141,6 +153,8 @@ func (s *serverConfig) ConfigDiff(t *serverConfig) string {
return "StorageClass configuration differs"
case !reflect.DeepEqual(s.Cache, t.Cache):
return "Cache configuration differs"
case !reflect.DeepEqual(s.Usage, t.Usage):
return "Usage configuration differs"
case !reflect.DeepEqual(s.Notify.AMQP, t.Notify.AMQP):
return "AMQP Notification configuration differs"
case !reflect.DeepEqual(s.Notify.NATS, t.Notify.NATS):
@ -186,6 +200,7 @@ func newServerConfig() *serverConfig {
Exclude: []string{},
Expiry: globalCacheExpiry,
},
Usage: usageConfig{globalDefaultUsageCheckInterval},
Notify: notifier{},
}
@ -246,6 +261,10 @@ func newConfig() error {
srvCfg.SetCacheConfig(globalCacheDrives, globalCacheExcludes, globalCacheExpiry)
}
if globalIsEnvUsageCheck {
srvCfg.SetUsageConfig(globalUsageCheckInterval)
}
// hold the mutex lock before a new config is assigned.
// Save the new config globally.
// unlock the mutex.
@ -339,6 +358,9 @@ func loadConfig() error {
globalCacheExcludes = cacheConf.Exclude
globalCacheExpiry = cacheConf.Expiry
}
if !globalIsEnvUsageCheck {
globalUsageCheckInterval = globalServerConfig.GetUsageConfig().UsageCheckInterval
}
globalServerConfigMu.Unlock()
return nil

View File

@ -172,6 +172,11 @@ func migrateConfig() error {
return err
}
fallthrough
case "23":
if err = migrateV23ToV24(); err != nil {
return err
}
fallthrough
case serverConfigVersion:
// No migration needed. this always points to current version.
err = nil
@ -1951,3 +1956,120 @@ func migrateV22ToV23() error {
logger.Info(configMigrateMSGTemplate, configFile, cv22.Version, srvConfig.Version)
return nil
}
func migrateV23ToV24() error {
configFile := getConfigFile()
cv23 := &serverConfigV23{}
_, err := quick.Load(configFile, cv23)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return fmt.Errorf("Unable to load config version 23. %v", err)
}
if cv23.Version != "23" {
return nil
}
// Copy over fields from V23 into V24 config struct
srvConfig := &serverConfigV24{
Notify: notifier{},
}
srvConfig.Version = "24"
srvConfig.Credential = cv23.Credential
srvConfig.Region = cv23.Region
if srvConfig.Region == "" {
// Region needs to be set for AWS Signature Version 4.
srvConfig.Region = globalMinioDefaultRegion
}
if len(cv23.Notify.AMQP) == 0 {
srvConfig.Notify.AMQP = make(map[string]target.AMQPArgs)
srvConfig.Notify.AMQP["1"] = target.AMQPArgs{}
} else {
srvConfig.Notify.AMQP = cv23.Notify.AMQP
}
if len(cv23.Notify.Elasticsearch) == 0 {
srvConfig.Notify.Elasticsearch = make(map[string]target.ElasticsearchArgs)
srvConfig.Notify.Elasticsearch["1"] = target.ElasticsearchArgs{
Format: event.NamespaceFormat,
}
} else {
srvConfig.Notify.Elasticsearch = cv23.Notify.Elasticsearch
}
if len(cv23.Notify.Redis) == 0 {
srvConfig.Notify.Redis = make(map[string]target.RedisArgs)
srvConfig.Notify.Redis["1"] = target.RedisArgs{
Format: event.NamespaceFormat,
}
} else {
srvConfig.Notify.Redis = cv23.Notify.Redis
}
if len(cv23.Notify.PostgreSQL) == 0 {
srvConfig.Notify.PostgreSQL = make(map[string]target.PostgreSQLArgs)
srvConfig.Notify.PostgreSQL["1"] = target.PostgreSQLArgs{
Format: event.NamespaceFormat,
}
} else {
srvConfig.Notify.PostgreSQL = cv23.Notify.PostgreSQL
}
if len(cv23.Notify.Kafka) == 0 {
srvConfig.Notify.Kafka = make(map[string]target.KafkaArgs)
srvConfig.Notify.Kafka["1"] = target.KafkaArgs{}
} else {
srvConfig.Notify.Kafka = cv23.Notify.Kafka
}
if len(cv23.Notify.NATS) == 0 {
srvConfig.Notify.NATS = make(map[string]target.NATSArgs)
srvConfig.Notify.NATS["1"] = target.NATSArgs{}
} else {
srvConfig.Notify.NATS = cv23.Notify.NATS
}
if len(cv23.Notify.Webhook) == 0 {
srvConfig.Notify.Webhook = make(map[string]target.WebhookArgs)
srvConfig.Notify.Webhook["1"] = target.WebhookArgs{}
} else {
srvConfig.Notify.Webhook = cv23.Notify.Webhook
}
if len(cv23.Notify.MySQL) == 0 {
srvConfig.Notify.MySQL = make(map[string]target.MySQLArgs)
srvConfig.Notify.MySQL["1"] = target.MySQLArgs{
Format: event.NamespaceFormat,
}
} else {
srvConfig.Notify.MySQL = cv23.Notify.MySQL
}
if len(cv23.Notify.MQTT) == 0 {
srvConfig.Notify.MQTT = make(map[string]target.MQTTArgs)
srvConfig.Notify.MQTT["1"] = target.MQTTArgs{}
} else {
srvConfig.Notify.MQTT = cv23.Notify.MQTT
}
// Load browser config from existing config in the file.
srvConfig.Browser = cv23.Browser
// Load domain config from existing config in the file.
srvConfig.Domain = cv23.Domain
// Load storage class config from existing storage class config in the file.
srvConfig.StorageClass.RRS = cv23.StorageClass.RRS
srvConfig.StorageClass.Standard = cv23.StorageClass.Standard
// Load cache config from existing cache config in the file.
srvConfig.Cache.Drives = cv23.Cache.Drives
srvConfig.Cache.Exclude = cv23.Cache.Exclude
srvConfig.Cache.Expiry = cv23.Cache.Expiry
// Init usage config. For future migration, usage config needs
// to be copied over from previous version.
srvConfig.Usage = usageConfig{globalDefaultUsageCheckInterval}
if err = quick.Save(configFile, srvConfig); err != nil {
return fmt.Errorf("Failed to migrate config from %s to %s. %v", cv23.Version, srvConfig.Version, err)
}
logger.Info(configMigrateMSGTemplate, configFile, cv23.Version, srvConfig.Version)
return nil
}

View File

@ -602,3 +602,30 @@ type serverConfigV23 struct {
// Notification queue configuration.
Notify notifier `json:"notify"`
}
// serverConfigV24 is just like version '23' with addition of usage interval
// field.
//
// IMPORTANT NOTE: When updating this struct make sure that
// serverConfig.ConfigDiff() is updated as necessary.
type serverConfigV24 struct {
Version string `json:"version"`
// S3 API configuration.
Credential auth.Credentials `json:"credential"`
Region string `json:"region"`
Browser BrowserFlag `json:"browser"`
Domain string `json:"domain"`
// Storage class configuration
StorageClass storageClassConfig `json:"storageclass"`
// Cache configuration
Cache CacheConfig `json:"cache"`
// Usage configuration
Usage usageConfig `json:"usage"`
// Notification queue configuration.
Notify notifier `json:"notify"`
}

View File

@ -18,12 +18,55 @@ package cmd
import (
"context"
"encoding/json"
"fmt"
"time"
)
const (
usageCheckInterval = 12 * time.Hour // 12 hours
)
// Captures configurable parameters of usage check.
type usageConfig struct {
UsageCheckInterval time.Duration
}
// MarshalJSON - encodes to JSON data.
func (u usageConfig) MarshalJSON() ([]byte, error) {
type _usageConfig struct {
UsageCheckInterval string `json:"interval"`
}
return json.Marshal(_usageConfig{u.UsageCheckInterval.String()})
}
// parseDuration - parse duration string
func parseDuration(dStr string) (time.Duration, error) {
d, err := time.ParseDuration(dStr)
if err != nil {
return d, err
}
if d < globalMinimumUsageCheckInterval {
return d, fmt.Errorf("interval %s is not allowed, minimum required value is %s",
d, globalMinimumUsageCheckInterval)
}
return d, nil
}
// UnmarshalJSON - decodes JSON data.
func (u *usageConfig) UnmarshalJSON(data []byte) error {
type _usageConfig struct {
UsageCheckInterval string `json:"interval"`
}
var u1 = _usageConfig{}
if err := json.Unmarshal(data, &u1); err != nil {
return err
}
if !globalIsEnvUsageCheck {
d, err := parseDuration(u1.UsageCheckInterval)
if err != nil {
return err
}
u.UsageCheckInterval = d
}
return nil
}
// getDiskUsage walks the file tree rooted at root, calling usageFn
// for each file or directory in the tree, including root.

View File

@ -42,6 +42,11 @@ var defaultEtag = "00000000000000000000000000000000-1"
// FSObjects - Implements fs object layer.
type FSObjects struct {
// Disk usage metrics
totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// Disk usage running routine
usageRunning int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// Path to be exported over S3 API.
fsPath string
// meta json filename, varies by fs / cache backend.
@ -64,10 +69,6 @@ type FSObjects struct {
// To manage the appendRoutine go-routines
nsMutex *nsLockMap
// Disk usage metrics
totalUsed uint64
usageCheckInterval time.Duration
}
// Represents the background append file.
@ -137,7 +138,6 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
nsMutex: newNSLock(false),
listPool: newTreeWalkPool(globalLookupTimeout),
appendFileMap: make(map[string]*fsAppendFile),
usageCheckInterval: usageCheckInterval,
}
// Once the filesystem has initialized hold the read lock for
@ -173,7 +173,7 @@ func (fs *FSObjects) Shutdown(ctx context.Context) error {
// diskUsage returns du information for the posix path, in a continuous routine.
func (fs *FSObjects) diskUsage(doneCh chan struct{}) {
ticker := time.NewTicker(fs.usageCheckInterval)
ticker := time.NewTicker(globalUsageCheckInterval)
defer ticker.Stop()
usageFn := func(ctx context.Context, entry string) error {
@ -191,6 +191,13 @@ func (fs *FSObjects) diskUsage(doneCh chan struct{}) {
return nil
}
// Check if disk usage routine is running, if yes then return.
if atomic.LoadInt32(&fs.usageRunning) == 1 {
return
}
atomic.StoreInt32(&fs.usageRunning, 1)
defer atomic.StoreInt32(&fs.usageRunning, 0)
if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil {
return
}
@ -200,6 +207,12 @@ func (fs *FSObjects) diskUsage(doneCh chan struct{}) {
case <-doneCh:
return
case <-ticker.C:
// Check if disk usage routine is running, if yes let it finish.
if atomic.LoadInt32(&fs.usageRunning) == 1 {
continue
}
atomic.StoreInt32(&fs.usageRunning, 1)
var usage uint64
usageFn = func(ctx context.Context, entry string) error {
var fi os.FileInfo
@ -215,9 +228,13 @@ func (fs *FSObjects) diskUsage(doneCh chan struct{}) {
usage = usage + uint64(fi.Size())
return nil
}
if err := getDiskUsage(context.Background(), fs.fsPath, usageFn); err != nil {
atomic.StoreInt32(&fs.usageRunning, 0)
continue
}
atomic.StoreInt32(&fs.usageRunning, 0)
atomic.StoreUint64(&fs.totalUsed, usage)
}
}

View File

@ -19,7 +19,6 @@ package cmd
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"testing"
@ -191,16 +190,15 @@ func TestFSGetBucketInfo(t *testing.T) {
t.Fatal("BucketNotFound error not returned")
}
globalServiceDoneCh <- struct{}{}
// Check for buckets and should get disk not found.
fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
os.RemoveAll(disk)
_, err = fs.GetBucketInfo(context.Background(), bucketName)
if _, err = fs.GetBucketInfo(context.Background(), bucketName); err != nil {
if !isSameType(err, BucketNotFound{}) {
t.Fatal("BucketNotFound error not returned")
}
}
}
func TestFSPutObject(t *testing.T) {
// Prepare for tests
@ -303,10 +301,8 @@ func TestFSDeleteObject(t *testing.T) {
t.Fatal("Unexpected error: ", err)
}
globalServiceDoneCh <- struct{}{}
// Delete object should err disk not found.
fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
os.RemoveAll(disk)
if err := fs.DeleteObject(context.Background(), bucketName, objectName); err != nil {
if !isSameType(err, BucketNotFound{}) {
t.Fatal("Unexpected error: ", err)
@ -346,10 +342,8 @@ func TestFSDeleteBucket(t *testing.T) {
obj.MakeBucketWithLocation(context.Background(), bucketName, "")
globalServiceDoneCh <- struct{}{}
// Delete bucket should get error disk not found.
fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
os.RemoveAll(disk)
if err = fs.DeleteBucket(context.Background(), bucketName); err != nil {
if !isSameType(err, BucketNotFound{}) {
t.Fatal("Unexpected error: ", err)
@ -393,21 +387,12 @@ func TestFSListBuckets(t *testing.T) {
}
// Test ListBuckets with disk not found.
fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
os.RemoveAll(disk)
if _, err := fs.ListBuckets(context.Background()); err != nil {
if err != errDiskNotFound {
t.Fatal("Unexpected error: ", err)
}
}
longPath := fmt.Sprintf("%0256d", 1)
fs.fsPath = longPath
if _, err := fs.ListBuckets(context.Background()); err != nil {
if err != errFileNameTooLong {
t.Fatal("Unexpected error: ", err)
}
}
}
// TestFSHealObject - tests for fs HealObject

View File

@ -188,6 +188,14 @@ var (
globalCacheExpiry = 90
// Add new variable global values here.
// Minimum required usage check interval value.
globalMinimumUsageCheckInterval = 2 * time.Hour // 2 hours
// Default usage check interval value.
globalDefaultUsageCheckInterval = 12 * time.Hour // 12 hours
// Usage check interval value.
globalUsageCheckInterval = globalDefaultUsageCheckInterval
// Is env usage check interval set.
globalIsEnvUsageCheck bool
)
// global colors.

View File

@ -44,15 +44,18 @@ const (
// posix - implements StorageAPI interface.
type posix struct {
// Disk usage metrics
totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// Disk usage running routine
usageRunning int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
ioErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
diskPath string
pool sync.Pool
connected bool
// Disk usage metrics
stopUsageCh chan struct{}
totalUsage uint64
usageCheckInterval time.Duration
}
// checkPathLength - returns error if given path name length more than 255
@ -165,12 +168,11 @@ func newPosix(path string) (StorageAPI, error) {
},
},
stopUsageCh: make(chan struct{}),
usageCheckInterval: usageCheckInterval,
}
st.connected = true
go st.diskUsage()
go st.diskUsage(globalServiceDoneCh)
// Success.
return st, nil
@ -281,7 +283,7 @@ func (s *posix) DiskInfo() (info DiskInfo, err error) {
return DiskInfo{
Total: di.Total,
Free: di.Free,
Used: atomic.LoadUint64(&s.totalUsage),
Used: atomic.LoadUint64(&s.totalUsed),
}, nil
}
@ -315,8 +317,8 @@ func (s *posix) checkDiskFound() (err error) {
}
// diskUsage returns du information for the posix path, in a continuous routine.
func (s *posix) diskUsage() {
ticker := time.NewTicker(s.usageCheckInterval)
func (s *posix) diskUsage(doneCh chan struct{}) {
ticker := time.NewTicker(globalUsageCheckInterval)
defer ticker.Stop()
usageFn := func(ctx context.Context, entry string) error {
@ -328,11 +330,18 @@ func (s *posix) diskUsage() {
if err != nil {
return err
}
atomic.AddUint64(&s.totalUsage, uint64(fi.Size()))
atomic.AddUint64(&s.totalUsed, uint64(fi.Size()))
return nil
}
}
// Check if disk usage routine is running, if yes then return.
if atomic.LoadInt32(&s.usageRunning) == 1 {
return
}
atomic.StoreInt32(&s.usageRunning, 1)
defer atomic.StoreInt32(&s.usageRunning, 0)
if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil {
return
}
@ -341,9 +350,16 @@ func (s *posix) diskUsage() {
select {
case <-s.stopUsageCh:
return
case <-globalServiceDoneCh:
case <-doneCh:
return
case <-ticker.C:
// Check if disk usage routine is running, if yes let it
// finish, before starting a new one.
if atomic.LoadInt32(&s.usageRunning) == 1 {
continue
}
atomic.StoreInt32(&s.usageRunning, 1)
var usage uint64
usageFn = func(ctx context.Context, entry string) error {
select {
@ -358,10 +374,14 @@ func (s *posix) diskUsage() {
return nil
}
}
if err := getDiskUsage(context.Background(), s.diskPath, usageFn); err != nil {
atomic.StoreInt32(&s.usageRunning, 0)
continue
}
atomic.StoreUint64(&s.totalUsage, usage)
atomic.StoreInt32(&s.usageRunning, 0)
atomic.StoreUint64(&s.totalUsed, usage)
}
}
}

View File

@ -47,6 +47,15 @@ var (
"MINIO_CACHE_EXPIRY: Valid cache expiry duration is in days.",
)
uiErrInvalidUsageCheckIntervalValue = newUIErrFn(
"Invalid usage check interval value",
"Please check the passed value",
`MINIO_USAGE_CHECK_INTERVAL: Valid usage check interval duration string is a signed sequence of decimal numbers,
each with optional fraction and a unit suffix, such as "2h45m". Valid time units are "ns", "us", "ms", "s", "m", "h".
Minimum supported value is '2h'.
`,
)
uiErrInvalidCredentials = newUIErrFn(
"Invalid credentials",
"Please provide correct credentials",

View File

@ -99,6 +99,18 @@ By default, parity for objects with standard storage class is set to `N/2`, and
|``exclude`` | _[]string_ | List of wildcard patterns for prefixes to exclude from cache |
|``expiry`` | _int_ | Days to cache expiry |
### Usage
|Field|Type|Description|
|:---|:---|:---|
|``interval``| _string_ | Valid usage check interval duration string is a signed sequence of decimal numbers, each with optional fraction and a unit suffix, such as "2h45m". Valid time units are "ns", "us", "ms", "s", "m", "h". Minimum supported value is '2h'.|
Example: Run usage check every 4 hours 3 minutes 10 seconds.
```sh
export MINIO_USAGE_CHECK_INTERVAL="4h3m10s"
minio server /data
```
#### Notify
|Field|Type|Description|
|:---|:---|:---|

View File

@ -1,5 +1,5 @@
{
"version": "22",
"version": "24",
"credential": {
"accessKey": "USWUXHGYZQYFYFFIT3RE",
"secretKey": "MOJRH0mkL1IPauahWITSVvyDrQbEEIwljvmxdq03"
@ -16,6 +16,9 @@
"expiry": 90,
"exclude": []
},
"usage": {
"interval": "3h"
}
"notify": {
"amqp": {
"1": {