var (
    handlers = make(map[types.ProblemDaemonType]types.ProblemDaemonHandler)
)

// Register registers a problem daemon factory method, which will be used to create the problem daemon.
func Register(problemDaemonType types.ProblemDaemonType, handler types.ProblemDaemonHandler) {
    handlers[problemDaemonType] = handler
}

// GetProblemDaemonNames retrieves all available problem daemon types.
func GetProblemDaemonNames() []types.ProblemDaemonType {
    problemDaemonTypes := []types.ProblemDaemonType{}
    for problemDaemonType := range handlers {
        problemDaemonTypes = append(problemDaemonTypes, problemDaemonType)
    }
    return problemDaemonTypes
}
Retrieves all available problem daemon types.
Return value: a slice containing every registered problem daemon type.
Implementation details: iterate over the handlers map and append each registered problem daemon type to the returned slice. (A sketch of how a daemon package registers itself follows.)
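For context, the sketch below shows how a problem daemon package would typically use this registry: it calls Register from an init() function, so importing the package is enough to make the daemon type visible to GetProblemDaemonNames. This is a minimal sketch; the package name, the "foo-monitor" type, and NewFooMonitorOrDie are illustrative and not part of the repository.

// Hypothetical problem daemon package; all names here are illustrative.
package foomonitor

import (
    "k8s.io/node-problem-detector/pkg/problemdaemon"
    "k8s.io/node-problem-detector/pkg/types"
)

const fooMonitorType types.ProblemDaemonType = "foo-monitor"

func init() {
    // Registration happens at import time, so the main binary only needs a
    // blank import of this package for the daemon type to appear in the registry.
    problemdaemon.Register(fooMonitorType, types.ProblemDaemonHandler{
        CreateProblemDaemonOrDie: NewFooMonitorOrDie,
    })
}

// NewFooMonitorOrDie is the factory stored in the handler: it turns a config
// file path into a types.Monitor (construction omitted in this sketch).
func NewFooMonitorOrDie(configPath string) types.Monitor {
    return nil
}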
2.3 GetProblemDaemonHandlerOrDie
pkg/problemdaemon/problem_daemon.go:46
// GetProblemDaemonHandlerOrDie retrieves the ProblemDaemonHandler for a specific type of problem daemon, panic if error occurs.
func GetProblemDaemonHandlerOrDie(problemDaemonType types.ProblemDaemonType) types.ProblemDaemonHandler {
    handler, ok := handlers[problemDaemonType]
    if !ok {
        panic(fmt.Sprintf("Problem daemon handler for %v does not exist", problemDaemonType))
    }
    return handler
}
// NewProblemDaemons creates all problem daemons based on the configurations provided.
func NewProblemDaemons(monitorConfigPaths types.ProblemDaemonConfigPathMap) []types.Monitor {
    problemDaemonMap := make(map[string]types.Monitor)
    for problemDaemonType, configs := range monitorConfigPaths {
        for _, config := range *configs {
            if _, ok := problemDaemonMap[config]; ok {
                // Skip the config if it's duplicated.
                klog.Warningf("Duplicated problem daemon configuration %q", config)
                continue
            }
            problemDaemonMap[config] = handlers[problemDaemonType].CreateProblemDaemonOrDie(config)
        }
    }

    problemDaemons := []types.Monitor{}
    for _, problemDaemon := range problemDaemonMap {
        problemDaemons = append(problemDaemons, problemDaemon)
    }
    return problemDaemons
}
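A hedged usage sketch of NewProblemDaemons follows: the map value is a *[]string (the function dereferences it with *configs), and duplicated config paths are dropped with a warning. The daemon type and file names below are illustrative.

// Illustrative helper; config paths would normally come from command-line flags.
func buildMonitors() []types.Monitor {
    configs := []string{
        "config/kernel-monitor.json",
        "config/kernel-monitor.json", // duplicate entry: logged once, then skipped
    }
    configPaths := types.ProblemDaemonConfigPathMap{
        "foo-monitor": &configs, // key must match a registered types.ProblemDaemonType
    }
    return problemdaemon.NewProblemDaemons(configPaths)
}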
// ProblemDetector collects statuses from all problem daemons and update the node condition and send node event.
type ProblemDetector interface {
    Run(context.Context) error
}

type problemDetector struct {
    monitors  []types.Monitor
    exporters []types.Exporter
}

// NewProblemDetector creates the problem detector. Currently we just directly passed in the problem daemons, but
// in the future we may want to let the problem daemons register themselves.
func NewProblemDetector(monitors []types.Monitor, exporters []types.Exporter) ProblemDetector {
    return &problemDetector{
        monitors:  monitors,
        exporters: exporters,
    }
}
// Run starts the problem detector.
func (p *problemDetector) Run(ctx context.Context) error {
    // Start the log monitors one by one.
    var chans []<-chan *types.Status
    failureCount := 0
    for _, m := range p.monitors {
        ch, err := m.Start()
        if err != nil {
            // Do not return error and keep on trying the following config files.
            klog.Errorf("Failed to start problem daemon %v: %v", m, err)
            failureCount++
            continue
        }
        if ch != nil {
            chans = append(chans, ch)
        }
    }
    allMonitors := p.monitors

    if len(allMonitors) == failureCount {
        return fmt.Errorf("no problem daemon is successfully setup")
    }

    defer func() {
        for _, m := range allMonitors {
            m.Stop()
        }
    }()

    // Merge the status channels of all monitors into a single channel.
    ch := groupChannel(chans)

    for {
        select {
        case <-ctx.Done():
            return nil
        case status := <-ch:
            for _, exporter := range p.exporters {
                exporter.ExportProblems(status)
            }
        }
    }
}
Runs the problem detector's main loop.
Implementation details:
Start the monitors: iterate over p.monitors and start each one.
If a monitor fails to start, log the error and increment the failure count.
If it starts successfully, append the returned status channel to the chans slice.
Check for total failure: if every monitor failed to start, return an error.
Deferred shutdown: stop all monitors when the function returns.
Channel fan-in: call groupChannel to merge the status channels of all monitors into a single channel.
Listen for statuses: use select to keep watching both the context and the merged status channel.
When a status arrives, export the problem through every exporter. (A wiring sketch follows this list.)
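Putting the pieces together, here is a hedged sketch of how a main program might wire monitors and exporters into the detector and run it until cancelled. Exporter construction is elided and the helper name is illustrative; the real binary assembles everything from its flags.

// Illustrative wiring of the detector; not taken from the repository.
func runDetector(ctx context.Context, configPaths types.ProblemDaemonConfigPathMap, exporters []types.Exporter) error {
    monitors := problemdaemon.NewProblemDaemons(configPaths)
    pd := problemdetector.NewProblemDetector(monitors, exporters)
    // Run blocks until the context is cancelled or no daemon could be started.
    return pd.Run(ctx)
}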
3.4 groupChannel
pkg/problemdetector/problem_detector.go:91
func groupChannel(chans []<-chan *types.Status) <-chan *types.Status {
    statuses := make(chan *types.Status)
    for _, ch := range chans {
        go func(c <-chan *types.Status) {
            for status := range c {
                statuses <- status
            }
        }(ch)
    }
    return statuses
}
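groupChannel is a standard fan-in: one forwarding goroutine per input channel, all writing into a single unbuffered output channel. The self-contained sketch below (illustrative, not from the repository) demonstrates the same pattern with plain strings:

// Minimal fan-in demo mirroring groupChannel's structure.
package main

import "fmt"

func merge(chans ...<-chan string) <-chan string {
    out := make(chan string)
    for _, ch := range chans {
        go func(c <-chan string) {
            // Forward everything from one input channel into the shared output.
            for v := range c {
                out <- v
            }
        }(ch)
    }
    return out
}

func main() {
    a, b := make(chan string), make(chan string)
    go func() { a <- "status from monitor A"; close(a) }()
    go func() { b <- "status from monitor B"; close(b) }()

    merged := merge(a, b)
    for i := 0; i < 2; i++ {
        fmt.Println(<-merged)
    }
}

As in groupChannel, the merged channel is never closed and the forwarding goroutines exit only when their source channels close, which matches the detector's run-until-cancelled design.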
// GlobalProblemMetricsManager is a singleton of ProblemMetricsManager,
// which should be used to manage all problem-converted metrics across all
// problem daemons.
var GlobalProblemMetricsManager *ProblemMetricsManager
// NewProblemMetricsManager creates and initializes a ProblemMetricsManager.
func NewProblemMetricsManager() *ProblemMetricsManager {
    pmm := ProblemMetricsManager{}

    var err error
    pmm.problemCounter, err = metrics.NewInt64Metric(
        metrics.ProblemCounterID,
        string(metrics.ProblemCounterID),
        "Number of times a specific type of problem have occurred.",
        "1",
        metrics.Sum,
        []string{"reason"})
    if err != nil {
        klog.Fatalf("Failed to create problem_counter metric: %v", err)
    }

    pmm.problemGauge, err = metrics.NewInt64Metric(
        metrics.ProblemGaugeID,
        string(metrics.ProblemGaugeID),
        "Whether a specific type of problem is affecting the node or not.",
        "1",
        metrics.LastValue,
        []string{"type", "reason"})
    if err != nil {
        klog.Fatalf("Failed to create problem_gauge metric: %v", err)
    }

    pmm.problemTypeToReason = make(map[string]string)

    return &pmm
}
Creates and initializes a ProblemMetricsManager instance.
Implementation details:
Create the problemCounter metric, which counts how many times a specific problem has occurred; it carries a reason label.
Create the problemGauge metric, which indicates whether a specific problem is currently affecting the node; it carries type and reason labels.
If creating either metric fails, the process exits via klog.Fatalf.
Initialize the map from problem type to reason. (A usage sketch follows this list.)
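A hedged sketch of how a problem daemon might report through the global manager, assuming the package's init() has already populated GlobalProblemMetricsManager; the reason and type label values are illustrative.

// Illustrative label values; real daemons derive them from their configuration.
func reportProblem() {
    pmm := problemmetrics.GlobalProblemMetricsManager

    // Count one more occurrence of a transient problem.
    if err := pmm.IncrementProblemCounter("TaskHung", 1); err != nil {
        klog.Errorf("Failed to update problem_counter: %v", err)
    }
    // Mark a permanent problem as currently affecting the node.
    if err := pmm.SetProblemGauge("KernelDeadlock", "DockerHung", true); err != nil {
        klog.Errorf("Failed to update problem_gauge: %v", err)
    }
}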
4.4 IncrementProblemCounter
pkg/problemmetrics/problem_metrics.go:79
// IncrementProblemCounter increments the value of a problem counter.
func (pmm *ProblemMetricsManager) IncrementProblemCounter(reason string, count int64) error {
    if pmm.problemCounter == nil {
        return errors.New("problem counter is being incremented before initialized.")
    }

    return pmm.problemCounter.Record(map[string]string{"reason": reason}, count)
}
// SetProblemGauge sets the value of a problem gauge.
func (pmm *ProblemMetricsManager) SetProblemGauge(problemType string, reason string, value bool) error {
    if pmm.problemGauge == nil {
        return errors.New("problem gauge is being set before initialized.")
    }

    // We clear the last reason, because the expected behavior is that at any point of time,
    // for each type of permanent problem, there should be at most one reason got set to 1.
    // This behavior is consistent with the behavior of node condition in Kubernetes.
    // However, problemGauges with different "type" and "reason" are considered as different
    // metrics in Prometheus. So we need to clear the previous metrics explicitly.
    if lastReason, ok := pmm.problemTypeToReason[problemType]; ok {
        err := pmm.problemGauge.Record(map[string]string{"type": problemType, "reason": lastReason}, 0)
        if err != nil {
            return fmt.Errorf("failed to clear previous reason %q for type %q: %v", problemType, lastReason, err)
        }
    }

    pmm.problemTypeToReason[problemType] = reason

    var valueInt int64
    if value {
        valueInt = 1
    }
    return pmm.problemGauge.Record(map[string]string{"type": problemType, "reason": reason}, valueInt)
}
type healthChecker struct {
    component       string
    service         string
    enableRepair    bool
    healthCheckFunc func() (bool, error)
    // The repair is "best-effort" and ignores the error from the underlying actions.
    // The bash commands to kill the process will fail if the service is down and hence ignore.
    repairFunc         func()
    uptimeFunc         func() (time.Duration, error)
    crictlPath         string
    healthCheckTimeout time.Duration
    coolDownTime       time.Duration
    loopBackTime       time.Duration
    logPatternsToCheck map[string]int
}
// CheckHealth checks for the health of the component and tries to repair if enabled.
// Returns true if healthy, false otherwise.
func (hc *healthChecker) CheckHealth() (bool, error) {
    healthy, err := hc.healthCheckFunc()
    if err != nil {
        return healthy, err
    }
    logPatternHealthy, err := logPatternHealthCheck(hc.service, hc.loopBackTime, hc.logPatternsToCheck)
    if err != nil {
        return logPatternHealthy, err
    }
    if healthy && logPatternHealthy {
        return true, nil
    }

    // The service is unhealthy.
    // Attempt repair based on flag.
    if hc.enableRepair {
        // repair if the service has been up for the cool down period.
        uptime, err := hc.uptimeFunc()
        if err != nil {
            klog.Infof("error in getting uptime for %v: %v\n", hc.component, err)
            return false, nil
        }
        klog.Infof("%v is unhealthy, component uptime: %v\n", hc.component, uptime)
        if uptime > hc.coolDownTime {
            klog.Infof("%v cooldown period of %v exceeded, repairing", hc.component, hc.coolDownTime)
            hc.repairFunc()
        }
    }
    return false, nil
}
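For illustration, here is a hedged sketch of assembling a healthChecker inside the same package. The real constructor derives these fields from command-line options (see getUptimeFunc and getRepairFunc below); the component and probe shown here are made up.

// Illustrative assembly; with no logPatternsToCheck, the log-pattern check passes trivially.
func exampleHealthCheck() {
    hc := &healthChecker{
        component:    "foo-component", // illustrative
        service:      "foo.service",   // illustrative
        enableRepair: true,
        coolDownTime: 2 * time.Minute,
        healthCheckFunc: func() (bool, error) {
            // Probe the component, e.g. query a local healthz endpoint.
            return true, nil
        },
        repairFunc: func() {
            // Best-effort repair; errors from the underlying command are ignored.
        },
        uptimeFunc: func() (time.Duration, error) {
            return 10 * time.Minute, nil
        },
    }

    healthy, err := hc.CheckHealth()
    if err != nil {
        klog.Errorf("Health check error for %v: %v", hc.component, err)
    }
    klog.Infof("%v healthy: %v", hc.component, healthy)
}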
// logPatternHealthCheck checks for the provided logPattern occurrences in the service logs.
// Returns true if the pattern is empty or does not exist logThresholdCount times since start of service, false otherwise.
func logPatternHealthCheck(service string, loopBackTime time.Duration, logPatternsToCheck map[string]int) (bool, error) {
    if len(logPatternsToCheck) == 0 {
        return true, nil
    }
    uptimeFunc := getUptimeFunc(service)
    klog.Infof("Getting uptime for service: %v\n", service)
    uptime, err := uptimeFunc()
    if err != nil {
        klog.Warningf("Failed to get the uptime: %+v", err)
        return true, err
    }
// getUptimeFunc returns the time for which the given service has been running.
func getUptimeFunc(service string) func() (time.Duration, error) {
    return func() (time.Duration, error) {
        // Using InactiveExitTimestamp to capture the exact time when systemd tried starting the service. The service will
        // transition from inactive -> activating and the timestamp is captured.
        // Source : https://www.freedesktop.org/wiki/Software/systemd/dbus/
        // Using ActiveEnterTimestamp resulted in race condition where the service was repeatedly killed by plugin when
        // RestartSec of systemd and invoke interval of plugin got in sync. The service was repeatedly killed in
        // activating state and hence ActiveEnterTimestamp was never updated.
        out, err := execCommand(types.CmdTimeout, "systemctl", "show", service, "--property=InactiveExitTimestamp")
        if err != nil {
            return time.Duration(0), err
        }
        val := strings.Split(out, "=")
        if len(val) < 2 {
            return time.Duration(0), errors.New("could not parse the service uptime time correctly")
        }
        t, err := time.Parse(types.UptimeTimeLayout, val[1])
        if err != nil {
            return time.Duration(0), err
        }
        return time.Since(t), nil
    }
}
Returns a function that computes how long the given service has been running.
Parameters:
service string - the name of the monitored service.
Return value: a function that returns the service's uptime as a duration, or an error.
Implementation details:
Use systemctl show to read InactiveExitTimestamp, the timestamp recorded when the service transitions out of the inactive state. (A parsing sketch follows.)
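The parsing step boils down to splitting the systemctl output on "=" and parsing the timestamp. A self-contained sketch is below; note that the layout literal is an assumption standing in for types.UptimeTimeLayout, which the repository defines to match systemd's timestamp format.

package main

import (
    "fmt"
    "strings"
    "time"
)

func main() {
    // Example output of `systemctl show <service> --property=InactiveExitTimestamp`.
    out := "InactiveExitTimestamp=Mon 2024-01-08 10:00:00 UTC"

    val := strings.Split(out, "=")
    // Assumed layout; the real code uses types.UptimeTimeLayout.
    t, err := time.Parse("Mon 2006-01-02 15:04:05 MST", val[1])
    if err != nil {
        fmt.Println("parse error:", err)
        return
    }
    fmt.Println("service has been up for:", time.Since(t).Round(time.Second))
}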
// getRepairFunc returns the repair function based on the component.
func getRepairFunc(hco *options.HealthCheckerOptions) func() {
    // Use `systemctl kill` instead of `systemctl restart` for the repair function.
    // We start to rely on the kernel message difference for the two commands to
    // indicate if the component restart is due to an administrative plan (restart)
    // or a system issue that needs repair (kill).
    // See https://github.com/kubernetes/node-problem-detector/issues/847.
    switch hco.Component {
    case types.DockerComponent:
        // Use "docker ps" for docker health check. Not using crictl for docker to remove
        // dependency on the kubelet.
        return func() {
            execCommand(types.CmdTimeout, "pkill", "-SIGUSR1", "dockerd")
            execCommand(types.CmdTimeout, "systemctl", "kill", "--kill-who=main", hco.Service)
        }
    default:
        // Just kill the service for all other components
        return func() {
            execCommand(types.CmdTimeout, "systemctl", "kill", "--kill-who=main", hco.Service)
        }
    }
}