Skip to content
This repository has been archived by the owner on Sep 2, 2021. It is now read-only.

Commit

Permalink
Merge pull request #18 from intuit/kinesis_log_level
Browse files Browse the repository at this point in the history
Add log levels for Kinesis client
  • Loading branch information
wtait1 authored Mar 30, 2020
2 parents af71d0b + f36c587 commit 79b4b6c
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 44 deletions.
56 changes: 38 additions & 18 deletions kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,56 +36,71 @@ func getRegion() string {
return region
}

func buildClient(streamName, streamRole string) *kinesis.Kinesis {
type kinesisWrapper struct {
client kinesisiface.KinesisAPI
// allows different clients to log certain messages at different levels
// Ex. Regular Kinesis handler for sending HTTP data is "INFO" (log.Printf)
// Telemetry Kinesis handler is set at "DEBUG" (logDebug)
logger func(string, ...interface{})
}

func buildClient(streamName, streamRole string, logFunc func(string, ...interface{})) *kinesisWrapper {
if streamName == kinesaliteStreamName {
return buildKinesaliteClient(streamName)
}
return buildKinesisClient(streamName, streamRole)
return buildKinesisClient(streamName, streamRole, logFunc)
}

// Uses STS to assume an IAM role for credentials to write records
// to a real Kinesis stream in AWS
func buildKinesisClient(streamName, streamRole string) *kinesis.Kinesis {
log.Printf("Creating AWS Kinesis client")
func buildKinesisClient(streamName, streamRole string, logFunc func(string, ...interface{})) *kinesisWrapper {
logFunc("Creating AWS Kinesis client (stream=%s)", streamName)
userSession := session.Must(session.NewSession(&aws.Config{
CredentialsChainVerboseErrors: aws.Bool(verboseCredentialErrors),
Region: aws.String(getRegion()),
}))

log.Println("Fetching temp credentials...")
logFunc("Fetching temp credentials...")
kinesisTempCreds := stscreds.NewCredentials(userSession, streamRole)
log.Println("Success!")
logFunc("Success!")

client := kinesis.New(userSession, &aws.Config{
kinesisHandle := kinesis.New(userSession, &aws.Config{
CredentialsChainVerboseErrors: aws.Bool(verboseCredentialErrors),
Credentials: kinesisTempCreds,
Region: aws.String(getRegion()),
})

sseEnabled, err := streamHasSSE(streamName, client)
sseEnabled, err := streamHasSSE(streamName, kinesisHandle)
if err != nil {
// Note: %s "wraps" the original error
logErr(fmt.Errorf("Could not determine if SSE is enabled for stream %s: %s", streamName, err))
} else if !sseEnabled {
logWarn(fmt.Sprintf("Kinesis stream %s does NOT have Server-Side Encryption (SSE) enabled", streamName))
logFunc(err.Error())
}
if !sseEnabled {
logWarn(fmt.Sprintf("Kinesis stream '%s' does NOT have Server-Side Encryption (SSE) enabled", streamName))
}
return &kinesisWrapper{
client: kinesisHandle,
logger: logFunc,
}
return client
}

func streamHasSSE(streamName string, client kinesisiface.KinesisAPI) (bool, error) {
streamInfo, err := client.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: aws.String(streamName),
})
if streamInfo == nil || streamInfo.StreamDescription == nil {
return false, fmt.Errorf("Could not determine if SSE is enabled for stream %s: %w", streamName, err)
}

return *streamInfo.StreamDescription.EncryptionType == kinesis.EncryptionTypeKms, err
}

// Kinesalite is a lightweight implementation of Kinesis
// useful for development scenarios.
// https://github.com/mhart/kinesalite
func buildKinesaliteClient(streamName string) *kinesis.Kinesis {
func buildKinesaliteClient(streamName string) *kinesisWrapper {
log.Printf("Creating local Kinesalite client")
log.Printf("Sending unverified traffic to stream endpoint=" + kinesaliteEndpoint)
return kinesis.New(session.Must(session.NewSession()), &aws.Config{
kinesisHandle := kinesis.New(session.Must(session.NewSession()), &aws.Config{
Endpoint: aws.String(kinesaliteEndpoint),
Credentials: credentials.NewStaticCredentials("x", "x", "x"),
Region: aws.String(getRegion()),
Expand All @@ -95,6 +110,11 @@ func buildKinesaliteClient(streamName string) *kinesis.Kinesis {
InsecureSkipVerify: true},
}},
})

return &kinesisWrapper{
client: kinesisHandle,
logger: log.Printf,
}
}

func chunkData(data string, size int) []string {
Expand Down Expand Up @@ -139,20 +159,20 @@ func buildMessages(line string) []EventChunk {
return messages
}

func sendToStream(message interface{}, stream string, client kinesisiface.KinesisAPI) error {
func (c *kinesisWrapper) sendToStream(message interface{}, stream string) error {
dataBytes, err := json.Marshal(message)
if err != nil {
return err
}
partition := "replay-partition-key-" + time.Now().String()
response, err := client.PutRecord(&kinesis.PutRecordInput{
response, err := c.client.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String(stream),
Data: dataBytes,
PartitionKey: &partition,
})
if err != nil {
return err
}
log.Printf("%+v\n", response)
c.logger("Successfully put record to stream=%s\n%+v\n", stream, response)
return nil
}
32 changes: 29 additions & 3 deletions kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ func TestGetRegionOverride(t *testing.T) {
}
}

func TestStreamHasSSEError(t *testing.T) {
mockKinesis := &mockKinesisClient{}

hasSSE, err := streamHasSSE("simulate_empty_response", mockKinesis)
if err == nil {
t.Errorf("Expected error, but got <nil>")
}
if hasSSE {
t.Errorf("Expected 'hasSSE' == FALSE, but was TRUE")
}

hasSSE, err = streamHasSSE("success", mockKinesis)
if err != nil {
t.Errorf("Expected no error, but got %s", err)
}
if !hasSSE {
t.Errorf("Expected 'hasSSE' == TRUE, but was FALSE")
}
}

func TestBuildMessages(t *testing.T) {
expected := EventChunk{
ChunkNumber: 0,
Expand All @@ -65,9 +85,10 @@ func TestBuildMessages(t *testing.T) {

func TestSendToStreamMarshalError(t *testing.T) {
mockKinesis := &mockKinesisClient{}
wrapper := &kinesisWrapper{client: mockKinesis}

// json.Marshal can't Marshal certain types, like channels
err := sendToStream(make(chan int), "test", mockKinesis)
err := wrapper.sendToStream(make(chan int), "test")
if mockKinesis.timesCalled > 0 {
t.Error("Expected mock Kinesis client not to be called, but it was")
}
Expand All @@ -78,8 +99,9 @@ func TestSendToStreamMarshalError(t *testing.T) {

func TestSendToStreamKinesisError(t *testing.T) {
mockKinesis := &mockKinesisClient{}
wrapper := &kinesisWrapper{client: mockKinesis}

err := sendToStream(`{"data": "test"}`, "simulate_error", mockKinesis)
err := wrapper.sendToStream(`{"data": "test"}`, "simulate_error")
if mockKinesis.timesCalled == 0 {
t.Error("Expected mock Kinesis client to be called, but it was NOT")
}
Expand All @@ -90,8 +112,12 @@ func TestSendToStreamKinesisError(t *testing.T) {

func TestSendToStreamKinesisSuccess(t *testing.T) {
mockKinesis := &mockKinesisClient{}
wrapper := &kinesisWrapper{
client: mockKinesis,
logger: nopLog,
}

err := sendToStream(`{"data": "test"}`, "test", mockKinesis)
err := wrapper.sendToStream(`{"data": "test"}`, "test")
if mockKinesis.timesCalled == 0 {
t.Error("Expected mock Kinesis client to be called, but it was NOT")
}
Expand Down
23 changes: 17 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,32 @@ func logErr(err error) {
}
}

func logWarn(msg string) {
if flags.debug {
log.Printf("[WARN] %s\n", msg)
func logWarn(msg string, v ...interface{}) {
if len(v) > 0 {
println(msg)
log.Printf("[WARN] "+msg+"\n", v...)
} else {
log.Printf("[WARN] " + msg + "\n")
}
}

func logDebug(msg string) {
func logDebug(msg string, v ...interface{}) {
if flags.debug {
log.Printf("[DEBUG] %s\n", msg)
if len(v) > 0 {
log.Printf("[DEBUG] "+msg+"\n", v...)
} else {
log.Printf("[DEBUG] " + msg + "\n")
}
}
}

func readFlags() {
// Overrides the default message of "pflag: help requested" in the case of -h / --help
flag.ErrHelp = errors.New("")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage of replay-zero:\n")
flag.PrintDefaults()
}
flag.BoolVarP(&flags.version, "version", "V", false, "Print version info and exit")
flag.IntVarP(&flags.listenPort, "listen-port", "l", 9000, "The port the Replay Zero proxy will listen on")
flag.IntVarP(&flags.defaultTargetPort, "target-port", "t", 8080, "The port the Replay Zero proxy will forward to on localhost")
Expand Down Expand Up @@ -183,9 +194,9 @@ func createServerHandler(h eventHandler) func(http.ResponseWriter, *http.Request
}

func main() {
readFlags()
telemetry = getTelemetryAgent()
go telemetry.logUsage(telemetryUsageOpen)
readFlags()

var h eventHandler
if len(flags.streamName) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion offline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func init() {
telemetry = &noopTelemetryAgent{}
telemetry = &nopTelemetryAgent{}
}

func TestOfflineHandleEventNoFlush(t *testing.T) {
Expand Down
8 changes: 3 additions & 5 deletions online.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package main

import (
"log"

"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)

// EventChunk contains raw event data + metadata if chunking a large event
Expand All @@ -16,13 +14,13 @@ type EventChunk struct {

type onlineHandler struct {
kinesisStreamName string
client kinesisiface.KinesisAPI
kinesisHandle *kinesisWrapper
}

func getOnlineHandler(streamName, streamRole string) *onlineHandler {
return &onlineHandler{
kinesisStreamName: streamName,
client: buildClient(streamName, streamRole),
kinesisHandle: buildClient(streamName, streamRole, log.Printf),
}
}

Expand All @@ -32,7 +30,7 @@ func (h *onlineHandler) handleEvent(line HTTPEvent) {
messages := buildMessages(lineStr)
for _, m := range messages {
log.Println("Sending event with UUID=" + m.UUID)
err := sendToStream(m, flags.streamName, h.client)
err := h.kinesisHandle.sendToStream(m, flags.streamName)
if err != nil {
log.Println(err)
}
Expand Down
5 changes: 3 additions & 2 deletions online_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// - - - - - - - - - - - - -

func init() {
telemetry = &noopTelemetryAgent{}
telemetry = &nopTelemetryAgent{}
}

// - - - - - - - - - - - - -
Expand All @@ -18,9 +18,10 @@ func init() {

func TestHandleEvent(t *testing.T) {
mockKinesis := &mockKinesisClient{}
wrapper := &kinesisWrapper{client: mockKinesis, logger: nopLog}
testHandler := &onlineHandler{
kinesisStreamName: "test",
client: mockKinesis,
kinesisHandle: wrapper,
}

onlineSampleEvent := generateSampleEvent()
Expand Down
14 changes: 6 additions & 8 deletions telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"os"
"os/user"
"time"

"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)

// events
Expand All @@ -24,12 +22,12 @@ type telemetryAgent interface {
}

// No-op agent in case the proper variables are not set
type noopTelemetryAgent struct{}
type nopTelemetryAgent struct{}

// Agent to send telemetry to a Kinesis stream
type kinesisTelemetryAgent struct {
stream string
client kinesisiface.KinesisAPI
client *kinesisWrapper
}

// logInfo contains data to send to the HTTP endpoint.
Expand All @@ -48,16 +46,16 @@ func getTelemetryAgent() telemetryAgent {

if streamName == "" {
logDebug("Missing telemetry stream name, returning no-op agent (will not send telemetry)")
return &noopTelemetryAgent{}
return &nopTelemetryAgent{}
}
logDebug("Building Kinesis agent for sending telemetry")
return &kinesisTelemetryAgent{
stream: streamName,
client: buildClient(streamName, streamRole),
client: buildClient(streamName, streamRole, logDebug),
}
}

func (agent *noopTelemetryAgent) logUsage(event int) {
func (agent *nopTelemetryAgent) logUsage(event int) {
logDebug("Telemetry: NO-OP")
}

Expand Down Expand Up @@ -107,5 +105,5 @@ func getCurrentUser() string {
// Send a telemetry message to the stream specified by `REPLAY_ZERO_TELEMETRY_STREAM`
// and authorized by the IAM role `REPLAY_ZERO_TELEMETRY_ROLE`
func (agent *kinesisTelemetryAgent) streamTelemetry(info *logInfo) error {
return sendToStream(info, agent.stream, agent.client)
return agent.client.sendToStream(info, agent.stream)
}
19 changes: 18 additions & 1 deletion testUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,26 @@ type mockKinesisClient struct {
kinesisiface.KinesisAPI
}

func (m *mockKinesisClient) DescribeStream(inp *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) {
m.timesCalled++
if *inp.StreamName == "simulate_empty_response" {
return &kinesis.DescribeStreamOutput{}, nil
} else {
return &kinesis.DescribeStreamOutput{
StreamDescription: &kinesis.StreamDescription{
StreamName: aws.String("test-stream"),
EncryptionType: aws.String(kinesis.EncryptionTypeKms),
},
}, nil
}
}

func (m *mockKinesisClient) PutRecord(inp *kinesis.PutRecordInput) (*kinesis.PutRecordOutput, error) {
m.timesCalled++
// Used to deterministically simulate an error in the AWS SDK
// See `TestSendToStreamKinesisError` for usage
if *inp.StreamName == "simulate_error" {
return &kinesis.PutRecordOutput{}, fmt.Errorf("simulated error")
return &kinesis.PutRecordOutput{}, fmt.Errorf("simulated service error")
}
return &kinesis.PutRecordOutput{
SequenceNumber: aws.String("a"),
Expand All @@ -80,3 +94,6 @@ func (m *mockKinesisClient) PutRecord(inp *kinesis.PutRecordInput) (*kinesis.Put
func emptyWriter(h *offlineHandler) io.Writer {
return ioutil.Discard
}

// Accepts a log message and does nothing with it
func nopLog(msg string, v ...interface{}) {}

0 comments on commit 79b4b6c

Please sign in to comment.