-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathExternalSorter.cpp
225 lines (179 loc) · 8.43 KB
/
ExternalSorter.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// skeleton of all methods:
#include "ExternalSorter.h"

#include <memory>

#include "utils.h"
// Captures the merge configuration. The run file names are kept in the member
// runNames, which init() later rewrites between merge passes; no merging is done here.
ExternalSorter::ExternalSorter (const vector<string>& runNames, RowSize recordSize, bool removeDuplicates) :
	recordSize (recordSize),
	removeDuplicates (removeDuplicates),
	runNames (runNames)
{
	TRACE (false);
}
// Nothing is released here: the renderer returned by init() is not retained by
// this object, so the destructor only runs the TRACE hook.
ExternalSorter::~ExternalSorter ()
{
	TRACE (false);
}
// Builds the renderer that produces the final sorted output by merging the runs in runNames.
//
// Runs are merged in passes. In each pass:
//  - if one merge of all runs needs more than MEMORY_SIZE but at most
//    MEMORY_SIZE * GRACEFUL_DEGRADATION_THRESHOLD, gracefulMerge() builds the final renderer;
//  - otherwise runs are assigned greedily (assignRuns) to consecutive renderers. If the first
//    renderer of a pass covers every run, it is returned as the final renderer; otherwise each
//    renderer's merged output is written via run() and becomes an input run of the next pass.
//
// The returned renderer is heap-allocated and not retained by this object; the caller is
// responsible for deleting it.
// Throws std::runtime_error if runNames holds fewer than two runs, or if memory cannot hold
// the input page of even one more run for a renderer (which would otherwise loop forever).
SortedRecordRenderer * ExternalSorter::init () {
	u_int8_t pass = 0;
	vector<string> mergedRunNames;
	// Multi-pass merge
	while (runNames.size() > 1) { // Need another pass to merge the runs
		++ pass;
		Trace::PrintStdout("Pass %d: Merging %zu runs\n", pass, runNames.size());
		u_int16_t rendererNum = 0;
		// Gracefully merge all in one pass if possible (this is the last pass if there is only one renderer)
		u_int64_t allMemoryNeeded = calculateMemoryForAll(runNames);
		if (allMemoryNeeded <= MEMORY_SIZE * GRACEFUL_DEGRADATION_THRESHOLD && allMemoryNeeded > MEMORY_SIZE) {
			return gracefulMerge(runNames, pass, rendererNum);
		}
		u_int16_t mergedRunCount = 0;
		mergedRunNames.clear();
		while (mergedRunCount < runNames.size()) { // has more runs to merge, may still be the last pass if allMemoryNeeded <= MEMORY_SIZE
			u_int16_t mergedRunCountSoFar = mergedRunCount;
			auto [mergedRunCountNew, readAheadSize] = assignRuns(runNames, mergedRunCountSoFar);
			mergedRunCount = mergedRunCountNew;
			// Without progress the loop would create empty renderers forever
			if (mergedRunCount == mergedRunCountSoFar)
				throw std::runtime_error("External sort cannot fit another run into memory.");
#if defined(VERBOSEL1) || defined(VERBOSEL2)
			traceprintf ("Pass %d renderer %d: Merging runs from %d to %d out of %zu\n", pass, rendererNum, mergedRunCountSoFar, mergedRunCount - 1, runNames.size() - 1);
#endif
			// Owned here until it is either handed to the caller or its run has been written
			std::unique_ptr<SortedRecordRenderer> renderer = std::make_unique<ExternalRenderer>(recordSize,
				runNames.cbegin() + mergedRunCountSoFar, runNames.cbegin() + mergedRunCount,
				readAheadSize, pass, rendererNum, removeDuplicates);
			if (rendererNum == 0 && mergedRunCount == runNames.size()) // The last renderer in the last pass
			{
#if defined(VERBOSEL1) || defined(VERBOSEL2)
				traceprintf ("====== Stopped at pass %d\n", pass);
#endif
				return renderer.release();
			}
			// Need another pass for merged runs
			mergedRunNames.push_back(renderer->run());
			rendererNum++;
			renderer.reset(); // destroy before reporting progress, as before
			// print progress: (XX / XX)
			Trace::PrintStdout("Pass %d: Merged %d runs out of %zu\n", pass, mergedRunCount, runNames.size());
		}
		// The merged runs of this pass are the inputs of the next one; mergedRunNames is cleared above
		runNames.swap(mergedRunNames);
	}
	throw std::runtime_error("External sort not returning renderer.");
}
// Decides how many runs, starting at index mergedRunCount, one renderer can merge.
// The fixed memory cost is the profiled read-ahead size plus one output page; then one
// input page per run (sized by the run's largest device type) is added while the total
// stays within MEMORY_SIZE. Whatever memory is left over is given to read-ahead.
// Returns (index one past the last assigned run, read-ahead size in bytes).
tuple<u_int16_t, u_int64_t> ExternalSorter::assignRuns(const vector<string>& runNames, u_int16_t mergedRunCount)
{
	auto const profile = profileReadAheadAndOutput(runNames, mergedRunCount);
	u_int64_t readAheadSize = std::get<0>(profile);
	// Fixed part: read-ahead buffers and the output buffer
	u_int64_t used = readAheadSize + std::get<1>(profile);
	// Input buffers, one page per run; max. 120 G / 98 M = 2^11 runs
	for (; mergedRunCount < runNames.size(); ++mergedRunCount) {
		auto const runPage = Metrics::getParams(getLargestDeviceType(runNames.at(mergedRunCount))).pageSize;
		if (used + runPage > MEMORY_SIZE)
			break;
		used += runPage;
	}
	Assert (used <= MEMORY_SIZE, __FILE__, __LINE__);
	readAheadSize += MEMORY_SIZE - used; // spare memory becomes extra read-ahead
	return std::make_tuple(mergedRunCount, readAheadSize);
}
u_int64_t ExternalSorter::calculateMemoryForAll (const vector<string>& runNames)
{
auto [readAheadSize, outputPageSize] = profileReadAheadAndOutput(runNames, 0);
// Calculate all memory consumption till the end of runNames
u_int64_t allMemoryConsumption = readAheadSize + outputPageSize;
for (size_t i = 0; i < runNames.size(); i++) {
auto deviceType = getLargestDeviceType(runNames.at(i));
auto pageSize = Metrics::getParams(deviceType).pageSize;
allMemoryConsumption += pageSize;
}
return allMemoryConsumption;
}
// Profiles the runs a renderer starting at index mergedRunCount could merge and returns
// (readAheadSize, outputPageSize) in bytes.
// - Profiled runs: consecutive runs from mergedRunCount whose input pages (one page per run,
//   sized by the run's largest device type) together fit into MEMORY_SIZE.
// - outputPageSize: page size of the device Metrics::getAvailableStorage picks for the summed
//   file sizes of the profiled runs. This is a conservative (largest possible) estimate.
// - readAheadSize: READ_AHEAD_BUFFERS_MIN buffers, of which 2 / 1 / 0 are HDD-sized when more
//   than 67% / more than 33% / at most 33% of the profiled runs are HDD runs; the rest are SSD-sized.
tuple<u_int64_t, u_int64_t> ExternalSorter::profileReadAheadAndOutput (const vector<string>& runNames, u_int16_t mergedRunCount)
{
	TRACE (false);
	u_int64_t memoryConsumption = 0;
	u_int64_t outputFileSize = 0;
	u_int16_t ssdRunCount = 0;
	u_int16_t hddRunCount = 0;
	while (mergedRunCount < runNames.size()) {
		const string &runName = runNames.at(mergedRunCount);
		auto deviceType = getLargestDeviceType(runName);
		auto pageSize = Metrics::getParams(deviceType).pageSize;
		if (memoryConsumption + pageSize > MEMORY_SIZE) break;
		memoryConsumption += pageSize;
		mergedRunCount++;
		outputFileSize += std::filesystem::file_size(runName);
		// NOTE(review): device type 0 is counted as SSD, any other type as HDD — confirm against Metrics
		if (deviceType == 0) ssdRunCount++;
		else hddRunCount++;
	}
	// Output page size: allocate conservatively for the device that would hold the whole output,
	// even though writing may go to SSD first if possible
	int deviceType = Metrics::getAvailableStorage(outputFileSize);
	auto outputPageSize = Metrics::getParams(deviceType).pageSize;
	// Read-ahead buffer size, driven by the share of HDD runs (0 when nothing was profiled)
	int const profiledRunCount = hddRunCount + ssdRunCount;
	double const hddRunRatio = profiledRunCount == 0
		? 0.0
		: static_cast<double>(hddRunCount) / profiledRunCount;
	int hddBufferCount;
	if (hddRunRatio > 0.67) hddBufferCount = 2;
	else if (hddRunRatio > 0.33) hddBufferCount = 1;
	else hddBufferCount = 0;
	u_int64_t readAheadSize = hddBufferCount * HDD_PAGE_SIZE +
		(READ_AHEAD_BUFFERS_MIN - hddBufferCount) * SSD_PAGE_SIZE;
	return std::make_tuple(readAheadSize, outputPageSize);
}
// Final merge for the case where all runs almost (but not quite) fit into memory.
//
// The smallest suffix of runNames (the last i runs) is found such that, once those runs are
// pre-merged into one "initial" run, a graceful renderer over the remaining runs plus the
// initial run fits into MEMORY_SIZE:
//   gracefulOutputPage + gracefulReadAheadSize
//     + (input pages of all runs - input pages of the suffix) + initial run page <= MEMORY_SIZE
// The initial renderer is run to completion here (its output written via run()) and freed;
// the graceful renderer is returned, heap-allocated, and must be deleted by the caller.
// Throws std::runtime_error if no suffix satisfies the memory condition (including empty runNames).
SortedRecordRenderer * ExternalSorter::gracefulMerge (const vector<string>& runNames, int basePass, int rendererNum)
{
	TRACE (false);
	Trace::PrintStdout("GRACEFUL MERGE\n");
#if defined(VERBOSEL1) || defined(VERBOSEL2)
	traceprintf ("====== Pass %d Graceful merge %zu runs\n", basePass, runNames.size());
#endif
	// Read-ahead budget of the graceful renderer. Only the first runs whose pages fit into
	// memory are profiled. The initial renderer is assumed to have plenty of spare memory for
	// read-ahead because its run set is kept as small as possible.
	u_int64_t const gracefulReadAheadSize = std::get<0>(profileReadAheadAndOutput(runNames, 0));

	size_t const n = runNames.size();
	u_int64_t inputMemoryForAllRuns = 0; // Input buffer size to contain pages from all runs
	u_int64_t allRunSize = 0; // Total size of all runs
	for (const string& runName : runNames) {
		auto deviceType = getLargestDeviceType(runName);
		inputMemoryForAllRuns += Metrics::getParams(deviceType).pageSize;
		allRunSize += std::filesystem::file_size(runName);
	}
	int const gracefulOutputDevice = Metrics::getAvailableStorage(allRunSize);
	u_int64_t const gracefulOutputPage = Metrics::getParams(gracefulOutputDevice).pageSize;
	u_int64_t const gracefulMemoryFixed = gracefulOutputPage + gracefulReadAheadSize;

	u_int64_t initialRunSize = 0;     // total file size of the suffix merged by the initial renderer
	u_int64_t initialPageSize = 0;    // output page of the initial renderer = input page of the initial run for the graceful renderer
	u_int64_t initialInputMemory = 0; // input pages of the suffix runs
	// Find the smallest initial run: grow the suffix one run at a time
	size_t i = 0;
	bool fits = false;
	for (i = 1; i <= n; i++) {
		const string &runName = runNames.at(n - i);
		initialRunSize += std::filesystem::file_size(runName);
		auto deviceType = getLargestDeviceType(runName);
		initialInputMemory += Metrics::getParams(deviceType).pageSize;
		auto initialRunDevice = Metrics::getAvailableStorage(initialRunSize);
		initialPageSize = Metrics::getParams(initialRunDevice).pageSize;
		// Memory consumption of the graceful renderer
		if (gracefulMemoryFixed + inputMemoryForAllRuns - initialInputMemory + initialPageSize <= MEMORY_SIZE) {
			fits = true;
			break;
		}
	}
	// Without this check i == n + 1 here and the iterator arithmetic below would leave the vector
	if (!fits)
		throw std::runtime_error("Graceful merge cannot fit the runs into memory.");

	// Check before subtracting: the unsigned difference would otherwise wrap around silently
	Assert (initialInputMemory + initialPageSize <= MEMORY_SIZE, __FILE__, __LINE__);
	u_int64_t const initialReadAheadSize = MEMORY_SIZE - initialInputMemory - initialPageSize;

	auto const split = runNames.cend() - static_cast<vector<string>::difference_type>(i);
	// rendererNum + 1: this run will not be pushed back to mergedRunNames in init()
	std::unique_ptr<ExternalRenderer> initialRenderer = std::make_unique<ExternalRenderer>(recordSize,
		split, runNames.cend(),
		initialReadAheadSize, basePass, rendererNum + 1, removeDuplicates);
	string const initialRunFileName = initialRenderer->run();
	initialRenderer.reset(); // run is written; free the renderer, as init() does after run()

	// Graceful renderer over the rest of the runs + the initial run
	// NOTE(review): restOfRuns is local; this assumes ExternalRenderer does not keep the iterators
	// beyond its constructor — confirm in ExternalRenderer
	vector<string> restOfRuns(runNames.cbegin(), split);
	restOfRuns.push_back(initialRunFileName);
	return new ExternalRenderer(recordSize, restOfRuns.cbegin(), restOfRuns.cend(), gracefulReadAheadSize, basePass, rendererNum, removeDuplicates);
}