break;
case 'g':
tracing_group_name = strdup(arg);
+ if (tracing_group_name == NULL) {
+ ret = -errno;
+ PERROR("strdup");
+ goto end;
+ }
tracing_group_name_override = 1;
break;
case 'h':
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
new_conn->sock = newsock;
/* Enqueue request for the dispatcher thread. */
- cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode);
+ cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
+ &new_conn->qnode);
/*
* Wake the dispatch queue futex. Implicit memory barrier with
- * the exchange in cds_wfq_enqueue.
+ * the exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&relay_conn_queue.futex);
}
{
int err = -1;
ssize_t ret;
- struct cds_wfq_node *node;
+ struct cds_wfcq_node *node;
struct relay_connection *new_conn = NULL;
DBG("[thread] Relay dispatcher started");
health_code_update();
/* Dequeue commands */
- node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue);
+ node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head,
+ &relay_conn_queue.tail);
if (node == NULL) {
DBG("Woken up but nothing in the relay command queue");
/* Continue thread execution */
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
stream->close_flag = 1;
session->stream_count--;
- assert(session->stream_count >= 0);
/* Check if we can close it or else the data will do it. */
try_close_stream(session, stream);
DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
/*
- * Only flag a stream inactive when it has already received data.
+ * Only flag a stream inactive when it has already received data
+ * and no indexes are in flight.
*/
- if (stream->total_index_received > 0) {
+ if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
stream->beacon_ts_end = be64toh(index_info.timestamp_end);
}
ret = 0;
goto end_rcu_unlock;
}
index_created = 1;
+ stream->indexes_in_flight++;
}
copy_index_control_data(index, &index_info);
goto end_rcu_unlock;
}
stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
}
end_rcu_unlock:
goto error;
}
index_created = 1;
+ stream->indexes_in_flight++;
}
if (rotate_index || stream->index_fd < 0) {
goto error;
}
stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
}
error:
stream->tracefile_size, stream->tracefile_count,
relayd_uid, relayd_gid, stream->fd,
&(stream->tracefile_count_current), &stream->fd);
- stream->total_index_received = 0;
pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
if (ret < 0) {
ERR("Rotating stream output file");
struct lttcomm_relayd_hdr recv_hdr;
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+ struct relay_index *index;
DBG("[thread] Relay worker started");
health_code_update();
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
health_code_update();
+ if (!revents) {
+ /* No activity for this FD (poll implementation). */
+ continue;
+ }
+
/* Skip the command pipe. It's handled in the first loop. */
if (pollfd == relay_conn_pipe[0]) {
continue;
}
- if (revents) {
- rcu_read_lock();
- conn = connection_find_by_sock(relay_connections_ht, pollfd);
- if (!conn) {
- /* Skip it. Might be removed before. */
+ rcu_read_lock();
+ conn = connection_find_by_sock(relay_connections_ht, pollfd);
+ if (!conn) {
+ /* Skip it. Might be removed before. */
+ rcu_read_unlock();
+ continue;
+ }
+
+ if (revents & LPOLLIN) {
+ if (conn->type != RELAY_DATA) {
rcu_read_unlock();
continue;
}
- if (revents & LPOLLIN) {
- if (conn->type != RELAY_DATA) {
- continue;
- }
-
- ret = relay_process_data(conn);
- /* Connection closed */
- if (ret < 0) {
- cleanup_connection_pollfd(&events, pollfd);
- destroy_connection(relay_connections_ht, conn);
- DBG("Data connection closed with %d", pollfd);
- /*
- * Every goto restart call sets the last seen fd where
- * here we don't really care since we gracefully
- * continue the loop after the connection is deleted.
- */
- } else {
- /* Keep last seen port. */
- last_seen_data_fd = pollfd;
- rcu_read_unlock();
- goto restart;
- }
+ ret = relay_process_data(conn);
+ /* Connection closed */
+ if (ret < 0) {
+ cleanup_connection_pollfd(&events, pollfd);
+ destroy_connection(relay_connections_ht, conn);
+ DBG("Data connection closed with %d", pollfd);
+ /*
+ * Every goto restart call sets the last seen fd where
+ * here we don't really care since we gracefully
+ * continue the loop after the connection is deleted.
+ */
+ } else {
+ /* Keep last seen port. */
+ last_seen_data_fd = pollfd;
+ rcu_read_unlock();
+ goto restart;
}
- rcu_read_unlock();
}
+ rcu_read_unlock();
}
last_seen_data_fd = -1;
}
}
rcu_read_unlock();
error_poll_create:
+ rcu_read_lock();
+ cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
+ index_n.node) {
+ health_code_update();
+ relay_index_delete(index);
+ relay_index_free_safe(index);
+ }
+ rcu_read_unlock();
lttng_ht_destroy(indexes_ht);
indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
}
/* Init relay command queue. */
- cds_wfq_init(&relay_conn_queue.queue);
+ cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
/* Set up max poll set size */
lttng_poll_set_max_size();