Fix: relayd: Dereference after null check
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
CommitLineData
2a174661
DG
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
7591bab1 4 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
c35f9726 5 * 2019 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
2a174661
DG
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License, version 2 only, as
9 * published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * this program; if not, write to the Free Software Foundation, Inc., 51
18 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
6c1c0768 21#define _LGPL_SOURCE
2a174661 22#include <common/common.h>
7591bab1
MD
23#include <common/utils.h>
24#include <common/defaults.h>
c35f9726 25#include <common/sessiond-comm/relayd.h>
7591bab1
MD
26#include <urcu/rculist.h>
27#include <sys/stat.h>
2a174661 28
7591bab1 29#include "lttng-relayd.h"
2a174661
DG
30#include "index.h"
31#include "stream.h"
32#include "viewer-stream.h"
33
348a81dc
JG
34#include <sys/types.h>
35#include <fcntl.h>
36
c35f9726
JG
37#define FILE_IO_STACK_BUFFER_SIZE 65536
38
7591bab1
MD
39/* Should be called with RCU read-side lock held. */
40bool stream_get(struct relay_stream *stream)
41{
ce4d4083 42 return urcu_ref_get_unless_zero(&stream->ref);
7591bab1
MD
43}
44
2a174661 45/*
7591bab1
MD
46 * Get stream from stream id from the streams hash table. Return stream
47 * if found else NULL. A stream reference is taken when a stream is
48 * returned. stream_put() must be called on that stream.
2a174661 49 */
7591bab1 50struct relay_stream *stream_get_by_id(uint64_t stream_id)
2a174661
DG
51{
52 struct lttng_ht_node_u64 *node;
53 struct lttng_ht_iter iter;
54 struct relay_stream *stream = NULL;
55
7591bab1
MD
56 rcu_read_lock();
57 lttng_ht_lookup(relay_streams_ht, &stream_id, &iter);
2a174661 58 node = lttng_ht_iter_get_node_u64(&iter);
7591bab1 59 if (!node) {
2a174661
DG
60 DBG("Relay stream %" PRIu64 " not found", stream_id);
61 goto end;
62 }
63 stream = caa_container_of(node, struct relay_stream, node);
7591bab1
MD
64 if (!stream_get(stream)) {
65 stream = NULL;
66 }
2a174661 67end:
7591bab1 68 rcu_read_unlock();
2a174661
DG
69 return stream;
70}
71
c35f9726
JG
72static void stream_complete_rotation(struct relay_stream *stream)
73{
74 DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
75 lttng_trace_chunk_put(stream->trace_chunk);
76 stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
77 stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
78}
79
c35f9726
JG
80static int stream_create_data_output_file_from_trace_chunk(
81 struct relay_stream *stream,
82 struct lttng_trace_chunk *trace_chunk,
83 bool force_unlink,
84 struct stream_fd **out_stream_fd)
348a81dc
JG
85{
86 int ret, fd;
c35f9726 87 char stream_path[LTTNG_PATH_MAX];
348a81dc
JG
88 enum lttng_trace_chunk_status status;
89 const int flags = O_RDWR | O_CREAT | O_TRUNC;
90 const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
348a81dc
JG
91
92 ASSERT_LOCKED(stream->lock);
93 assert(stream->trace_chunk);
94
348a81dc 95 ret = utils_stream_file_path(stream->path_name, stream->channel_name,
c35f9726
JG
96 stream->tracefile_size, stream->tracefile_current_index,
97 NULL, stream_path, sizeof(stream_path));
348a81dc
JG
98 if (ret < 0) {
99 goto end;
100 }
101
c35f9726
JG
102 if (stream->tracefile_wrapped_around || force_unlink) {
103 /*
104 * The on-disk ring-buffer has wrapped around.
105 * Newly created stream files will replace existing files. Since
106 * live clients may be consuming existing files, the file about
107 * to be replaced is unlinked in order to not overwrite its
108 * content.
109 */
110 status = lttng_trace_chunk_unlink_file(trace_chunk,
111 stream_path);
112 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
113 PERROR("Failed to unlink stream file \"%s\" during trace file rotation",
114 stream_path);
115 /*
116 * Don't abort if the file doesn't exist, it is
117 * unexpected, but should not be a fatal error.
118 */
119 if (errno != ENOENT) {
120 ret = -1;
121 goto end;
122 }
123 }
124 }
125
348a81dc 126 status = lttng_trace_chunk_open_file(
c35f9726 127 trace_chunk, stream_path, flags, mode, &fd);
348a81dc
JG
128 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
129 ERR("Failed to open stream file \"%s\"", stream->channel_name);
130 ret = -1;
131 goto end;
132 }
133
c35f9726
JG
134 *out_stream_fd = stream_fd_create(fd);
135 if (!*out_stream_fd) {
348a81dc
JG
136 if (close(ret)) {
137 PERROR("Error closing stream file descriptor %d", ret);
138 }
139 ret = -1;
140 goto end;
141 }
142end:
143 return ret;
144}
145
c35f9726
JG
146static int stream_rotate_data_file(struct relay_stream *stream)
147{
148 int ret = 0;
149
150 DBG("Rotating stream %" PRIu64 " data file",
151 stream->stream_handle);
152
153 if (stream->stream_fd) {
154 stream_fd_put(stream->stream_fd);
155 stream->stream_fd = NULL;
156 }
157
158 stream->tracefile_wrapped_around = false;
159 stream->tracefile_current_index = 0;
160
161 if (stream->ongoing_rotation.value.next_trace_chunk) {
162 struct stream_fd *new_stream_fd = NULL;
163 enum lttng_trace_chunk_status chunk_status;
164
165 chunk_status = lttng_trace_chunk_create_subdirectory(
166 stream->ongoing_rotation.value.next_trace_chunk,
167 stream->path_name);
168 if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
169 ret = -1;
170 goto end;
171 }
172
173 /* Rotate the data file. */
174 ret = stream_create_data_output_file_from_trace_chunk(stream,
175 stream->ongoing_rotation.value.next_trace_chunk,
176 false, &new_stream_fd);
177 stream->stream_fd = new_stream_fd;
178 if (ret < 0) {
179 ERR("Failed to rotate stream data file");
180 goto end;
181 }
182 }
183 stream->tracefile_size_current = 0;
184 stream->pos_after_last_complete_data_index = 0;
185 stream->ongoing_rotation.value.data_rotated = true;
186
187 if (stream->ongoing_rotation.value.index_rotated) {
188 /* Rotation completed; reset its state. */
189 stream_complete_rotation(stream);
190 }
191end:
192 return ret;
193}
194
f2aea36d
JG
195/*
196 * If too much data has been written in a tracefile before we received the
197 * rotation command, we have to move the excess data to the new tracefile and
198 * perform the rotation. This can happen because the control and data
199 * connections are separate, the indexes as well as the commands arrive from
200 * the control connection and we have no control over the order so we could be
201 * in a situation where too much data has been received on the data connection
202 * before the rotation command on the control connection arrives.
203 */
204static int rotate_truncate_stream(struct relay_stream *stream)
205{
206 int ret;
207 off_t lseek_ret, previous_stream_copy_origin;
208 uint64_t copy_bytes_left, misplaced_data_size;
209 bool acquired_reference;
210 struct stream_fd *previous_stream_fd = NULL;
211 struct lttng_trace_chunk *previous_chunk = NULL;
212
4147107d
JG
213 if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
214 ERR("Protocol error encoutered in %s(): stream rotation "
215 "sequence number is before the current sequence number "
216 "and the next trace chunk is unset. Honoring this "
217 "rotation command would result in data loss",
218 __FUNCTION__);
219 ret = -1;
220 goto end;
221 }
222
f2aea36d
JG
223 ASSERT_LOCKED(stream->lock);
224 /*
225 * Acquire a reference to the current trace chunk to ensure
226 * it is not reclaimed when `stream_rotate_data_file` is called.
227 * Failing to do so would violate the contract of the trace
228 * chunk API as an active file descriptor would outlive the
229 * trace chunk.
230 */
231 acquired_reference = lttng_trace_chunk_get(stream->trace_chunk);
232 assert(acquired_reference);
233 previous_chunk = stream->trace_chunk;
234
235 /*
236 * Steal the stream's reference to its stream_fd. A new
237 * stream_fd will be created when the rotation completes and
238 * the orinal stream_fd will be used to copy the "extra" data
239 * to the new file.
240 */
241 assert(stream->stream_fd);
242 previous_stream_fd = stream->stream_fd;
243 stream->stream_fd = NULL;
244
245 assert(!stream->is_metadata);
246 assert(stream->tracefile_size_current >
247 stream->pos_after_last_complete_data_index);
248 misplaced_data_size = stream->tracefile_size_current -
249 stream->pos_after_last_complete_data_index;
250 copy_bytes_left = misplaced_data_size;
251 previous_stream_copy_origin = stream->pos_after_last_complete_data_index;
252
253 ret = stream_rotate_data_file(stream);
254 if (ret) {
255 goto end;
256 }
257
4147107d 258 assert(stream->stream_fd);
f2aea36d
JG
259 /*
260 * Seek the current tracefile to the position at which the rotation
261 * should have occurred.
262 */
263 lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
264 SEEK_SET);
265 if (lseek_ret < 0) {
266 PERROR("Failed to seek to offset %" PRIu64
267 " while copying extra data received before a stream rotation",
268 (uint64_t) previous_stream_copy_origin);
269 ret = -1;
270 goto end;
271 }
272
273 /* Move data from the old file to the new file. */
274 while (copy_bytes_left) {
275 ssize_t io_ret;
276 char copy_buffer[FILE_IO_STACK_BUFFER_SIZE];
277 const off_t copy_size_this_pass = min_t(
278 off_t, copy_bytes_left, sizeof(copy_buffer));
279
280 io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
281 copy_size_this_pass);
282 if (io_ret < (ssize_t) copy_size_this_pass) {
283 if (io_ret == -1) {
284 PERROR("Failed to read %" PRIu64
285 " bytes from fd %i in %s(), returned %zi",
286 copy_size_this_pass,
287 previous_stream_fd->fd,
288 __FUNCTION__, io_ret);
289 } else {
290 ERR("Failed to read %" PRIu64
291 " bytes from fd %i in %s(), returned %zi",
292 copy_size_this_pass,
293 previous_stream_fd->fd,
294 __FUNCTION__, io_ret);
295 }
296 ret = -1;
297 goto end;
298 }
299
300 io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
301 copy_size_this_pass);
302 if (io_ret < (ssize_t) copy_size_this_pass) {
303 if (io_ret == -1) {
304 PERROR("Failed to write %" PRIu64
305 " bytes from fd %i in %s(), returned %zi",
306 copy_size_this_pass,
307 stream->stream_fd->fd,
308 __FUNCTION__, io_ret);
309 } else {
310 ERR("Failed to write %" PRIu64
311 " bytes from fd %i in %s(), returned %zi",
312 copy_size_this_pass,
313 stream->stream_fd->fd,
314 __FUNCTION__, io_ret);
315 }
316 ret = -1;
317 goto end;
318 }
319 copy_bytes_left -= copy_size_this_pass;
320 }
321
322 /* Truncate the file to get rid of the excess data. */
323 ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
324 if (ret) {
325 PERROR("Failed to truncate current stream file to offset %" PRIu64,
326 previous_stream_copy_origin);
327 goto end;
328 }
329
330 /*
331 * Update the offset and FD of all the eventual indexes created by the
332 * data connection before the rotation command arrived.
333 */
334 ret = relay_index_switch_all_files(stream);
335 if (ret < 0) {
336 ERR("Failed to rotate index file");
337 goto end;
338 }
339
340 stream->tracefile_size_current = misplaced_data_size;
341 /* Index and data contents are back in sync. */
342 stream->pos_after_last_complete_data_index = 0;
343 ret = 0;
344end:
345 lttng_trace_chunk_put(previous_chunk);
346 stream_fd_put(previous_stream_fd);
347 return ret;
348}
349
c35f9726
JG
350/*
351 * Check if a stream's data file (as opposed to index) should be rotated
352 * (for session rotation).
353 * Must be called with the stream lock held.
354 *
355 * Return 0 on success, a negative value on error.
356 */
357static int try_rotate_stream_data(struct relay_stream *stream)
358{
359 int ret = 0;
360
361 if (caa_likely(!stream->ongoing_rotation.is_set)) {
362 /* No rotation expected. */
363 goto end;
364 }
365
366 if (stream->ongoing_rotation.value.data_rotated) {
367 /* Rotation of the data file has already occurred. */
368 goto end;
369 }
370
371 if (stream->prev_data_seq == -1ULL ||
372 stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
373 /*
374 * The next packet that will be written is not part of the next
375 * chunk yet.
376 */
377 DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
378 ", prev_data_seq = %" PRIu64 ")",
379 stream->stream_handle,
380 stream->ongoing_rotation.value.seq_num,
381 stream->prev_data_seq);
382 goto end;
383 } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
384 /*
385 * prev_data_seq is checked here since indexes and rotation
386 * commands are serialized with respect to each other.
387 */
388 DBG("Rotation after too much data has been written in tracefile "
389 "for stream %" PRIu64 ", need to truncate before "
390 "rotating", stream->stream_handle);
391 ret = rotate_truncate_stream(stream);
392 if (ret) {
393 ERR("Failed to truncate stream");
394 goto end;
395 }
396 } else {
397 ret = stream_rotate_data_file(stream);
398 }
399
400end:
401 return ret;
402}
403
404/*
405 * Close the current index file if it is open, and create a new one.
406 *
407 * Return 0 on success, -1 on error.
408 */
409static int create_index_file(struct relay_stream *stream,
410 struct lttng_trace_chunk *chunk)
411{
412 int ret;
413 uint32_t major, minor;
414 char *index_subpath = NULL;
3a735fa7 415 enum lttng_trace_chunk_status status;
c35f9726
JG
416
417 ASSERT_LOCKED(stream->lock);
418
419 /* Put ref on previous index_file. */
420 if (stream->index_file) {
421 lttng_index_file_put(stream->index_file);
422 stream->index_file = NULL;
423 }
424 major = stream->trace->session->major;
425 minor = stream->trace->session->minor;
426
427 if (!chunk) {
428 ret = 0;
429 goto end;
430 }
431 ret = asprintf(&index_subpath, "%s/%s", stream->path_name,
432 DEFAULT_INDEX_DIR);
433 if (ret < 0) {
434 goto end;
435 }
436
3a735fa7 437 status = lttng_trace_chunk_create_subdirectory(chunk,
c35f9726
JG
438 index_subpath);
439 free(index_subpath);
3a735fa7
MD
440 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
441 ret = -1;
c35f9726
JG
442 goto end;
443 }
444 stream->index_file = lttng_index_file_create_from_trace_chunk(
445 chunk, stream->path_name,
446 stream->channel_name, stream->tracefile_size,
447 stream->tracefile_current_index,
448 lttng_to_index_major(major, minor),
449 lttng_to_index_minor(major, minor), true);
450 if (!stream->index_file) {
451 ret = -1;
452 goto end;
453 }
454
455 ret = 0;
456
457end:
458 return ret;
459}
460
461/*
462 * Check if a stream's index file should be rotated (for session rotation).
463 * Must be called with the stream lock held.
464 *
465 * Return 0 on success, a negative value on error.
466 */
467static int try_rotate_stream_index(struct relay_stream *stream)
468{
469 int ret = 0;
470
471 if (!stream->ongoing_rotation.is_set) {
472 /* No rotation expected. */
473 goto end;
474 }
475
476 if (stream->ongoing_rotation.value.index_rotated) {
477 /* Rotation of the index has already occurred. */
478 goto end;
479 }
480
481 if (stream->prev_index_seq == -1ULL ||
482 stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
483 DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
484 stream->stream_handle,
485 stream->ongoing_rotation.value.seq_num,
486 stream->prev_index_seq);
487 goto end;
488 } else {
489 /* The next index belongs to the new trace chunk; rotate. */
490 assert(stream->prev_index_seq + 1 ==
491 stream->ongoing_rotation.value.seq_num);
492 DBG("Rotating stream %" PRIu64 " index file",
493 stream->stream_handle);
494 ret = create_index_file(stream,
495 stream->ongoing_rotation.value.next_trace_chunk);
496 stream->ongoing_rotation.value.index_rotated = true;
497
498 if (stream->ongoing_rotation.value.data_rotated &&
499 stream->ongoing_rotation.value.index_rotated) {
500 /* Rotation completed; reset its state. */
501 DBG("Rotation completed for stream %" PRIu64,
502 stream->stream_handle);
503 stream_complete_rotation(stream);
504 }
505 }
506
507end:
508 return ret;
509}
510
348a81dc
JG
511static int stream_set_trace_chunk(struct relay_stream *stream,
512 struct lttng_trace_chunk *chunk)
513{
514 int ret = 0;
515 enum lttng_trace_chunk_status status;
516 bool acquired_reference;
c35f9726 517 struct stream_fd *new_stream_fd = NULL;
348a81dc 518
348a81dc
JG
519 status = lttng_trace_chunk_create_subdirectory(chunk,
520 stream->path_name);
521 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
522 ret = -1;
523 goto end;
524 }
525
526 lttng_trace_chunk_put(stream->trace_chunk);
527 acquired_reference = lttng_trace_chunk_get(chunk);
528 assert(acquired_reference);
529 stream->trace_chunk = chunk;
c35f9726
JG
530
531 if (stream->stream_fd) {
532 stream_fd_put(stream->stream_fd);
533 stream->stream_fd = NULL;
534 }
535 ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
536 false, &new_stream_fd);
537 stream->stream_fd = new_stream_fd;
348a81dc 538end:
348a81dc
JG
539 return ret;
540}
541
2a174661 542/*
7591bab1 543 * We keep ownership of path_name and channel_name.
2a174661 544 */
7591bab1
MD
545struct relay_stream *stream_create(struct ctf_trace *trace,
546 uint64_t stream_handle, char *path_name,
547 char *channel_name, uint64_t tracefile_size,
348a81dc 548 uint64_t tracefile_count)
2a174661 549{
7591bab1
MD
550 int ret;
551 struct relay_stream *stream = NULL;
552 struct relay_session *session = trace->session;
348a81dc
JG
553 bool acquired_reference = false;
554 struct lttng_trace_chunk *current_trace_chunk;
2a174661 555
7591bab1
MD
556 stream = zmalloc(sizeof(struct relay_stream));
557 if (stream == NULL) {
558 PERROR("relay stream zmalloc");
7591bab1
MD
559 goto error_no_alloc;
560 }
2a174661 561
7591bab1 562 stream->stream_handle = stream_handle;
a8f9f353 563 stream->prev_data_seq = -1ULL;
7a45c7e6 564 stream->prev_index_seq = -1ULL;
bda7c7b9 565 stream->last_net_seq_num = -1ULL;
7591bab1
MD
566 stream->ctf_stream_id = -1ULL;
567 stream->tracefile_size = tracefile_size;
568 stream->tracefile_count = tracefile_count;
569 stream->path_name = path_name;
570 stream->channel_name = channel_name;
2f9c3030 571 stream->beacon_ts_end = -1ULL;
7591bab1
MD
572 lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
573 pthread_mutex_init(&stream->lock, NULL);
7591bab1
MD
574 urcu_ref_init(&stream->ref);
575 ctf_trace_get(trace);
576 stream->trace = trace;
2a174661 577
348a81dc
JG
578 pthread_mutex_lock(&trace->session->lock);
579 current_trace_chunk = trace->session->current_trace_chunk;
580 if (current_trace_chunk) {
581 acquired_reference = lttng_trace_chunk_get(current_trace_chunk);
582 }
583 pthread_mutex_unlock(&trace->session->lock);
584 if (!acquired_reference) {
585 ERR("Cannot create stream for channel \"%s\" as a reference to the session's current trace chunk could not be acquired",
586 channel_name);
7591bab1
MD
587 ret = -1;
588 goto end;
2a174661
DG
589 }
590
348a81dc
JG
591 stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
592 if (!stream->indexes_ht) {
593 ERR("Cannot created indexes_ht");
594 ret = -1;
7591bab1
MD
595 goto end;
596 }
2a174661 597
c35f9726 598 pthread_mutex_lock(&stream->lock);
348a81dc 599 ret = stream_set_trace_chunk(stream, current_trace_chunk);
c35f9726 600 pthread_mutex_unlock(&stream->lock);
348a81dc
JG
601 if (ret) {
602 ERR("Failed to set the current trace chunk of session \"%s\" on newly created stream of channel \"%s\"",
603 trace->session->session_name,
604 stream->channel_name);
7591bab1
MD
605 ret = -1;
606 goto end;
2a174661 607 }
a44ca2ca
MD
608 stream->tfa = tracefile_array_create(stream->tracefile_count);
609 if (!stream->tfa) {
610 ret = -1;
611 goto end;
612 }
7591bab1 613
348a81dc
JG
614 stream->is_metadata = !strcmp(stream->channel_name,
615 DEFAULT_METADATA_NAME);
7591bab1
MD
616 stream->in_recv_list = true;
617
618 /*
619 * Add the stream in the recv list of the session. Once the end stream
620 * message is received, all session streams are published.
621 */
622 pthread_mutex_lock(&session->recv_list_lock);
623 cds_list_add_rcu(&stream->recv_node, &session->recv_list);
624 session->stream_count++;
625 pthread_mutex_unlock(&session->recv_list_lock);
626
627 /*
628 * Both in the ctf_trace object and the global stream ht since the data
629 * side of the relayd does not have the concept of session.
630 */
631 lttng_ht_add_unique_u64(relay_streams_ht, &stream->node);
77f7bd85 632 stream->in_stream_ht = true;
2a174661 633
7591bab1
MD
634 DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
635 stream->stream_handle);
636 ret = 0;
637
638end:
639 if (ret) {
640 if (stream->stream_fd) {
641 stream_fd_put(stream->stream_fd);
642 stream->stream_fd = NULL;
2a174661 643 }
7591bab1
MD
644 stream_put(stream);
645 stream = NULL;
2a174661 646 }
348a81dc 647 lttng_trace_chunk_put(current_trace_chunk);
7591bab1 648 return stream;
2a174661 649
7591bab1
MD
650error_no_alloc:
651 /*
652 * path_name and channel_name need to be freed explicitly here
653 * because we cannot rely on stream_put().
654 */
655 free(path_name);
656 free(channel_name);
657 return NULL;
658}
659
660/*
661 * Called with the session lock held.
662 */
663void stream_publish(struct relay_stream *stream)
664{
665 struct relay_session *session;
666
667 pthread_mutex_lock(&stream->lock);
668 if (stream->published) {
669 goto unlock;
2a174661
DG
670 }
671
7591bab1 672 session = stream->trace->session;
2a174661 673
7591bab1
MD
674 pthread_mutex_lock(&session->recv_list_lock);
675 if (stream->in_recv_list) {
676 cds_list_del_rcu(&stream->recv_node);
677 stream->in_recv_list = false;
678 }
679 pthread_mutex_unlock(&session->recv_list_lock);
2a174661 680
7591bab1
MD
681 pthread_mutex_lock(&stream->trace->stream_list_lock);
682 cds_list_add_rcu(&stream->stream_node, &stream->trace->stream_list);
683 pthread_mutex_unlock(&stream->trace->stream_list_lock);
2a174661 684
7591bab1
MD
685 stream->published = true;
686unlock:
2a174661 687 pthread_mutex_unlock(&stream->lock);
2a174661
DG
688}
689
7591bab1 690/*
77f7bd85 691 * Stream must be protected by holding the stream lock or by virtue of being
ce4d4083 692 * called from stream_destroy.
7591bab1
MD
693 */
694static void stream_unpublish(struct relay_stream *stream)
2a174661 695{
77f7bd85
MD
696 if (stream->in_stream_ht) {
697 struct lttng_ht_iter iter;
698 int ret;
699
700 iter.iter.node = &stream->node.node;
701 ret = lttng_ht_del(relay_streams_ht, &iter);
702 assert(!ret);
703 stream->in_stream_ht = false;
704 }
705 if (stream->published) {
706 pthread_mutex_lock(&stream->trace->stream_list_lock);
707 cds_list_del_rcu(&stream->stream_node);
708 pthread_mutex_unlock(&stream->trace->stream_list_lock);
709 stream->published = false;
7591bab1 710 }
7591bab1
MD
711}
712
713static void stream_destroy(struct relay_stream *stream)
714{
715 if (stream->indexes_ht) {
49e614cb
MD
716 /*
717 * Calling lttng_ht_destroy in call_rcu worker thread so
718 * we don't hold the RCU read-side lock while calling
719 * it.
720 */
7591bab1
MD
721 lttng_ht_destroy(stream->indexes_ht);
722 }
a44ca2ca
MD
723 if (stream->tfa) {
724 tracefile_array_destroy(stream->tfa);
725 }
7591bab1
MD
726 free(stream->path_name);
727 free(stream->channel_name);
728 free(stream);
729}
730
731static void stream_destroy_rcu(struct rcu_head *rcu_head)
732{
733 struct relay_stream *stream =
734 caa_container_of(rcu_head, struct relay_stream, rcu_node);
735
736 stream_destroy(stream);
737}
738
739/*
740 * No need to take stream->lock since this is only called on the final
741 * stream_put which ensures that a single thread may act on the stream.
7591bab1
MD
742 */
743static void stream_release(struct urcu_ref *ref)
744{
745 struct relay_stream *stream =
746 caa_container_of(ref, struct relay_stream, ref);
747 struct relay_session *session;
2a174661 748
7591bab1
MD
749 session = stream->trace->session;
750
751 DBG("Releasing stream id %" PRIu64, stream->stream_handle);
752
753 pthread_mutex_lock(&session->recv_list_lock);
754 session->stream_count--;
755 if (stream->in_recv_list) {
756 cds_list_del_rcu(&stream->recv_node);
757 stream->in_recv_list = false;
758 }
759 pthread_mutex_unlock(&session->recv_list_lock);
2a174661 760
7591bab1
MD
761 stream_unpublish(stream);
762
763 if (stream->stream_fd) {
764 stream_fd_put(stream->stream_fd);
765 stream->stream_fd = NULL;
766 }
f8f3885c
MD
767 if (stream->index_file) {
768 lttng_index_file_put(stream->index_file);
769 stream->index_file = NULL;
7591bab1
MD
770 }
771 if (stream->trace) {
772 ctf_trace_put(stream->trace);
773 stream->trace = NULL;
774 }
c35f9726 775 stream_complete_rotation(stream);
348a81dc
JG
776 lttng_trace_chunk_put(stream->trace_chunk);
777 stream->trace_chunk = NULL;
7591bab1
MD
778
779 call_rcu(&stream->rcu_node, stream_destroy_rcu);
2a174661
DG
780}
781
7591bab1 782void stream_put(struct relay_stream *stream)
2a174661 783{
7591bab1 784 rcu_read_lock();
7591bab1
MD
785 assert(stream->ref.refcount != 0);
786 /*
787 * Wait until we have processed all the stream packets before
788 * actually putting our last stream reference.
789 */
7591bab1 790 urcu_ref_put(&stream->ref, stream_release);
7591bab1
MD
791 rcu_read_unlock();
792}
793
c35f9726
JG
794int stream_set_pending_rotation(struct relay_stream *stream,
795 struct lttng_trace_chunk *next_trace_chunk,
796 uint64_t rotation_sequence_number)
797{
798 int ret = 0;
799 const struct relay_stream_rotation rotation = {
800 .seq_num = rotation_sequence_number,
801 .next_trace_chunk = next_trace_chunk,
802 };
803
804 if (stream->ongoing_rotation.is_set) {
805 ERR("Attempted to set a pending rotation on a stream already being rotated (protocol error)");
806 ret = -1;
807 goto end;
808 }
809
810 if (next_trace_chunk) {
811 const bool reference_acquired =
812 lttng_trace_chunk_get(next_trace_chunk);
813
814 assert(reference_acquired);
815 }
816 LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
817
818 DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
819 stream->stream_handle, rotation_sequence_number);
820 if (stream->is_metadata) {
821 /*
822 * A metadata stream has no index; consider it already rotated.
823 */
824 stream->ongoing_rotation.value.index_rotated = true;
825 ret = stream_rotate_data_file(stream);
826 } else {
827 ret = try_rotate_stream_data(stream);
828 if (ret < 0) {
829 goto end;
830 }
831
832 ret = try_rotate_stream_index(stream);
833 if (ret < 0) {
834 goto end;
835 }
836 }
837end:
838 return ret;
839}
840
bda7c7b9 841void try_stream_close(struct relay_stream *stream)
7591bab1 842{
98ba050e
JR
843 bool session_aborted;
844 struct relay_session *session = stream->trace->session;
845
bda7c7b9 846 DBG("Trying to close stream %" PRIu64, stream->stream_handle);
98ba050e
JR
847
848 pthread_mutex_lock(&session->lock);
849 session_aborted = session->aborted;
850 pthread_mutex_unlock(&session->lock);
851
7591bab1 852 pthread_mutex_lock(&stream->lock);
bda7c7b9
JG
853 /*
854 * Can be called concurently by connection close and reception of last
855 * pending data.
856 */
857 if (stream->closed) {
858 pthread_mutex_unlock(&stream->lock);
859 DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
860 return;
861 }
862
863 stream->close_requested = true;
3d07a857
MD
864
865 if (stream->last_net_seq_num == -1ULL) {
866 /*
867 * Handle connection close without explicit stream close
868 * command.
869 *
870 * We can be clever about indexes partially received in
871 * cases where we received the data socket part, but not
872 * the control socket part: since we're currently closing
873 * the stream on behalf of the control socket, we *know*
874 * there won't be any more control information for this
875 * socket. Therefore, we can destroy all indexes for
876 * which we have received only the file descriptor (from
877 * data socket). This takes care of consumerd crashes
878 * between sending the data and control information for
879 * a packet. Since those are sent in that order, we take
880 * care of consumerd crashes.
881 */
5312a3ed 882 DBG("relay_index_close_partial_fd");
3d07a857
MD
883 relay_index_close_partial_fd(stream);
884 /*
885 * Use the highest net_seq_num we currently have pending
886 * As end of stream indicator. Leave last_net_seq_num
887 * at -1ULL if we cannot find any index.
888 */
889 stream->last_net_seq_num = relay_index_find_last(stream);
5312a3ed 890 DBG("Updating stream->last_net_seq_num to %" PRIu64, stream->last_net_seq_num);
3d07a857
MD
891 /* Fall-through into the next check. */
892 }
893
bda7c7b9 894 if (stream->last_net_seq_num != -1ULL &&
a8f9f353 895 ((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) < 0
98ba050e 896 && !session_aborted) {
3d07a857
MD
897 /*
898 * Don't close since we still have data pending. This
899 * handles cases where an explicit close command has
900 * been received for this stream, and cases where the
901 * connection has been closed, and we are awaiting for
902 * index information from the data socket. It is
903 * therefore expected that all the index fd information
904 * we need has already been received on the control
905 * socket. Matching index information from data socket
906 * should be Expected Soon(TM).
907 *
908 * TODO: We should implement a timer to garbage collect
909 * streams after a timeout to be resilient against a
910 * consumerd implementation that would not match this
911 * expected behavior.
912 */
bda7c7b9
JG
913 pthread_mutex_unlock(&stream->lock);
914 DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
915 return;
916 }
3d07a857
MD
917 /*
918 * We received all the indexes we can expect.
919 */
77f7bd85 920 stream_unpublish(stream);
2229a09c 921 stream->closed = true;
bda7c7b9 922 /* Relay indexes are only used by the "consumer/sessiond" end. */
7591bab1
MD
923 relay_index_close_all(stream);
924 pthread_mutex_unlock(&stream->lock);
bda7c7b9 925 DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
7591bab1
MD
926 stream_put(stream);
927}
928
c35f9726
JG
929int stream_init_packet(struct relay_stream *stream, size_t packet_size,
930 bool *file_rotated)
931{
932 int ret = 0;
933
934 ASSERT_LOCKED(stream->lock);
935 if (caa_likely(stream->tracefile_size == 0)) {
936 /* No size limit set; nothing to check. */
937 goto end;
938 }
939
940 /*
941 * Check if writing the new packet would exceed the maximal file size.
942 */
943 if (caa_unlikely((stream->tracefile_size_current + packet_size) >
944 stream->tracefile_size)) {
945 const uint64_t new_file_index =
946 (stream->tracefile_current_index + 1) %
947 stream->tracefile_count;
948
949 if (new_file_index < stream->tracefile_current_index) {
950 stream->tracefile_wrapped_around = true;
951 }
952 DBG("New stream packet causes stream file rotation: stream_id = %" PRIu64
953 ", current_file_size = %" PRIu64
76ee9245 954 ", packet_size = %zu, current_file_index = %" PRIu64
c35f9726
JG
955 " new_file_index = %" PRIu64,
956 stream->stream_handle,
957 stream->tracefile_size_current, packet_size,
958 stream->tracefile_current_index, new_file_index);
959 tracefile_array_file_rotate(stream->tfa);
960 stream->tracefile_current_index = new_file_index;
961
962 if (stream->stream_fd) {
963 stream_fd_put(stream->stream_fd);
964 stream->stream_fd = NULL;
965 }
966 ret = stream_create_data_output_file_from_trace_chunk(stream,
967 stream->trace_chunk, false, &stream->stream_fd);
968 if (ret) {
969 ERR("Failed to perform trace file rotation of stream %" PRIu64,
970 stream->stream_handle);
971 goto end;
972 }
973
974 /*
975 * Reset current size because we just performed a stream
976 * rotation.
977 */
978 stream->tracefile_size_current = 0;
979 *file_rotated = true;
980 } else {
981 *file_rotated = false;
982 }
983end:
984 return ret;
985}
986
987/* Note that the packet is not necessarily complete. */
988int stream_write(struct relay_stream *stream,
989 const struct lttng_buffer_view *packet, size_t padding_len)
990{
991 int ret = 0;
992 ssize_t write_ret;
993 size_t padding_to_write = padding_len;
994 char padding_buffer[FILE_IO_STACK_BUFFER_SIZE];
995
996 ASSERT_LOCKED(stream->lock);
997 memset(padding_buffer, 0,
998 min(sizeof(padding_buffer), padding_to_write));
999
1000 if (packet) {
1001 write_ret = lttng_write(stream->stream_fd->fd,
1002 packet->data, packet->size);
1003 if (write_ret != packet->size) {
1004 PERROR("Failed to write to stream file of %sstream %" PRIu64,
1005 stream->is_metadata ? "metadata " : "",
1006 stream->stream_handle);
1007 ret = -1;
1008 goto end;
1009 }
1010 }
1011
1012 while (padding_to_write > 0) {
1013 const size_t padding_to_write_this_pass =
1014 min(padding_to_write, sizeof(padding_buffer));
1015
1016 write_ret = lttng_write(stream->stream_fd->fd,
1017 padding_buffer, padding_to_write_this_pass);
1018 if (write_ret != padding_to_write_this_pass) {
1019 PERROR("Failed to write padding to file of %sstream %" PRIu64,
1020 stream->is_metadata ? "metadata " : "",
1021 stream->stream_handle);
1022 ret = -1;
1023 goto end;
1024 }
1025 padding_to_write -= padding_to_write_this_pass;
1026 }
1027
1028 if (stream->is_metadata) {
014e956d
JG
1029 stream->metadata_received += packet ? packet->size : 0;
1030 stream->metadata_received += padding_len;
c35f9726
JG
1031 }
1032
8c865d87 1033 DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
c35f9726
JG
1034 stream->is_metadata ? "metadata " : "",
1035 stream->stream_handle,
8c865d87 1036 packet ? packet->size : (size_t) 0, padding_len);
c35f9726
JG
1037end:
1038 return ret;
1039}
1040
1041/*
1042 * Update index after receiving a packet for a data stream.
1043 *
1044 * Called with the stream lock held.
1045 *
1046 * Return 0 on success else a negative value.
1047 */
1048int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
1049 bool rotate_index, bool *flushed, uint64_t total_size)
1050{
1051 int ret = 0;
1052 uint64_t data_offset;
1053 struct relay_index *index;
1054
92761772 1055 assert(stream->trace_chunk);
c35f9726
JG
1056 ASSERT_LOCKED(stream->lock);
1057 /* Get data offset because we are about to update the index. */
1058 data_offset = htobe64(stream->tracefile_size_current);
1059
1060 DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
1061 stream->stream_handle, net_seq_num, stream->tracefile_size_current);
1062
1063 /*
1064 * Lookup for an existing index for that stream id/sequence
1065 * number. If it exists, the control thread has already received the
1066 * data for it, thus we need to write it to disk.
1067 */
1068 index = relay_index_get_by_id_or_create(stream, net_seq_num);
1069 if (!index) {
1070 ret = -1;
1071 goto end;
1072 }
1073
1074 if (rotate_index || !stream->index_file) {
1075 ret = create_index_file(stream, stream->trace_chunk);
1076 if (ret) {
1077 ERR("Failed to create index file for stream %" PRIu64,
1078 stream->stream_handle);
1079 /* Put self-ref for this index due to error. */
1080 relay_index_put(index);
1081 index = NULL;
1082 goto end;
1083 }
1084 }
1085
1086 if (relay_index_set_file(index, stream->index_file, data_offset)) {
1087 ret = -1;
1088 /* Put self-ref for this index due to error. */
1089 relay_index_put(index);
1090 index = NULL;
1091 goto end;
1092 }
1093
1094 ret = relay_index_try_flush(index);
1095 if (ret == 0) {
1096 tracefile_array_commit_seq(stream->tfa);
1097 stream->index_received_seqcount++;
1098 *flushed = true;
1099 } else if (ret > 0) {
1100 index->total_size = total_size;
1101 /* No flush. */
1102 ret = 0;
1103 } else {
1104 /*
1105 * ret < 0
1106 *
1107 * relay_index_try_flush is responsible for the self-reference
1108 * put of the index object on error.
1109 */
1110 ERR("relay_index_try_flush error %d", ret);
1111 ret = -1;
1112 }
1113end:
1114 return ret;
1115}
1116
1117int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size,
1118 uint64_t sequence_number, bool index_flushed)
1119{
1120 int ret = 0;
1121
1122 ASSERT_LOCKED(stream->lock);
1123
1124 stream->tracefile_size_current += packet_total_size;
1125 if (index_flushed) {
1126 stream->pos_after_last_complete_data_index =
1127 stream->tracefile_size_current;
1128 stream->prev_index_seq = sequence_number;
1129 ret = try_rotate_stream_index(stream);
1130 if (ret < 0) {
1131 goto end;
1132 }
1133 }
1134
1135 stream->prev_data_seq = sequence_number;
1136 ret = try_rotate_stream_data(stream);
d2ec9b88 1137
c35f9726
JG
1138end:
1139 return ret;
1140}
1141
1142int stream_add_index(struct relay_stream *stream,
1143 const struct lttcomm_relayd_index *index_info)
1144{
1145 int ret = 0;
1146 struct relay_index *index;
1147
1148 ASSERT_LOCKED(stream->lock);
1149
1150 /* Live beacon handling */
1151 if (index_info->packet_size == 0) {
1152 DBG("Received live beacon for stream %" PRIu64,
1153 stream->stream_handle);
1154
1155 /*
1156 * Only flag a stream inactive when it has already
1157 * received data and no indexes are in flight.
1158 */
1159 if (stream->index_received_seqcount > 0
1160 && stream->indexes_in_flight == 0) {
1161 stream->beacon_ts_end = index_info->timestamp_end;
1162 }
1163 ret = 0;
1164 goto end;
1165 } else {
1166 stream->beacon_ts_end = -1ULL;
1167 }
1168
1169 if (stream->ctf_stream_id == -1ULL) {
1170 stream->ctf_stream_id = index_info->stream_id;
1171 }
1172
1173 index = relay_index_get_by_id_or_create(stream, index_info->net_seq_num);
1174 if (!index) {
1175 ret = -1;
1176 ERR("Failed to get or create index %" PRIu64,
1177 index_info->net_seq_num);
1178 goto end;
1179 }
1180 if (relay_index_set_control_data(index, index_info,
1181 stream->trace->session->minor)) {
1182 ERR("set_index_control_data error");
1183 relay_index_put(index);
1184 ret = -1;
1185 goto end;
1186 }
1187 ret = relay_index_try_flush(index);
1188 if (ret == 0) {
1189 tracefile_array_commit_seq(stream->tfa);
1190 stream->index_received_seqcount++;
1191 stream->pos_after_last_complete_data_index += index->total_size;
1192 stream->prev_index_seq = index_info->net_seq_num;
1193
1194 ret = try_rotate_stream_index(stream);
1195 if (ret < 0) {
1196 goto end;
1197 }
1198 } else if (ret > 0) {
1199 /* no flush. */
1200 ret = 0;
1201 } else {
1202 /*
1203 * ret < 0
1204 *
1205 * relay_index_try_flush is responsible for the self-reference
1206 * put of the index object on error.
1207 */
1208 ERR("relay_index_try_flush error %d", ret);
1209 ret = -1;
1210 }
1211end:
1212 return ret;
1213}
1214
da412cde
MD
1215static void print_stream_indexes(struct relay_stream *stream)
1216{
1217 struct lttng_ht_iter iter;
1218 struct relay_index *index;
1219
1220 rcu_read_lock();
1221 cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter, index,
1222 index_n.node) {
1223 DBG("index %p net_seq_num %" PRIu64 " refcount %ld"
1224 " stream %" PRIu64 " trace %" PRIu64
1225 " session %" PRIu64,
1226 index,
1227 index->index_n.key,
1228 stream->ref.refcount,
1229 index->stream->stream_handle,
1230 index->stream->trace->id,
1231 index->stream->trace->session->id);
1232 }
1233 rcu_read_unlock();
1234}
1235
c35f9726
JG
1236int stream_reset_file(struct relay_stream *stream)
1237{
1238 ASSERT_LOCKED(stream->lock);
1239
1240 if (stream->stream_fd) {
1241 stream_fd_put(stream->stream_fd);
1242 stream->stream_fd = NULL;
1243 }
1244
1245 stream->tracefile_size_current = 0;
1246 stream->prev_data_seq = 0;
1247 stream->prev_index_seq = 0;
1248 /* Note that this does not reset the tracefile array. */
1249 stream->tracefile_current_index = 0;
1250 stream->pos_after_last_complete_data_index = 0;
1251
1252 return stream_create_data_output_file_from_trace_chunk(stream,
1253 stream->trace_chunk, true, &stream->stream_fd);
1254}
1255
7591bab1
MD
1256void print_relay_streams(void)
1257{
1258 struct lttng_ht_iter iter;
1259 struct relay_stream *stream;
1260
ce3f3ba3
JG
1261 if (!relay_streams_ht) {
1262 return;
1263 }
1264
7591bab1
MD
1265 rcu_read_lock();
1266 cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
1267 node.node) {
1268 if (!stream_get(stream)) {
1269 continue;
1270 }
1271 DBG("stream %p refcount %ld stream %" PRIu64 " trace %" PRIu64
1272 " session %" PRIu64,
1273 stream,
1274 stream->ref.refcount,
1275 stream->stream_handle,
1276 stream->trace->id,
1277 stream->trace->session->id);
da412cde 1278 print_stream_indexes(stream);
7591bab1
MD
1279 stream_put(stream);
1280 }
1281 rcu_read_unlock();
2a174661 1282}
This page took 0.092094 seconds and 4 git commands to generate.