#include <inttypes.h>
#include <signal.h>
+#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/utils.h>
#include <common/compat/poll.h>
#include <common/kernel-consumer/kernel-consumer.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
+#include <common/consumer-timer.h>
#include "consumer.h"
#include "consumer-stream.h"
(void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
}
+static void notify_health_quit_pipe(int *pipe)
+{
+ ssize_t ret;
+
+ ret = lttng_write(pipe[1], "4", 1);
+ if (ret < 1) {
+ PERROR("write consumer health quit");
+ }
+}
+
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *chan,
uint64_t key,
enum consumer_channel_action action)
{
struct consumer_channel_msg msg;
- int ret;
+ ssize_t ret;
memset(&msg, 0, sizeof(msg));
msg.action = action;
msg.chan = chan;
msg.key = key;
- do {
- ret = write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_write(ctx->consumer_channel_pipe[1], &msg, sizeof(msg));
+ if (ret < sizeof(msg)) {
+ PERROR("notify_channel_pipe write error");
+ }
}
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
enum consumer_channel_action *action)
{
struct consumer_channel_msg msg;
- int ret;
+ ssize_t ret;
- do {
- ret = read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
- } while (ret < 0 && errno == EINTR);
- if (ret > 0) {
- *action = msg.action;
- *chan = msg.chan;
- *key = msg.key;
+ ret = lttng_read(ctx->consumer_channel_pipe[0], &msg, sizeof(msg));
+ if (ret < sizeof(msg)) {
+ ret = -1;
+ goto error;
}
- return ret;
+ *action = msg.action;
+ *chan = msg.chan;
+ *key = msg.key;
+error:
+ return (int) ret;
}
/*
consumer_stream_destroy(stream, NULL);
}
+ if (channel->live_timer_enabled == 1) {
+ consumer_timer_live_stop(channel);
+ }
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
stream->metadata_flag = 1;
/* Metadata is flat out. */
strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+ /* Live rendez-vous point. */
+ pthread_cond_init(&stream->metadata_rdv, NULL);
+ pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
} else {
/* Format stream name to <channel_name>_<cpu_number> */
ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
if (ret < 0) {
goto end;
}
+
uatomic_inc(&relayd->refcount);
stream->sent_to_relayd = 1;
} else {
return ret;
}
+/*
+ * Find a relayd and send the streams sent message
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(net_seq_idx != -1ULL);
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(net_seq_idx);
+ if (relayd != NULL) {
+ /* Add stream on the relayd */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_streams_sent(&relayd->control_sock);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto end;
+ }
+ } else {
+ ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
+ net_seq_idx);
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+ DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
/*
* Find a relayd and close the stream
*/
uint64_t tracefile_size,
uint64_t tracefile_count,
uint64_t session_id_per_pid,
- unsigned int monitor)
+ unsigned int monitor,
+ unsigned int live_timer_interval)
{
struct lttng_consumer_channel *channel;
channel->uid = uid;
channel->gid = gid;
channel->relayd_id = relayd_id;
- channel->output = output;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
+ channel->live_timer_interval = live_timer_interval;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
+ switch (output) {
+ case LTTNG_EVENT_SPLICE:
+ channel->output = CONSUMER_CHANNEL_SPLICE;
+ break;
+ case LTTNG_EVENT_MMAP:
+ channel->output = CONSUMER_CHANNEL_MMAP;
+ break;
+ default:
+ assert(0);
+ free(channel);
+ channel = NULL;
+ goto end;
+ }
+
/*
* In monitor mode, the streams associated with the channel will be put in
* a special list ONLY owned by this channel. So, the refcount is set to 1
*/
void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
{
- int ret;
+ ssize_t ret;
+
consumer_quit = 1;
- do {
- ret = write(ctx->consumer_should_quit[1], "4", 1);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != 1) {
+ ret = lttng_write(ctx->consumer_should_quit[1], "4", 1);
+ if (ret < 1) {
PERROR("write consumer quit");
}
struct lttng_consumer_stream *stream,
struct consumer_relayd_sock_pair *relayd, unsigned long padding)
{
- int ret;
+ ssize_t ret;
struct lttcomm_relayd_metadata_payload hdr;
hdr.stream_id = htobe64(stream->relayd_stream_id);
hdr.padding_size = htobe32(padding);
- do {
- ret = write(fd, (void *) &hdr, sizeof(hdr));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != sizeof(hdr)) {
+ ret = lttng_write(fd, (void *) &hdr, sizeof(hdr));
+ if (ret < sizeof(hdr)) {
/*
* This error means that the fd's end is closed so ignore the perror
* not to clubber the error output since this can happen in a normal
stream->relayd_stream_id, padding);
end:
- return ret;
+ return (int) ret;
}
/*
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
unsigned long padding,
- struct lttng_packet_index *index)
+ struct ctf_packet_index *index)
{
unsigned long mmap_offset;
void *mmap_base;
}
while (len > 0) {
- do {
- ret = write(outfd, mmap_base + mmap_offset, len);
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_write(outfd, mmap_base + mmap_offset, len);
DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
- if (ret < 0) {
+ if (ret < len) {
/*
* This is possible if the fd is closed on the other side (outfd)
* or any write problem. It can be verbose a bit for a normal
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
unsigned long padding,
- struct lttng_packet_index *index)
+ struct ctf_packet_index *index)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
*/
void *consumer_thread_metadata_poll(void *data)
{
- int ret, i, pollfd;
+ int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
struct lttng_consumer_stream *stream = NULL;
struct lttng_ht_iter iter;
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+
+ health_code_update();
+
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!metadata_ht) {
/* ENOMEM at this point. Better to bail out. */
DBG("Metadata main loop started");
while (1) {
+ health_code_update();
+
/* Only the metadata pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+ err = 0; /* All is OK */
goto end;
}
restart:
DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
DBG("Metadata event catched in thread");
if (ret < 0) {
if (errno == EINTR) {
/* From here, the event is a metadata wait fd */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
&stream, sizeof(stream));
- if (pipe_len < 0) {
- ERR("read metadata stream, ret: %zd", pipe_len);
+ if (pipe_len < sizeof(stream)) {
+ PERROR("read metadata stream");
/*
* Continue here to handle the rest of the streams.
*/
/* We just flushed the stream now read it. */
do {
+ health_code_update();
+
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
assert(stream->wait_fd == pollfd);
do {
+ health_code_update();
+
len = ctx->on_buffer_ready(stream, ctx);
/*
* We don't check the return value here since if we get
}
}
+ /* All is OK */
+ err = 0;
error:
end:
DBG("Metadata poll thread exiting");
end_poll:
destroy_stream_ht(metadata_ht);
end_ht:
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_data_poll(void *data)
{
- int num_rdy, num_hup, high_prio, ret, i;
+ int num_rdy, num_hup, high_prio, ret, i, err = -1;
struct pollfd *pollfd = NULL;
/* local view of the streams */
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+
+ health_code_update();
+
data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (data_ht == NULL) {
/* ENOMEM at this point. Better to bail out. */
}
while (1) {
+ health_code_update();
+
high_prio = 0;
num_hup = 0;
/* No FDs and consumer_quit, consumer_cleanup the thread */
if (nb_fd == 0 && consumer_quit == 1) {
+ err = 0; /* All is OK */
goto end;
}
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
+ health_poll_entry();
num_rdy = poll(pollfd, nb_fd + 1, -1);
+ health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
/*
DBG("consumer_data_pipe wake up");
pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
&new_stream, sizeof(new_stream));
- if (pipe_readlen < 0) {
- ERR("Consumer data pipe ret %zd", pipe_readlen);
+ if (pipe_readlen < sizeof(new_stream)) {
+ PERROR("Consumer data pipe");
/* Continue so we can at least handle the current stream(s). */
continue;
}
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
if (local_stream[i] == NULL) {
continue;
}
/* Take care of low priority channels. */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
if (local_stream[i] == NULL) {
continue;
}
/* Handle hangup and errors */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
if (local_stream[i] == NULL) {
continue;
}
}
}
}
+ /* All is OK */
+ err = 0;
end:
DBG("polling thread exiting");
free(pollfd);
destroy_data_stream_ht(data_ht);
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
+
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_channel_poll(void *data)
{
- int ret, i, pollfd;
+ int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
struct lttng_consumer_channel *chan = NULL;
struct lttng_ht_iter iter;
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+
+ health_code_update();
+
channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!channel_ht) {
/* ENOMEM at this point. Better to bail out. */
DBG("Channel main loop started");
while (1) {
+ health_code_update();
+
/* Only the channel pipe is set */
if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+ err = 0; /* All is OK */
goto end;
}
restart:
DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
+ health_poll_entry();
ret = lttng_poll_wait(&events, -1);
+ health_poll_exit();
DBG("Channel event catched in thread");
if (ret < 0) {
if (errno == EINTR) {
/* From here, the event is a channel wait fd */
for (i = 0; i < nb_fd; i++) {
+ health_code_update();
+
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
send_node) {
+ health_code_update();
+
cds_list_del(&stream->send_node);
lttng_ustconsumer_del_stream(stream);
uatomic_sub(&stream->chan->refcount, 1);
}
}
+ /* All is OK */
+ err = 0;
end:
lttng_poll_clean(&events);
end_poll:
destroy_channel_ht(channel_ht);
end_ht:
DBG("Channel poll thread exiting");
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_sessiond_poll(void *data)
{
- int sock = -1, client_socket, ret;
+ int sock = -1, client_socket, ret, err = -1;
/*
* structure to poll for incoming data on communication socket avoids
* making blocking sockets.
rcu_register_thread();
+ health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+
+ health_code_update();
+
DBG("Creating command socket %s", ctx->consumer_command_sock_path);
unlink(ctx->consumer_command_sock_path);
client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
consumer_sockpoll[1].events = POLLIN | POLLPRI;
while (1) {
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ health_code_update();
+
+ health_poll_entry();
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ health_poll_exit();
+ if (ret < 0) {
goto end;
}
DBG("Incoming command on sock");
* ERR() here.
*/
DBG("Communication interrupted on command socket");
+ err = 0;
goto end;
}
if (consumer_quit) {
DBG("consumer_thread_receive_fds received quit from signal");
+ err = 0; /* All is OK */
goto end;
}
DBG("received command on sock");
}
+ /* All is OK */
+ err = 0;
+
end:
DBG("Consumer thread sessiond poll exiting");
notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
+ notify_health_quit_pipe(health_quit_pipe);
+
/* Cleaning up possibly open sockets. */
if (sock >= 0) {
ret = close(sock);
}
}
+ if (err) {
+ health_error();
+ ERR("Health error occurred in %s", __func__);
+ }
+ health_unregister(health_consumerd);
+
rcu_unregister_thread();
return NULL;
}
ssize_t ret;
pthread_mutex_lock(&stream->lock);
+ if (stream->metadata_flag) {
+ pthread_mutex_lock(&stream->metadata_rdv_lock);
+ }
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
}
+ if (stream->metadata_flag) {
+ pthread_cond_broadcast(&stream->metadata_rdv);
+ pthread_mutex_unlock(&stream->metadata_rdv_lock);
+ }
pthread_mutex_unlock(&stream->lock);
return ret;
}
int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
+ struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
+ uint64_t relayd_session_id)
{
int fd = -1, ret = -1, relayd_created = 0;
- enum lttng_error_code ret_code = LTTNG_OK;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct consumer_relayd_sock_pair *relayd = NULL;
assert(ctx);
}
/* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, LTTNG_OK);
+ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
relayd->control_sock.major = relayd_sock->major;
relayd->control_sock.minor = relayd_sock->minor;
- /*
- * Create a session on the relayd and store the returned id. Lock the
- * control socket mutex if the relayd was NOT created before.
- */
- if (!relayd_created) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- }
- ret = relayd_create_session(&relayd->control_sock,
- &relayd->relayd_session_id);
- if (!relayd_created) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- }
- if (ret < 0) {
- /*
- * Close all sockets of a relayd object. It will be freed if it was
- * created at the error code path or else it will be garbage
- * collect.
- */
- (void) relayd_close(&relayd->control_sock);
- (void) relayd_close(&relayd->data_sock);
- ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
- goto error;
- }
+ relayd->relayd_session_id = relayd_session_id;
break;
case LTTNG_STREAM_DATA:
assert(sock >= 0);
if (!channel) {
- msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+ msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
} else {
- msg.ret_code = LTTNG_OK;
+ msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
msg.key = channel->key;
msg.stream_count = channel->streams.count;
}