X Close
Skip to content

Commit 536afb8

Browse files
authored
Pipeline ocsp-updater work (letsencrypt#5687)
* Pipeline ocsp-updater work. Creates a three-stage pipeline for concurrent ocsp-update work. `findStaleOCSPResponses` sends query results on a channel to `processExpired`, which marks expired certs and sends the stale statuses on a channel to `generateOCSPResponses`, which already concurrently signs and stores new responses. Two new stats are introduced, `mark_expired` and `find_stale_ocsp`, which give visibility into the number and status of those calls to the database.
1 parent 99502b1 commit 536afb8

File tree

2 files changed

+265
-117
lines changed

2 files changed

+265
-117
lines changed

cmd/ocsp-updater/main.go

Lines changed: 164 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"os"
1010
"strings"
11+
"sync"
1112
"time"
1213

1314
"github.com/go-sql-driver/mysql"
@@ -39,6 +40,30 @@ type ocspDb interface {
3940
Exec(query string, args ...interface{}) (sql.Result, error)
4041
}
4142

43+
// failCounter provides a concurrent safe counter.
44+
type failCounter struct {
45+
mu sync.Mutex
46+
count int
47+
}
48+
49+
func (c *failCounter) Add(i int) {
50+
c.mu.Lock()
51+
defer c.mu.Unlock()
52+
c.count += i
53+
}
54+
55+
func (c *failCounter) Reset() {
56+
c.mu.Lock()
57+
defer c.mu.Unlock()
58+
c.count = 0
59+
}
60+
61+
func (c *failCounter) Value() int {
62+
c.mu.Lock()
63+
defer c.mu.Unlock()
64+
return c.count
65+
}
66+
4267
// OCSPUpdater contains the useful objects for the Updater
4368
type OCSPUpdater struct {
4469
log blog.Logger
@@ -55,7 +80,7 @@ type OCSPUpdater struct {
5580

5681
maxBackoff time.Duration
5782
backoffFactor float64
58-
tickFailures int
83+
readFailures failCounter
5984

6085
serialSuffixes []string
6186
queryBody string
@@ -66,10 +91,12 @@ type OCSPUpdater struct {
6691
// these requests in parallel allows us to get higher total throughput.
6792
parallelGenerateOCSPRequests int
6893

69-
stalenessHistogram prometheus.Histogram
70-
genStoreHistogram prometheus.Histogram
71-
generatedCounter *prometheus.CounterVec
72-
storedCounter *prometheus.CounterVec
94+
stalenessHistogram prometheus.Histogram
95+
genStoreHistogram prometheus.Histogram
96+
generatedCounter *prometheus.CounterVec
97+
storedCounter *prometheus.CounterVec
98+
markExpiredCounter *prometheus.CounterVec
99+
findStaleOCSPCounter *prometheus.CounterVec
73100
}
74101

75102
func newUpdater(
@@ -118,12 +145,12 @@ func newUpdater(
118145
stats.MustRegister(genStoreHistogram)
119146
generatedCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
120147
Name: "ocsp_updater_generated",
121-
Help: "A counter of OCSP response generation calls labelled by result",
148+
Help: "A counter of OCSP response generation calls labeled by result",
122149
}, []string{"result"})
123150
stats.MustRegister(generatedCounter)
124151
storedCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
125152
Name: "ocsp_updater_stored",
126-
Help: "A counter of OCSP response storage calls labelled by result",
153+
Help: "A counter of OCSP response storage calls labeled by result",
127154
}, []string{"result"})
128155
stats.MustRegister(storedCounter)
129156
tickHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
@@ -138,6 +165,16 @@ func newUpdater(
138165
Buckets: []float64{10, 100, 1000, 10000, 21600, 32400, 36000, 39600, 43200, 54000, 64800, 75600, 86400, 108000, 129600, 172800},
139166
})
140167
stats.MustRegister(stalenessHistogram)
168+
markExpiredCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
169+
Name: "mark_expired",
170+
Help: "A counter of mark expired calls labeled by result",
171+
}, []string{"result"})
172+
stats.MustRegister(markExpiredCounter)
173+
findStaleOCSPCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
174+
Name: "find_stale_ocsp",
175+
Help: "A counter of query for stale OCSP responses labeled by result",
176+
}, []string{"result"})
177+
stats.MustRegister(findStaleOCSPCounter)
141178

