Skip to content

Commit

Permalink
Siren: implemented subscribeTo.
Browse files Browse the repository at this point in the history
  • Loading branch information
achirkin committed Feb 3, 2017
1 parent f01663d commit 0eadae6
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 133 deletions.
3 changes: 3 additions & 0 deletions apps/hs/helen/src/Helen/Core/Service.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
-- This module is responsible for managing all services:
-- register, unregister, process message.
--
-- TODO: When any client disconnects, I need to send cancel messages to all
-- nonBlocking pending services.
-- At this moment, without this, memory leak can happen on a service side.
-----------------------------------------------------------------------------
{-# LANGUAGE OverloadedStrings #-}
module Helen.Core.Service
Expand Down
148 changes: 88 additions & 60 deletions services/siren/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,23 @@ module Main (main) where
import Luci.Connect
import Luci.Messages
import Luci.Connect.Base
import Control.Arrow ((&&&))
import Control.Monad (void)
import Data.Aeson as JSON
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import Data.Conduit
import Data.Int
import Data.Semigroup
--import Data.Monoid ((<>))
-- import Data.Maybe (fromMaybe)
import Data.Maybe (fromMaybe)
import Data.ByteString (ByteString)
-- import qualified Data.ByteString.Internal as BSI
import qualified Data.ByteString.Lazy as BSL
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import qualified Control.Lens as Lens
import Control.Lens.Operators ((%%=), (%=))
import Control.Lens.Operators ((%=), (<+=))

import Data.List (stripPrefix)
-- import Control.Monad.Logger
Expand All @@ -42,22 +43,22 @@ import Lib
----------------------------------------------------------------------------------------------------

data ServiceState = ServiceState
{ _currentRunToken :: Token
, _tokenGen :: Token
, _connection :: Connection
{ _tokenGen :: !Token
, _connection :: !Connection
, _subscribers :: !(HashMap Int64 [Token])
}


Lens.makeLenses ''ServiceState

genToken :: MonadState ServiceState m => m Token
genToken = tokenGen %%= (id &&& (+1))

genToken = tokenGen <+= 1


-- | entry point
main :: IO ()
main = withPostgres sets $ \conn ->
void $ runLuciClient (ServiceState 0 0 conn) processMessages
void $ runLuciClient (ServiceState 0 conn HashMap.empty) processMessages
where
sets = PSSettings
{ uName = "siren"
Expand Down Expand Up @@ -95,6 +96,7 @@ processMessages = do
genToken >>= yield . headerBytes . registerUpdateScenario
genToken >>= yield . headerBytes . registerDeleteScenario
genToken >>= yield . headerBytes . registerRecoverScenario
genToken >>= yield . headerBytes . registerSubscribeTo

-- reply to all run requests
awaitForever responseMsgs
Expand All @@ -105,56 +107,74 @@ responseMsgs :: Message -> Conduit Message (LuciProgram ServiceState) ByteString
-- Respond only to run messages with correct input

responseMsgs (MsgRun token "scenario.GetList" _ _) = do
conn <- _connection <$> get
conn <- Lens.use connection
eresultBS <- liftIO $ listScenarios conn (fromIntegral token)
yieldAnswer token eresultBS

responseMsgs (MsgRun token "scenario.geojson.Get" pams _)
| Just (Success scId) <- fromJSON <$> HashMap.lookup "ScID" pams = do
conn <- _connection <$> get
eresultBS <- liftIO $ getScenario conn (fromIntegral token) scId
| Just (Success scID) <- fromJSON <$> HashMap.lookup "ScID" pams = do
conn <- Lens.use connection
eresultBS <- liftIO $ getScenario conn (fromIntegral token) (ScenarioId scID)
yieldAnswer token eresultBS

responseMsgs (MsgRun token "scenario.geojson.Create" pams _)
| Just (Success scName) <- fromJSON <$> HashMap.lookup "name" pams
, Just geom_input <- BSL.toStrict . JSON.encode <$> HashMap.lookup "geometry_input" pams = do
conn <- _connection <$> get
conn <- Lens.use connection
eresultBS <- liftIO $ createScenario conn (fromIntegral token) (BSC.pack scName) geom_input
yieldAnswer token eresultBS

responseMsgs (MsgRun token "scenario.geojson.Update" pams _)
| Just (Success scID) <- fromJSON <$> HashMap.lookup "ScID" pams
, Just geom_input <- BSL.toStrict . JSON.encode <$> HashMap.lookup "geometry_input" pams = do
conn <- _connection <$> get
eresultBS <- liftIO $ updateScenario conn (fromIntegral token) scID geom_input
conn <- Lens.use connection
eresultBS <- liftIO $ updateScenario conn (fromIntegral token) (ScenarioId scID) geom_input
yieldAnswer token eresultBS
-- send update to all subscribers
subTokens <- map fromIntegral . fromMaybe [] . HashMap.lookup scID <$> Lens.use subscribers
eGetMsgs <- liftIO $ getLastScUpdates conn subTokens (ScenarioId scID)
case eGetMsgs of
Left errbs -> logWarnN $ Text.decodeUtf8 errbs
Right rezs -> mapM_ yield rezs

responseMsgs (MsgRun token "scenario.geojson.Delete" pams _)
| Just (Success scID) <- fromJSON <$> HashMap.lookup "ScID" pams = do
conn <- _connection <$> get
eresultBS <- liftIO $ deleteScenario conn (fromIntegral token) scID
conn <- Lens.use connection
eresultBS <- liftIO $ deleteScenario conn (fromIntegral token) (ScenarioId scID)
yieldAnswer token eresultBS

responseMsgs (MsgRun token "scenario.geojson.Recover" pams _)
| Just (Success scID) <- fromJSON <$> HashMap.lookup "ScID" pams = do
conn <- _connection <$> get
eresultBS <- liftIO $ recoverScenario conn (fromIntegral token) scID
conn <- Lens.use connection
eresultBS <- liftIO $ recoverScenario conn (fromIntegral token) (ScenarioId scID)
yieldAnswer token eresultBS

responseMsgs (MsgRun token _ _ _) = do
currentRunToken %= const token
yield . headerBytes $ MsgError token "Failed to understand scenario service request: incorrect input in the 'run' message."
responseMsgs (MsgRun token "scenario.SubscribeTo" pams _)
| Just (Success scIDs) <- fromJSON <$> HashMap.lookup "ScIDs" pams = do
subscribers %= flip (foldr (HashMap.alter addSubscriber)) (scIDs :: [Int64])
yield . headerBytes $ MsgProgress token 0 Nothing []
where
addSubscriber Nothing = Just [token]
addSubscriber (Just xs) = Just (token:xs)

responseMsgs (MsgRun token _ _ _) = yield . headerBytes $ MsgError token
"Failed to understand scenario service request: incorrect input in the 'run' message."

responseMsgs (MsgCancel token) =
-- remove a subscriber with this token
subscribers %= HashMap.map (filter (token /=))

responseMsgs (MsgError token s) = do
-- remove a subscriber with this token
subscribers %= HashMap.map (filter (token /=))
-- log a little
logWarnN $ "[Luci error message] " <> s


responseMsgs (MsgError _ s) = logWarnN $ "[Luci error message] " <> s
responseMsgs msg = logInfoN . ("[Ignore Luci message] " <>) . showJSON . toJSON . fst $ makeMessage msg

yieldAnswer :: Token
-> Either ByteString ByteString
-> Conduit Message (LuciProgram ServiceState) ByteString
yieldAnswer token eresultBS = yield $ case eresultBS of
Left errbs -> headerBytes $ MsgError token (Text.decodeUtf8 errbs)
Right resultBS -> resultBS



----------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -316,44 +336,52 @@ registerUpdateScenario token = MsgRun token "RemoteRegister" o []
]
]


-- -- | A message we send to register in luci
-- registerMessage :: Token -> Message
-- registerMessage token = MsgRun token "RemoteRegister" o []
-- where
-- o = HashMap.fromList
-- [ "description" .= String "Show the distance to the closest building line"
-- , "serviceName" .= String "DistanceToWalls"
-- , "qua-view-compliant" .= Bool True
-- , "inputs" .= object
-- [ "ScID" .= String "number"
-- , "mode" .= String "string"
-- , "points" .= String "attachment"
-- ]
-- , "outputs" .= object
-- [ "units" .= String "string"
-- , "values" .= String "attachment"
-- ]
-- , "constraints" .= object
-- [ "mode" .= ["points" :: Text]
-- ]
-- , "exampleCall" .= object
-- [ "run" .= String "DistanceToWalls"
-- , "ScId" .= Number 1
-- , "mode" .= String "points"
-- , "points" .= object
-- [ "format" .= String "Float32Array"
-- ]
-- ]
-- ]
-- | A message we send to register in luci
registerSubscribeTo :: Token -> Message
registerSubscribeTo token = MsgRun token "RemoteRegister" o []
where
o = HashMap.fromList
[ "description" .= Text.unlines
[ "Subscribe to all changes in listed scenarios"
, "Note, this service never returns; instead, it send all updates in 'progress' messages"
]
, "serviceName" .= String "scenario.SubscribeTo"
, "nonBlocking" .= Bool True
, "inputs" .= object
[ "ScIDs" .= String "[ScID]"
]
, "outputs" .= object
[ "created" .= String "number"
, "lastmodified" .= String "number"
, "geometry_output" .= object
[ "format" .= String "'GeoJSON'"
, "name" .= String "string"
, "geometry" .= String "{FeatureCollection}"
, "properties" .= String "object"
]
]
, "exampleCall" .= object
[ "run" .= String "scenario.SubscribeTo"
, "ScIDs" .= [Number 3, Number 6]
]
]



--------------------------------------------------------------------------------

headerBytes :: Message -> ByteString
headerBytes = BSL.toStrict . JSON.encode . fst . makeMessage



yieldAnswer :: Token
-> Either ByteString ByteString
-> Conduit Message (LuciProgram ServiceState) ByteString
yieldAnswer token eresultBS = yield $ case eresultBS of
Left errbs -> headerBytes $ MsgError token (Text.decodeUtf8 errbs)
Right resultBS -> resultBS

--------------------------------------------------------------------------------

runLuciClient :: s
Expand Down
12 changes: 3 additions & 9 deletions services/siren/sql/create_scenario.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
-- Create a new scenario.
-- Input geometry is assumed to be in metric system.
-- Optional parameters are longitude/latitute in degrees and altitude in meters
CREATE OR REPLACE FUNCTION create_scenario( token jsonb
, scName text
CREATE OR REPLACE FUNCTION create_scenario( scName text
, geom_input jsonb)
RETURNS jsonb AS
$func$
Expand Down Expand Up @@ -124,16 +123,11 @@ BEGIN
WHERE ph.ts_update = curTime AND ph.scenario_id = ScID;

-- finish!
RETURN (
SELECT jsonb_build_object(
'result', jsonb_build_object
RETURN jsonb_build_object
( 'ScID' , ScID
, 'name' , scName
, 'created' , round(EXTRACT(epoch FROM curTime))
, 'lastmodified', round(EXTRACT(epoch FROM curTime))
),
'callID', token
)
);
);
END;
$func$ LANGUAGE plpgsql;
11 changes: 3 additions & 8 deletions services/siren/sql/delete_scenario.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION delete_scenario(token jsonb, ScID bigint)
CREATE OR REPLACE FUNCTION delete_scenario(ScID bigint)
RETURNS jsonb AS
$func$
BEGIN
Expand All @@ -10,16 +10,11 @@ BEGIN
SET alive = FALSE
WHERE scenario.id = ScID;
-- finish!
RETURN (
SELECT jsonb_build_object(
'result', jsonb_build_object
RETURN jsonb_build_object
( 'ScID' , ScID
, 'name' , (SELECT name FROM scenario WHERE scenario.id = ScID)
, 'created' , (SELECT round(EXTRACT(epoch FROM MIN(ts_update))) FROM sc_geometry_history WHERE scenario_id = ScID)
, 'lastmodified', (SELECT round(EXTRACT(epoch FROM MAX(ts_update))) FROM sc_geometry_history WHERE scenario_id = ScID)
),
'callID', token
)
);
);
END;
$func$ LANGUAGE plpgsql;
66 changes: 66 additions & 0 deletions services/siren/sql/get_last_sc_update.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
CREATE OR REPLACE FUNCTION get_last_sc_update(ScID bigint)
RETURNS jsonb AS
-- TODO: add feature with deleted geom ids
$func$
DECLARE
features jsonb;
geometry_output jsonb;
lastTime TIMESTAMP;
BEGIN
IF (SELECT count(*) < 1 FROM scenario WHERE scenario.id = ScID) THEN
RAISE EXCEPTION 'Nonexistent scenario (ScID = %)', ScID
USING HINT = 'You are trying to get a scenario that has not ever existed!';
END IF;

