kubenetes源码分析之DNS(三)

    xiaoxiao2021-04-16  31

    之前做了一些基础知识,下面开始kubedns源码阅读,这个字项目的结构和kubernetes的代码结构是一样的:首先看cmd/kube-dns/dns.go,他是项目起点:

    func main() { config := options.NewKubeDNSConfig() config.AddFlags(pflag.CommandLine) flag.InitFlags() goflag.CommandLine.Parse([]string{}) logs.InitLogs() defer logs.FlushLogs() verflag.PrintAndExitIfRequested() glog.V(0).Infof("version: %+v", version.Get()) server := app.NewKubeDNSServerDefault(config) server.Run() }

    首先是解析config参数,然后启动server。参数有哪些呢?

    func NewKubeDNSConfig() *KubeDNSConfig { return &KubeDNSConfig{ ClusterDomain: "cluster.local.", HealthzPort: 8081, DNSBindAddress: "0.0.0.0", DNSPort: 53, InitialSyncTimeout: 60 * time.Second, Federations: make(map[string]string), ConfigMapNs: api.NamespaceSystem, ConfigMap: "", // default to using command line flags ConfigPeriod: 10 * time.Second, ConfigDir: "", NameServers: "", } }

    上面先是通过options.NewKubeDNSConfig获取kubedns默认启动参数,包括绑定地址,监听端口,默认域名和监控检查端口等信息,代码如下:

    func NewKubeDNSConfig() *KubeDNSConfig { return &KubeDNSConfig{ ClusterDomain: "cluster.local.", HealthzPort: 8081, DNSBindAddress: "0.0.0.0", DNSPort: 53, InitialSyncTimeout: 60 * time.Second, Federations: make(map[string]string), ConfigMapNs: api.NamespaceSystem, ConfigMap: "", // default to using command line flags ConfigPeriod: 10 * time.Second, ConfigDir: "", NameServers: "", } }

    然后再通过AddFlags制定用户自己的参数,当然会覆盖上面的默认参数,代码如下:

    func (s *KubeDNSConfig) AddFlags(fs *pflag.FlagSet) { fs.Var(clusterDomainVar{&s.ClusterDomain}, "domain", "domain under which to create names") fs.StringVar(&s.NameServers, "nameservers", s.NameServers, "List of ip:port, separated by commas of nameservers to forward queries to. "+ "If set, overrides upstream servers taken from the nameserver option in /etc/resolv.conf. "+ "Example: 8.8.8.8:53,8.8.4.4 (default port is 53)") fs.StringVar(&s.KubeConfigFile, "kubecfg-file", s.KubeConfigFile, "Location of kubecfg file for access to kubernetes master service;"+ " --kube-master-url overrides the URL part of this; if this is not"+ " provided, defaults to service account tokens") fs.Var(kubeMasterURLVar{&s.KubeMasterURL}, "kube-master-url", "URL to reach kubernetes master. Env variables in this flag will be expanded.") fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, "port on which to serve a kube-dns HTTP readiness probe.") fs.StringVar(&s.DNSBindAddress, "dns-bind-address", s.DNSBindAddress, "address on which to serve DNS requests.") fs.IntVar(&s.DNSPort, "dns-port", s.DNSPort, "port on which to serve DNS requests.") fs.Var(federationsVar{s.Federations}, "federations", "a comma separated list of the federation names and their corresponding"+ " domain names to which this cluster belongs. Example:"+ " \"myfederation1=example.com,myfederation2=example2.com,myfederation3=example.com\"."+ " It is an error to set both the federations and config-map or config-dir flags.") fs.MarkDeprecated("federations", "use config-dir instead. Will be removed in future version") fs.StringVar(&s.ConfigMapNs, "config-map-namespace", s.ConfigMapNs, "namespace for the config-map") fs.StringVar(&s.ConfigMap, "config-map", s.ConfigMap, "config-map name. If empty, then the config-map will not used. Cannot be "+ "used in conjunction with federations or config-dir flag. config-map contains "+ "dynamically adjustable configuration.") fs.DurationVar(&s.InitialSyncTimeout, "initial-sync-timeout", s.InitialSyncTimeout, "Timeout for initial resource sync.") fs.StringVar(&s.ConfigDir, "config-dir", s.ConfigDir, "directory to read config values from. Cannot be "+ "used in conjunction with federations or config-map flag.") fs.DurationVar(&s.ConfigPeriod, "config-period", s.ConfigPeriod, "period at which to check for updates in config-dir.") }

    这里就把参数全部收集完了,下面举一个kubernetes提供的一个启动参数 - –dns-port=10053 - –config-dir=/kube-dns-config - –v=2 当准备好所有参数的时候就可以创建服务了NewKubeDNSServerDefault,代码如下:

    func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer { kubeClient, err := newKubeClient(config) if err != nil { glog.Fatalf("Failed to create a kubernetes client: %v", err) } var configSync dnsconfig.Sync switch { case config.ConfigMap != "" && config.ConfigDir != "": glog.Fatal("Cannot use both ConfigMap and ConfigDir") case config.ConfigMap != "": glog.V(0).Infof("Using configuration read from ConfigMap: %v:%v", config.ConfigMapNs, config.ConfigMap) configSync = dnsconfig.NewConfigMapSync(kubeClient, config.ConfigMapNs, config.ConfigMap) case config.ConfigDir != "": glog.V(0).Infof("Using configuration read from directory: %v with period %v", config.ConfigDir, config.ConfigPeriod) configSync = dnsconfig.NewFileSync(config.ConfigDir, config.ConfigPeriod) default: glog.V(0).Infof("ConfigMap and ConfigDir not configured, using values from command line flags") configSync = dnsconfig.NewNopSync(&dnsconfig.Config{Federations: config.Federations}) } return &KubeDNSServer{ domain: config.ClusterDomain, healthzPort: config.HealthzPort, dnsBindAddress: config.DNSBindAddress, dnsPort: config.DNSPort, nameServers: config.NameServers, kd: dns.NewKubeDNS(kubeClient, config.ClusterDomain, config.InitialSyncTimeout, configSync), } }

    上面的代码显示创建了一个调用kubernetes接口的kubeclient,下面是switch分之判读kubedns的配置方式:ConfigMap还是ConfigDir,现在代码以ConfigDir为线索往下走,创建一个文件同步器NewFileSync,这个文件同步器就是周期检查文件是否被修改,具体代码在pkg/dns/config/sync_dir.go里面的Periodic方法实现。

    func (syncSource *kubeFileSyncSource) Periodic() <-chan syncResult { // TODO: drive via inotify? go func() { ticker := syncSource.clock.Tick(syncSource.period) for { if result, err := syncSource.load(); err != nil { glog.Errorf("Error loading config from %s: %v", syncSource.dir, err) } else { syncSource.channel <- result } <-ticker } }() return syncSource.channel }

    创建完文件同步器后就开始创建kubedns了:NewKubeDNS()。这个方法如下(pkg/dns/dns.go):

    func NewKubeDNS(client clientset.Interface, clusterDomain string, timeout time.Duration, configSync config.Sync) *KubeDNS { kd := &KubeDNS{ kubeClient: client, domain: clusterDomain, cache: treecache.NewTreeCache(), cacheLock: sync.RWMutex{}, nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc), reverseRecordMap: make(map[string]*skymsg.Service), clusterIPServiceMap: make(map[string]*v1.Service), domainPath: util.ReverseArray(strings.Split(strings.TrimRight(clusterDomain, "."), ".")), initialSyncTimeout: timeout, configLock: sync.RWMutex{}, configSync: configSync, } kd.setEndpointsStore() kd.setServicesStore() return kd }

    这个里面先创建KubeDNS然后启动Endpoint和Service的listwatch。这个大家可能都比较熟悉了,在之前blog介绍很多watchlist的,直接看代码:

    //Endpoint的listwatch func (kd *KubeDNS) setEndpointsStore() { // Returns a cache.ListWatch that gets all changes to endpoints. kd.endpointsStore, kd.endpointsController = kcache.NewInformer( &kcache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { return kd.kubeClient.Core().Endpoints(v1.NamespaceAll).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { return kd.kubeClient.Core().Endpoints(v1.NamespaceAll).Watch(options) }, }, &v1.Endpoints{}, resyncPeriod, kcache.ResourceEventHandlerFuncs{ AddFunc: kd.handleEndpointAdd, UpdateFunc: kd.handleEndpointUpdate, // If Service is named headless need to remove the reverse dns entries. DeleteFunc: kd.handleEndpointDelete, }, ) } //Service的listwatch func (kd *KubeDNS) setServicesStore() { // Returns a cache.ListWatch that gets all changes to services. kd.servicesStore, kd.serviceController = kcache.NewInformer( &kcache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { return kd.kubeClient.Core().Services(v1.NamespaceAll).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { return kd.kubeClient.Core().Services(v1.NamespaceAll).Watch(options) }, }, &v1.Service{}, resyncPeriod, kcache.ResourceEventHandlerFuncs{ AddFunc: kd.newService, DeleteFunc: kd.removeService, UpdateFunc: kd.updateService, }, ) }

    创建完成后,就可以启动服务了,回到第一个方法server.Run()。

    func (server *KubeDNSServer) Run() { pflag.VisitAll(func(flag *pflag.Flag) { glog.V(0).Infof("FLAG: --%s=%q", flag.Name, flag.Value) }) setupSignalHandlers() server.startSkyDNSServer() server.kd.Start() server.setupHandlers() glog.V(0).Infof("Status HTTP port %v", server.healthzPort) if server.nameServers != "" { glog.V(0).Infof("Upstream nameservers: %s", server.nameServers) } glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", server.healthzPort), nil)) }

    这里面分为四步

    setupSignalHandlers() 屏蔽系统停止信号(SIGTERM、SIGINT),除非kill -9(SIGKILL) 。

    func setupSignalHandlers() { sigChan := make(chan os.Signal) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { for { glog.V(0).Infof("Ignoring signal %v (can only be terminated by SIGKILL)", <-sigChan) glog.Flush() } }() }

    startSkyDNSServer() 启动skydns服务,它是一个DNS服务,具体服务内容在后面继续详解

    func (d *KubeDNSServer) startSkyDNSServer() { glog.V(0).Infof("Starting SkyDNS server (%v:%v)", d.dnsBindAddress, d.dnsPort) skydnsConfig := &server.Config{ Domain: d.domain, DnsAddr: fmt.Sprintf("%s:%d", d.dnsBindAddress, d.dnsPort), } if d.nameServers != "" { for _, nameServer := range strings.Split(d.nameServers, ",") { r, _ := regexp.Compile(":\\d+$") if !r.MatchString(nameServer) { nameServer = nameServer + ":53" } if err := validateHostAndPort(nameServer); err != nil { glog.Fatalf("nameserver is invalid: %s", err) } skydnsConfig.Nameservers = append(skydnsConfig.Nameservers, nameServer) } } server.SetDefaults(skydnsConfig) s := server.New(d.kd, skydnsConfig) if err := metrics.Metrics(); err != nil { glog.Fatalf("Skydns metrics error: %s", err) } else if metrics.Port != "" { glog.V(0).Infof("Skydns metrics enabled (%v:%v)", metrics.Path, metrics.Port) } else { glog.V(0).Infof("Skydns metrics not enabled") } go s.Run() }

    kd.Start() 启动对kubernetes api的listwatch

    func (kd *KubeDNS) Start() { glog.V(2).Infof("Starting endpointsController") go kd.endpointsController.Run(wait.NeverStop) glog.V(2).Infof("Starting serviceController") go kd.serviceController.Run(wait.NeverStop) kd.startConfigMapSync() // Wait synchronously for the initial list operations to be // complete of endpoints and services from APIServer. kd.waitForResourceSyncedOrDie() }

    setupHandlers() 为kubedns提供健康检查服务

    func (server *KubeDNSServer) setupHandlers() { glog.V(0).Infof("Setting up Healthz Handler (/readiness)") http.HandleFunc("/readiness", func(w http.ResponseWriter, req *http.Request) { fmt.Fprintf(w, "ok\n") }) glog.V(0).Infof("Setting up cache handler (/cache)") http.HandleFunc("/cache", func(w http.ResponseWriter, req *http.Request) { serializedJSON, err := server.kd.GetCacheAsJSON() if err == nil { fmt.Fprint(w, serializedJSON) } else { w.WriteHeader(http.StatusInternalServerError) fmt.Fprint(w, err) } }) }

    好了,至此服务已经启动。后续的blog将介绍kubedns的工作细节。

    转载请注明原文地址: https://ju.6miu.com/read-672635.html

    最新回复(0)