# -*- coding: utf-8 -*-
import os
from importlib import import_module
from collections import Counter
from pickle import HIGHEST_PROTOCOL as pickle_HIGHEST_PROTOCOL
import matplotlib
matplotlib.use('Agg')
from graphviz import Digraph
import numpy as np
import bayespy as bp
from django.db import models
from django.core.files.base import ContentFile
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone
from django.db.models.signals import post_save
from django.conf import settings
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
from picklefield.fields import PickledObjectField
from jsonfield import JSONField
from .utils import (parse_node_args, )
# The project modules are imported from one of two locations: through the
# `django_ai` package when the DJANGO_TEST environment variable is set, or
# as top-level `base` / `bayesian_networks` packages otherwise.
if 'DJANGO_TEST' in os.environ:
    from django_ai.base.models import StatisticalModel
    from django_ai.bayesian_networks import bayespy_constants as bp_consts
else: # pragma: no cover
    from base.models import StatisticalModel
    from bayesian_networks import bayespy_constants as bp_consts
class BayesianNetwork(StatisticalModel):
    """
    Main object of a Bayesian Network.

    It gathers all Nodes and Edges of the DAG that defines the Network and
    provides an interface for performing and resetting the inference and
    related objects.

    See base.StatisticalModel for the fields and methods already included
    (this class uses, among others, ``name``, ``metadata``,
    ``engine_object``, ``counter`` and ``perform_inference()`` without
    defining them).
    """
    # Cluster labels: the i-th distinct cluster mean (sorted by norm) is
    # labelled with the i-th letter
    _alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    _eo_meta_iterations = {}

    BN_TYPE_GENERAL = 0
    BN_TYPE_CLUSTERING = 1
    # Legacy support of constants
    TYPE_CLUSTERING = BN_TYPE_CLUSTERING
    TYPE_GENERAL = BN_TYPE_GENERAL
    NETWORK_TYPE_CHOICES = (
        (BN_TYPE_GENERAL, "General"),
        (BN_TYPE_CLUSTERING, "Clustering"),
    )
    # Keywords recognized in the threshold actions string
    ACTIONS_KEYWORDS = [":recalculate", ]

    network_type = models.SmallIntegerField(choices=NETWORK_TYPE_CHOICES,
                                            default=BN_TYPE_GENERAL,
                                            blank=True, null=True)
    image = models.ImageField("Image", blank=True, null=True)

    class Meta:
        verbose_name = "Bayesian Network"
        verbose_name_plural = "Bayesian Networks"

    def __str__(self):
        return("[BN: {0}]".format(self.name))

    def save(self, *args, **kwargs):
        """
        Saves the Network after initializing the metadata of Clustering
        Networks, defaulting the engine iterations to 1000 and running the
        threshold actions.
        """
        # Initialize metadata field if corresponds
        if self.metadata == {}:
            if self.network_type == self.BN_TYPE_CLUSTERING:
                for key in ("clusters_labels", "prev_clusters_labels",
                            "clusters_means", "prev_clusters_means",
                            "clusters_sizes", "prev_clusters_sizes"):
                    self.metadata[key] = {}
                self.metadata["columns"] = []
        # Defaults the engine iterations if corresponds
        if not self.engine_iterations:
            self.engine_iterations = 1000
        # Runs threshold actions if corresponds
        self.parse_and_run_threshold_actions()
        super(BayesianNetwork, self).save(*args, **kwargs)

    def parse_and_run_threshold_actions(self):
        """
        Runs the threshold actions if the counter has reached the counter
        threshold.

        Returns True if the actions were run, False otherwise (no threshold
        set or threshold not reached yet).
        """
        if not self.counter_threshold:
            return(False)
        if self.counter_threshold > self.counter:
            return(False)
        # The counter is reset before the actions run - presumably so that
        # a save() triggered by an action does not run them again.
        self.counter = 0
        # An unset / empty actions string simply means "no actions"
        actions = (self.threshold_actions or "").split(" ")
        for action in actions:
            if action == ":recalculate":
                self.perform_inference(recalculate=True)
        return(True)

    def get_graph(self):
        """
        Returns a graphviz Digraph with the Nodes and Edges of the Network.
        """
        dot = Digraph(comment=self.name)
        for node in self.nodes.all():
            dot.node(name=node.name, label=node.name)
        for edge in self.edges.all():
            # Pending: debug constraint param
            dot.edge(str(edge.parent.name),
                     str(edge.child.name))
        return(dot)

    def get_raw_network_image(self):
        """
        Returns the graph of the Network rendered as PNG, wrapped in a
        ContentFile.
        """
        dot = self.get_graph()
        dot.format = "png"
        return ContentFile(dot.pipe())

    def update_image(self):
        """
        Replaces the stored image of the Network with a fresh rendering of
        its graph.
        """
        if self.image:
            self.image.delete()
        image_name = "{0}/{1}".format(
            os.path.join("django_ai",
                         "bayesian_networks"),
            self.name + ".png")
        self.image.save(image_name,
                        self.get_raw_network_image())

    def get_nodes_names(self):
        """
        Returns the names of the Nodes of the Network (flat values list).
        """
        return(self.nodes.all().values_list("name", flat=True))

    def get_engine_object(self, reconstruct=False, propagate=True, save=True):
        """
        Constructs the Engine Objects of all Nodes in the Network for
        initializing the Inference Engine of the Network.

        This is the method that should be called for initializing the EOs of
        the Nodes, as it handles the dependencies correctly and then
        propagates them to the Nodes' objects.

        If an Engine Object is already stored it is returned as is unless
        `reconstruct` is True. With `propagate` the Engine Object of each
        Node is taken from the Inference Engine and set on the Node; with
        `save` the Network (and, if propagating, its Nodes) are saved.

        CAVEAT: You might have to call 'node.refresh_from_db()' if for some
        reason the Nodes are already retrieved before this method is ran.
        """
        if self.engine_object and not reconstruct:
            return(self.engine_object)

        # Evaluated once so every step below works on the same instances
        nodes = list(self.nodes.all())
        # Initialize the "EOs Struct" - a structure needed for keeping the
        # same Python Objects in order to correctly initialize the BP's
        # Inference Engine.
        # {node_name: {django_model:, engine_object:}}
        eos_struct = {n.name: {"dm": n, "eo": None}
                      for n in nodes}

        root_nodes = []
        child_nodes = []
        for n in nodes:
            if n.parents():
                child_nodes.append(n)
            else:
                root_nodes.append(n)

        # Root nodes have no problem getting their engine object and serve
        # as the recursion base step
        for rn in root_nodes:
            eos_struct[rn.name]["eo"] = rn.get_engine_object(
                reconstruct=reconstruct)

        # Child nodes are "intermediate" nodes and "final" ones ("leafs" in
        # trees). These are the ones that need the EOs Struct for recursion
        for cn in child_nodes:
            # A child may have been built already as the ancestor of a
            # previous one. Building it again would replace the object its
            # descendants were constructed with by a different instance.
            if eos_struct[cn.name]["eo"] is None:
                BayesianNetwork.update_eos_struct(eos_struct, cn)

        # Collect the EOs
        nodes_eos = [eos_struct[name]["eo"] for name in eos_struct]
        # Initialize the BP's Inference Engine
        self.engine_object = bp.inference.VB(*nodes_eos)
        # Propagate to the network
        if propagate:
            for name in eos_struct:
                eos_struct[name]["dm"].engine_object = \
                    self.engine_object[name]
        # Save the BN and its nodes
        if save:
            self.engine_object_timestamp = timezone.now()
            self.save()
            if propagate:
                for name in eos_struct:
                    eos_struct[name]["dm"].save()
        return(self.engine_object)

    def reset_engine_object(self, save=True):
        """
        Resets the Engine Object, its timestamp and the metadata
        """
        self.engine_object = None
        self.engine_object_timestamp = None
        self.metadata = {}
        if save:
            self.save()
        return(True)

    def reset_inference(self, save=True):
        """
        Resets the Engine Object and timestamp from the Network
        (the Network object itself and all the Nodes objects in it)
        """
        self.reset_engine_object(save=save)
        for node in self.nodes.all():
            node.reset_inference(save=save)
        if self.network_type == self.BN_TYPE_CLUSTERING:
            self.store_results(reset=True)
        return(True)

    def assign_cluster(self, observation):
        """
        If the Network is for Clustering, returns the cluster label for a
        new observation given the current state of the inference.

        Returns False if the Network is not for Clustering or if it has not
        been inferred yet.

        Assumptions:
            - The network has a topology of a Gaussian Mixture Model with
              one Categorical Node for cluster assignments with prior
              Dirichlet probabilities
        """
        if self.network_type != self.BN_TYPE_CLUSTERING:
            return(False)
        if not self.is_inferred:
            return(False)

        eo = self.engine_object
        prior_clusters_probs = self.nodes.get(
            distribution=bp_consts.DIST_DIRICHLET).engine_inferred_object
        clusters_means = self.nodes.get(
            distribution=bp_consts.DIST_GAUSSIAN).engine_inferred_object
        clusters_cov_matrices = self.nodes.get(
            distribution=bp_consts.DIST_WISHART).engine_inferred_object
        # New assignment and observation nodes hanging from the inferred
        # ones; only the new assignment node is updated
        Z_new = bp.nodes.Categorical(prior_clusters_probs)
        Z_new.initialize_from_random()
        Y_new = bp.nodes.Mixture(Z_new, bp.nodes.Gaussian,
                                 clusters_means, clusters_cov_matrices)
        Y_new.observe(observation)
        Q_0 = bp.inference.VB(Z_new, Y_new, *eo.model)
        Q_0.update(Z_new)
        cluster_index = str(np.argmax(Z_new.get_moments()[0]))
        return(self.metadata["clusters_labels"][cluster_index])

    def assign_clusters_labels(self, save=True):
        """
        Filters the clusters and constructs a dict for labelling.

        The distinct cluster means are sorted by norm and labelled with the
        letters of the alphabet in that order. The previous labels and means
        are kept in the "prev_" keys of the metadata.

        Assumptions:
            - The network has a topology of a Gaussian Mixture Model
        """
        clusters_means_node_eo = self.nodes.get(
            distribution=bp_consts.DIST_GAUSSIAN).engine_inferred_object
        clusters_means = clusters_means_node_eo.get_moments()[0]
        filtered_means = list(np.unique(clusters_means, axis=0))
        # Sort cluster means by norm
        filtered_means.sort(key=np.linalg.norm)
        # One row per distinct mean, in label order
        distinct_means = np.asarray(filtered_means).reshape(
            len(filtered_means), -1)

        if not self.metadata["clusters_labels"] == {}:
            self.metadata["prev_clusters_labels"] = self.metadata[
                "clusters_labels"]
        self.metadata["clusters_labels"] = {}
        if not self.metadata["clusters_means"] == {}:
            self.metadata["prev_clusters_means"] = self.metadata[
                "clusters_means"]
        self.metadata["clusters_means"] = {}

        for index, cm in enumerate(clusters_means):
            # The whole mean has to match: comparing element-wise would
            # pick another cluster sharing a single coordinate with this one
            whole_row_matches = np.all(
                distinct_means == np.ravel(cm), axis=1)
            fm_index = int(np.flatnonzero(whole_row_matches)[0])
            cluster_label = self._alphabet[fm_index]
            self.metadata["clusters_labels"][str(index)] = cluster_label
            self.metadata["clusters_means"][cluster_label] = cm
        if save:
            self.save()

    def columns_names_to_metadata(self, save=True):
        """
        Gets the columns names and stores them in metadata

        Assumptions:
            - The network has a topology of a Gaussian Mixture Model
        """
        observations_node = self.nodes.get(
            distribution=bp_consts.DIST_MIXTURE)
        columns = observations_node.data_columns.order_by("position")
        columns_names = columns.values_list("ref_column", flat=True)
        self.metadata["columns"] = list(columns_names)
        if save:
            self.save()

    def metadata_update_cluster_sizes(self, save=True):
        """
        Stores in metadata the amount of observations assigned to each
        cluster label, keeping the previous sizes in "prev_clusters_sizes".
        """
        # get_results() returns False / None when there are no results:
        # that is counted as no assignments at all
        results = self.get_results() or []
        # Move cluster_sizes to prev_c_s if it already existed
        if not self.metadata["clusters_sizes"] == {}:
            self.metadata["prev_clusters_sizes"] = self.metadata[
                "clusters_sizes"]
        sizes = dict(Counter(results))
        # Add empty clusters
        for label in self.metadata["clusters_labels"].values():
            if label not in sizes:
                sizes[label] = 0
        self.metadata["clusters_sizes"] = sizes
        if save:
            self.save()

    def get_results(self):
        """
        Returns a list with the clustering assignation for each observation.

        Returns None if the Network is not for Clustering and False if it
        has not been inferred yet.

        Assumptions:
            - The network has a topology of a Gaussian Mixture Model
        """
        if self.network_type != self.BN_TYPE_CLUSTERING:
            return(None)
        if not self.is_inferred:
            return(False)
        clusters_assig_eo = self.nodes.get(
            distribution=bp_consts.DIST_CATEGORICAL).engine_inferred_object
        labels = self.metadata["clusters_labels"]
        return([labels[str(np.argmax(assig))]
                for assig in clusters_assig_eo.get_moments()[0]])

    @staticmethod
    def update_eos_struct(eos_struct, node):
        """
        Auxiliary recursive function to "populate" a "branch" (all the
        ancestors of the node) of the "EOs Struct" of the Network DAG.

        By how BayesPy is implemented, this is needed to hold the same objects
        in order to correctly initialize the BP inference engine ("simple
        recursion" via Django models does not work, different "branches" may
        point to different instances of objects, as they are initialized
        independently)

        The Engine Object of `node` itself is always (re)constructed. The
        struct is updated in place and also returned.
        """
        parents = [p.name for p in node.parents()]
        # Check if the parents' eos are already present, otherwise initialize
        # them recursively
        for parent in parents:
            if eos_struct[parent]["eo"] is None:
                BayesianNetwork.update_eos_struct(
                    eos_struct, eos_struct[parent]["dm"])
        # Initialize the Node EO and store it in the matrix
        eos_struct[node.name]["eo"] = \
            node.get_engine_object(parents=eos_struct, reconstruct=True)
        return(eos_struct)

    @property
    def is_inferred(self):
        """
        Whether the Network holds an Engine Object
        """
        return(self.engine_object is not None)
