Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lnrpc: add incoming/outgoing channel ids filter to forwarding history request #9356

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions channeldb/forwarding_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
)
Expand Down Expand Up @@ -200,6 +201,16 @@ type ForwardingEventQuery struct {

// NumMaxEvents is the max number of events to return.
NumMaxEvents uint32

// IncomingChanIds is the list of channels to filter HTLCs being
// received from a particular channel.
// If the list is empty, then it is ignored.
IncomingChanIDs fn.Set[uint64]

// OutgoingChanIds is the list of channels to filter HTLCs being
// forwarded to a particular channel.
// If the list is empty, then it is ignored.
OutgoingChanIDs fn.Set[uint64]
}

// ForwardingLogTimeSlice is the response to a forwarding query. It includes
Expand Down Expand Up @@ -264,9 +275,12 @@ func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, e
return nil
}

// If we're not yet past the user defined offset, then
// If no incoming or outgoing channel IDs were provided
// and we're not yet past the user defined offset, then
// we'll continue to seek forward.
if recordsToSkip > 0 {
if recordsToSkip > 0 &&
q.IncomingChanIDs.IsEmpty() &&
q.OutgoingChanIDs.IsEmpty() {
recordsToSkip--
continue
}
Expand All @@ -287,10 +301,42 @@ func (f *ForwardingLog) Query(q ForwardingEventQuery) (ForwardingLogTimeSlice, e
return err
}

event.Timestamp = currentTime
resp.ForwardingEvents = append(resp.ForwardingEvents, event)

recordOffset++
// Check if the incoming channel ID matches the
// filter criteria.
// Either no filtering is applied (IsEmpty), or
// the ID is explicitly included.
incomingMatch := q.IncomingChanIDs.IsEmpty() ||
q.IncomingChanIDs.Contains(
event.IncomingChanID.ToUint64(),
)

// Check if the outgoing channel ID matches the
// filter criteria.
// Either no filtering is applied (IsEmpty), or
// the ID is explicitly included.
outgoingMatch := q.OutgoingChanIDs.IsEmpty() ||
q.OutgoingChanIDs.Contains(
event.OutgoingChanID.ToUint64(),
)

// If both conditions are met, then we'll add
// the event to our return payload.
if incomingMatch && outgoingMatch {
// If we're not yet past the user
// defined offset , then we'll continue
// to seek forward.
if recordsToSkip > 0 {
recordsToSkip--
continue
}

event.Timestamp = currentTime
resp.ForwardingEvents = append(
resp.ForwardingEvents,
event,
)
recordOffset++
}
}
}

Expand Down
129 changes: 129 additions & 0 deletions channeldb/forwarding_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -360,3 +361,131 @@ func TestForwardingLogStoreEvent(t *testing.T) {
}
}
}

func TestForwardingLogQueryIncomingChanIDs(t *testing.T) {
t.Parallel()

// Set up a test database.
db, err := MakeTestDB(t)
require.NoError(t, err, "unable to make test db")

log := ForwardingLog{
db: db,
}

initialTime := time.Unix(1234, 0)
endTime := time.Unix(1234, 0)

// Create 10 random events with varying incoming ChanIDs.
numEvents := 10
events := make([]ForwardingEvent, numEvents)
incomingChanIDs := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(2001),
lnwire.NewShortChanIDFromInt(2002),
lnwire.NewShortChanIDFromInt(2003),
}

for i := 0; i < numEvents; i++ {
events[i] = ForwardingEvent{
Timestamp: endTime,
IncomingChanID: incomingChanIDs[i%len(incomingChanIDs)],
AmtIn: lnwire.MilliSatoshi(rand.Int63()),
AmtOut: lnwire.MilliSatoshi(rand.Int63()),
}
endTime = endTime.Add(time.Minute * 10)
}

// Add events to the database.
require.NoError(t, log.AddForwardingEvents(events),
"unable to add events")

// Query with multiple incoming channel IDs.
eventQuery := ForwardingEventQuery{
StartTime: initialTime,
EndTime: endTime,
IncomingChanIDs: fn.NewSet(incomingChanIDs[0].ToUint64(),
incomingChanIDs[1].ToUint64()),
IndexOffset: 0,
NumMaxEvents: 10,
}
timeSlice, err := log.Query(eventQuery)
require.NoError(t, err, "unable to query for events")

// Verify that only events with the specified incomingChanIDs are
// returned.
expectedEvents := []ForwardingEvent{}
for _, e := range events {
if e.IncomingChanID == incomingChanIDs[0] ||
e.IncomingChanID == incomingChanIDs[1] {

expectedEvents = append(expectedEvents, e)
}
}

require.Equal(t, expectedEvents, timeSlice.ForwardingEvents,
"unexpected events returned")
}

