pthread_mutex_lock(&consumer_data.lock);
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ /*
+ * Once a stream is added to this list, the buffers were created so
+ * we have a guarantee that this call will succeed.
+ */
+ consumer_stream_destroy(stream, NULL);
+ }
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- /* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
- send_node) {
- cds_list_del(&stream->send_node);
- lttng_ustconsumer_del_stream(stream);
- free(stream);
- }
lttng_ustconsumer_del_channel(channel);
break;
default:
goto end;
}
- /* Empty no monitor streams list. */
- if (!channel->monitor) {
- struct lttng_consumer_stream *stream, *stmp;
-
- /*
- * So, these streams are not visible to any data thread. This is why we
- * close and free them because they were never added to any data
- * structure apart from this one.
- */
- cds_list_for_each_entry_safe(stream, stmp,
- &channel->stream_no_monitor_list.head, no_monitor_node) {
- cds_list_del(&stream->no_monitor_node);
- /* Close everything in that stream. */
- consumer_stream_close(stream);
- /* Free the ressource. */
- consumer_stream_free(stream);
- }
- }
-
rcu_read_lock();
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
uint64_t session_id,
int cpu,
int *alloc_ret,
- enum consumer_channel_type type)
+ enum consumer_channel_type type,
+ unsigned int monitor)
{
int ret;
struct lttng_consumer_stream *stream;
stream->gid = gid;
stream->net_seq_idx = relayd_id;
stream->session_id = session_id;
+ stream->monitor = monitor;
pthread_mutex_init(&stream->lock, NULL);
/* If channel is the metadata, flag this stream as metadata. */
struct lttng_ht *ht)
{
int ret = 0;
- struct consumer_relayd_sock_pair *relayd;
assert(stream);
assert(ht);
*/
lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
- /* Check and cleanup relayd */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_inc(&relayd->refcount);
- }
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
return relayd;
}
+/*
+ * Find a relayd and send the stream
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_stream(struct lttng_consumer_stream *stream,
+ char *path)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(stream);
+ assert(stream->net_seq_idx != -1ULL);
+ assert(path);
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd != NULL) {
+ /* Add stream on the relayd */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_add_stream(&relayd->control_sock, stream->name,
+ path, &stream->relayd_stream_id,
+ stream->chan->tracefile_size, stream->chan->tracefile_count);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto end;
+ }
+ uatomic_inc(&relayd->refcount);
+ stream->sent_to_relayd = 1;
+ } else {
+ ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
+ stream->key, stream->net_seq_idx);
+ ret = -1;
+ goto end;
+ }
+
+ DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+ stream->name, stream->key, stream->net_seq_idx);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Find a relayd and close the stream
+ */
+void close_relayd_stream(struct lttng_consumer_stream *stream)
+{
+ struct consumer_relayd_sock_pair *relayd;
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(stream->net_seq_idx);
+ if (relayd) {
+ consumer_stream_relayd_close(stream, relayd);
+ }
+ rcu_read_unlock();
+}
+
/*
* Handle stream for relayd transmission if the stream applies for network
* streaming where the net sequence index is set.
channel->wait_fd = -1;
CDS_INIT_LIST_HEAD(&channel->streams.head);
- CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head);
DBG("Allocated channel (key %" PRIu64 ")", channel->key)
pthread_mutex_unlock(&consumer_data.lock);
if (!ret && channel->wait_fd != -1 &&
- channel->metadata_stream == NULL) {
+ channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
}
return ret;
struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(int stream_key, uint32_t state))
+ int (*update_stream)(uint64_t stream_key, uint32_t state))
{
int ret;
struct lttng_consumer_local_data *ctx;
struct lttng_ht *ht)
{
int ret = 0;
- struct consumer_relayd_sock_pair *relayd;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
node = lttng_ht_iter_get_node_u64(&iter);
assert(!node);
- /* Find relayd and, if one is found, increment refcount. */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_inc(&relayd->refcount);
- }
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
goto end;
}
- local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
+ local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
+ if (local_stream == NULL) {
+ PERROR("local_stream malloc");
+ goto end;
+ }
while (1) {
high_prio = 0;
lttng_poll_del(&events, chan->wait_fd);
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
- assert(cds_list_empty(&chan->streams.head));
consumer_close_channel_streams(chan);
/* Release our own refcount */
int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
+ struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
- ret_code = LTTCOMM_CONSUMERD_ENOMEM;
ret = -ENOMEM;
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
+ goto error;
} else {
- relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd->sessiond_session_id = sessiond_id;
relayd_created = 1;
}
}
/* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0 || ret_code != LTTNG_OK) {
+ ret = consumer_send_status_msg(sock, LTTNG_OK);
+ if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
- goto error;
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+ goto error_nosignal;
}
/* Poll on consumer socket. */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
ret = -EINTR;
- goto error;
+ goto error_nosignal;
}
/* Get relayd socket from session daemon */
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
- ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
ret = -1;
fd = -1; /* Just in case it gets set with an invalid value. */
* issue when reaching the fd limit.
*/
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
-
- /*
- * This code path MUST continue to the consumer send status message so
- * we can send the error to the thread expecting a reply. The above
- * call will make everything stop.
- */
- }
-
- /* We have the fds without error. Send status back. */
- ret = consumer_send_status_msg(sock, ret_code);
- if (ret < 0 || ret_code != LTTNG_OK) {
- /* Somehow, the session daemon is not responding anymore. */
+ ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD;
goto error;
}
ret = lttcomm_create_sock(&relayd->control_sock.sock);
/* Handle create_sock error. */
if (ret < 0) {
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
}
/*
*/
(void) relayd_close(&relayd->control_sock);
(void) relayd_close(&relayd->data_sock);
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
goto error;
}
ret = lttcomm_create_sock(&relayd->data_sock.sock);
/* Handle create_sock error. */
if (ret < 0) {
+ ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
}
/*
default:
ERR("Unknown relayd socket type (%d)", sock_type);
ret = -1;
+ ret_code = LTTCOMM_CONSUMERD_FATAL;
goto error;
}
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
+ /* We successfully added the socket. Send status back. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+ goto error_nosignal;
+ }
+
/*
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
return 0;
error:
+ if (consumer_send_status_msg(sock, ret_code) < 0) {
+ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
+ }
+
+error_nosignal:
/* Close received socket if valid. */
if (fd >= 0) {
if (close(fd)) {