Skip to content

Commit

Permalink
chore(refactor): improve requester internals (#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
stainless-app[bot] authored and stainless-ci-bot committed Feb 26, 2025
1 parent 37ee661 commit cf4388f
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 122 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ Style/MutableConstant:
Style/NumberedParameters:
Enabled: false

Style/NumberedParametersLimit:
Max: 2

# Reasonable to use brackets for errors with long messages.
Style/RaiseArgs:
Enabled: false
Expand Down
18 changes: 11 additions & 7 deletions lib/modern_treasury/base_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ def should_retry?(status, headers:)
#
# @option request [Object] :body
#
# @option request [Boolean] :streaming
#
# @option request [Integer] :max_retries
#
# @option request [Float] :timeout
Expand Down Expand Up @@ -268,7 +266,6 @@ def initialize(
url: ModernTreasury::Util.join_parsed_uri(@base_url, {**req, path: path, query: query}),
headers: headers,
body: encoded,
streaming: false,
max_retries: opts.fetch(:max_retries, @max_retries),
timeout: timeout
}
Expand Down Expand Up @@ -311,8 +308,6 @@ def initialize(
#
# @option request [Object] :body
#
# @option request [Boolean] :streaming
#
# @option request [Integer] :max_retries
#
# @option request [Float] :timeout
Expand Down Expand Up @@ -341,14 +336,22 @@ def initialize(
status = e
end

# normally we want to drain the response body and reuse the HTTP session by clearing the socket buffers
# unless we hit a server error
srv_fault = (500...).include?(status)

case status
in ..299
[response, stream]
in 300..399 if redirect_count >= self.class::MAX_REDIRECTS
message = "Failed to complete the request within #{self.class::MAX_REDIRECTS} redirects."

stream.each { next }
raise ModernTreasury::APIConnectionError.new(url: url, message: message)
in 300..399
request = self.class.follow_redirect(request, status: status, response_headers: response)

stream.each { next }
send_request(
request,
redirect_count: redirect_count + 1,
Expand All @@ -363,6 +366,7 @@ def initialize(
))
decoded = ModernTreasury::Util.decode_content(response, stream: stream, suppress_error: true)

stream.each { srv_fault ? break : next }
raise ModernTreasury::APIStatusError.for(
url: url,
status: status,
Expand All @@ -372,6 +376,8 @@ def initialize(
)
in (400..) | ModernTreasury::APIConnectionError
delay = retry_delay(response, retry_count: retry_count)

stream&.each { srv_fault ? break : next }
sleep(delay)

send_request(
Expand All @@ -381,8 +387,6 @@ def initialize(
send_retry_header: send_retry_header
)
end
ensure
stream&.each { break } unless status.is_a?(Integer) && status < 300
end

# @private
Expand Down
2 changes: 0 additions & 2 deletions lib/modern_treasury/base_model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -503,9 +503,7 @@ def self.coerce(value)
end
end

# rubocop:disable Style/NumberedParametersLimit
_, variant = matches.sort! { _2.first <=> _1.first }.find { |score,| !score.zero? }
# rubocop:enable Style/NumberedParametersLimit
variant.nil? ? value : ModernTreasury::Converter.coerce(variant, value)
end

Expand Down
137 changes: 72 additions & 65 deletions lib/modern_treasury/pooled_net_requester.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,50 +33,66 @@ def connect(url)
# @param conn [Net::HTTP]
# @param deadline [Float]
#
private def calibrate_socket_timeout(conn, deadline)
def calibrate_socket_timeout(conn, deadline)
timeout = deadline - ModernTreasury::Util.monotonic_secs
(conn.open_timeout = timeout) unless conn.started?
conn.read_timeout = conn.write_timeout = conn.continue_timeout = timeout
end

# @private
#
# @param conn [Net::HTTP]
# @param req [Net::HTTPGenericRequest]
# @param deadline [Float]
# @param blk [Proc]
# @param request [Hash{Symbol=>Object}] .
#
# @option request [Symbol] :method
#
# @option request [URI::Generic] :url
#
# @option request [Hash{String=>String}] :headers
#
def transport(conn, req, deadline, &blk)
unless conn.started?
conn.open_timeout = deadline - ModernTreasury::Util.monotonic_secs
conn.start
# @return [Net::HTTPGenericRequest]
#
def build_request(request)
method, url, headers, body = request.fetch_values(:method, :url, :headers, :body)
req = Net::HTTPGenericRequest.new(
method.to_s.upcase,
!body.nil?,
method != :head,
url.to_s
)

headers.each { req[_1] = _2 }

case body
in nil

Check warning on line 66 in lib/modern_treasury/pooled_net_requester.rb

View workflow job for this annotation

GitHub Actions / lint

Lint/EmptyInPattern: Avoid `in` branches without a body.
in String
req.body = body
in StringIO
req.body = body.string
in IO
body.rewind
req.body_stream = body
end

calibrate_socket_timeout(conn, deadline)
conn.request(req) do |rsp|
blk.call(rsp)
rsp.read_body do |bytes|
blk.call(bytes)
calibrate_socket_timeout(conn, deadline)
end
end
req
end
end

# @private
#
# @param url [URI::Generic]
# @param streaming [Boolean]
# @param blk [Proc]
#
private def with_pool(url, streaming:, &blk)
private def with_pool(url, &blk)
origin = ModernTreasury::Util.uri_origin(url)
th = Thread.current
key = :"#{object_id}-#{self.class.name}-connection_in_use_for_#{origin}"

if th[key] || streaming
return Enumerator.new do |y|
if th[key]
tap do
conn = self.class.connect(url)
blk.call(conn, y)
return blk.call(conn)
ensure
conn.finish if conn&.started?
end
end

Expand All @@ -87,20 +103,11 @@ def transport(conn, req, deadline, &blk)
end
end

Enumerator.new do |y|
pool.with do |conn|
th[key] = true

blk.call(conn, y)
# rubocop:disable Lint/RescueException
rescue Exception => e
# rubocop:enable Lint/RescueException
# should close connection on all errors to ensure no invalid state persists
conn.finish if conn.started?
raise e
ensure
th[key] = nil
end
pool.with do |conn|
th[key] = true
blk.call(conn)
ensure
th[key] = nil
end
end

Expand All @@ -116,43 +123,43 @@ def transport(conn, req, deadline, &blk)
#
# @option request [Object] :body
#
# @option request [Boolean] :streaming
#
# @option request [Integer] :max_retries
#
# @option request [Float] :deadline
#
# @return [Array(Net::HTTPResponse, Enumerable)]
#
def execute(request)
method, url, headers, body, deadline = request.fetch_values(:method, :url, :headers, :body, :deadline)
streaming = request.fetch(:streaming)

req = Net::HTTPGenericRequest.new(
method.to_s.upcase,
!body.nil?,
method != :head,
url.to_s
)

headers.each { req[_1] = _2 }
case body
in nil
in String
req.body = body
in IO | StringIO
body.rewind
req.body_stream = body
url, deadline = request.fetch_values(:url, :deadline)
req = self.class.build_request(request)

eof = false
enum = Enumerator.new do |y|
with_pool(url) do |conn|
conn.start unless conn.started?
self.class.calibrate_socket_timeout(conn, deadline)

conn.request(req) do |rsp|
y << [conn, rsp]
rsp.read_body do |bytes|
y << bytes
self.class.calibrate_socket_timeout(conn, deadline)
end
eof = true
end
end
end

enum =
with_pool(url, streaming: streaming) do |conn, y|
self.class.transport(conn, req, deadline, &y)
end
# need to protect the `Enumerator` against `#.rewind`
fused = false
conn, response = enum.next
body = Enumerator.new do |y|
next if fused

enum = streaming ? enum.lazy : enum.to_a
response = enum.take(1).first
[response, (response.body = enum.drop(1))]
fused = true
loop { y << enum.next }
ensure
conn.finish if !eof && conn.started?
end
[response, (response.body = body)]
end

def initialize
Expand Down
6 changes: 3 additions & 3 deletions lib/modern_treasury/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -454,18 +454,18 @@ def self.encode_content(headers, body)
def self.decode_content(headers, stream:, suppress_error: false)
case headers["content-type"]
in %r{^application/json}
json = stream.to_a.join("")
json = stream.to_a.join
begin
JSON.parse(json, symbolize_names: true)
rescue JSON::ParserError => e
raise e unless suppress_error
json
end
in %r{^text/}
stream.to_a.join("")
stream.to_a.join
else
# TODO: parsing other response types
StringIO.new(stream.to_a.join(""))
StringIO.new(stream.to_a.join)
end
end

Expand Down
1 change: 0 additions & 1 deletion rbi/lib/modern_treasury/base_client.rbi
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ module ModernTreasury
url: URI::Generic,
headers: T::Hash[String, String],
body: T.anything,
streaming: T::Boolean,
max_retries: Integer,
timeout: Float
}
Expand Down
35 changes: 7 additions & 28 deletions rbi/lib/modern_treasury/pooled_net_requester.rbi
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,25 @@
module ModernTreasury
class PooledNetRequester
RequestShape = T.type_alias do
{
method: Symbol,
url: URI::Generic,
headers: T::Hash[String, String],
body: T.anything,
streaming: T::Boolean,
max_retries: Integer,
deadline: Float
}
{method: Symbol, url: URI::Generic, headers: T::Hash[String, String], body: T.anything, deadline: Float}
end

sig { params(url: URI::Generic).returns(Net::HTTP) }
def self.connect(url)
end

sig { params(conn: Net::HTTP, deadline: Float).void }
private_class_method def self.calibrate_socket_timeout(conn, deadline)
def self.calibrate_socket_timeout(conn, deadline)
end

sig do
params(
conn: Net::HTTP,
req: Net::HTTPGenericRequest,
deadline: Float,
blk: T.proc.params(arg0: T.any(Net::HTTPGenericRequest, String)).void
)
.void
end
def self.transport(conn, req, deadline, &blk)
sig { params(request: ModernTreasury::PooledNetRequester::RequestShape).returns(Net::HTTPGenericRequest) }
def self.build_request(request)
end

sig do
params(
url: URI::Generic,
streaming: T::Boolean,
blk: T.proc.params(arg0: Net::HTTP, arg1: Enumerator::Yielder).void
)
.void
end
private def with_pool(url, streaming:, &blk)
params(url: URI::Generic, blk: T.proc.params(arg0: Net::HTTP, arg1: Enumerator::Yielder).void).void
end
private def with_pool(url, &blk)
end

sig do
Expand Down
1 change: 0 additions & 1 deletion sig/modern_treasury/base_client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ module ModernTreasury
url: URI::Generic,
headers: ::Hash[String, String],
body: top,
streaming: bool,
max_retries: Integer,
timeout: Float
}
Expand Down
Loading

0 comments on commit cf4388f

Please sign in to comment.