Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GTiff: MultiThreadedRead(): make it take into account AdviseRead() limit to reduce the number of I/O requests #9694

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions autotest/gcore/tiff_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -4447,6 +4447,92 @@ def method(request):
gdal.GetDriverByName("GTIFF").Delete(cog_filename)


###############################################################################
# Test GTiffDataset::MultiThreadedRead() when the amount of requested bytes
# exceeds the allowed limit.


@pytest.mark.require_curl()
@pytest.mark.skipif(
    not check_libtiff_internal_or_at_least(4, 0, 11),
    reason="libtiff >= 4.0.11 required",
)
def test_tiff_read_vsicurl_multi_threaded_beyond_advise_read_limit(tmp_path):
    """Verify multi-threaded /vsicurl reads when the total of requested bytes
    goes beyond CPL_VSIL_CURL_ADVISE_READ_TOTAL_BYTES_LIMIT: the read must
    still return the exact same pixels as a local read."""

    (server_proc, server_port) = webserver.launch(
        handler=webserver.DispatcherHttpHandler
    )
    if server_port == 0:
        pytest.skip()

    gdal.VSICurlClearCache()

    # Build a local tiled LZW TIFF used both as reference and as the file
    # served over HTTP.
    local_filename = str(tmp_path / "tmp.tif")
    gdal.Translate(
        local_filename,
        "data/utmsmall.tif",
        options="-co TILED=YES -co COMPRESS=LZW -outsize 1024 0",
    )
    ref_ds = gdal.Open(local_filename)
    expected_data = ref_ds.ReadRaster()
    ref_ds = None

    try:
        filesize = os.stat(local_filename).st_size
        handler = webserver.SequentialHandler()
        handler.add("HEAD", "/test.tif", 200, {"Content-Length": "%d" % filesize})

        def serve_byte_range(request):
            # Honor a "Range: bytes=start-end" request by replying with the
            # corresponding slice of the local file.
            rng_header = request.headers["Range"]
            if rng_header.startswith("bytes="):
                parts = rng_header[len("bytes=") :].split("-")
                assert len(parts) == 2
                start = int(parts[0])
                end = int(parts[1])

                request.protocol_version = "HTTP/1.1"
                request.send_response(206)
                request.send_header("Content-type", "application/octet-stream")
                request.send_header(
                    "Content-Range", "bytes %d-%d/%d" % (start, end, filesize)
                )
                request.send_header("Content-Length", end - start + 1)
                request.send_header("Connection", "close")
                request.end_headers()
                with open(local_filename, "rb") as f:
                    f.seek(start, 0)
                    request.wfile.write(f.read(end - start + 1))

        # Three GET requests are expected once the read gets split.
        for _ in range(3):
            handler.add("GET", "/test.tif", custom_method=serve_byte_range)

        with webserver.install_http_handler(handler):
            with gdaltest.config_options(
                {
                    "GDAL_NUM_THREADS": "2",
                    "CPL_VSIL_CURL_ALLOWED_EXTENSIONS": ".tif",
                    "GDAL_DISABLE_READDIR_ON_OPEN": "EMPTY_DIR",
                    "CPL_VSIL_CURL_ADVISE_READ_TOTAL_BYTES_LIMIT": str(
                        2 * filesize // 3
                    ),
                }
            ):
                ds = gdal.Open(
                    "/vsicurl/http://127.0.0.1:%d/test.tif" % server_port
                )
                assert ds is not None, "could not open dataset"

                got_data = ds.ReadRaster()
                assert got_data == expected_data

    finally:
        webserver.server_stop(server_proc, server_port)

    gdal.VSICurlClearCache()


###############################################################################
# Check that GetMetadataDomainList() works properly

