国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Kubernetes1.5源碼分析(四) apiServer資源的etcd接口實現

K_B_Z / 2692人閱讀

摘要:為所有對外提供服務的資源實現了一套通用的符合要求的操作接口,每個服務接口負責處理一類資源對象。該接口最終返回了的和清除操作資源的接口。

源碼版本

Kubernetes v1.5.0

簡介

k8s的各個組件與apiServer交互操作各種資源對象,最終都會落入到etcd中。
k8s為所有對外提供服務的Restful資源實現了一套通用的符合Restful要求的etcd操作接口,每個服務接口負責處理一類(Kind)資源對象。
這些資源對象包括pods、bindings、podTemplates、RC、Services等。

Storage創建

要了解etcd操作接口的實現,我們先需要了解下Master.GenericAPIServer.storage結構:

storage map[string]rest.Storage

該storage變量是個map,Key是REST API的path,Value是rest.Storage接口,該接口就是一個通用的符合Restful要求的資源存儲接口。
核心組資源列表的創建要查看pkg/registry/core/rest/storage_core.go中的NewLegacyRESTStorage()接口:
接口調用流程: main --> App.Run --> config.Complete().New() --> m.InstallLegacyAPI() --> NewLegacyRESTStorage()

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    ....
    // 創建podStorage
    podStorage := podetcd.NewStorage(
        restOptionsGetter(api.Resource("pods")),
        nodeStorage.KubeletConnectionInfo,
        c.ProxyTransport,
        podDisruptionClient,
    )

    ...
    // 資源列表
    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.Binding,

        "podTemplates": podTemplateStorage,

        "replicationControllers":        controllerStorage.Controller,
        "replicationControllers/status": controllerStorage.Status,

        "services":        serviceRest.Service,
        "services/proxy":  serviceRest.Proxy,
        "services/status": serviceStatusStorage,

        "endpoints": endpointsStorage,

        ...

        "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    }
    if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
        restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
    }
    if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1beta1"}) {
        restStorageMap["pods/eviction"] = podStorage.Eviction
    }
    apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap

    return restStorage, apiGroupInfo, nil
}

該接口在ApiServer源碼分析的第二章介紹資源注冊的時候已經講過,這里我們主要分析后端存儲etcd操作接口的實現。
我們以Pod資源為例,進行介紹:
路徑: pkg/registry/core/pod/etcd/etcd.go

func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
    // 完成prefix
    prefix := "/" + opts.ResourcePrefix

    newListFunc := func() runtime.Object { return &api.PodList{} }
    // 調用接口裝飾器,返回該storage的etcd操作接口及資源delete接口
    // 該opts傳參進來的,需要到上一層查看master.go下的restOptionsFactory.NewFor
    storageInterface, dFunc := opts.Decorator(
        opts.StorageConfig,
        // 這一下的參數都是用于開啟cache時的接口使用
        cachesize.GetWatchCacheSizeByResource(cachesize.Pods),
        &api.Pod{},
        prefix,
        pod.Strategy,
        newListFunc,
        pod.NodeNameTriggerFunc,
    )

    // 創建Store對象
    store := ®istry.Store{
        NewFunc:     func() runtime.Object { return &api.Pod{} },
        NewListFunc: newListFunc,
        KeyRootFunc: func(ctx api.Context) string {
            return registry.NamespaceKeyRootFunc(ctx, prefix)
        },
        KeyFunc: func(ctx api.Context, name string) (string, error) {
            return registry.NamespaceKeyFunc(ctx, prefix, name)
        },
        ObjectNameFunc: func(obj runtime.Object) (string, error) {
            return obj.(*api.Pod).Name, nil
        },
        PredicateFunc:           pod.MatchPod,
        QualifiedResource:       api.Resource("pods"),
        EnableGarbageCollection: opts.EnableGarbageCollection,
        DeleteCollectionWorkers: opts.DeleteCollectionWorkers,

        CreateStrategy:      pod.Strategy,
        UpdateStrategy:      pod.Strategy,
        DeleteStrategy:      pod.Strategy,
        ReturnDeletedObject: true,

        Storage:     storageInterface,
        DestroyFunc: dFunc,
    }

    statusStore := *store
    statusStore.UpdateStrategy = pod.StatusStrategy

    return PodStorage{
        Pod:         &REST{store, proxyTransport},
        Binding:     &BindingREST{store: store},
        Eviction:    newEvictionStorage(store, podDisruptionBudgetClient),
        Status:      &StatusREST{store: &statusStore},
        Log:         &podrest.LogREST{Store: store, KubeletConn: k},
        Proxy:       &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
        Exec:        &podrest.ExecREST{Store: store, KubeletConn: k},
        Attach:      &podrest.AttachREST{Store: store, KubeletConn: k},
        PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
    }
}

