博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
docker-containerd 启动流程分析
阅读量:5329 次
发布时间:2019-06-14

本文共 6354 字,大约阅读时间需要 21 分钟。

一般在docker启动时,containerd的启动命令如下所示:

root      2090  0.0  0.1 292780 11008 ?        Ssl  10月22   0:12 docker-containerd -l unix:///var/run/docker/libcontainerd/docker-containerd.sock --shim docker-containerd-shim --metrics-interval=0 --start-timeout 2m --state-dir /var/run/docker/libcontainerd/containerd --runtime docker-runc

  

1、containerd/containerd/main.go

func daemon(context *cli.Context) error

(1)、首先调用:

sv, err := supervisor.New(  context.String("state-dir"),  context.String("runtime"),  context.String("shim"),  context.String("runtime-args"),  context.String("start-timeout"),  context.Int("retain-count"),)

  

(2)、for循环10次,调用w := supervisor.NewWorker(sv, wg),再go w.Start()

(3)、调用sv.Start(),启动supervisor

(4)、调用server, err := startServer(listenParts[0], listenParts[1], sv),启动grpc server

 

supervisor的数据结构定义如下所示:

// Supervisor represents a container supervisortype Supervisor struct {  // stateDir is the directory on the system to store container runtime state information  stateDir    string  // name of the OCI compatible runtime used to execute containers  runtime    string  runtimeArgs  []string  shim      string  containers   map[string]*containerInfo  startTasks   chan *startTask  // we need a lock around the subscribers map only because addtions and deletions from  // the map via the API so we cannot really control the currency  subscriberLock sync.RWMutex  subscribers    map[chan Event]struct{}  machine    Machine  tasks       chan Task  monitor      *Monitor  eventLog    []Event  eventLock     sync.Mutex  timeout     time.Duration}

  

2、containerd/supervisor/supervisor.go

// New returns an initialized Process supervisor

func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error)

(1)、调用machine, err := CollectionMachineInformation(),获取当前宿主机的CPU数和RAM总量

(2)、调用monitor, err := NewMonitor(),启动并返回一个监视器

(3)、填充数据结构Supervisor:

s := &Supervisor{  stateDir:    stateDir,  containers:   make(map[string]*ContainerInfo),  startTasks:   startTasks,  machine:    machine,  subscriber:   make(map[chan Event]struct{}),  tasks:      make(chan Task, defaultBufferSize),  monitor:    monitor,  runtime:    runtimeName,  runtimeArgs:  runtimeArgs,  shim:     shimName,  timeout:    timeout,}

  

(4)、调用setupEventLog(s, retainCount)设置event log

(5)、生成两个goroutine,s.exitHandler()和s.oomHandler()

(6)、最后,调用s.restore(),加载之前已经存在的容器

 

3、containerd/supervisor/supervisor.go

func (s *Supvervisor) restore() error

(1)、遍历目录s.stateDir(其实就是/var/run/docker/libcontainerd/containerd)

(2)、调用id := d.Name()获取容器id,再调用container, err := runtime.Load(s.stateDir, id, s.shim, s.timeout),load的作用就是加载s.stateDir/id/state.json获取容器实例。之后,再遍历s.stateDir/id/下的pid 文件,加载容器中的process。

(3)、调用processes, err := container.Processes(),加载容器中的process,如果process的状态为running,则调用s.monitorProcess(p)对其进行监控,并对其中不在运行的process进行处理。

 

4、containerd/supervisor/supervisor.go

// Start is a non-blocking call that runs the supervisor for monitoring container processes and executing new containers

// This event loop is the only thing that is allowed to modify state of containers and processes, therefore it is save to do operations

// in the handlers that modify state of the system or state of the Supervisor

func (s *Supervisor) Start() error

该函数所做的工作很简单,就是启动一个goroutine,再for i := range s.tasks,调用s.handlerTask(i)

 

Task的数据结构如下所示:

// Task executes an action returning an error chan with either nil or the error from excuting the tasktype Task interface {  // ErrorCh returns a channel used to report and error from an async task  ErrorCh() chan error}

  

5、containerd/supervisor/supervisor.go

func (s *Supervisor) handleTask(i Task)

该函数根据i的类型,调用相应的处理函数进行处理。例如,i.(type)为*StartTask时,则调用s.start(t),若i.(type)为*DeleteTask时,则调用s.delete(t)。

 

