shm fd is used as unique identifier for the channel when passing the
stream fds to the consumer. However, closing the fd reuses the same
identifier for the next round, thus getting the same consumer
channel/streams as the previous one. This causes multi-session tracing
to only work for the first session, not the following ones.
So instead of doing a channel lookup on add channel and a stream lookup
upon add stream, we "steal" the identifier. We still lookup the channel
identifier upon stream add though: this means the sessiond needs to keep
the channel shm fd open until it has finished sending all stream fds to
the consumer. For kernel consumer, the update operation still does a
stream id lookup.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
{
struct lttng_consumer_stream *iter;
{
struct lttng_consumer_stream *iter;
+ /* Negative keys are lookup failures */
+ if (key < 0)
+ return NULL;
cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
if (iter->key == key) {
DBG("Found stream key %d", key);
cds_list_for_each_entry(iter, &consumer_data.stream_list.head, list) {
if (iter->key == key) {
DBG("Found stream key %d", key);
+static void consumer_steal_stream_key(int key)
+{
+ struct lttng_consumer_stream *stream;
+
+ stream = consumer_find_stream(key);
+ if (stream)
+ stream->key = -1;
+}
+
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
struct lttng_consumer_channel *iter;
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
struct lttng_consumer_channel *iter;
+ /* Negative keys are lookup failures */
+ if (key < 0)
+ return NULL;
cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
if (iter->key == key) {
DBG("Found channel key %d", key);
cds_list_for_each_entry(iter, &consumer_data.channel_list.head, list) {
if (iter->key == key) {
DBG("Found channel key %d", key);
+static void consumer_steal_channel_key(int key)
+{
+ struct lttng_consumer_channel *channel;
+
+ channel = consumer_find_channel(key);
+ if (channel)
+ channel->key = -1;
+}
+
/*
* Remove a stream from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
/*
* Remove a stream from the global list protected by a mutex. This
* function is also responsible for freeing its data structures.
int ret = 0;
pthread_mutex_lock(&consumer_data.lock);
int ret = 0;
pthread_mutex_lock(&consumer_data.lock);
- /* Check if already exist */
- if (consumer_find_stream(stream->key)) {
- ret = -1;
- goto end;
- }
+ /* Steal stream identifier, for UST */
+ consumer_steal_stream_key(stream->key);
cds_list_add(&stream->list, &consumer_data.stream_list.head);
consumer_data.stream_count++;
consumer_data.need_update = 1;
cds_list_add(&stream->list, &consumer_data.stream_list.head);
consumer_data.stream_count++;
consumer_data.need_update = 1;
*/
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
*/
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&consumer_data.lock);
- /* Check if already exist */
- if (consumer_find_channel(channel->key)) {
- ret = -1;
- goto end;
- }
+ /* Steal channel identifier, for UST */
+ consumer_steal_channel_key(channel->key);
cds_list_add(&channel->list, &consumer_data.channel_list.head);
cds_list_add(&channel->list, &consumer_data.channel_list.head);
pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&consumer_data.lock);
DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
fds[0], fds[1]);
assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name,
fds[0], fds[1]);
assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
- new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
+ new_stream = consumer_allocate_stream(msg.u.channel.channel_key,
msg.u.stream.stream_key,
fds[0], fds[1],
msg.u.stream.state,
msg.u.stream.stream_key,
fds[0], fds[1],
msg.u.stream.state,
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
if (ctx->on_update_stream != NULL) {
ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
if (ret == 0) {
if (ctx->on_update_stream != NULL) {
ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
if (ret == 0) {
consumer_change_stream_state(msg.u.stream.stream_key,
msg.u.stream.state);
}
consumer_change_stream_state(msg.u.stream.stream_key,
msg.u.stream.state);
}
lum.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL;
/*
lum.cmd_type = LTTNG_CONSUMER_ADD_CHANNEL;
/*
- * We need to keep shm_fd open to make sure this key stays unique within
- * the session daemon.
+ * We need to keep shm_fd open while we transfer the stream file
+ * descriptors to make sure this key stays unique within the
+ * session daemon. We can free the channel shm_fd without
+ * problem after we finished sending stream fds for that
+ * channel.
*/
lum.u.channel.channel_key = uchan->obj->shm_fd;
lum.u.channel.max_sb_size = uchan->attr.subbuf_size;
*/
lum.u.channel.channel_key = uchan->obj->shm_fd;
lum.u.channel.max_sb_size = uchan->attr.subbuf_size;