Skip to content

Commit

Permalink
lnrpc: add filters to forwardhistoryrequest
Browse files Browse the repository at this point in the history
This commit adds incoming and outgoing channel ids filter to forwarding history request to filter events received/forwarded from/to a particular channel
  • Loading branch information
funyug committed Dec 13, 2024
1 parent 6298f76 commit 21b7c97
Show file tree
Hide file tree
Showing 7 changed files with 1,068 additions and 799 deletions.
58 changes: 52 additions & 6 deletions channeldb/forwarding_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
)
Expand Down Expand Up @@ -200,6 +201,16 @@ type ForwardingEventQuery struct {

// NumMaxEvents is the max number of events to return.
NumMaxEvents uint32

// IncomingChanIds is the list of channels to filter HTLCs being
// received from a particular channel.
// If the list is empty, then it is ignored.
IncomingChanIDs fn.Set[uint64]

// OutgoingChanIds is the list of channels to filter HTLCs being
// forwarded to a particular channel.
// If the list is empty, then it is ignored.
OutgoingChanIDs fn.Set[uint64]
}

// ForwardingLogTimeSlice is the response to a forwarding query. It includes
Expand Down Expand Up @@ -264,9 +275,12 @@ func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, e
return nil
}

// If we're not yet past the user defined offset, then
// If no incoming or outgoing channel IDs were provided
// and we're not yet past the user defined offset, then
// we'll continue to seek forward.
if recordsToSkip > 0 {
if recordsToSkip > 0 &&
q.IncomingChanIDs.IsEmpty() &&
q.OutgoingChanIDs.IsEmpty() {
recordsToSkip--
continue
}
Expand All @@ -287,10 +301,42 @@ func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, e
return err
}

event.Timestamp = currentTime
resp.ForwardingEvents = append(resp.ForwardingEvents, event)

recordOffset++
// Check if the incoming channel ID matches the
// filter criteria.
// Either no filtering is applied (IsEmpty), or
// the ID is explicitly included.
incomingMatch := q.IncomingChanIDs.IsEmpty() ||
q.IncomingChanIDs.Contains(
event.IncomingChanID.ToUint64(),
)

// Check if the outgoing channel ID matches the
// filter criteria.
// Either no filtering is applied (IsEmpty), or
// the ID is explicitly included.
outgoingMatch := q.OutgoingChanIDs.IsEmpty() ||
q.OutgoingChanIDs.Contains(
event.OutgoingChanID.ToUint64(),
)

// If both conditions are met, then we'll add
// the event to our return payload.
if incomingMatch && outgoingMatch {
// If we're not yet past the user
// defined offset , then we'll continue
// to seek forward.
if recordsToSkip > 0 {
recordsToSkip--
continue
}

event.Timestamp = currentTime
resp.ForwardingEvents = append(
resp.ForwardingEvents,
event,
)
recordOffset++
}
}
}

Expand Down
129 changes: 129 additions & 0 deletions channeldb/forwarding_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -360,3 +361,131 @@ func TestForwardingLogStoreEvent(t *testing.T) {
}
}
}

func TestForwardingLogQueryIncomingChanIDs(t *testing.T) {
t.Parallel()

// Set up a test database.
db, err := MakeTestDB(t)
require.NoError(t, err, "unable to make test db")

log := ForwardingLog{
db: db,
}

initialTime := time.Unix(1234, 0)
endTime := time.Unix(1234, 0)

// Create 10 random events with varying incoming ChanIDs.
numEvents := 10
events := make([]ForwardingEvent, numEvents)
incomingChanIDs := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(2001),
lnwire.NewShortChanIDFromInt(2002),
lnwire.NewShortChanIDFromInt(2003),
}

for i := 0; i < numEvents; i++ {
events[i] = ForwardingEvent{
Timestamp: endTime,
IncomingChanID: incomingChanIDs[i%len(incomingChanIDs)],
AmtIn: lnwire.MilliSatoshi(rand.Int63()),
AmtOut: lnwire.MilliSatoshi(rand.Int63()),
}
endTime = endTime.Add(time.Minute * 10)
}

// Add events to the database.
require.NoError(t, log.AddForwardingEvents(events),
"unable to add events")

// Query with multiple incoming channel IDs.
eventQuery := ForwardingEventQuery{
StartTime: initialTime,
EndTime: endTime,
IncomingChanIDs: fn.NewSet(incomingChanIDs[0].ToUint64(),
incomingChanIDs[1].ToUint64()),
IndexOffset: 0,
NumMaxEvents: 10,
}
timeSlice, err := log.Query(eventQuery)
require.NoError(t, err, "unable to query for events")

// Verify that only events with the specified incomingChanIDs are
// returned.
expectedEvents := []ForwardingEvent{}
for _, e := range events {
if e.IncomingChanID == incomingChanIDs[0] ||
e.IncomingChanID == incomingChanIDs[1] {

expectedEvents = append(expectedEvents, e)
}
}

require.Equal(t, expectedEvents, timeSlice.ForwardingEvents,
"unexpected events returned")
}

func TestForwardingLogQueryOutgoingChanIDs(t *testing.T) {
t.Parallel()

// Set up a test database.
db, err := MakeTestDB(t)
require.NoError(t, err, "unable to make test db")

log := ForwardingLog{
db: db,
}

initialTime := time.Unix(1234, 0)
endTime := time.Unix(1234, 0)

// Create 10 random events with varying outgoing ChanIDs.
numEvents := 10
events := make([]ForwardingEvent, numEvents)
outgoingChanIDs := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(2001),
lnwire.NewShortChanIDFromInt(2002),
lnwire.NewShortChanIDFromInt(2003),
}

for i := 0; i < numEvents; i++ {
events[i] = ForwardingEvent{
Timestamp: endTime,
OutgoingChanID: outgoingChanIDs[i%len(outgoingChanIDs)],
AmtIn: lnwire.MilliSatoshi(rand.Int63()),
AmtOut: lnwire.MilliSatoshi(rand.Int63()),
}
endTime = endTime.Add(time.Minute * 10)
}

// Add events to the database.
require.NoError(t, log.AddForwardingEvents(events),
"unable to add events")

// Query with multiple outgoing channel IDs.
eventQuery := ForwardingEventQuery{
StartTime: initialTime,
EndTime: endTime,
OutgoingChanIDs: fn.NewSet(outgoingChanIDs[0].ToUint64(),
outgoingChanIDs[1].ToUint64()),
IndexOffset: 0,
NumMaxEvents: 10,
}
timeSlice, err := log.Query(eventQuery)
require.NoError(t, err, "unable to query for events")

// Verify that only events with the specified outgoingChanIDs are
// returned.
expectedEvents := []ForwardingEvent{}
for _, e := range events {
if e.OutgoingChanID == outgoingChanIDs[0] ||
e.OutgoingChanID == outgoingChanIDs[1] {

expectedEvents = append(expectedEvents, e)
}
}

require.Equal(t, expectedEvents, timeSlice.ForwardingEvents,
"unexpected events returned")
}
37 changes: 33 additions & 4 deletions cmd/commands/cmd_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -1470,10 +1470,11 @@ func listPayments(ctx *cli.Context) error {
}

var forwardingHistoryCommand = cli.Command{
Name: "fwdinghistory",
Category: "Payments",
Usage: "Query the history of all forwarded HTLCs.",
ArgsUsage: "start_time [end_time] [index_offset] [max_events]",
Name: "fwdinghistory",
Category: "Payments",
Usage: "Query the history of all forwarded HTLCs.",
ArgsUsage: "start_time [end_time] [index_offset] [max_events] " +
"[incoming_channel_ids] [outgoing_channel_ids]",
Description: `
Query the HTLC switch's internal forwarding log for all completed
payment circuits (HTLCs) over a particular time range (--start_time and
Expand All @@ -1488,6 +1489,9 @@ var forwardingHistoryCommand = cli.Command{
The max number of events returned is 50k. The default number is 100,
callers can use the --max_events param to modify this value.
Incoming and outgoing channel IDs can be provided to further filter
the events. If not provided, all events will be returned.
Finally, callers can skip a series of events using the --index_offset
parameter. Each response will contain the offset index of the last
entry. Using this callers can manually paginate within a time slice.
Expand Down Expand Up @@ -1516,6 +1520,16 @@ var forwardingHistoryCommand = cli.Command{
Usage: "skip the peer alias lookup per forwarding " +
"event in order to improve performance",
},
cli.Int64SliceFlag{
Name: "incoming_chan_ids",
Usage: "the short channel ids of the incoming " +
"channels to filter events by",
},
cli.Int64SliceFlag{
Name: "outgoing_chan_ids",
Usage: "the short channel ids of the outgoing " +
"channels to filter events by",
},
},
Action: actionDecorator(forwardingHistory),
}
Expand Down Expand Up @@ -1596,6 +1610,21 @@ func forwardingHistory(ctx *cli.Context) error {
NumMaxEvents: maxEvents,
PeerAliasLookup: lookupPeerAlias,
}
outChan := ctx.Int64Slice("outgoing_chan_ids")
if len(outChan) != 0 {
req.OutgoingChanIds = make([]uint64, len(outChan))
for i, c := range outChan {
req.OutgoingChanIds[i] = uint64(c)
}
}

inChan := ctx.Int64Slice("incoming_chan_ids")
if len(inChan) != 0 {
req.IncomingChanIds = make([]uint64, len(inChan))
for i, c := range inChan {
req.IncomingChanIds[i] = uint64(c)
}
}
resp, err := client.ForwardingHistory(ctxc, req)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 21b7c97

Please sign in to comment.