X Tutup
Skip to content

Commit 2b565da

Browse files
committed
Add restart monitor
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
1 parent e63768e commit 2b565da

File tree

6 files changed

+379
-7
lines changed

6 files changed

+379
-7
lines changed

cmd/containerd/builtins.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package main
2020
import (
2121
_ "github.com/containerd/containerd/diff/walking/plugin"
2222
_ "github.com/containerd/containerd/gc/scheduler"
23+
_ "github.com/containerd/containerd/restart/monitor"
2324
_ "github.com/containerd/containerd/services/containers"
2425
_ "github.com/containerd/containerd/services/content"
2526
_ "github.com/containerd/containerd/services/diff"

metadata/buckets.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,7 @@ func getImagesBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
135135
}
136136

137137
func createContainersBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
138-
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContainers)
139-
if err != nil {
140-
return nil, err
141-
}
142-
return bkt, nil
138+
return createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContainers)
143139
}
144140

145141
func getContainersBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {

plugin/plugin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ type Type string
5454
func (t Type) String() string { return string(t) }
5555

5656
const (
57-
// AllPlugins declares that the plugin should be initialized after all others.
58-
AllPlugins Type = "*"
57+
// InternalPlugin implements an internal plugin to containerd
58+
InternalPlugin Type = "io.containerd.internal.v1"
5959
// RuntimePlugin implements a runtime
6060
RuntimePlugin Type = "io.containerd.runtime.v1"
6161
// ServicePlugin implements a internal service

restart/monitor/change.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package monitor
18+
19+
import (
20+
"context"
21+
"syscall"
22+
23+
"github.com/containerd/containerd"
24+
"github.com/containerd/containerd/cio"
25+
)
26+
27+
type stopChange struct {
28+
container containerd.Container
29+
}
30+
31+
func (s *stopChange) apply(ctx context.Context, client *containerd.Client) error {
32+
return killTask(ctx, s.container)
33+
}
34+
35+
type startChange struct {
36+
container containerd.Container
37+
logPath string
38+
}
39+
40+
func (s *startChange) apply(ctx context.Context, client *containerd.Client) error {
41+
log := cio.NullIO
42+
if s.logPath != "" {
43+
log = cio.LogFile(s.logPath)
44+
}
45+
killTask(ctx, s.container)
46+
task, err := s.container.NewTask(ctx, log)
47+
if err != nil {
48+
return err
49+
}
50+
return task.Start(ctx)
51+
}
52+
53+
func killTask(ctx context.Context, container containerd.Container) error {
54+
task, err := container.Task(ctx, nil)
55+
if err == nil {
56+
wait, err := task.Wait(ctx)
57+
if err != nil {
58+
if _, derr := task.Delete(ctx); derr == nil {
59+
return nil
60+
}
61+
return err
62+
}
63+
if err := task.Kill(ctx, syscall.SIGKILL, containerd.WithKillAll); err != nil {
64+
if _, derr := task.Delete(ctx); derr == nil {
65+
return nil
66+
}
67+
return err
68+
}
69+
<-wait
70+
if _, err := task.Delete(ctx); err != nil {
71+
return err
72+
}
73+
}
74+
return nil
75+
}

restart/monitor/monitor.go

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package monitor
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/containerd/containerd"
25+
containers "github.com/containerd/containerd/api/services/containers/v1"
26+
diff "github.com/containerd/containerd/api/services/diff/v1"
27+
images "github.com/containerd/containerd/api/services/images/v1"
28+
leases "github.com/containerd/containerd/api/services/leases/v1"
29+
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
30+
tasks "github.com/containerd/containerd/api/services/tasks/v1"
31+
"github.com/containerd/containerd/content"
32+
"github.com/containerd/containerd/namespaces"
33+
"github.com/containerd/containerd/plugin"
34+
"github.com/containerd/containerd/restart"
35+
"github.com/containerd/containerd/services"
36+
"github.com/containerd/containerd/snapshots"
37+
"github.com/pkg/errors"
38+
"github.com/sirupsen/logrus"
39+
)
40+
41+
type duration struct {
42+
time.Duration
43+
}
44+
45+
func (d *duration) UnmarshalText(text []byte) error {
46+
var err error
47+
d.Duration, err = time.ParseDuration(string(text))
48+
return err
49+
}
50+
51+
func (d duration) MarshalText() ([]byte, error) {
52+
return []byte(d.Duration.String()), nil
53+
}
54+
55+
// Config for the restart monitor
56+
type Config struct {
57+
// Interval for how long to wait to check for state changes
58+
Interval duration `toml:"interval"`
59+
}
60+
61+
func init() {
62+
plugin.Register(&plugin.Registration{
63+
Type: plugin.InternalPlugin,
64+
Requires: []plugin.Type{
65+
plugin.ServicePlugin,
66+
},
67+
ID: "restart",
68+
Config: &Config{
69+
Interval: duration{
70+
Duration: 10 * time.Second,
71+
},
72+
},
73+
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
74+
opts, err := getServicesOpts(ic)
75+
if err != nil {
76+
return nil, err
77+
}
78+
client, err := containerd.New("", containerd.WithServices(opts...))
79+
if err != nil {
80+
return nil, err
81+
}
82+
m := &monitor{
83+
client: client,
84+
}
85+
go m.run(ic.Config.(*Config).Interval.Duration)
86+
return m, nil
87+
},
88+
})
89+
}
90+
91+
// getServicesOpts get service options from plugin context.
92+
func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
93+
plugins, err := ic.GetByType(plugin.ServicePlugin)
94+
if err != nil {
95+
return nil, errors.Wrap(err, "failed to get service plugin")
96+
}
97+
opts := []containerd.ServicesOpt{
98+
containerd.WithEventService(ic.Events),
99+
}
100+
for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{
101+
services.ContentService: func(s interface{}) containerd.ServicesOpt {
102+
return containerd.WithContentStore(s.(content.Store))
103+
},
104+
services.ImagesService: func(s interface{}) containerd.ServicesOpt {
105+
return containerd.WithImageService(s.(images.ImagesClient))
106+
},
107+
services.SnapshotsService: func(s interface{}) containerd.ServicesOpt {
108+
return containerd.WithSnapshotters(s.(map[string]snapshots.Snapshotter))
109+
},
110+
services.ContainersService: func(s interface{}) containerd.ServicesOpt {
111+
return containerd.WithContainerService(s.(containers.ContainersClient))
112+
},
113+
services.TasksService: func(s interface{}) containerd.ServicesOpt {
114+
return containerd.WithTaskService(s.(tasks.TasksClient))
115+
},
116+
services.DiffService: func(s interface{}) containerd.ServicesOpt {
117+
return containerd.WithDiffService(s.(diff.DiffClient))
118+
},
119+
services.NamespacesService: func(s interface{}) containerd.ServicesOpt {
120+
return containerd.WithNamespaceService(s.(namespacesapi.NamespacesClient))
121+
},
122+
services.LeasesService: func(s interface{}) containerd.ServicesOpt {
123+
return containerd.WithLeasesService(s.(leases.LeasesClient))
124+
},
125+
} {
126+
p := plugins[s]
127+
if p == nil {
128+
return nil, errors.Errorf("service %q not found", s)
129+
}
130+
i, err := p.Instance()
131+
if err != nil {
132+
return nil, errors.Wrapf(err, "failed to get instance of service %q", s)
133+
}
134+
if i == nil {
135+
return nil, errors.Errorf("instance of service %q not found", s)
136+
}
137+
opts = append(opts, fn(i))
138+
}
139+
return opts, nil
140+
}
141+
142+
type change interface {
143+
apply(context.Context, *containerd.Client) error
144+
}
145+
146+
type monitor struct {
147+
client *containerd.Client
148+
}
149+
150+
func (m *monitor) run(interval time.Duration) {
151+
if interval == 0 {
152+
interval = 10 * time.Second
153+
}
154+
for {
155+
time.Sleep(interval)
156+
if err := m.reconcile(context.Background()); err != nil {
157+
logrus.WithError(err).Error("reconcile")
158+
}
159+
}
160+
}
161+
162+
func (m *monitor) reconcile(ctx context.Context) error {
163+
ns, err := m.client.NamespaceService().List(ctx)
164+
if err != nil {
165+
return err
166+
}
167+
for _, name := range ns {
168+
ctx = namespaces.WithNamespace(ctx, name)
169+
changes, err := m.monitor(ctx)
170+
if err != nil {
171+
return err
172+
}
173+
for _, c := range changes {
174+
if err := c.apply(ctx, m.client); err != nil {
175+
logrus.WithError(err).Error("apply change")
176+
}
177+
}
178+
}
179+
return nil
180+
}
181+
182+
func (m *monitor) monitor(ctx context.Context) ([]change, error) {
183+
containers, err := m.client.Containers(ctx, fmt.Sprintf("labels.%q", restart.StatusLabel))
184+
if err != nil {
185+
return nil, err
186+
}
187+
var changes []change
188+
for _, c := range containers {
189+
labels, err := c.Labels(ctx)
190+
if err != nil {
191+
return nil, err
192+
}
193+
desiredStatus := containerd.ProcessStatus(labels[restart.StatusLabel])
194+
if m.isSameStatus(ctx, desiredStatus, c) {
195+
continue
196+
}
197+
switch desiredStatus {
198+
case containerd.Running:
199+
changes = append(changes, &startChange{
200+
container: c,
201+
logPath: labels[restart.LogPathLabel],
202+
})
203+
case containerd.Stopped:
204+
changes = append(changes, &stopChange{
205+
container: c,
206+
})
207+
}
208+
}
209+
return changes, nil
210+
}
211+
212+
func (m *monitor) isSameStatus(ctx context.Context, desired containerd.ProcessStatus, container containerd.Container) bool {
213+
task, err := container.Task(ctx, nil)
214+
if err != nil {
215+
return desired == containerd.Stopped
216+
}
217+
state, err := task.Status(ctx)
218+
if err != nil {
219+
return desired == containerd.Stopped
220+
}
221+
return desired == state.Status
222+
}

0 commit comments

Comments
 (0)
X Tutup