2 * Quagga Work Queue Support.
4 * Copyright (C) 2005 Sun Microsystems, Inc.
6 * This file is part of GNU Zebra.
8 * Quagga is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2, or (at your option) any
13 * Quagga is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Quagga; see the file COPYING. If not, write to the Free
20 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
27 #include "workqueue.h"
32 /* master list of work_queues */
33 static struct list _work_queues;
34 /* pointer primarily to avoid an otherwise harmless warning on
35 * ALL_LIST_ELEMENTS_RO
37 static struct list *work_queues = &_work_queues;
39 #define WORK_QUEUE_MIN_GRANULARITY 1
41 static struct work_queue_item *
42 work_queue_item_new (struct work_queue *wq)
44 struct work_queue_item *item;
47 item = XCALLOC (MTYPE_WORK_QUEUE_ITEM,
48 sizeof (struct work_queue_item));
54 work_queue_item_free (struct work_queue_item *item)
56 XFREE (MTYPE_WORK_QUEUE_ITEM, item);
60 /* create new work queue */
62 work_queue_new (struct thread_master *m, const char *queue_name)
64 struct work_queue *new;
66 new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
71 new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
73 SET_FLAG (new->flags, WQ_UNPLUGGED);
75 if ( (new->items = list_new ()) == NULL)
77 XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
78 XFREE (MTYPE_WORK_QUEUE, new);
83 new->items->del = (void (*)(void *)) work_queue_item_free;
85 listnode_add (work_queues, new);
87 new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
88 new->cycles.worst = UINT_MAX;
90 /* Default values, can be overriden by caller */
91 new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
97 work_queue_free (struct work_queue *wq)
99 if (wq->thread != NULL)
100 thread_cancel(wq->thread);
102 /* list_delete frees items via callback */
103 list_delete (wq->items);
104 listnode_delete (work_queues, wq);
106 XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
107 XFREE (MTYPE_WORK_QUEUE, wq);
112 work_queue_is_scheduled (struct work_queue *wq)
114 return (wq->thread != NULL);
118 work_queue_schedule (struct work_queue *wq, unsigned int delay)
120 /* if appropriate, schedule work queue thread */
121 if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
122 && (wq->thread == NULL)
123 && (listcount (wq->items) > 0) )
125 wq->thread = thread_add_background (wq->master, work_queue_run,
134 work_queue_add (struct work_queue *wq, void *data)
136 struct work_queue_item *item;
140 if (!(item = work_queue_item_new (wq)))
142 zlog_err ("%s: unable to get new queue item", __func__);
147 listnode_add (wq->items, item);
149 work_queue_schedule (wq, wq->spec.hold);
155 work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
157 struct work_queue_item *item = listgetdata (ln);
159 assert (item && item->data);
161 /* call private data deletion callback if needed */
162 if (wq->spec.del_item_data)
163 wq->spec.del_item_data (wq, item->data);
165 list_delete_node (wq->items, ln);
166 work_queue_item_free (item);
172 work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
174 LISTNODE_DETACH (wq->items, ln);
175 LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
178 DEFUN(show_work_queues,
179 show_work_queues_cmd,
182 "Work Queue information\n")
184 struct listnode *node;
185 struct work_queue *wq;
188 "%c %8s %5s %8s %21s %6s %5s%s",
189 ' ', "List","(ms) ","Q. Runs","Cycle Counts ",
193 "%c %8s %5s %8s %7s %6s %6s %6s %5s %s%s",
198 "Best","Worst","Gran.","Avg.", "Lat.",
202 for (ALL_LIST_ELEMENTS_RO (work_queues, node, wq))
204 vty_out (vty,"%c %8u %5u %8lu %7u %6u %6u %6u %5lu %s%s",
205 (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
206 listcount (wq->items),
210 MIN(wq->cycles.best, wq->cycles.worst),
211 wq->cycles.granularity,
213 (unsigned int) (wq->cycles.total / wq->runs) : 0,
222 /* 'plug' a queue: Stop it from being scheduled,
223 * ie: prevent the queue from draining.
226 work_queue_plug (struct work_queue *wq)
229 thread_cancel (wq->thread);
233 UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
236 /* unplug queue, schedule it again, if appropriate
237 * Ie: Allow the queue to be drained again
240 work_queue_unplug (struct work_queue *wq)
242 SET_FLAG (wq->flags, WQ_UNPLUGGED);
244 /* if thread isnt already waiting, add one */
245 work_queue_schedule (wq, wq->spec.hold);
248 /* timer thread to process a work queue
249 * will reschedule itself if required,
250 * otherwise work_queue_item_add
253 work_queue_run (struct thread *thread)
255 struct work_queue *wq;
256 struct work_queue_item *item;
259 unsigned int cycles = 0;
260 struct listnode *node, *nnode;
263 wq = THREAD_ARG (thread);
266 assert (wq && wq->items);
268 /* calculate cycle granularity:
269 * list iteration == 1 cycle
270 * granularity == # cycles between checks whether we should yield.
272 * granularity should be > 0, and can increase slowly after each run to
273 * provide some hysteris, but not past cycles.best or 2*cycles.
275 * Best: starts low, can only increase
277 * Worst: starts at MAX, can only decrease.
279 * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
280 * if we run to end of time slot, can increase otherwise
283 * We could use just the average and save some work, however we want to be
284 * able to adjust quickly to CPU pressure. Average wont shift much if
285 * daemon has been running a long time.
287 if (wq->cycles.granularity == 0)
288 wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
290 for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
292 assert (item && item->data);
294 /* dont run items which are past their allowed retries */
295 if (item->ran > wq->spec.max_retries)
297 /* run error handler, if any */
298 if (wq->spec.errorfunc)
299 wq->spec.errorfunc (wq, item->data);
300 work_queue_item_remove (wq, node);
304 /* run and take care of items that want to be retried immediately */
307 ret = wq->spec.workfunc (wq, item->data);
310 while ((ret == WQ_RETRY_NOW)
311 && (item->ran < wq->spec.max_retries));
315 case WQ_QUEUE_BLOCKED:
317 /* decrement item->ran again, cause this isn't an item
318 * specific error, and fall through to WQ_RETRY_LATER
329 work_queue_item_requeue (wq, node);
333 /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
336 if (wq->spec.errorfunc)
337 wq->spec.errorfunc (wq, item);
339 /* fall through here is deliberate */
343 work_queue_item_remove (wq, node);
348 /* completed cycle */
351 /* test if we should yield */
352 if ( !(cycles % wq->cycles.granularity)
353 && (took = thread_should_yield (thread)))
362 #define WQ_HYSTERESIS_FACTOR 4
364 if (cycles > wq->cycles.best)
365 wq->cycles.best = cycles;
367 if (took > wq->worst_usec)
368 wq->worst_usec = took;
370 /* we yielded, check whether granularity should be reduced */
371 if (yielded && (cycles < wq->cycles.granularity))
373 wq->cycles.granularity = ((cycles > 0) ? cycles
374 : WORK_QUEUE_MIN_GRANULARITY);
375 if (cycles < wq->cycles.worst)
376 wq->cycles.worst = cycles;
378 /* otherwise, should granularity increase? */
379 else if (cycles >= (wq->cycles.granularity))
381 /* along with yielded check, provides hysteresis for granularity */
382 if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
383 * WQ_HYSTERESIS_FACTOR))
384 wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
385 else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
386 wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
388 /* clamp granularity down to the worst yielded cycle count */
389 wq->cycles.granularity = MIN(wq->cycles.granularity, wq->cycles.worst);
391 #undef WQ_HYSTERIS_FACTOR
394 wq->cycles.total += cycles;
397 printf ("%s: cycles %d, new: best %d, worst %d\n",
398 __func__, cycles, wq->cycles.best, wq->cycles.granularity);
401 /* Is the queue done yet? If it is, call the completion callback. */
402 if (listcount (wq->items) > 0)
403 work_queue_schedule (wq, 0);
404 else if (wq->spec.completion_func)
405 wq->spec.completion_func (wq);