Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extensions to log server disk queue (made in the context of version vector/unicast) #11777

Open
wants to merge 7 commits into
base: version-vector-disk-queue
Choose a base branch
from
158 changes: 145 additions & 13 deletions fdbserver/TLogServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,19 @@ struct TLogQueueEntryRef {
UID id;
Version version;
Version knownCommittedVersion;
Version prevVersion;
std::vector<uint16_t> tLogLocIds;
StringRef messages;
TLogQueueEntryRef() : version(0), knownCommittedVersion(0) {}
TLogQueueEntryRef() : version(0), knownCommittedVersion(0), prevVersion(0) {}
TLogQueueEntryRef(Arena& a, TLogQueueEntryRef const& from)
: id(from.id), version(from.version), knownCommittedVersion(from.knownCommittedVersion),
messages(a, from.messages) {}
messages(a, from.messages), prevVersion(from.prevVersion), tLogLocIds(from.tLogLocIds) {}

// To change this serialization, ProtocolVersion::TLogQueueEntryRef must be updated, and downgrades need to be
// considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, messages, knownCommittedVersion, id);
serializer(ar, version, messages, knownCommittedVersion, prevVersion, tLogLocIds, id);
}
size_t expectedSize() const { return messages.expectedSize(); }
};
Expand All @@ -73,8 +75,11 @@ struct AlternativeTLogQueueEntryRef {
Version version;
Version knownCommittedVersion;
std::vector<TagsAndMessage>* alternativeMessages;
Version prevVersion;
std::vector<uint16_t> tLogLocIds;

AlternativeTLogQueueEntryRef() : version(0), knownCommittedVersion(0), alternativeMessages(nullptr) {}
AlternativeTLogQueueEntryRef()
: version(0), knownCommittedVersion(0), alternativeMessages(nullptr), prevVersion(0) {}

template <class Ar>
void serialize(Ar& ar) {
Expand All @@ -84,7 +89,7 @@ struct AlternativeTLogQueueEntryRef {
for (auto& msg : *alternativeMessages) {
ar.serializeBytes(msg.message);
}
serializer(ar, knownCommittedVersion, id);
serializer(ar, knownCommittedVersion, prevVersion, tLogLocIds, id);
}

uint32_t expectedSize() const {
Expand Down Expand Up @@ -139,6 +144,7 @@ struct TLogQueue final : public IClosable {
queue->close();
delete this;
}
IDiskQueue::location getNextReadLocation() { return queue->getNextReadLocation(); }

private:
IDiskQueue* queue;
Expand Down Expand Up @@ -217,6 +223,8 @@ static const KeyRangeRef persistTxsTagsKeys = KeyRangeRef("TxsTags/"_sr, "TxsTag
static const KeyRange persistTagMessagesKeys = prefixRange("TagMsg/"_sr);
static const KeyRange persistTagMessageRefsKeys = prefixRange("TagMsgRef/"_sr);
static const KeyRange persistTagPoppedKeys = prefixRange("TagPop/"_sr);
static const KeyRef persistUnicastRecoveryLocationKey = KeyRef("UnicastRecoveryLocation"_sr);
static const KeyRef persistSpillTargetLogDataIdKey = KeyRef("SpillTargetLogDataId"_sr);

static const KeyRef persistEncryptionAtRestModeKey = "encryptionAtRestMode"_sr;

Expand Down Expand Up @@ -731,6 +739,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
tLogData->persistentData->clear(KeyRangeRef(msgRefKey, strinc(msgRefKey)));
Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin);
tLogData->persistentData->clear(KeyRangeRef(poppedKey, strinc(poppedKey)));
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistUnicastRecoveryLocationKey)));
tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistSpillTargetLogDataIdKey)));
}
}

for (auto it = peekTracker.begin(); it != peekTracker.end(); ++it) {
Expand Down Expand Up @@ -778,6 +790,12 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
stoppedPromise.send(Void());
}
}

void purgeUnknownCommittedVersions(Version upToVersion) {
while (!unknownCommittedVersions.empty() && unknownCommittedVersions.back().version <= upToVersion) {
unknownCommittedVersions.pop_back();
}
}
};

template <class T>
Expand Down Expand Up @@ -965,10 +983,21 @@ ACTOR Future<Void> popDiskQueue(TLogData* self, Reference<LogData> logData) {
IDiskQueue::location minLocation = 0;
Version minVersion = 0;
auto locationIter = logData->versionLocation.lower_bound(logData->persistentDataVersion);
// If version vector is enabled then we need to preserve all versions from "knownCommittedVersion"
// onwards (for recovery purpose). Adjust the iterator position accordingly.
// @todo extend the code to consider the "knownCommittedVersion" of log servers
// from "priorCommittedLogServers" too.
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST &&
logData->knownCommittedVersion < logData->persistentDataVersion) {
locationIter = logData->versionLocation.lastLessOrEqual(logData->knownCommittedVersion);
}
if (locationIter != logData->versionLocation.end()) {
minLocation = locationIter->value.first;
minVersion = locationIter->key;
}
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
ASSERT_WE_THINK(minVersion <= logData->knownCommittedVersion);
}
logData->minPoppedTagVersion = std::numeric_limits<Version>::max();

