Replies: 2 comments 1 reply
-
This is interesting, thanks for the write-up Phil! A couple of questions:
I generally like the simplicity of this protocol over our existing one; I would really like to see a prototype of it.
-
I'd like to add a wee bit of detail on the protocol, and talk about why I think this would help. As I'm envisioning it, the runtime protocol would be implemented by a
The server has a generic understanding of bindings. There are input bindings and output bindings. Input bindings are for reading source collections, and output bindings are for writing to the destination collections. Derivations would have one or more input bindings, and exactly one output binding. The bindings are available from a metadata endpoint. Documents that are sent or received in requests or responses always have an associated binding, which is given as an integer offset into the respective bindings array. For example, a document read from the second input binding would carry a binding value of 1.

Putting everything under a single protocol has some nice advantages. It allows any type of task to be run remotely, not just derivations. For example, you could have a CloudRun function that runs as a capture. Or you could run the container on your local laptop in a debugger. It would also enable easy pull-based integrations with external systems. For example, a hypothetical Materialize integration could read from the runtime server. This kind of portability seems more complicated when the runtime dials the connector. Network connections to someone's dev machine or private network would require some trickery, but inbound connections to a data-plane-gateway "just work". The same is true for authorization, since there's already a way to authorize requests to the data-plane-gateway. But consider the current

One might point out here that there's a potentially important advantage to using the existing Gazette protocol for building integrations: it can read directly from cloud storage. While that's true, I think the tradeoffs favor using the runtime server. It makes development easier, doesn't require us to ship any libraries, and allows clients to be built in any language and run in any runtime. The runtime server is also a nice single point of collection for billing data (stats), so we'd get that built in. The library that we'd need to ship in order to use the Gazette protocol directly outside of Flow would be pretty significant in scope. It would need to implement all of the transactional semantics, as well as reductions, in order to reach parity with what Flow connectors can do. So avoiding the need for that, at least in the short/medium term, seems appealing. Some other benefits that come to mind:
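To make the binding-offset idea concrete, here's a rough sketch of the shapes I have in mind. None of these names are settled; they're only meant to show how a document references its binding by offset:

```go
// Hypothetical shapes for the metadata endpoint's response. Nothing here is
// a settled schema; it just illustrates the binding-offset idea.
package runtimeproto

import "encoding/json"

// Binding describes one collection that a task reads from or writes to.
type Binding struct {
	Collection string          `json:"collection"` // e.g. "acmeCo/orders"
	Schema     json.RawMessage `json:"schema"`     // the collection's JSON schema
}

// TaskMetadata is what the metadata endpoint might return.
type TaskMetadata struct {
	InputBindings  []Binding `json:"inputBindings"`  // source collections to read
	OutputBindings []Binding `json:"outputBindings"` // destination collections to write
}

// Document is how a document travels in requests and responses: it names its
// binding by offset into the relevant bindings array.
type Document struct {
	Binding int             `json:"binding"` // offset into inputBindings or outputBindings
	Doc     json.RawMessage `json:"doc"`
}
```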
-
A proposal to refactor the runtime
We had a conversation at the offsite in Chicago about "V2" derivations. We talked about them using a simple JSON protocol over stdin/stdout. This got me thinking about the derivation protocol, and a few other ideas around the capture and materialization protocols. Long story short, I've got the beginnings of a proposal to refactor a portion of the data-plane runtime.
Note on the timing of this:
While I personally see this as very much aligned with the overall goals of the project, I was a bit dubious that it's aligned with our short-term goals. But we had a live conversation today that caused me to reconsider, so here we are. So I can't really say whether the context of this proposal is "we should do this right now" or "this is an approach we should explore in the future". Please chime in with your thoughts on that question.
The basic idea is to rip out the existing capture and materialize protocols, and replace them with a new protocol. Rather than starting with a long context-setting section about all of the various problems we'd like to solve, I'm going to launch right into describing the basic shape of the protocol itself, and then talk about the benefits. Finally, I'll elaborate on a few of the details. The focus here is entirely on the runtime "transactions" part of the protocols; I'm not going to bikeshed the unary RPCs for build-time connector invocations.
The protocol
I'm calling this proposed protocol the Runtime protocol. The key difference with this protocol is that it operates in the reverse direction of the one we have today. With the runtime protocol, the runtime (the `runtime` go package) starts up a server, and the connector acts as the client. The server has HTTP endpoints that provide things like the configuration and bindings, and the most recent consumer and driver checkpoints. The endpoints for driving transactions basically allow the client to read and write batches of documents corresponding to transactions in the Gazette consumer. Collectively, the endpoints accommodate all of the things we do today in both the capture and materialization protocols (including pipelining). In other words, it's sort of a union of the two existing protocols, but in reverse. It should also be usable for derivations, though there's a bit more to discuss on that.

When the protocol runs in reverse, the server becomes the single state machine where all the complex asynchronous transaction processing logic lives, and the clients (connectors) can get way simpler. Because the client is just calling endpoints, the coordination of the individual phases of transaction processing can all be controlled by the server. For example, a call to the `/read` endpoint, which returns a multipart response with all the documents in the transaction, can block until the combiners are ready to drain for the next transaction.

Another way to describe it is this: think about the materialize `transactor` package, and how it presents a relatively simple API for executing pipelined transactions that go through various phases (Load, Store, etc.). That API can be translated into a set of HTTP endpoints, which are handled by a server that's run by the consumer shard. In other words, the connector calls a set of HTTP endpoints that correspond to the phases of the transaction. That API can also be extended to optionally support writing documents as part of the transaction, and thus could support captures in addition to materializations. In theory, you could implement a derivation by using the endpoints for both captures and materializations. Most, if not all, of the current materialize `transactor` package could likely be reduced to a thin shim, to retain compatibility with existing connectors. The airbyte-to-flow code could likewise be simplified a great deal. The runtime protocol will make it easier to write connectors in a variety of languages. In order to implement a connector, all you need is an HTTP client, and it requires very little boilerplate to actually use it efficiently and correctly.

The runtime protocol can consolidate the existing `capture` and `materialization` protocols and `runtime` implementations into a single server implementation. This may be nice from a maintenance and testing perspective, but I don't think that's fully the point. The point is more about changing the direction of operation, and the server that enables that.

In addition to the endpoints for processing transactions, the server can also have endpoints serving up detailed metadata and debug info about the current state of processing, which would be exposed via data-plane-gateway. Things like the most recently committed checkpoint, the current transaction phase (with timing information!), and the number of documents and transactions processed during the current term would all be readily available directly from the data plane. We could expose information about the running container, and even the API calls made by it. The server would be a place for us to wire in whatever debugging or observability endpoints we wanted, and it would have direct access to all of the transaction processing state!
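Just to give a flavor of what that debug info might look like, here's a hypothetical shape for a status response; every field name here is made up, it's only meant to show the kind of state the server could expose:

```go
// Hypothetical shape of a debug/status response exposed through
// data-plane-gateway. Field names are illustrative, not a proposal.
package runtimeproto

import "time"

type TaskStatus struct {
	LastCommittedCheckpoint string    `json:"lastCommittedCheckpoint"`
	CurrentPhase            string    `json:"currentPhase"`     // e.g. "peek-ids", "read", "commit"
	PhaseStartedAt          time.Time `json:"phaseStartedAt"`   // timing for the current phase
	TermDocuments           int64     `json:"termDocuments"`    // documents processed this term
	TermTransactions        int64     `json:"termTransactions"` // transactions processed this term
}
```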
How it works
With the Runtime protocol, the runtime starts up a server, and the connector calls the runtime server using a client library. The runtime server uses HTTP rather than gRPC (in my opinion it should, anyway; more on the reasons for that later). There are a few endpoints that serve up data on the config and bindings of the task, as well as endpoints that can be used to process transactions.
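To make that concrete, here's roughly how the consumer shard might register the endpoint surface. Handler names and bodies are made up; the paths are simply the ones I'll use throughout the rest of this write-up:

```go
// Rough sketch of how the consumer shard might expose the runtime server.
// Handler bodies are omitted; paths match the examples that follow.
package runtimeserver

import "net/http"

func newRuntimeMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/spec", notImplemented)      // task config and bindings metadata
	mux.HandleFunc("/init-term", notImplemented) // begin a term, optionally passing a consumer checkpoint
	mux.HandleFunc("/peek-ids", notImplemented)  // keys of the documents in the next transaction
	mux.HandleFunc("/read", notImplemented)      // exchange loaded documents for fully-reduced ones
	mux.HandleFunc("/write", notImplemented)     // write captured (or derived) documents
	mux.HandleFunc("/commit", notImplemented)    // commit the transaction; blocks until acknowledged
	return mux
}

func notImplemented(w http.ResponseWriter, r *http.Request) {
	http.Error(w, "not implemented in this sketch", http.StatusNotImplemented)
}
```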
In order to explain how the transaction endpoints work, it might be easier to start by explaining how tasks might use them in order to process transactions. I'll start with the simplest example of an airbyte capture. The "client" process in this example is a driver that runs inside the container and adapts the output of the airbyte connector into the runtime protocol. Here's some pseudo-code showing roughly how such a driver might work:
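(Everything below is illustrative: the `call` helper, base URL, and source stub are made up, and the interesting part is just the sequence of endpoint calls.)

```go
// A rough sketch of an airbyte-to-flow style capture driver, speaking to the
// runtime server over plain HTTP.
package main

import (
	"bytes"
	"io"
	"net/http"
)

const base = "http://localhost:9000" // wherever the runtime server is listening

// call POSTs a body to the runtime server and returns the response body.
func call(path string, body []byte) ([]byte, error) {
	resp, err := http.Post(base+path, "application/json", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

func main() {
	// Fetch the task's config and bindings (not otherwise used in this sketch).
	if _, err := http.Get(base + "/spec"); err != nil {
		panic(err)
	}
	// Begin the term. A capture has no consumer checkpoint to offer, so the
	// body is empty.
	if _, err := call("/init-term", nil); err != nil {
		panic(err)
	}
	for {
		// Read a batch of documents from the external system (stubbed below),
		// and POST /write zero or more times. Each document names its binding
		// by offset; here everything goes to binding 0.
		for _, doc := range nextBatchFromSource() {
			if _, err := call("/write", doc); err != nil {
				panic(err)
			}
		}
		// POST /commit blocks until the runtime acknowledges the commit.
		if _, err := call("/commit", nil); err != nil {
			panic(err)
		}
	}
}

// Hypothetical stand-in for the airbyte connector's record stream.
func nextBatchFromSource() [][]byte {
	return [][]byte{[]byte(`{"binding": 0, "doc": {"greeting": "hello"}}`)}
}
```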
The `/init-term` endpoint allows the connector to optionally pass a consumer checkpoint, and it's what allows transaction processing to begin. From there, the connector calls the `/write` endpoint zero or more times, followed by a call to `/commit` (which blocks until the acknowledgement! more on that later). Captures are pretty simple, so it probably doesn't take much to at least convince you that the reversed approach could work for them. But the materialize protocol is far more complex, so let's jump right into that with an example of a materialization that loads and stores.

Materializations would start out the same way as captures, by creating a client and calling the `/spec` endpoint. The connector then loads the consumer checkpoint from the remote system and POSTs it to the `/init-term` endpoint. Transactions are processed by calling a few endpoints sequentially (all POST requests):

- `/peek-ids`: returns a multipart response with the keys of all the documents participating in the transaction. The connector uses this stream of ids to load the requested documents from the remote store.
- `/read`: The connector sends the loaded documents to the server in the body of the read request, and it receives the fully reduced documents back on the response. The read response also includes the consumer checkpoint in the response headers, so the connector can persist it.
- `/commit`: Once the database transaction has committed, the connector notifies the runtime by calling this endpoint, passing the driver checkpoint if desired. Deferring the driver checkpoint until this phase is the main difference from the materialization protocol we have today, but I don't think this would be difficult to accommodate.
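Here's the same kind of rough sketch for that flow, reusing the hypothetical `call` helper from the capture example; the store-facing functions are stand-ins for connector-specific logic:

```go
// Sketch of a materialization driver loop. `call` is the same hypothetical
// HTTP helper from the capture example; the load/store functions stand in
// for connector-specific logic against the destination system.
func runMaterialization() error {
	// Load the last-committed consumer checkpoint from the destination and
	// hand it to the runtime to begin the term.
	if _, err := call("/init-term", loadCheckpointFromStore()); err != nil {
		return err
	}
	for {
		// 1. /peek-ids: the keys of all documents in the next transaction.
		keys, err := call("/peek-ids", nil)
		if err != nil {
			return err
		}
		// 2. /read: send the documents loaded from the destination, and get
		//    the fully reduced documents back. The consumer checkpoint rides
		//    along in the response headers (not shown here).
		reduced, err := call("/read", loadFromStore(keys))
		if err != nil {
			return err
		}
		// 3. Store the reduced documents and commit the remote transaction.
		storeAndCommitRemotely(reduced)
		// 4. /commit: tell the runtime the transaction is durable, optionally
		//    passing a driver checkpoint.
		if _, err := call("/commit", nil); err != nil {
			return err
		}
	}
}

// Hypothetical connector-specific stubs.
func loadCheckpointFromStore() []byte    { return nil }
func loadFromStore(keys []byte) []byte   { return nil }
func storeAndCommitRemotely(docs []byte) {}
```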
You can probably already see how delta-updates materializations would work. Just don't bother calling `/peek-ids`, and don't send any documents in the `/read` request. This is a good segue into how I imagine the server from an implementation perspective. As the server sees it, `/peek-ids` is simply optional. If you call the `/peek-ids` endpoint, the server returns the ids participating in the next transaction. If you skip it and just call `/read`, then the server knows that you're not interested in that phase of the transaction. The same goes for `/read` and `/write`. The only unconditionally required endpoint is `/commit`. The server thinks in terms of transaction phases. When it gets a request for a phase that hasn't happened yet, it knows that it can simply skip ahead to that phase. Pipelining still works, as the server can queue up requests until they're ready to be processed. I'll stop myself from going into detail on that just yet.

The server can also include some sanity checks. For example, if there are read bindings, then you must call the `/read` endpoint. If there is a protocol error, for example if the client POSTs a document to `/write` that doesn't validate against the collection schema, then the server returns errors for that and all future responses, until the connector again calls `/init-term`.
Derivations, (V?) too?
I want to first clarify that I'm not really considering "V2" (we need a better name for those) derivations at this point. I don't see any reason why this change would make them any more difficult to implement, but I haven't really thought much about them beyond that. But I do think it's worth discussing using this protocol for our V1 derivations because it raises some interesting questions. You might reasonably question whether it makes sense to try to "cram" derivations into the same set of endpoints as captures/materializations, or if it's better to come up with a separate set of endpoints for them.
After all, they do very different things, and it's not obvious how you'd be able to write a stateful derivation by simply combining the endpoints of both protocols. So the question is basically whether there's value in unifying these protocols, or if it would be better to keep separate protocols that are specific to each task type.
But in either case, we could rely on the same underlying code for running transactions. Most likely, there would simply be optional endpoints or features that are used by derivations, but not by captures or materializations. Even if there were multiple different `Server` implementations for the different task types, they would surely still be able to share a lot of code. The concept that I'm trying to illustrate here is that the consumer API has a specific prescribed sequence of phases that every transaction goes through, and that the endpoints simply expose (possibly a subset of) those phases to clients.

So I don't really see why derivations wouldn't also use this server. I've even thought through a few ways that we could work registers into the same set of endpoints. But I think derivations would likely come last in terms of implementation, so I'll defer discussion of the details until there's more buy-in.
Why HTTP instead of gRPC?
TLDR: I think HTTP seems preferable, primarily because it's easier to integrate into a variety of languages. HTTP is ubiquitous and easy, and I don't think this protocol would benefit significantly from gRPC.
HTTP makes it a little easier for client applications to only pay for what they use, in terms of complexity. If you're writing a simple capture connector, you can just call `/write` and `/commit` and not worry about the bits of the protocol that aren't relevant to you.

I think we might also want to take advantage of content type negotiation in HTTP. I'm referring to the use of the `Accept` and `Content-Type` headers, which allow the server to return and accept content in a variety of formats. I see a few different possibilities that seem worth exploring, though these are all kinda "far out" ideas. I'm describing them here just in case they help illuminate the broader vision of what this could become.

First, we could allow capture connectors to send data using whatever content-type they want, and handle the parsing in the runtime. In other words, we could eventually allow the client to POST arbitrary bytes to `/write`, and handle parsing in the server. Connectors would then be freed of needing to build in the parser binary.

We could also allow clients to request the data in different formats by setting the `Accept` header. This seems useful if you were writing either a derivation or a materialization that uses a particular format, like Arrow.

We could also expose the server endpoints via data-plane-gateway, and allow them to be used directly for push-based ingestion, or even for running arbitrary tasks outside of our data-plane. This would play nicely with the ability of the server to handle various input formats, as it would allow pushing data in any format supported by the parser.
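For a flavor of what that might look like from the connector side (the content types and base URL here are purely illustrative, not a proposal for what we'd actually support):

```go
// Illustrative only: pushing raw CSV to /write and letting the runtime's
// parser handle it, and asking /read for a different output encoding via the
// Accept header.
package negotiation

import (
	"bytes"
	"net/http"
)

const base = "http://localhost:9000"

func pushCSV(csvBytes []byte) (*http.Response, error) {
	req, err := http.NewRequest(http.MethodPost, base+"/write", bytes.NewReader(csvBytes))
	if err != nil {
		return nil, err
	}
	// The runtime would parse the CSV into collection documents server-side.
	req.Header.Set("Content-Type", "text/csv")
	return http.DefaultClient.Do(req)
}

func readAsArrow() (*http.Response, error) {
	req, err := http.NewRequest(http.MethodPost, base+"/read", nil)
	if err != nil {
		return nil, err
	}
	// Ask for the transaction's documents in Arrow format rather than JSON.
	req.Header.Set("Accept", "application/vnd.apache.arrow.stream")
	return http.DefaultClient.Do(req)
}
```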
Going forward
I think this is a good place for me to stop and let others respond. I've intentionally tried to underspecify a lot of the details of how the HTTP endpoints would work in order to more clearly and concisely explain the essentials. I'm not sure if I struck the right balance there, so please let me know if there are aspects that are unclear. I'm excited to hear what y'all think.