Skip to content

Commit

Permalink
use same logic for allegris and generic flight search
Browse files Browse the repository at this point in the history
  • Loading branch information
its-felix committed Nov 22, 2024
1 parent 60af11e commit bea95c0
Show file tree
Hide file tree
Showing 15 changed files with 641 additions and 344 deletions.
6 changes: 0 additions & 6 deletions go/api/data/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,6 @@ type Aircraft struct {
Configurations common.Set[string] `json:"configurations"`
}

type RouteAndRange struct {
DepartureAirport string `json:"departureAirport"`
ArrivalAirport string `json:"arrivalAirport"`
Range xtime.LocalDateRange `json:"range"`
}

type MinimalS3Client interface {
adapt.S3Getter
adapt.S3Lister
Expand Down
57 changes: 25 additions & 32 deletions go/api/data/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"github.com/explore-flights/monorepo/go/common/xiter"
"iter"
"maps"
"slices"
)

func (h *Handler) QuerySchedules(ctx context.Context, opts ...QueryScheduleOption) (map[common.FlightNumber][]RouteAndRange, error) {
func (h *Handler) QuerySchedules(ctx context.Context, opts ...QueryScheduleOption) (map[common.FlightNumber]*common.FlightSchedule, error) {
var o querySchedulesOptions
if err := o.apply(opts...); err != nil {
return nil, err
Expand All @@ -32,28 +31,24 @@ func (h *Handler) QuerySchedules(ctx context.Context, opts ...QueryScheduleOptio
airlineCount = len(o.airlines)
}

accumulate := func(acc map[common.FlightNumber][]RouteAndRange, fn common.FlightNumber, rr RouteAndRange) {
if existingRRs, ok := acc[fn]; ok {
idx := slices.IndexFunc(existingRRs, func(existingRR RouteAndRange) bool {
return existingRR.DepartureAirport == rr.DepartureAirport && existingRR.ArrivalAirport == rr.ArrivalAirport
})

if idx == -1 {
acc[fn] = append(acc[fn], rr)
accumulate := func(acc map[common.FlightNumber]*common.FlightSchedule, fn common.FlightNumber, fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) {
if existingFs, ok := acc[fn]; ok {
if existingFsv, ok := existingFs.Variant(fsv.Data); ok {
existingFsv.Ranges = existingFsv.Ranges.ExpandAll(fsv.Ranges)
} else {
existingRRs[idx].Range[0] = min(existingRRs[idx].Range[0], rr.Range[0])
existingRRs[idx].Range[1] = max(existingRRs[idx].Range[1], rr.Range[1])
existingFs.Variants = append(existingFs.Variants, fsv.Clone(true))
}
} else {
acc[fn] = append(acc[fn], rr)
acc[fn] = fs.Clone(false)
acc[fn].Variants = append(acc[fn].Variants, fsv.Clone(true))
}
}

wg := concurrent.WorkGroup[common.AirlineIdentifier, map[common.FlightNumber][]RouteAndRange, map[common.FlightNumber][]RouteAndRange]{
wg := concurrent.WorkGroup[common.AirlineIdentifier, map[common.FlightNumber]*common.FlightSchedule, map[common.FlightNumber]*common.FlightSchedule]{
Parallelism: min(uint(airlineCount), 10),
Worker: func(ctx context.Context, airline common.AirlineIdentifier, acc map[common.FlightNumber][]RouteAndRange) (map[common.FlightNumber][]RouteAndRange, error) {
Worker: func(ctx context.Context, airline common.AirlineIdentifier, acc map[common.FlightNumber]*common.FlightSchedule) (map[common.FlightNumber]*common.FlightSchedule, error) {
if acc == nil {
acc = make(map[common.FlightNumber][]RouteAndRange)
acc = make(map[common.FlightNumber]*common.FlightSchedule)
}

return acc, h.flightSchedulesStream(ctx, airline, func(seq iter.Seq2[string, *onceIter[*common.FlightSchedule]]) error {
Expand All @@ -63,47 +58,45 @@ func (h *Handler) QuerySchedules(ctx context.Context, opts ...QueryScheduleOptio
return err
}

if !o.testSchedule(fs) {
fs, ok := o.visitSchedule(fs)
if !ok {
continue
}

fn := fs.Number()
for _, variant := range fs.Variants {
if !o.testVariant(fs, variant) {
for _, fsv := range fs.Variants {
fsv, ok = o.visitVariant(fs, fsv)
if !ok {
continue
}

if span, ok := variant.Ranges.Span(); ok {
accumulate(acc, fn, RouteAndRange{
DepartureAirport: variant.Data.DepartureAirport,
ArrivalAirport: variant.Data.ArrivalAirport,
Range: span,
})
if !fsv.Ranges.Empty() {
accumulate(acc, fn, fs, fsv)
}
}
}

return nil
})
},
Combiner: func(ctx context.Context, a, b map[common.FlightNumber][]RouteAndRange) (map[common.FlightNumber][]RouteAndRange, error) {
Combiner: func(ctx context.Context, a, b map[common.FlightNumber]*common.FlightSchedule) (map[common.FlightNumber]*common.FlightSchedule, error) {
if a == nil {
a = make(map[common.FlightNumber][]RouteAndRange)
a = make(map[common.FlightNumber]*common.FlightSchedule)
}

if b != nil {
for fn, rrs := range b {
for _, rr := range rrs {
accumulate(a, fn, rr)
for fn, fs := range b {
for _, fsv := range fs.Variants {
accumulate(a, fn, fs, fsv)
}
}
}

return a, nil
},
Finisher: func(ctx context.Context, acc map[common.FlightNumber][]RouteAndRange) (map[common.FlightNumber][]RouteAndRange, error) {
Finisher: func(ctx context.Context, acc map[common.FlightNumber]*common.FlightSchedule) (map[common.FlightNumber]*common.FlightSchedule, error) {
if acc == nil {
acc = make(map[common.FlightNumber][]RouteAndRange)
acc = make(map[common.FlightNumber]*common.FlightSchedule)
}

return acc, nil
Expand Down
133 changes: 75 additions & 58 deletions go/api/data/query_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@ package data
import (
"errors"
"github.com/explore-flights/monorepo/go/common"
"github.com/explore-flights/monorepo/go/common/xtime"
"maps"
"time"
)

type QueryScheduleOption func(*querySchedulesOptions) error

type schedulePredicate func(fs *common.FlightSchedule) bool
type variantPredicate func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool
type scheduleVisitor func(fs *common.FlightSchedule) (*common.FlightSchedule, bool)
type variantVisitor func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool)

type querySchedulesOptions struct {
airlines common.Set[common.AirlineIdentifier]
schedulePredicates []schedulePredicate
variantPredicates []variantPredicate
airlines common.Set[common.AirlineIdentifier]
scheduleVisitors []scheduleVisitor
variantVisitors []variantVisitor
}

func (o *querySchedulesOptions) apply(opts ...QueryScheduleOption) error {
Expand All @@ -36,79 +37,83 @@ func (o *querySchedulesOptions) and(opts []querySchedulesOptions) error {
err = errors.Join(err, errors.New("cannot use airline filter in AND clauses"))
}

o.schedulePredicates = append(o.schedulePredicates, child.schedulePredicates...)
o.variantPredicates = append(o.variantPredicates, child.variantPredicates...)
o.scheduleVisitors = append(o.scheduleVisitors, child.scheduleVisitors...)
o.variantVisitors = append(o.variantVisitors, child.variantVisitors...)
}

return err
}

func (o *querySchedulesOptions) or(opts []querySchedulesOptions) error {
var err error
fsPredicates := make([]schedulePredicate, 0)
fsvPredicates := make([]variantPredicate, 0)
fsVisitors := make([]scheduleVisitor, 0)
fsvVisitors := make([]variantVisitor, 0)

for _, child := range opts {
if len(child.schedulePredicates) > 0 && len(child.variantPredicates) > 0 {
if len(child.scheduleVisitors) > 0 && len(child.variantVisitors) > 0 {
err = errors.Join(err, errors.New("cannot use both schedule and variant filters in OR clauses"))
}

maps.Copy(o.airlines, child.airlines)

if len(child.schedulePredicates) > 0 {
fsPredicates = append(fsPredicates, child.testSchedule)
if len(child.scheduleVisitors) > 0 {
fsVisitors = append(fsVisitors, child.visitSchedule)
}

if len(child.variantPredicates) > 0 {
fsvPredicates = append(fsvPredicates, child.testVariant)
if len(child.variantVisitors) > 0 {
fsvVisitors = append(fsvVisitors, child.visitVariant)
}
}

if len(fsPredicates) > 0 {
o.schedulePredicates = append(o.schedulePredicates, func(fs *common.FlightSchedule) bool {
for _, p := range fsPredicates {
if p(fs) {
return true
if len(fsVisitors) > 0 {
o.scheduleVisitors = append(o.scheduleVisitors, func(fs *common.FlightSchedule) (*common.FlightSchedule, bool) {
for _, p := range fsVisitors {
if modFs, ok := p(fs); ok {
return modFs, true
}
}

return false
return fs, false
})
}

if len(fsvPredicates) > 0 {
o.variantPredicates = append(o.variantPredicates, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
for _, p := range fsvPredicates {
if p(fs, fsv) {
return true
if len(fsvVisitors) > 0 {
o.variantVisitors = append(o.variantVisitors, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
for _, p := range fsvVisitors {
if modFsv, ok := p(fs, fsv); ok {
return modFsv, true
}
}

return false
return fsv, false
})
}

return err
}

func (o *querySchedulesOptions) testSchedule(fs *common.FlightSchedule) bool {
for _, p := range o.schedulePredicates {
if !p(fs) {
return false
func (o *querySchedulesOptions) visitSchedule(fs *common.FlightSchedule) (*common.FlightSchedule, bool) {
for _, p := range o.scheduleVisitors {
if modFs, ok := p(fs); !ok {
return fs, false
} else {
fs = modFs
}
}

return true
return fs, true
}

func (o *querySchedulesOptions) testVariant(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
for _, p := range o.variantPredicates {
if !p(fs, fsv) {
return false
func (o *querySchedulesOptions) visitVariant(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
for _, p := range o.variantVisitors {
if modFsv, ok := p(fs, fsv); !ok {
return fsv, false
} else {
fsv = modFsv
}
}

return true
return fsv, true
}

func WithAirlines(airlines ...common.AirlineIdentifier) QueryScheduleOption {
Expand All @@ -126,8 +131,8 @@ func WithFlightNumber(fn common.FlightNumber) QueryScheduleOption {
o.airlines = make(common.Set[common.AirlineIdentifier])
o.airlines[fn.Airline] = struct{}{}

o.schedulePredicates = append(o.schedulePredicates, func(fs *common.FlightSchedule) bool {
return fs.Number() == fn
o.scheduleVisitors = append(o.scheduleVisitors, func(fs *common.FlightSchedule) (*common.FlightSchedule, bool) {
return fs, fs.Number() == fn
})

return nil
Expand All @@ -136,8 +141,8 @@ func WithFlightNumber(fn common.FlightNumber) QueryScheduleOption {

func WithServiceType(serviceType string) QueryScheduleOption {
return func(o *querySchedulesOptions) error {
o.variantPredicates = append(o.variantPredicates, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
return fsv.Data.ServiceType == serviceType
o.variantVisitors = append(o.variantVisitors, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
return fsv, fsv.Data.ServiceType == serviceType
})

return nil
Expand All @@ -146,8 +151,8 @@ func WithServiceType(serviceType string) QueryScheduleOption {

func WithAircraftType(aircraftType string) QueryScheduleOption {
return func(o *querySchedulesOptions) error {
o.variantPredicates = append(o.variantPredicates, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
return fsv.Data.AircraftType == aircraftType
o.variantVisitors = append(o.variantVisitors, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
return fsv, fsv.Data.AircraftType == aircraftType
})

return nil
Expand All @@ -156,8 +161,8 @@ func WithAircraftType(aircraftType string) QueryScheduleOption {

func WithAircraftConfigurationVersion(aircraftConfigurationVersion string) QueryScheduleOption {
return func(o *querySchedulesOptions) error {
o.variantPredicates = append(o.variantPredicates, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
return fsv.Data.AircraftConfigurationVersion == aircraftConfigurationVersion
o.variantVisitors = append(o.variantVisitors, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
return fsv, fsv.Data.AircraftConfigurationVersion == aircraftConfigurationVersion
})

return nil
Expand All @@ -166,8 +171,8 @@ func WithAircraftConfigurationVersion(aircraftConfigurationVersion string) Query

func WithDepartureAirport(airport string) QueryScheduleOption {
return func(o *querySchedulesOptions) error {
o.variantPredicates = append(o.variantPredicates, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
return fsv.Data.DepartureAirport == airport
o.variantVisitors = append(o.variantVisitors, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
return fsv, fsv.Data.DepartureAirport == airport
})

return nil
Expand All @@ -176,8 +181,8 @@ func WithDepartureAirport(airport string) QueryScheduleOption {

func WithArrivalAirport(airport string) QueryScheduleOption {
return func(o *querySchedulesOptions) error {
o.variantPredicates = append(o.variantPredicates, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
return fsv.Data.ArrivalAirport == airport
o.variantVisitors = append(o.variantVisitors, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
return fsv, fsv.Data.ArrivalAirport == airport
})

return nil
Expand All @@ -186,8 +191,8 @@ func WithArrivalAirport(airport string) QueryScheduleOption {

func WithIgnoreCodeShares() QueryScheduleOption {
return func(o *querySchedulesOptions) error {
o.variantPredicates = append(o.variantPredicates, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
return fs.Number() == fsv.Data.OperatedAs
o.variantVisitors = append(o.variantVisitors, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
return fsv, fs.Number() == fsv.Data.OperatedAs
})

return nil
Expand All @@ -196,12 +201,18 @@ func WithIgnoreCodeShares() QueryScheduleOption {

func WithMinDepartureTime(minDepartureTime time.Time) QueryScheduleOption {
return func(o *querySchedulesOptions) error {
o.variantPredicates = append(o.variantPredicates, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
if span, ok := fsv.Ranges.Span(); ok {
return fsv.DepartureTime(span[1]).After(minDepartureTime)
o.variantVisitors = append(o.variantVisitors, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
subRanges := fsv.Ranges.RemoveAll(func(d xtime.LocalDate) bool {
return fsv.DepartureTime(d).Before(minDepartureTime)
})

if subRanges.Empty() {
return fsv, false
}

return false
fsv = fsv.Clone(false)
fsv.Ranges = subRanges
return fsv, true
})

return nil
Expand All @@ -210,12 +221,18 @@ func WithMinDepartureTime(minDepartureTime time.Time) QueryScheduleOption {

func WithMaxDepartureTime(maxDepartureTime time.Time) QueryScheduleOption {
return func(o *querySchedulesOptions) error {
o.variantPredicates = append(o.variantPredicates, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) bool {
if span, ok := fsv.Ranges.Span(); ok {
return fsv.DepartureTime(span[0]).Before(maxDepartureTime)
o.variantVisitors = append(o.variantVisitors, func(fs *common.FlightSchedule, fsv *common.FlightScheduleVariant) (*common.FlightScheduleVariant, bool) {
subRanges := fsv.Ranges.RemoveAll(func(d xtime.LocalDate) bool {
return fsv.DepartureTime(d).After(maxDepartureTime)
})

if subRanges.Empty() {
return fsv, false
}

return false
fsv = fsv.Clone(false)
fsv.Ranges = subRanges
return fsv, true
})

return nil
Expand Down
2 changes: 1 addition & 1 deletion go/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func main() {
e.GET("/data/aircraft.json", web.NewAircraftEndpoint(dataHandler))
e.GET("/data/flight/:fn", web.NewFlightNumberEndpoint(dataHandler))
e.GET("/data/flight/:fn/seatmap/:departure/:arrival/:date/:aircraft", web.NewSeatMapEndpoint(dataHandler))
e.GET("/data/:airline/schedule/:aircraftType/:aircraftConfigurationVersion/v2", web.NewFlightSchedulesByConfigurationEndpoint(dataHandler))
e.GET("/data/:airline/schedule/:aircraftType/:aircraftConfigurationVersion/v3", web.NewFlightSchedulesByConfigurationEndpoint(dataHandler))
e.GET("/data/:fn/:departureDate/:departureAirport/feed.rss", web.NewFlightUpdateFeedEndpoint(dataHandler, "application/rss+xml", (*feeds.Feed).WriteRss))
e.GET("/data/:fn/:departureDate/:departureAirport/feed.atom", web.NewFlightUpdateFeedEndpoint(dataHandler, "application/atom+xml", (*feeds.Feed).WriteAtom))
e.GET("/data/allegris/feed.rss", web.NewAllegrisUpdateFeedEndpoint(s3c, bucket, ".rss"))
Expand Down
Loading

0 comments on commit bea95c0

Please sign in to comment.