X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=c762934ff92a55569b9323852589157e3e4ceb81;hb=50f8ae690312d8f824fb9c9875b0a07f4a2547b6;hp=f910f033d9ea96e283f3ee5705e6cf9d599efef8;hpb=91dfef6e2f50460b9597fc8a8186949c48557c14;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index f910f033d..c762934ff 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -118,7 +118,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_channel = consumer_allocate_channel(msg.u.channel.channel_key, -1, -1, msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.max_sb_size, + msg.u.channel.nb_init_streams); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -137,9 +138,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_ADD_STREAM: { - int fd; + int fd, stream_pipe; struct consumer_relayd_sock_pair *relayd = NULL; struct lttng_consumer_stream *new_stream; + int alloc_ret = 0; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { @@ -165,9 +167,23 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.uid, msg.u.stream.gid, msg.u.stream.net_index, - msg.u.stream.metadata_flag); + msg.u.stream.metadata_flag, + &alloc_ret); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + switch (alloc_ret) { + case -ENOMEM: + case -EINVAL: + default: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + break; + case -ENOENT: + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + DBG3("Could not find channel"); + break; + } goto end_nosignal; } @@ -190,42 +206,44 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + consumer_del_stream(new_stream, NULL); goto end_nosignal; } } else if (msg.u.stream.net_index != -1) { ERR("Network sequence index %d unknown. Not adding stream.", msg.u.stream.net_index); - free(new_stream); + consumer_del_stream(new_stream, NULL); goto end_nosignal; } - /* Send stream to the metadata thread */ - if (new_stream->metadata_flag) { - if (ctx->on_recv_stream) { - ret = ctx->on_recv_stream(new_stream); - if (ret < 0) { - goto end_nosignal; - } - } - - do { - ret = write(ctx->consumer_metadata_pipe[1], new_stream, - sizeof(struct lttng_consumer_stream)); - } while (ret < 0 && errno == EINTR); + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); if (ret < 0) { - PERROR("write metadata pipe"); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } + } + + /* Get the right pipe where the stream will be sent. */ + if (new_stream->metadata_flag) { + stream_pipe = ctx->consumer_metadata_pipe[1]; } else { - if (ctx->on_recv_stream) { - ret = ctx->on_recv_stream(new_stream); - if (ret < 0) { - goto end_nosignal; - } - } - consumer_add_stream(new_stream); + stream_pipe = ctx->consumer_data_pipe[1]; + } + + do { + ret = write(stream_pipe, &new_stream, sizeof(new_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("Consumer write %s stream to pipe %d", + new_stream->metadata_flag ? "metadata" : "data", + stream_pipe); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } - DBG("Kernel consumer_add_stream (%d)", fd); + DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64, + msg.u.stream.path_name, fd, new_stream->relayd_stream_id); break; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -265,20 +283,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - /* - * Wake-up the other end by writing a null byte in the pipe (non-blocking). - * Important note: Because writing into the pipe is non-blocking (and - * therefore we allow dropping wakeup data, as long as there is wakeup data - * present in the pipe buffer to wake up the other end), the other end - * should perform the following sequence for waiting: - * - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). - */ - do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); - } while (ret < 0 && errno == EINTR); end_nosignal: rcu_read_unlock();