Skip to content

Commit

Permalink
Add plugin ability to watch file descriptors async
Browse files Browse the repository at this point in the history
With test using libcurl.

#34
  • Loading branch information
halfgaar committed Feb 28, 2023
1 parent 075e787 commit 3494771
Show file tree
Hide file tree
Showing 19 changed files with 648 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: actions/checkout@v3
- run: sudo apt update
# Build prerequisites
- run: sudo apt install -y cmake libssl-dev qtbase5-dev ${{ matrix.aptpkg }}
- run: sudo apt install -y cmake libssl-dev qtbase5-dev libcurl4-openssl-dev ${{ matrix.aptpkg }}
# Building
- run: qmake -spec "${{ matrix.SPEC }}" FlashMQTestsMeta.pro
- run: make
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ add_executable(flashmq
subscription.h
sharedsubscribers.h
pluginloader.h
queuedtasks.h


mainapp.cpp
Expand Down Expand Up @@ -118,6 +119,7 @@ add_executable(flashmq
subscription.cpp
sharedsubscribers.cpp
pluginloader.cpp
queuedtasks.cpp

)

Expand Down
2 changes: 2 additions & 0 deletions FlashMQTests/FlashMQTests.pro
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ SOURCES += tst_maintests.cpp \
../subscription.cpp \
../sharedsubscribers.cpp \
../pluginloader.cpp \
../queuedtasks.cpp \
conffiletemp.cpp \
flashmqtempdir.cpp \
mainappthread.cpp \
Expand Down Expand Up @@ -118,6 +119,7 @@ HEADERS += \
../subscription.h \
../sharedsubscribers.h \
../pluginloader.h \
../queuedtasks.h \
conffiletemp.h \
flashmqtempdir.h \
mainappthread.h \
Expand Down
131 changes: 131 additions & 0 deletions FlashMQTests/plugins/curlfunctions.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#include "curlfunctions.h"
#include <sys/epoll.h>
#include "../../flashmq_plugin.h"
#include "test_plugin.h"
#include <cstring>

/**
* @brief This is curl telling us what events to watch for.
* @param easy
* @param s
* @param what
* @param clientp
* @param socketp
* @return
*/
int socket_event_watch_notification(CURL *easy, curl_socket_t s, int what, void *clientp, void *socketp)
{
(void)easy;
(void)clientp;
(void)socketp;

if (what == CURL_POLL_REMOVE)
flashmq_poll_remove_fd(s);
else
{
int events = 0;

if (what == CURL_POLL_IN)
events |= EPOLLIN;
else if (what == CURL_POLL_OUT)
events |= EPOLLOUT;
else if (what == CURL_POLL_INOUT)
events = EPOLLIN | EPOLLOUT;
else
return 1;

flashmq_poll_add_fd(s, events, std::weak_ptr<void>());
}

return 0;
}

void check_all_active_curls(CURLM *curlMulti)
{
CURLMsg *msg;
int msgs_left;
while((msg = curl_multi_info_read(curlMulti, &msgs_left)))
{
if (msg->msg == CURLMSG_DONE)
{
CURL *easy = msg->easy_handle;
AuthenticatingClient *c = nullptr;
curl_easy_getinfo(easy, CURLINFO_PRIVATE, &c);

flashmq_logf(LOG_INFO, "Libcurl said: %s", curl_easy_strerror(msg->data.result));

std::string answer(c->response.data(), std::min<int>(9, c->response.size()));

if (answer == "<!doctype")
flashmq_continue_async_authentication(c->client, AuthResult::success, std::string(), std::string());
else
flashmq_continue_async_authentication(c->client, AuthResult::login_denied, std::string(), std::string());

delete c;
}
}
}

void call_timed_curl_multi_socket_action(CURLM *multi, TestPluginData *p)
{
p->current_timer = 0;

int a = 0;
int rc = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &a);

/* Curl says: "When this function returns error, the state of all transfers are uncertain and they cannot be
* continued. curl_multi_socket_action should not be called again on the same multi handle after an error has
* been returned, unless first removing all the handles and adding new ones."
*
* It's not clear to me how to remove them all. Is this right? Or will this not give in-progress ones? The
* API doesn't seem to have a function to get all handles. Do I have to do external book-keeping?
*/
if (rc != CURLM_OK)
{
CURLMsg *msg;
int msgs_left;
while((msg = curl_multi_info_read(multi, &msgs_left)))
{
if (msg->msg == CURLMSG_DONE)
{
CURL *easy = msg->easy_handle;
AuthenticatingClient *c = nullptr;
curl_easy_getinfo(easy, CURLINFO_PRIVATE, &c);
delete c;
}
}
}

check_all_active_curls(multi);
}