該接口中調用了opts.Decorator()接口返回了關鍵的storage interface及清除操作資源的接口。
要看該接口的實現,我們得先從opts的創建開始。
restOptionsGetter(api.Resource("pods"))該步完成了opts的創建,api.Resource("pods")其實就是拼接了一個GroupResource的結構,我們需要從頭開始介紹restOptionsGetter接口的由來。
路徑:pkg/master/master.go

func (c completedConfig) New() (*Master, error) {
...
    restOptionsFactory := restOptionsFactory{
        deleteCollectionWorkers: c.DeleteCollectionWorkers,
        enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
        storageFactory:          c.StorageFactory,
    }

    // 判斷是否使能了用于Watch的Cache
    // 有無cache賦值的是不同的接口實現
    // restOptionsFactory.storageDecorator:是一個各個資源的REST interface(CRUD)裝飾者
    // 后面調用NewStorage()時會用到該接口,并輸出對應的CRUD接口及銷毀接口。
    // 可以參考pkg/registry/core/pod/etcd/etcd.go中的NewStorage()
    // 其實這里有無cache的接口差異就在于:有cache的話,就提供操作cache的接口;無cache的話,就提供直接操作etcd的接口
    if c.EnableWatchCache {
        restOptionsFactory.storageDecorator = registry.StorageWithCacher
    } else {
        restOptionsFactory.storageDecorator = generic.UndecoratedStorage
    }
    // install legacy rest storage
    if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
        legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
            StorageFactory:       c.StorageFactory,
            ProxyTransport:       c.ProxyTransport,
            KubeletClientConfig:  c.KubeletClientConfig,
            EventTTL:             c.EventTTL,
            ServiceIPRange:       c.ServiceIPRange,
            ServiceNodePortRange: c.ServiceNodePortRange,
            LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
        }
        m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
    }
...
}

該接口初始化了一個restOptionsFactory變量,里面指定了最大的刪除回收資源的協程數,是否使能GC和storageFactory,還根據是否使能了WatchCache來完成NewStorage()接口中調用的裝飾器接口的賦值。
restOptionsFactory.NewForj接口一直被往下傳,直到NewLegacyRESTStorage()接口中被調用然后創建了opts,我們看下該接口實現:
路徑: pkg/master/master.go

type restOptionsFactory struct {
    deleteCollectionWorkers int
    enableGarbageCollection bool
    storageFactory          genericapiserver.StorageFactory
    storageDecorator        generic.StorageDecorator
}

func (f restOptionsFactory) NewFor(resource unversioned.GroupResource) generic.RESTOptions {
    // 創建該資源的Storage Config
    storageConfig, err := f.storageFactory.NewConfig(resource)
    if err != nil {
        glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
    }
    // 最終返回的就是RESTOptions, 就是前面的opts的類型
    // 需要關注f.storageDecorator的由來
    return generic.RESTOptions{
        // 用于生成Storage的config
        StorageConfig:           storageConfig,
        Decorator:               f.storageDecorator,
        DeleteCollectionWorkers: f.deleteCollectionWorkers,
        EnableGarbageCollection: f.enableGarbageCollection,
        ResourcePrefix:          f.storageFactory.ResourcePrefix(resource),
    }
}