Expand Down
73 changes: 62 additions & 11 deletions frmts/gtiff/gtiffdataset_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ CPLErr GTiffDataset::ReadCompressedData(const char *pszFormat, int nXOff,

struct GTiffDecompressContext
{
std::mutex oMutex{};
// The mutex must be recursive because ThreadDecompressionFuncErrorHandler()
// which acquires the mutex can be called from a section where the mutex is
// already acquired.
std::recursive_mutex oMutex{};
bool bSuccess = true;

std::vector<CPLErrorHandlerAccumulatorStruct> aoErrors{};
Expand Down Expand Up @@ -416,7 +419,7 @@ static void CPL_STDCALL ThreadDecompressionFuncErrorHandler(
{
GTiffDecompressContext *psContext =
static_cast<GTiffDecompressContext *>(CPLGetErrorHandlerUserData());
std::lock_guard<std::mutex> oLock(psContext->oMutex);
std::lock_guard<std::recursive_mutex> oLock(psContext->oMutex);
psContext->aoErrors.emplace_back(eErr, eErrorNum, pszMsg);
}

Expand Down Expand Up @@ -495,7 +498,7 @@ static void CPL_STDCALL ThreadDecompressionFuncErrorHandler(
if (psJob->nSize == 0)
{
{
std::lock_guard<std::mutex> oLock(psContext->oMutex);
std::lock_guard<std::recursive_mutex> oLock(psContext->oMutex);
if (!psContext->bSuccess)
return;
}
Expand Down Expand Up @@ -616,7 +619,7 @@ static void CPL_STDCALL ThreadDecompressionFuncErrorHandler(
if (psContext->bHasPRead)
{
{
std::lock_guard<std::mutex> oLock(psContext->oMutex);
std::lock_guard<std::recursive_mutex> oLock(psContext->oMutex);
if (!psContext->bSuccess)
return;

Expand All @@ -634,7 +637,7 @@ static void CPL_STDCALL ThreadDecompressionFuncErrorHandler(
{
if (!AllocInputBuffer())
{
std::lock_guard<std::mutex> oLock(psContext->oMutex);
std::lock_guard<std::recursive_mutex> oLock(psContext->oMutex);
psContext->bSuccess = false;
return;
}
Expand All @@ -647,15 +650,15 @@ static void CPL_STDCALL ThreadDecompressionFuncErrorHandler(
static_cast<GUIntBig>(psJob->nSize),
static_cast<GUIntBig>(psJob->nOffset));

std::lock_guard<std::mutex> oLock(psContext->oMutex);
std::lock_guard<std::recursive_mutex> oLock(psContext->oMutex);
psContext->bSuccess = false;
return;
}
}
}
else
{
std::lock_guard<std::mutex> oLock(psContext->oMutex);
std::lock_guard<std::recursive_mutex> oLock(psContext->oMutex);
if (!psContext->bSuccess)
return;

Expand Down Expand Up @@ -808,7 +811,7 @@ static void CPL_STDCALL ThreadDecompressionFuncErrorHandler(

if (!bRet)
{
std::lock_guard<std::mutex> oLock(psContext->oMutex);
std::lock_guard<std::recursive_mutex> oLock(psContext->oMutex);
psContext->bSuccess = false;
return;
}
Expand Down Expand Up @@ -1271,6 +1274,9 @@ CPLErr GTiffDataset::MultiThreadedRead(int nXOff, int nYOff, int nXSize,
std::vector<size_t> anSizes(nBlocks);
int iJob = 0;
int nAdviseReadRanges = 0;
const size_t nAdviseReadTotalBytesLimit =
sContext.poHandle->GetAdviseReadTotalBytesLimit();
size_t nAdviseReadAccBytes = 0;
for (int y = 0; y < nYBlocks; ++y)
{
for (int x = 0; x < nXBlocks; ++x)
Expand Down Expand Up @@ -1298,7 +1304,8 @@ CPLErr GTiffDataset::MultiThreadedRead(int nXOff, int nYOff, int nXSize,
// false since we could have concurrent uses of the handle,
// when reading the TIFF TileOffsets / TileByteCounts
// array
std::lock_guard<std::mutex> oLock(sContext.oMutex);
std::lock_guard<std::recursive_mutex> oLock(
sContext.oMutex);

IsBlockAvailable(nBlockId, &asJobs[iJob].nOffset,
&asJobs[iJob].nSize);
Expand All @@ -1314,7 +1321,8 @@ CPLErr GTiffDataset::MultiThreadedRead(int nXOff, int nYOff, int nXSize,
{
if (nFileSize == 0)
{
std::lock_guard<std::mutex> oLock(sContext.oMutex);
std::lock_guard<std::recursive_mutex> oLock(
sContext.oMutex);
sContext.poHandle->Seek(0, SEEK_END);
nFileSize = sContext.poHandle->Tell();
}
Expand All @@ -1326,7 +1334,8 @@ CPLErr GTiffDataset::MultiThreadedRead(int nXOff, int nYOff, int nXSize,
static_cast<GUIntBig>(asJobs[iJob].nSize),
static_cast<GUIntBig>(asJobs[iJob].nOffset));

std::lock_guard<std::mutex> oLock(sContext.oMutex);
std::lock_guard<std::recursive_mutex> oLock(
sContext.oMutex);
sContext.bSuccess = false;
break;
}
Expand Down Expand Up @@ -1377,6 +1386,48 @@ CPLErr GTiffDataset::MultiThreadedRead(int nXOff, int nYOff, int nXSize,
static_cast<size_t>(std::min<vsi_l_offset>(
std::numeric_limits<size_t>::max(),
asJobs[iJob].nSize));

// If the total number of bytes we must read exceeds the
// capacity of AdviseRead(), then split the RasterIO()
// request in 2 halves.
if (nAdviseReadTotalBytesLimit > 0 &&
anSizes[nAdviseReadRanges] <
nAdviseReadTotalBytesLimit &&
anSizes[nAdviseReadRanges] >
nAdviseReadTotalBytesLimit - nAdviseReadAccBytes &&
nYBlocks >= 2)
{
const int nYOff2 =
(nBlockYStart + nYBlocks / 2) * m_nBlockYSize;
CPLDebugOnly("GTiff",
"Splitting request (%d,%d,%dx%d) into "
"(%d,%d,%dx%d) and (%d,%d,%dx%d)",
nXOff, nYOff, nXSize, nYSize, nXOff, nYOff,
nXSize, nYOff2 - nYOff, nXOff, nYOff2,
nXSize, nYOff + nYSize - nYOff2);

asJobs.clear();
anOffsets.clear();
anSizes.clear();
poQueue.reset();

CPLErr eErr = MultiThreadedRead(
nXOff, nYOff, nXSize, nYOff2 - nYOff, pData,
eBufType, nBandCount, panBandMap, nPixelSpace,
nLineSpace, nBandSpace);
if (eErr == CE_None)
{
eErr = MultiThreadedRead(
nXOff, nYOff2, nXSize, nYOff + nYSize - nYOff2,
static_cast<GByte *>(pData) +
(nYOff2 - nYOff) * nLineSpace,
eBufType, nBandCount, panBandMap, nPixelSpace,
nLineSpace, nBandSpace);
}
return eErr;
}
nAdviseReadAccBytes += anSizes[nAdviseReadRanges];

++nAdviseReadRanges;
}

Expand Down
17 changes: 17 additions & 0 deletions port/cpl_vsi_virtual.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@ struct CPL_DLL VSIVirtualHandle
{
}

    /** Return the total maximum number of bytes that AdviseRead() can handle
     * at once.
     *
     * Some AdviseRead() implementations may give up if the sum of the values
     * in the panSizes[] array provided to AdviseRead() exceeds a limit.
     *
     * Callers might use that threshold to optimize the efficiency of
     * AdviseRead().
     *
     * A returned value of 0 indicates an unknown limit.
     * @return maximum total byte count AdviseRead() accepts, or 0 if unknown.
     * @since GDAL 3.9
     */
    virtual size_t GetAdviseReadTotalBytesLimit() const
    {
        // Base implementation: no known limit.
        return 0;
    }

virtual size_t Write(const void *pBuffer, size_t nSize, size_t nCount) = 0;

int Printf(CPL_FORMAT_STRING(const char *pszFormat), ...)
Expand Down
26 changes: 23 additions & 3 deletions port/cpl_vsil_curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@

#include <algorithm>
#include <array>
#include <set>
#include <limits>
#include <map>
#include <memory>
#include <set>

#include "cpl_aws.h"
#include "cpl_json.h"
Expand Down Expand Up @@ -3153,6 +3154,21 @@ size_t VSICurlHandle::PRead(void *pBuffer, size_t nSize,
return nRet;
}

/************************************************************************/
/* GetAdviseReadTotalBytesLimit() */
/************************************************************************/

size_t VSICurlHandle::GetAdviseReadTotalBytesLimit() const
{
return static_cast<size_t>(std::min<unsigned long long>(
std::numeric_limits<size_t>::max(),
// 100 MB
std::strtoull(
CPLGetConfigOption("CPL_VSIL_CURL_ADVISE_READ_TOTAL_BYTES_LIMIT",
"104857600"),
nullptr, 10)));
}

/************************************************************************/
/* AdviseRead() */
/************************************************************************/
Expand All @@ -3171,9 +3187,10 @@ void VSICurlHandle::AdviseRead(int nRanges, const vsi_l_offset *panOffsets,

// Give up if we need to allocate too much memory
vsi_l_offset nMaxSize = 0;
const size_t nLimit = GetAdviseReadTotalBytesLimit();
for (int i = 0; i < nRanges; ++i)
{
if (panSizes[i] > 100 * 1024 * 1024 - nMaxSize)
if (panSizes[i] > nLimit - nMaxSize)
{
CPLDebug(poFS->GetDebugKey(),
"Trying to request too many bytes in AdviseRead()");
Expand Down Expand Up @@ -3994,7 +4011,10 @@ const char *VSICurlFilesystemHandlerBase::GetActualURL(const char *pszFilename)
"default='16384000'/>" \
" <Option name='CPL_VSIL_CURL_IGNORE_GLACIER_STORAGE' type='boolean' " \
"description='Whether to skip files with Glacier storage class in " \
"directory listing.' default='YES'/>"
"directory listing.' default='YES'/>" \
" <Option name='CPL_VSIL_CURL_ADVISE_READ_TOTAL_BYTES_LIMIT' " \
"type='integer' description='Maximum number of bytes AdviseRead() is " \
"allowed to fetch at once' default='104857600'/>"

const char *VSICurlFilesystemHandlerBase::GetOptionsStatic()
{
Expand Down
2 changes: 2 additions & 0 deletions port/cpl_vsil_curl_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ class VSICurlHandle : public VSIVirtualHandle
void AdviseRead(int nRanges, const vsi_l_offset *panOffsets,
const size_t *panSizes) override;

size_t GetAdviseReadTotalBytesLimit() const override;

bool IsKnownFileSize() const
{
return oFileProp.bHasComputedFileSize;
Expand Down
Loading