X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=f7c24b896ee24e54db183928992b9c4f88d494a1;hb=fe5b36ea157526c6a118c31b77f18eff48f46feb;hp=60b6bf2214e8bee4d0266000a1db592982384940;hpb=53efb85a242809ed5ed21e9ab40effa696ecbc6f;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 60b6bf221..f7c24b896 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -18,6 +18,7 @@ */ #define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -44,6 +45,7 @@ #include #include #include +#include #include #include #include @@ -232,6 +234,11 @@ int set_option(int opt, const char *arg, const char *optname) break; case 'g': tracing_group_name = strdup(arg); + if (tracing_group_name == NULL) { + ret = -errno; + PERROR("strdup"); + goto end; + } tracing_group_name_override = 1; break; case 'h': @@ -250,7 +257,10 @@ int set_option(int opt, const char *arg, const char *optname) if (arg) { lttng_opt_verbose = config_parse_value(arg); } else { - lttng_opt_verbose += 1; + /* Only 3 level of verbosity (-vvv). */ + if (lttng_opt_verbose < 3) { + lttng_opt_verbose += 1; + } } break; default: @@ -382,8 +392,9 @@ int set_options(int argc, char **argv) /* assign default values */ if (control_uri == NULL) { - ret = asprintf(&default_address, "tcp://0.0.0.0:%d", - DEFAULT_NETWORK_CONTROL_PORT); + ret = asprintf(&default_address, + "tcp://" DEFAULT_NETWORK_CONTROL_BIND_ADDRESS ":%d", + DEFAULT_NETWORK_CONTROL_PORT); if (ret < 0) { PERROR("asprintf default data address"); goto exit; @@ -397,8 +408,9 @@ int set_options(int argc, char **argv) } } if (data_uri == NULL) { - ret = asprintf(&default_address, "tcp://0.0.0.0:%d", - DEFAULT_NETWORK_DATA_PORT); + ret = asprintf(&default_address, + "tcp://" DEFAULT_NETWORK_DATA_BIND_ADDRESS ":%d", + DEFAULT_NETWORK_DATA_PORT); if (ret < 0) { PERROR("asprintf default data address"); goto exit; @@ -412,8 +424,9 @@ int set_options(int argc, char **argv) } } if (live_uri == NULL) { - ret = asprintf(&default_address, "tcp://0.0.0.0:%d", - DEFAULT_NETWORK_VIEWER_PORT); + ret = asprintf(&default_address, + "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS ":%d", + DEFAULT_NETWORK_VIEWER_PORT); if (ret < 0) { PERROR("asprintf default viewer control address"); goto exit; @@ -875,11 +888,12 @@ restart: new_conn->sock = newsock; /* Enqueue request for the dispatcher thread. */ - cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode); + cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail, + &new_conn->qnode); /* * Wake the dispatch queue futex. Implicit memory barrier with - * the exchange in cds_wfq_enqueue. + * the exchange in cds_wfcq_enqueue. */ futex_nto1_wake(&relay_conn_queue.futex); } @@ -926,7 +940,7 @@ void *relay_thread_dispatcher(void *data) { int err = -1; ssize_t ret; - struct cds_wfq_node *node; + struct cds_wfcq_node *node; struct relay_connection *new_conn = NULL; DBG("[thread] Relay dispatcher started"); @@ -949,7 +963,8 @@ void *relay_thread_dispatcher(void *data) health_code_update(); /* Dequeue commands */ - node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue); + node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head, + &relay_conn_queue.tail); if (node == NULL) { DBG("Woken up but nothing in the relay command queue"); /* Continue thread execution */ @@ -1198,6 +1213,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->session_id = session->id; stream->index_fd = -1; stream->read_index_fd = -1; + stream->ctf_stream_id = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); @@ -1335,7 +1351,6 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num); stream->close_flag = 1; session->stream_count--; - assert(session->stream_count >= 0); /* Check if we can close it or else the data will do it. */ try_close_stream(session, stream); @@ -1934,9 +1949,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, DBG("Received live beacon for stream %" PRIu64, stream->stream_handle); /* - * Only flag a stream inactive when it has already received data. + * Only flag a stream inactive when it has already received data + * and no indexes are in flight. */ - if (stream->total_index_received > 0) { + if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); } ret = 0; @@ -1953,9 +1969,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } index_created = 1; + stream->indexes_in_flight++; } copy_index_control_data(index, &index_info); + if (stream->ctf_stream_id == -1ULL) { + stream->ctf_stream_id = be64toh(index_info.stream_id); + } if (index_created) { /* @@ -1980,6 +2000,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } end_rcu_unlock: @@ -2143,6 +2165,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } index_created = 1; + stream->indexes_in_flight++; } if (rotate_index || stream->index_fd < 0) { @@ -2185,6 +2208,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } error: @@ -2386,7 +2411,7 @@ static void destroy_connection(struct lttng_ht *relay_connections_ht, connection_delete(relay_connections_ht, conn); /* For the control socket, we try to destroy the session. */ - if (conn->type == RELAY_CONTROL) { + if (conn->type == RELAY_CONTROL && conn->session) { destroy_session(conn->session, conn->sessions_ht); } @@ -2408,6 +2433,7 @@ void *relay_thread_worker(void *data) struct lttcomm_relayd_hdr recv_hdr; struct relay_local_data *relay_ctx = (struct relay_local_data *) data; struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; + struct relay_index *index; DBG("[thread] Relay worker started"); @@ -2596,6 +2622,7 @@ restart: if (revents & LPOLLIN) { if (conn->type != RELAY_DATA) { + rcu_read_unlock(); continue; } @@ -2639,6 +2666,14 @@ error: } rcu_read_unlock(); error_poll_create: + rcu_read_lock(); + cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, + index_n.node) { + health_code_update(); + relay_index_delete(index); + relay_index_free_safe(index); + } + rcu_read_unlock(); lttng_ht_destroy(indexes_ht); indexes_ht_error: lttng_ht_destroy(relay_connections_ht); @@ -2751,7 +2786,7 @@ int main(int argc, char **argv) } /* Init relay command queue. */ - cds_wfq_init(&relay_conn_queue.queue); + cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail); /* Set up max poll set size */ lttng_poll_set_max_size();