Source code for scrubadub.post_processors.filth_replacer


import os
import math
import hashlib

from typing import Sequence, Optional, Union, Dict
from collections import defaultdict

from scrubadub.filth import Filth, MergedFilth, TaggedEvaluationFilth
from scrubadub.post_processors.base import PostProcessor
from scrubadub.post_processors.catalogue import register_post_processor
from scrubadub import utils


[docs]class FilthReplacer(PostProcessor): """Creates tokens that are used to replace the Filth found in the text of a document. This can be configured to include the filth type (eg phone, name, email, ...), a unique number for each piece of Filth, and a hash of the Filth. >>> import scrubadub >>> scrubber = scrubadub.Scrubber(post_processor_list=[ ... scrubadub.post_processors.FilthReplacer(), ... ]) >>> scrubber.clean("Contact me at 522-368-8530 or hernandezjenna@example.com") 'Contact me at PHONE or EMAIL' >>> scrubber = scrubadub.Scrubber(post_processor_list=[ ... scrubadub.post_processors.FilthReplacer(include_hash=True, hash_salt='example', hash_length=8), ... ]) >>> scrubber.clean("Contact me at 522-368-8530 or hernandezjenna@example.com") 'Contact me at PHONE-7358BF44 or EMAIL-AC0B8AC3' >>> scrubber = scrubadub.Scrubber(post_processor_list=[ ... scrubadub.post_processors.FilthReplacer(include_count=True), ... ]) >>> scrubber.clean("Contact me at taylordaniel@example.com or hernandezjenna@example.com, " ... "but taylordaniel@example.com is probably better.") 'Contact me at EMAIL-0 or EMAIL-1, but EMAIL-0 is probably better.' """ name = 'filth_replacer' # type: str autoload = False index = 0 # NOTE: this is not an efficient way to store this in memory. could # alternatively hash the type and text and do away with the overhead # bits of storing the tuple in the lookup typed_lookup = defaultdict(lambda: utils.Lookup(), {}) # type: Dict[str, utils.Lookup]
[docs] def __init__(self, include_type: bool = True, include_count: bool = False, include_hash: bool = False, uppercase: bool = True, separator: Optional[str] = None, hash_length: Optional[int] = None, hash_salt: Optional[Union[str, bytes]] = None, **kwargs): """Initialise the FilthReplacer. :param include_type: :type include_type: bool, default True :param include_count: :type include_count: bool, default False :param include_hash: :type include_hash: bool, default False :param uppercase: Make the label uppercase :type uppercase: bool, default True :param separator: Used to separate labels if a merged filth is being replaced :type separator: Optional[str], default None :param hash_length: The length of the hexadecimal hash :type hash_length: Optional[int], default None :param hash_salt: The salt used in the hashing process :type hash_salt: Optional[Union[str, bytes]], default None """ super(FilthReplacer, self).__init__(**kwargs) self.include_type = include_type self.include_count = include_count self.include_hash = include_hash self.uppercase = uppercase self.separator = separator or '+' self.hash_length = hash_length or 16 if isinstance(hash_salt, str): self.hash_salt = hash_salt.encode('utf8') # type: bytes else: self.hash_salt = os.urandom(128)
[docs] @classmethod def reset_lookup(cls): """Reset the lookups that maintain a map of filth to a numeric ID.""" cls.typed_lookup = defaultdict(lambda: utils.Lookup(), {})
[docs] def filth_label(self, filth: Filth) -> str: """This function takes a filth and creates a label that can be used to replace the original text. :param filth: Limit the named entities to those in this list, defaults to ``{'PERSON', 'PER', 'ORG'}`` :type filth: Filth :return: The replacement label that should be used for this `Filth`. :rtype: str """ filths = [filth] if isinstance(filth, MergedFilth): filths = filth.filths replacements = set() for f in filths: replacement_pieces = [] if self.include_type: filth_type = getattr(f, 'type', None) if filth_type is None: continue if filth_type == TaggedEvaluationFilth.type: filth_comparison_type = getattr(f, 'comparison_type', None) if filth_comparison_type is not None: filth_type += '_' + filth_comparison_type filth_type = filth_type.replace(' ', '_') replacement_pieces.append(filth_type) if self.include_count: replacement_pieces.append(str(FilthReplacer.typed_lookup[filth_type][f.text.lower()])) if self.include_hash: replacement_pieces.append(FilthReplacer.get_hash(f.text.lower(), self.hash_salt, self.hash_length)) if len(replacement_pieces) == 0: replacement_pieces = ['filth'] replacements.add('-'.join(replacement_pieces)) label = self.separator.join(sorted(replacements)) if self.uppercase: label = label.upper() return label
[docs] @staticmethod def get_hash(text: str, salt: bytes, length: int) -> str: """Get a hash of some text, that has been salted and truncated. :param text: The text to be hashed :type text: str :param salt: The salt that should be used in this hashing :type salt: bytes :param length: The number of characters long that the hexadecimal hash should be :type length: int :return: The hash of the text :rtype: str """ return hashlib.pbkdf2_hmac( hash_name='sha256', password=text.encode('utf8'), salt=salt, iterations=100000, dklen=math.ceil(length / 2), ).hex()[:length]
[docs] def process_filth(self, filth_list: Sequence[Filth]) -> Sequence[Filth]: """Processes the filth to replace the original text :param filth_list: The text to be hashed :type filth_list: Sequence[Filth] :return: The processed filths :rtype: Sequence[Filth] """ for filth_item in filth_list: filth_item.replacement_string = self.filth_label(filth=filth_item) return filth_list
register_post_processor(FilthReplacer) __all__ = ['FilthReplacer']