@@ -13,14 +13,17 @@ import (
1313 "strings"
1414 "syscall"
1515
16+ gocontext "golang.org/x/net/context"
1617 "google.golang.org/grpc"
1718
18- "github.com/Sirupsen/logrus"
1919 "github.com/docker/containerd"
2020 api "github.com/docker/containerd/api/execution"
21+ "github.com/docker/containerd/events"
2122 "github.com/docker/containerd/execution"
2223 "github.com/docker/containerd/execution/executors/oci"
24+ "github.com/docker/containerd/log"
2325 metrics "github.com/docker/go-metrics"
26+ "github.com/sirupsen/logrus"
2427 "github.com/urfave/cli"
2528
2629 "github.com/nats-io/go-nats"
@@ -85,22 +88,10 @@ high performance container runtime
8588 go serveMetrics (address )
8689 }
8790
88- eventsURL , err := url . Parse (context . GlobalString ( "events-address" ) )
91+ s , err := startNATSServer (context )
8992 if err != nil {
90- return err
91- }
92-
93- no := stand .DefaultNatsServerOptions
94- nOpts := & no
95- nOpts .NoSigs = true
96- parts := strings .Split (eventsURL .Host , ":" )
97- nOpts .Host = parts [0 ]
98- if len (parts ) == 2 {
99- nOpts .Port , err = strconv .Atoi (parts [1 ])
100- } else {
101- nOpts .Port = nats .DefaultPort
93+ return nil
10294 }
103- s := stand .RunServerWithOpts (nil , nOpts )
10495 defer s .Shutdown ()
10596
10697 path := context .GlobalString ("socket" )
@@ -121,24 +112,31 @@ high performance container runtime
121112 }
122113 }
123114
124- // Start events listener
125- nc , err := nats . Connect (context . GlobalString ( "events-address" ) )
115+ // Get events publisher
116+ nec , err := getNATSPublisher (context )
126117 if err != nil {
127118 return err
128119 }
129- nec , err := nats .NewEncodedConn (nc , nats .JSON_ENCODER )
130- if err != nil {
131- nc .Close ()
132- return err
133- }
134120 defer nec .Close ()
135121
136- execService , err := execution .New (executor , nec )
122+ execService , err := execution .New (executor )
137123 if err != nil {
138124 return err
139125 }
140126
141- server := grpc .NewServer ()
127+ // Intercept the GRPC call in order to populate the correct module path
128+ interceptor := func (ctx gocontext.Context , req interface {}, info * grpc.UnaryServerInfo , handler grpc.UnaryHandler ) (interface {}, error ) {
129+ ctx = log .WithModule (ctx , "containerd" )
130+ switch info .Server .(type ) {
131+ case api.ExecutionServiceServer :
132+ ctx = log .WithModule (ctx , "execution" )
133+ ctx = events .WithPoster (ctx , events .GetNATSPoster (nec ))
134+ default :
135+ fmt .Println ("Unknown type: %#v" , info .Server )
136+ }
137+ return handler (ctx , req )
138+ }
139+ server := grpc .NewServer (grpc .UnaryInterceptor (interceptor ))
142140 api .RegisterExecutionServiceServer (server , execService )
143141 go serveGRPC (server , l )
144142
@@ -201,3 +199,48 @@ func dumpStacks() {
201199 buf = buf [:stackSize ]
202200 logrus .Infof ("=== BEGIN goroutine stack dump ===\n %s\n === END goroutine stack dump ===" , buf )
203201}
202+
203+ func startNATSServer (context * cli.Context ) (e * stand.StanServer , err error ) {
204+ eventsURL , err := url .Parse (context .GlobalString ("events-address" ))
205+ if err != nil {
206+ return nil , err
207+ }
208+
209+ no := stand .DefaultNatsServerOptions
210+ nOpts := & no
211+ nOpts .NoSigs = true
212+ parts := strings .Split (eventsURL .Host , ":" )
213+ nOpts .Host = parts [0 ]
214+ if len (parts ) == 2 {
215+ nOpts .Port , err = strconv .Atoi (parts [1 ])
216+ } else {
217+ nOpts .Port = nats .DefaultPort
218+ }
219+ defer func () {
220+ if r := recover (); r != nil {
221+ e = nil
222+ if _ , ok := r .(error ); ! ok {
223+ err = fmt .Errorf ("failed to start NATS server: %v" , r )
224+ } else {
225+ err = r .(error )
226+ }
227+ }
228+ }()
229+ s := stand .RunServerWithOpts (nil , nOpts )
230+
231+ return s , nil
232+ }
233+
234+ func getNATSPublisher (context * cli.Context ) (* nats.EncodedConn , error ) {
235+ nc , err := nats .Connect (context .GlobalString ("events-address" ))
236+ if err != nil {
237+ return nil , err
238+ }
239+ nec , err := nats .NewEncodedConn (nc , nats .JSON_ENCODER )
240+ if err != nil {
241+ nc .Close ()
242+ return nil , err
243+ }
244+
245+ return nec , nil
246+ }
0 commit comments