Refactor (module structure, imports, and type hints)
ndebuhr committed Dec 4, 2022
1 parent 723d945 commit 4e15d41
Showing 7 changed files with 121 additions and 103 deletions.
2 changes: 1 addition & 1 deletion analytics/README.md
@@ -38,7 +38,7 @@ cd ..
WEBSITE_STREAM_ENDPOINT=$(gcloud beta functions describe isidro-gtm-stream --gen2 --region us-central1 --format="value(serviceConfig.uri)")
cd gtm-stream-dataflow
python3 main.py \
-    --requirements_file requirements.txt \
+    --setup_file $PWD/setup.py \
--project "$GOOGLE_PROJECT" \
--service_account_email "isidro-analytics@$GOOGLE_PROJECT.iam.gserviceaccount.com" \
--temp_location="gs://$ANALYTICS_BUCKET/dataflow/temp" \
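Switching from --requirements_file to --setup_file is what lets Dataflow workers install the new gtm_stream package: a requirements file only installs PyPI dependencies, while a setup file also ships local modules. A minimal sketch of how Beam surfaces this flag (editorial, assuming a local apache_beam install):

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# --setup_file points at a package for the runner to build and install on each worker
options = PipelineOptions(["--setup_file", "./setup.py"])
print(options.view_as(SetupOptions).setup_file)  # ./setup.py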
analytics/gtm-stream-dataflow/gtm_stream/__init__.py
Empty file.
25 changes: 25 additions & 0 deletions analytics/gtm-stream-dataflow/gtm_stream/options.py
@@ -0,0 +1,25 @@
from apache_beam.options.pipeline_options import PipelineOptions

class StreamOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--table',
help='TODO'
)
parser.add_argument(
'--topic',
help='TODO'
)
parser.add_argument(
'--website_origin',
help='TODO'
)
parser.add_argument(
'--report_bucket',
help='TODO'
)
parser.add_argument(
'--report_blob',
help='TODO'
)
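A quick usage sketch (editorial; values are placeholders): flags defined in _add_argparse_args parse alongside Beam's standard pipeline options and surface as attributes.

from gtm_stream.options import StreamOptions  # assumes the package is importable

opts = StreamOptions([
    '--table', 'my-project:analytics.raw_stream',
    '--topic', 'projects/my-project/topics/gtm-stream',
])
print(opts.table, opts.topic)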
22 changes: 22 additions & 0 deletions analytics/gtm-stream-dataflow/gtm_stream/schemas.py
@@ -0,0 +1,22 @@
raw_stream_bq_schema = {
'fields': [
{'name': 'container_id', 'type': 'STRING'},
{'name': 'container_version', 'type': 'STRING'},
{'name': 'cookie_enabled', 'type': 'BOOL'},
{'name': 'debug_mode', 'type': 'BOOL'},
{'name': 'environment_name', 'type': 'STRING'},
{'name': 'event', 'type': 'STRING'},
{'name': 'language', 'type': 'STRING'},
{'name': 'languages', 'type': 'STRING', 'mode': 'REPEATED'},
{'name': 'online', 'type': 'BOOL'},
{'name': 'page_hostname', 'type': 'STRING'},
{'name': 'page_path', 'type': 'STRING'},
{'name': 'page_url', 'type': 'STRING'},
{'name': 'random_number', 'type': 'INT64'},
{'name': 'referrer', 'type': 'STRING'},
{'name': 'scroll_x', 'type': 'FLOAT64'},
{'name': 'scroll_y', 'type': 'FLOAT64'},
{'name': 'timestamp', 'type': 'TIMESTAMP'},
{'name': 'user_agent', 'type': 'STRING'}
]
}
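For orientation, a row matching this schema could look like the following (illustrative values only); REPEATED fields arrive as Python lists, and BigQuery streaming inserts accept ISO-8601 strings for TIMESTAMP columns.

example_row = {
    'container_id': 'GTM-XXXXXXX',
    'cookie_enabled': True,
    'event': 'gtm.js',
    'languages': ['en-US', 'en'],
    'page_path': '/',
    'scroll_y': 0.0,
    'timestamp': '2022-12-04T00:00:00Z',
}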
164 changes: 63 additions & 101 deletions analytics/gtm-stream-dataflow/main.py
@@ -1,120 +1,82 @@
import json
import logging

from collections import Counter
from dateutil.parser import parse as dp
from typing import Iterable, List

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime, AfterWatermark, Repeatedly
from apache_beam.transforms.window import SlidingWindows
from google.cloud import storage

from gtm_stream.options import StreamOptions
from gtm_stream.schemas import raw_stream_bq_schema

class StreamOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--table',
help='TODO'
)
parser.add_argument(
'--topic',
help='TODO'
)
parser.add_argument(
'--website_origin',
help='TODO'
)
parser.add_argument(
'--report_bucket',
help='TODO'
)
parser.add_argument(
'--report_blob',
help='TODO'
)


options = StreamOptions(
runner='DataflowRunner',
region='us-central1',
streaming=True,
)

table_schema = {
'fields': [
{'name': 'container_id', 'type': 'STRING'},
{'name': 'container_version', 'type': 'STRING'},
{'name': 'cookie_enabled', 'type': 'BOOL'},
{'name': 'debug_mode', 'type': 'BOOL'},
{'name': 'environment_name', 'type': 'STRING'},
{'name': 'event', 'type': 'STRING'},
{'name': 'language', 'type': 'STRING'},
{'name': 'languages', 'type': 'STRING', 'mode': 'REPEATED'},
{'name': 'online', 'type': 'BOOL'},
{'name': 'page_hostname', 'type': 'STRING'},
{'name': 'page_path', 'type': 'STRING'},
{'name': 'page_url', 'type': 'STRING'},
{'name': 'random_number', 'type': 'INT64'},
{'name': 'referrer', 'type': 'STRING'},
{'name': 'scroll_x', 'type': 'FLOAT64'},
{'name': 'scroll_y', 'type': 'FLOAT64'},
{'name': 'timestamp', 'type': 'TIMESTAMP'},
{'name': 'user_agent', 'type': 'STRING'}
]
}

class TopFivePagesFn(beam.CombineFn):
def create_accumulator(self):
return Counter()

def add_input(self, counter: Counter, page: str) -> Counter:
counter[page] += 1
return counter

    def merge_accumulators(self, counters: Iterable[Counter]) -> Counter:
return sum(counters, Counter())

    def extract_output(self, counter: Counter) -> List[str]:
return [top_page for top_page, _ in counter.most_common(5)]
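# Editorial sketch, not part of this commit: the CombineFn contract can be
# exercised locally without a pipeline, e.g.
#   fn = TopFivePagesFn()
#   acc = fn.create_accumulator()
#   for page in ['/a', '/b', '/a']:
#       acc = fn.add_input(acc, page)
#   acc = fn.merge_accumulators([acc, Counter({'/c': 5})])
#   fn.extract_output(acc)  # ['/c', '/a', '/b']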


class WriteToGCS(beam.DoFn):
def __init__(self, options):
self.options = options

def setup(self):
from collections import Counter
from google.cloud import storage

self.Counter = Counter

storage_client = storage.Client()
bucket = storage_client.bucket(self.options.report_bucket)
self.blob = bucket.blob(self.options.report_blob)

def create_accumulator(self):
return self.Counter()

def add_input(self, counter, page):
counter[page] += 1
return counter

def merge_accumulators(self, counters):
return sum(counters, self.Counter())

def extract_output(self, counter):
    def process(self, results: List[str]) -> None:
with self.blob.open("w") as f:
f.write(
json.dumps([top_page for top_page, _ in counter.most_common(5)])
json.dumps(results)
)
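# Editorial note: Blob.open('w') (google-cloud-storage >= 1.38) streams the
# upload and replaces the report object in place, so readers always see the
# latest window's top-5 list as a single JSON array.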

def run(options: StreamOptions):
""" Main entrypoint; defines and runs the streaming pipeline. """

with beam.Pipeline(options=options) as pipeline:
hostname = options.website_origin.replace("https://", "").replace("http://", "")
# Ingest data as JSON
ingestion = pipeline \
| 'StreamFromPubSub' >> beam.io.ReadFromPubSub(topic=options.topic) \
| 'DeserializeJson' >> beam.Map(json.loads)
# Stream data into BigQuery (10 second windowing)
ingestion \
| 'StreamToBigQuery' >> beam.io.WriteToBigQuery(
options.table,
schema=raw_stream_bq_schema,
method='STREAMING_INSERTS',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
return None


with beam.Pipeline(options=options) as pipeline:
hostname = options.website_origin.replace("https://", "").replace("http://", "")
# Ingest data as JSON
ingestion = pipeline \
| 'StreamFromPubSub' >> beam.io.ReadFromPubSub(topic=options.topic) \
| 'DeserializeJson' >> beam.Map(json.loads)
# Stream data into BigQuery (10 second windowing)
ingestion \
| 'StreamToBigQuery' >> beam.io.WriteToBigQuery(
options.table,
schema=table_schema,
method='STREAMING_INSERTS',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
# Setup top pages JSON report with GCS (1 day windowing)
ingestion \
| 'FilterByHostname' >> beam.Filter(
lambda event, hostname=hostname: event["page_hostname"] == hostname
) \
| 'GetPagePath' >> beam.Map(lambda event: event["page_path"]) \
| 'SlidingWindow24h' >> beam.WindowInto(
SlidingWindows(24*60*60, 60*60),
allowed_lateness=60*60,
accumulation_mode=AccumulationMode.ACCUMULATING
) \
| 'PublishTop5' >> beam.CombineGlobally(TopFivePagesFn(options)).without_defaults()
# Setup top pages JSON report with GCS (1 day windowing)
ingestion \
| 'FilterByHostname' >> beam.Filter(
lambda event: event["page_hostname"] == hostname
) \
| 'GetPagePath' >> beam.Map(lambda event: event["page_path"]) \
| 'SlidingWindow24h' >> beam.WindowInto(SlidingWindows(24*60*60, 60*60)) \
| 'CompileTop5' >> beam.CombineGlobally(TopFivePagesFn()).without_defaults() \
| 'WriteToGCS' >> beam.ParDo(WriteToGCS(options))
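# Editorial note: SlidingWindows(24*60*60, 60*60) produces 24-hour windows
# that advance hourly, so each page view lands in 24 overlapping windows and
# the GCS report is refreshed roughly every hour over a trailing day of data.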

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
options = StreamOptions(
runner='DataflowRunner',
region='us-central1',
save_main_session=True,
streaming=True,
)
run(options)
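The refactor makes the transforms importable, so the combiner can be smoke-tested on bounded data instead of Pub/Sub. An editorial sketch (assumes apache_beam is installed and main.py is importable):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline

from main import TopFivePagesFn  # assumed import path

with TestPipeline() as p:
    (p
     | beam.Create(['/home', '/home', '/pricing'])
     | beam.CombineGlobally(TopFivePagesFn())
     | beam.Map(print))  # prints ['/home', '/pricing']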
1 change: 0 additions & 1 deletion analytics/gtm-stream-dataflow/requirements.txt

This file was deleted.

10 changes: 10 additions & 0 deletions analytics/gtm-stream-dataflow/setup.py
@@ -0,0 +1,10 @@
import setuptools

setuptools.setup(
name='gtm_stream',
version='0.0.1',
install_requires=[
'google-cloud-storage>=2.6.0,<3.0.0'
],
packages=setuptools.find_packages()
)
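With --setup_file, Dataflow builds this package into a source distribution and installs it on every worker, so main.py's gtm_stream imports resolve remotely; the google-cloud-storage pin now travels with the package, replacing the deleted requirements.txt. A quick local sanity check (editorial sketch, run from the gtm-stream-dataflow directory):

import setuptools
print(setuptools.find_packages())  # expected: ['gtm_stream'], via the new empty __init__.py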
