X Tutup
Skip to content

Commit 8558b98

Browse files
authored
Merge pull request containerd#1582 from dmcgowan/metadata-db-object
Create metadata db object
2 parents 7c4bca5 + 7f657ce commit 8558b98

File tree

22 files changed

+543
-102
lines changed

22 files changed

+543
-102
lines changed

differ/differ.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"os"
77
"strings"
88

9-
"github.com/boltdb/bolt"
109
"github.com/containerd/containerd/archive"
1110
"github.com/containerd/containerd/archive/compression"
1211
"github.com/containerd/containerd/content"
@@ -26,19 +25,14 @@ func init() {
2625
Type: plugin.DiffPlugin,
2726
ID: "walking",
2827
Requires: []plugin.Type{
29-
plugin.ContentPlugin,
3028
plugin.MetadataPlugin,
3129
},
3230
Init: func(ic *plugin.InitContext) (interface{}, error) {
33-
c, err := ic.Get(plugin.ContentPlugin)
34-
if err != nil {
35-
return nil, err
36-
}
3731
md, err := ic.Get(plugin.MetadataPlugin)
3832
if err != nil {
3933
return nil, err
4034
}
41-
return NewWalkingDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store)))
35+
return NewWalkingDiff(md.(*metadata.DB).ContentStore())
4236
},
4337
})
4438
}