-- get the time of the last scenario update
lastTime := greatest
( (SELECT max(last_update) FROM sc_geometry WHERE scenario_id = ScID)
, (SELECT max(last_update) FROM sc_geometry_prop WHERE scenario_id = ScID)
, (SELECT max(last_update) FROM scenario_prop WHERE scenario_id = ScID)
);

SELECT jsonb_build_object(
'type', 'FeatureCollection',
'features', jsonb_agg(fcs.feature)
) as FeatureCollection
FROM (
SELECT jsonb_build_object(
'type', 'Feature',
'id', CAST(g.id AS text),
'geometry', ST_AsGeoJSON(ST_Transform(g.geom, (SELECT srid FROM scenario WHERE id = ScID)))::jsonb,
'properties', coalesce(
( SELECT jsonb_object_agg(ph.name, ph.value)
FROM sc_geometry_prop_history ph
WHERE ph.ts_update = lastTime
AND ph.scenario_id = ScID
AND ph.geometry_id = g.id
AND ph.alive
), '{}') || jsonb_build_object('geomID' , g.id)
) as feature
FROM (SELECT gh.*
FROM sc_geometry_history gh
WHERE gh.ts_update = lastTime
AND gh.scenario_id = ScID
AND gh.alive) g
WHERE g.geom IS NOT NULL
) fcs
INTO features;

