Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query WAL/Buffer + Timestamps #55

Open
wants to merge 61 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
461f6fa
query memory buffer
Feb 10, 2025
06aedab
query mem buffer
Feb 10, 2025
1053555
query mem buffer
Feb 10, 2025
caccb65
query mem buffer
Feb 10, 2025
58f24a2
query mem buffer
Feb 10, 2025
142b1e1
query mem buffer
Feb 10, 2025
f491dd5
query mem buffer
Feb 10, 2025
46c994e
query mem buffer
Feb 10, 2025
80f9745
query mem buffer
Feb 10, 2025
f8eb918
query mem buffer
Feb 10, 2025
1c712e0
Add test (#53)
lmangani Feb 10, 2025
e53ce54
query mem buffer
Feb 11, 2025
4ea67f6
fix ui
Feb 11, 2025
fad6316
fix ui
Feb 11, 2025
563aa4d
fix ui
Feb 11, 2025
e8c5a2c
fix ui
Feb 11, 2025
13dbbac
fix ui
Feb 11, 2025
f30991e
fix ui
Feb 11, 2025
90a3cb1
fix ui
Feb 11, 2025
6d3ceb9
fix ui
Feb 11, 2025
b1c39dc
fix ui
Feb 11, 2025
dc1e507
fix ui
Feb 11, 2025
949d299
fix ui
Feb 11, 2025
b2bef81
fix ui
Feb 11, 2025
b6b4b32
fix timestamping
Feb 11, 2025
20594a8
fix timestamping
Feb 11, 2025
fdd025d
fix timestamping
Feb 11, 2025
43bc807
fix union queries
Feb 11, 2025
7415791
Merge branch 'master' into buffer
lmangani Feb 11, 2025
9030cc3
Update loopback_test.sh
lmangani Feb 11, 2025
64c5c4e
fix timestamping
Feb 11, 2025
45f5592
fix timestamping
Feb 11, 2025
3bd35e4
fix timestamping
Feb 11, 2025
a223398
fix timestamping
Feb 11, 2025
10df828
fix timestamping
Feb 11, 2025
0461b4c
fix timestamping
Feb 11, 2025
fa014ce
fix timestamping
Feb 11, 2025
dbf5e9b
fix timestamping
Feb 11, 2025
258974e
fix timestamping
Feb 11, 2025
b853367
fix timestamping
Feb 11, 2025
b1d1a85
fix timestamping
Feb 11, 2025
92e10f8
fix timestamping
Feb 11, 2025
49b1f6f
restore buffer version
lmangani Feb 11, 2025
a25d234
fix timestamping
Feb 11, 2025
31a111d
fix timestamping
Feb 11, 2025
43b6cfd
fix timestamping
Feb 11, 2025
be38be7
fix timestamping
Feb 11, 2025
5e22726
fix timestamping
Feb 11, 2025
54138a5
fix timestamping
Feb 11, 2025
b5cf06f
fix timestamping
Feb 11, 2025
5fb1b39
fix timestamping
Feb 11, 2025
3b5e6dc
restore
lmangani Feb 11, 2025
dbea76b
fix timestamping
Feb 11, 2025
32d9631
fix timestamping
Feb 11, 2025
9faf5cd
fix timestamping
Feb 11, 2025
d579bb5
fix WHERE handler
Feb 12, 2025
35c6d26
fix WHERE handler
Feb 14, 2025
fda4b27
fix WHERE handler
Feb 14, 2025
07896b0
fix WHERE handler
Feb 14, 2025
2f8dd0e
fix WHERE handler
Feb 14, 2025
02291d9
fix WHERE handler
Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/loopback_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ while true; do
echo "Total sent: $counter"
fi

# Wait for 1 second before sending the next request
# Wait for x before sending the next request
read -t 0.5

done
238 changes: 164 additions & 74 deletions hepop.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class ParquetBufferManager {
tags: { type: 'UTF8' }, // JSON string of tags
// Dynamic fields will be added based on data
});

// Add metadata write queue
this.metadataQueue = new Map(); // type -> Promise
}

