kubernetes 的 Apiserver 的 storage 使用

Tags: kubernetes 

目录

kubernetes-apiserver中介绍过,apiserver使用的kubernetes-style apiserver:

1. 在APIGroupInfo中装载各类的storage
2. GenericAPIServer依据传入的APIGroupInfo中的storage,自动生成REST handler。

apiserver在存取资源时,最终是通过各个storage完成,这里探究一下storage怎样创建的,又是怎样与etcd关联上的。

注意v1.6.4版本的代码发生了一些变化,主要是apiserver启动时config初始化的代码统一放置在了BuildMasterConfig()中,代码更为清晰了。

func Run(s *options.ServerRunOptions) error {
    config, sharedInformers, err := BuildMasterConfig(s)
    if err != nil {
        return err
    }

    return RunServer(config, sharedInformers, wait.NeverStop)
}

storage的机制没有变化,下面的分析过程依然适用于v1.6.4。

代码结构

registry目录下收录的就是kubernetes的每类资源的REST实现代码。

k8s.io/kubernetes/pkg/registry/
▾ apps/
  ▸ petset/
  ▸ rest/
       storage_apps.go
▸ authentication/
▸ authorization/
▸ autoscaling/
▸ batch/
▸ cachesize/
▸ certificates/
▸ core/             #core/rest中实现了NewLegacyRESTStorage()
▸ extensions/
▸ policy/
▸ rbac/
▸ registrytest/
▸ settings/
▸ storage/

每类资源目录下都有一个rest目录,其中实现了各自的storage。例如apps/rest中的代码定义了可以提供给GenericAPIServer的storage。

设置过程

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:

func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
	kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions)
	...
	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers, stopCh)
	...

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:

func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*master.Master, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New()
	...

记住这里的kubeAPIServerConfig,还会回来找它的!

storage在kubeAPIServerConfig.Complete().New()中完成了设置。

v1.6.4中这部代码做了更改,结构更为清晰了:

cmd/kube-apiserver/app/server.go:

func Run(s *options.ServerRunOptions) error {
	//在BuildMasterConfig中创建了config
	config, sharedInformers, err := BuildMasterConfig(s)
	if err != nil {
		return err
	}
	
	return RunServer(config, sharedInformers, wait.NeverStop)
}

func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error {
	//在config.Complete().New()中完成所有的设置,包括storage的设置。
	m, err := config.Complete().New()
	if err != nil {
		return err
	}
	
	sharedInformers.Start(stopCh)
	return m.GenericAPIServer.PrepareRun().Run(stopCh)
}

storage的设置

k8s.io/kubernetes/pkg/master/master.go:

func (c completedConfig) New() (*Master, error) {
	...
	m := &Master{
		GenericAPIServer: s,
	}
	if c.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:       c.StorageFactory,
			ProxyTransport:       c.ProxyTransport,
			KubeletClientConfig:  c.KubeletClientConfig,
			EventTTL:             c.EventTTL,
			ServiceIPRange:       c.ServiceIPRange,
			ServiceNodePortRange: c.ServiceNodePortRange,
			LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
		}
		//装载pod、service的资源操作的REST api
		m.InstallLegacyAPI(c.Config, c.Config.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
	}
	restStorageProviders := []RESTStorageProvider{
		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator},
		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, c.StorageFactory)},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},
		settingsrest.RESTStorageProvider{},
		storagerest.RESTStorageProvider{},
		// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
		// See https://github.com/kubernetes/kubernetes/issues/42392
		appsrest.RESTStorageProvider{},
	}
	//装载其它的api
	m.InstallAPIs(c.Config.APIResourceConfigSource, c.Config.GenericConfig.RESTOptionsGetter, restStorageProviders...)

留意这里创建的legacyRESTStorageProviderrestStorageProviders,通过接下来的过程可以看到storage最终是由它们创建的。

m.InstallLegacyAPI()

//k8s.io/kubernetes/pkg/master/master.go:
func (m *Master) InstallLegacyAPI(c *Config, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
	...
	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		glog.Fatalf("Error in registering group versions: %v", err)
	}
	...

可以看到,这里生成了一个apiGroupInfo,然后将其装载到了m.GenericAPIServer中,与GenericAPIServer的工作方式衔接上了。

legacyRESTStorageProvider

在上面的代码中看到,apiGroupInfo是通过调用 legacyRESTStorageProvider.NewLegacyRESTStorage () 创建的。