該接口比較簡單,初始化了一個generic.RESTOptions變量,即opts。我們需要找出opts.Decorator的由來,就只需要看下上一個接口判斷EnableWatchCache時就明白了。
opts.Decorator該接口最終返回了storage的interface和清除操作資源的接口。可以想一下帶緩沖和不帶緩沖的接口實現肯定不一致,所以這里需要進行區分:

registry.StorageWithCacher:該接口是返回了操作cache的接口,和清除cache的操作接口

generic.UndecoratedStorage: 該接口會根據你配置的后端類型(etcd2/etcd3等),來返回不同的etcd操作接口,其實是為所有的資源對象創建了etcd的鏈接,然后通過該鏈接發送不同的命令,最后還返回了斷開該鏈接的接口。

所以實現完全不一樣,一個操作cache,一個操作實際的etcd。

先看registry.StorageWithCacher()接口實現:
路徑: pkg/registry/generic/registry/storage_factory.go

func StorageWithCacher(
    storageConfig *storagebackend.Config,
    capacity int,
    objectType runtime.Object,
    resourcePrefix string,
    scopeStrategy rest.NamespaceScopedStrategy,
    newListFunc func() runtime.Object,
    triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
    
    // storageConfig是后端存儲的config,定義了存儲類型,存儲服務器List,TLS證書信息,Cache大小等。
    // 該接口就是generic.UndecoratedStorage()接口的實現,StorageWithCacher()接口就是多了下面的cacher操作
    s, d := generic.NewRawStorage(storageConfig)
    // TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
    // Currently it has two layers of same storage interface -- cacher and low level kv.
    cacherConfig := storage.CacherConfig{
        CacheCapacity:        capacity,
        Storage:              s,
        Versioner:            etcdstorage.APIObjectVersioner{},
        Type:                 objectType,
        ResourcePrefix:       resourcePrefix,
        NewListFunc:          newListFunc,
        TriggerPublisherFunc: triggerFunc,
        Codec:                storageConfig.Codec,
    }
    // 根據是否有namespace來進行區分賦值
    // KeyFunc函數用于獲取該object的Key: 
    // 有namespace的話,key的格式:prefix + "/" + Namespace + "/" + name
    // 無namespace的話,key的格式:prefix + "/" + name
    if scopeStrategy.NamespaceScoped() {
        cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
            return storage.NamespaceKeyFunc(resourcePrefix, obj)
        }
    } else {
        cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
            return storage.NoNamespaceKeyFunc(resourcePrefix, obj)
        }
    }
    // 根據之前初始化的Cacher的config,進行cacher創建
    // 比較關鍵,后面進行介紹
    cacher := storage.NewCacherFromConfig(cacherConfig)
    destroyFunc := func() {
        cacher.Stop()
        d()
    }

    return cacher, destroyFunc
}

先調用NewRawStorage()接口創建了一個存儲后端,我們先看下這個接口實現:
路徑: pkg/registry/generic/storage_decorator.go

func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
    s, d, err := factory.Create(*config)
    if err != nil {
        glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
    }
    return s, d
}

沒啥好說的,繼續看Create():
路徑: pkg/storage/storagebackend/factory/factory.go

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    // 判斷下存儲類型:etcd2 、etcd3
    switch c.Type {
    case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
        return newETCD2Storage(c)
    case storagebackend.StorageTypeETCD3:
        // TODO: We have the following features to implement:
        // - Support secure connection by using key, cert, and CA files.
        // - Honor "https" scheme to support secure connection in gRPC.
        // - Support non-quorum read.
        return newETCD3Storage(c)
    default:
        return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
    }
}

挑個etcd2看下實現:
路徑: pkg/storage/storagebackend/factory/etcd2.go

