log: Add logger.Event to send to console and other logger targets (#19060)

Add a new function logger.Event() to send the log to Console and
http/kafka log webhooks. This will include some internal events such as
disk healing and rebalance/decommissioning
This commit is contained in:
Anis Eleuch 2024-02-16 00:13:30 +01:00 committed by GitHub
parent f9dbf41e27
commit 68dde2359f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 71 additions and 39 deletions

View File

@ -713,7 +713,7 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
select {
case globalBackgroundHealRoutine.tasks <- task:
if serverDebugLog {
logger.Info("Task in the queue: %#v", task)
fmt.Printf("Task in the queue: %#v\n", task)
}
default:
// task queue is full, no more workers, we shall move on and heal later.
@ -730,7 +730,7 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
select {
case globalBackgroundHealRoutine.tasks <- task:
if serverDebugLog {
logger.Info("Task in the queue: %#v", task)
fmt.Printf("Task in the queue: %#v\n", task)
}
case <-h.ctx.Done():
return nil

View File

@ -891,7 +891,7 @@ func writeResponse(w http.ResponseWriter, statusCode int, response []byte, mType
}
// Similar check to http.checkWriteHeaderCode
if statusCode < 100 || statusCode > 999 {
logger.Error(fmt.Sprintf("invalid WriteHeader code %v", statusCode))
logger.LogIf(context.Background(), fmt.Errorf("invalid WriteHeader code %v", statusCode))
statusCode = http.StatusInternalServerError
}
setCommonHeaders(w)
@ -961,7 +961,7 @@ func writeErrorResponse(ctx context.Context, w http.ResponseWriter, err APIError
// Similar check to http.checkWriteHeaderCode
if err.HTTPStatusCode < 100 || err.HTTPStatusCode > 999 {
logger.Error(fmt.Sprintf("invalid WriteHeader code %v from %v", err.HTTPStatusCode, err.Code))
logger.LogIf(ctx, fmt.Errorf("invalid WriteHeader code %v from %v", err.HTTPStatusCode, err.Code))
err.HTTPStatusCode = http.StatusInternalServerError
}

View File

@ -420,7 +420,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
tracker = initHealingTracker(disk, mustGetUUID())
}
logger.Info(fmt.Sprintf("Healing drive '%s' - 'mc admin heal alias/ --verbose' to check the current status.", endpoint))
logger.Event(ctx, "Healing drive '%s' - 'mc admin heal alias/ --verbose' to check the current status.", endpoint)
buckets, _ := z.ListBuckets(ctx, BucketOptions{})
// Buckets data are dispersed in multiple pools/sets, make
@ -440,10 +440,6 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
return buckets[i].Created.After(buckets[j].Created)
})
if serverDebugLog {
logger.Info("Healing drive '%v' on %s pool, belonging to %s erasure set", disk, humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
}
// Load bucket totals
cache := dataUsageCache{}
if err := cache.load(ctx, z.serverPools[poolIdx].sets[setIdx], dataUsageCacheName); err == nil {
@ -464,9 +460,9 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
}
if tracker.ItemsFailed > 0 {
logger.Info("Healing of drive '%s' failed (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
logger.Event(ctx, "Healing of drive '%s' failed (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
} else {
logger.Info("Healing of drive '%s' complete (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
logger.Event(ctx, "Healing of drive '%s' complete (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
}
if len(tracker.QueuedBuckets) > 0 {
@ -475,7 +471,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
if serverDebugLog {
tracker.printTo(os.Stdout)
logger.Info("\n")
fmt.Printf("\n")
}
if tracker.HealID == "" { // HealID was empty only before Feb 2023

View File

@ -1167,7 +1167,7 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in
z.poolMetaMutex.Unlock()
if !failed {
logger.Info("Decommissioning complete for pool '%s', verifying for any pending objects", poolCmdLine)
logger.Event(dctx, "Decommissioning complete for pool '%s', verifying for any pending objects", poolCmdLine)
err := z.checkAfterDecom(dctx, idx)
if err != nil {
logger.LogIf(ctx, err)

View File

@ -432,6 +432,8 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
}
}()
logger.Event(ctx, "Pool %d rebalancing is started", poolIdx+1)
for {
select {
case <-ctx.Done():
@ -456,6 +458,8 @@ func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int)
z.bucketRebalanceDone(bucket, poolIdx)
}
logger.Event(ctx, "Pool %d rebalancing is done", poolIdx+1)
return err
}

View File

@ -35,7 +35,7 @@ type minioLogger struct{}
// Print implement Logger
func (log *minioLogger) Print(sessionID string, message interface{}) {
if serverDebugLog {
logger.Info("%s %s", sessionID, message)
fmt.Printf("%s %s\n", sessionID, message)
}
}
@ -43,9 +43,9 @@ func (log *minioLogger) Print(sessionID string, message interface{}) {
func (log *minioLogger) Printf(sessionID string, format string, v ...interface{}) {
if serverDebugLog {
if sessionID != "" {
logger.Info("%s %s", sessionID, fmt.Sprintf(format, v...))
fmt.Printf("%s %s\n", sessionID, fmt.Sprintf(format, v...))
} else {
logger.Info(format, v...)
fmt.Printf(format+"\n", v...)
}
}
}
@ -54,9 +54,9 @@ func (log *minioLogger) Printf(sessionID string, format string, v ...interface{}
func (log *minioLogger) PrintCommand(sessionID string, command string, params string) {
if serverDebugLog {
if command == "PASS" {
logger.Info("%s > PASS ****", sessionID)
fmt.Printf("%s > PASS ****\n", sessionID)
} else {
logger.Info("%s > %s %s", sessionID, command, params)
fmt.Printf("%s > %s %s\n", sessionID, command, params)
}
}
}
@ -64,7 +64,7 @@ func (log *minioLogger) PrintCommand(sessionID string, command string, params st
// PrintResponse implement Logger
func (log *minioLogger) PrintResponse(sessionID string, code int, message string) {
if serverDebugLog {
logger.Info("%s < %d %s", sessionID, code, message)
fmt.Printf("%s < %d %s\n", sessionID, code, message)
}
}

View File

@ -177,7 +177,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
numHealers = uint64(v)
}
logger.Info(fmt.Sprintf("Healing drive '%s' - use %d parallel workers.", tracker.disk.String(), numHealers))
logger.Event(ctx, fmt.Sprintf("Healing drive '%s' - use %d parallel workers.", tracker.disk.String(), numHealers))
jt, _ := workers.New(int(numHealers))

View File

@ -478,7 +478,7 @@ func bootstrapTraceMsg(msg string) {
globalBootstrapTracer.Record(info)
if serverDebugLog {
logger.Info(fmt.Sprint(time.Now().Round(time.Millisecond).Format(time.RFC3339), " bootstrap: ", msg))
fmt.Println(time.Now().Round(time.Millisecond).Format(time.RFC3339), " bootstrap: ", msg)
}
noSubs := globalTrace.NumSubscribers(madmin.TraceBootstrap) == 0
@ -491,7 +491,7 @@ func bootstrapTraceMsg(msg string) {
func bootstrapTrace(msg string, worker func()) {
if serverDebugLog {
logger.Info(fmt.Sprint(time.Now().Round(time.Millisecond).Format(time.RFC3339), " bootstrap: ", msg))
fmt.Println(time.Now().Round(time.Millisecond).Format(time.RFC3339), " bootstrap: ", msg)
}
now := time.Now()
@ -1031,8 +1031,8 @@ func serverMain(ctx *cli.Context) {
globalMinioClient.SetAppInfo("minio-perf-test", ReleaseTag)
if serverDebugLog {
logger.Info("== DEBUG Mode enabled ==")
logger.Info("Currently set environment settings:")
fmt.Println("== DEBUG Mode enabled ==")
fmt.Println("Currently set environment settings:")
ks := []string{
config.EnvAccessKey,
config.EnvSecretKey,
@ -1044,9 +1044,9 @@ func serverMain(ctx *cli.Context) {
if slices.Contains(ks, strings.Split(v, "=")[0]) {
continue
}
logger.Info(v)
fmt.Println(v)
}
logger.Info("======")
fmt.Println("======")
}
daemon.SdNotify(false, daemon.SdNotifyReady)

View File

@ -912,7 +912,7 @@ func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration, fn string)
})
if err == nil {
logger.Info("node(%s): Read/Write/Delete successful, bringing drive %s online", globalLocalNodeName, p.storage.String())
logger.Event(context.Background(), "node(%s): Read/Write/Delete successful, bringing drive %s online", globalLocalNodeName, p.storage.String())
p.health.status.Store(diskHealthOK)
p.health.waiting.Add(-1)
return

View File

@ -258,6 +258,20 @@ func LogIfNot(ctx context.Context, err error, ignored ...error) {
}
func errToEntry(ctx context.Context, err error, errKind ...interface{}) log.Entry {
var l string
if anonFlag {
l = reflect.TypeOf(err).String()
} else {
l = fmt.Sprintf("%v (%T)", err, err)
}
return buildLogEntry(ctx, l, getTrace(3), errKind...)
}
func logToEntry(ctx context.Context, message string, errKind ...interface{}) log.Entry {
return buildLogEntry(ctx, message, nil, errKind...)
}
func buildLogEntry(ctx context.Context, message string, trace []string, errKind ...interface{}) log.Entry {
logKind := madmin.LogKindError
if len(errKind) > 0 {
if ek, ok := errKind[0].(madmin.LogKind); ok {
@ -266,7 +280,6 @@ func errToEntry(ctx context.Context, err error, errKind ...interface{}) log.Entr
}
req := GetReqInfo(ctx)
if req == nil {
req = &ReqInfo{
API: "SYSTEM",
@ -287,11 +300,7 @@ func errToEntry(ctx context.Context, err error, errKind ...interface{}) log.Entr
tags[entry.Key] = entry.Val
}
// Get full stack trace
trace := getTrace(3)
// Get the cause for the Error
message := fmt.Sprintf("%v (%T)", err, err)
deploymentID := req.DeploymentID
if req.DeploymentID == "" {
deploymentID = xhttp.GlobalDeploymentID
@ -322,18 +331,22 @@ func errToEntry(ctx context.Context, err error, errKind ...interface{}) log.Entr
Objects: objects,
},
},
Trace: &log.Trace{
}
if trace != nil {
entry.Trace = &log.Trace{
Message: message,
Source: trace,
Variables: tags,
},
}
} else {
entry.Message = message
}
if anonFlag {
entry.API.Args.Bucket = HashString(entry.API.Args.Bucket)
entry.API.Args.Object = HashString(entry.API.Args.Object)
entry.RemoteHost = HashString(entry.RemoteHost)
entry.Trace.Message = reflect.TypeOf(err).String()
entry.Trace.Variables = make(map[string]interface{})
}
@ -346,7 +359,6 @@ func consoleLogIf(ctx context.Context, err error, errKind ...interface{}) {
if DisableErrorLog {
return
}
if consoleTgt != nil {
entry := errToEntry(ctx, err, errKind...)
consoleTgt.Send(ctx, entry)
@ -359,17 +371,23 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) {
if DisableErrorLog {
return
}
if err == nil {
return
}
entry := errToEntry(ctx, err, errKind...)
sendLog(ctx, entry)
}
func sendLog(ctx context.Context, entry log.Entry) {
systemTgts := SystemTargets()
if len(systemTgts) == 0 {
return
}
entry := errToEntry(ctx, err, errKind...)
// Iterate over all logger targets to send the log entry
for _, t := range systemTgts {
if err := t.Send(ctx, entry); err != nil {
if consoleTgt != nil {
if consoleTgt != nil { // Sending to the console never fails
entry.Trace.Message = fmt.Sprintf("event(%#v) was not sent to Logger target (%#v): %#v", entry, t, err)
consoleTgt.Send(ctx, entry)
}
@ -377,6 +395,15 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) {
}
}
// Event sends a event log to log targets
func Event(ctx context.Context, msg string, args ...interface{}) {
if DisableErrorLog {
return
}
entry := logToEntry(ctx, fmt.Sprintf(msg, args...), EventKind)
sendLog(ctx, entry)
}
// ErrCritical is the value panic'd whenever CriticalIf is called.
var ErrCritical struct{}

View File

@ -62,6 +62,11 @@ func (c *Target) Send(e interface{}) error {
return nil
}
if entry.Level == logger.EventKind {
fmt.Println(entry.Message)
return nil
}
traceLength := len(entry.Trace.Source)
trace := make([]string, traceLength)

View File

@ -469,7 +469,7 @@ func (c *Client) MarkOffline(err error) bool {
if atomic.CompareAndSwapInt32(&c.connected, offline, online) {
now := time.Now()
disconnected := now.Sub(c.LastConn())
logger.Info("Client '%s' re-connected in %s", c.url.String(), disconnected)
logger.Event(context.Background(), "Client '%s' re-connected in %s", c.url.String(), disconnected)
atomic.StoreInt64(&c.lastConn, now.UnixNano())
}
return