break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ {
+ /*
+ * Special case for the metadata since the wait fd is an internal pipe
+ * polled in the metadata thread.
+ */
+ if (stream->metadata_flag && stream->chan->monitor) {
+ int rpipe = stream->ust_metadata_poll_pipe[0];
+
+ /*
+ * This will stop the channel timer if one and close the write side
+ * of the metadata poll pipe.
+ */
+ lttng_ustconsumer_close_metadata(stream->chan);
+ if (rpipe >= 0) {
+ ret = close(rpipe);
+ if (ret < 0) {
+ PERROR("closing metadata pipe read side");
+ }
+ stream->ust_metadata_poll_pipe[0] = -1;
+ }
+ }
break;
+ }
default:
ERR("Unknown consumer_data type");
assert(0);
rcu_read_unlock();
- /* Decrement the stream count of the global consumer data. */
- assert(consumer_data.stream_count > 0);
- consumer_data.stream_count--;
+ if (!stream->metadata_flag) {
+ /* Decrement the stream count of the global consumer data. */
+ assert(consumer_data.stream_count > 0);
+ consumer_data.stream_count--;
+ }
}
/*
* Return 0 on success or else a negative value.
*/
int consumer_stream_write_index(struct lttng_consumer_stream *stream,
- struct lttng_packet_index *index)
+ struct ctf_packet_index *index)
{
int ret;
struct consumer_relayd_sock_pair *relayd;
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_send_index(&relayd->control_sock, index,
stream->relayd_stream_id, stream->next_net_seq_num - 1);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
} else {
- ret = index_write(stream->index_fd, index,
- sizeof(struct lttng_packet_index));
+ ssize_t size_ret;
+
+ size_ret = index_write(stream->index_fd, index,
+ sizeof(struct ctf_packet_index));
+ if (size_ret < sizeof(struct ctf_packet_index)) {
+ ret = -1;
+ } else {
+ ret = 0;
+ }
}
if (ret < 0) {
goto error;