return stream;
}
-static void steal_stream_key(int key, struct lttng_ht *ht)
+static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
{
struct lttng_consumer_stream *stream;
rcu_read_lock();
stream = find_stream(key, ht);
if (stream) {
- stream->key = -1ULL;
+ stream->key = (uint64_t) -1ULL;
/*
* We don't want the lookup to match, but we still need
* to iterate on this stream when iterating over the hash table. Just
* change the node key.
*/
- stream->node.key = -1ULL;
+ stream->node.key = (uint64_t) -1ULL;
}
rcu_read_unlock();
}
goto end;
}
+ /* Empty no monitor streams list. */
+ if (!channel->monitor) {
+ struct lttng_consumer_stream *stream, *stmp;
+
+ /*
+ * So, these streams are not visible to any data thread. This is why we
+ * close and free them because they were never added to any data
+ * structure apart from this one.
+ */
+ cds_list_for_each_entry_safe(stream, stmp,
+ &channel->stream_no_monitor_list.head, no_monitor_node) {
+ cds_list_del(&stream->no_monitor_node);
+ /* Close everything in that stream. */
+ consumer_stream_close(stream);
+ /* Free the ressource. */
+ consumer_stream_free(stream);
+ }
+ }
+
rcu_read_lock();
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
* It's atomically set without having the stream mutex locked which is fine
* because we handle the write/read race with a pipe wakeup for each thread.
*/
-static void update_endpoint_status_by_netidx(int net_seq_idx,
+static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
enum consumer_endpoint_status status)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
- DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+ DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
rcu_read_lock();
static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
struct lttng_consumer_local_data *ctx)
{
- int netidx;
+ uint64_t netidx;
assert(relayd);
/* Init session id node with the stream session id */
lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
- DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64 " relayd_id %" PRIu64 ", session_id %" PRIu64,
- stream->name, stream->key, channel_key, stream->net_seq_idx, stream->session_id);
+ DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
+ " relayd_id %" PRIu64 ", session_id %" PRIu64,
+ stream->name, stream->key, channel_key,
+ stream->net_seq_idx, stream->session_id);
rcu_read_unlock();
return stream;
* Allocate and return a consumer relayd socket.
*/
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
- int net_seq_idx)
+ uint64_t net_seq_idx)
{
struct consumer_relayd_sock_pair *obj = NULL;
- /* Negative net sequence index is a failure */
- if (net_seq_idx < 0) {
+ /* net sequence index of -1 is a failure */
+ if (net_seq_idx == (uint64_t) -1ULL) {
goto error;
}
uint64_t relayd_id,
enum lttng_event_output output,
uint64_t tracefile_size,
- uint64_t tracefile_count)
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor)
{
struct lttng_consumer_channel *channel;
channel->key = key;
channel->refcount = 0;
channel->session_id = session_id;
+ channel->session_id_per_pid = session_id_per_pid;
channel->uid = uid;
channel->gid = gid;
channel->relayd_id = relayd_id;
channel->output = output;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
+ channel->monitor = monitor;
+
+ /*
+ * In monitor mode, the streams associated with the channel will be put in
+ * a special list ONLY owned by this channel. So, the refcount is set to 1
+ * here meaning that the channel itself has streams that are referenced.
+ *
+ * On a channel deletion, once the channel is no longer visible, the
+ * refcount is decremented and checked for a zero value to delete it. With
+ * streams in no monitor mode, it will now be safe to destroy the channel.
+ */
+ if (!channel->monitor) {
+ channel->refcount = 1;
+ }
strncpy(channel->pathname, pathname, sizeof(channel->pathname));
channel->pathname[sizeof(channel->pathname) - 1] = '\0';
channel->wait_fd = -1;
CDS_INIT_LIST_HEAD(&channel->streams.head);
+ CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head);
DBG("Allocated channel (key %" PRIu64 ")", channel->key)
rcu_read_lock();
/* Flag that the current stream if set for network streaming. */
- if (stream->net_seq_idx != -1) {
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
goto end;
rcu_read_lock();
/* Flag that the current stream if set for network streaming. */
- if (stream->net_seq_idx != -1) {
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
goto end;
PERROR("munmap metadata stream");
}
}
-
if (stream->wait_fd >= 0) {
ret = close(stream->wait_fd);
if (ret < 0) {
* This will create a relayd socket pair and add it to the relayd hash table.
* The caller MUST acquire a RCU read side lock before calling it.
*/
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
assert(ctx);
assert(relayd_sock);
- DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+ DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
+ assert(sock_type == LTTNG_STREAM_CONTROL);
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
* we can notify the session daemon and continue our work without
* killing everything.
*/
+ } else {
+ /*
+ * relayd key should never be found for control socket.
+ */
+ assert(sock_type != LTTNG_STREAM_CONTROL);
}
/* First send a status message before receiving the fds. */
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
ret = lttcomm_create_sock(&relayd->control_sock.sock);
- /* Immediately try to close the created socket if valid. */
- if (relayd->control_sock.sock.fd >= 0) {
- if (close(relayd->control_sock.sock.fd)) {
- PERROR("close relayd control socket");
- }
- }
/* Handle create_sock error. */
if (ret < 0) {
goto error;
}
+ /*
+ * Close the socket created internally by
+ * lttcomm_create_sock, so we can replace it by the one
+ * received from sessiond.
+ */
+ if (close(relayd->control_sock.sock.fd)) {
+ PERROR("close");
+ }
/* Assign new file descriptor */
relayd->control_sock.sock.fd = fd;
+ fd = -1; /* For error path */
/* Assign version values. */
relayd->control_sock.major = relayd_sock->major;
relayd->control_sock.minor = relayd_sock->minor;
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
ret = lttcomm_create_sock(&relayd->data_sock.sock);
- /* Immediately try to close the created socket if valid. */
- if (relayd->data_sock.sock.fd >= 0) {
- if (close(relayd->data_sock.sock.fd)) {
- PERROR("close relayd data socket");
- }
- }
/* Handle create_sock error. */
if (ret < 0) {
goto error;
}
+ /*
+ * Close the socket created internally by
+ * lttcomm_create_sock, so we can replace it by the one
+ * received from sessiond.
+ */
+ if (close(relayd->data_sock.sock.fd)) {
+ PERROR("close");
+ }
/* Assign new file descriptor */
relayd->data_sock.sock.fd = fd;
+ fd = -1; /* for eventual error paths */
/* Assign version values. */
relayd->data_sock.major = relayd_sock->major;
relayd->data_sock.minor = relayd_sock->minor;