
scales

A protocol-agnostic RPC client stack for Python.

Features

  • Built-in support for HTTP, Thrift, ThriftMux, Kafka, and Redis (experimental).
  • An extensible stack for easy support of other protocols.
  • A fully asynchronous API.
  • Robust load balancing and error detection/recovery.
  • Service discovery via ZooKeeper.

Installing

pip install scales-rpc

Getting started

Getting started with scales is simple. For example, let's use it to do an HTTP GET of www.google.com:

from scales.http import Http
client = Http.NewClient('tcp://www.google.com:80')
response = client.Get('/')
print(response.text)

The HTTP client is the simplest: you give it a URI (see service discovery below), and it returns a client with Get(uri) and Post(uri, data) methods. The response is a requests response object.

Service Discovery

Out of the box, scales uses the ScalesUriParser to parse the URIs passed to NewClient. The ScalesUriParser supports two protocols, tcp:// to create a static serverset of host:port pairs (for example tcp://localhost:8080,localhost:8081), and zk:// to create a dynamic serverset based on a ZooKeeper node. ZooKeeper URIs should be in the form of zk://zk_server1:port,zk_server2:port/full/znode/path.
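
To make the two URI forms concrete, here is a small sketch of what they resolve to. This is illustrative only (it uses the standard library's urlparse), not the actual ScalesUriParser implementation:

```python
from urllib.parse import urlparse

def parse_scales_uri(uri):
    """Toy model of the tcp:// / zk:// URI forms described above."""
    parsed = urlparse(uri)
    if parsed.scheme == 'tcp':
        # Static serverset: a comma-separated list of host:port pairs.
        return ('static', parsed.netloc.split(','))
    if parsed.scheme == 'zk':
        # Dynamic serverset: a ZooKeeper ensemble plus a znode path.
        return ('zookeeper', parsed.netloc.split(','), parsed.path)
    raise ValueError('unsupported scheme: %s' % parsed.scheme)

print(parse_scales_uri('tcp://localhost:8080,localhost:8081'))
# → ('static', ['localhost:8080', 'localhost:8081'])
```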

Monitoring / Metrics

Scales provides an internal metrics tracking system called Varz. A component in scales.varz called the VarzReceiver handles tracking and aggregating metrics. This component can be used as-is, or replaced at runtime via monkey patching to integrate with a custom metrics system.

In addition, a helper class, VarzAggregator, can be used to generate varz aggregations. By default metrics are aggregated to the service level, however this can be customized by passing in a custom key selector to Aggregate.

For example:

aggregated_varz = VarzAggregator.Aggregate(
    VarzReceiver.VARZ_DATA,
    VarzReceiver.VARZ_METRICS)

Class Hierarchy

Core

The scales core is composed of 4 modules

  • Messages
  • Sinks
  • Load Balancers
  • Pools

Messages

A message is an envelope to carry some data. In scales, there are two main messages, MethodCallMessage and MethodReturnMessage, representing a request and response.
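
As a rough illustration of the envelope idea, the sketch below mirrors the two message names; the fields shown are assumptions for illustration, not the library's exact attributes:

```python
# Toy versions of scales' two main message types (illustrative only).
class MethodCallMessage:
    """A request envelope: which method to call, and with what."""
    def __init__(self, service, method, args, kwargs):
        self.service = service
        self.method = method
        self.args = args
        self.kwargs = kwargs

class MethodReturnMessage:
    """A response envelope: a return value, or an error."""
    def __init__(self, return_value=None, error=None):
        self.return_value = return_value
        self.error = error

call = MethodCallMessage('ExampleService', 'passMessage', ('hi!',), {})
ret = MethodReturnMessage(return_value='ok')
print(call.method, ret.return_value)  # → passMessage ok
```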

Sinks

Sinks are the core message processing unit of scales. In scales, every layer of the RPC stack is a sink. Some examples of sinks are:

  • Serializer sinks handle serializing a Message object into a stream.
  • Transport sinks handle sending and receiving data over a transport (socket, HTTP, etc.).
  • Dispatch sinks handle initiating a method call, and are called by the transparent client proxy.
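
The layering above can be sketched with a toy chain, in which each sink transforms a message and hands it to the next. This is illustrative only; real scales sinks implement the richer ClientMessageSink interface:

```python
# A toy sink chain: serializer → transport, each forwarding to the next.
class Sink:
    def __init__(self, next_sink=None):
        self.next_sink = next_sink
    def process(self, msg):
        return self.next_sink.process(msg)

class UppercaseSerializer(Sink):
    """Stands in for a serializer sink: transforms the message."""
    def process(self, msg):
        return self.next_sink.process(msg.upper())

class EchoTransport(Sink):
    """Stands in for a transport sink: the terminal sink that 'sends'."""
    def process(self, msg):
        return 'sent: ' + msg

chain = UppercaseSerializer(EchoTransport())
print(chain.process('hello'))  # → sent: HELLO
```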

Load Balancers

Load balancers are sinks as well; however, they (as well as pools) are important enough to warrant their own category. Scales provides two load balancers out of the box, the HeapBalancerSink and the ApertureBalancerSink.

The HeapBalancerSink maintains a min-heap of all nodes in the serverset, and dispatches requests to the least-loaded node at the time. Nodes detected as down are not dispatched to unless all nodes have failed.

The ApertureBalancerSink is a specialization of the HeapBalancerSink which attempts to maintain the smallest possible subset of the serverset to maintain load within a certain load band. Nodes are added to the aperture when the load average of the serverset reaches a certain amount (2 by default), and are removed when the load average goes below a certain amount (.5 by default). This method is useful for situations where the load the client is generating is small in relation to the size of the serverset. The aperture balancer is the default for all scales clients.
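
The least-loaded dispatch at the heart of the HeapBalancerSink can be sketched with Python's heapq. This is a toy model, not the library's implementation:

```python
import heapq

class LeastLoadedBalancer:
    """Toy least-loaded dispatch over a min-heap keyed by in-flight
    requests, in the spirit of HeapBalancerSink (illustrative only)."""
    def __init__(self, nodes):
        self._heap = [(0, node) for node in nodes]  # (load, node) pairs
        heapq.heapify(self._heap)

    def dispatch(self):
        # Pop the least-loaded node, bump its load, and push it back.
        load, node = heapq.heappop(self._heap)
        heapq.heappush(self._heap, (load + 1, node))
        return node

lb = LeastLoadedBalancer(['a', 'b'])
print([lb.dispatch() for _ in range(4)])  # → ['a', 'b', 'a', 'b']
```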

Pools

Pools maintain one or more transport sinks to an underlying endpoint and handle request concurrency to that sink. Scales comes with two pool types, the SingletonPool and the WatermarkPool.

The SingletonPool maintains at most one transport sink, and allows unlimited concurrency to it. This sink is used for transports that allow multiplexing requests over a single connection, such as ThriftMux.

The WatermarkPool maintains a pool of sinks bounded by a low and a high watermark. The pool grows until it reaches the low watermark, and keeps up to that many sinks alive indefinitely. Once the number of concurrently open sinks exceeds the low watermark, new sinks are created for each request until the number of concurrently open sinks reaches the high watermark. At that point, incoming requests are queued until the number of concurrently open sinks drops below the high watermark.
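
The sizing rules can be summarized as a small decision function (illustrative; the real pool manages actual sink objects and their lifetimes):

```python
# Toy decision function capturing the WatermarkPool sizing rules.
def watermark_action(open_sinks, low, high):
    if open_sinks < low:
        return 'grow'       # create a sink and keep it after use
    if open_sinks < high:
        return 'transient'  # create a sink, discard it after use
    return 'queue'          # at the high watermark: queue the request

print([watermark_action(n, low=2, high=4) for n in range(5)])
# → ['grow', 'grow', 'transient', 'transient', 'queue']
```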

Protocol Support

Out of the box, scales supports five protocols: Thrift, ThriftMux, HTTP, Kafka (producer only), and Redis (experimental).

Thrift (and ThriftMux)

Scales supports calling thrift services via autogenerated python thrift code generated by the Apache Thrift compiler. Serialization and deserialization are handled by the thrift library using the TBinaryProtocol (specifically TBinaryProtocolAccelerated).

Scales proxies are created for Thrift and ThriftMux using their respective builders (scales.thrift.Thrift, scales.thriftmux.ThriftMux), and passing the generated thrift interface.

For example:

from my_project.gen_py.example_rpc_service import (ExampleService, ttypes)
from scales.thriftmux import ThriftMux
client = ThriftMux.NewClient(ExampleService.Iface, 'tcp://localhost:8080')
ret = client.passMessage(ttypes.Message('hi!'))

Kafka

Scales provides a robust, high-performance Kafka producer client. It supports discovering the Kafka brokers either directly or via ZooKeeper. Sending messages is simple:

from scales.kafka import Kafka
client = Kafka.NewClient("tcp://broker1:9092,broker2:9092")
client.Put("my-topic", ["payload 1", "payload 2"])

Limitations:

  • Currently messages are distributed across partitions using a least-loaded strategy (via a HeapBalancer). Partition selection via a hash function is unsupported.
  • Only the producer API is implemented.

Redis (EXPERIMENTAL)

The Redis client is a highly experimental wrapper around the Python redis client. It is not recommended for production use.

Scales architecture

A primary goal of scales was to build a fully asynchronous system. All methods should be non-blocking; they either return an AsyncResult representing the state of the operation, or operate in a continuation-passing style.

Transparent Proxy / Dispatcher

The entry point to scales is a transparent proxy, which is generated by the Scales builder or through helper methods such as ThriftMux.NewClient. A proxy's job is to intercept all methods defined by a type (an interface) and route them to a dispatcher. Scales provides one dispatcher, the MessageDispatcher. The MessageDispatcher is a special type of ClientMessageSink that initiates the sink chain. It takes a method call, packages it into a MethodCallMessage, then forwards it to the next sink. On the response side, it terminates the chain by taking a response message and applying it to a gevent AsyncResult. The proxy either waits on this async result (in the synchronous case) or returns it to the caller (in the asynchronous case).

Message processing

Messages (request or response) flow through scales in a cooperative chain. Each sink takes a message from the previous, does something to it, and passes it to the next. If a sink wants to also handle the response message, it installs itself in the response sink_stack via sink_stack.Push(self, ...). Response messages traverse the stack in a similar cooperative way, with each sink calling the next sink on the stack via sink_stack.AsyncProcessResponse(stream, msg).
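
A toy model of the response path, using the Push and AsyncProcessResponse names from the prose above (the four-argument sink callback and everything else here are assumptions for illustration, not scales' exact API):

```python
# Toy response sink stack: sinks that care about the response Push
# themselves, and responses unwind cooperatively through the stack.
class SinkStack:
    def __init__(self):
        self._stack = []
    def Push(self, sink, context=None):
        self._stack.append((sink, context))
    def AsyncProcessResponse(self, stream, msg):
        if self._stack:
            sink, context = self._stack.pop()
            sink.AsyncProcessResponse(self, context, stream, msg)

class LoggingSink:
    def __init__(self):
        self.seen = []
    def AsyncProcessResponse(self, sink_stack, context, stream, msg):
        self.seen.append(msg)                          # inspect the response...
        sink_stack.AsyncProcessResponse(stream, msg)   # ...and keep unwinding

stack = SinkStack()
sink = LoggingSink()
stack.Push(sink)                                # request side: install self
stack.AsyncProcessResponse(None, 'response')    # response side: unwind
print(sink.seen)  # → ['response']
```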

Serializer sinks

Serializer sinks translate a MethodCallMessage into a serialized stream. Serializer sinks perform the switch from msg to stream (and the reverse) during sink stack processing.

Transport sinks

Transport sinks are the final sink in the request sink chain, and initiate the response sink chain. As the name implies, transport sinks are ClientMessageSinks that take a message and stream and send them across some form of transport (for example, Thrift and ThriftMux both use a TCP transport). They also handle reading from the transport and dispatching the response data back up the sink chain of the message the response belongs to. This can be simple (in the Thrift case, a transport sink handles only one request at a time) or more complex; the ThriftMux transport sink, for example, maintains a hash table mapping tags (read from the response stream) to response sink stacks.

Transport sinks are also responsible for handling timeouts. They can either read the deadline from the message and calculate a timeout from it for the transport to use, or use the timeout event on the message to asynchronously trigger a timeout. They are not, however, required to notify the client of the timeout; the timeout sink handles that.
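
The deadline-to-timeout conversion is simple arithmetic; a sketch, where the absolute-seconds deadline representation is an assumption:

```python
import time

def timeout_from_deadline(deadline, now=None):
    """Convert an absolute deadline (seconds since epoch) into a
    relative timeout a transport can use, clamped at zero."""
    now = time.time() if now is None else now
    return max(0.0, deadline - now)

print(timeout_from_deadline(105.0, now=100.0))  # → 5.0
```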

In addition, transport sinks must support having multiple concurrent Open() calls pending. Some upstream sinks may call Open() more than once on a transport, and rely on this being a safe operation.

Finally, transport sinks must detect when their underlying connection is no longer usable, and report this up the stack by setting their on_faulted observable. Upstream sinks use this event to trigger reconnection logic, load balancer adjustment, etc.

Extending scales

There are a few common extension points that one would want to use when implementing new features in scales.

The simplest is implementing a sink that simply inspects (and possibly modifies) messages as they pass through the system.
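
A minimal sketch of such a pass-through sink, assuming a simplified one-method request interface (the real ClientMessageSink interface is richer):

```python
# Hypothetical pass-through sink that inspects requests as they flow
# down the chain toward the transport (illustrative only).
class CountingSink:
    def __init__(self, next_sink):
        self.next_sink = next_sink
        self.calls = 0
    def AsyncProcessRequest(self, sink_stack, msg):
        self.calls += 1  # inspect (or modify) msg here
        self.next_sink.AsyncProcessRequest(sink_stack, msg)

class TerminalSink:
    """Stands in for the rest of the chain."""
    def __init__(self):
        self.received = []
    def AsyncProcessRequest(self, sink_stack, msg):
        self.received.append(msg)

terminal = TerminalSink()
counting = CountingSink(terminal)
counting.AsyncProcessRequest(None, 'msg')
print(counting.calls, terminal.received)  # → 1 ['msg']
```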