资讯专栏INFORMATION COLUMN

Kubernetes1.5源码分析(三) apiServer之go-restful的使用

Doyle / 3479人阅读

摘要:它包括一组和一个对象,使用进行请求派发。流程基本就是这样,接着我们直接进入接口看实现拼装然后填充并返回一个对象创建一个这个是关键,会对各种进行注册增加一个的将该加入到前两个调用函数比较简单,这里不进行介绍了。

源码版本

Kubernetes v1.5.0

go-restful 简介

go-restful是用于构建REST-style web服务的golang包。
它是出现时因为一个javaer在golang中没找到顺手的REST-based服务构建包,所以就按照他在java里常用的JAX-RS的设计,在golang中造了一个轮子。

关键组件

1.Route:
路由包含两种,一种是标准JSR311接口规范的实现RouterJSR311,一种是快速路由CurlyRouter。
CurlyRouter支持正则表达式和动态参数,相比RouterJSR11更加轻量级,apiserver中使用的就是这种路由。
一种Route的设定包含:请求方法(http Method),请求路径(URL Path),输入输出类型(JSON/YAML)以及对应的回掉函数restful.RouteFunction,响应内容类型(Accept)等。

2.WebService:
WebService逻辑上是Route的集合,功能上主要是为一组Route统一设置包括root path,请求响应的数据类型等一些通用的属性。
需要注意的是,WebService必须加入到Container中才能生效。

</>复制代码

  1. func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
  2. installer := g.newInstaller()
  3. ws := installer.NewWebService()
  4. 。。。
  5. container.Add(ws)
  6. return utilerrors.NewAggregate(registrationErrors)
  7. }

上面是k8s的REST注册接口,也调用了Container.Add(ws),才能让这个ws生效。

3.Container:
Container逻辑上是WebService的集合,功能上可以实现多终端的效果。
它包括一组restful.WebService和一个http.ServeMux对象,使用RouteSelector进行请求派发。
例如,下面代码中创建了两个Container,分别在不同的port上提供服务。
该代码是go-restful的example:

</>复制代码

  1. package main
  2. import (
  3. "github.com/emicklei/go-restful"
  4. "io"
  5. "log"
  6. "net/http"
  7. )
  8. func main() {
  9. ws := new(restful.WebService)
  10. ws.Route(ws.GET("/hello").To(hello))
  11. // ws被添加到默认的container restful.DefaultContainer中
  12. restful.Add(ws)
  13. go func() {
  14. // restful.DefaultContainer监听在端口8080上
  15. http.ListenAndServe(":8080", nil)
  16. }()
  17. container2 := restful.NewContainer()
  18. ws2 := new(restful.WebService)
  19. ws2.Route(ws2.GET("/hello").To(hello2))
  20. // ws2被添加到container2中
  21. container2.Add(ws2)
  22. // container2中监听端口8081
  23. server := &http.Server{Addr: ":8081", Handler: container2}
  24. log.Fatal(server.ListenAndServe())
  25. }
  26. func hello(req *restful.Request, resp *restful.Response) {
  27. io.WriteString(resp, "default world")
  28. }
  29. func hello2(req *restful.Request, resp *restful.Response) {
  30. io.WriteString(resp, "second world")
  31. }

4.Filter:
Filter用于动态的拦截请求和响应,类似于放置在相应组件前的钩子,在相应组件功能运行前捕获请求或者响应,主要用于记录log,验证,重定向等功能。
go-restful中有三种类型的Filter:

Container Filter:
运行在Container中所有的WebService执行之前。

</>复制代码

  1. // install a (global) filter for the default container (processed before any webservice)
  2. restful.Filter(globalLogging)

WebService Filter:
运行在WebService中所有的Route执行之前。

</>复制代码

  1. // install a webservice filter (processed before any route)
  2. ws.Filter(webserviceLogging).Filter(measureTime)

Route Filter:
运行在调用Route绑定的方法之前。

</>复制代码

  1. // install 2 chained route filters (processed before calling findUser)
  2. ws.Route(ws.GET("/{user-id}").Filter(routeLogging).Filter(NewCountFilter().routeCounter).To(findUser))

示例

拿用官方提供的例子:

