The new stream hash table indexed by session id is added. The per
consumer stream mutex used for the synchronization between threads
accessing the stream which will be indexed in two hash tables.
Also, the data available command is added to both UST and kernel
consumer but for now returns ENOSYS. Just set the skeleton for the
implementation.
Finally, the session id is added to the trace-kernel (where it already
exists for UST) so it could be used later on for the data available
command.
This session id is also added in the consumer data structure for the
same purpose.
Signed-off-by: David Goulet <dgoulet@efficios.com>
PERROR("fcntl session fd");
}
PERROR("fcntl session fd");
}
lks->consumer_fds_sent = 0;
session->kernel_session = lks;
lks->consumer_fds_sent = 0;
session->kernel_session = lks;
*/
struct consumer_output *consumer;
struct consumer_output *tmp_consumer;
*/
struct consumer_output *consumer;
struct consumer_output *tmp_consumer;
+ /* Tracing session id */
+ unsigned int id;
gid_t gid,
int net_index,
int metadata_flag,
gid_t gid,
int net_index,
int metadata_flag,
int *alloc_ret)
{
struct lttng_consumer_stream *stream;
int *alloc_ret)
{
struct lttng_consumer_stream *stream;
stream->gid = gid;
stream->net_seq_idx = net_index;
stream->metadata_flag = metadata_flag;
stream->gid = gid;
stream->net_seq_idx = net_index;
stream->metadata_flag = metadata_flag;
+ stream->session_id = session_id;
strncpy(stream->path_name, path_name, sizeof(stream->path_name));
stream->path_name[sizeof(stream->path_name) - 1] = '\0';
strncpy(stream->path_name, path_name, sizeof(stream->path_name));
stream->path_name[sizeof(stream->path_name) - 1] = '\0';
+ pthread_mutex_init(&stream->lock, NULL);
/*
* Index differently the metadata node because the thread is using an
/*
* Index differently the metadata node because the thread is using an
lttng_ht_node_init_ulong(&stream->node, stream->key);
}
lttng_ht_node_init_ulong(&stream->node, stream->key);
}
+ /* Init session id node with the stream session id */
+ lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+
/*
* The cpu number is needed before using any ustctl_* actions. Ignored for
* the kernel so the value does not matter.
/*
* The cpu number is needed before using any ustctl_* actions. Ignored for
* the kernel so the value does not matter.
pthread_mutex_unlock(&consumer_data.lock);
DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
pthread_mutex_unlock(&consumer_data.lock);
DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
- " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
- stream->shm_fd, stream->wait_fd,
+ " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
+ stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
+ stream->net_seq_idx, stream->session_id);
{
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
{
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
assert(metadata_ht);
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
assert(metadata_ht);
LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
+ /* Return to the sessiond if there is data pending for a session */
+ LTTNG_CONSUMER_DATA_AVAILABLE,
};
/* State of each fd in consumer */
};
/* State of each fd in consumer */
* uniquely a stream.
*/
struct lttng_consumer_stream {
* uniquely a stream.
*/
struct lttng_consumer_stream {
- /* Hash table node for both metadata and data type */
+ /* HT node used by the data_ht and metadata_ht */
struct lttng_ht_node_ulong node;
struct lttng_ht_node_ulong node;
+ /* HT node used in consumer_data.stream_list_ht */
+ struct lttng_ht_node_ulong node_session_id;
struct lttng_consumer_channel *chan; /* associated channel */
/*
* key is the key used by the session daemon to refer to the
struct lttng_consumer_channel *chan; /* associated channel */
/*
* key is the key used by the session daemon to refer to the
uint64_t relayd_stream_id;
/* Next sequence number to use for trace packet */
uint64_t next_net_seq_num;
uint64_t relayd_stream_id;
/* Next sequence number to use for trace packet */
uint64_t next_net_seq_num;
+ /* Lock to use the stream FDs since they are used between threads. */
+ pthread_mutex_t lock;
+ /* Tracing session id */
+ uint64_t session_id;
* stream has an index which associate the right relayd socket to use.
*/
struct lttng_ht *relayd_ht;
* stream has an index which associate the right relayd socket to use.
*/
struct lttng_ht *relayd_ht;
+
+ /*
+ * This hash table contains all streams (metadata and data) indexed by
+ * session id. In other words, the ht is indexed by session id and each
+ * bucket contains the list of associated streams.
+ *
+ * This HT uses the "node_session_id" of the consumer stream.
+ */
+ struct lttng_ht *stream_list_ht;
};
/* Defined in consumer.c and coupled with explanations */
};
/* Defined in consumer.c and coupled with explanations */
gid_t gid,
int net_index,
int metadata_flag,
gid_t gid,
int net_index,
int metadata_flag,
int *alloc_ret);
extern void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
int *alloc_ret);
extern void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
msg.u.stream.gid,
msg.u.stream.net_index,
msg.u.stream.metadata_flag,
msg.u.stream.gid,
msg.u.stream.net_index,
msg.u.stream.metadata_flag,
+ msg.u.stream.session_id,
&alloc_ret);
if (new_stream == NULL) {
switch (alloc_ret) {
&alloc_ret);
if (new_stream == NULL) {
switch (alloc_ret) {
+ case LTTNG_CONSUMER_DATA_AVAILABLE:
+ {
+ rcu_read_unlock();
+ return -ENOSYS;
+ }
default:
goto end_nosignal;
}
default:
goto end_nosignal;
}
int net_index;
unsigned int metadata_flag;
char name[LTTNG_SYMBOL_NAME_LEN]; /* Name string of the stream */
int net_index;
unsigned int metadata_flag;
char name[LTTNG_SYMBOL_NAME_LEN]; /* Name string of the stream */
+ uint64_t session_id; /* Tracing session id of the stream */
} stream;
struct {
int net_index;
} stream;
struct {
int net_index;
struct {
uint64_t net_seq_idx;
} destroy_relayd;
struct {
uint64_t net_seq_idx;
} destroy_relayd;
+ struct {
+ uint64_t session_id;
+ } data_available;
msg.u.stream.gid,
msg.u.stream.net_index,
msg.u.stream.metadata_flag,
msg.u.stream.gid,
msg.u.stream.net_index,
msg.u.stream.metadata_flag,
+ msg.u.stream.session_id,
&alloc_ret);
if (new_stream == NULL) {
switch (alloc_ret) {
&alloc_ret);
if (new_stream == NULL) {
switch (alloc_ret) {
rcu_read_unlock();
return -ENOSYS;
}
rcu_read_unlock();
return -ENOSYS;
}
+ case LTTNG_CONSUMER_DATA_AVAILABLE:
+ {
+ rcu_read_unlock();
+ return -ENOSYS;
+ }