func TestForwardingLogQueryOutgoingChanIDs(t *testing.T) {
t.Parallel()

// Set up a test database.
db, err := MakeTestDB(t)
require.NoError(t, err, "unable to make test db")

log := ForwardingLog{
db: db,
}

initialTime := time.Unix(1234, 0)
endTime := time.Unix(1234, 0)

// Create 10 random events with varying outgoing ChanIDs.
numEvents := 10
events := make([]ForwardingEvent, numEvents)
outgoingChanIDs := []lnwire.ShortChannelID{
lnwire.NewShortChanIDFromInt(2001),
lnwire.NewShortChanIDFromInt(2002),
lnwire.NewShortChanIDFromInt(2003),
}

for i := 0; i < numEvents; i++ {
events[i] = ForwardingEvent{
Timestamp: endTime,
OutgoingChanID: outgoingChanIDs[i%len(outgoingChanIDs)],
AmtIn: lnwire.MilliSatoshi(rand.Int63()),
AmtOut: lnwire.MilliSatoshi(rand.Int63()),
}
endTime = endTime.Add(time.Minute * 10)
}

// Add events to the database.
require.NoError(t, log.AddForwardingEvents(events),
"unable to add events")

// Query with multiple outgoing channel IDs.
eventQuery := ForwardingEventQuery{
StartTime: initialTime,
EndTime: endTime,
OutgoingChanIDs: fn.NewSet(outgoingChanIDs[0].ToUint64(),
outgoingChanIDs[1].ToUint64()),
IndexOffset: 0,
NumMaxEvents: 10,
}
timeSlice, err := log.Query(eventQuery)
require.NoError(t, err, "unable to query for events")

// Verify that only events with the specified outgoingChanIDs are
// returned.
expectedEvents := []ForwardingEvent{}
for _, e := range events {
if e.OutgoingChanID == outgoingChanIDs[0] ||
e.OutgoingChanID == outgoingChanIDs[1] {

expectedEvents = append(expectedEvents, e)
}
}

require.Equal(t, expectedEvents, timeSlice.ForwardingEvents,
"unexpected events returned")
}
37 changes: 33 additions & 4 deletions cmd/commands/cmd_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -1470,10 +1470,11 @@ func listPayments(ctx *cli.Context) error {
}

var forwardingHistoryCommand = cli.Command{
Name: "fwdinghistory",
Category: "Payments",
Usage: "Query the history of all forwarded HTLCs.",
ArgsUsage: "start_time [end_time] [index_offset] [max_events]",
Name: "fwdinghistory",
Category: "Payments",
Usage: "Query the history of all forwarded HTLCs.",
ArgsUsage: "start_time [end_time] [index_offset] [max_events] " +
"[incoming_channel_ids] [outgoing_channel_ids]",
Description: `
Query the HTLC switch's internal forwarding log for all completed
payment circuits (HTLCs) over a particular time range (--start_time and
Expand All @@ -1488,6 +1489,9 @@ var forwardingHistoryCommand = cli.Command{
The max number of events returned is 50k. The default number is 100,
callers can use the --max_events param to modify this value.

Incoming and outgoing channel IDs can be provided to further filter
the events. If not provided, all events will be returned.

Finally, callers can skip a series of events using the --index_offset
parameter. Each response will contain the offset index of the last
entry. Using this callers can manually paginate within a time slice.
Expand Down Expand Up @@ -1516,6 +1520,16 @@ var forwardingHistoryCommand = cli.Command{
Usage: "skip the peer alias lookup per forwarding " +
"event in order to improve performance",
},
cli.Int64SliceFlag{
Name: "incoming_chan_ids",
Usage: "the short channel ids of the incoming " +
"channels to filter events by",
},
cli.Int64SliceFlag{
Name: "outgoing_chan_ids",
Usage: "the short channel ids of the outgoing " +
"channels to filter events by",
},
},
Action: actionDecorator(forwardingHistory),
}
Expand Down Expand Up @@ -1596,6 +1610,21 @@ func forwardingHistory(ctx *cli.Context) error {
NumMaxEvents: maxEvents,
PeerAliasLookup: lookupPeerAlias,
}
outChan := ctx.Int64Slice("outgoing_chan_ids")
if len(outChan) != 0 {
req.OutgoingChanIds = make([]uint64, len(outChan))
for i, c := range outChan {
req.OutgoingChanIds[i] = uint64(c)
}
}

inChan := ctx.Int64Slice("incoming_chan_ids")
if len(inChan) != 0 {
req.IncomingChanIds = make([]uint64, len(inChan))
for i, c := range inChan {
req.IncomingChanIds[i] = uint64(c)
}
}
resp, err := client.ForwardingHistory(ctxc, req)
if err != nil {
return err
Expand Down
Loading
Loading