/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
rcu_read_lock();
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
rcu_read_lock();
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
iter.iter.node = &channel->channels_by_session_id_ht_node.node;
ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht,
&iter);
iter.iter.node = &channel->channels_by_session_id_ht_node.node;
ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht,
&iter);
/* Set destroy flag for this object */
uatomic_set(&relayd->destroy_flag, 1);
/* Set destroy flag for this object */
uatomic_set(&relayd->destroy_flag, 1);
return lttng_ustconsumer_get_produced_snapshot(stream, pos);
default:
ERR("Unknown consumer_data type");
return lttng_ustconsumer_get_produced_snapshot(stream, pos);
default:
ERR("Unknown consumer_data type");
return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
default:
ERR("Unknown consumer_data type");
return lttng_ustconsumer_get_consumed_snapshot(stream, pos);
default:
ERR("Unknown consumer_data type");
return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
default:
ERR("Unknown consumer_data type");
return lttng_ustconsumer_recv_cmd(ctx, sock, consumer_sockpoll);
default:
ERR("Unknown consumer_data type");
/*
* This call should NEVER receive regular stream. It must always be
* metadata stream and this is crucial for data structure synchronization.
*/
/*
* This call should NEVER receive regular stream. It must always be
* metadata stream and this is crucial for data structure synchronization.
*/
*/
lttng_ht_lookup(ht, &stream->key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
*/
lttng_ht_lookup(ht, &stream->key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
}
node = lttng_ht_iter_get_node_u64(&iter);
lttng_ht_lookup(metadata_ht, &tmp_id, &iter);
}
node = lttng_ht_iter_get_node_u64(&iter);
if (revents & (LPOLLIN | LPOLLPRI)) {
/* Get the data out of the metadata file descriptor */
DBG("Metadata available on fd %d", pollfd);
if (revents & (LPOLLIN | LPOLLPRI)) {
/* Get the data out of the metadata file descriptor */
DBG("Metadata available on fd %d", pollfd);
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
ret = lttng_ht_del(ht, &iter);
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, channel, wait_fd_node.node) {
ret = lttng_ht_del(ht, &iter);
lttng_poll_del(&events, chan->wait_fd);
iter.iter.node = &chan->wait_fd_node.node;
ret = lttng_ht_del(channel_ht, &iter);
lttng_poll_del(&events, chan->wait_fd);
iter.iter.node = &chan->wait_fd_node.node;
ret = lttng_ht_del(channel_ht, &iter);
lttng_ht_lookup(channel_ht, &tmp_id, &iter);
}
node = lttng_ht_iter_get_node_u64(&iter);
lttng_ht_lookup(channel_ht, &tmp_id, &iter);
}
node = lttng_ht_iter_get_node_u64(&iter);
lttng_poll_del(&events, chan->wait_fd);
ret = lttng_ht_del(channel_ht, &iter);
lttng_poll_del(&events, chan->wait_fd);
ret = lttng_ht_del(channel_ht, &iter);
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct consumer_relayd_sock_pair *relayd = NULL;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct consumer_relayd_sock_pair *relayd = NULL;
DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
start_pos -= max_sb_size * nb_packets_per_stream;
if ((long) (start_pos - consumed_pos) < 0) {
return consumed_pos; /* Grab everything */
start_pos -= max_sb_size * nb_packets_per_stream;
if ((long) (start_pos - consumed_pos) < 0) {
return consumed_pos; /* Grab everything */
chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
&next_chunk_id);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
&next_chunk_id);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
* Align produced position on the start-of-packet boundary of the first
* packet going into the next trace chunk.
*/
* Align produced position on the start-of-packet boundary of the first
* packet going into the next trace chunk.
*/
if (consumed_pos == produced_pos) {
DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
stream->key, produced_pos, consumed_pos);
if (consumed_pos == produced_pos) {
DBG("Set rotate ready for stream %" PRIu64 " produced = %lu consumed = %lu",
stream->key, produced_pos, consumed_pos);
ret = time_to_iso8601_str(chunk_creation_timestamp,
creation_timestamp_buffer,
sizeof(creation_timestamp_buffer));
ret = time_to_iso8601_str(chunk_creation_timestamp,
creation_timestamp_buffer,
sizeof(creation_timestamp_buffer));