Source code for scrubadub.scrubbers

import warnings
from typing import Optional, Sequence, Generator, Dict, Type, Union, List

from . import detectors
from . import post_processors
from .detectors import Detector
from .post_processors import PostProcessor
from .filth import Filth


class Scrubber:
    """Clean personal information out of dirty dirty text.

    A ``Scrubber`` manages a set of ``Detector`` objects, each responsible for identifying ``Filth`` in
    text, and an ordered list of ``PostProcessor`` objects that alter the found ``Filth``, for example
    to replace it with a hash or a token.
    """

    def __init__(self, detector_list: Optional[Sequence[Union[Type[Detector], Detector, str]]] = None,
                 post_processor_list: Optional[Sequence[Union[Type[PostProcessor], PostProcessor, str]]] = None,
                 locale: Optional[str] = None):
        """Create a ``Scrubber`` object.

        :param detector_list: The list of detectors to use in this scrubber. When ``None``, every
            detector in the detector catalogue that is flagged ``autoload`` and that supports ``locale``
            is added.
        :type detector_list: Optional[Sequence[Union[Type[Detector], Detector, str]]]
        :param post_processor_list: The list of post-processors to use in this scrubber. When ``None``,
            every post-processor in the post-processor catalogue that is flagged ``autoload`` is added,
            ordered by its ``index`` attribute.
        :type post_processor_list: Optional[Sequence[Union[Type[PostProcessor], PostProcessor, str]]]
        :param locale: The locale of the documents in the format: 2 letter lower-case language code
            followed by an underscore and the two letter upper-case country code, eg "en_GB" or "de_CH".
            Defaults to "en_US".
        :type locale: str, optional
        """
        super().__init__()

        # Detectors are stored by name, post-processors are stored in the order they should run.
        self._detectors = {}  # type: Dict[str, Detector]
        self._post_processors = []  # type: List[PostProcessor]

        if locale is None:
            locale = 'en_US'
        self._locale = locale  # type: str

        if detector_list is None:
            # Gather all detectors that should automatically load, skipping the ones that declare a
            # ``supported_locale`` check and fail it for the current locale.
            detector_list = [
                detector
                for detector in detectors.catalogue.detector_catalogue.get_all().values()
                if detector.autoload and (
                    not hasattr(detector, 'supported_locale')
                    or detector.supported_locale(locale)  # type: ignore
                )
            ]

        for detector in detector_list:
            self.add_detector(detector, warn=True)

        if post_processor_list is None:
            post_processor_list = [
                post_processor
                for post_processor in sorted(
                    post_processors.catalogue.post_processor_catalogue.get_all().values(),
                    key=lambda pp: pp.index,
                )
                if post_processor.autoload
            ]

        for post_processor in post_processor_list:
            self.add_post_processor(post_processor)

    def add_detector(self, detector: Union[Detector, Type[Detector], str], warn: bool = True):
        """Add a ``Detector`` to Scrubber

        You can add a detector to a ``Scrubber`` by passing one of three objects to this function:

            1. the uninitalised class to this function, which initialises the class with default settings.
            2. an instance of a ``Detector`` class, where you can initialise it with the settings desired.
            3. a string containing the name of the detector, which again initialises the class with default
               settings.

        Classes and names are instantiated with the scrubber's locale. An argument that is none of the
        three kinds above is ignored.

        .. code:: pycon

            >>> import scrubadub
            >>> scrubber = scrubadub.Scrubber(detector_list=[])
            >>> scrubber.add_detector(scrubadub.detectors.CreditCardDetector)
            >>> scrubber.add_detector('skype')
            >>> detector = scrubadub.detectors.DateOfBirthDetector(require_context=False)
            >>> scrubber.add_detector(detector)

        :param detector: The ``Detector`` to add to this scrubber.
        :type detector: a Detector class, a Detector instance, or a string with the detector's name
        :param warn: raise a warning if the locale is not supported by the detector.
        :type warn: bool, default True
        :raises TypeError: if a class is passed that is not a subclass of ``Detector``.
        :raises KeyError: if a detector with the same name is already registered.
        """
        if isinstance(detector, type):
            if not issubclass(detector, Detector):
                raise TypeError(f'"{detector}" is not a subclass of Detector')
            self._check_and_add_detector(detector(locale=self._locale), warn=warn)
        elif isinstance(detector, Detector):
            self._check_and_add_detector(detector, warn=warn)
        elif isinstance(detector, str):
            detector_cls = detectors.catalogue.detector_catalogue.get(detector)
            self._check_and_add_detector(detector_cls(locale=self._locale), warn=warn)

    def remove_detector(self, detector: Union[Detector, Type[Detector], str]):
        """Remove a ``Detector`` from a Scrubber

        You can remove a detector from a ``Scrubber`` by passing one of three objects to this function:

            1. the uninitalised class to this function, which removes the initalised detector of the same name.
            2. an instance of a ``Detector`` class, which removes the initalised detector of the same name.
            3. a string containing the name of the detector, which removed the detector of that name.

        .. code:: pycon

            >>> import scrubadub
            >>> scrubber = scrubadub.Scrubber()
            >>> scrubber.remove_detector(scrubadub.detectors.CreditCardDetector)
            >>> scrubber.remove_detector('url')
            >>> detector = scrubadub.detectors.email.EmailDetector()
            >>> scrubber.remove_detector(detector)

        :param detector: The ``Detector`` to remove from this scrubber.
        :type detector: a Detector class, a Detector instance, or a string with the detector's name
        :raises KeyError: if no detector with that name is registered.
        """
        if isinstance(detector, str):
            name = detector
        elif isinstance(detector, type):
            # The class is instantiated with default settings only to read its name.
            name = detector().name
        elif isinstance(detector, detectors.base.Detector):
            name = detector.name
        else:
            # Unsupported argument types are ignored.
            return
        self._detectors.pop(name)

    def _check_and_add_detector(self, detector: Detector, warn: bool = False):
        """Check the types and add the detector to the scrubber

        :param detector: The initialised detector to register under ``detector.name``.
        :param warn: emit a warning if the detector has a ``supported_locale`` check that rejects the
            scrubber's locale.
        :raises TypeError: if ``detector`` is not a ``Detector`` instance.
        :raises KeyError: if a detector with the same name is already registered.
        """
        if not isinstance(detector, Detector):
            raise TypeError('The detector "{}" is not an instance of the Detector class.'.format(detector))

        name = detector.name

        # An unsupported locale only produces a warning, the detector is still added.
        if hasattr(detector, 'supported_locale'):
            locale_supported = detector.supported_locale(self._locale)  # type: ignore
            if not locale_supported and warn:
                warnings.warn("Detector {} does not support the scrubber locale '{}'.".format(name, self._locale))

        if name in self._detectors:
            raise KeyError(
                f'can not add Detector "{name}" to this Scrubber, this name is already in use. '
                'Try removing it first.'
            )
        self._detectors[name] = detector

    def add_post_processor(self, post_processor: Union[PostProcessor, Type[PostProcessor], str],
                           index: Optional[int] = None):
        """Add a ``PostProcessor`` to a Scrubber

        You can add a post-processor to a ``Scrubber`` by passing one of three objects to this function:

            1. the uninitalised class to this function, which initialises the class with default settings.
            2. an instance of a ``PostProcessor`` class, where you can initialise it with the settings desired.
            3. a string containing the name of the post-processor, which again initialises the class with
               default settings.

        An argument that is none of the three kinds above is ignored.

        .. code:: pycon

            >>> import scrubadub, scrubadub.post_processors
            >>> scrubber = scrubadub.Scrubber()
            >>> scrubber.add_post_processor('filth_replacer')
            >>> scrubber.add_post_processor(scrubadub.post_processors.PrefixSuffixReplacer)

        :param post_processor: The ``PostProcessor`` to add to this scrubber.
        :type post_processor: a PostProcessor class, a PostProcessor instance, or a string with the
            post-processor's name
        :param index: The position in the list of post-processors at which to insert this one. When
            ``None`` it is appended to the end of the list.
        :type index: int, optional
        :raises TypeError: if a class is passed that is not a subclass of ``PostProcessor``.
        :raises ValueError: if a name is passed that is not in the post-processor catalogue.
        :raises KeyError: if a post-processor with the same name is already registered.
        """
        if isinstance(post_processor, type):
            if not issubclass(post_processor, PostProcessor):
                raise TypeError(f'"{post_processor}" is not a subclass of PostProcessor')
            self._check_and_add_post_processor(post_processor(), index=index)
        elif isinstance(post_processor, PostProcessor):
            self._check_and_add_post_processor(post_processor, index=index)
        elif isinstance(post_processor, str):
            if post_processor not in post_processors.catalogue.post_processor_catalogue:
                raise ValueError("Unknown PostProcessor: {}".format(post_processor))
            post_processor_cls = post_processors.catalogue.post_processor_catalogue.get(post_processor)
            self._check_and_add_post_processor(post_processor_cls(), index=index)

    def remove_post_processor(self, post_processor: Union[PostProcessor, Type[PostProcessor], str]):
        """Remove a ``PostProcessor`` from a Scrubber

        You can remove a post-processor from a ``Scrubber`` by passing one of three objects to this function:

            1. the uninitalised class to this function, which removes the initalised post-processor of the
               same name.
            2. an instance of a ``PostProcessor`` class, which removes the initalised post-processor of the
               same name.
            3. a string containing the name of the post-processor, which removes the post-processor of that
               name.

        Nothing happens if no post-processor with that name is registered.

        .. code:: pycon

            >>> import scrubadub, scrubadub.post_processors
            >>> scrubber = scrubadub.Scrubber()
            >>> scrubber.remove_post_processor('filth_type_replacer')
            >>> scrubber.remove_post_processor(scrubadub.post_processors.PrefixSuffixReplacer)

        :param post_processor: The ``PostProcessor`` to remove from this scrubber.
        :type post_processor: a PostProcessor class, a PostProcessor instance, or a string with the
            post-processor's name
        """
        if isinstance(post_processor, str):
            name = post_processor
        elif isinstance(post_processor, type):
            # The class is instantiated with default settings only to read its name.
            name = post_processor().name
        elif isinstance(post_processor, post_processors.base.PostProcessor):
            name = post_processor.name
        else:
            # Unsupported argument types are ignored.
            return
        self._post_processors = [x for x in self._post_processors if x.name != name]

    def _check_and_add_post_processor(self, post_processor: PostProcessor, index: Optional[int] = None):
        """Check the types and add the PostProcessor to the scrubber

        :param post_processor: The initialised post-processor to register.
        :param index: The position to insert at, or ``None`` to append.
        :raises TypeError: if ``post_processor`` is not a ``PostProcessor`` instance.
        :raises KeyError: if a post-processor with the same name is already registered.
        """
        if not isinstance(post_processor, PostProcessor):
            raise TypeError(
                'The PostProcessor "{}" is not an instance of the PostProcessor class.'.format(post_processor)
            )

        name = post_processor.name
        if any(existing.name == name for existing in self._post_processors):
            raise KeyError(
                f'can not add PostProcessor "{name}" to this Scrubber, this name is already in use. '
                'Try removing it first.'
            )

        if index is None:
            self._post_processors.append(post_processor)
        else:
            self._post_processors.insert(index, post_processor)

    def clean(self, text: str, **kwargs) -> str:
        """This is the master method that cleans all of the filth out of the dirty dirty ``text``.

        All keyword arguments to this function are passed through to the ``Filth.replace_with`` method
        to fine-tune how the ``Filth`` is cleaned.

        :param text: The text to clean.
        :return: The text with each piece of ``Filth`` replaced.
        """
        if 'replace_with' in kwargs:
            warnings.warn("Use of replace_with is depreciated in favour of using PostProcessors", DeprecationWarning)

        # All Filths are collated so that they can be passed to the post processing step together, this
        # is needed for some operations within the PostProcessors. ``iter_filth`` already runs the
        # post-processors, so they must not be applied a second time here: a post-processor that is not
        # idempotent would otherwise alter each Filth twice.
        filth_list = list(
            self.iter_filth(text, document_name=None, run_post_processors=True)
        )  # type: Sequence[Filth]
        return self._replace_text(text=text, filth_list=filth_list, document_name=None, **kwargs)

    def clean_documents(self, documents: Union[Sequence[str], Dict[Optional[str], str]], **kwargs) -> \
            Union[Dict[Optional[str], str], Sequence[str]]:
        """Clean all of the filth out of several documents at once.

        All keyword arguments to this function are passed through to the ``Filth.replace_with`` method
        to fine-tune how the ``Filth`` is cleaned.

        :param documents: Either a list (or tuple) of document texts, or a dict that maps each document
            name to its text.
        :return: A dict of cleaned texts with the same keys if a dict was given, otherwise a list of
            cleaned texts in the same order as the input.
        :raises TypeError: if ``documents`` is not a list, tuple or dict.
        """
        if 'replace_with' in kwargs:
            warnings.warn("Use of replace_with is depreciated in favour of using PostProcessors", DeprecationWarning)

        if not isinstance(documents, (list, tuple, dict)):
            raise TypeError(
                'documents type should be one of: list of strings or a dict of strings with the key as the '
                'document title.'
            )

        # All Filths are collated so that they can be passed to the post processing step together, this
        # is needed for some operations within the PostProcessors.
        filth_list = list(
            self.iter_filth_documents(documents=documents, run_post_processors=True)
        )  # type: Sequence[Filth]

        # Group once so that each document is not rescanning the Filth found in every other document.
        filths_by_document = self._group_filths_by_document(filth_list)

        if isinstance(documents, dict):
            return {
                name: self._replace_text(
                    text=text, filth_list=filths_by_document.get(name, []), document_name=name, **kwargs
                )
                for name, text in documents.items()
            }

        # Unnamed documents are named after their position, matching ``iter_filth_documents``.
        return [
            self._replace_text(
                text=text, filth_list=filths_by_document.get(str(index), []), document_name=str(index), **kwargs
            )
            for index, text in enumerate(documents)
        ]

    @staticmethod
    def _group_filths_by_document(filth_list: Sequence[Filth]) -> Dict[Optional[str], List[Filth]]:
        """Group filths by their ``document_name``, keeping the input order within each group."""
        grouped = {}  # type: Dict[Optional[str], List[Filth]]
        for filth in filth_list:
            grouped.setdefault(filth.document_name, []).append(filth)
        return grouped

    def _replace_text(
            self, text: str, filth_list: Sequence[Filth], document_name: Optional[str], **kwargs
    ) -> str:
        """Return ``text`` with the filths belonging to ``document_name`` replaced.

        Filths whose ``document_name`` differs are ignored. A filth's ``replacement_string`` is used
        when it is set, otherwise ``Filth.replace_with(**kwargs)`` provides the replacement.
        """
        document_filths = [filth for filth in filth_list if filth.document_name == document_name]
        if not document_filths:
            return text

        document_filths = self._sort_filths(document_filths)  # TODO: expensive sort may not be needed

        clean_chunks = []
        cursor = 0  # end position of the previously replaced filth
        for filth in document_filths:
            clean_chunks.append(text[cursor:filth.beg])
            if filth.replacement_string is not None:
                clean_chunks.append(filth.replacement_string)
            else:
                clean_chunks.append(filth.replace_with(**kwargs))
            cursor = filth.end
        clean_chunks.append(text[cursor:])
        return u''.join(clean_chunks)

    def _post_process_filth_list(self, filth_list: Sequence[Filth]) -> Sequence[Filth]:
        """Pass ``filth_list`` through each post-processor in order and return the result."""
        # All Filths are passed to each post-processor together, this is needed for some operations
        # within the PostProcessors. It could be improved if we know which post processors need
        # collated Filths.
        for post_processor in self._post_processors:
            filth_list = post_processor.process_filth(filth_list)
        return filth_list

    def iter_filth(
            self, text: str, document_name: Optional[str] = None, run_post_processors: bool = True
    ) -> Generator[Filth, None, None]:
        """Iterate over the different types of filth that can exist.

        :param text: The text to search for filth.
        :param document_name: The name to attach to the text, ``None`` if it has no name.
        :param run_post_processors: whether the post-processors are applied to the found filth.
        """
        # Delegates to iter_filth_documents with a single document keyed by its (possibly None) name.
        yield from self.iter_filth_documents(documents={document_name: text}, run_post_processors=run_post_processors)

    @staticmethod
    def _detector_iter_filth_iterator(detector: Detector, document_list: Sequence[str],
                                      document_names: Sequence[Optional[str]]) -> Generator[Filth, None, None]:
        """Run ``detector.iter_filth`` over each document in turn, pairing texts with their names."""
        for doc_name, text in zip(document_names, document_list):
            yield from detector.iter_filth(text, document_name=doc_name)

    def iter_filth_documents(
            self,
            documents: Union[Sequence[str], Dict[Optional[str], str]],
            run_post_processors: bool = True
    ) -> Generator[Filth, None, None]:
        """Iterate over the different types of filth that can exist.

        :param documents: Either a list (or tuple) of document texts, which are named after their
            position ("0", "1", ...), or a dict that maps each document name to its text.
        :param run_post_processors: whether the post-processors are applied to the merged filth.
        :raises TypeError: if ``documents`` is not a dict, list or tuple, or if a detector yields
            something that is not a ``Filth``.
        """
        if isinstance(documents, dict):
            document_names = list(documents.keys())
            document_texts = list(documents.values())
        elif isinstance(documents, (tuple, list)):
            document_texts = documents
            document_names = [str(x) for x in range(len(documents))]
        else:
            raise TypeError(f'documents should be one of dict, list or tuple, but got unsupported type: '
                            f'{type(documents)}')

        # currently doing this by aggregating all_filths and then sorting
        # inline instead of with a Filth.__cmp__ method, which is apparently
        # much slower http://stackoverflow.com/a/988728/564709
        #
        # NOTE: we could probably do this in a more efficient way by iterating
        # over all detectors simultaneously. just trying to get something
        # working right now and we can worry about efficiency later
        filth_list = []  # type: List[Filth]
        for detector in self._detectors.values():
            # Prefer the detector's multi-document entry point and fall back to running it one
            # document at a time when that raises NotImplementedError.
            try:
                filth_iterator = detector.iter_filth_documents(
                    document_list=document_texts,
                    document_names=document_names,
                )
            except NotImplementedError:
                filth_iterator = self._detector_iter_filth_iterator(
                    detector=detector,
                    document_list=document_texts,
                    document_names=document_names,
                )

            for filth in filth_iterator:
                if not isinstance(filth, Filth):
                    raise TypeError('iter_filth must always yield Filth')
                if not filth.is_valid():
                    continue
                filth_list.append(filth)

        # This is split up so that we only have to use lists if we have to post_process Filth
        if run_post_processors:
            all_filths = list(self._merge_filths(filth_list))
            all_filths = list(self._post_process_filth_list(all_filths))

            # Here we loop over a list of Filth...
            for filth in all_filths:
                yield filth
        else:
            # ... but here, we're using a generator. If we try to use the same variable it would have two
            # types and fail static typing in mypy
            yield from self._merge_filths(filth_list)

    @staticmethod
    def _sort_filths(filth_list: Sequence[Filth]) -> List[Filth]:
        """Sorts a list of filths, needed before merging and concatenating"""
        # Sort by document name, then by start position. If two filths start in the same place then
        # return the longer one first. A filth without a document_name attribute sorts as ''.
        return sorted(
            filth_list,
            key=lambda f: (str(getattr(f, 'document_name', '')), f.beg, -f.end),
        )

    @staticmethod
    def _merge_filths(filth_list: Sequence[Filth]) -> Generator[Filth, None, None]:
        """This is where the Scrubber does its hard work and merges any overlapping filths.

        Filths are handled one document at a time: first the document named ``None`` (if any), then the
        remaining documents in sorted name order. Within a document a filth is merged into the previous
        one unless it begins strictly after the previous one ends.
        """
        if not filth_list:
            return

        filths_by_document = Scrubber._group_filths_by_document(filth_list)

        document_names = []  # type: List[Optional[str]]
        if None in filths_by_document:
            document_names.append(None)
        document_names.extend(sorted(name for name in filths_by_document if name is not None))

        for document_name in document_names:
            document_filth_list = Scrubber._sort_filths(filths_by_document[document_name])
            filth = document_filth_list[0]
            for next_filth in document_filth_list[1:]:
                if filth.end < next_filth.beg:
                    yield filth
                    filth = next_filth
                else:
                    filth = filth.merge(next_filth)
            yield filth