class BayesianNetworkNodeColumn(models.Model):
    """
    A dimension / axis / column of an Observable Bayesian Network Node.
    """
    node = models.ForeignKey("BayesianNetworkNode",
                             on_delete=models.CASCADE,
                             related_name="data_columns")
    ref_model = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    ref_column = models.CharField("Reference Column", max_length=100)
    position = models.SmallIntegerField(blank=True, null=True)

    class Meta:
        verbose_name = "Bayesian Networks Node Column"
        verbose_name_plural = "Bayesian Networks Node Columns"
        unique_together = [("node", "ref_model", "ref_column"),
                           ("node", "position")]

    def __str__(self):
        return("{0} | {1} - {2}".format(self.node, self.ref_model,
                                        self.ref_column))

    def clean(self):
        """
        Checks that the Reference Model resolves to a model class and that
        the Reference Column is an attribute of it.

        Raises ValidationError keyed by the offending field.
        """
        invalid_model = ValidationError({'ref_model': _(
            'The Reference Model must be a valid Django Model'
        )})
        # Check the validity of the Reference Column
        try:
            mc = self.ref_model.model_class()
        except Exception:
            raise invalid_model
        # model_class() gives None when the content type does not resolve
        # to a model: that is a problem of the model, not of the column
        if mc is None:
            raise invalid_model
        try:
            getattr(mc, self.ref_column)
        except Exception:
            raise ValidationError({'ref_column': _(
                'The column must be a valid attribute of '
                'the ' + self.ref_model.name + ' model'
            )})
