Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track downloads statistics on timescaledb #4979

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -806,9 +806,10 @@ GEM
tailwindcss-ruby (3.4.14-x86_64-linux)
thor (1.3.2)
timeout (0.4.3)
timescaledb (0.3.0)
timescaledb (0.3.1)
activerecord
activesupport
ostruct
pg (~> 1.2)
toxiproxy (2.0.2)
tpm-key_attestation (0.12.1)
Expand Down Expand Up @@ -1302,7 +1303,7 @@ CHECKSUMS
tailwindcss-ruby (3.4.14-x86_64-linux) sha256=c23dec24cb855d748750e8f4bca1b048a1e617f381e81336c8ab65ede7e84d5c
thor (1.3.2) sha256=eef0293b9e24158ccad7ab383ae83534b7ad4ed99c09f96f1a6b036550abbeda
timeout (0.4.3) sha256=9509f079b2b55fe4236d79633bd75e34c1c1e7e3fb4b56cb5fda61f80a0fe30e
timescaledb (0.3.0) sha256=9ce2b39417d30544054cb609fbd84e18e304c7b7952a793846b8f4489551a28f
timescaledb (0.3.1) sha256=760917be9649475ce63c3126eee4ba83257a5c6dabaf1f9db45c98c8d6f64e0b
toxiproxy (2.0.2) sha256=2e3b53604fb921d40da3db8f78a52b3133fcae33e93d440725335b15974e440a
tpm-key_attestation (0.12.1) sha256=3c1315bed06ba3563aee98ff69c270d9b45b586a43ac2da250b23cad3c3caca3
turbo-rails (2.0.11) sha256=fc47674736372780abd2a4dc0d84bef242f5ca156a457cd7fa6308291e397fcf
Expand Down
4 changes: 4 additions & 0 deletions app/controllers/avo/log_downloads_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# This controller has been generated to enable Rails' resource routes.
# More information on https://docs.avohq.io/2.0/controllers.html
class Avo::LogDownloadsController < Avo::ResourcesController
end
99 changes: 99 additions & 0 deletions app/jobs/fastly_log_downloads_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
require "zlib"

# Process log files downloaded from Fastly and insert row by row into the database.
# It works in a similar way to FastlyLogProcessor, but it's optimized for a different
# use case: it processes log files downloaded from Fastly and inserts the raw data into
# the database in batches.
# The counters and other metrics are calculated in a separate job directly in
# the database through the continuous aggregates.
# Check Download::PerMinute, Download::PerHour and other classes as an example.
class FastlyLogDownloadsProcessor
class LogFileNotFoundError < ::StandardError; end

extend StatsD::Instrument

BATCH_SIZE = 5000

attr_accessor :bucket, :key
attr_reader :processed_count

def initialize(bucket, key)
@bucket = bucket
@key = key
@processed_count = 0

Check warning on line 23 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L21-L23

Added lines #L21 - L23 were not covered by tests
end

def perform
StatsD.increment("fastly_log_downloads_processor.started")
raise LogFileNotFoundError if body.nil?

Check warning on line 28 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L27-L28

Added lines #L27 - L28 were not covered by tests

count = 0
parse_success_downloads.each_slice(BATCH_SIZE) do |batch|
Download.insert_all batch
count += batch.size

Check warning on line 33 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L30-L33

Added lines #L30 - L33 were not covered by tests
end

if count > 0
element.update(status: "processed", processed_count: count)

Check warning on line 37 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L36-L37

Added lines #L36 - L37 were not covered by tests
else
element.update(status: "failed")

Check warning on line 39 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L39

Added line #L39 was not covered by tests
end

# This value may diverge from numbers from the fastly_log_processor as totals are
# not aggregated with the number of downloads but each row represents a download.
StatsD.gauge("fastly_log_downloads_processor.processed_count", count)
count

Check warning on line 45 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L44-L45

Added lines #L44 - L45 were not covered by tests
end

def body
@body ||= element&.body

Check warning on line 49 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L49

Added line #L49 was not covered by tests
end

def element
@element ||= LogDownload.pop(directory: @bucket, key: @key)

Check warning on line 53 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L53

Added line #L53 was not covered by tests
end

def parse_success_downloads
body.each_line.map do |log_line|
fragments = log_line.split
path, response_code = fragments[10, 2]
case response_code.to_i

Check warning on line 60 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L57-L60

Added lines #L57 - L60 were not covered by tests
# Only count successful downloads
# NB: we consider a 304 response a download attempt
when 200, 304
m = path.match(PATH_PATTERN)
gem_name = m[:gem_name] || path
gem_version = m[:gem_version]
created_at = Time.parse fragments[4..9].join(' ')
env = parse_env fragments[12..-1]
payload = {env:}

Check warning on line 69 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L64-L69

Added lines #L64 - L69 were not covered by tests

{created_at:, gem_name:, gem_version:, payload:}

Check warning on line 71 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L71

Added line #L71 was not covered by tests
end
end.compact
end


