Skip to content

Commit

Permalink
EBP-18: Implemented refactor of cache functionality as requested in P…
Browse files Browse the repository at this point in the history
…R review. Removed dead code/comments from tests.
  • Loading branch information
TrentDaniel committed Jan 20, 2025
1 parent aa2abc5 commit 4ef9b68
Show file tree
Hide file tree
Showing 7 changed files with 543 additions and 603 deletions.
15 changes: 8 additions & 7 deletions internal/ccsmp/ccsmp_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,20 @@ type CacheEventInfo struct {
}

func (eventInfo *CacheEventInfo) String() string {
var errString string
if eventInfo.err != nil {
errString = eventInfo.err.Error()
} else {
errString = "nil"
}
var errString string
if eventInfo.err != nil {
errString = eventInfo.err.Error()
} else {
errString = "nil"
}
return fmt.Sprintf("CacheEventInfo:\n\tcacheSessionP: 0x%x\n\tevent: %d\n\ttopic: %s\n\treturnCode: %d\n\tsubCode: %d\n\tcacheRequestID: %d\n\terr: %s", eventInfo.cacheSessionP, eventInfo.event, eventInfo.topic, eventInfo.returnCode, eventInfo.subCode, eventInfo.cacheRequestID, errString)
}

const (
SolClientCacheEventRequestCompletedNotice SolClientCacheEvent = 1
)

func NewConfiguredCacheEventInfo(cacheSessionP SolClientCacheSessionPt, cacheRequestID message.CacheRequestID, topic string, err error) CacheEventInfo {
func NewCacheEventInfoForFailure(cacheSessionP SolClientCacheSessionPt, cacheRequestID message.CacheRequestID, topic string, err error) CacheEventInfo {
return CacheEventInfo{
cacheSessionP: cacheSessionP,
event: SolClientCacheEventRequestCompletedNotice,
Expand All @@ -197,6 +197,7 @@ func CacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo CacheEventInfoPt, us
returnCode: SolClientReturnCode(eventCallbackInfo.rc),
subCode: SolClientSubCode(eventCallbackInfo.subCode),
cacheRequestID: message.CacheRequestID(eventCallbackInfo.cacheRequestId),
err: nil,
}
}

Expand Down
459 changes: 459 additions & 0 deletions internal/impl/core/cache_manager.go

Large diffs are not rendered by default.

41 changes: 31 additions & 10 deletions internal/impl/core/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,8 @@ type Receiver interface {
IncrementDuplicateAckCount()
// Creates a new persistent receiver with the given callback
NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo)
/* FFC: The `GetSessionPointer` method seems a litte out of place here. For now it works, but it might be an,
* anti-pattern so we should look into clarifying this and maybe doing this differently in a future iteration.*/

// Retrieves the sesion pointer
GetSessionPointer() ccsmp.SolClientSessionPt
// CacheManager() returns the manager that can be used to run cache operations
CacheManager() CacheManager
}

// PersistentReceiver interface
Expand Down Expand Up @@ -126,6 +123,13 @@ type FlowEventInfo interface {
EventInfo
}

/* NOTE: This data type could be changed from int32 if necessary, but must match the type of
* BasicCacheManager.cacheResponseChanCounter. */

// cacheResponseChannelMaxSize indicates the maximum number of cache responses that can be buffered by the API without
// being processed by the application.
const cacheResponseChannelMaxSize int32 = 1024

// Implementation
type ccsmpBackedReceiver struct {
events *ccsmpBackedEvents
Expand All @@ -142,6 +146,23 @@ type ccsmpBackedReceiver struct {
subscriptionCorrelationID SubscriptionCorrelationID

subscriptionOkEvent, subscriptionErrorEvent uint

// cacheSessionMap is used to map the cache session pointer to the method for handling the cache response,
// as specified by the application on a call to a [ReceiverCacheRequester] interface.
cacheSessionMap sync.Map // ([keyType]valueType) [ccsmp.SolClientCacheSessionPt]CacheResponseProcessor
//cacheSessionMap map[ccsmp.SolClientCacheSessionPt]CacheResponseProcessor // ([keyType]valueType) [ccsmp.SolClientCacheSessionPt]CacheResponseProcessor

// cacheResponseChan is used to buffer the cache responses from CCSMP.
cacheResponseChan chan ccsmp.CacheEventInfo
// cacheResponseChanCounter is used to prevent cache requests from being submitted if the
// cacheResponseChan buffer is full.
cacheResponseChanCounter int32
// cachePollingRunning is used to determine whether or not the goroutine that polls the cacheResponseChan
// has been started yet.
cachePollingRunning uint32

// cacheLogger is used for logging
cacheLogger logging.LogLevelLogger
}

func newCcsmpReceiver(session *ccsmp.SolClientSession, events *ccsmpBackedEvents, metrics *ccsmpBackedMetrics) *ccsmpBackedReceiver {
Expand Down Expand Up @@ -201,6 +222,11 @@ func (receiver *ccsmpBackedReceiver) SendReply(replyMsg ReplyPublishable) ErrorI
return receiver.session.SolClientSessionPublish(replyMsg)
}

// Return self under a different interface to constrain usage to cache operations
func (receiver *ccsmpBackedReceiver) CacheManager() CacheManager {
return receiver
}

func (receiver *ccsmpBackedReceiver) Events() Events {
return receiver.events
}
Expand Down Expand Up @@ -381,11 +407,6 @@ func (receiver *ccsmpBackedReceiver) NewPersistentReceiver(properties []string,
}, nil
}

// GetSessionPointer returns the opaque pointer to the session associated with the given receiver.
func (receiver *ccsmpBackedReceiver) GetSessionPointer() ccsmp.SolClientSessionPt {
return receiver.session.GetPointer()
}

// Destroy destroys the flow
func (receiver *ccsmpBackedPersistentReceiver) Destroy(freeMemory bool) ErrorInfo {
var err ErrorInfo
Expand Down
228 changes: 0 additions & 228 deletions internal/impl/receiver/cache_util.go

This file was deleted.

Loading

0 comments on commit 4ef9b68

Please sign in to comment.