Fix: stream intersection fails on snapshot of cleared session
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 26 Jun 2020 22:40:12 +0000 (18:40 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 16 Jul 2020 16:30:55 +0000 (12:30 -0400)
Observed issue
==============

In the following scenario:
  lttng create --snapshot
  lttng enable-event -u -a
  lttng start
  taskset -c 0 <tracepoint producing app>
  lttng clear
  taskset -c 0 <tracepoint producing app>
  lttng snapshot record
  lttng destroy

When using the stream-intersection mode, babetrace complains that the
time range for the intersection is invalid since the begin timestamp is
after the end timestamp.

This is caused by the presence of "inactive" streams for which no events
are recorded between the clear action and the recording of the snapshot.
These streams have a begin timestamp roughly equal to the moment when
the snapshot was taken (i.e. the end timestamp). Babeltrace, in
stream-intersection mode, attempts to use the latest beginning timestamp
of all streams as the start of the intersection and the earliest end
timestamp as the end boundary.

Path                                                        │File size        │Packets     │Timestamp: beginning         │Timestamp: end               │
snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_0 │        4.000 KiB│           1│2020-06-22 21:26:01.903685878│2020-06-22 21:26:17.630456312
snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_1 │        4.000 KiB│           1│2020-06-22 21:26:17.630909310│2020-06-22 21:26:17.630909310
snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_2 │        4.000 KiB│           1│2020-06-22 21:26:17.631295033│2020-06-22 21:26:17.631295033
snapshot-1-20200622-212617-1/ust/uid/1000/64-bit/channel0_3 │        4.000 KiB│           1│2020-06-22 21:26:17.631673614│2020-06-22 21:26:17.631673614

Cause
=====

The packet beginning timestamps of the buffers are initialized on
creation (on the first "start" of a tracing session). When a "clear" is
performed on a session, all open packets are closed and the existing
contents are purged.

If a stream is inactive, it is possible for no packet to be "opened"
until a snapshot of the tracing session is recorded.

Solution
========

A new consumer command, "open channel packets" is added. This command
performs a "flush empty" operation on all streams of a channel.

This command is invoked after a clear (after the tracing is re-started)
and on start. This ensures that streams are opened as soon as possible
after a clear, a rotation, or a session start.

Known drawbacks
===============

In the case of an inactive stream, this results an extra empty packet at
the beginning of the inactive streams (typically 4kB) in the snapshots.

In the case of an active stream, this change will cause the first packet
to be empty or contain few events. If the stream is active enough to
wrap-around, that empty packet will simply be overwritten.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I19b5c630fa8bddfb13c3c10f86c6cc9dc4990b08

13 files changed:
src/bin/lttng-sessiond/clear.c
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/session.c
src/bin/lttng-sessiond/session.h
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-app.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index 03abbd8e19090c55301c393936824a5bfc2a2c2d..3ae70ea2fa5fd462d38e816a9328c58aaebc8081 100644 (file)
@@ -194,6 +194,16 @@ int cmd_clear_session(struct ltt_session *session, int *sock_fd)
                                goto end;
                        }
                }
                                goto end;
                        }
                }
+
+               /*
+                * Open a packet in every stream of the session to ensure that
+                * viewers can correctly identify the boundaries of the periods
+                * during which tracing was active for this session.
+                */
+               ret = session_open_packets(session);
+               if (ret != LTTNG_OK) {
+                       goto end;
+               }
        }
        ret = LTTNG_OK;
 end:
        }
        ret = LTTNG_OK;
 end:
index 2963577563cc94aa658003e89095c95b36fdb770..8c1776072c35d1aae2206007c70bd193a60cd89e 100644 (file)
@@ -2794,6 +2794,16 @@ int cmd_start_trace(struct ltt_session *session)
                }
        }
 
                }
        }
 
+       /*
+        * Open a packet in every stream of the session to ensure that viewers
+        * can correctly identify the boundaries of the periods during which
+        * tracing was active for this session.
+        */
+       ret = session_open_packets(session);
+       if (ret != LTTNG_OK) {
+               goto error;
+       }
+
        /*
         * Clear the flag that indicates that a rotation was done while the
         * session was stopped.
        /*
         * Clear the flag that indicates that a rotation was done while the
         * session was stopped.
index d282f59c9266e1b22fc0aa581576c8e30d3fd32d..552ff95cb2b08711624d5a2ad76261cc789b0535 100644 (file)
@@ -93,7 +93,8 @@ error:
  *
  * Return 0 on success else a negative value on error.
  */
  *
  * Return 0 on success else a negative value on error.
  */
