diff --git a/examples/java/src/main/java/glide/examples/GlideFtExample.java b/examples/java/src/main/java/glide/examples/GlideFtExample.java new file mode 100644 index 0000000000..4649711085 --- /dev/null +++ b/examples/java/src/main/java/glide/examples/GlideFtExample.java @@ -0,0 +1,225 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.examples; + +import static glide.api.logging.Logger.Level.ERROR; +import static glide.api.logging.Logger.Level.INFO; +import static glide.api.logging.Logger.Level.WARN; +import static glide.api.logging.Logger.log; +import static glide.api.models.GlideString.gs; +import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES; + +import glide.api.GlideClusterClient; +import glide.api.commands.servermodules.FT; +import glide.api.logging.Logger; +import glide.api.models.ClusterValue; +import glide.api.models.commands.FT.FTCreateOptions; +import glide.api.models.commands.FT.FTCreateOptions.DataType; +import glide.api.models.commands.FT.FTCreateOptions.DistanceMetric; +import glide.api.models.commands.FT.FTCreateOptions.FieldInfo; +import glide.api.models.commands.FT.FTCreateOptions.VectorFieldHnsw; +import glide.api.models.commands.FT.FTSearchOptions; +import glide.api.models.commands.InfoOptions.Section; +import glide.api.models.configuration.GlideClusterClientConfiguration; +import glide.api.models.configuration.NodeAddress; +import glide.api.models.exceptions.ClosingException; +import glide.api.models.exceptions.ConnectionException; +import glide.api.models.exceptions.TimeoutException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class GlideFtExample { + + /** Waiting interval to let server process the data before querying */ + private static final int DATA_PROCESSING_TIMEOUT = 1000; // ms + + /** + * Creates and returns a GlideClusterClient instance. + * + *

This function initializes a GlideClusterClient with the provided list of nodes. + * The list may contain the address of one or more cluster nodes, and the client will + * automatically discover all nodes in the cluster. + * + * @return A GlideClusterClient connected to the discovered nodes. + * @throws CancellationException if the operation is cancelled. + * @throws ExecutionException if the client fails due to execution errors. + * @throws InterruptedException if the operation is interrupted. + */ + public static GlideClusterClient createClient(List nodeList) + throws CancellationException, ExecutionException, InterruptedException { + // Check `GlideClusterClientConfiguration` for additional options. + GlideClusterClientConfiguration config = + GlideClusterClientConfiguration.builder() + .addresses(nodeList) + // Enable this field if the servers are configured with TLS. + // .useTLS(true); + .build(); + + GlideClusterClient client = GlideClusterClient.createClient(config).get(); + return client; + } + + /** + * Executes the main logic of the application, performing basic operations such as FT.CREATE and + * FT.SEARCH using the provided GlideClusterClient. + * + * @param client An instance of GlideClusterClient. + * @throws ExecutionException if an execution error occurs during operations. + * @throws InterruptedException if the operation is interrupted. + */ + public static void appLogic(GlideClusterClient client) + throws ExecutionException, InterruptedException { + + String prefix = "{" + UUID.randomUUID() + "}:"; + String index = prefix + "index"; + + CompletableFuture createResponse = + FT.create( + client, + index, + new FieldInfo[] { + new FieldInfo("vec", "VEC", VectorFieldHnsw.builder(DistanceMetric.L2, 2).build()) + }, + FTCreateOptions.builder() + .dataType(DataType.HASH) + .prefixes(new String[] {prefix}) + .build()); // "OK" + + CompletableFuture hsetResponse = + client.hset( + gs(prefix + 0), + Map.of( + gs("vec"), + gs( + new byte[] { + 0, 0, 0, 0, 0, 0, 0, 0 + }))); // response is 1L which represents the number of fields that were added. + + hsetResponse = + client.hset( + gs(prefix + 1), + Map.of( + gs("vec"), + gs( + new byte[] { + 0, 0, 0, 0, 0, 0, (byte) 0x80, (byte) 0xBF + }))); // response is 1L which represents the number of fields that were added. + Thread.sleep(DATA_PROCESSING_TIMEOUT); // let server digest the data and update + + // These are the optional arguments used for the FT.search command + var options = + FTSearchOptions.builder() + .params(Map.of(gs("query_vec"), gs(new byte[] {0, 0, 0, 0, 0, 0, 0, 0}))) + .build(); + String query = "*=>[KNN 2 @VEC $query_vec]"; // This is the text query to search for + CompletableFuture searchResponse = FT.search(client, index, query, options); + + // When you call .get() on searchResponse, the result will be an Object[] as shown in the + // commented assert test below. + // assertArrayEquals( + // new Object[] { + // 2L, + // Map.of( + // gs(prefix + 0), + // Map.of(gs("__VEC_score"), gs("0"), gs("vec"), gs("\0\0\0\0\0\0\0\0")), + // gs(prefix + 1), + // Map.of( + // gs("__VEC_score"), + // gs("1"), + // gs("vec"), + // gs( + // new byte[] { + // 0, + // 0, + // 0, + // 0, + // 0, + // 0, + // (byte) 0x80, + // (byte) 0xBF + // }))) + // }, + // searchResponse.get()); + + System.out.println("Create response: " + createResponse.get()); + System.out.println("Hset response: " + hsetResponse.get()); + System.out.println("Search response: " + searchResponse.get()); + + // Send INFO REPLICATION with routing option to all nodes + ClusterValue infoResponse = + client.info(new Section[] {Section.REPLICATION}, ALL_NODES).get(); + log( + INFO, + "app", + "INFO REPLICATION responses from all nodes are " + infoResponse.getMultiValue()); + } + + /** + * Executes the application logic with exception handling. + * + * @throws ExecutionException if an execution error occurs during operations. + */ + private static void execAppLogic() throws ExecutionException { + + // Define list of nodes + List nodeList = + Collections.singletonList(NodeAddress.builder().host("localhost").port(6379).build()); + + while (true) { + try (GlideClusterClient client = createClient(nodeList)) { + appLogic(client); + return; + } catch (CancellationException e) { + log(ERROR, "glide", "Request cancelled: " + e.getMessage()); + throw e; + } catch (InterruptedException e) { + log(ERROR, "glide", "Client interrupted: " + e.getMessage()); + Thread.currentThread().interrupt(); // Restore interrupt status + throw new CancellationException("Client was interrupted."); + } catch (ExecutionException e) { + // All Glide errors will be handled as ExecutionException + if (e.getCause() instanceof ClosingException) { + // If the error message contains "NOAUTH", raise the exception + // because it indicates a critical authentication issue. + if (e.getMessage().contains("NOAUTH")) { + log(ERROR, "glide", "Authentication error encountered: " + e.getMessage()); + throw e; + } else { + log(WARN, "glide", "Client has closed and needs to be re-created: " + e.getMessage()); + } + } else if (e.getCause() instanceof ConnectionException) { + // The client wasn't able to reestablish the connection within the given retries + log(ERROR, "glide", "Connection error encountered: " + e.getMessage()); + throw e; + } else if (e.getCause() instanceof TimeoutException) { + // A request timed out. You may choose to retry the execution based on your application's + // logic + log(ERROR, "glide", "Timeout encountered: " + e.getMessage()); + throw e; + } else { + log(ERROR, "glide", "Execution error encountered: " + e.getCause()); + throw e; + } + } + } + } + + /** + * The entry point of the cluster example. This method sets up the logger configuration and + * executes the main application logic. + * + * @param args Command-line arguments passed to the application. + * @throws ExecutionException if an error occurs during execution of the application logic. + */ + public static void main(String[] args) throws ExecutionException { + // In this example, we will utilize the client's logger for all log messages + Logger.setLoggerConfig(INFO); + // Optional - set the logger to write to a file + // Logger.setLoggerConfig(Logger.Level.INFO, file) + execAppLogic(); + } +} diff --git a/examples/java/src/main/java/glide/examples/GlideJsonExample.java b/examples/java/src/main/java/glide/examples/GlideJsonExample.java new file mode 100644 index 0000000000..d877b04f10 --- /dev/null +++ b/examples/java/src/main/java/glide/examples/GlideJsonExample.java @@ -0,0 +1,145 @@ +/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.examples; + +import static glide.api.logging.Logger.Level.ERROR; +import static glide.api.logging.Logger.Level.INFO; +import static glide.api.logging.Logger.Level.WARN; +import static glide.api.logging.Logger.log; +import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleMultiNodeRoute.ALL_NODES; + +import glide.api.GlideClusterClient; +import glide.api.commands.servermodules.Json; +import glide.api.logging.Logger; +import glide.api.models.ClusterValue; +import glide.api.models.commands.InfoOptions.Section; +import glide.api.models.configuration.GlideClusterClientConfiguration; +import glide.api.models.configuration.NodeAddress; +import glide.api.models.exceptions.ClosingException; +import glide.api.models.exceptions.ConnectionException; +import glide.api.models.exceptions.TimeoutException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class GlideJsonExample { + + /** + * Creates and returns a GlideClusterClient instance. + * + *

This function initializes a GlideClusterClient with the provided list of nodes. + * The list may contain the address of one or more cluster nodes, and the client will + * automatically discover all nodes in the cluster. + * + * @return A GlideClusterClient connected to the discovered nodes. + * @throws CancellationException if the operation is cancelled. + * @throws ExecutionException if the client fails due to execution errors. + * @throws InterruptedException if the operation is interrupted. + */ + public static GlideClusterClient createClient(List nodeList) + throws CancellationException, ExecutionException, InterruptedException { + // Check `GlideClusterClientConfiguration` for additional options. + GlideClusterClientConfiguration config = + GlideClusterClientConfiguration.builder() + .addresses(nodeList) + // Enable this field if the servers are configured with TLS. + // .useTLS(true); + .build(); + + return GlideClusterClient.createClient(config).get(); + } + + /** + * Executes the main logic of the application, performing basic operations such as JSON.SET and + * JSON.GET using the provided GlideClusterClient. + * + * @param client An instance of GlideClusterClient. + * @throws ExecutionException if an execution error occurs during operations. + * @throws InterruptedException if the operation is interrupted. + */ + public static void appLogic(GlideClusterClient client) + throws ExecutionException, InterruptedException { + + CompletableFuture setResponse = Json.set(client, "key", "$", "{\"a\": 1.0,\"b\": 2}"); + System.out.println("The set response is " + setResponse.get()); // The response should be "OK" + + CompletableFuture getResponse = Json.get(client, "key", new String[] {"$.a", "$.b"}); + System.out.println( + "The get response is " + + getResponse.get()); // The response should be "{\"$.a\":[1.0],\"$.b\":[2]}" + + // Send INFO REPLICATION with routing option to all nodes + ClusterValue infoResponse = + client.info(new Section[] {Section.REPLICATION}, ALL_NODES).get(); + log( + INFO, + "app", + "INFO REPLICATION responses from all nodes are " + infoResponse.getMultiValue()); + } + + /** + * Executes the application logic with exception handling. + * + * @throws ExecutionException if an execution error occurs during operations. + */ + private static void execAppLogic() throws ExecutionException { + + // Define list of nodes + List nodeList = + Collections.singletonList(NodeAddress.builder().host("localhost").port(6379).build()); + + while (true) { + try (GlideClusterClient client = createClient(nodeList)) { + appLogic(client); + return; + } catch (CancellationException e) { + log(ERROR, "glide", "Request cancelled: " + e.getMessage()); + throw e; + } catch (InterruptedException e) { + log(ERROR, "glide", "Client interrupted: " + e.getMessage()); + Thread.currentThread().interrupt(); // Restore interrupt status + throw new CancellationException("Client was interrupted."); + } catch (ExecutionException e) { + // All Glide errors will be handled as ExecutionException + if (e.getCause() instanceof ClosingException) { + // If the error message contains "NOAUTH", raise the exception + // because it indicates a critical authentication issue. + if (e.getMessage().contains("NOAUTH")) { + log(ERROR, "glide", "Authentication error encountered: " + e.getMessage()); + throw e; + } else { + log(WARN, "glide", "Client has closed and needs to be re-created: " + e.getMessage()); + } + } else if (e.getCause() instanceof ConnectionException) { + // The client wasn't able to reestablish the connection within the given retries + log(ERROR, "glide", "Connection error encountered: " + e.getMessage()); + throw e; + } else if (e.getCause() instanceof TimeoutException) { + // A request timed out. You may choose to retry the execution based on your application's + // logic + log(ERROR, "glide", "Timeout encountered: " + e.getMessage()); + throw e; + } else { + log(ERROR, "glide", "Execution error encountered: " + e.getCause()); + throw e; + } + } + } + } + + /** + * The entry point of the cluster example. This method sets up the logger configuration and + * executes the main application logic. + * + * @param args Command-line arguments passed to the application. + * @throws ExecutionException if an error occurs during execution of the application logic. + */ + public static void main(String[] args) throws ExecutionException { + // In this example, we will utilize the client's logger for all log messages + Logger.setLoggerConfig(INFO); + // Optional - set the logger to write to a file + // Logger.setLoggerConfig(Logger.Level.INFO, file) + execAppLogic(); + } +} diff --git a/examples/python/ft_example.py b/examples/python/ft_example.py new file mode 100644 index 0000000000..b2163dc60f --- /dev/null +++ b/examples/python/ft_example.py @@ -0,0 +1,179 @@ +import asyncio +from typing import List, Tuple, Optional + +from glide.async_commands.server_modules import glide_json as json +from glide.async_commands.server_modules import ft +from glide.constants import OK, FtSearchResponse, TEncodable + +import uuid + +from glide import ( + AllNodes, + ClosingError, + ConnectionError, + GlideClusterClient, + GlideClusterClientConfiguration, + InfoSection, + Logger, + LogLevel, + NodeAddress, + RequestError, + TimeoutError, +) + +from glide.async_commands.server_modules.ft_options.ft_create_options import ( + DataType, + DistanceMetricType, + Field, + FtCreateOptions, + NumericField, + TagField, + TextField, + VectorAlgorithm, + VectorField, + VectorFieldAttributesHnsw, + VectorType, +) + +from glide.async_commands.server_modules.ft_options.ft_search_options import ( + FtSearchOptions, + ReturnField, +) + +async def create_client( + nodes_list: Optional[List[Tuple[str, int]]] = None +) -> GlideClusterClient: + """ + Creates and returns a GlideClusterClient instance. + + This function initializes a GlideClusterClient with the provided list of nodes. + The nodes_list may contain the address of one or more cluster nodes, and the + client will automatically discover all nodes in the cluster. + + Args: + nodes_list (List[Tuple[str, int]]): A list of tuples where each tuple + contains a host (str) and port (int). Defaults to [("localhost", 6379)]. + + Returns: + GlideClusterClient: An instance of GlideClusterClient connected to the discovered nodes. + """ + if nodes_list is None: + nodes_list = [("localhost", 6379)] + addresses = [NodeAddress(host, port) for host, port in nodes_list] + # Check `GlideClusterClientConfiguration` for additional options. + config = GlideClusterClientConfiguration( + addresses=addresses, + client_name="test_cluster_client", + # Enable this field if the servers are configured with TLS. + # use_tls=True + ) + return await GlideClusterClient.create(config) + +async def app_logic(client: GlideClusterClient): + """ + Executes the main logic of the application, performing basic operations + such as FT.CREATE and FT.SEARCH using the provided GlideClient. + + Args: + client (GlideClusterClient): An instance of GlideClient. + """ + # Create a vector + prefix = "{json}:" + index = prefix + str(uuid.uuid4()) + json_key1 = prefix + "1" + json_key2 = prefix + "2" + json_value1 = {"a": 11111, "b": 2, "c": 3} + json_value2 = {"a": 22222, "b": 2, "c": 3} + create_response = await ft.create(client, index, + schema=[ + NumericField("$.a", "a"), + NumericField("$.b", "b"), + ], + options=FtCreateOptions(DataType.JSON), + ) + Logger.log(LogLevel.INFO, "app", f"Create response is = {create_response!r}") # 'OK' + + # Create a json key. + assert ( + await json.set(client, json_key1, "$", json.dumps(json_value1)) + == OK + ) + assert ( + await json.set(client, json_key2, "$", json.dumps(json_value2)) + == OK + ) + + # Search for the vector + search_response = await ft.search(client, index, "*", options=ft_search_options) + ft_search_options = FtSearchOptions( + return_fields=[ + ReturnField(field_identifier="a", alias="a_new"), + ReturnField(field_identifier="b", alias="b_new"), + ] + ) + + Logger.log(LogLevel.INFO, "app", f"Search response is = {search_response!r}") + + +async def exec_app_logic(): + """ + Executes the application logic with exception handling. + """ + while True: + try: + client = await create_client() + return await app_logic(client) + except asyncio.CancelledError: + raise + except ClosingError as e: + # If the error message contains "NOAUTH", raise the exception + # because it indicates a critical authentication issue. + if "NOAUTH" in str(e): + Logger.log( + LogLevel.ERROR, + "glide", + f"Authentication error encountered: {e}", + ) + else: + Logger.log( + LogLevel.WARN, + "glide", + f"Client has closed and needs to be re-created: {e}", + ) + raise e + except TimeoutError as e: + # A request timed out. You may choose to retry the execution based on your application's logic + Logger.log(LogLevel.ERROR, "glide", f"TimeoutError encountered: {e}") + raise e + except ConnectionError as e: + # The client wasn't able to reestablish the connection within the given retries + Logger.log(LogLevel.ERROR, "glide", f"ConnectionError encountered: {e}") + raise e + except RequestError as e: + # Other error reported during a request, such as a server response error + Logger.log(LogLevel.ERROR, "glide", f"RequestError encountered: {e}") + raise e + except Exception as e: + Logger.log(LogLevel.ERROR, "glide", f"Unexpected error: {e}") + raise e + finally: + try: + await client.close() + except Exception as e: + Logger.log( + LogLevel.WARN, + "glide", + f"Error encountered while closing the client: {e}", + ) + + +def main(): + # In this example, we will utilize the client's logger for all log messages + Logger.set_logger_config(LogLevel.INFO) + # Optional - set the logger to write to a file + # Logger.set_logger_config(LogLevel.INFO, file) + asyncio.run(exec_app_logic()) + + +if __name__ == "__main__": + main() diff --git a/examples/python/json_example.py b/examples/python/json_example.py new file mode 100644 index 0000000000..bc6e8d0786 --- /dev/null +++ b/examples/python/json_example.py @@ -0,0 +1,129 @@ +import asyncio +from typing import List, Tuple, Optional + +from glide.async_commands.server_modules import glide_json as json + +from glide import ( + AllNodes, + ClosingError, + ConnectionError, + GlideClusterClient, + GlideClusterClientConfiguration, + InfoSection, + Logger, + LogLevel, + NodeAddress, + RequestError, + TimeoutError, +) + + +async def create_client( + nodes_list: Optional[List[Tuple[str, int]]] = None +) -> GlideClusterClient: + """ + Creates and returns a GlideClusterClient instance. + + This function initializes a GlideClusterClient with the provided list of nodes. + The nodes_list may contain the address of one or more cluster nodes, and the + client will automatically discover all nodes in the cluster. + + Args: + nodes_list (List[Tuple[str, int]]): A list of tuples where each tuple + contains a host (str) and port (int). Defaults to [("localhost", 6379)]. + + Returns: + GlideClusterClient: An instance of GlideClusterClient connected to the discovered nodes. + """ + if nodes_list is None: + nodes_list = [("localhost", 6379)] + addresses = [NodeAddress(host, port) for host, port in nodes_list] + # Check `GlideClusterClientConfiguration` for additional options. + config = GlideClusterClientConfiguration( + addresses=addresses, + client_name="test_cluster_client", + # Enable this field if the servers are configured with TLS. + # use_tls=True + ) + return await GlideClusterClient.create(config) + +async def app_logic(client: GlideClusterClient): + """ + Executes the main logic of the application, performing basic operations + such as JSON.SET and JSON.GET using the provided GlideClient. + + Args: + client (GlideClusterClient): An instance of GlideClient. + """ + + json_value = {"a": 1.0, "b": 2} + json_str = json.dumps(json_value) + # Send SET and GET + set_response = await json.set(client, "key", "$", json_str) + Logger.log(LogLevel.INFO, "app", f"Set response is = {set_response!r}") # 'OK' + + get_response = await json.get(client, "key", "$") + Logger.log(LogLevel.INFO, "app", f"Get response is = {get_response.decode()!r}") # "[{\"a\":1.0,\"b\":2}]" + +async def exec_app_logic(): + """ + Executes the application logic with exception handling. + """ + while True: + try: + client = await create_client() + return await app_logic(client) + except asyncio.CancelledError: + raise + except ClosingError as e: + # If the error message contains "NOAUTH", raise the exception + # because it indicates a critical authentication issue. + if "NOAUTH" in str(e): + Logger.log( + LogLevel.ERROR, + "glide", + f"Authentication error encountered: {e}", + ) + else: + Logger.log( + LogLevel.WARN, + "glide", + f"Client has closed and needs to be re-created: {e}", + ) + raise e + except TimeoutError as e: + # A request timed out. You may choose to retry the execution based on your application's logic + Logger.log(LogLevel.ERROR, "glide", f"TimeoutError encountered: {e}") + raise e + except ConnectionError as e: + # The client wasn't able to reestablish the connection within the given retries + Logger.log(LogLevel.ERROR, "glide", f"ConnectionError encountered: {e}") + raise e + except RequestError as e: + # Other error reported during a request, such as a server response error + Logger.log(LogLevel.ERROR, "glide", f"RequestError encountered: {e}") + raise e + except Exception as e: + Logger.log(LogLevel.ERROR, "glide", f"Unexpected error: {e}") + raise e + finally: + try: + await client.close() + except Exception as e: + Logger.log( + LogLevel.WARN, + "glide", + f"Error encountered while closing the client: {e}", + ) + + +def main(): + # In this example, we will utilize the client's logger for all log messages + Logger.set_logger_config(LogLevel.INFO) + # Optional - set the logger to write to a file + # Logger.set_logger_config(LogLevel.INFO, file) + asyncio.run(exec_app_logic()) + + +if __name__ == "__main__": + main()