*
* Return 0 on success or else a negative value.
*/
-static
-int make_viewer_streams(struct relay_session *session,
- enum lttng_viewer_seek seek_t, uint32_t *nb_total, uint32_t *nb_unsent,
- uint32_t *nb_created, bool *closed)
+static int make_viewer_streams(struct relay_session *session,
+ struct lttng_trace_chunk *viewer_trace_chunk,
+ enum lttng_viewer_seek seek_t,
+ uint32_t *nb_total,
+ uint32_t *nb_unsent,
+ uint32_t *nb_created,
+ bool *closed)
{
int ret;
struct lttng_ht_iter iter;
}
vstream = viewer_stream_get_by_id(stream->stream_handle);
if (!vstream) {
- vstream = viewer_stream_create(stream, seek_t);
+ vstream = viewer_stream_create(stream,
+ viewer_trace_chunk, seek_t);
if (!vstream) {
ret = -1;
ctf_trace_put(ctf_trace);
health_code_update();
+ pthread_mutex_lock(&session->lock);
if (session->connection_closed) {
/* Skip closed session */
- continue;
+ goto next_session;
+ }
+ if (!session->current_trace_chunk) {
+ /*
+ * Skip un-attachable session. It is either
+ * being destroyed or has not had a trace
+ * chunk created against it yet.
+ */
+ goto next_session;
}
if (count >= buf_count) {
new_buf_count * sizeof(*send_session_buf));
if (!newbuf) {
ret = -1;
- break;
+ goto break_loop;
}
send_session_buf = newbuf;
buf_count = new_buf_count;
session->session_name,
sizeof(send_session->session_name))) {
ret = -1;
- break;
+ goto break_loop;
}
if (lttng_strncpy(send_session->hostname, session->hostname,
sizeof(send_session->hostname))) {
ret = -1;
- break;
+ goto break_loop;
}
send_session->id = htobe64(session->id);
send_session->live_timer = htobe32(session->live_timer);
}
send_session->streams = htobe32(session->stream_count);
count++;
+ next_session:
+ pthread_mutex_unlock(&session->lock);
+ continue;
+ break_loop:
+ pthread_mutex_unlock(&session->lock);
+ break;
}
rcu_read_unlock();
if (ret < 0) {
uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0;
struct lttng_viewer_new_streams_request request;
struct lttng_viewer_new_streams_response response;
- struct relay_session *session;
+ struct relay_session *session = NULL;
uint64_t session_id;
bool closed = false;
}
if (!viewer_session_is_attached(conn->viewer_session, session)) {
- send_streams = 0;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
goto send_reply;
}
- send_streams = 1;
- response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
-
pthread_mutex_lock(&session->lock);
- ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
+ if (!session->current_trace_chunk) {
+ /*
+ * Means the session is being destroyed. React the same way
+ * as if it could not be found at all.
+ */
+ DBG("Relay session %" PRIu64 " has no current trace chunk, replying LTTNG_VIEWER_NEW_STREAMS_ERR",
+ session_id);
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
+ goto send_reply_unlock;
+ }
+
+ if (!conn->viewer_session->current_trace_chunk &&
+ session->current_trace_chunk) {
+ ret = viewer_session_set_trace_chunk(conn->viewer_session,
+ session->current_trace_chunk);
+ if (ret) {
+ goto error_unlock_session;
+ }
+ }
+ ret = make_viewer_streams(session,
+ conn->viewer_session->current_trace_chunk,
+ LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
&nb_created, &closed);
- pthread_mutex_unlock(&session->lock);
if (ret < 0) {
- goto end_put_session;
+ goto error_unlock_session;
}
+ send_streams = 1;
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
+
/* Only send back the newly created streams with the unsent ones. */
nb_streams = nb_created + nb_unsent;
response.streams_count = htobe32(nb_streams);
send_streams = 0;
response.streams_count = 0;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
- goto send_reply;
+ goto send_reply_unlock;
}
+send_reply_unlock:
+ pthread_mutex_unlock(&session->lock);
send_reply:
health_code_update();
}
error:
return ret;
+error_unlock_session:
+ pthread_mutex_unlock(&session->lock);
+ session_put(session);
+ return ret;
}
/*
DBG("Attach session ID %" PRIu64 " received", session_id);
pthread_mutex_lock(&session->lock);
+ if (!session->current_trace_chunk) {
+ /*
+ * Session is either being destroyed or it never had a trace
+ * chunk created against it.
+ */
+ DBG("Session requested by live client has no current trace chunk, returning unknown session");
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
+ goto send_reply;
+ }
if (session->live_timer == 0) {
DBG("Not live session");
response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
goto send_reply;
}
- ret = make_viewer_streams(session, seek_type, &nb_streams, NULL,
- NULL, &closed);
+ if (!conn->viewer_session->current_trace_chunk &&
+ session->current_trace_chunk) {
+ ret = viewer_session_set_trace_chunk(conn->viewer_session,
+ session->current_trace_chunk);
+ if (ret) {
+ goto end_put_session;
+ }
+ }
+ ret = make_viewer_streams(session,
+ conn->viewer_session->current_trace_chunk, seek_type,
+ &nb_streams, NULL, NULL, &closed);
if (ret < 0) {
goto end_put_session;
}
-
pthread_mutex_unlock(&session->lock);
session_put(session);
session = NULL;
goto end;
}
vstream->index_file = lttng_index_file_create_from_trace_chunk_read_only(
- rstream->trace_chunk, rstream->path_name,
+ vstream->stream_file.trace_chunk, rstream->path_name,
rstream->channel_name, rstream->tracefile_size,
vstream->current_tracefile_id,
lttng_to_index_major(connection_major, connection_minor),
if (ret < 0) {
goto error;
}
- lttng_poll_add(&events, conn->sock->fd,
+ ret = lttng_poll_add(&events,
+ conn->sock->fd,
LPOLLIN | LPOLLRDHUP);
+ if (ret) {
+ ERR("Failed to add new live connection file descriptor to poll set");
+ goto error;
+ }
connection_ht_add(viewer_connections_ht, conn);
DBG("Connection socket %d added to poll", conn->sock->fd);
} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {