OpenDNSSEC-signer  2.0.2
worker.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2009 NLNet Labs. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  * notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
14  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
15  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
17  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
19  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
21  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
22  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
23  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  *
25  */
26 
27 #include <time.h> /* time() */
28 
29 #include "daemon/engine.h"
30 #include "daemon/worker.h"
31 #include "duration.h"
32 #include "hsm.h"
33 #include "locks.h"
34 #include "log.h"
35 #include "status.h"
36 #include "signer/tools.h"
37 #include "signer/zone.h"
38 
39 ods_lookup_table worker_str[] = {
40  { WORKER_WORKER, "worker" },
41  { WORKER_DRUDGER, "drudger" },
42  { 0, NULL }
43 };
44 
49 static const char*
50 worker2str(worker_id type)
51 {
52  ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
53  if (lt) {
54  return lt->name;
55  }
56  return NULL;
57 }
58 
59 
65 worker_create(int num, worker_id type)
66 {
67  worker_type* worker;
68  CHECKALLOC(worker = (worker_type*) malloc(sizeof(worker_type)));
69  ods_log_debug("[%s[%i]] create", worker2str(type), num+1);
70  lock_basic_init(&worker->worker_lock);
71  lock_basic_set(&worker->worker_alarm);
72  lock_basic_lock(&worker->worker_lock);
73  worker->thread_num = num +1;
74  worker->engine = NULL;
75  worker->task = NULL;
76  worker->working_with = TASK_NONE;
77  worker->need_to_exit = 0;
78  worker->type = type;
79  worker->clock_in = 0;
80  worker->jobs_appointed = 0;
81  worker->jobs_completed = 0;
82  worker->jobs_failed = 0;
83  worker->sleeping = 0;
84  worker->waiting = 0;
85  lock_basic_unlock(&worker->worker_lock);
86  return worker;
87 }
88 
89 
94 static void
95 worker_working_with(worker_type* worker, task_id with, task_id next,
96  const char* str, const char* name, task_id* what, time_t* when)
97 {
98  worker->working_with = with;
99  ods_log_verbose("[%s[%i]] %s zone %s", worker2str(worker->type),
100  worker->thread_num, str, name);
101  *what = next;
102  *when = time_now();
103 }
104 
105 
110 static int
111 worker_fulfilled(worker_type* worker)
112 {
113  int ret = 0;
114  ret = (worker->jobs_completed + worker->jobs_failed) ==
115  worker->jobs_appointed;
116  return ret;
117 }
118 
119 
124 static void
125 worker_clear_jobs(worker_type* worker)
126 {
127  ods_log_assert(worker);
128  lock_basic_lock(&worker->worker_lock);
129  worker->jobs_appointed = 0;
130  worker->jobs_completed = 0;
131  worker->jobs_failed = 0;
132  lock_basic_unlock(&worker->worker_lock);
133 }
134 
135 
140 static void
141 worker_queue_rrset(worker_type* worker, fifoq_type* q, rrset_type* rrset)
142 {
143  ods_status status = ODS_STATUS_UNCHANGED;
144  int tries = 0;
145  ods_log_assert(worker);
146  ods_log_assert(worker->task);
147  ods_log_assert(q);
148  ods_log_assert(rrset);
149 
150  lock_basic_lock(&q->q_lock);
151  status = fifoq_push(q, (void*) rrset, worker, &tries);
152  while (status == ODS_STATUS_UNCHANGED) {
153  tries++;
154  if (worker->need_to_exit) {
155  lock_basic_unlock(&q->q_lock);
156  return;
157  }
164  lock_basic_sleep(&q->q_nonfull, &q->q_lock, 5);
165  status = fifoq_push(q, (void*) rrset, worker, &tries);
166  }
167  lock_basic_unlock(&q->q_lock);
168 
169  ods_log_assert(status == ODS_STATUS_OK);
170  lock_basic_lock(&worker->worker_lock);
171  worker->jobs_appointed += 1;
172  lock_basic_unlock(&worker->worker_lock);
173 }
174 
175 
180 static void
181 worker_queue_domain(worker_type* worker, fifoq_type* q, domain_type* domain)
182 {
183  rrset_type* rrset = NULL;
184  denial_type* denial = NULL;
185  ods_log_assert(worker);
186  ods_log_assert(q);
187  ods_log_assert(domain);
188  rrset = domain->rrsets;
189  while (rrset) {
190  worker_queue_rrset(worker, q, rrset);
191  rrset = rrset->next;
192 }
193  denial = (denial_type*) domain->denial;
194  if (denial && denial->rrset) {
195  worker_queue_rrset(worker, q, denial->rrset);
196  }
197 }
198 
199 
204 static void
205 worker_queue_zone(worker_type* worker, fifoq_type* q, zone_type* zone)
206 {
207  ldns_rbnode_t* node = LDNS_RBTREE_NULL;
208  domain_type* domain = NULL;
209  ods_log_assert(worker);
210  ods_log_assert(q);
211  ods_log_assert(zone);
212  worker_clear_jobs(worker);
213  if (!zone->db || !zone->db->domains) {
214  return;
215  }
216  if (zone->db->domains->root != LDNS_RBTREE_NULL) {
217  node = ldns_rbtree_first(zone->db->domains);
218  }
219  while (node && node != LDNS_RBTREE_NULL) {
220  domain = (domain_type*) node->data;
221  worker_queue_domain(worker, q, domain);
222  node = ldns_rbtree_next(node);
223  }
224 }
225 
226 
231 static ods_status
232 worker_check_jobs(worker_type* worker, task_type* task)
233 {
234  ods_log_assert(worker);
235  ods_log_assert(task);
236  lock_basic_lock(&worker->worker_lock);
237  if (worker->jobs_failed) {
238  ods_log_error("[%s[%i]] sign zone %s failed: %lu RRsets failed",
239  worker2str(worker->type), worker->thread_num,
240  task_who2str(task), (unsigned long)worker->jobs_failed);
241  lock_basic_unlock(&worker->worker_lock);
242  return ODS_STATUS_ERR;
243  } else if (worker->jobs_completed != worker->jobs_appointed) {
244  ods_log_error("[%s[%i]] sign zone %s failed: processed %lu of %lu "
245  "RRsets", worker2str(worker->type), worker->thread_num,
246  task_who2str(task), (unsigned long)worker->jobs_completed,
247  (unsigned long)worker->jobs_appointed);
248  lock_basic_unlock(&worker->worker_lock);
249  return ODS_STATUS_ERR;
250  } else if (worker->need_to_exit) {
251  ods_log_debug("[%s[%i]] sign zone %s failed: worker needs to exit",
252  worker2str(worker->type), worker->thread_num, task_who2str(task));
253  lock_basic_unlock(&worker->worker_lock);
254  return ODS_STATUS_ERR;
255  } else {
256  ods_log_debug("[%s[%i]] sign zone %s ok: %lu of %lu RRsets "
257  "succeeded", worker2str(worker->type), worker->thread_num,
258  task_who2str(task), (unsigned long)worker->jobs_completed,
259  (unsigned long)worker->jobs_appointed);
260  ods_log_assert(worker->jobs_appointed == worker->jobs_completed);
261  }
262  lock_basic_unlock(&worker->worker_lock);
263  return ODS_STATUS_OK;
264 }
265 
266 
271 static void
272 worker_perform_task(worker_type* worker)
273 {
274  engine_type* engine = NULL;
275  zone_type* zone = NULL;
276  task_type* task = NULL;
277  task_id what = TASK_NONE;
278  time_t when = 0;
279  time_t never = (3600*24*365);
280  ods_status status = ODS_STATUS_OK;
281  int backup = 0;
282  time_t start = 0;
283  time_t end = 0;
284 
285  if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
286  return;
287  }
288  engine = worker->engine;
289  task = (task_type*) worker->task;
290  zone = (zone_type*) worker->task->zone;
291  ods_log_debug("[%s[%i]] perform task %s for zone %s",
292  worker2str(worker->type), worker->thread_num, task_what2str(task->what),
293  task_who2str(task));
294  /* do what you have been told to do */
295  switch (task->what) {
296  case TASK_SIGNCONF:
297  /* perform 'load signconf' task */
298  worker_working_with(worker, TASK_SIGNCONF, TASK_READ,
299  "configure", task_who2str(task), &what, &when);
300  status = tools_signconf(zone);
301  if (status == ODS_STATUS_UNCHANGED) {
302  if (!zone->signconf->last_modified) {
303  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
304  worker2str(worker->type), worker->thread_num,
305  task_who2str(task));
306  status = ODS_STATUS_ERR;
307  }
308  }
309  if (status == ODS_STATUS_UNCHANGED) {
310  if (task->halted != TASK_NONE && task->halted != TASK_SIGNCONF) {
311  goto task_perform_continue;
312  }
313  status = ODS_STATUS_OK;
314  } else if (status == ODS_STATUS_OK) {
315  task->interrupt = TASK_NONE;
316  task->halted = TASK_NONE;
317  } else {
318  if (task->halted == TASK_NONE) {
319  goto task_perform_fail;
320  }
321  goto task_perform_continue;
322  }
323  /* break; */
324  case TASK_READ:
325  /* perform 'read input adapter' task */
326  worker_working_with(worker, TASK_READ, TASK_SIGN,
327  "read", task_who2str(task), &what, &when);
328  task->what = TASK_READ;
329  if (!zone->signconf->last_modified) {
330  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
331  worker2str(worker->type), worker->thread_num,
332  task_who2str(task));
333  status = ODS_STATUS_ERR;
334  } else {
335  if (hsm_check_context()) {
336  ods_log_error("signer instructed to reload due to hsm reset in read task");
337  engine->need_to_reload = 1;
338  status = ODS_STATUS_ERR;
339  } else {
340  status = tools_input(zone);
341  }
342  }
343 
344  if (status == ODS_STATUS_UNCHANGED) {
345  ods_log_verbose("[%s[%i]] zone %s unsigned data not changed, "
346  "continue", worker2str(worker->type), worker->thread_num,
347  task_who2str(task));
348  status = ODS_STATUS_OK;
349  }
350  if (status == ODS_STATUS_OK) {
351  if (task->interrupt > TASK_SIGNCONF) {
352  task->interrupt = TASK_NONE;
353  task->halted = TASK_NONE;
354  }
355  } else {
356  if (task->halted == TASK_NONE) {
357  goto task_perform_fail;
358  }
359  goto task_perform_continue;
360  }
361  /* break; */
362  case TASK_SIGN:
363  /* perform 'sign' task */
364  worker_working_with(worker, TASK_SIGN, TASK_WRITE,
365  "sign", task_who2str(task), &what, &when);
366  task->what = TASK_SIGN;
367  status = zone_update_serial(zone);
368  if (status == ODS_STATUS_OK) {
369  if (task->interrupt > TASK_SIGNCONF) {
370  task->interrupt = TASK_NONE;
371  task->halted = TASK_NONE;
372  }
373  } else {
374  ods_log_error("[%s[%i]] unable to sign zone %s: "
375  "failed to increment serial",
376  worker2str(worker->type), worker->thread_num,
377  task_who2str(task));
378  if (task->halted == TASK_NONE) {
379  goto task_perform_fail;
380  }
381  goto task_perform_continue;
382  }
383 
384  /* start timer */
385  start = time(NULL);
386  if (zone->stats) {
387  lock_basic_lock(&zone->stats->stats_lock);
388  if (!zone->stats->start_time) {
389  zone->stats->start_time = start;
390  }
391  zone->stats->sig_count = 0;
392  zone->stats->sig_soa_count = 0;
393  zone->stats->sig_reuse = 0;
394  zone->stats->sig_time = 0;
395  lock_basic_unlock(&zone->stats->stats_lock);
396  }
397  /* check the HSM connection before queuing sign operations */
398  if (hsm_check_context()) {
399  ods_log_error("signer instructed to reload due to hsm reset in sign task");
400  engine->need_to_reload = 1;
401  goto task_perform_fail;
402  }
403  /* prepare keys */
404  status = zone_prepare_keys(zone);
405  if (status == ODS_STATUS_OK) {
406  /* queue menial, hard signing work */
407  worker_queue_zone(worker, engine->signq, zone);
408  ods_log_deeebug("[%s[%i]] wait until drudgers are finished "
409  "signing zone %s", worker2str(worker->type),
410  worker->thread_num, task_who2str(task));
411  /* sleep until work is done */
412  worker_sleep_unless(worker, 0);
413  }
414  /* stop timer */
415  end = time(NULL);
416  /* check status and jobs */
417  if (status == ODS_STATUS_OK) {
418  status = worker_check_jobs(worker, task);
419  }
420  worker_clear_jobs(worker);
421  if (status == ODS_STATUS_OK && zone->stats) {
422  lock_basic_lock(&zone->stats->stats_lock);
423  zone->stats->sig_time = (end-start);
424  lock_basic_unlock(&zone->stats->stats_lock);
425  }
426  if (status != ODS_STATUS_OK) {
427  if (task->halted == TASK_NONE) {
428  goto task_perform_fail;
429  }
430  goto task_perform_continue;
431  } else {
432  if (task->interrupt > TASK_SIGNCONF) {
433  task->interrupt = TASK_NONE;
434  task->halted = TASK_NONE;
435  }
436  }
437  /* break; */
438  case TASK_WRITE:
439  /* perform 'write to output adapter' task */
440  worker_working_with(worker, TASK_WRITE, TASK_SIGN,
441  "write", task_who2str(task), &what, &when);
442  task->what = TASK_WRITE;
443  status = tools_output(zone, engine);
444  if (status == ODS_STATUS_OK) {
445  if (task->interrupt > TASK_SIGNCONF) {
446  task->interrupt = TASK_NONE;
447  task->halted = TASK_NONE;
448  }
449  } else {
450  /* clear signatures? */
451  if (task->halted == TASK_NONE) {
452  goto task_perform_fail;
453  }
454  goto task_perform_continue;
455  }
456  zone->db->is_processed = 1;
457  if (zone->signconf &&
458  duration2time(zone->signconf->sig_resign_interval)) {
459  what = TASK_SIGN;
460  when = worker->clock_in +
461  duration2time(zone->signconf->sig_resign_interval);
462  } else {
463  ods_log_error("[%s[%i]] unable to retrieve resign interval "
464  "for zone %s: duration2time() failed",
465  worker2str(worker->type), worker->thread_num,
466  task_who2str(task));
467  ods_log_info("[%s[%i]] defaulting to 1H resign interval for "
468  "zone %s", worker2str(worker->type), worker->thread_num,
469  task_who2str(task));
470  what = TASK_SIGN;
471  when = worker->clock_in + 3600;
472  }
473  backup = 1;
474  break;
475  case TASK_NONE:
476  worker->working_with = TASK_NONE;
477  /* no task */
478  ods_log_warning("[%s[%i]] none task for zone %s",
479  worker2str(worker->type), worker->thread_num,
480  task_who2str(task));
481  when = time_now() + never;
482  break;
483  default:
484  worker->working_with = TASK_NONE;
485  /* unknown task */
486  ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
487  worker2str(worker->type), worker->thread_num,
488  task_who2str(task));
489  what = TASK_SIGNCONF;
490  when = time_now();
491  break;
492  }
493  /* no error */
494  task->backoff = 0;
495  if (task->interrupt != TASK_NONE && task->interrupt != what) {
496  ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
497  worker2str(worker->type), worker->thread_num,
498  task_what2str(what), task_who2str(task));
499  task->halted = what;
500  task->halted_when = when;
501  task->what = task->interrupt;
502  task->when = time_now();
503  } else {
504  ods_log_debug("[%s[%i]] next task %s for zone %s",
505  worker2str(worker->type), worker->thread_num,
506  task_what2str(what), task_who2str(task));
507  task->what = what;
508  task->when = when;
509  task->interrupt = TASK_NONE;
510  task->halted = TASK_NONE;
511  task->halted_when = 0;
512  }
513  /* backup the last successful run */
514  if (backup) {
515  status = zone_backup2(zone);
516  if (status != ODS_STATUS_OK) {
517  ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
518  worker2str(worker->type), worker->thread_num,
519  task_who2str(task), ods_status2str(status));
520  /* just a warning */
521  status = ODS_STATUS_OK;
522  }
523  backup = 0;
524  }
525  return;
526 
527 task_perform_fail:
528  if (status != ODS_STATUS_XFR_NOT_READY) {
529  /* other statuses is critical, and we know it is not ODS_STATUS_OK */
530  ods_log_crit("[%s[%i]] CRITICAL: failed to sign zone %s: %s",
531  worker2str(worker->type), worker->thread_num,
532  task_who2str(task), ods_status2str(status));
533  }
534  /* in case of failure, also mark zone processed (for single run usage) */
535  zone->db->is_processed = 1;
536  if (task->backoff) {
537  task->backoff *= 2;
538  } else {
539  task->backoff = 60;
540  }
541  if (task->backoff > ODS_SE_MAX_BACKOFF) {
542  task->backoff = ODS_SE_MAX_BACKOFF;
543  }
544  ods_log_info("[%s[%i]] backoff task %s for zone %s with %lu seconds",
545  worker2str(worker->type), worker->thread_num,
546  task_what2str(task->what), task_who2str(task), (long)task->backoff);
547  task->when = time_now() + task->backoff;
548  return;
549 
550 task_perform_continue:
551  ods_log_info("[%s[%i]] continue task %s for zone %s",
552  worker2str(worker->type), worker->thread_num,
553  task_what2str(task->halted), task_who2str(task));
554  task->what = task->halted;
555  task->when = task->halted_when;
556  task->interrupt = TASK_NONE;
557  task->halted = TASK_NONE;
558  task->halted_when = 0;
559  return;
560 }
561 
562 
567 static void
568 worker_work(worker_type* worker)
569 {
570  time_t now = 0;
571  time_t timeout = 1;
572  engine_type* engine = NULL;
573  zone_type* zone = NULL;
574  ods_status status = ODS_STATUS_OK;
575 
576  ods_log_assert(worker);
577  ods_log_assert(worker->type == WORKER_WORKER);
578 
579  engine = worker->engine;
580  while (worker->need_to_exit == 0) {
581  ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
582  worker->thread_num);
583  now = time_now();
584  lock_basic_lock(&engine->taskq->schedule_lock);
585  worker->task = schedule_pop_task(engine->taskq);
586  if (worker->task) {
587  worker->working_with = worker->task->what;
588  lock_basic_unlock(&engine->taskq->schedule_lock);
589  zone = (zone_type*) worker->task->zone;
590 
591  lock_basic_lock(&zone->zone_lock);
592  ods_log_debug("[%s[%i]] start working on zone %s",
593  worker2str(worker->type), worker->thread_num, zone->name);
594  worker->clock_in = time_now();
595  worker_perform_task(worker);
596  zone->task = worker->task;
597  ods_log_debug("[%s[%i]] finished working on zone %s",
598  worker2str(worker->type), worker->thread_num, zone->name);
599 
600  lock_basic_lock(&engine->taskq->schedule_lock);
601  worker->task = NULL;
602  worker->working_with = TASK_NONE;
603  status = schedule_task(engine->taskq, zone->task, 1);
604  if (status != ODS_STATUS_OK) {
605  ods_log_error("[%s[%i]] unable to schedule task for zone %s: "
606  "%s", worker2str(worker->type), worker->thread_num,
607  zone->name, ods_status2str(status));
608  }
609  lock_basic_unlock(&engine->taskq->schedule_lock);
610  lock_basic_unlock(&zone->zone_lock);
611  timeout = 1;
613  lock_basic_lock(&engine->signal_lock);
614  if (engine->need_to_reload) {
615  lock_basic_alarm(&engine->signal_cond);
616  }
617  lock_basic_unlock(&engine->signal_lock);
618 
619  } else {
620  ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
621  worker->thread_num);
622  worker->task = schedule_get_first_task(engine->taskq);
623  lock_basic_unlock(&engine->taskq->schedule_lock);
624  if (worker->task && !engine->taskq->loading) {
625  timeout = (worker->task->when - now);
626  } else {
627  timeout *= 2;
628  }
629  if (timeout > ODS_SE_MAX_BACKOFF) {
630  timeout = ODS_SE_MAX_BACKOFF;
631  }
632  worker->task = NULL;
633  worker_sleep(worker, timeout);
634  }
635  }
636 }
637 
638 
643 static void
644 worker_drudge(worker_type* worker)
645 {
646  engine_type* engine = NULL;
647  zone_type* zone = NULL;
648  task_type* task = NULL;
649  rrset_type* rrset = NULL;
650  ods_status status = ODS_STATUS_OK;
651  worker_type* superior = NULL;
652  hsm_ctx_t* ctx = NULL;
653 
654  ods_log_assert(worker);
655  ods_log_assert(worker->engine);
656  ods_log_assert(worker->type == WORKER_DRUDGER);
657 
658  engine = worker->engine;
659  while (worker->need_to_exit == 0) {
660  ods_log_deeebug("[%s[%i]] report for duty", worker2str(worker->type),
661  worker->thread_num);
662  /* initialize */
663  superior = NULL;
664  zone = NULL;
665  task = NULL;
666  /* get item */
667  lock_basic_lock(&engine->signq->q_lock);
668  rrset = (rrset_type*) fifoq_pop(engine->signq, &superior);
669  if (!rrset) {
670  ods_log_deeebug("[%s[%i]] nothing to do, wait",
671  worker2str(worker->type), worker->thread_num);
678  lock_basic_sleep(&engine->signq->q_threshold,
679  &engine->signq->q_lock, 0);
680  if(worker->need_to_exit == 0)
681  rrset = (rrset_type*) fifoq_pop(engine->signq, &superior);
682  }
683  lock_basic_unlock(&engine->signq->q_lock);
684  /* do some work */
685  if (rrset) {
686  ods_log_assert(superior);
687  if (!ctx) {
688  ods_log_debug("[%s[%i]] create hsm context",
689  worker2str(worker->type), worker->thread_num);
690  ctx = hsm_create_context();
691  }
692  if (!ctx) {
693  ods_log_crit("[%s[%i]] error creating libhsm context",
694  worker2str(worker->type), worker->thread_num);
695  engine->need_to_reload = 1;
696  ods_log_error("signer instructed to reload due to hsm reset while signing");
697  lock_basic_lock(&superior->worker_lock);
698  superior->jobs_failed++;
699  lock_basic_unlock(&superior->worker_lock);
700  } else {
701  ods_log_assert(ctx);
702  lock_basic_lock(&superior->worker_lock);
703  task = superior->task;
704  ods_log_assert(task);
705  zone = task->zone;
706  lock_basic_unlock(&superior->worker_lock);
707  ods_log_assert(zone);
708  ods_log_assert(zone->apex);
709  ods_log_assert(zone->signconf);
710  worker->clock_in = time_now();
711  status = rrset_sign(ctx, rrset, superior->clock_in);
712  lock_basic_lock(&superior->worker_lock);
713  if (status == ODS_STATUS_OK) {
714  superior->jobs_completed++;
715  } else {
716  superior->jobs_failed++;
717  }
718  lock_basic_unlock(&superior->worker_lock);
719  }
720  if (worker_fulfilled(superior) && superior->sleeping) {
721  ods_log_deeebug("[%s[%i]] wake up superior[%u], work is "
722  "done", worker2str(worker->type), worker->thread_num,
723  superior->thread_num);
724  worker_wakeup(superior);
725  }
726  superior = NULL;
727  rrset = NULL;
728  }
729  /* done work */
730  }
731  /* cleanup open HSM sessions */
732  if (ctx) {
733  hsm_destroy_context(ctx);
734  }
735 }
736 
737 
742 void
744 {
745  ods_log_assert(worker);
746  switch (worker->type) {
747  case WORKER_DRUDGER:
748  worker_drudge(worker);
749  break;
750  case WORKER_WORKER:
751  worker_work(worker);
752  break;
753  default:
754  ods_log_error("[worker] illegal worker (id=%i)", worker->type);
755  break;
756  }
757 }
758 
759 
764 void
765 worker_sleep(worker_type* worker, time_t timeout)
766 {
767  ods_log_assert(worker);
768  if (!worker->need_to_exit) {
769  lock_basic_lock(&worker->worker_lock);
770  worker->sleeping = 1;
771  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
772  timeout);
773  lock_basic_unlock(&worker->worker_lock);
774  }
775 }
776 
777 
782 void
783 worker_sleep_unless(worker_type* worker, time_t timeout)
784 {
785  ods_log_assert(worker);
786  lock_basic_lock(&worker->worker_lock);
787  while (!worker->need_to_exit && !worker_fulfilled(worker)) {
788  worker->sleeping = 1;
789  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
790  timeout);
791  ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %lu "
792  "appointed, %lu completed, %lu failed", worker2str(worker->type),
793  worker->thread_num, (long)worker->jobs_appointed, (long)worker->jobs_completed,
794  (long)worker->jobs_failed);
795  }
796  lock_basic_unlock(&worker->worker_lock);
797 }
798 
799 
804 void
806 {
807  ods_log_assert(worker);
808  if (worker && worker->sleeping && !worker->waiting) {
809  ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
810  worker->thread_num);
811  lock_basic_lock(&worker->worker_lock);
812  lock_basic_alarm(&worker->worker_alarm);
813  worker->sleeping = 0;
814  lock_basic_unlock(&worker->worker_lock);
815  }
816 }
817 
818 
823 void
824 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
825 {
826  lock_basic_lock(lock);
827  lock_basic_broadcast(condition);
828  lock_basic_unlock(lock);
829 }
830 
831 
836 void
838 {
839  if (!worker) {
840  return;
841  }
842  lock_basic_destroy(&worker->worker_lock);
843  lock_basic_off(&worker->worker_alarm);
844  free(worker);
845 }
Definition: task.h:39
task_type * task
Definition: zone.h:83
rrset_type * rrset
Definition: denial.h:55
size_t jobs_completed
Definition: worker.h:60
unsigned waiting
Definition: worker.h:65
unsigned need_to_exit
Definition: worker.h:66
task_type * schedule_get_first_task(schedule_type *schedule)
Definition: schedule.c:217
worker_type * worker_create(int num, worker_id type)
Definition: worker.c:65
time_t when
Definition: task.h:60
size_t jobs_appointed
Definition: worker.h:59
lock_basic_type worker_lock
Definition: worker.h:63
cond_basic_type signal_cond
Definition: engine.h:78
cond_basic_type q_threshold
Definition: fifoq.h:65
task_id interrupt
Definition: task.h:58
ods_status tools_signconf(zone_type *zone)
Definition: tools.c:52
lock_basic_type q_lock
Definition: fifoq.h:64
time_t sig_time
Definition: stats.h:59
ods_status schedule_task(schedule_type *schedule, task_type *task, int log)
Definition: schedule.c:140
const char * task_who2str(task_type *task)
Definition: task.c:137
void worker_start(worker_type *worker)
Definition: worker.c:743
lock_basic_type zone_lock
Definition: zone.h:86
time_t backoff
Definition: task.h:62
lock_basic_type stats_lock
Definition: stats.h:63
ldns_rbtree_t * domains
Definition: namedb.h:51
Definition: task.h:43
rrset_type * next
Definition: rrset.h:60
void worker_cleanup(worker_type *worker)
Definition: worker.c:837
enum task_id_enum task_id
Definition: task.h:46
ods_status fifoq_push(fifoq_type *q, void *item, worker_type *worker, int *tries)
Definition: fifoq.c:114
denial_type * denial
Definition: domain.h:52
zone_type * zone
Definition: task.h:64
lock_basic_type signal_lock
Definition: engine.h:79
size_t jobs_failed
Definition: worker.h:61
ods_status tools_input(zone_type *zone)
Definition: tools.c:93
task_type * task
Definition: worker.h:55
namedb_type * db
Definition: zone.h:77
Definition: task.h:41
void worker_sleep(worker_type *worker, time_t timeout)
Definition: worker.c:765
time_t halted_when
Definition: task.h:61
time_t clock_in
Definition: worker.h:58
task_type * schedule_pop_task(schedule_type *schedule)
Definition: schedule.c:257
unsigned is_processed
Definition: namedb.h:58
ods_status tools_output(zone_type *zone, engine_type *engine)
Definition: tools.c:181
void worker_notify_all(lock_basic_type *lock, cond_basic_type *condition)
Definition: worker.c:824
signconf_type * signconf
Definition: zone.h:75
ods_status zone_backup2(zone_type *zone)
Definition: zone.c:1092
time_t start_time
Definition: stats.h:61
ods_status zone_update_serial(zone_type *zone)
Definition: zone.c:481
task_id halted
Definition: task.h:59
void worker_wakeup(worker_type *worker)
Definition: worker.c:805
uint32_t sig_reuse
Definition: stats.h:58
enum worker_enum worker_id
Definition: worker.h:43
void worker_sleep_unless(worker_type *worker, time_t timeout)
Definition: worker.c:783
time_t last_modified
Definition: signconf.h:72
task_id what
Definition: task.h:57
int thread_num
Definition: worker.h:52
const char * name
Definition: zone.h:67
schedule_type * taskq
Definition: engine.h:61
uint32_t sig_soa_count
Definition: stats.h:57
ods_status zone_prepare_keys(zone_type *zone)
Definition: zone.c:439
duration_type * sig_resign_interval
Definition: signconf.h:46
cond_basic_type worker_alarm
Definition: worker.h:62
ods_status rrset_sign(hsm_ctx_t *ctx, rrset_type *rrset, time_t signtime)
Definition: rrset.c:629
uint32_t sig_count
Definition: stats.h:56
unsigned sleeping
Definition: worker.h:64
task_id working_with
Definition: worker.h:56
lock_basic_type schedule_lock
Definition: schedule.h:62
worker_id type
Definition: worker.h:57
rrset_type * rrsets
Definition: domain.h:57
engine_type * engine
Definition: worker.h:54
fifoq_type * signq
Definition: engine.h:62
cond_basic_type q_nonfull
Definition: fifoq.h:66
int need_to_reload
Definition: engine.h:75
ldns_rdf * apex
Definition: zone.h:59
const char * task_what2str(task_id what)
Definition: task.c:107
ods_lookup_table worker_str[]
Definition: worker.c:39
stats_type * stats
Definition: zone.h:85
void * fifoq_pop(fifoq_type *q, worker_type **worker)
Definition: fifoq.c:84