-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.js
153 lines (131 loc) · 3.77 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
const express = require('express')
const cors = require('cors')
const { Client } = require('cassandra-driver')
const fs = require('fs').promises
const cron = require('node-cron')
const { exec } = require('child_process')
const { promisify } = require('util')
const execAsync = promisify(exec)
const app = express()
app.use(express.json())
app.use(cors())
// Configuration Cassandra sans keyspace initial
const client = new Client({
contactPoints: ['127.0.0.1'],
localDataCenter: 'datacenter1',
})
// Création du schéma
async function setupDatabase() {
try {
// Création du keyspace
await client.execute(`
CREATE KEYSPACE IF NOT EXISTS etl_data
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
`)
// Connexion au keyspace
await client.execute('USE etl_data')
// Suppression de la table si elle existe
await client.execute('DROP TABLE IF EXISTS articles')
// Création de la table
await client.execute(`
CREATE TABLE IF NOT EXISTS articles (
id text PRIMARY KEY,
title text,
url text,
domain text,
word_count int,
title_length int,
category text,
is_tech boolean,
processed_at timestamp
)
`)
console.log('Base de données initialisée avec succès')
} catch (error) {
console.error(
"Erreur lors de l'initialisation de la base de données:",
error
)
throw error
}
}
// Chargement des données
async function loadData() {
try {
const data = JSON.parse(await fs.readFile('transformed_data.json', 'utf8'))
const query =
'INSERT INTO articles (id, title, url, domain, word_count, title_length, category, is_tech, processed_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)'
for (const item of data) {
await client.execute(
query,
[
item.id,
item.title,
item.url,
item.domain,
item.word_count,
item.title_length,
item.category,
item.is_tech,
new Date(item.processed_at),
],
{ prepare: true }
)
}
console.log('Données chargées avec succès')
} catch (error) {
console.error('Erreur lors du chargement des données:', error)
throw error
}
}
// Routes API
app.get('/articles', async (req, res) => {
try {
const result = await client.execute('SELECT * FROM articles')
res.json(result.rows)
} catch (error) {
console.error('Erreur lors de la récupération des articles:', error)
res.status(500).json({ error: 'Erreur serveur' })
}
})
// Pipeline ETL complet
async function runETLPipeline() {
try {
console.log('Démarrage du pipeline ETL...')
// Extraction
console.log('1. Extraction...')
await execAsync('npm run scrape')
// Transformation
console.log('2. Transformation...')
await execAsync('python transform.py')
// Chargement
console.log('3. Chargement...')
await loadData()
console.log('Pipeline ETL terminé avec succès!')
} catch (error) {
console.error('Erreur dans le pipeline ETL:', error)
}
}
// Initialisation avec CRON
async function init() {
try {
await client.connect()
console.log('Connecté à Cassandra')
await setupDatabase()
// Premier run du pipeline
await runETLPipeline()
// Configuration du CRON pour exécuter le pipeline toutes les heures
cron.schedule('1 * * * *', () => {
console.log('Exécution programmée du pipeline ETL')
runETLPipeline()
})
app.listen(3000, () => {
console.log('Serveur démarré sur http://localhost:3000')
console.log('Le pipeline ETL s\'exécutera toutes les minutes')
})
} catch (error) {
console.error("Erreur lors de l'initialisation:", error)
process.exit(1)
}
}
init()