class BayesianNetworkNode(models.Model):
    """
    A Node in a Bayesian Network
    """
    _engine_object = None  # Deprecated?
    _engine_inferred_object = None
    _data = None

    NODE_TYPE_STOCHASTIC = 0
    NODE_TYPE_DETERMINISTIC = 1
    NODE_TYPE_CHOICES = (
        (NODE_TYPE_STOCHASTIC, "Stochastic"),
        (NODE_TYPE_DETERMINISTIC, "Deterministic"),
    )
    # Fields that may only be used on each type of Node (see clean())
    FIELDS_STOCHASTIC = [
        "is_observable", "distribution", "distribution_params",
        "graph_interval"
    ]
    FIELDS_DETERMINISTIC = [
        "deterministic", "deterministic_params"
    ]

    network = models.ForeignKey(BayesianNetwork, related_name="nodes",
                                on_delete=models.CASCADE)
    name = models.CharField("Node Name", max_length=50)
    node_type = models.SmallIntegerField("Type", choices=NODE_TYPE_CHOICES,
                                         default=NODE_TYPE_STOCHASTIC)
    is_observable = models.BooleanField("Is Observable?", default=False)
    distribution = models.CharField("Distribution", max_length=50,
                                    choices=bp_consts.DISTRIBUTION_CHOICES,
                                    blank=True, null=True)
    distribution_params = models.CharField("Distribution Parameters",
                                           max_length=200,
                                           blank=True, null=True)
    deterministic = models.CharField("Deterministic", max_length=50,
                                     choices=bp_consts.DETERMINISTIC_CHOICES,
                                     blank=True, null=True)
    deterministic_params = models.CharField("Deterministic Parameters",
                                            max_length=200,
                                            blank=True, null=True)
    engine_object = PickledObjectField(protocol=pickle_HIGHEST_PROTOCOL,
                                       blank=True, null=True)
    engine_object_timestamp = models.DateTimeField(blank=True, null=True)
    engine_inferred_object = PickledObjectField(blank=True, null=True)
    engine_inferred_object_timestamp = models.DateTimeField(
        blank=True, null=True)
    graph_interval = models.CharField("Graph Interval", max_length=20,
                                      blank=True, null=True)
    image = models.ImageField("Image", blank=True, null=True)

    def parents(self):
        """
        Returns the Parents of the Node as a QuerySet
        """
        parents_ids = self.child_edges.values_list("parent", flat=True)
        return(BayesianNetworkNode.objects.filter(id__in=parents_ids))

    def children(self):
        """
        Returns the Children of the Node as a QuerySet
        """
        children_ids = self.parent_edges.values_list("child", flat=True)
        return(BayesianNetworkNode.objects.filter(id__in=children_ids))

    def clean(self):
        """
        Validates the Node according to its type.

        Checks, in order: that no field of the other Node type is used, that
        the fields required by the Node type are present, that the params
        can be parsed and - only if no param is a string - that the Engine
        Object can be initialized.

        Raises ValidationError keyed by the offending field(s).
        """
        is_stochastic = self.node_type == self.NODE_TYPE_STOCHASTIC
        error_dict = {}

        # FIRST STEP: Check there are no fields that don't correspond to
        # the Node type
        if is_stochastic:
            for field in self.FIELDS_DETERMINISTIC:
                if getattr(self, field) is not None:
                    error_dict[field] = _(
                        'Using Deterministic fields on a Stochastic '
                        'Node is not allowed')
        else:
            for field in self.FIELDS_STOCHASTIC:
                field_value = getattr(self, field)
                # For avoiding the use of NullBooleanField in 'is_observable'
                if field_value is False:
                    field_value = None
                if field_value is not None:
                    error_dict[field] = _(
                        'Using Stochastic fields on a Deterministic '
                        'Node is not allowed')
        if error_dict:
            raise ValidationError(error_dict)

        if is_stochastic:
            # SECOND STEP: Check validity on Stochastic Nodes
            if self.distribution is None:
                error_dict['distribution'] = _(
                    'A Stochastic Node must have a distribution')
            if self.distribution_params is None:
                error_dict['distribution_params'] = _(
                    'A Stochastic Node must have distribution '
                    'parameters')
        else:
            # THIRD STEP: Check Validity on Deterministic Nodes
            if self.deterministic is None:
                error_dict['deterministic'] = _(
                    'A Deterministic Node must have a function')
            if self.deterministic_params is None:
                error_dict['deterministic_params'] = _(
                    'A Deterministic Node must have deterministic '
                    'parameters')
        # Raise if any
        if error_dict:
            raise ValidationError(error_dict)

        params_field = self.get_params_type() + "_params"

        # FOURTH STEP: Check args parsing
        try:
            params = parse_node_args(self.get_params(), flat=True)
        except Exception as e:
            raise ValidationError({
                params_field: self._exception_message(e)
            })

        # FINAL STEP: Check if the Engine Object (BayesPy) can be initialized
        # Check only if the Node hasn't other Nodes as params (otherwise the
        # network Edges should have been created already to resolve the names
        # to Nodes)
        if not any(isinstance(p, str) for p in params):
            try:
                self.get_engine_object(reconstruct=True, save=False)
            except Exception as e:
                raise ValidationError({
                    params_field: "[BayesPy] " + self._exception_message(e)
                })

    @staticmethod
    def _exception_message(exception):
        """
        Returns the first argument of the exception as a string, falling
        back to str(exception) if it has no arguments.
        """
        if exception.args:
            return(str(exception.args[0]))
        return(str(exception))

    # Use StatisticalModel's get_data() instead of defining one of its own
    get_data = StatisticalModel.get_data

    def get_params(self):
        """
        Returns the params according to the node type
        """
        if self.node_type == self.NODE_TYPE_STOCHASTIC:
            return(self.distribution_params)
        else:
            return(self.deterministic_params)

    def get_params_type(self):
        """
        Returns the node type as a string
        """
        if self.node_type == self.NODE_TYPE_STOCHASTIC:
            return("distribution")
        else:
            return("deterministic")

    def resolve_eos_in_params(self, params=None, kwparams=None, parents=None):
        """
        Resolve Nodes' EOs from names passed in params and kwparams

        Every string that does not start with ":" is taken as a Node name
        and replaced - in place - with the "eo" of its entry in `parents`
        ({node_name: {"eo": engine_object, ...}}).

        Returns the tuple (params, kwparams). Raises ValueError if a name
        is not in `parents`.
        """
        params = [] if params is None else params
        kwparams = {} if kwparams is None else kwparams
        parents = {} if parents is None else parents
        error_str = _("Can't resolve name to node"
                      " - maybe Network Edges are missing?")

        def is_node_name(value):
            # Strings can be only Node names or ":"-prefixed keywords
            return(isinstance(value, str) and not value.startswith(":"))

        for index, param in enumerate(params):
            if is_node_name(param):
                if param not in parents:
                    raise ValueError(error_str)
                params[index] = parents[param]["eo"]
        for key, value in kwparams.items():
            if is_node_name(value):
                if value not in parents:
                    raise ValueError(error_str)
                # Look up by the Node name (the value), not by the name of
                # the keyword argument
                kwparams[key] = parents[value]["eo"]
        return((params, kwparams))

    def get_engine_object(self, parents=None, reconstruct=False, save=True):
        """
        Method for initializing the Node's Engine Object (currently BayesPy
        only).

        This is meant to be called from the BayesianNetwork object method -
        BN.get_engine_object() - as it handles all the dependencies of the
        DAG, passing the proper parents' objects and then propagating the
        results to the Nodes.

        Otherwise, if not a root node, you will have to provide the parents.

        Custom keywords (positional params starting with ":") are removed
        from the params before initializing the BayesPy node; ":noplates"
        and ":ifr" are acted upon for Stochastic Nodes.
        """
        if self.engine_object and not reconstruct:
            return self.engine_object
        parents = {} if parents is None else parents

        # Parse Params
        parsed_params = parse_node_args(self.get_params())
        params, kwparams = self.resolve_eos_in_params(parsed_params['args'],
                                                      parsed_params['kwargs'],
                                                      parents)
        # Remove Custom Keywords from params. New lists are built instead of
        # removing while iterating, which skips the item following each
        # removed one.
        custom_keywords = []
        node_params = []
        for p in params:
            if isinstance(p, str) and p.startswith(":"):
                custom_keywords.append(p)
            else:
                node_params.append(p)
        params = node_params
        kwparams['name'] = self.name

        if self.node_type == self.NODE_TYPE_STOCHASTIC:
            node_distribution = getattr(bp.nodes, self.distribution)
            # Only assign if plates are not provided on observables
            if self.is_observable:
                self._data = self.get_data()
                if 'plates' not in kwparams:
                    kwparams['plates'] = np.shape(self._data)
            elif 'plates' in kwparams:
                # Process Custom Keywords in plates
                plates = kwparams['plates']
                length = plates[0]
                if isinstance(length, str) and length.startswith(":dl"):
                    # What follows the keyword and one separator character
                    # is the name of the Node whose data length is used
                    node = self.network.nodes.get(name=length[4:])
                    length = node.get_data().shape[0]
                # NOTE(review): plates with a length other than two are
                # reduced to their first dimension - confirm it is intended
                if len(plates) == 2:
                    kwparams['plates'] = (length, plates[1])
                else:
                    kwparams['plates'] = (length, )
            if ":noplates" in custom_keywords:
                # There may be no plates to remove
                kwparams.pop('plates', None)
            # Initialize the BP Node
            self.engine_object = node_distribution(*params, **kwparams)
            # and if it is observable, load the data
            if self.is_observable:
                data = self.get_data()
                self.engine_object.observe(data)
            # Initialize from random if requested
            if ":ifr" in custom_keywords:
                self.engine_object.initialize_from_random()
        else:
            # Deterministic Type
            node_deterministic_fun = getattr(bp.nodes, self.deterministic)
            # Initialize the BP Node
            self.engine_object = node_deterministic_fun(*params, **kwparams)

        if save:
            self.engine_object_timestamp = timezone.now()
            self.save()
        return(self.engine_object)

    def get_engine_inferred_object(self, recalculate=False, save=True):
        """
        Performs the inference on the Network and stores the inferred
        Engine Object of this Node, which is also returned.
        """
        Q = self.network.perform_inference(recalculate=recalculate)
        self.engine_inferred_object = Q[self.name]
        if save:
            self.engine_inferred_object_timestamp = timezone.now()
            self.save()
        return(self.engine_inferred_object)

    def reset_engine_object(self, save=True):
        """
        Resets the Engine Object and its timestamp
        """
        self.engine_object = None
        self.engine_object_timestamp = None
        if save:
            self.save()
        return(True)

    def reset_inference(self, save=True):
        """
        Resets the inferred Engine Object and its timestamp and deletes the
        image of the Node
        """
        self.engine_inferred_object = None
        self.engine_inferred_object_timestamp = None
        self.image.delete()
        if save:
            self.save()
        return(True)

    def update_image(self):
        """
        Plots the inferred Engine Object of the Node into its image.

        Only Stochastic Nodes with an inferred Engine Object are plotted:
        Gaussian ARD ones over the graph interval ("a, b") and Mixture ones
        with two data columns. Returns False when the Node does not
        qualify or has no graph interval, None otherwise.
        """
        if (not self.engine_inferred_object or
                not self.node_type == self.NODE_TYPE_STOCHASTIC):
            return False
        if self.image:
            self.image.delete()
        image_name = "{0}/{1}".format(
            os.path.join("django_ai",
                         "bayesian_networks"),
            self.network.name + "_" + self.name + ".png")
        image_path = settings.MEDIA_ROOT + '/' + image_name
        save = False
        if self.distribution == bp_consts.DIST_GAUSSIAN_ARD:
            if not self.graph_interval:
                return(False)
            # float() ignores the surrounding whitespace, so the interval
            # may be written with or without spaces after the comma
            a, b = self.graph_interval.split(",")
            bp.plot.pdf(self.engine_inferred_object,
                        np.linspace(float(a), float(b), num=100),
                        name=self.name)
            bp.plot.pyplot.savefig(image_path)
            bp.plot.pyplot.close()
            save = True
        elif self.distribution == bp_consts.DIST_MIXTURE:
            if self.data_columns.count() == 2:
                bp.plot.gaussian_mixture_2d(
                    self.engine_inferred_object, scale=2)
                bp.plot.pyplot.savefig(image_path)
                bp.plot.pyplot.close()
                save = True
        if save:
            self.image = image_name
            self.save()

    def __str__(self):
        return(str(self.network) + " - " + self.name)

    class Meta:
        unique_together = ["network", "name", ]
class BayesianNetworkEdge(models.Model):
    """
    A directed Edge (parent -> child) between two Bayesian Network Nodes
    """
    parent = models.ForeignKey(
        BayesianNetworkNode,
        related_name="parent_edges",
        on_delete=models.CASCADE,
    )
    child = models.ForeignKey(
        BayesianNetworkNode,
        related_name="child_edges",
        on_delete=models.CASCADE,
    )
    network = models.ForeignKey(
        BayesianNetwork,
        related_name="edges",
        on_delete=models.CASCADE,
    )
    description = models.CharField("Description", max_length=50)

    def __str__(self):
        return self.description

    def save(self, *args, **kwargs):
        """
        Saves the Edge, always taking its Network from the parent Node.
        """
        parent_node = self.parent
        self.network = parent_node.network
        super(BayesianNetworkEdge, self).save(*args, **kwargs)
def update_bn_image(sender, **kwargs):
    """
    Signal receiver: updates the image of the Network of the instance
    passed in the signal's keyword arguments.
    """
    instance = kwargs['instance']
    network = instance.network
    network.update_image()


# Run the receiver after each save of a Bayesian Network Node
post_save.connect(update_bn_image, sender=BayesianNetworkNode)