-----------------------------------------------------------------------  worker的工作 -------------------------------------------------------------------------

worker的数据结构如下所示:

type Work interface {  Start()}type worker struct {  wg  *sync.WaitGroup  s   *Supervisor}

 

4、containerd/supervisor/worker.go

func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker

这个函数只是简单地填充数据结构,return &worker{s: s, wg: wg}

 

5、containerd/supervisor/worker.go

// Start runs a loop in charge of starting new containers

func (w *worker) Start()

(1)、遍历w.s.startTasks,调用process, err := t.container.Start(t.checkPointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))

(2)、调用w.s.monitor.MonitorOOM(t.Container)和w.s.monitorProcess(process)对container和process进行监控

(3)、当我们从checkpoint restore一个容器的时候,不需要start process。因此,在t.CheckpointPath == ""的时候,调用process.Start()

(4)、调用ContainerStartTimer.UpdateSince(started),started是当前的时间

(5)、最后,调用t.Err <- nil, t.StartResponse <- StartResponse{Container: t.Container},和w.s.notifySubscribers(Event{Timestamp: time.Now, ID: t.container.ID(), Type: StateStart}),进行消息通知

 

---------------------------------------------------------------------------- monitor 分析 -----------------------------------------------------------------------

Monitor的数据结构如下所示:

// Monitor represents a runtime.Process monitortype Monitor struct {  m       sync.Mutext  receivers    map[int]interface{}  exits      chan runtime.Process  ooms     chan string  epollFd    int}

  

1、containerd/supervisor/monitor_linux.go

// NewMonitor starts a new process monitor and returns it

(1)、首先获取一个monitor实例,m := &Monitor{receivers: make(map[int]interface{}), exits: make(chan runtime.Process, 1024), oom: make(chan string, 1024)}

(2)、调用fd, err := archutils.EpollCreate1(0),创建一个epoll fd,接着将fd赋值给m.epollFd

(3)、生成一个goroutine,go m.start()

 

2、containerd/supervisor/monitor_linux.go

func (m *Monitor) start()

(1)、该函数就是对各种syscall.EpollEvent进行处理,每次通过调用n, err := archutils.EpollWait(m.epollFd, events[:], -1),获取n个EpollEvent。

(2)、再通过fd := int(events[i].Fd),r := m.receivers[fd]找到对应的runtimeProcess或者runtime.OOM。

(3)、最后,t := r.(type),再分别对runtime.Process和runtime.OOM进行处理

 

3、containerd/supervisor/monitor_linux.go

// Monitor adds a process to the list of the one being monitored

func (m *Monitor) Monitor(p runtime.Process) error

(1)、调用fd := p.ExitFD() ---> ExitFD returns the fd of the exit pipe,再根据fd新建一个event := syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLHUP,}

(2)、调用archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event)

(3)、最后,调用EpollFdCounter.Inc(1),m.receivers[fd] = p

转载于:https://www.cnblogs.com/YaoDD/p/5996525.html

你可能感兴趣的文章
web.xml 中加载顺序
查看>>
pycharm激活地址
查看>>
hdu 1207 四柱汉诺塔
查看>>
Vue 2.x + Webpack 3.x + Nodejs 多页面项目框架(上篇——纯前端多页面)
查看>>
display:none与visible:hidden的区别
查看>>
我的PHP学习之路
查看>>
【题解】luogu p2340 奶牛会展
查看>>
对PostgreSQL的 SPI_prepare 的理解。
查看>>
解决响应式布局下兼容性的问题
查看>>
京东静态网页练习记录
查看>>
使用DBCP连接池对连接进行管理
查看>>
【洛谷】【堆+模拟】P2278 操作系统
查看>>
hdu3307 欧拉函数
查看>>
Spring Bean InitializingBean和DisposableBean实例
查看>>
Solr4.8.0源码分析(5)之查询流程分析总述
查看>>
[Windows Server]安装系统显示“缺少计算机所需的介质驱动程序”解决方案
查看>>
[容斥][dp][快速幂] Jzoj P5862 孤独
查看>>
Lucene 学习之二:数值类型的索引和范围查询分析
查看>>
软件开发工作模型
查看>>
Java基础之字符串匹配大全
查看>>