2 * Main implementation file for interface to Forwarding Plane Manager.
4 * Copyright (C) 2012 by Open Source Routing.
5 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
7 * This file is part of GNU Zebra.
9 * GNU Zebra is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by the
11 * Free Software Foundation; either version 2, or (at your option) any
14 * GNU Zebra is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with GNU Zebra; see the file COPYING. If not, write to the Free
21 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
33 #include "zebra/rib.h"
36 #include "zebra_fpm.h"
37 #include "zebra_fpm_private.h"
/*
 * Interval (in seconds) at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages. The outgoing buffer holds two maximum-sized messages,
 * the incoming buffer one.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval (in seconds) over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10
63 * Structure that holds state for iterating over all route_node
64 * structures that are candidates for being communicated to the FPM.
66 typedef struct zfpm_rnodes_iter_t_
68 rib_tables_iter_t tables_iter;
69 route_table_iter_t iter;
/*
 * Statistics.
 *
 * Every member must be an 'unsigned long' counter:
 * zfpm_stats_compose() adds two of these structures together by
 * treating them as flat arrays of unsigned long.
 */
typedef struct zfpm_stats_t_ {
  /* Connection attempts. */
  unsigned long connect_calls;
  unsigned long connect_no_sock;

  /* Socket read callback. */
  unsigned long read_cb_calls;

  /* Socket write callback. */
  unsigned long write_cb_calls;
  unsigned long write_calls;
  unsigned long partial_writes;
  unsigned long max_writes_hit;
  unsigned long t_write_yields;

  /* Messages built for the FPM. */
  unsigned long nop_deletes_skipped;
  unsigned long route_adds;
  unsigned long route_dels;

  /* Update triggers received from the rest of zebra. */
  unsigned long updates_triggered;
  unsigned long redundant_triggers;
  unsigned long non_fpm_table_triggers;

  unsigned long dests_del_after_update;

  /* Connection-down cleanup thread. */
  unsigned long t_conn_down_starts;
  unsigned long t_conn_down_dests_processed;
  unsigned long t_conn_down_yields;
  unsigned long t_conn_down_finishes;

  /* Connection-up thread. */
  unsigned long t_conn_up_starts;
  unsigned long t_conn_up_dests_processed;
  unsigned long t_conn_up_yields;
  unsigned long t_conn_up_aborts;
  unsigned long t_conn_up_finishes;

} zfpm_stats_t;
/*
 * States for the FPM state machine.
 */
typedef enum {

  /*
   * In this state we are not yet ready to connect to the FPM. This
   * can happen when this module is disabled, or if we're cleaning up
   * after a connection has gone down.
   */
  ZFPM_STATE_IDLE,

  /*
   * Ready to talk to the FPM and periodically trying to connect to
   * it.
   */
  ZFPM_STATE_ACTIVE,

  /*
   * In the middle of bringing up a TCP connection. Specifically,
   * waiting for a connect() call to complete asynchronously.
   */
  ZFPM_STATE_CONNECTING,

  /*
   * TCP connection to the FPM is up.
   */
  ZFPM_STATE_ESTABLISHED

} zfpm_state_t;
/*
 * Message format to be used to communicate with the FPM.
 * ZFPM_MSG_FORMAT_NONE means that no usable format is available.
 */
typedef enum
{
  ZFPM_MSG_FORMAT_NONE,
  ZFPM_MSG_FORMAT_NETLINK,
  ZFPM_MSG_FORMAT_PROTOBUF,
} zfpm_msg_format_e;
/*
 * Global state of the FPM interface module; one instance exists and is
 * reached through the zfpm_g pointer.
 *
 * NOTE(review): this text looks like a line-numbered extraction from
 * which short lines were dropped. Members that the rest of this file
 * reads through zfpm_g -- enabled, state, fpm_port, sock, obuf, ibuf,
 * stats, t_conn_down_state and t_conn_up_state -- as well as the braces
 * and the closing typedef name are not visible below. Restore them from
 * the pristine source; do not infer their types from this listing.
 */
153 typedef struct zfpm_glob_t_
157 * True if the FPM module has been enabled.
162 * Message format to be used to communicate with the fpm.
164 zfpm_msg_format_e message_format;
166 struct thread_master *master;
/* Address of the FPM server; zfpm_connect_cb() substitutes the loopback
 * address when this is zero. */
170 in_addr_t fpm_server;
172 * Port on which the FPM is running.
177 * List of rib_dest_t structures to be processed
179 TAILQ_HEAD (zfpm_dest_q, rib_dest_t_) dest_q;
182 * Stream socket to the FPM.
187 * Buffers for messages to/from the FPM.
/* Pending connect timer and socket read/write threads. */
195 struct thread *t_connect;
196 struct thread *t_write;
197 struct thread *t_read;
200 * Thread to clean up after the TCP connection to the FPM goes down
201 * and the state that belongs to it.
203 struct thread *t_conn_down;
206 zfpm_rnodes_iter_t iter;
210 * Thread to take actions once the TCP conn to the FPM comes up, and
211 * the state that belongs to it.
213 struct thread *t_conn_up;
216 zfpm_rnodes_iter_t iter;
/* Number of connect attempts made and the time of the latest one; both
 * are consulted by zfpm_calc_connect_delay(). */
219 unsigned long connect_calls;
220 time_t last_connect_call_time;
223 * Stats from the start of the current statistics interval up to
224 * now. These are the counters we typically update in the code.
229 * Statistics that were gathered in the last collection interval.
231 zfpm_stats_t last_ivl_stats;
234 * Cumulative stats from the last clear to the start of the current
235 * statistics interval.
237 zfpm_stats_t cumulative_stats;
240 * Stats interval timer.
242 struct thread *t_stats;
245 * If non-zero, the last time when statistics were cleared.
247 time_t last_stats_clear_time;
251 static zfpm_glob_t zfpm_glob_space;
252 static zfpm_glob_t *zfpm_g = &zfpm_glob_space;
254 static int zfpm_read_cb (struct thread *thread);
255 static int zfpm_write_cb (struct thread *thread);
257 static void zfpm_set_state (zfpm_state_t state, const char *reason);
258 static void zfpm_start_connect_timer (const char *reason);
259 static void zfpm_start_stats_timer (void);
/*
 * zfpm_thread_should_yield
 *
 * Thin wrapper: returns whatever thread_should_yield() reports for the
 * given thread. The long-running callbacks in this file use it to
 * decide when to pause and reschedule themselves.
 */
