-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathExternalRenderer.cpp
105 lines (95 loc) · 3.55 KB
/
ExternalRenderer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#include "ExternalRenderer.h"
#include "utils.h"
ExternalRenderer::ExternalRenderer (RowSize recordSize,
vector<string>::const_iterator runFileNames_begin,
vector<string>::const_iterator runFileNames_end,
u_int64_t readAheadSize,
u_int8_t pass, u_int16_t rendererNumber,
bool removeDuplicates) : // 500 KB = 2^19
SortedRecordRenderer(recordSize, pass, rendererNumber, removeDuplicates)
{
TRACE (false);
u_int64_t totalSize = 0;
u_int64_t maxSize = 0;
string longRunFileName;
for (auto it = runFileNames_begin; it != runFileNames_end; ++it) {
auto &runFileName = *it;
u_int64_t fileSize = std::filesystem::file_size(runFileName);
totalSize += fileSize;
if (fileSize > maxSize) {
longRunFileName = runFileName;
maxSize = fileSize;
}
}
#ifdef PRODUCTION
// check the devices of the files
set<int> deviceTypes;
for (auto it = runFileNames_begin; it != runFileNames_end; ++it) {
auto &runFileName = *it;
auto [deviceType, deviceSize] = parseDeviceType(runFileName);
for (auto &type : deviceType) {
deviceTypes.insert(type);
}
}
if (deviceTypes.size() == 1) {
auto deviceType = *deviceTypes.begin();
string output = "Merge sorted runs on the ";
output += getDeviceName(deviceType);
output += " device";
Trace::PrintTrace(OP_STATE, deviceType == STORAGE_SSD? MERGE_RUNS_SSD : MERGE_RUNS_HDD, output);
}
else if (deviceTypes.size() == 2) {
Trace::PrintTrace(OP_STATE, MERGE_RUNS_BOTH, "Merge sorted runs on both devices");
}
else {
throw std::invalid_argument("ExternalRenderer: run files are on more than two devices");
}
#endif
if (((double) maxSize) / (totalSize - maxSize) > LONG_RUN_THRESHOLD) {
#if defined(VERBOSEL1) || defined(VERBOSEL2)
traceprintf("Optimizing merge pattern because of long run file %s, total size %llu, max size %llu\n", longRunFileName.c_str(), totalSize, maxSize);
#endif
longRun = new ExternalRun(longRunFileName, _recordSize);
} else {
longRun = nullptr;
}
vector<byte *> formingRows;
ExternalRun::READ_AHEAD_SIZE = readAheadSize;
ExternalRun::READ_AHEAD_THRESHOLD = std::max(0.5, ((double) MEMORY_SIZE - readAheadSize) / MEMORY_SIZE);
#if defined(VERBOSEL1) || defined(VERBOSEL2)
traceprintf ("Renderer %d: %zu run files, read-ahead size %llu threshold %f\n", rendererNumber, runFileNames.size(), readAheadSize, ExternalRun::READ_AHEAD_THRESHOLD);
#endif
for (auto it = runFileNames_begin; it != runFileNames_end; ++it) {
auto &runFileName = *it;
if (longRun != nullptr && runFileName == longRunFileName) continue;
ExternalRun * run = new ExternalRun(runFileName, _recordSize);
_runs.push_back(run);
formingRows.push_back(run->next());
}
_tree = new TournamentTree(formingRows, _recordSize);
} // ExternalRenderer::ExternalRenderer
ExternalRenderer::~ExternalRenderer ()
{
TRACE (false);
for (auto &run : _runs) {
delete run;
}
} // ExternalRenderer::~ExternalRenderer
byte * ExternalRenderer::next ()
{
TRACE (false);
return SortedRecordRenderer::renderRow(
[this] () -> byte * {
auto bufferNum = _tree->peekTopBuffer();
auto run = _runs.at(bufferNum);
return run->next();
},
_tree,
longRun
);
} // ExternalRenderer::next
void ExternalRenderer::print ()
{
traceprintf ("%zu run files\n", _runs.size());
_tree->printTree();
} // ExternalRenderer::print