From 4e15d41cda56b2a5f69d4ac192b1a64af9d3c9a0 Mon Sep 17 00:00:00 2001
From: Neal DeBuhr
Date: Sun, 4 Dec 2022 17:10:00 +0000
Subject: [PATCH] Refactor (module structure, imports, and type hints)

---
 analytics/README.md                                |   2 +-
 .../gtm_stream/__init__.py                         |   0
 .../gtm-stream-dataflow/gtm_stream/options.py      |  25 +++
 .../gtm-stream-dataflow/gtm_stream/schemas.py      |  22 +++
 analytics/gtm-stream-dataflow/main.py              | 164 +++++++-----------
 .../gtm-stream-dataflow/requirements.txt           |   1 -
 analytics/gtm-stream-dataflow/setup.py             |  10 ++
 7 files changed, 121 insertions(+), 103 deletions(-)
 create mode 100644 analytics/gtm-stream-dataflow/gtm_stream/__init__.py
 create mode 100644 analytics/gtm-stream-dataflow/gtm_stream/options.py
 create mode 100644 analytics/gtm-stream-dataflow/gtm_stream/schemas.py
 delete mode 100644 analytics/gtm-stream-dataflow/requirements.txt
 create mode 100644 analytics/gtm-stream-dataflow/setup.py

diff --git a/analytics/README.md b/analytics/README.md
index fd6061d..96ac9f9 100644
--- a/analytics/README.md
+++ b/analytics/README.md
@@ -38,7 +38,7 @@ cd ..
 WEBSITE_STREAM_ENDPOINT=$(gcloud beta functions describe isidro-gtm-stream --gen2 --region us-central1 --format="value(serviceConfig.uri)")
 cd gtm-stream-dataflow
 python3 main.py \
-    --requirements_file requirements.txt \
+    --setup_file $PWD/setup.py \
     --project "$GOOGLE_PROJECT" \
     --service_account_email "isidro-analytics@$GOOGLE_PROJECT.iam.gserviceaccount.com" \
     --temp_location="gs://$ANALYTICS_BUCKET/dataflow/temp" \
diff --git a/analytics/gtm-stream-dataflow/gtm_stream/__init__.py b/analytics/gtm-stream-dataflow/gtm_stream/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/analytics/gtm-stream-dataflow/gtm_stream/options.py b/analytics/gtm-stream-dataflow/gtm_stream/options.py
new file mode 100644
index 0000000..8093e96
--- /dev/null
+++ b/analytics/gtm-stream-dataflow/gtm_stream/options.py
@@ -0,0 +1,25 @@
+from apache_beam.options.pipeline_options import PipelineOptions
+
+class StreamOptions(PipelineOptions):
+    @classmethod
+    def _add_argparse_args(cls, parser):
+        parser.add_argument(
+            '--table',
+            help='TODO'
+        )
+        parser.add_argument(
+            '--topic',
+            help='TODO'
+        )
+        parser.add_argument(
+            '--website_origin',
+            help='TODO'
+        )
+        parser.add_argument(
+            '--report_bucket',
+            help='TODO'
+        )
+        parser.add_argument(
+            '--report_blob',
+            help='TODO'
+        )
\ No newline at end of file
diff --git a/analytics/gtm-stream-dataflow/gtm_stream/schemas.py b/analytics/gtm-stream-dataflow/gtm_stream/schemas.py
new file mode 100644
index 0000000..f5ae8d2
--- /dev/null
+++ b/analytics/gtm-stream-dataflow/gtm_stream/schemas.py
@@ -0,0 +1,22 @@
+raw_stream_bq_schema = {
+    'fields': [
+        {'name': 'container_id', 'type': 'STRING'},
+        {'name': 'container_version', 'type': 'STRING'},
+        {'name': 'cookie_enabled', 'type': 'BOOL'},
+        {'name': 'debug_mode', 'type': 'BOOL'},
+        {'name': 'environment_name', 'type': 'STRING'},
+        {'name': 'event', 'type': 'STRING'},
+        {'name': 'language', 'type': 'STRING'},
+        {'name': 'languages', 'type': 'STRING', 'mode': 'REPEATED'},
+        {'name': 'online', 'type': 'BOOL'},
+        {'name': 'page_hostname', 'type': 'STRING'},
+        {'name': 'page_path', 'type': 'STRING'},
+        {'name': 'page_url', 'type': 'STRING'},
+        {'name': 'random_number', 'type': 'INT64'},
+        {'name': 'referrer', 'type': 'STRING'},
+        {'name': 'scroll_x', 'type': 'FLOAT64'},
+        {'name': 'scroll_y', 'type': 'FLOAT64'},
+        {'name': 'timestamp', 'type': 'TIMESTAMP'},
+        {'name': 'user_agent', 'type': 'STRING'}
+    ]
+}
\ No newline at end of file
diff --git a/analytics/gtm-stream-dataflow/main.py b/analytics/gtm-stream-dataflow/main.py
index 0bf922a..9c008ea 100644
--- a/analytics/gtm-stream-dataflow/main.py
+++ b/analytics/gtm-stream-dataflow/main.py
@@ -1,120 +1,82 @@
 import json
