/* assign default values */
if (control_uri == NULL) {
- ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
- DEFAULT_NETWORK_CONTROL_PORT);
+ ret = asprintf(&default_address,
+ "tcp://" DEFAULT_NETWORK_CONTROL_BIND_ADDRESS ":%d",
+ DEFAULT_NETWORK_CONTROL_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
goto exit;
}
}
if (data_uri == NULL) {
- ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
- DEFAULT_NETWORK_DATA_PORT);
+ ret = asprintf(&default_address,
+ "tcp://" DEFAULT_NETWORK_DATA_BIND_ADDRESS ":%d",
+ DEFAULT_NETWORK_DATA_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
goto exit;
}
}
if (live_uri == NULL) {
- ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
- DEFAULT_NETWORK_VIEWER_PORT);
+ ret = asprintf(&default_address,
+ "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS ":%d",
+ DEFAULT_NETWORK_VIEWER_PORT);
if (ret < 0) {
PERROR("asprintf default viewer control address");
goto exit;
stream->session_id = session->id;
stream->index_fd = -1;
stream->read_index_fd = -1;
+ stream->ctf_stream_id = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
/*
- * Only flag a stream inactive when it has already received data.
+ * Only flag a stream inactive when it has already received data
+ * and no indexes are in flight.
*/
- if (stream->total_index_received > 0) {
+ if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
stream->beacon_ts_end = be64toh(index_info.timestamp_end);
}
ret = 0;
goto end_rcu_unlock;
}
index_created = 1;
+ stream->indexes_in_flight++;
}
copy_index_control_data(index, &index_info);
+ if (stream->ctf_stream_id == -1ULL) {
+ stream->ctf_stream_id = be64toh(index_info.stream_id);
+ }
if (index_created) {
/*
goto end_rcu_unlock;
}
stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
}
end_rcu_unlock:
goto error;
}
index_created = 1;
+ stream->indexes_in_flight++;
}
if (rotate_index || stream->index_fd < 0) {
goto error;
}
stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
}
error:
connection_delete(relay_connections_ht, conn);
/* For the control socket, we try to destroy the session. */
- if (conn->type == RELAY_CONTROL) {
+ if (conn->type == RELAY_CONTROL && conn->session) {
destroy_session(conn->session, conn->sessions_ht);
}