static inline int
zfpm_thread_should_yield (struct thread *t)
{
  return thread_should_yield (t);
}
/*
 * zfpm_state_to_str
 *
 * Maps a zfpm_state_t value to a printable name; zfpm_set_state() uses
 * the result in its debug message.
 *
 * NOTE(review): only the string for ZFPM_STATE_ESTABLISHED is visible.
 * The return statements for the idle, active and connecting cases, the
 * switch header and any fall-through/default handling were dropped from
 * this listing -- recover the exact strings from the pristine source.
 */
274 zfpm_state_to_str (zfpm_state_t state)
279 case ZFPM_STATE_IDLE:
282 case ZFPM_STATE_ACTIVE:
285 case ZFPM_STATE_CONNECTING:
288 case ZFPM_STATE_ESTABLISHED:
289 return "established";
/*
 * NOTE(review): fragment of a function whose header is not visible;
 * presumably zfpm_get_time(), which other code in this file calls to
 * obtain a time_t -- confirm. The visible part reads the monotonic
 * clock via quagga_gettime() into 'tv' and only logs a warning when
 * that call fails. The declaration of 'tv' and the return statement
 * are missing from this listing.
 */
304 if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &tv) < 0)
305 zlog_warn ("FPM: quagga_gettime failed!!");
/*
 * 'reference' must be a value previously obtained from zfpm_get_time()
 * so that both operands of the subtraction come from the same clock.
 *
 * NOTE(review): several lines between the assignment to 'now' and the
 * final return are not visible in this listing (possibly a sanity check
 * -- confirm against the pristine source), nor are the return type and
 * the declaration of 'now'.
 */
311 * zfpm_get_elapsed_time
313 * Returns the time elapsed (in seconds) since the given time.
316 zfpm_get_elapsed_time (time_t reference)
320 now = zfpm_get_time ();
328 return now - reference;
332 * zfpm_is_table_for_fpm
334 * Returns TRUE if the the given table is to be communicated to the
338 zfpm_is_table_for_fpm (struct route_table *table)
340 rib_table_info_t *info;
342 info = rib_table_info (table);
345 * We only send the unicast tables in the main instance to the FPM
348 if (info->zvrf->vrf_id != 0)
351 if (info->safi != SAFI_UNICAST)
358 * zfpm_rnodes_iter_init
361 zfpm_rnodes_iter_init (zfpm_rnodes_iter_t *iter)
363 memset (iter, 0, sizeof (*iter));
364 rib_tables_iter_init (&iter->tables_iter);
367 * This is a hack, but it makes implementing 'next' easier by
368 * ensuring that route_table_iter_next() will return NULL the first
371 route_table_iter_init (&iter->iter, NULL);
372 route_table_iter_cleanup (&iter->iter);
/*
 * Advances the iterator and yields the next route_node. From the
 * visible lines: the current table is tried first; once it is
 * exhausted its iterator is cleaned up, tables are pulled from the
 * tables iterator until one passes zfpm_is_table_for_fpm(), and the
 * node iterator is re-initialized on that table.
 *
 * NOTE(review): the enclosing loop, the tests on 'rn' / 'table' and all
 * return statements are missing from this listing; the end-of-walk
 * return value is presumably NULL (callers loop while the result is
 * non-NULL) -- confirm.
 */
