feat: implement support batch replication (#15554)

This commit is contained in:
Harshavardhana
2022-10-03 02:10:15 -07:00
parent 4d761fda81
commit 2a13cc28f2
23 changed files with 5617 additions and 172 deletions

View File

@@ -411,6 +411,7 @@ func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request)
}
}
}
jobID := r.Form.Get("by-jobID")
hosts := strings.Split(r.Form.Get("hosts"), ",")
byHost := strings.EqualFold(r.Form.Get("by-host"), "true")
@@ -432,12 +433,20 @@ func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request)
enc := json.NewEncoder(w)
for n > 0 {
var m madmin.RealtimeMetrics
mLocal := collectLocalMetrics(types, hostMap, diskMap)
mLocal := collectLocalMetrics(types, collectMetricsOpts{
hosts: hostMap,
disks: diskMap,
jobID: jobID,
})
m.Merge(&mLocal)
// Allow half the interval for collecting remote...
cctx, cancel := context.WithTimeout(ctx, interval/2)
mRemote := collectRemoteMetrics(cctx, types, hostMap, diskMap)
mRemote := collectRemoteMetrics(cctx, types, collectMetricsOpts{
hosts: hostMap,
disks: diskMap,
jobID: jobID,
})
cancel()
m.Merge(&mRemote)
if !byHost {
@@ -449,7 +458,7 @@ func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request)
m.Final = n <= 1
// Marshal API response
// Marshal API reesponse
if err := enc.Encode(&m); err != nil {
n = 0
}

View File

@@ -206,6 +206,16 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
adminRouter.Methods(http.MethodPost).Path(adminVersion+"/replication/diff").HandlerFunc(
gz(httpTraceHdrs(adminAPI.ReplicationDiffHandler))).Queries("bucket", "{bucket:.*}")
// Batch job operations
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/start-job").HandlerFunc(
gz(httpTraceHdrs(adminAPI.StartBatchJob)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/list-jobs").HandlerFunc(
gz(httpTraceHdrs(adminAPI.ListBatchJobs)))
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/describe-job").HandlerFunc(
gz(httpTraceHdrs(adminAPI.DescribeBatchJob)))
// Bucket migration operations
// ExportBucketMetaHandler
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/export-bucket-metadata").HandlerFunc(

View File

@@ -263,6 +263,7 @@ const (
ErrAdminNoSuchUser
ErrAdminNoSuchGroup
ErrAdminGroupNotEmpty
ErrAdminNoSuchJob
ErrAdminNoSuchPolicy
ErrAdminInvalidArgument
ErrAdminInvalidAccessKey
@@ -1226,6 +1227,11 @@ var errorCodes = errorCodeMap{
Description: "The specified group does not exist.",
HTTPStatusCode: http.StatusNotFound,
},
ErrAdminNoSuchJob: {
Code: "XMinioAdminNoSuchJob",
Description: "The specified job does not exist.",
HTTPStatusCode: http.StatusNotFound,
},
ErrAdminGroupNotEmpty: {
Code: "XMinioAdminGroupNotEmpty",
Description: "The specified group is not empty - cannot remove it.",
@@ -1923,6 +1929,8 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrAdminNoSuchGroup
case errGroupNotEmpty:
apiErr = ErrAdminGroupNotEmpty
case errNoSuchJob:
apiErr = ErrAdminNoSuchJob
case errNoSuchPolicy:
apiErr = ErrAdminNoSuchPolicy
case errSignatureMismatch:

File diff suppressed because one or more lines are too long

1079
cmd/batch-handlers.go Normal file

File diff suppressed because it is too large Load Diff

2876
cmd/batch-handlers_gen.go Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,23 @@
// Code generated by "stringer -type=batchReplicationMetric -trimprefix=batchReplicationMetric batch-handlers.go"; DO NOT EDIT.
package cmd
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[batchReplicationMetricObject-0]
}
const _batchReplicationMetric_name = "Object"
var _batchReplicationMetric_index = [...]uint8{0, 6}
func (i batchReplicationMetric) String() string {
if i >= batchReplicationMetric(len(_batchReplicationMetric_index)-1) {
return "batchReplicationMetric(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _batchReplicationMetric_name[_batchReplicationMetric_index[i]:_batchReplicationMetric_index[i+1]]
}

View File

@@ -1875,12 +1875,22 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
versionsSorter(fivs.Versions).reverse()
for _, version := range fivs.Versions {
send := true
if opts.WalkFilter != nil && !opts.WalkFilter(version) {
send = false
}
if !send {
continue
}
versioned := vcfg != nil && vcfg.Versioned(version.Name)
objInfo := version.ToObjectInfo(bucket, version.Name, versioned)
select {
case <-ctx.Done():
return
case results <- version.ToObjectInfo(bucket, version.Name, versioned):
case results <- objInfo:
}
}
}
@@ -1904,7 +1914,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
path: path,
filterPrefix: filterPrefix,
recursive: true,
forwardTo: "",
forwardTo: opts.WalkMarker,
minDisks: 1,
reportNotFound: false,
agreed: loadEntry,

View File

@@ -3050,12 +3050,22 @@ func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, result
versionsSorter(fivs.Versions).reverse()
for _, version := range fivs.Versions {
send := true
if opts.WalkFilter != nil && !opts.WalkFilter(version) {
send = false
}
if !send {
continue
}
versioned := vcfg != nil && vcfg.Versioned(version.Name)
objInfo := version.ToObjectInfo(bucket, version.Name, versioned)
select {
case <-ctx.Done():
return
case results <- version.ToObjectInfo(bucket, version.Name, versioned):
case results <- objInfo:
}
}
}
@@ -3079,7 +3089,7 @@ func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, result
path: path,
filterPrefix: filterPrefix,
recursive: true,
forwardTo: "",
forwardTo: opts.WalkMarker,
minDisks: 1,
reportNotFound: false,
agreed: loadEntry,

