X Tutup
Skip to content

Commit aa5ff88

Browse files
committed
Integrate NATS with event subsystem
Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
1 parent 934940a commit aa5ff88

File tree

10 files changed

+165
-68
lines changed

10 files changed

+165
-68
lines changed

cmd/containerd/main.go

Lines changed: 67 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@ import (
1313
"strings"
1414
"syscall"
1515

16+
gocontext "golang.org/x/net/context"
1617
"google.golang.org/grpc"
1718

18-
"github.com/Sirupsen/logrus"
1919
"github.com/docker/containerd"
2020
api "github.com/docker/containerd/api/execution"
21+
"github.com/docker/containerd/events"
2122
"github.com/docker/containerd/execution"
2223
"github.com/docker/containerd/execution/executors/oci"
24+
"github.com/docker/containerd/log"
2325
metrics "github.com/docker/go-metrics"
26+
"github.com/sirupsen/logrus"
2427
"github.com/urfave/cli"
2528

2629
"github.com/nats-io/go-nats"
@@ -85,22 +88,10 @@ high performance container runtime
8588
go serveMetrics(address)
8689
}
8790

88-
eventsURL, err := url.Parse(context.GlobalString("events-address"))
91+
s, err := startNATSServer(context)
8992
if err != nil {
90-
return err
91-
}
92-
93-
no := stand.DefaultNatsServerOptions
94-
nOpts := &no
95-
nOpts.NoSigs = true
96-
parts := strings.Split(eventsURL.Host, ":")
97-
nOpts.Host = parts[0]
98-
if len(parts) == 2 {
99-
nOpts.Port, err = strconv.Atoi(parts[1])
100-
} else {
101-
nOpts.Port = nats.DefaultPort
93+
return nil
10294
}
103-
s := stand.RunServerWithOpts(nil, nOpts)
10495
defer s.Shutdown()
10596

10697
path := context.GlobalString("socket")
@@ -121,24 +112,31 @@ high performance container runtime
121112
}
122113
}
123114

124-
// Start events listener
125-
nc, err := nats.Connect(context.GlobalString("events-address"))
115+
// Get events publisher
116+
nec, err := getNATSPublisher(context)
126117
if err != nil {
127118
return err
128119
}
129-
nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
130-
if err != nil {
131-
nc.Close()
132-
return err
133-
}
134120
defer nec.Close()
135121

136-
execService, err := execution.New(executor, nec)
122+
execService, err := execution.New(executor)
137123
if err != nil {
138124
return err
139125
}
140126

141-
server := grpc.NewServer()
127+
// Intercept the GRPC call in order to populate the correct module path
128+
interceptor := func(ctx gocontext.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
129+
ctx = log.WithModule(ctx, "containerd")
130+
switch info.Server.(type) {
131+
case api.ExecutionServiceServer:
132+
ctx = log.WithModule(ctx, "execution")
133+
ctx = events.WithPoster(ctx, events.GetNATSPoster(nec))
134+
default:
135+
fmt.Println("Unknown type: %#v", info.Server)
136+
}
137+
return handler(ctx, req)
138+
}
139+
server := grpc.NewServer(grpc.UnaryInterceptor(interceptor))
142140
api.RegisterExecutionServiceServer(server, execService)
143141
go serveGRPC(server, l)
144142

@@ -201,3 +199,48 @@ func dumpStacks() {
201199
buf = buf[:stackSize]
202200
logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
203201
}
202+
203+
func startNATSServer(context *cli.Context) (e *stand.StanServer, err error) {
204+
eventsURL, err := url.Parse(context.GlobalString("events-address"))
205+
if err != nil {
206+
return nil, err
207+
}
208+
209+
no := stand.DefaultNatsServerOptions
210+
nOpts := &no
211+
nOpts.NoSigs = true
212+
parts := strings.Split(eventsURL.Host, ":")
213+
nOpts.Host = parts[0]
214+
if len(parts) == 2 {
215+
nOpts.Port, err = strconv.Atoi(parts[1])
216+
} else {
217+
nOpts.Port = nats.DefaultPort
218+
}
219+
defer func() {
220+
if r := recover(); r != nil {
221+
e = nil
222+
if _, ok := r.(error); !ok {
223+
err = fmt.Errorf("failed to start NATS server: %v", r)
224+
} else {
225+
err = r.(error)
226+
}
227+
}
228+
}()
229+
s := stand.RunServerWithOpts(nil, nOpts)
230+
231+
return s, nil
232+
}
233+
234+
func getNATSPublisher(context *cli.Context) (*nats.EncodedConn, error) {
235+
nc, err := nats.Connect(context.GlobalString("events-address"))
236+
if err != nil {
237+
return nil, err
238+
}
239+
nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
240+
if err != nil {
241+
nc.Close()
242+
return nil, err
243+
}
244+
245+
return nec, nil
246+
}

cmd/ctr/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"fmt"
55
"os"
66

7-
"github.com/Sirupsen/logrus"
87
"github.com/docker/containerd"
8+
"github.com/sirupsen/logrus"
99
"github.com/urfave/cli"
1010
)
1111

events/nats.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"strings"
6+
7+
"github.com/docker/containerd/log"
8+
nats "github.com/nats-io/go-nats"
9+
)
10+
11+
type natsPoster struct {
12+
nec *nats.EncodedConn
13+
}
14+
15+
func GetNATSPoster(nec *nats.EncodedConn) Poster {
16+
return &natsPoster{nec}
17+
}
18+
19+
func (p *natsPoster) Post(ctx context.Context, e Event) {
20+
subject := strings.Replace(log.GetModulePath(ctx), "/", ".", -1)
21+
topic := getTopic(ctx)
22+
if topic != "" {
23+
subject = strings.Join([]string{subject, topic}, ".")
24+
}
25+
26+
if subject == "" {
27+
log.GetLogger(ctx).WithField("event", e).Warn("unable to post event, subject is empty")
28+
}
29+
30+
p.nec.Publish(subject, e)
31+
}

