Batchprocessing #208

Open · wants to merge 8 commits into main
32 changes: 32 additions & 0 deletions antidox/BUILD
@@ -0,0 +1,32 @@
load("@wikidetox_requirements//:requirements.bzl", "requirement")

py_binary(
name = "perspective",
srcs = ["perspective.py"],
deps = [
requirement("python-dateutil"),
requirement("google-api-python-client"),
requirement("pandas"),
requirement("uritemplate"),
],
)

py_test(
name = "perspective_test",
srcs = ["perspective_test.py"],
deps = [":perspective"],
)

py_binary(
name = "wikiwatcher",
srcs = ["wikiwatcher.py"],
deps = [
requirement("sseclient"),
],
)

py_test(
name = "wikiwatcher_test",
srcs = ["wikiwatcher_test.py"],
deps = [":wikiwatcher"],
)
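
# Usage sketch (not part of this diff): the new targets can be exercised with
#   bazel test :wikiwatcher_test --test_output=all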
7 changes: 5 additions & 2 deletions antidox/README.md
@@ -23,10 +23,13 @@ will be tested in wikipedia chat rooms as a starting point.
bazel run :perspective --input_file=$PWD/example.csv
--api_key=$PWD/api_key.json
```

5. To run the pipeline on a distributed system, pass Beam's setup-file flag so remote workers can install the package (this assumes setup.py sits in the repository root):
```shell
--setup_file ./setup.py
```
This runs the model that tests each comment from the CSV file for toxicity and personally identifiable information.

5. Run unittest to ensure the functions contains_toxicity(), and contains_pii(), are working properly.
6. Run the unit tests to ensure that contains_toxicity() and contains_pii() are working properly.
```shell
bazel test :perspective_test --test_output=all
```
Empty file removed antidox/__init__.py
112 changes: 0 additions & 112 deletions antidox/clean.py

This file was deleted.

136 changes: 104 additions & 32 deletions antidox/perspective.py
@@ -1,22 +1,27 @@
""" inputs comments to perspective and dlp apis and detects
toxicity and personal information> has support for csv files,
bigquery tables, and wikipedia talk pages"""
#TODO(tamajongnc): configure pipeline to distribute work to multiple machines
#TODO(tamajongnc): use windowing technique to accomodate large and continuous data sets
# pylint: disable=fixme, import-error
# pylint: disable=fixme, unused-import
import argparse
import json
import sys
from google.cloud import bigquery
from googleapiclient import discovery
from googleapiclient import errors as google_api_errors
import pandas as pd
import requests
import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
import requests
import pandas as pd
from antidox import clean
from googleapiclient import errors as google_api_errors
from googleapiclient import discovery
from google.cloud import bigquery
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam import window
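
# Invocation sketch (not part of this diff; example.csv is a placeholder,
# and unrecognized flags are forwarded to the Beam pipeline options):
#
#   bazel run :perspective -- --csv_file=$PWD/example.csv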



def get_client():
""" generates API client with personalized API key """
@@ -72,10 +77,10 @@ def dlp_request(dlp, apikey_data, comment):
"name":"PASSPORT"
},
{
"name":"PERSON_NAME"
"name":"GCP_CREDENTIALS"
},
{
"name":"ALL_BASIC"
"name":"SWIFT_CODE"
}
],
"minLikelihood":"POSSIBLE",
@@ -116,6 +121,22 @@ def contains_toxicity(perspective_response):
is_toxic = True
return is_toxic

def contains_threat(perspective_response):
"""Checking/returning comments with a threat value of over 50 percent."""
is_threat = False
if (perspective_response['attributeScores']['THREAT']['summaryScore']
['value'] >= .5):
is_threat = True
return is_threat

def contains_insult(perspective_response):
"""Checking/returning comments with an insult value of over 50 percent."""
is_insult = False
if (perspective_response['attributeScores']['INSULT']['summaryScore']
['value'] >= .5):
is_insult = True
return is_insult
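
# A minimal sketch of the response shape these predicates assume; the
# nested keys follow the Perspective API's AnalyzeComment response:
#
#   sample = {'attributeScores': {'THREAT': {'summaryScore': {'value': 0.8}},
#                                 'INSULT': {'summaryScore': {'value': 0.1}}}}
#   contains_threat(sample)  # True:  0.8 >= .5
#   contains_insult(sample)  # False: 0.1 <  .5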


def get_wikipage(pagename):
""" Gets all content from a wikipedia page and turns it into plain text. """
@@ -126,11 +147,14 @@
text_response = response['query']['pages'][0]['revisions'][0]['content']
return text_response


def wiki_clean(get_wikipage):
"""cleans the comments from wikipedia pages"""
text = clean.content_clean(get_wikipage)
print (text)
print(text)
return text
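
# Hypothetical usage of the Wikipedia path (page title is a placeholder):
#
#   raw = get_wikipage('Talk:Example')
#   cleaned = wiki_clean(raw)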


def use_query(content, sql_query, big_q):
"""make big query api request"""
query_job = big_q.query(sql_query)
@@ -140,8 +164,8 @@ def use_query(content, sql_query, big_q):
strlst.append(row[content])
return strlst
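
# Hypothetical usage (not part of this diff; project, dataset, and column
# names are placeholders):
#
#   big_q = bigquery.Client(project='my-project')
#   rows = use_query('comment_text',
#                    'SELECT comment_text FROM `my_dataset.my_table`', big_q)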


# pylint: disable=fixme, too-many-locals
# pylint: disable=fixme, too-many-statements
def main(argv):
""" runs dlp and perspective on content passed in """
parser = argparse.ArgumentParser(description='Run Perspective and DLP on input comments.')
@@ -152,62 +176,110 @@ def main(argv):
parser.add_argument('--csv_file', help='choose CSV file to process')
parser.add_argument('--wiki_pagename', help='insert the talk page name')
parser.add_argument('--content', help='specify a column in dataset to retrieve data from')
parser.add_argument('--output', help='path for output file')
parser.add_argument('--suffix', help='output file suffix')
parser.add_argument('--project', help='project id for bigquery table')
args = parser.parse_args(argv)
parser.add_argument('--output', help='path for output file in cloud bucket')
parser.add_argument('--nd_output', help='gcs path to store ndjson results')
parser.add_argument('--project', help='project id for bigquery table',
default='wikidetox-viz')
parser.add_argument('--gproject', help='gcp project id')
parser.add_argument('--temp_location',
help='cloud storage path for temp files; must begin with gs://')
args, pipe_args = parser.parse_known_args(argv)
apikey_data, perspective, dlp = get_client()
with beam.Pipeline(options=PipelineOptions()) as pipeline:
options = PipelineOptions(pipe_args)
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.project = 'google.com:new-project-242016'
gcloud_options.staging_location = 'gs://tj_cloud_bucket/stage'
gcloud_options.temp_location = 'gs://tj_cloud_bucket/tmp'
options.view_as(StandardOptions).runner = 'dataflow'
options.view_as(WorkerOptions).num_workers = 100
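# Pickling the main session makes module-level imports and globals
# available on remote workers (needed by the DoFns defined below).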
options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=options) as pipeline:
if args.wiki_pagename:
wiki_response = get_wikipage(args.wiki_pagename)
wikitext = wiki_clean(wiki_response)
text = wikitext.split("\n")
comments = pipeline | beam.Create(text)
if args.csv_file:
comments = pipeline | 'ReadMyFile' >> beam.io.ReadFromText(args.csv_file)
# beam.io.ReadFromText expects a file path or pattern, not the DataFrame
# pd.read_csv returns, so pass the path straight through.
comments = pipeline | 'ReadMyFile' >> beam.io.ReadFromText(args.csv_file)
if args.sql_query:
comments = (
pipeline
| 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(
query=args.sql_query,
use_standard_sql=True))
use_standard_sql=True)) \
| beam.Map(lambda elem: elem[args.content]))

# pylint: disable=fixme, too-few-public-methods
class NDjson(beam.DoFn):
"""class for NDJson"""

# pylint: disable=fixme, no-self-use
# pylint: disable=fixme, inconsistent-return-statements
def process(self, element):
"""Takes toxicity and dlp results and converst them to NDjson"""
try:
dlp_response = dlp_request(dlp, apikey_data, element)
perspective_response = perspective_request(perspective, element)
if contains_toxicity(perspective_response):
data = {'comment': element,
'Toxicity': str(perspective_response['attributeScores']
['TOXICITY']['summaryScore']['value'])}
return [json.dumps(data) + '\n']
has_pii_bool, pii_type = contains_pii(dlp_response)
if has_pii_bool:
data = {'comment': element,
'pii_detected': str(pii_type)}
return [json.dumps(data) + '\n']
except google_api_errors.HttpError as err:
print('error', err)
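
# Each element NDjson emits is a newline-terminated JSON record, e.g.
# (values illustrative):
#   {"comment": "some comment text", "Toxicity": "0.91"}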

# pylint: disable=fixme, too-few-public-methods
class GetToxicity(beam.DoFn):

"""The DoFn to perform on each element in the input PCollection"""

# pylint: disable=fixme, no-self-use
# pylint: disable=fixme, inconsistent-return-statements
def process(self, element):
"""Runs every element of collection through perspective and dlp"""
print(element)

print(repr(element))
print('==============================================\n')
if not element:
return None
try:
dlp_response = dlp_request(dlp, apikey_data, element)
has_pii_bool, pii_type = contains_pii(dlp_response)
perspective_response = perspective_request(perspective, element)
has_pii_bool, pii_type = contains_pii(dlp_response)
if has_pii_bool:
pii = [element+"\n"+'contains pii?'+"Yes"+"\n"+str(pii_type)+"\n" \
+"==============================================="+"\n"]
pii = json.dumps({"comment_text": element, "contains_pii": True,
"pii_type": pii_type}) + "\n"
return [pii]
if contains_toxicity(perspective_response):
tox = [element+"\n" +"contains TOXICITY?:"+"Yes"
+"\n"+str(perspective_response['attributeScores']
['TOXICITY']['summaryScore']['value'])+"\n"
+"=========================================="+"\n"]
tox = json.dumps({"comment_text": element, "contains_toxicity": True,
"summaryScore": perspective_response['attributeScores']
['TOXICITY']['summaryScore']['value']}) + "\n"
return [tox]
if contains_threat(perspective_response):
threat = json.dumps({"comment_text": element, "contains_threat": True,
"summaryScore": perspective_response['attributeScores']
['THREAT']['summaryScore']['value']}) + "\n"
return [threat]
if contains_insult(perspective_response):
insult = json.dumps({"comment_text": element, "contains_insult": True,
"summaryScore": perspective_response['attributeScores']
['INSULT']['summaryScore']['value']}) + "\n"
return [insult]
except google_api_errors.HttpError as err:
print('error', err)
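# Beam treats the value returned by DoFn.process as an iterable of output
# elements; a bare string would be emitted one character at a time, which
# is why each branch above returns a one-element list.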
results = comments\
results = comments \
| beam.ParDo(GetToxicity())
json_results = comments \
| beam.ParDo(NDjson())
# pylint: disable=fixme, expression-not-assigned
results | 'WriteToText' >> beam.io.WriteToText(
args.output, \
file_name_suffix=args.suffix)
'gs://tj_cloud_bucket/beam.txt')
json_results | 'WriteToText2' >> beam.io.WriteToText(
'gs://tj_cloud_bucket/results.json')
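# WriteToText shards its output by default, so results land as e.g.
# gs://tj_cloud_bucket/results.json-00000-of-00001 rather than one object.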
if __name__ == '__main__':
main(sys.argv[1:])

