Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gradio #44

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker-compose.yml
13 changes: 13 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Stage 1: system packages needed for audio recording/processing.
FROM manimcommunity/manim AS system-dependencies
USER root
RUN apt-get update \
    && apt-get install -y -q sox libsox-fmt-all portaudio19-dev \
    && rm -rf /var/lib/apt/lists/*

# Stage 2: Python dependencies and the package itself.
FROM system-dependencies
USER ${NB_USER}

WORKDIR /src/manim_voiceover
# Requirement specifiers must be quoted: unquoted, the shell parses `>=2.2.4`
# as an output redirection, silently dropping the version constraint and
# writing pip's output to a file named "=2.2.4".
RUN pip install --no-cache-dir "gtts>=2.2.4" "gradio>=3.23.0"
COPY . ./
# Quoted so the shell never attempts to glob-expand the extras brackets.
RUN pip install ".[gtts,gradio]"
WORKDIR /manim
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# docker run -it --name my-manim-container -v "/full/path/to/your/directory:/manim" manimcommunity/manim /bin/bash
version: "3.3"
services:
  manim-server:
    image: manim-voiceover
    build:
      # Explicit build context so the Dockerfile's `COPY . ./` is resolved
      # relative to the repository root.
      context: .
      dockerfile: ./Dockerfile
    volumes:
      - ./out-data:/manim/out-data
      - ./notebooks/:/manim/notebooks
      - .:/manim/manim-voiceover
    ports:
      # Quoted so YAML always reads HOST:CONTAINER mappings as strings.
      - "10123:10123" # jupyter
      - "10124:10124" # gradio
    working_dir: /manim/manim-voiceover/
    # No trailing ';': the command is not run through a shell, so a ';' would
    # be handed to jupyter as a literal argument.
    command: jupyter lab --allow-root --ip=0.0.0.0 --port 10123
26 changes: 26 additions & 0 deletions examples/gradio-example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from manim import *
from manim_voiceover import *
from manim_voiceover.services.gradio import GradioRecordingService

class GradioExample(VoiceoverScene):
    """Example scene that narrates a few shape animations.

    The voiceover audio is supplied by ``GradioRecordingService``.
    """

    def construct(self):
        """Build the scene: draw, move, morph and remove a circle."""
        # NOTE(review): `user_filler` is not a named parameter of
        # GradioRecordingService.__init__ and is passed through as an extra
        # keyword argument -- confirm whether a filler option was intended.
        speech_service = GradioRecordingService(user_filler=True)
        self.set_speech_service(speech_service)

        shape = Circle()
        target = Square().shift(2 * RIGHT)

        # Each animation is stretched to last exactly as long as its narration.
        with self.voiceover(text="This circle is drawn as I speak.") as vo:
            self.play(Create(shape), run_time=vo.duration)

        with self.voiceover(text="Let's shift it to the left 2 units.") as vo:
            self.play(shape.animate.shift(2 * LEFT), run_time=vo.duration)

        with self.voiceover(text="Now, let's transform it into a square.") as vo:
            self.play(Transform(shape, target), run_time=vo.duration)

        # The closing animation keeps its default run time.
        with self.voiceover(text="Thank you for watching."):
            self.play(Uncreate(shape))

        self.wait()
139 changes: 139 additions & 0 deletions manim_voiceover/services/gradio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from queue import Queue
from manim import config
from manim_voiceover.services.base import SpeechService
from manim_voiceover.helper import prompt_ask_missing_extras
from subprocess import Popen, DEVNULL, PIPE
from pathlib import Path

class GradioRecordingService(SpeechService):
    """Speech service that records voiceover lines through a Gradio web UI.

    Lines that need audio are pushed onto a text queue. A Gradio interface
    shows each line, records it from the microphone and hands the path of the
    recorded file back through an audio queue, so the (blocking) scene code
    and the UI callbacks communicate only through those two queues.
    """

    def __init__(self, filler_duration=False, inline=True, port=None,
                 use_filler=False, **kwargs):
        """Create the service.

        Args:
            filler_duration: Fallback length in seconds of the silent filler
                clip, used when ``generate_from_text`` gets a falsy
                ``duration``. Non-numeric or non-positive values (including
                the legacy default ``False``) select 1.0 second.
            inline: Whether the interface should be embedded inline (e.g. in
                a notebook) instead of only being served.
            port: Port for the Gradio server. When ``None`` Gradio picks the
                port and a public share link is requested.
            use_filler: When true, never prompt for a recording; generate
                silence instead.
            **kwargs: Forwarded to ``SpeechService.__init__``.
        """
        SpeechService.__init__(self, **kwargs)
        self._text_queue = None
        self._audio_queue = None
        self._interface = None
        self.use_filler = bool(use_filler)
        self.inline = inline
        # bool is excluded explicitly because it is a subclass of int and the
        # historical default for this parameter is False.
        is_number = (isinstance(filler_duration, (int, float))
                     and not isinstance(filler_duration, bool))
        if is_number and filler_duration > 0:
            self.filler_duration = float(filler_duration)
        else:
            self.filler_duration = 1.0
        self.gradio_server_port = port

    def generate_from_text(
        self, text: str, cache_dir: str = None, path: str = None, duration=1, **kwargs
    ) -> dict:
        """Return the audio description for ``text``, recording it if needed.

        Args:
            text: The line to be spoken.
            cache_dir: Directory for audio files; defaults to
                ``self.cache_dir``.
            path: File name (relative to the cache directory) for the
                recording; derived from a hash of the input data when
                ``None``.
            duration: Length in seconds of the silent clip produced in
                filler / dry-run mode.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            A dict with the keys ``input_text``, ``input_data`` and
            ``original_audio``, or the cached result when one exists and its
            audio file is still present.
        """
        if cache_dir is None:
            cache_dir = self.cache_dir
        cache_path = Path(cache_dir)

        if self.use_filler or config.dry_run:
            # No recording is requested: a silent clip stands in for the voice.
            input_data = {
                "input_text": text,
                "service": "gradio",
                "filler": True,
                "duration": duration or self.filler_duration,
            }
            audio_path = f'quiet-{input_data["duration"]:.2f}.mp3'
            self.generate_silence(str(cache_path / audio_path), input_data["duration"])
        else:
            input_data = {"input_text": text, "service": "gradio"}
            if not config.disable_caching:
                cached_result = self.get_cached_result(input_data, cache_dir)
                # A cache entry is only trusted if its audio file still exists.
                if (cached_result is not None
                        and Path(cache_path / cached_result.get("original_audio", "")).is_file()):
                    return cached_result

            if path is None:
                audio_path = self.get_data_hash(input_data) + ".mp3"
            else:
                audio_path = path
            self.record_if_needed(str(cache_path / audio_path), text)

        return {
            "input_text": text,
            "input_data": input_data,
            "original_audio": audio_path,
        }

    def generate_silence(self, fname, duration):
        """Write ``duration`` seconds of silent mono MP3 audio to ``fname``.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero status.
        """
        command = [
            'ffmpeg', '-f', 'lavfi',
            '-i', 'anullsrc=channel_layout=mono:sample_rate=48000',
            '-c:a', 'mp3', '-y', '-t', str(float(duration)),
            fname,
        ]
        process = Popen(command, stdout=DEVNULL, stderr=DEVNULL)
        if process.wait() != 0:
            raise RuntimeError('Unable to generate silent audio')

    def record_if_needed(self, fname, text):
        """Ensure ``fname`` exists, prompting the user to record ``text`` if not.

        Returns:
            ``fname``.

        Raises:
            RuntimeError: If no recording was produced or the ffmpeg
                conversion to MP3 fails (ffmpeg's stderr is the message).
        """
        target = Path(fname)
        if target.is_file():
            return fname

        tmp_file = self.prompt_recording(text)
        # The UI may hand back None when nothing was recorded.
        if tmp_file is None or not Path(tmp_file).is_file():
            raise RuntimeError('File recording failed to generate file')

        # It is the destination directory that may be missing, not the
        # directory of the temporary recording.
        target.parent.mkdir(parents=True, exist_ok=True)
        process = Popen(['ffmpeg', '-i', str(tmp_file), '-vn', '-c:a', 'mp3', '-y', fname],
                        stdout=DEVNULL, stderr=PIPE)
        # communicate() drains stderr while waiting; wait() alone can deadlock
        # once the pipe buffer fills up.
        _, error_output = process.communicate()
        if process.returncode != 0:
            raise RuntimeError(error_output.decode('utf-8', errors='replace'))
        return fname

    def prompt_recording(self, text):
        """Queue ``text`` for recording and block until the UI returns audio.

        The queues and the interface are created lazily on first use.

        Returns:
            Whatever the UI callback put on the audio queue (the path of the
            recorded file as provided by the Gradio audio component).
        """
        if self._text_queue is None:
            self._text_queue = Queue()
            self._audio_queue = Queue()
        # The text must be queued before the interface is built, because
        # building it immediately fetches the first line to display.
        self._text_queue.put(text)
        if self._interface is None:
            prompt_ask_missing_extras("gradio", "gradio", "GradioRecordingService")
            self.create_interface()
        return self._audio_queue.get()

    def create_interface(self):
        """Build and launch the Gradio recording interface."""
        import html

        import gradio
        try:
            import IPython.display as ipd
        except ImportError:
            ipd = None

        with gradio.Blocks() as interface:
            def format_next_script_line():
                # Blocks until the next line is queued; None marks the end.
                line = self._text_queue.get()
                if line is None:
                    raise StopIteration
                # The script line is plain text, so escape it before
                # embedding it in markup.
                return ('<h2>speak the following line</h2>'
                        f'<blockquote><h2>{html.escape(str(line))}</h2></blockquote>')

            prompt = gradio.HTML(format_next_script_line())
            with gradio.Row(visible=True) as recorder:
                audio_input = gradio.Audio(source='microphone', type='filepath')
            # Kept outside the row: the row is hidden when recording ends
            # while this button must stay visible as 'Done'.
            action_btn = gradio.Button('Accept', visible=False)

            def next_line(action, audio):
                if action == 'Done':
                    # Nothing is waiting for audio at this point, so do not
                    # enqueue a stray item that a later prompt would consume.
                    interface.close()
                    return
                self._audio_queue.put(audio)
                try:
                    return {prompt: format_next_script_line(), audio_input: None,
                            action_btn: gradio.update(value='Accept', visible=False)}
                except StopIteration:
                    return {
                        prompt: "Recording finished",
                        action_btn: gradio.update(value='Done', visible=True),
                        recorder: gradio.update(visible=False),
                    }

            def audio_recorded(a):
                # 'Accept' is only offered once there is a recording.
                return {action_btn: gradio.update(value='Accept', visible=a is not None)}

            audio_input.change(audio_recorded, [audio_input], [action_btn])
            action_btn.click(next_line, [action_btn, audio_input],
                             [recorder, prompt, action_btn, audio_input])
        self._interface = interface

        if self.gradio_server_port is None:
            self._interface.launch(share=True, inline=self.inline, show_tips=True)
        else:
            self._interface.launch(inline=False, server_port=self.gradio_server_port)
            if self.inline and ipd is not None:
                iframe = ipd.HTML(
                    f'''<div><iframe src="http://127.0.0.1:{self.gradio_server_port}/" width="{interface.width}" height="{interface.height}"
                    allow="autoplay; camera; microphone; clipboard-read; clipboard-write;" frameborder="0" allowfullscreen>
                    </iframe></div>'''
                )
                # An HTML object only renders when displayed; building it
                # without display() has no visible effect here.
                ipd.display(iframe)
132 changes: 132 additions & 0 deletions manim_voiceover/services/recorder/rkey_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
def has_keyboard_listener():
    """Return True when the pynput-based keyboard listener should be used.

    That is the case when ``pynput`` can be imported and the environment
    variable ``MANIM_USE_PYNPUT`` is unset or set to ``yes``, ``true`` or
    ``1`` (compared case-insensitively).
    """
    import os

    try:
        from pynput import keyboard  # noqa: F401 -- availability probe only
    except Exception:
        # Best-effort probe: whatever goes wrong while importing pynput, the
        # caller simply falls back to the other listener.
        return False

    # os.environ is a mapping, not a callable: use .get() for the default.
    choice = os.environ.get('MANIM_USE_PYNPUT', 'yes')
    return choice.strip().lower() in ('yes', 'true', '1')

if has_keyboard_listener():
    # Keyboard listener is more powerful but requires some privileges that
    # could lead to security issues.
    print('Using pynput.keyboard')
    from pynput import keyboard

    class RKeyListener(keyboard.Listener):
        """pynput listener that tracks whether the R key is held down.

        ``key_pressed`` is truthy while R is down. It starts as ``None`` and
        becomes ``True`` / ``False`` on press / release events.
        """

        def __init__(self, verbose=False):
            super().__init__(self.on_press, self.on_release)
            self.key_pressed = None
            self.verbose = verbose

        def on_press(self, key):
            """Record an R key press; always returns True to keep listening."""
            # Special keys have no ``char`` attribute at all.
            if getattr(key, "char", None) in ("r", "R"):
                self.key_pressed = True
                if self.verbose:
                    print('R key pressed')
            return True

        def on_release(self, key):
            """Record an R key release; always returns True to keep listening."""
            if getattr(key, "char", None) in ("r", "R"):
                self.key_pressed = False
                if self.verbose:
                    print('R key released')
            return True
else:
    print('Using readchar. If you want a system wide key listener, set environment variable MANIM_USE_PYNPUT=yes')
    import _thread
    import threading
    import time
    from collections import namedtuple

    import readchar

    # Last character read from the terminal and when it was read.
    PastKeyboardEvent = namedtuple('PastKeyboardEvent', ['key', 'time'])

    class KeyboardCapture(threading.Thread):
        """Background thread that remembers the last character typed.

        Only one capture is meant to exist at a time; obtain it through
        ``get_instance``.
        """

        instance = None

        @staticmethod
        def get_instance():
            """Return a usable capture, reviving or replacing a stopped one."""
            current = KeyboardCapture.instance
            if current is not None and current.is_alive():
                # A stopped capture may still be blocked inside readchar;
                # re-arm it instead of starting a second reader on stdin.
                current.capturing = True
                return current
            # Either no capture exists or its thread is not running: drop the
            # stale reference so that a fresh capture can be created.
            KeyboardCapture.instance = None
            return KeyboardCapture()

        def __init__(self, autostart=True):
            if KeyboardCapture.instance is not None:
                raise InterruptedError("One instance of key capture already initialized")
            super().__init__()
            KeyboardCapture.instance = self
            self.last_key = None
            self.last_time = time.time()
            self.capturing = False
            if autostart:
                self.start()

        def get_last_key(self):
            """Return the most recent key and its timestamp."""
            return PastKeyboardEvent(self.last_key, self.last_time)

        def run(self):
            """Read characters until stopped."""
            self.capturing = True
            while self.capturing:
                self.last_key = readchar.readchar()
                self.last_time = time.time()
                if self.last_key == '\x03':
                    # Ctrl-C arrives as a plain character here. Raising
                    # KeyboardInterrupt in this worker thread would only end
                    # this thread, so interrupt the main thread instead.
                    self.capturing = False
                    _thread.interrupt_main()

        def stop(self):
            """Ask the capture loop to finish after the current read."""
            self.capturing = False

    class RKeyListener(threading.Thread):
        """Thread that tracks whether the R key is held, using key repeat."""

        def __init__(self, verbose=True):
            super().__init__()
            # Delay for the first repetition
            self.first_repeat = 0.5
            # Delay for subsequent repetitions
            self.repeat_rate = 0.2
            self.key_pressed = False
            self.verbose = verbose
            # Defined up front so stop() is safe before the thread has run.
            self.keyboard = None
            self.listening = False

        def run(self):
            '''
            Relies on the fact that if you hold a key it will be
            entered repeatedly on the terminal to detect the press and
            release events.
            '''
            try:
                self.keyboard = KeyboardCapture.get_instance()
                self.run_logic(self.keyboard)
            finally:
                if self.keyboard is not None:
                    self.keyboard.stop()

        def run_logic(self, keyboard):
            """Poll ``keyboard`` and update ``key_pressed`` until stopped."""
            prev_time = keyboard.get_last_key().time
            self.listening = True
            if self.verbose:
                print('Start listening')
            while self.listening:
                k = keyboard.get_last_key()

                if not self.key_pressed:
                    # A new 'r' event since the previous poll is a press.
                    if k.key in ('r', 'R') and k.time > prev_time:
                        self.key_pressed = True
                        if self.verbose:
                            print('R key pressed')
                        # Together with the sleep below this waits
                        # first_repeat in total before the next check.
                        time.sleep(max(0.0, self.first_repeat - self.repeat_rate))
                else:
                    # No new repeat event (or another key) is a release.
                    if k.key not in ('r', 'R') or k.time == prev_time:
                        if self.verbose:
                            print('R key released')
                        self.key_pressed = False
                prev_time = k.time
                time.sleep(self.repeat_rate)
            if self.verbose:
                print('Stop listening')

        def stop(self):
            """Stop listening and wait for the thread to finish."""
            if self.keyboard is not None:
                self.keyboard.stop()
            self.listening = False
            # join() raises on a thread that was never started and when a
            # thread tries to join itself.
            if self.is_alive() and threading.current_thread() is not self:
                self.join()
if __name__ == '__main__':
    # Manual smoke test: the listener is a thread, so it has to be started
    # (and waited on) to do anything. Press and hold R to see the events.
    listener = RKeyListener(verbose=True)
    listener.start()
    try:
        listener.join()
    except KeyboardInterrupt:
        listener.stop()
Loading