projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Add hash table argument to helper functions
[lttng-tools.git]
/
src
/
common
/
consumer.c
diff --git
a/src/common/consumer.c
b/src/common/consumer.c
index a9e4dee661b1d1ba13da01a2db4d74ee16f68e8e..161bf7e324ba38abcc6976f91f219c9f2257d4af 100644
(file)
--- a/
src/common/consumer.c
+++ b/
src/common/consumer.c
@@
-62,20
+62,23
@@
volatile int consumer_quit = 0;
* Find a stream. The consumer_data.lock must be locked during this
* call.
*/
* Find a stream. The consumer_data.lock must be locked during this
* call.
*/
-static struct lttng_consumer_stream *consumer_find_stream(int key)
+static struct lttng_consumer_stream *consumer_find_stream(int key,
+ struct lttng_ht *ht)
{
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
struct lttng_consumer_stream *stream = NULL;
{
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
struct lttng_consumer_stream *stream = NULL;
+ assert(ht);
+
/* Negative keys are lookup failures */
/* Negative keys are lookup failures */
- if (key < 0)
+ if (key < 0)
{
return NULL;
return NULL;
+ }
rcu_read_lock();
rcu_read_lock();
- lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
- &iter);
+ lttng_ht_lookup(ht, (void *)((unsigned long) key), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
stream = caa_container_of(node, struct lttng_consumer_stream, node);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
stream = caa_container_of(node, struct lttng_consumer_stream, node);
@@
-86,12
+89,12
@@
static struct lttng_consumer_stream *consumer_find_stream(int key)
return stream;
}
return stream;
}
-static void consumer_steal_stream_key(int key)
+static void consumer_steal_stream_key(int key
, struct lttng_ht *ht
)
{
struct lttng_consumer_stream *stream;
rcu_read_lock();
{
struct lttng_consumer_stream *stream;
rcu_read_lock();
- stream = consumer_find_stream(key);
+ stream = consumer_find_stream(key
, ht
);
if (stream) {
stream->key = -1;
/*
if (stream) {
stream->key = -1;
/*
@@
-111,8
+114,9
@@
static struct lttng_consumer_channel *consumer_find_channel(int key)
struct lttng_consumer_channel *channel = NULL;
/* Negative keys are lookup failures */
struct lttng_consumer_channel *channel = NULL;
/* Negative keys are lookup failures */
- if (key < 0)
+ if (key < 0)
{
return NULL;
return NULL;
+ }
rcu_read_lock();
rcu_read_lock();
@@
-242,7
+246,7
@@
void consumer_del_stream(struct lttng_consumer_stream *stream)
if (stream->mmap_base != NULL) {
ret = munmap(stream->mmap_base, stream->mmap_len);
if (ret != 0) {
if (stream->mmap_base != NULL) {
ret = munmap(stream->mmap_base, stream->mmap_len);
if (ret != 0) {
-
perror
("munmap");
+
PERROR
("munmap");
}
}
break;
}
}
break;
@@
-353,13
+357,19
@@
struct lttng_consumer_stream *consumer_allocate_stream(
stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
-
perror
("malloc struct lttng_consumer_stream");
+
PERROR
("malloc struct lttng_consumer_stream");
*alloc_ret = -ENOMEM;
*alloc_ret = -ENOMEM;
-
return NULL
;
+
goto end
;
}
}
+
+ /*
+ * Get stream's channel reference. Needed when adding the stream to the
+ * global hash table.
+ */
stream->chan = consumer_find_channel(channel_key);
if (!stream->chan) {
*alloc_ret = -ENOENT;
stream->chan = consumer_find_channel(channel_key);
if (!stream->chan) {
*alloc_ret = -ENOENT;
+ ERR("Unable to find channel for stream %d", stream_key);
goto error;
}
stream->chan->refcount++;
goto error;
}
stream->chan->refcount++;
@@
-419,6
+429,7
@@
struct lttng_consumer_stream *consumer_allocate_stream(
error:
free(stream);
error:
free(stream);
+end:
return NULL;
}
return NULL;
}
@@
-434,7
+445,7
@@
int consumer_add_stream(struct lttng_consumer_stream *stream)
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
pthread_mutex_lock(&consumer_data.lock);
/* Steal stream identifier, for UST */
- consumer_steal_stream_key(stream->key);
+ consumer_steal_stream_key(stream->key
, consumer_data.stream_ht
);
rcu_read_lock();
lttng_ht_lookup(consumer_data.stream_ht,
rcu_read_lock();
lttng_ht_lookup(consumer_data.stream_ht,
@@
-611,7
+622,7
@@
void consumer_change_stream_state(int stream_key,
struct lttng_consumer_stream *stream;
pthread_mutex_lock(&consumer_data.lock);
struct lttng_consumer_stream *stream;
pthread_mutex_lock(&consumer_data.lock);
- stream = consumer_find_stream(stream_key);
+ stream = consumer_find_stream(stream_key
, consumer_data.stream_ht
);
if (stream) {
stream->state = state;
}
if (stream) {
stream->state = state;
}
@@
-663,7
+674,7
@@
void consumer_del_channel(struct lttng_consumer_channel *channel)
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
-
perror
("munmap");
+
PERROR
("munmap");
}
}
if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
}
}
if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
@@
-696,7
+707,7
@@
struct lttng_consumer_channel *consumer_allocate_channel(
channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
-
perror
("malloc struct lttng_consumer_channel");
+
PERROR
("malloc struct lttng_consumer_channel");
goto end;
}
channel->key = channel_key;
goto end;
}
channel->key = channel_key;
@@
-820,7
+831,7
@@
restart:
if (errno == EINTR) {
goto restart;
}
if (errno == EINTR) {
goto restart;
}
-
perror
("Poll error");
+
PERROR
("Poll error");
goto exit;
}
if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
goto exit;
}
if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
@@
-912,7
+923,7
@@
void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
ret = write(ctx->consumer_should_quit[1], "4", 1);
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
ret = write(ctx->consumer_should_quit[1], "4", 1);
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
-
perror
("write consumer quit");
+
PERROR
("write consumer quit");
}
}
}
}
@@
-984,7
+995,7
@@
struct lttng_consumer_local_data *lttng_consumer_create(
ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
if (ctx == NULL) {
ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
if (ctx == NULL) {
-
perror
("allocating context");
+
PERROR
("allocating context");
goto error;
}
goto error;
}
@@
-997,33
+1008,33
@@
struct lttng_consumer_local_data *lttng_consumer_create(
ret = pipe(ctx->consumer_poll_pipe);
if (ret < 0) {
ret = pipe(ctx->consumer_poll_pipe);
if (ret < 0) {
-
perror
("Error creating poll pipe");
+
PERROR
("Error creating poll pipe");
goto error_poll_pipe;
}
/* set read end of the pipe to non-blocking */
ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
if (ret < 0) {
goto error_poll_pipe;
}
/* set read end of the pipe to non-blocking */
ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
if (ret < 0) {
-
perror
("fcntl O_NONBLOCK");
+
PERROR
("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
/* set write end of the pipe to non-blocking */
ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
if (ret < 0) {
goto error_poll_fcntl;
}
/* set write end of the pipe to non-blocking */
ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
if (ret < 0) {
-
perror
("fcntl O_NONBLOCK");
+
PERROR
("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
goto error_poll_fcntl;
}
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
-
perror
("Error creating recv pipe");
+
PERROR
("Error creating recv pipe");
goto error_quit_pipe;
}
ret = pipe(ctx->consumer_thread_pipe);
if (ret < 0) {
goto error_quit_pipe;
}
ret = pipe(ctx->consumer_thread_pipe);
if (ret < 0) {
-
perror
("Error creating thread pipe");
+
PERROR
("Error creating thread pipe");
goto error_thread_pipe;
}
goto error_thread_pipe;
}
@@
-1507,9
+1518,7
@@
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
}
/*
}
/*
- * Iterate over all stream element of the hashtable and free them. This is race
- * free since the hashtable received MUST be in a race free synchronization
- * state. It's the caller responsability to make sure of that.
+ * Iterate over all streams of the hashtable and free them properly.
*/
static void destroy_stream_ht(struct lttng_ht *ht)
{
*/
static void destroy_stream_ht(struct lttng_ht *ht)
{
@@
-1526,7
+1535,7
@@
static void destroy_stream_ht(struct lttng_ht *ht)
ret = lttng_ht_del(ht, &iter);
assert(!ret);
ret = lttng_ht_del(ht, &iter);
assert(!ret);
-
free(
stream);
+
call_rcu(&stream->node.head, consumer_free_
stream);
}
rcu_read_unlock();
}
rcu_read_unlock();
@@
-1626,7
+1635,7
@@
static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
consumer_del_channel(stream->chan);
}
consumer_del_channel(stream->chan);
}
-
free(
stream);
+
call_rcu(&stream->node.head, consumer_free_
stream);
}
/*
}
/*
@@
-1723,20
+1732,13
@@
restart:
close(ctx->consumer_metadata_pipe[0]);
continue;
} else if (revents & LPOLLIN) {
close(ctx->consumer_metadata_pipe[0]);
continue;
} else if (revents & LPOLLIN) {
- stream = zmalloc(sizeof(struct lttng_consumer_stream));
- if (stream == NULL) {
- PERROR("zmalloc metadata consumer stream");
- goto error;
- }
-
do {
do {
- /* Get the stream and add it to the local hash table */
- ret = read(pollfd, stream,
- sizeof(struct lttng_consumer_stream));
+ /* Get the stream pointer received */
+ ret = read(pollfd, &stream, sizeof(stream));
} while (ret < 0 && errno == EINTR);
} while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret < sizeof(struct lttng_consumer_stream)) {
+ if (ret < 0 ||
+ ret < sizeof(struct lttng_consumer_stream *)) {
PERROR("read metadata stream");
PERROR("read metadata stream");
- free(stream);
/*
* Let's continue here and hope we can still work
* without stopping the consumer. XXX: Should we?
/*
* Let's continue here and hope we can still work
* without stopping the consumer. XXX: Should we?
@@
-1889,7
+1891,7
@@
void *lttng_consumer_thread_poll_fds(void *data)
/* allocate for all fds + 1 for the consumer_poll_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
/* allocate for all fds + 1 for the consumer_poll_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
-
perror
("pollfd malloc");
+
PERROR
("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
@@
-1898,7
+1900,7
@@
void *lttng_consumer_thread_poll_fds(void *data)
local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
-
perror
("local_stream malloc");
+
PERROR
("local_stream malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
@@
-1930,7
+1932,7
@@
void *lttng_consumer_thread_poll_fds(void *data)
if (errno == EINTR) {
goto restart;
}
if (errno == EINTR) {
goto restart;
}
-
perror
("Poll error");
+
PERROR
("Poll error");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
goto end;
} else if (num_rdy == 0) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
goto end;
} else if (num_rdy == 0) {
@@
-2102,7
+2104,7
@@
void *lttng_consumer_thread_receive_fds(void *data)
ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
if (ret < 0) {
ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
if (ret < 0) {
-
perror
("fcntl O_NONBLOCK");
+
PERROR
("fcntl O_NONBLOCK");
goto end;
}
goto end;
}
@@
-2125,7
+2127,7
@@
void *lttng_consumer_thread_receive_fds(void *data)
}
ret = fcntl(sock, F_SETFL, O_NONBLOCK);
if (ret < 0) {
}
ret = fcntl(sock, F_SETFL, O_NONBLOCK);
if (ret < 0) {
-
perror
("fcntl O_NONBLOCK");
+
PERROR
("fcntl O_NONBLOCK");
goto end;
}
goto end;
}
This page took
0.027858 seconds
and
4
git commands to generate.