Fix: define _LGPL_SOURCE in C files
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
CommitLineData
2a174661
DG
1/*
2 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
6c1c0768 20#define _LGPL_SOURCE
2a174661
DG
21#include <common/common.h>
22
23#include "index.h"
24#include "stream.h"
25#include "viewer-stream.h"
26
27static void rcu_destroy_stream(struct rcu_head *head)
28{
29 struct relay_stream *stream =
30 caa_container_of(head, struct relay_stream, rcu_node);
31
32 free(stream->path_name);
33 free(stream->channel_name);
34 free(stream);
35}
36
37/*
38 * Get stream from stream id from the given hash table. Return stream if found
39 * else NULL.
40 *
41 * Need to be called with RCU read-side lock held.
42 */
43struct relay_stream *stream_find_by_id(struct lttng_ht *ht,
44 uint64_t stream_id)
45{
46 struct lttng_ht_node_u64 *node;
47 struct lttng_ht_iter iter;
48 struct relay_stream *stream = NULL;
49
50 assert(ht);
51
52 lttng_ht_lookup(ht, &stream_id, &iter);
53 node = lttng_ht_iter_get_node_u64(&iter);
54 if (node == NULL) {
55 DBG("Relay stream %" PRIu64 " not found", stream_id);
56 goto end;
57 }
58 stream = caa_container_of(node, struct relay_stream, node);
59
60end:
61 return stream;
62}
63
64/*
65 * Close a given stream. If an assosiated viewer stream exists it is updated.
66 *
67 * RCU read side lock MUST be acquired.
68 *
69 * Return 0 if close was successful or 1 if already closed.
70 */
71int stream_close(struct relay_session *session, struct relay_stream *stream)
72{
73 int delret, ret;
74 struct relay_viewer_stream *vstream;
75 struct ctf_trace *ctf_trace;
76
77 assert(stream);
78
79 pthread_mutex_lock(&stream->lock);
80
81 if (stream->terminated_flag) {
82 /* This stream is already closed. Ignore. */
83 ret = 1;
84 goto end_unlock;
85 }
86
87 DBG("Closing stream id %" PRIu64, stream->stream_handle);
88
89 if (stream->fd >= 0) {
90 delret = close(stream->fd);
91 if (delret < 0) {
92 PERROR("close stream");
93 }
94 }
95
96 if (stream->index_fd >= 0) {
97 delret = close(stream->index_fd);
98 if (delret < 0) {
99 PERROR("close stream index_fd");
100 }
101 }
102
103 vstream = viewer_stream_find_by_id(stream->stream_handle);
104 if (vstream) {
105 /*
106 * Set the last good value into the viewer stream. This is done
107 * right before the stream gets deleted from the hash table. The
108 * lookup failure on the live thread side of a stream indicates
109 * that the viewer stream index received value should be used.
110 */
111 pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
112 vstream->total_index_received = stream->total_index_received;
113 vstream->tracefile_count_last = stream->tracefile_count_current;
114 vstream->close_write_flag = 1;
115 pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
116 }
117
118 /* Cleanup index of that stream. */
119 relay_index_destroy_by_stream_id(stream->stream_handle);
120
121 ctf_trace = ctf_trace_find_by_path(session->ctf_traces_ht,
122 stream->path_name);
123 assert(ctf_trace);
124 ctf_trace_put_ref(ctf_trace);
125
8bf28e67 126 stream->close_flag = 1;
2a174661
DG
127 stream->terminated_flag = 1;
128 ret = 0;
129
130end_unlock:
131 pthread_mutex_unlock(&stream->lock);
132 return ret;
133}
134
135void stream_delete(struct lttng_ht *ht, struct relay_stream *stream)
136{
137 int ret;
138 struct lttng_ht_iter iter;
139
140 assert(ht);
141 assert(stream);
142
143 iter.iter.node = &stream->node.node;
144 ret = lttng_ht_del(ht, &iter);
145 assert(!ret);
146
147 cds_list_del(&stream->trace_list);
148}
149
150void stream_destroy(struct relay_stream *stream)
151{
152 assert(stream);
153
154 call_rcu(&stream->rcu_node, rcu_destroy_stream);
155}
This page took 0.029443 seconds and 4 git commands to generate.