From d72ddcbcc4b3a4d74143997747d076116a182f05 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Mon, 17 May 2021 14:42:59 +0100 Subject: [PATCH 1/5] Add API for getting single document and test to back it up and test for running a script --- .../ElasticsearchClient+Requests.swift | 9 ++++ .../ElasticsearchNIOClientTests.swift | 50 +++++++++++++++++++ .../SomeItem.swift | 7 +++ 3 files changed, 66 insertions(+) diff --git a/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift b/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift index c73b650..17f4c19 100644 --- a/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift +++ b/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift @@ -3,6 +3,15 @@ import NIO import NIOHTTP1 extension ElasticsearchClient { + public func get(id: String, from indexName: String) -> EventLoopFuture> { + do { + let url = try buildURL(path: "/\(indexName)/_doc/\(id)") + return sendRequest(url: url, method: .GET, headers: .init(), body: nil) + } catch { + return self.eventLoop.makeFailedFuture(error) + } + } + public func bulk(_ operations: [ESBulkOperation]) -> EventLoopFuture { guard operations.count > 0 else { return self.eventLoop.makeFailedFuture(ElasticSearchClientError(message: "No operations to perform for the bulk API", status: nil)) diff --git a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift index 86b708c..14e430c 100644 --- a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift +++ b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift @@ -196,6 +196,56 @@ class ElasticSearchIntegrationTests: XCTestCase { XCTAssertTrue(results.hits.hits.contains(where: { $0.source.name == "Some 29 Apples" })) } + func testGetItem() throws { + let item = SomeItem(id: UUID(), name: "Some item") + _ = try client.createDocumentWithID(item, in: self.indexName).wait() + + Thread.sleep(forTimeInterval: 1.0) + + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: item.id.uuidString, from: self.indexName).wait() + XCTAssertEqual(retrievedItem.source.name, item.name) + } + + func testBulkUpdateWithScript() throws { + var items = [SomeItem]() + for index in 1...10 { + let name: String + if index % 2 == 0 { + name = "Some \(index) Apples" + } else { + name = "Some \(index) Bananas" + } + let item = SomeItem(id: UUID(), name: name) + _ = try client.createDocumentWithID(item, in: self.indexName).wait() + items.append(item) + } + + // This is required for ES to settle and load the indexes to return the right results + Thread.sleep(forTimeInterval: 1.0) + + struct ScriptRequest: Codable { + let script: ScriptBody + } + + struct ScriptBody: Codable { + let inline: String + } + + let scriptBody = ScriptBody(inline: #""inline": "ctx._source.count = ctx._source.count ? ctx._source.count += 1 : 1""#) + let request = ScriptRequest(script: scriptBody) + + let bulkOperation = [ + ESBulkOperation(operationType: .update, index: self.indexName, id: items[0].id.uuidString, document: request), + ] + + let response = try client.bulk(bulkOperation).wait() + XCTAssertEqual(response.items.count, 1) + XCTAssertNotNil(response.items.first?.update) + XCTAssertFalse(response.errors) + + + } + // MARK: - Private private func setupItems() throws { for index in 1...10 { diff --git a/Tests/ElasticsearchNIOClientTests/SomeItem.swift b/Tests/ElasticsearchNIOClientTests/SomeItem.swift index c241757..8049b19 100644 --- a/Tests/ElasticsearchNIOClientTests/SomeItem.swift +++ b/Tests/ElasticsearchNIOClientTests/SomeItem.swift @@ -3,4 +3,11 @@ import Foundation struct SomeItem: Codable, Identifiable { let id: UUID let name: String + let count: Int? + + init(id: UUID, name: String, count: Int? = nil) { + self.id = id + self.name = name + self.count = count + } } From 4bfe2c6d0d0a325a534e4d2e8b7dfc3b0b770700 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Mon, 17 May 2021 15:19:39 +0100 Subject: [PATCH 2/5] Get some scripting working --- .../ElasticsearchClient+Requests.swift | 27 ++++++++++++-- .../BulkOperations/BulkUpdateScript.swift | 9 +++++ .../Models/ESBulkOperation.swift | 1 + .../ElasticsearchNIOClientTests.swift | 35 ++++++++++++++++--- 4 files changed, 66 insertions(+), 6 deletions(-) create mode 100644 Sources/ElasticsearchNIOClient/Models/BulkOperations/BulkUpdateScript.swift diff --git a/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift b/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift index 17f4c19..745188d 100644 --- a/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift +++ b/Sources/ElasticsearchNIOClient/ElasticsearchClient+Requests.swift @@ -42,7 +42,7 @@ extension ElasticsearchClient { bodyString.append("\(deleteLineString)\n") case .index: guard let document = operation.document else { - return self.eventLoop.makeFailedFuture(ElasticSearchClientError(message: "No document provided for create bulk operation", status: nil)) + return self.eventLoop.makeFailedFuture(ElasticSearchClientError(message: "No document provided for index bulk operation", status: nil)) } let indexInfo = BulkIndex(index: bulkOperationBody) let indexLine = try self.jsonEncoder.encode(indexInfo) @@ -53,7 +53,7 @@ extension ElasticsearchClient { bodyString.append("\(indexLineString)\n\(dataLineString)\n") case .update: guard let document = operation.document else { - return self.eventLoop.makeFailedFuture(ElasticSearchClientError(message: "No document provided for create bulk operation", status: nil)) + return self.eventLoop.makeFailedFuture(ElasticSearchClientError(message: "No document provided for update bulk operation", status: nil)) } let updateInfo = BulkUpdate(update: bulkOperationBody) let updateLine = try self.jsonEncoder.encode(updateInfo) @@ -62,6 +62,17 @@ extension ElasticsearchClient { throw ElasticSearchClientError(message: "Failed to convert bulk data from Data to String", status: nil) } bodyString.append("\(updateLineString)\n\(dataLineString)\n") + case .updateScript: + guard let document = operation.document else { + return self.eventLoop.makeFailedFuture(ElasticSearchClientError(message: "No script provided for update script bulk operation", status: nil)) + } + let updateInfo = BulkUpdateScript(update: bulkOperationBody) + let updateLine = try self.jsonEncoder.encode(updateInfo) + let dataLine = try self.jsonEncoder.encode(BulkUpdateScriptDocument(script: document)) + guard let updateLineString = String(data: updateLine, encoding: .utf8), let dataLineString = String(data: dataLine, encoding: .utf8) else { + throw ElasticSearchClientError(message: "Failed to convert bulk data from Data to String", status: nil) + } + bodyString.append("\(updateLineString)\n\(dataLineString)\n") } } let body = ByteBuffer(string: bodyString) @@ -109,6 +120,18 @@ extension ElasticsearchClient { } } + public func updateDocumentWithScript(_ script: Script, id: String, in indexName: String) -> EventLoopFuture { + do { + let url = try buildURL(path: "/\(indexName)/_update/\(id)") + let body = try ByteBuffer(data: self.jsonEncoder.encode(script)) + var headers = HTTPHeaders() + headers.add(name: "content-type", value: "application/json") + return sendRequest(url: url, method: .POST, headers: headers, body: body) + } catch { + return self.eventLoop.makeFailedFuture(error) + } + } + public func deleteDocument(id: String, from indexName: String) -> EventLoopFuture { do { let url = try buildURL(path: "/\(indexName)/_doc/\(id)") diff --git a/Sources/ElasticsearchNIOClient/Models/BulkOperations/BulkUpdateScript.swift b/Sources/ElasticsearchNIOClient/Models/BulkOperations/BulkUpdateScript.swift new file mode 100644 index 0000000..0b4b793 --- /dev/null +++ b/Sources/ElasticsearchNIOClient/Models/BulkOperations/BulkUpdateScript.swift @@ -0,0 +1,9 @@ +import Foundation + +struct BulkUpdateScript: Codable { + let update: BulkOperationBody +} + +struct BulkUpdateScriptDocument: Encodable { + let script: Script +} diff --git a/Sources/ElasticsearchNIOClient/Models/ESBulkOperation.swift b/Sources/ElasticsearchNIOClient/Models/ESBulkOperation.swift index 570553f..da9c075 100644 --- a/Sources/ElasticsearchNIOClient/Models/ESBulkOperation.swift +++ b/Sources/ElasticsearchNIOClient/Models/ESBulkOperation.swift @@ -19,4 +19,5 @@ public enum BulkOperationType { case delete case index case update + case updateScript } diff --git a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift index 14e430c..acf8a4a 100644 --- a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift +++ b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift @@ -15,7 +15,8 @@ class ElasticSearchIntegrationTests: XCTestCase { // MARK: - Overrides override func setUpWithError() throws { eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - let logger = Logger(label: "io.brokenhands.swift-soto-elasticsearch.test") + var logger = Logger(label: "io.brokenhands.swift-soto-elasticsearch.test") + logger.logLevel = .trace httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) client = ElasticsearchClient(httpClient: httpClient, eventLoop: eventLoopGroup.next(), logger: logger, scheme: "http", host: "localhost", port: 9200) _ = try client.deleteIndex("_all").wait() @@ -215,7 +216,7 @@ class ElasticSearchIntegrationTests: XCTestCase { } else { name = "Some \(index) Bananas" } - let item = SomeItem(id: UUID(), name: name) + let item = SomeItem(id: UUID(), name: name, count: 0) _ = try client.createDocumentWithID(item, in: self.indexName).wait() items.append(item) } @@ -231,11 +232,11 @@ class ElasticSearchIntegrationTests: XCTestCase { let inline: String } - let scriptBody = ScriptBody(inline: #""inline": "ctx._source.count = ctx._source.count ? ctx._source.count += 1 : 1""#) + let scriptBody = ScriptBody(inline: "ctx._source.count = ctx._source.count += 1") let request = ScriptRequest(script: scriptBody) let bulkOperation = [ - ESBulkOperation(operationType: .update, index: self.indexName, id: items[0].id.uuidString, document: request), + ESBulkOperation(operationType: .updateScript, index: self.indexName, id: items[0].id.uuidString, document: request), ] let response = try client.bulk(bulkOperation).wait() @@ -243,7 +244,33 @@ class ElasticSearchIntegrationTests: XCTestCase { XCTAssertNotNil(response.items.first?.update) XCTAssertFalse(response.errors) + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: items[0].id.uuidString, from: self.indexName).wait() + XCTAssertEqual(retrievedItem.source.count, 1) + } + + func testUpdateWithScript() throws { + let item = SomeItem(id: UUID(), name: "Some Item", count: 0) + _ = try client.createDocumentWithID(item, in: self.indexName).wait() + + // This is required for ES to settle and load the indexes to return the right results + Thread.sleep(forTimeInterval: 1.0) + + struct ScriptRequest: Codable { + let script: ScriptBody + } + + struct ScriptBody: Codable { + let inline: String + } + + let scriptBody = ScriptBody(inline: "ctx._source.count = ctx._source.count += 1") + let request = ScriptRequest(script: scriptBody) + let response = try client.updateDocumentWithScript(request, id: item.id.uuidString, in: self.indexName).wait() + XCTAssertEqual(response.result, "updated") + + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: item.id.uuidString, from: self.indexName).wait() + XCTAssertEqual(retrievedItem.source.count, 1) } // MARK: - Private From ce122a123b79471eccfb8f2003ad2eed689e54ec Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Mon, 17 May 2021 15:21:29 +0100 Subject: [PATCH 3/5] Get scripting working for bulk operations --- .../ElasticsearchNIOClientTests.swift | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift index acf8a4a..c07cc59 100644 --- a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift +++ b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift @@ -224,19 +224,14 @@ class ElasticSearchIntegrationTests: XCTestCase { // This is required for ES to settle and load the indexes to return the right results Thread.sleep(forTimeInterval: 1.0) - struct ScriptRequest: Codable { - let script: ScriptBody - } - struct ScriptBody: Codable { let inline: String } let scriptBody = ScriptBody(inline: "ctx._source.count = ctx._source.count += 1") - let request = ScriptRequest(script: scriptBody) let bulkOperation = [ - ESBulkOperation(operationType: .updateScript, index: self.indexName, id: items[0].id.uuidString, document: request), + ESBulkOperation(operationType: .updateScript, index: self.indexName, id: items[0].id.uuidString, document: scriptBody), ] let response = try client.bulk(bulkOperation).wait() From 678069170743635f365d6007b7c4eb4558ab3c7f Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Mon, 17 May 2021 15:31:27 +0100 Subject: [PATCH 4/5] More complex examples of scripting --- .../ElasticsearchNIOClientTests.swift | 64 ++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift index c07cc59..e326504 100644 --- a/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift +++ b/Tests/ElasticsearchNIOClientTests/ElasticsearchNIOClientTests.swift @@ -15,8 +15,7 @@ class ElasticSearchIntegrationTests: XCTestCase { // MARK: - Overrides override func setUpWithError() throws { eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - var logger = Logger(label: "io.brokenhands.swift-soto-elasticsearch.test") - logger.logLevel = .trace + let logger = Logger(label: "io.brokenhands.swift-soto-elasticsearch.test") httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) client = ElasticsearchClient(httpClient: httpClient, eventLoop: eventLoopGroup.next(), logger: logger, scheme: "http", host: "localhost", port: 9200) _ = try client.deleteIndex("_all").wait() @@ -268,6 +267,67 @@ class ElasticSearchIntegrationTests: XCTestCase { XCTAssertEqual(retrievedItem.source.count, 1) } + func testUpdateWithNonExistentFieldScript() throws { + let item = SomeItem(id: UUID(), name: "Some Item") + _ = try client.createDocumentWithID(item, in: self.indexName).wait() + + // This is required for ES to settle and load the indexes to return the right results + Thread.sleep(forTimeInterval: 1.0) + + struct ScriptRequest: Codable { + let script: ScriptBody + } + + struct ScriptBody: Codable { + let inline: String + } + + let scriptBody = ScriptBody(inline: "if(ctx._source.containsKey('count')) { ctx._source.count += 1 } else { ctx._source.count = 1 }") + let request = ScriptRequest(script: scriptBody) + + let response = try client.updateDocumentWithScript(request, id: item.id.uuidString, in: self.indexName).wait() + XCTAssertEqual(response.result, "updated") + + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: item.id.uuidString, from: self.indexName).wait() + XCTAssertEqual(retrievedItem.source.count, 1) + } + + func testBulkUpdateWithNonExistentFieldScript() throws { + var items = [SomeItem]() + for index in 1...10 { + let name: String + if index % 2 == 0 { + name = "Some \(index) Apples" + } else { + name = "Some \(index) Bananas" + } + let item = SomeItem(id: UUID(), name: name) + _ = try client.createDocumentWithID(item, in: self.indexName).wait() + items.append(item) + } + + // This is required for ES to settle and load the indexes to return the right results + Thread.sleep(forTimeInterval: 1.0) + + struct ScriptBody: Codable { + let inline: String + } + + let scriptBody = ScriptBody(inline: "if(ctx._source.containsKey('count')) { ctx._source.count += 1 } else { ctx._source.count = 1 }") + + let bulkOperation = [ + ESBulkOperation(operationType: .updateScript, index: self.indexName, id: items[0].id.uuidString, document: scriptBody), + ] + + let response = try client.bulk(bulkOperation).wait() + XCTAssertEqual(response.items.count, 1) + XCTAssertNotNil(response.items.first?.update) + XCTAssertFalse(response.errors) + + let retrievedItem: ESGetSingleDocumentResponse = try client.get(id: items[0].id.uuidString, from: self.indexName).wait() + XCTAssertEqual(retrievedItem.source.count, 1) + } + // MARK: - Private private func setupItems() throws { for index in 1...10 { From 37d135db8f3ca6dc75b7130798a33905b8da12a7 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Mon, 17 May 2021 15:35:55 +0100 Subject: [PATCH 5/5] Add new APIs to README --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 359dc31..38b6688 100644 --- a/README.md +++ b/README.md @@ -37,9 +37,11 @@ Currently the library supports: * Document delete * Document search * Document count +* Document retrieve * Bulk create/update/delete/index * Index delete * Index exists +* Scripting If you'd like to add extra functionality, either [open an issue](https://github.com/brokenhandsio/elasticsearch-nio-client/issues/new) and raise a PR. Any contributions are gratefully accepted!