reduce memory waste of row buffer (xitongsys#439)
* better estimation of the size of decode output buffers

* detect whether some encodings support decoding values in place

* fix page size

* chunk large writes of rows

* support configuring the sorting buffer pool

* debug page buffer ref count

* track stack traces of buffer allocations

* fix buffer reference counting

* fix backward compatibility with Go 1.17

* set PARQUETGODEBUG=tracbuf=1 in CI

* reduce memory waste of row buffer
Achille authored Dec 9, 2022
1 parent 97925e3 commit c5ee95a
Showing 2 changed files with 29 additions and 52 deletions.
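A note on the debug flag mentioned in the commit message: PARQUETGODEBUG=tracbuf=1 follows the GODEBUG convention of a comma-separated list of key=value switches read from the environment. The sketch below shows one minimal way such a variable can be parsed; the variable name and flag value come from the commit message above, while the parsing code and helper names are illustrative assumptions rather than parquet-go's actual implementation.

package main

import (
    "fmt"
    "os"
    "strings"
)

// debugFlags holds GODEBUG-style switches parsed from an environment
// variable formatted as "key1=value1,key2=value2".
type debugFlags map[string]string

// parseDebugFlags splits the variable on commas, then on the first '='.
// Entries without an '=' are kept with an empty value.
func parseDebugFlags(env string) debugFlags {
    flags := make(debugFlags)
    for _, entry := range strings.Split(env, ",") {
        if entry == "" {
            continue
        }
        kv := strings.SplitN(entry, "=", 2)
        if len(kv) == 2 {
            flags[kv[0]] = kv[1]
        } else {
            flags[kv[0]] = ""
        }
    }
    return flags
}

func main() {
    // For example PARQUETGODEBUG=tracbuf=1, as set in CI per the commit message.
    flags := parseDebugFlags(os.Getenv("PARQUETGODEBUG"))
    if flags["tracbuf"] == "1" {
        fmt.Println("buffer allocation tracking enabled")
    }
}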
55 changes: 16 additions & 39 deletions row_buffer.go
@@ -26,7 +26,7 @@ type RowBuffer[T any] struct {
 	schema *Schema
 	sorting []SortingColumn
 	rows []Row
-	numRows int
+	values []Value
 	compare func(Row, Row) int
 }

@@ -56,17 +56,19 @@ func NewRowBuffer[T any](options ...RowGroupOption) *RowBuffer[T] {

 // Reset clears the content of the buffer without releasing its memory.
 func (buf *RowBuffer[T]) Reset() {
-	for _, row := range buf.rows[:buf.numRows] {
-		for i := range row {
-			row[i] = Value{}
-		}
+	for i := range buf.rows {
+		buf.rows[i] = nil
 	}
+	for i := range buf.values {
+		buf.values[i] = Value{}
+	}
+	buf.rows = buf.rows[:0]
+	buf.values = buf.values[:0]
 	buf.alloc.reset()
-	buf.numRows = 0
 }

 // NumRows returns the number of rows currently written to the buffer.
-func (buf *RowBuffer[T]) NumRows() int64 { return int64(buf.numRows) }
+func (buf *RowBuffer[T]) NumRows() int64 { return int64(len(buf.rows)) }

 // ColumnChunks returns a view of the buffer's columns.
 //
@@ -86,7 +88,7 @@ func (buf *RowBuffer[T]) ColumnChunks() []ColumnChunk {
 		leafColumn, _ := buf.schema.Lookup(column...)
 		chunks[i] = rowBufferColumnChunk{
 			page: rowBufferPage{
-				rows: buf.rows[:buf.numRows],
+				rows: buf.rows,
 				typ: leafColumn.Node.Type(),
 				column: leafColumn.ColumnIndex,
 				maxRepetitionLevel: byte(leafColumn.MaxRepetitionLevel),
@@ -119,7 +121,7 @@ func (buf *RowBuffer[T]) Schema() *Schema { return buf.schema }
 // Len returns the number of rows in the buffer.
 //
 // The method contributes to satisfying sort.Interface.
-func (buf *RowBuffer[T]) Len() int { return buf.numRows }
+func (buf *RowBuffer[T]) Len() int { return len(buf.rows) }

 // Less compares the rows at index i and j according to the sorting columns
 // configured on the buffer.
@@ -142,55 +144,30 @@ func (buf *RowBuffer[T]) Swap(i, j int) {
 // The returned rows and values read from it remain valid until the next call
 // to Reset on the buffer.
 func (buf *RowBuffer[T]) Rows() Rows {
-	return &rowBufferRows{rows: buf.rows[:buf.numRows], schema: buf.schema}
+	return &rowBufferRows{rows: buf.rows, schema: buf.schema}
 }

 // Write writes rows to the buffer, returning the number of rows written.
 func (buf *RowBuffer[T]) Write(rows []T) (int, error) {
-	buf.reserve(len(rows))
-
 	for i := range rows {
-		bufRow := buf.rows[buf.numRows][:0]
-		bufRow = buf.schema.Deconstruct(bufRow, &rows[i])
+		bufRow := buf.schema.Deconstruct(buf.values, &rows[i])
 		buf.alloc.capture(bufRow)
-		buf.rows[buf.numRows] = bufRow
-		buf.numRows++
+		buf.rows = append(buf.rows, bufRow)
 	}
-
 	return len(rows), nil
 }

 // WriteRows writes parquet rows to the buffer, returing the number of rows
 // written.
 func (buf *RowBuffer[T]) WriteRows(rows []Row) (int, error) {
-	buf.reserve(len(rows))
-
 	for _, row := range rows {
-		bufRow := buf.rows[buf.numRows][:0]
-		bufRow = append(bufRow, row...)
+		bufRow := append(buf.values, row...)
 		buf.alloc.capture(bufRow)
-		buf.rows[buf.numRows] = bufRow
-		buf.numRows++
+		buf.rows = append(buf.rows, bufRow)
 	}
-
 	return len(rows), nil
 }

-func (buf *RowBuffer[T]) reserve(n int) {
-	if newLen := buf.numRows + n; newLen > len(buf.rows) {
-		bufLen := 2 * len(buf.rows)
-		if bufLen == 0 {
-			bufLen = defaultValueBufferSize
-		}
-		for bufLen < newLen {
-			bufLen *= 2
-		}
-		newRows := make([]Row, bufLen)
-		copy(newRows, buf.rows)
-		buf.rows = newRows
-	}
-}
-
 type rowBufferColumnChunk struct{ page rowBufferPage }

 func (c *rowBufferColumnChunk) Type() Type { return c.page.Type() }
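For readers unfamiliar with the type changed above: RowBuffer[T] is an in-memory, sortable buffer of rows. The sketch below is a hedged usage example; NewRowBuffer, Write, NumRows, Rows, Reset and the sort.Interface methods appear in the diff, while the module import path, the struct tags, and the SortingRowGroupConfig/SortingColumns/Ascending option names are assumptions that may differ from the version of the library at hand.

package main

import (
    "fmt"
    "log"
    "sort"

    parquet "github.com/segmentio/parquet-go" // import path assumed for this era of the project
)

type event struct {
    ID   int64  `parquet:"id"`
    Name string `parquet:"name"`
}

func main() {
    // The option names are assumptions; without them the buffer still works,
    // it simply has no sorting columns configured.
    buf := parquet.NewRowBuffer[event](
        parquet.SortingRowGroupConfig(
            parquet.SortingColumns(parquet.Ascending("id")),
        ),
    )

    // Write deconstructs each Go value into a parquet Row and buffers it.
    if _, err := buf.Write([]event{{ID: 2, Name: "b"}, {ID: 1, Name: "a"}}); err != nil {
        log.Fatal(err)
    }

    // RowBuffer satisfies sort.Interface (Len/Less/Swap above), so buffered
    // rows can be ordered in place by the configured sorting columns.
    sort.Sort(buf)
    fmt.Println("buffered rows:", buf.NumRows())

    // Rows returns a reader over the buffered rows; per the doc comment in
    // the diff, they stay valid only until the next call to Reset.
    rows := buf.Rows()
    defer rows.Close()

    // Reset clears the buffer while keeping its memory for the next batch.
    // buf.Reset()
}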
26 changes: 13 additions & 13 deletions sorting.go
@@ -22,7 +22,7 @@ import (
 // results in better CPU cache utilization since sorting multi-megabyte arrays
 // causes a lot of cache misses since the data set cannot be held in CPU caches.
 type SortingWriter[T any] struct {
-	rows *RowBuffer[T]
+	rowbuf *RowBuffer[T]
 	writer *GenericWriter[T]
 	output *GenericWriter[T]
 	buffer io.ReadWriteSeeker
@@ -48,7 +48,7 @@ func NewSortingWriter[T any](output io.Writer, sortRowCount int64, options ...Wr
 		panic(err)
 	}
 	return &SortingWriter[T]{
-		rows: NewRowBuffer[T](&RowGroupConfig{
+		rowbuf: NewRowBuffer[T](&RowGroupConfig{
 			Schema: config.Schema,
 			Sorting: config.Sorting,
 		}),
@@ -122,7 +122,7 @@ func (w *SortingWriter[T]) Flush() error {

 	reader := RowReader(rows)
 	if w.sorting.DropDuplicatedRows {
-		reader = DedupeRowReader(rows, w.rows.compare)
+		reader = DedupeRowReader(rows, w.rowbuf.compare)
 	}

 	if _, err := CopyRows(w.output, reader); err != nil {
@@ -134,7 +134,7 @@

 func (w *SortingWriter[T]) Reset(output io.Writer) {
 	w.output.Reset(output)
-	w.rows.Reset()
+	w.rowbuf.Reset()
 	w.resetSortingBuffer()
 }

@@ -149,24 +149,24 @@ func (w *SortingWriter[T]) resetSortingBuffer() {
 	}
 }

 func (w *SortingWriter[T]) Write(rows []T) (int, error) {
-	return w.writeRows(len(rows), func(i, j int) (int, error) { return w.rows.Write(rows[i:j]) })
+	return w.writeRows(len(rows), func(i, j int) (int, error) { return w.rowbuf.Write(rows[i:j]) })
 }

 func (w *SortingWriter[T]) WriteRows(rows []Row) (int, error) {
-	return w.writeRows(len(rows), func(i, j int) (int, error) { return w.rows.WriteRows(rows[i:j]) })
+	return w.writeRows(len(rows), func(i, j int) (int, error) { return w.rowbuf.WriteRows(rows[i:j]) })
 }

 func (w *SortingWriter[T]) writeRows(numRows int, writeRows func(i, j int) (int, error)) (int, error) {
 	wn := 0

 	for wn < numRows {
-		if w.rows.NumRows() >= w.maxRows {
+		if w.rowbuf.NumRows() >= w.maxRows {
 			if err := w.sortAndWriteBufferedRows(); err != nil {
 				return wn, err
 			}
 		}

-		n := int(w.maxRows - w.rows.NumRows())
+		n := int(w.maxRows - w.rowbuf.NumRows())
 		n += wn
 		if n > numRows {
 			n = numRows
@@ -192,19 +192,19 @@ func (w *SortingWriter[T]) Schema() *Schema {
 }

 func (w *SortingWriter[T]) sortAndWriteBufferedRows() error {
-	if w.rows.Len() == 0 {
+	if w.rowbuf.Len() == 0 {
 		return nil
 	}

-	defer w.rows.Reset()
-	sort.Sort(w.rows)
+	defer w.rowbuf.Reset()
+	sort.Sort(w.rowbuf)

 	if w.sorting.DropDuplicatedRows {
-		w.rows.numRows = w.dedupe.deduplicate(w.rows.rows[:w.rows.numRows], w.rows.compare)
+		w.rowbuf.rows = w.rowbuf.rows[:w.dedupe.deduplicate(w.rowbuf.rows, w.rowbuf.compare)]
 		defer w.dedupe.reset()
 	}

-	rows := w.rows.Rows()
+	rows := w.rowbuf.Rows()
 	defer rows.Close()

 	if w.buffer == nil {
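And a corresponding hedged sketch of the SortingWriter[T] touched above: its constructor signature (output io.Writer, sortRowCount int64, options ...WriterOption) and its Write and Flush methods are visible in the diff, while the import path, the SortingWriterConfig/SortingColumns/Ascending option names, and the Close call are assumptions about the surrounding API. Rows are buffered and sorted in chunks of at most sortRowCount rows before being written out, which is what the writeRows chunking loop above enforces.

package main

import (
    "log"
    "os"

    parquet "github.com/segmentio/parquet-go" // import path assumed
)

type record struct {
    ID   int64  `parquet:"id"`
    Name string `parquet:"name"`
}

func main() {
    f, err := os.Create("sorted.parquet")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    // Buffer and sort up to 4096 rows at a time before spilling; the option
    // names below are assumed, the constructor signature matches the diff.
    w := parquet.NewSortingWriter[record](f, 4096,
        parquet.SortingWriterConfig(
            parquet.SortingColumns(parquet.Ascending("id")),
        ),
    )

    // Write chunks large row slices internally so the in-memory buffer never
    // holds more than sortRowCount rows (see writeRows in the diff above).
    if _, err := w.Write([]record{{ID: 3, Name: "c"}, {ID: 1, Name: "a"}, {ID: 2, Name: "b"}}); err != nil {
        log.Fatal(err)
    }

    // Close is assumed to flush the remaining buffered rows and finalize the
    // file, mirroring the package's other writers.
    if err := w.Close(); err != nil {
        log.Fatal(err)
    }
}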
