Kong Ingress Controller integrates Kong with Kubernetes and automatically syncs changes made in Kubernetes into Kong. Deployment and usage are covered in "API Gateway Kong (2): Integrating Kong with Kubernetes"; this note looks at the controller at the code level.
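2019-05-06 16:28:56: Kong 1.1.x brought a major change, a db-less mode that runs without a database; see Note 26 in the full list of notes.
If you are just starting to learn Kong, start directly with 1.x. The 0.x line is no longer maintained, and 0.15 is its last release.
The first 19 notes were written when I first encountered Kong, using version 0.14.1. My understanding was partial at the time, so those notes are rather disorganized. Notes from the twentieth onward come from a second round of work with version 1.0.3 and are better organized.
Changes to watch for when moving from 0.x to 1.x: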
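The CustomResourceDefinitions section of "API Gateway Kong (2): Integrating Kong with Kubernetes" describes the CRDs that Kong Ingress Controller defines in Kubernetes.
Records in these CRDs have to be synced into Kong. Kong Ingress Controller watches these CRDs and the other relevant resources in the Kubernetes cluster, and syncs each change to Kong as soon as it is detected.
Kong kubernetes ingress controller manages its dependencies with dep. After downloading the code, first pull in the dependencies with: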
make deps
Then build:
make build
The main flow is implemented in ngx, which works with two clients: kubeClient for talking to Kubernetes and kongClient for talking to Kong.
//kubernetes-ingress-controller/cli/ingress-controller/main.go
func main() {
...
conf.KubeClient = kubeClient
conf.KubeConf = kubeCfg
conf.Kong.Client = kongClient
ngx := controller.NewNGINXController(conf, fs)
...
ngx.Start()
...
ngx has a store member:
//kubernetes-ingress-controller/internal/ingress/controller/run.go
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
...
n.store = store.New(
config.EnableSSLChainCompletion,
config.Namespace,
"",
"",
"",
"",
config.ResyncPeriod,
config.KubeClient,
config.KubeConf,
fs,
n.updateCh)
...
n.store holds informers implemented by the Kubernetes client-go library. When a resource in Kubernetes changes, the handler registered with the corresponding informer is called:
//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
informers: &Informer{},
listers: &Lister{},
sslStore: NewSSLCertTracker(),
filesystem: fs,
updateCh: updateCh,
mu: &sync.Mutex{},
secretIngressMap: NewObjectRefMap(),
}
...
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing := obj.(*extensions.Ingress)
...
recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
store.updateSecretIngressMap(ing)
store.syncSecrets(ing)
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
...
store.informers.Ingress.AddEventHandler(ingEventHandler)
...
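The handler-to-channel hand-off above can be sketched with the standard library alone. This is a minimal model, not the controller's code: `HandlerFuncs` and `fakeInformer` are hypothetical stand-ins for `cache.ResourceEventHandlerFuncs` and a client-go informer, and a plain buffered channel replaces the `RingChannel`.

```go
package main

import "fmt"

// EventType and Event mirror the shape of the events in the excerpt above.
type EventType string

const (
	CreateEvent        EventType = "CREATE"
	ConfigurationEvent EventType = "CONFIGURATION"
)

type Event struct {
	Type EventType
	Obj  interface{}
}

// HandlerFuncs is a hypothetical, simplified stand-in for
// cache.ResourceEventHandlerFuncs.
type HandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(old, cur interface{})
}

// fakeInformer is hypothetical: it only stores handlers and replays changes.
type fakeInformer struct{ handlers []HandlerFuncs }

func (i *fakeInformer) AddEventHandler(h HandlerFuncs) {
	i.handlers = append(i.handlers, h)
}

// notifyAdd plays the role of the informer observing a newly created object.
func (i *fakeInformer) notifyAdd(obj interface{}) {
	for _, h := range i.handlers {
		if h.AddFunc != nil {
			h.AddFunc(obj)
		}
	}
}

// newIngressHandler returns a handler that forwards every add to updateCh.
func newIngressHandler(updateCh chan<- Event) HandlerFuncs {
	return HandlerFuncs{
		AddFunc: func(obj interface{}) {
			updateCh <- Event{Type: CreateEvent, Obj: obj}
		},
	}
}

func main() {
	updateCh := make(chan Event, 8) // buffered so the handler does not block here
	inf := &fakeInformer{}
	inf.AddEventHandler(newIngressHandler(updateCh))

	inf.notifyAdd("default/demo-ingress")
	evt := <-updateCh
	fmt.Println(evt.Type, evt.Obj)
}
```

The point of the pattern is that handlers stay tiny: they record what happened and leave the actual synchronization to whoever reads the channel.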
There are five handlers in total:
//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing := obj.(*extensions.Ingress)
...
secrEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
sec := obj.(*corev1.Secret)
...
epEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
...
serviceEventHandler := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
},
...
crdEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: obj,
}
},
...
These five handlers are registered with eight informers, which watch the Kubernetes ingress, endpoint, secret, and service resources plus the Kong plugin, consumer, credential, and configuration CRDs:
//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.Service.AddEventHandler(serviceEventHandler)
store.informers.Kong.Plugin.AddEventHandler(crdEventHandler)
store.informers.Kong.Consumer.AddEventHandler(crdEventHandler)
store.informers.Kong.Credential.AddEventHandler(crdEventHandler)
store.informers.Kong.Configuration.AddEventHandler(crdEventHandler)
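As a quick cross-check of the "five handlers, eight informers" count, the registrations above can be written down as data and inverted. The map below only restates the `AddEventHandler` calls in the excerpt; the helper name `informersByHandler` is made up for this sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// registrations lists, for each informer in the excerpt above, the handler
// variable passed to its AddEventHandler call.
var registrations = map[string]string{
	"Ingress":            "ingEventHandler",
	"Endpoint":           "epEventHandler",
	"Secret":             "secrEventHandler",
	"Service":            "serviceEventHandler",
	"Kong.Plugin":        "crdEventHandler",
	"Kong.Consumer":      "crdEventHandler",
	"Kong.Credential":    "crdEventHandler",
	"Kong.Configuration": "crdEventHandler",
}

// informersByHandler inverts the mapping and sorts each list so that the
// output is stable.
func informersByHandler(reg map[string]string) map[string][]string {
	out := make(map[string][]string)
	for informer, handler := range reg {
		out[handler] = append(out[handler], informer)
	}
	for _, informers := range out {
		sort.Strings(informers)
	}
	return out
}

func main() {
	byHandler := informersByHandler(registrations)
	fmt.Printf("%d informers, %d handlers\n", len(registrations), len(byHandler))
	fmt.Println(byHandler["crdEventHandler"])
}
```

The only handler shared between informers is `crdEventHandler`, which serves all four Kong CRD informers.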
Pay attention to how the CRDs (Kong.Plugin, Kong.Consumer, Kong.Credential, Kong.Configuration) are watched, because they are not native Kubernetes types:
//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
pluginClient, _ := pluginclientv1.NewForConfig(clientConf)
pluginFactory := plugininformer.NewFilteredSharedInformerFactory(pluginClient, resyncPeriod, namespace, func(*metav1.ListOptions) {})
store.informers.Kong.Plugin = pluginFactory.Configuration().V1().KongPlugins().Informer()
store.listers.Kong.Plugin = store.informers.Kong.Plugin.GetStore()
store.informers.Kong.Plugin.AddEventHandler(crdEventHandler)
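Their informers are slightly more involved: they are built by wrapping the Kubernetes REST client, and are analyzed separately below.
The other informers are created directly through client-go: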
infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})
store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
The CRD informers are implemented by wrapping the Kubernetes REST client:
//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
pluginClient, _ := pluginclientv1.NewForConfig(clientConf)
pluginFactory := plugininformer.NewFilteredSharedInformerFactory(pluginClient, resyncPeriod, namespace, func(*metav1.ListOptions) {})
store.informers.Kong.Plugin = pluginFactory.Configuration().V1().KongPlugins().Informer()
...
pluginclientv1.NewForConfig() wraps the native REST client from client-go:
//kubernetes-ingress-controller/internal/client/plugin/clientset/versioned/clientset.go
func NewForConfig(c *rest.Config) (*Clientset, error) {
...
var cs Clientset
cs.configurationV1, err = configurationv1.NewForConfig(&configShallowCopy)
...
cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
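It creates two clients, cs.configurationV1 and cs.DiscoveryClient.
cs.DiscoveryClient is created through the standard client-go interface and is unremarkable; the one that matters is cs.configurationV1.
cs.configurationV1 is implemented as follows: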
//kubernetes-ingress-controller/internal/client/plugin/clientset/versioned/typed/plugin/v1/plugin_client.go
func NewForConfig(c *rest.Config) (*ConfigurationV1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
return &ConfigurationV1Client{client}, nil
}
The key part is setConfigDefaults(&config), which fills in the config:
//kubernetes-ingress-controller/internal/client/plugin/clientset/versioned/typed/plugin/v1/plugin_client.go
func setConfigDefaults(config *rest.Config) error {
gv := v1.SchemeGroupVersion
config.GroupVersion = &gv
config.APIPath = "/apis"
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
return nil
}
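config.GroupVersion points at gv, which is a copy of v1.SchemeGroupVersion:
config.GroupVersion --> &gv, where gv := v1.SchemeGroupVersion
Tracing v1.SchemeGroupVersion further shows that it carries the CRD's group name and version: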
//kubernetes-ingress-controller/internal/apis/plugin/v1/register.go
var SchemeGroupVersion = info.SchemeGroupVersion
//kubernetes-ingress-controller/internal/apis/group/info.go
var GroupName = "configuration.konghq.com"
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
This GroupName and Version correspond to the fields in the CRD definition:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: kongconsumers.configuration.konghq.com
spec:
group: configuration.konghq.com
version: v1
scope: Namespaced
names:
kind: KongConsumer
plural: kongconsumers
shortNames:
- kc
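To make the role of `APIPath` and `GroupVersion` concrete, here is a small sketch of how they combine into the request path for a namespaced custom resource. The layout `/apis/<group>/<version>/namespaces/<namespace>/<plural>` is the standard Kubernetes URL scheme for namespaced resources; the `GroupVersion` struct and the `resourcePath` helper are hypothetical simplifications, not the REST client's internals.

```go
package main

import (
	"fmt"
	"path"
)

// GroupVersion is a simplified stand-in for schema.GroupVersion.
type GroupVersion struct{ Group, Version string }

// resourcePath builds the URL path for a namespaced custom resource.
// An empty name yields the collection path used for list and watch.
func resourcePath(apiPath string, gv GroupVersion, namespace, plural, name string) string {
	return path.Join(apiPath, gv.Group, gv.Version, "namespaces", namespace, plural, name)
}

func main() {
	gv := GroupVersion{Group: "configuration.konghq.com", Version: "v1"}

	// Collection path: what an informer would list and watch.
	fmt.Println(resourcePath("/apis", gv, "default", "kongconsumers", ""))

	// Single object path; "alice" is a made-up consumer name.
	fmt.Println(resourcePath("/apis", gv, "default", "kongconsumers", "alice"))
}
```

This is why the group and version in the Go code must match the `group` and `version` fields of the CRD manifest: a mismatch produces a path the API server does not serve.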
Every handler function registered in the previous section ends by writing an event into updateCh:
//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
updateCh *channels.RingChannel) Storer {
...
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing := obj.(*extensions.Ingress)
...
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
...
This updateCh is the same channel as ngx.updateCh; it is passed in when n.store is created:
//kubernetes-ingress-controller/internal/ingress/controller/run.go
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
...
n := &NGINXController{
...
}
n.store = store.New(
config.EnableSSLChainCompletion,
config.Namespace,
"",
"",
"",
"",
config.ResyncPeriod,
config.KubeClient,
config.KubeConf,
fs,
n.updateCh)
...
After ngx is started with Start(), each event received from updateCh adds one task to the task queue:
//kubernetes-ingress-controller/internal/ingress/controller/run.go
func (n *NGINXController) Start() {
glog.Infof("starting Ingress controller")
...
n.store.Run(n.stopCh)
...
go n.syncQueue.Run(time.Second, n.stopCh)
// force initial sync
n.syncQueue.Enqueue(&extensions.Ingress{})
for {
select {
...
case event := <-n.updateCh.Out():
...
if evt, ok := event.(store.Event); ok {
...
n.syncQueue.Enqueue(evt.Obj)
...
case <-n.stopCh:
break
}
}
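The shape of that loop can be reproduced in a few lines. The sketch below is a simplified model: `forward` and `run` are hypothetical names, a plain channel replaces `updateCh.Out()`, and a callback replaces `n.syncQueue.Enqueue`. One detail worth knowing when reading the excerpt: in Go, a bare `break` inside a `select` leaves only the `select`, not the enclosing `for`, so this sketch uses `return` to stop the loop.

```go
package main

import "fmt"

// Event mirrors the store.Event shape used in the excerpt above.
type Event struct {
	Type string
	Obj  interface{}
}

// run forwards the Obj of every Event read from updates to enqueue, ignores
// values of any other type, and returns once stop is closed.
func run(updates <-chan interface{}, stop <-chan struct{}, enqueue func(obj interface{})) {
	for {
		select {
		case raw := <-updates:
			if evt, ok := raw.(Event); ok {
				enqueue(evt.Obj)
			}
		case <-stop:
			return // a bare break here would only leave the select
		}
	}
}

// forward feeds the given values through run and returns what was enqueued.
func forward(values []interface{}) []interface{} {
	updates := make(chan interface{}) // unbuffered: a send returns once run has taken the value
	stop := make(chan struct{})
	done := make(chan struct{})
	var queued []interface{}

	go func() {
		run(updates, stop, func(obj interface{}) { queued = append(queued, obj) })
		close(done)
	}()
	for _, v := range values {
		updates <- v
	}
	close(stop)
	<-done // run has returned, so queued is safe to read
	return queued
}

func main() {
	fmt.Println(forward([]interface{}{
		Event{Type: "CREATE", Obj: "default/demo"},
		"not an event",
		Event{Type: "UPDATE", Obj: "default/other"},
	}))
}
```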
The task queue is also created when ngx is created:
//kubernetes-ingress-controller/internal/ingress/controller/run.go
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
...
n.syncQueue = task.NewTaskQueue(n.syncIngress)
The argument passed to n.syncIngress(interface{}) is a string key generated from event.Obj, as the queue implementation in kubernetes-ingress-controller/internal/task/queue.go shows.
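As an illustration of what such a key can look like, the sketch below assumes the `namespace/name` convention that client-go's `cache.MetaNamespaceKeyFunc` follows. Whether queue.go uses exactly that function is an assumption here, and `ObjectMeta` and `keyFor` are simplified, hypothetical names.

```go
package main

import "fmt"

// ObjectMeta is a simplified stand-in for metav1.ObjectMeta.
type ObjectMeta struct{ Namespace, Name string }

// keyFor builds a "namespace/name" key, or just "name" when the object has
// no namespace (assumed convention, see the paragraph above).
func keyFor(m ObjectMeta) string {
	if m.Namespace == "" {
		return m.Name
	}
	return m.Namespace + "/" + m.Name
}

func main() {
	fmt.Printf("%q\n", keyFor(ObjectMeta{Namespace: "default", Name: "demo"}))
	fmt.Printf("%q\n", keyFor(ObjectMeta{Name: "cluster-wide"}))
	// An object with empty metadata, like the &extensions.Ingress{} enqueued
	// for the forced initial sync, would map to the empty key.
	fmt.Printf("%q\n", keyFor(ObjectMeta{}))
}
```

Reducing objects to string keys keeps the queue small and lets repeated events for the same object collapse onto one key.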
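n.syncIngress() is the function handed to task.NewTaskQueue(). Its job is to sync configuration into Kong: it is triggered whenever the queue holds an event, and it does not care what kind of event that is.
n.syncIngress() works in two steps: first it reads the ings from n.store, then it passes the resulting configuration to n.OnUpdate():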
//kubernetes-ingress-controller/internal/ingress/controller/controller.go:
func (n *NGINXController) syncIngress(interface{}) error {
...
ings := n.store.ListIngresses()
...
upstreams, servers := n.getBackendServers(ings)
pcfg := ingress.Configuration{
Backends: upstreams,
Servers: servers,
}
err := n.OnUpdate(&pcfg)
n.runningConfig = &pcfg
...
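Note that in the current implementation of syncIngress (version 0.2.0), every call performs a full update regardless of the input argument. This is a potential hazard; it should be changed to a partial update driven by the argument.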
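The cost of that design can be shown with a toy model. In the sketch below, `miniQueue` is a hypothetical single-threaded stand-in for the task queue and `syncAll` plays the role of syncIngress by ignoring its key. The count it keeps is only an illustration of repeated work, not a measurement of the real controller.

```go
package main

import "fmt"

// miniQueue is a hypothetical, single-threaded stand-in for the task queue.
type miniQueue struct {
	keys []string
	sync func(key interface{}) error
}

func newMiniQueue(sync func(interface{}) error) *miniQueue {
	return &miniQueue{sync: sync}
}

func (q *miniQueue) Enqueue(key string) { q.keys = append(q.keys, key) }

// Drain calls sync once per queued key and stops at the first error.
func (q *miniQueue) Drain() error {
	for len(q.keys) > 0 {
		key := q.keys[0]
		q.keys = q.keys[1:]
		if err := q.sync(key); err != nil {
			return err
		}
	}
	return nil
}

// controller counts how many ingresses its sync passes touch in total.
type controller struct {
	ingresses []string
	processed int
}

// syncAll ignores the key, like the syncIngress excerpt above, and walks
// every known ingress on each call.
func (c *controller) syncAll(interface{}) error {
	c.processed += len(c.ingresses)
	return nil
}

// processedAfter enqueues the given keys and reports the total work done.
func processedAfter(ingresses, keys []string) (int, error) {
	c := &controller{ingresses: ingresses}
	q := newMiniQueue(c.syncAll)
	for _, k := range keys {
		q.Enqueue(k)
	}
	err := q.Drain()
	return c.processed, err
}

func main() {
	// Four ingresses exist; three events arrive, two of them for the same object.
	n, err := processedAfter(
		[]string{"default/a", "default/b", "default/c", "default/d"},
		[]string{"default/a", "default/a", "default/b"},
	)
	fmt.Println(n, err)
}
```

Three events against four ingresses cause twelve ingress visits in this model, where a key-driven partial update would touch at most three.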
The ings live in n.store. n.store.ListIngresses() reads them from the store of the informer created through client-go:
//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func (s k8sStore) ListIngresses() []*extensions.Ingress {
...
var ingresses []*extensions.Ingress
for _, item := range s.listers.Ingress.List() {
ing := item.(*extensions.Ingress)
...
ingresses = append(ingresses, ing)
}
return ingresses
}
//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})
store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
store.listers.Ingress.Store = store.informers.Ingress.GetStore()
...
The ings returned are therefore standard Kubernetes ingresses:
//k8s.io/api/extensions/v1beta1/types.go
type Ingress struct {
metav1.TypeMeta
metav1.ObjectMeta
Spec IngressSpec
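// IngressSpec is nested inline here; this is not valid Go, it is only for easier reading (same below)
type IngressSpec struct {
// Default backend; at least one Backend or one Rule is required
Backend *IngressBackend
TLS []IngressTLS
type IngressTLS struct {
// Hostnames the certificate is bound to
Hosts []string
// Certificate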
SecretName string
}
Rules []IngressRule
type IngressRule struct {
Host string
IngressRuleValue
type IngressRuleValue struct {
HTTP *HTTPIngressRuleValue
type HTTPIngressRuleValue struct {
Paths []HTTPIngressPath
type HTTPIngressPath struct {
Path string
Backend IngressBackend
type IngressBackend struct {
ServiceName string
ServicePort intstr.IntOrString
}
}
}
}
}
}
Status IngressStatus
}
Walking through all the ingresses yields the server and backend lists:
//kubernetes-ingress-controller/internal/ingress/controller/controller.go:
func (n *NGINXController) syncIngress(interface{}) error {
...
upstreams, servers := n.getBackendServers(ings)
...
pcfg := ingress.Configuration{
Backends: upstreams,
Servers: servers,
}
err := n.OnUpdate(&pcfg)
...
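The traversal itself is not shown in the excerpt, so here is a rough sketch of the idea using trimmed-down copies of the Ingress types. It is not getBackendServers: the `Location` type, the `flatten` function, and the `namespace.service.port` upstream naming are all hypothetical choices made for this example.

```go
package main

import (
	"fmt"
	"sort"
)

// Trimmed-down versions of the Ingress types listed above.
type IngressBackend struct {
	ServiceName string
	ServicePort int // the real field is an intstr.IntOrString
}

type HTTPIngressPath struct {
	Path    string
	Backend IngressBackend
}

type IngressRule struct {
	Host  string
	Paths []HTTPIngressPath // flattened from IngressRuleValue.HTTP.Paths
}

type Ingress struct {
	Namespace string
	Rules     []IngressRule
}

// Location is a hypothetical record tying a host and path to an upstream.
type Location struct{ Host, Path, Upstream string }

// upstreamName is a hypothetical naming scheme, not the controller's own.
func upstreamName(ns string, b IngressBackend) string {
	return fmt.Sprintf("%s.%s.%d", ns, b.ServiceName, b.ServicePort)
}

// flatten walks every rule of every ingress and returns the de-duplicated,
// sorted upstream names plus one Location per host/path pair.
func flatten(ings []Ingress) ([]string, []Location) {
	seen := map[string]bool{}
	var upstreams []string
	var locations []Location
	for _, ing := range ings {
		for _, rule := range ing.Rules {
			for _, p := range rule.Paths {
				name := upstreamName(ing.Namespace, p.Backend)
				if !seen[name] {
					seen[name] = true
					upstreams = append(upstreams, name)
				}
				locations = append(locations, Location{Host: rule.Host, Path: p.Path, Upstream: name})
			}
		}
	}
	sort.Strings(upstreams)
	return upstreams, locations
}

func main() {
	web := IngressBackend{ServiceName: "web", ServicePort: 80}
	api := IngressBackend{ServiceName: "api", ServicePort: 8080}
	ings := []Ingress{{
		Namespace: "default",
		Rules: []IngressRule{{
			Host: "demo.example.com",
			Paths: []HTTPIngressPath{
				{Path: "/", Backend: web},
				{Path: "/api", Backend: api},
				{Path: "/static", Backend: web},
			},
		}},
	}}
	upstreams, locations := flatten(ings)
	fmt.Println(upstreams)
	fmt.Println(len(locations), "locations")
}
```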
The update is then carried out by n.OnUpdate():
//kubernetes-ingress-controller/internal/ingress/controller/controller.go:
func (n *NGINXController) OnUpdate(ingressCfg *ingress.Configuration) error {
...
err := n.syncCertificates(ingressCfg.Servers)
for _, server := range ingressCfg.Servers {
...
err := n.syncUpstreams(server.Locations, ingressCfg.Backends)
...
}
err = n.syncConsumers()
...
err = n.syncCredentials()
...
err = n.syncGlobalPlugins()
...
checkServices, err := n.syncServices(ingressCfg)
...
checkRoutes, err := n.syncRoutes(ingressCfg)
...
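The excerpt elides what happens after each `err`, so the following is only a sketch of one plausible shape: run the sync steps in the order shown and stop at the first failure. The `step` type, `runInOrder`, and the stop-on-first-error policy are assumptions for illustration, not the controller's confirmed behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// step pairs a name with the function that performs one sync phase.
type step struct {
	name string
	run  func() error
}

var errDemo = errors.New("demo failure")

func ok() error   { return nil }
func fail() error { return errDemo }

// runInOrder executes the steps sequentially. It returns the names of the
// steps that completed and, if one failed, an error naming that step.
func runInOrder(steps []step) ([]string, error) {
	var done []string
	for _, s := range steps {
		if err := s.run(); err != nil {
			return done, fmt.Errorf("%s: %w", s.name, err)
		}
		done = append(done, s.name)
	}
	return done, nil
}

func main() {
	// Same order as the excerpt above; syncCredentials is made to fail here.
	steps := []step{
		{"syncCertificates", ok},
		{"syncUpstreams", ok},
		{"syncConsumers", ok},
		{"syncCredentials", fail},
		{"syncGlobalPlugins", ok},
		{"syncServices", ok},
		{"syncRoutes", ok},
	}
	done, err := runInOrder(steps)
	fmt.Println(done)
	fmt.Println(err)
}
```

The ordering matters in the excerpt: certificates and upstreams are handled before services and routes, which are synced last.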