struct lttng_ht *ht)
{
int ret = 0;
- struct consumer_relayd_sock_pair *relayd;
assert(stream);
assert(ht);
*/
lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
- /* Check and cleanup relayd */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_inc(&relayd->refcount);
- }
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
goto end;
}
uatomic_inc(&relayd->refcount);
+ stream->sent_to_relayd = 1;
} else {
ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
stream->key, stream->net_seq_idx);
struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(int stream_key, uint32_t state))
+ int (*update_stream)(uint64_t stream_key, uint32_t state))
{
int ret;
struct lttng_consumer_local_data *ctx;
struct lttng_ht *ht)
{
int ret = 0;
- struct consumer_relayd_sock_pair *relayd;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
node = lttng_ht_iter_get_node_u64(&iter);
assert(!node);
- /* Find relayd and, if one is found, increment refcount. */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_inc(&relayd->refcount);
- }
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
goto end;
}
- local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
+ local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
+ if (local_stream == NULL) {
+ PERROR("local_stream malloc");
+ goto end;
+ }
while (1) {
high_prio = 0;
int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
+ struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
} else {
- relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd->sessiond_session_id = sessiond_id;
relayd_created = 1;
}