ba74112fa72a35758cde4006320dbbfbfd15ce37
[lttng-tools.git] / src / bin / lttng-sessiond / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - David Goulet <david.goulet@polymtl.ca>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2 only,
6 * as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <errno.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <unistd.h>
24 #include <inttypes.h>
25
26 #include <common/common.h>
27 #include <common/consumer.h>
28 #include <common/defaults.h>
29
30 #include "consumer.h"
31 #include "health.h"
32 #include "ust-consumer.h"
33
34 /*
35 * Return allocated full pathname of the session using the consumer trace path
36 * and subdir if available. On a successful allocation, the directory of the
37 * trace is created with the session credentials.
38 *
39 * The caller can safely free(3) the returned value. On error, NULL is
40 * returned.
41 */
42 static char *setup_trace_path(struct consumer_output *consumer,
43 struct ust_app_session *ua_sess)
44 {
45 int ret;
46 char *pathname;
47
48 assert(consumer);
49 assert(ua_sess);
50
51 health_code_update();
52
53 /* Allocate our self the string to make sure we never exceed PATH_MAX. */
54 pathname = zmalloc(PATH_MAX);
55 if (!pathname) {
56 goto error;
57 }
58
59 /* Get correct path name destination */
60 if (consumer->type == CONSUMER_DST_LOCAL) {
61 /* Set application path to the destination path */
62 ret = snprintf(pathname, PATH_MAX, "%s/%s/%s",
63 consumer->dst.trace_path, consumer->subdir, ua_sess->path);
64 if (ret < 0) {
65 PERROR("snprintf channel path");
66 goto error;
67 }
68
69 /* Create directory. Ignore if exist. */
70 ret = run_as_mkdir_recursive(pathname, S_IRWXU | S_IRWXG,
71 ua_sess->euid, ua_sess->egid);
72 if (ret < 0) {
73 if (ret != -EEXIST) {
74 ERR("Trace directory creation error");
75 goto error;
76 }
77 }
78 } else {
79 ret = snprintf(pathname, PATH_MAX, "%s/%s", consumer->subdir,
80 ua_sess->path);
81 if (ret < 0) {
82 PERROR("snprintf channel path");
83 goto error;
84 }
85 }
86
87 return pathname;
88
89 error:
90 free(pathname);
91 return NULL;
92 }
93
94 /*
95 * Send a single channel to the consumer using command ADD_CHANNEL.
96 *
97 * Consumer socket lock MUST be acquired before calling this.
98 */
99 static int ask_channel_creation(struct ust_app_session *ua_sess,
100 struct ust_app_channel *ua_chan, struct consumer_output *consumer,
101 struct consumer_socket *socket, struct ust_registry_session *registry)
102 {
103 int ret;
104 uint32_t chan_id;
105 uint64_t key, chan_reg_key;
106 char *pathname = NULL;
107 struct lttcomm_consumer_msg msg;
108 struct ust_registry_channel *chan_reg;
109
110 assert(ua_sess);
111 assert(ua_chan);
112 assert(socket);
113 assert(consumer);
114 assert(registry);
115
116 DBG2("Asking UST consumer for channel");
117
118 /* Get and create full trace path of session. */
119 pathname = setup_trace_path(consumer, ua_sess);
120 if (!pathname) {
121 ret = -1;
122 goto error;
123 }
124
125 /* Depending on the buffer type, a different channel key is used. */
126 if (ua_sess->buffer_type == LTTNG_BUFFER_PER_UID) {
127 chan_reg_key = ua_chan->tracing_channel_id;
128 } else {
129 chan_reg_key = ua_chan->key;
130 }
131
132 if (ua_chan->attr.type == LTTNG_UST_CHAN_METADATA) {
133 chan_id = -1U;
134 } else {
135 chan_reg = ust_registry_channel_find(registry, chan_reg_key);
136 assert(chan_reg);
137 chan_id = chan_reg->chan_id;
138 }
139
140 consumer_init_ask_channel_comm_msg(&msg,
141 ua_chan->attr.subbuf_size,
142 ua_chan->attr.num_subbuf,
143 ua_chan->attr.overwrite,
144 ua_chan->attr.switch_timer_interval,
145 ua_chan->attr.read_timer_interval,
146 (int) ua_chan->attr.output,
147 (int) ua_chan->attr.type,
148 ua_sess->tracing_id,
149 pathname,
150 ua_chan->name,
151 ua_sess->euid,
152 ua_sess->egid,
153 consumer->net_seq_index,
154 ua_chan->key,
155 registry->uuid,
156 chan_id);
157
158 health_code_update();
159
160 ret = lttcomm_send_unix_sock(socket->fd, &msg, sizeof(msg));
161 if (ret < 0) {
162 goto error;
163 }
164
165 ret = consumer_recv_status_channel(socket, &key,
166 &ua_chan->expected_stream_count);
167 if (ret < 0) {
168 goto error;
169 }
170 /* Communication protocol error. */
171 assert(key == ua_chan->key);
172 /* We need at least one where 1 stream for 1 cpu. */
173 assert(ua_chan->expected_stream_count > 0);
174
175 DBG2("UST ask channel %" PRIu64 " successfully done with %u stream(s)", key,
176 ua_chan->expected_stream_count);
177
178 error:
179 free(pathname);
180 health_code_update();
181 return ret;
182 }
183
184 /*
185 * Ask consumer to create a channel for a given session.
186 *
187 * Returns 0 on success else a negative value.
188 */
189 int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
190 struct ust_app_channel *ua_chan, struct consumer_output *consumer,
191 struct consumer_socket *socket, struct ust_registry_session *registry)
192 {
193 int ret;
194
195 assert(ua_sess);
196 assert(ua_chan);
197 assert(consumer);
198 assert(socket);
199 assert(socket->fd >= 0);
200 assert(registry);
201
202 pthread_mutex_lock(socket->lock);
203
204 ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry);
205 if (ret < 0) {
206 goto error;
207 }
208
209 error:
210 pthread_mutex_unlock(socket->lock);
211 return ret;
212 }
213
214 /*
215 * Send a get channel command to consumer using the given channel key. The
216 * channel object is populated and the stream list.
217 *
218 * Return 0 on success else a negative value.
219 */
220 int ust_consumer_get_channel(struct consumer_socket *socket,
221 struct ust_app_channel *ua_chan)
222 {
223 int ret;
224 struct lttcomm_consumer_msg msg;
225
226 assert(ua_chan);
227 assert(socket);
228 assert(socket->fd >= 0);
229
230 msg.cmd_type = LTTNG_CONSUMER_GET_CHANNEL;
231 msg.u.get_channel.key = ua_chan->key;
232
233 pthread_mutex_lock(socket->lock);
234 health_code_update();
235
236 /* Send command and wait for OK reply. */
237 ret = consumer_send_msg(socket, &msg);
238 if (ret < 0) {
239 goto error;
240 }
241
242 /* First, get the channel from consumer. */
243 ret = ustctl_recv_channel_from_consumer(socket->fd, &ua_chan->obj);
244 if (ret < 0) {
245 if (ret != -EPIPE) {
246 ERR("Error recv channel from consumer %d with ret %d",
247 socket->fd, ret);
248 } else {
249 DBG3("UST app recv channel from consumer. Consumer is dead.");
250 }
251 goto error;
252 }
253
254 /* Next, get all streams. */
255 while (1) {
256 struct ust_app_stream *stream;
257
258 /* Create UST stream */
259 stream = ust_app_alloc_stream();
260 if (stream == NULL) {
261 ret = -ENOMEM;
262 goto error;
263 }
264
265 /* Stream object is populated by this call if successful. */
266 ret = ustctl_recv_stream_from_consumer(socket->fd, &stream->obj);
267 if (ret < 0) {
268 free(stream);
269 if (ret == -LTTNG_UST_ERR_NOENT) {
270 DBG3("UST app consumer has no more stream available");
271 ret = 0;
272 break;
273 }
274 if (ret != -EPIPE) {
275 ERR("Recv stream from consumer %d with ret %d",
276 socket->fd, ret);
277 } else {
278 DBG3("UST app recv stream from consumer. Consumer is dead.");
279 }
280 goto error;
281 }
282
283 /* Order is important this is why a list is used. */
284 cds_list_add_tail(&stream->list, &ua_chan->streams.head);
285 ua_chan->streams.count++;
286
287 DBG2("UST app stream %d received succesfully", ua_chan->streams.count);
288 }
289
290 /* This MUST match or else we have a synchronization problem. */
291 assert(ua_chan->expected_stream_count == ua_chan->streams.count);
292
293 /* Wait for confirmation that we can proceed with the streams. */
294 ret = consumer_recv_status_reply(socket);
295 if (ret < 0) {
296 goto error;
297 }
298
299 error:
300 health_code_update();
301 pthread_mutex_unlock(socket->lock);
302 return ret;
303 }
304
305 /*
306 * Send a destroy channel command to consumer using the given channel key.
307 *
308 * Note that this command MUST be used prior to a successful
309 * LTTNG_CONSUMER_GET_CHANNEL because once this command is done successfully,
310 * the streams are dispatched to the consumer threads and MUST be teardown
311 * through the hang up process.
312 *
313 * Return 0 on success else a negative value.
314 */
315 int ust_consumer_destroy_channel(struct consumer_socket *socket,
316 struct ust_app_channel *ua_chan)
317 {
318 int ret;
319 struct lttcomm_consumer_msg msg;
320
321 assert(ua_chan);
322 assert(socket);
323 assert(socket->fd >= 0);
324
325 msg.cmd_type = LTTNG_CONSUMER_DESTROY_CHANNEL;
326 msg.u.destroy_channel.key = ua_chan->key;
327
328 pthread_mutex_lock(socket->lock);
329 health_code_update();
330
331 ret = consumer_send_msg(socket, &msg);
332 if (ret < 0) {
333 goto error;
334 }
335
336 error:
337 health_code_update();
338 pthread_mutex_unlock(socket->lock);
339 return ret;
340 }
341
342 /*
343 * Send a given stream to UST tracer.
344 *
345 * On success return 0 else a negative value.
346 */
347 int ust_consumer_send_stream_to_ust(struct ust_app *app,
348 struct ust_app_channel *channel, struct ust_app_stream *stream)
349 {
350 int ret;
351
352 assert(app);
353 assert(stream);
354 assert(channel);
355
356 DBG2("UST consumer send stream to app %d", app->sock);
357
358 /* Relay stream to application. */
359 ret = ustctl_send_stream_to_ust(app->sock, channel->obj, stream->obj);
360 if (ret < 0) {
361 if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
362 ERR("Error ustctl send stream %s to app pid: %d with ret %d",
363 stream->name, app->pid, ret);
364 } else {
365 DBG3("UST app send stream to ust failed. Application is dead.");
366 }
367 goto error;
368 }
369 channel->handle = channel->obj->handle;
370
371 error:
372 return ret;
373 }
374
375 /*
376 * Send channel previously received from the consumer to the UST tracer.
377 *
378 * On success return 0 else a negative value.
379 */
380 int ust_consumer_send_channel_to_ust(struct ust_app *app,
381 struct ust_app_session *ua_sess, struct ust_app_channel *channel)
382 {
383 int ret;
384
385 assert(app);
386 assert(ua_sess);
387 assert(channel);
388 assert(channel->obj);
389
390 DBG2("UST app send channel to sock %d pid %d (name: %s, key: %" PRIu64 ")",
391 app->sock, app->pid, channel->name, channel->tracing_channel_id);
392
393 /* Send stream to application. */
394 ret = ustctl_send_channel_to_ust(app->sock, ua_sess->handle, channel->obj);
395 if (ret < 0) {
396 if (ret != -EPIPE && ret != -LTTNG_UST_ERR_EXITING) {
397 ERR("Error ustctl send channel %s to app pid: %d with ret %d",
398 channel->name, app->pid, ret);
399 } else {
400 DBG3("UST app send channel to ust failed. Application is dead.");
401 }
402 goto error;
403 }
404
405 error:
406 return ret;
407 }
This page took 0.036436 seconds and 3 git commands to generate.