summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
a500c25)
The LTTNG_CONSUMER_DESTROY_CHANNEL command is used to handle errors. It
should not call destroy_channel(), because at that point the channel
management thread has ownership of the channel. Send a notification to
the channel management thread asking it to destroy the channel instead.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
enum consumer_channel_action {
CONSUMER_CHANNEL_ADD,
enum consumer_channel_action {
CONSUMER_CHANNEL_ADD,
CONSUMER_CHANNEL_QUIT,
};
struct consumer_channel_msg {
enum consumer_channel_action action;
CONSUMER_CHANNEL_QUIT,
};
struct consumer_channel_msg {
enum consumer_channel_action action;
- struct lttng_consumer_channel *chan;
+ struct lttng_consumer_channel *chan; /* add */
+ uint64_t key; /* del */
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *chan,
static void notify_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *chan,
enum consumer_channel_action action)
{
struct consumer_channel_msg msg;
enum consumer_channel_action action)
{
struct consumer_channel_msg msg;
} while (ret < 0 && errno == EINTR);
}
} while (ret < 0 && errno == EINTR);
}
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
+ uint64_t key)
+{
+ notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
+}
+
static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel **chan,
static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel **chan,
enum consumer_channel_action *action)
{
struct consumer_channel_msg msg;
enum consumer_channel_action *action)
{
struct consumer_channel_msg msg;
if (ret > 0) {
*action = msg.action;
*chan = msg.chan;
if (ret > 0) {
*action = msg.action;
*chan = msg.chan;
if (!ret && channel->wait_fd != -1 &&
channel->metadata_stream == NULL) {
if (!ret && channel->wait_fd != -1 &&
channel->metadata_stream == NULL) {
- notify_channel_pipe(ctx, channel, CONSUMER_CHANNEL_ADD);
+ notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
continue;
} else if (revents & LPOLLIN) {
enum consumer_channel_action action;
continue;
} else if (revents & LPOLLIN) {
enum consumer_channel_action action;
- ret = read_channel_pipe(ctx, &chan, &action);
+ ret = read_channel_pipe(ctx, &chan, &key, &action);
if (ret <= 0) {
ERR("Error reading channel pipe");
continue;
if (ret <= 0) {
ERR("Error reading channel pipe");
continue;
lttng_poll_add(&events, chan->wait_fd,
LPOLLIN | LPOLLPRI);
break;
lttng_poll_add(&events, chan->wait_fd,
LPOLLIN | LPOLLPRI);
break;
+ case CONSUMER_CHANNEL_DEL:
+ {
+ chan = consumer_find_channel(key);
+ if (!chan) {
+ ERR("UST consumer get channel key %" PRIu64 " not found for del channel", key);
+ break;
+ }
+ lttng_poll_del(&events, chan->wait_fd);
+ ret = lttng_ht_del(channel_ht, &iter);
+ assert(ret == 0);
+ consumer_close_channel_streams(chan);
+
+ /*
+ * Release our own refcount. Force channel deletion even if
+ * streams were not initialized.
+ */
+ if (!uatomic_sub_return(&chan->refcount, 1)) {
+ consumer_del_channel(chan);
+ }
+ goto restart;
+ }
case CONSUMER_CHANNEL_QUIT:
/*
* Remove the pipe from the poll set and continue the loop
case CONSUMER_CHANNEL_QUIT:
/*
* Remove the pipe from the poll set and continue the loop
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
- notify_channel_pipe(ctx, NULL, CONSUMER_CHANNEL_QUIT);
+ notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
/* Cleaning up possibly open sockets. */
if (sock >= 0) {
/* Cleaning up possibly open sockets. */
if (sock >= 0) {
int consumer_send_status_msg(int sock, int ret_code);
int consumer_send_status_channel(int sock,
struct lttng_consumer_channel *channel);
int consumer_send_status_msg(int sock, int ret_code);
int consumer_send_status_channel(int sock,
struct lttng_consumer_channel *channel);
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
+ uint64_t key);
#endif /* LIB_CONSUMER_H */
#endif /* LIB_CONSUMER_H */
case LTTNG_CONSUMER_DESTROY_CHANNEL:
{
uint64_t key = msg.u.destroy_channel.key;
case LTTNG_CONSUMER_DESTROY_CHANNEL:
{
uint64_t key = msg.u.destroy_channel.key;
- struct lttng_consumer_channel *channel;
-
- channel = consumer_find_channel(key);
- if (!channel) {
- ERR("UST consumer get channel key %" PRIu64 " not found", key);
- ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
- goto end_msg_sessiond;
- }
-
- destroy_channel(channel);
+ /*
+ * Only called if streams have not been sent to stream
+ * manager thread. However, channel has been sent to
+ * channel manager thread.
+ */
+ notify_thread_del_channel(ctx, key);
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_CLOSE_METADATA:
goto end_msg_sessiond;
}
case LTTNG_CONSUMER_CLOSE_METADATA: