mirror of
https://github.com/minio/minio.git
synced 2025-07-08 08:32:18 -04:00
Add trace sizes to more trace types (#19864)
Add trace sizes to * ILM traces * Replication traces * Healing traces * Decommission traces * Rebalance traces * (s)ftp traces * http traces.
This commit is contained in:
parent
3ba857dfa1
commit
0a63dc199c
@ -72,6 +72,7 @@ func NewLifecycleSys() *LifecycleSys {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ilmTrace(startTime time.Time, duration time.Duration, oi ObjectInfo, event string) madmin.TraceInfo {
|
func ilmTrace(startTime time.Time, duration time.Duration, oi ObjectInfo, event string) madmin.TraceInfo {
|
||||||
|
sz, _ := oi.GetActualSize()
|
||||||
return madmin.TraceInfo{
|
return madmin.TraceInfo{
|
||||||
TraceType: madmin.TraceILM,
|
TraceType: madmin.TraceILM,
|
||||||
Time: startTime,
|
Time: startTime,
|
||||||
@ -79,6 +80,7 @@ func ilmTrace(startTime time.Time, duration time.Duration, oi ObjectInfo, event
|
|||||||
FuncName: event,
|
FuncName: event,
|
||||||
Duration: duration,
|
Duration: duration,
|
||||||
Path: pathJoin(oi.Bucket, oi.Name),
|
Path: pathJoin(oi.Bucket, oi.Name),
|
||||||
|
Bytes: sz,
|
||||||
Error: "",
|
Error: "",
|
||||||
Message: getSource(4),
|
Message: getSource(4),
|
||||||
Custom: map[string]string{"version-id": oi.VersionID},
|
Custom: map[string]string{"version-id": oi.VersionID},
|
||||||
|
@ -2849,17 +2849,19 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
|
|||||||
ReplicationProxyRequest: "false",
|
ReplicationProxyRequest: "false",
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
sz := roi.Size
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
|
if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
|
||||||
st.ReplicatedCount++
|
st.ReplicatedCount++
|
||||||
} else {
|
} else {
|
||||||
st.FailedCount++
|
st.FailedCount++
|
||||||
}
|
}
|
||||||
|
sz = 0
|
||||||
} else {
|
} else {
|
||||||
st.ReplicatedCount++
|
st.ReplicatedCount++
|
||||||
st.ReplicatedSize += roi.Size
|
st.ReplicatedSize += roi.Size
|
||||||
}
|
}
|
||||||
traceFn(err)
|
traceFn(sz, err)
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
@ -2974,17 +2976,17 @@ func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opt
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *replicationResyncer) trace(resyncID string, path string) func(err error) {
|
func (s *replicationResyncer) trace(resyncID string, path string) func(sz int64, err error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
return func(err error) {
|
return func(sz int64, err error) {
|
||||||
duration := time.Since(startTime)
|
duration := time.Since(startTime)
|
||||||
if globalTrace.NumSubscribers(madmin.TraceReplicationResync) > 0 {
|
if globalTrace.NumSubscribers(madmin.TraceReplicationResync) > 0 {
|
||||||
globalTrace.Publish(replicationResyncTrace(resyncID, startTime, duration, path, err))
|
globalTrace.Publish(replicationResyncTrace(resyncID, startTime, duration, path, err, sz))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func replicationResyncTrace(resyncID string, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo {
|
func replicationResyncTrace(resyncID string, startTime time.Time, duration time.Duration, path string, err error, sz int64) madmin.TraceInfo {
|
||||||
var errStr string
|
var errStr string
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errStr = err.Error()
|
errStr = err.Error()
|
||||||
@ -2998,6 +3000,7 @@ func replicationResyncTrace(resyncID string, startTime time.Time, duration time.
|
|||||||
Duration: duration,
|
Duration: duration,
|
||||||
Path: path,
|
Path: path,
|
||||||
Error: errStr,
|
Error: errStr,
|
||||||
|
Bytes: sz,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1065,6 +1065,7 @@ func healTrace(funcName healingMetric, startTime time.Time, bucket, object strin
|
|||||||
if result != nil {
|
if result != nil {
|
||||||
tr.Custom["version-id"] = result.VersionID
|
tr.Custom["version-id"] = result.VersionID
|
||||||
tr.Custom["disks"] = strconv.Itoa(result.DiskCount)
|
tr.Custom["disks"] = strconv.Itoa(result.DiskCount)
|
||||||
|
tr.Bytes = result.ObjectSize
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -836,7 +836,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
if filterLifecycle(bi.Name, version.Name, version) {
|
if filterLifecycle(bi.Name, version.Name, version) {
|
||||||
expired++
|
expired++
|
||||||
decommissioned++
|
decommissioned++
|
||||||
stopFn(errors.New("ILM expired object/version will be skipped"))
|
stopFn(version.Size, errors.New("ILM expired object/version will be skipped"))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -846,7 +846,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
remainingVersions := len(fivs.Versions) - expired
|
remainingVersions := len(fivs.Versions) - expired
|
||||||
if version.Deleted && remainingVersions == 1 {
|
if version.Deleted && remainingVersions == 1 {
|
||||||
decommissioned++
|
decommissioned++
|
||||||
stopFn(errors.New("DELETE marked object with no other non-current versions will be skipped"))
|
stopFn(version.Size, errors.New("DELETE marked object with no other non-current versions will be skipped"))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -877,7 +877,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stopFn(err)
|
stopFn(version.Size, err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decomLogIf(ctx, err)
|
decomLogIf(ctx, err)
|
||||||
failure = true
|
failure = true
|
||||||
@ -902,12 +902,12 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
MTime: version.ModTime,
|
MTime: version.ModTime,
|
||||||
UserDefined: version.Metadata,
|
UserDefined: version.Metadata,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
stopFn(err)
|
stopFn(version.Size, err)
|
||||||
failure = true
|
failure = true
|
||||||
decomLogIf(ctx, err)
|
decomLogIf(ctx, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
stopFn(nil)
|
stopFn(version.Size, nil)
|
||||||
failure = false
|
failure = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -925,14 +925,14 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||||
// object deleted by the application, nothing to do here we move on.
|
// object deleted by the application, nothing to do here we move on.
|
||||||
ignore = true
|
ignore = true
|
||||||
stopFn(nil)
|
stopFn(version.Size, nil)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err != nil && !ignore {
|
if err != nil && !ignore {
|
||||||
// if usage-cache.bin is not readable log and ignore it.
|
// if usage-cache.bin is not readable log and ignore it.
|
||||||
if bi.Name == minioMetaBucket && strings.Contains(version.Name, dataUsageCacheName) {
|
if bi.Name == minioMetaBucket && strings.Contains(version.Name, dataUsageCacheName) {
|
||||||
ignore = true
|
ignore = true
|
||||||
stopFn(err)
|
stopFn(version.Size, err)
|
||||||
decomLogIf(ctx, err)
|
decomLogIf(ctx, err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -940,16 +940,16 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
failure = true
|
failure = true
|
||||||
decomLogIf(ctx, err)
|
decomLogIf(ctx, err)
|
||||||
stopFn(err)
|
stopFn(version.Size, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err = z.decommissionObject(ctx, bi.Name, gr); err != nil {
|
if err = z.decommissionObject(ctx, bi.Name, gr); err != nil {
|
||||||
stopFn(err)
|
stopFn(version.Size, err)
|
||||||
failure = true
|
failure = true
|
||||||
decomLogIf(ctx, err)
|
decomLogIf(ctx, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
stopFn(nil)
|
stopFn(version.Size, nil)
|
||||||
failure = false
|
failure = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -977,7 +977,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
NoAuditLog: true,
|
NoAuditLog: true,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
stopFn(err)
|
stopFn(0, err)
|
||||||
auditLogDecom(ctx, "DecomDeleteObject", bi.Name, entry.name, "", err)
|
auditLogDecom(ctx, "DecomDeleteObject", bi.Name, entry.name, "", err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decomLogIf(ctx, err)
|
decomLogIf(ctx, err)
|
||||||
@ -1038,7 +1038,7 @@ const (
|
|||||||
decomMetricDecommissionRemoveObject
|
decomMetricDecommissionRemoveObject
|
||||||
)
|
)
|
||||||
|
|
||||||
func decomTrace(d decomMetric, poolIdx int, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo {
|
func decomTrace(d decomMetric, poolIdx int, startTime time.Time, duration time.Duration, path string, err error, sz int64) madmin.TraceInfo {
|
||||||
var errStr string
|
var errStr string
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errStr = err.Error()
|
errStr = err.Error()
|
||||||
@ -1051,15 +1051,16 @@ func decomTrace(d decomMetric, poolIdx int, startTime time.Time, duration time.D
|
|||||||
Duration: duration,
|
Duration: duration,
|
||||||
Path: path,
|
Path: path,
|
||||||
Error: errStr,
|
Error: errStr,
|
||||||
|
Bytes: sz,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *decomMetrics) log(d decomMetric, poolIdx int, paths ...string) func(err error) {
|
func (m *decomMetrics) log(d decomMetric, poolIdx int, paths ...string) func(z int64, err error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
return func(err error) {
|
return func(sz int64, err error) {
|
||||||
duration := time.Since(startTime)
|
duration := time.Since(startTime)
|
||||||
if globalTrace.NumSubscribers(madmin.TraceDecommission) > 0 {
|
if globalTrace.NumSubscribers(madmin.TraceDecommission) > 0 {
|
||||||
globalTrace.Publish(decomTrace(d, poolIdx, startTime, duration, strings.Join(paths, " "), err))
|
globalTrace.Publish(decomTrace(d, poolIdx, startTime, duration, strings.Join(paths, " "), err, sz))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1092,10 +1093,10 @@ func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx i
|
|||||||
}
|
}
|
||||||
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionBucket, idx, bucket.Name)
|
stopFn := globalDecommissionMetrics.log(decomMetricDecommissionBucket, idx, bucket.Name)
|
||||||
if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
|
if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
|
||||||
stopFn(err)
|
stopFn(0, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
stopFn(nil)
|
stopFn(0, nil)
|
||||||
|
|
||||||
z.poolMetaMutex.Lock()
|
z.poolMetaMutex.Lock()
|
||||||
if z.poolMeta.BucketDone(idx, bucket) {
|
if z.poolMeta.BucketDone(idx, bucket) {
|
||||||
|
@ -427,7 +427,7 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
|
|||||||
|
|
||||||
stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg)
|
||||||
err := z.saveRebalanceStats(GlobalContext, poolIdx, rebalSaveStats)
|
err := z.saveRebalanceStats(GlobalContext, poolIdx, rebalSaveStats)
|
||||||
stopFn(err)
|
stopFn(0, err)
|
||||||
rebalanceLogIf(GlobalContext, err)
|
rebalanceLogIf(GlobalContext, err)
|
||||||
|
|
||||||
if quit {
|
if quit {
|
||||||
@ -456,7 +456,7 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
|
|||||||
|
|
||||||
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
|
stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket)
|
||||||
if err = z.rebalanceBucket(ctx, bucket, poolIdx); err != nil {
|
if err = z.rebalanceBucket(ctx, bucket, poolIdx); err != nil {
|
||||||
stopFn(err)
|
stopFn(0, err)
|
||||||
if errors.Is(err, errServerNotInitialized) || errors.Is(err, errBucketMetadataNotInitialized) {
|
if errors.Is(err, errServerNotInitialized) || errors.Is(err, errBucketMetadataNotInitialized) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -464,7 +464,7 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
|
|||||||
doneCh <- err
|
doneCh <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
stopFn(nil)
|
stopFn(0, nil)
|
||||||
z.bucketRebalanceDone(bucket, poolIdx)
|
z.bucketRebalanceDone(bucket, poolIdx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -692,24 +692,24 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
|
|||||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||||
// object deleted by the application, nothing to do here we move on.
|
// object deleted by the application, nothing to do here we move on.
|
||||||
ignore = true
|
ignore = true
|
||||||
stopFn(nil)
|
stopFn(0, nil)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failure = true
|
failure = true
|
||||||
rebalanceLogIf(ctx, err)
|
rebalanceLogIf(ctx, err)
|
||||||
stopFn(err)
|
stopFn(0, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = z.rebalanceObject(ctx, bucket, gr); err != nil {
|
if err = z.rebalanceObject(ctx, bucket, gr); err != nil {
|
||||||
failure = true
|
failure = true
|
||||||
rebalanceLogIf(ctx, err)
|
rebalanceLogIf(ctx, err)
|
||||||
stopFn(err)
|
stopFn(version.Size, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
stopFn(nil)
|
stopFn(version.Size, nil)
|
||||||
failure = false
|
failure = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -735,7 +735,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
|
|||||||
NoAuditLog: true,
|
NoAuditLog: true,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
stopFn(err)
|
stopFn(0, err)
|
||||||
auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
|
auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rebalanceLogIf(ctx, err)
|
rebalanceLogIf(ctx, err)
|
||||||
@ -935,7 +935,7 @@ func (z *erasureServerPools) StartRebalance() {
|
|||||||
go func(idx int) {
|
go func(idx int) {
|
||||||
stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
|
stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx)
|
||||||
err := z.rebalanceBuckets(ctx, idx)
|
err := z.rebalanceBuckets(ctx, idx)
|
||||||
stopfn(err)
|
stopfn(0, err)
|
||||||
}(poolIdx)
|
}(poolIdx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -975,7 +975,7 @@ const (
|
|||||||
rebalanceMetricSaveMetadata
|
rebalanceMetricSaveMetadata
|
||||||
)
|
)
|
||||||
|
|
||||||
func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo {
|
func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string, sz int64) madmin.TraceInfo {
|
||||||
var errStr string
|
var errStr string
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errStr = err.Error()
|
errStr = err.Error()
|
||||||
@ -988,15 +988,16 @@ func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duratio
|
|||||||
Duration: duration,
|
Duration: duration,
|
||||||
Path: path,
|
Path: path,
|
||||||
Error: errStr,
|
Error: errStr,
|
||||||
|
Bytes: sz,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) {
|
func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(sz int64, err error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
return func(err error) {
|
return func(sz int64, err error) {
|
||||||
duration := time.Since(startTime)
|
duration := time.Since(startTime)
|
||||||
if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
|
if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 {
|
||||||
globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " ")))
|
globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " "), sz))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -105,7 +105,7 @@ type ftpMetrics struct{}
|
|||||||
|
|
||||||
var globalFtpMetrics ftpMetrics
|
var globalFtpMetrics ftpMetrics
|
||||||
|
|
||||||
func ftpTrace(s *ftp.Context, startTime time.Time, source, objPath string, err error) madmin.TraceInfo {
|
func ftpTrace(s *ftp.Context, startTime time.Time, source, objPath string, err error, sz int64) madmin.TraceInfo {
|
||||||
var errStr string
|
var errStr string
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errStr = err.Error()
|
errStr = err.Error()
|
||||||
@ -114,25 +114,33 @@ func ftpTrace(s *ftp.Context, startTime time.Time, source, objPath string, err e
|
|||||||
TraceType: madmin.TraceFTP,
|
TraceType: madmin.TraceFTP,
|
||||||
Time: startTime,
|
Time: startTime,
|
||||||
NodeName: globalLocalNodeName,
|
NodeName: globalLocalNodeName,
|
||||||
FuncName: fmt.Sprintf("ftp USER=%s COMMAND=%s PARAM=%s ISLOGIN=%t, Source=%s", s.Sess.LoginUser(), s.Cmd, s.Param, s.Sess.IsLogin(), source),
|
FuncName: fmt.Sprintf(s.Cmd),
|
||||||
Duration: time.Since(startTime),
|
Duration: time.Since(startTime),
|
||||||
Path: objPath,
|
Path: objPath,
|
||||||
Error: errStr,
|
Error: errStr,
|
||||||
|
Bytes: sz,
|
||||||
|
Custom: map[string]string{
|
||||||
|
"user": s.Sess.LoginUser(),
|
||||||
|
"cmd": s.Cmd,
|
||||||
|
"param": s.Param,
|
||||||
|
"login": fmt.Sprintf("%t", s.Sess.IsLogin()),
|
||||||
|
"source": source,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *ftpMetrics) log(s *ftp.Context, paths ...string) func(err error) {
|
func (m *ftpMetrics) log(s *ftp.Context, paths ...string) func(sz int64, err error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
source := getSource(2)
|
source := getSource(2)
|
||||||
return func(err error) {
|
return func(sz int64, err error) {
|
||||||
globalTrace.Publish(ftpTrace(s, startTime, source, strings.Join(paths, " "), err))
|
globalTrace.Publish(ftpTrace(s, startTime, source, strings.Join(paths, " "), err, sz))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat implements ftpDriver
|
// Stat implements ftpDriver
|
||||||
func (driver *ftpDriver) Stat(ctx *ftp.Context, objPath string) (fi os.FileInfo, err error) {
|
func (driver *ftpDriver) Stat(ctx *ftp.Context, objPath string) (fi os.FileInfo, err error) {
|
||||||
stopFn := globalFtpMetrics.log(ctx, objPath)
|
stopFn := globalFtpMetrics.log(ctx, objPath)
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
if objPath == SlashSeparator {
|
if objPath == SlashSeparator {
|
||||||
return &minioFileInfo{
|
return &minioFileInfo{
|
||||||
@ -190,7 +198,7 @@ func (driver *ftpDriver) Stat(ctx *ftp.Context, objPath string) (fi os.FileInfo,
|
|||||||
// ListDir implements ftpDriver
|
// ListDir implements ftpDriver
|
||||||
func (driver *ftpDriver) ListDir(ctx *ftp.Context, objPath string, callback func(os.FileInfo) error) (err error) {
|
func (driver *ftpDriver) ListDir(ctx *ftp.Context, objPath string, callback func(os.FileInfo) error) (err error) {
|
||||||
stopFn := globalFtpMetrics.log(ctx, objPath)
|
stopFn := globalFtpMetrics.log(ctx, objPath)
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
clnt, err := driver.getMinIOClient(ctx)
|
clnt, err := driver.getMinIOClient(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -252,7 +260,7 @@ func (driver *ftpDriver) ListDir(ctx *ftp.Context, objPath string, callback func
|
|||||||
|
|
||||||
func (driver *ftpDriver) CheckPasswd(c *ftp.Context, username, password string) (ok bool, err error) {
|
func (driver *ftpDriver) CheckPasswd(c *ftp.Context, username, password string) (ok bool, err error) {
|
||||||
stopFn := globalFtpMetrics.log(c, username)
|
stopFn := globalFtpMetrics.log(c, username)
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
if globalIAMSys.LDAPConfig.Enabled() {
|
if globalIAMSys.LDAPConfig.Enabled() {
|
||||||
sa, _, err := globalIAMSys.getServiceAccount(context.Background(), username)
|
sa, _, err := globalIAMSys.getServiceAccount(context.Background(), username)
|
||||||
@ -376,7 +384,7 @@ func (driver *ftpDriver) getMinIOClient(ctx *ftp.Context) (*minio.Client, error)
|
|||||||
// DeleteDir implements ftpDriver
|
// DeleteDir implements ftpDriver
|
||||||
func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, objPath string) (err error) {
|
func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, objPath string) (err error) {
|
||||||
stopFn := globalFtpMetrics.log(ctx, objPath)
|
stopFn := globalFtpMetrics.log(ctx, objPath)
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
bucket, prefix := path2BucketObject(objPath)
|
bucket, prefix := path2BucketObject(objPath)
|
||||||
if bucket == "" {
|
if bucket == "" {
|
||||||
@ -426,7 +434,7 @@ func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, objPath string) (err error)
|
|||||||
// DeleteFile implements ftpDriver
|
// DeleteFile implements ftpDriver
|
||||||
func (driver *ftpDriver) DeleteFile(ctx *ftp.Context, objPath string) (err error) {
|
func (driver *ftpDriver) DeleteFile(ctx *ftp.Context, objPath string) (err error) {
|
||||||
stopFn := globalFtpMetrics.log(ctx, objPath)
|
stopFn := globalFtpMetrics.log(ctx, objPath)
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
bucket, object := path2BucketObject(objPath)
|
bucket, object := path2BucketObject(objPath)
|
||||||
if bucket == "" {
|
if bucket == "" {
|
||||||
@ -444,7 +452,7 @@ func (driver *ftpDriver) DeleteFile(ctx *ftp.Context, objPath string) (err error
|
|||||||
// Rename implements ftpDriver
|
// Rename implements ftpDriver
|
||||||
func (driver *ftpDriver) Rename(ctx *ftp.Context, fromObjPath string, toObjPath string) (err error) {
|
func (driver *ftpDriver) Rename(ctx *ftp.Context, fromObjPath string, toObjPath string) (err error) {
|
||||||
stopFn := globalFtpMetrics.log(ctx, fromObjPath, toObjPath)
|
stopFn := globalFtpMetrics.log(ctx, fromObjPath, toObjPath)
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
return NotImplemented{}
|
return NotImplemented{}
|
||||||
}
|
}
|
||||||
@ -452,7 +460,7 @@ func (driver *ftpDriver) Rename(ctx *ftp.Context, fromObjPath string, toObjPath
|
|||||||
// MakeDir implements ftpDriver
|
// MakeDir implements ftpDriver
|
||||||
func (driver *ftpDriver) MakeDir(ctx *ftp.Context, objPath string) (err error) {
|
func (driver *ftpDriver) MakeDir(ctx *ftp.Context, objPath string) (err error) {
|
||||||
stopFn := globalFtpMetrics.log(ctx, objPath)
|
stopFn := globalFtpMetrics.log(ctx, objPath)
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
bucket, prefix := path2BucketObject(objPath)
|
bucket, prefix := path2BucketObject(objPath)
|
||||||
if bucket == "" {
|
if bucket == "" {
|
||||||
@ -479,7 +487,7 @@ func (driver *ftpDriver) MakeDir(ctx *ftp.Context, objPath string) (err error) {
|
|||||||
// GetFile implements ftpDriver
|
// GetFile implements ftpDriver
|
||||||
func (driver *ftpDriver) GetFile(ctx *ftp.Context, objPath string, offset int64) (n int64, rc io.ReadCloser, err error) {
|
func (driver *ftpDriver) GetFile(ctx *ftp.Context, objPath string, offset int64) (n int64, rc io.ReadCloser, err error) {
|
||||||
stopFn := globalFtpMetrics.log(ctx, objPath)
|
stopFn := globalFtpMetrics.log(ctx, objPath)
|
||||||
defer stopFn(err)
|
defer stopFn(n, err)
|
||||||
|
|
||||||
bucket, object := path2BucketObject(objPath)
|
bucket, object := path2BucketObject(objPath)
|
||||||
if bucket == "" {
|
if bucket == "" {
|
||||||
@ -511,14 +519,14 @@ func (driver *ftpDriver) GetFile(ctx *ftp.Context, objPath string, offset int64)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
n = info.Size - offset
|
||||||
return info.Size - offset, obj, nil
|
return n, obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutFile implements ftpDriver
|
// PutFile implements ftpDriver
|
||||||
func (driver *ftpDriver) PutFile(ctx *ftp.Context, objPath string, data io.Reader, offset int64) (n int64, err error) {
|
func (driver *ftpDriver) PutFile(ctx *ftp.Context, objPath string, data io.Reader, offset int64) (n int64, err error) {
|
||||||
stopFn := globalFtpMetrics.log(ctx, objPath)
|
stopFn := globalFtpMetrics.log(ctx, objPath)
|
||||||
defer stopFn(err)
|
defer stopFn(n, err)
|
||||||
|
|
||||||
bucket, object := path2BucketObject(objPath)
|
bucket, object := path2BucketObject(objPath)
|
||||||
if bucket == "" {
|
if bucket == "" {
|
||||||
@ -539,5 +547,6 @@ func (driver *ftpDriver) PutFile(ctx *ftp.Context, objPath string, data io.Reade
|
|||||||
ContentType: mimedb.TypeByExtension(path.Ext(object)),
|
ContentType: mimedb.TypeByExtension(path.Ext(object)),
|
||||||
DisableContentSha256: true,
|
DisableContentSha256: true,
|
||||||
})
|
})
|
||||||
return info.Size, err
|
n = info.Size
|
||||||
|
return n, err
|
||||||
}
|
}
|
||||||
|
@ -142,6 +142,7 @@ func httpTracerMiddleware(h http.Handler) http.Handler {
|
|||||||
Time: reqStartTime,
|
Time: reqStartTime,
|
||||||
Duration: reqEndTime.Sub(respRecorder.StartTime),
|
Duration: reqEndTime.Sub(respRecorder.StartTime),
|
||||||
Path: reqPath,
|
Path: reqPath,
|
||||||
|
Bytes: int64(inputBytes + respRecorder.Size()),
|
||||||
HTTP: &madmin.TraceHTTPStats{
|
HTTP: &madmin.TraceHTTPStats{
|
||||||
ReqInfo: madmin.TraceRequestInfo{
|
ReqInfo: madmin.TraceRequestInfo{
|
||||||
Time: reqStartTime,
|
Time: reqStartTime,
|
||||||
|
@ -53,7 +53,7 @@ type sftpMetrics struct{}
|
|||||||
|
|
||||||
var globalSftpMetrics sftpMetrics
|
var globalSftpMetrics sftpMetrics
|
||||||
|
|
||||||
func sftpTrace(s *sftp.Request, startTime time.Time, source string, user string, err error) madmin.TraceInfo {
|
func sftpTrace(s *sftp.Request, startTime time.Time, source string, user string, err error, sz int64) madmin.TraceInfo {
|
||||||
var errStr string
|
var errStr string
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errStr = err.Error()
|
errStr = err.Error()
|
||||||
@ -62,18 +62,25 @@ func sftpTrace(s *sftp.Request, startTime time.Time, source string, user string,
|
|||||||
TraceType: madmin.TraceFTP,
|
TraceType: madmin.TraceFTP,
|
||||||
Time: startTime,
|
Time: startTime,
|
||||||
NodeName: globalLocalNodeName,
|
NodeName: globalLocalNodeName,
|
||||||
FuncName: fmt.Sprintf("sftp USER=%s COMMAND=%s PARAM=%s, Source=%s", user, s.Method, s.Filepath, source),
|
FuncName: s.Method,
|
||||||
Duration: time.Since(startTime),
|
Duration: time.Since(startTime),
|
||||||
Path: s.Filepath,
|
Path: s.Filepath,
|
||||||
Error: errStr,
|
Error: errStr,
|
||||||
|
Bytes: sz,
|
||||||
|
Custom: map[string]string{
|
||||||
|
"user": user,
|
||||||
|
"cmd": s.Method,
|
||||||
|
"param": s.Filepath,
|
||||||
|
"source": source,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *sftpMetrics) log(s *sftp.Request, user string) func(err error) {
|
func (m *sftpMetrics) log(s *sftp.Request, user string) func(sz int64, err error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
source := getSource(2)
|
source := getSource(2)
|
||||||
return func(err error) {
|
return func(sz int64, err error) {
|
||||||
globalTrace.Publish(sftpTrace(s, startTime, source, user, err))
|
globalTrace.Publish(sftpTrace(s, startTime, source, user, err, sz))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,8 +201,9 @@ func (f *sftpDriver) AccessKey() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *sftpDriver) Fileread(r *sftp.Request) (ra io.ReaderAt, err error) {
|
func (f *sftpDriver) Fileread(r *sftp.Request) (ra io.ReaderAt, err error) {
|
||||||
|
// This is not timing the actual read operation, but the time it takes to prepare the reader.
|
||||||
stopFn := globalSftpMetrics.log(r, f.AccessKey())
|
stopFn := globalSftpMetrics.log(r, f.AccessKey())
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
flags := r.Pflags()
|
flags := r.Pflags()
|
||||||
if !flags.Read {
|
if !flags.Read {
|
||||||
@ -301,7 +309,12 @@ again:
|
|||||||
|
|
||||||
func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
|
func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
|
||||||
stopFn := globalSftpMetrics.log(r, f.AccessKey())
|
stopFn := globalSftpMetrics.log(r, f.AccessKey())
|
||||||
defer stopFn(err)
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
// If there is an error, we never started the goroutine.
|
||||||
|
stopFn(0, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
flags := r.Pflags()
|
flags := r.Pflags()
|
||||||
if !flags.Write {
|
if !flags.Write {
|
||||||
@ -336,10 +349,11 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
|
|||||||
}
|
}
|
||||||
wa.wg.Add(1)
|
wa.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
_, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{
|
oi, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{
|
||||||
ContentType: mimedb.TypeByExtension(path.Ext(object)),
|
ContentType: mimedb.TypeByExtension(path.Ext(object)),
|
||||||
DisableContentSha256: true,
|
DisableContentSha256: true,
|
||||||
})
|
})
|
||||||
|
stopFn(oi.Size, err)
|
||||||
pr.CloseWithError(err)
|
pr.CloseWithError(err)
|
||||||
wa.wg.Done()
|
wa.wg.Done()
|
||||||
}()
|
}()
|
||||||
@ -348,7 +362,7 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
|
|||||||
|
|
||||||
func (f *sftpDriver) Filecmd(r *sftp.Request) (err error) {
|
func (f *sftpDriver) Filecmd(r *sftp.Request) (err error) {
|
||||||
stopFn := globalSftpMetrics.log(r, f.AccessKey())
|
stopFn := globalSftpMetrics.log(r, f.AccessKey())
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
clnt, err := f.getMinIOClient()
|
clnt, err := f.getMinIOClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -444,7 +458,7 @@ func (f listerAt) ListAt(ls []os.FileInfo, offset int64) (int, error) {
|
|||||||
|
|
||||||
func (f *sftpDriver) Filelist(r *sftp.Request) (la sftp.ListerAt, err error) {
|
func (f *sftpDriver) Filelist(r *sftp.Request) (la sftp.ListerAt, err error) {
|
||||||
stopFn := globalSftpMetrics.log(r, f.AccessKey())
|
stopFn := globalSftpMetrics.log(r, f.AccessKey())
|
||||||
defer stopFn(err)
|
defer stopFn(0, err)
|
||||||
|
|
||||||
clnt, err := f.getMinIOClient()
|
clnt, err := f.getMinIOClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user