I'm conducting performance tests on DuckDB's JDBC driver (version 1.1.3) with multithreaded concurrent queries and have observed some counterintuitive results that I'd like to understand better: query latency grows roughly linearly with the number of concurrent requests.
Test Environment:
Hardware: 64-core CPU / 256GB RAM (confirmed no resource contention)
Storage Configurations:
Local disk
Local MinIO instance
Remote MinIO cluster
Dataset: TPC-H SF=10 data generated as Parquet files
Queries: All 22 TPC-H benchmark queries
Test Variables:
SET threads=X (tested 4~128 threads)
SET memory_limit='XGB' (tested 2GB-32GB; both settings are applied per connection right after it is opened, as sketched below)
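For reference, a minimal sketch of that per-connection setup (the class and method names here are hypothetical; the connection string and SET pragmas are the ones the full harness below uses, where max_memory appears as an alias of memory_limit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class DuckDBSetupSketch {
    // Open an in-memory DuckDB instance and pin its thread and memory budget.
    static Connection open(int threads, int memoryGb) throws SQLException {
        Connection conn = DriverManager.getConnection("jdbc:duckdb:");
        try (Statement st = conn.createStatement()) {
            st.execute("SET threads=" + threads);
            st.execute("SET memory_limit='" + memoryGb + "GB'"); // max_memory is an accepted alias
        }
        return conn;
    }
}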
Both connection strategies exhibit similar scaling patterns (a minimal sketch of the two follows below):
Single connection shared by all workers via heavy multithreading
Connection-per-thread pool (1~32 connections)
Java-side concurrency (fixed thread pool): 1~64 concurrent requests
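A sketch of the two strategies (class and method names hypothetical; duplicate() is the DuckDB JDBC call the harness itself uses, and each separate "jdbc:duckdb:" connection is its own in-memory database instance):

import org.duckdb.DuckDBConnection;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ConnectionStrategies {
    // Strategy 1: all workers share one database instance; each gets a cheap
    // duplicate of the root connection (shared catalog, buffer pool, settings).
    static Connection sharedWorker(DuckDBConnection root) throws SQLException {
        return root.duplicate();
    }

    // Strategy 2: each pool slot is a fully separate in-memory database
    // (independent catalog plus its own threads/memory_limit settings).
    static Connection isolatedWorker() throws SQLException {
        return DriverManager.getConnection("jdbc:duckdb:");
    }
}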
Specific Questions:
Is this linear latency growth expected behavior for concurrent analytical workloads?
Could there be implicit contention points in DuckDB's concurrency model?
Global lock mechanisms?
Metadata synchronization overhead?
Resource allocation strategies?
Would you recommend specific configuration adjustments for high-concurrency JDBC workloads?
The test code follows (init.sql creates views over the Parquet files; query.sql contains the TPC-H query):
package com.yss.common.duckula;

import org.duckdb.DuckDBConnection;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.sql.*;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class App {

    // Runs one query on its own (duplicated) connection and returns the elapsed time in ms.
    private static class TpchRunner implements Supplier<Long> {
        private final String name;
        private final Connection connection;
        private final String query;

        public TpchRunner(String name, Connection connection, String query) {
            this.name = name;
            this.connection = connection;
            this.query = query;
        }

        @Override
        public Long get() {
            System.out.println(this.name + " is running ...");
            long time = -1;
            try (Statement statement = this.connection.createStatement()) {
                // COPY materializes the full result to a Parquet file, so the query runs end-to-end.
                String sql = "copy (" + this.query + ") to '" + PARQUET_FILE_DIR + "/" + UUID.randomUUID()
                        + ".parquet' (FORMAT PARQUET, COMPRESSION ZSTD);";
                long begin = System.currentTimeMillis();
                statement.execute(sql);
                time = System.currentTimeMillis() - begin;
                return time;
            } catch (SQLException e) {
                throw new RuntimeException(this.name + " is error.", e);
            } finally {
                System.out.println(this.name + " is finished");
                try {
                    this.connection.close(); // each task owns its duplicated connection
                } catch (Exception e) {
                    // Don't throw from finally: it would mask the original exception.
                    e.printStackTrace(System.out);
                }
            }
        }
    }

    private static List<String> loadSQLFromFile(final String fileName) throws IOException {
        return new ArrayList<>(Files.readAllLines(new File(fileName).toPath()));
    }

    // One in-memory DuckDB instance per connection; each gets its own thread and memory budget.
    private static DuckDBConnection createConnectionAndInitDB(int threads, int memory, List<String> initSQLs) throws SQLException {
        DuckDBConnection connection = (DuckDBConnection) DriverManager.getConnection("jdbc:duckdb:");
        try (Statement statement = connection.createStatement()) {
            for (String sql : initSQLs) {
                statement.execute(sql);
            }
            // max_memory is an alias of memory_limit.
            statement.execute("set max_memory='" + memory + "GB'");
            statement.execute("set threads=" + threads);
        }
        return connection;
    }

    // Min/avg/max triple for one workload level, in seconds.
    private static final class MAM {
        public final double min;
        public final double avg;
        public final double max;

        private MAM(double min, double avg, double max) {
            this.min = min;
            this.avg = avg;
            this.max = max;
        }

        public static MAM of(double min, double avg, double max) {
            return new MAM(min, avg, max);
        }
    }

    private static MAM printTpchResult(List<Long> times) {
        int n = times.size();
        long sum = 0, min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        int count = 0;
        DecimalFormat formatter = new DecimalFormat("0.000");
        System.out.print("workload_" + n + " detail times is:");
        for (int i = 0; i < n; i++) {
            long t = times.get(i);
            if (t < 0) {
                System.out.print(" error,");
            } else {
                count++;
                System.out.print(" " + formatter.format(t / 1000D) + ",");
                sum += t;
                if (t < min) {
                    min = t;
                }
                if (t > max) {
                    max = t;
                }
            }
        }
        final int err = n - count;
        System.out.println(" error count is: " + err);
        ERROR_TASK_COUNT.addAndGet(err);
        if (count == 0) {
            return MAM.of(-1, -1, -1);
        } else {
            return MAM.of(min / 1000D, sum / (count * 1000D), max / 1000D);
        }
    }

    // Launches `workload` concurrent copies of the query, spreading them round-robin
    // over the connection pool via DuckDBConnection.duplicate().
    private static MAM run(final int workload, final List<DuckDBConnection> connections, final String SQL) throws SQLException, ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(workload);
        try {
            final int n = connections.size();
            List<CompletableFuture<Long>> futures = new ArrayList<>(workload);
            for (int i = 1; i <= workload; i++) {
                DuckDBConnection connection = (DuckDBConnection) connections.get(i % n).duplicate();
                futures.add(CompletableFuture
                        .supplyAsync(new TpchRunner("query_task [" + i + " of " + workload + "]", connection, SQL), executor)
                        .handle((time, e) -> {
                            if (e != null) {
                                e.printStackTrace(System.out);
                                return -1L; // negative time marks a failed task
                            }
                            return time;
                        }));
            }
            List<Long> result = new ArrayList<>(workload);
            for (CompletableFuture<Long> future : futures) {
                result.add(future.get()); // collected on the calling thread, so no synchronization needed
            }
            return printTpchResult(result);
        } finally {
            executor.shutdown();
        }
    }

    private static final String PARQUET_FILE_DIR = "./parquet-" + UUID.randomUUID().toString().toLowerCase();
    private static final AtomicInteger ERROR_TASK_COUNT = new AtomicInteger(0);

    private static void deleteParquetDir() throws IOException {
        Path directory = Paths.get(PARQUET_FILE_DIR);
        Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    public static void main(String[] args) throws SQLException, ExecutionException, InterruptedException, IOException {
        int connection_pool_size = 1;
        int max_workload = 1;
        int per_connection_memory = 1;
        int per_connection_threads = 1;
        String sql_dir = null;
        for (String arg : args) {
            String[] kv = arg.split("=");
            if (kv.length != 2) {
                System.out.println("arg error: " + arg);
                return;
            }
            switch (kv[0]) {
                case "p":
                case "connection_pool_size":
                    connection_pool_size = Integer.parseInt(kv[1]);
                    break;
                case "w":
                case "max_workload":
                    max_workload = Integer.parseInt(kv[1]);
                    break;
                case "m":
                case "per_connection_memory":
                    per_connection_memory = Integer.parseInt(kv[1]);
                    break;
                case "t":
                case "per_connection_threads":
                    per_connection_threads = Integer.parseInt(kv[1]);
                    break;
                case "s":
                case "sql":
                case "sql_dir":
                    sql_dir = kv[1];
                    break;
                default:
                    System.out.println("arg error: " + arg);
                    return;
            }
        }
        System.out.println("max_workload=" + max_workload);
        System.out.println("connection_pool_size=" + connection_pool_size);
        System.out.println("per_connection_memory=" + per_connection_memory);
        System.out.println("per_connection_threads=" + per_connection_threads);
        System.out.println("sql_dir=" + sql_dir);
        if (sql_dir == null || sql_dir.isEmpty()) {
            System.out.println("arg: sql_dir is null");
            return;
        }
        if (!new File(sql_dir + "/init.sql").exists() || !new File(sql_dir + "/query.sql").exists()) {
            System.out.println(sql_dir + "/init.sql" + " or " + sql_dir + "/query.sql" + " not exists");
            return;
        }
        if (!new File(PARQUET_FILE_DIR).mkdir()) {
            System.out.println("error to mkdir " + PARQUET_FILE_DIR);
            return;
        }
        final List<String> initSQLs = loadSQLFromFile(sql_dir + "/init.sql");
        final String querySQL = String.join("\n", loadSQLFromFile(sql_dir + "/query.sql"));
        System.out.println("initSQLs=");
        System.out.println(String.join("\n", initSQLs));
        System.out.println("querySQL=");
        System.out.println(querySQL);
        final List<DuckDBConnection> connections = new ArrayList<>(connection_pool_size);
        for (int i = 0; i < connection_pool_size; i++) {
            connections.add(createConnectionAndInitDB(per_connection_threads, per_connection_memory, initSQLs));
        }
        // Ramp the concurrency level from 1 up to max_workload.
        List<MAM> mams = new ArrayList<>(max_workload);
        for (int i = 1; i <= max_workload; i++) {
            MAM mam = run(i, connections, querySQL);
            mams.add(mam);
        }
        for (DuckDBConnection c : connections) {
            c.close();
        }
        System.out.println("==========ALL TASKS COMPLETED==========");
        System.out.println("ARGS IS [W=" + max_workload + ", P=" + connection_pool_size + ", T=" + per_connection_threads + ", M=" + per_connection_memory + "]");
        System.out.println("WORKLOAD_SIZE\tMIN\tAVG\tMAX");
        DecimalFormat formatter1 = new DecimalFormat("0000");
        DecimalFormat formatter2 = new DecimalFormat("0.000");
        for (int i = 0; i < max_workload; i++) {
            System.out.println("WORKLOAD_" + formatter1.format(i + 1)
                    + "\t" + formatter2.format(mams.get(i).min)
                    + "\t" + formatter2.format(mams.get(i).avg)
                    + "\t" + formatter2.format(mams.get(i).max)
            );
        }
        System.out.println("=======================================");
        System.out.println("TOTAL ERROR TASK COUNT IS " + ERROR_TASK_COUNT.get());
        deleteParquetDir();
    }
}
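For reproduction, the harness is driven entirely by the switches parsed in main; a typical invocation (classpath hypothetical) is java -cp app.jar:duckdb_jdbc-1.1.3.jar com.yss.common.duckula.App w=64 p=8 t=8 m=4 s=./sql, which ramps from 1 to 64 concurrent tasks over a pool of 8 in-memory databases, each capped at 8 threads and 4GB.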