X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=718887971abb7b280565613b0b08643e8232304b;hb=58b1f4255ea457f2965f31b84205cb0eec21e71f;hp=855d07141b6ebe70424ed6ee649babb494e984a4;hpb=f73fabfda365d22e7dd180fb1614e37c446fbd9e;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 855d07141..718887971 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -92,6 +92,11 @@ int lttng_ustconsumer_get_produced_snapshot( return ret; } +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -145,7 +150,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_channel = consumer_allocate_channel(msg.u.channel.channel_key, fds[0], -1, msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.max_sb_size, + msg.u.channel.nb_init_streams); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -168,6 +174,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int fds[2]; size_t nb_fd = 2; struct consumer_relayd_sock_pair *relayd = NULL; + int alloc_ret = 0; DBG("UST Consumer adding stream"); @@ -183,9 +190,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } - DBG("consumer_add_stream chan %d stream %d", - msg.u.stream.channel_key, - msg.u.stream.stream_key); + DBG("Consumer command ADD_STREAM chan %d stream %d", + msg.u.stream.channel_key, msg.u.stream.stream_key); assert(msg.u.stream.output == LTTNG_EVENT_MMAP); new_stream = consumer_allocate_stream(msg.u.stream.channel_key, @@ -198,9 +204,23 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.uid, msg.u.stream.gid, msg.u.stream.net_index, - msg.u.stream.metadata_flag); + msg.u.stream.metadata_flag, + &alloc_ret); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + switch (alloc_ret) { + case -ENOMEM: + case -EINVAL: + default: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + break; + case -ENOENT: + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + DBG3("Could not find channel"); + break; + } goto end_nosignal; } @@ -214,24 +234,46 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, &new_stream->relayd_stream_id); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + consumer_del_stream(new_stream, NULL); goto end_nosignal; } } else if (msg.u.stream.net_index != -1) { ERR("Network sequence index %d unknown. Not adding stream.", msg.u.stream.net_index); - free(new_stream); + consumer_del_stream(new_stream, NULL); goto end_nosignal; } - if (ctx->on_recv_stream != NULL) { + /* Do actions once stream has been received. */ + if (ctx->on_recv_stream) { ret = ctx->on_recv_stream(new_stream); - if (ret == 0) { - consumer_add_stream(new_stream); - } else if (ret < 0) { + if (ret < 0) { + consumer_del_stream(new_stream, NULL); + goto end_nosignal; + } + } + + /* Send stream to the metadata thread */ + if (new_stream->metadata_flag) { + do { + ret = write(ctx->consumer_metadata_pipe[1], &new_stream, + sizeof(new_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata pipe"); + consumer_del_metadata_stream(new_stream, NULL); goto end_nosignal; } } else { - consumer_add_stream(new_stream); + do { + ret = write(ctx->consumer_poll_pipe[1], &new_stream, + sizeof(new_stream)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write data pipe"); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; + } } DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64, @@ -290,24 +332,14 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } - /* - * Wake-up the other end by writing a null byte in the pipe (non-blocking). - * Important note: Because writing into the pipe is non-blocking (and - * therefore we allow dropping wakeup data, as long as there is wakeup data - * present in the pipe buffer to wake up the other end), the other end - * should perform the following sequence for waiting: - * - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). - */ - do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); - } while (ret < 0 && errno == EINTR); end_nosignal: - /* XXX: At some point we might want to return something else than zero */ rcu_read_unlock(); - return 0; + + /* + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. + */ + return 1; } int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) @@ -339,7 +371,7 @@ void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) ustctl_unmap_channel(chan->handle); } -int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) +int lttng_ustconsumer_add_stream(struct lttng_consumer_stream *stream) { struct lttng_ust_object_data obj; int ret; @@ -349,21 +381,35 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) obj.wait_fd = stream->wait_fd; obj.memory_map_size = stream->mmap_len; ret = ustctl_add_stream(stream->chan->handle, &obj); - if (ret) - return ret; + if (ret) { + ERR("UST ctl add_stream failed with ret %d", ret); + goto error; + } + stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu); - if (!stream->buf) - return -EBUSY; + if (!stream->buf) { + ERR("UST ctl open_stream_read failed"); + ret = -EBUSY; + goto error; + } + /* ustctl_open_stream_read has closed the shm fd. */ stream->wait_fd_is_copy = 1; stream->shm_fd = -1; stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf); if (!stream->mmap_base) { - return -EINVAL; + ERR("UST ctl get_mmap_base failed"); + ret = -EINVAL; + goto mmap_error; } return 0; + +mmap_error: + ustctl_close_stream_read(stream->chan->handle, stream->buf); +error: + return ret; } void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) @@ -375,7 +421,7 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { - unsigned long len; + unsigned long len, subbuf_size, padding; int err; long ret = 0; struct lttng_ust_shm_handle *handle; @@ -402,7 +448,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = ustctl_get_next_subbuf(handle, buf); if (err != 0) { - ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ + ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -414,17 +460,33 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } assert(stream->output == LTTNG_EVENT_MMAP); - /* read the used subbuffer size */ + /* Get the full padded subbuffer size */ err = ustctl_get_padded_subbuf_size(handle, buf, &len); assert(err == 0); + + /* Get subbuffer data size (without padding) */ + err = ustctl_get_subbuf_size(handle, buf, &subbuf_size); + assert(err == 0); + + /* Make sure we don't get a subbuffer size bigger than the padded */ + assert(len >= subbuf_size); + + padding = len - subbuf_size; /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret != len) { + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding); + /* + * The mmap operation should write subbuf_size amount of data when network + * streaming or the full padding (len) size when we are _not_ streaming. + */ + if ((ret != subbuf_size && stream->net_seq_idx != -1) || + (ret != len && stream->net_seq_idx == -1)) { /* - * display the error but continue processing to try - * to release the subbuffer + * Display the error but continue processing to try to release the + * subbuffer */ - ERR("Error writing to tracefile (expected: %ld, got: %ld)", ret, len); + ERR("Error writing to tracefile " + "(ret: %zd != len: %lu != subbuf_size: %lu)", + ret, len, subbuf_size); } err = ustctl_put_next_subbuf(handle, buf); assert(err == 0); @@ -450,6 +512,13 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) stream->out_fd = ret; } + ret = lttng_ustconsumer_add_stream(stream); + if (ret) { + consumer_del_stream(stream, NULL); + ret = -1; + goto error; + } + /* we return 0 to let the library handle the FD internally */ return 0;