# -*- coding: utf-8 -*-
import os
from itertools import (chain, )
from pickle import HIGHEST_PROTOCOL as pickle_HIGHEST_PROTOCOL
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import (ValidationError, ImproperlyConfigured)
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone
import numpy as np
from picklefield.fields import PickledObjectField
from sklearn.feature_extraction.text import (CountVectorizer,
TfidfVectorizer)
from sklearn.model_selection import (cross_val_score, )
from scipy.sparse import csr_matrix
if 'DJANGO_TEST' in os.environ:
from django_ai.base.models import SupervisedLearningTechnique
from django_ai.base.utils import (get_model, )
else: # pragma: no cover
from base.models import (SupervisedLearningTechnique, )
from base.utils import (get_model, )
class SpamFilter(SupervisedLearningTechnique):
    """
    Main object for the Spam Filtering System.

    Holds the configuration of the Bag of Words (BoW) vectorizer, the
    reference to the classifier to be used, the reference to the
    "Spammable" Django model used as data source and the pickled
    vectorizer together with the transformed data it produced.
    """
    #: BoW Decode Error choices
    BOW_DECODE_ERROR_CHOICES = (
        ('strict', _('Strict')),
        ('ignore', _('Ignore')),
        ('replace', _('Replace')),
    )
    #: BoW Strip Accent choices
    BOW_STRIP_ACCENTS_CHOICES = (
        ('ascii', 'ASCII'),
        ('unicode', 'Unicode'),
    )
    #: BoW Analyzer units choices
    BOW_ANALYZER_CHOICES = (
        ('word', _('Word')),
        ('char', _('Character')),
        ('char_wb', _("Characters in Word-Boundaries")),
    )
    #: Cross Validation Available Metrics choices
    CV_CHOICES = (
        ('accuracy', _("Accuracy")),
        ('average_precision', _("Average Precision")),
        ('f1', _("F1")),
        ('neg_log_loss', _("Logistic Loss")),
        ('precision', _("Precision")),
        ('recall', _("Recall")),
        ('roc_auc', _("Area under ROC Curve")),
    )

    #: Engine Object Vectorizer (pickled fitted vectorizer)
    engine_object_vectorizer = PickledObjectField(
        "Engine Object Vectorizer",
        protocol=pickle_HIGHEST_PROTOCOL,
        blank=True, null=True
    )
    #: Engine Object Data (pickled BoW representation of the data)
    engine_object_data = PickledObjectField(
        "Engine Object Data",
        protocol=pickle_HIGHEST_PROTOCOL,
        blank=True, null=True
    )
    #: Classifier to be used in the System, in the
    #: "app_label.model|name" format, i.e.
    #: "supervised_learning.SVC|My SVM"
    classifier = models.CharField(
        "Supervised Learning Classifier",
        max_length=100, blank=True, null=True,
        help_text=(
            'Classifier to be used in the System, in the '
            '"app_label.model|name" format, i.e. '
            '"supervised_learning.SVC|My SVM"'
        )
    )
    #: Whether to use a Spammable Model as a data source
    spam_model_is_enabled = models.BooleanField(
        "Use a Spammable Model?",
        default=True,
        help_text=(
            'Use a Spammable Model'
        )
    )
    #: "IsSpammable-Django Model" to be used with the Spam Filter (in
    #: the "app_label.model" format, i.e. "examples.CommentOfMySite")
    spam_model_model = models.CharField(
        "Spammable Django Model",
        max_length=100, blank=True, null=True,
        help_text=(
            '"IsSpammable-Django Model" to be used with the Spam Filter (in '
            'the "app_label.model" format, i.e. "examples.CommentOfMySite")'
        )
    )

    # -> Cross Validation
    #: Metric to be evaluated in Cross Validation
    cv_metric = models.CharField(
        "Cross Validation Metric",
        max_length=20, blank=True, null=True, choices=CV_CHOICES,
        help_text=(
            'Metric to be evaluated in Cross Validation'
        )
    )

    # -> Bag of Words Transformation
    #: Enable Bag of Words transformation
    bow_is_enabled = models.BooleanField(
        "Enable Bag of Words representation?",
        default=True,
        help_text=(
            'Enable Bag of Words transformation'
        )
    )
    # (skl) encoding : string, ‘utf-8’ by default.
    #: Encoding to be used to decode the corpus
    # NOTE: the attribute name carries a typo ("enconding"); it is kept
    # because it is the name of the database column.
    bow_enconding = models.CharField(
        "(BoW) Encoding",
        default='utf-8', max_length=20,
        help_text=(
            'Encoding to be used to decode.'
        )
    )
    # (skl) decode_error : {‘strict’, ‘ignore’, ‘replace’}
    #: Instruction on what to do if a byte sequence is given to
    #: analyze that contains characters not of the given encoding.
    #: By default, it is ‘strict’, meaning that a UnicodeDecodeError
    #: will be raised. Other values are ‘ignore’ and ‘replace’.
    bow_decode_error = models.CharField(
        "(BoW) Decode Error",
        default='strict', max_length=20, choices=BOW_DECODE_ERROR_CHOICES,
        help_text=_((
            'Instruction on what to do if a byte sequence is given to '
            'analyze that contains characters not of the given encoding. '
            'By default, it is ‘strict’, meaning that a UnicodeDecodeError '
            'will be raised. Other values are ‘ignore’ and ‘replace’.'
        ))
    )
    # (skl) strip_accents : {‘ascii’, ‘unicode’, None}
    #: Remove accents during the preprocessing step. ‘ascii’ is a fast
    #: method that only works on characters that have a direct ASCII
    #: mapping. ‘unicode’ is a slightly slower method that works on
    #: any characters. None (default) does nothing.
    bow_strip_accents = models.CharField(
        "(BoW) Strip Accents",
        default=None, max_length=20, choices=BOW_STRIP_ACCENTS_CHOICES,
        blank=True, null=True,
        help_text=_((
            'Remove accents during the preprocessing step. ‘ascii’ is a fast '
            'method that only works on characters that have an direct ASCII '
            'mapping. ‘unicode’ is a slightly slower method that works on '
            'any characters. None (default) does nothing.'
        ))
    )
    # (skl) analyzer : string, {‘word’, ‘char’, ‘char_wb’} or callable
    #: Whether the feature should be made of word or character n-grams.
    #: Option ‘Chars in W-B’ creates character n-grams only from text inside
    #: word boundaries; n-grams at the edges of words are padded with
    #: space.
    bow_analyzer = models.CharField(
        "(BoW) Analyzer",
        default='word', max_length=20, choices=BOW_ANALYZER_CHOICES,
        help_text=_((
            'Whether the feature should be made of word or character n-grams. '
            'Option ‘Chars in W-B’ creates character n-grams only from text '
            'inside word boundaries; n-grams at the edges of words are padded '
            'with space.'
        ))
    )
    # (skl) ngram_range : tuple (min_n, max_n)
    #: The lower boundary of the range of n-values for
    #: different n-grams to be extracted. All values
    #: of n such that min_n <= n <= max_n will be used.
    bow_ngram_range_min = models.SmallIntegerField(
        "(BoW) n-gram Range - Min",
        default=1,
        help_text=_((
            'The lower boundary of the range of n-values for '
            'different n-grams to be extracted. All values '
            'of n such that min_n <= n <= max_n will be used.'
        ))
    )
    #: The upper boundary of the range of n-values for
    #: different n-grams to be extracted. All values
    #: of n such that min_n <= n <= max_n will be used.
    bow_ngram_range_max = models.SmallIntegerField(
        "(BoW) n-gram Range - Max",
        default=1,
        help_text=_((
            'The upper boundary of the range of n-values for '
            'different n-grams to be extracted. All values '
            'of n such that min_n <= n <= max_n will be used.'
        ))
    )
    # (skl) stop_words : string {‘english’}, list, or None (default)
    #: If ‘english’, a built-in stop word list for English is used.
    #: If a comma-separated string, that list is assumed to contain
    #: stop words, all of which will be removed from the resulting
    #: tokens. Only applies if analyzer == ´word´. If None, no stop
    #: words will be used. max_df can be set to a value in the range
    #: [0.7, 1.0) to automatically detect and filter stop words based
    #: on intra corpus document frequency of terms.
    bow_stop_words = models.TextField(
        "(BoW) Stop Words",
        default=None, blank=True, null=True,
        help_text=_((
            'If ‘english’, a built-in stop word list for English is used. '
            'If a comma-separated string, that list is assumed to contain '
            'stop words, all of which will be removed from the resulting '
            'tokens. Only applies if analyzer == ´word´. If None, no stop '
            'words will be used. max_df can be set to a value in the range '
            '[0.7, 1.0) to automatically detect and filter stop words based '
            'on intra corpus document frequency of terms.'
        ))
    )
    # (skl) max_df : float in range [0.0, 1.0] or int, default=1.0
    #: When building the vocabulary ignore terms that have a document
    #: frequency strictly higher than the given threshold
    #: (corpus-specific stop words). If float, the parameter represents
    #: a proportion of documents, integer absolute counts. This
    #: parameter is ignored if vocabulary is not None.
    bow_max_df = models.FloatField(
        "(BoW) Maximum Document Frequency",
        default=1.0,
        help_text=_((
            'When building the vocabulary ignore terms that have a document '
            'frequency strictly higher than the given threshold '
            '(corpus-specific stop words). If float, the parameter represents '
            'a proportion of documents, integer absolute counts. This '
            'parameter is ignored if vocabulary is not None.'
        ))
    )
    # (skl) min_df : float in range [0.0, 1.0] or int, default=1
    #: When building the vocabulary ignore terms that have a document
    #: frequency strictly lower than the given threshold. This value is
    #: also called cut-off in the literature. If float, the parameter
    #: represents a proportion of documents, integer absolute counts.
    #: This parameter is ignored if vocabulary is not None.
    bow_min_df = models.FloatField(
        "(BoW) Minimum Document Frequency",
        default=1,
        help_text=_((
            'When building the vocabulary ignore terms that have a document '
            'frequency strictly lower than the given threshold. This value is '
            'also called cut-off in the literature. If float, the parameter '
            'represents a proportion of documents, integer absolute counts. '
            'This parameter is ignored if vocabulary is not None.'
        ))
    )
    # (skl) max_features : int or None, default=None
    #: If not None, build a vocabulary that only consider the top
    #: max_features ordered by term frequency across the corpus.
    #: This parameter is ignored if vocabulary is not None.
    bow_max_features = models.IntegerField(
        "(BoW) Maximum Features",
        default=None, blank=True, null=True,
        help_text=_((
            'If not None, build a vocabulary that only consider the top '
            'max_features ordered by term frequency across the corpus. '
            ' This parameter is ignored if vocabulary is not None.'
        ))
    )
    # (skl) vocabulary : Mapping or iterable, optional
    #: A Mapping (e.g., a dict) where keys are terms and values
    #: are indices in the feature matrix.
    #: If not given, a vocabulary is determined from the input
    #: documents. Indices in the mapping should not be repeated and
    #: should not have any gap between 0 and the largest index.
    bow_vocabulary = models.TextField(
        "(BoW) Vocabulary",
        default=None, blank=True, null=True,
        help_text=_((
            'A Mapping (e.g., a dict) where keys are terms and values '
            'are indices in the feature matrix. '
            'If not given, a vocabulary is determined from the input '
            'documents. Indices in the mapping should not be repeated and '
            'should not have any gap between 0 and the largest index.'
        ))
    )
    # (skl) binary : boolean, default=False
    #: If True, all non zero counts are set to 1. This is useful for
    #: discrete probabilistic models that model binary events rather
    #: than integer counts.
    bow_binary = models.BooleanField(
        "(BoW) Use Binary representation?",
        default=False,
        help_text=_((
            'If True, all non zero counts are set to 1. This is useful for '
            'discrete probabilistic models that model binary events rather '
            'than integer counts.'
        ))
    )
    #: Use the TF-IDF transformation?
    bow_use_tf_idf = models.BooleanField(
        "(BoW) Use the TF-IDF transformation?",
        default=True,
        help_text=_((
            'Use the TF-IDF transformation?'
        ))
    )

    class Meta:
        verbose_name = "Spam Filter"
        verbose_name_plural = "Spam Filters"
        # app_label = "systems.spam_filtering"
