Performance Inquiry on Java Multithreaded Concurrent Queries via JDBC #142

Open

yss-mengyong opened this issue Feb 20, 2025 · 0 comments

yss-mengyong commented Feb 20, 2025

I'm conducting performance tests on DuckDB's JDBC driver (version 1.1.3) with multithreaded concurrent queries, and I've observed a counterintuitive result that I'd like to understand better: per-query latency grows roughly linearly with the number of concurrent queries.

Test Environment:

Hardware: 64-core CPU / 256GB RAM (confirmed no resource contention)

Storage Configurations:

Local disk

Local MinIO instance

Remote MinIO cluster

Dataset: TPC-H SF=10 data generated as Parquet files

Queries: all 22 TPC-H benchmark queries

Test Variables:

SET threads=X (tested 4~128 threads)

SET memory_limit='XGB' (tested 2GB-32GB)

Both connection strategies exhibit similar scaling patterns (a minimal sketch of the two strategies follows this list):

Single connection shared across threads via duplicate(), with heavy multithreading

Connection-per-thread pool (1~32 connections)

In both cases, Java-side concurrency (Java thread pool size): 1~64 concurrent requests
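
For clarity, the two strategies differ only in how each task obtains its connection. A minimal sketch (the class and method names here are illustrative, not from the harness below):

import org.duckdb.DuckDBConnection;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class ConnectionStrategies {
  // Strategy 1: a single root connection, duplicated once per task.
  // duplicate() returns a new connection to the SAME database instance,
  // so all tasks share one catalog, one memory budget, one thread setting.
  static List<Connection> singleInstance(int tasks) throws SQLException {
    DuckDBConnection root = (DuckDBConnection) DriverManager.getConnection("jdbc:duckdb:");
    List<Connection> conns = new ArrayList<>();
    for (int i = 0; i < tasks; i++) {
      conns.add(root.duplicate());
    }
    return conns;
  }

  // Strategy 2: a pool of independently opened connections. As far as I can
  // tell, each plain "jdbc:duckdb:" URL opens its OWN in-memory database, so
  // the pool is really N separate instances, each with its own views and its
  // own threads/memory_limit settings.
  static List<Connection> independentInstances(int poolSize) throws SQLException {
    List<Connection> conns = new ArrayList<>();
    for (int i = 0; i < poolSize; i++) {
      conns.add(DriverManager.getConnection("jdbc:duckdb:"));
    }
    return conns;
  }
}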

Specific Questions:

Is this linear latency growth expected behavior for concurrent analytical workloads?

Could there be implicit contention points in DuckDB's concurrency model?

Global lock mechanisms?

Metadata synchronization overhead?

Resource allocation strategies?

Would you recommend specific configuration adjustments for high-concurrency JDBC workloads (for example, along the lines of the sizing sketch below)?
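
For example, the kind of adjustment I have been experimenting with is dividing the core and memory budget by the expected concurrency so DuckDB's worker threads do not oversubscribe the machine (a hypothetical sizing rule, not something from the docs; Sizing and configure are illustrative names):

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

class Sizing {
  // Hypothetical rule: split the 64 cores / 256GB of the test box across the
  // expected number of concurrent queries, so the total worker thread count
  // stays at or below the physical core count.
  static void configure(Connection connection, int concurrentQueries) throws SQLException {
    int cores = Runtime.getRuntime().availableProcessors(); // 64 on the test box
    int threads = Math.max(1, cores / concurrentQueries);
    int memoryGb = Math.max(1, 256 / concurrentQueries);
    try (Statement s = connection.createStatement()) {
      s.execute("SET threads=" + threads);
      s.execute("SET memory_limit='" + memoryGb + "GB'");
    }
  }
}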

The test code follows (init.sql creates views over the Parquet files, query.sql contains a TPC-H query):

package com.yss.common.duckula;

import org.duckdb.DuckDBConnection;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.sql.*;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class App {
  private static class TpchRunner implements Supplier<Long> {
    private final String name;
    private final Connection connection;
    private final String query;

    public TpchRunner(String name, Connection connection, String query) {
      this.name = name;
      this.connection = connection;
      this.query = query;
    }

    @Override
    public Long get() {
      System.out.println(this.name + " is running ...");
      long time = -1;
      try (Statement statement = this.connection.createStatement()) {
        // COPY the query result to a Parquet file so the full result set is materialized.
        String sql = "copy (" + this.query + ") to '" + PARQUET_FILE_DIR + "/" + UUID.randomUUID() + ".parquet' (FORMAT PARQUET, COMPRESSION ZSTD);";
        long begin = System.currentTimeMillis();
        statement.execute(sql);
        time = System.currentTimeMillis() - begin;
        return time;
      } catch (SQLException e) {
        throw new RuntimeException(this.name + " failed.", e);
      } finally {
        System.out.println(this.name + " is finished");
        try {
          this.connection.close();
        } catch (Exception e) {
          // Do not throw from finally: that would mask the original exception.
          e.printStackTrace(System.out);
        }
      }
    }
  }

  private static List<String> loadSQLFromFile(final String fileName) throws IOException {
    return new ArrayList<>(Files.readAllLines(new File(fileName).toPath()));
  }

  private static DuckDBConnection createConnectionAndInitDB(int thread, int memory, List<String> initSQLs) throws SQLException {
    // Each plain "jdbc:duckdb:" URL opens its own in-memory database instance.
    DuckDBConnection connection = (DuckDBConnection) DriverManager.getConnection("jdbc:duckdb:");
    try (Statement statement = connection.createStatement()) {
      for (String sql : initSQLs) {
        statement.execute(sql);
      }
      statement.execute("set max_memory='" + memory + "GB'");
      statement.execute("set threads=" + thread);
    }
    return connection;
  }

  // Simple holder for min/avg/max timings (in seconds).
  private static final class MAM {
    public final double min;
    public final double avg;
    public final double max;

    private MAM(double min, double avg, double max) {
      this.min = min;
      this.avg = avg;
      this.max = max;
    }

    public static MAM of(double min, double avg, double max) {
      return new MAM(min, avg, max);
    }
  }

  // Prints per-task times (in seconds) and returns min/avg/max over the successful tasks.
  private static MAM printTpchResult(List<Long> times) {
    int n = times.size();
    long sum = 0, min = Long.MAX_VALUE, max = Long.MIN_VALUE;
    int count = 0;
    DecimalFormat formatter = new DecimalFormat("0.000");
    System.out.print("workload_" + n + " detail times is:");
    for (int i = 0; i < n; i++) {
      long t = times.get(i);
      if (t < 0) {
        System.out.print(" error" + ",");
      } else {
        count++;
        System.out.print(" " + formatter.format(times.get(i) / 1000D) + ",");
        sum += t;
        if (t < min) {
          min = t;
        }
        if (t > max) {
          max = t;
        }
      }
    }
    final int err = n - count;
    System.out.println(" error count is: " + err);
    ERROR_TASK_COUNT.addAndGet(err);
    if (count == 0) {
      return MAM.of(-1, -1, -1);
    } else {
      return MAM.of(min / 1000D, sum / (count * 1000D), max / 1000D);
    }
  }

  private static MAM run(final int workload, final List<DuckDBConnection> connections, final String SQL) throws SQLException, ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(workload);
    try {
      List<CompletableFuture<Long>> futures = new ArrayList<>(workload);
      for (int i = 1; i <= workload; i++) {
        final int n = connections.size();
        // duplicate() opens another connection to the same database instance as the pooled one.
        DuckDBConnection connection = (DuckDBConnection) connections.get(i % n).duplicate();
        futures.add(CompletableFuture.supplyAsync(new TpchRunner("query_task [" + i + " of " + workload + "]", connection, SQL), executor).handle((time, e) -> {
          if (e != null) {
            e.printStackTrace(System.out);
            return -1L;
          }
          return time;
        }));
      }
      // Futures are joined on this thread, so no synchronized list is needed.
      List<Long> result = new ArrayList<>(workload);
      for (CompletableFuture<Long> future : futures) {
        result.add(future.get());
      }
      return printTpchResult(result);
    } finally {
      executor.shutdown();
    }
  }

  private static final String PARQUET_FILE_DIR = "./parquet-" + UUID.randomUUID().toString().toLowerCase();
  private static final AtomicInteger ERROR_TASK_COUNT = new AtomicInteger(0);

  private static void deleteParquetDir() throws IOException {
    Path directory = Paths.get(PARQUET_FILE_DIR);
    
    Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        Files.delete(file); 
        return FileVisitResult.CONTINUE;
      }

      @Override
      public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
        Files.delete(dir); 
        return FileVisitResult.CONTINUE;
      }
    });
  }

  public static void main(String[] args) throws SQLException, ExecutionException, InterruptedException, IOException {
    int connection_pool_size = 1;
    int max_workload = 1;
    int per_connection_memory = 1;
    int per_connection_threads = 1;
    String sql_dir = null;
    for (String arg : args) {
      String[] kv = arg.split("=");
      if (kv.length != 2) {
        System.out.println("arg error: " + arg);
        return;
      }
      switch (kv[0]) {
        case "p":
        case "connection_pool_size":
          connection_pool_size = Integer.parseInt(kv[1]);
          break;
        case "w":
        case "max_workload":
          max_workload = Integer.parseInt(kv[1]);
          break;
        case "m":
        case "per_connection_memory":
          per_connection_memory = Integer.parseInt(kv[1]);
          break;
        case "t":
        case "per_connection_threads":
          per_connection_threads = Integer.parseInt(kv[1]);
          break;
        case "s":
        case "sql":
        case "sql_dir":
          sql_dir = kv[1];
          break;
        default:
          System.out.println("arg error: " + arg);
          return;
      }
    }
    System.out.println("max_workload=" + max_workload);
    System.out.println("connection_pool_size=" + connection_pool_size);
    System.out.println("per_connection_memory=" + per_connection_memory);
    System.out.println("per_connection_threads=" + per_connection_threads);
    System.out.println("sql_dir=" + sql_dir);
    if (sql_dir == null || sql_dir.isEmpty()) {
      System.out.println("arg error: sql_dir is required");
      return;
    }
    if (!new File(sql_dir + "/init.sql").exists() || !new File(sql_dir + "/query.sql").exists()) {
      System.out.println(sql_dir + "/init.sql" + " or " + sql_dir + "/query.sql" + " not exists");
      return;
    }
    if (!new File(PARQUET_FILE_DIR).mkdir()) {
      System.out.println("error to mkdir " + PARQUET_FILE_DIR);
      return;
    }
    final List<String> initSQLs = loadSQLFromFile(sql_dir + "/init.sql");
    final String querySQL = String.join("\n", loadSQLFromFile(sql_dir + "/query.sql"));
    System.out.println("initSQLs=");
    System.out.println(String.join("\n", initSQLs));
    System.out.println("querySQL=");
    System.out.println(querySQL);
    final List<DuckDBConnection> connections = new ArrayList<>(connection_pool_size);
    for (int i = 0; i < connection_pool_size; i++) {
      connections.add(createConnectionAndInitDB(per_connection_threads, per_connection_memory, initSQLs));
    }
    List<MAM> mams = new ArrayList<>(max_workload);
    for (int i = 1; i <= max_workload; i++) {
      MAM mam = run(i, connections, querySQL);
      mams.add(mam);
    }

    for (DuckDBConnection c : connections) {
      c.close();
    }
    System.out.println("==========ALL TASKS COMPLETED==========");
    System.out.println("ARGS IS [W=" + max_workload + ", P=" + connection_pool_size + ", T=" + per_connection_threads + ", M=" + per_connection_memory + "]");
    System.out.println("WROKLOAD_SIZE\tMIN\tAVG\tMAX");
    DecimalFormat formatter1 = new DecimalFormat("0000");
    DecimalFormat formatter2 = new DecimalFormat("0.000");
    for (int i = 0; i < max_workload; i++) {
      System.out.println("WORKLOAD_" + formatter1.format(i + 1)
          + "\t" + formatter2.format(mams.get(i).min)
          + "\t" + formatter2.format(mams.get(i).avg)
          + "\t" + formatter2.format(mams.get(i).max)
      );
    }
    System.out.println("=======================================");
    System.out.println("TOTAL ERROR TASK COUNT IS " + ERROR_TASK_COUNT.get());
    deleteParquetDir();
  }
}
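
For reference, a typical invocation (assuming the DuckDB JDBC jar is on the classpath and ./sql contains init.sql and query.sql; the jar name is illustrative):

java -cp .:duckdb_jdbc-1.1.3.jar com.yss.common.duckula.App p=4 w=64 t=8 m=4 s=./sql

This ramps the Java-side workload from 1 to 64 concurrent tasks over a pool of 4 connections, each capped at 8 DuckDB threads and 4GB of memory.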