pthread_mutex_lock(&handle->cmd_queue.lock);
cmd = cds_list_first_entry(&handle->cmd_queue.list,
struct notification_thread_command, cmd_list_node);
+ cds_list_del(&cmd->cmd_list_node);
+ pthread_mutex_unlock(&handle->cmd_queue.lock);
switch (cmd->type) {
case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
DBG("[notification-thread] Received register trigger command");
goto error_unlock;
}
end:
- cds_list_del(&cmd->cmd_list_node);
- lttng_waiter_wake_up(&cmd->reply_waiter);
- pthread_mutex_unlock(&handle->cmd_queue.lock);
+ if (cmd->is_async) {
+ free(cmd);
+ cmd = NULL;
+ } else {
+ lttng_waiter_wake_up(&cmd->reply_waiter);
+ }
return ret;
error_unlock:
/* Wake-up and return a fatal error to the calling thread. */
lttng_waiter_wake_up(&cmd->reply_waiter);
- pthread_mutex_unlock(&handle->cmd_queue.lock);
cmd->reply_code = LTTNG_ERR_FATAL;
error:
/* Indicate a fatal error to the caller. */
return error_occurred ? -1 : 0;
}
+static
+int client_handle_transmission_status(
+ struct notification_client *client,
+ enum client_transmission_status transmission_status,
+ struct notification_thread_state *state)
+{
+ int ret = 0;
+
+ switch (transmission_status) {
+ case CLIENT_TRANSMISSION_STATUS_COMPLETE:
+ ret = lttng_poll_mod(&state->events, client->socket,
+ CLIENT_POLL_MASK_IN);
+ if (ret) {
+ goto end;
+ }
+
+ client->communication.outbound.queued_command_reply = false;
+ client->communication.outbound.dropped_notification = false;
+ break;
+ case CLIENT_TRANSMISSION_STATUS_QUEUED:
+ /*
+ * We want to be notified whenever there is buffer space
+ * available to send the rest of the payload.
+ */
+ ret = lttng_poll_mod(&state->events, client->socket,
+ CLIENT_POLL_MASK_IN_OUT);
+ if (ret) {
+ goto end;
+ }
+ break;
+ case CLIENT_TRANSMISSION_STATUS_FAIL:
+ ret = notification_thread_client_disconnect(client, state);
+ if (ret) {
+ goto end;
+ }
+ break;
+ case CLIENT_TRANSMISSION_STATUS_ERROR:
+ ret = -1;
+ goto end;
+ default:
+ abort();
+ }
+end:
+ return ret;
+}
+
/* Client lock must be acquired by caller. */
static
-int client_flush_outgoing_queue(struct notification_client *client,
+enum client_transmission_status client_flush_outgoing_queue(
+ struct notification_client *client,
struct notification_thread_state *state)
{
ssize_t ret;
size_t to_send_count;
+ enum client_transmission_status status;
ASSERT_LOCKED(client->lock);
&client->communication.outbound.buffer,
to_send_count);
if (ret) {
+ status = CLIENT_TRANSMISSION_STATUS_ERROR;
goto error;
}
-
- /*
- * We want to be notified whenever there is buffer space
- * available to send the rest of the payload.
- */
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN_OUT);
- if (ret) {
- goto error;
- }
+ status = CLIENT_TRANSMISSION_STATUS_QUEUED;
} else if (ret < 0) {
/* Generic error, disconnect the client. */
- ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
+ ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
client->socket);
- ret = notification_thread_client_disconnect(client, state);
- if (ret) {
- goto error;
- }
+ status = CLIENT_TRANSMISSION_STATUS_FAIL;
} else {
/* No error and flushed the queue completely. */
ret = lttng_dynamic_buffer_set_size(
&client->communication.outbound.buffer, 0);
if (ret) {
+ status = CLIENT_TRANSMISSION_STATUS_ERROR;
goto error;
}
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN);
- if (ret) {
- goto error;
- }
+ status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+ }
- client->communication.outbound.queued_command_reply = false;
- client->communication.outbound.dropped_notification = false;
+ ret = client_handle_transmission_status(client, status, state);
+ if (ret) {
+ goto error;
}
return 0;