docs: Add supported versions and fix-backport policy
[lttng-tools.git] / src / bin / lttng-relayd / stream.cpp
1 /*
2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * Copyright (C) 2019 Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 *
7 * SPDX-License-Identifier: GPL-2.0-only
8 *
9 */
10
11 #define _LGPL_SOURCE
12 #include <algorithm>
13 #include <common/common.hpp>
14 #include <common/defaults.hpp>
15 #include <common/fs-handle.hpp>
16 #include <common/sessiond-comm/relayd.hpp>
17 #include <common/utils.hpp>
18 #include <sys/stat.h>
19 #include <urcu/rculist.h>
20
21 #include "lttng-relayd.hpp"
22 #include "index.hpp"
23 #include "stream.hpp"
24 #include "viewer-stream.hpp"
25
26 #include <sys/types.h>
27 #include <fcntl.h>
28
29 #define FILE_IO_STACK_BUFFER_SIZE 65536
30
31 /* Should be called with RCU read-side lock held. */
32 bool stream_get(struct relay_stream *stream)
33 {
34 ASSERT_RCU_READ_LOCKED();
35
36 return urcu_ref_get_unless_zero(&stream->ref);
37 }
38
39 /*
40 * Get stream from stream id from the streams hash table. Return stream
41 * if found else NULL. A stream reference is taken when a stream is
42 * returned. stream_put() must be called on that stream.
43 */
44 struct relay_stream *stream_get_by_id(uint64_t stream_id)
45 {
46 struct lttng_ht_node_u64 *node;
47 struct lttng_ht_iter iter;
48 struct relay_stream *stream = NULL;
49
50 rcu_read_lock();
51 lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
52 node = lttng_ht_iter_get_node_u64(&iter);
53 if (!node) {
54 DBG("Relay stream %" PRIu64 " not found", stream_id);
55 goto end;
56 }
57 stream = caa_container_of(node, struct relay_stream, node);
58 if (!stream_get(stream)) {
59 stream = NULL;
60 }
61 end:
62 rcu_read_unlock();
63 return stream;
64 }
65
66 static void stream_complete_rotation(struct relay_stream *stream)
67 {
68 DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
69 if (stream->ongoing_rotation.value.next_trace_chunk) {
70 tracefile_array_reset(stream->tfa);
71 tracefile_array_commit_seq(stream->tfa,
72 stream->index_received_seqcount);
73 }
74 lttng_trace_chunk_put(stream->trace_chunk);
75 stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
76 stream->ongoing_rotation = LTTNG_OPTIONAL_INIT_UNSET;
77 stream->completed_rotation_count++;
78 }
79
80 static int stream_create_data_output_file_from_trace_chunk(
81 struct relay_stream *stream,
82 struct lttng_trace_chunk *trace_chunk,
83 bool force_unlink,
84 struct fs_handle **out_file)
85 {
86 int ret;
87 char stream_path[LTTNG_PATH_MAX];
88 enum lttng_trace_chunk_status status;
89 const int flags = O_RDWR | O_CREAT | O_TRUNC;
90 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
91
92 ASSERT_LOCKED(stream->lock);
93
94 ret = utils_stream_file_path(stream->path_name, stream->channel_name,
95 stream->tracefile_size, stream->tracefile_current_index,
96 NULL, stream_path, sizeof(stream_path));
97 if (ret < 0) {
98 goto end;
99 }
100
101 if (stream->tracefile_wrapped_around || force_unlink) {
102 /*
103 * The on-disk ring-buffer has wrapped around.
104 * Newly created stream files will replace existing files. Since
105 * live clients may be consuming existing files, the file about
106 * to be replaced is unlinked in order to not overwrite its
107 * content.
108 */
109 status = (lttng_trace_chunk_status) lttng_trace_chunk_unlink_file(trace_chunk,
110 stream_path);
111 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
112 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
113 stream_path);
114 /*
115 * Don't abort if the file doesn't exist, it is
116 * unexpected, but should not be a fatal error.
117 */
118 if (errno != ENOENT) {
119 ret = -1;
120 goto end;
121 }
122 }
123 }
124
125 status = lttng_trace_chunk_open_fs_handle(trace_chunk, stream_path,
126 flags, mode, out_file, false);
127 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
128 ERR("Failed to open stream file \"%s\"", stream->channel_name);
129 ret = -1;
130 goto end;
131 }
132 end:
133 return ret;
134 }
135
136 static int stream_rotate_data_file(struct relay_stream *stream)
137 {
138 int ret = 0;
139
140 DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
141 stream->stream_handle, stream->tracefile_size_current);
142
143 if (stream->file) {
144 fs_handle_close(stream->file);
145 stream->file = NULL;
146 }
147
148 stream->tracefile_wrapped_around = false;
149 stream->tracefile_current_index = 0;
150
151 if (stream->ongoing_rotation.value.next_trace_chunk) {
152 enum lttng_trace_chunk_status chunk_status;
153
154 chunk_status = lttng_trace_chunk_create_subdirectory(
155 stream->ongoing_rotation.value.next_trace_chunk,
156 stream->path_name);
157 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
158 ret = -1;
159 goto end;
160 }
161
162 /* Rotate the data file. */
163 ret = stream_create_data_output_file_from_trace_chunk(stream,
164 stream->ongoing_rotation.value.next_trace_chunk,
165 false, &stream->file);
166 if (ret < 0) {
167 ERR("Failed to rotate stream data file");
168 goto end;
169 }
170 }
171 DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
172 __func__, stream->stream_handle, stream->tracefile_size_current);
173 stream->tracefile_size_current = 0;
174 stream->pos_after_last_complete_data_index = 0;
175 stream->ongoing_rotation.value.data_rotated = true;
176
177 if (stream->ongoing_rotation.value.index_rotated) {
178 /* Rotation completed; reset its state. */
179 stream_complete_rotation(stream);
180 }
181 end:
182 return ret;
183 }
184
185 /*
186 * If too much data has been written in a tracefile before we received the
187 * rotation command, we have to move the excess data to the new tracefile and
188 * perform the rotation. This can happen because the control and data
189 * connections are separate, the indexes as well as the commands arrive from
190 * the control connection and we have no control over the order so we could be
191 * in a situation where too much data has been received on the data connection
192 * before the rotation command on the control connection arrives.
193 */
194 static int rotate_truncate_stream(struct relay_stream *stream)
195 {
196 int ret;
197 off_t lseek_ret, previous_stream_copy_origin;
198 uint64_t copy_bytes_left, misplaced_data_size;
199 bool acquired_reference;
200 struct fs_handle *previous_stream_file = NULL;
201 struct lttng_trace_chunk *previous_chunk = NULL;
202
203 if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
204 ERR("Protocol error encoutered in %s(): stream rotation "
205 "sequence number is before the current sequence number "
206 "and the next trace chunk is unset. Honoring this "
207 "rotation command would result in data loss",
208 __FUNCTION__);
209 ret = -1;
210 goto end;
211 }
212
213 ASSERT_LOCKED(stream->lock);
214 /*
215 * Acquire a reference to the current trace chunk to ensure
216 * it is not reclaimed when `stream_rotate_data_file` is called.
217 * Failing to do so would violate the contract of the trace
218 * chunk API as an active file descriptor would outlive the
219 * trace chunk.
220 */
221 acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
222 LTTNG_ASSERT(acquired_reference);
223 previous_chunk = stream->trace_chunk;
224
225 /*
226 * Steal the stream's reference to its stream_fd. A new
227 * stream_fd will be created when the rotation completes and
228 * the orinal stream_fd will be used to copy the "extra" data
229 * to the new file.
230 */
231 LTTNG_ASSERT(stream->file);
232 previous_stream_file = stream->file;
233 stream->file = NULL;
234
235 LTTNG_ASSERT(!stream->is_metadata);
236 LTTNG_ASSERT(stream->tracefile_size_current >
237 stream->pos_after_last_complete_data_index);
238 misplaced_data_size = stream->tracefile_size_current -
239 stream->pos_after_last_complete_data_index;
240 copy_bytes_left = misplaced_data_size;
241 previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
242
243 ret = stream_rotate_data_file(stream);
244 if (ret) {
245 goto end;
246 }
247
248 LTTNG_ASSERT(stream->file);
249 /*
250 * Seek the current tracefile to the position at which the rotation
251 * should have occurred.
252 */
253 lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET);
254 if (lseek_ret < 0) {
255 PERROR("Failed to seek to offset %" PRIu64
256 " while copying extra data received before a stream rotation",
257 (uint64_t) previous_stream_copy_origin);
258 ret = -1;
259 goto end;
260 }
261
262 /* Move data from the old file to the new file. */
263 while (copy_bytes_left) {
264 ssize_t io_ret;
265 char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
266 const off_t copy_size_this_pass = std::min<uint64_t>(copy_bytes_left, sizeof(copy_buffer));
267
268 io_ret = fs_handle_read(previous_stream_file, copy_buffer,
269 copy_size_this_pass);
270 if (io_ret < (ssize_t) copy_size_this_pass) {
271 if (io_ret == -1) {
272 PERROR("Failed to read %" PRIu64
273 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
274 copy_size_this_pass,
275 __FUNCTION__, io_ret,
276 stream->stream_handle);
277 } else {
278 ERR("Failed to read %" PRIu64
279 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
280 copy_size_this_pass,
281 __FUNCTION__, io_ret,
282 stream->stream_handle);
283 }
284 ret = -1;
285 goto end;
286 }
287
288 io_ret = fs_handle_write(
289 stream->file, copy_buffer, copy_size_this_pass);
290 if (io_ret < (ssize_t) copy_size_this_pass) {
291 if (io_ret == -1) {
292 PERROR("Failed to write %" PRIu64
293 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
294 copy_size_this_pass,
295 __FUNCTION__, io_ret,
296 stream->stream_handle);
297 } else {
298 ERR("Failed to write %" PRIu64
299 " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
300 copy_size_this_pass,
301 __FUNCTION__, io_ret,
302 stream->stream_handle);
303 }
304 ret = -1;
305 goto end;
306 }
307 copy_bytes_left -= copy_size_this_pass;
308 }
309
310 /* Truncate the file to get rid of the excess data. */
311 ret = fs_handle_truncate(
312 previous_stream_file, previous_stream_copy_origin);
313 if (ret) {
314 PERROR("Failed to truncate current stream file to offset %" PRIu64,
315 previous_stream_copy_origin);
316 goto end;
317 }
318
319 /*
320 * Update the offset and FD of all the eventual indexes created by the
321 * data connection before the rotation command arrived.
322 */
323 ret = relay_index_switch_all_files(stream);
324 if (ret < 0) {
325 ERR("Failed to rotate index file");
326 goto end;
327 }
328
329 stream->tracefile_size_current = misplaced_data_size;
330 /* Index and data contents are back in sync. */
331 stream->pos_after_last_complete_data_index = 0;
332 ret = 0;
333 end:
334 lttng_trace_chunk_put(previous_chunk);
335 return ret;
336 }
337
338 /*
339 * Check if a stream's data file (as opposed to index) should be rotated
340 * (for session rotation).
341 * Must be called with the stream lock held.
342 *
343 * Return 0 on success, a negative value on error.
344 */
345 static int try_rotate_stream_data(struct relay_stream *stream)
346 {
347 int ret = 0;
348
349 if (caa_likely(!stream->ongoing_rotation.is_set)) {
350 /* No rotation expected. */
351 goto end;
352 }
353
354 if (stream->ongoing_rotation.value.data_rotated) {
355 /* Rotation of the data file has already occurred. */
356 goto end;
357 }
358
359 DBG("%s: Stream %" PRIu64
360 " (rotate_at_index_packet_seq_num = %" PRIu64
361 ", rotate_at_prev_data_net_seq = %" PRIu64
362 ", prev_data_seq = %" PRIu64 ")",
363 __func__, stream->stream_handle,
364 stream->ongoing_rotation.value.packet_seq_num,
365 stream->ongoing_rotation.value.prev_data_net_seq,
366 stream->prev_data_seq);
367
368 if (stream->prev_data_seq == -1ULL ||
369 stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
370 stream->prev_data_seq <
371 stream->ongoing_rotation.value.prev_data_net_seq) {
372 /*
373 * The next packet that will be written is not part of the next
374 * chunk yet.
375 */
376 DBG("Stream %" PRIu64 " data not yet ready for rotation "
377 "(rotate_at_index_packet_seq_num = %" PRIu64
378 ", rotate_at_prev_data_net_seq = %" PRIu64
379 ", prev_data_seq = %" PRIu64 ")",
380 stream->stream_handle,
381 stream->ongoing_rotation.value.packet_seq_num,
382 stream->ongoing_rotation.value.prev_data_net_seq,
383 stream->prev_data_seq);
384 goto end;
385 } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
386 /*
387 * prev_data_seq is checked here since indexes and rotation
388 * commands are serialized with respect to each other.
389 */
390 DBG("Rotation after too much data has been written in tracefile "
391 "for stream %" PRIu64 ", need to truncate before "
392 "rotating", stream->stream_handle);
393 ret = rotate_truncate_stream(stream);
394 if (ret) {
395 ERR("Failed to truncate stream");
396 goto end;
397 }
398 } else {
399 ret = stream_rotate_data_file(stream);
400 }
401
402 end:
403 return ret;
404 }
405
406 /*
407 * Close the current index file if it is open, and create a new one.
408 *
409 * Return 0 on success, -1 on error.
410 */
411 static int create_index_file(struct relay_stream *stream,
412 struct lttng_trace_chunk *chunk)
413 {
414 int ret;
415 uint32_t major, minor;
416 char *index_subpath = NULL;
417 enum lttng_trace_chunk_status status;
418
419 ASSERT_LOCKED(stream->lock);
420
421 /* Put ref on previous index_file. */
422 if (stream->index_file) {
423 lttng_index_file_put(stream->index_file);
424 stream->index_file = NULL;
425 }
426 major = stream->trace->session->major;
427 minor = stream->trace->session->minor;
428
429 if (!chunk) {
430 ret = 0;
431 goto end;
432 }
433 ret = asprintf(&index_subpath, "%s/%s", stream->path_name,
434 DEFAULT_INDEX_DIR);
435 if (ret < 0) {
436 goto end;
437 }
438
439 status = lttng_trace_chunk_create_subdirectory(chunk,
440 index_subpath);
441 free(index_subpath);
442 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
443 ret = -1;
444 goto end;
445 }
446 status = lttng_index_file_create_from_trace_chunk(
447 chunk, stream->path_name,
448 stream->channel_name, stream->tracefile_size,
449 stream->tracefile_current_index,
450 lttng_to_index_major(major, minor),
451 lttng_to_index_minor(major, minor), true,
452 &stream->index_file);
453 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
454 ret = -1;
455 goto end;
456 }
457
458 ret = 0;
459
460 end:
461 return ret;
462 }
463
464 /*
465 * Check if a stream's index file should be rotated (for session rotation).
466 * Must be called with the stream lock held.
467 *
468 * Return 0 on success, a negative value on error.
469 */
470 static int try_rotate_stream_index(struct relay_stream *stream)
471 {
472 int ret = 0;
473
474 if (!stream->ongoing_rotation.is_set) {
475 /* No rotation expected. */
476 goto end;
477 }
478
479 if (stream->ongoing_rotation.value.index_rotated) {
480 /* Rotation of the index has already occurred. */
481 goto end;
482 }
483
484 DBG("%s: Stream %" PRIu64
485 " (rotate_at_packet_seq_num = %" PRIu64
486 ", received_packet_seq_num = "
487 "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
488 __func__, stream->stream_handle,
489 stream->ongoing_rotation.value.packet_seq_num,
490 stream->received_packet_seq_num.value,
491 stream->received_packet_seq_num.is_set);
492
493 if (!stream->received_packet_seq_num.is_set ||
494 LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
495 stream->ongoing_rotation.value.packet_seq_num) {
496 DBG("Stream %" PRIu64 " index not yet ready for rotation "
497 "(rotate_at_packet_seq_num = %" PRIu64
498 ", received_packet_seq_num = "
499 "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
500 stream->stream_handle,
501 stream->ongoing_rotation.value.packet_seq_num,
502 stream->received_packet_seq_num.value,
503 stream->received_packet_seq_num.is_set);
504 goto end;
505 } else {
506 /*
507 * The next index belongs to the new trace chunk; rotate.
508 * In overwrite mode, the packet seq num may jump over the
509 * rotation position.
510 */
511 LTTNG_ASSERT(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
512 stream->ongoing_rotation.value.packet_seq_num);
513 DBG("Rotating stream %" PRIu64 " index file",
514 stream->stream_handle);
515 if (stream->index_file) {
516 lttng_index_file_put(stream->index_file);
517 stream->index_file = NULL;
518 }
519 stream->ongoing_rotation.value.index_rotated = true;
520
521 /*
522 * Set the rotation pivot position for the data, now that we have the
523 * net_seq_num matching the packet_seq_num index pivot position.
524 */
525 stream->ongoing_rotation.value.prev_data_net_seq =
526 stream->prev_index_seq;
527 if (stream->ongoing_rotation.value.data_rotated &&
528 stream->ongoing_rotation.value.index_rotated) {
529 /* Rotation completed; reset its state. */
530 DBG("Rotation completed for stream %" PRIu64,
531 stream->stream_handle);
532 stream_complete_rotation(stream);
533 }
534 }
535
536 end:
537 return ret;
538 }
539
540 static int stream_set_trace_chunk(struct relay_stream *stream,
541 struct lttng_trace_chunk *chunk)
542 {
543 int ret = 0;
544 enum lttng_trace_chunk_status status;
545 bool acquired_reference;
546
547 status = lttng_trace_chunk_create_subdirectory(chunk,
548 stream->path_name);
549 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
550 ret = -1;
551 goto end;
552 }
553
554 lttng_trace_chunk_put(stream->trace_chunk);
555 acquired_reference = lttng_trace_chunk_get(chunk);
556 LTTNG_ASSERT(acquired_reference);
557 stream->trace_chunk = chunk;
558
559 if (stream->file) {
560 fs_handle_close(stream->file);
561 stream->file = NULL;
562 }
563 ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
564 false, &stream->file);
565 end:
566 return ret;
567 }
568
569 /*
570 * We keep ownership of path_name and channel_name.
571 */
572 struct relay_stream *stream_create(struct ctf_trace *trace,
573 uint64_t stream_handle, char *path_name,
574 char *channel_name, uint64_t tracefile_size,
575 uint64_t tracefile_count)
576 {
577 int ret;
578 struct relay_stream *stream = NULL;
579 struct relay_session *session = trace->session;
580 bool acquired_reference = false;
581 struct lttng_trace_chunk *current_trace_chunk;
582
583 stream = zmalloc<relay_stream>();
584 if (stream == NULL) {
585 PERROR("relay stream zmalloc");
586 goto error_no_alloc;
587 }
588
589 stream->stream_handle = stream_handle;
590 stream->prev_data_seq = -1ULL;
591 stream->prev_index_seq = -1ULL;
592 stream->last_net_seq_num = -1ULL;
593 stream->ctf_stream_id = -1ULL;
594 stream->tracefile_size = tracefile_size;
595 stream->tracefile_count = tracefile_count;
596 stream->path_name = path_name;
597 stream->channel_name = channel_name;
598 stream->beacon_ts_end = -1ULL;
599 lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
600 pthread_mutex_init(&stream->lock, NULL);
601 urcu_ref_init(&stream->ref);
602 ctf_trace_get(trace);
603 stream->trace = trace;
604
605 pthread_mutex_lock(&trace->session->lock);
606 current_trace_chunk = trace->session->current_trace_chunk;
607 if (current_trace_chunk) {
608 acquired_reference = lttng_trace_chunk_get(current_trace_chunk);
609 }
610 pthread_mutex_unlock(&trace->session->lock);
611 if (!acquired_reference) {
612 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
613 channel_name);
614 ret = -1;
615 goto end;
616 }
617
618 stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
619 if (!stream->indexes_ht) {
620 ERR("Cannot created indexes_ht");
621 ret = -1;
622 goto end;
623 }
624
625 pthread_mutex_lock(&stream->lock);
626 ret = stream_set_trace_chunk(stream, current_trace_chunk);
627 pthread_mutex_unlock(&stream->lock);
628 if (ret) {
629 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
630 trace->session->session_name,
631 stream->channel_name);
632 ret = -1;
633 goto end;
634 }
635 stream->tfa = tracefile_array_create(stream->tracefile_count);
636 if (!stream->tfa) {
637 ret = -1;
638 goto end;
639 }
640
641 stream->is_metadata = !strcmp(stream->channel_name,
642 DEFAULT_METADATA_NAME);
643 stream->in_recv_list = true;
644
645 /*
646 * Add the stream in the recv list of the session. Once the end stream
647 * message is received, all session streams are published.
648 */
649 pthread_mutex_lock(&session->recv_list_lock);
650 cds_list_add_rcu(&stream->recv_node, &session->recv_list);
651 session->stream_count++;
652 pthread_mutex_unlock(&session->recv_list_lock);
653
654 /*
655 * Both in the ctf_trace object and the global stream ht since the data
656 * side of the relayd does not have the concept of session.
657 */
658 lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
659 stream->in_stream_ht = true;
660
661 DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
662 stream->stream_handle);
663 ret = 0;
664
665 end:
666 if (ret) {
667 if (stream->file) {
668 fs_handle_close(stream->file);
669 stream->file = NULL;
670 }
671 stream_put(stream);
672 stream = NULL;
673 }
674 if (acquired_reference) {
675 lttng_trace_chunk_put(current_trace_chunk);
676 }
677 return stream;
678
679 error_no_alloc:
680 /*
681 * path_name and channel_name need to be freed explicitly here
682 * because we cannot rely on stream_put().
683 */
684 free(path_name);
685 free(channel_name);
686 return NULL;
687 }
688
689 /*
690 * Called with the session lock held.
691 */
692 void stream_publish(struct relay_stream *stream)
693 {
694 struct relay_session *session;
695
696 pthread_mutex_lock(&stream->lock);
697 if (stream->published) {
698 goto unlock;
699 }
700
701 session = stream->trace->session;
702
703 pthread_mutex_lock(&session->recv_list_lock);
704 if (stream->in_recv_list) {
705 cds_list_del_rcu(&stream->recv_node);
706 stream->in_recv_list = false;
707 }
708 pthread_mutex_unlock(&session->recv_list_lock);
709
710 pthread_mutex_lock(&stream->trace->stream_list_lock);
711 cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
712 pthread_mutex_unlock(&stream->trace->stream_list_lock);
713
714 stream->published = true;
715 unlock:
716 pthread_mutex_unlock(&stream->lock);
717 }
718
719 /*
720 * Stream must be protected by holding the stream lock or by virtue of being
721 * called from stream_destroy.
722 */
723 static void stream_unpublish(struct relay_stream *stream)
724 {
725 if (stream->in_stream_ht) {
726 struct lttng_ht_iter iter;
727 int ret;
728
729 iter.iter.node = &stream->node.node;
730 ret = lttng_ht_del(relay_streams_ht, &iter);
731 LTTNG_ASSERT(!ret);
732 stream->in_stream_ht = false;
733 }
734 if (stream->published) {
735 pthread_mutex_lock(&stream->trace->stream_list_lock);
736 cds_list_del_rcu(&stream->stream_node);
737 pthread_mutex_unlock(&stream->trace->stream_list_lock);
738 stream->published = false;
739 }
740 }
741
742 static void stream_destroy(struct relay_stream *stream)
743 {
744 if (stream->indexes_ht) {
745 /*
746 * Calling lttng_ht_destroy in call_rcu worker thread so
747 * we don't hold the RCU read-side lock while calling
748 * it.
749 */
750 lttng_ht_destroy(stream->indexes_ht);
751 }
752 if (stream->tfa) {
753 tracefile_array_destroy(stream->tfa);
754 }
755 free(stream->path_name);
756 free(stream->channel_name);
757 free(stream);
758 }
759
760 static void stream_destroy_rcu(struct rcu_head *rcu_head)
761 {
762 struct relay_stream *stream =
763 caa_container_of(rcu_head, struct relay_stream, rcu_node);
764
765 stream_destroy(stream);
766 }
767
768 /*
769 * No need to take stream->lock since this is only called on the final
770 * stream_put which ensures that a single thread may act on the stream.
771 */
772 static void stream_release(struct urcu_ref *ref)
773 {
774 struct relay_stream *stream =
775 caa_container_of(ref, struct relay_stream, ref);
776 struct relay_session *session;
777
778 session = stream->trace->session;
779
780 DBG("Releasing stream id %" PRIu64, stream->stream_handle);
781
782 pthread_mutex_lock(&session->recv_list_lock);
783 session->stream_count--;
784 if (stream->in_recv_list) {
785 cds_list_del_rcu(&stream->recv_node);
786 stream->in_recv_list = false;
787 }
788 pthread_mutex_unlock(&session->recv_list_lock);
789
790 stream_unpublish(stream);
791
792 if (stream->file) {
793 fs_handle_close(stream->file);
794 stream->file = NULL;
795 }
796 if (stream->index_file) {
797 lttng_index_file_put(stream->index_file);
798 stream->index_file = NULL;
799 }
800 if (stream->trace) {
801 ctf_trace_put(stream->trace);
802 stream->trace = NULL;
803 }
804 stream_complete_rotation(stream);
805 lttng_trace_chunk_put(stream->trace_chunk);
806 stream->trace_chunk = NULL;
807
808 call_rcu(&stream->rcu_node, stream_destroy_rcu);
809 }
810
811 void stream_put(struct relay_stream *stream)
812 {
813 rcu_read_lock();
814 LTTNG_ASSERT(stream->ref.refcount != 0);
815 /*
816 * Wait until we have processed all the stream packets before
817 * actually putting our last stream reference.
818 */
819 urcu_ref_put(&stream->ref, stream_release);
820 rcu_read_unlock();
821 }
822
823 int stream_set_pending_rotation(struct relay_stream *stream,
824 struct lttng_trace_chunk *next_trace_chunk,
825 uint64_t rotation_sequence_number)
826 {
827 int ret = 0;
828 const struct relay_stream_rotation rotation = {
829 .data_rotated = false,
830 .index_rotated = false,
831 .packet_seq_num = rotation_sequence_number,
832 .prev_data_net_seq = -1ULL,
833 .next_trace_chunk = next_trace_chunk,
834 };
835
836 if (stream->ongoing_rotation.is_set) {
837 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
838 ret = -1;
839 goto end;
840 }
841
842 if (next_trace_chunk) {
843 const bool reference_acquired =
844 lttng_trace_chunk_get(next_trace_chunk);
845
846 LTTNG_ASSERT(reference_acquired);
847 }
848 LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
849
850 DBG("Setting pending rotation: stream_id = %" PRIu64
851 ", rotate_at_packet_seq_num = %" PRIu64,
852 stream->stream_handle, rotation_sequence_number);
853 if (stream->is_metadata) {
854 /*
855 * A metadata stream has no index; consider it already rotated.
856 */
857 stream->ongoing_rotation.value.index_rotated = true;
858 if (next_trace_chunk) {
859 /*
860 * The metadata will be received again in the new chunk.
861 */
862 stream->metadata_received = 0;
863 }
864 ret = stream_rotate_data_file(stream);
865 } else {
866 ret = try_rotate_stream_index(stream);
867 if (ret < 0) {
868 goto end;
869 }
870
871 ret = try_rotate_stream_data(stream);
872 if (ret < 0) {
873 goto end;
874 }
875 }
876 end:
877 return ret;
878 }
879
880 void try_stream_close(struct relay_stream *stream)
881 {
882 bool session_aborted;
883 struct relay_session *session = stream->trace->session;
884
885 DBG("Trying to close stream %" PRIu64, stream->stream_handle);
886
887 pthread_mutex_lock(&session->lock);
888 session_aborted = session->aborted;
889 pthread_mutex_unlock(&session->lock);
890
891 pthread_mutex_lock(&stream->lock);
892 /*
893 * Can be called concurently by connection close and reception of last
894 * pending data.
895 */
896 if (stream->closed) {
897 pthread_mutex_unlock(&stream->lock);
898 DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
899 return;
900 }
901
902 stream->close_requested = true;
903
904 if (stream->last_net_seq_num == -1ULL) {
905 /*
906 * Handle connection close without explicit stream close
907 * command.
908 *
909 * We can be clever about indexes partially received in
910 * cases where we received the data socket part, but not
911 * the control socket part: since we're currently closing
912 * the stream on behalf of the control socket, we *know*
913 * there won't be any more control information for this
914 * socket. Therefore, we can destroy all indexes for
915 * which we have received only the file descriptor (from
916 * data socket). This takes care of consumerd crashes
917 * between sending the data and control information for
918 * a packet. Since those are sent in that order, we take
919 * care of consumerd crashes.
920 */
921 DBG("relay_index_close_partial_fd");
922 relay_index_close_partial_fd(stream);
923 /*
924 * Use the highest net_seq_num we currently have pending
925 * As end of stream indicator. Leave last_net_seq_num
926 * at -1ULL if we cannot find any index.
927 */
928 stream->last_net_seq_num = relay_index_find_last(stream);
929 DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
930 /* Fall-through into the next check. */
931 }
932
933 if (stream->last_net_seq_num != -1ULL &&
934 ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0
935 && !session_aborted) {
936 /*
937 * Don't close since we still have data pending. This
938 * handles cases where an explicit close command has
939 * been received for this stream, and cases where the
940 * connection has been closed, and we are awaiting for
941 * index information from the data socket. It is
942 * therefore expected that all the index fd information
943 * we need has already been received on the control
944 * socket. Matching index information from data socket
945 * should be Expected Soon(TM).
946 *
947 * TODO: We should implement a timer to garbage collect
948 * streams after a timeout to be resilient against a
949 * consumerd implementation that would not match this
950 * expected behavior.
951 */
952 pthread_mutex_unlock(&stream->lock);
953 DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
954 return;
955 }
956 /*
957 * We received all the indexes we can expect.
958 */
959 stream_unpublish(stream);
960 stream->closed = true;
961 /* Relay indexes are only used by the "consumer/sessiond" end. */
962 relay_index_close_all(stream);
963
964 /*
965 * If we are closed by an application exiting (per-pid buffers),
966 * we need to put our reference on the stream trace chunk right
967 * away, because otherwise still holding the reference on the
968 * trace chunk could allow a viewer stream (which holds a reference
969 * to the stream) to postpone destroy waiting for the chunk to cease
970 * to exist endlessly until the viewer is detached.
971 */
972
973 /* Put stream fd before put chunk. */
974 if (stream->file) {
975 fs_handle_close(stream->file);
976 stream->file = NULL;
977 }
978 if (stream->index_file) {
979 lttng_index_file_put(stream->index_file);
980 stream->index_file = NULL;
981 }
982 lttng_trace_chunk_put(stream->trace_chunk);
983 stream->trace_chunk = NULL;
984 pthread_mutex_unlock(&stream->lock);
985 DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
986 stream_put(stream);
987 }
988
989 int stream_init_packet(struct relay_stream *stream, size_t packet_size,
990 bool *file_rotated)
991 {
992 int ret = 0;
993
994 ASSERT_LOCKED(stream->lock);
995
996 if (!stream->file || !stream->trace_chunk) {
997 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
998 stream->stream_handle, stream->channel_name);
999 ret = -1;
1000 goto end;
1001 }
1002
1003 if (caa_likely(stream->tracefile_size == 0)) {
1004 /* No size limit set; nothing to check. */
1005 goto end;
1006 }
1007
1008 /*
1009 * Check if writing the new packet would exceed the maximal file size.
1010 */
1011 if (caa_unlikely((stream->tracefile_size_current + packet_size) >
1012 stream->tracefile_size)) {
1013 const uint64_t new_file_index =
1014 (stream->tracefile_current_index + 1) %
1015 stream->tracefile_count;
1016
1017 if (new_file_index < stream->tracefile_current_index) {
1018 stream->tracefile_wrapped_around = true;
1019 }
1020 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
1021 ", current_file_size = %" PRIu64
1022 ", packet_size = %zu, current_file_index = %" PRIu64
1023 " new_file_index = %" PRIu64,
1024 stream->stream_handle,
1025 stream->tracefile_size_current, packet_size,
1026 stream->tracefile_current_index, new_file_index);
1027 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
1028 stream->tracefile_current_index = new_file_index;
1029
1030 if (stream->file) {
1031 fs_handle_close(stream->file);
1032 stream->file = NULL;
1033 }
1034 ret = stream_create_data_output_file_from_trace_chunk(stream,
1035 stream->trace_chunk, false, &stream->file);
1036 if (ret) {
1037 ERR("Failed to perform trace file rotation of stream %" PRIu64,
1038 stream->stream_handle);
1039 goto end;
1040 }
1041
1042 /*
1043 * Reset current size because we just performed a stream
1044 * rotation.
1045 */
1046 DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
1047 __func__, stream->stream_handle, stream->tracefile_size_current);
1048 stream->tracefile_size_current = 0;
1049 *file_rotated = true;
1050 } else {
1051 *file_rotated = false;
1052 }
1053 end:
1054 return ret;
1055 }
1056
1057 /* Note that the packet is not necessarily complete. */
1058 int stream_write(struct relay_stream *stream,
1059 const struct lttng_buffer_view *packet, size_t padding_len)
1060 {
1061 int ret = 0;
1062 ssize_t write_ret;
1063 size_t padding_to_write = padding_len;
1064 char padding_buffer[FILE_IO_STACK_BUFFER_SIZE];
1065
1066 ASSERT_LOCKED(stream->lock);
1067 memset(padding_buffer, 0,
1068 std::min(sizeof(padding_buffer), padding_to_write));
1069
1070 if (!stream->file || !stream->trace_chunk) {
1071 ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
1072 stream->stream_handle, stream->channel_name);
1073 ret = -1;
1074 goto end;
1075 }
1076 if (packet) {
1077 write_ret = fs_handle_write(
1078 stream->file, packet->data, packet->size);
1079 if (write_ret != packet->size) {
1080 PERROR("Failed to write to stream file of %sstream %" PRIu64,
1081 stream->is_metadata ? "metadata " : "",
1082 stream->stream_handle);
1083 ret = -1;
1084 goto end;
1085 }
1086 }
1087
1088 while (padding_to_write > 0) {
1089 const size_t padding_to_write_this_pass =
1090 std::min(padding_to_write, sizeof(padding_buffer));
1091
1092 write_ret = fs_handle_write(stream->file, padding_buffer,
1093 padding_to_write_this_pass);
1094 if (write_ret != padding_to_write_this_pass) {
1095 PERROR("Failed to write padding to file of %sstream %" PRIu64,
1096 stream->is_metadata ? "metadata " : "",
1097 stream->stream_handle);
1098 ret = -1;
1099 goto end;
1100 }
1101 padding_to_write -= padding_to_write_this_pass;
1102 }
1103
1104 if (stream->is_metadata) {
1105 size_t recv_len;
1106
1107 recv_len = packet ? packet->size : 0;
1108 recv_len += padding_len;
1109 stream->metadata_received += recv_len;
1110 if (recv_len) {
1111 stream->no_new_metadata_notified = false;
1112 }
1113 }
1114
1115 DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
1116 stream->is_metadata ? "metadata " : "",
1117 stream->stream_handle,
1118 packet ? packet->size : (size_t) 0, padding_len);
1119 end:
1120 return ret;
1121 }
1122
1123 /*
1124 * Update index after receiving a packet for a data stream.
1125 *
1126 * Called with the stream lock held.
1127 *
1128 * Return 0 on success else a negative value.
1129 */
1130 int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
1131 bool rotate_index, bool *flushed, uint64_t total_size)
1132 {
1133 int ret = 0;
1134 uint64_t data_offset;
1135 struct relay_index *index;
1136
1137 LTTNG_ASSERT(stream->trace_chunk);
1138 ASSERT_LOCKED(stream->lock);
1139 /* Get data offset because we are about to update the index. */
1140 data_offset = htobe64(stream->tracefile_size_current);
1141
1142 DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
1143 stream->stream_handle, net_seq_num, stream->tracefile_size_current);
1144
1145 /*
1146 * Lookup for an existing index for that stream id/sequence
1147 * number. If it exists, the control thread has already received the
1148 * data for it, thus we need to write it to disk.
1149 */
1150 index = relay_index_get_by_id_or_create(stream, net_seq_num);
1151 if (!index) {
1152 ret = -1;
1153 goto end;
1154 }
1155
1156 if (rotate_index || !stream->index_file) {
1157 ret = create_index_file(stream, stream->trace_chunk);
1158 if (ret) {
1159 ERR("Failed to create index file for stream %" PRIu64,
1160 stream->stream_handle);
1161 /* Put self-ref for this index due to error. */
1162 relay_index_put(index);
1163 index = NULL;
1164 goto end;
1165 }
1166 }
1167
1168 if (relay_index_set_file(index, stream->index_file, data_offset)) {
1169 ret = -1;
1170 /* Put self-ref for this index due to error. */
1171 relay_index_put(index);
1172 index = NULL;
1173 goto end;
1174 }
1175
1176 ret = relay_index_try_flush(index);
1177 if (ret == 0) {
1178 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
1179 tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
1180 stream->index_received_seqcount++;
1181 LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
1182 be64toh(index->index_data.packet_seq_num));
1183 *flushed = true;
1184 } else if (ret > 0) {
1185 index->total_size = total_size;
1186 /* No flush. */
1187 ret = 0;
1188 } else {
1189 /*
1190 * ret < 0
1191 *
1192 * relay_index_try_flush is responsible for the self-reference
1193 * put of the index object on error.
1194 */
1195 ERR("relay_index_try_flush error %d", ret);
1196 ret = -1;
1197 }
1198 end:
1199 return ret;
1200 }
1201
1202 int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size,
1203 uint64_t sequence_number, bool index_flushed)
1204 {
1205 int ret = 0;
1206
1207 ASSERT_LOCKED(stream->lock);
1208
1209 stream->tracefile_size_current += packet_total_size;
1210 if (index_flushed) {
1211 stream->pos_after_last_complete_data_index =
1212 stream->tracefile_size_current;
1213 stream->prev_index_seq = sequence_number;
1214 ret = try_rotate_stream_index(stream);
1215 if (ret < 0) {
1216 goto end;
1217 }
1218 }
1219
1220 stream->prev_data_seq = sequence_number;
1221 ret = try_rotate_stream_data(stream);
1222
1223 end:
1224 return ret;
1225 }
1226
1227 int stream_add_index(struct relay_stream *stream,
1228 const struct lttcomm_relayd_index *index_info)
1229 {
1230 int ret = 0;
1231 struct relay_index *index;
1232
1233 ASSERT_LOCKED(stream->lock);
1234
1235 DBG("stream_add_index for stream %" PRIu64, stream->stream_handle);
1236
1237 /* Live beacon handling */
1238 if (index_info->packet_size == 0) {
1239 DBG("Received live beacon for stream %" PRIu64,
1240 stream->stream_handle);
1241
1242 /*
1243 * Only flag a stream inactive when it has already
1244 * received data and no indexes are in flight.
1245 */
1246 if (stream->index_received_seqcount > 0
1247 && stream->indexes_in_flight == 0) {
1248 stream->beacon_ts_end = index_info->timestamp_end;
1249 }
1250 ret = 0;
1251 goto end;
1252 } else {
1253 stream->beacon_ts_end = -1ULL;
1254 }
1255
1256 if (stream->ctf_stream_id == -1ULL) {
1257 stream->ctf_stream_id = index_info->stream_id;
1258 }
1259
1260 index = relay_index_get_by_id_or_create(stream, index_info->net_seq_num);
1261 if (!index) {
1262 ret = -1;
1263 ERR("Failed to get or create index %" PRIu64,
1264 index_info->net_seq_num);
1265 goto end;
1266 }
1267 if (relay_index_set_control_data(index, index_info,
1268 stream->trace->session->minor)) {
1269 ERR("set_index_control_data error");
1270 relay_index_put(index);
1271 ret = -1;
1272 goto end;
1273 }
1274 ret = relay_index_try_flush(index);
1275 if (ret == 0) {
1276 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
1277 tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
1278 stream->index_received_seqcount++;
1279 stream->pos_after_last_complete_data_index += index->total_size;
1280 stream->prev_index_seq = index_info->net_seq_num;
1281 LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
1282 index_info->packet_seq_num);
1283
1284 ret = try_rotate_stream_index(stream);
1285 if (ret < 0) {
1286 goto end;
1287 }
1288 ret = try_rotate_stream_data(stream);
1289 if (ret < 0) {
1290 goto end;
1291 }
1292 } else if (ret > 0) {
1293 /* no flush. */
1294 ret = 0;
1295 } else {
1296 /*
1297 * ret < 0
1298 *
1299 * relay_index_try_flush is responsible for the self-reference
1300 * put of the index object on error.
1301 */
1302 ERR("relay_index_try_flush error %d", ret);
1303 ret = -1;
1304 }
1305 end:
1306 return ret;
1307 }
1308
1309 static void print_stream_indexes(struct relay_stream *stream)
1310 {
1311 struct lttng_ht_iter iter;
1312 struct relay_index *index;
1313
1314 rcu_read_lock();
1315 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
1316 index_n.node) {
1317 DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
1318 " stream %" PRIu64 " trace %" PRIu64
1319 " session %" PRIu64,
1320 index,
1321 index->index_n.key,
1322 stream->ref.refcount,
1323 index->stream->stream_handle,
1324 index->stream->trace->id,
1325 index->stream->trace->session->id);
1326 }
1327 rcu_read_unlock();
1328 }
1329
1330 int stream_reset_file(struct relay_stream *stream)
1331 {
1332 ASSERT_LOCKED(stream->lock);
1333
1334 if (stream->file) {
1335 int ret;
1336
1337 ret = fs_handle_close(stream->file);
1338 if (ret) {
1339 ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64,
1340 stream->channel_name,
1341 stream->stream_handle);
1342 }
1343 stream->file = NULL;
1344 }
1345
1346 DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
1347 __func__, stream->stream_handle, stream->tracefile_size_current);
1348 stream->tracefile_size_current = 0;
1349 stream->prev_data_seq = 0;
1350 stream->prev_index_seq = 0;
1351 /* Note that this does not reset the tracefile array. */
1352 stream->tracefile_current_index = 0;
1353 stream->pos_after_last_complete_data_index = 0;
1354
1355 return stream_create_data_output_file_from_trace_chunk(stream,
1356 stream->trace_chunk, true, &stream->file);
1357 }
1358
1359 void print_relay_streams(void)
1360 {
1361 struct lttng_ht_iter iter;
1362 struct relay_stream *stream;
1363
1364 if (!relay_streams_ht) {
1365 return;
1366 }
1367
1368 rcu_read_lock();
1369 cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
1370 node.node) {
1371 if (!stream_get(stream)) {
1372 continue;
1373 }
1374 DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
1375 " session %" PRIu64,
1376 stream,
1377 stream->ref.refcount,
1378 stream->stream_handle,
1379 stream->trace->id,
1380 stream->trace->session->id);
1381 print_stream_indexes(stream);
1382 stream_put(stream);
1383 }
1384 rcu_read_unlock();
1385 }
This page took 0.089667 seconds and 4 git commands to generate.