for (int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
Expand Down Expand Up @@ -1108,6 +1137,49 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
KeyValueRef(persistRecoveryLocationKey, BinaryWriter::toValue(locationIter->value.first, Unversioned())));
}

state Version unicastRelevantMinimumVersion = std::numeric_limits<Version>::max();
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
// Track the position of the known committed version of "logData" - that is
// the position we will need to start reading the diskQueue from, in order to
// build "LogData::unknownCommittedVersions", in case of a restart.
state Reference<LogData> unicastRelevantLogData = logData;

// Track the position of the known committed version of the relevant log server
// from the previous epoch's log system - if the system is currently going through
// recovery and the current log system is not yet committed to the coordinated
// state then the previous epoch's log system will become the current log system
// in case the current recovery fails, so we will need to consider (committed) log
// servers from the previous epoch too.
auto const& dbInfo = self->dbInfo->get();
for (auto& it : self->id_data) {
if (std::find(dbInfo.priorCommittedLogServers.begin(),
dbInfo.priorCommittedLogServers.end(),
it.second->logId) != dbInfo.priorCommittedLogServers.end()) {
unicastRelevantLogData = it.second;
}
}

if (unicastRelevantLogData) {
auto kcvLocationIter =
unicastRelevantLogData->versionLocation.lastLessOrEqual(unicastRelevantLogData->knownCommittedVersion);
if (kcvLocationIter != unicastRelevantLogData->versionLocation.end()) {
self->persistentData->set(
KeyValueRef(persistUnicastRecoveryLocationKey,
BinaryWriter::toValue(kcvLocationIter->value.first, Unversioned())));
}

// @note "persistSpillTargetLogDataIdKey" is not used in the latest version
// of the code that builds "unknownCommittedVersions" list on restart. Consider
// removing this key later.
self->persistentData->set(
KeyValueRef(persistSpillTargetLogDataIdKey, BinaryWriter::toValue(logData->logId, Unversioned())));

unicastRelevantMinimumVersion =
unicastRelevantLogData
->knownCommittedVersion; // we don't want to purge versions from diskqueue from this version onwards
}
}

self->persistentData->set(
KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin),
BinaryWriter::toValue(newPersistentDataVersion, Unversioned())));
Expand Down Expand Up @@ -1186,7 +1258,9 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
}
if (minVersion != std::numeric_limits<Version>::max()) {
self->persistentQueue->forgetBefore(
newPersistentDataVersion,
(!SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST
? newPersistentDataVersion
: std::min(minVersion, unicastRelevantMinimumVersion)),
logData); // SOMEDAY: this can cause a slow task (~0.5ms), presumably from erasing too many versions.
// Should we limit the number of versions cleared at a time?
}
Expand Down Expand Up @@ -2430,6 +2504,8 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = req.messages;
qe.id = logData->logId;
qe.prevVersion = req.seqPrevVersion;
qe.tLogLocIds = req.tLogLocIds;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand All @@ -2439,11 +2515,10 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
logData->version.set(req.version);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
ASSERT(req.tLogCount == req.tLogLocIds.size());
logData->unknownCommittedVersions.emplace_front(req.version, req.seqPrevVersion, req.tLogLocIds);
while (!logData->unknownCommittedVersions.empty() &&
logData->unknownCommittedVersions.back().version <= req.knownCommittedVersion) {
logData->unknownCommittedVersions.pop_back();
}
// Purge versions from "unknownCommittedVersions" list till "req.knownCommittedVersion".
logData->purgeUnknownCommittedVersions(req.knownCommittedVersion);
} else {
ASSERT(req.prevVersion == req.seqPrevVersion); // @todo remove this assert later
}
Expand Down Expand Up @@ -3058,6 +3133,7 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
qe.prevVersion = 0;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand Down Expand Up @@ -3213,10 +3289,13 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
state Future<RangeResult> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<RangeResult> fProtocolVersions = storage->readRange(persistProtocolVersionKeys);
state Future<RangeResult> fTLogSpillTypes = storage->readRange(persistTLogSpillTypeKeys);
state Future<Optional<Value>> fUnicastRecoveryLocation = storage->readValue(persistUnicastRecoveryLocationKey);
state Future<Optional<Value>> fSpillTargetLogDataId = storage->readValue(persistSpillTargetLogDataIdKey);

// FIXME: metadata in queue?

