Skip to content

Commit

Permalink
Add UDAF(User Defined Aggregation Function) documentation. (#180)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sh-Zh-7 authored Feb 27, 2024
1 parent 6761d7c commit 2c45048
Show file tree
Hide file tree
Showing 4 changed files with 962 additions and 4 deletions.
256 changes: 255 additions & 1 deletion src/UserGuide/Master/User-Manual/Database-Programming.md
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ In IoTDB, you can expand two types of UDF:
| UDF Class | Description |
| --------------------------------------------------- | ------------------------------------------------------------ |
| UDTF(User Defined Timeseries Generating Function) | This type of function can take **multiple** time series as input, and output **one** time series, which can have any number of data points. |
| UDAF(User Defined Aggregation Function) | Under development, please stay tuned. |
| UDAF(User Defined Aggregation Function) | Custom Aggregation Functions. This type of function can take one time series as input, and output **one** aggregated data point for each group based on the GROUP BY type. |

### UDF Development Dependencies

Expand Down Expand Up @@ -1410,6 +1410,260 @@ This method is called by the framework. For a UDF instance, `beforeDestroy` will



### UDAF (User Defined Aggregation Function)

A complete definition of UDAF involves two classes, `State` and `UDAF`.

#### State Class

To write your own `State`, you need to implement the `org.apache.iotdb.udf.api.State` interface.

The following table shows all the interfaces available for user implementation.

| Interface Definition | Description | Required to Implement |
| -------------------------------- | ------------------------------------------------------------ | --------------------- |
| `void reset()` | To reset the `State` object to its initial state, you need to fill in the initial values of the fields in the `State` class within this method as if you were writing a constructor. | Required |
| `byte[] serialize()` | Serializes `State` to binary data. This method is used for IoTDB internal `State` passing. Note that the order of serialization must be consistent with the following deserialization methods. | Required |
| `void deserialize(byte[] bytes)` | Deserializes binary data to `State`. This method is used for IoTDB internal `State` passing. Note that the order of deserialization must be consistent with the serialization method above. | Required |

The following section describes the usage of each interface in detail.



##### void reset()

This method resets the `State` to its initial state, you need to fill in the initial values of the fields in the `State` object in this method. For optimization reasons, IoTDB reuses `State` as much as possible internally, rather than creating a new `State` for each group, which would introduce unnecessary overhead. When `State` has finished updating the data in a group, this method is called to reset to the initial state as a way to process the next group.

In the case of `State` for averaging (aka `avg`), for example, you would need the sum of the data, `sum`, and the number of entries in the data, `count`, and initialize both to 0 in the `reset()` method.

```java
class AvgState implements State {
double sum;

long count;

@Override
public void reset() {
sum = 0;
count = 0;
}

// other methods
}
```



##### byte[] serialize()/void deserialize(byte[] bytes)

These methods serialize the `State` into binary data, and deserialize the `State` from the binary data. IoTDB, as a distributed database, involves passing data among different nodes, so you need to write these two methods to enable the passing of the State among different nodes. Note that the order of serialization and deserialization must be the consistent.

In the case of `State` for averaging (aka `avg`), for example, you can convert the content of State to `byte[]` array and read out the content of State from `byte[]` array in any way you want, the following shows the code for serialization/deserialization using `ByteBuffer` introduced by Java8:

```java
@Override
public byte[] serialize() {
ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + Long.BYTES);
buffer.putDouble(sum);
buffer.putLong(count);

return buffer.array();
}

@Override
public void deserialize(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
sum = buffer.getDouble();
count = buffer.getLong();
}
```



#### UDAF Classes

To write a UDAF, you need to implement the `org.apache.iotdb.udf.api.UDAF` interface.

The following table shows all the interfaces available for user implementation.

| Interface definition | Description | Required to Implement |
| ------------------------------------------------------------ | ------------------------------------------------------------ | --------------------- |
| `void validate(UDFParameterValidator validator) throws Exception` | This method is mainly used to validate `UDFParameters` and it is executed before `beforeStart(UDFParameters, UDTFConfigurations)` is called. | Optional |
| `void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) throws Exception` | Initialization method that invokes user-defined initialization behavior before UDAF processes the input data. Unlike UDTF, configuration is of type `UDAFConfiguration`. | Required |
| `State createState()` | To create a `State` object, usually just call the default constructor and modify the default initial value as needed. | Required |
| `void addInput(State state, Column[] columns, BitMap bitMap)` | Update `State` object according to the incoming data `Column[]` in batch, note that `column[0]` always represents the time column. In addition, `BitMap` represents the data that has been filtered out before, you need to manually determine whether the corresponding data has been filtered out when writing this method. | Required |
| `void combineState(State state, State rhs)` | Merge `rhs` state into `state` state. In a distributed scenario, the same set of data may be distributed on different nodes, IoTDB generates a `State` object for the partial data on each node, and then calls this method to merge it into the complete `State`. | Required |
| `void outputFinal(State state, ResultValue resultValue)` | Computes the final aggregated result based on the data in `State`. Note that according to the semantics of the aggregation, only one value can be output per group. | Required |
| `void beforeDestroy() ` | This method is called by the framework after the last input data is processed, and will only be called once in the life cycle of each UDF instance. | Optional |

In the life cycle of a UDAF instance, the calling sequence of each method is as follows:

1. `State createState()`
2. `void validate(UDFParameterValidator validator) throws Exception`
3. `void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) throws Exception`
4. `void addInput(State state, Column[] columns, BitMap bitMap)`
5. `void combineState(State state, State rhs)`
6. `void outputFinal(State state, ResultValue resultValue)`
7. `void beforeDestroy()`

Similar to UDTF, every time the framework executes a UDAF query, a new UDF instance will be constructed. When the query ends, the corresponding instance will be destroyed. Therefore, the internal data of the instances in different UDAF queries (even in the same SQL statement) are isolated. You can maintain some state data in the UDAF without considering the influence of concurrency and other factors.

The usage of each interface will be described in detail below.



##### void validate(UDFParameterValidator validator) throws Exception

Same as UDTF, the `validate` method is used to validate the parameters entered by the user.

In this method, you can limit the number and types of input time series, check the attributes of user input, or perform any custom verification.



##### void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) throws Exception

The `beforeStart` method does the same thing as the UDAF:

1. Use UDFParameters to get the time series paths and parse key-value pair attributes entered by the user.
2. Set the strategy to access the raw data and set the output data type in UDAFConfigurations.
3. Create resources, such as establishing external connections, opening files, etc.

The role of the `UDFParameters` type can be seen above.

###### UDAFConfigurations

The difference from UDTF is that UDAF uses `UDAFConfigurations` as the type of `configuration` object.

Currently, this class only supports setting the type of output data.

```java
void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) throws Exception {
// parameters
// ...

// configurations
configurations
.setOutputDataType(Type.INT32); }
}
```

The relationship between the output type set in `setOutputDataType` and the type of data output that `ResultValue` can actually receive is as follows:

| The output type set in `setOutputDataType` | The output type that `ResultValue` can actually receive |
| ------------------------------------------ | ------------------------------------------------------- |
| `INT32` | `int` |
| `INT64` | `long` |
| `FLOAT` | `float` |
| `DOUBLE` | `double` |
| `BOOLEAN` | `boolean` |
| `TEXT` | `org.apache.iotdb.udf.api.type.Binary` |

The output type of the UDAF is determined at runtime. You can dynamically determine the output sequence type based on the input type.

Here is a simple example:

```java
void beforeStart(UDFParameters parameters, UDAFConfigurations configurations) throws Exception {
// do something
// ...

configurations
.setOutputDataType(parameters.getDataType(0));
}
```



##### State createState()

This method creates and initializes a `State` object for UDAF. Due to the limitations of the Java language, you can only call the default constructor for the `State` class. The default constructor assigns a default initial value to all the fields in the class, and if that initial value does not meet your requirements, you need to initialize them manually within this method.

The following is an example that includes manual initialization. Suppose you want to implement an aggregate function that multiply all numbers in the group, then your initial `State` value should be set to 1, but the default constructor initializes it to 0, so you need to initialize `State` manually after calling the default constructor:

```java
public State createState() {
MultiplyState state = new MultiplyState();
state.result = 1;
return state;
}
```



##### void addInput(State state, Column[] columns, BitMap bitMap)

This method updates the `State` object with the raw input data. For performance reasons, also to align with the IoTDB vectorized query engine, the raw input data is no longer a data point, but an array of columns ``Column[]``. Note that the first column (i.e. `column[0]`) is always the time column, so you can also do different operations in UDAF depending on the time.

Since the input parameter is not of a single data point type, but of multiple columns, you need to manually filter some of the data in the columns, which is why the third parameter, `BitMap`, exists. It identifies which of these columns have been filtered out, so you don't have to think about the filtered data in any case.

Here's an example of `addInput()` that counts the number of items (aka count). It shows how you can use `BitMap` to ignore data that has been filtered out. Note that due to the limitations of the Java language, you need to do the explicit cast the `State` object from type defined in the interface to a custom `State` type at the beginning of the method, otherwise you won't be able to use the `State` object.

```java
public void addInput(State state, Column[] column, BitMap bitMap) {
CountState countState = (CountState) state;

int count = column[0].getPositionCount();
for (int i = 0; i < count; i++) {
if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
if (!column[1].isNull(i)) {
countState.count++;
}
}
}
```



##### void combineState(State state, State rhs)

This method combines two `State`s, or more precisely, updates the first `State` object with the second `State` object. IoTDB is a distributed database, and the data of the same group may be distributed on different nodes. For performance reasons, IoTDB will first aggregate some of the data on each node into `State`, and then merge the `State`s on different nodes that belong to the same group, which is what `combineState` does.

Here's an example of `combineState()` for averaging (aka avg). Similar to `addInput`, you need to do an explicit type conversion for the two `State`s at the beginning. Also note that you are updating the value of the first `State` with the contents of the second `State`.

```java
public void combineState(State state, State rhs) {
AvgState avgState = (AvgState) state;
AvgState avgRhs = (AvgState) rhs;

avgState.count += avgRhs.count;
avgState.sum += avgRhs.sum;
}
```



##### void outputFinal(State state, ResultValue resultValue)

This method works by calculating the final result from `State`. You need to access the various fields in `State`, derive the final result, and set the final result into the `ResultValue` object.IoTDB internally calls this method once at the end for each group. Note that according to the semantics of aggregation, the final result can only be one value.

Here is another `outputFinal` example for averaging (aka avg). In addition to the forced type conversion at the beginning, you will also see a specific use of the `ResultValue` object, where the final result is set by `setXXX` (where `XXX` is the type name).

```java
public void outputFinal(State state, ResultValue resultValue) {
AvgState avgState = (AvgState) state;

if (avgState.count != 0) {
resultValue.setDouble(avgState.sum / avgState.count);
} else {
resultValue.setNull();
}
}
```



##### void beforeDestroy()

The method for terminating a UDF.

This method is called by the framework. For a UDF instance, `beforeDestroy` will be called after the last record is processed. In the entire life cycle of the instance, `beforeDestroy` will only be called once.





### Maven Project Example

If you use Maven, you can build your own UDF project referring to our **udf-example** module. You can find the project [here](https://github.com/apache/iotdb/tree/master/example/udf).
Expand Down
Loading

0 comments on commit 2c45048

Please sign in to comment.