View File

@@ -25,23 +25,29 @@ import (
"github.com/minio/minio/internal/disk"
)
func collectLocalMetrics(types madmin.MetricType, hosts map[string]struct{}, disks map[string]struct{}) (m madmin.RealtimeMetrics) {
type collectMetricsOpts struct {
hosts map[string]struct{}
disks map[string]struct{}
jobID string
}
func collectLocalMetrics(types madmin.MetricType, opts collectMetricsOpts) (m madmin.RealtimeMetrics) {
if types == madmin.MetricsNone {
return
}
if len(hosts) > 0 {
if _, ok := hosts[globalMinioAddr]; !ok {
if len(opts.hosts) > 0 {
if _, ok := opts.hosts[globalMinioAddr]; !ok {
return
}
}
if types.Contains(madmin.MetricsDisk) && !globalIsGateway {
if types.Contains(madmin.MetricsDisk) {
m.ByDisk = make(map[string]madmin.DiskMetric)
aggr := madmin.DiskMetric{
CollectedAt: time.Now(),
}
for name, disk := range collectLocalDisksMetrics(disks) {
for name, disk := range collectLocalDisksMetrics(opts.disks) {
m.ByDisk[name] = disk
aggr.Merge(&disk)
}
@@ -56,6 +62,10 @@ func collectLocalMetrics(types madmin.MetricType, hosts map[string]struct{}, dis
metrics := globalOSMetrics.report()
m.Aggregated.OS = &metrics
}
if types.Contains(madmin.MetricsBatchJobs) {
m.Aggregated.BatchJobs = globalBatchJobsMetrics.report(opts.jobID)
}
// Add types...
// ByHost is a shallow reference, so careful about sharing.
@@ -143,11 +153,11 @@ func collectLocalDisksMetrics(disks map[string]struct{}) map[string]madmin.DiskM
return metrics
}
func collectRemoteMetrics(ctx context.Context, types madmin.MetricType, hosts map[string]struct{}, disks map[string]struct{}) (m madmin.RealtimeMetrics) {
func collectRemoteMetrics(ctx context.Context, types madmin.MetricType, opts collectMetricsOpts) (m madmin.RealtimeMetrics) {
if !globalIsDistErasure {
return
}
all := globalNotificationSys.GetMetrics(ctx, types, hosts, disks)
all := globalNotificationSys.GetMetrics(ctx, types, opts)
for _, remote := range all {
m.Merge(&remote)
}

View File

@@ -731,7 +731,7 @@ func (sys *NotificationSys) GetOSInfo(ctx context.Context) []madmin.OSInfo {
}
// GetMetrics - Get metrics from all peers.
func (sys *NotificationSys) GetMetrics(ctx context.Context, t madmin.MetricType, hosts map[string]struct{}, disks map[string]struct{}) []madmin.RealtimeMetrics {
func (sys *NotificationSys) GetMetrics(ctx context.Context, t madmin.MetricType, opts collectMetricsOpts) []madmin.RealtimeMetrics {
reply := make([]madmin.RealtimeMetrics, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
@@ -740,8 +740,8 @@ func (sys *NotificationSys) GetMetrics(ctx context.Context, t madmin.MetricType,
continue
}
host := client.host.String()
if len(hosts) > 0 {
if _, ok := hosts[host]; !ok {
if len(opts.hosts) > 0 {
if _, ok := opts.hosts[host]; !ok {
continue
}
}
@@ -749,7 +749,7 @@ func (sys *NotificationSys) GetMetrics(ctx context.Context, t madmin.MetricType,
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].GetMetrics(ctx, t, disks)
reply[index], err = sys.peerClients[index].GetMetrics(ctx, t, opts)
return err
}, index)
}

View File

@@ -85,6 +85,8 @@ type ObjectOptions struct {
// mainly set for certain WRITE operations.
SkipDecommissioned bool
WalkFilter func(info FileInfo) bool // return WalkFilter returns 'true/false'
WalkMarker string // set to skip until this object
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
// IndexCB will return any index created but the compression.

View File

@@ -195,12 +195,13 @@ func (client *peerRESTClient) GetMemInfo(ctx context.Context) (info madmin.MemIn
}
// GetMetrics - fetch metrics from a remote node.
func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricType, diskMap map[string]struct{}) (info madmin.RealtimeMetrics, err error) {
func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricType, opts collectMetricsOpts) (info madmin.RealtimeMetrics, err error) {
values := make(url.Values)
values.Set(peerRESTTypes, strconv.FormatUint(uint64(t), 10))
for disk := range diskMap {
values.Set(peerRESTMetricsTypes, strconv.FormatUint(uint64(t), 10))
for disk := range opts.disks {
values.Set(peerRESTDisk, disk)
}
values.Set(peerRESTJobID, opts.jobID)
respBody, err := client.callWithContext(ctx, peerRESTMethodMetrics, values, nil, -1)
if err != nil {
return

View File

@@ -91,8 +91,9 @@ const (
peerRESTConcurrent = "concurrent"
peerRESTDuration = "duration"
peerRESTStorageClass = "storage-class"
peerRESTTypes = "types"
peerRESTMetricsTypes = "types"
peerRESTDisk = "disk"
peerRESTJobID = "job-id"
peerRESTListenBucket = "bucket"
peerRESTListenPrefix = "prefix"

View File

@@ -425,23 +425,25 @@ func (s *peerRESTServer) GetMetricsHandler(w http.ResponseWriter, r *http.Reques
}
var types madmin.MetricType
if t, _ := strconv.ParseUint(r.Form.Get(peerRESTTypes), 10, 64); t != 0 {
if t, _ := strconv.ParseUint(r.Form.Get(peerRESTMetricsTypes), 10, 64); t != 0 {
types = madmin.MetricType(t)
} else {
types = madmin.MetricsAll
}
diskMap := make(map[string]struct{})
if r.Form != nil {
for _, disk := range r.Form[peerRESTDisk] {
diskMap[disk] = struct{}{}
}
for _, disk := range r.Form[peerRESTDisk] {
diskMap[disk] = struct{}{}
}
jobID := r.Form.Get(peerRESTJobID)
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
info := collectLocalMetrics(types, nil, diskMap)
info := collectLocalMetrics(types, collectMetricsOpts{
disks: diskMap,
jobID: jobID,
})
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
@@ -1308,7 +1310,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerInfo).HandlerFunc(httpTraceHdrs(server.ServerInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodProcInfo).HandlerFunc(httpTraceHdrs(server.GetProcInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodMemInfo).HandlerFunc(httpTraceHdrs(server.GetMemInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodMetrics).HandlerFunc(httpTraceHdrs(server.GetMetricsHandler)).Queries(restQueries(peerRESTTypes)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodMetrics).HandlerFunc(httpTraceHdrs(server.GetMetricsHandler)).Queries(restQueries(peerRESTMetricsTypes)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSysErrors).HandlerFunc(httpTraceHdrs(server.GetSysErrorsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSysServices).HandlerFunc(httpTraceHdrs(server.GetSysServicesHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSysConfig).HandlerFunc(httpTraceHdrs(server.GetSysConfigHandler))

View File

@@ -601,6 +601,8 @@ func serverMain(ctx *cli.Context) {
initBackgroundReplication(GlobalContext, newObject)
initBackgroundTransition(GlobalContext, newObject)
globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100)
go func() {
err := globalTierConfigMgr.Init(GlobalContext, newObject)
if err != nil {

View File

@@ -45,8 +45,8 @@ func oldLinux() bool {
func setMaxResources() (err error) {
// Set the Go runtime max threads threshold to 90% of kernel setting.
sysMaxThreads, mErr := sys.GetMaxThreads()
if mErr == nil {
sysMaxThreads, err := sys.GetMaxThreads()
if err == nil {
minioMaxThreads := (sysMaxThreads * 90) / 100
// Only set max threads if it is greater than the default one
if minioMaxThreads > 10000 {