linux/runtime.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
112112
state: ic.State,
113113
monitor: monitor.(runtime.TaskMonitor),
114114
tasks: runtime.NewTaskList(),
115-
db: m.(*bolt.DB),
115+
db: m.(*metadata.DB),
116116
address: ic.Address,
117117
events: ic.Events,
118118
config: cfg,
@@ -138,7 +138,7 @@ type Runtime struct {
138138

139139
monitor runtime.TaskMonitor
140140
tasks *runtime.TaskList
141-
db *bolt.DB
141+
db *metadata.DB
142142
events *events.Exchange
143143

144144
config *Config

metadata/bolt.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@ func WithTransactionContext(ctx context.Context, tx *bolt.Tx) context.Context {
1717
return context.WithValue(ctx, transactionKey{}, tx)
1818
}
1919

20+
type transactor interface {
21+
View(fn func(*bolt.Tx) error) error
22+
Update(fn func(*bolt.Tx) error) error
23+
}
24+
2025
// view gets a bolt db transaction either from the context
2126
// or starts a new one with the provided bolt database.
22-
func view(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error {
27+
func view(ctx context.Context, db transactor, fn func(*bolt.Tx) error) error {
2328
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
2429
if !ok {
2530
return db.View(fn)
@@ -29,7 +34,7 @@ func view(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error {
2934

3035
// update gets a writable bolt db transaction either from the context
3136
// or starts a new one with the provided bolt database.
32-
func update(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error {
37+
func update(ctx context.Context, db transactor, fn func(*bolt.Tx) error) error {
3338
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
3439
if !ok {
3540
return db.Update(fn)

metadata/buckets.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import (
2828
// key: object-specific key identifying the storage bucket for the objects
2929
// contents.
3030
var (
31-
bucketKeyVersion = []byte("v1")
31+
bucketKeyVersion = []byte(schemaVersion)
32+
bucketKeyDBVersion = []byte("version") // stores the version of the schema
3233
bucketKeyObjectLabels = []byte("labels") // stores the labels for a namespace.
3334
bucketKeyObjectIndexes = []byte("indexes") // reserved
3435
bucketKeyObjectImages = []byte("images") // stores image objects
@@ -45,6 +46,7 @@ var (
4546
bucketKeyRuntime = []byte("runtime")
4647
bucketKeyName = []byte("name")
4748
bucketKeyParent = []byte("parent")
49+
bucketKeyChildren = []byte("children")
4850
bucketKeyOptions = []byte("options")
4951
bucketKeySpec = []byte("spec")
5052
bucketKeySnapshotKey = []byte("snapshotKey")

metadata/containers_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os"
88
"path/filepath"
99
"reflect"
10+
"strings"
1011
"testing"
1112
"time"
1213

@@ -701,7 +702,7 @@ func testEnv(t *testing.T) (context.Context, *bolt.DB, func()) {
701702
ctx, cancel := context.WithCancel(context.Background())
702703
ctx = namespaces.WithNamespace(ctx, "testing")
703704

704-
dirname, err := ioutil.TempDir("", t.Name()+"-")
705+
dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-")
705706
if err != nil {
706707
t.Fatal(err)
707708
}

metadata/content.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ import (
1919

2020
type contentStore struct {
2121
content.Store
22-
db *bolt.DB
22+
db transactor
2323
}
2424

25-
// NewContentStore returns a namespaced content store using an existing
25+
// newContentStore returns a namespaced content store using an existing
2626
// content store interface.
27-
func NewContentStore(db *bolt.DB, cs content.Store) content.Store {
27+
func newContentStore(db transactor, cs content.Store) content.Store {
2828
return &contentStore{
2929
Store: cs,
3030
db: db,
@@ -353,7 +353,7 @@ type namespacedWriter struct {
353353
content.Writer
354354
ref string
355355
namespace string
356-
db *bolt.DB
356+
db transactor
357357
}
358358

359359
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
@@ -406,7 +406,7 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
406406

407407
commitTime := time.Now().UTC()
408408

409-
sizeEncoded, err := encodeSize(size)
409+
sizeEncoded, err := encodeInt(size)
410410
if err != nil {
411411
return err
412412
}
@@ -488,7 +488,7 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
488488
}
489489

490490
// Write size
491-
sizeEncoded, err := encodeSize(info.Size)
491+
sizeEncoded, err := encodeInt(info.Size)
492492
if err != nil {
493493
return err
494494
}

metadata/content_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func createContentStore(ctx context.Context, root string) (content.Store, func()
2323
return nil, nil, err
2424
}
2525

26-
return NewContentStore(db, cs), func() error {
26+
return NewDB(db, cs, nil).ContentStore(), func() error {
2727
return db.Close()
2828
}, nil
2929
}

metadata/db.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package metadata
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"time"
7+
8+
"github.com/boltdb/bolt"
9+
"github.com/containerd/containerd/content"
10+
"github.com/containerd/containerd/log"
11+
"github.com/containerd/containerd/snapshot"
12+
"github.com/pkg/errors"
13+
)
14+
15+
const (
16+
// schemaVersion represents the schema version of
17+
// the database. This schema version represents the
18+
// structure of the data in the database. The schema
19+
// can envolve at any time but any backwards
20+
// incompatible changes or structural changes require
21+
// bumping the schema version.
22+
schemaVersion = "v1"
23+
24+
// dbVersion represents updates to the schema
25+
// version which are additions and compatible with
26+
// prior version of the same schema.
27+
dbVersion = 1
28+
)
29+
30+
type DB struct {
31+
db *bolt.DB
32+
ss map[string]snapshot.Snapshotter
33+
cs content.Store
34+
}
35+
36+
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB {
37+
return &DB{
38+
db: db,
39+
ss: ss,
40+
cs: cs,
41+
}
42+
}
43+
44+
func (m *DB) Init(ctx context.Context) error {
45+
// errSkip is used when no migration or version needs to be written
46+
// to the database and the transaction can be immediately rolled
47+
// back rather than performing a much slower and unnecessary commit.
48+
var errSkip = errors.New("skip update")
49+
50+
err := m.db.Update(func(tx *bolt.Tx) error {
51+
var (
52+
// current schema and version
53+
schema = "v0"
54+
version = 0
55+
)
56+
57+
i := len(migrations)
58+
for ; i > 0; i-- {
59+
migration := migrations[i-1]
60+
61+
bkt := tx.Bucket([]byte(migration.schema))
62+
if bkt == nil {
63+
// Hasn't encountered another schema, go to next migration
64+
if schema == "v0" {
65+
continue
66+
}
67+
break
68+
}
69+
if schema == "v0" {
70+
schema = migration.schema
71+
vb := bkt.Get(bucketKeyDBVersion)
72+
if vb != nil {
73+
v, _ := binary.Varint(vb)
74+
version = int(v)
75+
}
76+
}
77+
78+
if version >= migration.version {
79+
break
80+
}
81+
}
82+
83+
// Previous version fo database found
84+
if schema != "v0" {
85+
updates := migrations[i:]
86+
87+
// No migration updates, return immediately
88+
if len(updates) == 0 {
89+
return errSkip
90+
}
91+
92+
for _, m := range updates {
93+
t0 := time.Now()
94+
if err := m.migrate(tx); err != nil {
95+
return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version)
96+
}
97+
log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("database migration to %s.%d finished", m.schema, m.version)
98+
}
99+
}
100+
101+
bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
102+
if err != nil {
103+
return err
104+
}
105+
106+
versionEncoded, err := encodeInt(dbVersion)
107+
if err != nil {
108+
return err
109+
}
110+
111+
return bkt.Put(bucketKeyDBVersion, versionEncoded)
112+
})
113+
if err == errSkip {
114+
err = nil
115+
}
116+
return err
117+
}
118+
119+
func (m *DB) ContentStore() content.Store {
120+
if m.cs == nil {
121+
return nil
122+
}
123+
return newContentStore(m, m.cs)
124+
}
125+
126+
func (m *DB) Snapshotter(name string) snapshot.Snapshotter {
127+
sn, ok := m.ss[name]
128+
if !ok {
129+
return nil
130+
}
131+
return newSnapshotter(m, name, sn)
132+
}
133+
134+
func (m *DB) View(fn func(*bolt.Tx) error) error {
135+
return m.db.View(fn)
136+
}
137+
138+
func (m *DB) Update(fn func(*bolt.Tx) error) error {
139+
return m.db.Update(fn)
140+
}

0 commit comments

Comments
 (0)
X Tutup