Import Upstream version 1.2.2
[quagga-debian.git] / zebra / zebra_fpm.c
1 /*
2  * Main implementation file for interface to Forwarding Plane Manager.
3  *
4  * Copyright (C) 2012 by Open Source Routing.
5  * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
6  *
7  * This file is part of GNU Zebra.
8  *
9  * GNU Zebra is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; either version 2, or (at your option) any
12  * later version.
13  *
14  * GNU Zebra is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with GNU Zebra; see the file COPYING.  If not, write to the Free
21  * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
22  * 02111-1307, USA.
23  */
24
25 #include <zebra.h>
26
27 #include "log.h"
28 #include "stream.h"
29 #include "thread.h"
30 #include "network.h"
31 #include "command.h"
32
33 #include "zebra/rib.h"
34
35 #include "fpm/fpm.h"
36 #include "zebra_fpm.h"
37 #include "zebra_fpm_private.h"
38
39 /*
40  * Interval at which we attempt to connect to the FPM.
41  */
42 #define ZFPM_CONNECT_RETRY_IVL   5
43
44 /*
45  * Sizes of outgoing and incoming stream buffers for writing/reading
46  * FPM messages.
47  */
48 #define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
49 #define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)
50
51 /*
52  * The maximum number of times the FPM socket write callback can call
53  * 'write' before it yields.
54  */
55 #define ZFPM_MAX_WRITES_PER_RUN 10
56
57 /*
58  * Interval over which we collect statistics.
59  */
60 #define ZFPM_STATS_IVL_SECS        10
61
62 /*
63  * Structure that holds state for iterating over all route_node
64  * structures that are candidates for being communicated to the FPM.
65  */
66 typedef struct zfpm_rnodes_iter_t_
67 {
68   rib_tables_iter_t tables_iter;
69   route_table_iter_t iter;
70 } zfpm_rnodes_iter_t;
71
72 /*
73  * Statistics.
74  */
75 typedef struct zfpm_stats_t_ {
76   unsigned long connect_calls;
77   unsigned long connect_no_sock;
78
79   unsigned long read_cb_calls;
80
81   unsigned long write_cb_calls;
82   unsigned long write_calls;
83   unsigned long partial_writes;
84   unsigned long max_writes_hit;
85   unsigned long t_write_yields;
86
87   unsigned long nop_deletes_skipped;
88   unsigned long route_adds;
89   unsigned long route_dels;
90
91   unsigned long updates_triggered;
92   unsigned long redundant_triggers;
93   unsigned long non_fpm_table_triggers;
94
95   unsigned long dests_del_after_update;
96
97   unsigned long t_conn_down_starts;
98   unsigned long t_conn_down_dests_processed;
99   unsigned long t_conn_down_yields;
100   unsigned long t_conn_down_finishes;
101
102   unsigned long t_conn_up_starts;
103   unsigned long t_conn_up_dests_processed;
104   unsigned long t_conn_up_yields;
105   unsigned long t_conn_up_aborts;
106   unsigned long t_conn_up_finishes;
107
108 } zfpm_stats_t;
109
110 /*
111  * States for the FPM state machine.
112  */
113 typedef enum {
114
115   /*
116    * In this state we are not yet ready to connect to the FPM. This
117    * can happen when this module is disabled, or if we're cleaning up
118    * after a connection has gone down.
119    */
120   ZFPM_STATE_IDLE,
121
122   /*
123    * Ready to talk to the FPM and periodically trying to connect to
124    * it.
125    */
126   ZFPM_STATE_ACTIVE,
127
128   /*
129    * In the middle of bringing up a TCP connection. Specifically,
130    * waiting for a connect() call to complete asynchronously.
131    */
132   ZFPM_STATE_CONNECTING,
133
134   /*
135    * TCP connection to the FPM is up.
136    */
137   ZFPM_STATE_ESTABLISHED
138
139 } zfpm_state_t;
140
141 /*
142  * Message format to be used to communicate with the FPM.
143  */
144 typedef enum
145 {
146   ZFPM_MSG_FORMAT_NONE,
147   ZFPM_MSG_FORMAT_NETLINK,
148   ZFPM_MSG_FORMAT_PROTOBUF,
149 } zfpm_msg_format_e;
150 /*
151  * Globals.
152  */
153 typedef struct zfpm_glob_t_
154 {
155
156   /*
157    * True if the FPM module has been enabled.
158    */
159   int enabled;
160
161   /*
162    * Message format to be used to communicate with the fpm.
163    */
164   zfpm_msg_format_e message_format;
165
166   struct thread_master *master;
167
168   zfpm_state_t state;
169
170   in_addr_t   fpm_server;
171   /*
172    * Port on which the FPM is running.
173    */
174   int fpm_port;
175
176   /*
177    * List of rib_dest_t structures to be processed
178    */
179   TAILQ_HEAD (zfpm_dest_q, rib_dest_t_) dest_q;
180
181   /*
182    * Stream socket to the FPM.
183    */
184   int sock;
185
186   /*
187    * Buffers for messages to/from the FPM.
188    */
189   struct stream *obuf;
190   struct stream *ibuf;
191
192   /*
193    * Threads for I/O.
194    */
195   struct thread *t_connect;
196   struct thread *t_write;
197   struct thread *t_read;
198
199   /*
200    * Thread to clean up after the TCP connection to the FPM goes down
201    * and the state that belongs to it.
202    */
203   struct thread *t_conn_down;
204
205   struct {
206     zfpm_rnodes_iter_t iter;
207   } t_conn_down_state;
208
209   /*
210    * Thread to take actions once the TCP conn to the FPM comes up, and
211    * the state that belongs to it.
212    */
213   struct thread *t_conn_up;
214
215   struct {
216     zfpm_rnodes_iter_t iter;
217   } t_conn_up_state;
218
219   unsigned long connect_calls;
220   time_t last_connect_call_time;
221
222   /*
223    * Stats from the start of the current statistics interval up to
224    * now. These are the counters we typically update in the code.
225    */
226   zfpm_stats_t stats;
227
228   /*
229    * Statistics that were gathered in the last collection interval.
230    */
231   zfpm_stats_t last_ivl_stats;
232
233   /*
234    * Cumulative stats from the last clear to the start of the current
235    * statistics interval.
236    */
237   zfpm_stats_t cumulative_stats;
238
239   /*
240    * Stats interval timer.
241    */
242   struct thread *t_stats;
243
244   /*
245    * If non-zero, the last time when statistics were cleared.
246    */
247   time_t last_stats_clear_time;
248
249 } zfpm_glob_t;
250
251 static zfpm_glob_t zfpm_glob_space;
252 static zfpm_glob_t *zfpm_g = &zfpm_glob_space;
253
254 static int zfpm_read_cb (struct thread *thread);
255 static int zfpm_write_cb (struct thread *thread);
256
257 static void zfpm_set_state (zfpm_state_t state, const char *reason);
258 static void zfpm_start_connect_timer (const char *reason);
259 static void zfpm_start_stats_timer (void);
260
261 /*
262  * zfpm_thread_should_yield
263  */
264 static inline int
265 zfpm_thread_should_yield (struct thread *t)
266 {
267   return thread_should_yield (t);
268 }
269
270 /*
271  * zfpm_state_to_str
272  */
273 static const char *
274 zfpm_state_to_str (zfpm_state_t state)
275 {
276   switch (state)
277     {
278
279     case ZFPM_STATE_IDLE:
280       return "idle";
281
282     case ZFPM_STATE_ACTIVE:
283       return "active";
284
285     case ZFPM_STATE_CONNECTING:
286       return "connecting";
287
288     case ZFPM_STATE_ESTABLISHED:
289       return "established";
290
291     default:
292       return "unknown";
293     }
294 }
295
296 /*
297  * zfpm_get_time
298  */
299 static time_t
300 zfpm_get_time (void)
301 {
302   struct timeval tv;
303
304   if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &tv) < 0)
305     zlog_warn ("FPM: quagga_gettime failed!!");
306
307   return tv.tv_sec;
308 }
309
310 /*
311  * zfpm_get_elapsed_time
312  *
313  * Returns the time elapsed (in seconds) since the given time.
314  */
315 static time_t
316 zfpm_get_elapsed_time (time_t reference)
317 {
318   time_t now;
319
320   now = zfpm_get_time ();
321
322   if (now < reference)
323     {
324       assert (0);
325       return 0;
326     }
327
328   return now - reference;
329 }
330
331 /*
332  * zfpm_is_table_for_fpm
333  *
334  * Returns TRUE if the the given table is to be communicated to the
335  * FPM.
336  */
337 static inline int
338 zfpm_is_table_for_fpm (struct route_table *table)
339 {
340   rib_table_info_t *info;
341
342   info = rib_table_info (table);
343
344   /*
345    * We only send the unicast tables in the main instance to the FPM
346    * at this point.
347    */
348   if (info->zvrf->vrf_id != 0)
349     return 0;
350
351   if (info->safi != SAFI_UNICAST)
352     return 0;
353
354   return 1;
355 }
356
357 /*
358  * zfpm_rnodes_iter_init
359  */
360 static inline void
361 zfpm_rnodes_iter_init (zfpm_rnodes_iter_t *iter)
362 {
363   memset (iter, 0, sizeof (*iter));
364   rib_tables_iter_init (&iter->tables_iter);
365
366   /*
367    * This is a hack, but it makes implementing 'next' easier by
368    * ensuring that route_table_iter_next() will return NULL the first
369    * time we call it.
370    */
371   route_table_iter_init (&iter->iter, NULL);
372   route_table_iter_cleanup (&iter->iter);
373 }
374
375 /*
376  * zfpm_rnodes_iter_next
377  */
378 static inline struct route_node *
379 zfpm_rnodes_iter_next (zfpm_rnodes_iter_t *iter)
380 {
381   struct route_node *rn;
382   struct route_table *table;
383
384   while (1)
385     {
386       rn = route_table_iter_next (&iter->iter);
387       if (rn)
388         return rn;
389
390       /*
391        * We've made our way through this table, go to the next one.
392        */
393       route_table_iter_cleanup (&iter->iter);
394
395       while ((table = rib_tables_iter_next (&iter->tables_iter)))
396         {
397           if (zfpm_is_table_for_fpm (table))
398             break;
399         }
400
401       if (!table)
402         return NULL;
403
404       route_table_iter_init (&iter->iter, table);
405     }
406
407   return NULL;
408 }
409
410 /*
411  * zfpm_rnodes_iter_pause
412  */
413 static inline void
414 zfpm_rnodes_iter_pause (zfpm_rnodes_iter_t *iter)
415 {
416   route_table_iter_pause (&iter->iter);
417 }
418
419 /*
420  * zfpm_rnodes_iter_cleanup
421  */
422 static inline void
423 zfpm_rnodes_iter_cleanup (zfpm_rnodes_iter_t *iter)
424 {
425   route_table_iter_cleanup (&iter->iter);
426   rib_tables_iter_cleanup (&iter->tables_iter);
427 }
428
429 /*
430  * zfpm_stats_init
431  *
432  * Initialize a statistics block.
433  */
434 static inline void
435 zfpm_stats_init (zfpm_stats_t *stats)
436 {
437   memset (stats, 0, sizeof (*stats));
438 }
439
440 /*
441  * zfpm_stats_reset
442  */
443 static inline void
444 zfpm_stats_reset (zfpm_stats_t *stats)
445 {
446   zfpm_stats_init (stats);
447 }
448
449 /*
450  * zfpm_stats_copy
451  */
452 static inline void
453 zfpm_stats_copy (const zfpm_stats_t *src, zfpm_stats_t *dest)
454 {
455   memcpy (dest, src, sizeof (*dest));
456 }
457
458 /*
459  * zfpm_stats_compose
460  *
461  * Total up the statistics in two stats structures ('s1 and 's2') and
462  * return the result in the third argument, 'result'. Note that the
463  * pointer 'result' may be the same as 's1' or 's2'.
464  *
465  * For simplicity, the implementation below assumes that the stats
466  * structure is composed entirely of counters. This can easily be
467  * changed when necessary.
468  */
469 static void
470 zfpm_stats_compose (const zfpm_stats_t *s1, const zfpm_stats_t *s2,
471                     zfpm_stats_t *result)
472 {
473   const unsigned long *p1, *p2;
474   unsigned long *result_p;
475   int i, num_counters;
476
477   p1 = (const unsigned long *) s1;
478   p2 = (const unsigned long *) s2;
479   result_p = (unsigned long *) result;
480
481   num_counters = (sizeof (zfpm_stats_t) / sizeof (unsigned long));
482
483   for (i = 0; i < num_counters; i++)
484     {
485       result_p[i] = p1[i] + p2[i];
486     }
487 }
488
489 /*
490  * zfpm_read_on
491  */
492 static inline void
493 zfpm_read_on (void)
494 {
495   assert (!zfpm_g->t_read);
496   assert (zfpm_g->sock >= 0);
497
498   THREAD_READ_ON (zfpm_g->master, zfpm_g->t_read, zfpm_read_cb, 0,
499                   zfpm_g->sock);
500 }
501
502 /*
503  * zfpm_write_on
504  */
505 static inline void
506 zfpm_write_on (void)
507 {
508   assert (!zfpm_g->t_write);
509   assert (zfpm_g->sock >= 0);
510
511   THREAD_WRITE_ON (zfpm_g->master, zfpm_g->t_write, zfpm_write_cb, 0,
512                    zfpm_g->sock);
513 }
514
515 /*
516  * zfpm_read_off
517  */
518 static inline void
519 zfpm_read_off (void)
520 {
521   THREAD_READ_OFF (zfpm_g->t_read);
522 }
523
524 /*
525  * zfpm_write_off
526  */
527 static inline void
528 zfpm_write_off (void)
529 {
530   THREAD_WRITE_OFF (zfpm_g->t_write);
531 }
532
533 /*
534  * zfpm_conn_up_thread_cb
535  *
536  * Callback for actions to be taken when the connection to the FPM
537  * comes up.
538  */
539 static int
540 zfpm_conn_up_thread_cb (struct thread *thread)
541 {
542   struct route_node *rnode;
543   zfpm_rnodes_iter_t *iter;
544   rib_dest_t *dest;
545
546   assert (zfpm_g->t_conn_up);
547   zfpm_g->t_conn_up = NULL;
548
549   iter = &zfpm_g->t_conn_up_state.iter;
550
551   if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
552     {
553       zfpm_debug ("Connection not up anymore, conn_up thread aborting");
554       zfpm_g->stats.t_conn_up_aborts++;
555       goto done;
556     }
557
558   while ((rnode = zfpm_rnodes_iter_next (iter)))
559     {
560       dest = rib_dest_from_rnode (rnode);
561
562       if (dest)
563         {
564           zfpm_g->stats.t_conn_up_dests_processed++;
565           zfpm_trigger_update (rnode, NULL);
566         }
567
568       /*
569        * Yield if need be.
570        */
571       if (!zfpm_thread_should_yield (thread))
572         continue;
573
574       zfpm_g->stats.t_conn_up_yields++;
575       zfpm_rnodes_iter_pause (iter);
576       zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
577                                                  zfpm_conn_up_thread_cb,
578                                                  0, 0);
579       return 0;
580     }
581
582   zfpm_g->stats.t_conn_up_finishes++;
583
584  done:
585   zfpm_rnodes_iter_cleanup (iter);
586   return 0;
587 }
588
589 /*
590  * zfpm_connection_up
591  *
592  * Called when the connection to the FPM comes up.
593  */
594 static void
595 zfpm_connection_up (const char *detail)
596 {
597   assert (zfpm_g->sock >= 0);
598   zfpm_read_on ();
599   zfpm_write_on ();
600   zfpm_set_state (ZFPM_STATE_ESTABLISHED, detail);
601
602   /*
603    * Start thread to push existing routes to the FPM.
604    */
605   assert (!zfpm_g->t_conn_up);
606
607   zfpm_rnodes_iter_init (&zfpm_g->t_conn_up_state.iter);
608
609   zfpm_debug ("Starting conn_up thread");
610   zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
611                                              zfpm_conn_up_thread_cb, 0, 0);
612   zfpm_g->stats.t_conn_up_starts++;
613 }
614
615 /*
616  * zfpm_connect_check
617  *
618  * Check if an asynchronous connect() to the FPM is complete.
619  */
620 static void
621 zfpm_connect_check ()
622 {
623   int status;
624   socklen_t slen;
625   int ret;
626
627   zfpm_read_off ();
628   zfpm_write_off ();
629
630   slen = sizeof (status);
631   ret = getsockopt (zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *) &status,
632                     &slen);
633
634   if (ret >= 0 && status == 0)
635     {
636       zfpm_connection_up ("async connect complete");
637       return;
638     }
639
640   /*
641    * getsockopt() failed or indicated an error on the socket.
642    */
643   close (zfpm_g->sock);
644   zfpm_g->sock = -1;
645
646   zfpm_start_connect_timer ("getsockopt() after async connect failed");
647   return;
648 }
649
650 /*
651  * zfpm_conn_down_thread_cb
652  *
653  * Callback that is invoked to clean up state after the TCP connection
654  * to the FPM goes down.
655  */
656 static int
657 zfpm_conn_down_thread_cb (struct thread *thread)
658 {
659   struct route_node *rnode;
660   zfpm_rnodes_iter_t *iter;
661   rib_dest_t *dest;
662
663   assert (zfpm_g->state == ZFPM_STATE_IDLE);
664
665   assert (zfpm_g->t_conn_down);
666   zfpm_g->t_conn_down = NULL;
667
668   iter = &zfpm_g->t_conn_down_state.iter;
669
670   while ((rnode = zfpm_rnodes_iter_next (iter)))
671     {
672       dest = rib_dest_from_rnode (rnode);
673
674       if (dest)
675         {
676           if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
677             {
678               TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
679             }
680
681           UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
682           UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
683
684           zfpm_g->stats.t_conn_down_dests_processed++;
685
686           /*
687            * Check if the dest should be deleted.
688            */
689           rib_gc_dest(rnode);
690         }
691
692       /*
693        * Yield if need be.
694        */
695       if (!zfpm_thread_should_yield (thread))
696         continue;
697
698       zfpm_g->stats.t_conn_down_yields++;
699       zfpm_rnodes_iter_pause (iter);
700       zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
701                                                    zfpm_conn_down_thread_cb,
702                                                    0, 0);
703       return 0;
704     }
705
706   zfpm_g->stats.t_conn_down_finishes++;
707   zfpm_rnodes_iter_cleanup (iter);
708
709   /*
710    * Start the process of connecting to the FPM again.
711    */
712   zfpm_start_connect_timer ("cleanup complete");
713   return 0;
714 }
715
716 /*
717  * zfpm_connection_down
718  *
719  * Called when the connection to the FPM has gone down.
720  */
721 static void
722 zfpm_connection_down (const char *detail)
723 {
724   if (!detail)
725     detail = "unknown";
726
727   assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
728
729   zlog_info ("connection to the FPM has gone down: %s", detail);
730
731   zfpm_read_off ();
732   zfpm_write_off ();
733
734   stream_reset (zfpm_g->ibuf);
735   stream_reset (zfpm_g->obuf);
736
737   if (zfpm_g->sock >= 0) {
738     close (zfpm_g->sock);
739     zfpm_g->sock = -1;
740   }
741
742   /*
743    * Start thread to clean up state after the connection goes down.
744    */
745   assert (!zfpm_g->t_conn_down);
746   zfpm_debug ("Starting conn_down thread");
747   zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter);
748   zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
749                                                zfpm_conn_down_thread_cb, 0, 0);
750   zfpm_g->stats.t_conn_down_starts++;
751
752   zfpm_set_state (ZFPM_STATE_IDLE, detail);
753 }
754
755 /*
756  * zfpm_read_cb
757  */
758 static int
759 zfpm_read_cb (struct thread *thread)
760 {
761   size_t already;
762   struct stream *ibuf;
763   uint16_t msg_len;
764   fpm_msg_hdr_t *hdr;
765
766   zfpm_g->stats.read_cb_calls++;
767   assert (zfpm_g->t_read);
768   zfpm_g->t_read = NULL;
769
770   /*
771    * Check if async connect is now done.
772    */
773   if (zfpm_g->state == ZFPM_STATE_CONNECTING)
774     {
775       zfpm_connect_check();
776       return 0;
777     }
778
779   assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
780   assert (zfpm_g->sock >= 0);
781
782   ibuf = zfpm_g->ibuf;
783
784   already = stream_get_endp (ibuf);
785   if (already < FPM_MSG_HDR_LEN)
786     {
787       ssize_t nbyte;
788
789       nbyte = stream_read_try (ibuf, zfpm_g->sock, FPM_MSG_HDR_LEN - already);
790       if (nbyte == 0 || nbyte == -1)
791         {
792           zfpm_connection_down ("closed socket in read");
793           return 0;
794         }
795
796       if (nbyte != (ssize_t) (FPM_MSG_HDR_LEN - already))
797         goto done;
798
799       already = FPM_MSG_HDR_LEN;
800     }
801
802   stream_set_getp (ibuf, 0);
803
804   hdr = (fpm_msg_hdr_t *) stream_pnt (ibuf);
805
806   if (!fpm_msg_hdr_ok (hdr))
807     {
808       zfpm_connection_down ("invalid message header");
809       return 0;
810     }
811
812   msg_len = fpm_msg_len (hdr);
813
814   /*
815    * Read out the rest of the packet.
816    */
817   if (already < msg_len)
818     {
819       ssize_t nbyte;
820
821       nbyte = stream_read_try (ibuf, zfpm_g->sock, msg_len - already);
822
823       if (nbyte == 0 || nbyte == -1)
824         {
825           zfpm_connection_down ("failed to read message");
826           return 0;
827         }
828
829       if (nbyte != (ssize_t) (msg_len - already))
830         goto done;
831     }
832
833   zfpm_debug ("Read out a full fpm message");
834
835   /*
836    * Just throw it away for now.
837    */
838   stream_reset (ibuf);
839
840  done:
841   zfpm_read_on ();
842   return 0;
843 }
844
845 /*
846  * zfpm_writes_pending
847  *
848  * Returns TRUE if we may have something to write to the FPM.
849  */
850 static int
851 zfpm_writes_pending (void)
852 {
853
854   /*
855    * Check if there is any data in the outbound buffer that has not
856    * been written to the socket yet.
857    */
858   if (stream_get_endp (zfpm_g->obuf) - stream_get_getp (zfpm_g->obuf))
859     return 1;
860
861   /*
862    * Check if there are any prefixes on the outbound queue.
863    */
864   if (!TAILQ_EMPTY (&zfpm_g->dest_q))
865     return 1;
866
867   return 0;
868 }
869
870 /*
871  * zfpm_encode_route
872  *
873  * Encode a message to the FPM with information about the given route.
874  *
875  * Returns the number of bytes written to the buffer. 0 or a negative
876  * value indicates an error.
877  */
878 static inline int
879 zfpm_encode_route (rib_dest_t *dest, struct rib *rib, char *in_buf,
880                    size_t in_buf_len, fpm_msg_type_e *msg_type)
881 {
882   size_t len;
883   int cmd;
884   len = 0;
885
886   *msg_type = FPM_MSG_TYPE_NONE;
887
888   switch (zfpm_g->message_format) {
889
890   case ZFPM_MSG_FORMAT_PROTOBUF:
891 #ifdef HAVE_PROTOBUF
892     len = zfpm_protobuf_encode_route (dest, rib, (uint8_t *) in_buf,
893                                       in_buf_len);
894     *msg_type = FPM_MSG_TYPE_PROTOBUF;
895 #endif
896     break;
897
898   case ZFPM_MSG_FORMAT_NETLINK:
899 #ifdef HAVE_NETLINK
900     *msg_type = FPM_MSG_TYPE_NETLINK;
901     cmd = rib ? RTM_NEWROUTE : RTM_DELROUTE;
902     len = zfpm_netlink_encode_route (cmd, dest, rib, in_buf, in_buf_len);
903     assert(fpm_msg_align(len) == len);
904     *msg_type = FPM_MSG_TYPE_NETLINK;
905 #endif /* HAVE_NETLINK */
906     break;
907
908   default:
909     break;
910   }
911
912   return len;
913
914 }
915
916 /*
917  * zfpm_route_for_update
918  *
919  * Returns the rib that is to be sent to the FPM for a given dest.
920  */
921 struct rib *
922 zfpm_route_for_update (rib_dest_t *dest)
923 {
924   struct rib *rib;
925
926   RIB_DEST_FOREACH_ROUTE (dest, rib)
927     {
928       if (!CHECK_FLAG (rib->status, RIB_ENTRY_SELECTED_FIB))
929         continue;
930
931       return rib;
932     }
933
934   /*
935    * We have no route for this destination.
936    */
937   return NULL;
938 }
939
940 /*
941  * zfpm_build_updates
942  *
943  * Process the outgoing queue and write messages to the outbound
944  * buffer.
945  */
946 static void
947 zfpm_build_updates (void)
948 {
949   struct stream *s;
950   rib_dest_t *dest;
951   unsigned char *buf, *data, *buf_end;
952   size_t msg_len;
953   size_t data_len;
954   fpm_msg_hdr_t *hdr;
955   struct rib *rib;
956   int is_add, write_msg;
957   fpm_msg_type_e msg_type;
958
959   s = zfpm_g->obuf;
960
961   assert (stream_empty (s));
962
963   do {
964
965     /*
966      * Make sure there is enough space to write another message.
967      */
968     if (STREAM_WRITEABLE (s) < FPM_MAX_MSG_LEN)
969       break;
970
971     buf = STREAM_DATA (s) + stream_get_endp (s);
972     buf_end = buf + STREAM_WRITEABLE (s);
973
974     dest = TAILQ_FIRST (&zfpm_g->dest_q);
975     if (!dest)
976       break;
977
978     assert (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM));
979
980     hdr = (fpm_msg_hdr_t *) buf;
981     hdr->version = FPM_PROTO_VERSION;
982
983     data = fpm_msg_data (hdr);
984
985     rib = zfpm_route_for_update (dest);
986     is_add = rib ? 1 : 0;
987
988     write_msg = 1;
989
990     /*
991      * If this is a route deletion, and we have not sent the route to
992      * the FPM previously, skip it.
993      */
994     if (!is_add && !CHECK_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM))
995       {
996         write_msg = 0;
997         zfpm_g->stats.nop_deletes_skipped++;
998       }
999
1000     if (write_msg) {
1001       data_len = zfpm_encode_route (dest, rib, (char *) data, buf_end - data,
1002                                     &msg_type);
1003
1004       assert (data_len);
1005       if (data_len)
1006         {
1007           hdr->msg_type = msg_type;
1008           msg_len = fpm_data_len_to_msg_len (data_len);
1009           hdr->msg_len = htons (msg_len);
1010           stream_forward_endp (s, msg_len);
1011
1012           if (is_add)
1013             zfpm_g->stats.route_adds++;
1014           else
1015             zfpm_g->stats.route_dels++;
1016         }
1017     }
1018
1019     /*
1020      * Remove the dest from the queue, and reset the flag.
1021      */
1022     UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
1023     TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
1024
1025     if (is_add)
1026       {
1027         SET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
1028       }
1029     else
1030       {
1031         UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
1032       }
1033
1034     /*
1035      * Delete the destination if necessary.
1036      */
1037     if (rib_gc_dest (dest->rnode))
1038       zfpm_g->stats.dests_del_after_update++;
1039
1040   } while (1);
1041
1042 }
1043
1044 /*
1045  * zfpm_write_cb
1046  */
1047 static int
1048 zfpm_write_cb (struct thread *thread)
1049 {
1050   struct stream *s;
1051   int num_writes;
1052
1053   zfpm_g->stats.write_cb_calls++;
1054   assert (zfpm_g->t_write);
1055   zfpm_g->t_write = NULL;
1056
1057   /*
1058    * Check if async connect is now done.
1059    */
1060   if (zfpm_g->state == ZFPM_STATE_CONNECTING)
1061     {
1062       zfpm_connect_check ();
1063       return 0;
1064     }
1065
1066   assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
1067   assert (zfpm_g->sock >= 0);
1068
1069   num_writes = 0;
1070
1071   do
1072     {
1073       int bytes_to_write, bytes_written;
1074
1075       s = zfpm_g->obuf;
1076
1077       /*
1078        * If the stream is empty, try fill it up with data.
1079        */
1080       if (stream_empty (s))
1081         {
1082           zfpm_build_updates ();
1083         }
1084
1085       bytes_to_write = stream_get_endp (s) - stream_get_getp (s);
1086       if (!bytes_to_write)
1087         break;
1088
1089       bytes_written = write (zfpm_g->sock, STREAM_PNT (s), bytes_to_write);
1090       zfpm_g->stats.write_calls++;
1091       num_writes++;
1092
1093       if (bytes_written < 0)
1094         {
1095           if (ERRNO_IO_RETRY (errno))
1096             break;
1097
1098           zfpm_connection_down ("failed to write to socket");
1099           return 0;
1100         }
1101
1102       if (bytes_written != bytes_to_write)
1103         {
1104
1105           /*
1106            * Partial write.
1107            */
1108           stream_forward_getp (s, bytes_written);
1109           zfpm_g->stats.partial_writes++;
1110           break;
1111         }
1112
1113       /*
1114        * We've written out the entire contents of the stream.
1115        */
1116       stream_reset (s);
1117
1118       if (num_writes >= ZFPM_MAX_WRITES_PER_RUN)
1119         {
1120           zfpm_g->stats.max_writes_hit++;
1121           break;
1122         }
1123
1124       if (zfpm_thread_should_yield (thread))
1125         {
1126           zfpm_g->stats.t_write_yields++;
1127           break;
1128         }
1129     } while (1);
1130
1131   if (zfpm_writes_pending ())
1132       zfpm_write_on ();
1133
1134   return 0;
1135 }
1136
1137 /*
1138  * zfpm_connect_cb
1139  */
1140 static int
1141 zfpm_connect_cb (struct thread *t)
1142 {
1143   int sock, ret;
1144   struct sockaddr_in serv;
1145
1146   assert (zfpm_g->t_connect);
1147   zfpm_g->t_connect = NULL;
1148   assert (zfpm_g->state == ZFPM_STATE_ACTIVE);
1149
1150   sock = socket (AF_INET, SOCK_STREAM, 0);
1151   if (sock < 0)
1152     {
1153       zfpm_debug ("Failed to create socket for connect(): %s", strerror(errno));
1154       zfpm_g->stats.connect_no_sock++;
1155       return 0;
1156     }
1157
1158   set_nonblocking(sock);
1159
1160   /* Make server socket. */
1161   memset (&serv, 0, sizeof (serv));
1162   serv.sin_family = AF_INET;
1163   serv.sin_port = htons (zfpm_g->fpm_port);
1164 #ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
1165   serv.sin_len = sizeof (struct sockaddr_in);
1166 #endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
1167   if (!zfpm_g->fpm_server)
1168     serv.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
1169   else 
1170     serv.sin_addr.s_addr = (zfpm_g->fpm_server);
1171
1172   /*
1173    * Connect to the FPM.
1174    */
1175   zfpm_g->connect_calls++;
1176   zfpm_g->stats.connect_calls++;
1177   zfpm_g->last_connect_call_time = zfpm_get_time ();
1178
1179   ret = connect (sock, (struct sockaddr *) &serv, sizeof (serv));
1180   if (ret >= 0)
1181     {
1182       zfpm_g->sock = sock;
1183       zfpm_connection_up ("connect succeeded");
1184       return 1;
1185     }
1186
1187   if (errno == EINPROGRESS)
1188     {
1189       zfpm_g->sock = sock;
1190       zfpm_read_on ();
1191       zfpm_write_on ();
1192       zfpm_set_state (ZFPM_STATE_CONNECTING, "async connect in progress");
1193       return 0;
1194     }
1195
1196   zlog_info ("can't connect to FPM %d: %s", sock, safe_strerror (errno));
1197   close (sock);
1198
1199   /*
1200    * Restart timer for retrying connection.
1201    */
1202   zfpm_start_connect_timer ("connect() failed");
1203   return 0;
1204 }
1205
1206 /*
1207  * zfpm_set_state
1208  *
1209  * Move state machine into the given state.
1210  */
1211 static void
1212 zfpm_set_state (zfpm_state_t state, const char *reason)
1213 {
1214   zfpm_state_t cur_state = zfpm_g->state;
1215
1216   if (!reason)
1217     reason = "Unknown";
1218
1219   if (state == cur_state)
1220     return;
1221
1222   zfpm_debug("beginning state transition %s -> %s. Reason: %s",
1223              zfpm_state_to_str (cur_state), zfpm_state_to_str (state),
1224              reason);
1225
1226   switch (state) {
1227
1228   case ZFPM_STATE_IDLE:
1229     assert (cur_state == ZFPM_STATE_ESTABLISHED);
1230     break;
1231
1232   case ZFPM_STATE_ACTIVE:
1233      assert (cur_state == ZFPM_STATE_IDLE ||
1234              cur_state == ZFPM_STATE_CONNECTING);
1235     assert (zfpm_g->t_connect);
1236     break;
1237
1238   case ZFPM_STATE_CONNECTING:
1239     assert (zfpm_g->sock);
1240     assert (cur_state == ZFPM_STATE_ACTIVE);
1241     assert (zfpm_g->t_read);
1242     assert (zfpm_g->t_write);
1243     break;
1244
1245   case ZFPM_STATE_ESTABLISHED:
1246     assert (cur_state == ZFPM_STATE_ACTIVE ||
1247             cur_state == ZFPM_STATE_CONNECTING);
1248     assert (zfpm_g->sock);
1249     assert (zfpm_g->t_read);
1250     assert (zfpm_g->t_write);
1251     break;
1252   }
1253
1254   zfpm_g->state = state;
1255 }
1256
1257 /*
1258  * zfpm_calc_connect_delay
1259  *
1260  * Returns the number of seconds after which we should attempt to
1261  * reconnect to the FPM.
1262  */
1263 static long
1264 zfpm_calc_connect_delay (void)
1265 {
1266   time_t elapsed;
1267
1268   /*
1269    * Return 0 if this is our first attempt to connect.
1270    */
1271   if (zfpm_g->connect_calls == 0)
1272     {
1273       return 0;
1274     }
1275
1276   elapsed = zfpm_get_elapsed_time (zfpm_g->last_connect_call_time);
1277
1278   if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
1279     return 0;
1280   }
1281
1282   return ZFPM_CONNECT_RETRY_IVL - elapsed;
1283 }
1284
1285 /*
1286  * zfpm_start_connect_timer
1287  */
1288 static void
1289 zfpm_start_connect_timer (const char *reason)
1290 {
1291   long delay_secs;
1292
1293   assert (!zfpm_g->t_connect);
1294   assert (zfpm_g->sock < 0);
1295
1296   assert(zfpm_g->state == ZFPM_STATE_IDLE ||
1297          zfpm_g->state == ZFPM_STATE_ACTIVE ||
1298          zfpm_g->state == ZFPM_STATE_CONNECTING);
1299
1300   delay_secs = zfpm_calc_connect_delay();
1301   zfpm_debug ("scheduling connect in %ld seconds", delay_secs);
1302
1303   THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_connect, zfpm_connect_cb, 0,
1304                    delay_secs);
1305   zfpm_set_state (ZFPM_STATE_ACTIVE, reason);
1306 }
1307
1308 /*
1309  * zfpm_is_enabled
1310  *
1311  * Returns TRUE if the zebra FPM module has been enabled.
1312  */
1313 static inline int
1314 zfpm_is_enabled (void)
1315 {
1316   return zfpm_g->enabled;
1317 }
1318
1319 /*
1320  * zfpm_conn_is_up
1321  *
1322  * Returns TRUE if the connection to the FPM is up.
1323  */
1324 static inline int
1325 zfpm_conn_is_up (void)
1326 {
1327   if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
1328     return 0;
1329
1330   assert (zfpm_g->sock >= 0);
1331
1332   return 1;
1333 }
1334
1335 /*
1336  * zfpm_trigger_update
1337  *
1338  * The zebra code invokes this function to indicate that we should
1339  * send an update to the FPM about the given route_node.
1340  */
1341 void
1342 zfpm_trigger_update (struct route_node *rn, const char *reason)
1343 {
1344   rib_dest_t *dest;
1345   char buf[PREFIX_STRLEN];
1346
1347   /*
1348    * Ignore if the connection is down. We will update the FPM about
1349    * all destinations once the connection comes up.
1350    */
1351   if (!zfpm_conn_is_up ())
1352     return;
1353
1354   dest = rib_dest_from_rnode (rn);
1355
1356   /*
1357    * Ignore the trigger if the dest is not in a table that we would
1358    * send to the FPM.
1359    */
1360   if (!zfpm_is_table_for_fpm (rib_dest_table (dest)))
1361     {
1362       zfpm_g->stats.non_fpm_table_triggers++;
1363       return;
1364     }
1365
1366   if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM)) {
1367     zfpm_g->stats.redundant_triggers++;
1368     return;
1369   }
1370
1371   if (reason)
1372     {
1373       zfpm_debug ("%s triggering update to FPM - Reason: %s",
1374                   prefix2str (&rn->p, buf, sizeof(buf)), reason);
1375     }
1376
1377   SET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
1378   TAILQ_INSERT_TAIL (&zfpm_g->dest_q, dest, fpm_q_entries);
1379   zfpm_g->stats.updates_triggered++;
1380
1381   /*
1382    * Make sure that writes are enabled.
1383    */
1384   if (zfpm_g->t_write)
1385     return;
1386
1387   zfpm_write_on ();
1388 }
1389
1390 /*
1391  * zfpm_stats_timer_cb
1392  */
1393 static int
1394 zfpm_stats_timer_cb (struct thread *t)
1395 {
1396   assert (zfpm_g->t_stats);
1397   zfpm_g->t_stats = NULL;
1398
1399   /*
1400    * Remember the stats collected in the last interval for display
1401    * purposes.
1402    */
1403   zfpm_stats_copy (&zfpm_g->stats, &zfpm_g->last_ivl_stats);
1404
1405   /*
1406    * Add the current set of stats into the cumulative statistics.
1407    */
1408   zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
1409                       &zfpm_g->cumulative_stats);
1410
1411   /*
1412    * Start collecting stats afresh over the next interval.
1413    */
1414   zfpm_stats_reset (&zfpm_g->stats);
1415
1416   zfpm_start_stats_timer ();
1417
1418   return 0;
1419 }
1420
1421 /*
1422  * zfpm_stop_stats_timer
1423  */
1424 static void
1425 zfpm_stop_stats_timer (void)
1426 {
1427   if (!zfpm_g->t_stats)
1428     return;
1429
1430   zfpm_debug ("Stopping existing stats timer");
1431   THREAD_TIMER_OFF (zfpm_g->t_stats);
1432 }
1433
1434 /*
1435  * zfpm_start_stats_timer
1436  */
1437 void
1438 zfpm_start_stats_timer (void)
1439 {
1440   assert (!zfpm_g->t_stats);
1441
1442   THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_stats, zfpm_stats_timer_cb, 0,
1443                    ZFPM_STATS_IVL_SECS);
1444 }
1445
1446 /*
1447  * Helper macro for zfpm_show_stats() below.
1448  */
1449 #define ZFPM_SHOW_STAT(counter)                                         \
1450   do {                                                                  \
1451     vty_out (vty, "%-40s %10lu %16lu%s", #counter, total_stats.counter, \
1452              zfpm_g->last_ivl_stats.counter, VTY_NEWLINE);              \
1453   } while (0)
1454
1455 /*
1456  * zfpm_show_stats
1457  */
1458 static void
1459 zfpm_show_stats (struct vty *vty)
1460 {
1461   zfpm_stats_t total_stats;
1462   time_t elapsed;
1463
1464   vty_out (vty, "%s%-40s %10s     Last %2d secs%s%s", VTY_NEWLINE, "Counter",
1465            "Total", ZFPM_STATS_IVL_SECS, VTY_NEWLINE, VTY_NEWLINE);
1466
1467   /*
1468    * Compute the total stats up to this instant.
1469    */
1470   zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
1471                       &total_stats);
1472
1473   ZFPM_SHOW_STAT (connect_calls);
1474   ZFPM_SHOW_STAT (connect_no_sock);
1475   ZFPM_SHOW_STAT (read_cb_calls);
1476   ZFPM_SHOW_STAT (write_cb_calls);
1477   ZFPM_SHOW_STAT (write_calls);
1478   ZFPM_SHOW_STAT (partial_writes);
1479   ZFPM_SHOW_STAT (max_writes_hit);
1480   ZFPM_SHOW_STAT (t_write_yields);
1481   ZFPM_SHOW_STAT (nop_deletes_skipped);
1482   ZFPM_SHOW_STAT (route_adds);
1483   ZFPM_SHOW_STAT (route_dels);
1484   ZFPM_SHOW_STAT (updates_triggered);
1485   ZFPM_SHOW_STAT (non_fpm_table_triggers);
1486   ZFPM_SHOW_STAT (redundant_triggers);
1487   ZFPM_SHOW_STAT (dests_del_after_update);
1488   ZFPM_SHOW_STAT (t_conn_down_starts);
1489   ZFPM_SHOW_STAT (t_conn_down_dests_processed);
1490   ZFPM_SHOW_STAT (t_conn_down_yields);
1491   ZFPM_SHOW_STAT (t_conn_down_finishes);
1492   ZFPM_SHOW_STAT (t_conn_up_starts);
1493   ZFPM_SHOW_STAT (t_conn_up_dests_processed);
1494   ZFPM_SHOW_STAT (t_conn_up_yields);
1495   ZFPM_SHOW_STAT (t_conn_up_aborts);
1496   ZFPM_SHOW_STAT (t_conn_up_finishes);
1497
1498   if (!zfpm_g->last_stats_clear_time)
1499     return;
1500
1501   elapsed = zfpm_get_elapsed_time (zfpm_g->last_stats_clear_time);
1502
1503   vty_out (vty, "%sStats were cleared %lu seconds ago%s", VTY_NEWLINE,
1504            (unsigned long) elapsed, VTY_NEWLINE);
1505 }
1506
1507 /*
1508  * zfpm_clear_stats
1509  */
1510 static void
1511 zfpm_clear_stats (struct vty *vty)
1512 {
1513   if (!zfpm_is_enabled ())
1514     {
1515       vty_out (vty, "The FPM module is not enabled...%s", VTY_NEWLINE);
1516       return;
1517     }
1518
1519   zfpm_stats_reset (&zfpm_g->stats);
1520   zfpm_stats_reset (&zfpm_g->last_ivl_stats);
1521   zfpm_stats_reset (&zfpm_g->cumulative_stats);
1522
1523   zfpm_stop_stats_timer ();
1524   zfpm_start_stats_timer ();
1525
1526   zfpm_g->last_stats_clear_time = zfpm_get_time();
1527
1528   vty_out (vty, "Cleared FPM stats%s", VTY_NEWLINE);
1529 }
1530
1531 /*
1532  * show_zebra_fpm_stats
1533  */
1534 DEFUN (show_zebra_fpm_stats,
1535        show_zebra_fpm_stats_cmd,
1536        "show zebra fpm stats",
1537        SHOW_STR
1538        "Zebra information\n"
1539        "Forwarding Path Manager information\n"
1540        "Statistics\n")
1541 {
1542   zfpm_show_stats (vty);
1543   return CMD_SUCCESS;
1544 }
1545
1546 /*
1547  * clear_zebra_fpm_stats
1548  */
1549 DEFUN (clear_zebra_fpm_stats,
1550        clear_zebra_fpm_stats_cmd,
1551        "clear zebra fpm stats",
1552        CLEAR_STR
1553        "Zebra information\n"
1554        "Clear Forwarding Path Manager information\n"
1555        "Statistics\n")
1556 {
1557   zfpm_clear_stats (vty);
1558   return CMD_SUCCESS;
1559 }
1560
1561 /*
1562  * update fpm connection information 
1563  */
1564 DEFUN ( fpm_remote_ip, 
1565         fpm_remote_ip_cmd,
1566         "fpm connection ip A.B.C.D port <1-65535>",
1567         "fpm connection remote ip and port\n"
1568         "Remote fpm server ip A.B.C.D\n"
1569         "Enter ip ")
1570 {
1571
1572    in_addr_t fpm_server;
1573    uint32_t port_no;
1574
1575    fpm_server = inet_addr (argv[0]);
1576    if (fpm_server == INADDR_NONE)
1577      return CMD_ERR_INCOMPLETE;
1578
1579    port_no = atoi (argv[1]);
1580    if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
1581      return CMD_ERR_INCOMPLETE;
1582
1583    zfpm_g->fpm_server = fpm_server;
1584    zfpm_g->fpm_port = port_no;
1585
1586
1587    return CMD_SUCCESS;
1588 }
1589
1590 DEFUN ( no_fpm_remote_ip, 
1591         no_fpm_remote_ip_cmd,
1592         "no fpm connection ip A.B.C.D port <1-65535>",
1593         "fpm connection remote ip and port\n"
1594         "Connection\n"
1595         "Remote fpm server ip A.B.C.D\n"
1596         "Enter ip ")
1597 {
1598    if (zfpm_g->fpm_server != inet_addr (argv[0]) || 
1599               zfpm_g->fpm_port !=  atoi (argv[1]))
1600        return CMD_ERR_NO_MATCH;
1601
1602    zfpm_g->fpm_server = FPM_DEFAULT_IP;
1603    zfpm_g->fpm_port = FPM_DEFAULT_PORT;
1604
1605    return CMD_SUCCESS;
1606 }
1607
1608
1609 /*
1610  * zfpm_init_message_format
1611  */
1612 static inline void
1613 zfpm_init_message_format (const char *format)
1614 {
1615   int have_netlink, have_protobuf;
1616
1617   have_netlink = have_protobuf = 0;
1618
1619 #ifdef HAVE_NETLINK
1620   have_netlink = 1;
1621 #endif
1622
1623 #ifdef HAVE_PROTOBUF
1624   have_protobuf = 1;
1625 #endif
1626
1627   zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;
1628
1629   if (!format)
1630     {
1631       if (have_netlink)
1632         {
1633           zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1634         }
1635       else if (have_protobuf)
1636         {
1637           zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1638         }
1639       return;
1640     }
1641
1642   if (!strcmp ("netlink", format))
1643     {
1644       if (!have_netlink)
1645         {
1646           zlog_err ("FPM netlink message format is not available");
1647           return;
1648         }
1649       zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1650       return;
1651     }
1652
1653   if (!strcmp ("protobuf", format))
1654     {
1655       if (!have_protobuf)
1656         {
1657           zlog_err ("FPM protobuf message format is not available");
1658           return;
1659         }
1660       zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1661       return;
1662     }
1663
1664   zlog_warn ("Unknown fpm format '%s'", format);
1665 }
1666
1667 /**
1668  * fpm_remote_srv_write 
1669  *
1670  * Module to write remote fpm connection 
1671  *
1672  * Returns ZERO on success.
1673  */
1674
1675 int fpm_remote_srv_write (struct vty *vty )
1676 {
1677    struct in_addr in;
1678
1679    in.s_addr = zfpm_g->fpm_server;
1680
1681    if (zfpm_g->fpm_server != FPM_DEFAULT_IP || 
1682           zfpm_g->fpm_port != FPM_DEFAULT_PORT)
1683       vty_out (vty,"fpm connection ip %s port %d%s", inet_ntoa (in),zfpm_g->fpm_port,VTY_NEWLINE);
1684
1685    return 0;
1686 }
1687
1688
1689 /**
1690  * zfpm_init
1691  *
1692  * One-time initialization of the Zebra FPM module.
1693  *
1694  * @param[in] port port at which FPM is running.
1695  * @param[in] enable TRUE if the zebra FPM module should be enabled
1696  * @param[in] format to use to talk to the FPM. Can be 'netink' or 'protobuf'.
1697  *
1698  * Returns TRUE on success.
1699  */
1700 int
1701 zfpm_init (struct thread_master *master, int enable, uint16_t port,
1702            const char *format)
1703 {
1704   static int initialized = 0;
1705
1706   if (initialized) {
1707     return 1;
1708   }
1709
1710   initialized = 1;
1711
1712   memset (zfpm_g, 0, sizeof (*zfpm_g));
1713   zfpm_g->master = master;
1714   TAILQ_INIT(&zfpm_g->dest_q);
1715   zfpm_g->sock = -1;
1716   zfpm_g->state = ZFPM_STATE_IDLE;
1717
1718   zfpm_stats_init (&zfpm_g->stats);
1719   zfpm_stats_init (&zfpm_g->last_ivl_stats);
1720   zfpm_stats_init (&zfpm_g->cumulative_stats);
1721
1722   install_element (ENABLE_NODE, &show_zebra_fpm_stats_cmd);
1723   install_element (ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
1724   install_element (CONFIG_NODE, &fpm_remote_ip_cmd);
1725   install_element (CONFIG_NODE, &no_fpm_remote_ip_cmd);
1726
1727   zfpm_init_message_format(format);
1728
1729   /*
1730    * Disable FPM interface if no suitable format is available.
1731    */
1732   if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
1733       enable = 0;
1734
1735   zfpm_g->enabled = enable;
1736
1737   if (!enable) {
1738     return 1;
1739   }
1740
1741   if (!zfpm_g->fpm_server)
1742      zfpm_g->fpm_server = FPM_DEFAULT_IP;
1743
1744   if (!port)
1745     port = FPM_DEFAULT_PORT;
1746
1747   zfpm_g->fpm_port = port;
1748
1749   zfpm_g->obuf = stream_new (ZFPM_OBUF_SIZE);
1750   zfpm_g->ibuf = stream_new (ZFPM_IBUF_SIZE);
1751
1752   zfpm_start_stats_timer ();
1753   zfpm_start_connect_timer ("initialized");
1754
1755   return 1;
1756 }