func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    // 根據配置的TLS證書信息創建http.Transport
    tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
    if err != nil {
        return nil, nil, err
    }
    // 創建etcd2 client,返回的是httpClusterClient結構
    client, err := newETCD2Client(tr, c.ServerList)
    if err != nil {
        return nil, nil, err
    }
    // 根據入參初始化一個實現了storage.Interface接口的etcdHelper變量
    s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize)
    // 返回etcdHelper變量,及關閉鏈接的函數
    return s, tr.CloseIdleConnections, nil
}

前兩步都是為了創建與etcd連接的client,后一步比較關鍵:
路徑: pkg/storage/etcd/etcd_helper.go

func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
    return &etcdHelper{
        // 創建一個httpMembersAPI變量,附帶很多方法
        etcdMembersAPI: etcd.NewMembersAPI(client),
        // 創建一個httpKeysAPI變量,同樣附帶各類方法
        etcdKeysAPI:    etcd.NewKeysAPI(client),
        // 編解碼使用
        codec:          codec,
        versioner:      APIObjectVersioner{},
        // 用于序列化反序列化,版本間轉換,兼容等
        copier:         api.Scheme,
        pathPrefix:     path.Join("/", prefix),
        quorum:         quorum,
        // 創建cache結構
        cache:          utilcache.NewCache(cacheSize),
    }
}

該接口很簡單的初始化,需要關注的是etcdHelper附帶的通用的RESTFul 方法:

可以看到storage.Interface接口所需要的方法都實現了。

繼續回到StorageWithCacher()接口,在往下走就是CacherConfig的初始化,就不介紹了,直接進入cacher的創建接口:
路徑: pkg/storage/cacher.go

func NewCacherFromConfig(config CacherConfig) *Cacher {
    watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
    listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)

    // Give this error when it is constructed rather than when you get the
    // first watch item, because it"s much easier to track down that way.
    if obj, ok := config.Type.(runtime.Object); ok {
        if err := runtime.CheckCodec(config.Codec, obj); err != nil {
            panic("storage codec doesn"t seem to match given type: " + err.Error())
        }
    }

    cacher := &Cacher{
        ready:       newReady(),
        storage:     config.Storage,
        objectType:  reflect.TypeOf(config.Type),
        watchCache:  watchCache,
        reflector:   cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
        versioner:   config.Versioner,
        triggerFunc: config.TriggerPublisherFunc,
        watcherIdx:  0,
        watchers: indexedWatchers{
            allWatchers:   make(map[int]*cacheWatcher),
            valueWatchers: make(map[string]watchersMap),
        },
        // TODO: Figure out the correct value for the buffer size.
        incoming: make(chan watchCacheEvent, 100),
        // We need to (potentially) stop both:
        // - wait.Until go-routine
        // - reflector.ListAndWatch
        // and there are no guarantees on the order that they will stop.
        // So we will be simply closing the channel, and synchronizing on the WaitGroup.
        stopCh: make(chan struct{}),
    }
    watchCache.SetOnEvent(cacher.processEvent)
    go cacher.dispatchEvents()

    stopCh := cacher.stopCh
    cacher.stopWg.Add(1)
    go func() {
        defer cacher.stopWg.Done()
        wait.Until(
            func() {
                if !cacher.isStopped() {
                    cacher.startCaching(stopCh)
                }
            }, time.Second, stopCh,
        )
    }()
    return cacher
}

該接口主要用于開啟cacher,而該cache只用于WATCH和LIST的request。
我們在看下Cacher結構體:

該接口必然也實現了storage.Interface接口所需要的方法。
因為該Cacher只用于WATCH和LIST的request,所以你可以看下cacher提供的API,除了WATCH和LIST相關的之外的接口都是調用了之前創建的storage的API。
查看下cacher.Create和Delete:

func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
    return c.storage.Create(ctx, key, obj, out, ttl)
}

func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
    return c.storage.Delete(ctx, key, out, preconditions)
}

到這里registry.StorageWithCacher()接口就結束了,我們繼續回到前面講的另外一個接口generic.UndecoratedStorage():
路徑:pkg/registry/generic/storage_decorator.go

