Skip to content

Commit

Permalink
[RST-1477] Simplified the sensor<-->optimizer API (#35)
Browse files Browse the repository at this point in the history
* Simplified the sensor<-->optimizer API. Moved the implementation details of the optimizer transaction callback into the optimizer where it belongs.
  • Loading branch information
svwilliams authored Jan 23, 2019
1 parent 93e5ec7 commit fab0547
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 145 deletions.
1 change: 0 additions & 1 deletion fuse_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ add_library(${PROJECT_NAME}
src/async_sensor_model.cpp
src/constraint.cpp
src/graph.cpp
src/sensor_model.cpp
src/timestamp_manager.cpp
src/transaction.cpp
src/variable.cpp
Expand Down
41 changes: 16 additions & 25 deletions fuse_core/include/fuse_core/async_sensor_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,36 +53,35 @@ namespace fuse_core
/**
* @brief A sensor model base class that provides node handles and a private callback queue.
*
* A sensor model plugin is responsible for generating new constraints and passing them along to the optimizer, where
* the actual sensor fusion takes place. The asynchronous sensor model plugin is designed similar to the nodelet
* plugin, attempting to be as generic and easy to use as a standard ROS node.
* A sensor model plugin is responsible for generating new constraints, packaging them in a fuse_core::Transaction
* object, and passing them along to the optimizer. The asynchronous sensor model plugin is designed similar to the
* nodelet plugin, attempting to be as generic and easy to use as a standard ROS node.
*
* There are a few notable differences between the asynchronous sensor model plugin and a standard ROS node. First
* and most obvious, the sensor model is designed as a plugin, with all of the stipulations and requirements that
* come with all ROS plugins (must be derived from a known base class, will be default constructed). Second, the base
* AsyncSensorModel class provides a global and private node handle, both hooked to a local callback queue and local
* spinner. This makes it act like a full ROS node -- subscriptions trigger message callbacks, callbacks will fire
* sequentially, etc. However, authors of derived sensor models should be aware of this fact and avoid creating
* additional node handles, or at least take care when creating new node handles and additional callback queues.
* Finally, the concept of "publishing a transaction" has been abused slightly. A base class
* SensorModel::injectCallback() method has been provided. This serves the same basic purpose as publishing a
* Transaction using a standard ROS publisher, but this method is specific to sensor models and the fuse ecosystem.
* Under the hood, this inserts a call into the fuse optimizer's callback queue, just as publishing a message would,
* but the memory copy and serialization are avoided.
* additional node handles, or at least take care when creating new node handles and additional callback queues.
* Finally, instead of publishing the generated constraints using a ROS publisher, the asynchronous sensor model
* provides a "sendTransaction()" method that should be called whenever the sensor is ready to send a
* fuse_core::Transaction object to the Optimizer. Under the hood, this executes the TransactionCallback that was
* provided during plugin construction.
*
* Derived classes:
* - _must_ implement the onInit() method. This method is used to configure the sensor model for operation.
* This includes things like accessing the parameter server and subscribing to sensor topics.
* - may _optionally_ implement the onGraphUpdate() method. This should only be done if the derived sensor needs
* access to the latest values of the state variables. In many cases, sensors will simply not need that information.
* If the sensor does need access the to graph, the most common implementation will simply be to move the provided
* pointer into a class memebr variable, for use in other callbacks.
* If the sensor does need access to the graph, the most common implementation will simply be to move the provided
* pointer into a class member variable, for use in other callbacks.
* @code{.cpp}
* void onGraphUpdate(Graph::ConstSharedPtr graph) override { this->graph_ = std::move(graph); }
* @endcode
* - will _probably_ subscribe to a sensor message topic and write a custom message callback function. Within that
* function, the derived class will generate new constraints based on the received sensor data.
* - _must_ call injectCallback() everytime a new constraints are generated. This is how constraints are sent to the
* - _must_ call sendTransaction() every time new constraints are generated. This is how constraints are sent to the
* optimizer. Otherwise, the optimizer will not know about the derived sensor's constraints, and the sensor will
* have no effect.
*/
Expand Down Expand Up @@ -119,31 +118,26 @@ class AsyncSensorModel : public SensorModel
*
* @param[in] name A unique name to give this plugin instance
* @param[in] transaction_callback The function to call every time a transaction is published
* @param[in] transaction_callback_queue The callback queue the callback function should be inserted into. This
* will likely belong to the parent of the plugin, as no attempt to spin this
* queue is performed.
*/
void initialize(
const std::string& name,
TransactionCallback transaction_callback,
ros::CallbackQueue* transaction_callback_queue) final;
TransactionCallback transaction_callback) final;

/**
* @brief Inject the Transaction into the callback queue registered during initialize()
* @brief Send a transaction to the Optimizer
*
* It is expected that sensor model plugins will generate new constraints (packaged inside a Transaction) as a result
* of received sensor data. Instead of the Optimizer periodically checking for new transactions, we provide a
* "push" mechanism for the sensor model to send the transaction to the Optimizer immediately by injecting the
* transaction into the Optimizer's callback queue. The Optimizer's transaction callback function will fire within
* the Optimizer's callback thread(s).
* "push" mechanism for the sensor model to send the transaction to the Optimizer immediately by calling the callback
* function provided by the Optimizer. This function will be executed by the SensorModel's thread.
*
* This should be called by derived classes whenever a new Transaction is generated, probably from within the sensor
* message callback function.
*
* @param[in] stamps Any timestamps associated with the added variables. These are sent to the motion models.
* @param[in] transaction A Transaction object describing the set of variables that have been added and removed.
*/
void injectCallback(
void sendTransaction(
const std::set<ros::Time>& stamps,
const Transaction::SharedPtr& transaction);

Expand All @@ -159,9 +153,6 @@ class AsyncSensorModel : public SensorModel
ros::NodeHandle private_node_handle_; //!< A node handle in the private namespace using the local callback queue
ros::AsyncSpinner spinner_; //!< A single/multi-threaded spinner assigned to the local callback queue
TransactionCallback transaction_callback_; //!< The function to be executed every time a Transaction is "published"
ros::CallbackQueue* transaction_callback_queue_; //!< The callback queue used for transaction callbacks. This will
//!< likely belong to the parent of the plugin, as no attempt to
//!< spin this queue is performed.

/**
* @brief Constructor
Expand Down
40 changes: 7 additions & 33 deletions fuse_core/include/fuse_core/sensor_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ using TransactionCallback = std::function<void(const std::set<ros::Time>& stamps
* optimizer. If you are developing your own sensor model, look at the AsyncSensorModel class, as this provides
* additional features that make the sensor model act similar to a ROS node or nodelet.
*
* The interface SensorModel class provides the injectCallback() function. This function serves the same basic purpose
* as publishing a Transaction using a standard ROS publisher, but this method is specific to sensor models and the
* fuse ecosystem. Under the hood, this inserts a call into the fuse optimizer's callback queue, just as publishing a
* message would, but the memory copy and serialization are avoided. Derived classes should call this method whenever
* they are ready to send a fuse_core::Transaction object to the optimizer.
* During the initialize() call, the optimizer will provide a transaction callback. This function serves the same
* basic purpose as publishing a Transaction using a standard ROS publisher. Derived classes should call this function
* whenever they are ready to send a fuse_core::Transaction object to the optimizer. This function will be called in
* the SensorModel's thread. The optimizer implementation should ensure that this callback is safe to execute in this
* thread. The optimizer should also ensure the callback executes quickly so that it will not block execution in the
* SensorModel.
*/
class SensorModel
{
Expand Down Expand Up @@ -102,37 +103,10 @@ class SensorModel
*
* @param[in] name A unique name to give this plugin instance
* @param[in] transaction_callback The function to call every time a transaction is published
* @param[in] transaction_callback_queue The callback queue the callback function should be inserted into. This
* will likely belong to the parent of the plugin, as no attempt to spin this
* queue is performed.
*/
virtual void initialize(
const std::string& name,
TransactionCallback transaction_callback,
ros::CallbackQueue* transaction_callback_queue) = 0;

/**
* @brief Inject a transaction callback function into a callback queue
*
* It is expected that sensor model plugins will generate new constraints (packaged inside a Transaction) as a result
* of received sensor data. Instead of the Optimizer periodically checking for new transactions, we provide a
* "push" mechanism for the sensor model to send the transaction to the Optimizer immediately by injecting the
* transaction into the Optimizer's callback queue. The Optimizer's transaction callback function will fire within
* the Optimizer's callback thread(s). The optimize will supply a pointer to its transaction callback function and
* its callback queue within the initialize() call.
*
* @param[in] stamps All timestamps associated with the added variables. These are sent to the
* motion models.
* @param[in] transaction Object describing the set of variables and constraints that have been added
* and/or removed.
* @param[in] transaction_callback The callback function to inject into the callback queue
* @param[in] transaction_callback_queue The callback queue the callback function should be inserted into.
*/
static void injectCallback(
const std::set<ros::Time>& stamps,
const Transaction::SharedPtr& transaction,
const TransactionCallback& transaction_callback,
ros::CallbackQueue* transaction_callback_queue);
TransactionCallback transaction_callback) = 0;

/**
* @brief Get the unique name of this sensor
Expand Down
11 changes: 4 additions & 7 deletions fuse_core/src/async_sensor_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ namespace fuse_core

AsyncSensorModel::AsyncSensorModel(size_t thread_count) :
name_("uninitialized"),
spinner_(thread_count, &callback_queue_),
transaction_callback_queue_(nullptr)
spinner_(thread_count, &callback_queue_)
{
}

Expand All @@ -63,16 +62,14 @@ void AsyncSensorModel::graphCallback(Graph::ConstSharedPtr graph)

void AsyncSensorModel::initialize(
const std::string& name,
TransactionCallback transaction_callback,
ros::CallbackQueue* transaction_callback_queue)
TransactionCallback transaction_callback)
{
// Initialize internal state
name_ = name;
node_handle_.setCallbackQueue(&callback_queue_);
private_node_handle_ = ros::NodeHandle(ros::NodeHandle("~"), name_);
private_node_handle_.setCallbackQueue(&callback_queue_);
transaction_callback_ = transaction_callback;
transaction_callback_queue_ = transaction_callback_queue;

// Call the derived onInit() function to perform implementation-specific initialization
onInit();
Expand All @@ -81,11 +78,11 @@ void AsyncSensorModel::initialize(
spinner_.start();
}

void AsyncSensorModel::injectCallback(
void AsyncSensorModel::sendTransaction(
const std::set<ros::Time>& stamps,
const Transaction::SharedPtr& transaction)
{
SensorModel::injectCallback(stamps, transaction, transaction_callback_, transaction_callback_queue_);
transaction_callback_(stamps, transaction);
}

} // namespace fuse_core
62 changes: 0 additions & 62 deletions fuse_core/src/sensor_model.cpp

This file was deleted.

20 changes: 6 additions & 14 deletions fuse_core/test/test_async_sensor_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ class MySensor : public fuse_core::AsyncSensorModel
TEST(AsyncSensorModel, OnInit)
{
MySensor sensor;
sensor.initialize("my_sensor", &transactionCallback, ros::getGlobalCallbackQueue());
sensor.initialize("my_sensor", &transactionCallback);
EXPECT_TRUE(sensor.initialized);
}

TEST(AsyncSensorModel, OnGraphUpdate)
{
MySensor sensor;
sensor.initialize("my_sensor", &transactionCallback, ros::getGlobalCallbackQueue());
sensor.initialize("my_sensor", &transactionCallback);

// Execute the graph callback in this thread. This should push a call to MySensor::onGraphUpdate()
// into MySensor's callback queue, which will get executed by MySensor's async spinner.
Expand All @@ -109,23 +109,15 @@ TEST(AsyncSensorModel, OnGraphUpdate)
EXPECT_TRUE(sensor.graph_received);
}

TEST(AsyncSensorModel, InjectCallback)
TEST(AsyncSensorModel, SendTransaction)
{
MySensor sensor;
sensor.initialize("my_sensor", &transactionCallback, ros::getGlobalCallbackQueue());
sensor.initialize("my_sensor", &transactionCallback);

// Use the sensor to inject a callback into the global callback queue. This will get executed by main's spinner.
// There is a time delay there. So, this call should return almost immediately, then we have to wait
// a bit before the "received_transaction" flag gets flipped.
// Use the sensor "sendTransaction()" method to execute the transaction callback. This will get executed immediately.
std::set<ros::Time> stamps;
fuse_core::Transaction::SharedPtr transaction; // nullptr, okay because we don't actually use it for anything
sensor.injectCallback(stamps, transaction);
EXPECT_FALSE(received_transaction);
ros::Time wait_time_elapsed = ros::Time::now() + ros::Duration(10.0);
while (!received_transaction && ros::Time::now() < wait_time_elapsed)
{
ros::Duration(0.1).sleep();
}
sensor.sendTransaction(stamps, transaction);
EXPECT_TRUE(received_transaction);
}

Expand Down
15 changes: 14 additions & 1 deletion fuse_optimizers/include/fuse_optimizers/optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class Optimizer
/**
* @brief Callback fired every time a SensorModel plugin creates a new transaction
*
* @param[in] name The name of the sensor that produced the Transaction
* @param[in] sensor_name The name of the sensor that produced the Transaction
* @param[in] stamps Any timestamps associated with the added variables. These are sent to the motion models
* to generate connected constraints.
* @param[in] transaction The populated Transaction object created by the loaded SensorModel plugin
Expand Down Expand Up @@ -199,6 +199,19 @@ class Optimizer
void notify(
fuse_core::Transaction::ConstSharedPtr transaction,
fuse_core::Graph::ConstSharedPtr graph);

/**
* @brief Inject a transaction callback function into the global callback queue
*
* @param[in] sensor_name The name of the sensor that produced the Transaction
* @param[in] stamps Any timestamps associated with the added variables. These are sent to the motion models
* to generate connected constraints.
* @param[in] transaction The populated Transaction object created by the loaded SensorModel plugin
*/
void injectCallback(
const std::string& sensor_name,
const std::set<ros::Time>& stamps,
const fuse_core::Transaction::SharedPtr& transaction);
};

} // namespace fuse_optimizers
Expand Down
16 changes: 14 additions & 2 deletions fuse_optimizers/src/optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <fuse_core/callback_wrapper.h>
#include <fuse_core/graph.h>
#include <fuse_core/transaction.h>
#include <fuse_core/uuid.h>
Expand Down Expand Up @@ -143,8 +144,7 @@ void Optimizer::loadSensorModels()
// Initialize the sensor
sensor_model->initialize(
sensor_name,
std::bind(&Optimizer::transactionCallback, this, sensor_name, std::placeholders::_1, std::placeholders::_2),
ros::getGlobalCallbackQueue());
std::bind(&Optimizer::injectCallback, this, sensor_name, std::placeholders::_1, std::placeholders::_2));
// Store the sensor in a member variable for use later
sensor_models_.emplace(sensor_name, std::move(sensor_model));
// Parse out the list of associated motion models, if any
Expand Down Expand Up @@ -284,4 +284,16 @@ void Optimizer::notify(
}
}

void Optimizer::injectCallback(
const std::string& sensor_name,
const std::set<ros::Time>& stamps,
const fuse_core::Transaction::SharedPtr& transaction)
{
// We are going to insert a call to the derived class's transactionCallback() method into the global callback queue.
// This returns execution to the sensor's thread quickly by moving the transaction processing to the optimizer's
// thread. And by using the existing ROS callback queue, we simplify the threading model of the optimizer.
ros::getGlobalCallbackQueue()->addCallback(boost::make_shared<fuse_core::CallbackWrapper<void>>(
std::bind(&Optimizer::transactionCallback, this, sensor_name, stamps, transaction)));
}

} // namespace fuse_optimizers

0 comments on commit fab0547

Please sign in to comment.