async initialize() {
Expand Down Expand Up @@ -113,7 +116,27 @@ class ParquetBufferManager {

async getFilePath(type, timestamp) {
const typeMetadata = await this.getTypeMetadata(type);
const date = new Date(timestamp);

// Handle nanosecond timestamps
let date;
if (typeof timestamp === 'number') {
// Keep nanosecond precision by using floor division for date parts
const ms = Math.floor(timestamp / 1000000); // Get milliseconds
date = new Date(ms);
} else if (typeof timestamp === 'string') {
// Parse string timestamp
date = new Date(timestamp);
} else if (timestamp instanceof Date) {
date = timestamp;
} else {
throw new Error('Invalid timestamp format');
}

if (isNaN(date.getTime())) {
throw new Error(`Invalid date from timestamp: ${timestamp}`);
}

// Use date for directory structure only
const datePath = date.toISOString().split('T')[0];
const hour = date.getHours().toString().padStart(2, '0');
const minute = Math.floor(date.getMinutes() / 10) * 10;
Expand Down Expand Up @@ -225,21 +248,33 @@ class ParquetBufferManager {
}

async writeTypeMetadata(type, metadata) {
const metadataPath = this.getTypeMetadataPath(type);
await fs.promises.mkdir(path.dirname(metadataPath), { recursive: true });

const tempPath = `${metadataPath}.tmp`;
try {
await fs.promises.writeFile(tempPath, JSON.stringify(metadata, null, 2));
await fs.promises.rename(tempPath, metadataPath);
} catch (error) {
// Get or create queue for this type
if (!this.metadataQueue.has(type)) {
this.metadataQueue.set(type, Promise.resolve());
}

// Add write to queue
const queue = this.metadataQueue.get(type);
const writePromise = queue.then(async () => {
const metadataPath = this.getTypeMetadataPath(type);
await fs.promises.mkdir(path.dirname(metadataPath), { recursive: true });

const tempPath = `${metadataPath}.tmp`;
try {
await fs.promises.unlink(tempPath);
} catch (e) {
// Ignore cleanup errors
await fs.promises.writeFile(tempPath, JSON.stringify(metadata, null, 2));
await fs.promises.rename(tempPath, metadataPath);
} catch (error) {
try {
await fs.promises.unlink(tempPath);
} catch (e) {
// Ignore cleanup errors
}
throw error;
}
throw error;
}
});

this.metadataQueue.set(type, writePromise);
return writePromise;
}

async updateMetadata(type, filePath, sizeBytes, rowCount, timestamps) {
Expand Down Expand Up @@ -374,50 +409,78 @@ class ParquetBufferManager {
}

async addLineProtocolBulk(measurement, rows) {
// Use measurement directly as type (like HEP types)
const type = measurement;

if (!this.buffers.has(type)) {
// Create new schema for this measurement including its fields
const schema = new parquet.ParquetSchema({
// Create schema from first row to ensure correct types
const firstRow = rows[0];
const schemaFields = {
timestamp: { type: 'TIMESTAMP_MILLIS' },
tags: { type: 'UTF8' },
...Object.entries(rows[0]).reduce((acc, [key, value]) => {
if (key !== 'timestamp' && key !== 'tags') {
acc[key] = {
type: typeof value === 'number' ? 'DOUBLE' :
typeof value === 'boolean' ? 'BOOLEAN' : 'UTF8'
};
}
return acc;
}, {})
});
tags: { type: 'UTF8' }
};

// Add fields with proper types
Object.entries(firstRow)
.filter(([key]) => !['timestamp', 'tags'].includes(key))
.forEach(([key, value]) => {
schemaFields[key] = {
type: typeof value === 'number' ? 'DOUBLE' :
typeof value === 'boolean' ? 'BOOLEAN' : 'UTF8',
optional: true
};
});

// Create new buffer with schema
this.buffers.set(type, {
rows: [],
schema,
isLineProtocol: true // Mark as Line Protocol data
schema: new parquet.ParquetSchema(schemaFields),
isLineProtocol: true
});
}

const buffer = this.buffers.get(type);
buffer.rows.push(...rows);

// Ensure all rows have all fields with proper types
const allFields = new Set();
rows.forEach(row => {
Object.keys(row).forEach(key => {
if (!['timestamp', 'tags'].includes(key)) {
allFields.add(key);
}
});
});

// Add rows with normalized fields
buffer.rows.push(...rows.map(row => {
const normalized = {
timestamp: row.timestamp,
tags: row.tags
};

// Add all fields, using null for missing ones
allFields.forEach(field => {
normalized[field] = row[field] ?? null;
});

return normalized;
}));

if (buffer.rows.length >= this.bufferSize) {
await this.flush(type); // Use the same flush method as HEP
await this.flush(type);
}
}
}

class CompactionManager {
constructor(bufferManager) {
constructor(bufferManager, debug = false) {
this.bufferManager = bufferManager;
this.compactionIntervals = {
'10m': 10 * 60 * 1000,
'1h': 60 * 60 * 1000,
'24h': 24 * 60 * 60 * 1000
};
this.compactionLock = new Map();
this.debug = debug;
}

async initialize() {
Expand Down Expand Up @@ -493,7 +556,9 @@ class CompactionManager {

async checkAndCompact() {
const typeDirs = await this.getTypeDirectories();
console.log('Found types for compaction:', typeDirs);
if (this.debug || typeDirs.length > 0) {
console.log('Found types for compaction:', typeDirs);
}

for (const type of typeDirs) {
if (this.compactionLock.get(type)) {
Expand All @@ -506,28 +571,17 @@ class CompactionManager {
let metadata = await this.bufferManager.getTypeMetadata(type);

if (!metadata.files || !metadata.files.length) {
console.log(`No files found in metadata for type ${type}`);
if (this.debug) console.log(`No files found in metadata for type ${type}`);
continue;
}

// Verify and clean metadata before compaction
metadata = await this.verifyAndCleanMetadata(type, metadata);

if (!metadata.files.length) {
console.log(`No valid files remain after metadata cleanup for type ${type}`);
continue;
if (metadata.files.length > 0) {
console.log(`Type ${type} has ${metadata.files.length} files to consider for compaction`);
await this.compactTimeRange(type, metadata.files, '10m', '1h');
await this.compactTimeRange(type, metadata.files, '1h', '24h');
}

console.log(`Type ${type} has ${metadata.files.length} files to consider for compaction`);
console.log('Files:', metadata.files.map(f => ({
path: f.path,
type: f.type,
min_time: new Date(f.min_time / 1000000).toISOString(),
max_time: new Date(f.max_time / 1000000).toISOString()
})));

await this.compactTimeRange(type, metadata.files, '10m', '1h');
await this.compactTimeRange(type, metadata.files, '1h', '24h');
} catch (error) {
console.error(`Error during compaction for type ${type}:`, error);
} finally {
Expand Down Expand Up @@ -961,24 +1015,29 @@ class CompactionManager {
class HEPServer {
constructor(config = {}) {
this.debug = config.debug || false;
this.queryClient = null; // Add queryClient property
this.queryClient = null;
this.buffer = null;
this.compaction = null;
}

async initialize() {
try {
// Initialize buffer manager
this.buffer = new ParquetBufferManager();
await this.buffer.initialize();

this.compaction = new CompactionManager(this.buffer);

// Initialize compaction manager with debug flag
this.compaction = new CompactionManager(this.buffer, true); // Always show compaction logs
await this.compaction.initialize();

// Initialize query client
this.queryClient = new QueryClient(this.buffer.baseDir);
// Initialize query client with buffer manager
this.queryClient = new QueryClient(this.buffer.baseDir, this.buffer);
await this.queryClient.initialize();


// Start servers
await this.startServers();
} catch (error) {
console.error('Failed to initialize HEPServer:', error);
console.error('Failed to initialize HEP server:', error);
throw error;
}
}
Expand Down Expand Up @@ -1021,7 +1080,17 @@ class HEPServer {
async fetch(req) {
const url = new URL(req.url);

if (url.pathname === '/query') {
if (url.pathname === '/') {
try {
const html = await Bun.file('./index.html').text();
return new Response(html, {
headers: { 'Content-Type': 'text/html' }
});
} catch (error) {
console.error('Error serving index.html:', error);
return new Response('Error loading interface', { status: 500 });
}
} else if (url.pathname === '/query') {
try {
let query;

Expand Down Expand Up @@ -1061,43 +1130,59 @@ class HEPServer {
try {
const body = await req.text();
const lines = body.split('\n').filter(line => line.trim());

const config = {
addTimestamp: true,
typeMappings: [],
defaultTypeMapping: 'float'
};

// Process lines in bulk
const bulkData = new Map(); // measurement -> rows
const bulkData = new Map();

for (const line of lines) {
const parsed = parse(line, config);
const measurement = parsed.measurement;

if (!bulkData.has(measurement)) {
bulkData.set(measurement, []);
try {
const parsed = parse(line, config);
const measurement = parsed.measurement;

if (!bulkData.has(measurement)) {
bulkData.set(measurement, []);
}

// Use let for timestamp since we might need to reassign
let timestamp = new Date(parsed.timestamp);
if (isNaN(timestamp.getTime())) {
console.warn(`Invalid timestamp in line: ${line}, using current time`);
timestamp = new Date();
}

bulkData.get(measurement).push({
timestamp,
tags: JSON.stringify(parsed.tags || {}),
// Convert undefined values to null
...Object.fromEntries(
Object.entries(parsed.fields || {})
.map(([k, v]) => [k, v ?? null])
)
});
} catch (error) {
console.warn(`Error parsing line: ${line}`, error);
continue; // Skip invalid lines
}

bulkData.get(measurement).push({
timestamp: new Date(parsed.timestamp),
tags: JSON.stringify(parsed.tags),
...parsed.fields
});
}

// Bulk insert by measurement
for (const [measurement, rows] of bulkData) {
// console.log(`Writing ${rows.length} rows to measurement ${measurement}`);
await self.buffer.addLineProtocolBulk(measurement, rows);
if (rows.length > 0) {
await self.buffer.addLineProtocolBulk(measurement, rows);
}
}

return new Response(null, { status: 201 });
} catch (error) {
console.error('Write error:', error);
return new Response(error.message, { status: 400 });
}

}

return new Response('Not found', { status: 404 });
Expand Down Expand Up @@ -1132,6 +1217,11 @@ class HEPServer {
async shutdown() {
console.log('Shutting down HEP server...');

// Stop compaction first
if (this.compaction) {
await this.compaction.close();
}

// Stop TCP server
if (this.tcpServer) {
try {
Expand Down
Loading