//k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	apiGroupInfo := genericapiserver.APIGroupInfo{
		GroupMeta:                    *api.Registry.GroupOrDie(api.GroupName),
		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
		Scheme:                      api.Scheme,
		ParameterCodec:              api.ParameterCodec,
		NegotiatedSerializer:        api.Codecs,
		SubresourceGroupVersionKind: map[string]schema.GroupVersionKind{},
	}
	restStorage := LegacyRESTStorage{}
	...
	nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
	...
	//按资源创建了Storage
	podTemplateStorage := podtemplatestore.NewREST(restOptionsGetter)
	eventStorage := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
	limitRangeStorage := limitrangestore.NewREST(restOptionsGetter)
	...
	podStorage := podstore.NewStorage(
			restOptionsGetter,
			nodeStorage.KubeletConnectionInfo,
			c.ProxyTransport,
			podDisruptionClient,
			)
	...
	
	//将新建的storage保存到
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		...
		"nodes":        nodeStorage.Node,
	...
	//restStorageMap被装载到了apiGroupInfo
	apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
	...

重点分析几个具体的storage,了解它们的实现。

nodeStorage

在上面的代码中可以看到nodeStorage是通过调用nodestore.NewStorage创建的。

//k8s.io/kubernetes/pkg/registry/core/node/storage/storage.go:
func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) {
	store := &genericregistry.Store{
	...
	return &NodeStorage{
		Node:   nodeREST,
		Status: statusREST,
		Proxy:  proxyREST,
		KubeletConnectionInfo: connectionInfoGetter,
	}, nil
	...

NodeStorage的成员变量Status实现了Get()、New()、Update(), Status类型是`StatusREST`。

-+StatusREST : struct
    [fields]
   -store : *genericregistry.Store
    [methods]
   +Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) : runtime.Object, error
   +New() : runtime.Object
   +Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) : runtime.Object, bool, error

StatusREST.Get():

func (r *StatusREST) Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	return r.store.Get(ctx, name, options)
}

回到创建NodeStorage的函数中,找到变量StatusREST.store的创建。

//k8s.io/kubernetes/pkg/registry/core/node/storage/storage.go:
func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) {
	store := &genericregistry.Store{
		Copier:      api.Scheme,
		NewFunc:     func() runtime.Object { return &api.Node{} },
		NewListFunc: func() runtime.Object { return &api.NodeList{} },
		ObjectNameFunc: func(obj runtime.Object) (string, error) {
			return obj.(*api.Node).Name, nil
		},
		PredicateFunc:     node.MatchNode,
		QualifiedResource: api.Resource("nodes"),
		WatchCacheSize:    cachesize.GetWatchCacheSizeByResource("nodes"),
		
		CreateStrategy: node.Strategy,
		UpdateStrategy: node.Strategy,
		DeleteStrategy: node.Strategy,
		ExportStrategy: node.Strategy,
	}
	...
	statusStore := *store
	statusStore.UpdateStrategy = node.StatusStrategy
	...
	//r.store就是statusStore
	statusREST := &StatusREST{store: &statusStore}
	...
	return &NodeStorage{
		Node:   nodeREST,
		Status: statusREST,
		Proxy:  proxyREST,
		KubeletConnectionInfo: connectionInfoGetter,
	}, nil
}

可以看到,StatusREST.storegenericregistry.Store

genericregistry.Store中包含了NewFunc、NewListFunc等函数变量,需要看一下genericregistry.Store的Get等方法是怎样实现的。

genericregistry.Store

//k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go:
// Store implements pkg/api/rest.StandardStorage. It's intended to be
// embeddable and allows the consumer to implement any non-generic functions
// that are required. This object is intended to be copyable so that it can be
// used in different ways but share the same underlying behavior.
//
// All fields are required unless specified.
//
// The intended use of this type is embedding within a Kind specific
// RESTStorage implementation. This type provides CRUD semantics on a Kubelike
// resource, handling details like conflict detection with ResourceVersion and
// semantics. The RESTCreateStrategy, RESTUpdateStrategy, and
// RESTDeleteStrategy are generic across all backends, and encapsulate logic
// specific to the API.

genericregistry.Store的成员:

-+Store : struct
    [fields]
   +AfterCreate : ObjectFunc
   +AfterDelete : ObjectFunc
   +AfterUpdate : ObjectFunc
   +Copier : runtime.ObjectCopier
   +CreateStrategy : rest.RESTCreateStrategy
   +Decorator : ObjectFunc
   +DeleteCollectionWorkers : int
   +DeleteStrategy : rest.RESTDeleteStrategy
   +DestroyFunc : func()
   +EnableGarbageCollection : bool
   +ExportStrategy : rest.RESTExportStrategy
   +KeyFunc : func(ctx genericapirequest.Context, name string) string, error
   +KeyRootFunc : func(ctx genericapirequest.Context) string
   +NewFunc : func() runtime.Object
   +NewListFunc : func() runtime.Object
   +ObjectNameFunc : func(obj runtime.Object) string, error
   +PredicateFunc : func(label labels.Selector, field fields.Selector) storage.SelectionPredicate
   +QualifiedResource : schema.GroupResource
   +ReturnDeletedObject : bool
   +Storage : storage.Interface
   +TTLFunc : func(obj runtime.Object, existing uint64, update bool) uint64, error
   +UpdateStrategy : rest.RESTUpdateStrategy
   +WatchCacheSize : int
    [methods]
   +CompleteWithOptions(options *generic.StoreOptions) : error
   +Create(ctx genericapirequest.Context, obj runtime.Object) : runtime.Object, error
   +Delete(ctx genericapirequest.Context, name string, options *metav1.DeleteOptions) : runtime.Object, bool, error
   +DeleteCollection(ctx genericapirequest.Context, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) : runtime.Object, error
   +Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) : runtime.Object, error
   +Get(ctx genericapirequest.Context, name string, options *metav1.GetOptions) : runtime.Object, error
   +List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) : runtime.Object, error
   +ListPredicate(ctx genericapirequest.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) : runtime.Object, error
   +New() : runtime.Object
   +NewList() : runtime.Object
   +Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) : runtime.Object, bool, error
   +Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) : watch.Interface, error
   +WatchPredicate(ctx genericapirequest.Context, p storage.SelectionPredicate, resourceVersion string) : watch.Interface, error

看一下Create()方法的实现:

//k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go,
func (e *Store) Create(ctx genericapirequest.Context, obj runtime.Object) (runtime.Object, error) {
	if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
		return nil, err
	}
	
	name, err := e.ObjectNameFunc(obj)
	...
	key, err := e.KeyFunc(ctx, name)
	...
	ttl, err := e.calculateTTL(obj, 0, false)
	...
	out := e.NewFunc()
	if err := e.Storage.Create(ctx, key, obj, out, ttl); err != nil {
		...
	}
	if e.AfterCreate != nil {
		...
	}
	if e.Decorator != nil {
		...
	}
	return out, nil
}

e.NewFunc()是创建时传入的变量,对于NodeStorage而言是:

 store := &genericregistry.Store{
    Copier:      api.Scheme,
    NewFunc:     func() runtime.Object { return &api.Node{} },
    NewListFunc: func() runtime.Object { return &api.NodeList{} },
    ObjectNameFunc: func(obj runtime.Object) (string, error) {
        return obj.(*api.Node).Name, nil
    },
    PredicateFunc:     node.MatchNode,
    QualifiedResource: api.Resource("nodes"),
    WatchCacheSize:    cachesize.GetWatchCacheSizeByResource("nodes"),

    CreateStrategy: node.Strategy,
    UpdateStrategy: node.Strategy,
    DeleteStrategy: node.Strategy,
    ExportStrategy: node.Strategy,
}

创建时在调用了e.NewFunc()之后,又调用了e.Storage.Create(ctx, key, obj, out, ttl),需要继续回溯找到创建e.Storage的地方。

创建store时,传入参数中没有e.Storage,所以应该是store建立后再设置的e.Storage。