def save(self, *args, **kwargs):
    """
    Save the Spam Filter, seeding the inference metadata when it is empty.

    When ``self.metadata`` is an empty dict, the "current_inference" and
    "previous_inference" entries are created (as empty dicts) before
    delegating to the parent's ``save()``.
    """
    # Initialize metadata field if corresponds
    if self.metadata == {}:
        for inference_key in ("current_inference", "previous_inference"):
            self.metadata[inference_key] = {}
    super(SpamFilter, self).save(*args, **kwargs)
def __str__(self):
    """Return the filter's name prefixed with "[Spam Filter] "."""
    label = "[Spam Filter] {}"
    return label.format(self.name)
def clean(self):
    """
    Validate the model references stored in the text fields.

    Checks, only for the fields that are set / enabled:

    - ``classifier``: must be in the "app_label.model|name" format, the
      model must exist and an object with that name must exist in it.
    - ``pretraining``: must be in the "app_label.model" format and the
      model must exist.
    - ``spam_model_model`` (when ``spam_model_is_enabled``): must be in
      the "app_label.model" format and the model must exist.

    Raises:
        ValidationError: keyed by the name of the offending field.
    """
    if self.classifier:
        # Check the validity of the Classifier
        try:
            app_model, object_name = self.classifier.split("|")
            app, model = app_model.split(".")
        except (AttributeError, ValueError):
            raise ValidationError({'classifier': _(
                'Invalid format'
            )})
        model_class = self._clean_model_reference('classifier', app, model)
        try:
            model_class.objects.get(name=object_name)
        except Exception:
            # Translate the template first and interpolate afterwards,
            # otherwise the msgid never matches the translation catalog.
            raise ValidationError({'classifier': _(
                'Cannot get the object "{}" from the '
                '{} model'
            ).format(object_name, model_class._meta.verbose_name)})
    if self.pretraining:
        # Check the validity of the Pretraining field
        try:
            app, model = self.pretraining.split(".")
        except (AttributeError, ValueError):
            raise ValidationError({'pretraining': _(
                'Invalid format'
            )})
        # Errors are reported on the 'pretraining' field itself
        self._clean_model_reference('pretraining', app, model)
    if self.spam_model_is_enabled:
        # Check the validity of the Spammable Model field
        try:
            app, model = self.spam_model_model.split(".")
        except (AttributeError, ValueError):
            # AttributeError covers an unset (None) field
            raise ValidationError({'spam_model_model': _(
                'Invalid format'
            )})
        self._clean_model_reference('spam_model_model', app, model)
    super(SpamFilter, self).clean()


