X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=76238a087976ff5ca341040ecbafecfcb3d0b0a0;hp=5e2f7692b1e6e7c35ffbc574fa45f974c4e9f117;hb=7a57cf92a463031c8cb668900f9215e3773a15c3;hpb=fb3a43a9284f3300e9b66edc2f2c2d2767895423 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 5e2f7692b..76238a087 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -150,7 +150,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_channel = consumer_allocate_channel(msg.u.channel.channel_key, fds[0], -1, msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.max_sb_size, + msg.u.channel.nb_init_streams); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -173,6 +174,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int fds[2]; size_t nb_fd = 2; struct consumer_relayd_sock_pair *relayd = NULL; + int alloc_ret = 0; DBG("UST Consumer adding stream"); @@ -188,9 +190,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } - DBG("consumer_add_stream chan %d stream %d", - msg.u.stream.channel_key, - msg.u.stream.stream_key); + DBG("Consumer command ADD_STREAM chan %d stream %d", + msg.u.stream.channel_key, msg.u.stream.stream_key); assert(msg.u.stream.output == LTTNG_EVENT_MMAP); new_stream = consumer_allocate_stream(msg.u.stream.channel_key, @@ -203,9 +204,23 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.uid, msg.u.stream.gid, msg.u.stream.net_index, - msg.u.stream.metadata_flag); + msg.u.stream.metadata_flag, + &alloc_ret); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + switch (alloc_ret) { + case -ENOMEM: + case -EINVAL: + default: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + break; + case -ENOENT: + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + DBG3("Could not find channel"); + break; + } goto end_nosignal; } @@ -228,29 +243,24 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - /* Send stream to the metadata thread */ - if (new_stream->metadata_flag) { - if (ctx->on_recv_stream) { - ret = ctx->on_recv_stream(new_stream); - if (ret < 0) { - goto end_nosignal; - } + /* Do actions once stream has been received. */ + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); + if (ret < 0) { + goto end_nosignal; } + } + /* Send stream to the metadata thread */ + if (new_stream->metadata_flag) { do { - ret = write(ctx->consumer_metadata_pipe[1], new_stream, - sizeof(struct lttng_consumer_stream)); + ret = write(ctx->consumer_metadata_pipe[1], &new_stream, + sizeof(new_stream)); } while (ret < 0 && errno == EINTR); if (ret < 0) { PERROR("write metadata pipe"); } } else { - if (ctx->on_recv_stream) { - ret = ctx->on_recv_stream(new_stream); - if (ret < 0) { - goto end_nosignal; - } - } consumer_add_stream(new_stream); } @@ -373,17 +383,24 @@ int lttng_ustconsumer_allocate_stream(struct lttng_consumer_stream *stream) obj.wait_fd = stream->wait_fd; obj.memory_map_size = stream->mmap_len; ret = ustctl_add_stream(stream->chan->handle, &obj); - if (ret) + if (ret) { + ERR("UST ctl add_stream failed with ret %d", ret); return ret; + } + stream->buf = ustctl_open_stream_read(stream->chan->handle, stream->cpu); - if (!stream->buf) + if (!stream->buf) { + ERR("UST ctl open_stream_read failed"); return -EBUSY; + } + /* ustctl_open_stream_read has closed the shm fd. */ stream->wait_fd_is_copy = 1; stream->shm_fd = -1; stream->mmap_base = ustctl_get_mmap_base(stream->chan->handle, stream->buf); if (!stream->mmap_base) { + ERR("UST ctl get_mmap_base failed"); return -EINVAL; } @@ -399,7 +416,7 @@ void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream) int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { - unsigned long len; + unsigned long len, subbuf_size, padding; int err; long ret = 0; struct lttng_ust_shm_handle *handle; @@ -426,7 +443,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = ustctl_get_next_subbuf(handle, buf); if (err != 0) { - ret = -ret; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ + ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */ /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -438,17 +455,33 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } assert(stream->output == LTTNG_EVENT_MMAP); - /* read the used subbuffer size */ + /* Get the full padded subbuffer size */ err = ustctl_get_padded_subbuf_size(handle, buf, &len); assert(err == 0); + + /* Get subbuffer data size (without padding) */ + err = ustctl_get_subbuf_size(handle, buf, &subbuf_size); + assert(err == 0); + + /* Make sure we don't get a subbuffer size bigger than the padded */ + assert(len >= subbuf_size); + + padding = len - subbuf_size; /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret != len) { + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding); + /* + * The mmap operation should write subbuf_size amount of data when network + * streaming or the full padding (len) size when we are _not_ streaming. + */ + if ((ret != subbuf_size && stream->net_seq_idx != -1) || + (ret != len && stream->net_seq_idx == -1)) { /* - * display the error but continue processing to try - * to release the subbuffer + * Display the error but continue processing to try to release the + * subbuffer */ - ERR("Error writing to tracefile (expected: %ld, got: %ld)", ret, len); + ERR("Error writing to tracefile " + "(ret: %zd != len: %lu != subbuf_size: %lu)", + ret, len, subbuf_size); } err = ustctl_put_next_subbuf(handle, buf); assert(err == 0);