+import logging
+
+from collections import Counter
+from dateutil.parser import parse as dp
+from typing import Iterable
 
 import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime, AfterWatermark, Repeatedly
 from apache_beam.transforms.window import SlidingWindows
+from google.cloud import storage
+
+from gtm_stream.options import StreamOptions
+from gtm_stream.schemas import raw_stream_bq_schema
 
 
-class StreamOptions(PipelineOptions):
-    @classmethod
-    def _add_argparse_args(cls, parser):
-        parser.add_argument(
-            '--table',
-            help='TODO'
-        )
-        parser.add_argument(
-            '--topic',
-            help='TODO'
-        )
-        parser.add_argument(
-            '--website_origin',
-            help='TODO'
-        )
-        parser.add_argument(
-            '--report_bucket',
-            help='TODO'
-        )
-        parser.add_argument(
-            '--report_blob',
-            help='TODO'
-        )
-
-
-options = StreamOptions(
-    runner='DataflowRunner',
-    region='us-central1',
-    streaming=True,
-)
-
-table_schema = {
-    'fields': [
-        {'name': 'container_id', 'type': 'STRING'},
-        {'name': 'container_version', 'type': 'STRING'},
-        {'name': 'cookie_enabled', 'type': 'BOOL'},
-        {'name': 'debug_mode', 'type': 'BOOL'},
-        {'name': 'environment_name', 'type': 'STRING'},
-        {'name': 'event', 'type': 'STRING'},
-        {'name': 'language', 'type': 'STRING'},
-        {'name': 'languages', 'type': 'STRING', 'mode': 'REPEATED'},
-        {'name': 'online', 'type': 'BOOL'},
-        {'name': 'page_hostname', 'type': 'STRING'},
-        {'name': 'page_path', 'type': 'STRING'},
-        {'name': 'page_url', 'type': 'STRING'},
-        {'name': 'random_number', 'type': 'INT64'},
-        {'name': 'referrer', 'type': 'STRING'},
-        {'name': 'scroll_x', 'type': 'FLOAT64'},
-        {'name': 'scroll_y', 'type': 'FLOAT64'},
-        {'name': 'timestamp', 'type': 'TIMESTAMP'},
-        {'name': 'user_agent', 'type': 'STRING'}
-    ]
-}
 class TopFivePagesFn(beam.CombineFn):
+    def create_accumulator(self):
+        return Counter()
+
+    def add_input(self, counter: Counter, page: str) -> Counter:
+        counter[page] += 1
+        return counter
+
+    def merge_accumulators(self, counters: Iterable[Counter]) -> Counter:
+        return sum(counters, Counter())
+
+    def extract_output(self, counter: Counter) -> Iterable[str]:
+        return [top_page for top_page, _ in counter.most_common(5)]
+
+
+class WriteToGCS(beam.DoFn):
     def __init__(self, options):
         self.options = options
 
     def setup(self):
-        from collections import Counter
-        from google.cloud import storage
-
-        self.Counter = Counter
-
         storage_client = storage.Client()
         bucket = storage_client.bucket(self.options.report_bucket)
         self.blob = bucket.blob(self.options.report_blob)
 