func UndecoratedStorage(
    config *storagebackend.Config,
    capacity int,
    objectType runtime.Object,
    resourcePrefix string,
    scopeStrategy rest.NamespaceScopedStrategy,
    newListFunc func() runtime.Object,
    trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
    return NewRawStorage(config)
}

發現registry.StorageWithCacher()接口也是調用了NewRawStorage()接口,其實現就少了cache。

這里接觸到了cache,下節會專門介紹該cache實現。

用戶配置

--watch-cache: 該apiServer的參數默認就是true的,用于打開watch cache

--watch-cache-sizes: 既然有enable cache,那就少不了cache sizes,而且該size可以指定各類資源所使用的cache size。格式: resource#size

--storage-backend: 后端持久化存儲類型,可選項為etcd2(默認)、etcd3

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/32554.html

相關文章

  • Kubernetes1.5源碼分析(一) apiServer啟動分析

    摘要:源碼版本簡介是最重要的組成部分,不論是命令操作還是通過進行控制,實際都需要經過。僅用于長時間執行的請求最小請求處理超時時間,默認僅用于該文件內設置鑒權機構一組用于運行時的配置信息。在最后會啟動服務。 源碼版本 Kubernetes v1.5.0 簡介 apiserver是K8S最重要的組成部分,不論是命令操作還是通過remote API進行控制,實際都需要經過apiserver。api...

    stormgens 評論0 收藏0
  • Kubernetes1.5源碼分析(二) apiServer資源注冊

    摘要:我們先將上面的接口解析放放,先看下是如何初始化的路徑定義了,再看路徑定義空的創建,用于不同版本對象轉換增加一些轉換函數上面就創建了一個空的。其實就是向添加了轉換函數,比如將轉換為,將轉換為。 源碼版本 Kubernetes v1.5.0 簡介 k8s里面有各種資源,如Pod、Service、RC、namespaces等資源,用戶操作的其實也就是這一大堆資源。但這些資源并不是雜亂無章的,...

    imccl 評論0 收藏0
  • Kubernetes1.5源碼分析(三) apiServer之go-restful使用

    摘要:它包括一組和一個對象,使用進行請求派發。流程基本就是這樣,接著我們直接進入接口看實現拼裝然后填充并返回一個對象創建一個這個是關鍵,會對各種進行注冊增加一個的將該加入到前兩個調用函數比較簡單,這里不進行介紹了。 源碼版本 Kubernetes v1.5.0 go-restful 簡介 go-restful是用于構建REST-style web服務的golang包。它是出現時因為一個jav...

    Doyle 評論0 收藏0
  • k8s :kube-apiserver RESTful API 實現 - Storage

    摘要:前言了解的同學都知道,對外提供接口提供查詢,監聽集群資源狀態的服務,主要就做一件事,就是如何將接口調用映射到對后端存儲比如的增刪改查訪問,在設計的時候考慮到是個快速迭代的開源項目,很多接口版本可能在未來版本發生變化,因此如何設計一個擴展 前言 了解 k8s 的同學都知道,kube-apiserver 對外提供 RESTful API 接口提供 查詢,監聽集群(資源)狀態的服務,kube...

    frank_fun 評論0 收藏0
  • k8s :kube-apiserver RESTful API 實現 - Storage

    摘要:前言了解的同學都知道,對外提供接口提供查詢,監聽集群資源狀態的服務,主要就做一件事,就是如何將接口調用映射到對后端存儲比如的增刪改查訪問,在設計的時候考慮到是個快速迭代的開源項目,很多接口版本可能在未來版本發生變化,因此如何設計一個擴展 前言 了解 k8s 的同學都知道,kube-apiserver 對外提供 RESTful API 接口提供 查詢,監聽集群(資源)狀態的服務,kube...

    ChristmasBoy 評論0 收藏0

發表評論

0條評論

K_B_Z

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<