//k8s.io/kubernetes/pkg/registry/core/node/storage/storage.go:
func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) {
	store := &genericregistry.Store{
		Copier:      api.Scheme,
	...
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: node.GetAttrs, TriggerFunc: node.NodeNameTriggerFunc}
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	...

回溯代码的时候,发现了store.CompleteWithOptions(),kubernetes的代码中经常会用这种方式来补全一个结构体的成员变量。

//k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go:
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
	...
	opts, err := options.RESTOptions.GetRESTOptions(e.QualifiedResource)
	...
	if e.Storage == nil {
		capacity := DefaultWatchCacheSize
		if e.WatchCacheSize != 0 {
			capacity = e.WatchCacheSize
		}
		e.Storage, e.DestroyFunc = opts.Decorator(
			e.Copier,
			opts.StorageConfig,
			capacity,
			e.NewFunc(),
			prefix,
			keyFunc,
			e.NewListFunc,
			options.AttrFunc,
			triggerFunc,
		)
	}
	...

不出所料,e.Storage就是在这里创建的,还需要继续回溯,找到opts.Decorator()的实现。

opts.Decorator()

要找到opts.Decorator()的实现,看它是如何创建了e.Storage。

在上面的代码中可以看到opts是通过options.RESTOptions.GetRESTOptions()创建的。

而options则是在NewStorage中调用store.CompletWithOptions之前创建的,如下所示:

//k8s.io/kubernetes/pkg/registry/core/node/storage/storage.go:
func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) {
	store := &genericregistry.Store{
		Copier:      api.Scheme,
	...
	//options的创建
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: node.GetAttrs, TriggerFunc: node.NodeNameTriggerFunc}
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	...

options.RESTOptions就是变量optsGetter,继续回溯,找到optsGetter的实现:

//k8s.io/kubernetes/pkg/master/master.go:
func (c completedConfig) New() (*Master, error) {
	...
	m := &Master{
		GenericAPIServer: s,
	}
	if c.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
		...
		//装载pod、service的资源操作的REST api
		//参数2就是optsGetter
		m.InstallLegacyAPI(c.Config, c.Config.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
	}

c.Config.GenericConfig.RESTOptionsGetter就是optsGetter,而c.Config就是一开始就提醒要记住的kubeAPIServerConfig:

//k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
	kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions)
	...

要找到 kubeAPIServerConfig.GenericConfig. RESTOptionsGetter

//src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:
func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {

	//genericConfig在这里创建
	genericConfig, sharedInformers, insecureServingOptions, err := BuildGenericConfig(s)
	...
	config := &master.Config{
		GenericConfig: genericConfig,
	...
	return config, sharedInformers, insecureServingOptions, nil
}

BuildGenericConfig():

//k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
	genericConfig := genericapiserver.NewConfig(api.Codecs)

NewConfig():

//k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/config.go:
func NewConfig(codecs serializer.CodecFactory) *Config {
	return &Config{
		Serializer:                  codecs,
		ReadWritePort:               443,
		RequestContextMapper:        apirequest.NewRequestContextMapper(),
		BuildHandlerChainFunc:       DefaultBuildHandlerChain,
		LegacyAPIGroupPrefixes:      sets.NewString(DefaultLegacyAPIPrefix),
		HealthzChecks:               []healthz.HealthzChecker{healthz.PingHealthz},
		EnableIndex:                 true,
		EnableDiscovery:             true,
		EnableProfiling:             true,
		MaxRequestsInFlight:         400,
		MaxMutatingRequestsInFlight: 200,
		MinRequestTimeout:           1800,
		
		// Default to treating watch as a long-running operation
		// Generic API servers have no inherent long-running subresources
		LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
	}
}

kubeAPIServerConfig的创建过程中,没有设置RESTOptionsGetter,回溯继续找。

//k8s.io/kubernetes/cmd/kube-apiserver/app/server.go
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
	//这里没有设置RESTOptionsGetter
	kubeAPIServerConfig, sharedInformers, insecureServingOptions, err := CreateKubeAPIServerConfig(runOptions)
	if err != nil {
		return err
	}
	//到这里看一下
	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers, stopCh)
	if err != nil {
		return err
	}

CreateKubeAPIServer()中看一下:

//k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) (*master.Master, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New()
	...

这里又有一个Complete()函数,看到这个就开心了,因为kubernetes中通常都是在Complete()中设置必须设置的变量。

//k8s.io/kubernetes/pkg/master/master.go:
func (c *Config) Complete() completedConfig {
	c.GenericConfig.Complete()
	...

我们现在要找的是 kubeAPIServerConfig.GenericConfig.RESTOptionsGetter ,在 c.GenericConfig 的 Complete() 方法没有发现设置 RESTOptionsGetter。

回到创建c.GenericConfig的地方,继续寻找。

//k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:
func CreateKubeAPIServerConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
	...
	genericConfig, sharedInformers, insecureServingOptions, err := BuildGenericConfig(s)
	...
	config := &master.Config{
		GenericConfig: genericConfig,
	...

c.GenericConfig是在BuildGenericConfig中创建的。

BuildGenericConfig():

//k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:
func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, informers.SharedInformerFactory, *kubeserver.InsecureServingInfo, error) {
	//这里没有设置RESTOptionsGetter
	genericConfig := genericapiserver.NewConfig(api.Codecs)
	...
	if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil {
		return nil, nil, nil, err
	}

进入到s.Etcd.ApplyWithStorageFactoryTo()中,才猛然发现:

//k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go:
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
	c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
	return nil
}

而且注意s的类型是EtcdOptions!

要找的目标是e.Storage -> opts.Decorator -> opts -> options.RESTOptions.GetRESTOptions。

//k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go:
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
	}

	ret := generic.RESTOptions{
		StorageConfig:           storageConfig,
		Decorator:               generic.UndecoratedStorage,
		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection: f.Options.EnableGarbageCollection,
		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
	}
	if f.Options.EnableWatchCache {
		ret.Decorator = genericregistry.StorageWithCacher
	}

	return ret, nil
}

