Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix use of Threads.nthreads and Threads.threadid #164

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 45 additions & 28 deletions src/util/obbt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ function _solve_bound_problem!(wm::AbstractWaterModel, bound_problem::BoundProbl
# Optimize the variable (or affine expression) being tightened.
JuMP.@objective(wm.model, bound_problem.sense, v)
JuMP.optimize!(wm.model)
termination_status = JuMP.termination_status(wm.model)
termination_status = JuMP.termination_status(wm.model)

if ismultinetwork(wm)
JuMP.set_binary.(vars_relaxed)
Expand Down Expand Up @@ -229,7 +229,7 @@ function _log_bound_widths(data::Dict{String,<:Any})
wm_data = get_wm_data(data)

message = ""
message *= _log_node_bound_width(wm_data, "node")
message *= _log_node_bound_width(wm_data, "node")
message *= _log_flow_bound_width(wm_data, "pipe")
message *= _log_flow_bound_width(wm_data, "short_pipe")
message *= _log_flow_bound_width(wm_data, "valve")
Expand Down Expand Up @@ -302,22 +302,29 @@ function solve_obbt_owf!(
_check_obbt_options(upper_bound, upper_bound_constraint)

# Instantiate the bound tightening model and relax integrality, if requested.
wms = [instantiate_model(data, model_type, build_type) for i in 1:Threads.nthreads()]

if upper_bound_constraint
map(x -> _constraint_obj_bound(x, upper_bound), wms)
end

if solve_relaxed
# Relax the binary variables if requested.
map(x -> relax_all_binary_variables!(x), wms)
# There is a channel with one element for each worker thread.
# `num_models` doesn't have to be `nthreads` here, it could be smaller, but
# then there will be some lock contention as each thread fights for a model
# to use.
num_models = Threads.nthreads()
wms_channel = Channel{Any}(num_models)
for i in 1:num_models
model = instantiate_model(data, model_type, build_type)
if upper_bound_constraint
_constraint_obj_bound(model, upper_bound)
end
if solve_relaxed # Relax the binary variables if requested.
relax_all_binary_variables!(model)
end
# Set the optimizer for the bound tightening model.
JuMP.set_optimizer(model.model, optimizer)
put!(wms_channel, model)
end

# Set the optimizer for the bound tightening model.
map(x -> JuMP.set_optimizer(x.model, optimizer), wms)

# Collect all problems.
bound_problems = _get_bound_problems(wms[1]; limit = limit_problems)
wm = take!(wms_channel)
bound_problems = _get_bound_problems(wm; limit = limit_problems)
put!(wms_channel, wm)
_update_data_bounds!(data, bound_problems) # Populate data with bounds.

# Log widths.
Expand All @@ -334,8 +341,13 @@ function solve_obbt_owf!(
vals = zeros(length(bound_problems))

time_elapsed += @elapsed Threads.@threads for i in 1:length(bound_problems)
vals[i] = _solve_bound_problem!(wms[Threads.threadid()], bound_problems[i])
# To ensure each thread in this loop operates on a different model,
# we have then take a model from the channel, solve it, and then
# return it to the channel for future use.
wm = take!(wms_channel)
vals[i] = _solve_bound_problem!(wm, bound_problems[i])
_set_new_bound!(bound_problems[i], vals[i])
put!(wms_channel, wm)
end

time_elapsed > time_limit && ((terminate = true) && break)
Expand All @@ -351,21 +363,26 @@ function solve_obbt_owf!(
# Update algorithm metadata.
current_iteration += 1

# Set up the next optimization problem using the new bounds.
wms = [instantiate_model(data, model_type, build_type) for i in 1:Threads.nthreads()]

if upper_bound_constraint
map(x -> _constraint_obj_bound(x, upper_bound), wms)
# Set up the next optimization problem using the new bounds. We first
# need to empty the channel of the current models, and then repopulate
# with a set of new models.
for i in 1:num_models
take!(wms_channel)
end

if solve_relaxed
# Relax the binary variables if requested.
map(x -> relax_all_binary_variables!(x), wms)
for i in 1:num_models
model = instantiate_model(data, model_type, build_type)
if upper_bound_constraint
_constraint_obj_bound(model, upper_bound)
end
if solve_relaxed
# Relax the binary variables if requested.
relax_all_binary_variables!(model)
end
# Set the optimizer for the bound tightening model.
JuMP.set_optimizer(model.model, optimizer)
put!(wms_channel, model)
end

# Set the optimizer for the bound tightening model.
map(x -> JuMP.set_optimizer(x.model, optimizer), wms)

# Set the termination variable if max iterations is exceeded.
current_iteration >= max_iter && (terminate = true)
end
Expand Down
Loading