
Istio Source Code Series 3: pilot-discovery Source Code Analysis

2019/02/08

Links in this series:


[TOC]

Architecture

Introduction

The complete YAML file is available at pilot-yaml.

Dockerfile

FROM istionightly/base_debug

ADD pilot-discovery /usr/local/bin/
ADD cacert.pem /cacert.pem
ENTRYPOINT ["/usr/local/bin/pilot-discovery"]

Startup command line

# ps -ef -www|grep pilot
$/usr/local/bin/pilot-discovery discovery

Command-line help

# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery --help
Usage:
pilot-discovery [command]

Available Commands:
discovery Start Istio proxy discovery service
help Help about any command
request Makes an HTTP request to Pilot metrics/debug endpoint
version Prints out build version information

Help for the discovery subcommand

# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery discovery --help
Defaulting container name to discovery.

Start Istio proxy discovery service

Usage:
pilot-discovery discovery [flags]

Flags:
-a, --appNamespace string Restrict the applications namespace the controller manages; if not set, controller watches all namespaces
--cfConfig string Cloud Foundry config file
--clusterRegistriesConfigMap string ConfigMap map for clusters config store
--clusterRegistriesNamespace string Namespace for ConfigMap which stores clusters configs
--configDir string Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.
--consulserverInterval duration Interval (in seconds) for polling the Consul service registry (default 2s)
--consulserverURL string URL for the Consul server
--disable-install-crds Disable discovery service from verifying the existence of CRDs at startup and then installing if not detected. It is recommended to be disable for highly available setups.
--discovery_cache Enable caching discovery service responses (default true)
--domain string DNS domain suffix (default "cluster.local")
--grpcAddr string Discovery service grpc address (default ":15010")
-h, --help help for discovery
--httpAddr string Discovery service HTTP address (default ":8080")
--kubeconfig string Use a Kubernetes configuration file instead of in-cluster configuration
--meshConfig string File name for Istio mesh configuration. If not specified, a default mesh will be used. (default "/etc/istio/config/mesh")
--monitoringAddr string HTTP address to use for the exposing pilot self-monitoring information (default ":9093")
-n, --namespace string Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable
--plugins stringSlice comma separated list of networking plugins to enable (default [authn,authz,health,mixer,envoyfilter])
--profile Enable profiling via web interface host:port/debug/pprof (default true)
--registries stringSlice Comma separated list of platform service registries to read from (choose one or more from {Kubernetes, Consul, CloudFoundry, Mock, Config}) (default [Kubernetes])
--resync duration Controller resync interval (default 1m0s)
--secureGrpcAddr string Discovery service grpc address, with https (default ":15012")
--webhookEndpoint string Webhook API endpoint (supports http://sockethost, and unix:///absolute/path/to/socket

Global Flags:
(omitted)

Key flags:

Name               Default                    Notes
--appNamespace                                Corresponds to oneNamespace in the helm installation
--configDir                                   Shows the two config sources for pilot: config files vs. CRDs
--discovery_cache  true                       Enables caching of discovery responses, which helps performance
--domain           "cluster.local"            DNS domain suffix in Kubernetes
--grpcAddr         :15010
--httpAddr         :8080
--meshConfig       "/etc/istio/config/mesh"
--registries       Kubernetes

Code analysis

Entry point

discoveryCmd = &cobra.Command{
    Use:   "discovery",
    Short: "Start Istio proxy discovery service.",
    Args:  cobra.ExactArgs(0),
    RunE: func(c *cobra.Command, args []string) error {
        // ...

        // Create the stop channel for all of the servers.
        stop := make(chan struct{})

        // Create the server for the discovery service.
        discoveryServer, err := bootstrap.NewServer(serverArgs)
        if err != nil {
            return fmt.Errorf("failed to create discovery service: %v", err)
        }

        // Start the server
        if err := discoveryServer.Start(stop); err != nil {
            return fmt.Errorf("failed to start discovery service: %v", err)
        }

        cmd.WaitSignal(stop)
        return nil
    },
}
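
cmd.WaitSignal(stop) is what keeps the process alive after the servers have been started: it blocks until a termination signal arrives and then closes the stop channel. A minimal sketch of that pattern (assumed behavior for illustration; the helper below is not the actual istio.io/istio/pkg/cmd code):

package main

import (
    "os"
    "os/signal"
    "syscall"
)

// waitSignal blocks until SIGINT/SIGTERM arrives, then closes the stop channel,
// which is the broadcast that shuts down every component started with it.
func waitSignal(stop chan struct{}) {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    <-sigs
    close(stop)
}

func main() {
    stop := make(chan struct{})
    // ... start servers that watch the stop channel ...
    waitSignal(stop) // block here until the process is asked to terminate
}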

istio.io/istio/pilot/pkg/bootstrap/server.go

For the default mesh configuration, see: https://gist.github.com/DavadDi/f110459d339e260f818250287fc78ccc#file-mesh

// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args PilotArgs) (*Server, error) {
    // ...
    s := &Server{
        filewatcher: filewatcher.NewWatcher(),
    }

    // Error handling omitted below.
    s.initKubeClient(&args) // initialize the client to the k8s cluster: s.kubeClient
    s.initMesh(&args)       // load the mesh config and add it to the filewatcher, /etc/istio/config/mesh

    // Not present in the 1.0.5 cmd; appears to have been added in 1.1:
    // serverArgs.NetworksConfigFile, "networksConfig", "/etc/istio/config/meshNetworks"
    // Load the mesh networks config and watch it via the filewatcher.
    s.initMeshNetworks(&args)

    // initMixerSan configures the mixerSAN configuration item.
    // The mesh must already have been configured.
    s.initMixerSan(&args)

    // creates the config controller in the pilotConfig.
    // Ultimately creates a crd.NewController instance.
    s.initConfigController(&args)
    /* Taking kube as an example, the main flow is:
    controller, err := s.makeKubeConfigController(args)
    s.configController = controller

    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop) // 1. configController is the first to start
    })
    */

    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    /*
    s.createK8sServiceControllers(serviceControllers, args)
    --> kube.NewController(s.kubeClient, args.Config.ControllerOptions)

    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.ServiceController.Run(stop) // 2. ServiceController starts second
        return nil
    })
    */

    // Initialize the DiscoveryService gRPC server (covered in detail below).
    // Adds 2 funcs to startFuncs.
    s.initDiscoveryService(&args)

    // initializes the configuration for the pilot monitoring server.
    // Adds 1 func to startFuncs.
    s.initMonitor(&args)

    // starts the secret controller to watch for remote
    // clusters and initialize the multicluster structures.
    // Mainly specifies the config for connecting to remote clusters and the namespaces to watch,
    // and starts a secret controller that watches secrets labeled istio/multiCluster=true.
    // --clusterRegistriesConfigMap  ConfigMap map for clusters config store
    // --clusterRegistriesNamespace  Namespace for ConfigMap which stores clusters configs
    // Not analyzed here.
    s.initClusterRegistries(&args)

    return s, nil
}

// Start runs, in order, the functions registered in NewServer.
func (s *Server) Start(stop <-chan struct{}) error {
    // Now start all of the components.
    for _, fn := range s.startFuncs {
        fn(stop)
    }
}
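
The startFuncs slice is the backbone of this bootstrap: each init* step registers a closure, and Start simply replays them in order with the shared stop channel. A minimal sketch of the pattern (the type and field names below are illustrative, not copied from the source):

package bootstrap

// startFunc is the signature each component registers during initialization.
type startFunc func(stop <-chan struct{}) error

type server struct {
    startFuncs []startFunc
}

// addStartFunc only queues the component's start routine; nothing runs yet.
func (s *server) addStartFunc(fn startFunc) {
    s.startFuncs = append(s.startFuncs, fn)
}

// start replays the queued routines in registration order, handing each one
// the shared stop channel so they can all shut down together.
func (s *server) start(stop <-chan struct{}) error {
    for _, fn := range s.startFuncs {
        if err := fn(stop); err != nil {
            return err
        }
    }
    return nil
}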

istio.io/api/mesh/v1alpha1/network.pb.go

The MeshNetworks struct and its definition are as follows:

// MeshNetworks (config map) provides information about the set of networks
// inside a mesh and how to route to endpoints in each network. For example
//
// MeshNetworks(file/config map):
// networks:
// - network1:
// - endpoints:
// - fromRegistry: registry1 #must match secret name in kubernetes
// - fromCidr: 192.168.100.0/22 #a VM network for example
// gateways:
// - registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
// port: 15443
// locality: us-east-1a
type MeshNetworks struct {
// REQUIRED: The set of networks inside this mesh. Each network should
// have a unique name and information about how to infer the endpoints in
// the network as well as the gateways associated with the network.
Networks map[string]*Network `protobuf:"bytes,1,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
}

A simplified view of the NewServer function, based on the analysis above:

func NewServer(args PilotArgs) (*Server, error) {
    // Error handling omitted.
    s.initKubeClient(&args) // initialize the client to the k8s cluster: s.kubeClient

    // Initialize the related configs and watch them for changes.
    // ...

    // creates the config controller in the pilotConfig.
    // Ultimately creates a crd.NewController instance.
    s.initConfigController(&args)
    // s.makeKubeConfigController(args)
    // s.configController.Run(stop)


    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    // kube.NewController(s.kubeClient, args.Config.ControllerOptions)
    // go s.ServiceController.Run(stop)

    // Initialize the DiscoveryService gRPC server (covered in detail below).
    // Adds 2 funcs to startFuncs.
    s.initDiscoveryService(&args)

    return s, nil
}

ConfigController

// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
    // Initialization for the k8s case.
    controller, err := s.makeKubeConfigController(args)
    s.configController = controller

    // Defer starting the controller until after the service is created.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop)
        return nil
    })

    //...

    // Create the config store.
    s.istioConfigStore = model.MakeIstioStore(s.configController)

    return nil
}
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {
    kubeCfgFile := s.getKubeCfgFile(args)
    configClient, err := crd.NewClient(
        kubeCfgFile, "",
        model.IstioConfigTypes, // all of the related CRD definitions
        args.Config.ControllerOptions.DomainSuffix)


    if !args.Config.DisableInstallCRDs {
        // Register the custom CRDs.
        configClient.RegisterResources()
    }

    return crd.NewController(configClient, args.Config.ControllerOptions), nil
}

The model.IstioConfigTypes argument passed to crd.NewClient contains all of the related CRD definitions:

istio.io/istio/pilot/pkg/model/config.go

// IstioConfigTypes lists all Istio config types with schemas and validation
IstioConfigTypes = ConfigDescriptor{
VirtualService,
Gateway,
ServiceEntry,
DestinationRule,
EnvoyFilter,
Sidecar,
HTTPAPISpec,
HTTPAPISpecBinding,
QuotaSpec,
QuotaSpecBinding,
AuthenticationPolicy,
AuthenticationMeshPolicy,
ServiceRole,
ServiceRoleBinding,
RbacConfig,
ClusterRbacConfig,
}

crd.NewController is defined in the file istio.io/istio/pilot/pkg/config/kube/crd/controller.go:

// NewController creates a new Kubernetes controller for CRDs
// Use "" for namespace to listen for all namespace changes
func NewController(client *Client, options kube.ControllerOptions) model.ConfigStoreCache {
    log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace)

    // Queue requires a time duration for a retry delay after a handler error
    out := &controller{
        client: client,
        queue:  kube.NewQueue(1 * time.Second),
        kinds:  make(map[string]cacheHandler),
    }

    // add stores for CRD kinds
    for _, schema := range client.ConfigDescriptor() {
        out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod)
    }

    return out
}

To watch the CRDs, each resource group needs its own client connection; the CRD groups currently include network, config, authentication, rbac, and so on.

// controller is a collection of synchronized resource watchers.
// Caches are thread-safe
type controller struct {
    client *Client // one connection per resource group
    queue  kube.Queue
    kinds  map[string]cacheHandler
}

type cacheHandler struct {
    informer cache.SharedIndexInformer
    handler  *kube.ChainHandler
}
// Client is a basic REST client for CRDs implementing config store
type Client struct {
    // Map of apiVersion to restClient.
    clientset map[string]*restClient

    // domainSuffix for the config metadata
    domainSuffix string
}
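
Both controllers funnel their work through a kube.Queue, which the comments describe as needing "a time duration for a retry delay after a handler error". A minimal sketch of such a queue (an illustration of the idea, not the actual istio.io/istio queue implementation):

package queue

import "time"

// Task is one unit of work; a non-nil error asks for a retry.
type Task func() error

// Queue runs tasks in order and retries a failed task after a fixed delay.
type Queue struct {
    tasks chan Task
    delay time.Duration
}

func NewQueue(delay time.Duration) *Queue {
    return &Queue{tasks: make(chan Task, 128), delay: delay}
}

func (q *Queue) Push(t Task) { q.tasks <- t }

// Run processes tasks until the stop channel is closed.
func (q *Queue) Run(stop <-chan struct{}) {
    for {
        select {
        case <-stop:
            return
        case t := <-q.tasks:
            if err := t(); err != nil {
                time.Sleep(q.delay) // back off, then re-queue the failed task
                q.Push(t)
            }
        }
    }
}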

ServiceController

istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go

The struct is defined as follows:

type Controller struct {
    domainSuffix string

    client    kubernetes.Interface
    queue     Queue
    services  cacheHandler
    endpoints cacheHandler
    nodes     cacheHandler

    pods *PodCache

    //....
}

The NewController function is defined as follows:

// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see secretcontroler).
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
    log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
        options.WatchedNamespace, options.ResyncPeriod)

    // Queue requires a time duration for a retry delay after a handler error
    out := &Controller{
        domainSuffix:               options.DomainSuffix,
        client:                     client,
        queue:                      NewQueue(1 * time.Second),
        ClusterID:                  options.ClusterID,
        XDSUpdater:                 options.XDSUpdater,
        servicesMap:                make(map[model.Hostname]*model.Service),
        externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance),
    }

    sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))

    svcInformer := sharedInformers.Core().V1().Services().Informer()
    out.services = out.createCacheHandler(svcInformer, "Services")

    epInformer := sharedInformers.Core().V1().Endpoints().Informer()
    out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")

    nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
    out.nodes = out.createCacheHandler(nodeInformer, "Nodes")

    podInformer := sharedInformers.Core().V1().Pods().Informer()
    out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)

    return out
}

As the code above makes clear, the ServiceController watches the following resources in the specified namespace (the sketch after this list shows how these informer events feed the controller's queue):

  1. Services
  2. Endpoints
  3. Nodes
  4. Pod
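
createCacheHandler / createEDSCacheHandler wire each informer's notifications onto that queue so all registry events are handled serially. Roughly, the wiring looks like this with client-go (illustrative only: the pusher interface and handle callback stand in for the real ChainHandler machinery):

package example

import (
    "k8s.io/client-go/tools/cache"
)

// task mirrors the queue's unit of work.
type task func() error

type pusher interface{ Push(task) }

// attachHandler pushes every informer event onto a work queue, which is
// essentially what createCacheHandler does for Services, Endpoints, Nodes and Pods.
// handle is a hypothetical per-kind callback.
func attachHandler(informer cache.SharedIndexInformer, q pusher, handle func(obj interface{}, event string) error) {
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { q.Push(func() error { return handle(obj, "add") }) },
        UpdateFunc: func(old, cur interface{}) { q.Push(func() error { return handle(cur, "update") }) },
        DeleteFunc: func(obj interface{}) { q.Push(func() error { return handle(obj, "delete") }) },
    })
}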

ServiceDiscovery

istio.io/istio/pilot/pkg/bootstrap/server.go

func (s *Server) initDiscoveryService(args *PilotArgs) error {
    // Note that env holds the objects used later, including s.istioConfigStore and
    // s.ServiceController (which serves as both ServiceDiscovery and ServiceAccounts).
    environment := &model.Environment{
        Mesh:             s.mesh,
        MeshNetworks:     s.meshNetworks,
        IstioConfigStore: s.istioConfigStore,  // istio routing rules
        ServiceDiscovery: s.ServiceController, // service list
        ServiceAccounts:  s.ServiceController,
        MixerSAN:         s.mixerSAN,
    }

    // Set up discovery service
    discovery, err := envoy.NewDiscoveryService(
        environment,
        args.DiscoveryOptions,
    )

    s.mux = discovery.RestContainer.ServeMux

    // Create the gRPC server backed by envoyv2.NewDiscoveryServer.
    s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
        environment,
        istio_networking.NewConfigGenerator(args.Plugins),
        s.ServiceController,
        s.configController)

    s.EnvoyXdsServer.InitDebug(s.mux, s.ServiceController)

    // ...

    // create grpc/http server
    s.initGrpcServer(args.KeepaliveOptions)
    s.httpServer = &http.Server{
        Addr:    args.DiscoveryOptions.HTTPAddr,
        Handler: s.mux,
    }

    // create http listener
    listener, err := net.Listen("tcp", args.DiscoveryOptions.HTTPAddr)
    s.HTTPListeningAddr = listener.Addr()

    // create grpc listener
    grpcListener, err := net.Listen("tcp", args.DiscoveryOptions.GrpcAddr)
    s.GRPCListeningAddr = grpcListener.Addr()

    s.addStartFunc(func(stop <-chan struct{}) error {
        // start the HTTP server goroutine
        // start the gRPC server goroutine
        // start a goroutine waiting for shutdown

        return nil
    })

    // run secure grpc server

    return nil
}

istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go, where envoyv2.NewDiscoveryServer is defined:

// 	s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(
//		environment,
//		istio_networking.NewConfigGenerator(args.Plugins),
//		s.ServiceController,
//		s.configController)
// NewDiscoveryServer creates DiscoveryServer that sources data from Pilot's internal mesh data structures
func NewDiscoveryServer(env *model.Environment, generator core.ConfigGenerator, ctl model.Controller, configCache model.ConfigStoreCache) *DiscoveryServer {
    // Create the DiscoveryServer object.
    out := &DiscoveryServer{
        Env:                     env,
        ConfigGenerator:         generator, // generates xDS responses
        ConfigController:        configCache,
        EndpointShardsByService: map[string]*EndpointShards{},
        WorkloadsByID:           map[string]*Workload{},
        edsUpdates:              map[string]*EndpointShards{},
        concurrentPushLimit:     make(chan struct{}, 20),
        updateChannel:           make(chan *updateReq, 10),
    }
    env.PushContext = model.NewPushContext()

    // Handle update events.
    // handleUpdates consumes events from updateChannel. It ensures that at least minQuiet time
    // has passed since the last processed event, and that at most maxDelay elapses between
    // receiving an event and processing it.
    // It finally calls doPush: depending on the full-push flag, the recently updated EDS data
    // is sent out either via an XDS incremental push or via a full push.
    go out.handleUpdates()

    // The following three events clear DiscoveryServer's local caches and register cache-clearing callbacks:
    // 1. service-related information changes,
    // 2. the JWT public key changes,
    // 3. an Istio CRD changes.
    // When any of them happens, an event is sent to out.handleUpdates().

    // Periodic refresh.
    go out.periodicRefresh()

    // Periodic metrics refresh.
    go out.periodicRefreshMetrics()

    out.DebugConfigs = pilot.DebugConfigs

    pushThrottle := intEnv(pilot.PushThrottle, 10)
    pushBurst := intEnv(pilot.PushBurst, 100)

    adsLog.Infof("Starting ADS server with rateLimiter=%d burst=%d", pushThrottle, pushBurst)
    out.rateLimiter = rate.NewLimiter(rate.Limit(pushThrottle), pushBurst)
    out.initRateLimiter = rate.NewLimiter(rate.Limit(pushThrottle*2), pushBurst*2)

    return out
}
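
The minQuiet/maxDelay behavior described for handleUpdates is a debounce. A stripped-down sketch of the idea (the channel layout, parameter names and doPush callback are illustrative, not the actual handleUpdates code):

package example

import "time"

// debounce batches events: it waits until the stream has been quiet for minQuiet,
// but never delays a pending batch longer than maxDelay.
func debounce(events <-chan struct{}, minQuiet, maxDelay time.Duration, doPush func()) {
    var quietTimer, maxTimer <-chan time.Time
    pending := false

    for {
        select {
        case _, ok := <-events:
            if !ok {
                return
            }
            if !pending {
                pending = true
                maxTimer = time.After(maxDelay) // hard upper bound for this batch
            }
            quietTimer = time.After(minQuiet) // reset the quiet window on every event
        case <-quietTimer:
            if pending {
                doPush()
                pending, quietTimer, maxTimer = false, nil, nil
            }
        case <-maxTimer:
            if pending {
                doPush()
                pending, quietTimer, maxTimer = false, nil, nil
            }
        }
    }
}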

ADS-related definitions:

https://github.com/envoyproxy/data-plane-api/blob/master/envoy/service/discovery/v2/ads.proto

// See https://github.com/lyft/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
// This is a gRPC-only API.
rpc StreamAggregatedResources(stream envoy.api.v2.DiscoveryRequest)
returns (stream envoy.api.v2.DiscoveryResponse) {
}

rpc IncrementalAggregatedResources(stream envoy.api.v2.IncrementalDiscoveryRequest)
returns (stream envoy.api.v2.IncrementalDiscoveryResponse) {
}
}
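
For reference, this is roughly what a client of that service looks like in Go using the go-control-plane v2 stubs (the address, node ID and import paths are assumptions for illustration; Envoy itself speaks this protocol natively):

package main

import (
    "context"
    "fmt"
    "log"

    xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2"
    core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
    ads "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
    "google.golang.org/grpc"
)

func main() {
    // Connect to pilot's plain-text xDS port (the --grpcAddr default shown earlier).
    conn, err := grpc.Dial("istio-pilot.istio-system:15010", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    stream, err := ads.NewAggregatedDiscoveryServiceClient(conn).StreamAggregatedResources(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    // First request on the stream: a CDS subscription, identified purely by type_url.
    // The node ID follows pilot's Type~IPAddress~ID~Domain convention.
    err = stream.Send(&xdsapi.DiscoveryRequest{
        Node:    &core.Node{Id: "sidecar~10.1.0.10~helloworld-v1-abc.default~default.svc.cluster.local"},
        TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster",
    })
    if err != nil {
        log.Fatal(err)
    }

    resp, err := stream.Recv()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("version=%s clusters=%d\n", resp.VersionInfo, len(resp.Resources))
}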

https://github.com/envoyproxy/data-plane-api/blob/master/envoy/api/v2/discovery.proto

// A DiscoveryRequest requests a set of versioned resources of the same type for
// a given Envoy node on some API.
message DiscoveryRequest {
// The version_info provided in the request messages will be the version_info
// received with the most recent successfully processed response or empty on
// the first request. It is expected that no new request is sent after a
// response is received until the Envoy instance is ready to ACK/NACK the new
// configuration. ACK/NACK takes place by returning the new API config version
// as applied or the previous API config version respectively. Each type_url
// (see below) has an independent version associated with it.
string version_info = 1;

// The node making the request.
core.Node node = 2;

// List of resources to subscribe to, e.g. list of cluster names or a route
// configuration name. If this is empty, all resources for the API are
// returned. LDS/CDS expect empty resource_names, since this is global
// discovery for the Envoy instance. The LDS and CDS responses will then imply
// a number of resources that need to be fetched via EDS/RDS, which will be
// explicitly enumerated in resource_names.
repeated string resource_names = 3;

// Type of the resource that is being requested, e.g.
// "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment". This is implicit
// in requests made via singleton xDS APIs such as CDS, LDS, etc. but is
// required for ADS.
string type_url = 4;

// nonce corresponding to DiscoveryResponse being ACK/NACKed. See above
// discussion on version_info and the DiscoveryResponse nonce comment. This
// may be empty if no nonce is available, e.g. at startup or for non-stream
// xDS implementations.
string response_nonce = 5;

// This is populated when the previous :ref:`DiscoveryResponse <envoy_api_msg_DiscoveryResponse>`
// failed to update configuration. The *message* field in *error_details* provides the Envoy
// internal exception related to the failure. It is only intended for consumption during manual
// debugging, the string provided is not guaranteed to be stable across Envoy versions.
google.rpc.Status error_detail = 6;
}

message DiscoveryResponse {
// The version of the response data.
string version_info = 1;

// The response resources. These resources are typed and depend on the API being called.
repeated google.protobuf.Any resources = 2 [(gogoproto.nullable) = false];

// [#not-implemented-hide:]
// Canary is used to support two Envoy command line flags:
//
// * --terminate-on-canary-transition-failure. When set, Envoy is able to
// terminate if it detects that configuration is stuck at canary. Consider
// this example sequence of updates:
// - Management server applies a canary config successfully.
// - Management server rolls back to a production config.
// - Envoy rejects the new production config.
// Since there is no sensible way to continue receiving configuration
// updates, Envoy will then terminate and apply production config from a
// clean slate.
// * --dry-run-canary. When set, a canary response will never be applied, only
// validated via a dry run.
bool canary = 3;

// Type URL for resources. This must be consistent with the type_url in the
// Any messages for resources if resources is non-empty. This effectively
// identifies the xDS API when muxing over ADS.
string type_url = 4;

// For gRPC based subscriptions, the nonce provides a way to explicitly ack a
// specific DiscoveryResponse in a following DiscoveryRequest. Additional
// messages may have been sent by Envoy to the management server for the
// previous version on the stream prior to this DiscoveryResponse, that were
// unprocessed at response send time. The nonce allows the management server
// to ignore any further DiscoveryRequests for the previous version until a
// DiscoveryRequest bearing the nonce. The nonce is optional and is not
// required for non-stream based xDS implementations.
string nonce = 5;

// [#not-implemented-hide:]
// The control plane instance that sent the response.
core.ControlPlane control_plane = 6;
}

For how ADS maintains consistency, see the "Eventual consistency considerations" section of envoy-的-xds-rest-和-grpc-协议详解:

In general, to avoid dropping traffic, updates should follow a make-before-break model, in which:

  • CDS updates (if any) must always be pushed first.
  • EDS updates (if any) must arrive after the CDS updates for the corresponding clusters.
  • LDS updates must arrive after the corresponding CDS/EDS updates.
  • RDS updates related to newly added listeners must arrive last.
  • Finally, stale CDS clusters and the related EDS endpoints (endpoints no longer referenced) are removed.

ADS allows a single management server to deliver all API updates over a single gRPC stream. Combined with carefully sequenced updates, ADS can avoid traffic loss during configuration changes.

pilot/pkg/proxy/envoy/v2/ads.go

// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...

    // InitContext returns immediately if the context was already initialized.
    // InitContext pulls the data to be sent to clients out of env; analyzed further below.
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)
    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // Start a new goroutine to receive xdsapi.DiscoveryRequest messages from the client.
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
            err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // ...
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())

            case ListenerType:
                // ...
                adsLog.Debugf("ADS:LDS: REQ %s %v", con.ConID, peerAddr)
                con.LDSWatch = true
                err := s.pushLds(con, s.globalPushContext(), true, versionInfo())

            case RouteType:
                // ...
                adsLog.Debugf("ADS:RDS: REQ %s %s routes: %d", peerAddr, con.ConID, len(con.Routes))
                err := s.pushRoute(con, s.globalPushContext())

            case EndpointType:
                // ...
                // various error handling

                for _, cn := range con.Clusters {
                    s.removeEdsCon(cn, con.ConID, con)
                }

                for _, cn := range clusters {
                    s.addEdsCon(cn, con.ConID, con)
                }

                con.Clusters = clusters
                adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
                err := s.pushEds(s.globalPushContext(), con, true, nil)
                if err != nil {
                    return err
                }

            default:
                adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
            }
            // ...
        case pushEv := <-con.pushChannel:
            // ...

            err := s.pushConnection(con, pushEv)
            if err != nil {
                return nil
            }

        }
    }
}

istio.io/istio/pilot/pkg/model/push_context.go

// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {

// privateServices are reachable within the same namespace.
privateServicesByNamespace map[string][]*Service
// publicServices are services reachable within the mesh.
publicServices []*Service

privateVirtualServicesByNamespace map[string][]Config
publicVirtualServices []Config

// destination rules are of three types:
// namespaceLocalDestRules: all public/private dest rules pertaining to a service defined in a given namespace
// namespaceExportedDestRules: all public dest rules pertaining to a service defined in a namespace
// allExportedDestRules: all (public) dest rules across all namespaces
// We need the allExportedDestRules in addition to namespaceExportedDestRules because we select
// the dest rule based on the most specific host match, and not just any destination rule
namespaceLocalDestRules map[string]*processedDestRules
namespaceExportedDestRules map[string]*processedDestRules
allExportedDestRules *processedDestRules

// sidecars for each namespace
sidecarsByNamespace map[string][]*SidecarScope
////////// END ////////

// The following data is either a global index or used in the inbound path.
// Namespace specific views do not apply here.

// ServiceByHostname has all services, indexed by hostname.
ServiceByHostname map[Hostname]*Service `json:"-"`

// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
// are no authorization policies in the cluster.
AuthzPolicies *AuthorizationPolicies `json:"-"`

// ServicePort2Name is used to keep track of service name and port mapping.
// This is needed because ADS names use port numbers, while endpoints use
// port names. The key is the service name. If a service or port are not found,
// the endpoint needs to be re-evaluated later (eventual consistency)
ServicePort2Name map[string]PortList `json:"-"`

initDone bool
}


// InitContext will initialize the data structures used for code generation.
// This should be called before starting the push, from the thread creating
// the push context.
func (ps *PushContext) InitContext(env *Environment) error {
    ps.Mutex.Lock()
    defer ps.Mutex.Unlock()
    if ps.initDone {
        return nil
    }
    ps.Env = env
    var err error

    // Caches list of services in the registry, and creates a map
    // of hostname to service -> ServicePort2Name map[string]PortList `json:"-"`
    ps.initServiceRegistry(env)

    // Caches list of virtual services -> publicVirtualServices []Config
    ps.initVirtualServices(env)

    // Split out of DestinationRule expensive conversions - once per push.
    // The results end up in the following three fields:
    // * namespaceLocalDestRules    map[string]*processedDestRules
    // * namespaceExportedDestRules map[string]*processedDestRules
    // * allExportedDestRules       *processedDestRules
    ps.initDestinationRules(env)

    // Get the ClusterRbacConfig -> AuthzPolicies *AuthorizationPolicies
    ps.initAuthorizationPolicies(env)

    // Must be initialized in the end -> sidecarsByNamespace map[string][]*SidecarScope
    ps.InitSidecarScopes(env)

    ps.initDone = true
    return nil
}

CDS is the first message sent over ADS; below we use CDS as the example for the detailed analysis.

// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // ...

    // InitContext returns immediately if the context was already initialized.
    // InitContext pulls the data to be sent to clients out of env; analyzed further below.
    // 1. initServiceRegistry
    // 2. initVirtualServices
    // 3. initDestinationRules
    // 4. initAuthorizationPolicies
    // 5. InitSidecarScopes
    err := s.globalPushContext().InitContext(s.Env)
    con := newXdsConnection(peerAddr, stream)

    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // Start a new goroutine to receive xdsapi.DiscoveryRequest messages from the client.
    go receiveThread(con, reqChannel, &receiveError)

    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
            // Mainly calls ParseServiceNodeWithMetadata, which extracts the various
            // fields from the request and produces a Proxy object.
            // The current discReq.Node.Id format is Type~IPAddress~ID~Domain.
            err = s.initConnectionNode(discReq, con)

            switch discReq.TypeUrl {
            case ClusterType:
                // Handling for requests that arrive after CDS data has already been sent.
                if con.CDSWatch {
                    // ...
                }
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
                if err != nil {
                    return err
                }

                //...
            }

core.Node

Identifies a specific Envoy instance. The node identifier is presented to the management server, which may use this identifier to distinguish per Envoy configuration for serving.

{
  "id": "...",
  "cluster": "...",
  "metadata": "{...}",
  "locality": "{...}",
  "build_version": "..."
}

id
(string) An opaque node identifier for the Envoy node. This also provides the local service node name. It should be set if any of the following features are used: statsd, CDS, and HTTP tracing, either in this message or via --service-node.

cluster
(string) Defines the local service cluster name where Envoy is running. Though optional, it should be set if any of the following features are used: statsd, health check cluster verification, runtime override directory, user agent addition, HTTP global rate limiting, CDS, and HTTP tracing, either in this message or via --service-cluster.

metadata
(Struct) Opaque metadata extending the node identifier. Envoy will pass this directly to the management server.

locality
(core.Locality) Locality specifying where the Envoy instance is running.

build_version
(string) This is motivated by informing a management server during canary which version of Envoy is being tested in a heterogeneous fleet. This will be set by Envoy in management server RPCs.

initConnectionNode mainly calls ParseServiceNodeWithMetadata, which extracts the various fields from the request message and produces a Proxy object; a small parsing sketch follows the function stub below.

istio.io/istio/pilot/pkg/model/context.go

func ParseServiceNodeWithMetadata(s string, metadata map[string]string) (*Proxy, error) {
}
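
To make the Type~IPAddress~ID~Domain convention concrete, here is a toy parser for the four-part node ID (purely illustrative; the real ParseServiceNodeWithMetadata also consumes the metadata map and performs far more validation):

package example

import (
    "fmt"
    "strings"
)

// nodeInfo holds the four '~'-delimited parts of an Envoy node ID,
// e.g. "sidecar~10.1.0.10~helloworld-v1-abc.default~default.svc.cluster.local".
type nodeInfo struct {
    Type      string // sidecar or router
    IPAddress string
    ID        string // pod name + namespace on k8s
    Domain    string // DNS domain suffix
}

func parseNodeID(s string) (*nodeInfo, error) {
    parts := strings.Split(s, "~")
    if len(parts) != 4 {
        return nil, fmt.Errorf("expected 4 '~'-delimited fields, got %d in %q", len(parts), s)
    }
    return &nodeInfo{Type: parts[0], IPAddress: parts[1], ID: parts[2], Domain: parts[3]}, nil
}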
// Proxy contains information about an specific instance of a proxy (envoy sidecar, gateway,
// etc). The Proxy is initialized when a sidecar connects to Pilot, and populated from
// 'node' info in the protocol as well as data extracted from registries.
//
// In current Istio implementation nodes use a 4-parts '~' delimited ID.
// Type~IPAddress~ID~Domain
type Proxy struct {
// ClusterID specifies the cluster where the proxy resides.
// TODO: clarify if this is needed in the new 'network' model, likely needs to
// be renamed to 'network'
ClusterID string

// Type specifies the node type. First part of the ID.
Type NodeType

// IPAddresses is the IP addresses of the proxy used to identify it and its
// co-located service instances. Example: "10.60.1.6". In some cases, the host
// where the poxy and service instances reside may have more than one IP address
IPAddresses []string

// ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
// namespace.
ID string

// Locality is the location of where Envoy proxy runs.
Locality Locality

// DNSDomain defines the DNS domain suffix for short hostnames (e.g.
// "default.svc.cluster.local")
DNSDomain string

// TrustDomain defines the trust domain of the certificate
TrustDomain string

// ConfigNamespace defines the namespace where this proxy resides
// for the purposes of network scoping.
// NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES
ConfigNamespace string

// Metadata key-value pairs extending the Node identifier
Metadata map[string]string

// the sidecarScope associated with the proxy
SidecarScope *SidecarScope
}

The core of the CDS push is the call s.pushCds(con, s.globalPushContext(), versionInfo()).

istio.io/istio/pilot/pkg/proxy/envoy/v2/cds.go

func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
    // Generate the corresponding rawClusters ([]*xdsapi.Cluster) from the current
    // con.modelNode and the push context.
    rawClusters, err := s.generateRawClusters(con.modelNode, push)

    // DebugConfigs controls saving snapshots of configs for /debug/adsz.
    // Defaults to false, can be enabled with PILOT_DEBUG_ADSZ_CONFIG=1
    // If enabled via the env var, the details can be inspected at /debug/adsz on port 9093,
    // at the cost of extra memory.
    if s.DebugConfigs {
        con.CDSClusters = rawClusters
    }

    response := con.clusters(rawClusters)
    err = con.send(response)
    return nil
}

The role of s.generateRawClusters(con.modelNode, push) is therefore self-evident: it collects the CDS-related data from the push context and packages it into the CDS response format.

func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) ([]*xdsapi.Cluster, error) {
    rawClusters, err := s.ConfigGenerator.BuildClusters(s.Env, node, push)

    // Validate the contents of rawClusters.
    return rawClusters, nil
}
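
con.clusters(rawClusters) then wraps the generated clusters into a DiscoveryResponse. A sketch of what that wrapping amounts to, assuming the gogo-generated v2 stubs Istio used at the time (the helper name and the nonce/version handling are simplified):

package example

import (
    "time"

    xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2"
    "github.com/gogo/protobuf/proto"
    "github.com/gogo/protobuf/types"
)

const clusterTypeURL = "type.googleapis.com/envoy.api.v2.Cluster"

// clustersToResponse marshals each Cluster into an Any and stamps the response
// with a version and nonce, which is essentially what a CDS response needs.
func clustersToResponse(clusters []*xdsapi.Cluster, version string) (*xdsapi.DiscoveryResponse, error) {
    resp := &xdsapi.DiscoveryResponse{
        TypeUrl:     clusterTypeURL,
        VersionInfo: version,
        Nonce:       time.Now().Format(time.RFC3339Nano),
    }
    for _, c := range clusters {
        body, err := proto.Marshal(c)
        if err != nil {
            return nil, err
        }
        resp.Resources = append(resp.Resources, types.Any{TypeUrl: clusterTypeURL, Value: body})
    }
    return resp, nil
}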

The rawClusters object is produced by s.ConfigGenerator.BuildClusters from the combination of s.Env, node, and push:

  • s.Env holds the local caches of resources such as the ServiceController and ConfigController

  • node is the object built from this request, carrying the Node-related information

  • push is the context for this push; it has already pre-organized the information the push needs (derived from s.Env)

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

The BuildClusters function is implemented as follows:

// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution
// For inbound (sidecar only): Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
    clusters := make([]*apiv2.Cluster, 0)

    switch proxy.Type {
    case model.SidecarProxy:
        // GetProxyServiceInstances returns service instances co-located with the proxy,
        // i.e. the service instances on the proxy's host, including headless services.
        instances, err := env.GetProxyServiceInstances(proxy)

        sidecarScope := proxy.SidecarScope
        recomputeOutboundClusters := true
        // Append the outbound clusters.
        if recomputeOutboundClusters {
            clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
        }

        // Let ServiceDiscovery decide which IP and Port are used for management if
        // there are multiple IPs
        managementPorts := make([]*model.Port, 0)
        for _, ip := range proxy.IPAddresses {
            managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
        }

        // Append the inbound clusters related to the managementPorts on the proxy's IPs.
        clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)

    default: // Gateways
        // ...
    }

    // Add a blackhole and passthrough cluster for catching traffic to unresolved routes
    // DO NOT CALL PLUGINS for these two clusters.
    clusters = append(clusters, buildBlackHoleCluster())
    clusters = append(clusters, buildDefaultPassthroughCluster())

    // resolves cluster name conflicts.
    return normalizeClusters(push, proxy, clusters), nil
}

The ad-hoc debug handlers are mounted on port 9093; whether ADS snapshots are saved, however, is also gated by a pilot option, see:

istio.io/istio/pkg/features/pilot/pilot.go

> // DebugConfigs controls saving snapshots of configs for /debug/adsz.
> // Defaults to false, can be enabled with PILOT_DEBUG_ADSZ_CONFIG=1
> // For larger clusters it can increase memory use and GC - useful for small tests.
> DebugConfigs = os.Getenv("PILOT_DEBUG_ADSZ_CONFIG") == "1"
>

istio.io/istio/pilot/pkg/proxy/envoy/v2/debug.go

// InitDebug initializes the debug handlers and adds a debug in-memory registry.
func (s *DiscoveryServer) InitDebug(mux *http.ServeMux, sctl *aggregate.Controller) {
// For debugging and load testing v2 we add an memory registry.
s.MemRegistry = NewMemServiceDiscovery(
map[model.Hostname]*model.Service{ // mock.HelloService.Hostname: mock.HelloService,
}, 2)
s.MemRegistry.EDSUpdater = s
s.MemRegistry.ClusterID = "v2-debug"

sctl.AddRegistry(aggregate.Registry{
ClusterID: "v2-debug",
Name: serviceregistry.ServiceRegistry("memAdapter"),
ServiceDiscovery: s.MemRegistry,
ServiceAccounts: s.MemRegistry,
Controller: s.MemRegistry.controller,
})

mux.HandleFunc("/ready", s.ready)

mux.HandleFunc("/debug/edsz", s.edsz)
mux.HandleFunc("/debug/adsz", s.adsz)
mux.HandleFunc("/debug/cdsz", cdsz)
mux.HandleFunc("/debug/syncz", Syncz)

mux.HandleFunc("/debug/registryz", s.registryz)
mux.HandleFunc("/debug/endpointz", s.endpointz)
mux.HandleFunc("/debug/endpointShardz", s.endpointShardz)
mux.HandleFunc("/debug/workloadz", s.workloadz)
mux.HandleFunc("/debug/configz", s.configz)

mux.HandleFunc("/debug/authenticationz", s.authenticationz)
mux.HandleFunc("/debug/config_dump", s.ConfigDump)
mux.HandleFunc("/debug/push_status", s.PushStatusHandler)
}

PushContext initialization

istio.io/istio/pilot/pkg/model/push_context.go

func (ps *PushContext) InitContext(env *Environment) error {
    // Only initialized once; return immediately if already done.
    if ps.initDone {
        return nil
    }

    ps.initServiceRegistry(env)
    ps.initVirtualServices(env)
    ps.initDestinationRules(env)
    ps.initAuthorizationPolicies(env)
    ps.InitSidecarScopes(env)
}

Here env holds all of the information available for use, including the ServiceDiscovery interface (a small illustration of this interface embedding follows the list of init functions below).

istio.io/istio/pilot/pkg/model/context.go

// Environment provides an aggregate environmental API for Pilot
type Environment struct {
    // Discovery interface for listing services and instances.
    ServiceDiscovery

    ServiceAccounts
    IstioConfigStore

    Mesh         *meshconfig.MeshConfig
    MixerSAN     []string
    PushContext  *PushContext
    MeshNetworks *meshconfig.MeshNetworks
}
  • initServiceRegistry

    • From env.Services() -> ServiceDiscovery::Services
    • Fields touched:
      • privateServicesByNamespace, keyed by namespace
      • publicServices, a list
      • ServiceByHostname, s.Hostname -> Service
      • ServicePort2Name, s.Hostname -> Ports
  • initVirtualServices

    • From env.List(VirtualService.Type, NamespaceAll) -> IstioConfigStore::ConfigStore::List

    • Converts the virtual services' host shortnames into FQDNs

    • Fields touched:

      • privateVirtualServicesByNamespace, keyed by namespace
      • publicVirtualServices, a list
    • Related definitions:

      VirtualService -> (Hosts, []*HTTPRoute) -> (HTTPMatchRequest, HTTPRouteDestination)


apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: reviews-route
spec:
  hosts:
  - reviews.prod.svc.cluster.local
  http: # HTTPRoute
  - match:
    - uri:
        prefix: "/wpcatalog"
    - uri:
        prefix: "/consumercatalog"
    rewrite:
      uri: "/newcatalog"
    route:
    - destination: # HTTPRouteDestination
        host: reviews.prod.svc.cluster.local
        subset: v2
  - route:
    - destination:
        host: reviews.prod.svc.cluster.local
        subset: v1

apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: reviews-destination
spec:
  host: reviews.prod.svc.cluster.local
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2
  • initDestinationRules

    • env.List(DestinationRule.Type, NamespaceAll) -> IstioConfigStore::ConfigStore::List
    • Fields touched:
      • namespaceLocalDestRules, ns -> processedDestRules, ConfigScope == PRIVATE
      • namespaceExportedDestRules, ns -> processedDestRules, ConfigScope == PUBLIC
      • allExportedDestRules, a list
  • initAuthorizationPolicies

    • env.IstioConfigStore.ClusterRbacConfig()
    • Field touched:
      • AuthzPolicies
  • InitSidecarScopes

    • env.List(Sidecar.Type, NamespaceAll) -> IstioConfigStore::ConfigStore::List
    • Field touched:
      • sidecarsByNamespace, ns -> SidecarScope
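
Because Environment embeds ServiceDiscovery and IstioConfigStore as anonymous fields, the init* steps above can call env.Services() and env.List(...) directly. A minimal illustration of that Go embedding pattern (the types here are simplified stand-ins, not the real model package):

package main

import "fmt"

// Simplified stand-ins for the embedded interfaces.
type ServiceDiscovery interface {
    Services() ([]string, error)
}

type ConfigStore interface {
    List(typ, namespace string) ([]string, error)
}

// Environment embeds both, so their methods are promoted onto Environment itself.
type Environment struct {
    ServiceDiscovery
    ConfigStore
}

type fakeRegistry struct{}

func (fakeRegistry) Services() ([]string, error) {
    return []string{"helloworld.default.svc.cluster.local"}, nil
}

type fakeStore struct{}

func (fakeStore) List(typ, ns string) ([]string, error) {
    return []string{typ + "/reviews-route"}, nil
}

func main() {
    env := Environment{ServiceDiscovery: fakeRegistry{}, ConfigStore: fakeStore{}}
    svcs, _ := env.Services()                        // promoted from ServiceDiscovery
    vss, _ := env.List("virtual-service", "default") // promoted from ConfigStore
    fmt.Println(svcs, vss)
}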

CDS initialization flow in detail

DiscoveryServer::pushCds -> DiscoveryServer::generateRawClusters -> ConfigGenerator.BuildClusters; the real work is done in BuildClusters.

BuildClusters

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution: For inbound (sidecar only):
// Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
    clusters := make([]*apiv2.Cluster, 0)

    switch proxy.Type {
    case model.SidecarProxy:
        // 1. Get the services listening on the proxy's pod; in k8s the pod is looked up
        // via the proxy IP. Mainly used to generate the inbound clusters.
        // GetProxyServiceInstances returns the service instances that
        // co-located(in the same network namespace and security context)
        // with a given Proxy
        instances, err := env.GetProxyServiceInstances(proxy)

        sidecarScope := proxy.SidecarScope
        recomputeOutboundClusters := true
        if configgen.CanUsePrecomputedCDS(proxy) {
            // Use the precomputed CDS if it has already been cached.
        }
        // 2. If nothing is cached, compute directly via configgen.buildOutboundClusters(env, proxy, push).
        if recomputeOutboundClusters {
            clusters = append(clusters, configgen.buildOutboundClusters(env, proxy, push)...)
        }

        // Let ServiceDiscovery decide which IP and Port are used for management if
        // there are multiple IPs
        // managementPorts are mainly the Liveness && Readiness probe ports taken from the Pod spec.
        managementPorts := make([]*model.Port, 0)
        for _, ip := range proxy.IPAddresses {
            managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
        }

        //
        clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)
    }

    // ...

    // Add a blackhole and passthrough cluster for catching traffic to unresolved routes
    // DO NOT CALL PLUGINS for these two clusters.
    // Add the blackhole and passthrough clusters.
    clusters = append(clusters, buildBlackHoleCluster())
    clusters = append(clusters, buildDefaultPassthroughCluster())

}

Taking the multi-version HelloWorld app as an example, the generated output looks like the following; the complete sample is in istio_helloworld_v1_v2.json.

  "clusters": {   
"dynamic_active_clusters": [
{
"version_info": "2019-02-13T09:28:16Z/7",
"cluster": {
"name": "BlackHoleCluster",
"connect_timeout": "1s"
},
"last_updated": "2019-02-13T09:28:37.971Z"
},
{
"version_info": "2019-02-13T09:28:16Z/7",
"cluster": {
"name": "BlackHoleCluster",
"connect_timeout": "1s"
},
"last_updated": "2019-02-13T09:28:37.971Z"
},
{
"version_info": "2019-02-13T09:28:16Z/7",
"cluster": {
// fmt.Sprintf("%s|%d|%s|%s", direction, port, subsetName, hostname)
"name": "inbound|5000||helloworld.default.svc.cluster.local",
"connect_timeout": "1s",
"hosts": [
{
"socket_address": {
"address": "127.0.0.1",
"port_value": 5000
}
}
],
"circuit_breakers": {
"thresholds": [
{}
]
}
},
"last_updated": "2019-02-13T09:28:37.971Z"
},

{
  "version_info": "2019-02-13T12:41:46Z/10",
  "cluster": {
    "name": "outbound|5000|v1|helloworld.default.svc.cluster.local",
    "type": "EDS",
    "eds_cluster_config": {
      "eds_config": {
        "ads": {} // EDS-type cluster: its endpoints must be fetched via EDS !!!
      },
      "service_name": "outbound|5000|v1|helloworld.default.svc.cluster.local"
    },
    "connect_timeout": "1s",
    "circuit_breakers": {
      "thresholds": [
        {}
      ]
    }
  },
  "last_updated": "2019-02-13T12:41:46.640Z"
},

{
"version_info": "2019-02-13T12:41:46Z/10",
"cluster": {
"name": "outbound|5000|v2|helloworld.default.svc.cluster.local",
"type": "EDS",
"eds_cluster_config": {
"eds_config": {
"ads": {}
},
"service_name": "outbound|5000|v2|helloworld.default.svc.cluster.local"
},
"connect_timeout": "1s",
"circuit_breakers": {
"thresholds": [
{}
]
}
},
"last_updated": "2019-02-13T12:41:46.641Z"
},
{
"version_info": "2019-02-13T09:28:16Z/7",
"cluster": {
"name": "outbound|5000||helloworld.default.svc.cluster.local",
"type": "EDS",
"eds_cluster_config": {
"eds_config": {
"ads": {}
},
"service_name": "outbound|5000||helloworld.default.svc.cluster.local"
},
"connect_timeout": "1s",
"circuit_breakers": {
"thresholds": [
{}
]
}
},
"last_updated": "2019-02-13T09:28:37.928Z"
},
]
},
}
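
All of the cluster names in this dump follow the direction|port|subset|hostname convention noted in the inline comment above. A tiny sketch of that naming scheme (illustrative; the real helper is model.BuildSubsetKey):

package example

import "fmt"

// subsetKey reproduces the "direction|port|subset|hostname" cluster naming scheme,
// e.g. subsetKey("outbound", 5000, "v1", "helloworld.default.svc.cluster.local")
// yields "outbound|5000|v1|helloworld.default.svc.cluster.local".
func subsetKey(direction string, port int, subset, hostname string) string {
    return fmt.Sprintf("%s|%d|%s|%s", direction, port, subset, hostname)
}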

GetProxyServiceInstances

In the kube registry, GetProxyServiceInstances finds the corresponding services via the pod that owns the proxy IP; the code is as follows:

istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go

// GetProxyServiceInstances returns service instances co-located with a given proxy
func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) ([]*model.ServiceInstance, error) {
    out := make([]*model.ServiceInstance, 0)

    // There is only one IP for kube registry
    proxyIP := proxy.IPAddresses[0]
    proxyNamespace := ""

    pod := c.pods.getPodByIP(proxyIP)
    if pod != nil {
        proxyNamespace = pod.Namespace
        // 1. find proxy service by label selector,
        // if not any, there may exist headless service
        // failover to 2
        svcLister := listerv1.NewServiceLister(c.services.informer.GetIndexer())
        if services, err := svcLister.GetPodServices(pod); err == nil && len(services) > 0 {
            for _, svc := range services {
                item, exists, err := c.endpoints.informer.GetStore().GetByKey(KeyFunc(svc.Namespace, svc.Name))

                ep := *item.(*v1.Endpoints)
                out = append(out, c.getProxyServiceInstancesByEndpoint(ep, proxy)...)
            }
            return out, nil
        }
    }

    // 2. Headless service
    endpointsForPodInSameNS := make([]*model.ServiceInstance, 0)
    endpointsForPodInDifferentNS := make([]*model.ServiceInstance, 0)
    for _, item := range c.endpoints.informer.GetStore().List() {
        ep := *item.(*v1.Endpoints)
        endpoints := &endpointsForPodInSameNS
        if ep.Namespace != proxyNamespace {
            endpoints = &endpointsForPodInDifferentNS
        }

        *endpoints = append(*endpoints, c.getProxyServiceInstancesByEndpoint(ep, proxy)...)
    }

    // Put the endpointsForPodInSameNS in front of endpointsForPodInDifferentNS so that Pilot will
    // first use endpoints from endpointsForPodInSameNS. This makes sure if there are two endpoints
    // referring to the same IP/port, the one in endpointsForPodInSameNS will be used. (The other one
    // in endpointsForPodInDifferentNS will thus be rejected by Pilot).
    out = append(endpointsForPodInSameNS, endpointsForPodInDifferentNS...)
    if len(out) == 0 {
        if c.Env != nil {
            c.Env.PushContext.Add(model.ProxyStatusNoService, proxy.ID, proxy, "")
            status := c.Env.PushContext
        } else {
        }
    }
    return out, nil
}

buildOutboundClusters

It looks up the Services visible to the proxy plus the public Services, and turns each of them into the corresponding Cluster.

func (configgen *ConfigGeneratorImpl) buildOutboundClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) []*apiv2.Cluster {
    clusters := make([]*apiv2.Cluster, 0)

    inputParams := &plugin.InputParams{
        Env:  env,
        Push: push,
        Node: proxy,
    }
    networkView := model.GetNetworkView(proxy)

    // push.Services(proxy) returns the private and public services visible to the proxy.
    for _, service := range push.Services(proxy) {
        config := push.DestinationRule(proxy, service)
        for _, port := range service.Ports {
            if port.Protocol == model.ProtocolUDP {
                continue
            }
            inputParams.Service = service
            inputParams.Port = port

            lbEndpoints := buildLocalityLbEndpoints(env, networkView, service, port.Port, nil)
            // create default cluster
            // outbound|5000||helloworld.default.svc.cluster.local
            discoveryType := convertResolution(service.Resolution)
            clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port)
            serviceAccounts := env.ServiceAccounts.GetIstioServiceAccounts(service.Hostname, []int{port.Port})
            defaultCluster := buildDefaultCluster(env, clusterName, discoveryType, lbEndpoints, model.TrafficDirectionOutbound)

            updateEds(defaultCluster)
            setUpstreamProtocol(defaultCluster, port)
            clusters = append(clusters, defaultCluster)

            // If the service has a matching destinationRule, the subsets defined in it are
            // also added to the set of outbound clusters, e.g.
            // outbound|5000|v1|helloworld.default.svc.cluster.local
            // outbound|5000|v2|helloworld.default.svc.cluster.local
            // where v1 and v2 are subset names.
            if config != nil {
                destinationRule := config.Spec.(*networking.DestinationRule)
                defaultSni := model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port)
                applyTrafficPolicy(env, defaultCluster, destinationRule.TrafficPolicy, port, serviceAccounts,
                    defaultSni, DefaultClusterMode, model.TrafficDirectionOutbound)
                setLocalityPriority := false
                if defaultCluster.OutlierDetection != nil {
                    setLocalityPriority = true
                }
                applyLocalityLBSetting(proxy, defaultCluster.LoadAssignment, env.Mesh.LocalityLbSetting, setLocalityPriority)
                for _, subset := range destinationRule.Subsets {
                    inputParams.Subset = subset.Name
                    subsetClusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, port.Port)
                    defaultSni := model.BuildDNSSrvSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, port.Port)

                    // clusters with discovery type STATIC, STRICT_DNS or LOGICAL_DNS rely on cluster.hosts field
                    // ServiceEntry's need to filter hosts based on subset.labels in order to perform weighted routing
                    if discoveryType != apiv2.Cluster_EDS && len(subset.Labels) != 0 {
                        lbEndpoints = buildLocalityLbEndpoints(env, networkView, service, port.Port, []model.Labels{subset.Labels})
                    }
                    subsetCluster := buildDefaultCluster(env, subsetClusterName, discoveryType, lbEndpoints, model.TrafficDirectionOutbound)
                    updateEds(subsetCluster)
                    setUpstreamProtocol(subsetCluster, port)
                    applyTrafficPolicy(env, subsetCluster, destinationRule.TrafficPolicy, port, serviceAccounts, defaultSni,
                        DefaultClusterMode, model.TrafficDirectionOutbound)
                    applyTrafficPolicy(env, subsetCluster, subset.TrafficPolicy, port, serviceAccounts, defaultSni,
                        DefaultClusterMode, model.TrafficDirectionOutbound)
                    setLocalityPriority = false
                    if subsetCluster.OutlierDetection != nil {
                        setLocalityPriority = true
                    }
                    applyLocalityLBSetting(proxy, subsetCluster.LoadAssignment, env.Mesh.LocalityLbSetting, setLocalityPriority)
                    // call plugins
                    for _, p := range configgen.Plugins {
                        p.OnOutboundCluster(inputParams, subsetCluster)
                    }
                    clusters = append(clusters, subsetCluster)
                }
            }

            // call plugins for the default cluster
            for _, p := range configgen.Plugins {
                p.OnOutboundCluster(inputParams, defaultCluster)
            }
        }
    }

    return clusters
}

buildInboundClusters

buildInboundClusters also has to add clusters for the Kubernetes management ports. The ManagementPorts function uses the proxy IP to find the pod with that IP, then reads the ports defined in the pod spec's Liveness and Readiness probes:

// ManagementPorts implements a service catalog operation
// addr is the proxy's IP address
func (c *Controller) ManagementPorts(addr string) model.PortList {
    pod := c.pods.getPodByIP(addr)
    if pod == nil {
        return nil
    }

    managementPorts, err := convertProbesToPorts(&pod.Spec)
    return managementPorts
}

// convertProbesToPorts returns a PortList consisting of the ports where the
// pod is configured to do Liveness and Readiness probes
func convertProbesToPorts(t *v1.PodSpec) (model.PortList, error) {
    set := make(map[string]*model.Port)
    for _, container := range t.Containers {
        for _, probe := range []*v1.Probe{container.LivenessProbe, container.ReadinessProbe} {

            p, err := convertProbePort(&container, &probe.Handler)
            if err != nil {
                errs = multierror.Append(errs, err)
            } else if p != nil && set[p.Name] == nil {
                // Deduplicate along the way. We don't differentiate between HTTP vs TCP mgmt ports
                set[p.Name] = p
            }
        }
    }

    mgmtPorts := make(model.PortList, 0, len(set))
    for _, p := range set {
        mgmtPorts = append(mgmtPorts, p)
    }
    sort.Slice(mgmtPorts, func(i, j int) bool { return mgmtPorts[i].Port < mgmtPorts[j].Port })

    return mgmtPorts, errs
}

The body of buildInboundClusters is implemented as follows:

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

func (configgen *ConfigGeneratorImpl) buildInboundClusters(env *model.Environment, proxy *model.Proxy,
    push *model.PushContext, instances []*model.ServiceInstance, managementPorts []*model.Port) []*apiv2.Cluster {

    clusters := make([]*apiv2.Cluster, 0)

    sidecarScope := proxy.SidecarScope
    noneMode := proxy.GetInterceptionMode() == model.InterceptionNone

    // If neither a sidecarScope nor a custom ingressListener is configured:
    if sidecarScope == nil || !sidecarScope.HasCustomIngressListeners {
        // ...

        // Build the inbound clusters for the instances passed in.
        for _, instance := range instances {
            pluginParams := &plugin.InputParams{
                Env:             env,
                Node:            proxy,
                ServiceInstance: instance,
                Port:            instance.Endpoint.ServicePort,
                Push:            push,
                Bind:            LocalhostAddress, // bind the hosts to 127.0.0.1
            }
            localCluster := configgen.buildInboundClusterForPortOrUDS(pluginParams)
            clusters = append(clusters, localCluster)
        }

        // Add the management ports; the clusterName format is inbound|port_name|mgmCluster|port.
        // Add a passthrough cluster for traffic to management ports (health check ports)
        for _, port := range managementPorts {
            clusterName := model.BuildSubsetKey(model.TrafficDirectionInbound, port.Name,
                ManagementClusterHostname, port.Port)

            localityLbEndpoints := buildInboundLocalityLbEndpoints(LocalhostAddress, port.Port)
            mgmtCluster := buildDefaultCluster(env, clusterName, apiv2.Cluster_STATIC, localityLbEndpoints,
                model.TrafficDirectionInbound)
            setUpstreamProtocol(mgmtCluster, port)
            clusters = append(clusters, mgmtCluster)
        }
    } else {
        // ....
    }

    return clusters
}

Next, the various pieces of information are assembled into a plugin.InputParams struct and passed to buildInboundClusterForPortOrUDS for the final assembly.

// Plugin is called during the construction of a xdsapi.Listener which may alter the Listener in any
// way. Examples include AuthenticationPlugin that sets up mTLS authentication on the inbound Listener
// and outbound Cluster, the mixer plugin that sets up policy checks on the inbound listener, etc.
type Plugin interface {
// OnOutboundListener is called whenever a new outbound listener is added to the LDS output for a given service.
// Can be used to add additional filters on the outbound path.
OnOutboundListener(in *InputParams, mutable *MutableObjects) error

// OnInboundListener is called whenever a new listener is added to the LDS output for a given service
// Can be used to add additional filters.
OnInboundListener(in *InputParams, mutable *MutableObjects) error

// OnOutboundCluster is called whenever a new cluster is added to the CDS output.
// This is called once per push cycle, and not for every sidecar/gateway, except for gateways with non-standard
// operating modes.
OnOutboundCluster(in *InputParams, cluster *xdsapi.Cluster)

// OnInboundCluster is called whenever a new cluster is added to the CDS output.
// Called for each sidecar
OnInboundCluster(in *InputParams, cluster *xdsapi.Cluster)

// OnOutboundRouteConfiguration is called whenever a new set of virtual hosts (a set of virtual hosts with routes) is
// added to RDS in the outbound path.
OnOutboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration)

// OnInboundRouteConfiguration is called whenever a new set of virtual hosts are added to the inbound path.
OnInboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration)

// OnInboundFilterChains is called whenever a plugin needs to setup the filter chains, including relevant filter chain
// configuration, like FilterChainMatch and TLSContext.
OnInboundFilterChains(in *InputParams) []FilterChain
}
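
Each plugin gets a chance to mutate the object being built in place. A toy illustration of the OnInboundCluster hook pattern, using simplified types instead of the real plugin.InputParams and Envoy Cluster:

package main

import "fmt"

// inputParams and cluster are simplified stand-ins for plugin.InputParams and
// the Envoy apiv2.Cluster so the example stays self-contained.
type inputParams struct{ serviceName string }

type cluster struct {
	name     string
	metadata map[string]string
}

// inboundClusterPlugin mimics the shape of Plugin.OnInboundCluster: the hook
// receives the already-built cluster and may mutate it in place.
type inboundClusterPlugin func(in *inputParams, c *cluster)

func main() {
	plugins := []inboundClusterPlugin{
		// a toy "plugin" that tags every inbound cluster it sees
		func(in *inputParams, c *cluster) {
			if c.metadata == nil {
				c.metadata = map[string]string{}
			}
			c.metadata["decorated-for"] = in.serviceName
		},
	}

	c := &cluster{name: "inbound|5000||helloworld.istio-system.svc.cluster.local"}
	for _, p := range plugins {
		p(&inputParams{serviceName: "helloworld"}, c)
	}
	fmt.Printf("%s %v\n", c.name, c.metadata)
}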

The function buildInboundClusterForPortOrUDS runs the final leg of assembling the cluster object:

/*
for _, instance := range instances {
pluginParams := &plugin.InputParams{
Env: env,
Node: proxy,
ServiceInstance: instance,
Port: instance.Endpoint.ServicePort,
Push: push,
Bind: LocalhostAddress,
}
localCluster := configgen.buildInboundClusterForPortOrUDS(pluginParams)
clusters = append(clusters, localCluster)
}
*/

func (configgen *ConfigGeneratorImpl) buildInboundClusterForPortOrUDS(pluginParams *plugin.InputParams) *apiv2.Cluster {
instance := pluginParams.ServiceInstance
clusterName := model.BuildSubsetKey(model.TrafficDirectionInbound, instance.Endpoint.ServicePort.Name,
instance.Service.Hostname, instance.Endpoint.ServicePort.Port)

// build the locality LB endpoints for the locally bound address
localityLbEndpoints := buildInboundLocalityLbEndpoints(pluginParams.Bind, instance.Endpoint.Port)

localCluster := buildDefaultCluster(pluginParams.Env, clusterName, apiv2.Cluster_STATIC, localityLbEndpoints,
model.TrafficDirectionInbound)
setUpstreamProtocol(localCluster, instance.Endpoint.ServicePort)

// call plugins so that each registered plugin can adjust the cluster
for _, p := range configgen.Plugins {
p.OnInboundCluster(pluginParams, localCluster)
}

// When users specify circuit breakers, they need to be set on the receiver end
// (server side) as well as client side, so that the server has enough capacity
// (not the defaults) to handle the increased traffic volume

// DestinationRule returns a destination rule for a service name in a given domain.
config := pluginParams.Push.DestinationRule(pluginParams.Node, instance.Service)
if config != nil {
destinationRule := config.Spec.(*networking.DestinationRule)
if destinationRule.TrafficPolicy != nil {
// only connection pool settings make sense on the inbound path.
// upstream TLS settings/outlier detection/load balancer don't apply here.
applyConnectionPool(pluginParams.Env, localCluster, destinationRule.TrafficPolicy.ConnectionPool,
model.TrafficDirectionInbound)
}
}
return localCluster
}

istio.io/istio/pilot/pkg/networking/core/v1alpha3/cluster.go

applyConnectionPool fills in the connection-pool part of the cluster from the service's corresponding DestinationRule; it is defined as follows:

// FIXME: there isn't a way to distinguish between unset values and zero values
func applyConnectionPool(env *model.Environment, cluster *apiv2.Cluster, settings *networking.ConnectionPoolSettings, direction model.TrafficDirection) {
threshold := GetDefaultCircuitBreakerThresholds(direction)
if settings.Http != nil {
if settings.Http.Http2MaxRequests > 0 {
// Envoy only applies MaxRequests in HTTP/2 clusters
threshold.MaxRequests = &types.UInt32Value{Value: uint32(settings.Http.Http2MaxRequests)}
}
if settings.Http.Http1MaxPendingRequests > 0 {
// Envoy only applies MaxPendingRequests in HTTP/1.1 clusters
threshold.MaxPendingRequests = &types.UInt32Value{Value: uint32(settings.Http.Http1MaxPendingRequests)}
}

if settings.Http.MaxRequestsPerConnection > 0 {
cluster.MaxRequestsPerConnection = &types.UInt32Value{Value: uint32(settings.Http.MaxRequestsPerConnection)}
}

// FIXME: zero is a valid value if explicitly set, otherwise we want to use the default
if settings.Http.MaxRetries > 0 {
threshold.MaxRetries = &types.UInt32Value{Value: uint32(settings.Http.MaxRetries)}
}
}

if settings.Tcp != nil {
if settings.Tcp.ConnectTimeout != nil {
cluster.ConnectTimeout = util.GogoDurationToDuration(settings.Tcp.ConnectTimeout)
}

if settings.Tcp.MaxConnections > 0 {
threshold.MaxConnections = &types.UInt32Value{Value: uint32(settings.Tcp.MaxConnections)}
}

applyTCPKeepalive(env, cluster, settings)
}

cluster.CircuitBreakers = &v2Cluster.CircuitBreakers{
Thresholds: []*v2Cluster.CircuitBreakers_Thresholds{threshold},
}
}
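
A condensed sketch of the "only override when greater than zero" behaviour that the FIXME comments refer to, with simplified types standing in for the DestinationRule ConnectionPoolSettings and the Envoy circuit-breaker thresholds (the default values used here are illustrative):

package main

import "fmt"

// connectionPoolSettings and thresholds are simplified stand-ins for the
// DestinationRule ConnectionPoolSettings and Envoy circuit-breaker thresholds
// used by applyConnectionPool above.
type connectionPoolSettings struct {
	TCPMaxConnections       int
	HTTP2MaxRequests        int
	HTTP1MaxPendingRequests int
	MaxRetries              int
}

type thresholds struct {
	MaxConnections     uint32
	MaxRequests        uint32
	MaxPendingRequests uint32
	MaxRetries         uint32
}

// applyPool mirrors the "only override when greater than zero" logic: a zero
// (unset) value keeps the default threshold.
func applyPool(def thresholds, s connectionPoolSettings) thresholds {
	if s.TCPMaxConnections > 0 {
		def.MaxConnections = uint32(s.TCPMaxConnections)
	}
	if s.HTTP2MaxRequests > 0 {
		def.MaxRequests = uint32(s.HTTP2MaxRequests)
	}
	if s.HTTP1MaxPendingRequests > 0 {
		def.MaxPendingRequests = uint32(s.HTTP1MaxPendingRequests)
	}
	if s.MaxRetries > 0 {
		def.MaxRetries = uint32(s.MaxRetries)
	}
	return def
}

func main() {
	// illustrative defaults, not the actual GetDefaultCircuitBreakerThresholds values
	defaults := thresholds{MaxConnections: 1024, MaxRequests: 1024, MaxPendingRequests: 1024, MaxRetries: 3}
	fmt.Printf("%+v\n", applyPool(defaults, connectionPoolSettings{TCPMaxConnections: 100, HTTP2MaxRequests: 10}))
	// {MaxConnections:100 MaxRequests:10 MaxPendingRequests:1024 MaxRetries:3}
}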

LDS Initialization Flow in Detail

LDS generation follows the call chain DiscoveryServer::pushLds -> DiscoveryServer::generateRawListeners -> ConfigGenerator.BuildListeners; the main work happens in BuildListeners.

Sticking with the HelloWorld example, only two relevant listeners are kept here: envoy_listeners.json

Related information:

# kubectl get pod -n istio-system -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE
helloworld-v1-8f8dd85-d59lh 2/2 Running 0 155m 10.128.5.4 node06 <none>
helloworld-v2-f9cf47df4-w9mfn 2/2 Running 0 155m 10.128.13.2 node04 <none>

# kubectl get svc -n istio-system -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
helloworld ClusterIP 10.0.40.71 <none> 5000/TCP 155m app=helloworld


# kubectl -n istio-system exec -ti helloworld-v1-8f8dd85-d59lh -c istio-proxy -- netstat -t -anlp
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:5000 0.0.0.0:* LISTEN -
tcp 0 0 0.0.0.0:15090 0.0.0.0:* LISTEN 23/envoy
tcp 0 0 127.0.0.1:15000 0.0.0.0:* LISTEN 23/envoy
tcp 0 0 0.0.0.0:15001 0.0.0.0:* LISTEN 23/envoy
Port Service Description
5000 helloworld endpoint listening port
15090 static listener exposes /stats/prometheus, proxied to the 15000 admin port
15000 admin port bound to 127.0.0.1 only
15001 virtual listener BlackHoleCluster, receives the traffic redirected by iptables

Envoy's log shows the corresponding listeners being loaded:

[lds_api.cc:80] lds: add/update listener '10.128.69.4_5000'
[lds_api.cc:80] lds: add/update listener '10.0.79.108_15011'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8080'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_9093'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8060'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_5000'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_15010'
[lds_api.cc:80] lds: add/update listener 'virtual'

Pilot looks up the ports exposed by services in the visible namespaces and generates a listener record for each of them, plus an extra record for the local endpoint port; duplicate entries are collapsed along the way.

"listeners": {
"dynamic_active_listeners": [
// a record generated for the helloworld endpoint at endpoint_ip:5000
{
"version_info": "2019-02-13T09:28:16Z/7",
"listener": {
"name": "10.128.5.4_5000",
"address": {
"socket_address": {
"address": "10.128.5.4",
"port_value": 5000
}
},
"filter_chains": [
],
"deprecated_v1": {
"bind_to_port": false
},
"listener_filters": [
{
"name": "envoy.listener.tls_inspector",
"config": {}
}
]
},
"last_updated": "2019-02-13T09:28:38.027Z"
},

// a record generated for helloworld on 0.0.0.0:5000
{
"version_info": "2019-02-15T05:44:18Z/6",
"listener": {
"name": "0.0.0.0_5000",
"address": {
"socket_address": {
"address": "0.0.0.0",
"port_value": 5000
}
},
"filter_chains": [
],
"deprecated_v1": {
"bind_to_port": false
}
},
"last_updated": "2019-02-15T05:44:18.648Z"
},

// corresponding records are also generated for the ports exposed by other visible services

// BlackHoleCluster
{
"version_info": "2019-02-13T09:28:16Z/7",
"listener": {
"name": "virtual",
"address": {
"socket_address": {
"address": "0.0.0.0",
"port_value": 15001
}
},
"filter_chains": [
{
"filters": [
{
"name": "envoy.tcp_proxy",
"config": {
"stat_prefix": "BlackHoleCluster",
"cluster": "BlackHoleCluster"
}
}
]
}
],
"use_original_dst": true
},
"last_updated": "2019-02-13T09:28:38.080Z"
},
]
}

The BuildListeners function is implemented as follows:

// BuildListeners produces a list of listeners and referenced clusters for all proxies
func (configgen *ConfigGeneratorImpl) BuildListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) ([]*xdsapi.Listener, error) {
switch node.Type {
case model.SidecarProxy:
// collect the service endpoints on this node and generate records from endpoint.ip and port
return configgen.buildSidecarListeners(env, node, push)
case model.Router, model.Ingress:
return configgen.buildGatewayListeners(env, node, push)
}
return nil, nil
}

The main flow of buildSidecarListeners is as follows:

// buildSidecarListeners produces a list of listeners for sidecar proxies
func (configgen *ConfigGeneratorImpl) buildSidecarListeners(env *model.Environment, node *model.Proxy,
push *model.PushContext) ([]*xdsapi.Listener, error) {

mesh := env.Mesh

proxyInstances, err := env.GetProxyServiceInstances(node)

noneMode := node.GetInterceptionMode() == model.InterceptionNone
listeners := make([]*xdsapi.Listener, 0)

if mesh.ProxyListenPort > 0 {
// build the inbound listeners
inbound := configgen.buildSidecarInboundListeners(env, node, push, proxyInstances)

// build the outbound listeners
// buildSidecarOutboundListeners generates http and tcp listeners for
// outbound connections from the proxy based on the sidecar scope associated with the proxy.
// TODO(github.com/istio/pilot/issues/237)
//
// Sharing tcp_proxy and http_connection_manager filters on the same port for
// different destination services doesn't work with Envoy (yet). When the
// tcp_proxy filter's route matching fails for the http service the connection
// is closed without falling back to the http_connection_manager.
//
// Temporary workaround is to add a listener for each service IP that requires
// TCP routing
//
// Connections to the ports of non-load balanced services are directed to
// the connection's original destination. This avoids costly queries of instance
// IPs and ports, but requires that ports of non-load balanced service be unique.
outbound := configgen.buildSidecarOutboundListeners(env, node, push, proxyInstances)

listeners = append(listeners, inbound...)
listeners = append(listeners, outbound...)

// Do not generate any management port listeners if the user has specified a SidecarScope object
// with ingress listeners. Specifying the ingress listener implies that the user wants
// to only have those specific listeners and nothing else, in the inbound path.
generateManagementListeners := true

sidecarScope := node.SidecarScope
if sidecarScope != nil && sidecarScope.HasCustomIngressListeners ||
noneMode {
generateManagementListeners = false
}

if generateManagementListeners {
// ...
}

tcpProxy := &tcp_proxy.TcpProxy{
StatPrefix: util.BlackHoleCluster,
ClusterSpecifier: &tcp_proxy.TcpProxy_Cluster{Cluster: util.BlackHoleCluster},
}

if mesh.OutboundTrafficPolicy.Mode == meshconfig.MeshConfig_OutboundTrafficPolicy_ALLOW_ANY {
// We need a passthrough filter to fill in the filter stack for orig_dst listener
tcpProxy = &tcp_proxy.TcpProxy{
StatPrefix: util.PassthroughCluster,
ClusterSpecifier: &tcp_proxy.TcpProxy_Cluster{Cluster: util.PassthroughCluster},
}
}
var transparent *google_protobuf.BoolValue
if node.GetInterceptionMode() == model.InterceptionTproxy {
transparent = proto.BoolTrue
}

// add an extra listener that binds to the port that is the recipient of the iptables redirect
listeners = append(listeners, &xdsapi.Listener{
Name: VirtualListenerName,
Address: util.BuildAddress(WildcardAddress, uint32(mesh.ProxyListenPort)),
Transparent: transparent,
UseOriginalDst: proto.BoolTrue,
FilterChains: []listener.FilterChain{
{
Filters: []listener.Filter{
{
Name: xdsutil.TCPProxy,
ConfigType: &listener.Filter_Config{
Config: util.MessageToStruct(tcpProxy),
},
},
},
},
},
})
}

httpProxyPort := mesh.ProxyHttpPort
if httpProxyPort == 0 && noneMode { // make sure http proxy is enabled for 'none' interception.
httpProxyPort = int32(pilot.DefaultPortHTTPProxy)
}
// enable HTTP PROXY port if necessary; this will add an RDS route for this port
if httpProxyPort > 0 {
useRemoteAddress := false
traceOperation := http_conn.EGRESS
listenAddress := LocalhostAddress

opts := buildListenerOpts{
env: env,
proxy: node,
proxyInstances: proxyInstances,
bind: listenAddress,
port: int(httpProxyPort),
filterChainOpts: []*filterChainOpts{{
httpOpts: &httpListenerOpts{
rds: RDSHttpProxy,
useRemoteAddress: useRemoteAddress,
direction: traceOperation,
connectionManager: &http_conn.HttpConnectionManager{
HttpProtocolOptions: &core.Http1ProtocolOptions{
AllowAbsoluteUrl: proto.BoolTrue,
},
},
},
}},
bindToPort: true,
skipUserFilters: true,
}
l := buildListener(opts)
// TODO: plugins for HTTP_PROXY mode, envoyfilter needs another listener match for SIDECAR_HTTP_PROXY
// there is no mixer for http_proxy
mutable := &plugin.MutableObjects{
Listener: l,
FilterChains: []plugin.FilterChain{{}},
}
pluginParams := &plugin.InputParams{
ListenerProtocol: plugin.ListenerProtocolHTTP,
ListenerCategory: networking.EnvoyFilter_ListenerMatch_SIDECAR_OUTBOUND,
Env: env,
Node: node,
ProxyInstances: proxyInstances,
Push: push,
}
if err := buildCompleteFilterChain(pluginParams, mutable, opts); err != nil {
log.Warna("buildSidecarListeners ", err.Error())
} else {
listeners = append(listeners, l)
}
// TODO: need inbound listeners in HTTP_PROXY case, with dedicated ingress listener.
}

return listeners, nil
}
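
The catch-all cluster used by the virtual 15001 listener depends only on the mesh OutboundTrafficPolicy, as the code above shows. A minimal sketch of that decision (the cluster names come from the config dump earlier; the helper itself is illustrative):

package main

import "fmt"

// Cluster names as seen in the Envoy config dump; the real constants live in
// Pilot's networking util package.
const (
	blackHoleCluster   = "BlackHoleCluster"
	passthroughCluster = "PassthroughCluster"
)

// defaultCatchAllCluster mirrors the choice made for the virtual (15001)
// listener: with ALLOW_ANY, unknown destinations are passed through to their
// original address; otherwise they are black-holed.
func defaultCatchAllCluster(outboundPolicyAllowAny bool) string {
	if outboundPolicyAllowAny {
		return passthroughCluster
	}
	return blackHoleCluster
}

func main() {
	fmt.Println(defaultCatchAllCluster(false)) // BlackHoleCluster
	fmt.Println(defaultCatchAllCluster(true))  // PassthroughCluster
}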

EDS Initialization Flow in Detail

The structure used to track eds_clusters looks roughly like this:

cluster_name -> proxy.LocalityA -> eds_clusterA -> (node1,node2….)
proxy.LocalityB -> eds_clusterB -> (node1,node2….)

The core structures EdsCluster and ClusterLoadAssignment are as follows:

// EdsCluster tracks eds-related info for monitored clusters. In practice it'll include
// all clusters until we support on-demand cluster loading.
type EdsCluster struct {
// ...
LoadAssignment *xdsapi.ClusterLoadAssignment

// records which proxies are currently using this cluster
// EdsClients keeps track of all nodes monitoring the cluster.
EdsClients map[string]*XdsConnection `json:"-"`
}

type ClusterLoadAssignment struct {
// Name of the cluster. This will be the :ref:`service_name
// <envoy_api_field_Cluster.EdsClusterConfig.service_name>` value if specified
// in the cluster :ref:`EdsClusterConfig
// <envoy_api_msg_Cluster.EdsClusterConfig>`.
ClusterName string
// List of endpoints to load balance to.
Endpoints []endpoint.LocalityLbEndpoints
// Map of named endpoints that can be referenced in LocalityLbEndpoints.
NamedEndpoints map[string]*endpoint.Endpoint
// Load balancing policy settings.
Policy *ClusterLoadAssignment_Policy
}
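
A self-contained sketch of the tracking shape described above (cluster name -> locality -> EdsCluster -> watching connections), using simplified stand-ins for the Pilot types:

package main

import "fmt"

// Simplified stand-ins for the Pilot types above; the real definitions live in
// Pilot's ADS/EDS code.
type locality string

type xdsConnection struct{ conID string }

type edsCluster struct {
	// edsClients keeps track of all nodes monitoring the cluster.
	edsClients map[string]*xdsConnection
}

// edsClusters mirrors the tracking shape: cluster name -> proxy locality ->
// EdsCluster -> watching connections.
var edsClusters = map[string]map[locality]*edsCluster{}

// getOrAddEdsCluster sketches the lookup/initialize step hinted at in the
// EndpointType handler shown below.
func getOrAddEdsCluster(loc locality, clusterName string) *edsCluster {
	byLocality, ok := edsClusters[clusterName]
	if !ok {
		byLocality = map[locality]*edsCluster{}
		edsClusters[clusterName] = byLocality
	}
	c, ok := byLocality[loc]
	if !ok {
		c = &edsCluster{edsClients: map[string]*xdsConnection{}}
		byLocality[loc] = c
	}
	return c
}

func main() {
	c := getOrAddEdsCluster("region1/zone1", "outbound|5000|v1|helloworld.istio-system.svc.cluster.local")
	c.edsClients["sidecar~10.128.5.4~helloworld-v1"] = &xdsConnection{conID: "con-1"}
	fmt.Println(len(edsClusters), len(c.edsClients)) // 1 1
}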

The main request-handling code for EDS pushes is as follows:

	case EndpointType:
// ...
// remove this connection's node from the clusters it was previously tracking
for _, cn := range con.Clusters {
s.removeEdsCon(cn, con.ConID, con)
}

// add the node to the tracked Node list of each cluster requested on this connection
for _, cn := range clusters {
s.addEdsCon(cn, con.ConID, con)
// addEdsCon handles the not-yet-tracked case and initializes the eds_cluster:
// -> s.getOrAddEdsCluster(connection.modelNode, clusterName)
// c := edsClusters[clusterName]
// if c == nil {
// c := &EdsCluster{
// discovery: s,
// EdsClients: map[string]*XdsConnection{},
// FirstUse: time.Now(),
// }
// edsClusters[clusterName] = map[model.Locality]*EdsCluster{
// proxy.Locality: c,
// }
// return c
// }
}

con.Clusters = clusters
adsLog.Debugf("ADS:EDS: REQ %s %s clusters: %d", peerAddr, con.ConID, len(con.Clusters))
err := s.pushEds(s.globalPushContext(), con, true, nil)
if err != nil {
return err
}

// fired when the config changes; an incremental push is triggered if there are updates
case pushEv := <-con.pushChannel:
// It is called when config changes.
// This is not optimized yet - we should detect what changed based on event and only
// push resources that need to be pushed.

// TODO: possible race condition: if a config change happens while the envoy
// was getting the initial config, between LDS and RDS, the push will miss the
// monitored 'routes'. Same for CDS/EDS interval.
// It is very tricky to handle due to the protocol - but the periodic push recovers
// from it.

err := s.pushConnection(con, pushEv)
if err != nil {
return nil
}

}

The pushEds function performs the actual push:

// pushEds is pushing EDS updates for a single connection. Called the first time
// a client connects, for incremental updates and for full periodic updates.
func (s *DiscoveryServer) pushEds(push *model.PushContext, con *XdsConnection,
full bool, edsUpdatedServices map[string]*EndpointShards) error {
loadAssignments := []*xdsapi.ClusterLoadAssignment{}

emptyClusters := 0
endpoints := 0

// loop over the cluster names requested on this connection and gather their data
for _, clusterName := range con.Clusters {
_, _, hostname, _ := model.ParseSubsetKey(clusterName)
if edsUpdatedServices != nil && edsUpdatedServices[string(hostname)] == nil {
// Cluster was not updated, skip recomputing.
continue
}

// look up the EdsCluster object by cluster name and con.modelNode
c := s.getEdsCluster(con.modelNode, clusterName)

l := c.LoadAssignment

// if the LoadAssignment has not been computed yet, update the cluster to populate it
if l == nil { // fresh cluster
edsClusters := map[model.Locality]*EdsCluster{
con.modelNode.Locality: c,
}
s.updateCluster(push, clusterName, edsClusters)
l = loadAssignment(c)
}

// If networks are set (by default they aren't) apply the Split Horizon
// EDS filter on the endpoints
// if mesh networks are configured, run the endpoints through the Split Horizon EDS filter as well
if s.Env.MeshNetworks != nil && len(s.Env.MeshNetworks.Networks) > 0 {
endpoints := EndpointsByNetworkFilter(l.Endpoints, con, s.Env)
endpoints = LoadBalancingWeightNormalize(endpoints)
filteredCLA := &xdsapi.ClusterLoadAssignment{
ClusterName: l.ClusterName,
Endpoints: endpoints,
Policy: l.Policy,
}
l = filteredCLA
}

endpoints += len(l.Endpoints)
loadAssignments = append(loadAssignments, l)
}

response := endpointDiscoveryResponse(loadAssignments)
if err := con.send(response); err != nil {
return err
}
return nil
}
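
The edsUpdatedServices parameter is what makes the push incremental: only clusters whose service hostname appears in the update set get recomputed, while a nil set means a full push. A simplified sketch of that filter (parseHostname stands in for model.ParseSubsetKey):

package main

import (
	"fmt"
	"strings"
)

// parseHostname extracts the hostname from a subset key of the form
// direction|port|subset|hostname (a simplified stand-in for model.ParseSubsetKey).
func parseHostname(subsetKey string) string {
	parts := strings.SplitN(subsetKey, "|", 4)
	if len(parts) != 4 {
		return ""
	}
	return parts[3]
}

// clustersToPush mirrors the incremental-push filter in pushEds: with an
// update set, only clusters of updated services are recomputed; nil means a
// full push of everything the connection watches.
func clustersToPush(watched []string, updated map[string]bool) []string {
	var out []string
	for _, c := range watched {
		if updated != nil && !updated[parseHostname(c)] {
			continue // cluster was not updated, skip recomputing
		}
		out = append(out, c)
	}
	return out
}

func main() {
	watched := []string{
		"outbound|5000|v1|helloworld.istio-system.svc.cluster.local",
		"outbound|9093||istio-citadel.istio-system.svc.cluster.local",
	}
	updated := map[string]bool{"helloworld.istio-system.svc.cluster.local": true}
	fmt.Println(clustersToPush(watched, updated)) // only the helloworld cluster
}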

RDS Initialization Flow in Detail

Recall the listener load log from the LDS section:

[lds_api.cc:80] lds: add/update listener '10.128.69.4_5000'
[lds_api.cc:80] lds: add/update listener '10.0.79.108_15011'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8080'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_9093'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_8060'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_5000'
[lds_api.cc:80] lds: add/update listener '0.0.0.0_15010'
[lds_api.cc:80] lds: add/update listener 'virtual'

The RDS request covers the LDS listeners bound to 0.0.0.0, i.e. the ports reachable from outside the pod. Their port numbers are sent to the ADS server as the requested route_names, in this case the 5 routes "8080, 9093, 8060, 5000, 15010" (see the sketch after the dump for how these names are derived). The response below shows only the helloworld port 5000 entry:

   "dynamic_route_configs": [
{
"version_info": "2019-02-15T05:53:49Z/8",
"route_config": {
"name": "5000",
"virtual_hosts": [
{
"name": "helloworld.istio-system.svc.cluster.local:5000",
"domains": [
"helloworld.istio-system.svc.cluster.local",
"helloworld.istio-system.svc.cluster.local:5000",
"helloworld",
"helloworld:5000",
"helloworld.istio-system.svc.cluster",
"helloworld.istio-system.svc.cluster:5000",
"helloworld.istio-system.svc",
"helloworld.istio-system.svc:5000",
"helloworld.istio-system",
"helloworld.istio-system:5000",
"10.0.40.71",
"10.0.40.71:5000"
],
"routes": [
{
"match": {
"prefix": "/"
},
"route": {
"weighted_clusters": {
"clusters": [
{
"name": "outbound|5000|v1|helloworld.istio-system.svc.cluster.local",
"weight": 90,
"per_filter_config": {
}
},
{
"name": "outbound|5000|v2|helloworld.istio-system.svc.cluster.local",
"weight": 10,
"per_filter_config": {
}
}
]
},
"timeout": "0s",
"max_grpc_timeout": "0s"
},
"decorator": {
"operation": "helloworld:5000/*"
},
"per_filter_config": {
"mixer": {
"disable_check_calls": true
}
}
}
]
}
],
"validate_clusters": false
},
"last_updated": "2019-02-15T05:53:49.910Z"
},
]
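
How the wildcard listeners from the LDS log turn into RDS route names can be shown in a few lines; this only reproduces the naming convention, it is not actual Pilot or Envoy code:

package main

import (
	"fmt"
	"strings"
)

// routeNamesFromListeners sketches how the wildcard (0.0.0.0_<port>) listeners
// seen in the LDS log translate into the route names requested over RDS.
func routeNamesFromListeners(listeners []string) []string {
	var routes []string
	for _, l := range listeners {
		if strings.HasPrefix(l, "0.0.0.0_") {
			routes = append(routes, strings.TrimPrefix(l, "0.0.0.0_"))
		}
	}
	return routes
}

func main() {
	lds := []string{
		"10.128.69.4_5000", "10.0.79.108_15011",
		"0.0.0.0_8080", "0.0.0.0_9093", "0.0.0.0_8060",
		"0.0.0.0_5000", "0.0.0.0_15010", "virtual",
	}
	fmt.Println(routeNamesFromListeners(lds)) // [8080 9093 8060 5000 15010]
}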

The entry point that handles RDS requests:

case RouteType:
routes := discReq.GetResourceNames()

// ...

if sortedRoutes == nil {
sort.Strings(routes)
sortedRoutes = routes
}
con.Routes = sortedRoutes
adsLog.Debugf("ADS:RDS: REQ %s %s routes: %d", peerAddr, con.ConID, len(con.Routes))
err := s.pushRoute(con, s.globalPushContext())

The pushRoute function is implemented as follows:

func (s *DiscoveryServer) pushRoute(con *XdsConnection, push *model.PushContext) error {
rawRoutes, err := s.generateRawRoutes(con, push)

response := routeDiscoveryResponse(rawRoutes)
err = con.send(response)
// ...
return nil
}

generateRawRoutes mainly calls ConfigGenerator.BuildHTTPRoutes to prepare the data:

func (s *DiscoveryServer) generateRawRoutes(con *XdsConnection, push *model.PushContext) ([]*xdsapi.RouteConfiguration, error) {
rc := make([]*xdsapi.RouteConfiguration, 0)
for _, routeName := range con.Routes {
r, err := s.ConfigGenerator.BuildHTTPRoutes(s.Env, con.modelNode, push, routeName)
if err != nil {
return nil, err
}

// validate and append (validation elided here)
rc = append(rc, r)
}
return rc, nil
}

ConfigGeneratorImpl.BuildHTTPRoutes dispatches to the appropriate builder based on the proxy type (model.SidecarProxy vs. Router/Ingress):

// BuildHTTPRoutes produces a list of routes for the proxy
func (configgen *ConfigGeneratorImpl) BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext,
routeName string) (*xdsapi.RouteConfiguration, error) {
proxyInstances, err := env.GetProxyServiceInstances(node)

switch node.Type {
case model.SidecarProxy:
return configgen.buildSidecarOutboundHTTPRouteConfig(env, node, push, proxyInstances, routeName), nil
case model.Router, model.Ingress:
return configgen.buildGatewayHTTPRouteConfig(env, node, push, proxyInstances, routeName)
}
return nil, nil
}

The final step builds the Route configuration from the routeName together with the matching VirtualServices; the resulting format was shown above:

// buildSidecarOutboundHTTPRouteConfig builds an outbound HTTP Route for sidecar.
// Based on port, will determine all virtual hosts that listen on the port.
func (configgen *ConfigGeneratorImpl) buildSidecarOutboundHTTPRouteConfig(env *model.Environment, node *model.Proxy, push *model.PushContext,
proxyInstances []*model.ServiceInstance, routeName string) *xdsapi.RouteConfiguration {

listenerPort, err := strconv.Atoi(routeName)
if err != nil {
// for sidecars the route name is expected to be a port number
return nil
}

var virtualServices []model.Config
var services []*model.Service

// Get the list of services that correspond to this egressListener from the sidecarScope
sidecarScope := node.SidecarScope
// sidecarScope should never be nil
if sidecarScope != nil && sidecarScope.Config != nil {
// build from the egress listeners declared in the Sidecar resource
} else { // otherwise fall back to the default mesh gateway
meshGateway := map[string]bool{model.IstioMeshGateway: true}
services = push.Services(node)
virtualServices = push.VirtualServices(node, meshGateway)
}

nameToServiceMap := make(map[model.Hostname]*model.Service)
for _, svc := range services {
if listenerPort == 0 {
nameToServiceMap[svc.Hostname] = svc
} else {
if svcPort, exists := svc.Ports.GetByPort(listenerPort); exists {

nameToServiceMap[svc.Hostname] = &model.Service{
Hostname: svc.Hostname,
Address: svc.Address,
MeshExternal: svc.MeshExternal,
Ports: []*model.Port{svcPort},
}
}
}
}

// Collect all proxy labels for source match
var proxyLabels model.LabelsCollection
for _, w := range proxyInstances {
proxyLabels = append(proxyLabels, w.Labels)
}

// Get list of virtual services bound to the mesh gateway
virtualHostWrappers := istio_route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, nameToServiceMap, proxyLabels, virtualServices, listenerPort)
vHostPortMap := make(map[int][]route.VirtualHost)

for _, virtualHostWrapper := range virtualHostWrappers {
virtualHosts := make([]route.VirtualHost, 0, len(virtualHostWrapper.VirtualServiceHosts)+len(virtualHostWrapper.Services))
for _, host := range virtualHostWrapper.VirtualServiceHosts {
virtualHosts = append(virtualHosts, route.VirtualHost{
Name: fmt.Sprintf("%s:%d", host, virtualHostWrapper.Port),
Domains: []string{host, fmt.Sprintf("%s:%d", host, virtualHostWrapper.Port)},
Routes: virtualHostWrapper.Routes,
})
}

for _, svc := range virtualHostWrapper.Services {
virtualHosts = append(virtualHosts, route.VirtualHost{
Name: fmt.Sprintf("%s:%d", svc.Hostname, virtualHostWrapper.Port),
Domains: generateVirtualHostDomains(svc, virtualHostWrapper.Port, node),
Routes: virtualHostWrapper.Routes,
})
}

vHostPortMap[virtualHostWrapper.Port] = append(vHostPortMap[virtualHostWrapper.Port], virtualHosts...)
}

var virtualHosts []route.VirtualHost
if listenerPort == 0 {
virtualHosts = mergeAllVirtualHosts(vHostPortMap)
} else {
virtualHosts = vHostPortMap[listenerPort]
}

util.SortVirtualHosts(virtualHosts)
out := &xdsapi.RouteConfiguration{
Name: routeName,
VirtualHosts: virtualHosts,
ValidateClusters: proto.BoolFalse,
}

// call plugins
for _, p := range configgen.Plugins {
in := &plugin.InputParams{
ListenerProtocol: plugin.ListenerProtocolHTTP,
Env: env,
Node: node,
Push: push,
}
p.OnOutboundRouteConfiguration(in, out)
}

return out
}
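
The long domains list in the RDS dump above comes from expanding every suffix-truncated form of the service hostname, plus the ClusterIP, each with and without the port. An illustrative expansion, not the exact generateVirtualHostDomains implementation:

package main

import (
	"fmt"
	"strings"
)

// expandDomains sketches the domain expansion visible in the RDS output: for
// helloworld.istio-system.svc.cluster.local:5000 it yields every prefix of the
// hostname plus the host:port form and the ClusterIP.
func expandDomains(hostname string, port int, clusterIP string) []string {
	parts := strings.Split(hostname, ".")
	var out []string
	for i := len(parts); i >= 1; i-- {
		h := strings.Join(parts[:i], ".")
		out = append(out, h, fmt.Sprintf("%s:%d", h, port))
	}
	if clusterIP != "" {
		out = append(out, clusterIP, fmt.Sprintf("%s:%d", clusterIP, port))
	}
	return out
}

func main() {
	for _, d := range expandDomains("helloworld.istio-system.svc.cluster.local", 5000, "10.0.40.71") {
		fmt.Println(d)
	}
}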

Pilot Connection Log When Envoy Restarts

CDS -> EDS -> LDS -> RDS

2019-02-16T06:27:14.131219Z	info	ads	ADS:CDS: REQ 10.128.69.0:48816 sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local-29 66.451µs raw: node:<id:"sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local" cluster:"helloworld" metadata:<fields:<key:"INTERCEPTION_MODE" value:<string_value:"REDIRECT" > > fields:<key:"ISTIO_PROXY_SHA" value:<string_value:"istio-proxy:930841ca88b15365737acb7eddeea6733d4f98b9" > > fields:<key:"ISTIO_PROXY_VERSION" value:<string_value:"1.0.2" > > fields:<key:"ISTIO_VERSION" value:<string_value:"1.0.5" > > fields:<key:"POD_NAME" value:<string_value:"helloworld-v1-8f8dd85-f99wk" > > fields:<key:"app" value:<string_value:"helloworld" > > fields:<key:"istio" value:<string_value:"sidecar" > > fields:<key:"version" value:<string_value:"v1" > > > build_version:"0/1.8.0-dev//RELEASE" > type_url:"type.googleapis.com/envoy.api.v2.Cluster"

2019-02-16T06:27:14.131444Z info ads CDS: PUSH 2019-02-15T05:53:49Z/8 for helloworld-v1-8f8dd85-f99wk.istio-system "10.128.69.0:48816", Clusters: 11, Services 3

2019-02-16T06:27:14.141574Z info ads EDS: PUSH for sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local-29 clusters 9 endpoints 9 empty 0

2019-02-16T06:27:14.142920Z info Uses TLS multiplexing for helloworld.istio-system.svc.cluster.local {http 5000 HTTP}

2019-02-16T06:27:14.149656Z info ads LDS: PUSH for node:helloworld-v1-8f8dd85-f99wk.istio-system addr:"10.128.69.0:48816" listeners:8 10981

2019-02-16T06:27:14.163397Z info ads ADS: RDS: PUSH for node: helloworld-v1-8f8dd85-f99wk.istio-system addr:10.128.69.0:48816 routes:5 ver:2019-02-15T05:53:49Z/8

Envoy's own startup log:

2019-02-16T06:27:13.308594Z	info	Version root@6f6ea1061f2b-docker.io/istio-1.0.5-c1707e45e71c75d74bf3a5dec8c7086f32f32fad-Clean

2019-02-16T06:27:13.308678Z info Proxy role: model.Proxy{ClusterID:"", Type:"sidecar", IPAddress:"10.128.69.4", ID:"helloworld-v1-8f8dd85-f99wk.istio-system", Domain:"istio-system.svc.cluster.local", Metadata:map[string]string(nil)}

2019-02-16T06:27:13.309241Z info Effective config: binaryPath: /usr/local/bin/envoy
configPath: /etc/istio/proxy
connectTimeout: 10s
discoveryAddress: istio-pilot.istio-system:15007
discoveryRefreshDelay: 1s
drainDuration: 45s
parentShutdownDuration: 60s
proxyAdminPort: 15000
serviceCluster: helloworld
zipkinAddress: zipkin.istio-system:9411

2019-02-16T06:27:13.309274Z info Monitored certs: []envoy.CertSource{envoy.CertSource{Directory:"/etc/certs/", Files:[]string{"cert-chain.pem", "key.pem", "root-cert.pem"}}}

2019-02-16T06:27:13.309457Z info Starting proxy agent
2019-02-16T06:27:13.309573Z info Received new config, resetting budget
2019-02-16T06:27:13.310745Z info Reconciling configuration (budget 10)
2019-02-16T06:27:13.310805Z info Epoch 0 starting
2019-02-16T06:27:13.311883Z info Envoy command: [-c /etc/istio/proxy/envoy-rev0.json --restart-epoch 0 --drain-time-s 45 --parent-shutdown-time-s 60 --service-cluster helloworld --service-node sidecar~10.128.69.4~helloworld-v1-8f8dd85-f99wk.istio-system~istio-system.svc.cluster.local --max-obj-name-len 189 --allow-unknown-fields -l warn --v2-config-only]

// ...

[2019-02-16 06:27:13.347][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:130] cm init: initializing cds
[2019-02-16 06:27:14.133][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|15010||istio-pilot.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.134][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|15011||istio-pilot.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.134][24][info][upstream]
external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|8080||istio-pilot.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.135][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|9093||istio-pilot.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.136][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|8060||istio-citadel.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.137][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|9093||istio-citadel.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.138][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|5000||helloworld.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.138][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|5000|v1|helloworld.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.139][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster outbound|5000|v2|helloworld.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.140][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster inbound|5000||helloworld.istio-system.svc.cluster.local during init

[2019-02-16 06:27:14.141][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:494] add/update cluster BlackHoleCluster during init
[2019-02-16 06:27:14.141][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:111] cm init: initializing secondary clusters

[2019-02-16 06:27:14.142][24][info][upstream] external/envoy/source/common/upstream/cluster_manager_impl.cc:134] cm init: all clusters initialized
[2019-02-16 06:27:14.142][24][info][main] external/envoy/source/server/server.cc:401] all clusters initialized. initializing init manager
[2019-02-16 06:27:14.155][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '10.128.69.4_5000'
[2019-02-16 06:27:14.156][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '10.0.79.108_15011'
[2019-02-16 06:27:14.157][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_8080'
[2019-02-16 06:27:14.158][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_9093'
[2019-02-16 06:27:14.159][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_8060'
[2019-02-16 06:27:14.160][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_5000'
[2019-02-16 06:27:14.160][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener '0.0.0.0_15010'
[2019-02-16 06:27:14.161][24][info][upstream] external/envoy/source/server/lds_api.cc:80] lds: add/update listener 'virtual'
[2019-02-16 06:27:14.165][24][info][config] external/envoy/source/server/listener_manager_impl.cc:908] all dependencies initialized. starting workers

configScope

This is implemented in the Istio 1.1 code base, see PR 10278; earlier versions implemented it with an annotation placed on the Service:

> // ServiceConfigScopeAnnotation configs the scope the service visible to.
> // "PUBLIC" which is the default, indicates it is reachable within the mesh
> // "PRIVATE" indicates it is reachable within its namespace
> ServiceConfigScopeAnnotation = "networking.istio.io/configScope"
>

ServiceEntry, VirtualService, Gateway, DestinationRule and similar resources can all set their visibility scope via spec.configScope. ConfigScope can be set to "PUBLIC" or "PRIVATE" (the check is sketched after the list):

  • "PUBLIC" means the rule is visible to every workload in the mesh; this is the default.

  • "PRIVATE" means the rule is visible only to workloads in the same namespace.
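
A tiny sketch of the visibility semantics described above; the function name and signature are illustrative, not Pilot's actual check:

package main

import "fmt"

// visibleTo sketches the PUBLIC/PRIVATE semantics of configScope: PRIVATE
// configs are only visible to workloads in the same namespace, PUBLIC (the
// default) configs are visible mesh-wide.
func visibleTo(scope, configNamespace, proxyNamespace string) bool {
	if scope == "PRIVATE" {
		return configNamespace == proxyNamespace
	}
	return true // "PUBLIC" or unset
}

func main() {
	fmt.Println(visibleTo("PRIVATE", "default", "default")) // true
	fmt.Println(visibleTo("PRIVATE", "default", "test"))    // false
	fmt.Println(visibleTo("PUBLIC", "default", "test"))     // true
}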

Example

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: virtual-service-scope-private
spec:
configScope: PRIVATE
hosts:
- "bookinfo.com"
http:
- route:
- destination:
host: "bookinfo.com"
headers:
request:
add:
scope: private

References

  1. debug-istio