</>复制代码

  1. package main
  2. import (
  3. "github.com/emicklei/go-restful"
  4. "log"
  5. "net/http"
  6. )
  7. type User struct {
  8. Id, Name string
  9. }
  10. type UserResource struct {
  11. // normally one would use DAO (data access object)
  12. users map[string]User
  13. }
  14. func (u UserResource) Register(container *restful.Container) {
  15. // 创建新的WebService
  16. ws := new(restful.WebService)
  17. // 设定WebService对应的路径("/users")和支持的MIME类型(restful.MIME_XML/ restful.MIME_JSON)
  18. ws.
  19. Path("/users").
  20. Consumes(restful.MIME_XML, restful.MIME_JSON).
  21. Produces(restful.MIME_JSON, restful.MIME_XML) // you can specify this per route as well
  22. // 添加路由: GET /{user-id} --> u.findUser
  23. ws.Route(ws.GET("/{user-id}").To(u.findUser))
  24. // 添加路由: POST / --> u.updateUser
  25. ws.Route(ws.POST("").To(u.updateUser))
  26. // 添加路由: PUT /{user-id} --> u.createUser
  27. ws.Route(ws.PUT("/{user-id}").To(u.createUser))
  28. // 添加路由: DELETE /{user-id} --> u.removeUser
  29. ws.Route(ws.DELETE("/{user-id}").To(u.removeUser))
  30. // 将初始化好的WebService添加到Container中
  31. container.Add(ws)
  32. }
  33. // GET http://localhost:8080/users/1
  34. //
  35. func (u UserResource) findUser(request *restful.Request, response *restful.Response) {
  36. id := request.PathParameter("user-id")
  37. usr := u.users[id]
  38. if len(usr.Id) == 0 {
  39. response.AddHeader("Content-Type", "text/plain")
  40. response.WriteErrorString(http.StatusNotFound, "User could not be found.")
  41. } else {
  42. response.WriteEntity(usr)
  43. }
  44. }
  45. // POST http://localhost:8080/users
  46. // 1Melissa Raspberry
  47. //
  48. func (u *UserResource) updateUser(request *restful.Request, response *restful.Response) {
  49. usr := new(User)
  50. err := request.ReadEntity(&usr)
  51. if err == nil {
  52. u.users[usr.Id] = *usr
  53. response.WriteEntity(usr)
  54. } else {
  55. response.AddHeader("Content-Type", "text/plain")
  56. response.WriteErrorString(http.StatusInternalServerError, err.Error())
  57. }
  58. }
  59. // PUT http://localhost:8080/users/1
  60. // 1Melissa
  61. //
  62. func (u *UserResource) createUser(request *restful.Request, response *restful.Response) {
  63. usr := User{Id: request.PathParameter("user-id")}
  64. err := request.ReadEntity(&usr)
  65. if err == nil {
  66. u.users[usr.Id] = usr
  67. response.WriteHeader(http.StatusCreated)
  68. response.WriteEntity(usr)
  69. } else {
  70. response.AddHeader("Content-Type", "text/plain")
  71. response.WriteErrorString(http.StatusInternalServerError, err.Error())
  72. }
  73. }
  74. // DELETE http://localhost:8080/users/1
  75. //
  76. func (u *UserResource) removeUser(request *restful.Request, response *restful.Response) {
  77. id := request.PathParameter("user-id")
  78. delete(u.users, id)
  79. }
  80. func main() {
  81. // 创建一个空的Container
  82. wsContainer := restful.NewContainer()
  83. // 设定路由为CurlyRouter(快速路由)
  84. wsContainer.Router(restful.CurlyRouter{})
  85. // 创建自定义的Resource Handle(此处为UserResource)
  86. u := UserResource{map[string]User{}}
  87. // 创建WebService,并将WebService加入到Container中
  88. u.Register(wsContainer)
  89. log.Printf("start listening on localhost:8080")
  90. server := &http.Server{Addr: ":8080", Handler: wsContainer}
  91. // 启动服务
  92. log.Fatal(server.ListenAndServe())
  93. }

上面的示例代码构建RESTful服务,分为几个步骤,apiServer也是类似:
1.创建Container
2.配置Container属性:ServeMux/Router type等
3.创建自定义的Resource Handle,实现Resource相关的处理方式。
4.创建对应Resource的WebService,在WebService中添加响应Route,并将WebService加入到Container中。
5.启动监听服务。

apiServer go-restful使用 Container初始化

apiServer的Container相关的结构是APIContainer。
路径:pkg/genericapiserver/mux/container.go

