Skip to content

Commit

Permalink
Factor out more common code in examples
Browse files Browse the repository at this point in the history
  • Loading branch information
satabin committed Feb 7, 2024
1 parent b213684 commit 3d8a402
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 28 deletions.
26 changes: 12 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,17 @@ def subscribeStream(subscriber: QueueSubscriber[String]): Stream[IO, Nothing] =
// results are non important
.drain

def program(publisher: QueuePublisher[String], subscriber: QueueSubscriber[String]): IO[Unit] =
// subscribe and publish concurrently
subscribeStream(subscriber)
.concurrently(publishStream(publisher))
.compile
// runs forever
.drain
def program(client: QueueClient): IO[Unit] = {
val queueName = "my-queue"
client.publisher[String](queueName).use { publisher =>
// subscribe and publish concurrently
subscribeStream(client.subscriber[String](queueName))
.concurrently(publishStream(publisher))
.compile
// runs forever
.drain
}
}
```

## Working with Azure Service Bus queues
Expand All @@ -63,13 +67,7 @@ import com.azure.identity._
val namespace = "{namespace}.servicebus.windows.net" // your namespace
val credentials = new DefaultAzureCredentialBuilder().build() // however you want to authenticate

ServiceBusClient(namespace, credentials).use { client =>
val queueName = "my-queue"

client.publisher[String](queueName).use { publisher =>
program(publisher, client.subscriber[String](queueName))
}
}
ServiceBusClient(namespace, credentials).use(program(_))
```

## Working with AWS SQS
Expand Down
26 changes: 12 additions & 14 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,17 @@ def subscribeStream(subscriber: QueueSubscriber[String]): Stream[IO, Nothing] =
// results are non important
.drain

def program(publisher: QueuePublisher[String], subscriber: QueueSubscriber[String]): IO[Unit] =
// subscribe and publish concurrently
subscribeStream(subscriber)
.concurrently(publishStream(publisher))
.compile
// runs forever
.drain
def program(client: QueueClient): IO[Unit] = {
val queueName = "my-queue"
client.publisher[String](queueName).use { publisher =>
// subscribe and publish concurrently
subscribeStream(client.subscriber[String](queueName))
.concurrently(publishStream(publisher))
.compile
// runs forever
.drain
}
}
```

## Working with Azure Service Bus queues
Expand All @@ -63,13 +67,7 @@ import com.azure.identity._
val namespace = "{namespace}.servicebus.windows.net" // your namespace
val credentials = new DefaultAzureCredentialBuilder().build() // however you want to authenticate

ServiceBusClient(namespace, credentials).use { client =>
val queueName = "my-queue"

client.publisher[String](queueName).use { publisher =>
program(publisher, client.subscriber[String](queueName))
}
}
ServiceBusClient(namespace, credentials).use(program(_))
```

## Working with AWS SQS
Expand Down

0 comments on commit 3d8a402

Please sign in to comment.