Skip to content

Commit

Permalink
Simplify Job: remove allJobStatus, jobStatus and writeStatus.
Browse files Browse the repository at this point in the history
allJobStatus and jobStatus were just used by JobManager and are
now implemented just there (using jobStatusRepo).
writeStatus is extracted as trait WriteStatus and can be mixed in if
needed.

Now Job no longer depends on the JobStatusRepository which is another
benefit (less dependencies).
  • Loading branch information
magro committed Dec 27, 2014
1 parent c1f9006 commit cf6d92a
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 36 deletions.
4 changes: 2 additions & 2 deletions src/main/scala/de/kaufhof/hajobs/ActorJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package de.kaufhof.hajobs
import akka.actor._

import scala.concurrent.Future

import scala.concurrent.duration._
import scala.language.postfixOps

Expand All @@ -16,11 +17,10 @@ import scala.language.postfixOps
class ActorJob(jobType: JobType,
props: JobContext => Props,
system: ActorSystem,
jobStatusRepository: JobStatusRepository,
retriggerCount: Int = 0,
cronExpression: Option[String] = None,
lockTimeout: FiniteDuration = 60 seconds)
extends Job(jobType, jobStatusRepository, retriggerCount, cronExpression, lockTimeout) {
extends Job(jobType, retriggerCount, cronExpression, lockTimeout) {

private var actor: Option[ActorRef] = None

Expand Down
9 changes: 0 additions & 9 deletions src/main/scala/de/kaufhof/hajobs/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ case class JobContext(jobId: UUID, triggerId: UUID, finishCallback: () => Unit)
* Base class for jobs, wraps job execution and status.
*/
abstract class Job(val jobType: JobType,
jobStatusRepository: JobStatusRepository,
val retriggerCount: Int,
val cronExpression: Option[String] = None,
val lockTimeout: FiniteDuration = 60 seconds) {
Expand All @@ -53,12 +52,4 @@ abstract class Job(val jobType: JobType,
*/
def cancel()

def allJobStatus: Future[List[JobStatus]] = jobStatusRepository.list(jobType)

def jobStatus(jobId: UUID): Future[Option[JobStatus]] = jobStatusRepository.get(jobType, jobId)

protected def writeStatus(jobState: JobState, content: Option[JsValue] = None)(implicit jobContext: JobContext): Future[JobStatus] = {
val status = JobStatus(jobContext.triggerId, jobType, jobContext.jobId, jobState, JobStatus.stateToResult(jobState), DateTime.now(), content)
jobStatusRepository.save(status)
}
}
10 changes: 6 additions & 4 deletions src/main/scala/de/kaufhof/hajobs/JobManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import scala.util.control.NonFatal
*/
class JobManager(managedJobs: => Jobs,
lockRepo: LockRepository,
jobStatusRepo: JobStatusRepository,
actorSystem: ActorSystem,
scheduler: Scheduler,
enableJobScheduling: Boolean) {

def this(jobs: Seq[Job], lockRepo: LockRepository, actorSystem: ActorSystem, sched: Scheduler = JobManager.createScheduler, enableJobScheduling: Boolean = true) =
this(Jobs(jobs), lockRepo, actorSystem, sched, enableJobScheduling)
def this(jobs: Seq[Job], lockRepo: LockRepository, jobStatusRepo: JobStatusRepository, actorSystem: ActorSystem, sched: Scheduler = JobManager.createScheduler, enableJobScheduling: Boolean = true) =
this(Jobs(jobs), lockRepo, jobStatusRepo, actorSystem, sched, enableJobScheduling)

init()

Expand Down Expand Up @@ -151,9 +152,10 @@ class JobManager(managedJobs: => Jobs,

def getJob(jobType: JobType): Job = managedJobs(jobType)

def allJobStatus(jobType: JobType): Future[List[JobStatus]] = managedJobs.get(jobType).get.allJobStatus
def allJobStatus(jobType: JobType): Future[List[JobStatus]] = jobStatusRepo.list(jobType)

def jobStatus(jobType: JobType, jobId: UUID): Future[Option[JobStatus]] = jobStatusRepo.get(jobType, jobId)

def jobStatus(jobType: JobType, jobId: UUID): Future[Option[JobStatus]] = managedJobs.get(jobType).get.jobStatus(jobId)
}

object JobManager {
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/de/kaufhof/hajobs/JobManagerModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ abstract class JobManagerModule extends Module {
// Non-lazy because the job manager should start cron schedules
binding toNonLazy new JobManager(jobMap,
inject[LockRepository],
inject[JobStatusRepository],
actorSystem,
scheduler,
inject[Configuration].getBoolean("job.import.enableJobTriggering").getOrElse(true)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/de/kaufhof/hajobs/JobSupervisor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import scala.util.{Failure, Success}
class JobSupervisor(jobManager: => JobManager,
jobUpdater: JobUpdater,
jobStatusRepository: JobStatusRepository,
cronExpression: Option[String]) extends Job(JobTypes.JobSupervisor, jobStatusRepository, 0, cronExpression) {
cronExpression: Option[String]) extends Job(JobTypes.JobSupervisor, 0, cronExpression) {

def this(jobManager: => JobManager,
lockRepository: LockRepository,
Expand Down
22 changes: 22 additions & 0 deletions src/main/scala/de/kaufhof/hajobs/WriteStatus.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package de.kaufhof.hajobs

import JobState.JobState
import org.joda.time.DateTime
import play.api.libs.json.JsValue

import scala.concurrent.Future

/**
* Supports shortcut to store the job status, can be mixed into Jobs.
*/
trait WriteStatus {

def jobStatusRepository: JobStatusRepository
def jobType: JobType

protected def writeStatus(jobState: JobState, content: Option[JsValue] = None)(implicit jobContext: JobContext): Future[JobStatus] = {
val status = JobStatus(jobContext.triggerId, jobType, jobContext.jobId, jobState, JobStatus.stateToResult(jobState), DateTime.now(), content)
jobStatusRepository.save(status)
}

}
2 changes: 1 addition & 1 deletion src/test/scala/de/kaufhof/hajobs/ActorJobSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ActorJobSpec extends StandardSpec with TestKitBase {
val testProbe = new TestProbe(system)
val jobType = new JobType("actorJob", new LockType("actorJobLock"))
val props = Props(actor(testProbe.ref))
val job = new ActorJob(jobType, _ => props, system, mock[JobStatusRepository])
val job = new ActorJob(jobType, _ => props, system)

var finishCalled = new AtomicBoolean(false)
implicit val context = JobContext(randomUUID(), randomUUID(), () => finishCalled.set(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class JobManagerIntegrationSpec extends CassandraSpec with DefaultAwaitTimeout w
val cdl = new CountDownLatch(1)
val mockedScheduler = mock[Scheduler]

val manager = new JobManager(Seq(new Job1(jobStatusRepository, cdl), new Job12(jobStatusRepository)), lockRepository, mock[ActorSystem], mockedScheduler, false)
val manager = new JobManager(Seq(new Job1(jobStatusRepository, cdl), new Job12(jobStatusRepository)), lockRepository, jobStatusRepository, mock[ActorSystem], mockedScheduler, false)
manager.triggerJob(JobType1)

eventually {
Expand All @@ -39,7 +39,7 @@ class JobManagerIntegrationSpec extends CassandraSpec with DefaultAwaitTimeout w
}

object JobManagerIntegrationSpec {
class Job1(jobStatusRepository: JobStatusRepository, cdl: CountDownLatch) extends Job(JobType1, jobStatusRepository, 3) {
class Job1(jobStatusRepository: JobStatusRepository, cdl: CountDownLatch) extends Job(JobType1, 3) {
override def run()(implicit context: JobContext): Future[JobStartStatus] = {
import scala.concurrent.ExecutionContext.Implicits.global
Future {
Expand All @@ -54,7 +54,7 @@ object JobManagerIntegrationSpec {

object JobType12 extends JobType("testJob12", JobType1.lockType)

class Job12(jobStatusRepository: JobStatusRepository) extends Job(JobType12, jobStatusRepository, 3) {
class Job12(jobStatusRepository: JobStatusRepository) extends Job(JobType12, 3) {
override def run()(implicit context: JobContext): Future[JobStartStatus] = {
context.finishCallback()
Future.successful(Started(context.jobId))
Expand Down
13 changes: 6 additions & 7 deletions src/test/scala/de/kaufhof/hajobs/JobManagerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class JobManagerSpec extends StandardSpec {
when(job.cronExpression).thenReturn(Some("* * * * * ?"))


val manager = new JobManager(Seq(job), lockRepository, actorSystem, scheduler, true)
val manager = new JobManager(Seq(job), lockRepository, jobStatusRepository, actorSystem, scheduler, true)

eventually(Timeout(scaled(3 seconds))) {
verify(job, atLeastOnce()).run()(any[JobContext])
Expand All @@ -64,7 +64,7 @@ class JobManagerSpec extends StandardSpec {
when(job.jobType).thenReturn(JobType1)
when(job.cronExpression).thenReturn(None)

val manager = new JobManager(Seq(job), lockRepository, actorSystem, mockedScheduler, true)
val manager = new JobManager(Seq(job), lockRepository, jobStatusRepository, actorSystem, mockedScheduler, true)
verify(mockedScheduler, times(1)).start()
verifyNoMoreInteractions(mockedScheduler)
}
Expand All @@ -76,7 +76,7 @@ class JobManagerSpec extends StandardSpec {
when(job.cronExpression).thenReturn(Some("* * * * * ?"))

val jobUpdater = new JobUpdater(lockRepository, jobStatusRepository)
val manager = new JobManager(Seq(job), lockRepository, actorSystem, mockedScheduler, false)
val manager = new JobManager(Seq(job), lockRepository, jobStatusRepository, actorSystem, mockedScheduler, false)
verifyNoMoreInteractions(mockedScheduler)
}
}
Expand All @@ -85,7 +85,7 @@ class JobManagerSpec extends StandardSpec {
"release lock after a synchronous job finished" in {
val job = new TestJob(jobStatusRepository)

val manager = new JobManager(Seq(job), lockRepository, actorSystem, scheduler, false)
val manager = new JobManager(Seq(job), lockRepository, jobStatusRepository, actorSystem, scheduler, false)
await(manager.retriggerJob(JobType1, UUIDs.timeBased()))

verify(lockRepository, times(1)).acquireLock(any(), any(), any())
Expand All @@ -100,7 +100,7 @@ class JobManagerSpec extends StandardSpec {
when(job.jobType).thenReturn(JobType1)
when(job.run()(any())).thenReturn(Future.failed(new RuntimeException("test exception")))

val manager = new JobManager(Seq(job), lockRepository, actorSystem, mockedScheduler, false)
val manager = new JobManager(Seq(job), lockRepository, jobStatusRepository, actorSystem, mockedScheduler, false)
await(manager.retriggerJob(JobType1, UUIDs.timeBased()))
verify(lockRepository, times(1)).acquireLock(any(), any(), any())
verify(lockRepository, times(1)).releaseLock(any(), any())
Expand All @@ -111,8 +111,7 @@ class JobManagerSpec extends StandardSpec {
}

object JobManagerSpec {
class TestJob(jobStatusRepository: JobStatusRepository) extends Job(JobType1,
jobStatusRepository, 0) {
class TestJob(jobStatusRepository: JobStatusRepository) extends Job(JobType1, 0) {
override def run()(implicit context: JobContext): Future[JobStartStatus] = {
context.finishCallback()
Future.successful(Started(context.jobId))
Expand Down
19 changes: 10 additions & 9 deletions src/test/scala/de/kaufhof/hajobs/JobStatusRepositorySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.UUID

import com.datastax.driver.core.utils.UUIDs
import de.kaufhof.hajobs.testutils.CassandraSpec
import JobState._
import org.joda.time.DateTime
import play.api.libs.json.Json

Expand All @@ -28,9 +29,9 @@ class JobStatusRepositorySpec extends CassandraSpec {
"get a job status by id" in {
assume(await(repo.getAllMetadata()) === List.empty)
// given
val jobStatus1: JobStatus = JobStatus(anyTriggerId, type1, UUIDs.timeBased(), JobState.Finished, JobResult.Success, DateTime.now, Some(Json.toJson("muhmuh")))
val jobStatus1: JobStatus = JobStatus(anyTriggerId, type1, UUIDs.timeBased(), Finished, JobResult.Success, DateTime.now, Some(Json.toJson("muhmuh")))
// a job status without content should also be loaded
val jobStatus2: JobStatus = JobStatus(anyTriggerId, type1, UUIDs.timeBased(), JobState.Finished, JobResult.Success, DateTime.now)
val jobStatus2: JobStatus = JobStatus(anyTriggerId, type1, UUIDs.timeBased(), Finished, JobResult.Success, DateTime.now)

// when
await(Future.sequence((Seq(repo.save(jobStatus1), repo.save(jobStatus2)))))
Expand All @@ -52,8 +53,8 @@ class JobStatusRepositorySpec extends CassandraSpec {
// given
val jobId1: UUID = UUIDs.timeBased()
val jobId2: UUID = UUIDs.timeBased()
val jobStatus1: JobStatus = JobStatus(anyTriggerId, type1, jobId1, JobState.Finished, JobResult.Failed, DateTime.now, Some(Json.toJson("muhmuh")))
val jobStatus2: JobStatus = JobStatus(anyTriggerId, type1, jobId2, JobState.Finished, JobResult.Failed, DateTime.now, Some(Json.toJson("muhmuh")))
val jobStatus1: JobStatus = JobStatus(anyTriggerId, type1, jobId1, Finished, JobResult.Failed, DateTime.now, Some(Json.toJson("muhmuh")))
val jobStatus2: JobStatus = JobStatus(anyTriggerId, type1, jobId2, Finished, JobResult.Failed, DateTime.now, Some(Json.toJson("muhmuh")))

// when
await(Future.sequence(Seq(repo.save(jobStatus1), repo.save(jobStatus2))))
Expand All @@ -68,8 +69,8 @@ class JobStatusRepositorySpec extends CassandraSpec {
assume(await(repo.getAllMetadata()) === List.empty)
val jobId1: UUID = UUIDs.timeBased()
val jobId2: UUID = UUIDs.timeBased()
val jobStatus1: JobStatus = JobStatus(anyTriggerId, type1, jobId1, JobState.Running, JobResult.Pending, DateTime.now, Some(Json.toJson("muhmuh")))
val jobStatus2: JobStatus = JobStatus(anyTriggerId, type2, jobId2, JobState.Running, JobResult.Pending, DateTime.now, Some(Json.toJson("muhmuh")))
val jobStatus1: JobStatus = JobStatus(anyTriggerId, type1, jobId1, Running, JobResult.Pending, DateTime.now, Some(Json.toJson("muhmuh")))
val jobStatus2: JobStatus = JobStatus(anyTriggerId, type2, jobId2, Running, JobResult.Pending, DateTime.now, Some(Json.toJson("muhmuh")))

// when
await(Future.sequence(Seq(repo.save(jobStatus1), repo.save(jobStatus2))))
Expand All @@ -84,21 +85,21 @@ class JobStatusRepositorySpec extends CassandraSpec {
"update meta data and insert data on update" in {
assume(await(repo.getAllMetadata()) === List.empty)
val jobId1: UUID = UUIDs.timeBased()
val jobStatus1: JobStatus = JobStatus(anyTriggerId, type1, jobId1, JobState.Finished, JobResult.Failed, DateTime.now, Some(Json.toJson("muhmuh")))
val jobStatus1: JobStatus = JobStatus(anyTriggerId, type1, jobId1, Finished, JobResult.Failed, DateTime.now, Some(Json.toJson("muhmuh")))

// when
await(repo.save(jobStatus1))
eventually{
await(repo.getAllMetadata()).find(_.jobId == jobId1).map(_.jobState).value should be (jobStatus1.jobState)
}

val jobStatus2 = await(repo.updateJobState(jobStatus1, JobState.Canceled))
val jobStatus2 = await(repo.updateJobState(jobStatus1, Canceled))
eventually{
await(repo.getAllMetadata()).find(_.jobId == jobId1).map(_.jobState).value should be (jobStatus2.jobState)
}

eventually {
await(repo.getJobHistory(type1, jobId1)).map(_.jobState) should contain theSameElementsAs (Seq(JobState.Canceled, JobState.Finished))
await(repo.getJobHistory(type1, jobId1)).map(_.jobState) should contain theSameElementsAs (Seq(Canceled, Finished))
}
}
}
Expand Down

0 comments on commit cf6d92a

Please sign in to comment.