Skip to content

Commit

Permalink
Release 0.5.4
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Nov 22, 2024
1 parent 95f83a7 commit 3508bdd
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 14 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ the project!
To test Ox, use the following dependency, using either [sbt](https://www.scala-sbt.org):

```scala
"com.softwaremill.ox" %% "core" % "0.5.3"
"com.softwaremill.ox" %% "core" % "0.5.4"
```

Or [scala-cli](https://scala-cli.virtuslab.org):

```scala
//> using dep "com.softwaremill.ox::core:0.5.3"
//> using dep "com.softwaremill.ox::core:0.5.4"
```

Documentation is available at [https://ox.softwaremill.com](https://ox.softwaremill.com), ScalaDocs can be browsed at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.ox).
Expand Down
2 changes: 1 addition & 1 deletion generated-doc/out/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Safe direct-style concurrency and resiliency for Scala on the JVM. Requires JDK 21 & Scala 3.

To start using Ox, add the `com.softwaremill.ox::core:0.5.3` [dependency](info/dependency.md) to your project.
To start using Ox, add the `com.softwaremill.ox::core:0.5.4` [dependency](info/dependency.md) to your project.
Then, take a look at the tour of Ox, or follow one of the topics listed in the menu to get to know Ox's API!

In addition to this documentation, ScalaDocs can be browsed at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.ox).
Expand Down
4 changes: 2 additions & 2 deletions generated-doc/out/info/dependency.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ To use ox core in your project, add:

```scala
// sbt dependency
"com.softwaremill.ox" %% "core" % "0.5.3"
"com.softwaremill.ox" %% "core" % "0.5.4"

// scala-cli dependency
//> using dep com.softwaremill.ox::core:0.5.3
//> using dep com.softwaremill.ox::core:0.5.4
```

Ox core depends only on the Java [jox](https://github.com/softwaremill/jox) project, where channels are implemented. There are no other direct or transitive dependencies.
Expand Down
17 changes: 11 additions & 6 deletions generated-doc/out/integrations/kafka.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Kafka sources & drains
# Kafka flows

Dependency:

```scala
"com.softwaremill.ox" %% "kafka" % "0.5.3"
"com.softwaremill.ox" %% "kafka" % "0.5.4"
```

`Flow`s which read from a Kafka topic, mapping stages and drains which publish to Kafka topics are available through
Expand All @@ -17,7 +17,8 @@ To read from a Kafka topic, use:
import ox.kafka.{ConsumerSettings, KafkaFlow, ReceivedMessage}
import ox.kafka.ConsumerSettings.AutoOffsetReset

val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092")
.autoOffsetReset(AutoOffsetReset.Earliest)
val topic = "my_topic"

val source = KafkaFlow.subscribe(settings, topic)
Expand Down Expand Up @@ -49,7 +50,9 @@ In order to do so, a `Flow[SendPacket]` needs to be created. The definition of `
import org.apache.kafka.clients.producer.ProducerRecord
import ox.kafka.ReceivedMessage

case class SendPacket[K, V](send: List[ProducerRecord[K, V]], commit: List[ReceivedMessage[_, _]])
case class SendPacket[K, V](
send: List[ProducerRecord[K, V]],
commit: List[ReceivedMessage[_, _]])
```

The `send` list contains the messages to be sent (each message is a Kafka `ProducerRecord`). The `commit` list contains
Expand All @@ -63,15 +66,17 @@ import ox.kafka.ConsumerSettings.AutoOffsetReset
import ox.pipe
import org.apache.kafka.clients.producer.ProducerRecord

val consumerSettings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
val consumerSettings = ConsumerSettings.default("my_group")
.bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092")
val sourceTopic = "source_topic"
val destTopic = "dest_topic"

KafkaFlow
.subscribe(consumerSettings, sourceTopic)
.map(in => (in.value.toLong * 2, in))
.map((value, original) => SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
.map((value, original) =>
SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
.pipe(KafkaDrain.runPublishAndCommit(producerSettings))
```

Expand Down
2 changes: 1 addition & 1 deletion generated-doc/out/integrations/mdc-logback.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Dependency:

```scala
"com.softwaremill.ox" %% "mdc-logback" % "0.5.3"
"com.softwaremill.ox" %% "mdc-logback" % "0.5.4"
```

Ox provides support for setting inheritable MDC (mapped diagnostic context) values, when using the [Logback](https://logback.qos.ch)
Expand Down
40 changes: 39 additions & 1 deletion generated-doc/out/streaming/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,42 @@ import ox.flow.Flow
Flow.fromValues(1, 2, 3)
.tap(n => println(s"Received: $n"))
.runToList()
```
```

## Reactive streams interoperability

### Flow -> Publisher

A `Flow` can be converted to a `java.util.concurrent.Flow.Publisher` using the `.toPublisher` method.

This needs to be run within an `Ox` concurrency scope, as upon subscribing, a fork is created to run the publishing
process. Hence, the scope should remain active as long as the publisher is used.

Internally, elements emitted by the flow are buffered, using a buffer of capacity given by the `BufferCapacity` in
scope.

To obtain a `org.reactivestreams.Publisher` instance, you'll need to add the following dependency and import, to
bring the `toReactiveStreamsPublisher` method into scope:

```scala
// sbt dependency: "com.softwaremill.ox" %% "flow-reactive-streams" % "0.5.4"

import ox.supervised
import ox.flow.Flow
import ox.flow.reactive.*

val myFlow: Flow[Int] = ???
supervised:
myFlow.toReactiveStreamsPublisher: org.reactivestreams.Publisher[Int]
// use the publisher
```

### Publisher -> Flow

A `java.util.concurrent.Flow.Publisher` can be converted to a `Flow` using `Flow.fromPublisher`.

Internally, elements published to the subscription are buffered, using a buffer of capacity given by the
`BufferCapacity` in scope. That's also how many elements will be at most requested from the publisher at a time.

To convert a `org.reactivestreams.Publisher` instance, you'll need the same dependency as above and call the
`FlowReactiveStreams.fromPublisher` method.
2 changes: 1 addition & 1 deletion generated-doc/out/streaming/selecting-from-channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Channel clauses include:
* `Sink.sendClause(value)` - to send a value to a channel
* `Default(value)` - to return the given value from the `select`, if no other clause can be immediately satisfied

## Receiving from exactly one channel
## Receiving exactly one value from multiple channels

The most common use-case for `select` is to receive exactly one value from a number of channels. There's a dedicated
`select` variant for this use-case, which accepts a number of `Source`s, for which receive clauses are created. The
Expand Down

0 comments on commit 3508bdd

Please sign in to comment.