.type = LTTNG_CONSUMER_UNKNOWN,
};
-/* timeout parameter, to control the polling thread grace period. */
-int consumer_poll_timeout = -1;
-
/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
* Also updated by the signal handler (consumer_should_exit()). Read by the
* polling threads.
*/
-volatile int consumer_quit = 0;
+volatile int consumer_quit;
+
+/*
+ * The following two hash tables are visible by all threads which are separated
+ * in different source files.
+ *
+ * Global hash table containing respectively metadata and data streams. The
+ * stream element in this ht should only be updated by the metadata poll thread
+ * for the metadata and the data poll thread for the data.
+ */
+struct lttng_ht *metadata_ht;
+struct lttng_ht *data_ht;
+
+/*
+ * Notify a thread pipe to poll back again. This usually means that some global
+ * state has changed so we just send back the thread in a poll wait call.
+ */
+static void notify_thread_pipe(int wpipe)
+{
+ int ret;
+
+ do {
+ struct lttng_consumer_stream *null_stream = NULL;
+
+ ret = write(wpipe, &null_stream, sizeof(null_stream));
+ } while (ret < 0 && errno == EINTR);
+}
/*
* Find a stream. The consumer_data.lock must be locked during this
return stream;
}
-static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
+void consumer_steal_stream_key(int key, struct lttng_ht *ht)
{
struct lttng_consumer_stream *stream;
free(stream);
}
-static
-void consumer_free_metadata_stream(struct rcu_head *head)
-{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, waitfd_node);
-
- free(stream);
-}
-
/*
* RCU protected relayd socket pair free.
*/
struct consumer_relayd_sock_pair *relayd =
caa_container_of(node, struct consumer_relayd_sock_pair, node);
+ /*
+ * Close all sockets. This is done in the call RCU since we don't want the
+ * socket fds to be reassigned thus potentially creating bad state of the
+ * relayd object.
+ *
+ * We do not have to lock the control socket mutex here since at this stage
+ * there is no one referencing to this relayd object.
+ */
+ (void) relayd_close(&relayd->control_sock);
+ (void) relayd_close(&relayd->data_sock);
+
free(relayd);
}
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
if (ret != 0) {
- /* We assume the relayd was already destroyed */
+ /* We assume the relayd is being or is destroyed */
return;
}
- /* Close all sockets */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- (void) relayd_close(&relayd->control_sock);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- (void) relayd_close(&relayd->data_sock);
-
/* RCU free() call */
call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
}
+/*
+ * Update the end point status of all streams having the given network sequence
+ * index (relayd index).
+ *
+ * It's atomically set without having the stream mutex locked which is fine
+ * because we handle the write/read race with a pipe wakeup for each thread.
+ */
+static void update_endpoint_status_by_netidx(int net_seq_idx,
+ enum consumer_endpoint_status status)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+
+ rcu_read_lock();
+
+ /* Let's begin with metadata */
+ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ if (stream->net_seq_idx == net_seq_idx) {
+ uatomic_set(&stream->endpoint_status, status);
+ DBG("Delete flag set to metadata stream %d", stream->wait_fd);
+ }
+ }
+
+ /* Follow up by the data streams */
+ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ if (stream->net_seq_idx == net_seq_idx) {
+ uatomic_set(&stream->endpoint_status, status);
+ DBG("Delete flag set to data stream %d", stream->wait_fd);
+ }
+ }
+ rcu_read_unlock();
+}
+
+/*
+ * Cleanup a relayd object by flagging every associated streams for deletion,
+ * destroying the object meaning removing it from the relayd hash table,
+ * closing the sockets and freeing the memory in a RCU call.
+ *
+ * If a local data context is available, notify the threads that the streams'
+ * state have changed.
+ */
+static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
+ struct lttng_consumer_local_data *ctx)
+{
+ int netidx;
+
+ assert(relayd);
+
+ DBG("Cleaning up relayd sockets");
+
+ /* Save the net sequence index before destroying the object */
+ netidx = relayd->net_seq_idx;
+
+ /*
+ * Delete the relayd from the relayd hash table, close the sockets and free
+ * the object in a RCU call.
+ */
+ destroy_relayd(relayd);
+
+ /* Set inactive endpoint to all streams */
+ update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
+
+ /*
+ * With a local data context, notify the threads that the streams' state
+ * have changed. The write() action on the pipe acts as an "implicit"
+ * memory barrier ordering the updates of the end point status from the
+ * read of this status which happens AFTER receiving this notify.
+ */
+ if (ctx) {
+ notify_thread_pipe(ctx->consumer_data_pipe[1]);
+ notify_thread_pipe(ctx->consumer_metadata_pipe[1]);
+ }
+}
+
/*
* Flag a relayd socket pair for destruction. Destroy it if the refcount
* reaches zero.
assert(stream);
+ DBG("Consumer del stream %d", stream->wait_fd);
+
if (ht == NULL) {
/* Means the stream was allocated but not successfully added */
goto free_stream;
}
+ pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
ret = lttng_ht_del(ht, &iter);
assert(!ret);
+ /* Remove node session id from the consumer_data stream ht */
+ iter.iter.node = &stream->node_session_id.node;
+ ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+ assert(!ret);
rcu_read_unlock();
- if (consumer_data.stream_count <= 0) {
- goto end;
- }
+ assert(consumer_data.stream_count > 0);
consumer_data.stream_count--;
- if (!stream) {
- goto end;
- }
+
if (stream->out_fd >= 0) {
ret = close(stream->out_fd);
if (ret) {
end:
consumer_data.need_update = 1;
pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&stream->lock);
if (free_chan) {
consumer_del_channel(free_chan);
gid_t gid,
int net_index,
int metadata_flag,
+ uint64_t session_id,
int *alloc_ret)
{
struct lttng_consumer_stream *stream;
stream->gid = gid;
stream->net_seq_idx = net_index;
stream->metadata_flag = metadata_flag;
+ stream->session_id = session_id;
strncpy(stream->path_name, path_name, sizeof(stream->path_name));
stream->path_name[sizeof(stream->path_name) - 1] = '\0';
- lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
- lttng_ht_node_init_ulong(&stream->node, stream->key);
+ pthread_mutex_init(&stream->lock, NULL);
+
+ /*
+ * Index differently the metadata node because the thread is using an
+ * internal hash table to match streams in the metadata_ht to the epoll set
+ * file descriptor.
+ */
+ if (metadata_flag) {
+ lttng_ht_node_init_ulong(&stream->node, stream->wait_fd);
+ } else {
+ lttng_ht_node_init_ulong(&stream->node, stream->key);
+ }
+
+ /* Init session id node with the stream session id */
+ lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+
+ /*
+ * The cpu number is needed before using any ustctl_* actions. Ignored for
+ * the kernel so the value does not matter.
+ */
+ pthread_mutex_lock(&consumer_data.lock);
+ stream->cpu = stream->chan->cpucount++;
+ pthread_mutex_unlock(&consumer_data.lock);
DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
- " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
- stream->shm_fd, stream->wait_fd,
+ " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
+ stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
- stream->net_seq_idx);
+ stream->net_seq_idx, stream->session_id);
return stream;
error:
/*
* Add a stream to the global list protected by a mutex.
*/
-int consumer_add_stream(struct lttng_consumer_stream *stream)
+static int consumer_add_stream(struct lttng_consumer_stream *stream,
+ struct lttng_ht *ht)
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
assert(stream);
+ assert(ht);
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
rcu_read_lock();
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- stream->cpu = stream->chan->cpucount++;
- ret = lttng_ustconsumer_add_stream(stream);
- if (ret) {
- ret = -EINVAL;
- goto error;
- }
+ /* Steal stream identifier to avoid having streams with the same key */
+ consumer_steal_stream_key(stream->key, ht);
- /* Steal stream identifier only for UST */
- consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- ret = -ENOSYS;
- goto error;
- }
+ lttng_ht_add_unique_ulong(ht, &stream->node);
- lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+ /*
+ * Add stream to the stream_list_ht of the consumer data. No need to steal
+ * the key since the HT does not use it and we allow to add redundant keys
+ * into this table.
+ */
+ lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
consumer_data.stream_count++;
consumer_data.need_update = 1;
-error:
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
return outfd;
}
-/*
- * Update a stream according to what we just received.
- */
-void consumer_change_stream_state(int stream_key,
- enum lttng_consumer_stream_state state)
-{
- struct lttng_consumer_stream *stream;
-
- pthread_mutex_lock(&consumer_data.lock);
- stream = consumer_find_stream(stream_key, consumer_data.stream_ht);
- if (stream) {
- stream->state = state;
- }
- consumer_data.need_update = 1;
- pthread_mutex_unlock(&consumer_data.lock);
-}
-
static
void consumer_free_channel(struct rcu_head *head)
{
*
* Returns the number of fds in the structures.
*/
-int consumer_update_poll_array(
+static int consumer_update_poll_array(
struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
- struct lttng_consumer_stream **local_stream)
+ struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
{
int i = 0;
struct lttng_ht_iter iter;
DBG("Updating poll fd array");
rcu_read_lock();
- cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
- node.node) {
- if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Only active streams with an active end point can be added to the
+ * poll set and local stream storage of the thread.
+ *
+ * There is a potential race here for endpoint_status to be updated
+ * just after the check. However, this is OK since the stream(s) will
+ * be deleted once the thread is notified that the end point state has
+ * changed where this function will be called back again.
+ */
+ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
+ stream->endpoint_status) {
continue;
}
DBG("Active FD %d", stream->wait_fd);
rcu_read_unlock();
/*
- * Insert the consumer_poll_pipe at the end of the array and don't
+ * Insert the consumer_data_pipe at the end of the array and don't
* increment i so nb_fd is the number of real FD.
*/
- (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
+ (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
(*pollfd)[i].events = POLLIN | POLLPRI;
return i;
}
rcu_read_lock();
- /*
- * close all outfd. Called when there are no more threads running (after
- * joining on the threads), no need to protect list iteration with mutex.
- */
- cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
- node) {
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
- consumer_del_stream(stream, consumer_data.stream_ht);
- }
-
cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
node) {
struct lttng_consumer_channel *channel =
rcu_read_unlock();
- lttng_ht_destroy(consumer_data.stream_ht);
lttng_ht_destroy(consumer_data.channel_ht);
}
if (ret < 0) {
PERROR("write consumer quit");
}
+
+ DBG("Consumer flag that it should quit");
}
void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
ctx->on_recv_stream = recv_stream;
ctx->on_update_stream = update_stream;
- ret = pipe(ctx->consumer_poll_pipe);
+ ret = pipe(ctx->consumer_data_pipe);
if (ret < 0) {
PERROR("Error creating poll pipe");
goto error_poll_pipe;
}
/* set read end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
/* set write end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
for (i = 0; i < 2; i++) {
int err;
- err = close(ctx->consumer_poll_pipe[i]);
+ err = close(ctx->consumer_data_pipe[i]);
if (err) {
PERROR("close");
}
{
int ret;
+ DBG("Consumer destroying it. Closing everything.");
+
ret = close(ctx->consumer_error_socket);
if (ret) {
PERROR("close");
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[0]);
+ ret = close(ctx->consumer_data_pipe[0]);
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[1]);
+ ret = close(ctx->consumer_data_pipe[1]);
if (ret) {
PERROR("close");
}
/* Default is on the disk */
int outfd = stream->out_fd;
struct consumer_relayd_sock_pair *relayd = NULL;
+ unsigned int relayd_hang_up = 0;
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ pthread_mutex_lock(&stream->lock);
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
if (ret < 0) {
written = ret;
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EPIPE || ret == -EINVAL) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
goto end;
}
}
+ } else {
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EPIPE || ret == -EINVAL) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ /* Else, use the default set before which is the filesystem. */
}
- /* Else, use the default set before which is the filesystem. */
} else {
/* No streaming, we have to set the len with the full padding */
len += padding;
if (written == 0) {
written = ret;
}
+ /* Socket operation failed. We consider the relayd dead */
+ if (errno == EPIPE || errno == EINVAL) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
goto end;
} else if (ret > len) {
PERROR("Error in file write (ret %zd > len %lu)", ret, len);
}
lttng_consumer_sync_trace_file(stream, orig_offset);
+write_error:
+ /*
+ * This is a special case that the relayd has closed its socket. Let's
+ * cleanup the relayd object and all associated streams.
+ */
+ if (relayd && relayd_hang_up) {
+ cleanup_relayd(relayd, ctx);
+ }
+
end:
/* Unlock only if ctrl socket used */
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
+ pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
int outfd = stream->out_fd;
struct consumer_relayd_sock_pair *relayd = NULL;
int *splice_pipe;
+ unsigned int relayd_hang_up = 0;
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
/* RCU lock for the relayd pointer */
rcu_read_lock();
+ pthread_mutex_lock(&stream->lock);
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
padding);
if (ret < 0) {
written = ret;
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EBADF) {
+ WARN("Remote relayd disconnected. Stopping");
+ relayd_hang_up = 1;
+ goto write_error;
+ }
goto end;
}
/* Use the returned socket. */
outfd = ret;
} else {
- ERR("Remote relayd disconnected. Stopping");
+ /* Socket operation failed. We consider the relayd dead */
+ if (ret == -EBADF) {
+ WARN("Remote relayd disconnected. Stopping");
+ relayd_hang_up = 1;
+ goto write_error;
+ }
goto end;
}
} else {
if (written == 0) {
written = ret_splice;
}
+ /* Socket operation failed. We consider the relayd dead */
+ if (errno == EBADF || errno == EPIPE) {
+ WARN("Remote relayd disconnected. Stopping");
+ relayd_hang_up = 1;
+ goto write_error;
+ }
ret = errno;
goto splice_error;
} else if (ret_splice > len) {
goto end;
+write_error:
+ /*
+ * This is a special case that the relayd has closed its socket. Let's
+ * cleanup the relayd object and all associated streams.
+ */
+ if (relayd && relayd_hang_up) {
+ cleanup_relayd(relayd, ctx);
+ /* Skip splice error so the consumer does not fail */
+ goto end;
+ }
+
splice_error:
/* send the appropriate error description to sessiond */
switch (ret) {
- case EBADF:
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EBADF);
- break;
case EINVAL:
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_SPLICE_EINVAL);
break;
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
+ pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
}
}
+/*
+ * Iterate over all streams of the hashtable and free them properly.
+ *
+ * WARNING: *MUST* be used with data stream only.
+ */
+static void destroy_data_stream_ht(struct lttng_ht *ht)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ if (ht == NULL) {
+ return;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
+ ret = lttng_ht_del(ht, &iter);
+ assert(!ret);
+
+ call_rcu(&stream->node.head, consumer_free_stream);
+ }
+ rcu_read_unlock();
+
+ lttng_ht_destroy(ht);
+}
+
/*
* Iterate over all streams of the hashtable and free them properly.
*
}
rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, waitfd_node.node) {
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
ret = lttng_ht_del(ht, &iter);
assert(!ret);
- call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream);
+ call_rcu(&stream->node.head, consumer_free_stream);
}
rcu_read_unlock();
goto free_stream;
}
- rcu_read_lock();
- iter.iter.node = &stream->waitfd_node.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
- rcu_read_unlock();
+ pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
goto end;
}
+ rcu_read_lock();
+ iter.iter.node = &stream->node.node;
+ ret = lttng_ht_del(ht, &iter);
+ assert(!ret);
+
+ /* Remove node session id from the consumer_data stream ht */
+ iter.iter.node = &stream->node_session_id.node;
+ ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
+ assert(!ret);
+ rcu_read_unlock();
+
if (stream->out_fd >= 0) {
ret = close(stream->out_fd);
if (ret) {
end:
pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&stream->lock);
if (free_chan) {
consumer_del_channel(free_chan);
}
free_stream:
- call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream);
+ call_rcu(&stream->node.head, consumer_free_stream);
}
/*
pthread_mutex_lock(&consumer_data.lock);
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- ret = lttng_ustconsumer_add_stream(stream);
- if (ret) {
- ret = -EINVAL;
- goto error;
- }
-
- /* Steal stream identifier only for UST */
- consumer_steal_stream_key(stream->wait_fd, ht);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- ret = -ENOSYS;
- goto error;
- }
-
/*
* From here, refcounts are updated so be _careful_ when returning an error
* after this point.
uatomic_dec(&stream->chan->nb_init_streams);
}
- lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
+ /* Steal stream identifier to avoid having streams with the same key */
+ consumer_steal_stream_key(stream->key, ht);
+
+ lttng_ht_add_unique_ulong(ht, &stream->node);
+
+ /*
+ * Add stream to the stream_list_ht of the consumer data. No need to steal
+ * the key since the HT does not use it and we allow to add redundant keys
+ * into this table.
+ */
+ lttng_ht_add_ulong(consumer_data.stream_list_ht, &stream->node_session_id);
+
rcu_read_unlock();
-error:
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
+/*
+ * Delete data stream that are flagged for deletion (endpoint_status).
+ */
+static void validate_endpoint_status_data_stream(void)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ DBG("Consumer delete flagged data stream");
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
+ continue;
+ }
+ /* Delete it right now */
+ consumer_del_stream(stream, data_ht);
+ }
+ rcu_read_unlock();
+}
+
+/*
+ * Delete metadata stream that are flagged for deletion (endpoint_status).
+ */
+static void validate_endpoint_status_metadata_stream(
+ struct lttng_poll_event *pollset)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+
+ DBG("Consumer delete flagged metadata stream");
+
+ assert(pollset);
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ /* Validate delete flag of the stream */
+ if (!stream->endpoint_status) {
+ continue;
+ }
+ /*
+ * Remove from pollset so the metadata thread can continue without
+ * blocking on a deleted stream.
+ */
+ lttng_poll_del(pollset, stream->wait_fd);
+
+ /* Delete it right now */
+ consumer_del_metadata_stream(stream, metadata_ht);
+ }
+ rcu_read_unlock();
+}
+
/*
* Thread polls on metadata file descriptor and write them on disk or on the
* network.
*/
-void *lttng_consumer_thread_poll_metadata(void *data)
+void *consumer_thread_metadata_poll(void *data)
{
int ret, i, pollfd;
uint32_t revents, nb_fd;
struct lttng_consumer_stream *stream = NULL;
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
- struct lttng_ht *metadata_ht = NULL;
struct lttng_poll_event events;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
DBG("Thread metadata poll started");
- metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- if (metadata_ht == NULL) {
- goto end;
- }
-
/* Size is set to 1 for the consumer_metadata pipe */
ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
if (ret < 0) {
continue;
}
+ /* A NULL stream means that the state has changed. */
+ if (stream == NULL) {
+ /* Check for deleted streams. */
+ validate_endpoint_status_metadata_stream(&events);
+ continue;
+ }
+
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
assert(node);
stream = caa_container_of(node, struct lttng_consumer_stream,
- waitfd_node);
+ node);
/* Check for error event */
if (revents & (LPOLLERR | LPOLLHUP)) {
len = ctx->on_buffer_ready(stream, ctx);
/* It's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
- rcu_read_unlock();
- goto end;
+ /* Clean up stream from consumer and free it. */
+ lttng_poll_del(&events, stream->wait_fd);
+ consumer_del_metadata_stream(stream, metadata_ht);
} else if (len > 0) {
stream->data_read = 1;
}
* This thread polls the fds in the set to consume the data and write
* it to tracefile if necessary.
*/
-void *lttng_consumer_thread_poll_fds(void *data)
+void *consumer_thread_data_poll(void *data)
{
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
/* local view of the streams */
- struct lttng_consumer_stream **local_stream = NULL;
+ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
- pthread_t metadata_thread;
- void *status;
rcu_register_thread();
- /* Start metadata polling thread */
- ret = pthread_create(&metadata_thread, NULL,
- lttng_consumer_thread_poll_metadata, (void *) ctx);
- if (ret < 0) {
- PERROR("pthread_create metadata thread");
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (data_ht == NULL) {
goto end;
}
local_stream = NULL;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
goto end;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- ret = consumer_update_poll_array(ctx, &pollfd, local_stream);
+ ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
+ data_ht);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
- num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
+ num_rdy = poll(pollfd, nb_fd + 1, -1);
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
/*
}
/*
- * If the consumer_poll_pipe triggered poll go directly to the
+ * If the consumer_data_pipe triggered poll go directly to the
* beginning of the loop to update the array. We want to prioritize
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
size_t pipe_readlen;
- char tmp;
- DBG("consumer_poll_pipe wake up");
+ DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
do {
- pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
+ pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
+ sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
+
+ /*
+ * If the stream is NULL, just ignore it. It's also possible that
+ * the sessiond poll thread changed the consumer_quit state and is
+ * waking us up to test it.
+ */
+ if (new_stream == NULL) {
+ validate_endpoint_status_data_stream();
+ continue;
+ }
+
+ ret = consumer_add_stream(new_stream, data_ht);
+ if (ret) {
+ ERR("Consumer add stream %d failed. Continuing",
+ new_stream->key);
+ /*
+ * At this point, if the add_stream fails, it is not in the
+ * hash table thus passing the NULL value here.
+ */
+ consumer_del_stream(new_stream, NULL);
+ }
+
+ /* Continue to update the local streams and handle prio ones */
continue;
}
/* Take care of high priority channels first. */
for (i = 0; i < nb_fd; i++) {
+ if (local_stream[i] == NULL) {
+ continue;
+ }
if (pollfd[i].revents & POLLPRI) {
DBG("Urgent read on fd %d", pollfd[i].fd);
high_prio = 1;
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
- goto end;
+ /* Clean the stream and free it. */
+ consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
} else if (len > 0) {
local_stream[i]->data_read = 1;
}
/* Take care of low priority channels. */
for (i = 0; i < nb_fd; i++) {
+ if (local_stream[i] == NULL) {
+ continue;
+ }
if ((pollfd[i].revents & POLLIN) ||
local_stream[i]->hangup_flush_done) {
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
- goto end;
+ /* Clean the stream and free it. */
+ consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
} else if (len > 0) {
local_stream[i]->data_read = 1;
}
/* Handle hangup and errors */
for (i = 0; i < nb_fd; i++) {
+ if (local_stream[i] == NULL) {
+ continue;
+ }
if (!local_stream[i]->hangup_flush_done
&& (pollfd[i].revents & (POLLHUP | POLLERR | POLLNVAL))
&& (consumer_data.type == LTTNG_CONSUMER32_UST
|| consumer_data.type == LTTNG_CONSUMER64_UST)) {
DBG("fd %d is hup|err|nval. Attempting flush and read.",
- pollfd[i].fd);
+ pollfd[i].fd);
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
local_stream[i]->data_read = 1;
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- consumer_del_stream(local_stream[i],
- consumer_data.stream_ht);
+ consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- consumer_del_stream(local_stream[i],
- consumer_data.stream_ht);
+ consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- consumer_del_stream(local_stream[i],
- consumer_data.stream_ht);
+ consumer_del_stream(local_stream[i], data_ht);
+ local_stream[i] = NULL;
num_hup++;
}
}
- local_stream[i]->data_read = 0;
+ if (local_stream[i] != NULL) {
+ local_stream[i]->data_read = 0;
+ }
}
}
end:
/*
* Close the write side of the pipe so epoll_wait() in
- * lttng_consumer_thread_poll_metadata can catch it. The thread is
- * monitoring the read side of the pipe. If we close them both, epoll_wait
- * strangely does not return and could create a endless wait period if the
- * pipe is the only tracked fd in the poll set. The thread will take care
- * of closing the read side.
+ * consumer_thread_metadata_poll can catch it. The thread is monitoring the
+ * read side of the pipe. If we close them both, epoll_wait strangely does
+ * not return and could create a endless wait period if the pipe is the
+ * only tracked fd in the poll set. The thread will take care of closing
+ * the read side.
*/
close(ctx->consumer_metadata_pipe[1]);
- if (ret) {
- ret = pthread_join(metadata_thread, &status);
- if (ret < 0) {
- PERROR("pthread_join metadata thread");
- }
+
+ if (data_ht) {
+ destroy_data_stream_ht(data_ht);
}
rcu_unregister_thread();
* This thread listens on the consumerd socket and receives the file
* descriptors from the session daemon.
*/
-void *lttng_consumer_thread_receive_fds(void *data)
+void *consumer_thread_sessiond_poll(void *data)
{
int sock, client_socket, ret;
/*
consumer_quit = 1;
/*
- * 2s of grace period, if no polling events occur during
- * this period, the polling thread will exit even if there
- * are still open FDs (should not happen, but safety mechanism).
+ * Notify the data poll thread to poll back again and test the
+ * consumer_quit state that we just set so to quit gracefully.
*/
- consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
+ notify_thread_pipe(ctx->consumer_data_pipe[1]);
- /*
- * Wake-up the other end by writing a null byte in the pipe
- * (non-blocking). Important note: Because writing into the
- * pipe is non-blocking (and therefore we allow dropping wakeup
- * data, as long as there is wakeup data present in the pipe
- * buffer to wake up the other end), the other end should
- * perform the following sequence for waiting:
- * 1) empty the pipe (reads).
- * 2) perform update operation.
- * 3) wait on the pipe (poll).
- */
- do {
- ret = write(ctx->consumer_poll_pipe[1], "", 1);
- } while (ret < 0 && errno == EINTR);
rcu_unregister_thread();
return NULL;
}
*/
void lttng_consumer_init(void)
{
- consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ assert(metadata_ht);
+ data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ assert(data_ht);
}
/*
error:
return ret;
}
+
+/*
+ * Try to lock the stream mutex.
+ *
+ * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
+ */
+static int stream_try_lock(struct lttng_consumer_stream *stream)
+{
+ int ret;
+
+ assert(stream);
+
+ /*
+ * Try to lock the stream mutex. On failure, we know that the stream is
+ * being used else where hence there is data still being extracted.
+ */
+ ret = pthread_mutex_trylock(&stream->lock);
+ if (ret) {
+ /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
+ ret = 0;
+ goto end;
+ }
+
+ ret = 1;
+
+end:
+ return ret;
+}
+
+/*
+ * Check if for a given session id there is still data needed to be extract
+ * from the buffers.
+ *
+ * Return 1 if data is pending or else 0 meaning ready to be read.
+ */
+int consumer_data_pending(uint64_t id)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+ struct lttng_ht *ht;
+ struct lttng_consumer_stream *stream;
+ struct consumer_relayd_sock_pair *relayd;
+ int (*data_pending)(struct lttng_consumer_stream *);
+
+ DBG("Consumer data pending command on session id %" PRIu64, id);
+
+ rcu_read_lock();
+ pthread_mutex_lock(&consumer_data.lock);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ data_pending = lttng_kconsumer_data_pending;
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ data_pending = lttng_ustconsumer_data_pending;
+ break;
+ default:
+ ERR("Unknown consumer data type");
+ assert(0);
+ }
+
+ /* Ease our life a bit */
+ ht = consumer_data.stream_list_ht;
+
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
+ ht->match_fct, (void *)((unsigned long) id),
+ &iter.iter, stream, node_session_id.node) {
+ /* If this call fails, the stream is being used hence data pending. */
+ ret = stream_try_lock(stream);
+ if (!ret) {
+ goto data_not_pending;
+ }
+
+ /*
+ * A removed node from the hash table indicates that the stream has
+ * been deleted thus having a guarantee that the buffers are closed
+ * on the consumer side. However, data can still be transmitted
+ * over the network so don't skip the relayd check.
+ */
+ ret = cds_lfht_is_node_deleted(&stream->node.node);
+ if (!ret) {
+ /* Check the stream if there is data in the buffers. */
+ ret = data_pending(stream);
+ if (ret == 1) {
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_pending;
+ }
+ }
+
+ /* Relayd check */
+ if (stream->net_seq_idx != -1) {
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (!relayd) {
+ /*
+ * At this point, if the relayd object is not available for the
+ * given stream, it is because the relayd is being cleaned up
+ * so every stream associated with it (for a session id value)
+ * are or will be marked for deletion hence no data pending.
+ */
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_pending;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ if (stream->metadata_flag) {
+ ret = relayd_quiescent_control(&relayd->control_sock);
+ } else {
+ ret = relayd_data_pending(&relayd->control_sock,
+ stream->relayd_stream_id, stream->next_net_seq_num);
+ }
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret == 1) {
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_pending;
+ }
+ }
+ pthread_mutex_unlock(&stream->lock);
+ }
+
+ /*
+ * Finding _no_ node in the hash table means that the stream(s) have been
+ * removed thus data is guaranteed to be available for analysis from the
+ * trace files. This is *only* true for local consumer and not network
+ * streaming.
+ */
+
+ /* Data is available to be read by a viewer. */
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+ return 0;
+
+data_not_pending:
+ /* Data is still being extracted from buffers. */
+ pthread_mutex_unlock(&consumer_data.lock);
+ rcu_read_unlock();
+ return 1;
+}