diff --git a/docker/docker_image_dest.go b/docker/docker_image_dest.go index 44b45c472c..e6b30857b6 100644 --- a/docker/docker_image_dest.go +++ b/docker/docker_image_dest.go @@ -332,21 +332,33 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context, // Then try reusing blobs from other locations. candidates := options.Cache.CandidateLocations2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, options.CanSubstitute) for _, candidate := range candidates { - candidateRepo, err := parseBICLocationReference(candidate.Location) - if err != nil { - logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err) - continue - } - if candidate.CompressorName != blobinfocache.Uncompressed { - logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name()) + var candidateRepo reference.Named + if !candidate.UnknownLocation { + candidateRepo, err = parseBICLocationReference(candidate.Location) + if err != nil { + logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err) + continue + } + if candidate.CompressorName != blobinfocache.Uncompressed { + logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name()) + } else { + logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name()) + } + // Sanity checks: + if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) { + logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref)) + continue + } } else { - logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name()) - } - - // Sanity checks: - if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) { - logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref)) - continue + if candidate.CompressorName != blobinfocache.Uncompressed { + logrus.Debugf("Trying to reuse cached location %s compressed with %s", candidate.Digest.String(), candidate.CompressorName) + } else { + logrus.Debugf("Trying to reuse cached location %s with no compression", candidate.Digest.String()) + } + // This digest is a known variant of this blob but we don’t + // have a recorded location in this registry, let’s try looking + // for it in the current repo. + candidateRepo = reference.TrimNamed(d.ref.ref) } if candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest { logrus.Debug("... Already tried the primary destination") diff --git a/internal/blobinfocache/types.go b/internal/blobinfocache/types.go index 3c2be57f32..d28bbfdd19 100644 --- a/internal/blobinfocache/types.go +++ b/internal/blobinfocache/types.go @@ -39,7 +39,8 @@ type BlobInfoCache2 interface { // BICReplacementCandidate2 is an item returned by BlobInfoCache2.CandidateLocations2. type BICReplacementCandidate2 struct { - Digest digest.Digest - CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression - Location types.BICLocationReference + Digest digest.Digest + CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression + UnknownLocation bool // is true when `Location` for this blob is not set + Location types.BICLocationReference // not set if UnknownLocation is set to `true` } diff --git a/pkg/blobinfocache/boltdb/boltdb.go b/pkg/blobinfocache/boltdb/boltdb.go index a472efd95b..4f47e8199f 100644 --- a/pkg/blobinfocache/boltdb/boltdb.go +++ b/pkg/blobinfocache/boltdb/boltdb.go @@ -282,12 +282,15 @@ func (bdc *cache) RecordKnownLocation(transport types.ImageTransport, scope type }) // FIXME? Log error (but throttle the log volume on repeated accesses)? } -// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket with corresponding compression info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them to candidates. -func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime { +// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket +// (or with an unknown location if scope is nil and v2Output is set) with corresponding compression +// info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them +// to candidates. +func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime { digestKey := []byte(digest.String()) - b := scopeBucket.Bucket(digestKey) - if b == nil { - return candidates + var b *bolt.Bucket + if scopeBucket != nil { + b = scopeBucket.Bucket(digestKey) } compressorName := blobinfocache.UnknownCompression if compressionBucket != nil { @@ -297,24 +300,38 @@ func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW compressorName = string(compressorNameValue) } } - if compressorName == blobinfocache.UnknownCompression && requireCompressionInfo { + if compressorName == blobinfocache.UnknownCompression && v2Output { return candidates } - _ = b.ForEach(func(k, v []byte) error { - t := time.Time{} - if err := t.UnmarshalBinary(v); err != nil { - return err + if b != nil { + _ = b.ForEach(func(k, v []byte) error { + t := time.Time{} + if err := t.UnmarshalBinary(v); err != nil { + return err + } + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + Location: types.BICLocationReference{Opaque: string(k)}, + }, + LastSeen: t, + }) + return nil + }) // FIXME? Log error (but throttle the log volume on repeated accesses)? + } else { + if v2Output { + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + UnknownLocation: true, + Location: types.BICLocationReference{Opaque: ""}, + }, + LastSeen: time.Time{}, + }) } - candidates = append(candidates, prioritize.CandidateWithTime{ - Candidate: blobinfocache.BICReplacementCandidate2{ - Digest: digest, - CompressorName: compressorName, - Location: types.BICLocationReference{Opaque: string(k)}, - }, - LastSeen: t, - }) - return nil - }) // FIXME? Log error (but throttle the log volume on repeated accesses)? + } return candidates } @@ -328,27 +345,22 @@ func (bdc *cache) CandidateLocations2(transport types.ImageTransport, scope type return bdc.candidateLocations(transport, scope, primaryDigest, canSubstitute, true) } -func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 { +func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, v2Output bool) []blobinfocache.BICReplacementCandidate2 { res := []prioritize.CandidateWithTime{} var uncompressedDigestValue digest.Digest // = "" if err := bdc.view(func(tx *bolt.Tx) error { scopeBucket := tx.Bucket(knownLocationsBucket) - if scopeBucket == nil { - return nil + if scopeBucket != nil { + scopeBucket = scopeBucket.Bucket([]byte(transport.Name())) } - scopeBucket = scopeBucket.Bucket([]byte(transport.Name())) - if scopeBucket == nil { - return nil - } - scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque)) - if scopeBucket == nil { - return nil + if scopeBucket != nil { + scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque)) } // compressionBucket won't have been created if previous writers never recorded info about compression, // and we don't want to fail just because of that compressionBucket := tx.Bucket(digestCompressorBucket) - res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, requireCompressionInfo) + res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, v2Output) if canSubstitute { if uncompressedDigestValue = bdc.uncompressedDigest(tx, primaryDigest); uncompressedDigestValue != "" { b := tx.Bucket(digestByUncompressedBucket) @@ -361,7 +373,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types return err } if d != primaryDigest && d != uncompressedDigestValue { - res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, requireCompressionInfo) + res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, v2Output) } return nil }); err != nil { @@ -370,7 +382,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types } } if uncompressedDigestValue != primaryDigest { - res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, requireCompressionInfo) + res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, v2Output) } } } diff --git a/pkg/blobinfocache/internal/prioritize/prioritize.go b/pkg/blobinfocache/internal/prioritize/prioritize.go index 6f5506d94c..cab60701b4 100644 --- a/pkg/blobinfocache/internal/prioritize/prioritize.go +++ b/pkg/blobinfocache/internal/prioritize/prioritize.go @@ -37,6 +37,16 @@ func (css *candidateSortState) Less(i, j int) bool { xi := css.cs[i] xj := css.cs[j] + // If i has location unknown and j has known location + // always treat i as lesser than j or vice-versa so the + // candidate with location unknown alaways gets lower priority. + if xi.Candidate.UnknownLocation && !xj.Candidate.UnknownLocation { + return true + } + if !xi.Candidate.UnknownLocation && xj.Candidate.UnknownLocation { + return false + } + // primaryDigest entries come first, more recent first. // uncompressedDigest entries, if uncompressedDigest is set and != primaryDigest, come last, more recent entry first. // Other digest values are primarily sorted by time (more recent first), secondarily by digest (to provide a deterministic order) diff --git a/pkg/blobinfocache/memory/memory.go b/pkg/blobinfocache/memory/memory.go index 426640366f..8ff7471649 100644 --- a/pkg/blobinfocache/memory/memory.go +++ b/pkg/blobinfocache/memory/memory.go @@ -120,25 +120,43 @@ func (mem *cache) RecordDigestCompressorName(blobDigest digest.Digest, compresso mem.compressors[blobDigest] = compressorName } -// appendReplacementCandidates creates prioritize.CandidateWithTime values for (transport, scope, digest), and returns the result of appending them to candidates. -func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime { - locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: scope, blobDigest: digest}] // nil if not present - for l, t := range locations { - compressorName, compressorKnown := mem.compressors[digest] - if !compressorKnown { - if requireCompressionInfo { - continue - } - compressorName = blobinfocache.UnknownCompression +// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket +// (or with an unknown location if scope is nil and v2Output is set) with corresponding compression +// info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them +// to candidates. +func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope *types.BICTransportScope, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime { + compressorName := blobinfocache.UnknownCompression + compressorNameValue, compressorKnown := mem.compressors[digest] + if compressorKnown { + compressorName = compressorNameValue + } + if compressorName == blobinfocache.UnknownCompression && v2Output { + return candidates + } + if scope != nil { + locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: *scope, blobDigest: digest}] // nil if not present + for l, t := range locations { + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + Location: l, + }, + LastSeen: t, + }) + } + } else { + if v2Output { + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + UnknownLocation: true, + Location: types.BICLocationReference{Opaque: ""}, + }, + LastSeen: time.Time{}, + }) } - candidates = append(candidates, prioritize.CandidateWithTime{ - Candidate: blobinfocache.BICReplacementCandidate2{ - Digest: digest, - CompressorName: compressorName, - Location: l, - }, - LastSeen: t, - }) } return candidates } @@ -163,24 +181,32 @@ func (mem *cache) CandidateLocations2(transport types.ImageTransport, scope type return mem.candidateLocations(transport, scope, primaryDigest, canSubstitute, true) } -func (mem *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 { +func (mem *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, v2Output bool) []blobinfocache.BICReplacementCandidate2 { mem.mutex.Lock() defer mem.mutex.Unlock() res := []prioritize.CandidateWithTime{} - res = mem.appendReplacementCandidates(res, transport, scope, primaryDigest, requireCompressionInfo) + resAllBlobs := []prioritize.CandidateWithTime{} var uncompressedDigest digest.Digest // = "" - if canSubstitute { + getCandidatesForUncompressedDigest := func(res []prioritize.CandidateWithTime, scope *types.BICTransportScope, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime { if uncompressedDigest = mem.uncompressedDigestLocked(primaryDigest); uncompressedDigest != "" { otherDigests := mem.digestsByUncompressed[uncompressedDigest] // nil if not present in the map for d := range otherDigests { if d != primaryDigest && d != uncompressedDigest { - res = mem.appendReplacementCandidates(res, transport, scope, d, requireCompressionInfo) + res = mem.appendReplacementCandidates(res, transport, scope, d, v2Output) } } if uncompressedDigest != primaryDigest { - res = mem.appendReplacementCandidates(res, transport, scope, uncompressedDigest, requireCompressionInfo) + res = mem.appendReplacementCandidates(res, transport, scope, uncompressedDigest, v2Output) } } + return res + } + res = mem.appendReplacementCandidates(res, transport, &scope, primaryDigest, v2Output) + if canSubstitute { + res = getCandidatesForUncompressedDigest(res, &scope, primaryDigest, v2Output) } + // Add alls potential blobs with location unknown if v2Output is configured true. + resAllBlobs = getCandidatesForUncompressedDigest(resAllBlobs, nil, primaryDigest, v2Output) + res = append(res, resAllBlobs...) return prioritize.DestructivelyPrioritizeReplacementCandidates(res, primaryDigest, uncompressedDigest) }