Skip to content

Commit

Permalink
Decouple gatekeeping (and the initial chat platform response) from or…
Browse files Browse the repository at this point in the history
…chestration and the actual processing workflows, using PubSub
  • Loading branch information
ndebuhr committed Oct 11, 2022
1 parent bcf0b81 commit fd7f9dc
Show file tree
Hide file tree
Showing 21 changed files with 425 additions and 143 deletions.
4 changes: 2 additions & 2 deletions chart/templates/gatekeeper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ spec:
value: "{{ .Values.mattermost.verificationToken }}"
- name: MATTERMOST_DOMAIN
value: "{{ .Values.mattermost.domain }}"
- name: ORCHESTRATION_HOST
value: orchestration.{{ .Release.Namespace }}.svc.clusterset.local
- name: QUEUER_HOST
value: queuer.{{ .Release.Namespace }}.svc.clusterset.local
resources: {{ toYaml .Values.gatekeeper.resources | nindent 10 }}
restartPolicy: Always
---
Expand Down
58 changes: 3 additions & 55 deletions chart/templates/orchestration.yaml
Original file line number Diff line number Diff line change
@@ -1,24 +1,4 @@
{{ if eq .Values.orchestration.enabled true }}
apiVersion: v1
kind: Service
metadata:
name: orchestration
labels:
app: orchestration
spec:
type: ClusterIP
ports:
- name: http
port: 80
targetPort: 80
selector:
app: orchestration
---
kind: ServiceExport
apiVersion: net.gke.io/v1
metadata:
name: orchestration
---
apiVersion: apps/v1
kind: Deployment
metadata:
Expand All @@ -37,12 +17,14 @@ spec:
sidecar.istio.io/inject: "true"
istio.io/rev: "asm-managed-rapid"
spec:
serviceAccountName: db-client-microservice
serviceAccountName: orchestration-microservice
containers:
- name: orchestration
image: {{ .Values.orchestration.image.repository }}:{{ .Values.orchestration.image.tag }}
imagePullPolicy: Always
env:
- name: PUBSUB_PROJECT
value: {{ .Values.project }}
- name: SPANNER_INSTANCE_ID
value: "isidro"
- name: SPANNER_DATABASE_ID
Expand All @@ -68,19 +50,6 @@ spec:
resources: {{ toYaml .Values.orchestration.resources | nindent 10 }}
restartPolicy: Always
---
apiVersion: monitoring.googleapis.com/v1alpha1
kind: PodMonitoring
metadata:
name: orchestration
spec:
selector:
matchLabels:
app: orchestration
endpoints:
- port: 80
interval: 30s
timeout: 10s
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
Expand All @@ -92,27 +61,6 @@ spec:
policyTypes:
- Egress
- Ingress
ingress:
- from:
# GCP Health Check IPs
- ipBlock:
cidr: 35.191.0.0/16
- ipBlock:
cidr: 130.211.0.0/22
# RFC1918 (overkill, but enables multi-cluster and multi-region)
- ipBlock:
cidr: 10.0.0.0/8
- ipBlock:
cidr: 172.16.0.0/12
- ipBlock:
cidr: 192.168.0.0/16
# In-cluster services (failover for non-RFC1918 topologies)
- podSelector:
matchLabels:
app: gatekeeper
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: gmp-system
egress:
- {}
{{ end }}
98 changes: 98 additions & 0 deletions chart/templates/queuer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
{{ if eq .Values.queuer.enabled true }}
apiVersion: v1
kind: Service
metadata:
name: queuer
labels:
app: queuer
spec:
type: ClusterIP
ports:
- name: http
port: 80
targetPort: 80
selector:
app: queuer
---
kind: ServiceExport
apiVersion: net.gke.io/v1
metadata:
name: queuer
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: queuer
labels:
app: queuer
spec:
replicas: 1
selector:
matchLabels:
app: queuer
template:
metadata:
labels:
app: queuer
sidecar.istio.io/inject: "true"
istio.io/rev: "asm-managed-rapid"
spec:
serviceAccountName: orchestration-microservice
containers:
- name: queuer
image: {{ .Values.queuer.image.repository }}:{{ .Values.queuer.image.tag }}
imagePullPolicy: Always
env:
- name: PUBSUB_PROJECT
value: {{ .Values.project }}
resources: {{ toYaml .Values.queuer.resources | nindent 10 }}
restartPolicy: Always
---
apiVersion: monitoring.googleapis.com/v1alpha1
kind: PodMonitoring
metadata:
name: queuer
spec:
selector:
matchLabels:
app: queuer
endpoints:
- port: 80
interval: 30s
timeout: 10s
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: queuer
spec:
podSelector:
matchLabels:
app: queuer
policyTypes:
- Egress
- Ingress
ingress:
- from:
# GCP Health Check IPs
- ipBlock:
cidr: 35.191.0.0/16
- ipBlock:
cidr: 130.211.0.0/22
# RFC1918 (overkill, but enables multi-cluster and multi-region)
- ipBlock:
cidr: 10.0.0.0/8
- ipBlock:
cidr: 172.16.0.0/12
- ipBlock:
cidr: 192.168.0.0/16
# In-cluster services (failover for non-RFC1918 topologies)
- podSelector:
matchLabels:
app: gatekeeper
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: gmp-system
egress:
- {}
{{ end }}
4 changes: 2 additions & 2 deletions chart/templates/service-account.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ metadata:
apiVersion: v1
kind: ServiceAccount
metadata:
name: db-client-microservice
name: orchestration-microservice
annotations:
iam.gke.io/gcp-service-account: isidro-db-client-microservices@{{ .Values.project }}.iam.gserviceaccount.com
iam.gke.io/gcp-service-account: isidro-orchestration@{{ .Values.project }}.iam.gserviceaccount.com
---
apiVersion: v1
kind: ServiceAccount
Expand Down
13 changes: 13 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,19 @@ policyAgent:
enabled: false
config: {}

