A protocol-agnostic RPC client stack for Python.
- Built-in support for HTTP, Thrift, and ThriftMux (for Finagle servers, see http://twitter.github.io/finagle/guide/Protocols.html)
- Extensible stack for easy support of other protocols.
- Fully asynchronous API
- Robust load balancing and error detection / recovery.
- Service discovery via ZooKeeper
Getting started with scales is very simple. For example, let's use it to do an HTTP GET of www.google.com:
from scales.http import Http
client = Http.NewClient('tcp://www.google.com:80')
response = client.Get('/')
print(response.text)
The HTTP client is the simplest type: you give it a URI (see service discovery below), and it returns a client with Get(uri) and Post(uri, data) methods. The response is a requests Response object.
Out of the box, scales uses the ScalesUriParser to parse the URIs passed to NewClient. The ScalesUriParser supports two protocols: tcp://, which creates a static serverset of host:port pairs (for example tcp://localhost:8080,localhost:8081), and zk://, which creates a dynamic serverset based on a ZooKeeper node. ZooKeeper URIs should be in the form zk://zk_server1:port,zk_server2:port/full/znode/path.
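The two URI forms described above can be illustrated with a small parser. This is a minimal sketch under stated assumptions: `parse_scales_uri` is a hypothetical helper, not the real ScalesUriParser, and its return shape (scheme, list of (host, port) pairs, znode path) is chosen only for illustration.

```python
from typing import List, Optional, Tuple

# Hypothetical helper, not the real ScalesUriParser API: it only illustrates
# how the tcp:// and zk:// URI forms break down into their parts.
def parse_scales_uri(uri: str) -> Tuple[str, List[Tuple[str, int]], Optional[str]]:
    """Split a URI into (scheme, [(host, port), ...], znode_path or None)."""
    scheme, sep, rest = uri.partition('://')
    if not sep:
        raise ValueError('missing scheme in %r' % uri)
    znode = None
    if scheme == 'tcp':
        hosts = rest                      # tcp://h1:p1,h2:p2 -> static serverset
    elif scheme == 'zk':
        hosts, slash, path = rest.partition('/')
        if not slash or not path:
            raise ValueError('zk:// URIs need a znode path')
        znode = '/' + path                # zk://zk1:p,zk2:p/full/znode/path
    else:
        raise ValueError('unsupported scheme %r' % scheme)
    endpoints = []
    for pair in hosts.split(','):
        host, _, port = pair.rpartition(':')
        endpoints.append((host, int(port)))
    return scheme, endpoints, znode
```

Note that for a zk:// URI the host list names the ZooKeeper ensemble, not the service itself; the service's members would be read from the znode.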
The scales core is composed of four modules:
- Messages
- Sinks
- Load Balancers
- Pools
A message is an envelope that carries some data. Scales has two main message types, MethodCallMessage and MethodReturnMessage, representing a request and a response respectively.
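As a rough picture of what these envelopes carry, here is a sketch using dataclasses. The field names (`method`, `args`, `kwargs`, `properties`, `return_value`, `error`) are assumptions for illustration; the real scales classes may carry different attributes.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple

# Hypothetical field layout; the real MethodCallMessage and MethodReturnMessage
# classes in scales may carry different attributes.
@dataclass
class MethodCallMessage:
    method: str                                   # name of the interface method being called
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)
    properties: Dict[str, Any] = field(default_factory=dict)  # e.g. a deadline

@dataclass
class MethodReturnMessage:
    return_value: Any = None
    error: Optional[BaseException] = None         # set instead of return_value on failure
```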
Sinks are the core message-processing unit of scales: every layer of the RPC stack is a sink. Some examples of sinks are:
- Formatter (serializer) sinks handle serializing a Message object into a stream.
- Transport sinks handle sending and receiving data over a transport (socket, HTTP, etc.).
- The Dispatch sink handles initiating processing, and is called by the transparent client proxy.
Load balancers are sinks as well; however, they (along with pools) are important enough to have their own category. Scales provides two load balancers out of the box: the HeapBalancerSink and the ApertureBalancerSink.
The HeapBalancerSink maintains a min-heap of all nodes in the serverset and dispatches each request to the node that is least loaded at that time. Nodes detected as down are not dispatched to unless all nodes have failed.
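The heap-based selection described above can be sketched as follows. This is a hypothetical `HeapBalancer`, not scales' HeapBalancerSink; it assumes "load" means outstanding requests and orders entries so healthy nodes always sort before down ones.

```python
import heapq
import itertools

class HeapBalancer:
    """Hypothetical sketch of least-loaded dispatch over a min-heap (not scales' HeapBalancerSink)."""

    def __init__(self, names):
        tiebreak = itertools.count()
        # Entries are mutable lists: [is_down, outstanding_requests, tiebreak, name].
        # Healthy nodes (is_down=False) sort before down ones.
        self._heap = [[False, 0, next(tiebreak), name] for name in names]
        self._entries = {entry[3]: entry for entry in self._heap}
        heapq.heapify(self._heap)

    def acquire(self):
        """Pick the least-loaded healthy node (a down node only if every node is down)."""
        entry = self._heap[0]
        entry[1] += 1
        heapq.heapify(self._heap)   # O(n) re-heapify keeps the sketch simple
        return entry[3]

    def release(self, name):
        """Call when a request to `name` completes."""
        self._entries[name][1] -= 1
        heapq.heapify(self._heap)

    def mark_down(self, name, down=True):
        self._entries[name][0] = down
        heapq.heapify(self._heap)
```

A production balancer would likely avoid re-heapifying the whole list on every change (for example with decrease-key or lazy deletion), but the ordering rule is the same.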
The ApertureBalancerSink is a specialization of the HeapBalancerSink that attempts to use the smallest possible subset of the serverset that keeps load within a certain band. Nodes are added to the aperture when the load average of the serverset reaches a certain amount (2 by default), and are removed when the load average drops below a certain amount (0.5 by default). This approach is useful when the load the client generates is small relative to the size of the serverset. The aperture balancer is the default for all scales clients.
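The grow/shrink rule can be sketched in a few lines. This hypothetical `Aperture` class only decides the aperture size, not which nodes are in it, and it assumes "load average" means outstanding requests divided by the number of nodes currently in the aperture.

```python
class Aperture:
    """Hypothetical sketch of aperture sizing, assuming "load average" means
    outstanding requests divided by the number of nodes currently in the aperture."""

    def __init__(self, total_nodes, min_size=1, high=2.0, low=0.5):
        self.total_nodes = total_nodes
        self.min_size = min(min_size, total_nodes)
        self.high = high          # grow at or above this average load
        self.low = low            # shrink below this average load
        self.size = self.min_size

    def adjust(self, outstanding):
        """Grow or shrink by one node based on the current outstanding request count."""
        load_avg = outstanding / self.size
        if load_avg >= self.high and self.size < self.total_nodes:
            self.size += 1
        elif load_avg < self.low and self.size > self.min_size:
            self.size -= 1
        return self.size
```

Because the two thresholds are far apart, a load that sits between them leaves the aperture unchanged, which keeps it from flapping.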
Pools maintain one or more transport sinks to an underlying endpoint and handle request concurrency to that sink. Scales comes with two pool types, the SingletonPool and the WatermarkPool.
The SingletonPool maintains at most one transport sink and allows unlimited concurrency to it. This pool is used for transports that allow multiplexing requests over a single connection, such as ThriftMux.
The WatermarkPool maintains a pool of sinks, sized by a low watermark and a high watermark. The pool grows until it hits the low watermark and keeps up to that many sinks open indefinitely. Once the number of concurrently open sinks reaches the low watermark, a new sink is created for each request until the number of concurrently open sinks reaches the high watermark. At that point, incoming requests are queued until the number of concurrently open sinks drops below the high watermark.
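The watermark policy can be sketched as single-threaded bookkeeping in a continuation style: `acquire` hands a sink to a callback now, or queues the callback. The `WatermarkPool` below is hypothetical and assumes the sink factory returns objects with a `close()` method.

```python
from collections import deque

class WatermarkPool:
    """Hypothetical, single-threaded sketch of low/high watermark pooling."""

    def __init__(self, factory, low, high):
        self.factory = factory    # creates a new transport sink (must have close())
        self.low = low            # at most this many idle sinks are kept around
        self.high = high          # hard cap on concurrently checked-out sinks
        self.idle = []
        self.open = 0             # sinks currently in use by requests
        self.waiters = deque()    # callbacks queued while at the high watermark

    def acquire(self, callback):
        """Run callback(sink) now if a sink is available, otherwise queue it."""
        if self.idle:
            self.open += 1
            callback(self.idle.pop())
        elif self.open < self.high:
            self.open += 1
            callback(self.factory())
        else:
            self.waiters.append(callback)

    def release(self, sink):
        """Return a sink after its request completes."""
        if self.waiters:
            self.waiters.popleft()(sink)   # hand it straight to the oldest queued request
            return
        self.open -= 1
        if len(self.idle) < self.low:
            self.idle.append(sink)         # keep up to `low` sinks indefinitely
        else:
            sink.close()                   # above the low watermark: discard it
```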
Out of the box, scales supports three protocols: thrift, thriftmux, and http.
Scales supports calling Thrift services via Python code autogenerated by the Apache Thrift compiler. Serialization and deserialization are handled by the thrift library using the TBinaryProtocol (specifically TBinaryProtocolAccelerated).
Scales proxies for Thrift and ThriftMux are created using their respective builders (scales.thrift.Thrift, scales.thriftmux.ThriftMux) by passing the generated Thrift interface.
For example:
from my_project.gen_py.example_rpc_service import (ExampleService, ttypes)
from scales.thriftmux import ThriftMux
client = ThriftMux.NewClient(ExampleService.Iface, 'tcp://localhost:8080')
ret = client.passMessage(ttypes.Message('hi!'))
A primary goal of scales was to build a fully asynchronous system. All methods should be non-blocking, either returning an AsyncResult that represents the state of the operation or operating in a continuation-passing style.
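The two non-blocking styles can be contrasted in a short sketch. Here `concurrent.futures.Future` stands in for gevent's AsyncResult and a `threading.Timer` stands in for I/O completion; neither is what scales actually uses, and `double_async`/`double_cps` are hypothetical names.

```python
import threading
from concurrent.futures import Future

# concurrent.futures.Future stands in for gevent's AsyncResult here, and a
# threading.Timer stands in for an I/O completion; neither is what scales uses.

def double_async(value, delay=0.05):
    """Return a Future immediately; it is completed later on another thread."""
    result = Future()
    threading.Timer(delay, result.set_result, args=(value * 2,)).start()
    return result

def double_cps(value, continuation, delay=0.05):
    """Continuation-passing style: return nothing and call continuation(result) later."""
    threading.Timer(delay, continuation, args=(value * 2,)).start()
```

A synchronous caller simply waits on the async result: `double_async(21).result(timeout=1)` blocks until the timer fires and then returns 42.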
The entry point to scales is a transparent proxy, which is generated by the Scales builder or through helper methods such as ThriftMux.NewClient. A proxy's job is to intercept all methods defined by a type (an interface) and route them to a dispatcher. Scales provides one dispatcher, the MessageDispatcher. The MessageDispatcher is a special type of ClientMessageSink that initiates the sink chain. It takes a method call, packages it into a MethodCallMessage, then forwards it to the next sink. On the response side, it terminates the chain by taking the response message and applying it to a gevent AsyncResult. The proxy uses this AsyncResult either to wait on (in the synchronous case) or to return to the caller (in the asynchronous case).
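The proxy-to-dispatcher flow can be sketched with `__getattr__`. Everything here is hypothetical: `Proxy`, `Dispatcher`, and the minimal `MethodCallMessage` are stand-ins, the next sink is a plain callable rather than an asynchronous sink chain, and a `concurrent.futures.Future` replaces gevent's AsyncResult.

```python
from concurrent.futures import Future

class MethodCallMessage:
    """Hypothetical minimal request envelope."""
    def __init__(self, method, args, kwargs):
        self.method, self.args, self.kwargs = method, args, kwargs

class Dispatcher:
    """Hypothetical stand-in for MessageDispatcher: package the call, complete a Future."""
    def __init__(self, next_sink):
        self.next_sink = next_sink   # here a plain callable(msg) -> value, for brevity

    def dispatch(self, method, args, kwargs):
        result = Future()            # stands in for gevent's AsyncResult
        try:
            result.set_result(self.next_sink(MethodCallMessage(method, args, kwargs)))
        except Exception as exc:
            result.set_exception(exc)
        return result

class Proxy:
    """Intercept every public method declared on `iface` and route it to the dispatcher."""
    def __init__(self, iface, dispatcher, blocking=True):
        self._methods = {name for name in dir(iface)
                         if not name.startswith('_') and callable(getattr(iface, name))}
        self._dispatcher = dispatcher
        self._blocking = blocking

    def __getattr__(self, name):
        # Only called for attributes not found normally, i.e. interface methods.
        if name not in self.__dict__.get('_methods', ()):
            raise AttributeError(name)
        def call(*args, **kwargs):
            future = self._dispatcher.dispatch(name, args, kwargs)
            # Synchronous proxies wait on the result; asynchronous ones hand it back.
            return future.result() if self._blocking else future
        return call
```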
Messages (requests or responses) flow through scales in a cooperative chain. Each sink takes a message from the previous sink, does something with it, and passes it to the next. If a sink also wants to handle the response message, it installs itself on the response sink_stack via sink_stack.Push(self, ...). Response messages traverse the stack in a similar cooperative way, with each sink calling the next sink on the stack via sink_stack.AsyncProcessResponse(stream, msg).
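The request chain and response stack can be modeled in a small sketch. `SinkStack.Push` and `SinkStack.AsyncProcessResponse(stream, msg)` mirror the names in the text, but the sink-side method signatures and the classes `TracingSink`, `EchoTransport`, and `ResultCollector` are hypothetical.

```python
class SinkStack:
    """Hypothetical response sink stack; Push/AsyncProcessResponse mirror the names above."""
    def __init__(self):
        self._entries = []

    def Push(self, sink, context=None):
        self._entries.append((sink, context))

    def AsyncProcessResponse(self, stream, msg):
        # Hand the response to the most recently pushed sink, if any remain.
        if self._entries:
            sink, context = self._entries.pop()
            sink.AsyncProcessResponse(self, context, stream, msg)

class TracingSink:
    """Pass-through sink that also asks to see the response."""
    def __init__(self, name, next_sink, log):
        self.name, self.next_sink, self.log = name, next_sink, log

    def AsyncProcessRequest(self, sink_stack, msg, stream):
        self.log.append('request:' + self.name)
        sink_stack.Push(self)                              # opt in to the response
        self.next_sink.AsyncProcessRequest(sink_stack, msg, stream)

    def AsyncProcessResponse(self, sink_stack, context, stream, msg):
        self.log.append('response:' + self.name)
        sink_stack.AsyncProcessResponse(stream, msg)       # continue up the stack

class EchoTransport:
    """End of the request chain; immediately starts the response chain."""
    def AsyncProcessRequest(self, sink_stack, msg, stream):
        sink_stack.AsyncProcessResponse(stream, 'reply to ' + msg)

class ResultCollector:
    """Bottom of the stack, playing the dispatcher's role of terminating the chain."""
    def __init__(self):
        self.result = None

    def AsyncProcessResponse(self, sink_stack, context, stream, msg):
        self.result = msg
```

Pushing a ResultCollector first and then sending a request through two TracingSinks logs the requests outer-to-inner and the responses inner-to-outer, because the stack is unwound in reverse push order.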
Formatter sinks translate a MethodCallMessage into a serialized stream. Scales provides a formatter sink for Thrift serialization by default. A key point about formatter sinks is that during request processing they typically create the stream that is passed to the next sinks as the stream parameter; until then, the stream will typically be None. Similarly, during response processing, they take a stream in the stream parameter and turn it into a message, which is passed up the chain in the msg parameter; until then, the msg parameter is None.
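The stream/msg hand-off can be shown with a pair of formatter functions. This is a sketch under simplifying assumptions: JSON stands in for Thrift's TBinaryProtocol, messages are plain dicts, and `format_request`/`format_response` are hypothetical names.

```python
import io
import json

# JSON stands in for Thrift's TBinaryProtocol, and messages are plain dicts;
# both are simplifications for illustration.

def format_request(msg, stream=None):
    """Request side: msg in, stream None -> the formatter creates the stream."""
    if stream is not None:
        raise ValueError('no stream should exist above the formatter')
    body = json.dumps({'method': msg['method'], 'args': msg['args']})
    return msg, io.BytesIO(body.encode('utf-8'))

def format_response(stream, msg=None):
    """Response side: stream in, msg None -> the formatter creates the message."""
    if msg is not None:
        raise ValueError('no message should exist below the formatter')
    return stream, json.loads(stream.getvalue().decode('utf-8'))
```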
Transport sinks are the final sink in the request sink chain, and they initiate the response sink chain. As the name implies, transport sinks are ClientMessageSinks that take a message and stream and send them across some form of transport (for example, Thrift and ThriftMux both use a TCP transport). They also handle reading from the transport and dispatching the response data back up the sink chain for the message that the response belongs to. This can be simple: in the Thrift case, a transport sink can only handle one request at a time. It can also be more complicated: for example, the ThriftMux transport sink maintains a hash table mapping tags (read from the response stream) to response sink stacks.
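The tag-to-sink-stack table can be sketched as below. `TagTable` is hypothetical, and the framing it parses (a 4-byte big-endian tag followed by the payload) is an assumption for illustration, not the real Mux wire format.

```python
import itertools
import struct

class TagTable:
    """Hypothetical sketch: route multiplexed responses to waiting sink stacks by tag."""
    def __init__(self):
        self._tags = itertools.count(1)
        self._pending = {}            # tag -> response sink stack

    def register(self, sink_stack):
        """Assign a tag to an outgoing request and remember who is waiting for it."""
        tag = next(self._tags)
        self._pending[tag] = sink_stack
        return tag

    def on_response(self, frame):
        """Return (sink_stack, payload) for a response frame."""
        # Assumed framing for illustration only: a 4-byte big-endian tag, then the
        # payload. This is not the real Mux wire format.
        (tag,) = struct.unpack('>I', frame[:4])
        return self._pending.pop(tag), frame[4:]
```

Because each response carries its tag, responses can arrive in any order and still reach the right sink stack.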
Transport sinks are also responsible for handling timeouts. They can either read the deadline from the message passed in and calculate a timeout from it for the transport to use, or use the timeout event, also on the message, to trigger a timeout asynchronously. They are not, however, required to notify the client of the timeout; the timeout sink will accomplish that.
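The first option, deriving a transport timeout from a deadline, reduces to a small calculation. This sketch assumes the deadline is a `time.monotonic()` timestamp; `timeout_from_deadline` is a hypothetical name.

```python
import time

def timeout_from_deadline(deadline, now=None):
    """Seconds remaining until `deadline` (a time.monotonic() timestamp), never negative."""
    if now is None:
        now = time.monotonic()
    return max(0.0, deadline - now)
```

If the result is 0, a transport should fail the request immediately rather than call `socket.settimeout(0.0)`, which would put the socket into non-blocking mode instead of timing out.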
In addition, transport sinks must support having multiple concurrent Open() calls pending. Some upstream sinks may call Open() more than once on a transport, and rely on this being a safe operation.
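One way to make concurrent Open() calls safe is to let the first caller perform the connection attempt while later callers wait for it to finish. The `SharedOpenTransport` below is a hypothetical, thread-based sketch; scales itself is gevent-based and may implement this differently.

```python
import threading

class SharedOpenTransport:
    """Hypothetical sketch: concurrent Open() calls share a single connection attempt."""
    def __init__(self, connect):
        self._connect = connect          # blocking function that returns a connection
        self._lock = threading.Lock()
        self._opened = None              # Event set once the single attempt finishes
        self.conn = None

    def Open(self):
        with self._lock:
            first = self._opened is None
            if first:
                self._opened = threading.Event()
        if first:
            try:
                self.conn = self._connect()
            finally:
                self._opened.set()       # wake waiters even if connecting failed
        else:
            self._opened.wait()
        return self.conn                 # None if the single attempt failed
```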
Finally, transport sinks must detect when their underlying connection is no longer usable and report this up the stack by setting their on_faulted observable. Upstream sinks use this event to trigger reconnection logic, load balancer adjustment, etc.
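The fault-reporting contract can be sketched with a minimal observable. Both `Observable` and `FaultReportingTransport` are hypothetical; the real on_faulted object in scales may expose a different interface.

```python
class Observable:
    """Minimal observable: every subscriber is called each time a value is set."""
    def __init__(self):
        self.value = None
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def set(self, value):
        self.value = value
        for callback in list(self._subscribers):
            callback(value)

class FaultReportingTransport:
    """Hypothetical transport that reports a dead connection via on_faulted."""
    def __init__(self):
        self.on_faulted = Observable()

    def handle_socket_error(self, exc):
        # Do not reconnect here; upstream sinks decide what to do.
        self.on_faulted.set(exc)
```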
There are a few common extension points that one would want to use when implementing new features in scales.
The simplest is a sink that inspects (and possibly modifies) messages as they pass through the system.
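Such a pass-through sink might look like the following sketch. `ClientIdSink`, its `process_request` method, and the dict-based message are hypothetical simplifications; a real scales sink would plug into the sink chain's own request and response methods.

```python
class ClientIdSink:
    """Hypothetical pass-through sink: inspects each request and stamps a property on it."""
    def __init__(self, next_sink, client_id, seen):
        self.next_sink = next_sink     # here a plain callable(msg), for brevity
        self.client_id = client_id
        self.seen = seen               # records method names as requests pass by

    def process_request(self, msg):
        self.seen.append(msg['method'])                                 # inspect
        msg.setdefault('properties', {})['client_id'] = self.client_id  # modify
        return self.next_sink(msg)                                      # pass it on
```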