enum consumer_channel_action {
CONSUMER_CHANNEL_ADD,
+ CONSUMER_CHANNEL_DEL,
CONSUMER_CHANNEL_QUIT,
};
struct consumer_channel_msg {
enum consumer_channel_action action;
- struct lttng_consumer_channel *chan;
+ struct lttng_consumer_channel *chan; /* add */
+ uint64_t key; /* del */
};
/*
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *chan,
+ uint64_t key,
enum consumer_channel_action action)
{
struct consumer_channel_msg msg;
} while (ret < 0 && errno == EINTR);
}
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
+ uint64_t key)
+{
+ notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
+}
+
static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel **chan,
+ uint64_t *key,
enum consumer_channel_action *action)
{
struct consumer_channel_msg msg;
if (ret > 0) {
*action = msg.action;
*chan = msg.chan;
+ *key = msg.key;
}
return ret;
}
obj->net_seq_idx = net_seq_idx;
obj->refcount = 0;
obj->destroy_flag = 0;
+ obj->control_sock.sock.fd = -1;
+ obj->data_sock.sock.fd = -1;
lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
if (!ret && channel->wait_fd != -1 &&
channel->metadata_stream == NULL) {
- notify_channel_pipe(ctx, channel, CONSUMER_CHANNEL_ADD);
+ notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
}
return ret;
}
goto end;
}
outfd = stream->out_fd = ret;
+ /* Reset current size because we just perform a rotation. */
+ stream->tracefile_size_current = 0;
}
stream->tracefile_size_current += len;
}
goto end;
}
outfd = stream->out_fd = ret;
+ /* Reset current size because we just perform a rotation. */
+ stream->tracefile_size_current = 0;
}
stream->tracefile_size_current += len;
}
continue;
} else if (revents & LPOLLIN) {
enum consumer_channel_action action;
+ uint64_t key;
- ret = read_channel_pipe(ctx, &chan, &action);
+ ret = read_channel_pipe(ctx, &chan, &key, &action);
if (ret <= 0) {
ERR("Error reading channel pipe");
continue;
lttng_poll_add(&events, chan->wait_fd,
LPOLLIN | LPOLLPRI);
break;
+ case CONSUMER_CHANNEL_DEL:
+ {
+ chan = consumer_find_channel(key);
+ if (!chan) {
+ ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
+ break;
+ }
+ lttng_poll_del(&events, chan->wait_fd);
+ ret = lttng_ht_del(channel_ht, &iter);
+ assert(ret == 0);
+ consumer_close_channel_streams(chan);
+
+ /*
+ * Release our own refcount. Force channel deletion even if
+ * streams were not initialized.
+ */
+ if (!uatomic_sub_return(&chan->refcount, 1)) {
+ consumer_del_channel(chan);
+ }
+ goto restart;
+ }
case CONSUMER_CHANNEL_QUIT:
/*
* Remove the pipe from the poll set and continue the loop
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
- notify_channel_pipe(ctx, NULL, CONSUMER_CHANNEL_QUIT);
+ notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
/* Cleaning up possibly open sockets. */
if (sock >= 0) {