*/
volatile int consumer_quit = 0;
+/*
+ * The following two hash tables are visible by all threads which are separated
+ * in different source files.
+ *
+ * Global hash table containing respectively metadata and data streams. The
+ * stream element in this ht should only be updated by the metadata poll thread
+ * for the metadata and the data poll thread for the data.
+ */
+struct lttng_ht *metadata_ht = NULL;
+struct lttng_ht *data_ht = NULL;
+
/*
* Find a stream. The consumer_data.lock must be locked during this
* call.
return stream;
}
-static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
+void consumer_steal_stream_key(int key, struct lttng_ht *ht)
{
struct lttng_consumer_stream *stream;
lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
lttng_ht_node_init_ulong(&stream->node, stream->key);
+ /*
+ * The cpu number is needed before using any ustctl_* actions. Ignored for
+ * the kernel so the value does not matter.
+ */
+ pthread_mutex_lock(&consumer_data.lock);
+ stream->cpu = stream->chan->cpucount++;
+ pthread_mutex_unlock(&consumer_data.lock);
+
DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
" out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
stream->shm_fd, stream->wait_fd,
/*
* Add a stream to the global list protected by a mutex.
*/
-int consumer_add_stream(struct lttng_consumer_stream *stream)
+static int consumer_add_stream(struct lttng_consumer_stream *stream,
+ struct lttng_ht *ht)
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
assert(stream);
+ assert(ht);
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
rcu_read_lock();
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- stream->cpu = stream->chan->cpucount++;
- ret = lttng_ustconsumer_add_stream(stream);
- if (ret) {
- ret = -EINVAL;
- goto error;
- }
+ /* Steal stream identifier to avoid having streams with the same key */
+ consumer_steal_stream_key(stream->key, ht);
- /* Steal stream identifier only for UST */
- consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- ret = -ENOSYS;
- goto error;
- }
-
- lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+ lttng_ht_add_unique_ulong(ht, &stream->node);
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
consumer_data.stream_count++;
consumer_data.need_update = 1;
-error:
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
*
* Returns the number of fds in the structures.
*/
-int consumer_update_poll_array(
+static int consumer_update_poll_array(
struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
- struct lttng_consumer_stream **local_stream)
+ struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
{
int i = 0;
struct lttng_ht_iter iter;
DBG("Updating poll fd array");
rcu_read_lock();
- cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
- node.node) {
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
continue;
}
}
}
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ *
+ * WARNING: *MUST* be used with data stream only.
+ */
+static void destroy_data_stream_ht(struct lttng_ht *ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ if (ht == NULL) {
+ return;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ ret = lttng_ht_del(ht, &iter);
+ assert(!ret);
+
+ call_rcu(&stream->node.head, consumer_free_stream);
+ }
+ rcu_read_unlock();
+
+ lttng_ht_destroy(ht);
+}
+
/*
* Iterate over all streams of the hashtable and free them properly.
*
goto free_stream;
}
- rcu_read_lock();
- iter.iter.node = &stream->waitfd_node.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
- rcu_read_unlock();
-
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
goto end;
}
+ rcu_read_lock();
+ iter.iter.node = &stream->waitfd_node.node;
+ ret = lttng_ht_del(ht, &iter);
+ assert(!ret);
+ rcu_read_unlock();
+
if (stream->out_fd >= 0) {
ret = close(stream->out_fd);
if (ret) {
pthread_mutex_lock(&consumer_data.lock);
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- ret = lttng_ustconsumer_add_stream(stream);
- if (ret) {
- ret = -EINVAL;
- goto error;
- }
-
- /* Steal stream identifier only for UST */
- consumer_steal_stream_key(stream->wait_fd, ht);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- ret = -ENOSYS;
- goto error;
- }
-
/*
* From here, refcounts are updated so be _careful_ when returning an error
* after this point.
uatomic_dec(&stream->chan->nb_init_streams);
}
+ /* Steal stream identifier to avoid having streams with the same key */
+ consumer_steal_stream_key(stream->key, ht);
+
lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
rcu_read_unlock();
-error:
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
* Thread polls on metadata file descriptor and write them on disk or on the
* network.
*/
-void *lttng_consumer_thread_poll_metadata(void *data)
+void *consumer_thread_metadata_poll(void *data)
{
int ret, i, pollfd;
uint32_t revents, nb_fd;
struct lttng_consumer_stream *stream = NULL;
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
- struct lttng_ht *metadata_ht = NULL;
struct lttng_poll_event events;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
DBG("Thread metadata poll started");
- metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- if (metadata_ht == NULL) {
- goto end;
- }
-
/* Size is set to 1 for the consumer_metadata pipe */
ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
if (ret < 0) {
lttng_ustconsumer_on_stream_hangup(stream);
/* We just flushed the stream now read it. */
- len = ctx->on_buffer_ready(stream, ctx);
- /* It's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
- rcu_read_unlock();
- goto end;
- }
+ do {
+ len = ctx->on_buffer_ready(stream, ctx);
+ /*
+ * We don't check the return value here since if we get
+ * a negative len, it means an error occured thus we
+ * simply remove it from the poll set and free the
+ * stream.
+ */
+ } while (len > 0);
}
lttng_poll_del(&events, stream->wait_fd);
len = ctx->on_buffer_ready(stream, ctx);
/* It's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
+ if (len < 0 && len != -EAGAIN && len != -ENODATA) {
rcu_read_unlock();
goto end;
} else if (len > 0) {
* This thread polls the fds in the set to consume the data and write
* it to tracefile if necessary.
*/
-void *lttng_consumer_thread_poll_fds(void *data)
+void *consumer_thread_data_poll(void *data)
{
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
/* local view of the streams */
- struct lttng_consumer_stream **local_stream = NULL;
+ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
- pthread_t metadata_thread;
- void *status;
rcu_register_thread();
- /* Start metadata polling thread */
- ret = pthread_create(&metadata_thread, NULL,
- lttng_consumer_thread_poll_metadata, (void *) ctx);
- if (ret < 0) {
- PERROR("pthread_create metadata thread");
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (data_ht == NULL) {
goto end;
}
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
+ ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
+ data_ht);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
size_t pipe_readlen;
- char tmp;
DBG("consumer_poll_pipe wake up");
/* Consume 1 byte of pipe data */
do {
- pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
+ pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
+ sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
+
+ /*
+ * If the stream is NULL, just ignore it. It's also possible that
+ * the sessiond poll thread changed the consumer_quit state and is
+ * waking us up to test it.
+ */
+ if (new_stream == NULL) {
+ continue;
+ }
+
+ ret = consumer_add_stream(new_stream, data_ht);
+ if (ret) {
+ ERR("Consumer add stream %d failed. Continuing",
+ new_stream->key);
+ /*
+ * At this point, if the add_stream fails, it is not in the
+ * hash table thus passing the NULL value here.
+ */
+ consumer_del_stream(new_stream, NULL);
+ }
+
+ /* Continue to update the local streams and handle prio ones */
continue;
}
high_prio = 1;
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
+ if (len < 0 && len != -EAGAIN && len != -ENODATA) {
goto end;
} else if (len > 0) {
local_stream[i]->data_read = 1;
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
- if (len < 0 && len != -EAGAIN) {
+ if (len < 0 && len != -EAGAIN && len != -ENODATA) {
goto end;
} else if (len > 0) {
local_stream[i]->data_read = 1;
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- consumer_del_stream(local_stream[i],
- consumer_data.stream_ht);
+ consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- consumer_del_stream(local_stream[i],
- consumer_data.stream_ht);
+ consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- consumer_del_stream(local_stream[i],
- consumer_data.stream_ht);
+ consumer_del_stream(local_stream[i], data_ht);
num_hup++;
}
}
/*
* Close the write side of the pipe so epoll_wait() in
- * lttng_consumer_thread_poll_metadata can catch it. The thread is
- * monitoring the read side of the pipe. If we close them both, epoll_wait
- * strangely does not return and could create a endless wait period if the
- * pipe is the only tracked fd in the poll set. The thread will take care
- * of closing the read side.
+ * consumer_thread_metadata_poll can catch it. The thread is monitoring the
+ * read side of the pipe. If we close them both, epoll_wait strangely does
+ * not return and could create a endless wait period if the pipe is the
+ * only tracked fd in the poll set. The thread will take care of closing
+ * the read side.
*/
close(ctx->consumer_metadata_pipe[1]);
- if (ret) {
- ret = pthread_join(metadata_thread, &status);
- if (ret < 0) {
- PERROR("pthread_join metadata thread");
- }
+
+ if (data_ht) {
+ destroy_data_stream_ht(data_ht);
}
rcu_unregister_thread();
* This thread listens on the consumerd socket and receives the file
* descriptors from the session daemon.
*/
-void *lttng_consumer_thread_receive_fds(void *data)
+void *consumer_thread_sessiond_poll(void *data)
{
int sock, client_socket, ret;
/*
consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
/*
- * Wake-up the other end by writing a null byte in the pipe
- * (non-blocking). Important note: Because writing into the
- * pipe is non-blocking (and therefore we allow dropping wakeup
- * data, as long as there is wakeup data present in the pipe
- * buffer to wake up the other end), the other end should
- * perform the following sequence for waiting:
- * 1) empty the pipe (reads).
- * 2) perform update operation.
- * 3) wait on the pipe (poll).
+ * Notify the data poll thread to poll back again and test the
+ * consumer_quit state to quit gracefully.
*/
do {
- ret = write(ctx->consumer_poll_pipe[1], "", 1);
+ struct lttng_consumer_stream *null_stream = NULL;
+
+ ret = write(ctx->consumer_poll_pipe[1], &null_stream,
+ sizeof(null_stream));
} while (ret < 0 && errno == EINTR);
+
rcu_unregister_thread();
return NULL;
}
consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ assert(metadata_ht);
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ assert(data_ht);
}
/*