百事通!【prometheus】-08 图解云原生服务发现机制
来源:腾讯云 | 2023-03-25 19:02:40

Prometheus服务发现机制之Kubernetes

概述

分析过云原生监控接入方案,下面开始看下云原生服务发现机制。Prometheus本身就是作为云原生监控出现的,所以对云原生服务发现支持具有天然优势。Kubernetes服务发现协议允许使用Kubernetes Rest API检索出Prometheus需要监控的targets,并且跟着集群状态进行同步变更。

kubernetes_sd_configs表示基于Kubernetes进行服务发现,服务发现目标类型使用role表示,比如:role=service,表示针对Kubernetes中的service资源对象,进行具体的服务发现操作。kubernetes_sd_configs支持的role包括:node、service、pod、endpoints、ingress


(资料图片)

原理

基于Kubernetes进行服务发现,主要针对Kubernetes中的service、pod、node等资源对象进行服务发现,Prometheus使用client-gorole中指定的资源对象进行监听。一般Prometheus部署在Kubernetes集群中的话,Prometheus可以直接利用指定的Service AccountKubernetes API进行访问。若PrometheusKubernetes集群之外,则kubernetes_sd_configs还需指定监控集群的API ServerURL以及相关的认证信息,从而能够创建对应集群的Client

“client-go是kubernetes官方提供的go语言的客户端库,go应用使用该库可以访问kubernetes的API Server,这样我们就能通过编程来对kubernetes资源进行增删改查操作。

配置示例:

- job_name: kubernetes-pod    metrics_path: /metrics    kubernetes_sd_configs:    - role: pod      namespaces:        names:        - "test01"      api_server: https://apiserver.simon:6443      bearer_token_file: d:/token.k8s       tls_config:        insecure_skip_verify: true    bearer_token_file: d:/token.k8s    tls_config:      insecure_skip_verify: true

协议分析

Kubernetes服务发现大致原理如下图:

1、通过clientset访问API Server,根据role配置获取不同的集群资源对象;

2、通过List & Watch机制,注册监听事件:

p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) {  podAddCount.Inc()  p.enqueue(o) }, DeleteFunc: func(o interface{}) {  podDeleteCount.Inc()  p.enqueue(o) }, UpdateFunc: func(_, o interface{}) {  podUpdateCount.Inc()  p.enqueue(o) },})

通过informer.AddEventHandler函数可以为集群资源添加资源事件回调方法,支持3种资源事件回调方法:AddFunc、DeleteFunc、UpdateFunc,分别对应新增资源、修改资源和删除资源时事件触发。

3、资源变更注册回调方法中,将目标资源对象转成key放入到队列queue中,如下pod资源:

