health_code_update();
- cds_list_del(&stream->send_node);
+ cds_list_del_init(&stream->send_node);
lttng_ust_ctl_destroy_stream(stream->ustream);
lttng_trace_chunk_put(stream->trace_chunk);
free(stream);
* global.
*/
stream->globally_visible = 1;
- cds_list_del(&stream->send_node);
+ cds_list_del_init(&stream->send_node);
ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
int ret;
struct lttng_consumer_channel *metadata;
+ ASSERT_RCU_READ_LOCKED();
+
DBG("UST consumer setup metadata key %" PRIu64, key);
metadata = consumer_find_channel(key);
* will make sure to clean that list.
*/
consumer_stream_destroy(metadata->metadata_stream, NULL);
- cds_list_del(&metadata->metadata_stream->send_node);
metadata->metadata_stream = NULL;
send_streams_error:
error_no_stream:
LTTNG_ASSERT(path);
LTTNG_ASSERT(ctx);
+ ASSERT_RCU_READ_LOCKED();
DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
key, path);
* new metadata stream.
*/
consumer_stream_destroy(metadata_stream, NULL);
- cds_list_del(&metadata_stream->send_node);
metadata_channel->metadata_stream = NULL;
error:
LTTNG_ASSERT(path);
LTTNG_ASSERT(ctx);
+ ASSERT_RCU_READ_LOCKED();
rcu_read_lock();
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
+ uint32_t major = msg.u.relayd_sock.major;
+ uint32_t minor = msg.u.relayd_sock.minor;
+ enum lttcomm_sock_proto protocol =
+ (enum lttcomm_sock_proto) msg.u.relayd_sock
+ .relayd_socket_protocol;
+
/* Session daemon status message are handled in the following call. */
consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
- msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
- msg.u.relayd_sock.relayd_session_id);
+ msg.u.relayd_sock.type, ctx, sock,
+ consumer_sockpoll, msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id, major,
+ minor, protocol);
goto end_nosignal;
}
case LTTNG_CONSUMER_DESTROY_RELAYD:
*/
rotate_channel = lttng_consumer_rotate_channel(
found_channel, key,
- msg.u.rotate_channel.relayd_id,
- msg.u.rotate_channel.metadata, ctx);
+ msg.u.rotate_channel.relayd_id);
if (rotate_channel < 0) {
ERR("Rotate channel failed");
ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
ret_rotate_read_streams =
lttng_consumer_rotate_ready_streams(
- found_channel, key,
- ctx);
+ found_channel, key);
if (ret_rotate_read_streams < 0) {
ERR("Rotate channel failed");
}
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(metadata_stream);
+ ASSERT_RCU_READ_LOCKED();
metadata_channel = metadata_stream->chan;
pthread_mutex_unlock(&metadata_stream->lock);
}
static int put_next_subbuffer(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+ struct stream_subbuffer *subbuffer __attribute__((unused)))
{
const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream);
}
static int signal_metadata(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx __attribute__((unused)))
{
ASSERT_LOCKED(stream->metadata_rdv_lock);
return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;