telegraf agent 源码分析
2021-11-22
telegraf
1 telegraf
telegraf 是开源agent服务,可收集系统和服务的metric数据. telegraf 从数据的collecting, processing, aggregating, and writing 都采用插件的设计,非常灵活,易扩展。 可以通过官方和自定义插件完成业务功能。
有四种不同类型的插件:
- Input Plugins collect metrics from the system, services, or 3rd party APIs
- Processor Plugins transform, decorate, and/or filter metrics
- Aggregator Plugins create aggregate metrics (e.g. mean, min, max, quantiles, etc.)
- Output Plugins write metrics to various destinations
telegraf 设计是非常优秀,通过插件系统,将业务逻辑,数据差异放到不同流程的插件中处理,流程之间通过channel进行数据交互,解耦非常彻底。 agent 只需要完成插件管理,数据流向,插件调度。
2 telegraf agent
2.1 agent 如何管理数据?
telegraf/agent/agent.go 做为agent 功能核心代码。 主要通过 Agent,inputUnit,processorUnit,aggregatorUnit,outputUnit结构体来完成agent 的功能。 建议看源码,注释写非常明确。
- agent 用来管理agent 配置,
- inputUnit 用来管理Input 插件
- processorUnit 用来管理Processor 插件
- aggregatorUnit 用来管理Aggregator 插件
- outputUnit 用来管理Output 插件
2.2 agent 如何约束插件?
2.2.1 agent 插件全局约束
全局约束是所有类型插件都适用的。
2.2.1.1 必须约束如下
- PluginDescriber 用来描述插件,SampleConfig()在./telegraf –sample-config 可以看到
// PluginDescriber 插件描述
type PluginDescriber interface {
// SampleConfig returns the default configuration of the Processor
SampleConfig() string
// Description returns a one-sentence description on the Processor
Description() string
}
2.2.1.2 可选约束
- Initializer 初始化插件的操作
// Initializer 插件初始化约束
type Initializer interface {
// Init performs one time setup of the plugin and returns an error if the
// configuration is invalid.
Init() error
}
2.3.1 agent 插件约束
实现自定义插件的时候,需要根据插件类型的约束做实现, 实现后还需要注册插件, 需要配合导入包的方式。具体可以看 telegraf/{inputs/outputs/processors/aggregators}/all/aal.go
2.3.1.1 Input 插件
telegraf/input.go
type Input interface {
PluginDescriber
// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every agent.interval
// 实现数据的收集功能,metric数据输入
Gather(Accumulator) error
}
2.3.1.2 Processor 插件
telegraf/processor.go
// Processor is a processor plugin interface for defining new inline processors.
// these are extremely efficient and should be used over StreamingProcessor if
// you do not need asynchronous metric writes.
type Processor interface {
PluginDescriber
// Apply the filter to the given metric.
Apply(in ...Metric) []Metric
}
2.3.1.3 aggregator 插件
telegraf/processor.go
type Aggregator interface {
PluginDescriber
// Add the metric to the aggregator.
Add(in Metric)
// Push pushes the current aggregates to the accumulator.
Push(acc Accumulator)
// Reset resets the aggregators caches and aggregates.
Reset()
}
2.3.1.4 Output 插件
telegraf/output.go
type Output interface {
PluginDescriber
// Connect to the Output; connect is only called once when the plugin starts
Connect() error
// Close any connections to the Output. Close is called once when the output
// is shutting down. Close will not be called until all writes have finished,
// and Write() will not be called once Close() has been, so locking is not
// necessary.
Close() error
// Write takes in group of points to be written to the Output
Write(metrics []Metric) error
}
2.3 agent 代码
telegraf/agent/agent.go 关于agent 的代码,需要注意插件启动顺序,插件的管理,采集异常处理。
2.3.1 agent Run
// Run starts and runs the Agent until the context is done.
func (a *Agent) Run(ctx context.Context) error {
// 执行插件的初始化,前提是插件实现telegraf.Initializer
a.initPlugins()
// 启动ouput,这里主要是执行(*output).connectOutput 方法,
// 设计来源类似linux将一切皆文件的思想,connectOutput类似的open。
next, ou, err := a.startOutputs(ctx, a.Config.Outputs)
// next chan telegraf.Metric 类型,用来不同流程插件间数据交互
var apu []*processorUnit
var au *aggregatorUnit
if len(a.Config.Aggregators) != 0 {
aggC := next
if len(a.Config.AggProcessors) != 0 {
// startProcessors 注意processors 多个插件数据的交互,多个插件之间是有序的数据交互
aggC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
// aggC 和next 一致, 注意AggProcessors与Processors 却别
}
next, au = a.startAggregators(aggC, next, a.Config.Aggregators)
}
var pu []*processorUnit
if len(a.Config.Processors) != 0 {
// startProcessors 注意processors 多个插件数据的交互,多个插件之间是有序的数据交互
next, pu, err = a.startProcessors(next, a.Config.Processors)
}
// 执行插件的初始化,前提是插件实现telegraf.ServiceInput
// 注意input插件的init 和start区别,
iu, err := a.startInputs(next, a.Config.Inputs)
a.runOutputs(ou)
a.runProcessors(apu)
a.runAggregators(startTime, au)
a.runProcessors(pu)
// runInputs 主要就是通过a.gatherLoop来实现定时收集数据
a.runInputs(ctx, startTime, iu)
}