Skip to content
This repository has been archived by the owner on Mar 7, 2024. It is now read-only.

Commit

Permalink
Update to Trino-411
Browse files Browse the repository at this point in the history
  • Loading branch information
DimitrisStaratzis committed Apr 3, 2023
1 parent 058a1fe commit 9ab4d56
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 51 deletions.
19 changes: 10 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>trino-root</artifactId>
<groupId>io.trino</groupId>
<version>405</version>
<version>411</version>
</parent>

<artifactId>trino-tiledb</artifactId>
Expand Down Expand Up @@ -151,7 +151,7 @@
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing</artifactId>
<version>405</version>
<version>411</version>
<scope>test</scope>
</dependency>

Expand Down Expand Up @@ -184,7 +184,12 @@
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.18.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-annotations</artifactId>
<scope>test</scope>
</dependency>

Expand All @@ -197,21 +202,17 @@
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.3.0</version>
<version>7.7.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-plugin</artifactId>
<version>2.6.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M6</version>
<configuration>
<useSystemClassLoader>false</useSystemClassLoader>
</configuration>
Expand Down
24 changes: 15 additions & 9 deletions src/main/java/io/trino/plugin/tiledb/TileDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public HardwareAbstractionLayer getHardwareAbstractionLayer()

/**
* Get plugin configuration
*
* @return TileDB plugin configuration
*/
public TileDBConfig getConfig()
Expand Down Expand Up @@ -146,7 +147,8 @@ public TileDBTable addTableFromURI(Context localCtx, String schema, URI arrayUri

/**
* Return list of schema's. Since we don't have true discovery this is a static map which we return the keys
* @return List of tiledb schema names (currently just "tiledb")
*
* @return List of tiledb schema names (currently just "TileDB")
*/
public Set<String> getSchemaNames()
{
Expand All @@ -155,7 +157,8 @@ public Set<String> getSchemaNames()

/**
* Return all "tables" in a schema
* @param schema
*
* @param schema the schema
* @return Set of table names
*/
public Set<String> getTableNames(String schema)
Expand All @@ -170,10 +173,11 @@ public Set<String> getTableNames(String schema)

/**
* Fetches a table object given a schema and a table name
* @param schema
* @param tableName
* @param encryptionType
* @param encryptionKey
*
* @param schema the schema
* @param tableName the table name
* @param encryptionType the encryption type
* @param encryptionKey the encryption key
* @return table object
*/
public TileDBTable getTable(ConnectorSession session, String schema, String tableName,
Expand All @@ -200,8 +204,9 @@ public TileDBTable getTable(ConnectorSession session, String schema, String tabl

/**
* Fetches a table object given a schema and a table name
* @param schema
* @param tableName
*
* @param schema the schema
* @param tableName the table name
* @return table object
*/
public TileDBTable getTable(ConnectorSession session, String schema, String tableName)
Expand All @@ -222,7 +227,8 @@ public Context getCtx()

/**
* Rollback a create table statement, this just drops the array
* @param handle tiledb table handler
*
* @param handle TileDB table handler
*/
public void rollbackCreateTable(TileDBOutputTableHandle handle)
{
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/io/trino/plugin/tiledb/TileDBMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
}

/**
* Create table creates a table without any data
* Create table creates a table without any data
*
* @param session connector session
* @param tableMetadata metadata for new table
* @param ignoreExisting ignore existing tables? Currently not supported
Expand All @@ -239,6 +240,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe

/**
* beginCreateTable creates a table with data
*
* @param session connector sessions
* @param tableMetadata metadata for table
* @param layout layout of new table
Expand All @@ -254,6 +256,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

/**
* Finish/commit creating a table with data
*
* @param session connector session
* @param tableHandle table handle
* @param fragments any fragements (ignored)
Expand All @@ -270,8 +273,9 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
}

/**
* Set a rollback for a method to run some function at the rollback of a presto trasnaction
* @param action
* Set a rollback for a method to run some function at the rollback of a presto transaction
*
* @param action the action
*/
private void setRollback(Runnable action)
{
Expand All @@ -295,7 +299,8 @@ public void rollback()
}

/**
* Allow dropping of a table/tiledb array
* Allow dropping of a table/TileDB array
*
* @param session connector session
* @param tableHandle handle of table to be dropped
*/
Expand Down Expand Up @@ -352,6 +357,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,

/**
* Create an array given a presto table layout/schema
*
* @param tableMetadata metadata about table
* @return Output table handler
*/
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/trino/plugin/tiledb/TileDBModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ else if (type.equals(TIMESTAMP_MILLIS)) {

/**
* This is a helper function to create an ArrayList for a given type
*
* @param type datatype to create list of
* @param isVariableLength if its variable length we will create a list of arrays
* @return List
Expand Down
25 changes: 16 additions & 9 deletions src/main/java/io/trino/plugin/tiledb/TileDBPageSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ public class TileDBPageSink

/**
* Initialize an instance of page sink preparing for inserts
*
* @param handle table handler
* @param tileDBClient client (for context)
* @param session
* @param session the session
*/
public TileDBPageSink(TileDBOutputTableHandle handle, TileDBClient tileDBClient, ConnectorSession session)
{
Expand Down Expand Up @@ -144,8 +145,9 @@ public TileDBPageSink(TileDBOutputTableHandle handle, TileDBClient tileDBClient,

/**
* Reset query, as of TileDB 2.0 a query object should not be reused for writing unless in global order
* @param buffers
* @throws TileDBError
*
* @param buffers the buffers
* @throws TileDBError TileDBError
*/
private void resetQuery(Map<String, Pair<NativeArray, NativeArray>> buffers) throws TileDBError
{
Expand All @@ -159,8 +161,9 @@ private void resetQuery(Map<String, Pair<NativeArray, NativeArray>> buffers) thr

/**
* Reset query buffers by closing and re-allocating
* @param buffers
* @throws TileDBError
*
* @param buffers the buffers
* @throws TileDBError TileDBError
*/
private void resetBuffers(Map<String, Pair<NativeArray, NativeArray>> buffers) throws TileDBError
{
Expand Down Expand Up @@ -212,6 +215,7 @@ private void resetBuffers(Map<String, Pair<NativeArray, NativeArray>> buffers) t

/**
* appendPage adds the rows
*
* @param page rows/columns to insert
* @return Future not currently used, but could be for async writing. It does not support nullable attributes for now.
*/
Expand Down Expand Up @@ -285,7 +289,8 @@ public CompletableFuture<?> appendPage(Page page)

/**
* Initialize the map holding the effective buffer sizes
* @param bufferEffectiveSizes
*
* @param bufferEffectiveSizes the buffer sizes
*/
private void initBufferEffectiveSizes(Map<String, Pair<Optional<Long>, Long>> bufferEffectiveSizes)
{
Expand All @@ -300,11 +305,12 @@ private void initBufferEffectiveSizes(Map<String, Pair<Optional<Long>, Long>> bu
}

/**
* Submit a query to tiledb for writing
* Submit a query to TileDB for writing
*
* @param buffers Map of buffers to write
* @param bufferEffectiveSizes Map of effective buffer sizes
* @return QueryStatus
* @throws TileDBError
* @throws TileDBError TileDBError
*/
private QueryStatus submitQuery(Map<String, Pair<NativeArray, NativeArray>> buffers, Map<String, Pair<Optional<Long>, Long>> bufferEffectiveSizes) throws TileDBError
{
Expand Down Expand Up @@ -386,13 +392,14 @@ private QueryStatus submitQuery(Map<String, Pair<NativeArray, NativeArray>> buff

/**
* Append a column to appropriate buffer
*
* @param page Page from presto containing data
* @param position current row number
* @param channel column index
* @param columnBuffer NativeBuffer for column data
* @param bufferPosition The current position of the buffer (where a write should start)
* @return new effective buffer size after write
* @throws TileDBError
* @throws TileDBError TileDBError
*/
private long appendColumn(Page page, int position, int channel, NativeArray columnBuffer, int bufferPosition) throws TileDBError
{
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/io/trino/plugin/tiledb/TileDBRecordCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ private int getClampedBufferSize(int numElements, int elementBytes)
* Build the ranges for a query based on the split
*
* @param split the split to build the subArray based off of
* @return
*/
private void setRanges(TileDBSplit split) throws TileDBError
{
Expand Down Expand Up @@ -542,12 +541,13 @@ else if (attBounds.getSecond() == null) {

/**
* Returns the Query condition for the given bound.
*
* @param attr The attribute
* @param isString True if the attribute is String
* @param bound The bound
* @param op TIleDB operator
* @return The query Condition
* @throws TileDBError
* @throws TileDBError TileDBError
*/
private QueryCondition conditionForBound(Attribute attr, boolean isString, Object bound, tiledb_query_condition_op_t op) throws TileDBError
{
Expand Down Expand Up @@ -1021,6 +1021,7 @@ else if (currentNumRecords > 0) {

/**
* Function to calculate the bytes read based on the buffer sizes
*
* @return byte in current buffers
*/
private long calculateNativeArrayByteSizes()
Expand Down Expand Up @@ -1053,7 +1054,8 @@ private long calculateNativeArrayByteSizes()

/**
* Check if we can double the buffer, or if there is not enough memory space
* @return
*
* @return true if buffers can be reallocated
*/
private boolean canReallocBuffers()
{
Expand Down Expand Up @@ -1095,7 +1097,8 @@ private void calculateAdditionalBytesRead()
/**
* Copy buffers from NativeArray to java arrays for faster access times
* Making the JNI calls in NativeArray is too slow
* @throws TileDBError
*
* @throws TileDBError TileDBError
*/
private void copyQueryBuffers() throws TileDBError
{
Expand Down Expand Up @@ -1366,6 +1369,7 @@ private Pair<Long, Long> startTimer()

/**
* Save function timing to the hashmap of times
*
* @param functionName Name of function being recorded
* @param timer Times from record
*/
Expand Down
21 changes: 12 additions & 9 deletions src/main/java/io/trino/plugin/tiledb/TileDBSplitManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co

/**
* Returns the tuple domain and the column handles
*
* @param session The connector session
* @param tableHandle The table handle
* @param constraint The constraint
Expand Down Expand Up @@ -209,11 +210,11 @@ else if (type instanceof VarcharType) {
/**
* Split tuple domain if there are multiple ranges
*
* @param tupleDomain
* @param splitOnlyPredicates
* @param nonEmptyDomains
* @param dimensionCount
* @return
* @param tupleDomain the tuple domain
* @param splitOnlyPredicates the split predicates
* @param nonEmptyDomains the non-empty domains
* @param dimensionCount the dimension count
* @return tuple domains after split
*/
private List<TupleDomain<ColumnHandle>> splitTupleDomainOnRanges(TupleDomain<ColumnHandle> tupleDomain, int splits, boolean splitOnlyPredicates, HashMap<String, Pair> nonEmptyDomains, int dimensionCount)
{
Expand Down Expand Up @@ -282,10 +283,11 @@ private List<TupleDomain<ColumnHandle>> splitTupleDomainOnRanges(TupleDomain<Col

/**
* Function to generate each tuple domain combination based on a list of domains for each column handle
* @param lists
* @param result
* @param depth
* @param current
*
* @param lists the lists of domains
* @param result the result
* @param depth the depth
* @param current the current domain
*/
private void generateCombinationTupleDomains(List<Pair<ColumnHandle, List<Domain>>> lists, List<Map<ColumnHandle, Domain>> result, int depth, Map<ColumnHandle, Domain> current)
{
Expand All @@ -307,6 +309,7 @@ private void generateCombinationTupleDomains(List<Pair<ColumnHandle, List<Domain

/**
* Split a range into N buckets. Currently only ranges of type long can be split with this naive algorithm.
*
* @param range to split
* @param buckets number
* @param nonEmptyDomain The none empty domain, used if there the range is one sides, i.e. >= X we change to >= X AND less than MaxNonEmpty
Expand Down
Loading

0 comments on commit 9ab4d56

Please sign in to comment.