events/poster.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package events
33
import (
44
"context"
55

6-
"github.com/Sirupsen/logrus"
76
"github.com/docker/containerd/log"
7+
"github.com/sirupsen/logrus"
88
)
99

1010
var (
@@ -13,21 +13,25 @@ var (
1313

1414
// Poster posts the event.
1515
type Poster interface {
16-
Post(event Event)
16+
Post(ctx context.Context, event Event)
1717
}
1818

1919
type posterKey struct{}
2020

21+
func WithPoster(ctx context.Context, poster Poster) context.Context {
22+
return context.WithValue(ctx, posterKey{}, poster)
23+
}
24+
2125
func GetPoster(ctx context.Context) Poster {
22-
poster := ctx.Value(ctx)
26+
poster := ctx.Value(posterKey{})
2327
if poster == nil {
2428
logger := log.G(ctx)
2529
tx, _ := getTx(ctx)
2630
topic := getTopic(ctx)
2731

2832
// likely means we don't have a configured event system. Just return
2933
// the default poster, which merely logs events.
30-
return posterFunc(func(event Event) {
34+
return posterFunc(func(ctx context.Context, event Event) {
3135
fields := logrus.Fields{"event": event}
3236

3337
if topic != "" {
@@ -48,8 +52,8 @@ func GetPoster(ctx context.Context) Poster {
4852
return poster.(Poster)
4953
}
5054

51-
type posterFunc func(event Event)
55+
type posterFunc func(ctx context.Context, event Event)
5256

53-
func (fn posterFunc) Post(event Event) {
54-
fn(event)
57+
func (fn posterFunc) Post(ctx context.Context, event Event) {
58+
fn(ctx, event)
5559
}

events/transaction.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,26 @@ func nexttxID() int64 {
1818
}
1919

2020
type transaction struct {
21+
ctx context.Context
2122
id int64
2223
parent *transaction // if nil, no parent transaction
2324
finish sync.Once
2425
start, end time.Time // informational
2526
}
2627

2728
// begin creates a sub-transaction.
28-
func (tx *transaction) begin(poster Poster) *transaction {
29+
func (tx *transaction) begin(ctx context.Context, poster Poster) *transaction {
2930
id := nexttxID()
3031

3132
child := &transaction{
33+
ctx: ctx,
3234
id: id,
3335
parent: tx,
3436
start: time.Now(),
3537
}
3638

3739
// post the transaction started event
38-
poster.Post(child.makeTransactionEvent("begin")) // tranactions are really just events
40+
poster.Post(ctx, child.makeTransactionEvent("begin")) // tranactions are really just events
3941

4042
return child
4143
}
@@ -44,7 +46,7 @@ func (tx *transaction) begin(poster Poster) *transaction {
4446
func (tx *transaction) commit(poster Poster) {
4547
tx.finish.Do(func() {
4648
tx.end = time.Now()
47-
poster.Post(tx.makeTransactionEvent("commit"))
49+
poster.Post(tx.ctx, tx.makeTransactionEvent("commit"))
4850
})
4951
}
5052

@@ -53,7 +55,7 @@ func (tx *transaction) rollback(poster Poster, cause error) {
5355
tx.end = time.Now()
5456
event := tx.makeTransactionEvent("rollback")
5557
event = fmt.Sprintf("%s error=%q", event, cause.Error())
56-
poster.Post(event)
58+
poster.Post(tx.ctx, event)
5759
})
5860
}
5961

@@ -84,7 +86,7 @@ func getTx(ctx context.Context) (*transaction, bool) {
8486
func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) {
8587
poster := G(pctx)
8688
parent, _ := getTx(pctx)
87-
tx := parent.begin(poster)
89+
tx := parent.begin(pctx, poster)
8890

8991
return context.WithValue(pctx, txKey{}, tx), func() {
9092
tx.commit(poster)

execution/events.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package execution
22

3+
import "time"
4+
35
type ContainerEvent struct {
4-
ID string
5-
Action string
6+
Timestamp time.Time
7+
ID string
8+
Action string
69
}
710

811
type ContainerExitEvent struct {
@@ -16,6 +19,6 @@ const (
1619
)
1720

1821
const (
19-
containerEventsSubjectFormat = "containerd.execution.container.%s"
20-
containerProcessEventsSubjectFormat = "containerd.execution.container.%s.%s"
22+
containerEventsTopicFormat = "container.%s"
23+
containerProcessEventsTopicFormat = "container.%s.%s"
2124
)

execution/executors/oci/oci.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import (
1212
"github.com/docker/containerd/execution"
1313
)
1414

15-
var ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string")
15+
var (
16+
ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string")
17+
)
1618

1719
func New(root string) (*OCIRuntime, error) {
1820
err := SetSubreaper(1)

execution/log.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package execution
2+
3+
import (
4+
"context"
5+
6+
"github.com/docker/containerd/log"
7+
"github.com/sirupsen/logrus"
8+
)
9+
10+
var ctx context.Context
11+
12+
func GetLogger(module string) *logrus.Entry {
13+
if ctx == nil {
14+
ctx = log.WithModule(context.Background(), "execution")
15+
}
16+
17+
subCtx := log.WithModule(ctx, module)
18+
return log.GetLogger(subCtx)
19+
}

0 commit comments

Comments
 (0)
X Tutup