-int consumer_socket_send(struct consumer_socket *socket, void *msg, size_t len)
+int consumer_socket_send(
+               struct consumer_socket *socket, const void *msg, size_t len)
 {
        int fd;
        ssize_t size;
 {
        int fd;
        ssize_t size;
@@ -861,7 +862,7 @@ error:
  * The consumer socket lock must be held by the caller.
  */
 int consumer_send_msg(struct consumer_socket *sock,
  * The consumer socket lock must be held by the caller.
  */
 int consumer_send_msg(struct consumer_socket *sock,
-               struct lttcomm_consumer_msg *msg)
+               const struct lttcomm_consumer_msg *msg)
 {
        int ret;
 
 {
        int ret;
 
@@ -1721,6 +1722,32 @@ error:
        return ret;
 }
 
        return ret;
 }
 
+int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key)
+{
+       int ret;
+       const struct lttcomm_consumer_msg msg = {
+               .cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS,
+               .u.open_channel_packets.key = key,
+       };
+
+       assert(socket);
+
+       DBG("Consumer open channel packets: channel key = %" PRIu64, key);
+
+       health_code_update();
+
+       pthread_mutex_lock(socket->lock);
+       ret = consumer_send_msg(socket, &msg);
+       pthread_mutex_unlock(socket->lock);
+       if (ret < 0) {
+               goto error_socket;
+       }
+
+error_socket:
+       health_code_update();
+       return ret;
+}
+
 int consumer_clear_channel(struct consumer_socket *socket, uint64_t key)
 {
        int ret;
 int consumer_clear_channel(struct consumer_socket *socket, uint64_t key)
 {
        int ret;
index 54c962a86847baf3f74d941bbcfb23018e14bfc8..eaa04d302ae6abf61c8a60b24223ab638bbb2051 100644 (file)
@@ -187,7 +187,7 @@ void consumer_destroy_socket(struct consumer_socket *sock);
 int consumer_copy_sockets(struct consumer_output *dst,
                struct consumer_output *src);
 void consumer_destroy_output_sockets(struct consumer_output *obj);
 int consumer_copy_sockets(struct consumer_output *dst,
                struct consumer_output *src);
 void consumer_destroy_output_sockets(struct consumer_output *obj);
-int consumer_socket_send(struct consumer_socket *socket, void *msg,
+int consumer_socket_send(struct consumer_socket *socket, const void *msg,
                size_t len);
 int consumer_socket_recv(struct consumer_socket *socket, void *msg,
                size_t len);
                size_t len);
 int consumer_socket_recv(struct consumer_socket *socket, void *msg,
                size_t len);
@@ -202,7 +202,7 @@ int consumer_set_network_uri(const struct ltt_session *session,
 int consumer_send_fds(struct consumer_socket *sock, const int *fds,
                size_t nb_fd);
 int consumer_send_msg(struct consumer_socket *sock,
 int consumer_send_fds(struct consumer_socket *sock, const int *fds,
                size_t nb_fd);
 int consumer_send_msg(struct consumer_socket *sock,
-               struct lttcomm_consumer_msg *msg);
+               const struct lttcomm_consumer_msg *msg);
 int consumer_send_stream(struct consumer_socket *sock,
                struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
                const int *fds, size_t nb_fd);
 int consumer_send_stream(struct consumer_socket *sock,
                struct consumer_output *dst, struct lttcomm_consumer_msg *msg,
                const int *fds, size_t nb_fd);
@@ -320,6 +320,7 @@ int consumer_trace_chunk_exists(struct consumer_socket *socket,
                uint64_t relayd_id, uint64_t session_id,
                struct lttng_trace_chunk *chunk,
                enum consumer_trace_chunk_exists_status *result);
                uint64_t relayd_id, uint64_t session_id,
                struct lttng_trace_chunk *chunk,
                enum consumer_trace_chunk_exists_status *result);
+int consumer_open_channel_packets(struct consumer_socket *socket, uint64_t key);
 
 char *setup_channel_trace_path(struct consumer_output *consumer,
                const char *session_path, size_t *consumer_path_offset);
 
 char *setup_channel_trace_path(struct consumer_output *consumer,
                const char *session_path, size_t *consumer_path_offset);
index 03bec43fd9284009fbdb7e07e85519d8116f7746..c3a65aafbfa6b44e327b32b165e155200afcdfbb 100644 (file)
@@ -792,6 +792,83 @@ end:
        return ret;
 }
 
        return ret;
 }
 
+/*
+ * This function skips the metadata channel as the begin/end timestamps of a
+ * metadata packet are useless.
+ *
+ * Moreover, opening a packet after a "clear" will cause problems for live
+ * sessions as it will introduce padding that was not part of the first trace
+ * chunk. The relay daemon expects the content of the metadata stream of
+ * successive metadata trace chunks to be strict supersets of one another.
+ *
+ * For example, flushing a packet at the beginning of the metadata stream of
+ * a trace chunk resulting from a "clear" session command will cause the
+ * size of the metadata stream of the new trace chunk to not match the size of
+ * the metadata stream of the original chunk. This will confuse the relay
+ * daemon as the same "offset" in a metadata stream will no longer point
+ * to the same content.
+ */
+static
+enum lttng_error_code session_kernel_open_packets(struct ltt_session *session)
+{
+       enum lttng_error_code ret = LTTNG_OK;
+       struct consumer_socket *socket;
+       struct lttng_ht_iter iter;
+       struct cds_lfht_node *node;
+       struct ltt_kernel_channel *chan;
+
+       rcu_read_lock();
+
+       cds_lfht_first(session->kernel_session->consumer->socks->ht, &iter.iter);
+       node = cds_lfht_iter_get_node(&iter.iter);
+       socket = container_of(node, typeof(*socket), node.node);
+
+       cds_list_for_each_entry(chan,
+                       &session->kernel_session->channel_list.head, list) {
+               int open_ret;
+
+               DBG("Open packet of kernel channel: channel key = %" PRIu64
+                               ", session name = %s, session_id = %" PRIu64,
+                               chan->key, session->name, session->id);
+
+               open_ret = consumer_open_channel_packets(socket, chan->key);
+               if (open_ret < 0) {
+                       /* General error (no known error expected). */
+                       ret = LTTNG_ERR_UNK;
+                       goto end;
+               }
+       }
+
+end:
+       rcu_read_unlock();
+       return ret;
+}
+
+enum lttng_error_code session_open_packets(struct ltt_session *session)
+{
+       enum lttng_error_code ret = LTTNG_OK;
+
+       DBG("Opening packets of session channels: session name = %s, session id = %" PRIu64,
+                       session->name, session->id);
+
+       if (session->ust_session) {
+               ret = ust_app_open_packets(session);
+               if (ret != LTTNG_OK) {
+                       goto end;
+               }
+       }
+
+       if (session->kernel_session) {
+               ret = session_kernel_open_packets(session);
+               if (ret != LTTNG_OK) {
+                       goto end;
+               }
+       }
+
+end:
+       return ret;
+}
+
 /*
  * Set a session's current trace chunk.
  *
 /*
  * Set a session's current trace chunk.
  *
index 1df70a4747a8341d93b8b3185603ccddcfb23782..34e51fe5a85850445e126453c3fd869c0e4d4790 100644 (file)
@@ -273,6 +273,9 @@ int session_close_trace_chunk(struct ltt_session *session,
                enum lttng_trace_chunk_command_type close_command,
                char *path);
 
                enum lttng_trace_chunk_command_type close_command,
                char *path);
 
+/* Open a packet in all channels of a given session. */
+enum lttng_error_code session_open_packets(struct ltt_session *session);
+
 bool session_output_supports_trace_chunks(const struct ltt_session *session);
 
 #endif /* _LTT_SESSION_H */
 bool session_output_supports_trace_chunks(const struct ltt_session *session);
 
 #endif /* _LTT_SESSION_H */
index 8c314ec1c9688d5df77c1eefc8fa047988e0d474..befbfab32dc2c692b56ef3005e4e46ad0ef9bf2a 100644 (file)
@@ -6644,3 +6644,126 @@ end:
        rcu_read_unlock();
        return cmd_ret;
 }
        rcu_read_unlock();
        return cmd_ret;
 }
