/* Stopping all threads */
DBG("Terminating all live threads");
- ret = notify_thread_pipe(live_conn_pipe[1]);
+ ret = notify_thread_pipe(thread_quit_pipe[1]);
if (ret < 0) {
ERR("write error on thread quit pipe");
}
}
/* Add quit pipe */
- ret = lttng_poll_add(events, live_conn_pipe[0], LPOLLIN | LPOLLERR);
+ ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
if (ret < 0) {
goto error;
}
* Return 1 if it was triggered else 0;
*/
static
-int check_live_conn_pipe(int fd, uint32_t events)
+int check_thread_quit_pipe(int fd, uint32_t events)
{
- if (fd == live_conn_pipe[0] && (events & LPOLLIN)) {
+ if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
return 1;
}
pollfd = LTTNG_POLL_GETFD(&events, i);
/* Thread quit pipe has been closed. Killing thread. */
- ret = check_live_conn_pipe(pollfd, revents);
+ ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
err = 0;
goto exit;
health_code_update();
+ memset(&reply, 0, sizeof(reply));
reply.major = RELAYD_VERSION_COMM_MAJOR;
reply.minor = RELAYD_VERSION_COMM_MINOR;
send_streams = 1;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
- if (!send_streams) {
- goto send_reply;
- }
-
ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent,
&nb_created);
if (ret < 0) {
goto send_reply;
}
- if (!send_streams) {
- goto send_reply;
- }
-
ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
if (ret < 0) {
goto end_unlock;
rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle);
assert(rstream);
+ pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
if (!rstream->close_flag) {
if (vstream->abort_flag) {
/* Rotate on abort (overwrite). */
DBG("Viewer rotate because of overwrite");
ret = viewer_stream_rotate(vstream, rstream);
if (ret < 0) {
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
goto end_unlock;
} else if (ret == 1) {
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
viewer_stream_destroy(ctf_trace, vstream);
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
goto send_reply;
}
/* ret == 0 means successful so we continue. */
}
- pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
if (rstream->beacon_ts_end != -1ULL &&
vstream->last_sent_index == rstream->total_index_received) {
goto send_reply;
}
}
- pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
} else if (rstream->close_flag && vstream->close_write_flag &&
vstream->total_index_received == vstream->last_sent_index) {
/* Last index sent and current tracefile closed in write */
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
viewer_stream_destroy(ctf_trace, vstream);
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
goto send_reply;
} else {
vstream->close_write_flag = 1;
}
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
if (!ctf_trace->metadata_received ||
ctf_trace->metadata_received > ctf_trace->metadata_sent) {
sizeof(packet_index));
pthread_mutex_unlock(&vstream->overwrite_lock);
if (ret < sizeof(packet_index)) {
+ unsigned int close_write_flag;
+
+ pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
+ close_write_flag = vstream->close_write_flag;
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
/*
* The tracefile is closed in write, so we read up to EOF.
*/
- if (vstream->close_write_flag == 1) {
+ if (close_write_flag == 1) {
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
/* Rotate on normal EOF */
ret = viewer_stream_rotate(vstream, rstream);
DBG("Viewer create session received");
resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
- conn->viewer_session = zmalloc(sizeof(conn->viewer_session));
+ conn->viewer_session = zmalloc(sizeof(*conn->viewer_session));
if (!conn->viewer_session) {
ERR("Allocation viewer session");
resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
health_code_update();
/* Thread quit pipe has been closed. Killing thread. */
- ret = check_live_conn_pipe(pollfd, revents);
+ ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
err = 0;
goto exit;