From af96f5c6d6f69a234789e549dcad395dc2abb5fb Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Mon, 23 Oct 2023 17:08:49 +0200 Subject: [PATCH] door,pool: handle multiple possible KafkaExceptions Motivation: KafaTemplate class might throw org.springframework.kafka.KafkaException as well as org.apache.kafka.common.KafkaException Thus KafkaTemplate#send should be wrapped ti try-catch block that handles both. Result: proper handling of multiple KafkaException. Acked-by: Lea Morschel Target: master, 9.2, 9.1, 9.0, 8.2 Require-book: no Require-notes: yes (cherry picked from commit 2ddd74a780a209504d1b38f62be43b40b6f10422) Signed-off-by: Tigran Mkrtchyan --- .../java/org/dcache/webdav/DcacheResourceFactory.java | 5 ++--- .../main/java/org/dcache/xrootd/door/XrootdDoor.java | 7 +++---- .../pool/classic/DefaultPostTransferService.java | 5 ++--- .../src/main/java/org/dcache/pool/classic/PoolV4.java | 5 ++--- .../dcache/pool/nearline/NearlineStorageHandler.java | 11 ++++------- .../src/main/java/org/dcache/util/Transfer.java | 5 ++--- 6 files changed, 15 insertions(+), 23 deletions(-) diff --git a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java index 214885f1919..379b104935f 100644 --- a/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java +++ b/modules/dcache-webdav/src/main/java/org/dcache/webdav/DcacheResourceFactory.java @@ -1231,9 +1231,8 @@ private void sendRemoveInfoToBilling(FileAttributes attributes, FsPath path) { try { _kafkaSender.accept(infoRemove); - } catch (KafkaException e) { - LOGGER.warn(Throwables.getRootCause(e).getMessage()); - + } catch (KafkaException | org.apache.kafka.common.KafkaException e) { + LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage()); } } diff --git a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java index f1cd19a23c3..864ce724506 100644 --- a/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java +++ b/modules/dcache-xrootd/src/main/java/org/dcache/xrootd/door/XrootdDoor.java @@ -1,6 +1,6 @@ /* dCache - http://www.dcache.org/ * - * Copyright (C) 2014 - 2022 Deutsches Elektronen-Synchrotron + * Copyright (C) 2014 - 2023 Deutsches Elektronen-Synchrotron * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as @@ -731,9 +731,8 @@ private void sendRemoveInfoToBilling(PnfsId pnfsId, FsPath path, Subject subject try { _kafkaSender.accept(infoRemove); - } catch (KafkaException e) { - _log.warn(Throwables.getRootCause(e).getMessage()); - + } catch (KafkaException | org.apache.kafka.common.KafkaException e) { + _log.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage()); } } diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java index b733335e27a..c2283b485d2 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/DefaultPostTransferService.java @@ -157,9 +157,8 @@ private void sendBillingInfo(MoverInfoMessage moverInfoMessage) { try { _kafkaSender.accept(moverInfoMessage); - } catch (KafkaException e) { - LOGGER.warn(Throwables.getRootCause(e).getMessage()); - + } catch (KafkaException | org.apache.kafka.common.KafkaException e) { + LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage()); } } diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java b/modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java index 81803450e7e..ad79519b36c 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java @@ -610,9 +610,8 @@ public void stateChanged(StateChangeEvent event) { try { _kafkaSender.accept(msg); - } catch (KafkaException e) { - LOGGER.warn(Throwables.getRootCause(e).getMessage()); - + } catch (KafkaException | org.apache.kafka.common.KafkaException e) { + LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage()); } } } diff --git a/modules/dcache/src/main/java/org/dcache/pool/nearline/NearlineStorageHandler.java b/modules/dcache/src/main/java/org/dcache/pool/nearline/NearlineStorageHandler.java index 921c67fa0b0..e2eb120f118 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/nearline/NearlineStorageHandler.java +++ b/modules/dcache/src/main/java/org/dcache/pool/nearline/NearlineStorageHandler.java @@ -1197,12 +1197,10 @@ private void done(@Nullable Throwable cause) { addFromNearlineStorage(infoMsg, storage); billingStub.notify(infoMsg); - try { _kafkaSender.accept(infoMsg); - } catch (KafkaException e) { - LOGGER.warn(Throwables.getRootCause(e).getMessage()); - + } catch (KafkaException | org.apache.kafka.common.KafkaException e) { + LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage()); } flushRequests.removeAndCallback(pnfsId, cause); } @@ -1435,9 +1433,8 @@ private void done(@Nullable Throwable cause) { billingStub.notify(infoMsg); try { _kafkaSender.accept(infoMsg); - } catch (KafkaException e) { - LOGGER.warn(Throwables.getRootCause(e).getMessage()); - + } catch (KafkaException | org.apache.kafka.common.KafkaException e) { + LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage()); } stageRequests.removeAndCallback(pnfsId, cause); } diff --git a/modules/dcache/src/main/java/org/dcache/util/Transfer.java b/modules/dcache/src/main/java/org/dcache/util/Transfer.java index 9827f31c811..a20d552da24 100644 --- a/modules/dcache/src/main/java/org/dcache/util/Transfer.java +++ b/modules/dcache/src/main/java/org/dcache/util/Transfer.java @@ -1192,9 +1192,8 @@ public synchronized void notifyBilling(int code, String error) { try { _kafkaSender.accept(msg); - } catch (KafkaException e) { - _log.warn(Throwables.getRootCause(e).getMessage()); - + } catch (KafkaException | org.apache.kafka.common.KafkaException e) { + _log.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage()); } }