func (p *Pod) enqueue(obj interface{}) {    //obj是pod资源对象,通过DeletionHandlingMetaNamespaceKeyFunc将其转换成key    //比如key=test01/nginx-deployment-5ffc5bf56c-n2pl8,即namespace/pod_name格式 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil {  return } p.queue.Add(key)}

4、后台goroutines无限循环执行process逻辑,process逻辑中就是不停从queue中提取数据进行处理,比如pod.go对应逻辑如下:

func (p *Pod) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { keyObj, quit := p.queue.Get() if quit {  return false } defer p.queue.Done(keyObj) key := keyObj.(string) //与 MetaNamespaceKeyFunc() 功能相反的是 SplitMetaNamespaceKey() 函数,它将传入的 Key 分解,返回对象所在的命名空间和对象名称。 namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil {  return true }    //根据key获取资源对象obj o, exists, err := p.store.GetByKey(key) if err != nil {  return true } if !exists {  //pod被删除时,exists=false  // 然后发送targets为空的tg,即移除  send(ctx, ch, &targetgroup.Group{Source: podSourceFromNamespaceAndName(namespace, name)})  return true } pod, err := convertToPod(o) if err != nil {  level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err)  return true }    //p.buildPod(pod):将资源对象信息转成target groups send(ctx, ch, p.buildPod(pod)) return true}

大致逻辑:

a、根据从queue中提取的key,使用p.store.GetByKey(key)获取对应的资源对象,比如pod、service等对象;

b、如果对象不存在,则表示资源对象被删除,则创建一个targets集合为空的target groups,这样Scrape Manager就会移除targets

c、使用buildXXX(obj)将资源对象解析成target groups,如buildNode()、buildPod()等;

d、最后使用send()方法将解析的target groups通过通道channel传递出去,最终传递给Scrape Manager,这样target groupstargets将被Prometheus抓取监控数据。

pod资源的target groups结构如下示例,每个pod对象都会被解析成target groups,其中包含targets集合、labels标签集合:

Discovery创建

1、假如我们定义如下抓取作业:

- job_name: kubernetes-nodes-cadvisor    metrics_path: /metrics    scheme: https    kubernetes_sd_configs:    - role: node      api_server: https://apiserver.simon:6443      bearer_token_file: d:/token.k8s       tls_config:        insecure_skip_verify: true    bearer_token_file: d:/token.k8s    tls_config:      insecure_skip_verify: true    relabel_configs:    # 将标签(.*)作为新标签名,原有值不变    - action: labelmap      regex: __meta_kubernetes_node_label_(.*)    # 修改NodeIP:10250为APIServerIP:6443    - action: replace      regex: (.*)      source_labels: ["__address__"]      target_label: __address__      replacement: 192.168.52.151:6443    - action: replace      source_labels: [__meta_kubernetes_node_name]      target_label: __metrics_path__      regex: (.*)      replacement: /api/v1/nodes/${1}/proxy/metrics/cadvisor  

会被解析成kubernetes.SDConfig如下:

kubernetes.SDConfig定义如下:

type SDConfig struct { APIServer          config.URL              `yaml:"api_server,omitempty"` Role               Role                    `yaml:"role"` HTTPClientConfig   config.HTTPClientConfig `yaml:",inline"` NamespaceDiscovery NamespaceDiscovery      `yaml:"namespaces,omitempty"` Selectors          []SelectorConfig        `yaml:"selectors,omitempty"`}

2、Discovery创建

//创建Clientset,可看成操作Kubernetes API的客户端c, err := kubernetes.NewForConfig(kcfg) if err != nil {  return nil, err }return &Discovery{  client:             c,  logger:             l,  role:               conf.Role,  namespaceDiscovery: &conf.NamespaceDiscovery,  discoverers:        make([]discovery.Discoverer, 0),  selectors:          mapSelector(conf.Selectors),}, nil

3、Discovery创建完成,最后会调用Discovery.Run()启动服务发现:

func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { d.Lock() namespaces := d.getNamespaces() switch d.role { case RoleEndpointSlice:  role=endpointslice逻辑 case RoleEndpoint:  role=endpoints逻辑 case RolePod:  role=pod逻辑 case RoleService:  role=service逻辑 case RoleIngress:  role=ingress逻辑 case RoleNode:  role=node逻辑 default:  level.Error(d.logger).Log("msg", "unknown Kubernetes discovery kind", "role", d.role) } var wg sync.WaitGroup for _, dd := range d.discoverers {  wg.Add(1)  go func(d discovery.Discoverer) {   defer wg.Done()   d.Run(ctx, ch)  }(dd) } d.Unlock() wg.Wait() <-ctx.Done()}

4、注册集群资源对象监听事件回调逻辑:

for _, namespace := range namespaces { p := d.client.CoreV1().Pods(namespace) plw := &cache.ListWatch{  ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {   options.FieldSelector = d.selectors.pod.field   options.LabelSelector = d.selectors.pod.label   return p.List(ctx, options)  },  WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {   options.FieldSelector = d.selectors.pod.field   options.LabelSelector = d.selectors.pod.label   return p.Watch(ctx, options)  }, } pod := NewPod(  log.With(d.logger, "role", "pod"),  cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), ) d.discoverers = append(d.discoverers, pod) go pod.informer.Run(ctx.Done())}
精彩推荐