Jagged bulk update #187

Open · wants to merge 4 commits into base: main
28 changes: 28 additions & 0 deletions .env.test-server
@@ -0,0 +1,28 @@
# Any setting defined in the balsam.server.conf.Settings class
# can be set as an environment variable below.
# Settings use either the BALSAM_, BALSAM_AUTH_, or BALSAM_OAUTH_ prefix,
# depending on the category

# Environment variables specific to docker-compose.yml
# Relative to CWD:
export GUNICORN_CONFIG_FILE="balsam/server/gunicorn.conf.py"
export SERVER_PORT=8888


# Logging
export BALSAM_LOG_LEVEL=DEBUG
export BALSAM_LOG_DIR="./balsam-logs"

# Server (pass-through directly to server/conf.py)
export BALSAM_DATABASE_URL="postgresql://postgres:postgres@postgres-test:5432/balsam"
export BALSAM_REDIS_PARAMS='{"host": "redis", "port": "6379"}'

# Balsam API Auth
export BALSAM_AUTH_SECRET_KEY="SOME_SECRET_KEY"
export BALSAM_AUTH_TOKEN_TTL=259200
export BALSAM_AUTH_LOGIN_METHODS='["password"]'

# Login with external OAuth Provider
export BALSAM_OAUTH_CLIENT_ID="SOME_CLIENT_ID"
export BALSAM_OAUTH_CLIENT_SECRET="SOME_CLIENT_SECRET"
export BALSAM_OAUTH_SCOPE="read_basic_user_data"
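
For context, the BALSAM_ / BALSAM_AUTH_ / BALSAM_OAUTH_ prefix convention above is the usual pydantic settings pattern. A minimal sketch of how such variables are likely consumed, assuming balsam.server.conf.Settings builds on a v1-style pydantic BaseSettings (the field names below are illustrative, not the real schema):

    from typing import List
    from pydantic import BaseSettings

    class AuthSettings(BaseSettings):
        # Illustrative fields only; the real schema lives in balsam.server.conf
        secret_key: str = "change-me"
        token_ttl: int = 86400
        login_methods: List[str] = ["password"]

        class Config:
            env_prefix = "BALSAM_AUTH_"  # BALSAM_AUTH_TOKEN_TTL -> token_ttl

    settings = AuthSettings()  # reads the exported environment at instantiation
    print(settings.token_ttl)  # 259200 after sourcing .env.test-server

Note that pydantic parses complex-typed fields from JSON strings, which is why BALSAM_AUTH_LOGIN_METHODS is exported as '["password"]' above.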
1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@ __pycache__

.*
!.env.example
!.env.test-server
!.gitignore
!.readthedocs.yml
!.readthedocs.yaml
34 changes: 31 additions & 3 deletions balsam/server/models/crud/jobs.py
@@ -20,7 +20,7 @@

def owned_job_selector(owner: schemas.UserOut, columns: Optional[List[Column[Any]]] = None) -> Select:
    if columns is None:
-        stmt = select(models.Job.__table__)  # type: ignore[arg-type]
+        stmt = select(models.Job.__table__)  # type: ignore[arg-type]  # noqa
    else:
        stmt = select(columns)
    return (  # type: ignore
@@ -249,7 +249,7 @@ def update_waiting_children(db: Session, finished_parent_ids: Iterable[int]) ->
    qs = (
        select((models.Job.__table__,))
        .where(models.Job.state == "AWAITING_PARENTS")
-        .where(models.Job.parent_ids.overlap(finished_parent_ids))  # type: ignore[attr-defined]
+        .where(models.Job.parent_ids.overlap(finished_parent_ids))  # type: ignore[attr-defined]  # noqa
    )
    children, transfer_items_by_jobid = select_jobs_for_update(db, qs, extra_cols=[models.Job.parent_ids])

@@ -328,10 +328,38 @@ def do_update_jobs(
    update_jobs: List[Any],
    transfer_items_by_jobid: Dict[int, List[Any]],
    patch_dicts: Dict[int, Dict[str, Any]],
) -> None:
    # This wrapper around the original do_update_jobs exists because the
    # jobs in a single update request do not always share the same set of
    # columns, and SQLAlchemy's bulk update requires every row in one
    # executemany call to bind the same columns. Group the jobs by the
    # sorted tuple of column names being patched, then call the original
    # update function once per group.
    grouped_update_jobs: Dict[Tuple[str, ...], List[Any]] = {}
    grouped_patch_dicts: Dict[Tuple[str, ...], Dict[int, Any]] = {}

    for job in update_jobs:
        update_data = patch_dicts[job.id]
        keytuple = tuple(sorted(update_data.keys()))
        try:
            grouped_update_jobs[keytuple].append(job)
            grouped_patch_dicts[keytuple][job.id] = patch_dicts[job.id]
        except KeyError:
            grouped_update_jobs[keytuple] = [job]
            grouped_patch_dicts[keytuple] = {job.id: patch_dicts[job.id]}

    for keys, jobs in grouped_update_jobs.items():
        do_update_jobs_orig(db, jobs, transfer_items_by_jobid, grouped_patch_dicts[keys])


def do_update_jobs_orig(
    db: Session,
    update_jobs: List[Any],
    transfer_items_by_jobid: Dict[int, List[Any]],
    patch_dicts: Dict[int, Dict[str, Any]],
) -> None:
    events: List[Dict[str, Any]] = []
    ready_transfers: List[int] = []

    # First, perform job-wise updates:
    for job in update_jobs:
        update_data = patch_dicts[job.id]
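The constraint motivating the wrapper above: an executemany-style bulk UPDATE in SQLAlchemy prepares a single statement, so every parameter dict in the batch must carry identical keys. A standalone sketch of the grouping idea, using hypothetical toy patches rather than code from this PR:

    from collections import defaultdict
    from typing import Any, Dict, Tuple

    def group_patches_by_keys(
        patch_dicts: Dict[int, Dict[str, Any]]
    ) -> Dict[Tuple[str, ...], Dict[int, Dict[str, Any]]]:
        """Bucket per-job patches by their sorted column names so that each
        bucket can be flushed with one homogeneous bulk UPDATE."""
        groups: Dict[Tuple[str, ...], Dict[int, Dict[str, Any]]] = defaultdict(dict)
        for job_id, patch in patch_dicts.items():
            groups[tuple(sorted(patch))][job_id] = patch
        return groups

    # Toy "jagged" patches (hence the PR title): key sets differ across jobs.
    patches = {
        1: {"state": "RUNNING"},
        2: {"state": "FAILED", "return_code": 1},
        3: {"state": "RUNNING"},
    }
    for keys, group in group_patches_by_keys(patches).items():
        print(keys, "->", sorted(group))
    # ('state',) -> [1, 3]
    # ('return_code', 'state') -> [2]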
70 changes: 70 additions & 0 deletions docker-compose.test-server.yml
@@ -0,0 +1,70 @@
version: "3.9"

services:
  gunicorn:
    container_name: gunicorn-test
    build: .
    image: masalim2/balsam
    restart: always
    ports:
      - ${SERVER_PORT}:${SERVER_PORT}
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_started
    # Vars in env_file are exported to the containers
    # Vars in ".env" specifically are also usable in the compose file as ${VAR}
    env_file: ".env"
    environment:
      SERVER_PORT: ${SERVER_PORT}
      BALSAM_LOG_DIR: ${BALSAM_LOG_DIR}
    volumes:
      - "${BALSAM_LOG_DIR}:/balsam/log"
      - "./balsam:/balsam/balsam:ro"
      - "./tests:/balsam/tests:ro"
      - "${PWD}/${GUNICORN_CONFIG_FILE}:/balsam/gunicorn.conf.py:ro" # Must be abs path
    networks:
      - balsam-test

  postgres:
    container_name: postgres-test
    image: postgres
    restart: always
    ports:
      - 5433:5432
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: balsam
    volumes:
      - "pgdata-test:/var/lib/postgresql/data"
    command: "-c log_min_duration_statement=0"
    logging:
      options:
        max-size: "50m"
        max-file: "5"
      driver: "json-file"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 6
    networks:
      - balsam-test

  redis:
    container_name: redis-test
    image: redis
    restart: always
    ports:
      - 6380:6379
    networks:
      - balsam-test

volumes:
  pgdata-test:

networks:
  balsam-test:
    driver: bridge
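
A quick way to sanity-check the stack once `docker compose -f docker-compose.test-server.yml up -d` has been run (a sketch, not part of this PR; it assumes the host port mappings above, with gunicorn's port taken from SERVER_PORT=8888 in .env.test-server):

    """Smoke-check the published ports of the test-server stack (stdlib only)."""
    import socket

    # Host-side ports from docker-compose.test-server.yml; adjust if
    # SERVER_PORT or the mappings are changed.
    SERVICES = {"gunicorn": 8888, "postgres": 5433, "redis": 6380}

    for name, port in SERVICES.items():
        try:
            with socket.create_connection(("localhost", port), timeout=2):
                print(f"{name}: listening on localhost:{port}")
        except OSError as exc:
            print(f"{name}: unreachable on localhost:{port} ({exc})")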