geometry_output := jsonb_build_object
( 'format' , 'GeoJSON'
, 'name' , (SELECT name FROM scenario WHERE id = ScID)
, 'geometry' , features
, 'properties' , ( SELECT jsonb_object_agg(ph.name, ph.value)
FROM scenario_prop_history ph
WHERE ph.ts_update = lastTime
AND ph.scenario_id = ScID
AND ph.alive )
);

RETURN jsonb_build_object
( 'geometry_output' , geometry_output
, 'created' , (SELECT round(EXTRACT(epoch FROM MIN(ts_update))) FROM sc_geometry_history WHERE scenario_id = ScID)
, 'lastmodified' , round(EXTRACT(epoch FROM lastTime))
);
END;
$func$ LANGUAGE plpgsql;
11 changes: 2 additions & 9 deletions services/siren/sql/get_scenario.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION get_scenario(token jsonb, ScID bigint)
CREATE OR REPLACE FUNCTION get_scenario(ScID bigint)
RETURNS jsonb AS
$func$
DECLARE
Expand Down Expand Up @@ -56,17 +56,10 @@ BEGIN
AND ph.alive )
);

result := jsonb_build_object
RETURN jsonb_build_object
( 'geometry_output' , geometry_output
, 'created' , (SELECT round(EXTRACT(epoch FROM MIN(ts_update))) FROM sc_geometry_history WHERE scenario_id = ScID)
, 'lastmodified' , (SELECT round(EXTRACT(epoch FROM MAX(ts_update))) FROM sc_geometry_history WHERE scenario_id = ScID)
);

return (
SELECT jsonb_build_object(
'result', result,
'callID', token
)
);
END;
$func$ LANGUAGE plpgsql;
Loading

0 comments on commit 0eadae6

Please sign in to comment.