+
+/*
+ * This function skips the metadata channel as the begin/end timestamps of a
+ * metadata packet are useless.
+ *
+ * Moreover, opening a packet after a "clear" will cause problems for live
+ * sessions as it will introduce padding that was not part of the first trace
+ * chunk. The relay daemon expects the content of the metadata stream of
+ * successive metadata trace chunks to be strict supersets of one another.
+ *
+ * For example, flushing a packet at the beginning of the metadata stream of
+ * a trace chunk resulting from a "clear" session command will cause the
+ * size of the metadata stream of the new trace chunk to not match the size of
+ * the metadata stream of the original chunk. This will confuse the relay
+ * daemon as the same "offset" in a metadata stream will no longer point
+ * to the same content.
+ */
+enum lttng_error_code ust_app_open_packets(struct ltt_session *session)
+{
+       enum lttng_error_code ret = LTTNG_OK;
+       struct lttng_ht_iter iter;
+       struct ltt_ust_session *usess = session->ust_session;
+
+       assert(usess);
+
+       rcu_read_lock();
+
+       switch (usess->buffer_type) {
+       case LTTNG_BUFFER_PER_UID:
+       {
+               struct buffer_reg_uid *reg;
+
+               cds_list_for_each_entry (
+                               reg, &usess->buffer_reg_uid_list, lnode) {
+                       struct buffer_reg_channel *reg_chan;
+                       struct consumer_socket *socket;
+
+                       socket = consumer_find_socket_by_bitness(
+                                       reg->bits_per_long, usess->consumer);
+                       if (!socket) {
+                               ret = LTTNG_ERR_FATAL;
+                               goto error;
+                       }
+
+                       cds_lfht_for_each_entry(reg->registry->channels->ht,
+                                       &iter.iter, reg_chan, node.node) {
+                               const int open_ret =
+                                               consumer_open_channel_packets(
+                                                       socket,
+                                                       reg_chan->consumer_key);
+
+                               if (open_ret < 0) {
+                                       ret = LTTNG_ERR_UNK;
+                                       goto error;
+                               }
+                       }
+               }
+               break;
+       }
+       case LTTNG_BUFFER_PER_PID:
+       {
+               struct ust_app *app;
+
+               cds_lfht_for_each_entry (
+                               ust_app_ht->ht, &iter.iter, app, pid_n.node) {
+                       struct consumer_socket *socket;
+                       struct lttng_ht_iter chan_iter;
+                       struct ust_app_channel *ua_chan;
+                       struct ust_app_session *ua_sess;
+                       struct ust_registry_session *registry;
+
+                       ua_sess = lookup_session_by_app(usess, app);
+                       if (!ua_sess) {
+                               /* Session not associated with this app. */
+                               continue;
+                       }
+
+                       /* Get the right consumer socket for the application. */
+                       socket = consumer_find_socket_by_bitness(
+                                       app->bits_per_long, usess->consumer);
+                       if (!socket) {
+                               ret = LTTNG_ERR_FATAL;
+                               goto error;
+                       }
+
+                       registry = get_session_registry(ua_sess);
+                       if (!registry) {
+                               DBG("Application session is being torn down. Skip application.");
+                               continue;
+                       }
+
+                       cds_lfht_for_each_entry(ua_sess->channels->ht,
+                                       &chan_iter.iter, ua_chan, node.node) {
+                               const int open_ret =
+                                               consumer_open_channel_packets(
+                                                       socket,
+                                                       ua_chan->key);
+
+                               if (open_ret < 0) {
+                                       /*
+                                        * Per-PID buffer and application going
+                                        * away.
+                                        */
+                                       if (ret == -LTTNG_ERR_CHAN_NOT_FOUND) {
+                                               continue;
+                                       }
+
+                                       ret = LTTNG_ERR_UNK;
+                                       goto error;
+                               }
+                       }
+               }
+               break;
+       }
+       default:
+               abort();
+               break;
+       }
+
+error:
+       rcu_read_unlock();
+       return ret;
+}
index 6f3588a54f6930f9f4a105b4ef84084420d939dd..164bc2c6f2c95acf59d91e2a339cd55f3b338fef 100644 (file)
@@ -354,6 +354,7 @@ enum lttng_error_code ust_app_create_channel_subdirectories(
 int ust_app_release_object(struct ust_app *app,
                struct lttng_ust_object_data *data);
 enum lttng_error_code ust_app_clear_session(struct ltt_session *session);
 int ust_app_release_object(struct ust_app *app,
                struct lttng_ust_object_data *data);
 enum lttng_error_code ust_app_clear_session(struct ltt_session *session);
+enum lttng_error_code ust_app_open_packets(struct ltt_session *session);
 
 static inline
 int ust_app_supported(void)
 
 static inline
 int ust_app_supported(void)
@@ -600,6 +601,12 @@ enum lttng_error_code ust_app_clear_session(struct ltt_session *session)
        return 0;
 }
 
        return 0;
 }
 
