return ret;
}
-/*
- * Find a relayd and send the stream
- *
- * Returns 0 on success, < 0 on error
- */
-static int send_relayd_stream(struct lttng_consumer_stream *stream,
- char *path)
-{
- int ret = 0;
- const char *stream_path;
- struct consumer_relayd_sock_pair *relayd;
-
- assert(stream);
- assert(stream->net_seq_idx != -1ULL);
-
- if (path != NULL) {
- stream_path = path;
- } else {
- stream_path = stream->chan->pathname;
- }
-
- /* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- /* Add stream on the relayd */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_add_stream(&relayd->control_sock, stream->name,
- stream_path, &stream->relayd_stream_id,
- stream->chan->tracefile_size, stream->chan->tracefile_count);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- goto end;
- }
- uatomic_inc(&relayd->refcount);
- } else {
- ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
- stream->key, stream->net_seq_idx);
- ret = -1;
- goto end;
- }
-
- DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
- stream->name, stream->key, stream->net_seq_idx);
-
-end:
- rcu_read_unlock();
- return ret;
-}
-
-/*
- * Find a relayd and close the stream
- */
-static void close_relayd_stream(struct lttng_consumer_stream *stream)
-{
- struct consumer_relayd_sock_pair *relayd;
-
- /* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd) {
- consumer_stream_relayd_close(stream, relayd);
- }
- rcu_read_unlock();
-}
-
/*
* Take a snapshot of all the stream of a channel
*
goto end;
}
- cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head,
- no_monitor_node) {
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
/*
* Lock stream because we are about to change its state.
*/
stream->net_seq_idx = relayd_id;
channel->relayd_id = relayd_id;
if (relayd_id != (uint64_t) -1ULL) {
- ret = send_relayd_stream(stream, path);
+ ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
ERR("sending stream to relayd");
goto end_unlock;
}
} else {
ret = utils_create_stream_file(path, stream->name,
- stream->chan->tracefile_size, stream->tracefile_count_current,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current,
stream->uid, stream->gid);
if (ret < 0) {
ERR("utils_create_stream_file");
ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
- ERR("Failed to flush kernel metadata stream");
+ ERR("Failed to flush kernel stream");
goto end_unlock;
}
}
if (use_relayd) {
- ret = send_relayd_stream(metadata_stream, path);
+ ret = consumer_send_relayd_stream(metadata_stream, path);
if (ret < 0) {
goto error;
}
ret = 0;
+ cds_list_del(&metadata_stream->send_node);
+ consumer_stream_destroy(metadata_stream, NULL);
+ metadata_channel->metadata_stream = NULL;
error:
rcu_read_unlock();
return ret;
DBG("Kernel consumer add stream %s in no monitor mode with "
"relayd id %" PRIu64, new_stream->name,
new_stream->net_seq_idx);
- cds_list_add(&new_stream->no_monitor_node,
- &channel->stream_no_monitor_list.head);
+ cds_list_add(&new_stream->send_node, &channel->streams.head);
break;
}
- ret = send_relayd_stream(new_stream, NULL);
+ ret = consumer_send_relayd_stream(new_stream, NULL);
if (ret < 0) {
consumer_stream_free(new_stream);
goto end_nosignal;
return 0;
error_close_fd:
- {
+ if (stream->out_fd >= 0) {
int err;
err = close(stream->out_fd);
assert(!err);
+ stream->out_fd = -1;
}
error:
return ret;