Skip to content

Commit

Permalink
Merge pull request #1 from MarvinMiao/ns-mod
Browse files Browse the repository at this point in the history
Ns mod
  • Loading branch information
gfork authored Jan 19, 2021
2 parents 73eaf51 + e0dceae commit 929419b
Show file tree
Hide file tree
Showing 32 changed files with 3,042 additions and 2,216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public CommonMLRunner(MLContext mlContext, NodeServer server) {

protected boolean doRegisterAction() throws Exception {
createNodeSpec(true);
getCurrentJobVersion();
SimpleResponse response = amClient.registerNode(version, nodeSpec);
if (RpcCode.OK.ordinal() == response.getCode()) {
return true;
Expand Down
2 changes: 1 addition & 1 deletion flink-ai-flow/QUICKSTART.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def build_workflow():
op_2 = af.user_define_operation(af.PythonObjectExecutor(PrintHelloExecutor('job_2')))
with af.config('job_3'):
op_3 = af.user_define_operation(af.PythonObjectExecutor(PrintHelloExecutor('job_2')))
op_3 = af.user_define_operation(af.PythonObjectExecutor(PrintHelloExecutor('job_3')))
af.stop_before_control_dependency(op_3, op_1)
af.stop_before_control_dependency(op_3, op_2)
Expand Down
4 changes: 2 additions & 2 deletions flink-ai-flow/ai_flow/notification/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
# under the License.
#
from notification_service import service
from ai_flow.store.sqlalchemy_store import SqlAlchemyStore
from notification_service.event_storage import DbEventStorage


class NotificationService(service.NotificationService):

def __init__(self, backend_store_uri):
super().__init__(storage=SqlAlchemyStore(backend_store_uri))
super().__init__(storage=DbEventStorage(backend_store_uri))
2 changes: 1 addition & 1 deletion flink-ai-flow/lib/airflow/airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
from airflow.exceptions import AirflowException, AirflowWebServerTimeout
from airflow.executors import get_default_executor
from airflow.models import (
Connection, DagModel, DagBag, DagPickle, TaskInstance, DagRun, Variable, DAG, EventModel
Connection, DagModel, DagBag, DagPickle, TaskInstance, DagRun, Variable, DAG
)
from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_QUEUED_DEPS)
from airflow.utils import cli as cli_utils, db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from notification_service.client import NotificationClient, EventWatcher
from airflow.models.event import DagRunEvent, Event, EventType, TaskInstanceHelper, DagRunFinishedEvent
from airflow.utils.mailbox import Mailbox
from airflow.models.event import EventModel
from notification_service.util.db import EventModel


class EventDagFileProcessor(DagFileProcessor):
Expand Down
1 change: 0 additions & 1 deletion flink-ai-flow/lib/airflow/airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from airflow.models.taskreschedule import TaskReschedule # noqa: F401
from airflow.models.variable import Variable # noqa: F401
from airflow.models.xcom import XCOM_RETURN_KEY, XCom # noqa: F401
from airflow.models.event import EventModel

try:
from airflow.models.kubernetes import KubeResourceVersion, KubeWorkerIdentifier # noqa: F401
Expand Down
93 changes: 1 addition & 92 deletions flink-ai-flow/lib/airflow/airflow/models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@
# under the License.

from enum import Enum
import time

from notification_service.base_notification import BaseEvent

from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow import LoggingMixin
from airflow.models import Base
from sqlalchemy import Column, Integer, BigInteger, String


class EventType(str, Enum):
Expand Down Expand Up @@ -140,92 +135,6 @@ def event_model_list_to_events(event_model_list):
for event_model in event_model_list:
event = Event(key=event_model.key, value=event_model.value,
event_type=event_model.event_type, version=event_model.version,
create_time=event_model.create_time, id=event_model.id)
create_time=event_model.create_time)
events.append(event)
return events


class EventModel(Base, LoggingMixin):

__tablename__ = "event_model"
id = Column(Integer, primary_key=True)
key = Column(String(1024), nullable=False)
version = Column(Integer, nullable=False)
value = Column(String(4096))
event_type = Column(String(256))
create_time = Column(BigInteger)

@staticmethod
@provide_session
def add_event(event: Event, session=None):
event_model = EventModel()
event_model.key = event.key

def next_version():
return session.query(EventModel).filter(EventModel.key == event.key).count() + 1

event_model.create_time = time.time_ns()
event_model.version = next_version()
event_model.value = event.value
if event.event_type is None:
event_model.event_type = EventType.UNDEFINED
else:
event_model.event_type = event.event_type
session.add(event_model)
session.commit()
return event_model

@staticmethod
@provide_session
def list_events(key: str, version: int, session=None):
if key is None:
raise Exception('key cannot be empty.')

if version is None:
conditions = [
EventModel.key == key,
]
else:
conditions = [
EventModel.key == key,
EventModel.version > version
]
event_model_list = session.query(EventModel).filter(*conditions).all()
return event_model_list

@staticmethod
@provide_session
def list_all_events(start_time: int, session=None):

conditions = [
EventModel.create_time >= start_time
]
event_model_list = session.query(EventModel).filter(*conditions).all()
return event_model_list

@staticmethod
@provide_session
def list_all_events_from_id(id: int, session=None):

conditions = [
EventModel.id > id
]
event_model_list = session.query(EventModel).filter(*conditions).all()
return event_model_list

@staticmethod
@provide_session
def sync_event(event: Event, session=None):
event_model = EventModel()
event_model.key = event.key
event_model.create_time = event.create_time
event_model.version = event.version
event_model.value = event.value
if event.event_type is None or not EventType.is_in(event.event_type):
event_model.event_type = EventType.UNDEFINED
else:
event_model.event_type = event.event_type
session.add(event_model)
session.commit()
return event_model

Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.settings import engine, Session
from notification_service.util.db import prepare_db

from notification_service.event_storage import BaseEventStorage
from airflow.models import EventModel
from airflow.models.event import Event
from notification_service.event_storage import DbEventStorage


class EventModelStorage(BaseEventStorage):
def add_event(self, event: Event):
return EventModel.add_event(event)
class EventModelStorage(DbEventStorage):

def list_events(self, key: str, version: int):
return EventModel.list_events(key, version)

def list_all_events(self, start_time: int):
return EventModel.list_all_events(start_time)

def list_all_events_from_id(self, id: int):
return EventModel.list_all_events_from_id(id)
def __init__(self):
prepare_db(engine, Session)
super(EventModelStorage, self).__init__()
4 changes: 3 additions & 1 deletion flink-ai-flow/lib/airflow/airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
from airflow import settings
from airflow.utils.net import get_hostname

from notification_service.util.db import EventModel

csrf = CSRFProtect()


Expand Down Expand Up @@ -110,7 +112,7 @@ def create_app(config=None, testing=False):
Session, name="Task Instances", category="Browse"))
av(vs.TaskExecutionModelView(models.TaskExecution,
Session, name="Task Executions", category="Browse"))
av(vs.EventModelView(models.EventModel,
av(vs.EventModelView(EventModel,
Session, name="Events", category="Browse"))
av(vs.LogModelView(
models.Log, Session, name="Logs", category="Browse"))
Expand Down
8 changes: 4 additions & 4 deletions flink-ai-flow/lib/airflow/airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3167,15 +3167,15 @@ def get_one(self, id):
class EventModelView(ModelViewOnly):
verbose_name_plural = "events"
verbose_name = "event"
column_filters = ('id', 'key', 'version', 'event_type', 'create_time')
column_filters = ('key', 'event_type', 'create_time')
filter_converter = wwwutils.UtcFilterConverter()
named_filter_urls = True
column_formatters = dict(
key=event_key
)
column_searchable_list = ('key', 'version', 'event_type')
column_default_sort = ('id', True)
column_list = ('id', 'key', 'version', 'value', 'event_type', 'create_time')
column_searchable_list = ('key', 'event_type')
column_default_sort = ('version', True)
column_list = ('key', 'version', 'value', 'event_type', 'create_time', 'context', 'namespace')
page_size = PAGE_SIZE

def get_one(self, id):
Expand Down
14 changes: 7 additions & 7 deletions flink-ai-flow/lib/airflow/airflow/www_rbac/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
DateTimeWithNumRunsWithDagRunsForm,
DagRunForm, ConnectionForm)
from airflow.www_rbac.widgets import AirflowModelListWidget

from notification_service.util.db import EventModel

PAGE_SIZE = conf.getint('webserver', 'page_size')
FILTER_TAGS_COOKIE = 'tags_filter'
Expand Down Expand Up @@ -2837,21 +2837,21 @@ def get_one(self, id):


class EventModelView(AirflowModelView):
route_base = '/event'
route_base = '/event_model'

datamodel = AirflowModelView.CustomSQLAInterface(models.EventModel)
datamodel = AirflowModelView.CustomSQLAInterface(EventModel)

base_permissions = ['can_list']

page_size = PAGE_SIZE

list_columns = ['id', 'key', 'version', 'value', 'event_type', 'create_time']
list_columns = ['key', 'version', 'value', 'event_type', 'create_time', 'context', 'namespace']

search_columns = ['id', 'key', 'version', 'event_type', 'create_time']
search_columns = ['key', 'version', 'event_type', 'create_time']

base_order = ('id', 'asc')
base_order = ('version', 'asc')

base_filters = [['id', DagFilter, lambda: []]]
base_filters = [['version', DagFilter, lambda: []]]

formatters_columns = {
'key': wwwutils.event_key
Expand Down
51 changes: 0 additions & 51 deletions flink-ai-flow/lib/airflow/tests/models/test_event.py

This file was deleted.

33 changes: 18 additions & 15 deletions flink-ai-flow/lib/airflow/tests/notification/test_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,41 @@
from notification_service.service import NotificationService

from airflow.notification.event_model_storage import EventModelStorage
from tests.test_utils.db import clear_db_event_model
import unittest
from airflow.models.event import Event
from notification_service.master import NotificationMaster
from notification_service.client import NotificationClient, EventWatcher


class NotificationTest(unittest.TestCase):
def setUp(self):
clear_db_event_model()

@classmethod
def setUpClass(cls):
cls.master = NotificationMaster(NotificationService(EventModelStorage()))
cls.storage = EventModelStorage()
cls.master = NotificationMaster(NotificationService(cls.storage))
cls.master.run()
cls.client = NotificationClient(server_uri="localhost:50051")

@classmethod
def tearDownClass(cls):
cls.master.stop()

def setUp(self):
self.storage.clean_up()
self.client = NotificationClient(server_uri="localhost:50051")

def tearDown(self):
self.client.stop_listen_events()
self.client.stop_listen_event()

def test_send_event(self):
event = self.client.send_event(Event(key="key", value="value1"))
self.assertEqual(1, event.version)
self.assertTrue(event.version > 0)

def test_list_events(self):
event = self.client.send_event(Event(key="key", value="value1"))
event = self.client.send_event(Event(key="key", value="value2"))
event = self.client.send_event(Event(key="key", value="value3"))
events = self.client.list_events("key", version=1)
event1 = self.client.send_event(Event(key="key", value="value1"))
event2 = self.client.send_event(Event(key="key", value="value2"))
event3 = self.client.send_event(Event(key="key", value="value3"))
events = self.client.list_events("key", version=event1.version)
self.assertEqual(2, len(events))

def test_listen_events(self):
Expand All @@ -65,12 +70,12 @@ def __init__(self, event_list) -> None:
def process(self, events: List[Event]):
self.event_list.extend(events)

event = self.client.send_event(Event(key="key", value="value1"))
self.client.start_listen_event(key="key", watcher=TestWatch(event_list), version=1)
event1 = self.client.send_event(Event(key="key", value="value1"))
self.client.start_listen_event(key="key", watcher=TestWatch(event_list), version=event1.version)
event = self.client.send_event(Event(key="key", value="value2"))
event = self.client.send_event(Event(key="key", value="value3"))
self.client.stop_listen_event("key")
events = self.client.list_events("key", version=1)
events = self.client.list_events("key", version=event1.version)
self.assertEqual(2, len(events))
self.assertEqual(2, len(event_list))

Expand Down Expand Up @@ -100,5 +105,3 @@ def process(self, events: List[Event]):
finally:
self.client.stop_listen_events()
self.assertEqual(3, len(event_list))


Loading

0 comments on commit 929419b

Please sign in to comment.