+static inline
+enum lttng_error_code ust_app_open_packets(struct ltt_session *session)
+{
+       return 0;
+}
+
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTT_UST_APP_H */
 #endif /* HAVE_LIBLTTNG_UST_CTL */
 
 #endif /* _LTT_UST_APP_H */
index 6505490cdc04e212c54aadd3d8eb58d569eb523e..be440e69492b020b4281f83cc94c12fe29c9c38f 100644 (file)
@@ -3425,6 +3425,9 @@ static enum open_packet_status open_packet(struct lttng_consumer_stream *stream)
        status = produced_pos_before != produced_pos_after ?
                        OPEN_PACKET_STATUS_OPENED :
                        OPEN_PACKET_STATUS_NO_SPACE;
        status = produced_pos_before != produced_pos_after ?
                        OPEN_PACKET_STATUS_OPENED :
                        OPEN_PACKET_STATUS_NO_SPACE;
+       if (status == OPEN_PACKET_STATUS_OPENED) {
+               stream->opened_packet_in_current_trace_chunk = true;
+       }
 end:
        return status;
 }
 end:
        return status;
 }
@@ -3565,14 +3568,12 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
-                       stream->opened_packet_in_current_trace_chunk =
-                                       true;
                        break;
                case OPEN_PACKET_STATUS_NO_SPACE:
                        /*
                         * Can't open a packet as there is no space left.
                         * This means that new events were produced, resulting
                        break;
                case OPEN_PACKET_STATUS_NO_SPACE:
                        /*
                         * Can't open a packet as there is no space left.
                         * This means that new events were produced, resulting
-                        * in a packet being opened, which is what we want
+                        * in a packet being opened, which is what we wanted
                         * anyhow.
                         */
                        DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
                         * anyhow.
                         */
                        DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
