// Discoverer is the interface every service-discovery mechanism implements.
// Run pushes updated target groups onto ch until ctx is cancelled.
//
// NOTE(review): whether an implementation sends full snapshots or only
// incremental updates for changed groups is implementation-specific —
// confirm against each discovery provider's documentation.
type Discoverer interface {
	Run(ctx context.Context, ch chan<- []*targetgroup.Group)
}
Prometheus 支持了众多的 SD 发现机制,代码位于 discovery 目录下。
Group 的结构定义如下:
1 2 3 4 5 6 7 8 9 10 11
// Group is a set of targets with a common label set(production , test, staging etc.). type Group struct { // Targets is a list of targets identified by a label set. Each target is // uniquely identifiable in the group by its address label. Targets []model.LabelSet // Labels is a set of labels that is common across all targets in the group. Labels model.LabelSet
// Source is an identifier that describes a group of targets. Source string }
// ApplyConfig removes all running discovery providers and starts new ones using the provided config. func(m *Manager)ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig)error { m.mtx.Lock() defer m.mtx.Unlock()
m.cancelDiscoverers() // 对于配置文件组织管理 for name, scfg := range cfg { m.registerProviders(scfg, name) } // 全部启动 for _, prov := range m.providers { m.startProvider(m.ctx, prov) }
returnnil }
将每个 JobName 为 key 的结构进行注册管理
1 2 3 4 5 6 7 8
// provider 用于管理可能相同配置的不同 job 任务 // provider holds a Discoverer instance, its configuration and its subscribers. type provider struct { name string// "kubernetes_sd_configs/[0-n]" d Discoverer // 对应的 Discoverer subs []string// 配置相关情况下的,可能是不同的 JobName config interface{} // 对应的相关配置 }
Manager 结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// Manager maintains a set of discovery providers and sends each update to a map channel. // Targets are grouped by the target set name. type Manager struct { logger log.Logger mtx sync.RWMutex ctx context.Context discoverCancel []context.CancelFunc
// Some Discoverers(eg. k8s) send only the updates for a given target group // so we use map[tg.Source]*targetgroup.Group to know which group to update. // poolkey: job_name + provider_n, 将事件放到各个 job 任务的队列中 targets map[poolKey]map[string]*targetgroup.Group // providers keeps track of SD providers. providers []*provider // The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config. // 经过聚合后,将需要更新的对象信息发送到 jobName 的队列中 syncCh chanmap[string][]*targetgroup.Group }
// NOTE(review): fragment of the discovery Manager's per-provider update
// loop — the enclosing func header is not shown in this excerpt, so the
// code below cannot stand alone. Visible behavior: when the updates
// channel closes it sends one final m.allGroups() snapshot and exits;
// each received update is fanned out to every subscriber (p.subs) of the
// provider; sends on syncCh are coalesced behind a ticker so discoverers
// that update too often are throttled, retrying on the next tick when the
// receiver is busy.
for { select { case <-ctx.Done(): return case tgs, ok := <-updates: if !ok { level.Debug(m.logger).Log("msg", "discoverer channel closed, sending the last update", "provider", p.name) select { case m.syncCh <- m.allGroups(): // Waiting until the receiver can accept the last update. level.Debug(m.logger).Log("msg", "discoverer exited", "provider", p.name) return case <-ctx.Done(): return }
} // s: job_name, provider: k8s/n // fan the update out to every possible subscriber (including subscribers that share the same config) for _, s := range p.subs { m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) }
select { case triggerUpdate <- struct{}{}: default: } case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker. select { case <-triggerUpdate: select { // m.allGroups() aggregates by pkey.setName (the job_name) case m.syncCh <- m.allGroups(): default: level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle", "provider", p.name) select { case triggerUpdate <- struct{}{}: default: } } default: } } } }
// NOTE(review): tail of the discovery Manager's updateGroup method — the
// func header (and any locking that presumably surrounds this) is not
// shown in this excerpt. It stores each non-nil group under
// m.targets[poolKey][tg.Source], lazily creating the inner map; nil
// groups are skipped because some Discoverers send them.
for _, tg := range tgs { if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics. if _, ok := m.targets[poolKey]; !ok { m.targets[poolKey] = make(map[string]*targetgroup.Group) } // poolkey: job_name + provider_n, m.targets[poolKey][tg.Source] = tg } } }
int main(){ // ... { // Scrape manager. g.Add( func() error { // When the scrape manager receives a new targets list // it needs to read a valid config for each job. // It depends on the config being in sync with the discovery manager so // we wait until the config is fully loaded. <-reloadReady.C
err := scrapeManager.Run(discoveryManagerScrape.SyncCh()) level.Info(logger).Log("msg", "Scrape manager stopped") return err }, func(err error) { // Scrape manager needs to be stopped before closing the local TSDB // so that it doesn't try to write samples to a closed storage. level.Info(logger).Log("msg", "Stopping scrape manager...") scrapeManager.Stop() }, ) } // ... }
// NOTE(review): truncated excerpt — the scrape Manager struct is never
// closed here and its remaining fields are omitted from this article.
// "chanstruct{}" is an extraction artifact for "chan struct{}", and
// "form" in the original comment should read "from".
// Manager maintains a set of scrape pools and manages start/stop cycles // when receiving new target groups form the discovery manager. type Manager struct { logger log.Logger append Appendable graceShut chanstruct{}
// Run starts background processing to handle target updates and reload the scraping loops. func(m *Manager)Run(tsets <-chanmap[string][]*targetgroup.Group)error { for { select { case ts := <-tsets: m.reload(ts) case <-m.graceShut: returnnil } } }
// NOTE(review): truncated excerpt of scrapePool.Sync — the error returned
// by targetsFromGroup is not handled in the visible text, and the
// function's return of (tActive, tDropped) after sp.sync(all) is omitted,
// so this fragment does not compile as shown.
// Sync converts target groups into actual scrape targets and synchronizes // the currently running scraper with the resulting set and returns all scraped and dropped targets. // Sync converts the target groups into actual scrape targets, syncs the currently running scraper with the resulting set, and returns all scraped and dropped targets. func(sp *scrapePool)Sync(tgs []*targetgroup.Group)(tActive []*Target, tDropped []*Target) { start := time.Now()
var all []*Target sp.mtx.Lock() sp.droppedTargets = []*Target{} for _, tg := range tgs { // targetsFromGroup performs the conversion according to the relevant configuration targets, err := targetsFromGroup(tg, sp.config)
for _, t := range targets { // Labels() returns the target's label set; "__"-prefixed labels are excluded if t.Labels().Len() > 0 { // labels remain that are not "__"-prefixed all = append(all, t) } elseif t.DiscoveredLabels().Len() > 0 { sp.droppedTargets = append(sp.droppedTargets, t) } } } sp.mtx.Unlock() sp.sync(all)
// NOTE(review): truncated excerpt — the Target struct is never closed
// here; its remaining fields are omitted from this article.
// Target refers to a singular HTTP or HTTPS endpoint. type Target struct { // Labels before any processing. discoveredLabels labels.Labels // Any labels that are added to this target and its metrics. labels labels.Labels // Additional URL parameters that are part of the target URL. params url.Values
// NOTE(review): excerpt of populateLabels with interior code omitted — the
// lines that re-assign lset via the relabeling step and that define
// preRelabelLabels (referenced below) are missing from this article, so
// the fragment does not compile as shown.
// populateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. // Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling. // The function performs relabeling according to the given labels and the relevant config options; the first return value is the matched result set, the second is the labels before relabeling was applied. A nil first return value means the target was dropped, e.g. action: drop. funcpopulateLabels(lset labels.Labels, cfg *config.ScrapeConfig)(res, orig labels.Labels, err error) { // Copy labels into the labelset for the target if they are not set already. scrapeLabels := []labels.Label{ {Name: model.JobLabel, Value: cfg.JobName}, {Name: model.MetricsPathLabel, Value: cfg.MetricsPath}, {Name: model.SchemeLabel, Value: cfg.Scheme}, } lb := labels.NewBuilder(lset)
for _, l := range scrapeLabels { if lv := lset.Get(l.Name); lv == "" { lb.Set(l.Name, l.Value) } } // Encode scrape query parameters as labels. for k, v := range cfg.Params { iflen(v) > 0 { lb.Set(model.ParamLabelPrefix+k, v[0]) } }
// Check if the target was dropped. if lset == nil { returnnil, preRelabelLabels, nil } if v := lset.Get(model.AddressLabel); v == "" { returnnil, nil, fmt.Errorf("no address") }
lb = labels.NewBuilder(lset)
// addPort checks whether we should add a default port to the address. // If the address is not valid, we don't append a port either. addPort := func(s string)bool { // If we can split, a port exists and we don't have to add one. if _, _, err := net.SplitHostPort(s); err == nil { returnfalse } // If adding a port makes it valid, the previous error // was not due to an invalid address and we can append a port. _, _, err := net.SplitHostPort(s + ":1234") return err == nil } addr := lset.Get(model.AddressLabel) // If it's an address with no trailing port, infer it based on the used scheme. if addPort(addr) { // Addresses reaching this point are already wrapped in [] if necessary. switch lset.Get(model.SchemeLabel) { case"http", "": addr = addr + ":80" case"https": addr = addr + ":443" default: returnnil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme) } lb.Set(model.AddressLabel, addr) }
// Meta labels are deleted after relabelling. Other internal labels propagate to // the target which decides whether they will be part of their label set. for _, l := range lset { if strings.HasPrefix(l.Name, model.MetaLabelPrefix) { lb.Del(l.Name) } }
// Default the instance label to the target address. if v := lset.Get(model.InstanceLabel); v == "" { lb.Set(model.InstanceLabel, addr) }
res = lb.Labels() for _, l := range res { // Check label values are valid, drop the target if not. if !model.LabelValue(l.Value).IsValid() { returnnil, nil, fmt.Errorf("invalid label value for %q: %q", l.Name, l.Value) } } return res, preRelabelLabels, nil }