</>复制代码

  1. type APIContainer struct {
  2. *restful.Container
  3. NonSwaggerRoutes PathRecorderMux
  4. SecretRoutes Mux
  5. }

而该结构是在GenericAPIServer中被使用,分析过apiServer的启动过程的话,应该对该结构比较熟悉。

</>复制代码

  1. type GenericAPIServer struct {
  2. discoveryAddresses DiscoveryAddresses
  3. LoopbackClientConfig *restclient.Config
  4. minRequestTimeout time.Duration
  5. ...
  6. requestContextMapper api.RequestContextMapper
  7. // 这里使用到了restful.Container
  8. HandlerContainer *genericmux.APIContainer
  9. SecureServingInfo *SecureServingInfo
  10. InsecureServingInfo *ServingInfo
  11. effectiveSecurePort, effectiveInsecurePort int
  12. ExternalAddress string
  13. storage map[string]rest.Storage
  14. Serializer runtime.NegotiatedSerializer
  15. Handler http.Handler
  16. InsecureHandler http.Handler
  17. apiGroupsForDiscoveryLock sync.RWMutex
  18. apiGroupsForDiscovery map[string]unversioned.APIGroup
  19. ...
  20. }

而该结构的初始化是在master的初始化过程中进行的。
调用过程: main --> App.Run --> master.Complete.New --> c.Config.GenericConfig.SkipComplete().New()
路径: pkg/genericapiserver/config.go

</>复制代码

  1. func (c completedConfig) New() (*GenericAPIServer, error) {
  2. if c.Serializer == nil {
  3. return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
  4. }
  5. s := &GenericAPIServer{
  6. discoveryAddresses: c.DiscoveryAddresses,
  7. LoopbackClientConfig: c.LoopbackClientConfig,
  8. legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
  9. admissionControl: c.AdmissionControl,
  10. requestContextMapper: c.RequestContextMapper,
  11. Serializer: c.Serializer,
  12. minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
  13. enableSwaggerSupport: c.EnableSwaggerSupport,
  14. SecureServingInfo: c.SecureServingInfo,
  15. InsecureServingInfo: c.InsecureServingInfo,
  16. ExternalAddress: c.ExternalAddress,
  17. apiGroupsForDiscovery: map[string]unversioned.APIGroup{},
  18. enableOpenAPISupport: c.EnableOpenAPISupport,
  19. openAPIConfig: c.OpenAPIConfig,
  20. postStartHooks: map[string]postStartHookEntry{},
  21. }
  22. // 这里进行了Contianer的初始化
  23. s.HandlerContainer = mux.NewAPIContainer(http.NewServeMux(), c.Serializer)
  24. // 添加了DynamicApisDiscovery的
  25. s.installAPI(c.Config)
  26. s.Handler, s.InsecureHandler = c.BuildHandlerChainsFunc(s.HandlerContainer.ServeMux, c.Config)
  27. return s, nil
  28. }

继续调用mux.NewAPIContainer()接口创建,该接口的两个参数:新建了一个http的ServeMux; 另一个是实现了编解码序列化反序列化的对象

</>复制代码

  1. func NewAPIContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer) *APIContainer {
  2. c := APIContainer{
  3. // New一个Container
  4. Container: restful.NewContainer(),
  5. NonSwaggerRoutes: PathRecorderMux{
  6. mux: mux,
  7. },
  8. SecretRoutes: mux,
  9. }
  10. // 配置http.ServeMux
  11. c.Container.ServeMux = mux
  12. // 配置该Container的路由方式:CurlyRouter 即快速路由
  13. c.Container.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
  14. // 配置panic产生之后的恢复处理函数
  15. apiserver.InstallRecoverHandler(s, c.Container)
  16. apiserver.InstallServiceErrorHandler(s, c.Container)
  17. return &c
  18. }

看下apiserver.InstallRecoverHandler()实现:

</>复制代码

  1. func InstallRecoverHandler(s runtime.NegotiatedSerializer, container *restful.Container) {
  2. container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
  3. logStackOnRecover(s, panicReason, httpWriter)
  4. })
  5. }
  6. // RecoverHandler changes the default function (logStackOnRecover) to be called
  7. // when a panic is detected. DoNotRecover must be have its default value (=false).
  8. func (c *Container) RecoverHandler(handler RecoverHandleFunction) {
  9. c.recoverHandleFunc = handler
  10. }

