X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=a85dfe0b047089be39cdd0a1d86c99258814f66f;hp=8c2bee33363614cbf58a4e5082aad84d9b8140ad;hb=77c7c900d190f7fb4f99a456c767f069da7e72b8;hpb=f02e1e8a5820da2eda835add020f92ca8d32c973 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 8c2bee333..a85dfe0b0 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -123,6 +124,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Poll on consumer socket. */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } @@ -214,6 +216,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } @@ -221,6 +224,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } @@ -237,7 +241,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end; + goto end_nosignal; } /* The stream is not metadata. Get relayd reference if exists. */ @@ -250,13 +254,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - goto end; + goto end_nosignal; } } else if (msg.u.stream.net_index != -1) { ERR("Network sequence index %d unknown. Not adding stream.", msg.u.stream.net_index); free(new_stream); - goto end; + goto end_nosignal; } if (ctx->on_recv_stream != NULL) { @@ -264,7 +268,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret == 0) { consumer_add_stream(new_stream); } else if (ret < 0) { - goto end; + goto end_nosignal; } } else { consumer_add_stream(new_stream); @@ -275,30 +279,44 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_UPDATE_STREAM: { - if (ctx->on_update_stream != NULL) { - ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); - if (ret == 0) { - consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); - } else if (ret < 0) { - goto end; - } - } else { - consumer_change_stream_state(msg.u.stream.stream_key, - msg.u.stream.state); + rcu_read_unlock(); + return -ENOSYS; + } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + struct consumer_relayd_sock_pair *relayd; + + DBG("Kernel consumer destroying relayd %" PRIu64, + msg.u.destroy_relayd.net_seq_idx); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx); + if (relayd == NULL) { + ERR("Unable to find relayd %" PRIu64, + msg.u.destroy_relayd.net_seq_idx); + goto end_nosignal; } - break; + + /* Set destroy flag for this object */ + uatomic_set(&relayd->destroy_flag, 1); + + /* Destroy the relayd if refcount is 0 else set the destroy flag. */ + if (uatomic_read(&relayd->refcount) == 0) { + consumer_destroy_relayd(relayd); + } + goto end_nosignal; } default: - break; + goto end_nosignal; } -end: + /* - * Wake-up the other end by writing a null byte in the pipe - * (non-blocking). Important note: Because writing into the - * pipe is non-blocking (and therefore we allow dropping wakeup - * data, as long as there is wakeup data present in the pipe - * buffer to wake up the other end), the other end should - * perform the following sequence for waiting: + * Wake-up the other end by writing a null byte in the pipe (non-blocking). + * Important note: Because writing into the pipe is non-blocking (and + * therefore we allow dropping wakeup data, as long as there is wakeup data + * present in the pipe buffer to wake up the other end), the other end + * should perform the following sequence for waiting: + * * 1) empty the pipe (reads). * 2) perform update operation. * 3) wait on the pipe (poll). @@ -354,7 +372,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, * display the error but continue processing to try * to release the subbuffer */ - ERR("Error splicing to tracefile (ret: %ld != len: %ld)", + ERR("Error splicing to tracefile (ret: %zd != len: %lu)", ret, len); }