X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=06c1c15134cf63af055729b6b28f75d0489962ce;hp=445f432e416a60f584516770c96bf3d278685d2b;hb=b5a6470f372799b28d3d20603c1c0c8e5871dd63;hpb=bcf4a440f6c79c6546d9fe6a609c49fe6e8dc082 diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 445f432e4..06c1c1513 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -694,6 +694,7 @@ int viewer_connect(struct relay_connection *conn) health_code_update(); + memset(&reply, 0, sizeof(reply)); reply.major = RELAYD_VERSION_COMM_MAJOR; reply.minor = RELAYD_VERSION_COMM_MINOR; @@ -947,10 +948,6 @@ int viewer_get_new_streams(struct relay_connection *conn) send_streams = 1; response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK); - if (!send_streams) { - goto send_reply; - } - ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent, &nb_created); if (ret < 0) { @@ -1081,10 +1078,6 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } - if (!send_streams) { - goto send_reply; - } - ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL); if (ret < 0) { goto end_unlock; @@ -1196,23 +1189,25 @@ int viewer_get_next_index(struct relay_connection *conn) rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle); assert(rstream); + pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); if (!rstream->close_flag) { if (vstream->abort_flag) { /* Rotate on abort (overwrite). */ DBG("Viewer rotate because of overwrite"); ret = viewer_stream_rotate(vstream, rstream); if (ret < 0) { + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); goto end_unlock; } else if (ret == 1) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); viewer_stream_delete(vstream); viewer_stream_destroy(ctf_trace, vstream); + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); goto send_reply; } /* ret == 0 means successful so we continue. */ } - pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); if (rstream->tracefile_count_current == vstream->tracefile_count_current) { if (rstream->beacon_ts_end != -1ULL && vstream->last_sent_index == rstream->total_index_received) { @@ -1233,17 +1228,18 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } } - pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); } else if (rstream->close_flag && vstream->close_write_flag && vstream->total_index_received == vstream->last_sent_index) { /* Last index sent and current tracefile closed in write */ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP); viewer_stream_delete(vstream); viewer_stream_destroy(ctf_trace, vstream); + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); goto send_reply; } else { vstream->close_write_flag = 1; } + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); if (!ctf_trace->metadata_received || ctf_trace->metadata_received > ctf_trace->metadata_sent) { @@ -1280,10 +1276,15 @@ int viewer_get_next_index(struct relay_connection *conn) sizeof(packet_index)); pthread_mutex_unlock(&vstream->overwrite_lock); if (ret < sizeof(packet_index)) { + unsigned int close_write_flag; + + pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); + close_write_flag = vstream->close_write_flag; + pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); /* * The tracefile is closed in write, so we read up to EOF. */ - if (vstream->close_write_flag == 1) { + if (close_write_flag == 1) { viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); /* Rotate on normal EOF */ ret = viewer_stream_rotate(vstream, rstream); @@ -1634,7 +1635,7 @@ int viewer_create_session(struct relay_connection *conn) DBG("Viewer create session received"); resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK); - conn->viewer_session = zmalloc(sizeof(conn->viewer_session)); + conn->viewer_session = zmalloc(sizeof(*conn->viewer_session)); if (!conn->viewer_session) { ERR("Allocation viewer session"); resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);