88 "fmt"
99 "os"
1010 "strings"
11+ "sync"
1112 "time"
1213
1314 "github.com/go-sql-driver/mysql"
@@ -39,6 +40,30 @@ type ocspDb interface {
3940 Exec (query string , args ... interface {}) (sql.Result , error )
4041}
4142
// failCounter is an integer counter guarded by a mutex so that it can be
// read and modified from multiple goroutines. Its zero value is a counter
// at 0 that is ready to use.
type failCounter struct {
	mu    sync.Mutex
	count int
}

// Add adds i to the counter.
func (fc *failCounter) Add(i int) {
	fc.mu.Lock()
	fc.count += i
	fc.mu.Unlock()
}

// Reset sets the counter back to zero.
func (fc *failCounter) Reset() {
	fc.mu.Lock()
	fc.count = 0
	fc.mu.Unlock()
}

// Value returns the current value of the counter.
func (fc *failCounter) Value() int {
	fc.mu.Lock()
	current := fc.count
	fc.mu.Unlock()
	return current
}
66+
4267// OCSPUpdater contains the useful objects for the Updater
4368type OCSPUpdater struct {
4469 log blog.Logger
@@ -55,7 +80,7 @@ type OCSPUpdater struct {
5580
5681 maxBackoff time.Duration
5782 backoffFactor float64
58- tickFailures int
83+ readFailures failCounter
5984
6085 serialSuffixes []string
6186 queryBody string
@@ -66,10 +91,12 @@ type OCSPUpdater struct {
6691 // these requests in parallel allows us to get higher total throughput.
6792 parallelGenerateOCSPRequests int
6893
69- stalenessHistogram prometheus.Histogram
70- genStoreHistogram prometheus.Histogram
71- generatedCounter * prometheus.CounterVec
72- storedCounter * prometheus.CounterVec
94+ stalenessHistogram prometheus.Histogram
95+ genStoreHistogram prometheus.Histogram
96+ generatedCounter * prometheus.CounterVec
97+ storedCounter * prometheus.CounterVec
98+ markExpiredCounter * prometheus.CounterVec
99+ findStaleOCSPCounter * prometheus.CounterVec
73100}
74101
75102func newUpdater (
@@ -118,12 +145,12 @@ func newUpdater(
118145 stats .MustRegister (genStoreHistogram )
119146 generatedCounter := prometheus .NewCounterVec (prometheus.CounterOpts {
120147 Name : "ocsp_updater_generated" ,
121- Help : "A counter of OCSP response generation calls labelled by result" ,
148+ Help : "A counter of OCSP response generation calls labeled by result" ,
122149 }, []string {"result" })
123150 stats .MustRegister (generatedCounter )
124151 storedCounter := prometheus .NewCounterVec (prometheus.CounterOpts {
125152 Name : "ocsp_updater_stored" ,
126- Help : "A counter of OCSP response storage calls labelled by result" ,
153+ Help : "A counter of OCSP response storage calls labeled by result" ,
127154 }, []string {"result" })
128155 stats .MustRegister (storedCounter )
129156 tickHistogram := prometheus .NewHistogramVec (prometheus.HistogramOpts {
@@ -138,6 +165,16 @@ func newUpdater(
138165 Buckets : []float64 {10 , 100 , 1000 , 10000 , 21600 , 32400 , 36000 , 39600 , 43200 , 54000 , 64800 , 75600 , 86400 , 108000 , 129600 , 172800 },
139166 })
140167 stats .MustRegister (stalenessHistogram )
168+ markExpiredCounter := prometheus .NewCounterVec (prometheus.CounterOpts {
169+ Name : "mark_expired" ,
170+ Help : "A counter of mark expired calls labeled by result" ,
171+ }, []string {"result" })
172+ stats .MustRegister (markExpiredCounter )
173+ findStaleOCSPCounter := prometheus .NewCounterVec (prometheus.CounterOpts {
174+ Name : "find_stale_ocsp" ,
175+ Help : "A counter of query for stale OCSP responses labeled by result" ,
176+ }, []string {"result" })
177+ stats .MustRegister (findStaleOCSPCounter )
141178
142179 updater := OCSPUpdater {
143180 clk : clk ,
@@ -150,6 +187,8 @@ func newUpdater(
150187 genStoreHistogram : genStoreHistogram ,
151188 generatedCounter : generatedCounter ,
152189 storedCounter : storedCounter ,
190+ markExpiredCounter : markExpiredCounter ,
191+ findStaleOCSPCounter : findStaleOCSPCounter ,
153192 stalenessHistogram : stalenessHistogram ,
154193 tickHistogram : tickHistogram ,
155194 tickWindow : config .OldOCSPWindow .Duration ,
@@ -167,7 +206,13 @@ func getQuestionsForShardList(count int) string {
167206 return strings .TrimRight (strings .Repeat ("?," , count ), "," )
168207}
169208
170- func (updater * OCSPUpdater ) findStaleOCSPResponses (oldestLastUpdatedTime time.Time , batchSize int ) ([]core.CertificateStatus , error ) {
209+ // findStaleOCSPResponses starts a goroutine that fetches rows of stale OCSP
210+ // responses from the database and returns a channel that delivers the results.
211+ func (updater * OCSPUpdater ) findStaleOCSPResponses (ctx context.Context , oldestLastUpdatedTime time.Time , batchSize int ) <- chan core.CertificateStatus {
212+ // staleStatusesOut channel contains all stale ocsp responses that need
213+ // updating.
214+ staleStatusesOut := make (chan core.CertificateStatus )
215+
171216 args := make ([]interface {}, 0 )
172217 args = append (args , oldestLastUpdatedTime )
173218
@@ -177,41 +222,63 @@ func (updater *OCSPUpdater) findStaleOCSPResponses(oldestLastUpdatedTime time.Ti
177222 }
178223 args = append (args , batchSize )
179224
180- rows , err := updater .readOnlyDb .Query (
181- fmt .Sprintf (
182- "SELECT %s FROM certificateStatus %s" ,
183- strings .Join (sa .CertStatusMetadataFields (), "," ),
184- updater .queryBody ,
185- ),
186- args ... ,
187- )
188- if err != nil {
189- return nil , err
190- }
225+ go func () {
226+ defer close (staleStatusesOut )
191227
192- var statuses []core.CertificateStatus
193- for rows .Next () {
194- var status core.CertificateStatus
195- err := sa .ScanCertStatusRow (rows , & status )
228+ rows , err := updater .readOnlyDb .Query (
229+ fmt .Sprintf (
230+ "SELECT %s FROM certificateStatus %s" ,
231+ strings .Join (sa .CertStatusMetadataFields (), "," ),
232+ updater .queryBody ,
233+ ),
234+ args ... ,
235+ )
236+
237+ // If error, log and increment retries for backoff. Else no
238+ // error, proceed to push statuses to channel.
196239 if err != nil {
197- rows .Close ()
198- return nil , err
240+ updater .log .AuditErrf ("Failed to find stale OCSP responses: %s" , err )
241+ updater .findStaleOCSPCounter .WithLabelValues ("failed" ).Inc ()
242+ updater .readFailures .Add (1 )
243+ return
199244 }
200- statuses = append (statuses , status )
201245
202- updater .stalenessHistogram .Observe (
203- oldestLastUpdatedTime .Sub (status .OCSPLastUpdated ).Seconds (),
204- )
205- }
206- // Ensure the query wasn't interrupted before it could complete.
207- err = rows .Close ()
208- if err != nil {
209- return nil , err
210- }
246+ for rows .Next () {
247+ var status core.CertificateStatus
248+ err := sa .ScanCertStatusRow (rows , & status )
249+ if err != nil {
250+ rows .Close ()
251+ updater .log .AuditErrf ("Failed to find stale OCSP responses: %s" , err )
252+ updater .findStaleOCSPCounter .WithLabelValues ("failed" ).Inc ()
253+ updater .readFailures .Add (1 )
254+ return
255+ }
256+ staleness := oldestLastUpdatedTime .Sub (status .OCSPLastUpdated ).Seconds ()
257+ updater .stalenessHistogram .Observe (staleness )
258+ select {
259+ case <- ctx .Done ():
260+ return
261+ case staleStatusesOut <- status :
262+ }
263+ }
264+ // Ensure the query wasn't interrupted before it could complete.
265+ err = rows .Close ()
266+ if err != nil {
267+ updater .log .AuditErrf ("Failed to find stale OCSP responses: %s" , err )
268+ updater .findStaleOCSPCounter .WithLabelValues ("failed" ).Inc ()
269+ updater .readFailures .Add (1 )
270+ return
271+ }
211272
212- return statuses , err
273+ updater .findStaleOCSPCounter .WithLabelValues ("success" ).Inc ()
274+ updater .readFailures .Reset ()
275+ }()
276+
277+ return staleStatusesOut
213278}
214279
280+ // generateResponse signs a new OCSP response for a given
281+ // `core.CertificateStatus` entry.
215282func (updater * OCSPUpdater ) generateResponse (ctx context.Context , status core.CertificateStatus ) (* core.CertificateStatus , error ) {
216283 if status .IssuerID == 0 {
217284 return nil , errors .New ("cert status has 0 IssuerID" )
@@ -235,6 +302,7 @@ func (updater *OCSPUpdater) generateResponse(ctx context.Context, status core.Ce
235302 return & status , nil
236303}
237304
305+ // storeResponse stores a given CertificateStatus in the database.
238306func (updater * OCSPUpdater ) storeResponse (status * core.CertificateStatus ) error {
239307 // Update the certificateStatus table with the new OCSP response, the status
240308 // WHERE is used to make sure we don't overwrite a revoked response with a one
@@ -263,7 +331,39 @@ func (updater *OCSPUpdater) markExpired(status core.CertificateStatus) error {
263331 return err
264332}
265333
266- func (updater * OCSPUpdater ) generateOCSPResponses (ctx context.Context , statuses []core.CertificateStatus ) error {
334+ // processExpired is a pipeline step to process a channel of
335+ // `core.CertificateStatus` and set `isExpired` in the database.
336+ func (updater * OCSPUpdater ) processExpired (ctx context.Context , staleStatusesIn <- chan core.CertificateStatus ) <- chan core.CertificateStatus {
337+ tickStart := updater .clk .Now ()
338+ staleStatusesOut := make (chan core.CertificateStatus )
339+ go func () {
340+ defer close (staleStatusesOut )
341+ for status := range staleStatusesIn {
342+ if ! status .IsExpired && tickStart .After (status .NotAfter ) {
343+ err := updater .markExpired (status )
344+ if err != nil {
345+ // Update error counters and log
346+ updater .log .AuditErrf ("Failed to set certificate expired: %s" , err )
347+ updater .markExpiredCounter .WithLabelValues ("failed" ).Inc ()
348+ } else {
349+ updater .markExpiredCounter .WithLabelValues ("success" ).Inc ()
350+ }
351+ }
352+ select {
353+ case <- ctx .Done ():
354+ return
355+ case staleStatusesOut <- status :
356+ }
357+ }
358+ }()
359+
360+ return staleStatusesOut
361+ }
362+
363+ // generateOCSPResponses is the final stage of a pipeline. It takes a
364+ // channel of `core.CertificateStatus` and starts a goroutine for each status
365+ // received to obtain a new OCSP response and update the status in the database.
366+ func (updater * OCSPUpdater ) generateOCSPResponses (ctx context.Context , staleStatusesIn <- chan core.CertificateStatus ) {
267367 // Use the semaphore pattern from
268368 // https://github.com/golang/go/wiki/BoundingResourceUse to send a number of
269369 // GenerateOCSP / storeResponse requests in parallel, while limiting the total number of
@@ -278,15 +378,19 @@ func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, statuses
278378 updater .genStoreHistogram .Observe (time .Since (start ).Seconds ())
279379 }
280380
381+ // Work runs as a goroutine per ocsp response to obtain a new ocsp
382+ // response and store it in the database.
281383 work := func (status core.CertificateStatus ) {
282384 defer done (updater .clk .Now ())
385+
283386 meta , err := updater .generateResponse (ctx , status )
284387 if err != nil {
285388 updater .log .AuditErrf ("Failed to generate OCSP response: %s" , err )
286389 updater .generatedCounter .WithLabelValues ("failed" ).Inc ()
287390 return
288391 }
289392 updater .generatedCounter .WithLabelValues ("success" ).Inc ()
393+
290394 err = updater .storeResponse (meta )
291395 if err != nil {
292396 updater .log .AuditErrf ("Failed to store OCSP response: %s" , err )
@@ -296,38 +400,18 @@ func (updater *OCSPUpdater) generateOCSPResponses(ctx context.Context, statuses
296400 updater .storedCounter .WithLabelValues ("success" ).Inc ()
297401 }
298402
299- for _ , status := range statuses {
403+ // Consume the stale statuses channel and send off a sign/store request
404+ // for each stale response.
405+ for status := range staleStatusesIn {
300406 wait ()
301407 go work (status )
302408 }
303- // Block until the channel reaches its full capacity again, indicating each
304- // goroutine has completed.
409+
410+ // Block until the sem channel reaches its full capacity again,
411+ // indicating each goroutine has completed.
305412 for i := 0 ; i < updater .parallelGenerateOCSPRequests ; i ++ {
306413 wait ()
307414 }
308- return nil
309- }
310-
311- // updateOCSPResponses looks for certificates with stale OCSP responses and
312- // generates/stores new ones
313- func (updater * OCSPUpdater ) updateOCSPResponses (ctx context.Context , batchSize int ) error {
314- tickStart := updater .clk .Now ()
315- statuses , err := updater .findStaleOCSPResponses (tickStart .Add (- updater .ocspMinTimeToExpiry ), batchSize )
316- if err != nil {
317- updater .log .AuditErrf ("Failed to find stale OCSP responses: %s" , err )
318- return err
319- }
320-
321- for _ , s := range statuses {
322- if ! s .IsExpired && tickStart .After (s .NotAfter ) {
323- err := updater .markExpired (s )
324- if err != nil {
325- return err
326- }
327- }
328- }
329-
330- return updater .generateOCSPResponses (ctx , statuses )
331415}
332416
333417type config struct {
@@ -362,25 +446,35 @@ type OCSPUpdaterConfig struct {
362446
363447func (updater * OCSPUpdater ) tick () {
364448 start := updater .clk .Now ()
365- err := updater .updateOCSPResponses (context .Background (), updater .batchSize )
449+
450+ ctx , cancel := context .WithCancel (context .Background ())
451+ defer cancel ()
452+
453+ oldestLastUpdatedTime := updater .clk .Now ().Add (- updater .ocspMinTimeToExpiry )
454+
455+ // Run pipeline
456+ updater .generateOCSPResponses (ctx , updater .processExpired (ctx , updater .findStaleOCSPResponses (ctx , oldestLastUpdatedTime , updater .batchSize )))
457+
366458 end := updater .clk .Now ()
367459 took := end .Sub (start )
368460 long , state := "false" , "success"
369461 if took > updater .tickWindow {
370462 long = "true"
371463 }
464+
465+ // Set sleep duration to the configured tickWindow.
372466 sleepDur := start .Add (updater .tickWindow ).Sub (end )
373- if err != nil {
374- state = "failed"
375- updater .tickFailures ++
467+
468+ // Set sleep duration higher to backoff starting the next tick and
469+ // reading from the database if the last read failed.
470+ readFails := updater .readFailures .Value ()
471+ if readFails > 0 {
376472 sleepDur = core .RetryBackoff (
377- updater . tickFailures ,
473+ readFails ,
378474 updater .tickWindow ,
379475 updater .maxBackoff ,
380476 updater .backoffFactor ,
381477 )
382- } else if updater .tickFailures > 0 {
383- updater .tickFailures = 0
384478 }
385479 updater .tickHistogram .WithLabelValues (state , long ).Observe (took .Seconds ())
386480 updater .clk .Sleep (sleepDur )
@@ -498,7 +592,6 @@ func main() {
498592 cmd .FailOnError (err , "Failed to create updater" )
499593
500594 go cmd .CatchSignals (logger , nil )
501-
502595 for {
503596 updater .tick ()
504597 }
0 commit comments