projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
License: common: error_query: fix typo in SPDX specifier
[lttng-tools.git]
/
src
/
common
/
ust-consumer
/
ust-consumer.cpp
diff --git
a/src/common/ust-consumer/ust-consumer.cpp
b/src/common/ust-consumer/ust-consumer.cpp
index 5339553cb5fd7fb7b54d61f8b74814f21e1faf6c..00671a876a8bb637d631b58e31d1bb55cd793322 100644
(file)
--- a/
src/common/ust-consumer/ust-consumer.cpp
+++ b/
src/common/ust-consumer/ust-consumer.cpp
@@
-12,7
+12,6
@@
#include <common/common.hpp>
#include <common/compat/endian.hpp>
#include <common/common.hpp>
#include <common/compat/endian.hpp>
-#include <common/compat/fcntl.hpp>
#include <common/consumer/consumer-metadata-cache.hpp>
#include <common/consumer/consumer-stream.hpp>
#include <common/consumer/consumer-timer.hpp>
#include <common/consumer/consumer-metadata-cache.hpp>
#include <common/consumer/consumer-stream.hpp>
#include <common/consumer/consumer-timer.hpp>
@@
-22,12
+21,14
@@
#include <common/relayd/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/shm.hpp>
#include <common/relayd/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/shm.hpp>
+#include <common/urcu.hpp>
#include <common/utils.hpp>
#include <lttng/ust-ctl.h>
#include <lttng/ust-sigbus.h>
#include <bin/lttng-consumerd/health-consumerd.hpp>
#include <common/utils.hpp>
#include <lttng/ust-ctl.h>
#include <lttng/ust-sigbus.h>
#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
@@
-167,7
+168,8
@@
static int send_stream_to_thread(struct lttng_consumer_stream *stream,
stream->globally_visible = 1;
cds_list_del_init(&stream->send_node);
stream->globally_visible = 1;
cds_list_del_init(&stream->send_node);
- ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
+ ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); /* NOLINT sizeof used on a
+ pointer. */
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
stream->metadata_flag ? "metadata" : "data",
if (ret < 0) {
ERR("Consumer write %s stream to pipe %d",
stream->metadata_flag ? "metadata" : "data",
@@
-665,7
+667,7
@@
static int flush_channel(uint64_t chan_key)
DBG("UST consumer flush channel key %" PRIu64, chan_key);
DBG("UST consumer flush channel key %" PRIu64, chan_key);
-
rcu_read_lock()
;
+
lttng::urcu::read_lock_guard read_lock
;
channel = consumer_find_channel(chan_key);
if (!channel) {
ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
channel = consumer_find_channel(chan_key);
if (!channel) {
ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
@@
-720,7
+722,6
@@
static int flush_channel(uint64_t chan_key)
*/
sample_and_send_channel_buffer_stats(channel);
error:
*/
sample_and_send_channel_buffer_stats(channel);
error:
- rcu_read_unlock();
return ret;
}
return ret;
}
@@
-740,7
+741,7
@@
static int clear_quiescent_channel(uint64_t chan_key)
DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
-
rcu_read_lock()
;
+
lttng::urcu::read_lock_guard read_lock
;
channel = consumer_find_channel(chan_key);
if (!channel) {
ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
channel = consumer_find_channel(chan_key);
if (!channel) {
ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
@@
-766,7
+767,6
@@
static int clear_quiescent_channel(uint64_t chan_key)
pthread_mutex_unlock(&stream->lock);
}
error:
pthread_mutex_unlock(&stream->lock);
}
error:
- rcu_read_unlock();
return ret;
}
return ret;
}
@@
-930,6
+930,8
@@
error:
*/
consumer_stream_destroy(metadata->metadata_stream, nullptr);
metadata->metadata_stream = nullptr;
*/
consumer_stream_destroy(metadata->metadata_stream, nullptr);
metadata->metadata_stream = nullptr;
+ lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
send_streams_error:
error_no_stream:
end:
send_streams_error:
error_no_stream:
end:
@@
-957,7
+959,7
@@
static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
-
rcu_read_lock()
;
+
lttng::urcu::read_lock_guard read_lock
;
LTTNG_ASSERT(!metadata_channel->monitor);
LTTNG_ASSERT(!metadata_channel->monitor);
@@
-967,7
+969,7
@@
static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel,
0
, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel,
false
, 1);
if (ret < 0) {
goto error;
}
if (ret < 0) {
goto error;
}
@@
-1013,9
+1015,9
@@
error_stream:
*/
consumer_stream_destroy(metadata_stream, nullptr);
metadata_channel->metadata_stream = nullptr;
*/
consumer_stream_destroy(metadata_stream, nullptr);
metadata_channel->metadata_stream = nullptr;
+ lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
error:
error:
- rcu_read_unlock();
return ret;
}
return ret;
}
@@
-1066,7
+1068,7
@@
static int snapshot_channel(struct lttng_consumer_channel *channel,
LTTNG_ASSERT(ctx);
ASSERT_RCU_READ_LOCKED();
LTTNG_ASSERT(ctx);
ASSERT_RCU_READ_LOCKED();
-
rcu_read_lock()
;
+
lttng::urcu::read_lock_guard read_lock
;
if (relayd_id != (uint64_t) -1ULL) {
use_relayd = 1;
if (relayd_id != (uint64_t) -1ULL) {
use_relayd = 1;
@@
-1217,7
+1219,6
@@
static int snapshot_channel(struct lttng_consumer_channel *channel,
pthread_mutex_unlock(&stream->lock);
}
pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
return 0;
error_put_subbuf:
return 0;
error_put_subbuf:
@@
-1228,7
+1229,6
@@
error_close_stream:
consumer_stream_close_output(stream);
error_unlock:
pthread_mutex_unlock(&stream->lock);
consumer_stream_close_output(stream);
error_unlock:
pthread_mutex_unlock(&stream->lock);
- rcu_read_unlock();
return ret;
}
return ret;
}
@@
-1254,7
+1254,7
@@
int lttng_ustconsumer_recv_metadata(int sock,
uint64_t len,
uint64_t version,
struct lttng_consumer_channel *channel,
uint64_t len,
uint64_t version,
struct lttng_consumer_channel *channel,
-
int
timer,
+
bool invoked_by_
timer,
int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
@@
-1342,13
+1342,8
@@
int lttng_ustconsumer_recv_metadata(int sock,
if (!wait) {
goto end_free;
}
if (!wait) {
goto end_free;
}
- while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
- DBG("Waiting for metadata to be flushed");
- health_code_update();
-
- usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
- }
+ consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
end_free:
free(metadata_str);
end_free:
free(metadata_str);
@@
-1400,7
+1395,7
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
health_code_update();
/* relayd needs RCU read-side lock */
health_code_update();
/* relayd needs RCU read-side lock */
-
rcu_read_lock()
;
+
lttng::urcu::read_lock_guard read_lock
;
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
@@
-1455,7
+1450,6
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
- rcu_read_unlock();
return -ENOSYS;
}
case LTTNG_CONSUMER_DATA_PENDING:
return -ENOSYS;
}
case LTTNG_CONSUMER_DATA_PENDING:
@@
-1794,7
+1788,7
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
health_code_update();
ret = lttng_ustconsumer_recv_metadata(
health_code_update();
ret = lttng_ustconsumer_recv_metadata(
- sock, key, offset, len, version, found_channel,
0
, 1);
+ sock, key, offset, len, version, found_channel,
false
, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
@@
-1876,7
+1870,6
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
uint64_t key = msg.u.discarded_events.channel_key;
DBG("UST consumer discarded events command for session id %" PRIu64, id);
uint64_t key = msg.u.discarded_events.channel_key;
DBG("UST consumer discarded events command for session id %" PRIu64, id);
- rcu_read_lock();
pthread_mutex_lock(&the_consumer_data.lock);
ht = the_consumer_data.stream_list_ht;
pthread_mutex_lock(&the_consumer_data.lock);
ht = the_consumer_data.stream_list_ht;
@@
-1903,7
+1896,6
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
}
}
pthread_mutex_unlock(&the_consumer_data.lock);
}
}
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
DBG("UST consumer discarded events command for session id %" PRIu64
", channel key %" PRIu64,
DBG("UST consumer discarded events command for session id %" PRIu64
", channel key %" PRIu64,
@@
-1932,7
+1924,6
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
uint64_t key = msg.u.lost_packets.channel_key;
DBG("UST consumer lost packets command for session id %" PRIu64, id);
uint64_t key = msg.u.lost_packets.channel_key;
DBG("UST consumer lost packets command for session id %" PRIu64, id);
- rcu_read_lock();
pthread_mutex_lock(&the_consumer_data.lock);
ht = the_consumer_data.stream_list_ht;
pthread_mutex_lock(&the_consumer_data.lock);
ht = the_consumer_data.stream_list_ht;
@@
-1958,7
+1949,6
@@
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
}
}
pthread_mutex_unlock(&the_consumer_data.lock);
}
}
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
DBG("UST consumer lost packets command for session id %" PRIu64
", channel key %" PRIu64,
DBG("UST consumer lost packets command for session id %" PRIu64
", channel key %" PRIu64,
@@
-2295,7
+2285,6
@@
error_fatal:
goto end;
end:
goto end;
end:
- rcu_read_unlock();
health_code_update();
return ret_func;
}
health_code_update();
return ret_func;
}
@@
-2560,6
+2549,7
@@
static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
goto end;
}
stream->ust_metadata_pushed += write_len;
goto end;
}
stream->ust_metadata_pushed += write_len;
+ lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
ret = write_len;
LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
ret = write_len;
@@
-2608,7
+2598,7
@@
lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
*/
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel,
0
, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel,
false
, 0);
pthread_mutex_lock(&metadata_stream->lock);
if (ret < 0) {
status = SYNC_METADATA_STATUS_ERROR;
pthread_mutex_lock(&metadata_stream->lock);
if (ret < 0) {
status = SYNC_METADATA_STATUS_ERROR;
@@
-3212,15
+3202,17
@@
void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
DBG("UST consumer closing all metadata streams");
DBG("UST consumer closing all metadata streams");
- rcu_read_lock();
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
- health_code_update();
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ health_code_update();
- pthread_mutex_lock(&stream->chan->lock);
- lttng_ustconsumer_close_metadata(stream->chan);
- pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->lock);
+ lttng_ustconsumer_close_metadata(stream->chan);
+ pthread_mutex_unlock(&stream->chan->lock);
+ }
}
}
- rcu_read_unlock();
}
void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
}
void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
@@
-3245,7
+3237,7
@@
void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *channel,
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_channel *channel,
-
int
timer,
+
bool invoked_by_
timer,
int wait)
{
struct lttcomm_metadata_request_msg request;
int wait)
{
struct lttcomm_metadata_request_msg request;
@@
-3350,8
+3342,14
@@
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
health_code_update();
health_code_update();
- ret = lttng_ustconsumer_recv_metadata(
- ctx->consumer_metadata_socket, key, offset, len, version, channel, timer, wait);
+ ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
+ key,
+ offset,
+ len,
+ version,
+ channel,
+ invoked_by_timer,
+ wait);
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
This page took
0.033404 seconds
and
4
git commands to generate.