* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; only version 2
- * of the License.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
*
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
*
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _GNU_SOURCE
#include <assert.h>
-#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
{
struct lttng_consumer_stream *stream;
+ rcu_read_lock();
stream = consumer_find_stream(key);
- if (stream)
+ if (stream) {
stream->key = -1;
+ /*
+ * We don't want the lookup to match, but we still need
+ * to iterate on this stream when iterating over the hash table. Just
+ * change the node key.
+ */
+ stream->node.key = -1;
+ }
+ rcu_read_unlock();
}
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
struct lttng_consumer_channel *channel;
+ rcu_read_lock();
channel = consumer_find_channel(key);
- if (channel)
+ if (channel) {
channel->key = -1;
+ /*
+ * We don't want the lookup to match, but we still need
+ * to iterate on this channel when iterating over the hash table. Just
+ * change the node key.
+ */
+ channel->node.key = -1;
+ }
+ rcu_read_unlock();
+}
+
+static
+void consumer_free_stream(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_stream *stream =
+ caa_container_of(node, struct lttng_consumer_stream, node);
+
+ free(stream);
}
/*
}
rcu_read_lock();
-
- /* Get stream node from hash table */
- lttng_ht_lookup(consumer_data.stream_ht,
- (void *)((unsigned long) stream->key), &iter);
- /* Remove stream node from hash table */
+ iter.iter.node = &stream->node.node;
ret = lttng_ht_del(consumer_data.stream_ht, &iter);
assert(!ret);
goto end;
}
if (stream->out_fd >= 0) {
- close(stream->out_fd);
+ ret = close(stream->out_fd);
+ if (ret) {
+ PERROR("close");
+ }
}
if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
- close(stream->wait_fd);
+ ret = close(stream->wait_fd);
+ if (ret) {
+ PERROR("close");
+ }
}
if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
- close(stream->shm_fd);
+ ret = close(stream->shm_fd);
+ if (ret) {
+ PERROR("close");
+ }
}
if (!--stream->chan->refcount)
free_chan = stream->chan;
- free(stream);
+
+ call_rcu(&stream->node.head, consumer_free_stream);
end:
consumer_data.need_update = 1;
pthread_mutex_unlock(&consumer_data.lock);
consumer_del_channel(free_chan);
}
-static void consumer_del_stream_rcu(struct rcu_head *head)
-{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
-
- consumer_del_stream(stream);
-}
-
struct lttng_consumer_stream *consumer_allocate_stream(
int channel_key, int stream_key,
int shm_fd, int wait_fd,
int consumer_add_stream(struct lttng_consumer_stream *stream)
{
int ret = 0;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
consumer_steal_stream_key(stream->key);
rcu_read_lock();
+
+ lttng_ht_lookup(consumer_data.stream_ht,
+ (void *)((unsigned long) stream->key), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ rcu_read_unlock();
+ /* Stream already exist. Ignore the insertion */
+ goto end;
+ }
+
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
rcu_read_unlock();
consumer_data.stream_count++;
end:
pthread_mutex_unlock(&consumer_data.lock);
+
return ret;
}
pthread_mutex_unlock(&consumer_data.lock);
}
+static
+void consumer_free_channel(struct rcu_head *head)
+{
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_channel *channel =
+ caa_container_of(node, struct lttng_consumer_channel, node);
+
+ free(channel);
+}
+
/*
* Remove a channel from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
}
rcu_read_lock();
-
- lttng_ht_lookup(consumer_data.channel_ht,
- (void *)((unsigned long) channel->key), &iter);
+ iter.iter.node = &channel->node.node;
ret = lttng_ht_del(consumer_data.channel_ht, &iter);
assert(!ret);
-
rcu_read_unlock();
if (channel->mmap_base != NULL) {
}
}
if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
- close(channel->wait_fd);
+ ret = close(channel->wait_fd);
+ if (ret) {
+ PERROR("close");
+ }
}
if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
- close(channel->shm_fd);
+ ret = close(channel->shm_fd);
+ if (ret) {
+ PERROR("close");
+ }
}
- free(channel);
+
+ call_rcu(&channel->node.head, consumer_free_channel);
end:
pthread_mutex_unlock(&consumer_data.lock);
}
-static void consumer_del_channel_rcu(struct rcu_head *head)
-{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_channel *channel=
- caa_container_of(node, struct lttng_consumer_channel, node);
-
- consumer_del_channel(channel);
-}
-
struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
int shm_fd, int wait_fd,
*/
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
+
pthread_mutex_lock(&consumer_data.lock);
/* Steal channel identifier, for UST */
consumer_steal_channel_key(channel->key);
rcu_read_lock();
+
+ lttng_ht_lookup(consumer_data.channel_ht,
+ (void *)((unsigned long) channel->key), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node != NULL) {
+ /* Channel already exist. Ignore the insertion */
+ goto end;
+ }
+
lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+
+end:
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
+
return 0;
}
struct lttng_consumer_stream *stream;
DBG("Updating poll fd array");
+ rcu_read_lock();
cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
node.node) {
if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM) {
local_stream[i] = stream;
i++;
}
+ rcu_read_unlock();
/*
* Insert the consumer_poll_pipe at the end of the array and don't
*/
void lttng_consumer_cleanup(void)
{
- int ret;
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
*/
cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
node) {
- ret = lttng_ht_del(consumer_data.stream_ht, &iter);
- assert(!ret);
- call_rcu(&node->head, consumer_del_stream_rcu);
+ struct lttng_consumer_stream *stream =
+ caa_container_of(node, struct lttng_consumer_stream, node);
+ consumer_del_stream(stream);
}
cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
node) {
- ret = lttng_ht_del(consumer_data.channel_ht, &iter);
- assert(!ret);
- call_rcu(&node->head, consumer_del_channel_rcu);
+ struct lttng_consumer_channel *channel =
+ caa_container_of(node, struct lttng_consumer_channel, node);
+ consumer_del_channel(channel);
}
rcu_read_unlock();
if (orig_offset < stream->chan->max_sb_size) {
return;
}
- sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
+ lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
stream->chan->max_sb_size,
SYNC_FILE_RANGE_WAIT_BEFORE
| SYNC_FILE_RANGE_WRITE
int err;
err = close(ctx->consumer_should_quit[i]);
- assert(!err);
+ if (err) {
+ PERROR("close");
+ }
}
error_quit_pipe:
for (i = 0; i < 2; i++) {
int err;
err = close(ctx->consumer_poll_pipe[i]);
- assert(!err);
+ if (err) {
+ PERROR("close");
+ }
}
error_poll_pipe:
free(ctx);
*/
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx)
{
- close(ctx->consumer_error_socket);
- close(ctx->consumer_thread_pipe[0]);
- close(ctx->consumer_thread_pipe[1]);
- close(ctx->consumer_poll_pipe[0]);
- close(ctx->consumer_poll_pipe[1]);
- close(ctx->consumer_should_quit[0]);
- close(ctx->consumer_should_quit[1]);
+ int ret;
+
+ ret = close(ctx->consumer_error_socket);
+ if (ret) {
+ PERROR("close");
+ }
+ ret = close(ctx->consumer_thread_pipe[0]);
+ if (ret) {
+ PERROR("close");
+ }
+ ret = close(ctx->consumer_thread_pipe[1]);
+ if (ret) {
+ PERROR("close");
+ }
+ ret = close(ctx->consumer_poll_pipe[0]);
+ if (ret) {
+ PERROR("close");
+ }
+ ret = close(ctx->consumer_poll_pipe[1]);
+ if (ret) {
+ PERROR("close");
+ }
+ ret = close(ctx->consumer_should_quit[0]);
+ if (ret) {
+ PERROR("close");
+ }
+ ret = close(ctx->consumer_should_quit[1]);
+ if (ret) {
+ PERROR("close");
+ }
unlink(ctx->consumer_command_sock_path);
free(ctx);
}
ERR("Unknown consumer_data type");
assert(0);
}
+
+ return 0;
}
/*
local_stream[i]->hangup_flush_done) {
ssize_t len;
- assert(!(pollfd[i].revents & POLLERR));
- assert(!(pollfd[i].revents & POLLNVAL));
DBG("Normal read on fd %d", pollfd[i].fd);
len = ctx->on_buffer_ready(local_stream[i], ctx);
/* it's ok to have an unavailable sub-buffer */
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- rcu_read_lock();
- consumer_del_stream_rcu(&local_stream[i]->node.head);
- rcu_read_unlock();
+ consumer_del_stream(local_stream[i]);
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- rcu_read_lock();
- consumer_del_stream_rcu(&local_stream[i]->node.head);
- rcu_read_unlock();
+ consumer_del_stream(local_stream[i]);
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->data_read) {
- rcu_read_lock();
- consumer_del_stream_rcu(&local_stream[i]->node.head);
- rcu_read_unlock();
+ consumer_del_stream(local_stream[i]);
num_hup++;
}
}