找到Decorator了,e.Storage就是通过调用这个函数实现的,也就是genericregistry.StorageWithCacher:

genericregistry.StorageWithCacher()

别忘了NodeStorage最终落实到e.Storage,而e.Storage是通过opts.Decorator()创建的。

opt.Decorator就是genericregistry.StorageWithCacher()

//k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:
func StorageWithCacher(
	copier runtime.ObjectCopier,
	storageConfig *storagebackend.Config,
	capacity int,
	objectType runtime.Object,
	resourcePrefix string,
	keyFunc func(obj runtime.Object) (string, error),
	newListFunc func() runtime.Object,
	getAttrsFunc storage.AttrFunc,
	triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
	...
s, d := generic.NewRawStorage(storageConfig)
cacherConfig := storage.CacherConfig{
	CacheCapacity:        capacity,
	Storage:              s,
	Versioner:            etcdstorage.APIObjectVersioner{},
	Copier:               copier,
	Type:                 objectType,
	ResourcePrefix:       resourcePrefix,
	KeyFunc:              keyFunc,
	NewListFunc:          newListFunc,
	GetAttrsFunc:         getAttrsFunc,
	TriggerPublisherFunc: triggerFunc,
	Codec:                storageConfig.Codec,
}
cacher := storage.NewCacherFromConfig(cacherConfig)
destroyFunc := func() {
	cacher.Stop()
	d()
}

return cacher, destroyFunc
}

StorageWithCache又是一个很复杂的过程,它会与etcd通信。

m.InstallAPIs()

m.InstallAPIS()装载了其它个api组

authenticationrest. RESTStorageProvider

//k8s.io/kubernetes/pkg/master/master.go:
restStorageProviders := []RESTStorageProvider{
	authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator},
	...

NewRESTStorage():

//k8s.io/kubernetes/pkg/registry/authentication/rest/storage_authentication.go:
func (p RESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool) {
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(authentication.GroupName, api.Registry, api.Scheme, api.ParameterCodec, api.Codecs)
	...
	if apiResourceConfigSource.AnyResourcesForVersionEnabled(authenticationv1.SchemeGroupVersion) {
		apiGroupInfo.VersionedResourcesStorageMap[authenticationv1.SchemeGroupVersion.Version] = p.v1Storage(apiResourceConfigSource, restOptionsGetter)
		apiGroupInfo.GroupMeta.GroupVersion = authenticationv1.SchemeGroupVersion
	}
	
	return apiGroupInfo, true
}

可以看到SchemeGroupVersion中存放了groupname和version:

//k8s.io/kubernetes/pkg/apis/authentication/v1/register.go,
const GroupName = "authentication.k8s.io"
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}

v1Storage():

//k8s.io/kubernetes/pkg/registry/authentication/rest/storage_authentication.go,:
func (p RESTStorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) map[string]rest.Storage {
	version := authenticationv1.SchemeGroupVersion

	storage := map[string]rest.Storage{}
	if apiResourceConfigSource.AnyResourcesForVersionEnabled(authenticationv1.SchemeGroupVersion) {
		if apiResourceConfigSource.ResourceEnabled(version.WithResource("tokenreviews")) {
			tokenReviewStorage := tokenreview.NewREST(p.Authenticator)
			storage["tokenreviews"] = tokenReviewStorage
		}
	}
	
	return storage
}

可以看到这里最终使用的storage是tokenreview.NewREST()创建的。

参考

  1. kubernetes-apiserver

kubernetes

  1. kubernetes 使用:多可用区、Pod 部署拓扑与 Topology Aware Routing
  2. kubernetes 扩展:Cloud Controller Manager
  3. kubernetes 准入:操作合法性检查(Admission Control)
  4. kubernetes 鉴权:用户操作权限鉴定(Authorization)
  5. kubernetes 认证:用户管理与身份认证(Authenticating)
  6. kubernetes 开发:代码生成工具
  7. kubernetes 扩展:operator 开发
  8. kubernetes 扩展:CRD 的使用方法
  9. kubernetes configmap 热加载,inotifywatch 监测文件触发热更新
  10. kubernetes 扩展:扩展点和方法(api/cr/plugin...)
  11. kubernetes 调度组件 kube-scheduler 1.16.3 源代码阅读指引
  12. kubernetes 代码中的 k8s.io 是怎么回事?
  13. 阅读笔记《不一样的 双11 技术,阿里巴巴经济体云原生实践》
  14. kubernetes ingress-nginx 启用 upstream 长连接,需要注意,否则容易 502
  15. ingress-nginx 的限速功能在 nginx.conf 中的对应配置
  16. kubernetes 中的容器设置透明代理,自动在 HTTP 请求头中注入 Pod 信息
  17. kubernetes ingress-nginx 的测试代码(单元测试+e2e测试)
  18. kubernetes ingress-nginx http 请求复制功能与 nginx mirror 的行为差异
  19. kubernetes 基于 openresty 的 ingress-nginx 的状态和配置查询
  20. kubernetes ingress-nginx 0.25 源代码走读笔记
  21. kubernetes ingress-nginx 的金丝雀(canary)/灰度发布功能的使用方法
  22. kubernetes 操作命令 kubectl 在 shell 中的自动补全配置
  23. kubernetes 组件 kube-proxy 的 IPVS 功能的使用
  24. kubernetes initializer 功能的使用方法: 在 Pod 等 Resource 落地前进行修改
  25. kubernetes 版本特性: 新特性支持版本和组件兼容版本
  26. kubernetes API 与 Operator: 不为人知的开发者战争(完整篇)
  27. kubernetes 1.12 从零开始(七): kubernetes开发资源
  28. kubernetes 1.12 从零开始(六): 从代码编译到自动部署
  29. kubernetes 网络方案 Flannel 的学习笔记
  30. kubernetes 1.12 从零开始(五): 自己动手部署 kubernetes
  31. kubernetes 1.12 从零开始(四): 必须先讲一下基本概念
  32. kubernetes 1.12 从零开始(三): 用 kubeadm 部署多节点集群
  33. kubernetes 1.12 从零开始(二): 用 minikube 部署开发测试环境
  34. kubernetes 1.12 从零开始(一): 部署环境准备
  35. kubernetes 1.12 从零开始(零): 遇到的问题与解决方法
  36. kubernetes 1.12 从零开始(初): 课程介绍与官方文档汇总
  37. kubernetes 集群状态监控:通过 grafana 和 prometheus
  38. 一些比较有意思的Kubernetes周边产品
  39. Borg论文阅读笔记
  40. kubelet下载pod镜像时,docker口令文件的查找顺序
  41. kubernetes 的 Client Libraries 的使用
  42. kubernetes的网络隔离networkpolicy
  43. kube-router的源码走读
  44. kubernetes 跨网段通信: 通过 calico 的 ipip 模式
  45. kubernetes的调试方法
  46. kubernetes 与 calico 的衔接过程
  47. 怎样理解 kubernetes 以及微服务?
  48. kubernetes中部署有状态的复杂分布式系统
  49. kubernetes的apiserver的启动过程
  50. kubernetes的api定义与装载
  51. kubernetes的federation部署,跨区Service
  52. kubernetes的编译、打包、发布
  53. kubernetes的第三方包的使用
  54. kubernetes的Storage的实现
  55. kubernetes 的 Apiserver 的 storage 使用
  56. kubernetes的Controller-manager的工作过程
  57. kubernetes的Client端Cache
  58. kubernetes 的 Apiserver 的工作过程
  59. kubernetes的CNI插件初始化与Pod网络设置
  60. kubernetes的Pod变更过程
  61. kubernetes的kubelet的工作过程
  62. kuberntes 的 Cmdline 实现
  63. kubernetes的Pod内挂载的Service Account的使用方法
  64. kubernetes的社区资源与项目参与方式
  65. kubernetes的Kube-proxy的转发规则分析
  66. kubernetes的基本操作
  67. kubernetes在CentOS上的集群部署
  68. kubernetes在CentOS上的All In One部署
  69. 怎样选择集群管理系统?

推荐阅读

Copyright @2011-2019 All rights reserved. 转载请添加原文连接,合作请加微信lijiaocn或者发送邮件: [email protected],备注网站合作

友情链接:  系统软件  程序语言  运营经验  水库文集  网络课程  微信网文  发现知识星球