diff --git a/build.sbt b/build.sbt index 221110e..7d4c9ca 100644 --- a/build.sbt +++ b/build.sbt @@ -94,7 +94,9 @@ lazy val publishSettings = Seq( versionScheme := Some("early-semver"), publish / skip := false, publishMavenStyle := true, - publishTo := Some("GitHub raw-labs Apache Maven Packages" at "https://maven.pkg.github.com/raw-labs/das-server-scala"), + publishTo := Some( + "GitHub raw-labs Apache Maven Packages" at "https://maven.pkg.github.com/raw-labs/das-server-scala" + ), publishConfiguration := publishConfiguration.value.withOverwrite(isCI) ) diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index f5db97d..0000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - %d{yyyy-MM-dd HH:mm:ss.SSSZ} [%thread] %-5level %logger{36} %X{taskId}- %msg%n%xEx{30} - - - - - - - - - - - - diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf new file mode 100644 index 0000000..e69de29 diff --git a/src/main/scala/com/rawlabs/das/server/Cache.scala b/src/main/scala/com/rawlabs/das/server/Cache.scala deleted file mode 100644 index 48099fc..0000000 --- a/src/main/scala/com/rawlabs/das/server/Cache.scala +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Copyright 2024 RAW Labs S.A. - * - * Use of this software is governed by the Business Source License - * included in the file licenses/BSL.txt. - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0, included in the file - * licenses/APL.txt. - */ - -package com.rawlabs.das.server - -import com.rawlabs.protocol.das.Rows -import com.rawlabs.utils.core.RawService - -import java.io.Closeable -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{CancellationException, ConcurrentHashMap, Executors, TimeUnit} -import java.util.concurrent.locks.{Condition, ReentrantLock} -import scala.collection.mutable.ListBuffer -import scala.concurrent.{Future, Promise} -import scala.util.Try - -/** - * A cache service that stores rows of data, allowing concurrent access by multiple readers. - * The service uses a bounded fullHistory for each cache entry and handles synchronization - * between writers and readers using locks and condition variables. 
- */ -class Cache extends RawService { - - // Map to store cache entries, each consisting of a fullHistory, lock, and two conditions for synchronization - private val cacheMap = new ConcurrentHashMap[String, (ListBuffer[Rows], ReentrantLock, Condition, Condition)]() - - // Map to track the completion status of cache writing tasks - private val completionMap = new ConcurrentHashMap[String, Promise[Unit]]() - - // Map to track the number of active readers for each cache entry - private val readerCountMap = new ConcurrentHashMap[String, AtomicInteger]() - - // Map to track the most advanced reader's position - private val mostAdvancedReaderMap = new ConcurrentHashMap[String, AtomicInteger]() - - // Map to store the last access time for each cache entry, used for eviction purposes - private val accessTimeMap = new ConcurrentHashMap[String, Long]() - - // Executor service to handle asynchronous tasks for writing data to the cache - private val executor = Executors.newCachedThreadPool() - - // Maximum number of cache entries allowed in the cacheMap before evicting old entries - private val maxCacheEntries = 100 - - // Maximum difference between the writer and the most advanced reader before pausing writes - private val maxAllowedDifference = 100 - - // Timeout in milliseconds for a writer to wait for new readers before self-terminating - private val writerTimeoutMillis = 30000L - - /** - * Writes data to the cache if the specified cache entry does not already exist. - * - * @param cacheId The unique identifier for the cache entry. - * @param task An iterator over rows to be written to the cache, which will be closed after use. - * @return A Future that completes when the task is done or fails if an error occurs. - */ - def writeIfNotExists(cacheId: String, task: => Iterator[Rows] with Closeable): Future[Unit] = { - logger.debug("writeIfNotExists called for cacheId {}", cacheId) - - // Check if the number of cache entries exceeds the maximum allowed - if (cacheMap.size() > maxCacheEntries) { - // Sort cache entries by their last access time to find the least recently used entry - logger.info("Cache size exceeded maxCacheEntries. Attempting to evict old entries.") - val sortedCacheIds = accessTimeMap - .entrySet() - .toArray(Array.empty[java.util.Map.Entry[String, Long]]) - .sortBy(_.getValue) - .map(_.getKey) - - var done = false - // Attempt to evict the least recently used cache entry with no active readers - for (cacheId <- sortedCacheIds if !done) { - done = cancelIfNoReaders(cacheId) - } - } - - // Create a new cache entry if it does not already exist - cacheMap.computeIfAbsent( - cacheId, - _ => { - logger.debug("Creating new cache entry for id {}", cacheId) - val lock = new ReentrantLock() - val notFull = lock.newCondition() // Condition to signal that the fullHistory is not full - val notEmpty = lock.newCondition() // Condition to signal that the fullHistory is not empty - val fullHistory = ListBuffer.empty[Rows] // Buffer to hold rows of data - - val promise = Promise[Unit]() // Promise to track the completion status of the task - completionMap.put(cacheId, promise) - mostAdvancedReaderMap.put(cacheId, new AtomicInteger(0)) - - // Update the access time whenever a new cache entry is created - accessTimeMap.put(cacheId, System.currentTimeMillis()) - logger.debug("Cache entry created for id {}. 
Submitting task to executor.", cacheId) - - // Submit a task to the executor to handle writing data to the cache - executor.submit(new Runnable { - override def run(): Unit = { - try { - val taskIterator = task - try { - // Iterate over the rows provided by the task and write them to the fullHistory - for (chunk <- taskIterator) { - lock.lock() - try { - var mostAdvancedReader = mostAdvancedReaderMap.getOrDefault(cacheId, new AtomicInteger(0)).get() - - // If the fullHistory is full, wait until there is space available or timeout - while ((fullHistory.size - mostAdvancedReader) >= maxAllowedDifference) { - logger.debug( - "Writer for cacheId {} waiting for readers to catch up. Most advanced reader: {}", - cacheId, - mostAdvancedReader - ) - val awaitSuccess = notFull.await(writerTimeoutMillis, TimeUnit.MILLISECONDS) - // Check if the writer should self-terminate due to inactivity (no readers) - if (!awaitSuccess) { - val readerCount = readerCountMap.getOrDefault(cacheId, new AtomicInteger(0)).get() - if (readerCount == 0) { - throw new CancellationException(s"Writer self-terminated due to inactivity for id $cacheId") - } else { - logger.debug( - "Writer timed out waiting for readers for id {} but active readers. Reader count: {}", - cacheId, - readerCount - ) - } - } - mostAdvancedReader = mostAdvancedReaderMap.getOrDefault(cacheId, new AtomicInteger(0)).get() - } - fullHistory.append(chunk) // Add the chunk of data to the fullHistory - notEmpty.signalAll() // Notify readers that new data is available - logger.debug( - "Data chunk added to fullHistory for cacheId {}. Buffer size now {}", - cacheId, - fullHistory.size - ) - } finally { - lock.unlock() - } - } - } finally { - // Ensure the task is closed when done or in case of an exception - logger.debug("Closing task iterator for cacheId {}", cacheId) - Try(taskIterator.close()).failed.foreach(_ => ()) // Ignore any exceptions from close - } - } catch { - case ex: Exception => - logger.error("Exception occurred while writing to cacheId {}: {}", cacheId, ex.getMessage) - promise.tryFailure(ex) // If an exception occurs, fail the promise - lock.lock() - try { - notEmpty.signalAll() // Notify all readers that the task has failed - } finally { - lock.unlock() - } - } finally { - lock.lock() - try { - if (!promise.isCompleted) { - logger.debug("Marking task as completed for cacheId {}", cacheId) - promise.trySuccess(()) // Mark the task as completed if not already done - } - notEmpty.signalAll() // Notify all readers that the task is complete - } finally { - lock.unlock() - } - } - } - }) - - (fullHistory, lock, notFull, notEmpty) // Return the new cache entry - } - ) - - // Return a future that completes when the task is done - logger.debug("Returning future for cacheId {}", cacheId) - completionMap.get(cacheId).future - } - - /** - * Reads data from the cache for the specified cache entry. - * - * @param cacheId The unique identifier for the cache entry to read from. - * @return An iterator over the rows stored in the cache with closeable functionality. 
- */ - def read(cacheId: String): Iterator[Rows] with Closeable = { - logger.debug("read called for cacheId {}", cacheId) - - // Retrieve the cache entry for the specified ID or throw an exception if it doesn't exist - val (fullHistory, lock, notFull, notEmpty) = Option(cacheMap.get(cacheId)) - .getOrElse { - logger.warn("No cache found for id {}", cacheId) - throw new NoSuchElementException(s"No cache found for id $cacheId") - } - - // Retrieve the promise tracking the completion status of the cache writing task - val promise = Option(completionMap.get(cacheId)) - .getOrElse { - logger.warn("No task found for id {}", cacheId) - throw new NoSuchElementException(s"No task found for id $cacheId") - } - - // Increment the reader count for the cache entry - val readerCount = readerCountMap.computeIfAbsent(cacheId, _ => new AtomicInteger(0)) - readerCount.incrementAndGet() - logger.debug("Reader count incremented for cacheId {}. Current reader count: {}", cacheId, readerCount.get()) - - // Update the access time whenever the cache entry is read - accessTimeMap.put(cacheId, System.currentTimeMillis()) - - val mostAdvancedReader = mostAdvancedReaderMap.getOrDefault(cacheId, new AtomicInteger(0)) - - new Iterator[Rows] with Closeable { - private var currentIndex = 0 - private var finished = false - private var closed = false - - /** - * Checks if more data is available in the cache. - * - * @return True if more data is available, false otherwise. - */ - override def hasNext: Boolean = { - if (closed) throw new IllegalStateException("Iterator has been closed") - - lock.lock() // Acquire the lock - try { - // Wait for data to be available if the fullHistory is empty and the task is not finished - while (currentIndex >= fullHistory.size && !finished) { - if (promise.isCompleted) { - finished = true - logger.debug("Task completed for cacheId {}. No more data to read.", cacheId) - } else { - logger.debug("Buffer empty for cacheId {}. Waiting for data.", cacheId) - notEmpty.await() // Wait for data to be added to the fullHistory - } - } - - // If the task is complete and an exception occurred, rethrow it - if (promise.isCompleted && currentIndex >= fullHistory.size && promise.future.value.exists(_.isFailure)) { - val failure = promise.future.value.get.failed.get - logger.error("Task failed for cacheId {}: {}", cacheId, failure.getMessage) - throw failure // Rethrow the exception from the task - } - - currentIndex < fullHistory.size // Check if there is more data in the fullHistory - } finally { - lock.unlock() // Release the lock after checking - } - } - - /** - * Retrieves the next row from the cache. - * - * @return The next row of data. - * @throws NoSuchElementException if there are no more elements. - */ - override def next(): Rows = { - if (!hasNext) throw new NoSuchElementException("No more elements") - if (closed) throw new IllegalStateException("Iterator has been closed") - - lock.lock() // Acquire the lock before accessing the fullHistory - try { - val result = fullHistory(currentIndex) - currentIndex += 1 - logger.debug("Returning next element for cacheId {}. 
Current index: {}", cacheId, currentIndex) - // Notify the writer if this reader is the most advanced reader, as it can now write more data - if (mostAdvancedReader.updateAndGet(oldPosition => math.max(oldPosition, currentIndex)) == currentIndex) { - notFull.signalAll() - } - result - } finally { - lock.unlock() // Release the lock after processing - } - } - - /** - * Closes the iterator, releasing resources and decrementing the reader count. - */ - override def close(): Unit = { - if (!closed) { - logger.debug("Closing iterator for cacheId {}", cacheId) - lock.lock() - try { - if (!closed) { - closed = true - readerCount.decrementAndGet() - logger.debug( - "Decrementing reader count for cacheId {}. Current reader count: {}", - cacheId, - readerCount.get() - ) - // Notify the writer if this reader is the most advanced reader, as it can now write more data - if (mostAdvancedReader.updateAndGet(oldPosition => math.max(oldPosition, currentIndex)) == currentIndex) { - notFull.signalAll() - } - } - } finally { - lock.unlock() - } - } - } - } - } - - /** - * Cancels and removes a cache entry if there are no active readers. - * - * @param cacheId The unique identifier for the cache entry to cancel. - * @return True if the entry was cancelled, false otherwise. - */ - private def cancelIfNoReaders(cacheId: String): Boolean = { - logger.debug("Attempting to cancel cacheId {} if no readers exist.", cacheId) - - // Get the number of active readers for the cache entry - val readerCount = readerCountMap.getOrDefault(cacheId, new AtomicInteger(0)) - - // If there are no active readers, proceed to cancel the cache entry - if (readerCount.get() == 0) { - logger.debug("No readers found for cacheId {}. Cancelling cache entry.", cacheId) - - // Try to fail the associated promise, indicating that the task was cancelled due to no readers - Option(completionMap.get(cacheId)).foreach(_.tryFailure(new CancellationException("No readers left"))) - - // Remove the cache entry and related metadata from all maps - cacheMap.remove(cacheId) - readerCountMap.remove(cacheId) - completionMap.remove(cacheId) - accessTimeMap.remove(cacheId) // Also remove the entry from the access time map - - logger.info("Cache entry {} successfully cancelled and removed.", cacheId) - true // Return true indicating that the entry was cancelled - } else { - logger.debug("Active readers found for cacheId {}. Cache entry not cancelled.", cacheId) - false // Return false if there are still active readers - } - } - - /** - * Stops the cache service by shutting down the executor. - * This method is typically called when the service is being stopped or terminated. - */ - override def doStop(): Unit = { - // Shut down the executor to stop accepting new tasks and terminate existing ones - logger.info("Stopping Cache service. Shutting down executor.") - executor.shutdown() - if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { - logger.warn("Executor did not terminate in the allotted time. Forcing shutdown.") - executor.shutdownNow() - } else { - logger.info("Executor successfully terminated.") - } - } -} diff --git a/src/main/scala/com/rawlabs/das/server/DASChunksCache.scala b/src/main/scala/com/rawlabs/das/server/DASChunksCache.scala new file mode 100644 index 0000000..9e37a92 --- /dev/null +++ b/src/main/scala/com/rawlabs/das/server/DASChunksCache.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2024 RAW Labs S.A. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.txt. 
+ * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0, included in the file + * licenses/APL.txt. + */ + +package com.rawlabs.das.server + +import com.google.common.cache.{Cache, CacheBuilder} +import com.rawlabs.protocol.das.Rows +import com.rawlabs.protocol.das.services.ExecuteRequest +import com.typesafe.scalalogging.StrictLogging + +import scala.collection.mutable + +object DASChunksCache extends StrictLogging { + // Maximum number of entries cache + private val N = 1000 + + // Initialize the cache with a LRU eviction policy + private val cache: Cache[String, mutable.Buffer[Rows]] = CacheBuilder + .newBuilder() + .maximumSize(N) + .build() + + def put(request: ExecuteRequest, all: mutable.Buffer[Rows]): Unit = { + logger.debug(s"Putting request in cache: $request") + cache.put(request.toString, all) + } + + def get(request: ExecuteRequest): Option[mutable.Buffer[Rows]] = { + logger.debug(s"Getting request from cache: $request") + val r = Option(cache.getIfPresent(request.toString)) + logger.debug(s"Cache hit: ${r.isDefined}") + r + } +} diff --git a/src/main/scala/com/rawlabs/das/server/DASResultCache.scala b/src/main/scala/com/rawlabs/das/server/DASResultCache.scala new file mode 100644 index 0000000..f139c64 --- /dev/null +++ b/src/main/scala/com/rawlabs/das/server/DASResultCache.scala @@ -0,0 +1,288 @@ +/* + * Copyright 2024 RAW Labs S.A. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0, included in the file + * licenses/APL.txt. + */ + +package com.rawlabs.das.server + +import com.rawlabs.protocol.das.Rows +import com.rawlabs.utils.core.RawUtils +import com.typesafe.scalalogging.StrictLogging + +import java.io.Closeable +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantLock +import scala.collection.concurrent.TrieMap +import scala.collection.mutable + +// TODO (msb): This is work-in-progress and not concluded. +// What is missing: +// - no limit to the number of cache entries, i.e. 
no eviction policy +// - potentially exhausting the worker pool if too many cache entries are created +// - potential memory exhaustion if the slowest reader has a very large lag to the producer +class DASResultCache { + + private val workerPool = Executors.newFixedThreadPool(10, RawUtils.newThreadFactory("cache")) + + private val cacheMap = mutable.HashMap[String, CacheEntry]() + private val cacheMapLock = new Object + + def writeIfNotExists(cacheId: String, task: => Iterator[Rows] with Closeable): Unit = { + cacheMapLock.synchronized { + cacheMap.get(cacheId) match { + case Some(_) => // Do nothing if cache already exists + case None => + val cache = new CacheEntry(cacheId, task) + cacheMap.put(cacheId, cache) + workerPool.submit(new cache.WriterTask) + } + } + } + + def read(cacheId: String): Iterator[Rows] with Closeable = { + cacheMapLock.synchronized { + cacheMap.get(cacheId) match { + case Some(cache) => cache.read() + case None => throw new AssertionError(s"Cache with id $cacheId does not exist") + } + } + } + +} + +class CacheEntry(cacheId: String, task: Iterator[Rows] with Closeable) extends StrictLogging { + + private val maxBufferAhead = 10 + + private val cache = mutable.ArrayBuffer[Rows]() + + // Index to track the producer's position + @volatile private var producerIndex: Long = 0 + + // Lock and Condition to protect reader positions and to signal producer + private val readerLock = new ReentrantLock() + private val readersUpdated = readerLock.newCondition() + + // Map to store reader positions (consumer name -> position) + private val readerPositions = TrieMap[Int, Int]().withDefaultValue(0) + + private val readerCounter = new AtomicInteger(0) + + @volatile private var exception: Option[Throwable] = None + + class WriterTask extends Runnable { + override def run(): Unit = { + logger.trace(s"WriterTask started for cacheId $cacheId") + try { + while (true) { + readerLock.lock() + try { + var fastestReaderPosition = getFastestReaderPosition + + logger.trace( + s"Writer checking buffer ahead for cacheId $cacheId: producerIndex=$producerIndex, " + + s"fastestReaderPosition=$fastestReaderPosition, " + + s"bufferAhead=${producerIndex - fastestReaderPosition}" + ) + + while (producerIndex - fastestReaderPosition >= maxBufferAhead) { + logger.trace(s"Writer waiting for readers to catch up for cacheId $cacheId") + readersUpdated.await() + fastestReaderPosition = getFastestReaderPosition + logger.trace( + s"Writer woke up for cacheId $cacheId: producerIndex=$producerIndex, " + + s"fastestReaderPosition=$fastestReaderPosition, " + + s"bufferAhead=${producerIndex - fastestReaderPosition}" + ) + } + } finally { + readerLock.unlock() + } + + // Produce more data + try { + if (task.hasNext) { + val rows = task.next() + readerLock.lock() + try { + cache.append(rows) + producerIndex += 1 + logger.trace(s"Writer appended data for cacheId $cacheId: producerIndex=$producerIndex") + readersUpdated.signalAll() + logger.trace(s"Writer signaled readers after appending data for cacheId $cacheId") + } finally { + readerLock.unlock() + } + } else { + task.close() + readerLock.lock() + try { + cache.append(null) + logger.trace(s"Writer reached end of data for cacheId $cacheId, appended null to cache") + readersUpdated.signalAll() + logger.trace(s"Writer signaled readers after appending end marker for cacheId $cacheId") + } finally { + readerLock.unlock() + } + logger.trace(s"WriterTask exiting for cacheId $cacheId") + return + } + } catch { + case e: Exception => + readerLock.lock() + try { + exception 
= Some(e) + logger.error(s"Writer encountered exception for cacheId $cacheId: ${e.getMessage}", e) + readersUpdated.signalAll() + logger.trace(s"Writer signaled readers after exception for cacheId $cacheId") + } finally { + readerLock.unlock() + } + return + } + } + } catch { + case e: Exception => + readerLock.lock() + try { + exception = Some(e) + logger.error(s"Writer encountered exception outside loop for cacheId $cacheId: ${e.getMessage}", e) + readersUpdated.signalAll() + logger.trace(s"Writer signaled readers after exception outside loop for cacheId $cacheId") + } finally { + readerLock.unlock() + } + } + } + } + + def read(): Iterator[Rows] with Closeable = { + new Iterator[Rows] with Closeable { + private val readerId = readerCounter.incrementAndGet() + private var readerPosition: Int = 0 + private var lastFetchedData: Rows = _ + + // Register reader's position when iterator is created + readerLock.lock() + try { + readerPositions.update(readerId, readerPosition) + logger.trace( + s"Reader $readerId created for cacheId $cacheId: initial readerPosition=$readerPosition, " + + s"current readerPositions=${readerPositions.toMap}" + ) + } finally { + readerLock.unlock() + } + + override def hasNext: Boolean = { + readerLock.lock() + try { + logger.trace( + s"Reader $readerId checking hasNext for cacheId $cacheId: readerPosition=$readerPosition, " + + s"cacheSize=${cache.size}, exception=$exception" + ) + + // Wait until data is available at the current position + while (cache.size <= readerPosition && exception.isEmpty) { + logger.trace(s"Reader $readerId waiting for data for cacheId $cacheId") + readersUpdated.await() // Wait until producer adds data + logger.trace(s"Reader $readerId woke up for cacheId $cacheId") + } + + if (exception.nonEmpty) { + // Producer has failed + logger.error(s"Reader $readerId detected exception for cacheId $cacheId: ${exception.get.getMessage}") + throw exception.get + } + + if (cache.size <= readerPosition) { + // Producer has stopped and cache has no more data + logger.trace(s"Reader $readerId has no more data for cacheId $cacheId") + return false + } + + lastFetchedData = cache(readerPosition) + val hasMore = lastFetchedData != null // Return true if data is available + logger.trace(s"Reader $readerId hasNext result for cacheId $cacheId: $hasMore") + hasMore + } finally { + readerLock.unlock() + } + } + + override def next(): Rows = { + if (!hasNext) throw new NoSuchElementException("No more elements in the queue") + if (lastFetchedData == null) throw new NoSuchElementException("End of data reached") + + // Update the consumer's position and return the fetched data + readerPosition += 1 + updateReaderPosition(readerId, readerPosition) + logger.trace( + s"Reader $readerId advanced to readerPosition=$readerPosition for cacheId $cacheId, " + + s"current readerPositions=${readerPositions.toMap}" + ) + lastFetchedData + } + + override def close(): Unit = { + readerLock.lock() + try { + readerPositions.remove(readerId) + logger.trace( + s"Reader $readerId closed for cacheId $cacheId, removed from readerPositions, " + + s"remaining readers=${readerPositions.keys}" + ) + readersUpdated.signalAll() // Signal producer that a consumer has stopped + logger.trace(s"Reader $readerId signaled producer after closing for cacheId $cacheId") + } finally { + readerLock.unlock() + } + } + } + } + + // Helper method to update a reader's position and signal the producer + private def updateReaderPosition(readerId: Int, position: Int): Unit = { + readerLock.lock() + try { + 
readerPositions.update(readerId, position) + logger.trace( + s"Reader $readerId updated position to $position for cacheId $cacheId, " + + s"current readerPositions=${readerPositions.toMap}" + ) + readersUpdated.signalAll() // Signal producer that a consumer has advanced + logger.trace(s"Reader $readerId signaled producer after updating position for cacheId $cacheId") + } finally { + readerLock.unlock() + } + } + + // Helper method to get the fastest reader's position + private def getFastestReaderPosition: Int = { + readerLock.lock() + try { + val fastestPosition = + if (readerPositions.nonEmpty) { + readerPositions.values.max + } else { + producerIndex.toInt // Return producerIndex when no readers are present + } + logger.trace( + s"Computed fastestReaderPosition=$fastestPosition for cacheId $cacheId, " + + s"current readerPositions=${readerPositions.toMap}" + ) + fastestPosition + } finally { + readerLock.unlock() + } + } + +} diff --git a/src/main/scala/com/rawlabs/das/server/DASServer.scala b/src/main/scala/com/rawlabs/das/server/DASServer.scala index b1cd4cc..f4bfd3b 100644 --- a/src/main/scala/com/rawlabs/das/server/DASServer.scala +++ b/src/main/scala/com/rawlabs/das/server/DASServer.scala @@ -21,7 +21,7 @@ class DASServer(implicit settings: RawSettings) { private[this] var server: Server = _ private val dasSdkManager = new DASSdkManager - private val cache = new Cache() + private val cache = new DASResultCache() private val registrationService = RegistrationServiceGrpc.bindService(new RegistrationServiceGrpcImpl(dasSdkManager)) private val tablesService = TablesServiceGrpc.bindService(new TableServiceGrpcImpl(dasSdkManager, cache)) diff --git a/src/main/scala/com/rawlabs/das/server/TableServiceGrpcImpl.scala b/src/main/scala/com/rawlabs/das/server/TableServiceGrpcImpl.scala index 790e625..b7a6212 100644 --- a/src/main/scala/com/rawlabs/das/server/TableServiceGrpcImpl.scala +++ b/src/main/scala/com/rawlabs/das/server/TableServiceGrpcImpl.scala @@ -28,7 +28,7 @@ import scala.collection.mutable * @param provider Provides access to DAS (Data Access Service) instances. * @param cache Cache for storing query results. 
*/ -class TableServiceGrpcImpl(provider: DASSdkManager, cache: Cache) +class TableServiceGrpcImpl(provider: DASSdkManager, cache: DASResultCache) extends TablesServiceGrpc.TablesServiceImplBase with StrictLogging { @@ -166,78 +166,60 @@ class TableServiceGrpcImpl(provider: DASSdkManager, cache: Cache) .getDAS(request.getDasId) .getTable(request.getTableId.getName) match { case Some(table) => - logger.debug(s"Executing query for Table ID: ${request.getTableId.getName}, Plan ID: ${request.getPlanId}") - val result = table.execute( - request.getQualsList.asScala, - request.getColumnsList.asScala, - if (request.hasSortKeys) Some(request.getSortKeys.getSortKeysList.asScala) else None, - if (request.hasLimit) Some(request.getLimit) else None - ) - val context = Context.current() - val MAX_CHUNK_SIZE = 100 - logger.debug( - s"Creating iterator (chunk size $MAX_CHUNK_SIZE rows) for query execution for Table ID: ${request.getTableId.getName}, Plan ID: ${request.getPlanId}" - ) + def task(): Iterator[Rows] with Closeable = { + logger.debug(s"Executing query for Table ID: ${request.getTableId.getName}, Plan ID: ${request.getPlanId}") + val result = table.execute( + request.getQualsList.asScala, + request.getColumnsList.asScala, + if (request.hasSortKeys) Some(request.getSortKeys.getSortKeysList.asScala) else None, + if (request.hasLimit) Some(request.getLimit) else None + ) + + val MAX_CHUNK_SIZE = 100 + logger.debug( + s"Creating iterator (chunk size $MAX_CHUNK_SIZE rows) for query execution for Table ID: ${request.getTableId.getName}, Plan ID: ${request.getPlanId}" + ) + // Wrap the result processing logic in the iterator + new ChunksIterator(request, result, MAX_CHUNK_SIZE) + } + // Wrap the result processing logic in the iterator - val it = new ChunksIterator(result, MAX_CHUNK_SIZE) + val it = { + DASChunksCache.get(request) match { + case Some(cachedChunks) => + logger.debug(s"Using cached chunks for Table ID: ${request.getTableId.getName}") + val cachedChunksIterator = cachedChunks.iterator + new Iterator[Rows] with Closeable { + override def hasNext: Boolean = cachedChunksIterator.hasNext + + override def next(): Rows = cachedChunksIterator.next() + + override def close(): Unit = {} + } + case None => + logger.debug(s"Cache miss for Table ID: ${request.getTableId.getName}") + task() + } + } + + val context = Context.current() try { it.foreach { rows => if (context.isCancelled) { logger.warn("Context cancelled during query execution. 
Closing reader.") - it.close() return } responseObserver.onNext(rows) } - responseObserver.onCompleted() logger.debug("Query execution completed successfully.") + responseObserver.onCompleted() } catch { case ex: Exception => logger.error("Error occurred during query execution.", ex) - it.close() responseObserver.onError(ex) + } finally { + it.close() } -// assert(request.hasPlanId, "Plan ID is required for caching query results.") -// -// def task(): Iterator[Rows] with Closeable = { -// logger.debug(s"Executing query for Table ID: ${request.getTableId.getName}, Plan ID: ${request.getPlanId}") -// val result = table.execute( -// request.getQualsList.asScala, -// request.getColumnsList.asScala, -// if (request.hasSortKeys) Some(request.getSortKeys.getSortKeysList.asScala) else None, -// if (request.hasLimit) Some(request.getLimit) else None -// ) -// -// val MAX_CHUNK_SIZE = 100 -// logger.debug( -// s"Creating iterator (chunk size $MAX_CHUNK_SIZE rows) for query execution for Table ID: ${request.getTableId.getName}, Plan ID: ${request.getPlanId}" -// ) -// // Wrap the result processing logic in the iterator -// new ChunksIterator(result, MAX_CHUNK_SIZE) -// } -// -// val cacheKey = s"${request.getTableId.getName}:${request.getPlanId}" -// logger.debug(s"Storing result in cache with key: $cacheKey") -// cache.writeIfNotExists(cacheKey, task()) -// -// val it = cache.read(cacheKey) -// try { -// it.foreach { rows => -// if (context.isCancelled) { -// logger.warn("Context cancelled during query execution. Closing reader.") -// it.close() -// return -// } -// responseObserver.onNext(rows) -// } -// responseObserver.onCompleted() -// logger.debug("Query execution completed successfully.") -// } catch { -// case ex: Exception => -// logger.error("Error occurred during query execution.", ex) -// it.close() -// responseObserver.onError(ex) -// } case None => logger.error(s"Table ${request.getTableId.getName} not found.") responseObserver.onError(new RuntimeException(s"Table ${request.getTableId.getName} not found")) @@ -388,24 +370,41 @@ class TableServiceGrpcImpl(provider: DASSdkManager, cache: Cache) /** * Iterator implementation for processing query results in chunks. * + * @param request The request containing query details (used to build a cache key). * @param resultIterator The iterator of query results. * @param maxChunkSize The maximum size of each chunk. */ class ChunksIterator( + request: ExecuteRequest, resultIterator: Iterator[Row] with Closeable, maxChunkSize: Int ) extends Iterator[Rows] with Closeable with StrictLogging { - private val currentChunk: mutable.Buffer[Row] = mutable.Buffer.empty + logger.debug(s"Initializing ChunksIterator with maxChunkSize: $maxChunkSize, request: $request") + + private val currentChunk = mutable.Buffer[Row]() + private val maxChunksToCache = 5 + private var chunkCounter = 0 + private var eofReached = false + + private val completeChunkCache = mutable.Buffer[Rows]() /** * Checks if there are more rows to process. * * @return true if there are more rows, false otherwise. */ - override def hasNext: Boolean = resultIterator.hasNext || currentChunk.nonEmpty + override def hasNext: Boolean = { + val hasNext = resultIterator.hasNext || currentChunk.nonEmpty + logger.debug(s"hasNext() called. hasNext: $hasNext") + if (!hasNext) { + eofReached = true + logger.debug("EOF reached in hasNext()") + } + hasNext + } /** * Retrieves the next chunk of rows. @@ -413,16 +412,51 @@ class ChunksIterator( * @return The next chunk of rows. 
   */
  override def next(): Rows = {
-    if (!hasNext) throw new NoSuchElementException("No more elements")
+    if (!hasNext) {
+      logger.debug("No more elements in next()")
+      throw new NoSuchElementException("No more elements")
+    }
 
-    currentChunk.clear()
+    logger.debug(s"Fetching next chunk. Chunk counter: $chunkCounter")
+
+    val nextChunk = getNextChunk()
+    logger.debug(s"Next chunk fetched with ${nextChunk.getRowsCount} rows")
+
+    // Cache the chunks up to a certain limit
+    if (chunkCounter < maxChunksToCache) {
+      // Append the chunk to the cache
+      completeChunkCache.append(nextChunk)
+      logger.debug(s"Appended chunk to cache. Cache size: ${completeChunkCache.size}")
+
+      // If we reached the end of the result set (or this is the last chunk, since it's not complete),
+      // cache the complete chunks read thus far for future use
+      if (eofReached || nextChunk.getRowsCount < maxChunkSize) {
+        logger.debug("Reached end of result set or last incomplete chunk. Caching complete chunks.")
+        DASChunksCache.put(request, completeChunkCache)
+        logger.debug("Chunks cached successfully.")
+      }
+    } else if (chunkCounter == maxChunksToCache) {
+      // We bail out of trying to cache chunks because it's getting too big
+      completeChunkCache.clear()
+      logger.debug("Reached maxChunksToCache limit. Cleared completeChunkCache.")
+    }
+
+    chunkCounter += 1
+    logger.debug(s"Incremented chunk counter to $chunkCounter")
+    nextChunk
+  }
+
+  // Builds a chunk of rows by reading from the result iterator.
+  private def getNextChunk(): Rows = {
+    currentChunk.clear()
+    logger.debug("Cleared currentChunk")
 
     while (resultIterator.hasNext && currentChunk.size < maxChunkSize) {
       currentChunk += resultIterator.next()
     }
-
-    // Return the current chunk
-    Rows.newBuilder().addAllRows(currentChunk.asJava).build()
+    val rows = Rows.newBuilder().addAllRows(currentChunk.asJava).build()
+    logger.debug(s"Built next chunk with ${currentChunk.size} rows")
+    rows
   }
 
   /**
@@ -430,6 +464,7 @@ class ChunksIterator(
    */
  override def close(): Unit = {
    resultIterator.close()
+    logger.debug(s"Closed resultIterator for $request")
  }
 }
diff --git a/src/test/scala/com/rawlabs/das/server/DASResultCacheTest.scala b/src/test/scala/com/rawlabs/das/server/DASResultCacheTest.scala
new file mode 100644
index 0000000..2b69e9e
--- /dev/null
+++ b/src/test/scala/com/rawlabs/das/server/DASResultCacheTest.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2024 RAW Labs S.A.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0, included in the file
+ * licenses/APL.txt.
+ */
+
+package com.rawlabs.das.server
+
+import com.rawlabs.protocol.das.{Row, Rows}
+import com.rawlabs.protocol.raw.{Value, ValueInt}
+
+import java.io.Closeable
+import scala.collection.JavaConverters._
+
+object DASResultCacheTest extends App {
+
+  class TestIterator extends Iterator[Rows] with Closeable {
+    var i = 0
+    override def hasNext: Boolean = i < 10
+    override def next(): Rows = {
+      i += 1
+      Rows
+        .newBuilder()
+        .addRows(
+          Row
+            .newBuilder()
+            .putAllData(Map("col1" -> Value.newBuilder().setInt(ValueInt.newBuilder().setV(i)).build()).asJava)
+        )
+        .build()
+    }
+    override def close(): Unit = println("Closed")
+  }
+
+  println("CacheTest")
+
+  val cache = new DASResultCache()
+  cache.writeIfNotExists("1", new TestIterator)
+
+  val reader1 = cache.read("1")
+  val reader2 = cache.read("1")
+
+  // Read data from both readers
+  new Thread(() => {
+    var i = 0
+    reader1.foreach { v =>
+      i += 1
+      println("Thread 1 data: " + v.getRowsList.asScala.map(_.getDataMap.asScala))
+    }
+    println("Thread 1 read " + i + " rows")
+    reader1.close()
+  }).start()
+
+  new Thread(() => {
+    var i = 0
+    reader2.foreach { v =>
+      i += 1
+      println("Thread 2 data: " + v.getRowsList.asScala.map(_.getDataMap.asScala))
+    }
+    println("Thread 2 read " + i + " rows")
+    reader2.close()
+  }).start()
+
+  println("Done")
+
+}
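For reference, the read-through pattern that the reworked execute path follows with DASChunksCache can be summarized as the sketch below. This is illustrative only and not part of the patch: chunksFor and produceChunks are hypothetical names; in the patch, produceChunks corresponds to wrapping table.execute(...) in a ChunksIterator, and it is that iterator which calls DASChunksCache.put once a small-enough result set has been read in full.

import java.io.Closeable

import com.rawlabs.das.server.DASChunksCache
import com.rawlabs.protocol.das.Rows
import com.rawlabs.protocol.das.services.ExecuteRequest

// Illustrative sketch, not part of the patch: serve cached chunks when present,
// otherwise fall back to executing the query.
object ChunkCacheUsageSketch {

  def chunksFor(
      request: ExecuteRequest,
      produceChunks: () => Iterator[Rows] with Closeable): Iterator[Rows] with Closeable =
    DASChunksCache.get(request) match {
      case Some(cached) =>
        // Cache hit: replay the previously materialized chunks; nothing to close.
        val it = cached.iterator
        new Iterator[Rows] with Closeable {
          override def hasNext: Boolean = it.hasNext
          override def next(): Rows = it.next()
          override def close(): Unit = ()
        }
      case None =>
        // Cache miss: run the query; the real ChunksIterator is responsible for
        // populating DASChunksCache once the result set has been fully read.
        produceChunks()
    }
}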