def _clean_model_reference(self, field_name, app, model):
    """
    Return the model class registered as ``app``.``model``.

    Raises:
        ValidationError: on ``field_name`` when the content type cannot
            be retrieved or when it does not resolve to a model class
            (``model_class()`` returned ``None``).
    """
    try:
        model_class = ContentType.objects.get(
            app_label=app,
            model=model.lower()
        ).model_class()
    except Exception:
        model_class = None
    if model_class is None:
        raise ValidationError({field_name: _(
            'The App and Model must be a valid Django App and Model'
        )})
    return model_class
def get_pretraining_data(self):
    """
    Return the pre-training texts as a list, or None when the
    ``pretraining`` field is not set.

    The texts are read from the field named by the pre-training model's
    ``SPAMMABLE_FIELD`` constant.
    """
    if not self.pretraining:
        return None
    pt_model = get_model(self.pretraining)
    contents = pt_model.objects.values_list(
        pt_model.SPAMMABLE_FIELD, flat=True
    )
    return list(contents)
def get_pretraining_labels(self):
    """
    Return the pre-training labels as a list, or None when the
    ``pretraining`` field is not set.

    The labels are read from the field named by the pre-training model's
    ``SPAM_LABEL_FIELD`` constant.
    """
    if not self.pretraining:
        return None
    pt_model = get_model(self.pretraining)
    spam_flags = pt_model.objects.values_list(
        pt_model.SPAM_LABEL_FIELD, flat=True
    )
    return list(spam_flags)