142179
updater := OCSPUpdater{
143180
clk: clk,
@@ -150,6 +187,8 @@ func newUpdater(
150187
genStoreHistogram: genStoreHistogram,
151188
generatedCounter: generatedCounter,
152189
storedCounter: storedCounter,
190+
markExpiredCounter: markExpiredCounter,
191+
findStaleOCSPCounter: findStaleOCSPCounter,
153192
stalenessHistogram: stalenessHistogram,
154193
tickHistogram: tickHistogram,
155194
tickWindow: config.OldOCSPWindow.Duration,
@@ -167,7 +206,13 @@ func getQuestionsForShardList(count int) string {
167206
return strings.TrimRight(strings.Repeat("?,", count), ",")
168207
}
169208

170-
func (updater *OCSPUpdater) findStaleOCSPResponses(oldestLastUpdatedTime time.Time, batchSize int) ([]core.CertificateStatus, error) {
209+
// findStaleOCSPResponses sends a goroutine to fetch rows of stale OCSP
210+
// responses from the database and returns results on a channel.
211+
func (updater *OCSPUpdater) findStaleOCSPResponses(ctx context.Context, oldestLastUpdatedTime time.Time, batchSize int) <-chan core.CertificateStatus {
212+
// staleStatusesOut channel contains all stale ocsp responses that need
213+
// updating.
214+
staleStatusesOut := make(chan core.CertificateStatus)
215+
171216
args := make([]interface{}, 0)
172217
args = append(args, oldestLastUpdatedTime)
173218

@@ -177,41 +222,63 @@ func (updater *OCSPUpdater) findStaleOCSPResponses(oldestLastUpdatedTime time.Ti
177222
}
178223
args = append(args, batchSize)
179224

180-
rows, err := updater.readOnlyDb.Query(
181-
fmt.Sprintf(
182-
"SELECT %s FROM certificateStatus %s",
183-
strings.Join(sa.CertStatusMetadataFields(), ","),
184-
updater.queryBody,
185-
),
186-
args...,
187-
)
188-
if err != nil {
189-
return nil, err
190-
}
225+
go func() {
226+
defer close(staleStatusesOut)
191227

192-
var statuses []core.CertificateStatus
193-
for rows.Next() {
194-
var status core.CertificateStatus
195-
err := sa.ScanCertStatusRow(rows, &status)
228+
rows, err := updater.readOnlyDb.Query(
229+
fmt.Sprintf(
230+
"SELECT %s FROM certificateStatus %s",
231+
strings.Join(sa.CertStatusMetadataFields(), ","),
232+
updater.queryBody,
233+
),
234+
args...,
235+
)
236+
237+
// If error, log and increment retries for backoff. Else no
238+
// error, proceed to push statuses to channel.
196239
if err != nil {
197-
rows.Close()
198-
return nil, err
240+
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
241+
updater.findStaleOCSPCounter.WithLabelValues("failed").Inc()
242+
updater.readFailures.Add(1)
243+
return
199244
}
200-
statuses = append(statuses, status)
201245

202-
updater.stalenessHistogram.Observe(
203-
oldestLastUpdatedTime.Sub(status.OCSPLastUpdated).Seconds(),
204-
)
205-
}
206-
// Ensure the query wasn't interrupted before it could complete.
207-
err = rows.Close()
208-
if err != nil {
209-
return nil, err
210-
}
246+
for rows.Next() {
247+
var status core.CertificateStatus
248+
err := sa.ScanCertStatusRow(rows, &status)
249+
if err != nil {
250+
rows.Close()
251+
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
252+
updater.findStaleOCSPCounter.WithLabelValues("failed").Inc()
253+
updater.readFailures.Add(1)
254+
return
255+
}
256+
staleness := oldestLastUpdatedTime.Sub(status.OCSPLastUpdated).Seconds()
257+
updater.stalenessHistogram.Observe(staleness)
258+
select {
259+
case <-ctx.Done():
260+
return
261+
case staleStatusesOut <- status:
262+
}
263+
}
264+
// Ensure the query wasn't interrupted before it could complete.
265+
err = rows.Close()
266+
if err != nil {
267+
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
268+
updater.findStaleOCSPCounter.WithLabelValues("failed").Inc()
269+
updater.readFailures.Add(1)
270+
return
271+
}
211272

212-
return statuses, err
273+
updater.findStaleOCSPCounter.WithLabelValues("success").Inc()
274+
updater.readFailures.Reset()
275+
}()
276+
277+
return staleStatusesOut
213278
}
214279

280+
// generateResponse signs an new OCSP response for a given
281+
// `core.CertificateStatus` entry.
215282
func (updater *OCSPUpdater) generateResponse(ctx context.Context, status core.CertificateStatus) (*core.CertificateStatus, error) {
216283
if status.IssuerID == 0 {
217284
return nil, errors.New("cert status has 0 IssuerID")
@@ -235,6 +302,7 @@ func (updater *OCSPUpdater) generateResponse(ctx context.Context, status core.Ce
235302
return &status, nil
236303
}
237304

305+
// storeResponse stores a given CertificateStatus in the database.
238306
func (updater *OCSPUpdater) storeResponse(status *core.CertificateStatus) error {
239307
// Update the certificateStatus table with the new OCSP response, the status
240308
// WHERE is used make sure we don't overwrite a revoked response with a one
@@ -263,7 +331,39 @@ func (updater *OCSPUpdater) markExpired(status core.CertificateStatus) error {
263331
return err
264332
}
265333

266-
func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, statuses []core.CertificateStatus) error {
334+
// processExpired is a pipeline step to process a channel of
335+
// `core.CertificateStatus` and set `isExpired` in the database.
336+
func (updater *OCSPUpdater) processExpired(ctx context.Context, staleStatusesIn <-chan core.CertificateStatus) <-chan core.CertificateStatus {
337+
tickStart := updater.clk.Now()
338+
staleStatusesOut := make(chan core.CertificateStatus)
339+
go func() {
340+
defer close(staleStatusesOut)
341+
for status := range staleStatusesIn {
342+
if !status.IsExpired && tickStart.After(status.NotAfter) {
343+
err := updater.markExpired(status)
344+
if err != nil {
345+
// Update error counters and log
346+
updater.log.AuditErrf("Failed to set certificate expired: %s", err)
347+
updater.markExpiredCounter.WithLabelValues("failed").Inc()
348+
} else {
349+
updater.markExpiredCounter.WithLabelValues("success").Inc()
350+
}
351+
}
352+
select {
353+
case <-ctx.Done():
354+
return
355+
case staleStatusesOut <- status:
356+
}
357+
}
358+
}()
359+
360+
return staleStatusesOut
361+
}
362+
363+
// generateOCSPResponses is the final stage of a pipeline. It takes a
364+
// channel of `core.CertificateStatus` and sends a goroutine for each to
365+
// obtain a new OCSP response and update the status in the database.
366+
func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, staleStatusesIn <-chan core.CertificateStatus) {
267367
// Use the semaphore pattern from
268368
// https://github.com/golang/go/wiki/BoundingResourceUse to send a number of
269369
// GenerateOCSP / storeResponse requests in parallel, while limiting the total number of
@@ -278,15 +378,19 @@ func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, statuses
278378
updater.genStoreHistogram.Observe(time.Since(start).Seconds())
279379
}
280380

381+
// Work runs as a goroutine per ocsp response to obtain a new ocsp
382+
// response and store it in the database.
281383
work := func(status core.CertificateStatus) {
282384
defer done(updater.clk.Now())
385+
283386
meta, err := updater.generateResponse(ctx, status)
284387
if err != nil {
285388
updater.log.AuditErrf("Failed to generate OCSP response: %s", err)
286389
updater.generatedCounter.WithLabelValues("failed").Inc()
287390
return
288391
}
289392
updater.generatedCounter.WithLabelValues("success").Inc()
393+
290394
err = updater.storeResponse(meta)
291395
if err != nil {
292396
updater.log.AuditErrf("Failed to store OCSP response: %s", err)
@@ -296,38 +400,18 @@ func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, statuses
296400
updater.storedCounter.WithLabelValues("success").Inc()
297401
}
298402

299-
for _, status := range statuses {
403+
// Consume the stale statuses channel and send off a sign/store request
404+
// for each stale response.
405+
for status := range staleStatusesIn {
300406
wait()
301407
go work(status)
302408
}
303-
// Block until the channel reaches its full capacity again, indicating each
304-
// goroutine has completed.
409+
410+
// Block until the sem channel reaches its full capacity again,
411+
// indicating each goroutine has completed.
305412
for i := 0; i < updater.parallelGenerateOCSPRequests; i++ {
306413
wait()
307414
}
308-
return nil
309-
}
310-
311-
// updateOCSPResponses looks for certificates with stale OCSP responses and
312-
// generates/stores new ones
313-
func (updater *OCSPUpdater) updateOCSPResponses(ctx context.Context, batchSize int) error {
314-
tickStart := updater.clk.Now()
315-
statuses, err := updater.findStaleOCSPResponses(tickStart.Add(-updater.ocspMinTimeToExpiry), batchSize)
316-
if err != nil {
317-
updater.log.AuditErrf("Failed to find stale OCSP responses: %s", err)
318-
return err
319-
}
320-
321-
for _, s := range statuses {
322-
if !s.IsExpired && tickStart.After(s.NotAfter) {
323-
err := updater.markExpired(s)
324-
if err != nil {
325-
return err
326-
}
327-
}
328-
}
329-
330-
return updater.generateOCSPResponses(ctx, statuses)
331415
}
332416

333417
type config struct {
@@ -362,25 +446,35 @@ type OCSPUpdaterConfig struct {
362446

363447
func (updater *OCSPUpdater) tick() {
364448
start := updater.clk.Now()
365-
err := updater.updateOCSPResponses(context.Background(), updater.batchSize)
449+
450+
ctx, cancel := context.WithCancel(context.Background())
451+
defer cancel()
452+
453+
oldestLastUpdatedTime := updater.clk.Now().Add(-updater.ocspMinTimeToExpiry)
454+
455+
// Run pipeline
456+
updater.generateOCSPResponses(ctx, updater.processExpired(ctx, updater.findStaleOCSPResponses(ctx, oldestLastUpdatedTime, updater.batchSize)))
457+
366458
end := updater.clk.Now()
367459
took := end.Sub(start)
368460
long, state := "false", "success"
369461
if took > updater.tickWindow {
370462
long = "true"
371463
}
464+
465+
// Set sleep duration to the configured tickWindow.
372466
sleepDur := start.Add(updater.tickWindow).Sub(end)
373-
if err != nil {
374-
state = "failed"
375-
updater.tickFailures++
467+
468+
// Set sleep duration higher to backoff starting the next tick and
469+
// reading from the database if the last read failed.
470+
readFails := updater.readFailures.Value()
471+
if readFails > 0 {
376472
sleepDur = core.RetryBackoff(
377-
updater.tickFailures,
473+
readFails,
378474
updater.tickWindow,
379475
updater.maxBackoff,
380476
updater.backoffFactor,
381477
)
382-
} else if updater.tickFailures > 0 {
383-
updater.tickFailures = 0
384478
}
385479
updater.tickHistogram.WithLabelValues(state, long).Observe(took.Seconds())
386480
updater.clk.Sleep(sleepDur)
@@ -498,7 +592,6 @@ func main() {
498592
cmd.FailOnError(err, "Failed to create updater")
499593

500594
go cmd.CatchSignals(logger, nil)
501-
502595
for {
503596
updater.tick()
504597
}

0 commit comments

Comments (0)
X Close