From: David Goulet Date: Thu, 26 Jul 2012 18:54:20 +0000 (-0400) Subject: Add consumer socket object and relayd commands X-Git-Tag: v2.1.0-rc1~43 X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=commitdiff_plain;h=173af62f4804133d4a7f45e34b6f72126f3eca5f Add consumer socket object and relayd commands This commit adds a consumer_socket object and a lock to access it. This fixes the possible concurrent access to the consumer(s) socket between threads in the session daemon. This also introduce the use of a sequence number during streaming so the relayd can know when to close the trace file according to the sent versus the expected sequence number. Introduce the close_stream command between the consumer and relayd. Upon a destroy session or a stream that hung up, the consumer now sends the close stream command indicating the last sequence number. The consumer never closes the relayd sockets unless a destroy relayd command (new) is received from the session daemon which occurs on a destroy session command. Some fixes here and there but overall the streaming synchronization issues have been resolved. Signed-off-by: David Goulet Signed-off-by: Mathieu Desnoyers --- diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index bec13496c..f231f0f6c 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -50,11 +50,15 @@ struct relay_session { */ struct relay_stream { uint64_t stream_handle; - uint64_t seq; + uint64_t prev_seq; /* previous data sequence number encountered */ struct lttng_ht_node_ulong stream_n; struct relay_session *session; struct rcu_head rcu_node; int fd; + + /* Information telling us when to close the stream */ + unsigned int close_flag:1; + uint64_t last_net_seq_num; }; /* diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 1ef6881c0..ae631c70a 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -446,6 +447,19 @@ error: return NULL; } +/* + * Return nonzero if stream needs to be closed. + */ +static +int close_stream_check(struct relay_stream *stream) +{ + + if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) { + return 1; + } + return 0; +} + /* * This thread manages the listening for new connections on the network */ @@ -922,7 +936,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, rcu_read_lock(); stream->stream_handle = ++last_relay_stream_id; - stream->seq = 0; + stream->prev_seq = -1ULL; stream->session = session; root_path = create_output_path(stream_info.pathname); @@ -979,6 +993,85 @@ end_no_session: return ret; } +/* + * relay_close_stream: close a specific stream + */ +static +int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht) +{ + struct relay_session *session = cmd->session; + struct lttcomm_relayd_close_stream stream_info; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + int ret, send_ret; + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + + DBG("Close stream received"); + + if (!session || session->version_check_done == 0) { + ERR("Trying to close a stream before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info, + sizeof(struct lttcomm_relayd_close_stream), MSG_WAITALL); + if (ret < sizeof(struct lttcomm_relayd_close_stream)) { + ERR("Relay didn't receive valid add_stream struct size : %d", ret); + ret = -1; + goto end_no_session; + } + + rcu_read_lock(); + lttng_ht_lookup(streams_ht, + (void *)((unsigned long) be64toh(stream_info.stream_id)), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG("Relay stream %lu not found", be64toh(stream_info.stream_id)); + ret = -1; + goto end_unlock; + } + + stream = caa_container_of(node, struct relay_stream, stream_n); + if (!stream) { + ret = -1; + goto end_unlock; + } + + stream->close_flag = 1; + + if (close_stream_check(stream)) { + int delret; + + close(stream->fd); + delret = lttng_ht_del(streams_ht, &iter); + assert(!delret); + call_rcu(&stream->rcu_node, + deferred_free_stream); + DBG("Closed tracefile %d from close stream", stream->fd); + } + +end_unlock: + rcu_read_unlock(); + + if (ret < 0) { + reply.ret_code = htobe32(LTTCOMM_ERR); + } else { + reply.ret_code = htobe32(LTTCOMM_OK); + } + send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, + sizeof(struct lttcomm_relayd_generic_reply), 0); + if (send_ret < 0) { + ERR("Relay sending stream id"); + } + +end_no_session: + return ret; +} + /* * relay_unknown_command: send -1 if received unknown command */ @@ -1188,6 +1281,9 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_VERSION: ret = relay_send_version(recv_hdr, cmd); break; + case RELAYD_CLOSE_STREAM: + ret = relay_close_stream(recv_hdr, cmd, streams_ht); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -1210,6 +1306,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) struct relay_stream *stream; struct lttcomm_relayd_data_hdr data_hdr; uint64_t stream_id; + uint64_t net_seq_num; uint32_t data_size; ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr, @@ -1241,7 +1338,10 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) } memset(data_buffer, 0, data_size); - DBG3("Receiving data of size %u for stream id %zu", data_size, stream_id); + net_seq_num = be64toh(data_hdr.net_seq_num); + + DBG3("Receiving data of size %u for stream id %zu seqnum %" PRIu64, + data_size, stream_id, net_seq_num); ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL); if (ret <= 0) { ret = -1; @@ -1258,6 +1358,21 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) } DBG2("Relay wrote %d bytes to tracefile for stream id %lu", ret, stream->stream_handle); + stream->prev_seq = net_seq_num; + + /* Check if we need to close the FD */ + if (close_stream_check(stream)) { + struct lttng_ht_iter iter; + + close(stream->fd); + iter.iter.node = &stream->stream_n.node; + ret = lttng_ht_del(streams_ht, &iter); + assert(!ret); + call_rcu(&stream->rcu_node, + deferred_free_stream); + DBG("Closed tracefile %d after recv data", stream->fd); + } + end_unlock: rcu_read_unlock(); end: diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 72450d50c..7071974a1 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -30,6 +30,113 @@ #include "consumer.h" +/* + * Find a consumer_socket in a consumer_output hashtable. Read side lock must + * be acquired before calling this function and across use of the + * returned consumer_socket. + */ +struct consumer_socket *consumer_find_socket(int key, + struct consumer_output *consumer) +{ + struct lttng_ht_iter iter; + struct lttng_ht_node_ulong *node; + struct consumer_socket *socket = NULL; + + /* Negative keys are lookup failures */ + if (key < 0) { + return NULL; + } + + lttng_ht_lookup(consumer->socks, (void *)((unsigned long) key), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node != NULL) { + socket = caa_container_of(node, struct consumer_socket, node); + } + + return socket; +} + +/* + * Allocate a new consumer_socket and return the pointer. + */ +struct consumer_socket *consumer_allocate_socket(int fd) +{ + struct consumer_socket *socket = NULL; + + socket = zmalloc(sizeof(struct consumer_socket)); + if (socket == NULL) { + PERROR("zmalloc consumer socket"); + goto error; + } + + socket->fd = fd; + lttng_ht_node_init_ulong(&socket->node, fd); + +error: + return socket; +} + +/* + * Add consumer socket to consumer output object. Read side lock must be + * acquired before calling this function. + */ +void consumer_add_socket(struct consumer_socket *sock, + struct consumer_output *consumer) +{ + assert(sock); + assert(consumer); + + lttng_ht_add_unique_ulong(consumer->socks, &sock->node); +} + +/* + * Delte consumer socket to consumer output object. Read side lock must be + * acquired before calling this function. + */ +void consumer_del_socket(struct consumer_socket *sock, + struct consumer_output *consumer) +{ + int ret; + struct lttng_ht_iter iter; + + assert(sock); + assert(consumer); + + iter.iter.node = &sock->node.node; + ret = lttng_ht_del(consumer->socks, &iter); + assert(!ret); +} + +/* + * RCU destroy call function. + */ +static void destroy_socket_rcu(struct rcu_head *head) +{ + struct lttng_ht_node_ulong *node = + caa_container_of(head, struct lttng_ht_node_ulong, head); + struct consumer_socket *socket = + caa_container_of(node, struct consumer_socket, node); + + free(socket); +} + +/* + * Destroy and free socket pointer in a call RCU. Read side lock must be + * acquired before calling this function. + */ +void consumer_destroy_socket(struct consumer_socket *sock) +{ + assert(sock); + + /* + * We DO NOT close the file descriptor here since it is global to the + * session daemon and is closed only if the consumer dies. + */ + + call_rcu(&sock->node.head, destroy_socket_rcu); +} + /* * Allocate and assign data to a consumer_output object. * @@ -49,11 +156,8 @@ struct consumer_output *consumer_create_output(enum consumer_dst_type type) output->enabled = 1; output->type = type; output->net_seq_index = -1; - /* - * Important to keep it to a negative value on creation since it was zeroed - * during allocation and the file descriptor 0 is a valid one. - */ - output->sock = -1; + + output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); error: return output; @@ -68,9 +172,15 @@ void consumer_destroy_output(struct consumer_output *obj) return; } - if (obj->sock >= 0) { - (void) close(obj->sock); + if (obj->socks) { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + + cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { + consumer_destroy_socket(socket); + } } + free(obj); } @@ -79,6 +189,8 @@ void consumer_destroy_output(struct consumer_output *obj) */ struct consumer_output *consumer_copy_output(struct consumer_output *obj) { + struct lttng_ht_iter iter; + struct consumer_socket *socket, *copy_sock; struct consumer_output *output; assert(obj); @@ -90,8 +202,26 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) memcpy(output, obj, sizeof(struct consumer_output)); + /* Copy sockets */ + output->socks = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + + cds_lfht_for_each_entry(obj->socks->ht, &iter.iter, socket, node.node) { + /* Create new socket object. */ + copy_sock = consumer_allocate_socket(socket->fd); + if (copy_sock == NULL) { + goto malloc_error; + } + + copy_sock->lock = socket->lock; + consumer_add_socket(copy_sock, output); + } + error: return output; + +malloc_error: + consumer_destroy_output(output); + return NULL; } /* @@ -364,7 +494,7 @@ int consumer_send_relayd_socket(int consumer_sock, msg.u.relayd_sock.type = type; memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock)); - DBG3("Sending relayd sock info to consumer"); + DBG3("Sending relayd sock info to consumer on %d", consumer_sock); ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg)); if (ret < 0) { PERROR("send consumer relayd socket info"); @@ -382,3 +512,43 @@ int consumer_send_relayd_socket(int consumer_sock, error: return ret; } + +/* + * Send destroy relayd command to consumer. + * + * On success return positive value. On error, negative value. + */ +int consumer_send_destroy_relayd(struct consumer_socket *sock, + struct consumer_output *consumer) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(consumer); + assert(sock); + + DBG2("Sending destroy relayd command to consumer..."); + + /* Bail out if consumer is disabled */ + if (!consumer->enabled) { + ret = LTTCOMM_OK; + DBG3("Consumer is disabled"); + goto error; + } + + msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD; + msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index; + + pthread_mutex_lock(sock->lock); + ret = lttcomm_send_unix_sock(sock->fd, &msg, sizeof(msg)); + pthread_mutex_unlock(sock->lock); + if (ret < 0) { + PERROR("send consumer destroy relayd command"); + goto error; + } + + DBG2("Consumer send destroy relayd command done"); + +error: + return ret; +} diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index ab36a685c..ab1765c48 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -21,6 +21,7 @@ #include #include +#include #include #include "health.h" @@ -30,6 +31,16 @@ enum consumer_dst_type { CONSUMER_DST_NET, }; +struct consumer_socket { + /* File descriptor */ + int fd; + /* + * To use this socket (send/recv), this lock MUST be acquired. + */ + pthread_mutex_t *lock; + struct lttng_ht_node_ulong node; +}; + struct consumer_data { enum lttng_consumer_type type; @@ -49,6 +60,9 @@ struct consumer_data { /* Health check of the thread */ struct health_state health; + + /* communication lock */ + pthread_mutex_t lock; }; /* @@ -78,27 +92,44 @@ struct consumer_net { * Consumer output object describing where and how to send data. */ struct consumer_output { - /* Consumer socket file descriptor */ - int sock; /* If the consumer is enabled meaning that should be used */ unsigned int enabled; enum consumer_dst_type type; + /* * The net_seq_index is the index of the network stream on the consumer * side. It's basically the relayd socket file descriptor value so the * consumer can identify which streams goes with which socket. */ int net_seq_index; + /* * Subdirectory path name used for both local and network consumer. */ char subdir[PATH_MAX]; + + /* + * Hashtable of consumer_socket index by the file descriptor value. For + * multiarch consumer support, we can have more than one consumer (ex: 32 + * and 64 bit). + */ + struct lttng_ht *socks; + union { char trace_path[PATH_MAX]; struct consumer_net net; } dst; }; +struct consumer_socket *consumer_find_socket(int key, + struct consumer_output *consumer); +struct consumer_socket *consumer_allocate_socket(int fd); +void consumer_add_socket(struct consumer_socket *sock, + struct consumer_output *consumer); +void consumer_del_socket(struct consumer_socket *sock, + struct consumer_output *consumer); +void consumer_destroy_socket(struct consumer_socket *sock); + struct consumer_output *consumer_create_output(enum consumer_dst_type type); struct consumer_output *consumer_copy_output(struct consumer_output *obj); void consumer_destroy_output(struct consumer_output *obj); @@ -111,6 +142,8 @@ int consumer_send_channel(int sock, struct lttcomm_consumer_msg *msg); int consumer_send_relayd_socket(int consumer_sock, struct lttcomm_sock *sock, struct consumer_output *consumer, enum lttng_stream_type type); +int consumer_send_destroy_relayd(struct consumer_socket *sock, + struct consumer_output *consumer); void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, diff --git a/src/bin/lttng-sessiond/kernel.c b/src/bin/lttng-sessiond/kernel.c index 484ea304e..9049022f4 100644 --- a/src/bin/lttng-sessiond/kernel.c +++ b/src/bin/lttng-sessiond/kernel.c @@ -150,6 +150,12 @@ int kernel_create_channel(struct ltt_kernel_session *session, goto error; } + DBG3("Kernel create channel %s in %s with attr: %d, %zu, %zu, %u, %u, %d", + chan->name, path, lkc->channel->attr.overwrite, + lkc->channel->attr.subbuf_size, lkc->channel->attr.num_subbuf, + lkc->channel->attr.switch_timer_interval, lkc->channel->attr.read_timer_interval, + lkc->channel->attr.output); + /* Kernel tracer channel creation */ ret = kernctl_create_channel(session->fd, &lkc->channel->attr); if (ret < 0) { diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index f6d560c33..c79a47405 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -87,6 +87,8 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer64_data = { .type = LTTNG_CONSUMER64_UST, @@ -94,6 +96,8 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer32_data = { .type = LTTNG_CONSUMER32_UST, @@ -101,6 +105,8 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; /* Shared between threads */ @@ -365,23 +371,56 @@ error: */ static void teardown_kernel_session(struct ltt_session *session) { + int ret; + struct lttng_ht_iter iter; + struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + if (!session->kernel_session) { DBG3("No kernel session when tearing down session"); return; } + ksess = session->kernel_session; + DBG("Tearing down kernel session"); + /* + * Destroy relayd associated with the session consumer. This action is + * valid since in order to destroy a session we must acquire the session + * lock. This means that there CAN NOT be stream(s) being sent to a + * consumer since this action also requires the session lock at any time. + * + * At this point, we are sure that not streams data will be lost after this + * command is issued. + */ + if (ksess->consumer && ksess->consumer->type == CONSUMER_DST_NET) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, + node.node) { + ret = consumer_send_destroy_relayd(socket, ksess->consumer); + if (ret < 0) { + ERR("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. */ + } + } + } + /* * If a custom kernel consumer was registered, close the socket before * tearing down the complete kernel session structure */ - if (kconsumer_data.cmd_sock >= 0 && - session->kernel_session->consumer_fd != kconsumer_data.cmd_sock) { - lttcomm_close_unix_sock(session->kernel_session->consumer_fd); + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, + node.node) { + if (socket->fd != kconsumer_data.cmd_sock) { + rcu_read_lock(); + consumer_del_socket(socket, ksess->consumer); + lttcomm_close_unix_sock(socket->fd); + consumer_destroy_socket(socket); + rcu_read_unlock(); + } } - trace_kernel_destroy_session(session->kernel_session); + trace_kernel_destroy_session(ksess); } /* @@ -391,20 +430,44 @@ static void teardown_kernel_session(struct ltt_session *session) static void teardown_ust_session(struct ltt_session *session) { int ret; + struct lttng_ht_iter iter; + struct ltt_ust_session *usess; + struct consumer_socket *socket; if (!session->ust_session) { DBG3("No UST session when tearing down session"); return; } + usess = session->ust_session; DBG("Tearing down UST session(s)"); - ret = ust_app_destroy_trace_all(session->ust_session); + /* + * Destroy relayd associated with the session consumer. This action is + * valid since in order to destroy a session we must acquire the session + * lock. This means that there CAN NOT be stream(s) being sent to a + * consumer since this action also requires the session lock at any time. + * + * At this point, we are sure that not streams data will be lost after this + * command is issued. + */ + if (usess->consumer && usess->consumer->type == CONSUMER_DST_NET) { + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, socket, + node.node) { + ret = consumer_send_destroy_relayd(socket, usess->consumer); + if (ret < 0) { + ERR("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. */ + } + } + } + + ret = ust_app_destroy_trace_all(usess); if (ret) { ERR("Error in ust_app_destroy_trace_all"); } - trace_ust_destroy_session(session->ust_session); + trace_ust_destroy_session(usess); } /* @@ -468,8 +531,6 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); - pthread_mutex_destroy(&kconsumer_data.pid_mutex); - if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); if (kernel_tracer_fd >= 0) { @@ -634,6 +695,7 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) { int ret = 0; struct ltt_session *session; + struct ltt_kernel_session *ksess; struct ltt_kernel_channel *channel; DBG("Updating kernel streams for channel fd %d", fd); @@ -645,14 +707,9 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) session_unlock(session); continue; } + ksess = session->kernel_session; - /* This is not suppose to be -1 but this is an extra security check */ - if (session->kernel_session->consumer_fd < 0) { - session->kernel_session->consumer_fd = consumer_data->cmd_sock; - } - - cds_list_for_each_entry(channel, - &session->kernel_session->channel_list.head, list) { + cds_list_for_each_entry(channel, &ksess->channel_list.head, list) { if (channel->fd == fd) { DBG("Channel found, updating kernel streams"); ret = kernel_open_channel_stream(channel); @@ -665,13 +722,23 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) * that tracing is started so it is safe to send our updated * stream fds. */ - if (session->kernel_session->consumer_fds_sent == 1 && - session->kernel_session->consumer != NULL) { - ret = kernel_consumer_send_channel_stream( - session->kernel_session->consumer_fd, channel, - session->kernel_session); - if (ret < 0) { - goto error; + if (ksess->consumer_fds_sent == 1 && ksess->consumer != NULL) { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + + + cds_lfht_for_each_entry(ksess->consumer->socks->ht, + &iter.iter, socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_channel_stream(socket->fd, + channel, ksess); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + goto error; + } } } goto error; @@ -1869,21 +1936,24 @@ error: static int init_kernel_tracing(struct ltt_kernel_session *session) { int ret = 0; + struct lttng_ht_iter iter; + struct consumer_socket *socket; - if (session->consumer_fds_sent == 0 && session->consumer != NULL) { - /* - * Assign default kernel consumer socket if no consumer assigned to the - * kernel session. At this point, it's NOT supposed to be -1 but this is - * an extra security check. - */ - if (session->consumer_fd < 0) { - session->consumer_fd = kconsumer_data.cmd_sock; - } + assert(session); - ret = kernel_consumer_send_session(session->consumer_fd, session); - if (ret < 0) { - ret = LTTCOMM_KERN_CONSUMER_FAIL; - goto error; + if (session->consumer_fds_sent == 0 && session->consumer != NULL) { + cds_lfht_for_each_entry(session->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_session(socket->fd, session); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + ret = LTTCOMM_KERN_CONSUMER_FAIL; + goto error; + } } } @@ -2054,6 +2124,8 @@ static int setup_relayd(struct ltt_session *session) int ret = LTTCOMM_OK; struct ltt_ust_session *usess; struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + struct lttng_ht_iter iter; assert(session); @@ -2062,34 +2134,37 @@ static int setup_relayd(struct ltt_session *session) DBG2("Setting relayd for session %s", session->name); - if (usess && usess->consumer->sock == -1 && - usess->consumer->type == CONSUMER_DST_NET && + if (usess && usess->consumer->type == CONSUMER_DST_NET && usess->consumer->enabled) { - /* Setup relayd for 64 bits consumer */ - if (ust_consumerd64_fd >= 0) { + /* For each consumer socket, send relayd sockets */ + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd64_fd); + usess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - - /* Setup relayd for 32 bits consumer */ - if (ust_consumerd32_fd >= 0) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd32_fd); + } else if (ksess && ksess->consumer->type == CONSUMER_DST_NET && + ksess->consumer->enabled) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, + ksess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - } else if (ksess && ksess->consumer->sock == -1 && - ksess->consumer->type == CONSUMER_DST_NET && - ksess->consumer->enabled) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, - ksess->consumer, ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; - } } error: @@ -2219,11 +2294,6 @@ static int create_kernel_session(struct ltt_session *session) goto error; } - /* Set kernel consumer socket fd */ - if (kconsumer_data.cmd_sock >= 0) { - session->kernel_session->consumer_fd = kconsumer_data.cmd_sock; - } - /* Copy session output to the newly created Kernel session */ ret = copy_session_consumer(LTTNG_DOMAIN_KERNEL, session); if (ret != LTTCOMM_OK) { @@ -3465,6 +3535,9 @@ static int cmd_destroy_session(struct ltt_session *session, char *name) { int ret; + /* Safety net */ + assert(session); + /* Clean kernel session teardown */ teardown_kernel_session(session); /* UST session teardown */ @@ -3534,6 +3607,7 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, char *sock_path) { int ret, sock; + struct consumer_socket *socket; switch (domain) { case LTTNG_DOMAIN_KERNEL: @@ -3549,7 +3623,17 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, goto error; } - session->kernel_session->consumer_fd = sock; + socket = consumer_allocate_socket(sock); + if (socket == NULL) { + ret = LTTCOMM_FATAL; + close(sock); + goto error; + } + + rcu_read_lock(); + consumer_add_socket(socket, session->kernel_session->consumer); + rcu_read_unlock(); + break; default: /* TODO: Userspace tracing */ @@ -3705,6 +3789,10 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, switch (domain) { case LTTNG_DOMAIN_KERNEL: + { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + /* Code flow error if we don't have a kernel session here. */ assert(ksess); @@ -3738,11 +3826,20 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, sizeof(consumer->subdir)); } - ret = send_socket_relayd_consumer(domain, session, uri, consumer, - ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = send_socket_relayd_consumer(domain, session, uri, consumer, + socket->fd); + pthread_mutex_unlock(socket->lock); + if (ret != LTTCOMM_OK) { + goto error; + } } + break; case LTTNG_DST_PATH: DBG2("Setting trace directory path from URI to %s", uri->dst.path); @@ -3758,6 +3855,7 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, /* All good! */ break; + } case LTTNG_DOMAIN_UST: /* Code flow error if we don't have a kernel session here. */ assert(usess); @@ -3778,6 +3876,8 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, case LTTNG_DST_IPV4: case LTTNG_DST_IPV6: { + struct consumer_socket *socket; + DBG2("Setting network URI for UST session %s", session->name); /* Set URI into consumer object */ @@ -3793,22 +3893,31 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, sizeof(consumer->subdir)); } - if (ust_consumerd64_fd >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd), + consumer); + if (socket != NULL) { + pthread_mutex_lock(socket->lock); ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd64_fd); + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - if (ust_consumerd32_fd >= 0) { + socket = consumer_find_socket(uatomic_read(&ust_consumerd32_fd), + consumer); + if (socket != NULL) { + pthread_mutex_lock(socket->lock); ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd32_fd); + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - + rcu_read_unlock(); break; } case LTTNG_DST_PATH: @@ -4194,6 +4303,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Need a session for kernel command */ if (need_tracing_session) { + struct consumer_socket *socket; + if (cmd_ctx->session->kernel_session == NULL) { ret = create_kernel_session(cmd_ctx->session); if (ret < 0) { @@ -4213,13 +4324,29 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, goto error; } uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED); - - /* Set consumer fd of the session */ - cmd_ctx->session->kernel_session->consumer_fd = - kconsumer_data.cmd_sock; } else { pthread_mutex_unlock(&kconsumer_data.pid_mutex); } + + /* Set kernel consumer socket fd */ + if (kconsumer_data.cmd_sock >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(kconsumer_data.cmd_sock, + cmd_ctx->session->kernel_session->consumer); + rcu_read_unlock(); + if (socket == NULL) { + socket = consumer_allocate_socket(kconsumer_data.cmd_sock); + if (socket == NULL) { + goto error; + } + + socket->lock = &kconsumer_data.lock; + rcu_read_lock(); + consumer_add_socket(socket, + cmd_ctx->session->kernel_session->consumer); + rcu_read_unlock(); + } + } } break; @@ -4232,6 +4359,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, } if (need_tracing_session) { + struct consumer_socket *socket; + if (cmd_ctx->session->ust_session == NULL) { ret = create_ust_session(cmd_ctx->session, &cmd_ctx->lsm->domain); @@ -4250,15 +4379,40 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, ret = start_consumerd(&ustconsumer64_data); if (ret < 0) { ret = LTTCOMM_UST_CONSUMER64_FAIL; - ust_consumerd64_fd = -EINVAL; + uatomic_set(&ust_consumerd64_fd, -EINVAL); goto error; } - ust_consumerd64_fd = ustconsumer64_data.cmd_sock; + uatomic_set(&ust_consumerd64_fd, ustconsumer64_data.cmd_sock); uatomic_set(&ust_consumerd_state, CONSUMER_STARTED); } else { pthread_mutex_unlock(&ustconsumer64_data.pid_mutex); } + + /* + * Setup socket for consumer 64 bit. No need for atomic access + * since it was set above and can ONLY be set in this thread. + */ + if (ust_consumerd64_fd >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd), + cmd_ctx->session->ust_session->consumer); + rcu_read_unlock(); + if (socket == NULL) { + socket = consumer_allocate_socket(ust_consumerd64_fd); + if (socket == NULL) { + goto error; + } + socket->lock = &ustconsumer32_data.lock; + + rcu_read_lock(); + consumer_add_socket(socket, + cmd_ctx->session->ust_session->consumer); + rcu_read_unlock(); + } + DBG3("UST consumer 64 bit socket set to %d", socket->fd); + } + /* 32-bit */ if (consumerd32_bin[0] != '\0' && ustconsumer32_data.pid == 0 && @@ -4267,15 +4421,39 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, ret = start_consumerd(&ustconsumer32_data); if (ret < 0) { ret = LTTCOMM_UST_CONSUMER32_FAIL; - ust_consumerd32_fd = -EINVAL; + uatomic_set(&ust_consumerd32_fd, -EINVAL); goto error; } - ust_consumerd32_fd = ustconsumer32_data.cmd_sock; + uatomic_set(&ust_consumerd32_fd, ustconsumer32_data.cmd_sock); uatomic_set(&ust_consumerd_state, CONSUMER_STARTED); } else { pthread_mutex_unlock(&ustconsumer32_data.pid_mutex); } + + /* + * Setup socket for consumer 64 bit. No need for atomic access + * since it was set above and can ONLY be set in this thread. + */ + if (ust_consumerd32_fd >= 0) { + rcu_read_lock(); + socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd), + cmd_ctx->session->ust_session->consumer); + rcu_read_unlock(); + if (socket == NULL) { + socket = consumer_allocate_socket(ust_consumerd32_fd); + if (socket == NULL) { + goto error; + } + socket->lock = &ustconsumer32_data.lock; + + rcu_read_lock(); + consumer_add_socket(socket, + cmd_ctx->session->ust_session->consumer); + rcu_read_unlock(); + } + DBG3("UST consumer 32 bit socket set to %d", socket->fd); + } } break; } diff --git a/src/bin/lttng-sessiond/trace-kernel.c b/src/bin/lttng-sessiond/trace-kernel.c index c1104fac2..c0239d4bb 100644 --- a/src/bin/lttng-sessiond/trace-kernel.c +++ b/src/bin/lttng-sessiond/trace-kernel.c @@ -101,7 +101,6 @@ struct ltt_kernel_session *trace_kernel_create_session(char *path) lks->channel_count = 0; lks->stream_count_global = 0; lks->metadata = NULL; - lks->consumer_fd = -1; CDS_INIT_LIST_HEAD(&lks->channel_list.head); /* Create default consumer output object */ diff --git a/src/bin/lttng-sessiond/trace-kernel.h b/src/bin/lttng-sessiond/trace-kernel.h index bc4c0e763..d34b83c8c 100644 --- a/src/bin/lttng-sessiond/trace-kernel.h +++ b/src/bin/lttng-sessiond/trace-kernel.h @@ -90,7 +90,6 @@ struct ltt_kernel_session { int fd; int metadata_stream_fd; int consumer_fds_sent; - int consumer_fd; unsigned int channel_count; unsigned int stream_count_global; char *trace_path; diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index b272fd679..087846df1 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -1405,8 +1405,10 @@ int ust_app_register(struct ust_register_msg *msg, int sock) struct ust_app *lta; int ret; - if ((msg->bits_per_long == 64 && ust_consumerd64_fd == -EINVAL) - || (msg->bits_per_long == 32 && ust_consumerd32_fd == -EINVAL)) { + if ((msg->bits_per_long == 64 && + (uatomic_read(&ust_consumerd64_fd) == -EINVAL)) + || (msg->bits_per_long == 32 && + (uatomic_read(&ust_consumerd32_fd) == -EINVAL))) { ERR("Registration failed: application \"%s\" (pid: %d) has " "%d-bit long, but no consumerd for this long size is available.\n", msg->name, msg->pid, msg->bits_per_long); @@ -2155,7 +2157,7 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) struct ust_app_session *ua_sess; struct ust_app_channel *ua_chan; struct ltt_ust_stream *ustream; - int consumerd_fd; + struct consumer_socket *socket; DBG("Starting tracing for ust app pid %d", app->pid); @@ -2238,10 +2240,18 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) switch (app->bits_per_long) { case 64: - consumerd_fd = ust_consumerd64_fd; + socket = consumer_find_socket(uatomic_read(&ust_consumerd64_fd), + usess->consumer); + if (socket == NULL) { + goto skip_setup; + } break; case 32: - consumerd_fd = ust_consumerd32_fd; + socket = consumer_find_socket(uatomic_read(&ust_consumerd32_fd), + usess->consumer); + if (socket == NULL) { + goto skip_setup; + } break; default: ret = -EINVAL; @@ -2249,7 +2259,7 @@ int ust_app_start_trace(struct ltt_ust_session *usess, struct ust_app *app) } /* Setup UST consumer socket and send fds to it */ - ret = ust_consumer_send_session(consumerd_fd, ua_sess, usess->consumer); + ret = ust_consumer_send_session(ua_sess, usess->consumer, socket); if (ret < 0) { goto error_rcu_unlock; } diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 5b909ab15..b7e0d3592 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -117,7 +117,7 @@ error: /* * Send all stream fds of UST channel to the consumer. */ -int ust_consumer_send_channel_streams(int sock, +static int send_channel_streams(int sock, struct ust_app_channel *uchan, struct ust_app_session *usess, struct consumer_output *consumer) { @@ -178,7 +178,7 @@ error: /* * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM. */ -int ust_consumer_send_metadata(int sock, struct ust_app_session *usess, +static int send_metadata(int sock, struct ust_app_session *usess, struct consumer_output *consumer) { int ret, fd, fds[2]; @@ -281,23 +281,23 @@ error: /* * Send all stream fds of the UST session to the consumer. */ -int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, - struct consumer_output *consumer) +int ust_consumer_send_session(struct ust_app_session *usess, + struct consumer_output *consumer, struct consumer_socket *sock) { int ret = 0; - int sock = consumer_fd; struct lttng_ht_iter iter; struct ust_app_channel *ua_chan; - DBG("Sending metadata stream fd"); + assert(usess); + assert(consumer); + assert(sock); - if (consumer_fd < 0) { - ERR("Consumer has negative file descriptor"); - return -EINVAL; - } + DBG("Sending metadata stream fd to consumer on %d", sock->fd); + + pthread_mutex_lock(sock->lock); /* Sending metadata information to the consumer */ - ret = ust_consumer_send_metadata(consumer_fd, usess, consumer); + ret = send_metadata(sock->fd, usess, consumer); if (ret < 0) { goto error; } @@ -314,7 +314,7 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, continue; } - ret = ust_consumer_send_channel_streams(sock, ua_chan, usess, consumer); + ret = send_channel_streams(sock->fd, ua_chan, usess, consumer); if (ret < 0) { rcu_read_unlock(); goto error; @@ -324,8 +324,10 @@ int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, DBG("consumer fds (metadata and channel streams) sent"); - return 0; + /* All good! */ + ret = 0; error: + pthread_mutex_unlock(sock->lock); return ret; } diff --git a/src/bin/lttng-sessiond/ust-consumer.h b/src/bin/lttng-sessiond/ust-consumer.h index ac57e64ed..b4a711b0a 100644 --- a/src/bin/lttng-sessiond/ust-consumer.h +++ b/src/bin/lttng-sessiond/ust-consumer.h @@ -21,14 +21,7 @@ #include "consumer.h" #include "ust-app.h" -int ust_consumer_send_session(int consumer_fd, struct ust_app_session *usess, - struct consumer_output *consumer); - -int ust_consumer_send_metadata(int sock, struct ust_app_session *usess, - struct consumer_output *consumer); - -int ust_consumer_send_channel_streams(int sock, - struct ust_app_channel *uchan, struct ust_app_session *usess, - struct consumer_output *consumer); +int ust_consumer_send_session(struct ust_app_session *usess, + struct consumer_output *consumer, struct consumer_socket *sock); #endif /* _UST_CONSUMER_H */ diff --git a/src/common/consumer.c b/src/common/consumer.c index 831592a1e..63d0d65ee 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -177,11 +177,18 @@ void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd) int ret; struct lttng_ht_iter iter; + if (relayd == NULL) { + return; + } + DBG("Consumer destroy and close relayd socket pair"); iter.iter.node = &relayd->node.node; ret = lttng_ht_del(consumer_data.relayd_ht, &iter); - assert(!ret); + if (ret != 0) { + /* We assume the relayd was already destroyed */ + return; + } /* Close all sockets */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); @@ -266,8 +273,18 @@ void consumer_del_stream(struct lttng_consumer_stream *stream) if (relayd != NULL) { uatomic_dec(&relayd->refcount); assert(uatomic_read(&relayd->refcount) >= 0); - if (uatomic_read(&relayd->refcount) == 0) { - /* Refcount of the relayd struct is 0, destroy it */ + + ret = relayd_send_close_stream(&relayd->control_sock, + stream->relayd_stream_id, + stream->next_net_seq_num - 1); + if (ret < 0) { + ERR("Unable to close stream on the relayd. Continuing"); + /* Continue here. There is nothing we can do for the relayd.*/ + } + + /* Both conditions are met, we destroy the relayd. */ + if (uatomic_read(&relayd->refcount) == 0 && + uatomic_read(&relayd->destroy_flag)) { consumer_destroy_relayd(relayd); } } @@ -469,6 +486,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( obj->net_seq_idx = net_seq_idx; obj->refcount = 0; + obj->destroy_flag = 0; lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx); pthread_mutex_init(&obj->ctrl_sock_mutex, NULL); @@ -546,6 +564,7 @@ int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, /* Set header with stream information */ data_hdr.stream_id = htobe64(stream->relayd_stream_id); data_hdr.data_size = htobe32(data_size); + data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++); /* Other fields are zeroed previously */ ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr, diff --git a/src/common/consumer.h b/src/common/consumer.h index 8e7cec994..cc90933e7 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -54,6 +54,8 @@ enum lttng_consumer_command { /* inform the consumer to quit when all fd has hang up */ LTTNG_CONSUMER_STOP, LTTNG_CONSUMER_ADD_RELAYD_SOCKET, + /* Inform the consumer to kill a specific relayd connection */ + LTTNG_CONSUMER_DESTROY_RELAYD, }; /* State of each fd in consumer */ @@ -128,6 +130,8 @@ struct lttng_consumer_stream { unsigned int metadata_flag; /* Used when the stream is set for network streaming */ uint64_t relayd_stream_id; + /* Next sequence number to use for trace packet */ + uint64_t next_net_seq_num; }; /* @@ -138,6 +142,14 @@ struct consumer_relayd_sock_pair { int net_seq_idx; /* Number of stream associated with this relayd */ unsigned int refcount; + + /* + * This flag indicates whether or not we should destroy this object. The + * destruction should ONLY occurs when this flag is set and the refcount is + * set to zero. + */ + unsigned int destroy_flag; + /* * Mutex protecting the control socket to avoid out of order packets * between threads sending data to the relayd. Since metadata data is sent @@ -339,6 +351,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( struct consumer_relayd_sock_pair *consumer_find_relayd(int key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size); +void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd); extern struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 8945afea3..6f877c968 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -385,3 +385,50 @@ int relayd_send_data_hdr(struct lttcomm_sock *sock, error: return ret; } + +/* + * Send close stream command to the relayd. + */ +int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id, + uint64_t last_net_seq_num) +{ + int ret; + struct lttcomm_relayd_close_stream msg; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(sock); + + DBG("Relayd closing stream id %zu", stream_id); + + msg.stream_id = htobe64(stream_id); + msg.last_net_seq_num = htobe64(last_net_seq_num); + + /* Send command */ + ret = send_command(sock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0); + if (ret < 0) { + goto error; + } + + /* Recevie response */ + ret = recv_reply(sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTCOMM_OK) { + ret = -reply.ret_code; + ERR("Relayd close stream replied error %d", ret); + } else { + /* Success */ + ret = 0; + } + + DBG("Relayd close stream id %zu successfully", stream_id); + +error: + return ret; +} diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 12efe1ec0..c6834df61 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -31,6 +31,8 @@ int relayd_create_session(struct lttcomm_sock *sock, const char *hostname, #endif int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name, const char *pathname, uint64_t *stream_id); +int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id, + uint64_t last_net_seq_num); int relayd_version_check(struct lttcomm_sock *sock, uint32_t major, uint32_t minor); int relayd_start_data(struct lttcomm_sock *sock); diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index 67b06371f..c025fcdd9 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -44,20 +44,10 @@ struct lttcomm_relayd_data_hdr { /* Circuit ID not used for now so always ignored */ uint64_t circuit_id; uint64_t stream_id; /* Stream ID known by the relayd */ - uint64_t net_seq_num; /* Network seq. number for UDP. */ + uint64_t net_seq_num; /* Network sequence number, per stream. */ uint32_t data_size; /* data size following this header */ } __attribute__ ((__packed__)); -#if 0 -/* - * Used to create a session between the relay and the sessiond. - */ -struct lttcomm_relayd_create_session { - char hostname[LTTNG_MAX_DNNAME]; - char session_name[NAME_MAX]; -}; -#endif - /* * Used to add a stream on the relay daemon. */ @@ -85,7 +75,7 @@ struct lttcomm_relayd_generic_reply { * Used to update synchronization information. */ struct lttcomm_relayd_update_sync_info { - /* TODO: fill the structure */ + /* TODO: fill the structure. Feature not implemented yet */ } __attribute__ ((__packed__)); /* @@ -104,4 +94,12 @@ struct lttcomm_relayd_metadata_payload { char payload[]; } __attribute__ ((__packed__)); +/* + * Used to indicate that a specific stream id can now be closed. + */ +struct lttcomm_relayd_close_stream { + uint64_t stream_id; + uint64_t last_net_seq_num; /* sequence number of last packet */ +} __attribute__ ((__packed__)); + #endif /* _RELAYD_COMM */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index eb13734a6..aebe30c6a 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -84,6 +84,7 @@ enum lttcomm_sessiond_command { RELAYD_UPDATE_SYNC_INFO, RELAYD_VERSION, RELAYD_SEND_METADATA, + RELAYD_CLOSE_STREAM, LTTNG_SET_FILTER, LTTNG_HEALTH_CHECK, }; @@ -364,6 +365,9 @@ struct lttcomm_consumer_msg { /* Open socket to the relayd */ struct lttcomm_sock sock; } relayd_sock; + struct { + uint64_t net_seq_idx; + } destroy_relayd; } u; }; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 7ce03ad87..07a68d8f7 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -212,6 +212,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { + DBG("Consumer received unexpected message size %zd (expects %zu)", + ret, sizeof(msg)); lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); return ret; } @@ -307,6 +309,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int fds[1]; size_t nb_fd = 1; + DBG("UST Consumer adding channel"); + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; @@ -346,6 +350,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, size_t nb_fd = 2; struct consumer_relayd_sock_pair *relayd = NULL; + DBG("UST Consumer adding stream"); + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { return -EINTR; @@ -356,8 +362,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } + DBG("consumer_add_stream chan %d stream %d", + msg.u.stream.channel_key, + msg.u.stream.stream_key); + assert(msg.u.stream.output == LTTNG_EVENT_MMAP); - new_stream = consumer_allocate_stream(msg.u.channel.channel_key, + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fds[0], fds[1], msg.u.stream.state, @@ -408,6 +418,29 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->relayd_stream_id); break; } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + struct consumer_relayd_sock_pair *relayd; + + DBG("UST consumer destroying relayd %zu", + msg.u.destroy_relayd.net_seq_idx); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx); + if (relayd == NULL) { + ERR("Unable to find relayd %zu", + msg.u.destroy_relayd.net_seq_idx); + } + + /* Set destroy flag for this object */ + uatomic_set(&relayd->destroy_flag, 1); + + /* Destroy the relayd if refcount is 0 else set the destroy flag. */ + if (uatomic_read(&relayd->refcount) == 0) { + consumer_destroy_relayd(relayd); + } + break; + } case LTTNG_CONSUMER_UPDATE_STREAM: { return -ENOSYS; diff --git a/tests/tools/Makefile.am b/tests/tools/Makefile.am index 555d8239e..5cad1fb05 100644 --- a/tests/tools/Makefile.am +++ b/tests/tools/Makefile.am @@ -22,7 +22,7 @@ test_sessions_LDADD = $(COMMON) $(HASHTABLE) # Kernel trace data unit tests test_kernel_data_trace_SOURCES = test_kernel_data_trace.c $(UTILS) $(KERN_DATA_TRACE) -test_kernel_data_trace_LDADD = $(SESSIOND_COMM) +test_kernel_data_trace_LDADD = $(SESSIOND_COMM) $(HASHTABLE) if HAVE_LIBLTTNG_UST_CTL noinst_PROGRAMS += test_ust_data_trace diff --git a/tests/tools/streaming/run-kernel b/tests/tools/streaming/run-kernel index 8674edfac..73a99de15 100755 --- a/tests/tools/streaming/run-kernel +++ b/tests/tools/streaming/run-kernel @@ -21,6 +21,8 @@ EVENT_NAME="sched_switch" PID_RELAYD=0 SESSION_NAME="" +TRACE_PATH=$(mktemp -d) + source $TESTDIR/utils.sh echo -e "\n---------------------------" @@ -81,6 +83,8 @@ function test_kernel_before_start () destroy_lttng_session $SESSION_NAME } +# Deactivated since this feature is not yet available where we can enable +# an event AFTERE tracing has started. function test_kernel_after_start () { echo -e "\n=== Testing kernel streaming with event enable AFTER start\n" @@ -95,9 +99,9 @@ function test_kernel_after_start () } start_sessiond -lttng_start_relayd +lttng_start_relayd "-o $TRACE_PATH" -tests=( test_kernel_before_start test_kernel_after_start ) +tests=( test_kernel_before_start ) for fct_test in ${tests[@]}; do @@ -105,11 +109,10 @@ do ${fct_test} # Validate test - validate_trace $EVENT_NAME ~/lttng-traces/$HOSTNAME/$SESSION_NAME* + validate_trace $EVENT_NAME $TRACE_PATH/$HOSTNAME/$SESSION_NAME* if [ $? -eq 0 ]; then # Only delete if successful - rm -rf ~/lttng-traces/$HOSTNAME/$SESSION_NAME* - rm -rf ~/lttng-traces/$SESSION_NAME* + rm -rf $TRACE_PATH else break fi diff --git a/tests/tools/test_kernel_data_trace.c b/tests/tools/test_kernel_data_trace.c index 3e9dc442a..8bd62875a 100644 --- a/tests/tools/test_kernel_data_trace.c +++ b/tests/tools/test_kernel_data_trace.c @@ -78,7 +78,6 @@ static void create_one_kernel_session(void) assert(kern->channel_count == 0); assert(kern->stream_count_global == 0); assert(kern->metadata == NULL); - assert(kern->consumer_fd == -1); PRINT_OK(); /* Init list in order to avoid sefaults from cds_list_del */ diff --git a/tests/utils.sh b/tests/utils.sh index e1fd5cb91..1ca298562 100644 --- a/tests/utils.sh +++ b/tests/utils.sh @@ -97,11 +97,14 @@ function lttng_enable_kernel_event function lttng_start_relayd { - echo -e -n "Starting lttng-relayd... " + local opt="$1" + + echo -e -n "Starting lttng-relayd (opt: $opt)... " + DIR=$(readlink -f $TESTDIR) if [ -z $(pidof lt-$RELAYD_BIN) ]; then - $DIR/../src/bin/lttng-relayd/$RELAYD_BIN >/dev/null 2>&1 & + $DIR/../src/bin/lttng-relayd/$RELAYD_BIN $opt >/dev/null 2>&1 & if [ $? -eq 1 ]; then echo -e "\e[1;31mFAILED\e[0m" return 1