
Prometheus Discovery: K8S Code Analysis

2018/09/13

Service Discovery interface

A service discovery mechanism must implement the Discoverer interface, defined as follows:

type Discoverer interface {
    Run(ctx context.Context, ch chan<- []*targetgroup.Group)
}

Prometheus supports many SD mechanisms; their code lives under the discovery directory.

The Group structure is defined as follows:

// Group is a set of targets with a common label set(production , test, staging etc.).
type Group struct {
    // Targets is a list of targets identified by a label set. Each target is
    // uniquely identifiable in the group by its address label.
    Targets []model.LabelSet
    // Labels is a set of labels that is common across all targets in the group.
    Labels model.LabelSet

    // Source is an identifier that describes a group of targets.
    Source string
}

The Kubernetes Discoverer is defined in prometheus/discovery/kubernetes/kubernetes.go.

The init function registers the metric prometheus_sd_kubernetes_events_total, which counts the events received during discovery, by role and event type:

func init() {
    prometheus.MustRegister(eventCount)

    // Initialize metric vectors.
    for _, role := range []string{"endpoints", "node", "pod", "service"} {
        for _, evt := range []string{"add", "delete", "update"} {
            eventCount.WithLabelValues(role, evt)
        }
    }
}
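
The eventCount counter registered above is declared elsewhere in kubernetes.go and is not quoted in this post. Roughly (a reconstruction, not the verbatim source), it is a CounterVec keyed by role and event type, built with github.com/prometheus/client_golang/prometheus:

// Rough reconstruction of the eventCount declaration (not a verbatim quote).
var eventCount = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "prometheus_sd_kubernetes_events_total",
        Help: "The number of Kubernetes events handled.",
    },
    []string{"role", "event"},
)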

On its first run, a Discoverer can send the full set of target groups to the channel ch defined by the interface. After that it only needs to send the groups whose content has changed; if a group has been deleted, it sends a group with an empty target list but with Source still set. The Source field serves as the unique key for every group.
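
To make this protocol concrete, here is a minimal hypothetical Discoverer that follows these rules. The type staticDiscoverer and the addresses are illustrative assumptions, not part of the Prometheus code base:

package example

import (
    "context"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/discovery/targetgroup"
)

// staticDiscoverer is a toy Discoverer: it first sends a full target group and
// later announces the group's removal by sending an empty group with the same Source.
type staticDiscoverer struct{}

func (d staticDiscoverer) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
    // Initial full state: one group containing a single target.
    initial := []*targetgroup.Group{{
        Source:  "static/0",
        Targets: []model.LabelSet{{model.AddressLabel: "10.0.0.1:9100"}},
        Labels:  model.LabelSet{"env": "test"},
    }}
    select {
    case ch <- initial:
    case <-ctx.Done():
        return
    }

    // Later, when the target disappears, send an empty group with the same
    // Source so the manager knows to delete it.
    select {
    case ch <- []*targetgroup.Group{{Source: "static/0"}}:
    case <-ctx.Done():
    }
}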

// Discovery implements the discoverer interface for discovering
// targets from Kubernetes.
// A separate Discovery instance is started for each role.
type Discovery struct {
    sync.RWMutex
    client             kubernetes.Interface // client connected to the Kubernetes API server
    role               Role
    logger             log.Logger
    namespaceDiscovery *NamespaceDiscovery // the namespaces to watch
    discoverers        []discoverer        // one internal discoverer per namespace for this role
}

The discoverers field is split by namespace: each namespace gets its own internal discoverer that tracks it independently.

// This is only for internal use.
type discoverer interface {
    Run(ctx context.Context, up chan<- []*targetgroup.Group)
}

Initialization and startup

The main function is the entry point:

func main() {
    // ...
    // Initialization.
    discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"))

    // discoveryManagerNotify plays the same role for the notifier (Alertmanager) targets; it is not covered here.
    discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"))

    // ...

    reloaders := []func(cfg *config.Config) error{
        // Build a map keyed by JobName:
        //   c[v.JobName] = v.ServiceDiscoveryConfig
        func(cfg *config.Config) error {
            c := make(map[string]sd_config.ServiceDiscoveryConfig)
            for _, v := range cfg.ScrapeConfigs {
                c[v.JobName] = v.ServiceDiscoveryConfig
            }
            return discoveryManagerScrape.ApplyConfig(c)
        },
        // ...
    }

    {
        // Scrape discovery manager.
        g.Add(
            func() error {
                // Call Run to start the discovery manager.
                err := discoveryManagerScrape.Run()
                level.Info(logger).Log("msg", "Scrape discovery manager stopped")
                return err
            },
            func(err error) {
                level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
                cancelScrape()
            },
        )
    }

    if err := g.Run(); err != nil {
        level.Error(logger).Log("err", err)
        os.Exit(1)
    }
}

The entry point of the whole SD machinery is prometheus/discovery/manager.go.

The NewManager function constructs the discovery Manager object:

// NewManager is the Discovery Manager constructor
func NewManager(ctx context.Context, logger log.Logger) *Manager {
    if logger == nil {
        logger = log.NewNopLogger()
    }
    return &Manager{
        logger:         logger,
        syncCh:         make(chan map[string][]*targetgroup.Group),
        targets:        make(map[poolKey]map[string]*targetgroup.Group),
        discoverCancel: []context.CancelFunc{},
        ctx:            ctx,
    }
}
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    m.cancelDiscoverers()
    // Register a provider for every job in the configuration.
    for name, scfg := range cfg {
        m.registerProviders(scfg, name)
    }

    // Start all providers.
    for _, prov := range m.providers {
        m.startProvider(m.ctx, prov)
    }

    return nil
}

Each configuration, keyed by JobName, is registered and managed as a provider:

// A provider groups different jobs that share an identical SD configuration.
// provider holds a Discoverer instance, its configuration and its subscribers.
type provider struct {
    name   string      // "kubernetes_sd_configs/[0-n]"
    d      Discoverer  // the underlying Discoverer
    subs   []string    // the JobNames subscribed to this configuration
    config interface{} // the SD configuration itself
}

The Manager structure is as follows:

// Manager maintains a set of discovery providers and sends each update to a map channel.
// Targets are grouped by the target set name.
type Manager struct {
    logger         log.Logger
    mtx            sync.RWMutex
    ctx            context.Context
    discoverCancel []context.CancelFunc

    // Some Discoverers(eg. k8s) send only the updates for a given target group
    // so we use map[tg.Source]*targetgroup.Group to know which group to update.
    // poolKey: job_name + provider_n; updates are stored per (job, provider) pair.
    targets map[poolKey]map[string]*targetgroup.Group
    // providers keeps track of SD providers.
    providers []*provider
    // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
    // After aggregation, the updated groups are sent out keyed by job name.
    syncCh chan map[string][]*targetgroup.Group
}
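
For reference, the poolKey used as the map key above is a small struct defined in the same file; it looks roughly like this (a reconstruction, not quoted from the post):

// poolKey identifies a (job, provider) pair, e.g. {"node-exporter", "kubernetes_sd_configs/0"}.
type poolKey struct {
    setName  string // the job name from the scrape config
    provider string // the provider name, e.g. "kubernetes_sd_configs/0"
}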

registerProviders:

func (m *Manager) registerProviders(cfg sd_config.ServiceDiscoveryConfig, setName string) {
    // setName is the JobName.
    add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
        t := reflect.TypeOf(cfg).String() // kubernetes_sd_configs
        for _, p := range m.providers {
            if reflect.DeepEqual(cfg, p.config) {
                p.subs = append(p.subs, setName)
                return
            }
        }

        // Calls kubernetes.New(log.With(m.logger, "discovery", "k8s"), cfg) in the k8s case.
        d, err := newDiscoverer() // (error handling elided in this excerpt)
        provider := provider{
            // t = "kubernetes_sd_configs"
            name:   fmt.Sprintf("%s/%d", t, len(m.providers)),
            d:      d,
            config: cfg,
            subs:   []string{setName},
        }
        m.providers = append(m.providers, &provider)
    }

    // ...
    // Iterate over all Kubernetes-related SD configurations.
    for _, c := range cfg.KubernetesSDConfigs {
        add(c, func() (Discoverer, error) {
            return kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
        })
    }
    // ...
}

The overall structure looks like this:

Manager --> []provider --> provider[k8s/0] --> Discovery --> [roleA] discoverers --> ns1, ns2
                           provider[k8s/n] --> Discovery --> [roleB] discoverers --> ns1, ns2
                           provider[xxx/n] --> Discovery: Service, Endpoints, ...

Startup:

func (m *Manager) startProvider(ctx context.Context, p *provider) {
    level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
    ctx, cancel := context.WithCancel(ctx)

    // updates is the channel through which the SD publishes target groups; this is the key data path.
    updates := make(chan []*targetgroup.Group)

    m.discoverCancel = append(m.discoverCancel, cancel)

    go p.d.Run(ctx, updates) // start the Discoverer loop
    go m.updater(ctx, p, updates)
}

updater(ctx, p, updates) reads the data that the SD sends on this channel:

func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    triggerUpdate := make(chan struct{}, 1)

    for {
        select {
        case <-ctx.Done():
            return
        case tgs, ok := <-updates:
            if !ok {
                level.Debug(m.logger).Log("msg", "discoverer channel closed, sending the last update", "provider", p.name)
                select {
                case m.syncCh <- m.allGroups(): // Waiting until the receiver can accept the last update.
                    level.Debug(m.logger).Log("msg", "discoverer exited", "provider", p.name)
                    return
                case <-ctx.Done():
                    return
                }
            }

            // s: job_name, provider: k8s/n
            // Forward the update to every subscriber (all jobs that share this configuration).
            for _, s := range p.subs {
                m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
            }

            select {
            case triggerUpdate <- struct{}{}:
            default:
            }
        case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
            select {
            case <-triggerUpdate:
                select {
                // m.allGroups() aggregates the targets by poolKey.setName (i.e. by job name).
                case m.syncCh <- m.allGroups():
                default:
                    level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle", "provider", p.name)
                    select {
                    case triggerUpdate <- struct{}{}:
                    default:
                    }
                }
            default:
            }
        }
    }
}

After reading from the channel, the groups are stored under the corresponding poolKey:

func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    for _, tg := range tgs {
        if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
            if _, ok := m.targets[poolKey]; !ok {
                m.targets[poolKey] = make(map[string]*targetgroup.Group)
            }
            // poolKey: job_name + provider_n
            m.targets[poolKey][tg.Source] = tg
        }
    }
}

In the end the Manager aggregates all data into syncCh chan map[string][]*targetgroup.Group, keyed by JobName, and exposes this channel through its SyncCh method.
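
The allGroups helper used in updater is not quoted above; it flattens m.targets into the per-job map that gets pushed to syncCh. A rough reconstruction (not a verbatim quote) looks like this:

// allGroups aggregates m.targets by poolKey.setName (the job name) into the
// map[string][]*targetgroup.Group shape consumed by the scrape manager.
func (m *Manager) allGroups() map[string][]*targetgroup.Group {
    m.mtx.Lock()
    defer m.mtx.Unlock()

    tSets := map[string][]*targetgroup.Group{}
    for pkey, tsets := range m.targets {
        for _, tg := range tsets {
            tSets[pkey.setName] = append(tSets[pkey.setName], tg)
        }
    }
    return tSets
}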

Handling the updated events

In main, scrapeManager reads from the discovery manager's output channel:

func main() {
    // ...
    {
        // Scrape manager.
        g.Add(
            func() error {
                // When the scrape manager receives a new targets list
                // it needs to read a valid config for each job.
                // It depends on the config being in sync with the discovery manager so
                // we wait until the config is fully loaded.
                <-reloadReady.C

                err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
                level.Info(logger).Log("msg", "Scrape manager stopped")
                return err
            },
            func(err error) {
                // Scrape manager needs to be stopped before closing the local TSDB
                // so that it doesn't try to write samples to a closed storage.
                level.Info(logger).Log("msg", "Stopping scrape manager...")
                scrapeManager.Stop()
            },
        )
    }

    // ...
}

Defined in scrape/manager.go:

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type Manager struct {
    logger    log.Logger
    append    Appendable
    graceShut chan struct{}

    mtxTargets     sync.Mutex // Guards the fields below.
    targetsActive  []*Target
    targetsDropped []*Target
    targetsAll     map[string][]*Target

    mtxScrape     sync.Mutex // Guards the fields below.
    scrapeConfigs map[string]*config.ScrapeConfig
    scrapePools   map[string]*scrapePool
}

// Run starts background processing to handle target updates and reload the scraping loops.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    for {
        select {
        case ts := <-tsets:
            m.reload(ts)
        case <-m.graceShut:
            return nil
        }
    }
}

The reload function is defined as follows:

func (m *Manager) reload(t map[string][]*targetgroup.Group) {
    m.mtxScrape.Lock()
    defer m.mtxScrape.Unlock()

    tDropped := make(map[string][]*Target)
    tActive := make(map[string][]*Target)

    for tsetName, tgroup := range t {
        var sp *scrapePool
        if existing, ok := m.scrapePools[tsetName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[tsetName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
                continue
            }
            sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
            m.scrapePools[tsetName] = sp
        } else {
            sp = existing
        }

        // Sync filters the groups and converts them into the matching targets.
        tActive[tsetName], tDropped[tsetName] = sp.Sync(tgroup)
    }

    // Record the active and dropped targets; the result can be inspected in the web UI.
    m.targetsUpdate(tActive, tDropped)
}

Sync is defined as follows:

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) (tActive []*Target, tDropped []*Target) {
    start := time.Now()

    var all []*Target
    sp.mtx.Lock()
    sp.droppedTargets = []*Target{}
    for _, tg := range tgs {
        // targetsFromGroup performs the conversion according to the scrape config.
        targets, err := targetsFromGroup(tg, sp.config) // (error handling elided in this excerpt)

        for _, t := range targets {
            // Labels() returns the target's label set after relabeling, excluding labels prefixed with "__".
            if t.Labels().Len() > 0 { // a non-empty label set means the target is kept
                all = append(all, t)
            } else if t.DiscoveredLabels().Len() > 0 {
                sp.droppedTargets = append(sp.droppedTargets, t)
            }
        }
    }
    sp.mtx.Unlock()
    sp.sync(all)

    targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
        time.Since(start).Seconds(),
    )
    targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()

    sp.mtx.RLock()
    for _, t := range sp.targets {
        tActive = append(tActive, t)
    }
    tDropped = sp.droppedTargets
    sp.mtx.RUnlock()

    return tActive, tDropped
}

The targetsFromGroup function does the actual filtering based on the incoming group and the scrape configuration:

// targetsFromGroup builds targets based on the given TargetGroup and config.
func targetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig) ([]*Target, error) {
    targets := make([]*Target, 0, len(tg.Targets))

    for i, tlset := range tg.Targets {
        // Merge each target's own labels with the group-wide labels before filtering.
        lbls := make([]labels.Label, 0, len(tlset)+len(tg.Labels))

        // Copy the discovered per-target labels.
        for ln, lv := range tlset {
            lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
        }

        // Merge in the common labels; per-target labels take precedence.
        for ln, lv := range tg.Labels {
            if _, ok := tlset[ln]; !ok {
                lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
            }
        }

        // Build an immutable copy of the merged label set.
        lset := labels.New(lbls...)

        // Apply the relabel rules from cfg: lset is the full input label set, lbls the
        // processed label set, and origLabels the labels before relabeling.
        lbls, origLabels, err := populateLabels(lset, cfg) // (error handling elided in this excerpt)

        // Keep the target if either the processed or the original label set is non-empty.
        if lbls != nil || origLabels != nil { // cfg.Params are extra URL query parameters
            targets = append(targets, NewTarget(lbls, origLabels, cfg.Params))
        }
    }
    return targets, nil
}

The Target structure produced by this filtering is defined as follows:

// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
    // Labels before any processing.
    discoveredLabels labels.Labels

    // Any labels that are added to this target and its metrics.
    labels labels.Labels

    // Additional URL parameters that are part of the target URL.
    params url.Values

    mtx        sync.RWMutex
    lastError  error
    lastScrape time.Time
    health     TargetHealth
    metadata   metricMetadataStore
}

populateLabels builds a label set from the given label set and the scrape configuration. Its second return value is the label set before relabeling was applied. If the target is dropped during relabeling, the original discovered label set is returned and the first return value is nil.

// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
// In other words: res is the label set after relabeling (nil if the target was dropped, e.g. by action: drop),
// and orig is the label set before relabeling.
func populateLabels(lset labels.Labels, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) {
    // Copy labels into the labelset for the target if they are not set already.
    scrapeLabels := []labels.Label{
        {Name: model.JobLabel, Value: cfg.JobName},
        {Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
        {Name: model.SchemeLabel, Value: cfg.Scheme},
    }
    lb := labels.NewBuilder(lset)

    for _, l := range scrapeLabels {
        if lv := lset.Get(l.Name); lv == "" {
            lb.Set(l.Name, l.Value)
        }
    }
    // Encode scrape query parameters as labels.
    for k, v := range cfg.Params {
        if len(v) > 0 {
            lb.Set(model.ParamLabelPrefix+k, v[0])
        }
    }

    preRelabelLabels := lb.Labels()
    // relabel.Process returns a relabeled copy of the given label set, applying the rules in input order.
    // It returns nil if the label set is dropped and may modify the input label set.
    lset = relabel.Process(preRelabelLabels, cfg.RelabelConfigs...)

    // Check if the target was dropped.
    if lset == nil {
        return nil, preRelabelLabels, nil
    }
    if v := lset.Get(model.AddressLabel); v == "" {
        return nil, nil, fmt.Errorf("no address")
    }

    lb = labels.NewBuilder(lset)

    // addPort checks whether we should add a default port to the address.
    // If the address is not valid, we don't append a port either.
    addPort := func(s string) bool {
        // If we can split, a port exists and we don't have to add one.
        if _, _, err := net.SplitHostPort(s); err == nil {
            return false
        }
        // If adding a port makes it valid, the previous error
        // was not due to an invalid address and we can append a port.
        _, _, err := net.SplitHostPort(s + ":1234")
        return err == nil
    }
    addr := lset.Get(model.AddressLabel)
    // If it's an address with no trailing port, infer it based on the used scheme.
    if addPort(addr) {
        // Addresses reaching this point are already wrapped in [] if necessary.
        switch lset.Get(model.SchemeLabel) {
        case "http", "":
            addr = addr + ":80"
        case "https":
            addr = addr + ":443"
        default:
            return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
        }
        lb.Set(model.AddressLabel, addr)
    }

    if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
        return nil, nil, err
    }

    // Meta labels are deleted after relabelling. Other internal labels propagate to
    // the target which decides whether they will be part of their label set.
    for _, l := range lset {
        if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
            lb.Del(l.Name)
        }
    }

    // Default the instance label to the target address.
    if v := lset.Get(model.InstanceLabel); v == "" {
        lb.Set(model.InstanceLabel, addr)
    }

    res = lb.Labels()
    for _, l := range res {
        // Check label values are valid, drop the target if not.
        if !model.LabelValue(l.Value).IsValid() {
            return nil, nil, fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value)
        }
    }
    return res, preRelabelLabels, nil
}
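
As a quick illustration of the defaulting behavior (scheme-based port and the instance label), a hypothetical call from within the scrape package might look like the following; the concrete config and address are assumptions made only for this example:

// Hypothetical snippet inside package scrape (illustration only; not from the original post).
func examplePopulateLabels() {
    cfg := &config.ScrapeConfig{
        JobName:     "node",
        Scheme:      "https",
        MetricsPath: "/metrics",
    }
    in := labels.FromMap(map[string]string{
        model.AddressLabel: "10.0.0.1", // no port given
    })

    res, orig, err := populateLabels(in, cfg)
    _ = err // nil for this input
    // orig: the pre-relabeling labels: __address__, __scheme__, __metrics_path__, job
    // res:  __address__="10.0.0.1:443" (port inferred from the https scheme),
    //       instance="10.0.0.1:443", job="node", plus __scheme__ and __metrics_path__
    fmt.Println(res, orig)
}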

Test samples related to prometheus_client can be found at: https://github.com/DavadDi/Kubernetes_study/tree/master/prometheus_client