根据英文注释可以看明白,该RecoverHandler就是在产生panic后会调用的恢复处理函数,默认的调用函数是logStackOnRecover,调用Container.RecoverHandler()后会修改该默认函数,并且Container.DoNotRecover的bool值必须是false才能生效。
apiserver.InstallServiceErrorHandler()接口就不看了,其实就是修改Service Error产生后的错误处理函数,默认是调用writeServiceError()。

到这里Container的初始化基本OK了。

添加WebService

Container已创建并且也进行了初始化。该轮到WebService了,这节会介绍k8s的WebService的创建及添加。
接续上文的Container初始化入口,继续往下看s.installAPI(c.Config):

</>复制代码

  1. func (s *GenericAPIServer) installAPI(c *Config) {
  2. // 这里原本还有很多routes.Install()函数
  3. // 这些install()貌似和mux有关。
  4. // 而mux就是一个http的多分器,用于派发某个Request路径到对应的http.Handler进行处理
  5. 。。。
  6. // 往HandlerContainer中的Container里添加WebService
  7. // 该WebService的创建在s.DynamicApisDiscovery()中进行
  8. // 实际上创建的WebService是用于list 该group下的所有versions
  9. s.HandlerContainer.Add(s.DynamicApisDiscovery())
  10. }

先看下WebService的创建接口s.DynamicApisDiscovery():
路径:pkg/genericapiserver/genericapiserver.go

</>复制代码

  1. // DynamicApisDiscovery returns a webservice serving api group discovery.
  2. // Note: during the server runtime apiGroupsForDiscovery might change.
  3. func (s *GenericAPIServer) DynamicApisDiscovery() *restful.WebService {
  4. return apiserver.NewApisWebService(s.Serializer, APIGroupPrefix, func(req *restful.Request) []unversioned.APIGroup {
  5. // 需要加锁
  6. // 接口注释也有说明。因为k8s可以动态加载第三方apiGroups
  7. s.apiGroupsForDiscoveryLock.RLock()
  8. defer s.apiGroupsForDiscoveryLock.RUnlock()
  9. // 将apiGroupsForDiscovery中所有的APIGroup按照其名字进行升序排序
  10. sortedGroups := []unversioned.APIGroup{}
  11. groupNames := make([]string, 0, len(s.apiGroupsForDiscovery))
  12. for groupName := range s.apiGroupsForDiscovery {
  13. groupNames = append(groupNames, groupName)
  14. }
  15. sort.Strings(groupNames)
  16. for _, groupName := range groupNames {
  17. sortedGroups = append(sortedGroups, s.apiGroupsForDiscovery[groupName])
  18. }
  19. // 创建切片,并填充各个APIGroup的ServerAddressByClientCIDRs信息
  20. clientIP := utilnet.GetClientIP(req.Request)
  21. serverCIDR := s.discoveryAddresses.ServerAddressByClientCIDRs(clientIP)
  22. groups := make([]unversioned.APIGroup, len(sortedGroups))
  23. for i := range sortedGroups {
  24. groups[i] = sortedGroups[i]
  25. groups[i].ServerAddressByClientCIDRs = serverCIDR
  26. }
  27. return groups
  28. })
  29. }

继续深入看apiserver.NewApisWebService(),该接口传入了编解码对象,APIGroup的Prefix,还有一个function。

</>复制代码

  1. func NewApisWebService(s runtime.NegotiatedSerializer, apiPrefix string, f func(req *restful.Request) []unversioned.APIGroup) *restful.WebService {
  2. // 用于向后兼容v1.1版本,返回一个空的APIGroup
  3. ss := StripVersionNegotiatedSerializer{s}
  4. // 获取支持的媒体类型,比如:application/json,application/yaml
  5. mediaTypes, _ := mediaTypesForSerializer(s)
  6. // 构建go-restful的Route处理方法
  7. rootAPIHandler := RootAPIHandler(ss, f)
  8. // 创建WebService
  9. ws := new(restful.WebService)
  10. // 添加Path
  11. ws.Path(apiPrefix)
  12. // API 说明
  13. ws.Doc("get available API versions")
  14. // 配置GET("/") 转到rootAPIHandler()接口
  15. ws.Route(ws.GET("/").To(rootAPIHandler).
  16. Doc("get available API versions").
  17. Operation("getAPIVersions").
  18. Produces(mediaTypes...).
  19. Consumes(mediaTypes...).
  20. Writes(unversioned.APIGroupList{}))
  21. return ws
  22. }

