X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=1a786352d75adbfaedf93783ab4d9b91f559c0d2;hp=65dcce40b34f2b07e892c7f7a1298b9709bbd180;hb=b35308203f7844f48542978fd00aab21f2057c17;hpb=32af2c95c9494c282804964aed17bb2d57887505 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 65dcce40b..1a786352d 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1,6 +1,7 @@ /* * Copyright (C) 2011 - Julien Desfossez * Mathieu Desnoyers + * Copyright (C) 2017 - Jérémie Galarneau * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License, version 2 only, @@ -1012,6 +1013,55 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } + case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE: + { + int channel_monitor_pipe; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Successfully received the command's type. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + + ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, + 1); + if (ret != sizeof(channel_monitor_pipe)) { + ERR("Failed to receive channel monitor pipe"); + goto error_fatal; + } + + DBG("Received channel monitor pipe (%d)", channel_monitor_pipe); + ret = consumer_timer_thread_set_channel_monitor_pipe( + channel_monitor_pipe); + if (!ret) { + int flags; + + ret_code = LTTCOMM_CONSUMERD_SUCCESS; + /* Set the pipe as non-blocking. */ + ret = fcntl(channel_monitor_pipe, F_GETFL, 0); + if (ret == -1) { + PERROR("fcntl get flags of the channel monitoring pipe"); + goto error_fatal; + } + flags = ret; + + ret = fcntl(channel_monitor_pipe, F_SETFL, + flags | O_NONBLOCK); + if (ret == -1) { + PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe"); + goto error_fatal; + } + DBG("Channel monitor pipe set as non-blocking"); + } else { + ret_code = LTTCOMM_CONSUMERD_ALREADY_SET; + } + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + goto error_fatal; + } + break; + } default: goto end_nosignal; } @@ -1085,15 +1135,27 @@ static int get_index_values(struct ctf_packet_index *index, int infd) ret = kernctl_get_instance_id(infd, &index->stream_instance_id); if (ret < 0) { - PERROR("kernctl_get_instance_id"); - goto error; + if (ret == -ENOTTY) { + /* Command not implemented by lttng-modules. */ + index->stream_instance_id = -1ULL; + ret = 0; + } else { + PERROR("kernctl_get_instance_id"); + goto error; + } } index->stream_instance_id = htobe64(index->stream_instance_id); ret = kernctl_get_sequence_number(infd, &index->packet_seq_num); if (ret < 0) { - PERROR("kernctl_get_sequence_number"); - goto error; + if (ret == -ENOTTY) { + /* Command not implemented by lttng-modules. */ + index->packet_seq_num = -1ULL; + ret = 0; + } else { + PERROR("kernctl_get_sequence_number"); + goto error; + } } index->packet_seq_num = htobe64(index->packet_seq_num); @@ -1145,8 +1207,14 @@ int update_stream_stats(struct lttng_consumer_stream *stream) ret = kernctl_get_sequence_number(stream->wait_fd, &seq); if (ret < 0) { - PERROR("kernctl_get_sequence_number"); - goto end; + if (ret == -ENOTTY) { + /* Command not implemented by lttng-modules. */ + seq = -1ULL; + ret = 0; + } else { + PERROR("kernctl_get_sequence_number"); + goto end; + } } /* @@ -1205,6 +1273,14 @@ int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream ret = kernctl_get_metadata_version(infd, &cur_version); if (ret < 0) { + if (ret == -ENOTTY) { + /* + * LTTng-modules does not implement this + * command. + */ + ret = 0; + goto end; + } ERR("Failed to get the metadata version"); goto end; } @@ -1289,12 +1365,34 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } ret = update_stream_stats(stream); if (ret < 0) { + err = kernctl_put_subbuf(infd); + if (err != 0) { + if (err == -EFAULT) { + PERROR("Error in unreserving sub buffer\n"); + } else if (err == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + ret = err; + goto end; + } goto end; } } else { write_index = 0; ret = metadata_stream_check_version(infd, stream); if (ret < 0) { + err = kernctl_put_subbuf(infd); + if (err != 0) { + if (err == -EFAULT) { + PERROR("Error in unreserving sub buffer\n"); + } else if (err == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + ret = err; + goto end; + } goto end; } } @@ -1449,14 +1547,18 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) stream->tracefile_size_current = 0; if (!stream->metadata_flag) { - ret = index_create_file(stream->chan->pathname, + struct lttng_index_file *index_file; + + index_file = lttng_index_file_create(stream->chan->pathname, stream->name, stream->uid, stream->gid, stream->chan->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!index_file) { goto error; } - stream->index_fd = ret; + assert(!stream->index_file); + stream->index_file = index_file; } }