Skip to content

Commit

Permalink
Add support for disambiguating multiple responses for cache exporters
Browse files Browse the repository at this point in the history
by adding IDs similar to output exporters.

Signed-off-by: a-palchikov <[email protected]>
  • Loading branch information
a-palchikov committed Feb 21, 2025
1 parent 7a96ed6 commit 27b04ac
Show file tree
Hide file tree
Showing 7 changed files with 410 additions and 152 deletions.
326 changes: 238 additions & 88 deletions api/services/control/control.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/services/control/control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ message CacheOptionsEntry {
// Attrs are like mode=(min,max), ref=example.com:5000/foo/bar .
// See cache importer/exporter implementations' documentation.
map<string, string> Attrs = 2;
// ID identifies this exporter.
// ID should be treated by the exporter as opaque.
string ID = 3;
}

message SolveResponse {
Expand Down
47 changes: 47 additions & 0 deletions api/services/control/control_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

145 changes: 103 additions & 42 deletions client/solve.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,50 +47,63 @@ type SolveOpt struct {
CacheImports []CacheOptionsEntry
Session []session.Attachable
AllowedEntitlements []string
// When the session is custom-initialized, ParseExporterOpts need to be used to correctly
// set up the session for export.
// When the session is custom-initialized, Init can be used to
// set up the session for export automatically.
SharedSession *session.Session // TODO: refactor to better session syncing
SessionPreInitialized bool // TODO: refactor to better session syncing
Internal bool
SourcePolicy *spb.Policy
Ref string

// internal exporter state
// internal solver state
s solverState
}

type solverState struct {
// storesToUpdate maps exporter ID -> oci store
storesToUpdate map[string]ociStore
cacheOpt *cacheOptions
exporterOpt *exporterOptions
cacheOpt *cacheOptions
// Only one of runGateway or def can be set.
// runGateway optionally defines the gateway callback
runGateway runGatewayCB
// def optionally defines the LLB definition for the client
def *llb.Definition
}

type exporterOptions struct {
// storesToUpdate maps exporter ID -> oci store
storesToUpdate map[string]ociStore
}

type cacheOptions struct {
options controlapi.CacheOptions
contentStores map[string]content.Store // key: ID of content store ("local:" + csDir)
storesToUpdate map[string]string // key: path to content store, value: tag
storesToUpdate map[string]ociStore // key: exporter ID
frontendAttrs map[string]string
}

type ociStore struct {
path string
tag string
}

type ExportEntry struct {
Type string
Attrs map[string]string
Output filesync.FileOutputFunc // for ExporterOCI and ExporterDocker
OutputDir string // for ExporterLocal

// id identifies the exporter in the configuration.
// Will be assigned automatically and should not be set by the user.
id string
}

type CacheOptionsEntry struct {
Type string
Attrs map[string]string

// id identifies the exporter in the configuration.
// Will be assigned automatically and should not be set by the user.
id string
}

// Solve calls Solve on the controller.
Expand All @@ -115,9 +128,32 @@ func (c *Client) Solve(ctx context.Context, def *llb.Definition, opt SolveOpt, s

type runGatewayCB func(ref string, s *session.Session, opts map[string]string) error

// ParseExporterOpts configures the specified session with the underlying exporter configuration.
// Init initializes the SolveOpt.
// It parses and initializes the cache exports/imports and output exporters.
func (opt *SolveOpt) Init(ctx context.Context, s *session.Session) error {
opt.initExporterIDs()
if err := opt.parseCacheOptions(ctx); err != nil {
return err
}
return opt.parseExporterOptions(s)
}

func (opt *SolveOpt) initExporterIDs() {
for i := range opt.Exports {
opt.Exports[i].id = strconv.Itoa(i)
}
for i := range opt.CacheExports {
opt.CacheExports[i].id = strconv.Itoa(i)
}
}

// parseExporterOptions configures the specified session with the underlying exporter configuration.
// It needs to be invoked *after* ParseCacheOpts
func ParseExporterOpts(opt *SolveOpt, s *session.Session) error {
func (opt *SolveOpt) parseExporterOptions(s *session.Session) error {
if opt.s.exporterOpt != nil {
return nil
}

mounts, err := prepareMounts(opt)
if err != nil {
return err
Expand Down Expand Up @@ -145,8 +181,9 @@ func ParseExporterOpts(opt *SolveOpt, s *session.Session) error {
contentStores[key2] = store
}

opt.s.exporterOpt = &exporterOptions{}
var syncTargets []filesync.FSSyncTarget
for exID, ex := range opt.Exports {
for _, ex := range opt.Exports {
var supportFile bool
var supportDir bool
switch ex.Type {
Expand All @@ -171,7 +208,7 @@ func ParseExporterOpts(opt *SolveOpt, s *session.Session) error {
if ex.Output == nil {
return errors.Errorf("output file writer is required for %s exporter", ex.Type)
}
syncTargets = append(syncTargets, filesync.WithFSSync(exID, ex.Output))
syncTargets = append(syncTargets, filesync.WithFSSync(ex.id, ex.Output))
}
if supportDir {
if ex.OutputDir == "" {
Expand All @@ -187,12 +224,12 @@ func ParseExporterOpts(opt *SolveOpt, s *session.Session) error {
return err
}
contentStores["export"] = cs
if opt.s.storesToUpdate == nil {
opt.s.storesToUpdate = make(map[string]ociStore)
if opt.s.exporterOpt.storesToUpdate == nil {
opt.s.exporterOpt.storesToUpdate = make(map[string]ociStore)
}
opt.s.storesToUpdate[strconv.Itoa(exID)] = ociStore{path: ex.OutputDir}
opt.s.exporterOpt.storesToUpdate[ex.id] = ociStore{path: ex.OutputDir}
default:
syncTargets = append(syncTargets, filesync.WithFSSyncDir(exID, ex.OutputDir))
syncTargets = append(syncTargets, filesync.WithFSSyncDir(ex.id, ex.OutputDir))
}
}
}
Expand Down Expand Up @@ -236,13 +273,15 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve
}
}

err := ParseCacheOptions(ctx, &opt)
opt.initExporterIDs()

err := opt.parseCacheOptions(ctx)
if err != nil {
return nil, err
}

if !opt.SessionPreInitialized {
if err := ParseExporterOpts(&opt, s); err != nil {
if err := opt.parseExporterOptions(s); err != nil {
return nil, err
}

Expand Down Expand Up @@ -299,8 +338,7 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve
exports = append(exports, &controlapi.Exporter{
Type: exp.Type,
Attrs: exp.Attrs,
// Keep this in sync with SetupExporters id assignment
ID: strconv.Itoa(i),
ID: exp.id,
})
}

Expand Down Expand Up @@ -330,6 +368,7 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve
for _, resp := range resp.ExporterResponses {
res.ExporterResponses = append(res.ExporterResponses, ExporterResponse{
ID: resp.Metadata.ID,
Type: resp.Metadata.Type,
Data: resp.Data,
})
}
Expand Down Expand Up @@ -389,26 +428,27 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve
if err := eg.Wait(); err != nil {
return nil, err
}
// Update index.json of exported cache content store
// FIXME(AkihiroSuda): dedupe const definition of cache/remotecache.ExporterResponseManifestDesc = "cache.manifest"
if manifestDescJSON := res.ExporterResponse["cache.manifest"]; manifestDescJSON != "" {
var manifestDesc ocispecs.Descriptor
if err = json.Unmarshal([]byte(manifestDescJSON), &manifestDesc); err != nil {

for id, store := range opt.s.cacheOpt.storesToUpdate {
// Update index.json of exported cache content store
manifestDesc, err := getCacheManifestDescriptor(id, res)
if err != nil {
return nil, err
}
for storePath, tag := range opt.s.cacheOpt.storesToUpdate {
idx := ociindex.NewStoreIndex(storePath)
if err := idx.Put(manifestDesc, ociindex.Tag(tag)); err != nil {
return nil, err
}
if manifestDesc == nil {
continue
}
idx := ociindex.NewStoreIndex(store.path)
if err := idx.Put(*manifestDesc, ociindex.Tag(store.tag)); err != nil {
return nil, err
}
}

if len(opt.s.storesToUpdate) == 0 {
if len(opt.s.exporterOpt.storesToUpdate) == 0 {
return res, nil
}
for id, store := range opt.s.storesToUpdate {
manifestDesc, err := getManifestDescriptor(id, res)
for id, store := range opt.s.exporterOpt.storesToUpdate {
manifestDesc, err := getImageManifestDescriptor(id, res)
if err != nil {
return nil, err
}
Expand All @@ -433,25 +473,43 @@ func (c *Client) solve(ctx context.Context, opt SolveOpt, statusChan chan *Solve
return res, nil
}

func getManifestDescriptor(exporterID string, resp *SolveResponse) (*ocispecs.Descriptor, error) {
func getCacheManifestDescriptor(exporterID string, resp *SolveResponse) (*ocispecs.Descriptor, error) {
const exporterResponseManifestDesc = "cache.manifest"
if resp := resp.cacheExporter(exporterID); resp != nil {
// FIXME(AkihiroSuda): dedupe const definition of cache/remotecache.ExporterResponseManifestDesc = "cache.manifest"
if manifestDescDt := resp.Data[exporterResponseManifestDesc]; manifestDescDt != "" {
return unmarshalManifestDescriptor(manifestDescDt)
}
}
if manifestDescDt := resp.ExporterResponse[exporterResponseManifestDesc]; manifestDescDt != "" {
return unmarshalManifestDescriptor(manifestDescDt)
}
return nil, nil
}

func getImageManifestDescriptor(exporterID string, resp *SolveResponse) (*ocispecs.Descriptor, error) {
if resp := resp.exporter(exporterID); resp != nil {
if manifestDescDt := resp.Data[exptypes.ExporterImageDescriptorKey]; manifestDescDt != "" {
return unmarshalManifestDescriptor(manifestDescDt)
return unmarshalEncodedManifestDescriptor(manifestDescDt)
}
}
if manifestDescDt := resp.ExporterResponse[exptypes.ExporterImageDescriptorKey]; manifestDescDt != "" {
return unmarshalManifestDescriptor(manifestDescDt)
return unmarshalEncodedManifestDescriptor(manifestDescDt)
}
return nil, nil
}

func unmarshalManifestDescriptor(manifestDesc string) (*ocispecs.Descriptor, error) {
manifestDescDt, err := base64.StdEncoding.DecodeString(manifestDesc)
func unmarshalEncodedManifestDescriptor(base64Payload string) (*ocispecs.Descriptor, error) {
manifestDescDt, err := base64.StdEncoding.DecodeString(base64Payload)
if err != nil {
return nil, err
}
return unmarshalManifestDescriptor(string(manifestDescDt))
}

func unmarshalManifestDescriptor(manifestDescJSON string) (*ocispecs.Descriptor, error) {
var desc ocispecs.Descriptor
if err = json.Unmarshal([]byte(manifestDescDt), &desc); err != nil {
if err := json.Unmarshal([]byte(manifestDescJSON), &desc); err != nil {
return nil, err
}
return &desc, nil
Expand Down Expand Up @@ -502,13 +560,16 @@ func prepareSyncedFiles(def *llb.Definition, localMounts map[string]fsutil.FS) (
return result, nil
}

func ParseCacheOptions(ctx context.Context, opt *SolveOpt) error {
func (opt *SolveOpt) parseCacheOptions(ctx context.Context) error {
if opt.s.cacheOpt != nil {
return nil
}
var (
cacheExports []*controlapi.CacheOptionsEntry
cacheImports []*controlapi.CacheOptionsEntry
)
contentStores := make(map[string]content.Store)
storesToUpdate := make(map[string]string)
storesToUpdate := make(map[string]ociStore)
frontendAttrs := make(map[string]string)
for _, ex := range opt.CacheExports {
if ex.Type == "local" {
Expand All @@ -529,8 +590,7 @@ func ParseCacheOptions(ctx context.Context, opt *SolveOpt) error {
if t, ok := ex.Attrs["tag"]; ok {
tag = t
}
// TODO(AkihiroSuda): support custom index JSON path and tag
storesToUpdate[csDir] = tag
storesToUpdate[ex.id] = ociStore{path: csDir, tag: tag}
}
if ex.Type == "registry" {
regRef := ex.Attrs["ref"]
Expand All @@ -541,6 +601,7 @@ func ParseCacheOptions(ctx context.Context, opt *SolveOpt) error {
cacheExports = append(cacheExports, &controlapi.CacheOptionsEntry{
Type: ex.Type,
Attrs: ex.Attrs,
ID: ex.id,
})
}
for _, im := range opt.CacheImports {
Expand Down
Loading

0 comments on commit 27b04ac

Please sign in to comment.