#include <common/ust-consumer/ust-consumer.h>
#include "consumer.h"
+#include "consumer-stream.h"
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
/*
* Destroy and free relayd socket pair object.
- *
- * This function MUST be called with the consumer_data lock acquired.
*/
-static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret;
struct lttng_ht_iter iter;
{
int ret;
struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream, *stmp;
DBG("Consumer delete channel key %" PRIu64, channel->key);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ lttng_ustconsumer_del_stream(stream);
+ free(stream);
+ }
lttng_ustconsumer_del_channel(channel);
break;
default:
goto end;
}
+ /* Empty no monitor streams list. */
+ if (!channel->monitor) {
+ struct lttng_consumer_stream *stream, *stmp;
+
+ /*
+ * So, these streams are not visible to any data thread. This is why we
+ * close and free them because they were never added to any data
+ * structure apart from this one.
+ */
+ cds_list_for_each_entry_safe(stream, stmp,
+ &channel->stream_no_monitor_list.head, no_monitor_node) {
+ cds_list_del(&stream->no_monitor_node);
+ /* Close everything in that stream. */
+ consumer_stream_close(stream);
+ /* Free the ressource. */
+ consumer_stream_free(stream);
+ }
+ }
+
rcu_read_lock();
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
node.node) {
- destroy_relayd(relayd);
+ consumer_destroy_relayd(relayd);
}
rcu_read_unlock();
* Delete the relayd from the relayd hash table, close the sockets and free
* the object in a RCU call.
*/
- destroy_relayd(relayd);
+ consumer_destroy_relayd(relayd);
/* Set inactive endpoint to all streams */
update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
/* Destroy the relayd if refcount is 0 */
if (uatomic_read(&relayd->refcount) == 0) {
- destroy_relayd(relayd);
+ consumer_destroy_relayd(relayd);
}
}
/*
- * Remove a stream from the global list protected by a mutex. This
- * function is also responsible for freeing its data structures.
+ * Completly destroy stream from every visiable data structure and the given
+ * hash table if one.
+ *
+ * One this call returns, the stream object is not longer usable nor visible.
*/
void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht)
{
- int ret;
- struct lttng_ht_iter iter;
- struct lttng_consumer_channel *free_chan = NULL;
- struct consumer_relayd_sock_pair *relayd;
-
- assert(stream);
-
- DBG("Consumer del stream %d", stream->wait_fd);
-
- if (ht == NULL) {
- /* Means the stream was allocated but not successfully added */
- goto free_stream_rcu;
- }
-
- pthread_mutex_lock(&consumer_data.lock);
- pthread_mutex_lock(&stream->lock);
-
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- if (stream->mmap_base != NULL) {
- ret = munmap(stream->mmap_base, stream->mmap_len);
- if (ret != 0) {
- PERROR("munmap");
- }
- }
-
- if (stream->wait_fd >= 0) {
- ret = close(stream->wait_fd);
- if (ret) {
- PERROR("close");
- }
- }
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_del_stream(stream);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- goto end;
- }
-
- rcu_read_lock();
- iter.iter.node = &stream->node.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
-
- iter.iter.node = &stream->node_channel_id.node;
- ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
- assert(!ret);
-
- iter.iter.node = &stream->node_session_id.node;
- ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
- assert(!ret);
- rcu_read_unlock();
-
- assert(consumer_data.stream_count > 0);
- consumer_data.stream_count--;
-
- if (stream->out_fd >= 0) {
- ret = close(stream->out_fd);
- if (ret) {
- PERROR("close");
- }
- }
-
- /* Check and cleanup relayd */
- rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_dec(&relayd->refcount);
- assert(uatomic_read(&relayd->refcount) >= 0);
-
- /* Closing streams requires to lock the control socket. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_send_close_stream(&relayd->control_sock,
- stream->relayd_stream_id,
- stream->next_net_seq_num - 1);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- DBG("Unable to close stream on the relayd. Continuing");
- /*
- * Continue here. There is nothing we can do for the relayd.
- * Chances are that the relayd has closed the socket so we just
- * continue cleaning up.
- */
- }
-
- /* Both conditions are met, we destroy the relayd. */
- if (uatomic_read(&relayd->refcount) == 0 &&
- uatomic_read(&relayd->destroy_flag)) {
- destroy_relayd(relayd);
- }
- }
- rcu_read_unlock();
-
- if (!uatomic_sub_return(&stream->chan->refcount, 1)
- && !uatomic_read(&stream->chan->nb_init_stream_left)) {
- free_chan = stream->chan;
- }
-
-end:
- consumer_data.need_update = 1;
- pthread_mutex_unlock(&stream->lock);
- pthread_mutex_unlock(&consumer_data.lock);
-
- if (free_chan) {
- consumer_del_channel(free_chan);
- }
-
-free_stream_rcu:
- call_rcu(&stream->node.head, free_stream_rcu);
+ consumer_stream_destroy(stream, ht);
}
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
const char *channel_name,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
uint64_t session_id,
int cpu,
int *alloc_ret,
/* Init session id node with the stream session id */
lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
- DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64,
- stream->name, stream->key, channel_key, stream->net_seq_idx, stream->session_id);
+ DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
+ " relayd_id %" PRIu64 ", session_id %" PRIu64,
+ stream->name, stream->key, channel_key,
+ stream->net_seq_idx, stream->session_id);
rcu_read_unlock();
return stream;
const char *name,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
enum lttng_event_output output,
uint64_t tracefile_size,
- uint64_t tracefile_count)
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor)
{
struct lttng_consumer_channel *channel;
channel->key = key;
channel->refcount = 0;
channel->session_id = session_id;
+ channel->session_id_per_pid = session_id_per_pid;
channel->uid = uid;
channel->gid = gid;
channel->relayd_id = relayd_id;
channel->output = output;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
+ channel->monitor = monitor;
+
+ /*
+ * In monitor mode, the streams associated with the channel will be put in
+ * a special list ONLY owned by this channel. So, the refcount is set to 1
+ * here meaning that the channel itself has streams that are referenced.
+ *
+ * On a channel deletion, once the channel is no longer visible, the
+ * refcount is decremented and checked for a zero value to delete it. With
+ * streams in no monitor mode, it will now be safe to destroy the channel.
+ */
+ if (!channel->monitor) {
+ channel->refcount = 1;
+ }
+
+ switch (output) {
+ case LTTNG_EVENT_SPLICE:
+ channel->output = CONSUMER_CHANNEL_SPLICE;
+ break;
+ case LTTNG_EVENT_MMAP:
+ channel->output = CONSUMER_CHANNEL_MMAP;
+ break;
+ default:
+ ERR("Allocate channel output unknown %d", output);
+ free(channel);
+ channel = NULL;
+ goto end;
+ }
strncpy(channel->pathname, pathname, sizeof(channel->pathname));
channel->pathname[sizeof(channel->pathname) - 1] = '\0';
channel->wait_fd = -1;
CDS_INIT_LIST_HEAD(&channel->streams.head);
+ CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head);
DBG("Allocated channel (key %" PRIu64 ")", channel->key)
/*
* Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
*/
int consumer_add_channel(struct lttng_consumer_channel *channel,
struct lttng_consumer_local_data *ctx)
/* Channel already exist. Ignore the insertion */
ERR("Consumer add channel key %" PRIu64 " already exists!",
channel->key);
- ret = LTTNG_ERR_KERN_CHAN_EXIST;
+ ret = -EEXIST;
goto end;
}
PERROR("munmap metadata stream");
}
}
-
if (stream->wait_fd >= 0) {
ret = close(stream->wait_fd);
if (ret < 0) {
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
- destroy_relayd(relayd);
+ consumer_destroy_relayd(relayd);
}
}
rcu_read_unlock();
}
end:
+ /*
+ * Nullify the stream reference so it is not used after deletion. The
+ * consumer data lock MUST be acquired before being able to check for a
+ * NULL pointer value.
+ */
+ stream->chan->metadata_stream = NULL;
+
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&consumer_data.lock);
uatomic_inc(&relayd->refcount);
}
- /* Update channel refcount once added without error(s). */
- uatomic_inc(&stream->chan->refcount);
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
/* allocate for all fds + 1 for the consumer_data_pipe */
local_stream = zmalloc((consumer_data.stream_count + 1) *
- sizeof(struct lttng_consumer_stream));
+ sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
pthread_mutex_unlock(&consumer_data.lock);
lttng_ht_node_init_u64(&chan->wait_fd_node,
chan->wait_fd);
+ rcu_read_lock();
lttng_ht_add_unique_u64(channel_ht,
&chan->wait_fd_node);
+ rcu_read_unlock();
/* Add channel to the global poll events list */
lttng_poll_add(&events, chan->wait_fd,
LPOLLIN | LPOLLPRI);
break;
case CONSUMER_CHANNEL_DEL:
{
+ struct lttng_consumer_stream *stream, *stmp;
+
+ rcu_read_lock();
chan = consumer_find_channel(key);
if (!chan) {
+ rcu_read_unlock();
ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
break;
}
lttng_poll_del(&events, chan->wait_fd);
+ iter.iter.node = &chan->wait_fd_node.node;
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
consumer_close_channel_streams(chan);
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ lttng_ustconsumer_del_stream(stream);
+ uatomic_sub(&stream->chan->refcount, 1);
+ assert(&chan->refcount);
+ free(stream);
+ }
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+
/*
* Release our own refcount. Force channel deletion even if
* streams were not initialized.
if (!uatomic_sub_return(&chan->refcount, 1)) {
consumer_del_channel(chan);
}
+ rcu_read_unlock();
goto restart;
}
case CONSUMER_CHANNEL_QUIT:
lttng_poll_del(&events, chan->wait_fd);
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
+ assert(cds_list_empty(&chan->streams.head));
consumer_close_channel_streams(chan);
/* Release our own refcount */
goto end;
}
- ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto end;
- }
-
/* prepare the FDs to poll : to client socket and the should_quit pipe */
consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
consumer_sockpoll[0].events = POLLIN | POLLPRI;
WARN("On accept");
goto end;
}
- ret = fcntl(sock, F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto end;
- }
/*
* Setup metadata socket which is the second socket connection on the
}
}
if (client_socket >= 0) {
- ret = close(sock);
+ ret = close(client_socket);
if (ret < 0) {
PERROR("close client_socket sessiond poll");
}
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
- /* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- /* Somehow, the session daemon is not responding anymore. */
- goto error;
- }
-
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
- ret = -1;
- goto error;
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+ ret = -ENOMEM;
+ } else {
+ relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd_created = 1;
}
- relayd->sessiond_session_id = (uint64_t) sessiond_id;
- relayd_created = 1;
+
+ /*
+ * This code path MUST continue to the consumer send status message to
+ * we can notify the session daemon and continue our work without
+ * killing everything.
+ */
+ }
+
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0 || ret_code != LTTNG_OK) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
}
/* Poll on consumer socket. */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
ret = -EINTR;
goto error;
}
/* Get relayd socket from session daemon */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+ ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
ret = -1;
fd = -1; /* Just in case it gets set with an invalid value. */
- goto error_close;
+
+ /*
+ * Failing to receive FDs might indicate a major problem such as
+ * reaching a fd limit during the receive where the kernel returns a
+ * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
+ * don't take any chances and stop everything.
+ *
+ * XXX: Feature request #558 will fix that and avoid this possible
+ * issue when reaching the fd limit.
+ */
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+
+ /*
+ * This code path MUST continue to the consumer send status message so
+ * we can send the error to the thread expecting a reply. The above
+ * call will make everything stop.
+ */
}
/* We have the fds without error. Send status back. */
ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ if (ret < 0 || ret_code != LTTNG_OK) {
/* Somehow, the session daemon is not responding anymore. */
goto error;
}
/* Assign new file descriptor */
relayd->control_sock.sock.fd = fd;
+ fd = -1; /* For error path */
/* Assign version values. */
relayd->control_sock.major = relayd_sock->major;
relayd->control_sock.minor = relayd_sock->minor;
/* Assign new file descriptor */
relayd->data_sock.sock.fd = fd;
+ fd = -1; /* for eventual error paths */
/* Assign version values. */
relayd->data_sock.major = relayd_sock->major;
relayd->data_sock.minor = relayd_sock->minor;
}
}
-error_close:
if (relayd_created) {
free(relayd);
}