@@ -3588,8 +3589,6 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                default:
                        abort();
                }
                default:
                        abort();
                }
-
-               stream->opened_packet_in_current_trace_chunk = true;
        }
 
 sleep_stream:
        }
 
 sleep_stream:
@@ -4294,8 +4293,6 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                                                ", channel name = %s, session id = %" PRIu64,
                                                stream->key, stream->chan->name,
                                                stream->chan->session_id);
                                                ", channel name = %s, session id = %" PRIu64,
                                                stream->key, stream->chan->name,
                                                stream->chan->session_id);
-                               stream->opened_packet_in_current_trace_chunk =
-                                               true;
                                break;
                        case OPEN_PACKET_STATUS_NO_SPACE:
                                /*
                                break;
                        case OPEN_PACKET_STATUS_NO_SPACE:
                                /*
@@ -5172,3 +5169,70 @@ int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
 end:
        return ret;
 }
 end:
        return ret;
 }
+
+enum lttcomm_return_code lttng_consumer_open_channel_packets(
+               struct lttng_consumer_channel *channel)
+{
+       struct lttng_consumer_stream *stream;
+       enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
+
+       if (channel->metadata_stream) {
+               ERR("Open channel packets command attempted on a metadata channel");
+               ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+               goto end;
+       }
+
+       rcu_read_lock();
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               enum open_packet_status status;
+
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
+               status = open_packet(stream);
+               switch (status) {
+               case OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case OPEN_PACKET_STATUS_NO_SPACE:
+                       DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case OPEN_PACKET_STATUS_ERROR:
+                       /*
+                        * Only unexpected internal errors can lead to this
+                        * failing. Report an unknown error.
+                        */
+                       ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel id = %" PRIu64
+                                       ", channel name = %s"
+                                       ", session id = %" PRIu64,
+                                       stream->key, channel->key,
+                                       channel->name, channel->session_id);
+                       ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+                       goto error_unlock;
+               default:
+                       abort();
+               }
+
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+end_rcu_unlock:
+       rcu_read_unlock();
+end:
+       return ret;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       goto end_rcu_unlock;
+}
index 6dedb7525fe7724bc709a4fdb45e5ed35a5939b9..73189660ce4eb888bf3acc12a0f17ea0c3f9071e 100644 (file)
@@ -64,6 +64,7 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
        LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
        LTTNG_CONSUMER_CLEAR_CHANNEL,
        LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
        LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
        LTTNG_CONSUMER_CLEAR_CHANNEL,
