09丨K8s Main Server Implementation#
通过之前的学习,已经学习过了如下知识:
- 注册表的填充
- Server 的 Options To Configs
- API 的注入
而想要完整走完 K8s 主 Server 的实现,我们只需要再学习在 API 注入之前,主 Server 如何构造出 API 的输入参数的。
注入 API#
主 Server 的 Kubernetes API 分两批注入底座 Generic Server:
注入核心 API,调用 Generic Server 实例的
InstallLegacyAPIGroup()方法完成。// 代码: pkg\controlplane\apiserver\apis.go#L140 if err := s.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {注入其它内置 API,调用 Generic Server 实例的
InstallAPIGroups()方法完成。// 代码: pkg\controlplane\apiserver\apis.go#L149 if err := s.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
注入接口的核心输入参数是 APIGroupInfo 结构体的实例或数组,每个 API Group 有一个该类型实例。API 的注入过程在讲解 Generic Server 时介绍过,主 Server 如何构造出该输入参数是接下来的要点。
New函数中,会先构建出restStorageProviders,然后将构建出的restStorageProviders传给InstallAPIs函数,用来注入 API。// 代码: pkg\controlplane\instance.go#L316-L384 func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) { ... restStorageProviders, err := c.StorageProviders(client) if err != nil { return nil, err } if err := s.ControlPlane.InstallAPIs(restStorageProviders...); err != nil { return nil, err } }先看是如何构建的
restStorageProviders:func (c CompletedConfig) StorageProviders(client *kubernetes.Clientset) ([]controlplaneapiserver.RESTStorageProvider, error) { // 1. 首先创建 Core API (Legacy) Provider【后续会讲解】 legacyRESTStorageProvider, err := corerest.New(...) if err != nil { return nil, err } // 2. 返回所有 Provider 的列表 providers := []controlplaneapiserver.RESTStorageProvider{ legacyRESTStorageProvider, // 3. 添加非核心 API apiserverinternalrest.StorageProvider{}, ... } return providers, nil }得到
restStorageProviders后,再将其交给InstallAPIs进行API 的注入。InstallAPIs代码如下:// InstallAPIs will install the APIs for the restStorageProviders if they are enabled. func (s *Server) InstallAPIs(restStorageProviders ...RESTStorageProvider) error { nonLegacy := []*genericapiserver.APIGroupInfo{} // 要点① ... for _, restStorageBuilder := range restStorageProviders { groupName := restStorageBuilder.GroupName() // 要点② apiGroupInfo, err := restStorageBuilder.NewRESTStorage(s.APIResourceConfigSource, s.RESTOptionsGetter) // 要点③ ... if len(groupName) == 0 { // 要点④ // the legacy group for core APIs is special that it is installed into /api via this special install method. if err := s.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { return fmt.Errorf("error in registering legacy API: %w", err) } } else { // everything else goes to /apis nonLegacy = append(nonLegacy, &apiGroupInfo) // 要点⑤ } } if err := s.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil { // 要点⑥ return fmt.Errorf("error in registering group versions: %w", err) } return nil- 对于 core API 组,通过要点②处的
groupName := restStorageBuilder.GroupName()得到该groupName = "",然后调用要点③处的NewRESTStorage函数得到apiGroupInfo,通过要点④处的判断后交给要点⑤处的nonLegacy = append(nonLegacy, &apiGroupInfo)进行注册。 - 对于其他 API 组,先定义出要点①的
nonLegacy,然后通过要点③处的NewRESTStorage函数得到apiGroupInfo,由于无法通过要点④处的判断,会加入到nonLegacy中,等待所有restStorageProviders都被访问过后,交给要点⑥进行集中注册。
注意,
NewRESTStorage函数是每个实现 RESTStorageProvider 接口的结构体自己需要实现的方法,该函数签名如下:func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error)输入参数:
apiResourceConfigSource(APIResourceConfigSource 接口)- 作用:判断哪些资源被启用/禁用
- 方法:
ResourceEnabled(resource)- 检查某个资源是否启用AnyResourceForGroupEnabled(group)- 检查某个组是否有任何资源启用
restOptionsGetter(RESTOptionsGetter 接口)- 作用:提供存储配置(如何连接 etcd)
- 方法:
GetRESTOptions(resource, example)- 获取资源的存储选项
输出结果:
APIGroupInfo结构体,包含:PrioritizedVersions- API 组的版本优先级列表VersionedResourcesStorageMap- 核心!资源到 Storage 的映射,是否很熟悉?这个正是注入API 中使用的。结构:
map[version]map[resource]Storage例如:
{ "v1": { "deployments": deploymentStorage, "deployments/status": deploymentStatusStorage, "deployments/scale": deploymentScaleStorage, "statefulsets": statefulSetStorage, "daemonsets": daemonSetStorage, // ... } }
Scheme- 序列化/反序列化方案NegotiatedSerializer- 内容协商序列化器ParameterCodec- 查询参数编解码器
- 对于 core API 组,通过要点②处的
NewRESTStorage#
以 apps 为例,来看转换的过程:
// 代码: pkg\registry\apps\rest\storage_apps.go#L38-L50
// NewRESTStorage returns APIGroupInfo object.
func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
// 要点①
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
// 要点②
if storageMap, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter); err != nil {
return genericapiserver.APIGroupInfo{}, err
} else if len(storageMap) > 0 {
// 要点③
apiGroupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = storageMap
}
return apiGroupInfo, nil
}创建默认的 APIGroupInfo
调用 v1Storage 来填充 storageMap.
将填充好的 storageMap,写入 VersionedResourcesStorageMap 中.
// 代码: pkg\registry\apps\rest\storage_apps.go#L52-L108
func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
storage := map[string]rest.Storage{}
// deployments
if resource := "deployments"; apiResourceConfigSource.ResourceEnabled(appsapiv1.SchemeGroupVersion.WithResource(resource)) { // 要点①
deploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter) // 要点①
if err != nil {
return storage, err
}
// 要点③
storage[resource] = deploymentStorage.Deployment
storage[resource+"/status"] = deploymentStorage.Status
storage[resource+"/scale"] = deploymentStorage.Scale
}
...
return storage, nil
}- 通过要点①处的
apiResourceConfigSource.ResourceEnabled检查资源是否启用,如果启用则创建 Storage; - 在要点②处使用 restOptionsGetter 创建 Deployment 的 Storage。
- 然后将 Storage 注册到 map 中【要点③】。
构建 core API RestStorageProviders#
在构建的 restStorageProviders 时,可以发现,其他 API 组的 API 构建十分简单,只需要进行声明即可:
apiserverinternalrest.StorageProvider{},而 core API 组的 restStorageProviders 的构建还有些麻烦:
// 代码: pkg\controlplane\instance.go#L387-L399
legacyRESTStorageProvider, err := corerest.New(corerest.Config{
GenericConfig: *c.ControlPlane.NewCoreGenericConfig(),
Proxy: corerest.ProxyConfig{
Transport: c.ControlPlane.Extra.ProxyTransport,
KubeletClientConfig: c.Extra.KubeletClientConfig,
},
Services: corerest.ServicesConfig{
ClusterIPRange: c.Extra.ServiceIPRange,
SecondaryClusterIPRange: c.Extra.SecondaryServiceIPRange,
NodePortRange: c.Extra.ServiceNodePortRange,
IPRepairInterval: c.Extra.RepairServicesInterval,
},
})核心 API 组需要特殊构建的原因:
管理集群基础设施(Service IP、NodePort 分配)
// 核心 API 需要管理 Service 的 ClusterIP 和 NodePort 分配 type legacyProvider struct { primaryServiceClusterIPAllocator ipallocator.Interface serviceClusterIPAllocators map[api.IPFamily]ipallocator.Interface serviceNodePortAllocator *portallocator.PortAllocator startServiceNodePortsRepair func(...) startServiceClusterIPRepair func(...) }需要与 Kubelet 交互(Pod logs、exec、port-forward)
// 核心 API 需要代理到 Kubelet 的请求(如 logs, exec, port-forward) Proxy: corerest.ProxyConfig{ Transport: c.ControlPlane.Extra.ProxyTransport, KubeletClientConfig: c.Extra.KubeletClientConfig, }需要修复控制器保证分配一致性
是集群的基础,需要更多的初始化和状态管理
- Pod、Service、Node、Endpoint 等是集群的基础
- 需要与底层基础设施(Kubelet、网络)交互
- 需要特殊的分配和修复逻辑
其他 API 组简单的原因:
- 只是应用层资源(Deployment、Job 等)
- 不需要特殊的分配器或控制器
- 无状态,只需要标准的 etcd 存储
1. 调用 corerest.New() 创建 Provider#
// 代码: pkg\controlplane\instance.go#L387-L399
legacyRESTStorageProvider, err := corerest.New(corerest.Config{
// 1. 通用配置
GenericConfig: *c.ControlPlane.NewCoreGenericConfig(),
// 2. Proxy 配置(用于 Pod logs, exec, port-forward)
Proxy: corerest.ProxyConfig{
Transport: c.ControlPlane.Extra.ProxyTransport,
KubeletClientConfig: c.Extra.KubeletClientConfig,
},
// 3. Service 配置(IP 和 Port 分配)
Services: corerest.ServicesConfig{
ClusterIPRange: c.Extra.ServiceIPRange, // 10.96.0.0/12
SecondaryClusterIPRange: c.Extra.SecondaryServiceIPRange, // IPv6 范围
NodePortRange: c.Extra.ServiceNodePortRange, // 30000-32767
IPRepairInterval: c.Extra.RepairServicesInterval, // 修复间隔
},
})2. corerest.New() 内部实现#
// 代码: pkg\registry\core\rest\storage_core.go#L106-L149
func New(c Config) (*legacyProvider, error) {
// 2.1 创建 Service IP 和 Port 分配器
rangeRegistries, serviceClusterIPAllocator, serviceIPAllocators,
serviceNodePortAllocator, err := c.newServiceIPAllocators()
// 2.2 创建 legacyProvider 实例,保存分配器
p := &legacyProvider{
Config: c,
primaryServiceClusterIPAllocator: serviceClusterIPAllocator, // ClusterIP 分配器
serviceClusterIPAllocators: serviceIPAllocators, // 多 IP 族分配器
serviceNodePortAllocator: serviceNodePortAllocator, // NodePort 分配器
}
// 2.3 创建 Kubernetes 客户端
client, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
// 2.4 创建 NodePort 修复控制器
// 作用:定期检查并修复 NodePort 分配冲突
p.startServiceNodePortsRepair = portallocatorcontroller.NewRepair(
c.Services.IPRepairInterval,
client.CoreV1(),
client.EventsV1(),
c.Services.NodePortRange,
rangeRegistries.nodePort,
).RunUntil
// 2.5 创建 ClusterIP 修复控制器
// 作用:定期检查并修复 ClusterIP 分配冲突
p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepair(
c.Services.IPRepairInterval,
client.CoreV1(),
client.EventsV1(),
&c.Services.ClusterIPRange,
rangeRegistries.clusterIP,
&c.Services.SecondaryClusterIPRange,
rangeRegistries.secondaryClusterIP,
).RunUntil
return p, nil
}3. legacyProvider.NewRESTStorage() 创建资源 Storage#
当后续调用 NewRESTStorage() 时,会调用 pkg\registry\core\rest\storage_core.go#L151 处的函数:
// 代码: pkg\registry\core\rest\storage_core.go#L151-L323
func (p *legacyProvider) NewRESTStorage(
apiResourceConfigSource serverstorage.APIResourceConfigSource,
restOptionsGetter generic.RESTOptionsGetter,
) (genericapiserver.APIGroupInfo, error) {
// 3.1 先调用 GenericConfig.NewRESTStorage() 创建基础资源
// (Event, ResourceQuota, Secret, ConfigMap, Namespace 等)
apiGroupInfo, err := p.GenericConfig.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
// 3.2 创建需要特殊配置的资源 Storage
// Pod Storage - 需要 Kubelet 连接信息和代理
podStorage, err := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo, // Kubelet 连接信息
p.Proxy.Transport, // HTTP 代理传输
podDisruptionClient,
)
// Service Storage - 需要 IP 和 Port 分配器
serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST(
restOptionsGetter,
p.primaryServiceClusterIPAllocator.IPFamily(),
p.serviceClusterIPAllocators, // ClusterIP 分配器
p.serviceNodePortAllocator, // NodePort 分配器
endpointsStorage,
podStorage.Pod,
p.Proxy.Transport,
)
// Node Storage - 需要 Kubelet 客户端配置
nodeStorage, err := nodestore.NewStorage(
restOptionsGetter,
p.Proxy.KubeletClientConfig, // Kubelet 配置
p.Proxy.Transport, // HTTP 传输
)
// 3.3 将所有资源注册到 storage map
storage := apiGroupInfo.VersionedResourcesStorageMap["v1"]
storage["pods"] = podStorage.Pod
storage["pods/status"] = podStorage.Status
storage["pods/log"] = podStorage.Log // 需要 Kubelet 代理
storage["pods/exec"] = podStorage.Exec // 需要 Kubelet 代理
storage["pods/portforward"] = podStorage.PortForward // 需要 Kubelet 代理
storage["services"] = serviceRESTStorage // 需要 IP 分配器
storage["services/status"] = serviceStatusStorage
storage["services/proxy"] = serviceRESTProxy
storage["nodes"] = nodeStorage.Node
storage["nodes/status"] = nodeStorage.Status
storage["nodes/proxy"] = nodeStorage.Proxy // 需要 Kubelet 代理
// ... 其他资源
return apiGroupInfo, nil
}至此,Kubernetes Main Server 的构建流程就梳理完毕了。