wait(waitForAll(std::vector{ fFormat, fRecoveryLocation, fEncryptionAtRestMode }));
wait(waitForAll(std::vector{
fFormat, fRecoveryLocation, fEncryptionAtRestMode, fUnicastRecoveryLocation, fSpillTargetLogDataId }));
wait(waitForAll(std::vector{ fVers,
fKnownCommitted,
fLocality,
Expand Down Expand Up @@ -3290,10 +3369,31 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
BinaryReader::fromStringRef<Version>(it.value, Unversioned());
}

state IDiskQueue::location minimumRecoveryLocation = 0;
state IDiskQueue::location minimumRecoveryLocation =
0; // the position to start reading the disk queue from (on recovery)
state IDiskQueue::location nonUnicastRecoveryLocation =
0; // the position to start reading the disk queue from (on recovery) when version vector/unicast is disabled
if (fRecoveryLocation.get().present()) {
minimumRecoveryLocation =
nonUnicastRecoveryLocation =
BinaryReader::fromStringRef<IDiskQueue::location>(fRecoveryLocation.get().get(), Unversioned());

// Initialize "minimumRecoveryLocation".
minimumRecoveryLocation = nonUnicastRecoveryLocation;
}

state IDiskQueue::location unicastRecoveryLocation =
0; // the position to start reading the disk queue from (on recovery) when version vector/unicast is enabled
// @note versions in the position range (unicastRecoveryLocation, nonUnicastRecoveryLocation]
// will be read in order to build "LogData::unknownCommittedVersions" (needed by the unicast
// recovery algorithm) only.
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && fUnicastRecoveryLocation.get().present()) {
unicastRecoveryLocation =
BinaryReader::fromStringRef<IDiskQueue::location>(fUnicastRecoveryLocation.get().get(), Unversioned());

// Update "minimumRecoveryLocation".
if (unicastRecoveryLocation < minimumRecoveryLocation) {
minimumRecoveryLocation = unicastRecoveryLocation;
}
}

state int idx = 0;
Expand Down Expand Up @@ -3403,6 +3503,10 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
bool recoveryFinished = wait(self->persistentQueue->initializeRecovery(minimumRecoveryLocation));
if (recoveryFinished)
throw end_of_stream();
// Check if the version to be read is to be used to build "LogData::unknownCommittedVersions" only.
state bool buildUnknownCommittedOnly =
(SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST &&
(self->persistentQueue->getNextReadLocation() < nonUnicastRecoveryLocation));
loop {
if (allRemoved.isReady()) {
CODE_PROBE(true, "all tlogs removed during queue recovery");
Expand All @@ -3425,11 +3529,32 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
// logData->version.get());

if (logData) {

if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && buildUnknownCommittedOnly) {
logData->knownCommittedVersion =
std::max(logData->knownCommittedVersion, qe.knownCommittedVersion);
logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds);
// Purge versions from "unknownCommittedVersions" list till the "knownCommittedVersion".
logData->purgeUnknownCommittedVersions(logData->knownCommittedVersion);
if (buildUnknownCommittedOnly) {
buildUnknownCommittedOnly =
(self->persistentQueue->getNextReadLocation() < nonUnicastRecoveryLocation);
}
continue;
}

if (!self->spillOrder.size() || self->spillOrder.back() != qe.id) {
self->spillOrder.push_back(qe.id);
}
logData->knownCommittedVersion =
std::max(logData->knownCommittedVersion, qe.knownCommittedVersion);

if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds);
// Purge versions from "unknownCommittedVersions" list till the "knownCommittedVersion".
logData->purgeUnknownCommittedVersions(logData->knownCommittedVersion);
}

if (qe.version > logData->version.get()) {
commitMessages(self, logData, qe.version, qe.arena(), qe.messages);
logData->version.set(qe.version);
Expand Down Expand Up @@ -3458,6 +3583,10 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
ASSERT_WE_THINK(qe.version == logData->version.get());
}
}
if (buildUnknownCommittedOnly) {
buildUnknownCommittedOnly =
(self->persistentQueue->getNextReadLocation() < nonUnicastRecoveryLocation);
}
}
when(wait(allRemoved)) {
throw worker_removed();
Expand All @@ -3473,6 +3602,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
CODE_PROBE(now() - startt >= 1.0, "TLog recovery took more than 1 second");

for (auto it : self->id_data) {
// @todo to be removed later (after getting enough test coverage for the above code)
ASSERT_WE_THINK(it.second->knownCommittedVersion <= it.second->version.get());
if (it.second->queueCommittedVersion.get() == 0) {
TraceEvent("TLogZeroVersion", self->dbgid).detail("LogId", it.first);
it.second->queueCommittedVersion.set(it.second->version.get());
Expand Down Expand Up @@ -3707,6 +3838,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
qe.prevVersion = 0;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand Down