summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
e4421fe)
Signed-off-by: David Goulet <dgoulet@efficios.com>
/*
* At this time, this lock is used to ensure coherence between the count
* and number of element in the hash table. It's also a protection for
/*
* At this time, this lock is used to ensure coherence between the count
* and number of element in the hash table. It's also a protection for
- * concurrent read/write between threads. Although hash table used are
- * lockless data structure, appropriate RCU lock mechanism are not yet
- * implemented in the consumer.
+ * concurrent read/write between threads.
+ *
+ * XXX: We need to see if this lock is still needed with the lockless RCU
+ * hash tables.
if (key < 0)
return NULL;
if (key < 0)
return NULL;
lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
stream = caa_container_of(node, struct lttng_consumer_stream, node);
}
stream = caa_container_of(node, struct lttng_consumer_stream, node);
}
if (key < 0)
return NULL;
if (key < 0)
return NULL;
lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
/* Get stream node from hash table */
lttng_ht_lookup(consumer_data.stream_ht,
(void *)((unsigned long) stream->key), &iter);
/* Get stream node from hash table */
lttng_ht_lookup(consumer_data.stream_ht,
(void *)((unsigned long) stream->key), &iter);
ret = lttng_ht_del(consumer_data.stream_ht, &iter);
assert(!ret);
ret = lttng_ht_del(consumer_data.stream_ht, &iter);
assert(!ret);
if (consumer_data.stream_count <= 0) {
goto end;
}
if (consumer_data.stream_count <= 0) {
goto end;
}
consumer_del_channel(free_chan);
}
consumer_del_channel(free_chan);
}
+static void consumer_del_stream_rcu(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_stream *stream =
+ caa_container_of(node, struct lttng_consumer_stream, node);
+
+ consumer_del_stream(stream);
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(
int channel_key, int stream_key,
int shm_fd, int wait_fd,
struct lttng_consumer_stream *consumer_allocate_stream(
int channel_key, int stream_key,
int shm_fd, int wait_fd,
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
consumer_data.stream_count++;
consumer_data.need_update = 1;
consumer_data.stream_count++;
consumer_data.need_update = 1;
lttng_ht_lookup(consumer_data.channel_ht,
(void *)((unsigned long) channel->key), &iter);
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
assert(!ret);
lttng_ht_lookup(consumer_data.channel_ht,
(void *)((unsigned long) channel->key), &iter);
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
assert(!ret);
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
pthread_mutex_unlock(&consumer_data.lock);
}
pthread_mutex_unlock(&consumer_data.lock);
}
+static void consumer_del_channel_rcu(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_channel *channel=
+ caa_container_of(node, struct lttng_consumer_channel, node);
+
+ consumer_del_channel(channel);
+}
+
struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
int shm_fd, int wait_fd,
struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
int shm_fd, int wait_fd,
pthread_mutex_lock(&consumer_data.lock);
/* Steal channel identifier, for UST */
consumer_steal_channel_key(channel->key);
pthread_mutex_lock(&consumer_data.lock);
/* Steal channel identifier, for UST */
consumer_steal_channel_key(channel->key);
lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
pthread_mutex_unlock(&consumer_data.lock);
return 0;
}
pthread_mutex_unlock(&consumer_data.lock);
return 0;
}
{
int ret;
struct lttng_ht_iter iter;
{
int ret;
struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream;
- struct lttng_consumer_channel *channel;
+ struct lttng_ht_node_ulong *node;
+
+ rcu_read_lock();
- * close all outfd. Called when there are no more threads
- * running (after joining on the threads), no need to protect
- * list iteration with mutex.
+ * close all outfd. Called when there are no more threads running (after
+ * joining on the threads), no need to protect list iteration with mutex.
- cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
- node.node) {
+ cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
+ node) {
ret = lttng_ht_del(consumer_data.stream_ht, &iter);
assert(!ret);
ret = lttng_ht_del(consumer_data.stream_ht, &iter);
assert(!ret);
- consumer_del_stream(stream);
+ call_rcu(&node->head, consumer_del_stream_rcu);
- cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
- node.node) {
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
+ node) {
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
assert(!ret);
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
assert(!ret);
- consumer_del_channel(channel);
+ call_rcu(&node->head, consumer_del_channel_rcu);
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
- consumer_del_stream(local_stream[i]);
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
num_hup++;
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
num_hup++;
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
- consumer_del_stream(local_stream[i]);
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();
num_hup++;
} else if ((pollfd[i].revents & POLLHUP) &&
!(pollfd[i].revents & POLLIN)) {
num_hup++;
} else if ((pollfd[i].revents & POLLHUP) &&
!(pollfd[i].revents & POLLIN)) {
} else {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
}
} else {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
}
- consumer_del_stream(local_stream[i]);
+ rcu_read_lock();
+ consumer_del_stream_rcu(&local_stream[i]->node.head);
+ rcu_read_unlock();