*
*/
+#include "lttng/action/action.h"
+#include "lttng/trigger/trigger-internal.h"
#define _LGPL_SOURCE
#include <urcu.h>
#include <urcu/rculfhash.h>
lttng_trigger_get_const_condition(list->trigger);
assert(!list->notification_trigger_clients_ht);
+ notification_client_list_get(list);
list->notification_trigger_clients_ht =
state->notification_trigger_clients_ht;
client->socket = -1;
}
client->communication.active = false;
- lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
- lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
+ lttng_payload_reset(&client->communication.inbound.payload);
+ lttng_payload_reset(&client->communication.outbound.payload);
pthread_mutex_destroy(&client->lock);
call_rcu(&client->rcu_node, free_notification_client_rcu);
}
cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
node) {
const struct lttng_condition *condition;
- const struct lttng_action *action;
struct lttng_trigger *trigger;
struct notification_client_list *client_list;
struct lttng_evaluation *evaluation = NULL;
enum lttng_condition_type condition_type;
- bool client_list_is_empty;
enum action_executor_status executor_status;
trigger = trigger_list_element->trigger;
continue;
}
- action = lttng_trigger_get_const_action(trigger);
-
- /* Notify actions are the only type currently supported. */
- assert(lttng_action_get_type_const(action) ==
- LTTNG_ACTION_TYPE_NOTIFY);
-
client_list = get_client_list_from_condition(state, condition);
- assert(client_list);
-
- pthread_mutex_lock(&client_list->lock);
- client_list_is_empty = cds_list_empty(&client_list->list);
- pthread_mutex_unlock(&client_list->lock);
- if (client_list_is_empty) {
- /*
- * No clients interested in the evaluation's result,
- * skip it.
- */
- continue;
- }
-
if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
evaluation = lttng_evaluation_session_rotation_ongoing_create(
trace_archive_chunk_id);
return ret;
}
+static
+bool is_trigger_action_notify(const struct lttng_trigger *trigger)
+{
+ bool is_notify = false;
+ unsigned int i, count;
+ enum lttng_action_status action_status;
+ const struct lttng_action *action =
+ lttng_trigger_get_const_action(trigger);
+ enum lttng_action_type action_type;
+
+ assert(action);
+ action_type = lttng_action_get_type_const(action);
+ if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
+ is_notify = true;
+ goto end;
+ } else if (action_type != LTTNG_ACTION_TYPE_GROUP) {
+ goto end;
+ }
+
+ action_status = lttng_action_group_get_count(action, &count);
+ assert(action_status == LTTNG_ACTION_STATUS_OK);
+
+ for (i = 0; i < count; i++) {
+ const struct lttng_action *inner_action =
+ lttng_action_group_get_at_index(
+ action, i);
+
+ action_type = lttng_action_get_type_const(inner_action);
+ if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
+ is_notify = true;
+ goto end;
+ }
+ }
+
+end:
+ return is_notify;
+}
+
/*
- * FIXME A client's credentials are not checked when registering a trigger, nor
- * are they stored alongside with the trigger.
+ * FIXME A client's credentials are not checked when registering a trigger.
*
* The effects of this are benign since:
* - The client will succeed in registering the trigger, as it is valid,
struct notification_client *client;
struct notification_client_list *client_list = NULL;
struct lttng_trigger_ht_element *trigger_ht_element = NULL;
- struct notification_client_list_element *client_list_element, *tmp;
+ struct notification_client_list_element *client_list_element;
struct cds_lfht_node *node;
struct cds_lfht_iter iter;
bool free_trigger = true;
+ struct lttng_evaluation *evaluation = NULL;
+ struct lttng_credentials object_creds;
+ enum action_executor_status executor_status;
rcu_read_lock();
* It is not skipped as this is the only action type currently
* supported.
*/
- client_list = notification_client_list_create(trigger);
- if (!client_list) {
- ret = -1;
- goto error_free_ht_element;
- }
-
- /* Build a list of clients to which this new trigger applies. */
- cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
- client_socket_ht_node) {
- if (!trigger_applies_to_client(trigger, client)) {
- continue;
+ if (is_trigger_action_notify(trigger)) {
+ client_list = notification_client_list_create(trigger);
+ if (!client_list) {
+ ret = -1;
+ goto error_free_ht_element;
}
- client_list_element = zmalloc(sizeof(*client_list_element));
- if (!client_list_element) {
- ret = -1;
- goto error_put_client_list;
+ /* Build a list of clients to which this new trigger applies. */
+ cds_lfht_for_each_entry (state->client_socket_ht, &iter, client,
+ client_socket_ht_node) {
+ if (!trigger_applies_to_client(trigger, client)) {
+ continue;
+ }
+
+ client_list_element =
+ zmalloc(sizeof(*client_list_element));
+ if (!client_list_element) {
+ ret = -1;
+ goto error_put_client_list;
+ }
+
+ CDS_INIT_LIST_HEAD(&client_list_element->node);
+ client_list_element->client = client;
+ cds_list_add(&client_list_element->node,
+ &client_list->list);
}
- CDS_INIT_LIST_HEAD(&client_list_element->node);
- client_list_element->client = client;
- cds_list_add(&client_list_element->node, &client_list->list);
+
+ /*
+ * Client list ownership transferred to the
+ * notification_trigger_clients_ht.
+ */
+ publish_notification_client_list(state, client_list);
}
switch (get_condition_binding_object(condition)) {
case LTTNG_OBJECT_TYPE_NONE:
break;
default:
- ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
+ ERR("Unknown object type on which to bind a newly registered trigger was encountered");
ret = -1;
goto error_put_client_list;
}
/*
- * Since there is nothing preventing clients from subscribing to a
- * condition before the corresponding trigger is registered, we have
- * to evaluate this new condition right away.
+ * The new trigger's condition must be evaluated against the current
+ * state.
+ *
+ * In the case of `notify` action, nothing preventing clients from
+ * subscribing to a condition before the corresponding trigger is
+ * registered, we have to evaluate this new condition right away.
*
* At some point, we were waiting for the next "evaluation" (e.g. on
* reception of a channel sample) to evaluate this new condition, but
* current state. Otherwise, the next evaluation cycle may only see
* that the evaluations remain the same (true for samples n-1 and n) and
* the client will never know that the condition has been met.
- *
- * No need to lock the list here as it has not been published yet.
*/
- cds_list_for_each_entry_safe(client_list_element, tmp,
- &client_list->list, node) {
- ret = evaluate_condition_for_client(trigger, condition,
- client_list_element->client, state);
- if (ret) {
- goto error_put_client_list;
- }
+ switch (get_condition_binding_object(condition)) {
+ case LTTNG_OBJECT_TYPE_SESSION:
+ ret = evaluate_session_condition_for_client(condition, state,
+ &evaluation, &object_creds.uid,
+ &object_creds.gid);
+ break;
+ case LTTNG_OBJECT_TYPE_CHANNEL:
+ ret = evaluate_channel_condition_for_client(condition, state,
+ &evaluation, &object_creds.uid,
+ &object_creds.gid);
+ break;
+ case LTTNG_OBJECT_TYPE_NONE:
+ ret = 0;
+ goto error_put_client_list;
+ case LTTNG_OBJECT_TYPE_UNKNOWN:
+ default:
+ ret = -1;
+ goto error_put_client_list;
+ }
+
+ if (ret) {
+ /* Fatal error. */
+ goto error_put_client_list;
+ }
+
+ DBG("Newly registered trigger's condition evaluated to %s",
+ evaluation ? "true" : "false");
+ if (!evaluation) {
+ /* Evaluation yielded nothing. Normal exit. */
+ ret = 0;
+ goto error_put_client_list;
}
/*
- * Client list ownership transferred to the
- * notification_trigger_clients_ht.
+ * Ownership of `evaluation` transferred to the action executor
+ * no matter the result.
*/
- publish_notification_client_list(state, client_list);
- client_list = NULL;
+ executor_status = action_executor_enqueue(state->executor, trigger,
+ evaluation, &object_creds, client_list);
+ evaluation = NULL;
+ switch (executor_status) {
+ case ACTION_EXECUTOR_STATUS_OK:
+ break;
+ case ACTION_EXECUTOR_STATUS_ERROR:
+ case ACTION_EXECUTOR_STATUS_INVALID:
+ /*
+ * TODO Add trigger identification (name/id) when
+ * it is added to the API.
+ */
+ ERR("Fatal error occurred while enqueuing action associated to newly registered trigger");
+ ret = -1;
+ goto error_put_client_list;
+ case ACTION_EXECUTOR_STATUS_OVERFLOW:
+ /*
+ * TODO Add trigger identification (name/id) when
+ * it is added to the API.
+ *
+ * Not a fatal error.
+ */
+ WARN("No space left when enqueuing action associated to newly registered trigger");
+ ret = 0;
+ goto error_put_client_list;
+ default:
+ abort();
+ }
*cmd_result = LTTNG_OK;
client_id);
ret = 0;
} else {
- pthread_mutex_lock(&client->lock);
ret = client_handle_transmission_status(
client, client_status, state);
- pthread_mutex_unlock(&client->lock);
}
rcu_read_unlock();
break;
return ret;
}
-/* Client lock must be acquired by caller. */
static
int client_reset_inbound_state(struct notification_client *client)
{
int ret;
- ASSERT_LOCKED(client->lock);
- ret = lttng_dynamic_buffer_set_size(
- &client->communication.inbound.buffer, 0);
- assert(!ret);
+ lttng_payload_clear(&client->communication.inbound.payload);
client->communication.inbound.bytes_to_receive =
sizeof(struct lttng_notification_channel_message);
LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
ret = lttng_dynamic_buffer_set_size(
- &client->communication.inbound.buffer,
+ &client->communication.inbound.payload.buffer,
client->communication.inbound.bytes_to_receive);
+
return ret;
}
ret = -1;
goto error;
}
+
pthread_mutex_init(&client->lock, NULL);
client->id = state->next_notification_client_id++;
CDS_INIT_LIST_HEAD(&client->condition_list);
- lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
- lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
+ lttng_payload_init(&client->communication.inbound.payload);
+ lttng_payload_init(&client->communication.outbound.payload);
client->communication.inbound.expect_creds = true;
- pthread_mutex_lock(&client->lock);
ret = client_reset_inbound_state(client);
- pthread_mutex_unlock(&client->lock);
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
ret = 0;
rcu_read_unlock();
return ret;
+
error:
notification_client_destroy(client, state);
return ret;
}
-/* RCU read-lock must be held by the caller. */
-/* Client lock must be held by the caller */
+/*
+ * RCU read-lock must be held by the caller.
+ * Client lock must _not_ be held by the caller.
+ */
static
int notification_thread_client_disconnect(
struct notification_client *client,
struct lttng_condition_list_element *condition_list_element, *tmp;
/* Acquire the client lock to disable its communication atomically. */
+ pthread_mutex_lock(&client->lock);
client->communication.active = false;
+ cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
+ cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
+ pthread_mutex_unlock(&client->lock);
+
ret = lttng_poll_del(&state->events, client->socket);
if (ret) {
ERR("[notification-thread] Failed to remove client socket %d from poll set",
client->socket);
}
- cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
- cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
-
/* Release all conditions to which the client was subscribed. */
cds_list_for_each_entry_safe(condition_list_element, tmp,
&client->condition_list, node) {
goto end;
}
- pthread_mutex_lock(&client->lock);
ret = notification_thread_client_disconnect(client, state);
- pthread_mutex_unlock(&client->lock);
end:
rcu_read_unlock();
return ret;
client_socket_ht_node) {
int ret;
- pthread_mutex_lock(&client->lock);
ret = notification_thread_client_disconnect(
client, state);
- pthread_mutex_unlock(&client->lock);
if (ret) {
error_encoutered = true;
}
goto end;
}
- client->communication.outbound.queued_command_reply = false;
- client->communication.outbound.dropped_notification = false;
break;
case CLIENT_TRANSMISSION_STATUS_QUEUED:
/*
ssize_t ret;
size_t to_send_count;
enum client_transmission_status status;
+ struct lttng_payload_view pv = lttng_payload_view_from_payload(
+ &client->communication.outbound.payload, 0, -1);
+ const int fds_to_send_count =
+ lttng_payload_view_get_fd_handle_count(&pv);
ASSERT_LOCKED(client->lock);
goto end;
}
- assert(client->communication.outbound.buffer.size != 0);
- to_send_count = client->communication.outbound.buffer.size;
+ if (pv.buffer.size == 0) {
+ /*
+ * If both data and fds are equal to zero, we are in an invalid
+ * state.
+ */
+ assert(fds_to_send_count != 0);
+ goto send_fds;
+ }
+
+ /* Send data. */
+ to_send_count = pv.buffer.size;
DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
client->socket);
ret = lttcomm_send_unix_sock_non_block(client->socket,
- client->communication.outbound.buffer.data,
+ pv.buffer.data,
to_send_count);
if ((ret >= 0 && ret < to_send_count)) {
DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
client->socket);
to_send_count -= max(ret, 0);
- memcpy(client->communication.outbound.buffer.data,
- client->communication.outbound.buffer.data +
- client->communication.outbound.buffer.size - to_send_count,
+ memmove(client->communication.outbound.payload.buffer.data,
+ pv.buffer.data +
+ pv.buffer.size - to_send_count,
to_send_count);
ret = lttng_dynamic_buffer_set_size(
- &client->communication.outbound.buffer,
+ &client->communication.outbound.payload.buffer,
to_send_count);
if (ret) {
goto error;
}
+
status = CLIENT_TRANSMISSION_STATUS_QUEUED;
+ goto end;
} else if (ret < 0) {
- /* Generic error, disconnect the client. */
+ /* Generic error, disable the client's communication. */
ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
client->socket);
+ client->communication.active = false;
status = CLIENT_TRANSMISSION_STATUS_FAIL;
+ goto end;
} else {
- /* No error and flushed the queue completely. */
+ /*
+ * No error and flushed the queue completely.
+ *
+ * The payload buffer size is used later to
+ * check if there is notifications queued. So albeit that the
+ * direct caller knows that the transmission is complete, we
+ * need to set the buffer size to zero.
+ */
ret = lttng_dynamic_buffer_set_size(
- &client->communication.outbound.buffer, 0);
+ &client->communication.outbound.payload.buffer, 0);
if (ret) {
goto error;
}
+ }
+
+send_fds:
+ /* No fds to send, transmission is complete. */
+ if (fds_to_send_count == 0) {
status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+ goto end;
+ }
+
+ ret = lttcomm_send_payload_view_fds_unix_sock_non_block(
+ client->socket, &pv);
+ if (ret < 0) {
+ /* Generic error, disable the client's communication. */
+ ERR("[notification-thread] Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)",
+ client->socket);
+ client->communication.active = false;
+ status = CLIENT_TRANSMISSION_STATUS_FAIL;
+ goto end;
+ } else if (ret == 0) {
+ /* Nothing could be sent. */
+ status = CLIENT_TRANSMISSION_STATUS_QUEUED;
+ } else {
+ /* Fd passing is an all or nothing kind of thing. */
+ status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+ /*
+ * The payload _fd_array count is used later to
+ * check if there is notifications queued. So although the
+ * direct caller knows that the transmission is complete, we
+ * need to clear the _fd_array for the queuing check.
+ */
+ lttng_dynamic_pointer_array_clear(
+ &client->communication.outbound.payload
+ ._fd_handles);
}
+
end:
+ if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) {
+ client->communication.outbound.queued_command_reply = false;
+ client->communication.outbound.dropped_notification = false;
+ lttng_payload_clear(&client->communication.outbound.payload);
+ }
+
return status;
error:
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
-/* Client lock must be acquired by caller. */
+static
+bool client_has_outbound_data_left(
+ const struct notification_client *client)
+{
+ const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+ &client->communication.outbound.payload, 0, -1);
+ const bool has_data = pv.buffer.size != 0;
+ const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
+
+ return has_data || has_fds;
+}
+
+/* Client lock must _not_ be held by the caller. */
static
int client_send_command_reply(struct notification_client *client,
struct notification_thread_state *state,
char buffer[sizeof(msg) + sizeof(reply)];
enum client_transmission_status transmission_status;
- ASSERT_LOCKED(client->lock);
+ memcpy(buffer, &msg, sizeof(msg));
+ memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
+ DBG("[notification-thread] Send command reply (%i)", (int) status);
+ pthread_mutex_lock(&client->lock);
if (client->communication.outbound.queued_command_reply) {
/* Protocol error. */
- goto error;
+ goto error_unlock;
}
- memcpy(buffer, &msg, sizeof(msg));
- memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
- DBG("[notification-thread] Send command reply (%i)", (int) status);
-
/* Enqueue buffer to outgoing queue and flush it. */
ret = lttng_dynamic_buffer_append(
- &client->communication.outbound.buffer,
+ &client->communication.outbound.payload.buffer,
buffer, sizeof(buffer));
if (ret) {
- goto error;
+ goto error_unlock;
}
transmission_status = client_flush_outgoing_queue(client);
+
+ if (client_has_outbound_data_left(client)) {
+ /* Queue could not be emptied. */
+ client->communication.outbound.queued_command_reply = true;
+ }
+
+ pthread_mutex_unlock(&client->lock);
ret = client_handle_transmission_status(
client, transmission_status, state);
if (ret) {
goto error;
}
- if (client->communication.outbound.buffer.size != 0) {
- /* Queue could not be emptied. */
- client->communication.outbound.queued_command_reply = true;
- }
-
return 0;
+error_unlock:
+ pthread_mutex_unlock(&client->lock);
error:
return -1;
}
struct notification_thread_state *state)
{
int ret;
-
- pthread_mutex_lock(&client->lock);
-
/*
* Receiving message header. The function will be called again
* once the rest of the message as been received and can be
*/
const struct lttng_notification_channel_message *msg;
- assert(sizeof(*msg) == client->communication.inbound.buffer.size);
+ assert(sizeof(*msg) == client->communication.inbound.payload.buffer.size);
msg = (const struct lttng_notification_channel_message *)
- client->communication.inbound.buffer.data;
+ client->communication.inbound.payload.buffer.data;
if (msg->size == 0 ||
msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
}
client->communication.inbound.bytes_to_receive = msg->size;
+ client->communication.inbound.fds_to_receive = msg->fds;
client->communication.inbound.msg_type =
(enum lttng_notification_channel_message_type) msg->type;
ret = lttng_dynamic_buffer_set_size(
- &client->communication.inbound.buffer, msg->size);
+ &client->communication.inbound.payload.buffer, msg->size);
+
+ /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */
+ msg = NULL;
end:
- pthread_mutex_unlock(&client->lock);
return ret;
}
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
- enum client_transmission_status transmission_status;
-
- pthread_mutex_lock(&client->lock);
memcpy(send_buffer, &msg_header, sizeof(msg_header));
memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
handshake_client =
(struct lttng_notification_channel_command_handshake *)
- client->communication.inbound.buffer
+ client->communication.inbound.payload.buffer
.data;
client->major = handshake_client->major;
client->minor = handshake_client->minor;
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
}
+ pthread_mutex_lock(&client->lock);
+ /* Outgoing queue will be flushed when the command reply is sent. */
ret = lttng_dynamic_buffer_append(
- &client->communication.outbound.buffer, send_buffer,
+ &client->communication.outbound.payload.buffer, send_buffer,
sizeof(send_buffer));
if (ret) {
ERR("[notification-thread] Failed to send protocol version to notification channel client");
- goto end;
+ goto end_unlock;
}
client->validated = true;
client->communication.active = true;
+ pthread_mutex_unlock(&client->lock);
- transmission_status = client_flush_outgoing_queue(client);
- ret = client_handle_transmission_status(
- client, transmission_status, state);
+ /* Set reception state to receive the next message header. */
+ ret = client_reset_inbound_state(client);
if (ret) {
+ ERR("[notification-thread] Failed to reset client communication's inbound state");
goto end;
}
+ /* Flushes the outgoing queue. */
ret = client_send_command_reply(client, state, status);
if (ret) {
ERR("[notification-thread] Failed to send reply to notification channel client");
goto end;
}
- /* Set reception state to receive the next message header. */
- ret = client_reset_inbound_state(client);
- if (ret) {
- ERR("[notification-thread] Failed to reset client communication's inbound state");
- goto end;
- }
-
-end:
+ goto end;
+end_unlock:
pthread_mutex_unlock(&client->lock);
+end:
return ret;
}
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
struct lttng_payload_view condition_view =
- lttng_payload_view_from_dynamic_buffer(
- &client->communication.inbound.buffer,
+ lttng_payload_view_from_payload(
+ &client->communication.inbound.payload,
0, -1);
size_t expected_condition_size;
- pthread_mutex_lock(&client->lock);
- expected_condition_size = client->communication.inbound.buffer.size;
- pthread_mutex_unlock(&client->lock);
-
+ /*
+ * No need to lock client to sample the inbound state as the only
+ * other thread accessing clients (action executor) only uses the
+ * outbound state.
+ */
+ expected_condition_size = client->communication.inbound.payload.buffer.size;
ret = lttng_condition_create_from_payload(&condition_view, &condition);
if (ret != expected_condition_size) {
ERR("[notification-thread] Malformed condition received from client");
ret = notification_thread_client_unsubscribe(
client, condition, state, &status);
}
- if (ret) {
- goto end;
- }
- pthread_mutex_lock(&client->lock);
- ret = client_send_command_reply(client, state, status);
if (ret) {
- ERR("[notification-thread] Failed to send reply to notification channel client");
- goto end_unlock;
+ goto end;
}
/* Set reception state to receive the next message header. */
ret = client_reset_inbound_state(client);
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
- goto end_unlock;
+ goto end;
+ }
+
+ ret = client_send_command_reply(client, state, status);
+ if (ret) {
+ ERR("[notification-thread] Failed to send reply to notification channel client");
+ goto end;
}
-end_unlock:
- pthread_mutex_unlock(&client->lock);
end:
return ret;
}
struct notification_client *client;
ssize_t recv_ret;
size_t offset;
- bool message_is_complete = false;
+ rcu_read_lock();
client = get_client_from_socket(socket, state);
if (!client) {
/* Internal error, abort. */
goto end;
}
- pthread_mutex_lock(&client->lock);
- offset = client->communication.inbound.buffer.size -
+ offset = client->communication.inbound.payload.buffer.size -
client->communication.inbound.bytes_to_receive;
if (client->communication.inbound.expect_creds) {
recv_ret = lttcomm_recv_creds_unix_sock(socket,
- client->communication.inbound.buffer.data + offset,
+ client->communication.inbound.payload.buffer.data + offset,
client->communication.inbound.bytes_to_receive,
&client->communication.inbound.creds);
if (recv_ret > 0) {
}
} else {
recv_ret = lttcomm_recv_unix_sock_non_block(socket,
- client->communication.inbound.buffer.data + offset,
+ client->communication.inbound.payload.buffer.data + offset,
client->communication.inbound.bytes_to_receive);
}
if (recv_ret >= 0) {
client->communication.inbound.bytes_to_receive -= recv_ret;
- message_is_complete = client->communication.inbound
- .bytes_to_receive == 0;
- }
- pthread_mutex_unlock(&client->lock);
- if (recv_ret < 0) {
+ } else {
goto error_disconnect_client;
}
- if (message_is_complete) {
- ret = client_dispatch_message(client, state);
- if (ret) {
+ if (client->communication.inbound.bytes_to_receive != 0) {
+ /* Message incomplete wait for more data. */
+ ret = 0;
+ goto end;
+ }
+
+ assert(client->communication.inbound.bytes_to_receive == 0);
+
+ /* Receive fds. */
+ if (client->communication.inbound.fds_to_receive != 0) {
+ ret = lttcomm_recv_payload_fds_unix_sock_non_block(
+ client->socket,
+ client->communication.inbound.fds_to_receive,
+ &client->communication.inbound.payload);
+ if (ret > 0) {
/*
- * Only returns an error if this client must be
- * disconnected.
+ * Fds received. non blocking fds passing is all
+ * or nothing.
*/
+ ssize_t expected_size;
+
+ expected_size = sizeof(int) *
+ client->communication.inbound
+ .fds_to_receive;
+ assert(ret == expected_size);
+ client->communication.inbound.fds_to_receive = 0;
+ } else if (ret == 0) {
+ /* Received nothing. */
+ ret = 0;
+ goto end;
+ } else {
goto error_disconnect_client;
}
}
+
+ /* At this point the message is complete.*/
+ assert(client->communication.inbound.bytes_to_receive == 0 &&
+ client->communication.inbound.fds_to_receive == 0);
+ ret = client_dispatch_message(client, state);
+ if (ret) {
+ /*
+ * Only returns an error if this client must be
+ * disconnected.
+ */
+ goto error_disconnect_client;
+ }
+
end:
+ rcu_read_unlock();
return ret;
+
error_disconnect_client:
- pthread_mutex_lock(&client->lock);
ret = notification_thread_client_disconnect(client, state);
- pthread_mutex_unlock(&client->lock);
- return ret;
+ goto end;
}
/* Client ready to receive outgoing data. */
struct notification_client *client;
enum client_transmission_status transmission_status;
+ rcu_read_lock();
client = get_client_from_socket(socket, state);
if (!client) {
/* Internal error, abort. */
pthread_mutex_lock(&client->lock);
transmission_status = client_flush_outgoing_queue(client);
+ pthread_mutex_unlock(&client->lock);
+
ret = client_handle_transmission_status(
client, transmission_status, state);
- pthread_mutex_unlock(&client->lock);
if (ret) {
goto end;
}
end:
+ rcu_read_unlock();
return ret;
}
client->communication.outbound.dropped_notification = true;
ret = lttng_dynamic_buffer_append(
- &client->communication.outbound.buffer, &msg,
+ &client->communication.outbound.payload.buffer, &msg,
sizeof(msg));
if (ret) {
PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
->size = (uint32_t)(
msg_payload.buffer.size - sizeof(msg_header));
+ /* Update the payload number of fds. */
+ {
+ const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+ &msg_payload, 0, -1);
+
+ ((struct lttng_notification_channel_message *)
+ msg_payload.buffer.data)->fds = (uint32_t)
+ lttng_payload_view_get_fd_handle_count(&pv);
+ }
+
pthread_mutex_lock(&client_list->lock);
cds_list_for_each_entry_safe(client_list_element, tmp,
&client_list->list, node) {
ret = 0;
pthread_mutex_lock(&client->lock);
+ if (!client->communication.active) {
+ /*
+ * Skip inactive client (protocol error or
+ * disconnecting).
+ */
+ DBG("Skipping client at it is marked as inactive");
+ goto skip_client;
+ }
+
if (source_object_creds) {
if (client->uid != source_object_creds->uid &&
client->gid != source_object_creds->gid &&
* object.
*/
DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
- goto unlock_client;
+ goto skip_client;
}
}
if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
- goto unlock_client;
+ goto skip_client;
}
DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
client->socket, msg_payload.buffer.size);
- if (client->communication.outbound.buffer.size) {
+
+ if (client_has_outbound_data_left(client)) {
/*
* Outgoing data is already buffered for this client;
* drop the notification and enqueue a "dropped
*/
ret = client_notification_overflow(client);
if (ret) {
- goto unlock_client;
+ /* Fatal error. */
+ goto skip_client;
}
}
- ret = lttng_dynamic_buffer_append_buffer(
- &client->communication.outbound.buffer,
- &msg_payload.buffer);
+ ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload);
if (ret) {
- goto unlock_client;
+ /* Fatal error. */
+ goto skip_client;
}
transmission_status = client_flush_outgoing_queue(client);
+ pthread_mutex_unlock(&client->lock);
ret = client_report(client, transmission_status, user_data);
if (ret) {
- goto unlock_client;
+ /* Fatal error. */
+ goto end_unlock_list;
}
-unlock_client:
+
+ continue;
+
+skip_client:
pthread_mutex_unlock(&client->lock);
if (ret) {
+ /* Fatal error. */
goto end_unlock_list;
}
}
cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
node) {
const struct lttng_condition *condition;
- const struct lttng_action *action;
struct lttng_trigger *trigger;
struct notification_client_list *client_list = NULL;
struct lttng_evaluation *evaluation = NULL;
- bool client_list_is_empty;
enum action_executor_status executor_status;
ret = 0;
trigger = trigger_list_element->trigger;
condition = lttng_trigger_get_const_condition(trigger);
assert(condition);
- action = lttng_trigger_get_const_action(trigger);
-
- /* Notify actions are the only type currently supported. */
- assert(lttng_action_get_type_const(action) ==
- LTTNG_ACTION_TYPE_NOTIFY);
/*
* Check if any client is subscribed to the result of this
* evaluation.
*/
client_list = get_client_list_from_condition(state, condition);
- assert(client_list);
- client_list_is_empty = cds_list_empty(&client_list->list);
- if (client_list_is_empty) {
- /*
- * No clients interested in the evaluation's result,
- * skip it.
- */
- goto put_list;
- }
ret = evaluate_buffer_condition(condition, &evaluation, state,
previous_sample_available ? &previous_sample : NULL,