Fix: consumer should await for initial streams
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <sys/stat.h>
23 #include <unistd.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27
28 #include "consumer.h"
29 #include "kernel-consumer.h"
30
31 /*
32 * Sending a single channel to the consumer with command ADD_CHANNEL.
33 */
34 int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
35 {
36 int ret;
37 struct lttcomm_consumer_msg lkm;
38
39 /* Safety net */
40 assert(channel);
41
42 DBG("Kernel consumer adding channel %s to kernel consumer",
43 channel->channel->name);
44
45 /* Prep channel message structure */
46 consumer_init_channel_comm_msg(&lkm,
47 LTTNG_CONSUMER_ADD_CHANNEL,
48 channel->fd,
49 channel->channel->attr.subbuf_size,
50 0, /* Kernel */
51 channel->channel->name,
52 channel->stream_count);
53
54 ret = consumer_send_channel(sock, &lkm);
55 if (ret < 0) {
56 goto error;
57 }
58
59 error:
60 return ret;
61 }
62
63 /*
64 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
65 */
66 int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
67 {
68 int ret;
69 char tmp_path[PATH_MAX];
70 const char *pathname;
71 struct lttcomm_consumer_msg lkm;
72 struct consumer_output *consumer;
73
74 /* Safety net */
75 assert(session);
76 assert(session->consumer);
77
78 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
79
80 /* Get consumer output pointer */
81 consumer = session->consumer;
82
83 /* Get the right path name destination */
84 if (consumer->type == CONSUMER_DST_LOCAL) {
85 /* Set application path to the destination path */
86 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
87 consumer->dst.trace_path, consumer->subdir);
88 if (ret < 0) {
89 PERROR("snprintf metadata path");
90 goto error;
91 }
92 pathname = tmp_path;
93
94 /* Create directory */
95 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
96 session->uid, session->gid);
97 if (ret < 0) {
98 if (ret != -EEXIST) {
99 ERR("Trace directory creation error");
100 goto error;
101 }
102 }
103 DBG3("Kernel local consumer tracefile path: %s", pathname);
104 } else {
105 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
106 if (ret < 0) {
107 PERROR("snprintf metadata path");
108 goto error;
109 }
110 pathname = tmp_path;
111 DBG3("Kernel network consumer subdir path: %s", pathname);
112 }
113
114 /* Prep channel message structure */
115 consumer_init_channel_comm_msg(&lkm,
116 LTTNG_CONSUMER_ADD_CHANNEL,
117 session->metadata->fd,
118 session->metadata->conf->attr.subbuf_size,
119 0, /* for kernel */
120 "metadata",
121 1);
122
123 ret = consumer_send_channel(sock, &lkm);
124 if (ret < 0) {
125 goto error;
126 }
127
128 /* Prep stream message structure */
129 consumer_init_stream_comm_msg(&lkm,
130 LTTNG_CONSUMER_ADD_STREAM,
131 session->metadata->fd,
132 session->metadata_stream_fd,
133 LTTNG_CONSUMER_ACTIVE_STREAM,
134 DEFAULT_KERNEL_CHANNEL_OUTPUT,
135 0, /* Kernel */
136 session->uid,
137 session->gid,
138 consumer->net_seq_index,
139 1, /* Metadata flag set */
140 "metadata",
141 pathname);
142
143 /* Send stream and file descriptor */
144 ret = consumer_send_stream(sock, consumer, &lkm,
145 &session->metadata_stream_fd, 1);
146 if (ret < 0) {
147 goto error;
148 }
149
150 error:
151 return ret;
152 }
153
154 /*
155 * Sending a single stream to the consumer with command ADD_STREAM.
156 */
157 int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
158 struct ltt_kernel_stream *stream, struct ltt_kernel_session *session)
159 {
160 int ret;
161 char tmp_path[PATH_MAX];
162 const char *pathname;
163 struct lttcomm_consumer_msg lkm;
164 struct consumer_output *consumer;
165
166 assert(channel);
167 assert(stream);
168 assert(session);
169 assert(session->consumer);
170
171 DBG("Sending stream %d of channel %s to kernel consumer",
172 stream->fd, channel->channel->name);
173
174 /* Get consumer output pointer */
175 consumer = session->consumer;
176
177 /* Get the right path name destination */
178 if (consumer->type == CONSUMER_DST_LOCAL) {
179 /* Set application path to the destination path */
180 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
181 consumer->dst.trace_path, consumer->subdir);
182 if (ret < 0) {
183 PERROR("snprintf stream path");
184 goto error;
185 }
186 pathname = tmp_path;
187 DBG3("Kernel local consumer tracefile path: %s", pathname);
188 } else {
189 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
190 if (ret < 0) {
191 PERROR("snprintf stream path");
192 goto error;
193 }
194 pathname = tmp_path;
195 DBG3("Kernel network consumer subdir path: %s", pathname);
196 }
197
198 /* Prep stream consumer message */
199 consumer_init_stream_comm_msg(&lkm, LTTNG_CONSUMER_ADD_STREAM,
200 channel->fd,
201 stream->fd,
202 stream->state,
203 channel->channel->attr.output,
204 0, /* Kernel */
205 session->uid,
206 session->gid,
207 consumer->net_seq_index,
208 0, /* Metadata flag unset */
209 stream->name,
210 pathname);
211
212 /* Send stream and file descriptor */
213 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
214 if (ret < 0) {
215 goto error;
216 }
217
218 error:
219 return ret;
220 }
221
222 /*
223 * Send all stream fds of kernel channel to the consumer.
224 */
225 int kernel_consumer_send_channel_stream(int sock,
226 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
227 {
228 int ret;
229 struct ltt_kernel_stream *stream;
230
231 /* Safety net */
232 assert(channel);
233 assert(session);
234 assert(session->consumer);
235
236 /* Bail out if consumer is disabled */
237 if (!session->consumer->enabled) {
238 ret = LTTNG_OK;
239 goto error;
240 }
241
242 DBG("Sending streams of channel %s to kernel consumer",
243 channel->channel->name);
244
245 ret = kernel_consumer_add_channel(sock, channel);
246 if (ret < 0) {
247 goto error;
248 }
249
250 /* Send streams */
251 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
252 if (!stream->fd) {
253 continue;
254 }
255
256 /* Add stream on the kernel consumer side. */
257 ret = kernel_consumer_add_stream(sock, channel, stream, session);
258 if (ret < 0) {
259 goto error;
260 }
261 }
262
263 error:
264 return ret;
265 }
266
267 /*
268 * Send all stream fds of the kernel session to the consumer.
269 */
270 int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
271 {
272 int ret;
273 struct ltt_kernel_channel *chan;
274
275 /* Safety net */
276 assert(session);
277 assert(session->consumer);
278
279 /* Bail out if consumer is disabled */
280 if (!session->consumer->enabled) {
281 ret = LTTNG_OK;
282 goto error;
283 }
284
285 DBG("Sending session stream to kernel consumer");
286
287 if (session->metadata_stream_fd >= 0) {
288 ret = kernel_consumer_add_metadata(sock, session);
289 if (ret < 0) {
290 goto error;
291 }
292
293 /* Flag that at least the metadata has been sent to the consumer. */
294 session->consumer_fds_sent = 1;
295 }
296
297 /* Send channel and streams of it */
298 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
299 ret = kernel_consumer_send_channel_stream(sock, chan, session);
300 if (ret < 0) {
301 goto error;
302 }
303 }
304
305 DBG("Kernel consumer FDs of metadata and channel streams sent");
306
307 return 0;
308
309 error:
310 return ret;
311 }
This page took 0.036426 seconds and 5 git commands to generate.