Skip to content

Commit

Permalink
[Refactor] Support adding custom accelerator to resources in rayStart…
Browse files Browse the repository at this point in the history
…Params (#2425)
  • Loading branch information
mounchin authored Oct 8, 2024
1 parent b5f14f1 commit bf21d2d
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 40 deletions.
94 changes: 63 additions & 31 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"os"
"sort"
"strconv"
"strings"

Expand Down Expand Up @@ -757,27 +758,9 @@ func generateRayStartCommand(ctx context.Context, nodeType rayv1.RayNodeType, ra
}
}

if _, ok := rayStartParams["num-gpus"]; !ok {
// Scan for resource keys ending with "gpu" like "nvidia.com/gpu".
for resourceKey, resource := range resource.Limits {
if strings.HasSuffix(string(resourceKey), "gpu") && !resource.IsZero() {
rayStartParams["num-gpus"] = strconv.FormatInt(resource.Value(), 10)
// For now, only support one GPU type. Break on first match.
break
}
}
}

if _, ok := rayStartParams["resources"]; !ok {
for resourceKey, resource := range resource.Limits {
if rayResourceName, ok := customAcceleratorToRayResourceMap[string(resourceKey)]; ok && !resource.IsZero() {
if err := addCustomAcceleratorToResourcesIfNotExists(rayStartParams, rayResourceName, resource.Value()); err != nil {
log.Error(err, fmt.Sprintf("failed to add %s to resources", rayResourceName))
}
// For now, only support one custom accelerator type. Break on first match.
break
}
}
// Add GPU and custom accelerator resources to rayStartParams if not already present.
if err := addWellKnownAcceleratorResources(rayStartParams, resource.Limits); err != nil {
log.Error(err, "failed to add accelerator resources to rayStartParams")
}

rayStartCmd := ""
Expand All @@ -793,30 +776,71 @@ func generateRayStartCommand(ctx context.Context, nodeType rayv1.RayNodeType, ra
return rayStartCmd
}

func addCustomAcceleratorToResourcesIfNotExists(rayStartParams map[string]string, resourceName string, resourceCount int64) error {
func addWellKnownAcceleratorResources(rayStartParams map[string]string, resourceLimits corev1.ResourceList) error {
if len(resourceLimits) == 0 {
return nil
}

resourcesMap, err := getResourcesMap(rayStartParams)
if err != nil {
return err
return fmt.Errorf("failed to get resources map from rayStartParams: %w", err)
}

if _, exists := resourcesMap[resourceName]; !exists {
resourcesMap[resourceName] = float64(resourceCount)
}
// Flag to track whether any custom accelerator resource is already present in, or has been added to, the rayStartParams resources.
isCustomAcceleratorResourceAdded := isCustomAcceleratorPresentInResources(resourcesMap)

updatedResourcesStr, err := json.Marshal(resourcesMap)
if err != nil {
return fmt.Errorf("failed to marshal resources map to string %w", err)
for resourceKey, resourceValue := range resourceLimits {
resourceKeyString := string(resourceKey)

// Scan for resource keys ending with "gpu" like "nvidia.com/gpu"
if _, ok := rayStartParams["num-gpus"]; !ok {
if strings.HasSuffix(resourceKeyString, "gpu") && !resourceValue.IsZero() {
rayStartParams["num-gpus"] = strconv.FormatInt(resourceValue.Value(), 10)
}
}

// Add the first encountered custom accelerator resource from the resource limits to the rayStartParams if not already present
if !isCustomAcceleratorResourceAdded {
if rayResourceName, ok := customAcceleratorToRayResourceMap[resourceKeyString]; ok && !resourceValue.IsZero() {
if _, exists := resourcesMap[rayResourceName]; !exists {
resourcesMap[rayResourceName] = float64(resourceValue.Value())

// Update the resources map in the rayStartParams
updatedResourcesStr, err := json.Marshal(resourcesMap)
if err != nil {
return fmt.Errorf("failed to marshal resources map to string: %w", err)
}

rayStartParams["resources"] = fmt.Sprintf("'%s'", updatedResourcesStr)
}
isCustomAcceleratorResourceAdded = true
}
}
}

rayStartParams["resources"] = string(updatedResourcesStr)
return nil
}

// isCustomAcceleratorPresentInResources reports whether resourcesMap already
// holds an entry for at least one of the Ray resource names that appear as
// values in customAcceleratorToRayResourceMap.
func isCustomAcceleratorPresentInResources(resourcesMap map[string]float64) bool {
	// An empty (or nil) map cannot contain any accelerator entry, so skip the scan.
	if len(resourcesMap) == 0 {
		return false
	}

	for _, rayResourceName := range customAcceleratorToRayResourceMap {
		if _, found := resourcesMap[rayResourceName]; found {
			return true
		}
	}
	return false
}

func getResourcesMap(rayStartParams map[string]string) (map[string]float64, error) {
var resources map[string]float64
if resourcesStr, ok := rayStartParams["resources"]; !ok {
resources = make(map[string]float64)
} else {
// Trim any surrounding quotes (single, double, or backticks) and spaces
resourcesStr = strings.Trim(resourcesStr, "'\"` ")
err := json.Unmarshal([]byte(resourcesStr), &resources)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal resources %w", err)
Expand All @@ -826,11 +850,19 @@ func getResourcesMap(rayStartParams map[string]string) (map[string]float64, erro
}

func convertParamMap(rayStartParams map[string]string) (s string) {
// Order rayStartParams keys for consistent ray start command flags generation
keys := make([]string, 0, len(rayStartParams))
for k := range rayStartParams {
keys = append(keys, k)
}
sort.Strings(keys)

flags := new(bytes.Buffer)
// specialParameterOptions' arguments can be true or false.
// For example, --log-color can be auto | false | true.
specialParameterOptions := []string{"log-color", "include-dashboard"}
for option, argument := range rayStartParams {
for _, option := range keys {
argument := rayStartParams[option]
if utils.Contains([]string{"true", "false"}, strings.ToLower(argument)) && !utils.Contains(specialParameterOptions, option) {
// booleanOptions: do not require any argument. Essentially represent boolean on-off switches.
if strings.ToLower(argument) == "true" {
Expand Down
68 changes: 60 additions & 8 deletions ray-operator/controllers/ray/common/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,11 +1158,12 @@ func TestInitLivenessAndReadinessProbe(t *testing.T) {

func TestGenerateRayStartCommand(t *testing.T) {
tests := []struct {
rayStartParams map[string]string
name string
expected string
nodeType rayv1.RayNodeType
resource corev1.ResourceRequirements
rayStartParams map[string]string
mockCustomAcceleratorToRayResourceMap map[string]string
name string
expected string
nodeType rayv1.RayNodeType
resource corev1.ResourceRequirements
}{
{
name: "WorkerNode with GPU",
Expand All @@ -1184,20 +1185,62 @@ func TestGenerateRayStartCommand(t *testing.T) {
"aws.amazon.com/neuroncore": resource.MustParse("4"),
},
},
expected: `ray start --head --resources={"neuron_cores":4} `,
expected: `ray start --head --resources='{"neuron_cores":4}' `,
},
{
name: "HeadNode with multiple accelerators",
nodeType: rayv1.HeadNode,
rayStartParams: map[string]string{},
resource: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"aws.amazon.com/neuroncore": resource.MustParse("4"),
"nvidia.com/gpu": resource.MustParse("1"),
},
},
expected: `ray start --head --num-gpus=1 --resources='{"neuron_cores":4}' `,
},
{
name: "HeadNode with multiple custom accelerators",
nodeType: rayv1.HeadNode,
rayStartParams: map[string]string{},
resource: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"cloud-tpus.google.com/v3": resource.MustParse("8"),
"aws.amazon.com/neuroncore": resource.MustParse("4"),
"nvidia.com/gpu": resource.MustParse("1"),
},
},
mockCustomAcceleratorToRayResourceMap: map[string]string{
NeuronCoreContainerResourceName: NeuronCoreRayResourceName,
"cloud-tpus.google.com/v3": "tpu",
},
expected: `ray start --head --num-gpus=1 --resources='{"tpu":8}' `,
},
{
name: "HeadNode with existing resources",
nodeType: rayv1.HeadNode,
rayStartParams: map[string]string{
"resources": `{"custom_resource":2}`,
"resources": `"{"custom_resource":2}"`,
},
resource: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"aws.amazon.com/neuroncore": resource.MustParse("4"),
},
},
expected: `ray start --head --resources={"custom_resource":2} `,
expected: `ray start --head --resources='{"custom_resource":2,"neuron_cores":4}' `,
},
{
name: "HeadNode with existing neuron_cores resources",
nodeType: rayv1.HeadNode,
rayStartParams: map[string]string{
"resources": `'{"custom_resource":2,"neuron_cores":3}'`,
},
resource: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"aws.amazon.com/neuroncore": resource.MustParse("4"),
},
},
expected: `ray start --head --resources='{"custom_resource":2,"neuron_cores":3}' `,
},
{
name: "HeadNode with invalid resources string",
Expand All @@ -1223,6 +1266,15 @@ func TestGenerateRayStartCommand(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Mock the customAcceleratorToRayResourceMap with the value specified in the test
if tt.mockCustomAcceleratorToRayResourceMap != nil {
originalCustomAcceleratorToRayResourceMap := customAcceleratorToRayResourceMap
customAcceleratorToRayResourceMap = tt.mockCustomAcceleratorToRayResourceMap
defer func() {
customAcceleratorToRayResourceMap = originalCustomAcceleratorToRayResourceMap
}()
}

result := generateRayStartCommand(context.TODO(), tt.nodeType, tt.rayStartParams, tt.resource)
assert.Equal(t, tt.expected, result)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestRayClusterAutoscalerWithCustomResource(t *testing.T) {
WithMinReplicas(0).
WithMaxReplicas(3).
WithGroupName(groupName).
WithRayStartParams(map[string]string{"num-cpus": "1", "resources": `"{\"CustomResource\": 1}"`}).
WithRayStartParams(map[string]string{"num-cpus": "1", "resources": `'{"CustomResource": 1}'`}).
WithTemplate(workerPodTemplateApplyConfiguration()))
rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name).
WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))
Expand Down

0 comments on commit bf21d2d

Please sign in to comment.