def get_data(self, utf8_point_repr=False):
    """
    Return the texts to be used as the filter's input data.

    The texts come from the Spammable Model when
    ``spam_model_is_enabled`` is set, otherwise from the parent's
    ``get_data()`` (flattened one level). Pre-training texts, if any,
    are appended at the end.

    Args:
        utf8_point_repr: if True, each text is right-padded with spaces
            up to the length of the longest one and returned as the
            list of its characters' code points (``ord``).

    Returns:
        A list of texts, or a list of equally-sized lists of ints when
        ``utf8_point_repr`` is True. An empty corpus yields ``[]``.
    """
    if self.spam_model_is_enabled:
        model = get_model(self.spam_model_model)
        data = list(model.objects.values_list(model.SPAMMABLE_FIELD,
                                              flat=True))
    else:
        data = super(SpamFilter, self).get_data()
        # Flatten list
        data = list(chain.from_iterable(data))
    if self.pretraining:
        data += self.get_pretraining_data()
    if utf8_point_repr:
        # default=0 keeps an empty corpus from raising ValueError
        max_length = max((len(text) for text in data), default=0)
        data = [[ord(character) for character in text.ljust(max_length)]
                for text in data]
    return data
def get_labels(self):
    """
    Return the labels matching the data returned by ``get_data()``.

    Labels come from the Spammable Model (its ``SPAM_LABEL_FIELD``) when
    ``spam_model_is_enabled`` is set, otherwise from the parent's
    ``get_labels()``. Pre-training labels, if any, are appended at the
    end (the result is then always a list).
    """
    if self.spam_model_is_enabled:
        spammable = get_model(self.spam_model_model)
        label_values = spammable.objects.values_list(
            spammable.SPAM_LABEL_FIELD, flat=True
        )
        labels = list(label_values)
    else:
        labels = super(SpamFilter, self).get_labels()
    if self.pretraining:
        labels = list(labels) + self.get_pretraining_labels()
    return labels
