From 2a174661a1e0ab551b41ff1cae7191688525fc1f Mon Sep 17 00:00:00 2001 From: David Goulet Date: Wed, 22 Jan 2014 12:32:56 -0500 Subject: [PATCH] Fix: big relayd cleanup and refactor Due to a number of race conditions between the live feature and the data threads, we had to refactor the relayd which mostly splits the different objects in their own files creating APIs for each of them. That done, we are able to remove duplicate code, centralize code path to help with cleanup and having a flow that makes more sense when handling multiple objects. This commits adds a lot of fixes with the live feature and cleanup code path of a session since some objects are shared between threads like ctf-trace and stream. So yeah, this is quite a BIG commit and unfortunately this work was essential in order to fix a lof of issues and have a way more maintainable code base for the relayd. Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- src/bin/lttng-relayd/Makefile.am | 3 +- src/bin/lttng-relayd/cmd-2-1.h | 1 + src/bin/lttng-relayd/ctf-trace.c | 149 ++++++---- src/bin/lttng-relayd/ctf-trace.h | 29 +- src/bin/lttng-relayd/live.c | 243 +++++++++------- src/bin/lttng-relayd/lttng-relayd.h | 96 +------ src/bin/lttng-relayd/main.c | 398 ++++++++++----------------- src/bin/lttng-relayd/session.c | 140 +++++++++- src/bin/lttng-relayd/session.h | 40 ++- src/bin/lttng-relayd/stream.c | 153 ++++++++++ src/bin/lttng-relayd/stream.h | 116 ++++++++ src/bin/lttng-relayd/viewer-stream.c | 62 ++--- src/bin/lttng-relayd/viewer-stream.h | 6 +- 13 files changed, 891 insertions(+), 545 deletions(-) create mode 100644 src/bin/lttng-relayd/stream.c create mode 100644 src/bin/lttng-relayd/stream.h diff --git a/src/bin/lttng-relayd/Makefile.am b/src/bin/lttng-relayd/Makefile.am index 65e3d675c..cf1e62f61 100644 --- a/src/bin/lttng-relayd/Makefile.am +++ b/src/bin/lttng-relayd/Makefile.am @@ -15,7 +15,8 @@ lttng_relayd_SOURCES = main.c lttng-relayd.h utils.h utils.c cmd.h \ health-relayd.c health-relayd.h \ lttng-viewer-abi.h testpoint.h \ viewer-stream.h viewer-stream.c \ - session.c session.h + session.c session.h \ + stream.c stream.h # link on liblttngctl for check if relayd is already alive. lttng_relayd_LDADD = -lrt -lurcu-common -lurcu \ diff --git a/src/bin/lttng-relayd/cmd-2-1.h b/src/bin/lttng-relayd/cmd-2-1.h index 26bc0c1ba..3bba909b3 100644 --- a/src/bin/lttng-relayd/cmd-2-1.h +++ b/src/bin/lttng-relayd/cmd-2-1.h @@ -20,6 +20,7 @@ #define RELAYD_CMD_2_1_H #include "lttng-relayd.h" +#include "stream.h" int cmd_recv_stream_2_1(struct relay_command *cmd, struct relay_stream *stream); diff --git a/src/bin/lttng-relayd/ctf-trace.c b/src/bin/lttng-relayd/ctf-trace.c index 137790f57..6fe77a5b4 100644 --- a/src/bin/lttng-relayd/ctf-trace.c +++ b/src/bin/lttng-relayd/ctf-trace.c @@ -23,100 +23,135 @@ #include #include "ctf-trace.h" +#include "lttng-relayd.h" +#include "stream.h" static uint64_t last_relay_ctf_trace_id; +static void rcu_destroy_ctf_trace(struct rcu_head *head) +{ + struct lttng_ht_node_str *node = + caa_container_of(head, struct lttng_ht_node_str, head); + struct ctf_trace *trace= + caa_container_of(node, struct ctf_trace, node); + + free(trace); +} + /* - * Try to destroy a ctf_trace object meaning that the refcount is decremented - * and checked if down to 0 which will free it. + * Destroy a ctf trace and all stream contained in it. + * + * MUST be called with the RCU read side lock. */ -void ctf_trace_try_destroy(struct ctf_trace *obj) +void ctf_trace_destroy(struct ctf_trace *obj) { - unsigned long ret_ref; - - if (!obj) { - return; + struct relay_stream *stream, *tmp_stream; + + assert(obj); + /* + * Getting to this point, every stream referenced to that object have put + * back their ref since the've been closed by the control side. + */ + assert(!obj->refcount); + + cds_list_for_each_entry_safe(stream, tmp_stream, &obj->stream_list, + trace_list) { + stream_delete(relay_streams_ht, stream); + stream_destroy(stream); } - ret_ref = uatomic_add_return(&obj->refcount, -1); - if (ret_ref == 0) { - DBG("Freeing ctf_trace %" PRIu64, obj->id); - free(obj); + call_rcu(&obj->node.head, rcu_destroy_ctf_trace); +} + +void ctf_trace_try_destroy(struct relay_session *session, + struct ctf_trace *ctf_trace) +{ + assert(session); + assert(ctf_trace); + + /* + * Considering no viewer attach to the session and the trace having no more + * stream attached, wipe the trace. + */ + if (uatomic_read(&session->viewer_refcount) == 0 && + uatomic_read(&ctf_trace->refcount) == 0) { + ctf_trace_destroy(ctf_trace); } } /* * Create and return an allocated ctf_trace object. NULL on error. */ -struct ctf_trace *ctf_trace_create(void) +struct ctf_trace *ctf_trace_create(char *path_name) { struct ctf_trace *obj; + assert(path_name); + obj = zmalloc(sizeof(*obj)); if (!obj) { PERROR("ctf_trace alloc"); goto error; } + CDS_INIT_LIST_HEAD(&obj->stream_list); + obj->id = ++last_relay_ctf_trace_id; - DBG("Created ctf_trace %" PRIu64, obj->id); + lttng_ht_node_init_str(&obj->node, path_name); + + DBG("Created ctf_trace %" PRIu64 " with path: %s", obj->id, path_name); error: return obj; } /* - * Check if we can assign the ctf_trace id and metadata stream to one or all - * the streams with the same path_name (our unique ID for ctf traces). - * - * The given stream MUST be new and NOT visible (in any hash table). + * Return a ctf_trace object if found by id in the given hash table else NULL. */ -void ctf_trace_assign(struct lttng_ht *ht, struct relay_stream *stream) +struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht, + char *path_name) { + struct lttng_ht_node_str *node; struct lttng_ht_iter iter; - struct relay_stream *tmp_stream; + struct ctf_trace *trace = NULL; assert(ht); - assert(stream); - - rcu_read_lock(); - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct((void *) stream->path_name, lttng_ht_seed), - ht->match_fct, (void *) stream->path_name, - &iter.iter, tmp_stream, ctf_trace_node.node) { - if (stream->metadata_flag) { - /* - * The new stream is the metadata stream for this trace, - * assign the ctf_trace pointer to all the streams in - * this bucket. - */ - pthread_mutex_lock(&tmp_stream->lock); - tmp_stream->ctf_trace = stream->ctf_trace; - uatomic_inc(&tmp_stream->ctf_trace->refcount); - pthread_mutex_unlock(&tmp_stream->lock); - DBG("Assigned ctf_trace %" PRIu64 " to stream %" PRIu64, - tmp_stream->ctf_trace->id, tmp_stream->stream_handle); - } else if (tmp_stream->ctf_trace) { - /* - * The ctf_trace already exists for this bucket, - * just assign the pointer to the new stream and exit. - */ - stream->ctf_trace = tmp_stream->ctf_trace; - uatomic_inc(&stream->ctf_trace->refcount); - DBG("Assigned ctf_trace %" PRIu64 " to stream %" PRIu64, - tmp_stream->ctf_trace->id, tmp_stream->stream_handle); - goto end; - } else { - /* - * We don't know yet the ctf_trace ID (no metadata has been added), - * so leave it there until the metadata stream arrives. - */ - goto end; - } + + lttng_ht_lookup(ht, (void *) path_name, &iter); + node = lttng_ht_iter_get_node_str(&iter); + if (!node) { + DBG("CTF Trace path %s not found", path_name); + goto end; } + trace = caa_container_of(node, struct ctf_trace, node); end: - rcu_read_unlock(); - return; + return trace; } +/* + * Add stream to a given hash table. + */ +void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace) +{ + assert(ht); + assert(trace); + + lttng_ht_add_str(ht, &trace->node); +} + +/* + * Delete stream from a given hash table. + */ +void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace) +{ + int ret; + struct lttng_ht_iter iter; + + assert(ht); + assert(trace); + + iter.iter.node = &trace->node.node; + ret = lttng_ht_del(ht, &iter); + assert(!ret); +} diff --git a/src/bin/lttng-relayd/ctf-trace.h b/src/bin/lttng-relayd/ctf-trace.h index 66f7a1568..a64d53aa6 100644 --- a/src/bin/lttng-relayd/ctf-trace.h +++ b/src/bin/lttng-relayd/ctf-trace.h @@ -24,18 +24,41 @@ #include #include "lttng-relayd.h" +#include "session.h" struct ctf_trace { int refcount; + unsigned int invalid_flag:1; uint64_t id; uint64_t metadata_received; uint64_t metadata_sent; struct relay_stream *metadata_stream; struct relay_viewer_stream *viewer_metadata_stream; + /* Node indexed by stream path name in the corresponding session. */ + struct lttng_ht_node_str node; + + /* Relay stream associated with this ctf trace. */ + struct cds_list_head stream_list; }; -void ctf_trace_assign(struct lttng_ht *ht, struct relay_stream *stream); -struct ctf_trace *ctf_trace_create(void); -void ctf_trace_try_destroy(struct ctf_trace *obj); +static inline void ctf_trace_get_ref(struct ctf_trace *trace) +{ + uatomic_inc(&trace->refcount); +} + +static inline void ctf_trace_put_ref(struct ctf_trace *trace) +{ + uatomic_add(&trace->refcount, -1); +} + +void ctf_trace_assign(struct relay_stream *stream); +struct ctf_trace *ctf_trace_create(char *path_name); +void ctf_trace_destroy(struct ctf_trace *obj); +void ctf_trace_try_destroy(struct relay_session *session, + struct ctf_trace *ctf_trace); +struct ctf_trace *ctf_trace_find_by_path(struct lttng_ht *ht, + char *path_name); +void ctf_trace_add(struct lttng_ht *ht, struct ctf_trace *trace); +void ctf_trace_delete(struct lttng_ht *ht, struct ctf_trace *trace); #endif /* _CTF_TRACE_H */ diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index d558b7148..764d616a2 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -59,6 +59,9 @@ #include "health-relayd.h" #include "testpoint.h" #include "viewer-stream.h" +#include "stream.h" +#include "session.h" +#include "ctf-trace.h" static struct lttng_uri *live_uri; @@ -199,6 +202,8 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { + struct ctf_trace *ctf_trace; + health_code_update(); /* Ignore if not the same session. */ @@ -207,8 +212,12 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, continue; } + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, + vstream->path_name); + assert(ctf_trace); + send_stream.id = htobe64(vstream->stream_handle); - send_stream.ctf_trace_id = htobe64(vstream->ctf_trace->id); + send_stream.ctf_trace_id = htobe64(ctf_trace->id); send_stream.metadata_flag = htobe32(vstream->metadata_flag); strncpy(send_stream.path_name, vstream->path_name, sizeof(send_stream.path_name)); @@ -244,8 +253,8 @@ int make_viewer_streams(struct relay_session *session, uint32_t *nb_created) { int ret; - struct relay_stream *stream; struct lttng_ht_iter iter; + struct ctf_trace *ctf_trace; assert(session); @@ -262,45 +271,53 @@ int make_viewer_streams(struct relay_session *session, * Create viewer streams for relay streams that are ready to be used for a * the given session id only. */ - cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, - stream_n.node) { - struct relay_viewer_stream *vstream; + rcu_read_lock(); + cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, + node.node) { + struct relay_stream *stream; health_code_update(); - if (stream->session->id != session->id || - !stream->ctf_trace || !stream->viewer_ready) { - /* - * Ignore stream from a different session. Don't create streams - * with no ctf_trace or not ready for the viewer. - */ + if (ctf_trace->invalid_flag) { continue; } - vstream = viewer_stream_find_by_id(stream->stream_handle); - if (!vstream) { - vstream = viewer_stream_create(stream, seek_t); + cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) { + struct relay_viewer_stream *vstream; + + if (!stream->viewer_ready) { + continue; + } + + vstream = viewer_stream_find_by_id(stream->stream_handle); if (!vstream) { - ret = -1; - goto error_unlock; + vstream = viewer_stream_create(stream, seek_t, ctf_trace); + if (!vstream) { + ret = -1; + goto error_unlock; + } + /* Acquire reference to ctf_trace. */ + ctf_trace_get_ref(ctf_trace); + + if (nb_created) { + /* Update number of created stream counter. */ + (*nb_created)++; + } + } else if (!vstream->sent_flag && nb_unsent) { + /* Update number of unsent stream counter. */ + (*nb_unsent)++; } - if (nb_created) { - /* Update number of created stream counter. */ - (*nb_created)++; + /* Update number of total stream counter. */ + if (nb_total) { + (*nb_total)++; } - } else if (!vstream->sent_flag && nb_unsent) { - /* Update number of unsent stream counter. */ - (*nb_unsent)++; - } - /* Update number of total stream counter. */ - if (nb_total) { - (*nb_total)++; } } ret = 0; error_unlock: + rcu_read_unlock(); pthread_mutex_unlock(&session->viewer_ready_lock); return ret; } @@ -331,7 +348,7 @@ void stop_threads(void) /* Stopping all threads */ DBG("Terminating all live threads"); - ret = notify_thread_pipe(thread_quit_pipe[1]); + ret = notify_thread_pipe(live_conn_pipe[1]); if (ret < 0) { ERR("write error on thread quit pipe"); } @@ -360,7 +377,7 @@ int create_thread_poll_set(struct lttng_poll_event *events, int size) } /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); + ret = lttng_poll_add(events, live_conn_pipe[0], LPOLLIN | LPOLLERR); if (ret < 0) { goto error; } @@ -377,9 +394,9 @@ error: * Return 1 if it was triggered else 0; */ static -int check_thread_quit_pipe(int fd, uint32_t events) +int check_live_conn_pipe(int fd, uint32_t events) { - if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { + if (fd == live_conn_pipe[0] && (events & LPOLLIN)) { return 1; } @@ -497,7 +514,7 @@ restart: pollfd = LTTNG_POLL_GETFD(&events, i); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = check_live_conn_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -717,7 +734,6 @@ int viewer_connect(struct relay_command *cmd) health_code_update(); - ret = send_response(cmd->sock, &reply, sizeof(reply)); if (ret < 0) { goto end; @@ -774,7 +790,7 @@ int viewer_list_sessions(struct relay_command *cmd, sizeof(send_session.hostname)); send_session.id = htobe64(session->id); send_session.live_timer = htobe32(session->live_timer); - send_session.clients = htobe32(session->viewer_attached); + send_session.clients = htobe32(session->viewer_refcount); send_session.streams = htobe32(session->stream_count); health_code_update(); @@ -907,8 +923,6 @@ int viewer_attach_session(struct relay_command *cmd, assert(cmd); assert(sessions_ht); - DBG("Attach session received"); - health_code_update(); /* Receive the request from the connected client. */ @@ -927,21 +941,19 @@ int viewer_attach_session(struct relay_command *cmd, response.status = htobe32(VIEWER_ATTACH_UNK); goto send_reply; } + session_viewer_attach(session); + DBG("Attach session ID %" PRIu64 " received", be64toh(request.session_id)); - if (cmd->session_id == session->id) { - /* Same viewer already attached, just send the stream list. */ - send_streams = 1; - response.status = htobe32(VIEWER_ATTACH_OK); - } else if (session->viewer_attached != 0) { + if (uatomic_read(&session->viewer_refcount) > 1) { DBG("Already a viewer attached"); response.status = htobe32(VIEWER_ATTACH_ALREADY); + session_viewer_detach(session); goto send_reply; } else if (session->live_timer == 0) { DBG("Not live session"); response.status = htobe32(VIEWER_ATTACH_NOT_LIVE); goto send_reply; } else { - session->viewer_attached++; send_streams = 1; response.status = htobe32(VIEWER_ATTACH_OK); cmd->session_id = session->id; @@ -1014,6 +1026,8 @@ int viewer_get_next_index(struct relay_command *cmd, struct ctf_packet_index packet_index; struct relay_viewer_stream *vstream; struct relay_stream *rstream; + struct ctf_trace *ctf_trace; + struct relay_session *session; assert(cmd); assert(sessions_ht); @@ -1029,12 +1043,21 @@ int viewer_get_next_index(struct relay_command *cmd, health_code_update(); rcu_read_lock(); + session = session_find_by_id(sessions_ht, cmd->session_id); + if (!session) { + ret = -1; + goto end_unlock; + } + vstream = viewer_stream_find_by_id(be64toh(request_index.stream_id)); if (!vstream) { ret = -1; goto end_unlock; } + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, vstream->path_name); + assert(ctf_trace); + memset(&viewer_index, 0, sizeof(viewer_index)); /* @@ -1063,8 +1086,10 @@ int viewer_get_next_index(struct relay_command *cmd, vstream->index_read_fd = ret; } - rstream = relay_stream_find_by_id(vstream->stream_handle); - if (rstream) { + rstream = stream_find_by_id(relay_streams_ht, vstream->stream_handle); + assert(rstream); + + if (!rstream->close_flag) { if (vstream->abort_flag) { /* Rotate on abort (overwrite). */ DBG("Viewer rotate because of overwrite"); @@ -1074,10 +1099,12 @@ int viewer_get_next_index(struct relay_command *cmd, } else if (ret == 1) { viewer_index.status = htobe32(VIEWER_INDEX_HUP); viewer_stream_delete(vstream); - viewer_stream_destroy(vstream); + viewer_stream_destroy(ctf_trace, vstream); goto send_reply; } + /* ret == 0 means successful so we continue. */ } + pthread_mutex_lock(&rstream->viewer_stream_rotation_lock); if (rstream->tracefile_count_current == vstream->tracefile_count_current) { if (rstream->beacon_ts_end != -1ULL && @@ -1086,13 +1113,13 @@ int viewer_get_next_index(struct relay_command *cmd, viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end); pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); goto send_reply; - /* - * Reader and writer are working in the same tracefile, so we care - * about the number of index received and sent. Otherwise, we read - * up to EOF. - */ } else if (rstream->total_index_received <= vstream->last_sent_index && !vstream->close_write_flag) { + /* + * Reader and writer are working in the same tracefile, so we care + * about the number of index received and sent. Otherwise, we read + * up to EOF. + */ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); /* No new index to send, retry later. */ viewer_index.status = htobe32(VIEWER_INDEX_RETRY); @@ -1100,20 +1127,19 @@ int viewer_get_next_index(struct relay_command *cmd, } } pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock); - } else if (!rstream && vstream->close_write_flag && + } else if (rstream->close_flag && vstream->close_write_flag && vstream->total_index_received == vstream->last_sent_index) { /* Last index sent and current tracefile closed in write */ viewer_index.status = htobe32(VIEWER_INDEX_HUP); viewer_stream_delete(vstream); - viewer_stream_destroy(vstream); + viewer_stream_destroy(ctf_trace, vstream); goto send_reply; } else { vstream->close_write_flag = 1; } - if (!vstream->ctf_trace->metadata_received || - vstream->ctf_trace->metadata_received > - vstream->ctf_trace->metadata_sent) { + if (!ctf_trace->metadata_received || + ctf_trace->metadata_received > ctf_trace->metadata_sent) { viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; } @@ -1127,8 +1153,7 @@ int viewer_get_next_index(struct relay_command *cmd, pthread_mutex_lock(&vstream->overwrite_lock); if (vstream->abort_flag) { /* - * The file is being overwritten by the writer, we cannot - * use it. + * The file is being overwritten by the writer, we cannot * use it. */ viewer_index.status = htobe32(VIEWER_INDEX_RETRY); pthread_mutex_unlock(&vstream->overwrite_lock); @@ -1138,7 +1163,7 @@ int viewer_get_next_index(struct relay_command *cmd, } else if (ret == 1) { viewer_index.status = htobe32(VIEWER_INDEX_HUP); viewer_stream_delete(vstream); - viewer_stream_destroy(vstream); + viewer_stream_destroy(ctf_trace, vstream); goto send_reply; } goto send_reply; @@ -1160,7 +1185,7 @@ int viewer_get_next_index(struct relay_command *cmd, } else if (ret == 1) { viewer_index.status = htobe32(VIEWER_INDEX_HUP); viewer_stream_delete(vstream); - viewer_stream_destroy(vstream); + viewer_stream_destroy(ctf_trace, vstream); goto send_reply; } } else { @@ -1220,6 +1245,7 @@ int viewer_get_packet(struct relay_command *cmd, struct lttng_viewer_get_packet get_packet_info; struct lttng_viewer_trace_packet reply; struct relay_viewer_stream *stream; + struct ctf_trace *ctf_trace; assert(cmd); @@ -1241,7 +1267,10 @@ int viewer_get_packet(struct relay_command *cmd, if (!stream) { goto error; } - assert(stream->ctf_trace); + + ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht, + stream->path_name); + assert(ctf_trace); /* * First time we read this stream, we need open the tracefile, we should @@ -1270,9 +1299,8 @@ int viewer_get_packet(struct relay_command *cmd, stream->read_fd = ret; } - if (!stream->ctf_trace->metadata_received || - stream->ctf_trace->metadata_received > - stream->ctf_trace->metadata_sent) { + if (!ctf_trace->metadata_received || + ctf_trace->metadata_received > ctf_trace->metadata_sent) { reply.status = htobe32(VIEWER_GET_PACKET_ERR); reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; goto send_reply; @@ -1377,6 +1405,7 @@ int viewer_get_metadata(struct relay_command *cmd) struct lttng_viewer_get_metadata request; struct lttng_viewer_metadata_packet reply; struct relay_viewer_stream *stream; + struct ctf_trace *ctf_trace; assert(cmd); @@ -1396,12 +1425,13 @@ int viewer_get_metadata(struct relay_command *cmd) ERR("Invalid metadata stream"); goto error; } - assert(stream->ctf_trace); - assert(stream->ctf_trace->metadata_sent <= - stream->ctf_trace->metadata_received); - len = stream->ctf_trace->metadata_received - - stream->ctf_trace->metadata_sent; + ctf_trace = ctf_trace_find_by_path(cmd->session->ctf_traces_ht, + stream->path_name); + assert(ctf_trace); + assert(ctf_trace->metadata_sent <= ctf_trace->metadata_received); + + len = ctf_trace->metadata_received - ctf_trace->metadata_sent; if (len == 0) { reply.status = htobe32(VIEWER_NO_NEW_METADATA); goto send_reply; @@ -1436,7 +1466,7 @@ int viewer_get_metadata(struct relay_command *cmd) PERROR("Relay reading metadata file"); goto error; } - stream->ctf_trace->metadata_sent += read_len; + ctf_trace->metadata_sent += read_len; reply.status = htobe32(VIEWER_METADATA_OK); goto send_reply; @@ -1605,10 +1635,6 @@ void deferred_free_connection(struct rcu_head *head) struct relay_command *relay_connection = caa_container_of(head, struct relay_command, rcu_node); - if (relay_connection->session && - relay_connection->session->viewer_attached > 0) { - relay_connection->session->viewer_attached--; - } lttcomm_destroy_sock(relay_connection->sock); free(relay_connection); } @@ -1616,39 +1642,53 @@ void deferred_free_connection(struct rcu_head *head) /* * Delete all streams for a specific session ID. */ -static -void viewer_del_streams(uint64_t session_id) +static void destroy_viewer_streams_by_session(struct relay_session *session) { struct relay_viewer_stream *stream; struct lttng_ht_iter iter; + assert(session); + rcu_read_lock(); cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, stream, stream_n.node) { - health_code_update(); + struct ctf_trace *ctf_trace; - if (stream->session_id != session_id) { + health_code_update(); + if (stream->session_id != session->id) { continue; } + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, + stream->path_name); + assert(ctf_trace); + viewer_stream_delete(stream); - assert(stream->ctf_trace); if (stream->metadata_flag) { - /* - * The metadata viewer stream is destroyed once the refcount on the - * ctf trace goes to 0 in the destroy stream function thus there is - * no explicit call to that function here. - */ - stream->ctf_trace->metadata_sent = 0; - stream->ctf_trace->viewer_metadata_stream = NULL; - } else { - viewer_stream_destroy(stream); + ctf_trace->metadata_sent = 0; + ctf_trace->viewer_metadata_stream = NULL; } + + viewer_stream_destroy(ctf_trace, stream); } rcu_read_unlock(); } +static void try_destroy_streams(struct relay_session *session) +{ + struct ctf_trace *ctf_trace; + struct lttng_ht_iter iter; + + assert(session); + + cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, + node.node) { + /* Attempt to destroy the ctf trace of that session. */ + ctf_trace_try_destroy(session, ctf_trace); + } +} + /* * Delete and free a connection. * @@ -1656,21 +1696,35 @@ void viewer_del_streams(uint64_t session_id) */ static void del_connection(struct lttng_ht *relay_connections_ht, - struct lttng_ht_iter *iter, struct relay_command *relay_connection) + struct lttng_ht_iter *iter, struct relay_command *relay_connection, + struct lttng_ht *sessions_ht) { int ret; + struct relay_session *session; assert(relay_connections_ht); assert(iter); assert(relay_connection); + assert(sessions_ht); DBG("Cleaning connection of session ID %" PRIu64, relay_connection->session_id); + rcu_read_lock(); ret = lttng_ht_del(relay_connections_ht, iter); assert(!ret); - viewer_del_streams(relay_connection->session_id); + session = session_find_by_id(sessions_ht, relay_connection->session_id); + if (session) { + /* + * Very important that this is done before destroying the session so we + * can put back every viewer stream reference from the ctf_trace. + */ + destroy_viewer_streams_by_session(session); + try_destroy_streams(session); + session_viewer_try_destroy(sessions_ht, session); + } + rcu_read_unlock(); call_rcu(&relay_connection->rcu_node, deferred_free_connection); } @@ -1754,7 +1808,7 @@ restart: health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = check_live_conn_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1789,12 +1843,12 @@ restart: if (revents & (LPOLLERR)) { cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection); + relay_connection, relay_ctx->sessions_ht); } else if (revents & (LPOLLHUP | LPOLLRDHUP)) { DBG("Viewer socket %d hung up", pollfd); cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection); + relay_connection, relay_ctx->sessions_ht); } else if (revents & LPOLLIN) { ret = relay_connection->sock->ops->recvmsg( relay_connection->sock, &recv_hdr, @@ -1804,7 +1858,7 @@ restart: if (ret <= 0) { cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection); + relay_connection, relay_ctx->sessions_ht); DBG("Viewer control connection closed with %d", pollfd); } else { @@ -1819,7 +1873,7 @@ restart: /* Clear the session on error. */ cleanup_poll_connection(&events, pollfd); del_connection(relay_connections_ht, &iter, - relay_connection); + relay_connection, relay_ctx->sessions_ht); DBG("Viewer connection closed with %d", pollfd); } } @@ -1845,7 +1899,8 @@ error: relay_connection = caa_container_of(node, struct relay_command, sock_n); - del_connection(relay_connections_ht, &iter, relay_connection); + del_connection(relay_connections_ht, &iter, relay_connection, + relay_ctx->sessions_ht); } rcu_read_unlock(); error_poll_create: diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index 6dfc80323..f0c1abfee 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -46,92 +46,6 @@ enum connection_type { RELAY_VIEWER_NOTIFICATION = 4, }; -/* - * When we receive a stream, it gets stored in a list (on a per connection - * basis) until we have all the streams of the same channel and the metadata - * associated with it, then it gets flagged with viewer_ready. - */ -struct relay_stream_recv_handle { - uint64_t id; /* stream handle */ - struct cds_list_head node; -}; - -/* - * Represents a stream in the relay - */ -struct relay_stream { - uint64_t stream_handle; - uint64_t prev_seq; /* previous data sequence number encountered */ - struct lttng_ht_node_ulong stream_n; - struct relay_session *session; - struct rcu_head rcu_node; - int fd; - /* FD on which to write the index data. */ - int index_fd; - /* FD on which to read the index data for the viewer. */ - int read_index_fd; - - char *path_name; - char *channel_name; - /* on-disk circular buffer of tracefiles */ - uint64_t tracefile_size; - uint64_t tracefile_size_current; - uint64_t tracefile_count; - uint64_t tracefile_count_current; - /* To inform the viewer up to where it can go back in time. */ - uint64_t oldest_tracefile_id; - - uint64_t total_index_received; - uint64_t last_net_seq_num; - - /* - * This node is added to the *control* connection hash table and the - * pointer is copied in here so we can access it when deleting this object. - * When deleting this, the ctf trace ht MUST NOT be destroyed. This happens - * at connection deletion. - */ - struct lttng_ht_node_str ctf_trace_node; - struct lttng_ht *ctf_traces_ht; - - /* - * To protect from concurrent read/update between the - * streaming-side and the viewer-side. - * This lock must be held, we reading/updating the - * ctf_trace pointer. - */ - pthread_mutex_t lock; - - struct ctf_trace *ctf_trace; - /* - * If the stream is inactive, this field is updated with the live beacon - * timestamp end, when it is active, this field == -1ULL. - */ - uint64_t beacon_ts_end; - /* - * To protect the update of the close_write_flag and the checks of - * the tracefile_count_current. - * It is taken before checking whenever we need to know if the - * writer and reader are working in the same tracefile. - */ - pthread_mutex_t viewer_stream_rotation_lock; - - /* Information telling us when to close the stream */ - unsigned int close_flag:1; - /* Indicate if the stream was initialized for a data pending command. */ - unsigned int data_pending_check_done:1; - unsigned int metadata_flag:1; - /* - * To detect when we start overwriting old data, it is used to - * update the oldest_tracefile_id. - */ - unsigned int tracefile_overwrite:1; - /* - * Can this stream be used by a viewer or are we waiting for additional - * information. - */ - unsigned int viewer_ready:1; -}; - /* * Internal structure to map a socket with the corresponding session. * A hashtable indexed on the socket FD is used for the lookups. @@ -146,7 +60,6 @@ struct relay_command { /* protocol version to use for this session */ uint32_t major; uint32_t minor; - struct lttng_ht *ctf_traces_ht; /* indexed by path name */ uint64_t session_id; struct cds_list_head recv_head; unsigned int version_check_done:1; @@ -158,7 +71,13 @@ struct relay_local_data { extern char *opt_output_path; +/* + * Contains stream indexed by ID. This is important since many commands lookup + * streams only by ID thus also keeping them in this hash table makes the + * search O(1) instead of iterating over the ctf_traces_ht of the session. + */ extern struct lttng_ht *relay_streams_ht; + extern struct lttng_ht *viewer_streams_ht; extern struct lttng_ht *indexes_ht; @@ -166,9 +85,6 @@ extern const char *tracing_group_name; extern const char * const config_section_name; -extern int thread_quit_pipe[2]; - -struct relay_stream *relay_stream_find_by_id(uint64_t stream_id); void lttng_relay_notify_ready(void); #endif /* LTTNG_RELAYD_H */ diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 02f676df8..fdfe73ccc 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -63,6 +63,8 @@ #include "health-relayd.h" #include "testpoint.h" #include "viewer-stream.h" +#include "session.h" +#include "stream.h" /* command line options */ char *opt_output_path; @@ -109,7 +111,6 @@ static pthread_t worker_thread; static pthread_t health_thread; static uint64_t last_relay_stream_id; -static uint64_t last_relay_session_id; /* * Relay command queue. @@ -691,7 +692,6 @@ error: static int close_stream_check(struct relay_stream *stream) { - if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) { /* * We are about to close the stream so set the data pending flag to 1 @@ -705,6 +705,40 @@ int close_stream_check(struct relay_stream *stream) return 0; } +static void try_close_stream(struct relay_session *session, + struct relay_stream *stream) +{ + int ret; + struct ctf_trace *ctf_trace; + + assert(session); + assert(stream); + + if (!close_stream_check(stream)) { + /* Can't close it, not ready for that. */ + goto end; + } + + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, + stream->path_name); + assert(ctf_trace); + + pthread_mutex_lock(&session->viewer_ready_lock); + ctf_trace->invalid_flag = 1; + pthread_mutex_unlock(&session->viewer_ready_lock); + + ret = stream_close(session, stream); + if (!ret) { + /* Already close thus the ctf trace is being or has been destroyed. */ + goto end; + } + + ctf_trace_try_destroy(session, ctf_trace); + +end: + return; +} + /* * This thread manages the listening for new connections on the network */ @@ -964,149 +998,51 @@ error_testpoint: return NULL; } -/* - * Get stream from stream id. - * Need to be called with RCU read-side lock held. - */ -struct relay_stream *relay_stream_find_by_id(uint64_t stream_id) -{ - struct lttng_ht_node_ulong *node; - struct lttng_ht_iter iter; - struct relay_stream *ret; - - lttng_ht_lookup(relay_streams_ht, - (void *)((unsigned long) stream_id), - &iter); - node = lttng_ht_iter_get_node_ulong(&iter); - if (node == NULL) { - DBG("Relay stream %" PRIu64 " not found", stream_id); - ret = NULL; - goto end; - } - - ret = caa_container_of(node, struct relay_stream, stream_n); - -end: - return ret; -} - -static -void deferred_free_stream(struct rcu_head *head) -{ - struct relay_stream *stream = - caa_container_of(head, struct relay_stream, rcu_node); - - free(stream->path_name); - free(stream->channel_name); - free(stream); -} - -static -void deferred_free_session(struct rcu_head *head) +static void try_close_streams(struct relay_session *session) { - struct relay_session *session = - caa_container_of(head, struct relay_session, rcu_node); - free(session); -} - -/* - * Close a given stream. The stream is freed using a call RCU. - * - * RCU read side lock MUST be acquired. If NO close_stream_check() was called - * BEFORE the stream lock MUST be acquired. - */ -static void destroy_stream(struct relay_stream *stream) -{ - int delret; - struct relay_viewer_stream *vstream; + struct ctf_trace *ctf_trace; struct lttng_ht_iter iter; - assert(stream); + assert(session); - delret = close(stream->fd); - if (delret < 0) { - PERROR("close stream"); - } + pthread_mutex_lock(&session->viewer_ready_lock); + rcu_read_lock(); + cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, + node.node) { + struct relay_stream *stream; - if (stream->index_fd >= 0) { - delret = close(stream->index_fd); - if (delret < 0) { - PERROR("close stream index_fd"); + /* Close streams. */ + cds_list_for_each_entry(stream, &ctf_trace->stream_list, trace_list) { + stream_close(session, stream); } - } - - vstream = viewer_stream_find_by_id(stream->stream_handle); - if (vstream) { - /* - * Set the last good value into the viewer stream. This is done - * right before the stream gets deleted from the hash table. The - * lookup failure on the live thread side of a stream indicates - * that the viewer stream index received value should be used. - */ - pthread_mutex_lock(&stream->viewer_stream_rotation_lock); - vstream->total_index_received = stream->total_index_received; - vstream->tracefile_count_last = stream->tracefile_count_current; - vstream->close_write_flag = 1; - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); - } - - /* Cleanup index of that stream. */ - relay_index_destroy_by_stream_id(stream->stream_handle); - iter.iter.node = &stream->stream_n.node; - delret = lttng_ht_del(relay_streams_ht, &iter); - assert(!delret); - iter.iter.node = &stream->ctf_trace_node.node; - delret = lttng_ht_del(stream->ctf_traces_ht, &iter); - assert(!delret); - - if (stream->ctf_trace) { - ctf_trace_try_destroy(stream->ctf_trace); + ctf_trace->invalid_flag = 1; + ctf_trace_try_destroy(session, ctf_trace); } - - call_rcu(&stream->rcu_node, deferred_free_stream); - DBG("Closed tracefile %d from close stream", stream->fd); + rcu_read_unlock(); + pthread_mutex_unlock(&session->viewer_ready_lock); } /* - * relay_delete_session: Free all memory associated with a session and - * close all the FDs + * Try to destroy a session within a connection. */ static void relay_delete_session(struct relay_command *cmd, struct lttng_ht *sessions_ht) { - struct lttng_ht_iter iter; - struct lttng_ht_node_ulong *node; - struct relay_stream *stream; - int ret; + assert(cmd); + assert(sessions_ht); - if (!cmd->session) { - return; - } + /* Indicate that this session can be destroyed from now on. */ + cmd->session->close_flag = 1; - DBG("Relay deleting session %" PRIu64, cmd->session->id); + try_close_streams(cmd->session); - rcu_read_lock(); - cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) { - node = lttng_ht_iter_get_node_ulong(&iter); - if (!node) { - continue; - } - stream = caa_container_of(node, struct relay_stream, stream_n); - if (stream->session == cmd->session) { - destroy_stream(stream); - cmd->session->stream_count--; - assert(cmd->session->stream_count >= 0); - } - } - - /* Make this session not visible anymore. */ - iter.iter.node = &cmd->session->session_n.node; - ret = lttng_ht_del(sessions_ht, &iter); - assert(!ret); - call_rcu(&cmd->session->rcu_node, deferred_free_session); - rcu_read_unlock(); + /* + * This will try to delete and destroy the session if no viewer is attached + * to it meaning the refcount is down to zero. + */ + session_try_destroy(sessions_ht, cmd->session); } /* @@ -1150,38 +1086,30 @@ int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr, memset(&reply, 0, sizeof(reply)); - session = zmalloc(sizeof(struct relay_session)); - if (session == NULL) { - PERROR("relay session zmalloc"); + session = session_create(); + if (!session) { ret = -1; goto error; } - - session->id = ++last_relay_session_id; - session->sock = cmd->sock; session->minor = cmd->minor; session->major = cmd->major; - pthread_mutex_init(&session->viewer_ready_lock, NULL); + cmd->session_id = session->id; cmd->session = session; reply.session_id = htobe64(session->id); switch (cmd->minor) { - case 1: - case 2: - case 3: - break; - case 4: /* LTTng sessiond 2.4 */ - default: - ret = cmd_create_session_2_4(cmd, session); - break; + case 1: + case 2: + case 3: + break; + case 4: /* LTTng sessiond 2.4 */ + default: + ret = cmd_create_session_2_4(cmd, session); + break; } - lttng_ht_node_init_ulong(&session->session_n, - (unsigned long) session->id); - lttng_ht_add_unique_ulong(sessions_ht, - &session->session_n); - + lttng_ht_add_unique_u64(sessions_ht, &session->session_n); DBG("Created session %" PRIu64, session->id); error: @@ -1207,32 +1135,14 @@ error: static void set_viewer_ready_flag(struct relay_command *cmd) { - struct relay_stream_recv_handle *node, *tmp_node; + struct relay_stream *stream, *tmp_stream; pthread_mutex_lock(&cmd->session->viewer_ready_lock); - - cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) { - struct relay_stream *stream; - - rcu_read_lock(); - stream = relay_stream_find_by_id(node->id); - if (!stream) { - /* - * Stream is most probably being cleaned up by the data thread thus - * simply continue to the next one. - */ - rcu_read_unlock(); - continue; - } - + cds_list_for_each_entry_safe(stream, tmp_stream, &cmd->recv_head, + recv_list) { stream->viewer_ready = 1; - rcu_read_unlock(); - - /* Clean stream handle node. */ - cds_list_del(&node->node); - free(node); + cds_list_del(&stream->recv_list); } - pthread_mutex_unlock(&cmd->session->viewer_ready_lock); return; } @@ -1242,20 +1152,12 @@ void set_viewer_ready_flag(struct relay_command *cmd) * handle. A new node is allocated thus must be freed when the node is deleted * from the list. */ -static void queue_stream_handle(uint64_t handle, struct relay_command *cmd) +static void queue_stream(struct relay_stream *stream, struct relay_command *cmd) { - struct relay_stream_recv_handle *node; - assert(cmd); + assert(stream); - node = zmalloc(sizeof(*node)); - if (!node) { - PERROR("zmalloc queue stream handle"); - return; - } - - node->id = handle; - cds_list_add(&node->node, &cmd->recv_head); + cds_list_add(&stream->recv_list, &cmd->recv_head); } /* @@ -1265,10 +1167,11 @@ static int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, struct relay_command *cmd, struct lttng_ht *sessions_ht) { + int ret, send_ret; struct relay_session *session = cmd->session; struct relay_stream *stream = NULL; struct lttcomm_relayd_status_stream reply; - int ret, send_ret; + struct ctf_trace *trace; if (!session || cmd->version_check_done == 0) { ERR("Trying to add a stream before version check"); @@ -1299,10 +1202,10 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, rcu_read_lock(); stream->stream_handle = ++last_relay_stream_id; stream->prev_seq = -1ULL; - stream->session = session; + stream->session_id = session->id; stream->index_fd = -1; stream->read_index_fd = -1; - stream->ctf_trace = NULL; + lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG); @@ -1328,42 +1231,37 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); } - if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) { - stream->metadata_flag = 1; - /* - * When we receive a new metadata stream, we create a new - * ctf_trace and we assign this ctf_trace to all streams with - * the same path. - * - * If later on we receive a new stream for the same ctf_trace, - * we copy the information from the first hit in the HT to the - * new stream. - */ - stream->ctf_trace = ctf_trace_create(); - if (!stream->ctf_trace) { + trace = ctf_trace_find_by_path(session->ctf_traces_ht, stream->path_name); + if (!trace) { + trace = ctf_trace_create(stream->path_name); + if (!trace) { ret = -1; goto end; } - stream->ctf_trace->refcount++; - stream->ctf_trace->metadata_stream = stream; + ctf_trace_add(session->ctf_traces_ht, trace); + } + ctf_trace_get_ref(trace); + + if (!strncmp(stream->channel_name, DEFAULT_METADATA_NAME, NAME_MAX)) { + stream->metadata_flag = 1; + /* Assign quick reference to the metadata stream in the trace. */ + trace->metadata_stream = stream; } - ctf_trace_assign(cmd->ctf_traces_ht, stream); - stream->ctf_traces_ht = cmd->ctf_traces_ht; /* - * Add the stream handle in the recv list of the connection. Once the end - * stream message is received, this list is emptied and streams are set - * with the viewer ready flag. + * Add the stream in the recv list of the connection. Once the end stream + * message is received, this list is emptied and streams are set with the + * viewer ready flag. */ - queue_stream_handle(stream->stream_handle, cmd); + queue_stream(stream, cmd); - lttng_ht_node_init_ulong(&stream->stream_n, - (unsigned long) stream->stream_handle); - lttng_ht_add_unique_ulong(relay_streams_ht, - &stream->stream_n); + /* + * Both in the ctf_trace object and the global stream ht since the data + * side of the relayd does not have the concept of session. + */ + lttng_ht_add_unique_u64(relay_streams_ht, &stream->node); + cds_list_add_tail(&stream->trace_list, &trace->stream_list); - lttng_ht_node_init_str(&stream->ctf_trace_node, stream->path_name); - lttng_ht_add_str(cmd->ctf_traces_ht, &stream->ctf_trace_node); session->stream_count++; DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, @@ -1433,7 +1331,8 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, } rcu_read_lock(); - stream = relay_stream_find_by_id(be64toh(stream_info.stream_id)); + stream = stream_find_by_id(relay_streams_ht, + be64toh(stream_info.stream_id)); if (!stream) { ret = -1; goto end_unlock; @@ -1444,9 +1343,8 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, session->stream_count--; assert(session->stream_count >= 0); - if (close_stream_check(stream)) { - destroy_stream(stream); - } + /* Check if we can close it or else the data will do it. */ + try_close_stream(session, stream); end_unlock: rcu_read_unlock(); @@ -1554,6 +1452,7 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_metadata_payload *metadata_struct; struct relay_stream *metadata_stream; uint64_t data_size, payload_size; + struct ctf_trace *ctf_trace; if (!session) { ERR("Metadata sent before version check"); @@ -1599,7 +1498,7 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer; rcu_read_lock(); - metadata_stream = relay_stream_find_by_id( + metadata_stream = stream_find_by_id(relay_streams_ht, be64toh(metadata_struct->stream_id)); if (!metadata_stream) { ret = -1; @@ -1619,7 +1518,11 @@ int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, if (ret < 0) { goto end_unlock; } - metadata_stream->ctf_trace->metadata_received += + + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, + metadata_stream->path_name); + assert(ctf_trace); + ctf_trace->metadata_received += payload_size + be32toh(metadata_struct->padding_size); DBG2("Relay metadata written"); @@ -1731,7 +1634,7 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, last_net_seq_num = be64toh(msg.last_net_seq_num); rcu_read_lock(); - stream = relay_stream_find_by_id(stream_id); + stream = stream_find_by_id(relay_streams_ht, stream_id); if (stream == NULL) { ret = -1; goto end_unlock; @@ -1809,7 +1712,7 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr, rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, - stream_n.node) { + node.node) { if (stream->stream_handle == stream_id) { stream->data_pending_check_done = 1; DBG("Relay quiescent control pending flag set to %" PRIu64, @@ -1880,8 +1783,8 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr, */ rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, - stream_n.node) { - if (stream->session->id == session_id) { + node.node) { + if (stream->session_id == session_id) { stream->data_pending_check_done = 0; DBG("Set begin data pending flag to stream %" PRIu64, stream->stream_handle); @@ -1951,8 +1854,8 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr, /* Iterate over all streams to see if the begin data pending flag is set. */ rcu_read_lock(); cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream, - stream_n.node) { - if (stream->session->id == session_id && + node.node) { + if (stream->session_id == session_id && !stream->data_pending_check_done) { is_data_inflight = 1; DBG("Data is still in flight for stream %" PRIu64, @@ -2017,7 +1920,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, net_seq_num = be64toh(index_info.net_seq_num); rcu_read_lock(); - stream = relay_stream_find_by_id(be64toh(index_info.relay_stream_id)); + stream = stream_find_by_id(relay_streams_ht, + be64toh(index_info.relay_stream_id)); if (!stream) { ret = -1; goto end_rcu_unlock; @@ -2296,7 +2200,7 @@ error: * relay_process_data: Process the data received on the data socket */ static -int relay_process_data(struct relay_command *cmd) +int relay_process_data(struct relay_command *cmd, struct lttng_ht *sessions_ht) { int ret = 0, rotate_index = 0; ssize_t size_ret; @@ -2305,6 +2209,7 @@ int relay_process_data(struct relay_command *cmd) uint64_t stream_id; uint64_t net_seq_num; uint32_t data_size; + struct relay_session *session; ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr, sizeof(struct lttcomm_relayd_data_hdr), 0); @@ -2322,12 +2227,15 @@ int relay_process_data(struct relay_command *cmd) stream_id = be64toh(data_hdr.stream_id); rcu_read_lock(); - stream = relay_stream_find_by_id(stream_id); + stream = stream_find_by_id(relay_streams_ht, stream_id); if (!stream) { ret = -1; goto end_rcu_unlock; } + session = session_find_by_id(sessions_ht, stream->session_id); + assert(session); + data_size = be32toh(data_hdr.data_size); if (data_buffer_size < data_size) { char *tmp_data_ptr; @@ -2423,7 +2331,7 @@ int relay_process_data(struct relay_command *cmd) * Index are handled in protocol version 2.4 and above. Also, snapshot and * index are NOT supported. */ - if (stream->session->minor >= 4 && !stream->session->snapshot) { + if (session->minor >= 4 && !session->snapshot) { ret = handle_index_data(stream, net_seq_num, rotate_index); if (ret < 0) { goto end_rcu_unlock; @@ -2449,10 +2357,7 @@ int relay_process_data(struct relay_command *cmd) stream->prev_seq = net_seq_num; - /* Check if we need to close the FD */ - if (close_stream_check(stream)) { - destroy_stream(stream); - } + try_close_stream(session, stream); end_rcu_unlock: rcu_read_unlock(); @@ -2492,19 +2397,6 @@ int relay_add_connection(int fd, struct lttng_poll_event *events, } CDS_INIT_LIST_HEAD(&relay_connection->recv_head); - /* - * Only used by the control side and the reference is copied inside each - * stream from that connection. Thus a destroy HT must be done after every - * stream has been destroyed. - */ - if (relay_connection->type == RELAY_CONTROL) { - relay_connection->ctf_traces_ht = lttng_ht_new(0, - LTTNG_HT_TYPE_STRING); - if (!relay_connection->ctf_traces_ht) { - goto error_read; - } - } - lttng_ht_node_init_ulong(&relay_connection->sock_n, (unsigned long) relay_connection->sock->fd); rcu_read_lock(); @@ -2542,17 +2434,16 @@ void relay_del_connection(struct lttng_ht *relay_connections_ht, assert(!ret); if (relay_connection->type == RELAY_CONTROL) { - struct relay_stream_recv_handle *node, *tmp_node; - - relay_delete_session(relay_connection, sessions_ht); - lttng_ht_destroy(relay_connection->ctf_traces_ht); + struct relay_stream *stream, *tmp_stream; /* Clean up recv list. */ - cds_list_for_each_entry_safe(node, tmp_node, - &relay_connection->recv_head, node) { - cds_list_del(&node->node); - free(node); + cds_list_for_each_entry_safe(stream, tmp_stream, + &relay_connection->recv_head, recv_list) { + cds_list_del(&stream->recv_list); } + + relay_delete_session(relay_connection, sessions_ht); + } call_rcu(&relay_connection->rcu_node, deferred_free_connection); @@ -2788,7 +2679,8 @@ restart: continue; } - ret = relay_process_data(relay_connection); + ret = relay_process_data(relay_connection, + sessions_ht); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2963,13 +2855,13 @@ int main(int argc, char **argv) } /* tables of sessions indexed by session ID */ - relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!relay_ctx->sessions_ht) { goto exit_relay_ctx_sessions; } /* tables of streams indexed by stream ID */ - relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG); + relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!relay_streams_ht) { goto exit_relay_ctx_streams; } diff --git a/src/bin/lttng-relayd/session.c b/src/bin/lttng-relayd/session.c index 07f1d9f03..02cc748e7 100644 --- a/src/bin/lttng-relayd/session.c +++ b/src/bin/lttng-relayd/session.c @@ -16,7 +16,52 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +#define _GNU_SOURCE +#include + +#include "ctf-trace.h" #include "session.h" +#include "stream.h" + +/* Global session id used in the session creation. */ +static uint64_t last_relay_session_id; + +static void rcu_destroy_session(struct rcu_head *head) +{ + struct relay_session *session = + caa_container_of(head, struct relay_session, rcu_node); + + free(session); +} + +/* + * Create a new session by assigning a new session ID. + * + * Return allocated session or else NULL. + */ +struct relay_session *session_create(void) +{ + struct relay_session *session; + + session = zmalloc(sizeof(*session)); + if (!session) { + PERROR("relay session zmalloc"); + goto error; + } + + session->ctf_traces_ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING); + if (!session->ctf_traces_ht) { + free(session); + goto error; + } + + pthread_mutex_init(&session->viewer_ready_lock, NULL); + session->id = ++last_relay_session_id; + lttng_ht_node_init_u64(&session->session_n, session->id); + +error: + return session; +} /* * Lookup a session within the given hash table and session id. RCU read side @@ -28,18 +73,107 @@ struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id) { struct relay_session *session = NULL; - struct lttng_ht_node_ulong *node; + struct lttng_ht_node_u64 *node; struct lttng_ht_iter iter; assert(ht); - lttng_ht_lookup(ht, (void *)((unsigned long) id), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); + lttng_ht_lookup(ht, &id, &iter); + node = lttng_ht_iter_get_node_u64(&iter); if (!node) { + DBG("Session find by ID %" PRIu64 " id NOT found", id); goto end; } session = caa_container_of(node, struct relay_session, session_n); + DBG("Session find by ID %" PRIu64 " id found", id); end: return session; } + +/* + * Delete session from the given hash table. + * + * Return lttng ht del error code being 0 on success and 1 on failure. + */ +int session_delete(struct lttng_ht *ht, struct relay_session *session) +{ + struct lttng_ht_iter iter; + + assert(ht); + assert(session); + + iter.iter.node = &session->session_n.node; + return lttng_ht_del(ht, &iter); +} + +/* + * The caller MUST be from the viewer thread since the viewer refcount is + * decremented. With this calue down to 0, it will try to destroy the session. + */ +void session_viewer_try_destroy(struct lttng_ht *ht, + struct relay_session *session) +{ + unsigned long ret_ref; + + assert(session); + + ret_ref = uatomic_add_return(&session->viewer_refcount, -1); + if (ret_ref == 0) { + session_try_destroy(ht, session); + } +} + +/* + * Should only be called from the main streaming thread since it does not touch + * the viewer refcount. If this refcount is down to 0, destroy the session only + * and only if the session deletion succeeds. This is done because the viewer + * *and* the streaming thread can both concurently try to destroy the session + * thus the first come first serve. + */ +void session_try_destroy(struct lttng_ht *ht, struct relay_session *session) +{ + int ret = 0; + unsigned long ret_ref; + + assert(session); + + ret_ref = uatomic_read(&session->viewer_refcount); + if (ret_ref == 0 && session->close_flag) { + if (ht) { + ret = session_delete(ht, session); + } + if (!ret) { + /* Only destroy the session if the deletion was successful. */ + session_destroy(session); + } + } +} + +/* + * Destroy a session object. + */ +void session_destroy(struct relay_session *session) +{ + struct ctf_trace *ctf_trace; + struct lttng_ht_iter iter; + + assert(session); + + DBG("Relay destroying session %" PRIu64, session->id); + + /* + * Empty the ctf trace hash table which will destroy the stream contained + * in that table. + */ + rcu_read_lock(); + cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace, + node.node) { + ctf_trace_delete(session->ctf_traces_ht, ctf_trace); + ctf_trace_destroy(ctf_trace); + } + lttng_ht_destroy(session->ctf_traces_ht); + rcu_read_unlock(); + + call_rcu(&session->rcu_node, rcu_destroy_session); +} diff --git a/src/bin/lttng-relayd/session.h b/src/bin/lttng-relayd/session.h index 698e8ef7f..4953cdcd3 100644 --- a/src/bin/lttng-relayd/session.h +++ b/src/bin/lttng-relayd/session.h @@ -35,16 +35,22 @@ struct relay_session { * daemon which can provide multiple data source. */ uint64_t id; - struct lttcomm_sock *sock; char session_name[NAME_MAX]; char hostname[HOST_NAME_MAX]; uint32_t live_timer; - struct lttng_ht_node_ulong session_n; + struct lttng_ht_node_u64 session_n; struct rcu_head rcu_node; - uint32_t viewer_attached; uint32_t stream_count; /* Tell if this session is for a snapshot or not. */ unsigned int snapshot:1; + /* Tell if the session has been closed on the streaming side. */ + unsigned int close_flag:1; + + /* Number of viewer using it. Set to 0, it should be destroyed. */ + int viewer_refcount; + + /* Contains ctf_trace object of that session indexed by path name. */ + struct lttng_ht *ctf_traces_ht; /* * Indicate version protocol for this session. This is especially useful @@ -67,6 +73,34 @@ struct relay_session { pthread_mutex_t viewer_ready_lock; }; +static inline void session_viewer_attach(struct relay_session *session) +{ + uatomic_inc(&session->viewer_refcount); +} + +static inline void session_viewer_detach(struct relay_session *session) +{ + uatomic_add(&session->viewer_refcount, -1); +} + struct relay_session *session_find_by_id(struct lttng_ht *ht, uint64_t id); +struct relay_session *session_create(void); +int session_delete(struct lttng_ht *ht, struct relay_session *session); + +/* + * Direct destroy without reading the refcount. + */ +void session_destroy(struct relay_session *session); + +/* + * Destroy the session if the refcount is down to 0. + */ +void session_try_destroy(struct lttng_ht *ht, struct relay_session *session); + +/* + * Decrement the viewer refcount and destroy it if down to 0. + */ +void session_viewer_try_destroy(struct lttng_ht *ht, + struct relay_session *session); #endif /* _SESSION_H */ diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c new file mode 100644 index 000000000..fff8065b0 --- /dev/null +++ b/src/bin/lttng-relayd/stream.c @@ -0,0 +1,153 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _GNU_SOURCE +#include + +#include "index.h" +#include "stream.h" +#include "viewer-stream.h" + +static void rcu_destroy_stream(struct rcu_head *head) +{ + struct relay_stream *stream = + caa_container_of(head, struct relay_stream, rcu_node); + + free(stream->path_name); + free(stream->channel_name); + free(stream); +} + +/* + * Get stream from stream id from the given hash table. Return stream if found + * else NULL. + * + * Need to be called with RCU read-side lock held. + */ +struct relay_stream *stream_find_by_id(struct lttng_ht *ht, + uint64_t stream_id) +{ + struct lttng_ht_node_u64 *node; + struct lttng_ht_iter iter; + struct relay_stream *stream = NULL; + + assert(ht); + + lttng_ht_lookup(ht, &stream_id, &iter); + node = lttng_ht_iter_get_node_u64(&iter); + if (node == NULL) { + DBG("Relay stream %" PRIu64 " not found", stream_id); + goto end; + } + stream = caa_container_of(node, struct relay_stream, node); + +end: + return stream; +} + +/* + * Close a given stream. If an assosiated viewer stream exists it is updated. + * + * RCU read side lock MUST be acquired. + * + * Return 0 if close was successful or 1 if already closed. + */ +int stream_close(struct relay_session *session, struct relay_stream *stream) +{ + int delret, ret; + struct relay_viewer_stream *vstream; + struct ctf_trace *ctf_trace; + + assert(stream); + + pthread_mutex_lock(&stream->lock); + + if (stream->terminated_flag) { + /* This stream is already closed. Ignore. */ + ret = 1; + goto end_unlock; + } + + DBG("Closing stream id %" PRIu64, stream->stream_handle); + + if (stream->fd >= 0) { + delret = close(stream->fd); + if (delret < 0) { + PERROR("close stream"); + } + } + + if (stream->index_fd >= 0) { + delret = close(stream->index_fd); + if (delret < 0) { + PERROR("close stream index_fd"); + } + } + + vstream = viewer_stream_find_by_id(stream->stream_handle); + if (vstream) { + /* + * Set the last good value into the viewer stream. This is done + * right before the stream gets deleted from the hash table. The + * lookup failure on the live thread side of a stream indicates + * that the viewer stream index received value should be used. + */ + pthread_mutex_lock(&stream->viewer_stream_rotation_lock); + vstream->total_index_received = stream->total_index_received; + vstream->tracefile_count_last = stream->tracefile_count_current; + vstream->close_write_flag = 1; + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); + } + + /* Cleanup index of that stream. */ + relay_index_destroy_by_stream_id(stream->stream_handle); + + ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht, + stream->path_name); + assert(ctf_trace); + ctf_trace_put_ref(ctf_trace); + + stream->terminated_flag = 1; + ret = 0; + +end_unlock: + pthread_mutex_unlock(&stream->lock); + return ret; +} + +void stream_delete(struct lttng_ht *ht, struct relay_stream *stream) +{ + int ret; + struct lttng_ht_iter iter; + + assert(ht); + assert(stream); + + iter.iter.node = &stream->node.node; + ret = lttng_ht_del(ht, &iter); + assert(!ret); + + cds_list_del(&stream->trace_list); +} + +void stream_destroy(struct relay_stream *stream) +{ + assert(stream); + + call_rcu(&stream->rcu_node, rcu_destroy_stream); +} diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h new file mode 100644 index 000000000..c6bdb307e --- /dev/null +++ b/src/bin/lttng-relayd/stream.h @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2013 - Julien Desfossez + * David Goulet + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef _STREAM_H +#define _STREAM_H + +#include +#include +#include +#include + +#include + +#include "session.h" + +/* + * Represents a stream in the relay + */ +struct relay_stream { + uint64_t stream_handle; + uint64_t prev_seq; /* previous data sequence number encountered */ + struct lttng_ht_node_u64 node; + /* + * When we receive a stream, it gets stored in a list (on a per connection + * basis) until we have all the streams of the same channel and the metadata + * associated with it, then it gets flagged with viewer_ready. + */ + struct cds_list_head recv_list; + + /* Added to the corresponding ctf_trace. */ + struct cds_list_head trace_list; + struct rcu_head rcu_node; + uint64_t session_id; + int fd; + /* FD on which to write the index data. */ + int index_fd; + /* FD on which to read the index data for the viewer. */ + int read_index_fd; + + char *path_name; + char *channel_name; + /* on-disk circular buffer of tracefiles */ + uint64_t tracefile_size; + uint64_t tracefile_size_current; + uint64_t tracefile_count; + uint64_t tracefile_count_current; + /* To inform the viewer up to where it can go back in time. */ + uint64_t oldest_tracefile_id; + + uint64_t total_index_received; + uint64_t last_net_seq_num; + + /* + * To protect from concurrent read/update. Also used to synchronize the + * closing of this stream. + */ + pthread_mutex_t lock; + + /* + * If the stream is inactive, this field is updated with the live beacon + * timestamp end, when it is active, this field == -1ULL. + */ + uint64_t beacon_ts_end; + /* + * To protect the update of the close_write_flag and the checks of + * the tracefile_count_current. + * It is taken before checking whenever we need to know if the + * writer and reader are working in the same tracefile. + */ + pthread_mutex_t viewer_stream_rotation_lock; + + /* Information telling us when to close the stream */ + unsigned int close_flag:1; + /* + * Indicates if the stream has been effectively closed thus having the + * information in it invalidated but NOT freed. The stream lock MUST be + * held to read/update that value. + */ + unsigned int terminated_flag:1; + /* Indicate if the stream was initialized for a data pending command. */ + unsigned int data_pending_check_done:1; + unsigned int metadata_flag:1; + /* + * To detect when we start overwriting old data, it is used to + * update the oldest_tracefile_id. + */ + unsigned int tracefile_overwrite:1; + /* + * Can this stream be used by a viewer or are we waiting for additional + * information. + */ + unsigned int viewer_ready:1; +}; + +struct relay_stream *stream_find_by_id(struct lttng_ht *ht, + uint64_t stream_id); +int stream_close(struct relay_session *session, struct relay_stream *stream); +void stream_delete(struct lttng_ht *ht, struct relay_stream *stream); +void stream_destroy(struct relay_stream *stream); + +#endif /* _STREAM_H */ diff --git a/src/bin/lttng-relayd/viewer-stream.c b/src/bin/lttng-relayd/viewer-stream.c index 3953d7dd1..a16f331b8 100644 --- a/src/bin/lttng-relayd/viewer-stream.c +++ b/src/bin/lttng-relayd/viewer-stream.c @@ -41,11 +41,12 @@ static void deferred_free_viewer_stream(struct rcu_head *head) } struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - enum lttng_viewer_seek seek_t) + enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace) { struct relay_viewer_stream *vstream; assert(stream); + assert(ctf_trace); vstream = zmalloc(sizeof(*vstream)); if (!vstream) { @@ -53,7 +54,7 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } - vstream->session_id = stream->session->id; + vstream->session_id = stream->session_id; vstream->stream_handle = stream->stream_handle; vstream->path_name = strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX); vstream->channel_name = strndup(stream->channel_name, @@ -74,11 +75,9 @@ struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, goto error; } - vstream->ctf_trace = stream->ctf_trace; if (vstream->metadata_flag) { - vstream->ctf_trace->viewer_metadata_stream = vstream; + ctf_trace->viewer_metadata_stream = vstream; } - uatomic_inc(&vstream->ctf_trace->refcount); /* Globally visible after the add unique. */ lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle); @@ -144,14 +143,16 @@ void viewer_stream_delete(struct relay_viewer_stream *stream) assert(!ret); } -void viewer_stream_destroy(struct relay_viewer_stream *stream) +void viewer_stream_destroy(struct ctf_trace *ctf_trace, + struct relay_viewer_stream *stream) { int ret; - unsigned long ret_ref; assert(stream); - ret_ref = uatomic_add_return(&stream->ctf_trace->refcount, -1); - assert(ret_ref >= 0); + + if (ctf_trace) { + ctf_trace_put_ref(ctf_trace); + } if (stream->read_fd >= 0) { ret = close(stream->read_fd); @@ -166,25 +167,6 @@ void viewer_stream_destroy(struct relay_viewer_stream *stream) } } - /* - * If the only stream left in the HT is the metadata stream, - * we need to remove it because we won't detect a EOF for this - * stream. - */ - if (ret_ref == 1 && stream->ctf_trace->viewer_metadata_stream) { - viewer_stream_delete(stream->ctf_trace->viewer_metadata_stream); - viewer_stream_destroy(stream->ctf_trace->viewer_metadata_stream); - stream->ctf_trace->metadata_stream = NULL; - DBG("Freeing ctf_trace %" PRIu64, stream->ctf_trace->id); - /* - * The streaming-side is already closed and we can't receive a new - * stream concurrently at this point (since the session is being - * destroyed), so when we detect the refcount equals 0, we are the - * only owners of the ctf_trace and we can free it ourself. - */ - free(stream->ctf_trace); - } - call_rcu(&stream->rcu_node, deferred_free_viewer_stream); } @@ -223,6 +205,13 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, uint64_t tracefile_id; assert(vstream); + assert(stream); + + if (vstream->tracefile_count == 0) { + /* Ignore rotation, there is none to do. */ + ret = 0; + goto end; + } tracefile_id = (vstream->tracefile_count_current + 1) % vstream->tracefile_count; @@ -236,19 +225,16 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, } /* - * If the stream on the streaming side still exists, lock to execute - * rotation in order to avoid races between a modification on the index - * values. + * Lock to execute rotation in order to avoid races between a modification + * on the index values. */ - if (stream) { - pthread_mutex_lock(&stream->viewer_stream_rotation_lock); - } + pthread_mutex_lock(&stream->viewer_stream_rotation_lock); /* * The writer and the reader are not working in the same tracefile, we can * read up to EOF, we don't care about the total_index_received. */ - if (!stream || (stream->tracefile_count_current != tracefile_id)) { + if (stream->close_flag || (stream->tracefile_count_current != tracefile_id)) { vstream->close_write_flag = 1; } else { /* @@ -256,7 +242,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, * limit our reading to the number of indexes received. */ vstream->close_write_flag = 0; - if (stream) { + if (stream->close_flag) { vstream->total_index_received = stream->total_index_received; } } @@ -278,9 +264,7 @@ int viewer_stream_rotate(struct relay_viewer_stream *vstream, vstream->abort_flag = 0; pthread_mutex_unlock(&vstream->overwrite_lock); - if (stream) { - pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); - } + pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); ret = index_open(vstream->path_name, vstream->channel_name, vstream->tracefile_count, vstream->tracefile_count_current); diff --git a/src/bin/lttng-relayd/viewer-stream.h b/src/bin/lttng-relayd/viewer-stream.h index 793fa7cca..003b1197c 100644 --- a/src/bin/lttng-relayd/viewer-stream.h +++ b/src/bin/lttng-relayd/viewer-stream.h @@ -27,6 +27,7 @@ #include "ctf-trace.h" #include "lttng-viewer-abi.h" +#include "stream.h" /* Stub */ struct relay_stream; @@ -77,9 +78,10 @@ struct relay_viewer_stream { }; struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream, - enum lttng_viewer_seek seek_t); + enum lttng_viewer_seek seek_t, struct ctf_trace *ctf_trace); struct relay_viewer_stream *viewer_stream_find_by_id(uint64_t id); -void viewer_stream_destroy(struct relay_viewer_stream *stream); +void viewer_stream_destroy(struct ctf_trace *ctf_trace, + struct relay_viewer_stream *stream); void viewer_stream_delete(struct relay_viewer_stream *stream); int viewer_stream_rotate(struct relay_viewer_stream *vstream, struct relay_stream *stream); -- 2.34.1