Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add and Enable Real-time Audio Bytes to Trigger Events #1395

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions backend/database/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def get_app_by_id_db(app_id: str):
return None


def get_audio_apps_count(app_ids: List[str]):
filters = [FieldFilter('id', 'in', app_ids), FieldFilter('deleted', '==', False), FieldFilter('external_integration.triggers_on', '==', 'audio_bytes')]
apps_ref = db.collection('plugins_data').where(filter=BaseCompositeFilter('AND', filters)).count().get()
return apps_ref[0][0].value


def get_private_apps_db(uid: str) -> List:
filters = [FieldFilter('uid', '==', uid), FieldFilter('private', '==', True), FieldFilter('deleted', '==', False)]
private_apps = db.collection('plugins_data').where(filter=BaseCompositeFilter('AND', filters)).stream()
Expand Down
3 changes: 3 additions & 0 deletions backend/models/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ def triggers_on_memory_creation(self) -> bool:
def triggers_realtime(self) -> bool:
return self.works_externally() and self.external_integration.triggers_on == 'transcript_processed'

def triggers_realtime_audio_bytes(self) -> bool:
return self.works_externally() and self.external_integration.triggers_on == 'audio_bytes'

def filter_proactive_notification_scopes(self, params: [str]) -> []:
if not self.proactive_notification:
return []
Expand Down
1 change: 1 addition & 0 deletions backend/routers/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def get_plugin_capabilities():
{'title': 'Chat', 'id': 'chat'},
{'title': 'Memories', 'id': 'memories'},
{'title': 'External Integration', 'id': 'external_integration', 'triggers': [
{'title': 'Audio Bytes', 'id': 'audio_bytes'},
{'title': 'Memory Creation', 'id': 'memory_creation'},
{'title': 'Transcript Processed', 'id': 'transcript_processed'},
]},
Expand Down
16 changes: 14 additions & 2 deletions backend/routers/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
from fastapi.websockets import WebSocketDisconnect, WebSocket
from starlette.websockets import WebSocketState

from utils.plugins import trigger_realtime_integrations
from utils.apps import is_audio_bytes_app_enabled
from utils.plugins import trigger_realtime_integrations, trigger_realtime_audio_bytes
from utils.webhooks import send_audio_bytes_developer_webhook, realtime_transcript_webhook, \
get_audio_bytes_webhook_seconds

router = APIRouter()


