return stream;
}
-static void steal_stream_key(int key, struct lttng_ht *ht)
+static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
{
struct lttng_consumer_stream *stream;
rcu_read_lock();
stream = find_stream(key, ht);
if (stream) {
- stream->key = -1ULL;
+ stream->key = (uint64_t) -1ULL;
/*
* We don't want the lookup to match, but we still need
* to iterate on this stream when iterating over the hash table. Just
* change the node key.
*/
- stream->node.key = -1ULL;
+ stream->node.key = (uint64_t) -1ULL;
}
rcu_read_unlock();
}
{
int ret;
struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream, *stmp;
DBG("Consumer delete channel key %" PRIu64, channel->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
+ pthread_mutex_lock(&channel->timer_lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ lttng_ustconsumer_del_stream(stream);
+ free(stream);
+ }
lttng_ustconsumer_del_channel(channel);
break;
default:
call_rcu(&channel->node.head, free_channel_rcu);
end:
+ pthread_mutex_unlock(&channel->timer_lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
}
* It's atomically set without having the stream mutex locked which is fine
* because we handle the write/read race with a pipe wakeup for each thread.
*/
-static void update_endpoint_status_by_netidx(int net_seq_idx,
+static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
enum consumer_endpoint_status status)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
- DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+ DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
rcu_read_lock();
static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
struct lttng_consumer_local_data *ctx)
{
- int netidx;
+ uint64_t netidx;
assert(relayd);
const char *channel_name,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
uint64_t session_id,
int cpu,
int *alloc_ret,
stream->gid = gid;
stream->net_seq_idx = relayd_id;
stream->session_id = session_id;
+ stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
pthread_mutex_init(&stream->lock, NULL);
/* If channel is the metadata, flag this stream as metadata. */
DBG3("Adding consumer stream %" PRIu64, stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
rcu_read_lock();
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
* Allocate and return a consumer relayd socket.
*/
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
- int net_seq_idx)
+ uint64_t net_seq_idx)
{
struct consumer_relayd_sock_pair *obj = NULL;
- /* Negative net sequence index is a failure */
- if (net_seq_idx < 0) {
+ /* net sequence index of -1 is a failure */
+ if (net_seq_idx == (uint64_t) -1ULL) {
goto error;
}
const char *name,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
enum lttng_event_output output,
uint64_t tracefile_size,
- uint64_t tracefile_count)
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid)
{
struct lttng_consumer_channel *channel;
channel->key = key;
channel->refcount = 0;
channel->session_id = session_id;
+ channel->session_id_per_pid = session_id_per_pid;
channel->uid = uid;
channel->gid = gid;
channel->relayd_id = relayd_id;
channel->output = output;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
+ pthread_mutex_init(&channel->lock, NULL);
+ pthread_mutex_init(&channel->timer_lock, NULL);
strncpy(channel->pathname, pathname, sizeof(channel->pathname));
channel->pathname[sizeof(channel->pathname) - 1] = '\0';
/*
* Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
*/
int consumer_add_channel(struct lttng_consumer_channel *channel,
struct lttng_consumer_local_data *ctx)
struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
+ pthread_mutex_lock(&channel->timer_lock);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
/* Channel already exist. Ignore the insertion */
ERR("Consumer add channel key %" PRIu64 " already exists!",
channel->key);
- ret = LTTNG_ERR_KERN_CHAN_EXIST;
+ ret = -EEXIST;
goto end;
}
end:
rcu_read_unlock();
+ pthread_mutex_unlock(&channel->timer_lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
if (!ret && channel->wait_fd != -1 &&
struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(int stream_key, uint32_t state))
+ int (*update_stream)(uint64_t stream_key, uint32_t state))
{
int ret;
struct lttng_consumer_local_data *ctx;
rcu_read_lock();
/* Flag that the current stream if set for network streaming. */
- if (stream->net_seq_idx != -1) {
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
goto end;
rcu_read_lock();
/* Flag that the current stream if set for network streaming. */
- if (stream->net_seq_idx != -1) {
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
goto end;
}
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
stream->chan->metadata_stream = NULL;
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
/*
uatomic_inc(&relayd->refcount);
}
- /* Update channel refcount once added without error(s). */
- uatomic_inc(&stream->chan->refcount);
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
/* allocate for all fds + 1 for the consumer_data_pipe */
local_stream = zmalloc((consumer_data.stream_count + 1) *
- sizeof(struct lttng_consumer_stream));
+ sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
pthread_mutex_unlock(&consumer_data.lock);
lttng_ht_node_init_u64(&chan->wait_fd_node,
chan->wait_fd);
+ rcu_read_lock();
lttng_ht_add_unique_u64(channel_ht,
&chan->wait_fd_node);
+ rcu_read_unlock();
/* Add channel to the global poll events list */
lttng_poll_add(&events, chan->wait_fd,
LPOLLIN | LPOLLPRI);
break;
case CONSUMER_CHANNEL_DEL:
{
+ struct lttng_consumer_stream *stream, *stmp;
+
+ rcu_read_lock();
chan = consumer_find_channel(key);
if (!chan) {
+ rcu_read_unlock();
ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
break;
}
lttng_poll_del(&events, chan->wait_fd);
+ iter.iter.node = &chan->wait_fd_node.node;
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
consumer_close_channel_streams(chan);
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ lttng_ustconsumer_del_stream(stream);
+ uatomic_sub(&stream->chan->refcount, 1);
+ assert(&chan->refcount);
+ free(stream);
+ }
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+
/*
* Release our own refcount. Force channel deletion even if
* streams were not initialized.
if (!uatomic_sub_return(&chan->refcount, 1)) {
consumer_del_channel(chan);
}
+ rcu_read_unlock();
goto restart;
}
case CONSUMER_CHANNEL_QUIT:
lttng_poll_del(&events, chan->wait_fd);
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
+ assert(cds_list_empty(&chan->streams.head));
consumer_close_channel_streams(chan);
/* Release our own refcount */
goto end;
}
- ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto end;
- }
-
/* prepare the FDs to poll : to client socket and the should_quit pipe */
consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
consumer_sockpoll[0].events = POLLIN | POLLPRI;
WARN("On accept");
goto end;
}
- ret = fcntl(sock, F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto end;
- }
/*
* Setup metadata socket which is the second socket connection on the
}
}
if (client_socket >= 0) {
- ret = close(sock);
+ ret = close(client_socket);
if (ret < 0) {
PERROR("close client_socket sessiond poll");
}
* This will create a relayd socket pair and add it to the relayd hash table.
* The caller MUST acquire a RCU read side lock before calling it.
*/
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
+ struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
assert(ctx);
assert(relayd_sock);
- DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
-
- /* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- /* Somehow, the session daemon is not responding anymore. */
- goto error;
- }
+ DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
+ assert(sock_type == LTTNG_STREAM_CONTROL);
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
- ret = -1;
+ ret = -ENOMEM;
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
+ } else {
+ relayd->sessiond_session_id = sessiond_id;
+ relayd_created = 1;
}
- relayd->sessiond_session_id = (uint64_t) sessiond_id;
- relayd_created = 1;
+
+ /*
+ * This code path MUST continue to the consumer send status message to
+ * we can notify the session daemon and continue our work without
+ * killing everything.
+ */
+ } else {
+ /*
+ * relayd key should never be found for control socket.
+ */
+ assert(sock_type != LTTNG_STREAM_CONTROL);
+ }
+
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, LTTNG_OK);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+ goto error_nosignal;
}
/* Poll on consumer socket. */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
ret = -EINTR;
- goto error;
+ goto error_nosignal;
}
/* Get relayd socket from session daemon */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
ret = -1;
fd = -1; /* Just in case it gets set with an invalid value. */
- goto error_close;
- }
- /* We have the fds without error. Send status back. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- /* Somehow, the session daemon is not responding anymore. */
+ /*
+ * Failing to receive FDs might indicate a major problem such as
+ * reaching a fd limit during the receive where the kernel returns a
+ * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
+ * don't take any chances and stop everything.
+ *
+ * XXX: Feature request #558 will fix that and avoid this possible
+ * issue when reaching the fd limit.
+ */
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+ ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
goto error;
}
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
ret = lttcomm_create_sock(&relayd->control_sock.sock);
- /* Immediately try to close the created socket if valid. */
- if (relayd->control_sock.sock.fd >= 0) {
- if (close(relayd->control_sock.sock.fd)) {
- PERROR("close relayd control socket");
- }
- }
/* Handle create_sock error. */
if (ret < 0) {
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
}
+ /*
+ * Close the socket created internally by
+ * lttcomm_create_sock, so we can replace it by the one
+ * received from sessiond.
+ */
+ if (close(relayd->control_sock.sock.fd)) {
+ PERROR("close");
+ }
/* Assign new file descriptor */
relayd->control_sock.sock.fd = fd;
+ fd = -1; /* For error path */
/* Assign version values. */
relayd->control_sock.major = relayd_sock->major;
relayd->control_sock.minor = relayd_sock->minor;
*/
(void) relayd_close(&relayd->control_sock);
(void) relayd_close(&relayd->data_sock);
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
goto error;
}
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
ret = lttcomm_create_sock(&relayd->data_sock.sock);
- /* Immediately try to close the created socket if valid. */
- if (relayd->data_sock.sock.fd >= 0) {
- if (close(relayd->data_sock.sock.fd)) {
- PERROR("close relayd data socket");
- }
- }
/* Handle create_sock error. */
if (ret < 0) {
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
}
+ /*
+ * Close the socket created internally by
+ * lttcomm_create_sock, so we can replace it by the one
+ * received from sessiond.
+ */
+ if (close(relayd->data_sock.sock.fd)) {
+ PERROR("close");
+ }
/* Assign new file descriptor */
relayd->data_sock.sock.fd = fd;
+ fd = -1; /* for eventual error paths */
/* Assign version values. */
relayd->data_sock.major = relayd_sock->major;
relayd->data_sock.minor = relayd_sock->minor;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
ret = -1;
+ ret_code = LTTCOMM_CONSUMERD_FATAL;
goto error;
}
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
+ /* We successfully added the socket. Send status back. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+ goto error_nosignal;
+ }
+
/*
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
return 0;
error:
+ if (consumer_send_status_msg(sock, ret_code) < 0) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+ }
+
+error_nosignal:
/* Close received socket if valid. */
if (fd >= 0) {
if (close(fd)) {
}
}
-error_close:
if (relayd_created) {
free(relayd);
}