X Tutup
Skip to content

Commit 7a9fbec

Browse files
author
Akshat Kumar
committed
Add logging binary support when terminal is true
Currently the shims only support starting the logging binary process if the io.Creator Config does not specify Terminal: true. This means that the program using containerd will only be able to specify FIFO io when Terminal: true, rather than allowing the shim to fork the logging binary process. Hence, containerd consumers face an inconsistent behavior regarding logging binary management depending on the Terminal option. Allowing the shim to fork the logging binary process will introduce consistency between the running container and the logging process. Otherwise, the logging process may die if its parent process dies whereas the container will keep running, resulting in the loss of container logs. Signed-off-by: Akshat Kumar <kshtku@amazon.com>
1 parent 5c73fe0 commit 7a9fbec

File tree

9 files changed

+328
-61
lines changed

9 files changed

+328
-61
lines changed

cio/io.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,26 @@ func BinaryIO(binary string, args map[string]string) Creator {
260260
}
261261
}
262262

263+
// TerminalBinaryIO forwards container STDOUT|STDERR directly to a logging binary
264+
// It also sets the terminal option to true
265+
func TerminalBinaryIO(binary string, args map[string]string) Creator {
266+
return func(_ string) (IO, error) {
267+
uri, err := LogURIGenerator("binary", binary, args)
268+
if err != nil {
269+
return nil, err
270+
}
271+
272+
res := uri.String()
273+
return &logURI{
274+
config: Config{
275+
Stdout: res,
276+
Stderr: res,
277+
Terminal: true,
278+
},
279+
}, nil
280+
}
281+
}
282+
263283
// LogFile creates a file on disk that logs the task's STDOUT,STDERR.
264284
// If the log file already exists, the logs will be appended to the file.
265285
func LogFile(path string) Creator {

pkg/process/exec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (e *execProcess) start(ctx context.Context) (err error) {
221221
if err != nil {
222222
return errors.Wrap(err, "failed to retrieve console master")
223223
}
224-
if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil {
224+
if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.id, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil {
225225
return errors.Wrap(err, "failed to start console copy")
226226
}
227227
} else {

pkg/process/init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
157157
if err != nil {
158158
return errors.Wrap(err, "failed to retrieve console master")
159159
}
160-
console, err = p.Platform.CopyConsole(ctx, console, r.Stdin, r.Stdout, r.Stderr, &p.wg)
160+
console, err = p.Platform.CopyConsole(ctx, console, p.id, r.Stdin, r.Stdout, r.Stderr, &p.wg)
161161
if err != nil {
162162
return errors.Wrap(err, "failed to start console copy")
163163
}

pkg/process/init_state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (s *createdCheckpointState) Start(ctx context.Context) error {
172172
if err != nil {
173173
return errors.Wrap(err, "failed to retrieve console master")
174174
}
175-
console, err = p.Platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg)
175+
console, err = p.Platform.CopyConsole(ctx, console, p.id, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg)
176176
if err != nil {
177177
return errors.Wrap(err, "failed to start console copy")
178178
}

pkg/stdio/platform.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
// Platform handles platform-specific behavior that may differs across
2727
// platform implementations
2828
type Platform interface {
29-
CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string,
29+
CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string,
3030
wg *sync.WaitGroup) (console.Console, error)
3131
ShutdownConsole(ctx context.Context, console console.Console) error
3232
Close() error

runtime/io.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package runtime
18+
19+
import (
20+
"net/url"
21+
"os"
22+
"os/exec"
23+
)
24+
25+
type Pipe struct {
26+
R *os.File
27+
W *os.File
28+
}
29+
30+
func NewPipe() (*Pipe, error) {
31+
R, W, err := os.Pipe()
32+
if err != nil {
33+
return nil, err
34+
}
35+
return &Pipe{
36+
R: R,
37+
W: W,
38+
}, nil
39+
}
40+
41+
func NewBinaryCmd(binaryURI *url.URL, id, ns string) *exec.Cmd {
42+
var args []string
43+
for k, vs := range binaryURI.Query() {
44+
args = append(args, k)
45+
if len(vs) > 0 {
46+
args = append(args, vs[0])
47+
}
48+
}
49+
50+
cmd := exec.Command(binaryURI.Path, args...)
51+
52+
cmd.Env = append(cmd.Env,
53+
"CONTAINER_ID="+id,
54+
"CONTAINER_NAMESPACE="+ns,
55+
)
56+
57+
return cmd
58+
}

runtime/v1/shim/service_linux.go

Lines changed: 81 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@ package shim
1919
import (
2020
"context"
2121
"io"
22+
"net/url"
23+
"os"
2224
"sync"
2325
"syscall"
2426

2527
"github.com/containerd/console"
28+
"github.com/containerd/containerd/namespaces"
29+
"github.com/containerd/containerd/runtime"
2630
"github.com/containerd/fifo"
2731
"github.com/pkg/errors"
2832
)
@@ -31,7 +35,7 @@ type linuxPlatform struct {
3135
epoller *console.Epoller
3236
}
3337

34-
func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
38+
func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
3539
if p.epoller == nil {
3640
return nil, errors.New("uninitialized epoller")
3741
}
@@ -59,26 +63,85 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
5963
}()
6064
}
6165

62-
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
66+
uri, err := url.Parse(stdout)
6367
if err != nil {
64-
return nil, err
68+
return nil, errors.Wrap(err, "unable to parse stdout uri")
6569
}
66-
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
67-
if err != nil {
68-
return nil, err
70+
71+
switch uri.Scheme {
72+
case "binary":
73+
ns, err := namespaces.NamespaceRequired(ctx)
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
cmd := runtime.NewBinaryCmd(uri, id, ns)
79+
80+
// Create pipe to be used by logging binary for Stdout
81+
out, err := runtime.NewPipe()
82+
if err != nil {
83+
return nil, errors.Wrap(err, "failed to create stdout pipes")
84+
}
85+
86+
// Stderr is created for logging binary but unused when terminal is true
87+
serr, err := runtime.NewPipe()
88+
if err != nil {
89+
return nil, errors.Wrap(err, "failed to create stderr pipes")
90+
}
91+
92+
r, w, err := os.Pipe()
93+
if err != nil {
94+
return nil, err
95+
}
96+
97+
cmd.ExtraFiles = append(cmd.ExtraFiles, out.R, serr.R, w)
98+
99+
wg.Add(1)
100+
cwg.Add(1)
101+
go func() {
102+
cwg.Done()
103+
io.Copy(out.W, epollConsole)
104+
out.W.Close()
105+
wg.Done()
106+
}()
107+
108+
if err := cmd.Start(); err != nil {
109+
return nil, errors.Wrap(err, "failed to start logging binary process")
110+
}
111+
112+
// Close our side of the pipe after start
113+
if err := w.Close(); err != nil {
114+
return nil, errors.Wrap(err, "failed to close write pipe after start")
115+
}
116+
117+
// Wait for the logging binary to be ready
118+
b := make([]byte, 1)
119+
if _, err := r.Read(b); err != nil && err != io.EOF {
120+
return nil, errors.Wrap(err, "failed to read from logging binary")
121+
}
122+
123+
default:
124+
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
125+
if err != nil {
126+
return nil, err
127+
}
128+
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
129+
if err != nil {
130+
return nil, err
131+
}
132+
wg.Add(1)
133+
cwg.Add(1)
134+
go func() {
135+
cwg.Done()
136+
p := bufPool.Get().(*[]byte)
137+
defer bufPool.Put(p)
138+
io.CopyBuffer(outw, epollConsole, *p)
139+
outw.Close()
140+
outr.Close()
141+
wg.Done()
142+
}()
143+
cwg.Wait()
69144
}
70-
wg.Add(1)
71-
cwg.Add(1)
72-
go func() {
73-
cwg.Done()
74-
p := bufPool.Get().(*[]byte)
75-
defer bufPool.Put(p)
76-
io.CopyBuffer(outw, epollConsole, *p)
77-
outw.Close()
78-
outr.Close()
79-
wg.Done()
80-
}()
81-
cwg.Wait()
82145
return epollConsole, nil
83146
}
84147

runtime/v1/shim/service_unix.go

Lines changed: 82 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,22 @@ package shim
2121
import (
2222
"context"
2323
"io"
24+
"net/url"
25+
"os"
2426
"sync"
2527
"syscall"
2628

2729
"github.com/containerd/console"
30+
"github.com/containerd/containerd/namespaces"
31+
"github.com/containerd/containerd/runtime"
2832
"github.com/containerd/fifo"
33+
"github.com/pkg/errors"
2934
)
3035

3136
type unixPlatform struct {
3237
}
3338

34-
func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
39+
func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (console.Console, error) {
3540
var cwg sync.WaitGroup
3641
if stdin != "" {
3742
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
@@ -47,28 +52,85 @@ func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console,
4752
io.CopyBuffer(console, in, *p)
4853
}()
4954
}
50-
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
55+
uri, err := url.Parse(stdout)
5156
if err != nil {
52-
return nil, err
57+
return nil, errors.Wrap(err, "unable to parse stdout uri")
5358
}
54-
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
55-
if err != nil {
56-
return nil, err
59+
60+
switch uri.Scheme {
61+
case "binary":
62+
ns, err := namespaces.NamespaceRequired(ctx)
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
cmd := runtime.NewBinaryCmd(uri, id, ns)
68+
69+
// Create pipe to be used by logging binary for Stdout
70+
out, err := runtime.NewPipe()
71+
if err != nil {
72+
return nil, errors.Wrap(err, "failed to create stdout pipes")
73+
}
74+
75+
// Stderr is created for logging binary but unused when terminal is true
76+
serr, err := runtime.NewPipe()
77+
if err != nil {
78+
return nil, errors.Wrap(err, "failed to create stderr pipes")
79+
}
80+
81+
r, w, err := os.Pipe()
82+
if err != nil {
83+
return nil, err
84+
}
85+
86+
cmd.ExtraFiles = append(cmd.ExtraFiles, out.R, serr.R, w)
87+
88+
wg.Add(1)
89+
cwg.Add(1)
90+
go func() {
91+
cwg.Done()
92+
io.Copy(out.W, console)
93+
out.W.Close()
94+
wg.Done()
95+
}()
96+
97+
if err := cmd.Start(); err != nil {
98+
return nil, errors.Wrap(err, "failed to start logging binary process")
99+
}
100+
101+
// Close our side of the pipe after start
102+
if err := w.Close(); err != nil {
103+
return nil, errors.Wrap(err, "failed to close write pipe after start")
104+
}
105+
106+
// Wait for the logging binary to be ready
107+
b := make([]byte, 1)
108+
if _, err := r.Read(b); err != nil && err != io.EOF {
109+
return nil, errors.Wrap(err, "failed to read from logging binary")
110+
}
111+
112+
default:
113+
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
114+
if err != nil {
115+
return nil, err
116+
}
117+
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
118+
if err != nil {
119+
return nil, err
120+
}
121+
wg.Add(1)
122+
cwg.Add(1)
123+
go func() {
124+
cwg.Done()
125+
p := bufPool.Get().(*[]byte)
126+
defer bufPool.Put(p)
127+
io.CopyBuffer(outw, console, *p)
128+
outw.Close()
129+
outr.Close()
130+
wg.Done()
131+
}()
132+
cwg.Wait()
57133
}
58-
wg.Add(1)
59-
cwg.Add(1)
60-
go func() {
61-
cwg.Done()
62-
p := bufPool.Get().(*[]byte)
63-
defer bufPool.Put(p)
64-
65-
io.CopyBuffer(outw, console, *p)
66-
console.Close()
67-
outr.Close()
68-
outw.Close()
69-
wg.Done()
70-
}()
71-
cwg.Wait()
72134
return console, nil
73135
}
74136

0 commit comments

Comments
 (0)
X Tutup