def get_classifier(self):
    """
    Return the classifier object referenced by the ``classifier`` field.

    The field is parsed as "app_label.model|name": the model class is
    resolved through ``ContentType`` and the object is fetched by name.
    """
    model_path, classifier_name = self.classifier.split("|")
    app_label, model_name = model_path.split(".")
    content_type = ContentType.objects.get(
        app_label=app_label,
        model=model_name.lower()
    )
    classifier_model = content_type.model_class()
    return classifier_model.objects.get(name=classifier_name)
def get_engine_object_vectorizer(self, reconstruct=False, save=True):
    """
    Retrieves / Initializes the Engine's Vectorizer and transforms the
    data making it available in the `self.engine_object_data` field.

    Args:
        reconstruct: if True, build and fit a new vectorizer even when
            one is already stored.
        save: if True, save the object after (re)building the
            vectorizer.

    Returns:
        The vectorizer stored in ``self.engine_object_vectorizer``.
    """
    if self.engine_object_vectorizer is not None and not reconstruct:
        return self.engine_object_vectorizer

    if self.bow_use_tf_idf:
        BoW_Vectorizer = TfidfVectorizer
    else:
        BoW_Vectorizer = CountVectorizer

    # The DF thresholds are stored in FloatFields, but scikit-learn
    # interprets floats as proportions and ints as absolute counts.
    # Integer-valued thresholds are therefore handed over as ints.
    # A max_df of exactly 1.0 stays a float: it means "100% of the
    # documents", which is the scikit-learn default.
    min_df = self.bow_min_df
    if min_df >= 1 and float(min_df).is_integer():
        min_df = int(min_df)
    max_df = self.bow_max_df
    if max_df > 1 and float(max_df).is_integer():
        max_df = int(max_df)

    bow_vectorizer_args = {
        'encoding': self.bow_enconding,
        'decode_error': self.bow_decode_error,
        # The configured analyzer has to reach the vectorizer
        'analyzer': self.bow_analyzer,
        'ngram_range': (self.bow_ngram_range_min,
                        self.bow_ngram_range_max),
        'max_df': max_df,
        'min_df': min_df,
        'max_features': self.bow_max_features,
        'binary': self.bow_binary,
        'lowercase': False,
    }
    # Optional arguments are only passed when set, so the scikit-learn
    # defaults apply otherwise.
    if self.bow_vocabulary:
        # NOTE(review): the TextField content is passed as-is while
        # scikit-learn expects a Mapping or an iterable of terms -
        # confirm the expected serialization of this field.
        bow_vectorizer_args['vocabulary'] = self.bow_vocabulary
    if self.bow_strip_accents:
        bow_vectorizer_args['strip_accents'] = self.bow_strip_accents
    if self.bow_stop_words:
        stop_words = self.bow_stop_words.strip()
        if stop_words != 'english':
            # A comma-separated string is the list of stop words
            stop_words = [word.strip() for word in stop_words.split(',')
                          if word.strip()]
        if stop_words:
            bow_vectorizer_args['stop_words'] = stop_words

    bow_vectorizer = BoW_Vectorizer(**bow_vectorizer_args)
    data = self.get_data()
    # Save the BoW representation of the data
    self.engine_object_data = bow_vectorizer.fit_transform(data)
    self.engine_object_vectorizer = bow_vectorizer
    if save:
        self.save()
    return self.engine_object_vectorizer
