Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: how to SQL cli #320

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions sdf/_embeds/dataflows/sql_cli.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
apiVersion: 0.5.0
meta:
name: sql-example
version: 0.1.0
namespace: examples

config:
converter: json

types:
vehicle-data-type:
type: object
properties:
vehicle_id:
type: string
latitude:
type: f64
longitude:
type: f64
sensor_status:
type: string
fuel_consumption:
type: u32
engine_rpm:
type: u32
engine_temperature:
type: i32
speed:
type: float32

topics:
vehicle-sensor:
schema:
value:
type: vehicle-data-type
services:
collect-sensor-data:
sources:
- type: topic
id: vehicle-sensor
states:
vehicle-data:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
latitude:
type: f64
longitude:
type: f64
fuel_consumption:
type: u32
sensor_status:
type: string
engine_temperature:
type: i32

partition:
assign-key:
run: |
fn key_by_id(data: VehicleDataType) -> Result<String> {
Ok(data.vehicle_id)
}

update-state:
run: |
fn update_temperature(data: VehicleDataType) -> Result<()> {
let mut vd = vehicle_data();

vd.latitude = data.latitude;
vd.longitude = data.longitude;
vd.fuel_consumption = data.fuel_consumption;
vd.engine_temperature = data.engine_temperature;
vd.sensor_status = data.sensor_status;
vd.update()?;
Ok(())
}
187 changes: 187 additions & 0 deletions sdf/how-to/state_sql_cli.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
---
title: "State CLI SQL access"
description: "State Example using CLI SQL"
sidebar_position: 1050
---

import CodeBlock from '@theme/CodeBlock';
import SqlCLi from '!!raw-loader!../_embeds/dataflows/sql_cli.yaml';

This tutorial is a continuation from the previous state example. This tutorial shows how to use SQL interface through CLI to access state data.

## Prerequisites

This guide uses `local` Fluvio cluster. If you need to install it, please follow the instructions at [here][installation].

## Dataflow

### Overview
From previous examples we learned how to create and manipulate a state. Here we will do the same but also will use CLI interface to access it.

### Collect-sensor-data

#### 1. Define the state

For this state, we will track the `latitude`, `longitude`, `sensor_status` and more parameters from the sensors.

```YAML
states:
vehicle-data:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
latitude:
type: f64
longitude:
type: f64
fuel_consumption:
type: u32
sensor_status:
type: string
engine_temperature:
type: i32
```

Here, the `key` is a string but the value is stored as an `arrow-row` which can contain multiple `properties`(acts like columns).

#### 2. Assign key
We will access the id from the sensor data to partition the state.
```YAML
partition:
assign-key:
run: |
fn key_by_id(data: VehicleDataType) -> Result<String> {
Ok(data.vehicle_id)
}
update-state:
(...)
```
#### 3. Updating State
To update the state in an `arrow-row`, we need to update the individual row's columns manual and call an `update()`.
```YAML
partition:
assign-key:
(...)
update-state:
run: |
fn update_temperature(data: VehicleDataType) -> Result<()> {
let mut vd = vehicle_data();

vd.latitude = data.latitude;
vd.longitude = data.longitude;
vd.fuel_consumption = data.fuel_consumption;
vd.engine_temperature = data.engine_temperature;
vd.sensor_status = data.sensor_status;
vd.update()?;
Ok(())
}
```

States are terminal so no other action will be run.

In this example there is not other service consuming the state, we will use the SQL interface to access it from CLI.


## Running the Example
### Full Code
Copy and paste following config and save it as `dataflow.yaml`.
<CodeBlock language="yaml">{SqlCLi}</CodeBlock>

### Running SDF
To run example:
```bash copy="cmd"
$ sdf run
```

