int ret;
struct lttng_ht_iter iter;
+ if (relayd == NULL) {
+ return;
+ }
+
DBG("Consumer destroy and close relayd socket pair");
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
- assert(!ret);
+ if (ret != 0) {
+ /* We assume the relayd was already destroyed */
+ return;
+ }
/* Close all sockets */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
}
/* Check and cleanup relayd */
+ rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
- /* We are about to modify the relayd refcount */
- rcu_read_lock();
- if (!--relayd->refcount) {
- /* Refcount of the relayd struct is 0, destroy it */
+ uatomic_dec(&relayd->refcount);
+ assert(uatomic_read(&relayd->refcount) >= 0);
+
+ ret = relayd_send_close_stream(&relayd->control_sock,
+ stream->relayd_stream_id,
+ stream->next_net_seq_num - 1);
+ if (ret < 0) {
+ ERR("Unable to close stream on the relayd. Continuing");
+ /* Continue here. There is nothing we can do for the relayd.*/
+ }
+
+ /* Both conditions are met, we destroy the relayd. */
+ if (uatomic_read(&relayd->refcount) == 0 &&
+ uatomic_read(&relayd->destroy_flag)) {
consumer_destroy_relayd(relayd);
}
- rcu_read_unlock();
}
+ rcu_read_unlock();
if (!--stream->chan->refcount) {
free_chan = stream->chan;
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
- rcu_read_lock();
+ rcu_read_lock();
lttng_ht_lookup(consumer_data.stream_ht,
(void *)((unsigned long) stream->key), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
}
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
- rcu_read_unlock();
/* Check and cleanup relayd */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
- /* We are about to modify the relayd refcount */
- rcu_read_lock();
- relayd->refcount++;
- rcu_read_unlock();
+ uatomic_inc(&relayd->refcount);
}
+ rcu_read_unlock();
/* Update consumer data */
consumer_data.stream_count++;
obj->net_seq_idx = net_seq_idx;
obj->refcount = 0;
+ obj->destroy_flag = 0;
lttng_ht_node_init_ulong(&obj->node, obj->net_seq_idx);
pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
* Find a relayd socket pair in the global consumer data.
*
* Return the object if found else NULL.
+ * RCU read-side lock must be held across this call and while using the
+ * returned object.
*/
struct consumer_relayd_sock_pair *consumer_find_relayd(int key)
{
goto error;
}
- rcu_read_lock();
-
lttng_ht_lookup(consumer_data.relayd_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
}
- rcu_read_unlock();
-
error:
return relayd;
}
/* Reset data header */
memset(&data_hdr, 0, sizeof(data_hdr));
+ rcu_read_lock();
/* Get relayd reference of the stream. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
/* Set header with stream information */
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
/* Other fields are zeroed previously */
ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
}
error:
+ rcu_read_unlock();
return outfd;
}
{
int ret;
consumer_quit = 1;
- ret = write(ctx->consumer_should_quit[1], "4", 1);
+ do {
+ ret = write(ctx->consumer_should_quit[1], "4", 1);
+ } while (ret < 0 && errno == EINTR);
if (ret < 0) {
perror("write consumer quit");
}
*/
do {
ret = write(ctx->consumer_poll_pipe[1], "", 1);
- } while (ret == -1UL && errno == EINTR);
+ } while (ret < 0 && errno == EINTR);
rcu_unregister_thread();
return NULL;
}