到这里list某个Group下所有的versions的API已经注册完成了。
这些都不是关键的RESTful API的注册,关键的注册都会在pkg/apiserver/apiserver.go中的InstallREST()接口中进行。
琢磨过apiServer启动流程的同学,应该会知道/api和/apis的注册接口最后都会调用到该接口。
/api的注册接口是pkg/genericapiserver/genericapiserver.go中的InstallLegacyAPIGroup()接口
/apis的注册接口是InstallAPIGroup()。
这两个接口都会调用s.installAPIResources(),最后再调用apiGroupVersion.InstallREST()进行API注册。
流程基本就是这样,接着我们直接进入InstallREST()接口看实现:

</>复制代码

  1. func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
  2. // 拼装path: "Prefix/Group/Version"
  3. // 然后填充并返回一个APIInstaller对象
  4. installer := g.newInstaller()
  5. // 创建一个WebService
  6. ws := installer.NewWebService()
  7. // 这个是关键,会对各种URL进行注册
  8. apiResources, registrationErrors := installer.Install(ws)
  9. lister := g.ResourceLister
  10. if lister == nil {
  11. lister = staticLister{apiResources}
  12. }
  13. // 增加一个list的API
  14. AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, lister)
  15. // 将该WebService加入到Container
  16. container.Add(ws)
  17. return utilerrors.NewAggregate(registrationErrors)
  18. }

前两个调用函数比较简单,这里不进行介绍了。直接进入关键函数installer.Install(ws):

</>复制代码

  1. func (a *APIInstaller) Install(ws *restful.WebService) (apiResources []unversioned.APIResource, errors []error) {
  2. errors = make([]error, 0)
  3. proxyHandler := (&ProxyHandler{
  4. prefix: a.prefix + "/proxy/",
  5. storage: a.group.Storage,
  6. serializer: a.group.Serializer,
  7. mapper: a.group.Context,
  8. })
  9. // 将所有的path合成一个切片,并按照升序重新排序
  10. paths := make([]string, len(a.group.Storage))
  11. var i int = 0
  12. for path := range a.group.Storage {
  13. paths[i] = path
  14. i++
  15. }
  16. sort.Strings(paths)
  17. for _, path := range paths {
  18. // 注册各个URL,关键接口
  19. // 传入的参数:path,rest.Storage,WebService,Handler
  20. apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws, proxyHandler)
  21. if err != nil {
  22. errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
  23. }
  24. // 将所有注册成功的Resource合成一个切片
  25. // 将该切片作为返回值,便于之后的接口注册list Resources的API
  26. if apiResource != nil {
  27. apiResources = append(apiResources, *apiResource)
  28. }
  29. }
  30. return apiResources, errors
  31. }

该接口先是遍历所有的path,并升序重新排列,然后循环调用接口注册各个URL的API,并将这些注册成功的APIResource加入到同一个切片中。
我们继续看a.registerResourceHandlers()接口:

