/*
* Destroy and free relayd socket pair object.
- *
- * This function MUST be called with the consumer_data lock acquired.
*/
-static void destroy_relayd(struct consumer_relayd_sock_pair *relayd)
+void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd)
{
int ret;
struct lttng_ht_iter iter;
{
int ret;
struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream, *stmp;
DBG("Consumer delete channel key %" PRIu64, channel->key);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ lttng_ustconsumer_del_stream(stream);
+ free(stream);
+ }
lttng_ustconsumer_del_channel(channel);
break;
default:
cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
node.node) {
- destroy_relayd(relayd);
+ consumer_destroy_relayd(relayd);
}
rcu_read_unlock();
* Delete the relayd from the relayd hash table, close the sockets and free
* the object in a RCU call.
*/
- destroy_relayd(relayd);
+ consumer_destroy_relayd(relayd);
/* Set inactive endpoint to all streams */
update_endpoint_status_by_netidx(netidx, CONSUMER_ENDPOINT_INACTIVE);
/* Destroy the relayd if refcount is 0 */
if (uatomic_read(&relayd->refcount) == 0) {
- destroy_relayd(relayd);
+ consumer_destroy_relayd(relayd);
}
}
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
- destroy_relayd(relayd);
+ consumer_destroy_relayd(relayd);
}
}
rcu_read_unlock();
const char *channel_name,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
uint64_t session_id,
int cpu,
int *alloc_ret,
const char *name,
uid_t uid,
gid_t gid,
- int relayd_id,
+ uint64_t relayd_id,
enum lttng_event_output output,
uint64_t tracefile_size,
uint64_t tracefile_count)
/*
* Add a channel to the global list protected by a mutex.
+ *
+ * On success 0 is returned else a negative value.
*/
int consumer_add_channel(struct lttng_consumer_channel *channel,
struct lttng_consumer_local_data *ctx)
/* Channel already exist. Ignore the insertion */
ERR("Consumer add channel key %" PRIu64 " already exists!",
channel->key);
- ret = LTTNG_ERR_KERN_CHAN_EXIST;
+ ret = -EEXIST;
goto end;
}
/* Both conditions are met, we destroy the relayd. */
if (uatomic_read(&relayd->refcount) == 0 &&
uatomic_read(&relayd->destroy_flag)) {
- destroy_relayd(relayd);
+ consumer_destroy_relayd(relayd);
}
}
rcu_read_unlock();
/* allocate for all fds + 1 for the consumer_data_pipe */
local_stream = zmalloc((consumer_data.stream_count + 1) *
- sizeof(struct lttng_consumer_stream));
+ sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
pthread_mutex_unlock(&consumer_data.lock);
break;
case CONSUMER_CHANNEL_DEL:
{
+ struct lttng_consumer_stream *stream, *stmp;
+
rcu_read_lock();
chan = consumer_find_channel(key);
if (!chan) {
assert(ret == 0);
consumer_close_channel_streams(chan);
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ lttng_ustconsumer_del_stream(stream);
+ uatomic_sub(&stream->chan->refcount, 1);
+ assert(&chan->refcount);
+ free(stream);
+ }
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ }
+
/*
* Release our own refcount. Force channel deletion even if
* streams were not initialized.
lttng_poll_del(&events, chan->wait_fd);
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
+ assert(cds_list_empty(&chan->streams.head));
consumer_close_channel_streams(chan);
/* Release our own refcount */
goto end;
}
- ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto end;
- }
-
/* prepare the FDs to poll : to client socket and the should_quit pipe */
consumer_sockpoll[0].fd = ctx->consumer_should_quit[0];
consumer_sockpoll[0].events = POLLIN | POLLPRI;
WARN("On accept");
goto end;
}
- ret = fcntl(sock, F_SETFL, O_NONBLOCK);
- if (ret < 0) {
- PERROR("fcntl O_NONBLOCK");
- goto end;
- }
/*
* Setup metadata socket which is the second socket connection on the
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
- /* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
- /* Somehow, the session daemon is not responding anymore. */
- goto error;
- }
-
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
- ret = -1;
- goto error;
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+ ret = -ENOMEM;
+ } else {
+ relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd_created = 1;
}
- relayd->sessiond_session_id = (uint64_t) sessiond_id;
- relayd_created = 1;
+
+ /*
+ * This code path MUST continue to the consumer send status message to
+ * we can notify the session daemon and continue our work without
+ * killing everything.
+ */
+ }
+
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0 || ret_code != LTTNG_OK) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
}
/* Poll on consumer socket. */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
ret = -EINTR;
goto error;
}
/* Get relayd socket from session daemon */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
- lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+ ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
ret = -1;
fd = -1; /* Just in case it gets set with an invalid value. */
- goto error_close;
+
+ /*
+ * Failing to receive FDs might indicate a major problem such as
+ * reaching a fd limit during the receive where the kernel returns a
+ * MSG_CTRUNC and fails to cleanup the fd in the queue. Any case, we
+ * don't take any chances and stop everything.
+ *
+ * XXX: Feature request #558 will fix that and avoid this possible
+ * issue when reaching the fd limit.
+ */
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
+
+ /*
+ * This code path MUST continue to the consumer send status message so
+ * we can send the error to the thread expecting a reply. The above
+ * call will make everything stop.
+ */
}
/* We have the fds without error. Send status back. */
ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0) {
+ if (ret < 0 || ret_code != LTTNG_OK) {
/* Somehow, the session daemon is not responding anymore. */
goto error;
}
}
}
-error_close:
if (relayd_created) {
free(relayd);
}