From 22a891e778d84fb876e82cbac833df04924cda03 Mon Sep 17 00:00:00 2001 From: Shantanu Tiwari Date: Mon, 22 Jul 2024 15:44:09 +0530 Subject: [PATCH] Open Source DLM Initial Changes --- .editorconfig | 672 +++++++++ .github/workflows/sonarcloud-checks.yml | 36 + .gitignore | 47 + .gitlab-ci.yml | 63 + CODE_OF_CONDUCT.md | 46 + README.md | 134 ++ build.properties | 1 + lombok.config | 3 + pom.xml | 319 +++++ .../phonepe/dlm/DistributedLockManager.java | 136 ++ .../com/phonepe/dlm/common/Constants.java | 25 + .../phonepe/dlm/exception/DLMException.java | 59 + .../com/phonepe/dlm/exception/ErrorCode.java | 25 + .../java/com/phonepe/dlm/lock/ILockable.java | 92 ++ src/main/java/com/phonepe/dlm/lock/Lock.java | 38 + .../com/phonepe/dlm/lock/base/LockBase.java | 103 ++ .../com/phonepe/dlm/lock/level/LockLevel.java | 55 + .../com/phonepe/dlm/lock/mode/LockMode.java | 42 + .../phonepe/dlm/lock/storage/ILockStore.java | 29 + .../storage/aerospike/AerospikeStore.java | 140 ++ .../dlm/lock/storage/hbase/HBaseStore.java | 134 ++ .../com/phonepe/dlm/utils/AerospikeUtils.java | 46 + .../java/com/phonepe/dlm/utils/Timer.java | 31 + .../dlm/DistributedLockWithAerospikeTest.java | 252 ++++ .../dlm/DistributedLockWithHBaseTest.java | 246 ++++ .../phonepe/dlm/util/DLSExceptionMatcher.java | 52 + .../phonepe/dlm/util/HBaseConnectionStub.java | 100 ++ .../com/phonepe/dlm/util/HBaseTableStub.java | 1203 +++++++++++++++++ .../java/com/phonepe/dlm/util/TestUtils.java | 33 + 29 files changed, 4162 insertions(+) create mode 100644 .editorconfig create mode 100644 .github/workflows/sonarcloud-checks.yml create mode 100644 .gitignore create mode 100644 .gitlab-ci.yml create mode 100644 CODE_OF_CONDUCT.md create mode 100644 README.md create mode 100644 build.properties create mode 100644 lombok.config create mode 100644 pom.xml create mode 100644 src/main/java/com/phonepe/dlm/DistributedLockManager.java create mode 100644 src/main/java/com/phonepe/dlm/common/Constants.java create mode 100644 src/main/java/com/phonepe/dlm/exception/DLMException.java create mode 100644 src/main/java/com/phonepe/dlm/exception/ErrorCode.java create mode 100644 src/main/java/com/phonepe/dlm/lock/ILockable.java create mode 100644 src/main/java/com/phonepe/dlm/lock/Lock.java create mode 100644 src/main/java/com/phonepe/dlm/lock/base/LockBase.java create mode 100644 src/main/java/com/phonepe/dlm/lock/level/LockLevel.java create mode 100644 src/main/java/com/phonepe/dlm/lock/mode/LockMode.java create mode 100644 src/main/java/com/phonepe/dlm/lock/storage/ILockStore.java create mode 100644 src/main/java/com/phonepe/dlm/lock/storage/aerospike/AerospikeStore.java create mode 100644 src/main/java/com/phonepe/dlm/lock/storage/hbase/HBaseStore.java create mode 100644 src/main/java/com/phonepe/dlm/utils/AerospikeUtils.java create mode 100644 src/main/java/com/phonepe/dlm/utils/Timer.java create mode 100644 src/test/java/com/phonepe/dlm/DistributedLockWithAerospikeTest.java create mode 100644 src/test/java/com/phonepe/dlm/DistributedLockWithHBaseTest.java create mode 100644 src/test/java/com/phonepe/dlm/util/DLSExceptionMatcher.java create mode 100644 src/test/java/com/phonepe/dlm/util/HBaseConnectionStub.java create mode 100644 src/test/java/com/phonepe/dlm/util/HBaseTableStub.java create mode 100644 src/test/java/com/phonepe/dlm/util/TestUtils.java diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..4e881d4 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,672 @@ +[*] +charset = utf-8 +end_of_line = lf +indent_size = 4 +indent_style = space +insert_final_newline = false +max_line_length = 120 +tab_width = 4 +ij_continuation_indent_size = 8 +ij_formatter_off_tag = @formatter:off +ij_formatter_on_tag = @formatter:on +ij_formatter_tags_enabled = true +ij_smart_tabs = false +ij_visual_guides = +ij_wrap_on_typing = false + +[*.java] +ij_java_align_consecutive_assignments = false +ij_java_align_consecutive_variable_declarations = false +ij_java_align_group_field_declarations = false +ij_java_align_multiline_annotation_parameters = false +ij_java_align_multiline_array_initializer_expression = false +ij_java_align_multiline_assignment = false +ij_java_align_multiline_binary_operation = false +ij_java_align_multiline_chained_methods = false +ij_java_align_multiline_deconstruction_list_components = true +ij_java_align_multiline_extends_list = false +ij_java_align_multiline_for = true +ij_java_align_multiline_method_parentheses = false +ij_java_align_multiline_parameters = true +ij_java_align_multiline_parameters_in_calls = false +ij_java_align_multiline_parenthesized_expression = false +ij_java_align_multiline_records = true +ij_java_align_multiline_resources = true +ij_java_align_multiline_ternary_operation = false +ij_java_align_multiline_text_blocks = false +ij_java_align_multiline_throws_list = false +ij_java_align_subsequent_simple_methods = false +ij_java_align_throws_keyword = false +ij_java_align_types_in_multi_catch = true +ij_java_annotation_parameter_wrap = off +ij_java_array_initializer_new_line_after_left_brace = false +ij_java_array_initializer_right_brace_on_new_line = false +ij_java_array_initializer_wrap = off +ij_java_assert_statement_colon_on_next_line = false +ij_java_assert_statement_wrap = off +ij_java_assignment_wrap = off +ij_java_binary_operation_sign_on_next_line = false +ij_java_binary_operation_wrap = off +ij_java_blank_lines_after_anonymous_class_header = 0 +ij_java_blank_lines_after_class_header = 0 +ij_java_blank_lines_after_imports = 1 +ij_java_blank_lines_after_package = 1 +ij_java_blank_lines_around_class = 1 +ij_java_blank_lines_around_field = 0 +ij_java_blank_lines_around_field_in_interface = 0 +ij_java_blank_lines_around_initializer = 1 +ij_java_blank_lines_around_method = 1 +ij_java_blank_lines_around_method_in_interface = 1 +ij_java_blank_lines_before_class_end = 0 +ij_java_blank_lines_before_imports = 1 +ij_java_blank_lines_before_method_body = 0 +ij_java_blank_lines_before_package = 0 +ij_java_block_brace_style = end_of_line +ij_java_block_comment_add_space = false +ij_java_block_comment_at_first_column = true +ij_java_builder_methods = +ij_java_call_parameters_new_line_after_left_paren = false +ij_java_call_parameters_right_paren_on_new_line = false +ij_java_call_parameters_wrap = off +ij_java_case_statement_on_separate_line = true +ij_java_catch_on_new_line = false +ij_java_class_annotation_wrap = split_into_lines +ij_java_class_brace_style = end_of_line +ij_java_class_count_to_use_import_on_demand = 5 +ij_java_class_names_in_javadoc = 1 +ij_java_deconstruction_list_wrap = normal +ij_java_do_not_indent_top_level_class_members = false +ij_java_do_not_wrap_after_single_annotation = false +ij_java_do_not_wrap_after_single_annotation_in_parameter = false +ij_java_do_while_brace_force = never +ij_java_doc_add_blank_line_after_description = true +ij_java_doc_add_blank_line_after_param_comments = false +ij_java_doc_add_blank_line_after_return = false +ij_java_doc_add_p_tag_on_empty_lines = true +ij_java_doc_align_exception_comments = true +ij_java_doc_align_param_comments = true +ij_java_doc_do_not_wrap_if_one_line = false +ij_java_doc_enable_formatting = true +ij_java_doc_enable_leading_asterisks = true +ij_java_doc_indent_on_continuation = false +ij_java_doc_keep_empty_lines = true +ij_java_doc_keep_empty_parameter_tag = true +ij_java_doc_keep_empty_return_tag = true +ij_java_doc_keep_empty_throws_tag = true +ij_java_doc_keep_invalid_tags = true +ij_java_doc_param_description_on_new_line = false +ij_java_doc_preserve_line_breaks = false +ij_java_doc_use_throws_not_exception_tag = true +ij_java_else_on_new_line = false +ij_java_enum_constants_wrap = off +ij_java_extends_keyword_wrap = off +ij_java_extends_list_wrap = off +ij_java_field_annotation_wrap = split_into_lines +ij_java_field_name_prefix = +ij_java_field_name_suffix = +ij_java_finally_on_new_line = false +ij_java_for_brace_force = never +ij_java_for_statement_new_line_after_left_paren = false +ij_java_for_statement_right_paren_on_new_line = false +ij_java_for_statement_wrap = off +ij_java_generate_final_locals = false +ij_java_generate_final_parameters = false +ij_java_if_brace_force = never +ij_java_imports_layout = *,|,javax.**,java.**,|,$* +ij_java_indent_case_from_switch = true +ij_java_insert_inner_class_imports = false +ij_java_insert_override_annotation = true +ij_java_keep_blank_lines_before_right_brace = 2 +ij_java_keep_blank_lines_between_package_declaration_and_header = 2 +ij_java_keep_blank_lines_in_code = 2 +ij_java_keep_blank_lines_in_declarations = 2 +ij_java_keep_builder_methods_indents = false +ij_java_keep_control_statement_in_one_line = true +ij_java_keep_first_column_comment = true +ij_java_keep_indents_on_empty_lines = false +ij_java_keep_line_breaks = true +ij_java_keep_multiple_expressions_in_one_line = false +ij_java_keep_simple_blocks_in_one_line = false +ij_java_keep_simple_classes_in_one_line = false +ij_java_keep_simple_lambdas_in_one_line = false +ij_java_keep_simple_methods_in_one_line = false +ij_java_label_indent_absolute = false +ij_java_label_indent_size = 0 +ij_java_lambda_brace_style = end_of_line +ij_java_layout_static_imports_separately = true +ij_java_line_comment_add_space = false +ij_java_line_comment_add_space_on_reformat = false +ij_java_line_comment_at_first_column = true +ij_java_local_variable_name_prefix = +ij_java_local_variable_name_suffix = +ij_java_method_annotation_wrap = split_into_lines +ij_java_method_brace_style = end_of_line +ij_java_method_call_chain_wrap = off +ij_java_method_parameters_new_line_after_left_paren = false +ij_java_method_parameters_right_paren_on_new_line = false +ij_java_method_parameters_wrap = off +ij_java_modifier_list_wrap = false +ij_java_multi_catch_types_wrap = normal +ij_java_names_count_to_use_import_on_demand = 3 +ij_java_new_line_after_lparen_in_annotation = false +ij_java_new_line_after_lparen_in_deconstruction_pattern = true +ij_java_new_line_after_lparen_in_record_header = false +ij_java_packages_to_use_import_on_demand = java.awt.*,javax.swing.* +ij_java_parameter_annotation_wrap = off +ij_java_parameter_name_prefix = +ij_java_parameter_name_suffix = +ij_java_parentheses_expression_new_line_after_left_paren = false +ij_java_parentheses_expression_right_paren_on_new_line = false +ij_java_place_assignment_sign_on_next_line = false +ij_java_prefer_longer_names = true +ij_java_prefer_parameters_wrap = false +ij_java_record_components_wrap = normal +ij_java_repeat_annotations = +ij_java_repeat_synchronized = true +ij_java_replace_instanceof_and_cast = false +ij_java_replace_null_check = true +ij_java_replace_sum_lambda_with_method_ref = true +ij_java_resource_list_new_line_after_left_paren = false +ij_java_resource_list_right_paren_on_new_line = false +ij_java_resource_list_wrap = off +ij_java_rparen_on_new_line_in_annotation = false +ij_java_rparen_on_new_line_in_deconstruction_pattern = true +ij_java_rparen_on_new_line_in_record_header = false +ij_java_space_after_closing_angle_bracket_in_type_argument = false +ij_java_space_after_colon = true +ij_java_space_after_comma = true +ij_java_space_after_comma_in_type_arguments = true +ij_java_space_after_for_semicolon = true +ij_java_space_after_quest = true +ij_java_space_after_type_cast = true +ij_java_space_before_annotation_array_initializer_left_brace = false +ij_java_space_before_annotation_parameter_list = false +ij_java_space_before_array_initializer_left_brace = false +ij_java_space_before_catch_keyword = true +ij_java_space_before_catch_left_brace = true +ij_java_space_before_catch_parentheses = true +ij_java_space_before_class_left_brace = true +ij_java_space_before_colon = true +ij_java_space_before_colon_in_foreach = true +ij_java_space_before_comma = false +ij_java_space_before_deconstruction_list = false +ij_java_space_before_do_left_brace = true +ij_java_space_before_else_keyword = true +ij_java_space_before_else_left_brace = true +ij_java_space_before_finally_keyword = true +ij_java_space_before_finally_left_brace = true +ij_java_space_before_for_left_brace = true +ij_java_space_before_for_parentheses = true +ij_java_space_before_for_semicolon = false +ij_java_space_before_if_left_brace = true +ij_java_space_before_if_parentheses = true +ij_java_space_before_method_call_parentheses = false +ij_java_space_before_method_left_brace = true +ij_java_space_before_method_parentheses = false +ij_java_space_before_opening_angle_bracket_in_type_parameter = false +ij_java_space_before_quest = true +ij_java_space_before_switch_left_brace = true +ij_java_space_before_switch_parentheses = true +ij_java_space_before_synchronized_left_brace = true +ij_java_space_before_synchronized_parentheses = true +ij_java_space_before_try_left_brace = true +ij_java_space_before_try_parentheses = true +ij_java_space_before_type_parameter_list = false +ij_java_space_before_while_keyword = true +ij_java_space_before_while_left_brace = true +ij_java_space_before_while_parentheses = true +ij_java_space_inside_one_line_enum_braces = false +ij_java_space_within_empty_array_initializer_braces = false +ij_java_space_within_empty_method_call_parentheses = false +ij_java_space_within_empty_method_parentheses = false +ij_java_spaces_around_additive_operators = true +ij_java_spaces_around_annotation_eq = true +ij_java_spaces_around_assignment_operators = true +ij_java_spaces_around_bitwise_operators = true +ij_java_spaces_around_equality_operators = true +ij_java_spaces_around_lambda_arrow = true +ij_java_spaces_around_logical_operators = true +ij_java_spaces_around_method_ref_dbl_colon = false +ij_java_spaces_around_multiplicative_operators = true +ij_java_spaces_around_relational_operators = true +ij_java_spaces_around_shift_operators = true +ij_java_spaces_around_type_bounds_in_type_parameters = true +ij_java_spaces_around_unary_operator = false +ij_java_spaces_within_angle_brackets = false +ij_java_spaces_within_annotation_parentheses = false +ij_java_spaces_within_array_initializer_braces = false +ij_java_spaces_within_braces = false +ij_java_spaces_within_brackets = false +ij_java_spaces_within_cast_parentheses = false +ij_java_spaces_within_catch_parentheses = false +ij_java_spaces_within_deconstruction_list = false +ij_java_spaces_within_for_parentheses = false +ij_java_spaces_within_if_parentheses = false +ij_java_spaces_within_method_call_parentheses = false +ij_java_spaces_within_method_parentheses = false +ij_java_spaces_within_parentheses = false +ij_java_spaces_within_record_header = false +ij_java_spaces_within_switch_parentheses = false +ij_java_spaces_within_synchronized_parentheses = false +ij_java_spaces_within_try_parentheses = false +ij_java_spaces_within_while_parentheses = false +ij_java_special_else_if_treatment = true +ij_java_static_field_name_prefix = +ij_java_static_field_name_suffix = +ij_java_subclass_name_prefix = +ij_java_subclass_name_suffix = Impl +ij_java_ternary_operation_signs_on_next_line = false +ij_java_ternary_operation_wrap = off +ij_java_test_name_prefix = +ij_java_test_name_suffix = Test +ij_java_throws_keyword_wrap = off +ij_java_throws_list_wrap = off +ij_java_use_external_annotations = false +ij_java_use_fq_class_names = false +ij_java_use_relative_indents = false +ij_java_use_single_class_imports = true +ij_java_variable_annotation_wrap = off +ij_java_visibility = public +ij_java_while_brace_force = never +ij_java_while_on_new_line = false +ij_java_wrap_comments = false +ij_java_wrap_first_method_in_call_chain = false +ij_java_wrap_long_lines = false + +[*.properties] +ij_properties_align_group_field_declarations = false +ij_properties_keep_blank_lines = false +ij_properties_key_value_delimiter = equals +ij_properties_spaces_around_key_value_delimiter = false + +[.editorconfig] +ij_editorconfig_align_group_field_declarations = false +ij_editorconfig_space_after_colon = false +ij_editorconfig_space_after_comma = true +ij_editorconfig_space_before_colon = false +ij_editorconfig_space_before_comma = false +ij_editorconfig_spaces_around_assignment_operators = true + +[{*.ant,*.fxml,*.jhm,*.jnlp,*.jrxml,*.jspx,*.pom,*.rng,*.tagx,*.tld,*.wsdl,*.xml,*.xsd,*.xsl,*.xslt,*.xul}] +ij_xml_align_attributes = true +ij_xml_align_text = false +ij_xml_attribute_wrap = normal +ij_xml_block_comment_add_space = false +ij_xml_block_comment_at_first_column = true +ij_xml_keep_blank_lines = 2 +ij_xml_keep_indents_on_empty_lines = false +ij_xml_keep_line_breaks = true +ij_xml_keep_line_breaks_in_text = true +ij_xml_keep_whitespaces = false +ij_xml_keep_whitespaces_around_cdata = preserve +ij_xml_keep_whitespaces_inside_cdata = false +ij_xml_line_comment_at_first_column = true +ij_xml_space_after_tag_name = false +ij_xml_space_around_equals_in_attribute = false +ij_xml_space_inside_empty_tag = false +ij_xml_text_wrap = normal +ij_xml_use_custom_settings = false + +[{*.bash,*.sh,*.zsh}] +indent_size = 2 +tab_width = 2 +ij_shell_binary_ops_start_line = false +ij_shell_keep_column_alignment_padding = false +ij_shell_minify_program = false +ij_shell_redirect_followed_by_space = false +ij_shell_switch_cases_indented = false +ij_shell_use_unix_line_separator = true + +[{*.gant,*.groovy,*.gy}] +ij_groovy_align_group_field_declarations = false +ij_groovy_align_multiline_array_initializer_expression = false +ij_groovy_align_multiline_assignment = false +ij_groovy_align_multiline_binary_operation = false +ij_groovy_align_multiline_chained_methods = false +ij_groovy_align_multiline_extends_list = false +ij_groovy_align_multiline_for = true +ij_groovy_align_multiline_list_or_map = true +ij_groovy_align_multiline_method_parentheses = false +ij_groovy_align_multiline_parameters = true +ij_groovy_align_multiline_parameters_in_calls = false +ij_groovy_align_multiline_resources = true +ij_groovy_align_multiline_ternary_operation = false +ij_groovy_align_multiline_throws_list = false +ij_groovy_align_named_args_in_map = true +ij_groovy_align_throws_keyword = false +ij_groovy_array_initializer_new_line_after_left_brace = false +ij_groovy_array_initializer_right_brace_on_new_line = false +ij_groovy_array_initializer_wrap = off +ij_groovy_assert_statement_wrap = off +ij_groovy_assignment_wrap = off +ij_groovy_binary_operation_wrap = off +ij_groovy_blank_lines_after_class_header = 0 +ij_groovy_blank_lines_after_imports = 1 +ij_groovy_blank_lines_after_package = 1 +ij_groovy_blank_lines_around_class = 1 +ij_groovy_blank_lines_around_field = 0 +ij_groovy_blank_lines_around_field_in_interface = 0 +ij_groovy_blank_lines_around_method = 1 +ij_groovy_blank_lines_around_method_in_interface = 1 +ij_groovy_blank_lines_before_imports = 1 +ij_groovy_blank_lines_before_method_body = 0 +ij_groovy_blank_lines_before_package = 0 +ij_groovy_block_brace_style = end_of_line +ij_groovy_block_comment_add_space = false +ij_groovy_block_comment_at_first_column = true +ij_groovy_call_parameters_new_line_after_left_paren = false +ij_groovy_call_parameters_right_paren_on_new_line = false +ij_groovy_call_parameters_wrap = off +ij_groovy_catch_on_new_line = false +ij_groovy_class_annotation_wrap = split_into_lines +ij_groovy_class_brace_style = end_of_line +ij_groovy_class_count_to_use_import_on_demand = 5 +ij_groovy_do_while_brace_force = never +ij_groovy_else_on_new_line = false +ij_groovy_enable_groovydoc_formatting = true +ij_groovy_enum_constants_wrap = off +ij_groovy_extends_keyword_wrap = off +ij_groovy_extends_list_wrap = off +ij_groovy_field_annotation_wrap = split_into_lines +ij_groovy_finally_on_new_line = false +ij_groovy_for_brace_force = never +ij_groovy_for_statement_new_line_after_left_paren = false +ij_groovy_for_statement_right_paren_on_new_line = false +ij_groovy_for_statement_wrap = off +ij_groovy_ginq_general_clause_wrap_policy = 2 +ij_groovy_ginq_having_wrap_policy = 1 +ij_groovy_ginq_indent_having_clause = true +ij_groovy_ginq_indent_on_clause = true +ij_groovy_ginq_on_wrap_policy = 1 +ij_groovy_ginq_space_after_keyword = true +ij_groovy_if_brace_force = never +ij_groovy_import_annotation_wrap = 2 +ij_groovy_imports_layout = *,|,javax.**,java.**,|,$* +ij_groovy_indent_case_from_switch = true +ij_groovy_indent_label_blocks = true +ij_groovy_insert_inner_class_imports = false +ij_groovy_keep_blank_lines_before_right_brace = 2 +ij_groovy_keep_blank_lines_in_code = 2 +ij_groovy_keep_blank_lines_in_declarations = 2 +ij_groovy_keep_control_statement_in_one_line = true +ij_groovy_keep_first_column_comment = true +ij_groovy_keep_indents_on_empty_lines = false +ij_groovy_keep_line_breaks = true +ij_groovy_keep_multiple_expressions_in_one_line = false +ij_groovy_keep_simple_blocks_in_one_line = false +ij_groovy_keep_simple_classes_in_one_line = true +ij_groovy_keep_simple_lambdas_in_one_line = true +ij_groovy_keep_simple_methods_in_one_line = true +ij_groovy_label_indent_absolute = false +ij_groovy_label_indent_size = 0 +ij_groovy_lambda_brace_style = end_of_line +ij_groovy_layout_static_imports_separately = true +ij_groovy_line_comment_add_space = false +ij_groovy_line_comment_add_space_on_reformat = false +ij_groovy_line_comment_at_first_column = true +ij_groovy_method_annotation_wrap = split_into_lines +ij_groovy_method_brace_style = end_of_line +ij_groovy_method_call_chain_wrap = off +ij_groovy_method_parameters_new_line_after_left_paren = false +ij_groovy_method_parameters_right_paren_on_new_line = false +ij_groovy_method_parameters_wrap = off +ij_groovy_modifier_list_wrap = false +ij_groovy_names_count_to_use_import_on_demand = 3 +ij_groovy_packages_to_use_import_on_demand = java.awt.*,javax.swing.* +ij_groovy_parameter_annotation_wrap = off +ij_groovy_parentheses_expression_new_line_after_left_paren = false +ij_groovy_parentheses_expression_right_paren_on_new_line = false +ij_groovy_prefer_parameters_wrap = false +ij_groovy_resource_list_new_line_after_left_paren = false +ij_groovy_resource_list_right_paren_on_new_line = false +ij_groovy_resource_list_wrap = off +ij_groovy_space_after_assert_separator = true +ij_groovy_space_after_colon = true +ij_groovy_space_after_comma = true +ij_groovy_space_after_comma_in_type_arguments = true +ij_groovy_space_after_for_semicolon = true +ij_groovy_space_after_quest = true +ij_groovy_space_after_type_cast = true +ij_groovy_space_before_annotation_parameter_list = false +ij_groovy_space_before_array_initializer_left_brace = false +ij_groovy_space_before_assert_separator = false +ij_groovy_space_before_catch_keyword = true +ij_groovy_space_before_catch_left_brace = true +ij_groovy_space_before_catch_parentheses = true +ij_groovy_space_before_class_left_brace = true +ij_groovy_space_before_closure_left_brace = true +ij_groovy_space_before_colon = true +ij_groovy_space_before_comma = false +ij_groovy_space_before_do_left_brace = true +ij_groovy_space_before_else_keyword = true +ij_groovy_space_before_else_left_brace = true +ij_groovy_space_before_finally_keyword = true +ij_groovy_space_before_finally_left_brace = true +ij_groovy_space_before_for_left_brace = true +ij_groovy_space_before_for_parentheses = true +ij_groovy_space_before_for_semicolon = false +ij_groovy_space_before_if_left_brace = true +ij_groovy_space_before_if_parentheses = true +ij_groovy_space_before_method_call_parentheses = false +ij_groovy_space_before_method_left_brace = true +ij_groovy_space_before_method_parentheses = false +ij_groovy_space_before_quest = true +ij_groovy_space_before_record_parentheses = false +ij_groovy_space_before_switch_left_brace = true +ij_groovy_space_before_switch_parentheses = true +ij_groovy_space_before_synchronized_left_brace = true +ij_groovy_space_before_synchronized_parentheses = true +ij_groovy_space_before_try_left_brace = true +ij_groovy_space_before_try_parentheses = true +ij_groovy_space_before_while_keyword = true +ij_groovy_space_before_while_left_brace = true +ij_groovy_space_before_while_parentheses = true +ij_groovy_space_in_named_argument = true +ij_groovy_space_in_named_argument_before_colon = false +ij_groovy_space_within_empty_array_initializer_braces = false +ij_groovy_space_within_empty_method_call_parentheses = false +ij_groovy_spaces_around_additive_operators = true +ij_groovy_spaces_around_assignment_operators = true +ij_groovy_spaces_around_bitwise_operators = true +ij_groovy_spaces_around_equality_operators = true +ij_groovy_spaces_around_lambda_arrow = true +ij_groovy_spaces_around_logical_operators = true +ij_groovy_spaces_around_multiplicative_operators = true +ij_groovy_spaces_around_regex_operators = true +ij_groovy_spaces_around_relational_operators = true +ij_groovy_spaces_around_shift_operators = true +ij_groovy_spaces_within_annotation_parentheses = false +ij_groovy_spaces_within_array_initializer_braces = false +ij_groovy_spaces_within_braces = true +ij_groovy_spaces_within_brackets = false +ij_groovy_spaces_within_cast_parentheses = false +ij_groovy_spaces_within_catch_parentheses = false +ij_groovy_spaces_within_for_parentheses = false +ij_groovy_spaces_within_gstring_injection_braces = false +ij_groovy_spaces_within_if_parentheses = false +ij_groovy_spaces_within_list_or_map = false +ij_groovy_spaces_within_method_call_parentheses = false +ij_groovy_spaces_within_method_parentheses = false +ij_groovy_spaces_within_parentheses = false +ij_groovy_spaces_within_switch_parentheses = false +ij_groovy_spaces_within_synchronized_parentheses = false +ij_groovy_spaces_within_try_parentheses = false +ij_groovy_spaces_within_tuple_expression = false +ij_groovy_spaces_within_while_parentheses = false +ij_groovy_special_else_if_treatment = true +ij_groovy_ternary_operation_wrap = off +ij_groovy_throws_keyword_wrap = off +ij_groovy_throws_list_wrap = off +ij_groovy_use_flying_geese_braces = false +ij_groovy_use_fq_class_names = false +ij_groovy_use_fq_class_names_in_javadoc = true +ij_groovy_use_relative_indents = false +ij_groovy_use_single_class_imports = true +ij_groovy_variable_annotation_wrap = off +ij_groovy_while_brace_force = never +ij_groovy_while_on_new_line = false +ij_groovy_wrap_chain_calls_after_dot = false +ij_groovy_wrap_long_lines = false + +[{*.har,*.json}] +indent_size = 2 +ij_json_array_wrapping = split_into_lines +ij_json_keep_blank_lines_in_code = 0 +ij_json_keep_indents_on_empty_lines = false +ij_json_keep_line_breaks = true +ij_json_keep_trailing_comma = false +ij_json_object_wrapping = split_into_lines +ij_json_property_alignment = do_not_align +ij_json_space_after_colon = true +ij_json_space_after_comma = true +ij_json_space_before_colon = false +ij_json_space_before_comma = false +ij_json_spaces_within_braces = false +ij_json_spaces_within_brackets = false +ij_json_wrap_long_lines = false + +[{*.htm,*.html,*.sht,*.shtm,*.shtml}] +ij_html_add_new_line_before_tags = body,div,p,form,h1,h2,h3 +ij_html_align_attributes = true +ij_html_align_text = false +ij_html_attribute_wrap = normal +ij_html_block_comment_add_space = false +ij_html_block_comment_at_first_column = true +ij_html_do_not_align_children_of_min_lines = 0 +ij_html_do_not_break_if_inline_tags = title,h1,h2,h3,h4,h5,h6,p +ij_html_do_not_indent_children_of_tags = html,body,thead,tbody,tfoot +ij_html_enforce_quotes = false +ij_html_inline_tags = a,abbr,acronym,b,basefont,bdo,big,br,cite,cite,code,dfn,em,font,i,img,input,kbd,label,q,s,samp,select,small,span,strike,strong,sub,sup,textarea,tt,u,var +ij_html_keep_blank_lines = 2 +ij_html_keep_indents_on_empty_lines = false +ij_html_keep_line_breaks = true +ij_html_keep_line_breaks_in_text = true +ij_html_keep_whitespaces = false +ij_html_keep_whitespaces_inside = span,pre,textarea +ij_html_line_comment_at_first_column = true +ij_html_new_line_after_last_attribute = never +ij_html_new_line_before_first_attribute = never +ij_html_quote_style = double +ij_html_remove_new_line_before_tags = br +ij_html_space_after_tag_name = false +ij_html_space_around_equality_in_attribute = false +ij_html_space_inside_empty_tag = false +ij_html_text_wrap = normal + +[{*.kt,*.kts}] +ij_kotlin_align_in_columns_case_branch = false +ij_kotlin_align_multiline_binary_operation = false +ij_kotlin_align_multiline_extends_list = false +ij_kotlin_align_multiline_method_parentheses = false +ij_kotlin_align_multiline_parameters = true +ij_kotlin_align_multiline_parameters_in_calls = false +ij_kotlin_allow_trailing_comma = false +ij_kotlin_allow_trailing_comma_on_call_site = false +ij_kotlin_assignment_wrap = off +ij_kotlin_blank_lines_after_class_header = 0 +ij_kotlin_blank_lines_around_block_when_branches = 0 +ij_kotlin_blank_lines_before_declaration_with_comment_or_annotation_on_separate_line = 1 +ij_kotlin_block_comment_add_space = false +ij_kotlin_block_comment_at_first_column = true +ij_kotlin_call_parameters_new_line_after_left_paren = false +ij_kotlin_call_parameters_right_paren_on_new_line = false +ij_kotlin_call_parameters_wrap = off +ij_kotlin_catch_on_new_line = false +ij_kotlin_class_annotation_wrap = split_into_lines +ij_kotlin_continuation_indent_for_chained_calls = true +ij_kotlin_continuation_indent_for_expression_bodies = true +ij_kotlin_continuation_indent_in_argument_lists = true +ij_kotlin_continuation_indent_in_elvis = true +ij_kotlin_continuation_indent_in_if_conditions = true +ij_kotlin_continuation_indent_in_parameter_lists = true +ij_kotlin_continuation_indent_in_supertype_lists = true +ij_kotlin_else_on_new_line = false +ij_kotlin_enum_constants_wrap = off +ij_kotlin_extends_list_wrap = off +ij_kotlin_field_annotation_wrap = split_into_lines +ij_kotlin_finally_on_new_line = false +ij_kotlin_if_rparen_on_new_line = false +ij_kotlin_import_nested_classes = false +ij_kotlin_imports_layout = *,java.**,javax.**,kotlin.**,^ +ij_kotlin_insert_whitespaces_in_simple_one_line_method = true +ij_kotlin_keep_blank_lines_before_right_brace = 2 +ij_kotlin_keep_blank_lines_in_code = 2 +ij_kotlin_keep_blank_lines_in_declarations = 2 +ij_kotlin_keep_first_column_comment = true +ij_kotlin_keep_indents_on_empty_lines = false +ij_kotlin_keep_line_breaks = true +ij_kotlin_lbrace_on_next_line = false +ij_kotlin_line_break_after_multiline_when_entry = true +ij_kotlin_line_comment_add_space = false +ij_kotlin_line_comment_add_space_on_reformat = false +ij_kotlin_line_comment_at_first_column = true +ij_kotlin_method_annotation_wrap = split_into_lines +ij_kotlin_method_call_chain_wrap = off +ij_kotlin_method_parameters_new_line_after_left_paren = false +ij_kotlin_method_parameters_right_paren_on_new_line = false +ij_kotlin_method_parameters_wrap = off +ij_kotlin_name_count_to_use_star_import = 5 +ij_kotlin_name_count_to_use_star_import_for_members = 3 +ij_kotlin_packages_to_use_import_on_demand = java.util.*,kotlinx.android.synthetic.**,io.ktor.** +ij_kotlin_parameter_annotation_wrap = off +ij_kotlin_space_after_comma = true +ij_kotlin_space_after_extend_colon = true +ij_kotlin_space_after_type_colon = true +ij_kotlin_space_before_catch_parentheses = true +ij_kotlin_space_before_comma = false +ij_kotlin_space_before_extend_colon = true +ij_kotlin_space_before_for_parentheses = true +ij_kotlin_space_before_if_parentheses = true +ij_kotlin_space_before_lambda_arrow = true +ij_kotlin_space_before_type_colon = false +ij_kotlin_space_before_when_parentheses = true +ij_kotlin_space_before_while_parentheses = true +ij_kotlin_spaces_around_additive_operators = true +ij_kotlin_spaces_around_assignment_operators = true +ij_kotlin_spaces_around_equality_operators = true +ij_kotlin_spaces_around_function_type_arrow = true +ij_kotlin_spaces_around_logical_operators = true +ij_kotlin_spaces_around_multiplicative_operators = true +ij_kotlin_spaces_around_range = false +ij_kotlin_spaces_around_relational_operators = true +ij_kotlin_spaces_around_unary_operator = false +ij_kotlin_spaces_around_when_arrow = true +ij_kotlin_variable_annotation_wrap = off +ij_kotlin_while_on_new_line = false +ij_kotlin_wrap_elvis_expressions = 1 +ij_kotlin_wrap_expression_body_functions = 0 +ij_kotlin_wrap_first_method_in_call_chain = false + +[{*.markdown,*.md}] +ij_markdown_force_one_space_after_blockquote_symbol = true +ij_markdown_force_one_space_after_header_symbol = true +ij_markdown_force_one_space_after_list_bullet = true +ij_markdown_force_one_space_between_words = true +ij_markdown_format_tables = true +ij_markdown_insert_quote_arrows_on_wrap = true +ij_markdown_keep_indents_on_empty_lines = false +ij_markdown_keep_line_breaks_inside_text_blocks = true +ij_markdown_max_lines_around_block_elements = 1 +ij_markdown_max_lines_around_header = 1 +ij_markdown_max_lines_between_paragraphs = 1 +ij_markdown_min_lines_around_block_elements = 1 +ij_markdown_min_lines_around_header = 1 +ij_markdown_min_lines_between_paragraphs = 1 +ij_markdown_wrap_text_if_long = true +ij_markdown_wrap_text_inside_blockquotes = true + +[{*.toml,Cargo.lock,Cargo.toml.orig,Gopkg.lock,Pipfile,poetry.lock}] +ij_toml_keep_indents_on_empty_lines = false + +[{*.yaml,*.yml}] +indent_size = 2 +ij_yaml_align_values_properties = do_not_align +ij_yaml_autoinsert_sequence_marker = true +ij_yaml_block_mapping_on_new_line = false +ij_yaml_indent_sequence_value = true +ij_yaml_keep_indents_on_empty_lines = false +ij_yaml_keep_line_breaks = true +ij_yaml_sequence_on_new_line = false +ij_yaml_space_before_colon = false +ij_yaml_spaces_within_braces = true +ij_yaml_spaces_within_brackets = true diff --git a/.github/workflows/sonarcloud-checks.yml b/.github/workflows/sonarcloud-checks.yml new file mode 100644 index 0000000..a19c7ef --- /dev/null +++ b/.github/workflows/sonarcloud-checks.yml @@ -0,0 +1,36 @@ +name: Build +on: + push: + branches: + - main + pull_request: + types: [opened, synchronize, reopened] +jobs: + build: + name: Build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis + - name: Set up JDK 17 + uses: actions/setup-java@v1 + with: + java-version: 17 + - name: Cache SonarCloud packages + uses: actions/cache@v1 + with: + path: ~/.sonar/cache + key: ${{ runner.os }}-sonar + restore-keys: ${{ runner.os }}-sonar + - name: Cache Maven packages + uses: actions/cache@v1 + with: + path: ~/.m2 + key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2 + - name: Build and analyze + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + run: mvn -B verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Dsonar.projectKey=PhonePe_dlm diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fafaf4d --- /dev/null +++ b/.gitignore @@ -0,0 +1,47 @@ +### Java template +*.class + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear + +hs_err_pid* +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio + +## File-based project format: +*.ipr +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties + +## Directory-based project format: +/.idea/ +/target/ +benchmarking/.idea/ +benchmarking/target/ +.classpath +.project +.settings/ +.DS_Store +*.iml + +/bin/ \ No newline at end of file diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..5215846 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,63 @@ +image: docker.phonepe.com:5000/pp-maven:3.8.4-openjdk-all + +.default: &default + only: + - merge_requests + - web + +.default_tag: &default_tag + tags: + - backend-docker-large + +stages: + - build + - quality + - deploy + - merge + - release + +build_package: + stage: build + <<: *default + <<: *default_tag + when: manual + allow_failure: false + script: + - mvn clean package -U -DskipTests=true + +merge_ready: + stage: quality + only: + - merge_requests + <<: *default_tag + script: + - mvn clean verify -U sonar:sonar -Pquality_check -Dsonar.pullrequest.key=$CI_MERGE_REQUEST_IID -Dsonar.pullrequest.branch=$CI_MERGE_REQUEST_SOURCE_BRANCH_NAME -Dsonar.pullrequest.base=$CI_MERGE_REQUEST_TARGET_BRANCH_NAME + allow_failure: false + +deploy_snapshot: + stage: deploy + <<: *default_tag + <<: *default + only: + - web + allow_failure: false + script: + - mvn deploy -DskipTests=true -Pdocker -Dmaven.install.skip=true + when: manual + +release: + <<: *default_tag + stage: release + script: + - git remote set-url origin "git@gitlab.phonepe.com:${CI_PROJECT_PATH}.git" + - git checkout main + - git pull origin main + - git checkout develop + - git pull origin develop + - mvn -U jgitflow:release-start jgitflow:release-finish + - git push --all + - git push --tags origin + when: manual + only: + refs: + - develop \ No newline at end of file diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..c76dcae --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,46 @@ +# Contributor Covenant Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, nationality, personal appearance, race, religion, or sexual identity and orientation. + +## Our Standards + +Examples of behavior that contributes to creating a positive environment include: + +* Using welcoming and inclusive language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at [TBD]. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version] + +[homepage]: http://contributor-covenant.org +[version]: http://contributor-covenant.org/version/1/4/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..5b9ccb6 --- /dev/null +++ b/README.md @@ -0,0 +1,134 @@ +# Distributed Lock Manager (DLM) + +Locking is a very common expectation in SoA, where a vulnerable entity needs to be protected for a certain duration. +And the definition of vulnerable entity changes from one client to another depending on the use-cases at hand. + +Distributed Lock Manager is an easy-to-use library to achieve various modes of locking, be it - Exclusive or Limited Protected(LP). +In current version, Exclusive locking mode is supported with Aerospike as the underlying storage base by leveraging +its MVCC (MultiVersion Concurrency Control) capability. + +## Add Maven Dependency + +```xml + + com.phonepe + distributed-lock-manager + 3.0.6 + +``` + + +### Usage + +#### Initializing Distributed Lock Manager + +##### With Aerospike as lock base + +``` java +DistributedLockManager lockManager = DistributedLockManager.builder() + .clientId("CLIENT_ID") + .farmId("FA1") + .lockBase(AerospikeLockBase.builder() + .mode(LockMode.EXCLUSIVE) + .store(AerospikeStore.builder() + .aerospikeClient(aerospikeClient) + .namespace("NAMESPACE") + .setSuffix("distributed_lock") + .build()) + .build()) + .build(); +lockManager.initialize(); +``` + +##### With HBase as lock base + +``` java +DistributedLockManager lockManager = DistributedLockManager.builder() + .clientId("CLIENT_ID") + .farmId("FA1") + .lockBase(HBaseLockBase.builder() + .mode(LockMode.EXCLUSIVE) + .store(HBaseStore.builder() + .connection(connection) // HBase connection reference + .tableName("table_name") + .build()) + .build()) + .build(); +lockManager.initialize(); +``` + +PS : For optimum performance, DO NOT pre-create the HBase table. Library will do it for you. + + +# Entity Lock Management + +This library offers various methods for acquiring and releasing locks on critical entities. Below are the available methods: + +1. **`tryAcquireLock(lock)`** + - Attempts immediate lock acquisition. Throws an exception if the lock is unavailable. Does not wait if the lock is currently held by another thread. The default lock duration is 90 seconds. + +2. **`tryAcquireLock(lock, duration)`** + - Immediate lock acquisition attempt with a specified duration. Throws an exception if the lock is unavailable. Does not wait if the lock is held by another thread. + +3. **`acquireLock(lock)`** + - Tries to acquire the lock. If the lock is held by another thread, it waits until the lock becomes available, blocking the thread. The default timeout is 90 seconds, and the lock duration defaults to 90 seconds. + +4. **`acquireLock(lock, duration)`** + - Similar to `acquireLock(lock)` but allows a specified duration for the lock. + +5. **`acquireLock(lock, duration, timeout)`** + - Attempts to acquire the lock and waits for a limited time for it to become available. If the lock is not acquired within the given time, it returns with a failure indication. This method blocks the thread until the lock is acquired. + +## Example Usage + +```java +// Representing a vulnerable entity by LOCK_ID +final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); +try { + lockManager.tryAcquireLock(lock); // Attempts to acquire the lock for the default duration of 90 seconds + // OR lockManager.tryAcquireLock(lock, 120); // Tries to acquire the lock for 120 seconds + + // Perform actions once the lock is successfully acquired. + +} catch (DLSException e) { + if (ErrorCode.LOCK_UNAVAILABLE.equals(e.getErrorCode)) { + // Actions to take if the lock can't be acquired. + } +} finally { + // Verify if the lock was released successfully. + boolean released = lockManager.release(lock); +} +``` + +```java +// Vulnerable entity represented by LOCK_ID +// Representing a vulnerable entity by LOCK_ID +final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); +try { + lockManager.acquireLock(lock); // Attempts to acquire the lock for the default duration of 90 seconds and waits for 90 seconds + // OR lockManager.acquireLock(lock, 30); // Tries to acquire the lock for 30 seconds, waiting for 90 seconds + // OR lockManager.acquireLock(lock, 30, 30); // Tries to acquire the lock for 30 seconds, waiting for 30 seconds + + // Perform actions once the lock is successfully acquired. +} catch (DLSException e) { + if (ErrorCode.LOCK_UNAVAILABLE.equals(e.getErrorCode)) { + // Actions to take if the lock can't be acquired. + } +} finally { + // Verify if the lock was released successfully. + boolean released = lockManager.release(lock); +} +``` + +#### Lock Levels +* DC - Acquiring/releasing lock within a DC +* XDC - Acquiring/releasing lock across DCs. + + +**Caution**: Concurrently obtaining a lock on the same entity using XDC across multiple data centers may result in unexpected behavior due to the replication of data between centers. Therefore, it is recommended to utilize DC locks whenever feasible. + +For XDC locks requiring strong consistency, opt for a multi-site Aerospike cluster. + +#### Notes + +A lock exists only within the scope of a Client represented by `CLIENT_ID`. diff --git a/build.properties b/build.properties new file mode 100644 index 0000000..be05629 --- /dev/null +++ b/build.properties @@ -0,0 +1 @@ +BUILD_PROJECT_VERSION=1.0.0 \ No newline at end of file diff --git a/lombok.config b/lombok.config new file mode 100644 index 0000000..9505a3c --- /dev/null +++ b/lombok.config @@ -0,0 +1,3 @@ +lombok.addLombokGeneratedAnnotation = true +lombok.anyConstructor.addConstructorProperties = true +config.stopBubbling = true \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..fbf7b79 --- /dev/null +++ b/pom.xml @@ -0,0 +1,319 @@ + + 4.0.0 + + com.phonepe + distributed-lock-manager + 3.0.7-SNAPSHOT + + https://github.com/PhonePe/DLM + Distributed Lock Manager + + + scm:git:https://github.com/PhonePe/DLM.git + scm:git:https://github.com/PhonePe/DLM.git + HEAD + https://github.com/PhonePe/DLM.git + + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + + + + + MISBMS + Mohammed Irfanulla S + mohammed.irfanulla.s1@gmail.com + + owner + developer + + + + tshan10 + Shantanu Tiwari + tshantanu510@gmail.com + + owner + developer + + + + + + GitHub Issues + https://github.com/PhonePe/DLM/issues + + + + Travis CI + https://travis-ci.org/PhonePe/DLM + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + ossrh + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + + + clojars.org + https://clojars.org/repo + + + + + 2.0.1 + 0.0.2 + 6.1.7 + 1.18.22 + 2.9.6 + 2.0.0 + + + 3.9.0 + true + false + 3.8.5 + 3.2.0 + 3.3.1 + 3.0.0-M5 + 0.8.7 + 1.6.7 + + + 4.3.1 + 1.0.6 + 5.7.0 + 4.1.1 + 4.12 + + + 17 + 17 + 17 + + + + + org.projectlombok + lombok + ${lombok.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.aerospike + aerospike-client + ${aerospike.client.version} + + + com.github.rholder + guava-retrying + ${guava.retrying.version} + + + org.apache.hbase + hbase-shaded-client + ${hbase.client.version} + + + com.sematext.hbase.ds + hbase-ds + ${hbase.ds.version} + + + com.google.protobuf + protobuf-java + 2.6.1 + test + + + junit + junit + ${junit.version} + test + + + org.mockito + mockito-inline + ${mockito.version} + + + org.mockito + mockito-core + ${mockito.version} + + + org.awaitility + awaitility + ${awaitility.version} + test + + + io.appform.testcontainer + junit-testcontainer-aerospike + ${junit.testcontainer.version} + test + + + com.aerospike + aerospike-client + + + io.dropwizard + * + + + + + net.java.dev.jna + jna + ${jna.version} + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven.compiler.plugin.version} + + ${jdk.source.version} + ${jdk.target.version} + ${jdk.release.version} + true + false + + + + org.apache.maven.plugins + maven-source-plugin + ${maven.source.plugin.version} + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + ${maven.javadoc.plugin.version} + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven.surefire.plugin.version} + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.7 + true + + ossrh + https://s01.oss.sonatype.org/ + true + + + + + + + + release + + + release + true + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + true + gpg2 + + + + + + + + + + coverage + + + + org.jacoco + jacoco-maven-plugin + ${jacoco.maven.plugin.version} + + + + prepare-agent + + + + report + test + + report + + + + + + + + + + + diff --git a/src/main/java/com/phonepe/dlm/DistributedLockManager.java b/src/main/java/com/phonepe/dlm/DistributedLockManager.java new file mode 100644 index 0000000..2953c37 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/DistributedLockManager.java @@ -0,0 +1,136 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm; + +import com.phonepe.dlm.exception.ErrorCode; +import com.phonepe.dlm.lock.Lock; +import com.phonepe.dlm.lock.base.LockBase; +import com.phonepe.dlm.lock.level.LockLevel; +import com.phonepe.dlm.exception.DLMException; +import lombok.AllArgsConstructor; +import lombok.Builder; + +@Builder +@AllArgsConstructor +public class DistributedLockManager { + private final String clientId; + private final String farmId; + private final LockBase lockBase; + + public void initialize() { + lockBase.getLockStore().initialize(); + } + + /** + * This method attempts to acquire the lock immediately and throws exception if lock is unavailable + * It does not wait if the lock is currently held by another thread. + *

