X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=5e2f7692b1e6e7c35ffbc574fa45f974c4e9f117;hp=9cafe398e1493be918896b8bc803c14e0f98e242;hb=fb3a43a9284f3300e9b66edc2f2c2d2767895423;hpb=a6ba4fe1a8217fd5cb9e286b4d88a9252c0d5d06 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 9cafe398e..5e2f7692b 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -92,6 +92,11 @@ int lttng_ustconsumer_get_produced_snapshot( return ret; } +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -102,7 +107,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret != sizeof(msg)) { DBG("Consumer received unexpected message size %zd (expects %zu)", ret, sizeof(msg)); - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { @@ -135,7 +140,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); rcu_read_unlock(); return ret; } @@ -147,7 +152,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.mmap_len, msg.u.channel.max_sb_size); if (new_channel == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } if (ctx->on_recv_channel != NULL) { @@ -178,7 +183,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); rcu_read_unlock(); return ret; } @@ -200,7 +205,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.net_index, msg.u.stream.metadata_flag); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } @@ -223,14 +228,29 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - if (ctx->on_recv_stream != NULL) { - ret = ctx->on_recv_stream(new_stream); - if (ret == 0) { - consumer_add_stream(new_stream); - } else if (ret < 0) { - goto end_nosignal; + /* Send stream to the metadata thread */ + if (new_stream->metadata_flag) { + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + goto end_nosignal; + } + } + + do { + ret = write(ctx->consumer_metadata_pipe[1], new_stream, + sizeof(struct lttng_consumer_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata pipe"); } } else { + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + goto end_nosignal; + } + } consumer_add_stream(new_stream); } @@ -305,9 +325,13 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = write(ctx->consumer_poll_pipe[1], "", 1); } while (ret < 0 && errno == EINTR); end_nosignal: - /* XXX: At some point we might want to return something else than zero */ rcu_read_unlock(); - return 0; + + /* + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. + */ + return 1; } int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan)