Fix: relayd: tracefile rotation: viewer opening missing index file
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
CommitLineData
2a174661
DG
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
7591bab1 4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
c35f9726 5 * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
2a174661
DG
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License, version 2 only, as
9 * published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * this program; if not, write to the Free Software Foundation, Inc., 51
18 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
6c1c0768 21#define _LGPL_SOURCE
2a174661 22#include <common/common.h>
7591bab1
MD
23#include <common/utils.h>
24#include <common/defaults.h>
c35f9726 25#include <common/sessiond-comm/relayd.h>
7591bab1
MD
26#include <urcu/rculist.h>
27#include <sys/stat.h>
2a174661 28
7591bab1 29#include "lttng-relayd.h"
2a174661
DG
30#include "index.h"
31#include "stream.h"
32#include "viewer-stream.h"
33
348a81dc
JG
34#include <sys/types.h>
35#include <fcntl.h>
36
c35f9726
JG
37#define FILE_IO_STACK_BUFFER_SIZE 65536
38
7591bab1
MD
39/* Should be called with RCU read-side lock held. */
40bool stream_get(struct relay_stream *stream)
41{
ce4d4083 42 return urcu_ref_get_unless_zero(&stream->ref);
7591bab1
MD
43}
44
2a174661 45/*
7591bab1
MD
46 * Get stream from stream id from the streams hash table. Return stream
47 * if found else NULL. A stream reference is taken when a stream is
48 * returned. stream_put() must be called on that stream.
2a174661 49 */
7591bab1 50struct relay_stream *stream_get_by_id(uint64_t stream_id)
2a174661
DG
51{
52 struct lttng_ht_node_u64 *node;
53 struct lttng_ht_iter iter;
54 struct relay_stream *stream = NULL;
55
7591bab1
MD
56 rcu_read_lock();
57 lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
2a174661 58 node = lttng_ht_iter_get_node_u64(&iter);
7591bab1 59 if (!node) {
2a174661
DG
60 DBG("Relay stream %" PRIu64 " not found", stream_id);
61 goto end;
62 }
63 stream = caa_container_of(node, struct relay_stream, node);
7591bab1
MD
64 if (!stream_get(stream)) {
65 stream = NULL;
66 }
2a174661 67end:
7591bab1 68 rcu_read_unlock();
2a174661
DG
69 return stream;
70}
71
c35f9726
JG
72static void stream_complete_rotation(struct relay_stream *stream)
73{
74 DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
75 lttng_trace_chunk_put(stream->trace_chunk);
76 stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
77 stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
78}
79
c35f9726
JG
80static int stream_create_data_output_file_from_trace_chunk(
81 struct relay_stream *stream,
82 struct lttng_trace_chunk *trace_chunk,
83 bool force_unlink,
84 struct stream_fd **out_stream_fd)
348a81dc
JG
85{
86 int ret, fd;
c35f9726 87 char stream_path[LTTNG_PATH_MAX];
348a81dc
JG
88 enum lttng_trace_chunk_status status;
89 const int flags = O_RDWR | O_CREAT | O_TRUNC;
90 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
348a81dc
JG
91
92 ASSERT_LOCKED(stream->lock);
93 assert(stream->trace_chunk);
94
348a81dc 95 ret = utils_stream_file_path(stream->path_name, stream->channel_name,
c35f9726
JG
96 stream->tracefile_size, stream->tracefile_current_index,
97 NULL, stream_path, sizeof(stream_path));
348a81dc
JG
98 if (ret < 0) {
99 goto end;
100 }
101
c35f9726
JG
102 if (stream->tracefile_wrapped_around || force_unlink) {
103 /*
104 * The on-disk ring-buffer has wrapped around.
105 * Newly created stream files will replace existing files. Since
106 * live clients may be consuming existing files, the file about
107 * to be replaced is unlinked in order to not overwrite its
108 * content.
109 */
110 status = lttng_trace_chunk_unlink_file(trace_chunk,
111 stream_path);
112 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
113 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
114 stream_path);
115 /*
116 * Don't abort if the file doesn't exist, it is
117 * unexpected, but should not be a fatal error.
118 */
119 if (errno != ENOENT) {
120 ret = -1;
121 goto end;
122 }
123 }
124 }
125
348a81dc 126 status = lttng_trace_chunk_open_file(
c35f9726 127 trace_chunk, stream_path, flags, mode, &fd);
348a81dc
JG
128 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
129 ERR("Failed to open stream file \"%s\"", stream->channel_name);
130 ret = -1;
131 goto end;
132 }
133
c35f9726
JG
134 *out_stream_fd = stream_fd_create(fd);
135 if (!*out_stream_fd) {
348a81dc
JG
136 if (close(ret)) {
137 PERROR("Error closing stream file descriptor %d", ret);
138 }
139 ret = -1;
140 goto end;
141 }
142end:
143 return ret;
144}
145
c35f9726
JG
146static int stream_rotate_data_file(struct relay_stream *stream)
147{
148 int ret = 0;
149
150 DBG("Rotating stream %" PRIu64 " data file",
151 stream->stream_handle);
152
153 if (stream->stream_fd) {
154 stream_fd_put(stream->stream_fd);
155 stream->stream_fd = NULL;
156 }
157
158 stream->tracefile_wrapped_around = false;
159 stream->tracefile_current_index = 0;
160
161 if (stream->ongoing_rotation.value.next_trace_chunk) {
162 struct stream_fd *new_stream_fd = NULL;
163 enum lttng_trace_chunk_status chunk_status;
164
165 chunk_status = lttng_trace_chunk_create_subdirectory(
166 stream->ongoing_rotation.value.next_trace_chunk,
167 stream->path_name);
168 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
169 ret = -1;
170 goto end;
171 }
172
173 /* Rotate the data file. */
174 ret = stream_create_data_output_file_from_trace_chunk(stream,
175 stream->ongoing_rotation.value.next_trace_chunk,
176 false, &new_stream_fd);
177 stream->stream_fd = new_stream_fd;
178 if (ret < 0) {
179 ERR("Failed to rotate stream data file");
180 goto end;
181 }
182 }
183 stream->tracefile_size_current = 0;
184 stream->pos_after_last_complete_data_index = 0;
185 stream->ongoing_rotation.value.data_rotated = true;
186
187 if (stream->ongoing_rotation.value.index_rotated) {
188 /* Rotation completed; reset its state. */
189 stream_complete_rotation(stream);
190 }
191end:
192 return ret;
193}
194
f2aea36d
JG
195/*
196 * If too much data has been written in a tracefile before we received the
197 * rotation command, we have to move the excess data to the new tracefile and
198 * perform the rotation. This can happen because the control and data
199 * connections are separate, the indexes as well as the commands arrive from
200 * the control connection and we have no control over the order so we could be
201 * in a situation where too much data has been received on the data connection
202 * before the rotation command on the control connection arrives.
203 */
204static int rotate_truncate_stream(struct relay_stream *stream)
205{
206 int ret;
207 off_t lseek_ret, previous_stream_copy_origin;
208 uint64_t copy_bytes_left, misplaced_data_size;
209 bool acquired_reference;
210 struct stream_fd *previous_stream_fd = NULL;
211 struct lttng_trace_chunk *previous_chunk = NULL;
212
4147107d
JG
213 if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
214 ERR("Protocol error encoutered in %s(): stream rotation "
215 "sequence number is before the current sequence number "
216 "and the next trace chunk is unset. Honoring this "
217 "rotation command would result in data loss",
218 __FUNCTION__);
219 ret = -1;
220 goto end;
221 }
222
f2aea36d
JG
223 ASSERT_LOCKED(stream->lock);
224 /*
225 * Acquire a reference to the current trace chunk to ensure
226 * it is not reclaimed when `stream_rotate_data_file` is called.
227 * Failing to do so would violate the contract of the trace
228 * chunk API as an active file descriptor would outlive the
229 * trace chunk.
230 */
231 acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
232 assert(acquired_reference);
233 previous_chunk = stream->trace_chunk;
234
235 /*
236 * Steal the stream's reference to its stream_fd. A new
237 * stream_fd will be created when the rotation completes and
238 * the orinal stream_fd will be used to copy the "extra" data
239 * to the new file.
240 */
241 assert(stream->stream_fd);
242 previous_stream_fd = stream->stream_fd;
243 stream->stream_fd = NULL;
244
245 assert(!stream->is_metadata);
246 assert(stream->tracefile_size_current >
247 stream->pos_after_last_complete_data_index);
248 misplaced_data_size = stream->tracefile_size_current -
249 stream->pos_after_last_complete_data_index;
250 copy_bytes_left = misplaced_data_size;
251 previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
252
253 ret = stream_rotate_data_file(stream);
254 if (ret) {
255 goto end;
256 }
257
4147107d 258 assert(stream->stream_fd);
f2aea36d
JG
259 /*
260 * Seek the current tracefile to the position at which the rotation
261 * should have occurred.
262 */
263 lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
264 SEEK_SET);
265 if (lseek_ret < 0) {
266 PERROR("Failed to seek to offset %" PRIu64
267 " while copying extra data received before a stream rotation",
268 (uint64_t) previous_stream_copy_origin);
269 ret = -1;
270 goto end;
271 }
272
273 /* Move data from the old file to the new file. */
274 while (copy_bytes_left) {
275 ssize_t io_ret;
276 char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
277 const off_t copy_size_this_pass = min_t(
278 off_t, copy_bytes_left, sizeof(copy_buffer));
279
280 io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
281 copy_size_this_pass);
282 if (io_ret < (ssize_t) copy_size_this_pass) {
283 if (io_ret == -1) {
284 PERROR("Failed to read %" PRIu64
285 " bytes from fd %i in %s(), returned %zi",
286 copy_size_this_pass,
287 previous_stream_fd->fd,
288 __FUNCTION__, io_ret);
289 } else {
290 ERR("Failed to read %" PRIu64
291 " bytes from fd %i in %s(), returned %zi",
292 copy_size_this_pass,
293 previous_stream_fd->fd,
294 __FUNCTION__, io_ret);
295 }
296 ret = -1;
297 goto end;
298 }
299
300 io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
301 copy_size_this_pass);
302 if (io_ret < (ssize_t) copy_size_this_pass) {
303 if (io_ret == -1) {
304 PERROR("Failed to write %" PRIu64
305 " bytes from fd %i in %s(), returned %zi",
306 copy_size_this_pass,
307 stream->stream_fd->fd,
308 __FUNCTION__, io_ret);
309 } else {
310 ERR("Failed to write %" PRIu64
311 " bytes from fd %i in %s(), returned %zi",
312 copy_size_this_pass,
313 stream->stream_fd->fd,
314 __FUNCTION__, io_ret);
315 }
316 ret = -1;
317 goto end;
318 }
319 copy_bytes_left -= copy_size_this_pass;
320 }
321
322 /* Truncate the file to get rid of the excess data. */
323 ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
324 if (ret) {
325 PERROR("Failed to truncate current stream file to offset %" PRIu64,
326 previous_stream_copy_origin);
327 goto end;
328 }
329
330 /*
331 * Update the offset and FD of all the eventual indexes created by the
332 * data connection before the rotation command arrived.
333 */
334 ret = relay_index_switch_all_files(stream);
335 if (ret < 0) {
336 ERR("Failed to rotate index file");
337 goto end;
338 }
339
340 stream->tracefile_size_current = misplaced_data_size;
341 /* Index and data contents are back in sync. */
342 stream->pos_after_last_complete_data_index = 0;
343 ret = 0;
344end:
345 lttng_trace_chunk_put(previous_chunk);
346 stream_fd_put(previous_stream_fd);
347 return ret;
348}
349
c35f9726
JG
350/*
351 * Check if a stream's data file (as opposed to index) should be rotated
352 * (for session rotation).
353 * Must be called with the stream lock held.
354 *
355 * Return 0 on success, a negative value on error.
356 */
357static int try_rotate_stream_data(struct relay_stream *stream)
358{
359 int ret = 0;
360
361 if (caa_likely(!stream->ongoing_rotation.is_set)) {
362 /* No rotation expected. */
363 goto end;
364 }
365
366 if (stream->ongoing_rotation.value.data_rotated) {
367 /* Rotation of the data file has already occurred. */
368 goto end;
369 }
370
371 if (stream->prev_data_seq == -1ULL ||
372 stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
373 /*
374 * The next packet that will be written is not part of the next
375 * chunk yet.
376 */
377 DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
378 ", prev_data_seq = %" PRIu64 ")",
379 stream->stream_handle,
380 stream->ongoing_rotation.value.seq_num,
381 stream->prev_data_seq);
382 goto end;
383 } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
384 /*
385 * prev_data_seq is checked here since indexes and rotation
386 * commands are serialized with respect to each other.
387 */
388 DBG("Rotation after too much data has been written in tracefile "
389 "for stream %" PRIu64 ", need to truncate before "
390 "rotating", stream->stream_handle);
391 ret = rotate_truncate_stream(stream);
392 if (ret) {
393 ERR("Failed to truncate stream");
394 goto end;
395 }
396 } else {
397 ret = stream_rotate_data_file(stream);
398 }
399
400end:
401 return ret;
402}
403
404/*
405 * Close the current index file if it is open, and create a new one.
406 *
407 * Return 0 on success, -1 on error.
408 */
409static int create_index_file(struct relay_stream *stream,
410 struct lttng_trace_chunk *chunk)
411{
412 int ret;
413 uint32_t major, minor;
414 char *index_subpath = NULL;
3a735fa7 415 enum lttng_trace_chunk_status status;
c35f9726
JG
416
417 ASSERT_LOCKED(stream->lock);
418
419 /* Put ref on previous index_file. */
420 if (stream->index_file) {
421 lttng_index_file_put(stream->index_file);
422 stream->index_file = NULL;
423 }
424 major = stream->trace->session->major;
425 minor = stream->trace->session->minor;
426
427 if (!chunk) {
428 ret = 0;
429 goto end;
430 }
431 ret = asprintf(&index_subpath, "%s/%s", stream->path_name,
432 DEFAULT_INDEX_DIR);
433 if (ret < 0) {
434 goto end;
435 }
436
3a735fa7 437 status = lttng_trace_chunk_create_subdirectory(chunk,
c35f9726
JG
438 index_subpath);
439 free(index_subpath);
3a735fa7
MD
440 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
441 ret = -1;
c35f9726
JG
442 goto end;
443 }
444 stream->index_file = lttng_index_file_create_from_trace_chunk(
445 chunk, stream->path_name,
446 stream->channel_name, stream->tracefile_size,
447 stream->tracefile_current_index,
448 lttng_to_index_major(major, minor),
449 lttng_to_index_minor(major, minor), true);
450 if (!stream->index_file) {
451 ret = -1;
452 goto end;
453 }
454
455 ret = 0;
456
457end:
458 return ret;
459}
460
461/*
462 * Check if a stream's index file should be rotated (for session rotation).
463 * Must be called with the stream lock held.
464 *
465 * Return 0 on success, a negative value on error.
466 */
467static int try_rotate_stream_index(struct relay_stream *stream)
468{
469 int ret = 0;
470
471 if (!stream->ongoing_rotation.is_set) {
472 /* No rotation expected. */
473 goto end;
474 }
475
476 if (stream->ongoing_rotation.value.index_rotated) {
477 /* Rotation of the index has already occurred. */
478 goto end;
479 }
480
481 if (stream->prev_index_seq == -1ULL ||
482 stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
483 DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
484 stream->stream_handle,
485 stream->ongoing_rotation.value.seq_num,
486 stream->prev_index_seq);
487 goto end;
488 } else {
489 /* The next index belongs to the new trace chunk; rotate. */
490 assert(stream->prev_index_seq + 1 ==
491 stream->ongoing_rotation.value.seq_num);
492 DBG("Rotating stream %" PRIu64 " index file",
493 stream->stream_handle);
494 ret = create_index_file(stream,
495 stream->ongoing_rotation.value.next_trace_chunk);
496 stream->ongoing_rotation.value.index_rotated = true;
497
498 if (stream->ongoing_rotation.value.data_rotated &&
499 stream->ongoing_rotation.value.index_rotated) {
500 /* Rotation completed; reset its state. */
501 DBG("Rotation completed for stream %" PRIu64,
502 stream->stream_handle);
503 stream_complete_rotation(stream);
504 }
505 }
506
507end:
508 return ret;
509}
510
348a81dc
JG
511static int stream_set_trace_chunk(struct relay_stream *stream,
512 struct lttng_trace_chunk *chunk)
513{
514 int ret = 0;
515 enum lttng_trace_chunk_status status;
516 bool acquired_reference;
c35f9726 517 struct stream_fd *new_stream_fd = NULL;
348a81dc 518
348a81dc
JG
519 status = lttng_trace_chunk_create_subdirectory(chunk,
520 stream->path_name);
521 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
522 ret = -1;
523 goto end;
524 }
525
526 lttng_trace_chunk_put(stream->trace_chunk);
527 acquired_reference = lttng_trace_chunk_get(chunk);
528 assert(acquired_reference);
529 stream->trace_chunk = chunk;
c35f9726
JG
530
531 if (stream->stream_fd) {
532 stream_fd_put(stream->stream_fd);
533 stream->stream_fd = NULL;
534 }
535 ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
536 false, &new_stream_fd);
537 stream->stream_fd = new_stream_fd;
348a81dc 538end:
348a81dc
JG
539 return ret;
540}
541
2a174661 542/*
7591bab1 543 * We keep ownership of path_name and channel_name.
2a174661 544 */
7591bab1
MD
545struct relay_stream *stream_create(struct ctf_trace *trace,
546 uint64_t stream_handle, char *path_name,
547 char *channel_name, uint64_t tracefile_size,
348a81dc 548 uint64_t tracefile_count)
2a174661 549{
7591bab1
MD
550 int ret;
551 struct relay_stream *stream = NULL;
552 struct relay_session *session = trace->session;
348a81dc
JG
553 bool acquired_reference = false;
554 struct lttng_trace_chunk *current_trace_chunk;
2a174661 555
7591bab1
MD
556 stream = zmalloc(sizeof(struct relay_stream));
557 if (stream == NULL) {
558 PERROR("relay stream zmalloc");
7591bab1
MD
559 goto error_no_alloc;
560 }
2a174661 561
7591bab1 562 stream->stream_handle = stream_handle;
a8f9f353 563 stream->prev_data_seq = -1ULL;
7a45c7e6 564 stream->prev_index_seq = -1ULL;
bda7c7b9 565 stream->last_net_seq_num = -1ULL;
7591bab1
MD
566 stream->ctf_stream_id = -1ULL;
567 stream->tracefile_size = tracefile_size;
568 stream->tracefile_count = tracefile_count;
569 stream->path_name = path_name;
570 stream->channel_name = channel_name;
2f9c3030 571 stream->beacon_ts_end = -1ULL;
7591bab1
MD
572 lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
573 pthread_mutex_init(&stream->lock, NULL);
7591bab1
MD
574 urcu_ref_init(&stream->ref);
575 ctf_trace_get(trace);
576 stream->trace = trace;
2a174661 577
348a81dc
JG
578 pthread_mutex_lock(&trace->session->lock);
579 current_trace_chunk = trace->session->current_trace_chunk;
580 if (current_trace_chunk) {
581 acquired_reference = lttng_trace_chunk_get(current_trace_chunk);
582 }
583 pthread_mutex_unlock(&trace->session->lock);
584 if (!acquired_reference) {
585 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
586 channel_name);
7591bab1
MD
587 ret = -1;
588 goto end;
2a174661
DG
589 }
590
348a81dc
JG
591 stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
592 if (!stream->indexes_ht) {
593 ERR("Cannot created indexes_ht");
594 ret = -1;
7591bab1
MD
595 goto end;
596 }
2a174661 597
c35f9726 598 pthread_mutex_lock(&stream->lock);
348a81dc 599 ret = stream_set_trace_chunk(stream, current_trace_chunk);
c35f9726 600 pthread_mutex_unlock(&stream->lock);
348a81dc
JG
601 if (ret) {
602 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
603 trace->session->session_name,
604 stream->channel_name);
7591bab1
MD
605 ret = -1;
606 goto end;
2a174661 607 }
a44ca2ca
MD
608 stream->tfa = tracefile_array_create(stream->tracefile_count);
609 if (!stream->tfa) {
610 ret = -1;
611 goto end;
612 }
7591bab1 613
348a81dc
JG
614 stream->is_metadata = !strcmp(stream->channel_name,
615 DEFAULT_METADATA_NAME);
7591bab1
MD
616 stream->in_recv_list = true;
617
618 /*
619 * Add the stream in the recv list of the session. Once the end stream
620 * message is received, all session streams are published.
621 */
622 pthread_mutex_lock(&session->recv_list_lock);
623 cds_list_add_rcu(&stream->recv_node, &session->recv_list);
624 session->stream_count++;
625 pthread_mutex_unlock(&session->recv_list_lock);
626
627 /*
628 * Both in the ctf_trace object and the global stream ht since the data
629 * side of the relayd does not have the concept of session.
630 */
631 lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
77f7bd85 632 stream->in_stream_ht = true;
2a174661 633
7591bab1
MD
634 DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
635 stream->stream_handle);
636 ret = 0;
637
638end:
639 if (ret) {
640 if (stream->stream_fd) {
641 stream_fd_put(stream->stream_fd);
642 stream->stream_fd = NULL;
2a174661 643 }
7591bab1
MD
644 stream_put(stream);
645 stream = NULL;
2a174661 646 }
317eadef
JG
647 if (acquired_reference) {
648 lttng_trace_chunk_put(current_trace_chunk);
649 }
7591bab1 650 return stream;
2a174661 651
7591bab1
MD
652error_no_alloc:
653 /*
654 * path_name and channel_name need to be freed explicitly here
655 * because we cannot rely on stream_put().
656 */
657 free(path_name);
658 free(channel_name);
659 return NULL;
660}
661
662/*
663 * Called with the session lock held.
664 */
665void stream_publish(struct relay_stream *stream)
666{
667 struct relay_session *session;
668
669 pthread_mutex_lock(&stream->lock);
670 if (stream->published) {
671 goto unlock;
2a174661
DG
672 }
673
7591bab1 674 session = stream->trace->session;
2a174661 675
7591bab1
MD
676 pthread_mutex_lock(&session->recv_list_lock);
677 if (stream->in_recv_list) {
678 cds_list_del_rcu(&stream->recv_node);
679 stream->in_recv_list = false;
680 }
681 pthread_mutex_unlock(&session->recv_list_lock);
2a174661 682
7591bab1
MD
683 pthread_mutex_lock(&stream->trace->stream_list_lock);
684 cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
685 pthread_mutex_unlock(&stream->trace->stream_list_lock);
2a174661 686
7591bab1
MD
687 stream->published = true;
688unlock:
2a174661 689 pthread_mutex_unlock(&stream->lock);
2a174661
DG
690}
691
7591bab1 692/*
77f7bd85 693 * Stream must be protected by holding the stream lock or by virtue of being
ce4d4083 694 * called from stream_destroy.
7591bab1
MD
695 */
696static void stream_unpublish(struct relay_stream *stream)
2a174661 697{
77f7bd85
MD
698 if (stream->in_stream_ht) {
699 struct lttng_ht_iter iter;
700 int ret;
701
702 iter.iter.node = &stream->node.node;
703 ret = lttng_ht_del(relay_streams_ht, &iter);
704 assert(!ret);
705 stream->in_stream_ht = false;
706 }
707 if (stream->published) {
708 pthread_mutex_lock(&stream->trace->stream_list_lock);
709 cds_list_del_rcu(&stream->stream_node);
710 pthread_mutex_unlock(&stream->trace->stream_list_lock);
711 stream->published = false;
7591bab1 712 }
7591bab1
MD
713}
714
715static void stream_destroy(struct relay_stream *stream)
716{
717 if (stream->indexes_ht) {
49e614cb
MD
718 /*
719 * Calling lttng_ht_destroy in call_rcu worker thread so
720 * we don't hold the RCU read-side lock while calling
721 * it.
722 */
7591bab1
MD
723 lttng_ht_destroy(stream->indexes_ht);
724 }
a44ca2ca
MD
725 if (stream->tfa) {
726 tracefile_array_destroy(stream->tfa);
727 }
7591bab1
MD
728 free(stream->path_name);
729 free(stream->channel_name);
730 free(stream);
731}
732
733static void stream_destroy_rcu(struct rcu_head *rcu_head)
734{
735 struct relay_stream *stream =
736 caa_container_of(rcu_head, struct relay_stream, rcu_node);
737
738 stream_destroy(stream);
739}
740
741/*
742 * No need to take stream->lock since this is only called on the final
743 * stream_put which ensures that a single thread may act on the stream.
7591bab1
MD
744 */
745static void stream_release(struct urcu_ref *ref)
746{
747 struct relay_stream *stream =
748 caa_container_of(ref, struct relay_stream, ref);
749 struct relay_session *session;
2a174661 750
7591bab1
MD
751 session = stream->trace->session;
752
753 DBG("Releasing stream id %" PRIu64, stream->stream_handle);
754
755 pthread_mutex_lock(&session->recv_list_lock);
756 session->stream_count--;
757 if (stream->in_recv_list) {
758 cds_list_del_rcu(&stream->recv_node);
759 stream->in_recv_list = false;
760 }
761 pthread_mutex_unlock(&session->recv_list_lock);
2a174661 762
7591bab1
MD
763 stream_unpublish(stream);
764
765 if (stream->stream_fd) {
766 stream_fd_put(stream->stream_fd);
767 stream->stream_fd = NULL;
768 }
f8f3885c
MD
769 if (stream->index_file) {
770 lttng_index_file_put(stream->index_file);
771 stream->index_file = NULL;
7591bab1
MD
772 }
773 if (stream->trace) {
774 ctf_trace_put(stream->trace);
775 stream->trace = NULL;
776 }
c35f9726 777 stream_complete_rotation(stream);
348a81dc
JG
778 lttng_trace_chunk_put(stream->trace_chunk);
779 stream->trace_chunk = NULL;
7591bab1
MD
780
781 call_rcu(&stream->rcu_node, stream_destroy_rcu);
2a174661
DG
782}
783
7591bab1 784void stream_put(struct relay_stream *stream)
2a174661 785{
7591bab1 786 rcu_read_lock();
7591bab1
MD
787 assert(stream->ref.refcount != 0);
788 /*
789 * Wait until we have processed all the stream packets before
790 * actually putting our last stream reference.
791 */
7591bab1 792 urcu_ref_put(&stream->ref, stream_release);
7591bab1
MD
793 rcu_read_unlock();
794}
795
c35f9726
JG
796int stream_set_pending_rotation(struct relay_stream *stream,
797 struct lttng_trace_chunk *next_trace_chunk,
798 uint64_t rotation_sequence_number)
799{
800 int ret = 0;
801 const struct relay_stream_rotation rotation = {
802 .seq_num = rotation_sequence_number,
803 .next_trace_chunk = next_trace_chunk,
804 };
805
806 if (stream->ongoing_rotation.is_set) {
807 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
808 ret = -1;
809 goto end;
810 }
811
812 if (next_trace_chunk) {
813 const bool reference_acquired =
814 lttng_trace_chunk_get(next_trace_chunk);
815
816 assert(reference_acquired);
817 }
818 LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
819
820 DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
821 stream->stream_handle, rotation_sequence_number);
822 if (stream->is_metadata) {
823 /*
824 * A metadata stream has no index; consider it already rotated.
825 */
826 stream->ongoing_rotation.value.index_rotated = true;
827 ret = stream_rotate_data_file(stream);
828 } else {
829 ret = try_rotate_stream_data(stream);
830 if (ret < 0) {
831 goto end;
832 }
833
834 ret = try_rotate_stream_index(stream);
835 if (ret < 0) {
836 goto end;
837 }
838 }
839end:
840 return ret;
841}
842
bda7c7b9 843void try_stream_close(struct relay_stream *stream)
7591bab1 844{
98ba050e
JR
845 bool session_aborted;
846 struct relay_session *session = stream->trace->session;
847
bda7c7b9 848 DBG("Trying to close stream %" PRIu64, stream->stream_handle);
98ba050e
JR
849
850 pthread_mutex_lock(&session->lock);
851 session_aborted = session->aborted;
852 pthread_mutex_unlock(&session->lock);
853
7591bab1 854 pthread_mutex_lock(&stream->lock);
bda7c7b9
JG
855 /*
856 * Can be called concurently by connection close and reception of last
857 * pending data.
858 */
859 if (stream->closed) {
860 pthread_mutex_unlock(&stream->lock);
861 DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
862 return;
863 }
864
865 stream->close_requested = true;
3d07a857
MD
866
867 if (stream->last_net_seq_num == -1ULL) {
868 /*
869 * Handle connection close without explicit stream close
870 * command.
871 *
872 * We can be clever about indexes partially received in
873 * cases where we received the data socket part, but not
874 * the control socket part: since we're currently closing
875 * the stream on behalf of the control socket, we *know*
876 * there won't be any more control information for this
877 * socket. Therefore, we can destroy all indexes for
878 * which we have received only the file descriptor (from
879 * data socket). This takes care of consumerd crashes
880 * between sending the data and control information for
881 * a packet. Since those are sent in that order, we take
882 * care of consumerd crashes.
883 */
5312a3ed 884 DBG("relay_index_close_partial_fd");
3d07a857
MD
885 relay_index_close_partial_fd(stream);
886 /*
887 * Use the highest net_seq_num we currently have pending
888 * As end of stream indicator. Leave last_net_seq_num
889 * at -1ULL if we cannot find any index.
890 */
891 stream->last_net_seq_num = relay_index_find_last(stream);
5312a3ed 892 DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
3d07a857
MD
893 /* Fall-through into the next check. */
894 }
895
bda7c7b9 896 if (stream->last_net_seq_num != -1ULL &&
a8f9f353 897 ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0
98ba050e 898 && !session_aborted) {
3d07a857
MD
899 /*
900 * Don't close since we still have data pending. This
901 * handles cases where an explicit close command has
902 * been received for this stream, and cases where the
903 * connection has been closed, and we are awaiting for
904 * index information from the data socket. It is
905 * therefore expected that all the index fd information
906 * we need has already been received on the control
907 * socket. Matching index information from data socket
908 * should be Expected Soon(TM).
909 *
910 * TODO: We should implement a timer to garbage collect
911 * streams after a timeout to be resilient against a
912 * consumerd implementation that would not match this
913 * expected behavior.
914 */
bda7c7b9
JG
915 pthread_mutex_unlock(&stream->lock);
916 DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
917 return;
918 }
3d07a857
MD
919 /*
920 * We received all the indexes we can expect.
921 */
77f7bd85 922 stream_unpublish(stream);
2229a09c 923 stream->closed = true;
bda7c7b9 924 /* Relay indexes are only used by the "consumer/sessiond" end. */
7591bab1
MD
925 relay_index_close_all(stream);
926 pthread_mutex_unlock(&stream->lock);
bda7c7b9 927 DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
7591bab1
MD
928 stream_put(stream);
929}
930
c35f9726
JG
931int stream_init_packet(struct relay_stream *stream, size_t packet_size,
932 bool *file_rotated)
933{
934 int ret = 0;
935
936 ASSERT_LOCKED(stream->lock);
937 if (caa_likely(stream->tracefile_size == 0)) {
938 /* No size limit set; nothing to check. */
939 goto end;
940 }
941
942 /*
943 * Check if writing the new packet would exceed the maximal file size.
944 */
945 if (caa_unlikely((stream->tracefile_size_current + packet_size) >
946 stream->tracefile_size)) {
947 const uint64_t new_file_index =
948 (stream->tracefile_current_index + 1) %
949 stream->tracefile_count;
950
951 if (new_file_index < stream->tracefile_current_index) {
952 stream->tracefile_wrapped_around = true;
953 }
954 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
955 ", current_file_size = %" PRIu64
76ee9245 956 ", packet_size = %zu, current_file_index = %" PRIu64
c35f9726
JG
957 " new_file_index = %" PRIu64,
958 stream->stream_handle,
959 stream->tracefile_size_current, packet_size,
960 stream->tracefile_current_index, new_file_index);
78118e3b 961 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
c35f9726
JG
962 stream->tracefile_current_index = new_file_index;
963
964 if (stream->stream_fd) {
965 stream_fd_put(stream->stream_fd);
966 stream->stream_fd = NULL;
967 }
968 ret = stream_create_data_output_file_from_trace_chunk(stream,
969 stream->trace_chunk, false, &stream->stream_fd);
970 if (ret) {
971 ERR("Failed to perform trace file rotation of stream %" PRIu64,
972 stream->stream_handle);
973 goto end;
974 }
975
976 /*
977 * Reset current size because we just performed a stream
978 * rotation.
979 */
980 stream->tracefile_size_current = 0;
981 *file_rotated = true;
982 } else {
983 *file_rotated = false;
984 }
985end:
986 return ret;
987}
988
989/* Note that the packet is not necessarily complete. */
990int stream_write(struct relay_stream *stream,
991 const struct lttng_buffer_view *packet, size_t padding_len)
992{
993 int ret = 0;
994 ssize_t write_ret;
995 size_t padding_to_write = padding_len;
996 char padding_buffer[FILE_IO_STACK_BUFFER_SIZE];
997
998 ASSERT_LOCKED(stream->lock);
999 memset(padding_buffer, 0,
1000 min(sizeof(padding_buffer), padding_to_write));
1001
1002 if (packet) {
1003 write_ret = lttng_write(stream->stream_fd->fd,
1004 packet->data, packet->size);
1005 if (write_ret != packet->size) {
1006 PERROR("Failed to write to stream file of %sstream %" PRIu64,
1007 stream->is_metadata ? "metadata " : "",
1008 stream->stream_handle);
1009 ret = -1;
1010 goto end;
1011 }
1012 }
1013
1014 while (padding_to_write > 0) {
1015 const size_t padding_to_write_this_pass =
1016 min(padding_to_write, sizeof(padding_buffer));
1017
1018 write_ret = lttng_write(stream->stream_fd->fd,
1019 padding_buffer, padding_to_write_this_pass);
1020 if (write_ret != padding_to_write_this_pass) {
1021 PERROR("Failed to write padding to file of %sstream %" PRIu64,
1022 stream->is_metadata ? "metadata " : "",
1023 stream->stream_handle);
1024 ret = -1;
1025 goto end;
1026 }
1027 padding_to_write -= padding_to_write_this_pass;
1028 }
1029
1030 if (stream->is_metadata) {
014e956d
JG
1031 stream->metadata_received += packet ? packet->size : 0;
1032 stream->metadata_received += padding_len;
c35f9726
JG
1033 }
1034
8c865d87 1035 DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
c35f9726
JG
1036 stream->is_metadata ? "metadata " : "",
1037 stream->stream_handle,
8c865d87 1038 packet ? packet->size : (size_t) 0, padding_len);
c35f9726
JG
1039end:
1040 return ret;
1041}
1042
1043/*
1044 * Update index after receiving a packet for a data stream.
1045 *
1046 * Called with the stream lock held.
1047 *
1048 * Return 0 on success else a negative value.
1049 */
1050int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
1051 bool rotate_index, bool *flushed, uint64_t total_size)
1052{
1053 int ret = 0;
1054 uint64_t data_offset;
1055 struct relay_index *index;
1056
92761772 1057 assert(stream->trace_chunk);
c35f9726
JG
1058 ASSERT_LOCKED(stream->lock);
1059 /* Get data offset because we are about to update the index. */
1060 data_offset = htobe64(stream->tracefile_size_current);
1061
1062 DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
1063 stream->stream_handle, net_seq_num, stream->tracefile_size_current);
1064
1065 /*
1066 * Lookup for an existing index for that stream id/sequence
1067 * number. If it exists, the control thread has already received the
1068 * data for it, thus we need to write it to disk.
1069 */
1070 index = relay_index_get_by_id_or_create(stream, net_seq_num);
1071 if (!index) {
1072 ret = -1;
1073 goto end;
1074 }
1075
1076 if (rotate_index || !stream->index_file) {
1077 ret = create_index_file(stream, stream->trace_chunk);
1078 if (ret) {
1079 ERR("Failed to create index file for stream %" PRIu64,
1080 stream->stream_handle);
1081 /* Put self-ref for this index due to error. */
1082 relay_index_put(index);
1083 index = NULL;
1084 goto end;
1085 }
1086 }
1087
1088 if (relay_index_set_file(index, stream->index_file, data_offset)) {
1089 ret = -1;
1090 /* Put self-ref for this index due to error. */
1091 relay_index_put(index);
1092 index = NULL;
1093 goto end;
1094 }
1095
1096 ret = relay_index_try_flush(index);
1097 if (ret == 0) {
78118e3b 1098 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
c35f9726
JG
1099 tracefile_array_commit_seq(stream->tfa);
1100 stream->index_received_seqcount++;
1101 *flushed = true;
1102 } else if (ret > 0) {
1103 index->total_size = total_size;
1104 /* No flush. */
1105 ret = 0;
1106 } else {
1107 /*
1108 * ret < 0
1109 *
1110 * relay_index_try_flush is responsible for the self-reference
1111 * put of the index object on error.
1112 */
1113 ERR("relay_index_try_flush error %d", ret);
1114 ret = -1;
1115 }
1116end:
1117 return ret;
1118}
1119
1120int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size,
1121 uint64_t sequence_number, bool index_flushed)
1122{
1123 int ret = 0;
1124
1125 ASSERT_LOCKED(stream->lock);
1126
1127 stream->tracefile_size_current += packet_total_size;
1128 if (index_flushed) {
1129 stream->pos_after_last_complete_data_index =
1130 stream->tracefile_size_current;
1131 stream->prev_index_seq = sequence_number;
1132 ret = try_rotate_stream_index(stream);
1133 if (ret < 0) {
1134 goto end;
1135 }
1136 }
1137
1138 stream->prev_data_seq = sequence_number;
1139 ret = try_rotate_stream_data(stream);
d2ec9b88 1140
c35f9726
JG
1141end:
1142 return ret;
1143}
1144
1145int stream_add_index(struct relay_stream *stream,
1146 const struct lttcomm_relayd_index *index_info)
1147{
1148 int ret = 0;
1149 struct relay_index *index;
1150
1151 ASSERT_LOCKED(stream->lock);
1152
1153 /* Live beacon handling */
1154 if (index_info->packet_size == 0) {
1155 DBG("Received live beacon for stream %" PRIu64,
1156 stream->stream_handle);
1157
1158 /*
1159 * Only flag a stream inactive when it has already
1160 * received data and no indexes are in flight.
1161 */
1162 if (stream->index_received_seqcount > 0
1163 && stream->indexes_in_flight == 0) {
1164 stream->beacon_ts_end = index_info->timestamp_end;
1165 }
1166 ret = 0;
1167 goto end;
1168 } else {
1169 stream->beacon_ts_end = -1ULL;
1170 }
1171
1172 if (stream->ctf_stream_id == -1ULL) {
1173 stream->ctf_stream_id = index_info->stream_id;
1174 }
1175
1176 index = relay_index_get_by_id_or_create(stream, index_info->net_seq_num);
1177 if (!index) {
1178 ret = -1;
1179 ERR("Failed to get or create index %" PRIu64,
1180 index_info->net_seq_num);
1181 goto end;
1182 }
1183 if (relay_index_set_control_data(index, index_info,
1184 stream->trace->session->minor)) {
1185 ERR("set_index_control_data error");
1186 relay_index_put(index);
1187 ret = -1;
1188 goto end;
1189 }
1190 ret = relay_index_try_flush(index);
1191 if (ret == 0) {
78118e3b 1192 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
c35f9726
JG
1193 tracefile_array_commit_seq(stream->tfa);
1194 stream->index_received_seqcount++;
1195 stream->pos_after_last_complete_data_index += index->total_size;
1196 stream->prev_index_seq = index_info->net_seq_num;
1197
1198 ret = try_rotate_stream_index(stream);
1199 if (ret < 0) {
1200 goto end;
1201 }
1202 } else if (ret > 0) {
1203 /* no flush. */
1204 ret = 0;
1205 } else {
1206 /*
1207 * ret < 0
1208 *
1209 * relay_index_try_flush is responsible for the self-reference
1210 * put of the index object on error.
1211 */
1212 ERR("relay_index_try_flush error %d", ret);
1213 ret = -1;
1214 }
1215end:
1216 return ret;
1217}
1218
da412cde
MD
1219static void print_stream_indexes(struct relay_stream *stream)
1220{
1221 struct lttng_ht_iter iter;
1222 struct relay_index *index;
1223
1224 rcu_read_lock();
1225 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
1226 index_n.node) {
1227 DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
1228 " stream %" PRIu64 " trace %" PRIu64
1229 " session %" PRIu64,
1230 index,
1231 index->index_n.key,
1232 stream->ref.refcount,
1233 index->stream->stream_handle,
1234 index->stream->trace->id,
1235 index->stream->trace->session->id);
1236 }
1237 rcu_read_unlock();
1238}
1239
c35f9726
JG
1240int stream_reset_file(struct relay_stream *stream)
1241{
1242 ASSERT_LOCKED(stream->lock);
1243
1244 if (stream->stream_fd) {
1245 stream_fd_put(stream->stream_fd);
1246 stream->stream_fd = NULL;
1247 }
1248
1249 stream->tracefile_size_current = 0;
1250 stream->prev_data_seq = 0;
1251 stream->prev_index_seq = 0;
1252 /* Note that this does not reset the tracefile array. */
1253 stream->tracefile_current_index = 0;
1254 stream->pos_after_last_complete_data_index = 0;
1255
1256 return stream_create_data_output_file_from_trace_chunk(stream,
1257 stream->trace_chunk, true, &stream->stream_fd);
1258}
1259
7591bab1
MD
1260void print_relay_streams(void)
1261{
1262 struct lttng_ht_iter iter;
1263 struct relay_stream *stream;
1264
ce3f3ba3
JG
1265 if (!relay_streams_ht) {
1266 return;
1267 }
1268
7591bab1
MD
1269 rcu_read_lock();
1270 cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
1271 node.node) {
1272 if (!stream_get(stream)) {
1273 continue;
1274 }
1275 DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
1276 " session %" PRIu64,
1277 stream,
1278 stream->ref.refcount,
1279 stream->stream_handle,
1280 stream->trace->id,
1281 stream->trace->session->id);
da412cde 1282 print_stream_indexes(stream);
7591bab1
MD
1283 stream_put(stream);
1284 }
1285 rcu_read_unlock();
2a174661 1286}
This page took 0.095289 seconds and 4 git commands to generate.