# Parse the env into a hash of key value pairs
# example env = "bundler/2.5.9 rubygems/3.3.25 ruby/3.1.0"
# output = {bundler: "2.5.9", rubygems: "3.3.25", ruby: "3.1.0"}
# case it says single word like jruby it appends true as the value
# example env = "jruby"
# output = {jruby: "true"}
# also removes some unwanted characters
def parse_env(output)
env = output.join(' ').gsub(/command.*|\(.*\)|Ruby, /,'').strip
env = nil if env == "(null)"
env = env.split(' ').map do |info|
pair = info.split(/\/|-/,2)
pair << "true" if pair.size == 1
pair

Check warning on line 90 in app/jobs/fastly_log_downloads_processor.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor.rb#L85-L90

Added lines #L85 - L90 were not covered by tests
end.to_h
end

statsd_count_success :perform, "fastly_log_downloads_processor.perform"
statsd_measure :perform, "fastly_log_downloads_processor.job_performance"

PATH_PATTERN = /\/gems\/(?<gem_name>.*)-(?<gem_version>\d+.*)\.gem/
private_constant :PATH_PATTERN
end
21 changes: 21 additions & 0 deletions app/jobs/fastly_log_downloads_processor_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Same as the FastlyLogProcessorJob but for saving it to TimescaleDB
# and the Download table as flat downloads.
class FastlyLogDownloadsProcessorJob < ApplicationJob
queue_as :default
queue_with_priority PRIORITIES.fetch(:stats)

include GoodJob::ActiveJobExtensions::Concurrency
good_job_control_concurrency_with(
# Maximum number of jobs with the concurrency key to be
# concurrently performed (excludes enqueued jobs)
#
# Limited to avoid overloading the gem_download table with
# too many concurrent conflicting updates
perform_limit: good_job_concurrency_perform_limit(default: 5),
key: name
)

def perform(bucket:, key:)
FastlyLogDownloadsProcessor.new(bucket, key).perform

Check warning on line 19 in app/jobs/fastly_log_downloads_processor_job.rb

View check run for this annotation

Codecov / codecov/patch

app/jobs/fastly_log_downloads_processor_job.rb#L19

Added line #L19 was not covered by tests
end
end
20 changes: 20 additions & 0 deletions app/models/download.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class Download < DownloadsDB
extend Timescaledb::ActsAsHypertable
include Timescaledb::ContinuousAggregatesHelper

Check warning on line 3 in app/models/download.rb

View check run for this annotation

Codecov / codecov/patch

app/models/download.rb#L2-L3

Added lines #L2 - L3 were not covered by tests

acts_as_hypertable time_column: 'created_at', segment_by: [:gem_name, :gem_version]

Check warning on line 5 in app/models/download.rb

View check run for this annotation

Codecov / codecov/patch

app/models/download.rb#L5

Added line #L5 was not covered by tests

scope :total_downloads, -> { select("count(*) as downloads").order(:created_at) }
scope :downloads_by_gem, -> { select("gem_name, count(*) as downloads").group(:gem_name).order(:created_at) }
scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as downloads").group(:gem_name, :gem_version).order(:created_at) }

Check warning on line 9 in app/models/download.rb

View check run for this annotation

Codecov / codecov/patch

app/models/download.rb#L7-L9

Added lines #L7 - L9 were not covered by tests

continuous_aggregates(

Check warning on line 11 in app/models/download.rb

View check run for this annotation

Codecov / codecov/patch

app/models/download.rb#L11

Added line #L11 was not covered by tests
timeframes: [:minute, :hour, :day, :month],
scopes: [:total_downloads, :downloads_by_gem, :downloads_by_version],
refresh_policy: {
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 day" },
month: { start_offset: "3 month", end_offset: "1 day", schedule_interval: "1 day" }
})
end
5 changes: 5 additions & 0 deletions app/models/downloads_db.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class DownloadsDB < ApplicationRecord
self.abstract_class = true

connects_to database: { writing: :downloads }
end
33 changes: 33 additions & 0 deletions app/models/log_download.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Mimic LogTicket model to store the log files but for the downloads database.
# It will be backfilled with the log files from the main database to the downloads database.
# There will be a background job to process the log files
class LogDownload < DownloadsDB
enum backend: { s3: 0, local: 1 }
enum status: %i[pending processing failed processed].index_with(&:to_s)

Check warning on line 6 in app/models/log_download.rb

View check run for this annotation

Codecov / codecov/patch

app/models/log_download.rb#L4-L6

Added lines #L4 - L6 were not covered by tests

scope :latest_created_at, -> { order(created_at: :desc).select(:created_at).pluck(:created_at).first }

Check warning on line 8 in app/models/log_download.rb

View check run for this annotation

Codecov / codecov/patch

app/models/log_download.rb#L8

Added line #L8 was not covered by tests

def self.pop(key: nil, directory: nil)
scope = pending.limit(1).order("created_at ASC")
scope = scope.where(key: key) if key
scope = scope.where(directory: directory) if directory
scope.lock(true).sole.tap do |log_download|
log_download.update_column(:status, "processing")
end
rescue ActiveRecord::RecordNotFound
nil # no ticket in queue found by `sole` call
end

