Skip to content

Commit

Permalink
Created functions for SQLite
Browse files Browse the repository at this point in the history
  • Loading branch information
bgelatti committed Nov 13, 2024
1 parent d79ff08 commit 962db64
Show file tree
Hide file tree
Showing 10 changed files with 1,262 additions and 124 deletions.
1,132 changes: 1,100 additions & 32 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 0 additions & 9 deletions plugins/mysql/src/Providers/DeviceData/importDeviceData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,6 @@ async function importDeviceData(
throw new Error("Device not found");
}
return new Promise((resolve, reject) => {
// TODO - Fix to use copy from mysql
// const sqlCommand = `COPY "${deviceID}" FROM '${fileName}' DELIMITER ',' CSV HEADER`;
// deviceDB.write.raw(sqlCommand).then(() => {
// resolve("Data imported successfully");
// }).catch((error) => {
// console.log(error);
// reject(error);
// });

const data: any[] = [];
fs.createReadStream(fileName)
.pipe(parse({ columns: true, encoding: "utf8" }))
Expand Down
60 changes: 37 additions & 23 deletions plugins/postgres/src/Providers/DeviceData/exportDeviceData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,44 @@ async function exportDeviceData(
folder: string,
): Promise<string> {
const client = await getDeviceConnection(deviceID, type);
return new Promise((resolve, reject) => {
const filename = `${folder}/${csvNameGenerator(`data_${deviceID}`)}`;
const file = fs.createWriteStream(filename);
file.setDefaultEncoding("utf8");

// TODO - Fix to use copy from mysql
// const sqlCommand = `COPY "${deviceID}" TO STDOUT WITH CSV HEADER`;
// const deviceDataStream = deviceDB.read.raw(sqlCommand).stream();
const deviceDataStream = client.read("data")?.select("*").stream();
if (!deviceDataStream) {
return reject("No data to export");
}

deviceDataStream
.pipe(stringify({ header: true }))
.pipe(file)
.on("finish", () => {
resolve(filename);
})
.on("error", (error) => {
console.log(error);
reject(error);
});

const filename = `${folder}/${csvNameGenerator(`data_${deviceID}`)}`;
const file = fs.createWriteStream(filename);
file.setDefaultEncoding("utf8");
const columns = [
"id",
"variable",
"type",
"value",
"unit",
"group",
"location",
"metadata",
"time",
"created_at",
];

const stringifier = stringify({
header: true,
columns,
cast: {
date: (value: Date) => value.toISOString(),
},
});

stringifier.pipe(file);

const deviceDataStream = client.read("data")?.select(columns).stream();

if (!deviceDataStream) {
return Promise.reject("No data to export");
}

for await (const row of deviceDataStream) {
stringifier.write(row);
}

return Promise.resolve(filename);
}

export default exportDeviceData;
32 changes: 13 additions & 19 deletions plugins/postgres/src/Providers/DeviceData/importDeviceData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type TGenericID,
generateResourceID,
zDeviceData,
zDeviceImmutableData,
} from "@tago-io/tcore-sdk/types";
import { parse } from "csv";
import type { Knex } from "knex";
Expand Down Expand Up @@ -37,28 +38,30 @@ function _parseData(row: any, device: IDevice) {
id: generateResourceID(),
variable: row.variable,
type: row.type,
value: !row.value ? null : row.value,
unit: !row.unit ? null : row.unit,
value: !row.value ? undefined : row.value,
unit: !row.unit ? undefined : row.unit,
group: row.group,
location: !row.location ? null : row.unit,
metadata: !row.metadata ? null : JSON.parse(row.location),
time: row.time,
location: !row.location ? undefined : JSON.parse(row.location),
metadata: !row.metadata ? undefined : JSON.parse(row.metadata),
time: new Date(row.time),
created_at: new Date(),
chunk_timestamp_start: null,
chunk_timestamp_end: null,
serie: !row.serie ? null : row.unit,
chunk_timestamp_start: undefined,
chunk_timestamp_end: undefined,
serie: !row.serie ? undefined : row.serie,
};

let zodData: any;
if (device.chunk_period) {
const chunkTimestamp = _getChunkTimestamp(new Date(row.time), device);
if (chunkTimestamp) {
parsedData.chunk_timestamp_start = chunkTimestamp.startDate;
parsedData.chunk_timestamp_end = chunkTimestamp.endDate;
}
zodData = zDeviceImmutableData.parse(parsedData);
} else {
zodData = zDeviceData.parse(parsedData);
}

const zodData = zDeviceData.parse(parsedData);

return zodData;
}

Expand Down Expand Up @@ -86,15 +89,6 @@ async function importDeviceData(
throw new Error("Device not found");
}
return new Promise((resolve, reject) => {
// TODO - Fix to use copy from postgres
// const sqlCommand = `COPY "${deviceID}" FROM '${fileName}' DELIMITER ',' CSV HEADER`;
// deviceDB.write.raw(sqlCommand).then(() => {
// resolve("Data imported successfully");
// }).catch((error) => {
// console.log(error);
// reject(error);
// });

const data: any[] = [];
fs.createReadStream(fileName)
.pipe(parse({ columns: true, encoding: "utf8" }))
Expand Down
2 changes: 1 addition & 1 deletion plugins/sqlite/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
},
"dependencies": {
"knex": "3.1.0",
"better-sqlite3": "11.5.0",
"sqlite3": "5.1.7",
"csv": "6.3.10"
},
"devDependencies": {}
Expand Down
2 changes: 1 addition & 1 deletion plugins/sqlite/src/Helpers/DeviceDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export async function createDeviceConnection(
const filename = await helpers.getFileURI(path.join("devices", `${id}.db`));

const connection = knex({
client: "better-sqlite3",
client: "sqlite3",
connection: { filename },
useNullAsDefault: true,
});
Expand Down
2 changes: 1 addition & 1 deletion plugins/sqlite/src/Helpers/PluginConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async function createPluginConnection(id: TGenericID) {
const filename = await helpers.getFileURI(path.join("plugins", `${id}.db`));

const connection = knex({
client: "better-sqlite3",
client: "sqlite3",
connection: { filename },
useNullAsDefault: true,
});
Expand Down
51 changes: 36 additions & 15 deletions plugins/sqlite/src/Providers/DeviceData/exportDeviceData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,43 @@ async function exportDeviceData(
folder: string,
): Promise<string> {
const client = await getDeviceConnection(deviceID, type);
return new Promise((resolve, reject) => {
const deviceDataStream = client.select("*").from("data").stream();
const filename = `${folder}/${csvNameGenerator(`data_${deviceID}`)}`;
const file = fs.createWriteStream(filename);
file.setDefaultEncoding("utf8");

deviceDataStream
.pipe(stringify({ header: true }))
.pipe(file)
.on("finish", () => {
resolve(filename);
})
.on("error", (error) => {
reject(error);
});

const filename = `${folder}/${csvNameGenerator(`data_${deviceID}`)}`;
const file = fs.createWriteStream(filename);
file.setDefaultEncoding("utf8");
const columns = [
"id",
"variable",
"type",
"value",
"unit",
"group",
"location",
"metadata",
"time",
"created_at",
];

const stringifier = stringify({
header: true,
columns,
});

stringifier.pipe(file);

const deviceDataStream = client.select(columns).from("data").stream();

if (!deviceDataStream) {
return Promise.reject("No data to export");
}

for await (const row of deviceDataStream) {
row.time = new Date(row.time).toISOString();
row.created_at = new Date(row.created_at).toISOString();
stringifier.write(row);
}

return Promise.resolve(filename);
}

export default exportDeviceData;
94 changes: 72 additions & 22 deletions plugins/sqlite/src/Providers/DeviceData/importDeviceData.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import fs from "node:fs";
import {
type IDevice,
type TDeviceType,
type TGenericID,
generateResourceID,
zDeviceData,
zDeviceImmutableData,
} from "@tago-io/tcore-sdk/types";
import { parse } from "csv";
import type { Knex } from "knex";
import { getDeviceConnection } from "../../Helpers/DeviceDatabase.ts";
import { DateTime } from "luxon";
import getDeviceInfo from "../Device/getDeviceInfo.ts";

async function _insertData(client: Knex, data: any[]) {
client
Expand All @@ -19,6 +24,67 @@ async function _insertData(client: Knex, data: any[]) {
});
}

/**
* Gets the chunk timestamps for a date.
*/
/**
 * Gets the chunk timestamps (start/end of the device's chunk period) that
 * contain the given date.
 *
 * @param date - The data point's timestamp.
 * @param device - Device whose `chunk_period` defines the chunk granularity.
 * @returns The chunk boundary dates, or `null` when the device has no
 *   `chunk_period` configured.
 * @throws {Error} When the date cannot be converted to a valid timestamp.
 */
function _getChunkTimestamp(date: Date, device: IDevice) {
  const dateJS = DateTime.fromJSDate(date).toUTC();

  if (!device?.chunk_period) {
    return null;
  }

  if (!dateJS.isValid) {
    // Throw a real Error (not a bare string) so callers get a stack trace
    // and `instanceof Error` checks work.
    throw new Error("Invalid Database Chunk Address (date)");
  }

  const startDate = dateJS.startOf(device.chunk_period).toJSDate();
  const endDate = dateJS.endOf(device.chunk_period).toJSDate();

  return { startDate, endDate };
}

function _parseData(row: any, device: IDevice) {
const parsedData: any = {
id: generateResourceID(),
variable: row.variable,
type: row.type,
value: !row.value ? undefined : row.value,
unit: !row.unit ? undefined : row.unit,
group: row.group,
location: !row.location ? undefined : JSON.parse(row.location),
metadata: !row.metadata ? undefined : JSON.parse(row.metadata),
time: new Date(row.time),
created_at: new Date(),
chunk_timestamp_start: undefined,
chunk_timestamp_end: undefined,
serie: !row.serie ? undefined : row.serie,
};

let zodData: any;
if (device.chunk_period) {
const chunkTimestamp = _getChunkTimestamp(new Date(row.time), device);
if (chunkTimestamp) {
parsedData.chunk_timestamp_start = chunkTimestamp.startDate;
parsedData.chunk_timestamp_end = chunkTimestamp.endDate;
}
zodData = zDeviceImmutableData.parse(parsedData);
} else {
zodData = zDeviceData.parse(parsedData);
}

// This is to fix json type in sqlite
if (zodData.location){
zodData.location = JSON.stringify(zodData.location);
}

if (zodData.metadata){
zodData.metadata = JSON.stringify(zodData.metadata);
}

return zodData;
}

/**
* Import a device data from CSV file.
*/
Expand All @@ -28,34 +94,18 @@ async function importDeviceData(
fileName: string,
): Promise<string> {
const client = await getDeviceConnection(deviceID, type);
const device = await getDeviceInfo(deviceID);
if (!device) {
throw new Error("Device not found");
}
return new Promise((resolve, reject) => {
const data: any[] = [];
fs.createReadStream(fileName)
.pipe(parse({ columns: true, encoding: "utf8" }))
.on("data", (row) => {
row.id = generateResourceID();

if (!row.location) {
row.location = null;
}

if (!row.metadata) {
row.metadata = null;
}

if (!row.value) {
row.value = null;
}

if (!row.unit) {
row.unit = null;
}

if (!row.serie) {
row.serie = null;
}
const parsedData = _parseData(row, device);
data.push(parsedData);

data.push(row);
if (data.length === 1000) {
_insertData(client, data);
data.length = 0;
Expand Down
2 changes: 1 addition & 1 deletion plugins/sqlite/src/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export async function setupKnex(data: any) {
const filename = data.file;

knexClient = knex({
client: "better-sqlite3",
client: "sqlite3",
connection: { filename },
useNullAsDefault: true,
});
Expand Down

0 comments on commit 962db64

Please sign in to comment.