Require automake >= 1.12
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License, version 2 only, as
9 * published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * this program; if not, write to the Free Software Foundation, Inc., 51
18 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21 #define _LGPL_SOURCE
22 #include <common/common.h>
23 #include <common/utils.h>
24 #include <common/defaults.h>
25 #include <common/sessiond-comm/relayd.h>
26 #include <urcu/rculist.h>
27 #include <sys/stat.h>
28
29 #include "lttng-relayd.h"
30 #include "index.h"
31 #include "stream.h"
32 #include "viewer-stream.h"
33
34 #include <sys/types.h>
35 #include <fcntl.h>
36
37 #define FILE_IO_STACK_BUFFER_SIZE 65536
38
39 /* Should be called with RCU read-side lock held. */
40 bool stream_get(struct relay_stream *stream)
41 {
42 return urcu_ref_get_unless_zero(&stream->ref);
43 }
44
45 /*
46 * Get stream from stream id from the streams hash table. Return stream
47 * if found else NULL. A stream reference is taken when a stream is
48 * returned. stream_put() must be called on that stream.
49 */
50 struct relay_stream *stream_get_by_id(uint64_t stream_id)
51 {
52 struct lttng_ht_node_u64 *node;
53 struct lttng_ht_iter iter;
54 struct relay_stream *stream = NULL;
55
56 rcu_read_lock();
57 lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
58 node = lttng_ht_iter_get_node_u64(&iter);
59 if (!node) {
60 DBG("Relay stream %" PRIu64 " not found", stream_id);
61 goto end;
62 }
63 stream = caa_container_of(node, struct relay_stream, node);
64 if (!stream_get(stream)) {
65 stream = NULL;
66 }
67 end:
68 rcu_read_unlock();
69 return stream;
70 }
71
72 static void stream_complete_rotation(struct relay_stream *stream)
73 {
74 DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
75 lttng_trace_chunk_put(stream->trace_chunk);
76 stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
77 stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
78 }
79
80 static int stream_create_data_output_file_from_trace_chunk(
81 struct relay_stream *stream,
82 struct lttng_trace_chunk *trace_chunk,
83 bool force_unlink,
84 struct stream_fd **out_stream_fd)
85 {
86 int ret, fd;
87 char stream_path[LTTNG_PATH_MAX];
88 enum lttng_trace_chunk_status status;
89 const int flags = O_RDWR | O_CREAT | O_TRUNC;
90 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
91
92 ASSERT_LOCKED(stream->lock);
93 assert(stream->trace_chunk);
94
95 ret = utils_stream_file_path(stream->path_name, stream->channel_name,
96 stream->tracefile_size, stream->tracefile_current_index,
97 NULL, stream_path, sizeof(stream_path));
98 if (ret < 0) {
99 goto end;
100 }
101
102 if (stream->tracefile_wrapped_around || force_unlink) {
103 /*
104 * The on-disk ring-buffer has wrapped around.
105 * Newly created stream files will replace existing files. Since
106 * live clients may be consuming existing files, the file about
107 * to be replaced is unlinked in order to not overwrite its
108 * content.
109 */
110 status = lttng_trace_chunk_unlink_file(trace_chunk,
111 stream_path);
112 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
113 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
114 stream_path);
115 /*
116 * Don't abort if the file doesn't exist, it is
117 * unexpected, but should not be a fatal error.
118 */
119 if (errno != ENOENT) {
120 ret = -1;
121 goto end;
122 }
123 }
124 }
125
126 status = lttng_trace_chunk_open_file(
127 trace_chunk, stream_path, flags, mode, &fd);
128 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
129 ERR("Failed to open stream file \"%s\"", stream->channel_name);
130 ret = -1;
131 goto end;
132 }
133
134 *out_stream_fd = stream_fd_create(fd);
135 if (!*out_stream_fd) {
136 if (close(ret)) {
137 PERROR("Error closing stream file descriptor %d", ret);
138 }
139 ret = -1;
140 goto end;
141 }
142 end:
143 return ret;
144 }
145
146 static int stream_rotate_data_file(struct relay_stream *stream)
147 {
148 int ret = 0;
149
150 DBG("Rotating stream %" PRIu64 " data file",
151 stream->stream_handle);
152
153 if (stream->stream_fd) {
154 stream_fd_put(stream->stream_fd);
155 stream->stream_fd = NULL;
156 }
157
158 stream->tracefile_wrapped_around = false;
159 stream->tracefile_current_index = 0;
160
161 if (stream->ongoing_rotation.value.next_trace_chunk) {
162 struct stream_fd *new_stream_fd = NULL;
163 enum lttng_trace_chunk_status chunk_status;
164
165 chunk_status = lttng_trace_chunk_create_subdirectory(
166 stream->ongoing_rotation.value.next_trace_chunk,
167 stream->path_name);
168 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
169 ret = -1;
170 goto end;
171 }
172
173 /* Rotate the data file. */
174 ret = stream_create_data_output_file_from_trace_chunk(stream,
175 stream->ongoing_rotation.value.next_trace_chunk,
176 false, &new_stream_fd);
177 stream->stream_fd = new_stream_fd;
178 if (ret < 0) {
179 ERR("Failed to rotate stream data file");
180 goto end;
181 }
182 }
183 stream->tracefile_size_current = 0;
184 stream->pos_after_last_complete_data_index = 0;
185 stream->ongoing_rotation.value.data_rotated = true;
186
187 if (stream->ongoing_rotation.value.index_rotated) {
188 /* Rotation completed; reset its state. */
189 stream_complete_rotation(stream);
190 }
191 end:
192 return ret;
193 }
194
195 /*
196 * If too much data has been written in a tracefile before we received the
197 * rotation command, we have to move the excess data to the new tracefile and
198 * perform the rotation. This can happen because the control and data
199 * connections are separate, the indexes as well as the commands arrive from
200 * the control connection and we have no control over the order so we could be
201 * in a situation where too much data has been received on the data connection
202 * before the rotation command on the control connection arrives.
203 */
204 static int rotate_truncate_stream(struct relay_stream *stream)
205 {
206 int ret;
207 off_t lseek_ret, previous_stream_copy_origin;
208 uint64_t copy_bytes_left, misplaced_data_size;
209 bool acquired_reference;
210 struct stream_fd *previous_stream_fd = NULL;
211 struct lttng_trace_chunk *previous_chunk = NULL;
212
213 if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
214 ERR("Protocol error encoutered in %s(): stream rotation "
215 "sequence number is before the current sequence number "
216 "and the next trace chunk is unset. Honoring this "
217 "rotation command would result in data loss",
218 __FUNCTION__);
219 ret = -1;
220 goto end;
221 }
222
223 ASSERT_LOCKED(stream->lock);
224 /*
225 * Acquire a reference to the current trace chunk to ensure
226 * it is not reclaimed when `stream_rotate_data_file` is called.
227 * Failing to do so would violate the contract of the trace
228 * chunk API as an active file descriptor would outlive the
229 * trace chunk.
230 */
231 acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
232 assert(acquired_reference);
233 previous_chunk = stream->trace_chunk;
234
235 /*
236 * Steal the stream's reference to its stream_fd. A new
237 * stream_fd will be created when the rotation completes and
238 * the orinal stream_fd will be used to copy the "extra" data
239 * to the new file.
240 */
241 assert(stream->stream_fd);
242 previous_stream_fd = stream->stream_fd;
243 stream->stream_fd = NULL;
244
245 assert(!stream->is_metadata);
246 assert(stream->tracefile_size_current >
247 stream->pos_after_last_complete_data_index);
248 misplaced_data_size = stream->tracefile_size_current -
249 stream->pos_after_last_complete_data_index;
250 copy_bytes_left = misplaced_data_size;
251 previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
252
253 ret = stream_rotate_data_file(stream);
254 if (ret) {
255 goto end;
256 }
257
258 assert(stream->stream_fd);
259 /*
260 * Seek the current tracefile to the position at which the rotation
261 * should have occurred.
262 */
263 lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
264 SEEK_SET);
265 if (lseek_ret < 0) {
266 PERROR("Failed to seek to offset %" PRIu64
267 " while copying extra data received before a stream rotation",
268 (uint64_t) previous_stream_copy_origin);
269 ret = -1;
270 goto end;
271 }
272
273 /* Move data from the old file to the new file. */
274 while (copy_bytes_left) {
275 ssize_t io_ret;
276 char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
277 const off_t copy_size_this_pass = min_t(
278 off_t, copy_bytes_left, sizeof(copy_buffer));
279
280 io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
281 copy_size_this_pass);
282 if (io_ret < (ssize_t) copy_size_this_pass) {
283 if (io_ret == -1) {
284 PERROR("Failed to read %" PRIu64
285 " bytes from fd %i in %s(), returned %zi",
286 copy_size_this_pass,
287 previous_stream_fd->fd,
288 __FUNCTION__, io_ret);
289 } else {
290 ERR("Failed to read %" PRIu64
291 " bytes from fd %i in %s(), returned %zi",
292 copy_size_this_pass,
293 previous_stream_fd->fd,
294 __FUNCTION__, io_ret);
295 }
296 ret = -1;
297 goto end;
298 }
299
300 io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
301 copy_size_this_pass);
302 if (io_ret < (ssize_t) copy_size_this_pass) {
303 if (io_ret == -1) {
304 PERROR("Failed to write %" PRIu64
305 " bytes from fd %i in %s(), returned %zi",
306 copy_size_this_pass,
307 stream->stream_fd->fd,
308 __FUNCTION__, io_ret);
309 } else {
310 ERR("Failed to write %" PRIu64
311 " bytes from fd %i in %s(), returned %zi",
312 copy_size_this_pass,
313 stream->stream_fd->fd,
314 __FUNCTION__, io_ret);
315 }
316 ret = -1;
317 goto end;
318 }
319 copy_bytes_left -= copy_size_this_pass;
320 }
321
322 /* Truncate the file to get rid of the excess data. */
323 ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
324 if (ret) {
325 PERROR("Failed to truncate current stream file to offset %" PRIu64,
326 previous_stream_copy_origin);
327 goto end;
328 }
329
330 /*
331 * Update the offset and FD of all the eventual indexes created by the
332 * data connection before the rotation command arrived.
333 */
334 ret = relay_index_switch_all_files(stream);
335 if (ret < 0) {
336 ERR("Failed to rotate index file");
337 goto end;
338 }
339
340 stream->tracefile_size_current = misplaced_data_size;
341 /* Index and data contents are back in sync. */
342 stream->pos_after_last_complete_data_index = 0;
343 ret = 0;
344 end:
345 lttng_trace_chunk_put(previous_chunk);
346 stream_fd_put(previous_stream_fd);
347 return ret;
348 }
349
350 /*
351 * Check if a stream's data file (as opposed to index) should be rotated
352 * (for session rotation).
353 * Must be called with the stream lock held.
354 *
355 * Return 0 on success, a negative value on error.
356 */
357 static int try_rotate_stream_data(struct relay_stream *stream)
358 {
359 int ret = 0;
360
361 if (caa_likely(!stream->ongoing_rotation.is_set)) {
362 /* No rotation expected. */
363 goto end;
364 }
365
366 if (stream->ongoing_rotation.value.data_rotated) {
367 /* Rotation of the data file has already occurred. */
368 goto end;
369 }
370
371 if (stream->prev_data_seq == -1ULL ||
372 stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
373 /*
374 * The next packet that will be written is not part of the next
375 * chunk yet.
376 */
377 DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
378 ", prev_data_seq = %" PRIu64 ")",
379 stream->stream_handle,
380 stream->ongoing_rotation.value.seq_num,
381 stream->prev_data_seq);
382 goto end;
383 } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
384 /*
385 * prev_data_seq is checked here since indexes and rotation
386 * commands are serialized with respect to each other.
387 */
388 DBG("Rotation after too much data has been written in tracefile "
389 "for stream %" PRIu64 ", need to truncate before "
390 "rotating", stream->stream_handle);
391 ret = rotate_truncate_stream(stream);
392 if (ret) {
393 ERR("Failed to truncate stream");
394 goto end;
395 }
396 } else {
397 ret = stream_rotate_data_file(stream);
398 }
399
400 end:
401 return ret;
402 }
403
404 /*
405 * Close the current index file if it is open, and create a new one.
406 *
407 * Return 0 on success, -1 on error.
408 */
409 static int create_index_file(struct relay_stream *stream,
410 struct lttng_trace_chunk *chunk)
411 {
412 int ret;
413 uint32_t major, minor;
414 char *index_subpath = NULL;
415 enum lttng_trace_chunk_status status;
416
417 ASSERT_LOCKED(stream->lock);
418
419 /* Put ref on previous index_file. */
420 if (stream->index_file) {
421 lttng_index_file_put(stream->index_file);
422 stream->index_file = NULL;
423 }
424 major = stream->trace->session->major;
425 minor = stream->trace->session->minor;
426
427 if (!chunk) {
428 ret = 0;
429 goto end;
430 }
431 ret = asprintf(&index_subpath, "%s/%s", stream->path_name,
432 DEFAULT_INDEX_DIR);
433 if (ret < 0) {
434 goto end;
435 }
436
437 status = lttng_trace_chunk_create_subdirectory(chunk,
438 index_subpath);
439 free(index_subpath);
440 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
441 ret = -1;
442 goto end;
443 }
444 stream->index_file = lttng_index_file_create_from_trace_chunk(
445 chunk, stream->path_name,
446 stream->channel_name, stream->tracefile_size,
447 stream->tracefile_current_index,
448 lttng_to_index_major(major, minor),
449 lttng_to_index_minor(major, minor), true);
450 if (!stream->index_file) {
451 ret = -1;
452 goto end;
453 }
454
455 ret = 0;
456
457 end:
458 return ret;
459 }
460
461 /*
462 * Check if a stream's index file should be rotated (for session rotation).
463 * Must be called with the stream lock held.
464 *
465 * Return 0 on success, a negative value on error.
466 */
467 static int try_rotate_stream_index(struct relay_stream *stream)
468 {
469 int ret = 0;
470
471 if (!stream->ongoing_rotation.is_set) {
472 /* No rotation expected. */
473 goto end;
474 }
475
476 if (stream->ongoing_rotation.value.index_rotated) {
477 /* Rotation of the index has already occurred. */
478 goto end;
479 }
480
481 if (stream->prev_index_seq == -1ULL ||
482 stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
483 DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
484 stream->stream_handle,
485 stream->ongoing_rotation.value.seq_num,
486 stream->prev_index_seq);
487 goto end;
488 } else {
489 /* The next index belongs to the new trace chunk; rotate. */
490 assert(stream->prev_index_seq + 1 ==
491 stream->ongoing_rotation.value.seq_num);
492 DBG("Rotating stream %" PRIu64 " index file",
493 stream->stream_handle);
494 ret = create_index_file(stream,
495 stream->ongoing_rotation.value.next_trace_chunk);
496 stream->ongoing_rotation.value.index_rotated = true;
497
498 if (stream->ongoing_rotation.value.data_rotated &&
499 stream->ongoing_rotation.value.index_rotated) {
500 /* Rotation completed; reset its state. */
501 DBG("Rotation completed for stream %" PRIu64,
502 stream->stream_handle);
503 stream_complete_rotation(stream);
504 }
505 }
506
507 end:
508 return ret;
509 }
510
511 static int stream_set_trace_chunk(struct relay_stream *stream,
512 struct lttng_trace_chunk *chunk)
513 {
514 int ret = 0;
515 enum lttng_trace_chunk_status status;
516 bool acquired_reference;
517 struct stream_fd *new_stream_fd = NULL;
518
519 status = lttng_trace_chunk_create_subdirectory(chunk,
520 stream->path_name);
521 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
522 ret = -1;
523 goto end;
524 }
525
526 lttng_trace_chunk_put(stream->trace_chunk);
527 acquired_reference = lttng_trace_chunk_get(chunk);
528 assert(acquired_reference);
529 stream->trace_chunk = chunk;
530
531 if (stream->stream_fd) {
532 stream_fd_put(stream->stream_fd);
533 stream->stream_fd = NULL;
534 }
535 ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
536 false, &new_stream_fd);
537 stream->stream_fd = new_stream_fd;
538 end:
539 return ret;
540 }
541
542 /*
543 * We keep ownership of path_name and channel_name.
544 */
545 struct relay_stream *stream_create(struct ctf_trace *trace,
546 uint64_t stream_handle, char *path_name,
547 char *channel_name, uint64_t tracefile_size,
548 uint64_t tracefile_count)
549 {
550 int ret;
551 struct relay_stream *stream = NULL;
552 struct relay_session *session = trace->session;
553 bool acquired_reference = false;
554 struct lttng_trace_chunk *current_trace_chunk;
555
556 stream = zmalloc(sizeof(struct relay_stream));
557 if (stream == NULL) {
558 PERROR("relay stream zmalloc");
559 goto error_no_alloc;
560 }
561
562 stream->stream_handle = stream_handle;
563 stream->prev_data_seq = -1ULL;
564 stream->prev_index_seq = -1ULL;
565 stream->last_net_seq_num = -1ULL;
566 stream->ctf_stream_id = -1ULL;
567 stream->tracefile_size = tracefile_size;
568 stream->tracefile_count = tracefile_count;
569 stream->path_name = path_name;
570 stream->channel_name = channel_name;
571 stream->beacon_ts_end = -1ULL;
572 lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
573 pthread_mutex_init(&stream->lock, NULL);
574 urcu_ref_init(&stream->ref);
575 ctf_trace_get(trace);
576 stream->trace = trace;
577
578 pthread_mutex_lock(&trace->session->lock);
579 current_trace_chunk = trace->session->current_trace_chunk;
580 if (current_trace_chunk) {
581 acquired_reference = lttng_trace_chunk_get(current_trace_chunk);
582 }
583 pthread_mutex_unlock(&trace->session->lock);
584 if (!acquired_reference) {
585 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
586 channel_name);
587 ret = -1;
588 goto end;
589 }
590
591 stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
592 if (!stream->indexes_ht) {
593 ERR("Cannot created indexes_ht");
594 ret = -1;
595 goto end;
596 }
597
598 pthread_mutex_lock(&stream->lock);
599 ret = stream_set_trace_chunk(stream, current_trace_chunk);
600 pthread_mutex_unlock(&stream->lock);
601 if (ret) {
602 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
603 trace->session->session_name,
604 stream->channel_name);
605 ret = -1;
606 goto end;
607 }
608 stream->tfa = tracefile_array_create(stream->tracefile_count);
609 if (!stream->tfa) {
610 ret = -1;
611 goto end;
612 }
613
614 stream->is_metadata = !strcmp(stream->channel_name,
615 DEFAULT_METADATA_NAME);
616 stream->in_recv_list = true;
617
618 /*
619 * Add the stream in the recv list of the session. Once the end stream
620 * message is received, all session streams are published.
621 */
622 pthread_mutex_lock(&session->recv_list_lock);
623 cds_list_add_rcu(&stream->recv_node, &session->recv_list);
624 session->stream_count++;
625 pthread_mutex_unlock(&session->recv_list_lock);
626
627 /*
628 * Both in the ctf_trace object and the global stream ht since the data
629 * side of the relayd does not have the concept of session.
630 */
631 lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
632 stream->in_stream_ht = true;
633
634 DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
635 stream->stream_handle);
636 ret = 0;
637
638 end:
639 if (ret) {
640 if (stream->stream_fd) {
641 stream_fd_put(stream->stream_fd);
642 stream->stream_fd = NULL;
643 }
644 stream_put(stream);
645 stream = NULL;
646 }
647 if (acquired_reference) {
648 lttng_trace_chunk_put(current_trace_chunk);
649 }
650 return stream;
651
652 error_no_alloc:
653 /*
654 * path_name and channel_name need to be freed explicitly here
655 * because we cannot rely on stream_put().
656 */
657 free(path_name);
658 free(channel_name);
659 return NULL;
660 }
661
662 /*
663 * Called with the session lock held.
664 */
665 void stream_publish(struct relay_stream *stream)
666 {
667 struct relay_session *session;
668
669 pthread_mutex_lock(&stream->lock);
670 if (stream->published) {
671 goto unlock;
672 }
673
674 session = stream->trace->session;
675
676 pthread_mutex_lock(&session->recv_list_lock);
677 if (stream->in_recv_list) {
678 cds_list_del_rcu(&stream->recv_node);
679 stream->in_recv_list = false;
680 }
681 pthread_mutex_unlock(&session->recv_list_lock);
682
683 pthread_mutex_lock(&stream->trace->stream_list_lock);
684 cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
685 pthread_mutex_unlock(&stream->trace->stream_list_lock);
686
687 stream->published = true;
688 unlock:
689 pthread_mutex_unlock(&stream->lock);
690 }
691
692 /*
693 * Stream must be protected by holding the stream lock or by virtue of being
694 * called from stream_destroy.
695 */
696 static void stream_unpublish(struct relay_stream *stream)
697 {
698 if (stream->in_stream_ht) {
699 struct lttng_ht_iter iter;
700 int ret;
701
702 iter.iter.node = &stream->node.node;
703 ret = lttng_ht_del(relay_streams_ht, &iter);
704 assert(!ret);
705 stream->in_stream_ht = false;
706 }
707 if (stream->published) {
708 pthread_mutex_lock(&stream->trace->stream_list_lock);
709 cds_list_del_rcu(&stream->stream_node);
710 pthread_mutex_unlock(&stream->trace->stream_list_lock);
711 stream->published = false;
712 }
713 }
714
715 static void stream_destroy(struct relay_stream *stream)
716 {
717 if (stream->indexes_ht) {
718 /*
719 * Calling lttng_ht_destroy in call_rcu worker thread so
720 * we don't hold the RCU read-side lock while calling
721 * it.
722 */
723 lttng_ht_destroy(stream->indexes_ht);
724 }
725 if (stream->tfa) {
726 tracefile_array_destroy(stream->tfa);
727 }
728 free(stream->path_name);
729 free(stream->channel_name);
730 free(stream);
731 }
732
733 static void stream_destroy_rcu(struct rcu_head *rcu_head)
734 {
735 struct relay_stream *stream =
736 caa_container_of(rcu_head, struct relay_stream, rcu_node);
737
738 stream_destroy(stream);
739 }
740
741 /*
742 * No need to take stream->lock since this is only called on the final
743 * stream_put which ensures that a single thread may act on the stream.
744 */
745 static void stream_release(struct urcu_ref *ref)
746 {
747 struct relay_stream *stream =
748 caa_container_of(ref, struct relay_stream, ref);
749 struct relay_session *session;
750
751 session = stream->trace->session;
752
753 DBG("Releasing stream id %" PRIu64, stream->stream_handle);
754
755 pthread_mutex_lock(&session->recv_list_lock);
756 session->stream_count--;
757 if (stream->in_recv_list) {
758 cds_list_del_rcu(&stream->recv_node);
759 stream->in_recv_list = false;
760 }
761 pthread_mutex_unlock(&session->recv_list_lock);
762
763 stream_unpublish(stream);
764
765 if (stream->stream_fd) {
766 stream_fd_put(stream->stream_fd);
767 stream->stream_fd = NULL;
768 }
769 if (stream->index_file) {
770 lttng_index_file_put(stream->index_file);
771 stream->index_file = NULL;
772 }
773 if (stream->trace) {
774 ctf_trace_put(stream->trace);
775 stream->trace = NULL;
776 }
777 stream_complete_rotation(stream);
778 lttng_trace_chunk_put(stream->trace_chunk);
779 stream->trace_chunk = NULL;
780
781 call_rcu(&stream->rcu_node, stream_destroy_rcu);
782 }
783
784 void stream_put(struct relay_stream *stream)
785 {
786 rcu_read_lock();
787 assert(stream->ref.refcount != 0);
788 /*
789 * Wait until we have processed all the stream packets before
790 * actually putting our last stream reference.
791 */
792 urcu_ref_put(&stream->ref, stream_release);
793 rcu_read_unlock();
794 }
795
796 int stream_set_pending_rotation(struct relay_stream *stream,
797 struct lttng_trace_chunk *next_trace_chunk,
798 uint64_t rotation_sequence_number)
799 {
800 int ret = 0;
801 const struct relay_stream_rotation rotation = {
802 .seq_num = rotation_sequence_number,
803 .next_trace_chunk = next_trace_chunk,
804 };
805
806 if (stream->ongoing_rotation.is_set) {
807 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
808 ret = -1;
809 goto end;
810 }
811
812 if (next_trace_chunk) {
813 const bool reference_acquired =
814 lttng_trace_chunk_get(next_trace_chunk);
815
816 assert(reference_acquired);
817 }
818 LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
819
820 DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
821 stream->stream_handle, rotation_sequence_number);
822 if (stream->is_metadata) {
823 /*
824 * A metadata stream has no index; consider it already rotated.
825 */
826 stream->ongoing_rotation.value.index_rotated = true;
827 ret = stream_rotate_data_file(stream);
828 } else {
829 ret = try_rotate_stream_data(stream);
830 if (ret < 0) {
831 goto end;
832 }
833
834 ret = try_rotate_stream_index(stream);
835 if (ret < 0) {
836 goto end;
837 }
838 }
839 end:
840 return ret;
841 }
842
843 void try_stream_close(struct relay_stream *stream)
844 {
845 bool session_aborted;
846 struct relay_session *session = stream->trace->session;
847
848 DBG("Trying to close stream %" PRIu64, stream->stream_handle);
849
850 pthread_mutex_lock(&session->lock);
851 session_aborted = session->aborted;
852 pthread_mutex_unlock(&session->lock);
853
854 pthread_mutex_lock(&stream->lock);
855 /*
856 * Can be called concurently by connection close and reception of last
857 * pending data.
858 */
859 if (stream->closed) {
860 pthread_mutex_unlock(&stream->lock);
861 DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
862 return;
863 }
864
865 stream->close_requested = true;
866
867 if (stream->last_net_seq_num == -1ULL) {
868 /*
869 * Handle connection close without explicit stream close
870 * command.
871 *
872 * We can be clever about indexes partially received in
873 * cases where we received the data socket part, but not
874 * the control socket part: since we're currently closing
875 * the stream on behalf of the control socket, we *know*
876 * there won't be any more control information for this
877 * socket. Therefore, we can destroy all indexes for
878 * which we have received only the file descriptor (from
879 * data socket). This takes care of consumerd crashes
880 * between sending the data and control information for
881 * a packet. Since those are sent in that order, we take
882 * care of consumerd crashes.
883 */
884 DBG("relay_index_close_partial_fd");
885 relay_index_close_partial_fd(stream);
886 /*
887 * Use the highest net_seq_num we currently have pending
888 * As end of stream indicator. Leave last_net_seq_num
889 * at -1ULL if we cannot find any index.
890 */
891 stream->last_net_seq_num = relay_index_find_last(stream);
892 DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
893 /* Fall-through into the next check. */
894 }
895
896 if (stream->last_net_seq_num != -1ULL &&
897 ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0
898 && !session_aborted) {
899 /*
900 * Don't close since we still have data pending. This
901 * handles cases where an explicit close command has
902 * been received for this stream, and cases where the
903 * connection has been closed, and we are awaiting for
904 * index information from the data socket. It is
905 * therefore expected that all the index fd information
906 * we need has already been received on the control
907 * socket. Matching index information from data socket
908 * should be Expected Soon(TM).
909 *
910 * TODO: We should implement a timer to garbage collect
911 * streams after a timeout to be resilient against a
912 * consumerd implementation that would not match this
913 * expected behavior.
914 */
915 pthread_mutex_unlock(&stream->lock);
916 DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
917 return;
918 }
919 /*
920 * We received all the indexes we can expect.
921 */
922 stream_unpublish(stream);
923 stream->closed = true;
924 /* Relay indexes are only used by the "consumer/sessiond" end. */
925 relay_index_close_all(stream);
926
927 /*
928 * If we are closed by an application exiting (per-pid buffers),
929 * we need to put our reference on the stream trace chunk right
930 * away, because otherwise still holding the reference on the
931 * trace chunk could allow a viewer stream (which holds a reference
932 * to the stream) to postpone destroy waiting for the chunk to cease
933 * to exist endlessly until the viewer is detached.
934 */
935
936 /* Put stream fd before put chunk. */
937 if (stream->stream_fd) {
938 stream_fd_put(stream->stream_fd);
939 stream->stream_fd = NULL;
940 }
941 if (stream->index_file) {
942 lttng_index_file_put(stream->index_file);
943 stream->index_file = NULL;
944 }
945 lttng_trace_chunk_put(stream->trace_chunk);
946 stream->trace_chunk = NULL;
947 pthread_mutex_unlock(&stream->lock);
948 DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
949 stream_put(stream);
950 }
951
952 int stream_init_packet(struct relay_stream *stream, size_t packet_size,
953 bool *file_rotated)
954 {
955 int ret = 0;
956
957 ASSERT_LOCKED(stream->lock);
958 if (caa_likely(stream->tracefile_size == 0)) {
959 /* No size limit set; nothing to check. */
960 goto end;
961 }
962
963 /*
964 * Check if writing the new packet would exceed the maximal file size.
965 */
966 if (caa_unlikely((stream->tracefile_size_current + packet_size) >
967 stream->tracefile_size)) {
968 const uint64_t new_file_index =
969 (stream->tracefile_current_index + 1) %
970 stream->tracefile_count;
971
972 if (new_file_index < stream->tracefile_current_index) {
973 stream->tracefile_wrapped_around = true;
974 }
975 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
976 ", current_file_size = %" PRIu64
977 ", packet_size = %zu, current_file_index = %" PRIu64
978 " new_file_index = %" PRIu64,
979 stream->stream_handle,
980 stream->tracefile_size_current, packet_size,
981 stream->tracefile_current_index, new_file_index);
982 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
983 stream->tracefile_current_index = new_file_index;
984
985 if (stream->stream_fd) {
986 stream_fd_put(stream->stream_fd);
987 stream->stream_fd = NULL;
988 }
989 ret = stream_create_data_output_file_from_trace_chunk(stream,
990 stream->trace_chunk, false, &stream->stream_fd);
991 if (ret) {
992 ERR("Failed to perform trace file rotation of stream %" PRIu64,
993 stream->stream_handle);
994 goto end;
995 }
996
997 /*
998 * Reset current size because we just performed a stream
999 * rotation.
1000 */
1001 stream->tracefile_size_current = 0;
1002 *file_rotated = true;
1003 } else {
1004 *file_rotated = false;
1005 }
1006 end:
1007 return ret;
1008 }
1009
1010 /* Note that the packet is not necessarily complete. */
1011 int stream_write(struct relay_stream *stream,
1012 const struct lttng_buffer_view *packet, size_t padding_len)
1013 {
1014 int ret = 0;
1015 ssize_t write_ret;
1016 size_t padding_to_write = padding_len;
1017 char padding_buffer[FILE_IO_STACK_BUFFER_SIZE];
1018
1019 ASSERT_LOCKED(stream->lock);
1020 memset(padding_buffer, 0,
1021 min(sizeof(padding_buffer), padding_to_write));
1022
1023 if (packet) {
1024 write_ret = lttng_write(stream->stream_fd->fd,
1025 packet->data, packet->size);
1026 if (write_ret != packet->size) {
1027 PERROR("Failed to write to stream file of %sstream %" PRIu64,
1028 stream->is_metadata ? "metadata " : "",
1029 stream->stream_handle);
1030 ret = -1;
1031 goto end;
1032 }
1033 }
1034
1035 while (padding_to_write > 0) {
1036 const size_t padding_to_write_this_pass =
1037 min(padding_to_write, sizeof(padding_buffer));
1038
1039 write_ret = lttng_write(stream->stream_fd->fd,
1040 padding_buffer, padding_to_write_this_pass);
1041 if (write_ret != padding_to_write_this_pass) {
1042 PERROR("Failed to write padding to file of %sstream %" PRIu64,
1043 stream->is_metadata ? "metadata " : "",
1044 stream->stream_handle);
1045 ret = -1;
1046 goto end;
1047 }
1048 padding_to_write -= padding_to_write_this_pass;
1049 }
1050
1051 if (stream->is_metadata) {
1052 stream->metadata_received += packet ? packet->size : 0;
1053 stream->metadata_received += padding_len;
1054 }
1055
1056 DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
1057 stream->is_metadata ? "metadata " : "",
1058 stream->stream_handle,
1059 packet ? packet->size : (size_t) 0, padding_len);
1060 end:
1061 return ret;
1062 }
1063
1064 /*
1065 * Update index after receiving a packet for a data stream.
1066 *
1067 * Called with the stream lock held.
1068 *
1069 * Return 0 on success else a negative value.
1070 */
1071 int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
1072 bool rotate_index, bool *flushed, uint64_t total_size)
1073 {
1074 int ret = 0;
1075 uint64_t data_offset;
1076 struct relay_index *index;
1077
1078 assert(stream->trace_chunk);
1079 ASSERT_LOCKED(stream->lock);
1080 /* Get data offset because we are about to update the index. */
1081 data_offset = htobe64(stream->tracefile_size_current);
1082
1083 DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
1084 stream->stream_handle, net_seq_num, stream->tracefile_size_current);
1085
1086 /*
1087 * Lookup for an existing index for that stream id/sequence
1088 * number. If it exists, the control thread has already received the
1089 * data for it, thus we need to write it to disk.
1090 */
1091 index = relay_index_get_by_id_or_create(stream, net_seq_num);
1092 if (!index) {
1093 ret = -1;
1094 goto end;
1095 }
1096
1097 if (rotate_index || !stream->index_file) {
1098 ret = create_index_file(stream, stream->trace_chunk);
1099 if (ret) {
1100 ERR("Failed to create index file for stream %" PRIu64,
1101 stream->stream_handle);
1102 /* Put self-ref for this index due to error. */
1103 relay_index_put(index);
1104 index = NULL;
1105 goto end;
1106 }
1107 }
1108
1109 if (relay_index_set_file(index, stream->index_file, data_offset)) {
1110 ret = -1;
1111 /* Put self-ref for this index due to error. */
1112 relay_index_put(index);
1113 index = NULL;
1114 goto end;
1115 }
1116
1117 ret = relay_index_try_flush(index);
1118 if (ret == 0) {
1119 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
1120 tracefile_array_commit_seq(stream->tfa);
1121 stream->index_received_seqcount++;
1122 *flushed = true;
1123 } else if (ret > 0) {
1124 index->total_size = total_size;
1125 /* No flush. */
1126 ret = 0;
1127 } else {
1128 /*
1129 * ret < 0
1130 *
1131 * relay_index_try_flush is responsible for the self-reference
1132 * put of the index object on error.
1133 */
1134 ERR("relay_index_try_flush error %d", ret);
1135 ret = -1;
1136 }
1137 end:
1138 return ret;
1139 }
1140
1141 int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size,
1142 uint64_t sequence_number, bool index_flushed)
1143 {
1144 int ret = 0;
1145
1146 ASSERT_LOCKED(stream->lock);
1147
1148 stream->tracefile_size_current += packet_total_size;
1149 if (index_flushed) {
1150 stream->pos_after_last_complete_data_index =
1151 stream->tracefile_size_current;
1152 stream->prev_index_seq = sequence_number;
1153 ret = try_rotate_stream_index(stream);
1154 if (ret < 0) {
1155 goto end;
1156 }
1157 }
1158
1159 stream->prev_data_seq = sequence_number;
1160 ret = try_rotate_stream_data(stream);
1161
1162 end:
1163 return ret;
1164 }
1165
1166 int stream_add_index(struct relay_stream *stream,
1167 const struct lttcomm_relayd_index *index_info)
1168 {
1169 int ret = 0;
1170 struct relay_index *index;
1171
1172 ASSERT_LOCKED(stream->lock);
1173
1174 /* Live beacon handling */
1175 if (index_info->packet_size == 0) {
1176 DBG("Received live beacon for stream %" PRIu64,
1177 stream->stream_handle);
1178
1179 /*
1180 * Only flag a stream inactive when it has already
1181 * received data and no indexes are in flight.
1182 */
1183 if (stream->index_received_seqcount > 0
1184 && stream->indexes_in_flight == 0) {
1185 stream->beacon_ts_end = index_info->timestamp_end;
1186 }
1187 ret = 0;
1188 goto end;
1189 } else {
1190 stream->beacon_ts_end = -1ULL;
1191 }
1192
1193 if (stream->ctf_stream_id == -1ULL) {
1194 stream->ctf_stream_id = index_info->stream_id;
1195 }
1196
1197 index = relay_index_get_by_id_or_create(stream, index_info->net_seq_num);
1198 if (!index) {
1199 ret = -1;
1200 ERR("Failed to get or create index %" PRIu64,
1201 index_info->net_seq_num);
1202 goto end;
1203 }
1204 if (relay_index_set_control_data(index, index_info,
1205 stream->trace->session->minor)) {
1206 ERR("set_index_control_data error");
1207 relay_index_put(index);
1208 ret = -1;
1209 goto end;
1210 }
1211 ret = relay_index_try_flush(index);
1212 if (ret == 0) {
1213 tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
1214 tracefile_array_commit_seq(stream->tfa);
1215 stream->index_received_seqcount++;
1216 stream->pos_after_last_complete_data_index += index->total_size;
1217 stream->prev_index_seq = index_info->net_seq_num;
1218
1219 ret = try_rotate_stream_index(stream);
1220 if (ret < 0) {
1221 goto end;
1222 }
1223 } else if (ret > 0) {
1224 /* no flush. */
1225 ret = 0;
1226 } else {
1227 /*
1228 * ret < 0
1229 *
1230 * relay_index_try_flush is responsible for the self-reference
1231 * put of the index object on error.
1232 */
1233 ERR("relay_index_try_flush error %d", ret);
1234 ret = -1;
1235 }
1236 end:
1237 return ret;
1238 }
1239
1240 static void print_stream_indexes(struct relay_stream *stream)
1241 {
1242 struct lttng_ht_iter iter;
1243 struct relay_index *index;
1244
1245 rcu_read_lock();
1246 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
1247 index_n.node) {
1248 DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
1249 " stream %" PRIu64 " trace %" PRIu64
1250 " session %" PRIu64,
1251 index,
1252 index->index_n.key,
1253 stream->ref.refcount,
1254 index->stream->stream_handle,
1255 index->stream->trace->id,
1256 index->stream->trace->session->id);
1257 }
1258 rcu_read_unlock();
1259 }
1260
1261 int stream_reset_file(struct relay_stream *stream)
1262 {
1263 ASSERT_LOCKED(stream->lock);
1264
1265 if (stream->stream_fd) {
1266 stream_fd_put(stream->stream_fd);
1267 stream->stream_fd = NULL;
1268 }
1269
1270 stream->tracefile_size_current = 0;
1271 stream->prev_data_seq = 0;
1272 stream->prev_index_seq = 0;
1273 /* Note that this does not reset the tracefile array. */
1274 stream->tracefile_current_index = 0;
1275 stream->pos_after_last_complete_data_index = 0;
1276
1277 return stream_create_data_output_file_from_trace_chunk(stream,
1278 stream->trace_chunk, true, &stream->stream_fd);
1279 }
1280
1281 void print_relay_streams(void)
1282 {
1283 struct lttng_ht_iter iter;
1284 struct relay_stream *stream;
1285
1286 if (!relay_streams_ht) {
1287 return;
1288 }
1289
1290 rcu_read_lock();
1291 cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
1292 node.node) {
1293 if (!stream_get(stream)) {
1294 continue;
1295 }
1296 DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
1297 " session %" PRIu64,
1298 stream,
1299 stream->ref.refcount,
1300 stream->stream_handle,
1301 stream->trace->id,
1302 stream->trace->session->id);
1303 print_stream_indexes(stream);
1304 stream_put(stream);
1305 }
1306 rcu_read_unlock();
1307 }
This page took 0.058166 seconds and 4 git commands to generate.