-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpopkiller.js
103 lines (78 loc) · 2.33 KB
/
popkiller.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
'use strict';
const config = require('./config');
const SMTPServer = require('smtp-server').SMTPServer;
const simpleParser = require('mailparser').simpleParser;
const amqp = require('amqplib');
const BJSON = require('buffer-json');
/**
 * SMTP server that accepts unauthenticated mail for routed addresses and
 * forwards each parsed message to the AMQP queue configured for its recipient.
 */
const server = new SMTPServer({
  logger: false,
  banner: 'POPKiller smtp server',
  // Advertised maximum message size (10 MiB). smtp-server only advertises this
  // value; enforcement happens in onData via stream.sizeExceeded.
  size: 10 * 1024 * 1024,
  // No auth
  authOptional: true,

  /** Reject every authentication attempt: this server is auth-less. */
  onAuth (auth, session, callback) {
    return callback(new Error('No auth here'), null);
  },

  /** Accept a recipient only if a queue is routed for its address. */
  onRcptTo (address, session, callback) {
    if (!getQueueForAddress(address.address)) {
      return callback(new Error('Not allowed'));
    }
    callback();
  },

  /**
   * Parse the incoming message and publish one copy per accepted recipient.
   * Any failure is reported to the SMTP client through `callback` instead of
   * escaping as an unhandled promise rejection.
   */
  async onData (stream, session, callback) {
    try {
      // Consume the stream first so it is always drained on the happy path.
      const parsed = await simpleParser(stream);

      if (stream.sizeExceeded) {
        const tooBig = new Error('Message exceeds fixed maximum message size');
        tooBig.responseCode = 552;
        return callback(tooBig);
      }

      // Every recipient was individually accepted by onRcptTo, so each one
      // must receive the message (previously only the first one did).
      for (const recipient of session.envelope.rcptTo) {
        const toAddress = recipient.address;
        const queue = getQueueForAddress(toAddress);
        await broker.channel.assertQueue(queue, { durable: true });
        parsed.tags = parseAddress(toAddress).tags;
        await broker.channel.sendToQueue(queue, Buffer.from(BJSON.stringify(parsed)));
      }

      return callback(null, 'Message queued as ' + Date.now());
    } catch (error) {
      console.error('Error queueing message: %s', error);
      // Drain whatever is left so the SMTP session can finish the DATA phase.
      stream.resume();
      return callback(error);
    }
  }
});
/**
 * Shared AMQP handles. Both fields are null until start() has connected;
 * the object itself is never reassigned, only its properties are.
 */
const broker = {
  connection: null,
  channel: null
};
// Log server-level errors to stdout: a fixed marker line first, then the
// error object itself.
server.on('error', function (failure) {
  console.log('Error occurred');
  console.log(failure);
});
/**
 * Resolve the queue name that mail for `address` should be published to.
 *
 * Lookup order in `config.routes`: the canonical address (tags stripped)
 * first, then the bare domain. A route whose value is falsy (e.g. '') means
 * "use the key itself as the queue name".
 *
 * @param {string} address - Recipient address, e.g. 'user+tag@example.com'.
 * @returns {?string} The queue name, or null when no route matches.
 */
function getQueueForAddress (address) {
  const { canonical, domain } = parseAddress(address);
  const routes = config.routes;

  for (const key of [canonical, domain]) {
    // Own-property check: a plain `routes[key]` lookup would also match
    // inherited members such as 'toString' or 'constructor', letting a
    // recipient at one of those "domains" pass as routed.
    const isOwn = Object.prototype.hasOwnProperty.call(routes, key);
    if (isOwn && routes[key] !== undefined) {
      return routes[key] || key;
    }
  }

  return null;
}
/**
 * Split an e-mail address into its routing components.
 *
 * The local part is split on '+': the first piece is the user, the remaining
 * pieces are tags ('bob+a+b@x.org' -> user 'bob', tags ['a', 'b']).
 *
 * @param {string} address - Address of the form 'local@domain'.
 * @returns {{user: string, tags: string[], domain: (string|undefined), canonical: string}}
 *   `canonical` is `user@domain`. When `address` contains no '@', `domain`
 *   is undefined and `canonical` ends in '@undefined'.
 */
function parseAddress (address) {
  // Split on the LAST '@': a quoted local part may itself contain '@', and
  // the domain is always what follows the final one.
  const at = address.lastIndexOf('@');
  const localPart = at === -1 ? address : address.slice(0, at);
  const domain = at === -1 ? undefined : address.slice(at + 1);

  const [user, ...tags] = localPart.split('+');
  const canonical = user + '@' + domain;

  return { user, tags, domain, canonical };
}
/**
 * Connect to the AMQP broker, open a channel, then start the SMTP listener.
 *
 * `broker.connection` / `broker.channel` are only populated once both steps
 * have succeeded, so a failed start never leaves a half-initialised broker.
 *
 * @returns {Promise<*>} Resolves with whatever `server.listen()` returns.
 * @throws Rethrows the broker connection or channel error after logging it.
 */
async function start () {
  let connection = null;
  try {
    connection = await amqp.connect(config.broker_url);
    const channel = await connection.createChannel();
    broker.connection = connection;
    broker.channel = channel;
  } catch (error) {
    console.error('Error connecting to broker: %s', error);
    // The connection opened but the channel did not: close it so the socket
    // is not leaked.
    if (connection) {
      try {
        await connection.close();
      } catch (closeError) {
        console.error('Error closing broker connection: %s', closeError);
      }
    }
    throw error;
  }
  return server.listen(config.server.port, config.server.host);
}
/**
 * Close the AMQP channel and connection, then stop the SMTP server.
 *
 * Safe to call when start() never succeeded or stop() already ran: missing
 * handles are skipped. The connection close is attempted even if closing the
 * channel fails; the first error still propagates to the caller.
 *
 * @param {Function} [callback] - Passed through to `server.close()`.
 * @returns {Promise<*>} Resolves with whatever `server.close()` returns.
 */
async function stop (callback) {
  const { channel, connection } = broker;
  // Drop the handles up front so a repeated stop() does not close them twice.
  broker.channel = null;
  broker.connection = null;

  try {
    if (channel) {
      await channel.close();
    }
  } finally {
    if (connection) {
      await connection.close();
    }
  }
  return server.close(callback);
}
exports.start = start;
exports.stop = stop;