]> git.sommitrealweird.co.uk Git - quagga-debian.git/blob - lib/workqueue.c
New upstream version 1.2.4
[quagga-debian.git] / lib / workqueue.c
1 /* 
2  * Quagga Work Queue Support.
3  *
4  * Copyright (C) 2005 Sun Microsystems, Inc.
5  *
6  * This file is part of GNU Zebra.
7  *
8  * Quagga is free software; you can redistribute it and/or modify it
9  * under the terms of the GNU General Public License as published by the
10  * Free Software Foundation; either version 2, or (at your option) any
11  * later version.
12  *
13  * Quagga is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with Quagga; see the file COPYING.  If not, write to the Free
20  * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
21  * 02111-1307, USA.  
22  */
23
24 #include <zebra.h>
25 #include "thread.h"
26 #include "memory.h"
27 #include "workqueue.h"
28 #include "linklist.h"
29 #include "command.h"
30 #include "log.h"
31
32 /* master list of work_queues */
33 static struct list _work_queues;
34 /* pointer primarily to avoid an otherwise harmless warning on
35  * ALL_LIST_ELEMENTS_RO 
36  */
37 static struct list *work_queues = &_work_queues;
38
39 #define WORK_QUEUE_MIN_GRANULARITY 1
40
41 static struct work_queue_item *
42 work_queue_item_new (struct work_queue *wq)
43 {
44   struct work_queue_item *item;
45   assert (wq);
46
47   item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, 
48                   sizeof (struct work_queue_item));
49   
50   return item;
51 }
52
53 static void
54 work_queue_item_free (struct work_queue_item *item)
55 {
56   XFREE (MTYPE_WORK_QUEUE_ITEM, item);
57   return;
58 }
59
60 /* create new work queue */
61 struct work_queue *
62 work_queue_new (struct thread_master *m, const char *queue_name)
63 {
64   struct work_queue *new;
65   
66   new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue));
67
68   if (new == NULL)
69     return new;
70   
71   new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name);
72   new->master = m;
73   SET_FLAG (new->flags, WQ_UNPLUGGED);
74   
75   if ( (new->items = list_new ()) == NULL)
76     {
77       XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
78       XFREE (MTYPE_WORK_QUEUE, new);
79       
80       return NULL;
81     }
82   
83   new->items->del = (void (*)(void *)) work_queue_item_free;  
84   
85   listnode_add (work_queues, new);
86   
87   new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
88   new->cycles.worst = UINT_MAX;
89   
90   /* Default values, can be overriden by caller */
91   new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
92     
93   return new;
94 }
95
96 void
97 work_queue_free (struct work_queue *wq)
98 {
99   if (wq->thread != NULL)
100     thread_cancel(wq->thread);
101   
102   /* list_delete frees items via callback */
103   list_delete (wq->items);
104   listnode_delete (work_queues, wq);
105   
106   XFREE (MTYPE_WORK_QUEUE_NAME, wq->name);
107   XFREE (MTYPE_WORK_QUEUE, wq);
108   return;
109 }
110
111 bool
112 work_queue_is_scheduled (struct work_queue *wq)
113 {
114   return (wq->thread != NULL);
115 }
116
117 static int
118 work_queue_schedule (struct work_queue *wq, unsigned int delay)
119 {
120   /* if appropriate, schedule work queue thread */
121   if ( CHECK_FLAG (wq->flags, WQ_UNPLUGGED)
122        && (wq->thread == NULL)
123        && (listcount (wq->items) > 0) )
124     {
125       wq->thread = thread_add_background (wq->master, work_queue_run, 
126                                           wq, delay);
127       return 1;
128     }
129   else
130     return 0;
131 }
132   
133 void
134 work_queue_add (struct work_queue *wq, void *data)
135 {
136   struct work_queue_item *item;
137   
138   assert (wq);
139
140   if (!(item = work_queue_item_new (wq)))
141     {
142       zlog_err ("%s: unable to get new queue item", __func__);
143       return;
144     }
145   
146   item->data = data;
147   listnode_add (wq->items, item);
148   
149   work_queue_schedule (wq, wq->spec.hold);
150   
151   return;
152 }
153
154 static void
155 work_queue_item_remove (struct work_queue *wq, struct listnode *ln)
156 {
157   struct work_queue_item *item = listgetdata (ln);
158
159   assert (item && item->data);
160
161   /* call private data deletion callback if needed */  
162   if (wq->spec.del_item_data)
163     wq->spec.del_item_data (wq, item->data);
164
165   list_delete_node (wq->items, ln);
166   work_queue_item_free (item);
167   
168   return;
169 }
170
171 static void
172 work_queue_item_requeue (struct work_queue *wq, struct listnode *ln)
173 {
174   LISTNODE_DETACH (wq->items, ln);
175   LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */
176 }
177
178 DEFUN(show_work_queues,
179       show_work_queues_cmd,
180       "show work-queues",
181       SHOW_STR
182       "Work Queue information\n")
183 {
184   struct listnode *node;
185   struct work_queue *wq;
186   
187   vty_out (vty, 
188            "%c %8s %5s %8s %21s %6s %5s%s",
189            ' ', "List","(ms) ","Q. Runs","Cycle Counts   ",
190            " ","Worst",
191            VTY_NEWLINE);
192   vty_out (vty,
193            "%c %8s %5s %8s %7s %6s %6s %6s %5s %s%s",
194            'P',
195            "Items",
196            "Hold",
197            "Total",
198            "Best","Worst","Gran.","Avg.", "Lat.",
199            "Name", 
200            VTY_NEWLINE);
201  
202   for (ALL_LIST_ELEMENTS_RO (work_queues, node, wq))
203     {
204       vty_out (vty,"%c %8u %5u %8lu %7u %6u %6u %6u %5lu %s%s",
205                (CHECK_FLAG (wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
206                listcount (wq->items),
207                wq->spec.hold,
208                wq->runs,
209                wq->cycles.best, 
210                MIN(wq->cycles.best, wq->cycles.worst),
211                wq->cycles.granularity,
212                  (wq->runs) ? 
213                    (unsigned int) (wq->cycles.total / wq->runs) : 0,
214                wq->worst_usec,
215                wq->name,
216                VTY_NEWLINE);
217     }
218     
219   return CMD_SUCCESS;
220 }
221
222 /* 'plug' a queue: Stop it from being scheduled,
223  * ie: prevent the queue from draining.
224  */
225 void
226 work_queue_plug (struct work_queue *wq)
227 {
228   if (wq->thread)
229     thread_cancel (wq->thread);
230   
231   wq->thread = NULL;
232   
233   UNSET_FLAG (wq->flags, WQ_UNPLUGGED);
234 }
235
236 /* unplug queue, schedule it again, if appropriate
237  * Ie: Allow the queue to be drained again
238  */
239 void
240 work_queue_unplug (struct work_queue *wq)
241 {
242   SET_FLAG (wq->flags, WQ_UNPLUGGED);
243
244   /* if thread isnt already waiting, add one */
245   work_queue_schedule (wq, wq->spec.hold);
246 }
247
248 /* timer thread to process a work queue
249  * will reschedule itself if required,
250  * otherwise work_queue_item_add 
251  */
252 int
253 work_queue_run (struct thread *thread)
254 {
255   struct work_queue *wq;
256   struct work_queue_item *item;
257   unsigned long took;
258   wq_item_status ret;
259   unsigned int cycles = 0;
260   struct listnode *node, *nnode;
261   char yielded = 0;
262
263   wq = THREAD_ARG (thread);
264   wq->thread = NULL;
265
266   assert (wq && wq->items);
267
268   /* calculate cycle granularity:
269    * list iteration == 1 cycle
270    * granularity == # cycles between checks whether we should yield.
271    *
272    * granularity should be > 0, and can increase slowly after each run to
273    * provide some hysteris, but not past cycles.best or 2*cycles.
274    *
275    * Best: starts low, can only increase
276    *
277    * Worst: starts at MAX, can only decrease.
278    *
279    * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased 
280    *              if we run to end of time slot, can increase otherwise 
281    *              by a small factor.
282    *
283    * We could use just the average and save some work, however we want to be
284    * able to adjust quickly to CPU pressure. Average wont shift much if
285    * daemon has been running a long time.
286    */
287    if (wq->cycles.granularity == 0)
288      wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;
289
290   for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item))
291   {
292     assert (item && item->data);
293     
294     /* dont run items which are past their allowed retries */
295     if (item->ran > wq->spec.max_retries)
296       {
297         /* run error handler, if any */
298         if (wq->spec.errorfunc)
299           wq->spec.errorfunc (wq, item->data);
300         work_queue_item_remove (wq, node);
301         continue;
302       }
303
304     /* run and take care of items that want to be retried immediately */
305     do
306       {
307         ret = wq->spec.workfunc (wq, item->data);
308         item->ran++;
309       }
310     while ((ret == WQ_RETRY_NOW) 
311            && (item->ran < wq->spec.max_retries));
312
313     switch (ret)
314       {
315       case WQ_QUEUE_BLOCKED:
316         {
317           /* decrement item->ran again, cause this isn't an item
318            * specific error, and fall through to WQ_RETRY_LATER
319            */
320           item->ran--;
321         }
322       case WQ_RETRY_LATER:
323         {
324           goto stats;
325         }
326       case WQ_REQUEUE:
327         {
328           item->ran--;
329           work_queue_item_requeue (wq, node);
330           break;
331         }
332       case WQ_RETRY_NOW:
333         /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */
334       case WQ_ERROR:
335         {
336           if (wq->spec.errorfunc)
337             wq->spec.errorfunc (wq, item);
338         }
339         /* fall through here is deliberate */
340       case WQ_SUCCESS:
341       default:
342         {
343           work_queue_item_remove (wq, node);
344           break;
345         }
346       }
347
348     /* completed cycle */
349     cycles++;
350
351     /* test if we should yield */
352     if ( !(cycles % wq->cycles.granularity) 
353         && (took = thread_should_yield (thread)))
354       {
355         yielded = 1;
356         goto stats;
357       }
358   }
359
360 stats:
361
362 #define WQ_HYSTERESIS_FACTOR 4
363
364   if (cycles > wq->cycles.best)
365     wq->cycles.best = cycles;
366   
367   if (took > wq->worst_usec)
368     wq->worst_usec = took;
369     
370   /* we yielded, check whether granularity should be reduced */
371   if (yielded && (cycles < wq->cycles.granularity))
372     {
373       wq->cycles.granularity = ((cycles > 0) ? cycles 
374                                              : WORK_QUEUE_MIN_GRANULARITY);
375       if (cycles < wq->cycles.worst)
376         wq->cycles.worst = cycles;
377     }
378   /* otherwise, should granularity increase? */
379   else if (cycles >= (wq->cycles.granularity))
380     {
381       /* along with yielded check, provides hysteresis for granularity */      
382       if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
383                                            * WQ_HYSTERESIS_FACTOR))
384         wq->cycles.granularity *= WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
385       else if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
386         wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
387         
388       /* clamp granularity down to the worst yielded cycle count */
389       wq->cycles.granularity = MIN(wq->cycles.granularity, wq->cycles.worst);
390     }
391 #undef WQ_HYSTERIS_FACTOR
392   
393   wq->runs++;
394   wq->cycles.total += cycles;
395
396 #if 0
397   printf ("%s: cycles %d, new: best %d, worst %d\n",
398             __func__, cycles, wq->cycles.best, wq->cycles.granularity);
399 #endif
400   
401   /* Is the queue done yet? If it is, call the completion callback. */
402   if (listcount (wq->items) > 0)
403     work_queue_schedule (wq, 0);
404   else if (wq->spec.completion_func)
405     wq->spec.completion_func (wq);
406   
407   return 0;
408 }