int timer_callback(CURLM *multi, long timeout_ms, void *clientp)
{
TestPluginData *p = static_cast<TestPluginData*>(clientp);

// We also remove the last known task before it executes if curl tells us to install a new one. This
// is suggested by the unclear and incomplete example at https://curl.se/libcurl/c/CURLMOPT_TIMERFUNCTION.html.
if (timeout_ms == -1 || p->current_timer > 0)
{
flashmq_remove_task(p->current_timer);
p->current_timer = 0;
}

if (timeout_ms >= 0)
{
auto f = std::bind(&call_timed_curl_multi_socket_action, multi, p);
p->current_timer = flashmq_add_task(f, timeout_ms);
}
return CURLM_OK;
}

size_t curl_write_cb(char *data, size_t n, size_t l, void *userp)
{
AuthenticatingClient *ac = static_cast<AuthenticatingClient*>(userp);

int pos = ac->response.size();
ac->response.resize(ac->response.size() + n*l);
std::memcpy(&ac->response[pos], data, n*l);

return n*l;
}
13 changes: 13 additions & 0 deletions FlashMQTests/plugins/curlfunctions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#ifndef CURLFUNCTIONS_H
#define CURLFUNCTIONS_H

#include <curl/curl.h>
#include "test_plugin.h"

int socket_event_watch_notification(CURL *easy, curl_socket_t s, int what, void *clientp, void *socketp);
void check_all_active_curls(CURLM *curlMulti);
void call_timed_curl_multi_socket_action(CURLM *multi, TestPluginData *p);
int timer_callback(CURLM *multi, long timeout_ms, void *clientp);
size_t curl_write_cb(char *data, size_t n, size_t l, void *userp);

#endif // CURLFUNCTIONS_H
117 changes: 114 additions & 3 deletions FlashMQTests/plugins/test_plugin.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
#include "functional"
#include <unistd.h>
#include <cassert>
#include <cstring>

#include "../../flashmq_plugin.h"
#include "test_plugin.h"

#include <curl/curl.h>
#include <sys/epoll.h>
#include "curlfunctions.h"


TestPluginData::~TestPluginData()
{
Expand All @@ -26,15 +32,55 @@ int flashmq_plugin_version()

void flashmq_plugin_allocate_thread_memory(void **thread_data, std::unordered_map<std::string, std::string> &plugin_opts)
{
*thread_data = new TestPluginData();
TestPluginData *p = new TestPluginData();
*thread_data = p;
(void)plugin_opts;

p->curlMulti = curl_multi_init();

if (p->curlMulti == nullptr)
throw std::runtime_error("Curl failed to init");

curl_multi_setopt(p->curlMulti, CURLMOPT_SOCKETFUNCTION, socket_event_watch_notification);
curl_multi_setopt(p->curlMulti, CURLMOPT_TIMERFUNCTION, timer_callback);
curl_multi_setopt(p->curlMulti, CURLMOPT_TIMERDATA, p);

}

void flashmq_plugin_deallocate_thread_memory(void *thread_data, std::unordered_map<std::string, std::string> &plugin_opts)
{
(void)plugin_opts;

TestPluginData *p = static_cast<TestPluginData*>(thread_data);

curl_multi_cleanup(p->curlMulti);

delete p;
(void)plugin_opts;
}

void flashmq_plugin_poll_event_received(void *thread_data, int fd, uint32_t events, const std::weak_ptr<void> &ptr)
{
(void)ptr;

TestPluginData *p = static_cast<TestPluginData*>(thread_data);

int new_events = CURL_CSELECT_ERR;

if (events & EPOLLIN)
{
new_events &= ~CURL_CSELECT_ERR;
new_events |= CURL_CSELECT_IN;
}
if (events & EPOLLOUT)
{
new_events &= ~CURL_CSELECT_ERR;
new_events |= CURL_CSELECT_OUT;
}

int n = -1;
curl_multi_socket_action(p->curlMulti, fd, new_events, &n);

check_all_active_curls(p->curlMulti);
}

void flashmq_plugin_init(void *thread_data, std::unordered_map<std::string, std::string> &plugin_opts, bool reloading)
Expand All @@ -54,7 +100,6 @@ void flashmq_plugin_deinit(void *thread_data, std::unordered_map<std::string, st
(void)thread_data;
(void)plugin_opts;
(void)reloading;

}

void flashmq_plugin_periodic_event(void *thread_data)
Expand Down Expand Up @@ -102,6 +147,29 @@ AuthResult flashmq_plugin_login_check(void *thread_data, const std::string &clie
}
}