def get_engine_object_data(self, reconstruct=False, save=True):
    """
    Retrieves / Reconstructs the BoW representation of the data.

    Args:
        reconstruct: if True, rebuild the vectorizer (and with it the
            data) even when the data is already stored.
        save: forwarded to ``get_engine_object_vectorizer()`` when the
            data has to be rebuilt.

    Returns:
        The content of ``self.engine_object_data``.
    """
    must_rebuild = reconstruct or self.engine_object_data is None
    if must_rebuild:
        # Fitting the vectorizer populates self.engine_object_data
        self.get_engine_object_vectorizer(reconstruct=True, save=save)
    return self.engine_object_data
def get_engine_object(self, reconstruct=False, save=True):
    """
    Retrieve / initialize the engine object of the Spam Filter.

    When the BoW representation is enabled, the vectorizer is retrieved
    (or rebuilt, if ``reconstruct``) first. The engine object is then
    taken from the configured classifier's ``get_engine_object()``.

    Args:
        reconstruct: if True, ignore the stored engine object and
            rebuild it (and the vectorizer).
        save: if True, save the object after the updates. It is also
            honoured for the vectorizer step, so ``save=False`` does
            not write to the database.

    Returns:
        The content of ``self.engine_object``.
    """
    if self.engine_object is not None and not reconstruct:
        return self.engine_object
    # Initialize BoW Vectorizer engine object if necessary
    if self.bow_is_enabled:
        self.get_engine_object_vectorizer(reconstruct=reconstruct,
                                          save=save)
    classifier = self.get_classifier().get_engine_object()
    self.engine_object = classifier
    if save:
        self.save()
    return self.engine_object
def perform_inference(self, recalculate=False, save=True):
    """
    Fit the classifier on the data and update the inference metadata.

    Nothing is done when the filter is already inferred, unless
    ``recalculate`` is set.

    Args:
        recalculate: if True, run the inference even when
            ``is_inferred`` is already set, rebuilding the BoW data.
        save: if True, stamp ``engine_object_timestamp`` and save the
            object at the end. It is forwarded to the engine object and
            data retrieval, so ``save=False`` does not write to the
            database.

    Returns:
        The content of ``self.engine_object``.
    """
    if not self.is_inferred or recalculate:
        # No need for running the inference 'engine_meta_iterations' times
        engine = self.get_engine_object(reconstruct=True, save=save)
        # -> Get the data
        if self.bow_is_enabled:
            data = self.get_engine_object_data(
                reconstruct=recalculate, save=save
            )
        else:
            # Use the UTF-8 code point representation
            data = self.get_data(utf8_point_repr=True)
        # -> Get the labels
        labels = self.get_labels()
        # -> Remove Nones if any
        data, labels = self.remove_nones_from_input(data, labels)
        # -> Run the algorithm and store the updated engine object
        self.engine_object = engine.fit(data, labels)
        # -> Rotate metadata
        self.rotate_metadata()
        # -> Perform Cross Validation
        if self.cv_is_enabled:
            self.perform_cross_validation(data=data, labels=labels,
                                          update_metadata=True)
        # -> Update other metadata
        current = self.metadata["current_inference"]
        current["bow_is_enabled"] = self.bow_is_enabled
        current["input_dimensionality"] = np.shape(data)
        current["vectorizer_conf"] = self.get_vect_conf_dict()
        current["classifier_conf"] = self.get_classifier().get_conf_dict()
        # -> Set as inferred
        self.is_inferred = True
        if save:
            self.engine_object_timestamp = timezone.now()
            self.save()
    return self.engine_object
