Skip to content

Commit

Permalink
test: support for testing parallelization planner rules (#4541)
Browse files Browse the repository at this point in the history
Moved helpers for creating plan nodes with attributes into plantest so we can
use those in planner rule tests.

Including output and required physical attributes in the want/got diff in
PhysicalRuleTestHelper. This lets us specify attributes in planner rule tests
for the parallelization rules.
  • Loading branch information
adrian-thurston authored Mar 9, 2022
1 parent 44cf37b commit 93bfbc9
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 74 deletions.
114 changes: 46 additions & 68 deletions execute/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,6 @@ func init() {
execute.RegisterSource(executetest.ParallelFromTestKind, executetest.CreateParallelFromSource)
}

type physicalNodeOption func(*plan.PhysicalPlanNode)

func withOutputAttr(name string, attr plan.PhysicalAttr) physicalNodeOption {
return func(node *plan.PhysicalPlanNode) {
node.SetOutputAttr(name, attr)
}
}

func withRequiredAttr(name string, attr plan.PhysicalAttr) physicalNodeOption {
return func(node *plan.PhysicalPlanNode) {
node.SetRequiredAttr(name, attr)
}
}

func createPhysicalNode(id plan.NodeID, spec plan.PhysicalProcedureSpec, opts ...physicalNodeOption) *plan.PhysicalPlanNode {
node := plan.CreatePhysicalNode(id, spec)
for _, opt := range opts {
opt(node)
}
return node
}

func TestParallel_Execute(t *testing.T) {

testcases := []struct {
Expand All @@ -65,7 +43,7 @@ func TestParallel_Execute(t *testing.T) {
name: `parallel-from-merge-filter`,
spec: &plantest.PlanSpec{
Nodes: []plan.Node{
createPhysicalNode("parallel-from-test",
plantest.CreatePhysicalNode("parallel-from-test",
executetest.NewParallelFromProcedureSpec(
[]*executetest.ParallelTable{
{
Expand Down Expand Up @@ -109,17 +87,17 @@ func TestParallel_Execute(t *testing.T) {
ResidesOnPartition: 1,
},
}),
withOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
createPhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
withRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
withOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
createPhysicalNode("filter", &universe.FilterProcedureSpec{
plantest.WithOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
plantest.CreatePhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
plantest.WithRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
plantest.WithOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
plantest.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{
Fn: interpreter.ResolvedFunction{
Scope: runtime.Prelude(),
Fn: executetest.FunctionExpression(t, "(r) => r._value < 7.5"),
},
}),
createPhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
plantest.CreatePhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
},
Edges: [][2]int{
{0, 1},
Expand Down Expand Up @@ -170,7 +148,7 @@ func TestParallel_Execute(t *testing.T) {
name: `parallel-from-filter-merge`,
spec: &plantest.PlanSpec{
Nodes: []plan.Node{
createPhysicalNode("parallel-from-test",
plantest.CreatePhysicalNode("parallel-from-test",
executetest.NewParallelFromProcedureSpec(
[]*executetest.ParallelTable{
{
Expand Down Expand Up @@ -214,20 +192,20 @@ func TestParallel_Execute(t *testing.T) {
ResidesOnPartition: 1,
},
}),
withOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
createPhysicalNode("filter",
plantest.WithOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
plantest.CreatePhysicalNode("filter",
&universe.FilterProcedureSpec{
Fn: interpreter.ResolvedFunction{
Scope: runtime.Prelude(),
Fn: executetest.FunctionExpression(t, "(r) => r._value < 7.5"),
},
},
withRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
withOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
createPhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
withRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
withOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
createPhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
plantest.WithRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
plantest.WithOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
plantest.CreatePhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
plantest.WithRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
plantest.WithOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
plantest.CreatePhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
},
Edges: [][2]int{
{0, 1},
Expand Down Expand Up @@ -276,7 +254,7 @@ func TestParallel_Execute(t *testing.T) {
name: `parallel-from-merge-no-successor`,
spec: &plantest.PlanSpec{
Nodes: []plan.Node{
createPhysicalNode("parallel-from-test",
plantest.CreatePhysicalNode("parallel-from-test",
executetest.NewParallelFromProcedureSpec(
[]*executetest.ParallelTable{
{
Expand Down Expand Up @@ -320,10 +298,10 @@ func TestParallel_Execute(t *testing.T) {
ResidesOnPartition: 1,
},
}),
withOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
createPhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
withRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
withOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
plantest.WithOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
plantest.CreatePhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
plantest.WithRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
plantest.WithOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
},
Edges: [][2]int{
{0, 1},
Expand Down Expand Up @@ -374,21 +352,21 @@ func TestParallel_Execute(t *testing.T) {
name: `from-missing-output`,
spec: &plantest.PlanSpec{
Nodes: []plan.Node{
createPhysicalNode("parallel-from-test",
plantest.CreatePhysicalNode("parallel-from-test",
executetest.NewParallelFromProcedureSpec([]*executetest.ParallelTable{})),
createPhysicalNode("filter",
plantest.CreatePhysicalNode("filter",
&universe.FilterProcedureSpec{
Fn: interpreter.ResolvedFunction{
Scope: runtime.Prelude(),
Fn: executetest.FunctionExpression(t, "(r) => r._value < 7.5"),
},
},
withRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
withOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
createPhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
withRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
withOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
createPhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
plantest.WithRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
plantest.WithOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
plantest.CreatePhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
plantest.WithRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
plantest.WithOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
plantest.CreatePhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
},
Edges: [][2]int{
{0, 1},
Expand All @@ -409,21 +387,21 @@ func TestParallel_Execute(t *testing.T) {
name: `from-missing-required`,
spec: &plantest.PlanSpec{
Nodes: []plan.Node{
createPhysicalNode("parallel-from-test",
plantest.CreatePhysicalNode("parallel-from-test",
executetest.NewParallelFromProcedureSpec([]*executetest.ParallelTable{}),
withOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
createPhysicalNode("filter",
plantest.WithOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
plantest.CreatePhysicalNode("filter",
&universe.FilterProcedureSpec{
Fn: interpreter.ResolvedFunction{
Scope: runtime.Prelude(),
Fn: executetest.FunctionExpression(t, "(r) => r._value < 7.5"),
},
},
withOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
createPhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
withRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
withOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
createPhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
plantest.WithOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
plantest.CreatePhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
plantest.WithRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2}),
plantest.WithOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 2})),
plantest.CreatePhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
},
Edges: [][2]int{
{0, 1},
Expand All @@ -443,22 +421,22 @@ func TestParallel_Execute(t *testing.T) {
name: `from-factor-mismatch`,
spec: &plantest.PlanSpec{
Nodes: []plan.Node{
createPhysicalNode("parallel-from-test",
plantest.CreatePhysicalNode("parallel-from-test",
executetest.NewParallelFromProcedureSpec([]*executetest.ParallelTable{}),
withOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
createPhysicalNode("filter",
plantest.WithOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 2})),
plantest.CreatePhysicalNode("filter",
&universe.FilterProcedureSpec{
Fn: interpreter.ResolvedFunction{
Scope: runtime.Prelude(),
Fn: executetest.FunctionExpression(t, "(r) => r._value < 7.5"),
},
},
withRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 1}),
withOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 1})),
createPhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
withRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 1}),
withOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 1})),
createPhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
plantest.WithRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 1}),
plantest.WithOutputAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 1})),
plantest.CreatePhysicalNode("merge", &universe.PartitionMergeProcedureSpec{},
plantest.WithRequiredAttr(plan.ParallelRunKey, plan.ParallelRunAttribute{Factor: 1}),
plantest.WithOutputAttr(plan.ParallelMergeKey, plan.ParallelMergeAttribute{Factor: 1})),
plantest.CreatePhysicalNode("yield", executetest.NewYieldProcedureSpec("_result")),
},
Edges: [][2]int{
{0, 1},
Expand Down
55 changes: 49 additions & 6 deletions plan/plantest/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,23 +201,44 @@ func PhysicalRuleTestHelper(t *testing.T, tc *RuleTestCase, options ...cmp.Optio
}

type testAttrs struct {
ID plan.NodeID
Spec plan.PhysicalProcedureSpec
ID plan.NodeID
Spec plan.PhysicalProcedureSpec
RequiredAttrs plan.PhysicalAttributes
OutputAttrs plan.PhysicalAttributes
}
want := make([]testAttrs, 0)
after.BottomUpWalk(func(node plan.Node) error {
var outputAttrs plan.PhysicalAttributes
var requiredAttrs plan.PhysicalAttributes

if ppn, ok := node.(*plan.PhysicalPlanNode); ok {
outputAttrs = ppn.OutputAttrs
requiredAttrs = ppn.RequiredAttrs
}
want = append(want, testAttrs{
ID: node.ID(),
Spec: node.ProcedureSpec().(plan.PhysicalProcedureSpec),
ID: node.ID(),
Spec: node.ProcedureSpec().(plan.PhysicalProcedureSpec),
RequiredAttrs: requiredAttrs,
OutputAttrs: outputAttrs,
})
return nil
})

got := make([]testAttrs, 0)
pp.BottomUpWalk(func(node plan.Node) error {
var outputAttrs plan.PhysicalAttributes
var requiredAttrs plan.PhysicalAttributes

if ppn, ok := node.(*plan.PhysicalPlanNode); ok {
outputAttrs = ppn.OutputAttrs
requiredAttrs = ppn.RequiredAttrs
}

got = append(got, testAttrs{
ID: node.ID(),
Spec: node.ProcedureSpec().(plan.PhysicalProcedureSpec),
ID: node.ID(),
Spec: node.ProcedureSpec().(plan.PhysicalProcedureSpec),
RequiredAttrs: requiredAttrs,
OutputAttrs: outputAttrs,
})
return nil
})
Expand Down Expand Up @@ -287,3 +308,25 @@ func LogicalRuleTestHelper(t *testing.T, tc *RuleTestCase, options ...cmp.Option
cmp.Diff(want, got, tempOptions...))
}
}

type PhysicalNodeOption func(*plan.PhysicalPlanNode)

func WithOutputAttr(name string, attr plan.PhysicalAttr) PhysicalNodeOption {
return func(node *plan.PhysicalPlanNode) {
node.SetOutputAttr(name, attr)
}
}

func WithRequiredAttr(name string, attr plan.PhysicalAttr) PhysicalNodeOption {
return func(node *plan.PhysicalPlanNode) {
node.SetRequiredAttr(name, attr)
}
}

func CreatePhysicalNode(id plan.NodeID, spec plan.PhysicalProcedureSpec, opts ...PhysicalNodeOption) *plan.PhysicalPlanNode {
node := plan.CreatePhysicalNode(id, spec)
for _, opt := range opts {
opt(node)
}
return node
}

0 comments on commit 93bfbc9

Please sign in to comment.