### Produce data
We will produce some data to mimic sensors behavior.
```bash copy="cmd"
$ echo '{ "timestamp": "2023-11-22T12:34:56Z", "vehicle_id": "V001", "latitude": 40.7128, "longitude": -74.0060, "speed": 60, "engine_temperature": 90, "engine_rpm": 2000, "fuel_consumption": 10, "sensor_status": "ok" }
' | fluvio produce vehicle-sensor
$ echo '{ "timestamp": "2023-11-22T12:35:01Z", "vehicle_id": "V002", "latitude": 34.0522, "longitude": -118.2437, "speed": 30, "engine_temperature": 85, "engine_rpm": 1500, "fuel_consumption": 8, "sensor_status": "failed"}
' | fluvio produce vehicle-sensor
```

### Enter SQL Mode

In the SDF interactive shell use the `sql` command to enter the SQL Mode:

```bash
>> sql
SDF SQL version sdf-beta5
Type .help for help.
sql >>
```

### Run queries on the SQL mode

In the SQL mode we will be able to access the dataframe states of the dataflow.

We can list the tables available with:

```bash
sql >> show tables
shape: (1, 1)
┌──────────────┐
│ name │
│ --- │
│ str │
╞══════════════╡
│ vehicle_data │
└──────────────┘
```

We can also perform normal sql queries:

```
select * from vehicle_data
shape: (2, 6)
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ i32 ┆ u32 ┆ f64 ┆ f64 ┆ str │
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
│ V001 ┆ 90 ┆ 10 ┆ 40.7128 ┆ -74.006 ┆ ok │
│ V002 ┆ 85 ┆ 8 ┆ 34.0522 ┆ -118.2437 ┆ failed │
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘

sql >> select * from vehicle_data where sensor_status = 'failed'
shape: (1, 6)
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ i32 ┆ u32 ┆ f64 ┆ f64 ┆ str │
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
│ V002 ┆ 85 ┆ 8 ┆ 34.0522 ┆ -118.2437 ┆ failed │
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘
```

### Exit the SQL mode

Use `.quit` or `.exit` to exit the SQL mode.

```bash
sql >> .quit
```

## Cleanup

Exit `sdf` terminal and clean-up. The `--force` flag removes the topics:

```bash copy="cmd"
$ sdf clean --force
```


## Conclusion

We just implement example accessing arrow states through SQL interface. The following link contains another example using the data from multiple states to perform a JOIN Query
1. [Join Example][github_sql_join]


[installation]: /docs/fluvio/quickstart#install-fluvio
[github_sql_join]: https://github.com/infinyon/stateful-dataflows-examples/tree/main/primitives/sql/join
2 changes: 1 addition & 1 deletion sdf/how-to/tumbling_window.mdx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
title: "Tumbling Window"
description: "Tumbling Window"
sidebar_position: 1000
sidebar_position: 1100
---

import CodeBlock from '@theme/CodeBlock';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
apiVersion: 0.5.0
meta:
name: sql-example
version: 0.1.0
namespace: examples

config:
converter: json

types:
vehicle-data-type:
type: object
properties:
vehicle_id:
type: string
latitude:
type: f64
longitude:
type: f64
sensor_status:
type: string
fuel_consumption:
type: u32
engine_rpm:
type: u32
engine_temperature:
type: i32
speed:
type: float32

topics:
vehicle-sensor:
schema:
value:
type: vehicle-data-type
services:
collect-sensor-data:
sources:
- type: topic
id: vehicle-sensor
states:
vehicle-data:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
latitude:
type: f64
longitude:
type: f64
fuel_consumption:
type: u32
sensor_status:
type: string
engine_temperature:
type: i32

partition:
assign-key:
run: |
fn key_by_id(data: VehicleDataType) -> Result<String> {
Ok(data.vehicle_id)
}

update-state:
run: |
fn update_temperature(data: VehicleDataType) -> Result<()> {
let mut vd = vehicle_data();

vd.latitude = data.latitude;
vd.longitude = data.longitude;
vd.fuel_consumption = data.fuel_consumption;
vd.engine_temperature = data.engine_temperature;
vd.sensor_status = data.sensor_status;
vd.update()?;
Ok(())
}
Loading
Loading