
Does stream computing need a framework? SPL may be a better choice


Streaming data sources are usually dynamic and unbounded, quite unlike static, bounded batch data sources. Because of their architecture, traditional database technologies find it difficult to process streaming data sources directly, so programmers have to resort to newer technologies. Computing frameworks such as Heron, Samza, Storm, Spark and Flink were the first to make breakthroughs and gained a first-mover advantage in stream computing. These frameworks are so successful that as soon as stream computing is involved, application programmers naturally turn to one of them, while computing technologies that do not claim to be a framework are generally considered unsuitable for stream computing.

Although computing frameworks were the first to make a breakthrough in stream computing, the framework itself is of little significance to stream computing. A framework is architectural code pre-prepared for a given application scenario: it fixes the scaling mode (distributed or centralized), the main control process (asynchronous or synchronous), and so on, and when facing a real-world application, programmers only need to fill business logic code into it. The advantage of a computing framework is clear: it lets us build a mature and stable program architecture with less code. But its shortcomings cannot be ignored. The application cannot break free from the framework's architectural constraints and has to adapt to the scenario the framework presets; and since a preset scenario can hardly cover all cases (otherwise the framework would be too complex and hard to use), it usually targets only a single scenario.

Real-world scenarios are diverse, with new ones emerging every day, and they often do not match the preset scenario, for example, having Spark called by a JasperReports report, or using Flink for a small cluster or edge computing. In such cases, developers can only try their best to adapt to the preset scenario, or modify the framework; the former often results in poor computing performance or higher hardware cost, while the latter requires a huge workload. If the architectural code is instead designed for the real-world scenario and a stream computing library is imported, these unfavorable results can be avoided. Even when the real-world scenario happens to match the preset one, the stream computing framework is not necessarily the best solution; in a distributed scenario, for instance, a distributed stream computing framework is inferior to the combination of a distributed service framework (such as Zookeeper, HSF or Dubbo) plus a stream computing library in terms of reducing the coupling between architectural code and computing code.

What is significant to stream computing is access capability and computing ability. Stream computing consists of a streaming data source and the computation on it; in other words, only data access capability and computing ability matter. Access capability concerns the streaming data source interface and the inflow mechanism. There are many types of streaming data sources, including character and byte streams in the narrow sense (such as HTTP streams, file streams, and message streams like Kafka) and data streams in the broad sense (such as an RDB's record stream, i.e., the cursor, and a NoSQL database's document stream). The inflow mechanism is either active, where the system actively obtains data from the outside world, or passive, where the system waits for data to be pushed in from the outside world. Computing ability has a relatively high threshold: a stream computing technology should at least provide basic structured data computing functions (filter, distinct, sort, group and aggregate, associate, merge), basic flow control syntax (branch structures like if, loop structures like for), and basic semi-structured data processing ability (parsing semi-structured data such as JSON and XML into records).

Access capability is the basis and computing ability is the core. Stream computing is still computation in essence, except that the data source is in streaming motion, so computing ability is crucial, especially advanced computing ability. Access capability has a relatively low threshold, is easily replicated and scaled, and is highly similar across products; it is not the core of stream computing. Advanced computing ability, in contrast, has a high threshold and fundamentally determines the development efficiency and computing performance of stream computing; it covers the mixed computing of streaming data and batch data, the simplification of complex calculations, and high-performance computing. Many stream computing frameworks focus on access capability (some even lack it) but neglect computing ability; as a result, they fail to reduce the complexity of business logic and remain generally inferior to traditional databases.

Advanced computing ability covers the mixed computing of streaming data and batch data, the simplification of complex calculations, and high-performance computing. Except in special scenarios such as monitoring, pure stream computing is rarely seen in practice, while the mixed computing of streaming data and batch data is very common; in fact, even monitoring-related calculations usually need to merge batch data (minute/hour level) with real-time data (second level) before computing. Although many computing logics are simple, such as filter and distinct, the more valuable ones are often complicated, so a stream computing technology must provide rich computing functions and flexible syntax to reach the computing objective intuitively and quickly. The real-timeness of virtually any stream computing technology is good, even beyond the range of human perception, but millisecond-level speed means little to most applications: since mixed computing of streaming and batch data is common and the batch processing performance of many stream computing technologies is weak (far inferior to an RDB), the overall performance is poor, and only a rich high-performance computing library can improve it. In addition, streaming data often needs to be written out before computing, for example temporarily stored as warm data or persisted as cold data; because the volume of streaming data is generally large and the computations take long, high-performance storage formats for both memory and external storage are required to improve computing (and read/write) performance. In short, stream computing frameworks provide insufficient support for the computation itself, lack professional computing ability, and their advanced computing ability is not yet mature.

A good stream computing technology should focus on the computation itself so as to reduce the complexity of developing business logic. Meanwhile, it should weaken the role of the framework and let the application own the architectural code so as to adapt to various application scenarios. And on the basis of ensuring access capability, it should devote more attention to improving computing ability, especially advanced computing ability.

esProc SPL is precisely a stream computing technology that meets these criteria.

As a lightweight JVM-based open-source computing library, SPL offers a flexible and simple JDBC integration driver, provides convenient streaming data access capability and basic computing ability, and supports the mixed computing of streaming data and batch data, the simplification of complex calculations, and high-performance computing. It is a more professional stream computing language.

Flexible and simple integration interface

Convenient and easy-to-use JDBC driver. SPL calculation code is stored as script files in the operating system directory, and Java code calls a script file through JDBC in the same way as calling a stored procedure. SPL's integration interface is flexible and simple, suitable for a wide range of application scenarios: reports, desktop, Web, single-machine environments, distributed clusters, mobile applications, and edge computing.

For example, first write the SPL script:

|   | A | B |
|---|---|---|
| 1 | =connect("ORCL") | Connect to Oracle |
| 2 | =A1.cursor@x("select OrderID,Amount,Client,SellerID,OrderDate from orders where OrderDate>? && OrderDate<=?",arg1,arg2) | Bounded data stream from Oracle |
| 3 |   |   |

Then, save this script as mix.splx, and Java calls the script through JDBC:

```java
import java.sql.*;

// Load the esProc JDBC driver and call mix.splx like a stored procedure
Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn = DriverManager.getConnection("jdbc:esproc:local://");
CallableStatement statement = conn.prepareCall("{call mix(?, ?)}");
statement.setObject(1, "2022-01-01");  // arg1: start date
statement.setObject(2, "2022-12-31");  // arg2: end date
statement.execute();
```

Placing the calculation outside the application reduces coupling. Instead of adopting a framework that mixes calculation code with non-calculation code, SPL keeps its calculation code outside Java's non-calculation code. The two bear their own responsibilities and play to their respective strengths, modifying one does not affect the other, and programmers can decouple the computing logic from the front-end application with simple code, without resorting to a framework.

Interpreted language supports hot swap. SPL is an interpreted language: a modified script can be executed immediately without compiling, shutting down or restarting the Java application, allowing programmers to hot-swap computing logic without writing any architectural code.

Convenient streaming data access capability

SPL supports rich streaming data sources, including character and byte streams in the narrow sense, such as HTTP streams, file streams and message streams like Kafka, as well as streaming data in the broad sense, such as an RDB's record stream (cursor) and a NoSQL database's document stream.

For example, filter an unbounded data stream from Kafka:

|   | A | B | C |
|---|---|---|---|
| 1 | =kafka_open("D://kafka.properties","topic-test") |   | Connect to Kafka |
| 2 | for | =kafka_poll(A1).(json(value)) | Fetch data in a loop |
| 3 |   | =B2.news(Datas;~.TagFullName,round(Time,-3):Time,~.Type,~.Qualitie,~.Value) | Convert to records |
| 4 |   | =B3.select(Value>3.0 && Value<=4.0) | Filter |
| 5 |   |   |   |

SPL also supports many other streaming data sources: any RDB, Cassandra, InfluxDB, Redis, DynamoDB, Elasticsearch, HBase, Hive, HDFS, MongoDB, SAP, S3, AliCloud, etc. Besides reading data, SPL can also write calculation results back to these data sources.

Active and passive inflow mechanisms. With the active inflow mechanism, the SPL script actively fetches data through the streaming data source interface and computes; the scripts above are examples of this.

With the passive inflow mechanism, the SPL script passively receives data and computes. For example, the following is part of an SPL script that detects abnormal operating conditions in real time:

|   | A | B |
|---|---|---|
| 1 | =func(A12,learn_interval,extrem_interval) | Call the sub-function in cell A12 to calculate the sequence of extreme values |
| 2 | =func(A16,A1.(~(1)),learn_interval,extrem_interval,seq,"up") | Call the sub-function in cell A16 to calculate the upper threshold |
| 3 |   | Remaining code omitted |

First, Java accesses the sensors of the factory equipment through an API to get a real-time time series, then calls the SPL script through JDBC. The main parameter is the time series seq; the secondary parameters are the learning interval learn_interval, the extreme value interval extrem_interval and the warning interval warn_interval. Finally, the SPL script receives the parameters and uses the inflowing time series to find abnormal conditions.
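The Java side can follow the same JDBC pattern as before. A minimal sketch, in which the script name detect.splx and the Java variable names are hypothetical:

```java
import java.sql.*;

// Sketch only: pass the sensor time series and the intervals described
// above to a hypothetical SPL script detect.splx
Connection conn = DriverManager.getConnection("jdbc:esproc:local://");
CallableStatement st = conn.prepareCall("{call detect(?, ?, ?, ?)}");
st.setObject(1, seq);             // real-time time series read from the sensor API
st.setObject(2, learnInterval);   // learning interval
st.setObject(3, extremInterval);  // extreme value interval
st.setObject(4, warnInterval);    // warning interval
st.execute();
```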

Basic computing ability

SPL provides built-in basic structured data computing functions, making it possible to easily accomplish everyday SQL-style calculations.

Filter: data.select(Amount>1000 && Amount<=3000 && like(Client,"*s*"))

Sort: data.sort(Client,-Amount)

Distinct: data.id(Client)

Group and aggregate: data.groups(year(OrderDate);sum(Amount))

Associate: join(T("D:/data/Orders.csv"):O,SellerId; T("D:/data/Employees.txt"):E,EId)

TopN: data.top(-3;Amount)

In addition, SPL provides syntax that conforms to the SQL92 standard, supporting set operations, case when, with, nested subqueries, etc.
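For instance, a minimal sketch, assuming SPL's $-prefixed SQL syntax is used to query the csv file from the examples above directly (the threshold 5000 is hypothetical):

$select Client, sum(Amount) from D:/data/Orders.csv group by Client having sum(Amount)>5000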

SPL provides basic flow control syntax to facilitate the implementation of daily business logic.

Branch structure:

|   | A | B |
|---|---|---|
| 2 |   |   |
| 3 | if T.AMOUNT>10000 | =T.BONUS=T.AMOUNT*0.05 |
| 4 | else if T.AMOUNT>=5000 && T.AMOUNT<10000 | =T.BONUS=T.AMOUNT*0.03 |
| 5 | else if T.AMOUNT>=2000 && T.AMOUNT<5000 | =T.BONUS=T.AMOUNT*0.02 |

Loop structure:

|   | A | B |
|---|---|---|
| 3 | for T | =A3.BONUS=A3.BONUS+A3.AMOUNT*0.01 |
| 4 |   | =A3.CLIENT=CONCAT(LEFT(A3.CLIENT,4),"co.,ltd.") |
| 5 |   |   |

SPL offers basic semi-structured data processing ability, making it convenient to process JSON, XML or irregular text; this is especially suitable for message queues like Kafka and NoSQL databases like MongoDB.

For example, the following script performs a conditional query on a multi-layer JSON string and returns the result as a JSON string:

|   | A | B |
|---|---|---|
| 1 | =json(arg_JsonStr) | Parse the JSON string |
| 2 | =A1.conj(Orders) | Merge lower-level records |
| 3 | =A2.select(Amount>1000 && Amount<=2000) | Conditional query |
| 4 | =json(A3) | Convert the result to a JSON string |

Advanced computing ability: mixed computing of streaming data and batch data

Stream computing technologies with weak computing ability often do not support the mixed computing of streaming data and batch data. Even when they appear to support it, two engines actually compute their own data separately: the two types of data must be converted to the same type (usually streaming data) in advance, and one engine (usually the streaming engine) then performs a fake mixed computation.

SPL is a professional computing language that adopts the same model for streaming data and batch data and computes both with a single engine, so it can naturally perform mixed computing directly, without converting to the same data type in advance.

Example: left-join an Oracle table with a txt table, where the Oracle table is too large to be loaded into memory.

|   | A | B |
|---|---|---|
| 1 | =connect("ORCL") | Connect to Oracle |
| 2 | =A1.cursor@x("select OrderID,Amount,Client,SellerID,OrderDate from orders where OrderDate>? && OrderDate<=?",arg1,arg2) | Bounded data stream from Oracle |
| 3 | =T("d:/data/Employees.txt") | txt table in memory |
| 4 | =A2.join(SellerID,A3:EID,Dept) | Associate the Oracle table with the txt table |
| 5 | =A4.groups(Dept, Client; sum(Amount),count(1)) | Group and aggregate |

Some mixed calculations do, in principle, require converting the different types of data to the same type before computing, for example, merging streaming data with batch data, or an outer join of batch data and streaming data (when the data are small in amount and bounded). In such cases data type conversion is unavoidable. However, because a stream computing technology may have many streaming structured data types and many batch structured data types, the number of conversion relationships is large, making direct conversion difficult and hard coding necessary.

In contrast, SPL, as a professional computing language, has only two structured data types: the table sequence (batch data) and the cursor (streaming data), which can be conveniently converted to each other. For example, merge and associate Oracle's streaming records with an externally passed JSON string:

|   | A | B |
|---|---|---|
| 1 | =connect("ORCL").cursor@x("select * from Orders order by SellerID") | Ordered streaming data |
| 2 | =json(arg_JsonStr).sort(EID) | Sort the batch data |
| 3 | =A2.cursor() | Convert batch data to streaming data |
| 4 | =joinx(A1,SellerID;A3,EID) | Merge the two streams |
| 5 | =A4.groups(#2.Dept;sum(#1.Amount)) | Group and aggregate |

Advanced computing ability: simplifying complex calculation

SPL supports ordered computing, set computing, stepwise computing and association computing; with these abilities, complex structured data computations can be simplified. Example: get the top 3 voltage records for each sensor:

data.group(SensorID).(~.top(3;V))

SPL sequences have genuine row numbers and SPL supports ordered sets, so the code is intuitive: group by SensorID, then compute the TopN of each group (the current group is represented by the symbol ~). SPL's set-oriented operations are also more thorough, supporting real grouping (grouping only, without aggregating), so the data inside each group can be computed intuitively.

Example 1: calculate the maximum number of consecutive days a stock keeps rising:

|   | A |
|---|---|
| 1 | =tbl.sort(day) |
| 2 | =t=0,A1.max(t=if(price>price[-1],t+1,0)) |

It is easy to express the concept of consecutive rises in SPL, which takes only two steps: sort by date, then traverse the records, incrementing a counter by 1 whenever the price rises and resetting it otherwise. In this code, max is a loop function that traverses every record in turn; [-1] denotes a relative position in an ordered set, so price[-1] is the stock price of the previous trading day, which is more intuitive than shifting the whole row (the lag function in SQL).

Example 2: find the top n customers whose cumulative sales account for half of the total sales:

|   | A | B |
|---|---|---|
| 2 | =sales.sort(amount:-1) | /Sort records by amount in descending order (can be done in SQL) |
| 3 | =A2.cumulate(amount) | /Get the sequence of cumulative amounts |
| 4 | =A3.m(-1)/2 | /Half of the final cumulative amount, i.e., half of the total |
| 5 | =A3.pselect(~>=A4) | /Get the position where the cumulative amount reaches half of the total |
| 6 | =A2(to(A5)) | /Get records by position |

The set orientation in SPL is more thorough: a set can be conveniently held in a variable and referenced in subsequent calculations, which makes SPL particularly suitable for multi-step computation. Dividing a big problem into small steps makes complex objectives easy to achieve, and the code is not only short but also easy to understand. Multi-step computation also naturally supports debugging, which further improves development efficiency.

Moreover, SPL supports discrete records and can use dot notation to reference an associated table intuitively, thereby simplifying complex association calculations, as the sketch below shows.
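A minimal sketch, reusing the Orders and Employees files from the earlier examples: switch replaces the foreign key values with references to the associated records, after which dot notation reaches the associated table's fields (the department value "Finance" is hypothetical):

|   | A | B |
|---|---|---|
| 1 | =T("D:/data/Orders.csv") | Orders table |
| 2 | =T("D:/data/Employees.txt").keys(EID) | Employees table with primary key |
| 3 | =A1.switch(SellerID,A2) | Replace the foreign key with record references |
| 4 | =A3.select(SellerID.Dept=="Finance") | Dot notation references the associated table |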

SPL provides a wealth of date and string functions, making it possible to effectively simplify related calculations.

Get the date before or after a specified number of quarters: elapse@q("2020-02-27",-3) // Return 2019-05-27

Get the date after N workdays: workday(date("2022-01-01"),25) // Return 2022-02-04

String functions: check if a string consists entirely of digits: isdigit("12345") // Return true

Get the part of a string before a specified substring: substr@l("abCDcdef","cd") // Return abCD

Split a string into an array of substrings by vertical bar: "aa|bb|cc".split("|") // Return ["aa","bb","cc"]

SPL also offers many other functions, for example: get a date before or after a specified number of years, get the quarter a date belongs to, split a string by a regular expression, extract words from a string, split HTML by a specific marker, etc.

It is worth mentioning that, to further improve development efficiency, SPL offers a unique option syntax that distinguishes variants of similar functions. For example, to filter out only the first record that meets the condition, use the option @1:

T.select@1(Amount>1000)

To search from back to front for the first such record, add the option @z:

T.select@z1(Amount>1000)

Advanced computing ability: high-performance computing

SPL offers a large number of high-performance computing functions. Some of them (and some syntax) are self-created and achieve the same computing objective with lower time complexity, such as multipurpose traversal and double increment segmentation; others are universal but simpler to code and more convenient to use, such as binary search and hash index.
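As a minimal sketch of the hash index, assuming the Employees file from the earlier examples, the @i option on keys, and a hypothetical key value 1024: set the primary key with an index, then locate records by key without a full traversal:

|   | A | B |
|---|---|---|
| 1 | =T("D:/data/Employees.txt").keys@i(EID) | Set the primary key and build a hash index |
| 2 | =A1.find(1024) | Locate the record by key via the index |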


SPL provides high-performance storage formats. A high-performance algorithm typically relies on high-performance storage; for example, the ordered merge and one-sided partitioning algorithms can be implemented only when data are stored in order. To this end, SPL provides a high-performance storage format for external storage, the composite table, which supports columnar storage, ordering, compression, parallel computing, segmentation, primary keys and indexes. The information density and computing performance of the composite table far outperform ordinary formats, significantly improving data loading speed and releasing the full potential of high-performance algorithms. SPL also provides a high-performance in-memory format, the in-memory table, which supports in-memory compression and converts conveniently to and from the composite table, allowing more data to be loaded into memory for high-performance in-memory computing. Refer to SPL computing performance test series: TPCH for details.
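A minimal sketch of writing streaming records into a composite table and computing over it, assuming a hypothetical file path d:/data/orders.ctx (the # prefix marks the ordering key):

|   | A | B |
|---|---|---|
| 1 | =file("d:/data/orders.ctx") | Composite table file |
| 2 | =A1.create(#OrderID,Client,Amount) | Create a composite table ordered by OrderID |
| 3 | >A2.append(connect("ORCL").cursor@x("select OrderID,Client,Amount from orders order by OrderID")) | Write the streaming records in |
| 4 | =A1.open().cursor(Client,Amount;Amount>1000).groups(Client;sum(Amount)) | Columnar scan with filtering, then aggregate |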

SPL supports layering data by temperature and routing calculations across hot and cold data. Since much streaming data is generated at high speed and a calculation may involve a large amount of it, the data must be stored in layers by temperature (usually by time) and overflow to the next layer level by level; at calculation time, the query is routed by parameter (usually time) to data of different temperatures for mixed computing. SPL supports and simplifies such a structure. For example, when a calculation involves historical cold data, recent warm data and real-time hot data: the second/minute-level hot data are held in an in-memory table and periodically written out as warm data; the minute/hour-level warm data are stored in a composite table and periodically written out as cold data; and the hour/day/month-level cold data are stored in an RDB, data warehouse or data lake, participating in large-span mixed computing when necessary. Real projects have more complex structures with more layers; refer to Real-time storage and count of ultra-multi-point high-frequency time series data for details.
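A minimal sketch of such a mixed computation under an assumed layering (the paths, table names and the in-memory table variable hot are all hypothetical): concatenate the three temperature layers into one stream, then aggregate:

|   | A | B |
|---|---|---|
| 1 | =connect("ORCL").cursor@x("select Tag,Time,Value from cold_data") | Cold data from the RDB |
| 2 | =file("d:/data/warm.ctx").open().cursor(Tag,Time,Value) | Warm data from a composite table |
| 3 | =hot.cursor() | Hot data from an in-memory table |
| 4 | =[A1,A2,A3].conjx() | Concatenate the three layers into one stream |
| 5 | =A4.groups(Tag;avg(Value)) | Mixed computing across all temperatures |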
