X Tutup
Skip to content

Commit a70b95b

Browse files
committed
Move events exchange into subpackage
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
1 parent 526d15b commit a70b95b

File tree

7 files changed

+33
-31
lines changed

7 files changed

+33
-31
lines changed
Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
package events
1+
package exchange
22

33
import (
44
"context"
55
"strings"
66
"time"
77

8-
events "github.com/containerd/containerd/api/services/events/v1"
8+
v1 "github.com/containerd/containerd/api/services/events/v1"
99
"github.com/containerd/containerd/errdefs"
10+
"github.com/containerd/containerd/events"
1011
"github.com/containerd/containerd/filters"
1112
"github.com/containerd/containerd/identifiers"
1213
"github.com/containerd/containerd/log"
@@ -34,7 +35,7 @@ func NewExchange() *Exchange {
3435
//
3536
// This is useful when an event is forwaded on behalf of another namespace or
3637
// when the event is propagated on behalf of another publisher.
37-
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
38+
func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) {
3839
if err := validateEnvelope(envelope); err != nil {
3940
return err
4041
}
@@ -59,11 +60,11 @@ func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err
5960
// Publish packages and sends an event. The caller will be considered the
6061
// initial publisher of the event. This means the timestamp will be calculated
6162
// at this point and this method may read from the calling context.
62-
func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) {
63+
func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) {
6364
var (
6465
namespace string
6566
encoded *types.Any
66-
envelope events.Envelope
67+
envelope v1.Envelope
6768
)
6869

6970
namespace, err = namespaces.NamespaceRequired(ctx)
@@ -108,9 +109,9 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err
108109
// Zero or more filters may be provided as strings. Only events that match
109110
// *any* of the provided filters will be sent on the channel. The filters use
110111
// the standard containerd filters package syntax.
111-
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
112+
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) {
112113
var (
113-
evch = make(chan *events.Envelope)
114+
evch = make(chan *v1.Envelope)
114115
errq = make(chan error, 1)
115116
channel = goevents.NewChannel(0)
116117
queue = goevents.NewQueue(channel)
@@ -150,7 +151,7 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even
150151
for {
151152
select {
152153
case ev := <-channel.C:
153-
env, ok := ev.(*events.Envelope)
154+
env, ok := ev.(*v1.Envelope)
154155
if !ok {
155156
// TODO(stevvooe): For the most part, we are well protected
156157
// from this condition. Both Forward and Publish protect
@@ -204,7 +205,7 @@ func validateTopic(topic string) error {
204205
return nil
205206
}
206207

207-
func validateEnvelope(envelope *events.Envelope) error {
208+
func validateEnvelope(envelope *v1.Envelope) error {
208209
if err := namespaces.Validate(envelope.Namespace); err != nil {
209210
return errors.Wrapf(err, "event envelope has invalid namespace")
210211
}
Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package events
1+
package exchange
22

33
import (
44
"context"
@@ -8,19 +8,20 @@ import (
88
"testing"
99
"time"
1010

11-
events "github.com/containerd/containerd/api/services/events/v1"
11+
v1 "github.com/containerd/containerd/api/services/events/v1"
1212
"github.com/containerd/containerd/errdefs"
13+
"github.com/containerd/containerd/events"
1314
"github.com/containerd/containerd/namespaces"
1415
"github.com/containerd/typeurl"
1516
"github.com/pkg/errors"
1617
)
1718

1819
func TestExchangeBasic(t *testing.T) {
1920
ctx := namespaces.WithNamespace(context.Background(), t.Name())
20-
testevents := []Event{
21-
&events.ContainerCreate{ID: "asdf"},
22-
&events.ContainerCreate{ID: "qwer"},
23-
&events.ContainerCreate{ID: "zxcv"},
21+
testevents := []events.Event{
22+
&v1.ContainerCreate{ID: "asdf"},
23+
&v1.ContainerCreate{ID: "qwer"},
24+
&v1.ContainerCreate{ID: "zxcv"},
2425
}
2526
exchange := NewExchange()
2627

@@ -55,7 +56,7 @@ func TestExchangeBasic(t *testing.T) {
5556
wg.Wait()
5657

5758
for _, subscriber := range []struct {
58-
eventq <-chan *events.Envelope
59+
eventq <-chan *v1.Envelope
5960
errq <-chan error
6061
cancel func()
6162
}{
@@ -79,7 +80,7 @@ func TestExchangeBasic(t *testing.T) {
7980
if err != nil {
8081
t.Fatal(err)
8182
}
82-
received = append(received, ev.(*events.ContainerCreate))
83+
received = append(received, ev.(*v1.ContainerCreate))
8384
case err := <-subscriber.errq:
8485
if err != nil {
8586
t.Fatal(err)
@@ -117,7 +118,7 @@ func TestExchangeValidateTopic(t *testing.T) {
117118
},
118119
} {
119120
t.Run(testcase.input, func(t *testing.T) {
120-
event := &events.ContainerCreate{ID: t.Name()}
121+
event := &v1.ContainerCreate{ID: t.Name()}
121122
if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err {
122123
if err == nil {
123124
t.Fatalf("expected error %v, received nil", testcase.err)
@@ -131,7 +132,7 @@ func TestExchangeValidateTopic(t *testing.T) {
131132
t.Fatal(err)
132133
}
133134

134-
envelope := events.Envelope{
135+
envelope := v1.Envelope{
135136
Timestamp: time.Now().UTC(),
136137
Namespace: namespace,
137138
Topic: testcase.input,

linux/bundle.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"os"
1010
"path/filepath"
1111

12-
"github.com/containerd/containerd/events"
12+
"github.com/containerd/containerd/events/exchange"
1313
"github.com/containerd/containerd/linux/runcopts"
1414
client "github.com/containerd/containerd/linux/shim"
1515
"github.com/pkg/errors"
@@ -82,7 +82,7 @@ func ShimRemote(shim, daemonAddress, cgroup string, nonewns, debug bool, exitHan
8282
}
8383

8484
// ShimLocal is a ShimOpt for using an in process shim implementation
85-
func ShimLocal(exchange *events.Exchange) ShimOpt {
85+
func ShimLocal(exchange *exchange.Exchange) ShimOpt {
8686
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (client.Config, client.ClientOpt) {
8787
return b.shimConfig(ns, ropts), client.WithLocal(exchange)
8888
}

linux/runtime.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"github.com/containerd/containerd/api/types"
1616
"github.com/containerd/containerd/containers"
1717
"github.com/containerd/containerd/errdefs"
18-
"github.com/containerd/containerd/events"
18+
"github.com/containerd/containerd/events/exchange"
1919
"github.com/containerd/containerd/identifiers"
2020
"github.com/containerd/containerd/linux/runcopts"
2121
client "github.com/containerd/containerd/linux/shim"
@@ -143,7 +143,7 @@ type Runtime struct {
143143
monitor runtime.TaskMonitor
144144
tasks *runtime.TaskList
145145
db *metadata.DB
146-
events *events.Exchange
146+
events *exchange.Exchange
147147

148148
config *Config
149149
}

plugin/context.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"path/filepath"
66

77
"github.com/containerd/containerd/errdefs"
8-
"github.com/containerd/containerd/events"
8+
"github.com/containerd/containerd/events/exchange"
99
"github.com/containerd/containerd/log"
1010
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
1111
"github.com/pkg/errors"
@@ -18,7 +18,7 @@ type InitContext struct {
1818
State string
1919
Config interface{}
2020
Address string
21-
Events *events.Exchange
21+
Events *exchange.Exchange
2222

2323
Meta *Meta // plugins can fill in metadata at init.
2424

server/server.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
version "github.com/containerd/containerd/api/services/version/v1"
2323
"github.com/containerd/containerd/content"
2424
"github.com/containerd/containerd/content/local"
25-
"github.com/containerd/containerd/events"
25+
"github.com/containerd/containerd/events/exchange"
2626
"github.com/containerd/containerd/log"
2727
"github.com/containerd/containerd/metadata"
2828
"github.com/containerd/containerd/plugin"
@@ -65,7 +65,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
6565
services []plugin.Service
6666
s = &Server{
6767
rpc: rpc,
68-
events: events.NewExchange(),
68+
events: exchange.NewExchange(),
6969
}
7070
initialized = plugin.NewPluginSet()
7171
)
@@ -122,7 +122,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
122122
// Server is the containerd main daemon
123123
type Server struct {
124124
rpc *grpc.Server
125-
events *events.Exchange
125+
events *exchange.Exchange
126126
}
127127

128128
// ServeGRPC provides the containerd grpc APIs on the provided listener

services/events/service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package events
33
import (
44
api "github.com/containerd/containerd/api/services/events/v1"
55
"github.com/containerd/containerd/errdefs"
6-
"github.com/containerd/containerd/events"
6+
"github.com/containerd/containerd/events/exchange"
77
"github.com/containerd/containerd/plugin"
88
"github.com/golang/protobuf/ptypes/empty"
99
"github.com/pkg/errors"
@@ -22,11 +22,11 @@ func init() {
2222
}
2323

2424
type service struct {
25-
events *events.Exchange
25+
events *exchange.Exchange
2626
}
2727

2828
// NewService returns the GRPC events server
29-
func NewService(events *events.Exchange) api.EventsServer {
29+
func NewService(events *exchange.Exchange) api.EventsServer {
3030
return &service{events: events}
3131
}
3232

0 commit comments

Comments
 (0)
X Tutup