Skip to content

Commit

Permalink
Use correct procpid or pid in unlock, based on postgres version.
Browse files Browse the repository at this point in the history
Closes QueueClassic#261.

In postgres 9.2, `procpid` was changed to `pid`.

Earlier versions should use `procpid`, this change fixes the issue
where this query would return an error in versions prior to postgres
9.2.
  • Loading branch information
joepestro authored and senny committed Jan 15, 2016
1 parent 4824eb4 commit 19e99f3
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 1 deletion.
1 change: 1 addition & 0 deletions changelog
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Unreleased
- Fixed a bug in the offset calculation of `.enqueue_at`.
- Use the jsonb type for the args column from now on. If not available, fall back to json or text.
- `enqueue`, `enqueue_at`, `enqueue_in` return job hash with id.
- Fixed unlock query for versions below Postgres 9.2

Version 3.0.0rc
- Improved signal handling
Expand Down
3 changes: 2 additions & 1 deletion lib/queue_classic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ def self.measure(data)
# This will unlock all jobs any postgres' PID that is not existing anymore
# to prevent any infinitely locked jobs
def self.unlock_jobs_of_dead_workers
default_conn_adapter.execute("UPDATE #{QC.table_name} SET locked_at = NULL, locked_by = NULL WHERE locked_by NOT IN (SELECT pid FROM pg_stat_activity);")
pid_column = default_conn_adapter.server_version < 90200 ? "procpid" : "pid"
default_conn_adapter.execute("UPDATE #{QC.table_name} SET locked_at = NULL, locked_by = NULL WHERE locked_by NOT IN (SELECT #{pid_column} FROM pg_stat_activity);")
end

# private class methods
Expand Down
7 changes: 7 additions & 0 deletions lib/queue_classic/conn_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ def disconnect
end
end

def server_version
@server_version ||= begin
version = execute("SHOW server_version_num;")["server_version_num"]
version && version.to_i
end
end

private

def wait_for_notify(t)
Expand Down

0 comments on commit 19e99f3

Please sign in to comment.