376 * zfpm_rnodes_iter_next
378 static inline struct route_node *
379 zfpm_rnodes_iter_next (zfpm_rnodes_iter_t *iter)
381 struct route_node *rn;
382 struct route_table *table;
386 rn = route_table_iter_next (&iter->iter);
391 * We've made our way through this table, go to the next one.
393 route_table_iter_cleanup (&iter->iter);
395 while ((table = rib_tables_iter_next (&iter->tables_iter)))
397 if (zfpm_is_table_for_fpm (table))
404 route_table_iter_init (&iter->iter, table);
411 * zfpm_rnodes_iter_pause
414 zfpm_rnodes_iter_pause (zfpm_rnodes_iter_t *iter)
416 route_table_iter_pause (&iter->iter);
420 * zfpm_rnodes_iter_cleanup
423 zfpm_rnodes_iter_cleanup (zfpm_rnodes_iter_t *iter)
425 route_table_iter_cleanup (&iter->iter);
426 rib_tables_iter_cleanup (&iter->tables_iter);
432 * Initialize a statistics block.
435 zfpm_stats_init (zfpm_stats_t *stats)
437 memset (stats, 0, sizeof (*stats));
444 zfpm_stats_reset (zfpm_stats_t *stats)
446 zfpm_stats_init (stats);
453 zfpm_stats_copy (const zfpm_stats_t *src, zfpm_stats_t *dest)
455 memcpy (dest, src, sizeof (*dest));
461 * Total up the statistics in two stats structures ('s1 and 's2') and
462 * return the result in the third argument, 'result'. Note that the
463 * pointer 'result' may be the same as 's1' or 's2'.
465 * For simplicity, the implementation below assumes that the stats
466 * structure is composed entirely of counters. This can easily be
467 * changed when necessary.
470 zfpm_stats_compose (const zfpm_stats_t *s1, const zfpm_stats_t *s2,
471 zfpm_stats_t *result)
473 const unsigned long *p1, *p2;
474 unsigned long *result_p;
477 p1 = (const unsigned long *) s1;
478 p2 = (const unsigned long *) s2;
479 result_p = (unsigned long *) result;
481 num_counters = (sizeof (zfpm_stats_t) / sizeof (unsigned long));
483 for (i = 0; i < num_counters; i++)
485 result_p[i] = p1[i] + p2[i];
/*
 * NOTE(review): fragments of three small helpers whose headers were
 * dropped from this listing; by symmetry with zfpm_write_off() they are
 * presumably zfpm_read_on(), zfpm_write_on() and zfpm_read_off() --
 * confirm. The last argument of both THREAD_*_ON invocations is also
 * missing (presumably the socket -- confirm).
 */
/* Arm the read thread: it must not already be scheduled and the socket
 * must be open. */
495 assert (!zfpm_g->t_read);
496 assert (zfpm_g->sock >= 0);
498 THREAD_READ_ON (zfpm_g->master, zfpm_g->t_read, zfpm_read_cb, 0,
/* Arm the write thread under the same preconditions. */
508 assert (!zfpm_g->t_write);
509 assert (zfpm_g->sock >= 0);
511 THREAD_WRITE_ON (zfpm_g->master, zfpm_g->t_write, zfpm_write_cb, 0,
/* Cancel the read thread. */
521 THREAD_READ_OFF (zfpm_g->t_read);
528 zfpm_write_off (void)
530 THREAD_WRITE_OFF (zfpm_g->t_write);
/*
 * Background-thread callback started by zfpm_connection_up(). From the
 * visible lines: it clears its own thread handle, aborts (counting
 * t_conn_up_aborts) if the state is no longer ESTABLISHED, and
 * otherwise walks the route nodes, calling zfpm_trigger_update() with a
 * NULL reason for them. When zfpm_thread_should_yield() says so it
 * pauses the iterator and re-adds itself as a background thread; at the
 * end it counts t_conn_up_finishes and cleans up the iterator.
 *
 * NOTE(review): the declaration of 'dest', the test applied to it, the
 * goto/label/return statements and the tail of the
 * thread_add_background() call are missing from this listing.
 */
534 * zfpm_conn_up_thread_cb
536 * Callback for actions to be taken when the connection to the FPM
540 zfpm_conn_up_thread_cb (struct thread *thread)
542 struct route_node *rnode;
543 zfpm_rnodes_iter_t *iter;
546 assert (zfpm_g->t_conn_up);
547 zfpm_g->t_conn_up = NULL;
549 iter = &zfpm_g->t_conn_up_state.iter;
551 if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
553 zfpm_debug ("Connection not up anymore, conn_up thread aborting");
554 zfpm_g->stats.t_conn_up_aborts++;
558 while ((rnode = zfpm_rnodes_iter_next (iter)))
560 dest = rib_dest_from_rnode (rnode);
564 zfpm_g->stats.t_conn_up_dests_processed++;
565 zfpm_trigger_update (rnode, NULL);
571 if (!zfpm_thread_should_yield (thread))
574 zfpm_g->stats.t_conn_up_yields++;
575 zfpm_rnodes_iter_pause (iter);
576 zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
577 zfpm_conn_up_thread_cb,
582 zfpm_g->stats.t_conn_up_finishes++;
585 zfpm_rnodes_iter_cleanup (iter);
/*
 * zfpm_connection_up
 *
 * Requires an open socket. Moves the state machine to ESTABLISHED
 * (passing 'detail' as the reason), initializes the conn_up iterator
 * and schedules zfpm_conn_up_thread_cb() as a background thread.
 *
 * NOTE(review): zfpm_set_state() asserts that t_read and t_write are
 * set when entering ESTABLISHED, so the two lines missing between the
 * socket assert and the zfpm_set_state() call presumably arm the
 * read/write threads -- confirm against the pristine source.
 */
592 * Called when the connection to the FPM comes up.
595 zfpm_connection_up (const char *detail)
597 assert (zfpm_g->sock >= 0);
600 zfpm_set_state (ZFPM_STATE_ESTABLISHED, detail);
603 * Start thread to push existing routes to the FPM.
605 assert (!zfpm_g->t_conn_up);
607 zfpm_rnodes_iter_init (&zfpm_g->t_conn_up_state.iter);
609 zfpm_debug ("Starting conn_up thread");
610 zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
611 zfpm_conn_up_thread_cb, 0, 0);
612 zfpm_g->stats.t_conn_up_starts++;
/*
 * zfpm_connect_check
 *
 * Queries SO_ERROR on the socket. A successful getsockopt() with a zero
 * status means the connect completed, and zfpm_connection_up() is
 * called. Otherwise the socket is closed and the connect timer is
 * restarted.
 *
 * NOTE(review): the local declarations, the final getsockopt()
 * argument, the return statements and whatever happens between close()
 * and zfpm_start_connect_timer() are missing from this listing. Since
 * zfpm_start_connect_timer() asserts sock < 0, the descriptor is
 * presumably reset there -- confirm.
 */
618 * Check if an asynchronous connect() to the FPM is complete.
621 zfpm_connect_check ()
630 slen = sizeof (status);
631 ret = getsockopt (zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *) &status,
634 if (ret >= 0 && status == 0)
636 zfpm_connection_up ("async connect complete");
641 * getsockopt() failed or indicated an error on the socket.
643 close (zfpm_g->sock);
646 zfpm_start_connect_timer ("getsockopt() after async connect failed");
/*
 * Background-thread callback started by zfpm_connection_down(); runs
 * only in the IDLE state. From the visible lines: it walks the route
 * nodes, takes dests flagged RIB_DEST_UPDATE_FPM off dest_q, clears the
 * UPDATE_FPM and SENT_TO_FPM flags and counts processed dests. When
 * zfpm_thread_should_yield() says so it pauses the iterator and re-adds
 * itself as a background thread. Once the walk is finished it cleans up
 * the iterator and restarts the connect timer.
 *
 * NOTE(review): the declaration of 'dest', any NULL test on it, the
 * "should the dest be deleted" step announced by the comment, the tail
 * of the thread_add_background() call and the return statements are
 * missing from this listing.
 */
651 * zfpm_conn_down_thread_cb
653 * Callback that is invoked to clean up state after the TCP connection
654 * to the FPM goes down.
657 zfpm_conn_down_thread_cb (struct thread *thread)
659 struct route_node *rnode;
660 zfpm_rnodes_iter_t *iter;
663 assert (zfpm_g->state == ZFPM_STATE_IDLE);
665 assert (zfpm_g->t_conn_down);
666 zfpm_g->t_conn_down = NULL;
668 iter = &zfpm_g->t_conn_down_state.iter;
670 while ((rnode = zfpm_rnodes_iter_next (iter)))
672 dest = rib_dest_from_rnode (rnode);
676 if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
678 TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
681 UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
682 UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
684 zfpm_g->stats.t_conn_down_dests_processed++;
687 * Check if the dest should be deleted.
695 if (!zfpm_thread_should_yield (thread))
698 zfpm_g->stats.t_conn_down_yields++;
699 zfpm_rnodes_iter_pause (iter);
700 zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
701 zfpm_conn_down_thread_cb,
706 zfpm_g->stats.t_conn_down_finishes++;
707 zfpm_rnodes_iter_cleanup (iter);
710 * Start the process of connecting to the FPM again.
712 zfpm_start_connect_timer ("cleanup complete");
/*
 * Only valid in the ESTABLISHED state. Logs the event, resets both
 * stream buffers, closes the socket if one is open, starts
 * zfpm_conn_down_thread_cb() as a background thread to clean up
 * per-destination state, and finally moves the state machine to IDLE.
 *
 * NOTE(review): lines are missing from this listing -- at least the
 * ones around the zlog_info() call and the statement following close().
 * zfpm_start_connect_timer() later asserts sock < 0, so the descriptor
 * is presumably reset after close() -- confirm. It is also not visible
 * whether 'detail' is defaulted when NULL before it reaches the "%s"
 * format.
 */
717 * zfpm_connection_down
719 * Called when the connection to the FPM has gone down.
722 zfpm_connection_down (const char *detail)
727 assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
729 zlog_info ("connection to the FPM has gone down: %s", detail);
734 stream_reset (zfpm_g->ibuf);
735 stream_reset (zfpm_g->obuf);
737 if (zfpm_g->sock >= 0) {
738 close (zfpm_g->sock);
743 * Start thread to clean up state after the connection goes down.
745 assert (!zfpm_g->t_conn_down);
746 zfpm_debug ("Starting conn_down thread");
747 zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter);
748 zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
749 zfpm_conn_down_thread_cb, 0, 0);
750 zfpm_g->stats.t_conn_down_starts++;
752 zfpm_set_state (ZFPM_STATE_IDLE, detail);
/*
 * zfpm_read_cb
 *
 * Read callback for the FPM socket. In the CONNECTING state it only
 * checks whether the asynchronous connect has finished. Otherwise it
 * reads in two stages: first up to FPM_MSG_HDR_LEN bytes for the
 * header, which is validated with fpm_msg_hdr_ok(), then the remainder
 * of the message as given by fpm_msg_len(). A read that returns 0 or -1
 * and an invalid header both take the connection down. A complete
 * message is currently discarded.
 *
 * NOTE(review): local declarations, the goto/return statements taken
 * on short reads, the buffer reset after a full message and the
 * re-arming of the read thread are missing from this listing.
 */
759 zfpm_read_cb (struct thread *thread)
766 zfpm_g->stats.read_cb_calls++;
767 assert (zfpm_g->t_read);
768 zfpm_g->t_read = NULL;
771 * Check if async connect is now done.
773 if (zfpm_g->state == ZFPM_STATE_CONNECTING)
775 zfpm_connect_check();
779 assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
780 assert (zfpm_g->sock >= 0);
/* 'already' is the number of bytes of the current message buffered so far. */
784 already = stream_get_endp (ibuf);
785 if (already < FPM_MSG_HDR_LEN)
789 nbyte = stream_read_try (ibuf, zfpm_g->sock, FPM_MSG_HDR_LEN - already);
790 if (nbyte == 0 || nbyte == -1)
792 zfpm_connection_down ("closed socket in read");
/* Short read: the header is still incomplete. */
796 if (nbyte != (ssize_t) (FPM_MSG_HDR_LEN - already))
799 already = FPM_MSG_HDR_LEN;
802 stream_set_getp (ibuf, 0);
804 hdr = (fpm_msg_hdr_t *) stream_pnt (ibuf);
806 if (!fpm_msg_hdr_ok (hdr))
808 zfpm_connection_down ("invalid message header");
812 msg_len = fpm_msg_len (hdr);
815 * Read out the rest of the packet.
817 if (already < msg_len)
821 nbyte = stream_read_try (ibuf, zfpm_g->sock, msg_len - already);
823 if (nbyte == 0 || nbyte == -1)
825 zfpm_connection_down ("failed to read message");
/* Short read: the message body is still incomplete. */
829 if (nbyte != (ssize_t) (msg_len - already))
833 zfpm_debug ("Read out a full fpm message");
836 * Just throw it away for now.
846 * zfpm_writes_pending
848 * Returns TRUE if we may have something to write to the FPM.
851 zfpm_writes_pending (void)
855 * Check if there is any data in the outbound buffer that has not
856 * been written to the socket yet.
858 if (stream_get_endp (zfpm_g->obuf) - stream_get_getp (zfpm_g->obuf))
862 * Check if there are any prefixes on the outbound queue.
864 if (!TAILQ_EMPTY (&zfpm_g->dest_q))
/*
 * zfpm_encode_route
 *
 * Dispatches on zfpm_g->message_format: the protobuf case calls
 * zfpm_protobuf_encode_route(), the netlink case calls
 * zfpm_netlink_encode_route() with RTM_NEWROUTE when 'rib' is non-NULL
 * and RTM_DELROUTE otherwise, and asserts that the encoded length is
 * already FPM-aligned. '*msg_type' is set to FPM_MSG_TYPE_NONE first
 * and then to the type matching the chosen format.
 *
 * NOTE(review): local declarations, the #ifdef lines that open the
 * conditional sections (only one #endif is visible), the break/default
 * handling, the tail of the protobuf call and the return statement are
 * missing from this listing. In the netlink case '*msg_type' is
 * assigned twice with the same value; one assignment looks redundant.
 */
873 * Encode a message to the FPM with information about the given route.
875 * Returns the number of bytes written to the buffer. 0 or a negative
876 * value indicates an error.
879 zfpm_encode_route (rib_dest_t *dest, struct rib *rib, char *in_buf,
880 size_t in_buf_len, fpm_msg_type_e *msg_type)
886 *msg_type = FPM_MSG_TYPE_NONE;
888 switch (zfpm_g->message_format) {
890 case ZFPM_MSG_FORMAT_PROTOBUF:
892 len = zfpm_protobuf_encode_route (dest, rib, (uint8_t *) in_buf,
894 *msg_type = FPM_MSG_TYPE_PROTOBUF;
898 case ZFPM_MSG_FORMAT_NETLINK:
900 *msg_type = FPM_MSG_TYPE_NETLINK;
901 cmd = rib ? RTM_NEWROUTE : RTM_DELROUTE;
902 len = zfpm_netlink_encode_route (cmd, dest, rib, in_buf, in_buf_len);
903 assert(fpm_msg_align(len) == len);
904 *msg_type = FPM_MSG_TYPE_NETLINK;
905 #endif /* HAVE_NETLINK */
/*
 * Scans the routes attached to 'dest' and tests each one for the
 * RIB_ENTRY_SELECTED_FIB status flag. zfpm_build_updates() treats a
 * NULL result as "no route", i.e. a deletion.
 *
 * NOTE(review): the declaration of 'rib', the statements executed for
 * matching and non-matching entries and both return statements are
 * missing from this listing.
 */
917 * zfpm_route_for_update
919 * Returns the rib that is to be sent to the FPM for a given dest.
922 zfpm_route_for_update (rib_dest_t *dest)
926 RIB_DEST_FOREACH_ROUTE (dest, rib)
928 if (!CHECK_FLAG (rib->status, RIB_ENTRY_SELECTED_FIB))
935 * We have no route for this destination.
/*
 * zfpm_build_updates
 *
 * Fills the (initially empty) outbound stream with FPM messages built
 * from the head of dest_q. Per destination, from the visible lines:
 * a header with FPM_PROTO_VERSION is laid down at the stream's end
 * pointer, the route chosen by zfpm_route_for_update() is encoded after
 * it, and the header's type and network-order length are filled in
 * before the end pointer is advanced. The dest is then taken off the
 * queue with its UPDATE_FPM flag cleared, and SENT_TO_FPM is set or
 * cleared. Deletions of routes never sent to the FPM are counted and
 * skipped. rib_gc_dest() is given a chance to delete the destination.
 *
 * NOTE(review): the loop construct, several local declarations, the
 * tail of the zfpm_encode_route() call, the handling of its result and
 * the if/else lines selecting add vs. delete are missing from this
 * listing.
 */
943 * Process the outgoing queue and write messages to the outbound
947 zfpm_build_updates (void)
951 unsigned char *buf, *data, *buf_end;
956 int is_add, write_msg;
957 fpm_msg_type_e msg_type;
961 assert (stream_empty (s));
966 * Make sure there is enough space to write another message.
968 if (STREAM_WRITEABLE (s) < FPM_MAX_MSG_LEN)
/* The next message is built in place at the current end of the stream. */
971 buf = STREAM_DATA (s) + stream_get_endp (s);
972 buf_end = buf + STREAM_WRITEABLE (s);
974 dest = TAILQ_FIRST (&zfpm_g->dest_q);
978 assert (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM));
980 hdr = (fpm_msg_hdr_t *) buf;
981 hdr->version = FPM_PROTO_VERSION;
983 data = fpm_msg_data (hdr);
/* A NULL rib means there is no route to install: treat as a delete. */
985 rib = zfpm_route_for_update (dest);
986 is_add = rib ? 1 : 0;
991 * If this is a route deletion, and we have not sent the route to
992 * the FPM previously, skip it.
994 if (!is_add && !CHECK_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM))
997 zfpm_g->stats.nop_deletes_skipped++;
1001 data_len = zfpm_encode_route (dest, rib, (char *) data, buf_end - data,
1007 hdr->msg_type = msg_type;
1008 msg_len = fpm_data_len_to_msg_len (data_len);
1009 hdr->msg_len = htons (msg_len);
1010 stream_forward_endp (s, msg_len);
1013 zfpm_g->stats.route_adds++;
1015 zfpm_g->stats.route_dels++;
1020 * Remove the dest from the queue, and reset the flag.
1022 UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
1023 TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
1027 SET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
1031 UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
1035 * Delete the destination if necessary.
1037 if (rib_gc_dest (dest->rnode))
1038 zfpm_g->stats.dests_del_after_update++;
/*
 * zfpm_write_cb
 *
 * Write callback for the FPM socket. In the CONNECTING state it only
 * checks whether the asynchronous connect has finished. Otherwise it
 * refills the outbound stream via zfpm_build_updates() whenever the
 * stream is empty and write()s the unread part of the stream to the
 * socket. A negative write result takes the connection down unless
 * ERRNO_IO_RETRY(errno) holds. After a partial write the stream's read
 * pointer is advanced. The callback stops after
 * ZFPM_MAX_WRITES_PER_RUN writes or when zfpm_thread_should_yield()
 * says so, and finally consults zfpm_writes_pending().
 *
 * NOTE(review): the loop construct, the declarations of 's' and
 * 'num_writes', the break/return statements, the stream reset after a
 * complete write and the re-arming of the write thread are missing
 * from this listing.
 */
1048 zfpm_write_cb (struct thread *thread)
1053 zfpm_g->stats.write_cb_calls++;
1054 assert (zfpm_g->t_write);
1055 zfpm_g->t_write = NULL;
1058 * Check if async connect is now done.
1060 if (zfpm_g->state == ZFPM_STATE_CONNECTING)
1062 zfpm_connect_check ();
1066 assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
1067 assert (zfpm_g->sock >= 0);
1073 int bytes_to_write, bytes_written;
1078 * If the stream is empty, try fill it up with data.
1080 if (stream_empty (s))
1082 zfpm_build_updates ();
1085 bytes_to_write = stream_get_endp (s) - stream_get_getp (s);
1086 if (!bytes_to_write)
1089 bytes_written = write (zfpm_g->sock, STREAM_PNT (s), bytes_to_write);
1090 zfpm_g->stats.write_calls++;
1093 if (bytes_written < 0)
1095 if (ERRNO_IO_RETRY (errno))
1098 zfpm_connection_down ("failed to write to socket");
1102 if (bytes_written != bytes_to_write)
1108 stream_forward_getp (s, bytes_written);
1109 zfpm_g->stats.partial_writes++;
1114 * We've written out the entire contents of the stream.
1118 if (num_writes >= ZFPM_MAX_WRITES_PER_RUN)
1120 zfpm_g->stats.max_writes_hit++;
1124 if (zfpm_thread_should_yield (thread))
1126 zfpm_g->stats.t_write_yields++;
1131 if (zfpm_writes_pending ())
/*
 * zfpm_connect_cb
 *
 * Connect-timer callback; runs only in the ACTIVE state. Creates a
 * non-blocking IPv4 TCP socket and connect()s it to
 * fpm_server:fpm_port, with the loopback address substituted when
 * fpm_server is zero. From the visible lines: an immediate success
 * stores the socket and calls zfpm_connection_up(). EINPROGRESS stores
 * the socket and moves to CONNECTING. Any other failure is logged and
 * the connect timer is restarted.
 *
 * NOTE(review): local declarations, the tests on the socket()/connect()
 * results, the return statements, the arming of the read/write threads
 * before entering CONNECTING and the cleanup of 'sock' on the failure
 * path are missing from this listing. In particular, confirm that the
 * socket is closed before zfpm_start_connect_timer() on failure.
 */
1141 zfpm_connect_cb (struct thread *t)
1144 struct sockaddr_in serv;
1146 assert (zfpm_g->t_connect);
1147 zfpm_g->t_connect = NULL;
1148 assert (zfpm_g->state == ZFPM_STATE_ACTIVE);
1150 sock = socket (AF_INET, SOCK_STREAM, 0);
1153 zfpm_debug ("Failed to create socket for connect(): %s", strerror(errno));
1154 zfpm_g->stats.connect_no_sock++;
1158 set_nonblocking(sock);
1160 /* Make server socket. */
1161 memset (&serv, 0, sizeof (serv));
1162 serv.sin_family = AF_INET;
1163 serv.sin_port = htons (zfpm_g->fpm_port);
1164 #ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
1165 serv.sin_len = sizeof (struct sockaddr_in);
1166 #endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
1167 if (!zfpm_g->fpm_server)
1168 serv.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
/* fpm_server is stored as-is, without a byte-order conversion. */
1170 serv.sin_addr.s_addr = (zfpm_g->fpm_server);
1173 * Connect to the FPM.
/* Both the lifetime counter and the per-interval statistic are bumped. */
1175 zfpm_g->connect_calls++;
1176 zfpm_g->stats.connect_calls++;
1177 zfpm_g->last_connect_call_time = zfpm_get_time ();
1179 ret = connect (sock, (struct sockaddr *) &serv, sizeof (serv));
1182 zfpm_g->sock = sock;
1183 zfpm_connection_up ("connect succeeded");
1187 if (errno == EINPROGRESS)
1189 zfpm_g->sock = sock;
1192 zfpm_set_state (ZFPM_STATE_CONNECTING, "async connect in progress");
1196 zlog_info ("can't connect to FPM %d: %s", sock, safe_strerror (errno));
1200 * Restart timer for retrying connection.
1202 zfpm_start_connect_timer ("connect() failed");
1209 * Move state machine into the given state.
1212 zfpm_set_state (zfpm_state_t state, const char *reason)
1214 zfpm_state_t cur_state = zfpm_g->state;
1219 if (state == cur_state)
1222 zfpm_debug("beginning state transition %s -> %s. Reason: %s",
1223 zfpm_state_to_str (cur_state), zfpm_state_to_str (state),
1228 case ZFPM_STATE_IDLE:
1229 assert (cur_state == ZFPM_STATE_ESTABLISHED);
1232 case ZFPM_STATE_ACTIVE:
1233 assert (cur_state == ZFPM_STATE_IDLE ||
1234 cur_state == ZFPM_STATE_CONNECTING);
1235 assert (zfpm_g->t_connect);
1238 case ZFPM_STATE_CONNECTING:
1239 assert (zfpm_g->sock);
1240 assert (cur_state == ZFPM_STATE_ACTIVE);
1241 assert (zfpm_g->t_read);
1242 assert (zfpm_g->t_write);
1245 case ZFPM_STATE_ESTABLISHED:
1246 assert (cur_state == ZFPM_STATE_ACTIVE ||
1247 cur_state == ZFPM_STATE_CONNECTING);
1248 assert (zfpm_g->sock);
1249 assert (zfpm_g->t_read);
1250 assert (zfpm_g->t_write);
1254 zfpm_g->state = state;
1258 * zfpm_calc_connect_delay
1260 * Returns the number of seconds after which we should attempt to
1261 * reconnect to the FPM.
1264 zfpm_calc_connect_delay (void)
1269 * Return 0 if this is our first attempt to connect.
1271 if (zfpm_g->connect_calls == 0)
1276 elapsed = zfpm_get_elapsed_time (zfpm_g->last_connect_call_time);
1278 if (elapsed > ZFPM_CONNECT_RETRY_IVL) {
1282 return ZFPM_CONNECT_RETRY_IVL - elapsed;
1286 * zfpm_start_connect_timer
1289 zfpm_start_connect_timer (const char *reason)
1293 assert (!zfpm_g->t_connect);
1294 assert (zfpm_g->sock < 0);
1296 assert(zfpm_g->state == ZFPM_STATE_IDLE ||
1297 zfpm_g->state == ZFPM_STATE_ACTIVE ||
1298 zfpm_g->state == ZFPM_STATE_CONNECTING);
1300 delay_secs = zfpm_calc_connect_delay();
1301 zfpm_debug ("scheduling connect in %ld seconds", delay_secs);
1303 THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_connect, zfpm_connect_cb, 0,
1305 zfpm_set_state (ZFPM_STATE_ACTIVE, reason);
1311 * Returns TRUE if the zebra FPM module has been enabled.
1314 zfpm_is_enabled (void)
1316 return zfpm_g->enabled;
1322 * Returns TRUE if the connection to the FPM is up.
1325 zfpm_conn_is_up (void)
1327 if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
1330 assert (zfpm_g->sock >= 0);
/*
 * From the visible lines: the trigger is ignored while the connection
 * is down, counted as non_fpm_table_triggers when the dest's table
 * fails zfpm_is_table_for_fpm(), and counted as redundant_triggers when
 * the dest is already flagged RIB_DEST_UPDATE_FPM. Otherwise the dest
 * is flagged, appended to dest_q and counted as updates_triggered.
 *
 * NOTE(review): the declaration of 'dest', the return statements, any
 * guard around the debug message and the statement that follows the
 * final "if (zfpm_g->t_write)" are missing from this listing.
 * zfpm_conn_up_thread_cb() passes a NULL 'reason'; confirm that the
 * "%s" in the debug message is not reached with NULL.
 */
1336 * zfpm_trigger_update
1338 * The zebra code invokes this function to indicate that we should
1339 * send an update to the FPM about the given route_node.
1342 zfpm_trigger_update (struct route_node *rn, const char *reason)
1345 char buf[PREFIX_STRLEN];
1348 * Ignore if the connection is down. We will update the FPM about
1349 * all destinations once the connection comes up.
1351 if (!zfpm_conn_is_up ())
1354 dest = rib_dest_from_rnode (rn);
1357 * Ignore the trigger if the dest is not in a table that we would
1360 if (!zfpm_is_table_for_fpm (rib_dest_table (dest)))
1362 zfpm_g->stats.non_fpm_table_triggers++;
1366 if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM)) {
1367 zfpm_g->stats.redundant_triggers++;
1373 zfpm_debug ("%s triggering update to FPM - Reason: %s",
1374 prefix2str (&rn->p, buf, sizeof(buf)), reason);
1377 SET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
1378 TAILQ_INSERT_TAIL (&zfpm_g->dest_q, dest, fpm_q_entries);
1379 zfpm_g->stats.updates_triggered++;
1382 * Make sure that writes are enabled.
1384 if (zfpm_g->t_write)
1391 * zfpm_stats_timer_cb
1394 zfpm_stats_timer_cb (struct thread *t)
1396 assert (zfpm_g->t_stats);
1397 zfpm_g->t_stats = NULL;
1400 * Remember the stats collected in the last interval for display
1403 zfpm_stats_copy (&zfpm_g->stats, &zfpm_g->last_ivl_stats);
1406 * Add the current set of stats into the cumulative statistics.
1408 zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
1409 &zfpm_g->cumulative_stats);
1412 * Start collecting stats afresh over the next interval.
1414 zfpm_stats_reset (&zfpm_g->stats);
1416 zfpm_start_stats_timer ();
1422 * zfpm_stop_stats_timer
1425 zfpm_stop_stats_timer (void)
1427 if (!zfpm_g->t_stats)
1430 zfpm_debug ("Stopping existing stats timer");
1431 THREAD_TIMER_OFF (zfpm_g->t_stats);
1435 * zfpm_start_stats_timer
1438 zfpm_start_stats_timer (void)
1440 assert (!zfpm_g->t_stats);
1442 THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_stats, zfpm_stats_timer_cb, 0,
1443 ZFPM_STATS_IVL_SECS);
/*
 * Helper macro for zfpm_show_stats() below.
 *
 * Prints one row: the counter's name, its total (from the local
 * 'total_stats') and its value over the last collection interval.
 * Expects 'vty' and 'total_stats' to be in scope at the point of use.
 */
#define ZFPM_SHOW_STAT(counter)                                         \
  do {                                                                  \
    vty_out (vty, "%-40s %10lu %16lu%s", #counter, total_stats.counter, \
             zfpm_g->last_ivl_stats.counter, VTY_NEWLINE);              \
  } while (0)
1459 zfpm_show_stats (struct vty *vty)
1461 zfpm_stats_t total_stats;
1464 vty_out (vty, "%s%-40s %10s Last %2d secs%s%s", VTY_NEWLINE, "Counter",
1465 "Total", ZFPM_STATS_IVL_SECS, VTY_NEWLINE, VTY_NEWLINE);
1468 * Compute the total stats up to this instant.
1470 zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
1473 ZFPM_SHOW_STAT (connect_calls);
1474 ZFPM_SHOW_STAT (connect_no_sock);
1475 ZFPM_SHOW_STAT (read_cb_calls);
1476 ZFPM_SHOW_STAT (write_cb_calls);
1477 ZFPM_SHOW_STAT (write_calls);
1478 ZFPM_SHOW_STAT (partial_writes);
1479 ZFPM_SHOW_STAT (max_writes_hit);
1480 ZFPM_SHOW_STAT (t_write_yields);
1481 ZFPM_SHOW_STAT (nop_deletes_skipped);
1482 ZFPM_SHOW_STAT (route_adds);
1483 ZFPM_SHOW_STAT (route_dels);
1484 ZFPM_SHOW_STAT (updates_triggered);
1485 ZFPM_SHOW_STAT (non_fpm_table_triggers);
1486 ZFPM_SHOW_STAT (redundant_triggers);
1487 ZFPM_SHOW_STAT (dests_del_after_update);
1488 ZFPM_SHOW_STAT (t_conn_down_starts);
1489 ZFPM_SHOW_STAT (t_conn_down_dests_processed);
1490 ZFPM_SHOW_STAT (t_conn_down_yields);
1491 ZFPM_SHOW_STAT (t_conn_down_finishes);
1492 ZFPM_SHOW_STAT (t_conn_up_starts);
1493 ZFPM_SHOW_STAT (t_conn_up_dests_processed);
1494 ZFPM_SHOW_STAT (t_conn_up_yields);
1495 ZFPM_SHOW_STAT (t_conn_up_aborts);
1496 ZFPM_SHOW_STAT (t_conn_up_finishes);
1498 if (!zfpm_g->last_stats_clear_time)
1501 elapsed = zfpm_get_elapsed_time (zfpm_g->last_stats_clear_time);
1503 vty_out (vty, "%sStats were cleared %lu seconds ago%s", VTY_NEWLINE,
1504 (unsigned long) elapsed, VTY_NEWLINE);
1511 zfpm_clear_stats (struct vty *vty)
1513 if (!zfpm_is_enabled ())
1515 vty_out (vty, "The FPM module is not enabled...%s", VTY_NEWLINE);
1519 zfpm_stats_reset (&zfpm_g->stats);
1520 zfpm_stats_reset (&zfpm_g->last_ivl_stats);
1521 zfpm_stats_reset (&zfpm_g->cumulative_stats);
1523 zfpm_stop_stats_timer ();
1524 zfpm_start_stats_timer ();
1526 zfpm_g->last_stats_clear_time = zfpm_get_time();
1528 vty_out (vty, "Cleared FPM stats%s", VTY_NEWLINE);
/*
 * CLI command "show zebra fpm stats": prints the counters via
 * zfpm_show_stats().
 *
 * NOTE(review): some help-string lines and the return statement are
 * missing from this listing; the DEFUN cannot be rebuilt from the
 * visible text alone.
 */
1532 * show_zebra_fpm_stats
1534 DEFUN (show_zebra_fpm_stats,
1535 show_zebra_fpm_stats_cmd,
1536 "show zebra fpm stats",
1538 "Zebra information\n"
1539 "Forwarding Path Manager information\n"
1542 zfpm_show_stats (vty);
/*
 * CLI command "clear zebra fpm stats": resets the counters via
 * zfpm_clear_stats().
 *
 * NOTE(review): some help-string lines and the return statement are
 * missing from this listing; the DEFUN cannot be rebuilt from the
 * visible text alone.
 */
1547 * clear_zebra_fpm_stats
1549 DEFUN (clear_zebra_fpm_stats,
1550 clear_zebra_fpm_stats_cmd,
1551 "clear zebra fpm stats",
1553 "Zebra information\n"
1554 "Clear Forwarding Path Manager information\n"
1557 zfpm_clear_stats (vty);
/*
 * CLI command "fpm connection ip A.B.C.D port <1-65535>": parses
 * argv[0] with inet_addr() and argv[1] with atoi(), rejects an
 * unparsable address or a port outside [TCP_MIN_PORT, TCP_MAX_PORT]
 * with CMD_ERR_INCOMPLETE, and otherwise stores both in zfpm_g.
 *
 * NOTE(review): the command-element name, some help strings, the
 * declaration of 'port_no' and the success return are missing from
 * this listing. inet_addr() reports failure as INADDR_NONE, which is
 * also the encoding of 255.255.255.255, so that address is rejected.
 * Nothing visible here applies the new values to a connection that is
 * already established -- confirm whether that is intended.
 */
1562 * update fpm connection information
1564 DEFUN ( fpm_remote_ip,
1566 "fpm connection ip A.B.C.D port <1-65535>",
1567 "fpm connection remote ip and port\n"
1568 "Remote fpm server ip A.B.C.D\n"
1572 in_addr_t fpm_server;
1575 fpm_server = inet_addr (argv[0]);
1576 if (fpm_server == INADDR_NONE)
1577 return CMD_ERR_INCOMPLETE;
1579 port_no = atoi (argv[1]);
1580 if (port_no < TCP_MIN_PORT || port_no > TCP_MAX_PORT)
1581 return CMD_ERR_INCOMPLETE;
1583 zfpm_g->fpm_server = fpm_server;
1584 zfpm_g->fpm_port = port_no;
/*
 * CLI command "no fpm connection ip A.B.C.D port <1-65535>": succeeds
 * only when the given address and port both equal the configured
 * values (otherwise CMD_ERR_NO_MATCH), and then restores
 * FPM_DEFAULT_IP and FPM_DEFAULT_PORT.
 *
 * NOTE(review): some help strings and the success return are missing
 * from this listing.
 */
1590 DEFUN ( no_fpm_remote_ip,
1591 no_fpm_remote_ip_cmd,
1592 "no fpm connection ip A.B.C.D port <1-65535>",
1593 "fpm connection remote ip and port\n"
1595 "Remote fpm server ip A.B.C.D\n"
1598 if (zfpm_g->fpm_server != inet_addr (argv[0]) ||
1599 zfpm_g->fpm_port != atoi (argv[1]))
1600 return CMD_ERR_NO_MATCH;
1602 zfpm_g->fpm_server = FPM_DEFAULT_IP;
1603 zfpm_g->fpm_port = FPM_DEFAULT_PORT;
/*
 * Chooses zfpm_g->message_format. From the visible lines: the format
 * starts as ZFPM_MSG_FORMAT_NONE. There is a path that picks netlink,
 * or else protobuf when available, without looking at the 'format'
 * string. The strings "netlink" and "protobuf" select the
 * corresponding format, with an error logged when it is not available.
 * Any other string is logged as unknown.
 *
 * NOTE(review): the conditions guarding each branch, the lines that
 * set 'have_netlink' / 'have_protobuf', the #ifdef/#endif lines other
 * than the visible "#ifdef HAVE_PROTOBUF", and the return statements
 * are missing from this listing, so the exact precedence cannot be
 * stated from here.
 */
1610 * zfpm_init_message_format
1613 zfpm_init_message_format (const char *format)
1615 int have_netlink, have_protobuf;
1617 have_netlink = have_protobuf = 0;
1623 #ifdef HAVE_PROTOBUF
1627 zfpm_g->message_format = ZFPM_MSG_FORMAT_NONE;
1633 zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1635 else if (have_protobuf)
1637 zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1642 if (!strcmp ("netlink", format))
1646 zlog_err ("FPM netlink message format is not available");
1649 zfpm_g->message_format = ZFPM_MSG_FORMAT_NETLINK;
1653 if (!strcmp ("protobuf", format))
1657 zlog_err ("FPM protobuf message format is not available");
1660 zfpm_g->message_format = ZFPM_MSG_FORMAT_PROTOBUF;
1664 zlog_warn ("Unknown fpm format '%s'", format);
1668 * fpm_remote_srv_write
1670 * Module to write remote fpm connection
1672 * Returns ZERO on success.
1675 int fpm_remote_srv_write (struct vty *vty )
1679 in.s_addr = zfpm_g->fpm_server;
1681 if (zfpm_g->fpm_server != FPM_DEFAULT_IP ||
1682 zfpm_g->fpm_port != FPM_DEFAULT_PORT)
1683 vty_out (vty,"fpm connection ip %s port %d%s", inet_ntoa (in),zfpm_g->fpm_port,VTY_NEWLINE);
/*
 * From the visible lines: zeroes the global state, records the thread
 * master, initializes dest_q, the IDLE state and all three statistics
 * blocks, installs the four CLI commands, selects the message format,
 * records whether the module is enabled, applies FPM_DEFAULT_IP /
 * FPM_DEFAULT_PORT where needed, allocates the output and input
 * streams, and starts the statistics and connect timers.
 *
 * NOTE(review): the 'format' parameter line, the handling of the
 * 'initialized' guard, the initial value given to zfpm_g->sock, the
 * statements under the "no suitable format" and default-port tests,
 * any early return for a disabled module and the return statements are
 * missing from this listing. zfpm_start_connect_timer() asserts
 * sock < 0, so sock is presumably set negative after the memset --
 * confirm. The memset makes the "!zfpm_g->fpm_server" test always true
 * here, so the default address is always applied at init.
 */
1692 * One-time initialization of the Zebra FPM module.
1694 * @param[in] port port at which FPM is running.
1695 * @param[in] enable TRUE if the zebra FPM module should be enabled
1696 * @param[in] format to use to talk to the FPM. Can be 'netink' or 'protobuf'.
1698 * Returns TRUE on success.
1701 zfpm_init (struct thread_master *master, int enable, uint16_t port,
1704 static int initialized = 0;
1712 memset (zfpm_g, 0, sizeof (*zfpm_g));
1713 zfpm_g->master = master;
1714 TAILQ_INIT(&zfpm_g->dest_q);
1716 zfpm_g->state = ZFPM_STATE_IDLE;
1718 zfpm_stats_init (&zfpm_g->stats);
1719 zfpm_stats_init (&zfpm_g->last_ivl_stats);
1720 zfpm_stats_init (&zfpm_g->cumulative_stats);
1722 install_element (ENABLE_NODE, &show_zebra_fpm_stats_cmd);
1723 install_element (ENABLE_NODE, &clear_zebra_fpm_stats_cmd);
1724 install_element (CONFIG_NODE, &fpm_remote_ip_cmd);
1725 install_element (CONFIG_NODE, &no_fpm_remote_ip_cmd);
1727 zfpm_init_message_format(format);
1730 * Disable FPM interface if no suitable format is available.
1732 if (zfpm_g->message_format == ZFPM_MSG_FORMAT_NONE)
1735 zfpm_g->enabled = enable;
1741 if (!zfpm_g->fpm_server)
1742 zfpm_g->fpm_server = FPM_DEFAULT_IP;
1745 port = FPM_DEFAULT_PORT;
1747 zfpm_g->fpm_port = port;
1749 zfpm_g->obuf = stream_new (ZFPM_OBUF_SIZE);
1750 zfpm_g->ibuf = stream_new (ZFPM_IBUF_SIZE);
1752 zfpm_start_stats_timer ();
1753 zfpm_start_connect_timer ("initialized");