X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=8806e9c322bef43b818975bee8f44bc520e3aa53;hp=d79dee3cafd99c1232f2458a985561fb0e7a1875;hb=a4baae1b0463bc4ce65c2a458c4a941e7fabc594;hpb=9363801e2d2069022a05e67066d8f527538946d0 diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index d79dee3ca..8806e9c32 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -28,9 +28,10 @@ #include #include #include +#include #include "consumer.h" -#include "health.h" +#include "health-sessiond.h" #include "ust-app.h" #include "utils.h" @@ -62,8 +63,8 @@ int consumer_socket_send(struct consumer_socket *socket, void *msg, size_t len) /* The above call will print a PERROR on error. */ DBG("Error when sending data to consumer on sock %d", fd); /* - * At this point, the socket is not usable anymore thus flagging it - * invalid and closing it. + * At this point, the socket is not usable anymore thus closing it and + * setting the file descriptor to -1 so it is not reused. */ /* This call will PERROR on error. */ @@ -106,8 +107,8 @@ int consumer_socket_recv(struct consumer_socket *socket, void *msg, size_t len) /* The above call will print a PERROR on error. */ DBG("Error when receiving data from the consumer socket %d", fd); /* - * At this point, the socket is not usable anymore thus flagging it - * invalid and closing it. + * At this point, the socket is not usable anymore thus closing it and + * setting the file descriptor to -1 so it is not reused. */ /* This call will PERROR on error. */ @@ -141,7 +142,7 @@ int consumer_recv_status_reply(struct consumer_socket *sock) goto end; } - if (reply.ret_code == LTTNG_OK) { + if (reply.ret_code == LTTCOMM_CONSUMERD_SUCCESS) { /* All good. */ ret = 0; } else { @@ -177,13 +178,14 @@ int consumer_recv_status_channel(struct consumer_socket *sock, } /* An error is possible so don't touch the key and stream_count. */ - if (reply.ret_code != LTTNG_OK) { + if (reply.ret_code != LTTCOMM_CONSUMERD_SUCCESS) { ret = -1; goto end; } *key = reply.key; *stream_count = reply.stream_count; + ret = 0; end: return ret; @@ -781,6 +783,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, int overwrite, unsigned int switch_timer_interval, unsigned int read_timer_interval, + unsigned int live_timer_interval, int output, int type, uint64_t session_id, @@ -809,6 +812,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.overwrite = overwrite; msg->u.ask_channel.switch_timer_interval = switch_timer_interval; msg->u.ask_channel.read_timer_interval = read_timer_interval; + msg->u.ask_channel.live_timer_interval = live_timer_interval; msg->u.ask_channel.output = output; msg->u.ask_channel.type = type; msg->u.ask_channel.session_id = session_id; @@ -852,7 +856,8 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, int type, uint64_t tracefile_size, uint64_t tracefile_count, - unsigned int monitor) + unsigned int monitor, + unsigned int live_timer_interval) { assert(msg); @@ -872,6 +877,7 @@ void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.channel.tracefile_size = tracefile_size; msg->u.channel.tracefile_count = tracefile_count; msg->u.channel.monitor = monitor; + msg->u.channel.live_timer_interval = live_timer_interval; strncpy(msg->u.channel.pathname, pathname, sizeof(msg->u.channel.pathname)); @@ -900,6 +906,19 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.stream.cpu = cpu; } +void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, + enum lttng_consumer_command cmd, + uint64_t channel_key, uint64_t net_seq_idx) +{ + assert(msg); + + memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + + msg->cmd_type = cmd; + msg->u.sent_streams.channel_key = channel_key; + msg->u.sent_streams.net_seq_idx = net_seq_idx; +} + /* * Send stream communication structure to the consumer. */ @@ -935,7 +954,8 @@ error: */ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, struct lttcomm_relayd_sock *rsock, struct consumer_output *consumer, - enum lttng_stream_type type, uint64_t session_id) + enum lttng_stream_type type, uint64_t session_id, + char *session_name, char *hostname, int session_live_timer) { int ret; struct lttcomm_consumer_msg msg; @@ -951,6 +971,18 @@ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, goto error; } + if (type == LTTNG_STREAM_CONTROL) { + ret = relayd_create_session(rsock, + &msg.u.relayd_sock.relayd_session_id, + session_name, hostname, session_live_timer, + consumer->snapshot); + if (ret < 0) { + /* Close the control socket. */ + (void) relayd_close(rsock); + goto error; + } + } + msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET; /* * Assign network consumer output index using the temporary consumer since