Relayd data available command support
[lttng-tools.git] / src / common / relayd / relayd.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _GNU_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <inttypes.h>
25
26 #include <common/common.h>
27 #include <common/defaults.h>
28 #include <common/sessiond-comm/relayd.h>
29
30 #include "relayd.h"
31
32 /*
33 * Send command. Fill up the header and append the data.
34 */
35 static int send_command(struct lttcomm_sock *sock,
36 enum lttcomm_sessiond_command cmd, void *data, size_t size,
37 int flags)
38 {
39 int ret;
40 struct lttcomm_relayd_hdr header;
41 char *buf;
42 uint64_t buf_size = sizeof(header);
43
44 if (data) {
45 buf_size += size;
46 }
47
48 buf = zmalloc(buf_size);
49 if (buf == NULL) {
50 PERROR("zmalloc relayd send command buf");
51 ret = -1;
52 goto alloc_error;
53 }
54
55 header.cmd = htobe32(cmd);
56 header.data_size = htobe64(size);
57
58 /* Zeroed for now since not used. */
59 header.cmd_version = 0;
60 header.circuit_id = 0;
61
62 /* Prepare buffer to send. */
63 memcpy(buf, &header, sizeof(header));
64 if (data) {
65 memcpy(buf + sizeof(header), data, size);
66 }
67
68 ret = sock->ops->sendmsg(sock, buf, buf_size, flags);
69 if (ret < 0) {
70 goto error;
71 }
72
73 DBG3("Relayd sending command %d of size %" PRIu64, cmd, buf_size);
74
75 error:
76 free(buf);
77 alloc_error:
78 return ret;
79 }
80
81 /*
82 * Receive reply data on socket. This MUST be call after send_command or else
83 * could result in unexpected behavior(s).
84 */
85 static int recv_reply(struct lttcomm_sock *sock, void *data, size_t size)
86 {
87 int ret;
88
89 DBG3("Relayd waiting for reply of size %ld", size);
90
91 ret = sock->ops->recvmsg(sock, data, size, 0);
92 if (ret < 0) {
93 goto error;
94 }
95
96 error:
97 return ret;
98 }
99
100 /*
101 * Add stream on the relayd and assign stream handle to the stream_id argument.
102 *
103 * On success return 0 else return ret_code negative value.
104 */
105 int relayd_add_stream(struct lttcomm_sock *sock, const char *channel_name,
106 const char *pathname, uint64_t *stream_id)
107 {
108 int ret;
109 struct lttcomm_relayd_add_stream msg;
110 struct lttcomm_relayd_status_stream reply;
111
112 /* Code flow error. Safety net. */
113 assert(sock);
114 assert(channel_name);
115 assert(pathname);
116
117 DBG("Relayd adding stream for channel name %s", channel_name);
118
119 strncpy(msg.channel_name, channel_name, sizeof(msg.channel_name));
120 strncpy(msg.pathname, pathname, sizeof(msg.pathname));
121
122 /* Send command */
123 ret = send_command(sock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
124 if (ret < 0) {
125 goto error;
126 }
127
128 /* Waiting for reply */
129 ret = recv_reply(sock, (void *) &reply, sizeof(reply));
130 if (ret < 0) {
131 goto error;
132 }
133
134 /* Back to host bytes order. */
135 reply.handle = be64toh(reply.handle);
136 reply.ret_code = be32toh(reply.ret_code);
137
138 /* Return session id or negative ret code. */
139 if (reply.ret_code != LTTNG_OK) {
140 ret = -reply.ret_code;
141 ERR("Relayd add stream replied error %d", ret);
142 } else {
143 /* Success */
144 ret = 0;
145 *stream_id = reply.handle;
146 }
147
148 DBG("Relayd stream added successfully with handle %" PRIu64,
149 reply.handle);
150
151 error:
152 return ret;
153 }
154
155 /*
156 * Check version numbers on the relayd.
157 *
158 * Return 0 if compatible else negative value.
159 */
160 int relayd_version_check(struct lttcomm_sock *sock, uint32_t major,
161 uint32_t minor)
162 {
163 int ret;
164 struct lttcomm_relayd_version reply;
165
166 /* Code flow error. Safety net. */
167 assert(sock);
168
169 DBG("Relayd version check for major.minor %u.%u", major, minor);
170
171 /* Send command */
172 ret = send_command(sock, RELAYD_VERSION, NULL, 0, 0);
173 if (ret < 0) {
174 goto error;
175 }
176
177 /* Recevie response */
178 ret = recv_reply(sock, (void *) &reply, sizeof(reply));
179 if (ret < 0) {
180 goto error;
181 }
182
183 /* Set back to host bytes order */
184 reply.major = be32toh(reply.major);
185 reply.minor = be32toh(reply.minor);
186
187 /* Validate version */
188 if (reply.major <= major) {
189 if (reply.minor <= minor) {
190 /* Compatible */
191 ret = 0;
192 DBG2("Relayd version is compatible");
193 goto error;
194 }
195 }
196
197 /* Version number not compatible */
198 DBG2("Relayd version is NOT compatible %u.%u > %u.%u", reply.major,
199 reply.minor, major, minor);
200 ret = -1;
201
202 error:
203 return ret;
204 }
205
206 /*
207 * Add stream on the relayd and assign stream handle to the stream_id argument.
208 *
209 * On success return 0 else return ret_code negative value.
210 */
211 int relayd_send_metadata(struct lttcomm_sock *sock, size_t len)
212 {
213 int ret;
214
215 /* Code flow error. Safety net. */
216 assert(sock);
217
218 DBG("Relayd sending metadata of size %zu", len);
219
220 /* Send command */
221 ret = send_command(sock, RELAYD_SEND_METADATA, NULL, len, 0);
222 if (ret < 0) {
223 goto error;
224 }
225
226 DBG2("Relayd metadata added successfully");
227
228 /*
229 * After that call, the metadata data MUST be sent to the relayd so the
230 * receive size on the other end matches the len of the metadata packet
231 * header. This is why we don't wait for a reply here.
232 */
233
234 error:
235 return ret;
236 }
237
238 /*
239 * Connect to relay daemon with an allocated lttcomm_sock.
240 */
241 int relayd_connect(struct lttcomm_sock *sock)
242 {
243 /* Code flow error. Safety net. */
244 assert(sock);
245
246 DBG3("Relayd connect ...");
247
248 return sock->ops->connect(sock);
249 }
250
251 /*
252 * Close relayd socket with an allocated lttcomm_sock.
253 */
254 int relayd_close(struct lttcomm_sock *sock)
255 {
256 /* Code flow error. Safety net. */
257 assert(sock);
258
259 DBG3("Relayd closing socket %d", sock->fd);
260
261 return sock->ops->close(sock);
262 }
263
264 /*
265 * Send data header structure to the relayd.
266 */
267 int relayd_send_data_hdr(struct lttcomm_sock *sock,
268 struct lttcomm_relayd_data_hdr *hdr, size_t size)
269 {
270 int ret;
271
272 /* Code flow error. Safety net. */
273 assert(sock);
274 assert(hdr);
275
276 DBG3("Relayd sending data header of size %ld", size);
277
278 /* Again, safety net */
279 if (size == 0) {
280 size = sizeof(struct lttcomm_relayd_data_hdr);
281 }
282
283 /* Only send data header. */
284 ret = sock->ops->sendmsg(sock, hdr, size, 0);
285 if (ret < 0) {
286 goto error;
287 }
288
289 /*
290 * The data MUST be sent right after that command for the receive on the
291 * other end to match the size in the header.
292 */
293
294 error:
295 return ret;
296 }
297
298 /*
299 * Send close stream command to the relayd.
300 */
301 int relayd_send_close_stream(struct lttcomm_sock *sock, uint64_t stream_id,
302 uint64_t last_net_seq_num)
303 {
304 int ret;
305 struct lttcomm_relayd_close_stream msg;
306 struct lttcomm_relayd_generic_reply reply;
307
308 /* Code flow error. Safety net. */
309 assert(sock);
310
311 DBG("Relayd closing stream id %" PRIu64, stream_id);
312
313 msg.stream_id = htobe64(stream_id);
314 msg.last_net_seq_num = htobe64(last_net_seq_num);
315
316 /* Send command */
317 ret = send_command(sock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
318 if (ret < 0) {
319 goto error;
320 }
321
322 /* Recevie response */
323 ret = recv_reply(sock, (void *) &reply, sizeof(reply));
324 if (ret < 0) {
325 goto error;
326 }
327
328 reply.ret_code = be32toh(reply.ret_code);
329
330 /* Return session id or negative ret code. */
331 if (reply.ret_code != LTTNG_OK) {
332 ret = -reply.ret_code;
333 ERR("Relayd close stream replied error %d", ret);
334 } else {
335 /* Success */
336 ret = 0;
337 }
338
339 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
340
341 error:
342 return ret;
343 }
344
345 /*
346 * Check for data availability for a given stream id.
347 *
348 * Return 0 if NOT available, 1 if so and a negative value on error.
349 */
350 int relayd_data_available(struct lttcomm_sock *sock, uint64_t stream_id,
351 uint64_t last_net_seq_num)
352 {
353 int ret;
354 struct lttcomm_relayd_data_available msg;
355 struct lttcomm_relayd_generic_reply reply;
356
357 /* Code flow error. Safety net. */
358 assert(sock);
359
360 DBG("Relayd data available for stream id %" PRIu64, stream_id);
361
362 msg.stream_id = htobe64(stream_id);
363 msg.last_net_seq_num = htobe64(last_net_seq_num);
364
365 /* Send command */
366 ret = send_command(sock, RELAYD_DATA_AVAILABLE, (void *) &msg,
367 sizeof(msg), 0);
368 if (ret < 0) {
369 goto error;
370 }
371
372 /* Recevie response */
373 ret = recv_reply(sock, (void *) &reply, sizeof(reply));
374 if (ret < 0) {
375 goto error;
376 }
377
378 reply.ret_code = be32toh(reply.ret_code);
379
380 /* Return session id or negative ret code. */
381 if (reply.ret_code >= LTTNG_OK) {
382 ret = -reply.ret_code;
383 ERR("Relayd data available replied error %d", ret);
384 }
385
386 /* At this point, the ret code is either 1 or 0 */
387 ret = reply.ret_code;
388
389 DBG("Relayd data is %s available for stream id %" PRIu64,
390 ret == 1 ? "" : "NOT", stream_id);
391
392 error:
393 return ret;
394 }
395
396 /*
397 * Check on the relayd side for a quiescent state on the control socket.
398 */
399 int relayd_quiescent_control(struct lttcomm_sock *sock)
400 {
401 int ret;
402 struct lttcomm_relayd_generic_reply reply;
403
404 /* Code flow error. Safety net. */
405 assert(sock);
406
407 DBG("Relayd checking quiescent control state");
408
409 /* Send command */
410 ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, NULL, 0, 0);
411 if (ret < 0) {
412 goto error;
413 }
414
415 /* Recevie response */
416 ret = recv_reply(sock, (void *) &reply, sizeof(reply));
417 if (ret < 0) {
418 goto error;
419 }
420
421 reply.ret_code = be32toh(reply.ret_code);
422
423 /* Return session id or negative ret code. */
424 if (reply.ret_code != LTTNG_OK) {
425 ret = -reply.ret_code;
426 ERR("Relayd quiescent control replied error %d", ret);
427 goto error;
428 }
429
430 /* Control socket is quiescent */
431 return 1;
432
433 error:
434 return ret;
435 }
This page took 0.055823 seconds and 4 git commands to generate.