-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: Luis Moreno <[email protected]>
- Loading branch information
Showing
6 changed files
with
536 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
apiVersion: 0.5.0 | ||
meta: | ||
name: sql-example | ||
version: 0.1.0 | ||
namespace: examples | ||
|
||
config: | ||
converter: json | ||
|
||
types: | ||
vehicle-data-type: | ||
type: object | ||
properties: | ||
vehicle_id: | ||
type: string | ||
latitude: | ||
type: f64 | ||
longitude: | ||
type: f64 | ||
sensor_status: | ||
type: string | ||
fuel_consumption: | ||
type: u32 | ||
engine_rpm: | ||
type: u32 | ||
engine_temperature: | ||
type: i32 | ||
speed: | ||
type: float32 | ||
|
||
topics: | ||
vehicle-sensor: | ||
schema: | ||
value: | ||
type: vehicle-data-type | ||
services: | ||
collect-sensor-data: | ||
sources: | ||
- type: topic | ||
id: vehicle-sensor | ||
states: | ||
vehicle-data: | ||
type: keyed-state | ||
properties: | ||
key: | ||
type: string | ||
value: | ||
type: arrow-row | ||
properties: | ||
latitude: | ||
type: f64 | ||
longitude: | ||
type: f64 | ||
fuel_consumption: | ||
type: u32 | ||
sensor_status: | ||
type: string | ||
engine_temperature: | ||
type: i32 | ||
|
||
partition: | ||
assign-key: | ||
run: | | ||
fn key_by_id(data: VehicleDataType) -> Result<String> { | ||
Ok(data.vehicle_id) | ||
} | ||
update-state: | ||
run: | | ||
fn update_temperature(data: VehicleDataType) -> Result<()> { | ||
let mut vd = vehicle_data(); | ||
vd.latitude = data.latitude; | ||
vd.longitude = data.longitude; | ||
vd.fuel_consumption = data.fuel_consumption; | ||
vd.engine_temperature = data.engine_temperature; | ||
vd.sensor_status = data.sensor_status; | ||
vd.update()?; | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
--- | ||
title: "State CLI SQL access" | ||
description: "State Example using CLI SQL" | ||
sidebar_position: 1050 | ||
--- | ||
|
||
import CodeBlock from '@theme/CodeBlock'; | ||
import SqlCLi from '!!raw-loader!../_embeds/dataflows/sql_cli.yaml'; | ||
|
||
This tutorial is a continuation from the previous state example. This tutorial shows how to use SQL interface through CLI to access state data. | ||
|
||
## Prerequisites | ||
|
||
This guide uses `local` Fluvio cluster. If you need to install it, please follow the instructions at [here][installation]. | ||
|
||
## Dataflow | ||
|
||
### Overview | ||
From previous examples we learned how to create and manipulate a state. Here we will do the same but also will use CLI interface to access it. | ||
|
||
### Collect-sensor-data | ||
|
||
#### 1. Define the state | ||
|
||
For this state, we will track the `latitude`, `longitude`, `sensor_status` and more parameters from the sensors. | ||
|
||
```YAML | ||
states: | ||
vehicle-data: | ||
type: keyed-state | ||
properties: | ||
key: | ||
type: string | ||
value: | ||
type: arrow-row | ||
properties: | ||
latitude: | ||
type: f64 | ||
longitude: | ||
type: f64 | ||
fuel_consumption: | ||
type: u32 | ||
sensor_status: | ||
type: string | ||
engine_temperature: | ||
type: i32 | ||
``` | ||
Here, the `key` is a string but the value is stored as an `arrow-row` which can contain multiple `properties`(acts like columns). | ||
|
||
#### 2. Assign key | ||
We will access the id from the sensor data to partition the state. | ||
```YAML | ||
partition: | ||
assign-key: | ||
run: | | ||
fn key_by_id(data: VehicleDataType) -> Result<String> { | ||
Ok(data.vehicle_id) | ||
} | ||
update-state: | ||
(...) | ||
``` | ||
#### 3. Updating State | ||
To update the state in an `arrow-row`, we need to update the individual row's columns manual and call an `update()`. | ||
```YAML | ||
partition: | ||
assign-key: | ||
(...) | ||
update-state: | ||
run: | | ||
fn update_temperature(data: VehicleDataType) -> Result<()> { | ||
let mut vd = vehicle_data(); | ||
vd.latitude = data.latitude; | ||
vd.longitude = data.longitude; | ||
vd.fuel_consumption = data.fuel_consumption; | ||
vd.engine_temperature = data.engine_temperature; | ||
vd.sensor_status = data.sensor_status; | ||
vd.update()?; | ||
Ok(()) | ||
} | ||
``` | ||
|
||
States are terminal so no other action will be run. | ||
|
||
In this example there is not other service consuming the state, we will use the SQL interface to access it from CLI. | ||
|
||
|
||
## Running the Example | ||
### Full Code | ||
Copy and paste following config and save it as `dataflow.yaml`. | ||
<CodeBlock language="yaml">{SqlCLi}</CodeBlock> | ||
|
||
### Running SDF | ||
To run example: | ||
```bash copy="cmd" | ||
$ sdf run | ||
``` | ||
|
||
### Produce data | ||
We will produce some data to mimic sensors behavior. | ||
```bash copy="cmd" | ||
$ echo '{ "timestamp": "2023-11-22T12:34:56Z", "vehicle_id": "V001", "latitude": 40.7128, "longitude": -74.0060, "speed": 60, "engine_temperature": 90, "engine_rpm": 2000, "fuel_consumption": 10, "sensor_status": "ok" } | ||
' | fluvio produce vehicle-sensor | ||
$ echo '{ "timestamp": "2023-11-22T12:35:01Z", "vehicle_id": "V002", "latitude": 34.0522, "longitude": -118.2437, "speed": 30, "engine_temperature": 85, "engine_rpm": 1500, "fuel_consumption": 8, "sensor_status": "failed"} | ||
' | fluvio produce vehicle-sensor | ||
``` | ||
|
||
### Enter SQL Mode | ||
|
||
In the SDF interactive shell use the `sql` command to enter the SQL Mode: | ||
|
||
```bash | ||
>> sql | ||
SDF SQL version sdf-beta5 | ||
Type .help for help. | ||
sql >> | ||
``` | ||
|
||
### Run queries on the SQL mode | ||
|
||
In the SQL mode we will be able to access the dataframe states of the dataflow. | ||
|
||
We can list the tables available with: | ||
|
||
```bash | ||
sql >> show tables | ||
shape: (1, 1) | ||
┌──────────────┐ | ||
│ name │ | ||
│ --- │ | ||
│ str │ | ||
╞══════════════╡ | ||
│ vehicle_data │ | ||
└──────────────┘ | ||
``` | ||
|
||
We can also perform normal sql queries: | ||
|
||
``` | ||
select * from vehicle_data | ||
shape: (2, 6) | ||
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐ | ||
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │ | ||
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ | ||
│ str ┆ i32 ┆ u32 ┆ f64 ┆ f64 ┆ str │ | ||
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡ | ||
│ V001 ┆ 90 ┆ 10 ┆ 40.7128 ┆ -74.006 ┆ ok │ | ||
│ V002 ┆ 85 ┆ 8 ┆ 34.0522 ┆ -118.2437 ┆ failed │ | ||
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘ | ||
sql >> select * from vehicle_data where sensor_status = 'failed' | ||
shape: (1, 6) | ||
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐ | ||
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │ | ||
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ | ||
│ str ┆ i32 ┆ u32 ┆ f64 ┆ f64 ┆ str │ | ||
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡ | ||
│ V002 ┆ 85 ┆ 8 ┆ 34.0522 ┆ -118.2437 ┆ failed │ | ||
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘ | ||
``` | ||
|
||
### Exit the SQL mode | ||
|
||
Use `.quit` or `.exit` to exit the SQL mode. | ||
|
||
```bash | ||
sql >> .quit | ||
``` | ||
|
||
## Cleanup | ||
|
||
Exit `sdf` terminal and clean-up. The `--force` flag removes the topics: | ||
|
||
```bash copy="cmd" | ||
$ sdf clean --force | ||
``` | ||
|
||
|
||
## Conclusion | ||
|
||
We just implement example accessing arrow states through SQL interface. The following link contains another example using the data from multiple states to perform a JOIN Query | ||
1. [Join Example][github_sql_join] | ||
|
||
|
||
[installation]: /docs/fluvio/quickstart#install-fluvio | ||
[github_sql_join]: https://github.com/infinyon/stateful-dataflows-examples/tree/main/primitives/sql/join |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
sdf_versioned_docs/version-sdf-beta5/_embeds/dataflows/sql_cli.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
apiVersion: 0.5.0 | ||
meta: | ||
name: sql-example | ||
version: 0.1.0 | ||
namespace: examples | ||
|
||
config: | ||
converter: json | ||
|
||
types: | ||
vehicle-data-type: | ||
type: object | ||
properties: | ||
vehicle_id: | ||
type: string | ||
latitude: | ||
type: f64 | ||
longitude: | ||
type: f64 | ||
sensor_status: | ||
type: string | ||
fuel_consumption: | ||
type: u32 | ||
engine_rpm: | ||
type: u32 | ||
engine_temperature: | ||
type: i32 | ||
speed: | ||
type: float32 | ||
|
||
topics: | ||
vehicle-sensor: | ||
schema: | ||
value: | ||
type: vehicle-data-type | ||
services: | ||
collect-sensor-data: | ||
sources: | ||
- type: topic | ||
id: vehicle-sensor | ||
states: | ||
vehicle-data: | ||
type: keyed-state | ||
properties: | ||
key: | ||
type: string | ||
value: | ||
type: arrow-row | ||
properties: | ||
latitude: | ||
type: f64 | ||
longitude: | ||
type: f64 | ||
fuel_consumption: | ||
type: u32 | ||
sensor_status: | ||
type: string | ||
engine_temperature: | ||
type: i32 | ||
|
||
partition: | ||
assign-key: | ||
run: | | ||
fn key_by_id(data: VehicleDataType) -> Result<String> { | ||
Ok(data.vehicle_id) | ||
} | ||
update-state: | ||
run: | | ||
fn update_temperature(data: VehicleDataType) -> Result<()> { | ||
let mut vd = vehicle_data(); | ||
vd.latitude = data.latitude; | ||
vd.longitude = data.longitude; | ||
vd.fuel_consumption = data.fuel_consumption; | ||
vd.engine_temperature = data.engine_temperature; | ||
vd.sensor_status = data.sensor_status; | ||
vd.update()?; | ||
Ok(()) | ||
} |
Oops, something went wrong.