Skip to content

Commit

Permalink
connectd: listen on ports for which we should spawn a proxy.
Browse files Browse the repository at this point in the history
If the port is set, we spawn it (lightning_websocketd) on any
connection to that port.  That means websocketd is a per-peer daemon,
but it means every other daemon uses the connection normally (it's
just actually talking to websocketd instead of the client directly).

Signed-off-by: Rusty Russell <[email protected]>
  • Loading branch information
rustyrussell authored and cdecker committed Oct 22, 2021
1 parent b013b3a commit f78184c
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 27 deletions.
238 changes: 213 additions & 25 deletions connectd/connectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@
#include <connectd/tor.h>
#include <connectd/tor_autoservice.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sodium.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <wire/wire_sync.h>

/*~ We are passed two file descriptors when exec'ed from `lightningd`: the
Expand Down Expand Up @@ -127,6 +131,12 @@ struct daemon {

/* Our features, as lightningd told us */
struct feature_set *our_features;

/* Subdaemon to proxy websocket requests. */
char *websocket_helper;

/* If non-zero, port to listen for websocket connections. */
u16 websocket_port;
};

/* Peers we're trying to reach: we iterate through addrs until we succeed
Expand Down Expand Up @@ -529,40 +539,55 @@ static void conn_timeout(struct io_conn *conn)
io_close(conn);
}

/*~ When we get a connection in we set up its network address then call
* handshake.c to set up the crypto state. */
static struct io_plan *connection_in(struct io_conn *conn, struct daemon *daemon)
/*~ So, where are you from? */
static bool get_remote_address(struct io_conn *conn,
struct wireaddr_internal *addr)
{
struct wireaddr_internal addr;
struct sockaddr_storage s = {};
socklen_t len = sizeof(s);

/* The cast here is a weird Berkeley sockets API feature... */
if (getpeername(io_conn_fd(conn), (struct sockaddr *)&s, &len) != 0) {
status_debug("Failed to get peername for incoming conn: %s",
strerror(errno));
return io_close(conn);
return false;
}

if (s.ss_family == AF_INET6) {
struct sockaddr_in6 *s6 = (void *)&s;
addr.itype = ADDR_INTERNAL_WIREADDR;
wireaddr_from_ipv6(&addr.u.wireaddr,
addr->itype = ADDR_INTERNAL_WIREADDR;
wireaddr_from_ipv6(&addr->u.wireaddr,
&s6->sin6_addr, ntohs(s6->sin6_port));
} else if (s.ss_family == AF_INET) {
struct sockaddr_in *s4 = (void *)&s;
addr.itype = ADDR_INTERNAL_WIREADDR;
wireaddr_from_ipv4(&addr.u.wireaddr,
addr->itype = ADDR_INTERNAL_WIREADDR;
wireaddr_from_ipv4(&addr->u.wireaddr,
&s4->sin_addr, ntohs(s4->sin_port));
} else if (s.ss_family == AF_UNIX) {
struct sockaddr_un *sun = (void *)&s;
addr.itype = ADDR_INTERNAL_SOCKNAME;
memcpy(addr.u.sockname, sun->sun_path, sizeof(sun->sun_path));
addr->itype = ADDR_INTERNAL_SOCKNAME;
memcpy(addr->u.sockname, sun->sun_path, sizeof(sun->sun_path));
} else {
status_broken("Unknown socket type %i for incoming conn",
s.ss_family);
return io_close(conn);
return false;
}
return true;
}

/*~ As so common in C, we need to bundle two args into a callback, so we
* allocate a temporary structure to hold them: */
struct conn_in {
struct wireaddr_internal addr;
struct daemon *daemon;
};

/*~ Once we've got a connection in, we set it up here (whether it's via the
* websocket proxy, or direct). */
static struct io_plan *conn_in(struct io_conn *conn,
struct conn_in *conn_in_arg)
{
struct daemon *daemon = conn_in_arg->daemon;

/* If they don't complete handshake in reasonable time, hang up */
notleak(new_reltimer(&daemon->timers, conn,
Expand All @@ -574,10 +599,122 @@ static struct io_plan *connection_in(struct io_conn *conn, struct daemon *daemon
* Note, again, the notleak() to avoid our simplistic leak detection
* code from thinking `conn` (which we don't keep a pointer to) is
* leaked */
return responder_handshake(notleak(conn), &daemon->mykey, &addr,
return responder_handshake(notleak(conn), &daemon->mykey,
&conn_in_arg->addr,
handshake_in_success, daemon);
}

/*~ When we get a direct connection in we set up its network address
* then call handshake.c to set up the crypto state. */
static struct io_plan *connection_in(struct io_conn *conn,
struct daemon *daemon)
{
struct conn_in conn_in_arg;

if (!get_remote_address(conn, &conn_in_arg.addr))
return io_close(conn);

conn_in_arg.daemon = daemon;
return conn_in(conn, &conn_in_arg);
}

/*~ <hello>I speak web socket</hello>.
*
* Actually that's dumb, websocket (aka rfc6455) looks nothing like that. */
static struct io_plan *websocket_connection_in(struct io_conn *conn,
struct daemon *daemon)
{
int childmsg[2], execfail[2];
pid_t childpid;
int err;
struct conn_in conn_in_arg;

if (!get_remote_address(conn, &conn_in_arg.addr))
return io_close(conn);

status_debug("Websocket connection in from %s",
type_to_string(tmpctx, struct wireaddr_internal,
&conn_in_arg.addr));

if (socketpair(AF_LOCAL, SOCK_STREAM, 0, childmsg) != 0)
goto fail;

if (pipe(execfail) != 0)
goto close_msgfd_fail;

if (fcntl(execfail[1], F_SETFD, fcntl(execfail[1], F_GETFD)
| FD_CLOEXEC) < 0)
goto close_execfail_fail;

childpid = fork();
if (childpid < 0)
goto close_execfail_fail;

if (childpid == 0) {
size_t max;
close(childmsg[0]);
close(execfail[0]);

/* Attach remote socket to stdin. */
if (dup2(io_conn_fd(conn), STDIN_FILENO) == -1)
goto child_errno_fail;

/* Attach our socket to stdout. */
if (dup2(childmsg[1], STDOUT_FILENO) == -1)
goto child_errno_fail;

/* Make (fairly!) sure all other fds are closed. */
max = sysconf(_SC_OPEN_MAX);
for (size_t i = STDERR_FILENO + 1; i < max; i++)
close(i);

/* Tell websocket helper what we read so far. */
execlp(daemon->websocket_helper, daemon->websocket_helper,
NULL);

child_errno_fail:
err = errno;
/* Gcc's warn-unused-result fail. */
if (write(execfail[1], &err, sizeof(err))) {
;
}
exit(127);
}

close(childmsg[1]);
close(execfail[1]);

/* Child will close this without writing on successful exec. */
if (read(execfail[0], &err, sizeof(err)) == sizeof(err)) {
close(execfail[0]);
waitpid(childpid, NULL, 0);
status_broken("Exec of helper %s failed: %s",
daemon->websocket_helper, strerror(err));
errno = err;
return io_close(conn);
}

close(execfail[0]);

/* New connection actually talks to proxy process. */
conn_in_arg.daemon = daemon;
io_new_conn(tal_parent(conn), childmsg[0], conn_in, &conn_in_arg);

/* Abandon original (doesn't close since child has dup'd fd) */
return io_close(conn);

close_execfail_fail:
close_noerr(execfail[0]);
close_noerr(execfail[1]);
close_msgfd_fail:
close_noerr(childmsg[0]);
close_noerr(childmsg[1]);
fail:
status_broken("Preparation of helper failed: %s",
strerror(errno));
return io_close(conn);
}

/*~ These are the mirror functions for the connecting-out case. */
static struct io_plan *handshake_out_success(struct io_conn *conn,
const struct pubkey *key,
Expand Down Expand Up @@ -906,16 +1043,22 @@ struct listen_fd {
* covers IPv4 too. Normally we'd consider failing to listen on a
* port to be fatal, so we note this when setting up addresses. */
bool mayfail;
/* Callback to use for the listening: either connection_in, or for
* our much-derided WebSocket ability, websocket_connection_in! */
struct io_plan *(*in_cb)(struct io_conn *conn, struct daemon *daemon);
};

static void add_listen_fd(struct daemon *daemon, int fd, bool mayfail)
static void add_listen_fd(struct daemon *daemon, int fd, bool mayfail,
struct io_plan *(*in_cb)(struct io_conn *,
struct daemon *))
{
/*~ utils.h contains a convenience macro tal_arr_expand which
* reallocates a tal_arr to make it one longer, then returns a pointer
* to the (new) last element. */
struct listen_fd l;
l.fd = fd;
l.mayfail = mayfail;
l.in_cb = in_cb;
tal_arr_expand(&daemon->listen_fds, l);
}

Expand Down Expand Up @@ -970,11 +1113,18 @@ static int make_listen_fd(int domain, void *addr, socklen_t len, bool mayfail)
/* Return true if it created socket successfully. */
static bool handle_wireaddr_listen(struct daemon *daemon,
const struct wireaddr *wireaddr,
bool mayfail)
bool mayfail,
bool websocket)
{
int fd;
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
struct io_plan *(*in_cb)(struct io_conn *, struct daemon *);

if (websocket)
in_cb = websocket_connection_in;
else
in_cb = connection_in;

/* Note the use of a switch() over enum here, even though it must be
* IPv4 or IPv6 here; that will catch future changes. */
Expand All @@ -984,19 +1134,21 @@ static bool handle_wireaddr_listen(struct daemon *daemon,
/* We might fail if IPv6 bound to port first */
fd = make_listen_fd(AF_INET, &addr, sizeof(addr), mayfail);
if (fd >= 0) {
status_debug("Created IPv4 listener on port %u",
status_debug("Created IPv4 %slistener on port %u",
websocket ? "websocket ": "",
wireaddr->port);
add_listen_fd(daemon, fd, mayfail);
add_listen_fd(daemon, fd, mayfail, in_cb);
return true;
}
return false;
case ADDR_TYPE_IPV6:
wireaddr_to_ipv6(wireaddr, &addr6);
fd = make_listen_fd(AF_INET6, &addr6, sizeof(addr6), mayfail);
if (fd >= 0) {
status_debug("Created IPv6 listener on port %u",
status_debug("Created IPv6 %slistener on port %u",
websocket ? "websocket ": "",
wireaddr->port);
add_listen_fd(daemon, fd, mayfail);
add_listen_fd(daemon, fd, mayfail, in_cb);
return true;
}
return false;
Expand Down Expand Up @@ -1122,7 +1274,7 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
false);
status_debug("Created socket listener on file %s",
addrun.sun_path);
add_listen_fd(daemon, fd, false);
add_listen_fd(daemon, fd, false, connection_in);
/* We don't announce socket names, though we allow
* them to lazily specify --addr=/socket. */
add_binding(&binding, &wa);
Expand All @@ -1147,7 +1299,7 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
sizeof(wa.u.wireaddr.addr));

ipv6_ok = handle_wireaddr_listen(daemon, &wa.u.wireaddr,
true);
true, false);
if (ipv6_ok) {
add_binding(&binding, &wa);
if (announce
Expand All @@ -1163,7 +1315,7 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
sizeof(wa.u.wireaddr.addr));
/* OK if this fails, as long as one succeeds! */
if (handle_wireaddr_listen(daemon, &wa.u.wireaddr,
ipv6_ok)) {
ipv6_ok, false)) {
add_binding(&binding, &wa);
if (announce
&& public_address(daemon, &wa.u.wireaddr))
Expand All @@ -1174,7 +1326,8 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
}
/* This is a vanilla wireaddr as per BOLT #7 */
case ADDR_INTERNAL_WIREADDR:
handle_wireaddr_listen(daemon, &wa.u.wireaddr, false);
handle_wireaddr_listen(daemon, &wa.u.wireaddr,
false, false);
add_binding(&binding, &wa);
if (announce && public_address(daemon, &wa.u.wireaddr))
add_announcable(announcable, &wa.u.wireaddr);
Expand All @@ -1188,6 +1341,38 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
proposed_wireaddr[i].itype);
}

/* If we want websockets to match IPv4/v6, set it up now. */
if (daemon->websocket_port) {
bool announced_some = false;
struct wireaddr addr;

for (size_t i = 0; i < tal_count(binding); i++) {
/* Ignore UNIX sockets */
if (binding[i].itype != ADDR_INTERNAL_WIREADDR)
continue;

/* Override with websocket port */
addr = binding[i].u.wireaddr;
addr.port = daemon->websocket_port;
handle_wireaddr_listen(daemon, &addr, false, true);
announced_some = true;
/* FIXME: We don't report these bindings to
* lightningd, so they don't appear in
* getinfo. */
}


/* We add the websocket port to the announcement if it
* applies to any */
if (announced_some) {
wireaddr_from_websocket(&addr, daemon->websocket_port);
add_announcable(announcable, &addr);
}
}

/* FIXME: Websocket over Tor (difficult for autotor, since we need
* to use the same onion addr!) */

/* Now we have bindings, set up any Tor auto addresses: we will point
* it at the first bound IPv4 or IPv6 address we have. */
for (size_t i = 0; i < tal_count(proposed_wireaddr); i++) {
Expand Down Expand Up @@ -1294,7 +1479,9 @@ static struct io_plan *connect_init(struct io_conn *conn,
&daemon->dev_allow_localhost, &daemon->use_dns,
&tor_password,
&daemon->use_v3_autotor,
&daemon->timeout_secs)) {
&daemon->timeout_secs,
&daemon->websocket_helper,
&daemon->websocket_port)) {
/* This is a helper which prints the type expected and the actual
* message, then exits (it should never be called!). */
master_badmsg(WIRE_CONNECTD_INIT, msg);
Expand Down Expand Up @@ -1367,7 +1554,8 @@ static struct io_plan *connect_activate(struct io_conn *conn,
}
notleak(io_new_listener(daemon,
daemon->listen_fds[i].fd,
connection_in, daemon));
daemon->listen_fds[i].in_cb,
daemon));
}
}
/* Free, with NULL assignment just as an extra sanity check. */
Expand Down
2 changes: 2 additions & 0 deletions connectd/connectd_wire.csv
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ msgdata,connectd_init,use_dns,bool,
msgdata,connectd_init,tor_password,wirestring,
msgdata,connectd_init,use_v3_autotor,bool,
msgdata,connectd_init,timeout_secs,u32,
msgdata,connectd_init,websocket_helper,wirestring,
msgdata,connectd_init,websocket_port,u16,

# Connectd->master, here are the addresses I bound, can announce.
msgtype,connectd_init_reply,2100
Expand Down
Loading

0 comments on commit f78184c

Please sign in to comment.