diff --git a/cache/blobs/blobs.go b/cache/blobs/blobs.go index c509e7a5cca1..c78f0ed7a7c5 100644 --- a/cache/blobs/blobs.go +++ b/cache/blobs/blobs.go @@ -2,13 +2,12 @@ package blobs import ( "context" - "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" + "github.com/containerd/containerd/leases" "github.com/containerd/containerd/mount" "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/winlayers" digest "github.com/opencontainers/go-digest" @@ -28,11 +27,17 @@ type DiffPair struct { var ErrNoBlobs = errors.Errorf("no blobs for snapshot") -func GetDiffPairs(ctx context.Context, contentStore content.Store, snapshotter snapshot.Snapshotter, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool) ([]DiffPair, error) { +// GetDiffPairs returns the DiffID/Blobsum pairs for a giver reference and saves it. +// Caller must hold a lease when calling this function. +func GetDiffPairs(ctx context.Context, contentStore content.Store, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool) ([]DiffPair, error) { if ref == nil { return nil, nil } + if _, ok := leases.FromContext(ctx); !ok { + return nil, errors.Errorf("missing lease requirement for GetDiffPairs") + } + if err := ref.Finalize(ctx, true); err != nil { return nil, err } @@ -41,22 +46,23 @@ func GetDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s ctx = winlayers.UseWindowsLayerMode(ctx) } - return getDiffPairs(ctx, contentStore, snapshotter, differ, ref, createBlobs) + return getDiffPairs(ctx, contentStore, differ, ref, createBlobs) } -func getDiffPairs(ctx context.Context, contentStore content.Store, snapshotter snapshot.Snapshotter, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool) ([]DiffPair, error) { +func getDiffPairs(ctx context.Context, contentStore content.Store, differ diff.Comparer, ref cache.ImmutableRef, createBlobs bool) ([]DiffPair, error) { if ref == nil { return nil, nil } + baseCtx := ctx eg, ctx := errgroup.WithContext(ctx) var diffPairs []DiffPair - var currentPair DiffPair + var currentDescr ocispec.Descriptor parent := ref.Parent() if parent != nil { defer parent.Release(context.TODO()) eg.Go(func() error { - dp, err := getDiffPairs(ctx, contentStore, snapshotter, differ, parent, createBlobs) + dp, err := getDiffPairs(ctx, contentStore, differ, parent, createBlobs) if err != nil { return err } @@ -66,12 +72,9 @@ func getDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s } eg.Go(func() error { dp, err := g.Do(ctx, ref.ID(), func(ctx context.Context) (interface{}, error) { - diffID, blob, err := snapshotter.GetBlob(ctx, ref.ID()) - if err != nil { - return nil, err - } - if blob != "" { - return DiffPair{DiffID: diffID, Blobsum: blob}, nil + refInfo := ref.Info() + if refInfo.Blob != "" { + return nil, nil } else if !createBlobs { return nil, errors.WithStack(ErrNoBlobs) } @@ -107,9 +110,6 @@ func getDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s descr, err := differ.Compare(ctx, lower, upper, diff.WithMediaType(ocispec.MediaTypeImageLayerGzip), diff.WithReference(ref.ID()), - diff.WithLabels(map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - }), ) if err != nil { return nil, err @@ -118,30 +118,37 @@ func getDiffPairs(ctx context.Context, contentStore content.Store, snapshotter s if err != nil { return nil, err } - diffIDStr, ok := info.Labels[containerdUncompressed] - if !ok { + if diffID, ok := info.Labels[containerdUncompressed]; !ok { return nil, errors.Errorf("invalid differ response with no diffID: %v", descr.Digest) + } else { + if descr.Annotations == nil { + descr.Annotations = map[string]string{} + } + descr.Annotations[containerdUncompressed] = diffID } - diffIDDigest, err := digest.Parse(diffIDStr) - if err != nil { - return nil, err - } - if err := snapshotter.SetBlob(ctx, ref.ID(), diffIDDigest, descr.Digest); err != nil { - return nil, err - } - return DiffPair{DiffID: diffIDDigest, Blobsum: descr.Digest}, nil + return descr, nil + }) if err != nil { return err } - currentPair = dp.(DiffPair) + + if dp != nil { + currentDescr = dp.(ocispec.Descriptor) + } return nil }) err := eg.Wait() if err != nil { return nil, err } - return append(diffPairs, currentPair), nil + if currentDescr.Digest != "" { + if err := ref.SetBlob(baseCtx, currentDescr); err != nil { + return nil, err + } + } + refInfo := ref.Info() + return append(diffPairs, DiffPair{DiffID: refInfo.DiffID, Blobsum: refInfo.Blob}), nil } func isTypeWindows(ref cache.ImmutableRef) bool { diff --git a/cache/contenthash/checksum_test.go b/cache/contenthash/checksum_test.go index 5813da3ad225..1a15e20f8649 100644 --- a/cache/contenthash/checksum_test.go +++ b/cache/contenthash/checksum_test.go @@ -11,16 +11,21 @@ import ( "testing" "time" + "github.com/containerd/containerd/content/local" + ctdmetadata "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/native" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/snapshot" + containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" + "github.com/moby/buildkit/util/leaseutil" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/tonistiigi/fsutil" fstypes "github.com/tonistiigi/fsutil/types" + bolt "go.etcd.io/bbolt" ) const ( @@ -37,7 +42,7 @@ func TestChecksumSymlinkNoParentScan(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -66,7 +71,7 @@ func TestChecksumHardlinks(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -149,7 +154,7 @@ func TestChecksumWildcard(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -204,7 +209,7 @@ func TestSymlinksNoFollow(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -263,7 +268,7 @@ func TestChecksumBasicFile(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -413,7 +418,7 @@ func TestHandleChange(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -491,7 +496,7 @@ func TestHandleRecursiveDir(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -540,7 +545,7 @@ func TestChecksumUnorderedFiles(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -593,7 +598,7 @@ func TestSymlinkInPathScan(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -624,7 +629,7 @@ func TestSymlinkNeedsScan(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -657,7 +662,7 @@ func TestSymlinkAbsDirSuffix(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -684,7 +689,7 @@ func TestSymlinkThroughParent(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -739,7 +744,7 @@ func TestSymlinkInPathHandleChange(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, _ := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -802,7 +807,7 @@ func TestPersistence(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := setupCacheManager(t, tmpdir, "native", snapshotter) + cm, closeBolt := setupCacheManager(t, tmpdir, "native", snapshotter) defer cm.Close() ch := []string{ @@ -838,8 +843,10 @@ func TestPersistence(t *testing.T) { // we can't close snapshotter and open it twice (especially, its internal bbolt store) cm.Close() + closeBolt() getDefaultManager().lru.Purge() - cm = setupCacheManager(t, tmpdir, "native", snapshotter) + cm, closeBolt = setupCacheManager(t, tmpdir, "native", snapshotter) + defer closeBolt() defer cm.Close() ref, err = cm.Get(context.TODO(), id) @@ -872,17 +879,32 @@ func createRef(t *testing.T, cm cache.Manager, files []string) cache.ImmutableRe return ref } -func setupCacheManager(t *testing.T, tmpdir string, snapshotterName string, snapshotter snapshots.Snapshotter) cache.Manager { +func setupCacheManager(t *testing.T, tmpdir string, snapshotterName string, snapshotter snapshots.Snapshotter) (cache.Manager, func()) { md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) require.NoError(t, err) + store, err := local.NewStore(tmpdir) + require.NoError(t, err) + + db, err := bolt.Open(filepath.Join(tmpdir, "containerdmeta.db"), 0644, nil) + require.NoError(t, err) + + mdb := ctdmetadata.NewDB(db, store, map[string]snapshots.Snapshotter{ + snapshotterName: snapshotter, + }) + cm, err := cache.NewManager(cache.ManagerOpt{ - Snapshotter: snapshot.FromContainerdSnapshotter(snapshotterName, snapshotter, nil), - MetadataStore: md, + Snapshotter: snapshot.FromContainerdSnapshotter(snapshotterName, containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter(snapshotterName)), nil), + MetadataStore: md, + LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"), + ContentStore: mdb.ContentStore(), + GarbageCollect: mdb.GarbageCollect, }) require.NoError(t, err) - return cm + return cm, func() { + db.Close() + } } // these test helpers are from tonistiigi/fsutil diff --git a/cache/manager.go b/cache/manager.go index ebf12e310d75..b801da86043d 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -6,13 +6,19 @@ import ( "sync" "time" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/diff" "github.com/containerd/containerd/filters" - "github.com/containerd/containerd/snapshots" + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/leases" "github.com/docker/docker/pkg/idtools" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/snapshot" + "github.com/opencontainers/go-digest" + imagespecidentity "github.com/opencontainers/image-spec/identity" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -25,15 +31,20 @@ var ( ) type ManagerOpt struct { - Snapshotter snapshot.SnapshotterBase + Snapshotter snapshot.Snapshotter MetadataStore *metadata.Store + ContentStore content.Store + LeaseManager leases.Manager PruneRefChecker ExternalRefCheckerFunc + GarbageCollect func(ctx context.Context) (gc.Stats, error) + Applier diff.Applier } type Accessor interface { + GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ImmutableRef, error) Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) - GetFromSnapshotter(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) - New(ctx context.Context, s ImmutableRef, opts ...RefOption) (MutableRef, error) + + New(ctx context.Context, parent ImmutableRef, opts ...RefOption) (MutableRef, error) GetMutable(ctx context.Context, id string) (MutableRef, error) // Rebase? IdentityMapping() *idtools.IdentityMapping Metadata(string) *metadata.StorageItem @@ -53,7 +64,7 @@ type Manager interface { type ExternalRefCheckerFunc func() (ExternalRefChecker, error) type ExternalRefChecker interface { - Exists(key string) bool + Exists(string, []digest.Digest) bool } type cacheManager struct { @@ -81,6 +92,155 @@ func NewManager(opt ManagerOpt) (Manager, error) { return cm, nil } +func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ir ImmutableRef, err error) { + diffID, err := diffIDFromDescriptor(desc) + if err != nil { + return nil, err + } + chainID := diffID + blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID}) + + if _, err := cm.ContentStore.Info(ctx, desc.Digest); err != nil { + return nil, errors.Wrapf(err, "failed to get blob %s", desc.Digest) + } + + var p *immutableRef + var parentID string + if parent != nil { + pInfo := parent.Info() + if pInfo.ChainID == "" || pInfo.BlobChainID == "" { + return nil, errors.Errorf("failed to get ref by blob on non-adressable parent") + } + chainID = imagespecidentity.ChainID([]digest.Digest{pInfo.ChainID, chainID}) + blobChainID = imagespecidentity.ChainID([]digest.Digest{pInfo.BlobChainID, blobChainID}) + p2, err := cm.Get(ctx, parent.ID(), NoUpdateLastUsed) + if err != nil { + return nil, err + } + if err := p2.Finalize(ctx, true); err != nil { + return nil, err + } + parentID = p2.ID() + p = p2.(*immutableRef) + } + + releaseParent := false + defer func() { + if releaseParent || err != nil && p != nil { + p.Release(context.TODO()) + } + }() + + cm.mu.Lock() + defer cm.mu.Unlock() + + sis, err := cm.MetadataStore.Search("blobchainid:" + blobChainID.String()) + if err != nil { + return nil, err + } + + for _, si := range sis { + ref, err := cm.get(ctx, si.ID(), opts...) + if err != nil && errors.Cause(err) != errNotFound { + return nil, errors.Wrapf(err, "failed to get record %s by blobchainid", si.ID()) + } + if p != nil { + releaseParent = true + } + return ref, nil + } + + sis, err = cm.MetadataStore.Search("chainid:" + chainID.String()) + if err != nil { + return nil, err + } + + var link ImmutableRef + for _, si := range sis { + ref, err := cm.get(ctx, si.ID(), opts...) + if err != nil && errors.Cause(err) != errNotFound { + return nil, errors.Wrapf(err, "failed to get record %s by chainid", si.ID()) + } + link = ref + break + } + + id := identity.NewID() + snapshotID := chainID.String() + blobOnly := true + if link != nil { + snapshotID = getSnapshotID(link.Metadata()) + blobOnly = getBlobOnly(link.Metadata()) + go link.Release(context.TODO()) + } + + l, err := cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error { + l.ID = id + l.Labels = map[string]string{ + "containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano), + } + return nil + }) + if err != nil { + return nil, errors.Wrap(err, "failed to create lease") + } + + defer func() { + if err != nil { + if err := cm.ManagerOpt.LeaseManager.Delete(context.TODO(), leases.Lease{ + ID: l.ID, + }); err != nil { + logrus.Errorf("failed to remove lease: %+v", err) + } + } + }() + + if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{ + ID: snapshotID, + Type: "snapshots/" + cm.ManagerOpt.Snapshotter.Name(), + }); err != nil { + return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id) + } + + if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: id}, leases.Resource{ + ID: desc.Digest.String(), + Type: "content", + }); err != nil { + return nil, errors.Wrapf(err, "failed to add blob %s to lease", id) + } + + md, _ := cm.md.Get(id) + + rec := &cacheRecord{ + mu: &sync.Mutex{}, + cm: cm, + refs: make(map[ref]struct{}), + parent: p, + md: md, + } + + if err := initializeMetadata(rec, parentID, opts...); err != nil { + return nil, err + } + + queueDiffID(rec.md, diffID.String()) + queueBlob(rec.md, desc.Digest.String()) + queueChainID(rec.md, chainID.String()) + queueBlobChainID(rec.md, blobChainID.String()) + queueSnapshotID(rec.md, snapshotID) + queueBlobOnly(rec.md, blobOnly) + queueMediaType(rec.md, desc.MediaType) + queueCommitted(rec.md) + + if err := rec.md.Commit(); err != nil { + return nil, err + } + + cm.records[id] = rec + + return rec.ref(true), nil +} + // init loads all snapshots from metadata state and tries to load the records // from the snapshotter. If snaphot can't be found, metadata is deleted as well. func (cm *cacheManager) init(ctx context.Context) error { @@ -90,10 +250,10 @@ func (cm *cacheManager) init(ctx context.Context) error { } for _, si := range items { - if _, err := cm.getRecord(ctx, si.ID(), false); err != nil { - logrus.Debugf("could not load snapshot %s: %v", si.ID(), err) + if _, err := cm.getRecord(ctx, si.ID()); err != nil { + logrus.Debugf("could not load snapshot %s: %+v", si.ID(), err) cm.md.Clear(si.ID()) - // TODO: make sure content is deleted as well + cm.LeaseManager.Delete(ctx, leases.Lease{ID: si.ID()}) } } return nil @@ -115,14 +275,7 @@ func (cm *cacheManager) Close() error { func (cm *cacheManager) Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) { cm.mu.Lock() defer cm.mu.Unlock() - return cm.get(ctx, id, false, opts...) -} - -// Get returns an immutable snapshot reference for ID -func (cm *cacheManager) GetFromSnapshotter(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) { - cm.mu.Lock() - defer cm.mu.Unlock() - return cm.get(ctx, id, true, opts...) + return cm.get(ctx, id, opts...) } func (cm *cacheManager) Metadata(id string) *metadata.StorageItem { @@ -136,8 +289,8 @@ func (cm *cacheManager) Metadata(id string) *metadata.StorageItem { } // get requires manager lock to be taken -func (cm *cacheManager) get(ctx context.Context, id string, fromSnapshotter bool, opts ...RefOption) (*immutableRef, error) { - rec, err := cm.getRecord(ctx, id, fromSnapshotter, opts...) +func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (*immutableRef, error) { + rec, err := cm.getRecord(ctx, id, opts...) if err != nil { return nil, err } @@ -165,7 +318,7 @@ func (cm *cacheManager) get(ctx context.Context, id string, fromSnapshotter bool } // getRecord returns record for id. Requires manager lock. -func (cm *cacheManager) getRecord(ctx context.Context, id string, fromSnapshotter bool, opts ...RefOption) (cr *cacheRecord, retErr error) { +func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOption) (cr *cacheRecord, retErr error) { if rec, ok := cm.records[id]; ok { if rec.isDead() { return nil, errors.Wrapf(errNotFound, "failed to get dead record %s", id) @@ -174,11 +327,11 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, fromSnapshotte } md, ok := cm.md.Get(id) - if !ok && !fromSnapshotter { - return nil, errors.WithStack(errNotFound) + if !ok { + return nil, errors.Wrapf(errNotFound, "%s not found", id) } if mutableID := getEqualMutable(md); mutableID != "" { - mutable, err := cm.getRecord(ctx, mutableID, fromSnapshotter) + mutable, err := cm.getRecord(ctx, mutableID) if err != nil { // check loading mutable deleted record from disk if errors.Cause(err) == errNotFound { @@ -199,14 +352,10 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, fromSnapshotte return rec, nil } - info, err := cm.Snapshotter.Stat(ctx, id) - if err != nil { - return nil, errors.Wrap(errNotFound, err.Error()) - } - var parent *immutableRef - if info.Parent != "" { - parent, err = cm.get(ctx, info.Parent, fromSnapshotter, append(opts, NoUpdateLastUsed)...) + if parentID := getParent(md); parentID != "" { + var err error + parent, err = cm.get(ctx, parentID, append(opts, NoUpdateLastUsed)...) if err != nil { return nil, err } @@ -221,7 +370,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, fromSnapshotte rec := &cacheRecord{ mu: &sync.Mutex{}, - mutable: info.Kind != snapshots.KindCommitted, + mutable: !getCommitted(md), cm: cm, refs: make(map[ref]struct{}), parent: parent, @@ -236,7 +385,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, fromSnapshotte return nil, errors.Wrapf(errNotFound, "failed to get deleted record %s", id) } - if err := initializeMetadata(rec, opts...); err != nil { + if err := initializeMetadata(rec, getParent(md), opts...); err != nil { return nil, err } @@ -244,11 +393,12 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, fromSnapshotte return rec, nil } -func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOption) (MutableRef, error) { +func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOption) (mr MutableRef, err error) { id := identity.NewID() var parent *immutableRef var parentID string + var parentSnapshotID string if s != nil { p, err := cm.Get(ctx, s.ID(), NoUpdateLastUsed) if err != nil { @@ -257,14 +407,46 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti if err := p.Finalize(ctx, true); err != nil { return nil, err } - parentID = p.ID() parent = p.(*immutableRef) + parentSnapshotID = getSnapshotID(parent.md) + parentID = parent.ID() } - if err := cm.Snapshotter.Prepare(ctx, id, parentID); err != nil { - if parent != nil { + defer func() { + if err != nil && parent != nil { parent.Release(context.TODO()) } + }() + + l, err := cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error { + l.ID = id + l.Labels = map[string]string{ + "containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano), + } + return nil + }) + if err != nil { + return nil, errors.Wrap(err, "failed to create lease") + } + + defer func() { + if err != nil { + if err := cm.ManagerOpt.LeaseManager.Delete(context.TODO(), leases.Lease{ + ID: l.ID, + }); err != nil { + logrus.Errorf("failed to remove lease: %+v", err) + } + } + }() + + if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{ + ID: id, + Type: "snapshots/" + cm.ManagerOpt.Snapshotter.Name(), + }); err != nil { + return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id) + } + + if err := cm.Snapshotter.Prepare(ctx, id, parentSnapshotID); err != nil { return nil, errors.Wrapf(err, "failed to prepare %s", id) } @@ -279,10 +461,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, opts ...RefOpti md: md, } - if err := initializeMetadata(rec, opts...); err != nil { - if parent != nil { - parent.Release(context.TODO()) - } + if err := initializeMetadata(rec, parentID, opts...); err != nil { return nil, err } @@ -297,7 +476,7 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, cm.mu.Lock() defer cm.mu.Unlock() - rec, err := cm.getRecord(ctx, id, false) + rec, err := cm.getRecord(ctx, id) if err != nil { return nil, err } @@ -328,13 +507,22 @@ func (cm *cacheManager) GetMutable(ctx context.Context, id string) (MutableRef, func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error { cm.muPrune.Lock() - defer cm.muPrune.Unlock() for _, opt := range opts { if err := cm.pruneOnce(ctx, ch, opt); err != nil { + cm.muPrune.Unlock() return err } } + + cm.muPrune.Unlock() + + if cm.GarbageCollect != nil { + if _, err := cm.GarbageCollect(ctx); err != nil { + return err + } + } + return nil } @@ -360,10 +548,8 @@ func (cm *cacheManager) pruneOnce(ctx context.Context, ch chan client.UsageInfo, return err } for _, ui := range du { - if check != nil { - if check.Exists(ui.ID) { - continue - } + if ui.Shared { + continue } totalSize += ui.Size } @@ -418,7 +604,7 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt shared := false if opt.checkShared != nil { - shared = opt.checkShared.Exists(cr.ID()) + shared = opt.checkShared.Exists(cr.ID(), cr.parentChain()) } if !opt.all { @@ -577,7 +763,7 @@ func (cm *cacheManager) markShared(m map[string]*cacheUsageInfo) error { if m[id].shared { continue } - if b := c.Exists(id); b { + if b := c.Exists(id, m[id].parentChain); b { markAllParentsShared(id) } } @@ -596,6 +782,7 @@ type cacheUsageInfo struct { doubleRef bool recordType client.UsageRecordType shared bool + parentChain []digest.Digest } func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) { @@ -628,6 +815,7 @@ func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) description: GetDescription(cr.md), doubleRef: cr.equalImmutable != nil, recordType: GetRecordType(cr), + parentChain: cr.parentChain(), } if c.recordType == "" { c.recordType = client.UsageRecordTypeRegular @@ -769,12 +957,16 @@ func WithCreationTime(tm time.Time) RefOption { } } -func initializeMetadata(m withMetadata, opts ...RefOption) error { +func initializeMetadata(m withMetadata, parent string, opts ...RefOption) error { md := m.Metadata() if tm := GetCreatedAt(md); !tm.IsZero() { return nil } + if err := queueParent(md, parent); err != nil { + return err + } + if err := queueCreatedAt(md, time.Now()); err != nil { return err } @@ -882,3 +1074,15 @@ func sortDeleteRecords(toDelete []*deleteRecord) { float64(toDelete[j].usageCountIndex)/float64(maxUsageCountIndex) }) } + +func diffIDFromDescriptor(desc ocispec.Descriptor) (digest.Digest, error) { + diffIDStr, ok := desc.Annotations["containerd.io/uncompressed"] + if !ok { + return "", errors.Errorf("missing uncompressed annotation for %s", desc.Digest) + } + diffID, err := digest.Parse(diffIDStr) + if err != nil { + return "", errors.Wrapf(err, "failed to parse diffID %q for %s", diffIDStr, desc.Digest) + } + return diffID, nil +} diff --git a/cache/manager_test.go b/cache/manager_test.go index 6b47cc404225..00fa51aa0599 100644 --- a/cache/manager_test.go +++ b/cache/manager_test.go @@ -1,25 +1,143 @@ package cache import ( + "archive/tar" + "bytes" + "compress/gzip" "context" - "fmt" + "io" "io/ioutil" "os" "path/filepath" "testing" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/diff/apply" + "github.com/containerd/containerd/leases" + ctdmetadata "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/native" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/snapshot" + containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" + "github.com/moby/buildkit/util/leaseutil" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/stretchr/testify/require" + bolt "go.etcd.io/bbolt" ) +type cmOpt struct { + snapshotterName string + snapshotter snapshots.Snapshotter + tmpdir string +} + +type cmOut struct { + manager Manager + lm leases.Manager + cs content.Store +} + +func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() error, err error) { + ns, ok := namespaces.Namespace(ctx) + if !ok { + return nil, nil, errors.Errorf("namespace required for test") + } + + if opt.snapshotterName == "" { + opt.snapshotterName = "native" + } + + tmpdir, err := ioutil.TempDir("", "cachemanager") + if err != nil { + return nil, nil, err + } + + defers := make([]func() error, 0) + cleanup = func() error { + var err error + for i := range defers { + if err1 := defers[len(defers)-1-i](); err1 != nil && err == nil { + err = err1 + } + } + return err + } + defer func() { + if err != nil { + cleanup() + } + }() + if opt.tmpdir == "" { + defers = append(defers, func() error { + return os.RemoveAll(tmpdir) + }) + } else { + os.RemoveAll(tmpdir) + tmpdir = opt.tmpdir + } + + if opt.snapshotter == nil { + snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) + if err != nil { + return nil, nil, err + } + opt.snapshotter = snapshotter + } + + md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) + if err != nil { + return nil, nil, err + } + + store, err := local.NewStore(tmpdir) + if err != nil { + return nil, nil, err + } + + db, err := bolt.Open(filepath.Join(tmpdir, "containerdmeta.db"), 0644, nil) + if err != nil { + return nil, nil, err + } + defers = append(defers, func() error { + return db.Close() + }) + + mdb := ctdmetadata.NewDB(db, store, map[string]snapshots.Snapshotter{ + opt.snapshotterName: opt.snapshotter, + }) + if err := mdb.Init(context.TODO()); err != nil { + return nil, nil, err + } + + lm := ctdmetadata.NewLeaseManager(mdb) + + cm, err := NewManager(ManagerOpt{ + Snapshotter: snapshot.FromContainerdSnapshotter(opt.snapshotterName, containerdsnapshot.NSSnapshotter(ns, mdb.Snapshotter(opt.snapshotterName)), nil), + MetadataStore: md, + ContentStore: mdb.ContentStore(), + LeaseManager: leaseutil.WithNamespace(lm, ns), + GarbageCollect: mdb.GarbageCollect, + Applier: apply.NewFileSystemApplier(mdb.ContentStore()), + }) + if err != nil { + return nil, nil, err + } + return &cmOut{ + manager: cm, + lm: lm, + cs: mdb.ContentStore(), + }, cleanup, nil +} + func TestManager(t *testing.T) { t.Parallel() + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") tmpdir, err := ioutil.TempDir("", "cachemanager") @@ -28,7 +146,15 @@ func TestManager(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := getCacheManager(t, tmpdir, "native", snapshotter) + + co, cleanup, err := newCacheManager(ctx, cmOpt{ + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + + defer cleanup() + cm := co.manager _, err = cm.Get(ctx, "foobar") require.Error(t, err) @@ -142,6 +268,409 @@ func TestManager(t *testing.T) { require.Equal(t, 0, len(dirs)) } +func TestSnapshotExtract(t *testing.T) { + t.Parallel() + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") + + tmpdir, err := ioutil.TempDir("", "cachemanager") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) + require.NoError(t, err) + + co, cleanup, err := newCacheManager(ctx, cmOpt{ + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + + defer cleanup() + + cm := co.manager + + b, desc, err := mapToBlob(map[string]string{"foo": "bar"}) + require.NoError(t, err) + + err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), desc) + require.NoError(t, err) + + snap, err := cm.GetByBlob(ctx, desc, nil) + require.NoError(t, err) + + require.Equal(t, false, snap.Info().Extracted) + + b2, desc2, err := mapToBlob(map[string]string{"foo": "bar123"}) + require.NoError(t, err) + + err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b2), desc2) + require.NoError(t, err) + + snap2, err := cm.GetByBlob(ctx, desc2, snap) + require.NoError(t, err) + + size, err := snap2.Size(ctx) + require.NoError(t, err) + require.Equal(t, int64(len(b2)), size) + + require.Equal(t, false, snap2.Info().Extracted) + + dirs, err := ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 0, len(dirs)) + + checkNumBlobs(ctx, t, co.cs, 2) + + err = snap2.Extract(ctx) + require.NoError(t, err) + + require.Equal(t, true, snap.Info().Extracted) + require.Equal(t, true, snap2.Info().Extracted) + + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 2, len(dirs)) + + buf := pruneResultBuffer() + err = cm.Prune(ctx, buf.C, client.PruneInfo{}) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 2, 0) + + require.Equal(t, len(buf.all), 0) + + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 2, len(dirs)) + + checkNumBlobs(ctx, t, co.cs, 2) + + id := snap.ID() + + err = snap.Release(context.TODO()) + require.NoError(t, err) + + buf = pruneResultBuffer() + err = cm.Prune(ctx, buf.C, client.PruneInfo{}) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 2, 0) + + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 2, len(dirs)) + + snap, err = cm.Get(ctx, id) + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 2, 0) + + err = snap2.Release(context.TODO()) + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 1, 1) + + buf = pruneResultBuffer() + err = cm.Prune(ctx, buf.C, client.PruneInfo{}) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 1, 0) + + require.Equal(t, len(buf.all), 1) + + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 1, len(dirs)) + + checkNumBlobs(ctx, t, co.cs, 1) + + err = snap.Release(context.TODO()) + require.NoError(t, err) + + buf = pruneResultBuffer() + err = cm.Prune(ctx, buf.C, client.PruneInfo{}) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 0, 0) + + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 0, len(dirs)) + + checkNumBlobs(ctx, t, co.cs, 0) +} + +func TestExtractOnMutable(t *testing.T) { + t.Parallel() + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") + + tmpdir, err := ioutil.TempDir("", "cachemanager") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) + require.NoError(t, err) + + co, cleanup, err := newCacheManager(ctx, cmOpt{ + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + + defer cleanup() + + cm := co.manager + + active, err := cm.New(ctx, nil) + require.NoError(t, err) + + snap, err := active.Commit(ctx) + require.NoError(t, err) + + b, desc, err := mapToBlob(map[string]string{"foo": "bar"}) + require.NoError(t, err) + + err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), desc) + require.NoError(t, err) + + b2, desc2, err := mapToBlob(map[string]string{"foo2": "1"}) + require.NoError(t, err) + + err = content.WriteBlob(ctx, co.cs, "ref2", bytes.NewBuffer(b2), desc2) + require.NoError(t, err) + + snap2, err := cm.GetByBlob(ctx, desc2, snap) + require.Error(t, err) + + err = snap.SetBlob(ctx, desc) + require.NoError(t, err) + + snap2, err = cm.GetByBlob(ctx, desc2, snap) + require.NoError(t, err) + + err = snap.Release(context.TODO()) + require.NoError(t, err) + + require.Equal(t, false, snap2.Info().Extracted) + + size, err := snap2.Size(ctx) + require.NoError(t, err) + require.Equal(t, int64(len(b2)), size) + + dirs, err := ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 1, len(dirs)) + + checkNumBlobs(ctx, t, co.cs, 2) + + err = snap2.Extract(ctx) + require.NoError(t, err) + + require.Equal(t, true, snap.Info().Extracted) + require.Equal(t, true, snap2.Info().Extracted) + + buf := pruneResultBuffer() + err = cm.Prune(ctx, buf.C, client.PruneInfo{}) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 2, 0) + + require.Equal(t, len(buf.all), 0) + + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 2, len(dirs)) + + err = snap2.Release(context.TODO()) + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 0, 2) + + buf = pruneResultBuffer() + err = cm.Prune(ctx, buf.C, client.PruneInfo{}) + buf.close() + require.NoError(t, err) + + checkDiskUsage(ctx, t, cm, 0, 0) + + require.Equal(t, len(buf.all), 2) + + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 0, len(dirs)) + + checkNumBlobs(ctx, t, co.cs, 0) +} + +func TestSetBlob(t *testing.T) { + t.Parallel() + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") + + tmpdir, err := ioutil.TempDir("", "cachemanager") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) + require.NoError(t, err) + + co, cleanup, err := newCacheManager(ctx, cmOpt{ + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + + defer cleanup() + + cm := co.manager + + active, err := cm.New(ctx, nil) + require.NoError(t, err) + + snap, err := active.Commit(ctx) + require.NoError(t, err) + + info := snap.Info() + require.Equal(t, "", string(info.DiffID)) + require.Equal(t, "", string(info.Blob)) + require.Equal(t, "", string(info.ChainID)) + require.Equal(t, "", string(info.BlobChainID)) + require.Equal(t, info.Extracted, true) + + ctx, clean, err := leaseutil.WithLease(ctx, co.lm) + require.NoError(t, err) + defer clean(context.TODO()) + + b, desc, err := mapToBlob(map[string]string{"foo": "bar"}) + require.NoError(t, err) + + err = content.WriteBlob(ctx, co.cs, "ref1", bytes.NewBuffer(b), desc) + require.NoError(t, err) + + err = snap.SetBlob(ctx, ocispec.Descriptor{ + Digest: digest.FromBytes([]byte("foobar")), + Annotations: map[string]string{ + "containerd.io/uncompressed": digest.FromBytes([]byte("foobar2")).String(), + }, + }) + require.Error(t, err) + + err = snap.SetBlob(ctx, desc) + require.NoError(t, err) + + info = snap.Info() + require.Equal(t, desc.Annotations["containerd.io/uncompressed"], string(info.DiffID)) + require.Equal(t, desc.Digest, info.Blob) + require.Equal(t, desc.MediaType, info.MediaType) + require.Equal(t, info.DiffID, info.ChainID) + require.Equal(t, digest.FromBytes([]byte(desc.Digest+" "+info.DiffID)), info.BlobChainID) + require.Equal(t, snap.ID(), info.SnapshotID) + require.Equal(t, info.Extracted, true) + + active, err = cm.New(ctx, snap) + require.NoError(t, err) + + snap2, err := active.Commit(ctx) + require.NoError(t, err) + + b2, desc2, err := mapToBlob(map[string]string{"foo2": "bar2"}) + require.NoError(t, err) + + err = content.WriteBlob(ctx, co.cs, "ref2", bytes.NewBuffer(b2), desc2) + require.NoError(t, err) + + err = snap2.SetBlob(ctx, desc2) + require.NoError(t, err) + + info2 := snap2.Info() + require.Equal(t, desc2.Annotations["containerd.io/uncompressed"], string(info2.DiffID)) + require.Equal(t, desc2.Digest, info2.Blob) + require.Equal(t, desc2.MediaType, info2.MediaType) + require.Equal(t, digest.FromBytes([]byte(info.ChainID+" "+info2.DiffID)), info2.ChainID) + require.Equal(t, digest.FromBytes([]byte(info.BlobChainID+" "+digest.FromBytes([]byte(desc2.Digest+" "+info2.DiffID)))), info2.BlobChainID) + require.Equal(t, snap2.ID(), info2.SnapshotID) + require.Equal(t, info2.Extracted, true) + + b3, desc3, err := mapToBlob(map[string]string{"foo3": "bar3"}) + require.NoError(t, err) + + err = content.WriteBlob(ctx, co.cs, "ref3", bytes.NewBuffer(b3), desc3) + require.NoError(t, err) + + snap3, err := cm.GetByBlob(ctx, desc3, snap) + require.NoError(t, err) + + info3 := snap3.Info() + require.Equal(t, desc3.Annotations["containerd.io/uncompressed"], string(info3.DiffID)) + require.Equal(t, desc3.Digest, info3.Blob) + require.Equal(t, desc3.MediaType, info3.MediaType) + require.Equal(t, digest.FromBytes([]byte(info.ChainID+" "+info3.DiffID)), info3.ChainID) + require.Equal(t, digest.FromBytes([]byte(info.BlobChainID+" "+digest.FromBytes([]byte(desc3.Digest+" "+info3.DiffID)))), info3.BlobChainID) + require.Equal(t, string(info3.ChainID), info3.SnapshotID) + require.Equal(t, info3.Extracted, false) + + // snap4 is same as snap2 + snap4, err := cm.GetByBlob(ctx, desc2, snap) + require.NoError(t, err) + + require.Equal(t, snap2.ID(), snap4.ID()) + + // snap5 is same different blob but same diffID as snap2 + b5, desc5, err := mapToBlob(map[string]string{"foo5": "bar5"}) + require.NoError(t, err) + + desc5.Annotations["containerd.io/uncompressed"] = info2.DiffID.String() + + err = content.WriteBlob(ctx, co.cs, "ref5", bytes.NewBuffer(b5), desc5) + require.NoError(t, err) + + snap5, err := cm.GetByBlob(ctx, desc5, snap) + require.NoError(t, err) + + require.NotEqual(t, snap2.ID(), snap5.ID()) + require.Equal(t, snap2.Info().SnapshotID, snap5.Info().SnapshotID) + require.Equal(t, info2.DiffID, snap5.Info().DiffID) + require.Equal(t, desc5.Digest, snap5.Info().Blob) + + require.Equal(t, snap2.Info().ChainID, snap5.Info().ChainID) + require.NotEqual(t, snap2.Info().BlobChainID, snap5.Info().BlobChainID) + require.Equal(t, digest.FromBytes([]byte(info.BlobChainID+" "+digest.FromBytes([]byte(desc5.Digest+" "+info2.DiffID)))), snap5.Info().BlobChainID) + + // snap6 is a child of snap3 + b6, desc6, err := mapToBlob(map[string]string{"foo6": "bar6"}) + require.NoError(t, err) + + err = content.WriteBlob(ctx, co.cs, "ref6", bytes.NewBuffer(b6), desc6) + require.NoError(t, err) + + snap6, err := cm.GetByBlob(ctx, desc6, snap3) + require.NoError(t, err) + + info6 := snap6.Info() + require.Equal(t, desc6.Annotations["containerd.io/uncompressed"], string(info6.DiffID)) + require.Equal(t, desc6.Digest, info6.Blob) + require.Equal(t, digest.FromBytes([]byte(snap3.Info().ChainID+" "+info6.DiffID)), info6.ChainID) + require.Equal(t, digest.FromBytes([]byte(info3.BlobChainID+" "+digest.FromBytes([]byte(info6.Blob+" "+info6.DiffID)))), info6.BlobChainID) + require.Equal(t, string(info6.ChainID), info6.SnapshotID) + require.Equal(t, info6.Extracted, false) + + _, err = cm.GetByBlob(ctx, ocispec.Descriptor{ + Digest: digest.FromBytes([]byte("notexist")), + Annotations: map[string]string{ + "containerd.io/uncompressed": digest.FromBytes([]byte("notexist")).String(), + }, + }, snap3) + require.Error(t, err) + + clean(context.TODO()) + + //snap.SetBlob() +} + func TestPrune(t *testing.T) { t.Parallel() ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") @@ -152,7 +681,15 @@ func TestPrune(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := getCacheManager(t, tmpdir, "native", snapshotter) + + co, cleanup, err := newCacheManager(ctx, cmOpt{ + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + + defer cleanup() + cm := co.manager active, err := cm.New(ctx, nil) require.NoError(t, err) @@ -168,6 +705,10 @@ func TestPrune(t *testing.T) { checkDiskUsage(ctx, t, cm, 2, 0) + dirs, err := ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + require.NoError(t, err) + require.Equal(t, 2, len(dirs)) + // prune with keeping refs does nothing buf := pruneResultBuffer() err = cm.Prune(ctx, buf.C, client.PruneInfo{}) @@ -177,7 +718,7 @@ func TestPrune(t *testing.T) { checkDiskUsage(ctx, t, cm, 2, 0) require.Equal(t, len(buf.all), 0) - dirs, err := ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) + dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) require.NoError(t, err) require.Equal(t, 2, len(dirs)) @@ -242,6 +783,7 @@ func TestPrune(t *testing.T) { func TestLazyCommit(t *testing.T) { t.Parallel() + ctx := namespaces.WithNamespace(context.Background(), "buildkit-test") tmpdir, err := ioutil.TempDir("", "cachemanager") @@ -250,7 +792,14 @@ func TestLazyCommit(t *testing.T) { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) require.NoError(t, err) - cm := getCacheManager(t, tmpdir, "native", snapshotter) + + co, cleanup, err := newCacheManager(ctx, cmOpt{ + tmpdir: tmpdir, + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + cm := co.manager active, err := cm.New(ctx, nil, CachePolicyRetain) require.NoError(t, err) @@ -331,8 +880,16 @@ func TestLazyCommit(t *testing.T) { err = cm.Close() require.NoError(t, err) + cleanup() + // we can't close snapshotter and open it twice (especially, its internal bbolt store) - cm = getCacheManager(t, tmpdir, "native", snapshotter) + co, cleanup, err = newCacheManager(ctx, cmOpt{ + tmpdir: tmpdir, + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + cm = co.manager snap2, err = cm.Get(ctx, snap.ID()) require.NoError(t, err) @@ -353,7 +910,16 @@ func TestLazyCommit(t *testing.T) { err = cm.Close() require.NoError(t, err) - cm = getCacheManager(t, tmpdir, "native", snapshotter) + cleanup() + + co, cleanup, err = newCacheManager(ctx, cmOpt{ + tmpdir: tmpdir, + snapshotter: snapshotter, + snapshotterName: "native", + }) + require.NoError(t, err) + defer cleanup() + cm = co.manager snap2, err = cm.Get(ctx, snap.ID()) require.NoError(t, err) @@ -369,18 +935,6 @@ func TestLazyCommit(t *testing.T) { require.Equal(t, errNotFound, errors.Cause(err)) } -func getCacheManager(t *testing.T, tmpdir string, snapshotterName string, snapshotter snapshots.Snapshotter) Manager { - md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) - require.NoError(t, err) - - cm, err := NewManager(ManagerOpt{ - Snapshotter: snapshot.FromContainerdSnapshotter(snapshotterName, snapshotter, nil), - MetadataStore: md, - }) - require.NoError(t, err, fmt.Sprintf("error: %+v", err)) - return cm -} - func checkDiskUsage(ctx context.Context, t *testing.T, cm Manager, inuse, unused int) { du, err := cm.DiskUsage(ctx, client.DiskUsageInfo{}) require.NoError(t, err) @@ -396,6 +950,16 @@ func checkDiskUsage(ctx context.Context, t *testing.T, cm Manager, inuse, unused require.Equal(t, unused, unusedActual) } +func checkNumBlobs(ctx context.Context, t *testing.T, cs content.Store, expected int) { + c := 0 + err := cs.Walk(ctx, func(_ content.Info) error { + c++ + return nil + }) + require.NoError(t, err) + require.Equal(t, expected, c) +} + func pruneResultBuffer() *buf { b := &buf{C: make(chan client.UsageInfo), closed: make(chan struct{})} go func() { @@ -417,3 +981,36 @@ func (b *buf) close() { close(b.C) <-b.closed } + +func mapToBlob(m map[string]string) ([]byte, ocispec.Descriptor, error) { + buf := bytes.NewBuffer(nil) + gz := gzip.NewWriter(buf) + sha := digest.SHA256.Digester() + tw := tar.NewWriter(io.MultiWriter(sha.Hash(), gz)) + + for k, v := range m { + if err := tw.WriteHeader(&tar.Header{ + Name: k, + Size: int64(len(v)), + }); err != nil { + return nil, ocispec.Descriptor{}, err + } + if _, err := tw.Write([]byte(v)); err != nil { + return nil, ocispec.Descriptor{}, err + } + } + if err := tw.Close(); err != nil { + return nil, ocispec.Descriptor{}, err + } + if err := gz.Close(); err != nil { + return nil, ocispec.Descriptor{}, err + } + return buf.Bytes(), ocispec.Descriptor{ + Digest: digest.FromBytes(buf.Bytes()), + MediaType: ocispec.MediaTypeImageLayerGzip, + Size: int64(buf.Len()), + Annotations: map[string]string{ + "containerd.io/uncompressed": sha.Digest().String(), + }, + }, nil +} diff --git a/cache/metadata.go b/cache/metadata.go index 9929868844c9..bf4041e098a3 100644 --- a/cache/metadata.go +++ b/cache/metadata.go @@ -19,13 +19,203 @@ const keyLastUsedAt = "cache.lastUsedAt" const keyUsageCount = "cache.usageCount" const keyLayerType = "cache.layerType" const keyRecordType = "cache.recordType" +const keyCommitted = "snapshot.committed" +const keyParent = "cache.parent" +const keyDiffID = "cache.diffID" +const keyChainID = "cache.chainID" +const keyBlobChainID = "cache.blobChainID" +const keyBlob = "cache.blob" +const keySnapshot = "cache.snapshot" +const keyBlobOnly = "cache.blobonly" +const keyMediaType = "cache.mediatype" const keyDeleted = "cache.deleted" +func queueDiffID(si *metadata.StorageItem, str string) error { + if str == "" { + return nil + } + v, err := metadata.NewValue(str) + if err != nil { + return errors.Wrap(err, "failed to create diffID value") + } + si.Update(func(b *bolt.Bucket) error { + return si.SetValue(b, keyDiffID, v) + }) + return nil +} + +func getMediaType(si *metadata.StorageItem) string { + v := si.Get(keyMediaType) + if v == nil { + return si.ID() + } + var str string + if err := v.Unmarshal(&str); err != nil { + return "" + } + return str +} + +func queueMediaType(si *metadata.StorageItem, str string) error { + if str == "" { + return nil + } + v, err := metadata.NewValue(str) + if err != nil { + return errors.Wrap(err, "failed to create mediaType value") + } + si.Queue(func(b *bolt.Bucket) error { + return si.SetValue(b, keyMediaType, v) + }) + return nil +} + +func getSnapshotID(si *metadata.StorageItem) string { + v := si.Get(keySnapshot) + if v == nil { + return si.ID() + } + var str string + if err := v.Unmarshal(&str); err != nil { + return "" + } + return str +} + +func queueSnapshotID(si *metadata.StorageItem, str string) error { + if str == "" { + return nil + } + v, err := metadata.NewValue(str) + if err != nil { + return errors.Wrap(err, "failed to create chainID value") + } + si.Queue(func(b *bolt.Bucket) error { + return si.SetValue(b, keySnapshot, v) + }) + return nil +} + +func getDiffID(si *metadata.StorageItem) string { + v := si.Get(keyDiffID) + if v == nil { + return "" + } + var str string + if err := v.Unmarshal(&str); err != nil { + return "" + } + return str +} + +func queueChainID(si *metadata.StorageItem, str string) error { + if str == "" { + return nil + } + v, err := metadata.NewValue(str) + if err != nil { + return errors.Wrap(err, "failed to create chainID value") + } + v.Index = "chainid:" + str + si.Update(func(b *bolt.Bucket) error { + return si.SetValue(b, keyChainID, v) + }) + return nil +} + +func getBlobChainID(si *metadata.StorageItem) string { + v := si.Get(keyBlobChainID) + if v == nil { + return "" + } + var str string + if err := v.Unmarshal(&str); err != nil { + return "" + } + return str +} + +func queueBlobChainID(si *metadata.StorageItem, str string) error { + if str == "" { + return nil + } + v, err := metadata.NewValue(str) + if err != nil { + return errors.Wrap(err, "failed to create chainID value") + } + v.Index = "blobchainid:" + str + si.Update(func(b *bolt.Bucket) error { + return si.SetValue(b, keyBlobChainID, v) + }) + return nil +} + +func getChainID(si *metadata.StorageItem) string { + v := si.Get(keyChainID) + if v == nil { + return "" + } + var str string + if err := v.Unmarshal(&str); err != nil { + return "" + } + return str +} + +func queueBlob(si *metadata.StorageItem, str string) error { + if str == "" { + return nil + } + v, err := metadata.NewValue(str) + if err != nil { + return errors.Wrap(err, "failed to create blob value") + } + si.Update(func(b *bolt.Bucket) error { + return si.SetValue(b, keyBlob, v) + }) + return nil +} + +func getBlob(si *metadata.StorageItem) string { + v := si.Get(keyBlob) + if v == nil { + return "" + } + var str string + if err := v.Unmarshal(&str); err != nil { + return "" + } + return str +} + +func queueBlobOnly(si *metadata.StorageItem, b bool) error { + v, err := metadata.NewValue(b) + if err != nil { + return errors.Wrap(err, "failed to create blobonly value") + } + si.Queue(func(b *bolt.Bucket) error { + return si.SetValue(b, keyBlobOnly, v) + }) + return nil +} + +func getBlobOnly(si *metadata.StorageItem) bool { + v := si.Get(keyBlobOnly) + if v == nil { + return false + } + var blobOnly bool + if err := v.Unmarshal(&blobOnly); err != nil { + return false + } + return blobOnly +} + func setDeleted(si *metadata.StorageItem) error { v, err := metadata.NewValue(true) if err != nil { - return errors.Wrap(err, "failed to create size value") + return errors.Wrap(err, "failed to create deleted value") } si.Update(func(b *bolt.Bucket) error { return si.SetValue(b, keyDeleted, v) @@ -45,6 +235,55 @@ func getDeleted(si *metadata.StorageItem) bool { return deleted } +func queueCommitted(si *metadata.StorageItem) error { + v, err := metadata.NewValue(true) + if err != nil { + return errors.Wrap(err, "failed to create committed value") + } + si.Queue(func(b *bolt.Bucket) error { + return si.SetValue(b, keyCommitted, v) + }) + return nil +} + +func getCommitted(si *metadata.StorageItem) bool { + v := si.Get(keyCommitted) + if v == nil { + return false + } + var committed bool + if err := v.Unmarshal(&committed); err != nil { + return false + } + return committed +} + +func queueParent(si *metadata.StorageItem, parent string) error { + if parent == "" { + return nil + } + v, err := metadata.NewValue(parent) + if err != nil { + return errors.Wrap(err, "failed to create parent value") + } + si.Update(func(b *bolt.Bucket) error { + return si.SetValue(b, keyParent, v) + }) + return nil +} + +func getParent(si *metadata.StorageItem) string { + v := si.Get(keyParent) + if v == nil { + return "" + } + var parent string + if err := v.Unmarshal(&parent); err != nil { + return "" + } + return parent +} + func setSize(si *metadata.StorageItem, s int64) error { v, err := metadata.NewValue(s) if err != nil { diff --git a/cache/migrate_v2.go b/cache/migrate_v2.go new file mode 100644 index 000000000000..c880862c265d --- /dev/null +++ b/cache/migrate_v2.go @@ -0,0 +1,254 @@ +package cache + +import ( + "context" + "io" + "os" + "time" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/snapshots" + "github.com/moby/buildkit/cache/metadata" + "github.com/moby/buildkit/snapshot" + "github.com/opencontainers/go-digest" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +func migrateChainID(si *metadata.StorageItem, all map[string]*metadata.StorageItem) (digest.Digest, digest.Digest, error) { + diffID := digest.Digest(getDiffID(si)) + if diffID == "" { + return "", "", nil + } + blobID := digest.Digest(getBlob(si)) + if blobID == "" { + return "", "", nil + } + chainID := digest.Digest(getChainID(si)) + blobChainID := digest.Digest(getBlobChainID(si)) + + if chainID != "" && blobChainID != "" { + return chainID, blobChainID, nil + } + + chainID = diffID + blobChainID = digest.FromBytes([]byte(blobID + " " + diffID)) + + parent := getParent(si) + if parent != "" { + pChainID, pBlobChainID, err := migrateChainID(all[parent], all) + if err != nil { + return "", "", err + } + chainID = digest.FromBytes([]byte(pChainID + " " + chainID)) + blobChainID = digest.FromBytes([]byte(pBlobChainID + " " + blobChainID)) + } + + queueChainID(si, chainID.String()) + queueBlobChainID(si, blobChainID.String()) + + return chainID, blobChainID, si.Commit() +} + +func MigrateV2(ctx context.Context, from, to string, cs content.Store, s snapshot.Snapshotter, lm leases.Manager) error { + _, err := os.Stat(to) + if err != nil { + if !os.IsNotExist(errors.Cause(err)) { + return errors.WithStack(err) + } + } else { + return nil + } + + _, err = os.Stat(from) + if err != nil { + if !os.IsNotExist(errors.Cause(err)) { + return errors.WithStack(err) + } + return nil + } + tmpPath := to + ".tmp" + tmpFile, err := os.Create(tmpPath) + if err != nil { + return errors.WithStack(err) + } + src, err := os.Open(from) + if err != nil { + tmpFile.Close() + return errors.WithStack(err) + } + if _, err = io.Copy(tmpFile, src); err != nil { + tmpFile.Close() + src.Close() + return errors.Wrapf(err, "failed to copy db for migration") + } + src.Close() + tmpFile.Close() + + md, err := metadata.NewStore(tmpPath) + if err != nil { + return err + } + + items, err := md.All() + if err != nil { + return err + } + + byID := map[string]*metadata.StorageItem{} + for _, item := range items { + byID[item.ID()] = item + } + + // add committed, parent, snapshot + for id, item := range byID { + em := getEqualMutable(item) + if em == "" { + info, err := s.Stat(ctx, id) + if err != nil { + return err + } + if info.Kind == snapshots.KindCommitted { + queueCommitted(item) + } + if info.Parent != "" { + queueParent(item, info.Parent) + } + } else { + queueCommitted(item) + } + queueSnapshotID(item, id) + item.Commit() + } + + for _, item := range byID { + em := getEqualMutable(item) + if em != "" { + if getParent(item) == "" { + queueParent(item, getParent(byID[em])) + item.Commit() + } + } + } + + type diffPair struct { + Blobsum string + DiffID string + } + // move diffID, blobsum to new location + for _, item := range byID { + v := item.Get("blobmapping.blob") + if v == nil { + continue + } + var blob diffPair + if err := v.Unmarshal(&blob); err != nil { + return errors.WithStack(err) + } + queueDiffID(item, blob.DiffID) + queueBlob(item, blob.Blobsum) + queueMediaType(item, images.MediaTypeDockerSchema2LayerGzip) + if err := item.Commit(); err != nil { + return err + } + + } + + // calculate new chainid/blobsumid + for _, item := range byID { + if _, _, err := migrateChainID(item, byID); err != nil { + return err + } + } + + ctx = context.TODO() // no cancellation allowed pass this point + + // add new leases + for _, item := range byID { + l, err := lm.Create(ctx, func(l *leases.Lease) error { + l.ID = item.ID() + l.Labels = map[string]string{ + "containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano), + } + return nil + }) + if err != nil { + // if we are running the migration twice + if errdefs.IsAlreadyExists(err) { + continue + } + return errors.Wrap(err, "failed to create lease") + } + + if err := lm.AddResource(ctx, l, leases.Resource{ + ID: getSnapshotID(item), + Type: "snapshots/" + s.Name(), + }); err != nil { + return errors.Wrapf(err, "failed to add snapshot %s to lease", item.ID()) + } + + if blobID := getBlob(item); blobID != "" { + if err := lm.AddResource(ctx, l, leases.Resource{ + ID: blobID, + Type: "content", + }); err != nil { + return errors.Wrapf(err, "failed to add blob %s to lease", item.ID()) + } + } + } + + // remove old root labels + for _, item := range byID { + if _, err := s.Update(ctx, snapshots.Info{ + Name: getSnapshotID(item), + }, "labels.containerd.io/gc.root"); err != nil { + if !errdefs.IsNotFound(errors.Cause(err)) { + return err + } + } + + if blob := getBlob(item); blob != "" { + if _, err := cs.Update(ctx, content.Info{ + Digest: digest.Digest(blob), + }, "labels.containerd.io/gc.root"); err != nil { + return err + } + } + } + + // previous implementation can leak views, just clean up all views + err = s.Walk(ctx, func(ctx context.Context, info snapshots.Info) error { + if info.Kind == snapshots.KindView { + if _, err := s.Update(ctx, snapshots.Info{ + Name: info.Name, + }, "labels.containerd.io/gc.root"); err != nil { + if !errdefs.IsNotFound(errors.Cause(err)) { + return err + } + } + } + return nil + }) + if err != nil { + return err + } + + // switch to new DB + if err := md.Close(); err != nil { + return err + } + + if err := os.Rename(tmpPath, to); err != nil { + return err + } + + for _, item := range byID { + logrus.Infof("migrated %s parent:%q snapshot:%v committed:%v blob:%v diffid:%v chainID:%v blobChainID:%v", + item.ID(), getParent(item), getSnapshotID(item), getCommitted(item), getBlob(item), getDiffID(item), getChainID(item), getBlobChainID(item)) + } + + return nil +} diff --git a/cache/refs.go b/cache/refs.go index 046f3d8fb7e0..89c260a6e281 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -2,15 +2,24 @@ package cache import ( "context" + "fmt" "strings" "sync" + "time" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/leases" "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/snapshots" "github.com/docker/docker/pkg/idtools" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/util/flightcontrol" + "github.com/moby/buildkit/util/leaseutil" + "github.com/opencontainers/go-digest" + imagespecidentity "github.com/opencontainers/image-spec/identity" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -30,6 +39,20 @@ type ImmutableRef interface { Parent() ImmutableRef Finalize(ctx context.Context, commit bool) error // Make sure reference is flushed to driver Clone() ImmutableRef + + Info() RefInfo + SetBlob(ctx context.Context, desc ocispec.Descriptor) error + Extract(ctx context.Context) error // +progress +} + +type RefInfo struct { + SnapshotID string + ChainID digest.Digest + BlobChainID digest.Digest + DiffID digest.Digest + Blob digest.Digest + MediaType string + Extracted bool } type MutableRef interface { @@ -65,6 +88,8 @@ type cacheRecord struct { // these are filled if multiple refs point to same data equalMutable *mutableRef equalImmutable *immutableRef + + parentChainCache []digest.Digest } // hold ref lock before calling @@ -81,6 +106,26 @@ func (cr *cacheRecord) mref(triggerLastUsed bool) *mutableRef { return ref } +func (cr *cacheRecord) parentChain() []digest.Digest { + if cr.parentChainCache != nil { + return cr.parentChainCache + } + blob := getBlob(cr.md) + if blob == "" { + return nil + } + + var parent []digest.Digest + if cr.parent != nil { + parent = cr.parent.parentChain() + } + pcc := make([]digest.Digest, len(parent)+1) + copy(pcc, parent) + pcc[len(parent)] = digest.Digest(blob) + cr.parentChainCache = pcc + return pcc +} + // hold ref lock before calling func (cr *cacheRecord) isDead() bool { return cr.dead || (cr.equalImmutable != nil && cr.equalImmutable.dead) || (cr.equalMutable != nil && cr.equalMutable.dead) @@ -99,20 +144,32 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) { cr.mu.Unlock() return s, nil } - driverID := cr.ID() + driverID := getSnapshotID(cr.md) if cr.equalMutable != nil { - driverID = cr.equalMutable.ID() + driverID = getSnapshotID(cr.equalMutable.md) } cr.mu.Unlock() - usage, err := cr.cm.ManagerOpt.Snapshotter.Usage(ctx, driverID) - if err != nil { - cr.mu.Lock() - isDead := cr.isDead() - cr.mu.Unlock() - if isDead { - return int64(0), nil + var usage snapshots.Usage + if !getBlobOnly(cr.md) { + var err error + usage, err = cr.cm.ManagerOpt.Snapshotter.Usage(ctx, driverID) + if err != nil { + cr.mu.Lock() + isDead := cr.isDead() + cr.mu.Unlock() + if isDead { + return int64(0), nil + } + if !errdefs.IsNotFound(err) { + return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID()) + } + } + } + if dgst := getBlob(cr.md); dgst != "" { + info, err := cr.cm.ContentStore.Info(ctx, digest.Digest(dgst)) + if err == nil { + usage.Size += info.Size } - return s, errors.Wrapf(err, "failed to get usage for %s", cr.ID()) } cr.mu.Lock() setSize(cr.md, usage.Size) @@ -148,7 +205,7 @@ func (cr *cacheRecord) Mount(ctx context.Context, readonly bool) (snapshot.Mount defer cr.mu.Unlock() if cr.mutable { - m, err := cr.cm.Snapshotter.Mounts(ctx, cr.ID()) + m, err := cr.cm.Snapshotter.Mounts(ctx, getSnapshotID(cr.md)) if err != nil { return nil, errors.Wrapf(err, "failed to mount %s", cr.ID()) } @@ -159,7 +216,7 @@ func (cr *cacheRecord) Mount(ctx context.Context, readonly bool) (snapshot.Mount } if cr.equalMutable != nil && readonly { - m, err := cr.cm.Snapshotter.Mounts(ctx, cr.equalMutable.ID()) + m, err := cr.cm.Snapshotter.Mounts(ctx, getSnapshotID(cr.equalMutable.md)) if err != nil { return nil, errors.Wrapf(err, "failed to mount %s", cr.equalMutable.ID()) } @@ -170,12 +227,24 @@ func (cr *cacheRecord) Mount(ctx context.Context, readonly bool) (snapshot.Mount return nil, err } if cr.viewMount == nil { // TODO: handle this better - cr.view = identity.NewID() - m, err := cr.cm.Snapshotter.View(ctx, cr.view, cr.ID()) + view := identity.NewID() + l, err := cr.cm.LeaseManager.Create(ctx, func(l *leases.Lease) error { + l.ID = view + l.Labels = map[string]string{ + "containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano), + } + return nil + }, leaseutil.MakeTemporary) if err != nil { - cr.view = "" + return nil, err + } + ctx = leases.WithLease(ctx, l.ID) + m, err := cr.cm.Snapshotter.View(ctx, view, getSnapshotID(cr.md)) + if err != nil { + cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: l.ID}) return nil, errors.Wrapf(err, "failed to mount %s", cr.ID()) } + cr.view = view cr.viewMount = m } return cr.viewMount, nil @@ -190,7 +259,7 @@ func (cr *cacheRecord) remove(ctx context.Context, removeSnapshot bool) error { } } if removeSnapshot { - if err := cr.cm.Snapshotter.Remove(ctx, cr.ID()); err != nil { + if err := cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}); err != nil { return errors.Wrapf(err, "failed to remove %s", cr.ID()) } } @@ -221,6 +290,134 @@ func (sr *immutableRef) Clone() ImmutableRef { return ref } +func (sr *immutableRef) Info() RefInfo { + return RefInfo{ + ChainID: digest.Digest(getChainID(sr.md)), + DiffID: digest.Digest(getDiffID(sr.md)), + Blob: digest.Digest(getBlob(sr.md)), + MediaType: getMediaType(sr.md), + BlobChainID: digest.Digest(getBlobChainID(sr.md)), + SnapshotID: getSnapshotID(sr.md), + Extracted: !getBlobOnly(sr.md), + } +} + +func (sr *immutableRef) Extract(ctx context.Context) error { + _, err := sr.sizeG.Do(ctx, sr.ID()+"-extract", func(ctx context.Context) (interface{}, error) { + snapshotID := getSnapshotID(sr.md) + if _, err := sr.cm.Snapshotter.Stat(ctx, snapshotID); err == nil { + queueBlobOnly(sr.md, false) + return nil, sr.md.Commit() + } + + parentID := "" + if sr.parent != nil { + if err := sr.parent.Extract(ctx); err != nil { + return nil, err + } + parentID = getSnapshotID(sr.parent.md) + } + info := sr.Info() + key := fmt.Sprintf("extract-%s %s", identity.NewID(), info.ChainID) + + err := sr.cm.Snapshotter.Prepare(ctx, key, parentID) + if err != nil { + return nil, err + } + + mountable, err := sr.cm.Snapshotter.Mounts(ctx, key) + if err != nil { + return nil, err + } + mounts, unmount, err := mountable.Mount() + if err != nil { + return nil, err + } + _, err = sr.cm.Applier.Apply(ctx, ocispec.Descriptor{ + Digest: info.Blob, + MediaType: info.MediaType, + }, mounts) + if err != nil { + unmount() + return nil, err + } + + if err := unmount(); err != nil { + return nil, err + } + if err := sr.cm.Snapshotter.Commit(ctx, getSnapshotID(sr.md), key); err != nil { + if !errdefs.IsAlreadyExists(err) { + return nil, err + } + } + queueBlobOnly(sr.md, false) + if err := sr.md.Commit(); err != nil { + return nil, err + } + return nil, nil + }) + return err +} + +// SetBlob associates a blob with the cache record. +// A lease must be held for the blob when calling this function +// Caller should call Info() for knowing what current values are actually set +func (sr *immutableRef) SetBlob(ctx context.Context, desc ocispec.Descriptor) error { + diffID, err := diffIDFromDescriptor(desc) + if err != nil { + return err + } + if _, err := sr.cm.ContentStore.Info(ctx, desc.Digest); err != nil { + return err + } + + sr.mu.Lock() + defer sr.mu.Unlock() + + if getChainID(sr.md) != "" { + return nil + } + + if err := sr.finalize(ctx, true); err != nil { + return err + } + + p := sr.parent + var parentChainID digest.Digest + var parentBlobChainID digest.Digest + if p != nil { + pInfo := p.Info() + if pInfo.ChainID == "" || pInfo.BlobChainID == "" { + return errors.Errorf("failed to set blob for reference with non-addressable parent") + } + parentChainID = pInfo.ChainID + parentBlobChainID = pInfo.BlobChainID + } + + if err := sr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: sr.ID()}, leases.Resource{ + ID: desc.Digest.String(), + Type: "content", + }); err != nil { + return err + } + + queueDiffID(sr.md, diffID.String()) + queueBlob(sr.md, desc.Digest.String()) + chainID := diffID + blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID}) + if parentChainID != "" { + chainID = imagespecidentity.ChainID([]digest.Digest{parentChainID, chainID}) + blobChainID = imagespecidentity.ChainID([]digest.Digest{parentBlobChainID, blobChainID}) + } + queueChainID(sr.md, chainID.String()) + queueBlobChainID(sr.md, blobChainID.String()) + queueMediaType(sr.md, desc.MediaType) + if err := sr.md.Commit(); err != nil { + return err + } + return nil +} + func (sr *immutableRef) Release(ctx context.Context) error { sr.cm.mu.Lock() defer sr.cm.mu.Unlock() @@ -259,8 +456,8 @@ func (sr *immutableRef) release(ctx context.Context) error { if len(sr.refs) == 0 { if sr.viewMount != nil { // TODO: release viewMount earlier if possible - if err := sr.cm.Snapshotter.Remove(ctx, sr.view); err != nil { - return errors.Wrapf(err, "failed to remove view %s", sr.view) + if err := sr.cm.LeaseManager.Delete(ctx, leases.Lease{ID: sr.view}); err != nil { + return errors.Wrapf(err, "failed to remove view lease %s", sr.view) } sr.view = "" sr.viewMount = nil @@ -269,7 +466,6 @@ func (sr *immutableRef) release(ctx context.Context) error { if sr.equalMutable != nil { sr.equalMutable.release(ctx) } - // go sr.cm.GC() } return nil @@ -298,18 +494,42 @@ func (cr *cacheRecord) finalize(ctx context.Context, commit bool) error { } return nil } - err := cr.cm.Snapshotter.Commit(ctx, cr.ID(), mutable.ID()) + + _, err := cr.cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error { + l.ID = cr.ID() + l.Labels = map[string]string{ + "containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano), + } + return nil + }) + if err != nil { + if !errdefs.IsAlreadyExists(err) { // migrator adds leases for everything + return errors.Wrap(err, "failed to create lease") + } + } + + if err := cr.cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: cr.ID()}, leases.Resource{ + ID: cr.ID(), + Type: "snapshots/" + cr.cm.ManagerOpt.Snapshotter.Name(), + }); err != nil { + cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}) + return errors.Wrapf(err, "failed to add snapshot %s to lease", cr.ID()) + } + + err = cr.cm.Snapshotter.Commit(ctx, cr.ID(), mutable.ID()) if err != nil { + cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}) return errors.Wrapf(err, "failed to commit %s", mutable.ID()) } mutable.dead = true go func() { cr.cm.mu.Lock() defer cr.cm.mu.Unlock() - if err := mutable.remove(context.TODO(), false); err != nil { + if err := mutable.remove(context.TODO(), true); err != nil { logrus.Error(err) } }() + cr.equalMutable = nil clearEqualMutable(cr.md) return cr.md.Commit() @@ -341,7 +561,11 @@ func (sr *mutableRef) commit(ctx context.Context) (*immutableRef, error) { } } - if err := initializeMetadata(rec); err != nil { + parentID := "" + if rec.parent != nil { + parentID = rec.parent.ID() + } + if err := initializeMetadata(rec, parentID); err != nil { return nil, err } @@ -351,6 +575,7 @@ func (sr *mutableRef) commit(ctx context.Context) (*immutableRef, error) { return nil, err } + queueCommitted(md) setSize(md, sizeUnknown) setEqualMutable(md, sr.ID()) if err := md.Commit(); err != nil { @@ -401,11 +626,6 @@ func (sr *mutableRef) release(ctx context.Context) error { return err } } - if sr.parent != nil { - if err := sr.parent.release(ctx); err != nil { - return err - } - } return sr.remove(ctx, true) } else { if sr.updateLastUsed() { diff --git a/client/build_test.go b/client/build_test.go index a0e3d30f7d98..2071e585c2c2 100644 --- a/client/build_test.go +++ b/client/build_test.go @@ -16,11 +16,6 @@ import ( "github.com/stretchr/testify/require" ) -func init() { - integration.InitOCIWorker() - integration.InitContainerdWorker() -} - func TestClientGatewayIntegration(t *testing.T) { integration.Run(t, []integration.Test{ testClientGatewaySolve, diff --git a/exporter/containerimage/export.go b/exporter/containerimage/export.go index c25b14bf25b4..edc6673cfebf 100644 --- a/exporter/containerimage/export.go +++ b/exporter/containerimage/export.go @@ -144,7 +144,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source) src.Metadata[k] = v } - ctx, done, err := leaseutil.WithLease(ctx, e.opt.LeaseManager) + ctx, done, err := leaseutil.WithLease(ctx, e.opt.LeaseManager, leaseutil.MakeTemporary) if err != nil { return nil, err } diff --git a/exporter/containerimage/writer.go b/exporter/containerimage/writer.go index 71f17f4feb40..225465558e7e 100644 --- a/exporter/containerimage/writer.go +++ b/exporter/containerimage/writer.go @@ -100,9 +100,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool idx.MediaType = images.MediaTypeDockerSchema2ManifestList } - labels := map[string]string{} - - for i, p := range p.Platforms { + for _, p := range p.Platforms { r, ok := inp.Refs[p.ID] if !ok { return nil, errors.Errorf("failed to find ref for ID %s", p.ID) @@ -116,8 +114,6 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool dp := p.Platform desc.Platform = &dp idx.Manifests = append(idx.Manifests, *desc) - - labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = desc.Digest.String() } idxBytes, err := json.MarshalIndent(idx, "", " ") @@ -133,18 +129,11 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp exporter.Source, oci bool } idxDone := oneOffProgress(ctx, "exporting manifest list "+idxDigest.String()) - if err := content.WriteBlob(ctx, ic.opt.ContentStore, idxDigest.String(), bytes.NewReader(idxBytes), idxDesc, content.WithLabels(labels)); err != nil { + if err := content.WriteBlob(ctx, ic.opt.ContentStore, idxDigest.String(), bytes.NewReader(idxBytes), idxDesc); err != nil { return nil, idxDone(errors.Wrapf(err, "error writing manifest list blob %s", idxDigest)) } idxDone(nil) - for _, desc := range idx.Manifests { - // delete manifest root. manifest will remain linked to the index - if err := ic.opt.ContentStore.Delete(context.TODO(), desc.Digest); err != nil { - return nil, errors.Wrap(err, "error removing manifest root") - } - } - return &idxDesc, nil } @@ -157,7 +146,7 @@ func (ic *ImageWriter) exportLayers(ctx context.Context, refs ...cache.Immutable for i, ref := range refs { func(i int, ref cache.ImmutableRef) { eg.Go(func() error { - diffPairs, err := blobs.GetDiffPairs(ctx, ic.opt.ContentStore, ic.opt.Snapshotter, ic.opt.Differ, ref, true) + diffPairs, err := blobs.GetDiffPairs(ctx, ic.opt.ContentStore, ic.opt.Differ, ref, true) if err != nil { return errors.Wrap(err, "failed calculating diff pairs for exported snapshot") } @@ -229,11 +218,7 @@ func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, ref cache }, } - labels := map[string]string{ - "containerd.io/gc.ref.content.0": configDigest.String(), - } - - for i, dp := range diffPairs { + for _, dp := range diffPairs { info, err := ic.opt.ContentStore.Info(ctx, dp.Blobsum) if err != nil { return nil, errors.Wrapf(err, "could not find blob %s from contentstore", dp.Blobsum) @@ -243,7 +228,6 @@ func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, ref cache Size: info.Size, MediaType: layerType, }) - labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = dp.Blobsum.String() } mfstJSON, err := json.MarshalIndent(mfst, "", " ") @@ -258,7 +242,7 @@ func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, ref cache } mfstDone := oneOffProgress(ctx, "exporting manifest "+mfstDigest.String()) - if err := content.WriteBlob(ctx, ic.opt.ContentStore, mfstDigest.String(), bytes.NewReader(mfstJSON), mfstDesc, content.WithLabels(labels)); err != nil { + if err := content.WriteBlob(ctx, ic.opt.ContentStore, mfstDigest.String(), bytes.NewReader(mfstJSON), mfstDesc); err != nil { return nil, mfstDone(errors.Wrapf(err, "error writing manifest blob %s", mfstDigest)) } mfstDone(nil) @@ -275,11 +259,6 @@ func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, ref cache } configDone(nil) - // delete config root. config will remain linked to the manifest - if err := ic.opt.ContentStore.Delete(context.TODO(), configDigest); err != nil { - return nil, errors.Wrap(err, "error removing config root") - } - return &ocispec.Descriptor{ Digest: mfstDigest, Size: int64(len(mfstJSON)), diff --git a/exporter/oci/export.go b/exporter/oci/export.go index ed0bcfa65f9c..4c1d9218f424 100644 --- a/exporter/oci/export.go +++ b/exporter/oci/export.go @@ -116,7 +116,7 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source) src.Metadata[k] = v } - ctx, done, err := leaseutil.WithLease(ctx, e.opt.LeaseManager) + ctx, done, err := leaseutil.WithLease(ctx, e.opt.LeaseManager, leaseutil.MakeTemporary) if err != nil { return nil, err } diff --git a/hack/dockerfiles/test.buildkit.Dockerfile b/hack/dockerfiles/test.buildkit.Dockerfile index 00fd68cfcf07..9b2b938855d7 100644 --- a/hack/dockerfiles/test.buildkit.Dockerfile +++ b/hack/dockerfiles/test.buildkit.Dockerfile @@ -218,7 +218,7 @@ RUN apt-get install -y --no-install-recommends uidmap sudo vim iptables \ && chown -R user /run/user/1000 /home/user \ && update-alternatives --set iptables /usr/sbin/iptables-legacy # musl is needed to directly use the registry binary that is built on alpine -ENV BUILDKIT_INTEGRATION_CONTAINERD_EXTRA="containerd-1.2=/opt/containerd-old/bin" +#ENV BUILDKIT_INTEGRATION_CONTAINERD_EXTRA="containerd-1.2=/opt/containerd-old/bin" COPY --from=rootlesskit /rootlesskit /usr/bin/ COPY --from=containerd-old /out/containerd* /opt/containerd-old/bin/ COPY --from=registry /bin/registry /usr/bin diff --git a/snapshot/blobmapping/snapshotter.go b/snapshot/blobmapping/snapshotter.go deleted file mode 100644 index b4a16e29c7b5..000000000000 --- a/snapshot/blobmapping/snapshotter.go +++ /dev/null @@ -1,151 +0,0 @@ -package blobmapping - -import ( - "context" - "time" - - "github.com/containerd/containerd/content" - "github.com/containerd/containerd/snapshots" - "github.com/moby/buildkit/cache/metadata" - "github.com/moby/buildkit/snapshot" - digest "github.com/opencontainers/go-digest" - "github.com/sirupsen/logrus" - bolt "go.etcd.io/bbolt" -) - -const blobKey = "blobmapping.blob" - -type Opt struct { - Content content.Store - Snapshotter snapshot.SnapshotterBase - MetadataStore *metadata.Store -} - -type Info struct { - snapshots.Info - Blob string -} - -type DiffPair struct { - Blobsum digest.Digest - DiffID digest.Digest -} - -// this snapshotter keeps an internal mapping between a snapshot and a blob - -type Snapshotter struct { - snapshot.SnapshotterBase - opt Opt -} - -func NewSnapshotter(opt Opt) snapshot.Snapshotter { - s := &Snapshotter{ - SnapshotterBase: opt.Snapshotter, - opt: opt, - } - - return s -} - -// Remove also removes a reference to a blob. If it is a last reference then it deletes it the blob as well -// Remove is not safe to be called concurrently -func (s *Snapshotter) Remove(ctx context.Context, key string) error { - _, blob, err := s.GetBlob(ctx, key) - if err != nil { - return err - } - - blobs, err := s.opt.MetadataStore.Search(index(blob)) - if err != nil { - return err - } - - if err := s.SnapshotterBase.Remove(ctx, key); err != nil { - return err - } - - if len(blobs) == 1 && blobs[0].ID() == key { // last snapshot - if err := s.opt.Content.Delete(ctx, blob); err != nil { - logrus.Errorf("failed to delete blob %v: %+v", blob, err) - } - } - return nil -} - -func (s *Snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) { - u, err := s.SnapshotterBase.Usage(ctx, key) - if err != nil { - return snapshots.Usage{}, err - } - _, blob, err := s.GetBlob(ctx, key) - if err != nil { - return u, err - } - if blob != "" { - info, err := s.opt.Content.Info(ctx, blob) - if err != nil { - return u, err - } - (&u).Add(snapshots.Usage{Size: info.Size, Inodes: 1}) - } - return u, nil -} - -func (s *Snapshotter) GetBlob(ctx context.Context, key string) (digest.Digest, digest.Digest, error) { - md, _ := s.opt.MetadataStore.Get(key) - v := md.Get(blobKey) - if v == nil { - return "", "", nil - } - var blob DiffPair - if err := v.Unmarshal(&blob); err != nil { - return "", "", err - } - return blob.DiffID, blob.Blobsum, nil -} - -// Validates that there is no blob associated with the snapshot. -// Checks that there is a blob in the content store. -// If same blob has already been set then this is a noop. -func (s *Snapshotter) SetBlob(ctx context.Context, key string, diffID, blobsum digest.Digest) error { - info, err := s.opt.Content.Info(ctx, blobsum) - if err != nil { - return err - } - if _, ok := info.Labels["containerd.io/uncompressed"]; !ok { - labels := map[string]string{ - "containerd.io/uncompressed": diffID.String(), - } - if _, err := s.opt.Content.Update(ctx, content.Info{ - Digest: blobsum, - Labels: labels, - }, "labels.containerd.io/uncompressed"); err != nil { - return err - } - } - // update gc.root cause blob might be held by lease only - if _, err := s.opt.Content.Update(ctx, content.Info{ - Digest: blobsum, - Labels: map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - }, - }, "labels.containerd.io/gc.root"); err != nil { - return err - } - - md, _ := s.opt.MetadataStore.Get(key) - - v, err := metadata.NewValue(DiffPair{DiffID: diffID, Blobsum: blobsum}) - if err != nil { - return err - } - v.Index = index(blobsum) - - return md.Update(func(b *bolt.Bucket) error { - return md.SetValue(b, blobKey, v) - }) -} - -func index(blob digest.Digest) string { - return "blobmap::" + blob.String() -} diff --git a/snapshot/containerd/content.go b/snapshot/containerd/content.go index 5c763851dd89..8e42d4257ebf 100644 --- a/snapshot/containerd/content.go +++ b/snapshot/containerd/content.go @@ -2,25 +2,21 @@ package containerd import ( "context" - "time" "github.com/containerd/containerd/content" - "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/namespaces" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" ) -type garbageCollectFn func(context.Context) error - -func NewContentStore(store content.Store, ns string, gc func(context.Context) error) content.Store { - return &noGCContentStore{&nsContent{ns, store, gc}} +func NewContentStore(store content.Store, ns string) content.Store { + return &nsContent{ns, store} } type nsContent struct { ns string content.Store - gc garbageCollectFn } func (c *nsContent) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { @@ -39,16 +35,7 @@ func (c *nsContent) Walk(ctx context.Context, fn content.WalkFunc, filters ...st } func (c *nsContent) Delete(ctx context.Context, dgst digest.Digest) error { - ctx = namespaces.WithNamespace(ctx, c.ns) - if _, err := c.Update(ctx, content.Info{ - Digest: dgst, - }, "labels.containerd.io/gc.root"); err != nil { - return err - } // calling snapshotter.Remove here causes a race in containerd - if c.gc == nil { - return nil - } - return c.gc(ctx) + return errors.Errorf("contentstore.Delete usage is forbidden") } func (c *nsContent) Status(ctx context.Context, ref string) (content.Status, error) { @@ -76,31 +63,12 @@ func (c *nsContent) Writer(ctx context.Context, opts ...content.WriterOpt) (cont } func (c *nsContent) writer(ctx context.Context, retries int, opts ...content.WriterOpt) (content.Writer, error) { - var wOpts content.WriterOpts - for _, opt := range opts { - if err := opt(&wOpts); err != nil { - return nil, err - } - } - _, noRoot := wOpts.Desc.Annotations["buildkit/noroot"] - delete(wOpts.Desc.Annotations, "buildkit/noroot") - opts = append(opts, content.WithDescriptor(wOpts.Desc)) ctx = namespaces.WithNamespace(ctx, c.ns) w, err := c.Store.Writer(ctx, opts...) if err != nil { - if !noRoot && errdefs.IsAlreadyExists(err) && wOpts.Desc.Digest != "" && retries > 0 { - _, err2 := c.Update(ctx, content.Info{ - Digest: wOpts.Desc.Digest, - Labels: map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - }, - }, "labels.containerd.io/gc.root") - if err2 != nil { - return c.writer(ctx, retries-1, opts...) - } - } + return nil, err } - return &nsWriter{Writer: w, ns: c.ns}, err + return &nsWriter{Writer: w, ns: c.ns}, nil } type nsWriter struct { @@ -112,26 +80,3 @@ func (w *nsWriter) Commit(ctx context.Context, size int64, expected digest.Diges ctx = namespaces.WithNamespace(ctx, w.ns) return w.Writer.Commit(ctx, size, expected, opts...) } - -type noGCContentStore struct { - content.Store -} -type noGCWriter struct { - content.Writer -} - -func (cs *noGCContentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { - w, err := cs.Store.Writer(ctx, opts...) - return &noGCWriter{w}, err -} - -func (w *noGCWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { - opts = append(opts, func(info *content.Info) error { - if info.Labels == nil { - info.Labels = map[string]string{} - } - info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano) - return nil - }) - return w.Writer.Commit(ctx, size, expected, opts...) -} diff --git a/snapshot/containerd/snapshotter.go b/snapshot/containerd/snapshotter.go index 62f53f66a2a9..e38e72523887 100644 --- a/snapshot/containerd/snapshotter.go +++ b/snapshot/containerd/snapshotter.go @@ -2,52 +2,39 @@ package containerd import ( "context" - "time" - "github.com/containerd/containerd/content" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" - ctdsnapshot "github.com/containerd/containerd/snapshots" + "github.com/containerd/containerd/snapshots" "github.com/docker/docker/pkg/idtools" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/snapshot" - "github.com/moby/buildkit/snapshot/blobmapping" + "github.com/pkg/errors" ) -func NewSnapshotter(name string, snapshotter ctdsnapshot.Snapshotter, store content.Store, mdstore *metadata.Store, ns string, gc func(context.Context) error, idmap *idtools.IdentityMapping) snapshot.Snapshotter { - return blobmapping.NewSnapshotter(blobmapping.Opt{ - Content: store, - Snapshotter: snapshot.FromContainerdSnapshotter(name, &nsSnapshotter{ns, snapshotter, gc}, idmap), - MetadataStore: mdstore, - }) +func NewSnapshotter(name string, snapshotter snapshots.Snapshotter, ns string, idmap *idtools.IdentityMapping) snapshot.Snapshotter { + return snapshot.FromContainerdSnapshotter(name, &nsSnapshotter{ns, snapshotter}, idmap) +} + +func NSSnapshotter(ns string, snapshotter snapshots.Snapshotter) snapshots.Snapshotter { + return &nsSnapshotter{ns: ns, Snapshotter: snapshotter} } type nsSnapshotter struct { ns string - ctdsnapshot.Snapshotter - gc garbageCollectFn + snapshots.Snapshotter } -func (s *nsSnapshotter) Stat(ctx context.Context, key string) (ctdsnapshot.Info, error) { +func (s *nsSnapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) { ctx = namespaces.WithNamespace(ctx, s.ns) - info, err := s.Snapshotter.Stat(ctx, key) - if err == nil { - if _, ok := info.Labels["labels.containerd.io/gc.root"]; !ok { - if err := addRootLabel()(&info); err != nil { - return info, err - } - return s.Update(ctx, info, "labels.containerd.io/gc.root") - } - } - return info, err + return s.Snapshotter.Stat(ctx, key) } -func (s *nsSnapshotter) Update(ctx context.Context, info ctdsnapshot.Info, fieldpaths ...string) (ctdsnapshot.Info, error) { +func (s *nsSnapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) { ctx = namespaces.WithNamespace(ctx, s.ns) return s.Snapshotter.Update(ctx, info, fieldpaths...) } -func (s *nsSnapshotter) Usage(ctx context.Context, key string) (ctdsnapshot.Usage, error) { +func (s *nsSnapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) { ctx = namespaces.WithNamespace(ctx, s.ns) return s.Snapshotter.Usage(ctx, key) } @@ -55,46 +42,22 @@ func (s *nsSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, ctx = namespaces.WithNamespace(ctx, s.ns) return s.Snapshotter.Mounts(ctx, key) } -func (s *nsSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) { +func (s *nsSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) { ctx = namespaces.WithNamespace(ctx, s.ns) - return s.Snapshotter.Prepare(ctx, key, parent, addRootLabel(opts...)) + return s.Snapshotter.Prepare(ctx, key, parent, opts...) } -func (s *nsSnapshotter) View(ctx context.Context, key, parent string, opts ...ctdsnapshot.Opt) ([]mount.Mount, error) { +func (s *nsSnapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) { ctx = namespaces.WithNamespace(ctx, s.ns) - return s.Snapshotter.View(ctx, key, parent, addRootLabel(opts...)) + return s.Snapshotter.View(ctx, key, parent, opts...) } -func (s *nsSnapshotter) Commit(ctx context.Context, name, key string, opts ...ctdsnapshot.Opt) error { +func (s *nsSnapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error { ctx = namespaces.WithNamespace(ctx, s.ns) - return s.Snapshotter.Commit(ctx, name, key, addRootLabel(opts...)) + return s.Snapshotter.Commit(ctx, name, key, opts...) } func (s *nsSnapshotter) Remove(ctx context.Context, key string) error { - ctx = namespaces.WithNamespace(ctx, s.ns) - if _, err := s.Update(ctx, ctdsnapshot.Info{ - Name: key, - }, "labels.containerd.io/gc.root"); err != nil { - return err - } // calling snapshotter.Remove here causes a race in containerd - if s.gc == nil { - return nil - } - return s.gc(ctx) + return errors.Errorf("calling snapshotter.Remove is forbidden") } -func (s *nsSnapshotter) Walk(ctx context.Context, fn func(context.Context, ctdsnapshot.Info) error) error { +func (s *nsSnapshotter) Walk(ctx context.Context, fn func(context.Context, snapshots.Info) error) error { ctx = namespaces.WithNamespace(ctx, s.ns) return s.Snapshotter.Walk(ctx, fn) } - -func addRootLabel(opts ...ctdsnapshot.Opt) ctdsnapshot.Opt { - return func(info *ctdsnapshot.Info) error { - for _, opt := range opts { - if err := opt(info); err != nil { - return err - } - } - if info.Labels == nil { - info.Labels = map[string]string{} - } - info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339Nano) - return nil - } -} diff --git a/snapshot/imagerefchecker/checker.go b/snapshot/imagerefchecker/checker.go index 5782cc2ad7f4..a8dd746ed5fa 100644 --- a/snapshot/imagerefchecker/checker.go +++ b/snapshot/imagerefchecker/checker.go @@ -9,7 +9,6 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/snapshot" digest "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -20,27 +19,26 @@ const ( ) type Opt struct { - Snapshotter snapshot.Snapshotter ImageStore images.Store - ContentStore content.Provider + ContentStore content.Store } // New creates new image reference checker that can be used to see if a reference // is being used by any of the images in the image store func New(opt Opt) cache.ExternalRefCheckerFunc { return func() (cache.ExternalRefChecker, error) { - return &checker{opt: opt}, nil + return &Checker{opt: opt}, nil } } -type checker struct { +type Checker struct { opt Opt once sync.Once images map[string]struct{} cache map[string]bool } -func (c *checker) Exists(key string) bool { +func (c *Checker) Exists(key string, blobs []digest.Digest) bool { if c.opt.ImageStore == nil { return false } @@ -51,37 +49,12 @@ func (c *checker) Exists(key string) bool { return b } - l, err := c.getLayers(key) - if err != nil { - c.cache[key] = false - return false - } - - _, ok := c.images[layerKey(l)] + _, ok := c.images[layerKey(blobs)] c.cache[key] = ok return ok } -func (c *checker) getLayers(key string) ([]specs.Descriptor, error) { - _, blob, err := c.opt.Snapshotter.GetBlob(context.TODO(), key) - if err != nil { - return nil, err - } - stat, err := c.opt.Snapshotter.Stat(context.TODO(), key) - if err != nil { - return nil, err - } - var layers []specs.Descriptor - if parent := stat.Parent; parent != "" { - layers, err = c.getLayers(parent) - if err != nil { - return nil, err - } - } - return append(layers, specs.Descriptor{Digest: blob}), nil -} - -func (c *checker) init() { +func (c *Checker) init() { c.images = map[string]struct{}{} c.cache = map[string]bool{} @@ -103,17 +76,25 @@ func (c *checker) init() { } } -func (c *checker) registerLayers(l []specs.Descriptor) { - if k := layerKey(l); k != "" { +func (c *Checker) registerLayers(l []specs.Descriptor) { + if k := layerKey(toDigests(l)); k != "" { c.images[k] = struct{}{} } } -func layerKey(layers []specs.Descriptor) string { +func toDigests(layers []specs.Descriptor) []digest.Digest { + digests := make([]digest.Digest, len(layers)) + for i, l := range layers { + digests[i] = l.Digest + } + return digests +} + +func layerKey(layers []digest.Digest) string { b := &strings.Builder{} for _, l := range layers { - if l.Digest != emptyGZLayer { - b.Write([]byte(l.Digest)) + if l != emptyGZLayer { + b.Write([]byte(l)) } } return b.String() diff --git a/snapshot/snapshotter.go b/snapshot/snapshotter.go index 4e930e901381..f45ced4e72c2 100644 --- a/snapshot/snapshotter.go +++ b/snapshot/snapshotter.go @@ -9,7 +9,6 @@ import ( "github.com/containerd/containerd/mount" "github.com/containerd/containerd/snapshots" "github.com/docker/docker/pkg/idtools" - digest "github.com/opencontainers/go-digest" ) type Mountable interface { @@ -18,7 +17,8 @@ type Mountable interface { IdentityMapping() *idtools.IdentityMapping } -type SnapshotterBase interface { +// Snapshotter defines interface that any snapshot implementation should satisfy +type Snapshotter interface { Name() string Mounts(ctx context.Context, key string) (Mountable, error) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) error @@ -34,18 +34,7 @@ type SnapshotterBase interface { IdentityMapping() *idtools.IdentityMapping } -// Snapshotter defines interface that any snapshot implementation should satisfy -type Snapshotter interface { - Blobmapper - SnapshotterBase -} - -type Blobmapper interface { - GetBlob(ctx context.Context, key string) (digest.Digest, digest.Digest, error) - SetBlob(ctx context.Context, key string, diffID, blob digest.Digest) error -} - -func FromContainerdSnapshotter(name string, s snapshots.Snapshotter, idmap *idtools.IdentityMapping) SnapshotterBase { +func FromContainerdSnapshotter(name string, s snapshots.Snapshotter, idmap *idtools.IdentityMapping) Snapshotter { return &fromContainerd{name: name, Snapshotter: s, idmap: idmap} } diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index 4b31701be730..aa817e2d4c40 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "runtime" + "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" @@ -18,6 +19,8 @@ import ( "github.com/moby/buildkit/source" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/imageutil" + "github.com/moby/buildkit/util/leaseutil" + "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/util/pull" "github.com/moby/buildkit/util/resolver" "github.com/moby/buildkit/util/winlayers" @@ -104,19 +107,20 @@ func (is *imageSource) Resolve(ctx context.Context, id source.Identifier, sm *se Src: imageIdentifier.Reference, Resolver: pull.NewResolver(ctx, is.ResolverOpt, sm, is.ImageStore, imageIdentifier.ResolveMode, imageIdentifier.Reference.String()), Platform: &platform, - LeaseManager: is.LeaseManager, } p := &puller{ CacheAccessor: is.CacheAccessor, Puller: pullerUtil, Platform: platform, id: imageIdentifier, + LeaseManager: is.LeaseManager, } return p, nil } type puller struct { CacheAccessor cache.Accessor + LeaseManager leases.Manager Platform specs.Platform id *source.ImageIdentifier *pull.Puller @@ -176,7 +180,7 @@ func (p *puller) CacheKey(ctx context.Context, index int) (string, bool, error) return k, true, nil } -func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { +func (p *puller) Snapshot(ctx context.Context) (ir cache.ImmutableRef, err error) { layerNeedsTypeWindows := false if platform := p.Puller.Platform; platform != nil { if platform.OS == "windows" && runtime.GOOS != "windows" { @@ -188,33 +192,65 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { // workaround for gcr, authentication not supported on blob endpoints pull.EnsureManifestRequested(ctx, p.Puller.Resolver, p.Puller.Src.String()) - pulled, err := p.Puller.Pull(ctx) + ctx, done, err := leaseutil.WithLease(ctx, p.LeaseManager, leaseutil.MakeTemporary) if err != nil { return nil, err } - if pulled.ChainID == "" { - return nil, nil - } - ref, err := p.CacheAccessor.GetFromSnapshotter(ctx, string(pulled.ChainID), cache.WithDescription("pulled from "+pulled.Ref)) + defer done(ctx) + + pulled, err := p.Puller.Pull(ctx) if err != nil { return nil, err } + if len(pulled.Layers) == 0 { + return nil, nil + } - if layerNeedsTypeWindows && ref != nil { - if err := markRefLayerTypeWindows(ref); err != nil { + extractDone := oneOffProgress(ctx, "unpacking "+pulled.Ref) + var current cache.ImmutableRef + defer func() { + if err != nil && current != nil { + current.Release(context.TODO()) + } + extractDone(err) + }() + for _, l := range pulled.Layers { + ref, err := p.CacheAccessor.GetByBlob(ctx, l, current, cache.WithDescription("pulled from "+pulled.Ref)) + if err != nil { + return nil, err + } + if err := ref.Extract(ctx); err != nil { ref.Release(context.TODO()) return nil, err } + if current != nil { + current.Release(context.TODO()) + } + current = ref + } + + for _, desc := range pulled.MetadataBlobs { + if err := p.LeaseManager.AddResource(ctx, leases.Lease{ID: current.ID()}, leases.Resource{ + ID: desc.Digest.String(), + Type: "content", + }); err != nil { + return nil, err + } } - if p.id.RecordType != "" && cache.GetRecordType(ref) == "" { - if err := cache.SetRecordType(ref, p.id.RecordType); err != nil { - ref.Release(context.TODO()) + if layerNeedsTypeWindows && current != nil { + if err := markRefLayerTypeWindows(current); err != nil { return nil, err } } - return ref, nil + if p.id.RecordType != "" && cache.GetRecordType(current) == "" { + if err := cache.SetRecordType(current, p.id.RecordType); err != nil { + return nil, err + } + } + + return current, nil } func markRefLayerTypeWindows(ref cache.ImmutableRef) error { @@ -240,3 +276,20 @@ func cacheKeyFromConfig(dt []byte) digest.Digest { } return identity.ChainID(img.RootFS.DiffIDs) } + +func oneOffProgress(ctx context.Context, id string) func(err error) error { + pw, _, _ := progress.FromContext(ctx) + now := time.Now() + st := progress.Status{ + Started: &now, + } + pw.Write(id, st) + return func(err error) error { + // TODO: set error on status + now := time.Now() + st.Completed = &now + pw.Write(id, st) + pw.Close() + return err + } +} diff --git a/source/git/gitsource_test.go b/source/git/gitsource_test.go index f5cf1ac7bf42..432adcf62376 100644 --- a/source/git/gitsource_test.go +++ b/source/git/gitsource_test.go @@ -9,15 +9,21 @@ import ( "strings" "testing" + "github.com/containerd/containerd/content/local" + ctdmetadata "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/native" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/snapshot" + containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" "github.com/moby/buildkit/source" + "github.com/moby/buildkit/util/leaseutil" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + bolt "go.etcd.io/bbolt" ) func TestRepeatedFetch(t *testing.T) { @@ -311,11 +317,24 @@ func setupGitSource(t *testing.T, tmpdir string) source.Source { md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) assert.NoError(t, err) + store, err := local.NewStore(tmpdir) + require.NoError(t, err) + + db, err := bolt.Open(filepath.Join(tmpdir, "containerdmeta.db"), 0644, nil) + require.NoError(t, err) + + mdb := ctdmetadata.NewDB(db, store, map[string]snapshots.Snapshotter{ + "native": snapshotter, + }) + cm, err := cache.NewManager(cache.ManagerOpt{ - Snapshotter: snapshot.FromContainerdSnapshotter("native", snapshotter, nil), - MetadataStore: md, + Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil), + MetadataStore: md, + LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"), + ContentStore: mdb.ContentStore(), + GarbageCollect: mdb.GarbageCollect, }) - assert.NoError(t, err) + require.NoError(t, err) gs, err := NewSource(Opt{ CacheAccessor: cm, diff --git a/source/http/httpsource_test.go b/source/http/httpsource_test.go index 159c4e5a8269..a9688ce1b148 100644 --- a/source/http/httpsource_test.go +++ b/source/http/httpsource_test.go @@ -7,15 +7,21 @@ import ( "path/filepath" "testing" + "github.com/containerd/containerd/content/local" + ctdmetadata "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/native" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/snapshot" + containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" "github.com/moby/buildkit/source" + "github.com/moby/buildkit/util/leaseutil" "github.com/moby/buildkit/util/testutil/httpserver" digest "github.com/opencontainers/go-digest" "github.com/stretchr/testify/require" + bolt "go.etcd.io/bbolt" ) func TestHTTPSource(t *testing.T) { @@ -314,9 +320,26 @@ func newHTTPSource(tmpdir string) (source.Source, error) { return nil, err } + store, err := local.NewStore(tmpdir) + if err != nil { + return nil, err + } + + db, err := bolt.Open(filepath.Join(tmpdir, "containerdmeta.db"), 0644, nil) + if err != nil { + return nil, err + } + + mdb := ctdmetadata.NewDB(db, store, map[string]snapshots.Snapshotter{ + "native": snapshotter, + }) + cm, err := cache.NewManager(cache.ManagerOpt{ - Snapshotter: snapshot.FromContainerdSnapshotter("native", snapshotter, nil), - MetadataStore: md, + Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil), + MetadataStore: md, + LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"), + ContentStore: mdb.ContentStore(), + GarbageCollect: mdb.GarbageCollect, }) if err != nil { return nil, err diff --git a/util/imageutil/config.go b/util/imageutil/config.go index 07a5846677fc..9568864a5522 100644 --- a/util/imageutil/config.go +++ b/util/imageutil/config.go @@ -3,7 +3,6 @@ package imageutil import ( "context" "encoding/json" - "fmt" "sync" "time" @@ -50,7 +49,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co } if leaseManager != nil { - ctx2, done, err := leaseutil.WithLease(ctx, leaseManager, leases.WithExpiration(5*time.Minute)) + ctx2, done, err := leaseutil.WithLease(ctx, leaseManager, leases.WithExpiration(5*time.Minute), leaseutil.MakeTemporary) if err != nil { return "", nil, errors.WithStack(err) } @@ -94,12 +93,9 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co } children := childrenConfigHandler(cache, platform) - if m, ok := cache.(content.Manager); ok { - children = SetChildrenLabelsNonBlobs(m, children) - } handlers := []images.Handler{ - fetchWithoutRoot(remotes.FetchHandler(cache, fetcher)), + remotes.FetchHandler(cache, fetcher), children, } if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { @@ -118,16 +114,6 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co return desc.Digest, dt, nil } -func fetchWithoutRoot(fetch images.HandlerFunc) images.HandlerFunc { - return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { - if desc.Annotations == nil { - desc.Annotations = map[string]string{} - } - desc.Annotations["buildkit/noroot"] = "true" - return fetch(ctx, desc) - } -} - func childrenConfigHandler(provider content.Provider, platform platforms.MatchComparer) images.HandlerFunc { return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { var descs []specs.Descriptor @@ -207,39 +193,3 @@ func DetectManifestBlobMediaType(dt []byte) (string, error) { } return images.MediaTypeDockerSchema2ManifestList, nil } - -func SetChildrenLabelsNonBlobs(manager content.Manager, f images.HandlerFunc) images.HandlerFunc { - return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { - children, err := f(ctx, desc) - if err != nil { - return children, err - } - - if len(children) > 0 { - info := content.Info{ - Digest: desc.Digest, - Labels: map[string]string{}, - } - fields := []string{} - for i, ch := range children { - switch ch.MediaType { - case images.MediaTypeDockerSchema2Layer, images.MediaTypeDockerSchema2LayerGzip, specs.MediaTypeImageLayer, specs.MediaTypeImageLayerGzip: - continue - default: - } - - info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String() - fields = append(fields, fmt.Sprintf("labels.containerd.io/gc.ref.content.%d", i)) - } - - if len(info.Labels) > 0 { - _, err := manager.Update(ctx, info, fields...) - if err != nil { - return nil, err - } - } - } - - return children, err - } -} diff --git a/util/leaseutil/manager.go b/util/leaseutil/manager.go index fe363e0a0227..45a35273a51d 100644 --- a/util/leaseutil/manager.go +++ b/util/leaseutil/manager.go @@ -27,26 +27,49 @@ func WithLease(ctx context.Context, ls leases.Manager, opts ...leases.Opt) (cont }, nil } +func MakeTemporary(l *leases.Lease) error { + if l.Labels == nil { + l.Labels = map[string]string{} + } + l.Labels["buildkit/lease.temporary"] = time.Now().UTC().Format(time.RFC3339Nano) + return nil +} + func WithNamespace(lm leases.Manager, ns string) leases.Manager { - return &nsLM{Manager: lm, ns: ns} + return &nsLM{manager: lm, ns: ns} } type nsLM struct { - leases.Manager - ns string + manager leases.Manager + ns string } func (l *nsLM) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) { ctx = namespaces.WithNamespace(ctx, l.ns) - return l.Manager.Create(ctx, opts...) + return l.manager.Create(ctx, opts...) } func (l *nsLM) Delete(ctx context.Context, lease leases.Lease, opts ...leases.DeleteOpt) error { ctx = namespaces.WithNamespace(ctx, l.ns) - return l.Manager.Delete(ctx, lease, opts...) + return l.manager.Delete(ctx, lease, opts...) } func (l *nsLM) List(ctx context.Context, filters ...string) ([]leases.Lease, error) { ctx = namespaces.WithNamespace(ctx, l.ns) - return l.Manager.List(ctx, filters...) + return l.manager.List(ctx, filters...) +} + +func (l *nsLM) AddResource(ctx context.Context, lease leases.Lease, resource leases.Resource) error { + ctx = namespaces.WithNamespace(ctx, l.ns) + return l.manager.AddResource(ctx, lease, resource) +} + +func (l *nsLM) DeleteResource(ctx context.Context, lease leases.Lease, resource leases.Resource) error { + ctx = namespaces.WithNamespace(ctx, l.ns) + return l.manager.DeleteResource(ctx, lease, resource) +} + +func (l *nsLM) ListResources(ctx context.Context, lease leases.Lease) ([]leases.Resource, error) { + ctx = namespaces.WithNamespace(ctx, l.ns) + return l.manager.ListResources(ctx, lease) } diff --git a/util/pull/pull.go b/util/pull/pull.go index eaf8770e2934..e22e7f3e5274 100644 --- a/util/pull/pull.go +++ b/util/pull/pull.go @@ -9,20 +9,15 @@ import ( "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" - "github.com/containerd/containerd/leases" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/reference" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/remotes/docker/schema1" - "github.com/containerd/containerd/rootfs" - ctdsnapshot "github.com/containerd/containerd/snapshots" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/util/imageutil" - "github.com/moby/buildkit/util/leaseutil" "github.com/moby/buildkit/util/progress" digest "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/identity" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -33,7 +28,6 @@ type Puller struct { Applier diff.Applier Src reference.Spec Platform *ocispec.Platform - LeaseManager leases.Manager // See NewResolver() Resolver remotes.Resolver resolveOnce sync.Once @@ -43,9 +37,10 @@ type Puller struct { } type Pulled struct { - Ref string - Descriptor ocispec.Descriptor - ChainID digest.Digest + Ref string + Descriptor ocispec.Descriptor + Layers []ocispec.Descriptor + MetadataBlobs []ocispec.Descriptor } func (p *Puller) Resolve(ctx context.Context) (string, ocispec.Descriptor, error) { @@ -98,12 +93,6 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) { platform = platforms.Default() } - ctx, done, err := leaseutil.WithLease(ctx, p.LeaseManager) - if err != nil { - return nil, err - } - defer done(ctx) - ongoing := newJobs(p.ref) pctx, stopProgress := context.WithCancel(ctx) @@ -132,8 +121,6 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) { } else { // Get all the children for a descriptor childrenHandler := images.ChildrenHandler(p.ContentStore) - // Set any children labels for that content - childrenHandler = imageutil.SetChildrenLabelsNonBlobs(p.ContentStore, childrenHandler) // Filter the children by the platform childrenHandler = images.FilterPlatforms(childrenHandler, platform) // Limit manifests pulled to the best match in an index @@ -209,86 +196,20 @@ func (p *Puller) Pull(ctx context.Context) (*Pulled, error) { } } - for _, l := range layerBlobs { - labels := map[string]string{} - var fields []string - for _, nl := range notLayerBlobs { - k := "containerd.io/gc.ref.content." + nl.Digest.Hex()[:12] - labels[k] = nl.Digest.String() - fields = append(fields, "labels."+k) - } - if _, err := p.ContentStore.Update(ctx, content.Info{ - Digest: l.Digest, - Labels: labels, - }, fields...); err != nil { - return nil, err - } - } - - for _, nl := range append(notLayerBlobs, unusedBlobs...) { - if err := p.ContentStore.Delete(ctx, nl.Digest); err != nil { - return nil, err - } - } - - csh, release := snapshot.NewContainerdSnapshotter(p.Snapshotter) - defer release() - - unpackProgressDone := oneOffProgress(ctx, "unpacking "+p.Src.String()) - chainid, err := unpack(ctx, p.desc, p.ContentStore, csh, p.Snapshotter, p.Applier, platform) + layers, err := getLayers(ctx, p.ContentStore, p.desc, platform) if err != nil { - return nil, unpackProgressDone(err) + return nil, err } - unpackProgressDone(nil) return &Pulled{ - Ref: p.ref, - Descriptor: p.desc, - ChainID: chainid, + Ref: p.ref, + Descriptor: p.desc, + Layers: layers, + MetadataBlobs: notLayerBlobs, }, nil } -func unpack(ctx context.Context, desc ocispec.Descriptor, cs content.Store, csh ctdsnapshot.Snapshotter, s snapshot.Snapshotter, applier diff.Applier, platform platforms.MatchComparer) (digest.Digest, error) { - layers, err := getLayers(ctx, cs, desc, platform) - if err != nil { - return "", err - } - - var chain []digest.Digest - for _, layer := range layers { - labels := map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - } - if _, err := rootfs.ApplyLayer(ctx, layer, chain, csh, applier, ctdsnapshot.WithLabels(labels)); err != nil { - return "", err - } - chain = append(chain, layer.Diff.Digest) - } - chainID := identity.ChainID(chain) - if err != nil { - return "", err - } - - if err := fillBlobMapping(ctx, s, layers); err != nil { - return "", err - } - - return chainID, nil -} - -func fillBlobMapping(ctx context.Context, s snapshot.Snapshotter, layers []rootfs.Layer) error { - var chain []digest.Digest - for _, l := range layers { - chain = append(chain, l.Diff.Digest) - chainID := identity.ChainID(chain) - if err := s.SetBlob(ctx, string(chainID), l.Diff.Digest, l.Blob.Digest); err != nil { - return err - } - } - return nil -} - -func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Descriptor, platform platforms.MatchComparer) ([]rootfs.Layer, error) { +func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Descriptor, platform platforms.MatchComparer) ([]ocispec.Descriptor, error) { manifest, err := images.Manifest(ctx, provider, desc, platform) if err != nil { return nil, errors.WithStack(err) @@ -301,14 +222,14 @@ func getLayers(ctx context.Context, provider content.Provider, desc ocispec.Desc if len(diffIDs) != len(manifest.Layers) { return nil, errors.Errorf("mismatched image rootfs and manifest layers %+v %+v", diffIDs, manifest.Layers) } - layers := make([]rootfs.Layer, len(diffIDs)) + layers := make([]ocispec.Descriptor, len(diffIDs)) for i := range diffIDs { - layers[i].Diff = ocispec.Descriptor{ - // TODO: derive media type from compressed type - MediaType: ocispec.MediaTypeImageLayer, - Digest: diffIDs[i], + desc := manifest.Layers[i] + if desc.Annotations == nil { + desc.Annotations = map[string]string{} } - layers[i].Blob = manifest.Layers[i] + desc.Annotations["containerd.io/uncompressed"] = diffIDs[i].String() + layers[i] = desc } return layers, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 10b7f60fcab8..ea621b16369c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -26,12 +26,15 @@ github.com/codahale/hdrhistogram # github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50 github.com/containerd/console # github.com/containerd/containerd v1.4.0-0.20191014053712-acdcf13d5eaf -github.com/containerd/containerd/filters -github.com/containerd/containerd/mount -github.com/containerd/containerd/snapshots github.com/containerd/containerd/content github.com/containerd/containerd/diff +github.com/containerd/containerd/errdefs +github.com/containerd/containerd/filters +github.com/containerd/containerd/gc github.com/containerd/containerd/images +github.com/containerd/containerd/leases +github.com/containerd/containerd/mount +github.com/containerd/containerd/snapshots github.com/containerd/containerd/remotes github.com/containerd/containerd/remotes/docker github.com/containerd/containerd/content/local @@ -46,8 +49,6 @@ github.com/containerd/containerd/oci github.com/containerd/containerd/containers github.com/containerd/containerd/contrib/seccomp github.com/containerd/containerd/namespaces -github.com/containerd/containerd/errdefs -github.com/containerd/containerd/leases github.com/containerd/containerd/rootfs github.com/containerd/containerd/images/archive github.com/containerd/containerd/api/services/content/v1 @@ -85,7 +86,6 @@ github.com/containerd/containerd/runtime/v2/runc/options github.com/containerd/containerd/snapshots/proxy github.com/containerd/containerd/snapshots/storage github.com/containerd/containerd/reference/docker -github.com/containerd/containerd/gc github.com/containerd/containerd/identifiers github.com/containerd/containerd/metadata/boltutil github.com/containerd/containerd/events/exchange @@ -205,9 +205,9 @@ github.com/morikuni/aec # github.com/opencontainers/go-digest v1.0.0-rc1 github.com/opencontainers/go-digest # github.com/opencontainers/image-spec v1.0.1 +github.com/opencontainers/image-spec/identity github.com/opencontainers/image-spec/specs-go/v1 github.com/opencontainers/image-spec/specs-go -github.com/opencontainers/image-spec/identity # github.com/opencontainers/runc v1.0.0-rc8.0.20190621203724-f4982d86f7fd github.com/opencontainers/runc/libcontainer/system github.com/opencontainers/runc/libcontainer/user diff --git a/worker/base/worker.go b/worker/base/worker.go index f2dbce56149c..802ccf0c239d 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -12,11 +12,10 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" + "github.com/containerd/containerd/gc" "github.com/containerd/containerd/images" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/remotes/docker" - "github.com/containerd/containerd/rootfs" - cdsnapshot "github.com/containerd/containerd/snapshots" "github.com/docker/docker/pkg/idtools" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/blobs" @@ -48,7 +47,6 @@ import ( "github.com/moby/buildkit/util/resolver" "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" - ociidentity "github.com/opencontainers/image-spec/identity" ocispec "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -78,6 +76,7 @@ type WorkerOpt struct { ResolveOptionsFunc resolver.ResolveOptionsFunc IdentityMapping *idtools.IdentityMapping LeaseManager leases.Manager + GarbageCollect func(context.Context) (gc.Stats, error) } // Worker is a local worker instance with dedicated snapshotter, cache, and so on. @@ -94,7 +93,6 @@ type Worker struct { func NewWorker(opt WorkerOpt) (*Worker, error) { imageRefChecker := imagerefchecker.New(imagerefchecker.Opt{ ImageStore: opt.ImageStore, - Snapshotter: opt.Snapshotter, ContentStore: opt.ContentStore, }) @@ -102,6 +100,10 @@ func NewWorker(opt WorkerOpt) (*Worker, error) { Snapshotter: opt.Snapshotter, MetadataStore: opt.MetadataStore, PruneRefChecker: imageRefChecker, + Applier: opt.Applier, + GarbageCollect: opt.GarbageCollect, + LeaseManager: opt.LeaseManager, + ContentStore: opt.ContentStore, }) if err != nil { return nil, err @@ -169,6 +171,14 @@ func NewWorker(opt WorkerOpt) (*Worker, error) { return nil, err } + leases, err := opt.LeaseManager.List(context.TODO(), "labels.\"buildkit/lease.temporary\"") + if err != nil { + return nil, err + } + for _, l := range leases { + opt.LeaseManager.Delete(context.TODO(), l) + } + return &Worker{ WorkerOpt: opt, CacheManager: cm, @@ -332,7 +342,13 @@ func (w *Worker) Exporter(name string, sm *session.Manager) (exporter.Exporter, } func (w *Worker) GetRemote(ctx context.Context, ref cache.ImmutableRef, createIfNeeded bool) (*solver.Remote, error) { - diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore(), w.Snapshotter, w.Differ, ref, createIfNeeded) + ctx, done, err := leaseutil.WithLease(ctx, w.LeaseManager, leaseutil.MakeTemporary) + if err != nil { + return nil, err + } + defer done(ctx) + + diffPairs, err := blobs.GetDiffPairs(ctx, w.ContentStore(), w.Differ, ref, createIfNeeded) if err != nil { return nil, errors.Wrap(err, "failed calculating diff pairs for exported snapshot") } @@ -384,8 +400,8 @@ func getCreatedTimes(ref cache.ImmutableRef) (out []time.Time) { return append(out, cache.GetCreatedAt(ref.Metadata())) } -func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error) { - ctx, done, err := leaseutil.WithLease(ctx, w.LeaseManager) +func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cache.ImmutableRef, err error) { + ctx, done, err := leaseutil.WithLease(ctx, w.LeaseManager, leaseutil.MakeTemporary) if err != nil { return nil, err } @@ -416,84 +432,39 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.I return nil, err } - cd, release := snapshot.NewContainerdSnapshotter(w.Snapshotter) - defer release() - unpackProgressDone := oneOffProgress(ctx, "unpacking") - chainIDs, refs, err := w.unpack(ctx, w.CacheManager, remote.Descriptors, cd) - if err != nil { - return nil, unpackProgressDone(err) - } defer func() { - for _, ref := range refs { - ref.Release(context.TODO()) - } + err = unpackProgressDone(err) }() - unpackProgressDone(nil) - - for i, chainID := range chainIDs { + var current cache.ImmutableRef + for i, desc := range remote.Descriptors { tm := time.Now() - if tmstr, ok := remote.Descriptors[i].Annotations[labelCreatedAt]; ok { + if tmstr, ok := desc.Annotations[labelCreatedAt]; ok { if err := (&tm).UnmarshalText([]byte(tmstr)); err != nil { + if current != nil { + current.Release(context.TODO()) + } return nil, err } } descr := fmt.Sprintf("imported %s", remote.Descriptors[i].Digest) - if v, ok := remote.Descriptors[i].Annotations["buildkit/description"]; ok { + if v, ok := desc.Annotations["buildkit/description"]; ok { descr = v } - ref, err := w.CacheManager.Get(ctx, chainID, cache.WithDescription(descr), cache.WithCreationTime(tm)) - if err != nil { - return nil, err + ref, err := w.CacheManager.GetByBlob(ctx, desc, current, cache.WithDescription(descr), cache.WithCreationTime(tm)) + if current != nil { + current.Release(context.TODO()) } - if i == len(remote.Descriptors)-1 { - return ref, nil - } - ref.Release(context.TODO()) - } - return nil, errors.Errorf("unreachable") -} - -func (w *Worker) unpack(ctx context.Context, cm cache.Manager, descs []ocispec.Descriptor, s cdsnapshot.Snapshotter) (ids []string, refs []cache.ImmutableRef, err error) { - defer func() { if err != nil { - for _, r := range refs { - r.Release(context.TODO()) - } - } - }() - - layers, err := getLayers(ctx, descs) - if err != nil { - return nil, nil, err - } - - var chain []digest.Digest - for _, layer := range layers { - newChain := append(chain, layer.Diff.Digest) - - chainID := ociidentity.ChainID(newChain) - ref, err := cm.Get(ctx, string(chainID)) - if err == nil { - refs = append(refs, ref) - } else { - if _, err := rootfs.ApplyLayer(ctx, layer, chain, s, w.Applier); err != nil { - return nil, nil, err - } + return nil, err } - chain = newChain - - if err := w.Snapshotter.SetBlob(ctx, string(chainID), layer.Diff.Digest, layer.Blob.Digest); err != nil { - return nil, nil, err + if err := ref.Extract(ctx); err != nil { + ref.Release(context.TODO()) + return nil, err } + current = ref } - - ids = make([]string, len(chain)) - for i := range chain { - ids[i] = string(ociidentity.ChainID(chain[:i+1])) - } - - return ids, refs, nil + return current, nil } // Labels returns default labels @@ -528,30 +499,6 @@ func ID(root string) (string, error) { return string(b), nil } -func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) { - layers := make([]rootfs.Layer, len(descs)) - for i, desc := range descs { - diffIDStr := desc.Annotations["containerd.io/uncompressed"] - if diffIDStr == "" { - return nil, errors.Errorf("%s missing uncompressed digest", desc.Digest) - } - diffID, err := digest.Parse(diffIDStr) - if err != nil { - return nil, err - } - layers[i].Diff = ocispec.Descriptor{ - MediaType: ocispec.MediaTypeImageLayer, - Digest: diffID, - } - layers[i].Blob = ocispec.Descriptor{ - MediaType: desc.MediaType, - Digest: desc.Digest, - Size: desc.Size, - } - } - return layers, nil -} - func oneOffProgress(ctx context.Context, id string) func(err error) error { pw, _, _ := progress.FromContext(ctx) now := time.Now() diff --git a/worker/containerd/containerd.go b/worker/containerd/containerd.go index 5bae4f955d41..7d28982432a4 100644 --- a/worker/containerd/containerd.go +++ b/worker/containerd/containerd.go @@ -5,25 +5,22 @@ import ( "os" "path/filepath" "strings" - "time" "github.com/containerd/containerd" introspection "github.com/containerd/containerd/api/services/introspection/v1" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/snapshots" + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/leases" + "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor/containerdexecutor" "github.com/moby/buildkit/executor/oci" - "github.com/moby/buildkit/identity" containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" "github.com/moby/buildkit/util/leaseutil" "github.com/moby/buildkit/util/network/netproviders" - "github.com/moby/buildkit/util/throttle" "github.com/moby/buildkit/util/winlayers" "github.com/moby/buildkit/worker/base" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) // NewWorkerOpt creates a WorkerOpt. @@ -49,10 +46,6 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s return base.WorkerOpt{}, errors.Wrapf(err, "failed to create %s", root) } - md, err := metadata.NewStore(filepath.Join(root, "metadata.db")) - if err != nil { - return base.WorkerOpt{}, err - } df := client.DiffService() // TODO: should use containerd daemon instance ID (containerd/containerd#1862)? id, err := base.ID(root) @@ -64,28 +57,17 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s xlabels[k] = v } - throttledGC := throttle.Throttle(time.Second, func() { - // TODO: how to avoid this? - ctx := context.TODO() - snapshotter := client.SnapshotService(snapshotterName) - ctx = namespaces.WithNamespace(ctx, ns) - key := identity.NewID() - if _, err := snapshotter.Prepare(ctx, key, "", snapshots.WithLabels(map[string]string{ - "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339Nano), - })); err != nil { - logrus.Errorf("GC error: %+v", err) - } - if err := snapshotter.Remove(ctx, key); err != nil { - logrus.Errorf("GC error: %+v", err) - } - }) + lm := leaseutil.WithNamespace(client.LeasesService(), ns) - gc := func(ctx context.Context) error { - throttledGC() - return nil + gc := func(ctx context.Context) (gc.Stats, error) { + l, err := lm.Create(ctx) + if err != nil { + return nil, nil + } + return nil, lm.Delete(ctx, leases.Lease{ID: l.ID}, leases.SynchronousDelete) } - cs := containerdsnapshot.NewContentStore(client.ContentStore(), ns, gc) + cs := containerdsnapshot.NewContentStore(client.ContentStore(), ns) resp, err := client.IntrospectionService().Plugins(context.TODO(), &introspection.PluginsRequest{Filters: []string{"type==io.containerd.runtime.v1"}}) if err != nil { @@ -111,18 +93,30 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s return base.WorkerOpt{}, err } + snap := containerdsnapshot.NewSnapshotter(snapshotterName, client.SnapshotService(snapshotterName), ns, nil) + + if err := cache.MigrateV2(context.TODO(), filepath.Join(root, "metadata.db"), filepath.Join(root, "metadata_v2.db"), cs, snap, lm); err != nil { + return base.WorkerOpt{}, err + } + + md, err := metadata.NewStore(filepath.Join(root, "metadata_v2.db")) + if err != nil { + return base.WorkerOpt{}, err + } + opt := base.WorkerOpt{ - ID: id, - Labels: xlabels, - MetadataStore: md, - Executor: containerdexecutor.New(client, root, "", np, dns), - Snapshotter: containerdsnapshot.NewSnapshotter(snapshotterName, client.SnapshotService(snapshotterName), cs, md, ns, gc, nil), - ContentStore: cs, - Applier: winlayers.NewFileSystemApplierWithWindows(cs, df), - Differ: winlayers.NewWalkingDiffWithWindows(cs, df), - ImageStore: client.ImageService(), - Platforms: platforms, - LeaseManager: leaseutil.WithNamespace(client.LeasesService(), ns), + ID: id, + Labels: xlabels, + MetadataStore: md, + Executor: containerdexecutor.New(client, root, "", np, dns), + Snapshotter: snap, + ContentStore: cs, + Applier: winlayers.NewFileSystemApplierWithWindows(cs, df), + Differ: winlayers.NewWalkingDiffWithWindows(cs, df), + ImageStore: client.ImageService(), + Platforms: platforms, + LeaseManager: lm, + GarbageCollect: gc, } return opt, nil } diff --git a/worker/runc/runc.go b/worker/runc/runc.go index b65c426c9a0c..b80e93ace253 100644 --- a/worker/runc/runc.go +++ b/worker/runc/runc.go @@ -4,7 +4,6 @@ import ( "context" "os" "path/filepath" - "time" "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/diff/apply" @@ -13,17 +12,16 @@ import ( "github.com/containerd/containerd/platforms" ctdsnapshot "github.com/containerd/containerd/snapshots" "github.com/docker/docker/pkg/idtools" + "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor/oci" "github.com/moby/buildkit/executor/runcexecutor" containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" "github.com/moby/buildkit/util/leaseutil" "github.com/moby/buildkit/util/network/netproviders" - "github.com/moby/buildkit/util/throttle" "github.com/moby/buildkit/util/winlayers" "github.com/moby/buildkit/worker/base" specs "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" ) @@ -41,10 +39,6 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc if err := os.MkdirAll(root, 0700); err != nil { return opt, err } - md, err := metadata.NewStore(filepath.Join(root, "metadata.db")) - if err != nil { - return opt, err - } np, err := netproviders.Providers(nopt) if err != nil { @@ -85,18 +79,7 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc return opt, err } - throttledGC := throttle.Throttle(time.Second, func() { - if _, err := mdb.GarbageCollect(context.TODO()); err != nil { - logrus.Errorf("GC error: %+v", err) - } - }) - - gc := func(ctx context.Context) error { - throttledGC() - return nil - } - - c = containerdsnapshot.NewContentStore(mdb.ContentStore(), "buildkit", gc) + c = containerdsnapshot.NewContentStore(mdb.ContentStore(), "buildkit") id, err := base.ID(root) if err != nil { @@ -106,19 +89,31 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc for k, v := range labels { xlabels[k] = v } + snap := containerdsnapshot.NewSnapshotter(snFactory.Name, mdb.Snapshotter(snFactory.Name), "buildkit", idmap) + lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit") + if err := cache.MigrateV2(context.TODO(), filepath.Join(root, "metadata.db"), filepath.Join(root, "metadata_v2.db"), c, snap, lm); err != nil { + return opt, err + } + + md, err := metadata.NewStore(filepath.Join(root, "metadata_v2.db")) + if err != nil { + return opt, err + } + opt = base.WorkerOpt{ ID: id, Labels: xlabels, MetadataStore: md, Executor: exe, - Snapshotter: containerdsnapshot.NewSnapshotter(snFactory.Name, mdb.Snapshotter(snFactory.Name), c, md, "buildkit", gc, idmap), + Snapshotter: snap, ContentStore: c, Applier: winlayers.NewFileSystemApplierWithWindows(c, apply.NewFileSystemApplier(c)), Differ: winlayers.NewWalkingDiffWithWindows(c, walking.NewWalkingDiff(c)), ImageStore: nil, // explicitly Platforms: []specs.Platform{platforms.Normalize(platforms.DefaultSpec())}, IdentityMapping: idmap, - LeaseManager: leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit"), + LeaseManager: lm, + GarbageCollect: mdb.GarbageCollect, } return opt, nil }