forked from hasura/graphql-engine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conftest.py
1031 lines (914 loc) · 40 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import collections
import http.server
import json
import inspect
import os
import pytest
import socket
import sqlalchemy
import subprocess
import sys
import threading
import time
from typing import Any, Optional
import urllib.parse
import uuid
import auth_webhook_server
from context import ActionsWebhookServer, EvtsWebhookServer, GQLWsClient, GraphQLWSClient, HGECtx, HGECtxGQLServer, HGECtxWebhook, PytestConf
import fixtures.hge
import fixtures.jwt
import fixtures.postgres
import fixtures.tls
import jwk_server
import ports
import webhook
import PortToHaskell
def pytest_addoption(parser):
parser.addoption(
"--hge-bin",
metavar="HGE_BIN",
required=False,
help="Hasura GraphQL Engine binary executable",
)
parser.addoption(
"--hge-urls",
metavar="HGE_URLS",
help="csv list of urls for graphql-engine",
required=False,
nargs='+'
)
parser.addoption(
"--pg-urls", metavar="PG_URLS",
help="csv list of urls for connecting to Postgres directly",
required=False,
nargs='+'
)
parser.addoption('--tls-ca-cert', help='The CA certificate used for helper services', required=False)
parser.addoption('--tls-ca-key', help='The CA key used for helper services', required=False)
parser.addoption(
"--test-hge-scale-url",
metavar="<url>",
required=False,
help="Run testcases for horizontal scaling"
)
parser.addoption(
"--test-startup-db-calls",
action="store_true",
default=False,
required=False,
help="Run testcases for startup database calls"
)
parser.addoption(
"--accept",
action="store_true",
default=False,
required=False,
help="Accept any failing test cases from YAML files as correct, and write the new files out to disk."
)
parser.addoption(
"--redis-url",
metavar="REDIS_URL",
help="redis url for cache server",
default=False
)
parser.addoption(
"--backend",
help="run integration tests using a particular backend",
default="postgres"
)
parser.addoption(
"--pro-tests",
action="store_true",
default=False,
help="Flag to specify if the pro tests are to be run"
)
parser.addoption(
"--port-to-haskell",
action="store_true",
default=False,
required=False,
help="Rather than running tests, generate .hs modules into the api-tests suite"
)
#By default,
#1) Set test grouping to by class (--dist=loadscope)
#2) Set default parallelism to one
def pytest_cmdline_preparse(config, args):
worker = os.environ.get('PYTEST_XDIST_WORKER')
if 'xdist' in sys.modules and not worker: # pytest-xdist plugin
num = 1
args[:] = ['--dist=loadscope', f'-n{num}'] + args
def pytest_configure(config):
# Pytest has removed the global pytest.config
# As a solution we are going to store it in PytestConf.config
PytestConf.config = config
if is_help_option_present(config):
return
if is_master(config):
assert not config.getoption('--exitfirst'), 'The "--exitfirst"/"-x" option does not work with xdist.\nSee: https://github.com/pytest-dev/pytest-xdist/issues/54'
if not (config.getoption('--hge-bin') or config.getoption('--hge-urls')):
print("either --hge-bin or --hge-urls should be specified")
if config.getoption('--hge-bin') and config.getoption('--hge-urls'):
print("only one of --hge-bin or --hge-urls should be specified")
if not config.getoption('--pg-urls'):
print("pg-urls should be specified")
config.hge_url_list = config.getoption('--hge-urls')
config.pg_url_list = config.getoption('--pg-urls')
if not config.getoption('--hge-bin') and config.getoption('-n', default=None):
xdist_threads = config.getoption('-n')
assert config.getoption('--hge-bin') or xdist_threads <= len(config.hge_url_list), "Not enough hge_urls specified, Required " + str(xdist_threads) + ", got " + str(len(config.hge_url_list))
assert xdist_threads <= len(config.pg_url_list), "Not enough pg_urls specified, Required " + str(xdist_threads) + ", got " + str(len(config.pg_url_list))
@pytest.hookimpl(optionalhook=True)
def pytest_configure_node(node):
if is_help_option_present(node.config):
return
if not node.config.getoption('--hge-bin'):
node.workerinput["hge-url"] = node.config.hge_url_list.pop()
node.workerinput["pg-url"] = node.config.pg_url_list.pop()
@pytest.fixture(scope='class', autouse=True)
def current_backend(request: pytest.FixtureRequest) -> str:
return request.config.getoption('--backend') # type: ignore
def run_on_current_backend(request: pytest.FixtureRequest, current_backend: str):
# Currently, we default all tests to run on Postgres with or without a --backend flag.
# As our test suite develops, we may consider running backend-agnostic tests on all
# backends, unless a specific `--backend` flag is passed.
desired_backends = set(name for marker in request.node.iter_markers('backend') for name in marker.args) or set(['postgres'])
return current_backend in desired_backends
def per_backend_tests_fixture(request: pytest.FixtureRequest, current_backend: str):
"""
This fixture ignores backend-specific tests unless the relevant --backend flag has been passed.
"""
if not run_on_current_backend(request, current_backend):
desired_backends = set(name for marker in request.node.iter_markers('backend') for name in marker.args)
pytest.skip('Skipping test. This test can run on ' + ', '.join(desired_backends) + '.')
@pytest.fixture(scope='class', autouse=True)
def per_backend_test_class(request: pytest.FixtureRequest, current_backend: str):
return per_backend_tests_fixture(request, current_backend)
@pytest.fixture(scope='function', autouse=True)
def per_backend_test_function(request: pytest.FixtureRequest, current_backend: str):
return per_backend_tests_fixture(request, current_backend)
@pytest.fixture(scope='session')
def owner_engine(request: pytest.FixtureRequest, hge_bin: str):
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
if not hge_bin:
return sqlalchemy.engine.create_engine(request.config.workerinput["pg-url"]) # type: ignore
return fixtures.postgres.owner_engine(request)
@pytest.fixture(scope='session')
def runner_engine(owner_engine: sqlalchemy.engine.Engine):
return fixtures.postgres.runner_engine(owner_engine)
@pytest.fixture(scope='class')
def metadata_schema_url(
request: pytest.FixtureRequest,
owner_engine: sqlalchemy.engine.Engine,
runner_engine: sqlalchemy.engine.Engine,
hge_bin: Optional[str],
):
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
if not hge_bin:
return request.config.workerinput["pg-url"] # type: ignore
return fixtures.postgres.metadata_schema_url(request, owner_engine, runner_engine)
@pytest.fixture(scope='class')
def source_backend(
request: pytest.FixtureRequest,
owner_engine: sqlalchemy.engine.Engine,
runner_engine: sqlalchemy.engine.Engine,
hge_ctx_fixture: HGECtx,
hge_bin: Optional[str],
):
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
if not hge_bin:
yield None
return
yield from fixtures.postgres.source_backend(request, owner_engine, runner_engine, hge_ctx_fixture)
@pytest.fixture(scope='class')
def add_source(
request: pytest.FixtureRequest,
owner_engine: sqlalchemy.engine.Engine,
runner_engine: sqlalchemy.engine.Engine,
hge_ctx_fixture: HGECtx,
hge_bin: Optional[str],
):
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
if not hge_bin:
def ignoring_errors(f):
def ignoring_errors_impl(*args, **kwargs):
try:
f(*args, **kwargs)
except:
pass
return ignoring_errors_impl
def impl(name: str, read_only: bool = False, customization: Any = None) -> fixtures.postgres.Backend:
if read_only:
raise Exception('Cannot add a read-only source.')
if name == 'pg1':
env_var = 'HASURA_GRAPHQL_PG_SOURCE_URL_1'
elif name == 'pg2' or name == 'postgres':
env_var = 'HASURA_GRAPHQL_PG_SOURCE_URL_2'
else:
raise Exception(f'Cannot add source {name}.')
hge_ctx_fixture.v1metadataq({
'type': 'pg_add_source',
'args': {
'name': name,
'configuration': {
'connection_info': {
'database_url': {
'from_env': env_var,
},
},
},
'customization': customization,
},
})
request.addfinalizer(ignoring_errors(lambda: hge_ctx_fixture.v1metadataq({
'type': 'pg_drop_source',
'args': {
'name': name,
},
})))
engine = sqlalchemy.create_engine(os.environ[env_var])
return fixtures.postgres.Backend(name, engine)
return impl
def impl(name: str, read_only: bool = False, customization: Any = None) -> fixtures.postgres.Backend:
backend = fixtures.postgres.create_schema(request, owner_engine, runner_engine, f'source_{name}', read_only)
fixtures.postgres.add_source(hge_ctx_fixture, backend, name, customization)
request.addfinalizer(lambda: fixtures.postgres.drop_source(hge_ctx_fixture, name))
return backend
return impl
@pytest.fixture(scope='class')
def postgis(owner_engine: sqlalchemy.engine.Engine, source_backend: fixtures.postgres.Backend):
return fixtures.postgres.postgis(owner_engine, source_backend)
@pytest.fixture(scope='session')
def tls_ca_configuration(request: pytest.FixtureRequest, tmp_path_factory: pytest.TempPathFactory) -> Optional[fixtures.tls.TLSCAConfiguration]:
cert_file: Optional[str] = request.config.getoption('--tls-ca-cert') # type: ignore
key_file: Optional[str] = request.config.getoption('--tls-ca-key') # type: ignore
if cert_file and key_file:
tmp_path = tmp_path_factory.mktemp('tls')
return fixtures.tls.TLSCAConfiguration(cert_file, key_file, tmp_path)
else:
return None
# TODO: remove once parallelization work is completed
@pytest.fixture(scope='class', autouse=True)
def hge_skip(request: pytest.FixtureRequest, hge_server: Optional[Any], hge_fixture_env: dict[str, str]):
# Let `hge_server` manage this stuff.
if hge_server:
return
# Ensure that the correct environment variables have been set for the given test.
hge_marker_env: dict[str, str] = {marker.args[0]: marker.args[1] for marker in request.node.iter_markers('hge_env')}
hge_env = {**hge_marker_env, **hge_fixture_env}
incorrect_env = {name: value for name, value in hge_env.items() if os.getenv(name) != value}
if len(incorrect_env) > 0:
pytest.skip(
'This test expects the following environment variables: '
+ ', '.join([f'{name!r} = {value!r} (not {os.getenv(name)!r})' for name, value in incorrect_env.items()]))
# Make sure that if there's a webhook, the test expects that there's a webhook
if 'HASURA_GRAPHQL_AUTH_HOOK' in os.environ:
if 'HASURA_GRAPHQL_AUTH_HOOK' not in hge_fixture_env:
pytest.skip('HGE expects a running webhook, but this test does not provide one.')
if os.environ['HASURA_GRAPHQL_AUTH_HOOK'] != hge_fixture_env['HASURA_GRAPHQL_AUTH_HOOK']:
pytest.skip(f'HGE expects a running webhook at {os.environ["HASURA_GRAPHQL_AUTH_HOOK"]}, but this test provides one at {hge_fixture_env["HASURA_GRAPHQL_AUTH_HOOK"]}.')
@pytest.fixture(scope='session')
def hge_bin(request: pytest.FixtureRequest) -> Optional[str]:
return request.config.getoption('--hge-bin') # type: ignore
@pytest.fixture(scope='class')
def hge_port(worker_id: str) -> int:
return fixtures.hge.hge_port(worker_id)
@pytest.fixture(scope='class')
def hge_url(request: pytest.FixtureRequest, hge_bin: Optional[str], hge_port: int) -> str:
if hge_bin:
return f'http://localhost:{hge_port}'
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
else:
return request.config.workerinput['hge-url'] # type: ignore
# A hack to inject environment variables from other fixtures into HGE.
# All of this is because we cannot cleanly express dependencies between
# fixtures of the form "this loads before that, IF that is loaded".
#
# This is marked as "early" so the `fixture-order` plugin ensures that it is
# loaded _before_ `hge_server`. Similarly, any fixture using it must be at
# the same scope level and also marked as "early", to ensure that it is
# mutated before `hge_server` uses the data.
#
# In short, we use `early` to ensure that writes happen before reads.
@pytest.fixture(scope='class')
@pytest.mark.early
def hge_fixture_env() -> dict[str, str]:
return {}
@pytest.fixture(scope='class')
def hge_key(
request: pytest.FixtureRequest,
hge_bin: Optional[str],
) -> Optional[str]:
marker = request.node.get_closest_marker('admin_secret')
if hge_bin:
# If the test requests an admin secret, generate one.
return str(uuid.uuid4()) if marker else None
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
else:
# If the environment variable is set, use it.
# This will be used in the event that we start the server outside the test harness.
# We skip the test if it explicitly requires that we have no admin secret.
super_marker = request.node.get_closest_marker('requires_an_admin_secret')
anti_marker = request.node.get_closest_marker('no_admin_secret')
env_key = os.environ.get('HASURA_GRAPHQL_ADMIN_SECRET')
if super_marker and not env_key:
pytest.skip('This test requires that the admin secret is set.')
if anti_marker and env_key:
pytest.skip('This test requires that the admin secret is not set.')
return env_key
@pytest.fixture(scope='class')
def hge_server(
request: pytest.FixtureRequest,
hge_bin: Optional[str],
hge_port: int,
hge_url: str,
hge_key: Optional[str],
hge_fixture_env: dict[str, str],
metadata_schema_url: str,
) -> Optional[subprocess.Popen[bytes]]:
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
if not hge_bin:
return None
return fixtures.hge.hge_server(request, hge_bin, hge_port, hge_url, hge_key, hge_fixture_env, metadata_schema_url)
@pytest.fixture(scope='class')
def enabled_apis(request: pytest.FixtureRequest, hge_bin: Optional[str]) -> Optional[set[str]]:
if hge_bin:
hge_marker_env: dict[str, str] = {marker.args[0]: marker.args[1] for marker in request.node.iter_markers('hge_env') if marker.args[1] is not None}
enabled_apis_str = hge_marker_env.get('HASURA_GRAPHQL_ENABLED_APIS')
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
else:
enabled_apis_str = os.environ.get('HASURA_GRAPHQL_ENABLED_APIS')
if not enabled_apis_str:
return None
return set(enabled_apis_str.split(','))
@pytest.fixture(scope='class')
def hge_ctx_fixture(
request: pytest.FixtureRequest,
hge_url: str,
hge_bin: Optional[str],
metadata_schema_url: str,
hge_key: Optional[str],
enabled_apis: Optional[set[str]],
):
# This madness allows us to figure out whether there is a webhook running.
# We need this information because we dynamically decide how we run queries according to the authentication method.
# This is probably terrible, but refactoring that logic would require rewriting every test.
webhook: Optional[HGECtxWebhook] = None
if webhook_server.__name__ in request.fixturenames:
webhook = request.getfixturevalue(webhook_server.__name__)
elif 'query_echo_webhook' in request.fixturenames: # in test_webhook_request_context.py
webhook = HGECtxWebhook(tls_trust=None)
hge_ctx = HGECtx(
hge_url=hge_url,
metadata_schema_url=metadata_schema_url,
hge_key=hge_key,
webhook=webhook,
enabled_apis=enabled_apis,
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
clear_dbs=not hge_bin,
config=request.config,
)
yield hge_ctx
hge_ctx.teardown()
time.sleep(1) # TODO why do we sleep here?
# tie everything together
@pytest.fixture(scope='class')
def hge_ctx(
hge_ctx_fixture: HGECtx,
hge_server,
hge_url,
source_backend,
):
return hge_ctx_fixture
@pytest.fixture(scope='class')
@pytest.mark.early
def evts_webhook(hge_fixture_env: dict[str, str]):
server = EvtsWebhookServer(extract_server_address_from('EVENT_WEBHOOK_HANDLER'))
thread = threading.Thread(target=server.serve_forever)
thread.start()
print(f'{evts_webhook.__name__} server started on {server.url}')
hge_fixture_env['EVENT_WEBHOOK_HANDLER'] = server.url
yield server
server.shutdown()
server.server_close()
thread.join()
@pytest.fixture(scope='class')
@pytest.mark.early
def actions_fixture(hge_url: str, hge_key: Optional[str], hge_fixture_env: dict[str, str]):
# Start actions' webhook server
server = ActionsWebhookServer(hge_url, hge_key, server_address=extract_server_address_from('ACTION_WEBHOOK_HANDLER'))
thread = threading.Thread(target=server.serve_forever)
thread.start()
print(f'{actions_fixture.__name__} server started on {server.url}')
hge_fixture_env['ACTION_WEBHOOK_HANDLER'] = server.url
yield server
server.shutdown()
server.server_close()
thread.join()
use_action_fixtures = pytest.mark.usefixtures(
'actions_fixture',
'per_class_db_schema_for_mutation_tests',
'per_method_db_data_for_mutation_tests'
)
@pytest.fixture(scope='class')
@pytest.mark.early
def auth_hook(hge_fixture_env: dict[str, str]):
server = auth_webhook_server.create_server(extract_server_address_from('HASURA_GRAPHQL_AUTH_HOOK'))
thread = threading.Thread(target = server.serve_forever)
thread.start()
print(f'{auth_hook.__name__} server started on {server.url}')
hge_fixture_env['HASURA_GRAPHQL_AUTH_HOOK'] = server.url + '/auth'
ports.wait_for_port(server.server_port)
yield server
auth_webhook_server.stop_server(server)
@pytest.fixture(scope='class')
@pytest.mark.early
def webhook_server(
request: pytest.FixtureRequest,
hge_bin: Optional[str],
hge_fixture_env: dict[str,str],
tls_ca_configuration: Optional[fixtures.tls.TLSCAConfiguration],
):
server_address = extract_server_address_from('HASURA_GRAPHQL_AUTH_HOOK')
scheme = None
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
if not hge_bin:
scheme = str(urllib.parse.urlparse(os.getenv('HASURA_GRAPHQL_AUTH_HOOK')).scheme)
if tls_ca_configuration:
if scheme is not None and scheme != 'https':
pytest.skip(f'Cannot run the remote schema server with TLS; HGE is configured to talk to it over "{scheme}".')
server = http.server.HTTPServer(server_address, webhook.Handler)
use_tls = request.node.get_closest_marker('no_tls_webhook_server') is None
if use_tls:
insecure = request.node.get_closest_marker('tls_insecure_certificate') is not None
tls_trust = fixtures.tls.TLSTrust.INSECURE if insecure else fixtures.tls.TLSTrust.SECURE
tls_ca_configuration.configure(server, tls_trust)
else:
tls_trust = None
else:
if scheme is not None and scheme != 'http':
pytest.skip(f'Cannot run the remote schema server without TLS; HGE is configured to talk to it over "{scheme}".')
if request.node.get_closest_marker('tls_webhook_server') is not None:
pytest.skip('Only running this test with TLS enabled; skipping the version with TLS disabled.')
server = http.server.HTTPServer(server_address, webhook.Handler)
tls_trust = None
thread = threading.Thread(target=server.serve_forever)
thread.start()
request.addfinalizer(server.shutdown)
scheme = 'https' if tls_trust else 'http'
# We must use 'localhost' and not `server.server_address[0]`
# because when using TLS, we need a domain name, not an IP address.
host = 'localhost'
port = server.server_address[1]
hge_fixture_env['HASURA_GRAPHQL_AUTH_HOOK'] = f'{scheme}://{host}:{port}/'
ports.wait_for_port(port)
return HGECtxWebhook(tls_trust=tls_trust)
@pytest.fixture(scope='class')
def pro_tests_fixtures(hge_ctx):
if not hge_ctx.pro_tests:
pytest.skip('These tests are meant to be run with --pro-tests set')
@pytest.fixture(scope='class')
@pytest.mark.early
def scheduled_triggers_evts_webhook(hge_fixture_env: dict[str, str]):
server = EvtsWebhookServer(extract_server_address_from('SCHEDULED_TRIGGERS_WEBHOOK_DOMAIN'))
thread = threading.Thread(target=server.serve_forever)
thread.start()
print(f'{scheduled_triggers_evts_webhook.__name__} server started on {server.url}')
hge_fixture_env['SCHEDULED_TRIGGERS_WEBHOOK_DOMAIN'] = server.url
yield server
server.shutdown()
server.server_close()
thread.join()
# This will use TLS on CI, and won't when run locally. Unfortunately,
# parameterization doesn't really work (see test_webhook.py and test_tests.py
# for details), so we shall avoid it in favor of just testing what we can.
@pytest.fixture(scope='class')
@pytest.mark.early
def gql_server(
request: pytest.FixtureRequest,
hge_bin: Optional[str],
hge_url: str,
hge_fixture_env: dict[str,str],
tls_ca_configuration: Optional[fixtures.tls.TLSCAConfiguration],
):
server_address = extract_server_address_from('REMOTE_SCHEMAS_WEBHOOK_DOMAIN')
scheme = None
# TODO: remove once parallelization work is completed
# `hge_bin` will no longer be optional
if not hge_bin:
scheme = str(urllib.parse.urlparse(os.getenv('REMOTE_SCHEMAS_WEBHOOK_DOMAIN')).scheme)
if socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect_ex(server_address) == 0:
# The server is already running. This might be the case if we're running the server upgrade/downgrade tests.
# In this case, skip starting it and rely on the external service.
url = f'http://{server_address[0]}:{server_address[1]}'
hge_fixture_env['REMOTE_SCHEMAS_WEBHOOK_DOMAIN'] = url
# Create an object with a field named "url", set to `url`.
return collections.namedtuple('ExternalGraphQLServer', ['url'])(url)
if tls_ca_configuration:
if scheme is not None and scheme != 'https':
pytest.skip(f'Cannot run the remote schema server with TLS; HGE is configured to talk to it over "{scheme}".')
else:
if scheme is not None and scheme != 'http':
pytest.skip(f'Cannot run the remote schema server without TLS; HGE is configured to talk to it over "{scheme}".')
hge_urls: list[str] = request.config.getoption('--hge-urls') or [hge_url] # type: ignore
server = HGECtxGQLServer(
server_address=server_address,
tls_ca_configuration=tls_ca_configuration,
hge_urls=hge_urls,
)
server.start_server()
print(f'{gql_server.__name__} server started on {server.url}')
hge_fixture_env['REMOTE_SCHEMAS_WEBHOOK_DOMAIN'] = server.url
request.addfinalizer(server.stop_server)
return server
@pytest.fixture(scope='class')
def ws_client(hge_ctx):
"""
This fixture provides an Apollo GraphQL websockets client
"""
client = GQLWsClient(hge_ctx, '/v1/graphql')
time.sleep(0.1)
yield client
client.teardown()
@pytest.fixture(scope='class')
def ws_client_graphql_ws(hge_ctx):
"""
This fixture provides an GraphQL-WS client
"""
client = GraphQLWSClient(hge_ctx, '/v1/graphql')
time.sleep(0.1)
yield client
client.teardown()
@pytest.fixture(scope='class')
@pytest.mark.early
def jwt_configuration(
request: pytest.FixtureRequest,
tmp_path_factory: pytest.TempPathFactory,
hge_fixture_env: dict[str, str],
) -> Optional[fixtures.jwt.JWTConfiguration]:
marker = request.node.get_closest_marker('jwt')
if not marker:
raise Exception('JWT configuration is required.')
algorithm = marker.args[0]
try:
configuration = marker.args[1]
except IndexError:
configuration = {}
tmp_path = tmp_path_factory.mktemp('jwt')
match algorithm:
case 'rsa':
configuration = fixtures.jwt.init_rsa(tmp_path, configuration)
case 'ed25519':
configuration = fixtures.jwt.init_ed25519(tmp_path, configuration)
case 'es':
configuration = fixtures.jwt.init_es256(tmp_path, configuration)
case _:
raise Exception(f'Unsupported JWT configuration: {marker.args!r}')
hge_fixture_env['HASURA_GRAPHQL_JWT_SECRET'] = json.dumps(configuration.server_configuration)
return configuration
@pytest.fixture(scope='class')
@pytest.mark.early
def jwk_server_url(request: pytest.FixtureRequest, hge_fixture_env: dict[str, str]):
path_marker = request.node.get_closest_marker('jwk_path')
assert path_marker is not None, 'The test must set the `jwk_path` marker.'
path: str = path_marker.args[0]
# If the JWK server was started outside, just set the environment variable
# so that the test is skipped if the value is wrong.
env_var = os.getenv('JWK_SERVER_URL')
if env_var:
hge_fixture_env['HASURA_GRAPHQL_JWT_SECRET'] = '{"jwk_url": "' + env_var + path + '"}'
return env_var
server_address = extract_server_address_from('JWK_SERVER_URL')
server = jwk_server.create_server(server_address)
thread = threading.Thread(target=server.serve_forever)
thread.start()
request.addfinalizer(server.shutdown)
host = server.server_address[0]
port = server.server_address[1]
ports.wait_for_port(port)
url = f'http://{host}:{port}'
print(f'{jwk_server_url.__name__} server started on {url}')
hge_fixture_env['HASURA_GRAPHQL_JWT_SECRET'] = '{"jwk_url": "' + url + path + '"}'
return url
def extract_server_address_from(env_var: str) -> tuple[str, int]:
"""
Extracts a server address (a pair of host and port) from the given environment variable.
If the environment variable doesn't exist, returns `('localhost', 0)`.
"""
value = os.getenv(env_var)
if not value:
return ('localhost', 0)
url = urllib.parse.urlparse(value)
if not url.hostname:
raise Exception(f'Invalid host in {env_var}.')
if not url.port:
raise Exception(f'Invalid port in {env_var}.')
print(f'Found {env_var} = {value!r}, so {inspect.stack()[1].function} server will start on {url.geturl()}')
return (url.hostname, url.port)
@pytest.fixture(scope='class')
def per_class_tests_db_state(request, hge_ctx):
"""
Set up the database state for select queries.
Has a class level scope, since select queries does not change database state
Expects either `dir()` method which provides the directory
with `setup.yaml` and `teardown.yaml` files
Or class variables `setup_files` and `teardown_files` that provides
the list of setup and teardown files respectively.
By default, for a postgres backend the setup and teardown is done via
the `/v1/query` endpoint, to setup using the `/v1/metadata` (metadata setup)
and `/v2/query` (DB setup), set the `setup_metadata_api_version` to "v2"
"""
hge_ctx.request = request
if PytestConf.config.getoption("--port-to-haskell"):
request.addfinalizer(PortToHaskell.write_tests_to_port)
yield from db_state_context(request, hge_ctx)
@pytest.fixture(scope='function')
def per_method_tests_db_state(request, hge_ctx):
"""
This fixture sets up the database state for metadata operations
Has a function level scope, since metadata operations may change both the schema and data
Class method/variable requirements are similar to that of per_class_tests_db_state fixture
"""
yield from db_state_context(request, hge_ctx)
@pytest.fixture(scope='class')
def per_class_db_schema_for_mutation_tests(request, hge_ctx):
"""
This fixture sets up the database schema for mutations.
It has a class level scope, since mutations does not change schema.
Expects either `dir()` class method which provides the directory with `schema_setup.yaml` and `schema_teardown.yaml` files,
or variables `schema_setup_files` and `schema_teardown_files`
that provides the list of setup and teardown files respectively
"""
# setting the default metadata API version to v1
setup_metadata_api_version = getattr(request.cls, 'setup_metadata_api_version',"v1")
(setup, teardown, schema_setup, schema_teardown, pre_setup, post_teardown) = [
hge_ctx.backend_suffix(filename) + ".yaml"
for filename in ['setup', 'teardown', 'schema_setup', 'schema_teardown', 'pre_setup', 'post_teardown']
]
if hge_ctx.is_default_backend:
if setup_metadata_api_version == "v1":
db_context = db_context_with_schema_common(
request, hge_ctx,
'schema_setup_files', 'schema_setup.yaml',
'schema_teardown_files', 'schema_teardown.yaml',
)
else:
db_context = db_context_with_schema_common_new(
request, hge_ctx,
'schema_setup_files', setup,
'schema_teardown_files', teardown,
schema_setup, schema_teardown,
pre_setup, post_teardown,
)
else:
db_context = db_context_with_schema_common_new(
request, hge_ctx,
'schema_setup_files', setup,
'schema_teardown_files', teardown,
schema_setup, schema_teardown,
pre_setup, post_teardown,
)
yield from db_context
@pytest.fixture(scope='function')
def per_method_db_data_for_mutation_tests(request, hge_ctx, per_class_db_schema_for_mutation_tests):
"""
This fixture sets up the data for mutations.
Has a function level scope, since mutations may change data.
Having just the setup file(s), or the teardown file(s) is allowed.
Expects either `dir()` class method which provides the directory with `values_setup.yaml` and / or `values_teardown.yaml` files.
The class may provide `values_setup_files` variables which contains the list of data setup files,
Or the `values_teardown_files` variable which provides the list of data teardown files.
"""
# Non-default (Postgres) backend tests expect separate setup and schema_setup
# files for v1/metadata and v2/query requests, respectively.
(values_setup, values_teardown) = [
hge_ctx.backend_suffix(filename) + ".yaml"
for filename in ['values_setup', 'values_teardown']
]
yield from db_context_common(
request, hge_ctx,
'values_setup_files', values_setup,
'values_teardown_files', values_teardown,
)
def db_state_context(request, hge_ctx):
# Non-default (Postgres) backend tests expect separate setup and schema_setup
# files for v1/metadata and v2/query requests, respectively.
(setup, teardown, schema_setup, schema_teardown, pre_setup, post_teardown) = [
hge_ctx.backend_suffix(filename) + ".yaml"
for filename in ['setup', 'teardown', 'schema_setup', 'schema_teardown', 'pre_setup', 'post_teardown']
]
# setting the default metadata API version to v1
setup_metadata_api_version = getattr(request.cls, 'setup_metadata_api_version',"v1")
if hge_ctx.is_default_backend:
if setup_metadata_api_version == "v1":
# setup the metadata and DB schema using the `/v1/query` endpoint
db_context = db_context_with_schema_common(
request, hge_ctx,
'setup_files', 'setup.yaml',
'teardown_files', 'teardown.yaml',
)
elif setup_metadata_api_version == "v2":
# setup the metadata using the "/v1/metadata" and the DB schema using the `/v2/query` endpoints
db_context = db_context_with_schema_common_new(
request, hge_ctx,
'setup_files', setup,
'teardown_files', teardown,
schema_setup, schema_teardown,
pre_setup, post_teardown,
)
else:
raise NotImplementedError('Invalid API version.')
else:
# setup the metadata using the "/v1/metadata" and the DB schema using the `/v2/query` endpoints
db_context = db_context_with_schema_common_new(
request, hge_ctx,
'setup_files', setup,
'teardown_files', teardown,
schema_setup, schema_teardown,
pre_setup, post_teardown,
)
yield from db_context
def db_context_with_schema_common(
request, hge_ctx,
setup_files_attr, setup_default_file,
teardown_files_attr, teardown_default_file,
):
yield from db_context_common(
request, hge_ctx,
setup_files_attr, setup_default_file,
teardown_files_attr, teardown_default_file,
)
def db_context_with_schema_common_new(
request, hge_ctx,
setup_files_attr, setup_default_file,
teardown_files_attr, teardown_default_file,
setup_sql_file, teardown_sql_file,
pre_setup_file, post_teardown_file,
):
yield from db_context_common_new(
request, hge_ctx,
setup_files_attr, setup_default_file, setup_sql_file,
teardown_files_attr, teardown_default_file, teardown_sql_file,
pre_setup_file, post_teardown_file,
)
def db_context_common(
request, hge_ctx,
setup_files_attr, setup_default_file,
teardown_files_attr, teardown_default_file,
):
def get_files(attr, default_file):
files = getattr(request.cls, attr, None)
if not files:
files = os.path.join(request.cls.dir(), default_file)
return files
setup = get_files(setup_files_attr, setup_default_file)
teardown = get_files(teardown_files_attr, teardown_default_file)
if hge_ctx.is_default_backend:
yield from setup_and_teardown_v1q(
request, hge_ctx,
setup, teardown,
)
else:
yield from setup_and_teardown_v2q(
request, hge_ctx,
setup, teardown,
)
def db_context_common_new(
request, hge_ctx,
setup_files_attr, setup_default_file, setup_default_sql_file,
teardown_files_attr, teardown_default_file, teardown_default_sql_file,
pre_setup_file, post_teardown_file,
):
def get_files(attr, default_file):
files = getattr(request.cls, attr, None)
if not files:
files = os.path.join(request.cls.dir(), default_file)
return files
setup = get_files(setup_files_attr, setup_default_file)
teardown = get_files(teardown_files_attr, teardown_default_file)
setup_default_sql_file = os.path.join(request.cls.dir(), setup_default_sql_file)
teardown_default_sql_file = os.path.join(request.cls.dir(), teardown_default_sql_file)
pre_setup_default_file = os.path.join(request.cls.dir(), pre_setup_file)
post_teardown_default_file = os.path.join(request.cls.dir(), post_teardown_file)
yield from setup_and_teardown(
request, hge_ctx,
setup, teardown,
setup_default_sql_file, teardown_default_sql_file,
pre_setup_default_file, post_teardown_default_file,
)
def setup_and_teardown_v1q(
request, hge_ctx,
setup_files, teardown_files,
):
if PytestConf.config.getoption("--port-to-haskell"):
backend = hge_ctx.backend.title()
hs_test = PortToHaskell.with_test(request.cls.__qualname__)
def appendSetupIfExists(name, url):
def curried(f):
if os.path.isfile(f):
with open(f, 'r') as content:
hs_test.add_setup(backend, PortToHaskell.Setup(name, f, url, content.read()))
return curried
run_on_elem_or_list(appendSetupIfExists("setup", "/v1/query"), setup_files)
def v1q_f(filepath):
if os.path.isfile(filepath):
return hge_ctx.v1q_f(filepath)
run_on_elem_or_list(v1q_f, setup_files)
yield
run_on_elem_or_list(v1q_f, teardown_files)
def setup_and_teardown_v2q(
request, hge_ctx,
setup_files, teardown_files,
):
if PytestConf.config.getoption("--port-to-haskell"):
backend = hge_ctx.backend.title()
hs_test = PortToHaskell.with_test(request.cls.__qualname__)
def appendSetupIfExists(name, url):
def curried(f):
if os.path.isfile(f):
with open(f, 'r') as content:
hs_test.add_setup(backend, PortToHaskell.Setup(name, f, url, content.read()))
return curried
run_on_elem_or_list(appendSetupIfExists("setup", "/v2/query"), setup_files)
def v2q_f(filepath):
if os.path.isfile(filepath):
return hge_ctx.v2q_f(filepath)
run_on_elem_or_list(v2q_f, setup_files)
yield
run_on_elem_or_list(v2q_f, teardown_files)
def setup_and_teardown(
request, hge_ctx,
setup_files, teardown_files,
sql_schema_setup_file, sql_schema_teardown_file,
pre_setup_file, post_teardown_file,
):
if PytestConf.config.getoption("--port-to-haskell"):
backend = hge_ctx.backend.title()
hs_test = PortToHaskell.with_test(request.cls.__qualname__)
def appendSetupIfExists(name, url):
def curried(f):
if os.path.isfile(f):
with open(f, 'r') as content:
hs_test.add_setup(backend, PortToHaskell.Setup(name, f, url, content.read()))
return curried
run_on_elem_or_list(appendSetupIfExists("pre_setup", "/v1/metadata"), pre_setup_file)
run_on_elem_or_list(appendSetupIfExists("schema_setup", "/v2/query"), sql_schema_setup_file)
run_on_elem_or_list(appendSetupIfExists("setup_metadata", "/v1/metadata"), setup_files)
yield
else:
def v2q_f(f):
if os.path.isfile(f):
try:
hge_ctx.v2q_f(f)
except AssertionError:
try:
run_on_elem_or_list(pre_post_metadataq_f, post_teardown_file)
except:
pass
raise
def metadataq_f(f):
if os.path.isfile(f):
try:
hge_ctx.v1metadataq_f(f)
except AssertionError:
try:
# drop the sql setup, if the metadata calls fail
run_on_elem_or_list(v2q_f, sql_schema_teardown_file)
run_on_elem_or_list(pre_post_metadataq_f, post_teardown_file)