diff --git a/CHANGELOG b/CHANGELOG index 95cda51..c012bfd 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,12 @@ -UPCOMING -pass +TODO +Policies declaration via app config (https://www.rabbitmq.com/parameters.html) +Run separate service/consumer as worker +Add ABC to prevent circular import hell + +v 1.1.0a1 +Pydantic based app settings +Inject arguments to endpoints based on type annotations (FastAPI-like) +Inject logger or other components v 1.0.11 Make it possible to just register endpoints, without running loop (method `Mela.setup()`) diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f7d6db3 --- /dev/null +++ b/Makefile @@ -0,0 +1,66 @@ +# COLORS +GREEN := $(shell tput -Txterm setaf 2) +WHITE := $(shell tput -Txterm setaf 7) +YELLOW := $(shell tput -Txterm setaf 3) +RESET := $(shell tput -Txterm sgr0) + +.DEFAULT_GOAL := help +.PHONY: help setup run lint type flake8 mypy test testcov run clean + +VENV=.venv +PYTHON=$(VENV)/bin/python3 + +## Initialize venv and install dependencies +setup: $(VENV)/bin/activate +$(VENV)/bin/activate: + python3 -m venv $(VENV) + $(PYTHON) -m pip install pipenv==2022.9.8 + $(PYTHON) -m pipenv sync -d + +## Analyze project source code for slylistic errors +lint: setup + $(PYTHON) -m flake8 mela tests + +## Analyze project source code for typing errors +type: setup + $(PYTHON) -m mypy mela tests + +## Run flake8 +flake8: setup + $(PYTHON) -m flake8 mela tests + +## Run mypy +mypy: setup + $(PYTHON) -m mypy mela tests + +## Run project tests +test: setup + $(PYTHON) -m pytest + +## Run project tests and open HTML coverage report +testcov: setup + $(PYTHON) -m pytest --cov-report=html + xdg-open htmlcov/index.html + +## Clean up project environment +clean: + rm -rf $(VENV) *.egg-info .eggs .coverage htmlcov .pytest_cache + find . -type f -name '*.pyc' -delete + +## Show help +help: + @echo '' + @echo 'Usage:' + @echo ' ${YELLOW}make${RESET} ${GREEN}${RESET}' + @echo '' + @echo 'Targets:' + @awk '/^[a-zA-Z\-\_0-9]+:/ { \ + helpMessage = match(lastLine, /^## (.*)/); \ + if (helpMessage) { \ + helpCommand = $$1; sub(/:$$/, "", helpCommand); \ + helpMessage = substr(lastLine, RSTART + 3, RLENGTH); \ + printf " ${YELLOW}%-$(TARGET_MAX_CHAR_NUM)15s${RESET} ${GREEN}%s${RESET}\n", helpCommand, helpMessage; \ + } \ + } \ + { lastLine = $$0 }' $(MAKEFILE_LIST) + @echo '' diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..79306ae --- /dev/null +++ b/Pipfile @@ -0,0 +1,31 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +aio-pika = "==8.2.1" +envyaml = "==1.10.211231" +pydantic = "==1.10.2" + +[dev-packages] +flake8 = "==5.0.4" +flake8-broken-line = "==0.5.0" +flake8-commas = "==2.1.0" +flake8-comprehensions = "==3.10.0" +flake8-eradicate = "==1.3.0" +flake8-isort = "==4.2.0" +flake8-print = "==5.0.0" +pep8-naming = "==0.13.2" +flake8-bandit = "==4.1.1" +flake8-use-fstring = "==1.4" +bandit = "==1.7.4" +mypy = "==0.971" +pytest = "==7.1.3" +pytest-cov = "==3.0.0" +pytest-mock = "==3.8.2" +pytest-asyncio = "==0.19.0" +pytest-clarity = "==1.0.1" + +[requires] +python_version = "3.10" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000..b6df22a --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,734 @@ +{ + "_meta": { + "hash": { + "sha256": "0543ccfc7a7299ac967627c09fe0be64caf92da01c62faae9b6bbd51dfd50a54" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.10" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "aio-pika": { + "hashes": [ + "sha256:e6aa366526f4e03d1040757ee5104860186be91b10e0a82c3e610d8ecb55dd5b" + ], + "index": "pypi", + "version": "==8.2.1" + }, + "aiormq": { + "hashes": [ + "sha256:fd815d2bb9d8c950361697a74c1b067bc078726c3ef3b837e979a68a4986b148", + "sha256:fdbf2efed73a2e07437de3af5e3591165efb155a18394dc7295cac9e80943c62" + ], + "markers": "python_version >= '3.7'", + "version": "==6.4.2" + }, + "envyaml": { + "hashes": [ + "sha256:88f8a076159e3c317d3450a5f404132b6ac91aecee4934ea72eac65f911f1244", + "sha256:8d7a7a6be12587cc5da32a587067506b47b849f4643981099ad148015a72de52" + ], + "index": "pypi", + "version": "==1.10.211231" + }, + "idna": { + "hashes": [ + "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4", + "sha256:90b77e79eaa3eba6de819a0c442c0b4ceefc341a7a2ab77d7562bf49f425c5c2" + ], + "markers": "python_version >= '3.5'", + "version": "==3.4" + }, + "multidict": { + "hashes": [ + "sha256:0327292e745a880459ef71be14e709aaea2f783f3537588fb4ed09b6c01bca60", + "sha256:041b81a5f6b38244b34dc18c7b6aba91f9cdaf854d9a39e5ff0b58e2b5773b9c", + "sha256:0556a1d4ea2d949efe5fd76a09b4a82e3a4a30700553a6725535098d8d9fb672", + "sha256:05f6949d6169878a03e607a21e3b862eaf8e356590e8bdae4227eedadacf6e51", + "sha256:07a017cfa00c9890011628eab2503bee5872f27144936a52eaab449be5eaf032", + "sha256:0b9e95a740109c6047602f4db4da9949e6c5945cefbad34a1299775ddc9a62e2", + "sha256:19adcfc2a7197cdc3987044e3f415168fc5dc1f720c932eb1ef4f71a2067e08b", + "sha256:19d9bad105dfb34eb539c97b132057a4e709919ec4dd883ece5838bcbf262b80", + "sha256:225383a6603c086e6cef0f2f05564acb4f4d5f019a4e3e983f572b8530f70c88", + "sha256:23b616fdc3c74c9fe01d76ce0d1ce872d2d396d8fa8e4899398ad64fb5aa214a", + "sha256:2957489cba47c2539a8eb7ab32ff49101439ccf78eab724c828c1a54ff3ff98d", + "sha256:2d36e929d7f6a16d4eb11b250719c39560dd70545356365b494249e2186bc389", + "sha256:2e4a0785b84fb59e43c18a015ffc575ba93f7d1dbd272b4cdad9f5134b8a006c", + "sha256:3368bf2398b0e0fcbf46d85795adc4c259299fec50c1416d0f77c0a843a3eed9", + "sha256:373ba9d1d061c76462d74e7de1c0c8e267e9791ee8cfefcf6b0b2495762c370c", + "sha256:4070613ea2227da2bfb2c35a6041e4371b0af6b0be57f424fe2318b42a748516", + "sha256:45183c96ddf61bf96d2684d9fbaf6f3564d86b34cb125761f9a0ef9e36c1d55b", + "sha256:4571f1beddff25f3e925eea34268422622963cd8dc395bb8778eb28418248e43", + "sha256:47e6a7e923e9cada7c139531feac59448f1f47727a79076c0b1ee80274cd8eee", + "sha256:47fbeedbf94bed6547d3aa632075d804867a352d86688c04e606971595460227", + "sha256:497988d6b6ec6ed6f87030ec03280b696ca47dbf0648045e4e1d28b80346560d", + "sha256:4bae31803d708f6f15fd98be6a6ac0b6958fcf68fda3c77a048a4f9073704aae", + "sha256:50bd442726e288e884f7be9071016c15a8742eb689a593a0cac49ea093eef0a7", + "sha256:514fe2b8d750d6cdb4712346a2c5084a80220821a3e91f3f71eec11cf8d28fd4", + "sha256:5774d9218d77befa7b70d836004a768fb9aa4fdb53c97498f4d8d3f67bb9cfa9", + "sha256:5fdda29a3c7e76a064f2477c9aab1ba96fd94e02e386f1e665bca1807fc5386f", + "sha256:5ff3bd75f38e4c43f1f470f2df7a4d430b821c4ce22be384e1459cb57d6bb013", + "sha256:626fe10ac87851f4cffecee161fc6f8f9853f0f6f1035b59337a51d29ff3b4f9", + "sha256:6701bf8a5d03a43375909ac91b6980aea74b0f5402fbe9428fc3f6edf5d9677e", + "sha256:684133b1e1fe91eda8fa7447f137c9490a064c6b7f392aa857bba83a28cfb693", + "sha256:6f3cdef8a247d1eafa649085812f8a310e728bdf3900ff6c434eafb2d443b23a", + "sha256:75bdf08716edde767b09e76829db8c1e5ca9d8bb0a8d4bd94ae1eafe3dac5e15", + "sha256:7c40b7bbece294ae3a87c1bc2abff0ff9beef41d14188cda94ada7bcea99b0fb", + "sha256:8004dca28e15b86d1b1372515f32eb6f814bdf6f00952699bdeb541691091f96", + "sha256:8064b7c6f0af936a741ea1efd18690bacfbae4078c0c385d7c3f611d11f0cf87", + "sha256:89171b2c769e03a953d5969b2f272efa931426355b6c0cb508022976a17fd376", + "sha256:8cbf0132f3de7cc6c6ce00147cc78e6439ea736cee6bca4f068bcf892b0fd658", + "sha256:9cc57c68cb9139c7cd6fc39f211b02198e69fb90ce4bc4a094cf5fe0d20fd8b0", + "sha256:a007b1638e148c3cfb6bf0bdc4f82776cef0ac487191d093cdc316905e504071", + "sha256:a2c34a93e1d2aa35fbf1485e5010337c72c6791407d03aa5f4eed920343dd360", + "sha256:a45e1135cb07086833ce969555df39149680e5471c04dfd6a915abd2fc3f6dbc", + "sha256:ac0e27844758d7177989ce406acc6a83c16ed4524ebc363c1f748cba184d89d3", + "sha256:aef9cc3d9c7d63d924adac329c33835e0243b5052a6dfcbf7732a921c6e918ba", + "sha256:b9d153e7f1f9ba0b23ad1568b3b9e17301e23b042c23870f9ee0522dc5cc79e8", + "sha256:bfba7c6d5d7c9099ba21f84662b037a0ffd4a5e6b26ac07d19e423e6fdf965a9", + "sha256:c207fff63adcdf5a485969131dc70e4b194327666b7e8a87a97fbc4fd80a53b2", + "sha256:d0509e469d48940147e1235d994cd849a8f8195e0bca65f8f5439c56e17872a3", + "sha256:d16cce709ebfadc91278a1c005e3c17dd5f71f5098bfae1035149785ea6e9c68", + "sha256:d48b8ee1d4068561ce8033d2c344cf5232cb29ee1a0206a7b828c79cbc5982b8", + "sha256:de989b195c3d636ba000ee4281cd03bb1234635b124bf4cd89eeee9ca8fcb09d", + "sha256:e07c8e79d6e6fd37b42f3250dba122053fddb319e84b55dd3a8d6446e1a7ee49", + "sha256:e2c2e459f7050aeb7c1b1276763364884595d47000c1cddb51764c0d8976e608", + "sha256:e5b20e9599ba74391ca0cfbd7b328fcc20976823ba19bc573983a25b32e92b57", + "sha256:e875b6086e325bab7e680e4316d667fc0e5e174bb5611eb16b3ea121c8951b86", + "sha256:f4f052ee022928d34fe1f4d2bc743f32609fb79ed9c49a1710a5ad6b2198db20", + "sha256:fcb91630817aa8b9bc4a74023e4198480587269c272c58b3279875ed7235c293", + "sha256:fd9fc9c4849a07f3635ccffa895d57abce554b467d611a5009ba4f39b78a8849", + "sha256:feba80698173761cddd814fa22e88b0661e98cb810f9f986c54aa34d281e4937", + "sha256:feea820722e69451743a3d56ad74948b68bf456984d63c1a92e8347b7b88452d" + ], + "markers": "python_version >= '3.7'", + "version": "==6.0.2" + }, + "pamqp": { + "hashes": [ + "sha256:15acef752356593ca569d13dfedc8ada9f17deeeb8cec4f7b77825e2b6c7de3e", + "sha256:22550ceb1ca50aafda65873e77e8c1c1b139fb5975e1a09860fae940cf8e970a" + ], + "markers": "python_version >= '3.7'", + "version": "==3.2.1" + }, + "pydantic": { + "hashes": [ + "sha256:05e00dbebbe810b33c7a7362f231893183bcc4251f3f2ff991c31d5c08240c42", + "sha256:06094d18dd5e6f2bbf93efa54991c3240964bb663b87729ac340eb5014310624", + "sha256:0b959f4d8211fc964772b595ebb25f7652da3f22322c007b6fed26846a40685e", + "sha256:19b3b9ccf97af2b7519c42032441a891a5e05c68368f40865a90eb88833c2559", + "sha256:1b6ee725bd6e83ec78b1aa32c5b1fa67a3a65badddde3976bca5fe4568f27709", + "sha256:1ee433e274268a4b0c8fde7ad9d58ecba12b069a033ecc4645bb6303c062d2e9", + "sha256:216f3bcbf19c726b1cc22b099dd409aa371f55c08800bcea4c44c8f74b73478d", + "sha256:2d0567e60eb01bccda3a4df01df677adf6b437958d35c12a3ac3e0f078b0ee52", + "sha256:2e05aed07fa02231dbf03d0adb1be1d79cabb09025dd45aa094aa8b4e7b9dcda", + "sha256:352aedb1d71b8b0736c6d56ad2bd34c6982720644b0624462059ab29bd6e5912", + "sha256:355639d9afc76bcb9b0c3000ddcd08472ae75318a6eb67a15866b87e2efa168c", + "sha256:37c90345ec7dd2f1bcef82ce49b6235b40f282b94d3eec47e801baf864d15525", + "sha256:4b8795290deaae348c4eba0cebb196e1c6b98bdbe7f50b2d0d9a4a99716342fe", + "sha256:5760e164b807a48a8f25f8aa1a6d857e6ce62e7ec83ea5d5c5a802eac81bad41", + "sha256:6eb843dcc411b6a2237a694f5e1d649fc66c6064d02b204a7e9d194dff81eb4b", + "sha256:7b5ba54d026c2bd2cb769d3468885f23f43710f651688e91f5fb1edcf0ee9283", + "sha256:7c2abc4393dea97a4ccbb4ec7d8658d4e22c4765b7b9b9445588f16c71ad9965", + "sha256:81a7b66c3f499108b448f3f004801fcd7d7165fb4200acb03f1c2402da73ce4c", + "sha256:91b8e218852ef6007c2b98cd861601c6a09f1aa32bbbb74fab5b1c33d4a1e410", + "sha256:9300fcbebf85f6339a02c6994b2eb3ff1b9c8c14f502058b5bf349d42447dcf5", + "sha256:9cabf4a7f05a776e7793e72793cd92cc865ea0e83a819f9ae4ecccb1b8aa6116", + "sha256:a1f5a63a6dfe19d719b1b6e6106561869d2efaca6167f84f5ab9347887d78b98", + "sha256:a4c805731c33a8db4b6ace45ce440c4ef5336e712508b4d9e1aafa617dc9907f", + "sha256:ae544c47bec47a86bc7d350f965d8b15540e27e5aa4f55170ac6a75e5f73b644", + "sha256:b97890e56a694486f772d36efd2ba31612739bc6f3caeee50e9e7e3ebd2fdd13", + "sha256:bb6ad4489af1bac6955d38ebcb95079a836af31e4c4f74aba1ca05bb9f6027bd", + "sha256:bedf309630209e78582ffacda64a21f96f3ed2e51fbf3962d4d488e503420254", + "sha256:c1ba1afb396148bbc70e9eaa8c06c1716fdddabaf86e7027c5988bae2a829ab6", + "sha256:c33602f93bfb67779f9c507e4d69451664524389546bacfe1bee13cae6dc7488", + "sha256:c4aac8e7103bf598373208f6299fa9a5cfd1fc571f2d40bf1dd1955a63d6eeb5", + "sha256:c6f981882aea41e021f72779ce2a4e87267458cc4d39ea990729e21ef18f0f8c", + "sha256:cc78cc83110d2f275ec1970e7a831f4e371ee92405332ebfe9860a715f8336e1", + "sha256:d49f3db871575e0426b12e2f32fdb25e579dea16486a26e5a0474af87cb1ab0a", + "sha256:dd3f9a40c16daf323cf913593083698caee97df2804aa36c4b3175d5ac1b92a2", + "sha256:e0bedafe4bc165ad0a56ac0bd7695df25c50f76961da29c050712596cf092d6d", + "sha256:e9069e1b01525a96e6ff49e25876d90d5a563bc31c658289a8772ae186552236" + ], + "index": "pypi", + "version": "==1.10.2" + }, + "pyyaml": { + "hashes": [ + "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf", + "sha256:0283c35a6a9fbf047493e3a0ce8d79ef5030852c51e9d911a27badfde0605293", + "sha256:055d937d65826939cb044fc8c9b08889e8c743fdc6a32b33e2390f66013e449b", + "sha256:07751360502caac1c067a8132d150cf3d61339af5691fe9e87803040dbc5db57", + "sha256:0b4624f379dab24d3725ffde76559cff63d9ec94e1736b556dacdfebe5ab6d4b", + "sha256:0ce82d761c532fe4ec3f87fc45688bdd3a4c1dc5e0b4a19814b9009a29baefd4", + "sha256:1e4747bc279b4f613a09eb64bba2ba602d8a6664c6ce6396a4d0cd413a50ce07", + "sha256:213c60cd50106436cc818accf5baa1aba61c0189ff610f64f4a3e8c6726218ba", + "sha256:231710d57adfd809ef5d34183b8ed1eeae3f76459c18fb4a0b373ad56bedcdd9", + "sha256:277a0ef2981ca40581a47093e9e2d13b3f1fbbeffae064c1d21bfceba2030287", + "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513", + "sha256:40527857252b61eacd1d9af500c3337ba8deb8fc298940291486c465c8b46ec0", + "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782", + "sha256:473f9edb243cb1935ab5a084eb238d842fb8f404ed2193a915d1784b5a6b5fc0", + "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92", + "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f", + "sha256:68fb519c14306fec9720a2a5b45bc9f0c8d1b9c72adf45c37baedfcd949c35a2", + "sha256:77f396e6ef4c73fdc33a9157446466f1cff553d979bd00ecb64385760c6babdc", + "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1", + "sha256:819b3830a1543db06c4d4b865e70ded25be52a2e0631ccd2f6a47a2822f2fd7c", + "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86", + "sha256:98c4d36e99714e55cfbaaee6dd5badbc9a1ec339ebfc3b1f52e293aee6bb71a4", + "sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c", + "sha256:9fa600030013c4de8165339db93d182b9431076eb98eb40ee068700c9c813e34", + "sha256:a80a78046a72361de73f8f395f1f1e49f956c6be882eed58505a15f3e430962b", + "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d", + "sha256:b3d267842bf12586ba6c734f89d1f5b871df0273157918b0ccefa29deb05c21c", + "sha256:b5b9eccad747aabaaffbc6064800670f0c297e52c12754eb1d976c57e4f74dcb", + "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7", + "sha256:c5687b8d43cf58545ade1fe3e055f70eac7a5a1a0bf42824308d868289a95737", + "sha256:cba8c411ef271aa037d7357a2bc8f9ee8b58b9965831d9e51baf703280dc73d3", + "sha256:d15a181d1ecd0d4270dc32edb46f7cb7733c7c508857278d3d378d14d606db2d", + "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358", + "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53", + "sha256:d4eccecf9adf6fbcc6861a38015c2a64f38b9d94838ac1810a9023a0609e1b78", + "sha256:d67d839ede4ed1b28a4e8909735fc992a923cdb84e618544973d7dfc71540803", + "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a", + "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f", + "sha256:e61ceaab6f49fb8bdfaa0f92c4b57bcfbea54c09277b1b4f7ac376bfb7a7c174", + "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5" + ], + "markers": "python_version >= '3.6'", + "version": "==6.0" + }, + "typing-extensions": { + "hashes": [ + "sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02", + "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6" + ], + "markers": "python_version >= '3.7'", + "version": "==4.3.0" + }, + "yarl": { + "hashes": [ + "sha256:076eede537ab978b605f41db79a56cad2e7efeea2aa6e0fa8f05a26c24a034fb", + "sha256:07b21e274de4c637f3e3b7104694e53260b5fc10d51fb3ec5fed1da8e0f754e3", + "sha256:0ab5a138211c1c366404d912824bdcf5545ccba5b3ff52c42c4af4cbdc2c5035", + "sha256:0c03f456522d1ec815893d85fccb5def01ffaa74c1b16ff30f8aaa03eb21e453", + "sha256:12768232751689c1a89b0376a96a32bc7633c08da45ad985d0c49ede691f5c0d", + "sha256:19cd801d6f983918a3f3a39f3a45b553c015c5aac92ccd1fac619bd74beece4a", + "sha256:1ca7e596c55bd675432b11320b4eacc62310c2145d6801a1f8e9ad160685a231", + "sha256:1e4808f996ca39a6463f45182e2af2fae55e2560be586d447ce8016f389f626f", + "sha256:205904cffd69ae972a1707a1bd3ea7cded594b1d773a0ce66714edf17833cdae", + "sha256:20df6ff4089bc86e4a66e3b1380460f864df3dd9dccaf88d6b3385d24405893b", + "sha256:21ac44b763e0eec15746a3d440f5e09ad2ecc8b5f6dcd3ea8cb4773d6d4703e3", + "sha256:29e256649f42771829974e742061c3501cc50cf16e63f91ed8d1bf98242e5507", + "sha256:2d800b9c2eaf0684c08be5f50e52bfa2aa920e7163c2ea43f4f431e829b4f0fd", + "sha256:2d93a049d29df172f48bcb09acf9226318e712ce67374f893b460b42cc1380ae", + "sha256:31a9a04ecccd6b03e2b0e12e82131f1488dea5555a13a4d32f064e22a6003cfe", + "sha256:3d1a50e461615747dd93c099f297c1994d472b0f4d2db8a64e55b1edf704ec1c", + "sha256:449c957ffc6bc2309e1fbe67ab7d2c1efca89d3f4912baeb8ead207bb3cc1cd4", + "sha256:4a88510731cd8d4befaba5fbd734a7dd914de5ab8132a5b3dde0bbd6c9476c64", + "sha256:4c322cbaa4ed78a8aac89b2174a6df398faf50e5fc12c4c191c40c59d5e28357", + "sha256:5395da939ffa959974577eff2cbfc24b004a2fb6c346918f39966a5786874e54", + "sha256:5587bba41399854703212b87071c6d8638fa6e61656385875f8c6dff92b2e461", + "sha256:56c11efb0a89700987d05597b08a1efcd78d74c52febe530126785e1b1a285f4", + "sha256:5999c4662631cb798496535afbd837a102859568adc67d75d2045e31ec3ac497", + "sha256:59ddd85a1214862ce7c7c66457f05543b6a275b70a65de366030d56159a979f0", + "sha256:6347f1a58e658b97b0a0d1ff7658a03cb79bdbda0331603bed24dd7054a6dea1", + "sha256:6628d750041550c5d9da50bb40b5cf28a2e63b9388bac10fedd4f19236ef4957", + "sha256:6afb336e23a793cd3b6476c30f030a0d4c7539cd81649683b5e0c1b0ab0bf350", + "sha256:6c8148e0b52bf9535c40c48faebb00cb294ee577ca069d21bd5c48d302a83780", + "sha256:76577f13333b4fe345c3704811ac7509b31499132ff0181f25ee26619de2c843", + "sha256:7c0da7e44d0c9108d8b98469338705e07f4bb7dab96dbd8fa4e91b337db42548", + "sha256:7de89c8456525650ffa2bb56a3eee6af891e98f498babd43ae307bd42dca98f6", + "sha256:7ec362167e2c9fd178f82f252b6d97669d7245695dc057ee182118042026da40", + "sha256:7fce6cbc6c170ede0221cc8c91b285f7f3c8b9fe28283b51885ff621bbe0f8ee", + "sha256:85cba594433915d5c9a0d14b24cfba0339f57a2fff203a5d4fd070e593307d0b", + "sha256:8b0af1cf36b93cee99a31a545fe91d08223e64390c5ecc5e94c39511832a4bb6", + "sha256:9130ddf1ae9978abe63808b6b60a897e41fccb834408cde79522feb37fb72fb0", + "sha256:99449cd5366fe4608e7226c6cae80873296dfa0cde45d9b498fefa1de315a09e", + "sha256:9de955d98e02fab288c7718662afb33aab64212ecb368c5dc866d9a57bf48880", + "sha256:a0fb2cb4204ddb456a8e32381f9a90000429489a25f64e817e6ff94879d432fc", + "sha256:a165442348c211b5dea67c0206fc61366212d7082ba8118c8c5c1c853ea4d82e", + "sha256:ab2a60d57ca88e1d4ca34a10e9fb4ab2ac5ad315543351de3a612bbb0560bead", + "sha256:abc06b97407868ef38f3d172762f4069323de52f2b70d133d096a48d72215d28", + "sha256:af887845b8c2e060eb5605ff72b6f2dd2aab7a761379373fd89d314f4752abbf", + "sha256:b19255dde4b4f4c32e012038f2c169bb72e7f081552bea4641cab4d88bc409dd", + "sha256:b3ded839a5c5608eec8b6f9ae9a62cb22cd037ea97c627f38ae0841a48f09eae", + "sha256:c1445a0c562ed561d06d8cbc5c8916c6008a31c60bc3655cdd2de1d3bf5174a0", + "sha256:d0272228fabe78ce00a3365ffffd6f643f57a91043e119c289aaba202f4095b0", + "sha256:d0b51530877d3ad7a8d47b2fff0c8df3b8f3b8deddf057379ba50b13df2a5eae", + "sha256:d0f77539733e0ec2475ddcd4e26777d08996f8cd55d2aef82ec4d3896687abda", + "sha256:d2b8f245dad9e331540c350285910b20dd913dc86d4ee410c11d48523c4fd546", + "sha256:dd032e8422a52e5a4860e062eb84ac94ea08861d334a4bcaf142a63ce8ad4802", + "sha256:de49d77e968de6626ba7ef4472323f9d2e5a56c1d85b7c0e2a190b2173d3b9be", + "sha256:de839c3a1826a909fdbfe05f6fe2167c4ab033f1133757b5936efe2f84904c07", + "sha256:e80ed5a9939ceb6fda42811542f31c8602be336b1fb977bccb012e83da7e4936", + "sha256:ea30a42dc94d42f2ba4d0f7c0ffb4f4f9baa1b23045910c0c32df9c9902cb272", + "sha256:ea513a25976d21733bff523e0ca836ef1679630ef4ad22d46987d04b372d57fc", + "sha256:ed19b74e81b10b592084a5ad1e70f845f0aacb57577018d31de064e71ffa267a", + "sha256:f5af52738e225fcc526ae64071b7e5342abe03f42e0e8918227b38c9aa711e28", + "sha256:fae37373155f5ef9b403ab48af5136ae9851151f7aacd9926251ab26b953118b" + ], + "markers": "python_version >= '3.7'", + "version": "==1.8.1" + } + }, + "develop": { + "attrs": { + "hashes": [ + "sha256:29adc2665447e5191d0e7c568fde78b21f9672d344281d0c6e1ab085429b22b6", + "sha256:86efa402f67bf2df34f51a335487cf46b1ec130d02b8d39fd248abfd30da551c" + ], + "markers": "python_version >= '3.5'", + "version": "==22.1.0" + }, + "bandit": { + "hashes": [ + "sha256:2d63a8c573417bae338962d4b9b06fbc6080f74ecd955a092849e1e65c717bd2", + "sha256:412d3f259dab4077d0e7f0c11f50f650cc7d10db905d98f6520a95a18049658a" + ], + "index": "pypi", + "version": "==1.7.4" + }, + "commonmark": { + "hashes": [ + "sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60", + "sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9" + ], + "version": "==0.9.1" + }, + "coverage": { + "extras": [ + "toml" + ], + "hashes": [ + "sha256:01778769097dbd705a24e221f42be885c544bb91251747a8a3efdec6eb4788f2", + "sha256:08002f9251f51afdcc5e3adf5d5d66bb490ae893d9e21359b085f0e03390a820", + "sha256:1238b08f3576201ebf41f7c20bf59baa0d05da941b123c6656e42cdb668e9827", + "sha256:14a32ec68d721c3d714d9b105c7acf8e0f8a4f4734c811eda75ff3718570b5e3", + "sha256:15e38d853ee224e92ccc9a851457fb1e1f12d7a5df5ae44544ce7863691c7a0d", + "sha256:354df19fefd03b9a13132fa6643527ef7905712109d9c1c1903f2133d3a4e145", + "sha256:35ef1f8d8a7a275aa7410d2f2c60fa6443f4a64fae9be671ec0696a68525b875", + "sha256:4179502f210ebed3ccfe2f78bf8e2d59e50b297b598b100d6c6e3341053066a2", + "sha256:42c499c14efd858b98c4e03595bf914089b98400d30789511577aa44607a1b74", + "sha256:4b7101938584d67e6f45f0015b60e24a95bf8dea19836b1709a80342e01b472f", + "sha256:564cd0f5b5470094df06fab676c6d77547abfdcb09b6c29c8a97c41ad03b103c", + "sha256:5f444627b3664b80d078c05fe6a850dd711beeb90d26731f11d492dcbadb6973", + "sha256:6113e4df2fa73b80f77663445be6d567913fb3b82a86ceb64e44ae0e4b695de1", + "sha256:61b993f3998ee384935ee423c3d40894e93277f12482f6e777642a0141f55782", + "sha256:66e6df3ac4659a435677d8cd40e8eb1ac7219345d27c41145991ee9bf4b806a0", + "sha256:67f9346aeebea54e845d29b487eb38ec95f2ecf3558a3cffb26ee3f0dcc3e760", + "sha256:6913dddee2deff8ab2512639c5168c3e80b3ebb0f818fed22048ee46f735351a", + "sha256:6a864733b22d3081749450466ac80698fe39c91cb6849b2ef8752fd7482011f3", + "sha256:7026f5afe0d1a933685d8f2169d7c2d2e624f6255fb584ca99ccca8c0e966fd7", + "sha256:783bc7c4ee524039ca13b6d9b4186a67f8e63d91342c713e88c1865a38d0892a", + "sha256:7a98d6bf6d4ca5c07a600c7b4e0c5350cd483c85c736c522b786be90ea5bac4f", + "sha256:8d032bfc562a52318ae05047a6eb801ff31ccee172dc0d2504614e911d8fa83e", + "sha256:98c0b9e9b572893cdb0a00e66cf961a238f8d870d4e1dc8e679eb8bdc2eb1b86", + "sha256:9c7b9b498eb0c0d48b4c2abc0e10c2d78912203f972e0e63e3c9dc21f15abdaa", + "sha256:9cc4f107009bca5a81caef2fca843dbec4215c05e917a59dec0c8db5cff1d2aa", + "sha256:9d6e1f3185cbfd3d91ac77ea065d85d5215d3dfa45b191d14ddfcd952fa53796", + "sha256:a095aa0a996ea08b10580908e88fbaf81ecf798e923bbe64fb98d1807db3d68a", + "sha256:a3b2752de32c455f2521a51bd3ffb53c5b3ae92736afde67ce83477f5c1dd928", + "sha256:ab066f5ab67059d1f1000b5e1aa8bbd75b6ed1fc0014559aea41a9eb66fc2ce0", + "sha256:c1328d0c2f194ffda30a45f11058c02410e679456276bfa0bbe0b0ee87225fac", + "sha256:c35cca192ba700979d20ac43024a82b9b32a60da2f983bec6c0f5b84aead635c", + "sha256:cbbb0e4cd8ddcd5ef47641cfac97d8473ab6b132dd9a46bacb18872828031685", + "sha256:cdbb0d89923c80dbd435b9cf8bba0ff55585a3cdb28cbec65f376c041472c60d", + "sha256:cf2afe83a53f77aec067033199797832617890e15bed42f4a1a93ea24794ae3e", + "sha256:d5dd4b8e9cd0deb60e6fcc7b0647cbc1da6c33b9e786f9c79721fd303994832f", + "sha256:dfa0b97eb904255e2ab24166071b27408f1f69c8fbda58e9c0972804851e0558", + "sha256:e16c45b726acb780e1e6f88b286d3c10b3914ab03438f32117c4aa52d7f30d58", + "sha256:e1fabd473566fce2cf18ea41171d92814e4ef1495e04471786cbc943b89a3781", + "sha256:e3d3c4cc38b2882f9a15bafd30aec079582b819bec1b8afdbde8f7797008108a", + "sha256:e431e305a1f3126477abe9a184624a85308da8edf8486a863601d58419d26ffa", + "sha256:e7b4da9bafad21ea45a714d3ea6f3e1679099e420c8741c74905b92ee9bfa7cc", + "sha256:ee2b2fb6eb4ace35805f434e0f6409444e1466a47f620d1d5763a22600f0f892", + "sha256:ee6ae6bbcac0786807295e9687169fba80cb0617852b2fa118a99667e8e6815d", + "sha256:ef6f44409ab02e202b31a05dd6666797f9de2aa2b4b3534e9d450e42dea5e817", + "sha256:f67cf9f406cf0d2f08a3515ce2db5b82625a7257f88aad87904674def6ddaec1", + "sha256:f855b39e4f75abd0dfbcf74a82e84ae3fc260d523fcb3532786bcbbcb158322c", + "sha256:fc600f6ec19b273da1d85817eda339fb46ce9eef3e89f220055d8696e0a06908", + "sha256:fcbe3d9a53e013f8ab88734d7e517eb2cd06b7e689bedf22c0eb68db5e4a0a19", + "sha256:fde17bc42e0716c94bf19d92e4c9f5a00c5feb401f5bc01101fdf2a8b7cacf60", + "sha256:ff934ced84054b9018665ca3967fc48e1ac99e811f6cc99ea65978e1d384454b" + ], + "markers": "python_version >= '3.7'", + "version": "==6.4.4" + }, + "eradicate": { + "hashes": [ + "sha256:8bfaca181db9227dc88bdbce4d051a9627604c2243e7d85324f6d6ce0fd08bb2", + "sha256:aac7384ab25b1bf21c4c012de9b4bf8398945a14c98c911545b2ea50ab558014" + ], + "version": "==2.1.0" + }, + "flake8": { + "hashes": [ + "sha256:6fbe320aad8d6b95cec8b8e47bc933004678dc63095be98528b7bdd2a9f510db", + "sha256:7a1cf6b73744f5806ab95e526f6f0d8c01c66d7bbe349562d22dfca20610b248" + ], + "index": "pypi", + "version": "==5.0.4" + }, + "flake8-bandit": { + "hashes": [ + "sha256:068e09287189cbfd7f986e92605adea2067630b75380c6b5733dab7d87f9a84e", + "sha256:4c8a53eb48f23d4ef1e59293657181a3c989d0077c9952717e98a0eace43e06d" + ], + "index": "pypi", + "version": "==4.1.1" + }, + "flake8-broken-line": { + "hashes": [ + "sha256:7c98de9dd1385b71e888709c7f2aee3f0514107ecb5875bc95d0c03392191c97", + "sha256:daafb19b67eead0410ce7ba155d51a15b9d020ebe7630d87de9c2b93cedb6703" + ], + "index": "pypi", + "version": "==0.5.0" + }, + "flake8-commas": { + "hashes": [ + "sha256:940441ab8ee544df564ae3b3f49f20462d75d5c7cac2463e0b27436e2050f263", + "sha256:ebb96c31e01d0ef1d0685a21f3f0e2f8153a0381430e748bf0bbbb5d5b453d54" + ], + "index": "pypi", + "version": "==2.1.0" + }, + "flake8-comprehensions": { + "hashes": [ + "sha256:181158f7e7aa26a63a0a38e6017cef28c6adee71278ce56ce11f6ec9c4905058", + "sha256:dad454fd3d525039121e98fa1dd90c46bc138708196a4ebbc949ad3c859adedb" + ], + "index": "pypi", + "version": "==3.10.0" + }, + "flake8-eradicate": { + "hashes": [ + "sha256:85a71e0c5f4e07f7c6c5fec520483561fd6bd295417d622855bdeade99242e3d", + "sha256:e4c98f00d17dc8653e3388cac2624cd81e9735de2fd4a8dcf99029633ebd7a63" + ], + "index": "pypi", + "version": "==1.3.0" + }, + "flake8-isort": { + "hashes": [ + "sha256:26571500cd54976bbc0cf1006ffbcd1a68dd102f816b7a1051b219616ba9fee0", + "sha256:5b87630fb3719bf4c1833fd11e0d9534f43efdeba524863e15d8f14a7ef6adbf" + ], + "index": "pypi", + "version": "==4.2.0" + }, + "flake8-print": { + "hashes": [ + "sha256:76915a2a389cc1c0879636c219eb909c38501d3a43cc8dae542081c9ba48bdf9", + "sha256:84a1a6ea10d7056b804221ac5e62b1cee1aefc897ce16f2e5c42d3046068f5d8" + ], + "index": "pypi", + "version": "==5.0.0" + }, + "flake8-use-fstring": { + "hashes": [ + "sha256:6550bf722585eb97dffa8343b0f1c372101f5c4ab5b07ebf0edd1c79880cdd39" + ], + "index": "pypi", + "version": "==1.4" + }, + "gitdb": { + "hashes": [ + "sha256:8033ad4e853066ba6ca92050b9df2f89301b8fc8bf7e9324d412a63f8bf1a8fd", + "sha256:bac2fd45c0a1c9cf619e63a90d62bdc63892ef92387424b855792a6cabe789aa" + ], + "markers": "python_version >= '3.6'", + "version": "==4.0.9" + }, + "gitpython": { + "hashes": [ + "sha256:1c885ce809e8ba2d88a29befeb385fcea06338d3640712b59ca623c220bb5704", + "sha256:5b68b000463593e05ff2b261acff0ff0972df8ab1b70d3cdbd41b546c8b8fc3d" + ], + "markers": "python_version >= '3.7'", + "version": "==3.1.27" + }, + "iniconfig": { + "hashes": [ + "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3", + "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32" + ], + "version": "==1.1.1" + }, + "isort": { + "hashes": [ + "sha256:6f62d78e2f89b4500b080fe3a81690850cd254227f27f75c3a0c491a1f351ba7", + "sha256:e8443a5e7a020e9d7f97f1d7d9cd17c88bcb3bc7e218bf9cf5095fe550be2951" + ], + "markers": "python_full_version >= '3.6.1' and python_full_version < '4.0.0'", + "version": "==5.10.1" + }, + "mccabe": { + "hashes": [ + "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325", + "sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e" + ], + "markers": "python_version >= '3.6'", + "version": "==0.7.0" + }, + "mypy": { + "hashes": [ + "sha256:02ef476f6dcb86e6f502ae39a16b93285fef97e7f1ff22932b657d1ef1f28655", + "sha256:0d054ef16b071149917085f51f89555a576e2618d5d9dd70bd6eea6410af3ac9", + "sha256:19830b7dba7d5356d3e26e2427a2ec91c994cd92d983142cbd025ebe81d69cf3", + "sha256:1f7656b69974a6933e987ee8ffb951d836272d6c0f81d727f1d0e2696074d9e6", + "sha256:23488a14a83bca6e54402c2e6435467a4138785df93ec85aeff64c6170077fb0", + "sha256:23c7ff43fff4b0df93a186581885c8512bc50fc4d4910e0f838e35d6bb6b5e58", + "sha256:25c5750ba5609a0c7550b73a33deb314ecfb559c350bb050b655505e8aed4103", + "sha256:2ad53cf9c3adc43cf3bea0a7d01a2f2e86db9fe7596dfecb4496a5dda63cbb09", + "sha256:3fa7a477b9900be9b7dd4bab30a12759e5abe9586574ceb944bc29cddf8f0417", + "sha256:40b0f21484238269ae6a57200c807d80debc6459d444c0489a102d7c6a75fa56", + "sha256:4b21e5b1a70dfb972490035128f305c39bc4bc253f34e96a4adf9127cf943eb2", + "sha256:5a361d92635ad4ada1b1b2d3630fc2f53f2127d51cf2def9db83cba32e47c856", + "sha256:77a514ea15d3007d33a9e2157b0ba9c267496acf12a7f2b9b9f8446337aac5b0", + "sha256:855048b6feb6dfe09d3353466004490b1872887150c5bb5caad7838b57328cc8", + "sha256:9796a2ba7b4b538649caa5cecd398d873f4022ed2333ffde58eaf604c4d2cb27", + "sha256:98e02d56ebe93981c41211c05adb630d1d26c14195d04d95e49cd97dbc046dc5", + "sha256:b793b899f7cf563b1e7044a5c97361196b938e92f0a4343a5d27966a53d2ec71", + "sha256:d1ea5d12c8e2d266b5fb8c7a5d2e9c0219fedfeb493b7ed60cd350322384ac27", + "sha256:d2022bfadb7a5c2ef410d6a7c9763188afdb7f3533f22a0a32be10d571ee4bbe", + "sha256:d3348e7eb2eea2472db611486846742d5d52d1290576de99d59edeb7cd4a42ca", + "sha256:d744f72eb39f69312bc6c2abf8ff6656973120e2eb3f3ec4f758ed47e414a4bf", + "sha256:ef943c72a786b0f8d90fd76e9b39ce81fb7171172daf84bf43eaf937e9f220a9", + "sha256:f2899a3cbd394da157194f913a931edfd4be5f274a88041c9dc2d9cdcb1c315c" + ], + "index": "pypi", + "version": "==0.971" + }, + "mypy-extensions": { + "hashes": [ + "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d", + "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8" + ], + "version": "==0.4.3" + }, + "packaging": { + "hashes": [ + "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb", + "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522" + ], + "markers": "python_version >= '3.6'", + "version": "==21.3" + }, + "pbr": { + "hashes": [ + "sha256:cfcc4ff8e698256fc17ea3ff796478b050852585aa5bae79ecd05b2ab7b39b9a", + "sha256:da3e18aac0a3c003e9eea1a81bd23e5a3a75d745670dcf736317b7d966887fdf" + ], + "markers": "python_version >= '2.6'", + "version": "==5.10.0" + }, + "pep8-naming": { + "hashes": [ + "sha256:59e29e55c478db69cffbe14ab24b5bd2cd615c0413edf790d47d3fb7ba9a4e23", + "sha256:93eef62f525fd12a6f8c98f4dcc17fa70baae2f37fa1f73bec00e3e44392fa48" + ], + "index": "pypi", + "version": "==0.13.2" + }, + "pluggy": { + "hashes": [ + "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159", + "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3" + ], + "markers": "python_version >= '3.6'", + "version": "==1.0.0" + }, + "pprintpp": { + "hashes": [ + "sha256:b6b4dcdd0c0c0d75e4d7b2f21a9e933e5b2ce62b26e1a54537f9651ae5a5c01d", + "sha256:ea826108e2c7f49dc6d66c752973c3fc9749142a798d6b254e1e301cfdbc6403" + ], + "version": "==0.4.0" + }, + "py": { + "hashes": [ + "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719", + "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", + "version": "==1.11.0" + }, + "pycodestyle": { + "hashes": [ + "sha256:2c9607871d58c76354b697b42f5d57e1ada7d261c261efac224b664affdc5785", + "sha256:d1735fc58b418fd7c5f658d28d943854f8a849b01a5d0a1e6f3f3fdd0166804b" + ], + "markers": "python_version >= '3.6'", + "version": "==2.9.1" + }, + "pyflakes": { + "hashes": [ + "sha256:4579f67d887f804e67edb544428f264b7b24f435b263c4614f384135cea553d2", + "sha256:491feb020dca48ccc562a8c0cbe8df07ee13078df59813b83959cbdada312ea3" + ], + "markers": "python_version >= '3.6'", + "version": "==2.5.0" + }, + "pygments": { + "hashes": [ + "sha256:56a8508ae95f98e2b9bdf93a6be5ae3f7d8af858b43e02c5a2ff083726be40c1", + "sha256:f643f331ab57ba3c9d89212ee4a2dabc6e94f117cf4eefde99a0574720d14c42" + ], + "markers": "python_version >= '3.6'", + "version": "==2.13.0" + }, + "pyparsing": { + "hashes": [ + "sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb", + "sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc" + ], + "markers": "python_full_version >= '3.6.8'", + "version": "==3.0.9" + }, + "pytest": { + "hashes": [ + "sha256:1377bda3466d70b55e3f5cecfa55bb7cfcf219c7964629b967c37cf0bda818b7", + "sha256:4f365fec2dff9c1162f834d9f18af1ba13062db0c708bf7b946f8a5c76180c39" + ], + "index": "pypi", + "version": "==7.1.3" + }, + "pytest-asyncio": { + "hashes": [ + "sha256:7a97e37cfe1ed296e2e84941384bdd37c376453912d397ed39293e0916f521fa", + "sha256:ac4ebf3b6207259750bc32f4c1d8fcd7e79739edbc67ad0c58dd150b1d072fed" + ], + "index": "pypi", + "version": "==0.19.0" + }, + "pytest-clarity": { + "hashes": [ + "sha256:505fe345fad4fe11c6a4187fe683f2c7c52c077caa1e135f3e483fe112db7772" + ], + "index": "pypi", + "version": "==1.0.1" + }, + "pytest-cov": { + "hashes": [ + "sha256:578d5d15ac4a25e5f961c938b85a05b09fdaae9deef3bb6de9a6e766622ca7a6", + "sha256:e7f0f5b1617d2210a2cabc266dfe2f4c75a8d32fb89eafb7ad9d06f6d076d470" + ], + "index": "pypi", + "version": "==3.0.0" + }, + "pytest-mock": { + "hashes": [ + "sha256:77f03f4554392558700295e05aed0b1096a20d4a60a4f3ddcde58b0c31c8fca2", + "sha256:8a9e226d6c0ef09fcf20c94eb3405c388af438a90f3e39687f84166da82d5948" + ], + "index": "pypi", + "version": "==3.8.2" + }, + "pyyaml": { + "hashes": [ + "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf", + "sha256:0283c35a6a9fbf047493e3a0ce8d79ef5030852c51e9d911a27badfde0605293", + "sha256:055d937d65826939cb044fc8c9b08889e8c743fdc6a32b33e2390f66013e449b", + "sha256:07751360502caac1c067a8132d150cf3d61339af5691fe9e87803040dbc5db57", + "sha256:0b4624f379dab24d3725ffde76559cff63d9ec94e1736b556dacdfebe5ab6d4b", + "sha256:0ce82d761c532fe4ec3f87fc45688bdd3a4c1dc5e0b4a19814b9009a29baefd4", + "sha256:1e4747bc279b4f613a09eb64bba2ba602d8a6664c6ce6396a4d0cd413a50ce07", + "sha256:213c60cd50106436cc818accf5baa1aba61c0189ff610f64f4a3e8c6726218ba", + "sha256:231710d57adfd809ef5d34183b8ed1eeae3f76459c18fb4a0b373ad56bedcdd9", + "sha256:277a0ef2981ca40581a47093e9e2d13b3f1fbbeffae064c1d21bfceba2030287", + "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513", + "sha256:40527857252b61eacd1d9af500c3337ba8deb8fc298940291486c465c8b46ec0", + "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782", + "sha256:473f9edb243cb1935ab5a084eb238d842fb8f404ed2193a915d1784b5a6b5fc0", + "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92", + "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f", + "sha256:68fb519c14306fec9720a2a5b45bc9f0c8d1b9c72adf45c37baedfcd949c35a2", + "sha256:77f396e6ef4c73fdc33a9157446466f1cff553d979bd00ecb64385760c6babdc", + "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1", + "sha256:819b3830a1543db06c4d4b865e70ded25be52a2e0631ccd2f6a47a2822f2fd7c", + "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86", + "sha256:98c4d36e99714e55cfbaaee6dd5badbc9a1ec339ebfc3b1f52e293aee6bb71a4", + "sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c", + "sha256:9fa600030013c4de8165339db93d182b9431076eb98eb40ee068700c9c813e34", + "sha256:a80a78046a72361de73f8f395f1f1e49f956c6be882eed58505a15f3e430962b", + "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d", + "sha256:b3d267842bf12586ba6c734f89d1f5b871df0273157918b0ccefa29deb05c21c", + "sha256:b5b9eccad747aabaaffbc6064800670f0c297e52c12754eb1d976c57e4f74dcb", + "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7", + "sha256:c5687b8d43cf58545ade1fe3e055f70eac7a5a1a0bf42824308d868289a95737", + "sha256:cba8c411ef271aa037d7357a2bc8f9ee8b58b9965831d9e51baf703280dc73d3", + "sha256:d15a181d1ecd0d4270dc32edb46f7cb7733c7c508857278d3d378d14d606db2d", + "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358", + "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53", + "sha256:d4eccecf9adf6fbcc6861a38015c2a64f38b9d94838ac1810a9023a0609e1b78", + "sha256:d67d839ede4ed1b28a4e8909735fc992a923cdb84e618544973d7dfc71540803", + "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a", + "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f", + "sha256:e61ceaab6f49fb8bdfaa0f92c4b57bcfbea54c09277b1b4f7ac376bfb7a7c174", + "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5" + ], + "markers": "python_version >= '3.6'", + "version": "==6.0" + }, + "rich": { + "hashes": [ + "sha256:2eb4e6894cde1e017976d2975ac210ef515d7548bc595ba20e195fb9628acdeb", + "sha256:63a5c5ce3673d3d5fbbf23cd87e11ab84b6b451436f1b7f19ec54b6bc36ed7ca" + ], + "markers": "python_full_version >= '3.6.3' and python_full_version < '4.0.0'", + "version": "==12.5.1" + }, + "setuptools": { + "hashes": [ + "sha256:2e24e0bec025f035a2e72cdd1961119f557d78ad331bb00ff82efb2ab8da8e82", + "sha256:7732871f4f7fa58fb6bdcaeadb0161b2bd046c85905dbaa066bdcbcc81953b57" + ], + "markers": "python_version >= '3.7'", + "version": "==65.3.0" + }, + "smmap": { + "hashes": [ + "sha256:2aba19d6a040e78d8b09de5c57e96207b09ed71d8e55ce0959eeee6c8e190d94", + "sha256:c840e62059cd3be204b0c9c9f74be2c09d5648eddd4580d9314c3ecde0b30936" + ], + "markers": "python_version >= '3.6'", + "version": "==5.0.0" + }, + "stevedore": { + "hashes": [ + "sha256:87e4d27fe96d0d7e4fc24f0cbe3463baae4ec51e81d95fbe60d2474636e0c7d8", + "sha256:f82cc99a1ff552310d19c379827c2c64dd9f85a38bcd5559db2470161867b786" + ], + "markers": "python_version >= '3.8'", + "version": "==4.0.0" + }, + "tomli": { + "hashes": [ + "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc", + "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f" + ], + "markers": "python_version < '3.11'", + "version": "==2.0.1" + }, + "typing-extensions": { + "hashes": [ + "sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02", + "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6" + ], + "markers": "python_version >= '3.7'", + "version": "==4.3.0" + } + } +} diff --git a/VERSION b/VERSION index 59e9e60..6b34d9c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.11 +1.1.0a1 diff --git a/examples/bridge_between_two_rabbits/app.py b/examples/bridge_between_two_rabbits/app.py index a17a7ec..695b3cf 100644 --- a/examples/bridge_between_two_rabbits/app.py +++ b/examples/bridge_between_two_rabbits/app.py @@ -1,11 +1,10 @@ from mela import Mela app = Mela(__name__) -app.read_config_yaml('application.yml') -@app.service("input", "output") -def serve(body, message): +@app.service("bridge") +async def serve(body, message): return body diff --git a/examples/bridge_between_two_rabbits/application.yml b/examples/bridge_between_two_rabbits/application.yml index c08855a..059815c 100644 --- a/examples/bridge_between_two_rabbits/application.yml +++ b/examples/bridge_between_two_rabbits/application.yml @@ -1,30 +1,24 @@ connections: input_connection: - host: ${RABBIT_INPUT_HOST} - port: 5672 - username: rabbitmq-bridge - password: rabbitmq-bridge - connTimeout: 1000 - heartbeat: 360 + host: $RABBIT_INPUT_HOST + port: ${RABBIT_INPUT_PORT|5672} + username: ${RABBIT_INPUT_USERNAME|rabbitmq-bridge} + password: ${RABBIT_INPUT_PASSWORD|rabbitmq-bridge} output_connection: - host: ${RABBIT_OUTPUT_HOST} - port: 5672 - username: rabbitmq-bridge - password: rabbitmq-bridge - connTimeout: 1000 - heartbeat: 360 + host: $RABBIT_OUTPUT_HOST + port: ${RABBIT_OUTPUT_PORT|5672} + username: ${RABBIT_OUTPUT_USERNAME|rabbitmq-bridge} + password: ${RABBIT_OUTPUT_PASSWORD|rabbitmq-bridge} -consumers: - input: - connection: input_connection - prefetch_count: 100 - routing_key: general-sentiment-q - exchange: general-sentiment-x - queue: general-sentiment-temp-q - decode: json - -producers: - output: - connection: output_connection - routing_key: general-sentiment-q - exchange: general-sentiment-x \ No newline at end of file +services: + bridge: + consumer: + connection: input_connection + prefetch_count: ${RABBIT_INPUT_PREFETCH_COUNT|1} + routing_key: ${RABBIT_INPUT_ROUTING_KEY} + exchange: ${RABBIT_INPUT_EXCHANGE} + queue: ${RABBIT_INPUT_QUEUE} + producer: + connection: output_connection + routing_key: ${RABBIT_OUTPUT_ROUTING_KEY} + exchange: ${RABBIT_OUTPUT_EXCHANGE} diff --git a/examples/dead_letter_exchange/app.py b/examples/dead_letter_exchange/app.py index 0bc69ae..55829fd 100644 --- a/examples/dead_letter_exchange/app.py +++ b/examples/dead_letter_exchange/app.py @@ -1,25 +1,21 @@ from mela import Mela +from mela.components.exceptions import NackMessageError app = Mela(__name__) -app.read_config_yaml('application.yml') i = 0 -@app.service("printer") +@app.service("service_with_dlx") async def printer(body, message): global i i += 1 if i % 2 == 0: - raise NotImplementedError("Method is not implemented") - return i - # for obj in body: - # if i % 2 == 0: - # yield obj - # else: - # yield obj, {'routing_key': "test_queue2"} - # raise ConnectionError("something went wrong") - # i += 1 + print(body, "NO") + raise NackMessageError("Method is not implemented", requeue=False) + else: + print(body, "YES") + return body if __name__ == '__main__': diff --git a/examples/dead_letter_exchange/application.yml b/examples/dead_letter_exchange/application.yml index 0a98b07..ebf0810 100644 --- a/examples/dead_letter_exchange/application.yml +++ b/examples/dead_letter_exchange/application.yml @@ -6,7 +6,7 @@ connections: password: admin services: - printer: + service_with_dlx: consumer: exchange: dlx-test-x routing_key: dlx-test-k diff --git a/examples/error_handler/app.py b/examples/error_handler/app.py index b81f0ab..ccc8a5e 100644 --- a/examples/error_handler/app.py +++ b/examples/error_handler/app.py @@ -1,22 +1,11 @@ from mela import Mela app = Mela(__name__) -app.read_config_yaml('application.yml') @app.service("splitter") async def printer(body, message): - i = 0 - i += 1 raise NotImplementedError("Method is not implemented") - return i - # for obj in body: - # if i % 2 == 0: - # yield obj - # else: - # yield obj, {'routing_key': "test_queue2"} - # raise ConnectionError("something went wrong") - # i += 1 if __name__ == '__main__': diff --git a/examples/exchange_type_topic/app.py b/examples/exchange_type_topic/app.py index 0a942da..bc59a4e 100644 --- a/examples/exchange_type_topic/app.py +++ b/examples/exchange_type_topic/app.py @@ -1,12 +1,10 @@ -from aio_pika import IncomingMessage from mela import Mela app = Mela(__name__) -app.read_config_yaml('application.yml') @app.service("printer") -def printer(body, message: IncomingMessage): +def printer(body, message): print(body) print(message.routing_key) return body diff --git a/examples/extra_publishing_service/app.py b/examples/extra_publishing_service/app.py index 3d2a8c9..fe806d8 100644 --- a/examples/extra_publishing_service/app.py +++ b/examples/extra_publishing_service/app.py @@ -1,27 +1,23 @@ +from pydantic import BaseModel, Field from mela import Mela +from mela.components import Publisher +from mela.settings import Settings -app = Mela(__name__) -app.read_config_yaml('application.yml') -logging_publisher = app.publisher() +class Document(BaseModel): + id_: int = Field(alias='id') + +app = Mela(__name__) +app.settings = Settings() -SPLITTER_SERVICE_NAME = "splitter" +log_publisher: Publisher = app.publisher_instance('log') -@app.service(SPLITTER_SERVICE_NAME) -async def logger(body, message): - default_routing_key = app.services['splitter'].config['publisher']['routing_key'] - i = 0 - for obj in body: - if i % 2 == 0: - routing_key = default_routing_key - else: - routing_key = "test_queue2" - yield obj, {'routing_key': routing_key} - # Anyway, we should publish message into logging exchange - await logging_publisher.publish(obj, routing_key='.'.join([SPLITTER_SERVICE_NAME, routing_key])) - i += 1 +@app.service('extra_publishing') +async def logger(body: Document): + await log_publisher.publish(body) + return body if __name__ == '__main__': diff --git a/examples/extra_publishing_service/application.yml b/examples/extra_publishing_service/application.yml index 9e1ae84..4339fd9 100644 --- a/examples/extra_publishing_service/application.yml +++ b/examples/extra_publishing_service/application.yml @@ -6,7 +6,7 @@ connections: password: admin services: - splitter: + extra_publishing: consumer: exchange: general-sentiment-x routing_key: general-sentiment-q @@ -16,7 +16,7 @@ services: routing_key: test_queue publishers: - default: + log: exchange: log-x exchange_type: topic routing_key: splitter.test_queue diff --git a/examples/rpc/client.py b/examples/rpc/client.py index 7f6bd7e..038cec3 100644 --- a/examples/rpc/client.py +++ b/examples/rpc/client.py @@ -3,7 +3,6 @@ from mela import Mela app = Mela(__name__) -app.read_config_yaml('application.yml') fetcher = app.rpc_client("fetcher") @@ -14,26 +13,35 @@ async def main(): # RPC calls over RabbitMQ never were simpler! res = await fetcher.call({'asdf': 5, 'lol': [3, 8, ["haha", "wow"]]}) - print(res) + # res # we can even gather call results! g = await asyncio.gather(fetcher.call(url1), fetcher.call(url2)) - print(g) - - create_bot_result = await bot_manager.call( - {'bot_id': 1, 'bot_username': "LalkaPalka", 'bot_password': "supersecret"}, - headers={'method': 'create_bot'} + # g + + create_bot_result = await bot_manager.call({ + 'bot_id': 1, + 'bot_username': "LalkaPalka", + 'bot_password': "supersecret", + }, + headers={'method': 'create_bot'}, ) - print(f"create_bot result: {create_bot_result}") + # create_bot result {create_bot_result} get_bot_result = await bot_manager.call({'bot_id': 1}, headers={'method': 'get_bot'}) - print(f"get_bot_result: {get_bot_result}") + # get_bot_result {get_bot_result} unknown_method_result = await bot_manager.call({'bot_id': 4}, headers={'method': 'getBot'}) - print(f"unknown method result: {unknown_method_result}") + # unknown method result: {unknown_method_result} if __name__ == '__main__': - url1 = 'https://tengrinews.kz/kazakhstan_news/vorvalis-dom-izbili-almatinka-rasskazala-zaderjanii-supruga-459127/' - url2 = 'https://www.inform.kz/ru/skol-ko-lichnyh-podsobnyh-hozyaystv-naschityvaetsya-v-kazahstane_a3896073' + url1 = ( + 'https://tengrinews.kz/kazakhstan_news/vorvalis-dom-izbili-' + 'almatinka-rasskazala-zaderjanii-supruga-459127/' + ) + url2 = ( + 'https://www.inform.kz/ru/skol-ko-lichnyh-podsobnyh-' + 'hozyaystv-naschityvaetsya-v-kazahstane_a3896073' + ) app.run(main()) diff --git a/examples/rpc/server.py b/examples/rpc/server.py index e14068f..89b9283 100644 --- a/examples/rpc/server.py +++ b/examples/rpc/server.py @@ -4,7 +4,6 @@ from mela import Mela app = Mela(__name__) -app.configure_from_yaml('application.yml') async def fetch(url): @@ -13,7 +12,7 @@ async def fetch(url): return url -@app.rpc_server("fetcher") +@app.rpc_service("fetcher") async def fetcher(link, message): return await fetch(link) diff --git a/examples/simple_service/app.py b/examples/simple_service/app.py index dc01637..f666b7b 100644 --- a/examples/simple_service/app.py +++ b/examples/simple_service/app.py @@ -1,7 +1,6 @@ from mela import Mela app = Mela(__name__) -app.read_config_yaml('application.yml') @app.service("printer") diff --git a/examples/simple_service/application.yml b/examples/simple_service/application.yml index 96f0e26..03e2707 100644 --- a/examples/simple_service/application.yml +++ b/examples/simple_service/application.yml @@ -2,8 +2,8 @@ connections: default: host: localhost port: 5672 - username: user - password: bitnami + username: admin + password: admin services: printer: diff --git a/examples/simple_service_with_logging/app.py b/examples/simple_service_with_logging/app.py new file mode 100644 index 0000000..8277547 --- /dev/null +++ b/examples/simple_service_with_logging/app.py @@ -0,0 +1,28 @@ +from datetime import datetime +from logging import Logger +from logging import getLogger + +from pydantic import BaseModel +from pydantic import Field + +from mela import Mela + +app = Mela(__name__) + + +class Document(BaseModel): + + text: str + url: str + likes_count: int = Field(alias='likesCount') + date: datetime + + +@app.service("printer") +def printer(doc: Document, logger: Logger): + logger.info(doc) + return doc + + +if __name__ == '__main__': + app.run() diff --git a/examples/simple_service_with_logging/application.yml b/examples/simple_service_with_logging/application.yml new file mode 100644 index 0000000..bb7308e --- /dev/null +++ b/examples/simple_service_with_logging/application.yml @@ -0,0 +1,17 @@ +connections: + default: + host: localhost + port: 5672 + username: admin + password: admin + +services: + printer: + log_level: info + consumer: + exchange: general-sentiment-x + routing_key: general-sentiment-q + queue: general-sentiment-q + publisher: + exchange: general-sentiment-x + routing_key: general-sentiment-q \ No newline at end of file diff --git a/examples/splitter_service/app.py b/examples/splitter_service/app.py deleted file mode 100644 index cb3ecec..0000000 --- a/examples/splitter_service/app.py +++ /dev/null @@ -1,19 +0,0 @@ -from mela import Mela - -app = Mela(__name__) -app.read_config_yaml('application.yml') - - -@app.service("splitter") -def printer(body, message): - i = 0 - for obj in body: - if i % 2 == 0: - yield obj - else: - yield obj, {'routing_key': "test_queue2"} - i += 1 - - -if __name__ == '__main__': - app.run() diff --git a/examples/typed_service/app.py b/examples/typed_service/app.py new file mode 100644 index 0000000..38b28c2 --- /dev/null +++ b/examples/typed_service/app.py @@ -0,0 +1,26 @@ +from pydantic import BaseModel +from pydantic import Field +from datetime import datetime +from aio_pika import IncomingMessage + +from mela import Mela + +app = Mela(__name__) + + +class Document(BaseModel): + + text: str + url: str + likes_count: int = Field(alias='likesCount') + date: datetime + + +@app.service('printer') +def printer(body: Document, text: str, date: str, message: IncomingMessage, likes_count: str = 0, *, url: str): + print(body, text, date, message, likes_count, url) + return body + + +if __name__ == '__main__': + app.run() diff --git a/examples/splitter_service/application.yml b/examples/typed_service/application.yml similarity index 74% rename from examples/splitter_service/application.yml rename to examples/typed_service/application.yml index a9a2295..03e2707 100644 --- a/examples/splitter_service/application.yml +++ b/examples/typed_service/application.yml @@ -6,11 +6,11 @@ connections: password: admin services: - splitter: + printer: consumer: exchange: general-sentiment-x routing_key: general-sentiment-q queue: general-sentiment-q publisher: - exchange: test-x - routing_key: test_queue \ No newline at end of file + exchange: general-sentiment-x + routing_key: general-sentiment-q \ No newline at end of file diff --git a/mela/__init__.py b/mela/__init__.py index 3c53a32..6ffb1a7 100644 --- a/mela/__init__.py +++ b/mela/__init__.py @@ -1,708 +1,78 @@ -import uuid -from typing import Optional, Callable, Union - -import logging - import asyncio -import aio_pika -import aiormq -import envyaml -import inspect - -try: - import ujson as json -except ImportError: - import json - -logging.basicConfig(format='%(name)s\t%(asctime)s\t%(levelname)s\t%(message)s', level=logging.INFO) - - -class Configurable: - - def __init__(self, app: 'Mela', name=None): - self.config: Optional[dict] = None - self.name = name - self.app = app - - def configure(self, config: dict): - self.config = config - self.on_config_update() - self.ensure_configured() - - def configure_from_yaml(self, filename): - env = envyaml.EnvYAML(filename) - self.configure(dict(env)) - - def read_config_yaml(self, filename): - """ - DEPRECATED - Will be removed in v.1.1 - - Only for legacy usage - """ - logging.warning("DEPRECATION: `Configurable.read_config_yaml` will be " - "replaced with `Configurable.configure_from_yaml` in v1.1") - self.configure_from_yaml(filename) - - def on_config_update(self): - pass - - def is_configured(self): - return self.config is not None - - def ensure_configured(self): - if not self.is_configured(): - raise Exception(f"No config provided for {self}") - - -class Loggable(Configurable): - - def __init__(self, app: 'Mela', name=None): - super(Loggable, self).__init__(app, name=name) - self.log = None - self.set_logger(logging.getLogger(self.name)) - - def set_logger(self, logger): - self.log = logger - self.log.setLevel(logging.INFO) - - def on_config_update(self): - super().on_config_update() - if 'log' in self.config: - if 'name' in self.config['log']: - self.log = logging.getLogger(self.config['log']['name']) - - -class Mela(Loggable): - - def __init__(self, name): - super(Mela, self).__init__(self, name) - self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() - self._runnables = [] - self._connection_registry = ConnectionRegistry(self, 'connections') - self._results = None - self.services = {} - self.publishers = {} - self.consumers = {} - - def __default_exception_handler(self, loop, context): - loop.stop() - raise context['exception'] - - def on_config_update(self): - self.config.setdefault('publishers', {}) - self.config.setdefault('producers', {}) # TODO DEPRECATION remove in v1.1 - self.config.setdefault('consumers', {}) - self.config.setdefault('services', {}) - self.config.setdefault('connections', {}) - if 'connection' in self.config and not self.config['connections']: - self.config['connections'] = {'default': self.config['connection']} - - def publisher(self, name="default", options=None) -> 'MelaPublisher': - if options is None: - options = {} - if options: - self.log.warning("DEPRECATION: `Mela.publisher` will not " - "take options since v1.1. Use config file.") - self.ensure_configured() - if name not in self.config['publishers'] and name not in self.config['producers']: - # TODO DEPRECATION v1.1 remove `producers` - if 'publisher' in self.config: - name = 'default' - self.config['publishers'][name] = self.config['publisher'] - else: - raise KeyError(f"No config found for publisher {name}") - publisher = MelaPublisher(self, name) - publisher.configure({**(self.config['publishers'].get(name) or {}), **(self.config['producers'].get(name) or {}), **options}) - self.register(publisher) - return publisher - - def producer(self, name, options=None): - """ - DEPRECATED - Will be removed in v.1.1 - - Only for legacy usage - """ - self.log.warning("DEPRECATION: `Mela.producer` is replaced with `Mela.publisher`") - - def decorator(*args, **kwargs): - return self.publisher(name, options) - return decorator - - def consumer(self, name): - def decorator(func): - consumer = MelaConsumer(self, name) - consumer.configure(self.config['consumers'][name]) - consumer.set_processor(func) - self.register(consumer) - return consumer +from typing import Optional - return decorator +from .components.base import Component +from .components.base import ConsumingComponent +from .factories.core.connection import close_all_connections +from .factories.publisher import publisher +from .scheme import MelaScheme +from .settings import Settings - def service(self, name, producer_name=None, options_consumer=None, options_publisher=None): - if options_publisher is None: - options_publisher = {} - if options_consumer is None: - options_consumer = {} - if producer_name: - self.log.warning("DEPRECATION: `Mela.service` will use only one argument, service name. " - "Update your config format.") - if options_consumer or options_publisher: - self.log.warning("DEPRECATION: `Mela.service` will not " - "take options since v1.1. Use config file.") - def decorator(func): - service = MelaService(self, name) - if producer_name: - self.config['services'][name] = {} - self.config['services'][name]['consumer'] = {**self.config['consumers'][name], **options_consumer} - self.config['services'][name]['publisher'] = {**self.config['producers'][producer_name], - **options_publisher} - service.configure(self.config['services'][name]) - service.set_processor(func) - self.register(service) - return service +class Mela(MelaScheme): - return decorator - - def rpc_server(self, name): - - def decorator(func): - rpc_server_instance = MelaRPCServer(self, name) - rpc_server_instance.configure(self.config['rpc-services'][name]) - rpc_server_instance.set_processor(func) - self.register(rpc_server_instance) - return rpc_server_instance - - return decorator + def __init__( + self, + name: str, + settings_: Optional[Settings] = None, + loop: Optional[asyncio.AbstractEventLoop] = None, + ): + super().__init__(name) + self._settings: Optional[Settings] = None + if settings_: + self.settings = settings_ + if loop is None: + loop = asyncio.get_event_loop() + self._loop: Optional[asyncio.AbstractEventLoop] = loop - def rpc_client(self, name): - rpc_client_instance = MelaRPCClient(self, name) - rpc_client_instance.configure(self.config['rpc-services'][name]) - self.register(rpc_client_instance) - return rpc_client_instance + def publisher_sync(self, name): + return self._loop.run_until_complete(self.publisher_instance(name)) - def register(self, other): - if isinstance(other, MelaService): - self.services[other.name] = other - elif isinstance(other, MelaConsumer): - self.consumers[other.name] = other - elif isinstance(other, MelaPublisher): - self.publishers[other.name] = other - self._runnables.append(other) + async def publisher_instance(self, name): + return await publisher(self.settings.publishers[name]) @property - def connections(self): - return self._connection_registry - - def setup(self): - self.loop.set_exception_handler(self.__default_exception_handler) - for runnable in self._runnables: - self.loop.create_task(runnable.run()) - - def run(self, coro=None): - self.setup() - - self.log.info("Running app...") - if coro: - self.loop.run_until_complete(coro) - else: - self.loop.run_forever() - - -class Connectable(Loggable): - CONNECTION_MODE = 'read' - - def __init__(self, app, name): - super().__init__(app, name) - self.loop = app.loop - self.connection: Optional[aio_pika.RobustConnection] = None - self.channel: Optional[aio_pika.RobustChannel] = None - - async def ensure_connection(self): - if self.connection is None: - try: - self.connection = await self.app.connections.get(self.config.get('connection', "default"), - mode=self.CONNECTION_MODE) - except Exception as e: - self.log.exception("Unhandled exception while connecting: ") - - async def ensure_channel(self): - await self.ensure_connection() - if self.channel is None: - try: - self.channel = await self.connection.channel() - except Exception as e: - self.log.warning("Error while creating channel") - self.log.warning(e.__class__.__name__, e.args) - elif self.channel.is_closed: - await self.channel.reopen() - - -class ConnectionRegistry(Loggable): - connection_lock = asyncio.Lock() + def settings(self): + assert self._settings, "Mela is not configured" + return self._settings - def __init__(self, app, name): - super().__init__(app, name) - self._registry = {} - self.loop = app.loop - - async def get(self, name, mode='read'): - await self.connection_lock.acquire() - connection_name = f"{name}_{mode}" - if connection_name not in self._registry: - self.log.debug(f"Making new connection {name} with mode {mode}") - connection = await self.make_connection(name, mode) - self._registry[connection_name] = connection - self.connection_lock.release() - return self._registry[connection_name] - - async def make_connection(self, name, mode): - config = self.app.config['connections'].get(name, {}) - if 'url' not in config and 'host' not in config: - raise KeyError(f"Connection {name} is not configured") - if 'username' in config and 'login' not in config: - config.setdefault('login', config['username']) - connection: Optional[aio_pika.RobustConnection] = None - while connection is None: - try: - connection = await aio_pika.connect_robust( - **config, - loop=self.loop, - client_properties={ - 'connection_name': f"{self.app.name}_{name}_{mode}" - } - ) - # await connection.connected.wait() - except ConnectionError: - self.log.warning("Connection refused, trying again") - except aiormq.exceptions.IncompatibleProtocolError: - self.log.warning("aiormq.exceptions.IncompatibleProtocolError") - except Exception as e: - self.log.warning("Unhandled exception while connecting: %s" % e.__class__.__name__) - finally: - await asyncio.sleep(1) - return connection - - -class MelaPublisher(Connectable): - CONNECTION_MODE = "write" - - def __init__(self, app: Mela, name): - self.name = name - super(MelaPublisher, self).__init__(app, self.name) - self.exchange: Optional[aio_pika.RobustExchange] = None - self.default_routing_key = None - self.is_prepared = asyncio.Event() - self.decode = self.decode - self.publishing_params = {} - self.permanent_publishing_options = {} - self.permanent_publishing_options.setdefault('content_type', "application/json") - self.permanent_publishing_options.setdefault('content_encoding', "UTF-8") - - def __call__(self, *args, **kwargs): + @settings.setter + def settings(self, value: Settings): """ - THIS METHOD WILL BE DEPRECATED IN v1.1. DO NOT USE IT. + Set config of entire Mela app. It's possible only if app is not + running yet. In other case it will raise `RuntimeError` """ - self.loop.create_task(self.publish_direct(*args, **kwargs)) + self._settings = value - def on_config_update(self): - super().on_config_update() - self.config.setdefault('connection', "default") - self.config.setdefault('skip_unroutables', False) - self.config.setdefault('exchange_type', "direct") - if 'routing_key' not in self.config: - self.log.warning(f"Default routing key is not set for publisher {self.name}") - self.default_routing_key = self.config.get('routing_key', "") - if 'exchange' not in self.config: - raise KeyError(f"Exchange is not set for publisher {self.name}") + def run(self, coro=None, loop: Optional[asyncio.AbstractEventLoop] = None): + if self._settings is None: + self.settings = Settings() + if loop is None: + loop = self._loop + assert loop + self._run_in_loop(coro, loop) @staticmethod - def decode(message): - return bytes(json.dumps(message), encoding='utf-8') - - def set_decoder(self, func): - self.decode = func - - async def ensure_exchange(self): - await self.ensure_channel() + async def waiter(): try: - self.exchange = await self.channel.declare_exchange(self.config['exchange'], - type=self.config['exchange_type'], durable=True) - except Exception as e: - self.log.warning("Error while declaring exchange") - self.log.warning(e.__class__.__name__, e.args) - - async def publish( - self, - message: Union[dict, list, int, str, float], - routing_key: Optional[str] = None, - **options - ): - """ - THIS METHOD WILL BE REPLACED WITH `publish_direct` in 1.1 - """ - return await self.publish_direct(message, routing_key, **options) - - async def prepare(self): - self.log.debug("preparing publisher") - await self.ensure_connection() - await self.ensure_exchange() - self.is_prepared.set() - - async def publish_direct( - self, - message: Union[dict, list, int, str, float], - routing_key: Optional[str] = None, - **options - ): - """ - THIS METHOD WILL BE DEPRECATED IN 1.1 - """ - await self.is_prepared.wait() - self.log.debug(f"Going to directly publish message {message}") - routing_key = routing_key or self.default_routing_key or '' - options = {**self.permanent_publishing_options, **options} - return await self.exchange.publish( - aio_pika.Message( - self.decode(message), - **options - ), - routing_key=routing_key, - **self.publishing_params - ) - - async def run(self): - self.log.debug(f"Running publisher {self.name}") - await self.prepare() - self.log.info(f"Publisher `{self.name}` is ready") - - def update_permanent_publishing_options(self, **new): - self.permanent_publishing_options.update(new) - - -class MelaConsumer(Connectable): - - def __init__(self, app, name): - super().__init__(app, name) - self.connection = None - self.connection_established = asyncio.Event() - self.channel: Optional[aio_pika.RobustChannel] = None - self.exchange: Optional[aio_pika.RobustExchange] = None - self.queue: Optional[aio_pika.RobustQueue] = None - self.encode: Optional[Callable] = self.encode - self.process: Optional[Callable] = self.process - self.on_message_processed: Optional[Callable] = self.on_message_processed - self.queue_iterator: Optional[aio_pika.queue.QueueIterator] = None - - def is_dead_letter_exchange_configured(self): - return 'dead_letter_exchange' in self.config - - def on_config_update(self): - super().on_config_update() - if 'queue' not in self.config: - raise KeyError(f"No queue found in config for {self.name}") - if 'exchange' not in self.config: - raise KeyError(f"No exchange found in config for {self.name}") - if 'routing_key' not in self.config: - raise KeyError(f"No routing key found in config for {self.name}") - self.config.setdefault('prefetch_count', 1) - self.config.setdefault('exchange_type', "direct") - - async def ensure_channel(self): - await super().ensure_channel() - await self.channel.set_qos(prefetch_count=self.config['prefetch_count']) - - async def ensure_exchange(self): - if self.exchange is None: - await self.ensure_channel() - try: - self.exchange = await self.channel.declare_exchange(self.config['exchange'], - type=self.config['exchange_type'], durable=True) - except Exception as e: - self.log.warning("Error while declaring exchange") - self.log.warning(e.__class__.__name__, e.args) - - async def ensure_queue(self): - if self.queue is None: - await self.ensure_channel() - try: - args = {} - if self.is_dead_letter_exchange_configured(): - self.config.setdefault("dead_letter_routing_key", "") - await self.channel.declare_exchange(self.config['dead_letter_exchange'], durable=True) - args['x-dead-letter-exchange'] = self.config['dead_letter_exchange'] - args['x-dead-letter-routing-key'] = self.config['dead_letter_routing_key'] - self.queue = await self.channel.declare_queue(self.config['queue'], durable=True, arguments=args) - except Exception as e: - self.log.warning("Error while declaring queue") - self.log.warning(e.__class__.__name__, e.args) - - async def ensure_binding(self): - await self.ensure_exchange() - await self.ensure_queue() - try: - await self.queue.bind(self.config['exchange'], routing_key=self.config['routing_key']) - except Exception as e: - self.log.warning("Error while declaring queue") - self.log.warning(e.__class__.__name__, e.args) - - def get_broken_messages_requeue_strategy(self): - if 'requeue_broken_messages' in self.config: - return self.config['requeue_broken_messages'] - if self.is_dead_letter_exchange_configured(): - return False - return True - - async def run(self): - await self.ensure_connection() - - should_requeue_broken_messages = self.get_broken_messages_requeue_strategy() - - async with self.connection: - self.log.debug("Connected successfully") - await self.ensure_binding() - self.log.debug("Bindings are ready") - self.log.info("Ready to process messages!") - async with self.queue.iterator() as queue_iter: - self.queue_iterator = queue_iter - async for message in queue_iter: # type: aio_pika.IncomingMessage - async with message.process(ignore_processed=True): - body = self.encode(message.body) - try: - if inspect.iscoroutinefunction(self.process): - resp = await self.process(body, message) - else: - resp = self.process(body, message) - except Exception as e: - self.log.exception("Message processing error") - await message.nack(requeue=should_requeue_broken_messages) - raise e - try: - await self.on_message_processed(resp, message) - except Exception as e: - self.log.exception("Message processing error in generator. Be careful! Possibly published " - "messages duplication") - await message.nack(requeue=should_requeue_broken_messages) - raise e - - @staticmethod - async def on_message_processed(response, message): - pass - - @staticmethod - def encode(message_body: bytes): - return json.loads(message_body) - - @staticmethod - async def process(body, message): - raise NotImplementedError - - def set_processor(self, func): - self.process = func - - def set_on_message_processed(self, func): - self.on_message_processed = func - - def set_encoder(self, func): - self.encode = func - - async def cancel(self): - await self.queue_iterator.close() - - -class MelaService(Loggable): - - async def __response_processor_for_generator(self, response): - for obj in response: - await self.publish(obj) - - async def __response_processor_for_async_generator(self, response): - async for obj in response: - await self.publish(obj) - - async def __response_processor_for_function(self, response): - await self.publish(response) - - def __init__(self, app, name): - consumer_name = name + '_consumer' - publisher_name = name + '_publisher' - super().__init__(app, name) - self.publisher = MelaPublisher(app, publisher_name) - self.consumer = MelaConsumer(app, consumer_name) - self.response_processor = None - - async def publish(self, message, **options): - if isinstance(message, tuple): - await self.publisher.publish_direct(message[0], **message[1], **options) - else: - await self.publisher.publish_direct(message, **options) - - async def on_message_processed(self, response, message: aio_pika.IncomingMessage): - if response is not None: - await self.response_processor(response) - # await self.publisher.wait() - # if not message.processed: - # self.log.warning("Message is not processed, we're going to automatically `ack` it. We recommend to " - # "explicitly process message: `ack`, `nack` or `reject` it.") - # message.ack() - - def on_config_update(self): - connection_name = self.config.get("connection", "default") - if isinstance(self.config['publisher'], str): - self.config['publisher'] = self.app.config['publishers'][self.config['publisher']] - if isinstance(self.config['consumer'], str): - self.config['consumer'] = self.app.config['consumers'][self.config['consumer']] - self.config['publisher'].setdefault('connection', connection_name) - self.config['consumer'].setdefault('connection', connection_name) - self.publisher.configure(self.config['publisher']) - self.consumer.configure(self.config['consumer']) - self.consumer.set_on_message_processed(self.on_message_processed) - - def set_processor(self, func): - self.consumer.set_processor(func) - if inspect.isgeneratorfunction(self.consumer.process): - self.response_processor = self.__response_processor_for_generator - elif inspect.isasyncgenfunction(self.consumer.process): - self.response_processor = self.__response_processor_for_async_generator - else: - self.response_processor = self.__response_processor_for_function - - async def run(self): - self.log.info(f"Running service `{self.name}`...") - publisher_task = self.app.loop.create_task(self.publisher.run()) - consumer_task = self.app.loop.create_task(self.consumer.run()) - - async def cancel(self): - await self.consumer.cancel() - - def __call__(self, *args, **kwds): - """Method that allows calling a function that is used for - service creation. - """ - return self.consumer.process(*args, **kwds) - - -class MelaRPCServer(MelaService): - - def on_config_update(self): - connection_name = self.config.get("connection", "default") - self.config['publisher'] = { - 'connection': connection_name, - 'exchange': self.config.get('response_exchange', ""), # Use default exchange if exchange is not set - 'routing_key': "", # use empty routing key as default - 'skip_unroutables': True # We should ignore unroutable messages because they can - # occasionally block RPC server - } - self.config['consumer'] = { - 'connection': connection_name, - 'exchange': self.config['exchange'], - 'routing_key': self.config['routing_key'], - 'queue': self.config['queue'] - } - self.publisher.configure(self.config['publisher']) - self.consumer.configure(self.config['consumer']) - self.consumer.set_on_message_processed(self.on_message_processed) - - async def __response_processor_for_function(self, response, message: aio_pika.IncomingMessage = None): - if message: - await self.publish(response, correlation_id=message.correlation_id, routing_key=message.reply_to) + # Wait until terminate + await asyncio.Future() + finally: + await close_all_connections() + + def _run_in_loop(self, coro, loop: asyncio.AbstractEventLoop): + assert self._settings + for requirement_name, requirement in self.requirements.items(): + instance: Component = loop.run_until_complete( + requirement.resolve(self._settings), + ) + if isinstance(instance, ConsumingComponent): + loop.run_until_complete(instance.prepare_processor(self, self.settings)) + loop.run_until_complete(instance.consume()) + if coro: + loop.run_until_complete(coro) else: - raise AttributeError("Message is not provided") - - async def on_message_processed(self, response, message: aio_pika.IncomingMessage): - if response is not None: - await self.response_processor(response, message) - # await self.publisher.wait() - # if not message.processed: - # self.log.warning("Message is not processed, we're going to automatically `ack` it. We recommend to " - # "explicitly process message: `ack`, `nack` or `reject` it.") - # message.ack() - - def set_processor(self, func): - self.consumer.set_processor(func) - self.response_processor = self.__response_processor_for_function - - -class MelaRPCClientConsumer(MelaConsumer): - - def __init__(self, app, name): - super().__init__(app, name) - self.futures = {} - self.ensure_queue_lock = asyncio.Lock() - - def process(self, body, message): - future = self.futures.pop(message.correlation_id) - future.set_result(body) - - async def ensure_queue(self): - async with self.ensure_queue_lock: - if self.queue is None: - await self.ensure_channel() - try: - self.queue = await self.channel.declare_queue(exclusive=True) - except Exception as e: - self.log.exception("Error while declaring queue") - - async def ensure_binding(self): - await self.ensure_exchange() - await self.ensure_queue() - try: - await self.queue.bind(self.config['exchange'], routing_key=self.queue.name) - except Exception as e: - self.log.warning("Error while declaring queue") - self.log.warning(e.__class__.__name__, e.args) - - def on_config_update(self): - Loggable.on_config_update(self) - if 'exchange' not in self.config: - raise KeyError(f"No exchange found in config for {self.name}") - self.config.setdefault('prefetch_count', 1) - self.config.setdefault('exchange_type', "direct") - - -class MelaRPCClient(MelaService): - - def __init__(self, app, name): - consumer_name = name + '_rpc_consumer' - publisher_name = name + '_rpc_publisher' - super().__init__(app, name) - self.publisher = MelaPublisher(app, publisher_name) - self.consumer = MelaRPCClientConsumer(app, consumer_name) - self.response_processor = None - - def on_config_update(self): - connection_name = self.config.get("connection", "default") - self.config['publisher'] = { - 'connection': connection_name, - 'exchange': self.config['exchange'], - 'routing_key': self.config['routing_key'], - } - self.config['consumer'] = { - 'connection': connection_name, - 'exchange': self.config.get('response_exchange', ""), - } - self.publisher.configure(self.config['publisher']) - self.consumer.configure(self.config['consumer']) - self.consumer.set_on_message_processed(self.on_message_processed) - - async def call(self, body, headers=None, **options): - correlation_id = str(uuid.uuid4()) - future = self.app.loop.create_future() - self.consumer.futures[correlation_id] = future - await self.consumer.ensure_binding() - await self.publisher.ensure_exchange() - await self.publisher.publish_direct( - body, - correlation_id=correlation_id, - reply_to=self.consumer.queue.name, - headers=headers, - **options - ) - return await future + loop.run_until_complete(self.waiter()) - async def run(self): - self.log.info(f"Connecting {self.name} service") - self.app.loop.create_task(self.consumer.run()) - await self.publisher.run() + def register_scheme(self, scheme_: MelaScheme): + self.merge(scheme_) diff --git a/mela/abc.py b/mela/abc.py new file mode 100644 index 0000000..fac162b --- /dev/null +++ b/mela/abc.py @@ -0,0 +1,50 @@ +import abc +from typing import Callable +from typing import Dict +from typing import Optional +from typing import Union + +from aio_pika import Message +from aio_pika.abc import AbstractMessage +from aiormq.abc import ConfirmationFrameType +from pydantic import BaseModel + +from .components.base import Component +from .settings import Settings + + +class AbstractPublisher(abc.ABC): + + @abc.abstractmethod + async def publish_message( + self, + message: AbstractMessage, + routing_key: str = None, + timeout: int = None, + ) -> Optional[ConfirmationFrameType]: + raise NotImplementedError + + async def publish( + self, + message: Union[Dict, BaseModel, Message], + routing_key: Optional[str] = None, + ) -> Optional[ConfirmationFrameType]: + raise NotImplementedError + + +class AbstractRPCClient(abc.ABC): + + @abc.abstractmethod + async def call(self, body: Union[AbstractMessage, BaseModel, dict]) -> Union[BaseModel, dict]: + raise NotImplementedError + + +class AbstractSchemeRequirement(abc.ABC): + + @abc.abstractmethod + async def resolve(self, settings: Settings) -> Component: + raise NotImplementedError + + @abc.abstractmethod + def set_processor(self, processor: Optional[Callable]): + raise NotImplementedError diff --git a/mela/components/__init__.py b/mela/components/__init__.py new file mode 100644 index 0000000..9694d11 --- /dev/null +++ b/mela/components/__init__.py @@ -0,0 +1,7 @@ +from .consumer import Consumer +from .exceptions import NackMessageError +from .publisher import Publisher +from .service import Service + + +__all__ = ['Consumer', 'Publisher', 'Service', 'NackMessageError'] diff --git a/mela/components/base.py b/mela/components/base.py new file mode 100644 index 0000000..e928d0a --- /dev/null +++ b/mela/components/base.py @@ -0,0 +1,45 @@ +import abc +import asyncio +import logging +from typing import Optional + +from ..log import handler + + +class Component(abc.ABC): + + def __init__(self, name, log_level=None, loop=None): + self.name = name + self.loop = loop + self.log = None + if log_level is not None: + self.config_logger(log_level) + + def config_logger(self, level: str): + self.log = logging.getLogger(self.name) + self.log.addHandler(handler) + self.log.setLevel(level.upper()) + + +class ConsumingComponent(Component, abc.ABC): + + def __init__(self, name, log_level: str = None, loop: asyncio.AbstractEventLoop = None): + super().__init__(name, log_level, loop) + self._processor = None + + @abc.abstractmethod + def set_processor(self, processor): + raise NotImplementedError() + + @abc.abstractmethod + async def consume(self, **kwargs) -> str: + raise NotImplementedError() + + @abc.abstractmethod + async def cancel(self, timeout: Optional[int] = None, nowait: bool = False): + raise NotImplementedError() + + async def prepare_processor(self, scheme, settings): + if self._processor: + self._processor.cache_static_params(self, scheme) + await self._processor.solve_requirements(settings) diff --git a/mela/components/consumer.py b/mela/components/consumer.py new file mode 100644 index 0000000..73edcb0 --- /dev/null +++ b/mela/components/consumer.py @@ -0,0 +1,102 @@ +from json import JSONDecodeError +from typing import Any +from typing import Callable +from typing import Coroutine +from typing import Optional + +from aio_pika.abc import AbstractIncomingMessage +from aio_pika.abc import AbstractQueue + +from mela.components.base import ConsumingComponent +from mela.components.exceptions import NackMessageError +from mela.processor import Processor + + +class Consumer(ConsumingComponent): + + def __init__( + self, + name: str, + prefetch_count: int = 1, + timeout: Optional[int] = None, + no_ack: bool = False, + exclusive: bool = False, + consumer_tag: Optional[str] = None, + requeue_broken_messages: bool = True, + log_level: str = 'info', + *, + queue: Optional[AbstractQueue] = None, + ): + super().__init__(name, log_level) + self._prefetch_count: int = prefetch_count + self._timeout: Optional[int] = timeout + self._no_ack: bool = no_ack + self._exclusive: bool = exclusive + self._consumer_tag: Optional[str] = consumer_tag + self._queue: Optional[AbstractQueue] = None + self.requeue_broken_messages = requeue_broken_messages + if queue: + self.set_queue(queue) + + self._callback: Optional[ + Callable[ + [ + AbstractIncomingMessage, + ], + Coroutine[ + Any, + Any, + None, + ], + ], + ] = None + + def set_queue(self, queue: AbstractQueue): + self._queue = queue + + def get_queue_name(self) -> str: + assert self._queue + return self._queue.name + + def set_processor(self, processor: Processor): + self._processor = processor + + async def wrapper(message: AbstractIncomingMessage): + try: + await processor.process(message) + except NackMessageError as e: + await message.nack(requeue=e.requeue) + self.log.exception("Message is Nacked:") + except JSONDecodeError: + self.log.exception("Message cannot be serialized, so we " + "Nack it with requeue=False") + await message.nack(requeue=False) + except Exception: + await message.nack(requeue=self.requeue_broken_messages) + self.log.exception("Message is broken:") + else: + await message.ack() + + self.set_callback(wrapper) + + def set_callback(self, func: Callable[[AbstractIncomingMessage], Coroutine[Any, Any, None]]): + self._callback = func + + async def consume(self, **kwargs) -> str: + assert self._callback is not None, "We can't start without a processor, dude" + assert self._queue is not None, "Queue is not set" + consumer_tag = await self._queue.consume( + callback=self._callback, + no_ack=self._no_ack, + exclusive=self._exclusive, + arguments=kwargs, + consumer_tag=self._consumer_tag, + timeout=self._timeout, + ) + self._consumer_tag = consumer_tag + return consumer_tag + + async def cancel(self, timeout: Optional[int] = None, nowait: bool = False): + assert self._consumer_tag + assert self._queue + return await self._queue.cancel(self._consumer_tag, timeout, nowait) diff --git a/mela/components/exceptions.py b/mela/components/exceptions.py new file mode 100644 index 0000000..28e9fdd --- /dev/null +++ b/mela/components/exceptions.py @@ -0,0 +1,5 @@ +class NackMessageError(Exception): + + def __init__(self, message: str, requeue: bool = True): + super().__init__(message, f"requeue: {requeue}") + self.requeue: bool = requeue diff --git a/mela/components/publisher.py b/mela/components/publisher.py new file mode 100644 index 0000000..b239f4f --- /dev/null +++ b/mela/components/publisher.py @@ -0,0 +1,57 @@ +from typing import Dict +from typing import Optional +from typing import Union + +from aio_pika import Message +from aio_pika.abc import AbstractExchange +from aio_pika.abc import AbstractMessage +from aiormq.abc import ConfirmationFrameType +from pydantic import BaseModel + +from ..abc import AbstractPublisher +from ..components.base import Component +from ..processor import Processor + + +class Publisher(Component, AbstractPublisher): + + def __init__( + self, + name: str, + default_routing_key: str = '', + default_timeout: int = None, + log_level: str = 'info', + *, + exchange: Optional[AbstractExchange] = None, + ): + super().__init__(name, log_level) + self._default_routing_key = default_routing_key + self._default_timeout = default_timeout + self._exchange: Optional[AbstractExchange] = None + if exchange: + self.set_exchange(exchange) + + def set_exchange(self, exchange: AbstractExchange): + assert self._exchange is None, "Exchange already is set" + self._exchange = exchange + + async def publish_message( + self, + message: AbstractMessage, + routing_key: str = None, + timeout: int = None, + ) -> Optional[ConfirmationFrameType]: + assert self._exchange + if routing_key is None: + routing_key = self._default_routing_key + if timeout is None: + timeout = self._default_timeout + return await self._exchange.publish(message, routing_key, timeout=timeout) + + async def publish( + self, + message: Union[Dict, BaseModel, Message], + routing_key: Optional[str] = None, + ): + message, routing_key = Processor.wrap_response(message, routing_key) + return await self.publish_message(message, routing_key) diff --git a/mela/components/rpc.py b/mela/components/rpc.py new file mode 100644 index 0000000..ca7ac6d --- /dev/null +++ b/mela/components/rpc.py @@ -0,0 +1,155 @@ +from asyncio import AbstractEventLoop +from asyncio import Future +from asyncio import Lock +from json import JSONDecodeError +from json import loads +from typing import Optional +from typing import Type +from typing import Union +from uuid import uuid4 + +from aio_pika.abc import AbstractIncomingMessage +from aio_pika.abc import AbstractMessage +from pydantic import BaseModel + +from ..abc import AbstractRPCClient +from ..processor import Processor +from . import Consumer +from . import Publisher +from .base import ConsumingComponent +from .exceptions import NackMessageError + + +class RPC(ConsumingComponent): + + def __init__( + self, + name: str, + log_level: str = 'info', + *, + worker: Optional[Consumer] = None, + response_publisher: Optional[Publisher] = None, + request_publisher: Optional[Publisher] = None, + ): + super().__init__(name, log_level) + self._worker: Optional[Consumer] = None + self._response_publisher: Optional[Publisher] = None + self._request_publisher: Optional[Publisher] = None + if worker: + self._worker = worker + if response_publisher: + self._response_publisher = response_publisher + if request_publisher: + self._request_publisher = request_publisher + self._client: Optional[RPCClient] = None + + def set_processor(self, processor: Processor): + + async def on_message(message: AbstractIncomingMessage) -> None: + try: + outgoing_message, _ = await processor.process(message) + outgoing_message.correlation_id = message.correlation_id + await self._response_publisher.publish_message( + outgoing_message, + routing_key=message.reply_to, + ) + except NackMessageError as e: + await message.nack(requeue=e.requeue) + self.log.exception("Message is Nacked:") + except JSONDecodeError: + self.log.exception("Message cannot be serialized, so we " + "Nack it with requeue=False") + await message.nack(requeue=False) + except Exception: + await message.nack(requeue=self._worker.requeue_broken_messages) + self.log.exception("Message is broken:") + else: + await message.ack() + self._worker.set_callback(on_message) + + @property + def client(self): + assert self._client is not None + return self._client + + @client.setter + def client(self, value): + self._client = value + + async def consume(self, **kwargs) -> str: + return await self._worker.consume(**kwargs) + + async def cancel(self, timeout: Optional[int] = None, nowait: bool = False): + return await self._worker.cancel(timeout, nowait) + + +class RPCClient(ConsumingComponent, AbstractRPCClient): + + def __init__( + self, + name: str, + log_level: str = 'info', + loop: AbstractEventLoop = None, + *, + request_publisher: Optional[Publisher] = None, + response_consumer: Optional[Consumer] = None, + response_model: Optional[Type[BaseModel]] = None, + ): + super().__init__( + name=name, + loop=loop, + log_level=log_level, + ) + self._request_publisher: Optional[Publisher] = None + if request_publisher: + self._request_publisher = request_publisher + if response_consumer: + self._response_consumer = response_consumer + self._response_model = response_model + self._futures = {} + self._consuming = Lock() + + @staticmethod + def _generate_correlation_id(): + return str(uuid4()) + + async def call(self, body: Union[AbstractMessage, BaseModel, dict]): + assert self._consuming.locked(), "Consumer is not active" + message, _ = Processor.wrap_response(body) + message.correlation_id = self._generate_correlation_id() + message.reply_to = self._response_consumer.get_queue_name() + + future = self.loop.create_future() + self._futures[message.correlation_id] = future + + await self._request_publisher.publish_message(body) + + return await future + + def _prepare_callback(self): + + async def on_message(message: AbstractIncomingMessage) -> None: + if message.correlation_id is None: + raise KeyError("Message without correlation id") + + if self._response_model: + parsed_response = self._response_model.parse_raw(message.body) + else: + parsed_response = loads(message.body) + future: Future = self._futures.pop(message.correlation_id, None) + if future is not None: + future.set_result((parsed_response, message)) + + self._response_consumer.set_callback(on_message) + + async def consume(self, **kwargs) -> str: + await self._prepare_callback() + await self._consuming.acquire() + return await self._response_consumer.consume(**kwargs) + + async def cancel(self, timeout: Optional[int] = None, nowait: bool = False): + self._consuming.release() + return await self._response_consumer.cancel(timeout, nowait) + + def set_processor(self, processor): + pass diff --git a/mela/components/service.py b/mela/components/service.py new file mode 100644 index 0000000..0a97f3f --- /dev/null +++ b/mela/components/service.py @@ -0,0 +1,80 @@ +from json import JSONDecodeError +from typing import Optional + +from aio_pika.abc import AbstractIncomingMessage + +from mela.components import Consumer +from mela.components import Publisher +from mela.components.base import ConsumingComponent +from mela.components.exceptions import NackMessageError +from mela.processor import Processor + + +class Service(ConsumingComponent): + + def __init__( + self, + name: str, + log_level: str = 'info', + *, + publisher: Optional[Publisher] = None, + consumer: Optional[Consumer] = None, + ): + super().__init__(name, log_level) + self._consumer: Optional[Consumer] = None + self._publisher: Optional[Publisher] = None + if consumer: + self.consumer = consumer + if publisher: + self.publisher = publisher + + def set_processor(self, processor: Processor): + self._processor = processor + + async def on_message(message: AbstractIncomingMessage) -> None: + try: + outgoing_message, routing_key = await processor.process(message) + await self.publisher.publish_message(outgoing_message, routing_key=routing_key) + except NackMessageError as e: + await message.nack(requeue=e.requeue) + self.log.exception("Message is Nacked:") + except JSONDecodeError: + self.log.exception("Message cannot be serialized, so we " + "Nack it with requeue=False") + await message.nack(requeue=False) + except Exception: + await message.nack(requeue=self.consumer.requeue_broken_messages) + self.log.exception("Message is broken:") + else: + await message.ack() + self.consumer.set_callback(on_message) + + @property + def consumer(self) -> Consumer: + if self._consumer is None: + raise RuntimeError("Consumer is not set") + return self._consumer + + @consumer.setter + def consumer(self, value: Consumer): + if self._consumer: + raise RuntimeError("Consumer already set") + self._consumer = value + + @property + def publisher(self) -> Publisher: + if self._publisher is None: + raise RuntimeError("Publisher is not set") + return self._publisher + + @publisher.setter + def publisher(self, value: Publisher): + if self._publisher: + raise RuntimeError("Publisher already set") + self._publisher = value + + async def consume(self, **kwargs) -> str: + return await self.consumer.consume(**kwargs) + + async def cancel(self, timeout: Optional[int] = None, nowait: bool = False): + return await self.consumer.cancel(timeout, nowait) diff --git a/mela/exceptions.py b/mela/exceptions.py index 167c960..2a9736a 100644 --- a/mela/exceptions.py +++ b/mela/exceptions.py @@ -2,5 +2,5 @@ class MelaException(RuntimeError): pass -class ConfigError(MelaException): +class ConfigError(KeyError): pass diff --git a/mela/factories/__init__.py b/mela/factories/__init__.py new file mode 100644 index 0000000..16c3672 --- /dev/null +++ b/mela/factories/__init__.py @@ -0,0 +1,19 @@ +from typing import Any +from typing import Dict + +from .consumer import consumer +from .publisher import publisher +from .rpc import client as rpc_client +from .rpc import service as rpc_service +from .service import service + + +factory_dict: Dict[str, Any] = { + 'consumer': consumer, + 'publisher': publisher, + 'service': service, + 'rpc_service': rpc_service, + 'rpc_client': rpc_client, +} + +__all__ = ['consumer', 'publisher', 'service', 'rpc_service', 'rpc_client', 'factory_dict'] diff --git a/mela/factories/consumer.py b/mela/factories/consumer.py new file mode 100644 index 0000000..9dfbbdc --- /dev/null +++ b/mela/factories/consumer.py @@ -0,0 +1,45 @@ +from typing import Dict + +from ..components import Consumer +from ..factories.core.connection import connect +from ..factories.core.exchange import declare_exchange +from ..factories.core.queue import declare_queue +from ..settings import AbstractConnectionParams +from ..settings import ConsumerParams +from ..settings import ExchangeParams +from ..settings import QueueParams + + +consumers: Dict[str, Consumer] = {} + + +async def consumer(settings: ConsumerParams) -> Consumer: + assert settings.name + if settings.name not in consumers: + assert isinstance(settings.connection, AbstractConnectionParams) + connection = await connect(settings.name, settings.connection, 'r') + channel = await connection.channel() + await channel.set_qos(settings.prefetch_count) + assert isinstance(settings.queue, QueueParams) + queue = await declare_queue(settings.queue, await connection.channel()) + assert isinstance(settings.exchange, ExchangeParams) + exchange = await declare_exchange(settings.exchange, channel) + await queue.bind(exchange, routing_key=settings.routing_key) + instance = Consumer(**settings.get_params_dict(), queue=queue) + consumers[settings.name] = instance + return consumers[settings.name] + + +async def anonymous_consumer(settings: ConsumerParams) -> Consumer: + assert isinstance(settings.connection, AbstractConnectionParams) + assert settings.name + connection = await connect(settings.name, settings.connection, 'r') + channel = await connection.channel() + await channel.set_qos(1) + settings.queue = QueueParams(name="", durable=False, auto_delete=True, exclusive=True) + queue = await declare_queue(settings.queue, await connection.channel()) + assert isinstance(settings.exchange, ExchangeParams) + exchange = await declare_exchange(settings.exchange, channel) + await queue.bind(exchange, routing_key=queue.name) + instance = Consumer(**settings.get_params_dict(), queue=queue) + return instance diff --git a/mela/factories/core/__init__.py b/mela/factories/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mela/factories/core/connection.py b/mela/factories/core/connection.py new file mode 100644 index 0000000..95ce10a --- /dev/null +++ b/mela/factories/core/connection.py @@ -0,0 +1,37 @@ +from typing import Dict +from typing import Literal +from typing import Union + +from aio_pika import connect_robust +from aio_pika.abc import AbstractConnection + +from ...settings import AbstractConnectionParams + + +connections: Dict[str, AbstractConnection] = {} + + +def generate_anonymous_connection_name(component_name: str) -> str: + # TODO + name = component_name + return name + + +async def close_all_connections(): + for connection_name, connection in connections.items(): + await connection.close() + + +async def connect( + component_name: str, + connection_settings: Union[AbstractConnectionParams], + mode: Literal['r', 'w'], +) -> AbstractConnection: + if connection_settings.name is None: + connection_settings.name = generate_anonymous_connection_name(component_name) + full_connection_name = component_name + '_' + mode + if full_connection_name not in connections: + params = connection_settings.get_params_dict() + connection = await connect_robust(**params) + connections[full_connection_name] = connection + return connections[full_connection_name] diff --git a/mela/factories/core/exchange.py b/mela/factories/core/exchange.py new file mode 100644 index 0000000..e94dc1f --- /dev/null +++ b/mela/factories/core/exchange.py @@ -0,0 +1,8 @@ +from aio_pika.abc import AbstractChannel +from aio_pika.abc import AbstractExchange + +from ...settings import ExchangeParams + + +async def declare_exchange(settings: ExchangeParams, channel: AbstractChannel) -> AbstractExchange: + return await channel.declare_exchange(**settings.get_params_dict()) diff --git a/mela/factories/core/queue.py b/mela/factories/core/queue.py new file mode 100644 index 0000000..83515d8 --- /dev/null +++ b/mela/factories/core/queue.py @@ -0,0 +1,13 @@ +from aio_pika.abc import AbstractChannel +from aio_pika.abc import AbstractQueue + +from ...factories.core.exchange import declare_exchange +from ...settings import ExchangeParams +from ...settings import QueueParams + + +async def declare_queue(settings: QueueParams, channel: AbstractChannel) -> AbstractQueue: + if settings.dead_letter_exchange: + assert isinstance(settings.dead_letter_exchange, ExchangeParams) + await declare_exchange(settings.dead_letter_exchange, channel) + return await channel.declare_queue(**settings.get_params_dict(), timeout=10) diff --git a/mela/factories/publisher.py b/mela/factories/publisher.py new file mode 100644 index 0000000..095e785 --- /dev/null +++ b/mela/factories/publisher.py @@ -0,0 +1,31 @@ +from typing import Dict + +from ..components import Publisher +from ..factories.core.connection import connect +from ..factories.core.exchange import declare_exchange +from ..factories.core.queue import declare_queue +from ..settings import AbstractConnectionParams +from ..settings import ExchangeParams +from ..settings import PublisherParams +from ..settings import QueueParams + + +publishers: Dict[str, Publisher] = {} + + +async def publisher(settings: PublisherParams) -> Publisher: + assert settings.name + if settings.name not in publishers: + assert isinstance(settings.connection, AbstractConnectionParams) + connection = await connect(settings.name, settings.connection, 'w') + channel = await connection.channel() + assert isinstance(settings.exchange, ExchangeParams) + exchange = await declare_exchange(settings.exchange, channel) + if settings.queue: + async with connection.channel() as temp_channel: + assert isinstance(settings.queue, QueueParams) + queue = await declare_queue(settings.queue, temp_channel) + await queue.bind(exchange, routing_key=settings.routing_key) + instance: Publisher = Publisher(**settings.get_params_dict(), exchange=exchange) + publishers[settings.name] = instance + return publishers[settings.name] diff --git a/mela/factories/rpc.py b/mela/factories/rpc.py new file mode 100644 index 0000000..2984701 --- /dev/null +++ b/mela/factories/rpc.py @@ -0,0 +1,50 @@ +from ..components.rpc import RPC +from ..components.rpc import RPCClient +from ..settings import ConsumerParams +from ..settings import PublisherParams +from ..settings import RPCParams +from .consumer import anonymous_consumer +from .consumer import consumer +from .publisher import publisher + + +async def service(settings: RPCParams) -> 'RPC': + assert isinstance(settings.worker, ConsumerParams) + assert isinstance(settings.request_publisher, PublisherParams) + assert isinstance(settings.response_publisher, PublisherParams) + assert settings.name + worker_instance = await consumer(settings.worker) + response_publisher_instance = await publisher(settings.response_publisher) + instance = RPC( + settings.name, + log_level=settings.log_level, + worker=worker_instance, + response_publisher=response_publisher_instance, + ) + client_instance = await client(settings) + instance.client = client_instance + return instance + + +async def client(settings: RPCParams) -> 'RPCClient': + assert isinstance(settings.request_publisher, PublisherParams) + assert isinstance(settings.response_publisher, PublisherParams) + assert settings.name + request_publisher_instance = await publisher(settings.request_publisher) + + consumer_params = ConsumerParams( + name=settings.name + '_client', + connection=settings.connection, + exchange=settings.response_exchange, + routing_key='', + queue='', + ) + + response_consumer_instance = await anonymous_consumer(consumer_params) + instance = RPCClient( + settings.name, + log_level=settings.log_level, + request_publisher=request_publisher_instance, + response_consumer=response_consumer_instance, + ) + return instance diff --git a/mela/factories/service.py b/mela/factories/service.py new file mode 100644 index 0000000..36b136f --- /dev/null +++ b/mela/factories/service.py @@ -0,0 +1,20 @@ +from ..components.service import Service +from ..settings import ConsumerParams +from ..settings import PublisherParams +from ..settings import ServiceParams +from .consumer import consumer +from .publisher import publisher + + +async def service(settings: ServiceParams) -> 'Service': + assert isinstance(settings.consumer, ConsumerParams) + assert isinstance(settings.publisher, PublisherParams) + assert settings.name + publisher_instance = await publisher(settings.publisher) + consumer_instance = await consumer(settings.consumer) + instance = Service( + publisher=publisher_instance, + consumer=consumer_instance, + **settings.get_params_dict(), + ) + return instance diff --git a/mela/log.py b/mela/log.py new file mode 100644 index 0000000..f317700 --- /dev/null +++ b/mela/log.py @@ -0,0 +1,12 @@ +import logging +import sys + + +root = logging.getLogger() +root.setLevel(logging.INFO) + +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +root.addHandler(handler) diff --git a/mela/processor.py b/mela/processor.py new file mode 100644 index 0000000..b31fb99 --- /dev/null +++ b/mela/processor.py @@ -0,0 +1,239 @@ +import asyncio +import inspect +import json +from functools import partial +from logging import Logger +from typing import Any +from typing import Callable +from typing import Dict +from typing import ForwardRef +from typing import Iterable +from typing import Optional +from typing import Tuple +from typing import Type +from typing import Union +from warnings import warn + +from aio_pika import IncomingMessage +from aio_pika import Message +from aio_pika.abc import AbstractIncomingMessage +from aio_pika.abc import AbstractMessage +from anyio.to_thread import run_sync +from pydantic import BaseModel +from pydantic import validate_arguments +from pydantic.typing import evaluate_forwardref + +from .abc import AbstractPublisher +from .abc import AbstractRPCClient +from .abc import AbstractSchemeRequirement + + +class Processor: + + static_param_classes = [Logger, AbstractPublisher, AbstractRPCClient] + + async def __call__(self, *args, **kwargs): + return await self.__process(*args, **kwargs) + + def __init__( + self, + call: Callable, + input_class: Optional[Type[BaseModel]] = None, + validate_args: bool = False, + ): + self._call = call + if validate_args: + self._call = validate_arguments( + config={ + 'arbitrary_types_allowed': True, + }, + )(self._call) + if not self._is_coroutine(): + self.__process = self.__process_sync # type: ignore + self._input_class = input_class + self._signature = self._get_typed_signature() + self._params = self._get_typed_parameters() + self._static_params: Iterable[inspect.Parameter] = [] + self._dynamic_params: Iterable[inspect.Parameter] = [] + self._split_static_and_dynamic_params() + self._cached_static_params: Dict[str, Any] = {} + if self._input_class is None: + self._get_data_class() + self._select_solver() + self._have_static_params = False + + async def __process(self, *args, **kwargs): + return await self._call(*args, **kwargs) + + async def __process_sync(self, *args, **kwargs): + func = partial(self._call, **kwargs) + return await run_sync(func, *args) + + def cache_static_params(self, component, scheme): + for param in self._static_params: # type: inspect.Parameter + if param.annotation is Logger: + self._cached_static_params[param.name] = component.log + elif issubclass(param.annotation, AbstractPublisher): + self._cached_static_params[param.name] = scheme.publisher(param.default) + elif issubclass(param.annotation, AbstractRPCClient): + self._cached_static_params[param.name] = scheme.rpc_client(param.default) + else: + raise TypeError("Static param cannot be solved") + + async def solve_requirements(self, settings): + updated_values = {} + for param_name, requirement in self._cached_static_params.items(): + if isinstance(requirement, AbstractSchemeRequirement): + updated_values[param_name] = await requirement.resolve(settings) + self._cached_static_params.update(updated_values) + if self._cached_static_params: + self._call = partial(self._call, **self._cached_static_params) + + def call_sync(self, *args, **kwargs): + assert not self._is_coroutine() + return self._call(*args, **kwargs) + + def _split_static_and_dynamic_params(self): + for param in self._params: # type: inspect.Parameter + if param.annotation in self.static_param_classes: + self._static_params.append(param) + self._have_static_params = True + else: + self._dynamic_params.append(param) + + @staticmethod + def wrap_response( + result: Union[Dict, BaseModel, Message], + routing_key: Optional[str] = None, + ) -> Tuple[Message, Optional[str]]: + if isinstance(result, Message): + return result, routing_key + elif isinstance(result, dict): + return Message(json.dumps(result).encode()), routing_key + elif isinstance(result, BaseModel): + json_encoded = result.json(by_alias=True).encode() + return Message(json_encoded), routing_key + + async def process(self, message: AbstractIncomingMessage) -> Tuple[Message, Optional[str]]: + solved_params = self._solve_dependencies(message) + result = await self(**solved_params) + wrapped_result = self.wrap_response(result) + return wrapped_result + + @staticmethod + def _get_typed_annotation(param: inspect.Parameter, globalns: Dict[str, Any]) -> Any: + annotation = param.annotation + if isinstance(annotation, str): + annotation = ForwardRef(annotation) + annotation = evaluate_forwardref(annotation, globalns, globalns) + return annotation + + def _get_typed_signature(self) -> inspect.Signature: + signature = inspect.signature(self._call) + globalns = getattr(self._call, "__globals__", {}) + typed_params = [ + inspect.Parameter( + name=param.name, + kind=param.kind, + default=param.default, + annotation=self._get_typed_annotation(param, globalns), + ) + for param in signature.parameters.values() + ] + typed_signature = inspect.Signature(typed_params) + return typed_signature + + def _get_typed_parameters(self) -> Iterable[inspect.Parameter]: + return self._get_typed_signature().parameters.values() + + def _is_coroutine(self) -> bool: + return asyncio.iscoroutinefunction(self._call) + + def _have_no_annotations(self): + for param in self._params: + if param.annotation is not inspect.Parameter.empty: + return False + return True + + def _oldstyle_annotations(self): + params = list(self._params) + if ( + params[0].annotation is inspect.Parameter.empty + or params[0].annotation is dict + ) and issubclass(params[1].annotation, AbstractMessage): + return True + return False + + def _get_data_class(self): + message_class_candidate = None + if self._input_class is None: + for param in self._params: + if issubclass(param.annotation, BaseModel): + if message_class_candidate is None: + message_class_candidate = param.annotation + else: + # Two different base classes found. We are not + # sure which should be used to parse message. + message_class_candidate = None + raise AssertionError("Two different data classes are found") + self._input_class = message_class_candidate + return self._input_class + + def _solve_dependencies_for_data_class( + self, + message: IncomingMessage, + ) -> Dict[str, Any]: + solved = {} + assert self._input_class + parsed_message = self._input_class.parse_raw(message.body) + parsed_message_dict = parsed_message.dict(exclude_unset=True) + for param in self._dynamic_params: # type: inspect.Parameter + if param.annotation is IncomingMessage: + solved[param.name] = message + elif issubclass(param.annotation, BaseModel): + solved[param.name] = parsed_message # type: ignore + elif param.name in parsed_message_dict: + solved[param.name] = parsed_message_dict[param.name] + else: + raise KeyError(f"Key `{param.name}` cannot be solved by dataclass solver") + + return solved + + def _solve_dependencies(self, message: AbstractIncomingMessage) -> Dict[str, Any]: + raise NotImplementedError("Dependency solver is not set") + + def _solve_dependencies_for_raw_json(self, message: AbstractIncomingMessage) -> Dict[str, Any]: + solved = {} + parsed_message = json.loads(message.body) + for param in self._dynamic_params: # type: inspect.Parameter + if param.annotation is IncomingMessage: + solved[param.name] = message + elif param.name in parsed_message: + solved[param.name] = parsed_message[param.name] + else: + raise KeyError(f"Key `{param.name}` cannot be solved by JSON solver") + return solved + + def _solve_dependencies_oldstyle(self, message: AbstractIncomingMessage) -> Dict[str, Any]: + solved = {} + parsed_message = json.loads(message.body) + for i, param in enumerate(self._dynamic_params): + if i == 0: + solved[param.name] = parsed_message + elif i == 1: + solved[param.name] = message + else: + raise KeyError("Too much arguments for oldstyle solver") + return solved + + def _select_solver(self): + if len(list(self._params)) == 2 and ( + self._have_no_annotations() or self._oldstyle_annotations() + ): + warn("Oldstyle injections are deprecated. " + "Update your processor definition", DeprecationWarning) + self._solve_dependencies = self._solve_dependencies_oldstyle + elif self._input_class: + self._solve_dependencies = self._solve_dependencies_for_data_class + else: + self._solve_dependencies = self._solve_dependencies_for_raw_json diff --git a/mela/scheme/__init__.py b/mela/scheme/__init__.py new file mode 100644 index 0000000..5246153 --- /dev/null +++ b/mela/scheme/__init__.py @@ -0,0 +1,123 @@ +import warnings +from typing import Any +from typing import Callable +from typing import Dict +from typing import Optional +from typing import Type + +from pydantic import BaseModel + +from ..processor import Processor +from ..settings import ConsumerParams +from ..settings import PublisherParams +from ..settings import RPCParams +from ..settings import ServiceParams +from .requirement import SchemeRequirement + + +class MelaScheme: + + """ + Scheme is not runnable. It just declare relations between app components. + """ + + def __init__( + self, + name: str, + ): + self.name: str = name + self.requirements: Dict['str', SchemeRequirement] = {} + + def register_component_requirement(self, requirement: SchemeRequirement): + if requirement.name in self.requirements: + raise KeyError(f"Looks like requirement with name `{requirement.name}` already exists") + self.requirements[requirement.name] = requirement + + def service( + self, + name: str, + params: Optional[ServiceParams] = None, + validate_args: bool = False, + input_class: Type[BaseModel] = None, + ) -> Callable[[Callable], Callable]: + requirement = SchemeRequirement(name, 'service', params) + + def decorator(func: Callable) -> Callable: + processor = Processor( + func, + input_class=input_class, + validate_args=validate_args, + ) + requirement.set_processor(processor) + return processor + + self.register_component_requirement(requirement) + return decorator + + def publisher( + self, + name: str, + params: Optional[PublisherParams] = None, + ) -> SchemeRequirement: + requirement = SchemeRequirement(name, 'publisher', params) + self.register_component_requirement(requirement) + return requirement + + def consumer( + self, + name: str, + params: ConsumerParams = None, + validate_args: bool = False, + input_class: Type[BaseModel] = None, + ) -> Callable[[Callable], Callable]: + requirement = SchemeRequirement(name, 'consumer', params) + self.register_component_requirement(requirement) + + def decorator(func: Callable[..., Any]) -> Callable: + processor = Processor( + func, + input_class=input_class, + validate_args=validate_args, + ) + requirement.set_processor(processor) + return processor + return decorator + + def rpc_service( + self, + name: str, + params: Optional[RPCParams] = None, + validate_args: bool = False, + request_model: Type[BaseModel] = None, + ): + requirement = SchemeRequirement(name, 'rpc_service', params) + self.register_component_requirement(requirement) + + def decorator(func: Callable[..., Any]) -> Callable: + processor = Processor( + func, + input_class=request_model, + validate_args=validate_args, + ) + requirement.set_processor(processor) + return processor + return decorator + + def rpc_server(self, *args, **kwargs): + warnings.warn("`rpc_server` component decorator will be " + "deprecated soon. Use `rpc_service instead`", DeprecationWarning) + return self.rpc_service(*args, **kwargs) + + def rpc_client( + self, + name: str, + params: Optional[RPCParams] = None, + ) -> SchemeRequirement: + requirement = SchemeRequirement(name, 'rpc_client', params) + self.register_component_requirement(requirement) + return requirement + + def merge(self, other: 'MelaScheme') -> 'MelaScheme': + for requirement in other.requirements.values(): + self.register_component_requirement(requirement) + return self diff --git a/mela/scheme/requirement.py b/mela/scheme/requirement.py new file mode 100644 index 0000000..086514b --- /dev/null +++ b/mela/scheme/requirement.py @@ -0,0 +1,59 @@ +from typing import Callable +from typing import Literal +from typing import Mapping +from typing import Optional + +from ..abc import AbstractSchemeRequirement +from ..factories import factory_dict +from ..settings import ComponentParamsBaseModel + + +class SchemeRequirement(AbstractSchemeRequirement): + + def __init__( + self, + name: str, + type_: Literal['publisher', 'consumer', 'service', 'rpc_service', 'rpc_client'], + params: Optional[ComponentParamsBaseModel] = None, + processor: Optional[Callable] = None, + ): + self.name = name + self.type_ = type_ + self.params = params + self.factory = factory_dict[type_] + self.processor = processor + + async def _resolve(self, settings): + if self.params: + return await self.factory(self.params) + + registry: Mapping[str, ComponentParamsBaseModel] + + if self.type_ == 'publisher': + registry = settings.publishers + elif self.type_ == 'consumer': + registry = settings.consumers + elif self.type_ == 'service': + registry = settings.services + elif self.type_ == 'rpc_service': + registry = settings.rpc_services + else: + raise NotImplementedError(f"Unknown component type: `{self.type_}`") + + if self.name not in registry: + raise KeyError( + f"{self.type_.title()} `{self.name}` is not described in app settings, " + f"also params are not provided", + ) + return await self.factory(registry[self.name]) + + async def resolve(self, settings): + resolved = await self._resolve(settings) + if self.processor: + resolved.set_processor(self.processor) + return resolved + + def set_processor(self, processor: Optional[Callable]): + assert self.processor is None, (f"Processor for component " + f"requirement `{self.name}` already set") + self.processor = processor diff --git a/mela/settings/__init__.py b/mela/settings/__init__.py new file mode 100644 index 0000000..dad463e --- /dev/null +++ b/mela/settings/__init__.py @@ -0,0 +1,423 @@ +import abc +from typing import Any +from typing import Dict +from typing import Optional +from typing import Tuple +from typing import Union + +import envyaml +from aio_pika import ExchangeType +from aio_pika.abc import AbstractExchange +from aio_pika.abc import AbstractQueue +from pydantic import AmqpDsn +from pydantic import BaseModel +from pydantic import BaseSettings +from pydantic import Extra +from pydantic import Field +from pydantic import PrivateAttr +from pydantic.env_settings import SettingsSourceCallable + + +def yaml_config_settings_source(settings: 'BaseSettings') -> Dict[str, Any]: + """ + A simple settings source that loads variables from a YAML file + at the project's root. + """ + yaml_config = envyaml.EnvYAML( + settings.Config.yaml_file_path, # type: ignore + include_environment=False, + ) + clear_config = {} + for key, value in dict(yaml_config).items(): + if '.' not in key: + clear_config[key] = value + return clear_config + + +class AbstractConnectionParams(BaseModel, abc.ABC): + name: Optional[str] = None + + @abc.abstractmethod + def get_params_dict(self) -> Dict[str, Any]: + raise NotImplementedError + + class Config: + extra = Extra.forbid + + +class ConnectionParams(AbstractConnectionParams): + host: str + port: int + login: str = Field(alias='username') + password: str + virtualhost: str = '/' + ssl: bool = False + ssl_options = dict + timeout: Optional[Union[float, int]] = None + client_properties: Optional[Dict] = None + heartbeat: Optional[int] = None + + def get_params_dict(self): + if self.name: + self.client_properties = {'connection_name': self.name} + res = self.dict(exclude={'name'}) + return res + + +class URLConnectionParams(AbstractConnectionParams): + + url: AmqpDsn + + def get_params_dict(self): + return self.dict(exclude={'name'}) + + +class ExchangeParams(BaseModel): + _instance: Optional[AbstractExchange] = PrivateAttr(default=None) + + name: str + type: ExchangeType = ExchangeType.DIRECT + durable: bool = True + auto_delete: bool = False + internal: bool = False + passive: bool = False + arguments: Optional[Dict] = None + timeout: Optional[Union[float, int]] = None + + def get_params_dict(self): + return self.dict() + + +class QueueParams(BaseModel): + _instance: Optional[AbstractQueue] = PrivateAttr(default=None) + + name: str + durable: bool = True + exclusive: bool = False + passive: bool = False + auto_delete: bool = False + dead_letter_exchange: Optional[Union[str, ExchangeParams]] = None + dead_letter_exchange_type: ExchangeType = ExchangeType.DIRECT + dead_letter_routing_key: Optional[str] = None + + def solve_dead_letter_exchange(self, exchanges: Dict['str', 'ExchangeParams']): + if isinstance(self.dead_letter_exchange, str): + exchange = exchanges.get(self.dead_letter_exchange) + if not exchange: + exchange = ExchangeParams( + name=self.dead_letter_exchange, + type=self.dead_letter_exchange_type, + ) + self.dead_letter_exchange = exchange + + def get_params_dict(self): + arguments = {} + if self.dead_letter_exchange: + arguments['x-dead-letter-exchange'] = self.dead_letter_exchange.name + arguments['x-dead-letter-routing-key'] = self.dead_letter_routing_key + return { + 'name': self.name, + 'durable': self.durable, + 'exclusive': self.exclusive, + 'passive': self.passive, + 'auto_delete': self.auto_delete, + 'arguments': arguments, + } + + def solve(self, settings: 'Settings'): + self.solve_dead_letter_exchange(settings.exchanges) + + +class ComponentParamsBaseModel(BaseModel, abc.ABC): + name: Optional[str] = None + log_level: str = 'info' + + @abc.abstractmethod + def solve(self, settings: 'Settings'): + raise NotImplementedError + + @abc.abstractmethod + def get_params_dict(self) -> Dict: + raise NotImplementedError + + +class PublisherParams(ComponentParamsBaseModel): + connection: Union[str, ConnectionParams, URLConnectionParams] = 'default' + exchange: Union[str, ExchangeParams] + exchange_type: str = 'direct' # DEPRECATED will be deleted in v1.2.0 + routing_key: str + skip_unroutables: bool = False + queue: Optional[Union[str, QueueParams]] = None + timeout: Optional[Union[int, float]] = None + + def solve_connection( + self, + connections: Dict[str, Union[ConnectionParams, URLConnectionParams]], + ) -> None: + if isinstance(self.connection, str): + if self.connection not in connections: + raise KeyError(f"Connection `{self.connection}` is not described in config") + self.connection = connections[self.connection] + + def solve_exchange(self, exchanges: Dict[str, ExchangeParams]) -> None: + if isinstance(self.exchange, str): + exchange = exchanges.get(self.exchange) + if not exchange: + exchange = ExchangeParams(name=self.exchange, type=self.exchange_type) + exchanges[self.exchange] = exchange + else: + if self.exchange_type != exchange.type: + raise AssertionError("Exchange is configured with two different types") + self.exchange = exchange + + def solve_queue(self, queues: Dict[str, QueueParams]): + if isinstance(self.queue, str): + queue = queues.get(self.queue) + if queue is None: + queue = QueueParams(name=self.queue) + queues[self.queue] = queue + self.queue = queue + + def solve(self, settings: 'Settings', parent_name: Optional[str] = None): + self.solve_connection(settings.connections) + self.solve_exchange(settings.exchanges) + if self.queue: + self.solve_queue(settings.queues) + assert isinstance(self.queue, QueueParams) + self.queue.solve(settings) + if parent_name and self.name is None: + self.name = parent_name + '_publisher' + + def get_params_dict(self): + return { + 'name': self.name, + 'default_timeout': self.timeout, + 'default_routing_key': self.routing_key, + } + + +class ConsumerParams(ComponentParamsBaseModel): + connection: Union[str, ConnectionParams, URLConnectionParams] = 'default' + exchange: Union[str, ExchangeParams] + exchange_type: str = 'direct' # DEPRECATED will be deleted in v1.2.0 + routing_key: str + queue: Union[str, QueueParams] + prefetch_count: int = 1 + dead_letter_exchange: Optional[str] = None + dead_letter_routing_key: Optional[str] = None + requeue_broken_messages: bool = True + + def solve_connection( + self, + connections: Dict[str, Union[ConnectionParams, URLConnectionParams]], + ) -> None: + if isinstance(self.connection, str): + if self.connection not in connections: + raise KeyError(f"Connection `{self.connection}` is not described in config") + self.connection = connections[self.connection] + + def solve_exchange(self, exchanges: Dict[str, ExchangeParams]) -> None: + if isinstance(self.exchange, str): + exchange = exchanges.get(self.exchange) + if self.exchange_type != 'direct': + # TODO deprecation warning + pass + if not exchange: + exchange = ExchangeParams(name=self.exchange, type=self.exchange_type) + else: + if self.exchange_type != exchange.type: + raise AssertionError("Exchange is configured with two different types") + self.exchange = exchange + + def solve_queue(self, queues: Dict[str, QueueParams]): + if isinstance(self.queue, str): + queue = queues.get(self.queue) + if queue is None: + queue = QueueParams( + name=self.queue, + dead_letter_exchange=self.dead_letter_exchange, + dead_letter_routing_key=self.dead_letter_routing_key, + ) + queues[self.queue] = queue + self.queue = queue + + def solve(self, settings: 'Settings', parent_name: Optional[str] = None): + self.solve_connection(settings.connections) + self.solve_exchange(settings.exchanges) + self.solve_queue(settings.queues) + assert isinstance(self.queue, QueueParams) + self.queue.solve(settings) + if parent_name and self.name is None: + self.name = parent_name + '_consumer' + + def get_params_dict(self): + return { + 'name': self.name, + 'prefetch_count': self.prefetch_count, + 'requeue_broken_messages': self.requeue_broken_messages, + 'log_level': self.log_level, + } + + +class ServiceParams(ComponentParamsBaseModel): + + consumer: Union[str, ConsumerParams] + publisher: Union[str, PublisherParams] + requeue_broken_messages: Optional[bool] = None + + name: Optional[str] = None + + def solve_consumer(self, consumers: Dict[str, ConsumerParams]): + if isinstance(self.consumer, str): + if self.consumer not in consumers: + raise KeyError(f"Consumer `{self.consumer}` is not declared") + self.consumer = consumers[self.consumer] + if self.requeue_broken_messages is not None: + self.consumer.requeue_broken_messages = self.requeue_broken_messages + + def solve_publisher(self, publishers: Dict[str, PublisherParams]): + if isinstance(self.publisher, str): + if self.publisher not in publishers: + raise KeyError(f"Publisher `{self.publisher}` is not declared") + self.publisher = publishers[self.publisher] + + def solve(self, settings: 'Settings', deep=True): + self.solve_consumer(settings.consumers) + self.solve_publisher(settings.publishers) + assert isinstance(self.consumer, ConsumerParams) + assert isinstance(self.publisher, PublisherParams) + if deep: + self.consumer.solve(settings, parent_name=self.name) + self.publisher.solve(settings, parent_name=self.name) + + def get_params_dict(self) -> Dict: + return { + 'log_level': self.log_level, + 'name': self.name, + } + + +class RPCParams(ComponentParamsBaseModel): + connection: Union[str, ConnectionParams, URLConnectionParams] = 'default' + worker: Optional[ConsumerParams] = None + response_publisher: Optional[PublisherParams] = None + request_publisher: Optional[PublisherParams] = None + + dead_letter_exchange: Optional[str] = None + dead_letter_routing_key: Optional[str] = None + + exchange: Union[str, ExchangeParams] + routing_key: str + queue: Union[str, QueueParams] + response_exchange: Union[str, ExchangeParams] + + def solve_connection( + self, + connections: Dict[str, Union[ConnectionParams, URLConnectionParams]], + ) -> None: + if isinstance(self.connection, str): + if self.connection not in connections: + raise KeyError(f"Connection `{self.connection}` is not described in config") + self.connection = connections[self.connection] + + def solve_exchanges(self, exchanges: Dict[str, ExchangeParams]) -> None: + if isinstance(self.exchange, str): + exchange = exchanges.get(self.exchange) + if not exchange: + exchange = ExchangeParams(name=self.exchange) + self.exchange = exchange + if isinstance(self.response_exchange, str): + exchange = exchanges.get(self.response_exchange) + if not exchange: + exchange = ExchangeParams(name=self.response_exchange) + self.response_exchange = exchange + + def solve_queue(self, queues: Dict[str, QueueParams]): + if isinstance(self.queue, str): + queue = queues.get(self.queue) + if queue is None: + queue = QueueParams( + name=self.queue, + dead_letter_exchange=self.dead_letter_exchange, + dead_letter_routing_key=self.dead_letter_routing_key, + ) + queues[self.queue] = queue + self.queue = queue + + def solve(self, settings: 'Settings'): + self.solve_connection(settings.connections) + self.solve_exchanges(settings.exchanges) + self.solve_queue(settings.queues) + assert self.name + if self.worker is None: + self.worker = ConsumerParams( + name=self.name + '_service', + connection=self.connection, + exchange=self.exchange, + routing_key=self.routing_key, + queue=self.queue, + ) + if self.response_publisher is None: + self.response_publisher = PublisherParams( + name=self.name + '_response_publisher', + connection=self.connection, + exchange=self.response_exchange, + routing_key='', + ) + if self.request_publisher is None: + self.request_publisher = PublisherParams( + name=self.name + '_request_publisher', + connection=self.connection, + exchange=self.exchange, + routing_key=self.routing_key, + ) + + def get_params_dict(self) -> Dict: + pass + + +class Settings(BaseSettings): + + connections: Dict[str, Union[ConnectionParams, URLConnectionParams]] = {} + services: Dict[str, ServiceParams] = {} + consumers: Dict[str, ConsumerParams] = {} + publishers: Dict[str, PublisherParams] = {} + exchanges: Dict[str, ExchangeParams] = {} + queues: Dict[str, QueueParams] = {} + rpc_services: Dict[str, RPCParams] = Field(default_factory=dict, alias='rpc-services') + + def __init__(self, **values: Any): + super().__init__(**values) + for connection_name, connection in self.connections.items(): + connection.name = connection_name + for rpc_name, rpc_config in self.rpc_services.items(): + rpc_config.name = rpc_name + rpc_config.solve(self) + for service_name, service_config in self.services.items(): + service_config.name = service_name + service_config.solve(self, deep=True) + for consumer_name, consumer_config in self.consumers.items(): + consumer_config.name = consumer_name + consumer_config.solve(self) + for publisher_name, publisher_config in self.publishers.items(): + publisher_config.name = publisher_name + publisher_config.solve(self) + + class Config: + yaml_file_path = 'application.yml' + + extra = Extra.ignore + + @classmethod + def customise_sources( + cls, + init_settings: SettingsSourceCallable, + env_settings: SettingsSourceCallable, + file_secret_settings: SettingsSourceCallable, + ) -> Tuple[SettingsSourceCallable, ...]: + return ( + init_settings, + env_settings, + yaml_config_settings_source, + file_secret_settings, + ) diff --git a/requirements.production.txt b/requirements.production.txt index f29a02c..74f8fc1 100644 --- a/requirements.production.txt +++ b/requirements.production.txt @@ -1,2 +1,2 @@ -aio-pika==6.8.0 +aio-pika==8.0.3 envyaml==1.6.210210 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f29a02c..2378a85 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -aio-pika==6.8.0 -envyaml==1.6.210210 \ No newline at end of file +aio-pika==8.2.2 +envyaml==1.10.211231 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..9695480 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,32 @@ +[flake8] +max-line-length = 99 +max-complexity = 7 +docstring-convention = google +ignore = S101, W503 + +[isort] +multi_line_output = 3 +lines_after_imports = 2 +force_single_line = True +include_trailing_comma = True +force_grid_wrap = 0 +use_parentheses = True +ensure_newline_before_comments = True +line_length = 99 + +[mypy] +plugins = pydantic.mypy +ignore_missing_imports = True + +; [tool:pytest] +; asyncio_mode = auto +; addopts= +; -n 4 +; --cov=app +; app +; tests + +[coverage:report] +show_missing = True +exclude_lines = + if __name__ == .__main__.: diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..e69de29