</>复制代码

  1. func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService, proxyHandler http.Handler) (*unversioned.APIResource, error) {
  2. ...
  3. // 构建creater、lister、deleter、updater、watcher等,其实就是storage
  4. creater, isCreater := storage.(rest.Creater)
  5. namedCreater, isNamedCreater := storage.(rest.NamedCreater)
  6. lister, isLister := storage.(rest.Lister)
  7. getter, isGetter := storage.(rest.Getter)
  8. getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
  9. deleter, isDeleter := storage.(rest.Deleter)
  10. gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
  11. collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
  12. updater, isUpdater := storage.(rest.Updater)
  13. patcher, isPatcher := storage.(rest.Patcher)
  14. watcher, isWatcher := storage.(rest.Watcher)
  15. _, isRedirector := storage.(rest.Redirector)
  16. connecter, isConnecter := storage.(rest.Connecter)
  17. storageMeta, isMetadata := storage.(rest.StorageMetadata)
  18. ...
  19. var apiResource unversioned.APIResource
  20. // k8s资源分为两类:无namespace的RESTScopeNameRoot; 有namespace的RESTScopeNameNamespace
  21. // 在对应的path上添加各类actions,并指定对应的handler。
  22. switch scope.Name() {
  23. case meta.RESTScopeNameRoot:
  24. // Handle non-namespace scoped resources like nodes.
  25. resourcePath := resource
  26. resourceParams := params
  27. itemPath := resourcePath + "/{name}"
  28. nameParams := append(params, nameParam)
  29. proxyParams := append(nameParams, pathParam)
  30. suffix := ""
  31. if hasSubresource {
  32. suffix = "/" + subresource
  33. itemPath = itemPath + suffix
  34. resourcePath = itemPath
  35. resourceParams = nameParams
  36. }
  37. apiResource.Name = path
  38. apiResource.Namespaced = false
  39. apiResource.Kind = resourceKind
  40. namer := rootScopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, resourcePath, "/"), suffix}
  41. // Handler for standard REST verbs (GET, PUT, POST and DELETE).
  42. // Add actions at the resource path: /api/apiVersion/resource
  43. actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
  44. actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
  45. actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
  46. // DEPRECATED
  47. actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
  48. // Add actions at the item path: /api/apiVersion/resource/{name}
  49. actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
  50. if getSubpath {
  51. actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
  52. }
  53. actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
  54. actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
  55. actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter)
  56. actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
  57. // We add "proxy" subresource to remove the need for the generic top level prefix proxy.
  58. // The generic top level prefix proxy is deprecated in v1.2, and will be removed in 1.3, or 1.4 at the latest.
  59. // TODO: DEPRECATED in v1.2.
  60. actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer, false}, isRedirector)
  61. // TODO: DEPRECATED in v1.2.
  62. actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer, false}, isRedirector)
  63. actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
  64. actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
  65. break
  66. case meta.RESTScopeNameNamespace:
  67. ...
  68. break
  69. default:
  70. return nil, fmt.Errorf("unsupported restscope: %s", scope.Name())
  71. }
  72. ...
  73. // 根据之前生成的actions,进行遍历
  74. // 然后在WebService中添加指定的route
  75. for _, action := range actions {
  76. versionedObject := storageMeta.ProducesObject(action.Verb)
  77. if versionedObject == nil {
  78. versionedObject = defaultVersionedObject
  79. }
  80. reqScope.Namer = action.Namer
  81. namespaced := ""
  82. if apiResource.Namespaced {
  83. namespaced = "Namespaced"
  84. }
  85. operationSuffix := ""
  86. if strings.HasSuffix(action.Path, "/{path:*}") {
  87. operationSuffix = operationSuffix + "WithPath"
  88. }
  89. if action.AllNamespaces {
  90. operationSuffix = operationSuffix + "ForAllNamespaces"
  91. namespaced = ""
  92. }
  93. // 判断action的动作类型
  94. // 生成响应的handler,创建route添加到WebService中
  95. switch action.Verb {
  96. case "GET": // Get a resource.
  97. var handler restful.RouteFunction
  98. // 判断是否有参数
  99. if isGetterWithOptions {
  100. handler = GetResourceWithOptions(getterWithOptions, reqScope)
  101. } else {
  102. handler = GetResource(getter, exporter, reqScope)
  103. }
  104. // 生成处理函数
  105. handler = metrics.InstrumentRouteFunc(action.Verb, resource, handler)
  106. doc := "read the specified " + kind
  107. if hasSubresource {
  108. doc = "read " + subresource + " of the specified " + kind
  109. }
  110. route := ws.GET(action.Path).To(handler).
  111. Doc(doc).
  112. Param(ws.QueryParameter("pretty", "If "true", then the output is pretty printed.")).
  113. Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
  114. Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
  115. Returns(http.StatusOK, "OK", versionedObject).
  116. Writes(versionedObject)
  117. if isGetterWithOptions {
  118. if err := addObjectParams(ws, route, versionedGetOptions); err != nil {
  119. return nil, err
  120. }
  121. }
  122. if isExporter {
  123. if err := addObjectParams(ws, route, versionedExportOptions); err != nil {
  124. return nil, err
  125. }
  126. }
  127. addParams(route, action.Params)
  128. ws.Route(route)
  129. case "LIST": // List all resources of a kind.
  130. ...
  131. case "PUT": // Update a resource.
  132. ...
  133. case "PATCH": // Partially update a resource
  134. ...
  135. case "POST": // Create a resource.
  136. ...
  137. case "DELETE": // Delete a resource.
  138. ...
  139. case "DELETECOLLECTION":
  140. ...
  141. case "WATCH": // Watch a resource.
  142. ...
  143. case "WATCHLIST": // Watch all resources of a kind.
  144. ...
  145. case "PROXY": // Proxy requests to a resource.
  146. ...
  147. case "CONNECT":
  148. ...
  149. }
  150. default:
  151. return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
  152. }
  153. // Note: update GetAttribs() when adding a custom handler.
  154. }
  155. return &apiResource, nil
  156. }