+       LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS,
 };
 
 enum lttng_consumer_type {
 };
 
 enum lttng_consumer_type {
@@ -1048,5 +1049,7 @@ enum lttcomm_return_code lttng_consumer_init_command(
                struct lttng_consumer_local_data *ctx,
                const lttng_uuid sessiond_uuid);
 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
                struct lttng_consumer_local_data *ctx,
                const lttng_uuid sessiond_uuid);
 int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
+enum lttcomm_return_code lttng_consumer_open_channel_packets(
+               struct lttng_consumer_channel *channel);
 
 #endif /* LIB_CONSUMER_H */
 
 #endif /* LIB_CONSUMER_H */
index eeec3a65e7d877291a82ba9d63bf2fb6dc8f30a3..de1a14c218729bd9e88b0a1c1e9cf8219f85e596 100644 (file)
@@ -1311,6 +1311,24 @@ error_rotate_channel:
                                msg.u.trace_chunk_exists.chunk_id);
                goto end_msg_sessiond;
        }
                                msg.u.trace_chunk_exists.chunk_id);
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
+       {
+               const uint64_t key = msg.u.open_channel_packets.key;
+               struct lttng_consumer_channel *channel =
+                               consumer_find_channel(key);
+
+               if (channel) {
+                       pthread_mutex_lock(&channel->lock);
+                       ret_code = lttng_consumer_open_channel_packets(channel);
+                       pthread_mutex_unlock(&channel->lock);
+               } else {
+                       WARN("Channel %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+               goto end_msg_sessiond;
+       }
        default:
                goto end_nosignal;
        }
        default:
                goto end_nosignal;
        }
index 0b4c05fb795359b3e936008024d41c68eaf3288d..cca48d6abafdb6150fa06048e0f1610e26c904d6 100644 (file)
@@ -179,6 +179,7 @@ enum lttcomm_return_code {
        LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE,/* Trace chunk exists on relay daemon. */
        LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK,      /* Unknown trace chunk. */
        LTTCOMM_CONSUMERD_RELAYD_CLEAR_DISALLOWED,  /* Relayd does not accept clear command. */
        LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE,/* Trace chunk exists on relay daemon. */
        LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK,      /* Unknown trace chunk. */
        LTTCOMM_CONSUMERD_RELAYD_CLEAR_DISALLOWED,  /* Relayd does not accept clear command. */
+       LTTCOMM_CONSUMERD_UNKNOWN_ERROR,            /* Unknown error. */
 
        /* MUST be last element */
        LTTCOMM_NR,                                             /* Last element */
 
        /* MUST be last element */
        LTTCOMM_NR,                                             /* Last element */
@@ -705,6 +706,9 @@ struct lttcomm_consumer_msg {
                struct {
                        uint64_t key;
                } LTTNG_PACKED clear_channel;
                struct {
                        uint64_t key;
                } LTTNG_PACKED clear_channel;
+               struct {
+                       uint64_t key;
+               } LTTNG_PACKED open_channel_packets;
        } u;
 } LTTNG_PACKED;
 
        } u;
 } LTTNG_PACKED;
 
index ce2e17f742eae728649ebdfd8632565eae160ac6..209931f5644a3d74dc27ed1a8f7dbb3d3863d387 100644 (file)
@@ -2181,6 +2181,28 @@ end_rotate_channel_nosignal:
                                msg.u.trace_chunk_exists.chunk_id);
                goto end_msg_sessiond;
        }
                                msg.u.trace_chunk_exists.chunk_id);
                goto end_msg_sessiond;
        }
+       case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
+       {
+               const uint64_t key = msg.u.open_channel_packets.key;
+               struct lttng_consumer_channel *channel =
+                               consumer_find_channel(key);
+
+               if (channel) {
+                       pthread_mutex_lock(&channel->lock);
+                       ret_code = lttng_consumer_open_channel_packets(channel);
+                       pthread_mutex_unlock(&channel->lock);
+               } else {
+                       /*
+                        * The channel could have disappeared in per-pid
+                        * buffering mode.
+                        */
+                       DBG("Channel %" PRIu64 " not found", key);
+                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+               }
+
+               health_code_update();
+               goto end_msg_sessiond;
+       }
        default:
                break;
        }
        default:
                break;
        }
This page took 0.037927 seconds and 4 git commands to generate.