diff --git a/antidox/BUILD b/antidox/BUILD
new file mode 100644
index 00000000..1e541d99
--- /dev/null
+++ b/antidox/BUILD
@@ -0,0 +1,32 @@
+load("@wikidetox_requirements//:requirements.bzl", "requirement")
+
+py_binary(
+    name = "perspective",
+    srcs = ["perspective.py"],
+    deps = [
+        requirement("python-dateutil"),
+        requirement("google-api-python-client"),
+        requirement("pandas"),
+        requirement("uritemplate"),
+    ],
+)
+
+py_test(
+    name = "perspective_test",
+    srcs = ["perspective_test.py"],
+    deps = [":perspective"],
+)
+
+py_binary(
+    name = "wikiwatcher",
+    srcs = ["wikiwatcher.py"],
+    deps = [
+        requirement("sseclient"),
+    ],
+)
+
+py_test(
+    name = "wikiwatcher_test",
+    srcs = ["wikiwatcher_test.py"],
+    deps = [":wikiwatcher"],
+)
diff --git a/antidox/README.md b/antidox/README.md
index d113c8fb..edb40c3b 100644
--- a/antidox/README.md
+++ b/antidox/README.md
@@ -23,10 +23,13 @@ will be tested in wikipedia chat rooms as a staring point.
    bazel run :perspective --input_file=$PWD/example.csv --api_key=$PWD/api_key.json
    ```
 
-
+5. To run the pipeline on a distributed system, pass the setup file so remote workers can install the package dependencies:
+   ```shell
+   --setup_file ./setup.py
+   ```
 Run the given model that test the comment from the csv file for toxicity and personally identifiable information.
-5. Run unittest to ensure the functions contains_toxicity(), and contains_pii(), are working properly.
+6. Run unittest to ensure the functions contains_toxicity(), and contains_pii(), are working properly.
 ```shell
 bazel test :perspective_test --test_output=all
 ```
diff --git a/antidox/__init__.py b/antidox/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/antidox/clean.py b/antidox/clean.py
deleted file mode 100755
index 65587b18..00000000
--- a/antidox/clean.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# -*- coding: utf-8 -*-
-r"""HTML cleaning utilties.
-
-Copyright 2017 Google Inc.
-
-Licensed under the Apache License, Version 2.0 (the "License"); you may not
-use this file except in compliance with the License.
-
-You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
--------------------------------------------------------------------------------
-"""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import re
-
-from bs4 import BeautifulSoup
-import mwparserfromhell
-
-
-months = [
-    'January',
-    'February',
-    'March',
-    'April',
-    'May',
-    'June',
-    'July',
-    'August',
-    'September',
-    'October',
-    'November',
-    'December',
-    'Jan',
-    'Feb',
-    'Mar',
-    'Apr',
-    'May',
-    'Jun',
-    'Jul',
-    'Aug',
-    'SJep',
-    'Oct',
-    'Nov',
-    'Dec',
-]
-
-month_or = '|'.join(months)
-date_p = re.compile(r'\d\d:\d\d,( \d?\d)? (%s)( \d?\d)?,? \d\d\d\d (\(UTC\))?' %
-                    month_or)
-
-
-def remove_date(comment):
-  return re.sub(date_p, lambda x: '', comment)
-
-
-pre_sub_patterns = [(r'\[\[Image:.*?\]\]', ''),
-                    (r'', '[BLOCKING_ACTION]'),
-                    (r'\[\[File:.*?\]\]', ''), (r'\[\[User:.*?\]\]', ''),
-                    (r'\[\[user:.*?\]\]', ''),
-                    (r'\(?\[\[User talk:.*?\]\]\)?', ''),
-                    (r'\(?\[\[user talk:.*?\]\]\)?', ''),
-                    (r'\(?\[\[User Talk:.*?\]\]\)?', ''),
-                    (r'\(?\[\[User_talk:.*?\]\]\)?', ''),
-                    (r'\(?\[\[user_talk:.*?\]\]\)?', ''),
-                    (r'\(?\[\[User_Talk:.*?\]\]\)?', ''),
-                    (r'\(?\[\[Special:Contributions.*?\]\]\)?', '')]
-
-post_sub_patterns = [(r'--', ''), (' :', ' '),
-                     (r'—Preceding .* comment added by \u2022', '')]
-
-
-def substitute_patterns(s, sub_patterns):
-  for p, r in sub_patterns:
-    s = re.sub(p, r, s, flags=re.UNICODE)
-  return s
-
-
-def strip_html(s):
-  try:
-    s = BeautifulSoup(s, 'html.parser').get_text()
-  except:  # pylint: disable=bare-except
-    pass
-  return s
-
-
-def strip_mw(s):
-  try:
-    parsed = mwparserfromhell.parse(s, skip_style_tags=True).strip_code()
-  except:  # pylint: disable=bare-except
-    return s
-  return parsed
-
-
-def content_clean(rev):
-  ret = remove_date(rev)
-  ret = substitute_patterns(ret, pre_sub_patterns)
-  ret = strip_mw(ret)
-  ret = strip_html(ret)
-  ret = substitute_patterns(ret, post_sub_patterns)
-  return ret
diff --git a/antidox/perspective.py b/antidox/perspective.py
index f4f32562..919672ab 100644
--- a/antidox/perspective.py
+++ b/antidox/perspective.py
@@ -1,22 +1,27 @@
 """ inputs comments to perspective and dlp apis and detects toxicity and personal
 information> has support for csv files, bigquery tables, and wikipedia talk pages"""
-#TODO(tamajongnc): configure pipeline to distribute work to multiple machines
-#TODO(tamajongnc): use windowing technique to accomodate large and continuous data sets
 # pylint: disable=fixme, import-error
 # pylint: disable=fixme, unused-import
 import argparse
 import json
 import sys
+from google.cloud import bigquery
+from googleapiclient import discovery
+from googleapiclient import errors as google_api_errors
+import pandas as pd
+import requests
 import apache_beam as beam
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.pipeline_options import PipelineOptions
-import requests
-import pandas as pd
-from antidox import clean
-from googleapiclient import errors as google_api_errors
-from googleapiclient import discovery
-from google.cloud import bigquery
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.options.pipeline_options import WorkerOptions
+from apache_beam import window
+import clean
+
+
 def get_client():
   """ generates API client with personalized API key """
@@ -72,10 +77,10 @@ def dlp_request(dlp, apikey_data, comment):
           "name":"PASSPORT"
        },
        {
-          "name":"PERSON_NAME"
+          "name":"GCP_CREDENTIALS"
        },
        {
-          "name":"ALL_BASIC"
+          "name":"SWIFT_CODE"
        }
     ],
     "minLikelihood":"POSSIBLE",
@@ -116,6 +121,22 @@ def contains_toxicity(perspective_response):
     is_toxic = True
   return is_toxic
 
+def contains_threat(perspective_response):
+  """Checking/returning comments with a threat value of over 50 percent."""
+  is_threat = False
+  if (perspective_response['attributeScores']['THREAT']['summaryScore']
+      ['value'] >= .5):
+    is_threat = True
+  return is_threat
+
+def contains_insult(perspective_response):
+  """Checking/returning comments with an insult value of over 50 percent."""
+  is_insult = False
+  if (perspective_response['attributeScores']['INSULT']['summaryScore']
+      ['value'] >= .5):
+    is_insult = True
+  return is_insult
+
 def get_wikipage(pagename):
   """ Gets all content from a wikipedia page and turns it into plain text. """
@@ -126,11 +147,14 @@ def get_wikipage(pagename):
   text_response = response['query']['pages'][0]['revisions'][0]['content']
   return text_response
 
+
 def wiki_clean(get_wikipage):
+  """cleans the comments from wikipedia pages"""
   text = clean.content_clean(get_wikipage)
-  print (text)
+  print(text)
   return text
 
+
 def use_query(content, sql_query, big_q):
   """make big query api request"""
   query_job = big_q.query(sql_query)
@@ -140,8 +164,8 @@ def use_query(content, sql_query, big_q):
     strlst.append(row[content])
   return strlst
 
-
 # pylint: disable=fixme, too-many-locals
+# pylint: disable=fixme, too-many-statements
 def main(argv):
   """ runs dlp and perspective on content passed in """
   parser = argparse.ArgumentParser(description='Process some integers.')
@@ -152,36 +176,74 @@ def main(argv):
   parser.add_argument('--csv_file', help='choose CSV file to process')
   parser.add_argument('--wiki_pagename', help='insert the talk page name')
   parser.add_argument('--content', help='specify a column in dataset to retreive data from')
-  parser.add_argument('--output', help='path for output file')
-  parser.add_argument('--suffix', help='output file suffix')
-  parser.add_argument('--project', help='project id for bigquery table')
-  args = parser.parse_args(argv)
+  parser.add_argument('--output', help='path for output file in cloud bucket')
+  parser.add_argument('--nd_output', help='gcs path to store ndjson results')
+  parser.add_argument('--project', help='project id for bigquery table', \
+                      default='wikidetox-viz')
+  parser.add_argument('--gproject', help='gcp project id')
+  parser.add_argument('--temp_location', help='cloud storage path for temp files \
+                      must begin with gs://')
+  args, pipe_args = parser.parse_known_args(argv)
   apikey_data, perspective, dlp = get_client()
-  with beam.Pipeline(options=PipelineOptions()) as pipeline:
+  options = PipelineOptions(pipe_args)
+  gcloud_options = options.view_as(GoogleCloudOptions)
+  gcloud_options.project = 'google.com:new-project-242016'
+  gcloud_options.staging_location = 'gs://tj_cloud_bucket/stage'
+  gcloud_options.temp_location = 'gs://tj_cloud_bucket/tmp'
+  options.view_as(StandardOptions).runner = 'dataflow'
+  options.view_as(WorkerOptions).num_workers = 100
+  options.view_as(SetupOptions).save_main_session = True
+  with beam.Pipeline(options=options) as pipeline:
     if args.wiki_pagename:
       wiki_response = get_wikipage(args.wiki_pagename)
       wikitext = wiki_clean(wiki_response)
      text = wikitext.split("\n")
       comments = pipeline | beam.Create(text)
     if args.csv_file:
-      comments = pipeline | 'ReadMyFile' >> beam.io.ReadFromText(args.csv_file)
+      comments = pipeline | 'ReadMyFile' >> beam.io.ReadFromText(pd.read_csv(args.csv_file))
     if args.sql_query:
       comments = (
          pipeline
          | 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(
              query=args.sql_query,
-             use_standard_sql=True))
+             use_standard_sql=True)) \
          | beam.Map(lambda elem: elem[args.content]))
+    # pylint: disable=fixme, too-few-public-methods
+    class NDjson(beam.DoFn):
+      """Converts Perspective and DLP results to newline-delimited JSON records."""
+
+      # pylint: disable=fixme, no-self-use
+      # pylint: disable=fixme, inconsistent-return-statements
+      def process(self, element):
+        """Takes toxicity and dlp results and converts them to NDjson"""
+        try:
+          dlp_response = dlp_request(dlp, apikey_data, element)
+          perspective_response = perspective_request(perspective, element)
+          if contains_toxicity(perspective_response):
+            data = {'comment': element,
+                    'Toxicity': str(perspective_response['attributeScores']
+                                    ['TOXICITY']['summaryScore']['value'])}
+            return [json.dumps(data) + '\n']
+          has_pii_bool, pii_type = contains_pii(dlp_response)
+          if has_pii_bool:
+            data = {'comment': element, \
+                    'pii_detected': str(pii_type) \
+                   }
+            return [json.dumps(data) + '\n']
+        except google_api_errors.HttpError as err:
+          print('error', err)
+
     # pylint: disable=fixme, too-few-public-methods
     class GetToxicity(beam.DoFn):
-      """The DoFn to perform on each element in the input PCollection"""
+
       # pylint: disable=fixme, no-self-use
      # pylint: disable=fixme, inconsistent-return-statements
      def process(self, element):
        """Runs every element of collection through perspective and dlp"""
-        print(element)
+
+        print(repr(element))
        print('==============================================\n')
        if not element:
          return None
@@ -189,25 +251,35 @@ def process(self, element):
          dlp_response = dlp_request(dlp, apikey_data, element)
          has_pii_bool, pii_type = contains_pii(dlp_response)
          perspective_response = perspective_request(perspective, element)
+          has_pii_bool, pii_type = contains_pii(dlp_response)
          if has_pii_bool:
-            pii = [element+"\n"+'contains pii?'+"Yes"+"\n"+str(pii_type)+"\n" \
-                  +"==============================================="+"\n"]
+            pii = (json.dumps({"comment_text":element, "contains_pii": True, "pii_type":pii_type})+"\n")
            return pii
          if contains_toxicity(perspective_response):
-            tox = [element+"\n" +"contains TOXICITY?:"+"Yes"
-                  +"\n"+str(perspective_response['attributeScores']
-                            ['TOXICITY']['summaryScore']['value'])+"\n"
-                  +"=========================================="+"\n"]
+            tox = (json.dumps({"comment_text":element, "contains_toxicity": True,
+                               "summaryScore":perspective_response['attributeScores']
+                               ['TOXICITY']['summaryScore']['value']})+"\n")
            return tox
+          if contains_threat(perspective_response):
+            threat = (json.dumps({"comment_text":element, "contains_threat": True,
+                                  "summaryScore":perspective_response['attributeScores']
+                                  ['THREAT']['summaryScore']['value']})+"\n")
+            return threat
+          if contains_insult(perspective_response):
+            insult = (json.dumps({"comment_text":element, "contains_insult": True,
+                                  "summaryScore":perspective_response['attributeScores']
+                                  ['INSULT']['summaryScore']['value']})+"\n")
+            return insult
        except google_api_errors.HttpError as err:
          print('error', err)
-    results = comments\
+    results = comments \
      | beam.ParDo(GetToxicity())
+    json_results = comments \
+      | beam.ParDo(NDjson())
    # pylint: disable=fixme, expression-not-assigned
    results | 'WriteToText' >> beam.io.WriteToText(
-        args.output, \
-        file_name_suffix=args.suffix)
+        'gs://tj_cloud_bucket/beam.txt')
+    json_results | 'WriteToText2' >> beam.io.WriteToText(
+        'gs://tj_cloud_bucket/results.json')
 
 if __name__ == '__main__':
   main(sys.argv[1:])
-
-
diff --git a/antidox/perspective_test.py b/antidox/perspective_test.py
index 37926acb..d8e4961e 100644
--- a/antidox/perspective_test.py
+++ b/antidox/perspective_test.py
@@ -188,8 +188,20 @@ def test_contains_toxicity_false(self):
         "en"
       ]
     }
-    is_toxic = perspective.contains_toxicity(perspective_response)
+    is_toxic = perspective.contains_insult(perspective_response)
     self.assertFalse(is_toxic)
+
+  def test_contains_toxicity_true(self):
+    perspective_response = \
+    {'attributeScores': {'TOXICITY': {'spanScores': [{'begin': 0, 'end': 25, 'score': {'value': 0.9312127, 'type': 'PROBABILITY'}}], 'summaryScore': {'value': 0.9312127, 'type': 'PROBABILITY'}}, 'THREAT': {'spanScores': [{'begin': 0, 'end': 25, 'score': {'value': 0.15875438, 'type': 'PROBABILITY'}}], 'summaryScore': {'value': 0.15875438, 'type': 'PROBABILITY'}}, 'INSULT': {'spanScores': [{'begin': 0, 'end': 25, 'score': {'value': 0.93682694, 'type': 'PROBABILITY'}}], 'summaryScore': {'value': 0.93682694, 'type': 'PROBABILITY'}}}, 'languages': ['en'], 'detectedLanguages': ['en']}
+    is_toxic = perspective.contains_toxicity(perspective_response)
+    self.assertTrue(is_toxic)
+
+  def test_contains_threat(self):
+    perspective_response = \
+    {'attributeScores': {'INSULT': {'spanScores': [{'begin': 0, 'end': 21, 'score': {'value': 0.55873775, 'type': 'PROBABILITY'}}], 'summaryScore': {'value': 0.55873775, 'type': 'PROBABILITY'}}, 'TOXICITY': {'spanScores': [{'begin': 0, 'end': 21, 'score': {'value': 0.9759337, 'type': 'PROBABILITY'}}], 'summaryScore': {'value': 0.9759337, 'type': 'PROBABILITY'}}, 'THREAT': {'spanScores': [{'begin': 0, 'end': 21, 'score': {'value': 0.9980843, 'type': 'PROBABILITY'}}], 'summaryScore': {'value': 0.9980843, 'type': 'PROBABILITY'}}}, 'languages': ['en'], 'detectedLanguages': ['en']}
+    is_threat = perspective.contains_threat(perspective_response)
+    self.assertTrue(is_threat)
 
   def test_get_wikipage(self):
     wiki_response = \
 u"""{{talkheader|wp=yes|WT:NYC|WT:WPNYC}}
diff --git a/antidox/wikiwatcher.py b/antidox/wikiwatcher.py
index 9993178a..b108dc19 100644
--- a/antidox/wikiwatcher.py
+++ b/antidox/wikiwatcher.py
@@ -7,6 +7,7 @@
 import json
 import pprint
 import argparse
+import pywikibot
 import requests
 import sseclient
 from googleapiclient import errors as google_api_errors
@@ -28,17 +29,15 @@ def log_event(apikey_data, toxicity, dlp, change):
   # print('\n########## change:')
   from_id = (str(change['revision']['old']))
   to_id = (str(change['revision']['new']))
-  page = ("https://en.wikipedia.org/w/api.php?action=compare&fromrev="
-          + from_id + "&torev=" + to_id + "&format=json")
+  page = ('https://en.wikipedia.org/w/api.php?action=compare&fromrev='
+          + from_id + '&torev=' + to_id + '&format=json')
   get_page = requests.get(page)
-  response = json.loads(get_page.content)
+  response = json.loads(get_page.content.decode('utf-8'))
   revision = response['compare']['*']
   text = clean.content_clean(revision)
   # for line in text:
-  pii_results = open("pii_results.txt", "a+")
-  toxicity_results = open("toxicity_results.txt", "a+")
   print(text)
   if not text:
     return
@@ -47,24 +46,53 @@ def log_event(apikey_data, toxicity, dlp, change):
     perspective_response = perspective.perspective_request(toxicity, text)
   # Perspective can't handle language errors at this time
   except google_api_errors.HttpError as err:
-    print("Error:", err)
+    print('Error:', err)
     return
   has_pii_bool, pii_type = perspective.contains_pii(dlp_response)
   if has_pii_bool:
-    pii_results.write(u'user:{user} namespace:{namespace} bot:{bot} comment:{comment}'+
-                      'title:{title}'.format(**change)+"\n"+str(text)+"\n"+'contains pii?'
-                      +"Yes"+"\n"
-                      +str(pii_type)+"\n"
-                      +"==============================================="+"\n")
+    header = '==Possible Doxxing Detected: Waiting for review=='
+    result = (
+        u'{' + 'user:{user}, namespace:{namespace}, bot:{bot}, comment:{comment}' +
+        'title:{title},'.format(**change) + ', ' + 'comment_text:' + str(text)
+        + ', ' + 'contains_pii:' + 'True' + ', ' + 'pii_type:' + str(pii_type)
+        + ', ' + '}' + '\n')
+    wiki_write(result, header)
+
   if perspective.contains_toxicity(perspective_response):
-    toxicity_results.write(u'user:{user} namespace:{namespace} bot:{bot} comment:{comment}'+
-                           'title:{title}'.format(**change)+"\n"+str(text)+"\n"
-                           +"contains TOXICITY?:"+"Yes"+"\n"+
-                           str(perspective_response['attributeScores']
-                               ['TOXICITY']['summaryScore']['value'])+"\n"
-                           +"=========================================="+"\n")
-    toxicity_results.close()
-    pii_results.close()
+    header = '==Possibly Toxic Detected: Waiting for review=='
+    result = (
+        u'{' + 'user:{user}, namespace:{namespace}, bot:{bot}, comment:{comment}' +
+        'title:{title}'.format(**change) + ', ' + 'comment_text:' + str(text)
+        + ', ' + 'contains_toxicity:' + 'True' + ', ' + 'toxic_score:'
+        + str(perspective_response['attributeScores']
+              ['TOXICITY']['summaryScore']['value']) + ', ' + '}' + '\n')
+    wiki_write(result, header)
+
+
+def wiki_write(result, header):
+  site = pywikibot.Site()
+  repo = site.data_repository()
+  page = pywikibot.Page(site, u'User_talk:DoxDetective')
+
+  heading = (header)
+  content = (result)
+  message = '\n\n{}\n{} --~~~~'.format(heading, content)
+  page.save(
+      summary='Testing',
+      watch=None,
+      minor=False,
+      botflag=True,
+      force=False,
+      asynchronous=False,
+      callback=None,
+      apply_cosmetic_changes=None,
+      appendtext=message)
 
 
 def watcher(event_source, wiki_filter, namespaces_filter, callback):
@@ -81,7 +109,7 @@ def watcher(event_source, wiki_filter, namespaces_filter, callback):
     try:
       change = json.loads(event.data)
     except json.decoder.JSONDecodeError as err:
-      print("Error:", err)
+      print('Error:', err)
       pprint.pprint(event.data)
       continue
     if change['bot']:
@@ -90,9 +118,9 @@
       continue
     if change['namespace'] not in namespaces_filter:
       continue
-    if "revision" not in change:
+    if 'revision' not in change:
       continue
-    if "old" not in change['revision']:
+    if 'old' not in change['revision']:
       continue
     callback(change)
 
@@ -104,7 +132,7 @@ def watcher(event_source, wiki_filter, namespaces_filter, callback):
   parser.add_argument(
       '--namespaces',
       default='1,3',
-      help='Namespaces defined in http://phabricator.wikimedia.'+
+      help='Namespaces defined in http://phabricator.wikimedia.' +
       'org/source/mediawiki/browse/master/includes/Defines.php separated by commas.'
   )
   parser.add_argument(
@@ -117,6 +145,8 @@ def watcher(event_source, wiki_filter, namespaces_filter, callback):
   client = sseclient.SSEClient(args.url)
   apikey_data, toxicity, dlp = perspective.get_client()
 
+
   def log_change(change):
     return log_event(apikey_data, toxicity, dlp, change)
+
   watcher(client, args.wiki_filter, namespaces, log_change)
diff --git a/antidox/wikiwatcher_test.py b/antidox/wikiwatcher_test.py
index fba4f51f..57adf050 100644
--- a/antidox/wikiwatcher_test.py
+++ b/antidox/wikiwatcher_test.py
@@ -7,7 +7,7 @@
 import sys
 import unittest
 
-from antidox import wikiwatcher
+import wikiwatcher
 
 if sys.version_info >= (3, 3):
   from unittest import mock  # pylint: disable=g-import-not-at-top,g-importing-member
@@ -26,10 +26,10 @@ class WikiWatcherTest(unittest.TestCase):
 
   def test_wikiwatcher(self):
     events = [
-        FakeEvent('{"wiki":"enwiki","bot":false,"title":"Yep!","namespace":1,"revision":{"old":1, "new":2}}'),
-        FakeEvent('{"wiki":"frwiki","bot":false,"title":"Non!","namespace":0,"revision":{"old":1, "new":2}}'),
-        FakeEvent('{"wiki":"enwiki","bot":true,"title":"Nope","namespace":0,"revision":{"old":1, "new":2}}'),
-        FakeEvent('{"wiki":"enwiki","bot":true,"title":"Nope","namespace":1,"revision":{"old":1, "new":2}}')
+        FakeEvent('{"wiki":"enwiki","bot":false,"title":"Yep!","namespace":1}'),
+        FakeEvent('{"wiki":"frwiki","bot":false,"title":"Non!","namespace":0}'),
+        FakeEvent('{"wiki":"enwiki","bot":true,"title":"Nope","namespace":0}'),
+        FakeEvent('{"wiki":"enwiki","bot":true,"title":"Nope","namespace":1}')
     ]
     callback = mock.Mock()
     wikiwatcher.watcher(events, 'enwiki', set([1, 3]), callback)
@@ -37,8 +37,7 @@ def test_wikiwatcher(self):
         u'wiki': u'enwiki',
         u'namespace': 1,
         u'bot': False,
-        u'title': u'Yep!',
-        u'revision': {u'old':1, u'new':2},
+        u'title': u'Yep!'
     })