/*
* Sending a single channel to the consumer with command ADD_CHANNEL.
*/
-int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
+int kernel_consumer_add_channel(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel)
{
int ret;
struct lttcomm_consumer_msg lkm;
channel->channel->name,
channel->stream_count);
+ health_code_update(&health_thread_kernel);
+
ret = consumer_send_channel(sock, &lkm);
if (ret < 0) {
goto error;
}
+ health_code_update(&health_thread_kernel);
+
error:
return ret;
}
/*
* Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
*/
-int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_add_metadata(struct consumer_socket *sock,
+ struct ltt_kernel_session *session)
{
int ret;
char tmp_path[PATH_MAX];
/* Safety net */
assert(session);
assert(session->consumer);
+ assert(sock);
DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
session->metadata->fd,
session->metadata->conf->attr.subbuf_size,
0, /* for kernel */
- "metadata",
+ DEFAULT_METADATA_NAME,
1);
+ health_code_update(&health_thread_kernel);
+
ret = consumer_send_channel(sock, &lkm);
if (ret < 0) {
goto error;
}
+ health_code_update(&health_thread_kernel);
+
/* Prep stream message structure */
consumer_init_stream_comm_msg(&lkm,
LTTNG_CONSUMER_ADD_STREAM,
session->gid,
consumer->net_seq_index,
1, /* Metadata flag set */
- "metadata",
+ DEFAULT_METADATA_NAME,
pathname,
session->id);
+ health_code_update(&health_thread_kernel);
+
/* Send stream and file descriptor */
ret = consumer_send_stream(sock, consumer, &lkm,
&session->metadata_stream_fd, 1);
goto error;
}
+ health_code_update(&health_thread_kernel);
+
error:
return ret;
}
/*
* Sending a single stream to the consumer with command ADD_STREAM.
*/
-int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
- struct ltt_kernel_stream *stream, struct ltt_kernel_session *session)
+int kernel_consumer_add_stream(struct consumer_socket *sock,
+ struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
+ struct ltt_kernel_session *session)
{
int ret;
char tmp_path[PATH_MAX];
assert(stream);
assert(session);
assert(session->consumer);
+ assert(sock);
DBG("Sending stream %d of channel %s to kernel consumer",
stream->fd, channel->channel->name);
pathname,
session->id);
+ health_code_update(&health_thread_kernel);
+
/* Send stream and file descriptor */
ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
if (ret < 0) {
goto error;
}
+ health_code_update(&health_thread_kernel);
+
error:
return ret;
}
/*
* Send all stream fds of kernel channel to the consumer.
*/
-int kernel_consumer_send_channel_stream(int sock,
+int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
{
int ret;
assert(channel);
assert(session);
assert(session->consumer);
+ assert(sock);
/* Bail out if consumer is disabled */
if (!session->consumer->enabled) {
/*
* Send all stream fds of the kernel session to the consumer.
*/
-int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
+int kernel_consumer_send_session(struct consumer_socket *sock,
+ struct ltt_kernel_session *session)
{
int ret;
struct ltt_kernel_channel *chan;
/* Safety net */
assert(session);
assert(session->consumer);
+ assert(sock);
/* Bail out if consumer is disabled */
if (!session->consumer->enabled) {