+ * The lock will be acquired for default time period {@link LockBase#DEFAULT_LOCK_TTL_SECONDS} + * + * @param lock The lock to be acquired. + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired + */ + public void tryAcquireLock(final Lock lock) { + lockBase.tryAcquireLock(lock); + } + + /** + * This method attempts to acquire the lock immediately and throws exception if lock is unavailable + * It does not wait if the lock is currently held by another thread. + * + * @param lock The lock to be acquired + * @param duration The lock duration in seconds for which lock will be held + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired + */ + public void tryAcquireLock(final Lock lock, final int duration) { + lockBase.tryAcquireLock(lock, duration); + } + + /** + * This method tries to acquire the lock, and if the lock is currently held by another thread, + * it will wait until the lock becomes available. + * It blocks the thread until the lock is acquired. + *

+ * By default, timeout is {@link LockBase#DEFAULT_WAIT_FOR_LOCK_IN_SECONDS} + * The lock will be acquired for default time period {@link LockBase#DEFAULT_LOCK_TTL_SECONDS} + * + * @param lock The lock to be acquired. + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout + */ + public void acquireLock(final Lock lock) { + lockBase.acquireLock(lock); + } + + /** + * This method tries to acquire the lock, and if the lock is currently held by another thread, + * it will wait until the lock becomes available. + * It blocks the thread until the lock is acquired. + *

+ * By default, timeout is {@link LockBase#DEFAULT_WAIT_FOR_LOCK_IN_SECONDS} + * + * @param lock The lock to be acquired. + * @param duration The lock duration in seconds for which lock will be held + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout + */ + public void acquireLock(final Lock lock, final int duration) { + lockBase.acquireLock(lock, duration); + } + + /** + * This method attempts to acquire the lock and waits for a limited time for the lock to become available. + * If the lock is not acquired within the given time, it returns with a failure indication. + *

+ * It blocks the thread until the lock is acquired. + * + * @param lock The lock to be acquired. + * @param duration The lock duration in seconds for which lock will be held + * @param timeout The timeout(wait duration in seconds) for a lock to become available + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout + */ + public void acquireLock(final Lock lock, final int duration, final int timeout) { + lockBase.acquireLock(lock, duration, timeout); + } + + /** + * This method releases the acquired lock, allowing other threads to acquire it. + * + * @param lock The lock which is acquired + * @return true if the lock is successfully released, otherwise false + */ + public boolean releaseLock(final Lock lock) { + return lockBase.releaseLock(lock); + } + + /** + * Method to get lock instance + * + * @param lockId - Identifier on which lock needs to be acquired/released + * @param lockLevel - DC or XDC (Cross data center) + * @return LockInstance + */ + public Lock getLockInstance(final String lockId, final LockLevel lockLevel) { + return Lock.builder() + .lockId(String.format("%s#%s", clientId, lockId)) + .lockLevel(lockLevel) + .farmId(farmId) + .build(); + } + + public void destroy() { + lockBase.getLockStore().close(); + } +} diff --git a/src/main/java/com/phonepe/dlm/common/Constants.java b/src/main/java/com/phonepe/dlm/common/Constants.java new file mode 100644 index 0000000..067b3ea --- /dev/null +++ b/src/main/java/com/phonepe/dlm/common/Constants.java @@ -0,0 +1,25 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.common; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class Constants { + public static final String DELIMITER = "#"; +} \ No newline at end of file diff --git a/src/main/java/com/phonepe/dlm/exception/DLMException.java b/src/main/java/com/phonepe/dlm/exception/DLMException.java new file mode 100644 index 0000000..b2d1d23 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/exception/DLMException.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.exception; + +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +public class DLMException extends RuntimeException { + private static final long serialVersionUID = 7310153558992797589L; + private final ErrorCode errorCode; + + public DLMException(ErrorCode errorCode) { + super(); + this.errorCode = errorCode; + } + + @Builder + public DLMException(ErrorCode errorCode, String message, Throwable cause) { + super(message, cause); + this.errorCode = errorCode; + } + + public static DLMException propagate(final Throwable throwable) { + return propagate("Error occurred", throwable); + } + + public static DLMException propagate(final String message, final Throwable throwable) { + if (throwable instanceof DLMException) { + return (DLMException) throwable; + } else if (throwable.getCause() instanceof DLMException) { + return (DLMException) throwable.getCause(); + } + return DLMException.builder() + .errorCode(ErrorCode.INTERNAL_ERROR) + .message(message) + .cause(throwable) + .build(); + } + +} diff --git a/src/main/java/com/phonepe/dlm/exception/ErrorCode.java b/src/main/java/com/phonepe/dlm/exception/ErrorCode.java new file mode 100644 index 0000000..27630ba --- /dev/null +++ b/src/main/java/com/phonepe/dlm/exception/ErrorCode.java @@ -0,0 +1,25 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.exception; + +public enum ErrorCode { + CONNECTION_ERROR, + INTERNAL_ERROR, + RETRIES_EXHAUSTED, + LOCK_UNAVAILABLE, + TABLE_CREATION_ERROR +} diff --git a/src/main/java/com/phonepe/dlm/lock/ILockable.java b/src/main/java/com/phonepe/dlm/lock/ILockable.java new file mode 100644 index 0000000..fc742ce --- /dev/null +++ b/src/main/java/com/phonepe/dlm/lock/ILockable.java @@ -0,0 +1,92 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.lock; + +import com.phonepe.dlm.exception.ErrorCode; +import com.phonepe.dlm.lock.base.LockBase; +import com.phonepe.dlm.exception.DLMException; + +public interface ILockable { + + /** + * This method attempts to acquire the lock immediately and throws exception if lock is unavailable + * It does not wait if the lock is currently held by another thread. + *

+ * The lock will be acquired for default time period {@link LockBase#DEFAULT_LOCK_TTL_SECONDS} + * + * @param lock The lock to be acquired. + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired + */ + void tryAcquireLock(final Lock lock); + + /** + * This method attempts to acquire the lock immediately and throws exception if lock is unavailable + * It does not wait if the lock is currently held by another thread. + * + * @param lock The lock to be acquired + * @param duration The lock duration in seconds for which lock will be held + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired + */ + void tryAcquireLock(final Lock lock, final int duration); + + /** + * This method tries to acquire the lock, and if the lock is currently held by another thread, + * it will wait until the lock becomes available. + * It blocks the thread until the lock is acquired. + *

+ * By default, timeout is {@link LockBase#DEFAULT_WAIT_FOR_LOCK_IN_SECONDS} + * The lock will be acquired for default time period {@link LockBase#DEFAULT_LOCK_TTL_SECONDS} + * + * @param lock The lock to be acquired. + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout + */ + void acquireLock(final Lock lock); + + /** + * This method tries to acquire the lock, and if the lock is currently held by another thread, + * it will wait until the lock becomes available. + * It blocks the thread until the lock is acquired. + *

+ * By default, timeout is {@link LockBase#DEFAULT_WAIT_FOR_LOCK_IN_SECONDS} + * + * @param lock The lock to be acquired. + * @param duration The lock duration in seconds for which lock will be held + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout + */ + void acquireLock(final Lock lock, final int duration); + + /** + * This method attempts to acquire the lock and waits for a limited time for the lock to become available. + * If the lock is not acquired within the given time, it returns with a failure indication. + *

+ * It blocks the thread until the lock is acquired. + * + * @param lock The lock to be acquired. + * @param duration The lock duration in seconds for which lock will be held + * @param timeout The timeout(wait duration in seconds) for a lock to become available + * @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout + */ + void acquireLock(final Lock lock, final int duration, final int timeout); + + /** + * This method releases the acquired lock, allowing other threads to acquire it. + * + * @param lock The lock which is acquired + * @return true if the lock is successfully released, otherwise false + */ + boolean releaseLock(final Lock lock); +} diff --git a/src/main/java/com/phonepe/dlm/lock/Lock.java b/src/main/java/com/phonepe/dlm/lock/Lock.java new file mode 100644 index 0000000..033c394 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/lock/Lock.java @@ -0,0 +1,38 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.lock; + +import com.phonepe.dlm.lock.level.LockLevel; +import lombok.Builder; +import lombok.Getter; + +import java.util.concurrent.atomic.AtomicBoolean; + +@Getter +public class Lock { + private final String lockId; + private final LockLevel lockLevel; + private final String farmId; + private final AtomicBoolean acquiredStatus = new AtomicBoolean(); + + @Builder + public Lock(final String lockId, final LockLevel lockLevel, final String farmId) { + this.lockId = lockId; + this.lockLevel = lockLevel; + this.farmId = farmId; + } +} diff --git a/src/main/java/com/phonepe/dlm/lock/base/LockBase.java b/src/main/java/com/phonepe/dlm/lock/base/LockBase.java new file mode 100644 index 0000000..0208229 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/lock/base/LockBase.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.lock.base; + +import com.phonepe.dlm.exception.DLMException; +import com.phonepe.dlm.exception.ErrorCode; +import com.phonepe.dlm.lock.ILockable; +import com.phonepe.dlm.lock.Lock; +import com.phonepe.dlm.lock.mode.LockMode; +import com.phonepe.dlm.lock.storage.ILockStore; +import com.phonepe.dlm.utils.Timer; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j +@AllArgsConstructor +@Builder +@Getter +public class LockBase implements ILockable { + public static final int DEFAULT_LOCK_TTL_SECONDS = 90; + public static final int DEFAULT_WAIT_FOR_LOCK_IN_SECONDS = 90; + public static final int WAIT_TIME_FOR_NEXT_RETRY = 1000; // 1 second + + private final ILockStore lockStore; + private final LockMode mode; // Not implemented now, but can be leveraged in the future. + + @Override + public void tryAcquireLock(final Lock lock) { + tryAcquireLock(lock, DEFAULT_LOCK_TTL_SECONDS); + } + + @Override + public void tryAcquireLock(final Lock lock, final int duration) { + writeToStore(lock, duration); + } + + @Override + public void acquireLock(final Lock lock) { + acquireLock(lock, DEFAULT_LOCK_TTL_SECONDS, DEFAULT_WAIT_FOR_LOCK_IN_SECONDS); + } + + @Override + public void acquireLock(final Lock lock, final int duration) { + acquireLock(lock, duration, DEFAULT_WAIT_FOR_LOCK_IN_SECONDS); + } + + @SneakyThrows + @Override + public void acquireLock(final Lock lock, final int duration, final int timeout) { + final Timer timer = new Timer(System.currentTimeMillis(), timeout); + final AtomicBoolean success = new AtomicBoolean(false); + do { + try { + writeToStore(lock, duration); + success.set(true); + } catch (DLMException e) { + if (timer.isExpired()) { + log.error("Lock wait time {}secs is over, lock is still not available", timeout); + throw e; + } + if (e.getErrorCode() == ErrorCode.LOCK_UNAVAILABLE) { + Thread.sleep(WAIT_TIME_FOR_NEXT_RETRY); + continue; + } + throw e; + } + } while (!success.get()); + } + + @Override + public boolean releaseLock(final Lock lock) { + if (lock.getAcquiredStatus().get()) { + lockStore.remove(lock.getLockId(), lock.getLockLevel(), lock.getFarmId()); + lock.getAcquiredStatus().compareAndSet(true, false); + return !lock.getAcquiredStatus().get(); + } + return false; + } + + private void writeToStore(final Lock lock, final int ttlSeconds) { + lockStore.write(lock.getLockId(), lock.getLockLevel(), lock.getFarmId(), ttlSeconds); + lock.getAcquiredStatus().compareAndSet(false, true); + } +} diff --git a/src/main/java/com/phonepe/dlm/lock/level/LockLevel.java b/src/main/java/com/phonepe/dlm/lock/level/LockLevel.java new file mode 100644 index 0000000..2e97347 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/lock/level/LockLevel.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.lock.level; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author shantanu.tiwari + * Created on 29/12/21 + */ +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public enum LockLevel { + DC(LockLevel.DC_TEXT) { + @Override + public T accept(Visitor visitor) { + return visitor.visitDC(); + } + }, + XDC(LockLevel.XDC_TEXT) { + @Override + public T accept(Visitor visitor) { + return visitor.visitXDC(); + } + }; + + public static final String DC_TEXT = "DC"; + public static final String XDC_TEXT = "XDC"; + + @Getter + private final String value; + + public abstract T accept(Visitor visitor); + + public interface Visitor { + T visitDC(); + + T visitXDC(); + } +} diff --git a/src/main/java/com/phonepe/dlm/lock/mode/LockMode.java b/src/main/java/com/phonepe/dlm/lock/mode/LockMode.java new file mode 100644 index 0000000..9a81569 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/lock/mode/LockMode.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.lock.mode; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public enum LockMode { + EXCLUSIVE(LockMode.EXCLUSIVE_TEXT) { + @Override + public T accept(Visitor visitor) { + return visitor.visitExclusive(); + } + }; + + static final String EXCLUSIVE_TEXT = "EXCLUSIVE"; + + @Getter + private final String value; + + public abstract T accept(Visitor visitor); + + public interface Visitor { + T visitExclusive(); + } +} diff --git a/src/main/java/com/phonepe/dlm/lock/storage/ILockStore.java b/src/main/java/com/phonepe/dlm/lock/storage/ILockStore.java new file mode 100644 index 0000000..e43c5b5 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/lock/storage/ILockStore.java @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.lock.storage; + +import com.phonepe.dlm.lock.level.LockLevel; + +public interface ILockStore { + void initialize(); + + void write(String lockId, LockLevel lockLevel, String farmId, int ttlSeconds); + + void remove(String lockId, LockLevel lockLevel, String farmId); + + void close(); +} diff --git a/src/main/java/com/phonepe/dlm/lock/storage/aerospike/AerospikeStore.java b/src/main/java/com/phonepe/dlm/lock/storage/aerospike/AerospikeStore.java new file mode 100644 index 0000000..dae6b48 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/lock/storage/aerospike/AerospikeStore.java @@ -0,0 +1,140 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.lock.storage.aerospike; + +import com.aerospike.client.*; +import com.aerospike.client.policy.CommitLevel; +import com.aerospike.client.policy.GenerationPolicy; +import com.aerospike.client.policy.WritePolicy; +import com.github.rholder.retry.RetryException; +import com.phonepe.dlm.common.Constants; +import com.phonepe.dlm.exception.DLMException; +import com.phonepe.dlm.exception.ErrorCode; +import com.phonepe.dlm.lock.level.LockLevel; +import com.phonepe.dlm.lock.storage.ILockStore; +import com.phonepe.dlm.utils.AerospikeUtils; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; + +@Data +@Builder +@AllArgsConstructor +public class AerospikeStore implements ILockStore { + private static final String DATA_BIN = "data"; + private static final String MODIFIED_AT_BIN = "uat"; + + private final IAerospikeClient aerospikeClient; + private final String namespace; + private final String setSuffix; + + @Override + public void initialize() { + // Nothing to initialise + } + + @Override + public void write(String lockId, LockLevel lockLevel, String farmId, int ttlSeconds) { + final WritePolicy writePolicy = new WritePolicy(aerospikeClient.getWritePolicyDefault()); + writePolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL; + writePolicy.generation = 0; + writePolicy.expiration = ttlSeconds; + writePolicy.commitLevel = CommitLevel.COMMIT_MASTER; // Committing to master only, as there is no read required so there is no chance of dirty reads. + try { + final List binList = new ArrayList<>(); + binList.add(new Bin(AerospikeUtils.getBin(DATA_BIN, farmId), 1)); + binList.add(new Bin(AerospikeUtils.getBin(MODIFIED_AT_BIN, farmId), System.currentTimeMillis())); + + AerospikeUtils.retryer.call(() -> { + write(lockId, lockLevel, farmId, writePolicy, binList); + return null; + }); + } catch (ExecutionException | RetryException e) { + if (e.getCause() instanceof DLMException) { + throw DLMException.propagate(e); + } + throw DLMException.builder() + .cause(e.getCause()) + .errorCode(ErrorCode.CONNECTION_ERROR) + .message(String.format("Error writing lock in aerospike [id = %s]", lockId)) + .build(); + } + } + + @Override + public void remove(String lockId, LockLevel lockLevel, String farmId) { + try { + AerospikeUtils.retryer.call(() -> + aerospikeClient.delete(aerospikeClient.getWritePolicyDefault(), + new Key(namespace, getSetName(lockLevel, farmId), lockId)) + ); + } catch (RetryException e) { + throw DLMException.builder() + .cause(e) + .errorCode(ErrorCode.RETRIES_EXHAUSTED) + .message(String.format("Error removing lock in aerospike [id = %s]", lockId)) + .build(); + } catch (ExecutionException e) { + throw DLMException.builder() + .cause(e) + .errorCode(ErrorCode.CONNECTION_ERROR) + .message(String.format("Error removing lock in aerospike [id = %s]", lockId)) + .build(); + } + } + + @Override + public void close() { + aerospikeClient.close(); + } + + private void write(final String lockId, final LockLevel lockLevel, final String farmId, + final WritePolicy writePolicy, final List binList) { + try { + aerospikeClient.put(writePolicy, + new Key(namespace, getSetName(lockLevel, farmId), lockId), + binList.toArray(new Bin[0])); + } catch (AerospikeException ae) { + if (ae.getResultCode() == ResultCode.GENERATION_ERROR) { + throw DLMException.builder() + .cause(ae.getCause()) + .errorCode(ErrorCode.LOCK_UNAVAILABLE) + .message(String.format("Error acquiring lock in aerospike [id = %s]", lockId)) + .build(); + } + throw ae; + } + } + + private String getSetName(final LockLevel lockLevel, final String farmId) { + return lockLevel.accept(new LockLevel.Visitor<>() { + @Override + public String visitDC() { + return String.join(Constants.DELIMITER, lockLevel.getValue(), farmId, setSuffix); + } + + @Override + public String visitXDC() { + return String.join(Constants.DELIMITER, lockLevel.getValue(), setSuffix); + } + }); + } +} diff --git a/src/main/java/com/phonepe/dlm/lock/storage/hbase/HBaseStore.java b/src/main/java/com/phonepe/dlm/lock/storage/hbase/HBaseStore.java new file mode 100644 index 0000000..2ecf121 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/lock/storage/hbase/HBaseStore.java @@ -0,0 +1,134 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.lock.storage.hbase; + +import com.phonepe.dlm.common.Constants; +import com.phonepe.dlm.exception.DLMException; +import com.phonepe.dlm.exception.ErrorCode; +import com.phonepe.dlm.lock.level.LockLevel; +import com.phonepe.dlm.lock.storage.ILockStore; +import com.sematext.hbase.ds.AbstractRowKeyDistributor; +import com.sematext.hbase.ds.RowKeyDistributorByHashPrefix; +import com.sematext.hbase.ds.RowKeyDistributorByHashPrefix.Hasher; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; + +@Data +@Builder +@AllArgsConstructor +public class HBaseStore implements ILockStore { + private static final Hasher ONE_BYTE_HASHER = new RowKeyDistributorByHashPrefix.OneByteSimpleHash(256); + private static final AbstractRowKeyDistributor ROW_KEY_DISTRIBUTOR = new RowKeyDistributorByHashPrefix( + ONE_BYTE_HASHER); + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("D"); + private static final byte[] COLUMN_NAME = Bytes.toBytes("L"); + private static final byte[] COLUMN_DATA = Bytes.toBytes("M"); + private Connection connection; + private String tableName; + + @Override + public void initialize() { + final TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY) + .setCompressionType(Compression.Algorithm.GZ) + .setMaxVersions(1) + .build()) + .build(); + try { + if (!connection.getAdmin().tableExists(TableName.valueOf(Bytes.toBytes(tableName)))) { + connection.getAdmin() + .createTable(tableDescriptor, ONE_BYTE_HASHER.getAllPossiblePrefixes()); + } + } catch (Exception e) { + throw DLMException.builder() + .cause(e) + .errorCode(ErrorCode.TABLE_CREATION_ERROR) + .message(String.format("Could not create table: %s", tableName)) + .build(); + } + } + + @Override + public void write(String lockId, LockLevel lockLevel, String farmId, int ttlSeconds) { + final byte[] normalisedRowKey = getNormalisedRowKey(lockId, lockLevel, farmId); + + try (final Table table = getTable()) { + final boolean result = table.checkAndMutate(normalisedRowKey, COLUMN_FAMILY) + .qualifier(COLUMN_NAME) + .ifNotExists() + .thenPut(new Put(normalisedRowKey, System.currentTimeMillis()).setTTL(ttlSeconds * 1_000L) + .addColumn(COLUMN_FAMILY, COLUMN_NAME, COLUMN_DATA)); + if (!result) { + throw DLMException.builder() + .errorCode(ErrorCode.LOCK_UNAVAILABLE) + .message(String.format("Error acquiring lock in HBase [id = %s]", lockId)) + .build(); + } + } catch (IOException e) { + throw DLMException.builder() + .cause(e) + .errorCode(ErrorCode.CONNECTION_ERROR) + .message(String.format("Error writing lock in HBase [id = %s]", lockId)) + .build(); + } + } + + @Override + public void remove(String lockId, LockLevel lockLevel, String farmId) { + try (final Table table = getTable()) { + table.delete(new Delete(getNormalisedRowKey(lockId, lockLevel, farmId))); + } catch (IOException e) { + throw DLMException.propagate(e); + } + } + + @Override + public void close() { + try { + connection.close(); + } catch (Exception e) { + throw DLMException.propagate(e); + } + } + + private Table getTable() throws IOException { + return connection.getTable(TableName.valueOf(tableName)); + } + + private byte[] getNormalisedRowKey(final String lockId, final LockLevel lockLevel, final String farmId) { + final String rowKey = lockLevel.accept(new LockLevel.Visitor<>() { + @Override + public String visitDC() { + return String.join(Constants.DELIMITER, lockLevel.getValue(), farmId, lockId); + } + + @Override + public String visitXDC() { + return String.join(Constants.DELIMITER, lockLevel.getValue(), lockId); + } + }); + return ROW_KEY_DISTRIBUTOR.getDistributedKey(Bytes.toBytes(rowKey)); + } + +} diff --git a/src/main/java/com/phonepe/dlm/utils/AerospikeUtils.java b/src/main/java/com/phonepe/dlm/utils/AerospikeUtils.java new file mode 100644 index 0000000..7bf0a97 --- /dev/null +++ b/src/main/java/com/phonepe/dlm/utils/AerospikeUtils.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.utils; + +import com.aerospike.client.AerospikeException; +import com.github.rholder.retry.*; +import lombok.experimental.UtilityClass; + +import java.util.concurrent.TimeUnit; + +/** + * @author shantanu.tiwari + * Created on 21/09/23 + */ +@UtilityClass +public class AerospikeUtils { + public static final String BIN_FARM_SEPARATOR = "##"; + public static final String BIN_FORMAT = "%s" + BIN_FARM_SEPARATOR + "%s"; + private static final int DEFAULT_SLEEP_TIME = 80; + private static final int DEFAULT_RETRY_ATTEMPTS = 5; + + public static final Retryer retryer = RetryerBuilder.newBuilder() + .retryIfExceptionOfType(AerospikeException.class) + .withStopStrategy(StopStrategies.stopAfterAttempt(DEFAULT_RETRY_ATTEMPTS)) + .withWaitStrategy(WaitStrategies.fixedWait(DEFAULT_SLEEP_TIME, TimeUnit.MILLISECONDS)) + .withBlockStrategy(BlockStrategies.threadSleepStrategy()) + .build(); + + public String getBin(final String binSuffix, final String farm) { + return BIN_FORMAT.formatted(farm, binSuffix); + } +} diff --git a/src/main/java/com/phonepe/dlm/utils/Timer.java b/src/main/java/com/phonepe/dlm/utils/Timer.java new file mode 100644 index 0000000..b83b6bf --- /dev/null +++ b/src/main/java/com/phonepe/dlm/utils/Timer.java @@ -0,0 +1,31 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.utils; + +import lombok.AllArgsConstructor; +import lombok.Builder; + +@Builder +@AllArgsConstructor +public class Timer { + private final long startTime; + private final long duration; + + public boolean isExpired() { + return System.currentTimeMillis() - startTime > (duration * 1000); + } +} \ No newline at end of file diff --git a/src/test/java/com/phonepe/dlm/DistributedLockWithAerospikeTest.java b/src/test/java/com/phonepe/dlm/DistributedLockWithAerospikeTest.java new file mode 100644 index 0000000..1164f4b --- /dev/null +++ b/src/test/java/com/phonepe/dlm/DistributedLockWithAerospikeTest.java @@ -0,0 +1,252 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm; + +import com.aerospike.client.AerospikeClient; +import com.aerospike.client.Host; +import com.aerospike.client.policy.ClientPolicy; +import com.google.common.collect.Maps; +import com.phonepe.dlm.lock.Lock; +import com.phonepe.dlm.lock.base.LockBase; +import com.phonepe.dlm.lock.level.LockLevel; +import com.phonepe.dlm.lock.mode.LockMode; +import com.phonepe.dlm.lock.storage.aerospike.AerospikeStore; +import com.phonepe.dlm.util.TestUtils; +import com.phonepe.dlm.exception.DLMException; +import com.phonepe.dlm.exception.ErrorCode; +import io.appform.testcontainers.aerospike.AerospikeContainerConfiguration; +import io.appform.testcontainers.aerospike.container.AerospikeContainer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** + * @author shantanu.tiwari + */ +public class DistributedLockWithAerospikeTest { + public static final String AEROSPIKE_HOST = "localhost"; + public static final String AEROSPIKE_DOCKER_IMAGE = "aerospike/aerospike-server:latest"; + public static final String AEROSPIKE_NAMESPACE = "DLM"; + public static final int AEROSPIKE_PORT = 3000; + private static final AerospikeContainer AEROSPIKE_DOCKER_CONTAINER; + + private DistributedLockManager lockManager; + public AerospikeClient aerospikeClient; + + static { + AerospikeContainerConfiguration aerospikeContainerConfig = new AerospikeContainerConfiguration( + true, AEROSPIKE_DOCKER_IMAGE, AEROSPIKE_NAMESPACE, AEROSPIKE_HOST, AEROSPIKE_PORT); + aerospikeContainerConfig.setWaitTimeoutInSeconds(300L); + AEROSPIKE_DOCKER_CONTAINER = new AerospikeContainer(aerospikeContainerConfig); + AEROSPIKE_DOCKER_CONTAINER.start(); + } + + @Before + public void setUp() { + aerospikeClient = new AerospikeClient(new ClientPolicy(), + new Host(AEROSPIKE_DOCKER_CONTAINER.getContainerIpAddress(), AEROSPIKE_DOCKER_CONTAINER.getConnectionPort())); + + lockManager = DistributedLockManager.builder() + .clientId("CLIENT_ID") + .farmId("FA1") + .lockBase(LockBase.builder() + .mode(LockMode.EXCLUSIVE) + .lockStore(AerospikeStore.builder() + .aerospikeClient(aerospikeClient) + .namespace(AEROSPIKE_NAMESPACE) + .setSuffix("distributed_lock") + .build()) + .build()) + .build(); + lockManager.initialize(); + } + + @After + public void destroy() { + lockManager.destroy(); + } + + @Test + public void lockTestPositiveSiloDC() { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + Assert.assertTrue(lock.getAcquiredStatus() + .get()); + + boolean released = lockManager.releaseLock(lock); + Assert.assertTrue(released); + Assert.assertFalse(lock.getAcquiredStatus() + .get()); + + // Attempt to release it again + released = lockManager.releaseLock(lock); + Assert.assertFalse(released); + } + + @Test + public void lockTestPositiveXDC() { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.XDC); + lockManager.tryAcquireLock(lock, 90); + Assert.assertTrue(lock.getAcquiredStatus() + .get()); + + boolean released = lockManager.releaseLock(lock); + Assert.assertTrue(released); + Assert.assertFalse(lock.getAcquiredStatus() + .get()); + + // Attempt to release it again + released = lockManager.releaseLock(lock); + Assert.assertFalse(released); + + } + + @Test + public void testAcquireLockWithWait() { + final Lock lock = lockManager.getLockInstance("NEW_LOCK_ID", LockLevel.DC); + lockManager.acquireLock(lock, 2); // Lock acquired for 1 seconds + Assert.assertTrue(lock.getAcquiredStatus().get()); + + try { + lockManager.tryAcquireLock(lock); // Try acquiring a lock and fail if lock is already acquired + } catch (DLMException e) { + Assert.assertEquals(ErrorCode.LOCK_UNAVAILABLE, e.getErrorCode()); + } + + lockManager.acquireLock(lock); // Wait and try acquiring the lock. + Assert.assertTrue(lock.getAcquiredStatus().get()); + + try { + lockManager.acquireLock(lock, 2, 2); // Wait for 2 seconds only for acquiring the lock + Assert.fail("Flow should not have reached here"); + } catch (DLMException e) { + Assert.assertEquals(ErrorCode.LOCK_UNAVAILABLE, e.getErrorCode()); // As it won't be released for next 90 secs default + } + } + + @Test(expected = DLMException.class) + public void lockTestNegative1() { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + Assert.assertTrue(lock.getAcquiredStatus() + .get()); + lockManager.tryAcquireLock(lock); + } + + @Test(expected = DLMException.class) + public void lockTestNegative2() { + Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + Assert.assertTrue(lock.getAcquiredStatus() + .get()); + lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + } + + @Test(expected = DLMException.class) + public void lockTestNegative3() { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + Assert.assertTrue(lock.getAcquiredStatus() + .get()); + final Lock lock1 = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock1); + } + + @Test + public void concurrentLockAttempt() throws InterruptedException { + final int attempts = Runtime.getRuntime() + .availableProcessors(); + final Map trackers = Maps.newConcurrentMap(); + final ExecutorService service = Executors.newFixedThreadPool(attempts); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(attempts); + + final List> futures = new ArrayList<>(); + for (int i = 0; i < attempts; i++) { + TestUtils.waitSometime(100, TimeUnit.MILLISECONDS); + futures.add(service.submit(() -> { + Lock lock = null; + try { + lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + if (lock.getAcquiredStatus() + .get()) { + trackers.computeIfAbsent("SUCCESSFUL_ACQUIRES", x -> new AtomicInteger(0)) + .getAndIncrement(); + } + latch.await(); + } catch (DLMException e) { + trackers.computeIfAbsent("FAILED_ACQUIRES", x -> new AtomicInteger(0)) + .getAndIncrement(); + } catch (Exception e) { + // ignore; + } finally { + boolean result = lockManager.releaseLock(lock); + Assert.assertFalse(lock.getAcquiredStatus() + .get()); + if (result) { + trackers.computeIfAbsent("SUCCESSFUL_RELEASES", x -> new AtomicInteger(0)) + .getAndIncrement(); + } else { + trackers.computeIfAbsent("FAILED_RELEASES", x -> new AtomicInteger(0)) + .getAndIncrement(); + } + } + })); + } + futures.parallelStream() + .forEach(future -> { + try { + future.get(); + if (counter.decrementAndGet() <= 1) { + latch.countDown(); + } + } catch (InterruptedException | ExecutionException e1) { + // ignore; + } + }); + + // Only one successful acquire / release of locks should take place + assertEquals(1, + trackers.getOrDefault("SUCCESSFUL_ACQUIRES", new AtomicInteger(0)) + .get()); + assertEquals(1, + trackers.getOrDefault("SUCCESSFUL_RELEASES", new AtomicInteger(0)) + .get()); + assertEquals(attempts - 1, + trackers.getOrDefault("FAILED_ACQUIRES", new AtomicInteger(0)) + .get()); + assertEquals(attempts - 1, + trackers.getOrDefault("FAILED_RELEASES", new AtomicInteger(0)) + .get()); + } + + @After + public void tearDown() { + aerospikeClient.truncate(aerospikeClient.getInfoPolicyDefault(), AEROSPIKE_NAMESPACE, null, null); + } +} \ No newline at end of file diff --git a/src/test/java/com/phonepe/dlm/DistributedLockWithHBaseTest.java b/src/test/java/com/phonepe/dlm/DistributedLockWithHBaseTest.java new file mode 100644 index 0000000..e918ed7 --- /dev/null +++ b/src/test/java/com/phonepe/dlm/DistributedLockWithHBaseTest.java @@ -0,0 +1,246 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm; + +import com.google.common.collect.Maps; +import com.phonepe.dlm.lock.Lock; +import com.phonepe.dlm.lock.base.LockBase; +import com.phonepe.dlm.lock.level.LockLevel; +import com.phonepe.dlm.lock.mode.LockMode; +import com.phonepe.dlm.lock.storage.hbase.HBaseStore; +import com.phonepe.dlm.util.HBaseConnectionStub; +import com.phonepe.dlm.exception.DLMException; +import com.phonepe.dlm.exception.ErrorCode; +import com.phonepe.dlm.util.DLSExceptionMatcher; +import com.phonepe.dlm.util.TestUtils; +import org.junit.*; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +@RunWith(MockitoJUnitRunner.class) +public class DistributedLockWithHBaseTest { + @Rule + public ExpectedException exceptionThrown = ExpectedException.none(); + + private DistributedLockManager lockManager; + private HBaseConnectionStub connection; + + @Before + public void setUp() { + connection = new HBaseConnectionStub(); + lockManager = DistributedLockManager.builder() + .clientId("CLIENT_ID") + .farmId("FA1") + .lockBase(LockBase.builder() + .mode(LockMode.EXCLUSIVE) + .lockStore(HBaseStore.builder() + .connection(connection) + .tableName("dlm_locks") + .build()) + .build()) + .build(); + lockManager.initialize(); + } + + @After + public void destroy() { + lockManager.destroy(); + } + + @Test + public void lockTestPositiveSiloDC() { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + Assert.assertTrue(lock.getAcquiredStatus().get()); + + boolean released = lockManager.releaseLock(lock); + Assert.assertTrue(released); + Assert.assertFalse(lock.getAcquiredStatus().get()); + + // Attempt to release it again + released = lockManager.releaseLock(lock); + Assert.assertFalse(released); + } + + @Test + public void lockTestPositiveXDC() { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.XDC); + lockManager.tryAcquireLock(lock); + Assert.assertTrue(lock.getAcquiredStatus().get()); + + boolean released = lockManager.releaseLock(lock); + Assert.assertTrue(released); + Assert.assertFalse(lock.getAcquiredStatus().get()); + + // Attempt to release it again + released = lockManager.releaseLock(lock); + Assert.assertFalse(released); + } + + @Test(expected = DLMException.class) + public void lockTestNegative1() { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + Assert.assertTrue(lock.getAcquiredStatus().get()); + lockManager.tryAcquireLock(lock); + } + + @Test(expected = DLMException.class) + public void lockTestNegative2() { + Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + Assert.assertTrue(lock.getAcquiredStatus().get()); + lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + } + + @Test(expected = DLMException.class) + public void lockTestNegative3() { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + Assert.assertTrue(lock.getAcquiredStatus().get()); + final Lock lock1 = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock1); + } + + @Test + public void testInitializeWithException() { + final HBaseConnectionStub connectionSpy = Mockito.spy(connection); + Mockito.doThrow(RuntimeException.class) + .when(connectionSpy) + .getAdmin(); + lockManager = DistributedLockManager.builder() + .clientId("CLIENT_ID") + .lockBase(LockBase.builder() + .mode(LockMode.EXCLUSIVE) + .lockStore(HBaseStore.builder() + .connection(connectionSpy) + .tableName("dlm_locks") + .build()) + .build()) + .build(); + exceptionThrown.expect(DLSExceptionMatcher.hasCode(ErrorCode.TABLE_CREATION_ERROR)); + lockManager.initialize(); + } + + @Test + public void testCloseWithException() { + final HBaseConnectionStub connectionSpy = Mockito.spy(connection); + Mockito.doThrow(RuntimeException.class) + .when(connectionSpy) + .close(); + lockManager = DistributedLockManager.builder() + .clientId("CLIENT_ID") + .lockBase(LockBase.builder() + .mode(LockMode.EXCLUSIVE) + .lockStore(HBaseStore.builder() + .connection(connectionSpy) + .tableName("dlm_locks") + .build()) + .build()) + .build(); + + try { + lockManager.destroy(); + Assert.fail("Should have thrown an exception"); + } catch (DLMException e) { + Assert.assertEquals(ErrorCode.INTERNAL_ERROR, e.getErrorCode()); + // revert the behavior + Mockito.doNothing() + .when(connectionSpy) + .close(); + } + } + + @Test + public void concurrentLockAttempt() { + final int attempts = Runtime.getRuntime() + .availableProcessors(); + final Map trackers = Maps.newConcurrentMap(); + final ExecutorService service = Executors.newFixedThreadPool(attempts); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(attempts); + + final List> futures = new ArrayList<>(); + for (int i = 0; i < attempts; i++) { + TestUtils.waitSometime(100, TimeUnit.MILLISECONDS); + futures.add(service.submit(() -> { + Lock lock = null; + try { + lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.tryAcquireLock(lock); + if (lock.getAcquiredStatus() + .get()) { + trackers.computeIfAbsent("SUCCESSFUL_ACQUIRES", x -> new AtomicInteger(0)) + .getAndIncrement(); + } + latch.await(); + } catch (DLMException e) { + trackers.computeIfAbsent("FAILED_ACQUIRES", x -> new AtomicInteger(0)) + .getAndIncrement(); + } catch (Exception e) { + // ignore; + } finally { + boolean result = lockManager.releaseLock(lock); + if (result) { + trackers.computeIfAbsent("SUCCESSFUL_RELEASES", x -> new AtomicInteger(0)) + .getAndIncrement(); + } else { + trackers.computeIfAbsent("FAILED_RELEASES", x -> new AtomicInteger(0)) + .getAndIncrement(); + } + } + })); + } + futures.parallelStream() + .forEach(future -> { + try { + future.get(); + if (counter.decrementAndGet() <= 1) { + latch.countDown(); + } + } catch (InterruptedException | ExecutionException e1) { + // ignore; + } + }); + + // Only one successful acquire / release of locks should take place + assertEquals(1, + trackers.getOrDefault("SUCCESSFUL_ACQUIRES", new AtomicInteger(0)) + .get()); + assertEquals(1, + trackers.getOrDefault("SUCCESSFUL_RELEASES", new AtomicInteger(0)) + .get()); + assertEquals(attempts - 1, + trackers.getOrDefault("FAILED_ACQUIRES", new AtomicInteger(0)) + .get()); + assertEquals(attempts - 1, + trackers.getOrDefault("FAILED_RELEASES", new AtomicInteger(0)) + .get()); + } + +} diff --git a/src/test/java/com/phonepe/dlm/util/DLSExceptionMatcher.java b/src/test/java/com/phonepe/dlm/util/DLSExceptionMatcher.java new file mode 100644 index 0000000..d2f13d5 --- /dev/null +++ b/src/test/java/com/phonepe/dlm/util/DLSExceptionMatcher.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.util; + +import com.phonepe.dlm.exception.DLMException; +import com.phonepe.dlm.exception.ErrorCode; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; + +/** + * @author shantanu.tiwari + * Created on 29/12/21 + */ +public class DLSExceptionMatcher extends TypeSafeMatcher { + private final ErrorCode expectedErrorCode; + private ErrorCode foundErrorCode; + + private DLSExceptionMatcher(ErrorCode expectedErrorCode) { + this.expectedErrorCode = expectedErrorCode; + } + + public static DLSExceptionMatcher hasCode(ErrorCode errorCode) { + return new DLSExceptionMatcher(errorCode); + } + + @Override + protected boolean matchesSafely(final DLMException exception) { + foundErrorCode = exception.getErrorCode(); + return foundErrorCode == expectedErrorCode; + } + + @Override + public void describeTo(Description description) { + description.appendValue(foundErrorCode) + .appendText(" was not found instead of ") + .appendValue(expectedErrorCode); + } +} diff --git a/src/test/java/com/phonepe/dlm/util/HBaseConnectionStub.java b/src/test/java/com/phonepe/dlm/util/HBaseConnectionStub.java new file mode 100644 index 0000000..28ad54e --- /dev/null +++ b/src/test/java/com/phonepe/dlm/util/HBaseConnectionStub.java @@ -0,0 +1,100 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.mockito.Mockito; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +@SuppressWarnings("serial") +public class HBaseConnectionStub implements Connection, Serializable { + + private Map tables = new HashMap<>(); + + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public HBaseTableStub getTable(TableName tableName) { + String name = tableName.getNameAsString(); + return tables.computeIfAbsent(name, HBaseTableStub::new); + } + + @Override + public HBaseTableStub getTable(TableName tableName, ExecutorService pool) { + return getTable(tableName); + } + + public HBaseTableStub getTable(String tableName) { + return tables.get(tableName); + } + + public Map getTables() { + return tables; + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public BufferedMutator getBufferedMutator(BufferedMutatorParams params) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public RegionLocator getRegionLocator(TableName tableName) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public Admin getAdmin() { + return Mockito.mock(Admin.class); + } + + @Override + public void close() { + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public TableBuilder getTableBuilder(TableName tableName, ExecutorService executorService) { + return null; + } + + @Override + public void abort(String why, Throwable e) { + } + + @Override + public boolean isAborted() { + return false; + } +} \ No newline at end of file diff --git a/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java b/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java new file mode 100644 index 0000000..df5f1f9 --- /dev/null +++ b/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java @@ -0,0 +1,1203 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.util.Bytes; + + +@SuppressWarnings("deprecation") +public class HBaseTableStub implements Table { + private final String tableName; + private final List columnFamilies = new ArrayList<>(); + + private NavigableMap>>> data = new TreeMap<>( + Bytes.BYTES_COMPARATOR); + + private static List toCell(byte[] row, + NavigableMap>> rowdata, + int maxVersions) { + return toCell(row, rowdata, 0, Long.MAX_VALUE, maxVersions); + } + + public HBaseTableStub(String tableName) { + this.tableName = tableName; + } + + public HBaseTableStub(String tableName, String... columnFamilies) { + this.tableName = tableName; + this.columnFamilies.addAll(Arrays.asList(columnFamilies)); + } + + public void addColumnFamily(String columnFamily) { + this.columnFamilies.add(columnFamily); + } + + public NavigableMap>>> getData() { + return data; + } + + /** + * {@inheritDoc} + */ + @Override + public TableName getName() { + return TableName.valueOf(tableName); + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getConfiguration() { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + /** + * {@inheritDoc} + */ + @Override + public HTableDescriptor getTableDescriptor() { + HTableDescriptor table = new HTableDescriptor(getName()); + for (String columnFamily : columnFamilies) { + table.addFamily(new HColumnDescriptor(columnFamily)); + } + return table; + } + + @Override + public TableDescriptor getDescriptor() throws IOException { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + public void mutateRow(RowMutations rm) { + // currently only support Put and Delete + for (Mutation mutation : rm.getMutations()) { + if (mutation instanceof Put) { + put((Put) mutation); + } else if (mutation instanceof Delete) { + delete((Delete) mutation); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public Result append(Append append) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + private static List toCell(byte[] row, + NavigableMap>> rowdata, + long timestampStart, + long timestampEnd, + int maxVersions) { + List ret = new ArrayList<>(); + byte putType = KeyValue.Type.Put.getCode(); + for (byte[] family : rowdata.keySet()) + for (byte[] qualifier : rowdata.get(family) + .keySet()) { + int versionsAdded = 0; + for (Map.Entry tsToVal : rowdata.get(family) + .get(qualifier) + .descendingMap() + .entrySet()) { + if (versionsAdded++ == maxVersions) + break; + Long timestamp = tsToVal.getKey(); + if (timestamp < timestampStart) + continue; + if (timestamp > timestampEnd) + continue; + byte[] value = tsToVal.getValue(); + ret.add(CellUtil.createCell(row, family, qualifier, timestamp, putType, value)); + } + } + return ret; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean exists(Get get) throws IOException { + Result result = get(get); + return result != null && !result.isEmpty(); + } + + @Override + public boolean[] exists(List list) throws IOException { + return new boolean[0]; + } + + @Override + public boolean[] existsAll(List gets) throws IOException { + boolean[] result = new boolean[gets.size()]; + for (int i = 0; i < gets.size(); i++) { + result[i] = exists(gets.get(i)); + } + return result; + } + + @Override + public void batch(List list, Object[] objects) throws IOException, InterruptedException { + + } + + /** + * {@inheritDoc} + */ + public void batch(List actions) throws IOException { + batch(actions); + } + + /** + * {@inheritDoc} + */ + + @Override + public void batchCallback(List actions, Object[] results, Batch.Callback callback) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + /** + * {@inheritDoc} + */ + @Override + public Result get(Get get) throws IOException { + if (!data.containsKey(get.getRow())) + return new Result(); + byte[] row = get.getRow(); + List cells = new ArrayList<>(); + if (!get.hasFamilies()) { + cells = toCell(row, data.get(row), get.getMaxVersions()); + } else { + for (byte[] family : get.getFamilyMap() + .keySet()) { + if (data.get(row) + .get(family) == null) + continue; + NavigableSet qualifiers = get.getFamilyMap() + .get(family); + if (qualifiers == null || qualifiers.isEmpty()) + qualifiers = data.get(row) + .get(family) + .navigableKeySet(); + for (byte[] qualifier : qualifiers) { + if (qualifier == null) + qualifier = "".getBytes(); + if (!data.get(row) + .containsKey(family) + || !data.get(row) + .get(family) + .containsKey(qualifier) + || data.get(row) + .get(family) + .get(qualifier) + .isEmpty()) + continue; + Map.Entry timestampAndValue = data.get(row) + .get(family) + .get(qualifier) + .lastEntry(); + cells.add(new KeyValue(row, + family, + qualifier, + timestampAndValue.getKey(), + timestampAndValue.getValue())); + } + } + } + Filter filter = get.getFilter(); + if (filter != null) { + cells = filter(filter, cells); + } + cells.sort(new CellComparator() { + @Override + public int compare(Cell cell, Cell cell1) { + return 0; + } + + @Override + public boolean equals(Object obj) { + return false; + } + + @Override + public int compare(Cell cell, Cell cell1, boolean b) { + return 0; + } + + @Override + public int compareRows(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareRows(Cell cell, byte[] bytes, int i, int i1) { + return 0; + } + + @Override + public int compareWithoutRow(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareFamilies(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareQualifiers(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareTimestamps(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareTimestamps(long l, long l1) { + return 0; + } + + @SuppressWarnings("rawtypes") + @Override + public Comparator getSimpleComparator() { + return null; + } + }); + return Result.create(cells); + } + + /** + * {@inheritDoc} + */ + @Override + public Result[] get(List gets) throws IOException { + List results = new ArrayList<>(); + for (Get g : gets) { + results.add(get(g)); + } + return results.toArray(new Result[0]); + } + + /** + * {@inheritDoc} + */ + @SuppressWarnings("unused") + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + final List ret = new ArrayList<>(); + byte[] st = scan.getStartRow(); + byte[] sp = scan.getStopRow(); + Filter filter = scan.getFilter(); + + for (byte[] row : data.keySet()) { + // if row is equal to startRow emit it. When startRow (inclusive) and + // stopRow (exclusive) is the same, it should not be excluded which would + // happen w/o this control. + if (st != null && st.length > 0 && Bytes.BYTES_COMPARATOR.compare(st, row) != 0) { + // if row is before startRow do not emit, pass to next row + if (Bytes.BYTES_COMPARATOR.compare(st, row) > 0) + continue; + // if row is equal to stopRow or after it do not emit, stop iteration + if (sp != null && sp.length > 0 && Bytes.BYTES_COMPARATOR.compare(sp, row) < 0) + break; + } + + List kvs; + if (!scan.hasFamilies()) { + kvs = toCell(row, + data.get(row), + scan.getTimeRange() + .getMin(), + scan.getTimeRange() + .getMax(), + scan.getMaxVersions()); + } else { + kvs = new ArrayList<>(); + for (byte[] family : scan.getFamilyMap() + .keySet()) { + if (data.get(row) + .get(family) == null) + continue; + NavigableSet qualifiers = scan.getFamilyMap() + .get(family); + if (qualifiers == null || qualifiers.isEmpty()) + qualifiers = data.get(row) + .get(family) + .navigableKeySet(); + for (byte[] qualifier : qualifiers) { + if (data.get(row) + .get(family) + .get(qualifier) == null) + continue; + for (Long timestamp : data.get(row) + .get(family) + .get(qualifier) + .descendingKeySet()) { + if (timestamp < scan.getTimeRange() + .getMin()) + continue; + if (timestamp > scan.getTimeRange() + .getMax()) + continue; + byte[] value = data.get(row) + .get(family) + .get(qualifier) + .get(timestamp); + kvs.add(new KeyValue(row, family, qualifier, timestamp, value)); + if (kvs.size() == scan.getMaxVersions()) { + break; + } + } + } + } + } +// skip filter implementation +// if (filter != null) { +// kvs = filter(filter, kvs); +// // Check for early out optimization +// if (filter.filterAllRemaining()) { +// break; +// } +// } + if (!kvs.isEmpty()) { + kvs.sort(new CellComparator() { + @Override + public int compare(Cell cell, Cell cell1) { + return 0; + } + + @Override + public boolean equals(Object obj) { + return false; + } + + @Override + public int compare(Cell cell, Cell cell1, boolean b) { + return 0; + } + + @Override + public int compareRows(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareRows(Cell cell, byte[] bytes, int i, int i1) { + return 0; + } + + @Override + public int compareWithoutRow(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareFamilies(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareQualifiers(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareTimestamps(Cell cell, Cell cell1) { + return 0; + } + + @Override + public int compareTimestamps(long l, long l1) { + return 0; + } + + @SuppressWarnings("rawtypes") + @Override + public Comparator getSimpleComparator() { + return null; + } + }); + ret.add(Result.create(kvs)); + } + } + + return new ResultScanner() { + private final Iterator iterator = ret.iterator(); + + public Iterator iterator() { + return iterator; + } + + public Result[] next(int nbRows) { + ArrayList resultSets = new ArrayList<>(nbRows); + for (int i = 0; i < nbRows; i++) { + Result next = next(); + if (next != null) { + resultSets.add(next); + } else { + break; + } + } + return resultSets.toArray(new Result[0]); + } + + public Result next() { + try { + return iterator().next(); + } catch (NoSuchElementException e) { + return null; + } + } + + public void close() { + } + + @Override + public boolean renewLease() { + return false; + } + + @Override + public ScanMetrics getScanMetrics() { + return null; + } + }; + } + + /** + * Follows the logical flow through the filter methods for a single row. + * + * @param filter HBase filter. + * @param cells List of a row's Cells + * @return List of Cells that were not filtered. + */ + private List filter(Filter filter, List cells) throws IOException { + filter.reset(); + + List tmp = new ArrayList<>(cells.size()); + tmp.addAll(cells); + + /* + * Note. Filter flow for a single row. Adapted from + * "HBase: The Definitive Guide" (p. 163) by Lars George, 2011. See Figure 4-2 + * on p. 163. + */ + boolean filteredOnRowKey = false; + List nkvs = new ArrayList<>(tmp.size()); + for (Cell cell : tmp) { + if (filter.filterRowKey(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) { + filteredOnRowKey = true; + break; + } + Filter.ReturnCode filterResult = filter.filterKeyValue(cell); + if (filterResult == Filter.ReturnCode.INCLUDE) { + nkvs.add(cell); + } else if (filterResult == Filter.ReturnCode.NEXT_ROW) { + break; + } + /* + * Ignoring next key hint which is a optimization to reduce file system IO + */ + } + if (filter.hasFilterRow() && !filteredOnRowKey) { + filter.filterRowCells(nkvs); + } + if (filter.filterRow() || filteredOnRowKey) { + nkvs.clear(); + } + return nkvs; + } + + /** + * {@inheritDoc} + */ + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + Scan scan = new Scan(); + scan.addFamily(family); + return getScanner(scan); + } + + /** + * {@inheritDoc} + */ + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { + Scan scan = new Scan(); + scan.addColumn(family, qualifier); + return getScanner(scan); + } + + private V forceFind(NavigableMap map, K key, V newObject) { + return map.computeIfAbsent(key, k -> newObject); + } + + /** + * {@inheritDoc} + */ + @Override + public void put(Put put) { + byte[] row = put.getRow(); + NavigableMap>> rowData = forceFind(data, + row, + new TreeMap<>(Bytes.BYTES_COMPARATOR)); + for (byte[] family : put.getFamilyCellMap() + .keySet()) { +// if (!columnFamilies.contains(new String(family))) { +// throw new RuntimeException("Not Exists columnFamily : " + new String(family)); +// } + NavigableMap> familyData = forceFind(rowData, + family, + new TreeMap<>(Bytes.BYTES_COMPARATOR)); + for (Cell cell : put.getFamilyCellMap() + .get(family)) { + byte[] qualifier = CellUtil.cloneQualifier(cell); + NavigableMap qualifierData = forceFind(familyData, qualifier, new TreeMap<>()); + qualifierData.put(cell.getTimestamp(), CellUtil.cloneValue(cell)); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void put(List puts) { + for (Put put : puts) { + put(put); + } + + } + + private boolean check(byte[] row, byte[] family, byte[] qualifier, byte[] value) { + if (value == null || value.length == 0) + return !data.containsKey(row) || !data.get(row) + .containsKey(family) + || !data.get(row) + .get(family) + .containsKey(qualifier); + else + return data.containsKey(row) && data.get(row) + .containsKey(family) + && data.get(row) + .get(family) + .containsKey(qualifier) + && !data.get(row) + .get(family) + .get(qualifier) + .isEmpty() + && Arrays.equals(data.get(row) + .get(family) + .get(qualifier) + .lastEntry() + .getValue(), value); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) { + if (check(row, family, qualifier, value)) { + put(put); + return true; + } + return false; + } + + @Override + public boolean checkAndPut(byte[] row, + byte[] family, + byte[] qualifier, + CompareFilter.CompareOp compareOp, + byte[] value, + Put put) { + return false; + } + + @Override + public boolean checkAndPut(byte[] bytes, + byte[] bytes1, + byte[] bytes2, + CompareOperator compareOperator, + byte[] bytes3, + Put put) throws IOException { + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public void delete(Delete delete) { + byte[] row = delete.getRow(); + NavigableMap> familyCellMap = delete.getFamilyCellMap(); + if (data.get(row) == null) + return; + if (familyCellMap.size() == 0) { + data.remove(row); + return; + } + for (byte[] family : familyCellMap.keySet()) { + if (data.get(row) + .get(family) == null) + continue; + if (familyCellMap.get(family) + .isEmpty()) { + data.get(row) + .remove(family); + continue; + } + for (Cell cell : familyCellMap.get(family)) { + data.get(row) + .get(CellUtil.cloneFamily(cell)) + .remove(CellUtil.cloneQualifier(cell)); + } + if (data.get(row) + .get(family) + .isEmpty()) { + data.get(row) + .remove(family); + } + } + if (data.get(row) + .isEmpty()) { + data.remove(row); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void delete(List deletes) { + for (Delete delete : deletes) { + delete(delete); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) { + if (check(row, family, qualifier, value)) { + delete(delete); + return true; + } + return false; + } + + // TODO: Implement? + @Override + public boolean checkAndDelete(byte[] row, + byte[] family, + byte[] qualifier, + CompareFilter.CompareOp compareOp, + byte[] value, + Delete delete) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public boolean checkAndDelete(byte[] bytes, + byte[] bytes1, + byte[] bytes2, + CompareOperator compareOperator, + byte[] bytes3, + Delete delete) throws IOException { + return false; + } + + @Override + public CheckAndMutateBuilder checkAndMutate(byte[] bytes, byte[] bytes1) { + return new CheckAndMutateBuilder() { + + @Override + public CheckAndMutateBuilder ifMatches(CompareOperator arg0, byte[] arg1) { + // TODO Auto-generated method stub + return this; + } + + @Override + public CheckAndMutateBuilder ifNotExists() { + // TODO Auto-generated method stub + return this; + } + + @Override + public CheckAndMutateBuilder qualifier(byte[] arg0) { + // TODO Auto-generated method stub + return this; + } + + @Override + public boolean thenDelete(Delete arg0) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean thenMutate(RowMutations arg0) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean thenPut(Put arg0) throws IOException { + if (!check(arg0.getRow(), Bytes.toBytes("D"), Bytes.toBytes("L"), Bytes.toBytes("M"))) { + put(arg0); + return true; + } + return false; + } + + @Override + public CheckAndMutateBuilder timeRange(TimeRange arg0) { + // TODO Auto-generated method stub + return this; + } + }; + } + + /** + * {@inheritDoc} + */ + @Override + public Result increment(Increment increment) { + List cells = new ArrayList<>(); + Map> famToVal = increment.getFamilyMapOfLongs(); + NavigableMap>> rowData = forceFind(data, + increment.getRow(), + new TreeMap<>(Bytes.BYTES_COMPARATOR)); + for (Map.Entry> ef : famToVal.entrySet()) { + byte[] family = ef.getKey(); + NavigableMap qToVal = ef.getValue(); + NavigableMap> familyData = forceFind(rowData, + family, + new TreeMap<>(Bytes.BYTES_COMPARATOR)); + for (Map.Entry eq : qToVal.entrySet()) { + long newValue = incrementColumnValue(increment.getRow(), family, eq.getKey(), eq.getValue()); + Cell cell = CellUtil.createCell(increment.getRow(), + family, + eq.getKey(), + System.currentTimeMillis(), + KeyValue.Type.Put.getCode(), + Bytes.toBytes(newValue)); + cells.add(cell); + byte[] qualifier = CellUtil.cloneQualifier(cell); + NavigableMap qualifierData = forceFind(familyData, qualifier, new TreeMap<>()); + qualifierData.put(cell.getTimestamp(), CellUtil.cloneValue(cell)); + } + } + return Result.create(cells); + } + + // TODO: Implement? + /** + * {@inheritDoc} + */ + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) { + return incrementColumnValue(row, family, qualifier, amount, Durability.USE_DEFAULT); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) { + try { + Get get = new Get(row); + if (get(get).getValue(family, qualifier) == null) { + return amount; + } else { + return Bytes.toLong(get(get).getValue(family, qualifier)) + amount; + } + } catch (Exception e) { + e.printStackTrace(); + } + return amount; + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public Map coprocessorService( + Class aClass, + byte[] bytes, + byte[] bytes1, + Batch.Call call) throws ServiceException, + Throwable { + return null; + } + + @Override + public void coprocessorService( + Class aClass, + byte[] bytes, + byte[] bytes1, + Batch.Call call, + Batch.Callback callback) throws ServiceException, + Throwable { + + } + + @Override + public Map batchCoprocessorService( + org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, + org.apache.hadoop.hbase.shaded.com.google.protobuf.Message message, + byte[] bytes, + byte[] bytes1, + R r) throws ServiceException, + Throwable { + return null; + } + + @Override + public void batchCoprocessorService( + org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, + org.apache.hadoop.hbase.shaded.com.google.protobuf.Message message, + byte[] bytes, + byte[] bytes1, + R r, + Batch.Callback callback) throws ServiceException, + Throwable { + + } + + /** + * {@inheritDoc} + */ +// @Override +// public long getWriteBufferSize() { +// throw new RuntimeException(this.getClass() + " does NOT implement this method."); +// } + + /** + * {@inheritDoc} + */ +// @Override +// public void setWriteBufferSize(long writeBufferSize) { +// throw new RuntimeException(this.getClass() + " does NOT implement this method."); +// } +// +// @Override +// public Map batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) { +// throw new RuntimeException(this.getClass() + " does NOT implement this method."); +// } +// +// @Override +// public void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback callback) { +// throw new RuntimeException(this.getClass() + " does NOT implement this method."); +// } + + // TODO: Implement? + @Override + public boolean checkAndMutate(byte[] row, + byte[] family, + byte[] qualifier, + CompareFilter.CompareOp compareOp, + byte[] value, + RowMutations mutation) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public boolean checkAndMutate(byte[] bytes, + byte[] bytes1, + byte[] bytes2, + CompareOperator compareOperator, + byte[] bytes3, + RowMutations rowMutations) throws IOException { + return false; + } + + @Override + public long getRpcTimeout(TimeUnit timeUnit) { + return 0; + } + + @Override + public void setOperationTimeout(int operationTimeout) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public int getOperationTimeout() { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public void setRpcTimeout(int rpcTimeout) { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + @Override + public long getReadRpcTimeout(TimeUnit timeUnit) { + return 0; + } + + @Override + public int getReadRpcTimeout() { + return 0; + } + + @Override + public void setReadRpcTimeout(int i) { + + } + + @Override + public long getWriteRpcTimeout(TimeUnit timeUnit) { + return 0; + } + + @Override + public int getWriteRpcTimeout() { + return 0; + } + + @Override + public void setWriteRpcTimeout(int i) { + + } + + @Override + public long getOperationTimeout(TimeUnit timeUnit) { + return 0; + } + + @Override + public int getRpcTimeout() { + throw new RuntimeException(this.getClass() + " does NOT implement this method."); + } + + private static void indent(int level, StringBuilder sb) { + String indent = new String(new char[level]).replace("\0", " "); + sb.append(indent); + } + + /** + * Create a MockHBaseTable with some pre-loaded data. Parameter should be a map + * of column-to-data mappings of rows. It can be created with data like: + * + *

+     * rowid:
+     *   family1:qualifier1: value1
+     *   family2:qualifier2: value2
+     * 
+ * + * @param name name of the new table + * @param data data to initialize the table with + * @return a new MockHBaseTable loaded with given data + */ + public static HBaseTableStub with(String name, Map> data) { + HBaseTableStub table = new HBaseTableStub(name); + for (String row : data.keySet()) { + for (String column : data.get(row) + .keySet()) { + String val = data.get(row) + .get(column); + put(table, row, column, val); + } + } + return table; + } + + /** + * Create a MockHBaseTable with some pre-loaded data. Parameter should be an + * array of string arrays which define every column value individually. + * + *
+     * new String[][] {
+     *   { "<rowid>", "<column>", "<value>" },
+     *   { "id", "family:qualifier1", "data1" },
+     *   { "id", "family:qualifier2", "data2" }
+     * });
+     * 
+ * + * @param name name of the new table + * @param data data to initialize the table with + * @return a new MockHBaseTable loaded with given data + */ + public static HBaseTableStub with(String name, String[][] data) { + HBaseTableStub ret = new HBaseTableStub(name); + for (String[] row : data) { + put(ret, row[0], row[1], row[2]); + } + return ret; + } + + /** + * Helper method of pre-loaders, adds parameters to data. + * + * @param table table to load data into + * @param row row id + * @param column family:qualifier encoded value + * @param val value + */ + private static void put(HBaseTableStub table, String row, String column, String val) { + String[] fq = split(column); + byte[] family = Bytes.toBytes(fq[0]); + byte[] qualifier = Bytes.toBytes(fq[1]); + NavigableMap>> families = table + .forceFind(table.data, Bytes.toBytes(row), new TreeMap<>(Bytes.BYTES_COMPARATOR)); + NavigableMap> qualifiers = table + .forceFind(families, family, new TreeMap<>(Bytes.BYTES_COMPARATOR)); + NavigableMap values = table.forceFind(qualifiers, qualifier, new TreeMap<>()); + values.put(System.currentTimeMillis(), Bytes.toBytes(val)); + } + + /** + * Column identification helper + * + * @param column column name in the format family:qualifier + * @return {"family", "qualifier"} + */ + private static String[] split(String column) { + return new String[] { column.substring(0, column.indexOf(':')), column.substring(column.indexOf(':') + 1) }; + } + + /** + * Read a value saved in the object. Useful for making assertions in tests. + * + * @param rowid rowid of the data to read + * @param column family:qualifier of the data to read + * @return value or null if row or column of the row does not exist + */ + public byte[] read(String rowid, String column) { + NavigableMap>> row = data.get(Bytes.toBytes(rowid)); + if (row == null) + return null; + String[] fq = split(column); + byte[] family = Bytes.toBytes(fq[0]); + byte[] qualifier = Bytes.toBytes(fq[1]); + if (!row.containsKey(family)) + return null; + if (!row.get(family) + .containsKey(qualifier)) + return null; + return row.get(family) + .get(qualifier) + .lastEntry() + .getValue(); + } + + @Override + public String toString() { + String nl = System.getProperty("line.separator"); + int i = 1; + StringBuilder sb = new StringBuilder(); + sb.append("{"); + sb.append(nl); + for (Map.Entry>>> row : getData() + .entrySet()) { + indent(i, sb); + sb.append(Bytes.toString(row.getKey())); + sb.append(":"); + sb.append(nl); + i++; + for (Map.Entry>> family : row.getValue() + .entrySet()) { + indent(i, sb); + sb.append(Bytes.toString(family.getKey())); + sb.append(":"); + sb.append(nl); + i++; + for (Map.Entry> column : family.getValue() + .entrySet()) { + indent(i, sb); + sb.append(Bytes.toString(column.getKey())); + sb.append(": "); + sb.append(Bytes.toString(column.getValue() + .lastEntry() + .getValue())); + sb.append(nl); + } + i--; + } + i--; + } + sb.append(nl); + sb.append("}"); + return sb.toString(); + } + +} \ No newline at end of file diff --git a/src/test/java/com/phonepe/dlm/util/TestUtils.java b/src/test/java/com/phonepe/dlm/util/TestUtils.java new file mode 100644 index 0000000..48d188e --- /dev/null +++ b/src/test/java/com/phonepe/dlm/util/TestUtils.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2023 Original Author(s), PhonePe India Pvt. Ltd. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.phonepe.dlm.util; + +import org.awaitility.Awaitility; + +import java.util.concurrent.TimeUnit; + +/** + * @author shantanu.tiwari + * Created on 29/12/21 + */ +public class TestUtils { + + public static void waitSometime(final int delay, final TimeUnit timeUnit) { + Awaitility.await().pollDelay(delay, timeUnit) + .until(() -> true); + } +}