Skip to content

Commit

Permalink
Filter messages in sub and tap command (#100)
Browse files Browse the repository at this point in the history
add --filter to sub,tap commands
message filtering support for cmd and tapcommands: Both commands can now optionally filter messages using a
filter-expression, which can be applied to the message meta-data (aka properties) and the message body.
  • Loading branch information
jandelgado authored Aug 28, 2024
1 parent 5f06ad0 commit 8219fa6
Show file tree
Hide file tree
Showing 21 changed files with 933 additions and 446 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog for rabtap

## v1.41 (2024-08-27)

* new: `--filter=FILTER` option for `tap` and `sub` commands to filter output
of received messages, e.g. `rabtap sub JDQ --filter="r.msg.RoutingKey == 'test'"`
* breaking change: the `binding`, `queue`, `exchange`, `connection` and
`channel` variables available in expressions of `rabtap info --filter=FILTER`
are now all prefixed with `r.` and are thus now to be referenced as `r.binding`,
`r.queue` etc.

## v1.40 (2024-08-20)

* govaluate not being maintained since 2017, we switch to
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ lint: phony
golangci-lint run

short-test: phony
go test -v -race github.com/jandelgado/rabtap/cmd/rabtap
go test -v -race github.com/jandelgado/rabtap/pkg
go test -v $(TESTOPTS) -race github.com/jandelgado/rabtap/cmd/rabtap
go test -v $(TESTOPTS) -race github.com/jandelgado/rabtap/pkg

test-app: phony
go test -race -v -tags "integration" -cover -coverprofile=coverage_app.out github.com/jandelgado/rabtap/cmd/rabtap
go test -race -v -tags "integration" $(TESTOPTS) -cover -coverprofile=coverage_app.out github.com/jandelgado/rabtap/cmd/rabtap

test-lib: phony
go test -race -v -tags "integration" -cover -coverprofile=coverage.out github.com/jandelgado/rabtap/pkg
go test -race -v -tags "integration" $(TESTOPTS) -cover -coverprofile=coverage.out github.com/jandelgado/rabtap/pkg

test: test-app test-lib
grep -v "^mode:" coverage_app.out >> coverage.out
Expand Down
655 changes: 464 additions & 191 deletions README.md

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions cmd/rabtap/cmd_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestCmdInfoByExchangeInTextFormatProducesExpectedTree(t *testing.T) {
Mode: "byExchange",
ShowConsumers: true,
ShowDefaultExchange: false,
Filter: TruePredicate,
Filter: constantPred{true},
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{
Format: "text",
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestCmdInfoByConnectionInTextFormatProducesExpectedTree(t *testing.T) {
Mode: "byConnection",
ShowConsumers: true,
ShowDefaultExchange: false,
Filter: TruePredicate,
Filter: constantPred{true},
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{
Format: "text",
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestCmdInfoByExchangeInDotFormat(t *testing.T) {
Mode: "byExchange",
ShowConsumers: false,
ShowDefaultExchange: false,
Filter: TruePredicate,
Filter: constantPred{true},
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{Format: "dot"},
out: &actual})
Expand All @@ -258,7 +258,7 @@ func TestCmdInfoByConnectionInDotFormat(t *testing.T) {
Mode: "byConnection",
ShowConsumers: false,
ShowDefaultExchange: false,
Filter: TruePredicate,
Filter: constantPred{true},
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{Format: "dot"},
out: &actual})
Expand Down
23 changes: 13 additions & 10 deletions cmd/rabtap/cmd_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ import (

// CmdSubscribeArg contains arguments for the subscribe command
type CmdSubscribeArg struct {
amqpURL *url.URL
queue string
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
messageReceiveLoopPred MessageReceiveLoopPred
reject bool
requeue bool
args rabtap.KeyValueMap
timeout time.Duration
amqpURL *url.URL
queue string
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
termPred Predicate
filterPred Predicate
reject bool
requeue bool
args rabtap.KeyValueMap
timeout time.Duration
}

// cmdSub subscribes to messages from the given queue
Expand All @@ -43,12 +44,14 @@ func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error {
errorChannel := make(rabtap.SubscribeErrorChannel)
g.Go(func() error { return subscriber.EstablishSubscription(ctx, cmd.queue, messageChannel, errorChannel) })
g.Go(func() error {

acknowledger := createAcknowledgeFunc(cmd.reject, cmd.requeue)
err := messageReceiveLoop(ctx,
messageChannel,
errorChannel,
cmd.messageReceiveFunc,
cmd.messageReceiveLoopPred,
cmd.filterPred,
cmd.termPred,
acknowledger,
cmd.timeout)
cancel()
Expand Down
32 changes: 14 additions & 18 deletions cmd/rabtap/cmd_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"io"
"net/url"
"os"
"syscall"
"testing"
"time"

Expand All @@ -33,12 +32,12 @@ func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) {
go func() {
// we expect cmdSubscribe to return
cmdSubscribe(ctx, CmdSubscribeArg{
amqpURL: amqpURL,
queue: "queue",
tlsConfig: &tls.Config{},
messageReceiveFunc: func(rabtap.TapMessage) error { return nil },
messageReceiveLoopPred: func(rabtap.TapMessage) bool { return true },
timeout: time.Second * 10,
amqpURL: amqpURL,
queue: "queue",
tlsConfig: &tls.Config{},
messageReceiveFunc: func(rabtap.TapMessage) error { return nil },
termPred: &constantPred{false},
timeout: time.Second * 10,
})
done <- true
}()
Expand Down Expand Up @@ -79,12 +78,13 @@ func TestCmdSub(t *testing.T) {

// subscribe to testQueue
go cmdSubscribe(ctx, CmdSubscribeArg{
amqpURL: amqpURL,
queue: testQueue,
tlsConfig: tlsConfig,
messageReceiveFunc: receiveFunc,
messageReceiveLoopPred: func(rabtap.TapMessage) bool { return true },
timeout: time.Second * 10,
amqpURL: amqpURL,
queue: testQueue,
tlsConfig: tlsConfig,
messageReceiveFunc: receiveFunc,
filterPred: constantPred{true},
termPred: constantPred{false},
timeout: time.Second * 10,
})

time.Sleep(time.Second * 1)
Expand Down Expand Up @@ -149,16 +149,12 @@ func TestCmdSubIntegration(t *testing.T) {
})
require.Nil(t, err)

go func() {
time.Sleep(time.Second * 2)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"rabtap", "sub",
"--uri", amqpURL.String(),
testQueue,
"--limit=1",
"--format=raw",
"--no-color"}
output := testcommon.CaptureOutput(main)
Expand Down
6 changes: 4 additions & 2 deletions cmd/rabtap/cmd_tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type CmdTapArg struct {
tapConfig []rabtap.TapConfiguration
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
pred MessageReceiveLoopPred
termPred Predicate
filterPred Predicate
timeout time.Duration
}

Expand Down Expand Up @@ -47,7 +48,8 @@ func cmdTap(
tapMessageChannel,
errorChannel,
cmd.messageReceiveFunc,
cmd.pred,
cmd.filterPred,
cmd.termPred,
acknowledger,
cmd.timeout)
cancel()
Expand Down
12 changes: 5 additions & 7 deletions cmd/rabtap/cmd_tap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"crypto/tls"
"os"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -45,13 +44,12 @@ func TestCmdTap(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

pred := func(rabtap.TapMessage) bool { return true }

// when
go cmdTap(ctx, CmdTapArg{tapConfig: tapConfig,
tlsConfig: &tls.Config{},
messageReceiveFunc: receiveFunc,
pred: pred,
filterPred: constantPred{true},
termPred: constantPred{false},
timeout: time.Second * 10})

time.Sleep(time.Second * 1)
Expand Down Expand Up @@ -82,8 +80,9 @@ func TestCmdTapIntegration(t *testing.T) {
testKey := testQueue
testExchange := "amq.topic"

// message must be published, after rabtap tap command is started
go func() {
time.Sleep(time.Second * 1)
time.Sleep(3 * time.Second)
_, ch := testcommon.IntegrationTestConnection(t, "", "", 0, false)
err := ch.Publish(
testExchange,
Expand All @@ -97,15 +96,14 @@ func TestCmdTapIntegration(t *testing.T) {
Headers: amqp.Table{},
})
require.Nil(t, err)
time.Sleep(time.Second * 1)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"rabtap", "tap",
"--uri", testcommon.IntegrationURIFromEnv().String(),
"amq.topic:" + testKey,
"--limit=1",
"--format=raw",
"--no-color"}
output := testcommon.CaptureOutput(main)
Expand Down
24 changes: 14 additions & 10 deletions cmd/rabtap/command_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ Usage:
rabtap info [--api=APIURI] [--consumers] [--stats] [--filter=EXPR] [--omit-empty]
[--show-default] [--mode=MODE] [--format=FORMAT] [-kncv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR]
[--format=FORMAT] [--limit=NUM] [--idle-timeout=DURATION] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR]
[--format=FORMAT] [--limit=NUM] [--idle-timeout=DURATION] [-jkncsv]
rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [-jkcsvn]
[--idle-timeout=DURATION]
[--filter=EXPR] [--idle-timeout=DURATION]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT]
[--routingkey=KEY | (--header=KV)...]
Expand Down Expand Up @@ -99,7 +99,7 @@ Arguments and options:
-d, --durable create durable exchange/queue.
--exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will
be taken from message being published (see JSON message format).
--filter=EXPR Predicate for info command to filter queues [default: true]
--filter=EXPR Predicate for sub, tap, info command to filter the output [default: true]
--format=FORMAT * for tap, pub, sub command: format to write/read messages to console
and optionally to file (when --saveto DIR is given).
Valid options are: "raw", "json", "json-nopp". Default: raw
Expand Down Expand Up @@ -158,12 +158,14 @@ Examples:
rabtap queue bind JDQ to amq.topic --bindingkey=key
echo "Hello" | rabtap pub --exchange amq.topic --routingkey "key"
rabtap sub JDQ
# print only messages that have ".Name == 'JAN'" in their JSON payload
rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'"
rabtap queue rm JDQ
# use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api
export RABTAP_APIURI=http://guest:guest@localhost:15672/api
rabtap info
rabtap info --filter "binding.Source == 'amq.topic'" --omit-empty
rabtap info --filter "r.binding.Source == 'amq.topic'" --omit-empty
rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672"
# use RABTAP_TLS_CERTFILE | RABTAP_TLS_KEYFILE | RABTAP_TLS_CAFILE environments variables
Expand Down Expand Up @@ -280,9 +282,9 @@ type CommandLineArgs struct {
ShowConsumers bool // info: also show consumer
InfoMode string // info: byExchange, byConnection
ShowStats bool // info: also show statistics
QueueFilter string // info: optional filter predicate
OmitEmptyExchanges bool // info: do not show exchanges wo/ bindings
ShowDefaultExchange bool // info: show default exchange
Filter string // sub/tap/info: optional filter predicate
Format string // output format, depends on command
Durable bool // queue create, exchange create
Autodelete bool // queue create, exchange create
Expand Down Expand Up @@ -361,7 +363,7 @@ func parseInfoCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
result := CommandLineArgs{
Cmd: InfoCmd,
commonArgs: parseCommonArgs(args),
QueueFilter: args["--filter"].(string),
Filter: args["--filter"].(string),
OmitEmptyExchanges: args["--omit-empty"].(bool),
ShowConsumers: args["--consumers"].(bool),
ShowStats: args["--stats"].(bool),
Expand Down Expand Up @@ -431,6 +433,7 @@ func parseSubCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
Reject: args["--reject"].(bool),
Requeue: args["--requeue"].(bool),
QueueName: args["QUEUE"].(string),
Filter: args["--filter"].(string),
Silent: args["--silent"].(bool),
IdleTimeout: time.Duration(math.MaxInt64),
}
Expand Down Expand Up @@ -640,6 +643,7 @@ func parseTapCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
result := CommandLineArgs{
Cmd: TapCmd,
commonArgs: parseCommonArgs(args),
Filter: args["--filter"].(string),
Silent: args["--silent"].(bool),
TapConfig: []rabtap.TapConfiguration{},
IdleTimeout: time.Duration(math.MaxInt64)}
Expand Down
10 changes: 6 additions & 4 deletions cmd/rabtap/command_line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func TestCliAllOptsInTapCommandiAreRecognized(t *testing.T) {
args, err := ParseCommandLineArgs(
[]string{"tap", "--uri=uri", "exchange:binding", "--silent", "--verbose",
"--format=json-nopp", "--insecure", "--saveto", "savedir", "--limit", "123",
"--idle-timeout=10s"})
"--idle-timeout=10s", "--filter=filter"})

assert.Nil(t, err)
assert.Equal(t, 1, len(args.TapConfig))
Expand All @@ -253,6 +253,7 @@ func TestCliAllOptsInTapCommandiAreRecognized(t *testing.T) {
assert.Equal(t, time.Duration(time.Second*10), args.IdleTimeout)
assert.Equal(t, "savedir", *args.SaveDir)
assert.Equal(t, "json-nopp", args.Format)
assert.Equal(t, "filter", args.Filter)
assert.True(t, args.Verbose)
assert.True(t, args.Silent)
assert.True(t, args.InsecureTLS)
Expand All @@ -266,7 +267,7 @@ func TestCliInfoCmdIsParsed(t *testing.T) {
assert.Equal(t, 0, len(args.TapConfig))
assert.Equal(t, InfoCmd, args.Cmd)
assertEqualURL(t, "APIURI", args.APIURL)
assert.Equal(t, "true", args.QueueFilter)
assert.Equal(t, "true", args.Filter)
assert.False(t, args.Verbose)
assert.False(t, args.ShowStats)
assert.False(t, args.ShowConsumers)
Expand Down Expand Up @@ -346,7 +347,7 @@ func TestCliInfoCmdAllOptionsAreSet(t *testing.T) {
assertEqualURL(t, "APIURI", args.APIURL)
assert.Nil(t, args.SaveDir)
assert.False(t, args.Verbose)
assert.Equal(t, "EXPR", args.QueueFilter)
assert.Equal(t, "EXPR", args.Filter)
assert.True(t, args.ShowStats)
assert.True(t, args.ShowConsumers)
assert.True(t, args.ShowDefaultExchange)
Expand Down Expand Up @@ -483,7 +484,7 @@ func TestCliSubCmdInvalidNumReturnsError(t *testing.T) {
func TestCliSubCmdAllOptsSet(t *testing.T) {
args, err := ParseCommandLineArgs(
[]string{"sub", "queuename", "--uri", "uri", "--saveto", "dir",
"--limit=99", "--offset=123", "--idle-timeout=10s"})
"--limit=99", "--offset=123", "--idle-timeout=10s", "--filter=filter"})

assert.Nil(t, err)
assert.Equal(t, SubCmd, args.Cmd)
Expand All @@ -492,6 +493,7 @@ func TestCliSubCmdAllOptsSet(t *testing.T) {
assert.Equal(t, "dir", *args.SaveDir)
assert.Equal(t, int64(99), args.Limit)
assert.Equal(t, time.Duration(time.Second*10), args.IdleTimeout)
assert.Equal(t, "filter", args.Filter)
assert.False(t, args.Reject)
assert.False(t, args.Requeue)
}
Expand Down
Loading

0 comments on commit 8219fa6

Please sign in to comment.