Skip to content

Commit

Permalink
Implement the scale_factor for host-based sliding windows
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Kipper committed Jun 26, 2019
1 parent efd72d0 commit d16c6ab
Show file tree
Hide file tree
Showing 18 changed files with 184 additions and 61 deletions.
2 changes: 2 additions & 0 deletions ext/semian/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ static inline void
semian_resource_free(void *ptr)
{
semian_resource_t *res = (semian_resource_t *) ptr;
dprintf("Freeing resource sem_id:%d", res->sem_id);

if (res->name) {
free(res->name);
res->name = NULL;
Expand Down
78 changes: 68 additions & 10 deletions ext/semian/sliding_window.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ static int
check_max_size_arg(VALUE max_size)
{
int retval = -1;
switch (TYPE(max_size)) {
switch (rb_type(max_size)) {
case T_NIL:
case T_UNDEF:
retval = SLIDING_WINDOW_MAX_SIZE; break;
case T_FLOAT:
rb_warn("semian sliding window max_size is a float, converting to fixnum");
retval = (int)(RFLOAT_VALUE(max_size)); break;
default:
case T_FIXNUM:
case T_BIGNUM:
retval = RB_NUM2INT(max_size); break;
default:
rb_raise(rb_eArgError, "unknown type for max_size: %d", TYPE(max_size));
}

if (retval <= 0) {
Expand All @@ -60,6 +64,33 @@ check_max_size_arg(VALUE max_size)
return retval;
}

static float
check_scale_factor_arg(VALUE scale_factor)
{
float retval = 1.0;
switch(rb_type(scale_factor)) {
case T_NIL:
case T_UNDEF:
retval = 1.0; break;
case T_FLOAT:
retval = rb_float_value(scale_factor); break;
case T_FIXNUM:
case T_BIGNUM:
rb_warn("semian sliding window scale_factor is an int, converting to float");
retval = (float)RB_NUM2INT(scale_factor); break;
default:
rb_raise(rb_eArgError, "unknown type for scale_factor: %d", TYPE(scale_factor));
}

if (retval <= 0.0) {
rb_raise(rb_eArgError, "scale_factor must be greater than zero");
} else if (retval > 1.0) {
rb_raise(rb_eArgError, "scale_factor cannot be greater than 1.0");
}

return retval;
}

static VALUE
grow_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
{
Expand Down Expand Up @@ -106,12 +137,12 @@ shrink_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
window->end = 0;
} else if (window->end > window->start) {
// Easy case - the window doesn't wrap around
window->end = window->start + new_length;
window->start = window->start + new_length;
} else {
// Hard case - the window wraps, so re-index the data
// Adapted from http://www.cplusplus.com/reference/algorithm/rotate/
int first = 0;
int middle = window->start;
int middle = (window->end - new_max_size + window->max_size) % window->max_size;
int last = window->max_size;
int next = middle;
while (first != next) {
Expand Down Expand Up @@ -169,10 +200,11 @@ Init_SlidingWindow()
VALUE cSlidingWindow = rb_const_get(cSimple, rb_intern("SlidingWindow"));

rb_define_alloc_func(cSlidingWindow, semian_simple_sliding_window_alloc);
rb_define_method(cSlidingWindow, "initialize_sliding_window", semian_simple_sliding_window_initialize, 2);
rb_define_method(cSlidingWindow, "initialize_sliding_window", semian_simple_sliding_window_initialize, 3);
rb_define_method(cSlidingWindow, "size", semian_simple_sliding_window_size, 0);
rb_define_method(cSlidingWindow, "resize_to", semian_simple_sliding_window_resize_to, 1);
rb_define_method(cSlidingWindow, "max_size", semian_simple_sliding_window_max_size, 0);
rb_define_method(cSlidingWindow, "max_size", semian_simple_sliding_window_max_size_get, 0);
rb_define_method(cSlidingWindow, "max_size=", semian_simple_sliding_window_max_size_set, 1);
rb_define_method(cSlidingWindow, "values", semian_simple_sliding_window_values, 0);
rb_define_method(cSlidingWindow, "last", semian_simple_sliding_window_last, 0);
rb_define_method(cSlidingWindow, "<<", semian_simple_sliding_window_push, 1);
Expand Down Expand Up @@ -212,7 +244,7 @@ static int max(int a, int b) {
}

VALUE
semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size)
semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size, VALUE scale_factor)
{
semian_simple_sliding_window_t *res = get_object(self);

Expand All @@ -228,13 +260,15 @@ semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size)
res->sem_id = initialize_single_semaphore(res->key, SEM_DEFAULT_PERMISSIONS);
res->shmem = get_or_create_shared_memory(res->key, init_fn);
res->error_threshold = check_max_size_arg(max_size);
res->scale_factor = check_scale_factor_arg(scale_factor);

sem_meta_lock(res->sem_id);
{
int workers = get_number_of_registered_workers(res);
float scale_factor = (workers > 1) ? 0.2 : 1.0; // TODO: Parameterize
int error_threshold = max(res->error_threshold, (int) ceil(workers * scale_factor * res->error_threshold));
float scale = (workers > 1) ? res->scale_factor : 1.0; // TODO: Parameterize
int error_threshold = max(res->error_threshold, (int) ceil(workers * scale * res->error_threshold));

dprintf(" workers:%d scale:%0.2f error_threshold:%d", workers, scale, error_threshold);
resize_window(res->shmem, error_threshold);
}
sem_meta_unlock(res->sem_id);
Expand Down Expand Up @@ -264,6 +298,10 @@ semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size)
VALUE retval = Qnil;

int new_max_size = RB_NUM2INT(new_size);
if (new_max_size < 1) {
rb_raise(rb_eArgError, "cannot resize to %d", new_max_size);
}

sem_meta_lock(res->sem_id);
{
retval = resize_window(res->shmem, new_max_size);
Expand All @@ -274,7 +312,7 @@ semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size)
}

VALUE
semian_simple_sliding_window_max_size(VALUE self)
semian_simple_sliding_window_max_size_get(VALUE self)
{
semian_simple_sliding_window_t *res = get_object(self);
VALUE retval;
Expand All @@ -288,6 +326,26 @@ semian_simple_sliding_window_max_size(VALUE self)
return retval;
}

VALUE
semian_simple_sliding_window_max_size_set(VALUE self, VALUE new_size)
{
semian_simple_sliding_window_t *res = get_object(self);
VALUE retval;

int new_max_size = RB_NUM2INT(new_size);
if (new_max_size < 1) {
rb_raise(rb_eArgError, "max_size must be positive");
}

sem_meta_lock(res->sem_id);
{
retval = resize_window(res->shmem, new_max_size);
}
sem_meta_unlock(res->sem_id);

return retval;
}

VALUE
semian_simple_sliding_window_values(VALUE self)
{
Expand Down
5 changes: 3 additions & 2 deletions ext/semian/sliding_window.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
void Init_SlidingWindow();

VALUE semian_simple_sliding_window_alloc(VALUE klass);
VALUE semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size);
VALUE semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size, VALUE scale_factor);
VALUE semian_simple_sliding_window_size(VALUE self);
VALUE semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size);
VALUE semian_simple_sliding_window_max_size(VALUE self);
VALUE semian_simple_sliding_window_max_size_get(VALUE self);
VALUE semian_simple_sliding_window_max_size_set(VALUE self, VALUE new_size);
VALUE semian_simple_sliding_window_push(VALUE self, VALUE value);
VALUE semian_simple_sliding_window_values(VALUE self);
VALUE semian_simple_sliding_window_last(VALUE self);
Expand Down
7 changes: 6 additions & 1 deletion ext/semian/sysv_semaphores.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ raise_semian_syscall_error(const char *syscall, int error_num)
void
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, double quota)
{

res->key = generate_key(id_str);
dprintf("Initializing semaphore set for key:%lu", res->key);

res->strkey = (char*) malloc((2 /*for 0x*/+ sizeof(uint64_t) /*actual key*/+ 1 /*null*/) * sizeof(char));
sprintf(res->strkey, "0x%08x", (unsigned int) res->key);

Expand Down Expand Up @@ -60,6 +61,7 @@ initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permis
Ensure that a worker for this process is registered.
Note that from ruby we ensure that at most one worker may be registered per process.
*/
dprintf("Registering worker for sem_id:%d", res->sem_id);
if (perform_semop(res->sem_id, SI_SEM_REGISTERED_WORKERS, 1, SEM_UNDO, NULL) == -1) {
rb_raise(eInternal, "error incrementing registered workers, errno: %d (%s)", errno, strerror(errno));
}
Expand Down Expand Up @@ -120,10 +122,12 @@ perform_semop(int sem_id, short index, short op, short flags, struct timespec *t
int
get_sem_val(int sem_id, int sem_index)
{
dprintf("get_sem_val(sem_id: %d, sem_index: %d)", sem_id, sem_index);
int ret = semctl(sem_id, sem_index, GETVAL);
if (ret == -1) {
rb_raise(eInternal, "error getting value of %s for sem %d, errno: %d (%s)", SEMINDEX_STRING[sem_index], sem_id, errno, strerror(errno));
}
dprintf("get_sem_val(sem_id: %d, sem_index: %d) == %d", sem_id, sem_index, ret);
return ret;
}

Expand Down Expand Up @@ -245,6 +249,7 @@ diff_timespec_ms(struct timespec *end, struct timespec *begin)
int
initialize_single_semaphore(uint64_t key, long permissions)
{
dprintf("Initializing single semaphore for key:%lu", key);
int sem_id = semget(key, 1, IPC_CREAT | IPC_EXCL | permissions);

/*
Expand Down
1 change: 1 addition & 0 deletions ext/semian/sysv_semaphores.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ initialize_single_semaphore(uint64_t key, long permissions);
static inline void
dprint_sem_vals(int sem_id)
{
dprintf("dprintf(%d)", sem_id);
dprintf("sem_id: %d, lock: %d, tickets: %d configured: %d, registered workers %d",
sem_id,
get_sem_val(sem_id, SI_SEM_LOCK),
Expand Down
1 change: 1 addition & 0 deletions ext/semian/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ typedef struct {
int sem_id;
uint64_t parent_key;
int error_threshold;
float scale_factor;
semian_simple_sliding_window_shared_t* shmem;
} semian_simple_sliding_window_t;

Expand Down
1 change: 1 addition & 0 deletions lib/semian.rb
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def create_circuit_breaker(name, **options)
exceptions: Array(exceptions) + [::Semian::BaseError],
half_open_resource_timeout: options[:half_open_resource_timeout],
implementation: implementation(**options),
scale_factor: options[:scale_factor],
)
end

Expand Down
9 changes: 5 additions & 4 deletions lib/semian/circuit_breaker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ class CircuitBreaker #:nodoc:
attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error

def initialize(name, exceptions:, success_threshold:, error_threshold:,
error_timeout:, implementation:, half_open_resource_timeout: nil)
@name = name.to_sym
initialize_circuit_breaker(name, error_threshold)
error_timeout:, implementation:, half_open_resource_timeout: nil, scale_factor: nil)
initialize_circuit_breaker(name, error_threshold) if respond_to?(:initialize_circuit_breaker)

@name = name.to_sym
@success_count_threshold = success_threshold
@error_count_threshold = error_threshold
@scale_factor = scale_factor
@error_timeout = error_timeout
@exceptions = exceptions
@half_open_resource_timeout = half_open_resource_timeout

@errors = implementation::SlidingWindow.new(name, max_size: @error_count_threshold)
@errors = implementation::SlidingWindow.new(name, max_size: @error_count_threshold, scale_factor: @scale_factor)
@successes = implementation::Integer.new("#{name}_successes")
state_val = implementation::Integer.new("#{name}_state")
@state = implementation::State.new(state_val)
Expand Down
12 changes: 10 additions & 2 deletions lib/semian/simple_sliding_window.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ class SlidingWindow #:nodoc:
# like this: if @max_size = 4, current time is 10, @window =[5,7,9,10].
# Another push of (11) at 11 sec would make @window [7,9,10,11], shifting off 5.

def initialize(name, max_size:)
initialize_sliding_window(name, max_size)
def initialize(name, max_size:, scale_factor: nil)
initialize_sliding_window(name, max_size, scale_factor) if respond_to?(:initialize_sliding_window)

@name = name.to_sym
@max_size = max_size
@window = []
end
Expand Down Expand Up @@ -44,6 +46,12 @@ def clear
end
alias_method :destroy, :clear

def max_size=(value)
raise ArgumentError, "max_size must be positive" if value <= 0
@max_size = value
resize_to(value)
end

private

def resize_to(size)
Expand Down
2 changes: 1 addition & 1 deletion test/adapter_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def test_adapter_registers_consumer
end

def test_unregister
skip if ENV["SKIP_FLAKY_TESTS"]
skip if flaky
client = Semian::AdapterTestClient.new(quota: 0.5)
assert_nil(Semian.resources[:testing])
resource = Semian.register(:testing, tickets: 2, error_threshold: 1, error_timeout: 0, success_threshold: 0)
Expand Down
5 changes: 4 additions & 1 deletion test/circuit_breaker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ def test_acquire_yield_when_the_circuit_is_closed
def test_acquire_raises_circuit_open_error_when_the_circuit_is_open
open_circuit!
assert_raises Semian::OpenCircuitError do
puts "test: Acquiring resource #{@resource.circuit_breaker}"
@resource.acquire { 1 + 1 }
end
assert_match(/State transition from closed to open/, @strio.string)
Expand Down Expand Up @@ -150,6 +149,10 @@ def test_semian_wide_env_var_disables_circuit_breaker
end

class RawResource
def initialize
@timeout = 2
end

def timeout
@timeout || 2
end
Expand Down
4 changes: 2 additions & 2 deletions test/grpc_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_unavailable_server_opens_the_circuit
end

def test_timeout_opens_the_circuit
skip if ENV["SKIP_FLAKY_TESTS"]
skip if flaky
stub = build_insecure_stub(EchoStub, host: "#{SemianConfig['toxiproxy_upstream_host']}:#{SemianConfig['grpc_toxiproxy_port']}", opts: {timeout: 0.1})
run_services_on_server(@server, services: [EchoService]) do
Toxiproxy['semian_test_grpc'].downstream(:latency, latency: 1000).apply do
Expand All @@ -86,7 +86,7 @@ def test_timeout_opens_the_circuit
def test_instrumentation
notified = false
subscriber = Semian.subscribe do |event, resource, scope, adapter|
next if event != :success
next unless event == :success

notified = true
assert_equal Semian[@host], resource
Expand Down
1 change: 0 additions & 1 deletion test/helpers/circuit_breaker_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ def half_open_cicuit!(resource = @resource, backwards_time_travel = 10)

def trigger_error!(resource = @resource, error = SomeError)
resource.acquire do
puts "Triggering error"
raise error
end
rescue error
Expand Down
4 changes: 3 additions & 1 deletion test/lru_hash_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ def test_clean_instrumentation

notified = false
subscriber = Semian.subscribe do |event, resource, scope, adapter, payload|
next unless event == :lru_hash_gc

notified = true
assert_equal :lru_hash_gc, event
assert_equal @lru_hash, resource
assert_nil scope
assert_nil adapter
Expand Down Expand Up @@ -234,6 +235,7 @@ def create_circuit_breaker(name, exceptions = true, bulkhead = false, error_time
exceptions: [::Semian::BaseError],
half_open_resource_timeout: nil,
implementation: implementation,
scale_factor: 1.0,
)
circuit_breaker.mark_failed(nil) if exceptions
Semian::ProtectedResource.new(name, create_bulkhead(name, bulkhead), circuit_breaker)
Expand Down
2 changes: 1 addition & 1 deletion test/net_http_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def test_5xxs_trip_circuit_when_fatal_server_flag_enabled
end

def test_5xxs_dont_raise_exceptions_unless_fatal_server_flag_enabled
skip if ENV["SKIP_FLAKY_TESTS"]
skip if flaky
with_semian_configuration do
with_server do
http = Net::HTTP.new(SemianConfig['http_host'], SemianConfig['http_port_service_a'])
Expand Down
Loading

0 comments on commit d16c6ab

Please sign in to comment.