GPU implementation of the Kudo shuffle format. #2800
base: branch-25.04
Conversation
…a basis for shuffle_split and shuffle_assemble.
…fixed width types.
…assemble) to make it more manageable.
I am nowhere near done with the review. I am planning on writing some JNI APIs for this and putting together some kudo interoperability tests. Would you prefer to have them be a separate PR based on this one, or should I make a PR to your branch so they can be in one place?
Signed-off-by: Dave Baranec <[email protected]>
Signed-off-by: Dave Baranec <[email protected]>
…e some padding values and convert 64 bit header values into 32 bit. Make sure to swap header fields to big endian. Add clarifying comments on why shuffle_split_col_data is storing values as union (kud0 size purposes).
@nvdbaranec your other PR was merged in. Could you upmerge so I can better see the changes?
…gs inside of a struct. The 'branching' case similar to what happens in cudf::row_bit_count.
Upmerged. It... kind of helps, I guess. There are a few big obvious blocks of code that got removed. Lots of little changes spread throughout, though.
… in the source data isn't necessarily just (num_rows + 7) / 8. We have to account for leading rows/bits in the byte we are copying from.
Nits and some comments/questions for now. Still working through this.
namespace spark_rapids_jni {

// TODO: this is duplicated from cudf because the cudf function is not marked as constexpr, so it
// cannot be called on the gpu there is an issue filed against cudf to make
Will you link that issue here?
// - each partition is prepended with a metadata buffer
// the metadata is of the format:
// - 4 byte row count for the partition
// - for each string column in huffle_split_metadata.col_info
Suggested change:
- // - for each string column in huffle_split_metadata.col_info
+ // - for each string column in shuffle_split_metadata.col_info
 *
 * The result is returned as a blob of bytes representing the individual partitions resulting from
 * the splits, and a set of offsets indicating the beginning of each resulting partition in the
 * result. The function also returns a shuffle_split_metadat struct which contains additional
Suggested change:
- * result. The function also returns a shuffle_split_metadat struct which contains additional
+ * result. The function also returns a shuffle_split_metadata struct which contains additional
 * @param partitions A buffer of anonymous bytes representing multiple partitions of data to be
 * merged
 * @param partition_offsets Offsets into the partitions buffer indicating where each individual
 * partition begins. The number of partitions is partition_offsets.size()
Is this not the number of partitions + 1?
struct partition_header {
  uint32_t magic_number;
  uint32_t row_index;  // row index in the source table that this partition started at
  uint32_t num_rows;
Initially not supporting large tables?
// padding values for each validity type, as applied at the end of that data type
// in each partition. so for example all of the grouped-together validity buffers for
// a given partition will have a final 4 byte pad applied before the offset buffers begin
It isn't 4 bytes of padding as much as it is padded to align on a 4 byte boundary, right? I wonder if there is better wording we could use here.
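A minimal illustration of that reading (an assumption about the intent, not code from this PR):

```cpp
#include <cstddef>

// "padded to a 4-byte boundary": the pad is 0-3 bytes, whatever brings the running
// size of the validity section up to the next multiple of 4
constexpr size_t pad_to_4_byte_boundary(size_t validity_bytes)
{
  return (4 - (validity_bytes % 4)) % 4;  // e.g. 5 -> 3, 8 -> 0
}
```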
 */
constexpr size_t compute_per_partition_metadata_size(size_t total_columns)
{
  auto const has_validity_length = (total_columns + 7) / 8;  // has-validity bit per column
I believe you should prefer bitmask_allocation_size_bytes() here, but it isn't constexpr. Maybe use div_rounding_up_safe directly?
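A minimal sketch of that suggestion, assuming cudf::util::div_rounding_up_safe (from cudf/detail/utilities/integer_utils.hpp) can be used in a constexpr context in the pinned cudf version:

```cpp
#include <cstddef>
#include <cudf/detail/utilities/integer_utils.hpp>

// same intent as (total_columns + 7) / 8, with the round-up made explicit
constexpr size_t has_validity_length_bytes(size_t total_columns)
{
  return cudf::util::div_rounding_up_safe(total_columns, size_t{8});  // one has-validity bit per column
}
```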
"mauve", | ||
"ultraviolet"}}; | ||
cudf::test::strings_column_wrapper col2{ | ||
{"left", "up", "right", "down", "", "space", "", "delete", "end", "insert"}}; |
I'm disturbed by your lack of Contra cheat code.
Still going, only about halfway through shuffle_assemble unfortunately.
                          int& max_branch_depth)
{
  bool const is_struct = col->type == cudf::type_id::STRUCT;
  if (col->type == cudf::type_id::STRING || col->type == cudf::type_id::LIST) {
We're doing this twice, should we bool const is_string_or_list this thing?
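Something like this, as a sketch (hypothetical helper name; `type` stands in for col->type in the code above):

```cpp
#include <cudf/types.hpp>

// hypothetical helper for the repeated STRING/LIST check
constexpr bool is_string_or_list(cudf::type_id type)
{
  return type == cudf::type_id::STRING || type == cudf::type_id::LIST;
}
```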
                        cudf::device_span<size_t const> partition_offsets,
                        size_t per_partition_metadata_size)
{
  if (threadIdx.x != 0) { return; }
This makes me sad, but I assume there isn't a good way around it.
    validity_pad);
  size_type const* offsets = reinterpret_cast<size_type const*>(partitions.begin() + offsets_begin);

  auto base_col_index = column_metadata.size() * partition_index;
Should/can we const this?
      // fallthrough
      default: {
        if (rc_stack_pos >= 0) {
          auto const num_children = --rc_stack[(rc_stack_pos * 2) + 1];
A comment describing this calculation would be welcome.
// returns:
// - a vector of assemble_column_info structs representing the destination column data.
//   the vector is of length global_metadata.col_info.size() that is, the flattened list of columns
//   in the table.
//
// - the same vector as above, but in host memory.
//
// - a vector of assemble_column_info structs, representing the source column data.
//   the vector is of length global_metadata.col_info.size() * the # of partitions.
//
Suggested change: delete this comment block.
Seems out of date and out of place.
      partitions + partition_offsets[partition_index] + sizeof(partition_header));
    auto const col_index = i / num_partitions;

    return has_validity_buf[col_index / 32] & (1 << (col_index % 32)) ? 1 : 0;
Can we use a helper function in the null mask code for this computation?
  bitmask_type const* const has_validity_buf =
    reinterpret_cast<bitmask_type const*>(buf_start + sizeof(partition_header));
  cinstance_info.has_validity =
    has_validity_buf[col_index / 32] & (1 << (col_index % 32)) ? 1 : 0;
Here is that calculation again. Should we bump this out into a function?
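A sketch of what bumping it out might look like, assuming cudf::bit_is_set from cudf/utilities/bit.hpp is usable in device code here (it is marked host/device in cudf, but worth checking against the pinned version):

```cpp
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>

// replaces: has_validity_buf[col_index / 32] & (1 << (col_index % 32)) ? 1 : 0
__device__ inline bool column_has_validity(cudf::bitmask_type const* has_validity_buf,
                                           cudf::size_type col_index)
{
  return cudf::bit_is_set(has_validity_buf, col_index);
}
```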
  int shmem_limit_per_block;
  CUDF_CUDA_TRY(cudaDeviceGetAttribute(
    &shmem_limit_per_block, cudaDevAttrMaxSharedMemoryPerBlock, device_id));
  CUDF_EXPECTS(shmem_per_block <= shmem_limit_per_block,
What is the failure plan here? Will Java catch this and re-run on CPU? How deep of a table is required?
 *
 * @return Size of the required allocation in bytes
 */
constexpr size_t bitmask_allocation_size_bytes(size_type number_of_bits, int pad)
Are we reproducing this code to make it constexpr?
                      // we need to ignore them because the split-copy happens at
                      // byte boundaries, not bit/row boundaries. so we may have
                      // irrelevant rows at the very beginning.
  int dst_bit_shift;  // dest bit (left) shift
Does right shift not equal left shift? I would think the number of bits you ignore would be the number you shift back.
Still looking at this. I have a very basic Java API and I am just getting started testing compatibility between the two.
namespace spark_rapids_jni {

// TODO: this is duplicated from cudf because the cudf function is not marked as constexpr, so it
// cannot be called on the gpu there is an issue filed against cudf to make
nit: Could you include a link to that issue here? I don't see the issue when I search for it.
  if (input.num_columns() == 0) { return {}; }
  // empty inputs
  if (input.num_columns() == 0 || input.num_rows() == 0) {
    rmm::device_uvector<size_t> empty_offsets(1, stream, mr);
I am not sure if we want this to be an empty buffer or an error. Kudo CPU explicitly supports rows with no columns, but CUDF cannot support this, so we cannot do it here. Because of that we have to make sure that we use the CPU version for those cases, and an error would make it more obvious that we messed something up.
  uint32_t num_rows;
  uint32_t validity_size;
  uint32_t offset_size;
  uint32_t data_size;
The last value in here is supposed to be the number of columns.
uint32_t num_columns;
This feels redundant, seeing how we need the schema for this to be read back in. So should I remove it from the CPU code or should we add it here?
@liurenjie1024 the numColumns field in the header looks like it should be something that we do not need. Is there a reason we are including it instead of relying on the schema to get it?
Yes, it could be inferred from the schema. But this field helps simplify the code a little, since there is a bit array indicating whether each column has a validity buffer.
One extra use of num_cols is that it can identify a row-count-only record: 0 means row count only.
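For reference, a sketch of where that bit array dependency shows up (an assumption about the header layout based on this thread, not verified against the CPU reader):

```cpp
#include <cstddef>
#include <cstdint>

// the has-validity bit array that follows the fixed header fields can only be
// sized if the reader knows the flattened column count
constexpr size_t has_validity_bit_array_bytes(uint32_t num_columns)
{
  return (num_columns + 7) / 8;  // one bit per flattened column
}
```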
I am seeing another difference between this and the CPU code. It looks like this is padding the data after the header, whereas the CPU is not doing that. I think this is because the header is parsed totally on the host, and then only the data part is sent to the GPU. I was able to remove the columnCount from the header, so now I am going to try and add in padding too. But I would like some feedback from @liurenjie1024 and @nvdbaranec on what we want the final solution to look like.
@nvdbaranec can you please explain to me how padding works for this PR? It looks like there is padding after the header, but there is no padding in between columns? Is that right?
@liurenjie1024 can you please explain to me why we have different padding for concat vs over the wire? It is very confusing to see that some are padded to 4 bytes and others are padded to 64 bytes.
…te copy sizes and bit shifts.
Padding over the wire is 4 bytes to reduce padding size, which is very important for small partitions.
64 bytes was me being overly cautious. CUDA malloc does 64-byte alignment (I think) because, like malloc, it does not know the data types, so it cannot make any assumptions about how the data needs to be aligned. In reality we need each column to be aligned based on how it is accessed.
Offsets we could have as 4-byte aligned, but because CUDF supports longer offsets we might want to think about 8-byte alignment (unless we have a way to store what type of offset is being stored in the header). In JCUDF serialization I thought about reordering the columns to reduce the amount of padding that would be needed, but I never did it because it didn't feel like it saved that much; I wasn't looking at it from the perspective of thousands of partitions, though. @nvdbaranec what padding do the kernels in this patch need in order to work? Would it be alright if there was no padding at all? Or is the data copied in a non-byte-wise way?
  }
}

TEST_F(ShuffleSplitTests, Simple)
We need tests for bools and decimal32/64/128 (nothing here tests scale factor). Especially 128 if the data is really only going to be 8-byte-aligned.
  }
  auto temp_mr = cudf::get_current_device_resource_ref();

  size_t const num_partitions = splits.size() + 1;
So if the last split is at the end of the column, will we generate an empty partition at the end then? Is that what we want? Still reading everything.
@nvdbaranec I was looking for the hasValidity part of the header and I am not finding it in this code, but it is in the CPU code, just like the numColumns part (see spark-rapids-jni/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoTableHeader.java, lines 79 to 82 at ef7bb92).
Is this intentional or is it an oversight?
That sounds reasonable to me. The CPU version has the following padding
@nvdbaranec I modified your patch to have a num_columns field.

```diff
diff --git a/src/main/cpp/src/shuffle_split.cu b/src/main/cpp/src/shuffle_split.cu
index 92f7dfc92..4ecbb3eef 100644
--- a/src/main/cpp/src/shuffle_split.cu
+++ b/src/main/cpp/src/shuffle_split.cu
@@ -637,6 +637,8 @@ __global__ void pack_per_partition_metadata_kernel(uint8_t* out_buffer,
   pheader->offset_size =
     cudf::hashing::detail::swap_endian(static_cast<uint32_t>(psize.offset_size));
   pheader->data_size = cudf::hashing::detail::swap_endian(static_cast<uint32_t>(psize.data_size));
+  pheader->num_flattened_columns =
+    cudf::hashing::detail::swap_endian(static_cast<uint32_t>(columns_per_partition));
 }

 bitmask_type* has_validity =
diff --git a/src/main/cpp/src/shuffle_split_detail.hpp b/src/main/cpp/src/shuffle_split_detail.hpp
index da1841e80..d53cfeeb4 100644
--- a/src/main/cpp/src/shuffle_split_detail.hpp
+++ b/src/main/cpp/src/shuffle_split_detail.hpp
@@ -61,6 +61,7 @@ struct partition_header {
   uint32_t validity_size;
   uint32_t offset_size;
   uint32_t data_size;
+  uint32_t num_flattened_columns;  // used by the CPU version, but not the GPU version
 };

 // padding values for each validity type, as applied at the end of that data type
@@ -92,4 +93,4 @@ enum class buffer_type { VALIDITY = 0, OFFSETS = 1, DATA = 2 };
 } // namespace detail
-} // namespace spark_rapids_jni
\ No newline at end of file
+} // namespace spark_rapids_jni
```

I have been trying to modify the CPU code to do padding like you are doing, but it is proving to be a little difficult. I'll keep trying to see if I can make this work.
{
  // if the input is empty, just generate an empty table
  if (partition_offsets.size() == 1) { return build_empty_table(metadata.col_info, stream, mr); }
Should we have a shortcut here for there being only one partition?
Has this been tested yet on complex customer data?
      // fallthrough
      default: {
        if (rc_stack_pos >= 0) {
          src_row_index = rc_stack[(rc_stack_pos * 2) + 2];
Doesn't this need to be rc_stack_pos * 3 instead of * 2?
@nvdbaranec I got an Illegal Memory Access issue when I added the following test (note it was in Java, but I ported it to C++):

```diff
diff --git a/src/main/cpp/tests/shuffle_split.cpp b/src/main/cpp/tests/shuffle_split.cpp
index 97f68f0e3..0d9a59c66 100644
--- a/src/main/cpp/tests/shuffle_split.cpp
+++ b/src/main/cpp/tests/shuffle_split.cpp
@@ -104,6 +104,23 @@ TEST_F(ShuffleSplitTests, Strings)
   }
 }

+TEST_F(ShuffleSplitTests, SimpleWithStrings)
+{
+  cudf::test::fixed_width_column_wrapper<int8_t> col0(
+    {0, 0xF0, 0x0F, 0xAA, 0},
+    {1, 0, 0, 0, 1});
+  cudf::test::strings_column_wrapper col1(
+    {"0xFF", "", "0x0F", "0xAA", "0x55"},
+    {0, 1, 0, 0, 0});
+
+  // 2 columns split once
+  {
+    cudf::table_view tbl{{static_cast<cudf::column_view>(col0),
+                          static_cast<cudf::column_view>(col1)}};
+    run_split(tbl, {5});
+  }
+}
+
 TEST_F(ShuffleSplitTests, Lists)
 {
   // list<uint64_t>
@@ -360,4 +377,4 @@ TEST_F(ShuffleSplitTests, NestedTypes)
   run_split(tbl, {2});
   run_split(tbl, {2, 4});
 }
-}
\ No newline at end of file
+}
```
        word = (word >> batch.src_bit_shift) & relevant_row_mask;

        // any bits that are not being stored in the current dest word get overflowed to the next copy
        prev_word[0] = word >> (32 - batch.dst_bit_shift);
Do any of the existing tests trigger these misaligned cases? I just want to make sure our tests are actually hitting them. Do printfs in here show all kinds of alignments? Ditto for the main loop below.
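For reference, a self-contained sketch of the shift-and-carry these lines appear to implement (a paraphrase of that reading, not the PR's code):

```cpp
#include <cstdint>

// Copy one 32-bit word of validity bits into a destination stream whose current
// word already holds dst_bit_shift bits. Bits that do not fit spill into `carry`
// for the next destination word.
inline void copy_validity_word(uint32_t word, int src_bit_shift, int dst_bit_shift,
                               uint32_t relevant_row_mask, uint32_t& dst_word, uint32_t& carry)
{
  word = (word >> src_bit_shift) & relevant_row_mask;           // drop leading, irrelevant rows
  dst_word |= word << dst_bit_shift;                            // low bits land in the current dest word
  carry = dst_bit_shift ? (word >> (32 - dst_bit_shift)) : 0u;  // the rest overflows to the next word
}
```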
 * included in the buffer. So the source buffer will be 2 bytes (10 bits) even though the we are
 * only using 8 bits of that data in the output. Therefore, ths size returned from this functor is
 * only valid for -source- data.
 *
Is there a guarantee that the src buffer validity bits are aligned to the beginning of a byte? Suppose we have 10 rows and an offset of 7, wouldn't we need 3 bytes? E.g.: xxxxxxxB BBBBBBBB Bxxxxxxx
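As a worked illustration of that concern (not the PR's math):

```cpp
// 10 rows starting at a source bit offset of 7 straddle 3 bytes, even though
// ceil(10 / 8) == 2:   xxxxxxxB BBBBBBBB Bxxxxxxx
constexpr int num_rows      = 10;
constexpr int src_bit_shift = 7;                                   // leading bits to skip
constexpr int src_bytes     = (src_bit_shift + num_rows + 7) / 8;  // == 3, not 2
static_assert(src_bytes == 3);
```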
This PR provides the GPU implementation of the split and assemble halves of the Kudo shuffle format. It is dependent on #2799. That PR exists solely to help reduce the amount of code that needs to get reviewed for this PR; shuffle_split.cu should be a little easier to review once it gets merged.

shuffle_split implements the sender-side half of the Kudo format, splitting an input table up into the final output binary and associated metadata. It is implemented in shuffle_split.cu and is based on cudf::contiguous_split.

shuffle_assemble implements the receiver side of the process, reassembling one or more blobs produced by one or more shuffle_split calls.

This is feature complete and in a reviewable state, but there is a known issue with struct<list> and struct<string> columns that I have to fix. I don't expect it to alter the code much. I also need to add more tests. But in the interest of getting reviews going, I'm putting this up in draft form.