首先构建creater、lister、getter、deleter、updater、patcher、watcher,其实他们都是storage,只是对应着对etcd的不同操作。
然后针对所有的action,构建响应的handler。创建对应的route,最后把route添加到service里面。这样就完成了api的注册。

关键的REST API注册基本就这样结束了,除此之外还会有很多别的API的注册:
比如APIGroupVersion.InstallREST()接口中的AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, lister);
GenericAPIServer.InstallLegacyAPIGroup()接口中的apiserver.AddApiWebService()的调用;
等等。。
其实上面也注册了各种REST API,比如像PodList,ServiceList,ReplicationControllerList等。这些就不深入了,都是大同小异。

参考资料

1.go-restful example: http://ernestmicklei.com/2012...
2.go-restful api desgin: http://ernestmicklei.com/2012...
3.go-restful github code: https://github.com/emicklei/g...
4.go-restful GoDoc: https://godoc.org/github.com/...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/32555.html

相关文章

  • Kubernetes1.5源码分析(二) apiServer资源注册

    摘要:我们先将上面的接口解析放放,先看下是如何初始化的路径定义了,再看路径定义空的创建,用于不同版本对象转换增加一些转换函数上面就创建了一个空的。其实就是向添加了转换函数,比如将转换为,将转换为。 源码版本 Kubernetes v1.5.0 简介 k8s里面有各种资源,如Pod、Service、RC、namespaces等资源,用户操作的其实也就是这一大堆资源。但这些资源并不是杂乱无章的,...

    imccl 评论0 收藏0
  • Kubernetes1.5源码分析(一) apiServer启动分析

    摘要:源码版本简介是最重要的组成部分,不论是命令操作还是通过进行控制,实际都需要经过。仅用于长时间执行的请求最小请求处理超时时间,默认仅用于该文件内设置鉴权机构一组用于运行时的配置信息。在最后会启动服务。 源码版本 Kubernetes v1.5.0 简介 apiserver是K8S最重要的组成部分,不论是命令操作还是通过remote API进行控制,实际都需要经过apiserver。api...

    stormgens 评论0 收藏0
  • Kubernetes1.5源码分析(四) apiServer资源etcd接口实现

    摘要:为所有对外提供服务的资源实现了一套通用的符合要求的操作接口,每个服务接口负责处理一类资源对象。该接口最终返回了的和清除操作资源的接口。 源码版本 Kubernetes v1.5.0 简介 k8s的各个组件与apiServer交互操作各种资源对象,最终都会落入到etcd中。k8s为所有对外提供服务的Restful资源实现了一套通用的符合Restful要求的etcd操作接口,每个服务接口负...

    K_B_Z 评论0 收藏0
  • Kubernetes监控Heapster源码分析

    摘要:源码版本简介是下的一个监控项目,用于进行容器集群的监控和性能分析。基本的功能及概念介绍可以回顾我之前的一篇文章监控之介绍。在源码分析之前我们先介绍的实现流程,由上图可以看出会从各个上获取相关的监控信息,然后进行汇总发送给后台数据库。 源码版本 heapster version: release-1.2 简介 Heapster是Kubernetes下的一个监控项目,用于进行容器集群的监控...

    gclove 评论0 收藏0
  • kubeadm源码分析(kubernetes离线安装包,步安装)

    摘要:离线安装包三步安装,简单到难以置信源码分析说句实在话,的代码写的真心一般,质量不是很高。然后给该租户绑定角色。 k8s离线安装包 三步安装,简单到难以置信 kubeadm源码分析 说句实在话,kubeadm的代码写的真心一般,质量不是很高。 几个关键点来先说一下kubeadm干的几个核心的事: kubeadm 生成证书在/etc/kubernetes/pki目录下 kubeadm 生...

    Eirunye 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<