Skip to content

Commit

Permalink
Enable Http2 Support (#557)
Browse files Browse the repository at this point in the history
* Enable Http2 Support

* back to realTime

* switch to IO for Utils.now
  • Loading branch information
etspaceman authored Aug 3, 2023
1 parent 4a538a5 commit a307f3f
Show file tree
Hide file tree
Showing 49 changed files with 1,949 additions and 2,067 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ jobs:

- name: Integration Tests
if: matrix.java == 'temurin@17' && matrix.os == 'ubuntu-latest' && startsWith(matrix.project, 'root-js')
env:
CBOR_ENABLED: ${{ matrix.cbor_enabled }}
SERVICE_PORT: ${{ matrix.service_port }}
uses: nick-fields/retry@v2
with:
timeout_minutes: 15
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,5 +327,4 @@ object MyApp {
```

# Known issues
- Does not currently support Http2 requests (https://github.com/http4s/http4s/issues/4707)
- Does not currently support SubscribeToShard due to lack of push-promise support (https://github.com/http4s/http4s/issues/4624)
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ object KinesisMockService extends IOApp {
.withHost(host)
.withTLS(tlsContext)
.withHttpApp(app)
.withHttp2
.build
plainServer = EmberServerBuilder
.default[IO]
.withPort(serviceConfig.plainPort)
.withHost(host)
.withHttpApp(app)
.withHttp2
.build
_ <- logger.info(
s"Starting Kinesis TLS Mock Service on port ${serviceConfig.tlsPort}"
Expand Down
4 changes: 2 additions & 2 deletions kinesis-mock/src/main/scala/kinesis/mock/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package kinesis.mock

import java.time.Instant

import cats.effect.IO
import cats.effect.SyncIO
import cats.effect.std.UUIDGen

object Utils {
def randomUUID = UUIDGen.randomUUID[SyncIO].unsafeRunSync()
def randomUUIDString = UUIDGen.randomString[SyncIO].unsafeRunSync()
def md5(bytes: Array[Byte]): Array[Byte] = MD5.compute(bytes)
def now =
SyncIO.realTime.map(d => Instant.EPOCH.plusNanos(d.toNanos)).unsafeRunSync()
def now = IO.realTime.map(d => Instant.EPOCH.plusNanos(d.toNanos))
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,44 +38,51 @@ final case class CreateStreamRequest(
awsRegion: AwsRegion,
awsAccountId: AwsAccountId
): IO[Response[Unit]] =
streamsRef.modify { streams =>
val shardCountOrDefault = shardCount.getOrElse(4)
val streamArn = StreamArn(awsRegion, streamName, awsAccountId)
(
CommonValidations.validateStreamName(streamName),
if (streams.streams.contains(streamArn))
ResourceInUseException(
s"Stream $streamName already exists"
).asLeft
else Right(()),
CommonValidations.validateShardCount(shardCountOrDefault),
if (
streams.streams.count { case (_, stream) =>
stream.streamStatus == StreamStatus.CREATING
} >= 5
)
LimitExceededException(
"Limit for streams being created concurrently exceeded"
).asLeft
else Right(()),
CommonValidations.validateOnDemandStreamCount(
streams,
onDemandStreamCountLimit
),
CommonValidations.validateShardLimit(
shardCountOrDefault,
streams,
shardLimit
)
).mapN { (_, _, _, _, _, _) =>
val newStream =
StreamData.create(shardCountOrDefault, streamArn, streamModeDetails)
Utils.now.flatMap { now =>
streamsRef.modify { streams =>
val shardCountOrDefault = shardCount.getOrElse(4)
val streamArn = StreamArn(awsRegion, streamName, awsAccountId)
(
streams
.copy(streams = streams.streams ++ Seq(streamArn -> newStream)),
()
)
}.sequenceWithDefault(streams)
CommonValidations.validateStreamName(streamName),
if (streams.streams.contains(streamArn))
ResourceInUseException(
s"Stream $streamName already exists"
).asLeft
else Right(()),
CommonValidations.validateShardCount(shardCountOrDefault),
if (
streams.streams.count { case (_, stream) =>
stream.streamStatus == StreamStatus.CREATING
} >= 5
)
LimitExceededException(
"Limit for streams being created concurrently exceeded"
).asLeft
else Right(()),
CommonValidations.validateOnDemandStreamCount(
streams,
onDemandStreamCountLimit
),
CommonValidations.validateShardLimit(
shardCountOrDefault,
streams,
shardLimit
)
).mapN { (_, _, _, _, _, _) =>
val newStream =
StreamData.create(
shardCountOrDefault,
streamArn,
streamModeDetails,
now
)
(
streams
.copy(streams = streams.streams ++ Seq(streamArn -> newStream)),
()
)
}.sequenceWithDefault(streams)
}
}
}

Expand Down
Loading

0 comments on commit a307f3f

Please sign in to comment.