Commit

Add a real-time analytics pipeline for documentation analysis and enablement of (future) recommendation systems
ndebuhr committed Dec 2, 2022
1 parent 7bd6a1a commit b090bb1
Showing 13 changed files with 378 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -16,6 +16,7 @@
provisioning/install_asm_1.9

# Key files
isidro-analytics.json
isidro-certbot.json
isidro-provisioner.json
isidro-skaffold.json
100 changes: 100 additions & 0 deletions analytics/README.md
@@ -0,0 +1,100 @@
# Isidro Analytics

## Prerequisites

1. Set up Google Analytics 4
1. Establish a Google Analytics BigQuery Export
1. Set up Google Tag Manager

## Set Up Cloud Resources

Provision the required cloud resources in [/analytics/provisioning](/analytics/provisioning):
```bash
terraform init
terraform apply
export ANALYTICS_BUCKET="$(terraform output --raw bucket_name)"
```

Set the `GOOGLE_PROJECT` environment variable, then create a service account key:
```bash
gcloud iam service-accounts keys create isidro-analytics.json \
--iam-account="isidro-analytics@$GOOGLE_PROJECT.iam.gserviceaccount.com"
```

Point `GOOGLE_APPLICATION_CREDENTIALS` at the analytics credentials (`export GOOGLE_APPLICATION_CREDENTIALS=$PWD/isidro-analytics.json`) and export your website origin, including the http/https scheme, as `WEBSITE_ORIGIN` (e.g., `export WEBSITE_ORIGIN=https://isidro.ai`). Then deploy the required Cloud Function and run the Dataflow pipeline:
```bash
cd gtm-stream-functions
gcloud beta functions deploy isidro-gtm-stream \
--gen2 \
--runtime=python310 \
--region=us-central1 \
--source=. \
--entry-point=to_pubsub \
--trigger-http \
--service-account isidro-analytics-functions@$GOOGLE_PROJECT.iam.gserviceaccount.com \
--set-env-vars WEBSITE_ORIGIN="$WEBSITE_ORIGIN",GOOGLE_PROJECT="$GOOGLE_PROJECT" \
--allow-unauthenticated
cd ..
WEBSITE_STREAM_ENDPOINT=$(gcloud beta functions describe isidro-gtm-stream --gen2 --region us-central1 --format="value(serviceConfig.uri)")
cd gtm-stream-dataflow
python3 main.py \
--requirements_file requirements.txt \
--project "$GOOGLE_PROJECT" \
--service_account_email "isidro-analytics@$GOOGLE_PROJECT.iam.gserviceaccount.com" \
--temp_location="gs://$ANALYTICS_BUCKET/dataflow/temp" \
  --table isidro_gtm_stream.events \
--topic "projects/$GOOGLE_PROJECT/topics/isidro-gtm-stream" \
--report_bucket="$ANALYTICS_BUCKET" \
--report_blob="top-pages.json" \
--website_origin="$WEBSITE_ORIGIN"
cd ..
```
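
Before wiring up Google Tag Manager, you can optionally smoke-test the function. This is a minimal sketch, assuming the `WEBSITE_STREAM_ENDPOINT` and `WEBSITE_ORIGIN` values from above and an illustrative, partial event payload; a `200` response indicates the event was published to the `isidro-gtm-stream` topic:
```bash
curl --request POST "$WEBSITE_STREAM_ENDPOINT" \
  --header "Content-Type: application/json" \
  --header "Origin: $WEBSITE_ORIGIN" \
  --data '{"event": "gtm.timer", "page_hostname": "isidro.ai", "page_path": "/docs"}'
```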

## Set Up GTM Configuration

### GTM Trigger

Create a trigger of type Timer:
* Event name is `gtm.timer`
* Interval is `1000` (ms)
* Limit is `6000`
* Condition is `Page Hostname` `equals` the website's hostname
* Fire on `All Timers`

### GTM Tag

Create a Custom HTML tag in Google Tag Manager. Replace "WEBSITE_STREAM_ENDPOINT" with the Cloud Functions endpoint (`echo $WEBSITE_STREAM_ENDPOINT`). Use the previously created trigger for this tag.
```html
<script>
var headers = new Headers();
headers.append("Content-Type", "application/json");
var body = {
"container_id": "{{Container ID}}",
"container_version": "{{Container Version}}",
"debug_mode": {{Debug Mode}},
"environment_name": "{{Environment Name}}",
"event": "{{Event}}",
"page_hostname": "{{Page Hostname}}",
"page_path": "{{Page Path}}",
"page_url": "{{Page URL}}",
"random_number": {{Random Number}},
"referrer": "{{Referrer}}",
"cookie_enabled": navigator.cookieEnabled,
"language": navigator.language,
"languages": navigator.languages,
"online": navigator.onLine,
"user_agent": navigator.userAgent,
"scroll_x": window.scrollX,
"scroll_y": window.scrollY,
"timestamp": Date.now()
};
var options = {
method: "POST",
headers: headers,
body: JSON.stringify(body)
};
fetch("WEBSITE_STREAM_ENDPOINT", options);
</script>
```
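
Once events are flowing, ingestion can be sanity-checked in BigQuery. A sketch, assuming the `isidro_gtm_stream.events` table from the Dataflow command above:
```bash
bq query --use_legacy_sql=false '
SELECT page_path, COUNT(*) AS events
FROM isidro_gtm_stream.events
GROUP BY page_path
ORDER BY events DESC
LIMIT 5'
```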
121 changes: 121 additions & 0 deletions analytics/gtm-stream-dataflow/main.py
@@ -0,0 +1,121 @@
import json

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime, AfterWatermark, Repeatedly
from apache_beam.transforms.window import SlidingWindows

class StreamOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--table',
            help='BigQuery table for streaming inserts, as <dataset>.<table>'
)
parser.add_argument(
'--topic',
            help='Pub/Sub topic to stream from, as projects/<project>/topics/<topic>'
)
parser.add_argument(
'--website_origin',
            help='Website origin, including the http/https scheme'
)
parser.add_argument(
'--report_bucket',
            help='Cloud Storage bucket for the top pages report'
)
parser.add_argument(
'--report_blob',
            help='Cloud Storage blob name for the top pages report'
)


options = StreamOptions(
runner='DataflowRunner',
region='us-central1',
streaming=True,
)

table_schema = {
'fields': [
{'name': 'container_id', 'type': 'STRING'},
{'name': 'container_version', 'type': 'STRING'},
{'name': 'cookie_enabled', 'type': 'BOOL'},
{'name': 'debug_mode', 'type': 'BOOL'},
{'name': 'environment_name', 'type': 'STRING'},
{'name': 'event', 'type': 'STRING'},
{'name': 'language', 'type': 'STRING'},
{'name': 'languages', 'type': 'STRING', 'mode': 'REPEATED'},
{'name': 'online', 'type': 'BOOL'},
{'name': 'page_hostname', 'type': 'STRING'},
{'name': 'page_path', 'type': 'STRING'},
{'name': 'page_url', 'type': 'STRING'},
{'name': 'random_number', 'type': 'INT64'},
{'name': 'referrer', 'type': 'STRING'},
{'name': 'scroll_x', 'type': 'FLOAT64'},
{'name': 'scroll_y', 'type': 'FLOAT64'},
{'name': 'timestamp', 'type': 'TIMESTAMP'},
{'name': 'user_agent', 'type': 'STRING'}
]
}

class TopFivePagesFn(beam.CombineFn):
def __init__(self, options):
self.options = options

def setup(self):
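        # Import inside setup so the modules load on the Dataflow workers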
from collections import Counter
from google.cloud import storage

self.Counter = Counter

storage_client = storage.Client()
bucket = storage_client.bucket(self.options.report_bucket)
self.blob = bucket.blob(self.options.report_blob)

def create_accumulator(self):
return self.Counter()

def add_input(self, counter, page):
counter[page] += 1
return counter

def merge_accumulators(self, counters):
return sum(counters, self.Counter())

def extract_output(self, counter):
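        # Side effect: write the report to GCS; CombineGlobally's return value is unused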
with self.blob.open("w") as f:
f.write(
json.dumps([top_page for top_page, _ in counter.most_common(5)])
)
return None


with beam.Pipeline(options=options) as pipeline:
hostname = options.website_origin.replace("https://", "").replace("http://", "")
# Ingest data as JSON
ingestion = pipeline \
| 'StreamFromPubSub' >> beam.io.ReadFromPubSub(topic=options.topic) \
| 'DeserializeJson' >> beam.Map(json.loads)
    # Stream data into BigQuery via streaming inserts
ingestion \
| 'StreamToBigQuery' >> beam.io.WriteToBigQuery(
options.table,
schema=table_schema,
method='STREAMING_INSERTS',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
    # Publish a top pages JSON report to GCS (24-hour sliding windows, hourly period)
ingestion \
| 'FilterByHostname' >> beam.Filter(
lambda event, hostname=hostname: event["page_hostname"] == hostname
) \
| 'GetPagePath' >> beam.Map(lambda event: event["page_path"]) \
| 'SlidingWindow24h' >> beam.WindowInto(
SlidingWindows(24*60*60, 60*60),
trigger=Repeatedly(AfterWatermark(late=AfterProcessingTime(0))),
allowed_lateness=60*60,
accumulation_mode=AccumulationMode.ACCUMULATING
) \
| 'PublishTop5' >> beam.CombineGlobally(TopFivePagesFn(options)).without_defaults()
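
A downstream consumer (for example, one of the future recommendation systems the commit message mentions) only needs to read the JSON report this pipeline publishes. A quick check with the gsutil CLI, assuming the `ANALYTICS_BUCKET` variable from the provisioning step:
```bash
gsutil cat "gs://$ANALYTICS_BUCKET/top-pages.json"
# Expected shape: a JSON array of up to five page paths, e.g. ["/docs", "/", "/blog"]
```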
1 change: 1 addition & 0 deletions analytics/gtm-stream-dataflow/requirements.txt
@@ -0,0 +1 @@
google-cloud-storage>=2.6.0,<3.0.0
65 changes: 65 additions & 0 deletions analytics/gtm-stream-functions/main.py
@@ -0,0 +1,65 @@
import functions_framework
import os

from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable

GOOGLE_PROJECT = os.environ.get("GOOGLE_PROJECT")
TOPIC_ID = "isidro-gtm-stream"
WEBSITE_ORIGIN = os.environ.get("WEBSITE_ORIGIN")

if not GOOGLE_PROJECT:
raise ValueError("No GOOGLE_PROJECT environment variable set")
if not WEBSITE_ORIGIN:
raise ValueError("No WEBSITE_ORIGIN environment variable set")

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(GOOGLE_PROJECT, TOPIC_ID)
# Publish messages asynchronously, so slow Pub/Sub calls don't leave web clients hanging

# https://cloud.google.com/pubsub/docs/publisher#python
def get_callback(
publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
try:
# Wait 60 seconds for the publish call to succeed.
print(publish_future.result(timeout=60))
except futures.TimeoutError:
print(f"Publishing {data} timed out.")

return callback

@functions_framework.http
def to_pubsub(request):
# Set CORS headers for the preflight request
if request.method == 'OPTIONS':
        # Allows POST requests from the website origin with the Content-Type
        # header and caches the preflight response for 3600 seconds
headers = {
'Access-Control-Allow-Origin': [WEBSITE_ORIGIN],
'Access-Control-Allow-Methods': ['POST'],
'Access-Control-Allow-Headers': ['Content-Type'],
'Access-Control-Max-Age': '3600'
}

return ('', 204, headers)

publish_future = publisher.publish(topic_path, request.get_data())
# Non-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, request.get_data()))

    # Wait for the publish future to resolve before responding.
    futures.wait([publish_future], return_when=futures.ALL_COMPLETED)

headers = {
'Access-Control-Allow-Origin': [WEBSITE_ORIGIN],
'Access-Control-Allow-Methods': ['POST'],
'Access-Control-Allow-Headers': ['Content-Type'],
'Access-Control-Max-Age': '3600'
}

return ('', 200, headers)
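
For local iteration, the handler can be exercised with the Functions Framework before deploying. A sketch, assuming `GOOGLE_PROJECT` and `WEBSITE_ORIGIN` are exported and local application default credentials can publish to the topic:
```bash
pip3 install -r requirements.txt
functions-framework --target to_pubsub --port 8080 &
curl --request POST http://localhost:8080 \
  --header "Content-Type: application/json" \
  --data '{"event": "gtm.timer", "page_path": "/docs"}'
```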
3 changes: 3 additions & 0 deletions analytics/gtm-stream-functions/requirements.txt
@@ -0,0 +1,3 @@
google-cloud-pubsub>=2.11.0,<3.0.0
functions-framework==3.*
6 changes: 6 additions & 0 deletions analytics/provisioning/dataset.tf
@@ -0,0 +1,6 @@
resource "google_bigquery_dataset" "gtm_stream" {
dataset_id = "isidro_gtm_stream"
friendly_name = "Isidro GTM Stream"
description = "Datasets around the Google Tag Manager stream for Isidro"
location = "US"
}
46 changes: 46 additions & 0 deletions analytics/provisioning/iam-dataflow.tf
@@ -0,0 +1,46 @@
resource "google_service_account" "analytics" {
account_id = "isidro-analytics"
display_name = "Isidro Analytics"
}

resource "google_project_iam_member" "analytics_dataflow_worker" {
project = data.google_project.project.project_id
role = "roles/dataflow.worker"
member = "serviceAccount:${google_service_account.analytics.email}"
}

resource "google_project_iam_member" "analytics_dataflow_developer" {
project = data.google_project.project.project_id
role = "roles/dataflow.developer"
member = "serviceAccount:${google_service_account.analytics.email}"
}

resource "google_project_iam_member" "analytics_storage_admin" {
project = data.google_project.project.project_id
role = "roles/storage.admin"
member = "serviceAccount:${google_service_account.analytics.email}"
}

resource "google_project_iam_member" "analytics_bigquery_data_owner" {
project = data.google_project.project.project_id
role = "roles/bigquery.dataOwner"
member = "serviceAccount:${google_service_account.analytics.email}"
}

resource "google_project_iam_member" "analytics_bigquery_job_user" {
project = data.google_project.project.project_id
role = "roles/bigquery.jobUser"
member = "serviceAccount:${google_service_account.analytics.email}"
}

resource "google_project_iam_member" "analytics_sa_user" {
project = data.google_project.project.project_id
role = "roles/iam.serviceAccountUser"
member = "serviceAccount:${google_service_account.analytics.email}"
}

resource "google_project_iam_member" "analytics_pubsub_editor" {
project = data.google_project.project.project_id
role = "roles/pubsub.editor"
member = "serviceAccount:${google_service_account.analytics.email}"
}
10 changes: 10 additions & 0 deletions analytics/provisioning/iam-functions.tf
@@ -0,0 +1,10 @@
resource "google_service_account" "analytics_functions" {
account_id = "isidro-analytics-functions"
display_name = "Isidro Analytics Functions"
}

resource "google_project_iam_member" "analytics_functions_pubsub_publisher" {
project = data.google_project.project.project_id
role = "roles/pubsub.publisher"
member = "serviceAccount:${google_service_account.analytics_functions.email}"
}
3 changes: 3 additions & 0 deletions analytics/provisioning/outputs.tf
@@ -0,0 +1,3 @@
output "bucket_name" {
value = google_storage_bucket.analytics.name
}
5 changes: 5 additions & 0 deletions analytics/provisioning/pubsub.tf
@@ -0,0 +1,5 @@
data "google_project" "project" {}

resource "google_pubsub_topic" "gtm_stream" {
name = "isidro-gtm-stream"
}