async def _websocket_util_trigger(
websocket: WebSocket, uid: str, sample_rate: int = 8000,
):
Expand All @@ -31,13 +33,16 @@ async def _websocket_util_trigger(

# audio bytes
audio_bytes_webhook_delay_seconds = get_audio_bytes_webhook_seconds(uid)
audio_bytes_trigger_delay_seconds = 5
has_audio_apps_enabled = is_audio_bytes_app_enabled(uid)

# task
async def receive_audio_bytes():
nonlocal websocket_active
nonlocal websocket_close_code

audiobuffer = bytearray()
trigger_audiobuffer = bytearray()

try:
while websocket_active:
Expand All @@ -61,9 +66,16 @@ async def receive_audio_bytes():
# Audio bytes
if header_type == 101:
audiobuffer.extend(data[4:])
trigger_audiobuffer.extend(data[4:])
if has_audio_apps_enabled and len(
trigger_audiobuffer) > sample_rate * audio_bytes_trigger_delay_seconds * 2:
asyncio.run_coroutine_threadsafe(
trigger_realtime_audio_bytes(uid, sample_rate, trigger_audiobuffer.copy()), loop)
trigger_audiobuffer = bytearray()
if audio_bytes_webhook_delay_seconds and len(
audiobuffer) > sample_rate * audio_bytes_webhook_delay_seconds * 2:
asyncio.run_coroutine_threadsafe(send_audio_bytes_developer_webhook(uid, sample_rate, audiobuffer.copy()), loop)
asyncio.run_coroutine_threadsafe(
send_audio_bytes_developer_webhook(uid, sample_rate, audiobuffer.copy()), loop)
audiobuffer = bytearray()
continue

Expand Down
3 changes: 2 additions & 1 deletion backend/routers/transcribe_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from database.redis_db import get_cached_user_geolocation
from models.memory import Memory, TranscriptSegment, MemoryStatus, Structured, Geolocation
from models.message_event import MemoryEvent, MessageEvent
from utils.apps import is_audio_bytes_app_enabled
from utils.memories.location import get_google_maps_location
from utils.memories.process_memory import process_memory
from utils.plugins import trigger_external_integrations
Expand Down Expand Up @@ -384,7 +385,7 @@ async def transcript_consume():
# Audio bytes
audio_bytes_ws = None
audio_buffers = bytearray()
audio_bytes_enabled = bool(get_audio_bytes_webhook_seconds(uid))
audio_bytes_enabled = bool(get_audio_bytes_webhook_seconds(uid)) or is_audio_bytes_app_enabled(uid)

def audio_bytes_send(audio_bytes):
nonlocal audio_buffers
Expand Down
28 changes: 20 additions & 8 deletions backend/utils/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
get_app_usage_count_db, get_app_memory_created_integration_usage_count_db, get_app_memory_prompt_usage_count_db, \
add_tester_db, add_app_access_for_tester_db, remove_app_access_for_tester_db, remove_tester_db, \
is_tester_db, can_tester_access_app_db, get_apps_for_tester_db, get_app_chat_message_sent_usage_count_db, \
update_app_in_db
update_app_in_db, get_audio_apps_count
from database.redis_db import get_enabled_plugins, get_plugin_reviews, get_generic_cache, \
set_generic_cache, set_app_usage_history_cache, get_app_usage_history_cache, get_app_money_made_cache, \
set_app_money_made_cache, get_plugins_installs_count, get_plugins_reviews, get_app_cache_by_id, set_app_cache_by_id, \
Expand All @@ -17,8 +17,8 @@
from models.app import App, UsageHistoryItem, UsageHistoryType
from utils import stripe


MarketplaceAppReviewUIDs = os.getenv('MARKETPLACE_APP_REVIEWERS').split(',') if os.getenv('MARKETPLACE_APP_REVIEWERS') else []
MarketplaceAppReviewUIDs = os.getenv('MARKETPLACE_APP_REVIEWERS').split(',') if os.getenv(
'MARKETPLACE_APP_REVIEWERS') else []


# ********************************
Expand Down Expand Up @@ -48,6 +48,7 @@ def add_app_access_for_tester(app_id: str, uid: str):
def remove_app_access_for_tester(app_id: str, uid: str):
remove_app_access_for_tester_db(app_id, uid)


# ********************************

def weighted_rating(plugin):
Expand Down Expand Up @@ -144,7 +145,7 @@ def get_available_app_by_id_with_reviews(app_id: str, uid: str | None) -> dict |

# install
plugins_install = get_plugins_installs_count([app['id']])
app['installs'] = plugins_install.get(app['id'],0)
app['installs'] = plugins_install.get(app['id'], 0)
return app


Expand Down Expand Up @@ -175,7 +176,7 @@ def get_approved_available_apps(include_reviews: bool = False) -> list[App]:
apps = []
for app in all_apps:
app_dict = app
app_dict['installs'] = plugins_install.get(app['id'],0)
app_dict['installs'] = plugins_install.get(app['id'], 0)
if include_reviews:
reviews = plugins_review.get(app['id'], {})
sorted_reviews = reviews.values()
Expand Down Expand Up @@ -273,7 +274,9 @@ def get_app_money_made(app_id: str) -> dict[str, int | float]:

return money

def upsert_app_payment_link(app_id: str, is_paid_app: bool, price: float, payment_plan: str, previous_price: float | None = None):

def upsert_app_payment_link(app_id: str, is_paid_app: bool, price: float, payment_plan: str,
previous_price: float | None = None):
if not is_paid_app:
print(f"App is not a paid app, app_id: {app_id}")
return None
Expand Down Expand Up @@ -305,7 +308,7 @@ def upsert_app_payment_link(app_id: str, is_paid_app: bool, price: float, paymen
app.payment_product_id = payment_product.id

# price
payment_price = stripe.create_app_monthly_recurring_price(app.payment_product_id, int(price*100))
payment_price = stripe.create_app_monthly_recurring_price(app.payment_product_id, int(price * 100))
app.payment_price_id = payment_price.id

# payment link
Expand All @@ -317,17 +320,26 @@ def upsert_app_payment_link(app_id: str, is_paid_app: bool, price: float, paymen
update_app_in_db(app.dict())
return app


def get_is_user_paid_app(app_id: str, uid: str):
if uid in MarketplaceAppReviewUIDs:
return True
return get_user_paid_app(app_id, uid) is not None


def is_permit_payment_plan_get(uid: str):
if uid in MarketplaceAppReviewUIDs:
return False

return True


def paid_app(app_id: str, uid: str):
expired_seconds = 60*60*24*30 # 30 days
expired_seconds = 60 * 60 * 24 * 30 # 30 days
set_user_paid_app(app_id, uid, expired_seconds)


def is_audio_bytes_app_enabled(uid: str):
enabled_apps = get_enabled_plugins(uid)
audio_apps_count = get_audio_apps_count(enabled_apps)
return audio_apps_count > 0
50 changes: 49 additions & 1 deletion backend/utils/plugins.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import threading
from typing import List, Optional
from typing import List
import os
import requests
import time
Expand Down Expand Up @@ -230,6 +230,11 @@ async def trigger_realtime_integrations(uid: str, segments: list[dict], memory_i
_trigger_realtime_integrations(uid, token, segments, memory_id)


async def trigger_realtime_audio_bytes(uid: str, sample_rate: int, data: bytearray):
"""REALTIME AUDIO STREAMING"""
_trigger_realtime_audio_bytes(uid, sample_rate, data)


# proactive notification
def _retrieve_contextual_memories(uid: str, user_context):
vector = (
Expand Down Expand Up @@ -325,6 +330,49 @@ def _process_proactive_notification(uid: str, token: str, plugin: App, data):
return message


def _trigger_realtime_audio_bytes(uid: str, sample_rate: int, data: bytearray):
apps: List[App] = get_available_apps(uid)
filtered_apps = [
app for app in apps if
app.triggers_realtime_audio_bytes() and app.enabled and not app.deleted
]
if not filtered_apps:
return {}

threads = []
results = {}

def _single(app: App):
if not app.external_integration.webhook_url:
return

url = app.external_integration.webhook_url
url += f'?sample_rate={sample_rate}&uid={uid}'
try:
response = requests.post(url, data=data, headers={'Content-Type': 'application/octet-stream'}, timeout=15)
if response.status_code != 200:
print('trigger_realtime_audio_bytes', app.id, 'status:', response.status_code, 'results:',
response.text[:100])
return

response_data = response.json()
if not response_data:
return

except Exception as e:
print(f"Plugin integration error: {e}")
return

for app in filtered_apps:
threads.append(threading.Thread(target=_single, args=(app,)))

[t.start() for t in threads]
[t.join() for t in threads]

return results



def _trigger_realtime_integrations(uid: str, token: str, segments: List[dict], memory_id: str | None) -> dict:
apps: List[App] = get_available_apps(uid)
filtered_apps = [
Expand Down