Check warning on line 19 in app/models/log_download.rb

View check run for this annotation

Codecov / codecov/patch

app/models/log_download.rb#L10-L19

Added lines #L10 - L19 were not covered by tests

def fs
@fs ||=
if s3?
RubygemFs::S3.new(bucket: directory)
else
RubygemFs::Local.new(directory)
end
end

Check warning on line 28 in app/models/log_download.rb

View check run for this annotation

Codecov / codecov/patch

app/models/log_download.rb#L21-L28

Added lines #L21 - L28 were not covered by tests

def body
fs.get(key)
end
end

Check warning on line 33 in app/models/log_download.rb

View check run for this annotation

Codecov / codecov/patch

app/models/log_download.rb#L30-L33

Added lines #L30 - L33 were not covered by tests
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module Maintenance

Check warning on line 3 in app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb

View check run for this annotation

Codecov / codecov/patch

app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb#L3

Added line #L3 was not covered by tests
# This task is used to backfill log downloads from log tickets.
# It is used to migrate from past to present using created_at date to order
# limit 500 per iteration and use latest created_at date to get the next 500
# later union with pending tickets.
class BackfillLogDownloadsFromLogTicketsTask < MaintenanceTasks::Task
def collection

Check warning on line 9 in app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb

View check run for this annotation

Codecov / codecov/patch

app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb#L8-L9

Added lines #L8 - L9 were not covered by tests
# migrate from past to present using created_at date to order
# limit 500 per iteration and use latest created_at date to get the next 500
# later union with pending tickets
scope = LogTicket.processed.order(created_at: :asc)
last_created_at = LogDownload.latest_created_at
scope = scope.where("created_at < ?", last_created_at) if last_created_at
scope
.limit(500)
.union(LogTicket.pending.order(created_at: :asc).limit(500))
end

Check warning on line 19 in app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb

View check run for this annotation

Codecov / codecov/patch

app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb#L13-L19

Added lines #L13 - L19 were not covered by tests

def process(batch)
LogDownload.insert_all(batch.select(:id, :status, :directory, :key, :created_at).to_a.map(&:attributes))
end
end
end

Check warning on line 25 in app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb

View check run for this annotation

Codecov / codecov/patch

app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb#L21-L25

Added lines #L21 - L25 were not covered by tests
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Maintenance

Check warning on line 3 in app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb

View check run for this annotation

Codecov / codecov/patch

app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb#L3

Added line #L3 was not covered by tests
# Helper to keep backfilling LogTickets to TimescaleDB downloads table.
# It will be used to migrate the data from the old LogTicket table to the new LogDownload table.
# It will be executed in the background and it will be a one time task.
# Later, after all pending LogTickets are migrated, this job will be removed.
class BackfillLogTicketsToTimescaleDownloadsTask < MaintenanceTasks::Task
def collection
LogDownload.where(status: "pending")
end

Check warning on line 11 in app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb

View check run for this annotation

Codecov / codecov/patch

app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb#L8-L11

Added lines #L8 - L11 were not covered by tests

def process(element)
FastlyLogDownloadsProcessor.new(element.directory, element.key).perform
end
end
end

Check warning on line 17 in app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb

View check run for this annotation

Codecov / codecov/patch

app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb#L13-L17

Added lines #L13 - L17 were not covered by tests
3 changes: 2 additions & 1 deletion config/initializers/zeitwerk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@

Rails.autoloaders.once.inflector.inflect(
"http" => "HTTP",
"oidc" => "OIDC"
"oidc" => "OIDC",
"downloads_db" => "DownloadsDB"
)
29 changes: 29 additions & 0 deletions db/downloads_migrate/20240708184547_create_downloads.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
class CreateDownloads < ActiveRecord::Migration[7.1]

disable_ddl_transaction!

def self.up
self.down if Download.table_exists?

hypertable_options = {
time_column: 'created_at',
chunk_time_interval: '1 day',
compress_segmentby: 'gem_name, gem_version',
compress_orderby: 'created_at DESC',
compress_after: '7 days'
}

create_table(:downloads, id: false, hypertable: hypertable_options) do |t|
t.timestamptz :created_at, null: false
t.text :gem_name, :gem_version, null: false
t.jsonb :payload
end

Download.create_continuous_aggregates
end
def self.down
Download.drop_continuous_aggregates

drop_table(:downloads, force: :cascade, if_exists: true) if Download.table_exists?
end
end
16 changes: 16 additions & 0 deletions db/downloads_migrate/20240823181725_create_log_downloads.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Mimic LogTicket table to store the log files but for the downloads database
# It will be used to store the log files to be processed during the migration
class CreateLogDownloads < ActiveRecord::Migration[7.1]
def change
create_table :log_downloads do |t|
t.string :key
t.string :directory
t.integer :backend
t.string :status, default: "pending"
t.integer :processed_count, default: 0
t.timestamps
end

add_index :log_downloads, [:key, :directory], unique: true
end
end
Loading
Loading