Skip to content

Commit

Permalink
Merge pull request #31 from acoustid/json-log
Browse files Browse the repository at this point in the history
Json log
  • Loading branch information
lalinsky authored Feb 25, 2024
2 parents 5ebd17a + 22ff05b commit 269be11
Show file tree
Hide file tree
Showing 17 changed files with 127 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ env:

jobs:
build:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v2
- run: |
sudo apt-get update
sudo apt-get install -y qtbase5-dev libgtest-dev
sudo apt-get install -y qt6-base-dev libgtest-dev
- run: cmake -DCMAKE_BUILD_TYPE=Release .
- run: make
- run: make check
Expand Down
7 changes: 4 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ set(CMAKE_AUTORCC ON)

find_package(Threads REQUIRED)
find_package(GTest)
find_package(Qt5 COMPONENTS Core Network Concurrent REQUIRED)
find_package(Qt6 COMPONENTS Core Network Concurrent REQUIRED)

set(CPACK_GENERATOR "DEB")
set(CPACK_PACKAGE_NAME acoustid-index)
Expand Down Expand Up @@ -68,9 +68,10 @@ set(fpindexlib_SOURCES
src/store/ram_output_stream.cpp
src/util/crc.c
src/util/options.cpp
src/util/tracing.cpp
)
add_library(fpindexlib ${fpindexlib_SOURCES})
target_link_libraries(fpindexlib Qt5::Core Qt5::Network Qt5::Concurrent)
target_link_libraries(fpindexlib Qt6::Core Qt6::Network Qt6::Concurrent)