def predict(self, texts):
    """
    Classifies a list of observations

    Args:
        texts: list of strings to be classified.

    Returns:
        The output of the classifier's ``predict()`` for the transformed
        texts (one prediction per text), or None when the filter has
        not been inferred yet.
    """
    if not self.is_inferred:
        return None
    if self.bow_is_enabled:
        transformed_text = \
            self.get_engine_object_vectorizer().transform(texts)
    else:
        # The classifier was fitted on code-point rows of this length
        max_length = max([len(t) for t in self.get_data()])
        # Pad AND truncate each text, so that every row has exactly
        # max_length features and no observation is dropped.
        transformed_text = [
            [ord(character)
             for character in text.ljust(max_length)[:max_length]]
            for text in texts
        ]
    classifier = self.get_engine_object()
    return classifier.predict(transformed_text)
def perform_cross_validation(self, data=None, labels=None,
                             update_metadata=False):
    """
    Run ``cross_val_score`` with the engine object and return the scores.

    Args:
        data: input data. When None, the BoW data is used if the BoW
            representation is enabled, otherwise the code point
            representation of the texts.
        labels: labels of the data. When None, ``get_labels()`` is used.
        update_metadata: if True, store the configuration ("folds",
            "metric"), the scores, their mean and twice their standard
            deviation under ``metadata["current_inference"]["cv"]``.

    Returns:
        The scores returned by ``cross_val_score``.
    """
    if data is None:
        data = (
            self.get_engine_object_data()
            if self.bow_is_enabled
            else self.get_data(utf8_point_repr=True)
        )
    if labels is None:
        labels = self.get_labels()
    # Observations without a label cannot be used for scoring
    data, labels = self.remove_nones_from_input(data, labels)
    estimator = self.get_engine_object()
    scores = cross_val_score(
        estimator, data, labels,
        cv=self.cv_folds, scoring=self.cv_metric
    )
    if update_metadata:
        self.metadata["current_inference"]['cv'] = {
            'conf': {
                "folds": self.cv_folds,
                "metric": self.get_cv_metric_display(),
            },
            'scores': scores,
            'mean': scores.mean(),
            '2std': 2 * scores.std(),
        }
    return scores
def remove_nones_from_input(self, data, labels):
    """
    Drop the observations whose label is None.

    Args:
        data: a ``csr_matrix`` or an array-like, one row per observation.
        labels: sequence of labels, one per row of ``data``.

    Returns:
        A ``(data, labels)`` tuple. When no label is None, both inputs
        are returned untouched; otherwise the filtered data and the
        remaining labels as a boolean array.
    """
    missing = [position for position, value in enumerate(labels)
               if value is None]
    if not missing:
        return (data, labels)
    if isinstance(data, csr_matrix):
        # Sparse matrices are filtered with a boolean row mask
        keep_rows = np.ones(data.shape[0], dtype=bool)
        keep_rows[missing] = False
        data = data[keep_rows]
    else:
        data = np.delete(data, missing, 0)
    labels = np.delete(labels, missing, 0).astype(bool)
    return (data, labels)
def get_vect_conf_str(self):
    """
    Return a one-line, human-readable summary of the vectorizer
    configuration.

    - BoW disabled: the UTF-8 representation notice.
    - BoW enabled and binary: "BoW Representation: Binary".
    - BoW enabled otherwise: the (optional) TF-IDF note followed by the
      analyzer, the n-gram range and the min / max document frequency.
    """
    if not self.bow_is_enabled:
        return "" + "UTF-8 Representation (Vectorizer not enabled)"
    summary = "" + "BoW Representation: "
    if self.bow_binary:
        return summary + "Binary"
    tf_idf_note = "(TF-IDF Transformation) " if self.bow_use_tf_idf else ""
    ngram_range = " ({}, {}) - ".format(self.bow_ngram_range_min,
                                        self.bow_ngram_range_max)
    df_range = "{} / {}".format(self.bow_min_df, self.bow_max_df)
    return (
        summary
        + tf_idf_note
        + "Analyzer: "
        + self.get_bow_analyzer_display()
        + ngram_range
        + "Min / Max DF: "
        + df_range
    )
def get_vect_conf_dict(self):
    """
    Return the vectorizer configuration summary as a dict.

    Keys: 'bow_is_enabled', 'bow_use_tf_idf', 'binary', 'analyzer'
    (display name), 'ngram_range' and 'df_min_max' (formatted strings)
    and 'str' (the summary string of ``get_vect_conf_str()``).
    """
    ngram_range = "({}, {})".format(self.bow_ngram_range_min,
                                    self.bow_ngram_range_max)
    df_min_max = "{} / {}".format(self.bow_min_df, self.bow_max_df)
    return {
        'bow_is_enabled': self.bow_is_enabled,
        'bow_use_tf_idf': self.bow_use_tf_idf,
        'binary': self.bow_binary,
        'analyzer': self.get_bow_analyzer_display(),
        'ngram_range': ngram_range,
        'df_min_max': df_min_max,
        'str': self.get_vect_conf_str(),
    }
