Make the consumer send an ACK after each command
[lttng-tools.git] / src / bin / lttng-sessiond / kernel-consumer.c
CommitLineData
f1e16794
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _GNU_SOURCE
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>

#include <common/common.h>
#include <common/defaults.h>

#include "consumer.h"
#include "kernel-consumer.h"
30
00e2e675
DG
31/*
32 * Sending a single channel to the consumer with command ADD_CHANNEL.
33 */
f50f23d9
DG
34int kernel_consumer_add_channel(struct consumer_socket *sock,
35 struct ltt_kernel_channel *channel)
00e2e675
DG
36{
37 int ret;
38 struct lttcomm_consumer_msg lkm;
39
40 /* Safety net */
41 assert(channel);
42
43 DBG("Kernel consumer adding channel %s to kernel consumer",
44 channel->channel->name);
45
46 /* Prep channel message structure */
47 consumer_init_channel_comm_msg(&lkm,
48 LTTNG_CONSUMER_ADD_CHANNEL,
49 channel->fd,
50 channel->channel->attr.subbuf_size,
51 0, /* Kernel */
c30aaa51
MD
52 channel->channel->name,
53 channel->stream_count);
00e2e675
DG
54
55 ret = consumer_send_channel(sock, &lkm);
56 if (ret < 0) {
57 goto error;
58 }
59
60error:
61 return ret;
62}
63
64/*
65 * Sending metadata to the consumer with command ADD_CHANNEL and ADD_STREAM.
66 */
f50f23d9
DG
67int kernel_consumer_add_metadata(struct consumer_socket *sock,
68 struct ltt_kernel_session *session)
00e2e675
DG
69{
70 int ret;
a7d9a3e7 71 char tmp_path[PATH_MAX];
00e2e675
DG
72 const char *pathname;
73 struct lttcomm_consumer_msg lkm;
a7d9a3e7 74 struct consumer_output *consumer;
00e2e675
DG
75
76 /* Safety net */
77 assert(session);
78 assert(session->consumer);
f50f23d9 79 assert(sock);
00e2e675
DG
80
81 DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
82
83 /* Get consumer output pointer */
a7d9a3e7 84 consumer = session->consumer;
00e2e675 85
a7d9a3e7
DG
86 /* Get the right path name destination */
87 if (consumer->type == CONSUMER_DST_LOCAL) {
88 /* Set application path to the destination path */
89 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
90 consumer->dst.trace_path, consumer->subdir);
91 if (ret < 0) {
92 PERROR("snprintf metadata path");
93 goto error;
94 }
95 pathname = tmp_path;
96
97 /* Create directory */
98 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
99 session->uid, session->gid);
100 if (ret < 0) {
101 if (ret != -EEXIST) {
102 ERR("Trace directory creation error");
103 goto error;
104 }
105 }
106 DBG3("Kernel local consumer tracefile path: %s", pathname);
00e2e675 107 } else {
a7d9a3e7
DG
108 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
109 if (ret < 0) {
110 PERROR("snprintf metadata path");
111 goto error;
112 }
113 pathname = tmp_path;
114 DBG3("Kernel network consumer subdir path: %s", pathname);
00e2e675
DG
115 }
116
117 /* Prep channel message structure */
118 consumer_init_channel_comm_msg(&lkm,
119 LTTNG_CONSUMER_ADD_CHANNEL,
120 session->metadata->fd,
121 session->metadata->conf->attr.subbuf_size,
122 0, /* for kernel */
c30aaa51
MD
123 "metadata",
124 1);
00e2e675
DG
125
126 ret = consumer_send_channel(sock, &lkm);
127 if (ret < 0) {
128 goto error;
129 }
130
131 /* Prep stream message structure */
132 consumer_init_stream_comm_msg(&lkm,
133 LTTNG_CONSUMER_ADD_STREAM,
134 session->metadata->fd,
135 session->metadata_stream_fd,
136 LTTNG_CONSUMER_ACTIVE_STREAM,
137 DEFAULT_KERNEL_CHANNEL_OUTPUT,
138 0, /* Kernel */
139 session->uid,
140 session->gid,
a7d9a3e7 141 consumer->net_seq_index,
00e2e675
DG
142 1, /* Metadata flag set */
143 "metadata",
ca22feea
DG
144 pathname,
145 session->id);
00e2e675
DG
146
147 /* Send stream and file descriptor */
a7d9a3e7 148 ret = consumer_send_stream(sock, consumer, &lkm,
00e2e675
DG
149 &session->metadata_stream_fd, 1);
150 if (ret < 0) {
151 goto error;
152 }
153
154error:
155 return ret;
156}
157
158/*
159 * Sending a single stream to the consumer with command ADD_STREAM.
160 */
f50f23d9
DG
161int kernel_consumer_add_stream(struct consumer_socket *sock,
162 struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
163 struct ltt_kernel_session *session)
00e2e675
DG
164{
165 int ret;
a7d9a3e7 166 char tmp_path[PATH_MAX];
00e2e675
DG
167 const char *pathname;
168 struct lttcomm_consumer_msg lkm;
a7d9a3e7 169 struct consumer_output *consumer;
00e2e675
DG
170
171 assert(channel);
172 assert(stream);
173 assert(session);
174 assert(session->consumer);
f50f23d9 175 assert(sock);
00e2e675
DG
176
177 DBG("Sending stream %d of channel %s to kernel consumer",
178 stream->fd, channel->channel->name);
179
180 /* Get consumer output pointer */
a7d9a3e7 181 consumer = session->consumer;
00e2e675 182
a7d9a3e7
DG
183 /* Get the right path name destination */
184 if (consumer->type == CONSUMER_DST_LOCAL) {
185 /* Set application path to the destination path */
186 ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
187 consumer->dst.trace_path, consumer->subdir);
188 if (ret < 0) {
189 PERROR("snprintf stream path");
190 goto error;
191 }
192 pathname = tmp_path;
193 DBG3("Kernel local consumer tracefile path: %s", pathname);
00e2e675 194 } else {
a7d9a3e7
DG
195 ret = snprintf(tmp_path, sizeof(tmp_path), "%s", consumer->subdir);
196 if (ret < 0) {
197 PERROR("snprintf stream path");
198 goto error;
199 }
200 pathname = tmp_path;
201 DBG3("Kernel network consumer subdir path: %s", pathname);
00e2e675
DG
202 }
203
204 /* Prep stream consumer message */
205 consumer_init_stream_comm_msg(&lkm, LTTNG_CONSUMER_ADD_STREAM,
206 channel->fd,
207 stream->fd,
208 stream->state,
209 channel->channel->attr.output,
210 0, /* Kernel */
211 session->uid,
212 session->gid,
a7d9a3e7 213 consumer->net_seq_index,
00e2e675
DG
214 0, /* Metadata flag unset */
215 stream->name,
ca22feea
DG
216 pathname,
217 session->id);
00e2e675
DG
218
219 /* Send stream and file descriptor */
a7d9a3e7 220 ret = consumer_send_stream(sock, consumer, &lkm, &stream->fd, 1);
00e2e675
DG
221 if (ret < 0) {
222 goto error;
223 }
224
225error:
226 return ret;
227}
228
f1e16794
DG
229/*
230 * Send all stream fds of kernel channel to the consumer.
231 */
f50f23d9 232int kernel_consumer_send_channel_stream(struct consumer_socket *sock,
00e2e675 233 struct ltt_kernel_channel *channel, struct ltt_kernel_session *session)
f1e16794 234{
00e2e675 235 int ret;
f1e16794 236 struct ltt_kernel_stream *stream;
00e2e675
DG
237
238 /* Safety net */
239 assert(channel);
240 assert(session);
241 assert(session->consumer);
f50f23d9 242 assert(sock);
00e2e675
DG
243
244 /* Bail out if consumer is disabled */
245 if (!session->consumer->enabled) {
f73fabfd 246 ret = LTTNG_OK;
00e2e675
DG
247 goto error;
248 }
f1e16794
DG
249
250 DBG("Sending streams of channel %s to kernel consumer",
251 channel->channel->name);
252
00e2e675 253 ret = kernel_consumer_add_channel(sock, channel);
f1e16794 254 if (ret < 0) {
f1e16794
DG
255 goto error;
256 }
257
258 /* Send streams */
259 cds_list_for_each_entry(stream, &channel->stream_list.head, list) {
260 if (!stream->fd) {
261 continue;
262 }
00e2e675
DG
263
264 /* Add stream on the kernel consumer side. */
265 ret = kernel_consumer_add_stream(sock, channel, stream, session);
f1e16794 266 if (ret < 0) {
f1e16794
DG
267 goto error;
268 }
269 }
270
f1e16794
DG
271error:
272 return ret;
273}
274
275/*
276 * Send all stream fds of the kernel session to the consumer.
277 */
f50f23d9
DG
278int kernel_consumer_send_session(struct consumer_socket *sock,
279 struct ltt_kernel_session *session)
f1e16794
DG
280{
281 int ret;
282 struct ltt_kernel_channel *chan;
f1e16794 283
00e2e675
DG
284 /* Safety net */
285 assert(session);
286 assert(session->consumer);
f50f23d9 287 assert(sock);
f1e16794 288
00e2e675
DG
289 /* Bail out if consumer is disabled */
290 if (!session->consumer->enabled) {
f73fabfd 291 ret = LTTNG_OK;
00e2e675 292 goto error;
f1e16794
DG
293 }
294
00e2e675
DG
295 DBG("Sending session stream to kernel consumer");
296
f1e16794 297 if (session->metadata_stream_fd >= 0) {
00e2e675 298 ret = kernel_consumer_add_metadata(sock, session);
f1e16794 299 if (ret < 0) {
f1e16794
DG
300 goto error;
301 }
302
00e2e675
DG
303 /* Flag that at least the metadata has been sent to the consumer. */
304 session->consumer_fds_sent = 1;
f1e16794
DG
305 }
306
00e2e675 307 /* Send channel and streams of it */
f1e16794 308 cds_list_for_each_entry(chan, &session->channel_list.head, list) {
00e2e675 309 ret = kernel_consumer_send_channel_stream(sock, chan, session);
f1e16794
DG
310 if (ret < 0) {
311 goto error;
312 }
313 }
314
00e2e675 315 DBG("Kernel consumer FDs of metadata and channel streams sent");
f1e16794
DG
316
317 return 0;
318
319error:
320 return ret;
321}
This page took 0.038304 seconds and 4 git commands to generate.