Skip to content

Commit

Permalink
Remove queue setting from config and move exclusively to command line…
Browse files Browse the repository at this point in the history
… arg
  • Loading branch information
knadh committed Sep 25, 2018
1 parent c3fe7fb commit 4d5878a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,13 @@ Often times, different queries have different priorities of execution. Some may
```shell
# Run the primary worker + HTTP control interface
sql-jobber --config /path/to/config.toml --sql-directory /path/to/sql/dir \
--queue-name "high_priority" \
--queue "high_priority" \
--worker-name "high_priority_worker" \
--worker-concurrency 30

# Run another worker on a different queue to handle low priority jobs
sql-jobber --config /path/to/config.toml --sql-directory /path/to/sql/dir \
--queue-name "low_priority" \
--queue "low_priority" \
--worker-name "low_priority_worker" \
--worker-concurrency 5 \
--worker-only
Expand Down
3 changes: 0 additions & 3 deletions config.toml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ broker_address = "redis://127.0.0.1:6379/1"
# where SQL Jobber stores results of jobs, bypassing machinery.
state_address = "redis://127.0.0.1:6379/1"

# The queue from which jobs should be proessed.
queue = "sqljob_queue"

[results]
# These are the result backends where the results of various SQL query jobs are saved.
# There can be more than one backends defined here, for eg: [results.my1], [db.my2] ...
Expand Down
23 changes: 14 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,15 @@ func init() {
viper.SetDefault("config", "config.toml")
viper.SetDefault("server", ":6060")
viper.SetDefault("sql-directory", "./sql")
viper.SetDefault("queue", "sqljob_queue")
viper.SetDefault("worker-name", "sqljob")
viper.SetDefault("worker-name", "sqljobber")
viper.SetDefault("worker-concurrency", 10)
viper.SetDefault("worker-only", false)

flagSet.String("config", "config.toml", "Path to the TOML configuration file")
flagSet.String("server", "127.0.0.1:6060", "Web server address")
flagSet.String("sql-directory", "./sql", "Path to the directory with .sql scripts")
flagSet.String("queue", "sqljob_queue", "Name of the job queue to accept jobs from")
flagSet.String("worker-name", "sqljob", "Name of this worker instance")
flagSet.String("queue", "default_queue", "Name of the job queue to accept jobs from")
flagSet.String("worker-name", "sqljobber", "Name of this worker instance")
flagSet.Int("worker-concurrency", 10, "Number of concurrent worker threads to run")
flagSet.Bool("worker-only", false, "Don't start the HTTP server and run in worker-only mode?")
flagSet.Bool("version", false, "Current version of the build")
Expand All @@ -117,7 +116,15 @@ func main() {
sysLog.Printf("commit: %v\nBuild: %v", buildVersion, buildDate)
return
}
sysLog.Printf("starting server %s", viper.GetString("worker-name"))

mode := "default"
if viper.GetBool("worker-only") {
mode = "worker only"
}
sysLog.Printf("starting server %s (queue = %s) in %s mode",
viper.GetString("worker-name"),
viper.GetString("queue"),
mode)

// Source and result backend DBs.
var (
Expand Down Expand Up @@ -170,7 +177,7 @@ func main() {
// Parse and load SQL queries.
sysLog.Printf("loading SQL queries from %s", viper.GetString("sql-directory"))
if jobber.Tasks, err = loadSQLTasks(viper.GetString("sql-directory"),
jobber.DBs, jobber.ResultBackends, viper.GetString("machinery.queue")); err != nil {
jobber.DBs, jobber.ResultBackends, viper.GetString("queue")); err != nil {
sysLog.Fatal(err)
}
sysLog.Printf("loaded %d SQL queries", len(jobber.Tasks))
Expand All @@ -196,7 +203,7 @@ func main() {
// Setup the job server.
jobber.Machinery, err = connectJobServer(jobber, &config.Config{
Broker: viper.GetString("machinery.broker_address"),
DefaultQueue: viper.GetString("machinery.queue"),
DefaultQueue: viper.GetString("queue"),
ResultBackend: viper.GetString("machinery.state_address"),
ResultsExpireIn: viper.GetInt("result_backend.results_ttl"),
}, jobber.Tasks)
Expand All @@ -211,8 +218,6 @@ func main() {
sysLog.Println(http.ListenAndServe(viper.GetString("server"), r))
os.Exit(0)
}()
} else {
sysLog.Printf("worker-only mode (no HTTP server)")
}

jobber.Worker = jobber.Machinery.NewWorker(viper.GetString("worker-name"),
Expand Down

0 comments on commit 4d5878a

Please sign in to comment.