X Tutup
Skip to content

Commit d88de4a

Browse files
committed
content: change Writer/ReaderAt to take OCI
This change allows implementations to resolve the location of the actual data using OCI descriptor fields such as MediaType. No OCI descriptor field is written to the store. No change on gRPC API. Signed-off-by: Akihiro Suda <suda.akihiro@lab.ntt.co.jp>
1 parent e4ad710 commit d88de4a

File tree

32 files changed

+279
-169
lines changed

32 files changed

+279
-169
lines changed

client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func TestImagePullAllPlatforms(t *testing.T) {
214214
}
215215
// check if childless data type has blob in content store
216216
for _, desc := range children {
217-
ra, err := cs.ReaderAt(ctx, desc.Digest)
217+
ra, err := cs.ReaderAt(ctx, desc)
218218
if err != nil {
219219
t.Fatal(err)
220220
}
@@ -275,7 +275,7 @@ func TestImagePullSomePlatforms(t *testing.T) {
275275

276276
// check if childless data type has blob in content store
277277
for _, desc := range children {
278-
ra, err := cs.ReaderAt(ctx, desc.Digest)
278+
ra, err := cs.ReaderAt(ctx, desc)
279279
if err != nil {
280280
t.Fatal(err)
281281
}

cmd/ctr/commands/content/content.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ var (
7272
}
7373
defer cancel()
7474
cs := client.ContentStore()
75-
ra, err := cs.ReaderAt(ctx, dgst)
75+
ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
7676
if err != nil {
7777
return err
7878
}
@@ -121,7 +121,7 @@ var (
121121
// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
122122
// all data to be written in a single invocation. Allow multiple writes
123123
// to the same transaction key followed by a commit.
124-
return content.WriteBlob(ctx, cs, ref, os.Stdin, expectedSize, expectedDigest)
124+
return content.WriteBlob(ctx, cs, ref, os.Stdin, ocispec.Descriptor{Size: expectedSize, Digest: expectedDigest})
125125
},
126126
}
127127

@@ -314,7 +314,7 @@ var (
314314
}
315315
defer cancel()
316316
cs := client.ContentStore()
317-
ra, err := cs.ReaderAt(ctx, dgst)
317+
ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
318318
if err != nil {
319319
return err
320320
}
@@ -326,7 +326,7 @@ var (
326326
}
327327
defer nrc.Close()
328328

329-
wr, err := cs.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key?
329+
wr, err := cs.Writer(ctx, content.WithRef("edit-"+object)) // TODO(stevvooe): Choose a better key?
330330
if err != nil {
331331
return err
332332
}
@@ -482,7 +482,7 @@ var (
482482
Size: info.Size,
483483
}
484484

485-
ra, err := cs.ReaderAt(ctx, dgst)
485+
ra, err := cs.ReaderAt(ctx, desc)
486486
if err != nil {
487487
return err
488488
}

cmd/ctr/commands/snapshots/snapshots.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ var diffCommand = cli.Command{
167167
}
168168
}
169169

170-
ra, err := client.ContentStore().ReaderAt(ctx, desc.Digest)
170+
ra, err := client.ContentStore().ReaderAt(ctx, desc)
171171
if err != nil {
172172
return err
173173
}

container_opts_unix.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ import (
3636
"github.com/containerd/containerd/runtime/linux/runctypes"
3737
"github.com/gogo/protobuf/proto"
3838
protobuf "github.com/gogo/protobuf/types"
39-
digest "github.com/opencontainers/go-digest"
4039
"github.com/opencontainers/image-spec/identity"
4140
"github.com/opencontainers/image-spec/specs-go/v1"
41+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
4242
"github.com/pkg/errors"
4343
)
4444

@@ -50,10 +50,9 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts {
5050
return func(ctx context.Context, client *Client, c *containers.Container) error {
5151
var (
5252
desc = im.Target()
53-
id = desc.Digest
5453
store = client.ContentStore()
5554
)
56-
index, err := decodeIndex(ctx, store, id)
55+
index, err := decodeIndex(ctx, store, desc)
5756
if err != nil {
5857
return err
5958
}
@@ -80,7 +79,7 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts {
8079
}
8180
c.Image = index.Annotations["image.name"]
8281
case images.MediaTypeContainerd1CheckpointConfig:
83-
data, err := content.ReadBlob(ctx, store, m.Digest)
82+
data, err := content.ReadBlob(ctx, store, m)
8483
if err != nil {
8584
return errors.Wrap(err, "unable to read checkpoint config")
8685
}
@@ -113,7 +112,7 @@ func WithTaskCheckpoint(im Image) NewTaskOpts {
113112
return func(ctx context.Context, c *Client, info *TaskInfo) error {
114113
desc := im.Target()
115114
id := desc.Digest
116-
index, err := decodeIndex(ctx, c.ContentStore(), id)
115+
index, err := decodeIndex(ctx, c.ContentStore(), desc)
117116
if err != nil {
118117
return err
119118
}
@@ -131,9 +130,9 @@ func WithTaskCheckpoint(im Image) NewTaskOpts {
131130
}
132131
}
133132

134-
func decodeIndex(ctx context.Context, store content.Provider, id digest.Digest) (*v1.Index, error) {
133+
func decodeIndex(ctx context.Context, store content.Provider, desc ocispec.Descriptor) (*v1.Index, error) {
135134
var index v1.Index
136-
p, err := content.ReadBlob(ctx, store, id)
135+
p, err := content.ReadBlob(ctx, store, desc)
137136
if err != nil {
138137
return nil, err
139138
}

content/content.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
"github.com/opencontainers/go-digest"
25+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2526
)
2627

2728
// ReaderAt extends the standard io.ReaderAt interface with reporting of Size and io.Closer
@@ -33,12 +34,16 @@ type ReaderAt interface {
3334

3435
// Provider provides a reader interface for specific content
3536
type Provider interface {
36-
ReaderAt(ctx context.Context, dgst digest.Digest) (ReaderAt, error)
37+
// ReaderAt only requires desc.Digest to be set.
38+
// Other fields in the descriptor may be used internally for resolving
39+
// the location of the actual data.
40+
ReaderAt(ctx context.Context, dec ocispec.Descriptor) (ReaderAt, error)
3741
}
3842

3943
// Ingester writes content
4044
type Ingester interface {
41-
Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error)
45+
// Some implementations require WithRef to be included in opts.
46+
Writer(ctx context.Context, opts ...WriterOpt) (Writer, error)
4247
}
4348

4449
// Info holds content specific information
@@ -142,3 +147,33 @@ func WithLabels(labels map[string]string) Opt {
142147
return nil
143148
}
144149
}
150+
151+
// WriterOpts is internally used by WriterOpt.
152+
type WriterOpts struct {
153+
Ref string
154+
Desc ocispec.Descriptor
155+
}
156+
157+
// WriterOpt is used for passing options to Ingester.Writer.
158+
type WriterOpt func(*WriterOpts) error
159+
160+
// WithDescriptor specifies an OCI descriptor.
161+
// Writer may optionally use the descriptor internally for resolving
162+
// the location of the actual data.
163+
// Write does not require any field of desc to be set.
164+
// If the data size is unknown, desc.Size should be set to 0.
165+
// Some implementations may also accept negative values as "unknown".
166+
func WithDescriptor(desc ocispec.Descriptor) WriterOpt {
167+
return func(opts *WriterOpts) error {
168+
opts.Desc = desc
169+
return nil
170+
}
171+
}
172+
173+
// WithRef specifies a ref string.
174+
func WithRef(ref string) WriterOpt {
175+
return func(opts *WriterOpts) error {
176+
opts.Ref = ref
177+
return nil
178+
}
179+
}

content/helpers.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/containerd/containerd/errdefs"
2828
"github.com/opencontainers/go-digest"
29+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2930
"github.com/pkg/errors"
3031
)
3132

@@ -45,8 +46,8 @@ func NewReader(ra ReaderAt) io.Reader {
4546
// ReadBlob retrieves the entire contents of the blob from the provider.
4647
//
4748
// Avoid using this for large blobs, such as layers.
48-
func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) {
49-
ra, err := provider.ReaderAt(ctx, dgst)
49+
func ReadBlob(ctx context.Context, provider Provider, desc ocispec.Descriptor) ([]byte, error) {
50+
ra, err := provider.ReaderAt(ctx, desc)
5051
if err != nil {
5152
return nil, err
5253
}
@@ -65,8 +66,8 @@ func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byt
6566
// This is useful when the digest and size are known beforehand.
6667
//
6768
// Copy is buffered, so no need to wrap reader in buffered io.
68-
func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
69-
cw, err := OpenWriter(ctx, cs, ref, size, expected)
69+
func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc ocispec.Descriptor, opts ...Opt) error {
70+
cw, err := OpenWriter(ctx, cs, WithRef(ref), WithDescriptor(desc))
7071
if err != nil {
7172
if !errdefs.IsAlreadyExists(err) {
7273
return err
@@ -76,19 +77,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
7677
}
7778
defer cw.Close()
7879

79-
return Copy(ctx, cw, r, size, expected, opts...)
80+
return Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
8081
}
8182

8283
// OpenWriter opens a new writer for the given reference, retrying if the writer
8384
// is locked until the reference is available or returns an error.
84-
func OpenWriter(ctx context.Context, cs Ingester, ref string, size int64, expected digest.Digest) (Writer, error) {
85+
func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, error) {
8586
var (
8687
cw Writer
8788
err error
8889
retry = 16
8990
)
9091
for {
91-
cw, err = cs.Writer(ctx, ref, size, expected)
92+
cw, err = cs.Writer(ctx, opts...)
9293
if err != nil {
9394
if !errdefs.IsUnavailable(err) {
9495
return nil, err

content/local/store.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/containerd/containerd/filters"
3535
"github.com/containerd/containerd/log"
3636
digest "github.com/opencontainers/go-digest"
37+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3738
"github.com/pkg/errors"
3839
)
3940

@@ -119,15 +120,15 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]strin
119120
}
120121

121122
// ReaderAt returns an io.ReaderAt for the blob.
122-
func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
123-
p := s.blobPath(dgst)
123+
func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
124+
p := s.blobPath(desc.Digest)
124125
fi, err := os.Stat(p)
125126
if err != nil {
126127
if !os.IsNotExist(err) {
127128
return nil, err
128129
}
129130

130-
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p)
131+
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
131132
}
132133

133134
fp, err := os.Open(p)
@@ -136,7 +137,7 @@ func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.Reade
136137
return nil, err
137138
}
138139

139-
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p)
140+
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
140141
}
141142

142143
return sizeReaderAt{size: fi.Size(), fp: fp}, nil
@@ -400,11 +401,22 @@ func (s *store) total(ingestPath string) int64 {
400401
// ref at a time.
401402
//
402403
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
403-
func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
404+
func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
405+
var wOpts content.WriterOpts
406+
for _, opt := range opts {
407+
if err := opt(&wOpts); err != nil {
408+
return nil, err
409+
}
410+
}
411+
// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
412+
// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
413+
if wOpts.Ref == "" {
414+
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
415+
}
404416
var lockErr error
405417
for count := uint64(0); count < 10; count++ {
406418
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count)))
407-
if err := tryLock(ref); err != nil {
419+
if err := tryLock(wOpts.Ref); err != nil {
408420
if !errdefs.IsUnavailable(err) {
409421
return nil, err
410422
}
@@ -420,9 +432,9 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
420432
return nil, lockErr
421433
}
422434

423-
w, err := s.writer(ctx, ref, total, expected)
435+
w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
424436
if err != nil {
425-
unlock(ref)
437+
unlock(wOpts.Ref)
426438
return nil, err
427439
}
428440

content/local/store_test.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/containerd/containerd/pkg/testutil"
4040
"github.com/gotestyourself/gotestyourself/assert"
4141
"github.com/opencontainers/go-digest"
42+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
4243
)
4344

4445
type memoryLabelStore struct {
@@ -108,7 +109,7 @@ func TestContentWriter(t *testing.T) {
108109
t.Fatal("ingest dir should be created", err)
109110
}
110111

111-
cw, err := cs.Writer(ctx, "myref", 0, "")
112+
cw, err := cs.Writer(ctx, content.WithRef("myref"))
112113
if err != nil {
113114
t.Fatal(err)
114115
}
@@ -117,13 +118,13 @@ func TestContentWriter(t *testing.T) {
117118
}
118119

119120
// reopen, so we can test things
120-
cw, err = cs.Writer(ctx, "myref", 0, "")
121+
cw, err = cs.Writer(ctx, content.WithRef("myref"))
121122
if err != nil {
122123
t.Fatal(err)
123124
}
124125

125126
// make sure that second resume also fails
126-
if _, err = cs.Writer(ctx, "myref", 0, ""); err == nil {
127+
if _, err = cs.Writer(ctx, content.WithRef("myref")); err == nil {
127128
// TODO(stevvooe): This also works across processes. Need to find a way
128129
// to test that, as well.
129130
t.Fatal("no error on second resume")
@@ -166,7 +167,7 @@ func TestContentWriter(t *testing.T) {
166167
t.Fatal(err)
167168
}
168169

169-
cw, err = cs.Writer(ctx, "aref", 0, "")
170+
cw, err = cs.Writer(ctx, content.WithRef("aref"))
170171
if err != nil {
171172
t.Fatal(err)
172173
}
@@ -346,7 +347,8 @@ func checkBlobPath(t *testing.T, cs content.Store, dgst digest.Digest) string {
346347
}
347348

348349
func checkWrite(ctx context.Context, t checker, cs content.Store, dgst digest.Digest, p []byte) digest.Digest {
349-
if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p), int64(len(p)), dgst); err != nil {
350+
if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p),
351+
ocispec.Descriptor{Size: int64(len(p)), Digest: dgst}); err != nil {
350352
t.Fatal(err)
351353
}
352354

@@ -365,25 +367,25 @@ func TestWriterTruncateRecoversFromIncompleteWrite(t *testing.T) {
365367
defer cancel()
366368

367369
ref := "ref"
368-
content := []byte("this is the content")
369-
total := int64(len(content))
370+
contentB := []byte("this is the content")
371+
total := int64(len(contentB))
370372
setupIncompleteWrite(ctx, t, cs, ref, total)
371373

372-
writer, err := cs.Writer(ctx, ref, total, "")
374+
writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
373375
assert.NilError(t, err)
374376

375377
assert.NilError(t, writer.Truncate(0))
376378

377-
_, err = writer.Write(content)
379+
_, err = writer.Write(contentB)
378380
assert.NilError(t, err)
379381

380-
dgst := digest.FromBytes(content)
382+
dgst := digest.FromBytes(contentB)
381383
err = writer.Commit(ctx, total, dgst)
382384
assert.NilError(t, err)
383385
}
384386

385387
func setupIncompleteWrite(ctx context.Context, t *testing.T, cs content.Store, ref string, total int64) {
386-
writer, err := cs.Writer(ctx, ref, total, "")
388+
writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
387389
assert.NilError(t, err)
388390

389391
_, err = writer.Write([]byte("bad data"))

0 commit comments

Comments
 (0)
X Tutup