*/
#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
DBG2("Sending destroy relayd command to consumer sock %d", *sock->fd_ptr);
+ memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_DESTROY_RELAYD;
msg.u.destroy_relayd.net_seq_idx = consumer->net_seq_index;
assert(consumer);
assert(consumer_sock);
+ memset(&msg, 0, sizeof(msg));
/* Bail out if consumer is disabled */
if (!consumer->enabled) {
ret = LTTNG_OK;
assert(consumer);
- msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
+ DBG3("Consumer data pending for id %" PRIu64, session_id);
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_DATA_PENDING;
msg.u.data_pending.session_id = session_id;
- DBG3("Consumer data pending for id %" PRIu64, session_id);
-
/* Send command for each consumer */
rcu_read_lock();
cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
DBG2("Consumer flush channel key %" PRIu64, key);
+ memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_FLUSH_CHANNEL;
msg.u.flush_channel.key = key;
}
/*
- * Send a close metdata command to consumer using the given channel key.
+ * Send a close metadata command to consumer using the given channel key.
+ * Called with registry lock held.
*
* Return 0 on success else a negative value.
*/
DBG2("Consumer close metadata channel key %" PRIu64, metadata_key);
+ memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_CLOSE_METADATA;
msg.u.close_metadata.key = metadata_key;
DBG2("Consumer setup metadata channel key %" PRIu64, metadata_key);
+ memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_SETUP_METADATA;
msg.u.setup_metadata.key = metadata_key;
}
/*
- * Send metadata string to consumer. Socket lock MUST be acquired.
+ * Send metadata string to consumer.
+ * RCU read-side lock must be held to guarantee existence of socket.
*
* Return 0 on success else a negative value.
*/
DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr);
+ pthread_mutex_lock(socket->lock);
+
+ memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_PUSH_METADATA;
msg.u.push_metadata.key = metadata_key;
msg.u.push_metadata.target_offset = target_offset;
}
end:
+ pthread_mutex_unlock(socket->lock);
health_code_update();
return ret;
}
*/
int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
- const char *session_path, int wait, int max_stream_size)
+ const char *session_path, int wait, uint64_t nb_packets_per_stream)
{
int ret;
struct lttcomm_consumer_msg msg;
memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_SNAPSHOT_CHANNEL;
msg.u.snapshot_channel.key = key;
- msg.u.snapshot_channel.max_stream_size = max_stream_size;
+ msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream;
msg.u.snapshot_channel.metadata = metadata;
if (output->consumer->type == CONSUMER_DST_NET) {