Skip to content

Commit

Permalink
sketch throttling progress
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Jan 9, 2025
1 parent 416c0f4 commit 7975f2d
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions R/class_active.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ active_class <- R6::R6Class(
exports = NULL,
process = NULL,
seconds_start = NULL,
seconds_meta_appended = NULL,
seconds_appended_meta = -Inf,
seconds_appended_progress = -Inf,
seconds_meta_uploaded = NULL,
skipping = TRUE,
initialize = function(
pipeline = NULL,
meta = NULL,
Expand Down Expand Up @@ -78,22 +80,27 @@ active_class <- R6::R6Class(
self$meta$database$flush_rows()
self$scheduler$progress$database$flush_rows()
},
upload_meta = function() {
self$meta$database$upload_staged()
self$scheduler$progress$database$upload_staged()
},
sync_meta_time = function() {
self$flush_meta_time()
self$upload_meta_time()
},
flush_meta_time = function() {
self$seconds_meta_appended <- self$seconds_meta_appended %|||% -Inf
now <- time_seconds_local()
if ((now - self$seconds_meta_appended) >= self$seconds_meta_append) {
self$flush_meta()
self$seconds_meta_appended <- time_seconds_local()
if ((now - seconds_appended_meta) >= seconds_meta_append) {
self$meta$database$flush_rows()
self$seconds_appended_meta <- now
}
if (skipping) {
threshold <- max(1, seconds_meta_append)
} else {
threshold <- seconds_meta_append
}
if ((now - seconds_appended_progress) >= threshold) {
self$scheduler$progress$database$flush_rows()
self$seconds_appended_progress <- now
self$skipping <- TRUE
}
},
upload_meta = function() {
self$meta$database$upload_staged()
self$scheduler$progress$database$upload_staged()
},
upload_meta_time = function() {
self$seconds_meta_uploaded <- self$seconds_meta_uploaded %|||% -Inf
now <- time_seconds_local()
Expand All @@ -102,6 +109,10 @@ active_class <- R6::R6Class(
self$seconds_meta_uploaded <- time_seconds_local()
}
},
sync_meta_time = function() {
flush_meta_time()
upload_meta_time()
},
flush_upload_meta_file = function(target) {
if (target_allow_meta(target)) {
self$flush_meta()
Expand Down Expand Up @@ -188,6 +199,7 @@ active_class <- R6::R6Class(
self$scheduler$trim(target, self$pipeline)
counter_del_name(self$scheduler$progress$queued, name)
} else if (target_should_run(target, self$meta)) {
self$skipping <- FALSE
self$flush_upload_meta_file(target)
runtime_increment_targets_run(tar_runtime)
target_gc(target)
Expand Down

0 comments on commit 7975f2d

Please sign in to comment.