relayd: fix: rotate_truncate_stream() assumes non-null next chunk
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
1 /*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
5 * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License, version 2 only, as
9 * published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * this program; if not, write to the Free Software Foundation, Inc., 51
18 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21 #define _LGPL_SOURCE
22 #include <common/common.h>
23 #include <common/utils.h>
24 #include <common/defaults.h>
25 #include <common/sessiond-comm/relayd.h>
26 #include <urcu/rculist.h>
27 #include <sys/stat.h>
28
29 #include "lttng-relayd.h"
30 #include "index.h"
31 #include "stream.h"
32 #include "viewer-stream.h"
33
34 #include <sys/types.h>
35 #include <fcntl.h>
36
37 #define FILE_IO_STACK_BUFFER_SIZE 65536
38
39 /* Should be called with RCU read-side lock held. */
40 bool stream_get(struct relay_stream *stream)
41 {
42 return urcu_ref_get_unless_zero(&stream->ref);
43 }
44
45 /*
46 * Get stream from stream id from the streams hash table. Return stream
47 * if found else NULL. A stream reference is taken when a stream is
48 * returned. stream_put() must be called on that stream.
49 */
50 struct relay_stream *stream_get_by_id(uint64_t stream_id)
51 {
52 struct lttng_ht_node_u64 *node;
53 struct lttng_ht_iter iter;
54 struct relay_stream *stream = NULL;
55
56 rcu_read_lock();
57 lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
58 node = lttng_ht_iter_get_node_u64(&iter);
59 if (!node) {
60 DBG("Relay stream %" PRIu64 " not found", stream_id);
61 goto end;
62 }
63 stream = caa_container_of(node, struct relay_stream, node);
64 if (!stream_get(stream)) {
65 stream = NULL;
66 }
67 end:
68 rcu_read_unlock();
69 return stream;
70 }
71
72 static void stream_complete_rotation(struct relay_stream *stream)
73 {
74 DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
75 lttng_trace_chunk_put(stream->trace_chunk);
76 stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
77 stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
78 }
79
80 static int stream_create_data_output_file_from_trace_chunk(
81 struct relay_stream *stream,
82 struct lttng_trace_chunk *trace_chunk,
83 bool force_unlink,
84 struct stream_fd **out_stream_fd)
85 {
86 int ret, fd;
87 char stream_path[LTTNG_PATH_MAX];
88 enum lttng_trace_chunk_status status;
89 const int flags = O_RDWR | O_CREAT | O_TRUNC;
90 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
91
92 ASSERT_LOCKED(stream->lock);
93 assert(stream->trace_chunk);
94
95 ret = utils_stream_file_path(stream->path_name, stream->channel_name,
96 stream->tracefile_size, stream->tracefile_current_index,
97 NULL, stream_path, sizeof(stream_path));
98 if (ret < 0) {
99 goto end;
100 }
101
102 if (stream->tracefile_wrapped_around || force_unlink) {
103 /*
104 * The on-disk ring-buffer has wrapped around.
105 * Newly created stream files will replace existing files. Since
106 * live clients may be consuming existing files, the file about
107 * to be replaced is unlinked in order to not overwrite its
108 * content.
109 */
110 status = lttng_trace_chunk_unlink_file(trace_chunk,
111 stream_path);
112 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
113 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
114 stream_path);
115 /*
116 * Don't abort if the file doesn't exist, it is
117 * unexpected, but should not be a fatal error.
118 */
119 if (errno != ENOENT) {
120 ret = -1;
121 goto end;
122 }
123 }
124 }
125
126 status = lttng_trace_chunk_open_file(
127 trace_chunk, stream_path, flags, mode, &fd);
128 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
129 ERR("Failed to open stream file \"%s\"", stream->channel_name);
130 ret = -1;
131 goto end;
132 }
133
134 *out_stream_fd = stream_fd_create(fd);
135 if (!*out_stream_fd) {
136 if (close(ret)) {
137 PERROR("Error closing stream file descriptor %d", ret);
138 }
139 ret = -1;
140 goto end;
141 }
142 end:
143 return ret;
144 }
145
146 static int stream_rotate_data_file(struct relay_stream *stream)
147 {
148 int ret = 0;
149
150 DBG("Rotating stream %" PRIu64 " data file",
151 stream->stream_handle);
152
153 if (stream->stream_fd) {
154 stream_fd_put(stream->stream_fd);
155 stream->stream_fd = NULL;
156 }
157
158 stream->tracefile_wrapped_around = false;
159 stream->tracefile_current_index = 0;
160
161 if (stream->ongoing_rotation.value.next_trace_chunk) {
162 struct stream_fd *new_stream_fd = NULL;
163 enum lttng_trace_chunk_status chunk_status;
164
165 chunk_status = lttng_trace_chunk_create_subdirectory(
166 stream->ongoing_rotation.value.next_trace_chunk,
167 stream->path_name);
168 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
169 ret = -1;
170 goto end;
171 }
172
173 /* Rotate the data file. */
174 ret = stream_create_data_output_file_from_trace_chunk(stream,
175 stream->ongoing_rotation.value.next_trace_chunk,
176 false, &new_stream_fd);
177 stream->stream_fd = new_stream_fd;
178 if (ret < 0) {
179 ERR("Failed to rotate stream data file");
180 goto end;
181 }
182 }
183 stream->tracefile_size_current = 0;
184 stream->pos_after_last_complete_data_index = 0;
185 stream->ongoing_rotation.value.data_rotated = true;
186
187 if (stream->ongoing_rotation.value.index_rotated) {
188 /* Rotation completed; reset its state. */
189 stream_complete_rotation(stream);
190 }
191 end:
192 return ret;
193 }
194
195 /*
196 * If too much data has been written in a tracefile before we received the
197 * rotation command, we have to move the excess data to the new tracefile and
198 * perform the rotation. This can happen because the control and data
199 * connections are separate, the indexes as well as the commands arrive from
200 * the control connection and we have no control over the order so we could be
201 * in a situation where too much data has been received on the data connection
202 * before the rotation command on the control connection arrives.
203 */
204 static int rotate_truncate_stream(struct relay_stream *stream)
205 {
206 int ret;
207 off_t lseek_ret, previous_stream_copy_origin;
208 uint64_t copy_bytes_left, misplaced_data_size;
209 bool acquired_reference;
210 struct stream_fd *previous_stream_fd = NULL;
211 struct lttng_trace_chunk *previous_chunk = NULL;
212
213 if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
214 ERR("Protocol error encoutered in %s(): stream rotation "
215 "sequence number is before the current sequence number "
216 "and the next trace chunk is unset. Honoring this "
217 "rotation command would result in data loss",
218 __FUNCTION__);
219 ret = -1;
220 goto end;
221 }
222
223 ASSERT_LOCKED(stream->lock);
224 /*
225 * Acquire a reference to the current trace chunk to ensure
226 * it is not reclaimed when `stream_rotate_data_file` is called.
227 * Failing to do so would violate the contract of the trace
228 * chunk API as an active file descriptor would outlive the
229 * trace chunk.
230 */
231 acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
232 assert(acquired_reference);
233 previous_chunk = stream->trace_chunk;
234
235 /*
236 * Steal the stream's reference to its stream_fd. A new
237 * stream_fd will be created when the rotation completes and
238 * the orinal stream_fd will be used to copy the "extra" data
239 * to the new file.
240 */
241 assert(stream->stream_fd);
242 previous_stream_fd = stream->stream_fd;
243 stream->stream_fd = NULL;
244
245 assert(!stream->is_metadata);
246 assert(stream->tracefile_size_current >
247 stream->pos_after_last_complete_data_index);
248 misplaced_data_size = stream->tracefile_size_current -
249 stream->pos_after_last_complete_data_index;
250 copy_bytes_left = misplaced_data_size;
251 previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
252
253 ret = stream_rotate_data_file(stream);
254 if (ret) {
255 goto end;
256 }
257
258 assert(stream->stream_fd);
259 /*
260 * Seek the current tracefile to the position at which the rotation
261 * should have occurred.
262 */
263 lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
264 SEEK_SET);
265 if (lseek_ret < 0) {
266 PERROR("Failed to seek to offset %" PRIu64
267 " while copying extra data received before a stream rotation",
268 (uint64_t) previous_stream_copy_origin);
269 ret = -1;
270 goto end;
271 }
272
273 /* Move data from the old file to the new file. */
274 while (copy_bytes_left) {
275 ssize_t io_ret;
276 char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
277 const off_t copy_size_this_pass = min_t(
278 off_t, copy_bytes_left, sizeof(copy_buffer));
279
280 io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
281 copy_size_this_pass);
282 if (io_ret < (ssize_t) copy_size_this_pass) {
283 if (io_ret == -1) {
284 PERROR("Failed to read %" PRIu64
285 " bytes from fd %i in %s(), returned %zi",
286 copy_size_this_pass,
287 previous_stream_fd->fd,
288 __FUNCTION__, io_ret);
289 } else {
290 ERR("Failed to read %" PRIu64
291 " bytes from fd %i in %s(), returned %zi",
292 copy_size_this_pass,
293 previous_stream_fd->fd,
294 __FUNCTION__, io_ret);
295 }
296 ret = -1;
297 goto end;
298 }
299
300 io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
301 copy_size_this_pass);
302 if (io_ret < (ssize_t) copy_size_this_pass) {
303 if (io_ret == -1) {
304 PERROR("Failed to write %" PRIu64
305 " bytes from fd %i in %s(), returned %zi",
306 copy_size_this_pass,
307 stream->stream_fd->fd,
308 __FUNCTION__, io_ret);
309 } else {
310 ERR("Failed to write %" PRIu64
311 " bytes from fd %i in %s(), returned %zi",
312 copy_size_this_pass,
313 stream->stream_fd->fd,
314 __FUNCTION__, io_ret);
315 }
316 ret = -1;
317 goto end;
318 }
319 copy_bytes_left -= copy_size_this_pass;
320 }
321
322 /* Truncate the file to get rid of the excess data. */
323 ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
324 if (ret) {
325 PERROR("Failed to truncate current stream file to offset %" PRIu64,
326 previous_stream_copy_origin);
327 goto end;
328 }
329
330 /*
331 * Update the offset and FD of all the eventual indexes created by the
332 * data connection before the rotation command arrived.
333 */
334 ret = relay_index_switch_all_files(stream);
335 if (ret < 0) {
336 ERR("Failed to rotate index file");
337 goto end;
338 }
339
340 stream->tracefile_size_current = misplaced_data_size;
341 /* Index and data contents are back in sync. */
342 stream->pos_after_last_complete_data_index = 0;
343 ret = 0;
344 end:
345 lttng_trace_chunk_put(previous_chunk);
346 stream_fd_put(previous_stream_fd);
347 return ret;
348 }
349
350 /*
351 * Check if a stream's data file (as opposed to index) should be rotated
352 * (for session rotation).
353 * Must be called with the stream lock held.
354 *
355 * Return 0 on success, a negative value on error.
356 */
357 static int try_rotate_stream_data(struct relay_stream *stream)
358 {
359 int ret = 0;
360
361 if (caa_likely(!stream->ongoing_rotation.is_set)) {
362 /* No rotation expected. */
363 goto end;
364 }
365
366 if (stream->ongoing_rotation.value.data_rotated) {
367 /* Rotation of the data file has already occurred. */
368 goto end;
369 }
370
371 if (stream->prev_data_seq == -1ULL ||
372 stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
373 /*
374 * The next packet that will be written is not part of the next
375 * chunk yet.
376 */
377 DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
378 ", prev_data_seq = %" PRIu64 ")",
379 stream->stream_handle,
380 stream->ongoing_rotation.value.seq_num,
381 stream->prev_data_seq);
382 goto end;
383 } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
384 /*
385 * prev_data_seq is checked here since indexes and rotation
386 * commands are serialized with respect to each other.
387 */
388 DBG("Rotation after too much data has been written in tracefile "
389 "for stream %" PRIu64 ", need to truncate before "
390 "rotating", stream->stream_handle);
391 ret = rotate_truncate_stream(stream);
392 if (ret) {
393 ERR("Failed to truncate stream");
394 goto end;
395 }
396 } else {
397 ret = stream_rotate_data_file(stream);
398 }
399
400 end:
401 return ret;
402 }
403
404 /*
405 * Close the current index file if it is open, and create a new one.
406 *
407 * Return 0 on success, -1 on error.
408 */
409 static int create_index_file(struct relay_stream *stream,
410 struct lttng_trace_chunk *chunk)
411 {
412 int ret;
413 uint32_t major, minor;
414 char *index_subpath = NULL;
415 enum lttng_trace_chunk_status status;
416
417 ASSERT_LOCKED(stream->lock);
418
419 /* Put ref on previous index_file. */
420 if (stream->index_file) {
421 lttng_index_file_put(stream->index_file);
422 stream->index_file = NULL;
423 }
424 major = stream->trace->session->major;
425 minor = stream->trace->session->minor;
426
427 if (!chunk) {
428 ret = 0;
429 goto end;
430 }
431 ret = asprintf(&index_subpath, "%s/%s", stream->path_name,
432 DEFAULT_INDEX_DIR);
433 if (ret < 0) {
434 goto end;
435 }
436
437 status = lttng_trace_chunk_create_subdirectory(chunk,
438 index_subpath);
439 free(index_subpath);
440 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
441 ret = -1;
442 goto end;
443 }
444 stream->index_file = lttng_index_file_create_from_trace_chunk(
445 chunk, stream->path_name,
446 stream->channel_name, stream->tracefile_size,
447 stream->tracefile_current_index,
448 lttng_to_index_major(major, minor),
449 lttng_to_index_minor(major, minor), true);
450 if (!stream->index_file) {
451 ret = -1;
452 goto end;
453 }
454
455 ret = 0;
456
457 end:
458 return ret;
459 }
460
461 /*
462 * Check if a stream's index file should be rotated (for session rotation).
463 * Must be called with the stream lock held.
464 *
465 * Return 0 on success, a negative value on error.
466 */
467 static int try_rotate_stream_index(struct relay_stream *stream)
468 {
469 int ret = 0;
470
471 if (!stream->ongoing_rotation.is_set) {
472 /* No rotation expected. */
473 goto end;
474 }
475
476 if (stream->ongoing_rotation.value.index_rotated) {
477 /* Rotation of the index has already occurred. */
478 goto end;
479 }
480
481 if (stream->prev_index_seq == -1ULL ||
482 stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
483 DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
484 stream->stream_handle,
485 stream->ongoing_rotation.value.seq_num,
486 stream->prev_index_seq);
487 goto end;
488 } else {
489 /* The next index belongs to the new trace chunk; rotate. */
490 assert(stream->prev_index_seq + 1 ==
491 stream->ongoing_rotation.value.seq_num);
492 DBG("Rotating stream %" PRIu64 " index file",
493 stream->stream_handle);
494 ret = create_index_file(stream,
495 stream->ongoing_rotation.value.next_trace_chunk);
496 stream->ongoing_rotation.value.index_rotated = true;
497
498 if (stream->ongoing_rotation.value.data_rotated &&
499 stream->ongoing_rotation.value.index_rotated) {
500 /* Rotation completed; reset its state. */
501 DBG("Rotation completed for stream %" PRIu64,
502 stream->stream_handle);
503 stream_complete_rotation(stream);
504 }
505 }
506
507 end:
508 return ret;
509 }
510
511 static int stream_set_trace_chunk(struct relay_stream *stream,
512 struct lttng_trace_chunk *chunk)
513 {
514 int ret = 0;
515 enum lttng_trace_chunk_status status;
516 bool acquired_reference;
517 struct stream_fd *new_stream_fd = NULL;
518
519 status = lttng_trace_chunk_create_subdirectory(chunk,
520 stream->path_name);
521 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
522 ret = -1;
523 goto end;
524 }
525
526 lttng_trace_chunk_put(stream->trace_chunk);
527 acquired_reference = lttng_trace_chunk_get(chunk);
528 assert(acquired_reference);
529 stream->trace_chunk = chunk;
530
531 if (stream->stream_fd) {
532 stream_fd_put(stream->stream_fd);
533 stream->stream_fd = NULL;
534 }
535 ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
536 false, &new_stream_fd);
537 stream->stream_fd = new_stream_fd;
538 end:
539 return ret;
540 }
541
542 /*
543 * We keep ownership of path_name and channel_name.
544 */
545 struct relay_stream *stream_create(struct ctf_trace *trace,
546 uint64_t stream_handle, char *path_name,
547 char *channel_name, uint64_t tracefile_size,
548 uint64_t tracefile_count)
549 {
550 int ret;
551 struct relay_stream *stream = NULL;
552 struct relay_session *session = trace->session;
553 bool acquired_reference = false;
554 struct lttng_trace_chunk *current_trace_chunk;
555
556 stream = zmalloc(sizeof(struct relay_stream));
557 if (stream == NULL) {
558 PERROR("relay stream zmalloc");
559 goto error_no_alloc;
560 }
561
562 stream->stream_handle = stream_handle;
563 stream->prev_data_seq = -1ULL;
564 stream->prev_index_seq = -1ULL;
565 stream->last_net_seq_num = -1ULL;
566 stream->ctf_stream_id = -1ULL;
567 stream->tracefile_size = tracefile_size;
568 stream->tracefile_count = tracefile_count;
569 stream->path_name = path_name;
570 stream->channel_name = channel_name;
571 stream->beacon_ts_end = -1ULL;
572 lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
573 pthread_mutex_init(&stream->lock, NULL);
574 urcu_ref_init(&stream->ref);
575 ctf_trace_get(trace);
576 stream->trace = trace;
577
578 pthread_mutex_lock(&trace->session->lock);
579 current_trace_chunk = trace->session->current_trace_chunk;
580 if (current_trace_chunk) {
581 acquired_reference = lttng_trace_chunk_get(current_trace_chunk);
582 }
583 pthread_mutex_unlock(&trace->session->lock);
584 if (!acquired_reference) {
585 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
586 channel_name);
587 ret = -1;
588 goto end;
589 }
590
591 stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
592 if (!stream->indexes_ht) {
593 ERR("Cannot created indexes_ht");
594 ret = -1;
595 goto end;
596 }
597
598 pthread_mutex_lock(&stream->lock);
599 ret = stream_set_trace_chunk(stream, current_trace_chunk);
600 pthread_mutex_unlock(&stream->lock);
601 if (ret) {
602 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
603 trace->session->session_name,
604 stream->channel_name);
605 ret = -1;
606 goto end;
607 }
608 stream->tfa = tracefile_array_create(stream->tracefile_count);
609 if (!stream->tfa) {
610 ret = -1;
611 goto end;
612 }
613
614 stream->is_metadata = !strcmp(stream->channel_name,
615 DEFAULT_METADATA_NAME);
616 stream->in_recv_list = true;
617
618 /*
619 * Add the stream in the recv list of the session. Once the end stream
620 * message is received, all session streams are published.
621 */
622 pthread_mutex_lock(&session->recv_list_lock);
623 cds_list_add_rcu(&stream->recv_node, &session->recv_list);
624 session->stream_count++;
625 pthread_mutex_unlock(&session->recv_list_lock);
626
627 /*
628 * Both in the ctf_trace object and the global stream ht since the data
629 * side of the relayd does not have the concept of session.
630 */
631 lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
632 stream->in_stream_ht = true;
633
634 DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
635 stream->stream_handle);
636 ret = 0;
637
638 end:
639 if (ret) {
640 if (stream->stream_fd) {
641 stream_fd_put(stream->stream_fd);
642 stream->stream_fd = NULL;
643 }
644 stream_put(stream);
645 stream = NULL;
646 }
647 lttng_trace_chunk_put(current_trace_chunk);
648 return stream;
649
650 error_no_alloc:
651 /*
652 * path_name and channel_name need to be freed explicitly here
653 * because we cannot rely on stream_put().
654 */
655 free(path_name);
656 free(channel_name);
657 return NULL;
658 }
659
660 /*
661 * Called with the session lock held.
662 */
663 void stream_publish(struct relay_stream *stream)
664 {
665 struct relay_session *session;
666
667 pthread_mutex_lock(&stream->lock);
668 if (stream->published) {
669 goto unlock;
670 }
671
672 session = stream->trace->session;
673
674 pthread_mutex_lock(&session->recv_list_lock);
675 if (stream->in_recv_list) {
676 cds_list_del_rcu(&stream->recv_node);
677 stream->in_recv_list = false;
678 }
679 pthread_mutex_unlock(&session->recv_list_lock);
680
681 pthread_mutex_lock(&stream->trace->stream_list_lock);
682 cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
683 pthread_mutex_unlock(&stream->trace->stream_list_lock);
684
685 stream->published = true;
686 unlock:
687 pthread_mutex_unlock(&stream->lock);
688 }
689
690 /*
691 * Stream must be protected by holding the stream lock or by virtue of being
692 * called from stream_destroy.
693 */
694 static void stream_unpublish(struct relay_stream *stream)
695 {
696 if (stream->in_stream_ht) {
697 struct lttng_ht_iter iter;
698 int ret;
699
700 iter.iter.node = &stream->node.node;
701 ret = lttng_ht_del(relay_streams_ht, &iter);
702 assert(!ret);
703 stream->in_stream_ht = false;
704 }
705 if (stream->published) {
706 pthread_mutex_lock(&stream->trace->stream_list_lock);
707 cds_list_del_rcu(&stream->stream_node);
708 pthread_mutex_unlock(&stream->trace->stream_list_lock);
709 stream->published = false;
710 }
711 }
712
713 static void stream_destroy(struct relay_stream *stream)
714 {
715 if (stream->indexes_ht) {
716 /*
717 * Calling lttng_ht_destroy in call_rcu worker thread so
718 * we don't hold the RCU read-side lock while calling
719 * it.
720 */
721 lttng_ht_destroy(stream->indexes_ht);
722 }
723 if (stream->tfa) {
724 tracefile_array_destroy(stream->tfa);
725 }
726 free(stream->path_name);
727 free(stream->channel_name);
728 free(stream);
729 }
730
731 static void stream_destroy_rcu(struct rcu_head *rcu_head)
732 {
733 struct relay_stream *stream =
734 caa_container_of(rcu_head, struct relay_stream, rcu_node);
735
736 stream_destroy(stream);
737 }
738
739 /*
740 * No need to take stream->lock since this is only called on the final
741 * stream_put which ensures that a single thread may act on the stream.
742 */
743 static void stream_release(struct urcu_ref *ref)
744 {
745 struct relay_stream *stream =
746 caa_container_of(ref, struct relay_stream, ref);
747 struct relay_session *session;
748
749 session = stream->trace->session;
750
751 DBG("Releasing stream id %" PRIu64, stream->stream_handle);
752
753 pthread_mutex_lock(&session->recv_list_lock);
754 session->stream_count--;
755 if (stream->in_recv_list) {
756 cds_list_del_rcu(&stream->recv_node);
757 stream->in_recv_list = false;
758 }
759 pthread_mutex_unlock(&session->recv_list_lock);
760
761 stream_unpublish(stream);
762
763 if (stream->stream_fd) {
764 stream_fd_put(stream->stream_fd);
765 stream->stream_fd = NULL;
766 }
767 if (stream->index_file) {
768 lttng_index_file_put(stream->index_file);
769 stream->index_file = NULL;
770 }
771 if (stream->trace) {
772 ctf_trace_put(stream->trace);
773 stream->trace = NULL;
774 }
775 stream_complete_rotation(stream);
776 lttng_trace_chunk_put(stream->trace_chunk);
777 stream->trace_chunk = NULL;
778
779 call_rcu(&stream->rcu_node, stream_destroy_rcu);
780 }
781
782 void stream_put(struct relay_stream *stream)
783 {
784 rcu_read_lock();
785 assert(stream->ref.refcount != 0);
786 /*
787 * Wait until we have processed all the stream packets before
788 * actually putting our last stream reference.
789 */
790 urcu_ref_put(&stream->ref, stream_release);
791 rcu_read_unlock();
792 }
793
794 int stream_set_pending_rotation(struct relay_stream *stream,
795 struct lttng_trace_chunk *next_trace_chunk,
796 uint64_t rotation_sequence_number)
797 {
798 int ret = 0;
799 const struct relay_stream_rotation rotation = {
800 .seq_num = rotation_sequence_number,
801 .next_trace_chunk = next_trace_chunk,
802 };
803
804 if (stream->ongoing_rotation.is_set) {
805 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
806 ret = -1;
807 goto end;
808 }
809
810 if (next_trace_chunk) {
811 const bool reference_acquired =
812 lttng_trace_chunk_get(next_trace_chunk);
813
814 assert(reference_acquired);
815 }
816 LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
817
818 DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
819 stream->stream_handle, rotation_sequence_number);
820 if (stream->is_metadata) {
821 /*
822 * A metadata stream has no index; consider it already rotated.
823 */
824 stream->ongoing_rotation.value.index_rotated = true;
825 ret = stream_rotate_data_file(stream);
826 } else {
827 ret = try_rotate_stream_data(stream);
828 if (ret < 0) {
829 goto end;
830 }
831
832 ret = try_rotate_stream_index(stream);
833 if (ret < 0) {
834 goto end;
835 }
836 }
837 end:
838 return ret;
839 }
840
841 void try_stream_close(struct relay_stream *stream)
842 {
843 bool session_aborted;
844 struct relay_session *session = stream->trace->session;
845
846 DBG("Trying to close stream %" PRIu64, stream->stream_handle);
847
848 pthread_mutex_lock(&session->lock);
849 session_aborted = session->aborted;
850 pthread_mutex_unlock(&session->lock);
851
852 pthread_mutex_lock(&stream->lock);
853 /*
854 * Can be called concurently by connection close and reception of last
855 * pending data.
856 */
857 if (stream->closed) {
858 pthread_mutex_unlock(&stream->lock);
859 DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
860 return;
861 }
862
863 stream->close_requested = true;
864
865 if (stream->last_net_seq_num == -1ULL) {
866 /*
867 * Handle connection close without explicit stream close
868 * command.
869 *
870 * We can be clever about indexes partially received in
871 * cases where we received the data socket part, but not
872 * the control socket part: since we're currently closing
873 * the stream on behalf of the control socket, we *know*
874 * there won't be any more control information for this
875 * socket. Therefore, we can destroy all indexes for
876 * which we have received only the file descriptor (from
877 * data socket). This takes care of consumerd crashes
878 * between sending the data and control information for
879 * a packet. Since those are sent in that order, we take
880 * care of consumerd crashes.
881 */
882 DBG("relay_index_close_partial_fd");
883 relay_index_close_partial_fd(stream);
884 /*
885 * Use the highest net_seq_num we currently have pending
886 * As end of stream indicator. Leave last_net_seq_num
887 * at -1ULL if we cannot find any index.
888 */
889 stream->last_net_seq_num = relay_index_find_last(stream);
890 DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
891 /* Fall-through into the next check. */
892 }
893
894 if (stream->last_net_seq_num != -1ULL &&
895 ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0
896 && !session_aborted) {
897 /*
898 * Don't close since we still have data pending. This
899 * handles cases where an explicit close command has
900 * been received for this stream, and cases where the
901 * connection has been closed, and we are awaiting for
902 * index information from the data socket. It is
903 * therefore expected that all the index fd information
904 * we need has already been received on the control
905 * socket. Matching index information from data socket
906 * should be Expected Soon(TM).
907 *
908 * TODO: We should implement a timer to garbage collect
909 * streams after a timeout to be resilient against a
910 * consumerd implementation that would not match this
911 * expected behavior.
912 */
913 pthread_mutex_unlock(&stream->lock);
914 DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
915 return;
916 }
917 /*
918 * We received all the indexes we can expect.
919 */
920 stream_unpublish(stream);
921 stream->closed = true;
922 /* Relay indexes are only used by the "consumer/sessiond" end. */
923 relay_index_close_all(stream);
924 pthread_mutex_unlock(&stream->lock);
925 DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
926 stream_put(stream);
927 }
928
929 int stream_init_packet(struct relay_stream *stream, size_t packet_size,
930 bool *file_rotated)
931 {
932 int ret = 0;
933
934 ASSERT_LOCKED(stream->lock);
935 if (caa_likely(stream->tracefile_size == 0)) {
936 /* No size limit set; nothing to check. */
937 goto end;
938 }
939
940 /*
941 * Check if writing the new packet would exceed the maximal file size.
942 */
943 if (caa_unlikely((stream->tracefile_size_current + packet_size) >
944 stream->tracefile_size)) {
945 const uint64_t new_file_index =
946 (stream->tracefile_current_index + 1) %
947 stream->tracefile_count;
948
949 if (new_file_index < stream->tracefile_current_index) {
950 stream->tracefile_wrapped_around = true;
951 }
952 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
953 ", current_file_size = %" PRIu64
954 ", packet_size = %zu, current_file_index = %" PRIu64
955 " new_file_index = %" PRIu64,
956 stream->stream_handle,
957 stream->tracefile_size_current, packet_size,
958 stream->tracefile_current_index, new_file_index);
959 tracefile_array_file_rotate(stream->tfa);
960 stream->tracefile_current_index = new_file_index;
961
962 if (stream->stream_fd) {
963 stream_fd_put(stream->stream_fd);
964 stream->stream_fd = NULL;
965 }
966 ret = stream_create_data_output_file_from_trace_chunk(stream,
967 stream->trace_chunk, false, &stream->stream_fd);
968 if (ret) {
969 ERR("Failed to perform trace file rotation of stream %" PRIu64,
970 stream->stream_handle);
971 goto end;
972 }
973
974 /*
975 * Reset current size because we just performed a stream
976 * rotation.
977 */
978 stream->tracefile_size_current = 0;
979 *file_rotated = true;
980 } else {
981 *file_rotated = false;
982 }
983 end:
984 return ret;
985 }
986
987 /* Note that the packet is not necessarily complete. */
988 int stream_write(struct relay_stream *stream,
989 const struct lttng_buffer_view *packet, size_t padding_len)
990 {
991 int ret = 0;
992 ssize_t write_ret;
993 size_t padding_to_write = padding_len;
994 char padding_buffer[FILE_IO_STACK_BUFFER_SIZE];
995
996 ASSERT_LOCKED(stream->lock);
997 memset(padding_buffer, 0,
998 min(sizeof(padding_buffer), padding_to_write));
999
1000 if (packet) {
1001 write_ret = lttng_write(stream->stream_fd->fd,
1002 packet->data, packet->size);
1003 if (write_ret != packet->size) {
1004 PERROR("Failed to write to stream file of %sstream %" PRIu64,
1005 stream->is_metadata ? "metadata " : "",
1006 stream->stream_handle);
1007 ret = -1;
1008 goto end;
1009 }
1010 }
1011
1012 while (padding_to_write > 0) {
1013 const size_t padding_to_write_this_pass =
1014 min(padding_to_write, sizeof(padding_buffer));
1015
1016 write_ret = lttng_write(stream->stream_fd->fd,
1017 padding_buffer, padding_to_write_this_pass);
1018 if (write_ret != padding_to_write_this_pass) {
1019 PERROR("Failed to write padding to file of %sstream %" PRIu64,
1020 stream->is_metadata ? "metadata " : "",
1021 stream->stream_handle);
1022 ret = -1;
1023 goto end;
1024 }
1025 padding_to_write -= padding_to_write_this_pass;
1026 }
1027
1028 if (stream->is_metadata) {
1029 stream->metadata_received += packet ? packet->size : 0;
1030 stream->metadata_received += padding_len;
1031 }
1032
1033 DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
1034 stream->is_metadata ? "metadata " : "",
1035 stream->stream_handle,
1036 packet ? packet->size : (size_t) 0, padding_len);
1037 end:
1038 return ret;
1039 }
1040
1041 /*
1042 * Update index after receiving a packet for a data stream.
1043 *
1044 * Called with the stream lock held.
1045 *
1046 * Return 0 on success else a negative value.
1047 */
1048 int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
1049 bool rotate_index, bool *flushed, uint64_t total_size)
1050 {
1051 int ret = 0;
1052 uint64_t data_offset;
1053 struct relay_index *index;
1054
1055 ASSERT_LOCKED(stream->lock);
1056 /* Get data offset because we are about to update the index. */
1057 data_offset = htobe64(stream->tracefile_size_current);
1058
1059 DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
1060 stream->stream_handle, net_seq_num, stream->tracefile_size_current);
1061
1062 /*
1063 * Lookup for an existing index for that stream id/sequence
1064 * number. If it exists, the control thread has already received the
1065 * data for it, thus we need to write it to disk.
1066 */
1067 index = relay_index_get_by_id_or_create(stream, net_seq_num);
1068 if (!index) {
1069 ret = -1;
1070 goto end;
1071 }
1072
1073 if (rotate_index || !stream->index_file) {
1074 ret = create_index_file(stream, stream->trace_chunk);
1075 if (ret) {
1076 ERR("Failed to create index file for stream %" PRIu64,
1077 stream->stream_handle);
1078 /* Put self-ref for this index due to error. */
1079 relay_index_put(index);
1080 index = NULL;
1081 goto end;
1082 }
1083 }
1084
1085 if (relay_index_set_file(index, stream->index_file, data_offset)) {
1086 ret = -1;
1087 /* Put self-ref for this index due to error. */
1088 relay_index_put(index);
1089 index = NULL;
1090 goto end;
1091 }
1092
1093 ret = relay_index_try_flush(index);
1094 if (ret == 0) {
1095 tracefile_array_commit_seq(stream->tfa);
1096 stream->index_received_seqcount++;
1097 *flushed = true;
1098 } else if (ret > 0) {
1099 index->total_size = total_size;
1100 /* No flush. */
1101 ret = 0;
1102 } else {
1103 /*
1104 * ret < 0
1105 *
1106 * relay_index_try_flush is responsible for the self-reference
1107 * put of the index object on error.
1108 */
1109 ERR("relay_index_try_flush error %d", ret);
1110 ret = -1;
1111 }
1112 end:
1113 return ret;
1114 }
1115
1116 int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size,
1117 uint64_t sequence_number, bool index_flushed)
1118 {
1119 int ret = 0;
1120
1121 ASSERT_LOCKED(stream->lock);
1122
1123 stream->tracefile_size_current += packet_total_size;
1124 if (index_flushed) {
1125 stream->pos_after_last_complete_data_index =
1126 stream->tracefile_size_current;
1127 stream->prev_index_seq = sequence_number;
1128 ret = try_rotate_stream_index(stream);
1129 if (ret < 0) {
1130 goto end;
1131 }
1132 }
1133
1134 stream->prev_data_seq = sequence_number;
1135 ret = try_rotate_stream_data(stream);
1136 if (ret < 0) {
1137 goto end;
1138 }
1139 end:
1140 return ret;
1141 }
1142
1143 int stream_add_index(struct relay_stream *stream,
1144 const struct lttcomm_relayd_index *index_info)
1145 {
1146 int ret = 0;
1147 struct relay_index *index;
1148
1149 ASSERT_LOCKED(stream->lock);
1150
1151 /* Live beacon handling */
1152 if (index_info->packet_size == 0) {
1153 DBG("Received live beacon for stream %" PRIu64,
1154 stream->stream_handle);
1155
1156 /*
1157 * Only flag a stream inactive when it has already
1158 * received data and no indexes are in flight.
1159 */
1160 if (stream->index_received_seqcount > 0
1161 && stream->indexes_in_flight == 0) {
1162 stream->beacon_ts_end = index_info->timestamp_end;
1163 }
1164 ret = 0;
1165 goto end;
1166 } else {
1167 stream->beacon_ts_end = -1ULL;
1168 }
1169
1170 if (stream->ctf_stream_id == -1ULL) {
1171 stream->ctf_stream_id = index_info->stream_id;
1172 }
1173
1174 index = relay_index_get_by_id_or_create(stream, index_info->net_seq_num);
1175 if (!index) {
1176 ret = -1;
1177 ERR("Failed to get or create index %" PRIu64,
1178 index_info->net_seq_num);
1179 goto end;
1180 }
1181 if (relay_index_set_control_data(index, index_info,
1182 stream->trace->session->minor)) {
1183 ERR("set_index_control_data error");
1184 relay_index_put(index);
1185 ret = -1;
1186 goto end;
1187 }
1188 ret = relay_index_try_flush(index);
1189 if (ret == 0) {
1190 tracefile_array_commit_seq(stream->tfa);
1191 stream->index_received_seqcount++;
1192 stream->pos_after_last_complete_data_index += index->total_size;
1193 stream->prev_index_seq = index_info->net_seq_num;
1194
1195 ret = try_rotate_stream_index(stream);
1196 if (ret < 0) {
1197 goto end;
1198 }
1199 } else if (ret > 0) {
1200 /* no flush. */
1201 ret = 0;
1202 } else {
1203 /*
1204 * ret < 0
1205 *
1206 * relay_index_try_flush is responsible for the self-reference
1207 * put of the index object on error.
1208 */
1209 ERR("relay_index_try_flush error %d", ret);
1210 ret = -1;
1211 }
1212 end:
1213 return ret;
1214 }
1215
1216 static void print_stream_indexes(struct relay_stream *stream)
1217 {
1218 struct lttng_ht_iter iter;
1219 struct relay_index *index;
1220
1221 rcu_read_lock();
1222 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
1223 index_n.node) {
1224 DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
1225 " stream %" PRIu64 " trace %" PRIu64
1226 " session %" PRIu64,
1227 index,
1228 index->index_n.key,
1229 stream->ref.refcount,
1230 index->stream->stream_handle,
1231 index->stream->trace->id,
1232 index->stream->trace->session->id);
1233 }
1234 rcu_read_unlock();
1235 }
1236
1237 int stream_reset_file(struct relay_stream *stream)
1238 {
1239 ASSERT_LOCKED(stream->lock);
1240
1241 if (stream->stream_fd) {
1242 stream_fd_put(stream->stream_fd);
1243 stream->stream_fd = NULL;
1244 }
1245
1246 stream->tracefile_size_current = 0;
1247 stream->prev_data_seq = 0;
1248 stream->prev_index_seq = 0;
1249 /* Note that this does not reset the tracefile array. */
1250 stream->tracefile_current_index = 0;
1251 stream->pos_after_last_complete_data_index = 0;
1252
1253 return stream_create_data_output_file_from_trace_chunk(stream,
1254 stream->trace_chunk, true, &stream->stream_fd);
1255 }
1256
1257 void print_relay_streams(void)
1258 {
1259 struct lttng_ht_iter iter;
1260 struct relay_stream *stream;
1261
1262 if (!relay_streams_ht) {
1263 return;
1264 }
1265
1266 rcu_read_lock();
1267 cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
1268 node.node) {
1269 if (!stream_get(stream)) {
1270 continue;
1271 }
1272 DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
1273 " session %" PRIu64,
1274 stream,
1275 stream->ref.refcount,
1276 stream->stream_handle,
1277 stream->trace->id,
1278 stream->trace->session->id);
1279 print_stream_indexes(stream);
1280 stream_put(stream);
1281 }
1282 rcu_read_unlock();
1283 }
This page took 0.093449 seconds and 4 git commands to generate.