-    def create_accumulator(self):
-        return self.Counter()
-
-    def add_input(self, counter, page):
-        counter[page] += 1
-        return counter
-
-    def merge_accumulators(self, counters):
-        return sum(counters, self.Counter())
-
-    def extract_output(self, counter):
+    def process(self, results: Iterable[str]) -> None:
         with self.blob.open("w") as f:
             f.write(
-                json.dumps([top_page for top_page, _ in counter.most_common(5)])
+                json.dumps(results)
+            )
+
+def run(options: StreamOptions):
+    """ Main entrypoint; defines and runs the streaming pipeline.
""" + + with beam.Pipeline(options=options) as pipeline: + hostname = options.website_origin.replace("https://", "").replace("http://", "") + # Ingest data as JSON + ingestion = pipeline \ + | 'StreamFromPubSub' >> beam.io.ReadFromPubSub(topic=options.topic) \ + | 'DeserializeJson' >> beam.Map(json.loads) + # Stream data into BigQuery (10 second windowing) + ingestion \ + | 'StreamToBigQuery' >> beam.io.WriteToBigQuery( + options.table, + schema=raw_stream_bq_schema, + method='STREAMING_INSERTS', + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED ) - return None - - -with beam.Pipeline(options=options) as pipeline: - hostname = options.website_origin.replace("https://", "").replace("http://", "") - # Ingest data as JSON - ingestion = pipeline \ - | 'StreamFromPubSub' >> beam.io.ReadFromPubSub(topic=options.topic) \ - | 'DeserializeJson' >> beam.Map(json.loads) - # Stream data into BigQuery (10 second windowing) - ingestion \ - | 'StreamToBigQuery' >> beam.io.WriteToBigQuery( - options.table, - schema=table_schema, - method='STREAMING_INSERTS', - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED - ) - # Setup top pages JSON report with GCS (1 day windowing) - ingestion \ - | 'FilterByHostname' >> beam.Filter( - lambda event, hostname=hostname: event["page_hostname"] == hostname - ) \ - | 'GetPagePath' >> beam.Map(lambda event: event["page_path"]) \ - | 'SlidingWindow24h' >> beam.WindowInto( - SlidingWindows(24*60*60, 60*60), - allowed_lateness=60*60, - accumulation_mode=AccumulationMode.ACCUMULATING - ) \ - | 'PublishTop5' >> beam.CombineGlobally(TopFivePagesFn(options)).without_defaults() + # Setup top pages JSON report with GCS (1 day windowing) + ingestion \ + | 'FilterByHostname' >> beam.Filter( + lambda event: event["page_hostname"] == hostname + ) \ + | 'GetPagePath' >> beam.Map(lambda event: event["page_path"]) \ + | 'SlidingWindow24h' >> beam.WindowInto(SlidingWindows(24*60*60, 60*60)) \ + | 'CompileTop5' >> beam.CombineGlobally(TopFivePagesFn()).without_defaults() \ + | 'WriteToGCS' >> beam.ParDo(WriteToGCS(options)) + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + options = StreamOptions( + runner='DataflowRunner', + region='us-central1', + save_main_session=True, + streaming=True, + ) + run(options) \ No newline at end of file diff --git a/analytics/gtm-stream-dataflow/requirements.txt b/analytics/gtm-stream-dataflow/requirements.txt deleted file mode 100644 index 7c4ce45..0000000 --- a/analytics/gtm-stream-dataflow/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -google-cloud-storage>=2.6.0,<3.0.0 \ No newline at end of file diff --git a/analytics/gtm-stream-dataflow/setup.py b/analytics/gtm-stream-dataflow/setup.py new file mode 100644 index 0000000..1d489fa --- /dev/null +++ b/analytics/gtm-stream-dataflow/setup.py @@ -0,0 +1,10 @@ +import setuptools + +setuptools.setup( + name='gtm_stream', + version='0.0.1', + install_requires=[ + 'google-cloud-storage>=2.6.0,<3.0.0' + ], + packages=setuptools.find_packages() +) \ No newline at end of file