Skip to content

Commit

Permalink
lwcapi: remove the /v1/subscribe endpoint (#1451)
Browse files Browse the repository at this point in the history
This was an earlier version that relied used unbatched
text messages for passing the data and was much less
efficient. It is no longer in use internally.
  • Loading branch information
brharrington authored Jul 22, 2022
1 parent 7b02e34 commit 483dfc5
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 100 deletions.
2 changes: 1 addition & 1 deletion atlas-eval/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ atlas.eval {
]

// Which version of the LWC server API to use
lwcapi-version = 1
lwcapi-version = 2
}

graph {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,17 +496,12 @@ private[stream] abstract class EvaluatorImpl(
Flow[Set[LwcExpression]]
.via(StreamOps.unique(uniqueTimeout)) // Updating subscriptions only if there's a change
.map { exprs =>
if (lwcapiVersion == 1)
TextMessage(Json.encode(exprs))
else
BinaryMessage(LwcMessages.encodeBatch(exprs.toSeq))
BinaryMessage(LwcMessages.encodeBatch(exprs.toSeq))
}
.via(webSocketFlowOrigin)
.flatMapConcat {
case TextMessage.Strict(str) =>
parseMessage(str)
case msg: TextMessage =>
msg.textStream.fold("")(_ + _).flatMapConcat(parseMessage)
case _: TextMessage =>
throw new MatchError("text messages are not supported")
case BinaryMessage.Strict(str) =>
parseBatch(str)
case msg: BinaryMessage =>
Expand All @@ -515,18 +510,6 @@ private[stream] abstract class EvaluatorImpl(
.mapMaterializedValue(_ => NotUsed)
}

private def parseMessage(message: String): Source[AnyRef, NotUsed] = {
try {
ReplayLogging.log(message)
Source.single(LwcMessages.parse(message))
} catch {
case e: Exception =>
logger.warn(s"failed to process message [$message]", e)
badMessages.increment()
Source.empty
}
}

private def parseBatch(message: ByteString): Source[AnyRef, NotUsed] = {
try {
ReplayLogging.log(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging

import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import javax.inject.Inject
import scala.concurrent.duration._
import scala.util.Failure
Expand Down Expand Up @@ -71,12 +70,6 @@ class SubscribeApi @Inject() (

def routes: Route = {
extractClientIP { addr =>
endpointPathPrefix("api" / "v1" / "subscribe") {
path(Remaining) { streamId =>
val meta = StreamMetadata(streamId, addr.value)
handleWebSocketMessages(createHandlerFlow(meta))
}
} ~
endpointPathPrefix("api" / "v2" / "subscribe") {
path(Remaining) { streamId =>
val meta = StreamMetadata(streamId, addr.value)
Expand All @@ -98,38 +91,6 @@ class SubscribeApi @Inject() (
}
}

/**
* Uses text messages and sends each datapoint individually.
*/
private def createHandlerFlow(streamMeta: StreamMetadata): Flow[Message, Message, Any] = {
dropSameIdConnections(streamMeta)

Flow[Message]
.flatMapConcat {
case TextMessage.Strict(str) =>
Source.single(str)
case msg: TextMessage =>
msg.textStream.fold("")(_ + _)
case BinaryMessage.Strict(str) =>
Source.single(str.decodeString(StandardCharsets.UTF_8))
case msg: BinaryMessage =>
msg.dataStream.fold(ByteString.empty)(_ ++ _).map(_.decodeString(StandardCharsets.UTF_8))
}
.via(new WebSocketSessionManager(streamMeta, register, subscribe))
.flatMapMerge(Int.MaxValue, s => s)
.map(obj => TextMessage(obj.toJson))
.watchTermination() { (_, f) =>
f.onComplete {
case Success(_) =>
logger.debug(s"lost client for $streamMeta.streamId")
sm.unregister(streamMeta.streamId)
case Failure(t) =>
logger.debug(s"lost client for $streamMeta.streamId", t)
sm.unregister(streamMeta.streamId)
}
}
}

/**
* Uses a binary format for the messages and batches output to achieve higher throughput.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import com.netflix.atlas.eval.model.LwcExpression
import com.netflix.atlas.eval.model.LwcHeartbeat
import com.netflix.atlas.eval.model.LwcMessages
import com.netflix.atlas.eval.model.LwcSubscription
import com.netflix.atlas.json.Json
import com.netflix.spectator.api.NoopRegistry
import com.typesafe.config.ConfigFactory

Expand All @@ -52,45 +51,6 @@ class SubscribeApiSuite extends MUnitRouteSuite {
// Subscribe websocket
//

private def parse(msg: Message): AnyRef = {
LwcMessages.parse(msg.asTextMessage.getStrictText)
}

test("subscribe websocket") {
val client = WSProbe()
WS("/api/v1/subscribe/111", client.flow) ~> routes ~> check {
assert(isWebSocketUpgrade)

// Send list of expressions to subscribe to
val exprs = List(LwcExpression("name,cpu,:eq,:avg", 60000))
client.sendMessage(Json.encode(exprs))

// Look for subscription messages, one for sum and one for count
var subscriptions = List.empty[LwcSubscription]
while (subscriptions.size < 2) {
parse(client.expectMessage()) match {
case _: DiagnosticMessage =>
case sub: LwcSubscription => subscriptions = sub :: subscriptions
case h: LwcHeartbeat => assertEquals(h.step, 60000L)
case v => throw new MatchError(v)
}
}

// Verify subscription is in the manager, push a message to the queue check that it
// is received by the client
assertEquals(subscriptions.flatMap(_.metrics).size, 2)
subscriptions.flatMap(_.metrics).foreach { m =>
val tags = Map("name" -> "cpu")
val datapoint = LwcDatapoint(60000, m.id, tags, 42.0)
val handlers = sm.handlersForSubscription(m.id)
assertEquals(handlers.size, 1)
handlers.head.offer(Seq(datapoint))

assertEquals(parse(client.expectMessage()), datapoint)
}
}
}

private def parseBatch(msg: Message): List[AnyRef] = {
LwcMessages.parseBatch(msg.asBinaryMessage.getStrictData)
}
Expand Down

0 comments on commit 483dfc5

Please sign in to comment.