это конспект доклада Е.Борисова и К.Толкачова https://www.youtube.com/watch?v=4QpnCbi5QyQ Доклад про реактивщину в сложных приложениях, и как ее настроить. ProjectReact и WebFlux.
По задумке у нас есть почтальон (Печкин),который отправляет письма снеограниченной скоростью. Эти письма проверяет Большой Брат, но скорость его проверки ограничена. Если письмо подозрительное, онопередается Агенту Смиту, онужеработает с отправителем, и работает еще более медленно. Задача в том, чтобы оптимизировать тредлпулы, буферы, сгладить нагрузку, чтобы буферы не переполнялись, почтальоны отсылали письма с нужной скоростью,и т.д.
11:00 Начнём с ProjectReact, Просто сделаем пруф оф concept и посмотрим если это как бы нам поможет потом на продаже (скорее всеготак не будет:) Есть у нас есть три персонажа - большой брат печкин и смит, Давайте попробуем что-нибудь с помощью них сэмулировать на projectreactor'e, как это будет вообще выглядеть. 11:35 Cделаем как бы какой-то простой тест, котором у нас там magic(?) соответственно. Первым из первым делом мы создаем всех этих трех ребят. Теперь наш Печкин должен сгенерировать какой-то поток писем, то есть мы говорим pechkin.letters(). 12:19 Cделаем этот метод. Мы уже упомянули про projectReactor, там есть такая штука как Flux, то есть некоторый поток писем, в которой и месяца(?) новые сообщения, в нашем случае это будут объекты писем вот у нас уже есть какой-то подозрительный e-mail, с которым мы будем работать. И мы должны сгенерировать поток писем. так как это все-таки демка, сгенерируем простым способом, ну и пока больше нам ничего не нужно в принципе.
return Flux.generate(synchronousSink -> {synchronousSink.next(new SuspiciousEmail());
(кто не знаком совсем с концепцией Flux, можете плюс-минус представлять, что это стримы, все оч похоже)
13:02 потом покажем, что это было в нашем старом императивным коде, когда мы как воде когда мы писали без project реактор и нас с помощью спринг MVC, это было,условно, while(true), который как-то куда-то что-то отправлял, но здесь стоит вопрос - а с какой скоростью вообще вся эта шняга работает? к этому мы сейчас вернемся, давайте к нашему пруф-оф-концептиу впрнемся, у нас есть этот Flux, дальше мы должны ну передать его дальше, то есть мы должны сказать: "а теперь отдай его вот соответственно большому брату а он там ну типа как ты там проанализирует".
pechkin.letters()
.transform(BigBrother::analyze());
13:39 Cоздадим метод analyze(), возвращает уже проанализированнып письма Flux, а на входе все те же самые подозрительные e-mail. Вот у нас есть наш мы должны условно как ты его проанализировать, не будем заморачиваться как именно, просто return new AnalyzedEmail()
public Flux<AnalyzedEmail> analyze(Flux<SuspiciousEmail> suspiciousEmailFlux) {
return suspiciousEmailFlux
.log("bigbrother-")
.map(suspiciousEmail -> new AnalyzedEmail());
}
13:51 В общем то эта вся полезная работа которую он должен сделать но мы же как-то эмулируем всю эту работу поэтому нам нужно логировать (как долго работает, или еще что-то). И то же самое мы сделаем с нашим Печкиным - мы скажем, чтобы логировалось нормально 14:35 Дальше мы должны сэмулировать более длительную работу, сделаем это самым кондовым способом
.doOnNext(analyzedEmail->{TimeUnit.MILLISECONDS.sleep(50)});
14:55 сделаем чтобы эмулировать какую-то работу. 15:01 и для того чтобы это работало отдельно от других сервисов и они не шли одним потоком....Да хотя потом прикрутим, а пока что так запустим.
15:12
остался последний чувак в нашей цепочке - агент Smith, который производит карательные действия над теми кто пишет письма reactFor(smith::reactFor)
, вот он тоже у нас должен что-то вернуть - он должен вернуть Flux reactionForEmail (наказать, запретить, еще что-нибудь.. с React'ом не связано:)
да он в принципе делает ровно то же самое да - то есть мы тоже ставим у него лог и говорим что это у нас "smith-", чтобы мы видели что происходит. Затем мы делаем какую-то полезную работу - в нашем случае создаем супер полезный объект, и вот он у нас уходит ну и конечно же мы должны сэмулировать, что мы делали это очень
долго, в этот раз давайте подольше да - все-таки,более ответственная работа, полсекунды поспим.
16:13 Вроде похоже на правду, давайте смотреть что из этого вышло. Просто вызовем после всех "transform-ов" .blockLast(), и увидим что происходит: сейчас у нас должны появиться какие-то логи, и вся система должна генерировать эти письма с какой-то скоростью.
16:47 смотрим что происходит: вот у нас пошли какие то сообщения видно что идут они не очень быстро вы помните должна мы хотим проверить мы хотим проверить что у нас есть бэк прыжок потому что печкин 17:00 генерит как не в себя эти сообщения, следующий работают медленнее, потому что они спят. Но при этом мы хотим видеть, что как бы это все же как-то сбалансируется-затормозится и никакие письма не пропадут, что просто кто-то будет этого печкина подтормаживать,и он будет с нужной скоростью эти сообщения генерить. В данный момент мы видим, что письма генерируется достаточно медленно - со скоростью в полсекунды, как раз как мы выставляли в самом медленном нашем сервисе.
17:25 Но выполняются они все на одном потоке. То есть они на самом деле сейчас зависят друг от друга, этот поток которому мы запускаем приложение. все они последовательны, это совершенно не интересно, потому что мы пытаемся симулировать ситуацию, в которой они независимы.
мы сейчас что доказали что бэк-прежур (back-pressure) на одном потоке у нас действительно работает. Вопрос, что будет если мы сейчас прикрутим экзекутор сервис, у нас будет разное количество потоков на разных частях системы - сможет ли тогда projectReactor это все сбалансировать? И тут мы говорим, что вот допустим для Смита, предположим, пусть будет два потока
.publishOn(Schedulesr.fromExecutor(PohHelper.executor("smith-", 2)))
то есть этот метод у нас просто генерирует просто экзекутор? авот и PocHelper, там есть ExecutorService, тредпул - это нам все сейчас понадобится
public class PocHelper{
p s ExecutorService executor(String prefix, int threads){
return executor.newFixedThreadPool(threads, new CustomizableThreadFactory(prefix));
}
}
18:12
Для того чтобы как-то следить за тем как это работает мы сказали выполняя всю полезную работу на вот этом карт пули и такую штуку мы в принципе в каждого сейчас запихнем, чтобы можно было все это как-то наблюдать. только назовем их Печкин и поставим кол-во тредов 1, и БигБро сделаем с 4 тредами. Теперь мы должны увидеть картину, в которой они стали как минимум должны независимо исполнять полезную работу на разных потоках. Вот мы видим что агент смит работает, все остальные почему-то нет. вот тут печкин поработал, а большой брат где-то посередине застрял.
19:22
Давай каунт прикрутим сразу, что было видно. вот как замерить что у нас система не имеет "долгов", и ничего никуда не теряется? Очень просто: мы делаем статический каунтер, и тот кто посылает письма, накручивает этот каунтер, а тот кто обрабатывает его, скручивает обратно.
Для этого заведем в PocHelper'e public static final AtomicInteger COUNTER = new ...
(атомик потому что мультитред)
(Печкин)
.doOnNext(suspiciousEmail->PocHelper.COUNTER.incrementAndGet())
И мы все время будем выводить его на экран, чтобы увидеть как растет наш долг, ну ка бы самым тупым способом. Нас сейчас в принципе это не сильно интересует, но какую-то примерную картинку мы получим, даже если она будет не совсем корректной. Ну а здесь мы это дело выведем на то есть у нас есть sout и напишем сколько нас сейчас этот каунтер
(тест)
.doOnNext(reactionForEmail->sout(" = " + PocHelper.COUNTER.decrementAndGet()))
19:59 генератор будет его все время увеличивать, Смит будет все время уменьшать, давайте смотреть что у нас получилось? В идеале где-то число должно флуктутрировать около 0, но мы видим что тут достаточно большие значения. Видим что вначале у нас улетел этот самый request (unbounded) и request(256) два раза - давайте разберется -откуда берется такая цифра 256 - вроде не настраивали сколько - что происходит? на уж нас каунтер 256, а в конце вот уже 400 да то есть но он снова уменьшается. и потом он снова увеличится то есть у нас есть какой-то бэк-прешшур, то есть реакция на то что нам посылают достаточно много сообщений, а мы в ответ посылаем, условно, команду "давай нам поменьше", просто путем того что говорим сколько дать. 20:50 предположим,это так и работает. но есть плохая ситуация, давайте попробуем ее сэмулировать, это поможет нам понять вообще зачем это все нужно. можно в нашем тесте воткнуться перед всеми трансформациями и сказать, что doOnSubscribe() получить наш subscription и сказать в нем сколько нам нужно
.doOnSubscribe(subscription->subscription.request(Long.MAX_VALUE))
вот очень много нам нужно - и это важно, потому что на самом деле вот эти цифры, которые мы видели с вами в логе, вот эти (256) и (unbounded) - откуда они взялись? .blockLast()
условно делает подписку на этот наш стрим, и он просит "очень много", он посылает максВэлью.
После него идет наш агент смит, у него нет никаких явных request(n). Но publishOn() преобразует наш стрим немного в другой, потому что он заставляет исполняться его на другом потоке, и у него есть как раз вот эти самые методики того, сколько в эти потоки можно понапихать. И он по умолчанию делает request(256), то есть наш бесконечный запрос "дай нам максимально много" уже превратился в 256 и полетел дальше по стриму. дальше соответственно тоже такая же ситуация у нас большим братом, здесь тоже есть publishOn() и он тоже 256 отправляет. Ну а с Печкиным просто, он у него такого нет, потому что он никому не отправляет, он просто генерирует это все дело, и никаких вопросов у него не возникает.
22:15
Соответственно, можно попробовать здесь сделать некоторыё хак: в этом publishOn() мы добавим 4й параметр (3й ставим false -он нужен чтобы просто показать ошибку), его число будет говорить сколько мы запрашиваем. Если не ставить его, он просит столько, сколько может влезть в буфер (256 это дефолтное значение которое совпадает со значением буфера). Мы сейчас поставим 8, и смотрим что произошло.
Тут надо читать снизу вверх, с самого последнего кто подписался,потому что собирается с обратной стороны. То есть наш агент смит попросил 256, потом наш большой брат просил 8, пошел-пошел процесс - и тут бабах какой-то, что случилось не так?
"Queue is full" - то есть это какие-то как раз та штука, которая генерирует эти сообщения. Почему так случилось случилось? так просто потому что мы вот воткнулись с вами в нашем тесте, и сказали что вне зависимости от того что попросил большой брат, дай нам Long.MAX_VALUE, а печкин генерирует с большой скоростью, и вместо того чтобы дать 8 как попросили, он начал отдавать сообщения с максимально скоростью, они все прилетели в большого брата - а у него буфер всего 8, у него буфер гораздо меньше (на самом деле 8 это prefetch, но несмотря на то что буфер другого размера но оно все равно не влезло), потому что начало выдаваться с большой скоростью, и вот поэтому у нас выпал эксепшен, ничего хорошего в этом нет. Нам же так делать не нужно мы сами не будем себе вредить поэтому, это нам не нужно, но когда мы делаем ПруфОфКонцепт, нам обязательно нужно что-то сломать. И когда мы отдадим этот концепт, мы скажем - а мы знаем как это можно сломать, но мы так делать не будем уже теперь :)
24:00 Зато мы отлично поняли, как это работает. И если вот таких хаков не делать, то получается у нас есть бэк-прешшур из коробки, queue не переполняется, все тормозится, можно идти в продакшен. Но последний момент, накоторый надообраьтить внимание перед продакшном (и на который мы будем обращать внимание дальше) - это вот этот request(.) Мы увидели, что он исполняется в начале, но он у нас будет исполняться и дальше. Вначале было request(8), затем если поискать будет request(6). Видно что проходит какое-то количество сообщений, и снова 6, тут снова 6. Каунтер крутится, растет, падает, а реквест почему 6? потому что он по умолчанию начинает послать этот request, когда 75 процентов буфера заполнено - вот и всё, если бы там было 32, то он бы послал 24 (проверим - так и есть) Итак, с POC на этом всё, следующим шагом идём в продакшен :)