X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=553d44269454058233ee9c048d4ed3d46a071609;hb=ab1f27f2803120be93a7c148022d5c9c412e2b7b;hp=0e24f8c8c73053f58c85439f788049567938b599;hpb=7775df526447e504735983ee359dcb902fa6dd1e;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 0e24f8c8c..553d44269 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -121,26 +121,6 @@ error: return ret; } -/* - * Allocate and return a consumer channel object. - */ -static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, - const uint64_t *chunk_id, const char *pathname, const char *name, - uint64_t relayd_id, uint64_t key, enum lttng_event_output output, - uint64_t tracefile_size, uint64_t tracefile_count, - uint64_t session_id_per_pid, unsigned int monitor, - unsigned int live_timer_interval, - const char *root_shm_path, const char *shm_path) -{ - assert(pathname); - assert(name); - - return consumer_allocate_channel(key, session_id, chunk_id, pathname, - name, relayd_id, output, tracefile_size, - tracefile_count, session_id_per_pid, monitor, - live_timer_interval, root_shm_path, shm_path); -} - /* * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the * error value if applicable is set in it else it is kept untouched. @@ -157,7 +137,9 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, assert(channel); assert(ctx); - stream = consumer_allocate_stream(channel->key, + stream = consumer_allocate_stream( + channel, + channel->key, key, channel->name, channel->relayd_id, @@ -186,7 +168,6 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, } consumer_stream_update_channel_attributes(stream, channel); - stream->chan = channel; error: if (_alloc_ret) { @@ -1221,6 +1202,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, ssize_t read_len; unsigned long len, padded_len; const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; health_code_update(); @@ -1255,9 +1237,11 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, goto error_put_subbuf; } + subbuf_view = lttng_buffer_view_init( + subbuf_addr, 0, padded_len); read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, - stream, subbuf_addr, len, - padded_len - len, NULL); + stream, &subbuf_view, padded_len - len, + NULL); if (use_relayd) { if (read_len != len) { ret = -EPERM; @@ -1486,19 +1470,21 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, }; /* Create a plain object and reserve a channel key. */ - channel = allocate_channel(msg.u.ask_channel.session_id, + channel = consumer_allocate_channel( + msg.u.ask_channel.key, + msg.u.ask_channel.session_id, msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL, msg.u.ask_channel.pathname, msg.u.ask_channel.name, msg.u.ask_channel.relayd_id, - msg.u.ask_channel.key, (enum lttng_event_output) msg.u.ask_channel.output, msg.u.ask_channel.tracefile_size, msg.u.ask_channel.tracefile_count, msg.u.ask_channel.session_id_per_pid, msg.u.ask_channel.monitor, msg.u.ask_channel.live_timer_interval, + msg.u.ask_channel.is_live, msg.u.ask_channel.root_shm_path, msg.u.ask_channel.shm_path); if (!channel) { @@ -2786,11 +2772,12 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 1, rotation_ret; + int err, write_index = 1; long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; const char *subbuf_addr; + struct lttng_buffer_view subbuf_view; assert(stream); assert(stream->ustream); @@ -2823,20 +2810,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } } - /* - * If the stream was flagged to be ready for rotation before we extract the - * next packet, rotate it now. - */ - if (stream->rotate_ready) { - DBG("Rotate stream before extracting data"); - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } - retry: /* Get the next subbuffer */ err = ustctl_get_next_subbuf(ustream); @@ -2906,9 +2879,11 @@ retry: goto error_put_subbuf; } + subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len); + /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap( - ctx, stream, subbuf_addr, subbuf_size, padding, &index); + ctx, stream, &subbuf_view, padding, &index); /* * The mmap operation should write subbuf_size amount of data when * network streaming or the full padding (len) size when we are _not_ @@ -2946,7 +2921,7 @@ error_put_subbuf: /* Write index if needed. */ if (!write_index) { - goto rotate; + goto end; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -2981,24 +2956,7 @@ error_put_subbuf: goto error; } -rotate: - /* - * After extracting the packet, we check if the stream is now ready to be - * rotated and perform the action immediately. - */ - rotation_ret = lttng_consumer_stream_is_rotate_ready(stream); - if (rotation_ret == 1) { - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - ret = -1; - goto error; - } - } else if (rotation_ret < 0) { - ERR("Checking if stream is ready to rotate"); - ret = -1; - goto error; - } +end: error: return ret; }