queuer:
enabled: true
image:
repository: us.gcr.io/PROJECT/isidro/queuer
tag: latest
resources:
requests:
cpu: 250m
memory: 1Gi
limits:
cpu: 1
memory: 1Gi

repeater:
enabled: true
image:
Expand Down
8 changes: 4 additions & 4 deletions gatekeeper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
MATTERMOST_ACCESS_TOKEN = os.environ.get("MATTERMOST_ACCESS_TOKEN")
MATTERMOST_VERIFICATION_TOKEN = os.environ.get("MATTERMOST_VERIFICATION_TOKEN")
MATTERMOST_DOMAIN = os.environ.get("MATTERMOST_DOMAIN")
ORCHESTRATION_HOST = os.environ.get("ORCHESTRATION_HOST")
QUEUER_HOST = os.environ.get("QUEUER_HOST")

if not SLACK_VERIFICATION_TOKEN:
raise ValueError("No SLACK_VERIFICATION_TOKEN environment variable set")
Expand All @@ -18,8 +18,8 @@
raise ValueError("No MATTERMOST_VERIFICATION_TOKEN environment variable set")
if not MATTERMOST_DOMAIN:
raise ValueError("No MATTERMOST_DOMAIN environment variable set")
if not ORCHESTRATION_HOST:
raise ValueError("No ORCHESTRATION_HOST environment variable set")
if not QUEUER_HOST:
raise ValueError("No QUEUER_HOST environment variable set")

app = Flask(__name__)

Expand Down Expand Up @@ -127,7 +127,7 @@ def challenge_response(self):
def general_response(self):
# Pass data to the orchestration service
requests.post(
f"http://{ORCHESTRATION_HOST}/v1/orchestrate",
f"http://{QUEUER_HOST}/v1/queue",
json={
"platform": self.get_platform(),
"channel": self.get_channel(),
Expand Down
2 changes: 1 addition & 1 deletion orchestration/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ COPY . .

ENV PYTHONUNBUFFERED True

CMD exec gunicorn --bind :80 --workers 1 --threads 8 --timeout 0 --log-level=debug main:app
CMD python ./main.py
50 changes: 23 additions & 27 deletions orchestration/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import re
import requests

import metrics
import observability

from flask import Flask, abort, request
from google.cloud import pubsub_v1
from spanner import database, insert_post
from prometheus_client import generate_latest

CONFIRMATION_WORDS = [
"yes",
Expand All @@ -22,6 +20,7 @@
"sure",
]

PUBSUB_PROJECT = os.environ.get("PUBSUB_PROJECT")
GREETING = os.environ.get("GREETING")
RESPONDER_HOST = os.environ.get("RESPONDER_HOST")
KEYWORDS_HOST = os.environ.get("KEYWORDS_HOST")
Expand All @@ -32,6 +31,8 @@
TASKS_HOST = os.environ.get("TASKS_HOST")
REPEATER_HOST = os.environ.get("REPEATER_HOST")

if not PUBSUB_PROJECT:
raise ValueError("No PUBSUB_PROJECT environment variable set")
if not GREETING:
raise ValueError("No GREETING environment variable set")
if not RESPONDER_HOST:
Expand All @@ -51,18 +52,19 @@
if not REPEATER_HOST:
raise ValueError("No REPEATER_HOST environment variable set")

app = Flask(__name__)
trace = observability.setup(flask_app=app, requests_enabled=True)
trace = observability.setup(requests_enabled=True)
db = database()
subscriber = pubsub_v1.SubscriberClient()


class Orchestration:
def __init__(self, request):
request = request.get_json()
self.platform = request["platform"]
self.channel = request["channel"]
self.thread_ts = request["thread_ts"]
self.user = request["user"]
self.text = request["text"]
def __init__(self, message):
message = json.loads(message.data)
self.platform = message["platform"]
self.channel = message["channel"]
self.thread_ts = message["thread_ts"]
self.user = message["user"]
self.text = message["text"]
self.confirmed = False
self.confirmation_text = None
self.action = None
Expand Down Expand Up @@ -296,15 +298,13 @@ def fail_interpolation(self):
abort(400)


@app.route("/v1/orchestrate", methods=["POST"])
def orchestrate():
orchestration = Orchestration(request)
def callback(message):
orchestration = Orchestration(message)
orchestration.insert_post()
orchestration.confirmation()
if not orchestration.confirmed:
orchestration.get_action()
orchestration.send_confirmation()
return ""
elif orchestration.user_is_confirming():
orchestration.get_action()
if orchestration.is_gbash_async():
Expand All @@ -320,19 +320,15 @@ def orchestrate():
# Fallback is to defer to async task system
elif orchestration.is_async():
orchestration.send_task()
return ""
elif not orchestration.user_is_confirming():
orchestration.send_rejection()
return ""
else:
abort(400)

message.ack()

@app.route("/metrics")
def metrics():
return generate_latest()

streaming_pull_future = subscriber.subscribe(
f"projects/{PUBSUB_PROJECT}/subscriptions/isidro-requests-orchestration",
callback=callback,
)

@app.route("/", methods=["GET"])
def health():
return ""
with subscriber:
streaming_pull_future.result()
Loading

0 comments on commit fd7f9dc

Please sign in to comment.