X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=11701aebfb9869b7e6dbac8deb26828bd3d8cebe;hb=d3e6246fe67c55cd5ad8616aeeb2391fd5841b7b;hp=4a7ba04516e442aae6b824d61923bffd1f652e1c;hpb=a6ba4fe1a8217fd5cb9e286b4d88a9252c0d5d06;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 4a7ba0451..11701aebf 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -92,7 +92,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { @@ -120,7 +120,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.mmap_len, msg.u.channel.max_sb_size); if (new_channel == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } if (ctx->on_recv_channel != NULL) { @@ -150,7 +150,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); rcu_read_unlock(); return ret; } @@ -167,10 +167,19 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.net_index, msg.u.stream.metadata_flag); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } + /* + * The buffer flush is done on the session daemon side for the kernel + * so no need for the stream "hangup_flush_done" variable to be + * tracked. This is important for a kernel stream since we don't rely + * on the flush state of the stream to read data. It's not the case for + * user space tracing. + */ + new_stream->hangup_flush_done = 0; + /* The stream is not metadata. Get relayd reference if exists. */ relayd = consumer_find_relayd(msg.u.stream.net_index); if (relayd != NULL) { @@ -190,14 +199,29 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - if (ctx->on_recv_stream != NULL) { - ret = ctx->on_recv_stream(new_stream); - if (ret == 0) { - consumer_add_stream(new_stream); - } else if (ret < 0) { - goto end_nosignal; + /* Send stream to the metadata thread */ + if (new_stream->metadata_flag) { + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + goto end_nosignal; + } + } + + do { + ret = write(ctx->consumer_metadata_pipe[1], new_stream, + sizeof(struct lttng_consumer_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata pipe"); } } else { + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + goto end_nosignal; + } + } consumer_add_stream(new_stream); } @@ -257,7 +281,12 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } while (ret < 0 && errno == EINTR); end_nosignal: rcu_read_unlock(); - return 0; + + /* + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. + */ + return 1; } /*