if (username == "curl")
{
TestPluginData *p = static_cast<TestPluginData*>(thread_data);

// Libcurl is C, so we unfortunately have to use naked new and hope we'll delete it in all the right places.
AuthenticatingClient *c = new AuthenticatingClient;
c->client = client;
c->globalData = p;

curl_easy_setopt(c->eh, CURLOPT_WRITEFUNCTION, curl_write_cb);
curl_easy_setopt(c->eh, CURLOPT_WRITEDATA, c);
curl_easy_setopt(c->eh, CURLOPT_PRIVATE, c);

// Keep in mind that DNS resovling may be blocking too. You could perhaps resolve the DNS once and use the result.
curl_easy_setopt(c->eh, CURLOPT_URL, "http://www.google.com/");

// TODO: I don't like this error handling.
if (!c->addToMulti(p->curlMulti))
delete c;

return AuthResult::async;
}

return AuthResult::success;
}

Expand Down Expand Up @@ -228,4 +296,47 @@ void flashmq_plugin_main_init(std::unordered_map<std::string, std::string> &plug

// The plugin_opts aren't const. I don't know if that was a mistake or not anymore, but it works in my favor now.
plugin_opts["main_init was here"] = "true";

if (curl_global_init(CURL_GLOBAL_ALL) != 0)
throw std::runtime_error("Global curl init failed to init");
}

void flashmq_plugin_main_deinit(std::unordered_map<std::string, std::string> &plugin_opts)
{
(void)plugin_opts;

curl_global_cleanup();
}

void AuthenticatingClient::cleanup()
{
if (curlMulti)
{
curl_multi_remove_handle(curlMulti, eh);
curlMulti = nullptr;
}

curl_easy_cleanup(eh);
eh = nullptr;
}

AuthenticatingClient::AuthenticatingClient()
{
eh = curl_easy_init();
}

AuthenticatingClient::~AuthenticatingClient()
{
cleanup();
}

bool AuthenticatingClient::addToMulti(CURLM *curlMulti)
{
if (curl_multi_add_handle(curlMulti, eh) != CURLM_OK)
{
cleanup();
return false;
}
this->curlMulti = curlMulti;
return true;
}
25 changes: 25 additions & 0 deletions FlashMQTests/plugins/test_plugin.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#ifndef TESTPLUGIN_H
#define TESTPLUGIN_H

#include <unordered_map>
#include <vector>
#include <thread>
#include "../../forward_declarations.h"

#include <curl/curl.h>

class TestPluginData
{
public:
Expand All @@ -13,8 +17,29 @@ class TestPluginData

bool main_init_ran = false;

CURLM *curlMulti;

uint32_t current_timer = 0;

public:
~TestPluginData();
};

struct AuthenticatingClient
{
TestPluginData *globalData;
std::weak_ptr<Client> client;
std::vector<char> response;
CURL *eh = nullptr;
CURLM *curlMulti = nullptr;

void cleanup();

public:
AuthenticatingClient();
~AuthenticatingClient();

bool addToMulti(CURLM *curlMulti);
};

#endif // TESTPLUGIN_H
Loading

0 comments on commit 3494771

Please sign in to comment.