class IsSpammable(models.Model):
    """
    This Abstract Model (AM) is meant to be used in Django models which may
    receive Spam.

    Usage:
        - Make your model inherit from this AM.
        - Set the SPAM_FILTER constant to the name of the Spam Filter object
          you would like to use
        - Set the SPAMMABLE_FIELD to the name of the field which stores the
          content.
        - Example::

            class CommentsOfMySite(IsSpammable):
                SPAM_FILTER = "Comment Spam Filter"
                SPAMMABLE_FIELD = "comment"
                ... # The rest of your code

    On ``save()``, if the referenced Spam Filter is inferred, the content
    is classified and the result is stored in ``is_spam``.
    """
    #: Name of the field which stores the Spammable Content
    SPAMMABLE_FIELD = None
    #: Name of the field which stores the Spam labels
    SPAM_LABEL_FIELD = "is_spam"
    #: Name of the Spam Filter object to be used
    SPAM_FILTER = None

    #: If the object is Spam - Label of the Object
    is_spam = models.NullBooleanField(
        _("Is Spam?"),
        help_text=_((
            'If the object is Spam'
        ))
    )
    #: If the object has been misclassified by the Spam Filter -
    #: useful for some algorithms and for understanding the filter
    is_misclassified = models.BooleanField(
        _("Is Misclassified?"),
        default=False,
        help_text=_((
            'If the object has been misclassified by the Spam Filter'
        ))
    )
    #: If the object classification has been revised by a Human -
    #: Need for proper training and automation
    is_revised = models.BooleanField(
        _("Is Revised?"),
        default=False,
        help_text=_((
            'If the object classification has been revised by a Human'
        ))
    )

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        """
        Classify the content with the Spam Filter (if inferred) and save.

        Raises:
            ImproperlyConfigured: when the Spam Filter named by
                ``SPAM_FILTER`` cannot be retrieved or when the attribute
                named by ``SPAMMABLE_FIELD`` cannot be read.
        """
        try:
            spam_filter = SpamFilter.objects.get(name=self.SPAM_FILTER)
        except Exception:
            raise ImproperlyConfigured(_(
                "SPAMMABLE MODEL: "
                "The SPAM_FILTER const reffers to a non-existant object")
            )
        try:
            spammable_field = getattr(self, self.SPAMMABLE_FIELD)
        except Exception:
            raise ImproperlyConfigured(_(
                "SPAMMABLE MODEL: "
                "The SPAMMABLE_FIELD const refers to a non-existant field")
            )
        if spam_filter.is_inferred:
            # predict() returns one prediction per input text: keep the
            # single label as a plain bool instead of storing the whole
            # sequence in the field.
            prediction = spam_filter.predict([spammable_field])
            self.is_spam = bool(prediction[0])
        super(IsSpammable, self).save(*args, **kwargs)
class SpamFilterPreTraining(models.Model):
    """
    Abstract Model for pre-training Spam Filters.

    Subclass this Model for incorporating datasets into the training of
    a Spam Filter (the subclass must be set in the Spam Filter's
    ``pretraining`` field).
    """
    #: Name of the field which stores the Spammable Content
    SPAMMABLE_FIELD = "content"
    #: Name of the field which stores the Spam labels
    SPAM_LABEL_FIELD = "is_spam"

    #: Content
    content = models.TextField(
        _("Content")
    )
    #: Spam label
    is_spam = models.BooleanField(
        _("Is Spam?"),
        default=False
    )

    class Meta:
        abstract = True
        verbose_name = "Spam Filter Pre-Training"
        verbose_name_plural = "Spam Filter Pre-Trainings"

    def __str__(self):
        """Return "[SPAM]" / "[HAM]" plus the first 20 characters of the
        content."""
        label = "HAM"
        if self.is_spam:
            label = "SPAM"
        excerpt = self.content[:20]
        return "[{}] {}...".format(label, excerpt)