set(qhttp_SOURCES
./src/3rdparty/qhttp/src/qhttpserverconnection.cpp
Expand All @@ -85,7 +86,7 @@ set(qhttp_SOURCES
)
add_library(qhttp ${qhttp_SOURCES})
target_include_directories(qhttp PRIVATE ./src/3rdparty/)
target_link_libraries(qhttp Qt5::Core Qt5::Network Qt5::Concurrent)
target_link_libraries(qhttp Qt6::Core Qt6::Network Qt6::Concurrent)

set(fpserver_SOURCES
src/server/listener.cpp
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
FROM ubuntu:20.04
FROM ubuntu:22.04

RUN useradd -m -s /bin/bash -u 1000 acoustid

RUN apt-get update && \
apt-get install -y libqt5network5 libqt5core5a libstdc++6 libgcc1 libgcc-s1
apt-get install -y libqt6network6 libqt6core6 libstdc++6 libgcc1 libgcc-s1

ADD acoustid-index.deb /tmp/
RUN dpkg -i /tmp/acoustid-index.deb && rm /tmp/acoustid-index.deb
Expand Down
6 changes: 5 additions & 1 deletion src/index/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
using namespace Acoustid;

Index::Index(DirectorySharedPtr dir, bool create)
: m_mutex(QMutex::Recursive), m_dir(dir), m_open(false),
: m_mutex(), m_dir(dir), m_open(false),
m_hasWriter(false),
m_deleter(new IndexFileDeleter(dir))
{
Expand All @@ -28,9 +28,11 @@ Index::~Index()

void Index::open(bool create)
{
QMutexLocker locker(&m_mutex);
if (!m_info.load(m_dir.data(), true)) {
if (create) {
IndexWriter(m_dir, m_info).commit();
locker.unlock();
return open(false);
}
throw IOException("there is no index in the directory");
Expand All @@ -45,6 +47,7 @@ QSharedPointer<IndexReader> Index::openReader()
if (!m_open) {
throw IndexIsNotOpen("index is not open");
}
locker.unlock();
return QSharedPointer<IndexReader>::create(sharedFromThis());
}

Expand All @@ -55,6 +58,7 @@ QSharedPointer<IndexWriter> Index::openWriter(bool wait, int64_t timeoutInMSecs)
throw IndexIsNotOpen("index is not open");
}
acquireWriterLockInt(wait, timeoutInMSecs);
locker.unlock();
return QSharedPointer<IndexWriter>::create(sharedFromThis(), true);
}

Expand Down
2 changes: 1 addition & 1 deletion src/index/index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void IndexWriter::merge(const QList<int>& merge)
throw CorruptIndexException("checksum mismatch after merge");
}

QSet<int> merged = merge.toSet();
QSet<int> merged = QSet<int>(merge.begin(), merge.end());
info.clearSegments();
for (size_t i = 0; i < segments.size(); i++) {
const SegmentInfo& s = segments.at(i);
Expand Down
2 changes: 1 addition & 1 deletion src/index/segment_merge_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ QList<int> SegmentMergePolicy::findMerges(const SegmentInfoList& infos)
for (size_t i = 0; i < infos.size(); i++) {
segments.append(i);
}
qStableSort(segments.begin(), segments.end(), SegmentSizeLessThan(&infos));
std::stable_sort(segments.begin(), segments.end(), SegmentSizeLessThan(&infos));
//qDebug() << "Order after sorting is " << segments;

size_t minSegmentSize = infos.at(segments.last()).blockCount();
Expand Down
2 changes: 2 additions & 0 deletions src/server/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "errors.h"
#include "protocol.h"
#include "metrics.h"
#include "util/tracing.h"

using namespace Acoustid;
using namespace Acoustid::Server;
Expand Down Expand Up @@ -133,6 +134,7 @@ void Connection::readIncomingData()
}

auto futureResult = QtConcurrent::run([=]() {
setTraceId(m_session->getTraceId());
QString response;
try {
response = renderResponse(handler());
Expand Down
9 changes: 9 additions & 0 deletions src/server/http/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ QString HttpRequest::param(const QString &name, const QString &defaultValue) con
return defaultValue;
}

QString HttpRequest::header(const QString &name, const QString &defaultValue) const {
auto encodedName = name.toUtf8();
auto it = m_headers.find(encodedName);
if (it != m_headers.end()) {
return *it;
}
return defaultValue;
}

QUrl HttpRequest::url() const { return m_url; }

QJsonDocument HttpRequest::json() const {
Expand Down
2 changes: 2 additions & 0 deletions src/server/http/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class HttpRequest {

QByteArray body() const { return m_body; }

QString header(const QString &name, const QString &defaultValue = QString()) const;

void setMethod(HttpMethod method) { m_method = method; }

void setUrl(const QUrl &url) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/http/response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void HttpResponse::send(qhttp::server::QHttpRequest *req, qhttp::server::QHttpRe
res->end();
return;
}
res->addHeaderValue("Content-Length", m_body.size());
res->addHeaderValue("Content-Length", static_cast<size_t>(m_body.size()));
res->end(m_body);
}

Expand Down
11 changes: 8 additions & 3 deletions src/server/http/router.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include "router.h"
#include "util/tracing.h"

#include <QRegularExpression>
#include <QtConcurrent>
#include <QThreadPool>

namespace Acoustid {
namespace Server {
Expand All @@ -10,7 +11,7 @@ void HttpRouter::route(HttpMethod method, const QString &path, HttpHandlerFunc h
auto pathParts = path.split('/');
for (auto &pathPart : pathParts) {
if (pathPart.startsWith(':')) {
auto paramName = pathPart.midRef(1);
auto paramName = pathPart.mid(1);
pathPart = "(?<" + paramName + ">[^/]+)";
}
}
Expand Down Expand Up @@ -51,7 +52,11 @@ void HttpRouter::handle(qhttp::server::QHttpRequest *req, qhttp::server::QHttpRe
HttpRequest request(req->method(), req->url());
request.setHeaders(req->headers());
request.setBody(req->collectedData());
QtConcurrent::run([=]() {
QThreadPool::globalInstance()->start([=]() {
auto traceId = request.header("X-Trace-Id");
if (!traceId.isEmpty()) {
setTraceId(traceId);
}
HttpResponse response;
try {
response = handle(request);
Expand Down
47 changes: 44 additions & 3 deletions src/server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include <QCoreApplication>
#include <QThreadPool>
#include <QJsonObject>
#include <QJsonDocument>
#include "qhttpserver.hpp"
#include "qhttpserverrequest.hpp"
#include "qhttpserverresponse.hpp"
Expand All @@ -16,8 +18,47 @@ using namespace Acoustid::Server;

using namespace qhttp::server;

static QTextStream stderrStream(stderr);

void handleLogMessage(QtMsgType type, const QMessageLogContext &context, const QString &msg)
{
QString time = QDateTime::currentDateTimeUtc().toString(Qt::ISODateWithMs);

QString level;
switch (type) {
case QtDebugMsg:
level = "debug";
break;
case QtInfoMsg:
level = "info";
break;
case QtWarningMsg:
level = "warning";
break;
case QtCriticalMsg:
level = "error";
break;
case QtFatalMsg:
level = "error";
break;
}

QJsonObject obj;
obj.insert("time", time);
obj.insert("level", level);
obj.insert("message", msg);

stderrStream << QJsonDocument(obj).toJson(QJsonDocument::Compact) << Qt::endl;

if (type == QtFatalMsg) {
abort();
}
}

int main(int argc, char **argv)
{
qInstallMessageHandler(handleLogMessage);

OptionParser parser("%prog [options]");
parser.addOption("directory", 'd')
.setArgument()
Expand Down Expand Up @@ -72,16 +113,16 @@ int main(int argc, char **argv)
Listener listener(path, opts->contains("mmap"));
listener.setMetrics(metrics);
listener.listen(QHostAddress(address), port);
qDebug() << "Simple server listening on" << address << "port" << port;
qInfo() << "Simple server listening on" << address << "port" << port;

HttpRequestHandler handler(listener.index(), metrics);

QHttpServer httpListener(&app);
httpListener.listen(QHostAddress(httpAddress), httpPort, [&](QHttpRequest *req, QHttpResponse *res) {
handler.router().handle(req, res);
});
qDebug() << "HTTP server listening on" << httpAddress << "port" << httpPort;
qDebug() << "Prometheus metrics available at" << QString("http://%1:%2/_metrics").arg(httpAddress).arg(httpPort);
qInfo() << "HTTP server listening on" << httpAddress << "port" << httpPort;
qInfo() << "Prometheus metrics available at" << QString("http://%1:%2/_metrics").arg(httpAddress).arg(httpPort);

return app.exec();
}
Expand Down
13 changes: 8 additions & 5 deletions src/server/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ ScopedHandlerFunc buildHandler(const QString &command, const QStringList &args)
} else if (command == "begin") {
return [=](QSharedPointer<Session> session) { session->begin(); return QString(); };
} else if (command == "commit") {
return [=](QSharedPointer<Session> session) { session->commit(); return QString(); };
return [=](QSharedPointer<Session> session) { session->commit(); session->clearTraceId(); return QString(); };
} else if (command == "rollback") {
return [=](QSharedPointer<Session> session) { session->rollback(); return QString(); };
return [=](QSharedPointer<Session> session) { session->rollback(); session->clearTraceId(); return QString(); };
} else if (command == "optimize") {
return [=](QSharedPointer<Session> session) { session->optimize(); return QString(); };
return [=](QSharedPointer<Session> session) { session->optimize(); session->clearTraceId(); return QString(); };
} else if (command == "cleanup") {
return [=](QSharedPointer<Session> session) { session->cleanup(); return QString(); };
return [=](QSharedPointer<Session> session) { session->cleanup(); session->clearTraceId(); return QString(); };
} else if (command == "insert") {
if (args.size() != 2) {
throw BadRequest("expected two arguments");
Expand All @@ -80,6 +80,7 @@ ScopedHandlerFunc buildHandler(const QString &command, const QStringList &args)
auto id = args.at(0).toInt();
auto hashes = parseFingerprint(args.at(1));
session->insert(id, hashes);
session->clearTraceId();
return QString();
};
} else if (command == "search") {
Expand All @@ -94,7 +95,9 @@ ScopedHandlerFunc buildHandler(const QString &command, const QStringList &args)
for (int i = 0; i < results.size(); i++) {
output.append(QString("%1:%2").arg(results[i].id()).arg(results[i].score()));
}
return output.join(" ");
QString outputString = output.join(" ");
session->clearTraceId();
return outputString;
};
} else {
throw BadRequest(QString("unknown command %1").arg(command));
Expand Down
14 changes: 14 additions & 0 deletions src/server/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ void Session::setAttribute(const QString &name, const QString &value) {
m_idle_timeout = value.toInt();
return;
}
if (name == "trace_id") {
m_traceId = value;
return;
}
if (m_indexWriter.isNull()) {
throw NotInTransactionException();
}
Expand All @@ -114,3 +118,13 @@ QList<Result> Session::search(const QVector<uint32_t> &hashes) {
}
return collector.topResults();
}

QString Session::getTraceId() {
QMutexLocker locker(&m_mutex);
return m_traceId;
}

void Session::clearTraceId() {
QMutexLocker locker(&m_mutex);
m_traceId.clear();
}
4 changes: 4 additions & 0 deletions src/server/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class Session

QSharedPointer<Metrics> metrics() const { return m_metrics; }

QString getTraceId();
void clearTraceId();

private:
QMutex m_mutex;
QSharedPointer<Index> m_index;
Expand All @@ -48,6 +51,7 @@ class Session
int m_maxResults { 500 };
int64_t m_timeout { 0 };
int64_t m_idle_timeout { 60 * 1000 };
QString m_traceId;
};

}
Expand Down
14 changes: 14 additions & 0 deletions src/util/tracing.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include <QThreadStorage>
#include "util/tracing.h"

static QThreadStorage<QString> traceId;

void setTraceId(const QString &value)
{
traceId.setLocalData(value);
}

QString getTraceId()
{
return traceId.localData();
}
6 changes: 6 additions & 0 deletions src/util/tracing.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#pragma once

#include <QString>

QString getTraceId();
void setTraceId(const QString &id);

0 comments on commit 269be11

Please sign in to comment.