Fix: relayd: failure to read index entry or stream packet after clear
[lttng-tools.git] / src / bin / lttng-relayd / viewer-stream.c
... / ...
CommitLineData
1/*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 *
6 * SPDX-License-Identifier: GPL-2.0-only
7 *
8 */
9
10#define _LGPL_SOURCE
11#include <common/common.h>
12#include <common/index/index.h>
13#include <common/compat/string.h>
14#include <common/utils.h>
15#include <sys/types.h>
16#include <sys/stat.h>
17#include <fcntl.h>
18
19#include "lttng-relayd.h"
20#include "viewer-stream.h"
21
22static void viewer_stream_destroy(struct relay_viewer_stream *vstream)
23{
24 lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
25 free(vstream->path_name);
26 free(vstream->channel_name);
27 free(vstream);
28}
29
30static void viewer_stream_destroy_rcu(struct rcu_head *head)
31{
32 struct relay_viewer_stream *vstream =
33 caa_container_of(head, struct relay_viewer_stream, rcu_node);
34
35 viewer_stream_destroy(vstream);
36}
37
38/* Relay stream's lock must be held by the caller. */
39struct relay_viewer_stream *viewer_stream_create(struct relay_stream *stream,
40 struct lttng_trace_chunk *trace_chunk,
41 enum lttng_viewer_seek seek_t)
42{
43 struct relay_viewer_stream *vstream = NULL;
44
45 ASSERT_LOCKED(stream->lock);
46
47 vstream = zmalloc(sizeof(*vstream));
48 if (!vstream) {
49 PERROR("relay viewer stream zmalloc");
50 goto error;
51 }
52
53 if (trace_chunk) {
54 const bool acquired_reference = lttng_trace_chunk_get(
55 trace_chunk);
56
57 assert(acquired_reference);
58 }
59
60 vstream->stream_file.trace_chunk = trace_chunk;
61 vstream->path_name = lttng_strndup(stream->path_name, LTTNG_VIEWER_PATH_MAX);
62 if (vstream->path_name == NULL) {
63 PERROR("relay viewer path_name alloc");
64 goto error;
65 }
66 vstream->channel_name = lttng_strndup(stream->channel_name,
67 LTTNG_VIEWER_NAME_MAX);
68 if (vstream->channel_name == NULL) {
69 PERROR("relay viewer channel_name alloc");
70 goto error;
71 }
72
73 if (!stream_get(stream)) {
74 ERR("Cannot get stream");
75 goto error;
76 }
77 vstream->stream = stream;
78
79 if (stream->is_metadata && stream->trace->viewer_metadata_stream) {
80 ERR("Cannot attach viewer metadata stream to trace (busy).");
81 goto error;
82 }
83
84 switch (seek_t) {
85 case LTTNG_VIEWER_SEEK_BEGINNING:
86 {
87 uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa);
88
89 if (seq_tail == -1ULL) {
90 /*
91 * Tail may not be initialized yet. Nonetheless, we know
92 * we want to send the first index once it becomes
93 * available.
94 */
95 seq_tail = 0;
96 }
97 vstream->current_tracefile_id =
98 tracefile_array_get_file_index_tail(stream->tfa);
99 vstream->index_sent_seqcount = seq_tail;
100 break;
101 }
102 case LTTNG_VIEWER_SEEK_LAST:
103 vstream->current_tracefile_id =
104 tracefile_array_get_read_file_index_head(stream->tfa);
105 /*
106 * We seek at the very end of each stream, awaiting for
107 * a future packet to eventually come in.
108 *
109 * We don't need to check the head position for -1ULL since the
110 * increment will set it to 0.
111 */
112 vstream->index_sent_seqcount =
113 tracefile_array_get_seq_head(stream->tfa) + 1;
114 break;
115 default:
116 goto error;
117 }
118
119 /*
120 * If we never received an index for the current stream, delay
121 * the opening of the index, otherwise open it right now.
122 */
123 if (stream->index_file == NULL) {
124 vstream->index_file = NULL;
125 } else {
126 const uint32_t connection_major = stream->trace->session->major;
127 const uint32_t connection_minor = stream->trace->session->minor;
128 enum lttng_trace_chunk_status chunk_status;
129
130 chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
131 vstream->stream_file.trace_chunk,
132 stream->path_name,
133 stream->channel_name, stream->tracefile_size,
134 vstream->current_tracefile_id,
135 lttng_to_index_major(connection_major,
136 connection_minor),
137 lttng_to_index_minor(connection_major,
138 connection_minor),
139 true, &vstream->index_file);
140 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
141 if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
142 vstream->index_file = NULL;
143 } else {
144 goto error;
145 }
146 }
147 }
148
149 /*
150 * If we never received a data file for the current stream, delay the
151 * opening, otherwise open it right now.
152 */
153 if (stream->file) {
154 int ret;
155 char file_path[LTTNG_PATH_MAX];
156 enum lttng_trace_chunk_status status;
157
158 ret = utils_stream_file_path(stream->path_name,
159 stream->channel_name, stream->tracefile_size,
160 vstream->current_tracefile_id, NULL, file_path,
161 sizeof(file_path));
162 if (ret < 0) {
163 goto error;
164 }
165
166 status = lttng_trace_chunk_open_fs_handle(
167 vstream->stream_file.trace_chunk, file_path,
168 O_RDONLY, 0, &vstream->stream_file.handle,
169 true);
170 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
171 goto error;
172 }
173 }
174
175 if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) {
176 off_t lseek_ret;
177
178 lseek_ret = fs_handle_seek(
179 vstream->index_file->file, 0, SEEK_END);
180 if (lseek_ret < 0) {
181 goto error;
182 }
183 }
184 if (stream->is_metadata) {
185 rcu_assign_pointer(stream->trace->viewer_metadata_stream,
186 vstream);
187 }
188
189 vstream->last_seen_rotation_count = stream->completed_rotation_count;
190
191 /* Globally visible after the add unique. */
192 lttng_ht_node_init_u64(&vstream->stream_n, stream->stream_handle);
193 urcu_ref_init(&vstream->ref);
194 lttng_ht_add_unique_u64(viewer_streams_ht, &vstream->stream_n);
195
196 return vstream;
197
198error:
199 if (vstream) {
200 viewer_stream_destroy(vstream);
201 }
202 return NULL;
203}
204
205static void viewer_stream_unpublish(struct relay_viewer_stream *vstream)
206{
207 int ret;
208 struct lttng_ht_iter iter;
209
210 iter.iter.node = &vstream->stream_n.node;
211 ret = lttng_ht_del(viewer_streams_ht, &iter);
212 assert(!ret);
213}
214
215static void viewer_stream_release(struct urcu_ref *ref)
216{
217 struct relay_viewer_stream *vstream = caa_container_of(ref,
218 struct relay_viewer_stream, ref);
219
220 if (vstream->stream->is_metadata) {
221 rcu_assign_pointer(vstream->stream->trace->viewer_metadata_stream, NULL);
222 }
223
224 viewer_stream_unpublish(vstream);
225
226 if (vstream->stream_file.handle) {
227 fs_handle_close(vstream->stream_file.handle);
228 vstream->stream_file.handle = NULL;
229 }
230 if (vstream->index_file) {
231 lttng_index_file_put(vstream->index_file);
232 vstream->index_file = NULL;
233 }
234 if (vstream->stream) {
235 stream_put(vstream->stream);
236 vstream->stream = NULL;
237 }
238 lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
239 vstream->stream_file.trace_chunk = NULL;
240 call_rcu(&vstream->rcu_node, viewer_stream_destroy_rcu);
241}
242
243/* Must be called with RCU read-side lock held. */
244bool viewer_stream_get(struct relay_viewer_stream *vstream)
245{
246 return urcu_ref_get_unless_zero(&vstream->ref);
247}
248
249/*
250 * Get viewer stream by id.
251 *
252 * Return viewer stream if found else NULL.
253 */
254struct relay_viewer_stream *viewer_stream_get_by_id(uint64_t id)
255{
256 struct lttng_ht_node_u64 *node;
257 struct lttng_ht_iter iter;
258 struct relay_viewer_stream *vstream = NULL;
259
260 rcu_read_lock();
261 lttng_ht_lookup(viewer_streams_ht, &id, &iter);
262 node = lttng_ht_iter_get_node_u64(&iter);
263 if (!node) {
264 DBG("Relay viewer stream %" PRIu64 " not found", id);
265 goto end;
266 }
267 vstream = caa_container_of(node, struct relay_viewer_stream, stream_n);
268 if (!viewer_stream_get(vstream)) {
269 vstream = NULL;
270 }
271end:
272 rcu_read_unlock();
273 return vstream;
274}
275
276void viewer_stream_put(struct relay_viewer_stream *vstream)
277{
278 rcu_read_lock();
279 urcu_ref_put(&vstream->ref, viewer_stream_release);
280 rcu_read_unlock();
281}
282
283void viewer_stream_close_files(struct relay_viewer_stream *vstream)
284{
285 if (vstream->index_file) {
286 lttng_index_file_put(vstream->index_file);
287 vstream->index_file = NULL;
288 }
289 if (vstream->stream_file.handle) {
290 fs_handle_close(vstream->stream_file.handle);
291 vstream->stream_file.handle = NULL;
292 }
293}
294
295void viewer_stream_sync_tracefile_array_tail(struct relay_viewer_stream *vstream)
296{
297 const struct relay_stream *stream = vstream->stream;
298 uint64_t seq_tail;
299
300 vstream->current_tracefile_id = tracefile_array_get_file_index_tail(stream->tfa);
301 seq_tail = tracefile_array_get_seq_tail(stream->tfa);
302 if (seq_tail == -1ULL) {
303 seq_tail = 0;
304 }
305 vstream->index_sent_seqcount = seq_tail;
306}
307
308/*
309 * Rotate a stream to the next tracefile.
310 *
311 * Must be called with the rstream lock held.
312 * Returns 0 on success, 1 on EOF.
313 */
314int viewer_stream_rotate(struct relay_viewer_stream *vstream)
315{
316 int ret;
317 uint64_t new_id;
318 const struct relay_stream *stream = vstream->stream;
319
320 /* Detect the last tracefile to open. */
321 if (stream->index_received_seqcount
322 == vstream->index_sent_seqcount
323 && stream->trace->session->connection_closed) {
324 ret = 1;
325 goto end;
326 }
327
328 if (stream->tracefile_count == 0) {
329 /* Ignore rotation, there is none to do. */
330 ret = 0;
331 goto end;
332 }
333
334 /*
335 * Try to move to the next file.
336 */
337 new_id = (vstream->current_tracefile_id + 1)
338 % stream->tracefile_count;
339 if (tracefile_array_seq_in_file(stream->tfa, new_id,
340 vstream->index_sent_seqcount)) {
341 vstream->current_tracefile_id = new_id;
342 } else {
343 uint64_t seq_tail = tracefile_array_get_seq_tail(stream->tfa);
344
345 /*
346 * This can only be reached on overwrite, which implies there
347 * has been data written at some point, which will have set the
348 * tail.
349 */
350 assert(seq_tail != -1ULL);
351 /*
352 * We need to resync because we lag behind tail.
353 */
354 vstream->current_tracefile_id =
355 tracefile_array_get_file_index_tail(stream->tfa);
356 vstream->index_sent_seqcount = seq_tail;
357 }
358 viewer_stream_close_files(vstream);
359 ret = 0;
360end:
361 return ret;
362}
363
364void print_viewer_streams(void)
365{
366 struct lttng_ht_iter iter;
367 struct relay_viewer_stream *vstream;
368
369 if (!viewer_streams_ht) {
370 return;
371 }
372
373 rcu_read_lock();
374 cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
375 stream_n.node) {
376 if (!viewer_stream_get(vstream)) {
377 continue;
378 }
379 DBG("vstream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
380 " session %" PRIu64,
381 vstream,
382 vstream->ref.refcount,
383 vstream->stream->stream_handle,
384 vstream->stream->trace->id,
385 vstream->stream->trace->session->id);
386 viewer_stream_put(vstream);
387 }
388 rcu_read_unlock();
389}
This page took 0.024659 seconds and 4 git commands to generate.