X Tutup
Skip to content

Commit 0caa233

Browse files
committed
Rework shim logger shutdown process
Signed-off-by: Maksym Pavlenko <makpav@amazon.com>
1 parent 23fc859 commit 0caa233

File tree

2 files changed

+91
-38
lines changed

2 files changed

+91
-38
lines changed

pkg/process/io.go

Lines changed: 88 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,23 @@ import (
2929
"sync"
3030
"sync/atomic"
3131
"syscall"
32+
"time"
3233

3334
"github.com/containerd/containerd/log"
3435
"github.com/containerd/containerd/namespaces"
3536
"github.com/containerd/containerd/pkg/stdio"
37+
"github.com/containerd/containerd/pkg/timeout"
3638
"github.com/containerd/containerd/sys"
3739
"github.com/containerd/fifo"
3840
runc "github.com/containerd/go-runc"
41+
"github.com/hashicorp/go-multierror"
3942
"github.com/pkg/errors"
4043
)
4144

45+
const (
46+
shimLoggerTermTimeout = "io.containerd.timeout.shim.logger.shutdown"
47+
)
48+
4249
var bufPool = sync.Pool{
4350
New: func() interface{} {
4451
// setting to 4096 to align with PIPE_BUF
@@ -254,86 +261,133 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error)
254261
args = append(args, vs[0])
255262
}
256263
}
257-
ctx, cancel := context.WithCancel(ctx)
258-
cmd := exec.CommandContext(ctx, uri.Path, args...)
259-
cmd.Env = append(cmd.Env,
260-
"CONTAINER_ID="+id,
261-
"CONTAINER_NAMESPACE="+ns,
262-
)
264+
263265
out, err := newPipe()
264266
if err != nil {
265-
cancel()
266267
return nil, err
267268
}
269+
268270
serr, err := newPipe()
269271
if err != nil {
270-
cancel()
271272
return nil, err
272273
}
274+
273275
r, w, err := os.Pipe()
274276
if err != nil {
275-
cancel()
276277
return nil, err
277278
}
279+
280+
cmd := exec.Command(uri.Path, args...)
281+
cmd.Env = append(cmd.Env,
282+
"CONTAINER_ID="+id,
283+
"CONTAINER_NAMESPACE="+ns,
284+
)
285+
278286
cmd.ExtraFiles = append(cmd.ExtraFiles, out.r, serr.r, w)
279287
// don't need to register this with the reaper or wait when
280288
// running inside a shim
281289
if err := cmd.Start(); err != nil {
282-
cancel()
283290
return nil, err
284291
}
285292
// close our side of the pipe after start
286293
if err := w.Close(); err != nil {
287-
cancel()
288294
return nil, err
289295
}
290296
// wait for the logging binary to be ready
291297
b := make([]byte, 1)
292298
if _, err := r.Read(b); err != nil && err != io.EOF {
293-
cancel()
294299
return nil, err
295300
}
296301
return &binaryIO{
297-
cmd: cmd,
298-
cancel: cancel,
299-
out: out,
300-
err: serr,
302+
cmd: cmd,
303+
out: out,
304+
err: serr,
301305
}, nil
302306
}
303307

304308
type binaryIO struct {
305309
cmd *exec.Cmd
306-
cancel func()
307310
out, err *pipe
308311
}
309312

310-
func (b *binaryIO) CloseAfterStart() (err error) {
311-
for _, v := range []*pipe{
312-
b.out,
313-
b.err,
314-
} {
313+
func (b *binaryIO) CloseAfterStart() error {
314+
var (
315+
result *multierror.Error
316+
)
317+
318+
for _, v := range []*pipe{b.out, b.err} {
315319
if v != nil {
316-
if cerr := v.r.Close(); err == nil {
317-
err = cerr
320+
if err := v.r.Close(); err != nil {
321+
result = multierror.Append(result, err)
318322
}
319323
}
320324
}
321-
return err
325+
326+
return result.ErrorOrNil()
322327
}
323328

324-
func (b *binaryIO) Close() (err error) {
325-
b.cancel()
326-
for _, v := range []*pipe{
327-
b.out,
328-
b.err,
329-
} {
329+
func (b *binaryIO) Close() error {
330+
var (
331+
result *multierror.Error
332+
)
333+
334+
for _, v := range []*pipe{b.out, b.err} {
330335
if v != nil {
331-
if cerr := v.Close(); err == nil {
332-
err = cerr
336+
if err := v.Close(); err != nil {
337+
result = multierror.Append(result, err)
333338
}
334339
}
335340
}
336-
return err
341+
342+
if err := b.cancel(); err != nil {
343+
result = multierror.Append(result, err)
344+
}
345+
346+
return result.ErrorOrNil()
347+
}
348+
349+
func (b *binaryIO) cancel() error {
350+
if b.cmd == nil || b.cmd.Process == nil {
351+
return nil
352+
}
353+
354+
// Send SIGTERM first, so logger process has a chance to flush and exit properly
355+
if err := b.cmd.Process.Signal(syscall.SIGTERM); err != nil {
356+
result := multierror.Append(errors.Wrap(err, "failed to send SIGTERM"))
357+
358+
log.L.WithError(err).Warn("failed to send SIGTERM signal, killing logging shim")
359+
360+
if err := b.cmd.Process.Kill(); err != nil {
361+
result = multierror.Append(result, errors.Wrap(err, "failed to kill process after faulty SIGTERM"))
362+
}
363+
364+
return result.ErrorOrNil()
365+
}
366+
367+
done := make(chan error)
368+
go func() {
369+
err := b.cmd.Wait()
370+
if err != nil {
371+
err = errors.Wrap(err, "failed to wait for shim logger process after SIGTERM")
372+
}
373+
done <- err
374+
}()
375+
376+
termTimeout := timeout.Get(shimLoggerTermTimeout)
377+
378+
select {
379+
case err := <-done:
380+
return err
381+
case <-time.After(termTimeout):
382+
log.L.Warn("failed to wait for shim logger process to exit, killing")
383+
384+
err := b.cmd.Process.Kill()
385+
if err != nil {
386+
return errors.Wrap(err, "failed to kill shim logger process")
387+
}
388+
389+
return nil
390+
}
337391
}
338392

339393
func (b *binaryIO) Stdin() io.WriteCloser {

runtime/v2/logging/logging.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type LoggerFunc func(context.Context, *Config, func() error) error
4242
// Run the logging driver
4343
func Run(fn LoggerFunc) {
4444
ctx, cancel := context.WithCancel(context.Background())
45+
defer cancel()
46+
4547
config := &Config{
4648
ID: os.Getenv("CONTAINER_ID"),
4749
Namespace: os.Getenv("CONTAINER_NAMESPACE"),
@@ -56,10 +58,7 @@ func Run(fn LoggerFunc) {
5658
signal.Notify(s, unix.SIGTERM)
5759

5860
go func() {
59-
if err := fn(ctx, config, wait.Close); err != nil {
60-
errCh <- err
61-
}
62-
errCh <- nil
61+
errCh <- fn(ctx, config, wait.Close)
6362
}()
6463

6564
for {

0 commit comments

Comments
 (0)
X Tutup