Network streaming support
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <sys/stat.h>
23 #include <unistd.h>
24
25 #include <common/common.h>
26 #include <common/defaults.h>
27
28 #include "consumer.h"
29 #include "kernel-consumer.h"
30
31 /*
32 * Sending a single channel to the consumer with command ADD_CHANNEL.
33 */
34 int kernel_consumer_add_channel(int sock, struct ltt_kernel_channel *channel)
35 {
36 int ret;
37 struct lttcomm_consumer_msg lkm;
38
39 /* Safety net */
40 assert(channel);
41
42 DBG("Kernel consumer adding channel %s to kernel consumer",
43 channel->channel->name);
44
45 /* Prep channel message structure */
46 consumer_init_channel_comm_msg(&lkm,
47 LTTNG_CONSUMER_ADD_CHANNEL,
48 channel->fd,
49 channel->channel->attr.subbuf_size,
50 0, /* Kernel */
51 channel->channel->name);
52
53 ret = consumer_send_channel(sock, &lkm);
54 if (ret < 0) {
55 goto error;
56 }
57
58 error:
59 return ret;
60 }
61
62 /*
63 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
64 */
65 int kernel_consumer_add_metadata(int sock, struct ltt_kernel_session *session)
66 {
67 int ret;
68 const char *pathname;
69 struct lttcomm_consumer_msg lkm;
70 struct consumer_output *output;
71
72 /* Safety net */
73 assert(session);
74 assert(session->consumer);
75
76 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
77
78 /* Get consumer output pointer */
79 output = session->consumer;
80
81 /* Get correct path name destination */
82 if (output->type == CONSUMER_DST_LOCAL) {
83 pathname = output->dst.trace_path;
84 } else {
85 pathname = output->subdir;
86 }
87
88 /* Prep channel message structure */
89 consumer_init_channel_comm_msg(&lkm,
90 LTTNG_CONSUMER_ADD_CHANNEL,
91 session->metadata->fd,
92 session->metadata->conf->attr.subbuf_size,
93 0, /* for kernel */
94 "metadata");
95
96 ret = consumer_send_channel(sock, &lkm);
97 if (ret < 0) {
98 goto error;
99 }
100
101 /* Prep stream message structure */
102 consumer_init_stream_comm_msg(&lkm,
103 LTTNG_CONSUMER_ADD_STREAM,
104 session->metadata->fd,
105 session->metadata_stream_fd,
106 LTTNG_CONSUMER_ACTIVE_STREAM,
107 DEFAULT_KERNEL_CHANNEL_OUTPUT,
108 0, /* Kernel */
109 session->uid,
110 session->gid,
111 output->net_seq_index,
112 1, /* Metadata flag set */
113 "metadata",
114 pathname);
115
116 /* Send stream and file descriptor */
117 ret = consumer_send_stream(sock, output, &lkm,
118 &session->metadata_stream_fd, 1);
119 if (ret < 0) {
120 goto error;
121 }
122
123 error:
124 return ret;
125 }
126
127 /*
128 * Sending a single stream to the consumer with command ADD_STREAM.
129 */
130 int kernel_consumer_add_stream(int sock, struct ltt_kernel_channel *channel,
131 struct ltt_kernel_stream *stream, struct ltt_kernel_session *session)
132 {
133 int ret;
134 const char *pathname;
135 struct lttcomm_consumer_msg lkm;
136 struct consumer_output *output;
137
138 assert(channel);
139 assert(stream);
140 assert(session);
141 assert(session->consumer);
142
143 DBG("Sending stream %d of channel %s to kernel consumer",
144 stream->fd, channel->channel->name);
145
146 /* Get consumer output pointer */
147 output = session->consumer;
148
149 /* Get correct path name destination */
150 if (output->type == CONSUMER_DST_LOCAL) {
151 pathname = output->dst.trace_path;
152 DBG3("Consumer is local to %s", pathname);
153 } else {
154 pathname = output->subdir;
155 DBG3("Consumer is network to subdir %s", pathname);
156 }
157
158 /* Prep stream consumer message */
159 consumer_init_stream_comm_msg(&lkm, LTTNG_CONSUMER_ADD_STREAM,
160 channel->fd,
161 stream->fd,
162 stream->state,
163 channel->channel->attr.output,
164 0, /* Kernel */
165 session->uid,
166 session->gid,
167 output->net_seq_index,
168 0, /* Metadata flag unset */
169 stream->name,
170 pathname);
171
172 /* Send stream and file descriptor */
173 ret = consumer_send_stream(sock, output, &lkm, &stream->fd, 1);
174 if (ret < 0) {
175 goto error;
176 }
177
178 error:
179 return ret;
180 }
181
182 /*
183 * Send all stream fds of kernel channel to the consumer.
184 */
185 int kernel_consumer_send_channel_stream(int sock,
186 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
187 {
188 int ret;
189 struct ltt_kernel_stream *stream;
190
191 /* Safety net */
192 assert(channel);
193 assert(session);
194 assert(session->consumer);
195
196 /* Bail out if consumer is disabled */
197 if (!session->consumer->enabled) {
198 ret = LTTCOMM_OK;
199 goto error;
200 }
201
202 DBG("Sending streams of channel %s to kernel consumer",
203 channel->channel->name);
204
205 ret = kernel_consumer_add_channel(sock, channel);
206 if (ret < 0) {
207 goto error;
208 }
209
210 /* Send streams */
211 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
212 if (!stream->fd) {
213 continue;
214 }
215
216 /* Add stream on the kernel consumer side. */
217 ret = kernel_consumer_add_stream(sock, channel, stream, session);
218 if (ret < 0) {
219 goto error;
220 }
221 }
222
223 error:
224 return ret;
225 }
226
227 /*
228 * Send all stream fds of the kernel session to the consumer.
229 */
230 int kernel_consumer_send_session(int sock, struct ltt_kernel_session *session)
231 {
232 int ret;
233 struct ltt_kernel_channel *chan;
234
235 /* Safety net */
236 assert(session);
237 assert(session->consumer);
238
239 /* Bail out if consumer is disabled */
240 if (!session->consumer->enabled) {
241 ret = LTTCOMM_OK;
242 goto error;
243 }
244
245 DBG("Sending session stream to kernel consumer");
246
247 if (session->metadata_stream_fd >= 0) {
248 ret = kernel_consumer_add_metadata(sock, session);
249 if (ret < 0) {
250 goto error;
251 }
252
253 /* Flag that at least the metadata has been sent to the consumer. */
254 session->consumer_fds_sent = 1;
255 }
256
257 /* Send channel and streams of it */
258 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
259 ret = kernel_consumer_send_channel_stream(sock, chan, session);
260 if (ret < 0) {
261 goto error;
262 }
263 }
264
265 DBG("Kernel consumer FDs of metadata and channel streams sent");
266
267 return 0;
268
269 error:
270 return ret;
271 }
272
273 /*
274 * Send relayd socket to consumer associated with a session name.
275 *
276 * On success return positive value. On error, negative value.
277 */
278 int kernel_consumer_send_relayd_socket(int consumer_sock,
279 struct lttcomm_sock *sock, struct consumer_output *consumer,
280 enum lttng_stream_type type)
281 {
282 int ret;
283 struct lttcomm_consumer_msg msg;
284
285 /* Code flow error. Safety net. */
286 assert(sock);
287 assert(consumer);
288
289 /* Bail out if consumer is disabled */
290 if (!consumer->enabled) {
291 ret = LTTCOMM_OK;
292 goto error;
293 }
294
295 msg.cmd_type = LTTNG_CONSUMER_ADD_RELAYD_SOCKET;
296 /*
297 * Assign network consumer output index using the temporary consumer since
298 * this call should only be made from within a set_consumer_uri() function
299 * call in the session daemon.
300 */
301 msg.u.relayd_sock.net_index = consumer->net_seq_index;
302 msg.u.relayd_sock.type = type;
303 memcpy(&msg.u.relayd_sock.sock, sock, sizeof(msg.u.relayd_sock.sock));
304
305 DBG2("Sending relayd sock info to consumer");
306 ret = lttcomm_send_unix_sock(consumer_sock, &msg, sizeof(msg));
307 if (ret < 0) {
308 PERROR("send consumer relayd socket info");
309 goto error;
310 }
311
312 DBG2("Sending relayd socket file descriptor to consumer");
313 ret = consumer_send_fds(consumer_sock, &sock->fd, 1);
314 if (ret < 0) {
315 goto error;
316 }
317
318 DBG("Kernel consumer relayd socket sent");
319
320 error:
321 return ret;
322 }
This page took 0.051459 seconds and 4 git commands to generate.