X Tutup
Skip to content

Commit 00596f4

Browse files
committed
Add gc policy plugin
Add garbage collection as a background process and policy configuration for configuring when to run garbage collection. By default garbage collection will run when deletion occurs and no more than 20ms out of every second. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
1 parent 1bd39d3 commit 00596f4

File tree

10 files changed

+616
-50
lines changed

10 files changed

+616
-50
lines changed

cmd/containerd/builtins.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
// register containerd builtins here
44
import (
55
_ "github.com/containerd/containerd/diff/walking"
6+
_ "github.com/containerd/containerd/gc/policy"
67
_ "github.com/containerd/containerd/services/containers"
78
_ "github.com/containerd/containerd/services/content"
89
_ "github.com/containerd/containerd/services/diff"

gc/scheduler/scheduler.go

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
8+
"time"
9+
10+
"github.com/containerd/containerd/log"
11+
"github.com/containerd/containerd/metadata"
12+
"github.com/containerd/containerd/plugin"
13+
)
14+
15+
// Config configures the garbage collection policies.
16+
type Config struct {
17+
// PauseThreshold represents the maximum amount of time garbage
18+
// collection should be scheduled based on the average pause time.
19+
// For example, a value of 0.02 means that scheduled garbage collection
20+
// pauses should present at most 2% of real time,
21+
// or 20ms of every second.
22+
//
23+
// A maximum value of .5 is enforced to prevent over scheduling of the
24+
// garbage collector, trigger options are available to run in a more
25+
// predictable time frame after mutation.
26+
//
27+
// Default is 0.02
28+
PauseThreshold float64 `toml:"pause_threshold"`
29+
30+
// DeletionThreshold is used to guarantee that a garbage collection is
31+
// scheduled after configured number of deletions have occurred
32+
// since the previous garbage collection. A value of 0 indicates that
33+
// garbage collection will not be triggered by deletion count.
34+
//
35+
// Default 0
36+
DeletionThreshold int `toml:"deletion_threshold"`
37+
38+
// MutationThreshold is used to guarantee that a garbage collection is
39+
// run after a configured number of database mutations have occurred
40+
// since the previous garbage collection. A value of 0 indicates that
41+
// garbage collection will only be run after a manual trigger or
42+
// deletion. Unlike the deletion threshold, the mutation threshold does
43+
// not cause scheduling of a garbage collection, but ensures GC is run
44+
// at the next scheduled GC.
45+
//
46+
// Default 100
47+
MutationThreshold int `toml:"mutation_threshold"`
48+
49+
// ScheduleDelayMs is the number of milliseconds in the future to
50+
// schedule a garbage collection triggered manually or by exceeding
51+
// the configured threshold for deletion or mutation. A zero value
52+
// will immediately schedule.
53+
//
54+
// Default is 0
55+
ScheduleDelayMs int `toml:"schedule_delay_ms"`
56+
57+
// StartupDelayMs is the number of milliseconds to do an initial
58+
// garbage collection after startup. The initial garbage collection
59+
// is used to set the base for pause threshold and should be scheduled
60+
// in the future to avoid slowing down other startup processes.
61+
//
62+
// Default is 100
63+
StartupDelayMs int `toml:"startup_delay_ms"`
64+
}
65+
66+
func init() {
67+
plugin.Register(&plugin.Registration{
68+
Type: plugin.GCPlugin,
69+
ID: "scheduler",
70+
Requires: []plugin.Type{
71+
plugin.MetadataPlugin,
72+
},
73+
Config: &Config{
74+
PauseThreshold: 0.02,
75+
DeletionThreshold: 0,
76+
MutationThreshold: 100,
77+
ScheduleDelayMs: 0,
78+
StartupDelayMs: 100,
79+
},
80+
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
81+
md, err := ic.Get(plugin.MetadataPlugin)
82+
if err != nil {
83+
return nil, err
84+
}
85+
86+
m := newScheduler(md.(*metadata.DB), ic.Config.(*Config))
87+
88+
ic.Meta.Exports = map[string]string{
89+
"PauseThreshold": fmt.Sprint(m.pauseThreshold),
90+
"DeletionThreshold": fmt.Sprint(m.deletionThreshold),
91+
"MutationThreshold": fmt.Sprint(m.mutationThreshold),
92+
"ScheduleDelay": fmt.Sprint(m.scheduleDelay),
93+
}
94+
95+
go m.run(ic.Context)
96+
97+
return m, nil
98+
},
99+
})
100+
}
101+
102+
type mutationEvent struct {
103+
ts time.Time
104+
mutation bool
105+
dirty bool
106+
}
107+
108+
type collector interface {
109+
RegisterMutationCallback(func(bool))
110+
GarbageCollect(context.Context) (metadata.GCStats, error)
111+
}
112+
113+
type gcScheduler struct {
114+
c collector
115+
116+
eventC chan mutationEvent
117+
118+
waiterL sync.Mutex
119+
waiters []chan metadata.GCStats
120+
121+
pauseThreshold float64
122+
deletionThreshold int
123+
mutationThreshold int
124+
scheduleDelay time.Duration
125+
startupDelay time.Duration
126+
}
127+
128+
func newScheduler(c collector, cfg *Config) *gcScheduler {
129+
eventC := make(chan mutationEvent)
130+
131+
s := &gcScheduler{
132+
c: c,
133+
eventC: eventC,
134+
pauseThreshold: cfg.PauseThreshold,
135+
deletionThreshold: cfg.DeletionThreshold,
136+
mutationThreshold: cfg.MutationThreshold,
137+
scheduleDelay: time.Duration(cfg.ScheduleDelayMs) * time.Millisecond,
138+
startupDelay: time.Duration(cfg.StartupDelayMs) * time.Millisecond,
139+
}
140+
141+
if s.pauseThreshold < 0.0 {
142+
s.pauseThreshold = 0.0
143+
}
144+
if s.pauseThreshold > 0.5 {
145+
s.pauseThreshold = 0.5
146+
}
147+
if s.mutationThreshold < 0 {
148+
s.mutationThreshold = 0
149+
}
150+
if s.scheduleDelay < 0 {
151+
s.scheduleDelay = 0
152+
}
153+
if s.startupDelay < 0 {
154+
s.startupDelay = 0
155+
}
156+
157+
c.RegisterMutationCallback(s.mutationCallback)
158+
159+
return s
160+
}
161+
162+
func (s *gcScheduler) ScheduleAndWait(ctx context.Context) (metadata.GCStats, error) {
163+
return s.wait(ctx, true)
164+
}
165+
166+
func (s *gcScheduler) wait(ctx context.Context, trigger bool) (metadata.GCStats, error) {
167+
wc := make(chan metadata.GCStats, 1)
168+
s.waiterL.Lock()
169+
s.waiters = append(s.waiters, wc)
170+
s.waiterL.Unlock()
171+
172+
if trigger {
173+
e := mutationEvent{
174+
ts: time.Now(),
175+
}
176+
go func() {
177+
s.eventC <- e
178+
}()
179+
}
180+
181+
var gcStats metadata.GCStats
182+
select {
183+
case stats, ok := <-wc:
184+
if !ok {
185+
return metadata.GCStats{}, errors.New("gc failed")
186+
}
187+
gcStats = stats
188+
case <-ctx.Done():
189+
return metadata.GCStats{}, ctx.Err()
190+
}
191+
192+
return gcStats, nil
193+
}
194+
195+
func (s *gcScheduler) mutationCallback(dirty bool) {
196+
e := mutationEvent{
197+
ts: time.Now(),
198+
mutation: true,
199+
dirty: dirty,
200+
}
201+
go func() {
202+
s.eventC <- e
203+
}()
204+
}
205+
206+
func schedule(d time.Duration) (<-chan time.Time, *time.Time) {
207+
next := time.Now().Add(d)
208+
return time.After(d), &next
209+
}
210+
211+
func (s *gcScheduler) run(ctx context.Context) {
212+
var (
213+
schedC <-chan time.Time
214+
215+
lastCollection *time.Time
216+
nextCollection *time.Time
217+
218+
interval = time.Second
219+
gcTime time.Duration
220+
collections int
221+
222+
triggered bool
223+
deletions int
224+
mutations int
225+
)
226+
if s.startupDelay > 0 {
227+
schedC, nextCollection = schedule(s.startupDelay)
228+
}
229+
for {
230+
select {
231+
case <-schedC:
232+
// Check if garbage collection can be skipped because
233+
// it is not needed or was not requested and reschedule
234+
// it to attempt again after another time interval.
235+
if !triggered && lastCollection != nil && deletions == 0 &&
236+
(s.mutationThreshold == 0 || mutations < s.mutationThreshold) {
237+
schedC, nextCollection = schedule(interval)
238+
continue
239+
}
240+
break
241+
case e := <-s.eventC:
242+
if lastCollection != nil && lastCollection.After(e.ts) {
243+
continue
244+
}
245+
if e.dirty {
246+
deletions++
247+
}
248+
if e.mutation {
249+
mutations++
250+
} else {
251+
triggered = true
252+
}
253+
254+
// Check if condition should cause immediate collection.
255+
if triggered ||
256+
(s.deletionThreshold > 0 && deletions >= s.deletionThreshold) ||
257+
(nextCollection == nil && ((s.deletionThreshold == 0 && deletions > 0) ||
258+
(s.mutationThreshold > 0 && mutations >= s.mutationThreshold))) {
259+
// Check if not already scheduled before delay threshold
260+
if nextCollection == nil || nextCollection.After(time.Now().Add(s.scheduleDelay)) {
261+
schedC, nextCollection = schedule(s.scheduleDelay)
262+
}
263+
}
264+
265+
continue
266+
case <-ctx.Done():
267+
return
268+
}
269+
270+
s.waiterL.Lock()
271+
272+
stats, err := s.c.GarbageCollect(ctx)
273+
last := time.Now()
274+
if err != nil {
275+
log.G(ctx).WithError(err).Error("garbage collection failed")
276+
277+
// Reschedule garbage collection for same duration + 1 second
278+
schedC, nextCollection = schedule(nextCollection.Sub(*lastCollection) + time.Second)
279+
280+
// Update last collection time even though failure occured
281+
lastCollection = &last
282+
283+
for _, w := range s.waiters {
284+
close(w)
285+
}
286+
s.waiters = nil
287+
s.waiterL.Unlock()
288+
continue
289+
}
290+
291+
log.G(ctx).WithField("d", stats.MetaD).Debug("garbage collected")
292+
293+
gcTime += stats.MetaD
294+
collections++
295+
triggered = false
296+
deletions = 0
297+
mutations = 0
298+
299+
// Calculate new interval with updated times
300+
if s.pauseThreshold > 0.0 {
301+
// Set interval to average gc time divided by the pause threshold
302+
// This algorithm ensures that a gc is scheduled to allow enough
303+
// runtime in between gc to reach the pause threshold.
304+
// Pause threshold is always 0.0 < threshold <= 0.5
305+
avg := float64(gcTime) / float64(collections)
306+
interval = time.Duration(avg/s.pauseThreshold - avg)
307+
}
308+
309+
lastCollection = &last
310+
schedC, nextCollection = schedule(interval)
311+
312+
for _, w := range s.waiters {
313+
w <- stats
314+
}
315+
s.waiters = nil
316+
s.waiterL.Unlock()
317+
}
318+
}

0 commit comments

Comments
 (0)
X Tutup