Skip to content

Commit

Permalink
feat: ✨ OpenAI parser (#245)
Browse files Browse the repository at this point in the history
* feat: ✨ OpenAI parser

* refinement in logic

* logic refinement

* Update README.md

Co-authored-by: Glenn Matthews <[email protected]>

* Update circuit_maintenance_parser/parser.py

Co-authored-by: Glenn Matthews <[email protected]>

* improve question

* make more explicit the text parsing

* Automate token management for local tests

* Make openai library an extra

* Adopt OpenAI library changes

* fix mypy

---------

Co-authored-by: Glenn Matthews <[email protected]>
  • Loading branch information
chadell and glennmatthews authored Nov 15, 2023
1 parent e611c91 commit 13dfea4
Show file tree
Hide file tree
Showing 12 changed files with 862 additions and 379 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,33 @@ By default, there is a `GenericProvider` that support a `SimpleProcessor` using

> Note: Because these providers do not support the BCOP standard natively, maybe there are some gaps on the implemented parser that will be refined with new test cases. We encourage you to report related **issues**!
#### LLM-powered Parsers

The library supports an optional parser option leveraging Large Language Model (LLM) to provide best-effort parsing when the specific parsers have not been successful.

> Warning: Some of these integrations, such as OpenAI, require of extras installations parameters. Check the [extras section](#extras)
When the appropriate environment variable(s) are set (see below), these LLM parsers are automatically appended after all existing processors for each defined Provider.

> These integrations may involve some costs for API usage. Use it carefully! As an order of magnitude, a parsing of an email with OpenAI GPT gpt-3.5-turbo model costs $0.004.
These are the currently supported LLM integrations:

- [OpenAI](https://openai.com/product), these are the supported ENVs:
- `OPENAI_API_KEY` (Required): OpenAI API Key.
- `OPENAI_MODEL` (Optional): Model to use, it defaults to "gpt-3.5-turbo".

## Installation

The library is available as a Python package in pypi and can be installed with pip:
`pip install circuit-maintenance-parser`

### Extras

#### OpenAI

`pip install circuit-maintenance-parser[openai]`

## How to use it?

The library requires two things:
Expand Down Expand Up @@ -319,6 +341,7 @@ The project is following Network to Code software development guidelines and is
...omitted debug logs...
====================================================== 99 passed, 174 deselected, 17 warnings in 10.35s ======================================================
```

7. Run some final CI tests locally to ensure that there is no linting/formatting issues with your changes. You should look to get a code score of 10/10. See the example below: `invoke tests --local`

```
Expand Down
199 changes: 188 additions & 11 deletions circuit_maintenance_parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import quopri
from typing import Dict, List
from email.utils import parsedate_tz, mktime_tz
import hashlib

import bs4 # type: ignore
from bs4.element import ResultSet # type: ignore

from pydantic import BaseModel, Extra
from pydantic import BaseModel
from icalendar import Calendar # type: ignore

from circuit_maintenance_parser.errors import ParserError
Expand All @@ -23,7 +24,7 @@
logger = logging.getLogger(__name__)


class Parser(BaseModel, extra=Extra.forbid):
class Parser(BaseModel):
"""Parser class.
A Parser handles one or more specific data type(s) (specified in `data_types`).
Expand All @@ -34,14 +35,15 @@ class Parser(BaseModel, extra=Extra.forbid):
# _data_types are used to match the Parser to to each type of DataPart
_data_types = ["text/plain", "plain"]

# TODO: move it to where it is used, Cogent parser
_geolocator = Geolocator()

@classmethod
def get_data_types(cls) -> List[str]:
"""Return the expected data type."""
return cls._data_types

def parser_hook(self, raw: bytes) -> List[Dict]:
def parser_hook(self, raw: bytes, content_type: str) -> List[Dict]:
"""Custom parser logic.
This method is used by the main `Parser` classes (such as `ICal` or `Html` parser) to define a shared
Expand All @@ -53,14 +55,14 @@ def parser_hook(self, raw: bytes) -> List[Dict]:
"""
raise NotImplementedError

def parse(self, raw: bytes) -> List[Dict]:
def parse(self, raw: bytes, content_type: str) -> List[Dict]:
"""Execute parsing.
Do not override this method!
Instead, each main `Parser` class should implement its own custom logic within the `parser_hook` method.
"""
try:
result = self.parser_hook(raw)
result = self.parser_hook(raw, content_type)
except Exception as exc:
raise ParserError from exc
if any(not partial_result for partial_result in result):
Expand All @@ -86,7 +88,7 @@ class ICal(Parser):

_data_types = ["text/calendar", "ical", "icalendar"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
# iCalendar data sometimes comes encoded with base64
# TODO: add a test case
Expand Down Expand Up @@ -163,7 +165,7 @@ def remove_hex_characters(string):
"""Convert any hex characters to standard ascii."""
return string.encode("ascii", errors="ignore").decode("utf-8")

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
Expand Down Expand Up @@ -195,7 +197,7 @@ class EmailDateParser(Parser):

_data_types = [EMAIL_HEADER_DATE]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
parsed_date = parsedate_tz(raw.decode())
if parsed_date:
Expand All @@ -208,7 +210,7 @@ class EmailSubjectParser(Parser):

_data_types = [EMAIL_HEADER_SUBJECT]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_subject(self.bytes_to_string(raw).replace("\r", "").replace("\n", "")):
Expand All @@ -230,7 +232,7 @@ class Csv(Parser):

_data_types = ["application/csv", "text/csv", "application/octet-stream"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
for data in self.parse_csv(raw):
Expand All @@ -249,7 +251,7 @@ class Text(Parser):

_data_types = ["text/plain"]

def parser_hook(self, raw: bytes):
def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
text = self.get_text_hook(raw)
Expand All @@ -265,3 +267,178 @@ def get_text_hook(raw: bytes) -> str:
def parse_text(self, text) -> List[Dict]:
"""Custom text parsing."""
raise NotImplementedError


class LLM(Parser):
"""LLM parser."""

_data_types = ["text/html", "html", "text/plain"]

_llm_question = """Please, could you extract a JSON form without any other comment,
with the following JSON schema (timestamps in EPOCH):
{
"type": "object",
"properties": {
"start": {
"type": "int",
},
"end": {
"type": "int",
},
"account": {
"type": "string",
},
"summary": {
"type": "string",
},
"maintenance_id": {
"type": "string",
},
"account": {
"type": "string",
},
"status": {
"type": "string",
},
"impact": {
"type": "string",
},
"circuit_ids": {
"type": "array",
"items": {
"type": "string",
}
}
}
More context:
* Circuit IDs are also known as service or order
* Status could be confirmed, ongoing, cancelled, completed or rescheduled
"""

def parser_hook(self, raw: bytes, content_type: str):
"""Execute parsing."""
result = []
if content_type in ["html", "text/html"]:
soup = bs4.BeautifulSoup(quopri.decodestring(raw), features="lxml")
content = soup.text
elif content_type in ["text/plain"]:
content = self.get_text_hook(raw)

for data in self.parse_content(content):
result.append(data)
return result

@staticmethod
def get_text_hook(raw: bytes) -> str:
"""Can be overwritten by subclasses."""
return raw.decode()

@staticmethod
def get_key_with_string(dictionary: dict, string: str):
"""Returns the key in the dictionary that contains the given string."""
for key in dictionary.keys():
if string in key:
return key
return None

def get_llm_response(self, content):
"""Method to retrieve the response from the LLM for some content."""
raise NotImplementedError

def _get_impact(self, generated_json: dict):
"""Method to get a general Impact for all Circuits."""
impact_key = self.get_key_with_string(generated_json, "impact")
if impact_key:
if "no impact" in generated_json[impact_key].lower():
return Impact.NO_IMPACT
if "partial" in generated_json[impact_key].lower():
return Impact.DEGRADED

return Impact.OUTAGE

def _get_circuit_ids(self, generated_json: dict, impact: Impact):
"""Method to get the Circuit IDs and use a general Impact."""
circuits = []
circuits_ids_key = self.get_key_with_string(generated_json, "circuit")
for circuit in generated_json[circuits_ids_key]:
if isinstance(circuit, str):
circuits.append(CircuitImpact(circuit_id=circuit, impact=impact))
elif isinstance(circuit, dict):
circuit_key = self.get_key_with_string(circuit, "circuit")
circuits.append(CircuitImpact(circuit_id=circuit[circuit_key], impact=impact))

return circuits

def _get_start(self, generated_json: dict):
"""Method to get the Start Time."""
return generated_json[self.get_key_with_string(generated_json, "start")]

def _get_end(self, generated_json: dict):
"""Method to get the End Time."""
return generated_json[self.get_key_with_string(generated_json, "end")]

def _get_summary(self, generated_json: dict):
"""Method to get the Summary."""
return generated_json[self.get_key_with_string(generated_json, "summary")]

def _get_status(self, generated_json: dict):
"""Method to get the Status."""
status_key = self.get_key_with_string(generated_json, "status")

if "confirmed" in generated_json[status_key].lower():
return Status.CONFIRMED
if "rescheduled" in generated_json[status_key].lower():
return Status.RE_SCHEDULED
if "cancelled" in generated_json[status_key].lower():
return Status.CANCELLED
if "ongoing" in generated_json[status_key].lower():
return Status.IN_PROCESS
if "completed" in generated_json[status_key].lower():
return Status.COMPLETED

return Status.CONFIRMED

def _get_account(self, generated_json: dict):
"""Method to get the Account."""
account = generated_json[self.get_key_with_string(generated_json, "account")]
if not account:
return "Not found"

return account

def _get_maintenance_id(self, generated_json: dict, start, end, circuits):
"""Method to get the Maintenance ID."""
maintenance_key = self.get_key_with_string(generated_json, "maintenance")
if maintenance_key and generated_json["maintenance_id"] != "N/A":
return generated_json["maintenance_id"]

maintenance_id = str(start) + str(end) + "".join(list(circuits))
return hashlib.md5(maintenance_id.encode("utf-8")).hexdigest() # nosec

def parse_content(self, content):
"""Parse content via LLM."""
generated_json = self.get_llm_response(content)
if not generated_json:
return []

impact = self._get_impact(generated_json)

data = {
"circuits": self._get_circuit_ids(generated_json, impact),
"start": int(self._get_start(generated_json)),
"end": int(self._get_end(generated_json)),
"summary": str(self._get_summary(generated_json)),
"status": self._get_status(generated_json),
"account": str(self._get_account(generated_json)),
}

data["maintenance_id"] = str(
self._get_maintenance_id(
generated_json,
data["start"],
data["end"],
data["circuits"],
)
)

return [data]
59 changes: 59 additions & 0 deletions circuit_maintenance_parser/parsers/openai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""OpenAI Parser."""
import os
import logging
import json
from typing import List, Optional

try:
from openai import OpenAI # type: ignore
except ImportError:
_HAS_OPENAI = False
else:
_HAS_OPENAI = True

from circuit_maintenance_parser.parser import LLM

logger = logging.getLogger(__name__)


class OpenAIParser(LLM):
"""Notifications Parser powered by OpenAI ChatGPT."""

def get_llm_response(self, content) -> Optional[List]:
"""Get LLM processing from OpenAI."""
if not _HAS_OPENAI:
raise ImportError("openai extra is required to use OpenAIParser.")

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
try:
response = client.chat.completions.create(
model=model,
messages=[
{ # type: ignore
"role": "system",
"content": self._llm_question,
},
{ # type: ignore
"role": "user",
"content": content,
},
],
)

# TODO: Maybe asking again about the generated response could refine it

except Exception as err: # pylint: disable=broad-exception-caught
logger.error(err)
return None

logger.info("Used OpenAI tokens: %s", response.usage)
generated_text = response.choices[0].message.content
logger.info("Response from LLM: %s", generated_text)
try:
return json.loads(generated_text) # type: ignore
except ValueError as err:
logger.error(err)
return None

return None
Loading

0 comments on commit 13dfea4

Please sign in to comment.