Available Commands:
  discovery   Start Istio proxy discovery service
  help        Help about any command
  request     Makes an HTTP request to Pilot metrics/debug endpoint
  version     Prints out build version information
# kubectl exec -ti istio-pilot-f9d78b7b9-fmhfb -n istio-system -c discovery -- /usr/local/bin/pilot-discovery discovery --help
Defaulting container name to discovery.
Start Istio proxy discovery service
Usage:
  pilot-discovery discovery [flags]

Flags:
  -a, --appNamespace string                Restrict the applications namespace the controller manages; if not set, controller watches all namespaces
      --cfConfig string                    Cloud Foundry config file
      --clusterRegistriesConfigMap string  ConfigMap map for clusters config store
      --clusterRegistriesNamespace string  Namespace for ConfigMap which stores clusters configs
      --configDir string                   Directory to watch for updates to config yaml files. If specified, the files will be used as the source of config, rather than a CRD client.
      --consulserverInterval duration      Interval (in seconds) for polling the Consul service registry (default 2s)
      --consulserverURL string             URL for the Consul server
      --disable-install-crds               Disable discovery service from verifying the existence of CRDs at startup and then installing if not detected. It is recommended to be disabled for highly available setups.
      --discovery_cache                    Enable caching discovery service responses (default true)
      --domain string                      DNS domain suffix (default "cluster.local")
      --grpcAddr string                    Discovery service grpc address (default ":15010")
  -h, --help                               help for discovery
      --httpAddr string                    Discovery service HTTP address (default ":8080")
      --kubeconfig string                  Use a Kubernetes configuration file instead of in-cluster configuration
      --meshConfig string                  File name for Istio mesh configuration. If not specified, a default mesh will be used. (default "/etc/istio/config/mesh")
      --monitoringAddr string              HTTP address to use for exposing pilot self-monitoring information (default ":9093")
  -n, --namespace string                   Select a namespace where the controller resides. If not set, uses ${POD_NAMESPACE} environment variable
      --plugins stringSlice                Comma separated list of networking plugins to enable (default [authn,authz,health,mixer,envoyfilter])
      --profile                            Enable profiling via web interface host:port/debug/pprof (default true)
      --registries stringSlice             Comma separated list of platform service registries to read from (choose one or more from {Kubernetes, Consul, CloudFoundry, Mock, Config}) (default [Kubernetes])
      --resync duration                    Controller resync interval (default 1m0s)
      --secureGrpcAddr string              Discovery service grpc address, with https (default ":15012")
      --webhookEndpoint string             Webhook API endpoint (supports http://sockethost, and unix:///absolute/path/to/socket
// Create the stop channel for all of the servers.
stop := make(chan struct{})

// Create the server for the discovery service.
discoveryServer, err := bootstrap.NewServer(serverArgs)
if err != nil {
    return fmt.Errorf("failed to create discovery service: %v", err)
}

// Start the server
if err := discoveryServer.Start(stop); err != nil {
    return fmt.Errorf("failed to start discovery service: %v", err)
}
// NewServer creates a new Server instance based on the provided arguments.
func NewServer(args PilotArgs) (*Server, error) {
    // ...
    s := &Server{
        filewatcher: filewatcher.NewWatcher(),
    }
    // Error handling omitted below.

    // Initialize the client to the k8s cluster: s.kubeClient.
    s.initKubeClient(&args)

    // Load the mesh configuration (/etc/istio/config/mesh) and register it with the filewatcher.
    s.initMesh(&args)

    // Not present in the 1.0.5 cmd; presumably added in 1.1:
    // serverArgs.NetworksConfigFile, "networksConfig", "/etc/istio/config/meshNetworks"
    // Load the mesh networks configuration and register it with the filewatcher as well.
    s.initMeshNetworks(&args)

    // initMixerSan configures the mixerSAN configuration item.
    // The mesh must already have been configured.
    s.initMixerSan(&args)

    // Creates the config controller in the pilotConfig.
    // Ultimately this creates a crd.NewController instance.
    s.initConfigController(&args)
    /* Taking kube as the example, the main flow is:
    controller, err := s.makeKubeConfigController(args)
    s.configController = controller
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop) // 1. configController is the first to start
    })
    */

    // creates and initializes the service controllers
    s.initServiceControllers(&args)
    /*
    s.createK8sServiceControllers(serviceControllers, args)
        --> kube.NewController(s.kubeClient, args.Config.ControllerOptions)
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.ServiceController.Run(stop) // 2. ServiceController is the second to start
        return nil
    })
    */

    // Initialize the DiscoveryService gRPC server (covered in detail later);
    // adds 2 funcs to startFuncs.
    s.initDiscoveryService(&args)

    // initializes the configuration for the pilot monitoring server;
    // adds 1 func to startFuncs.
    s.initMonitor(&args)

    // starts the secret controller to watch for remote
    // clusters and initialize the multicluster structures.
    // It mainly specifies the configuration for connecting to remote clusters and the
    // namespace to watch, then starts a secret controller watching secrets labeled
    // istio/multiCluster=true.
    // --clusterRegistriesConfigMap   ConfigMap map for clusters config store
    // --clusterRegistriesNamespace   Namespace for ConfigMap which stores clusters configs
    // Not analyzed here.
    s.initClusterRegistries(&args)

    return s, nil
}
// Start runs, in order, the functions registered during NewServer.
func (s *Server) Start(stop <-chan struct{}) error {
    // Now start all of the components.
    for _, fn := range s.startFuncs {
        if err := fn(stop); err != nil {
            return err
        }
    }
    // ...
}
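To make the register-then-start split concrete, here is a minimal standalone sketch of the addStartFunc/Start pattern: every init step only registers a closure, and nothing runs until Start hands them the shared stop channel. The type and struct names are simplified stand-ins, not Pilot's actual definitions.

package main

import (
    "fmt"
    "time"
)

// startFunc mirrors the pattern: a component receives the shared stop channel when started.
type startFunc func(stop <-chan struct{}) error

type server struct {
    startFuncs []startFunc
}

// addStartFunc only records the work; it does not run it.
func (s *server) addStartFunc(fn startFunc) {
    s.startFuncs = append(s.startFuncs, fn)
}

// Start runs every registered function in registration order.
func (s *server) Start(stop <-chan struct{}) error {
    for _, fn := range s.startFuncs {
        if err := fn(stop); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    s := &server{}

    // An init step registers a "controller"; a real one would run informers until stop closes.
    s.addStartFunc(func(stop <-chan struct{}) error {
        fmt.Println("controller starting")
        go func() {
            <-stop
            fmt.Println("controller stopped")
        }()
        return nil
    })

    stop := make(chan struct{})
    if err := s.Start(stop); err != nil {
        panic(err)
    }
    close(stop)
    time.Sleep(10 * time.Millisecond) // give the goroutine a moment to observe stop
}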
istio.io/api/mesh/v1alpha1/network.pb.go
The MeshNetworks struct and its accompanying definition are as follows:
// MeshNetworks (config map) provides information about the set of networks
// inside a mesh and how to route to endpoints in each network. For example
//
// MeshNetworks(file/config map):
// networks:
// - network1:
//   - endpoints:
//     - fromRegistry: registry1 #must match secret name in kubernetes
//     - fromCidr: 192.168.100.0/22 #a VM network for example
//     gateways:
//     - registryServiceName: istio-ingressgateway.istio-system.svc.cluster.local
//       port: 15443
//       locality: us-east-1a
type MeshNetworks struct {
    // REQUIRED: The set of networks inside this mesh. Each network should
    // have a unique name and information about how to infer the endpoints in
    // the network as well as the gateways associated with the network.
    Networks map[string]*Network `protobuf:"bytes,1,rep,name=networks" json:"networks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
}
// Initialize the relevant configuration and watch for configuration changes.
// ...

// creates the config controller in the pilotConfig.
// Ultimately this creates a crd.NewController instance.
s.initConfigController(&args)
// s.makeKubeConfigController(args)
// s.configController.Run(stop)

// creates and initializes the service controllers
s.initServiceControllers(&args)
// kube.NewController(s.kubeClient, args.Config.ControllerOptions)
// go s.ServiceController.Run(stop)

// Initialize the DiscoveryService gRPC server (covered in detail later);
// adds 2 funcs to startFuncs.
s.initDiscoveryService(&args)

return s, nil
}
ConfigController
// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
    // Initialization for the k8s case.
    controller, err := s.makeKubeConfigController(args)
    s.configController = controller

    // Defer starting the controller until after the service is created.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go s.configController.Run(stop)
        return nil
    })
    //...

    // Create the config store.
    s.istioConfigStore = model.MakeIstioStore(s.configController)
// NewController creates a new Kubernetes controller for CRDs
// Use "" for namespace to listen for all namespace changes
func NewController(client *Client, options kube.ControllerOptions) model.ConfigStoreCache {
    log.Infof("CRD controller watching namespaces %q", options.WatchedNamespace)

    // Queue requires a time duration for a retry delay after a handler error
    out := &controller{
        client: client,
        queue:  kube.NewQueue(1 * time.Second),
        kinds:  make(map[string]cacheHandler),
    }

    // add stores for CRD kinds
    for _, schema := range client.ConfigDescriptor() {
        out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod)
    }

    return out
}
For watching CRDs, each resource group requires its own client connection; the main CRD groups today are network/config/authentication/rbac, and so on.
// controller is a collection of synchronized resource watchers.
// Caches are thread-safe
type controller struct {
    client *Client // one connection per resource group
    queue  kube.Queue
    kinds  map[string]cacheHandler
}

type cacheHandler struct {
    informer cache.SharedIndexInformer
    handler  *kube.ChainHandler
}
// Client is a basic REST client for CRDs implementing config store
type Client struct {
    // Map of apiVersion to restClient.
    clientset map[string]*restClient

    // domainSuffix for the config metadata
    domainSuffix string
}
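To make the "one REST client per API group" point concrete, here is a small standalone sketch of how such a clientset map is keyed. The restClient struct below is a stand-in (the real one wraps a Kubernetes rest.RESTClient), and the group/version strings are simply the usual Istio CRD apiVersions; the actual construction lives in crd.NewClient.

package main

import "fmt"

// restClient stands in for the per-apiVersion REST client Pilot keeps.
type restClient struct{ apiVersion string }

func main() {
    // One client per CRD group/version, keyed by apiVersion (illustrative keys).
    clientset := map[string]*restClient{}
    for _, apiVersion := range []string{
        "networking.istio.io/v1alpha3",
        "config.istio.io/v1alpha2",
        "authentication.istio.io/v1alpha1",
        "rbac.istio.io/v1alpha1",
    } {
        clientset[apiVersion] = &restClient{apiVersion: apiVersion}
    }
    fmt.Println(len(clientset), "CRD clients")
}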
// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see secretcontroler).
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
    log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
        options.WatchedNamespace, options.ResyncPeriod)

    // Queue requires a time duration for a retry delay after a handler error
    out := &Controller{
        domainSuffix:               options.DomainSuffix,
        client:                     client,
        queue:                      NewQueue(1 * time.Second),
        ClusterID:                  options.ClusterID,
        XDSUpdater:                 options.XDSUpdater,
        servicesMap:                make(map[model.Hostname]*model.Service),
        externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance),
    }
// See https://github.com/lyft/envoy-api#apis for a description of the role of
// ADS and how it is intended to be used by a management server. ADS requests
// have the same structure as their singleton xDS counterparts, but can
// multiplex many resource types on a single stream. The type_url in the
// DiscoveryRequest/DiscoveryResponse provides sufficient information to recover
// the multiplexed singleton APIs at the Envoy instance and management server.
service AggregatedDiscoveryService {
  // This is a gRPC-only API.
  rpc StreamAggregatedResources(stream envoy.api.v2.DiscoveryRequest)
      returns (stream envoy.api.v2.DiscoveryResponse) {
  }

// A DiscoveryRequest requests a set of versioned resources of the same type for
// a given Envoy node on some API.
message DiscoveryRequest {
  // The version_info provided in the request messages will be the version_info
  // received with the most recent successfully processed response or empty on
  // the first request. It is expected that no new request is sent after a
  // response is received until the Envoy instance is ready to ACK/NACK the new
  // configuration. ACK/NACK takes place by returning the new API config version
  // as applied or the previous API config version respectively. Each type_url
  // (see below) has an independent version associated with it.
  string version_info = 1;

  // The node making the request.
  core.Node node = 2;

  // List of resources to subscribe to, e.g. list of cluster names or a route
  // configuration name. If this is empty, all resources for the API are
  // returned. LDS/CDS expect empty resource_names, since this is global
  // discovery for the Envoy instance. The LDS and CDS responses will then imply
  // a number of resources that need to be fetched via EDS/RDS, which will be
  // explicitly enumerated in resource_names.
  repeated string resource_names = 3;

  // Type of the resource that is being requested, e.g.
  // "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment". This is implicit
  // in requests made via singleton xDS APIs such as CDS, LDS, etc. but is
  // required for ADS.
  string type_url = 4;

  // nonce corresponding to DiscoveryResponse being ACK/NACKed. See above
  // discussion on version_info and the DiscoveryResponse nonce comment. This
  // may be empty if no nonce is available, e.g. at startup or for non-stream
  // xDS implementations.
  string response_nonce = 5;

  // This is populated when the previous :ref:`DiscoveryResponse <envoy_api_msg_DiscoveryResponse>`
  // failed to update configuration. The *message* field in *error_details* provides the Envoy
  // internal exception related to the failure. It is only intended for consumption during manual
  // debugging, the string provided is not guaranteed to be stable across Envoy versions.
  google.rpc.Status error_detail = 6;
}
message DiscoveryResponse {
  // The version of the response data.
  string version_info = 1;

  // The response resources. These resources are typed and depend on the API being called.
  repeated google.protobuf.Any resources = 2 [(gogoproto.nullable) = false];

  // [#not-implemented-hide:]
  // Canary is used to support two Envoy command line flags:
  //
  // * --terminate-on-canary-transition-failure. When set, Envoy is able to
  //   terminate if it detects that configuration is stuck at canary. Consider
  //   this example sequence of updates:
  //   - Management server applies a canary config successfully.
  //   - Management server rolls back to a production config.
  //   - Envoy rejects the new production config.
  //   Since there is no sensible way to continue receiving configuration
  //   updates, Envoy will then terminate and apply production config from a
  //   clean slate.
  // * --dry-run-canary. When set, a canary response will never be applied, only
  //   validated via a dry run.
  bool canary = 3;

  // Type URL for resources. This must be consistent with the type_url in the
  // Any messages for resources if resources is non-empty. This effectively
  // identifies the xDS API when muxing over ADS.
  string type_url = 4;

  // For gRPC based subscriptions, the nonce provides a way to explicitly ack a
  // specific DiscoveryResponse in a following DiscoveryRequest. Additional
  // messages may have been sent by Envoy to the management server for the
  // previous version on the stream prior to this DiscoveryResponse, that were
  // unprocessed at response send time. The nonce allows the management server
  // to ignore any further DiscoveryRequests for the previous version until a
  // DiscoveryRequest bearing the nonce. The nonce is optional and is not
  // required for non-stream based xDS implementations.
  string nonce = 5;

  // [#not-implemented-hide:]
  // The control plane instance that sent the response.
  core.ControlPlane control_plane = 6;
}
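To illustrate the ACK/NACK contract described in the comments above: after applying a response, the client echoes the response's version_info and nonce back in a request for the same type_url. A hedged sketch using the go-control-plane v2 Go types (the version, nonce, and node ID values are purely illustrative):

package main

import (
    "fmt"

    xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2"
    core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)

// ackFor builds the DiscoveryRequest a client would send to ACK a DiscoveryResponse:
// same type_url, the just-applied version_info, and the response's nonce.
func ackFor(resp *xdsapi.DiscoveryResponse, nodeID string) *xdsapi.DiscoveryRequest {
    return &xdsapi.DiscoveryRequest{
        Node:          &core.Node{Id: nodeID},
        TypeUrl:       resp.TypeUrl,
        VersionInfo:   resp.VersionInfo,
        ResponseNonce: resp.Nonce,
    }
}

func main() {
    resp := &xdsapi.DiscoveryResponse{
        TypeUrl:     "type.googleapis.com/envoy.api.v2.Cluster",
        VersionInfo: "2019-01-01T00:00:00Z/1",
        Nonce:       "abc123",
    }
    fmt.Println(ackFor(resp, "sidecar~10.60.1.6~pod.ns~ns.svc.cluster.local"))
}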
for {
    // Block until either a request is received or a push is triggered.
    select {
    case discReq, ok := <-reqChannel:
        err = s.initConnectionNode(discReq, con)

        switch discReq.TypeUrl {
        case ClusterType:
            // ...
            // CDS REQ is the first request an envoy makes. This shows up
            // immediately after connect. It is followed by EDS REQ as
            // soon as the CDS push is returned.
            adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
            con.CDSWatch = true
            err := s.pushCds(con, s.globalPushContext(), versionInfo())
        case ListenerType:
            // ...
            adsLog.Debugf("ADS:LDS: REQ %s %v", con.ConID, peerAddr)
            con.LDSWatch = true
            err := s.pushLds(con, s.globalPushContext(), true, versionInfo())
// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {

    // privateServices are reachable within the same namespace.
    privateServicesByNamespace map[string][]*Service
    // publicServices are services reachable within the mesh.
    publicServices []*Service

    // destination rules are of three types:
    //   namespaceLocalDestRules: all public/private dest rules pertaining to a service defined in a given namespace
    //   namespaceExportedDestRules: all public dest rules pertaining to a service defined in a namespace
    //   allExportedDestRules: all (public) dest rules across all namespaces
    // We need the allExportedDestRules in addition to namespaceExportedDestRules because we select
    // the dest rule based on the most specific host match, and not just any destination rule
    namespaceLocalDestRules    map[string]*processedDestRules
    namespaceExportedDestRules map[string]*processedDestRules
    allExportedDestRules       *processedDestRules

    // sidecars for each namespace
    sidecarsByNamespace map[string][]*SidecarScope
    ////////// END ////////

    // The following data is either a global index or used in the inbound path.
    // Namespace specific views do not apply here.

    // ServiceByHostname has all services, indexed by hostname.
    ServiceByHostname map[Hostname]*Service `json:"-"`

    // AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
    // are no authorization policies in the cluster.
    AuthzPolicies *AuthorizationPolicies `json:"-"`

    // ServicePort2Name is used to keep track of service name and port mapping.
    // This is needed because ADS names use port numbers, while endpoints use
    // port names. The key is the service name. If a service or port are not found,
    // the endpoint needs to be re-evaluated later (eventual consistency)
    ServicePort2Name map[string]PortList `json:"-"`

    initDone bool
}
// InitContext will initialize the data structures used for code generation.
// This should be called before starting the push, from the thread creating
// the push context.
func (ps *PushContext) InitContext(env *Environment) error {
    ps.Mutex.Lock()
    defer ps.Mutex.Unlock()
    if ps.initDone {
        return nil
    }
    ps.Env = env
    var err error

    // Caches list of services in the registry, and creates a map
    // of hostname to service -> ServicePort2Name map[string]PortList `json:"-"`
    ps.initServiceRegistry(env)

    // Caches list of virtual services -> publicVirtualServices []Config
    ps.initVirtualServices(env)

    // Split out of DestinationRule expensive conversions - once per push.
    // The results end up in the following three variables:
    //   * namespaceLocalDestRules    map[string]*processedDestRules
    //   * namespaceExportedDestRules map[string]*processedDestRules
    //   * allExportedDestRules       *processedDestRules
    ps.initDestinationRules(env)

    // Get the ClusterRbacConfig -> AuthzPolicies *AuthorizationPolicies
    ps.initAuthorizationPolicies(env)

    // Must be initialized last -> sidecarsByNamespace map[string][]*SidecarScope
    ps.InitSidecarScopes(env)
for {
    // Block until either a request is received or a push is triggered.
    select {
    case discReq, ok := <-reqChannel:
        // Mainly calls ParseServiceNodeWithMetadata, which extracts the various
        // fields from the request message and produces a Proxy object.
        // The current discReq.Node.Id format is Type~IPAddress~ID~Domain.
        err = s.initConnectionNode(discReq, con)

        switch discReq.TypeUrl {
        case ClusterType:
            // Handling for responses received after CDS data has already been sent.
            if con.CDSWatch {
                // ...
            }
            // CDS REQ is the first request an envoy makes. This shows up
            // immediately after connect. It is followed by EDS REQ as
            // soon as the CDS push is returned.
            adsLog.Infof("ADS:CDS: REQ %v %s %v raw: %s", peerAddr, con.ConID, time.Since(t0), discReq.String())
            con.CDSWatch = true
            err := s.pushCds(con, s.globalPushContext(), versionInfo())
            if err != nil {
                return err
            }
            //...
        }
Identifies a specific Envoy instance. The node identifier is presented to the management server, which may use this identifier to distinguish per Envoy configuration for serving.

{
  "id": "…",
  "cluster": "…",
  "metadata": "{…}",
  "locality": "{…}",
  "build_version": "…"
}
id (string) An opaque node identifier for the Envoy node. This also provides the local service node name. It should be set if any of the following features are used: statsd, CDS, and HTTP tracing, either in this message or via --service-node.

cluster (string) Defines the local service cluster name where Envoy is running. Though optional, it should be set if any of the following features are used: statsd, health check cluster verification, runtime override directory, user agent addition, HTTP global rate limiting, CDS, and HTTP tracing, either in this message or via --service-cluster.
metadata (Struct) Opaque metadata extending the node identifier. Envoy will pass this directly to the management server.
locality (core.Locality) Locality specifying where the Envoy instance is running.
build_version (string) This is motivated by informing a management server during canary which version of Envoy is being tested in a heterogeneous fleet. This will be set by Envoy in management server RPCs.
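Pilot derives its Proxy model from this node information; the node id itself is the 4-part '~' delimited string described in the Proxy comment below. A simplified sketch of splitting such an ID (this is not Pilot's actual ParseServiceNodeWithMetadata, and the sample ID is made up):

package main

import (
    "fmt"
    "strings"
)

// parseNodeID splits the 4-part '~' delimited Envoy node ID used by Istio sidecars:
// Type~IPAddress~ID~Domain
func parseNodeID(nodeID string) (nodeType, ip, id, domain string, err error) {
    parts := strings.Split(nodeID, "~")
    if len(parts) != 4 {
        return "", "", "", "", fmt.Errorf("expected 4 '~' delimited parts, got %d", len(parts))
    }
    return parts[0], parts[1], parts[2], parts[3], nil
}

func main() {
    // Hypothetical node ID as a sidecar might report it.
    nodeType, ip, id, domain, err := parseNodeID(
        "sidecar~10.60.1.6~productpage-v1-1234.default~default.svc.cluster.local")
    if err != nil {
        panic(err)
    }
    fmt.Println(nodeType, ip, id, domain)
}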
// Proxy contains information about a specific instance of a proxy (envoy sidecar, gateway,
// etc). The Proxy is initialized when a sidecar connects to Pilot, and populated from
// 'node' info in the protocol as well as data extracted from registries.
//
// In the current Istio implementation nodes use a 4-part '~' delimited ID:
// Type~IPAddress~ID~Domain
type Proxy struct {
    // ClusterID specifies the cluster where the proxy resides.
    // TODO: clarify if this is needed in the new 'network' model, likely needs to
    // be renamed to 'network'
    ClusterID string

    // Type specifies the node type. First part of the ID.
    Type NodeType

    // IPAddresses is the IP addresses of the proxy used to identify it and its
    // co-located service instances. Example: "10.60.1.6". In some cases, the host
    // where the proxy and service instances reside may have more than one IP address
    IPAddresses []string

    // ID is the unique platform-specific sidecar proxy ID. For k8s it is the pod ID and
    // namespace.
    ID string

    // Locality is the location of where Envoy proxy runs.
    Locality Locality

    // DNSDomain defines the DNS domain suffix for short hostnames (e.g.
    // "default.svc.cluster.local")
    DNSDomain string

    // TrustDomain defines the trust domain of the certificate
    TrustDomain string

    // ConfigNamespace defines the namespace where this proxy resides
    // for the purposes of network scoping.
    // NOTE: DO NOT USE THIS FIELD TO CONSTRUCT DNS NAMES
    ConfigNamespace string

    // Metadata key-value pairs extending the Node identifier
    Metadata map[string]string

    // the sidecarScope associated with the proxy
    SidecarScope *SidecarScope
}
// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution
// For inbound (sidecar only): Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
    clusters := make([]*apiv2.Cluster, 0)

    switch proxy.Type {
    case model.SidecarProxy:
        // GetProxyServiceInstances returns service instances co-located with the proxy,
        // i.e. the service instances on the host where the proxy runs, including headless services.
        instances, err := env.GetProxyServiceInstances(proxy)

        // Let ServiceDiscovery decide which IP and Port are used for management if
        // there are multiple IPs
        managementPorts := make([]*model.Port, 0)
        for _, ip := range proxy.IPAddresses {
            managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
        }
        // Append the inbound clusters derived from the managementPorts on the proxy IPs.
        clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)

    default: // Gateways
        // ...
    }

    // Add a blackhole and passthrough cluster for catching traffic to unresolved routes
    // DO NOT CALL PLUGINS for these two clusters.
    clusters = append(clusters, buildBlackHoleCluster())
    clusters = append(clusters, buildDefaultPassthroughCluster())
A temporary debugging facility is exposed on port 9093, but whether ADS-related snapshots are actually saved is also gated by a Pilot option; see
istio.io/istio/pkg/features/pilot/pilot.go
// DebugConfigs controls saving snapshots of configs for /debug/adsz.
// Defaults to false, can be enabled with PILOT_DEBUG_ADSZ_CONFIG=1
// For larger clusters it can increase memory use and GC - useful for small tests.
DebugConfigs = os.Getenv("PILOT_DEBUG_ADSZ_CONFIG") == "1"
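A quick way to look at these snapshots is to fetch the /debug/adsz endpoint. A minimal sketch, assuming the port has been forwarded locally and that the debug handlers are reachable on the port 9093 mentioned above (on some Pilot versions the debug mux is instead served on the discovery HTTP port :8080, so the URL here is an assumption):

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Assumes e.g. `kubectl port-forward -n istio-system <pilot-pod> 9093:9093` is already running.
    resp, err := http.Get("http://localhost:9093/debug/adsz")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(body))
}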
// Environment provides an aggregate environmental API for Pilot
type Environment struct {
    // Discovery interface for listing services and instances.
    ServiceDiscovery

    ServiceAccounts

    IstioConfigStore
// BuildClusters returns the list of clusters for the given proxy. This is the CDS output
// For outbound: Cluster for each service/subset hostname or cidr with SNI set to service hostname
// Cluster type based on resolution: For inbound (sidecar only):
// Cluster for each inbound endpoint port and for each service port
func (configgen *ConfigGeneratorImpl) BuildClusters(env *model.Environment, proxy *model.Proxy, push *model.PushContext) ([]*apiv2.Cluster, error) {
    clusters := make([]*apiv2.Cluster, 0)

    switch proxy.Type {
    case model.SidecarProxy:
        // 1. Get the services listening on the proxy's Pod; in k8s the related Pod is
        //    looked up by the proxy IP. This is mainly used to generate the inbound clusters.
        // GetProxyServiceInstances returns the service instances that are
        // co-located (in the same network namespace and security context)
        // with a given Proxy.
        instances, err := env.GetProxyServiceInstances(proxy)

        // Let ServiceDiscovery decide which IP and Port are used for management if
        // there are multiple IPs.
        // managementPorts are mainly the Liveness && Readiness probe ports taken from the Pod.
        managementPorts := make([]*model.Port, 0)
        for _, ip := range proxy.IPAddresses {
            managementPorts = append(managementPorts, env.ManagementPorts(ip)...)
        }
        //
        clusters = append(clusters, configgen.buildInboundClusters(env, proxy, push, instances, managementPorts)...)
        // ...

        // Add a blackhole and passthrough cluster for catching traffic to unresolved routes
        // DO NOT CALL PLUGINS for these two clusters.
        clusters = append(clusters, buildBlackHoleCluster())
        clusters = append(clusters, buildDefaultPassthroughCluster())
    }
// GetProxyServiceInstances returns service instances co-located with a given proxy
func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) ([]*model.ServiceInstance, error) {
    out := make([]*model.ServiceInstance, 0)

    // There is only one IP for kube registry
    proxyIP := proxy.IPAddresses[0]
    proxyNamespace := ""

    pod := c.pods.getPodByIP(proxyIP)
    if pod != nil {
        proxyNamespace = pod.Namespace
        // 1. find proxy service by label selector,
        //    if not any, there may exist headless service; failover to 2
        svcLister := listerv1.NewServiceLister(c.services.informer.GetIndexer())
        if services, err := svcLister.GetPodServices(pod); err == nil && len(services) > 0 {
            for _, svc := range services {
                item, exists, err := c.endpoints.informer.GetStore().GetByKey(KeyFunc(svc.Namespace, svc.Name))

                ep := *item.(*v1.Endpoints)
                out = append(out, c.getProxyServiceInstancesByEndpoint(ep, proxy)...)
            }
            return out, nil
        }
    }

    // 2. Headless service
    endpointsForPodInSameNS := make([]*model.ServiceInstance, 0)
    endpointsForPodInDifferentNS := make([]*model.ServiceInstance, 0)
    for _, item := range c.endpoints.informer.GetStore().List() {
        ep := *item.(*v1.Endpoints)
        endpoints := &endpointsForPodInSameNS
        if ep.Namespace != proxyNamespace {
            endpoints = &endpointsForPodInDifferentNS
        }
    }

    // Put the endpointsForPodInSameNS in front of endpointsForPodInDifferentNS so that Pilot will
    // first use endpoints from endpointsForPodInSameNS. This makes sure if there are two endpoints
    // referring to the same IP/port, the one in endpointsForPodInSameNS will be used. (The other one
    // in endpointsForPodInDifferentNS will thus be rejected by Pilot).
    out = append(endpointsForPodInSameNS, endpointsForPodInDifferentNS...)
    if len(out) == 0 {
        if c.Env != nil {
            c.Env.PushContext.Add(model.ProxyStatusNoService, proxy.ID, proxy, "")
            status := c.Env.PushContext
            // ...
        } else {
            // ...
        }
    }
    return out, nil
}
buildOutboundClusters
Look up the Services visible to the proxy as well as the public Services, and turn each into a corresponding Cluster.
// push.Services(proxy) returns the private and public Services visible to the Proxy.
for _, service := range push.Services(proxy) {
    config := push.DestinationRule(proxy, service)
    for _, port := range service.Ports {
        if port.Protocol == model.ProtocolUDP {
            continue
        }
        inputParams.Service = service
        inputParams.Port = port
// ManagementPorts implements a service catalog operation;
// addr is the proxy's IP address.
func (c *Controller) ManagementPorts(addr string) model.PortList {
    pod := c.pods.getPodByIP(addr)
    if pod == nil {
        return nil
    }
// convertProbesToPorts returns a PortList consisting of the ports where the
// pod is configured to do Liveness and Readiness probes
func convertProbesToPorts(t *v1.PodSpec) (model.PortList, error) {
    set := make(map[string]*model.Port)
    for _, container := range t.Containers {
        for _, probe := range []*v1.Probe{container.LivenessProbe, container.ReadinessProbe} {

            p, err := convertProbePort(&container, &probe.Handler)
            if err != nil {
                errs = multierror.Append(errs, err)
            } else if p != nil && set[p.Name] == nil {
                // Deduplicate along the way. We don't differentiate between HTTP vs TCP mgmt ports
                set[p.Name] = p
            }
        }
    }

    mgmtPorts := make(model.PortList, 0, len(set))
    for _, p := range set {
        mgmtPorts = append(mgmtPorts, p)
    }
    sort.Slice(mgmtPorts, func(i, j int) bool { return mgmtPorts[i].Port < mgmtPorts[j].Port })
// Plugin is called during the construction of a xdsapi.Listener which may alter the Listener in any
// way. Examples include AuthenticationPlugin that sets up mTLS authentication on the inbound Listener
// and outbound Cluster, the mixer plugin that sets up policy checks on the inbound listener, etc.
type Plugin interface {
    // OnOutboundListener is called whenever a new outbound listener is added to the LDS output for a given service.
    // Can be used to add additional filters on the outbound path.
    OnOutboundListener(in *InputParams, mutable *MutableObjects) error

    // OnInboundListener is called whenever a new listener is added to the LDS output for a given service
    // Can be used to add additional filters.
    OnInboundListener(in *InputParams, mutable *MutableObjects) error

    // OnOutboundCluster is called whenever a new cluster is added to the CDS output.
    // This is called once per push cycle, and not for every sidecar/gateway, except for gateways with non-standard
    // operating modes.
    OnOutboundCluster(in *InputParams, cluster *xdsapi.Cluster)

    // OnInboundCluster is called whenever a new cluster is added to the CDS output.
    // Called for each sidecar
    OnInboundCluster(in *InputParams, cluster *xdsapi.Cluster)

    // OnOutboundRouteConfiguration is called whenever a new set of virtual hosts (a set of virtual hosts with routes) is
    // added to RDS in the outbound path.
    OnOutboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration)

    // OnInboundRouteConfiguration is called whenever a new set of virtual hosts are added to the inbound path.
    OnInboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration)

    // OnInboundFilterChains is called whenever a plugin needs to setup the filter chains, including relevant filter chain
    // configuration, like FilterChainMatch and TLSContext.
    OnInboundFilterChains(in *InputParams) []FilterChain
}
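As an illustration of the shape such a plugin takes, here is a minimal do-nothing implementation sketch. The import paths (istio.io/istio/pilot/pkg/networking/plugin and the go-control-plane v2 API) are assumptions based on the Istio 1.1-era source tree; the method set simply mirrors the interface quoted above.

package noop

import (
    xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2"

    "istio.io/istio/pilot/pkg/networking/plugin"
)

// Plugin is a do-nothing implementation of plugin.Plugin, useful only to show its shape.
type Plugin struct{}

// OnOutboundListener leaves the outbound listener untouched.
func (Plugin) OnOutboundListener(in *plugin.InputParams, mutable *plugin.MutableObjects) error {
    return nil
}

// OnInboundListener leaves the inbound listener untouched.
func (Plugin) OnInboundListener(in *plugin.InputParams, mutable *plugin.MutableObjects) error {
    return nil
}

// OnOutboundCluster does not modify CDS output.
func (Plugin) OnOutboundCluster(in *plugin.InputParams, cluster *xdsapi.Cluster) {}

// OnInboundCluster does not modify CDS output.
func (Plugin) OnInboundCluster(in *plugin.InputParams, cluster *xdsapi.Cluster) {}

// OnOutboundRouteConfiguration does not modify RDS output.
func (Plugin) OnOutboundRouteConfiguration(in *plugin.InputParams, routeConfiguration *xdsapi.RouteConfiguration) {}

// OnInboundRouteConfiguration does not modify RDS output.
func (Plugin) OnInboundRouteConfiguration(in *plugin.InputParams, routeConfiguration *xdsapi.RouteConfiguration) {}

// OnInboundFilterChains adds no extra filter chains.
func (Plugin) OnInboundFilterChains(in *plugin.InputParams) []plugin.FilterChain {
    return nil
}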
// When users specify circuit breakers, they need to be set on the receiver end
// (server side) as well as client side, so that the server has enough capacity
// (not the defaults) to handle the increased traffic volume

// DestinationRule returns a destination rule for a service name in a given domain.
config := pluginParams.Push.DestinationRule(pluginParams.Node, instance.Service)
if config != nil {
    destinationRule := config.Spec.(*networking.DestinationRule)
    if destinationRule.TrafficPolicy != nil {
        // only connection pool settings make sense on the inbound path.
        // upstream TLS settings/outlier detection/load balancer don't apply here.
        applyConnectionPool(pluginParams.Env, localCluster, destinationRule.TrafficPolicy.ConnectionPool, model.TrafficDirectionInbound)
    }
}
return localCluster
}
// FIXME: there isn't a way to distinguish between unset values and zero values
func applyConnectionPool(env *model.Environment, cluster *apiv2.Cluster, settings *networking.ConnectionPoolSettings, direction model.TrafficDirection) {
    threshold := GetDefaultCircuitBreakerThresholds(direction)
    if settings.Http != nil {
        if settings.Http.Http2MaxRequests > 0 {
            // Envoy only applies MaxRequests in HTTP/2 clusters
            threshold.MaxRequests = &types.UInt32Value{Value: uint32(settings.Http.Http2MaxRequests)}
        }
        if settings.Http.Http1MaxPendingRequests > 0 {
            // Envoy only applies MaxPendingRequests in HTTP/1.1 clusters
            threshold.MaxPendingRequests = &types.UInt32Value{Value: uint32(settings.Http.Http1MaxPendingRequests)}
        }

        if settings.Http.MaxRequestsPerConnection > 0 {
            cluster.MaxRequestsPerConnection = &types.UInt32Value{Value: uint32(settings.Http.MaxRequestsPerConnection)}
        }

        // FIXME: zero is a valid value if explicitly set, otherwise we want to use the default
        if settings.Http.MaxRetries > 0 {
            threshold.MaxRetries = &types.UInt32Value{Value: uint32(settings.Http.MaxRetries)}
        }
    }

    if settings.Tcp != nil {
        if settings.Tcp.ConnectTimeout != nil {
            cluster.ConnectTimeout = util.GogoDurationToDuration(settings.Tcp.ConnectTimeout)
        }

        if settings.Tcp.MaxConnections > 0 {
            threshold.MaxConnections = &types.UInt32Value{Value: uint32(settings.Tcp.MaxConnections)}
        }
// buildSidecarListeners produces a list of listeners for sidecar proxies
func (configgen *ConfigGeneratorImpl) buildSidecarListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) ([]*xdsapi.Listener, error) {

    if mesh.ProxyListenPort > 0 {
        // Build the inbound listeners.
        inbound := configgen.buildSidecarInboundListeners(env, node, push, proxyInstances)

        // Build the outbound listeners.
        // buildSidecarOutboundListeners generates http and tcp listeners for
        // outbound connections from the proxy based on the sidecar scope associated with the proxy.
        // TODO(github.com/istio/pilot/issues/237)
        //
        // Sharing tcp_proxy and http_connection_manager filters on the same port for
        // different destination services doesn't work with Envoy (yet). When the
        // tcp_proxy filter's route matching fails for the http service the connection
        // is closed without falling back to the http_connection_manager.
        //
        // Temporary workaround is to add a listener for each service IP that requires
        // TCP routing
        //
        // Connections to the ports of non-load balanced services are directed to
        // the connection's original destination. This avoids costly queries of instance
        // IPs and ports, but requires that ports of non-load balanced service be unique.
        outbound := configgen.buildSidecarOutboundListeners(env, node, push, proxyInstances)
        // Do not generate any management port listeners if the user has specified a SidecarScope object
        // with ingress listeners. Specifying the ingress listener implies that the user wants
        // to only have those specific listeners and nothing else, in the inbound path.
        generateManagementListeners := true
        if mesh.OutboundTrafficPolicy.Mode == meshconfig.MeshConfig_OutboundTrafficPolicy_ALLOW_ANY {
            // We need a passthrough filter to fill in the filter stack for orig_dst listener
            tcpProxy = &tcp_proxy.TcpProxy{
                StatPrefix:       util.PassthroughCluster,
                ClusterSpecifier: &tcp_proxy.TcpProxy_Cluster{Cluster: util.PassthroughCluster},
            }
        }
        var transparent *google_protobuf.BoolValue
        if node.GetInterceptionMode() == model.InterceptionTproxy {
            transparent = proto.BoolTrue
        }

        // add an extra listener that binds to the port that is the recipient of the iptables redirect
        listeners = append(listeners, &xdsapi.Listener{
            Name:           VirtualListenerName,
            Address:        util.BuildAddress(WildcardAddress, uint32(mesh.ProxyListenPort)),
            Transparent:    transparent,
            UseOriginalDst: proto.BoolTrue,
            FilterChains: []listener.FilterChain{
                {
                    Filters: []listener.Filter{
                        {
                            Name: xdsutil.TCPProxy,
                            ConfigType: &listener.Filter_Config{
                                Config: util.MessageToStruct(tcpProxy),
                            },
                        },
                    },
                },
            },
        })
    }
    httpProxyPort := mesh.ProxyHttpPort
    if httpProxyPort == 0 && noneMode {
        // make sure http proxy is enabled for 'none' interception.
        httpProxyPort = int32(pilot.DefaultPortHTTPProxy)
    }
    // enable HTTP PROXY port if necessary; this will add an RDS route for this port
    if httpProxyPort > 0 {
        useRemoteAddress := false
        traceOperation := http_conn.EGRESS
        listenAddress := LocalhostAddress

        opts := buildListenerOpts{
            env:            env,
            proxy:          node,
            proxyInstances: proxyInstances,
            bind:           listenAddress,
            port:           int(httpProxyPort),
            filterChainOpts: []*filterChainOpts{{
                httpOpts: &httpListenerOpts{
                    rds:              RDSHttpProxy,
                    useRemoteAddress: useRemoteAddress,
                    direction:        traceOperation,
                    connectionManager: &http_conn.HttpConnectionManager{
                        HttpProtocolOptions: &core.Http1ProtocolOptions{
                            AllowAbsoluteUrl: proto.BoolTrue,
                        },
                    },
                },
            }},
            bindToPort:      true,
            skipUserFilters: true,
        }
        l := buildListener(opts)
        // TODO: plugins for HTTP_PROXY mode, envoyfilter needs another listener match for SIDECAR_HTTP_PROXY
        // there is no mixer for http_proxy
        mutable := &plugin.MutableObjects{
            Listener:     l,
            FilterChains: []plugin.FilterChain{{}},
        }
        pluginParams := &plugin.InputParams{
            ListenerProtocol: plugin.ListenerProtocolHTTP,
            ListenerCategory: networking.EnvoyFilter_ListenerMatch_SIDECAR_OUTBOUND,
            Env:              env,
            Node:             node,
            ProxyInstances:   proxyInstances,
            Push:             push,
        }
        if err := buildCompleteFilterChain(pluginParams, mutable, opts); err != nil {
            log.Warna("buildSidecarListeners ", err.Error())
        } else {
            listeners = append(listeners, l)
        }
        // TODO: need inbound listeners in HTTP_PROXY case, with dedicated ingress listener.
    }
// EdsCluster tracks eds-related info for monitored clusters. In practice it'll include
// all clusters until we support on-demand cluster loading.
type EdsCluster struct {
    // ...
    LoadAssignment *xdsapi.ClusterLoadAssignment

    // EdsClients keeps track of all nodes monitoring the cluster,
    // i.e. which proxies are currently using this cluster.
    EdsClients map[string]*XdsConnection `json:"-"`
}
type ClusterLoadAssignment struct {
    // Name of the cluster. This will be the :ref:`service_name
    // <envoy_api_field_Cluster.EdsClusterConfig.service_name>` value if specified
    // in the cluster :ref:`EdsClusterConfig
    // <envoy_api_msg_Cluster.EdsClusterConfig>`.
    ClusterName string
    // List of endpoints to load balance to.
    Endpoints []endpoint.LocalityLbEndpoints
    // Map of named endpoints that can be referenced in LocalityLbEndpoints.
    NamedEndpoints map[string]*endpoint.Endpoint
    // Load balancing policy settings.
    Policy *ClusterLoadAssignment_Policy
}
// Triggered when the configuration changes; an incremental update is pushed if anything changed.
case pushEv := <-con.pushChannel:
    // It is called when config changes.
    // This is not optimized yet - we should detect what changed based on event and only
    // push resources that need to be pushed.

    // TODO: possible race condition: if a config change happens while the envoy
    // was getting the initial config, between LDS and RDS, the push will miss the
    // monitored 'routes'. Same for CDS/EDS interval.
    // It is very tricky to handle due to the protocol - but the periodic push recovers
    // from it.
// pushEds is pushing EDS updates for a single connection. Called the first time
// a client connects, for incremental updates and for full periodic updates.
func (s *DiscoveryServer) pushEds(push *model.PushContext, con *XdsConnection, full bool, edsUpdatedServices map[string]*EndpointShards) error {
    loadAssignments := []*xdsapi.ClusterLoadAssignment{}

    emptyClusters := 0
    endpoints := 0
    // Loop over the cluster names sent on this connection and pull the related information.
    for _, clusterName := range con.Clusters {
        _, _, hostname, _ := model.ParseSubsetKey(clusterName)
        if edsUpdatedServices != nil && edsUpdatedServices[string(hostname)] == nil {
            // Cluster was not updated, skip recomputing.
            continue
        }
        // Get the EdsCluster object corresponding to cluster_name + con.modelNode.
        c := s.getEdsCluster(con.modelNode, clusterName)

        l := c.LoadAssignment
        // If there is no LoadAssignment yet (fresh cluster), recompute the cluster's
        // LoadAssignment information.
        if l == nil { // fresh cluster
            edsClusters := map[model.Locality]*EdsCluster{
                con.modelNode.Locality: c,
            }
            s.updateCluster(push, clusterName, edsClusters)
            l = loadAssignment(c)
        }

        // If networks are set (by default they aren't) apply the Split Horizon
        // EDS filter on the endpoints, i.e. filter once more when EDS is split by network.
        if s.Env.MeshNetworks != nil && len(s.Env.MeshNetworks.Networks) > 0 {
            endpoints := EndpointsByNetworkFilter(l.Endpoints, con, s.Env)
            endpoints = LoadBalancingWeightNormalize(endpoints)
            filteredCLA := &xdsapi.ClusterLoadAssignment{
                ClusterName: l.ClusterName,
                Endpoints:   endpoints,
                Policy:      l.Policy,
            }
            l = filteredCLA
        }
// BuildHTTPRoutes produces a list of routes for the proxy
func (configgen *ConfigGeneratorImpl) BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext, routeName string) (*xdsapi.RouteConfiguration, error) {
    proxyInstances, err := env.GetProxyServiceInstances(node)

// buildSidecarOutboundHTTPRouteConfig builds an outbound HTTP Route for sidecar.
// Based on port, will determine all virtual hosts that listen on the port.
func (configgen *ConfigGeneratorImpl) buildSidecarOutboundHTTPRouteConfig(env *model.Environment, node *model.Proxy, push *model.PushContext, proxyInstances []*model.ServiceInstance, routeName string) *xdsapi.RouteConfiguration {

    var virtualServices []model.Config
    var services []*model.Service
    // Get the list of services that correspond to this egressListener from the sidecarScope
    sidecarScope := node.SidecarScope
    // sidecarScope should never be nil
    if sidecarScope != nil && sidecarScope.Config != nil {
        // egress-related handling
    } else {
        // Otherwise fall back to the default mesh gateway.
        meshGateway := map[string]bool{model.IstioMeshGateway: true}
        services = push.Services(node)
        virtualServices = push.VirtualServices(node, meshGateway)
    }
    nameToServiceMap := make(map[model.Hostname]*model.Service)
    for _, svc := range services {
        if listenerPort == 0 {
            nameToServiceMap[svc.Hostname] = svc
        } else {
            if svcPort, exists := svc.Ports.GetByPort(listenerPort); exists {

    // Collect all proxy labels for source match
    var proxyLabels model.LabelsCollection
    for _, w := range proxyInstances {
        proxyLabels = append(proxyLabels, w.Labels)
    }

    // Get list of virtual services bound to the mesh gateway
    virtualHostWrappers := istio_route.BuildSidecarVirtualHostsFromConfigAndRegistry(node, push, nameToServiceMap, proxyLabels, virtualServices, listenerPort)
    vHostPortMap := make(map[int][]route.VirtualHost)

    for _, virtualHostWrapper := range virtualHostWrappers {
        virtualHosts := make([]route.VirtualHost, 0, len(virtualHostWrapper.VirtualServiceHosts)+len(virtualHostWrapper.Services))
        for _, host := range virtualHostWrapper.VirtualServiceHosts {
            virtualHosts = append(virtualHosts, route.VirtualHost{
                Name:    fmt.Sprintf("%s:%d", host, virtualHostWrapper.Port),
                Domains: []string{host, fmt.Sprintf("%s:%d", host, virtualHostWrapper.Port)},
                Routes:  virtualHostWrapper.Routes,
            })
        }
This is implemented in the Istio 1.1 code; see PR 10278. Earlier versions implemented it with an annotation placed on the Service, defined as follows:
// ServiceConfigScopeAnnotation configs the scope the service visible to.
// "PUBLIC" which is the default, indicates it is reachable within the mesh
// "PRIVATE" indicates it is reachable within its namespace
ServiceConfigScopeAnnotation = "networking.istio.io/configScope"
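For illustration, a small hypothetical helper showing how such an annotation could be interpreted; the real conversion lives in Pilot's Kubernetes registry code, so treat this only as a sketch of the PUBLIC/PRIVATE semantics described above.

package main

import "fmt"

// serviceVisibility maps the legacy annotation value to the visibility semantics above.
func serviceVisibility(annotations map[string]string) string {
    if annotations["networking.istio.io/configScope"] == "PRIVATE" {
        return "PRIVATE" // reachable only within its own namespace
    }
    return "PUBLIC" // default: reachable mesh-wide
}

func main() {
    fmt.Println(serviceVisibility(map[string]string{
        "networking.istio.io/configScope": "PRIVATE",
    }))
}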